1use core::{future::Future, pin::Pin};
7use dwbase_core::{
8 Atom, AtomId, AtomKind, Importance, Link, LinkKind, Timestamp, WorkerKey, WorldKey,
9};
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use std::collections::{HashMap, VecDeque};
13use std::hash::{Hash, Hasher};
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::RwLock;
16use std::time::Instant as StdInstant;
17use thiserror::Error;
18use time::format_description::well_known::Rfc3339;
19use time::OffsetDateTime;
20
21pub type Result<T> = std::result::Result<T, DwbaseError>;
22
23#[derive(Debug, Error)]
25pub enum DwbaseError {
26 #[error("capability denied: {0}")]
27 CapabilityDenied(String),
28 #[error("invalid input: {0}")]
29 InvalidInput(String),
30 #[error("storage error: {0}")]
31 Storage(String),
32 #[error("vector error: {0}")]
33 Vector(String),
34 #[error("stream error: {0}")]
35 Stream(String),
36 #[error("internal error: {0}")]
37 Internal(String),
38}
39
40#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
42pub struct NewAtom {
43 pub world: WorldKey,
44 pub worker: WorkerKey,
45 pub kind: AtomKind,
46 pub timestamp: Timestamp,
47 pub importance: Importance,
48 pub payload_json: String,
49 pub vector: Option<Vec<f32>>,
50 pub flags: Vec<String>,
51 pub labels: Vec<String>,
52 pub links: Vec<Link>,
53}
54
55#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
57pub struct AtomFilter {
58 pub world: Option<WorldKey>,
59 pub kinds: Vec<AtomKind>,
60 pub labels: Vec<String>,
61 pub flags: Vec<String>,
62 pub since: Option<Timestamp>,
63 pub until: Option<Timestamp>,
64 pub limit: Option<usize>,
65}
66
67#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
69pub struct Question {
70 pub world: WorldKey,
71 pub text: String,
72 pub filter: Option<AtomFilter>,
73}
74
75#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
77pub struct Answer {
78 pub world: WorldKey,
79 pub text: String,
80 pub supporting_atoms: Vec<Atom>,
81 #[serde(default)]
82 pub warnings: Vec<String>,
83}
84
85#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
87pub struct WorldMeta {
88 pub world: WorldKey,
89 pub description: Option<String>,
90 pub labels: Vec<String>,
91}
92
93#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
95pub enum WorldAction {
96 Create(WorldMeta),
97 Archive(WorldKey),
98 Resume(WorldKey),
99}
100
101pub struct ReflexIndex {
103 config: ReflexIndexConfig,
104 buckets: RwLock<HashMap<WorldKey, HashMap<i64, Vec<Atom>>>>,
105}
106
/// Tuning knobs for [`ReflexIndex`].
#[derive(Clone, Debug)]
pub struct ReflexIndexConfig {
    /// Length of the retained history, in seconds.
    pub recency_seconds: u64,
    /// Width of one time bucket, in seconds (0 is treated as 1).
    pub bucket_width_seconds: u64,
}
114
115impl Default for ReflexIndexConfig {
116 fn default() -> Self {
117 Self {
118 recency_seconds: 300,
119 bucket_width_seconds: 1,
120 }
121 }
122}
123
124#[derive(Default)]
125struct ConflictIndex {
126 supersedes: RwLock<HashMap<AtomId, AtomId>>,
127 contradicts: RwLock<HashMap<AtomId, Vec<AtomId>>>,
128 confirms: RwLock<HashMap<AtomId, Vec<AtomId>>>,
129}
130
/// Effective garbage-collection / replication policy for one world, assembled
/// from `policy:` labels.
#[derive(Clone, Debug, Default)]
struct GcPolicy {
    /// Atoms older than this many days (relative to the newest atom) are evicted;
    /// `Some(0)` or `None` disables age-based eviction.
    retention_days: Option<u64>,
    /// Atoms with importance strictly below this are evicted.
    min_importance: Option<f32>,
    /// Accumulated `policy:replication_allow=` values.
    replication_allow: Vec<String>,
    /// Accumulated `policy:replication_deny=` values.
    replication_deny: Vec<String>,
}
138
139#[derive(Clone, Debug, Serialize, Deserialize)]
140pub struct IndexMetadata {
141 pub world: WorldKey,
142 pub version: u64,
143 pub embedder_version: String,
144 pub last_rebuilt: Timestamp,
145 pub ready: bool,
146 #[serde(default)]
147 pub rebuilding: bool,
148 #[serde(default)]
149 pub progress: f32,
150 #[serde(default)]
151 pub started_at: Option<Timestamp>,
152 #[serde(default)]
153 pub last_progress: Option<Timestamp>,
154}
155
156impl ConflictIndex {
157 fn register(&self, atom: &Atom) {
158 let mut supersedes = self.supersedes.write().expect("supersedes lock");
159 let mut contradicts = self.contradicts.write().expect("contradicts lock");
160 let mut confirms = self.confirms.write().expect("confirms lock");
161 for link in atom.links() {
162 match link.kind {
163 LinkKind::Supersedes => {
164 supersedes.insert(link.target.clone(), atom.id().clone());
165 }
166 LinkKind::Contradicts => {
167 contradicts
168 .entry(link.target.clone())
169 .or_default()
170 .push(atom.id().clone());
171 }
172 LinkKind::Confirms => {
173 confirms
174 .entry(link.target.clone())
175 .or_default()
176 .push(atom.id().clone());
177 }
178 LinkKind::References => {}
179 }
180 }
181 }
182
183 fn superseded_by(&self, id: &AtomId) -> Option<AtomId> {
184 self.supersedes
185 .read()
186 .expect("supersedes lock")
187 .get(id)
188 .cloned()
189 }
190
191 fn contradiction_count(&self, id: &AtomId) -> usize {
192 self.contradicts
193 .read()
194 .expect("contradicts lock")
195 .get(id)
196 .map(|v| v.len())
197 .unwrap_or(0)
198 }
199
200 fn confirmation_count(&self, id: &AtomId) -> usize {
201 self.confirms
202 .read()
203 .expect("confirms lock")
204 .get(id)
205 .map(|v| v.len())
206 .unwrap_or(0)
207 }
208}
209
210#[derive(Clone, Debug, Default)]
211pub struct ReflexFilter {
212 pub world: Option<WorldKey>,
213 pub kinds: Vec<AtomKind>,
214 pub labels: Vec<String>,
215 pub author: Option<WorkerKey>,
216 pub min_importance: Option<f32>,
217 pub since: Option<Timestamp>,
218 pub exclude_flags: Vec<String>,
219}
220
221impl ReflexFilter {
222 pub fn from_question(question: &Question) -> Self {
223 let mut base = ReflexFilter {
224 world: Some(question.world.clone()),
225 ..Default::default()
226 };
227 if let Some(filter) = &question.filter {
228 base.kinds = filter.kinds.clone();
229 base.labels = filter.labels.clone();
230 base.exclude_flags = filter.flags.clone();
231 base.since = filter.since.clone();
232 }
233 base
234 }
235}
236
237impl ReflexIndex {
238 pub fn new(config: ReflexIndexConfig) -> Self {
239 Self {
240 config,
241 buckets: RwLock::new(HashMap::new()),
242 }
243 }
244
245 fn parse_ts(&self, ts: &Timestamp) -> Result<i64> {
246 let dt = OffsetDateTime::parse(&ts.0, &Rfc3339)
247 .map_err(|e| DwbaseError::InvalidInput(format!("invalid timestamp {}: {}", ts.0, e)))?;
248 Ok(dt.unix_timestamp())
249 }
250
251 fn bucket_id(&self, ts: i64) -> i64 {
252 let width = self.config.bucket_width_seconds.max(1) as i64;
253 ts / width
254 }
255
256 fn trim_old_buckets(&self, world_buckets: &mut HashMap<i64, Vec<Atom>>, newest: i64) {
257 let window_buckets =
258 (self.config.recency_seconds / self.config.bucket_width_seconds.max(1)).max(1) as i64;
259 let min_bucket = newest - window_buckets;
260 #[cfg(feature = "metrics")]
261 let before = world_buckets.len();
262 world_buckets.retain(|bucket, _| *bucket >= min_bucket);
263 #[cfg(feature = "metrics")]
264 dwbase_metrics::record_gc_evictions(before.saturating_sub(world_buckets.len()) as u64);
265 }
266
267 pub fn insert(&self, atom: Atom) -> Result<()> {
269 let ts = self.parse_ts(atom.timestamp())?;
270 let bucket = self.bucket_id(ts);
271 let mut guard = self.buckets.write().expect("poisoned reflex index lock");
272 let world_buckets = guard.entry(atom.world().clone()).or_default();
273 world_buckets.entry(bucket).or_default().push(atom);
274 self.trim_old_buckets(world_buckets, bucket);
275 Ok(())
276 }
277
278 pub fn filter(&self, filter: &ReflexFilter) -> Result<Vec<Atom>> {
280 let since_epoch = if let Some(since) = &filter.since {
281 Some(self.parse_ts(since)?)
282 } else {
283 None
284 };
285
286 let guard = self.buckets.read().expect("poisoned reflex index lock");
287 let world_views: Vec<(&WorldKey, &HashMap<i64, Vec<Atom>>)> = match &filter.world {
288 Some(world) => guard
289 .get(world)
290 .map(|b| vec![(world, b)])
291 .unwrap_or_default(),
292 None => guard.iter().collect(),
293 };
294
295 let mut results = Vec::new();
296 for (_world, buckets) in world_views {
297 for atoms in buckets.values() {
298 for atom in atoms {
299 if !filter.kinds.is_empty() && !filter.kinds.contains(atom.kind()) {
300 continue;
301 }
302 if let Some(author) = &filter.author {
303 if atom.worker() != author {
304 continue;
305 }
306 }
307 if let Some(min_imp) = filter.min_importance {
308 if atom.importance().get() < min_imp {
309 continue;
310 }
311 }
312 if let Some(since) = since_epoch {
313 let atom_ts = self.parse_ts(atom.timestamp())?;
314 if atom_ts < since {
315 continue;
316 }
317 }
318 if !filter.labels.is_empty()
319 && !filter.labels.iter().all(|l| atom.labels().contains(l))
320 {
321 continue;
322 }
323 if !filter.exclude_flags.is_empty()
324 && filter
325 .exclude_flags
326 .iter()
327 .any(|f| atom.flags().contains(f))
328 {
329 continue;
330 }
331 results.push(atom.clone());
332 }
333 }
334 }
335 Ok(results)
336 }
337}
338
339pub trait StorageEngine {
341 fn append(&self, atom: Atom) -> Result<()>;
342 fn get_by_ids(&self, ids: &[AtomId]) -> Result<Vec<Atom>>;
343 fn scan(&self, world: &WorldKey, filter: &AtomFilter) -> Result<Vec<Atom>>;
344 fn stats(&self, world: &WorldKey) -> Result<StorageStats>;
345 fn list_ids_in_window(&self, world: &WorldKey, window: &TimeWindow) -> Result<Vec<AtomId>>;
347 fn delete_atoms(&self, world: &WorldKey, ids: &[AtomId]) -> Result<usize>;
349 fn worlds(&self) -> Result<Vec<WorldKey>>;
351}
352
353#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
355pub struct StorageStats {
356 pub atom_count: usize,
357 pub vector_count: usize,
358}
359
360#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
362pub struct TimeWindow {
363 pub start_ms: i64,
364 pub end_ms: i64,
365}
366
367impl TimeWindow {
368 pub fn new(start_ms: i64, end_ms: i64) -> Self {
369 Self { start_ms, end_ms }
370 }
371}
372
373#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
375pub struct SummaryWindow {
376 pub start_ms: i64,
377 pub end_ms: i64,
378}
379
380impl SummaryWindow {
381 pub fn new(start_ms: i64, end_ms: i64) -> Self {
382 Self { start_ms, end_ms }
383 }
384}
385
386#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
388pub struct SummaryAdvert {
389 pub world: WorldKey,
390 pub windows: Vec<SummaryWindow>,
391 pub digest: String,
392}
393
394impl SummaryAdvert {
395 pub fn new(world: WorldKey, windows: Vec<SummaryWindow>, digest: impl Into<String>) -> Self {
396 Self {
397 world,
398 windows,
399 digest: digest.into(),
400 }
401 }
402}
403
404#[derive(Default)]
406pub struct SummaryCatalog {
407 entries: HashMap<WorldKey, Vec<SummaryAdvert>>,
408}
409
410impl SummaryCatalog {
411 pub fn new() -> Self {
412 Self::default()
413 }
414
415 pub fn upsert(&mut self, advert: SummaryAdvert) {
417 let world_entry = self.entries.entry(advert.world.clone()).or_default();
418 if let Some(pos) = world_entry.iter().position(|a| a.digest == advert.digest) {
419 world_entry[pos] = advert;
420 } else {
421 world_entry.push(advert);
422 }
423 }
424
425 pub fn list(&self, world: &WorldKey) -> Vec<SummaryAdvert> {
427 self.entries.get(world).cloned().unwrap_or_else(Vec::new)
428 }
429
430 pub fn worlds(&self) -> Vec<WorldKey> {
432 self.entries.keys().cloned().collect()
433 }
434}
435
436pub trait VectorEngine {
438 fn upsert(&self, world: &WorldKey, atom_id: &AtomId, vector: &[f32]) -> Result<()>;
439 fn search(
440 &self,
441 world: &WorldKey,
442 query: &[f32],
443 k: usize,
444 filter: &AtomFilter,
445 ) -> Result<Vec<AtomId>>;
446 fn rebuild(&self, world: &WorldKey) -> Result<()>;
447}
448
449pub trait StreamEngine {
451 type Handle;
452
453 fn publish(&self, atom: &Atom) -> Result<()>;
454 fn subscribe(&self, world: &WorldKey, filter: AtomFilter) -> Result<Self::Handle>;
455 fn poll(&self, handle: &Self::Handle) -> Result<Option<Atom>>;
456 fn stop(&self, handle: Self::Handle) -> Result<()>;
457}
458
459pub trait Embedder {
461 #[allow(clippy::type_complexity)]
463 #[cfg_attr(feature = "tokio", allow(async_fn_in_trait))]
464 fn embed<'a>(
465 &'a self,
466 payload_json: &'a str,
467 ) -> Pin<Box<dyn Future<Output = Result<Option<Vec<f32>>>> + Send + 'a>>;
468
469 fn model_version(&self) -> &'static str {
471 "unknown"
472 }
473}
474
475pub trait Gatekeeper {
477 fn check_remember(&self, new_atom: &NewAtom) -> Result<()>;
478 fn check_ask(&self, question: &Question) -> Result<()>;
479 fn check_world_action(&self, action: &WorldAction) -> Result<()>;
480}
481
482pub struct DWBaseEngine<S, V, T, G, E> {
484 pub storage: S,
485 pub vector: V,
486 pub stream: T,
487 pub gatekeeper: G,
488 pub embedder: E,
489 pub reflex_index: ReflexIndex,
490 conflict_index: ConflictIndex,
491 suspicions: parking_lot::Mutex<HashMap<WorkerKey, VecDeque<StdInstant>>>,
492 index_state: parking_lot::Mutex<HashMap<WorldKey, IndexMetadata>>,
493 id_gen: AtomicU64,
494}
495
496impl<S, V, T, G, E> DWBaseEngine<S, V, T, G, E> {
497 pub fn new(storage: S, vector: V, stream: T, gatekeeper: G, embedder: E) -> Self {
498 Self {
499 storage,
500 vector,
501 stream,
502 gatekeeper,
503 embedder,
504 reflex_index: ReflexIndex::new(ReflexIndexConfig::default()),
505 conflict_index: ConflictIndex::default(),
506 suspicions: parking_lot::Mutex::new(HashMap::new()),
507 index_state: parking_lot::Mutex::new(HashMap::new()),
508 id_gen: AtomicU64::new(1),
509 }
510 }
511
512 pub fn with_reflex_index(
513 storage: S,
514 vector: V,
515 stream: T,
516 gatekeeper: G,
517 embedder: E,
518 reflex_index: ReflexIndex,
519 ) -> Self {
520 Self {
521 storage,
522 vector,
523 stream,
524 gatekeeper,
525 embedder,
526 reflex_index,
527 conflict_index: ConflictIndex::default(),
528 suspicions: parking_lot::Mutex::new(HashMap::new()),
529 index_state: parking_lot::Mutex::new(HashMap::new()),
530 id_gen: AtomicU64::new(1),
531 }
532 }
533}
534
535impl<S, V, T, G, E> DWBaseEngine<S, V, T, G, E>
536where
537 S: StorageEngine,
538 V: VectorEngine,
539 T: StreamEngine,
540 G: Gatekeeper,
541 E: Embedder,
542{
543 fn new_id(&self) -> AtomId {
544 let id = self.id_gen.fetch_add(1, Ordering::Relaxed);
545 AtomId::new(format!("atom-{}", id))
546 }
547
548 fn register_conflicts(&self, atom: &Atom) {
549 self.conflict_index.register(atom);
550 }
551
552 fn record_suspicion(&self, worker: &WorkerKey, now: StdInstant) -> usize {
553 let mut map = self.suspicions.lock();
554 let entry = map.entry(worker.clone()).or_default();
555 entry.push_back(now);
556 while let Some(front) = entry.front().cloned() {
558 if now.duration_since(front) > std::time::Duration::from_secs(1) {
559 entry.pop_front();
560 } else {
561 break;
562 }
563 }
564 entry.len()
565 }
566
567 fn is_impossible_payload(payload: &str) -> bool {
568 let lower = payload.to_ascii_lowercase();
569 lower.contains("nan") || lower.contains("infinity") || lower.contains("impossible")
570 }
571
572 fn low_trust(worker: &WorkerKey) -> bool {
573 worker.0.to_ascii_lowercase().starts_with("lowtrust")
574 }
575
576 fn should_quarantine(&self, atom: &NewAtom, now: StdInstant) -> bool {
577 let spam = self.record_suspicion(&atom.worker, now) > 5;
578 let impossible = Self::is_impossible_payload(&atom.payload_json);
579 let low_trust = Self::low_trust(&atom.worker) && atom.importance.get() > 0.7;
580 spam || impossible || low_trust
581 }
582
583 fn trust_hint(&self, atom: &Atom) -> f32 {
584 let mut hasher = std::collections::hash_map::DefaultHasher::new();
586 atom.worker().0.hash(&mut hasher);
587 let v = hasher.finish();
588 (v % 10_000) as f32 / 10_000.0
589 }
590
591 fn rank_score(&self, atom: &Atom) -> f32 {
592 let mut score = atom.importance().get();
593 if self.conflict_index.superseded_by(atom.id()).is_some() {
594 score -= 1.0;
595 }
596 let contradictions = self.conflict_index.contradiction_count(atom.id());
597 if contradictions > 0 {
598 score -= 0.3 * contradictions as f32;
599 }
600 let confirms = self.conflict_index.confirmation_count(atom.id());
601 if confirms > 0 {
602 score += 0.2 * confirms as f32;
603 }
604 score
605 }
606
607 fn parse_policy(&self, atoms: &[Atom], world: &WorldKey) -> GcPolicy {
608 let mut policy = GcPolicy {
610 retention_days: Some(365),
611 ..Default::default()
612 };
613 let mut retention_min_days: Option<u64> = None;
614
615 let mut policy_atoms: Vec<Atom> = atoms
616 .iter()
617 .filter(|a| a.world() == world)
618 .cloned()
619 .collect();
620
621 let policy_world = WorldKey::new(format!("policy:{}", world.0));
625 if let Ok(mut extra) = self.storage.scan(&policy_world, &AtomFilter::default()) {
626 policy_atoms.append(&mut extra);
627 }
628 if let Some(tenant_world) = world.0.strip_prefix("tenant:").and_then(|rest| {
629 rest.split_once('/')
630 .map(|(tenant, _)| WorldKey::new(format!("tenant:{tenant}/policy")))
631 }) {
632 if let Ok(mut extra) = self.storage.scan(&tenant_world, &AtomFilter::default()) {
633 policy_atoms.append(&mut extra);
634 }
635 }
636
637 for atom in policy_atoms {
638 for label in atom.labels() {
639 if let Some(val) = label.strip_prefix("policy:retention_days=") {
640 if let Ok(days) = val.parse::<u64>() {
641 policy.retention_days = Some(days);
642 }
643 }
644 if let Some(val) = label.strip_prefix("policy:retention_min_days=") {
645 if let Ok(days) = val.parse::<u64>() {
646 retention_min_days = Some(days);
647 }
648 }
649 if let Some(val) = label.strip_prefix("policy:min_importance=") {
650 if let Ok(v) = val.parse::<f32>() {
651 policy.min_importance = Some(v);
652 }
653 }
654 if let Some(val) = label.strip_prefix("policy:replication_allow=") {
655 if !val.is_empty() {
656 policy.replication_allow.push(val.to_string());
657 }
658 }
659 if let Some(val) = label.strip_prefix("policy:replication_deny=") {
660 if !val.is_empty() {
661 policy.replication_deny.push(val.to_string());
662 }
663 }
664 }
665 }
666
667 if let Some(min_days) = retention_min_days {
668 match policy.retention_days {
669 Some(current) if current < min_days => policy.retention_days = Some(min_days),
670 None => policy.retention_days = Some(min_days),
671 _ => {}
672 }
673 }
674 policy
675 }
676
677 fn is_policy_atom(atom: &Atom) -> bool {
678 atom.labels()
679 .iter()
680 .any(|l| l.starts_with("policy:") || l == "world_meta")
681 }
682
683 fn world_status(&self, world: &WorldKey) -> Result<Option<String>> {
684 let atoms = self
685 .storage
686 .scan(
687 world,
688 &AtomFilter {
689 world: Some(world.clone()),
690 kinds: Vec::new(),
691 labels: vec!["world_meta".into()],
692 flags: Vec::new(),
693 since: None,
694 until: None,
695 limit: None,
696 },
697 )
698 .unwrap_or_default();
699 let mut latest: Option<(OffsetDateTime, String)> = None;
700 for atom in atoms {
701 let ts = parse_ts(atom.timestamp()).unwrap_or(OffsetDateTime::UNIX_EPOCH);
702 let status = atom
703 .labels()
704 .iter()
705 .find_map(|l| l.strip_prefix("world_status:"))
706 .map(|s| s.to_string());
707 if let Some(status) = status {
708 if latest.as_ref().map(|(t, _)| *t < ts).unwrap_or(true) {
709 latest = Some((ts, status));
710 }
711 }
712 }
713 Ok(latest.map(|(_, s)| s))
714 }
715
716 pub fn world_archived(&self, world: &WorldKey) -> Result<bool> {
717 Ok(matches!(
718 self.world_status(world)?,
719 Some(status) if status == "archived"
720 ))
721 }
722
723 pub fn worlds(&self) -> Result<Vec<WorldKey>> {
725 self.worlds_filtered(false)
726 }
727
728 pub fn worlds_filtered(&self, include_archived: bool) -> Result<Vec<WorldKey>> {
730 let mut worlds = self.storage.worlds()?;
731 worlds.sort_by(|a, b| a.0.cmp(&b.0));
732 if include_archived {
733 return Ok(worlds);
734 }
735 let mut filtered = Vec::new();
736 for world in worlds {
737 if self.world_archived(&world)? {
738 continue;
739 }
740 filtered.push(world);
741 }
742 Ok(filtered)
743 }
744
745 fn now_timestamp(&self) -> Timestamp {
746 Timestamp(
747 OffsetDateTime::now_utc()
748 .format(&Rfc3339)
749 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".into()),
750 )
751 }
752
753 pub fn index_status(&self) -> Vec<IndexMetadata> {
754 self.index_state.lock().values().cloned().collect()
755 }
756
757 fn ensure_index_entry(&self, world: &WorldKey) {
758 let mut guard = self.index_state.lock();
759 guard.entry(world.clone()).or_insert_with(|| IndexMetadata {
760 world: world.clone(),
761 version: 1,
762 embedder_version: self.embedder.model_version().into(),
763 last_rebuilt: self.now_timestamp(),
764 ready: true,
765 rebuilding: false,
766 progress: 1.0,
767 started_at: None,
768 last_progress: None,
769 });
770 }
771
772 fn maybe_rebuild_index(&self, world: &WorldKey) {
773 self.ensure_index_entry(world);
774 let mut guard = self.index_state.lock();
775 if let Some(meta) = guard.get_mut(world) {
776 if meta.embedder_version != self.embedder.model_version() {
777 meta.ready = false;
778 meta.rebuilding = true;
779 meta.progress = 0.0;
780 let now = self.now_timestamp();
781 meta.started_at = Some(now.clone());
782 meta.last_progress = Some(now.clone());
783 let _ = self.vector.rebuild(world);
784 meta.version += 1;
785 meta.embedder_version = self.embedder.model_version().into();
786 meta.last_rebuilt = self.now_timestamp();
787 meta.ready = true;
788 meta.rebuilding = false;
789 meta.progress = 1.0;
790 meta.last_progress = Some(meta.last_rebuilt.clone());
791 }
792 }
793 }
794
795 fn atom_from_new(&self, id: AtomId, mut new_atom: NewAtom) -> Atom {
796 let mut builder = Atom::builder(
797 id,
798 new_atom.world,
799 new_atom.worker,
800 new_atom.kind,
801 new_atom.timestamp,
802 new_atom.importance,
803 new_atom.payload_json,
804 );
805 builder = builder.vector(new_atom.vector);
806 for flag in new_atom.flags.drain(..) {
807 builder = builder.add_flag(flag);
808 }
809 for label in new_atom.labels.drain(..) {
810 builder = builder.add_label(label);
811 }
812 for link in new_atom.links.drain(..) {
813 builder = builder.add_typed_link(link.target, link.kind);
814 }
815 builder.build()
816 }
817
818 pub async fn remember(&self, mut new_atom: NewAtom) -> Result<AtomId> {
820 #[cfg(feature = "metrics")]
821 let start = StdInstant::now();
822
823 self.gatekeeper.check_remember(&new_atom)?;
824 let now = StdInstant::now();
825
826 let id = self.new_id();
828 if new_atom.timestamp.0.is_empty() {
829 new_atom.timestamp = self.now_timestamp();
830 }
831
832 self.ensure_index_entry(&new_atom.world);
833 self.maybe_rebuild_index(&new_atom.world);
834
835 if self.should_quarantine(&new_atom, now) {
837 new_atom.world = WorldKey::new(format!("quarantine:{}", new_atom.world.0));
838 if !new_atom.flags.contains(&"suspect".to_string()) {
839 new_atom.flags.push("suspect".into());
840 }
841 if !new_atom.labels.contains(&"quarantine".to_string()) {
842 new_atom.labels.push("quarantine".into());
843 }
844 }
845
846 let embedded_vector = self.embedder.embed(&new_atom.payload_json).await?;
848 if new_atom.vector.is_none() {
849 new_atom.vector = embedded_vector;
850 }
851
852 let atom = self.atom_from_new(id.clone(), new_atom);
853
854 #[cfg(feature = "metrics")]
855 let freshness = parse_ts(atom.timestamp()).ok().and_then(|ts| {
856 let age = OffsetDateTime::now_utc() - ts;
857 (!age.is_negative()).then_some(age)
858 });
859
860 if let Some(vec) = atom.vector() {
861 self.vector.upsert(atom.world(), atom.id(), vec)?;
862 }
863 self.storage.append(atom.clone())?;
864 self.reflex_index.insert(atom.clone())?;
865 self.register_conflicts(&atom);
866 self.stream.publish(&atom)?;
867
868 #[cfg(feature = "metrics")]
869 {
870 dwbase_metrics::record_remember_latency(start.elapsed());
871 if let Some(age) = freshness {
872 let age_std = std::time::Duration::from_secs_f64(age.as_seconds_f64());
873 dwbase_metrics::record_index_freshness(age_std);
874 }
875 }
876
877 Ok(id)
878 }
879
880 pub async fn ask(&self, question: Question) -> Result<Answer> {
882 #[cfg(feature = "metrics")]
883 let start = StdInstant::now();
884
885 let reflex_filter = ReflexFilter::from_question(&question);
886 self.gatekeeper.check_ask(&question)?;
887
888 let mut candidates = self.reflex_index.filter(&reflex_filter)?;
889 let mut warnings = Vec::new();
890 self.ensure_index_entry(&question.world);
891 self.maybe_rebuild_index(&question.world);
892 let ready_meta = self
893 .index_state
894 .lock()
895 .get(&question.world)
896 .cloned()
897 .unwrap_or_else(|| IndexMetadata {
898 world: question.world.clone(),
899 version: 1,
900 embedder_version: self.embedder.model_version().into(),
901 last_rebuilt: self.now_timestamp(),
902 ready: true,
903 rebuilding: false,
904 progress: 1.0,
905 started_at: None,
906 last_progress: None,
907 });
908 let ready = ready_meta.ready;
909
910 if candidates.is_empty() {
912 let mut filter = question.filter.clone().unwrap_or_default();
913 filter.world = Some(question.world.clone());
914 candidates = self.storage.scan(&question.world, &filter)?;
915 }
916
917 let querying_quarantine = question.world.0.starts_with("quarantine:");
919 if !querying_quarantine {
920 candidates.retain(|a| !a.world().0.starts_with("quarantine:"));
921 }
922
923 if ready {
925 if let Ok(Some(query_vec)) = self.embedder.embed(&question.text).await {
926 let filter = question.filter.clone().unwrap_or_default();
927 let ids = self
928 .vector
929 .search(&question.world, &query_vec, 10, &filter)?;
930 if !ids.is_empty() {
931 let fetched = self.storage.get_by_ids(&ids)?;
932 candidates.extend(fetched);
933 }
934 }
935 } else {
936 warnings.push("index not ready; used fallback search".into());
937 if ready_meta.rebuilding {
938 warnings.push("index rebuilding in background".into());
939 }
940 }
941
942 if candidates.is_empty() {
944 let filter = question.filter.clone().unwrap_or_default();
945 let mut storage_scan = self.storage.scan(&question.world, &filter)?;
946 storage_scan.truncate(20);
947 candidates.extend(storage_scan);
948 }
949
950 let mut seen = std::collections::HashSet::new();
952 candidates.retain(|a| seen.insert(a.id().clone()));
953
954 candidates.sort_by(|a, b| {
956 let score_a = self.rank_score(a);
957 let score_b = self.rank_score(b);
958 score_b
959 .partial_cmp(&score_a)
960 .unwrap_or(std::cmp::Ordering::Equal)
961 .then_with(|| {
962 let ts_a = parse_ts(a.timestamp()).unwrap_or(OffsetDateTime::UNIX_EPOCH);
963 let ts_b = parse_ts(b.timestamp()).unwrap_or(OffsetDateTime::UNIX_EPOCH);
964 ts_b.cmp(&ts_a)
965 })
966 .then_with(|| {
967 b.importance()
968 .get()
969 .partial_cmp(&a.importance().get())
970 .unwrap_or(std::cmp::Ordering::Equal)
971 })
972 .then_with(|| {
973 self.trust_hint(b)
974 .partial_cmp(&self.trust_hint(a))
975 .unwrap_or(std::cmp::Ordering::Equal)
976 })
977 });
978
979 let answer = Answer {
980 world: question.world,
981 text: "answer-pending".into(),
982 supporting_atoms: candidates,
983 warnings,
984 };
985 #[cfg(feature = "metrics")]
986 dwbase_metrics::record_ask_latency(start.elapsed());
987 Ok(answer)
988 }
989
990 pub fn list_ids_in_window(&self, world: &WorldKey, window: &TimeWindow) -> Result<Vec<AtomId>> {
992 self.storage.list_ids_in_window(world, window)
993 }
994
995 pub fn storage_ready(&self) -> bool {
997 self.storage.worlds().is_ok()
998 }
999
1000 pub fn max_index_rebuild_lag_ms(&self) -> Option<u64> {
1002 let now = OffsetDateTime::now_utc();
1003 let guard = self.index_state.lock();
1004 let mut max_lag = None;
1005 for meta in guard.values() {
1006 if meta.ready && !meta.rebuilding {
1007 continue;
1008 }
1009 let ts = meta
1010 .last_progress
1011 .as_ref()
1012 .or(meta.started_at.as_ref())
1013 .unwrap_or(&meta.last_rebuilt);
1014 if let Ok(t) = parse_ts(ts) {
1015 let lag = now - t;
1016 let ms = lag.whole_milliseconds().max(0) as u64;
1017 if max_lag.map(|m| ms > m).unwrap_or(true) {
1018 max_lag = Some(ms);
1019 }
1020 }
1021 }
1022 max_lag
1023 }
1024
1025 pub fn get_atoms(&self, ids: &[AtomId]) -> Result<Vec<Atom>> {
1027 self.storage.get_by_ids(ids)
1028 }
1029
1030 pub async fn ingest_remote_atoms(&self, atoms: Vec<Atom>) -> Result<Vec<AtomId>> {
1032 let mut ingested = Vec::new();
1033 for atom in atoms {
1034 let id = atom.id().clone();
1035 if !self
1036 .storage
1037 .get_by_ids(std::slice::from_ref(&id))?
1038 .is_empty()
1039 {
1040 continue;
1041 }
1042 if let Some(vec) = atom.vector() {
1043 self.vector.upsert(atom.world(), atom.id(), vec)?;
1044 }
1045 self.storage.append(atom.clone())?;
1046 self.reflex_index.insert(atom.clone())?;
1047 self.register_conflicts(&atom);
1048 self.stream.publish(&atom)?;
1049 ingested.push(id);
1050 }
1051 Ok(ingested)
1052 }
1053
1054 pub fn gc_once(&self, _max_disk_mb: Option<u64>) -> Result<usize> {
1056 let mut evicted = 0usize;
1057 let worlds = self.storage.worlds()?;
1058 for world in worlds {
1059 let mut atoms = self.storage.scan(&world, &AtomFilter::default())?;
1060 let policy = self.parse_policy(&atoms, &world);
1061 let referenced: std::collections::HashSet<_> = atoms
1062 .iter()
1063 .flat_map(|a| a.links().iter().map(|l| l.target.clone()))
1064 .collect();
1065
1066 atoms.sort_by_key(|a| parse_ts(a.timestamp()).unwrap_or(OffsetDateTime::UNIX_EPOCH));
1067 let now = atoms
1068 .iter()
1069 .filter_map(|a| parse_ts(a.timestamp()).ok())
1070 .max()
1071 .unwrap_or_else(OffsetDateTime::now_utc);
1072 let mut to_delete = Vec::new();
1073
1074 for atom in &atoms {
1075 if Self::is_policy_atom(atom) {
1076 continue;
1077 }
1078 if referenced.contains(atom.id()) {
1079 continue;
1080 }
1081 if atom.labels().iter().any(|l| l.starts_with("world_meta")) {
1082 continue;
1083 }
1084 let expired = policy
1085 .retention_days
1086 .filter(|d| *d > 0)
1087 .and_then(|days| {
1088 parse_ts(atom.timestamp())
1089 .ok()
1090 .map(|ts| now - ts > time::Duration::days(days as i64))
1091 })
1092 .unwrap_or(false);
1093 let low_importance = policy
1094 .min_importance
1095 .map(|min| atom.importance().get() < min)
1096 .unwrap_or(false);
1097 if expired || low_importance {
1098 to_delete.push(atom.id().clone());
1099 continue;
1100 }
1101 }
1102 if !to_delete.is_empty() {
1103 evicted += self.storage.delete_atoms(&world, &to_delete)?;
1104 }
1105 }
1106 #[cfg(feature = "metrics")]
1107 {
1108 dwbase_metrics::record_gc_evictions(evicted as u64);
1109 }
1110 Ok(evicted)
1111 }
1112
1113 pub async fn observe(&self, atom: Atom) -> Result<()> {
1115 self.stream.publish(&atom)?;
1116 Ok(())
1117 }
1118
1119 pub async fn replay(&self, world: WorldKey, filter: AtomFilter) -> Result<Vec<Atom>> {
1121 self.storage.scan(&world, &filter)
1122 }
1123
1124 pub async fn manage_world(&self, action: WorldAction) -> Result<()> {
1126 self.gatekeeper.check_world_action(&action)?;
1127
1128 let (world, next_status, description, mut extra_labels) = match action.clone() {
1129 WorldAction::Create(meta) => (
1130 meta.world,
1131 "active".to_string(),
1132 meta.description,
1133 meta.labels,
1134 ),
1135 WorldAction::Archive(world) => (world, "archived".to_string(), None, Vec::new()),
1136 WorldAction::Resume(world) => (world, "active".to_string(), None, Vec::new()),
1137 };
1138
1139 if let Some(current) = self.world_status(&world)? {
1141 if current == next_status && description.is_none() && extra_labels.is_empty() {
1142 return Ok(());
1143 }
1144 }
1145
1146 let now = self.now_timestamp();
1147 let mut payload = json!({
1148 "action": next_status,
1149 "world": world.0,
1150 "at": now.0,
1151 });
1152 if let Some(d) = &description {
1153 payload["description"] = json!(d);
1154 }
1155 if !extra_labels.is_empty() {
1156 payload["labels"] = json!(extra_labels);
1157 }
1158
1159 let mut builder = Atom::builder(
1160 self.new_id(),
1161 world.clone(),
1162 WorkerKey::new("system"),
1163 AtomKind::Reflection,
1164 now,
1165 Importance::clamped(0.1),
1166 payload.to_string(),
1167 )
1168 .add_label("world_meta".to_string())
1169 .add_label(format!("world_status:{next_status}"));
1170
1171 for l in extra_labels.drain(..) {
1172 builder = builder.add_label(l);
1173 }
1174
1175 let atom = builder.build();
1176
1177 if let Some(vec) = atom.vector() {
1178 self.vector.upsert(atom.world(), atom.id(), vec)?;
1179 }
1180 self.storage.append(atom.clone())?;
1181 self.reflex_index.insert(atom.clone())?;
1182 self.register_conflicts(&atom);
1183 self.stream.publish(&atom)?;
1184 Ok(())
1185 }
1186}
1187
1188fn parse_ts(ts: &Timestamp) -> Result<OffsetDateTime> {
1189 OffsetDateTime::parse(&ts.0, &Rfc3339)
1190 .map_err(|e| DwbaseError::InvalidInput(format!("invalid timestamp {}: {}", ts.0, e)))
1191}
1192
1193#[cfg(test)]
1194mod tests {
1195 use super::*;
1196 use dwbase_core::{Link, LinkKind};
1197 use std::collections::HashMap;
1198 use std::sync::{
1199 atomic::{AtomicUsize, Ordering},
1200 Mutex,
1201 };
1202
1203 #[derive(Clone)]
1204 struct AllowGatekeeper;
1205
1206 impl Gatekeeper for AllowGatekeeper {
1207 fn check_remember(&self, _new_atom: &NewAtom) -> Result<()> {
1208 Ok(())
1209 }
1210
1211 fn check_ask(&self, _question: &Question) -> Result<()> {
1212 Ok(())
1213 }
1214
1215 fn check_world_action(&self, _action: &WorldAction) -> Result<()> {
1216 Ok(())
1217 }
1218 }
1219
1220 struct DummyEmbedder;
1221
1222 impl Embedder for DummyEmbedder {
1223 #[allow(clippy::type_complexity)]
1224 fn embed<'a>(
1225 &'a self,
1226 payload_json: &'a str,
1227 ) -> Pin<Box<dyn Future<Output = Result<Option<Vec<f32>>>> + Send + 'a>> {
1228 Box::pin(async move {
1229 let parts: Vec<f32> = payload_json
1230 .split(',')
1231 .filter_map(|p| p.trim().parse::<f32>().ok())
1232 .collect();
1233 if parts.is_empty() {
1234 Ok(None)
1235 } else {
1236 Ok(Some(parts))
1237 }
1238 })
1239 }
1240
1241 fn model_version(&self) -> &'static str {
1242 "dummy-test"
1243 }
1244 }
1245
1246 #[derive(Default)]
1247 struct MemStorage {
1248 atoms: Mutex<Vec<Atom>>,
1249 }
1250
1251 impl StorageEngine for MemStorage {
1252 fn append(&self, atom: Atom) -> Result<()> {
1253 let mut atoms = self.atoms.lock().unwrap();
1254 atoms.push(atom);
1255 Ok(())
1256 }
1257
1258 fn get_by_ids(&self, ids: &[AtomId]) -> Result<Vec<Atom>> {
1259 let atoms = self.atoms.lock().unwrap();
1260 Ok(atoms
1261 .iter()
1262 .filter(|a| ids.contains(a.id()))
1263 .cloned()
1264 .collect())
1265 }
1266
1267 fn scan(&self, world: &WorldKey, filter: &AtomFilter) -> Result<Vec<Atom>> {
1268 let atoms = self.atoms.lock().unwrap();
1269 let mut results = Vec::new();
1270 for atom in atoms.iter() {
1271 if atom.world() != world {
1272 continue;
1273 }
1274 if !filter.kinds.is_empty() && !filter.kinds.contains(atom.kind()) {
1275 continue;
1276 }
1277 if !filter.labels.is_empty()
1278 && !filter.labels.iter().all(|l| atom.labels().contains(l))
1279 {
1280 continue;
1281 }
1282 if !filter.flags.is_empty()
1283 && !filter.flags.iter().all(|f| atom.flags().contains(f))
1284 {
1285 continue;
1286 }
1287 results.push(atom.clone());
1288 }
1289 Ok(results)
1290 }
1291
1292 fn stats(&self, _world: &WorldKey) -> Result<StorageStats> {
1293 let atoms = self.atoms.lock().unwrap();
1294 Ok(StorageStats {
1295 atom_count: atoms.len(),
1296 vector_count: 0,
1297 })
1298 }
1299
1300 fn list_ids_in_window(&self, world: &WorldKey, window: &TimeWindow) -> Result<Vec<AtomId>> {
1301 let atoms = self.atoms.lock().unwrap();
1302 Ok(atoms
1303 .iter()
1304 .filter(|a| a.world() == world)
1305 .filter(|a| {
1306 let ts = Timestamp::new(a.timestamp().0.clone());
1307 if let Ok(dt) = OffsetDateTime::parse(ts.0.as_str(), &Rfc3339) {
1308 let ms = dt.unix_timestamp_nanos() / 1_000_000;
1309 let start = window.start_ms as i128;
1310 let end = window.end_ms as i128;
1311 ms >= start && ms <= end
1312 } else {
1313 false
1314 }
1315 })
1316 .map(|a| a.id().clone())
1317 .collect())
1318 }
1319
1320 fn delete_atoms(&self, world: &WorldKey, ids: &[AtomId]) -> Result<usize> {
1321 let mut atoms = self.atoms.lock().unwrap();
1322 let before = atoms.len();
1323 atoms.retain(|a| !(a.world() == world && ids.contains(a.id())));
1324 Ok(before.saturating_sub(atoms.len()))
1325 }
1326
1327 fn worlds(&self) -> Result<Vec<WorldKey>> {
1328 let atoms = self.atoms.lock().unwrap();
1329 Ok(atoms
1330 .iter()
1331 .map(|a| a.world().clone())
1332 .collect::<std::collections::HashSet<_>>()
1333 .into_iter()
1334 .collect())
1335 }
1336 }
1337
1338 #[derive(Default)]
1339 struct MemVector {
1340 dims: Mutex<Option<usize>>,
1341 data: Mutex<HashMap<AtomId, Vec<f32>>>,
1342 }
1343
1344 impl VectorEngine for MemVector {
1345 fn upsert(&self, _world: &WorldKey, atom_id: &AtomId, vector: &[f32]) -> Result<()> {
1346 let mut data = self.data.lock().unwrap();
1347 data.insert(atom_id.clone(), vector.to_vec());
1348 let mut dims = self.dims.lock().unwrap();
1349 dims.get_or_insert(vector.len());
1350 Ok(())
1351 }
1352
1353 fn search(
1354 &self,
1355 _world: &WorldKey,
1356 query: &[f32],
1357 k: usize,
1358 _filter: &AtomFilter,
1359 ) -> Result<Vec<AtomId>> {
1360 let data = self.data.lock().unwrap();
1361 let mut pairs: Vec<(AtomId, f32)> = data
1362 .iter()
1363 .map(|(id, v)| {
1364 let dist = v
1365 .iter()
1366 .zip(query.iter())
1367 .map(|(a, b)| (a - b) * (a - b))
1368 .sum();
1369 (id.clone(), dist)
1370 })
1371 .collect();
1372 pairs.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());
1373 Ok(pairs.into_iter().take(k).map(|(id, _)| id).collect())
1374 }
1375
1376 fn rebuild(&self, _world: &WorldKey) -> Result<()> {
1377 Ok(())
1378 }
1379 }
1380
1381 #[derive(Default)]
1382 struct MemStream {
1383 next: AtomicUsize,
1384 subscribers: Mutex<HashMap<usize, (AtomFilter, Vec<Atom>)>>,
1385 }
1386
1387 impl StreamEngine for MemStream {
1388 type Handle = usize;
1389
1390 fn publish(&self, atom: &Atom) -> Result<()> {
1391 let mut subs = self.subscribers.lock().unwrap();
1392 for (_id, (filter, queue)) in subs.iter_mut() {
1393 let mut include = true;
1394 if let Some(world) = &filter.world {
1395 include &= atom.world() == world;
1396 }
1397 if !filter.kinds.is_empty() && !filter.kinds.contains(atom.kind()) {
1398 include = false;
1399 }
1400 if include {
1401 queue.push(atom.clone());
1402 }
1403 }
1404 Ok(())
1405 }
1406
1407 fn subscribe(&self, _world: &WorldKey, filter: AtomFilter) -> Result<Self::Handle> {
1408 let id = self.next.fetch_add(1, Ordering::Relaxed);
1409 let mut subs = self.subscribers.lock().unwrap();
1410 subs.insert(id, (filter, Vec::new()));
1411 Ok(id)
1412 }
1413
1414 fn poll(&self, handle: &Self::Handle) -> Result<Option<Atom>> {
1415 let mut subs = self.subscribers.lock().unwrap();
1416 if let Some((_, queue)) = subs.get_mut(handle) {
1417 if queue.is_empty() {
1418 Ok(None)
1419 } else {
1420 Ok(Some(queue.remove(0)))
1421 }
1422 } else {
1423 Err(DwbaseError::Stream(format!("unknown handle {handle}")))
1424 }
1425 }
1426
1427 fn stop(&self, handle: Self::Handle) -> Result<()> {
1428 let mut subs = self.subscribers.lock().unwrap();
1429 subs.remove(&handle);
1430 Ok(())
1431 }
1432 }
1433
1434 fn engine_components(
1435 ) -> DWBaseEngine<MemStorage, MemVector, MemStream, AllowGatekeeper, DummyEmbedder> {
1436 let storage = MemStorage::default();
1437 let vector = MemVector::default();
1438 let stream = MemStream::default();
1439 let gatekeeper = AllowGatekeeper;
1440 DWBaseEngine::new(storage, vector, stream, gatekeeper, DummyEmbedder)
1441 }
1442
1443 fn new_atom_with(ts: &str, payload: &str, label: &str) -> NewAtom {
1444 NewAtom {
1445 world: WorldKey::new("w1"),
1446 worker: WorkerKey::new("worker-1"),
1447 kind: AtomKind::Observation,
1448 timestamp: Timestamp::new(ts),
1449 importance: Importance::new(0.5).unwrap(),
1450 payload_json: payload.into(),
1451 vector: None,
1452 flags: vec![],
1453 labels: vec![label.into()],
1454 links: vec![],
1455 }
1456 }
1457
1458 #[tokio::test(flavor = "current_thread")]
1459 async fn remember_and_ask_returns_ranked_atoms() {
1460 let engine = engine_components();
1461 let a1 = new_atom_with("2024-01-01T00:00:01Z", "0,0", "a1");
1462 let a2 = new_atom_with("2024-01-01T00:00:00Z", "10,10", "a2");
1463
1464 let id1 = engine.remember(a1).await.expect("remember a1");
1465 let id2 = engine.remember(a2).await.expect("remember a2");
1466
1467 assert_ne!(id1, id2);
1468
1469 let question = Question {
1470 world: WorldKey::new("w1"),
1471 text: "0,0".into(),
1472 filter: None,
1473 };
1474
1475 let answer = engine.ask(question).await.expect("ask");
1476 let ids: Vec<_> = answer
1477 .supporting_atoms
1478 .iter()
1479 .map(|a| a.id().clone())
1480 .collect();
1481 assert_eq!(ids.first().unwrap(), &id1);
1482 assert!(ids.contains(&id2));
1483 }
1484
1485 #[tokio::test(flavor = "current_thread")]
1486 async fn superseded_atoms_downranked() {
1487 let engine = engine_components();
1488 let base = new_atom_with("2024-01-01T00:00:00Z", "p", "base");
1489 let base_id = engine.remember(base).await.expect("base");
1490
1491 let newer = NewAtom {
1492 world: WorldKey::new("w1"),
1493 worker: WorkerKey::new("worker-1"),
1494 kind: AtomKind::Observation,
1495 timestamp: Timestamp::new("2024-01-01T00:00:05Z"),
1496 importance: Importance::new(0.5).unwrap(),
1497 payload_json: "p-new".into(),
1498 vector: None,
1499 flags: vec![],
1500 labels: vec![],
1501 links: vec![Link::new(base_id.clone(), LinkKind::Supersedes)],
1502 };
1503 let newer_id = engine.remember(newer).await.expect("newer");
1504
1505 let question = Question {
1506 world: WorldKey::new("w1"),
1507 text: "p".into(),
1508 filter: None,
1509 };
1510
1511 let answer = engine.ask(question).await.expect("ask");
1512 let ids: Vec<_> = answer
1513 .supporting_atoms
1514 .iter()
1515 .map(|a| a.id().clone())
1516 .collect();
1517 assert_eq!(ids.first(), Some(&newer_id));
1518 assert!(ids.contains(&base_id));
1519 }
1520
1521 #[tokio::test(flavor = "current_thread")]
1522 async fn gc_retains_linked_and_policy_atoms() {
1523 let engine = engine_components();
1524 let base = new_atom_with("2024-01-01T00:00:00Z", "p", "keep");
1525 let base_id = engine.remember(base).await.unwrap();
1526 let linker = NewAtom {
1527 world: WorldKey::new("w1"),
1528 worker: WorkerKey::new("w1-worker"),
1529 kind: AtomKind::Observation,
1530 timestamp: Timestamp::new("2024-01-02T00:00:00Z"),
1531 importance: Importance::new(0.5).unwrap(),
1532 payload_json: "linker".into(),
1533 vector: None,
1534 flags: vec![],
1535 labels: vec![],
1536 links: vec![Link::new(base_id.clone(), LinkKind::References)],
1537 };
1538 engine.remember(linker).await.unwrap();
1539
1540 let policy = NewAtom {
1542 world: WorldKey::new("w1"),
1543 worker: WorkerKey::new("w1-worker"),
1544 kind: AtomKind::Reflection,
1545 timestamp: Timestamp::new("2024-01-03T00:00:00Z"),
1546 importance: Importance::new(0.5).unwrap(),
1547 payload_json: "policy".into(),
1548 vector: None,
1549 flags: vec![],
1550 labels: vec!["policy:retention_days=0".into()],
1551 links: vec![],
1552 };
1553 engine.remember(policy).await.unwrap();
1554
1555 let evicted = engine.gc_once(None).unwrap();
1557 assert_eq!(evicted, 0);
1558 let replayed = engine
1559 .replay(WorldKey::new("w1"), AtomFilter::default())
1560 .await
1561 .unwrap();
1562 assert_eq!(replayed.len(), 3);
1563 }
1564
1565 #[tokio::test(flavor = "current_thread")]
1566 async fn gc_applies_retention_and_min_importance() {
1567 let engine = engine_components();
1568 let old_low = NewAtom {
1569 world: WorldKey::new("w1"),
1570 worker: WorkerKey::new("w1-worker"),
1571 kind: AtomKind::Observation,
1572 timestamp: Timestamp::new("2020-01-01T00:00:00Z"),
1573 importance: Importance::new(0.1).unwrap(),
1574 payload_json: "old".into(),
1575 vector: None,
1576 flags: vec![],
1577 labels: vec![],
1578 links: vec![],
1579 };
1580 let recent = new_atom_with("2024-01-01T00:00:00Z", "new", "recent");
1581 engine.remember(old_low).await.unwrap();
1582 engine.remember(recent).await.unwrap();
1583 let policy = NewAtom {
1584 world: WorldKey::new("w1"),
1585 worker: WorkerKey::new("w1-worker"),
1586 kind: AtomKind::Reflection,
1587 timestamp: Timestamp::new("2024-01-02T00:00:00Z"),
1588 importance: Importance::new(0.5).unwrap(),
1589 payload_json: "policy".into(),
1590 vector: None,
1591 flags: vec![],
1592 labels: vec![
1593 "policy:retention_days=365".into(),
1594 "policy:min_importance=0.2".into(),
1595 ],
1596 links: vec![],
1597 };
1598 engine.remember(policy).await.unwrap();
1599
1600 let evicted = engine.gc_once(None).unwrap();
1601 assert_eq!(evicted, 1);
1602 let remaining = engine
1603 .replay(WorldKey::new("w1"), AtomFilter::default())
1604 .await
1605 .unwrap();
1606 assert_eq!(remaining.len(), 2);
1607 }
1608
1609 #[tokio::test(flavor = "current_thread")]
1610 async fn contradicted_atoms_downranked_but_present() {
1611 let engine = engine_components();
1612 let base = new_atom_with("2024-01-01T00:00:00Z", "p", "base");
1613 let base_id = engine.remember(base).await.expect("base");
1614
1615 let contradictor = NewAtom {
1616 world: WorldKey::new("w1"),
1617 worker: WorkerKey::new("worker-2"),
1618 kind: AtomKind::Observation,
1619 timestamp: Timestamp::new("2024-01-01T00:00:10Z"),
1620 importance: Importance::new(0.5).unwrap(),
1621 payload_json: "p-alt".into(),
1622 vector: None,
1623 flags: vec![],
1624 labels: vec![],
1625 links: vec![Link::new(base_id.clone(), LinkKind::Contradicts)],
1626 };
1627 let contra_id = engine.remember(contradictor).await.expect("contradictor");
1628
1629 let question = Question {
1630 world: WorldKey::new("w1"),
1631 text: "p".into(),
1632 filter: None,
1633 };
1634 let answer = engine.ask(question).await.expect("ask");
1635 let ids: Vec<_> = answer
1636 .supporting_atoms
1637 .iter()
1638 .map(|a| a.id().clone())
1639 .collect();
1640 assert_eq!(ids.first(), Some(&contra_id));
1641 assert!(ids.contains(&base_id));
1642 }
1643
1644 #[tokio::test(flavor = "current_thread")]
1645 async fn suspicious_atoms_are_quarantined_and_excluded() {
1646 let engine = engine_components();
1647 let suspicious = NewAtom {
1648 world: WorldKey::new("wq"),
1649 worker: WorkerKey::new("lowtrust-bot"),
1650 kind: AtomKind::Observation,
1651 timestamp: Timestamp::new(""),
1652 importance: Importance::new(0.9).unwrap(),
1653 payload_json: "nan value".into(),
1654 vector: None,
1655 flags: vec![],
1656 labels: vec![],
1657 links: vec![],
1658 };
1659 let _ = engine.remember(suspicious).await.expect("quarantine");
1660
1661 let q = Question {
1662 world: WorldKey::new("wq"),
1663 text: "nan".into(),
1664 filter: None,
1665 };
1666 let answer = engine.ask(q).await.expect("ask");
1667 assert!(
1668 answer.supporting_atoms.is_empty(),
1669 "quarantine filtered for normal world"
1670 );
1671
1672 let q2 = Question {
1673 world: WorldKey::new("quarantine:wq"),
1674 text: "nan".into(),
1675 filter: None,
1676 };
1677 let answer_q = engine.ask(q2).await.expect("ask quarantine");
1678 assert_eq!(answer_q.supporting_atoms.len(), 1);
1679 assert!(answer_q.supporting_atoms[0]
1680 .flags()
1681 .iter()
1682 .any(|f| f == "suspect"));
1683 }
1684
1685 #[tokio::test(flavor = "current_thread")]
1686 async fn manage_world_create_adds_world_and_meta() {
1687 let engine = engine_components();
1688 let world = WorldKey::new("tenant:demo/new");
1689 engine
1690 .manage_world(WorldAction::Create(WorldMeta {
1691 world: world.clone(),
1692 description: Some("demo".into()),
1693 labels: vec!["team:demo".into()],
1694 }))
1695 .await
1696 .expect("manage world");
1697 let worlds = engine.worlds().unwrap();
1698 assert!(worlds.contains(&world));
1699 assert!(!engine.world_archived(&world).unwrap());
1700 }
1701
1702 #[tokio::test(flavor = "current_thread")]
1703 async fn manage_world_archive_and_resume_flip_status() {
1704 let engine = engine_components();
1705 let world = WorldKey::new("tenant:demo/archive");
1706 engine
1707 .manage_world(WorldAction::Create(WorldMeta {
1708 world: world.clone(),
1709 description: None,
1710 labels: vec![],
1711 }))
1712 .await
1713 .unwrap();
1714 engine
1715 .manage_world(WorldAction::Archive(world.clone()))
1716 .await
1717 .unwrap();
1718 assert!(engine.world_archived(&world).unwrap());
1719 engine
1720 .manage_world(WorldAction::Resume(world.clone()))
1721 .await
1722 .unwrap();
1723 assert!(!engine.world_archived(&world).unwrap());
1724 }
1725
1726 #[tokio::test(flavor = "current_thread")]
1727 async fn worlds_excludes_archived_by_default() {
1728 let engine = engine_components();
1729 let active = WorldKey::new("tenant:demo/active");
1730 let archived = WorldKey::new("tenant:demo/archived");
1731 engine
1732 .manage_world(WorldAction::Create(WorldMeta {
1733 world: active.clone(),
1734 description: None,
1735 labels: vec![],
1736 }))
1737 .await
1738 .unwrap();
1739 engine
1740 .manage_world(WorldAction::Create(WorldMeta {
1741 world: archived.clone(),
1742 description: None,
1743 labels: vec![],
1744 }))
1745 .await
1746 .unwrap();
1747 engine
1748 .manage_world(WorldAction::Archive(archived.clone()))
1749 .await
1750 .unwrap();
1751 let worlds = engine.worlds().unwrap();
1752 assert!(worlds.contains(&active));
1753 assert!(!worlds.contains(&archived));
1754
1755 let with_archived = engine.worlds_filtered(true).unwrap();
1756 assert!(with_archived.contains(&archived));
1757 }
1758
1759 #[tokio::test(flavor = "current_thread")]
1760 async fn policy_atoms_respect_policy_world_conventions() {
1761 let engine = engine_components();
1762 let world = WorldKey::new("tenant:acme/alpha");
1763 engine
1764 .manage_world(WorldAction::Create(WorldMeta {
1765 world: world.clone(),
1766 description: Some("alpha".into()),
1767 labels: vec!["team:alpha".into()],
1768 }))
1769 .await
1770 .unwrap();
1771
1772 let per_world_policy = NewAtom {
1773 world: WorldKey::new(format!("policy:{}", world.0)),
1774 worker: WorkerKey::new("policy-tester"),
1775 kind: AtomKind::Reflection,
1776 timestamp: Timestamp::new("2024-02-01T00:00:00Z"),
1777 importance: Importance::clamped(0.2),
1778 payload_json: "policy".into(),
1779 vector: None,
1780 flags: vec![],
1781 labels: vec![
1782 "policy:retention_days=1".into(),
1783 "policy:min_importance=0.4".into(),
1784 "policy:replication_allow=tenant:acme/".into(),
1785 ],
1786 links: vec![],
1787 };
1788 engine.remember(per_world_policy).await.unwrap();
1789
1790 let tenant_policy = NewAtom {
1791 world: WorldKey::new("tenant:acme/policy"),
1792 worker: WorkerKey::new("policy-tester"),
1793 kind: AtomKind::Reflection,
1794 timestamp: Timestamp::new("2024-02-02T00:00:00Z"),
1795 importance: Importance::clamped(0.2),
1796 payload_json: "policy".into(),
1797 vector: None,
1798 flags: vec![],
1799 labels: vec![
1800 "policy:replication_deny=tenant:acme/private".into(),
1801 "policy:retention_min_days=7".into(),
1802 ],
1803 links: vec![],
1804 };
1805 engine.remember(tenant_policy).await.unwrap();
1806
1807 let policy = engine.parse_policy(&[], &world);
1808 assert_eq!(policy.retention_days, Some(7));
1809 assert_eq!(policy.min_importance, Some(0.4));
1810 assert!(policy
1811 .replication_allow
1812 .contains(&"tenant:acme/".to_string()));
1813 assert!(policy
1814 .replication_deny
1815 .contains(&"tenant:acme/private".to_string()));
1816 }
1817
/// Forces a world's index metadata into a stale state: an old embedder version,
/// mid-rebuild, progress 0. After `maybe_rebuild_index`, the metadata must have:
/// - `ready` set;
/// - a different embedder version;
/// - version at least 1;
/// - progress 1.0;
/// - `rebuilding` cleared.
#[test]
fn embedder_change_triggers_index_metadata_update() {
    let engine = engine_components();
    let world = WorldKey::new("w-meta");
    engine.ensure_index_entry(&world);

    // The lock guard is a temporary of this `if let` and is released before the
    // rebuild below takes the lock again.
    if let Some(meta) = engine.index_state.lock().get_mut(&world) {
        meta.embedder_version = "old".into();
        meta.ready = true;
        meta.progress = 0.0;
        meta.rebuilding = true;
        meta.started_at = Some(Timestamp::new("2024-01-01T00:00:00Z"));
    }

    engine.maybe_rebuild_index(&world);

    let snapshot = engine.index_state.lock().get(&world).cloned();
    let meta = snapshot.expect("meta");
    assert!(meta.ready);
    assert_ne!(meta.embedder_version, "old");
    assert!(meta.version >= 1);
    assert_eq!(meta.progress, 1.0);
    assert!(!meta.rebuilding);
}
1846
/// With one world's index marked as rebuilding (not ready, progress 0.3),
/// `max_index_rebuild_lag_ms` must report a positive lag.
#[test]
fn rebuild_lag_reports_inflight() {
    let engine = engine_components();
    let world = WorldKey::new("w-lag");
    let in_flight = IndexMetadata {
        world: world.clone(),
        version: 1,
        embedder_version: "v1".into(),
        last_rebuilt: Timestamp::new("2024-01-01T00:00:00Z"),
        ready: false,
        rebuilding: true,
        progress: 0.3,
        started_at: Some(Timestamp::new("2024-01-02T00:00:00Z")),
        last_progress: Some(Timestamp::new("2024-01-02T00:00:00Z")),
    };
    engine.index_state.lock().insert(world, in_flight);

    let lag = engine
        .max_index_rebuild_lag_ms()
        .expect("an in-flight rebuild should report a lag");
    assert!(lag > 0);
}
1872}