1use std::collections::HashMap;
4use std::fmt;
5use std::sync::{Arc, Mutex};
6use tokio::sync::mpsc;
7
/// Declarative description of a single connector instance.
#[derive(Debug, Clone)]
pub struct ConnectorConfig {
    /// Name of the connector instance.
    pub name: String,
    /// Implementation selector matched by `create_connector` (for example `"channel"`).
    pub connector_type: String,
    /// Free-form string settings; the `"channel"` type reads the optional `"buffer"` key.
    pub properties: HashMap<String, String>,
}

/// Renders the config as `<connector TYPE:NAME>`; the properties are not included.
impl fmt::Display for ConnectorConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            name,
            connector_type,
            ..
        } = self;
        write!(f, "<connector {connector_type}:{name}>")
    }
}
21
/// Synchronous message endpoint that can be shared across threads.
///
/// Every method takes `&self`, and all failures are reported as plain `String` messages.
pub trait Connector: Send + Sync {
    /// Name of this connector instance.
    fn name(&self) -> &str;

    /// Short identifier of the implementation (the implementations in this file
    /// return `"channel"` or `"user_defined"`).
    fn connector_type(&self) -> &str;

    /// Sends one text message.
    ///
    /// # Errors
    /// Returns a description of the failure; send-incapable implementations
    /// always return `Err`.
    fn send(&self, message: &str) -> Result<(), String>;

    /// Waits up to `timeout_ms` milliseconds for one message.
    ///
    /// `Ok(None)` means no message was obtained. The channel-based implementations
    /// in this file return it both when the wait times out and when the channel
    /// is closed.
    ///
    /// # Errors
    /// Returns a description of the failure; receive-incapable implementations
    /// always return `Err`.
    fn recv(&self, timeout_ms: u64) -> Result<Option<String>, String>;
}
29
30pub struct ChannelConnector {
32 name: String,
33 tx: mpsc::Sender<String>,
34 rx: Arc<Mutex<mpsc::Receiver<String>>>,
35}
36
37impl ChannelConnector {
38 pub fn new(name: &str, buffer_size: usize) -> Self {
39 let (tx, rx) = mpsc::channel(buffer_size);
40 ChannelConnector {
41 name: name.to_string(),
42 tx,
43 rx: Arc::new(Mutex::new(rx)),
44 }
45 }
46
47 pub fn pair(name: &str, buffer_size: usize) -> (ChannelSender, ChannelReceiver) {
49 let (tx, rx) = mpsc::channel(buffer_size);
50 (
51 ChannelSender {
52 name: name.to_string(),
53 tx,
54 },
55 ChannelReceiver {
56 name: name.to_string(),
57 rx: Arc::new(Mutex::new(rx)),
58 },
59 )
60 }
61}
62
63impl Connector for ChannelConnector {
64 fn name(&self) -> &str {
65 &self.name
66 }
67
68 fn connector_type(&self) -> &str {
69 "channel"
70 }
71
72 fn send(&self, message: &str) -> Result<(), String> {
73 self.tx
74 .blocking_send(message.to_string())
75 .map_err(|e| format!("Channel send failed: {e}"))
76 }
77
78 fn recv(&self, timeout_ms: u64) -> Result<Option<String>, String> {
79 let mut rx = self.rx.lock().map_err(|e| format!("Lock error: {e}"))?;
80 let rt = tokio::runtime::Handle::try_current();
81 match rt {
82 Ok(handle) => {
83 handle.block_on(async {
85 match tokio::time::timeout(
86 std::time::Duration::from_millis(timeout_ms),
87 rx.recv(),
88 )
89 .await
90 {
91 Ok(Some(msg)) => Ok(Some(msg)),
92 Ok(None) => Ok(None), Err(_) => Ok(None), }
95 })
96 }
97 Err(_) => {
98 let rt = tokio::runtime::Runtime::new()
100 .map_err(|e| format!("Failed to create runtime: {e}"))?;
101 rt.block_on(async {
102 match tokio::time::timeout(
103 std::time::Duration::from_millis(timeout_ms),
104 rx.recv(),
105 )
106 .await
107 {
108 Ok(Some(msg)) => Ok(Some(msg)),
109 Ok(None) => Ok(None),
110 Err(_) => Ok(None),
111 }
112 })
113 }
114 }
115 }
116}
117
118pub struct ChannelSender {
120 name: String,
121 tx: mpsc::Sender<String>,
122}
123
124impl Connector for ChannelSender {
125 fn name(&self) -> &str {
126 &self.name
127 }
128 fn connector_type(&self) -> &str {
129 "channel"
130 }
131 fn send(&self, message: &str) -> Result<(), String> {
132 self.tx
133 .blocking_send(message.to_string())
134 .map_err(|e| format!("Channel send failed: {e}"))
135 }
136 fn recv(&self, _timeout_ms: u64) -> Result<Option<String>, String> {
137 Err("ChannelSender does not support recv".to_string())
138 }
139}
140
141pub struct ChannelReceiver {
143 name: String,
144 rx: Arc<Mutex<mpsc::Receiver<String>>>,
145}
146
147impl Connector for ChannelReceiver {
148 fn name(&self) -> &str {
149 &self.name
150 }
151 fn connector_type(&self) -> &str {
152 "channel"
153 }
154 fn send(&self, _message: &str) -> Result<(), String> {
155 Err("ChannelReceiver does not support send".to_string())
156 }
157 fn recv(&self, timeout_ms: u64) -> Result<Option<String>, String> {
158 let mut rx = self.rx.lock().map_err(|e| format!("Lock error: {e}"))?;
159 let rt =
160 tokio::runtime::Runtime::new().map_err(|e| format!("Failed to create runtime: {e}"))?;
161 rt.block_on(async {
162 match tokio::time::timeout(std::time::Duration::from_millis(timeout_ms), rx.recv())
163 .await
164 {
165 Ok(Some(msg)) => Ok(Some(msg)),
166 Ok(None) => Ok(None),
167 Err(_) => Ok(None),
168 }
169 })
170 }
171}
172
173#[cfg(feature = "kafka")]
175pub mod kafka;
176
/// Boxed, thread-safe callback that handles an outgoing message.
type SendFn = Box<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
/// Boxed, thread-safe callback that produces an incoming message; its argument
/// is the timeout in milliseconds passed to `recv`.
type RecvFn = Box<dyn Fn(u64) -> Result<Option<String>, String> + Send + Sync>;

/// Connector whose send and receive behavior is supplied by caller-provided closures.
pub struct UserDefinedConnector {
    /// Name of this connector instance.
    name: String,
    /// Callback invoked for every `send`.
    send_fn: SendFn,
    /// Callback invoked for every `recv`.
    recv_fn: RecvFn,
}

impl UserDefinedConnector {
    /// Wraps the given name and callbacks without invoking them.
    pub fn new(name: String, send_fn: SendFn, recv_fn: RecvFn) -> Self {
        Self {
            name,
            send_fn,
            recv_fn,
        }
    }
}
197
198impl Connector for UserDefinedConnector {
199 fn name(&self) -> &str {
200 &self.name
201 }
202
203 fn connector_type(&self) -> &str {
204 "user_defined"
205 }
206
207 fn send(&self, message: &str) -> Result<(), String> {
208 (self.send_fn)(message)
209 }
210
211 fn recv(&self, timeout_ms: u64) -> Result<Option<String>, String> {
212 (self.recv_fn)(timeout_ms)
213 }
214}
215
216pub fn create_connector(config: &ConnectorConfig) -> Result<Box<dyn Connector>, String> {
218 match config.connector_type.as_str() {
219 "channel" => {
220 let buffer = config
221 .properties
222 .get("buffer")
223 .and_then(|s| s.parse::<usize>().ok())
224 .unwrap_or(1000);
225 Ok(Box::new(ChannelConnector::new(&config.name, buffer)))
226 }
227 #[cfg(feature = "kafka")]
228 "kafka" => kafka::create_kafka_connector(config),
229 other => Err(format!("Unknown connector type: {other}")),
230 }
231}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config with the given name and connector type and no properties.
    fn bare_config(name: &str, connector_type: &str) -> ConnectorConfig {
        ConnectorConfig {
            name: name.to_string(),
            connector_type: connector_type.to_string(),
            properties: HashMap::new(),
        }
    }

    /// A message pushed into a `ChannelConnector`'s sender comes out of its receiver.
    #[test]
    fn test_channel_connector_send_recv() {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let connector = ChannelConnector::new("test", 10);
            connector.tx.send("hello".to_string()).await.unwrap();
            let received = connector.rx.lock().unwrap().recv().await;
            assert_eq!(received, Some("hello".to_string()));
        });
    }

    /// The two halves returned by `pair` are connected to each other.
    #[test]
    fn test_channel_pair() {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async {
            let (sender, receiver) = ChannelConnector::pair("test_pair", 10);
            sender.tx.send("world".to_string()).await.unwrap();
            let received = receiver.rx.lock().unwrap().recv().await;
            assert_eq!(received, Some("world".to_string()));
        });
    }

    /// `Display` prints the connector type followed by the name.
    #[test]
    fn test_connector_config_display() {
        let config = bare_config("my_source", "channel");
        assert_eq!(format!("{config}"), "<connector channel:my_source>");
    }

    /// The factory builds a channel connector carrying the configured name.
    #[test]
    fn test_create_connector_channel() {
        let created = create_connector(&bare_config("test", "channel"));
        assert!(created.is_ok());
        let connector = created.unwrap();
        assert_eq!(connector.name(), "test");
        assert_eq!(connector.connector_type(), "channel");
    }

    /// The factory rejects connector types it does not know.
    #[test]
    fn test_create_connector_unknown() {
        let created = create_connector(&bare_config("test", "unknown"));
        assert!(created.is_err());
    }

    /// Both callbacks of a `UserDefinedConnector` are invoked through the trait methods.
    #[test]
    fn test_user_defined_connector() {
        let outbox: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
        let recorder = Arc::clone(&outbox);

        let connector = UserDefinedConnector::new(
            "my_connector".to_string(),
            Box::new(move |msg: &str| {
                recorder.lock().unwrap().push(msg.to_string());
                Ok(())
            }),
            Box::new(|_timeout_ms| Ok(Some("received".to_string()))),
        );

        assert_eq!(connector.name(), "my_connector");
        assert_eq!(connector.connector_type(), "user_defined");
        connector.send("hello").unwrap();
        assert_eq!(outbox.lock().unwrap().len(), 1);

        let received = connector.recv(1000).unwrap();
        assert_eq!(received, Some("received".to_string()));
    }

    /// A `UserDefinedConnector` can be used behind `Box<dyn Connector>`.
    #[test]
    fn test_user_defined_connector_as_trait_object() {
        let connector: Box<dyn Connector> = Box::new(UserDefinedConnector::new(
            "test_udc".to_string(),
            Box::new(|_msg| Ok(())),
            Box::new(|_timeout| Ok(None)),
        ));
        assert_eq!(connector.name(), "test_udc");
        assert_eq!(connector.connector_type(), "user_defined");
        let received = connector.recv(100).unwrap();
        assert_eq!(received, None);
    }
}