use crate::packet::{PacketView, SlotType};
use crate::port::{client::Client, server::Server, BasicPort, Port};
use crate::transport::multiplex::{self, ForwardResult, MultiplexResult, Multiplexer};
use crate::transport::{TransportRecv, TransportSend};
use crate::{raw_exchange::*, Service, ServiceToExport, ServiceToImport};
use parking_lot::Mutex;
use std::sync::{Arc, Weak};
use threadpool::ThreadPool;
mod meta_service {
use super::*;
use crate as remote_trait_object;
#[remote_trait_object_macro::service]
pub trait MetaService: Service {}
pub struct MetaServiceImpl {}
impl MetaServiceImpl {
pub fn new() -> Self {
Self {}
}
}
impl Service for MetaServiceImpl {}
impl MetaService for MetaServiceImpl {}
}
use meta_service::{MetaService, MetaServiceImpl};
#[derive(Clone, Debug)]
pub struct Config {
pub name: String,
pub call_slots: usize,
pub call_timeout: Option<std::time::Duration>,
pub maximum_services_num: usize,
pub thread_pool: Arc<Mutex<threadpool::ThreadPool>>,
}
impl Config {
pub fn default_setup() -> Self {
Self {
name: "my rto".to_owned(),
call_slots: 512,
maximum_services_num: 65536,
call_timeout: Some(std::time::Duration::from_millis(1000)),
thread_pool: Arc::new(Mutex::new(ThreadPool::new(8))),
}
}
}
pub struct Context {
config: Config,
multiplexer: Option<Multiplexer>,
server: Option<Server>,
port: Option<Arc<BasicPort>>,
meta_service: Option<Box<dyn MetaService>>,
cleaned: bool,
}
impl std::fmt::Debug for Context {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Context")
.field("config", &self.config)
.finish()
}
}
impl Context {
pub fn new<S: TransportSend + 'static, R: TransportRecv + 'static>(
config: Config,
transport_send: S,
transport_recv: R,
) -> Self {
let null_to_export = crate::service::create_null_service();
let (ctx, _null_to_import): (Self, ServiceToImport<dyn crate::service::NullService>) =
Self::with_initial_service(
config,
transport_send,
transport_recv,
ServiceToExport::new(null_to_export),
);
ctx
}
pub fn with_initial_service_export<
S: TransportSend + 'static,
R: TransportRecv + 'static,
A: ?Sized + Service,
>(
config: Config,
transport_send: S,
transport_recv: R,
initial_service: ServiceToExport<A>,
) -> Self {
let (ctx, _null_to_import): (Self, ServiceToImport<dyn crate::service::NullService>) =
Self::with_initial_service(config, transport_send, transport_recv, initial_service);
ctx
}
pub fn with_initial_service_import<
S: TransportSend + 'static,
R: TransportRecv + 'static,
B: ?Sized + Service,
>(
config: Config,
transport_send: S,
transport_recv: R,
) -> (Self, ServiceToImport<B>) {
let null_to_export = crate::service::create_null_service();
let (ctx, import) = Self::with_initial_service(
config,
transport_send,
transport_recv,
ServiceToExport::new(null_to_export),
);
(ctx, import)
}
pub fn with_initial_service<
S: TransportSend + 'static,
R: TransportRecv + 'static,
A: ?Sized + Service,
B: ?Sized + Service,
>(
config: Config,
transport_send: S,
transport_recv: R,
initial_service: ServiceToExport<A>,
) -> (Self, ServiceToImport<B>) {
let MultiplexResult {
multiplexer,
request_recv,
response_recv,
} = Multiplexer::multiplex::<R, PacketForward>(config.clone(), transport_recv);
let transport_send = Arc::new(transport_send) as Arc<dyn TransportSend>;
let client = Client::new(
config.clone(),
Arc::clone(&transport_send),
Box::new(response_recv),
);
let port = BasicPort::new(
config.clone(),
client,
(Box::new(MetaServiceImpl::new()) as Box<dyn MetaService>).into_skeleton(),
initial_service.get_raw_export(),
);
let server = Server::new(
config.clone(),
port.get_registry(),
transport_send,
Box::new(request_recv),
);
let port_weak = Arc::downgrade(&port) as Weak<dyn Port>;
let meta_service = <Box<dyn MetaService> as ImportProxy<dyn MetaService>>::import_proxy(
Weak::clone(&port_weak),
HandleToExchange(crate::forwarder::META_SERVICE_OBJECT_ID),
);
let initial_handle = HandleToExchange(crate::forwarder::INITIAL_SERVICE_OBJECT_ID);
let ctx = Context {
config,
multiplexer: Some(multiplexer),
server: Some(server),
port: Some(port),
meta_service: Some(meta_service),
cleaned: false,
};
let initial_service = ServiceToImport::from_raw_import(initial_handle, port_weak);
(ctx, initial_service)
}
pub(crate) fn get_port(&self) -> Weak<dyn Port> {
Arc::downgrade(
&self
.port
.clone()
.expect("It becomes None only when the context is dropped."),
) as Weak<dyn Port>
}
pub fn clear_service_registry(&mut self) {
self.port.as_mut().unwrap().clear_registry();
}
pub fn disable_garbage_collection(&self) {
self.port
.as_ref()
.expect("It becomes None only when the context is dropped.")
.set_no_drop();
}
pub fn wait(mut self, timeout: Option<std::time::Duration>) -> Result<(), Self> {
if let Err(multiplexer) = self
.multiplexer
.take()
.expect("It becomes None only when the context is dropped.")
.wait(timeout)
{
self.multiplexer.replace(multiplexer);
return Err(self);
}
self.port.as_ref().unwrap().set_no_drop();
self.port.as_ref().unwrap().clear_registry();
drop(self.meta_service.take().unwrap());
self.cleaned = true;
Ok(())
}
}
impl Drop for Context {
fn drop(&mut self) {
if !self.cleaned {
self.multiplexer
.take()
.expect("It becomes None only when the context is dropped.")
.shutdown();
self.port.as_ref().unwrap().set_no_drop();
self.port.as_ref().unwrap().clear_registry();
drop(self.meta_service.take().unwrap());
}
self.server
.take()
.expect("It becomes None only when the context is dropped.")
.shutdown();
Arc::try_unwrap(
self.port
.take()
.expect("It becomes None only when the context is dropped."),
)
.unwrap()
.shutdown();
}
}
pub struct PacketForward;
impl multiplex::Forward for PacketForward {
fn forward(packet: PacketView) -> ForwardResult {
match packet.slot().get_type() {
SlotType::Request => ForwardResult::Request,
SlotType::Response => ForwardResult::Response,
}
}
}