bromine/ipc/
server.rs

1use super::handle_connection;
2use crate::error::Result;
3use crate::events::event_handler::EventHandler;
4use crate::ipc::context::{Context, ReplyListeners};
5use crate::ipc::stream_emitter::StreamEmitter;
6use crate::namespaces::namespace::Namespace;
7
8#[cfg(feature = "serialize")]
9use crate::payload::DynamicSerializer;
10
11use crate::protocol::{AsyncProtocolStreamSplit, AsyncStreamProtocolListener};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::RwLock;
16use trait_bound_typemap::SendSyncTypeMap;
17
/// The IPC Server listening for connections.
/// Use the [IPCBuilder](crate::builder::IPCBuilder) to create a server.
/// Usually one does not need to use the IPCServer object directly.
pub struct IPCServer {
    /// Event handler; `start` wraps it in an `Arc` and passes it to the
    /// handling routine of every accepted connection.
    pub(crate) handler: EventHandler,
    /// Namespaces keyed by a `String` (presumably the namespace name —
    /// confirm against the builder); shared with every connection via `Arc`.
    pub(crate) namespaces: HashMap<String, Namespace>,
    /// Type map that `start` wraps in `Arc<RwLock<..>>` and hands to the
    /// `Context` of every connection.
    pub(crate) data: SendSyncTypeMap,
    /// Duration forwarded to every connection's `Context`.
    /// NOTE(review): presumably the reply timeout — confirm in `Context`.
    pub(crate) timeout: Duration,

    /// Serializer cloned into every connection's `Context`.
    /// Only present when the `serialize` feature is enabled.
    #[cfg(feature = "serialize")]
    pub(crate) default_serializer: DynamicSerializer,
}
30
31impl IPCServer {
32    /// Starts the IPC Server.
33    /// Invoked by [IPCBuilder::build_server](crate::builder::IPCBuilder::build_server)
34    #[tracing::instrument(skip(self, options))]
35    pub async fn start<L: AsyncStreamProtocolListener>(
36        self,
37        address: L::AddressType,
38        options: L::ListenerOptions,
39    ) -> Result<()> {
40        let listener = L::protocol_bind(address.clone(), options).await?;
41        let handler = Arc::new(self.handler);
42        let namespaces = Arc::new(self.namespaces);
43        let data = Arc::new(RwLock::new(self.data));
44        tracing::info!("address = {:?}", address);
45
46        while let Ok((stream, remote_address)) = listener.protocol_accept().await {
47            tracing::debug!("remote_address = {:?}", remote_address);
48            let handler = Arc::clone(&handler);
49            let namespaces = Arc::clone(&namespaces);
50            let data = Arc::clone(&data);
51            let timeout = self.timeout;
52            #[cfg(feature = "serialize")]
53            let default_serializer = self.default_serializer.clone();
54
55            tokio::spawn(async move {
56                let (read_half, write_half) = stream.protocol_into_split();
57
58                let emitter = StreamEmitter::new::<L::Stream>(write_half);
59
60                let reply_listeners = ReplyListeners::default();
61
62                #[cfg(feature = "serialize")]
63                let ctx = Context::new(
64                    emitter,
65                    data,
66                    None,
67                    reply_listeners,
68                    timeout.into(),
69                    default_serializer.clone(),
70                );
71                #[cfg(not(feature = "serialize"))]
72                let ctx = Context::new(emitter, data, None, reply_listeners, timeout);
73
74                handle_connection::<L::Stream>(namespaces, handler, read_half, ctx).await;
75            });
76        }
77
78        Ok(())
79    }
80}