Skip to main content

clawdb/
engine.rs

1//! Best-fit `clawdb` wrapper over the currently published component crates.
2
3use std::{path::Path, sync::Arc, time::Instant};
4
5use anyhow::Context;
6use claw_guard::error::GuardError;
7use secrecy::SecretString;
8use serde::{Deserialize, Serialize};
9use serde_json::json;
10use sqlx::Executor;
11use tokio::sync::Mutex;
12use tokio_util::sync::CancellationToken;
13use uuid::Uuid;
14
15use crate::{
16    error::{ClawDBError, ClawDBResult},
17    plugins::{events::ClawEvent, manager::PluginManager},
18    telemetry::{Metrics, PrometheusHandle},
19    types::{
20        BranchDiff, ClawTransaction, HealthStatus, MemoryRecord, MergeResult, ReflectSummary,
21        RememberResult, SearchHit, SyncSummary,
22    },
23};
24
25pub use crate::config::ClawDBConfig;
26
27/// Public session type returned by the wrapper.
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct ClawDBSession {
30    /// Session identifier.
31    pub id: Uuid,
32    /// Agent identifier.
33    pub agent_id: Uuid,
34    /// Workspace identifier.
35    pub workspace_id: Uuid,
36    /// Session role.
37    pub role: String,
38    /// Granted scopes.
39    pub scopes: Vec<String>,
40    /// Guard token.
41    pub token: String,
42    /// Expiry timestamp.
43    pub expires_at: chrono::DateTime<chrono::Utc>,
44}
45
46/// Unified database wrapper built from the current component crates.
47pub struct ClawDB {
48    /// User configuration.
49    pub config: ClawDBConfig,
50    core: Arc<claw_core::ClawEngine>,
51    vector: Option<Arc<claw_vector::VectorEngine>>,
52    branch: Arc<claw_branch::BranchEngine>,
53    sync: Arc<claw_sync::SyncEngine>,
54    guard: Arc<claw_guard::Guard>,
55    reflect: Option<Arc<claw_reflect_client::ReflectClient>>,
56    shutdown: CancellationToken,
57    metrics: Arc<Metrics>,
58    plugins: Arc<Mutex<PluginManager>>,
59    started_at: Instant,
60    sync_local_only: bool,
61}
62
63impl ClawDB {
64    /// Creates a new wrapper and initializes all enabled components.
65    pub async fn new(config: ClawDBConfig) -> ClawDBResult<Self> {
66        crate::telemetry::init_telemetry(&config.telemetry)?;
67
68        let core_config = claw_core::ClawConfig::builder()
69            .db_path(config.core.db_path.clone())
70            .max_connections(config.core.max_connections)
71            .wal_enabled(config.core.wal_enabled)
72            .cache_size_mb(config.core.cache_size_mb)
73            .build()
74            .map_err(ClawDBError::Core)?;
75        let core = Arc::new(claw_core::ClawEngine::open(core_config).await?);
76        core.migrate().await?;
77        core.pool()
78            .execute(
79                "CREATE TABLE IF NOT EXISTS memory_records (
80                    id TEXT PRIMARY KEY
81                )",
82            )
83            .await
84            .map_err(|error| ClawDBError::ComponentInit("core", error.to_string()))?;
85        core.pool()
86            .execute(
87                "CREATE TABLE IF NOT EXISTS tool_outputs (
88                    id TEXT PRIMARY KEY,
89                    session_id TEXT
90                )",
91            )
92            .await
93            .map_err(|error| ClawDBError::ComponentInit("core", error.to_string()))?;
94
95        let vector = if config.vector.enabled {
96            let vector_config = claw_vector::VectorConfig::builder()
97                .db_path(config.vector.db_path.clone())
98                .index_dir(config.vector.index_dir.clone())
99                .embedding_service_url(config.vector.embedding_service_url.clone())
100                .default_workspace_id(config.workspace_id.to_string())
101                .default_dimensions(config.vector.default_dimensions)
102                .build()
103                .map_err(ClawDBError::Vector)?;
104            let engine = Arc::new(
105                claw_vector::VectorEngine::new(vector_config)
106                    .await
107                    .map_err(|error| ClawDBError::ComponentInit("vector", error.to_string()))?,
108            );
109            ensure_vector_collection(&engine, &config.workspace_id.to_string()).await?;
110            Some(engine)
111        } else {
112            None
113        };
114
115        let branch_config = claw_branch::BranchConfig::builder()
116            .workspace_id(config.workspace_id)
117            .branches_dir(config.branch.branches_dir.clone())
118            .registry_db_path(config.branch.registry_db_path.clone())
119            .max_branches_per_workspace(config.branch.max_branches_per_workspace)
120            .gc_interval_secs(config.branch.gc_interval_secs)
121            .trunk_branch_name(config.branch.trunk_branch_name.clone())
122            .build()
123            .map_err(ClawDBError::Branch)?;
124        let branch = Arc::new(
125            claw_branch::BranchEngine::new(branch_config, &config.core.db_path)
126                .await
127                .map_err(|error| ClawDBError::ComponentInit("branch", error.to_string()))?,
128        );
129        branch.start_gc_scheduler().await?;
130
131        let sync_local_only = config.sync.hub_url.is_none();
132        let sync_config = claw_sync::SyncConfig {
133            workspace_id: config.workspace_id,
134            device_id: config.agent_id,
135            hub_endpoint: config
136                .sync
137                .hub_url
138                .clone()
139                .unwrap_or_else(|| "http://127.0.0.1:50051".to_string()),
140            data_dir: config.sync.data_dir.clone(),
141            db_path: config.sync.db_path.clone(),
142            tls_enabled: config.sync.tls_enabled,
143            connect_timeout_secs: config.sync.connect_timeout_secs,
144            request_timeout_secs: config.sync.request_timeout_secs,
145            sync_interval_secs: config.sync.sync_interval_secs,
146            heartbeat_interval_secs: config.sync.sync_interval_secs.max(1),
147            max_retries: 5,
148            retry_base_ms: 500,
149            max_delta_rows: config.sync.max_delta_rows,
150            max_chunk_bytes: config.sync.max_chunk_bytes,
151            max_pull_chunks: config.sync.max_pull_chunks,
152            max_push_inflight: config.sync.max_push_inflight,
153        };
154        let sync = Arc::new(
155            claw_sync::SyncEngine::new(sync_config, core.pool().clone())
156                .await
157                .map_err(|error| ClawDBError::ComponentInit("sync", error.to_string()))?,
158        );
159
160        let guard_config = claw_guard::GuardConfig {
161            db_path: config.guard.db_path.clone().into(),
162            jwt_secret: SecretString::new(config.guard.jwt_secret.clone().into_boxed_str()),
163            policy_dir: Some(config.guard.policy_dir.clone()),
164            sensitive_resources: config.guard.sensitive_resources.clone(),
165            audit_flush_interval_ms: config.guard.audit_flush_interval_ms,
166            audit_batch_size: config.guard.audit_batch_size,
167            business_hours_start_hour: 8,
168            business_hours_end_hour: 18,
169        };
170        let guard = Arc::new(
171            claw_guard::Guard::new(guard_config)
172                .await
173                .map_err(|error| ClawDBError::ComponentInit("guard", error.to_string()))?,
174        );
175
176        let reflect = match (&config.reflect.base_url, &config.reflect.api_key) {
177            (Some(base_url), Some(api_key)) => Some(Arc::new(
178                claw_reflect_client::ReflectClient::new(base_url.clone(), api_key.clone())
179                    .map_err(|error| ClawDBError::ComponentInit("reflect", error.to_string()))?,
180            )),
181            _ => {
182                tracing::warn!("reflect client disabled because base URL or API key is missing");
183                None
184            }
185        };
186
187        let metrics = Metrics::new();
188        let (mut plugin_manager, mut plugin_rx) = PluginManager::new();
189        let _ = plugin_manager.load_from_dir(&config.plugins.plugins_dir);
190        let plugins = Arc::new(Mutex::new(plugin_manager));
191        let plugins_task = plugins.clone();
192        tokio::spawn(async move {
193            while let Ok(event) = plugin_rx.recv().await {
194                let mut manager = plugins_task.lock().await;
195                manager.dispatch(&event).await;
196            }
197        });
198
199        tracing::info!(
200            core = true,
201            vector = vector.is_some(),
202            branch = true,
203            sync = true,
204            reflect = reflect.is_some(),
205            "ClawDB components initialized"
206        );
207
208        Ok(Self {
209            config,
210            core,
211            vector,
212            branch,
213            sync,
214            guard,
215            reflect,
216            shutdown: CancellationToken::new(),
217            metrics,
218            plugins,
219            started_at: Instant::now(),
220            sync_local_only,
221        })
222    }
223
224    /// Compatibility constructor used by binaries.
225    pub async fn start_with(config: ClawDBConfig) -> ClawDBResult<Self> {
226        Self::new(config).await
227    }
228
229    /// Opens the default data directory using environment-backed configuration.
230    pub async fn open_default() -> ClawDBResult<Self> {
231        Self::new(ClawDBConfig::from_env()?).await
232    }
233
234    /// Opens a specific data directory.
235    pub async fn open(data_dir: &Path) -> ClawDBResult<Self> {
236        let mut config = ClawDBConfig::load_or_default(data_dir)?;
237        config.data_dir = data_dir.to_path_buf();
238        Self::new(config).await
239    }
240
241    /// Returns the current uptime in seconds.
242    pub fn uptime_secs(&self) -> u64 {
243        self.started_at.elapsed().as_secs()
244    }
245
246    /// Returns the underlying core engine.
247    pub fn core_engine(&self) -> &Arc<claw_core::ClawEngine> {
248        &self.core
249    }
250
251    /// Returns the underlying branch engine.
252    pub fn branch_engine(&self) -> &Arc<claw_branch::BranchEngine> {
253        &self.branch
254    }
255
256    /// Returns the underlying sync engine.
257    pub fn sync_engine(&self) -> &Arc<claw_sync::SyncEngine> {
258        &self.sync
259    }
260
261    /// Returns the underlying guard engine.
262    pub fn guard_engine(&self) -> &Arc<claw_guard::Guard> {
263        &self.guard
264    }
265
266    /// Returns the optional vector engine.
267    pub fn vector_engine(&self) -> Option<&Arc<claw_vector::VectorEngine>> {
268        self.vector.as_ref()
269    }
270
271    /// Returns the optional reflect client.
272    pub fn reflect_client(&self) -> Option<&Arc<claw_reflect_client::ReflectClient>> {
273        self.reflect.as_ref()
274    }
275
276    /// Returns a Prometheus handle for scraping metrics.
277    pub fn metrics_handle(&self) -> PrometheusHandle {
278        self.metrics.handle()
279    }
280
281    /// Creates a session with the default one-hour TTL.
282    #[tracing::instrument(skip(self, scopes), fields(workspace_id = %self.config.workspace_id, agent_id = %agent_id))]
283    pub async fn session(
284        &self,
285        agent_id: Uuid,
286        role: &str,
287        scopes: Vec<String>,
288    ) -> ClawDBResult<ClawDBSession> {
289        self.session_with_ttl(agent_id, role, scopes, 3600).await
290    }
291
292    /// Creates a session with a custom TTL.
293    #[tracing::instrument(skip(self, scopes), fields(workspace_id = %self.config.workspace_id, agent_id = %agent_id))]
294    pub async fn session_with_ttl(
295        &self,
296        agent_id: Uuid,
297        role: &str,
298        scopes: Vec<String>,
299        ttl_secs: i64,
300    ) -> ClawDBResult<ClawDBSession> {
301        let session = self
302            .guard
303            .sessions()
304            .create_session(
305                agent_id,
306                self.config.workspace_id,
307                role,
308                scopes.clone(),
309                ttl_secs.max(1) as u64,
310            )
311            .await?;
312        self.metrics.session_created.inc();
313        self.emit(ClawEvent::SessionCreated {
314            session_id: session.id,
315            agent_id,
316        })
317        .await;
318        Ok(ClawDBSession {
319            id: session.id,
320            agent_id: session.agent_id,
321            workspace_id: session.workspace_id,
322            role: session.role,
323            scopes,
324            token: session.token,
325            expires_at: session.expires_at,
326        })
327    }
328
329    /// Stores a semantic memory with default tags and type mapping.
330    #[tracing::instrument(skip(self, session, content), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
331    pub async fn remember(
332        &self,
333        session: &ClawDBSession,
334        content: &str,
335    ) -> ClawDBResult<RememberResult> {
336        self.remember_typed(session, content, "semantic", &[], serde_json::Value::Null)
337            .await
338    }
339
340    /// Stores a memory using the current component-crate capabilities.
341    #[tracing::instrument(skip(self, session, content, tags, metadata), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
342    pub async fn remember_typed(
343        &self,
344        session: &ClawDBSession,
345        content: &str,
346        memory_type: &str,
347        tags: &[String],
348        metadata: serde_json::Value,
349    ) -> ClawDBResult<RememberResult> {
350        self.authorize(session, &["memory:write", "memory:*", "*"])
351            .await?;
352
353        let record = claw_core::MemoryRecord::new(
354            content,
355            parse_memory_type(memory_type),
356            tags.to_vec(),
357            None,
358        );
359        let memory_id = self.core.insert_memory(&record).await?;
360
361        let mut indexed = false;
362        if let Some(vector) = &self.vector {
363            let vector_metadata = json!({
364                "memory_id": memory_id,
365                "memory_type": record.memory_type.as_str(),
366                "tags": record.tags,
367                "metadata": metadata,
368            });
369            match vector
370                .upsert_in_workspace(
371                    &session.workspace_id.to_string(),
372                    "memories",
373                    content,
374                    vector_metadata,
375                )
376                .await
377            {
378                Ok(_) => indexed = true,
379                Err(error) => {
380                    tracing::warn!(error = %error, "vector upsert failed after core write")
381                }
382            }
383        }
384
385        self.metrics
386            .remember_total(&session.workspace_id.to_string(), "ok");
387        self.emit(ClawEvent::MemoryWritten {
388            memory_id: memory_id.to_string(),
389            workspace_id: session.workspace_id,
390        })
391        .await;
392
393        Ok(RememberResult { memory_id, indexed })
394    }
395
396    /// Searches memory using semantic search when available, else SQLite FTS.
397    #[tracing::instrument(skip(self, session, query), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
398    pub async fn search(
399        &self,
400        session: &ClawDBSession,
401        query: &str,
402    ) -> ClawDBResult<Vec<SearchHit>> {
403        self.search_with_options(session, query, 10, self.vector.is_some(), None)
404            .await
405    }
406
407    /// Searches memory with current component-crate semantics.
408    #[tracing::instrument(skip(self, session, query, filter), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
409    pub async fn search_with_options(
410        &self,
411        session: &ClawDBSession,
412        query: &str,
413        top_k: usize,
414        semantic: bool,
415        filter: Option<serde_json::Value>,
416    ) -> ClawDBResult<Vec<SearchHit>> {
417        self.authorize(session, &["memory:read", "memory:search", "memory:*", "*"])
418            .await?;
419
420        let workspace_id = session.workspace_id.to_string();
421        let use_semantic = semantic && self.vector.is_some();
422        let hits = if use_semantic {
423            let vector = self
424                .vector
425                .as_ref()
426                .ok_or(ClawDBError::ComponentDisabled("vector"))?;
427            let mut response = vector
428                .search_text_in_workspace(
429                    &workspace_id,
430                    "memories",
431                    query,
432                    top_k.saturating_mul(3).max(top_k),
433                )
434                .await?;
435            if let Some(filter_value) = filter {
436                response
437                    .results
438                    .retain(|result| metadata_matches(&result.metadata, &filter_value));
439            }
440            response
441                .results
442                .into_iter()
443                .take(top_k)
444                .map(search_result_to_hit)
445                .collect::<ClawDBResult<Vec<_>>>()?
446        } else {
447            self.core
448                .fts_search(query)
449                .await?
450                .into_iter()
451                .filter(|record| {
452                    filter
453                        .as_ref()
454                        .map_or(true, |value| memory_record_matches(record, value))
455                })
456                .take(top_k)
457                .map(|record| SearchHit {
458                    id: record.id,
459                    score: 1.0,
460                    content: record.content,
461                    memory_type: record.memory_type.as_str().to_string(),
462                    tags: record.tags,
463                    metadata: serde_json::Value::Null,
464                })
465                .collect()
466        };
467
468        let mode = if use_semantic { "semantic" } else { "fts" };
469        self.metrics.search_total(&workspace_id, mode);
470        self.metrics.search_hits(&workspace_id, hits.len() as f64);
471        self.emit(ClawEvent::SearchExecuted {
472            query: query.to_string(),
473            hits: hits.len(),
474        })
475        .await;
476        Ok(hits)
477    }
478
479    /// Recalls specific memories from the core engine.
480    #[tracing::instrument(skip(self, session, memory_ids), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
481    pub async fn recall(
482        &self,
483        session: &ClawDBSession,
484        memory_ids: &[Uuid],
485    ) -> ClawDBResult<Vec<MemoryRecord>> {
486        self.authorize(session, &["memory:read", "memory:*", "*"])
487            .await?;
488        let mut records = Vec::with_capacity(memory_ids.len());
489        for id in memory_ids {
490            records.push(self.core.get_memory(*id).await?);
491        }
492        Ok(records)
493    }
494
495    /// Lists memories, optionally filtering by memory type.
496    #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
497    pub async fn list_memories(
498        &self,
499        session: &ClawDBSession,
500        memory_type: Option<&str>,
501    ) -> ClawDBResult<Vec<MemoryRecord>> {
502        self.authorize(session, &["memory:read", "memory:*", "*"])
503            .await?;
504        let type_filter = memory_type.map(parse_memory_type);
505        Ok(self.core.list_memories(type_filter).await?)
506    }
507
508    /// Deletes a memory by id.
509    #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id, memory_id = %memory_id))]
510    pub async fn delete_memory(&self, session: &ClawDBSession, memory_id: Uuid) -> ClawDBResult<()> {
511        self.authorize(session, &["memory:write", "memory:*", "*"])
512            .await?;
513        self.core.delete_memory(memory_id).await?;
514        Ok(())
515    }
516
517    /// Forks a new branch from trunk.
518    #[tracing::instrument(skip(self, session, name), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
519    pub async fn branch(&self, session: &ClawDBSession, name: &str) -> ClawDBResult<Uuid> {
520        self.authorize(session, &["branch:write", "branch:*", "*"])
521            .await?;
522        let branch = self.branch.fork_trunk(name).await?;
523        self.metrics
524            .branch_ops(&session.workspace_id.to_string(), "fork");
525        self.emit(ClawEvent::BranchCreated {
526            branch_id: branch.id,
527            name: branch.name,
528        })
529        .await;
530        Ok(branch.id)
531    }
532
533    /// Forks a new branch from an explicit parent branch.
534    #[tracing::instrument(skip(self, session, name), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id, parent = %parent))]
535    pub async fn fork_branch(
536        &self,
537        session: &ClawDBSession,
538        parent: Uuid,
539        name: &str,
540    ) -> ClawDBResult<Uuid> {
541        self.authorize(session, &["branch:write", "branch:*", "*"])
542            .await?;
543        let branch = self.branch.fork(parent, name, None).await?;
544        self.metrics
545            .branch_ops(&session.workspace_id.to_string(), "fork");
546        self.emit(ClawEvent::BranchCreated {
547            branch_id: branch.id,
548            name: branch.name,
549        })
550        .await;
551        Ok(branch.id)
552    }
553
554    /// Returns a branch by identifier.
555    #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id, branch_id = %branch_id))]
556    pub async fn get_branch(
557        &self,
558        session: &ClawDBSession,
559        branch_id: Uuid,
560    ) -> ClawDBResult<claw_branch::Branch> {
561        self.authorize(session, &["branch:read", "branch:*", "*"])
562            .await?;
563        Ok(self.branch.get(branch_id).await?)
564    }
565
566    /// Lists all branches in the current workspace.
567    #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
568    pub async fn list_branches(
569        &self,
570        session: &ClawDBSession,
571    ) -> ClawDBResult<Vec<claw_branch::Branch>> {
572        self.authorize(session, &["branch:read", "branch:*", "*"])
573            .await?;
574        Ok(self.branch.list(None).await?)
575    }
576
577    /// Merges a source branch into a target branch.
578    #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
579    pub async fn merge(
580        &self,
581        session: &ClawDBSession,
582        source: Uuid,
583        target: Uuid,
584    ) -> ClawDBResult<MergeResult> {
585        self.merge_with_strategy(session, source, target, claw_branch::MergeStrategy::Theirs)
586            .await
587    }
588
589    /// Merges a source branch into a target branch using an explicit strategy.
590    #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id, source = %source, target = %target))]
591    pub async fn merge_with_strategy(
592        &self,
593        session: &ClawDBSession,
594        source: Uuid,
595        target: Uuid,
596        strategy: claw_branch::MergeStrategy,
597    ) -> ClawDBResult<MergeResult> {
598        self.authorize(session, &["branch:write", "branch:*", "*"])
599            .await?;
600        let result = self.branch.merge(source, target, strategy).await?;
601        self.metrics
602            .branch_ops(&session.workspace_id.to_string(), "merge");
603        self.emit(ClawEvent::BranchMerged {
604            source,
605            target,
606            merged: result.applied,
607        })
608        .await;
609        Ok(result)
610    }
611
612    /// Diffs two branches.
613    #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
614    pub async fn diff(
615        &self,
616        session: &ClawDBSession,
617        source: Uuid,
618        target: Uuid,
619    ) -> ClawDBResult<BranchDiff> {
620        self.authorize(session, &["branch:read", "branch:*", "*"])
621            .await?;
622        Ok(self.branch.diff(source, target).await?)
623    }
624
625    /// Discards a branch.
626    #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id, branch_id = %branch_id))]
627    pub async fn discard_branch(&self, session: &ClawDBSession, branch_id: Uuid) -> ClawDBResult<()> {
628        self.authorize(session, &["branch:write", "branch:*", "*"])
629            .await?;
630        self.branch.discard(branch_id).await?;
631        Ok(())
632    }
633
634    /// Runs a sync round or returns a no-op summary in local-only mode.
635    #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
636    pub async fn sync(&self, session: &ClawDBSession) -> ClawDBResult<SyncSummary> {
637        self.authorize(session, &["sync:write", "sync:*", "*"])
638            .await?;
639        if self.sync_local_only {
640            return Ok(SyncSummary {
641                pushed: 0,
642                pulled: 0,
643                conflicts: 0,
644                duration_ms: 0,
645            });
646        }
647        let round = self.sync.sync_now().await?;
648        self.metrics.sync_pushed(
649            &session.workspace_id.to_string(),
650            round.push.deltas_sent as u64,
651        );
652        self.metrics.sync_pulled(
653            &session.workspace_id.to_string(),
654            round.pull.deltas_received as u64,
655        );
656        self.emit(ClawEvent::SyncCompleted {
657            pushed: round.push.deltas_sent,
658            pulled: round.pull.deltas_received,
659        })
660        .await;
661        Ok(SyncSummary {
662            pushed: round.push.deltas_sent,
663            pulled: round.pull.deltas_received,
664            conflicts: round.pull.ops_skipped,
665            duration_ms: round.duration_ms,
666        })
667    }
668
669    /// Triggers a reflect job when the reflect client is configured.
670    #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
671    pub async fn reflect(&self, session: &ClawDBSession) -> ClawDBResult<ReflectSummary> {
672        self.authorize(session, &["reflect:run", "reflect:write", "reflect:*", "*"])
673            .await?;
674        let Some(reflect) = &self.reflect else {
675            return Ok(ReflectSummary::skipped());
676        };
677        let job = reflect
678            .trigger_job("full", &session.workspace_id.to_string(), false)
679            .await?;
680        self.emit(ClawEvent::ReflectCycleRun { facts_extracted: 0 })
681            .await;
682        Ok(ReflectSummary {
683            job_id: Some(job.job_id),
684            status: job.status,
685            message: job.message,
686            skipped: false,
687        })
688    }
689
690    /// Starts a transaction over the core engine and stages vector work for commit.
691    #[tracing::instrument(skip(self, session), fields(workspace_id = %session.workspace_id, agent_id = %session.agent_id))]
692    pub async fn transaction<'a>(
693        &'a self,
694        session: &ClawDBSession,
695    ) -> ClawDBResult<ClawTransaction<'a>> {
696        self.authorize(session, &["memory:write", "memory:*", "*"])
697            .await?;
698        Ok(ClawTransaction {
699            inner: self.core.begin_transaction().await?,
700            vector: self.vector.clone(),
701            workspace_id: session.workspace_id.to_string(),
702            pending_vector_upserts: Vec::new(),
703        })
704    }
705
706    /// Validates a session token.
707    #[tracing::instrument(skip(self, token))]
708    pub async fn validate_session(&self, token: &str) -> ClawDBResult<ClawDBSession> {
709        let session = self.guard.sessions().validate_session(token).await?;
710        Ok(ClawDBSession {
711            id: session.id,
712            agent_id: session.agent_id,
713            workspace_id: session.workspace_id,
714            role: session.role,
715            scopes: session.scopes,
716            token: session.token,
717            expires_at: session.expires_at,
718        })
719    }
720
721    /// Revokes a session by identifier.
722    #[tracing::instrument(skip(self))]
723    pub async fn revoke_session(&self, session_id: Uuid) -> ClawDBResult<()> {
724        self.guard
725            .sessions()
726            .revoke_session(session_id)
727            .await?;
728        Ok(())
729    }
730
731    /// Returns the number of active, non-revoked sessions recorded by guard.
732    #[tracing::instrument(skip(self))]
733    pub async fn active_session_count(&self) -> ClawDBResult<u64> {
734        let count: i64 = sqlx::query_scalar(
735            "SELECT COUNT(*) FROM sessions WHERE revoked = 0 AND expires_at > CURRENT_TIMESTAMP",
736        )
737        .fetch_one(self.guard.pool())
738        .await
739        .map_err(|error| ClawDBError::ComponentInit("guard", error.to_string()))?;
740        Ok(count.max(0) as u64)
741    }
742
743    /// Returns aggregate component health booleans.
744    #[tracing::instrument(skip(self))]
745    pub async fn health(&self) -> ClawDBResult<HealthStatus> {
746        let mut components = std::collections::HashMap::new();
747
748        components.insert("core".to_string(), self.core.stats().await.is_ok());
749        components.insert(
750            "vector".to_string(),
751            if let Some(vector) = &self.vector {
752                let _ = vector.stats().await;
753                true
754            } else {
755                true
756            },
757        );
758        components.insert("branch".to_string(), true);
759        components.insert(
760            "sync".to_string(),
761            if self.sync_local_only {
762                true
763            } else {
764                let status = self.sync.status();
765                status.connected || status.last_error.is_none()
766            },
767        );
768        components.insert("guard".to_string(), true);
769        components.insert(
770            "reflect".to_string(),
771            if let Some(reflect) = &self.reflect {
772                // The reflect client currently has no cheap health probe API.
773                // Treat "configured" as healthy and surface runtime failures from reflect calls.
774                let _ = reflect;
775                true
776            } else {
777                true
778            },
779        );
780
781        let ok = components.values().all(|healthy| *healthy);
782        Ok(HealthStatus { ok, components })
783    }
784
785    /// Closes background tasks owned by the wrapper.
786    #[tracing::instrument(skip(self))]
787    pub async fn close(&self) -> ClawDBResult<()> {
788        self.shutdown.cancel();
789        self.branch.shutdown().await?;
790        self.sync.close().await?;
791        Ok(())
792    }
793
794    /// Compatibility alias for `close`.
795    pub async fn shutdown(&self) -> ClawDBResult<()> {
796        self.close().await
797    }
798
799    async fn authorize(
800        &self,
801        session: &ClawDBSession,
802        accepted_scopes: &[&str],
803    ) -> ClawDBResult<()> {
804        self.guard
805            .sessions()
806            .validate_session(&session.token)
807            .await
808            .map_err(map_guard_session_error)?;
809        if accepted_scopes.iter().any(|required| {
810            session
811                .scopes
812                .iter()
813                .any(|granted| scope_matches(granted, required))
814        }) {
815            return Ok(());
816        }
817        self.metrics.session_denied.inc();
818        self.emit(ClawEvent::PolicyDenied {
819            agent_id: session.agent_id,
820            resource: accepted_scopes
821                .first()
822                .copied()
823                .unwrap_or("unknown")
824                .to_string(),
825            reason: "required scope missing".to_string(),
826        })
827        .await;
828        Err(ClawDBError::PermissionDenied(
829            "required scope missing".to_string(),
830        ))
831    }
832
833    async fn emit(&self, event: ClawEvent) {
834        let manager = self.plugins.clone();
835        let manager = manager.lock().await;
836        manager.emit(event);
837    }
838}
839
840impl<'a> ClawTransaction<'a> {
841    /// Stages a default semantic memory inside the transaction.
842    pub async fn remember(&mut self, content: &str) -> ClawDBResult<Uuid> {
843        self.remember_typed(content, "semantic", &[], serde_json::Value::Null)
844            .await
845    }
846
847    /// Stages a typed memory inside the transaction.
848    pub async fn remember_typed(
849        &mut self,
850        content: &str,
851        memory_type: &str,
852        tags: &[String],
853        metadata: serde_json::Value,
854    ) -> ClawDBResult<Uuid> {
855        let record = claw_core::MemoryRecord::new(
856            content,
857            parse_memory_type(memory_type),
858            tags.to_vec(),
859            None,
860        );
861        let id = self.inner.insert_memory(&record).await?;
862        self.pending_vector_upserts.push((
863            content.to_string(),
864            json!({
865                "memory_id": id,
866                "memory_type": record.memory_type.as_str(),
867                "tags": record.tags,
868                "metadata": metadata,
869            }),
870        ));
871        Ok(id)
872    }
873
874    /// Commits the transaction and flushes staged vector writes best-effort.
875    pub async fn commit(mut self) -> ClawDBResult<()> {
876        self.inner.commit().await?;
877        if let Some(vector) = &self.vector {
878            for (content, metadata) in std::mem::take(&mut self.pending_vector_upserts) {
879                if let Err(error) = vector
880                    .upsert_in_workspace(&self.workspace_id, "memories", &content, metadata)
881                    .await
882                {
883                    tracing::warn!(error = %error, "vector upsert failed after transaction commit");
884                }
885            }
886        }
887        Ok(())
888    }
889
890    /// Rolls the transaction back.
891    pub async fn rollback(self) -> ClawDBResult<()> {
892        self.inner.rollback().await?;
893        Ok(())
894    }
895}
896
897/// Compatibility alias: exposes [`ClawDB`] under the name `ClawDBEngine`.
898pub type ClawDBEngine = ClawDB;
899
900async fn ensure_vector_collection(
901    vector: &claw_vector::VectorEngine,
902    workspace_id: &str,
903) -> ClawDBResult<()> {
904    let existing = vector.list_collections_in_workspace(workspace_id).await?;
905    if existing
906        .iter()
907        .any(|collection| collection.name == "memories")
908    {
909        return Ok(());
910    }
911    vector
912        .create_collection_in_workspace(
913            workspace_id,
914            "memories",
915            vector.config.default_dimensions,
916            claw_vector::DistanceMetric::Cosine,
917        )
918        .await
919        .context("failed to create default memories collection")
920        .map_err(|error| ClawDBError::ComponentInit("vector", error.to_string()))?;
921    Ok(())
922}
923
924fn parse_memory_type(value: &str) -> claw_core::MemoryType {
925    match value.trim().to_ascii_lowercase().as_str() {
926        "semantic" | "context" | "message" => claw_core::MemoryType::Semantic,
927        "episodic" => claw_core::MemoryType::Episodic,
928        "working" => claw_core::MemoryType::Working,
929        "procedural" => claw_core::MemoryType::Procedural,
930        _ => claw_core::MemoryType::Semantic,
931    }
932}
933
934fn metadata_matches(metadata: &serde_json::Value, filter: &serde_json::Value) -> bool {
935    match filter {
936        serde_json::Value::Object(expected) => expected
937            .iter()
938            .all(|(key, value)| metadata.get(key) == Some(value)),
939        _ => true,
940    }
941}
942
943fn memory_record_matches(record: &MemoryRecord, filter: &serde_json::Value) -> bool {
944    let tags = serde_json::Value::Array(
945        record
946            .tags
947            .iter()
948            .cloned()
949            .map(serde_json::Value::String)
950            .collect(),
951    );
952    let view = json!({
953        "id": record.id.to_string(),
954        "content": record.content.clone(),
955        "memory_type": record.memory_type.as_str(),
956        "tags": tags,
957    });
958    metadata_matches(&view, filter)
959}
960
961fn search_result_to_hit(result: claw_vector::SearchResult) -> ClawDBResult<SearchHit> {
962    let memory_type = result
963        .metadata
964        .get("memory_type")
965        .and_then(|value| value.as_str())
966        .unwrap_or("semantic")
967        .to_string();
968    let tags = result
969        .metadata
970        .get("tags")
971        .and_then(|value| value.as_array())
972        .map(|values| {
973            values
974                .iter()
975                .filter_map(|value| value.as_str().map(ToOwned::to_owned))
976                .collect()
977        })
978        .unwrap_or_default();
979    Ok(SearchHit {
980        id: result.id,
981        score: result.score,
982        content: result.text.unwrap_or_default(),
983        memory_type,
984        tags,
985        metadata: result.metadata,
986    })
987}
988
/// Decides whether a granted scope satisfies a required scope.
///
/// A grant matches when it is the global wildcard `*`, when it equals the
/// required scope exactly, or when it has the form `prefix:*` and the
/// required scope starts with `prefix:`.
fn scope_matches(granted: &str, required: &str) -> bool {
    if granted == "*" || granted == required {
        return true;
    }
    match granted.strip_suffix(":*") {
        // The remainder after the prefix must begin with the `:` separator,
        // so `memory:*` does not match `memoryx:read`.
        Some(namespace) => required
            .strip_prefix(namespace)
            .is_some_and(|rest| rest.starts_with(':')),
        None => false,
    }
}
996
997fn map_guard_session_error(error: GuardError) -> ClawDBError {
998    match error {
999        GuardError::SessionExpired | GuardError::SessionRevoked | GuardError::InvalidToken => {
1000            ClawDBError::SessionInvalid
1001        }
1002        other => ClawDBError::Guard(other),
1003    }
1004}