1use crate::error::{Error, Result};
2use crate::events::error_event::{ErrorEventData, ERROR_EVENT_NAME};
3use crate::events::event::Event;
4use crate::events::event_handler::EventHandler;
5use crate::ipc::client::IPCClient;
6use crate::ipc::context::{Context, PooledContext, ReplyListeners};
7use crate::ipc::server::IPCServer;
8use crate::namespaces::builder::NamespaceBuilder;
9use crate::namespaces::namespace::Namespace;
10use std::collections::HashMap;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15use typemap_rev::{TypeMap, TypeMapKey};
16
17pub struct IPCBuilder {
50 handler: EventHandler,
51 address: Option<String>,
52 namespaces: HashMap<String, Namespace>,
53 data: TypeMap,
54}
55
56impl IPCBuilder {
57 pub fn new() -> Self {
58 let mut handler = EventHandler::new();
59 handler.on(ERROR_EVENT_NAME, |_, event| {
60 Box::pin(async move {
61 let error_data = event.data::<ErrorEventData>()?;
62 tracing::warn!(error_data.code);
63 tracing::warn!("error_data.message = '{}'", error_data.message);
64
65 Ok(())
66 })
67 });
68 Self {
69 handler,
70 address: None,
71 namespaces: HashMap::new(),
72 data: TypeMap::new(),
73 }
74 }
75
76 pub fn insert<K: TypeMapKey>(mut self, value: K::Value) -> Self {
78 self.data.insert::<K>(value);
79
80 self
81 }
82
83 pub fn on<F: 'static>(mut self, event: &str, callback: F) -> Self
85 where
86 F: for<'a> Fn(
87 &'a Context,
88 Event,
89 ) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>>
90 + Send
91 + Sync,
92 {
93 self.handler.on(event, callback);
94
95 self
96 }
97
98 pub fn address<S: ToString>(mut self, address: S) -> Self {
100 self.address = Some(address.to_string());
101
102 self
103 }
104
105 pub fn namespace<S: ToString>(self, name: S) -> NamespaceBuilder {
107 NamespaceBuilder::new(self, name.to_string())
108 }
109
110 pub fn add_namespace(mut self, namespace: Namespace) -> Self {
112 self.namespaces
113 .insert(namespace.name().to_owned(), namespace);
114
115 self
116 }
117
118 #[tracing::instrument(skip(self))]
120 pub async fn build_server(self) -> Result<()> {
121 self.validate()?;
122 let server = IPCServer {
123 namespaces: self.namespaces,
124 handler: self.handler,
125 data: self.data,
126 };
127 server.start(&self.address.unwrap()).await?;
128
129 Ok(())
130 }
131
132 #[tracing::instrument(skip(self))]
134 pub async fn build_client(self) -> Result<Context> {
135 self.validate()?;
136 let data = Arc::new(RwLock::new(self.data));
137 let reply_listeners = ReplyListeners::default();
138 let client = IPCClient {
139 namespaces: self.namespaces,
140 handler: self.handler,
141 data,
142 reply_listeners,
143 };
144
145 let ctx = client.connect(&self.address.unwrap()).await?;
146
147 Ok(ctx)
148 }
149
150 #[tracing::instrument(skip(self))]
155 pub async fn build_pooled_client(self, pool_size: usize) -> Result<PooledContext> {
156 if pool_size == 0 {
157 Error::BuildError("Pool size must be greater than 0".to_string());
158 }
159 self.validate()?;
160 let data = Arc::new(RwLock::new(self.data));
161 let mut contexts = Vec::new();
162 let address = self.address.unwrap();
163 let reply_listeners = ReplyListeners::default();
164
165 for _ in 0..pool_size {
166 let client = IPCClient {
167 namespaces: self.namespaces.clone(),
168 handler: self.handler.clone(),
169 data: Arc::clone(&data),
170 reply_listeners: Arc::clone(&reply_listeners),
171 };
172
173 let ctx = client.connect(&address).await?;
174 contexts.push(ctx);
175 }
176
177 Ok(PooledContext::new(contexts))
178 }
179
180 #[tracing::instrument(skip(self))]
182 fn validate(&self) -> Result<()> {
183 if self.address.is_none() {
184 Err(Error::BuildError("Missing Address".to_string()))
185 } else {
186 Ok(())
187 }
188 }
189}