rmp_ipc/ipc/
builder.rs

1use crate::error::{Error, Result};
2use crate::events::error_event::{ErrorEventData, ERROR_EVENT_NAME};
3use crate::events::event::Event;
4use crate::events::event_handler::EventHandler;
5use crate::ipc::client::IPCClient;
6use crate::ipc::context::{Context, PooledContext, ReplyListeners};
7use crate::ipc::server::IPCServer;
8use crate::namespaces::builder::NamespaceBuilder;
9use crate::namespaces::namespace::Namespace;
10use std::collections::HashMap;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15use typemap_rev::{TypeMap, TypeMapKey};
16
17/// A builder for the IPC server or client.
18/// ```no_run
19///use typemap_rev::TypeMapKey;
20/// use rmp_ipc::IPCBuilder;
21///
22/// struct CustomKey;
23///
24/// impl TypeMapKey for CustomKey {
25///     type Value = String;
26/// }
27///
28///# async fn a() {
29/// IPCBuilder::new()
30///     .address("127.0.0.1:2020")
31///    // register callback
32///     .on("ping", |_ctx, _event| Box::pin(async move {
33///         println!("Received ping event.");
34///         Ok(())
35///     }))
36///     // register a namespace    
37///     .namespace("namespace")
38///     .on("namespace-event", |_ctx, _event| Box::pin(async move {
39///         println!("Namespace event.");
40///         Ok(())
41///     }))
42///     .build()
43///     // add context shared data
44///     .insert::<CustomKey>("Hello World".to_string())
45///     // can also be build_client which would return an emitter for events
46///     .build_server().await.unwrap();
47///# }
48/// ```
49pub struct IPCBuilder {
50    handler: EventHandler,
51    address: Option<String>,
52    namespaces: HashMap<String, Namespace>,
53    data: TypeMap,
54}
55
56impl IPCBuilder {
57    pub fn new() -> Self {
58        let mut handler = EventHandler::new();
59        handler.on(ERROR_EVENT_NAME, |_, event| {
60            Box::pin(async move {
61                let error_data = event.data::<ErrorEventData>()?;
62                tracing::warn!(error_data.code);
63                tracing::warn!("error_data.message = '{}'", error_data.message);
64
65                Ok(())
66            })
67        });
68        Self {
69            handler,
70            address: None,
71            namespaces: HashMap::new(),
72            data: TypeMap::new(),
73        }
74    }
75
76    /// Adds globally shared data
77    pub fn insert<K: TypeMapKey>(mut self, value: K::Value) -> Self {
78        self.data.insert::<K>(value);
79
80        self
81    }
82
83    /// Adds an event callback
84    pub fn on<F: 'static>(mut self, event: &str, callback: F) -> Self
85    where
86        F: for<'a> Fn(
87                &'a Context,
88                Event,
89            ) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>>
90            + Send
91            + Sync,
92    {
93        self.handler.on(event, callback);
94
95        self
96    }
97
98    /// Adds the address to connect to
99    pub fn address<S: ToString>(mut self, address: S) -> Self {
100        self.address = Some(address.to_string());
101
102        self
103    }
104
105    /// Adds a namespace
106    pub fn namespace<S: ToString>(self, name: S) -> NamespaceBuilder {
107        NamespaceBuilder::new(self, name.to_string())
108    }
109
110    /// Adds a namespace to the ipc server
111    pub fn add_namespace(mut self, namespace: Namespace) -> Self {
112        self.namespaces
113            .insert(namespace.name().to_owned(), namespace);
114
115        self
116    }
117
118    /// Builds an ipc server
119    #[tracing::instrument(skip(self))]
120    pub async fn build_server(self) -> Result<()> {
121        self.validate()?;
122        let server = IPCServer {
123            namespaces: self.namespaces,
124            handler: self.handler,
125            data: self.data,
126        };
127        server.start(&self.address.unwrap()).await?;
128
129        Ok(())
130    }
131
132    /// Builds an ipc client
133    #[tracing::instrument(skip(self))]
134    pub async fn build_client(self) -> Result<Context> {
135        self.validate()?;
136        let data = Arc::new(RwLock::new(self.data));
137        let reply_listeners = ReplyListeners::default();
138        let client = IPCClient {
139            namespaces: self.namespaces,
140            handler: self.handler,
141            data,
142            reply_listeners,
143        };
144
145        let ctx = client.connect(&self.address.unwrap()).await?;
146
147        Ok(ctx)
148    }
149
150    /// Builds a pooled IPC client
151    /// This causes the builder to actually create `pool_size` clients and
152    /// return a [crate::context::PooledContext] that allows one to [crate::context::PooledContext::acquire] a single context
153    /// to emit events.
154    #[tracing::instrument(skip(self))]
155    pub async fn build_pooled_client(self, pool_size: usize) -> Result<PooledContext> {
156        if pool_size == 0 {
157            Error::BuildError("Pool size must be greater than 0".to_string());
158        }
159        self.validate()?;
160        let data = Arc::new(RwLock::new(self.data));
161        let mut contexts = Vec::new();
162        let address = self.address.unwrap();
163        let reply_listeners = ReplyListeners::default();
164
165        for _ in 0..pool_size {
166            let client = IPCClient {
167                namespaces: self.namespaces.clone(),
168                handler: self.handler.clone(),
169                data: Arc::clone(&data),
170                reply_listeners: Arc::clone(&reply_listeners),
171            };
172
173            let ctx = client.connect(&address).await?;
174            contexts.push(ctx);
175        }
176
177        Ok(PooledContext::new(contexts))
178    }
179
180    /// Validates that all required fields have been provided
181    #[tracing::instrument(skip(self))]
182    fn validate(&self) -> Result<()> {
183        if self.address.is_none() {
184            Err(Error::BuildError("Missing Address".to_string()))
185        } else {
186            Ok(())
187        }
188    }
189}