1use super::handle_connection;
2use crate::error::Result;
3use crate::events::event_handler::EventHandler;
4use crate::ipc::context::{Context, ReplyListeners};
5use crate::ipc::stream_emitter::StreamEmitter;
6use crate::namespaces::namespace::Namespace;
7
8#[cfg(feature = "serialize")]
9use crate::payload::DynamicSerializer;
10
11use crate::protocol::{AsyncProtocolStreamSplit, AsyncStreamProtocolListener};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::RwLock;
16use trait_bound_typemap::SendSyncTypeMap;
17
18pub struct IPCServer {
22 pub(crate) handler: EventHandler,
23 pub(crate) namespaces: HashMap<String, Namespace>,
24 pub(crate) data: SendSyncTypeMap,
25 pub(crate) timeout: Duration,
26
27 #[cfg(feature = "serialize")]
28 pub(crate) default_serializer: DynamicSerializer,
29}
30
31impl IPCServer {
32 #[tracing::instrument(skip(self, options))]
35 pub async fn start<L: AsyncStreamProtocolListener>(
36 self,
37 address: L::AddressType,
38 options: L::ListenerOptions,
39 ) -> Result<()> {
40 let listener = L::protocol_bind(address.clone(), options).await?;
41 let handler = Arc::new(self.handler);
42 let namespaces = Arc::new(self.namespaces);
43 let data = Arc::new(RwLock::new(self.data));
44 tracing::info!("address = {:?}", address);
45
46 while let Ok((stream, remote_address)) = listener.protocol_accept().await {
47 tracing::debug!("remote_address = {:?}", remote_address);
48 let handler = Arc::clone(&handler);
49 let namespaces = Arc::clone(&namespaces);
50 let data = Arc::clone(&data);
51 let timeout = self.timeout;
52 #[cfg(feature = "serialize")]
53 let default_serializer = self.default_serializer.clone();
54
55 tokio::spawn(async move {
56 let (read_half, write_half) = stream.protocol_into_split();
57
58 let emitter = StreamEmitter::new::<L::Stream>(write_half);
59
60 let reply_listeners = ReplyListeners::default();
61
62 #[cfg(feature = "serialize")]
63 let ctx = Context::new(
64 emitter,
65 data,
66 None,
67 reply_listeners,
68 timeout.into(),
69 default_serializer.clone(),
70 );
71 #[cfg(not(feature = "serialize"))]
72 let ctx = Context::new(emitter, data, None, reply_listeners, timeout);
73
74 handle_connection::<L::Stream>(namespaces, handler, read_half, ctx).await;
75 });
76 }
77
78 Ok(())
79 }
80}