spec_ai_config/persistence/mod.rs

pub mod migrations;

use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use directories::BaseDirs;
use duckdb::{params, Connection};
use serde_json::Value as JsonValue;
use spec_ai_knowledge_graph::KnowledgeGraphStore;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

use crate::types::{
    GraphEdge, GraphNode, GraphPath, MemoryVector, Message, MessageRole, PolicyEntry,
};

#[derive(Clone)]
pub struct Persistence {
    conn: Arc<Mutex<Connection>>,
    instance_id: String,
    graph_store: KnowledgeGraphStore,
}

impl Persistence {
    /// Create or open the database at the provided path and run migrations.
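    ///
    /// A minimal usage sketch (the `spec_ai_config::persistence` crate path and the
    /// `MessageRole::User` variant name are assumptions; substitute the real names):
    ///
    /// ```ignore
    /// use spec_ai_config::persistence::Persistence;
    /// use spec_ai_config::types::MessageRole;
    ///
    /// let store = Persistence::new("~/demo/agent_data.duckdb").expect("open db");
    /// let id = store.insert_message("session-1", MessageRole::User, "hello").expect("insert");
    /// let recent = store.list_messages("session-1", 10).expect("list");
    /// assert!(recent.iter().any(|m| m.id == id));
    /// ```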
    pub fn new<P: AsRef<Path>>(db_path: P) -> Result<Self> {
        Self::with_instance_id(db_path, generate_instance_id())
    }

    /// Create with a specific instance_id
    pub fn with_instance_id<P: AsRef<Path>>(db_path: P, instance_id: String) -> Result<Self> {
        let db_path = expand_tilde(db_path.as_ref())?;
        if let Some(dir) = db_path.parent() {
            std::fs::create_dir_all(dir).context("creating DB directory")?;
        }
        let conn = Connection::open(&db_path).context("opening DuckDB")?;
        migrations::run(&conn).context("running migrations")?;
        let conn_arc = Arc::new(Mutex::new(conn));
        let graph_store = KnowledgeGraphStore::new(conn_arc.clone(), instance_id.clone());
        Ok(Self {
            conn: conn_arc,
            instance_id,
            graph_store,
        })
    }

    /// Get the instance ID for this persistence instance
    pub fn instance_id(&self) -> &str {
        &self.instance_id
    }

    /// Get direct access to the KnowledgeGraphStore for Phase 2+ consumer migration
    pub fn graph_store(&self) -> &KnowledgeGraphStore {
        &self.graph_store
    }

    /// Checkpoint the database to ensure all WAL data is written to the main database file.
    /// Call this before shutdown to ensure clean database state.
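    ///
    /// A minimal shutdown sketch (crate path assumed, error handling elided):
    ///
    /// ```ignore
    /// let store = spec_ai_config::persistence::Persistence::new_default().expect("open db");
    /// // ... normal read/write activity ...
    /// store.checkpoint().expect("flush WAL before exit");
    /// ```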
    pub fn checkpoint(&self) -> Result<()> {
        let conn = self.conn();
        conn.execute_batch("CHECKPOINT;")
            .context("checkpointing database")
    }

    /// Creates or opens the default database at ~/.agent_cli/agent_data.duckdb
    pub fn new_default() -> Result<Self> {
        let base = BaseDirs::new().context("base directories not available")?;
        let path = base.home_dir().join(".agent_cli").join("agent_data.duckdb");
        Self::new(path)
    }

    /// Get access to the shared database connection.
    /// Returns a MutexGuard that provides exclusive access to the connection.
    pub fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
        self.conn
            .lock()
            .expect("database connection mutex poisoned")
    }

    // ---------- Messages ----------

    pub fn insert_message(
        &self,
        session_id: &str,
        role: MessageRole,
        content: &str,
    ) -> Result<i64> {
        let conn = self.conn();
        let mut stmt = conn.prepare(
            "INSERT INTO messages (session_id, role, content) VALUES (?, ?, ?) RETURNING id",
        )?;
        let id: i64 = stmt.query_row(params![session_id, role.as_str(), content], |row| {
            row.get(0)
        })?;
        Ok(id)
    }

    pub fn list_messages(&self, session_id: &str, limit: i64) -> Result<Vec<Message>> {
        let conn = self.conn();
        let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?")?;
        let mut rows = stmt.query(params![session_id, limit])?;
        let mut out = Vec::new();
        while let Some(row) = rows.next()? {
            let id: i64 = row.get(0)?;
            let sid: String = row.get(1)?;
            let role: String = row.get(2)?;
            let content: String = row.get(3)?;
            let created_at: String = row.get(4)?; // DuckDB returns TIMESTAMP as string
            let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
            out.push(Message {
                id,
                session_id: sid,
                role: MessageRole::from_str(&role),
                content,
                created_at,
            });
        }
        out.reverse();
        Ok(out)
    }

    pub fn get_message(&self, message_id: i64) -> Result<Option<Message>> {
        let conn = self.conn();
        let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE id = ?")?;
        let mut rows = stmt.query(params![message_id])?;
        if let Some(row) = rows.next()? {
            let id: i64 = row.get(0)?;
            let sid: String = row.get(1)?;
            let role: String = row.get(2)?;
            let content: String = row.get(3)?;
            let created_at: String = row.get(4)?;
            let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
            Ok(Some(Message {
                id,
                session_id: sid,
                role: MessageRole::from_str(&role),
                content,
                created_at,
            }))
        } else {
            Ok(None)
        }
    }

    /// Simple pruning by keeping only the most recent `keep_latest` messages.
    pub fn prune_messages(&self, session_id: &str, keep_latest: i64) -> Result<u64> {
        let conn = self.conn();
        let mut stmt = conn.prepare("DELETE FROM messages WHERE session_id = ? AND id NOT IN (SELECT id FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?)")?;
        let changed = stmt.execute(params![session_id, session_id, keep_latest])? as u64;
        Ok(changed)
    }

    // ---------- Memory Vectors ----------

    pub fn insert_memory_vector(
        &self,
        session_id: &str,
        message_id: Option<i64>,
        embedding: &[f32],
    ) -> Result<i64> {
        let conn = self.conn();
        let embedding_json = serde_json::to_string(embedding)?;
        let mut stmt = conn.prepare("INSERT INTO memory_vectors (session_id, message_id, embedding) VALUES (?, ?, ?) RETURNING id")?;
        let id: i64 = stmt.query_row(params![session_id, message_id, embedding_json], |row| {
            row.get(0)
        })?;
        Ok(id)
    }

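    /// Scores every stored vector for the session against `query_embedding` with cosine
    /// similarity and returns the `k` best matches. An illustrative sketch (crate path
    /// and toy embeddings assumed):
    ///
    /// ```ignore
    /// let store = spec_ai_config::persistence::Persistence::new("/tmp/recall.duckdb").expect("open db");
    /// store.insert_memory_vector("session-1", None, &[1.0, 0.0, 0.0]).expect("insert");
    /// store.insert_memory_vector("session-1", None, &[0.0, 1.0, 0.0]).expect("insert");
    /// let hits = store.recall_top_k("session-1", &[0.9, 0.1, 0.0], 1).expect("recall");
    /// assert_eq!(hits.len(), 1);
    /// ```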
    pub fn recall_top_k(
        &self,
        session_id: &str,
        query_embedding: &[f32],
        k: usize,
    ) -> Result<Vec<(MemoryVector, f32)>> {
        let conn = self.conn();
        let mut stmt = conn.prepare("SELECT id, session_id, message_id, embedding, CAST(created_at AS TEXT) as created_at FROM memory_vectors WHERE session_id = ?")?;
        let mut rows = stmt.query(params![session_id])?;
        let mut scored: Vec<(MemoryVector, f32)> = Vec::new();
        while let Some(row) = rows.next()? {
            let id: i64 = row.get(0)?;
            let sid: String = row.get(1)?;
            let message_id: Option<i64> = row.get(2)?;
            let embedding_text: String = row.get(3)?;
            let created_at: String = row.get(4)?;
            let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
            let embedding: Vec<f32> = serde_json::from_str(&embedding_text).unwrap_or_default();
            let score = cosine_similarity(query_embedding, &embedding);
            scored.push((
                MemoryVector {
                    id,
                    session_id: sid,
                    message_id,
                    embedding,
                    created_at,
                },
                score,
            ));
        }
        scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        scored.truncate(k);
        Ok(scored)
    }

    /// List known session IDs ordered by most recent activity
    pub fn list_sessions(&self) -> Result<Vec<String>> {
        let conn = self.conn();
        let mut stmt = conn.prepare(
            "SELECT session_id, MAX(created_at) as last FROM messages GROUP BY session_id ORDER BY last DESC"
        )?;
        let mut rows = stmt.query([])?;
        let mut out = Vec::new();
        while let Some(row) = rows.next()? {
            let sid: String = row.get(0)?;
            out.push(sid);
        }
        Ok(out)
    }

    // ---------- Tool Log ----------

    pub fn log_tool(
        &self,
        session_id: &str,
        agent_name: &str,
        run_id: &str,
        tool_name: &str,
        arguments: &JsonValue,
        result: &JsonValue,
        success: bool,
        error: Option<&str>,
    ) -> Result<i64> {
        let conn = self.conn();
        let mut stmt = conn.prepare("INSERT INTO tool_log (session_id, agent, run_id, tool_name, arguments, result, success, error) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
        let id: i64 = stmt.query_row(
            params![
                session_id,
                agent_name,
                run_id,
                tool_name,
                arguments.to_string(),
                result.to_string(),
                success,
                error.unwrap_or("")
            ],
            |row| row.get(0),
        )?;
        Ok(id)
    }

    // ---------- Policy Cache ----------

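    /// A sketch of the upsert/read-back round trip (crate path assumed):
    ///
    /// ```ignore
    /// use serde_json::json;
    ///
    /// let store = spec_ai_config::persistence::Persistence::new("/tmp/policy.duckdb").expect("open db");
    /// store.policy_upsert("max_tool_calls", &json!(8)).expect("upsert");
    /// let entry = store.policy_get("max_tool_calls").expect("query").expect("entry present");
    /// assert_eq!(entry.value, json!(8));
    /// ```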
    pub fn policy_upsert(&self, key: &str, value: &JsonValue) -> Result<()> {
        let conn = self.conn();
        // DuckDB upsert workaround: delete then insert atomically within a transaction.
        conn.execute_batch("BEGIN TRANSACTION;")?;
        {
            let mut del = conn.prepare("DELETE FROM policy_cache WHERE key = ?")?;
            let _ = del.execute(params![key])?;
            let mut ins = conn.prepare("INSERT INTO policy_cache (key, value, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP)")?;
            let _ = ins.execute(params![key, value.to_string()])?;
        }
        conn.execute_batch("COMMIT;")?;
        Ok(())
    }

    pub fn policy_get(&self, key: &str) -> Result<Option<PolicyEntry>> {
        let conn = self.conn();
        let mut stmt = conn.prepare("SELECT key, value, CAST(updated_at AS TEXT) as updated_at FROM policy_cache WHERE key = ?")?;
        let mut rows = stmt.query(params![key])?;
        if let Some(row) = rows.next()? {
            let key: String = row.get(0)?;
            let value_text: String = row.get(1)?;
            let updated_at: String = row.get(2)?;
            let updated_at: DateTime<Utc> = updated_at.parse().unwrap_or_else(|_| Utc::now());
            let value: JsonValue = serde_json::from_str(&value_text).unwrap_or(JsonValue::Null);
            Ok(Some(PolicyEntry {
                key,
                value,
                updated_at,
            }))
        } else {
            Ok(None)
        }
    }
}

fn generate_instance_id() -> String {
    let hostname = hostname::get()
        .ok()
        .and_then(|h| h.into_string().ok())
        .unwrap_or_else(|| "unknown".to_string());
    let uuid = uuid::Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext));
    format!("{}-{}", hostname, uuid)
}

fn expand_tilde(path: &Path) -> Result<PathBuf> {
    let path_str = path.to_string_lossy();
    if path_str == "~" {
        let base = BaseDirs::new().context("base directories not available")?;
        Ok(base.home_dir().to_path_buf())
    } else if let Some(stripped) = path_str.strip_prefix("~/") {
        let base = BaseDirs::new().context("base directories not available")?;
        Ok(base.home_dir().join(stripped))
    } else {
        Ok(path.to_path_buf())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    #[test]
    fn expands_home_directory_prefix() {
        let base = BaseDirs::new().expect("home directory available");
        let expected = base.home_dir().join("demo.db");
        let result = expand_tilde(Path::new("~/demo.db")).expect("path expansion succeeds");
        assert_eq!(result, expected);
    }

    #[test]
    fn leaves_regular_paths_unchanged() {
        let input = Path::new("relative/path.db");
        let result = expand_tilde(input).expect("path expansion succeeds");
        assert_eq!(result, input);
    }
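
    // Sanity checks for the cosine similarity helper used by `recall_top_k`.
    #[test]
    fn cosine_similarity_handles_common_cases() {
        let a = [1.0f32, 2.0, 3.0];
        // Identical vectors are maximally similar.
        assert!((cosine_similarity(&a, &a) - 1.0).abs() < 1e-6);
        // Orthogonal vectors score zero.
        assert_eq!(cosine_similarity(&[1.0, 0.0], &[0.0, 1.0]), 0.0);
        // Length mismatches and empty inputs fall back to zero.
        assert_eq!(cosine_similarity(&[1.0, 2.0], &[1.0]), 0.0);
        assert_eq!(cosine_similarity(&[], &[]), 0.0);
    }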
}

fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    if a.is_empty() || b.is_empty() || a.len() != b.len() {
        return 0.0;
    }
    let mut dot = 0.0f32;
    let mut na = 0.0f32;
    let mut nb = 0.0f32;
    for i in 0..a.len() {
        dot += a[i] * b[i];
        na += a[i] * a[i];
        nb += b[i] * b[i];
    }
    if na == 0.0 || nb == 0.0 {
        return 0.0;
    }
    dot / (na.sqrt() * nb.sqrt())
}

// ========== Knowledge Graph Methods ==========

// Type Conversion Helpers

fn from_kg_node(node: spec_ai_knowledge_graph::GraphNode) -> GraphNode {
    GraphNode {
        id: node.id,
        session_id: node.session_id,
        node_type: node.node_type,
        label: node.label,
        properties: node.properties,
        embedding_id: node.embedding_id,
        created_at: node.created_at,
        updated_at: node.updated_at,
    }
}

fn from_kg_edge(edge: spec_ai_knowledge_graph::GraphEdge) -> GraphEdge {
    GraphEdge {
        id: edge.id,
        session_id: edge.session_id,
        source_id: edge.source_id,
        target_id: edge.target_id,
        edge_type: edge.edge_type,
        predicate: edge.predicate,
        properties: edge.properties,
        weight: edge.weight,
        temporal_start: edge.temporal_start,
        temporal_end: edge.temporal_end,
        created_at: edge.created_at,
    }
}

fn from_kg_path(path: spec_ai_knowledge_graph::GraphPath) -> GraphPath {
    GraphPath {
        nodes: path.nodes.into_iter().map(from_kg_node).collect(),
        edges: path.edges.into_iter().map(from_kg_edge).collect(),
        length: path.length,
        weight: path.weight,
    }
}

impl Persistence {
    // ---------- Graph Node Operations ----------

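    /// A sketch of building a two-node graph and querying a path (crate path and the
    /// `NodeType::Entity` / `EdgeType::Relation` variant names are assumptions):
    ///
    /// ```ignore
    /// use serde_json::json;
    /// use spec_ai_knowledge_graph::{EdgeType, NodeType};
    ///
    /// let store = spec_ai_config::persistence::Persistence::new("/tmp/graph.duckdb").expect("open db");
    /// let alice = store
    ///     .insert_graph_node("session-1", NodeType::Entity, "Alice", &json!({}), None)
    ///     .expect("insert node");
    /// let acme = store
    ///     .insert_graph_node("session-1", NodeType::Entity, "Acme", &json!({}), None)
    ///     .expect("insert node");
    /// store
    ///     .insert_graph_edge("session-1", alice, acme, EdgeType::Relation, Some("works_at"), None, 1.0)
    ///     .expect("insert edge");
    /// assert!(store.find_shortest_path("session-1", alice, acme, Some(3)).expect("traverse").is_some());
    /// ```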
    pub fn insert_graph_node(
        &self,
        session_id: &str,
        node_type: spec_ai_knowledge_graph::NodeType,
        label: &str,
        properties: &JsonValue,
        embedding_id: Option<i64>,
    ) -> Result<i64> {
        self.graph_store
            .insert_graph_node(session_id, node_type, label, properties, embedding_id)
    }

    pub fn get_graph_node(&self, node_id: i64) -> Result<Option<GraphNode>> {
        self.graph_store
            .get_graph_node(node_id)
            .map(|opt| opt.map(from_kg_node))
    }

    pub fn list_graph_nodes(
        &self,
        session_id: &str,
        node_type: Option<spec_ai_knowledge_graph::NodeType>,
        limit: Option<i64>,
    ) -> Result<Vec<GraphNode>> {
        self.graph_store
            .list_graph_nodes(session_id, node_type, limit)
            .map(|nodes| nodes.into_iter().map(from_kg_node).collect())
    }

    pub fn count_graph_nodes(&self, session_id: &str) -> Result<i64> {
        self.graph_store.count_graph_nodes(session_id)
    }

    pub fn update_graph_node(&self, node_id: i64, properties: &JsonValue) -> Result<()> {
        self.graph_store.update_graph_node(node_id, properties)
    }

    pub fn delete_graph_node(&self, node_id: i64) -> Result<()> {
        self.graph_store.delete_graph_node(node_id)
    }

    // ---------- Graph Edge Operations ----------

    pub fn insert_graph_edge(
        &self,
        session_id: &str,
        source_id: i64,
        target_id: i64,
        edge_type: spec_ai_knowledge_graph::EdgeType,
        predicate: Option<&str>,
        properties: Option<&JsonValue>,
        weight: f32,
    ) -> Result<i64> {
        self.graph_store.insert_graph_edge(
            session_id, source_id, target_id, edge_type, predicate, properties, weight,
        )
    }

    pub fn get_graph_edge(&self, edge_id: i64) -> Result<Option<GraphEdge>> {
        self.graph_store
            .get_graph_edge(edge_id)
            .map(|opt| opt.map(from_kg_edge))
    }

    pub fn list_graph_edges(
        &self,
        session_id: &str,
        source_id: Option<i64>,
        target_id: Option<i64>,
    ) -> Result<Vec<GraphEdge>> {
        self.graph_store
            .list_graph_edges(session_id, source_id, target_id)
            .map(|edges| edges.into_iter().map(from_kg_edge).collect())
    }

    pub fn count_graph_edges(&self, session_id: &str) -> Result<i64> {
        self.graph_store.count_graph_edges(session_id)
    }

    pub fn delete_graph_edge(&self, edge_id: i64) -> Result<()> {
        self.graph_store.delete_graph_edge(edge_id)
    }

    // ---------- Graph Traversal Operations ----------

    pub fn find_shortest_path(
        &self,
        session_id: &str,
        source_id: i64,
        target_id: i64,
        max_hops: Option<usize>,
    ) -> Result<Option<GraphPath>> {
        self.graph_store
            .find_shortest_path(session_id, source_id, target_id, max_hops)
            .map(|opt| opt.map(from_kg_path))
    }

    pub fn traverse_neighbors(
        &self,
        session_id: &str,
        node_id: i64,
        direction: spec_ai_knowledge_graph::TraversalDirection,
        depth: usize,
    ) -> Result<Vec<GraphNode>> {
        self.graph_store
            .traverse_neighbors(session_id, node_id, direction, depth)
            .map(|nodes| nodes.into_iter().map(from_kg_node).collect())
    }

    // ---------- Transcriptions ----------

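    /// A sketch of the transcription round trip (crate path assumed):
    ///
    /// ```ignore
    /// use chrono::Utc;
    ///
    /// let store = spec_ai_config::persistence::Persistence::new("/tmp/transcripts.duckdb").expect("open db");
    /// store.insert_transcription("session-1", 0, "hello", Utc::now()).expect("insert chunk");
    /// store.insert_transcription("session-1", 1, "world", Utc::now()).expect("insert chunk");
    /// // Chunks are re-joined in chunk_id order.
    /// assert_eq!(store.get_full_transcription("session-1").expect("read back"), "hello world");
    /// ```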
    pub fn insert_transcription(
        &self,
        session_id: &str,
        chunk_id: i64,
        text: &str,
        timestamp: chrono::DateTime<Utc>,
    ) -> Result<i64> {
        let conn = self.conn();
        let mut stmt = conn.prepare(
            "INSERT INTO transcriptions (session_id, chunk_id, text, timestamp, embedding_id) VALUES (?, ?, ?, ?, NULL) RETURNING id",
        )?;
        let id: i64 = stmt.query_row(
            params![session_id, chunk_id, text, timestamp.to_rfc3339()],
            |row| row.get(0),
        )?;
        Ok(id)
    }

    pub fn update_transcription_embedding(
        &self,
        transcription_id: i64,
        embedding_id: i64,
    ) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "UPDATE transcriptions SET embedding_id = ? WHERE id = ?",
            params![embedding_id, transcription_id],
        )?;
        Ok(())
    }

    pub fn list_transcriptions(
        &self,
        session_id: &str,
        limit: Option<i64>,
    ) -> Result<Vec<(i64, i64, String, DateTime<Utc>)>> {
        let conn = self.conn();
        let query = if let Some(lim) = limit {
            format!(
                "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC LIMIT {}",
                lim
            )
        } else {
            "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC".to_string()
        };

        let mut stmt = conn.prepare(&query)?;
        let mut rows = stmt.query(params![session_id])?;
        let mut out = Vec::new();

        while let Some(row) = rows.next()? {
            let id: i64 = row.get(0)?;
            let chunk_id: i64 = row.get(1)?;
            let text: String = row.get(2)?;
            let timestamp_str: String = row.get(3)?;
            let timestamp: DateTime<Utc> = timestamp_str.parse().unwrap_or_else(|_| Utc::now());
            out.push((id, chunk_id, text, timestamp));
        }

        Ok(out)
    }

    pub fn get_full_transcription(&self, session_id: &str) -> Result<String> {
        let transcriptions = self.list_transcriptions(session_id, None)?;
        Ok(transcriptions
            .into_iter()
            .map(|(_, _, text, _)| text)
            .collect::<Vec<_>>()
            .join(" "))
    }

    pub fn delete_transcriptions(&self, session_id: &str) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "DELETE FROM transcriptions WHERE session_id = ?",
            params![session_id],
        )?;
        Ok(())
    }

    pub fn get_transcription_by_embedding(&self, embedding_id: i64) -> Result<Option<String>> {
        let conn = self.conn();
        let mut stmt =
            conn.prepare("SELECT text FROM transcriptions WHERE embedding_id = ? LIMIT 1")?;
        let result: Result<String, _> = stmt.query_row(params![embedding_id], |row| row.get(0));
        match result {
            Ok(text) => Ok(Some(text)),
            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    // ---------- Tokenized Files Cache ----------

    /// Persist tokenization metadata for a file, replacing any existing entry for the path.
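    ///
    /// A sketch of caching and re-reading a file's tokenization stats (crate path and
    /// the literal values are placeholders):
    ///
    /// ```ignore
    /// let store = spec_ai_config::persistence::Persistence::new("/tmp/tokens.duckdb").expect("open db");
    /// store
    ///     .upsert_tokenized_file("session-1", "src/main.rs", "abc123", 1200, 950, 4096, false, None)
    ///     .expect("cache tokenization");
    /// let record = store
    ///     .get_tokenized_file("session-1", "src/main.rs")
    ///     .expect("query cache")
    ///     .expect("entry present");
    /// assert_eq!(record.cleaned_tokens, 950);
    /// ```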
    pub fn upsert_tokenized_file(
        &self,
        session_id: &str,
        path: &str,
        file_hash: &str,
        raw_tokens: usize,
        cleaned_tokens: usize,
        bytes_captured: usize,
        truncated: bool,
        embedding_id: Option<i64>,
    ) -> Result<i64> {
        let conn = self.conn();
        conn.execute(
            "DELETE FROM tokenized_files WHERE session_id = ? AND path = ?",
            params![session_id, path],
        )?;
        let mut stmt = conn.prepare("INSERT INTO tokenized_files (session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
        let id: i64 = stmt.query_row(
            params![
                session_id,
                path,
                file_hash,
                raw_tokens as i64,
                cleaned_tokens as i64,
                bytes_captured as i64,
                truncated,
                embedding_id
            ],
            |row| row.get(0),
        )?;
        Ok(id)
    }

    pub fn get_tokenized_file(
        &self,
        session_id: &str,
        path: &str,
    ) -> Result<Option<TokenizedFileRecord>> {
        let conn = self.conn();
        let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? AND path = ? LIMIT 1")?;
        let mut rows = stmt.query(params![session_id, path])?;
        if let Some(row) = rows.next()? {
            let record = TokenizedFileRecord::from_row(row)?;
            Ok(Some(record))
        } else {
            Ok(None)
        }
    }

    pub fn list_tokenized_files(&self, session_id: &str) -> Result<Vec<TokenizedFileRecord>> {
        let conn = self.conn();
        let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? ORDER BY path")?;
        let mut rows = stmt.query(params![session_id])?;
        let mut out = Vec::new();
        while let Some(row) = rows.next()? {
            out.push(TokenizedFileRecord::from_row(row)?);
        }
        Ok(out)
    }

    // ========== Mesh Message Persistence ==========

    /// Store a mesh message in the database
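    ///
    /// A sketch of the store / dedupe / deliver flow (crate path, instance names, and
    /// payload are placeholders):
    ///
    /// ```ignore
    /// use serde_json::json;
    ///
    /// let store = spec_ai_config::persistence::Persistence::new("/tmp/mesh.duckdb").expect("open db");
    /// let row_id = store
    ///     .mesh_message_store("msg-001", "instance-a", Some("instance-b"), "task", &json!({"cmd": "ping"}), "pending")
    ///     .expect("store message");
    /// assert!(store.mesh_message_exists("msg-001").expect("dedupe check"));
    /// let pending = store.mesh_message_get_pending("instance-b").expect("fetch pending");
    /// assert_eq!(pending.len(), 1);
    /// store.mesh_message_update_status(row_id, "delivered").expect("mark delivered");
    /// ```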
    pub fn mesh_message_store(
        &self,
        message_id: &str,
        source_instance: &str,
        target_instance: Option<&str>,
        message_type: &str,
        payload: &JsonValue,
        status: &str,
    ) -> Result<i64> {
        let conn = self.conn();
        let payload_json = serde_json::to_string(payload)?;
        // DuckDB has no last_insert_rowid(); use RETURNING to obtain the generated id,
        // matching the other insert helpers in this module.
        let mut stmt = conn.prepare("INSERT INTO mesh_messages (message_id, source_instance, target_instance, message_type, payload, status) VALUES (?, ?, ?, ?, ?, ?) RETURNING id")?;
        let id: i64 = stmt.query_row(
            params![message_id, source_instance, target_instance, message_type, payload_json, status],
            |row| row.get(0),
        )?;
        Ok(id)
    }

    /// Check if a message with this ID already exists (for duplicate detection)
    pub fn mesh_message_exists(&self, message_id: &str) -> Result<bool> {
        let conn = self.conn();
        let count: i64 = conn.query_row(
            "SELECT COUNT(*) FROM mesh_messages WHERE message_id = ?",
            params![message_id],
            |row| row.get(0),
        )?;
        Ok(count > 0)
    }

    /// Update message status (e.g., delivered, failed)
    pub fn mesh_message_update_status(&self, message_id: i64, status: &str) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "UPDATE mesh_messages SET status = ?, delivered_at = CURRENT_TIMESTAMP WHERE id = ?",
            params![status, message_id],
        )?;
        Ok(())
    }

    /// Get pending messages for a target instance
    pub fn mesh_message_get_pending(
        &self,
        target_instance: &str,
    ) -> Result<Vec<MeshMessageRecord>> {
        let conn = self.conn();
        let mut stmt = conn.prepare(
            "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
             FROM mesh_messages
             WHERE (target_instance = ? OR target_instance IS NULL) AND status = 'pending'
             ORDER BY created_at",
        )?;
        let mut rows = stmt.query(params![target_instance])?;
        let mut out = Vec::new();
        while let Some(row) = rows.next()? {
            out.push(MeshMessageRecord::from_row(row)?);
        }
        Ok(out)
    }

    /// Get message history for analytics
    pub fn mesh_message_get_history(
        &self,
        instance_id: Option<&str>,
        limit: usize,
    ) -> Result<Vec<MeshMessageRecord>> {
        let conn = self.conn();
        let query = if instance_id.is_some() {
            format!(
                "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
                 FROM mesh_messages
                 WHERE source_instance = ? OR target_instance = ?
                 ORDER BY created_at DESC LIMIT {}",
                limit
            )
        } else {
            format!(
                "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
                 FROM mesh_messages
                 ORDER BY created_at DESC LIMIT {}",
                limit
            )
        };

        let mut stmt = conn.prepare(&query)?;
        let mut rows = if let Some(inst) = instance_id {
            stmt.query(params![inst, inst])?
        } else {
            stmt.query(params![])?
        };

        let mut out = Vec::new();
        while let Some(row) = rows.next()? {
            out.push(MeshMessageRecord::from_row(row)?);
        }
        Ok(out)
    }

    // ===== Graph Synchronization Methods =====

    /// Append an entry to the graph changelog
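    ///
    /// A sketch of appending and replaying changelog entries (crate path and the JSON
    /// vector-clock encoding are assumptions):
    ///
    /// ```ignore
    /// let store = spec_ai_config::persistence::Persistence::new("/tmp/sync.duckdb").expect("open db");
    /// store
    ///     .graph_changelog_append("session-1", store.instance_id(), "node", 42, "insert", r#"{"instance-a":1}"#, None)
    ///     .expect("append changelog");
    /// let entries = store
    ///     .graph_changelog_get_since("session-1", "1970-01-01 00:00:00")
    ///     .expect("read changelog");
    /// assert!(!entries.is_empty());
    /// ```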
    pub fn graph_changelog_append(
        &self,
        session_id: &str,
        instance_id: &str,
        entity_type: &str,
        entity_id: i64,
        operation: &str,
        vector_clock: &str,
        data: Option<&str>,
    ) -> Result<i64> {
        self.graph_store.graph_changelog_append(
            session_id,
            instance_id,
            entity_type,
            entity_id,
            operation,
            vector_clock,
            data,
        )
    }

    /// Get changelog entries since a given timestamp for a session
    pub fn graph_changelog_get_since(
        &self,
        session_id: &str,
        since_timestamp: &str,
    ) -> Result<Vec<ChangelogEntry>> {
        self.graph_store
            .graph_changelog_get_since(session_id, since_timestamp)
            .map(|entries| {
                entries
                    .into_iter()
                    .map(|e| ChangelogEntry {
                        id: e.id,
                        session_id: e.session_id,
                        instance_id: e.instance_id,
                        entity_type: e.entity_type,
                        entity_id: e.entity_id,
                        operation: e.operation,
                        vector_clock: e.vector_clock,
                        data: e.data,
                        created_at: e.created_at,
                    })
                    .collect()
            })
    }

    /// List conflict entries stored in the changelog
    pub fn graph_list_conflicts(&self, session_id: Option<&str>) -> Result<Vec<ChangelogEntry>> {
        self.graph_store
            .graph_list_conflicts(session_id, 100)
            .map(|entries| {
                entries
                    .into_iter()
                    .map(|e| ChangelogEntry {
                        id: e.id,
                        session_id: e.session_id,
                        instance_id: e.instance_id,
                        entity_type: e.entity_type,
                        entity_id: e.entity_id,
                        operation: e.operation,
                        vector_clock: e.vector_clock,
                        data: e.data,
                        created_at: e.created_at,
                    })
                    .collect()
            })
    }

    /// Prune old changelog entries (keep last N days)
    pub fn graph_changelog_prune(&self, days_to_keep: i64) -> Result<usize> {
        self.graph_store.graph_changelog_prune(days_to_keep)
    }

    /// Get the vector clock for an instance/session/graph combination
    pub fn graph_sync_state_get(
        &self,
        instance_id: &str,
        session_id: &str,
        graph_name: &str,
    ) -> Result<Option<String>> {
        self.graph_store
            .graph_sync_state_get(instance_id, session_id, graph_name)
    }

    /// Get sync state metadata including last_sync_at
    pub fn graph_sync_state_get_metadata(
        &self,
        instance_id: &str,
        session_id: &str,
        graph_name: &str,
    ) -> Result<Option<SyncStateRecord>> {
        self.graph_store
            .graph_sync_state_get_metadata(instance_id, session_id, graph_name)
            .map(|opt| {
                opt.map(|r| SyncStateRecord {
                    vector_clock: r.vector_clock,
                    last_sync_at: r.last_sync_at,
                })
            })
    }

    /// Update the vector clock for an instance/session/graph combination
    pub fn graph_sync_state_update(
        &self,
        instance_id: &str,
        session_id: &str,
        graph_name: &str,
        vector_clock: &str,
    ) -> Result<()> {
        self.graph_store
            .graph_sync_state_update(instance_id, session_id, graph_name, vector_clock)
    }

    /// Persist sync configuration for a graph
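    ///
    /// A sketch of writing and reading back a graph's sync settings (crate path, graph
    /// name, and the strategy string are placeholders):
    ///
    /// ```ignore
    /// let store = spec_ai_config::persistence::Persistence::new("/tmp/sync_cfg.duckdb").expect("open db");
    /// store
    ///     .graph_set_sync_config("session-1", "default", true, Some("last_write_wins"), Some(30))
    ///     .expect("persist sync config");
    /// let cfg = store.graph_get_sync_config("session-1", "default").expect("read sync config");
    /// assert!(cfg.sync_enabled);
    /// ```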
    pub fn graph_set_sync_config(
        &self,
        session_id: &str,
        graph_name: &str,
        sync_enabled: bool,
        conflict_resolution_strategy: Option<&str>,
        sync_interval_seconds: Option<u64>,
    ) -> Result<GraphSyncConfig> {
        self.graph_store
            .graph_set_sync_config(
                session_id,
                graph_name,
                sync_enabled,
                conflict_resolution_strategy,
                sync_interval_seconds,
            )
            .map(|cfg| GraphSyncConfig {
                sync_enabled: cfg.sync_enabled,
                conflict_resolution_strategy: cfg.conflict_resolution_strategy,
                sync_interval_seconds: cfg.sync_interval_seconds,
            })
    }

    /// Retrieve sync configuration for a graph
    pub fn graph_get_sync_config(
        &self,
        session_id: &str,
        graph_name: &str,
    ) -> Result<GraphSyncConfig> {
        self.graph_store
            .graph_get_sync_config(session_id, graph_name)
            .map(|cfg| GraphSyncConfig {
                sync_enabled: cfg.sync_enabled,
                conflict_resolution_strategy: cfg.conflict_resolution_strategy,
                sync_interval_seconds: cfg.sync_interval_seconds,
            })
    }

    /// Enable or disable sync for a graph
    pub fn graph_set_sync_enabled(
        &self,
        session_id: &str,
        graph_name: &str,
        enabled: bool,
    ) -> Result<()> {
        self.graph_store
            .graph_set_sync_enabled(session_id, graph_name, enabled)
    }

    /// Check if sync is enabled for a graph
    pub fn graph_get_sync_enabled(&self, session_id: &str, graph_name: &str) -> Result<bool> {
        self.graph_store
            .graph_get_sync_enabled(session_id, graph_name)
    }

    /// List all graphs for a session
    pub fn graph_list(&self, session_id: &str) -> Result<Vec<String>> {
        self.graph_store.graph_list(session_id)
    }

    /// List all sync-enabled graphs across all sessions
    pub fn graph_list_sync_enabled(&self) -> Result<Vec<(String, String)>> {
        self.graph_store.graph_list_sync_enabled()
    }

    /// Get a node with its sync metadata
    pub fn graph_get_node_with_sync(&self, node_id: i64) -> Result<Option<SyncedNodeRecord>> {
        self.graph_store
            .graph_get_node_with_sync(node_id)
            .map(|opt| {
                opt.map(|r| SyncedNodeRecord {
                    id: r.id,
                    session_id: r.session_id,
                    node_type: r.node_type,
                    label: r.label,
                    properties: r.properties,
                    embedding_id: r.embedding_id,
                    created_at: r.created_at,
                    updated_at: r.updated_at,
                    vector_clock: r.vector_clock,
                    last_modified_by: r.last_modified_by,
                    is_deleted: r.is_deleted,
                    sync_enabled: r.sync_enabled,
                })
            })
    }

    /// Get all synced nodes for a session with optional filters
    pub fn graph_list_nodes_with_sync(
        &self,
        session_id: &str,
        sync_enabled_only: bool,
        include_deleted: bool,
    ) -> Result<Vec<SyncedNodeRecord>> {
        self.graph_store
            .graph_list_nodes_with_sync(session_id, sync_enabled_only, include_deleted)
            .map(|nodes| {
                nodes
                    .into_iter()
                    .map(|r| SyncedNodeRecord {
                        id: r.id,
                        session_id: r.session_id,
                        node_type: r.node_type,
                        label: r.label,
                        properties: r.properties,
                        embedding_id: r.embedding_id,
                        created_at: r.created_at,
                        updated_at: r.updated_at,
                        vector_clock: r.vector_clock,
                        last_modified_by: r.last_modified_by,
                        is_deleted: r.is_deleted,
                        sync_enabled: r.sync_enabled,
                    })
                    .collect()
            })
    }

    /// Get an edge with its sync metadata
    pub fn graph_get_edge_with_sync(&self, edge_id: i64) -> Result<Option<SyncedEdgeRecord>> {
        self.graph_store
            .graph_get_edge_with_sync(edge_id)
            .map(|opt| {
                opt.map(|r| SyncedEdgeRecord {
                    id: r.id,
                    session_id: r.session_id,
                    source_id: r.source_id,
                    target_id: r.target_id,
                    edge_type: r.edge_type,
                    predicate: r.predicate,
                    properties: r.properties,
                    weight: r.weight,
                    temporal_start: r.temporal_start,
                    temporal_end: r.temporal_end,
                    created_at: r.created_at,
                    vector_clock: r.vector_clock,
                    last_modified_by: r.last_modified_by,
                    is_deleted: r.is_deleted,
                    sync_enabled: r.sync_enabled,
                })
            })
    }

    /// Get all synced edges for a session with optional filters
    pub fn graph_list_edges_with_sync(
        &self,
        session_id: &str,
        sync_enabled_only: bool,
        include_deleted: bool,
    ) -> Result<Vec<SyncedEdgeRecord>> {
        self.graph_store
            .graph_list_edges_with_sync(session_id, sync_enabled_only, include_deleted)
            .map(|edges| {
                edges
                    .into_iter()
                    .map(|r| SyncedEdgeRecord {
                        id: r.id,
                        session_id: r.session_id,
                        source_id: r.source_id,
                        target_id: r.target_id,
                        edge_type: r.edge_type,
                        predicate: r.predicate,
                        properties: r.properties,
                        weight: r.weight,
                        temporal_start: r.temporal_start,
                        temporal_end: r.temporal_end,
                        created_at: r.created_at,
                        vector_clock: r.vector_clock,
                        last_modified_by: r.last_modified_by,
                        is_deleted: r.is_deleted,
                        sync_enabled: r.sync_enabled,
                    })
                    .collect()
            })
    }

    /// Update a node's sync metadata
    pub fn graph_update_node_sync_metadata(
        &self,
        node_id: i64,
        vector_clock: &str,
        last_modified_by: &str,
        sync_enabled: bool,
    ) -> Result<()> {
        self.graph_store.graph_update_node_sync_metadata(
            node_id,
            vector_clock,
            last_modified_by,
            sync_enabled,
        )
    }

    /// Update an edge's sync metadata
    pub fn graph_update_edge_sync_metadata(
        &self,
        edge_id: i64,
        vector_clock: &str,
        last_modified_by: &str,
        sync_enabled: bool,
    ) -> Result<()> {
        self.graph_store.graph_update_edge_sync_metadata(
            edge_id,
            vector_clock,
            last_modified_by,
            sync_enabled,
        )
    }

    /// Mark a node as deleted (tombstone pattern)
    pub fn graph_mark_node_deleted(
        &self,
        node_id: i64,
        vector_clock: &str,
        deleted_by: &str,
    ) -> Result<()> {
        self.graph_store
            .graph_mark_node_deleted(node_id, vector_clock, deleted_by)
    }

    /// Mark an edge as deleted (tombstone pattern)
    pub fn graph_mark_edge_deleted(
        &self,
        edge_id: i64,
        vector_clock: &str,
        deleted_by: &str,
    ) -> Result<()> {
        self.graph_store
            .graph_mark_edge_deleted(edge_id, vector_clock, deleted_by)
    }
}

#[derive(Debug, Clone)]
pub struct TokenizedFileRecord {
    pub id: i64,
    pub session_id: String,
    pub path: String,
    pub file_hash: String,
    pub raw_tokens: usize,
    pub cleaned_tokens: usize,
    pub bytes_captured: usize,
    pub truncated: bool,
    pub embedding_id: Option<i64>,
    pub updated_at: DateTime<Utc>,
}

impl TokenizedFileRecord {
    fn from_row(row: &duckdb::Row) -> Result<Self> {
        let id: i64 = row.get(0)?;
        let session_id: String = row.get(1)?;
        let path: String = row.get(2)?;
        let file_hash: String = row.get(3)?;
        let raw_tokens: i64 = row.get(4)?;
        let cleaned_tokens: i64 = row.get(5)?;
        let bytes_captured: i64 = row.get(6)?;
        let truncated: bool = row.get(7)?;
        let embedding_id: Option<i64> = row.get(8)?;
        let updated_at: String = row.get(9)?;

        Ok(Self {
            id,
            session_id,
            path,
            file_hash,
            raw_tokens: raw_tokens.max(0) as usize,
            cleaned_tokens: cleaned_tokens.max(0) as usize,
            bytes_captured: bytes_captured.max(0) as usize,
            truncated,
            embedding_id,
            updated_at: updated_at.parse().unwrap_or_else(|_| Utc::now()),
        })
    }
}

#[derive(Debug, Clone)]
pub struct MeshMessageRecord {
    pub id: i64,
    pub source_instance: String,
    pub target_instance: Option<String>,
    pub message_type: String,
    pub payload: JsonValue,
    pub status: String,
    pub created_at: DateTime<Utc>,
    pub delivered_at: Option<DateTime<Utc>>,
}

impl MeshMessageRecord {
    fn from_row(row: &duckdb::Row) -> Result<Self> {
        let id: i64 = row.get(0)?;
        let source_instance: String = row.get(1)?;
        let target_instance: Option<String> = row.get(2)?;
        let message_type: String = row.get(3)?;
        let payload_str: String = row.get(4)?;
        let payload: JsonValue = serde_json::from_str(&payload_str)?;
        let status: String = row.get(5)?;
        let created_at_str: String = row.get(6)?;
        let delivered_at_str: Option<String> = row.get(7)?;

        Ok(MeshMessageRecord {
            id,
            source_instance,
            target_instance,
            message_type,
            payload,
            status,
            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
            delivered_at: delivered_at_str.and_then(|s| s.parse().ok()),
        })
    }
}

// ===== Graph Sync Record Types =====

#[derive(Debug, Clone)]
pub struct SyncStateRecord {
    pub vector_clock: String,
    pub last_sync_at: Option<String>,
}

#[derive(Debug, Clone, Default)]
pub struct GraphSyncConfig {
    pub sync_enabled: bool,
    pub conflict_resolution_strategy: Option<String>,
    pub sync_interval_seconds: Option<u64>,
}

#[derive(Debug, Clone)]
pub struct ChangelogEntry {
    pub id: i64,
    pub session_id: String,
    pub instance_id: String,
    pub entity_type: String,
    pub entity_id: i64,
    pub operation: String,
    pub vector_clock: String,
    pub data: Option<String>,
    pub created_at: DateTime<Utc>,
}

impl ChangelogEntry {
    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
        let id: i64 = row.get(0)?;
        let session_id: String = row.get(1)?;
        let instance_id: String = row.get(2)?;
        let entity_type: String = row.get(3)?;
        let entity_id: i64 = row.get(4)?;
        let operation: String = row.get(5)?;
        let vector_clock: String = row.get(6)?;
        let data: Option<String> = row.get(7)?;
        let created_at_str: String = row.get(8)?;

        Ok(ChangelogEntry {
            id,
            session_id,
            instance_id,
            entity_type,
            entity_id,
            operation,
            vector_clock,
            data,
            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
        })
    }
}

#[derive(Debug, Clone)]
pub struct SyncedNodeRecord {
    pub id: i64,
    pub session_id: String,
    pub node_type: String,
    pub label: String,
    pub properties: serde_json::Value,
    pub embedding_id: Option<i64>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub vector_clock: String,
    pub last_modified_by: Option<String>,
    pub is_deleted: bool,
    pub sync_enabled: bool,
}

impl SyncedNodeRecord {
    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
        let id: i64 = row.get(0)?;
        let session_id: String = row.get(1)?;
        let node_type: String = row.get(2)?;
        let label: String = row.get(3)?;
        let properties_str: String = row.get(4)?;
        let properties: serde_json::Value = serde_json::from_str(&properties_str).map_err(|e| {
            duckdb::Error::FromSqlConversionFailure(4, duckdb::types::Type::Text, Box::new(e))
        })?;
        let embedding_id: Option<i64> = row.get(5)?;
        let created_at_str: String = row.get(6)?;
        let updated_at_str: String = row.get(7)?;
        let vector_clock: String = row.get(8)?;
        let last_modified_by: Option<String> = row.get(9)?;
        let is_deleted: bool = row.get(10)?;
        let sync_enabled: bool = row.get(11)?;

        Ok(SyncedNodeRecord {
            id,
            session_id,
            node_type,
            label,
            properties,
            embedding_id,
            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
            updated_at: updated_at_str.parse().unwrap_or_else(|_| Utc::now()),
            vector_clock,
            last_modified_by,
            is_deleted,
            sync_enabled,
        })
    }
}

#[derive(Debug, Clone)]
pub struct SyncedEdgeRecord {
    pub id: i64,
    pub session_id: String,
    pub source_id: i64,
    pub target_id: i64,
    pub edge_type: String,
    pub predicate: Option<String>,
    pub properties: Option<serde_json::Value>,
    pub weight: f32,
    pub temporal_start: Option<DateTime<Utc>>,
    pub temporal_end: Option<DateTime<Utc>>,
    pub created_at: DateTime<Utc>,
    pub vector_clock: String,
    pub last_modified_by: Option<String>,
    pub is_deleted: bool,
    pub sync_enabled: bool,
}

impl SyncedEdgeRecord {
    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
        let id: i64 = row.get(0)?;
        let session_id: String = row.get(1)?;
        let source_id: i64 = row.get(2)?;
        let target_id: i64 = row.get(3)?;
        let edge_type: String = row.get(4)?;
        let predicate: Option<String> = row.get(5)?;
        let properties_str: Option<String> = row.get(6)?;
        let properties: Option<serde_json::Value> = properties_str
            .as_ref()
            .and_then(|s| serde_json::from_str(s).ok());
        let weight: f32 = row.get(7)?;
        let temporal_start_str: Option<String> = row.get(8)?;
        let temporal_end_str: Option<String> = row.get(9)?;
        let created_at_str: String = row.get(10)?;
        let vector_clock: String = row.get(11)?;
        let last_modified_by: Option<String> = row.get(12)?;
        let is_deleted: bool = row.get(13)?;
        let sync_enabled: bool = row.get(14)?;

        Ok(SyncedEdgeRecord {
            id,
            session_id,
            source_id,
            target_id,
            edge_type,
            predicate,
            properties,
            weight,
            temporal_start: temporal_start_str.and_then(|s| s.parse().ok()),
            temporal_end: temporal_end_str.and_then(|s| s.parse().ok()),
            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
            vector_clock,
            last_modified_by,
            is_deleted,
            sync_enabled,
        })
    }
}