1use std::{path::Path, sync::Arc, time::Instant};
4
5use anyhow::Context;
6use claw_guard::error::GuardError;
7use secrecy::SecretString;
8use serde::{Deserialize, Serialize};
9use serde_json::json;
10use sqlx::Executor;
11use tokio::sync::Mutex;
12use tokio_util::sync::CancellationToken;
13use uuid::Uuid;
14
15use crate::{
16 error::{ClawDBError, ClawDBResult},
17 plugins::{events::ClawEvent, manager::PluginManager},
18 telemetry::{Metrics, PrometheusHandle},
19 types::{
20 BranchDiff, ClawTransaction, HealthStatus, MemoryRecord, MergeResult, ReflectSummary,
21 RememberResult, SearchHit, SyncSummary,
22 },
23};
24
25pub use crate::config::ClawDBConfig;
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct ClawDBSession {
30 pub id: Uuid,
32 pub agent_id: Uuid,
34 pub workspace_id: Uuid,
36 pub role: String,
38 pub scopes: Vec<String>,
40 pub token: String,
42 pub expires_at: chrono::DateTime<chrono::Utc>,
44}
45
46pub struct ClawDB {
48 pub config: ClawDBConfig,
50 core: Arc<claw_core::ClawEngine>,
51 vector: Option<Arc<claw_vector::VectorEngine>>,
52 branch: Arc<claw_branch::BranchEngine>,
53 sync: Arc<claw_sync::SyncEngine>,
54 guard: Arc<claw_guard::Guard>,
55 reflect: Option<Arc<claw_reflect_client::ReflectClient>>,
56 shutdown: CancellationToken,
57 metrics: Arc<Metrics>,
58 plugins: Arc<Mutex<PluginManager>>,
59 started_at: Instant,
60 sync_local_only: bool,
61}
62
63impl ClawDB {
64 pub async fn new(config: ClawDBConfig) -> ClawDBResult<Self> {
66 crate::telemetry::init_telemetry(&config.telemetry)?;
67
68 let core_config = claw_core::ClawConfig::builder()
69 .db_path(config.core.db_path.clone())
70 .max_connections(config.core.max_connections)
71 .wal_enabled(config.core.wal_enabled)
72 .cache_size_mb(config.core.cache_size_mb)
73 .build()
74 .map_err(ClawDBError::Core)?;
75 let core = Arc::new(claw_core::ClawEngine::open(core_config).await?);
76 core.migrate().await?;
77 core.pool()
78 .execute(
79 "CREATE TABLE IF NOT EXISTS memory_records (
80 id TEXT PRIMARY KEY
81 )",
82 )
83 .await
84 .map_err(|error| ClawDBError::ComponentInit("core", error.to_string()))?;
85 core.pool()
86 .execute(
87 "CREATE TABLE IF NOT EXISTS tool_outputs (
88 id TEXT PRIMARY KEY,
89 session_id TEXT
90 )",
91 )
92 .await
93 .map_err(|error| ClawDBError::ComponentInit("core", error.to_string()))?;
94
95 let vector = if config.vector.enabled {
96 let vector_config = claw_vector::VectorConfig::builder()
97 .db_path(config.vector.db_path.clone())
98 .index_dir(config.vector.index_dir.clone())
99 .embedding_service_url(config.vector.embedding_service_url.clone())
100 .default_workspace_id(config.workspace_id.to_string())
101 .default_dimensions(config.vector.default_dimensions)
102 .build()
103 .map_err(ClawDBError::Vector)?;
104 let engine = Arc::new(
105 claw_vector::VectorEngine::new(vector_config)
106 .await
107 .map_err(|error| ClawDBError::ComponentInit("vector", error.to_string()))?,
108 );
109 ensure_vector_collection(&engine, &config.workspace_id.to_string()).await?;
110 Some(engine)
111 } else {
112 None
113 };
114
115 let branch_config = claw_branch::BranchConfig::builder()
116 .workspace_id(config.workspace_id)
117 .branches_dir(config.branch.branches_dir.clone())
118 .registry_db_path(config.branch.registry_db_path.clone())
119 .max_branches_per_workspace(config.branch.max_branches_per_workspace)
120 .gc_interval_secs(config.branch.gc_interval_secs)
121 .trunk_branch_name(config.branch.trunk_branch_name.clone())
122 .build()
123 .map_err(ClawDBError::Branch)?;
124 let branch = Arc::new(
125 claw_branch::BranchEngine::new(branch_config, &config.core.db_path)
126 .await
127 .map_err(|error| ClawDBError::ComponentInit("branch", error.to_string()))?,
128 );
129 branch.start_gc_scheduler().await?;
130
131 let sync_local_only = config.sync.hub_url.is_none();
132 let sync_config = claw_sync::SyncConfig {
133 workspace_id: config.workspace_id,
134 device_id: config.agent_id,
135 hub_endpoint: config
136 .sync
137 .hub_url
138 .clone()
139 .unwrap_or_else(|| "http://127.0.0.1:50051".to_string()),
140 data_dir: config.sync.data_dir.clone(),
141 db_path: config.sync.db_path.clone(),
142 tls_enabled: config.sync.tls_enabled,
143 connect_timeout_secs: config.sync.connect_timeout_secs,
144 request_timeout_secs: config.sync.request_timeout_secs,
145 sync_interval_secs: config.sync.sync_interval_secs,
146 heartbeat_interval_secs: config.sync.sync_interval_secs.max(1),
147 max_retries: 5,
148 retry_base_ms: 500,
149 max_delta_rows: config.sync.max_delta_rows,
150 max_chunk_bytes: config.sync.max_chunk_bytes,
151 max_pull_chunks: config.sync.max_pull_chunks,
152 max_push_inflight: config.sync.max_push_inflight,
153 };
154 let sync = Arc::new(
155 claw_sync::SyncEngine::new(sync_config, core.pool().clone())
156 .await
157 .map_err(|error| ClawDBError::ComponentInit("sync", error.to_string()))?,
158 );
159
160 let guard_config = claw_guard::GuardConfig {
161 db_path: config.guard.db_path.clone().into(),
162 jwt_secret: SecretString::new(config.guard.jwt_secret.clone().into_boxed_str()),
163 policy_dir: Some(config.guard.policy_dir.clone()),
164 sensitive_resources: config.guard.sensitive_resources.clone(),
165 audit_flush_interval_ms: config.guard.audit_flush_interval_ms,
166 audit_batch_size: config.guard.audit_batch_size,
167 business_hours_start_hour: 8,
168 business_hours_end_hour: 18,
169 };
170 let guard = Arc::new(
171 claw_guard::Guard::new(guard_config)
172 .await
173 .map_err(|error| ClawDBError::ComponentInit("guard", error.to_string()))?,
174 );
175
176 let reflect = match (&config.reflect.base_url, &config.reflect.api_key) {
177 (Some(base_url), Some(api_key)) => Some(Arc::new(
178 claw_reflect_client::ReflectClient::new(base_url.clone(), api_key.clone())
179 .map_err(|error| ClawDBError::ComponentInit("reflect", error.to_string()))?,
180 )),
181 _ => {
182 tracing::warn!("reflect client disabled because base URL or API key is missing");
183 None
184 }
185 };
186
187 let metrics = Metrics::new();
188 let (mut plugin_manager, mut plugin_rx) = PluginManager::new();
189 let _ = plugin_manager.load_from_dir(&config.plugins.plugins_dir);
190 let plugins = Arc::new(Mutex::new(plugin_manager));
191 let plugins_task = plugins.clone();
192 tokio::spawn(async move {
193 while let Ok(event) = plugin_rx.recv().await {
194 let mut manager = plugins_task.lock().await;
195 manager.dispatch(&event).await;
196 }
197 });
198
199 tracing::info!(
200 core = true,
201 vector = vector.is_some(),
202 branch = true,
203 sync = true,
204 reflect = reflect.is_some(),
205 "ClawDB components initialized"
206 );
207
208 Ok(Self {
209 config,
210 core,
211 vector,
212 branch,
213 sync,
214 guard,
215 reflect,
216 shutdown: CancellationToken::new(),
217 metrics,
218 plugins,
219 started_at: Instant::now(),
220 sync_local_only,
221 })
222 }
223
224 pub async fn start_with(config: ClawDBConfig) -> ClawDBResult<Self> {
226 Self::new(config).await
227 }
228
229 pub async fn open_default() -> ClawDBResult<Self> {
231 Self::new(ClawDBConfig::from_env()?).await
232 }
233
234 pub async fn open(data_dir: &Path) -> ClawDBResult<Self> {
236 let mut config = ClawDBConfig::load_or_default(data_dir)?;
237 config.data_dir = data_dir.to_path_buf();
238 Self::new(config).await
239 }
240
241 pub fn uptime_secs(&self) -> u64 {
243 self.started_at.elapsed().as_secs()
244 }
245
246 pub fn core_engine(&self) -> &Arc<claw_core::ClawEngine> {
248 &self.core
249 }
250
251 pub fn branch_engine(&self) -> &Arc<claw_branch::BranchEngine> {
253 &self.branch
254 }
255
256 pub fn sync_engine(&self) -> &Arc<claw_sync::SyncEngine> {
258 &self.sync
259 }
260
261 pub fn guard_engine(&self) -> &Arc<claw_guard::Guard> {
263 &self.guard
264 }
265
266 pub fn vector_engine(&self) -> Option<&Arc<claw_vector::VectorEngine>> {
268 self.vector.as_ref()
269 }
270
271 pub fn reflect_client(&self) -> Option<&Arc<claw_reflect_client::ReflectClient>> {
273 self.reflect.as_ref()
274 }
275
276 pub fn metrics_handle(&self) -> PrometheusHandle {
278 self.metrics.handle()
279 }
280
281 #[tracing::instrument(skip(self, scopes), fields(workspace_id = %self.config.workspace_id, agent_id = %agent_id))]
283 pub async fn session(
284 &self,
285 agent_id: Uuid,
286 role: &str,
287 scopes: Vec<String>,
288 ) -> ClawDBResult<ClawDBSession> {
289 self.session_with_ttl(agent_id, role, scopes, 3600).await
290 }
291
292 #[tracing::instrument(skip(self, scopes), fields(workspace_id = %self.config.workspace_id, agent_id = %agent_id))]
294 pub async fn session_with_ttl(
295 &self,
296 agent_id: Uuid,
297 role: &str,
298 scopes: Vec<String>,
299 ttl_secs: i64,
300 ) -> ClawDBResult<ClawDBSession> {
301 let session = self
302 .guard
303 .sessions()
304 .create_session(
305 agent_id,
306 self.config.workspace_id,
307 role,
308 scopes.clone(),
309 ttl_secs.max(1) as u64,
310 )
311 .await?;
312 self.metrics.session_created.inc();
313 self.emit(ClawEvent::SessionCreated {
314 session_id: session.id,
315 agent_id,
316 })
317 .await;
318 Ok(ClawDBSession {
319 id: session.id,
320 agent_id: session.agent_id,
321 workspace_id: session.workspace_id,
322 role: session.role,
323 scopes,
324 token: session.token,
325 expires_at: session.expires_at,
326 })
327 }
328
329 #[tracing::instrument(skip(self, session, content), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
331 pub async fn remember(
332 &self,
333 session: &ClawDBSession,
334 content: &str,
335 ) -> ClawDBResult<RememberResult> {
336 self.remember_typed(session, content, "semantic", &[], serde_json::Value::Null)
337 .await
338 }
339
340 #[tracing::instrument(skip(self, session, content, tags, metadata), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
342 pub async fn remember_typed(
343 &self,
344 session: &ClawDBSession,
345 content: &str,
346 memory_type: &str,
347 tags: &[String],
348 metadata: serde_json::Value,
349 ) -> ClawDBResult<RememberResult> {
350 self.authorize(session, &["memory:write", "memory:*", "*"])
351 .await?;
352
353 let record = claw_core::MemoryRecord::new(
354 content,
355 parse_memory_type(memory_type),
356 tags.to_vec(),
357 None,
358 );
359 let memory_id = self.core.insert_memory(&record).await?;
360
361 let mut indexed = false;
362 if let Some(vector) = &self.vector {
363 let vector_metadata = json!({
364 "memory_id": memory_id,
365 "memory_type": record.memory_type.as_str(),
366 "tags": record.tags,
367 "metadata": metadata,
368 });
369 match vector
370 .upsert_in_workspace(
371 &session.workspace_id.to_string(),
372 "memories",
373 content,
374 vector_metadata,
375 )
376 .await
377 {
378 Ok(_) => indexed = true,
379 Err(error) => {
380 tracing::warn!(error = %error, "vector upsert failed after core write")
381 }
382 }
383 }
384
385 self.metrics
386 .remember_total(&session.workspace_id.to_string(), "ok");
387 self.emit(ClawEvent::MemoryWritten {
388 memory_id: memory_id.to_string(),
389 workspace_id: session.workspace_id,
390 })
391 .await;
392
393 Ok(RememberResult { memory_id, indexed })
394 }
395
396 #[tracing::instrument(skip(self, session, query), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
398 pub async fn search(
399 &self,
400 session: &ClawDBSession,
401 query: &str,
402 ) -> ClawDBResult<Vec<SearchHit>> {
403 self.search_with_options(session, query, 10, self.vector.is_some(), None)
404 .await
405 }
406
407 #[tracing::instrument(skip(self, session, query, filter), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
409 pub async fn search_with_options(
410 &self,
411 session: &ClawDBSession,
412 query: &str,
413 top_k: usize,
414 semantic: bool,
415 filter: Option<serde_json::Value>,
416 ) -> ClawDBResult<Vec<SearchHit>> {
417 self.authorize(session, &["memory:read", "memory:search", "memory:*", "*"])
418 .await?;
419
420 let workspace_id = session.workspace_id.to_string();
421 let use_semantic = semantic && self.vector.is_some();
422 let hits = if use_semantic {
423 let vector = self
424 .vector
425 .as_ref()
426 .ok_or(ClawDBError::ComponentDisabled("vector"))?;
427 let mut response = vector
428 .search_text_in_workspace(
429 &workspace_id,
430 "memories",
431 query,
432 top_k.saturating_mul(3).max(top_k),
433 )
434 .await?;
435 if let Some(filter_value) = filter {
436 response
437 .results
438 .retain(|result| metadata_matches(&result.metadata, &filter_value));
439 }
440 response
441 .results
442 .into_iter()
443 .take(top_k)
444 .map(search_result_to_hit)
445 .collect::<ClawDBResult<Vec<_>>>()?
446 } else {
447 self.core
448 .fts_search(query)
449 .await?
450 .into_iter()
451 .filter(|record| {
452 filter
453 .as_ref()
454 .map_or(true, |value| memory_record_matches(record, value))
455 })
456 .take(top_k)
457 .map(|record| SearchHit {
458 id: record.id,
459 score: 1.0,
460 content: record.content,
461 memory_type: record.memory_type.as_str().to_string(),
462 tags: record.tags,
463 metadata: serde_json::Value::Null,
464 })
465 .collect()
466 };
467
468 let mode = if use_semantic { "semantic" } else { "fts" };
469 self.metrics.search_total(&workspace_id, mode);
470 self.metrics.search_hits(&workspace_id, hits.len() as f64);
471 self.emit(ClawEvent::SearchExecuted {
472 query: query.to_string(),
473 hits: hits.len(),
474 })
475 .await;
476 Ok(hits)
477 }
478
479 #[tracing::instrument(skip(self, session, memory_ids), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
481 pub async fn recall(
482 &self,
483 session: &ClawDBSession,
484 memory_ids: &[Uuid],
485 ) -> ClawDBResult<Vec<MemoryRecord>> {
486 self.authorize(session, &["memory:read", "memory:*", "*"])
487 .await?;
488 let mut records = Vec::with_capacity(memory_ids.len());
489 for id in memory_ids {
490 records.push(self.core.get_memory(*id).await?);
491 }
492 Ok(records)
493 }
494
495 #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
497 pub async fn list_memories(
498 &self,
499 session: &ClawDBSession,
500 memory_type: Option<&str>,
501 ) -> ClawDBResult<Vec<MemoryRecord>> {
502 self.authorize(session, &["memory:read", "memory:*", "*"])
503 .await?;
504 let type_filter = memory_type.map(parse_memory_type);
505 Ok(self.core.list_memories(type_filter).await?)
506 }
507
508 #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id, memory_id = %memory_id))]
510 pub async fn delete_memory(&self, session: &ClawDBSession, memory_id: Uuid) -> ClawDBResult<()> {
511 self.authorize(session, &["memory:write", "memory:*", "*"])
512 .await?;
513 self.core.delete_memory(memory_id).await?;
514 Ok(())
515 }
516
517 #[tracing::instrument(skip(self, session, name), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
519 pub async fn branch(&self, session: &ClawDBSession, name: &str) -> ClawDBResult<Uuid> {
520 self.authorize(session, &["branch:write", "branch:*", "*"])
521 .await?;
522 let branch = self.branch.fork_trunk(name).await?;
523 self.metrics
524 .branch_ops(&session.workspace_id.to_string(), "fork");
525 self.emit(ClawEvent::BranchCreated {
526 branch_id: branch.id,
527 name: branch.name,
528 })
529 .await;
530 Ok(branch.id)
531 }
532
533 #[tracing::instrument(skip(self, session, name), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id, parent = %parent))]
535 pub async fn fork_branch(
536 &self,
537 session: &ClawDBSession,
538 parent: Uuid,
539 name: &str,
540 ) -> ClawDBResult<Uuid> {
541 self.authorize(session, &["branch:write", "branch:*", "*"])
542 .await?;
543 let branch = self.branch.fork(parent, name, None).await?;
544 self.metrics
545 .branch_ops(&session.workspace_id.to_string(), "fork");
546 self.emit(ClawEvent::BranchCreated {
547 branch_id: branch.id,
548 name: branch.name,
549 })
550 .await;
551 Ok(branch.id)
552 }
553
554 #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id, branch_id = %branch_id))]
556 pub async fn get_branch(
557 &self,
558 session: &ClawDBSession,
559 branch_id: Uuid,
560 ) -> ClawDBResult<claw_branch::Branch> {
561 self.authorize(session, &["branch:read", "branch:*", "*"])
562 .await?;
563 Ok(self.branch.get(branch_id).await?)
564 }
565
566 #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
568 pub async fn list_branches(
569 &self,
570 session: &ClawDBSession,
571 ) -> ClawDBResult<Vec<claw_branch::Branch>> {
572 self.authorize(session, &["branch:read", "branch:*", "*"])
573 .await?;
574 Ok(self.branch.list(None).await?)
575 }
576
577 #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
579 pub async fn merge(
580 &self,
581 session: &ClawDBSession,
582 source: Uuid,
583 target: Uuid,
584 ) -> ClawDBResult<MergeResult> {
585 self.merge_with_strategy(session, source, target, claw_branch::MergeStrategy::Theirs)
586 .await
587 }
588
589 #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id, source = %source, target = %target))]
591 pub async fn merge_with_strategy(
592 &self,
593 session: &ClawDBSession,
594 source: Uuid,
595 target: Uuid,
596 strategy: claw_branch::MergeStrategy,
597 ) -> ClawDBResult<MergeResult> {
598 self.authorize(session, &["branch:write", "branch:*", "*"])
599 .await?;
600 let result = self.branch.merge(source, target, strategy).await?;
601 self.metrics
602 .branch_ops(&session.workspace_id.to_string(), "merge");
603 self.emit(ClawEvent::BranchMerged {
604 source,
605 target,
606 merged: result.applied,
607 })
608 .await;
609 Ok(result)
610 }
611
612 #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
614 pub async fn diff(
615 &self,
616 session: &ClawDBSession,
617 source: Uuid,
618 target: Uuid,
619 ) -> ClawDBResult<BranchDiff> {
620 self.authorize(session, &["branch:read", "branch:*", "*"])
621 .await?;
622 Ok(self.branch.diff(source, target).await?)
623 }
624
625 #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id, branch_id = %branch_id))]
627 pub async fn discard_branch(&self, session: &ClawDBSession, branch_id: Uuid) -> ClawDBResult<()> {
628 self.authorize(session, &["branch:write", "branch:*", "*"])
629 .await?;
630 self.branch.discard(branch_id).await?;
631 Ok(())
632 }
633
634 #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
636 pub async fn sync(&self, session: &ClawDBSession) -> ClawDBResult<SyncSummary> {
637 self.authorize(session, &["sync:write", "sync:*", "*"])
638 .await?;
639 if self.sync_local_only {
640 return Ok(SyncSummary {
641 pushed: 0,
642 pulled: 0,
643 conflicts: 0,
644 duration_ms: 0,
645 });
646 }
647 let round = self.sync.sync_now().await?;
648 self.metrics.sync_pushed(
649 &session.workspace_id.to_string(),
650 round.push.deltas_sent as u64,
651 );
652 self.metrics.sync_pulled(
653 &session.workspace_id.to_string(),
654 round.pull.deltas_received as u64,
655 );
656 self.emit(ClawEvent::SyncCompleted {
657 pushed: round.push.deltas_sent,
658 pulled: round.pull.deltas_received,
659 })
660 .await;
661 Ok(SyncSummary {
662 pushed: round.push.deltas_sent,
663 pulled: round.pull.deltas_received,
664 conflicts: round.pull.ops_skipped,
665 duration_ms: round.duration_ms,
666 })
667 }
668
669 #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
671 pub async fn reflect(&self, session: &ClawDBSession) -> ClawDBResult<ReflectSummary> {
672 self.authorize(session, &["reflect:run", "reflect:write", "reflect:*", "*"])
673 .await?;
674 let Some(reflect) = &self.reflect else {
675 return Ok(ReflectSummary::skipped());
676 };
677 let job = reflect
678 .trigger_job("full", &session.workspace_id.to_string(), false)
679 .await?;
680 self.emit(ClawEvent::ReflectCycleRun { facts_extracted: 0 })
681 .await;
682 Ok(ReflectSummary {
683 job_id: Some(job.job_id),
684 status: job.status,
685 message: job.message,
686 skipped: false,
687 })
688 }
689
690 #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
692 pub async fn transaction<'a>(
693 &'a self,
694 session: &ClawDBSession,
695 ) -> ClawDBResult<ClawTransaction<'a>> {
696 self.authorize(session, &["memory:write", "memory:*", "*"])
697 .await?;
698 Ok(ClawTransaction {
699 inner: self.core.begin_transaction().await?,
700 vector: self.vector.clone(),
701 workspace_id: session.workspace_id.to_string(),
702 pending_vector_upserts: Vec::new(),
703 })
704 }
705
706 #[tracing::instrument(skip(self, token))]
708 pub async fn validate_session(&self, token: &str) -> ClawDBResult<ClawDBSession> {
709 let session = self.guard.sessions().validate_session(token).await?;
710 Ok(ClawDBSession {
711 id: session.id,
712 agent_id: session.agent_id,
713 workspace_id: session.workspace_id,
714 role: session.role,
715 scopes: session.scopes,
716 token: session.token,
717 expires_at: session.expires_at,
718 })
719 }
720
721 #[tracing::instrument(skip(self))]
723 pub async fn revoke_session(&self, session_id: Uuid) -> ClawDBResult<()> {
724 self.guard
725 .sessions()
726 .revoke_session(session_id)
727 .await?;
728 Ok(())
729 }
730
731 #[tracing::instrument(skip(self))]
733 pub async fn active_session_count(&self) -> ClawDBResult<u64> {
734 let count: i64 = sqlx::query_scalar(
735 "SELECT COUNT(*) FROM sessions WHERE revoked = 0 AND expires_at > CURRENT_TIMESTAMP",
736 )
737 .fetch_one(self.guard.pool())
738 .await
739 .map_err(|error| ClawDBError::ComponentInit("guard", error.to_string()))?;
740 Ok(count.max(0) as u64)
741 }
742
743 #[tracing::instrument(skip(self))]
745 pub async fn health(&self) -> ClawDBResult<HealthStatus> {
746 let mut components = std::collections::HashMap::new();
747
748 components.insert("core".to_string(), self.core.stats().await.is_ok());
749 components.insert(
750 "vector".to_string(),
751 if let Some(vector) = &self.vector {
752 let _ = vector.stats().await;
753 true
754 } else {
755 true
756 },
757 );
758 components.insert("branch".to_string(), true);
759 components.insert(
760 "sync".to_string(),
761 if self.sync_local_only {
762 true
763 } else {
764 let status = self.sync.status();
765 status.connected || status.last_error.is_none()
766 },
767 );
768 components.insert("guard".to_string(), true);
769 components.insert(
770 "reflect".to_string(),
771 if let Some(reflect) = &self.reflect {
772 let _ = reflect;
775 true
776 } else {
777 true
778 },
779 );
780
781 let ok = components.values().all(|healthy| *healthy);
782 Ok(HealthStatus { ok, components })
783 }
784
785 #[tracing::instrument(skip(self))]
787 pub async fn close(&self) -> ClawDBResult<()> {
788 self.shutdown.cancel();
789 self.branch.shutdown().await?;
790 self.sync.close().await?;
791 Ok(())
792 }
793
794 pub async fn shutdown(&self) -> ClawDBResult<()> {
796 self.close().await
797 }
798
799 async fn authorize(
800 &self,
801 session: &ClawDBSession,
802 accepted_scopes: &[&str],
803 ) -> ClawDBResult<()> {
804 self.guard
805 .sessions()
806 .validate_session(&session.token)
807 .await
808 .map_err(map_guard_session_error)?;
809 if accepted_scopes.iter().any(|required| {
810 session
811 .scopes
812 .iter()
813 .any(|granted| scope_matches(granted, required))
814 }) {
815 return Ok(());
816 }
817 self.metrics.session_denied.inc();
818 self.emit(ClawEvent::PolicyDenied {
819 agent_id: session.agent_id,
820 resource: accepted_scopes
821 .first()
822 .copied()
823 .unwrap_or("unknown")
824 .to_string(),
825 reason: "required scope missing".to_string(),
826 })
827 .await;
828 Err(ClawDBError::PermissionDenied(
829 "required scope missing".to_string(),
830 ))
831 }
832
833 async fn emit(&self, event: ClawEvent) {
834 let manager = self.plugins.clone();
835 let manager = manager.lock().await;
836 manager.emit(event);
837 }
838}
839
840impl<'a> ClawTransaction<'a> {
841 pub async fn remember(&mut self, content: &str) -> ClawDBResult<Uuid> {
843 self.remember_typed(content, "semantic", &[], serde_json::Value::Null)
844 .await
845 }
846
847 pub async fn remember_typed(
849 &mut self,
850 content: &str,
851 memory_type: &str,
852 tags: &[String],
853 metadata: serde_json::Value,
854 ) -> ClawDBResult<Uuid> {
855 let record = claw_core::MemoryRecord::new(
856 content,
857 parse_memory_type(memory_type),
858 tags.to_vec(),
859 None,
860 );
861 let id = self.inner.insert_memory(&record).await?;
862 self.pending_vector_upserts.push((
863 content.to_string(),
864 json!({
865 "memory_id": id,
866 "memory_type": record.memory_type.as_str(),
867 "tags": record.tags,
868 "metadata": metadata,
869 }),
870 ));
871 Ok(id)
872 }
873
874 pub async fn commit(mut self) -> ClawDBResult<()> {
876 self.inner.commit().await?;
877 if let Some(vector) = &self.vector {
878 for (content, metadata) in std::mem::take(&mut self.pending_vector_upserts) {
879 if let Err(error) = vector
880 .upsert_in_workspace(&self.workspace_id, "memories", &content, metadata)
881 .await
882 {
883 tracing::warn!(error = %error, "vector upsert failed after transaction commit");
884 }
885 }
886 }
887 Ok(())
888 }
889
890 pub async fn rollback(self) -> ClawDBResult<()> {
892 self.inner.rollback().await?;
893 Ok(())
894 }
895}
896
897pub type ClawDBEngine = ClawDB;
899
900async fn ensure_vector_collection(
901 vector: &claw_vector::VectorEngine,
902 workspace_id: &str,
903) -> ClawDBResult<()> {
904 let existing = vector.list_collections_in_workspace(workspace_id).await?;
905 if existing
906 .iter()
907 .any(|collection| collection.name == "memories")
908 {
909 return Ok(());
910 }
911 vector
912 .create_collection_in_workspace(
913 workspace_id,
914 "memories",
915 vector.config.default_dimensions,
916 claw_vector::DistanceMetric::Cosine,
917 )
918 .await
919 .context("failed to create default memories collection")
920 .map_err(|error| ClawDBError::ComponentInit("vector", error.to_string()))?;
921 Ok(())
922}
923
924fn parse_memory_type(value: &str) -> claw_core::MemoryType {
925 match value.trim().to_ascii_lowercase().as_str() {
926 "semantic" | "context" | "message" => claw_core::MemoryType::Semantic,
927 "episodic" => claw_core::MemoryType::Episodic,
928 "working" => claw_core::MemoryType::Working,
929 "procedural" => claw_core::MemoryType::Procedural,
930 _ => claw_core::MemoryType::Semantic,
931 }
932}
933
934fn metadata_matches(metadata: &serde_json::Value, filter: &serde_json::Value) -> bool {
935 match filter {
936 serde_json::Value::Object(expected) => expected
937 .iter()
938 .all(|(key, value)| metadata.get(key) == Some(value)),
939 _ => true,
940 }
941}
942
943fn memory_record_matches(record: &MemoryRecord, filter: &serde_json::Value) -> bool {
944 let tags = serde_json::Value::Array(
945 record
946 .tags
947 .iter()
948 .cloned()
949 .map(serde_json::Value::String)
950 .collect(),
951 );
952 let view = json!({
953 "id": record.id.to_string(),
954 "content": record.content.clone(),
955 "memory_type": record.memory_type.as_str(),
956 "tags": tags,
957 });
958 metadata_matches(&view, filter)
959}
960
961fn search_result_to_hit(result: claw_vector::SearchResult) -> ClawDBResult<SearchHit> {
962 let memory_type = result
963 .metadata
964 .get("memory_type")
965 .and_then(|value| value.as_str())
966 .unwrap_or("semantic")
967 .to_string();
968 let tags = result
969 .metadata
970 .get("tags")
971 .and_then(|value| value.as_array())
972 .map(|values| {
973 values
974 .iter()
975 .filter_map(|value| value.as_str().map(ToOwned::to_owned))
976 .collect()
977 })
978 .unwrap_or_default();
979 Ok(SearchHit {
980 id: result.id,
981 score: result.score,
982 content: result.text.unwrap_or_default(),
983 memory_type,
984 tags,
985 metadata: result.metadata,
986 })
987}
988
989fn scope_matches(granted: &str, required: &str) -> bool {
990 granted == "*"
991 || granted == required
992 || granted
993 .strip_suffix(":*")
994 .is_some_and(|prefix| required.starts_with(&format!("{prefix}:")))
995}
996
997fn map_guard_session_error(error: GuardError) -> ClawDBError {
998 match error {
999 GuardError::SessionExpired | GuardError::SessionRevoked | GuardError::InvalidToken => {
1000 ClawDBError::SessionInvalid
1001 }
1002 other => ClawDBError::Guard(other),
1003 }
1004}