strange_loop/nano_agent/
bus.rs1use crossbeam::queue::ArrayQueue;
4use std::sync::Arc;
5
6#[derive(Clone, Debug)]
8pub struct Message {
9 pub topic: &'static str,
10 pub data: MessageData,
11 pub timestamp_ns: u128,
12}
13
14#[derive(Clone, Debug)]
16pub enum MessageData {
17 U64(u64),
18 F64(f64),
19 Bool(bool),
20 Bytes([u8; 32]),
21 Empty,
22}
23
24pub struct NanoBus {
26 queue: Arc<ArrayQueue<Message>>,
27 capacity: usize,
28}
29
30impl NanoBus {
31 pub fn new(capacity: usize) -> Self {
33 Self {
34 queue: Arc::new(ArrayQueue::new(capacity)),
35 capacity,
36 }
37 }
38
39 #[inline(always)]
41 pub fn publish(&self, message: Message) -> bool {
42 self.queue.push(message).is_ok()
43 }
44
45 #[inline(always)]
47 pub fn try_recv(&self) -> Option<Message> {
48 self.queue.pop()
49 }
50
51 #[inline(always)]
53 pub fn drain(&self, max: usize) -> Vec<Message> {
54 let mut messages = Vec::with_capacity(max.min(16));
55 for _ in 0..max {
56 match self.try_recv() {
57 Some(msg) => messages.push(msg),
58 None => break,
59 }
60 }
61 messages
62 }
63
64 pub fn len(&self) -> usize {
66 self.queue.len()
67 }
68
69 pub fn is_empty(&self) -> bool {
71 self.queue.is_empty()
72 }
73
74 pub fn is_full(&self) -> bool {
76 self.queue.len() >= self.capacity
77 }
78
79 pub fn clone_bus(&self) -> Self {
81 Self {
82 queue: self.queue.clone(),
83 capacity: self.capacity,
84 }
85 }
86}
87
88impl Clone for NanoBus {
89 fn clone(&self) -> Self {
90 self.clone_bus()
91 }
92}
93
94pub struct TopicFilter {
96 topics: Vec<&'static str>,
97}
98
99impl TopicFilter {
100 pub fn new(topics: Vec<&'static str>) -> Self {
101 Self { topics }
102 }
103
104 #[inline(always)]
105 pub fn matches(&self, message: &Message) -> bool {
106 self.topics.iter().any(|&t| t == message.topic)
107 }
108}
109
110#[cfg(test)]
111mod tests {
112 use super::*;
113
114 #[test]
115 fn test_nano_bus() {
116 let bus = NanoBus::new(10);
117
118 let msg1 = Message {
120 topic: "test",
121 data: MessageData::U64(42),
122 timestamp_ns: 1000,
123 };
124
125 assert!(bus.publish(msg1.clone()));
126 assert_eq!(bus.len(), 1);
127
128 let received = bus.try_recv().unwrap();
130 assert_eq!(received.topic, "test");
131 match received.data {
132 MessageData::U64(val) => assert_eq!(val, 42),
133 _ => panic!("Wrong message type"),
134 }
135
136 assert!(bus.is_empty());
137 }
138
139 #[test]
140 fn test_bus_overflow() {
141 let bus = NanoBus::new(2);
142
143 let msg = Message {
144 topic: "test",
145 data: MessageData::Empty,
146 timestamp_ns: 0,
147 };
148
149 assert!(bus.publish(msg.clone()));
150 assert!(bus.publish(msg.clone()));
151 assert!(!bus.publish(msg.clone())); assert!(bus.is_full());
154 }
155
156 #[test]
157 fn test_topic_filter() {
158 let filter = TopicFilter::new(vec!["sensor", "control"]);
159
160 let msg1 = Message {
161 topic: "sensor",
162 data: MessageData::Empty,
163 timestamp_ns: 0,
164 };
165
166 let msg2 = Message {
167 topic: "debug",
168 data: MessageData::Empty,
169 timestamp_ns: 0,
170 };
171
172 assert!(filter.matches(&msg1));
173 assert!(!filter.matches(&msg2));
174 }
175}