remote-trait-object 0.5.0

A remote method invocation library based on trait objects
Documentation
use crate::packet::{PacketView, SlotType};
use crate::port::{client::Client, server::Server, BasicPort, Port};
use crate::transport::multiplex::{self, ForwardResult, MultiplexResult, Multiplexer};
use crate::transport::{TransportRecv, TransportSend};
use crate::{raw_exchange::*, Service, ServiceToExport, ServiceToImport};
use parking_lot::Mutex;
use std::sync::{Arc, Weak};
use threadpool::ThreadPool;

mod meta_service {
    use super::*;
    /// This is required because of macro
    use crate as remote_trait_object;

    #[remote_trait_object_macro::service]
    pub trait MetaService: Service {}

    pub struct MetaServiceImpl {}

    impl MetaServiceImpl {
        pub fn new() -> Self {
            Self {}
        }
    }

    impl Service for MetaServiceImpl {}

    impl MetaService for MetaServiceImpl {}
}
use meta_service::{MetaService, MetaServiceImpl};

/// A configuration of a `remote-trait-object` context.
#[derive(Clone, Debug)]
pub struct Config {
    /// A name that will be appended to the names of various threads spawned by `remote-trait-object`, for an easy debug.
    ///
    /// This can be helpful if you handle multiple contexts of `remote-trait-object`.
    pub name: String,

    /// Number of the maximum of concurrent calls.
    ///
    /// Value of this doesn't have anything to do with the number of threads that would be spawned.
    /// Having a large number of this wouldn't charge any cost except really small additional memory allocation.
    ///
    /// [`server_threads`]: ./struct.Config.html#field.server_threads
    pub call_slots: usize,

    /// A timeout for a remote method call.
    ///
    /// All remote method invocations through your proxy object and delete requests (that happens when you drop a proxy object)
    /// will have this timeout. If it exceeds, it will cause an error.
    ///
    /// Use `None` for to wait indefinitely.
    pub call_timeout: Option<std::time::Duration>,

    /// A maximum number of services that this context can export.
    pub maximum_services_num: usize,

    /// A shared instance of a thread pool that will be used in call handling
    ///
    /// A `remote-trait-object` context will use this thread pool to handle an incoming method call.
    /// Size of this pool determines the maximum number of concurrent calls that the context can handle.
    /// Note that this pool is wrapped in `Arc`, which means that it can be possibly shared with other places.
    pub thread_pool: Arc<Mutex<threadpool::ThreadPool>>,
}

impl Config {
    pub fn default_setup() -> Self {
        Self {
            name: "my rto".to_owned(),
            call_slots: 512,
            maximum_services_num: 65536,
            call_timeout: Some(std::time::Duration::from_millis(1000)),

            thread_pool: Arc::new(Mutex::new(ThreadPool::new(8))),
        }
    }
}

/// One end of a `remote-trait-object` connection.
///
/// If you establish a remote-trait-object connection,
/// there must be two ends and each will be provided as a `Context` to each user on both sides.
///
/// A context holds multiple things to function as a `remote-trait-object` connection end.
/// Since the connection is symmetric, it manages both _server_ and _client_ toward the other end.
/// It also manages a _registry_ that contains all exported services.
/// The server will look up the registry to find a target object for handling an incoming method invocation.
///
/// Note that `remote-trait-object` is a point-to-point connection protocol.
/// Exporting & importing a service are always performed on a specific connection,
/// which is toward the other side, or another instance of `Context`.
///
/// If you created an instance of this, that means you have a connection that has been successfully established **once**,
/// but is not guaranteed to be alive.
/// If the other end (or the other `Context`) is closed, most operations performed on `Context` will just cause an error.
pub struct Context {
    config: Config,
    multiplexer: Option<Multiplexer>,
    server: Option<Server>,
    port: Option<Arc<BasicPort>>,
    meta_service: Option<Box<dyn MetaService>>,
    cleaned: bool,
}

impl std::fmt::Debug for Context {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Context")
            .field("config", &self.config)
            .finish()
    }
}

impl Context {
    /// Creates a new context without any initial services.
    ///
    /// If you decide to use this, you have to exchange raw [`HandleToExchange`] at least once using a secondary transportation means.
    /// It is really rarely needed, so please consider introducing an initializing service as an initial service, to avoid any raw exchange.
    ///
    /// Please see [`with_initial_service()`] for a general explanation of creation of `Context`.
    ///
    /// [`with_initial_service()`]: ./struct.Context.html#method.with_initial_service
    pub fn new<S: TransportSend + 'static, R: TransportRecv + 'static>(
        config: Config,
        transport_send: S,
        transport_recv: R,
    ) -> Self {
        let null_to_export = crate::service::create_null_service();
        let (ctx, _null_to_import): (Self, ServiceToImport<dyn crate::service::NullService>) =
            Self::with_initial_service(
                config,
                transport_send,
                transport_recv,
                ServiceToExport::new(null_to_export),
            );
        ctx
    }

    /// Creates a new context only exporting a service, but importing nothing.
    ///
    /// The other end's context must be initialized with `with_initial_service_import()`.
    /// Please see [`with_initial_service()`] for a general explanation of creation of `Context`.
    ///
    /// [`with_initial_service()`]: ./struct.Context.html#method.with_initial_service
    pub fn with_initial_service_export<
        S: TransportSend + 'static,
        R: TransportRecv + 'static,
        A: ?Sized + Service,
    >(
        config: Config,
        transport_send: S,
        transport_recv: R,
        initial_service: ServiceToExport<A>,
    ) -> Self {
        let (ctx, _null_to_import): (Self, ServiceToImport<dyn crate::service::NullService>) =
            Self::with_initial_service(config, transport_send, transport_recv, initial_service);
        ctx
    }

    /// Creates a new context only importing a service, but exporting nothing.
    ///
    /// The other end's context must be initialized with `with_initial_service_export()`.
    /// Please see [`with_initial_service()`] for a general explanation of creation of `Context`.
    ///
    /// [`with_initial_service()`]: ./struct.Context.html#method.with_initial_service
    pub fn with_initial_service_import<
        S: TransportSend + 'static,
        R: TransportRecv + 'static,
        B: ?Sized + Service,
    >(
        config: Config,
        transport_send: S,
        transport_recv: R,
    ) -> (Self, ServiceToImport<B>) {
        let null_to_export = crate::service::create_null_service();
        let (ctx, import) = Self::with_initial_service(
            config,
            transport_send,
            transport_recv,
            ServiceToExport::new(null_to_export),
        );
        (ctx, import)
    }

    /// Creates a new context exchanging two services, one for export and one for import.
    ///
    /// It takes `initial_service` and registers in it, and passes a `HandleToExchange` internally. (_export_).
    /// Also it receives a `HandleToExchange` from the other side, and makes it into a proxy object. (_import_)
    ///
    /// The other end's context must be initialized with `with_initial_service()` as well, and
    /// such processes will be symmetric for both.
    ///
    /// [`HandleToExchange`]: ../raw_exchange/struct.HandleToExchange.html
    pub fn with_initial_service<
        S: TransportSend + 'static,
        R: TransportRecv + 'static,
        A: ?Sized + Service,
        B: ?Sized + Service,
    >(
        config: Config,
        transport_send: S,
        transport_recv: R,
        initial_service: ServiceToExport<A>,
    ) -> (Self, ServiceToImport<B>) {
        let MultiplexResult {
            multiplexer,
            request_recv,
            response_recv,
        } = Multiplexer::multiplex::<R, PacketForward>(config.clone(), transport_recv);
        let transport_send = Arc::new(transport_send) as Arc<dyn TransportSend>;

        let client = Client::new(
            config.clone(),
            Arc::clone(&transport_send),
            Box::new(response_recv),
        );
        let port = BasicPort::new(
            config.clone(),
            client,
            (Box::new(MetaServiceImpl::new()) as Box<dyn MetaService>).into_skeleton(),
            initial_service.get_raw_export(),
        );
        let server = Server::new(
            config.clone(),
            port.get_registry(),
            transport_send,
            Box::new(request_recv),
        );

        let port_weak = Arc::downgrade(&port) as Weak<dyn Port>;
        let meta_service = <Box<dyn MetaService> as ImportProxy<dyn MetaService>>::import_proxy(
            Weak::clone(&port_weak),
            HandleToExchange(crate::forwarder::META_SERVICE_OBJECT_ID),
        );
        let initial_handle = HandleToExchange(crate::forwarder::INITIAL_SERVICE_OBJECT_ID);

        let ctx = Context {
            config,
            multiplexer: Some(multiplexer),
            server: Some(server),
            port: Some(port),
            meta_service: Some(meta_service),
            cleaned: false,
        };
        let initial_service = ServiceToImport::from_raw_import(initial_handle, port_weak);
        (ctx, initial_service)
    }

    pub(crate) fn get_port(&self) -> Weak<dyn Port> {
        Arc::downgrade(
            &self
                .port
                .clone()
                .expect("It becomes None only when the context is dropped."),
        ) as Weak<dyn Port>
    }

    /// Clears all service objects in its registry.
    ///
    /// The most usual way of deleting a service object is dropping its proxy object on the client side, and letting it request a delete to the exporter side.
    /// However, in some cases (especially while you're trying to shut down the connection) it is useful to clear all exported service objects
    /// **by the exporter side itself**.
    ///
    /// Note that it will cause an error if the client side drops a proxy object of an already deleted (by this method) service object.
    /// Consider calling [`disable_garbage_collection()`] on the other end if there's such an issue.
    ///
    /// Note also that this might trigger _delete request_ as a side effect since the service object might own a proxy object.
    ///
    /// [`disable_garbage_collection()`]: ./struct.Context.html#method.disable_garbage_collection
    pub fn clear_service_registry(&mut self) {
        self.port.as_mut().unwrap().clear_registry();
    }

    /// Disables all _delete request_ from this end to the other end.
    ///
    /// If you call this, all `drop()` of proxy objects imported from this context won't send a delete request anymore.
    /// This is useful when you're not sure if the connection is still alive, but you have to close your side's context anyway.
    pub fn disable_garbage_collection(&self) {
        self.port
            .as_ref()
            .expect("It becomes None only when the context is dropped.")
            .set_no_drop();
    }

    /// Waits until the transport is closed.
    ///
    /// Technically, this method will block until `TransportRecv` returns an error.
    /// Use this if you have nothing to do while the connection is working well.
    ///
    /// TODO: We should actually consider `timeout`
    pub fn wait(mut self, timeout: Option<std::time::Duration>) -> Result<(), Self> {
        if let Err(multiplexer) = self
            .multiplexer
            .take()
            .expect("It becomes None only when the context is dropped.")
            .wait(timeout)
        {
            self.multiplexer.replace(multiplexer);
            return Err(self);
        }

        self.port.as_ref().unwrap().set_no_drop();
        self.port.as_ref().unwrap().clear_registry();
        drop(self.meta_service.take().unwrap());

        self.cleaned = true;
        Ok(())
    }
}

impl Drop for Context {
    /// This will delete all service objects after calling `disable_garbage_collection()` internally.
    fn drop(&mut self) {
        if !self.cleaned {
            self.multiplexer
                .take()
                .expect("It becomes None only when the context is dropped.")
                .shutdown();
            // We have to clean all registered service, as some might hold another proxy object inside, which refers this context's port.
            // For such case, we have to make them be dropped first before we unwrap the Arc<BasicPort>
            self.port.as_ref().unwrap().set_no_drop();
            self.port.as_ref().unwrap().clear_registry();
            drop(self.meta_service.take().unwrap());
        }

        // Shutdown server after multiplexer
        self.server
            .take()
            .expect("It becomes None only when the context is dropped.")
            .shutdown();
        // Shutdown port after multiplexer
        Arc::try_unwrap(
            self.port
                .take()
                .expect("It becomes None only when the context is dropped."),
        )
        .unwrap()
        .shutdown();
    }
}

pub struct PacketForward;

impl multiplex::Forward for PacketForward {
    fn forward(packet: PacketView) -> ForwardResult {
        match packet.slot().get_type() {
            SlotType::Request => ForwardResult::Request,
            SlotType::Response => ForwardResult::Response,
        }
    }
}