spec_ai_config/persistence/
mod.rs

pub mod migrations;

use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use directories::BaseDirs;
use duckdb::{params, Connection};
use serde_json::Value as JsonValue;
use spec_ai_knowledge_graph::KnowledgeGraphStore;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

use crate::types::{
    EdgeType, GraphEdge, GraphNode, GraphPath, MemoryVector, Message, MessageRole, NodeType,
    PolicyEntry, TraversalDirection,
};

#[derive(Clone)]
pub struct Persistence {
    conn: Arc<Mutex<Connection>>,
    instance_id: String,
    graph_store: KnowledgeGraphStore,
}

impl Persistence {
    /// Create or open the database at the provided path and run migrations.
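    ///
    /// A minimal usage sketch (illustrative only, not compiled as a doc-test; the
    /// `MessageRole::User` variant and the path are assumptions for the example):
    ///
    /// ```ignore
    /// let p = Persistence::new("/tmp/agent_demo.duckdb")?;
    /// let id = p.insert_message("session-1", MessageRole::User, "hello")?;
    /// let recent = p.list_messages("session-1", 10)?;
    /// assert!(recent.iter().any(|m| m.id == id));
    /// p.checkpoint()?; // flush the WAL before shutdown
    /// ```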
    pub fn new<P: AsRef<Path>>(db_path: P) -> Result<Self> {
        Self::with_instance_id(db_path, generate_instance_id())
    }

    /// Create with a specific instance_id
    pub fn with_instance_id<P: AsRef<Path>>(db_path: P, instance_id: String) -> Result<Self> {
        let db_path = expand_tilde(db_path.as_ref())?;
        if let Some(dir) = db_path.parent() {
            std::fs::create_dir_all(dir).context("creating DB directory")?;
        }
        let conn = Connection::open(&db_path).context("opening DuckDB")?;
        migrations::run(&conn).context("running migrations")?;
        let conn_arc = Arc::new(Mutex::new(conn));
        let graph_store = KnowledgeGraphStore::new(conn_arc.clone(), instance_id.clone());
        Ok(Self {
            conn: conn_arc,
            instance_id,
            graph_store,
        })
    }

    /// Get the instance ID for this persistence instance
    pub fn instance_id(&self) -> &str {
        &self.instance_id
    }

    /// Get direct access to the KnowledgeGraphStore for Phase 2+ consumer migration
    pub fn graph_store(&self) -> &KnowledgeGraphStore {
        &self.graph_store
    }

    /// Checkpoint the database to ensure all WAL data is written to the main database file.
    /// Call this before shutdown to ensure clean database state.
    pub fn checkpoint(&self) -> Result<()> {
        let conn = self.conn();
        conn.execute_batch("CHECKPOINT;")
            .context("checkpointing database")
    }

    /// Creates or opens the default database at ~/.agent_cli/agent_data.duckdb
    pub fn new_default() -> Result<Self> {
        let base = BaseDirs::new().context("base directories not available")?;
        let path = base.home_dir().join(".agent_cli").join("agent_data.duckdb");
        Self::new(path)
    }

    /// Get access to the pooled database connection.
    /// Returns a MutexGuard that provides exclusive access to the connection.
    pub fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
        self.conn
            .lock()
            .expect("database connection mutex poisoned")
    }

    // ---------- Messages ----------

    pub fn insert_message(
        &self,
        session_id: &str,
        role: MessageRole,
        content: &str,
    ) -> Result<i64> {
        let conn = self.conn();
        let mut stmt = conn.prepare(
            "INSERT INTO messages (session_id, role, content) VALUES (?, ?, ?) RETURNING id",
        )?;
        let id: i64 = stmt.query_row(params![session_id, role.as_str(), content], |row| {
            row.get(0)
        })?;
        Ok(id)
    }

    pub fn list_messages(&self, session_id: &str, limit: i64) -> Result<Vec<Message>> {
        let conn = self.conn();
        let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?")?;
        let mut rows = stmt.query(params![session_id, limit])?;
        let mut out = Vec::new();
        while let Some(row) = rows.next()? {
            let id: i64 = row.get(0)?;
            let sid: String = row.get(1)?;
            let role: String = row.get(2)?;
            let content: String = row.get(3)?;
            let created_at: String = row.get(4)?; // DuckDB returns TIMESTAMP as string
            let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
            out.push(Message {
                id,
                session_id: sid,
                role: MessageRole::from_str(&role),
                content,
                created_at,
            });
        }
        out.reverse();
        Ok(out)
    }

    pub fn get_message(&self, message_id: i64) -> Result<Option<Message>> {
        let conn = self.conn();
        let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE id = ?")?;
        let mut rows = stmt.query(params![message_id])?;
        if let Some(row) = rows.next()? {
            let id: i64 = row.get(0)?;
            let sid: String = row.get(1)?;
            let role: String = row.get(2)?;
            let content: String = row.get(3)?;
            let created_at: String = row.get(4)?;
            let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
            Ok(Some(Message {
                id,
                session_id: sid,
                role: MessageRole::from_str(&role),
                content,
                created_at,
            }))
        } else {
            Ok(None)
        }
    }

    /// Simple pruning by keeping only the most recent `keep_latest` messages.
    pub fn prune_messages(&self, session_id: &str, keep_latest: i64) -> Result<u64> {
        let conn = self.conn();
        let mut stmt = conn.prepare("DELETE FROM messages WHERE session_id = ? AND id NOT IN (SELECT id FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?)")?;
        let changed = stmt.execute(params![session_id, session_id, keep_latest])? as u64;
        Ok(changed)
    }

    // ---------- Memory Vectors ----------

    pub fn insert_memory_vector(
        &self,
        session_id: &str,
        message_id: Option<i64>,
        embedding: &[f32],
    ) -> Result<i64> {
        let conn = self.conn();
        let embedding_json = serde_json::to_string(embedding)?;
        let mut stmt = conn.prepare("INSERT INTO memory_vectors (session_id, message_id, embedding) VALUES (?, ?, ?) RETURNING id")?;
        let id: i64 = stmt.query_row(params![session_id, message_id, embedding_json], |row| {
            row.get(0)
        })?;
        Ok(id)
    }

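    /// Recall the `k` stored vectors most similar to `query_embedding` for a session,
    /// scored by cosine similarity.
    ///
    /// Sketch of intended use (illustrative only; real embeddings would come from an
    /// embedding model rather than hand-written literals):
    ///
    /// ```ignore
    /// let msg_id = p.insert_message("session-1", MessageRole::User, "remember the blue door")?;
    /// p.insert_memory_vector("session-1", Some(msg_id), &[0.1, 0.9, 0.3])?;
    /// let hits = p.recall_top_k("session-1", &[0.1, 0.9, 0.3], 5)?;
    /// for (vector, score) in hits {
    ///     println!("memory {} scored {score:.3}", vector.id);
    /// }
    /// ```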
    pub fn recall_top_k(
        &self,
        session_id: &str,
        query_embedding: &[f32],
        k: usize,
    ) -> Result<Vec<(MemoryVector, f32)>> {
        let conn = self.conn();
        let mut stmt = conn.prepare("SELECT id, session_id, message_id, embedding, CAST(created_at AS TEXT) as created_at FROM memory_vectors WHERE session_id = ?")?;
        let mut rows = stmt.query(params![session_id])?;
        let mut scored: Vec<(MemoryVector, f32)> = Vec::new();
        while let Some(row) = rows.next()? {
            let id: i64 = row.get(0)?;
            let sid: String = row.get(1)?;
            let message_id: Option<i64> = row.get(2)?;
            let embedding_text: String = row.get(3)?;
            let created_at: String = row.get(4)?;
            let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
            let embedding: Vec<f32> = serde_json::from_str(&embedding_text).unwrap_or_default();
            let score = cosine_similarity(query_embedding, &embedding);
            scored.push((
                MemoryVector {
                    id,
                    session_id: sid,
                    message_id,
                    embedding,
                    created_at,
                },
                score,
            ));
        }
        scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        scored.truncate(k);
        Ok(scored)
    }

    /// List known session IDs ordered by most recent activity
    pub fn list_sessions(&self) -> Result<Vec<String>> {
        let conn = self.conn();
        let mut stmt = conn.prepare(
            "SELECT session_id, MAX(created_at) as last FROM messages GROUP BY session_id ORDER BY last DESC"
        )?;
        let mut rows = stmt.query([])?;
        let mut out = Vec::new();
        while let Some(row) = rows.next()? {
            let sid: String = row.get(0)?;
            out.push(sid);
        }
        Ok(out)
    }

    // ---------- Tool Log ----------

    pub fn log_tool(
        &self,
        session_id: &str,
        agent_name: &str,
        run_id: &str,
        tool_name: &str,
        arguments: &JsonValue,
        result: &JsonValue,
        success: bool,
        error: Option<&str>,
    ) -> Result<i64> {
        let conn = self.conn();
        let mut stmt = conn.prepare("INSERT INTO tool_log (session_id, agent, run_id, tool_name, arguments, result, success, error) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
        let id: i64 = stmt.query_row(
            params![
                session_id,
                agent_name,
                run_id,
                tool_name,
                arguments.to_string(),
                result.to_string(),
                success,
                error.unwrap_or("")
            ],
            |row| row.get(0),
        )?;
        Ok(id)
    }

    // ---------- Policy Cache ----------

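    /// Insert or replace the policy entry stored under `key`.
    ///
    /// Hypothetical round-trip (the key and value are illustrative, not a real policy):
    ///
    /// ```ignore
    /// use serde_json::json;
    ///
    /// p.policy_upsert("tools.shell.allowed", &json!({ "enabled": false }))?;
    /// let entry = p.policy_get("tools.shell.allowed")?.expect("entry exists");
    /// assert_eq!(entry.value["enabled"], json!(false));
    /// ```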
    pub fn policy_upsert(&self, key: &str, value: &JsonValue) -> Result<()> {
        let conn = self.conn();
        // DuckDB upsert workaround: delete then insert atomically within a transaction.
        conn.execute_batch("BEGIN TRANSACTION;")?;
        {
            let mut del = conn.prepare("DELETE FROM policy_cache WHERE key = ?")?;
            let _ = del.execute(params![key])?;
            let mut ins = conn.prepare("INSERT INTO policy_cache (key, value, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP)")?;
            let _ = ins.execute(params![key, value.to_string()])?;
        }
        conn.execute_batch("COMMIT;")?;
        Ok(())
    }

    pub fn policy_get(&self, key: &str) -> Result<Option<PolicyEntry>> {
        let conn = self.conn();
        let mut stmt = conn.prepare("SELECT key, value, CAST(updated_at AS TEXT) as updated_at FROM policy_cache WHERE key = ?")?;
        let mut rows = stmt.query(params![key])?;
        if let Some(row) = rows.next()? {
            let key: String = row.get(0)?;
            let value_text: String = row.get(1)?;
            let updated_at: String = row.get(2)?;
            let updated_at: DateTime<Utc> = updated_at.parse().unwrap_or_else(|_| Utc::now());
            let value: JsonValue = serde_json::from_str(&value_text).unwrap_or(JsonValue::Null);
            Ok(Some(PolicyEntry {
                key,
                value,
                updated_at,
            }))
        } else {
            Ok(None)
        }
    }
}

fn generate_instance_id() -> String {
    let hostname = hostname::get()
        .ok()
        .and_then(|h| h.into_string().ok())
        .unwrap_or_else(|| "unknown".to_string());
    let uuid = uuid::Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext));
    format!("{}-{}", hostname, uuid)
}

fn expand_tilde(path: &Path) -> Result<PathBuf> {
    let path_str = path.to_string_lossy();
    if path_str == "~" {
        let base = BaseDirs::new().context("base directories not available")?;
        Ok(base.home_dir().to_path_buf())
    } else if let Some(stripped) = path_str.strip_prefix("~/") {
        let base = BaseDirs::new().context("base directories not available")?;
        Ok(base.home_dir().join(stripped))
    } else {
        Ok(path.to_path_buf())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    #[test]
    fn expands_home_directory_prefix() {
        let base = BaseDirs::new().expect("home directory available");
        let expected = base.home_dir().join("demo.db");
        let result = expand_tilde(Path::new("~/demo.db")).expect("path expansion succeeds");
        assert_eq!(result, expected);
    }

    #[test]
    fn leaves_regular_paths_unchanged() {
        let input = Path::new("relative/path.db");
        let result = expand_tilde(input).expect("path expansion succeeds");
        assert_eq!(result, input);
    }
}

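/// Cosine similarity of two equal-length vectors: `dot(a, b) / (|a| * |b|)`.
/// Returns `0.0` when either vector is empty, the lengths differ, or either norm is zero.
///
/// For instance, identical non-zero vectors score `1.0` and orthogonal vectors score `0.0`:
///
/// ```ignore
/// assert!((cosine_similarity(&[1.0, 0.0], &[1.0, 0.0]) - 1.0).abs() < 1e-6);
/// assert_eq!(cosine_similarity(&[1.0, 0.0], &[0.0, 1.0]), 0.0);
/// ```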
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    if a.is_empty() || b.is_empty() || a.len() != b.len() {
        return 0.0;
    }
    let mut dot = 0.0f32;
    let mut na = 0.0f32;
    let mut nb = 0.0f32;
    for i in 0..a.len() {
        dot += a[i] * b[i];
        na += a[i] * a[i];
        nb += b[i] * b[i];
    }
    if na == 0.0 || nb == 0.0 {
        return 0.0;
    }
    dot / (na.sqrt() * nb.sqrt())
}

// ========== Knowledge Graph Methods ==========

// Type Conversion Helpers

fn from_kg_node(node: spec_ai_knowledge_graph::GraphNode) -> GraphNode {
    GraphNode {
        id: node.id,
        session_id: node.session_id,
        node_type: node.node_type,
        label: node.label,
        properties: node.properties,
        embedding_id: node.embedding_id,
        created_at: node.created_at,
        updated_at: node.updated_at,
    }
}

fn from_kg_edge(edge: spec_ai_knowledge_graph::GraphEdge) -> GraphEdge {
    GraphEdge {
        id: edge.id,
        session_id: edge.session_id,
        source_id: edge.source_id,
        target_id: edge.target_id,
        edge_type: edge.edge_type,
        predicate: edge.predicate,
        properties: edge.properties,
        weight: edge.weight,
        temporal_start: edge.temporal_start,
        temporal_end: edge.temporal_end,
        created_at: edge.created_at,
    }
}

fn from_kg_path(path: spec_ai_knowledge_graph::GraphPath) -> GraphPath {
    GraphPath {
        nodes: path.nodes.into_iter().map(from_kg_node).collect(),
        edges: path.edges.into_iter().map(from_kg_edge).collect(),
        length: path.length,
        weight: path.weight,
    }
}

impl Persistence {
    // ---------- Graph Node Operations ----------

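    /// Insert a node into the knowledge graph (thin delegation to the underlying
    /// [`KnowledgeGraphStore`]).
    ///
    /// Sketch of building a two-node graph and querying a path; the `NodeType` and
    /// `EdgeType` variants shown are assumptions about `spec_ai_knowledge_graph` and
    /// are only meant to illustrate the call shape:
    ///
    /// ```ignore
    /// let a = p.insert_graph_node("session-1", NodeType::Entity, "alice", &json!({}), None)?;
    /// let b = p.insert_graph_node("session-1", NodeType::Entity, "bob", &json!({}), None)?;
    /// p.insert_graph_edge("session-1", a, b, EdgeType::RelatedTo, Some("knows"), None, 1.0)?;
    /// let path = p.find_shortest_path("session-1", a, b, Some(3))?;
    /// // Expect a one-hop path from `a` to `b`.
    /// ```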
    pub fn insert_graph_node(
        &self,
        session_id: &str,
        node_type: spec_ai_knowledge_graph::NodeType,
        label: &str,
        properties: &JsonValue,
        embedding_id: Option<i64>,
    ) -> Result<i64> {
        self.graph_store
            .insert_graph_node(session_id, node_type, label, properties, embedding_id)
    }

    pub fn get_graph_node(&self, node_id: i64) -> Result<Option<GraphNode>> {
        self.graph_store
            .get_graph_node(node_id)
            .map(|opt| opt.map(from_kg_node))
    }

    pub fn list_graph_nodes(
        &self,
        session_id: &str,
        node_type: Option<spec_ai_knowledge_graph::NodeType>,
        limit: Option<i64>,
    ) -> Result<Vec<GraphNode>> {
        self.graph_store
            .list_graph_nodes(session_id, node_type, limit)
            .map(|nodes| nodes.into_iter().map(from_kg_node).collect())
    }

    pub fn count_graph_nodes(&self, session_id: &str) -> Result<i64> {
        self.graph_store.count_graph_nodes(session_id)
    }

    pub fn update_graph_node(&self, node_id: i64, properties: &JsonValue) -> Result<()> {
        self.graph_store.update_graph_node(node_id, properties)
    }

    pub fn delete_graph_node(&self, node_id: i64) -> Result<()> {
        self.graph_store.delete_graph_node(node_id)
    }

    // ---------- Graph Edge Operations ----------

    pub fn insert_graph_edge(
        &self,
        session_id: &str,
        source_id: i64,
        target_id: i64,
        edge_type: spec_ai_knowledge_graph::EdgeType,
        predicate: Option<&str>,
        properties: Option<&JsonValue>,
        weight: f32,
    ) -> Result<i64> {
        self.graph_store.insert_graph_edge(
            session_id, source_id, target_id, edge_type, predicate, properties, weight,
        )
    }

    pub fn get_graph_edge(&self, edge_id: i64) -> Result<Option<GraphEdge>> {
        self.graph_store
            .get_graph_edge(edge_id)
            .map(|opt| opt.map(from_kg_edge))
    }

    pub fn list_graph_edges(
        &self,
        session_id: &str,
        source_id: Option<i64>,
        target_id: Option<i64>,
    ) -> Result<Vec<GraphEdge>> {
        self.graph_store
            .list_graph_edges(session_id, source_id, target_id)
            .map(|edges| edges.into_iter().map(from_kg_edge).collect())
    }

    pub fn count_graph_edges(&self, session_id: &str) -> Result<i64> {
        self.graph_store.count_graph_edges(session_id)
    }

    pub fn delete_graph_edge(&self, edge_id: i64) -> Result<()> {
        self.graph_store.delete_graph_edge(edge_id)
    }

    // ---------- Graph Traversal Operations ----------

    pub fn find_shortest_path(
        &self,
        session_id: &str,
        source_id: i64,
        target_id: i64,
        max_hops: Option<usize>,
    ) -> Result<Option<GraphPath>> {
        self.graph_store
            .find_shortest_path(session_id, source_id, target_id, max_hops)
            .map(|opt| opt.map(from_kg_path))
    }

    pub fn traverse_neighbors(
        &self,
        session_id: &str,
        node_id: i64,
        direction: spec_ai_knowledge_graph::TraversalDirection,
        depth: usize,
    ) -> Result<Vec<GraphNode>> {
        self.graph_store
            .traverse_neighbors(session_id, node_id, direction, depth)
            .map(|nodes| nodes.into_iter().map(from_kg_node).collect())
    }

    // ---------- Transcriptions ----------

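    /// Store a single transcription chunk for a session.
    ///
    /// Typical flow (chunk ids and text are illustrative):
    ///
    /// ```ignore
    /// p.insert_transcription("session-1", 0, "hello", Utc::now())?;
    /// p.insert_transcription("session-1", 1, "world", Utc::now())?;
    /// assert_eq!(p.get_full_transcription("session-1")?, "hello world");
    /// ```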
    pub fn insert_transcription(
        &self,
        session_id: &str,
        chunk_id: i64,
        text: &str,
        timestamp: chrono::DateTime<Utc>,
    ) -> Result<i64> {
        let conn = self.conn();
        let mut stmt = conn.prepare(
            "INSERT INTO transcriptions (session_id, chunk_id, text, timestamp, embedding_id) VALUES (?, ?, ?, ?, NULL) RETURNING id",
        )?;
        let id: i64 = stmt.query_row(
            params![session_id, chunk_id, text, timestamp.to_rfc3339()],
            |row| row.get(0),
        )?;
        Ok(id)
    }

    pub fn update_transcription_embedding(
        &self,
        transcription_id: i64,
        embedding_id: i64,
    ) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "UPDATE transcriptions SET embedding_id = ? WHERE id = ?",
            params![embedding_id, transcription_id],
        )?;
        Ok(())
    }

    pub fn list_transcriptions(
        &self,
        session_id: &str,
        limit: Option<i64>,
    ) -> Result<Vec<(i64, i64, String, DateTime<Utc>)>> {
        let conn = self.conn();
        let query = if let Some(lim) = limit {
            format!(
                "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC LIMIT {}",
                lim
            )
        } else {
            "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC".to_string()
        };

        let mut stmt = conn.prepare(&query)?;
        let mut rows = stmt.query(params![session_id])?;
        let mut out = Vec::new();

        while let Some(row) = rows.next()? {
            let id: i64 = row.get(0)?;
            let chunk_id: i64 = row.get(1)?;
            let text: String = row.get(2)?;
            let timestamp_str: String = row.get(3)?;
            let timestamp: DateTime<Utc> = timestamp_str.parse().unwrap_or_else(|_| Utc::now());
            out.push((id, chunk_id, text, timestamp));
        }

        Ok(out)
    }

    pub fn get_full_transcription(&self, session_id: &str) -> Result<String> {
        let transcriptions = self.list_transcriptions(session_id, None)?;
        Ok(transcriptions
            .into_iter()
            .map(|(_, _, text, _)| text)
            .collect::<Vec<_>>()
            .join(" "))
    }

    pub fn delete_transcriptions(&self, session_id: &str) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "DELETE FROM transcriptions WHERE session_id = ?",
            params![session_id],
        )?;
        Ok(())
    }

    pub fn get_transcription_by_embedding(&self, embedding_id: i64) -> Result<Option<String>> {
        let conn = self.conn();
        let mut stmt =
            conn.prepare("SELECT text FROM transcriptions WHERE embedding_id = ? LIMIT 1")?;
        let result: Result<String, _> = stmt.query_row(params![embedding_id], |row| row.get(0));
        match result {
            Ok(text) => Ok(Some(text)),
            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    // ---------- Tokenized Files Cache ----------

    /// Persist tokenization metadata for a file, replacing any existing entry for the path.
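    ///
    /// Illustrative call (the hash, token counts, and byte count are placeholders):
    ///
    /// ```ignore
    /// p.upsert_tokenized_file("session-1", "src/main.rs", "abc123", 1200, 950, 4096, false, None)?;
    /// assert!(p.get_tokenized_file("session-1", "src/main.rs")?.is_some());
    /// ```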
    pub fn upsert_tokenized_file(
        &self,
        session_id: &str,
        path: &str,
        file_hash: &str,
        raw_tokens: usize,
        cleaned_tokens: usize,
        bytes_captured: usize,
        truncated: bool,
        embedding_id: Option<i64>,
    ) -> Result<i64> {
        let conn = self.conn();
        conn.execute(
            "DELETE FROM tokenized_files WHERE session_id = ? AND path = ?",
            params![session_id, path],
        )?;
        let mut stmt = conn.prepare("INSERT INTO tokenized_files (session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
        let id: i64 = stmt.query_row(
            params![
                session_id,
                path,
                file_hash,
                raw_tokens as i64,
                cleaned_tokens as i64,
                bytes_captured as i64,
                truncated,
                embedding_id
            ],
            |row| row.get(0),
        )?;
        Ok(id)
    }

    pub fn get_tokenized_file(
        &self,
        session_id: &str,
        path: &str,
    ) -> Result<Option<TokenizedFileRecord>> {
        let conn = self.conn();
        let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? AND path = ? LIMIT 1")?;
        let mut rows = stmt.query(params![session_id, path])?;
        if let Some(row) = rows.next()? {
            let record = TokenizedFileRecord::from_row(row)?;
            Ok(Some(record))
        } else {
            Ok(None)
        }
    }

    pub fn list_tokenized_files(&self, session_id: &str) -> Result<Vec<TokenizedFileRecord>> {
        let conn = self.conn();
        let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? ORDER BY path")?;
        let mut rows = stmt.query(params![session_id])?;
        let mut out = Vec::new();
        while let Some(row) = rows.next()? {
            out.push(TokenizedFileRecord::from_row(row)?);
        }
        Ok(out)
    }

    // ========== Mesh Message Persistence ==========

    /// Store a mesh message in the database
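    ///
    /// Sketch of the store / deliver cycle on a fresh database (ids and statuses are
    /// illustrative; only `"pending"` is assumed by `mesh_message_get_pending`):
    ///
    /// ```ignore
    /// let row_id = p.mesh_message_store("msg-1", "instance-a", Some("instance-b"),
    ///     "task_request", &json!({ "task": "summarize" }), "pending")?;
    /// let pending = p.mesh_message_get_pending("instance-b")?;
    /// assert_eq!(pending.len(), 1);
    /// p.mesh_message_update_status(row_id, "delivered")?;
    /// ```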
    pub fn mesh_message_store(
        &self,
        message_id: &str,
        source_instance: &str,
        target_instance: Option<&str>,
        message_type: &str,
        payload: &JsonValue,
        status: &str,
    ) -> Result<i64> {
        let conn = self.conn();
        let payload_json = serde_json::to_string(payload)?;
        // Fetch the generated id with RETURNING, matching the other insert helpers;
        // DuckDB does not provide SQLite's last_insert_rowid().
        let mut stmt = conn.prepare(
            "INSERT INTO mesh_messages (message_id, source_instance, target_instance, message_type, payload, status) VALUES (?, ?, ?, ?, ?, ?) RETURNING id",
        )?;
        let id: i64 = stmt.query_row(
            params![message_id, source_instance, target_instance, message_type, payload_json, status],
            |row| row.get(0),
        )?;
        Ok(id)
    }

    /// Check if a message with this ID already exists (for duplicate detection)
    pub fn mesh_message_exists(&self, message_id: &str) -> Result<bool> {
        let conn = self.conn();
        let count: i64 = conn.query_row(
            "SELECT COUNT(*) FROM mesh_messages WHERE message_id = ?",
            params![message_id],
            |row| row.get(0),
        )?;
        Ok(count > 0)
    }

    /// Update message status (e.g., delivered, failed)
    pub fn mesh_message_update_status(&self, message_id: i64, status: &str) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "UPDATE mesh_messages SET status = ?, delivered_at = CURRENT_TIMESTAMP WHERE id = ?",
            params![status, message_id],
        )?;
        Ok(())
    }

    /// Get pending messages for a target instance
    pub fn mesh_message_get_pending(
        &self,
        target_instance: &str,
    ) -> Result<Vec<MeshMessageRecord>> {
        let conn = self.conn();
        let mut stmt = conn.prepare(
            "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
             FROM mesh_messages
             WHERE (target_instance = ? OR target_instance IS NULL) AND status = 'pending'
             ORDER BY created_at",
        )?;
        let mut rows = stmt.query(params![target_instance])?;
        let mut out = Vec::new();
        while let Some(row) = rows.next()? {
            out.push(MeshMessageRecord::from_row(row)?);
        }
        Ok(out)
    }

    /// Get message history for analytics
    pub fn mesh_message_get_history(
        &self,
        instance_id: Option<&str>,
        limit: usize,
    ) -> Result<Vec<MeshMessageRecord>> {
        let conn = self.conn();
        let query = if instance_id.is_some() {
            format!(
                "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
                 FROM mesh_messages
                 WHERE source_instance = ? OR target_instance = ?
                 ORDER BY created_at DESC LIMIT {}",
                limit
            )
        } else {
            format!(
                "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
                 FROM mesh_messages
                 ORDER BY created_at DESC LIMIT {}",
                limit
            )
        };

        let mut stmt = conn.prepare(&query)?;
        let mut rows = if let Some(inst) = instance_id {
            stmt.query(params![inst, inst])?
        } else {
            stmt.query(params![])?
        };

        let mut out = Vec::new();
        while let Some(row) = rows.next()? {
            out.push(MeshMessageRecord::from_row(row)?);
        }
        Ok(out)
    }

    // ===== Graph Synchronization Methods =====

    /// Append an entry to the graph changelog
    pub fn graph_changelog_append(
        &self,
        session_id: &str,
        instance_id: &str,
        entity_type: &str,
        entity_id: i64,
        operation: &str,
        vector_clock: &str,
        data: Option<&str>,
    ) -> Result<i64> {
        self.graph_store.graph_changelog_append(
            session_id,
            instance_id,
            entity_type,
            entity_id,
            operation,
            vector_clock,
            data,
        )
    }

    /// Get changelog entries since a given timestamp for a session
    pub fn graph_changelog_get_since(
        &self,
        session_id: &str,
        since_timestamp: &str,
    ) -> Result<Vec<ChangelogEntry>> {
        self.graph_store
            .graph_changelog_get_since(session_id, since_timestamp)
            .map(|entries| {
                entries
                    .into_iter()
                    .map(|e| ChangelogEntry {
                        id: e.id,
                        session_id: e.session_id,
                        instance_id: e.instance_id,
                        entity_type: e.entity_type,
                        entity_id: e.entity_id,
                        operation: e.operation,
                        vector_clock: e.vector_clock,
                        data: e.data,
                        created_at: e.created_at,
                    })
                    .collect()
            })
    }

    /// List conflict entries stored in the changelog
    pub fn graph_list_conflicts(&self, session_id: Option<&str>) -> Result<Vec<ChangelogEntry>> {
        self.graph_store
            .graph_list_conflicts(session_id, 100)
            .map(|entries| {
                entries
                    .into_iter()
                    .map(|e| ChangelogEntry {
                        id: e.id,
                        session_id: e.session_id,
                        instance_id: e.instance_id,
                        entity_type: e.entity_type,
                        entity_id: e.entity_id,
                        operation: e.operation,
                        vector_clock: e.vector_clock,
                        data: e.data,
                        created_at: e.created_at,
                    })
                    .collect()
            })
    }

    /// Prune old changelog entries (keep last N days)
    pub fn graph_changelog_prune(&self, days_to_keep: i64) -> Result<usize> {
        self.graph_store.graph_changelog_prune(days_to_keep)
    }

    /// Get the vector clock for an instance/session/graph combination
    pub fn graph_sync_state_get(
        &self,
        instance_id: &str,
        session_id: &str,
        graph_name: &str,
    ) -> Result<Option<String>> {
        self.graph_store
            .graph_sync_state_get(instance_id, session_id, graph_name)
    }

    /// Get sync state metadata including last_sync_at
    pub fn graph_sync_state_get_metadata(
        &self,
        instance_id: &str,
        session_id: &str,
        graph_name: &str,
    ) -> Result<Option<SyncStateRecord>> {
        self.graph_store
            .graph_sync_state_get_metadata(instance_id, session_id, graph_name)
            .map(|opt| {
                opt.map(|r| SyncStateRecord {
                    vector_clock: r.vector_clock,
                    last_sync_at: r.last_sync_at,
                })
            })
    }

    /// Update the vector clock for an instance/session/graph combination
    pub fn graph_sync_state_update(
        &self,
        instance_id: &str,
        session_id: &str,
        graph_name: &str,
        vector_clock: &str,
    ) -> Result<()> {
        self.graph_store
            .graph_sync_state_update(instance_id, session_id, graph_name, vector_clock)
    }

    /// Persist sync configuration for a graph
    pub fn graph_set_sync_config(
        &self,
        session_id: &str,
        graph_name: &str,
        sync_enabled: bool,
        conflict_resolution_strategy: Option<&str>,
        sync_interval_seconds: Option<u64>,
    ) -> Result<GraphSyncConfig> {
        self.graph_store
            .graph_set_sync_config(
                session_id,
                graph_name,
                sync_enabled,
                conflict_resolution_strategy,
                sync_interval_seconds,
            )
            .map(|cfg| GraphSyncConfig {
                sync_enabled: cfg.sync_enabled,
                conflict_resolution_strategy: cfg.conflict_resolution_strategy,
                sync_interval_seconds: cfg.sync_interval_seconds,
            })
    }

    /// Retrieve sync configuration for a graph
    pub fn graph_get_sync_config(
        &self,
        session_id: &str,
        graph_name: &str,
    ) -> Result<GraphSyncConfig> {
        self.graph_store
            .graph_get_sync_config(session_id, graph_name)
            .map(|cfg| GraphSyncConfig {
                sync_enabled: cfg.sync_enabled,
                conflict_resolution_strategy: cfg.conflict_resolution_strategy,
                sync_interval_seconds: cfg.sync_interval_seconds,
            })
    }

    /// Enable or disable sync for a graph
    pub fn graph_set_sync_enabled(
        &self,
        session_id: &str,
        graph_name: &str,
        enabled: bool,
    ) -> Result<()> {
        self.graph_store
            .graph_set_sync_enabled(session_id, graph_name, enabled)
    }

    /// Check if sync is enabled for a graph
    pub fn graph_get_sync_enabled(&self, session_id: &str, graph_name: &str) -> Result<bool> {
        self.graph_store
            .graph_get_sync_enabled(session_id, graph_name)
    }

    /// List all graphs for a session
    pub fn graph_list(&self, session_id: &str) -> Result<Vec<String>> {
        self.graph_store.graph_list(session_id)
    }

    /// List all sync-enabled graphs across all sessions
    pub fn graph_list_sync_enabled(&self) -> Result<Vec<(String, String)>> {
        self.graph_store.graph_list_sync_enabled()
    }

    /// Get a node with its sync metadata
    pub fn graph_get_node_with_sync(&self, node_id: i64) -> Result<Option<SyncedNodeRecord>> {
        self.graph_store
            .graph_get_node_with_sync(node_id)
            .map(|opt| {
                opt.map(|r| SyncedNodeRecord {
                    id: r.id,
                    session_id: r.session_id,
                    node_type: r.node_type,
                    label: r.label,
                    properties: r.properties,
                    embedding_id: r.embedding_id,
                    created_at: r.created_at,
                    updated_at: r.updated_at,
                    vector_clock: r.vector_clock,
                    last_modified_by: r.last_modified_by,
                    is_deleted: r.is_deleted,
                    sync_enabled: r.sync_enabled,
                })
            })
    }

    /// Get all synced nodes for a session with optional filters
    pub fn graph_list_nodes_with_sync(
        &self,
        session_id: &str,
        sync_enabled_only: bool,
        include_deleted: bool,
    ) -> Result<Vec<SyncedNodeRecord>> {
        self.graph_store
            .graph_list_nodes_with_sync(session_id, sync_enabled_only, include_deleted)
            .map(|nodes| {
                nodes
                    .into_iter()
                    .map(|r| SyncedNodeRecord {
                        id: r.id,
                        session_id: r.session_id,
                        node_type: r.node_type,
                        label: r.label,
                        properties: r.properties,
                        embedding_id: r.embedding_id,
                        created_at: r.created_at,
                        updated_at: r.updated_at,
                        vector_clock: r.vector_clock,
                        last_modified_by: r.last_modified_by,
                        is_deleted: r.is_deleted,
                        sync_enabled: r.sync_enabled,
                    })
                    .collect()
            })
    }

    /// Get an edge with its sync metadata
    pub fn graph_get_edge_with_sync(&self, edge_id: i64) -> Result<Option<SyncedEdgeRecord>> {
        self.graph_store
            .graph_get_edge_with_sync(edge_id)
            .map(|opt| {
                opt.map(|r| SyncedEdgeRecord {
                    id: r.id,
                    session_id: r.session_id,
                    source_id: r.source_id,
                    target_id: r.target_id,
                    edge_type: r.edge_type,
                    predicate: r.predicate,
                    properties: r.properties,
                    weight: r.weight,
                    temporal_start: r.temporal_start,
                    temporal_end: r.temporal_end,
                    created_at: r.created_at,
                    vector_clock: r.vector_clock,
                    last_modified_by: r.last_modified_by,
                    is_deleted: r.is_deleted,
                    sync_enabled: r.sync_enabled,
                })
            })
    }

    /// Get all synced edges for a session with optional filters
    pub fn graph_list_edges_with_sync(
        &self,
        session_id: &str,
        sync_enabled_only: bool,
        include_deleted: bool,
    ) -> Result<Vec<SyncedEdgeRecord>> {
        self.graph_store
            .graph_list_edges_with_sync(session_id, sync_enabled_only, include_deleted)
            .map(|edges| {
                edges
                    .into_iter()
                    .map(|r| SyncedEdgeRecord {
                        id: r.id,
                        session_id: r.session_id,
                        source_id: r.source_id,
                        target_id: r.target_id,
                        edge_type: r.edge_type,
                        predicate: r.predicate,
                        properties: r.properties,
                        weight: r.weight,
                        temporal_start: r.temporal_start,
                        temporal_end: r.temporal_end,
                        created_at: r.created_at,
                        vector_clock: r.vector_clock,
                        last_modified_by: r.last_modified_by,
                        is_deleted: r.is_deleted,
                        sync_enabled: r.sync_enabled,
                    })
                    .collect()
            })
    }

    /// Update a node's sync metadata
    pub fn graph_update_node_sync_metadata(
        &self,
        node_id: i64,
        vector_clock: &str,
        last_modified_by: &str,
        sync_enabled: bool,
    ) -> Result<()> {
        self.graph_store.graph_update_node_sync_metadata(
            node_id,
            vector_clock,
            last_modified_by,
            sync_enabled,
        )
    }

    /// Update an edge's sync metadata
    pub fn graph_update_edge_sync_metadata(
        &self,
        edge_id: i64,
        vector_clock: &str,
        last_modified_by: &str,
        sync_enabled: bool,
    ) -> Result<()> {
        self.graph_store.graph_update_edge_sync_metadata(
            edge_id,
            vector_clock,
            last_modified_by,
            sync_enabled,
        )
    }

    /// Mark a node as deleted (tombstone pattern)
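    ///
    /// The row is kept and only flagged as deleted so that peers can observe the
    /// deletion during sync. The vector clock is whatever serialized format the
    /// caller already uses, for example:
    ///
    /// ```ignore
    /// p.graph_mark_node_deleted(node_id, r#"{"instance-a":3}"#, "instance-a")?;
    /// ```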
    pub fn graph_mark_node_deleted(
        &self,
        node_id: i64,
        vector_clock: &str,
        deleted_by: &str,
    ) -> Result<()> {
        self.graph_store
            .graph_mark_node_deleted(node_id, vector_clock, deleted_by)
    }

    /// Mark an edge as deleted (tombstone pattern)
    pub fn graph_mark_edge_deleted(
        &self,
        edge_id: i64,
        vector_clock: &str,
        deleted_by: &str,
    ) -> Result<()> {
        self.graph_store
            .graph_mark_edge_deleted(edge_id, vector_clock, deleted_by)
    }
}

#[derive(Debug, Clone)]
pub struct TokenizedFileRecord {
    pub id: i64,
    pub session_id: String,
    pub path: String,
    pub file_hash: String,
    pub raw_tokens: usize,
    pub cleaned_tokens: usize,
    pub bytes_captured: usize,
    pub truncated: bool,
    pub embedding_id: Option<i64>,
    pub updated_at: DateTime<Utc>,
}

impl TokenizedFileRecord {
    fn from_row(row: &duckdb::Row) -> Result<Self> {
        let id: i64 = row.get(0)?;
        let session_id: String = row.get(1)?;
        let path: String = row.get(2)?;
        let file_hash: String = row.get(3)?;
        let raw_tokens: i64 = row.get(4)?;
        let cleaned_tokens: i64 = row.get(5)?;
        let bytes_captured: i64 = row.get(6)?;
        let truncated: bool = row.get(7)?;
        let embedding_id: Option<i64> = row.get(8)?;
        let updated_at: String = row.get(9)?;

        Ok(Self {
            id,
            session_id,
            path,
            file_hash,
            raw_tokens: raw_tokens.max(0) as usize,
            cleaned_tokens: cleaned_tokens.max(0) as usize,
            bytes_captured: bytes_captured.max(0) as usize,
            truncated,
            embedding_id,
            updated_at: updated_at.parse().unwrap_or_else(|_| Utc::now()),
        })
    }
}

#[derive(Debug, Clone)]
pub struct MeshMessageRecord {
    pub id: i64,
    pub source_instance: String,
    pub target_instance: Option<String>,
    pub message_type: String,
    pub payload: JsonValue,
    pub status: String,
    pub created_at: DateTime<Utc>,
    pub delivered_at: Option<DateTime<Utc>>,
}

impl MeshMessageRecord {
    fn from_row(row: &duckdb::Row) -> Result<Self> {
        let id: i64 = row.get(0)?;
        let source_instance: String = row.get(1)?;
        let target_instance: Option<String> = row.get(2)?;
        let message_type: String = row.get(3)?;
        let payload_str: String = row.get(4)?;
        let payload: JsonValue = serde_json::from_str(&payload_str)?;
        let status: String = row.get(5)?;
        let created_at_str: String = row.get(6)?;
        let delivered_at_str: Option<String> = row.get(7)?;

        Ok(MeshMessageRecord {
            id,
            source_instance,
            target_instance,
            message_type,
            payload,
            status,
            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
            delivered_at: delivered_at_str.and_then(|s| s.parse().ok()),
        })
    }
}

// ===== Graph Sync Record Types =====

#[derive(Debug, Clone)]
pub struct SyncStateRecord {
    pub vector_clock: String,
    pub last_sync_at: Option<String>,
}

#[derive(Debug, Clone, Default)]
pub struct GraphSyncConfig {
    pub sync_enabled: bool,
    pub conflict_resolution_strategy: Option<String>,
    pub sync_interval_seconds: Option<u64>,
}

#[derive(Debug, Clone)]
pub struct ChangelogEntry {
    pub id: i64,
    pub session_id: String,
    pub instance_id: String,
    pub entity_type: String,
    pub entity_id: i64,
    pub operation: String,
    pub vector_clock: String,
    pub data: Option<String>,
    pub created_at: DateTime<Utc>,
}

impl ChangelogEntry {
    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
        let id: i64 = row.get(0)?;
        let session_id: String = row.get(1)?;
        let instance_id: String = row.get(2)?;
        let entity_type: String = row.get(3)?;
        let entity_id: i64 = row.get(4)?;
        let operation: String = row.get(5)?;
        let vector_clock: String = row.get(6)?;
        let data: Option<String> = row.get(7)?;
        let created_at_str: String = row.get(8)?;

        Ok(ChangelogEntry {
            id,
            session_id,
            instance_id,
            entity_type,
            entity_id,
            operation,
            vector_clock,
            data,
            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
        })
    }
}

#[derive(Debug, Clone)]
pub struct SyncedNodeRecord {
    pub id: i64,
    pub session_id: String,
    pub node_type: String,
    pub label: String,
    pub properties: serde_json::Value,
    pub embedding_id: Option<i64>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub vector_clock: String,
    pub last_modified_by: Option<String>,
    pub is_deleted: bool,
    pub sync_enabled: bool,
}

impl SyncedNodeRecord {
    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
        let id: i64 = row.get(0)?;
        let session_id: String = row.get(1)?;
        let node_type: String = row.get(2)?;
        let label: String = row.get(3)?;
        let properties_str: String = row.get(4)?;
        let properties: serde_json::Value = serde_json::from_str(&properties_str).map_err(|e| {
            duckdb::Error::FromSqlConversionFailure(4, duckdb::types::Type::Text, Box::new(e))
        })?;
        let embedding_id: Option<i64> = row.get(5)?;
        let created_at_str: String = row.get(6)?;
        let updated_at_str: String = row.get(7)?;
        let vector_clock: String = row.get(8)?;
        let last_modified_by: Option<String> = row.get(9)?;
        let is_deleted: bool = row.get(10)?;
        let sync_enabled: bool = row.get(11)?;

        Ok(SyncedNodeRecord {
            id,
            session_id,
            node_type,
            label,
            properties,
            embedding_id,
            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
            updated_at: updated_at_str.parse().unwrap_or_else(|_| Utc::now()),
            vector_clock,
            last_modified_by,
            is_deleted,
            sync_enabled,
        })
    }
}

#[derive(Debug, Clone)]
pub struct SyncedEdgeRecord {
    pub id: i64,
    pub session_id: String,
    pub source_id: i64,
    pub target_id: i64,
    pub edge_type: String,
    pub predicate: Option<String>,
    pub properties: Option<serde_json::Value>,
    pub weight: f32,
    pub temporal_start: Option<DateTime<Utc>>,
    pub temporal_end: Option<DateTime<Utc>>,
    pub created_at: DateTime<Utc>,
    pub vector_clock: String,
    pub last_modified_by: Option<String>,
    pub is_deleted: bool,
    pub sync_enabled: bool,
}

impl SyncedEdgeRecord {
    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
        let id: i64 = row.get(0)?;
        let session_id: String = row.get(1)?;
        let source_id: i64 = row.get(2)?;
        let target_id: i64 = row.get(3)?;
        let edge_type: String = row.get(4)?;
        let predicate: Option<String> = row.get(5)?;
        let properties_str: Option<String> = row.get(6)?;
        let properties: Option<serde_json::Value> = properties_str
            .as_ref()
            .and_then(|s| serde_json::from_str(s).ok());
        let weight: f32 = row.get(7)?;
        let temporal_start_str: Option<String> = row.get(8)?;
        let temporal_end_str: Option<String> = row.get(9)?;
        let created_at_str: String = row.get(10)?;
        let vector_clock: String = row.get(11)?;
        let last_modified_by: Option<String> = row.get(12)?;
        let is_deleted: bool = row.get(13)?;
        let sync_enabled: bool = row.get(14)?;

        Ok(SyncedEdgeRecord {
            id,
            session_id,
            source_id,
            target_id,
            edge_type,
            predicate,
            properties,
            weight,
            temporal_start: temporal_start_str.and_then(|s| s.parse().ok()),
            temporal_end: temporal_end_str.and_then(|s| s.parse().ok()),
            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
            vector_clock,
            last_modified_by,
            is_deleted,
            sync_enabled,
        })
    }
}