remote_trait_object/context.rs
1use crate::packet::{PacketView, SlotType};
2use crate::port::{client::Client, server::Server, BasicPort, Port};
3use crate::transport::multiplex::{self, ForwardResult, MultiplexResult, Multiplexer};
4use crate::transport::{TransportRecv, TransportSend};
5use crate::{raw_exchange::*, Service, ServiceToExport, ServiceToImport};
6use parking_lot::Mutex;
7use std::sync::{Arc, Weak};
8use threadpool::ThreadPool;
9
10mod meta_service {
11 use super::*;
12 /// This is required because of macro
13 use crate as remote_trait_object;
14
15 #[remote_trait_object_macro::service]
16 pub trait MetaService: Service {}
17
18 pub struct MetaServiceImpl {}
19
20 impl MetaServiceImpl {
21 pub fn new() -> Self {
22 Self {}
23 }
24 }
25
26 impl Service for MetaServiceImpl {}
27
28 impl MetaService for MetaServiceImpl {}
29}
30use meta_service::{MetaService, MetaServiceImpl};
31
32/// A configuration of a `remote-trait-object` context.
33#[derive(Clone, Debug)]
34pub struct Config {
35 /// A name that will be appended to the names of various threads spawned by `remote-trait-object`, for an easy debug.
36 ///
37 /// This can be helpful if you handle multiple contexts of `remote-trait-object`.
38 pub name: String,
39
40 /// Number of the maximum of concurrent calls.
41 ///
42 /// Value of this doesn't have anything to do with the number of threads that would be spawned.
43 /// Having a large number of this wouldn't charge any cost except really small additional memory allocation.
44 ///
45 /// [`server_threads`]: ./struct.Config.html#field.server_threads
46 pub call_slots: usize,
47
48 /// A timeout for a remote method call.
49 ///
50 /// All remote method invocations through your proxy object and delete requests (that happens when you drop a proxy object)
51 /// will have this timeout. If it exceeds, it will cause an error.
52 ///
53 /// Use `None` for to wait indefinitely.
54 pub call_timeout: Option<std::time::Duration>,
55
56 /// A maximum number of services that this context can export.
57 pub maximum_services_num: usize,
58
59 /// A shared instance of a thread pool that will be used in call handling
60 ///
61 /// A `remote-trait-object` context will use this thread pool to handle an incoming method call.
62 /// Size of this pool determines the maximum number of concurrent calls that the context can handle.
63 /// Note that this pool is wrapped in `Arc`, which means that it can be possibly shared with other places.
64 pub thread_pool: Arc<Mutex<threadpool::ThreadPool>>,
65}
66
67impl Config {
68 pub fn default_setup() -> Self {
69 Self {
70 name: "my rto".to_owned(),
71 call_slots: 512,
72 maximum_services_num: 65536,
73 call_timeout: Some(std::time::Duration::from_millis(1000)),
74
75 thread_pool: Arc::new(Mutex::new(ThreadPool::new(8))),
76 }
77 }
78}
79
80/// One end of a `remote-trait-object` connection.
81///
82/// If you establish a remote-trait-object connection,
83/// there must be two ends and each will be provided as a `Context` to each user on both sides.
84///
85/// A context holds multiple things to function as a `remote-trait-object` connection end.
86/// Since the connection is symmetric, it manages both _server_ and _client_ toward the other end.
87/// It also manages a _registry_ that contains all exported services.
88/// The server will look up the registry to find a target object for handling an incoming method invocation.
89///
90/// Note that `remote-trait-object` is a point-to-point connection protocol.
91/// Exporting & importing a service are always performed on a specific connection,
92/// which is toward the other side, or another instance of `Context`.
93///
94/// If you created an instance of this, that means you have a connection that has been successfully established **once**,
95/// but is not guaranteed to be alive.
96/// If the other end (or the other `Context`) is closed, most operations performed on `Context` will just cause an error.
97pub struct Context {
98 config: Config,
99 multiplexer: Option<Multiplexer>,
100 server: Option<Server>,
101 port: Option<Arc<BasicPort>>,
102 meta_service: Option<Box<dyn MetaService>>,
103 cleaned: bool,
104}
105
106impl std::fmt::Debug for Context {
107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 f.debug_struct("Context")
109 .field("config", &self.config)
110 .finish()
111 }
112}
113
114impl Context {
115 /// Creates a new context without any initial services.
116 ///
117 /// If you decide to use this, you have to exchange raw [`HandleToExchange`] at least once using a secondary transportation means.
118 /// It is really rarely needed, so please consider introducing an initializing service as an initial service, to avoid any raw exchange.
119 ///
120 /// Please see [`with_initial_service()`] for a general explanation of creation of `Context`.
121 ///
122 /// [`with_initial_service()`]: ./struct.Context.html#method.with_initial_service
123 pub fn new<S: TransportSend + 'static, R: TransportRecv + 'static>(
124 config: Config,
125 transport_send: S,
126 transport_recv: R,
127 ) -> Self {
128 let null_to_export = crate::service::create_null_service();
129 let (ctx, _null_to_import): (Self, ServiceToImport<dyn crate::service::NullService>) =
130 Self::with_initial_service(
131 config,
132 transport_send,
133 transport_recv,
134 ServiceToExport::new(null_to_export),
135 );
136 ctx
137 }
138
139 /// Creates a new context only exporting a service, but importing nothing.
140 ///
141 /// The other end's context must be initialized with `with_initial_service_import()`.
142 /// Please see [`with_initial_service()`] for a general explanation of creation of `Context`.
143 ///
144 /// [`with_initial_service()`]: ./struct.Context.html#method.with_initial_service
145 pub fn with_initial_service_export<
146 S: TransportSend + 'static,
147 R: TransportRecv + 'static,
148 A: ?Sized + Service,
149 >(
150 config: Config,
151 transport_send: S,
152 transport_recv: R,
153 initial_service: ServiceToExport<A>,
154 ) -> Self {
155 let (ctx, _null_to_import): (Self, ServiceToImport<dyn crate::service::NullService>) =
156 Self::with_initial_service(config, transport_send, transport_recv, initial_service);
157 ctx
158 }
159
160 /// Creates a new context only importing a service, but exporting nothing.
161 ///
162 /// The other end's context must be initialized with `with_initial_service_export()`.
163 /// Please see [`with_initial_service()`] for a general explanation of creation of `Context`.
164 ///
165 /// [`with_initial_service()`]: ./struct.Context.html#method.with_initial_service
166 pub fn with_initial_service_import<
167 S: TransportSend + 'static,
168 R: TransportRecv + 'static,
169 B: ?Sized + Service,
170 >(
171 config: Config,
172 transport_send: S,
173 transport_recv: R,
174 ) -> (Self, ServiceToImport<B>) {
175 let null_to_export = crate::service::create_null_service();
176 let (ctx, import) = Self::with_initial_service(
177 config,
178 transport_send,
179 transport_recv,
180 ServiceToExport::new(null_to_export),
181 );
182 (ctx, import)
183 }
184
185 /// Creates a new context exchanging two services, one for export and one for import.
186 ///
187 /// It takes `initial_service` and registers in it, and passes a `HandleToExchange` internally. (_export_).
188 /// Also it receives a `HandleToExchange` from the other side, and makes it into a proxy object. (_import_)
189 ///
190 /// The other end's context must be initialized with `with_initial_service()` as well, and
191 /// such processes will be symmetric for both.
192 ///
193 /// [`HandleToExchange`]: ../raw_exchange/struct.HandleToExchange.html
194 pub fn with_initial_service<
195 S: TransportSend + 'static,
196 R: TransportRecv + 'static,
197 A: ?Sized + Service,
198 B: ?Sized + Service,
199 >(
200 config: Config,
201 transport_send: S,
202 transport_recv: R,
203 initial_service: ServiceToExport<A>,
204 ) -> (Self, ServiceToImport<B>) {
205 let MultiplexResult {
206 multiplexer,
207 request_recv,
208 response_recv,
209 } = Multiplexer::multiplex::<R, PacketForward>(config.clone(), transport_recv);
210 let transport_send = Arc::new(transport_send) as Arc<dyn TransportSend>;
211
212 let client = Client::new(
213 config.clone(),
214 Arc::clone(&transport_send),
215 Box::new(response_recv),
216 );
217 let port = BasicPort::new(
218 config.clone(),
219 client,
220 (Box::new(MetaServiceImpl::new()) as Box<dyn MetaService>).into_skeleton(),
221 initial_service.get_raw_export(),
222 );
223 let server = Server::new(
224 config.clone(),
225 port.get_registry(),
226 transport_send,
227 Box::new(request_recv),
228 );
229
230 let port_weak = Arc::downgrade(&port) as Weak<dyn Port>;
231 let meta_service = <Box<dyn MetaService> as ImportProxy<dyn MetaService>>::import_proxy(
232 Weak::clone(&port_weak),
233 HandleToExchange(crate::forwarder::META_SERVICE_OBJECT_ID),
234 );
235 let initial_handle = HandleToExchange(crate::forwarder::INITIAL_SERVICE_OBJECT_ID);
236
237 let ctx = Context {
238 config,
239 multiplexer: Some(multiplexer),
240 server: Some(server),
241 port: Some(port),
242 meta_service: Some(meta_service),
243 cleaned: false,
244 };
245 let initial_service = ServiceToImport::from_raw_import(initial_handle, port_weak);
246 (ctx, initial_service)
247 }
248
249 pub(crate) fn get_port(&self) -> Weak<dyn Port> {
250 Arc::downgrade(
251 &self
252 .port
253 .clone()
254 .expect("It becomes None only when the context is dropped."),
255 ) as Weak<dyn Port>
256 }
257
258 /// Clears all service objects in its registry.
259 ///
260 /// The most usual way of deleting a service object is dropping its proxy object on the client side, and letting it request a delete to the exporter side.
261 /// However, in some cases (especially while you're trying to shut down the connection) it is useful to clear all exported service objects
262 /// **by the exporter side itself**.
263 ///
264 /// Note that it will cause an error if the client side drops a proxy object of an already deleted (by this method) service object.
265 /// Consider calling [`disable_garbage_collection()`] on the other end if there's such an issue.
266 ///
267 /// Note also that this might trigger _delete request_ as a side effect since the service object might own a proxy object.
268 ///
269 /// [`disable_garbage_collection()`]: ./struct.Context.html#method.disable_garbage_collection
270 pub fn clear_service_registry(&mut self) {
271 self.port.as_mut().unwrap().clear_registry();
272 }
273
274 /// Disables all _delete request_ from this end to the other end.
275 ///
276 /// If you call this, all `drop()` of proxy objects imported from this context won't send a delete request anymore.
277 /// This is useful when you're not sure if the connection is still alive, but you have to close your side's context anyway.
278 pub fn disable_garbage_collection(&self) {
279 self.port
280 .as_ref()
281 .expect("It becomes None only when the context is dropped.")
282 .set_no_drop();
283 }
284
285 /// Waits until the transport is closed.
286 ///
287 /// Technically, this method will block until `TransportRecv` returns an error.
288 /// Use this if you have nothing to do while the connection is working well.
289 ///
290 /// TODO: We should actually consider `timeout`
291 pub fn wait(mut self, timeout: Option<std::time::Duration>) -> Result<(), Self> {
292 if let Err(multiplexer) = self
293 .multiplexer
294 .take()
295 .expect("It becomes None only when the context is dropped.")
296 .wait(timeout)
297 {
298 self.multiplexer.replace(multiplexer);
299 return Err(self);
300 }
301
302 self.port.as_ref().unwrap().set_no_drop();
303 self.port.as_ref().unwrap().clear_registry();
304 drop(self.meta_service.take().unwrap());
305
306 self.cleaned = true;
307 Ok(())
308 }
309}
310
311impl Drop for Context {
312 /// This will delete all service objects after calling `disable_garbage_collection()` internally.
313 fn drop(&mut self) {
314 if !self.cleaned {
315 self.multiplexer
316 .take()
317 .expect("It becomes None only when the context is dropped.")
318 .shutdown();
319 // We have to clean all registered service, as some might hold another proxy object inside, which refers this context's port.
320 // For such case, we have to make them be dropped first before we unwrap the Arc<BasicPort>
321 self.port.as_ref().unwrap().set_no_drop();
322 self.port.as_ref().unwrap().clear_registry();
323 drop(self.meta_service.take().unwrap());
324 }
325
326 // Shutdown server after multiplexer
327 self.server
328 .take()
329 .expect("It becomes None only when the context is dropped.")
330 .shutdown();
331 // Shutdown port after multiplexer
332 Arc::try_unwrap(
333 self.port
334 .take()
335 .expect("It becomes None only when the context is dropped."),
336 )
337 .unwrap()
338 .shutdown();
339 }
340}
341
342pub struct PacketForward;
343
344impl multiplex::Forward for PacketForward {
345 fn forward(packet: PacketView) -> ForwardResult {
346 match packet.slot().get_type() {
347 SlotType::Request => ForwardResult::Request,
348 SlotType::Response => ForwardResult::Response,
349 }
350 }
351}