Skip to main content

clawft_kernel/
democritus.rs

//! DEMOCRITUS continuous cognitive loop (ECC decision D5).
//!
//! The [`DemocritusLoop`] is the nervous system of WeftOS — an integration
//! layer that orchestrates the ECC subsystems on every cognitive tick:
//!
//! ```text
//! SENSE → EMBED → SEARCH → UPDATE → COMMIT
//! ```
//!
//! It drains the [`ImpulseQueue`] for new events, embeds them via the
//! configured [`EmbeddingProvider`], queries HNSW for nearest neighbors,
//! updates the [`CausalGraph`] with inferred edges, registers cross-refs
//! in the [`CrossRefStore`], and logs the result.
//!
//! This module is compiled only when the `ecc` feature is enabled.
16
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::Arc;
19use std::time::Instant;
20
21use serde::{Deserialize, Serialize};
22use tracing::{debug, warn};
23
24use crate::causal::{CausalEdgeType, CausalGraph};
25use crate::crossref::{CrossRef, CrossRefStore, CrossRefType, StructureTag, UniversalNodeId};
26use crate::embedding::{EmbeddingProvider};
27use crate::hnsw_service::HnswService;
28use crate::impulse::{Impulse, ImpulseQueue, ImpulseType};
29
30// ---------------------------------------------------------------------------
31// Configuration
32// ---------------------------------------------------------------------------
33
34/// Configuration for the DEMOCRITUS loop.
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct DemocritusConfig {
37    /// Maximum number of impulses to process per tick.
38    pub max_impulses_per_tick: usize,
39    /// Number of nearest neighbors to retrieve during SEARCH phase.
40    pub search_k: usize,
41    /// Cosine similarity threshold above which two events are considered correlated.
42    pub correlation_threshold: f32,
43    /// Budget for a single tick in microseconds. If exceeded, the tick stops early.
44    pub tick_budget_us: u64,
45}
46
47impl Default for DemocritusConfig {
48    fn default() -> Self {
49        Self {
50            max_impulses_per_tick: 64,
51            search_k: 5,
52            correlation_threshold: 0.7,
53            tick_budget_us: 15_000, // 15ms
54        }
55    }
56}
57
58// ---------------------------------------------------------------------------
59// Tick result
60// ---------------------------------------------------------------------------
61
62/// Summary of a single DEMOCRITUS tick cycle.
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct DemocritusTickResult {
65    /// Number of impulses drained in the SENSE phase.
66    pub impulses_sensed: usize,
67    /// Number of embeddings produced in the EMBED phase.
68    pub embeddings_produced: usize,
69    /// Number of HNSW searches performed in the SEARCH phase.
70    pub searches_performed: usize,
71    /// Number of causal edges added in the UPDATE phase.
72    pub edges_added: usize,
73    /// Number of cross-refs registered in the UPDATE phase.
74    pub crossrefs_added: usize,
75    /// Whether the tick was cut short due to budget exhaustion.
76    pub budget_exceeded: bool,
77    /// Wall-clock duration of the tick in microseconds.
78    pub duration_us: u64,
79}
80
81// ---------------------------------------------------------------------------
82// DemocritusLoop
83// ---------------------------------------------------------------------------
84
85/// The DEMOCRITUS continuous cognitive loop.
86///
87/// Runs every CognitiveTick cycle: Sense -> Embed -> Search -> Update -> Commit.
88pub struct DemocritusLoop {
89    // ECC subsystem references
90    causal_graph: Arc<CausalGraph>,
91    hnsw: Arc<HnswService>,
92    impulse_queue: Arc<ImpulseQueue>,
93    crossref_store: Arc<CrossRefStore>,
94    embedding_provider: Arc<dyn EmbeddingProvider>,
95    // Configuration
96    config: DemocritusConfig,
97    // Tick statistics
98    total_ticks: AtomicU64,
99    total_nodes_added: AtomicU64,
100    total_edges_added: AtomicU64,
101}
102
103impl DemocritusLoop {
104    /// Create a new DEMOCRITUS loop wired to the given ECC subsystems.
105    pub fn new(
106        causal_graph: Arc<CausalGraph>,
107        hnsw: Arc<HnswService>,
108        impulse_queue: Arc<ImpulseQueue>,
109        crossref_store: Arc<CrossRefStore>,
110        embedding_provider: Arc<dyn EmbeddingProvider>,
111        config: DemocritusConfig,
112    ) -> Self {
113        Self {
114            causal_graph,
115            hnsw,
116            impulse_queue,
117            crossref_store,
118            embedding_provider,
119            config,
120            total_ticks: AtomicU64::new(0),
121            total_nodes_added: AtomicU64::new(0),
122            total_edges_added: AtomicU64::new(0),
123        }
124    }
125
126    /// Execute one full tick cycle: Sense -> Embed -> Search -> Update -> Commit.
127    ///
128    /// Returns a summary of what was processed. This is the method the
129    /// [`CognitiveTick`] loop should call on each cycle.
130    pub async fn tick(&self) -> DemocritusTickResult {
131        let start = Instant::now();
132        let mut result = DemocritusTickResult {
133            impulses_sensed: 0,
134            embeddings_produced: 0,
135            searches_performed: 0,
136            edges_added: 0,
137            crossrefs_added: 0,
138            budget_exceeded: false,
139            duration_us: 0,
140        };
141
142        // ── SENSE ────────────────────────────────────────────────────
143        let impulses = self.sense();
144        result.impulses_sensed = impulses.len();
145
146        if impulses.is_empty() {
147            result.duration_us = start.elapsed().as_micros() as u64;
148            self.commit(&result);
149            return result;
150        }
151
152        // ── EMBED ────────────────────────────────────────────────────
153        let embedded = self.embed(&impulses).await;
154        result.embeddings_produced = embedded.len();
155
156        if self.budget_exceeded(start) {
157            result.budget_exceeded = true;
158            result.duration_us = start.elapsed().as_micros() as u64;
159            self.commit(&result);
160            return result;
161        }
162
163        // ── SEARCH ───────────────────────────────────────────────────
164        // Batch all HNSW searches under a single mutex acquisition
165        // instead of locking per-impulse (Task 3: batch Mutex).
166        let non_empty_queries: Vec<(usize, &[f32])> = embedded
167            .iter()
168            .enumerate()
169            .filter(|(_, emb)| !emb.is_empty())
170            .map(|(i, emb)| (i, emb.as_slice()))
171            .collect();
172
173        let query_slices: Vec<&[f32]> = non_empty_queries.iter().map(|(_, s)| *s).collect();
174        let batch_results = if !query_slices.is_empty() {
175            self.hnsw.search_batch(&query_slices, self.config.search_k)
176        } else {
177            Vec::new()
178        };
179
180        // Reassemble: map batch results back to their impulse indices.
181        let mut search_results_by_index: Vec<Vec<(String, f32)>> =
182            vec![Vec::new(); embedded.len()];
183        for (batch_idx, &(orig_idx, _)) in non_empty_queries.iter().enumerate() {
184            search_results_by_index[orig_idx] = batch_results
185                .get(batch_idx)
186                .map(|results| results.iter().map(|r| (r.id.clone(), r.score)).collect())
187                .unwrap_or_default();
188        }
189        result.searches_performed = non_empty_queries.len();
190
191        let mut neighbors_per_event: Vec<(&Impulse, &Vec<f32>, Vec<(String, f32)>)> =
192            Vec::with_capacity(embedded.len());
193        for (i, (impulse, embedding)) in impulses.iter().zip(embedded.iter()).enumerate() {
194            if self.budget_exceeded(start) {
195                result.budget_exceeded = true;
196                break;
197            }
198            let neighbors = std::mem::take(&mut search_results_by_index[i]);
199            neighbors_per_event.push((impulse, embedding, neighbors));
200        }
201
202        // ── UPDATE ───────────────────────────────────────────────────
203        for (impulse, embedding, neighbors) in &neighbors_per_event {
204            if self.budget_exceeded(start) {
205                result.budget_exceeded = true;
206                break;
207            }
208            let (edges, crossrefs) = self.update(impulse, embedding, neighbors);
209            result.edges_added += edges;
210            result.crossrefs_added += crossrefs;
211        }
212
213        // ── COMMIT ───────────────────────────────────────────────────
214        result.duration_us = start.elapsed().as_micros() as u64;
215        self.commit(&result);
216        result
217    }
218
219    // ── Phase implementations ────────────────────────────────────────
220
221    /// SENSE: drain the impulse queue up to the per-tick limit.
222    fn sense(&self) -> Vec<Impulse> {
223        let mut impulses = self.impulse_queue.drain_ready();
224        impulses.truncate(self.config.max_impulses_per_tick);
225        impulses
226    }
227
228    /// EMBED: convert each impulse's payload to a vector embedding.
229    ///
230    /// On embedding failure, falls back to an empty vector (the impulse
231    /// will still be recorded in the causal graph but won't participate
232    /// in similarity search).
233    async fn embed(&self, impulses: &[Impulse]) -> Vec<Vec<f32>> {
234        let texts: Vec<String> = impulses
235            .iter()
236            .map(|imp| {
237                // Build a text representation from the impulse payload.
238                let type_str = imp.impulse_type.to_string();
239                let payload_str = imp.payload.to_string();
240                format!("{type_str}:{payload_str}")
241            })
242            .collect();
243
244        let text_refs: Vec<&str> = texts.iter().map(|s| s.as_str()).collect();
245
246        match self.embedding_provider.embed_batch(&text_refs).await {
247            Ok(vecs) => vecs,
248            Err(e) => {
249                warn!("DEMOCRITUS embed phase failed, falling back to empty vectors: {e}");
250                vec![Vec::new(); impulses.len()]
251            }
252        }
253    }
254
255    /// SEARCH: query HNSW for k nearest neighbors of the given embedding.
256    ///
257    /// Note: The tick loop now uses `search_batch` for batched mutex
258    /// acquisition. This method is retained for single-query callers.
259    #[allow(dead_code)]
260    fn search(&self, embedding: &[f32]) -> Vec<(String, f32)> {
261        if embedding.is_empty() {
262            return Vec::new();
263        }
264        self.hnsw
265            .search(embedding, self.config.search_k)
266            .into_iter()
267            .map(|r| (r.id, r.score))
268            .collect()
269    }
270
271    /// UPDATE: add a causal node for the impulse, insert into HNSW,
272    /// create causal edges based on neighbor similarity, and register
273    /// cross-references.
274    ///
275    /// Returns (edges_added, crossrefs_added).
276    fn update(
277        &self,
278        impulse: &Impulse,
279        embedding: &[f32],
280        neighbors: &[(String, f32)],
281    ) -> (usize, usize) {
282        let mut edges_added = 0usize;
283        let mut crossrefs_added = 0usize;
284
285        // Add a causal node for this impulse.
286        let label = format!("impulse:{}:{}", impulse.impulse_type, impulse.id);
287        let node_id = self.causal_graph.add_node(
288            label.clone(),
289            impulse.payload.clone(),
290        );
291        self.total_nodes_added.fetch_add(1, Ordering::Relaxed);
292
293        // Insert embedding into HNSW (keyed by causal node ID).
294        if !embedding.is_empty() {
295            self.hnsw.insert(
296                node_id.to_string(),
297                embedding.to_vec(),
298                serde_json::json!({
299                    "impulse_id": impulse.id,
300                    "impulse_type": impulse.impulse_type.to_string(),
301                    "hlc": impulse.hlc_timestamp,
302                }),
303            );
304        }
305
306        // Create causal edges based on neighbor similarity.
307        for (neighbor_id_str, score) in neighbors {
308            let edge_type = self.classify_edge(impulse, *score);
309
310            if let Ok(neighbor_node_id) = neighbor_id_str.parse::<u64>() {
311                let linked = self.causal_graph.link(
312                    node_id,
313                    neighbor_node_id,
314                    edge_type,
315                    *score,
316                    impulse.hlc_timestamp,
317                    0, // chain_seq; set during exochain commit if enabled
318                );
319                if linked {
320                    edges_added += 1;
321                    self.total_edges_added.fetch_add(1, Ordering::Relaxed);
322                }
323            }
324        }
325
326        // Register a cross-reference linking the causal node to its source structure.
327        let source_tag = structure_tag_from_u8(impulse.source_structure);
328        let uni_id = UniversalNodeId::new(
329            &StructureTag::CausalGraph,
330            label.as_bytes(),
331            impulse.hlc_timestamp,
332            &impulse.source_node,
333            &[0u8; 32],
334        );
335        let source_uni_id = UniversalNodeId::from_bytes(impulse.source_node);
336        self.crossref_store.insert(CrossRef {
337            source: uni_id,
338            source_structure: StructureTag::CausalGraph,
339            target: source_uni_id,
340            target_structure: source_tag,
341            ref_type: CrossRefType::TriggeredBy,
342            created_at: impulse.hlc_timestamp,
343            chain_seq: 0,
344        });
345        crossrefs_added += 1;
346
347        (edges_added, crossrefs_added)
348    }
349
350    /// COMMIT: update tick statistics and log the result.
351    fn commit(&self, result: &DemocritusTickResult) {
352        self.total_ticks.fetch_add(1, Ordering::Relaxed);
353        debug!(
354            "DEMOCRITUS tick #{}: sensed={}, embedded={}, searched={}, edges={}, crossrefs={}, budget_exceeded={}, duration={}us",
355            self.total_ticks.load(Ordering::Relaxed),
356            result.impulses_sensed,
357            result.embeddings_produced,
358            result.searches_performed,
359            result.edges_added,
360            result.crossrefs_added,
361            result.budget_exceeded,
362            result.duration_us,
363        );
364    }
365
366    // ── Helpers ──────────────────────────────────────────────────────
367
368    /// Classify the edge type based on impulse context and similarity score.
369    fn classify_edge(&self, impulse: &Impulse, score: f32) -> CausalEdgeType {
370        // High similarity → Correlates (statistically similar events).
371        if score >= self.config.correlation_threshold {
372            return CausalEdgeType::Correlates;
373        }
374
375        // Impulse type hints at causal direction.
376        match &impulse.impulse_type {
377            ImpulseType::BeliefUpdate | ImpulseType::NoveltyDetected => CausalEdgeType::Follows,
378            ImpulseType::EdgeConfirmed => CausalEdgeType::Causes,
379            ImpulseType::CoherenceAlert => CausalEdgeType::EvidenceFor,
380            ImpulseType::EmbeddingRefined => CausalEdgeType::Enables,
381            ImpulseType::Custom(_) => CausalEdgeType::Follows,
382        }
383    }
384
385    /// Check if the tick budget has been exceeded.
386    fn budget_exceeded(&self, start: Instant) -> bool {
387        start.elapsed().as_micros() as u64 > self.config.tick_budget_us
388    }
389
390    // ── Statistics accessors ─────────────────────────────────────────
391
392    /// Total number of ticks executed.
393    pub fn total_ticks(&self) -> u64 {
394        self.total_ticks.load(Ordering::Relaxed)
395    }
396
397    /// Total number of causal nodes added across all ticks.
398    pub fn total_nodes_added(&self) -> u64 {
399        self.total_nodes_added.load(Ordering::Relaxed)
400    }
401
402    /// Total number of causal edges added across all ticks.
403    pub fn total_edges_added(&self) -> u64 {
404        self.total_edges_added.load(Ordering::Relaxed)
405    }
406}
407
408/// Map a raw `u8` structure tag back to a [`StructureTag`] variant.
409fn structure_tag_from_u8(tag: u8) -> StructureTag {
410    match tag {
411        0x01 => StructureTag::ExoChain,
412        0x02 => StructureTag::ResourceTree,
413        0x03 => StructureTag::CausalGraph,
414        0x04 => StructureTag::HnswIndex,
415        other => StructureTag::Custom(other),
416    }
417}
418
419// ---------------------------------------------------------------------------
420// Tests
421// ---------------------------------------------------------------------------
422
423#[cfg(test)]
424mod tests {
425    use super::*;
426    use crate::embedding::MockEmbeddingProvider;
427    use crate::hnsw_service::HnswServiceConfig;
428
429    /// Helper: build a fully wired DemocritusLoop with default config.
430    fn make_loop() -> (
431        Arc<CausalGraph>,
432        Arc<HnswService>,
433        Arc<ImpulseQueue>,
434        Arc<CrossRefStore>,
435        DemocritusLoop,
436    ) {
437        make_loop_with_config(DemocritusConfig::default())
438    }
439
440    fn make_loop_with_config(
441        config: DemocritusConfig,
442    ) -> (
443        Arc<CausalGraph>,
444        Arc<HnswService>,
445        Arc<ImpulseQueue>,
446        Arc<CrossRefStore>,
447        DemocritusLoop,
448    ) {
449        let cg = Arc::new(CausalGraph::new());
450        let hnsw = Arc::new(HnswService::new(HnswServiceConfig {
451            default_dimensions: 8,
452            ..HnswServiceConfig::default()
453        }));
454        let iq = Arc::new(ImpulseQueue::new());
455        let crs = Arc::new(CrossRefStore::new());
456        let emb: Arc<dyn EmbeddingProvider> = Arc::new(MockEmbeddingProvider::new(8));
457
458        let democritus = DemocritusLoop::new(
459            Arc::clone(&cg),
460            Arc::clone(&hnsw),
461            Arc::clone(&iq),
462            Arc::clone(&crs),
463            emb,
464            config,
465        );
466        (cg, hnsw, iq, crs, democritus)
467    }
468
469    fn emit_test_impulse(iq: &ImpulseQueue, impulse_type: ImpulseType, ts: u64) -> u64 {
470        iq.emit(
471            StructureTag::CausalGraph.as_u8(),
472            [0u8; 32],
473            StructureTag::HnswIndex.as_u8(),
474            impulse_type,
475            serde_json::json!({"test": true}),
476            ts,
477        )
478    }
479
480    // ── Test 1: Empty impulse queue — tick completes with no new nodes ──
481
482    #[tokio::test]
483    async fn empty_queue_produces_no_work() {
484        let (_cg, _hnsw, _iq, _crs, demo) = make_loop();
485        let result = demo.tick().await;
486
487        assert_eq!(result.impulses_sensed, 0);
488        assert_eq!(result.embeddings_produced, 0);
489        assert_eq!(result.searches_performed, 0);
490        assert_eq!(result.edges_added, 0);
491        assert_eq!(result.crossrefs_added, 0);
492        assert!(!result.budget_exceeded);
493        assert_eq!(demo.total_ticks(), 1);
494        assert_eq!(demo.total_nodes_added(), 0);
495    }
496
497    // ── Test 2: Single impulse → full pipeline ──
498
499    #[tokio::test]
500    async fn single_impulse_full_pipeline() {
501        let (cg, hnsw, iq, crs, demo) = make_loop();
502
503        emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 100);
504
505        let result = demo.tick().await;
506
507        assert_eq!(result.impulses_sensed, 1);
508        assert_eq!(result.embeddings_produced, 1);
509        assert_eq!(result.searches_performed, 1);
510        // No pre-existing neighbors, so no edges added.
511        assert_eq!(result.edges_added, 0);
512        // One cross-ref should be registered.
513        assert_eq!(result.crossrefs_added, 1);
514        // Causal graph should have one node.
515        assert_eq!(cg.node_count(), 1);
516        // HNSW should have one entry.
517        assert_eq!(hnsw.len(), 1);
518        // CrossRefStore should have one entry.
519        assert_eq!(crs.count(), 1);
520        assert_eq!(demo.total_nodes_added(), 1);
521    }
522
523    // ── Test 3: Multiple impulses in one tick — batch processing ──
524
525    #[tokio::test]
526    async fn multiple_impulses_batch_processing() {
527        let (cg, _hnsw, iq, crs, demo) = make_loop();
528
529        emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 100);
530        emit_test_impulse(&iq, ImpulseType::CoherenceAlert, 200);
531        emit_test_impulse(&iq, ImpulseType::NoveltyDetected, 300);
532
533        let result = demo.tick().await;
534
535        assert_eq!(result.impulses_sensed, 3);
536        assert_eq!(result.embeddings_produced, 3);
537        assert_eq!(result.searches_performed, 3);
538        assert_eq!(result.crossrefs_added, 3);
539        assert_eq!(cg.node_count(), 3);
540        assert_eq!(crs.count(), 3);
541    }
542
543    // ── Test 4: Tick respects budget (stops early if budget exceeded) ──
544
545    #[tokio::test]
546    async fn tick_respects_budget() {
547        // Use a budget of 0 microseconds so the tick must stop immediately.
548        let config = DemocritusConfig {
549            tick_budget_us: 0,
550            ..DemocritusConfig::default()
551        };
552        let (_cg, _hnsw, iq, _crs, demo) = make_loop_with_config(config);
553
554        // Emit several impulses.
555        for i in 0..10 {
556            emit_test_impulse(&iq, ImpulseType::BeliefUpdate, i);
557        }
558
559        let result = demo.tick().await;
560
561        // With a zero budget, the tick should have been cut short.
562        assert!(result.budget_exceeded);
563        // Tick counter still increments.
564        assert_eq!(demo.total_ticks(), 1);
565    }
566
567    // ── Test 5: CrossRef created linking new node to source entity ──
568
569    #[tokio::test]
570    async fn crossref_links_node_to_source() {
571        let (_cg, _hnsw, iq, crs, demo) = make_loop();
572
573        let source_node = [42u8; 32];
574        iq.emit(
575            StructureTag::ExoChain.as_u8(),
576            source_node,
577            StructureTag::HnswIndex.as_u8(),
578            ImpulseType::EdgeConfirmed,
579            serde_json::json!({"chain": "test"}),
580            500,
581        );
582
583        demo.tick().await;
584
585        // Verify cross-ref exists with the correct target (the source node).
586        let target_uni = UniversalNodeId::from_bytes(source_node);
587        let refs = crs.get_reverse(&target_uni);
588        assert_eq!(refs.len(), 1);
589        assert_eq!(refs[0].target_structure, StructureTag::ExoChain);
590        assert_eq!(refs[0].ref_type, CrossRefType::TriggeredBy);
591    }
592
593    // ── Test 6: Tick statistics increment correctly ──
594
595    #[tokio::test]
596    async fn tick_statistics_increment() {
597        let (_cg, _hnsw, iq, _crs, demo) = make_loop();
598
599        assert_eq!(demo.total_ticks(), 0);
600        assert_eq!(demo.total_nodes_added(), 0);
601        assert_eq!(demo.total_edges_added(), 0);
602
603        // Tick 1: one impulse.
604        emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 10);
605        demo.tick().await;
606        assert_eq!(demo.total_ticks(), 1);
607        assert_eq!(demo.total_nodes_added(), 1);
608
609        // Tick 2: two impulses.
610        emit_test_impulse(&iq, ImpulseType::CoherenceAlert, 20);
611        emit_test_impulse(&iq, ImpulseType::NoveltyDetected, 30);
612        demo.tick().await;
613        assert_eq!(demo.total_ticks(), 2);
614        assert_eq!(demo.total_nodes_added(), 3);
615    }
616
617    // ── Test 7: HNSW search returns relevant neighbors ──
618
619    #[tokio::test]
620    async fn hnsw_returns_neighbors_on_second_tick() {
621        let (_cg, hnsw, iq, _crs, demo) = make_loop();
622
623        // First tick: insert a node.
624        emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 100);
625        demo.tick().await;
626        assert_eq!(hnsw.len(), 1);
627
628        // Second tick: same impulse type/payload should find the first as neighbor.
629        emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 200);
630        let result = demo.tick().await;
631
632        assert_eq!(result.searches_performed, 1);
633        // The search should have found the node from tick 1.
634        // Whether an edge is added depends on the score vs threshold,
635        // but the search itself was performed.
636        assert_eq!(hnsw.search_count(), 2);
637    }
638
639    // ── Test 8: Causal edge type selection ──
640
641    #[tokio::test]
642    async fn edge_type_classification() {
643        let (_, _, _, _, demo) = make_loop();
644
645        let impulse_belief = Impulse {
646            id: 1,
647            source_structure: 0,
648            source_node: [0u8; 32],
649            target_structure: 2,
650            impulse_type: ImpulseType::BeliefUpdate,
651            payload: serde_json::json!({}),
652            hlc_timestamp: 0,
653            acknowledged: std::sync::atomic::AtomicBool::new(false),
654        };
655
656        // High similarity → Correlates.
657        assert_eq!(
658            demo.classify_edge(&impulse_belief, 0.9),
659            CausalEdgeType::Correlates
660        );
661
662        // Below threshold, BeliefUpdate → Follows.
663        assert_eq!(
664            demo.classify_edge(&impulse_belief, 0.3),
665            CausalEdgeType::Follows
666        );
667
668        // EdgeConfirmed → Causes.
669        let impulse_confirmed = Impulse {
670            impulse_type: ImpulseType::EdgeConfirmed,
671            ..impulse_belief.clone()
672        };
673        assert_eq!(
674            demo.classify_edge(&impulse_confirmed, 0.3),
675            CausalEdgeType::Causes
676        );
677
678        // CoherenceAlert → EvidenceFor.
679        let impulse_coherence = Impulse {
680            impulse_type: ImpulseType::CoherenceAlert,
681            ..impulse_belief.clone()
682        };
683        assert_eq!(
684            demo.classify_edge(&impulse_coherence, 0.3),
685            CausalEdgeType::EvidenceFor
686        );
687
688        // EmbeddingRefined → Enables.
689        let impulse_refined = Impulse {
690            impulse_type: ImpulseType::EmbeddingRefined,
691            ..impulse_belief.clone()
692        };
693        assert_eq!(
694            demo.classify_edge(&impulse_refined, 0.3),
695            CausalEdgeType::Enables
696        );
697    }
698
699    // ── Test 9: Commit phase logs and updates total_ticks ──
700
701    #[tokio::test]
702    async fn commit_updates_tick_counter() {
703        let (_, _, _, _, demo) = make_loop();
704
705        // Empty ticks still increment the tick counter.
706        demo.tick().await;
707        demo.tick().await;
708        demo.tick().await;
709
710        assert_eq!(demo.total_ticks(), 3);
711    }
712
713    // ── Test 10: Embedding errors handled gracefully ──
714
715    #[tokio::test]
716    async fn embedding_error_falls_back_gracefully() {
717        use crate::embedding::EmbeddingError;
718
719        /// Provider that always fails.
720        struct FailingProvider;
721
722        #[async_trait::async_trait]
723        impl EmbeddingProvider for FailingProvider {
724            async fn embed(&self, _text: &str) -> Result<Vec<f32>, EmbeddingError> {
725                Err(EmbeddingError::BackendError("test failure".into()))
726            }
727            async fn embed_batch(
728                &self,
729                _texts: &[&str],
730            ) -> Result<Vec<Vec<f32>>, EmbeddingError> {
731                Err(EmbeddingError::BackendError("test failure".into()))
732            }
733            fn dimensions(&self) -> usize {
734                8
735            }
736            fn model_name(&self) -> &str {
737                "failing-test"
738            }
739        }
740
741        let cg = Arc::new(CausalGraph::new());
742        let hnsw = Arc::new(HnswService::new(HnswServiceConfig::default()));
743        let iq = Arc::new(ImpulseQueue::new());
744        let crs = Arc::new(CrossRefStore::new());
745        let emb: Arc<dyn EmbeddingProvider> = Arc::new(FailingProvider);
746
747        let demo = DemocritusLoop::new(
748            Arc::clone(&cg),
749            Arc::clone(&hnsw),
750            Arc::clone(&iq),
751            Arc::clone(&crs),
752            emb,
753            DemocritusConfig::default(),
754        );
755
756        emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 100);
757
758        let result = demo.tick().await;
759
760        // Embedding failed → empty vectors, but tick still completes.
761        assert_eq!(result.impulses_sensed, 1);
762        assert_eq!(result.embeddings_produced, 1); // fallback produces empty vecs
763        // Search with empty vector returns nothing, so 0 searches (no-op).
764        assert_eq!(result.searches_performed, 1);
765        // Node still added to causal graph.
766        assert_eq!(cg.node_count(), 1);
767        // But no HNSW insertion (empty embedding skipped).
768        assert_eq!(hnsw.len(), 0);
769        // Cross-ref still created.
770        assert_eq!(result.crossrefs_added, 1);
771    }
772
773    // ── Test 11: max_impulses_per_tick truncation ──
774
775    #[tokio::test]
776    async fn max_impulses_per_tick_truncation() {
777        let config = DemocritusConfig {
778            max_impulses_per_tick: 2,
779            ..DemocritusConfig::default()
780        };
781        let (_cg, _hnsw, iq, _crs, demo) = make_loop_with_config(config);
782
783        // Emit 5 impulses.
784        for i in 0..5 {
785            emit_test_impulse(&iq, ImpulseType::BeliefUpdate, i);
786        }
787
788        let result = demo.tick().await;
789
790        // Only 2 should be processed due to truncation.
791        assert_eq!(result.impulses_sensed, 2);
792        assert_eq!(result.embeddings_produced, 2);
793    }
794
795    // ── Test 12: structure_tag_from_u8 mapping ──
796
797    #[test]
798    fn structure_tag_roundtrip() {
799        assert_eq!(structure_tag_from_u8(0x01), StructureTag::ExoChain);
800        assert_eq!(structure_tag_from_u8(0x02), StructureTag::ResourceTree);
801        assert_eq!(structure_tag_from_u8(0x03), StructureTag::CausalGraph);
802        assert_eq!(structure_tag_from_u8(0x04), StructureTag::HnswIndex);
803        assert_eq!(structure_tag_from_u8(0xFF), StructureTag::Custom(0xFF));
804    }
805
806    // ── Sprint 11: Budget exhaustion tests ──────────────────────────
807
808    #[tokio::test]
809    async fn budget_exhaustion_with_many_impulses() {
810        // Use a budget of 0 microseconds with many impulses to force
811        // budget exhaustion at different phases.
812        let config = DemocritusConfig {
813            tick_budget_us: 0,
814            max_impulses_per_tick: 100,
815            ..DemocritusConfig::default()
816        };
817        let (cg, _hnsw, iq, _crs, demo) = make_loop_with_config(config);
818
819        for i in 0..50 {
820            emit_test_impulse(&iq, ImpulseType::BeliefUpdate, i);
821        }
822
823        let result = demo.tick().await;
824        assert!(result.budget_exceeded);
825        // Even with budget exceeded, tick count increments.
826        assert_eq!(demo.total_ticks(), 1);
827        // Some impulses may have been sensed before budget check.
828        assert!(result.impulses_sensed <= 50);
829        // Causal graph nodes added should match embeddings completed
830        // (may be fewer than sensed due to budget).
831        assert!(cg.node_count() <= result.impulses_sensed as u64);
832    }
833
834    #[tokio::test]
835    async fn budget_exceeded_flag_only_set_when_needed() {
836        // Large budget should not trigger budget_exceeded.
837        let config = DemocritusConfig {
838            tick_budget_us: 10_000_000, // 10 seconds
839            ..DemocritusConfig::default()
840        };
841        let (_, _, iq, _, demo) = make_loop_with_config(config);
842
843        emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 1);
844        let result = demo.tick().await;
845        assert!(!result.budget_exceeded);
846    }
847
848    // ── Sprint 11: ImpulseQueue overflow tests ──────────────────────
849
850    #[tokio::test]
851    async fn impulse_queue_large_burst() {
852        let config = DemocritusConfig {
853            max_impulses_per_tick: 10,
854            ..DemocritusConfig::default()
855        };
856        let (_, _, iq, _, demo) = make_loop_with_config(config);
857
858        // Emit far more impulses than per-tick limit.
859        for i in 0..500 {
860            emit_test_impulse(&iq, ImpulseType::BeliefUpdate, i);
861        }
862
863        // First tick processes at most 10.
864        let r1 = demo.tick().await;
865        assert_eq!(r1.impulses_sensed, 10);
866
867        // Queue was drained fully (drain_ready takes all), but only 10 processed.
868        // Remaining impulses are gone (drain clears the queue).
869        let r2 = demo.tick().await;
870        assert_eq!(r2.impulses_sensed, 0);
871    }
872
873    #[tokio::test]
874    async fn impulse_queue_interleaved_emit_and_tick() {
875        let (cg, _, iq, _, demo) = make_loop();
876
877        // Emit, tick, emit, tick — verify state accumulates.
878        emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 10);
879        let r1 = demo.tick().await;
880        assert_eq!(r1.impulses_sensed, 1);
881        assert_eq!(cg.node_count(), 1);
882
883        emit_test_impulse(&iq, ImpulseType::CoherenceAlert, 20);
884        emit_test_impulse(&iq, ImpulseType::NoveltyDetected, 30);
885        let r2 = demo.tick().await;
886        assert_eq!(r2.impulses_sensed, 2);
887        assert_eq!(cg.node_count(), 3);
888
889        assert_eq!(demo.total_ticks(), 2);
890        assert_eq!(demo.total_nodes_added(), 3);
891    }
892
893    // ── Sprint 11: Embed failure recovery tests ─────────────────────
894
895    #[tokio::test]
896    async fn embed_failure_still_creates_crossrefs() {
897        use crate::embedding::EmbeddingError;
898
899        struct FailingProvider;
900
901        #[async_trait::async_trait]
902        impl EmbeddingProvider for FailingProvider {
903            async fn embed(&self, _text: &str) -> Result<Vec<f32>, EmbeddingError> {
904                Err(EmbeddingError::BackendError("test failure".into()))
905            }
906            async fn embed_batch(
907                &self,
908                _texts: &[&str],
909            ) -> Result<Vec<Vec<f32>>, EmbeddingError> {
910                Err(EmbeddingError::BackendError("test failure".into()))
911            }
912            fn dimensions(&self) -> usize {
913                8
914            }
915            fn model_name(&self) -> &str {
916                "failing-test"
917            }
918        }
919
920        let cg = Arc::new(CausalGraph::new());
921        let hnsw = Arc::new(HnswService::new(HnswServiceConfig::default()));
922        let iq = Arc::new(ImpulseQueue::new());
923        let crs = Arc::new(CrossRefStore::new());
924        let emb: Arc<dyn EmbeddingProvider> = Arc::new(FailingProvider);
925
926        let demo = DemocritusLoop::new(
927            Arc::clone(&cg),
928            Arc::clone(&hnsw),
929            Arc::clone(&iq),
930            Arc::clone(&crs),
931            emb,
932            DemocritusConfig::default(),
933        );
934
935        // Emit multiple impulses.
936        for i in 0..5 {
937            emit_test_impulse(&iq, ImpulseType::BeliefUpdate, i * 100);
938        }
939
940        let result = demo.tick().await;
941        assert_eq!(result.impulses_sensed, 5);
942        // Fallback: 5 empty vectors produced.
943        assert_eq!(result.embeddings_produced, 5);
944        // Causal nodes still created despite embed failure.
945        assert_eq!(cg.node_count(), 5);
946        // Cross-refs still created.
947        assert_eq!(result.crossrefs_added, 5);
948        assert_eq!(crs.count(), 5);
949        // No HNSW insertions (empty embeddings skipped).
950        assert_eq!(hnsw.len(), 0);
951    }
952
953    #[tokio::test]
954    async fn embed_failure_no_edges_added() {
955        use crate::embedding::EmbeddingError;
956
957        struct FailingProvider;
958
959        #[async_trait::async_trait]
960        impl EmbeddingProvider for FailingProvider {
961            async fn embed(&self, _text: &str) -> Result<Vec<f32>, EmbeddingError> {
962                Err(EmbeddingError::BackendError("fail".into()))
963            }
964            async fn embed_batch(
965                &self,
966                _texts: &[&str],
967            ) -> Result<Vec<Vec<f32>>, EmbeddingError> {
968                Err(EmbeddingError::BackendError("fail".into()))
969            }
970            fn dimensions(&self) -> usize {
971                8
972            }
973            fn model_name(&self) -> &str {
974                "fail"
975            }
976        }
977
978        let cg = Arc::new(CausalGraph::new());
979        let hnsw = Arc::new(HnswService::new(HnswServiceConfig::default()));
980        let iq = Arc::new(ImpulseQueue::new());
981        let crs = Arc::new(CrossRefStore::new());
982        let emb: Arc<dyn EmbeddingProvider> = Arc::new(FailingProvider);
983
984        let demo = DemocritusLoop::new(
985            Arc::clone(&cg),
986            Arc::clone(&hnsw),
987            Arc::clone(&iq),
988            Arc::clone(&crs),
989            emb,
990            DemocritusConfig::default(),
991        );
992
993        emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 100);
994        let result = demo.tick().await;
995        // With empty embeddings, search returns no neighbors, so no edges.
996        assert_eq!(result.edges_added, 0);
997        assert_eq!(demo.total_edges_added(), 0);
998    }
999
1000    // ── Sprint 11: Multiple sequential ticks with accumulated state ──
1001
1002    #[tokio::test]
1003    async fn multiple_sequential_ticks_accumulate_state() {
1004        let (cg, hnsw, iq, crs, demo) = make_loop();
1005
1006        // Tick 1: single impulse.
1007        emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 100);
1008        let r1 = demo.tick().await;
1009        assert_eq!(r1.impulses_sensed, 1);
1010        let nodes_after_1 = cg.node_count();
1011        let hnsw_after_1 = hnsw.len();
1012
1013        // Tick 2: two more impulses.
1014        emit_test_impulse(&iq, ImpulseType::CoherenceAlert, 200);
1015        emit_test_impulse(&iq, ImpulseType::NoveltyDetected, 300);
1016        let r2 = demo.tick().await;
1017        assert_eq!(r2.impulses_sensed, 2);
1018        assert_eq!(cg.node_count(), nodes_after_1 + 2);
1019        assert_eq!(hnsw.len(), hnsw_after_1 + 2);
1020
1021        // Tick 3: three more.
1022        emit_test_impulse(&iq, ImpulseType::EdgeConfirmed, 400);
1023        emit_test_impulse(&iq, ImpulseType::EmbeddingRefined, 500);
1024        emit_test_impulse(&iq, ImpulseType::Custom(42), 600);
1025        let r3 = demo.tick().await;
1026        assert_eq!(r3.impulses_sensed, 3);
1027        assert_eq!(cg.node_count(), nodes_after_1 + 5);
1028
1029        // Total statistics.
1030        assert_eq!(demo.total_ticks(), 3);
1031        assert_eq!(demo.total_nodes_added(), 6);
1032        // Cross-refs: one per impulse.
1033        assert_eq!(crs.count(), 6);
1034    }
1035
1036    #[tokio::test]
1037    async fn sequential_ticks_can_find_prior_neighbors() {
1038        let config = DemocritusConfig {
1039            correlation_threshold: 0.0, // accept all as correlated
1040            ..DemocritusConfig::default()
1041        };
1042        let (cg, hnsw, iq, _, demo) = make_loop_with_config(config);
1043
1044        // Tick 1: insert a node.
1045        emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 100);
1046        demo.tick().await;
1047        assert_eq!(hnsw.len(), 1);
1048
1049        // Tick 2: same type should find tick-1's node as neighbor.
1050        emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 200);
1051        let r2 = demo.tick().await;
1052        assert_eq!(r2.searches_performed, 1);
1053        // With threshold=0.0, any non-zero similarity creates an edge.
1054        // The mock provider produces deterministic vectors, so same impulse
1055        // type gets same embedding, yielding high similarity.
1056        // Edges depend on whether neighbor_id parses as a valid node_id.
1057        assert!(cg.node_count() >= 2);
1058    }
1059
1060    // ── Sprint 11: Config edge cases ────────────────────────────────
1061
1062    #[tokio::test]
1063    async fn zero_max_impulses_per_tick() {
1064        let config = DemocritusConfig {
1065            max_impulses_per_tick: 0,
1066            ..DemocritusConfig::default()
1067        };
1068        let (_, _, iq, _, demo) = make_loop_with_config(config);
1069
1070        emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 100);
1071        let result = demo.tick().await;
1072        // Truncation to 0 means no impulses processed.
1073        assert_eq!(result.impulses_sensed, 0);
1074        assert_eq!(result.embeddings_produced, 0);
1075    }
1076
1077    #[tokio::test]
1078    async fn tick_result_duration_is_positive() {
1079        let (_, _, iq, _, demo) = make_loop();
1080        emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 100);
1081        let result = demo.tick().await;
1082        // Duration should be non-negative (may be 0 on very fast systems).
1083        assert!(result.duration_us < 10_000_000, "tick should complete within 10s");
1084    }
1085
1086    #[test]
1087    fn classify_edge_custom_impulse_type() {
1088        let (_, _, _, _, demo) = make_loop();
1089        let impulse = Impulse {
1090            id: 1,
1091            source_structure: 0,
1092            source_node: [0u8; 32],
1093            target_structure: 2,
1094            impulse_type: ImpulseType::Custom(99),
1095            payload: serde_json::json!({}),
1096            hlc_timestamp: 0,
1097            acknowledged: std::sync::atomic::AtomicBool::new(false),
1098        };
1099
1100        // Custom type below threshold → Follows.
1101        assert_eq!(
1102            demo.classify_edge(&impulse, 0.3),
1103            CausalEdgeType::Follows
1104        );
1105        // Custom type above threshold → Correlates.
1106        assert_eq!(
1107            demo.classify_edge(&impulse, 0.9),
1108            CausalEdgeType::Correlates
1109        );
1110    }
1111}