strange_loop/nano_agent/
bus.rs

1//! Lock-free message bus for nano-agents
2
3use crossbeam::queue::ArrayQueue;
4use std::sync::Arc;
5
6/// Message for inter-agent communication
7#[derive(Clone, Debug)]
8pub struct Message {
9    pub topic: &'static str,
10    pub data: MessageData,
11    pub timestamp_ns: u128,
12}
13
14/// Message data variants
15#[derive(Clone, Debug)]
16pub enum MessageData {
17    U64(u64),
18    F64(f64),
19    Bool(bool),
20    Bytes([u8; 32]),
21    Empty,
22}
23
24/// Lock-free message bus using fixed-size queue
25pub struct NanoBus {
26    queue: Arc<ArrayQueue<Message>>,
27    capacity: usize,
28}
29
30impl NanoBus {
31    /// Create a new message bus with given capacity
32    pub fn new(capacity: usize) -> Self {
33        Self {
34            queue: Arc::new(ArrayQueue::new(capacity)),
35            capacity,
36        }
37    }
38
39    /// Publish a message (non-blocking, may drop if full)
40    #[inline(always)]
41    pub fn publish(&self, message: Message) -> bool {
42        self.queue.push(message).is_ok()
43    }
44
45    /// Try to receive a message (non-blocking)
46    #[inline(always)]
47    pub fn try_recv(&self) -> Option<Message> {
48        self.queue.pop()
49    }
50
51    /// Drain up to `max` messages without blocking
52    #[inline(always)]
53    pub fn drain(&self, max: usize) -> Vec<Message> {
54        let mut messages = Vec::with_capacity(max.min(16));
55        for _ in 0..max {
56            match self.try_recv() {
57                Some(msg) => messages.push(msg),
58                None => break,
59            }
60        }
61        messages
62    }
63
64    /// Get current queue length
65    pub fn len(&self) -> usize {
66        self.queue.len()
67    }
68
69    /// Check if queue is empty
70    pub fn is_empty(&self) -> bool {
71        self.queue.is_empty()
72    }
73
74    /// Check if queue is full
75    pub fn is_full(&self) -> bool {
76        self.queue.len() >= self.capacity
77    }
78
79    /// Clone the bus for sharing between agents
80    pub fn clone_bus(&self) -> Self {
81        Self {
82            queue: self.queue.clone(),
83            capacity: self.capacity,
84        }
85    }
86}
87
88impl Clone for NanoBus {
89    fn clone(&self) -> Self {
90        self.clone_bus()
91    }
92}
93
94/// Topic-based message filter
95pub struct TopicFilter {
96    topics: Vec<&'static str>,
97}
98
99impl TopicFilter {
100    pub fn new(topics: Vec<&'static str>) -> Self {
101        Self { topics }
102    }
103
104    #[inline(always)]
105    pub fn matches(&self, message: &Message) -> bool {
106        self.topics.iter().any(|&t| t == message.topic)
107    }
108}
109
110#[cfg(test)]
111mod tests {
112    use super::*;
113
114    #[test]
115    fn test_nano_bus() {
116        let bus = NanoBus::new(10);
117
118        // Publish messages
119        let msg1 = Message {
120            topic: "test",
121            data: MessageData::U64(42),
122            timestamp_ns: 1000,
123        };
124
125        assert!(bus.publish(msg1.clone()));
126        assert_eq!(bus.len(), 1);
127
128        // Receive message
129        let received = bus.try_recv().unwrap();
130        assert_eq!(received.topic, "test");
131        match received.data {
132            MessageData::U64(val) => assert_eq!(val, 42),
133            _ => panic!("Wrong message type"),
134        }
135
136        assert!(bus.is_empty());
137    }
138
139    #[test]
140    fn test_bus_overflow() {
141        let bus = NanoBus::new(2);
142
143        let msg = Message {
144            topic: "test",
145            data: MessageData::Empty,
146            timestamp_ns: 0,
147        };
148
149        assert!(bus.publish(msg.clone()));
150        assert!(bus.publish(msg.clone()));
151        assert!(!bus.publish(msg.clone())); // Should fail when full
152
153        assert!(bus.is_full());
154    }
155
156    #[test]
157    fn test_topic_filter() {
158        let filter = TopicFilter::new(vec!["sensor", "control"]);
159
160        let msg1 = Message {
161            topic: "sensor",
162            data: MessageData::Empty,
163            timestamp_ns: 0,
164        };
165
166        let msg2 = Message {
167            topic: "debug",
168            data: MessageData::Empty,
169            timestamp_ns: 0,
170        };
171
172        assert!(filter.matches(&msg1));
173        assert!(!filter.matches(&msg2));
174    }
175}