// tl_stream/connector.rs

1// ThinkingLanguage — Connector abstraction for streaming I/O
2
3use std::collections::HashMap;
4use std::fmt;
5use std::sync::{Arc, Mutex};
6use tokio::sync::mpsc;
7
/// Describes one connector instance: what it is called, which kind of
/// connector it is, and any free-form settings for that kind.
#[derive(Clone, Debug)]
pub struct ConnectorConfig {
    /// Instance name, reported by the connector built from this config.
    pub name: String,
    /// Connector kind selector (for example `"channel"`).
    pub connector_type: String,
    /// Kind-specific string settings, looked up by key.
    pub properties: HashMap<String, String>,
}

impl fmt::Display for ConnectorConfig {
    /// Renders the config as `<connector TYPE:NAME>`; properties are omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            name,
            connector_type,
            ..
        } = self;
        write!(f, "<connector {connector_type}:{name}>")
    }
}
21
/// Common interface for endpoints that exchange string messages.
///
/// Implementors must be `Send + Sync` so a connector can be shared across
/// threads. All methods are synchronous and report failures as `String`s.
pub trait Connector: Send + Sync {
    /// Instance name of this connector.
    fn name(&self) -> &str;

    /// Short identifier of the connector kind (for example `"channel"`).
    fn connector_type(&self) -> &str;

    /// Delivers one message, returning a description of the failure on error.
    fn send(&self, message: &str) -> Result<(), String>;

    /// Fetches the next message, given a timeout in milliseconds.
    ///
    /// `Ok(Some(msg))` carries a received message and `Ok(None)` means that
    /// nothing was available; `Err` describes a failure.
    fn recv(&self, timeout_ms: u64) -> Result<Option<String>, String>;
}
29
30/// In-memory channel connector for testing and inter-pipeline communication.
31pub struct ChannelConnector {
32    name: String,
33    tx: mpsc::Sender<String>,
34    rx: Arc<Mutex<mpsc::Receiver<String>>>,
35}
36
37impl ChannelConnector {
38    pub fn new(name: &str, buffer_size: usize) -> Self {
39        let (tx, rx) = mpsc::channel(buffer_size);
40        ChannelConnector {
41            name: name.to_string(),
42            tx,
43            rx: Arc::new(Mutex::new(rx)),
44        }
45    }
46
47    /// Create a paired set of connectors (source + sink) sharing a channel.
48    pub fn pair(name: &str, buffer_size: usize) -> (ChannelSender, ChannelReceiver) {
49        let (tx, rx) = mpsc::channel(buffer_size);
50        (
51            ChannelSender {
52                name: name.to_string(),
53                tx,
54            },
55            ChannelReceiver {
56                name: name.to_string(),
57                rx: Arc::new(Mutex::new(rx)),
58            },
59        )
60    }
61}
62
63impl Connector for ChannelConnector {
64    fn name(&self) -> &str {
65        &self.name
66    }
67
68    fn connector_type(&self) -> &str {
69        "channel"
70    }
71
72    fn send(&self, message: &str) -> Result<(), String> {
73        self.tx
74            .blocking_send(message.to_string())
75            .map_err(|e| format!("Channel send failed: {e}"))
76    }
77
78    fn recv(&self, timeout_ms: u64) -> Result<Option<String>, String> {
79        let mut rx = self.rx.lock().map_err(|e| format!("Lock error: {e}"))?;
80        let rt = tokio::runtime::Handle::try_current();
81        match rt {
82            Ok(handle) => {
83                // We're inside a tokio runtime
84                handle.block_on(async {
85                    match tokio::time::timeout(
86                        std::time::Duration::from_millis(timeout_ms),
87                        rx.recv(),
88                    )
89                    .await
90                    {
91                        Ok(Some(msg)) => Ok(Some(msg)),
92                        Ok(None) => Ok(None), // channel closed
93                        Err(_) => Ok(None),   // timeout
94                    }
95                })
96            }
97            Err(_) => {
98                // No runtime — create one
99                let rt = tokio::runtime::Runtime::new()
100                    .map_err(|e| format!("Failed to create runtime: {e}"))?;
101                rt.block_on(async {
102                    match tokio::time::timeout(
103                        std::time::Duration::from_millis(timeout_ms),
104                        rx.recv(),
105                    )
106                    .await
107                    {
108                        Ok(Some(msg)) => Ok(Some(msg)),
109                        Ok(None) => Ok(None),
110                        Err(_) => Ok(None),
111                    }
112                })
113            }
114        }
115    }
116}
117
118/// Send-only half of a channel pair.
119pub struct ChannelSender {
120    name: String,
121    tx: mpsc::Sender<String>,
122}
123
124impl Connector for ChannelSender {
125    fn name(&self) -> &str {
126        &self.name
127    }
128    fn connector_type(&self) -> &str {
129        "channel"
130    }
131    fn send(&self, message: &str) -> Result<(), String> {
132        self.tx
133            .blocking_send(message.to_string())
134            .map_err(|e| format!("Channel send failed: {e}"))
135    }
136    fn recv(&self, _timeout_ms: u64) -> Result<Option<String>, String> {
137        Err("ChannelSender does not support recv".to_string())
138    }
139}
140
141/// Receive-only half of a channel pair.
142pub struct ChannelReceiver {
143    name: String,
144    rx: Arc<Mutex<mpsc::Receiver<String>>>,
145}
146
147impl Connector for ChannelReceiver {
148    fn name(&self) -> &str {
149        &self.name
150    }
151    fn connector_type(&self) -> &str {
152        "channel"
153    }
154    fn send(&self, _message: &str) -> Result<(), String> {
155        Err("ChannelReceiver does not support send".to_string())
156    }
157    fn recv(&self, timeout_ms: u64) -> Result<Option<String>, String> {
158        let mut rx = self.rx.lock().map_err(|e| format!("Lock error: {e}"))?;
159        let rt =
160            tokio::runtime::Runtime::new().map_err(|e| format!("Failed to create runtime: {e}"))?;
161        rt.block_on(async {
162            match tokio::time::timeout(std::time::Duration::from_millis(timeout_ms), rx.recv())
163                .await
164            {
165                Ok(Some(msg)) => Ok(Some(msg)),
166                Ok(None) => Ok(None),
167                Err(_) => Ok(None),
168            }
169        })
170    }
171}
172
173/// Kafka connector (feature-gated).
174#[cfg(feature = "kafka")]
175pub mod kafka;
176
177type SendFn = Box<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
178type RecvFn = Box<dyn Fn(u64) -> Result<Option<String>, String> + Send + Sync>;
179
180/// User-defined connector wrapping TL struct methods.
181/// Allows TL code to create custom connectors by implementing send/recv.
182pub struct UserDefinedConnector {
183    name: String,
184    send_fn: SendFn,
185    recv_fn: RecvFn,
186}
187
188impl UserDefinedConnector {
189    pub fn new(name: String, send_fn: SendFn, recv_fn: RecvFn) -> Self {
190        UserDefinedConnector {
191            name,
192            send_fn,
193            recv_fn,
194        }
195    }
196}
197
198impl Connector for UserDefinedConnector {
199    fn name(&self) -> &str {
200        &self.name
201    }
202
203    fn connector_type(&self) -> &str {
204        "user_defined"
205    }
206
207    fn send(&self, message: &str) -> Result<(), String> {
208        (self.send_fn)(message)
209    }
210
211    fn recv(&self, timeout_ms: u64) -> Result<Option<String>, String> {
212        (self.recv_fn)(timeout_ms)
213    }
214}
215
216/// Factory function to create a connector from config.
217pub fn create_connector(config: &ConnectorConfig) -> Result<Box<dyn Connector>, String> {
218    match config.connector_type.as_str() {
219        "channel" => {
220            let buffer = config
221                .properties
222                .get("buffer")
223                .and_then(|s| s.parse::<usize>().ok())
224                .unwrap_or(1000);
225            Ok(Box::new(ChannelConnector::new(&config.name, buffer)))
226        }
227        #[cfg(feature = "kafka")]
228        "kafka" => kafka::create_kafka_connector(config),
229        other => Err(format!("Unknown connector type: {other}")),
230    }
231}
232
#[cfg(test)]
mod tests {
    use super::*;

    // The channel tests run on plain test threads and go through the public
    // `Connector` API, so the blocking send/recv paths are what is exercised.

    /// A message sent through a loop-back connector comes back from `recv`.
    #[test]
    fn test_channel_connector_send_recv() {
        let conn = ChannelConnector::new("test", 10);
        conn.send("hello").unwrap();
        assert_eq!(conn.recv(1000).unwrap(), Some("hello".to_string()));
    }

    /// With nothing queued, `recv` gives up after the timeout and yields `None`.
    #[test]
    fn test_channel_connector_recv_timeout() {
        // The connector keeps its own sender alive, so the channel stays open
        // and only the timeout can end the wait.
        let conn = ChannelConnector::new("test", 10);
        assert_eq!(conn.recv(10).unwrap(), None);
    }

    /// Messages flow from the sender half to the receiver half of a pair.
    #[test]
    fn test_channel_pair() {
        let (sender, receiver) = ChannelConnector::pair("test_pair", 10);
        assert_eq!(sender.name(), "test_pair");
        assert_eq!(receiver.name(), "test_pair");
        sender.send("world").unwrap();
        assert_eq!(receiver.recv(1000).unwrap(), Some("world".to_string()));
    }

    /// Each half of a pair rejects the operation it does not support.
    #[test]
    fn test_channel_pair_rejects_wrong_direction() {
        let (sender, receiver) = ChannelConnector::pair("test_pair", 10);
        assert!(sender.recv(10).is_err());
        assert!(receiver.send("nope").is_err());
    }

    /// Once the sender half is dropped, the receiver reports `None`.
    #[test]
    fn test_channel_pair_closed() {
        let (sender, receiver) = ChannelConnector::pair("test_pair", 10);
        drop(sender);
        assert_eq!(receiver.recv(1000).unwrap(), None);
    }

    #[test]
    fn test_connector_config_display() {
        let config = ConnectorConfig {
            name: "my_source".to_string(),
            connector_type: "channel".to_string(),
            properties: HashMap::new(),
        };
        assert_eq!(format!("{config}"), "<connector channel:my_source>");
    }

    #[test]
    fn test_create_connector_channel() {
        let config = ConnectorConfig {
            name: "test".to_string(),
            connector_type: "channel".to_string(),
            properties: HashMap::new(),
        };
        let conn = create_connector(&config).expect("channel connectors are always available");
        assert_eq!(conn.name(), "test");
        assert_eq!(conn.connector_type(), "channel");
    }

    #[test]
    fn test_create_connector_unknown() {
        let config = ConnectorConfig {
            name: "test".to_string(),
            connector_type: "unknown".to_string(),
            properties: HashMap::new(),
        };
        assert!(create_connector(&config).is_err());
    }

    #[test]
    fn test_user_defined_connector() {
        let sent = Arc::new(Mutex::new(Vec::new()));
        let sent_clone = Arc::clone(&sent);

        let conn = UserDefinedConnector::new(
            "my_connector".to_string(),
            Box::new(move |msg: &str| {
                sent_clone.lock().unwrap().push(msg.to_string());
                Ok(())
            }),
            Box::new(|_timeout_ms| Ok(Some("received".to_string()))),
        );

        assert_eq!(conn.name(), "my_connector");
        assert_eq!(conn.connector_type(), "user_defined");
        conn.send("hello").unwrap();
        assert_eq!(*sent.lock().unwrap(), vec!["hello".to_string()]);

        let msg = conn.recv(1000).unwrap();
        assert_eq!(msg, Some("received".to_string()));
    }

    #[test]
    fn test_user_defined_connector_as_trait_object() {
        let conn: Box<dyn Connector> = Box::new(UserDefinedConnector::new(
            "test_udc".to_string(),
            Box::new(|_msg| Ok(())),
            Box::new(|_timeout| Ok(None)),
        ));
        assert_eq!(conn.name(), "test_udc");
        assert_eq!(conn.connector_type(), "user_defined");
        let msg = conn.recv(100).unwrap();
        assert_eq!(msg, None);
    }
}
335}