1use core::{future::Future, pin::Pin};
7use dwbase_core::{
8 Atom, AtomId, AtomKind, Importance, Link, LinkKind, Timestamp, WorkerKey, WorldKey,
9};
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use std::collections::{HashMap, VecDeque};
13use std::hash::{Hash, Hasher};
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::RwLock;
16use std::time::Instant as StdInstant;
17use thiserror::Error;
18use time::format_description::well_known::Rfc3339;
19use time::OffsetDateTime;
20
/// Crate-wide result alias whose error type is [`DwbaseError`].
pub type Result<T> = std::result::Result<T, DwbaseError>;
22
/// Error type shared by the engine and its pluggable components.
///
/// Every variant carries a free-form message that is appended to a fixed
/// prefix when the error is displayed.
#[derive(Debug, Error)]
pub enum DwbaseError {
    /// Displayed as `capability denied: <message>`.
    #[error("capability denied: {0}")]
    CapabilityDenied(String),
    /// Displayed as `invalid input: <message>`; produced, for example, when a
    /// timestamp is not valid RFC 3339.
    #[error("invalid input: {0}")]
    InvalidInput(String),
    /// Displayed as `storage error: <message>`.
    #[error("storage error: {0}")]
    Storage(String),
    /// Displayed as `vector error: <message>`.
    #[error("vector error: {0}")]
    Vector(String),
    /// Displayed as `stream error: <message>`.
    #[error("stream error: {0}")]
    Stream(String),
    /// Displayed as `internal error: <message>`.
    #[error("internal error: {0}")]
    Internal(String),
}
39
/// Caller-supplied description of an atom that does not have an id yet.
///
/// `DWBaseEngine::remember` assigns the id and turns this value into an
/// [`Atom`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct NewAtom {
    /// World the atom is written to; `remember` rewrites it to
    /// `quarantine:<world>` when the write is judged suspicious.
    pub world: WorldKey,
    /// Worker that authored the atom.
    pub worker: WorkerKey,
    /// Kind of the atom.
    pub kind: AtomKind,
    /// Timestamp string; when empty, `remember` substitutes the current UTC time.
    pub timestamp: Timestamp,
    /// Importance of the atom.
    pub importance: Importance,
    /// Payload as a JSON string; this is the text handed to the embedder.
    pub payload_json: String,
    /// Optional embedding; when `None`, `remember` uses the embedder's output.
    pub vector: Option<Vec<f32>>,
    /// Flags attached to the atom.
    pub flags: Vec<String>,
    /// Labels attached to the atom.
    pub labels: Vec<String>,
    /// Typed links to other atoms.
    pub links: Vec<Link>,
}
54
/// Filter passed to storage scans, vector searches and stream subscriptions.
///
/// The default value has every field empty or `None`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct AtomFilter {
    /// World to restrict to, if any.
    pub world: Option<WorldKey>,
    /// Atom kinds to match.
    pub kinds: Vec<AtomKind>,
    /// Labels to match.
    pub labels: Vec<String>,
    /// Flags to match.
    ///
    /// NOTE(review): `ReflexFilter::from_question` copies this list into
    /// `exclude_flags` (atoms carrying any of them are dropped) — confirm that
    /// storage engines interpret the field the same way.
    pub flags: Vec<String>,
    /// Lower time bound, if any.
    pub since: Option<Timestamp>,
    /// Upper time bound, if any (not consulted by the reflex index in this file).
    pub until: Option<Timestamp>,
    /// Maximum number of results, if any (not consulted by the reflex index in this file).
    pub limit: Option<usize>,
}
66
/// Input to `DWBaseEngine::ask`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Question {
    /// World the question is asked in.
    pub world: WorldKey,
    /// Question text; `ask` passes it to the embedder to obtain a query vector.
    pub text: String,
    /// Optional filter applied to candidate atoms.
    pub filter: Option<AtomFilter>,
}
74
/// Output of `DWBaseEngine::ask`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Answer {
    /// World the question was asked in.
    pub world: WorldKey,
    /// Answer text; `ask` currently always sets it to `"answer-pending"`.
    pub text: String,
    /// Candidate atoms, ordered best-first by the engine's ranking.
    pub supporting_atoms: Vec<Atom>,
    /// Non-fatal notices (for example that the index was not ready); defaults
    /// to empty when missing from serialized input.
    #[serde(default)]
    pub warnings: Vec<String>,
}
84
/// Metadata supplied when a world is created through [`WorldAction::Create`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorldMeta {
    /// Key of the world.
    pub world: WorldKey,
    /// Optional description; copied into the metadata atom's payload.
    pub description: Option<String>,
    /// Extra labels; added to the metadata atom's labels and payload.
    pub labels: Vec<String>,
}
92
/// Lifecycle operation accepted by `DWBaseEngine::manage_world`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum WorldAction {
    /// Record the world with status `active` plus the supplied metadata.
    Create(WorldMeta),
    /// Record the world with status `archived`.
    Archive(WorldKey),
    /// Record the world with status `active` again.
    Resume(WorldKey),
}
100
/// In-memory index of recently inserted atoms, grouped per world into time buckets.
pub struct ReflexIndex {
    /// Window and bucket sizing.
    config: ReflexIndexConfig,
    /// World -> bucket id -> atoms whose timestamp falls into that bucket.
    buckets: RwLock<HashMap<WorldKey, HashMap<i64, Vec<Atom>>>>,
}
106
/// Sizing parameters for a [`ReflexIndex`].
#[derive(Clone, Debug)]
pub struct ReflexIndexConfig {
    /// Length of the retained window, in seconds.
    pub recency_seconds: u64,
    /// Width of one time bucket, in seconds (a value of 0 is treated as 1 by the index).
    pub bucket_width_seconds: u64,
}

impl Default for ReflexIndexConfig {
    /// A 300-second window made of 1-second buckets.
    fn default() -> Self {
        let recency_seconds = 300;
        let bucket_width_seconds = 1;
        ReflexIndexConfig {
            recency_seconds,
            bucket_width_seconds,
        }
    }
}
123
/// Lookup tables for typed links between atoms, keyed by the link *target*.
#[derive(Default)]
struct ConflictIndex {
    /// Target id -> id of the atom that supersedes it (a later registration overwrites).
    supersedes: RwLock<HashMap<AtomId, AtomId>>,
    /// Target id -> ids of atoms that contradict it.
    contradicts: RwLock<HashMap<AtomId, Vec<AtomId>>>,
    /// Target id -> ids of atoms that confirm it.
    confirms: RwLock<HashMap<AtomId, Vec<AtomId>>>,
}
130
/// Garbage-collection policy assembled from `policy:*` labels by `parse_policy`.
#[derive(Clone, Debug, Default)]
struct GcPolicy {
    /// Retention period in days; `gc_once` ignores `None` and `0`.
    retention_days: Option<u64>,
    /// Atoms whose importance is below this value are eligible for deletion.
    min_importance: Option<f32>,
    /// Values collected from `policy:replication_allow=` labels (not consulted by GC in this file).
    replication_allow: Vec<String>,
    /// Values collected from `policy:replication_deny=` labels (not consulted by GC in this file).
    replication_deny: Vec<String>,
}
138
/// Per-world bookkeeping for the vector index, as reported by `index_status`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IndexMetadata {
    /// World this entry describes.
    pub world: WorldKey,
    /// Counter that starts at 1 and is incremented after each rebuild.
    pub version: u64,
    /// Embedder model version the index was last built with.
    pub embedder_version: String,
    /// Time of the last completed rebuild (or of entry creation).
    pub last_rebuilt: Timestamp,
    /// Whether `ask` may use vector search for this world.
    pub ready: bool,
    /// Whether a rebuild is in progress; defaults to `false` when absent from serialized input.
    #[serde(default)]
    pub rebuilding: bool,
    /// Rebuild progress; the engine writes 0.0 when a rebuild starts and 1.0 when it ends.
    #[serde(default)]
    pub progress: f32,
    /// Time the most recent rebuild started, if any.
    #[serde(default)]
    pub started_at: Option<Timestamp>,
    /// Time progress was last recorded, if any.
    #[serde(default)]
    pub last_progress: Option<Timestamp>,
}
155
156impl ConflictIndex {
157 fn register(&self, atom: &Atom) {
158 let mut supersedes = self.supersedes.write().expect("supersedes lock");
159 let mut contradicts = self.contradicts.write().expect("contradicts lock");
160 let mut confirms = self.confirms.write().expect("confirms lock");
161 for link in atom.links() {
162 match link.kind {
163 LinkKind::Supersedes => {
164 supersedes.insert(link.target.clone(), atom.id().clone());
165 }
166 LinkKind::Contradicts => {
167 contradicts
168 .entry(link.target.clone())
169 .or_default()
170 .push(atom.id().clone());
171 }
172 LinkKind::Confirms => {
173 confirms
174 .entry(link.target.clone())
175 .or_default()
176 .push(atom.id().clone());
177 }
178 LinkKind::References => {}
179 }
180 }
181 }
182
183 fn superseded_by(&self, id: &AtomId) -> Option<AtomId> {
184 self.supersedes
185 .read()
186 .expect("supersedes lock")
187 .get(id)
188 .cloned()
189 }
190
191 fn contradiction_count(&self, id: &AtomId) -> usize {
192 self.contradicts
193 .read()
194 .expect("contradicts lock")
195 .get(id)
196 .map(|v| v.len())
197 .unwrap_or(0)
198 }
199
200 fn confirmation_count(&self, id: &AtomId) -> usize {
201 self.confirms
202 .read()
203 .expect("confirms lock")
204 .get(id)
205 .map(|v| v.len())
206 .unwrap_or(0)
207 }
208}
209
/// Predicate set evaluated by `ReflexIndex::filter`; empty or `None` fields match everything.
#[derive(Clone, Debug, Default)]
pub struct ReflexFilter {
    /// Restrict to one world; `None` searches every indexed world.
    pub world: Option<WorldKey>,
    /// Accept only these kinds (when non-empty).
    pub kinds: Vec<AtomKind>,
    /// Require every one of these labels (when non-empty).
    pub labels: Vec<String>,
    /// Accept only atoms written by this worker.
    pub author: Option<WorkerKey>,
    /// Drop atoms whose importance is below this value.
    pub min_importance: Option<f32>,
    /// Drop atoms whose timestamp is earlier than this.
    pub since: Option<Timestamp>,
    /// Drop atoms carrying any of these flags.
    pub exclude_flags: Vec<String>,
}
220
221impl ReflexFilter {
222 pub fn from_question(question: &Question) -> Self {
223 let mut base = ReflexFilter {
224 world: Some(question.world.clone()),
225 ..Default::default()
226 };
227 if let Some(filter) = &question.filter {
228 base.kinds = filter.kinds.clone();
229 base.labels = filter.labels.clone();
230 base.exclude_flags = filter.flags.clone();
231 base.since = filter.since.clone();
232 }
233 base
234 }
235}
236
237impl ReflexIndex {
238 pub fn new(config: ReflexIndexConfig) -> Self {
239 Self {
240 config,
241 buckets: RwLock::new(HashMap::new()),
242 }
243 }
244
245 fn parse_ts(&self, ts: &Timestamp) -> Result<i64> {
246 let dt = OffsetDateTime::parse(&ts.0, &Rfc3339)
247 .map_err(|e| DwbaseError::InvalidInput(format!("invalid timestamp {}: {}", ts.0, e)))?;
248 Ok(dt.unix_timestamp())
249 }
250
251 fn bucket_id(&self, ts: i64) -> i64 {
252 let width = self.config.bucket_width_seconds.max(1) as i64;
253 ts / width
254 }
255
256 fn trim_old_buckets(&self, world_buckets: &mut HashMap<i64, Vec<Atom>>, newest: i64) {
257 let window_buckets =
258 (self.config.recency_seconds / self.config.bucket_width_seconds.max(1)).max(1) as i64;
259 let min_bucket = newest - window_buckets;
260 #[cfg(feature = "metrics")]
261 let before = world_buckets.len();
262 world_buckets.retain(|bucket, _| *bucket >= min_bucket);
263 #[cfg(feature = "metrics")]
264 dwbase_metrics::record_gc_evictions(before.saturating_sub(world_buckets.len()) as u64);
265 }
266
267 pub fn insert(&self, atom: Atom) -> Result<()> {
269 let ts = self.parse_ts(atom.timestamp())?;
270 let bucket = self.bucket_id(ts);
271 let mut guard = self.buckets.write().expect("poisoned reflex index lock");
272 let world_buckets = guard.entry(atom.world().clone()).or_default();
273 world_buckets.entry(bucket).or_default().push(atom);
274 self.trim_old_buckets(world_buckets, bucket);
275 Ok(())
276 }
277
278 pub fn filter(&self, filter: &ReflexFilter) -> Result<Vec<Atom>> {
280 let since_epoch = if let Some(since) = &filter.since {
281 Some(self.parse_ts(since)?)
282 } else {
283 None
284 };
285
286 let guard = self.buckets.read().expect("poisoned reflex index lock");
287 let world_views: Vec<(&WorldKey, &HashMap<i64, Vec<Atom>>)> = match &filter.world {
288 Some(world) => guard
289 .get(world)
290 .map(|b| vec![(world, b)])
291 .unwrap_or_default(),
292 None => guard.iter().collect(),
293 };
294
295 let mut results = Vec::new();
296 for (_world, buckets) in world_views {
297 for atoms in buckets.values() {
298 for atom in atoms {
299 if !filter.kinds.is_empty() && !filter.kinds.contains(atom.kind()) {
300 continue;
301 }
302 if let Some(author) = &filter.author {
303 if atom.worker() != author {
304 continue;
305 }
306 }
307 if let Some(min_imp) = filter.min_importance {
308 if atom.importance().get() < min_imp {
309 continue;
310 }
311 }
312 if let Some(since) = since_epoch {
313 let atom_ts = self.parse_ts(atom.timestamp())?;
314 if atom_ts < since {
315 continue;
316 }
317 }
318 if !filter.labels.is_empty()
319 && !filter.labels.iter().all(|l| atom.labels().contains(l))
320 {
321 continue;
322 }
323 if !filter.exclude_flags.is_empty()
324 && filter
325 .exclude_flags
326 .iter()
327 .any(|f| atom.flags().contains(f))
328 {
329 continue;
330 }
331 results.push(atom.clone());
332 }
333 }
334 }
335 Ok(results)
336 }
337}
338
/// Persistence backend used by the engine.
pub trait StorageEngine {
    /// Stores one atom.
    fn append(&self, atom: Atom) -> Result<()>;
    /// Returns the stored atoms whose ids appear in `ids`; the engine treats an
    /// empty result as "not stored".
    fn get_by_ids(&self, ids: &[AtomId]) -> Result<Vec<Atom>>;
    /// Returns the atoms of `world` that satisfy `filter`.
    fn scan(&self, world: &WorldKey, filter: &AtomFilter) -> Result<Vec<Atom>>;
    /// Returns counters for `world`.
    fn stats(&self, world: &WorldKey) -> Result<StorageStats>;
    /// Returns the ids of atoms in `world` that fall inside `window`.
    fn list_ids_in_window(&self, world: &WorldKey, window: &TimeWindow) -> Result<Vec<AtomId>>;
    /// Deletes the given atoms from `world`; the engine adds the returned
    /// number to its eviction count.
    fn delete_atoms(&self, world: &WorldKey, ids: &[AtomId]) -> Result<usize>;
    /// Lists every world known to the backend.
    fn worlds(&self) -> Result<Vec<WorldKey>>;
}
352
/// Counters returned by `StorageEngine::stats`; both default to zero.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct StorageStats {
    /// Number of atoms.
    pub atom_count: usize,
    /// Number of vectors.
    pub vector_count: usize,
}
359
/// A time range passed to `StorageEngine::list_ids_in_window`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TimeWindow {
    /// Start of the window (presumably milliseconds since the Unix epoch — TODO confirm).
    pub start_ms: i64,
    /// End of the window, in the same unit as `start_ms`.
    pub end_ms: i64,
}
366
367impl TimeWindow {
368 pub fn new(start_ms: i64, end_ms: i64) -> Self {
369 Self { start_ms, end_ms }
370 }
371}
372
/// A time range covered by a [`SummaryAdvert`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SummaryWindow {
    /// Start of the window (presumably milliseconds since the Unix epoch — TODO confirm).
    pub start_ms: i64,
    /// End of the window, in the same unit as `start_ms`.
    pub end_ms: i64,
}
379
380impl SummaryWindow {
381 pub fn new(start_ms: i64, end_ms: i64) -> Self {
382 Self { start_ms, end_ms }
383 }
384}
385
/// Advertisement of a summary for one world, identified by its digest.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SummaryAdvert {
    /// World the summary belongs to.
    pub world: WorldKey,
    /// Time windows the summary covers.
    pub windows: Vec<SummaryWindow>,
    /// Digest string; `SummaryCatalog::upsert` uses it as the identity of the advert.
    pub digest: String,
}
393
394impl SummaryAdvert {
395 pub fn new(world: WorldKey, windows: Vec<SummaryWindow>, digest: impl Into<String>) -> Self {
396 Self {
397 world,
398 windows,
399 digest: digest.into(),
400 }
401 }
402}
403
/// In-memory catalog of summary adverts, grouped by world.
#[derive(Default)]
pub struct SummaryCatalog {
    /// World -> adverts for that world, in insertion order.
    entries: HashMap<WorldKey, Vec<SummaryAdvert>>,
}
409
410impl SummaryCatalog {
411 pub fn new() -> Self {
412 Self::default()
413 }
414
415 pub fn upsert(&mut self, advert: SummaryAdvert) {
417 let world_entry = self.entries.entry(advert.world.clone()).or_default();
418 if let Some(pos) = world_entry.iter().position(|a| a.digest == advert.digest) {
419 world_entry[pos] = advert;
420 } else {
421 world_entry.push(advert);
422 }
423 }
424
425 pub fn list(&self, world: &WorldKey) -> Vec<SummaryAdvert> {
427 self.entries.get(world).cloned().unwrap_or_else(Vec::new)
428 }
429
430 pub fn worlds(&self) -> Vec<WorldKey> {
432 self.entries.keys().cloned().collect()
433 }
434}
435
/// Vector index backend used by the engine.
pub trait VectorEngine {
    /// Stores or replaces the vector for `atom_id` in `world`.
    fn upsert(&self, world: &WorldKey, atom_id: &AtomId, vector: &[f32]) -> Result<()>;
    /// Returns ids of atoms in `world` matching `query`, subject to `filter`.
    /// `k` is a result-count parameter (the engine passes 10); presumably an
    /// upper bound — TODO confirm.
    fn search(
        &self,
        world: &WorldKey,
        query: &[f32],
        k: usize,
        filter: &AtomFilter,
    ) -> Result<Vec<AtomId>>;
    /// Rebuilds the index for `world`; the engine calls this when the embedder
    /// version recorded for the world differs from the current one.
    fn rebuild(&self, world: &WorldKey) -> Result<()>;
}
448
/// Publish/subscribe backend used by the engine.
pub trait StreamEngine {
    /// Identifies one subscription.
    type Handle;

    /// Publishes an atom to the stream.
    fn publish(&self, atom: &Atom) -> Result<()>;
    /// Opens a subscription for `world` restricted by `filter`.
    fn subscribe(&self, world: &WorldKey, filter: AtomFilter) -> Result<Self::Handle>;
    /// Fetches the next atom for a subscription; `None` presumably means
    /// nothing is pending — TODO confirm against implementations.
    fn poll(&self, handle: &Self::Handle) -> Result<Option<Atom>>;
    /// Ends a subscription, consuming its handle.
    fn stop(&self, handle: Self::Handle) -> Result<()>;
}
458
/// Turns text into embedding vectors.
pub trait Embedder {
    /// Embeds `payload_json`, returning a boxed `Send` future.
    ///
    /// The future resolves to `Ok(None)` when no embedding is produced for the
    /// input. The engine calls this both with atom payloads and with question text.
    #[allow(clippy::type_complexity)]
    #[cfg_attr(feature = "tokio", allow(async_fn_in_trait))]
    fn embed<'a>(
        &'a self,
        payload_json: &'a str,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Vec<f32>>>> + Send + 'a>>;

    /// Identifier of the embedding model; the engine compares it with the
    /// version stored per world to decide whether to rebuild the index.
    /// Defaults to `"unknown"`.
    fn model_version(&self) -> &'static str {
        "unknown"
    }
}
474
/// Authorization hook consulted before the engine acts; an `Err` aborts the
/// operation and is returned to the caller.
pub trait Gatekeeper {
    /// Checked at the start of `remember`.
    fn check_remember(&self, new_atom: &NewAtom) -> Result<()>;
    /// Checked at the start of `ask`.
    fn check_ask(&self, question: &Question) -> Result<()>;
    /// Checked at the start of `manage_world`.
    fn check_world_action(&self, action: &WorldAction) -> Result<()>;
}
481
/// The engine: wires storage, vector index, stream, gatekeeper and embedder
/// together and keeps in-memory indexes alongside them.
pub struct DWBaseEngine<S, V, T, G, E> {
    /// Persistence backend.
    pub storage: S,
    /// Vector index backend.
    pub vector: V,
    /// Publish/subscribe backend.
    pub stream: T,
    /// Authorization hook.
    pub gatekeeper: G,
    /// Embedding provider.
    pub embedder: E,
    /// Index of recently written atoms.
    pub reflex_index: ReflexIndex,
    /// Supersedes / contradicts / confirms links seen so far.
    conflict_index: ConflictIndex,
    /// Per-worker instants of recent `remember` calls, used for spam detection.
    suspicions: parking_lot::Mutex<HashMap<WorkerKey, VecDeque<StdInstant>>>,
    /// Per-world vector index bookkeeping.
    index_state: parking_lot::Mutex<HashMap<WorldKey, IndexMetadata>>,
    /// Counter backing generated atom ids; starts at 1.
    id_gen: AtomicU64,
}
495
496impl<S, V, T, G, E> DWBaseEngine<S, V, T, G, E> {
497 pub fn new(storage: S, vector: V, stream: T, gatekeeper: G, embedder: E) -> Self {
498 Self {
499 storage,
500 vector,
501 stream,
502 gatekeeper,
503 embedder,
504 reflex_index: ReflexIndex::new(ReflexIndexConfig::default()),
505 conflict_index: ConflictIndex::default(),
506 suspicions: parking_lot::Mutex::new(HashMap::new()),
507 index_state: parking_lot::Mutex::new(HashMap::new()),
508 id_gen: AtomicU64::new(1),
509 }
510 }
511
512 pub fn with_reflex_index(
513 storage: S,
514 vector: V,
515 stream: T,
516 gatekeeper: G,
517 embedder: E,
518 reflex_index: ReflexIndex,
519 ) -> Self {
520 Self {
521 storage,
522 vector,
523 stream,
524 gatekeeper,
525 embedder,
526 reflex_index,
527 conflict_index: ConflictIndex::default(),
528 suspicions: parking_lot::Mutex::new(HashMap::new()),
529 index_state: parking_lot::Mutex::new(HashMap::new()),
530 id_gen: AtomicU64::new(1),
531 }
532 }
533}
534
535impl<S, V, T, G, E> DWBaseEngine<S, V, T, G, E>
536where
537 S: StorageEngine,
538 V: VectorEngine,
539 T: StreamEngine,
540 G: Gatekeeper,
541 E: Embedder,
542{
/// Allocates the next atom id, formatted as `atom-<n>`, from the engine-local counter.
///
/// NOTE(review): the counter starts at 1 in every newly constructed engine,
/// so ids are only unique per engine instance — confirm how collisions with
/// atoms already in persistent storage are avoided.
fn new_id(&self) -> AtomId {
    let id = self.id_gen.fetch_add(1, Ordering::Relaxed);
    AtomId::new(format!("atom-{}", id))
}
547
/// Records the atom's typed links in the conflict index.
fn register_conflicts(&self, atom: &Atom) {
    self.conflict_index.register(atom);
}
551
552 fn record_suspicion(&self, worker: &WorkerKey, now: StdInstant) -> usize {
553 let mut map = self.suspicions.lock();
554 let entry = map.entry(worker.clone()).or_default();
555 entry.push_back(now);
556 while let Some(front) = entry.front().cloned() {
558 if now.duration_since(front) > std::time::Duration::from_secs(1) {
559 entry.pop_front();
560 } else {
561 break;
562 }
563 }
564 entry.len()
565 }
566
/// Reports whether the payload contains, ASCII case-insensitively, one of the
/// substrings `nan`, `infinity` or `impossible`.
///
/// This is a plain substring test, so unrelated words that embed a marker
/// (for example `banana`) match as well.
fn is_impossible_payload(payload: &str) -> bool {
    const MARKERS: [&str; 3] = ["nan", "infinity", "impossible"];
    let normalized = payload.to_ascii_lowercase();
    MARKERS.iter().any(|marker| normalized.contains(marker))
}
571
572 fn low_trust(worker: &WorkerKey) -> bool {
573 worker.0.to_ascii_lowercase().starts_with("lowtrust")
574 }
575
576 fn should_quarantine(&self, atom: &NewAtom, now: StdInstant) -> bool {
577 let spam = self.record_suspicion(&atom.worker, now) > 5;
578 let impossible = Self::is_impossible_payload(&atom.payload_json);
579 let low_trust = Self::low_trust(&atom.worker) && atom.importance.get() > 0.7;
580 spam || impossible || low_trust
581 }
582
583 fn trust_hint(&self, atom: &Atom) -> f32 {
584 let mut hasher = std::collections::hash_map::DefaultHasher::new();
586 atom.worker().0.hash(&mut hasher);
587 let v = hasher.finish();
588 (v % 10_000) as f32 / 10_000.0
589 }
590
591 fn rank_score(&self, atom: &Atom) -> f32 {
592 let mut score = atom.importance().get();
593 if self.conflict_index.superseded_by(atom.id()).is_some() {
594 score -= 1.0;
595 }
596 let contradictions = self.conflict_index.contradiction_count(atom.id());
597 if contradictions > 0 {
598 score -= 0.3 * contradictions as f32;
599 }
600 let confirms = self.conflict_index.confirmation_count(atom.id());
601 if confirms > 0 {
602 score += 0.2 * confirms as f32;
603 }
604 score
605 }
606
607 fn parse_policy(&self, atoms: &[Atom], world: &WorldKey) -> GcPolicy {
608 let mut policy = GcPolicy {
610 retention_days: Some(365),
611 ..Default::default()
612 };
613 let mut retention_min_days: Option<u64> = None;
614
615 let mut policy_atoms: Vec<Atom> = atoms
616 .iter()
617 .filter(|a| a.world() == world)
618 .cloned()
619 .collect();
620
621 let policy_world = WorldKey::new(format!("policy:{}", world.0));
625 if let Ok(mut extra) = self.storage.scan(&policy_world, &AtomFilter::default()) {
626 policy_atoms.append(&mut extra);
627 }
628 if let Some(tenant_world) = world.0.strip_prefix("tenant:").and_then(|rest| {
629 rest.split_once('/')
630 .map(|(tenant, _)| WorldKey::new(format!("tenant:{tenant}/policy")))
631 }) {
632 if let Ok(mut extra) = self.storage.scan(&tenant_world, &AtomFilter::default()) {
633 policy_atoms.append(&mut extra);
634 }
635 }
636
637 for atom in policy_atoms {
638 for label in atom.labels() {
639 if let Some(val) = label.strip_prefix("policy:retention_days=") {
640 if let Ok(days) = val.parse::<u64>() {
641 policy.retention_days = Some(days);
642 }
643 }
644 if let Some(val) = label.strip_prefix("policy:retention_min_days=") {
645 if let Ok(days) = val.parse::<u64>() {
646 retention_min_days = Some(days);
647 }
648 }
649 if let Some(val) = label.strip_prefix("policy:min_importance=") {
650 if let Ok(v) = val.parse::<f32>() {
651 policy.min_importance = Some(v);
652 }
653 }
654 if let Some(val) = label.strip_prefix("policy:replication_allow=") {
655 if !val.is_empty() {
656 policy.replication_allow.push(val.to_string());
657 }
658 }
659 if let Some(val) = label.strip_prefix("policy:replication_deny=") {
660 if !val.is_empty() {
661 policy.replication_deny.push(val.to_string());
662 }
663 }
664 }
665 }
666
667 if let Some(min_days) = retention_min_days {
668 match policy.retention_days {
669 Some(current) if current < min_days => policy.retention_days = Some(min_days),
670 None => policy.retention_days = Some(min_days),
671 _ => {}
672 }
673 }
674 policy
675 }
676
677 fn is_policy_atom(atom: &Atom) -> bool {
678 atom.labels()
679 .iter()
680 .any(|l| l.starts_with("policy:") || l == "world_meta")
681 }
682
683 fn world_status(&self, world: &WorldKey) -> Result<Option<String>> {
684 let atoms = self
685 .storage
686 .scan(
687 world,
688 &AtomFilter {
689 world: Some(world.clone()),
690 kinds: Vec::new(),
691 labels: vec!["world_meta".into()],
692 flags: Vec::new(),
693 since: None,
694 until: None,
695 limit: None,
696 },
697 )
698 .unwrap_or_default();
699 let mut latest: Option<(OffsetDateTime, String)> = None;
700 for atom in atoms {
701 let ts = parse_ts(atom.timestamp()).unwrap_or(OffsetDateTime::UNIX_EPOCH);
702 let status = atom
703 .labels()
704 .iter()
705 .find_map(|l| l.strip_prefix("world_status:"))
706 .map(|s| s.to_string());
707 if let Some(status) = status {
708 if latest.as_ref().map(|(t, _)| *t < ts).unwrap_or(true) {
709 latest = Some((ts, status));
710 }
711 }
712 }
713 Ok(latest.map(|(_, s)| s))
714 }
715
716 pub fn world_archived(&self, world: &WorldKey) -> Result<bool> {
717 Ok(matches!(
718 self.world_status(world)?,
719 Some(status) if status == "archived"
720 ))
721 }
722
/// Lists the worlds known to storage, leaving out archived ones
/// (equivalent to `worlds_filtered(false)`).
pub fn worlds(&self) -> Result<Vec<WorldKey>> {
    self.worlds_filtered(false)
}
727
728 pub fn worlds_filtered(&self, include_archived: bool) -> Result<Vec<WorldKey>> {
730 let mut worlds = self.storage.worlds()?;
731 worlds.sort_by(|a, b| a.0.cmp(&b.0));
732 if include_archived {
733 return Ok(worlds);
734 }
735 let mut filtered = Vec::new();
736 for world in worlds {
737 if self.world_archived(&world)? {
738 continue;
739 }
740 filtered.push(world);
741 }
742 Ok(filtered)
743 }
744
745 fn now_timestamp(&self) -> Timestamp {
746 Timestamp(
747 OffsetDateTime::now_utc()
748 .format(&Rfc3339)
749 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".into()),
750 )
751 }
752
753 pub fn index_status(&self) -> Vec<IndexMetadata> {
754 self.index_state.lock().values().cloned().collect()
755 }
756
757 fn ensure_index_entry(&self, world: &WorldKey) {
758 let mut guard = self.index_state.lock();
759 guard.entry(world.clone()).or_insert_with(|| IndexMetadata {
760 world: world.clone(),
761 version: 1,
762 embedder_version: self.embedder.model_version().into(),
763 last_rebuilt: self.now_timestamp(),
764 ready: true,
765 rebuilding: false,
766 progress: 1.0,
767 started_at: None,
768 last_progress: None,
769 });
770 }
771
772 fn maybe_rebuild_index(&self, world: &WorldKey) {
773 self.ensure_index_entry(world);
774 let mut guard = self.index_state.lock();
775 if let Some(meta) = guard.get_mut(world) {
776 if meta.embedder_version != self.embedder.model_version() {
777 meta.ready = false;
778 meta.rebuilding = true;
779 meta.progress = 0.0;
780 let now = self.now_timestamp();
781 meta.started_at = Some(now.clone());
782 meta.last_progress = Some(now.clone());
783 let _ = self.vector.rebuild(world);
784 meta.version += 1;
785 meta.embedder_version = self.embedder.model_version().into();
786 meta.last_rebuilt = self.now_timestamp();
787 meta.ready = true;
788 meta.rebuilding = false;
789 meta.progress = 1.0;
790 meta.last_progress = Some(meta.last_rebuilt.clone());
791 }
792 }
793 }
794
795 fn atom_from_new(&self, id: AtomId, mut new_atom: NewAtom) -> Atom {
796 let mut builder = Atom::builder(
797 id,
798 new_atom.world,
799 new_atom.worker,
800 new_atom.kind,
801 new_atom.timestamp,
802 new_atom.importance,
803 new_atom.payload_json,
804 );
805 builder = builder.vector(new_atom.vector);
806 for flag in new_atom.flags.drain(..) {
807 builder = builder.add_flag(flag);
808 }
809 for label in new_atom.labels.drain(..) {
810 builder = builder.add_label(label);
811 }
812 for link in new_atom.links.drain(..) {
813 builder = builder.add_typed_link(link.target, link.kind);
814 }
815 builder.build()
816 }
817
818 pub async fn remember(&self, mut new_atom: NewAtom) -> Result<AtomId> {
820 #[cfg(feature = "metrics")]
821 let start = StdInstant::now();
822
823 self.gatekeeper.check_remember(&new_atom)?;
824 let now = StdInstant::now();
825
826 let id = self.new_id();
828 if new_atom.timestamp.0.is_empty() {
829 new_atom.timestamp = self.now_timestamp();
830 }
831
832 self.ensure_index_entry(&new_atom.world);
833 self.maybe_rebuild_index(&new_atom.world);
834
835 if self.should_quarantine(&new_atom, now) {
837 new_atom.world = WorldKey::new(format!("quarantine:{}", new_atom.world.0));
838 if !new_atom.flags.contains(&"suspect".to_string()) {
839 new_atom.flags.push("suspect".into());
840 }
841 if !new_atom.labels.contains(&"quarantine".to_string()) {
842 new_atom.labels.push("quarantine".into());
843 }
844 }
845
846 let embedded_vector = self.embedder.embed(&new_atom.payload_json).await?;
848 if new_atom.vector.is_none() {
849 new_atom.vector = embedded_vector;
850 }
851
852 let atom = self.atom_from_new(id.clone(), new_atom);
853
854 #[cfg(feature = "metrics")]
855 let freshness = parse_ts(atom.timestamp()).ok().and_then(|ts| {
856 let age = OffsetDateTime::now_utc() - ts;
857 (!age.is_negative()).then_some(age)
858 });
859
860 if let Some(vec) = atom.vector() {
861 self.vector.upsert(atom.world(), atom.id(), vec)?;
862 }
863 self.storage.append(atom.clone())?;
864 self.reflex_index.insert(atom.clone())?;
865 self.register_conflicts(&atom);
866 self.stream.publish(&atom)?;
867
868 #[cfg(feature = "metrics")]
869 {
870 dwbase_metrics::record_remember_latency(start.elapsed());
871 if let Some(age) = freshness {
872 let age_std = std::time::Duration::from_secs_f64(age.as_seconds_f64());
873 dwbase_metrics::record_index_freshness(age_std);
874 }
875 }
876
877 Ok(id)
878 }
879
880 pub async fn ask(&self, question: Question) -> Result<Answer> {
882 #[cfg(feature = "metrics")]
883 let start = StdInstant::now();
884
885 let reflex_filter = ReflexFilter::from_question(&question);
886 self.gatekeeper.check_ask(&question)?;
887
888 let mut candidates = self.reflex_index.filter(&reflex_filter)?;
889 let mut warnings = Vec::new();
890 self.ensure_index_entry(&question.world);
891 self.maybe_rebuild_index(&question.world);
892 let ready_meta = self
893 .index_state
894 .lock()
895 .get(&question.world)
896 .cloned()
897 .unwrap_or_else(|| IndexMetadata {
898 world: question.world.clone(),
899 version: 1,
900 embedder_version: self.embedder.model_version().into(),
901 last_rebuilt: self.now_timestamp(),
902 ready: true,
903 rebuilding: false,
904 progress: 1.0,
905 started_at: None,
906 last_progress: None,
907 });
908 let ready = ready_meta.ready;
909
910 if candidates.is_empty() {
912 let mut filter = question.filter.clone().unwrap_or_default();
913 filter.world = Some(question.world.clone());
914 candidates = self.storage.scan(&question.world, &filter)?;
915 }
916
917 let querying_quarantine = question.world.0.starts_with("quarantine:");
919 if !querying_quarantine {
920 candidates.retain(|a| !a.world().0.starts_with("quarantine:"));
921 }
922
923 if ready {
925 if let Ok(Some(query_vec)) = self.embedder.embed(&question.text).await {
926 let filter = question.filter.clone().unwrap_or_default();
927 let ids = self
928 .vector
929 .search(&question.world, &query_vec, 10, &filter)?;
930 if !ids.is_empty() {
931 let fetched = self.storage.get_by_ids(&ids)?;
932 candidates.extend(fetched);
933 }
934 }
935 } else {
936 warnings.push("index not ready; used fallback search".into());
937 if ready_meta.rebuilding {
938 warnings.push("index rebuilding in background".into());
939 }
940 }
941
942 if candidates.is_empty() {
944 let filter = question.filter.clone().unwrap_or_default();
945 let mut storage_scan = self.storage.scan(&question.world, &filter)?;
946 storage_scan.truncate(20);
947 candidates.extend(storage_scan);
948 }
949
950 let mut seen = std::collections::HashSet::new();
952 candidates.retain(|a| seen.insert(a.id().clone()));
953
954 candidates.sort_by(|a, b| {
956 let score_a = self.rank_score(a);
957 let score_b = self.rank_score(b);
958 score_b
959 .partial_cmp(&score_a)
960 .unwrap_or(std::cmp::Ordering::Equal)
961 .then_with(|| {
962 let ts_a = parse_ts(a.timestamp()).unwrap_or(OffsetDateTime::UNIX_EPOCH);
963 let ts_b = parse_ts(b.timestamp()).unwrap_or(OffsetDateTime::UNIX_EPOCH);
964 ts_b.cmp(&ts_a)
965 })
966 .then_with(|| {
967 b.importance()
968 .get()
969 .partial_cmp(&a.importance().get())
970 .unwrap_or(std::cmp::Ordering::Equal)
971 })
972 .then_with(|| {
973 self.trust_hint(b)
974 .partial_cmp(&self.trust_hint(a))
975 .unwrap_or(std::cmp::Ordering::Equal)
976 })
977 });
978
979 let answer = Answer {
980 world: question.world,
981 text: "answer-pending".into(),
982 supporting_atoms: candidates,
983 warnings,
984 };
985 #[cfg(feature = "metrics")]
986 dwbase_metrics::record_ask_latency(start.elapsed());
987 Ok(answer)
988 }
989
/// Returns the ids of atoms in `world` that fall inside `window`, as reported by storage.
pub fn list_ids_in_window(&self, world: &WorldKey, window: &TimeWindow) -> Result<Vec<AtomId>> {
    self.storage.list_ids_in_window(world, window)
}
994
995 pub fn storage_ready(&self) -> bool {
997 self.storage.worlds().is_ok()
998 }
999
1000 pub fn max_index_rebuild_lag_ms(&self) -> Option<u64> {
1002 let now = OffsetDateTime::now_utc();
1003 let guard = self.index_state.lock();
1004 let mut max_lag = None;
1005 for meta in guard.values() {
1006 if meta.ready && !meta.rebuilding {
1007 continue;
1008 }
1009 let ts = meta
1010 .last_progress
1011 .as_ref()
1012 .or(meta.started_at.as_ref())
1013 .unwrap_or(&meta.last_rebuilt);
1014 if let Ok(t) = parse_ts(ts) {
1015 let lag = now - t;
1016 let ms = lag.whole_milliseconds().max(0) as u64;
1017 if max_lag.map(|m| ms > m).unwrap_or(true) {
1018 max_lag = Some(ms);
1019 }
1020 }
1021 }
1022 max_lag
1023 }
1024
/// Fetches the stored atoms whose ids appear in `ids`, as reported by storage.
pub fn get_atoms(&self, ids: &[AtomId]) -> Result<Vec<Atom>> {
    self.storage.get_by_ids(ids)
}
1029
1030 pub async fn ingest_remote_atoms(&self, atoms: Vec<Atom>) -> Result<Vec<AtomId>> {
1032 let mut ingested = Vec::new();
1033 for atom in atoms {
1034 let id = atom.id().clone();
1035 if !self
1036 .storage
1037 .get_by_ids(std::slice::from_ref(&id))?
1038 .is_empty()
1039 {
1040 continue;
1041 }
1042 if let Some(vec) = atom.vector() {
1043 self.vector.upsert(atom.world(), atom.id(), vec)?;
1044 }
1045 self.storage.append(atom.clone())?;
1046 self.reflex_index.insert(atom.clone())?;
1047 self.register_conflicts(&atom);
1048 self.stream.publish(&atom)?;
1049 ingested.push(id);
1050 }
1051 Ok(ingested)
1052 }
1053
1054 pub fn gc_once(&self, _max_disk_mb: Option<u64>) -> Result<usize> {
1056 let mut evicted = 0usize;
1057 let worlds = self.storage.worlds()?;
1058 for world in worlds {
1059 let mut atoms = self.storage.scan(&world, &AtomFilter::default())?;
1060 let policy = self.parse_policy(&atoms, &world);
1061 let referenced: std::collections::HashSet<_> = atoms
1062 .iter()
1063 .flat_map(|a| a.links().iter().map(|l| l.target.clone()))
1064 .collect();
1065
1066 atoms.sort_by_key(|a| parse_ts(a.timestamp()).unwrap_or(OffsetDateTime::UNIX_EPOCH));
1067 let now = atoms
1068 .iter()
1069 .filter_map(|a| parse_ts(a.timestamp()).ok())
1070 .max()
1071 .unwrap_or_else(OffsetDateTime::now_utc);
1072 let mut to_delete = Vec::new();
1073
1074 for atom in &atoms {
1075 if Self::is_policy_atom(atom) {
1076 continue;
1077 }
1078 if referenced.contains(atom.id()) {
1079 continue;
1080 }
1081 if atom.labels().iter().any(|l| l.starts_with("world_meta")) {
1082 continue;
1083 }
1084 let expired = policy
1085 .retention_days
1086 .filter(|d| *d > 0)
1087 .and_then(|days| {
1088 parse_ts(atom.timestamp())
1089 .ok()
1090 .map(|ts| now - ts > time::Duration::days(days as i64))
1091 })
1092 .unwrap_or(false);
1093 let low_importance = policy
1094 .min_importance
1095 .map(|min| atom.importance().get() < min)
1096 .unwrap_or(false);
1097 if expired || low_importance {
1098 to_delete.push(atom.id().clone());
1099 continue;
1100 }
1101 }
1102 if !to_delete.is_empty() {
1103 evicted += self.storage.delete_atoms(&world, &to_delete)?;
1104 }
1105 }
1106 #[cfg(feature = "metrics")]
1107 {
1108 dwbase_metrics::record_gc_evictions(evicted as u64);
1109 }
1110 Ok(evicted)
1111 }
1112
1113 pub async fn observe(&self, atom: Atom) -> Result<()> {
1115 self.stream.publish(&atom)?;
1116 Ok(())
1117 }
1118
/// Returns the atoms of `world` that satisfy `filter`, read directly from storage.
pub async fn replay(&self, world: WorldKey, filter: AtomFilter) -> Result<Vec<Atom>> {
    self.storage.scan(&world, &filter)
}
1123
1124 pub async fn manage_world(&self, action: WorldAction) -> Result<()> {
1126 self.gatekeeper.check_world_action(&action)?;
1127
1128 let (world, next_status, description, mut extra_labels) = match action.clone() {
1129 WorldAction::Create(meta) => (
1130 meta.world,
1131 "active".to_string(),
1132 meta.description,
1133 meta.labels,
1134 ),
1135 WorldAction::Archive(world) => (world, "archived".to_string(), None, Vec::new()),
1136 WorldAction::Resume(world) => (world, "active".to_string(), None, Vec::new()),
1137 };
1138
1139 if let Some(current) = self.world_status(&world)? {
1141 if current == next_status && description.is_none() && extra_labels.is_empty() {
1142 return Ok(());
1143 }
1144 }
1145
1146 let now = self.now_timestamp();
1147 let mut payload = json!({
1148 "action": next_status,
1149 "world": world.0,
1150 "at": now.0,
1151 });
1152 if let Some(d) = &description {
1153 payload["description"] = json!(d);
1154 }
1155 if !extra_labels.is_empty() {
1156 payload["labels"] = json!(extra_labels);
1157 }
1158
1159 let mut builder = Atom::builder(
1160 self.new_id(),
1161 world.clone(),
1162 WorkerKey::new("system"),
1163 AtomKind::Reflection,
1164 now,
1165 Importance::clamped(0.1),
1166 payload.to_string(),
1167 )
1168 .add_label("world_meta".to_string())
1169 .add_label(format!("world_status:{next_status}"));
1170
1171 for l in extra_labels.drain(..) {
1172 builder = builder.add_label(l);
1173 }
1174
1175 let atom = builder.build();
1176
1177 if let Some(vec) = atom.vector() {
1178 self.vector.upsert(atom.world(), atom.id(), vec)?;
1179 }
1180 self.storage.append(atom.clone())?;
1181 self.reflex_index.insert(atom.clone())?;
1182 self.register_conflicts(&atom);
1183 self.stream.publish(&atom)?;
1184 Ok(())
1185 }
1186}
1187
1188#[cfg(test)]
1189mod tests {
1190 use super::*;
1191 use dwbase_core::{Link, LinkKind};
1192 use std::collections::HashMap;
1193 use std::sync::{
1194 atomic::{AtomicUsize, Ordering},
1195 Mutex,
1196 };
1197
1198 #[derive(Clone)]
1199 struct AllowGatekeeper;
1200
1201 impl Gatekeeper for AllowGatekeeper {
1202 fn check_remember(&self, _new_atom: &NewAtom) -> Result<()> {
1203 Ok(())
1204 }
1205
1206 fn check_ask(&self, _question: &Question) -> Result<()> {
1207 Ok(())
1208 }
1209
1210 fn check_world_action(&self, _action: &WorldAction) -> Result<()> {
1211 Ok(())
1212 }
1213 }
1214
1215 struct DummyEmbedder;
1216
1217 impl Embedder for DummyEmbedder {
1218 #[allow(clippy::type_complexity)]
1219 fn embed<'a>(
1220 &'a self,
1221 payload_json: &'a str,
1222 ) -> Pin<Box<dyn Future<Output = Result<Option<Vec<f32>>>> + Send + 'a>> {
1223 Box::pin(async move {
1224 let parts: Vec<f32> = payload_json
1225 .split(',')
1226 .filter_map(|p| p.trim().parse::<f32>().ok())
1227 .collect();
1228 if parts.is_empty() {
1229 Ok(None)
1230 } else {
1231 Ok(Some(parts))
1232 }
1233 })
1234 }
1235
1236 fn model_version(&self) -> &'static str {
1237 "dummy-test"
1238 }
1239 }
1240
1241 #[derive(Default)]
1242 struct MemStorage {
1243 atoms: Mutex<Vec<Atom>>,
1244 }
1245
1246 impl StorageEngine for MemStorage {
1247 fn append(&self, atom: Atom) -> Result<()> {
1248 let mut atoms = self.atoms.lock().unwrap();
1249 atoms.push(atom);
1250 Ok(())
1251 }
1252
1253 fn get_by_ids(&self, ids: &[AtomId]) -> Result<Vec<Atom>> {
1254 let atoms = self.atoms.lock().unwrap();
1255 Ok(atoms
1256 .iter()
1257 .filter(|a| ids.contains(a.id()))
1258 .cloned()
1259 .collect())
1260 }
1261
1262 fn scan(&self, world: &WorldKey, filter: &AtomFilter) -> Result<Vec<Atom>> {
1263 let atoms = self.atoms.lock().unwrap();
1264 let mut results = Vec::new();
1265 for atom in atoms.iter() {
1266 if atom.world() != world {
1267 continue;
1268 }
1269 if !filter.kinds.is_empty() && !filter.kinds.contains(atom.kind()) {
1270 continue;
1271 }
1272 if !filter.labels.is_empty()
1273 && !filter.labels.iter().all(|l| atom.labels().contains(l))
1274 {
1275 continue;
1276 }
1277 if !filter.flags.is_empty()
1278 && !filter.flags.iter().all(|f| atom.flags().contains(f))
1279 {
1280 continue;
1281 }
1282 results.push(atom.clone());
1283 }
1284 Ok(results)
1285 }
1286
1287 fn stats(&self, _world: &WorldKey) -> Result<StorageStats> {
1288 let atoms = self.atoms.lock().unwrap();
1289 Ok(StorageStats {
1290 atom_count: atoms.len(),
1291 vector_count: 0,
1292 })
1293 }
1294
1295 fn list_ids_in_window(&self, world: &WorldKey, window: &TimeWindow) -> Result<Vec<AtomId>> {
1296 let atoms = self.atoms.lock().unwrap();
1297 Ok(atoms
1298 .iter()
1299 .filter(|a| a.world() == world)
1300 .filter(|a| {
1301 let ts = Timestamp::new(a.timestamp().0.clone());
1302 if let Ok(dt) = OffsetDateTime::parse(ts.0.as_str(), &Rfc3339) {
1303 let ms = dt.unix_timestamp_nanos() / 1_000_000;
1304 let start = window.start_ms as i128;
1305 let end = window.end_ms as i128;
1306 ms >= start && ms <= end
1307 } else {
1308 false
1309 }
1310 })
1311 .map(|a| a.id().clone())
1312 .collect())
1313 }
1314
1315 fn delete_atoms(&self, world: &WorldKey, ids: &[AtomId]) -> Result<usize> {
1316 let mut atoms = self.atoms.lock().unwrap();
1317 let before = atoms.len();
1318 atoms.retain(|a| !(a.world() == world && ids.contains(a.id())));
1319 Ok(before.saturating_sub(atoms.len()))
1320 }
1321
1322 fn worlds(&self) -> Result<Vec<WorldKey>> {
1323 let atoms = self.atoms.lock().unwrap();
1324 Ok(atoms
1325 .iter()
1326 .map(|a| a.world().clone())
1327 .collect::<std::collections::HashSet<_>>()
1328 .into_iter()
1329 .collect())
1330 }
1331 }
1332
1333 #[derive(Default)]
1334 struct MemVector {
1335 dims: Mutex<Option<usize>>,
1336 data: Mutex<HashMap<AtomId, Vec<f32>>>,
1337 }
1338
1339 impl VectorEngine for MemVector {
1340 fn upsert(&self, _world: &WorldKey, atom_id: &AtomId, vector: &[f32]) -> Result<()> {
1341 let mut data = self.data.lock().unwrap();
1342 data.insert(atom_id.clone(), vector.to_vec());
1343 let mut dims = self.dims.lock().unwrap();
1344 dims.get_or_insert(vector.len());
1345 Ok(())
1346 }
1347
1348 fn search(
1349 &self,
1350 _world: &WorldKey,
1351 query: &[f32],
1352 k: usize,
1353 _filter: &AtomFilter,
1354 ) -> Result<Vec<AtomId>> {
1355 let data = self.data.lock().unwrap();
1356 let mut pairs: Vec<(AtomId, f32)> = data
1357 .iter()
1358 .map(|(id, v)| {
1359 let dist = v
1360 .iter()
1361 .zip(query.iter())
1362 .map(|(a, b)| (a - b) * (a - b))
1363 .sum();
1364 (id.clone(), dist)
1365 })
1366 .collect();
1367 pairs.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());
1368 Ok(pairs.into_iter().take(k).map(|(id, _)| id).collect())
1369 }
1370
1371 fn rebuild(&self, _world: &WorldKey) -> Result<()> {
1372 Ok(())
1373 }
1374 }
1375
1376 #[derive(Default)]
1377 struct MemStream {
1378 next: AtomicUsize,
1379 subscribers: Mutex<HashMap<usize, (AtomFilter, Vec<Atom>)>>,
1380 }
1381
1382 impl StreamEngine for MemStream {
1383 type Handle = usize;
1384
1385 fn publish(&self, atom: &Atom) -> Result<()> {
1386 let mut subs = self.subscribers.lock().unwrap();
1387 for (_id, (filter, queue)) in subs.iter_mut() {
1388 let mut include = true;
1389 if let Some(world) = &filter.world {
1390 include &= atom.world() == world;
1391 }
1392 if !filter.kinds.is_empty() && !filter.kinds.contains(atom.kind()) {
1393 include = false;
1394 }
1395 if include {
1396 queue.push(atom.clone());
1397 }
1398 }
1399 Ok(())
1400 }
1401
1402 fn subscribe(&self, _world: &WorldKey, filter: AtomFilter) -> Result<Self::Handle> {
1403 let id = self.next.fetch_add(1, Ordering::Relaxed);
1404 let mut subs = self.subscribers.lock().unwrap();
1405 subs.insert(id, (filter, Vec::new()));
1406 Ok(id)
1407 }
1408
1409 fn poll(&self, handle: &Self::Handle) -> Result<Option<Atom>> {
1410 let mut subs = self.subscribers.lock().unwrap();
1411 if let Some((_, queue)) = subs.get_mut(handle) {
1412 if queue.is_empty() {
1413 Ok(None)
1414 } else {
1415 Ok(Some(queue.remove(0)))
1416 }
1417 } else {
1418 Err(DwbaseError::Stream(format!("unknown handle {handle}")))
1419 }
1420 }
1421
1422 fn stop(&self, handle: Self::Handle) -> Result<()> {
1423 let mut subs = self.subscribers.lock().unwrap();
1424 subs.remove(&handle);
1425 Ok(())
1426 }
1427 }
1428
1429 fn engine_components(
1430 ) -> DWBaseEngine<MemStorage, MemVector, MemStream, AllowGatekeeper, DummyEmbedder> {
1431 let storage = MemStorage::default();
1432 let vector = MemVector::default();
1433 let stream = MemStream::default();
1434 let gatekeeper = AllowGatekeeper;
1435 DWBaseEngine::new(storage, vector, stream, gatekeeper, DummyEmbedder)
1436 }
1437
1438 fn new_atom_with(ts: &str, payload: &str, label: &str) -> NewAtom {
1439 NewAtom {
1440 world: WorldKey::new("w1"),
1441 worker: WorkerKey::new("worker-1"),
1442 kind: AtomKind::Observation,
1443 timestamp: Timestamp::new(ts),
1444 importance: Importance::new(0.5).unwrap(),
1445 payload_json: payload.into(),
1446 vector: None,
1447 flags: vec![],
1448 labels: vec![label.into()],
1449 links: vec![],
1450 }
1451 }
1452
1453 #[tokio::test(flavor = "current_thread")]
1454 async fn remember_and_ask_returns_ranked_atoms() {
1455 let engine = engine_components();
1456 let a1 = new_atom_with("2024-01-01T00:00:01Z", "0,0", "a1");
1457 let a2 = new_atom_with("2024-01-01T00:00:00Z", "10,10", "a2");
1458
1459 let id1 = engine.remember(a1).await.expect("remember a1");
1460 let id2 = engine.remember(a2).await.expect("remember a2");
1461
1462 assert_ne!(id1, id2);
1463
1464 let question = Question {
1465 world: WorldKey::new("w1"),
1466 text: "0,0".into(),
1467 filter: None,
1468 };
1469
1470 let answer = engine.ask(question).await.expect("ask");
1471 let ids: Vec<_> = answer
1472 .supporting_atoms
1473 .iter()
1474 .map(|a| a.id().clone())
1475 .collect();
1476 assert_eq!(ids.first().unwrap(), &id1);
1477 assert!(ids.contains(&id2));
1478 }
1479
1480 #[tokio::test(flavor = "current_thread")]
1481 async fn superseded_atoms_downranked() {
1482 let engine = engine_components();
1483 let base = new_atom_with("2024-01-01T00:00:00Z", "p", "base");
1484 let base_id = engine.remember(base).await.expect("base");
1485
1486 let newer = NewAtom {
1487 world: WorldKey::new("w1"),
1488 worker: WorkerKey::new("worker-1"),
1489 kind: AtomKind::Observation,
1490 timestamp: Timestamp::new("2024-01-01T00:00:05Z"),
1491 importance: Importance::new(0.5).unwrap(),
1492 payload_json: "p-new".into(),
1493 vector: None,
1494 flags: vec![],
1495 labels: vec![],
1496 links: vec![Link::new(base_id.clone(), LinkKind::Supersedes)],
1497 };
1498 let newer_id = engine.remember(newer).await.expect("newer");
1499
1500 let question = Question {
1501 world: WorldKey::new("w1"),
1502 text: "p".into(),
1503 filter: None,
1504 };
1505
1506 let answer = engine.ask(question).await.expect("ask");
1507 let ids: Vec<_> = answer
1508 .supporting_atoms
1509 .iter()
1510 .map(|a| a.id().clone())
1511 .collect();
1512 assert_eq!(ids.first(), Some(&newer_id));
1513 assert!(ids.contains(&base_id));
1514 }
1515
1516 #[tokio::test(flavor = "current_thread")]
1517 async fn gc_retains_linked_and_policy_atoms() {
1518 let engine = engine_components();
1519 let base = new_atom_with("2024-01-01T00:00:00Z", "p", "keep");
1520 let base_id = engine.remember(base).await.unwrap();
1521 let linker = NewAtom {
1522 world: WorldKey::new("w1"),
1523 worker: WorkerKey::new("w1-worker"),
1524 kind: AtomKind::Observation,
1525 timestamp: Timestamp::new("2024-01-02T00:00:00Z"),
1526 importance: Importance::new(0.5).unwrap(),
1527 payload_json: "linker".into(),
1528 vector: None,
1529 flags: vec![],
1530 labels: vec![],
1531 links: vec![Link::new(base_id.clone(), LinkKind::References)],
1532 };
1533 engine.remember(linker).await.unwrap();
1534
1535 let policy = NewAtom {
1537 world: WorldKey::new("w1"),
1538 worker: WorkerKey::new("w1-worker"),
1539 kind: AtomKind::Reflection,
1540 timestamp: Timestamp::new("2024-01-03T00:00:00Z"),
1541 importance: Importance::new(0.5).unwrap(),
1542 payload_json: "policy".into(),
1543 vector: None,
1544 flags: vec![],
1545 labels: vec!["policy:retention_days=0".into()],
1546 links: vec![],
1547 };
1548 engine.remember(policy).await.unwrap();
1549
1550 let evicted = engine.gc_once(None).unwrap();
1552 assert_eq!(evicted, 0);
1553 let replayed = engine
1554 .replay(WorldKey::new("w1"), AtomFilter::default())
1555 .await
1556 .unwrap();
1557 assert_eq!(replayed.len(), 3);
1558 }
1559
1560 #[tokio::test(flavor = "current_thread")]
1561 async fn gc_applies_retention_and_min_importance() {
1562 let engine = engine_components();
1563 let old_low = NewAtom {
1564 world: WorldKey::new("w1"),
1565 worker: WorkerKey::new("w1-worker"),
1566 kind: AtomKind::Observation,
1567 timestamp: Timestamp::new("2020-01-01T00:00:00Z"),
1568 importance: Importance::new(0.1).unwrap(),
1569 payload_json: "old".into(),
1570 vector: None,
1571 flags: vec![],
1572 labels: vec![],
1573 links: vec![],
1574 };
1575 let recent = new_atom_with("2024-01-01T00:00:00Z", "new", "recent");
1576 engine.remember(old_low).await.unwrap();
1577 engine.remember(recent).await.unwrap();
1578 let policy = NewAtom {
1579 world: WorldKey::new("w1"),
1580 worker: WorkerKey::new("w1-worker"),
1581 kind: AtomKind::Reflection,
1582 timestamp: Timestamp::new("2024-01-02T00:00:00Z"),
1583 importance: Importance::new(0.5).unwrap(),
1584 payload_json: "policy".into(),
1585 vector: None,
1586 flags: vec![],
1587 labels: vec![
1588 "policy:retention_days=365".into(),
1589 "policy:min_importance=0.2".into(),
1590 ],
1591 links: vec![],
1592 };
1593 engine.remember(policy).await.unwrap();
1594
1595 let evicted = engine.gc_once(None).unwrap();
1596 assert_eq!(evicted, 1);
1597 let remaining = engine
1598 .replay(WorldKey::new("w1"), AtomFilter::default())
1599 .await
1600 .unwrap();
1601 assert_eq!(remaining.len(), 2);
1602 }
1603
1604 #[tokio::test(flavor = "current_thread")]
1605 async fn contradicted_atoms_downranked_but_present() {
1606 let engine = engine_components();
1607 let base = new_atom_with("2024-01-01T00:00:00Z", "p", "base");
1608 let base_id = engine.remember(base).await.expect("base");
1609
1610 let contradictor = NewAtom {
1611 world: WorldKey::new("w1"),
1612 worker: WorkerKey::new("worker-2"),
1613 kind: AtomKind::Observation,
1614 timestamp: Timestamp::new("2024-01-01T00:00:10Z"),
1615 importance: Importance::new(0.5).unwrap(),
1616 payload_json: "p-alt".into(),
1617 vector: None,
1618 flags: vec![],
1619 labels: vec![],
1620 links: vec![Link::new(base_id.clone(), LinkKind::Contradicts)],
1621 };
1622 let contra_id = engine.remember(contradictor).await.expect("contradictor");
1623
1624 let question = Question {
1625 world: WorldKey::new("w1"),
1626 text: "p".into(),
1627 filter: None,
1628 };
1629 let answer = engine.ask(question).await.expect("ask");
1630 let ids: Vec<_> = answer
1631 .supporting_atoms
1632 .iter()
1633 .map(|a| a.id().clone())
1634 .collect();
1635 assert_eq!(ids.first(), Some(&contra_id));
1636 assert!(ids.contains(&base_id));
1637 }
1638
1639 #[tokio::test(flavor = "current_thread")]
1640 async fn suspicious_atoms_are_quarantined_and_excluded() {
1641 let engine = engine_components();
1642 let suspicious = NewAtom {
1643 world: WorldKey::new("wq"),
1644 worker: WorkerKey::new("lowtrust-bot"),
1645 kind: AtomKind::Observation,
1646 timestamp: Timestamp::new(""),
1647 importance: Importance::new(0.9).unwrap(),
1648 payload_json: "nan value".into(),
1649 vector: None,
1650 flags: vec![],
1651 labels: vec![],
1652 links: vec![],
1653 };
1654 let _ = engine.remember(suspicious).await.expect("quarantine");
1655
1656 let q = Question {
1657 world: WorldKey::new("wq"),
1658 text: "nan".into(),
1659 filter: None,
1660 };
1661 let answer = engine.ask(q).await.expect("ask");
1662 assert!(
1663 answer.supporting_atoms.is_empty(),
1664 "quarantine filtered for normal world"
1665 );
1666
1667 let q2 = Question {
1668 world: WorldKey::new("quarantine:wq"),
1669 text: "nan".into(),
1670 filter: None,
1671 };
1672 let answer_q = engine.ask(q2).await.expect("ask quarantine");
1673 assert_eq!(answer_q.supporting_atoms.len(), 1);
1674 assert!(answer_q.supporting_atoms[0]
1675 .flags()
1676 .iter()
1677 .any(|f| f == "suspect"));
1678 }
1679
1680 #[tokio::test(flavor = "current_thread")]
1681 async fn manage_world_create_adds_world_and_meta() {
1682 let engine = engine_components();
1683 let world = WorldKey::new("tenant:demo/new");
1684 engine
1685 .manage_world(WorldAction::Create(WorldMeta {
1686 world: world.clone(),
1687 description: Some("demo".into()),
1688 labels: vec!["team:demo".into()],
1689 }))
1690 .await
1691 .expect("manage world");
1692 let worlds = engine.worlds().unwrap();
1693 assert!(worlds.contains(&world));
1694 assert!(!engine.world_archived(&world).unwrap());
1695 }
1696
1697 #[tokio::test(flavor = "current_thread")]
1698 async fn manage_world_archive_and_resume_flip_status() {
1699 let engine = engine_components();
1700 let world = WorldKey::new("tenant:demo/archive");
1701 engine
1702 .manage_world(WorldAction::Create(WorldMeta {
1703 world: world.clone(),
1704 description: None,
1705 labels: vec![],
1706 }))
1707 .await
1708 .unwrap();
1709 engine
1710 .manage_world(WorldAction::Archive(world.clone()))
1711 .await
1712 .unwrap();
1713 assert!(engine.world_archived(&world).unwrap());
1714 engine
1715 .manage_world(WorldAction::Resume(world.clone()))
1716 .await
1717 .unwrap();
1718 assert!(!engine.world_archived(&world).unwrap());
1719 }
1720
1721 #[tokio::test(flavor = "current_thread")]
1722 async fn worlds_excludes_archived_by_default() {
1723 let engine = engine_components();
1724 let active = WorldKey::new("tenant:demo/active");
1725 let archived = WorldKey::new("tenant:demo/archived");
1726 engine
1727 .manage_world(WorldAction::Create(WorldMeta {
1728 world: active.clone(),
1729 description: None,
1730 labels: vec![],
1731 }))
1732 .await
1733 .unwrap();
1734 engine
1735 .manage_world(WorldAction::Create(WorldMeta {
1736 world: archived.clone(),
1737 description: None,
1738 labels: vec![],
1739 }))
1740 .await
1741 .unwrap();
1742 engine
1743 .manage_world(WorldAction::Archive(archived.clone()))
1744 .await
1745 .unwrap();
1746 let worlds = engine.worlds().unwrap();
1747 assert!(worlds.contains(&active));
1748 assert!(!worlds.contains(&archived));
1749
1750 let with_archived = engine.worlds_filtered(true).unwrap();
1751 assert!(with_archived.contains(&archived));
1752 }
1753
1754 #[tokio::test(flavor = "current_thread")]
1755 async fn policy_atoms_respect_policy_world_conventions() {
1756 let engine = engine_components();
1757 let world = WorldKey::new("tenant:acme/alpha");
1758 engine
1759 .manage_world(WorldAction::Create(WorldMeta {
1760 world: world.clone(),
1761 description: Some("alpha".into()),
1762 labels: vec!["team:alpha".into()],
1763 }))
1764 .await
1765 .unwrap();
1766
1767 let per_world_policy = NewAtom {
1768 world: WorldKey::new(format!("policy:{}", world.0)),
1769 worker: WorkerKey::new("policy-tester"),
1770 kind: AtomKind::Reflection,
1771 timestamp: Timestamp::new("2024-02-01T00:00:00Z"),
1772 importance: Importance::clamped(0.2),
1773 payload_json: "policy".into(),
1774 vector: None,
1775 flags: vec![],
1776 labels: vec![
1777 "policy:retention_days=1".into(),
1778 "policy:min_importance=0.4".into(),
1779 "policy:replication_allow=tenant:acme/".into(),
1780 ],
1781 links: vec![],
1782 };
1783 engine.remember(per_world_policy).await.unwrap();
1784
1785 let tenant_policy = NewAtom {
1786 world: WorldKey::new("tenant:acme/policy"),
1787 worker: WorkerKey::new("policy-tester"),
1788 kind: AtomKind::Reflection,
1789 timestamp: Timestamp::new("2024-02-02T00:00:00Z"),
1790 importance: Importance::clamped(0.2),
1791 payload_json: "policy".into(),
1792 vector: None,
1793 flags: vec![],
1794 labels: vec![
1795 "policy:replication_deny=tenant:acme/private".into(),
1796 "policy:retention_min_days=7".into(),
1797 ],
1798 links: vec![],
1799 };
1800 engine.remember(tenant_policy).await.unwrap();
1801
1802 let policy = engine.parse_policy(&[], &world);
1803 assert_eq!(policy.retention_days, Some(7));
1804 assert_eq!(policy.min_importance, Some(0.4));
1805 assert!(policy
1806 .replication_allow
1807 .contains(&"tenant:acme/".to_string()));
1808 assert!(policy
1809 .replication_deny
1810 .contains(&"tenant:acme/private".to_string()));
1811 }
1812
/// Marks a world's index entry as built by embedder `"old"` and mid-rebuild, calls
/// `maybe_rebuild_index`, and expects the entry to end up ready, no longer rebuilding,
/// at full progress, with a different embedder version and a version of at least 1.
#[test]
fn embedder_change_triggers_index_metadata_update() {
    let engine = engine_components();
    let world = WorldKey::new("w-meta");
    engine.ensure_index_entry(&world);

    {
        // The guard is scoped so it is released before `maybe_rebuild_index` runs.
        let mut state = engine.index_state.lock();
        if let Some(entry) = state.get_mut(&world) {
            entry.embedder_version = "old".into();
            entry.ready = true;
            entry.progress = 0.0;
            entry.rebuilding = true;
            entry.started_at = Some(Timestamp::new("2024-01-01T00:00:00Z"));
        }
    }

    engine.maybe_rebuild_index(&world);

    let state = engine.index_state.lock();
    let refreshed = state.get(&world).cloned().expect("meta");
    drop(state);

    assert!(refreshed.ready);
    assert_ne!(refreshed.embedder_version, "old");
    assert!(refreshed.version >= 1);
    assert_eq!(refreshed.progress, 1.0);
    assert!(!refreshed.rebuilding);
}
1841
/// Inserts index metadata for a rebuild that is still in progress and expects
/// `max_index_rebuild_lag_ms` to report a lag greater than zero.
#[test]
fn rebuild_lag_reports_inflight() {
    let engine = engine_components();
    let world = WorldKey::new("w-lag");

    let inflight = IndexMetadata {
        world: world.clone(),
        version: 1,
        embedder_version: "v1".into(),
        last_rebuilt: Timestamp::new("2024-01-01T00:00:00Z"),
        ready: false,
        rebuilding: true,
        progress: 0.3,
        started_at: Some(Timestamp::new("2024-01-02T00:00:00Z")),
        last_progress: Some(Timestamp::new("2024-01-02T00:00:00Z")),
    };
    {
        // Scoped so the guard is released before the lag is queried.
        let mut state = engine.index_state.lock();
        state.insert(world.clone(), inflight);
    }

    let lag = engine.max_index_rebuild_lag_ms();
    assert!(lag.is_some());
    assert!(lag.unwrap() > 0);
}
1867}
1868fn parse_ts(ts: &Timestamp) -> Result<OffsetDateTime> {
1869 OffsetDateTime::parse(&ts.0, &Rfc3339)
1870 .map_err(|e| DwbaseError::InvalidInput(format!("invalid timestamp {}: {}", ts.0, e)))
1871}