spec_ai_config/persistence/
mod.rs

1pub mod migrations;
2
3use anyhow::{Context, Result};
4use chrono::{DateTime, Utc};
5use directories::BaseDirs;
6use duckdb::{params, Connection};
7use serde_json::Value as JsonValue;
8use spec_ai_knowledge_graph::KnowledgeGraphStore;
9use std::path::{Path, PathBuf};
10use std::sync::{Arc, Mutex};
11
12use crate::types::{
13    EdgeType, GraphEdge, GraphNode, GraphPath, MemoryVector, Message, MessageRole, NodeType,
14    PolicyEntry, TraversalDirection,
15};
16
/// Handle to the DuckDB-backed persistence layer.
///
/// `Clone` is derived; a clone shares the same `Arc`-wrapped connection.
#[derive(Clone)]
pub struct Persistence {
    /// DuckDB connection, shared behind an `Arc<Mutex<_>>`.
    conn: Arc<Mutex<Connection>>,
    /// Identifier of this instance, exposed through `instance_id()`.
    instance_id: String,
    /// Knowledge-graph store that the graph methods on this type delegate to.
    graph_store: KnowledgeGraphStore,
}
23
24impl Persistence {
25    /// Create or open the database at the provided path and run migrations.
26    pub fn new<P: AsRef<Path>>(db_path: P) -> Result<Self> {
27        Self::with_instance_id(db_path, generate_instance_id())
28    }
29
30    /// Create with a specific instance_id
31    pub fn with_instance_id<P: AsRef<Path>>(db_path: P, instance_id: String) -> Result<Self> {
32        let db_path = expand_tilde(db_path.as_ref())?;
33        if let Some(dir) = db_path.parent() {
34            std::fs::create_dir_all(dir).context("creating DB directory")?;
35        }
36        let conn = Connection::open(&db_path).context("opening DuckDB")?;
37        migrations::run(&conn).context("running migrations")?;
38        let conn_arc = Arc::new(Mutex::new(conn));
39        let graph_store = KnowledgeGraphStore::new(conn_arc.clone(), instance_id.clone());
40        Ok(Self {
41            conn: conn_arc,
42            instance_id,
43            graph_store,
44        })
45    }
46
47    /// Get the instance ID for this persistence instance
48    pub fn instance_id(&self) -> &str {
49        &self.instance_id
50    }
51
    /// Borrow the underlying `KnowledgeGraphStore` directly.
    ///
    /// Provided for Phase 2+ consumer migration.
    pub fn graph_store(&self) -> &KnowledgeGraphStore {
        &self.graph_store
    }
56
57    /// Checkpoint the database to ensure all WAL data is written to the main database file.
58    /// Call this before shutdown to ensure clean database state.
59    pub fn checkpoint(&self) -> Result<()> {
60        let conn = self.conn();
61        conn.execute_batch("CHECKPOINT;")
62            .context("checkpointing database")
63    }
64
65    /// Creates or opens the default database at ~/.spec-ai/agent_data.duckdb
66    pub fn new_default() -> Result<Self> {
67        let base = BaseDirs::new().context("base directories not available")?;
68        let path = base.home_dir().join(".agent_cli").join("agent_data.duckdb");
69        Self::new(path)
70    }
71
72    /// Get access to the pooled database connection.
73    /// Returns a MutexGuard that provides exclusive access to the connection.
74    pub fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
75        self.conn
76            .lock()
77            .expect("database connection mutex poisoned")
78    }
79
80    // ---------- Messages ----------
81
82    pub fn insert_message(
83        &self,
84        session_id: &str,
85        role: MessageRole,
86        content: &str,
87    ) -> Result<i64> {
88        let conn = self.conn();
89        let mut stmt = conn.prepare(
90            "INSERT INTO messages (session_id, role, content) VALUES (?, ?, ?) RETURNING id",
91        )?;
92        let id: i64 = stmt.query_row(params![session_id, role.as_str(), content], |row| {
93            row.get(0)
94        })?;
95        Ok(id)
96    }
97
98    pub fn list_messages(&self, session_id: &str, limit: i64) -> Result<Vec<Message>> {
99        let conn = self.conn();
100        let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?")?;
101        let mut rows = stmt.query(params![session_id, limit])?;
102        let mut out = Vec::new();
103        while let Some(row) = rows.next()? {
104            let id: i64 = row.get(0)?;
105            let sid: String = row.get(1)?;
106            let role: String = row.get(2)?;
107            let content: String = row.get(3)?;
108            let created_at: String = row.get(4)?; // DuckDB returns TIMESTAMP as string
109            let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
110            out.push(Message {
111                id,
112                session_id: sid,
113                role: MessageRole::from_str(&role),
114                content,
115                created_at,
116            });
117        }
118        out.reverse();
119        Ok(out)
120    }
121
122    pub fn get_message(&self, message_id: i64) -> Result<Option<Message>> {
123        let conn = self.conn();
124        let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE id = ?")?;
125        let mut rows = stmt.query(params![message_id])?;
126        if let Some(row) = rows.next()? {
127            let id: i64 = row.get(0)?;
128            let sid: String = row.get(1)?;
129            let role: String = row.get(2)?;
130            let content: String = row.get(3)?;
131            let created_at: String = row.get(4)?;
132            let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
133            Ok(Some(Message {
134                id,
135                session_id: sid,
136                role: MessageRole::from_str(&role),
137                content,
138                created_at,
139            }))
140        } else {
141            Ok(None)
142        }
143    }
144
145    /// Simple pruning by keeping only the most recent `keep_latest` messages.
146    pub fn prune_messages(&self, session_id: &str, keep_latest: i64) -> Result<u64> {
147        let conn = self.conn();
148        let mut stmt = conn.prepare("DELETE FROM messages WHERE session_id = ? AND id NOT IN (SELECT id FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?)")?;
149        let changed = stmt.execute(params![session_id, session_id, keep_latest])? as u64;
150        Ok(changed)
151    }
152
153    // ---------- Memory Vectors ----------
154
155    pub fn insert_memory_vector(
156        &self,
157        session_id: &str,
158        message_id: Option<i64>,
159        embedding: &[f32],
160    ) -> Result<i64> {
161        let conn = self.conn();
162        let embedding_json = serde_json::to_string(embedding)?;
163        let mut stmt = conn.prepare("INSERT INTO memory_vectors (session_id, message_id, embedding) VALUES (?, ?, ?) RETURNING id")?;
164        let id: i64 = stmt.query_row(params![session_id, message_id, embedding_json], |row| {
165            row.get(0)
166        })?;
167        Ok(id)
168    }
169
170    pub fn recall_top_k(
171        &self,
172        session_id: &str,
173        query_embedding: &[f32],
174        k: usize,
175    ) -> Result<Vec<(MemoryVector, f32)>> {
176        let conn = self.conn();
177        let mut stmt = conn.prepare("SELECT id, session_id, message_id, embedding, CAST(created_at AS TEXT) as created_at FROM memory_vectors WHERE session_id = ?")?;
178        let mut rows = stmt.query(params![session_id])?;
179        let mut scored: Vec<(MemoryVector, f32)> = Vec::new();
180        while let Some(row) = rows.next()? {
181            let id: i64 = row.get(0)?;
182            let sid: String = row.get(1)?;
183            let message_id: Option<i64> = row.get(2)?;
184            let embedding_text: String = row.get(3)?;
185            let created_at: String = row.get(4)?;
186            let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
187            let embedding: Vec<f32> = serde_json::from_str(&embedding_text).unwrap_or_default();
188            let score = cosine_similarity(query_embedding, &embedding);
189            scored.push((
190                MemoryVector {
191                    id,
192                    session_id: sid,
193                    message_id,
194                    embedding,
195                    created_at,
196                },
197                score,
198            ));
199        }
200        scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
201        scored.truncate(k);
202        Ok(scored)
203    }
204
205    /// List known session IDs ordered by most recent activity
206    pub fn list_sessions(&self) -> Result<Vec<String>> {
207        let conn = self.conn();
208        let mut stmt = conn.prepare(
209            "SELECT session_id, MAX(created_at) as last FROM messages GROUP BY session_id ORDER BY last DESC"
210        )?;
211        let mut rows = stmt.query([])?;
212        let mut out = Vec::new();
213        while let Some(row) = rows.next()? {
214            let sid: String = row.get(0)?;
215            out.push(sid);
216        }
217        Ok(out)
218    }
219
220    // ---------- Tool Log ----------
221
222    pub fn log_tool(
223        &self,
224        session_id: &str,
225        agent_name: &str,
226        run_id: &str,
227        tool_name: &str,
228        arguments: &JsonValue,
229        result: &JsonValue,
230        success: bool,
231        error: Option<&str>,
232    ) -> Result<i64> {
233        let conn = self.conn();
234        let mut stmt = conn.prepare("INSERT INTO tool_log (session_id, agent, run_id, tool_name, arguments, result, success, error) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
235        let id: i64 = stmt.query_row(
236            params![
237                session_id,
238                agent_name,
239                run_id,
240                tool_name,
241                arguments.to_string(),
242                result.to_string(),
243                success,
244                error.unwrap_or("")
245            ],
246            |row| row.get(0),
247        )?;
248        Ok(id)
249    }
250
251    // ---------- Policy Cache ----------
252
253    pub fn policy_upsert(&self, key: &str, value: &JsonValue) -> Result<()> {
254        let conn = self.conn();
255        // DuckDB upsert workaround: delete then insert atomically within a transaction.
256        conn.execute_batch("BEGIN TRANSACTION;")?;
257        {
258            let mut del = conn.prepare("DELETE FROM policy_cache WHERE key = ?")?;
259            let _ = del.execute(params![key])?;
260            let mut ins = conn.prepare("INSERT INTO policy_cache (key, value, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP)")?;
261            let _ = ins.execute(params![key, value.to_string()])?;
262        }
263        conn.execute_batch("COMMIT;")?;
264        Ok(())
265    }
266
267    pub fn policy_get(&self, key: &str) -> Result<Option<PolicyEntry>> {
268        let conn = self.conn();
269        let mut stmt = conn.prepare("SELECT key, value, CAST(updated_at AS TEXT) as updated_at FROM policy_cache WHERE key = ?")?;
270        let mut rows = stmt.query(params![key])?;
271        if let Some(row) = rows.next()? {
272            let key: String = row.get(0)?;
273            let value_text: String = row.get(1)?;
274            let updated_at: String = row.get(2)?;
275            let updated_at: DateTime<Utc> = updated_at.parse().unwrap_or_else(|_| Utc::now());
276            let value: JsonValue = serde_json::from_str(&value_text).unwrap_or(JsonValue::Null);
277            Ok(Some(PolicyEntry {
278                key,
279                value,
280                updated_at,
281            }))
282        } else {
283            Ok(None)
284        }
285    }
286}
287
288fn generate_instance_id() -> String {
289    let hostname = hostname::get()
290        .ok()
291        .and_then(|h| h.into_string().ok())
292        .unwrap_or_else(|| "unknown".to_string());
293    let uuid = uuid::Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext));
294    format!("{}-{}", hostname, uuid)
295}
296
297fn expand_tilde(path: &Path) -> Result<PathBuf> {
298    let path_str = path.to_string_lossy();
299    if path_str == "~" {
300        let base = BaseDirs::new().context("base directories not available")?;
301        Ok(base.home_dir().to_path_buf())
302    } else if let Some(stripped) = path_str.strip_prefix("~/") {
303        let base = BaseDirs::new().context("base directories not available")?;
304        Ok(base.home_dir().join(stripped))
305    } else {
306        Ok(path.to_path_buf())
307    }
308}
309
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    /// `~/<name>` resolves to `<home>/<name>`.
    #[test]
    fn expands_home_directory_prefix() {
        let dirs = BaseDirs::new().expect("home directory available");
        let expanded = expand_tilde(Path::new("~/demo.db")).expect("path expansion succeeds");
        let wanted = dirs.home_dir().join("demo.db");
        assert_eq!(expanded, wanted);
    }

    /// Paths without a leading tilde come back untouched.
    #[test]
    fn leaves_regular_paths_unchanged() {
        let plain = Path::new("relative/path.db");
        let expanded = expand_tilde(plain).expect("path expansion succeeds");
        assert_eq!(expanded, plain);
    }
}
330
/// Cosine similarity of two vectors.
///
/// Returns `0.0` when either slice is empty, when the lengths differ, or when either
/// vector has zero magnitude.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    if a.len() != b.len() || a.is_empty() {
        return 0.0;
    }

    // One pass accumulating the dot product and both squared norms.
    let (dot, norm_a_sq, norm_b_sq) = a.iter().zip(b).fold(
        (0.0f32, 0.0f32, 0.0f32),
        |(dot, na, nb), (&x, &y)| (dot + x * y, na + x * x, nb + y * y),
    );

    if norm_a_sq == 0.0 || norm_b_sq == 0.0 {
        return 0.0;
    }
    dot / (norm_a_sq.sqrt() * norm_b_sq.sqrt())
}
348
349// ========== Knowledge Graph Methods ==========
350
351// Type Conversion Helpers
352
353fn from_kg_node(node: spec_ai_knowledge_graph::GraphNode) -> GraphNode {
354    GraphNode {
355        id: node.id,
356        session_id: node.session_id,
357        node_type: node.node_type,
358        label: node.label,
359        properties: node.properties,
360        embedding_id: node.embedding_id,
361        created_at: node.created_at,
362        updated_at: node.updated_at,
363    }
364}
365
366fn from_kg_edge(edge: spec_ai_knowledge_graph::GraphEdge) -> GraphEdge {
367    GraphEdge {
368        id: edge.id,
369        session_id: edge.session_id,
370        source_id: edge.source_id,
371        target_id: edge.target_id,
372        edge_type: edge.edge_type,
373        predicate: edge.predicate,
374        properties: edge.properties,
375        weight: edge.weight,
376        temporal_start: edge.temporal_start,
377        temporal_end: edge.temporal_end,
378        created_at: edge.created_at,
379    }
380}
381
382fn from_kg_path(path: spec_ai_knowledge_graph::GraphPath) -> GraphPath {
383    GraphPath {
384        nodes: path.nodes.into_iter().map(from_kg_node).collect(),
385        edges: path.edges.into_iter().map(from_kg_edge).collect(),
386        length: path.length,
387        weight: path.weight,
388    }
389}
390
391impl Persistence {
392    // ---------- Graph Node Operations ----------
393
394    pub fn insert_graph_node(
395        &self,
396        session_id: &str,
397        node_type: spec_ai_knowledge_graph::NodeType,
398        label: &str,
399        properties: &JsonValue,
400        embedding_id: Option<i64>,
401    ) -> Result<i64> {
402        self.graph_store
403            .insert_graph_node(session_id, node_type, label, properties, embedding_id)
404    }
405
406    pub fn get_graph_node(&self, node_id: i64) -> Result<Option<GraphNode>> {
407        self.graph_store
408            .get_graph_node(node_id)
409            .map(|opt| opt.map(from_kg_node))
410    }
411
412    pub fn list_graph_nodes(
413        &self,
414        session_id: &str,
415        node_type: Option<spec_ai_knowledge_graph::NodeType>,
416        limit: Option<i64>,
417    ) -> Result<Vec<GraphNode>> {
418        self.graph_store
419            .list_graph_nodes(session_id, node_type, limit)
420            .map(|nodes| nodes.into_iter().map(from_kg_node).collect())
421    }
422
423    pub fn count_graph_nodes(&self, session_id: &str) -> Result<i64> {
424        self.graph_store.count_graph_nodes(session_id)
425    }
426
427    pub fn update_graph_node(&self, node_id: i64, properties: &JsonValue) -> Result<()> {
428        self.graph_store.update_graph_node(node_id, properties)
429    }
430
431    pub fn delete_graph_node(&self, node_id: i64) -> Result<()> {
432        self.graph_store.delete_graph_node(node_id)
433    }
434
435    // ---------- Graph Edge Operations ----------
436
437    pub fn insert_graph_edge(
438        &self,
439        session_id: &str,
440        source_id: i64,
441        target_id: i64,
442        edge_type: spec_ai_knowledge_graph::EdgeType,
443        predicate: Option<&str>,
444        properties: Option<&JsonValue>,
445        weight: f32,
446    ) -> Result<i64> {
447        self.graph_store.insert_graph_edge(
448            session_id, source_id, target_id, edge_type, predicate, properties, weight,
449        )
450    }
451
452    pub fn get_graph_edge(&self, edge_id: i64) -> Result<Option<GraphEdge>> {
453        self.graph_store
454            .get_graph_edge(edge_id)
455            .map(|opt| opt.map(from_kg_edge))
456    }
457
458    pub fn list_graph_edges(
459        &self,
460        session_id: &str,
461        source_id: Option<i64>,
462        target_id: Option<i64>,
463    ) -> Result<Vec<GraphEdge>> {
464        self.graph_store
465            .list_graph_edges(session_id, source_id, target_id)
466            .map(|edges| edges.into_iter().map(from_kg_edge).collect())
467    }
468
469    pub fn count_graph_edges(&self, session_id: &str) -> Result<i64> {
470        self.graph_store.count_graph_edges(session_id)
471    }
472
473    pub fn delete_graph_edge(&self, edge_id: i64) -> Result<()> {
474        self.graph_store.delete_graph_edge(edge_id)
475    }
476
477    // ---------- Graph Traversal Operations ----------
478
479    pub fn find_shortest_path(
480        &self,
481        session_id: &str,
482        source_id: i64,
483        target_id: i64,
484        max_hops: Option<usize>,
485    ) -> Result<Option<GraphPath>> {
486        self.graph_store
487            .find_shortest_path(session_id, source_id, target_id, max_hops)
488            .map(|opt| opt.map(from_kg_path))
489    }
490
491    pub fn traverse_neighbors(
492        &self,
493        session_id: &str,
494        node_id: i64,
495        direction: spec_ai_knowledge_graph::TraversalDirection,
496        depth: usize,
497    ) -> Result<Vec<GraphNode>> {
498        self.graph_store
499            .traverse_neighbors(session_id, node_id, direction, depth)
500            .map(|nodes| nodes.into_iter().map(from_kg_node).collect())
501    }
502
503    // ---------- Transcriptions ----------
504
505    pub fn insert_transcription(
506        &self,
507        session_id: &str,
508        chunk_id: i64,
509        text: &str,
510        timestamp: chrono::DateTime<Utc>,
511    ) -> Result<i64> {
512        let conn = self.conn();
513        let mut stmt = conn.prepare(
514            "INSERT INTO transcriptions (session_id, chunk_id, text, timestamp, embedding_id) VALUES (?, ?, ?, ?, NULL) RETURNING id",
515        )?;
516        let id: i64 = stmt.query_row(
517            params![session_id, chunk_id, text, timestamp.to_rfc3339()],
518            |row| row.get(0),
519        )?;
520        Ok(id)
521    }
522
523    pub fn update_transcription_embedding(
524        &self,
525        transcription_id: i64,
526        embedding_id: i64,
527    ) -> Result<()> {
528        let conn = self.conn();
529        conn.execute(
530            "UPDATE transcriptions SET embedding_id = ? WHERE id = ?",
531            params![embedding_id, transcription_id],
532        )?;
533        Ok(())
534    }
535
536    pub fn list_transcriptions(
537        &self,
538        session_id: &str,
539        limit: Option<i64>,
540    ) -> Result<Vec<(i64, i64, String, DateTime<Utc>)>> {
541        let conn = self.conn();
542        let query = if let Some(lim) = limit {
543            format!(
544                "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC LIMIT {}",
545                lim
546            )
547        } else {
548            "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC".to_string()
549        };
550
551        let mut stmt = conn.prepare(&query)?;
552        let mut rows = stmt.query(params![session_id])?;
553        let mut out = Vec::new();
554
555        while let Some(row) = rows.next()? {
556            let id: i64 = row.get(0)?;
557            let chunk_id: i64 = row.get(1)?;
558            let text: String = row.get(2)?;
559            let timestamp_str: String = row.get(3)?;
560            let timestamp: DateTime<Utc> = timestamp_str.parse().unwrap_or_else(|_| Utc::now());
561            out.push((id, chunk_id, text, timestamp));
562        }
563
564        Ok(out)
565    }
566
567    pub fn get_full_transcription(&self, session_id: &str) -> Result<String> {
568        let transcriptions = self.list_transcriptions(session_id, None)?;
569        Ok(transcriptions
570            .into_iter()
571            .map(|(_, _, text, _)| text)
572            .collect::<Vec<_>>()
573            .join(" "))
574    }
575
576    pub fn delete_transcriptions(&self, session_id: &str) -> Result<()> {
577        let conn = self.conn();
578        conn.execute(
579            "DELETE FROM transcriptions WHERE session_id = ?",
580            params![session_id],
581        )?;
582        Ok(())
583    }
584
585    pub fn get_transcription_by_embedding(&self, embedding_id: i64) -> Result<Option<String>> {
586        let conn = self.conn();
587        let mut stmt =
588            conn.prepare("SELECT text FROM transcriptions WHERE embedding_id = ? LIMIT 1")?;
589        let result: Result<String, _> = stmt.query_row(params![embedding_id], |row| row.get(0));
590        match result {
591            Ok(text) => Ok(Some(text)),
592            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
593            Err(e) => Err(e.into()),
594        }
595    }
596
597    // ---------- Tokenized Files Cache ----------
598
599    /// Persist tokenization metadata for a file, replacing any existing entry for the path.
600    pub fn upsert_tokenized_file(
601        &self,
602        session_id: &str,
603        path: &str,
604        file_hash: &str,
605        raw_tokens: usize,
606        cleaned_tokens: usize,
607        bytes_captured: usize,
608        truncated: bool,
609        embedding_id: Option<i64>,
610    ) -> Result<i64> {
611        let conn = self.conn();
612        conn.execute(
613            "DELETE FROM tokenized_files WHERE session_id = ? AND path = ?",
614            params![session_id, path],
615        )?;
616        let mut stmt = conn.prepare("INSERT INTO tokenized_files (session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
617        let id: i64 = stmt.query_row(
618            params![
619                session_id,
620                path,
621                file_hash,
622                raw_tokens as i64,
623                cleaned_tokens as i64,
624                bytes_captured as i64,
625                truncated,
626                embedding_id
627            ],
628            |row| row.get(0),
629        )?;
630        Ok(id)
631    }
632
633    pub fn get_tokenized_file(
634        &self,
635        session_id: &str,
636        path: &str,
637    ) -> Result<Option<TokenizedFileRecord>> {
638        let conn = self.conn();
639        let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? AND path = ? LIMIT 1")?;
640        let mut rows = stmt.query(params![session_id, path])?;
641        if let Some(row) = rows.next()? {
642            let record = TokenizedFileRecord::from_row(row)?;
643            Ok(Some(record))
644        } else {
645            Ok(None)
646        }
647    }
648
649    pub fn list_tokenized_files(&self, session_id: &str) -> Result<Vec<TokenizedFileRecord>> {
650        let conn = self.conn();
651        let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? ORDER BY path")?;
652        let mut rows = stmt.query(params![session_id])?;
653        let mut out = Vec::new();
654        while let Some(row) = rows.next()? {
655            out.push(TokenizedFileRecord::from_row(row)?);
656        }
657        Ok(out)
658    }
659
660    // ========== Mesh Message Persistence ==========
661
662    /// Store a mesh message in the database
663    pub fn mesh_message_store(
664        &self,
665        message_id: &str,
666        source_instance: &str,
667        target_instance: Option<&str>,
668        message_type: &str,
669        payload: &JsonValue,
670        status: &str,
671    ) -> Result<i64> {
672        let conn = self.conn();
673        let payload_json = serde_json::to_string(payload)?;
674        conn.execute(
675            "INSERT INTO mesh_messages (message_id, source_instance, target_instance, message_type, payload, status) VALUES (?, ?, ?, ?, ?, ?)",
676            params![message_id, source_instance, target_instance, message_type, payload_json, status],
677        )?;
678        // Get the last inserted ID
679        let id: i64 = conn.query_row("SELECT last_insert_rowid()", params![], |row| row.get(0))?;
680        Ok(id)
681    }
682
683    /// Check if a message with this ID already exists (for duplicate detection)
684    pub fn mesh_message_exists(&self, message_id: &str) -> Result<bool> {
685        let conn = self.conn();
686        let count: i64 = conn.query_row(
687            "SELECT COUNT(*) FROM mesh_messages WHERE message_id = ?",
688            params![message_id],
689            |row| row.get(0),
690        )?;
691        Ok(count > 0)
692    }
693
694    /// Update message status (e.g., delivered, failed)
695    pub fn mesh_message_update_status(&self, message_id: i64, status: &str) -> Result<()> {
696        let conn = self.conn();
697        conn.execute(
698            "UPDATE mesh_messages SET status = ?, delivered_at = CURRENT_TIMESTAMP WHERE id = ?",
699            params![status, message_id],
700        )?;
701        Ok(())
702    }
703
704    /// Get pending messages for a target instance
705    pub fn mesh_message_get_pending(
706        &self,
707        target_instance: &str,
708    ) -> Result<Vec<MeshMessageRecord>> {
709        let conn = self.conn();
710        let mut stmt = conn.prepare(
711            "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
712             FROM mesh_messages
713             WHERE (target_instance = ? OR target_instance IS NULL) AND status = 'pending'
714             ORDER BY created_at",
715        )?;
716        let mut rows = stmt.query(params![target_instance])?;
717        let mut out = Vec::new();
718        while let Some(row) = rows.next()? {
719            out.push(MeshMessageRecord::from_row(row)?);
720        }
721        Ok(out)
722    }
723
724    /// Get message history for analytics
725    pub fn mesh_message_get_history(
726        &self,
727        instance_id: Option<&str>,
728        limit: usize,
729    ) -> Result<Vec<MeshMessageRecord>> {
730        let conn = self.conn();
731        let query = if instance_id.is_some() {
732            format!(
733                "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
734                 FROM mesh_messages
735                 WHERE source_instance = ? OR target_instance = ?
736                 ORDER BY created_at DESC LIMIT {}",
737                limit
738            )
739        } else {
740            format!(
741                "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
742                 FROM mesh_messages
743                 ORDER BY created_at DESC LIMIT {}",
744                limit
745            )
746        };
747
748        let mut stmt = conn.prepare(&query)?;
749        let mut rows = if let Some(inst) = instance_id {
750            stmt.query(params![inst, inst])?
751        } else {
752            stmt.query(params![])?
753        };
754
755        let mut out = Vec::new();
756        while let Some(row) = rows.next()? {
757            out.push(MeshMessageRecord::from_row(row)?);
758        }
759        Ok(out)
760    }
761
762    // ===== Graph Synchronization Methods =====
763
764    /// Append an entry to the graph changelog
765    pub fn graph_changelog_append(
766        &self,
767        session_id: &str,
768        instance_id: &str,
769        entity_type: &str,
770        entity_id: i64,
771        operation: &str,
772        vector_clock: &str,
773        data: Option<&str>,
774    ) -> Result<i64> {
775        self.graph_store.graph_changelog_append(
776            session_id,
777            instance_id,
778            entity_type,
779            entity_id,
780            operation,
781            vector_clock,
782            data,
783        )
784    }
785
786    /// Get changelog entries since a given timestamp for a session
787    pub fn graph_changelog_get_since(
788        &self,
789        session_id: &str,
790        since_timestamp: &str,
791    ) -> Result<Vec<ChangelogEntry>> {
792        self.graph_store
793            .graph_changelog_get_since(session_id, since_timestamp)
794            .map(|entries| {
795                entries
796                    .into_iter()
797                    .map(|e| ChangelogEntry {
798                        id: e.id,
799                        session_id: e.session_id,
800                        instance_id: e.instance_id,
801                        entity_type: e.entity_type,
802                        entity_id: e.entity_id,
803                        operation: e.operation,
804                        vector_clock: e.vector_clock,
805                        data: e.data,
806                        created_at: e.created_at,
807                    })
808                    .collect()
809            })
810    }
811
812    /// Prune old changelog entries (keep last N days)
813    pub fn graph_changelog_prune(&self, days_to_keep: i64) -> Result<usize> {
814        self.graph_store.graph_changelog_prune(days_to_keep)
815    }
816
817    /// Get the vector clock for an instance/session/graph combination
818    pub fn graph_sync_state_get(
819        &self,
820        instance_id: &str,
821        session_id: &str,
822        graph_name: &str,
823    ) -> Result<Option<String>> {
824        self.graph_store
825            .graph_sync_state_get(instance_id, session_id, graph_name)
826    }
827
828    /// Update the vector clock for an instance/session/graph combination
829    pub fn graph_sync_state_update(
830        &self,
831        instance_id: &str,
832        session_id: &str,
833        graph_name: &str,
834        vector_clock: &str,
835    ) -> Result<()> {
836        self.graph_store
837            .graph_sync_state_update(instance_id, session_id, graph_name, vector_clock)
838    }
839
840    /// Enable or disable sync for a graph
841    pub fn graph_set_sync_enabled(
842        &self,
843        session_id: &str,
844        graph_name: &str,
845        enabled: bool,
846    ) -> Result<()> {
847        self.graph_store
848            .graph_set_sync_enabled(session_id, graph_name, enabled)
849    }
850
851    /// Check if sync is enabled for a graph
852    pub fn graph_get_sync_enabled(&self, session_id: &str, graph_name: &str) -> Result<bool> {
853        self.graph_store
854            .graph_get_sync_enabled(session_id, graph_name)
855    }
856
857    /// List all graphs for a session
858    pub fn graph_list(&self, session_id: &str) -> Result<Vec<String>> {
859        self.graph_store.graph_list(session_id)
860    }
861
862    /// List all sync-enabled graphs across all sessions
863    pub fn graph_list_sync_enabled(&self) -> Result<Vec<(String, String)>> {
864        self.graph_store.graph_list_sync_enabled()
865    }
866
867    /// Get a node with its sync metadata
868    pub fn graph_get_node_with_sync(&self, node_id: i64) -> Result<Option<SyncedNodeRecord>> {
869        self.graph_store
870            .graph_get_node_with_sync(node_id)
871            .map(|opt| {
872                opt.map(|r| SyncedNodeRecord {
873                    id: r.id,
874                    session_id: r.session_id,
875                    node_type: r.node_type,
876                    label: r.label,
877                    properties: r.properties,
878                    embedding_id: r.embedding_id,
879                    created_at: r.created_at,
880                    updated_at: r.updated_at,
881                    vector_clock: r.vector_clock,
882                    last_modified_by: r.last_modified_by,
883                    is_deleted: r.is_deleted,
884                    sync_enabled: r.sync_enabled,
885                })
886            })
887    }
888
889    /// Get all synced nodes for a session with optional filters
890    pub fn graph_list_nodes_with_sync(
891        &self,
892        session_id: &str,
893        sync_enabled_only: bool,
894        include_deleted: bool,
895    ) -> Result<Vec<SyncedNodeRecord>> {
896        self.graph_store
897            .graph_list_nodes_with_sync(session_id, sync_enabled_only, include_deleted)
898            .map(|nodes| {
899                nodes
900                    .into_iter()
901                    .map(|r| SyncedNodeRecord {
902                        id: r.id,
903                        session_id: r.session_id,
904                        node_type: r.node_type,
905                        label: r.label,
906                        properties: r.properties,
907                        embedding_id: r.embedding_id,
908                        created_at: r.created_at,
909                        updated_at: r.updated_at,
910                        vector_clock: r.vector_clock,
911                        last_modified_by: r.last_modified_by,
912                        is_deleted: r.is_deleted,
913                        sync_enabled: r.sync_enabled,
914                    })
915                    .collect()
916            })
917    }
918
919    /// Get an edge with its sync metadata
920    pub fn graph_get_edge_with_sync(&self, edge_id: i64) -> Result<Option<SyncedEdgeRecord>> {
921        self.graph_store
922            .graph_get_edge_with_sync(edge_id)
923            .map(|opt| {
924                opt.map(|r| SyncedEdgeRecord {
925                    id: r.id,
926                    session_id: r.session_id,
927                    source_id: r.source_id,
928                    target_id: r.target_id,
929                    edge_type: r.edge_type,
930                    predicate: r.predicate,
931                    properties: r.properties,
932                    weight: r.weight,
933                    temporal_start: r.temporal_start,
934                    temporal_end: r.temporal_end,
935                    created_at: r.created_at,
936                    vector_clock: r.vector_clock,
937                    last_modified_by: r.last_modified_by,
938                    is_deleted: r.is_deleted,
939                    sync_enabled: r.sync_enabled,
940                })
941            })
942    }
943
944    /// Get all synced edges for a session with optional filters
945    pub fn graph_list_edges_with_sync(
946        &self,
947        session_id: &str,
948        sync_enabled_only: bool,
949        include_deleted: bool,
950    ) -> Result<Vec<SyncedEdgeRecord>> {
951        self.graph_store
952            .graph_list_edges_with_sync(session_id, sync_enabled_only, include_deleted)
953            .map(|edges| {
954                edges
955                    .into_iter()
956                    .map(|r| SyncedEdgeRecord {
957                        id: r.id,
958                        session_id: r.session_id,
959                        source_id: r.source_id,
960                        target_id: r.target_id,
961                        edge_type: r.edge_type,
962                        predicate: r.predicate,
963                        properties: r.properties,
964                        weight: r.weight,
965                        temporal_start: r.temporal_start,
966                        temporal_end: r.temporal_end,
967                        created_at: r.created_at,
968                        vector_clock: r.vector_clock,
969                        last_modified_by: r.last_modified_by,
970                        is_deleted: r.is_deleted,
971                        sync_enabled: r.sync_enabled,
972                    })
973                    .collect()
974            })
975    }
976
977    /// Update a node's sync metadata
978    pub fn graph_update_node_sync_metadata(
979        &self,
980        node_id: i64,
981        vector_clock: &str,
982        last_modified_by: &str,
983        sync_enabled: bool,
984    ) -> Result<()> {
985        self.graph_store.graph_update_node_sync_metadata(
986            node_id,
987            vector_clock,
988            last_modified_by,
989            sync_enabled,
990        )
991    }
992
993    /// Update an edge's sync metadata
994    pub fn graph_update_edge_sync_metadata(
995        &self,
996        edge_id: i64,
997        vector_clock: &str,
998        last_modified_by: &str,
999        sync_enabled: bool,
1000    ) -> Result<()> {
1001        self.graph_store.graph_update_edge_sync_metadata(
1002            edge_id,
1003            vector_clock,
1004            last_modified_by,
1005            sync_enabled,
1006        )
1007    }
1008
1009    /// Mark a node as deleted (tombstone pattern)
1010    pub fn graph_mark_node_deleted(
1011        &self,
1012        node_id: i64,
1013        vector_clock: &str,
1014        deleted_by: &str,
1015    ) -> Result<()> {
1016        self.graph_store
1017            .graph_mark_node_deleted(node_id, vector_clock, deleted_by)
1018    }
1019
1020    /// Mark an edge as deleted (tombstone pattern)
1021    pub fn graph_mark_edge_deleted(
1022        &self,
1023        edge_id: i64,
1024        vector_clock: &str,
1025        deleted_by: &str,
1026    ) -> Result<()> {
1027        self.graph_store
1028            .graph_mark_edge_deleted(edge_id, vector_clock, deleted_by)
1029    }
1030}
1031
1032#[derive(Debug, Clone)]
1033pub struct TokenizedFileRecord {
1034    pub id: i64,
1035    pub session_id: String,
1036    pub path: String,
1037    pub file_hash: String,
1038    pub raw_tokens: usize,
1039    pub cleaned_tokens: usize,
1040    pub bytes_captured: usize,
1041    pub truncated: bool,
1042    pub embedding_id: Option<i64>,
1043    pub updated_at: DateTime<Utc>,
1044}
1045
1046impl TokenizedFileRecord {
1047    fn from_row(row: &duckdb::Row) -> Result<Self> {
1048        let id: i64 = row.get(0)?;
1049        let session_id: String = row.get(1)?;
1050        let path: String = row.get(2)?;
1051        let file_hash: String = row.get(3)?;
1052        let raw_tokens: i64 = row.get(4)?;
1053        let cleaned_tokens: i64 = row.get(5)?;
1054        let bytes_captured: i64 = row.get(6)?;
1055        let truncated: bool = row.get(7)?;
1056        let embedding_id: Option<i64> = row.get(8)?;
1057        let updated_at: String = row.get(9)?;
1058
1059        Ok(Self {
1060            id,
1061            session_id,
1062            path,
1063            file_hash,
1064            raw_tokens: raw_tokens.max(0) as usize,
1065            cleaned_tokens: cleaned_tokens.max(0) as usize,
1066            bytes_captured: bytes_captured.max(0) as usize,
1067            truncated,
1068            embedding_id,
1069            updated_at: updated_at.parse().unwrap_or_else(|_| Utc::now()),
1070        })
1071    }
1072}
1073
1074#[derive(Debug, Clone)]
1075pub struct MeshMessageRecord {
1076    pub id: i64,
1077    pub source_instance: String,
1078    pub target_instance: Option<String>,
1079    pub message_type: String,
1080    pub payload: JsonValue,
1081    pub status: String,
1082    pub created_at: DateTime<Utc>,
1083    pub delivered_at: Option<DateTime<Utc>>,
1084}
1085
1086impl MeshMessageRecord {
1087    fn from_row(row: &duckdb::Row) -> Result<Self> {
1088        let id: i64 = row.get(0)?;
1089        let source_instance: String = row.get(1)?;
1090        let target_instance: Option<String> = row.get(2)?;
1091        let message_type: String = row.get(3)?;
1092        let payload_str: String = row.get(4)?;
1093        let payload: JsonValue = serde_json::from_str(&payload_str)?;
1094        let status: String = row.get(5)?;
1095        let created_at_str: String = row.get(6)?;
1096        let delivered_at_str: Option<String> = row.get(7)?;
1097
1098        Ok(MeshMessageRecord {
1099            id,
1100            source_instance,
1101            target_instance,
1102            message_type,
1103            payload,
1104            status,
1105            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1106            delivered_at: delivered_at_str.and_then(|s| s.parse().ok()),
1107        })
1108    }
1109}
1110
1111// ===== Graph Sync Record Types =====
1112
1113#[derive(Debug, Clone)]
1114pub struct ChangelogEntry {
1115    pub id: i64,
1116    pub session_id: String,
1117    pub instance_id: String,
1118    pub entity_type: String,
1119    pub entity_id: i64,
1120    pub operation: String,
1121    pub vector_clock: String,
1122    pub data: Option<String>,
1123    pub created_at: DateTime<Utc>,
1124}
1125
1126impl ChangelogEntry {
1127    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1128        let id: i64 = row.get(0)?;
1129        let session_id: String = row.get(1)?;
1130        let instance_id: String = row.get(2)?;
1131        let entity_type: String = row.get(3)?;
1132        let entity_id: i64 = row.get(4)?;
1133        let operation: String = row.get(5)?;
1134        let vector_clock: String = row.get(6)?;
1135        let data: Option<String> = row.get(7)?;
1136        let created_at_str: String = row.get(8)?;
1137
1138        Ok(ChangelogEntry {
1139            id,
1140            session_id,
1141            instance_id,
1142            entity_type,
1143            entity_id,
1144            operation,
1145            vector_clock,
1146            data,
1147            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1148        })
1149    }
1150}
1151
1152#[derive(Debug, Clone)]
1153pub struct SyncedNodeRecord {
1154    pub id: i64,
1155    pub session_id: String,
1156    pub node_type: String,
1157    pub label: String,
1158    pub properties: serde_json::Value,
1159    pub embedding_id: Option<i64>,
1160    pub created_at: DateTime<Utc>,
1161    pub updated_at: DateTime<Utc>,
1162    pub vector_clock: String,
1163    pub last_modified_by: Option<String>,
1164    pub is_deleted: bool,
1165    pub sync_enabled: bool,
1166}
1167
1168impl SyncedNodeRecord {
1169    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1170        let id: i64 = row.get(0)?;
1171        let session_id: String = row.get(1)?;
1172        let node_type: String = row.get(2)?;
1173        let label: String = row.get(3)?;
1174        let properties_str: String = row.get(4)?;
1175        let properties: serde_json::Value = serde_json::from_str(&properties_str).map_err(|e| {
1176            duckdb::Error::FromSqlConversionFailure(4, duckdb::types::Type::Text, Box::new(e))
1177        })?;
1178        let embedding_id: Option<i64> = row.get(5)?;
1179        let created_at_str: String = row.get(6)?;
1180        let updated_at_str: String = row.get(7)?;
1181        let vector_clock: String = row.get(8)?;
1182        let last_modified_by: Option<String> = row.get(9)?;
1183        let is_deleted: bool = row.get(10)?;
1184        let sync_enabled: bool = row.get(11)?;
1185
1186        Ok(SyncedNodeRecord {
1187            id,
1188            session_id,
1189            node_type,
1190            label,
1191            properties,
1192            embedding_id,
1193            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1194            updated_at: updated_at_str.parse().unwrap_or_else(|_| Utc::now()),
1195            vector_clock,
1196            last_modified_by,
1197            is_deleted,
1198            sync_enabled,
1199        })
1200    }
1201}
1202
1203#[derive(Debug, Clone)]
1204pub struct SyncedEdgeRecord {
1205    pub id: i64,
1206    pub session_id: String,
1207    pub source_id: i64,
1208    pub target_id: i64,
1209    pub edge_type: String,
1210    pub predicate: Option<String>,
1211    pub properties: Option<serde_json::Value>,
1212    pub weight: f32,
1213    pub temporal_start: Option<DateTime<Utc>>,
1214    pub temporal_end: Option<DateTime<Utc>>,
1215    pub created_at: DateTime<Utc>,
1216    pub vector_clock: String,
1217    pub last_modified_by: Option<String>,
1218    pub is_deleted: bool,
1219    pub sync_enabled: bool,
1220}
1221
1222impl SyncedEdgeRecord {
1223    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1224        let id: i64 = row.get(0)?;
1225        let session_id: String = row.get(1)?;
1226        let source_id: i64 = row.get(2)?;
1227        let target_id: i64 = row.get(3)?;
1228        let edge_type: String = row.get(4)?;
1229        let predicate: Option<String> = row.get(5)?;
1230        let properties_str: Option<String> = row.get(6)?;
1231        let properties: Option<serde_json::Value> = properties_str
1232            .as_ref()
1233            .and_then(|s| serde_json::from_str(s).ok());
1234        let weight: f32 = row.get(7)?;
1235        let temporal_start_str: Option<String> = row.get(8)?;
1236        let temporal_end_str: Option<String> = row.get(9)?;
1237        let created_at_str: String = row.get(10)?;
1238        let vector_clock: String = row.get(11)?;
1239        let last_modified_by: Option<String> = row.get(12)?;
1240        let is_deleted: bool = row.get(13)?;
1241        let sync_enabled: bool = row.get(14)?;
1242
1243        Ok(SyncedEdgeRecord {
1244            id,
1245            session_id,
1246            source_id,
1247            target_id,
1248            edge_type,
1249            predicate,
1250            properties,
1251            weight,
1252            temporal_start: temporal_start_str.and_then(|s| s.parse().ok()),
1253            temporal_end: temporal_end_str.and_then(|s| s.parse().ok()),
1254            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1255            vector_clock,
1256            last_modified_by,
1257            is_deleted,
1258            sync_enabled,
1259        })
1260    }
1261}