remote_trait_object/
context.rs

1use crate::packet::{PacketView, SlotType};
2use crate::port::{client::Client, server::Server, BasicPort, Port};
3use crate::transport::multiplex::{self, ForwardResult, MultiplexResult, Multiplexer};
4use crate::transport::{TransportRecv, TransportSend};
5use crate::{raw_exchange::*, Service, ServiceToExport, ServiceToImport};
6use parking_lot::Mutex;
7use std::sync::{Arc, Weak};
8use threadpool::ThreadPool;
9
10mod meta_service {
11    use super::*;
12    /// This is required because of macro
13    use crate as remote_trait_object;
14
15    #[remote_trait_object_macro::service]
16    pub trait MetaService: Service {}
17
18    pub struct MetaServiceImpl {}
19
20    impl MetaServiceImpl {
21        pub fn new() -> Self {
22            Self {}
23        }
24    }
25
26    impl Service for MetaServiceImpl {}
27
28    impl MetaService for MetaServiceImpl {}
29}
30use meta_service::{MetaService, MetaServiceImpl};
31
32/// A configuration of a `remote-trait-object` context.
33#[derive(Clone, Debug)]
34pub struct Config {
35    /// A name that will be appended to the names of various threads spawned by `remote-trait-object`, for an easy debug.
36    ///
37    /// This can be helpful if you handle multiple contexts of `remote-trait-object`.
38    pub name: String,
39
40    /// Number of the maximum of concurrent calls.
41    ///
42    /// Value of this doesn't have anything to do with the number of threads that would be spawned.
43    /// Having a large number of this wouldn't charge any cost except really small additional memory allocation.
44    ///
45    /// [`server_threads`]: ./struct.Config.html#field.server_threads
46    pub call_slots: usize,
47
48    /// A timeout for a remote method call.
49    ///
50    /// All remote method invocations through your proxy object and delete requests (that happens when you drop a proxy object)
51    /// will have this timeout. If it exceeds, it will cause an error.
52    ///
53    /// Use `None` for to wait indefinitely.
54    pub call_timeout: Option<std::time::Duration>,
55
56    /// A maximum number of services that this context can export.
57    pub maximum_services_num: usize,
58
59    /// A shared instance of a thread pool that will be used in call handling
60    ///
61    /// A `remote-trait-object` context will use this thread pool to handle an incoming method call.
62    /// Size of this pool determines the maximum number of concurrent calls that the context can handle.
63    /// Note that this pool is wrapped in `Arc`, which means that it can be possibly shared with other places.
64    pub thread_pool: Arc<Mutex<threadpool::ThreadPool>>,
65}
66
67impl Config {
68    pub fn default_setup() -> Self {
69        Self {
70            name: "my rto".to_owned(),
71            call_slots: 512,
72            maximum_services_num: 65536,
73            call_timeout: Some(std::time::Duration::from_millis(1000)),
74
75            thread_pool: Arc::new(Mutex::new(ThreadPool::new(8))),
76        }
77    }
78}
79
80/// One end of a `remote-trait-object` connection.
81///
82/// If you establish a remote-trait-object connection,
83/// there must be two ends and each will be provided as a `Context` to each user on both sides.
84///
85/// A context holds multiple things to function as a `remote-trait-object` connection end.
86/// Since the connection is symmetric, it manages both _server_ and _client_ toward the other end.
87/// It also manages a _registry_ that contains all exported services.
88/// The server will look up the registry to find a target object for handling an incoming method invocation.
89///
90/// Note that `remote-trait-object` is a point-to-point connection protocol.
91/// Exporting & importing a service are always performed on a specific connection,
92/// which is toward the other side, or another instance of `Context`.
93///
94/// If you created an instance of this, that means you have a connection that has been successfully established **once**,
95/// but is not guaranteed to be alive.
96/// If the other end (or the other `Context`) is closed, most operations performed on `Context` will just cause an error.
97pub struct Context {
98    config: Config,
99    multiplexer: Option<Multiplexer>,
100    server: Option<Server>,
101    port: Option<Arc<BasicPort>>,
102    meta_service: Option<Box<dyn MetaService>>,
103    cleaned: bool,
104}
105
106impl std::fmt::Debug for Context {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        f.debug_struct("Context")
109            .field("config", &self.config)
110            .finish()
111    }
112}
113
114impl Context {
115    /// Creates a new context without any initial services.
116    ///
117    /// If you decide to use this, you have to exchange raw [`HandleToExchange`] at least once using a secondary transportation means.
118    /// It is really rarely needed, so please consider introducing an initializing service as an initial service, to avoid any raw exchange.
119    ///
120    /// Please see [`with_initial_service()`] for a general explanation of creation of `Context`.
121    ///
122    /// [`with_initial_service()`]: ./struct.Context.html#method.with_initial_service
123    pub fn new<S: TransportSend + 'static, R: TransportRecv + 'static>(
124        config: Config,
125        transport_send: S,
126        transport_recv: R,
127    ) -> Self {
128        let null_to_export = crate::service::create_null_service();
129        let (ctx, _null_to_import): (Self, ServiceToImport<dyn crate::service::NullService>) =
130            Self::with_initial_service(
131                config,
132                transport_send,
133                transport_recv,
134                ServiceToExport::new(null_to_export),
135            );
136        ctx
137    }
138
139    /// Creates a new context only exporting a service, but importing nothing.
140    ///
141    /// The other end's context must be initialized with `with_initial_service_import()`.
142    /// Please see [`with_initial_service()`] for a general explanation of creation of `Context`.
143    ///
144    /// [`with_initial_service()`]: ./struct.Context.html#method.with_initial_service
145    pub fn with_initial_service_export<
146        S: TransportSend + 'static,
147        R: TransportRecv + 'static,
148        A: ?Sized + Service,
149    >(
150        config: Config,
151        transport_send: S,
152        transport_recv: R,
153        initial_service: ServiceToExport<A>,
154    ) -> Self {
155        let (ctx, _null_to_import): (Self, ServiceToImport<dyn crate::service::NullService>) =
156            Self::with_initial_service(config, transport_send, transport_recv, initial_service);
157        ctx
158    }
159
160    /// Creates a new context only importing a service, but exporting nothing.
161    ///
162    /// The other end's context must be initialized with `with_initial_service_export()`.
163    /// Please see [`with_initial_service()`] for a general explanation of creation of `Context`.
164    ///
165    /// [`with_initial_service()`]: ./struct.Context.html#method.with_initial_service
166    pub fn with_initial_service_import<
167        S: TransportSend + 'static,
168        R: TransportRecv + 'static,
169        B: ?Sized + Service,
170    >(
171        config: Config,
172        transport_send: S,
173        transport_recv: R,
174    ) -> (Self, ServiceToImport<B>) {
175        let null_to_export = crate::service::create_null_service();
176        let (ctx, import) = Self::with_initial_service(
177            config,
178            transport_send,
179            transport_recv,
180            ServiceToExport::new(null_to_export),
181        );
182        (ctx, import)
183    }
184
185    /// Creates a new context exchanging two services, one for export and one for import.
186    ///
187    /// It takes `initial_service` and registers in it, and passes a `HandleToExchange` internally. (_export_).
188    /// Also it receives a `HandleToExchange` from the other side, and makes it into a proxy object. (_import_)
189    ///
190    /// The other end's context must be initialized with `with_initial_service()` as well, and
191    /// such processes will be symmetric for both.
192    ///
193    /// [`HandleToExchange`]: ../raw_exchange/struct.HandleToExchange.html
194    pub fn with_initial_service<
195        S: TransportSend + 'static,
196        R: TransportRecv + 'static,
197        A: ?Sized + Service,
198        B: ?Sized + Service,
199    >(
200        config: Config,
201        transport_send: S,
202        transport_recv: R,
203        initial_service: ServiceToExport<A>,
204    ) -> (Self, ServiceToImport<B>) {
205        let MultiplexResult {
206            multiplexer,
207            request_recv,
208            response_recv,
209        } = Multiplexer::multiplex::<R, PacketForward>(config.clone(), transport_recv);
210        let transport_send = Arc::new(transport_send) as Arc<dyn TransportSend>;
211
212        let client = Client::new(
213            config.clone(),
214            Arc::clone(&transport_send),
215            Box::new(response_recv),
216        );
217        let port = BasicPort::new(
218            config.clone(),
219            client,
220            (Box::new(MetaServiceImpl::new()) as Box<dyn MetaService>).into_skeleton(),
221            initial_service.get_raw_export(),
222        );
223        let server = Server::new(
224            config.clone(),
225            port.get_registry(),
226            transport_send,
227            Box::new(request_recv),
228        );
229
230        let port_weak = Arc::downgrade(&port) as Weak<dyn Port>;
231        let meta_service = <Box<dyn MetaService> as ImportProxy<dyn MetaService>>::import_proxy(
232            Weak::clone(&port_weak),
233            HandleToExchange(crate::forwarder::META_SERVICE_OBJECT_ID),
234        );
235        let initial_handle = HandleToExchange(crate::forwarder::INITIAL_SERVICE_OBJECT_ID);
236
237        let ctx = Context {
238            config,
239            multiplexer: Some(multiplexer),
240            server: Some(server),
241            port: Some(port),
242            meta_service: Some(meta_service),
243            cleaned: false,
244        };
245        let initial_service = ServiceToImport::from_raw_import(initial_handle, port_weak);
246        (ctx, initial_service)
247    }
248
249    pub(crate) fn get_port(&self) -> Weak<dyn Port> {
250        Arc::downgrade(
251            &self
252                .port
253                .clone()
254                .expect("It becomes None only when the context is dropped."),
255        ) as Weak<dyn Port>
256    }
257
258    /// Clears all service objects in its registry.
259    ///
260    /// The most usual way of deleting a service object is dropping its proxy object on the client side, and letting it request a delete to the exporter side.
261    /// However, in some cases (especially while you're trying to shut down the connection) it is useful to clear all exported service objects
262    /// **by the exporter side itself**.
263    ///
264    /// Note that it will cause an error if the client side drops a proxy object of an already deleted (by this method) service object.
265    /// Consider calling [`disable_garbage_collection()`] on the other end if there's such an issue.
266    ///
267    /// Note also that this might trigger _delete request_ as a side effect since the service object might own a proxy object.
268    ///
269    /// [`disable_garbage_collection()`]: ./struct.Context.html#method.disable_garbage_collection
270    pub fn clear_service_registry(&mut self) {
271        self.port.as_mut().unwrap().clear_registry();
272    }
273
274    /// Disables all _delete request_ from this end to the other end.
275    ///
276    /// If you call this, all `drop()` of proxy objects imported from this context won't send a delete request anymore.
277    /// This is useful when you're not sure if the connection is still alive, but you have to close your side's context anyway.
278    pub fn disable_garbage_collection(&self) {
279        self.port
280            .as_ref()
281            .expect("It becomes None only when the context is dropped.")
282            .set_no_drop();
283    }
284
285    /// Waits until the transport is closed.
286    ///
287    /// Technically, this method will block until `TransportRecv` returns an error.
288    /// Use this if you have nothing to do while the connection is working well.
289    ///
290    /// TODO: We should actually consider `timeout`
291    pub fn wait(mut self, timeout: Option<std::time::Duration>) -> Result<(), Self> {
292        if let Err(multiplexer) = self
293            .multiplexer
294            .take()
295            .expect("It becomes None only when the context is dropped.")
296            .wait(timeout)
297        {
298            self.multiplexer.replace(multiplexer);
299            return Err(self);
300        }
301
302        self.port.as_ref().unwrap().set_no_drop();
303        self.port.as_ref().unwrap().clear_registry();
304        drop(self.meta_service.take().unwrap());
305
306        self.cleaned = true;
307        Ok(())
308    }
309}
310
311impl Drop for Context {
312    /// This will delete all service objects after calling `disable_garbage_collection()` internally.
313    fn drop(&mut self) {
314        if !self.cleaned {
315            self.multiplexer
316                .take()
317                .expect("It becomes None only when the context is dropped.")
318                .shutdown();
319            // We have to clean all registered service, as some might hold another proxy object inside, which refers this context's port.
320            // For such case, we have to make them be dropped first before we unwrap the Arc<BasicPort>
321            self.port.as_ref().unwrap().set_no_drop();
322            self.port.as_ref().unwrap().clear_registry();
323            drop(self.meta_service.take().unwrap());
324        }
325
326        // Shutdown server after multiplexer
327        self.server
328            .take()
329            .expect("It becomes None only when the context is dropped.")
330            .shutdown();
331        // Shutdown port after multiplexer
332        Arc::try_unwrap(
333            self.port
334                .take()
335                .expect("It becomes None only when the context is dropped."),
336        )
337        .unwrap()
338        .shutdown();
339    }
340}
341
342pub struct PacketForward;
343
344impl multiplex::Forward for PacketForward {
345    fn forward(packet: PacketView) -> ForwardResult {
346        match packet.slot().get_type() {
347            SlotType::Request => ForwardResult::Request,
348            SlotType::Response => ForwardResult::Response,
349        }
350    }
351}