1use std::collections::HashMap;
4use std::collections::HashSet;
5use std::collections::VecDeque;
6use std::fs;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Mutex;
9use std::time::{Duration, Instant};
10use std::{path::PathBuf, sync::Arc};
11
12use dwbase_core::{
13 Atom, AtomId, AtomKind, Importance, Link, LinkKind, Timestamp, WorkerKey, WorldKey,
14};
15use dwbase_embedder_dummy::DummyEmbedder;
16use dwbase_engine::{AtomFilter, DWBaseEngine, NewAtom, Question, StorageEngine, StreamEngine};
17use dwbase_security::{Capabilities, LocalGatekeeper, TrustStore};
18use dwbase_swarm_nats::replication::Replicator;
19use dwbase_swarm_nats::replication::WorldAccessPolicy;
20#[cfg(feature = "nats")]
21use dwbase_swarm_nats::swarm::NatsBus;
22use dwbase_swarm_nats::swarm::{MockBus, NatsSwarmTransport};
23use dwbase_swarm_nats::world_events::{
24 decode_event_batch, world_events_subject, WorldEventBroadcaster,
25};
26#[cfg(feature = "nats")]
27use dwbase_swarm_nats::AsyncNats;
28use dwbase_swarm_nats::{
29 now_rfc3339, start_presence_loop, MockNats, NatsClient, NodeHello, PeerTable,
30};
31use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
32use once_cell::sync::OnceCell;
33use parking_lot::Mutex as ParkingMutex;
34use serde::{Deserialize, Serialize};
35use time::OffsetDateTime;
36use wit_bindgen::generate;
37
38generate!({ path: "wit/dwbase-core.wit", world: "core" });
39
40use exports::dwbase::core::engine::{
41 self, Answer, Atom as WitAtom, AtomFilter as WitAtomFilter, AtomKind as WitAtomKind,
42 NewAtom as WitNewAtom, Question as WitQuestion,
43};
44
/// Concrete engine instantiation used throughout this component: `FsStorage` for
/// storage, `NoVector` (a no-op vector engine), `LocalStream` for subscriptions,
/// `LocalGatekeeper` and `DummyEmbedder`.
type ComponentEngine =
    DWBaseEngine<FsStorage, NoVector, LocalStream, LocalGatekeeper, DummyEmbedder>;

// Process-wide singletons. Each `OnceCell` is populated at most once.

/// The engine, created lazily by `init_engine_v2`.
static ENGINE: OnceCell<Mutex<ComponentEngine>> = OnceCell::new();
/// Peer table, initialised by `maybe_start_presence` and read by `peers()`.
static PEERS: OnceCell<PeerTable> = OnceCell::new();
/// Replicator, set by `maybe_start_swarm` once swarm replication is enabled.
static SWARM: OnceCell<Arc<Replicator>> = OnceCell::new();
/// Clone of the engine's `LocalStream`, set during engine initialisation.
static STREAM: OnceCell<LocalStream> = OnceCell::new();
/// Swarm bus (mock or NATS-backed), set by `maybe_start_swarm`.
static BUS: OnceCell<Arc<dyn dwbase_swarm_nats::swarm::SwarmBus>> = OnceCell::new();
/// World-event broadcaster; only set when `DWBASE_OBSERVE_NATS_BROADCAST` is present.
static BROADCASTER: OnceCell<WorldEventBroadcaster> = OnceCell::new();
/// Set of event subscription names, initialised empty by `maybe_start_swarm`.
static EVENT_SUBS: OnceCell<ParkingMutex<HashSet<String>>> = OnceCell::new();
/// Wall-clock milliseconds of the last successful remote ingest (0 = never).
static LAST_REMOTE_INGEST_MS: AtomicU64 = AtomicU64::new(0);
/// Prometheus recorder handle; holds `None` when installing the recorder failed.
static PROM_HANDLE: OnceCell<Option<PrometheusHandle>> = OnceCell::new();
57
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0` rather than an error.
fn now_ms() -> u64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    since_epoch.as_millis() as u64
}
64
65fn install_metrics_recorder() -> Option<&'static PrometheusHandle> {
66 PROM_HANDLE
67 .get_or_init(|| PrometheusBuilder::new().install_recorder().ok())
68 .as_ref()
69}
70
/// Result of parsing a Prometheus text exposition, grouped by metric family kind.
#[derive(Default)]
struct ParsedMetrics {
    /// Samples whose `# TYPE` line declares them as `counter`.
    counters: Vec<engine::MetricPoint>,
    /// Samples declared as `gauge`, plus any sample with another or no declared type.
    gauges: Vec<engine::MetricPoint>,
    /// Histograms reassembled from their `_bucket` / `_sum` / `_count` series.
    histograms: Vec<engine::HistogramMetric>,
}
77
78fn parse_labels(raw: &str) -> Vec<engine::MetricLabel> {
79 if raw.trim().is_empty() {
80 return Vec::new();
81 }
82 let mut labels = Vec::new();
83 for pair in raw.split(',') {
84 if let Some((k, v)) = pair.split_once('=') {
85 let val = v.trim().trim_matches('"').replace("\\\"", "\"");
86 labels.push(engine::MetricLabel {
87 key: k.to_string(),
88 value: val,
89 });
90 }
91 }
92 labels.sort_by(|a, b| a.key.cmp(&b.key));
93 labels
94}
95
96fn labels_key(labels: &[engine::MetricLabel]) -> String {
97 labels
98 .iter()
99 .map(|l| format!("{}={}", l.key, l.value))
100 .collect::<Vec<_>>()
101 .join("|")
102}
103
/// Accumulator for one histogram series while its `_bucket`, `_sum` and `_count`
/// lines are being collected by `parse_prometheus`.
#[derive(Default)]
struct HistAccum {
    /// Labels identifying the series (the `le` label is removed for bucket lines).
    labels: Vec<engine::MetricLabel>,
    /// Buckets in the order encountered; sorted by `le` when finalised.
    buckets: Vec<engine::HistogramBucket>,
    /// Value of the `_sum` line, if one was seen.
    sum: Option<f64>,
    /// Value of the `_count` line, if one was seen.
    count: Option<f64>,
}
111
112fn parse_prometheus(text: &str) -> ParsedMetrics {
113 use std::collections::HashMap;
114
115 let mut types: HashMap<String, String> = HashMap::new();
116 for line in text.lines() {
117 if let Some(rest) = line.strip_prefix("# TYPE ") {
118 if let Some((name, ty)) = rest.trim().split_once(' ') {
119 types.insert(name.trim().to_string(), ty.trim().to_string());
120 }
121 }
122 }
123
124 let mut parsed = ParsedMetrics::default();
125 let mut hist_map: HashMap<(String, String), HistAccum> = HashMap::new();
126
127 for line in text.lines() {
128 let line = line.trim();
129 if line.is_empty() || line.starts_with('#') {
130 continue;
131 }
132 let Some((name_part, val_part)) = line.split_once(' ') else {
133 continue;
134 };
135 let value: f64 = match val_part.trim().parse() {
136 Ok(v) => v,
137 Err(_) => continue,
138 };
139
140 let (name, labels) = if let Some((n, raw_labels)) = name_part.split_once('{') {
141 let clean = raw_labels.trim_end_matches('}');
142 (n.to_string(), parse_labels(clean))
143 } else {
144 (name_part.to_string(), Vec::new())
145 };
146
147 if name.ends_with("_bucket") {
149 let base = name.trim_end_matches("_bucket").to_string();
150 let mut base_labels = labels.clone();
151 let le_idx = base_labels.iter().position(|l| l.key == "le");
152 let le = le_idx
153 .and_then(|idx| {
154 base_labels
155 .get(idx)
156 .and_then(|l| l.value.parse::<f64>().ok())
157 })
158 .unwrap_or(f64::INFINITY);
159 if let Some(idx) = le_idx {
160 base_labels.remove(idx);
161 }
162 let key = labels_key(&base_labels);
163 let entry = hist_map
164 .entry((base.clone(), key))
165 .or_insert_with(|| HistAccum {
166 labels: base_labels.clone(),
167 ..Default::default()
168 });
169 entry
170 .buckets
171 .push(engine::HistogramBucket { le, count: value });
172 continue;
173 }
174 if name.ends_with("_sum") {
175 let base = name.trim_end_matches("_sum").to_string();
176 let key = labels_key(&labels);
177 let entry = hist_map
178 .entry((base.clone(), key))
179 .or_insert_with(|| HistAccum {
180 labels: labels.clone(),
181 ..Default::default()
182 });
183 entry.sum = Some(value);
184 continue;
185 }
186 if name.ends_with("_count") {
187 let base = name.trim_end_matches("_count").to_string();
188 let key = labels_key(&labels);
189 let entry = hist_map
190 .entry((base.clone(), key))
191 .or_insert_with(|| HistAccum {
192 labels: labels.clone(),
193 ..Default::default()
194 });
195 entry.count = Some(value);
196 continue;
197 }
198
199 match types.get(&name).map(|s| s.as_str()) {
200 Some("counter") => parsed.counters.push(engine::MetricPoint {
201 name: name.clone(),
202 labels: labels.clone(),
203 value,
204 }),
205 Some("gauge") => parsed.gauges.push(engine::MetricPoint {
206 name: name.clone(),
207 labels: labels.clone(),
208 value,
209 }),
210 _ => parsed.gauges.push(engine::MetricPoint {
211 name: name.clone(),
212 labels: labels.clone(),
213 value,
214 }),
215 }
216 }
217
218 for ((name, _key), mut acc) in hist_map {
219 acc.buckets
220 .sort_by(|a, b| a.le.partial_cmp(&b.le).unwrap_or(std::cmp::Ordering::Equal));
221 parsed.histograms.push(engine::HistogramMetric {
222 name,
223 labels: acc.labels,
224 buckets: acc.buckets,
225 sum: acc.sum.unwrap_or(0.0),
226 count: acc.count.unwrap_or(0.0),
227 });
228 }
229
230 parsed
231}
232
/// Returns the configured disk budget in bytes, read from `DWBASE_MAX_DISK_MB`.
///
/// Returns `0` when the variable is unset or not a valid `u64`. The
/// megabyte-to-byte conversion saturates at `u64::MAX` instead of overflowing.
fn max_disk_bytes() -> u64 {
    const BYTES_PER_MB: u64 = 1024 * 1024;
    std::env::var("DWBASE_MAX_DISK_MB")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .map(|mb| mb.saturating_mul(BYTES_PER_MB))
        .unwrap_or(0)
}
240
/// Disk-usage percentage at which health reports "warn".
///
/// Read from `DWBASE_HEALTH_DISK_WARN_PCT`; falls back to `80.0` when the variable
/// is unset or does not parse as `f32`.
fn health_disk_warn_percent() -> f32 {
    const DEFAULT_WARN_PCT: f32 = 80.0;
    match std::env::var("DWBASE_HEALTH_DISK_WARN_PCT") {
        Ok(raw) => raw.parse::<f32>().unwrap_or(DEFAULT_WARN_PCT),
        Err(_) => DEFAULT_WARN_PCT,
    }
}
247
/// Disk-usage percentage at which health reports "degraded".
///
/// Read from `DWBASE_HEALTH_DISK_DEGRADED_PCT`; falls back to `90.0` when the
/// variable is unset or does not parse as `f32`.
fn health_disk_degraded_percent() -> f32 {
    const DEFAULT_DEGRADED_PCT: f32 = 90.0;
    match std::env::var("DWBASE_HEALTH_DISK_DEGRADED_PCT") {
        Ok(raw) => raw.parse::<f32>().unwrap_or(DEFAULT_DEGRADED_PCT),
        Err(_) => DEFAULT_DEGRADED_PCT,
    }
}
254
/// Returns `true` when `DWBASE_HEALTH_DISABLE_FS_STATS` is set to any valid-Unicode
/// value — the value itself is not inspected, so `"0"` also disables fs stats.
fn health_disable_fs_stats() -> bool {
    matches!(std::env::var("DWBASE_HEALTH_DISABLE_FS_STATS"), Ok(_))
}
258
/// Index-rebuild lag threshold in milliseconds.
///
/// `DWBASE_INDEX_REBUILD_WARN_SECS` supplies the threshold in seconds (default 60
/// when unset or unparsable); the conversion to milliseconds saturates.
fn index_rebuild_warn_ms() -> u64 {
    const DEFAULT_SECS: u64 = 60;
    let secs = match std::env::var("DWBASE_INDEX_REBUILD_WARN_SECS") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(DEFAULT_SECS),
        Err(_) => DEFAULT_SECS,
    };
    secs.saturating_mul(1000)
}
266
267fn fs_capacity_bytes(path: &PathBuf) -> Option<(u64, u64)> {
268 if health_disable_fs_stats() {
269 return None;
270 }
271 #[cfg(not(target_arch = "wasm32"))]
272 {
273 let total = fs2::total_space(path).ok()?;
274 let free = fs2::available_space(path).ok()?;
275 Some((total, free))
276 }
277 #[cfg(target_arch = "wasm32")]
278 {
279 let _ = path;
280 None
281 }
282}
283
/// Sums the sizes of all regular files under `path`, descending into
/// subdirectories.
///
/// Directories that cannot be listed and entries whose metadata cannot be read
/// contribute nothing; the total saturates at `u64::MAX`. Entry metadata comes from
/// `DirEntry::metadata`, so entries that are neither files nor directories are
/// ignored.
fn dir_size_bytes(path: &PathBuf) -> u64 {
    let mut pending: Vec<PathBuf> = vec![path.clone()];
    let mut bytes = 0u64;
    while let Some(dir) = pending.pop() {
        let listing = match fs::read_dir(&dir) {
            Ok(listing) => listing,
            Err(_) => continue,
        };
        for entry in listing.flatten() {
            let Ok(meta) = entry.metadata() else {
                continue;
            };
            if meta.is_file() {
                bytes = bytes.saturating_add(meta.len());
            } else if meta.is_dir() {
                pending.push(entry.path());
            }
        }
    }
    bytes
}
301
/// Builds a health snapshot for `engine` and records the related metrics.
///
/// Combines storage/index readiness, disk usage of the data directory, time since
/// the last remote ingest, and index rebuild lag into a `status` ("ready" or
/// "degraded") plus a human-readable `message`.
///
/// Disk pressure is computed against the configured budget (`max_disk_bytes()`)
/// when that is non-zero, otherwise against filesystem capacity when available;
/// with neither, pressure is reported as "unknown".
fn compute_health(
    engine: &DWBaseEngine<FsStorage, NoVector, LocalStream, LocalGatekeeper, DummyEmbedder>,
) -> engine::HealthSnapshot {
    let storage_ok = engine.storage_ready();
    let index_ok = engine.index_status().iter().all(|m| m.ready);

    let data = data_dir();
    // Walks the whole data directory on every call.
    let used_bytes = dir_size_bytes(&data);
    let configured_total_bytes = max_disk_bytes();
    let mut disk_total_bytes = 0u64;
    let mut disk_free_bytes = 0u64;
    let mut disk_used_percent = 0.0f32;
    let mut pressure_basis = "unknown";

    if configured_total_bytes > 0 {
        // Configured budget: usage is the data directory size against the budget.
        disk_total_bytes = configured_total_bytes;
        disk_free_bytes = configured_total_bytes.saturating_sub(used_bytes);
        disk_used_percent = (used_bytes as f32 / configured_total_bytes as f32) * 100.0;
        pressure_basis = "configured";
    } else if let Some((fs_total, fs_free)) = fs_capacity_bytes(&data) {
        // Filesystem basis: usage is everything on the volume, not just our data.
        disk_total_bytes = fs_total;
        disk_free_bytes = fs_free;
        if fs_total > 0 {
            disk_used_percent =
                ((fs_total.saturating_sub(fs_free)) as f32 / fs_total as f32) * 100.0;
        }
        pressure_basis = "filesystem";
    }

    let warn_pct = health_disk_warn_percent();
    let degraded_pct = health_disk_degraded_percent();
    let disk_pressure = if disk_total_bytes == 0 {
        "unknown".to_string()
    } else if disk_used_percent >= degraded_pct {
        "degraded".to_string()
    } else if disk_used_percent >= warn_pct {
        "warn".to_string()
    } else {
        "ok".to_string()
    };

    // Sync lag is the time since the last successful remote ingest; 0 when no
    // remote ingest has happened yet.
    let last_remote = LAST_REMOTE_INGEST_MS.load(Ordering::Relaxed);
    let lag_ms = if last_remote == 0 {
        0
    } else {
        now_ms().saturating_sub(last_remote)
    };

    // Hard-coded to zero here; no quarantine source is consulted in this function.
    let quarantine_count = 0u64;

    if disk_total_bytes > 0 {
        // Report the same "used" figure the percentage above was derived from.
        let used_for_metrics = if pressure_basis == "filesystem" {
            disk_total_bytes.saturating_sub(disk_free_bytes)
        } else {
            used_bytes
        };
        dwbase_metrics::record_disk_usage(used_for_metrics, disk_total_bytes);
    }
    dwbase_metrics::record_sync_lag(Duration::from_millis(lag_ms));
    dwbase_metrics::record_quarantine_count(quarantine_count);

    let mut status = "ready".to_string();
    let mut message = "ok".to_string();
    if let Some(lag) = engine.max_index_rebuild_lag_ms() {
        if lag > index_rebuild_warn_ms() {
            status = "degraded".into();
            message = format!("index rebuild lag {lag}ms");
        }
    }
    // Later conditions override the message set above: storage/index readiness
    // takes precedence, then disk pressure.
    // NOTE(review): a disk "warn" replaces an index-lag message while leaving the
    // status "degraded" — confirm this is intended.
    if !storage_ok || !index_ok {
        status = "degraded".into();
        message = "storage or index not ready".into();
    } else if disk_pressure == "degraded" {
        status = "degraded".into();
        message = match pressure_basis {
            "filesystem" => "disk pressure degraded (filesystem)".into(),
            "configured" => "disk pressure degraded (configured capacity)".into(),
            _ => "disk pressure degraded".into(),
        };
    } else if disk_pressure == "warn" {
        message = match pressure_basis {
            "filesystem" => "disk pressure warn (filesystem)".into(),
            "configured" => "disk pressure warn (configured capacity)".into(),
            _ => "disk pressure warn".into(),
        };
    }

    engine::HealthSnapshot {
        status,
        message,
        storage_ok,
        index_ok,
        disk_used_bytes: used_bytes,
        disk_free_bytes,
        disk_total_bytes,
        disk_used_percent,
        disk_pressure,
        sync_lag_ms: lag_ms,
        quarantine_count,
    }
}
404
/// Directory holding this component's data: `DWBASE_DATA_DIR` when set, otherwise
/// `./dwbase-data`.
fn data_dir() -> PathBuf {
    match std::env::var("DWBASE_DATA_DIR") {
        Ok(dir) => PathBuf::from(dir),
        Err(_) => PathBuf::from("./dwbase-data"),
    }
}
410
/// Resolves the tenant id: `DWBASE_TENANT_ID`, then `GREENTIC_TENANT_ID`, then the
/// literal `"default"`.
fn tenant_id() -> String {
    for key in ["DWBASE_TENANT_ID", "GREENTIC_TENANT_ID"] {
        if let Ok(id) = std::env::var(key) {
            return id;
        }
    }
    "default".into()
}
416
/// World-key namespace prefix for tenant `id`, i.e. `tenant:<id>/`.
fn tenant_prefix(id: &str) -> String {
    let mut prefix = String::with_capacity("tenant:".len() + id.len() + 1);
    prefix.push_str("tenant:");
    prefix.push_str(id);
    prefix.push('/');
    prefix
}
420
421fn tool_error(
422 code: &str,
423 message: impl Into<String>,
424 details_json: Option<String>,
425) -> engine::ToolError {
426 engine::ToolError {
427 code: code.to_string(),
428 message: message.into(),
429 details_json,
430 }
431}
432
433fn err_invalid_input(message: impl Into<String>) -> engine::ToolError {
434 tool_error("invalid_input", message, None)
435}
436
437fn err_capability_denied(message: impl Into<String>) -> engine::ToolError {
438 tool_error("capability_denied", message, None)
439}
440
441fn err_invalid_handle(message: impl Into<String>) -> engine::ToolError {
442 tool_error("invalid_handle", message, None)
443}
444
445fn err_storage(message: impl Into<String>) -> engine::ToolError {
446 tool_error("storage_error", message, None)
447}
448
449fn map_validation_error(message: String) -> engine::ToolError {
450 let lower = message.to_ascii_lowercase();
451 if lower.starts_with("write denied") || lower.starts_with("read denied") {
452 return err_capability_denied(message);
453 }
454 if lower.starts_with("payload too large") {
455 return tool_error("payload_too_large", message, None);
456 }
457 if lower.starts_with("importance") {
458 return tool_error("importance_cap", message, None);
459 }
460 if lower.starts_with("kind") {
461 return tool_error("kind_not_allowed", message, None);
462 }
463 if lower.starts_with("label") || lower.contains("labels not permitted") {
464 return tool_error("label_not_allowed", message, None);
465 }
466 err_invalid_input(message)
467}
468
/// Determines the worker id to attribute an operation to.
///
/// `DWBASE_WORKER_ID`, then `GREENTIC_WORKER_ID`, override the caller-supplied
/// `input`. Without either, `input` is returned as given, or `"llm"` when it is
/// blank.
fn effective_worker(input: &str) -> String {
    if let Ok(id) = std::env::var("DWBASE_WORKER_ID") {
        return id;
    }
    if let Ok(id) = std::env::var("GREENTIC_WORKER_ID") {
        return id;
    }
    if input.trim().is_empty() {
        "llm".to_string()
    } else {
        input.to_string()
    }
}
480
/// World patterns to subscribe to, as a comma-separated list from
/// `DWBASE_SUBSCRIBE_WORLDS` (or `DWBASE_SUBSCRIBE_PATTERNS` when the former is
/// unset). Entries are trimmed and empty entries dropped.
fn subscription_patterns() -> Vec<String> {
    let raw = match std::env::var("DWBASE_SUBSCRIBE_WORLDS") {
        Ok(value) => value,
        Err(_) => std::env::var("DWBASE_SUBSCRIBE_PATTERNS").unwrap_or_default(),
    };
    let mut patterns = Vec::new();
    for piece in raw.split(',') {
        let piece = piece.trim();
        if !piece.is_empty() {
            patterns.push(piece.to_string());
        }
    }
    patterns
}
490
/// Per-subscription queue capacity from `DWBASE_OBSERVE_QUEUE_CAPACITY`.
///
/// Unset, unparsable or zero values fall back to `10_000`.
fn observe_queue_capacity() -> usize {
    const DEFAULT_CAPACITY: usize = 10_000;
    let configured = std::env::var("DWBASE_OBSERVE_QUEUE_CAPACITY")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok());
    match configured {
        Some(n) if n > 0 => n,
        _ => DEFAULT_CAPACITY,
    }
}
498
499fn observe_drop_policy() -> ObserveDropPolicy {
500 let raw = std::env::var("DWBASE_OBSERVE_DROP_POLICY").unwrap_or_else(|_| "drop_oldest".into());
501 match raw.to_ascii_lowercase().as_str() {
502 "drop_newest" | "newest" => ObserveDropPolicy::DropNewest,
503 _ => ObserveDropPolicy::DropOldest,
504 }
505}
506
/// Returns `true` when `DWBASE_OBSERVE_DURABLE` is set to any valid-Unicode value;
/// the value itself is not inspected.
fn observe_durable_enabled() -> bool {
    matches!(std::env::var("DWBASE_OBSERVE_DURABLE"), Ok(_))
}
510
/// Catch-up limit for durable observation, from
/// `DWBASE_OBSERVE_DURABLE_CATCHUP_LIMIT`.
///
/// Unset, unparsable or zero values fall back to `1_000`.
fn observe_durable_catchup_limit() -> usize {
    const DEFAULT_LIMIT: usize = 1_000;
    let configured = std::env::var("DWBASE_OBSERVE_DURABLE_CATCHUP_LIMIT")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok());
    match configured {
        Some(n) if n > 0 => n,
        _ => DEFAULT_LIMIT,
    }
}
518
/// Write/read guardrails applied to atoms and worlds, built from environment
/// variables by `SecurityConfig::from_env`.
#[derive(Clone, Debug)]
struct SecurityConfig {
    /// Tenant whose `tenant:<id>/` namespace worlds must live in when enforced.
    tenant_id: String,
    /// When true, worlds outside the tenant namespace are rejected unless allow-listed.
    enforce_tenant_namespace: bool,
    /// Exact world names readable regardless of namespace.
    allow_read_worlds: Vec<String>,
    /// Exact world names writable regardless of namespace.
    allow_write_worlds: Vec<String>,
    /// World-name prefixes readable regardless of namespace.
    allow_read_prefixes: Vec<String>,
    /// World-name prefixes writable regardless of namespace.
    allow_write_prefixes: Vec<String>,
    /// Maximum accepted length of an atom's `payload_json`, in bytes.
    payload_max_bytes: usize,
    /// Maximum importance a new atom may carry.
    importance_cap: f32,
    /// Atom kinds that may be written.
    allowed_kinds: Vec<AtomKind>,
    /// Labels that may be attached to new atoms; empty means no labels are allowed.
    allowed_labels: Vec<String>,
    /// When true, labels starting with `policy:` are accepted without being listed.
    allow_policy_labels: bool,
}
533
534impl Default for SecurityConfig {
535 fn default() -> Self {
536 Self {
537 tenant_id: tenant_id(),
538 enforce_tenant_namespace: true,
539 allow_read_worlds: Vec::new(),
540 allow_write_worlds: Vec::new(),
541 allow_read_prefixes: Vec::new(),
542 allow_write_prefixes: Vec::new(),
543 payload_max_bytes: 64 * 1024,
544 importance_cap: 0.7,
545 allowed_kinds: vec![AtomKind::Observation, AtomKind::Reflection],
546 allowed_labels: Vec::new(),
547 allow_policy_labels: false,
548 }
549 }
550}
551
/// Reads environment variable `key` as a comma-separated list.
///
/// Entries are trimmed and empty entries dropped; an unset variable yields an empty
/// vector.
fn parse_csv_env(key: &str) -> Vec<String> {
    let raw = std::env::var(key).unwrap_or_default();
    let mut items = Vec::new();
    for piece in raw.split(',') {
        let piece = piece.trim();
        if !piece.is_empty() {
            items.push(piece.to_string());
        }
    }
    items
}
561
562impl SecurityConfig {
563 fn from_env() -> Self {
564 let mut cfg = SecurityConfig::default();
565 if let Ok(v) = std::env::var("DWBASE_ENFORCE_TENANT_NAMESPACE") {
566 cfg.enforce_tenant_namespace = v != "0" && !v.eq_ignore_ascii_case("false");
567 }
568 cfg.allow_read_worlds = parse_csv_env("DWBASE_ALLOW_READ_WORLDS");
569 cfg.allow_write_worlds = parse_csv_env("DWBASE_ALLOW_WRITE_WORLDS");
570 cfg.allow_read_prefixes = parse_csv_env("DWBASE_ALLOW_READ_PREFIXES");
571 cfg.allow_write_prefixes = parse_csv_env("DWBASE_ALLOW_WRITE_PREFIXES");
572
573 if let Ok(v) = std::env::var("DWBASE_MAX_PAYLOAD_BYTES") {
574 if let Ok(n) = v.parse::<usize>() {
575 cfg.payload_max_bytes = n;
576 }
577 }
578 if let Ok(v) = std::env::var("DWBASE_IMPORTANCE_CAP") {
579 if let Ok(n) = v.parse::<f32>() {
580 cfg.importance_cap = n;
581 }
582 }
583 let kinds = parse_csv_env("DWBASE_ALLOWED_KINDS");
584 if !kinds.is_empty() {
585 cfg.allowed_kinds = kinds
586 .iter()
587 .filter_map(|k| match k.to_ascii_lowercase().as_str() {
588 "observation" => Some(AtomKind::Observation),
589 "reflection" => Some(AtomKind::Reflection),
590 "plan" => Some(AtomKind::Plan),
591 "action" => Some(AtomKind::Action),
592 "message" => Some(AtomKind::Message),
593 _ => None,
594 })
595 .collect();
596 }
597 cfg.allowed_labels = parse_csv_env("DWBASE_ALLOWED_LABELS");
598 cfg.allow_policy_labels = std::env::var("DWBASE_ALLOW_POLICY_LABELS").is_ok();
599 cfg
600 }
601}
602
/// Returns `true` when `world` starts with at least one entry of `prefixes`.
fn matches_prefixes(prefixes: &[String], world: &str) -> bool {
    for prefix in prefixes {
        if world.starts_with(prefix.as_str()) {
            return true;
        }
    }
    false
}
606
607fn validate_world_for_write(world: &str) -> Result<(), String> {
608 let cfg = SecurityConfig::from_env();
609 if cfg.allow_write_worlds.iter().any(|w| w == world) {
610 return Ok(());
611 }
612 if matches_prefixes(&cfg.allow_write_prefixes, world) {
613 return Ok(());
614 }
615 if cfg.enforce_tenant_namespace {
616 let prefix = tenant_prefix(&cfg.tenant_id);
617 if !world.starts_with(&prefix) {
618 return Err(format!(
619 "write denied: world must be within {prefix} (set DWBASE_ALLOW_WRITE_WORLDS/DWBASE_ALLOW_WRITE_PREFIXES to override)"
620 ));
621 }
622 }
623 Ok(())
624}
625
626fn validate_world_for_read(world: &str) -> Result<(), String> {
627 let cfg = SecurityConfig::from_env();
628 if cfg.allow_read_worlds.iter().any(|w| w == world) {
629 return Ok(());
630 }
631 if matches_prefixes(&cfg.allow_read_prefixes, world) {
632 return Ok(());
633 }
634 if cfg.enforce_tenant_namespace {
635 let prefix = tenant_prefix(&cfg.tenant_id);
636 if !world.starts_with(&prefix) {
637 return Err(format!(
638 "read denied: world must be within {prefix} (set DWBASE_ALLOW_READ_WORLDS/DWBASE_ALLOW_READ_PREFIXES to override)"
639 ));
640 }
641 }
642 Ok(())
643}
644
645fn validate_new_atom(new_atom: &NewAtom) -> Result<(), String> {
646 let cfg = SecurityConfig::from_env();
647 validate_world_for_write(&new_atom.world.0)?;
648 if new_atom.payload_json.len() > cfg.payload_max_bytes {
649 return Err(format!(
650 "payload too large: {} bytes > max {} (DWBASE_MAX_PAYLOAD_BYTES)",
651 new_atom.payload_json.len(),
652 cfg.payload_max_bytes
653 ));
654 }
655 if new_atom.importance.get() > cfg.importance_cap {
656 return Err(format!(
657 "importance {} exceeds cap {} (DWBASE_IMPORTANCE_CAP)",
658 new_atom.importance.get(),
659 cfg.importance_cap
660 ));
661 }
662 if !cfg.allowed_kinds.contains(&new_atom.kind) {
663 return Err(format!(
664 "kind {:?} not permitted (DWBASE_ALLOWED_KINDS)",
665 new_atom.kind
666 ));
667 }
668 for label in &new_atom.labels {
669 if cfg.allow_policy_labels && label.starts_with("policy:") {
670 continue;
671 }
672 if !cfg.allowed_labels.is_empty() && cfg.allowed_labels.contains(label) {
673 continue;
674 }
675 if cfg.allowed_labels.is_empty() {
676 return Err("labels not permitted by default (set DWBASE_ALLOWED_LABELS or DWBASE_ALLOW_POLICY_LABELS)".into());
677 }
678 return Err(format!("label not permitted: {label}"));
679 }
680 Ok(())
681}
682
683fn init_engine_v2() -> Result<&'static Mutex<ComponentEngine>, engine::ToolError> {
684 ENGINE.get_or_try_init(|| {
685 let _ = SecurityConfig::from_env();
686 let _ = install_metrics_recorder();
687 let dir = data_dir();
688 let storage = FsStorage::new(dir.clone())
689 .map_err(|e| err_storage(format!("init storage failed: {e}")))?;
690 let vector = NoVector;
691 let stream = LocalStream::new();
692 let _ = STREAM.set(stream.clone());
693 let gatekeeper = LocalGatekeeper::new(Capabilities::default(), TrustStore::default());
694 let embedder = DummyEmbedder::new();
695 let engine = DWBaseEngine::new(storage, vector, stream, gatekeeper, embedder);
696 maybe_start_presence(&engine);
697 maybe_start_swarm(&engine);
698 Ok(Mutex::new(engine))
699 })
700}
701
/// Infallible variant of `init_engine_v2`.
///
/// # Panics
/// Panics with "engine init" when `init_engine_v2` returns an error.
fn init_engine() -> &'static Mutex<ComponentEngine> {
    init_engine_v2().expect("engine init")
}
705
/// This node's identifier: `DWBASE_NODE_ID` when set, otherwise `component-dwbase`.
fn node_id() -> String {
    match std::env::var("DWBASE_NODE_ID") {
        Ok(id) => id,
        Err(_) => "component-dwbase".into(),
    }
}
709
710fn maybe_start_presence(
711 engine: &DWBaseEngine<FsStorage, NoVector, LocalStream, LocalGatekeeper, DummyEmbedder>,
712) {
713 if std::env::var("NATS_URL").is_err()
715 && std::env::var("GREENTIC_NATS_URL").is_err()
716 && std::env::var("DWBASE_PRESENCE_MOCK").is_err()
717 {
718 return;
719 }
720 let table = PEERS.get_or_init(PeerTable::default);
721 let client: Arc<dyn NatsClient> = if std::env::var("DWBASE_PRESENCE_MOCK").is_ok() {
722 Arc::new(MockNats::default())
723 } else {
724 #[cfg(feature = "nats")]
725 {
726 std::env::var("NATS_URL")
727 .or_else(|_| std::env::var("GREENTIC_NATS_URL"))
728 .ok()
729 .and_then(|url| {
730 AsyncNats::connect(&url)
731 .ok()
732 .map(|c| Arc::new(c) as Arc<dyn NatsClient>)
733 })
734 .unwrap_or_else(|| Arc::new(MockNats::default()))
735 }
736 #[cfg(not(feature = "nats"))]
737 {
738 Arc::new(MockNats::default())
739 }
740 };
741 let hello = hello_from_engine(engine);
742 let ttl = std::time::Duration::from_secs(30);
743 start_presence_loop(client, hello, table.clone(), ttl);
744}
745
/// Starts swarm replication when `DWBASE_SWARM_ENABLE` is set or at least one
/// subscription pattern is configured; otherwise does nothing.
///
/// Publishes the bus, the (empty) event-subscription set, optionally the world-event
/// broadcaster, and the replicator into their global cells, then spawns the
/// replication loop.
///
/// # Panics
/// Panics when the swarm transport or the replicator cannot be constructed.
fn maybe_start_swarm(
    engine: &DWBaseEngine<FsStorage, NoVector, LocalStream, LocalGatekeeper, DummyEmbedder>,
) {
    let patterns = subscription_patterns();
    let enabled = std::env::var("DWBASE_SWARM_ENABLE").is_ok() || !patterns.is_empty();
    if !enabled {
        return;
    }

    let self_id = dwbase_swarm::PeerId::new(node_id());
    // Bus selection: explicit mock, else NATS when the feature is enabled and the
    // connection succeeds, else fall back to the mock bus.
    let bus: Arc<dyn dwbase_swarm_nats::swarm::SwarmBus> =
        if std::env::var("DWBASE_SWARM_MOCK").is_ok() {
            Arc::new(MockBus::default()) as Arc<dyn dwbase_swarm_nats::swarm::SwarmBus>
        } else {
            #[cfg(feature = "nats")]
            {
                std::env::var("NATS_URL")
                    .or_else(|_| std::env::var("GREENTIC_NATS_URL"))
                    .ok()
                    .and_then(|url| {
                        NatsBus::connect(&url)
                            .ok()
                            .map(|b| Arc::new(b) as Arc<dyn dwbase_swarm_nats::swarm::SwarmBus>)
                    })
                    .unwrap_or_else(|| {
                        Arc::new(MockBus::default()) as Arc<dyn dwbase_swarm_nats::swarm::SwarmBus>
                    })
            }
            #[cfg(not(feature = "nats"))]
            {
                Arc::new(MockBus::default()) as Arc<dyn dwbase_swarm_nats::swarm::SwarmBus>
            }
        };

    // `set` results are ignored: a cell that is already populated keeps its value.
    let _ = BUS.set(bus.clone());
    let _ = EVENT_SUBS.set(ParkingMutex::new(HashSet::new()));
    if std::env::var("DWBASE_OBSERVE_NATS_BROADCAST").is_ok() {
        let _ = BROADCASTER.set(WorldEventBroadcaster::new(bus.clone(), node_id(), 200.0));
    }

    let transport = NatsSwarmTransport::new(bus, self_id, 200.0).expect("init swarm transport");
    let policy: Arc<dyn WorldAccessPolicy> = Arc::new(ComponentWorldAccessPolicy::new(data_dir()));
    // Replicator state is persisted next to the data, in `swarm.json`.
    let swarm_state = data_dir().join("swarm.json");
    let replicator = Arc::new(
        Replicator::with_policy_and_state(
            transport,
            patterns,
            std::time::Duration::from_secs(30),
            policy,
            Some(swarm_state),
            512,
            std::time::Duration::from_secs(300),
        )
        .expect("init replicator"),
    );
    let _ = SWARM.set(replicator.clone());
    start_replication_loop(engine, replicator);
}
805
/// Replication access policy for this component, combining the read-side world
/// validation with replication rules loaded from persisted policy atoms.
#[derive(Clone)]
struct ComponentWorldAccessPolicy {
    /// Directory containing `atoms.json`, from which policy labels are read.
    data_dir: PathBuf,
    /// Cached policy rules, shared between clones of this policy.
    cache: Arc<Mutex<PolicyCache>>,
}
811
/// Replication rules extracted from policy atoms, plus the time they were loaded.
struct PolicyCache {
    /// When the cache was last refreshed (or backdated to force a first refresh).
    last_loaded: std::time::Instant,
    /// World prefixes that must not be replicated.
    deny_prefixes: Vec<String>,
    /// World prefixes that may be replicated; empty means "no allow restriction".
    allow_prefixes: Vec<String>,
    /// Value of the last `policy:retention_min_days=` label seen, if any.
    min_retention_days: Option<u64>,
}

impl Default for PolicyCache {
    /// Creates an empty cache whose `last_loaded` is backdated so that the first
    /// refresh is not skipped by the staleness check.
    ///
    /// `Instant - Duration` panics when the result is not representable, which
    /// happens on platforms whose monotonic clock starts near zero (e.g. WASI, or
    /// shortly after boot on some systems). The backdating therefore uses
    /// `checked_sub`, trying one hour, then one second, and finally falling back to
    /// "now" rather than panicking.
    fn default() -> Self {
        let now = std::time::Instant::now();
        let last_loaded = now
            .checked_sub(std::time::Duration::from_secs(3600))
            .or_else(|| now.checked_sub(std::time::Duration::from_secs(1)))
            .unwrap_or(now);
        Self {
            last_loaded,
            deny_prefixes: Vec::new(),
            allow_prefixes: Vec::new(),
            min_retention_days: None,
        }
    }
}
829
830impl ComponentWorldAccessPolicy {
831 fn new(data_dir: PathBuf) -> Self {
832 Self {
833 data_dir,
834 cache: Arc::new(Mutex::new(PolicyCache::default())),
835 }
836 }
837
838 fn refresh_cache(&self) {
839 let mut cache = self.cache.lock().unwrap();
840 if cache.last_loaded.elapsed() < std::time::Duration::from_secs(1) {
841 return;
842 }
843 cache.last_loaded = std::time::Instant::now();
844
845 let policy_world = format!("{}policy", tenant_prefix(&tenant_id()));
846 let path = self.data_dir.join("atoms.json");
847 let Ok(bytes) = fs::read(&path) else {
848 return;
849 };
850 let Ok(persisted) = serde_json::from_slice::<Persisted>(&bytes) else {
851 return;
852 };
853 let Some(atoms) = persisted.atoms.get(&policy_world) else {
854 cache.deny_prefixes.clear();
855 cache.allow_prefixes.clear();
856 cache.min_retention_days = None;
857 return;
858 };
859 let mut deny = Vec::new();
860 let mut allow = Vec::new();
861 let mut min_retention_days = None;
862 for atom in atoms {
863 for label in atom.labels() {
864 if let Some(pat) = label.strip_prefix("policy:replication_deny=") {
865 deny.push(pat.to_string());
866 }
867 if let Some(pat) = label.strip_prefix("policy:replication_allow=") {
868 allow.push(pat.to_string());
869 }
870 if let Some(v) = label.strip_prefix("policy:retention_min_days=") {
871 if let Ok(n) = v.parse::<u64>() {
872 min_retention_days = Some(n);
873 }
874 }
875 }
876 }
877 cache.deny_prefixes = deny;
878 cache.allow_prefixes = allow;
879 cache.min_retention_days = min_retention_days;
880 }
881
882 fn policy_allows_replication(&self, world: &str) -> bool {
883 self.refresh_cache();
884 let cache = self.cache.lock().unwrap();
885 let denied = cache.deny_prefixes.iter().any(|p| world.starts_with(p));
886 if denied {
887 return false;
888 }
889 if cache.allow_prefixes.is_empty() {
890 return true;
891 }
892 cache.allow_prefixes.iter().any(|p| world.starts_with(p))
893 }
894}
895
896impl WorldAccessPolicy for ComponentWorldAccessPolicy {
897 fn can_send_world(&self, world: &str, _to: &dwbase_swarm::PeerId) -> bool {
898 validate_world_for_read(world).is_ok() && self.policy_allows_replication(world)
899 }
900
901 fn can_receive_world(&self, world: &str, _from: &dwbase_swarm::PeerId) -> bool {
902 validate_world_for_read(world).is_ok() && self.policy_allows_replication(world)
903 }
904}
905
906fn start_replication_loop(
907 engine: &DWBaseEngine<FsStorage, NoVector, LocalStream, LocalGatekeeper, DummyEmbedder>,
908 replicator: Arc<Replicator>,
909) {
910 let _engine_ptr = engine as *const _;
912 std::thread::spawn(move || loop {
913 let _ = replicator.announce();
914 let _ = replicator.poll_inbox();
915 while let Some((_from, batch)) = replicator.poll_atom_batch() {
916 if let Some(engine_mutex) = ENGINE.get() {
917 let guard = engine_mutex.lock().unwrap();
918 if futures::executor::block_on(guard.ingest_remote_atoms(batch.atoms)).is_ok() {
919 LAST_REMOTE_INGEST_MS.store(now_ms(), Ordering::Relaxed);
920 }
921 }
922 }
923 std::thread::sleep(std::time::Duration::from_millis(100));
924 });
925}
926
927fn hello_from_engine(
928 engine: &DWBaseEngine<FsStorage, NoVector, LocalStream, LocalGatekeeper, DummyEmbedder>,
929) -> NodeHello {
930 NodeHello {
931 node_id: node_id(),
932 endpoint: "component-local".into(),
933 worlds_served: engine
934 .storage
935 .worlds()
936 .unwrap_or_default()
937 .into_iter()
938 .map(|w| w.0)
939 .collect(),
940 trust_score: 1.0,
941 started_at: now_rfc3339(),
942 version: env!("CARGO_PKG_VERSION").into(),
943 }
944}
945
946pub fn peers() -> Vec<NodeHello> {
947 PEERS.get().map(|t| t.peers()).unwrap_or_default()
948}
949
/// In-process stream engine: keeps one bounded queue of atoms per subscription.
/// Clones share the same underlying state.
#[derive(Clone)]
struct LocalStream {
    /// Shared subscription table and handle counter.
    inner: Arc<ParkingMutex<LocalStreamInner>>,
}
954
/// What to do when an atom arrives for a subscription whose queue is full.
#[derive(Clone, Copy, Debug)]
enum ObserveDropPolicy {
    /// Discard the oldest queued atom to make room for the new one.
    DropOldest,
    /// Discard the incoming atom and keep the queue as is.
    DropNewest,
}
960
/// State behind the `LocalStream` mutex.
struct LocalStreamInner {
    /// Next subscription handle to hand out; starts at 1.
    next_handle: AtomicU64,
    /// Active subscriptions keyed by handle.
    subs: HashMap<u64, LocalSubscription>,
}
965
/// One subscription: its target world, filter, and bounded queue of pending atoms.
struct LocalSubscription {
    /// World this subscription listens to.
    world: WorldKey,
    /// Additional filter an atom must satisfy to be queued.
    filter: AtomFilter,
    /// Atoms waiting to be polled, oldest first.
    queue: VecDeque<Atom>,
    /// Maximum number of queued atoms before the drop policy applies.
    capacity: usize,
    /// Behaviour when the queue is full.
    drop_policy: ObserveDropPolicy,
    /// Number of atoms dropped so far because the queue was full.
    dropped_total: u64,
    /// Wall-clock milliseconds of the last enqueued atom (0 = none yet).
    last_event_ms: u64,
}
975
976impl LocalStream {
977 fn new() -> Self {
978 Self {
979 inner: Arc::new(ParkingMutex::new(LocalStreamInner {
980 next_handle: AtomicU64::new(1),
981 subs: HashMap::new(),
982 })),
983 }
984 }
985
986 fn matches_filter(atom: &Atom, filter: &AtomFilter) -> bool {
987 if let Some(world) = &filter.world {
988 if atom.world() != world {
989 return false;
990 }
991 }
992 if !filter.kinds.is_empty() && !filter.kinds.contains(atom.kind()) {
993 return false;
994 }
995 if !filter.labels.is_empty() && !filter.labels.iter().all(|l| atom.labels().contains(l)) {
996 return false;
997 }
998 if !filter.flags.is_empty() && !filter.flags.iter().all(|f| atom.flags().contains(f)) {
999 return false;
1000 }
1001 if let Some(since) = &filter.since {
1002 if atom.timestamp().0 < since.0 {
1003 return false;
1004 }
1005 }
1006 if let Some(until) = &filter.until {
1007 if atom.timestamp().0 > until.0 {
1008 return false;
1009 }
1010 }
1011 true
1012 }
1013
1014 fn push_atom(&self, atom: Atom) {
1015 let mut guard = self.inner.lock();
1016 for sub in guard.subs.values_mut() {
1017 if &sub.world != atom.world() {
1018 continue;
1019 }
1020 if !Self::matches_filter(&atom, &sub.filter) {
1021 continue;
1022 }
1023 Self::enqueue(sub, atom.clone());
1024 }
1025 }
1026
1027 fn enqueue(sub: &mut LocalSubscription, atom: Atom) {
1028 if sub.queue.len() >= sub.capacity {
1029 sub.dropped_total = sub.dropped_total.saturating_add(1);
1030 dwbase_metrics::record_observe_dropped(1);
1031 match sub.drop_policy {
1032 ObserveDropPolicy::DropOldest => {
1033 let _ = sub.queue.pop_front();
1034 }
1035 ObserveDropPolicy::DropNewest => {
1036 dwbase_metrics::record_observe_queue_depth(sub.queue.len() as u64);
1037 return;
1038 }
1039 }
1040 }
1041 sub.queue.push_back(atom);
1042 sub.last_event_ms = now_ms();
1043 dwbase_metrics::record_observe_queue_depth(sub.queue.len() as u64);
1044 }
1045
1046 fn poll_n(&self, handle: u64, max: usize) -> Vec<Atom> {
1047 let mut out = Vec::new();
1048 let mut guard = self.inner.lock();
1049 let Some(sub) = guard.subs.get_mut(&handle) else {
1050 return out;
1051 };
1052 for _ in 0..max {
1053 if let Some(a) = sub.queue.pop_front() {
1054 out.push(a);
1055 } else {
1056 break;
1057 }
1058 }
1059 out
1060 }
1061
1062 fn has_handle(&self, handle: u64) -> bool {
1063 self.inner.lock().subs.contains_key(&handle)
1064 }
1065
1066 fn world_for_handle(&self, handle: u64) -> Option<WorldKey> {
1067 self.inner.lock().subs.get(&handle).map(|s| s.world.clone())
1068 }
1069
1070 fn stats_for_handle(&self, handle: u64) -> Option<(usize, u64, u64)> {
1071 let guard = self.inner.lock();
1072 let sub = guard.subs.get(&handle)?;
1073 Some((sub.queue.len(), sub.dropped_total, sub.last_event_ms))
1074 }
1075
1076 fn push_to_handle(&self, handle: u64, atom: Atom) -> bool {
1077 let mut guard = self.inner.lock();
1078 let Some(sub) = guard.subs.get_mut(&handle) else {
1079 return false;
1080 };
1081 if &sub.world != atom.world() {
1082 return false;
1083 }
1084 if !Self::matches_filter(&atom, &sub.filter) {
1085 return false;
1086 }
1087 Self::enqueue(sub, atom);
1088 true
1089 }
1090}
1091
1092impl dwbase_engine::StreamEngine for LocalStream {
1093 type Handle = u64;
1094
1095 fn publish(&self, atom: &Atom) -> dwbase_engine::Result<()> {
1096 self.push_atom(atom.clone());
1097 Ok(())
1098 }
1099
1100 fn subscribe(
1101 &self,
1102 world: &WorldKey,
1103 filter: AtomFilter,
1104 ) -> dwbase_engine::Result<Self::Handle> {
1105 let handle = self
1106 .inner
1107 .lock()
1108 .next_handle
1109 .fetch_add(1, Ordering::Relaxed);
1110 let mut guard = self.inner.lock();
1111 guard.subs.insert(
1112 handle,
1113 LocalSubscription {
1114 world: world.clone(),
1115 filter,
1116 queue: VecDeque::new(),
1117 capacity: observe_queue_capacity(),
1118 drop_policy: observe_drop_policy(),
1119 dropped_total: 0,
1120 last_event_ms: 0,
1121 },
1122 );
1123 Ok(handle)
1124 }
1125
1126 fn poll(&self, handle: &Self::Handle) -> dwbase_engine::Result<Option<Atom>> {
1127 Ok(self.poll_n(*handle, 1).pop())
1128 }
1129
1130 fn stop(&self, handle: Self::Handle) -> dwbase_engine::Result<()> {
1131 self.inner.lock().subs.remove(&handle);
1132 Ok(())
1133 }
1134}
1135
1136#[derive(Default)]
1137struct NoVector;
1138impl dwbase_engine::VectorEngine for NoVector {
1139 fn upsert(
1140 &self,
1141 _world: &WorldKey,
1142 _atom_id: &AtomId,
1143 _vector: &[f32],
1144 ) -> dwbase_engine::Result<()> {
1145 Ok(())
1146 }
1147
1148 fn search(
1149 &self,
1150 _world: &WorldKey,
1151 _query: &[f32],
1152 _k: usize,
1153 _filter: &AtomFilter,
1154 ) -> dwbase_engine::Result<Vec<AtomId>> {
1155 Ok(Vec::new())
1156 }
1157
1158 fn rebuild(&self, _world: &WorldKey) -> dwbase_engine::Result<()> {
1159 Ok(())
1160 }
1161}
1162
1163#[derive(Debug, Serialize, Deserialize)]
1164struct Persisted {
1165 atoms: HashMap<String, Vec<Atom>>,
1166}
1167
1168#[derive(Debug)]
1169struct FsStorage {
1170 root: PathBuf,
1171 data: Mutex<HashMap<WorldKey, Vec<Atom>>>,
1172}
1173
1174impl FsStorage {
1175 fn new(root: PathBuf) -> dwbase_engine::Result<Self> {
1176 if !root.exists() {
1177 fs::create_dir_all(&root)
1178 .map_err(|e| dwbase_engine::DwbaseError::Storage(e.to_string()))?;
1179 }
1180 let path = root.join("atoms.json");
1181 let data = if path.exists() {
1182 let bytes =
1183 fs::read(&path).map_err(|e| dwbase_engine::DwbaseError::Storage(e.to_string()))?;
1184 if bytes.is_empty() {
1185 HashMap::new()
1186 } else {
1187 let persisted: Persisted = serde_json::from_slice(&bytes)
1188 .map_err(|e| dwbase_engine::DwbaseError::Storage(e.to_string()))?;
1189 persisted
1190 .atoms
1191 .into_iter()
1192 .map(|(k, v)| (WorldKey::new(k), v))
1193 .collect()
1194 }
1195 } else {
1196 HashMap::new()
1197 };
1198 Ok(Self {
1199 root,
1200 data: Mutex::new(data),
1201 })
1202 }
1203
1204 fn persist(&self, data: &HashMap<WorldKey, Vec<Atom>>) -> dwbase_engine::Result<()> {
1205 let path = self.root.join("atoms.json");
1206 let persistable = Persisted {
1207 atoms: data.iter().map(|(k, v)| (k.0.clone(), v.clone())).collect(),
1208 };
1209 let bytes = serde_json::to_vec_pretty(&persistable)
1210 .map_err(|e| dwbase_engine::DwbaseError::Storage(e.to_string()))?;
1211 fs::write(path, bytes).map_err(|e| dwbase_engine::DwbaseError::Storage(e.to_string()))?;
1212 Ok(())
1213 }
1214
1215 fn matches_filter(atom: &Atom, filter: &AtomFilter) -> bool {
1216 if let Some(world) = &filter.world {
1217 if atom.world() != world {
1218 return false;
1219 }
1220 }
1221 if !filter.kinds.is_empty() && !filter.kinds.contains(atom.kind()) {
1222 return false;
1223 }
1224 if !filter.labels.is_empty() && !filter.labels.iter().all(|l| atom.labels().contains(l)) {
1225 return false;
1226 }
1227 if !filter.flags.is_empty() && !filter.flags.iter().all(|f| atom.flags().contains(f)) {
1228 return false;
1229 }
1230 if let Some(since) = &filter.since {
1231 if atom.timestamp().0 < since.0 {
1232 return false;
1233 }
1234 }
1235 if let Some(until) = &filter.until {
1236 if atom.timestamp().0 > until.0 {
1237 return false;
1238 }
1239 }
1240 true
1241 }
1242}
1243
1244impl StorageEngine for FsStorage {
1245 fn append(&self, atom: Atom) -> dwbase_engine::Result<()> {
1246 let mut guard = self.data.lock().unwrap();
1247 guard.entry(atom.world().clone()).or_default().push(atom);
1248 self.persist(&guard)
1249 }
1250
1251 fn get_by_ids(&self, ids: &[AtomId]) -> dwbase_engine::Result<Vec<Atom>> {
1252 let guard = self.data.lock().unwrap();
1253 let mut out = Vec::new();
1254 for atoms in guard.values() {
1255 for atom in atoms {
1256 if ids.contains(atom.id()) {
1257 out.push(atom.clone());
1258 }
1259 }
1260 }
1261 Ok(out)
1262 }
1263
1264 fn scan(&self, world: &WorldKey, filter: &AtomFilter) -> dwbase_engine::Result<Vec<Atom>> {
1265 let guard = self.data.lock().unwrap();
1266 let list = guard.get(world).cloned().unwrap_or_default();
1267 let mut out = Vec::new();
1268 for atom in list {
1269 if Self::matches_filter(&atom, filter) {
1270 out.push(atom);
1271 if let Some(limit) = filter.limit {
1272 if out.len() >= limit {
1273 break;
1274 }
1275 }
1276 }
1277 }
1278 Ok(out)
1279 }
1280
1281 fn stats(&self, world: &WorldKey) -> dwbase_engine::Result<dwbase_engine::StorageStats> {
1282 let guard = self.data.lock().unwrap();
1283 let atoms = guard.get(world);
1284 let atom_count = atoms.map(|v| v.len()).unwrap_or(0);
1285 let vector_count = atoms
1286 .map(|v| v.iter().filter(|a| a.vector().is_some()).count())
1287 .unwrap_or(0);
1288 Ok(dwbase_engine::StorageStats {
1289 atom_count,
1290 vector_count,
1291 })
1292 }
1293
1294 fn list_ids_in_window(
1295 &self,
1296 world: &WorldKey,
1297 window: &dwbase_engine::TimeWindow,
1298 ) -> dwbase_engine::Result<Vec<AtomId>> {
1299 let guard = self.data.lock().unwrap();
1300 let mut ids = Vec::new();
1301 if let Some(atoms) = guard.get(world) {
1302 for atom in atoms {
1303 if let Ok(dt) = OffsetDateTime::parse(
1304 atom.timestamp().0.as_str(),
1305 &time::format_description::well_known::Rfc3339,
1306 ) {
1307 let ms = (dt.unix_timestamp_nanos() / 1_000_000) as i64;
1308 if ms >= window.start_ms && ms <= window.end_ms {
1309 ids.push(atom.id().clone());
1310 }
1311 }
1312 }
1313 }
1314 Ok(ids)
1315 }
1316
1317 fn delete_atoms(&self, world: &WorldKey, ids: &[AtomId]) -> dwbase_engine::Result<usize> {
1318 let mut guard = self.data.lock().unwrap();
1319 let mut removed = 0usize;
1320 if let Some(vec) = guard.get_mut(world) {
1321 let before = vec.len();
1322 vec.retain(|a| !ids.contains(a.id()));
1323 removed = before - vec.len();
1324 }
1325 self.persist(&guard)?;
1326 Ok(removed)
1327 }
1328
1329 fn worlds(&self) -> dwbase_engine::Result<Vec<WorldKey>> {
1330 let guard = self.data.lock().unwrap();
1331 Ok(guard.keys().cloned().collect())
1332 }
1333}
1334
1335fn to_atom(kind: WitAtomKind) -> AtomKind {
1336 match kind {
1337 WitAtomKind::Observation => AtomKind::Observation,
1338 WitAtomKind::Reflection => AtomKind::Reflection,
1339 WitAtomKind::Plan => AtomKind::Plan,
1340 WitAtomKind::Action => AtomKind::Action,
1341 WitAtomKind::Message => AtomKind::Message,
1342 }
1343}
1344
1345fn to_wit_atom(atom: &Atom) -> WitAtom {
1346 WitAtom {
1347 id: atom.id().0.clone(),
1348 world_key: atom.world().0.clone(),
1349 worker: atom.worker().0.clone(),
1350 kind: match atom.kind() {
1351 AtomKind::Observation => WitAtomKind::Observation,
1352 AtomKind::Reflection => WitAtomKind::Reflection,
1353 AtomKind::Plan => WitAtomKind::Plan,
1354 AtomKind::Action => WitAtomKind::Action,
1355 AtomKind::Message => WitAtomKind::Message,
1356 },
1357 timestamp: atom.timestamp().0.clone(),
1358 importance: atom.importance().get(),
1359 payload_json: atom.payload_json().to_string(),
1360 vector: atom.vector().map(|v| v.to_vec()),
1361 flags_list: atom.flags().to_vec(),
1362 labels: atom.labels().to_vec(),
1363 links: atom.links().iter().map(|l| l.target.0.clone()).collect(),
1364 }
1365}
1366
1367fn to_filter(filter: WitAtomFilter) -> AtomFilter {
1368 AtomFilter {
1369 world: filter.world_key.map(WorldKey::new),
1370 kinds: filter.kinds.into_iter().map(to_atom).collect(),
1371 labels: filter.labels,
1372 flags: filter.flag_filter,
1373 since: filter.since.map(Timestamp::new),
1374 until: filter.until.map(Timestamp::new),
1375 limit: filter.limit.map(|v| v as usize),
1376 }
1377}
1378
1379fn warnings_from_health(snapshot: &engine::HealthSnapshot) -> Vec<String> {
1380 if snapshot.status == "ready" {
1381 Vec::new()
1382 } else if snapshot.message.is_empty() {
1383 vec!["degraded".into()]
1384 } else {
1385 vec![format!("degraded: {}", snapshot.message)]
1386 }
1387}
1388
/// Read position for durable observe, serialized as JSON into the per-world
/// `cursor.json` file.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct ObserveCursor {
    /// Id of the last atom returned by a poll.
    last_atom_id: String,
    /// Timestamp string of that same atom.
    last_timestamp: String,
    /// `now_ms()` value captured when the cursor was written.
    updated_at_ms: u64,
}
1395
1396fn world_token(world_key: &str) -> String {
1397 hex::encode(world_key.as_bytes())
1398}
1399
1400fn observe_cursor_path(world: &WorldKey) -> PathBuf {
1401 data_dir()
1402 .join("_observe")
1403 .join("cursors")
1404 .join(world_token(&world.0))
1405 .join("cursor.json")
1406}
1407
1408fn read_observe_cursor(world: &WorldKey) -> Option<ObserveCursor> {
1409 let path = observe_cursor_path(world);
1410 let bytes = fs::read(path).ok()?;
1411 serde_json::from_slice(&bytes).ok()
1412}
1413
1414fn write_observe_cursor(world: &WorldKey, cursor: &ObserveCursor) {
1415 let path = observe_cursor_path(world);
1416 if let Some(parent) = path.parent() {
1417 let _ = fs::create_dir_all(parent);
1418 }
1419 if let Ok(bytes) = serde_json::to_vec(cursor) {
1420 let _ = fs::write(path, bytes);
1421 }
1422}
1423
1424fn durable_catchup_atoms(
1425 engine: &ComponentEngine,
1426 world: &WorldKey,
1427 filter: &AtomFilter,
1428 cursor: Option<&ObserveCursor>,
1429) -> Vec<Atom> {
1430 let mut out = Vec::new();
1431 let Ok(all) = engine.storage.scan(
1432 world,
1433 &AtomFilter {
1434 world: Some(world.clone()),
1435 kinds: Vec::new(),
1436 labels: Vec::new(),
1437 flags: Vec::new(),
1438 since: None,
1439 until: None,
1440 limit: None,
1441 },
1442 ) else {
1443 return out;
1444 };
1445
1446 let start_idx = cursor
1447 .and_then(|c| all.iter().position(|a| a.id().0 == c.last_atom_id))
1448 .map(|i| i + 1)
1449 .unwrap_or(0);
1450
1451 for atom in all.into_iter().skip(start_idx) {
1452 if !LocalStream::matches_filter(&atom, filter) {
1453 continue;
1454 }
1455 out.push(atom);
1456 if out.len() >= observe_durable_catchup_limit() {
1457 break;
1458 }
1459 }
1460 out
1461}
1462
/// Unit type that carries the `engine::Guest` implementation for this component.
pub struct Component;
1464impl engine::Guest for Component {
1465 fn remember(atom: WitNewAtom) -> String {
1466 let start = Instant::now();
1467 let engine = init_engine();
1468 let guard = engine.lock().unwrap();
1469 let new_atom = NewAtom {
1470 world: WorldKey::new(atom.world_key),
1471 worker: WorkerKey::new(effective_worker(&atom.worker)),
1472 kind: to_atom(atom.kind),
1473 timestamp: Timestamp::new(if atom.timestamp.is_empty() {
1474 OffsetDateTime::now_utc()
1475 .format(&time::format_description::well_known::Rfc3339)
1476 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".into())
1477 } else {
1478 atom.timestamp
1479 }),
1480 importance: Importance::clamped(atom.importance),
1481 payload_json: atom.payload_json,
1482 vector: atom.vector,
1483 flags: atom.flags_list,
1484 labels: atom.labels,
1485 links: atom
1486 .links
1487 .into_iter()
1488 .map(|id| Link {
1489 target: AtomId::new(id),
1490 kind: LinkKind::References,
1491 })
1492 .collect(),
1493 };
1494 if let Err(_e) = validate_new_atom(&new_atom) {
1495 return String::new();
1496 }
1497 let id = match futures::executor::block_on(guard.remember(new_atom)) {
1498 Ok(v) => v.0,
1499 Err(_) => return String::new(),
1500 };
1501 dwbase_metrics::record_remember_latency(start.elapsed());
1502 dwbase_metrics::record_index_freshness(Duration::from_millis(0));
1503 let repl = SWARM.get().cloned();
1504 let broadcaster = BROADCASTER.get();
1505 let atom = guard
1506 .get_atoms(&[AtomId::new(id.clone())])
1507 .ok()
1508 .and_then(|mut v| v.pop());
1509 drop(guard);
1510 if let Some(atom) = atom {
1511 if let Some(repl) = repl {
1512 let _ = repl.replicate_new_atom(atom.clone());
1513 }
1514 if let Some(b) = broadcaster {
1515 let _ = b.publish_atom(atom);
1516 }
1517 }
1518 id
1519 }
1520
1521 fn ask(question: WitQuestion) -> Answer {
1522 let start = Instant::now();
1523 if validate_world_for_read(&question.world_key).is_err() {
1524 return Answer {
1525 world_key: question.world_key,
1526 text: "capability denied".into(),
1527 supporting_atoms: Vec::new(),
1528 };
1529 }
1530 let engine = init_engine();
1531 let guard = engine.lock().unwrap();
1532 let q = Question {
1533 world: WorldKey::new(question.world_key),
1534 text: question.text,
1535 filter: question.filter.map(to_filter),
1536 };
1537 let ans = futures::executor::block_on(guard.ask(q)).expect("ask");
1538 dwbase_metrics::record_ask_latency(start.elapsed());
1539 Answer {
1540 world_key: ans.world.0,
1541 text: ans.text,
1542 supporting_atoms: ans.supporting_atoms.iter().map(to_wit_atom).collect(),
1543 }
1544 }
1545
1546 fn observe(atom: WitAtom) {
1547 let start = Instant::now();
1548 let engine = init_engine();
1549 let guard = engine.lock().unwrap();
1550 let new_atom = NewAtom {
1551 world: WorldKey::new(atom.world_key.clone()),
1552 worker: WorkerKey::new(effective_worker(&atom.worker)),
1553 kind: to_atom(atom.kind),
1554 timestamp: Timestamp::new(atom.timestamp),
1555 importance: Importance::clamped(atom.importance),
1556 payload_json: atom.payload_json,
1557 vector: atom.vector,
1558 flags: atom.flags_list,
1559 labels: atom.labels,
1560 links: atom
1561 .links
1562 .into_iter()
1563 .map(|id| Link {
1564 target: AtomId::new(id),
1565 kind: LinkKind::References,
1566 })
1567 .collect(),
1568 };
1569 if validate_new_atom(&new_atom).is_err() {
1570 return;
1571 }
1572 let id = futures::executor::block_on(guard.remember(new_atom))
1573 .ok()
1574 .map(|v| v.0);
1575 dwbase_metrics::record_remember_latency(start.elapsed());
1576 dwbase_metrics::record_index_freshness(Duration::from_millis(0));
1577 let repl = SWARM.get().cloned();
1578 let broadcaster = BROADCASTER.get();
1579 let atom = id
1580 .clone()
1581 .and_then(|id| guard.get_atoms(&[AtomId::new(id)]).ok())
1582 .and_then(|mut v| v.pop());
1583 drop(guard);
1584 if let Some(atom) = atom {
1585 if let Some(repl) = repl {
1586 let _ = repl.replicate_new_atom(atom.clone());
1587 }
1588 if let Some(b) = broadcaster {
1589 let _ = b.publish_atom(atom);
1590 }
1591 }
1592 }
1593
1594 fn replay(target_world: String, filter: WitAtomFilter) -> Vec<WitAtom> {
1595 if validate_world_for_read(&target_world).is_err() {
1596 return Vec::new();
1597 }
1598 let engine = init_engine();
1599 let guard = engine.lock().unwrap();
1600 let filter = to_filter(filter);
1601 futures::executor::block_on(guard.replay(WorldKey::new(target_world), filter))
1602 .expect("replay")
1603 .iter()
1604 .map(to_wit_atom)
1605 .collect()
1606 }
1607
1608 fn observe_start(filter: WitAtomFilter) -> u64 {
1609 let engine = init_engine();
1610 let filter = to_filter(filter);
1611 let Some(world) = filter.world.clone() else {
1612 return 0;
1613 };
1614 if validate_world_for_read(&world.0).is_err() {
1615 return 0;
1616 }
1617 let stream = STREAM.get_or_init(LocalStream::new).clone();
1618 let handle = stream.subscribe(&world, filter).unwrap_or(0);
1619 if handle == 0 {
1620 return 0;
1621 }
1622
1623 if observe_durable_enabled() {
1624 let guard = engine.lock().unwrap();
1625 let cursor = read_observe_cursor(&world);
1626 let catchup = durable_catchup_atoms(
1627 &guard,
1628 &world,
1629 &AtomFilter {
1630 world: Some(world.clone()),
1631 kinds: Vec::new(),
1632 labels: Vec::new(),
1633 flags: Vec::new(),
1634 since: None,
1635 until: None,
1636 limit: None,
1637 },
1638 cursor.as_ref(),
1639 );
1640 drop(guard);
1641 for atom in catchup {
1642 let _ = stream.push_to_handle(handle, atom);
1643 }
1644 }
1645
1646 if std::env::var("DWBASE_OBSERVE_NATS_SUBSCRIBE").is_ok() {
1648 if let Some(bus) = BUS.get() {
1649 let subject = world_events_subject(&world.0);
1650 let subs = EVENT_SUBS.get_or_init(|| ParkingMutex::new(HashSet::new()));
1651 let mut guard = subs.lock();
1652 if guard.insert(subject.clone()) {
1653 let stream = stream.clone();
1654 let self_node = node_id();
1655 let _ = bus.subscribe(
1656 &subject,
1657 Box::new(move |_sub, bytes, _reply_to| {
1658 let Ok(batch) = decode_event_batch(&bytes) else {
1659 return;
1660 };
1661 for ev in batch.events {
1662 if ev.from_node == self_node {
1663 continue;
1664 }
1665 stream.push_atom(ev.atom);
1666 }
1667 }),
1668 );
1669 }
1670 }
1671 }
1672
1673 handle
1674 }
1675
1676 fn observe_poll(handle: u64, max: u32) -> Vec<WitAtom> {
1677 if handle == 0 {
1678 return Vec::new();
1679 }
1680 let _ = init_engine();
1681 let stream = STREAM.get_or_init(LocalStream::new);
1682 let atoms = stream.poll_n(handle, max as usize);
1683 if observe_durable_enabled() {
1684 if let Some(world) = stream.world_for_handle(handle) {
1685 if let Some(last) = atoms.last() {
1686 write_observe_cursor(
1687 &world,
1688 &ObserveCursor {
1689 last_atom_id: last.id().0.clone(),
1690 last_timestamp: last.timestamp().0.clone(),
1691 updated_at_ms: now_ms(),
1692 },
1693 );
1694 }
1695 }
1696 }
1697 atoms.iter().map(to_wit_atom).collect()
1698 }
1699
1700 fn observe_stop(handle: u64) {
1701 if handle == 0 {
1702 return;
1703 }
1704 let _ = init_engine();
1705 let stream = STREAM.get_or_init(LocalStream::new);
1706 let _ = stream.stop(handle);
1707 }
1708
1709 fn observe_stats(handle: u64) -> engine::ObserveStatsSnapshot {
1710 if handle == 0 {
1711 return engine::ObserveStatsSnapshot {
1712 handle,
1713 queued_count: 0,
1714 dropped_count: 0,
1715 last_event_ms: 0,
1716 warnings: vec!["invalid_handle".into()],
1717 };
1718 }
1719 let stream = STREAM.get_or_init(LocalStream::new);
1720 let Some((queued, dropped, last_event_ms)) = stream.stats_for_handle(handle) else {
1721 return engine::ObserveStatsSnapshot {
1722 handle,
1723 queued_count: 0,
1724 dropped_count: 0,
1725 last_event_ms: 0,
1726 warnings: vec!["invalid_handle".into()],
1727 };
1728 };
1729 let mut warnings = Vec::new();
1730 if dropped > 0 {
1731 warnings.push("events_dropped".into());
1732 }
1733 engine::ObserveStatsSnapshot {
1734 handle,
1735 queued_count: queued as u64,
1736 dropped_count: dropped,
1737 last_event_ms,
1738 warnings,
1739 }
1740 }
1741
1742 fn health() -> engine::HealthSnapshot {
1743 let engine = init_engine();
1744 let guard = engine.lock().unwrap();
1745 compute_health(&guard)
1746 }
1747
1748 fn remember_v2(atom: WitNewAtom) -> Result<String, engine::ToolError> {
1749 let start = Instant::now();
1750 let engine = init_engine_v2()?;
1751 let guard = engine.lock().unwrap();
1752
1753 let new_atom = NewAtom {
1754 world: WorldKey::new(atom.world_key),
1755 worker: WorkerKey::new(effective_worker(&atom.worker)),
1756 kind: to_atom(atom.kind),
1757 timestamp: Timestamp::new(if atom.timestamp.is_empty() {
1758 OffsetDateTime::now_utc()
1759 .format(&time::format_description::well_known::Rfc3339)
1760 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".into())
1761 } else {
1762 atom.timestamp
1763 }),
1764 importance: Importance::clamped(atom.importance),
1765 payload_json: atom.payload_json,
1766 vector: atom.vector,
1767 flags: atom.flags_list,
1768 labels: atom.labels,
1769 links: atom
1770 .links
1771 .into_iter()
1772 .map(|id| Link {
1773 target: AtomId::new(id),
1774 kind: LinkKind::References,
1775 })
1776 .collect(),
1777 };
1778
1779 if let Err(msg) = validate_new_atom(&new_atom) {
1780 return Err(map_validation_error(msg));
1781 }
1782
1783 let id = futures::executor::block_on(guard.remember(new_atom))
1784 .map_err(|e| err_storage(format!("remember failed: {e}")))?
1785 .0;
1786
1787 dwbase_metrics::record_remember_latency(start.elapsed());
1788 dwbase_metrics::record_index_freshness(Duration::from_millis(0));
1789
1790 let repl = SWARM.get().cloned();
1791 let broadcaster = BROADCASTER.get();
1792 let atom = guard
1793 .get_atoms(&[AtomId::new(id.clone())])
1794 .ok()
1795 .and_then(|mut v| v.pop());
1796 drop(guard);
1797
1798 if let Some(atom) = atom {
1799 if let Some(repl) = repl {
1800 let _ = repl.replicate_new_atom(atom.clone());
1801 }
1802 if let Some(b) = broadcaster {
1803 let _ = b.publish_atom(atom);
1804 }
1805 }
1806
1807 Ok(id)
1808 }
1809
1810 fn ask_v2(question: WitQuestion) -> Result<engine::AnswerV2, engine::ToolError> {
1811 let start = Instant::now();
1812 if let Err(msg) = validate_world_for_read(&question.world_key) {
1813 return Err(map_validation_error(msg));
1814 }
1815 let engine = init_engine_v2()?;
1816 let guard = engine.lock().unwrap();
1817 let q = Question {
1818 world: WorldKey::new(question.world_key),
1819 text: question.text,
1820 filter: question.filter.map(to_filter),
1821 };
1822 let ans = futures::executor::block_on(guard.ask(q))
1823 .map_err(|e| tool_error("internal_error", format!("ask failed: {e}"), None))?;
1824 dwbase_metrics::record_ask_latency(start.elapsed());
1825 let health = compute_health(&guard);
1826 Ok(engine::AnswerV2 {
1827 world_key: ans.world.0,
1828 text: ans.text,
1829 supporting_atoms: ans.supporting_atoms.iter().map(to_wit_atom).collect(),
1830 warnings: warnings_from_health(&health),
1831 })
1832 }
1833
1834 fn observe_start_v2(filter: WitAtomFilter) -> Result<u64, engine::ToolError> {
1835 let engine = init_engine_v2()?;
1836 let filter = to_filter(filter);
1837 let Some(world) = filter.world.clone() else {
1838 return Err(err_invalid_input(
1839 "observe_start requires filter.world_key to be set",
1840 ));
1841 };
1842 if let Err(msg) = validate_world_for_read(&world.0) {
1843 return Err(map_validation_error(msg));
1844 }
1845 let stream = STREAM.get_or_init(LocalStream::new).clone();
1846 let handle = stream
1847 .subscribe(&world, filter)
1848 .map_err(|e| tool_error("internal_error", format!("subscribe failed: {e}"), None))?;
1849
1850 if observe_durable_enabled() {
1851 let guard = engine.lock().unwrap();
1852 let cursor = read_observe_cursor(&world);
1853 let catchup = durable_catchup_atoms(
1854 &guard,
1855 &world,
1856 &AtomFilter {
1857 world: Some(world.clone()),
1858 kinds: Vec::new(),
1859 labels: Vec::new(),
1860 flags: Vec::new(),
1861 since: None,
1862 until: None,
1863 limit: None,
1864 },
1865 cursor.as_ref(),
1866 );
1867 drop(guard);
1868 for atom in catchup {
1869 let _ = stream.push_to_handle(handle, atom);
1870 }
1871 }
1872
1873 if std::env::var("DWBASE_OBSERVE_NATS_SUBSCRIBE").is_ok() {
1874 if let Some(bus) = BUS.get() {
1875 let subject = world_events_subject(&world.0);
1876 let subs = EVENT_SUBS.get_or_init(|| ParkingMutex::new(HashSet::new()));
1877 let mut guard = subs.lock();
1878 if guard.insert(subject.clone()) {
1879 let stream = stream.clone();
1880 let self_node = node_id();
1881 let _ = bus.subscribe(
1882 &subject,
1883 Box::new(move |_sub, bytes, _reply_to| {
1884 let Ok(batch) = decode_event_batch(&bytes) else {
1885 return;
1886 };
1887 for ev in batch.events {
1888 if ev.from_node == self_node {
1889 continue;
1890 }
1891 stream.push_atom(ev.atom);
1892 }
1893 }),
1894 );
1895 }
1896 }
1897 }
1898
1899 Ok(handle)
1900 }
1901
1902 fn observe_poll_v2(handle: u64, max: u32) -> Result<Vec<WitAtom>, engine::ToolError> {
1903 if handle == 0 {
1904 return Err(err_invalid_handle("handle must be non-zero"));
1905 }
1906 let _ = init_engine_v2()?;
1907 let stream = STREAM.get_or_init(LocalStream::new);
1908 if !stream.has_handle(handle) {
1909 return Err(err_invalid_handle("unknown observe handle"));
1910 }
1911 let atoms = stream.poll_n(handle, max as usize);
1912 if observe_durable_enabled() {
1913 if let Some(world) = stream.world_for_handle(handle) {
1914 if let Some(last) = atoms.last() {
1915 write_observe_cursor(
1916 &world,
1917 &ObserveCursor {
1918 last_atom_id: last.id().0.clone(),
1919 last_timestamp: last.timestamp().0.clone(),
1920 updated_at_ms: now_ms(),
1921 },
1922 );
1923 }
1924 }
1925 }
1926 Ok(atoms.iter().map(to_wit_atom).collect())
1927 }
1928
1929 fn observe_stop_v2(handle: u64) -> Result<bool, engine::ToolError> {
1930 if handle == 0 {
1931 return Err(err_invalid_handle("handle must be non-zero"));
1932 }
1933 let _ = init_engine_v2()?;
1934 let stream = STREAM.get_or_init(LocalStream::new);
1935 if !stream.has_handle(handle) {
1936 return Err(err_invalid_handle("unknown observe handle"));
1937 }
1938 stream
1939 .stop(handle)
1940 .map_err(|e| tool_error("internal_error", format!("stop failed: {e}"), None))?;
1941 Ok(true)
1942 }
1943
1944 fn observe_stats_v2(handle: u64) -> Result<engine::ObserveStatsSnapshot, engine::ToolError> {
1945 if handle == 0 {
1946 return Err(err_invalid_handle("handle must be non-zero"));
1947 }
1948 let _ = init_engine_v2()?;
1949 let stream = STREAM.get_or_init(LocalStream::new);
1950 if !stream.has_handle(handle) {
1951 return Err(err_invalid_handle("unknown observe handle"));
1952 }
1953 Ok(<Component as engine::Guest>::observe_stats(handle))
1954 }
1955
1956 fn health_v2() -> Result<engine::HealthSnapshot, engine::ToolError> {
1957 let engine = init_engine_v2()?;
1958 let guard = engine.lock().unwrap();
1959 Ok(compute_health(&guard))
1960 }
1961
1962 fn metrics_snapshot() -> Result<engine::MetricsSnapshotData, engine::ToolError> {
1963 let handle = install_metrics_recorder()
1964 .or_else(|| PROM_HANDLE.get().and_then(|h| h.as_ref()))
1965 .ok_or_else(|| {
1966 tool_error(
1967 "internal_error",
1968 "metrics recorder unavailable",
1969 Some("install metrics recorder failed".into()),
1970 )
1971 })?;
1972 metrics::gauge!("dwbase.component.up").set(1.0);
1973 let text = handle.render();
1974 let parsed = parse_prometheus(&text);
1975 Ok(engine::MetricsSnapshotData {
1976 format: "prometheus".into(),
1977 prometheus: text,
1978 counters: parsed.counters,
1979 gauges: parsed.gauges,
1980 histograms: parsed.histograms,
1981 })
1982 }
1983}
1984
// Registers `Component` through the `export!` macro; presumably this wires it
// up as the implementation of the bindings generated from
// wit/dwbase-core.wit — confirm against the wit-bindgen version in use.
export!(Component);
1987
1988#[cfg(test)]
1989mod tests {
1990 use super::*;
1991 use dwbase_swarm_nats::replication::replicator_with_bus;
1992 use tempfile::TempDir;
1993
1994 static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
1995 static TEST_DIR: once_cell::sync::OnceCell<TempDir> = once_cell::sync::OnceCell::new();
1996
1997 fn env_lock() -> std::sync::MutexGuard<'static, ()> {
1998 ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner())
1999 }
2000
2001 fn reset_security_env() {
2002 for key in [
2003 "DWBASE_TENANT_ID",
2004 "GREENTIC_TENANT_ID",
2005 "DWBASE_WORKER_ID",
2006 "GREENTIC_WORKER_ID",
2007 "DWBASE_ENFORCE_TENANT_NAMESPACE",
2008 "DWBASE_ALLOW_READ_WORLDS",
2009 "DWBASE_ALLOW_WRITE_WORLDS",
2010 "DWBASE_ALLOW_READ_PREFIXES",
2011 "DWBASE_ALLOW_WRITE_PREFIXES",
2012 "DWBASE_MAX_PAYLOAD_BYTES",
2013 "DWBASE_IMPORTANCE_CAP",
2014 "DWBASE_ALLOWED_KINDS",
2015 "DWBASE_ALLOWED_LABELS",
2016 "DWBASE_ALLOW_POLICY_LABELS",
2017 "DWBASE_OBSERVE_QUEUE_CAPACITY",
2018 "DWBASE_OBSERVE_DROP_POLICY",
2019 "DWBASE_OBSERVE_DURABLE",
2020 "DWBASE_OBSERVE_DURABLE_CATCHUP_LIMIT",
2021 "DWBASE_HEALTH_DISABLE_FS_STATS",
2022 ] {
2023 std::env::remove_var(key);
2024 }
2025 }
2026
2027 fn set_shared_data_dir() {
2028 let dir = TEST_DIR.get_or_init(|| TempDir::new().unwrap());
2029 std::env::set_var("DWBASE_DATA_DIR", dir.path());
2030 }
2031
2032 #[test]
2033 fn remember_then_ask_roundtrip() {
2034 let _lock = env_lock();
2035 reset_security_env();
2036 set_shared_data_dir();
2037 let token = format!("hello-{}", now_ms());
2038 let atom_id = <Component as engine::Guest>::remember(WitNewAtom {
2039 world_key: "tenant:default/w1".into(),
2040 worker: "worker-1".into(),
2041 kind: WitAtomKind::Observation,
2042 timestamp: "2024-01-01T00:00:00Z".into(),
2043 importance: 0.5,
2044 payload_json: format!(r#"{{"note":"{}"}}"#, token),
2045 vector: None,
2046 flags_list: vec![],
2047 labels: vec![],
2048 links: vec![],
2049 });
2050 assert!(!atom_id.is_empty());
2051 let atoms_path = data_dir().join("atoms.json");
2052 let bytes = fs::read(&atoms_path).expect("atoms.json should exist after remember");
2053 let text = String::from_utf8_lossy(&bytes);
2054 assert!(
2055 text.contains(&atom_id),
2056 "atoms.json should contain remembered atom id"
2057 );
2058
2059 let answer = <Component as engine::Guest>::ask(WitQuestion {
2060 world_key: "tenant:default/w1".into(),
2061 text: token,
2062 filter: None,
2063 });
2064 assert_eq!(answer.world_key, "tenant:default/w1");
2065 assert!(!answer.text.is_empty());
2066 }
2067
2068 #[test]
2069 fn observe_start_poll_stop_receives_atoms_in_order() {
2070 let _lock = env_lock();
2071 reset_security_env();
2072 set_shared_data_dir();
2073 std::env::set_var("DWBASE_OBSERVE_QUEUE_CAPACITY", "100");
2074 std::env::set_var("DWBASE_OBSERVE_DROP_POLICY", "drop_oldest");
2075
2076 let handle = <Component as engine::Guest>::observe_start(WitAtomFilter {
2077 world_key: Some("tenant:default/obs".into()),
2078 kinds: vec![],
2079 labels: vec![],
2080 flag_filter: vec![],
2081 since: None,
2082 until: None,
2083 limit: None,
2084 });
2085 assert!(handle > 0);
2086
2087 let a1 = <Component as engine::Guest>::remember(WitNewAtom {
2088 world_key: "tenant:default/obs".into(),
2089 worker: "worker-1".into(),
2090 kind: WitAtomKind::Observation,
2091 timestamp: "2024-01-01T00:00:00Z".into(),
2092 importance: 0.4,
2093 payload_json: r#"{"n":1}"#.into(),
2094 vector: None,
2095 flags_list: vec![],
2096 labels: vec![],
2097 links: vec![],
2098 });
2099 let a2 = <Component as engine::Guest>::remember(WitNewAtom {
2100 world_key: "tenant:default/obs".into(),
2101 worker: "worker-1".into(),
2102 kind: WitAtomKind::Observation,
2103 timestamp: "2024-01-01T00:00:01Z".into(),
2104 importance: 0.4,
2105 payload_json: r#"{"n":2}"#.into(),
2106 vector: None,
2107 flags_list: vec![],
2108 labels: vec![],
2109 links: vec![],
2110 });
2111
2112 let mut got = <Component as engine::Guest>::observe_poll(handle, 10);
2113 if got.len() < 2 {
2114 got.extend(<Component as engine::Guest>::observe_poll(handle, 10));
2115 }
2116 assert_eq!(got.len(), 2);
2117 assert_eq!(got[0].id, a1);
2118 assert_eq!(got[1].id, a2);
2119
2120 <Component as engine::Guest>::observe_stop(handle);
2121 }
2122
2123 #[test]
2124 fn observe_receives_remotely_ingested_atoms() {
2125 let _lock = env_lock();
2126 reset_security_env();
2127 set_shared_data_dir();
2128
2129 let handle = <Component as engine::Guest>::observe_start(WitAtomFilter {
2130 world_key: Some("tenant:default/obs-remote".into()),
2131 kinds: vec![],
2132 labels: vec![],
2133 flag_filter: vec![],
2134 since: None,
2135 until: None,
2136 limit: None,
2137 });
2138 assert!(handle > 0);
2139
2140 let atom = Atom::builder(
2141 AtomId::new("remote-1"),
2142 WorldKey::new("tenant:default/obs-remote"),
2143 WorkerKey::new("peer"),
2144 AtomKind::Observation,
2145 Timestamp::new("2024-01-01T00:00:00Z"),
2146 Importance::clamped(0.5),
2147 r#"{"remote":true}"#,
2148 )
2149 .build();
2150
2151 let engine = init_engine();
2152 let guard = engine.lock().unwrap();
2153 futures::executor::block_on(guard.ingest_remote_atoms(vec![atom])).unwrap();
2154 drop(guard);
2155
2156 let got = <Component as engine::Guest>::observe_poll(handle, 10);
2157 assert_eq!(got.len(), 1);
2158 assert_eq!(got[0].id, "remote-1");
2159
2160 <Component as engine::Guest>::observe_stop(handle);
2161 }
2162
/// Two presence loops sharing one mock NATS client should each record the
/// other node in their own peer table.
///
/// The test waits by polling the peer tables until a bounded deadline instead
/// of sleeping for a fixed interval, so it neither fails on a slow machine nor
/// waits longer than necessary on a fast one.
#[test]
fn presence_discovery_with_mock() {
    let _lock = env_lock();
    reset_security_env();
    set_shared_data_dir();
    let client = Arc::new(MockNats::default()) as Arc<dyn NatsClient>;
    let table_a = PeerTable::default();
    let table_b = PeerTable::default();
    let engine_a = init_engine();
    let engine_b = init_engine();
    let hello_a = hello_from_engine(&engine_a.lock().unwrap());
    let mut hello_b = hello_from_engine(&engine_b.lock().unwrap());
    // Give the second participant a distinct identity so the two can be told apart.
    hello_b.node_id = "other-node".into();
    let ttl = std::time::Duration::from_millis(100);
    start_presence_loop(client.clone(), hello_a, table_a.clone(), ttl);
    start_presence_loop(client, hello_b, table_b.clone(), ttl);

    // NOTE(review): the loops presumably publish from background threads; the
    // test only observes them through the peer tables, so poll those.
    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
    let (a_sees_b, b_sees_a) = loop {
        let a_sees_b = table_a.peers().iter().any(|p| p.node_id == "other-node");
        let b_sees_a = table_b.peers().iter().any(|p| p.node_id == node_id());
        if (a_sees_b && b_sees_a) || std::time::Instant::now() >= deadline {
            // Report the last observed state so the assertions below name
            // whichever direction failed to converge.
            break (a_sees_b, b_sees_a);
        }
        std::thread::sleep(std::time::Duration::from_millis(10));
    };

    assert!(a_sees_b, "table A should see node B");
    assert!(b_sees_a, "table B should see node A");
}
2189
/// Replicates one atom from node-a over a shared mock bus and checks that it
/// is answerable on node-b but not on node-c.
///
/// Node-b is the only replicator created with a non-empty third argument
/// (presumably its subscribed worlds) and the only one that announces.
#[test]
fn selective_replication_only_ingests_subscribed_worlds() {
    const WORLD: &str = "tenant:default/world-x";

    let _lock = env_lock();
    reset_security_env();
    let bus = Arc::new(MockBus::default()) as Arc<dyn dwbase_swarm_nats::swarm::SwarmBus>;

    let dir_a = TempDir::new().unwrap();
    let dir_b = TempDir::new().unwrap();
    let dir_c = TempDir::new().unwrap();

    // All three nodes use the same engine wiring; only the storage root differs.
    let open_engine = |root: std::path::PathBuf| {
        DWBaseEngine::new(
            FsStorage::new(root).unwrap(),
            NoVector,
            LocalStream::new(),
            LocalGatekeeper::new(Capabilities::default(), TrustStore::default()),
            DummyEmbedder::new(),
        )
    };
    let engine_a = open_engine(dir_a.path().to_path_buf());
    let engine_b = open_engine(dir_b.path().to_path_buf());
    let engine_c = open_engine(dir_c.path().to_path_buf());

    let repl_a = replicator_with_bus(bus.clone(), "node-a", vec![]).unwrap();
    let repl_b = replicator_with_bus(bus.clone(), "node-b", vec![WORLD.into()]).unwrap();
    let repl_c = replicator_with_bus(bus.clone(), "node-c", vec![]).unwrap();

    repl_b.announce().unwrap();

    // Write locally on node-a, then hand the stored atom to its replicator.
    let new_atom = NewAtom {
        world: WorldKey::new(WORLD),
        worker: WorkerKey::new("w"),
        kind: AtomKind::Observation,
        timestamp: Timestamp::new("2024-01-01T00:00:00Z"),
        importance: Importance::clamped(0.5),
        payload_json: r#"{"note":"hello"}"#.into(),
        vector: None,
        flags: vec![],
        labels: vec![],
        links: vec![],
    };
    let atom_id = futures::executor::block_on(engine_a.remember(new_atom)).unwrap();

    let mut stored = engine_a.get_atoms(&[atom_id.clone()]).unwrap();
    let outgoing = stored.pop().expect("atom exists");
    repl_a.replicate_new_atom(outgoing).unwrap();

    repl_b.poll_inbox().unwrap();
    repl_c.poll_inbox().unwrap();

    // Drain whatever each receiver accepted into its own engine.
    while let Some((_, batch)) = repl_b.poll_atom_batch() {
        futures::executor::block_on(engine_b.ingest_remote_atoms(batch.atoms)).unwrap();
    }
    while let Some((_, batch)) = repl_c.poll_atom_batch() {
        futures::executor::block_on(engine_c.ingest_remote_atoms(batch.atoms)).unwrap();
    }

    let question = || Question {
        world: WorldKey::new(WORLD),
        text: "hello?".into(),
        filter: None,
    };

    let ans_b = futures::executor::block_on(engine_b.ask(question())).unwrap();
    let b_has_atom = ans_b.supporting_atoms.iter().any(|a| a.id() == &atom_id);
    assert!(b_has_atom, "node-b should contain replicated atom");

    let ans_c = futures::executor::block_on(engine_c.ask(question())).unwrap();
    let c_has_atom = ans_c.supporting_atoms.iter().any(|a| a.id() == &atom_id);
    assert!(!c_has_atom, "node-c should not contain replicated atom");
}
2282
/// With `DWBASE_TENANT_ID=acme`, a `remember` call targeting a
/// `tenant:other/...` world must come back with an empty atom id.
#[test]
fn attempt_to_write_foreign_world_is_denied() {
    let _lock = env_lock();
    reset_security_env();
    set_shared_data_dir();
    std::env::set_var("DWBASE_TENANT_ID", "acme");

    // The world key names a tenant other than the one configured above.
    let foreign_write = WitNewAtom {
        world_key: "tenant:other/w1".into(),
        worker: "worker-1".into(),
        kind: WitAtomKind::Observation,
        timestamp: "2024-01-01T00:00:00Z".into(),
        importance: 0.5,
        payload_json: r#"{"note":"nope"}"#.into(),
        vector: None,
        flags_list: vec![],
        labels: vec![],
        links: vec![],
    };

    let returned_id = <Component as engine::Guest>::remember(foreign_write);
    assert!(returned_id.is_empty(), "write should be denied");
}
2304
/// With `DWBASE_MAX_PAYLOAD_BYTES=10`, remembering an atom whose JSON payload
/// is longer than that must come back with an empty atom id.
#[test]
fn oversize_payload_is_rejected() {
    let _lock = env_lock();
    reset_security_env();
    set_shared_data_dir();
    std::env::set_var("DWBASE_MAX_PAYLOAD_BYTES", "10");

    let too_big = WitNewAtom {
        world_key: "tenant:default/w1".into(),
        worker: "worker-1".into(),
        kind: WitAtomKind::Observation,
        timestamp: "2024-01-01T00:00:00Z".into(),
        importance: 0.1,
        payload_json: r#"{"note":"this is definitely >10 bytes"}"#.into(),
        vector: None,
        flags_list: vec![],
        labels: vec![],
        links: vec![],
    };

    let returned_id = <Component as engine::Guest>::remember(too_big);
    assert!(
        returned_id.is_empty(),
        "oversize payload should be rejected"
    );
}
2326
/// With a 100 MB disk budget configured, the health report should be fully
/// green: storage and index ok, status `ready`, disk pressure `ok`.
#[test]
fn health_ready_by_default() {
    let _lock = env_lock();
    reset_security_env();
    set_shared_data_dir();
    std::env::set_var("DWBASE_MAX_DISK_MB", "100");

    let report = <Component as engine::Guest>::health();

    assert!(report.storage_ok);
    assert!(report.index_ok);
    assert_eq!(report.status, "ready");
    assert_eq!(report.disk_pressure, "ok");
}
2339
/// With no disk budget configured and `DWBASE_HEALTH_DISABLE_FS_STATS=1`, the
/// report should say disk pressure is `unknown` while status stays `ready`.
#[test]
fn health_capacity_unknown_stays_ready_when_ok() {
    let _lock = env_lock();
    reset_security_env();
    set_shared_data_dir();

    // Remove the configured budget and switch off filesystem stats.
    std::env::remove_var("DWBASE_MAX_DISK_MB");
    std::env::set_var("DWBASE_HEALTH_DISABLE_FS_STATS", "1");

    let report = <Component as engine::Guest>::health();

    assert!(report.storage_ok);
    assert!(report.index_ok);
    assert_eq!(report.status, "ready");
    assert_eq!(report.disk_pressure, "unknown");
}
2354
/// With a 1 MB disk budget and filesystem stats disabled, a 2 MiB file in the
/// data directory should push both `disk_pressure` and `status` to `degraded`.
///
/// The ballast file is removed as soon as the health reading has been taken,
/// so it does not linger in the data directory for later tests.
#[test]
fn health_degrades_when_configured_capacity_exceeded() {
    let _lock = env_lock();
    reset_security_env();
    set_shared_data_dir();
    std::env::set_var("DWBASE_HEALTH_DISABLE_FS_STATS", "1");
    std::env::set_var("DWBASE_MAX_DISK_MB", "1");

    let dir = data_dir();
    fs::create_dir_all(&dir).unwrap();
    // 2 MiB of zeros: twice the 1 MB budget configured above.
    let ballast = dir.join("pressure.bin");
    fs::write(&ballast, vec![0u8; 2 * 1024 * 1024]).unwrap();

    let h = <Component as engine::Guest>::health();

    // Clean up before asserting so a failed assertion cannot leave the file
    // behind. Best effort: a failed removal must not mask the real result.
    let _ = fs::remove_file(&ballast);

    assert_eq!(h.disk_pressure, "degraded");
    assert_eq!(h.status, "degraded");
}
2371
/// After installing the recorder and recording one dropped observation, the
/// metrics snapshot should be tagged `prometheus` and carry non-empty text.
#[test]
fn metrics_snapshot_renders_prometheus() {
    let _lock = env_lock();
    reset_security_env();
    set_shared_data_dir();

    // The installer's result is deliberately ignored, as in the original test.
    let _ = install_metrics_recorder();
    dwbase_metrics::record_observe_dropped(1);

    let snapshot = <Component as engine::Guest>::metrics_snapshot().expect("metrics");
    assert_eq!(snapshot.format, "prometheus");

    let rendered = &snapshot.prometheus;
    println!("metrics snapshot text:\n{}", rendered);
    assert!(!rendered.is_empty(), "prometheus text should be present");
}
2388
2389 fn atom_for_world(id: &str, world: &str, ts: &str) -> Atom {
2390 Atom::builder(
2391 AtomId::new(id),
2392 WorldKey::new(world),
2393 WorkerKey::new("w"),
2394 AtomKind::Observation,
2395 Timestamp::new(ts),
2396 Importance::clamped(0.5),
2397 r#"{"x":true}"#,
2398 )
2399 .build()
2400 }
2401
/// With queue capacity 2 and the `drop_oldest` policy, pushing a1, a2, a3 to
/// one handle should leave a2 then a3 to poll, with one drop recorded.
#[test]
fn observe_drop_oldest_is_deterministic_fifo() {
    let _lock = env_lock();
    reset_security_env();
    std::env::set_var("DWBASE_OBSERVE_QUEUE_CAPACITY", "2");
    std::env::set_var("DWBASE_OBSERVE_DROP_POLICY", "drop_oldest");

    let stream = LocalStream::new();
    let world = WorldKey::new("tenant:default/obs-drop-oldest");
    let filter = AtomFilter {
        world: Some(world.clone()),
        kinds: vec![],
        labels: vec![],
        flags: vec![],
        since: None,
        until: None,
        limit: None,
    };
    let handle = stream.subscribe(&world, filter).unwrap();

    // Three pushes into a queue of two; every push must report success.
    for (id, ts) in [
        ("a1", "2024-01-01T00:00:00Z"),
        ("a2", "2024-01-01T00:00:01Z"),
        ("a3", "2024-01-01T00:00:02Z"),
    ] {
        assert!(stream.push_to_handle(handle, atom_for_world(id, &world.0, ts)));
    }

    let polled = stream.poll_n(handle, 10);
    assert_eq!(polled.len(), 2);
    for (atom, expected_id) in polled.iter().zip(["a2", "a3"]) {
        assert_eq!(atom.id().0, expected_id);
    }

    let (queued, dropped, _) = stream.stats_for_handle(handle).unwrap();
    assert_eq!(queued, 0);
    assert_eq!(dropped, 1);
}
2448
/// With queue capacity 2 and the `drop_newest` policy, pushing a1, a2, a3 to
/// one handle should leave a1 then a2 to poll, with one drop recorded.
#[test]
fn observe_drop_newest_is_deterministic_fifo() {
    let _lock = env_lock();
    reset_security_env();
    std::env::set_var("DWBASE_OBSERVE_QUEUE_CAPACITY", "2");
    std::env::set_var("DWBASE_OBSERVE_DROP_POLICY", "drop_newest");

    let stream = LocalStream::new();
    let world = WorldKey::new("tenant:default/obs-drop-newest");
    let filter = AtomFilter {
        world: Some(world.clone()),
        kinds: vec![],
        labels: vec![],
        flags: vec![],
        since: None,
        until: None,
        limit: None,
    };
    let handle = stream.subscribe(&world, filter).unwrap();

    // Three pushes into a queue of two; every push must report success.
    for (id, ts) in [
        ("a1", "2024-01-01T00:00:00Z"),
        ("a2", "2024-01-01T00:00:01Z"),
        ("a3", "2024-01-01T00:00:02Z"),
    ] {
        assert!(stream.push_to_handle(handle, atom_for_world(id, &world.0, ts)));
    }

    let polled = stream.poll_n(handle, 10);
    assert_eq!(polled.len(), 2);
    for (atom, expected_id) in polled.iter().zip(["a1", "a2"]) {
        assert_eq!(atom.id().0, expected_id);
    }

    let (queued, dropped, _) = stream.stats_for_handle(handle).unwrap();
    assert_eq!(queued, 0);
    assert_eq!(dropped, 1);
}
2495
/// With `DWBASE_OBSERVE_DURABLE=1`, a second subscription on the same world
/// should deliver only the atom written after the first handle was stopped.
///
/// Sequence: subscribe, write two atoms, poll both, stop; write a third atom
/// with no handle open; subscribe again and expect exactly that third atom.
#[test]
fn observe_durable_cursor_catches_up_after_restart_like_gap() {
    let _lock = env_lock();
    reset_security_env();
    set_shared_data_dir();
    std::env::set_var("DWBASE_OBSERVE_DURABLE", "1");

    let world = "tenant:default/obs-durable";

    // Both subscriptions use the same world-only filter.
    let subscribe = || {
        <Component as engine::Guest>::observe_start(WitAtomFilter {
            world_key: Some(world.into()),
            kinds: vec![],
            labels: vec![],
            flag_filter: vec![],
            since: None,
            until: None,
            limit: None,
        })
    };
    // The three writes differ only in timestamp and payload.
    let write = |timestamp: &str, payload: &str| {
        <Component as engine::Guest>::remember(WitNewAtom {
            world_key: world.into(),
            worker: "worker-1".into(),
            kind: WitAtomKind::Observation,
            timestamp: timestamp.into(),
            importance: 0.4,
            payload_json: payload.into(),
            vector: None,
            flags_list: vec![],
            labels: vec![],
            links: vec![],
        })
    };

    let first_handle = subscribe();
    assert!(first_handle > 0);

    let _first = write("2024-01-01T00:00:00Z", r#"{"n":1}"#);
    let _second = write("2024-01-01T00:00:01Z", r#"{"n":2}"#);

    let first_batch = <Component as engine::Guest>::observe_poll(first_handle, 10);
    assert_eq!(first_batch.len(), 2);
    <Component as engine::Guest>::observe_stop(first_handle);

    // Written while no handle is open: this is the gap the cursor must cover.
    let third = write("2024-01-01T00:00:02Z", r#"{"n":3}"#);
    assert!(!third.is_empty());

    let second_handle = subscribe();
    assert!(second_handle > 0);

    let catch_up = <Component as engine::Guest>::observe_poll(second_handle, 10);
    assert_eq!(catch_up.len(), 1);
    assert_eq!(catch_up[0].id, third);
    <Component as engine::Guest>::observe_stop(second_handle);
}
2575
/// `remember_v2` on a `tenant:other/...` world must return an error whose
/// `code` is `capability_denied`.
#[test]
fn remember_v2_capability_denied_is_structured() {
    let _lock = env_lock();
    reset_security_env();
    set_shared_data_dir();

    let foreign_write = WitNewAtom {
        world_key: "tenant:other/w1".into(),
        worker: "worker-1".into(),
        kind: WitAtomKind::Observation,
        timestamp: "2024-01-01T00:00:00Z".into(),
        importance: 0.5,
        payload_json: r#"{"note":"nope"}"#.into(),
        vector: None,
        flags_list: vec![],
        labels: vec![],
        links: vec![],
    };

    let denial = <Component as engine::Guest>::remember_v2(foreign_write)
        .expect_err("should deny cross-tenant write");
    assert_eq!(denial.code, "capability_denied");
}
2598
/// With `DWBASE_MAX_PAYLOAD_BYTES=10`, `remember_v2` with a 20-byte payload
/// must return an error whose `code` is `payload_too_large`.
#[test]
fn remember_v2_oversize_payload_is_structured() {
    let _lock = env_lock();
    reset_security_env();
    set_shared_data_dir();
    std::env::set_var("DWBASE_MAX_PAYLOAD_BYTES", "10");

    // Twenty ASCII bytes, twice the configured limit.
    let oversized = WitNewAtom {
        world_key: "tenant:default/w1".into(),
        worker: "worker-1".into(),
        kind: WitAtomKind::Observation,
        timestamp: "2024-01-01T00:00:00Z".into(),
        importance: 0.1,
        payload_json: "x".repeat(20),
        vector: None,
        flags_list: vec![],
        labels: vec![],
        links: vec![],
    };

    let rejection = <Component as engine::Guest>::remember_v2(oversized)
        .expect_err("should reject oversize payload");
    assert_eq!(rejection.code, "payload_too_large");
}
2623
/// Polling or stopping a handle that this test never opened must return an
/// error whose `code` is `invalid_handle`.
#[test]
fn observe_v2_invalid_handle_is_structured() {
    let _lock = env_lock();
    reset_security_env();
    set_shared_data_dir();

    let poll_failure = <Component as engine::Guest>::observe_poll_v2(91254, 1)
        .expect_err("invalid handle should error");
    assert_eq!(poll_failure.code, "invalid_handle");

    let stop_failure = <Component as engine::Guest>::observe_stop_v2(91254)
        .expect_err("invalid handle should error");
    assert_eq!(stop_failure.code, "invalid_handle");
}
2638}