1use serde::{Deserialize, Serialize};
2use std::sync::Mutex;
3use std::collections::HashMap;
4use uuid::Uuid;
5
6#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
9pub enum PortProtocol {
10 Telegram,
11 Discord,
12 Slack,
13 WebSocket,
14 Http,
15 Https,
16 Mqtt,
17 Amqp,
18 Serial,
19 Gpio,
20 I2c,
21 Spi,
22 StdinStdout,
23 File(String),
24 Custom(String),
25}
26
27impl PortProtocol {
28 pub fn label(&self) -> &str {
29 match self {
30 PortProtocol::Telegram => "Telegram",
31 PortProtocol::Discord => "Discord",
32 PortProtocol::Slack => "Slack",
33 PortProtocol::WebSocket => "WebSocket",
34 PortProtocol::Http => "HTTP",
35 PortProtocol::Https => "HTTPS",
36 PortProtocol::Mqtt => "MQTT",
37 PortProtocol::Amqp => "AMQP",
38 PortProtocol::Serial => "Serial",
39 PortProtocol::Gpio => "GPIO",
40 PortProtocol::I2c => "I2C",
41 PortProtocol::Spi => "SPI",
42 PortProtocol::StdinStdout => "stdin/stdout",
43 PortProtocol::File(_) => "File",
44 PortProtocol::Custom(name) => name.as_str(),
45 }
46 }
47
48 pub fn is_network(&self) -> bool {
49 matches!(
50 self,
51 PortProtocol::Telegram
52 | PortProtocol::Discord
53 | PortProtocol::Slack
54 | PortProtocol::WebSocket
55 | PortProtocol::Http
56 | PortProtocol::Https
57 | PortProtocol::Mqtt
58 | PortProtocol::Amqp
59 )
60 }
61
62 pub fn is_hardware(&self) -> bool {
63 matches!(
64 self,
65 PortProtocol::Serial | PortProtocol::Gpio | PortProtocol::I2c | PortProtocol::Spi
66 )
67 }
68
69 pub fn is_filesystem(&self) -> bool {
70 matches!(self, PortProtocol::File(_))
71 }
72}
73
74#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
77pub enum PortDirection {
78 Inbound,
79 Outbound,
80 Bidirectional,
81}
82
83impl PortDirection {
84 pub fn can_send(&self) -> bool {
85 matches!(self, PortDirection::Outbound | PortDirection::Bidirectional)
86 }
87
88 pub fn can_receive(&self) -> bool {
89 matches!(self, PortDirection::Inbound | PortDirection::Bidirectional)
90 }
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct SensorReading {
97 pub sensor_id: String,
98 pub value: f64,
99 pub unit: String,
100 pub timestamp: i64,
101}
102
103#[derive(Debug, Clone, Serialize, Deserialize)]
104pub enum MessageContent {
105 Text(String),
106 Html(String),
107 Markdown(String),
108 Image {
109 data: Vec<u8>,
110 mime: String,
111 caption: Option<String>,
112 },
113 Audio {
114 data: Vec<u8>,
115 mime: String,
116 duration_ms: Option<u32>,
117 },
118 File {
119 data: Vec<u8>,
120 mime: String,
121 name: String,
122 },
123 Command {
124 name: String,
125 args: Vec<String>,
126 },
127 Event {
128 event_type: String,
129 payload: String,
130 },
131 Sensor {
132 readings: Vec<SensorReading>,
133 },
134}
135
136impl MessageContent {
137 pub fn text(content: &str) -> Self {
138 MessageContent::Text(content.to_string())
139 }
140
141 pub fn command(name: &str, args: &[&str]) -> Self {
142 MessageContent::Command {
143 name: name.to_string(),
144 args: args.iter().map(|a| a.to_string()).collect(),
145 }
146 }
147
148 pub fn is_text(&self) -> bool {
149 matches!(self, MessageContent::Text(_))
150 }
151
152 pub fn is_command(&self) -> bool {
153 matches!(self, MessageContent::Command { .. })
154 }
155
156 pub fn as_text(&self) -> Option<&str> {
157 match self {
158 MessageContent::Text(t) => Some(t.as_str()),
159 _ => None,
160 }
161 }
162}
163
164#[derive(Debug, Clone, Serialize, Deserialize)]
167pub struct InboundMessage {
168 pub id: String,
169 pub port_id: String,
170 pub sender: Option<String>,
171 pub content: MessageContent,
172 pub timestamp: i64,
173 pub reply_to: Option<String>,
174 pub metadata: HashMap<String, String>,
175}
176
177impl InboundMessage {
178 pub fn text(content: &str) -> Self {
179 InboundMessage {
180 id: Uuid::new_v4().to_string(),
181 port_id: String::new(),
182 sender: None,
183 content: MessageContent::text(content),
184 timestamp: chrono::Utc::now().timestamp(),
185 reply_to: None,
186 metadata: HashMap::new(),
187 }
188 }
189
190 pub fn command(name: &str, args: &[&str]) -> Self {
191 InboundMessage {
192 id: Uuid::new_v4().to_string(),
193 port_id: String::new(),
194 sender: None,
195 content: MessageContent::command(name, args),
196 timestamp: chrono::Utc::now().timestamp(),
197 reply_to: None,
198 metadata: HashMap::new(),
199 }
200 }
201
202 pub fn is_text(&self) -> bool {
203 self.content.is_text()
204 }
205
206 pub fn is_command(&self) -> bool {
207 self.content.is_command()
208 }
209
210 pub fn as_text(&self) -> Option<&str> {
211 self.content.as_text()
212 }
213}
214
215#[derive(Debug, Clone, Serialize, Deserialize)]
216pub struct OutboundMessage {
217 pub id: String,
218 pub port_id: String,
219 pub target: Option<String>,
220 pub content: MessageContent,
221 pub reply_to: Option<String>,
222 pub metadata: HashMap<String, String>,
223}
224
225impl OutboundMessage {
226 pub fn text(content: &str) -> Self {
227 OutboundMessage {
228 id: Uuid::new_v4().to_string(),
229 port_id: String::new(),
230 target: None,
231 content: MessageContent::text(content),
232 reply_to: None,
233 metadata: HashMap::new(),
234 }
235 }
236
237 pub fn reply(reply_to: &str, content: &str) -> Self {
238 OutboundMessage {
239 id: Uuid::new_v4().to_string(),
240 port_id: String::new(),
241 target: None,
242 content: MessageContent::text(content),
243 reply_to: Some(reply_to.to_string()),
244 metadata: HashMap::new(),
245 }
246 }
247}
248
249#[derive(Debug, Clone, Serialize, Deserialize)]
252pub struct PortDeadband {
253 pub lower: f64,
254 pub upper: f64,
255 pub check_interval_ms: u64,
256}
257
258#[derive(Debug, Clone, Serialize, Deserialize)]
261pub struct PortConfig {
262 pub id: String,
263 pub protocol: PortProtocol,
264 pub direction: PortDirection,
265 pub params: HashMap<String, String>,
266 pub enabled: bool,
267 #[serde(skip_serializing_if = "Option::is_none")]
268 pub max_rate: Option<u32>,
269 #[serde(skip_serializing_if = "Option::is_none")]
270 pub deadband: Option<PortDeadband>,
271}
272
273impl PortConfig {
274 pub fn from_json(json: &str) -> Result<Self, PortError> {
275 serde_json::from_str(json).map_err(|e| PortError::ConfigInvalid(e.to_string()))
276 }
277
278 pub fn to_json(&self) -> String {
279 serde_json::to_string_pretty(self).unwrap_or_default()
280 }
281}
282
283#[derive(Debug, Clone, Serialize, Deserialize, Default)]
286pub struct PortStats {
287 pub messages_sent: u64,
288 pub messages_received: u64,
289 pub bytes_sent: u64,
290 pub bytes_received: u64,
291 pub errors: u64,
292 pub last_activity: Option<i64>,
293 pub avg_latency_ms: f64,
294}
295
296#[derive(Debug, Clone)]
299pub enum PortError {
300 NotConnected(String),
301 SendFailed(String),
302 ReceiveFailed(String),
303 ConfigInvalid(String),
304 RateLimited { retry_after_ms: u64 },
305 AuthFailed(String),
306 Timeout(u64),
307 ProtocolError(String),
308 ConnectFailed(String),
309}
310
311impl std::fmt::Display for PortError {
312 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313 match self {
314 PortError::NotConnected(s) => write!(f, "not connected: {s}"),
315 PortError::SendFailed(s) => write!(f, "send failed: {s}"),
316 PortError::ReceiveFailed(s) => write!(f, "receive failed: {s}"),
317 PortError::ConfigInvalid(s) => write!(f, "invalid config: {s}"),
318 PortError::RateLimited { retry_after_ms } => {
319 write!(f, "rate limited, retry after {retry_after_ms}ms")
320 }
321 PortError::AuthFailed(s) => write!(f, "auth failed: {s}"),
322 PortError::Timeout(ms) => write!(f, "timeout after {ms}ms"),
323 PortError::ProtocolError(s) => write!(f, "protocol error: {s}"),
324 PortError::ConnectFailed(s) => write!(f, "connect failed: {s}"),
325 }
326 }
327}
328
329impl std::error::Error for PortError {}
330
331pub trait Port: Send + Sync {
334 fn id(&self) -> &str;
335 fn protocol(&self) -> PortProtocol;
336 fn direction(&self) -> PortDirection;
337 fn send(&self, message: OutboundMessage) -> Result<(), PortError>;
338 fn receive(&self) -> Result<Vec<InboundMessage>, PortError>;
339 fn is_connected(&self) -> bool;
340 fn connect(&mut self) -> Result<(), PortError>;
341 fn disconnect(&mut self) -> Result<(), PortError>;
342 fn stats(&self) -> PortStats;
343}
344
345pub struct StdioPort {
348 id: String,
349 connected: Mutex<bool>,
350 direction: PortDirection,
351 stats: Mutex<PortStats>,
352}
353
354impl StdioPort {
355 pub fn new(id: &str) -> Self {
356 StdioPort {
357 id: id.to_string(),
358 connected: Mutex::new(false),
359 direction: PortDirection::Bidirectional,
360 stats: Mutex::new(PortStats::default()),
361 }
362 }
363}
364
365impl Port for StdioPort {
366 fn id(&self) -> &str {
367 &self.id
368 }
369
370 fn protocol(&self) -> PortProtocol {
371 PortProtocol::StdinStdout
372 }
373
374 fn direction(&self) -> PortDirection {
375 self.direction
376 }
377
378 fn send(&self, message: OutboundMessage) -> Result<(), PortError> {
379 if !*self.connected.lock().unwrap() {
380 return Err(PortError::NotConnected(
381 "stdio port is not connected".to_string(),
382 ));
383 }
384 let text = match &message.content {
385 MessageContent::Text(t) => t.clone(),
386 MessageContent::Html(h) => h.clone(),
387 MessageContent::Markdown(m) => m.clone(),
388 _ => return Err(PortError::SendFailed("unsupported content type".into())),
389 };
390 {
391 let mut stats = self.stats.lock().unwrap();
392 stats.messages_sent += 1;
393 stats.bytes_sent += text.len() as u64;
394 stats.last_activity = Some(chrono::Utc::now().timestamp());
395 }
396 println!("{text}");
397 Ok(())
398 }
399
400 fn receive(&self) -> Result<Vec<InboundMessage>, PortError> {
401 Err(PortError::NotConnected(
402 "stdio receive not implemented; use stdin reader externally".to_string(),
403 ))
404 }
405
406 fn is_connected(&self) -> bool {
407 *self.connected.lock().unwrap()
408 }
409
410 fn connect(&mut self) -> Result<(), PortError> {
411 *self.connected.lock().unwrap() = true;
412 Ok(())
413 }
414
415 fn disconnect(&mut self) -> Result<(), PortError> {
416 *self.connected.lock().unwrap() = false;
417 Ok(())
418 }
419
420 fn stats(&self) -> PortStats {
421 self.stats.lock().unwrap().clone()
422 }
423}
424
425pub struct MemoryPort {
428 id: String,
429 protocol: PortProtocol,
430 direction: PortDirection,
431 inbox: Mutex<Vec<InboundMessage>>,
432 outbox: Mutex<Vec<OutboundMessage>>,
433 connected: Mutex<bool>,
434 stats: Mutex<PortStats>,
435}
436
437impl MemoryPort {
438 pub fn new(id: &str, protocol: PortProtocol, direction: PortDirection) -> Self {
439 MemoryPort {
440 id: id.to_string(),
441 protocol,
442 direction,
443 inbox: Mutex::new(Vec::new()),
444 outbox: Mutex::new(Vec::new()),
445 connected: Mutex::new(false),
446 stats: Mutex::new(PortStats::default()),
447 }
448 }
449
450 pub fn inject(&self, msg: InboundMessage) {
451 {
452 let mut stats = self.stats.lock().unwrap();
453 stats.messages_received += 1;
454 stats.bytes_received += content_bytes(&msg.content) as u64;
455 stats.last_activity = Some(chrono::Utc::now().timestamp());
456 }
457 self.inbox.lock().unwrap().push(msg);
458 }
459
460 pub fn drain_outbox(&self) -> Vec<OutboundMessage> {
461 self.outbox.lock().unwrap().drain(..).collect()
462 }
463}
464
465impl Port for MemoryPort {
466 fn id(&self) -> &str {
467 &self.id
468 }
469
470 fn protocol(&self) -> PortProtocol {
471 self.protocol.clone()
472 }
473
474 fn direction(&self) -> PortDirection {
475 self.direction
476 }
477
478 fn send(&self, message: OutboundMessage) -> Result<(), PortError> {
479 if !*self.connected.lock().unwrap() {
480 return Err(PortError::NotConnected(format!(
481 "memory port '{}' not connected",
482 self.id
483 )));
484 }
485 if !self.direction.can_send() {
486 return Err(PortError::SendFailed(format!(
487 "port '{}' is {:?}, cannot send",
488 self.id, self.direction
489 )));
490 }
491 {
492 let mut stats = self.stats.lock().unwrap();
493 stats.messages_sent += 1;
494 stats.bytes_sent += content_bytes(&message.content) as u64;
495 stats.last_activity = Some(chrono::Utc::now().timestamp());
496 }
497 self.outbox.lock().unwrap().push(message);
498 Ok(())
499 }
500
501 fn receive(&self) -> Result<Vec<InboundMessage>, PortError> {
502 if !*self.connected.lock().unwrap() {
503 return Err(PortError::NotConnected(format!(
504 "memory port '{}' not connected",
505 self.id
506 )));
507 }
508 if !self.direction.can_receive() {
509 return Err(PortError::ReceiveFailed(format!(
510 "port '{}' is {:?}, cannot receive",
511 self.id, self.direction
512 )));
513 }
514 Ok(self.inbox.lock().unwrap().drain(..).collect())
515 }
516
517 fn is_connected(&self) -> bool {
518 *self.connected.lock().unwrap()
519 }
520
521 fn connect(&mut self) -> Result<(), PortError> {
522 *self.connected.lock().unwrap() = true;
523 Ok(())
524 }
525
526 fn disconnect(&mut self) -> Result<(), PortError> {
527 *self.connected.lock().unwrap() = false;
528 Ok(())
529 }
530
531 fn stats(&self) -> PortStats {
532 self.stats.lock().unwrap().clone()
533 }
534}
535
536#[derive(Debug, Clone, Serialize, Deserialize)]
539pub enum FileFormat {
540 Jsonl,
541 Csv,
542 Raw,
543}
544
545pub struct FilePort {
546 id: String,
547 path: String,
548 format: FileFormat,
549 connected: Mutex<bool>,
550 direction: PortDirection,
551 stats: Mutex<PortStats>,
552}
553
554impl FilePort {
555 pub fn new(id: &str, path: &str, format: FileFormat) -> Self {
556 FilePort {
557 id: id.to_string(),
558 path: path.to_string(),
559 format,
560 connected: Mutex::new(false),
561 direction: PortDirection::Outbound,
562 stats: Mutex::new(PortStats::default()),
563 }
564 }
565}
566
567impl Port for FilePort {
568 fn id(&self) -> &str {
569 &self.id
570 }
571
572 fn protocol(&self) -> PortProtocol {
573 PortProtocol::File(self.path.clone())
574 }
575
576 fn direction(&self) -> PortDirection {
577 self.direction
578 }
579
580 fn send(&self, message: OutboundMessage) -> Result<(), PortError> {
581 use std::io::Write;
582
583 if !*self.connected.lock().unwrap() {
584 return Err(PortError::NotConnected(format!(
585 "file port '{}' not connected",
586 self.id
587 )));
588 }
589
590 let line = match self.format {
591 FileFormat::Jsonl => serde_json::to_string(&message)
592 .map_err(|e| PortError::SendFailed(e.to_string()))?,
593 FileFormat::Csv => {
594 format!(
595 "{},{},{}\n",
596 message.id,
597 message.target.unwrap_or_default(),
598 text_preview(&message.content)
599 )
600 }
601 FileFormat::Raw => match &message.content {
602 MessageContent::Text(t) => t.clone(),
603 MessageContent::Html(h) => h.clone(),
604 MessageContent::Markdown(m) => m.clone(),
605 _ => return Err(PortError::SendFailed("unsupported content for raw".into())),
606 },
607 };
608
609 let mut file = std::fs::OpenOptions::new()
610 .create(true)
611 .append(true)
612 .open(&self.path)
613 .map_err(|e| PortError::SendFailed(e.to_string()))?;
614
615 writeln!(file, "{line}")
616 .map_err(|e| PortError::SendFailed(e.to_string()))?;
617
618 {
619 let mut stats = self.stats.lock().unwrap();
620 stats.messages_sent += 1;
621 stats.bytes_sent += line.len() as u64 + 1;
622 stats.last_activity = Some(chrono::Utc::now().timestamp());
623 }
624
625 Ok(())
626 }
627
628 fn receive(&self) -> Result<Vec<InboundMessage>, PortError> {
629 Err(PortError::NotConnected("file ports are write-only".into()))
630 }
631
632 fn is_connected(&self) -> bool {
633 *self.connected.lock().unwrap()
634 }
635
636 fn connect(&mut self) -> Result<(), PortError> {
637 let parent = std::path::Path::new(&self.path).parent().unwrap_or(std::path::Path::new("."));
638 if !parent.exists() {
639 std::fs::create_dir_all(parent)
640 .map_err(|e| PortError::ConnectFailed(e.to_string()))?;
641 }
642 *self.connected.lock().unwrap() = true;
643 Ok(())
644 }
645
646 fn disconnect(&mut self) -> Result<(), PortError> {
647 *self.connected.lock().unwrap() = false;
648 Ok(())
649 }
650
651 fn stats(&self) -> PortStats {
652 self.stats.lock().unwrap().clone()
653 }
654}
655
656pub struct PortRegistry {
659 ports: HashMap<String, Box<dyn Port>>,
660 configs: HashMap<String, PortConfig>,
661}
662
663impl PortRegistry {
664 pub fn new() -> Self {
665 PortRegistry {
666 ports: HashMap::new(),
667 configs: HashMap::new(),
668 }
669 }
670
671 pub fn register(&mut self, port: Box<dyn Port>) {
672 let id = port.id().to_string();
673 self.ports.insert(id, port);
674 }
675
676 pub fn register_with_config(&mut self, port: Box<dyn Port>, config: PortConfig) {
677 let id = port.id().to_string();
678 self.configs.insert(id.clone(), config);
679 self.ports.insert(id, port);
680 }
681
682 pub fn unregister(&mut self, port_id: &str) -> Result<Box<dyn Port>, PortError> {
683 self.ports
684 .remove(port_id)
685 .ok_or_else(|| PortError::NotConnected(format!("port '{port_id}' not found")))
686 }
687
688 pub fn send(&self, port_id: &str, message: OutboundMessage) -> Result<(), PortError> {
689 let port = self
690 .ports
691 .get(port_id)
692 .ok_or_else(|| PortError::NotConnected(format!("port '{port_id}' not found")))?;
693 port.send(message)
694 }
695
696 pub fn receive_all(&self) -> Vec<(String, Result<Vec<InboundMessage>, PortError>)> {
697 self.ports
698 .iter()
699 .map(|(id, port)| {
700 let result = match port.direction().can_receive() {
701 true => port.receive(),
702 false => Err(PortError::NotConnected(format!(
703 "port '{id}' cannot receive"
704 ))),
705 };
706 (id.clone(), result)
707 })
708 .collect()
709 }
710
711 pub fn get(&self, port_id: &str) -> Option<&dyn Port> {
712 self.ports.get(port_id).map(|p| p.as_ref())
713 }
714
715 pub fn get_mut(&mut self, port_id: &str) -> Option<&mut Box<dyn Port>> {
716 self.ports.get_mut(port_id)
717 }
718
719 pub fn list(&self) -> Vec<(&str, PortProtocol, PortDirection, bool)> {
720 self.ports
721 .iter()
722 .map(|(id, p)| (id.as_str(), p.protocol(), p.direction(), p.is_connected()))
723 .collect()
724 }
725
726 pub fn connect_all(&mut self) -> Vec<(String, Result<(), PortError>)> {
727 self.ports
728 .iter_mut()
729 .map(|(id, port)| (id.clone(), port.connect()))
730 .collect()
731 }
732
733 pub fn disconnect_all(&mut self) {
734 for port in self.ports.values_mut() {
735 let _ = port.disconnect();
736 }
737 }
738
739 pub fn health_check(&self) -> HashMap<String, bool> {
740 self.ports
741 .iter()
742 .map(|(id, port)| (id.clone(), port.is_connected()))
743 .collect()
744 }
745}
746
747impl Default for PortRegistry {
748 fn default() -> Self {
749 Self::new()
750 }
751}
752
753fn content_bytes(content: &MessageContent) -> usize {
756 match content {
757 MessageContent::Text(t) => t.len(),
758 MessageContent::Html(h) => h.len(),
759 MessageContent::Markdown(m) => m.len(),
760 MessageContent::Image { data, .. } => data.len(),
761 MessageContent::Audio { data, .. } => data.len(),
762 MessageContent::File { data, .. } => data.len(),
763 MessageContent::Command { name, args } => {
764 name.len() + args.iter().map(|a| a.len()).sum::<usize>()
765 }
766 MessageContent::Event {
767 event_type,
768 payload,
769 } => event_type.len() + payload.len(),
770 MessageContent::Sensor { readings } => readings.len() * 32,
771 }
772}
773
774fn text_preview(content: &MessageContent) -> String {
775 match content {
776 MessageContent::Text(t) => t.chars().take(80).collect(),
777 MessageContent::Html(h) => h.chars().take(80).collect(),
778 MessageContent::Markdown(m) => m.chars().take(80).collect(),
779 MessageContent::Image { caption, .. } => {
780 format!("[image: {}]", caption.as_deref().unwrap_or("no caption"))
781 }
782 MessageContent::Audio { .. } => "[audio]".to_string(),
783 MessageContent::File { name, .. } => format!("[file: {name}]"),
784 MessageContent::Command { name, args } => {
785 format!("cmd /{name} {:?}", args.join(" "))
786 }
787 MessageContent::Event { event_type, .. } => format!("[event: {event_type}]"),
788 MessageContent::Sensor { readings } => {
789 format!("[sensor: {} readings]", readings.len())
790 }
791 }
792}