1use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::Arc;
19use std::time::Instant;
20
21use serde::{Deserialize, Serialize};
22use tracing::{debug, warn};
23
24use crate::causal::{CausalEdgeType, CausalGraph};
25use crate::crossref::{CrossRef, CrossRefStore, CrossRefType, StructureTag, UniversalNodeId};
26use crate::embedding::{EmbeddingProvider};
27use crate::hnsw_service::HnswService;
28use crate::impulse::{Impulse, ImpulseQueue, ImpulseType};
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct DemocritusConfig {
37 pub max_impulses_per_tick: usize,
39 pub search_k: usize,
41 pub correlation_threshold: f32,
43 pub tick_budget_us: u64,
45}
46
47impl Default for DemocritusConfig {
48 fn default() -> Self {
49 Self {
50 max_impulses_per_tick: 64,
51 search_k: 5,
52 correlation_threshold: 0.7,
53 tick_budget_us: 15_000, }
55 }
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct DemocritusTickResult {
65 pub impulses_sensed: usize,
67 pub embeddings_produced: usize,
69 pub searches_performed: usize,
71 pub edges_added: usize,
73 pub crossrefs_added: usize,
75 pub budget_exceeded: bool,
77 pub duration_us: u64,
79}
80
81pub struct DemocritusLoop {
89 causal_graph: Arc<CausalGraph>,
91 hnsw: Arc<HnswService>,
92 impulse_queue: Arc<ImpulseQueue>,
93 crossref_store: Arc<CrossRefStore>,
94 embedding_provider: Arc<dyn EmbeddingProvider>,
95 config: DemocritusConfig,
97 total_ticks: AtomicU64,
99 total_nodes_added: AtomicU64,
100 total_edges_added: AtomicU64,
101}
102
103impl DemocritusLoop {
104 pub fn new(
106 causal_graph: Arc<CausalGraph>,
107 hnsw: Arc<HnswService>,
108 impulse_queue: Arc<ImpulseQueue>,
109 crossref_store: Arc<CrossRefStore>,
110 embedding_provider: Arc<dyn EmbeddingProvider>,
111 config: DemocritusConfig,
112 ) -> Self {
113 Self {
114 causal_graph,
115 hnsw,
116 impulse_queue,
117 crossref_store,
118 embedding_provider,
119 config,
120 total_ticks: AtomicU64::new(0),
121 total_nodes_added: AtomicU64::new(0),
122 total_edges_added: AtomicU64::new(0),
123 }
124 }
125
126 pub async fn tick(&self) -> DemocritusTickResult {
131 let start = Instant::now();
132 let mut result = DemocritusTickResult {
133 impulses_sensed: 0,
134 embeddings_produced: 0,
135 searches_performed: 0,
136 edges_added: 0,
137 crossrefs_added: 0,
138 budget_exceeded: false,
139 duration_us: 0,
140 };
141
142 let impulses = self.sense();
144 result.impulses_sensed = impulses.len();
145
146 if impulses.is_empty() {
147 result.duration_us = start.elapsed().as_micros() as u64;
148 self.commit(&result);
149 return result;
150 }
151
152 let embedded = self.embed(&impulses).await;
154 result.embeddings_produced = embedded.len();
155
156 if self.budget_exceeded(start) {
157 result.budget_exceeded = true;
158 result.duration_us = start.elapsed().as_micros() as u64;
159 self.commit(&result);
160 return result;
161 }
162
163 let non_empty_queries: Vec<(usize, &[f32])> = embedded
167 .iter()
168 .enumerate()
169 .filter(|(_, emb)| !emb.is_empty())
170 .map(|(i, emb)| (i, emb.as_slice()))
171 .collect();
172
173 let query_slices: Vec<&[f32]> = non_empty_queries.iter().map(|(_, s)| *s).collect();
174 let batch_results = if !query_slices.is_empty() {
175 self.hnsw.search_batch(&query_slices, self.config.search_k)
176 } else {
177 Vec::new()
178 };
179
180 let mut search_results_by_index: Vec<Vec<(String, f32)>> =
182 vec![Vec::new(); embedded.len()];
183 for (batch_idx, &(orig_idx, _)) in non_empty_queries.iter().enumerate() {
184 search_results_by_index[orig_idx] = batch_results
185 .get(batch_idx)
186 .map(|results| results.iter().map(|r| (r.id.clone(), r.score)).collect())
187 .unwrap_or_default();
188 }
189 result.searches_performed = non_empty_queries.len();
190
191 let mut neighbors_per_event: Vec<(&Impulse, &Vec<f32>, Vec<(String, f32)>)> =
192 Vec::with_capacity(embedded.len());
193 for (i, (impulse, embedding)) in impulses.iter().zip(embedded.iter()).enumerate() {
194 if self.budget_exceeded(start) {
195 result.budget_exceeded = true;
196 break;
197 }
198 let neighbors = std::mem::take(&mut search_results_by_index[i]);
199 neighbors_per_event.push((impulse, embedding, neighbors));
200 }
201
202 for (impulse, embedding, neighbors) in &neighbors_per_event {
204 if self.budget_exceeded(start) {
205 result.budget_exceeded = true;
206 break;
207 }
208 let (edges, crossrefs) = self.update(impulse, embedding, neighbors);
209 result.edges_added += edges;
210 result.crossrefs_added += crossrefs;
211 }
212
213 result.duration_us = start.elapsed().as_micros() as u64;
215 self.commit(&result);
216 result
217 }
218
219 fn sense(&self) -> Vec<Impulse> {
223 let mut impulses = self.impulse_queue.drain_ready();
224 impulses.truncate(self.config.max_impulses_per_tick);
225 impulses
226 }
227
228 async fn embed(&self, impulses: &[Impulse]) -> Vec<Vec<f32>> {
234 let texts: Vec<String> = impulses
235 .iter()
236 .map(|imp| {
237 let type_str = imp.impulse_type.to_string();
239 let payload_str = imp.payload.to_string();
240 format!("{type_str}:{payload_str}")
241 })
242 .collect();
243
244 let text_refs: Vec<&str> = texts.iter().map(|s| s.as_str()).collect();
245
246 match self.embedding_provider.embed_batch(&text_refs).await {
247 Ok(vecs) => vecs,
248 Err(e) => {
249 warn!("DEMOCRITUS embed phase failed, falling back to empty vectors: {e}");
250 vec![Vec::new(); impulses.len()]
251 }
252 }
253 }
254
255 #[allow(dead_code)]
260 fn search(&self, embedding: &[f32]) -> Vec<(String, f32)> {
261 if embedding.is_empty() {
262 return Vec::new();
263 }
264 self.hnsw
265 .search(embedding, self.config.search_k)
266 .into_iter()
267 .map(|r| (r.id, r.score))
268 .collect()
269 }
270
271 fn update(
277 &self,
278 impulse: &Impulse,
279 embedding: &[f32],
280 neighbors: &[(String, f32)],
281 ) -> (usize, usize) {
282 let mut edges_added = 0usize;
283 let mut crossrefs_added = 0usize;
284
285 let label = format!("impulse:{}:{}", impulse.impulse_type, impulse.id);
287 let node_id = self.causal_graph.add_node(
288 label.clone(),
289 impulse.payload.clone(),
290 );
291 self.total_nodes_added.fetch_add(1, Ordering::Relaxed);
292
293 if !embedding.is_empty() {
295 self.hnsw.insert(
296 node_id.to_string(),
297 embedding.to_vec(),
298 serde_json::json!({
299 "impulse_id": impulse.id,
300 "impulse_type": impulse.impulse_type.to_string(),
301 "hlc": impulse.hlc_timestamp,
302 }),
303 );
304 }
305
306 for (neighbor_id_str, score) in neighbors {
308 let edge_type = self.classify_edge(impulse, *score);
309
310 if let Ok(neighbor_node_id) = neighbor_id_str.parse::<u64>() {
311 let linked = self.causal_graph.link(
312 node_id,
313 neighbor_node_id,
314 edge_type,
315 *score,
316 impulse.hlc_timestamp,
317 0, );
319 if linked {
320 edges_added += 1;
321 self.total_edges_added.fetch_add(1, Ordering::Relaxed);
322 }
323 }
324 }
325
326 let source_tag = structure_tag_from_u8(impulse.source_structure);
328 let uni_id = UniversalNodeId::new(
329 &StructureTag::CausalGraph,
330 label.as_bytes(),
331 impulse.hlc_timestamp,
332 &impulse.source_node,
333 &[0u8; 32],
334 );
335 let source_uni_id = UniversalNodeId::from_bytes(impulse.source_node);
336 self.crossref_store.insert(CrossRef {
337 source: uni_id,
338 source_structure: StructureTag::CausalGraph,
339 target: source_uni_id,
340 target_structure: source_tag,
341 ref_type: CrossRefType::TriggeredBy,
342 created_at: impulse.hlc_timestamp,
343 chain_seq: 0,
344 });
345 crossrefs_added += 1;
346
347 (edges_added, crossrefs_added)
348 }
349
350 fn commit(&self, result: &DemocritusTickResult) {
352 self.total_ticks.fetch_add(1, Ordering::Relaxed);
353 debug!(
354 "DEMOCRITUS tick #{}: sensed={}, embedded={}, searched={}, edges={}, crossrefs={}, budget_exceeded={}, duration={}us",
355 self.total_ticks.load(Ordering::Relaxed),
356 result.impulses_sensed,
357 result.embeddings_produced,
358 result.searches_performed,
359 result.edges_added,
360 result.crossrefs_added,
361 result.budget_exceeded,
362 result.duration_us,
363 );
364 }
365
366 fn classify_edge(&self, impulse: &Impulse, score: f32) -> CausalEdgeType {
370 if score >= self.config.correlation_threshold {
372 return CausalEdgeType::Correlates;
373 }
374
375 match &impulse.impulse_type {
377 ImpulseType::BeliefUpdate | ImpulseType::NoveltyDetected => CausalEdgeType::Follows,
378 ImpulseType::EdgeConfirmed => CausalEdgeType::Causes,
379 ImpulseType::CoherenceAlert => CausalEdgeType::EvidenceFor,
380 ImpulseType::EmbeddingRefined => CausalEdgeType::Enables,
381 ImpulseType::Custom(_) => CausalEdgeType::Follows,
382 }
383 }
384
385 fn budget_exceeded(&self, start: Instant) -> bool {
387 start.elapsed().as_micros() as u64 > self.config.tick_budget_us
388 }
389
390 pub fn total_ticks(&self) -> u64 {
394 self.total_ticks.load(Ordering::Relaxed)
395 }
396
397 pub fn total_nodes_added(&self) -> u64 {
399 self.total_nodes_added.load(Ordering::Relaxed)
400 }
401
402 pub fn total_edges_added(&self) -> u64 {
404 self.total_edges_added.load(Ordering::Relaxed)
405 }
406}
407
408fn structure_tag_from_u8(tag: u8) -> StructureTag {
410 match tag {
411 0x01 => StructureTag::ExoChain,
412 0x02 => StructureTag::ResourceTree,
413 0x03 => StructureTag::CausalGraph,
414 0x04 => StructureTag::HnswIndex,
415 other => StructureTag::Custom(other),
416 }
417}
418
419#[cfg(test)]
424mod tests {
425 use super::*;
426 use crate::embedding::MockEmbeddingProvider;
427 use crate::hnsw_service::HnswServiceConfig;
428
429 fn make_loop() -> (
431 Arc<CausalGraph>,
432 Arc<HnswService>,
433 Arc<ImpulseQueue>,
434 Arc<CrossRefStore>,
435 DemocritusLoop,
436 ) {
437 make_loop_with_config(DemocritusConfig::default())
438 }
439
440 fn make_loop_with_config(
441 config: DemocritusConfig,
442 ) -> (
443 Arc<CausalGraph>,
444 Arc<HnswService>,
445 Arc<ImpulseQueue>,
446 Arc<CrossRefStore>,
447 DemocritusLoop,
448 ) {
449 let cg = Arc::new(CausalGraph::new());
450 let hnsw = Arc::new(HnswService::new(HnswServiceConfig {
451 default_dimensions: 8,
452 ..HnswServiceConfig::default()
453 }));
454 let iq = Arc::new(ImpulseQueue::new());
455 let crs = Arc::new(CrossRefStore::new());
456 let emb: Arc<dyn EmbeddingProvider> = Arc::new(MockEmbeddingProvider::new(8));
457
458 let democritus = DemocritusLoop::new(
459 Arc::clone(&cg),
460 Arc::clone(&hnsw),
461 Arc::clone(&iq),
462 Arc::clone(&crs),
463 emb,
464 config,
465 );
466 (cg, hnsw, iq, crs, democritus)
467 }
468
469 fn emit_test_impulse(iq: &ImpulseQueue, impulse_type: ImpulseType, ts: u64) -> u64 {
470 iq.emit(
471 StructureTag::CausalGraph.as_u8(),
472 [0u8; 32],
473 StructureTag::HnswIndex.as_u8(),
474 impulse_type,
475 serde_json::json!({"test": true}),
476 ts,
477 )
478 }
479
480 #[tokio::test]
483 async fn empty_queue_produces_no_work() {
484 let (_cg, _hnsw, _iq, _crs, demo) = make_loop();
485 let result = demo.tick().await;
486
487 assert_eq!(result.impulses_sensed, 0);
488 assert_eq!(result.embeddings_produced, 0);
489 assert_eq!(result.searches_performed, 0);
490 assert_eq!(result.edges_added, 0);
491 assert_eq!(result.crossrefs_added, 0);
492 assert!(!result.budget_exceeded);
493 assert_eq!(demo.total_ticks(), 1);
494 assert_eq!(demo.total_nodes_added(), 0);
495 }
496
497 #[tokio::test]
500 async fn single_impulse_full_pipeline() {
501 let (cg, hnsw, iq, crs, demo) = make_loop();
502
503 emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 100);
504
505 let result = demo.tick().await;
506
507 assert_eq!(result.impulses_sensed, 1);
508 assert_eq!(result.embeddings_produced, 1);
509 assert_eq!(result.searches_performed, 1);
510 assert_eq!(result.edges_added, 0);
512 assert_eq!(result.crossrefs_added, 1);
514 assert_eq!(cg.node_count(), 1);
516 assert_eq!(hnsw.len(), 1);
518 assert_eq!(crs.count(), 1);
520 assert_eq!(demo.total_nodes_added(), 1);
521 }
522
523 #[tokio::test]
526 async fn multiple_impulses_batch_processing() {
527 let (cg, _hnsw, iq, crs, demo) = make_loop();
528
529 emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 100);
530 emit_test_impulse(&iq, ImpulseType::CoherenceAlert, 200);
531 emit_test_impulse(&iq, ImpulseType::NoveltyDetected, 300);
532
533 let result = demo.tick().await;
534
535 assert_eq!(result.impulses_sensed, 3);
536 assert_eq!(result.embeddings_produced, 3);
537 assert_eq!(result.searches_performed, 3);
538 assert_eq!(result.crossrefs_added, 3);
539 assert_eq!(cg.node_count(), 3);
540 assert_eq!(crs.count(), 3);
541 }
542
543 #[tokio::test]
546 async fn tick_respects_budget() {
547 let config = DemocritusConfig {
549 tick_budget_us: 0,
550 ..DemocritusConfig::default()
551 };
552 let (_cg, _hnsw, iq, _crs, demo) = make_loop_with_config(config);
553
554 for i in 0..10 {
556 emit_test_impulse(&iq, ImpulseType::BeliefUpdate, i);
557 }
558
559 let result = demo.tick().await;
560
561 assert!(result.budget_exceeded);
563 assert_eq!(demo.total_ticks(), 1);
565 }
566
567 #[tokio::test]
570 async fn crossref_links_node_to_source() {
571 let (_cg, _hnsw, iq, crs, demo) = make_loop();
572
573 let source_node = [42u8; 32];
574 iq.emit(
575 StructureTag::ExoChain.as_u8(),
576 source_node,
577 StructureTag::HnswIndex.as_u8(),
578 ImpulseType::EdgeConfirmed,
579 serde_json::json!({"chain": "test"}),
580 500,
581 );
582
583 demo.tick().await;
584
585 let target_uni = UniversalNodeId::from_bytes(source_node);
587 let refs = crs.get_reverse(&target_uni);
588 assert_eq!(refs.len(), 1);
589 assert_eq!(refs[0].target_structure, StructureTag::ExoChain);
590 assert_eq!(refs[0].ref_type, CrossRefType::TriggeredBy);
591 }
592
593 #[tokio::test]
596 async fn tick_statistics_increment() {
597 let (_cg, _hnsw, iq, _crs, demo) = make_loop();
598
599 assert_eq!(demo.total_ticks(), 0);
600 assert_eq!(demo.total_nodes_added(), 0);
601 assert_eq!(demo.total_edges_added(), 0);
602
603 emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 10);
605 demo.tick().await;
606 assert_eq!(demo.total_ticks(), 1);
607 assert_eq!(demo.total_nodes_added(), 1);
608
609 emit_test_impulse(&iq, ImpulseType::CoherenceAlert, 20);
611 emit_test_impulse(&iq, ImpulseType::NoveltyDetected, 30);
612 demo.tick().await;
613 assert_eq!(demo.total_ticks(), 2);
614 assert_eq!(demo.total_nodes_added(), 3);
615 }
616
617 #[tokio::test]
620 async fn hnsw_returns_neighbors_on_second_tick() {
621 let (_cg, hnsw, iq, _crs, demo) = make_loop();
622
623 emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 100);
625 demo.tick().await;
626 assert_eq!(hnsw.len(), 1);
627
628 emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 200);
630 let result = demo.tick().await;
631
632 assert_eq!(result.searches_performed, 1);
633 assert_eq!(hnsw.search_count(), 2);
637 }
638
639 #[tokio::test]
642 async fn edge_type_classification() {
643 let (_, _, _, _, demo) = make_loop();
644
645 let impulse_belief = Impulse {
646 id: 1,
647 source_structure: 0,
648 source_node: [0u8; 32],
649 target_structure: 2,
650 impulse_type: ImpulseType::BeliefUpdate,
651 payload: serde_json::json!({}),
652 hlc_timestamp: 0,
653 acknowledged: std::sync::atomic::AtomicBool::new(false),
654 };
655
656 assert_eq!(
658 demo.classify_edge(&impulse_belief, 0.9),
659 CausalEdgeType::Correlates
660 );
661
662 assert_eq!(
664 demo.classify_edge(&impulse_belief, 0.3),
665 CausalEdgeType::Follows
666 );
667
668 let impulse_confirmed = Impulse {
670 impulse_type: ImpulseType::EdgeConfirmed,
671 ..impulse_belief.clone()
672 };
673 assert_eq!(
674 demo.classify_edge(&impulse_confirmed, 0.3),
675 CausalEdgeType::Causes
676 );
677
678 let impulse_coherence = Impulse {
680 impulse_type: ImpulseType::CoherenceAlert,
681 ..impulse_belief.clone()
682 };
683 assert_eq!(
684 demo.classify_edge(&impulse_coherence, 0.3),
685 CausalEdgeType::EvidenceFor
686 );
687
688 let impulse_refined = Impulse {
690 impulse_type: ImpulseType::EmbeddingRefined,
691 ..impulse_belief.clone()
692 };
693 assert_eq!(
694 demo.classify_edge(&impulse_refined, 0.3),
695 CausalEdgeType::Enables
696 );
697 }
698
699 #[tokio::test]
702 async fn commit_updates_tick_counter() {
703 let (_, _, _, _, demo) = make_loop();
704
705 demo.tick().await;
707 demo.tick().await;
708 demo.tick().await;
709
710 assert_eq!(demo.total_ticks(), 3);
711 }
712
713 #[tokio::test]
716 async fn embedding_error_falls_back_gracefully() {
717 use crate::embedding::EmbeddingError;
718
719 struct FailingProvider;
721
722 #[async_trait::async_trait]
723 impl EmbeddingProvider for FailingProvider {
724 async fn embed(&self, _text: &str) -> Result<Vec<f32>, EmbeddingError> {
725 Err(EmbeddingError::BackendError("test failure".into()))
726 }
727 async fn embed_batch(
728 &self,
729 _texts: &[&str],
730 ) -> Result<Vec<Vec<f32>>, EmbeddingError> {
731 Err(EmbeddingError::BackendError("test failure".into()))
732 }
733 fn dimensions(&self) -> usize {
734 8
735 }
736 fn model_name(&self) -> &str {
737 "failing-test"
738 }
739 }
740
741 let cg = Arc::new(CausalGraph::new());
742 let hnsw = Arc::new(HnswService::new(HnswServiceConfig::default()));
743 let iq = Arc::new(ImpulseQueue::new());
744 let crs = Arc::new(CrossRefStore::new());
745 let emb: Arc<dyn EmbeddingProvider> = Arc::new(FailingProvider);
746
747 let demo = DemocritusLoop::new(
748 Arc::clone(&cg),
749 Arc::clone(&hnsw),
750 Arc::clone(&iq),
751 Arc::clone(&crs),
752 emb,
753 DemocritusConfig::default(),
754 );
755
756 emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 100);
757
758 let result = demo.tick().await;
759
760 assert_eq!(result.impulses_sensed, 1);
762 assert_eq!(result.embeddings_produced, 1); assert_eq!(result.searches_performed, 1);
765 assert_eq!(cg.node_count(), 1);
767 assert_eq!(hnsw.len(), 0);
769 assert_eq!(result.crossrefs_added, 1);
771 }
772
773 #[tokio::test]
776 async fn max_impulses_per_tick_truncation() {
777 let config = DemocritusConfig {
778 max_impulses_per_tick: 2,
779 ..DemocritusConfig::default()
780 };
781 let (_cg, _hnsw, iq, _crs, demo) = make_loop_with_config(config);
782
783 for i in 0..5 {
785 emit_test_impulse(&iq, ImpulseType::BeliefUpdate, i);
786 }
787
788 let result = demo.tick().await;
789
790 assert_eq!(result.impulses_sensed, 2);
792 assert_eq!(result.embeddings_produced, 2);
793 }
794
795 #[test]
798 fn structure_tag_roundtrip() {
799 assert_eq!(structure_tag_from_u8(0x01), StructureTag::ExoChain);
800 assert_eq!(structure_tag_from_u8(0x02), StructureTag::ResourceTree);
801 assert_eq!(structure_tag_from_u8(0x03), StructureTag::CausalGraph);
802 assert_eq!(structure_tag_from_u8(0x04), StructureTag::HnswIndex);
803 assert_eq!(structure_tag_from_u8(0xFF), StructureTag::Custom(0xFF));
804 }
805
806 #[tokio::test]
809 async fn budget_exhaustion_with_many_impulses() {
810 let config = DemocritusConfig {
813 tick_budget_us: 0,
814 max_impulses_per_tick: 100,
815 ..DemocritusConfig::default()
816 };
817 let (cg, _hnsw, iq, _crs, demo) = make_loop_with_config(config);
818
819 for i in 0..50 {
820 emit_test_impulse(&iq, ImpulseType::BeliefUpdate, i);
821 }
822
823 let result = demo.tick().await;
824 assert!(result.budget_exceeded);
825 assert_eq!(demo.total_ticks(), 1);
827 assert!(result.impulses_sensed <= 50);
829 assert!(cg.node_count() <= result.impulses_sensed as u64);
832 }
833
834 #[tokio::test]
835 async fn budget_exceeded_flag_only_set_when_needed() {
836 let config = DemocritusConfig {
838 tick_budget_us: 10_000_000, ..DemocritusConfig::default()
840 };
841 let (_, _, iq, _, demo) = make_loop_with_config(config);
842
843 emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 1);
844 let result = demo.tick().await;
845 assert!(!result.budget_exceeded);
846 }
847
848 #[tokio::test]
851 async fn impulse_queue_large_burst() {
852 let config = DemocritusConfig {
853 max_impulses_per_tick: 10,
854 ..DemocritusConfig::default()
855 };
856 let (_, _, iq, _, demo) = make_loop_with_config(config);
857
858 for i in 0..500 {
860 emit_test_impulse(&iq, ImpulseType::BeliefUpdate, i);
861 }
862
863 let r1 = demo.tick().await;
865 assert_eq!(r1.impulses_sensed, 10);
866
867 let r2 = demo.tick().await;
870 assert_eq!(r2.impulses_sensed, 0);
871 }
872
873 #[tokio::test]
874 async fn impulse_queue_interleaved_emit_and_tick() {
875 let (cg, _, iq, _, demo) = make_loop();
876
877 emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 10);
879 let r1 = demo.tick().await;
880 assert_eq!(r1.impulses_sensed, 1);
881 assert_eq!(cg.node_count(), 1);
882
883 emit_test_impulse(&iq, ImpulseType::CoherenceAlert, 20);
884 emit_test_impulse(&iq, ImpulseType::NoveltyDetected, 30);
885 let r2 = demo.tick().await;
886 assert_eq!(r2.impulses_sensed, 2);
887 assert_eq!(cg.node_count(), 3);
888
889 assert_eq!(demo.total_ticks(), 2);
890 assert_eq!(demo.total_nodes_added(), 3);
891 }
892
893 #[tokio::test]
896 async fn embed_failure_still_creates_crossrefs() {
897 use crate::embedding::EmbeddingError;
898
899 struct FailingProvider;
900
901 #[async_trait::async_trait]
902 impl EmbeddingProvider for FailingProvider {
903 async fn embed(&self, _text: &str) -> Result<Vec<f32>, EmbeddingError> {
904 Err(EmbeddingError::BackendError("test failure".into()))
905 }
906 async fn embed_batch(
907 &self,
908 _texts: &[&str],
909 ) -> Result<Vec<Vec<f32>>, EmbeddingError> {
910 Err(EmbeddingError::BackendError("test failure".into()))
911 }
912 fn dimensions(&self) -> usize {
913 8
914 }
915 fn model_name(&self) -> &str {
916 "failing-test"
917 }
918 }
919
920 let cg = Arc::new(CausalGraph::new());
921 let hnsw = Arc::new(HnswService::new(HnswServiceConfig::default()));
922 let iq = Arc::new(ImpulseQueue::new());
923 let crs = Arc::new(CrossRefStore::new());
924 let emb: Arc<dyn EmbeddingProvider> = Arc::new(FailingProvider);
925
926 let demo = DemocritusLoop::new(
927 Arc::clone(&cg),
928 Arc::clone(&hnsw),
929 Arc::clone(&iq),
930 Arc::clone(&crs),
931 emb,
932 DemocritusConfig::default(),
933 );
934
935 for i in 0..5 {
937 emit_test_impulse(&iq, ImpulseType::BeliefUpdate, i * 100);
938 }
939
940 let result = demo.tick().await;
941 assert_eq!(result.impulses_sensed, 5);
942 assert_eq!(result.embeddings_produced, 5);
944 assert_eq!(cg.node_count(), 5);
946 assert_eq!(result.crossrefs_added, 5);
948 assert_eq!(crs.count(), 5);
949 assert_eq!(hnsw.len(), 0);
951 }
952
953 #[tokio::test]
954 async fn embed_failure_no_edges_added() {
955 use crate::embedding::EmbeddingError;
956
957 struct FailingProvider;
958
959 #[async_trait::async_trait]
960 impl EmbeddingProvider for FailingProvider {
961 async fn embed(&self, _text: &str) -> Result<Vec<f32>, EmbeddingError> {
962 Err(EmbeddingError::BackendError("fail".into()))
963 }
964 async fn embed_batch(
965 &self,
966 _texts: &[&str],
967 ) -> Result<Vec<Vec<f32>>, EmbeddingError> {
968 Err(EmbeddingError::BackendError("fail".into()))
969 }
970 fn dimensions(&self) -> usize {
971 8
972 }
973 fn model_name(&self) -> &str {
974 "fail"
975 }
976 }
977
978 let cg = Arc::new(CausalGraph::new());
979 let hnsw = Arc::new(HnswService::new(HnswServiceConfig::default()));
980 let iq = Arc::new(ImpulseQueue::new());
981 let crs = Arc::new(CrossRefStore::new());
982 let emb: Arc<dyn EmbeddingProvider> = Arc::new(FailingProvider);
983
984 let demo = DemocritusLoop::new(
985 Arc::clone(&cg),
986 Arc::clone(&hnsw),
987 Arc::clone(&iq),
988 Arc::clone(&crs),
989 emb,
990 DemocritusConfig::default(),
991 );
992
993 emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 100);
994 let result = demo.tick().await;
995 assert_eq!(result.edges_added, 0);
997 assert_eq!(demo.total_edges_added(), 0);
998 }
999
1000 #[tokio::test]
1003 async fn multiple_sequential_ticks_accumulate_state() {
1004 let (cg, hnsw, iq, crs, demo) = make_loop();
1005
1006 emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 100);
1008 let r1 = demo.tick().await;
1009 assert_eq!(r1.impulses_sensed, 1);
1010 let nodes_after_1 = cg.node_count();
1011 let hnsw_after_1 = hnsw.len();
1012
1013 emit_test_impulse(&iq, ImpulseType::CoherenceAlert, 200);
1015 emit_test_impulse(&iq, ImpulseType::NoveltyDetected, 300);
1016 let r2 = demo.tick().await;
1017 assert_eq!(r2.impulses_sensed, 2);
1018 assert_eq!(cg.node_count(), nodes_after_1 + 2);
1019 assert_eq!(hnsw.len(), hnsw_after_1 + 2);
1020
1021 emit_test_impulse(&iq, ImpulseType::EdgeConfirmed, 400);
1023 emit_test_impulse(&iq, ImpulseType::EmbeddingRefined, 500);
1024 emit_test_impulse(&iq, ImpulseType::Custom(42), 600);
1025 let r3 = demo.tick().await;
1026 assert_eq!(r3.impulses_sensed, 3);
1027 assert_eq!(cg.node_count(), nodes_after_1 + 5);
1028
1029 assert_eq!(demo.total_ticks(), 3);
1031 assert_eq!(demo.total_nodes_added(), 6);
1032 assert_eq!(crs.count(), 6);
1034 }
1035
1036 #[tokio::test]
1037 async fn sequential_ticks_can_find_prior_neighbors() {
1038 let config = DemocritusConfig {
1039 correlation_threshold: 0.0, ..DemocritusConfig::default()
1041 };
1042 let (cg, hnsw, iq, _, demo) = make_loop_with_config(config);
1043
1044 emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 100);
1046 demo.tick().await;
1047 assert_eq!(hnsw.len(), 1);
1048
1049 emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 200);
1051 let r2 = demo.tick().await;
1052 assert_eq!(r2.searches_performed, 1);
1053 assert!(cg.node_count() >= 2);
1058 }
1059
1060 #[tokio::test]
1063 async fn zero_max_impulses_per_tick() {
1064 let config = DemocritusConfig {
1065 max_impulses_per_tick: 0,
1066 ..DemocritusConfig::default()
1067 };
1068 let (_, _, iq, _, demo) = make_loop_with_config(config);
1069
1070 emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 100);
1071 let result = demo.tick().await;
1072 assert_eq!(result.impulses_sensed, 0);
1074 assert_eq!(result.embeddings_produced, 0);
1075 }
1076
1077 #[tokio::test]
1078 async fn tick_result_duration_is_positive() {
1079 let (_, _, iq, _, demo) = make_loop();
1080 emit_test_impulse(&iq, ImpulseType::BeliefUpdate, 100);
1081 let result = demo.tick().await;
1082 assert!(result.duration_us < 10_000_000, "tick should complete within 10s");
1084 }
1085
1086 #[test]
1087 fn classify_edge_custom_impulse_type() {
1088 let (_, _, _, _, demo) = make_loop();
1089 let impulse = Impulse {
1090 id: 1,
1091 source_structure: 0,
1092 source_node: [0u8; 32],
1093 target_structure: 2,
1094 impulse_type: ImpulseType::Custom(99),
1095 payload: serde_json::json!({}),
1096 hlc_timestamp: 0,
1097 acknowledged: std::sync::atomic::AtomicBool::new(false),
1098 };
1099
1100 assert_eq!(
1102 demo.classify_edge(&impulse, 0.3),
1103 CausalEdgeType::Follows
1104 );
1105 assert_eq!(
1107 demo.classify_edge(&impulse, 0.9),
1108 CausalEdgeType::Correlates
1109 );
1110 }
1111}