component_dwbase/
lib.rs

1//! component-dwbase: WASI component exposing DWBase operations with local persistence.
2
3use std::collections::HashMap;
4use std::collections::HashSet;
5use std::collections::VecDeque;
6use std::fs;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Mutex;
9use std::time::{Duration, Instant};
10use std::{path::PathBuf, sync::Arc};
11
12use dwbase_core::{
13    Atom, AtomId, AtomKind, Importance, Link, LinkKind, Timestamp, WorkerKey, WorldKey,
14};
15use dwbase_embedder_dummy::DummyEmbedder;
16use dwbase_engine::{AtomFilter, DWBaseEngine, NewAtom, Question, StorageEngine, StreamEngine};
17use dwbase_security::{Capabilities, LocalGatekeeper, TrustStore};
18use dwbase_swarm_nats::replication::Replicator;
19use dwbase_swarm_nats::replication::WorldAccessPolicy;
20#[cfg(feature = "nats")]
21use dwbase_swarm_nats::swarm::NatsBus;
22use dwbase_swarm_nats::swarm::{MockBus, NatsSwarmTransport};
23use dwbase_swarm_nats::world_events::{
24    decode_event_batch, world_events_subject, WorldEventBroadcaster,
25};
26#[cfg(feature = "nats")]
27use dwbase_swarm_nats::AsyncNats;
28use dwbase_swarm_nats::{
29    now_rfc3339, start_presence_loop, MockNats, NatsClient, NodeHello, PeerTable,
30};
31use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
32use once_cell::sync::OnceCell;
33use parking_lot::Mutex as ParkingMutex;
34use serde::{Deserialize, Serialize};
35use time::OffsetDateTime;
36use wit_bindgen::generate;
37
38generate!({ path: "wit/dwbase-core.wit", world: "core" });
39
40use exports::dwbase::core::engine::{
41    self, Answer, Atom as WitAtom, AtomFilter as WitAtomFilter, AtomKind as WitAtomKind,
42    NewAtom as WitNewAtom, Question as WitQuestion,
43};
44
/// Concrete engine type used throughout this component.
type ComponentEngine =
    DWBaseEngine<FsStorage, NoVector, LocalStream, LocalGatekeeper, DummyEmbedder>;

/// Process-wide engine instance, created lazily by `init_engine_v2`.
static ENGINE: OnceCell<Mutex<ComponentEngine>> = OnceCell::new();
/// Presence peer table; initialised by `maybe_start_presence` when presence is configured.
static PEERS: OnceCell<PeerTable> = OnceCell::new();
/// Swarm replicator; set by `maybe_start_swarm` once replication has been started.
static SWARM: OnceCell<Arc<Replicator>> = OnceCell::new();
/// Clone of the local stream handed to the engine in `init_engine_v2`.
static STREAM: OnceCell<LocalStream> = OnceCell::new();
/// Swarm message bus (NATS or mock), set by `maybe_start_swarm` when swarm is enabled.
static BUS: OnceCell<Arc<dyn dwbase_swarm_nats::swarm::SwarmBus>> = OnceCell::new();
/// World-event broadcaster; only set when `DWBASE_OBSERVE_NATS_BROADCAST` is present.
static BROADCASTER: OnceCell<WorldEventBroadcaster> = OnceCell::new();
/// Set of world-event subscriptions, initialised empty by `maybe_start_swarm`.
/// NOTE(review): contents are managed outside this section — presumably subscribed
/// worlds/subjects; confirm.
static EVENT_SUBS: OnceCell<ParkingMutex<HashSet<String>>> = OnceCell::new();
/// Wall-clock time (ms since the Unix epoch) of the last successful remote ingest;
/// 0 means nothing has been ingested yet (health reports zero sync lag in that case).
static LAST_REMOTE_INGEST_MS: AtomicU64 = AtomicU64::new(0);
/// Outcome of the one-time Prometheus recorder installation (`None` if it failed).
static PROM_HANDLE: OnceCell<Option<PrometheusHandle>> = OnceCell::new();
57
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A system clock set before 1970 yields 0 instead of an error.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
64
65fn install_metrics_recorder() -> Option<&'static PrometheusHandle> {
66    PROM_HANDLE
67        .get_or_init(|| PrometheusBuilder::new().install_recorder().ok())
68        .as_ref()
69}
70
71#[derive(Default)]
72struct ParsedMetrics {
73    counters: Vec<engine::MetricPoint>,
74    gauges: Vec<engine::MetricPoint>,
75    histograms: Vec<engine::HistogramMetric>,
76}
77
78fn parse_labels(raw: &str) -> Vec<engine::MetricLabel> {
79    if raw.trim().is_empty() {
80        return Vec::new();
81    }
82    let mut labels = Vec::new();
83    for pair in raw.split(',') {
84        if let Some((k, v)) = pair.split_once('=') {
85            let val = v.trim().trim_matches('"').replace("\\\"", "\"");
86            labels.push(engine::MetricLabel {
87                key: k.to_string(),
88                value: val,
89            });
90        }
91    }
92    labels.sort_by(|a, b| a.key.cmp(&b.key));
93    labels
94}
95
96fn labels_key(labels: &[engine::MetricLabel]) -> String {
97    labels
98        .iter()
99        .map(|l| format!("{}={}", l.key, l.value))
100        .collect::<Vec<_>>()
101        .join("|")
102}
103
104#[derive(Default)]
105struct HistAccum {
106    labels: Vec<engine::MetricLabel>,
107    buckets: Vec<engine::HistogramBucket>,
108    sum: Option<f64>,
109    count: Option<f64>,
110}
111
112fn parse_prometheus(text: &str) -> ParsedMetrics {
113    use std::collections::HashMap;
114
115    let mut types: HashMap<String, String> = HashMap::new();
116    for line in text.lines() {
117        if let Some(rest) = line.strip_prefix("# TYPE ") {
118            if let Some((name, ty)) = rest.trim().split_once(' ') {
119                types.insert(name.trim().to_string(), ty.trim().to_string());
120            }
121        }
122    }
123
124    let mut parsed = ParsedMetrics::default();
125    let mut hist_map: HashMap<(String, String), HistAccum> = HashMap::new();
126
127    for line in text.lines() {
128        let line = line.trim();
129        if line.is_empty() || line.starts_with('#') {
130            continue;
131        }
132        let Some((name_part, val_part)) = line.split_once(' ') else {
133            continue;
134        };
135        let value: f64 = match val_part.trim().parse() {
136            Ok(v) => v,
137            Err(_) => continue,
138        };
139
140        let (name, labels) = if let Some((n, raw_labels)) = name_part.split_once('{') {
141            let clean = raw_labels.trim_end_matches('}');
142            (n.to_string(), parse_labels(clean))
143        } else {
144            (name_part.to_string(), Vec::new())
145        };
146
147        // Histogram buckets/sum/count use suffixed names; group by base name + labels (minus le).
148        if name.ends_with("_bucket") {
149            let base = name.trim_end_matches("_bucket").to_string();
150            let mut base_labels = labels.clone();
151            let le_idx = base_labels.iter().position(|l| l.key == "le");
152            let le = le_idx
153                .and_then(|idx| {
154                    base_labels
155                        .get(idx)
156                        .and_then(|l| l.value.parse::<f64>().ok())
157                })
158                .unwrap_or(f64::INFINITY);
159            if let Some(idx) = le_idx {
160                base_labels.remove(idx);
161            }
162            let key = labels_key(&base_labels);
163            let entry = hist_map
164                .entry((base.clone(), key))
165                .or_insert_with(|| HistAccum {
166                    labels: base_labels.clone(),
167                    ..Default::default()
168                });
169            entry
170                .buckets
171                .push(engine::HistogramBucket { le, count: value });
172            continue;
173        }
174        if name.ends_with("_sum") {
175            let base = name.trim_end_matches("_sum").to_string();
176            let key = labels_key(&labels);
177            let entry = hist_map
178                .entry((base.clone(), key))
179                .or_insert_with(|| HistAccum {
180                    labels: labels.clone(),
181                    ..Default::default()
182                });
183            entry.sum = Some(value);
184            continue;
185        }
186        if name.ends_with("_count") {
187            let base = name.trim_end_matches("_count").to_string();
188            let key = labels_key(&labels);
189            let entry = hist_map
190                .entry((base.clone(), key))
191                .or_insert_with(|| HistAccum {
192                    labels: labels.clone(),
193                    ..Default::default()
194                });
195            entry.count = Some(value);
196            continue;
197        }
198
199        match types.get(&name).map(|s| s.as_str()) {
200            Some("counter") => parsed.counters.push(engine::MetricPoint {
201                name: name.clone(),
202                labels: labels.clone(),
203                value,
204            }),
205            Some("gauge") => parsed.gauges.push(engine::MetricPoint {
206                name: name.clone(),
207                labels: labels.clone(),
208                value,
209            }),
210            _ => parsed.gauges.push(engine::MetricPoint {
211                name: name.clone(),
212                labels: labels.clone(),
213                value,
214            }),
215        }
216    }
217
218    for ((name, _key), mut acc) in hist_map {
219        acc.buckets
220            .sort_by(|a, b| a.le.partial_cmp(&b.le).unwrap_or(std::cmp::Ordering::Equal));
221        parsed.histograms.push(engine::HistogramMetric {
222            name,
223            labels: acc.labels,
224            buckets: acc.buckets,
225            sum: acc.sum.unwrap_or(0.0),
226            count: acc.count.unwrap_or(0.0),
227        });
228    }
229
230    parsed
231}
232
/// Configured disk budget for the data directory in bytes, from `DWBASE_MAX_DISK_MB`.
///
/// Returns 0 (meaning "no configured budget") when the variable is unset or not a
/// non-negative integer. Surrounding whitespace is ignored. Very large values saturate at
/// `u64::MAX` instead of overflowing.
fn max_disk_bytes() -> u64 {
    std::env::var("DWBASE_MAX_DISK_MB")
        .ok()
        .and_then(|v| v.trim().parse::<u64>().ok())
        .map(|mb| mb.saturating_mul(1024 * 1024))
        .unwrap_or(0)
}
240
/// Disk-usage percentage at which health reports `warn`.
///
/// Read from `DWBASE_HEALTH_DISK_WARN_PCT`; defaults to 80 when unset or unparsable.
fn health_disk_warn_percent() -> f32 {
    match std::env::var("DWBASE_HEALTH_DISK_WARN_PCT") {
        Ok(raw) => raw.parse::<f32>().unwrap_or(80.0),
        Err(_) => 80.0,
    }
}
247
/// Disk-usage percentage at which health reports `degraded`.
///
/// Read from `DWBASE_HEALTH_DISK_DEGRADED_PCT`; defaults to 90 when unset or unparsable.
fn health_disk_degraded_percent() -> f32 {
    if let Ok(raw) = std::env::var("DWBASE_HEALTH_DISK_DEGRADED_PCT") {
        if let Ok(pct) = raw.parse::<f32>() {
            return pct;
        }
    }
    90.0
}
254
/// True when `DWBASE_HEALTH_DISABLE_FS_STATS` is set to any (Unicode) value, including
/// "0". This turns off filesystem capacity probing during health checks.
fn health_disable_fs_stats() -> bool {
    matches!(std::env::var("DWBASE_HEALTH_DISABLE_FS_STATS"), Ok(_))
}
258
/// Index-rebuild lag threshold in milliseconds, above which health is `degraded`.
///
/// Configured in seconds via `DWBASE_INDEX_REBUILD_WARN_SECS` (default 60). The conversion
/// to milliseconds saturates instead of overflowing.
fn index_rebuild_warn_ms() -> u64 {
    const DEFAULT_SECS: u64 = 60;
    let secs = match std::env::var("DWBASE_INDEX_REBUILD_WARN_SECS") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(DEFAULT_SECS),
        Err(_) => DEFAULT_SECS,
    };
    secs.saturating_mul(1000)
}
266
267fn fs_capacity_bytes(path: &PathBuf) -> Option<(u64, u64)> {
268    if health_disable_fs_stats() {
269        return None;
270    }
271    #[cfg(not(target_arch = "wasm32"))]
272    {
273        let total = fs2::total_space(path).ok()?;
274        let free = fs2::available_space(path).ok()?;
275        Some((total, free))
276    }
277    #[cfg(target_arch = "wasm32")]
278    {
279        let _ = path;
280        None
281    }
282}
283
/// Recursively sums the sizes of regular files under `path`, saturating at `u64::MAX`.
///
/// A directory or entry that cannot be read contributes 0. `DirEntry::metadata` does not
/// follow symbolic links, so symlinked directories are neither traversed nor counted, which
/// also rules out symlink cycles. Takes `&Path`; callers holding a `PathBuf` coerce
/// automatically.
fn dir_size_bytes(path: &std::path::Path) -> u64 {
    let Ok(entries) = fs::read_dir(path) else {
        return 0;
    };
    entries
        .flatten()
        .filter_map(|entry| {
            let meta = entry.metadata().ok()?;
            if meta.is_file() {
                Some(meta.len())
            } else if meta.is_dir() {
                Some(dir_size_bytes(&entry.path()))
            } else {
                None
            }
        })
        .fold(0u64, u64::saturating_add)
}
301
302fn compute_health(
303    engine: &DWBaseEngine<FsStorage, NoVector, LocalStream, LocalGatekeeper, DummyEmbedder>,
304) -> engine::HealthSnapshot {
305    let storage_ok = engine.storage_ready();
306    let index_ok = engine.index_status().iter().all(|m| m.ready);
307
308    let data = data_dir();
309    let used_bytes = dir_size_bytes(&data);
310    let configured_total_bytes = max_disk_bytes();
311    let mut disk_total_bytes = 0u64;
312    let mut disk_free_bytes = 0u64;
313    let mut disk_used_percent = 0.0f32;
314    let mut pressure_basis = "unknown";
315
316    if configured_total_bytes > 0 {
317        disk_total_bytes = configured_total_bytes;
318        disk_free_bytes = configured_total_bytes.saturating_sub(used_bytes);
319        disk_used_percent = (used_bytes as f32 / configured_total_bytes as f32) * 100.0;
320        pressure_basis = "configured";
321    } else if let Some((fs_total, fs_free)) = fs_capacity_bytes(&data) {
322        disk_total_bytes = fs_total;
323        disk_free_bytes = fs_free;
324        if fs_total > 0 {
325            disk_used_percent =
326                ((fs_total.saturating_sub(fs_free)) as f32 / fs_total as f32) * 100.0;
327        }
328        pressure_basis = "filesystem";
329    }
330
331    let warn_pct = health_disk_warn_percent();
332    let degraded_pct = health_disk_degraded_percent();
333    let disk_pressure = if disk_total_bytes == 0 {
334        "unknown".to_string()
335    } else if disk_used_percent >= degraded_pct {
336        "degraded".to_string()
337    } else if disk_used_percent >= warn_pct {
338        "warn".to_string()
339    } else {
340        "ok".to_string()
341    };
342
343    let last_remote = LAST_REMOTE_INGEST_MS.load(Ordering::Relaxed);
344    let lag_ms = if last_remote == 0 {
345        0
346    } else {
347        now_ms().saturating_sub(last_remote)
348    };
349
350    // Placeholder quarantine metric.
351    let quarantine_count = 0u64;
352
353    if disk_total_bytes > 0 {
354        let used_for_metrics = if pressure_basis == "filesystem" {
355            disk_total_bytes.saturating_sub(disk_free_bytes)
356        } else {
357            used_bytes
358        };
359        dwbase_metrics::record_disk_usage(used_for_metrics, disk_total_bytes);
360    }
361    dwbase_metrics::record_sync_lag(Duration::from_millis(lag_ms));
362    dwbase_metrics::record_quarantine_count(quarantine_count);
363
364    let mut status = "ready".to_string();
365    let mut message = "ok".to_string();
366    if let Some(lag) = engine.max_index_rebuild_lag_ms() {
367        if lag > index_rebuild_warn_ms() {
368            status = "degraded".into();
369            message = format!("index rebuild lag {lag}ms");
370        }
371    }
372    if !storage_ok || !index_ok {
373        status = "degraded".into();
374        message = "storage or index not ready".into();
375    } else if disk_pressure == "degraded" {
376        status = "degraded".into();
377        message = match pressure_basis {
378            "filesystem" => "disk pressure degraded (filesystem)".into(),
379            "configured" => "disk pressure degraded (configured capacity)".into(),
380            _ => "disk pressure degraded".into(),
381        };
382    } else if disk_pressure == "warn" {
383        message = match pressure_basis {
384            "filesystem" => "disk pressure warn (filesystem)".into(),
385            "configured" => "disk pressure warn (configured capacity)".into(),
386            _ => "disk pressure warn".into(),
387        };
388    }
389
390    engine::HealthSnapshot {
391        status,
392        message,
393        storage_ok,
394        index_ok,
395        disk_used_bytes: used_bytes,
396        disk_free_bytes,
397        disk_total_bytes,
398        disk_used_percent,
399        disk_pressure,
400        sync_lag_ms: lag_ms,
401        quarantine_count,
402    }
403}
404
/// Root directory for persisted state: `DWBASE_DATA_DIR`, or `./dwbase-data` when unset.
fn data_dir() -> PathBuf {
    match std::env::var("DWBASE_DATA_DIR") {
        Ok(dir) => PathBuf::from(dir),
        Err(_) => PathBuf::from("./dwbase-data"),
    }
}
410
/// Active tenant id: `DWBASE_TENANT_ID`, else `GREENTIC_TENANT_ID`, else `"default"`.
fn tenant_id() -> String {
    if let Ok(id) = std::env::var("DWBASE_TENANT_ID") {
        return id;
    }
    std::env::var("GREENTIC_TENANT_ID").unwrap_or_else(|_| String::from("default"))
}
416
/// World-name prefix reserved for tenant `id`, e.g. `tenant:acme/`.
fn tenant_prefix(id: &str) -> String {
    ["tenant:", id, "/"].concat()
}
420
421fn tool_error(
422    code: &str,
423    message: impl Into<String>,
424    details_json: Option<String>,
425) -> engine::ToolError {
426    engine::ToolError {
427        code: code.to_string(),
428        message: message.into(),
429        details_json,
430    }
431}
432
433fn err_invalid_input(message: impl Into<String>) -> engine::ToolError {
434    tool_error("invalid_input", message, None)
435}
436
437fn err_capability_denied(message: impl Into<String>) -> engine::ToolError {
438    tool_error("capability_denied", message, None)
439}
440
441fn err_invalid_handle(message: impl Into<String>) -> engine::ToolError {
442    tool_error("invalid_handle", message, None)
443}
444
445fn err_storage(message: impl Into<String>) -> engine::ToolError {
446    tool_error("storage_error", message, None)
447}
448
449fn map_validation_error(message: String) -> engine::ToolError {
450    let lower = message.to_ascii_lowercase();
451    if lower.starts_with("write denied") || lower.starts_with("read denied") {
452        return err_capability_denied(message);
453    }
454    if lower.starts_with("payload too large") {
455        return tool_error("payload_too_large", message, None);
456    }
457    if lower.starts_with("importance") {
458        return tool_error("importance_cap", message, None);
459    }
460    if lower.starts_with("kind") {
461        return tool_error("kind_not_allowed", message, None);
462    }
463    if lower.starts_with("label") || lower.contains("labels not permitted") {
464        return tool_error("label_not_allowed", message, None);
465    }
466    err_invalid_input(message)
467}
468
/// Resolves the worker id to use.
///
/// `DWBASE_WORKER_ID`, then `GREENTIC_WORKER_ID`, override the caller-supplied `input`.
/// Without either, a blank `input` becomes `"llm"` and any other value is used verbatim.
fn effective_worker(input: &str) -> String {
    for var in ["DWBASE_WORKER_ID", "GREENTIC_WORKER_ID"] {
        if let Ok(worker) = std::env::var(var) {
            return worker;
        }
    }
    if input.trim().is_empty() {
        String::from("llm")
    } else {
        input.to_owned()
    }
}
480
/// World subscription patterns for replication.
///
/// Read as a comma-separated list from `DWBASE_SUBSCRIBE_WORLDS`, falling back to
/// `DWBASE_SUBSCRIBE_PATTERNS`. Entries are trimmed and blanks dropped.
fn subscription_patterns() -> Vec<String> {
    let raw = match std::env::var("DWBASE_SUBSCRIBE_WORLDS") {
        Ok(value) => value,
        Err(_) => std::env::var("DWBASE_SUBSCRIBE_PATTERNS").unwrap_or_default(),
    };
    let mut patterns = Vec::new();
    for piece in raw.split(',') {
        let piece = piece.trim();
        if !piece.is_empty() {
            patterns.push(piece.to_string());
        }
    }
    patterns
}
490
/// Capacity of the observe queue from `DWBASE_OBSERVE_QUEUE_CAPACITY`.
///
/// Unset, unparsable or zero values fall back to 10 000.
fn observe_queue_capacity() -> usize {
    match std::env::var("DWBASE_OBSERVE_QUEUE_CAPACITY").map(|raw| raw.parse::<usize>()) {
        Ok(Ok(capacity)) if capacity > 0 => capacity,
        _ => 10_000,
    }
}
498
499fn observe_drop_policy() -> ObserveDropPolicy {
500    let raw = std::env::var("DWBASE_OBSERVE_DROP_POLICY").unwrap_or_else(|_| "drop_oldest".into());
501    match raw.to_ascii_lowercase().as_str() {
502        "drop_newest" | "newest" => ObserveDropPolicy::DropNewest,
503        _ => ObserveDropPolicy::DropOldest,
504    }
505}
506
/// True when `DWBASE_OBSERVE_DURABLE` is set to any (Unicode) value.
fn observe_durable_enabled() -> bool {
    std::env::var("DWBASE_OBSERVE_DURABLE").map_or(false, |_| true)
}
510
/// Maximum number of items replayed during durable catch-up.
///
/// Read from `DWBASE_OBSERVE_DURABLE_CATCHUP_LIMIT`; unset, unparsable or zero values fall
/// back to 1 000.
fn observe_durable_catchup_limit() -> usize {
    let configured = std::env::var("DWBASE_OBSERVE_DURABLE_CATCHUP_LIMIT")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok());
    match configured {
        Some(0) | None => 1_000,
        Some(limit) => limit,
    }
}
518
/// Guard-rails for world access and atom writes, built from environment variables by
/// `SecurityConfig::from_env`.
#[derive(Clone, Debug)]
struct SecurityConfig {
    /// Tenant whose namespace (`tenant:<id>/`) scopes world access.
    tenant_id: String,
    /// When true, worlds outside the tenant namespace are denied unless explicitly allowed.
    enforce_tenant_namespace: bool,
    /// World names readable regardless of the tenant namespace (exact match).
    allow_read_worlds: Vec<String>,
    /// World names writable regardless of the tenant namespace (exact match).
    allow_write_worlds: Vec<String>,
    /// World-name prefixes readable regardless of the tenant namespace.
    allow_read_prefixes: Vec<String>,
    /// World-name prefixes writable regardless of the tenant namespace.
    allow_write_prefixes: Vec<String>,
    /// Maximum length of a new atom's `payload_json`, in bytes.
    payload_max_bytes: usize,
    /// Highest importance a new atom may carry.
    importance_cap: f32,
    /// Atom kinds that may be written.
    allowed_kinds: Vec<AtomKind>,
    /// Labels that may be attached (exact match). When empty, labels are rejected unless
    /// they are `policy:` labels permitted by `allow_policy_labels`.
    allowed_labels: Vec<String>,
    /// Permit labels that start with `policy:`.
    allow_policy_labels: bool,
}
533
534impl Default for SecurityConfig {
535    fn default() -> Self {
536        Self {
537            tenant_id: tenant_id(),
538            enforce_tenant_namespace: true,
539            allow_read_worlds: Vec::new(),
540            allow_write_worlds: Vec::new(),
541            allow_read_prefixes: Vec::new(),
542            allow_write_prefixes: Vec::new(),
543            payload_max_bytes: 64 * 1024,
544            importance_cap: 0.7,
545            allowed_kinds: vec![AtomKind::Observation, AtomKind::Reflection],
546            allowed_labels: Vec::new(),
547            allow_policy_labels: false,
548        }
549    }
550}
551
/// Reads `key` as a comma-separated list; entries are trimmed and blank ones dropped.
///
/// An unset (or non-Unicode) variable yields an empty list.
fn parse_csv_env(key: &str) -> Vec<String> {
    let Ok(raw) = std::env::var(key) else {
        return Vec::new();
    };
    raw.split(',')
        .map(str::trim)
        .filter(|item| !item.is_empty())
        .map(String::from)
        .collect()
}
561
562impl SecurityConfig {
563    fn from_env() -> Self {
564        let mut cfg = SecurityConfig::default();
565        if let Ok(v) = std::env::var("DWBASE_ENFORCE_TENANT_NAMESPACE") {
566            cfg.enforce_tenant_namespace = v != "0" && !v.eq_ignore_ascii_case("false");
567        }
568        cfg.allow_read_worlds = parse_csv_env("DWBASE_ALLOW_READ_WORLDS");
569        cfg.allow_write_worlds = parse_csv_env("DWBASE_ALLOW_WRITE_WORLDS");
570        cfg.allow_read_prefixes = parse_csv_env("DWBASE_ALLOW_READ_PREFIXES");
571        cfg.allow_write_prefixes = parse_csv_env("DWBASE_ALLOW_WRITE_PREFIXES");
572
573        if let Ok(v) = std::env::var("DWBASE_MAX_PAYLOAD_BYTES") {
574            if let Ok(n) = v.parse::<usize>() {
575                cfg.payload_max_bytes = n;
576            }
577        }
578        if let Ok(v) = std::env::var("DWBASE_IMPORTANCE_CAP") {
579            if let Ok(n) = v.parse::<f32>() {
580                cfg.importance_cap = n;
581            }
582        }
583        let kinds = parse_csv_env("DWBASE_ALLOWED_KINDS");
584        if !kinds.is_empty() {
585            cfg.allowed_kinds = kinds
586                .iter()
587                .filter_map(|k| match k.to_ascii_lowercase().as_str() {
588                    "observation" => Some(AtomKind::Observation),
589                    "reflection" => Some(AtomKind::Reflection),
590                    "plan" => Some(AtomKind::Plan),
591                    "action" => Some(AtomKind::Action),
592                    "message" => Some(AtomKind::Message),
593                    _ => None,
594                })
595                .collect();
596        }
597        cfg.allowed_labels = parse_csv_env("DWBASE_ALLOWED_LABELS");
598        cfg.allow_policy_labels = std::env::var("DWBASE_ALLOW_POLICY_LABELS").is_ok();
599        cfg
600    }
601}
602
/// True if `world` starts with any of `prefixes` (an empty prefix matches everything).
fn matches_prefixes(prefixes: &[String], world: &str) -> bool {
    for prefix in prefixes {
        if world.starts_with(prefix.as_str()) {
            return true;
        }
    }
    false
}
606
607fn validate_world_for_write(world: &str) -> Result<(), String> {
608    let cfg = SecurityConfig::from_env();
609    if cfg.allow_write_worlds.iter().any(|w| w == world) {
610        return Ok(());
611    }
612    if matches_prefixes(&cfg.allow_write_prefixes, world) {
613        return Ok(());
614    }
615    if cfg.enforce_tenant_namespace {
616        let prefix = tenant_prefix(&cfg.tenant_id);
617        if !world.starts_with(&prefix) {
618            return Err(format!(
619                "write denied: world must be within {prefix} (set DWBASE_ALLOW_WRITE_WORLDS/DWBASE_ALLOW_WRITE_PREFIXES to override)"
620            ));
621        }
622    }
623    Ok(())
624}
625
626fn validate_world_for_read(world: &str) -> Result<(), String> {
627    let cfg = SecurityConfig::from_env();
628    if cfg.allow_read_worlds.iter().any(|w| w == world) {
629        return Ok(());
630    }
631    if matches_prefixes(&cfg.allow_read_prefixes, world) {
632        return Ok(());
633    }
634    if cfg.enforce_tenant_namespace {
635        let prefix = tenant_prefix(&cfg.tenant_id);
636        if !world.starts_with(&prefix) {
637            return Err(format!(
638                "read denied: world must be within {prefix} (set DWBASE_ALLOW_READ_WORLDS/DWBASE_ALLOW_READ_PREFIXES to override)"
639            ));
640        }
641    }
642    Ok(())
643}
644
645fn validate_new_atom(new_atom: &NewAtom) -> Result<(), String> {
646    let cfg = SecurityConfig::from_env();
647    validate_world_for_write(&new_atom.world.0)?;
648    if new_atom.payload_json.len() > cfg.payload_max_bytes {
649        return Err(format!(
650            "payload too large: {} bytes > max {} (DWBASE_MAX_PAYLOAD_BYTES)",
651            new_atom.payload_json.len(),
652            cfg.payload_max_bytes
653        ));
654    }
655    if new_atom.importance.get() > cfg.importance_cap {
656        return Err(format!(
657            "importance {} exceeds cap {} (DWBASE_IMPORTANCE_CAP)",
658            new_atom.importance.get(),
659            cfg.importance_cap
660        ));
661    }
662    if !cfg.allowed_kinds.contains(&new_atom.kind) {
663        return Err(format!(
664            "kind {:?} not permitted (DWBASE_ALLOWED_KINDS)",
665            new_atom.kind
666        ));
667    }
668    for label in &new_atom.labels {
669        if cfg.allow_policy_labels && label.starts_with("policy:") {
670            continue;
671        }
672        if !cfg.allowed_labels.is_empty() && cfg.allowed_labels.contains(label) {
673            continue;
674        }
675        if cfg.allowed_labels.is_empty() {
676            return Err("labels not permitted by default (set DWBASE_ALLOWED_LABELS or DWBASE_ALLOW_POLICY_LABELS)".into());
677        }
678        return Err(format!("label not permitted: {label}"));
679    }
680    Ok(())
681}
682
683fn init_engine_v2() -> Result<&'static Mutex<ComponentEngine>, engine::ToolError> {
684    ENGINE.get_or_try_init(|| {
685        let _ = SecurityConfig::from_env();
686        let _ = install_metrics_recorder();
687        let dir = data_dir();
688        let storage = FsStorage::new(dir.clone())
689            .map_err(|e| err_storage(format!("init storage failed: {e}")))?;
690        let vector = NoVector;
691        let stream = LocalStream::new();
692        let _ = STREAM.set(stream.clone());
693        let gatekeeper = LocalGatekeeper::new(Capabilities::default(), TrustStore::default());
694        let embedder = DummyEmbedder::new();
695        let engine = DWBaseEngine::new(storage, vector, stream, gatekeeper, embedder);
696        maybe_start_presence(&engine);
697        maybe_start_swarm(&engine);
698        Ok(Mutex::new(engine))
699    })
700}
701
702fn init_engine() -> &'static Mutex<ComponentEngine> {
703    init_engine_v2().expect("engine init")
704}
705
/// Identifier this node advertises: `DWBASE_NODE_ID`, or `"component-dwbase"` when unset.
fn node_id() -> String {
    match std::env::var("DWBASE_NODE_ID") {
        Ok(id) => id,
        Err(_) => "component-dwbase".to_owned(),
    }
}
709
710fn maybe_start_presence(
711    engine: &DWBaseEngine<FsStorage, NoVector, LocalStream, LocalGatekeeper, DummyEmbedder>,
712) {
713    // Start only when explicitly configured.
714    if std::env::var("NATS_URL").is_err()
715        && std::env::var("GREENTIC_NATS_URL").is_err()
716        && std::env::var("DWBASE_PRESENCE_MOCK").is_err()
717    {
718        return;
719    }
720    let table = PEERS.get_or_init(PeerTable::default);
721    let client: Arc<dyn NatsClient> = if std::env::var("DWBASE_PRESENCE_MOCK").is_ok() {
722        Arc::new(MockNats::default())
723    } else {
724        #[cfg(feature = "nats")]
725        {
726            std::env::var("NATS_URL")
727                .or_else(|_| std::env::var("GREENTIC_NATS_URL"))
728                .ok()
729                .and_then(|url| {
730                    AsyncNats::connect(&url)
731                        .ok()
732                        .map(|c| Arc::new(c) as Arc<dyn NatsClient>)
733                })
734                .unwrap_or_else(|| Arc::new(MockNats::default()))
735        }
736        #[cfg(not(feature = "nats"))]
737        {
738            Arc::new(MockNats::default())
739        }
740    };
741    let hello = hello_from_engine(engine);
742    let ttl = std::time::Duration::from_secs(30);
743    start_presence_loop(client, hello, table.clone(), ttl);
744}
745
746fn maybe_start_swarm(
747    engine: &DWBaseEngine<FsStorage, NoVector, LocalStream, LocalGatekeeper, DummyEmbedder>,
748) {
749    // Opt-in: start only when explicitly enabled or when subscription patterns are provided.
750    let patterns = subscription_patterns();
751    let enabled = std::env::var("DWBASE_SWARM_ENABLE").is_ok() || !patterns.is_empty();
752    if !enabled {
753        return;
754    }
755
756    let self_id = dwbase_swarm::PeerId::new(node_id());
757    let bus: Arc<dyn dwbase_swarm_nats::swarm::SwarmBus> =
758        if std::env::var("DWBASE_SWARM_MOCK").is_ok() {
759            Arc::new(MockBus::default()) as Arc<dyn dwbase_swarm_nats::swarm::SwarmBus>
760        } else {
761            #[cfg(feature = "nats")]
762            {
763                std::env::var("NATS_URL")
764                    .or_else(|_| std::env::var("GREENTIC_NATS_URL"))
765                    .ok()
766                    .and_then(|url| {
767                        NatsBus::connect(&url)
768                            .ok()
769                            .map(|b| Arc::new(b) as Arc<dyn dwbase_swarm_nats::swarm::SwarmBus>)
770                    })
771                    .unwrap_or_else(|| {
772                        Arc::new(MockBus::default()) as Arc<dyn dwbase_swarm_nats::swarm::SwarmBus>
773                    })
774            }
775            #[cfg(not(feature = "nats"))]
776            {
777                Arc::new(MockBus::default()) as Arc<dyn dwbase_swarm_nats::swarm::SwarmBus>
778            }
779        };
780
781    let _ = BUS.set(bus.clone());
782    let _ = EVENT_SUBS.set(ParkingMutex::new(HashSet::new()));
783    if std::env::var("DWBASE_OBSERVE_NATS_BROADCAST").is_ok() {
784        let _ = BROADCASTER.set(WorldEventBroadcaster::new(bus.clone(), node_id(), 200.0));
785    }
786
787    let transport = NatsSwarmTransport::new(bus, self_id, 200.0).expect("init swarm transport");
788    let policy: Arc<dyn WorldAccessPolicy> = Arc::new(ComponentWorldAccessPolicy::new(data_dir()));
789    let swarm_state = data_dir().join("swarm.json");
790    let replicator = Arc::new(
791        Replicator::with_policy_and_state(
792            transport,
793            patterns,
794            std::time::Duration::from_secs(30),
795            policy,
796            Some(swarm_state),
797            512,
798            std::time::Duration::from_secs(300),
799        )
800        .expect("init replicator"),
801    );
802    let _ = SWARM.set(replicator.clone());
803    start_replication_loop(engine, replicator);
804}
805
806#[derive(Clone)]
807struct ComponentWorldAccessPolicy {
808    data_dir: PathBuf,
809    cache: Arc<Mutex<PolicyCache>>,
810}
811
812struct PolicyCache {
813    last_loaded: std::time::Instant,
814    deny_prefixes: Vec<String>,
815    allow_prefixes: Vec<String>,
816    min_retention_days: Option<u64>,
817}
818
819impl Default for PolicyCache {
820    fn default() -> Self {
821        Self {
822            last_loaded: std::time::Instant::now() - std::time::Duration::from_secs(3600),
823            deny_prefixes: Vec::new(),
824            allow_prefixes: Vec::new(),
825            min_retention_days: None,
826        }
827    }
828}
829
830impl ComponentWorldAccessPolicy {
831    fn new(data_dir: PathBuf) -> Self {
832        Self {
833            data_dir,
834            cache: Arc::new(Mutex::new(PolicyCache::default())),
835        }
836    }
837
838    fn refresh_cache(&self) {
839        let mut cache = self.cache.lock().unwrap();
840        if cache.last_loaded.elapsed() < std::time::Duration::from_secs(1) {
841            return;
842        }
843        cache.last_loaded = std::time::Instant::now();
844
845        let policy_world = format!("{}policy", tenant_prefix(&tenant_id()));
846        let path = self.data_dir.join("atoms.json");
847        let Ok(bytes) = fs::read(&path) else {
848            return;
849        };
850        let Ok(persisted) = serde_json::from_slice::<Persisted>(&bytes) else {
851            return;
852        };
853        let Some(atoms) = persisted.atoms.get(&policy_world) else {
854            cache.deny_prefixes.clear();
855            cache.allow_prefixes.clear();
856            cache.min_retention_days = None;
857            return;
858        };
859        let mut deny = Vec::new();
860        let mut allow = Vec::new();
861        let mut min_retention_days = None;
862        for atom in atoms {
863            for label in atom.labels() {
864                if let Some(pat) = label.strip_prefix("policy:replication_deny=") {
865                    deny.push(pat.to_string());
866                }
867                if let Some(pat) = label.strip_prefix("policy:replication_allow=") {
868                    allow.push(pat.to_string());
869                }
870                if let Some(v) = label.strip_prefix("policy:retention_min_days=") {
871                    if let Ok(n) = v.parse::<u64>() {
872                        min_retention_days = Some(n);
873                    }
874                }
875            }
876        }
877        cache.deny_prefixes = deny;
878        cache.allow_prefixes = allow;
879        cache.min_retention_days = min_retention_days;
880    }
881
882    fn policy_allows_replication(&self, world: &str) -> bool {
883        self.refresh_cache();
884        let cache = self.cache.lock().unwrap();
885        let denied = cache.deny_prefixes.iter().any(|p| world.starts_with(p));
886        if denied {
887            return false;
888        }
889        if cache.allow_prefixes.is_empty() {
890            return true;
891        }
892        cache.allow_prefixes.iter().any(|p| world.starts_with(p))
893    }
894}
895
896impl WorldAccessPolicy for ComponentWorldAccessPolicy {
897    fn can_send_world(&self, world: &str, _to: &dwbase_swarm::PeerId) -> bool {
898        validate_world_for_read(world).is_ok() && self.policy_allows_replication(world)
899    }
900
901    fn can_receive_world(&self, world: &str, _from: &dwbase_swarm::PeerId) -> bool {
902        validate_world_for_read(world).is_ok() && self.policy_allows_replication(world)
903    }
904}
905
906fn start_replication_loop(
907    engine: &DWBaseEngine<FsStorage, NoVector, LocalStream, LocalGatekeeper, DummyEmbedder>,
908    replicator: Arc<Replicator>,
909) {
910    // The engine instance is owned by a static mutex; we only need the address here to re-lock it inside the thread.
911    let _engine_ptr = engine as *const _;
912    std::thread::spawn(move || loop {
913        let _ = replicator.announce();
914        let _ = replicator.poll_inbox();
915        while let Some((_from, batch)) = replicator.poll_atom_batch() {
916            if let Some(engine_mutex) = ENGINE.get() {
917                let guard = engine_mutex.lock().unwrap();
918                if futures::executor::block_on(guard.ingest_remote_atoms(batch.atoms)).is_ok() {
919                    LAST_REMOTE_INGEST_MS.store(now_ms(), Ordering::Relaxed);
920                }
921            }
922        }
923        std::thread::sleep(std::time::Duration::from_millis(100));
924    });
925}
926
927fn hello_from_engine(
928    engine: &DWBaseEngine<FsStorage, NoVector, LocalStream, LocalGatekeeper, DummyEmbedder>,
929) -> NodeHello {
930    NodeHello {
931        node_id: node_id(),
932        endpoint: "component-local".into(),
933        worlds_served: engine
934            .storage
935            .worlds()
936            .unwrap_or_default()
937            .into_iter()
938            .map(|w| w.0)
939            .collect(),
940        trust_score: 1.0,
941        started_at: now_rfc3339(),
942        version: env!("CARGO_PKG_VERSION").into(),
943    }
944}
945
946pub fn peers() -> Vec<NodeHello> {
947    PEERS.get().map(|t| t.peers()).unwrap_or_default()
948}
949
950#[derive(Clone)]
951struct LocalStream {
952    inner: Arc<ParkingMutex<LocalStreamInner>>,
953}
954
/// Behaviour when an atom arrives for a subscription whose queue is full.
#[derive(Clone, Copy, Debug)]
enum ObserveDropPolicy {
    /// Evict the oldest queued atom, then enqueue the new one.
    DropOldest,
    /// Discard the incoming atom; the queue is left unchanged.
    DropNewest,
}
960
961struct LocalStreamInner {
962    next_handle: AtomicU64,
963    subs: HashMap<u64, LocalSubscription>,
964}
965
966struct LocalSubscription {
967    world: WorldKey,
968    filter: AtomFilter,
969    queue: VecDeque<Atom>,
970    capacity: usize,
971    drop_policy: ObserveDropPolicy,
972    dropped_total: u64,
973    last_event_ms: u64,
974}
975
976impl LocalStream {
977    fn new() -> Self {
978        Self {
979            inner: Arc::new(ParkingMutex::new(LocalStreamInner {
980                next_handle: AtomicU64::new(1),
981                subs: HashMap::new(),
982            })),
983        }
984    }
985
986    fn matches_filter(atom: &Atom, filter: &AtomFilter) -> bool {
987        if let Some(world) = &filter.world {
988            if atom.world() != world {
989                return false;
990            }
991        }
992        if !filter.kinds.is_empty() && !filter.kinds.contains(atom.kind()) {
993            return false;
994        }
995        if !filter.labels.is_empty() && !filter.labels.iter().all(|l| atom.labels().contains(l)) {
996            return false;
997        }
998        if !filter.flags.is_empty() && !filter.flags.iter().all(|f| atom.flags().contains(f)) {
999            return false;
1000        }
1001        if let Some(since) = &filter.since {
1002            if atom.timestamp().0 < since.0 {
1003                return false;
1004            }
1005        }
1006        if let Some(until) = &filter.until {
1007            if atom.timestamp().0 > until.0 {
1008                return false;
1009            }
1010        }
1011        true
1012    }
1013
1014    fn push_atom(&self, atom: Atom) {
1015        let mut guard = self.inner.lock();
1016        for sub in guard.subs.values_mut() {
1017            if &sub.world != atom.world() {
1018                continue;
1019            }
1020            if !Self::matches_filter(&atom, &sub.filter) {
1021                continue;
1022            }
1023            Self::enqueue(sub, atom.clone());
1024        }
1025    }
1026
1027    fn enqueue(sub: &mut LocalSubscription, atom: Atom) {
1028        if sub.queue.len() >= sub.capacity {
1029            sub.dropped_total = sub.dropped_total.saturating_add(1);
1030            dwbase_metrics::record_observe_dropped(1);
1031            match sub.drop_policy {
1032                ObserveDropPolicy::DropOldest => {
1033                    let _ = sub.queue.pop_front();
1034                }
1035                ObserveDropPolicy::DropNewest => {
1036                    dwbase_metrics::record_observe_queue_depth(sub.queue.len() as u64);
1037                    return;
1038                }
1039            }
1040        }
1041        sub.queue.push_back(atom);
1042        sub.last_event_ms = now_ms();
1043        dwbase_metrics::record_observe_queue_depth(sub.queue.len() as u64);
1044    }
1045
1046    fn poll_n(&self, handle: u64, max: usize) -> Vec<Atom> {
1047        let mut out = Vec::new();
1048        let mut guard = self.inner.lock();
1049        let Some(sub) = guard.subs.get_mut(&handle) else {
1050            return out;
1051        };
1052        for _ in 0..max {
1053            if let Some(a) = sub.queue.pop_front() {
1054                out.push(a);
1055            } else {
1056                break;
1057            }
1058        }
1059        out
1060    }
1061
1062    fn has_handle(&self, handle: u64) -> bool {
1063        self.inner.lock().subs.contains_key(&handle)
1064    }
1065
1066    fn world_for_handle(&self, handle: u64) -> Option<WorldKey> {
1067        self.inner.lock().subs.get(&handle).map(|s| s.world.clone())
1068    }
1069
1070    fn stats_for_handle(&self, handle: u64) -> Option<(usize, u64, u64)> {
1071        let guard = self.inner.lock();
1072        let sub = guard.subs.get(&handle)?;
1073        Some((sub.queue.len(), sub.dropped_total, sub.last_event_ms))
1074    }
1075
1076    fn push_to_handle(&self, handle: u64, atom: Atom) -> bool {
1077        let mut guard = self.inner.lock();
1078        let Some(sub) = guard.subs.get_mut(&handle) else {
1079            return false;
1080        };
1081        if &sub.world != atom.world() {
1082            return false;
1083        }
1084        if !Self::matches_filter(&atom, &sub.filter) {
1085            return false;
1086        }
1087        Self::enqueue(sub, atom);
1088        true
1089    }
1090}
1091
1092impl dwbase_engine::StreamEngine for LocalStream {
1093    type Handle = u64;
1094
1095    fn publish(&self, atom: &Atom) -> dwbase_engine::Result<()> {
1096        self.push_atom(atom.clone());
1097        Ok(())
1098    }
1099
1100    fn subscribe(
1101        &self,
1102        world: &WorldKey,
1103        filter: AtomFilter,
1104    ) -> dwbase_engine::Result<Self::Handle> {
1105        let handle = self
1106            .inner
1107            .lock()
1108            .next_handle
1109            .fetch_add(1, Ordering::Relaxed);
1110        let mut guard = self.inner.lock();
1111        guard.subs.insert(
1112            handle,
1113            LocalSubscription {
1114                world: world.clone(),
1115                filter,
1116                queue: VecDeque::new(),
1117                capacity: observe_queue_capacity(),
1118                drop_policy: observe_drop_policy(),
1119                dropped_total: 0,
1120                last_event_ms: 0,
1121            },
1122        );
1123        Ok(handle)
1124    }
1125
1126    fn poll(&self, handle: &Self::Handle) -> dwbase_engine::Result<Option<Atom>> {
1127        Ok(self.poll_n(*handle, 1).pop())
1128    }
1129
1130    fn stop(&self, handle: Self::Handle) -> dwbase_engine::Result<()> {
1131        self.inner.lock().subs.remove(&handle);
1132        Ok(())
1133    }
1134}
1135
1136#[derive(Default)]
1137struct NoVector;
1138impl dwbase_engine::VectorEngine for NoVector {
1139    fn upsert(
1140        &self,
1141        _world: &WorldKey,
1142        _atom_id: &AtomId,
1143        _vector: &[f32],
1144    ) -> dwbase_engine::Result<()> {
1145        Ok(())
1146    }
1147
1148    fn search(
1149        &self,
1150        _world: &WorldKey,
1151        _query: &[f32],
1152        _k: usize,
1153        _filter: &AtomFilter,
1154    ) -> dwbase_engine::Result<Vec<AtomId>> {
1155        Ok(Vec::new())
1156    }
1157
1158    fn rebuild(&self, _world: &WorldKey) -> dwbase_engine::Result<()> {
1159        Ok(())
1160    }
1161}
1162
1163#[derive(Debug, Serialize, Deserialize)]
1164struct Persisted {
1165    atoms: HashMap<String, Vec<Atom>>,
1166}
1167
1168#[derive(Debug)]
1169struct FsStorage {
1170    root: PathBuf,
1171    data: Mutex<HashMap<WorldKey, Vec<Atom>>>,
1172}
1173
1174impl FsStorage {
1175    fn new(root: PathBuf) -> dwbase_engine::Result<Self> {
1176        if !root.exists() {
1177            fs::create_dir_all(&root)
1178                .map_err(|e| dwbase_engine::DwbaseError::Storage(e.to_string()))?;
1179        }
1180        let path = root.join("atoms.json");
1181        let data = if path.exists() {
1182            let bytes =
1183                fs::read(&path).map_err(|e| dwbase_engine::DwbaseError::Storage(e.to_string()))?;
1184            if bytes.is_empty() {
1185                HashMap::new()
1186            } else {
1187                let persisted: Persisted = serde_json::from_slice(&bytes)
1188                    .map_err(|e| dwbase_engine::DwbaseError::Storage(e.to_string()))?;
1189                persisted
1190                    .atoms
1191                    .into_iter()
1192                    .map(|(k, v)| (WorldKey::new(k), v))
1193                    .collect()
1194            }
1195        } else {
1196            HashMap::new()
1197        };
1198        Ok(Self {
1199            root,
1200            data: Mutex::new(data),
1201        })
1202    }
1203
1204    fn persist(&self, data: &HashMap<WorldKey, Vec<Atom>>) -> dwbase_engine::Result<()> {
1205        let path = self.root.join("atoms.json");
1206        let persistable = Persisted {
1207            atoms: data.iter().map(|(k, v)| (k.0.clone(), v.clone())).collect(),
1208        };
1209        let bytes = serde_json::to_vec_pretty(&persistable)
1210            .map_err(|e| dwbase_engine::DwbaseError::Storage(e.to_string()))?;
1211        fs::write(path, bytes).map_err(|e| dwbase_engine::DwbaseError::Storage(e.to_string()))?;
1212        Ok(())
1213    }
1214
1215    fn matches_filter(atom: &Atom, filter: &AtomFilter) -> bool {
1216        if let Some(world) = &filter.world {
1217            if atom.world() != world {
1218                return false;
1219            }
1220        }
1221        if !filter.kinds.is_empty() && !filter.kinds.contains(atom.kind()) {
1222            return false;
1223        }
1224        if !filter.labels.is_empty() && !filter.labels.iter().all(|l| atom.labels().contains(l)) {
1225            return false;
1226        }
1227        if !filter.flags.is_empty() && !filter.flags.iter().all(|f| atom.flags().contains(f)) {
1228            return false;
1229        }
1230        if let Some(since) = &filter.since {
1231            if atom.timestamp().0 < since.0 {
1232                return false;
1233            }
1234        }
1235        if let Some(until) = &filter.until {
1236            if atom.timestamp().0 > until.0 {
1237                return false;
1238            }
1239        }
1240        true
1241    }
1242}
1243
1244impl StorageEngine for FsStorage {
1245    fn append(&self, atom: Atom) -> dwbase_engine::Result<()> {
1246        let mut guard = self.data.lock().unwrap();
1247        guard.entry(atom.world().clone()).or_default().push(atom);
1248        self.persist(&guard)
1249    }
1250
1251    fn get_by_ids(&self, ids: &[AtomId]) -> dwbase_engine::Result<Vec<Atom>> {
1252        let guard = self.data.lock().unwrap();
1253        let mut out = Vec::new();
1254        for atoms in guard.values() {
1255            for atom in atoms {
1256                if ids.contains(atom.id()) {
1257                    out.push(atom.clone());
1258                }
1259            }
1260        }
1261        Ok(out)
1262    }
1263
1264    fn scan(&self, world: &WorldKey, filter: &AtomFilter) -> dwbase_engine::Result<Vec<Atom>> {
1265        let guard = self.data.lock().unwrap();
1266        let list = guard.get(world).cloned().unwrap_or_default();
1267        let mut out = Vec::new();
1268        for atom in list {
1269            if Self::matches_filter(&atom, filter) {
1270                out.push(atom);
1271                if let Some(limit) = filter.limit {
1272                    if out.len() >= limit {
1273                        break;
1274                    }
1275                }
1276            }
1277        }
1278        Ok(out)
1279    }
1280
1281    fn stats(&self, world: &WorldKey) -> dwbase_engine::Result<dwbase_engine::StorageStats> {
1282        let guard = self.data.lock().unwrap();
1283        let atoms = guard.get(world);
1284        let atom_count = atoms.map(|v| v.len()).unwrap_or(0);
1285        let vector_count = atoms
1286            .map(|v| v.iter().filter(|a| a.vector().is_some()).count())
1287            .unwrap_or(0);
1288        Ok(dwbase_engine::StorageStats {
1289            atom_count,
1290            vector_count,
1291        })
1292    }
1293
1294    fn list_ids_in_window(
1295        &self,
1296        world: &WorldKey,
1297        window: &dwbase_engine::TimeWindow,
1298    ) -> dwbase_engine::Result<Vec<AtomId>> {
1299        let guard = self.data.lock().unwrap();
1300        let mut ids = Vec::new();
1301        if let Some(atoms) = guard.get(world) {
1302            for atom in atoms {
1303                if let Ok(dt) = OffsetDateTime::parse(
1304                    atom.timestamp().0.as_str(),
1305                    &time::format_description::well_known::Rfc3339,
1306                ) {
1307                    let ms = (dt.unix_timestamp_nanos() / 1_000_000) as i64;
1308                    if ms >= window.start_ms && ms <= window.end_ms {
1309                        ids.push(atom.id().clone());
1310                    }
1311                }
1312            }
1313        }
1314        Ok(ids)
1315    }
1316
1317    fn delete_atoms(&self, world: &WorldKey, ids: &[AtomId]) -> dwbase_engine::Result<usize> {
1318        let mut guard = self.data.lock().unwrap();
1319        let mut removed = 0usize;
1320        if let Some(vec) = guard.get_mut(world) {
1321            let before = vec.len();
1322            vec.retain(|a| !ids.contains(a.id()));
1323            removed = before - vec.len();
1324        }
1325        self.persist(&guard)?;
1326        Ok(removed)
1327    }
1328
1329    fn worlds(&self) -> dwbase_engine::Result<Vec<WorldKey>> {
1330        let guard = self.data.lock().unwrap();
1331        Ok(guard.keys().cloned().collect())
1332    }
1333}
1334
1335fn to_atom(kind: WitAtomKind) -> AtomKind {
1336    match kind {
1337        WitAtomKind::Observation => AtomKind::Observation,
1338        WitAtomKind::Reflection => AtomKind::Reflection,
1339        WitAtomKind::Plan => AtomKind::Plan,
1340        WitAtomKind::Action => AtomKind::Action,
1341        WitAtomKind::Message => AtomKind::Message,
1342    }
1343}
1344
1345fn to_wit_atom(atom: &Atom) -> WitAtom {
1346    WitAtom {
1347        id: atom.id().0.clone(),
1348        world_key: atom.world().0.clone(),
1349        worker: atom.worker().0.clone(),
1350        kind: match atom.kind() {
1351            AtomKind::Observation => WitAtomKind::Observation,
1352            AtomKind::Reflection => WitAtomKind::Reflection,
1353            AtomKind::Plan => WitAtomKind::Plan,
1354            AtomKind::Action => WitAtomKind::Action,
1355            AtomKind::Message => WitAtomKind::Message,
1356        },
1357        timestamp: atom.timestamp().0.clone(),
1358        importance: atom.importance().get(),
1359        payload_json: atom.payload_json().to_string(),
1360        vector: atom.vector().map(|v| v.to_vec()),
1361        flags_list: atom.flags().to_vec(),
1362        labels: atom.labels().to_vec(),
1363        links: atom.links().iter().map(|l| l.target.0.clone()).collect(),
1364    }
1365}
1366
1367fn to_filter(filter: WitAtomFilter) -> AtomFilter {
1368    AtomFilter {
1369        world: filter.world_key.map(WorldKey::new),
1370        kinds: filter.kinds.into_iter().map(to_atom).collect(),
1371        labels: filter.labels,
1372        flags: filter.flag_filter,
1373        since: filter.since.map(Timestamp::new),
1374        until: filter.until.map(Timestamp::new),
1375        limit: filter.limit.map(|v| v as usize),
1376    }
1377}
1378
1379fn warnings_from_health(snapshot: &engine::HealthSnapshot) -> Vec<String> {
1380    if snapshot.status == "ready" {
1381        Vec::new()
1382    } else if snapshot.message.is_empty() {
1383        vec!["degraded".into()]
1384    } else {
1385        vec![format!("degraded: {}", snapshot.message)]
1386    }
1387}
1388
1389#[derive(Clone, Debug, Serialize, Deserialize)]
1390struct ObserveCursor {
1391    last_atom_id: String,
1392    last_timestamp: String,
1393    updated_at_ms: u64,
1394}
1395
/// Encodes a world key as lowercase hex of its UTF-8 bytes, giving a token that is
/// safe to use as a single path component.
fn world_token(world_key: &str) -> String {
    use std::fmt::Write as _;
    world_key
        .bytes()
        .fold(String::with_capacity(world_key.len() * 2), |mut token, byte| {
            // Writing to a String cannot fail.
            let _ = write!(token, "{byte:02x}");
            token
        })
}
1399
1400fn observe_cursor_path(world: &WorldKey) -> PathBuf {
1401    data_dir()
1402        .join("_observe")
1403        .join("cursors")
1404        .join(world_token(&world.0))
1405        .join("cursor.json")
1406}
1407
1408fn read_observe_cursor(world: &WorldKey) -> Option<ObserveCursor> {
1409    let path = observe_cursor_path(world);
1410    let bytes = fs::read(path).ok()?;
1411    serde_json::from_slice(&bytes).ok()
1412}
1413
1414fn write_observe_cursor(world: &WorldKey, cursor: &ObserveCursor) {
1415    let path = observe_cursor_path(world);
1416    if let Some(parent) = path.parent() {
1417        let _ = fs::create_dir_all(parent);
1418    }
1419    if let Ok(bytes) = serde_json::to_vec(cursor) {
1420        let _ = fs::write(path, bytes);
1421    }
1422}
1423
1424fn durable_catchup_atoms(
1425    engine: &ComponentEngine,
1426    world: &WorldKey,
1427    filter: &AtomFilter,
1428    cursor: Option<&ObserveCursor>,
1429) -> Vec<Atom> {
1430    let mut out = Vec::new();
1431    let Ok(all) = engine.storage.scan(
1432        world,
1433        &AtomFilter {
1434            world: Some(world.clone()),
1435            kinds: Vec::new(),
1436            labels: Vec::new(),
1437            flags: Vec::new(),
1438            since: None,
1439            until: None,
1440            limit: None,
1441        },
1442    ) else {
1443        return out;
1444    };
1445
1446    let start_idx = cursor
1447        .and_then(|c| all.iter().position(|a| a.id().0 == c.last_atom_id))
1448        .map(|i| i + 1)
1449        .unwrap_or(0);
1450
1451    for atom in all.into_iter().skip(start_idx) {
1452        if !LocalStream::matches_filter(&atom, filter) {
1453            continue;
1454        }
1455        out.push(atom);
1456        if out.len() >= observe_durable_catchup_limit() {
1457            break;
1458        }
1459    }
1460    out
1461}
1462
/// Export target implementing the `dwbase:core/engine` WIT interface.
///
/// The type itself holds no data; state lives in module-level statics such as
/// `ENGINE`.
pub struct Component;
1464impl engine::Guest for Component {
1465    fn remember(atom: WitNewAtom) -> String {
1466        let start = Instant::now();
1467        let engine = init_engine();
1468        let guard = engine.lock().unwrap();
1469        let new_atom = NewAtom {
1470            world: WorldKey::new(atom.world_key),
1471            worker: WorkerKey::new(effective_worker(&atom.worker)),
1472            kind: to_atom(atom.kind),
1473            timestamp: Timestamp::new(if atom.timestamp.is_empty() {
1474                OffsetDateTime::now_utc()
1475                    .format(&time::format_description::well_known::Rfc3339)
1476                    .unwrap_or_else(|_| "1970-01-01T00:00:00Z".into())
1477            } else {
1478                atom.timestamp
1479            }),
1480            importance: Importance::clamped(atom.importance),
1481            payload_json: atom.payload_json,
1482            vector: atom.vector,
1483            flags: atom.flags_list,
1484            labels: atom.labels,
1485            links: atom
1486                .links
1487                .into_iter()
1488                .map(|id| Link {
1489                    target: AtomId::new(id),
1490                    kind: LinkKind::References,
1491                })
1492                .collect(),
1493        };
1494        if let Err(_e) = validate_new_atom(&new_atom) {
1495            return String::new();
1496        }
1497        let id = match futures::executor::block_on(guard.remember(new_atom)) {
1498            Ok(v) => v.0,
1499            Err(_) => return String::new(),
1500        };
1501        dwbase_metrics::record_remember_latency(start.elapsed());
1502        dwbase_metrics::record_index_freshness(Duration::from_millis(0));
1503        let repl = SWARM.get().cloned();
1504        let broadcaster = BROADCASTER.get();
1505        let atom = guard
1506            .get_atoms(&[AtomId::new(id.clone())])
1507            .ok()
1508            .and_then(|mut v| v.pop());
1509        drop(guard);
1510        if let Some(atom) = atom {
1511            if let Some(repl) = repl {
1512                let _ = repl.replicate_new_atom(atom.clone());
1513            }
1514            if let Some(b) = broadcaster {
1515                let _ = b.publish_atom(atom);
1516            }
1517        }
1518        id
1519    }
1520
1521    fn ask(question: WitQuestion) -> Answer {
1522        let start = Instant::now();
1523        if validate_world_for_read(&question.world_key).is_err() {
1524            return Answer {
1525                world_key: question.world_key,
1526                text: "capability denied".into(),
1527                supporting_atoms: Vec::new(),
1528            };
1529        }
1530        let engine = init_engine();
1531        let guard = engine.lock().unwrap();
1532        let q = Question {
1533            world: WorldKey::new(question.world_key),
1534            text: question.text,
1535            filter: question.filter.map(to_filter),
1536        };
1537        let ans = futures::executor::block_on(guard.ask(q)).expect("ask");
1538        dwbase_metrics::record_ask_latency(start.elapsed());
1539        Answer {
1540            world_key: ans.world.0,
1541            text: ans.text,
1542            supporting_atoms: ans.supporting_atoms.iter().map(to_wit_atom).collect(),
1543        }
1544    }
1545
1546    fn observe(atom: WitAtom) {
1547        let start = Instant::now();
1548        let engine = init_engine();
1549        let guard = engine.lock().unwrap();
1550        let new_atom = NewAtom {
1551            world: WorldKey::new(atom.world_key.clone()),
1552            worker: WorkerKey::new(effective_worker(&atom.worker)),
1553            kind: to_atom(atom.kind),
1554            timestamp: Timestamp::new(atom.timestamp),
1555            importance: Importance::clamped(atom.importance),
1556            payload_json: atom.payload_json,
1557            vector: atom.vector,
1558            flags: atom.flags_list,
1559            labels: atom.labels,
1560            links: atom
1561                .links
1562                .into_iter()
1563                .map(|id| Link {
1564                    target: AtomId::new(id),
1565                    kind: LinkKind::References,
1566                })
1567                .collect(),
1568        };
1569        if validate_new_atom(&new_atom).is_err() {
1570            return;
1571        }
1572        let id = futures::executor::block_on(guard.remember(new_atom))
1573            .ok()
1574            .map(|v| v.0);
1575        dwbase_metrics::record_remember_latency(start.elapsed());
1576        dwbase_metrics::record_index_freshness(Duration::from_millis(0));
1577        let repl = SWARM.get().cloned();
1578        let broadcaster = BROADCASTER.get();
1579        let atom = id
1580            .clone()
1581            .and_then(|id| guard.get_atoms(&[AtomId::new(id)]).ok())
1582            .and_then(|mut v| v.pop());
1583        drop(guard);
1584        if let Some(atom) = atom {
1585            if let Some(repl) = repl {
1586                let _ = repl.replicate_new_atom(atom.clone());
1587            }
1588            if let Some(b) = broadcaster {
1589                let _ = b.publish_atom(atom);
1590            }
1591        }
1592    }
1593
1594    fn replay(target_world: String, filter: WitAtomFilter) -> Vec<WitAtom> {
1595        if validate_world_for_read(&target_world).is_err() {
1596            return Vec::new();
1597        }
1598        let engine = init_engine();
1599        let guard = engine.lock().unwrap();
1600        let filter = to_filter(filter);
1601        futures::executor::block_on(guard.replay(WorldKey::new(target_world), filter))
1602            .expect("replay")
1603            .iter()
1604            .map(to_wit_atom)
1605            .collect()
1606    }
1607
1608    fn observe_start(filter: WitAtomFilter) -> u64 {
1609        let engine = init_engine();
1610        let filter = to_filter(filter);
1611        let Some(world) = filter.world.clone() else {
1612            return 0;
1613        };
1614        if validate_world_for_read(&world.0).is_err() {
1615            return 0;
1616        }
1617        let stream = STREAM.get_or_init(LocalStream::new).clone();
1618        let handle = stream.subscribe(&world, filter).unwrap_or(0);
1619        if handle == 0 {
1620            return 0;
1621        }
1622
1623        if observe_durable_enabled() {
1624            let guard = engine.lock().unwrap();
1625            let cursor = read_observe_cursor(&world);
1626            let catchup = durable_catchup_atoms(
1627                &guard,
1628                &world,
1629                &AtomFilter {
1630                    world: Some(world.clone()),
1631                    kinds: Vec::new(),
1632                    labels: Vec::new(),
1633                    flags: Vec::new(),
1634                    since: None,
1635                    until: None,
1636                    limit: None,
1637                },
1638                cursor.as_ref(),
1639            );
1640            drop(guard);
1641            for atom in catchup {
1642                let _ = stream.push_to_handle(handle, atom);
1643            }
1644        }
1645
1646        // Optional cross-node observe: subscribe to per-world NATS event subject if a bus is available.
1647        if std::env::var("DWBASE_OBSERVE_NATS_SUBSCRIBE").is_ok() {
1648            if let Some(bus) = BUS.get() {
1649                let subject = world_events_subject(&world.0);
1650                let subs = EVENT_SUBS.get_or_init(|| ParkingMutex::new(HashSet::new()));
1651                let mut guard = subs.lock();
1652                if guard.insert(subject.clone()) {
1653                    let stream = stream.clone();
1654                    let self_node = node_id();
1655                    let _ = bus.subscribe(
1656                        &subject,
1657                        Box::new(move |_sub, bytes, _reply_to| {
1658                            let Ok(batch) = decode_event_batch(&bytes) else {
1659                                return;
1660                            };
1661                            for ev in batch.events {
1662                                if ev.from_node == self_node {
1663                                    continue;
1664                                }
1665                                stream.push_atom(ev.atom);
1666                            }
1667                        }),
1668                    );
1669                }
1670            }
1671        }
1672
1673        handle
1674    }
1675
1676    fn observe_poll(handle: u64, max: u32) -> Vec<WitAtom> {
1677        if handle == 0 {
1678            return Vec::new();
1679        }
1680        let _ = init_engine();
1681        let stream = STREAM.get_or_init(LocalStream::new);
1682        let atoms = stream.poll_n(handle, max as usize);
1683        if observe_durable_enabled() {
1684            if let Some(world) = stream.world_for_handle(handle) {
1685                if let Some(last) = atoms.last() {
1686                    write_observe_cursor(
1687                        &world,
1688                        &ObserveCursor {
1689                            last_atom_id: last.id().0.clone(),
1690                            last_timestamp: last.timestamp().0.clone(),
1691                            updated_at_ms: now_ms(),
1692                        },
1693                    );
1694                }
1695            }
1696        }
1697        atoms.iter().map(to_wit_atom).collect()
1698    }
1699
1700    fn observe_stop(handle: u64) {
1701        if handle == 0 {
1702            return;
1703        }
1704        let _ = init_engine();
1705        let stream = STREAM.get_or_init(LocalStream::new);
1706        let _ = stream.stop(handle);
1707    }
1708
1709    fn observe_stats(handle: u64) -> engine::ObserveStatsSnapshot {
1710        if handle == 0 {
1711            return engine::ObserveStatsSnapshot {
1712                handle,
1713                queued_count: 0,
1714                dropped_count: 0,
1715                last_event_ms: 0,
1716                warnings: vec!["invalid_handle".into()],
1717            };
1718        }
1719        let stream = STREAM.get_or_init(LocalStream::new);
1720        let Some((queued, dropped, last_event_ms)) = stream.stats_for_handle(handle) else {
1721            return engine::ObserveStatsSnapshot {
1722                handle,
1723                queued_count: 0,
1724                dropped_count: 0,
1725                last_event_ms: 0,
1726                warnings: vec!["invalid_handle".into()],
1727            };
1728        };
1729        let mut warnings = Vec::new();
1730        if dropped > 0 {
1731            warnings.push("events_dropped".into());
1732        }
1733        engine::ObserveStatsSnapshot {
1734            handle,
1735            queued_count: queued as u64,
1736            dropped_count: dropped,
1737            last_event_ms,
1738            warnings,
1739        }
1740    }
1741
1742    fn health() -> engine::HealthSnapshot {
1743        let engine = init_engine();
1744        let guard = engine.lock().unwrap();
1745        compute_health(&guard)
1746    }
1747
1748    fn remember_v2(atom: WitNewAtom) -> Result<String, engine::ToolError> {
1749        let start = Instant::now();
1750        let engine = init_engine_v2()?;
1751        let guard = engine.lock().unwrap();
1752
1753        let new_atom = NewAtom {
1754            world: WorldKey::new(atom.world_key),
1755            worker: WorkerKey::new(effective_worker(&atom.worker)),
1756            kind: to_atom(atom.kind),
1757            timestamp: Timestamp::new(if atom.timestamp.is_empty() {
1758                OffsetDateTime::now_utc()
1759                    .format(&time::format_description::well_known::Rfc3339)
1760                    .unwrap_or_else(|_| "1970-01-01T00:00:00Z".into())
1761            } else {
1762                atom.timestamp
1763            }),
1764            importance: Importance::clamped(atom.importance),
1765            payload_json: atom.payload_json,
1766            vector: atom.vector,
1767            flags: atom.flags_list,
1768            labels: atom.labels,
1769            links: atom
1770                .links
1771                .into_iter()
1772                .map(|id| Link {
1773                    target: AtomId::new(id),
1774                    kind: LinkKind::References,
1775                })
1776                .collect(),
1777        };
1778
1779        if let Err(msg) = validate_new_atom(&new_atom) {
1780            return Err(map_validation_error(msg));
1781        }
1782
1783        let id = futures::executor::block_on(guard.remember(new_atom))
1784            .map_err(|e| err_storage(format!("remember failed: {e}")))?
1785            .0;
1786
1787        dwbase_metrics::record_remember_latency(start.elapsed());
1788        dwbase_metrics::record_index_freshness(Duration::from_millis(0));
1789
1790        let repl = SWARM.get().cloned();
1791        let broadcaster = BROADCASTER.get();
1792        let atom = guard
1793            .get_atoms(&[AtomId::new(id.clone())])
1794            .ok()
1795            .and_then(|mut v| v.pop());
1796        drop(guard);
1797
1798        if let Some(atom) = atom {
1799            if let Some(repl) = repl {
1800                let _ = repl.replicate_new_atom(atom.clone());
1801            }
1802            if let Some(b) = broadcaster {
1803                let _ = b.publish_atom(atom);
1804            }
1805        }
1806
1807        Ok(id)
1808    }
1809
1810    fn ask_v2(question: WitQuestion) -> Result<engine::AnswerV2, engine::ToolError> {
1811        let start = Instant::now();
1812        if let Err(msg) = validate_world_for_read(&question.world_key) {
1813            return Err(map_validation_error(msg));
1814        }
1815        let engine = init_engine_v2()?;
1816        let guard = engine.lock().unwrap();
1817        let q = Question {
1818            world: WorldKey::new(question.world_key),
1819            text: question.text,
1820            filter: question.filter.map(to_filter),
1821        };
1822        let ans = futures::executor::block_on(guard.ask(q))
1823            .map_err(|e| tool_error("internal_error", format!("ask failed: {e}"), None))?;
1824        dwbase_metrics::record_ask_latency(start.elapsed());
1825        let health = compute_health(&guard);
1826        Ok(engine::AnswerV2 {
1827            world_key: ans.world.0,
1828            text: ans.text,
1829            supporting_atoms: ans.supporting_atoms.iter().map(to_wit_atom).collect(),
1830            warnings: warnings_from_health(&health),
1831        })
1832    }
1833
1834    fn observe_start_v2(filter: WitAtomFilter) -> Result<u64, engine::ToolError> {
1835        let engine = init_engine_v2()?;
1836        let filter = to_filter(filter);
1837        let Some(world) = filter.world.clone() else {
1838            return Err(err_invalid_input(
1839                "observe_start requires filter.world_key to be set",
1840            ));
1841        };
1842        if let Err(msg) = validate_world_for_read(&world.0) {
1843            return Err(map_validation_error(msg));
1844        }
1845        let stream = STREAM.get_or_init(LocalStream::new).clone();
1846        let handle = stream
1847            .subscribe(&world, filter)
1848            .map_err(|e| tool_error("internal_error", format!("subscribe failed: {e}"), None))?;
1849
1850        if observe_durable_enabled() {
1851            let guard = engine.lock().unwrap();
1852            let cursor = read_observe_cursor(&world);
1853            let catchup = durable_catchup_atoms(
1854                &guard,
1855                &world,
1856                &AtomFilter {
1857                    world: Some(world.clone()),
1858                    kinds: Vec::new(),
1859                    labels: Vec::new(),
1860                    flags: Vec::new(),
1861                    since: None,
1862                    until: None,
1863                    limit: None,
1864                },
1865                cursor.as_ref(),
1866            );
1867            drop(guard);
1868            for atom in catchup {
1869                let _ = stream.push_to_handle(handle, atom);
1870            }
1871        }
1872
1873        if std::env::var("DWBASE_OBSERVE_NATS_SUBSCRIBE").is_ok() {
1874            if let Some(bus) = BUS.get() {
1875                let subject = world_events_subject(&world.0);
1876                let subs = EVENT_SUBS.get_or_init(|| ParkingMutex::new(HashSet::new()));
1877                let mut guard = subs.lock();
1878                if guard.insert(subject.clone()) {
1879                    let stream = stream.clone();
1880                    let self_node = node_id();
1881                    let _ = bus.subscribe(
1882                        &subject,
1883                        Box::new(move |_sub, bytes, _reply_to| {
1884                            let Ok(batch) = decode_event_batch(&bytes) else {
1885                                return;
1886                            };
1887                            for ev in batch.events {
1888                                if ev.from_node == self_node {
1889                                    continue;
1890                                }
1891                                stream.push_atom(ev.atom);
1892                            }
1893                        }),
1894                    );
1895                }
1896            }
1897        }
1898
1899        Ok(handle)
1900    }
1901
1902    fn observe_poll_v2(handle: u64, max: u32) -> Result<Vec<WitAtom>, engine::ToolError> {
1903        if handle == 0 {
1904            return Err(err_invalid_handle("handle must be non-zero"));
1905        }
1906        let _ = init_engine_v2()?;
1907        let stream = STREAM.get_or_init(LocalStream::new);
1908        if !stream.has_handle(handle) {
1909            return Err(err_invalid_handle("unknown observe handle"));
1910        }
1911        let atoms = stream.poll_n(handle, max as usize);
1912        if observe_durable_enabled() {
1913            if let Some(world) = stream.world_for_handle(handle) {
1914                if let Some(last) = atoms.last() {
1915                    write_observe_cursor(
1916                        &world,
1917                        &ObserveCursor {
1918                            last_atom_id: last.id().0.clone(),
1919                            last_timestamp: last.timestamp().0.clone(),
1920                            updated_at_ms: now_ms(),
1921                        },
1922                    );
1923                }
1924            }
1925        }
1926        Ok(atoms.iter().map(to_wit_atom).collect())
1927    }
1928
1929    fn observe_stop_v2(handle: u64) -> Result<bool, engine::ToolError> {
1930        if handle == 0 {
1931            return Err(err_invalid_handle("handle must be non-zero"));
1932        }
1933        let _ = init_engine_v2()?;
1934        let stream = STREAM.get_or_init(LocalStream::new);
1935        if !stream.has_handle(handle) {
1936            return Err(err_invalid_handle("unknown observe handle"));
1937        }
1938        stream
1939            .stop(handle)
1940            .map_err(|e| tool_error("internal_error", format!("stop failed: {e}"), None))?;
1941        Ok(true)
1942    }
1943
1944    fn observe_stats_v2(handle: u64) -> Result<engine::ObserveStatsSnapshot, engine::ToolError> {
1945        if handle == 0 {
1946            return Err(err_invalid_handle("handle must be non-zero"));
1947        }
1948        let _ = init_engine_v2()?;
1949        let stream = STREAM.get_or_init(LocalStream::new);
1950        if !stream.has_handle(handle) {
1951            return Err(err_invalid_handle("unknown observe handle"));
1952        }
1953        Ok(<Component as engine::Guest>::observe_stats(handle))
1954    }
1955
1956    fn health_v2() -> Result<engine::HealthSnapshot, engine::ToolError> {
1957        let engine = init_engine_v2()?;
1958        let guard = engine.lock().unwrap();
1959        Ok(compute_health(&guard))
1960    }
1961
1962    fn metrics_snapshot() -> Result<engine::MetricsSnapshotData, engine::ToolError> {
1963        let handle = install_metrics_recorder()
1964            .or_else(|| PROM_HANDLE.get().and_then(|h| h.as_ref()))
1965            .ok_or_else(|| {
1966                tool_error(
1967                    "internal_error",
1968                    "metrics recorder unavailable",
1969                    Some("install metrics recorder failed".into()),
1970                )
1971            })?;
1972        metrics::gauge!("dwbase.component.up").set(1.0);
1973        let text = handle.render();
1974        let parsed = parse_prometheus(&text);
1975        Ok(engine::MetricsSnapshotData {
1976            format: "prometheus".into(),
1977            prometheus: text,
1978            counters: parsed.counters,
1979            gauges: parsed.gauges,
1980            histograms: parsed.histograms,
1981        })
1982    }
1983}
1984
// Register `Component` as the implementation of the exports of the `core`
// world declared in `wit/dwbase-core.wit` (bindings produced by `generate!`).
export!(Component);
1987
1988#[cfg(test)]
1989mod tests {
1990    use super::*;
1991    use dwbase_swarm_nats::replication::replicator_with_bus;
1992    use tempfile::TempDir;
1993
1994    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
1995    static TEST_DIR: once_cell::sync::OnceCell<TempDir> = once_cell::sync::OnceCell::new();
1996
1997    fn env_lock() -> std::sync::MutexGuard<'static, ()> {
1998        ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner())
1999    }
2000
2001    fn reset_security_env() {
2002        for key in [
2003            "DWBASE_TENANT_ID",
2004            "GREENTIC_TENANT_ID",
2005            "DWBASE_WORKER_ID",
2006            "GREENTIC_WORKER_ID",
2007            "DWBASE_ENFORCE_TENANT_NAMESPACE",
2008            "DWBASE_ALLOW_READ_WORLDS",
2009            "DWBASE_ALLOW_WRITE_WORLDS",
2010            "DWBASE_ALLOW_READ_PREFIXES",
2011            "DWBASE_ALLOW_WRITE_PREFIXES",
2012            "DWBASE_MAX_PAYLOAD_BYTES",
2013            "DWBASE_IMPORTANCE_CAP",
2014            "DWBASE_ALLOWED_KINDS",
2015            "DWBASE_ALLOWED_LABELS",
2016            "DWBASE_ALLOW_POLICY_LABELS",
2017            "DWBASE_OBSERVE_QUEUE_CAPACITY",
2018            "DWBASE_OBSERVE_DROP_POLICY",
2019            "DWBASE_OBSERVE_DURABLE",
2020            "DWBASE_OBSERVE_DURABLE_CATCHUP_LIMIT",
2021            "DWBASE_HEALTH_DISABLE_FS_STATS",
2022        ] {
2023            std::env::remove_var(key);
2024        }
2025    }
2026
2027    fn set_shared_data_dir() {
2028        let dir = TEST_DIR.get_or_init(|| TempDir::new().unwrap());
2029        std::env::set_var("DWBASE_DATA_DIR", dir.path());
2030    }
2031
2032    #[test]
2033    fn remember_then_ask_roundtrip() {
2034        let _lock = env_lock();
2035        reset_security_env();
2036        set_shared_data_dir();
2037        let token = format!("hello-{}", now_ms());
2038        let atom_id = <Component as engine::Guest>::remember(WitNewAtom {
2039            world_key: "tenant:default/w1".into(),
2040            worker: "worker-1".into(),
2041            kind: WitAtomKind::Observation,
2042            timestamp: "2024-01-01T00:00:00Z".into(),
2043            importance: 0.5,
2044            payload_json: format!(r#"{{"note":"{}"}}"#, token),
2045            vector: None,
2046            flags_list: vec![],
2047            labels: vec![],
2048            links: vec![],
2049        });
2050        assert!(!atom_id.is_empty());
2051        let atoms_path = data_dir().join("atoms.json");
2052        let bytes = fs::read(&atoms_path).expect("atoms.json should exist after remember");
2053        let text = String::from_utf8_lossy(&bytes);
2054        assert!(
2055            text.contains(&atom_id),
2056            "atoms.json should contain remembered atom id"
2057        );
2058
2059        let answer = <Component as engine::Guest>::ask(WitQuestion {
2060            world_key: "tenant:default/w1".into(),
2061            text: token,
2062            filter: None,
2063        });
2064        assert_eq!(answer.world_key, "tenant:default/w1");
2065        assert!(!answer.text.is_empty());
2066    }
2067
2068    #[test]
2069    fn observe_start_poll_stop_receives_atoms_in_order() {
2070        let _lock = env_lock();
2071        reset_security_env();
2072        set_shared_data_dir();
2073        std::env::set_var("DWBASE_OBSERVE_QUEUE_CAPACITY", "100");
2074        std::env::set_var("DWBASE_OBSERVE_DROP_POLICY", "drop_oldest");
2075
2076        let handle = <Component as engine::Guest>::observe_start(WitAtomFilter {
2077            world_key: Some("tenant:default/obs".into()),
2078            kinds: vec![],
2079            labels: vec![],
2080            flag_filter: vec![],
2081            since: None,
2082            until: None,
2083            limit: None,
2084        });
2085        assert!(handle > 0);
2086
2087        let a1 = <Component as engine::Guest>::remember(WitNewAtom {
2088            world_key: "tenant:default/obs".into(),
2089            worker: "worker-1".into(),
2090            kind: WitAtomKind::Observation,
2091            timestamp: "2024-01-01T00:00:00Z".into(),
2092            importance: 0.4,
2093            payload_json: r#"{"n":1}"#.into(),
2094            vector: None,
2095            flags_list: vec![],
2096            labels: vec![],
2097            links: vec![],
2098        });
2099        let a2 = <Component as engine::Guest>::remember(WitNewAtom {
2100            world_key: "tenant:default/obs".into(),
2101            worker: "worker-1".into(),
2102            kind: WitAtomKind::Observation,
2103            timestamp: "2024-01-01T00:00:01Z".into(),
2104            importance: 0.4,
2105            payload_json: r#"{"n":2}"#.into(),
2106            vector: None,
2107            flags_list: vec![],
2108            labels: vec![],
2109            links: vec![],
2110        });
2111
2112        let mut got = <Component as engine::Guest>::observe_poll(handle, 10);
2113        if got.len() < 2 {
2114            got.extend(<Component as engine::Guest>::observe_poll(handle, 10));
2115        }
2116        assert_eq!(got.len(), 2);
2117        assert_eq!(got[0].id, a1);
2118        assert_eq!(got[1].id, a2);
2119
2120        <Component as engine::Guest>::observe_stop(handle);
2121    }
2122
2123    #[test]
2124    fn observe_receives_remotely_ingested_atoms() {
2125        let _lock = env_lock();
2126        reset_security_env();
2127        set_shared_data_dir();
2128
2129        let handle = <Component as engine::Guest>::observe_start(WitAtomFilter {
2130            world_key: Some("tenant:default/obs-remote".into()),
2131            kinds: vec![],
2132            labels: vec![],
2133            flag_filter: vec![],
2134            since: None,
2135            until: None,
2136            limit: None,
2137        });
2138        assert!(handle > 0);
2139
2140        let atom = Atom::builder(
2141            AtomId::new("remote-1"),
2142            WorldKey::new("tenant:default/obs-remote"),
2143            WorkerKey::new("peer"),
2144            AtomKind::Observation,
2145            Timestamp::new("2024-01-01T00:00:00Z"),
2146            Importance::clamped(0.5),
2147            r#"{"remote":true}"#,
2148        )
2149        .build();
2150
2151        let engine = init_engine();
2152        let guard = engine.lock().unwrap();
2153        futures::executor::block_on(guard.ingest_remote_atoms(vec![atom])).unwrap();
2154        drop(guard);
2155
2156        let got = <Component as engine::Guest>::observe_poll(handle, 10);
2157        assert_eq!(got.len(), 1);
2158        assert_eq!(got[0].id, "remote-1");
2159
2160        <Component as engine::Guest>::observe_stop(handle);
2161    }
2162
2163    #[test]
2164    fn presence_discovery_with_mock() {
2165        let _lock = env_lock();
2166        reset_security_env();
2167        set_shared_data_dir();
2168        let client = Arc::new(MockNats::default()) as Arc<dyn NatsClient>;
2169        let table_a = PeerTable::default();
2170        let table_b = PeerTable::default();
2171        let engine_a = init_engine();
2172        let engine_b = init_engine();
2173        let hello_a = hello_from_engine(&engine_a.lock().unwrap());
2174        let mut hello_b = hello_from_engine(&engine_b.lock().unwrap());
2175        hello_b.node_id = "other-node".into();
2176        let ttl = std::time::Duration::from_millis(100);
2177        start_presence_loop(client.clone(), hello_a, table_a.clone(), ttl);
2178        start_presence_loop(client, hello_b, table_b.clone(), ttl);
2179        std::thread::sleep(std::time::Duration::from_millis(200));
2180        assert!(
2181            table_a.peers().iter().any(|p| p.node_id == "other-node"),
2182            "table A should see node B"
2183        );
2184        assert!(
2185            table_b.peers().iter().any(|p| p.node_id == node_id()),
2186            "table B should see node A"
2187        );
2188    }
2189
2190    #[test]
2191    fn selective_replication_only_ingests_subscribed_worlds() {
2192        let _lock = env_lock();
2193        reset_security_env();
2194        let bus = Arc::new(MockBus::default()) as Arc<dyn dwbase_swarm_nats::swarm::SwarmBus>;
2195
2196        let dir_a = TempDir::new().unwrap();
2197        let dir_b = TempDir::new().unwrap();
2198        let dir_c = TempDir::new().unwrap();
2199
2200        let engine_a = DWBaseEngine::new(
2201            FsStorage::new(dir_a.path().to_path_buf()).unwrap(),
2202            NoVector,
2203            LocalStream::new(),
2204            LocalGatekeeper::new(Capabilities::default(), TrustStore::default()),
2205            DummyEmbedder::new(),
2206        );
2207        let engine_b = DWBaseEngine::new(
2208            FsStorage::new(dir_b.path().to_path_buf()).unwrap(),
2209            NoVector,
2210            LocalStream::new(),
2211            LocalGatekeeper::new(Capabilities::default(), TrustStore::default()),
2212            DummyEmbedder::new(),
2213        );
2214        let engine_c = DWBaseEngine::new(
2215            FsStorage::new(dir_c.path().to_path_buf()).unwrap(),
2216            NoVector,
2217            LocalStream::new(),
2218            LocalGatekeeper::new(Capabilities::default(), TrustStore::default()),
2219            DummyEmbedder::new(),
2220        );
2221
2222        let repl_a = replicator_with_bus(bus.clone(), "node-a", vec![]).unwrap();
2223        let repl_b =
2224            replicator_with_bus(bus.clone(), "node-b", vec!["tenant:default/world-x".into()])
2225                .unwrap();
2226        let repl_c = replicator_with_bus(bus.clone(), "node-c", vec![]).unwrap();
2227
2228        repl_b.announce().unwrap();
2229
2230        // Node A writes an atom to world-x and replicates it.
2231        let atom_id = futures::executor::block_on(engine_a.remember(NewAtom {
2232            world: WorldKey::new("tenant:default/world-x"),
2233            worker: WorkerKey::new("w"),
2234            kind: AtomKind::Observation,
2235            timestamp: Timestamp::new("2024-01-01T00:00:00Z"),
2236            importance: Importance::clamped(0.5),
2237            payload_json: r#"{"note":"hello"}"#.into(),
2238            vector: None,
2239            flags: vec![],
2240            labels: vec![],
2241            links: vec![],
2242        }))
2243        .unwrap();
2244
2245        let mut atoms = engine_a.get_atoms(&[atom_id.clone()]).unwrap();
2246        let atom = atoms.pop().expect("atom exists");
2247        repl_a.replicate_new_atom(atom).unwrap();
2248
2249        // Node B should receive and ingest; Node C should not.
2250        repl_b.poll_inbox().unwrap();
2251        repl_c.poll_inbox().unwrap();
2252
2253        while let Some((_from, batch)) = repl_b.poll_atom_batch() {
2254            futures::executor::block_on(engine_b.ingest_remote_atoms(batch.atoms)).unwrap();
2255        }
2256        while let Some((_from, batch)) = repl_c.poll_atom_batch() {
2257            futures::executor::block_on(engine_c.ingest_remote_atoms(batch.atoms)).unwrap();
2258        }
2259
2260        let ans_b = futures::executor::block_on(engine_b.ask(Question {
2261            world: WorldKey::new("tenant:default/world-x"),
2262            text: "hello?".into(),
2263            filter: None,
2264        }))
2265        .unwrap();
2266        assert!(
2267            ans_b.supporting_atoms.iter().any(|a| a.id() == &atom_id),
2268            "node-b should contain replicated atom"
2269        );
2270
2271        let ans_c = futures::executor::block_on(engine_c.ask(Question {
2272            world: WorldKey::new("tenant:default/world-x"),
2273            text: "hello?".into(),
2274            filter: None,
2275        }))
2276        .unwrap();
2277        assert!(
2278            ans_c.supporting_atoms.iter().all(|a| a.id() != &atom_id),
2279            "node-c should not contain replicated atom"
2280        );
2281    }
2282
2283    #[test]
2284    fn attempt_to_write_foreign_world_is_denied() {
2285        let _lock = env_lock();
2286        reset_security_env();
2287        set_shared_data_dir();
2288        std::env::set_var("DWBASE_TENANT_ID", "acme");
2289
2290        let atom_id = <Component as engine::Guest>::remember(WitNewAtom {
2291            world_key: "tenant:other/w1".into(),
2292            worker: "worker-1".into(),
2293            kind: WitAtomKind::Observation,
2294            timestamp: "2024-01-01T00:00:00Z".into(),
2295            importance: 0.5,
2296            payload_json: r#"{"note":"nope"}"#.into(),
2297            vector: None,
2298            flags_list: vec![],
2299            labels: vec![],
2300            links: vec![],
2301        });
2302        assert!(atom_id.is_empty(), "write should be denied");
2303    }
2304
2305    #[test]
2306    fn oversize_payload_is_rejected() {
2307        let _lock = env_lock();
2308        reset_security_env();
2309        set_shared_data_dir();
2310        std::env::set_var("DWBASE_MAX_PAYLOAD_BYTES", "10");
2311
2312        let atom_id = <Component as engine::Guest>::remember(WitNewAtom {
2313            world_key: "tenant:default/w1".into(),
2314            worker: "worker-1".into(),
2315            kind: WitAtomKind::Observation,
2316            timestamp: "2024-01-01T00:00:00Z".into(),
2317            importance: 0.1,
2318            payload_json: r#"{"note":"this is definitely >10 bytes"}"#.into(),
2319            vector: None,
2320            flags_list: vec![],
2321            labels: vec![],
2322            links: vec![],
2323        });
2324        assert!(atom_id.is_empty(), "oversize payload should be rejected");
2325    }
2326
2327    #[test]
2328    fn health_ready_by_default() {
2329        let _lock = env_lock();
2330        reset_security_env();
2331        set_shared_data_dir();
2332        std::env::set_var("DWBASE_MAX_DISK_MB", "100");
2333        let h = <Component as engine::Guest>::health();
2334        assert!(h.storage_ok);
2335        assert!(h.index_ok);
2336        assert_eq!(h.status, "ready");
2337        assert_eq!(h.disk_pressure, "ok");
2338    }
2339
2340    #[test]
2341    fn health_capacity_unknown_stays_ready_when_ok() {
2342        let _lock = env_lock();
2343        reset_security_env();
2344        set_shared_data_dir();
2345        std::env::remove_var("DWBASE_MAX_DISK_MB");
2346        std::env::set_var("DWBASE_HEALTH_DISABLE_FS_STATS", "1");
2347
2348        let h = <Component as engine::Guest>::health();
2349        assert!(h.storage_ok);
2350        assert!(h.index_ok);
2351        assert_eq!(h.status, "ready");
2352        assert_eq!(h.disk_pressure, "unknown");
2353    }
2354
2355    #[test]
2356    fn health_degrades_when_configured_capacity_exceeded() {
2357        let _lock = env_lock();
2358        reset_security_env();
2359        set_shared_data_dir();
2360        std::env::set_var("DWBASE_HEALTH_DISABLE_FS_STATS", "1");
2361        std::env::set_var("DWBASE_MAX_DISK_MB", "1");
2362
2363        let dir = data_dir();
2364        fs::create_dir_all(&dir).unwrap();
2365        fs::write(dir.join("pressure.bin"), vec![0u8; 2 * 1024 * 1024]).unwrap();
2366
2367        let h = <Component as engine::Guest>::health();
2368        assert_eq!(h.disk_pressure, "degraded");
2369        assert_eq!(h.status, "degraded");
2370    }
2371
2372    #[test]
2373    fn metrics_snapshot_renders_prometheus() {
2374        let _lock = env_lock();
2375        reset_security_env();
2376        set_shared_data_dir();
2377        let _ = install_metrics_recorder();
2378        dwbase_metrics::record_observe_dropped(1);
2379
2380        let snap = <Component as engine::Guest>::metrics_snapshot().expect("metrics");
2381        assert_eq!(snap.format, "prometheus");
2382        println!("metrics snapshot text:\n{}", snap.prometheus);
2383        assert!(
2384            !snap.prometheus.is_empty(),
2385            "prometheus text should be present"
2386        );
2387    }
2388
2389    fn atom_for_world(id: &str, world: &str, ts: &str) -> Atom {
2390        Atom::builder(
2391            AtomId::new(id),
2392            WorldKey::new(world),
2393            WorkerKey::new("w"),
2394            AtomKind::Observation,
2395            Timestamp::new(ts),
2396            Importance::clamped(0.5),
2397            r#"{"x":true}"#,
2398        )
2399        .build()
2400    }
2401
2402    #[test]
2403    fn observe_drop_oldest_is_deterministic_fifo() {
2404        let _lock = env_lock();
2405        reset_security_env();
2406        std::env::set_var("DWBASE_OBSERVE_QUEUE_CAPACITY", "2");
2407        std::env::set_var("DWBASE_OBSERVE_DROP_POLICY", "drop_oldest");
2408
2409        let stream = LocalStream::new();
2410        let world = WorldKey::new("tenant:default/obs-drop-oldest");
2411        let handle = stream
2412            .subscribe(
2413                &world,
2414                AtomFilter {
2415                    world: Some(world.clone()),
2416                    kinds: vec![],
2417                    labels: vec![],
2418                    flags: vec![],
2419                    since: None,
2420                    until: None,
2421                    limit: None,
2422                },
2423            )
2424            .unwrap();
2425
2426        assert!(stream.push_to_handle(
2427            handle,
2428            atom_for_world("a1", &world.0, "2024-01-01T00:00:00Z")
2429        ));
2430        assert!(stream.push_to_handle(
2431            handle,
2432            atom_for_world("a2", &world.0, "2024-01-01T00:00:01Z")
2433        ));
2434        assert!(stream.push_to_handle(
2435            handle,
2436            atom_for_world("a3", &world.0, "2024-01-01T00:00:02Z")
2437        ));
2438
2439        let got = stream.poll_n(handle, 10);
2440        assert_eq!(got.len(), 2);
2441        assert_eq!(got[0].id().0, "a2");
2442        assert_eq!(got[1].id().0, "a3");
2443
2444        let (queued, dropped, _last) = stream.stats_for_handle(handle).unwrap();
2445        assert_eq!(queued, 0);
2446        assert_eq!(dropped, 1);
2447    }
2448
2449    #[test]
2450    fn observe_drop_newest_is_deterministic_fifo() {
2451        let _lock = env_lock();
2452        reset_security_env();
2453        std::env::set_var("DWBASE_OBSERVE_QUEUE_CAPACITY", "2");
2454        std::env::set_var("DWBASE_OBSERVE_DROP_POLICY", "drop_newest");
2455
2456        let stream = LocalStream::new();
2457        let world = WorldKey::new("tenant:default/obs-drop-newest");
2458        let handle = stream
2459            .subscribe(
2460                &world,
2461                AtomFilter {
2462                    world: Some(world.clone()),
2463                    kinds: vec![],
2464                    labels: vec![],
2465                    flags: vec![],
2466                    since: None,
2467                    until: None,
2468                    limit: None,
2469                },
2470            )
2471            .unwrap();
2472
2473        assert!(stream.push_to_handle(
2474            handle,
2475            atom_for_world("a1", &world.0, "2024-01-01T00:00:00Z")
2476        ));
2477        assert!(stream.push_to_handle(
2478            handle,
2479            atom_for_world("a2", &world.0, "2024-01-01T00:00:01Z")
2480        ));
2481        assert!(stream.push_to_handle(
2482            handle,
2483            atom_for_world("a3", &world.0, "2024-01-01T00:00:02Z")
2484        ));
2485
2486        let got = stream.poll_n(handle, 10);
2487        assert_eq!(got.len(), 2);
2488        assert_eq!(got[0].id().0, "a1");
2489        assert_eq!(got[1].id().0, "a2");
2490
2491        let (queued, dropped, _last) = stream.stats_for_handle(handle).unwrap();
2492        assert_eq!(queued, 0);
2493        assert_eq!(dropped, 1);
2494    }
2495
2496    #[test]
2497    fn observe_durable_cursor_catches_up_after_restart_like_gap() {
2498        let _lock = env_lock();
2499        reset_security_env();
2500        set_shared_data_dir();
2501        std::env::set_var("DWBASE_OBSERVE_DURABLE", "1");
2502
2503        let world = "tenant:default/obs-durable";
2504        let handle1 = <Component as engine::Guest>::observe_start(WitAtomFilter {
2505            world_key: Some(world.into()),
2506            kinds: vec![],
2507            labels: vec![],
2508            flag_filter: vec![],
2509            since: None,
2510            until: None,
2511            limit: None,
2512        });
2513        assert!(handle1 > 0);
2514
2515        let _a1 = <Component as engine::Guest>::remember(WitNewAtom {
2516            world_key: world.into(),
2517            worker: "worker-1".into(),
2518            kind: WitAtomKind::Observation,
2519            timestamp: "2024-01-01T00:00:00Z".into(),
2520            importance: 0.4,
2521            payload_json: r#"{"n":1}"#.into(),
2522            vector: None,
2523            flags_list: vec![],
2524            labels: vec![],
2525            links: vec![],
2526        });
2527        let _a2 = <Component as engine::Guest>::remember(WitNewAtom {
2528            world_key: world.into(),
2529            worker: "worker-1".into(),
2530            kind: WitAtomKind::Observation,
2531            timestamp: "2024-01-01T00:00:01Z".into(),
2532            importance: 0.4,
2533            payload_json: r#"{"n":2}"#.into(),
2534            vector: None,
2535            flags_list: vec![],
2536            labels: vec![],
2537            links: vec![],
2538        });
2539
2540        let got = <Component as engine::Guest>::observe_poll(handle1, 10);
2541        assert_eq!(got.len(), 2);
2542        <Component as engine::Guest>::observe_stop(handle1);
2543
2544        // Atom remembered while no observe subscription is active.
2545        let a3 = <Component as engine::Guest>::remember(WitNewAtom {
2546            world_key: world.into(),
2547            worker: "worker-1".into(),
2548            kind: WitAtomKind::Observation,
2549            timestamp: "2024-01-01T00:00:02Z".into(),
2550            importance: 0.4,
2551            payload_json: r#"{"n":3}"#.into(),
2552            vector: None,
2553            flags_list: vec![],
2554            labels: vec![],
2555            links: vec![],
2556        });
2557        assert!(!a3.is_empty());
2558
2559        let handle2 = <Component as engine::Guest>::observe_start(WitAtomFilter {
2560            world_key: Some(world.into()),
2561            kinds: vec![],
2562            labels: vec![],
2563            flag_filter: vec![],
2564            since: None,
2565            until: None,
2566            limit: None,
2567        });
2568        assert!(handle2 > 0);
2569
2570        let got = <Component as engine::Guest>::observe_poll(handle2, 10);
2571        assert_eq!(got.len(), 1);
2572        assert_eq!(got[0].id, a3);
2573        <Component as engine::Guest>::observe_stop(handle2);
2574    }
2575
2576    #[test]
2577    fn remember_v2_capability_denied_is_structured() {
2578        let _lock = env_lock();
2579        reset_security_env();
2580        set_shared_data_dir();
2581
2582        let res = <Component as engine::Guest>::remember_v2(WitNewAtom {
2583            world_key: "tenant:other/w1".into(),
2584            worker: "worker-1".into(),
2585            kind: WitAtomKind::Observation,
2586            timestamp: "2024-01-01T00:00:00Z".into(),
2587            importance: 0.5,
2588            payload_json: r#"{"note":"nope"}"#.into(),
2589            vector: None,
2590            flags_list: vec![],
2591            labels: vec![],
2592            links: vec![],
2593        });
2594
2595        let err = res.expect_err("should deny cross-tenant write");
2596        assert_eq!(err.code, "capability_denied");
2597    }
2598
2599    #[test]
2600    fn remember_v2_oversize_payload_is_structured() {
2601        let _lock = env_lock();
2602        reset_security_env();
2603        set_shared_data_dir();
2604        std::env::set_var("DWBASE_MAX_PAYLOAD_BYTES", "10");
2605
2606        let payload = "x".repeat(20);
2607        let res = <Component as engine::Guest>::remember_v2(WitNewAtom {
2608            world_key: "tenant:default/w1".into(),
2609            worker: "worker-1".into(),
2610            kind: WitAtomKind::Observation,
2611            timestamp: "2024-01-01T00:00:00Z".into(),
2612            importance: 0.1,
2613            payload_json: payload,
2614            vector: None,
2615            flags_list: vec![],
2616            labels: vec![],
2617            links: vec![],
2618        });
2619
2620        let err = res.expect_err("should reject oversize payload");
2621        assert_eq!(err.code, "payload_too_large");
2622    }
2623
2624    #[test]
2625    fn observe_v2_invalid_handle_is_structured() {
2626        let _lock = env_lock();
2627        reset_security_env();
2628        set_shared_data_dir();
2629
2630        let res = <Component as engine::Guest>::observe_poll_v2(91254, 1);
2631        let err = res.expect_err("invalid handle should error");
2632        assert_eq!(err.code, "invalid_handle");
2633
2634        let res = <Component as engine::Guest>::observe_stop_v2(91254);
2635        let err = res.expect_err("invalid handle should error");
2636        assert_eq!(err.code, "invalid_handle");
2637    }
2638}