use crate::packet::{PacketView, SlotType};
use crate::port::{client::Client, server::Server, BasicPort, Port};
use crate::transport::multiplex::{self, ForwardResult, MultiplexResult, Multiplexer};
use crate::transport::{TransportRecv, TransportSend};
use crate::{raw_exchange::*, Service, ServiceToExport, ServiceToImport};
use parking_lot::Mutex;
use std::sync::{Arc, Barrier, Weak};
use threadpool::ThreadPool;
/// Internal service that each side of a connection exports so the two ends can
/// rendezvous before closing.
mod meta_service {
    use super::*;
    // NOTE(review): presumably this alias lets the code generated by the `service`
    // macro refer to this crate by its external name — confirm against the macro.
    use crate as remote_trait_object;

    // Remote-callable interface of the meta service. Its only operation,
    // `firm_close`, is the entry point the other side calls during a firm close.
    #[remote_trait_object_macro::service]
    pub trait MetaService: Service {
        fn firm_close(&self);
    }

    /// Local implementation of [`MetaService`], backed by a shared [`Barrier`].
    pub struct MetaServiceImpl {
        /// Barrier that `firm_close` waits on.
        barrier: Arc<Barrier>,
    }

    impl MetaServiceImpl {
        /// Wraps `barrier` in a new meta service implementation.
        pub fn new(barrier: Arc<Barrier>) -> Self {
            MetaServiceImpl { barrier }
        }
    }

    impl Service for MetaServiceImpl {}

    impl MetaService for MetaServiceImpl {
        /// Blocks the calling thread on the barrier until the other participant
        /// arrives. The wait result (leader or not) is irrelevant and discarded.
        fn firm_close(&self) {
            let _ = self.barrier.wait();
        }
    }
}
use meta_service::{MetaService, MetaServiceImpl};
/// Settings handed to a `Context` and cloned into the multiplexer, client, port
/// and server it creates.
///
/// Cloning a `Config` clones the `Arc`, so every clone shares the same thread pool.
#[derive(Clone, Debug)]
pub struct Config {
/// Name of this configuration; `default_setup` uses `"my rto"`.
/// NOTE(review): presumably used for diagnostics/thread naming — confirm at the use sites.
pub name: String,
/// NOTE(review): presumably the number of slots available for in-flight outgoing
/// calls — confirm against the client implementation.
pub call_slots: usize,
/// Optional duration; `default_setup` uses one second.
/// NOTE(review): presumably the per-call timeout, with `None` meaning "wait
/// indefinitely" — confirm against the client implementation.
pub call_timeout: Option<std::time::Duration>,
/// NOTE(review): presumably an upper bound on the number of services that can be
/// registered at once — confirm against the port/registry implementation.
pub maximum_services_num: usize,
/// Thread pool shared (via `Arc`) between every component holding a clone of this
/// configuration; access is serialized through the mutex.
pub thread_pool: Arc<Mutex<threadpool::ThreadPool>>,
}
impl Config {
pub fn default_setup() -> Self {
Self {
name: "my rto".to_owned(),
call_slots: 512,
maximum_services_num: 65536,
call_timeout: Some(std::time::Duration::from_millis(1000)),
thread_pool: Arc::new(Mutex::new(ThreadPool::new(8))),
}
}
}
/// One end of a connection: owns the multiplexer, server and port built on top of
/// a transport pair, plus the machinery used for a firm close.
///
/// The `Option` fields are `Some` for the whole life of the context; they are
/// `Option`s so that `Drop` can `take()` each component and shut it down by value.
pub struct Context {
/// Configuration this context was created with.
config: Config,
/// Splits incoming packets into request and response streams; shut down on drop.
multiplexer: Option<Multiplexer>,
/// Serves incoming requests using the port's registry; shut down on drop.
server: Option<Server>,
/// Port shared with imported services through `Weak` handles; must be uniquely
/// owned again by the time the context is dropped.
port: Option<Arc<BasicPort>>,
/// Proxy to the meta service exported by the other end of the connection.
meta_service: Option<Box<dyn MetaService>>,
/// Two-party barrier shared with the locally exported meta service; used by
/// `firm_close` to wait for the other end.
firm_close_barrier: Arc<Barrier>,
}
impl std::fmt::Debug for Context {
    /// Formats the context as `Context { config: .. }`; only the configuration is
    /// shown, the remaining fields are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("Context");
        repr.field("config", &self.config);
        repr.finish()
    }
}
impl Context {
pub fn new<S: TransportSend + 'static, R: TransportRecv + 'static>(
config: Config,
transport_send: S,
transport_recv: R,
) -> Self {
let null_to_export = crate::service::create_null_service();
let (ctx, _null_to_import): (Self, ServiceToImport<dyn crate::service::NullService>) =
Self::with_initial_service(config, transport_send, transport_recv, ServiceToExport::new(null_to_export));
ctx
}
pub fn with_initial_service_export<S: TransportSend + 'static, R: TransportRecv + 'static, A: ?Sized + Service>(
config: Config,
transport_send: S,
transport_recv: R,
initial_service: ServiceToExport<A>,
) -> Self {
let (ctx, _null_to_import): (Self, ServiceToImport<dyn crate::service::NullService>) =
Self::with_initial_service(config, transport_send, transport_recv, initial_service);
ctx
}
pub fn with_initial_service_import<S: TransportSend + 'static, R: TransportRecv + 'static, B: ?Sized + Service>(
config: Config,
transport_send: S,
transport_recv: R,
) -> (Self, ServiceToImport<B>) {
let null_to_export = crate::service::create_null_service();
let (ctx, import) =
Self::with_initial_service(config, transport_send, transport_recv, ServiceToExport::new(null_to_export));
(ctx, import)
}
pub fn with_initial_service<
S: TransportSend + 'static,
R: TransportRecv + 'static,
A: ?Sized + Service,
B: ?Sized + Service,
>(
config: Config,
transport_send: S,
transport_recv: R,
initial_service: ServiceToExport<A>,
) -> (Self, ServiceToImport<B>) {
let firm_close_barrier = Arc::new(Barrier::new(2));
let MultiplexResult {
multiplexer,
request_recv,
response_recv,
} = Multiplexer::multiplex::<R, PacketForward>(config.clone(), transport_recv);
let transport_send = Arc::new(transport_send) as Arc<dyn TransportSend>;
let client = Client::new(config.clone(), Arc::clone(&transport_send), Box::new(response_recv));
let port = BasicPort::new(
config.clone(),
client,
(Box::new(MetaServiceImpl::new(Arc::clone(&firm_close_barrier))) as Box<dyn MetaService>).into_skeleton(),
initial_service.get_raw_export(),
);
let server = Server::new(config.clone(), port.get_registry(), transport_send, Box::new(request_recv));
let port_weak = Arc::downgrade(&port) as Weak<dyn Port>;
let meta_service = <Box<dyn MetaService> as ImportProxy<dyn MetaService>>::import_proxy(
Weak::clone(&port_weak),
HandleToExchange(crate::forwarder::META_SERVICE_OBJECT_ID),
);
let initial_handle = HandleToExchange(crate::forwarder::INITIAL_SERVICE_OBJECT_ID);
let ctx = Context {
config,
multiplexer: Some(multiplexer),
server: Some(server),
port: Some(port),
meta_service: Some(meta_service),
firm_close_barrier,
};
let initial_service = ServiceToImport::from_raw_import(initial_handle, port_weak);
(ctx, initial_service)
}
pub(crate) fn get_port(&self) -> Weak<dyn Port> {
Arc::downgrade(&self.port.clone().expect("It becomes None only when the context is dropped.")) as Weak<dyn Port>
}
pub fn clear_service_registry(&mut self) {
self.port.as_mut().unwrap().clear_registry();
}
pub fn disable_garbage_collection(&self) {
self.port.as_ref().expect("It becomes None only when the context is dropped.").set_no_drop();
}
pub fn firm_close(self, _timeout: Option<std::time::Duration>) -> Result<(), Self> {
let barrier = Arc::clone(&self.firm_close_barrier);
let t = std::thread::spawn(move || {
barrier.wait();
});
self.meta_service.as_ref().unwrap().firm_close();
t.join().unwrap();
Ok(())
}
}
impl Drop for Context {
    /// Tears the context down in a fixed order: quiesce the port, release the
    /// meta service proxy, shut down the multiplexer and the server, and finally
    /// shut down the port itself.
    ///
    /// Panics if any component has already been taken, or if another strong
    /// reference to the port is still alive when the port is reclaimed.
    fn drop(&mut self) {
        {
            let port = self.port.as_ref().unwrap();
            // Same call that `disable_garbage_collection` makes, followed by
            // emptying the registry of exported services.
            port.set_no_drop();
            port.clear_registry();
        }

        // Release the proxy while the rest of the stack is still alive.
        let meta_service = self.meta_service.take().unwrap();
        drop(meta_service);

        self.multiplexer.take().expect("It becomes None only when the context is dropped.").shutdown();
        self.server.take().expect("It becomes None only when the context is dropped.").shutdown();

        // The context must be the sole strong owner of the port at this point.
        let port = self.port.take().expect("It becomes None only when the context is dropped.");
        Arc::try_unwrap(port).unwrap().shutdown();
    }
}
/// Routing policy handed to the multiplexer: classifies each incoming packet by
/// the type of its slot.
pub struct PacketForward;

impl multiplex::Forward for PacketForward {
    /// Maps a request slot to `ForwardResult::Request` and a response slot to
    /// `ForwardResult::Response`.
    fn forward(packet: PacketView) -> ForwardResult {
        let slot = packet.slot();
        match slot.get_type() {
            SlotType::Request => ForwardResult::Request,
            SlotType::Response => ForwardResult::Response,
        }
    }
}