1use super::handle_connection;
2use crate::error::Result;
3use crate::events::event_handler::EventHandler;
4use crate::ipc::context::{Context, ReplyListeners};
5use crate::ipc::stream_emitter::StreamEmitter;
6use crate::namespaces::namespace::Namespace;
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::net::TcpStream;
10use tokio::sync::oneshot;
11use tokio::sync::RwLock;
12use typemap_rev::TypeMap;
13
14#[derive(Clone)]
18pub struct IPCClient {
19 pub(crate) handler: EventHandler,
20 pub(crate) namespaces: HashMap<String, Namespace>,
21 pub(crate) data: Arc<RwLock<TypeMap>>,
22 pub(crate) reply_listeners: ReplyListeners,
23}
24
25impl IPCClient {
26 #[tracing::instrument(skip(self))]
29 pub async fn connect(self, address: &str) -> Result<Context> {
30 let stream = TcpStream::connect(address).await?;
31 let (read_half, write_half) = stream.into_split();
32 let emitter = StreamEmitter::new(write_half);
33 let (tx, rx) = oneshot::channel();
34 let ctx = Context::new(
35 StreamEmitter::clone(&emitter),
36 self.data,
37 Some(tx),
38 self.reply_listeners,
39 );
40 let handler = Arc::new(self.handler);
41 let namespaces = Arc::new(self.namespaces);
42
43 let handle = tokio::spawn({
44 let ctx = Context::clone(&ctx);
45 async move {
46 handle_connection(namespaces, handler, read_half, ctx).await;
47 }
48 });
49 tokio::spawn(async move {
50 let _ = rx.await;
51 handle.abort();
52 });
53
54 Ok(ctx)
55 }
56}