1use std::fmt;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::sync::Mutex;
14
15use serde::{Deserialize, Serialize};
16
/// Kind tag carried by every [`Impulse`].
///
/// The enum is `#[non_exhaustive]`, so code in other crates has to include a
/// wildcard arm when matching on it. What each kind means to consumers is
/// not defined in this file; the per-variant notes below are read off the
/// variant names only.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ImpulseType {
    /// Presumably announces that a belief was updated — confirm with emitters.
    BeliefUpdate,
    /// Presumably raises a coherence-related alert — confirm with emitters.
    CoherenceAlert,
    /// Presumably reports that something novel was detected — confirm with emitters.
    NoveltyDetected,
    /// Presumably reports that an edge was confirmed — confirm with emitters.
    EdgeConfirmed,
    /// Presumably reports that an embedding was refined — confirm with emitters.
    EmbeddingRefined,
    /// Caller-defined kind identified by a `u8` code.
    Custom(u8),
}
38
39impl fmt::Display for ImpulseType {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 match self {
42 Self::BeliefUpdate => write!(f, "BeliefUpdate"),
43 Self::CoherenceAlert => write!(f, "CoherenceAlert"),
44 Self::NoveltyDetected => write!(f, "NoveltyDetected"),
45 Self::EdgeConfirmed => write!(f, "EdgeConfirmed"),
46 Self::EmbeddingRefined => write!(f, "EmbeddingRefined"),
47 Self::Custom(code) => write!(f, "Custom({code})"),
48 }
49 }
50}
51
/// A single message held in an [`ImpulseQueue`].
///
/// `Clone` and `Debug` are implemented by hand elsewhere in this file
/// because the `acknowledged` flag is an `AtomicBool`.
pub struct Impulse {
    /// Identifier of this impulse. [`ImpulseQueue::emit`] fills it from the
    /// queue's own counter.
    pub id: u64,
    /// Numeric tag of the structure the impulse comes from. The meaning of
    /// individual values is not defined in this file.
    pub source_structure: u8,
    /// 32-byte identifier of the originating node — presumably a hash or
    /// key; confirm against emitters.
    pub source_node: [u8; 32],
    /// Numeric tag of the structure the impulse is addressed to.
    pub target_structure: u8,
    /// What kind of impulse this is.
    pub impulse_type: ImpulseType,
    /// Free-form JSON data attached to the impulse.
    pub payload: serde_json::Value,
    /// Timestamp that [`ImpulseQueue::drain_ready`] sorts by (ascending).
    /// NOTE(review): the name suggests a hybrid logical clock — confirm.
    pub hlc_timestamp: u64,
    /// Acknowledgement flag, `false` when created by [`ImpulseQueue::emit`].
    /// Being atomic, it can be set through a shared reference. Entries with
    /// the flag set are skipped by [`ImpulseQueue::drain_ready`] and
    /// [`ImpulseQueue::pending_count`].
    pub acknowledged: AtomicBool,
}
79
80impl Clone for Impulse {
82 fn clone(&self) -> Self {
83 Self {
84 id: self.id,
85 source_structure: self.source_structure,
86 source_node: self.source_node,
87 target_structure: self.target_structure,
88 impulse_type: self.impulse_type.clone(),
89 payload: self.payload.clone(),
90 hlc_timestamp: self.hlc_timestamp,
91 acknowledged: AtomicBool::new(self.acknowledged.load(Ordering::Acquire)),
92 }
93 }
94}
95
96impl fmt::Debug for Impulse {
97 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98 f.debug_struct("Impulse")
99 .field("id", &self.id)
100 .field("source_structure", &self.source_structure)
101 .field("target_structure", &self.target_structure)
102 .field("impulse_type", &self.impulse_type)
103 .field("hlc_timestamp", &self.hlc_timestamp)
104 .field("acknowledged", &self.acknowledged.load(Ordering::Relaxed))
105 .finish()
106 }
107}
108
/// Queue of [`Impulse`] values that can be used through a shared reference.
///
/// All methods take `&self`: the contents sit behind a `Mutex` and ids come
/// from an atomic counter.
pub struct ImpulseQueue {
    /// Impulses in the order they were pushed by `emit`.
    queue: Mutex<Vec<Impulse>>,
    /// Id handed to the next emitted impulse; starts at 1 in `new`.
    next_id: AtomicU64,
}
118
119impl ImpulseQueue {
120 pub fn new() -> Self {
122 Self {
123 queue: Mutex::new(Vec::new()),
124 next_id: AtomicU64::new(1),
125 }
126 }
127
128 pub fn emit(
130 &self,
131 source_structure: u8,
132 source_node: [u8; 32],
133 target_structure: u8,
134 impulse_type: ImpulseType,
135 payload: serde_json::Value,
136 hlc_timestamp: u64,
137 ) -> u64 {
138 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
139 let impulse = Impulse {
140 id,
141 source_structure,
142 source_node,
143 target_structure,
144 impulse_type,
145 payload,
146 hlc_timestamp,
147 acknowledged: AtomicBool::new(false),
148 };
149 let mut q = self.queue.lock().expect("impulse queue poisoned");
150 q.push(impulse);
151 id
152 }
153
154 pub fn drain_ready(&self) -> Vec<Impulse> {
157 let mut q = self.queue.lock().expect("impulse queue poisoned");
158 let drained: Vec<Impulse> = q
159 .drain(..)
160 .filter(|imp| !imp.acknowledged.load(Ordering::Acquire))
161 .collect();
162 let mut sorted = drained;
163 sorted.sort_by_key(|imp| imp.hlc_timestamp);
164 sorted
165 }
166
167 pub fn len(&self) -> usize {
169 self.queue.lock().expect("impulse queue poisoned").len()
170 }
171
172 pub fn is_empty(&self) -> bool {
174 self.len() == 0
175 }
176
177 pub fn clear(&self) {
179 self.queue.lock().expect("impulse queue poisoned").clear();
180 }
181
182 pub fn pending_count(&self) -> usize {
184 self.queue
185 .lock()
186 .expect("impulse queue poisoned")
187 .iter()
188 .filter(|imp| !imp.acknowledged.load(Ordering::Acquire))
189 .count()
190 }
191}
192
193impl Default for ImpulseQueue {
194 fn default() -> Self {
195 Self::new()
196 }
197}
198
#[cfg(test)]
mod tests {
    use super::*;

    /// Emits a `BeliefUpdate` with an empty payload at `hlc_timestamp` and
    /// returns its id. Used by tests that only care about ordering and flags.
    fn emit_at(q: &ImpulseQueue, hlc_timestamp: u64) -> u64 {
        q.emit(
            0,
            [0u8; 32],
            1,
            ImpulseType::BeliefUpdate,
            serde_json::json!({}),
            hlc_timestamp,
        )
    }

    #[test]
    fn impulse_type_display() {
        assert_eq!(ImpulseType::BeliefUpdate.to_string(), "BeliefUpdate");
        assert_eq!(ImpulseType::CoherenceAlert.to_string(), "CoherenceAlert");
        assert_eq!(ImpulseType::NoveltyDetected.to_string(), "NoveltyDetected");
        assert_eq!(ImpulseType::EdgeConfirmed.to_string(), "EdgeConfirmed");
        assert_eq!(
            ImpulseType::EmbeddingRefined.to_string(),
            "EmbeddingRefined"
        );
        assert_eq!(ImpulseType::Custom(42).to_string(), "Custom(42)");
    }

    #[test]
    fn impulse_queue_new_empty() {
        let q = ImpulseQueue::new();
        assert!(q.is_empty());
        assert_eq!(q.len(), 0);
        assert_eq!(q.pending_count(), 0);
    }

    #[test]
    fn impulse_queue_default_is_empty() {
        let q = ImpulseQueue::default();
        assert!(q.is_empty());
        // A default queue hands out ids from 1, like `new`.
        assert_eq!(emit_at(&q, 0), 1);
    }

    #[test]
    fn impulse_queue_emit_assigns_id() {
        let q = ImpulseQueue::new();
        let node = [0u8; 32];
        let id1 = q.emit(0, node, 2, ImpulseType::BeliefUpdate, serde_json::json!({}), 100);
        let id2 = q.emit(1, node, 0, ImpulseType::CoherenceAlert, serde_json::json!({}), 200);
        assert_eq!(id1, 1);
        assert_eq!(id2, 2);
        assert_eq!(q.len(), 2);
    }

    #[test]
    fn impulse_queue_drain_sorted_by_hlc() {
        let q = ImpulseQueue::new();
        let node = [0u8; 32];
        // Emit deliberately out of timestamp order.
        q.emit(0, node, 2, ImpulseType::BeliefUpdate, serde_json::json!({}), 300);
        q.emit(1, node, 0, ImpulseType::CoherenceAlert, serde_json::json!({}), 100);
        q.emit(2, node, 0, ImpulseType::NoveltyDetected, serde_json::json!({}), 200);

        let drained = q.drain_ready();
        assert_eq!(drained.len(), 3);
        assert_eq!(drained[0].hlc_timestamp, 100);
        assert_eq!(drained[1].hlc_timestamp, 200);
        assert_eq!(drained[2].hlc_timestamp, 300);
    }

    #[test]
    fn impulse_queue_drain_keeps_emit_order_for_equal_hlc() {
        let q = ImpulseQueue::new();
        let a = emit_at(&q, 50);
        let b = emit_at(&q, 10);
        let c = emit_at(&q, 50);
        let d = emit_at(&q, 10);

        let ids: Vec<u64> = q.drain_ready().iter().map(|imp| imp.id).collect();
        assert_eq!(ids, vec![b, d, a, c]);
    }

    #[test]
    fn impulse_queue_drain_removes_items() {
        let q = ImpulseQueue::new();
        let node = [0u8; 32];
        q.emit(0, node, 2, ImpulseType::BeliefUpdate, serde_json::json!({}), 1);
        q.emit(0, node, 2, ImpulseType::EdgeConfirmed, serde_json::json!({}), 2);
        assert_eq!(q.len(), 2);

        let drained = q.drain_ready();
        assert_eq!(drained.len(), 2);
        assert!(q.is_empty());
    }

    #[test]
    fn impulse_queue_drain_skips_acknowledged() {
        let q = ImpulseQueue::new();
        let first = emit_at(&q, 10);
        let second = emit_at(&q, 20);
        let third = emit_at(&q, 30);

        {
            let guard = q.queue.lock().unwrap();
            guard[1].acknowledged.store(true, Ordering::Release);
        }

        let ids: Vec<u64> = q.drain_ready().iter().map(|imp| imp.id).collect();
        assert_eq!(ids, vec![first, third]);
        assert!(!ids.contains(&second));
        // The acknowledged entry is removed from the queue too.
        assert!(q.is_empty());
    }

    #[test]
    fn impulse_queue_usable_after_drain() {
        let q = ImpulseQueue::new();
        emit_at(&q, 1);
        assert_eq!(q.drain_ready().len(), 1);

        // Ids keep counting up; the queue accepts new entries.
        assert_eq!(emit_at(&q, 2), 2);
        assert_eq!(q.len(), 1);
        assert_eq!(q.pending_count(), 1);
    }

    #[test]
    fn impulse_queue_clear() {
        let q = ImpulseQueue::new();
        let node = [0u8; 32];
        q.emit(0, node, 1, ImpulseType::EmbeddingRefined, serde_json::json!({}), 10);
        q.emit(0, node, 1, ImpulseType::Custom(7), serde_json::json!({}), 20);
        assert_eq!(q.len(), 2);

        q.clear();
        assert!(q.is_empty());
        assert_eq!(q.len(), 0);
    }

    #[test]
    fn impulse_queue_pending_count() {
        let q = ImpulseQueue::new();
        let node = [0u8; 32];
        q.emit(0, node, 2, ImpulseType::BeliefUpdate, serde_json::json!({}), 1);
        q.emit(1, node, 0, ImpulseType::CoherenceAlert, serde_json::json!({}), 2);
        assert_eq!(q.pending_count(), 2);

        {
            // Flip the flag in place; the atomic allows this through `&Impulse`.
            let guard = q.queue.lock().unwrap();
            guard[0].acknowledged.store(true, Ordering::Release);
        }
        assert_eq!(q.pending_count(), 1);
        // `len` still counts the acknowledged entry.
        assert_eq!(q.len(), 2);
    }

    #[test]
    fn impulse_emit_and_acknowledge() {
        let q = ImpulseQueue::new();
        let node = [1u8; 32];
        q.emit(0, node, 2, ImpulseType::NoveltyDetected, serde_json::json!({"k": "v"}), 50);
        q.emit(0, node, 3, ImpulseType::EdgeConfirmed, serde_json::json!(null), 60);

        let drained = q.drain_ready();
        assert_eq!(drained.len(), 2);
        assert!(q.is_empty());

        for imp in &drained {
            imp.acknowledged.store(true, Ordering::Release);
        }

        for imp in &drained {
            assert!(imp.acknowledged.load(Ordering::Acquire));
        }
    }

    #[test]
    fn impulse_clone_has_independent_ack_flag() {
        let q = ImpulseQueue::new();
        emit_at(&q, 5);
        let drained = q.drain_ready();
        let original = &drained[0];

        let copy = original.clone();
        assert_eq!(copy.id, original.id);
        assert_eq!(copy.hlc_timestamp, original.hlc_timestamp);
        assert_eq!(copy.impulse_type, original.impulse_type);
        assert_eq!(copy.payload, original.payload);

        original.acknowledged.store(true, Ordering::Release);
        assert!(!copy.acknowledged.load(Ordering::Acquire));
    }
}