1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325
// Copyright 2020 Kodebox, Inc. // This file is part of CodeChain. // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as // published by the Free Software Foundation, either version 3 of the // License, or (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. use crate::packet::{PacketView, SlotType}; use crate::port::{client::Client, server::Server, BasicPort, Port}; use crate::transport::multiplex::{self, ForwardResult, MultiplexResult, Multiplexer}; use crate::transport::{TransportRecv, TransportSend}; use crate::{raw_exchange::*, Service, ServiceToExport, ServiceToImport}; use parking_lot::Mutex; use std::sync::{Arc, Barrier, Weak}; use threadpool::ThreadPool; mod meta_service { use super::*; /// This is required because of macro use crate as remote_trait_object; #[remote_trait_object_macro::service] pub trait MetaService: Service { fn firm_close(&self); } pub struct MetaServiceImpl { barrier: Arc<Barrier>, } impl MetaServiceImpl { pub fn new(barrier: Arc<Barrier>) -> Self { Self { barrier, } } } impl Service for MetaServiceImpl {} impl MetaService for MetaServiceImpl { fn firm_close(&self) { self.barrier.wait(); } } } use meta_service::{MetaService, MetaServiceImpl}; /// A configuration of a `remote-trait-object` context. #[derive(Clone, Debug)] pub struct Config { /// A name that will be appended to the names of various threads spawned by `remote-trait-object`, for an easy debug. /// /// This can be helpful if you handle multiple contexts of `remote-trait-object`. pub name: String, /// Number of the maximum of concurrent calls. /// /// Value of this doesn't have anything to do with the number of threads that would be spawned. /// Having a large number of this wouldn't charge any cost except really small additional memory allocation. /// /// [`server_threads`]: ./struct.Config.html#field.server_threads pub call_slots: usize, /// A timeout for a remote method call. /// /// All remote method invocations through your proxy object and delete requests (that happens when you drop a proxy object) /// will have this timeout. If it exceeds, it will cause an error. /// /// Use `None` for to wait indefinitely. pub call_timeout: Option<std::time::Duration>, /// A maximum number of services that this context can export. pub maximum_services_num: usize, /// A shared instance of a thread pool that will be used in call handling /// /// A `remote-trait-object` context will use this thread pool to handle an incoming method call. /// Size of this pool determines the maximum number of concurrent calls that the context can handle. /// Note that this pool is wrapped in `Arc`, which means that it can be possibly shared with other places. pub thread_pool: Arc<Mutex<threadpool::ThreadPool>>, } impl Config { pub fn default_setup() -> Self { Self { name: "my rto".to_owned(), call_slots: 512, maximum_services_num: 65536, call_timeout: Some(std::time::Duration::from_millis(1000)), thread_pool: Arc::new(Mutex::new(ThreadPool::new(8))), } } } /// One end of a `remote-trait-object` connection. /// /// If you establish a remote-trait-object connection, /// there must be two ends and each will be provided as a `Context` to each user on both sides. /// /// A context holds multiple things to function as a `remote-trait-object` connection end. /// Since the connection is symmetric, it manages both _server_ and _client_ toward the other end. /// It also manages a _registry_ that contains all exported services. /// The server will look up the registry to find a target object for handling an incoming method invocation. /// /// Note that `remote-trait-object` is a point-to-point connection protocol. /// Exporting & importing a service are always performed on a specific connection, /// which is toward the other side, or another instance of `Context`. /// /// If you created an instance of this, that means you have a connection that has been successfully established **once**, /// but is not guaranteed to be alive. /// If the other end (or the other `Context`) is closed, most operations performed on `Context` will just cause an error. pub struct Context { config: Config, multiplexer: Option<Multiplexer>, server: Option<Server>, port: Option<Arc<BasicPort>>, meta_service: Option<Box<dyn MetaService>>, firm_close_barrier: Arc<Barrier>, } impl std::fmt::Debug for Context { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Context").field("config", &self.config).finish() } } impl Context { /// Creates a new context without any initial services. /// /// If you decide to use this, you have to exchange raw [`HandleToExchange`] at least once using a secondary transportation means. /// It is really rarely needed, so please consider introducing an initializing service as an initial service, to avoid any raw exchange. /// /// Please see [`with_initial_service()`] for a general explanation of creation of `Context`. /// /// [`with_initial_service()`]: ./struct.Context.html#method.with_initial_service pub fn new<S: TransportSend + 'static, R: TransportRecv + 'static>( config: Config, transport_send: S, transport_recv: R, ) -> Self { let null_to_export = crate::service::create_null_service(); let (ctx, _null_to_import): (Self, ServiceToImport<dyn crate::service::NullService>) = Self::with_initial_service(config, transport_send, transport_recv, ServiceToExport::new(null_to_export)); ctx } /// Creates a new context only exporting a service, but importing nothing. /// /// The other end's context must be initialized with `with_initial_service_import()`. /// Please see [`with_initial_service()`] for a general explanation of creation of `Context`. /// /// [`with_initial_service()`]: ./struct.Context.html#method.with_initial_service pub fn with_initial_service_export<S: TransportSend + 'static, R: TransportRecv + 'static, A: ?Sized + Service>( config: Config, transport_send: S, transport_recv: R, initial_service: ServiceToExport<A>, ) -> Self { let (ctx, _null_to_import): (Self, ServiceToImport<dyn crate::service::NullService>) = Self::with_initial_service(config, transport_send, transport_recv, initial_service); ctx } /// Creates a new context only importing a service, but exporting nothing. /// /// The other end's context must be initialized with `with_initial_service_export()`. /// Please see [`with_initial_service()`] for a general explanation of creation of `Context`. /// /// [`with_initial_service()`]: ./struct.Context.html#method.with_initial_service pub fn with_initial_service_import<S: TransportSend + 'static, R: TransportRecv + 'static, B: ?Sized + Service>( config: Config, transport_send: S, transport_recv: R, ) -> (Self, ServiceToImport<B>) { let null_to_export = crate::service::create_null_service(); let (ctx, import) = Self::with_initial_service(config, transport_send, transport_recv, ServiceToExport::new(null_to_export)); (ctx, import) } /// Creates a new context exchanging two services, one for export and one for import. /// /// It takes `initial_service` and registers in it, and passes a `HandleToExchange` internally. (_export_). /// Also it receives a `HandleToExchange` from the other side, and makes it into a proxy object. (_import_) /// /// The other end's context must be initialized with `with_initial_service()` as well, and /// such processes will be symmetric for both. /// /// [`HandleToExchange`]: ../raw_exchange/struct.HandleToExchange.html pub fn with_initial_service< S: TransportSend + 'static, R: TransportRecv + 'static, A: ?Sized + Service, B: ?Sized + Service, >( config: Config, transport_send: S, transport_recv: R, initial_service: ServiceToExport<A>, ) -> (Self, ServiceToImport<B>) { let firm_close_barrier = Arc::new(Barrier::new(2)); let MultiplexResult { multiplexer, request_recv, response_recv, } = Multiplexer::multiplex::<R, PacketForward>(config.clone(), transport_recv); let transport_send = Arc::new(transport_send) as Arc<dyn TransportSend>; let client = Client::new(config.clone(), Arc::clone(&transport_send), Box::new(response_recv)); let port = BasicPort::new( config.clone(), client, (Box::new(MetaServiceImpl::new(Arc::clone(&firm_close_barrier))) as Box<dyn MetaService>).into_skeleton(), initial_service.get_raw_export(), ); let server = Server::new(config.clone(), port.get_registry(), transport_send, Box::new(request_recv)); let port_weak = Arc::downgrade(&port) as Weak<dyn Port>; let meta_service = <Box<dyn MetaService> as ImportProxy<dyn MetaService>>::import_proxy( Weak::clone(&port_weak), HandleToExchange(crate::forwarder::META_SERVICE_OBJECT_ID), ); let initial_handle = HandleToExchange(crate::forwarder::INITIAL_SERVICE_OBJECT_ID); let ctx = Context { config, multiplexer: Some(multiplexer), server: Some(server), port: Some(port), meta_service: Some(meta_service), firm_close_barrier, }; let initial_service = ServiceToImport::from_raw_import(initial_handle, port_weak); (ctx, initial_service) } pub(crate) fn get_port(&self) -> Weak<dyn Port> { Arc::downgrade(&self.port.clone().expect("It becomes None only when the context is dropped.")) as Weak<dyn Port> } /// Clears all service objects in its registry. /// /// The most usual way of deleting a service object is dropping its proxy object on the client side, and letting it request a delete to the exporter side. /// However, in some cases (especially while you're trying to shut down the connection) it is useful to clear all exported service objects /// **by the exporter side itself**. /// /// Note that it will cause an error if the client side drops a proxy object of an already deleted (by this method) service object. /// Consider calling [`disable_garbage_collection()`] on the other end if there's such an issue. /// /// Note also that this might trigger _delete request_ as a side effect since the service object might own a proxy object. /// /// [`disable_garbage_collection()`]: ./struct.Context.html#method.disable_garbage_collection pub fn clear_service_registry(&mut self) { self.port.as_mut().unwrap().clear_registry(); } /// Disables all _delete request_ from this end to the other end. /// /// If you call this, all `drop()` of proxy objects imported from this context won't send a delete request anymore. /// This is useful when you're not sure if the connection is still alive, but you have to close your side's context anyway. pub fn disable_garbage_collection(&self) { self.port.as_ref().expect("It becomes None only when the context is dropped.").set_no_drop(); } /// Closes a context with a firm synchronization with the other end. /// /// If you call this method, it will block until the other end calls `firm_close()` too. /// This is useful when you want to assure that two ends never suffer from 'other end has been closed' error. /// If one of the contexts dropped too early, all remote calls (including delete request) from the other end will fail. /// To avoid such a situation, consider using this to stay alive as long as it is required. /// /// FIXME: currently it doesn't use `timeout` and blocks indefinitely. pub fn firm_close(self, _timeout: Option<std::time::Duration>) -> Result<(), Self> { let barrier = Arc::clone(&self.firm_close_barrier); let t = std::thread::spawn(move || { barrier.wait(); }); self.meta_service.as_ref().unwrap().firm_close(); t.join().unwrap(); Ok(()) } } impl Drop for Context { /// This will delete all service objects after calling `disable_garbage_collection()` internally. fn drop(&mut self) { // We have to clean all registered service, as some might hold another proxy object inside, which refers this context's port. // For such case, we have to make them be dropped first before we unwrap the Arc<BasicPort> self.port.as_ref().unwrap().set_no_drop(); self.port.as_ref().unwrap().clear_registry(); drop(self.meta_service.take().unwrap()); self.multiplexer.take().expect("It becomes None only when the context is dropped.").shutdown(); // Shutdown server after multiplexer self.server.take().expect("It becomes None only when the context is dropped.").shutdown(); // Shutdown port after multiplexer Arc::try_unwrap(self.port.take().expect("It becomes None only when the context is dropped.")) .unwrap() .shutdown(); } } pub struct PacketForward; impl multiplex::Forward for PacketForward { fn forward(packet: PacketView) -> ForwardResult { match packet.slot().get_type() { SlotType::Request => ForwardResult::Request, SlotType::Response => ForwardResult::Response, } } }