rmp_ipc/ipc/
client.rs

1use super::handle_connection;
2use crate::error::Result;
3use crate::events::event_handler::EventHandler;
4use crate::ipc::context::{Context, ReplyListeners};
5use crate::ipc::stream_emitter::StreamEmitter;
6use crate::namespaces::namespace::Namespace;
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::net::TcpStream;
10use tokio::sync::oneshot;
11use tokio::sync::RwLock;
12use typemap_rev::TypeMap;
13
14/// The IPC Client to connect to an IPC Server.
15/// Use the [IPCBuilder](crate::builder::IPCBuilder) to create the client.
16/// Usually one does not need to use the IPCClient object directly.
17#[derive(Clone)]
18pub struct IPCClient {
19    pub(crate) handler: EventHandler,
20    pub(crate) namespaces: HashMap<String, Namespace>,
21    pub(crate) data: Arc<RwLock<TypeMap>>,
22    pub(crate) reply_listeners: ReplyListeners,
23}
24
25impl IPCClient {
26    /// Connects to a given address and returns an emitter for events to that address.
27    /// Invoked by [IPCBuilder::build_client](crate::builder::IPCBuilder::build_client)
28    #[tracing::instrument(skip(self))]
29    pub async fn connect(self, address: &str) -> Result<Context> {
30        let stream = TcpStream::connect(address).await?;
31        let (read_half, write_half) = stream.into_split();
32        let emitter = StreamEmitter::new(write_half);
33        let (tx, rx) = oneshot::channel();
34        let ctx = Context::new(
35            StreamEmitter::clone(&emitter),
36            self.data,
37            Some(tx),
38            self.reply_listeners,
39        );
40        let handler = Arc::new(self.handler);
41        let namespaces = Arc::new(self.namespaces);
42
43        let handle = tokio::spawn({
44            let ctx = Context::clone(&ctx);
45            async move {
46                handle_connection(namespaces, handler, read_half, ctx).await;
47            }
48        });
49        tokio::spawn(async move {
50            let _ = rx.await;
51            handle.abort();
52        });
53
54        Ok(ctx)
55    }
56}