Skip to main content

clawft_kernel/
impulse.rs

1//! Ephemeral causal impulse queue for inter-structure communication (ECC Phase K3c).
2//!
3//! Impulses are short-lived events that flow between the four ECC structures
4//! (causal graph, spectral index, HNSW, cloud/edge bridge). The [`ImpulseQueue`]
5//! provides a thread-safe, ordered buffer that producers [`emit`](ImpulseQueue::emit)
6//! into and consumers [`drain_ready`](ImpulseQueue::drain_ready) from.
7//!
8//! Structure tags are represented as raw `u8` values to avoid cross-module
9//! coupling. They correspond to `crossref::StructureTag::as_u8()`.
10
11use std::fmt;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::sync::Mutex;
14
15use serde::{Deserialize, Serialize};
16
17// ---------------------------------------------------------------------------
18// ImpulseType
19// ---------------------------------------------------------------------------
20
21/// Discriminant for the kind of causal event being signalled.
22#[non_exhaustive]
23#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
24pub enum ImpulseType {
25    /// causal -> hnsw (new embedding needed)
26    BeliefUpdate,
27    /// spectral -> causal (graph incoherent)
28    CoherenceAlert,
29    /// hnsw -> causal (new cluster found)
30    NoveltyDetected,
31    /// cloud -> edge (DEMOCRITUS validated edge)
32    EdgeConfirmed,
33    /// cloud -> edge (better embedding available)
34    EmbeddingRefined,
35    /// Extension point for user-defined impulse kinds.
36    Custom(u8),
37}
38
39impl fmt::Display for ImpulseType {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        match self {
42            Self::BeliefUpdate => write!(f, "BeliefUpdate"),
43            Self::CoherenceAlert => write!(f, "CoherenceAlert"),
44            Self::NoveltyDetected => write!(f, "NoveltyDetected"),
45            Self::EdgeConfirmed => write!(f, "EdgeConfirmed"),
46            Self::EmbeddingRefined => write!(f, "EmbeddingRefined"),
47            Self::Custom(code) => write!(f, "Custom({code})"),
48        }
49    }
50}
51
52// ---------------------------------------------------------------------------
53// Impulse
54// ---------------------------------------------------------------------------
55
56/// A single causal event travelling between ECC structures.
57///
58/// `source_structure` and `target_structure` are `u8` tags that correspond to
59/// `crossref::StructureTag::as_u8()` values:
60///   0 = CausalGraph, 1 = SpectralIndex, 2 = Hnsw, 3 = CloudBridge.
61pub struct Impulse {
62    /// Monotonically increasing identifier assigned by the queue.
63    pub id: u64,
64    /// Originating structure (see `StructureTag::as_u8()`).
65    pub source_structure: u8,
66    /// 32-byte universal node identifier from the source structure.
67    pub source_node: [u8; 32],
68    /// Destination structure (see `StructureTag::as_u8()`).
69    pub target_structure: u8,
70    /// The kind of impulse.
71    pub impulse_type: ImpulseType,
72    /// Arbitrary JSON payload carried by this impulse.
73    pub payload: serde_json::Value,
74    /// Hybrid-logical-clock timestamp for causal ordering.
75    pub hlc_timestamp: u64,
76    /// Set to `true` once the consumer has processed this impulse.
77    pub acknowledged: AtomicBool,
78}
79
80// AtomicBool is not Clone, so we implement Clone manually.
81impl Clone for Impulse {
82    fn clone(&self) -> Self {
83        Self {
84            id: self.id,
85            source_structure: self.source_structure,
86            source_node: self.source_node,
87            target_structure: self.target_structure,
88            impulse_type: self.impulse_type.clone(),
89            payload: self.payload.clone(),
90            hlc_timestamp: self.hlc_timestamp,
91            acknowledged: AtomicBool::new(self.acknowledged.load(Ordering::Acquire)),
92        }
93    }
94}
95
96impl fmt::Debug for Impulse {
97    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98        f.debug_struct("Impulse")
99            .field("id", &self.id)
100            .field("source_structure", &self.source_structure)
101            .field("target_structure", &self.target_structure)
102            .field("impulse_type", &self.impulse_type)
103            .field("hlc_timestamp", &self.hlc_timestamp)
104            .field("acknowledged", &self.acknowledged.load(Ordering::Relaxed))
105            .finish()
106    }
107}
108
109// ---------------------------------------------------------------------------
110// ImpulseQueue
111// ---------------------------------------------------------------------------
112
113/// Thread-safe queue of [`Impulse`] events awaiting consumption.
114pub struct ImpulseQueue {
115    queue: Mutex<Vec<Impulse>>,
116    next_id: AtomicU64,
117}
118
119impl ImpulseQueue {
120    /// Create a new, empty impulse queue.
121    pub fn new() -> Self {
122        Self {
123            queue: Mutex::new(Vec::new()),
124            next_id: AtomicU64::new(1),
125        }
126    }
127
128    /// Enqueue a new impulse and return its assigned id.
129    pub fn emit(
130        &self,
131        source_structure: u8,
132        source_node: [u8; 32],
133        target_structure: u8,
134        impulse_type: ImpulseType,
135        payload: serde_json::Value,
136        hlc_timestamp: u64,
137    ) -> u64 {
138        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
139        let impulse = Impulse {
140            id,
141            source_structure,
142            source_node,
143            target_structure,
144            impulse_type,
145            payload,
146            hlc_timestamp,
147            acknowledged: AtomicBool::new(false),
148        };
149        let mut q = self.queue.lock().expect("impulse queue poisoned");
150        q.push(impulse);
151        id
152    }
153
154    /// Drain all unacknowledged impulses, returning them sorted by
155    /// `hlc_timestamp` (ascending). Acknowledged impulses are discarded.
156    pub fn drain_ready(&self) -> Vec<Impulse> {
157        let mut q = self.queue.lock().expect("impulse queue poisoned");
158        let drained: Vec<Impulse> = q
159            .drain(..)
160            .filter(|imp| !imp.acknowledged.load(Ordering::Acquire))
161            .collect();
162        let mut sorted = drained;
163        sorted.sort_by_key(|imp| imp.hlc_timestamp);
164        sorted
165    }
166
167    /// Total number of impulses in the queue (acknowledged or not).
168    pub fn len(&self) -> usize {
169        self.queue.lock().expect("impulse queue poisoned").len()
170    }
171
172    /// Returns `true` if the queue contains no impulses.
173    pub fn is_empty(&self) -> bool {
174        self.len() == 0
175    }
176
177    /// Remove all impulses from the queue (e.g. during calibration).
178    pub fn clear(&self) {
179        self.queue.lock().expect("impulse queue poisoned").clear();
180    }
181
182    /// Count of impulses that have not yet been acknowledged.
183    pub fn pending_count(&self) -> usize {
184        self.queue
185            .lock()
186            .expect("impulse queue poisoned")
187            .iter()
188            .filter(|imp| !imp.acknowledged.load(Ordering::Acquire))
189            .count()
190    }
191}
192
193impl Default for ImpulseQueue {
194    fn default() -> Self {
195        Self::new()
196    }
197}
198
199// ---------------------------------------------------------------------------
200// Tests
201// ---------------------------------------------------------------------------
202
#[cfg(test)]
mod tests {
    use super::*;

    /// Every variant renders as its name; `Custom` includes its code.
    #[test]
    fn impulse_type_display() {
        assert_eq!(ImpulseType::BeliefUpdate.to_string(), "BeliefUpdate");
        assert_eq!(ImpulseType::CoherenceAlert.to_string(), "CoherenceAlert");
        assert_eq!(ImpulseType::NoveltyDetected.to_string(), "NoveltyDetected");
        assert_eq!(ImpulseType::EdgeConfirmed.to_string(), "EdgeConfirmed");
        assert_eq!(
            ImpulseType::EmbeddingRefined.to_string(),
            "EmbeddingRefined"
        );
        assert_eq!(ImpulseType::Custom(42).to_string(), "Custom(42)");
    }

    /// A freshly constructed queue reports no impulses at all.
    #[test]
    fn impulse_queue_new_empty() {
        let q = ImpulseQueue::new();
        assert!(q.is_empty());
        assert_eq!(q.len(), 0);
        assert_eq!(q.pending_count(), 0);
    }

    /// Ids start at 1 and increase by one per emitted impulse.
    #[test]
    fn impulse_queue_emit_assigns_id() {
        let q = ImpulseQueue::new();
        let node = [0u8; 32];
        let id1 = q.emit(0, node, 2, ImpulseType::BeliefUpdate, serde_json::json!({}), 100);
        let id2 = q.emit(1, node, 0, ImpulseType::CoherenceAlert, serde_json::json!({}), 200);
        assert_eq!(id1, 1);
        assert_eq!(id2, 2);
        assert_eq!(q.len(), 2);
    }

    /// Draining orders by timestamp regardless of emission order.
    #[test]
    fn impulse_queue_drain_sorted_by_hlc() {
        let q = ImpulseQueue::new();
        let node = [0u8; 32];
        // Emit in reverse timestamp order.
        q.emit(0, node, 2, ImpulseType::BeliefUpdate, serde_json::json!({}), 300);
        q.emit(1, node, 0, ImpulseType::CoherenceAlert, serde_json::json!({}), 100);
        q.emit(2, node, 0, ImpulseType::NoveltyDetected, serde_json::json!({}), 200);

        let drained = q.drain_ready();
        assert_eq!(drained.len(), 3);
        assert_eq!(drained[0].hlc_timestamp, 100);
        assert_eq!(drained[1].hlc_timestamp, 200);
        assert_eq!(drained[2].hlc_timestamp, 300);
    }

    /// Impulses sharing a timestamp come back in the order their ids were assigned.
    #[test]
    fn impulse_queue_drain_ties_ordered_by_id() {
        let q = ImpulseQueue::new();
        let node = [0u8; 32];
        let a = q.emit(0, node, 2, ImpulseType::BeliefUpdate, serde_json::json!({}), 7);
        let b = q.emit(0, node, 2, ImpulseType::EdgeConfirmed, serde_json::json!({}), 7);
        let early = q.emit(0, node, 2, ImpulseType::Custom(1), serde_json::json!({}), 3);
        let c = q.emit(0, node, 2, ImpulseType::NoveltyDetected, serde_json::json!({}), 7);

        let ids: Vec<u64> = q.drain_ready().iter().map(|imp| imp.id).collect();
        assert_eq!(ids, vec![early, a, b, c]);
    }

    /// After a drain the queue holds nothing.
    #[test]
    fn impulse_queue_drain_removes_items() {
        let q = ImpulseQueue::new();
        let node = [0u8; 32];
        q.emit(0, node, 2, ImpulseType::BeliefUpdate, serde_json::json!({}), 1);
        q.emit(0, node, 2, ImpulseType::EdgeConfirmed, serde_json::json!({}), 2);
        assert_eq!(q.len(), 2);

        let drained = q.drain_ready();
        assert_eq!(drained.len(), 2);
        assert!(q.is_empty());
    }

    /// Acknowledged impulses are dropped by a drain rather than returned.
    #[test]
    fn impulse_queue_drain_discards_acknowledged() {
        let q = ImpulseQueue::new();
        let node = [0u8; 32];
        let kept = q.emit(0, node, 2, ImpulseType::BeliefUpdate, serde_json::json!({}), 1);
        q.emit(1, node, 0, ImpulseType::CoherenceAlert, serde_json::json!({}), 2);

        // Acknowledge the second impulse via the internal queue.
        {
            let guard = q.queue.lock().unwrap();
            guard[1].acknowledged.store(true, Ordering::Release);
        }

        let drained = q.drain_ready();
        assert_eq!(drained.len(), 1);
        assert_eq!(drained[0].id, kept);
        // The acknowledged impulse is gone from the queue as well.
        assert!(q.is_empty());
    }

    /// `clear` empties the queue without returning anything.
    #[test]
    fn impulse_queue_clear() {
        let q = ImpulseQueue::new();
        let node = [0u8; 32];
        q.emit(0, node, 1, ImpulseType::EmbeddingRefined, serde_json::json!({}), 10);
        q.emit(0, node, 1, ImpulseType::Custom(7), serde_json::json!({}), 20);
        assert_eq!(q.len(), 2);

        q.clear();
        assert!(q.is_empty());
        assert_eq!(q.len(), 0);
    }

    /// `pending_count` ignores acknowledged impulses while `len` still counts them.
    #[test]
    fn impulse_queue_pending_count() {
        let q = ImpulseQueue::new();
        let node = [0u8; 32];
        q.emit(0, node, 2, ImpulseType::BeliefUpdate, serde_json::json!({}), 1);
        q.emit(1, node, 0, ImpulseType::CoherenceAlert, serde_json::json!({}), 2);
        assert_eq!(q.pending_count(), 2);

        // Acknowledge one via the internal queue.
        {
            let guard = q.queue.lock().unwrap();
            guard[0].acknowledged.store(true, Ordering::Release);
        }
        assert_eq!(q.pending_count(), 1);
        assert_eq!(q.len(), 2);
    }

    /// Drained impulses can be acknowledged by the consumer afterwards.
    #[test]
    fn impulse_emit_and_acknowledge() {
        let q = ImpulseQueue::new();
        let node = [1u8; 32];
        q.emit(0, node, 2, ImpulseType::NoveltyDetected, serde_json::json!({"k": "v"}), 50);
        q.emit(0, node, 3, ImpulseType::EdgeConfirmed, serde_json::json!(null), 60);

        let drained = q.drain_ready();
        assert_eq!(drained.len(), 2);
        // Queue is now empty after drain.
        assert!(q.is_empty());

        // Mark drained impulses as acknowledged.
        for imp in &drained {
            imp.acknowledged.store(true, Ordering::Release);
        }

        // Verify acknowledgement persists on the drained copies.
        for imp in &drained {
            assert!(imp.acknowledged.load(Ordering::Acquire));
        }
    }

    /// A clone snapshots the flag and is independent of the original afterwards.
    #[test]
    fn impulse_clone_snapshots_acknowledged() {
        let q = ImpulseQueue::new();
        q.emit(0, [2u8; 32], 1, ImpulseType::Custom(9), serde_json::json!({"n": 1}), 5);
        let original = q.drain_ready().pop().expect("one impulse was emitted");

        original.acknowledged.store(true, Ordering::Release);
        let copy = original.clone();
        assert!(copy.acknowledged.load(Ordering::Acquire));
        assert_eq!(copy.id, original.id);
        assert_eq!(copy.source_node, original.source_node);
        assert_eq!(copy.impulse_type, original.impulse_type);
        assert_eq!(copy.payload, original.payload);

        // Flipping the copy's flag must not touch the original.
        copy.acknowledged.store(false, Ordering::Release);
        assert!(original.acknowledged.load(Ordering::Acquire));
    }
}
319}