// lau_port/lib.rs

use std::collections::HashMap;
use std::sync::Mutex;

use serde::{Deserialize, Serialize};
use uuid::Uuid;

// ─── Protocols ──────────────────────────────────────────────────────────────

/// Transport or medium that a port communicates over.
///
/// `File` and `Custom` carry a `String` payload; all other variants are unit
/// variants. The type is hashable and comparable, and (de)serializable via serde.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum PortProtocol {
    Telegram,
    Discord,
    Slack,
    WebSocket,
    Http,
    Https,
    Mqtt,
    Amqp,
    Serial,
    Gpio,
    I2c,
    Spi,
    StdinStdout,
    /// File-backed port; `FilePort::protocol` stores the target path here.
    File(String),
    /// Caller-defined protocol; `label()` returns this payload verbatim.
    Custom(String),
}
26
27impl PortProtocol {
28    pub fn label(&self) -> &str {
29        match self {
30            PortProtocol::Telegram => "Telegram",
31            PortProtocol::Discord => "Discord",
32            PortProtocol::Slack => "Slack",
33            PortProtocol::WebSocket => "WebSocket",
34            PortProtocol::Http => "HTTP",
35            PortProtocol::Https => "HTTPS",
36            PortProtocol::Mqtt => "MQTT",
37            PortProtocol::Amqp => "AMQP",
38            PortProtocol::Serial => "Serial",
39            PortProtocol::Gpio => "GPIO",
40            PortProtocol::I2c => "I2C",
41            PortProtocol::Spi => "SPI",
42            PortProtocol::StdinStdout => "stdin/stdout",
43            PortProtocol::File(_) => "File",
44            PortProtocol::Custom(name) => name.as_str(),
45        }
46    }
47
48    pub fn is_network(&self) -> bool {
49        matches!(
50            self,
51            PortProtocol::Telegram
52                | PortProtocol::Discord
53                | PortProtocol::Slack
54                | PortProtocol::WebSocket
55                | PortProtocol::Http
56                | PortProtocol::Https
57                | PortProtocol::Mqtt
58                | PortProtocol::Amqp
59        )
60    }
61
62    pub fn is_hardware(&self) -> bool {
63        matches!(
64            self,
65            PortProtocol::Serial | PortProtocol::Gpio | PortProtocol::I2c | PortProtocol::Spi
66        )
67    }
68
69    pub fn is_filesystem(&self) -> bool {
70        matches!(self, PortProtocol::File(_))
71    }
72}

// ─── Direction ──────────────────────────────────────────────────────────────

/// Which way messages may flow through a port.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PortDirection {
    /// Receive only (`can_receive()` is true, `can_send()` is false).
    Inbound,
    /// Send only (`can_send()` is true, `can_receive()` is false).
    Outbound,
    /// Both sending and receiving are allowed.
    Bidirectional,
}
82
83impl PortDirection {
84    pub fn can_send(&self) -> bool {
85        matches!(self, PortDirection::Outbound | PortDirection::Bidirectional)
86    }
87
88    pub fn can_receive(&self) -> bool {
89        matches!(self, PortDirection::Inbound | PortDirection::Bidirectional)
90    }
91}

// ─── Message content ────────────────────────────────────────────────────────

/// One measurement, carried inside `MessageContent::Sensor`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SensorReading {
    /// Identifier of the sensor that produced the value.
    pub sensor_id: String,
    /// Measured value, expressed in `unit`.
    pub value: f64,
    /// Unit label for `value`; free-form text as far as this file is concerned.
    pub unit: String,
    /// Time of the reading.
    /// NOTE(review): the epoch/unit is not established here — presumably Unix
    /// seconds like `InboundMessage::timestamp`; confirm with producers.
    pub timestamp: i64,
}
102
/// Payload of an inbound or outbound message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MessageContent {
    /// Plain text.
    Text(String),
    /// HTML markup, stored as a string.
    Html(String),
    /// Markdown source, stored as a string.
    Markdown(String),
    /// Binary image with its MIME type and an optional caption.
    Image {
        data: Vec<u8>,
        mime: String,
        caption: Option<String>,
    },
    /// Binary audio with its MIME type and an optional duration in milliseconds.
    Audio {
        data: Vec<u8>,
        mime: String,
        duration_ms: Option<u32>,
    },
    /// Arbitrary binary attachment with its MIME type and file name.
    File {
        data: Vec<u8>,
        mime: String,
        name: String,
    },
    /// Named command with positional string arguments.
    Command {
        name: String,
        args: Vec<String>,
    },
    /// Typed event; `payload` is an opaque string in this file.
    Event {
        event_type: String,
        payload: String,
    },
    /// Batch of sensor readings.
    Sensor {
        readings: Vec<SensorReading>,
    },
}
135
136impl MessageContent {
137    pub fn text(content: &str) -> Self {
138        MessageContent::Text(content.to_string())
139    }
140
141    pub fn command(name: &str, args: &[&str]) -> Self {
142        MessageContent::Command {
143            name: name.to_string(),
144            args: args.iter().map(|a| a.to_string()).collect(),
145        }
146    }
147
148    pub fn is_text(&self) -> bool {
149        matches!(self, MessageContent::Text(_))
150    }
151
152    pub fn is_command(&self) -> bool {
153        matches!(self, MessageContent::Command { .. })
154    }
155
156    pub fn as_text(&self) -> Option<&str> {
157        match self {
158            MessageContent::Text(t) => Some(t.as_str()),
159            _ => None,
160        }
161    }
162}

// ─── Messages ───────────────────────────────────────────────────────────────

/// A message received from a port.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboundMessage {
    /// Message identifier; the constructors in this file use a random UUID v4 string.
    pub id: String,
    /// Identifier of the originating port; the constructors leave it empty.
    pub port_id: String,
    /// Sender identity, if known.
    pub sender: Option<String>,
    /// The message payload.
    pub content: MessageContent,
    /// Creation time; the constructors use `chrono::Utc::now().timestamp()`.
    pub timestamp: i64,
    /// Presumably the id of the message this one answers — TODO confirm.
    pub reply_to: Option<String>,
    /// Free-form key/value annotations.
    pub metadata: HashMap<String, String>,
}
176
177impl InboundMessage {
178    pub fn text(content: &str) -> Self {
179        InboundMessage {
180            id: Uuid::new_v4().to_string(),
181            port_id: String::new(),
182            sender: None,
183            content: MessageContent::text(content),
184            timestamp: chrono::Utc::now().timestamp(),
185            reply_to: None,
186            metadata: HashMap::new(),
187        }
188    }
189
190    pub fn command(name: &str, args: &[&str]) -> Self {
191        InboundMessage {
192            id: Uuid::new_v4().to_string(),
193            port_id: String::new(),
194            sender: None,
195            content: MessageContent::command(name, args),
196            timestamp: chrono::Utc::now().timestamp(),
197            reply_to: None,
198            metadata: HashMap::new(),
199        }
200    }
201
202    pub fn is_text(&self) -> bool {
203        self.content.is_text()
204    }
205
206    pub fn is_command(&self) -> bool {
207        self.content.is_command()
208    }
209
210    pub fn as_text(&self) -> Option<&str> {
211        self.content.as_text()
212    }
213}
214
/// A message to be sent through a port.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboundMessage {
    /// Message identifier; the constructors in this file use a random UUID v4 string.
    pub id: String,
    /// Identifier of the destination port; the constructors leave it empty.
    pub port_id: String,
    /// Recipient within the port, if any; `FilePort` writes it as the second CSV column.
    pub target: Option<String>,
    /// The message payload.
    pub content: MessageContent,
    /// Id of the message being answered; set by `OutboundMessage::reply`.
    pub reply_to: Option<String>,
    /// Free-form key/value annotations.
    pub metadata: HashMap<String, String>,
}
224
225impl OutboundMessage {
226    pub fn text(content: &str) -> Self {
227        OutboundMessage {
228            id: Uuid::new_v4().to_string(),
229            port_id: String::new(),
230            target: None,
231            content: MessageContent::text(content),
232            reply_to: None,
233            metadata: HashMap::new(),
234        }
235    }
236
237    pub fn reply(reply_to: &str, content: &str) -> Self {
238        OutboundMessage {
239            id: Uuid::new_v4().to_string(),
240            port_id: String::new(),
241            target: None,
242            content: MessageContent::text(content),
243            reply_to: Some(reply_to.to_string()),
244            metadata: HashMap::new(),
245        }
246    }
247}

// ─── Deadband ───────────────────────────────────────────────────────────────

/// Deadband settings that can be attached to a `PortConfig`.
///
/// NOTE(review): nothing in this file reads these fields, so the exact
/// semantics (inclusive bounds, what is suppressed inside the band) must be
/// confirmed against the consumer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PortDeadband {
    /// Lower bound of the band.
    pub lower: f64,
    /// Upper bound of the band.
    pub upper: f64,
    /// Check interval in milliseconds.
    pub check_interval_ms: u64,
}

// ─── Port Config ────────────────────────────────────────────────────────────

/// Serializable description of a port.
///
/// `max_rate` and `deadband` are left out of the serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PortConfig {
    /// Port identifier.
    pub id: String,
    /// Protocol the port speaks.
    pub protocol: PortProtocol,
    /// Allowed message flow.
    pub direction: PortDirection,
    /// Protocol-specific settings as string key/value pairs.
    pub params: HashMap<String, String>,
    /// Whether the port is enabled.
    pub enabled: bool,
    /// Optional rate limit.
    /// NOTE(review): unit (messages per second/minute?) is not shown in this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_rate: Option<u32>,
    /// Optional deadband settings.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deadband: Option<PortDeadband>,
}
272
273impl PortConfig {
274    pub fn from_json(json: &str) -> Result<Self, PortError> {
275        serde_json::from_str(json).map_err(|e| PortError::ConfigInvalid(e.to_string()))
276    }
277
278    pub fn to_json(&self) -> String {
279        serde_json::to_string_pretty(self).unwrap_or_default()
280    }
281}

// ─── Port Stats ─────────────────────────────────────────────────────────────

/// Traffic counters for a port; `Default` yields all zeros and no activity.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct PortStats {
    /// Number of messages sent successfully.
    pub messages_sent: u64,
    /// Number of messages received (`MemoryPort` counts them at `inject` time).
    pub messages_received: u64,
    /// Bytes sent, as counted by each port implementation.
    pub bytes_sent: u64,
    /// Bytes received, as counted by each port implementation.
    pub bytes_received: u64,
    /// Error counter. No port in this file increments it.
    pub errors: u64,
    /// Time of the last send/receive, from `chrono::Utc::now().timestamp()`.
    pub last_activity: Option<i64>,
    /// Average latency in milliseconds. No port in this file updates it.
    pub avg_latency_ms: f64,
}

// ─── Port Error ─────────────────────────────────────────────────────────────

/// Errors produced by ports and by the registry.
///
/// `String` payloads are human-readable details that `Display` appends after
/// a fixed prefix.
#[derive(Debug, Clone)]
pub enum PortError {
    /// The port is not connected. This file also uses it for "port not found"
    /// and "receive not supported" conditions.
    NotConnected(String),
    /// Sending a message failed.
    SendFailed(String),
    /// Receiving messages failed.
    ReceiveFailed(String),
    /// A configuration could not be parsed or is invalid.
    ConfigInvalid(String),
    /// The caller is being throttled; retry after the given number of milliseconds.
    RateLimited { retry_after_ms: u64 },
    /// Authentication failed.
    AuthFailed(String),
    /// An operation timed out; `Display` prints the value as milliseconds.
    Timeout(u64),
    /// A protocol-level error occurred.
    ProtocolError(String),
    /// Establishing the connection failed.
    ConnectFailed(String),
}
310
311impl std::fmt::Display for PortError {
312    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313        match self {
314            PortError::NotConnected(s) => write!(f, "not connected: {s}"),
315            PortError::SendFailed(s) => write!(f, "send failed: {s}"),
316            PortError::ReceiveFailed(s) => write!(f, "receive failed: {s}"),
317            PortError::ConfigInvalid(s) => write!(f, "invalid config: {s}"),
318            PortError::RateLimited { retry_after_ms } => {
319                write!(f, "rate limited, retry after {retry_after_ms}ms")
320            }
321            PortError::AuthFailed(s) => write!(f, "auth failed: {s}"),
322            PortError::Timeout(ms) => write!(f, "timeout after {ms}ms"),
323            PortError::ProtocolError(s) => write!(f, "protocol error: {s}"),
324            PortError::ConnectFailed(s) => write!(f, "connect failed: {s}"),
325        }
326    }
327}
328
// Marker impl so `PortError` works with `?` and `Box<dyn std::error::Error>`;
// the default trait methods are used, so no source error is exposed.
impl std::error::Error for PortError {}

// ─── Port Trait ─────────────────────────────────────────────────────────────

/// Common interface of every port implementation.
///
/// Implementations must be `Send + Sync`. `send`/`receive` take `&self`, while
/// `connect`/`disconnect` need exclusive access (`&mut self`).
pub trait Port: Send + Sync {
    /// Identifier of this port; `PortRegistry` uses it as the map key.
    fn id(&self) -> &str;
    /// Protocol this port speaks.
    fn protocol(&self) -> PortProtocol;
    /// Allowed message flow for this port.
    fn direction(&self) -> PortDirection;
    /// Sends one message, consuming it.
    fn send(&self, message: OutboundMessage) -> Result<(), PortError>;
    /// Returns the messages currently available from the port.
    fn receive(&self) -> Result<Vec<InboundMessage>, PortError>;
    /// Whether the port currently considers itself connected.
    fn is_connected(&self) -> bool;
    /// Opens the port.
    fn connect(&mut self) -> Result<(), PortError>;
    /// Closes the port.
    fn disconnect(&mut self) -> Result<(), PortError>;
    /// Returns a snapshot of the port's counters.
    fn stats(&self) -> PortStats;
}

// ─── StdioPort ──────────────────────────────────────────────────────────────

/// Port that writes textual messages to standard output.
pub struct StdioPort {
    /// Identifier returned by `Port::id`.
    id: String,
    /// Connection flag; behind a mutex because `send` only has `&self`.
    connected: Mutex<bool>,
    /// Direction reported by `Port::direction`; `new` sets it to `Bidirectional`.
    direction: PortDirection,
    /// Counters updated by `send`.
    stats: Mutex<PortStats>,
}
353
354impl StdioPort {
355    pub fn new(id: &str) -> Self {
356        StdioPort {
357            id: id.to_string(),
358            connected: Mutex::new(false),
359            direction: PortDirection::Bidirectional,
360            stats: Mutex::new(PortStats::default()),
361        }
362    }
363}
364
365impl Port for StdioPort {
366    fn id(&self) -> &str {
367        &self.id
368    }
369
370    fn protocol(&self) -> PortProtocol {
371        PortProtocol::StdinStdout
372    }
373
374    fn direction(&self) -> PortDirection {
375        self.direction
376    }
377
378    fn send(&self, message: OutboundMessage) -> Result<(), PortError> {
379        if !*self.connected.lock().unwrap() {
380            return Err(PortError::NotConnected(
381                "stdio port is not connected".to_string(),
382            ));
383        }
384        let text = match &message.content {
385            MessageContent::Text(t) => t.clone(),
386            MessageContent::Html(h) => h.clone(),
387            MessageContent::Markdown(m) => m.clone(),
388            _ => return Err(PortError::SendFailed("unsupported content type".into())),
389        };
390        {
391            let mut stats = self.stats.lock().unwrap();
392            stats.messages_sent += 1;
393            stats.bytes_sent += text.len() as u64;
394            stats.last_activity = Some(chrono::Utc::now().timestamp());
395        }
396        println!("{text}");
397        Ok(())
398    }
399
400    fn receive(&self) -> Result<Vec<InboundMessage>, PortError> {
401        Err(PortError::NotConnected(
402            "stdio receive not implemented; use stdin reader externally".to_string(),
403        ))
404    }
405
406    fn is_connected(&self) -> bool {
407        *self.connected.lock().unwrap()
408    }
409
410    fn connect(&mut self) -> Result<(), PortError> {
411        *self.connected.lock().unwrap() = true;
412        Ok(())
413    }
414
415    fn disconnect(&mut self) -> Result<(), PortError> {
416        *self.connected.lock().unwrap() = false;
417        Ok(())
418    }
419
420    fn stats(&self) -> PortStats {
421        self.stats.lock().unwrap().clone()
422    }
423}

// ─── MemoryPort ─────────────────────────────────────────────────────────────

/// In-memory port: sent messages accumulate in an outbox, and received
/// messages come from an inbox filled via `inject`.
pub struct MemoryPort {
    /// Identifier returned by `Port::id`.
    id: String,
    /// Protocol reported by `Port::protocol`; chosen by the caller.
    protocol: PortProtocol,
    /// Direction enforced by `send` and `receive`.
    direction: PortDirection,
    /// Messages waiting to be returned by `receive`.
    inbox: Mutex<Vec<InboundMessage>>,
    /// Messages accepted by `send`, waiting for `drain_outbox`.
    outbox: Mutex<Vec<OutboundMessage>>,
    /// Connection flag.
    connected: Mutex<bool>,
    /// Counters updated by `inject` and `send`.
    stats: Mutex<PortStats>,
}
436
437impl MemoryPort {
438    pub fn new(id: &str, protocol: PortProtocol, direction: PortDirection) -> Self {
439        MemoryPort {
440            id: id.to_string(),
441            protocol,
442            direction,
443            inbox: Mutex::new(Vec::new()),
444            outbox: Mutex::new(Vec::new()),
445            connected: Mutex::new(false),
446            stats: Mutex::new(PortStats::default()),
447        }
448    }
449
450    pub fn inject(&self, msg: InboundMessage) {
451        {
452            let mut stats = self.stats.lock().unwrap();
453            stats.messages_received += 1;
454            stats.bytes_received += content_bytes(&msg.content) as u64;
455            stats.last_activity = Some(chrono::Utc::now().timestamp());
456        }
457        self.inbox.lock().unwrap().push(msg);
458    }
459
460    pub fn drain_outbox(&self) -> Vec<OutboundMessage> {
461        self.outbox.lock().unwrap().drain(..).collect()
462    }
463}
464
465impl Port for MemoryPort {
466    fn id(&self) -> &str {
467        &self.id
468    }
469
470    fn protocol(&self) -> PortProtocol {
471        self.protocol.clone()
472    }
473
474    fn direction(&self) -> PortDirection {
475        self.direction
476    }
477
478    fn send(&self, message: OutboundMessage) -> Result<(), PortError> {
479        if !*self.connected.lock().unwrap() {
480            return Err(PortError::NotConnected(format!(
481                "memory port '{}' not connected",
482                self.id
483            )));
484        }
485        if !self.direction.can_send() {
486            return Err(PortError::SendFailed(format!(
487                "port '{}' is {:?}, cannot send",
488                self.id, self.direction
489            )));
490        }
491        {
492            let mut stats = self.stats.lock().unwrap();
493            stats.messages_sent += 1;
494            stats.bytes_sent += content_bytes(&message.content) as u64;
495            stats.last_activity = Some(chrono::Utc::now().timestamp());
496        }
497        self.outbox.lock().unwrap().push(message);
498        Ok(())
499    }
500
501    fn receive(&self) -> Result<Vec<InboundMessage>, PortError> {
502        if !*self.connected.lock().unwrap() {
503            return Err(PortError::NotConnected(format!(
504                "memory port '{}' not connected",
505                self.id
506            )));
507        }
508        if !self.direction.can_receive() {
509            return Err(PortError::ReceiveFailed(format!(
510                "port '{}' is {:?}, cannot receive",
511                self.id, self.direction
512            )));
513        }
514        Ok(self.inbox.lock().unwrap().drain(..).collect())
515    }
516
517    fn is_connected(&self) -> bool {
518        *self.connected.lock().unwrap()
519    }
520
521    fn connect(&mut self) -> Result<(), PortError> {
522        *self.connected.lock().unwrap() = true;
523        Ok(())
524    }
525
526    fn disconnect(&mut self) -> Result<(), PortError> {
527        *self.connected.lock().unwrap() = false;
528        Ok(())
529    }
530
531    fn stats(&self) -> PortStats {
532        self.stats.lock().unwrap().clone()
533    }
534}

// ─── FilePort ───────────────────────────────────────────────────────────────

/// On-disk record format used by `FilePort::send`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FileFormat {
    /// One JSON-serialized `OutboundMessage` per line.
    Jsonl,
    /// One `id,target,preview` row per message.
    Csv,
    /// The text of a `Text`/`Html`/`Markdown` payload; other payloads are rejected.
    Raw,
}
544
/// Write-only port that appends each sent message to a file.
pub struct FilePort {
    /// Identifier returned by `Port::id`.
    id: String,
    /// Path of the file that `send` appends to.
    path: String,
    /// Record format used by `send`.
    format: FileFormat,
    /// Connection flag.
    connected: Mutex<bool>,
    /// Direction reported by `Port::direction`; `new` sets it to `Outbound`.
    direction: PortDirection,
    /// Counters updated by `send`.
    stats: Mutex<PortStats>,
}
553
554impl FilePort {
555    pub fn new(id: &str, path: &str, format: FileFormat) -> Self {
556        FilePort {
557            id: id.to_string(),
558            path: path.to_string(),
559            format,
560            connected: Mutex::new(false),
561            direction: PortDirection::Outbound,
562            stats: Mutex::new(PortStats::default()),
563        }
564    }
565}
566
567impl Port for FilePort {
568    fn id(&self) -> &str {
569        &self.id
570    }
571
572    fn protocol(&self) -> PortProtocol {
573        PortProtocol::File(self.path.clone())
574    }
575
576    fn direction(&self) -> PortDirection {
577        self.direction
578    }
579
580    fn send(&self, message: OutboundMessage) -> Result<(), PortError> {
581        use std::io::Write;
582
583        if !*self.connected.lock().unwrap() {
584            return Err(PortError::NotConnected(format!(
585                "file port '{}' not connected",
586                self.id
587            )));
588        }
589
590        let line = match self.format {
591            FileFormat::Jsonl => serde_json::to_string(&message)
592                .map_err(|e| PortError::SendFailed(e.to_string()))?,
593            FileFormat::Csv => {
594                format!(
595                    "{},{},{}\n",
596                    message.id,
597                    message.target.unwrap_or_default(),
598                    text_preview(&message.content)
599                )
600            }
601            FileFormat::Raw => match &message.content {
602                MessageContent::Text(t) => t.clone(),
603                MessageContent::Html(h) => h.clone(),
604                MessageContent::Markdown(m) => m.clone(),
605                _ => return Err(PortError::SendFailed("unsupported content for raw".into())),
606            },
607        };
608
609        let mut file = std::fs::OpenOptions::new()
610            .create(true)
611            .append(true)
612            .open(&self.path)
613            .map_err(|e| PortError::SendFailed(e.to_string()))?;
614
615        writeln!(file, "{line}")
616            .map_err(|e| PortError::SendFailed(e.to_string()))?;
617
618        {
619            let mut stats = self.stats.lock().unwrap();
620            stats.messages_sent += 1;
621            stats.bytes_sent += line.len() as u64 + 1;
622            stats.last_activity = Some(chrono::Utc::now().timestamp());
623        }
624
625        Ok(())
626    }
627
628    fn receive(&self) -> Result<Vec<InboundMessage>, PortError> {
629        Err(PortError::NotConnected("file ports are write-only".into()))
630    }
631
632    fn is_connected(&self) -> bool {
633        *self.connected.lock().unwrap()
634    }
635
636    fn connect(&mut self) -> Result<(), PortError> {
637        let parent = std::path::Path::new(&self.path).parent().unwrap_or(std::path::Path::new("."));
638        if !parent.exists() {
639            std::fs::create_dir_all(parent)
640                .map_err(|e| PortError::ConnectFailed(e.to_string()))?;
641        }
642        *self.connected.lock().unwrap() = true;
643        Ok(())
644    }
645
646    fn disconnect(&mut self) -> Result<(), PortError> {
647        *self.connected.lock().unwrap() = false;
648        Ok(())
649    }
650
651    fn stats(&self) -> PortStats {
652        self.stats.lock().unwrap().clone()
653    }
654}

// ─── PortRegistry ───────────────────────────────────────────────────────────

/// Owns a set of ports keyed by their id, plus optional per-port configuration.
pub struct PortRegistry {
    /// Registered ports, keyed by `Port::id`.
    ports: HashMap<String, Box<dyn Port>>,
    /// Configurations supplied through `register_with_config`, keyed by port id.
    configs: HashMap<String, PortConfig>,
}
662
663impl PortRegistry {
664    pub fn new() -> Self {
665        PortRegistry {
666            ports: HashMap::new(),
667            configs: HashMap::new(),
668        }
669    }
670
671    pub fn register(&mut self, port: Box<dyn Port>) {
672        let id = port.id().to_string();
673        self.ports.insert(id, port);
674    }
675
676    pub fn register_with_config(&mut self, port: Box<dyn Port>, config: PortConfig) {
677        let id = port.id().to_string();
678        self.configs.insert(id.clone(), config);
679        self.ports.insert(id, port);
680    }
681
682    pub fn unregister(&mut self, port_id: &str) -> Result<Box<dyn Port>, PortError> {
683        self.ports
684            .remove(port_id)
685            .ok_or_else(|| PortError::NotConnected(format!("port '{port_id}' not found")))
686    }
687
688    pub fn send(&self, port_id: &str, message: OutboundMessage) -> Result<(), PortError> {
689        let port = self
690            .ports
691            .get(port_id)
692            .ok_or_else(|| PortError::NotConnected(format!("port '{port_id}' not found")))?;
693        port.send(message)
694    }
695
696    pub fn receive_all(&self) -> Vec<(String, Result<Vec<InboundMessage>, PortError>)> {
697        self.ports
698            .iter()
699            .map(|(id, port)| {
700                let result = match port.direction().can_receive() {
701                    true => port.receive(),
702                    false => Err(PortError::NotConnected(format!(
703                        "port '{id}' cannot receive"
704                    ))),
705                };
706                (id.clone(), result)
707            })
708            .collect()
709    }
710
711    pub fn get(&self, port_id: &str) -> Option<&dyn Port> {
712        self.ports.get(port_id).map(|p| p.as_ref())
713    }
714
715    pub fn get_mut(&mut self, port_id: &str) -> Option<&mut Box<dyn Port>> {
716        self.ports.get_mut(port_id)
717    }
718
719    pub fn list(&self) -> Vec<(&str, PortProtocol, PortDirection, bool)> {
720        self.ports
721            .iter()
722            .map(|(id, p)| (id.as_str(), p.protocol(), p.direction(), p.is_connected()))
723            .collect()
724    }
725
726    pub fn connect_all(&mut self) -> Vec<(String, Result<(), PortError>)> {
727        self.ports
728            .iter_mut()
729            .map(|(id, port)| (id.clone(), port.connect()))
730            .collect()
731    }
732
733    pub fn disconnect_all(&mut self) {
734        for port in self.ports.values_mut() {
735            let _ = port.disconnect();
736        }
737    }
738
739    pub fn health_check(&self) -> HashMap<String, bool> {
740        self.ports
741            .iter()
742            .map(|(id, port)| (id.clone(), port.is_connected()))
743            .collect()
744    }
745}
746
747impl Default for PortRegistry {
748    fn default() -> Self {
749        Self::new()
750    }
751}

// ─── helpers ────────────────────────────────────────────────────────────────

755fn content_bytes(content: &MessageContent) -> usize {
756    match content {
757        MessageContent::Text(t) => t.len(),
758        MessageContent::Html(h) => h.len(),
759        MessageContent::Markdown(m) => m.len(),
760        MessageContent::Image { data, .. } => data.len(),
761        MessageContent::Audio { data, .. } => data.len(),
762        MessageContent::File { data, .. } => data.len(),
763        MessageContent::Command { name, args } => {
764            name.len() + args.iter().map(|a| a.len()).sum::<usize>()
765        }
766        MessageContent::Event {
767            event_type,
768            payload,
769        } => event_type.len() + payload.len(),
770        MessageContent::Sensor { readings } => readings.len() * 32,
771    }
772}
773
774fn text_preview(content: &MessageContent) -> String {
775    match content {
776        MessageContent::Text(t) => t.chars().take(80).collect(),
777        MessageContent::Html(h) => h.chars().take(80).collect(),
778        MessageContent::Markdown(m) => m.chars().take(80).collect(),
779        MessageContent::Image { caption, .. } => {
780            format!("[image: {}]", caption.as_deref().unwrap_or("no caption"))
781        }
782        MessageContent::Audio { .. } => "[audio]".to_string(),
783        MessageContent::File { name, .. } => format!("[file: {name}]"),
784        MessageContent::Command { name, args } => {
785            format!("cmd /{name} {:?}", args.join(" "))
786        }
787        MessageContent::Event { event_type, .. } => format!("[event: {event_type}]"),
788        MessageContent::Sensor { readings } => {
789            format!("[sensor: {} readings]", readings.len())
790        }
791    }
792}