Skip to main content

spec_ai/spec_ai_config/persistence/
mod.rs

1pub mod migrations;
2
3use anyhow::{Context, Result};
4use chrono::{DateTime, Utc};
5use directories::BaseDirs;
6use duckdb::{Connection, params};
7use serde_json::Value as JsonValue;
8use crate::spec_ai_knowledge_graph::KnowledgeGraphStore;
9use std::path::{Path, PathBuf};
10use std::sync::{Arc, Mutex};
11
12use crate::spec_ai_config::types::{
13    GraphEdge, GraphNode, GraphPath, MemoryVector, Message, MessageRole, PolicyEntry,
14};
15
16#[derive(Clone)]
17pub struct Persistence {
18    conn: Arc<Mutex<Connection>>,
19    instance_id: String,
20    graph_store: KnowledgeGraphStore,
21}
22
23impl Persistence {
24    /// Create or open the database at the provided path and run migrations.
25    pub fn new<P: AsRef<Path>>(db_path: P) -> Result<Self> {
26        Self::with_instance_id(db_path, generate_instance_id())
27    }
28
29    /// Create with a specific instance_id
30    pub fn with_instance_id<P: AsRef<Path>>(db_path: P, instance_id: String) -> Result<Self> {
31        let db_path = expand_tilde(db_path.as_ref())?;
32        if let Some(dir) = db_path.parent() {
33            std::fs::create_dir_all(dir).context("creating DB directory")?;
34        }
35        let conn = Connection::open(&db_path).context("opening DuckDB")?;
36        migrations::run(&conn).context("running migrations")?;
37        let conn_arc = Arc::new(Mutex::new(conn));
38        let graph_store = KnowledgeGraphStore::new(conn_arc.clone(), instance_id.clone());
39        Ok(Self {
40            conn: conn_arc,
41            instance_id,
42            graph_store,
43        })
44    }
45
46    /// Get the instance ID for this persistence instance
47    pub fn instance_id(&self) -> &str {
48        &self.instance_id
49    }
50
51    /// Get direct access to the KnowledgeGraphStore for Phase 2+ consumer migration
52    pub fn graph_store(&self) -> &KnowledgeGraphStore {
53        &self.graph_store
54    }
55
56    /// Checkpoint the database to ensure all WAL data is written to the main database file.
57    /// Call this before shutdown to ensure clean database state.
58    pub fn checkpoint(&self) -> Result<()> {
59        let conn = self.conn();
60        conn.execute_batch("CHECKPOINT;")
61            .context("checkpointing database")
62    }
63
64    /// Creates or opens the default database at ~/.spec-ai/agent_data.duckdb
65    pub fn new_default() -> Result<Self> {
66        let base = BaseDirs::new().context("base directories not available")?;
67        let path = base.home_dir().join(".agent_cli").join("agent_data.duckdb");
68        Self::new(path)
69    }
70
71    /// Get access to the pooled database connection.
72    /// Returns a MutexGuard that provides exclusive access to the connection.
73    pub fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
74        self.conn
75            .lock()
76            .expect("database connection mutex poisoned")
77    }
78
79    // ---------- Messages ----------
80
81    pub fn insert_message(
82        &self,
83        session_id: &str,
84        role: MessageRole,
85        content: &str,
86    ) -> Result<i64> {
87        let conn = self.conn();
88        let mut stmt = conn.prepare(
89            "INSERT INTO messages (session_id, role, content) VALUES (?, ?, ?) RETURNING id",
90        )?;
91        let id: i64 = stmt.query_row(params![session_id, role.as_str(), content], |row| {
92            row.get(0)
93        })?;
94        Ok(id)
95    }
96
97    pub fn list_messages(&self, session_id: &str, limit: i64) -> Result<Vec<Message>> {
98        let conn = self.conn();
99        let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?")?;
100        let mut rows = stmt.query(params![session_id, limit])?;
101        let mut out = Vec::new();
102        while let Some(row) = rows.next()? {
103            let id: i64 = row.get(0)?;
104            let sid: String = row.get(1)?;
105            let role: String = row.get(2)?;
106            let content: String = row.get(3)?;
107            let created_at: String = row.get(4)?; // DuckDB returns TIMESTAMP as string
108            let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
109            out.push(Message {
110                id,
111                session_id: sid,
112                role: MessageRole::from_str(&role),
113                content,
114                created_at,
115            });
116        }
117        out.reverse();
118        Ok(out)
119    }
120
121    pub fn get_message(&self, message_id: i64) -> Result<Option<Message>> {
122        let conn = self.conn();
123        let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE id = ?")?;
124        let mut rows = stmt.query(params![message_id])?;
125        if let Some(row) = rows.next()? {
126            let id: i64 = row.get(0)?;
127            let sid: String = row.get(1)?;
128            let role: String = row.get(2)?;
129            let content: String = row.get(3)?;
130            let created_at: String = row.get(4)?;
131            let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
132            Ok(Some(Message {
133                id,
134                session_id: sid,
135                role: MessageRole::from_str(&role),
136                content,
137                created_at,
138            }))
139        } else {
140            Ok(None)
141        }
142    }
143
144    /// Simple pruning by keeping only the most recent `keep_latest` messages.
145    pub fn prune_messages(&self, session_id: &str, keep_latest: i64) -> Result<u64> {
146        let conn = self.conn();
147        let mut stmt = conn.prepare("DELETE FROM messages WHERE session_id = ? AND id NOT IN (SELECT id FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?)")?;
148        let changed = stmt.execute(params![session_id, session_id, keep_latest])? as u64;
149        Ok(changed)
150    }
151
152    // ---------- Memory Vectors ----------
153
154    pub fn insert_memory_vector(
155        &self,
156        session_id: &str,
157        message_id: Option<i64>,
158        embedding: &[f32],
159    ) -> Result<i64> {
160        let conn = self.conn();
161        let embedding_json = serde_json::to_string(embedding)?;
162        let mut stmt = conn.prepare("INSERT INTO memory_vectors (session_id, message_id, embedding) VALUES (?, ?, ?) RETURNING id")?;
163        let id: i64 = stmt.query_row(params![session_id, message_id, embedding_json], |row| {
164            row.get(0)
165        })?;
166        Ok(id)
167    }
168
169    pub fn recall_top_k(
170        &self,
171        session_id: &str,
172        query_embedding: &[f32],
173        k: usize,
174    ) -> Result<Vec<(MemoryVector, f32)>> {
175        let conn = self.conn();
176        let mut stmt = conn.prepare("SELECT id, session_id, message_id, embedding, CAST(created_at AS TEXT) as created_at FROM memory_vectors WHERE session_id = ?")?;
177        let mut rows = stmt.query(params![session_id])?;
178        let mut scored: Vec<(MemoryVector, f32)> = Vec::new();
179        while let Some(row) = rows.next()? {
180            let id: i64 = row.get(0)?;
181            let sid: String = row.get(1)?;
182            let message_id: Option<i64> = row.get(2)?;
183            let embedding_text: String = row.get(3)?;
184            let created_at: String = row.get(4)?;
185            let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
186            let embedding: Vec<f32> = serde_json::from_str(&embedding_text).unwrap_or_default();
187            let score = cosine_similarity(query_embedding, &embedding);
188            scored.push((
189                MemoryVector {
190                    id,
191                    session_id: sid,
192                    message_id,
193                    embedding,
194                    created_at,
195                },
196                score,
197            ));
198        }
199        scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
200        scored.truncate(k);
201        Ok(scored)
202    }
203
204    /// List known session IDs ordered by most recent activity
205    pub fn list_sessions(&self) -> Result<Vec<String>> {
206        let conn = self.conn();
207        let mut stmt = conn.prepare(
208            "SELECT session_id, MAX(created_at) as last FROM messages GROUP BY session_id ORDER BY last DESC"
209        )?;
210        let mut rows = stmt.query([])?;
211        let mut out = Vec::new();
212        while let Some(row) = rows.next()? {
213            let sid: String = row.get(0)?;
214            out.push(sid);
215        }
216        Ok(out)
217    }
218
219    // ---------- Tool Log ----------
220
221    #[allow(clippy::too_many_arguments)]
222    pub fn log_tool(
223        &self,
224        session_id: &str,
225        agent_name: &str,
226        run_id: &str,
227        tool_name: &str,
228        arguments: &JsonValue,
229        result: &JsonValue,
230        success: bool,
231        error: Option<&str>,
232    ) -> Result<i64> {
233        let conn = self.conn();
234        let mut stmt = conn.prepare("INSERT INTO tool_log (session_id, agent, run_id, tool_name, arguments, result, success, error) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
235        let id: i64 = stmt.query_row(
236            params![
237                session_id,
238                agent_name,
239                run_id,
240                tool_name,
241                arguments.to_string(),
242                result.to_string(),
243                success,
244                error.unwrap_or("")
245            ],
246            |row| row.get(0),
247        )?;
248        Ok(id)
249    }
250
251    // ---------- Policy Cache ----------
252
253    pub fn policy_upsert(&self, key: &str, value: &JsonValue) -> Result<()> {
254        let conn = self.conn();
255        // DuckDB upsert workaround: delete then insert atomically within a transaction.
256        conn.execute_batch("BEGIN TRANSACTION;")?;
257        {
258            let mut del = conn.prepare("DELETE FROM policy_cache WHERE key = ?")?;
259            let _ = del.execute(params![key])?;
260            let mut ins = conn.prepare("INSERT INTO policy_cache (key, value, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP)")?;
261            let _ = ins.execute(params![key, value.to_string()])?;
262        }
263        conn.execute_batch("COMMIT;")?;
264        Ok(())
265    }
266
267    pub fn policy_get(&self, key: &str) -> Result<Option<PolicyEntry>> {
268        let conn = self.conn();
269        let mut stmt = conn.prepare("SELECT key, value, CAST(updated_at AS TEXT) as updated_at FROM policy_cache WHERE key = ?")?;
270        let mut rows = stmt.query(params![key])?;
271        if let Some(row) = rows.next()? {
272            let key: String = row.get(0)?;
273            let value_text: String = row.get(1)?;
274            let updated_at: String = row.get(2)?;
275            let updated_at: DateTime<Utc> = updated_at.parse().unwrap_or_else(|_| Utc::now());
276            let value: JsonValue = serde_json::from_str(&value_text).unwrap_or(JsonValue::Null);
277            Ok(Some(PolicyEntry {
278                key,
279                value,
280                updated_at,
281            }))
282        } else {
283            Ok(None)
284        }
285    }
286}
287
288fn generate_instance_id() -> String {
289    let hostname = hostname::get()
290        .ok()
291        .and_then(|h| h.into_string().ok())
292        .unwrap_or_else(|| "unknown".to_string());
293    let uuid = uuid::Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext));
294    format!("{}-{}", hostname, uuid)
295}
296
297fn expand_tilde(path: &Path) -> Result<PathBuf> {
298    let path_str = path.to_string_lossy();
299    if path_str == "~" {
300        let base = BaseDirs::new().context("base directories not available")?;
301        Ok(base.home_dir().to_path_buf())
302    } else if let Some(stripped) = path_str.strip_prefix("~/") {
303        let base = BaseDirs::new().context("base directories not available")?;
304        Ok(base.home_dir().join(stripped))
305    } else {
306        Ok(path.to_path_buf())
307    }
308}
309
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    /// A leading `~/` is replaced by the home directory.
    #[test]
    fn expands_home_directory_prefix() {
        let dirs = BaseDirs::new().expect("home directory available");
        let actual = expand_tilde(Path::new("~/demo.db")).expect("path expansion succeeds");
        let wanted = dirs.home_dir().join("demo.db");
        assert_eq!(actual, wanted);
    }

    /// Paths without a tilde prefix come back untouched.
    #[test]
    fn leaves_regular_paths_unchanged() {
        let original = Path::new("relative/path.db");
        let actual = expand_tilde(original).expect("path expansion succeeds");
        assert_eq!(actual, original);
    }
}
330
/// Cosine similarity of two vectors.
///
/// Returns `0.0` when either slice is empty, when the lengths differ, or when
/// either vector has zero magnitude.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    let comparable = !a.is_empty() && !b.is_empty() && a.len() == b.len();
    if !comparable {
        return 0.0;
    }

    // Accumulate the dot product and both squared norms in a single pass.
    let (dot, norm_a_sq, norm_b_sq) = a.iter().zip(b.iter()).fold(
        (0.0f32, 0.0f32, 0.0f32),
        |(dot, na, nb), (&x, &y)| (dot + x * y, na + x * x, nb + y * y),
    );

    if norm_a_sq == 0.0 || norm_b_sq == 0.0 {
        return 0.0;
    }
    dot / (norm_a_sq.sqrt() * norm_b_sq.sqrt())
}
348
349// ========== Knowledge Graph Methods ==========
350
351// Type Conversion Helpers
352
353fn from_kg_node(node: crate::spec_ai_knowledge_graph::GraphNode) -> GraphNode {
354    GraphNode {
355        id: node.id,
356        session_id: node.session_id,
357        node_type: node.node_type,
358        label: node.label,
359        properties: node.properties,
360        embedding_id: node.embedding_id,
361        created_at: node.created_at,
362        updated_at: node.updated_at,
363    }
364}
365
366fn from_kg_edge(edge: crate::spec_ai_knowledge_graph::GraphEdge) -> GraphEdge {
367    GraphEdge {
368        id: edge.id,
369        session_id: edge.session_id,
370        source_id: edge.source_id,
371        target_id: edge.target_id,
372        edge_type: edge.edge_type,
373        predicate: edge.predicate,
374        properties: edge.properties,
375        weight: edge.weight,
376        temporal_start: edge.temporal_start,
377        temporal_end: edge.temporal_end,
378        created_at: edge.created_at,
379    }
380}
381
382fn from_kg_path(path: crate::spec_ai_knowledge_graph::GraphPath) -> GraphPath {
383    GraphPath {
384        nodes: path.nodes.into_iter().map(from_kg_node).collect(),
385        edges: path.edges.into_iter().map(from_kg_edge).collect(),
386        length: path.length,
387        weight: path.weight,
388    }
389}
390
391impl Persistence {
392    // ---------- Graph Node Operations ----------
393
394    pub fn insert_graph_node(
395        &self,
396        session_id: &str,
397        node_type: crate::spec_ai_knowledge_graph::NodeType,
398        label: &str,
399        properties: &JsonValue,
400        embedding_id: Option<i64>,
401    ) -> Result<i64> {
402        self.graph_store
403            .insert_graph_node(session_id, node_type, label, properties, embedding_id)
404    }
405
406    pub fn get_graph_node(&self, node_id: i64) -> Result<Option<GraphNode>> {
407        self.graph_store
408            .get_graph_node(node_id)
409            .map(|opt| opt.map(from_kg_node))
410    }
411
412    pub fn list_graph_nodes(
413        &self,
414        session_id: &str,
415        node_type: Option<crate::spec_ai_knowledge_graph::NodeType>,
416        limit: Option<i64>,
417    ) -> Result<Vec<GraphNode>> {
418        self.graph_store
419            .list_graph_nodes(session_id, node_type, limit)
420            .map(|nodes| nodes.into_iter().map(from_kg_node).collect())
421    }
422
423    pub fn count_graph_nodes(&self, session_id: &str) -> Result<i64> {
424        self.graph_store.count_graph_nodes(session_id)
425    }
426
427    pub fn update_graph_node(&self, node_id: i64, properties: &JsonValue) -> Result<()> {
428        self.graph_store.update_graph_node(node_id, properties)
429    }
430
431    pub fn delete_graph_node(&self, node_id: i64) -> Result<()> {
432        self.graph_store.delete_graph_node(node_id)
433    }
434
435    // ---------- Graph Edge Operations ----------
436
437    #[allow(clippy::too_many_arguments)]
438    pub fn insert_graph_edge(
439        &self,
440        session_id: &str,
441        source_id: i64,
442        target_id: i64,
443        edge_type: crate::spec_ai_knowledge_graph::EdgeType,
444        predicate: Option<&str>,
445        properties: Option<&JsonValue>,
446        weight: f32,
447    ) -> Result<i64> {
448        self.graph_store.insert_graph_edge(
449            session_id, source_id, target_id, edge_type, predicate, properties, weight,
450        )
451    }
452
453    pub fn get_graph_edge(&self, edge_id: i64) -> Result<Option<GraphEdge>> {
454        self.graph_store
455            .get_graph_edge(edge_id)
456            .map(|opt| opt.map(from_kg_edge))
457    }
458
459    pub fn list_graph_edges(
460        &self,
461        session_id: &str,
462        source_id: Option<i64>,
463        target_id: Option<i64>,
464    ) -> Result<Vec<GraphEdge>> {
465        self.graph_store
466            .list_graph_edges(session_id, source_id, target_id)
467            .map(|edges| edges.into_iter().map(from_kg_edge).collect())
468    }
469
470    pub fn count_graph_edges(&self, session_id: &str) -> Result<i64> {
471        self.graph_store.count_graph_edges(session_id)
472    }
473
474    pub fn delete_graph_edge(&self, edge_id: i64) -> Result<()> {
475        self.graph_store.delete_graph_edge(edge_id)
476    }
477
478    // ---------- Graph Traversal Operations ----------
479
480    pub fn find_shortest_path(
481        &self,
482        session_id: &str,
483        source_id: i64,
484        target_id: i64,
485        max_hops: Option<usize>,
486    ) -> Result<Option<GraphPath>> {
487        self.graph_store
488            .find_shortest_path(session_id, source_id, target_id, max_hops)
489            .map(|opt| opt.map(from_kg_path))
490    }
491
492    pub fn traverse_neighbors(
493        &self,
494        session_id: &str,
495        node_id: i64,
496        direction: crate::spec_ai_knowledge_graph::TraversalDirection,
497        depth: usize,
498    ) -> Result<Vec<GraphNode>> {
499        self.graph_store
500            .traverse_neighbors(session_id, node_id, direction, depth)
501            .map(|nodes| nodes.into_iter().map(from_kg_node).collect())
502    }
503
504    // ---------- Transcriptions ----------
505
506    pub fn insert_transcription(
507        &self,
508        session_id: &str,
509        chunk_id: i64,
510        text: &str,
511        timestamp: chrono::DateTime<Utc>,
512    ) -> Result<i64> {
513        let conn = self.conn();
514        let mut stmt = conn.prepare(
515            "INSERT INTO transcriptions (session_id, chunk_id, text, timestamp, embedding_id) VALUES (?, ?, ?, ?, NULL) RETURNING id",
516        )?;
517        let id: i64 = stmt.query_row(
518            params![session_id, chunk_id, text, timestamp.to_rfc3339()],
519            |row| row.get(0),
520        )?;
521        Ok(id)
522    }
523
524    pub fn update_transcription_embedding(
525        &self,
526        transcription_id: i64,
527        embedding_id: i64,
528    ) -> Result<()> {
529        let conn = self.conn();
530        conn.execute(
531            "UPDATE transcriptions SET embedding_id = ? WHERE id = ?",
532            params![embedding_id, transcription_id],
533        )?;
534        Ok(())
535    }
536
537    #[allow(clippy::type_complexity)]
538    pub fn list_transcriptions(
539        &self,
540        session_id: &str,
541        limit: Option<i64>,
542    ) -> Result<Vec<(i64, i64, String, DateTime<Utc>)>> {
543        let conn = self.conn();
544        let query = if let Some(lim) = limit {
545            format!(
546                "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC LIMIT {}",
547                lim
548            )
549        } else {
550            "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC".to_string()
551        };
552
553        let mut stmt = conn.prepare(&query)?;
554        let mut rows = stmt.query(params![session_id])?;
555        let mut out = Vec::new();
556
557        while let Some(row) = rows.next()? {
558            let id: i64 = row.get(0)?;
559            let chunk_id: i64 = row.get(1)?;
560            let text: String = row.get(2)?;
561            let timestamp_str: String = row.get(3)?;
562            let timestamp: DateTime<Utc> = timestamp_str.parse().unwrap_or_else(|_| Utc::now());
563            out.push((id, chunk_id, text, timestamp));
564        }
565
566        Ok(out)
567    }
568
569    pub fn get_full_transcription(&self, session_id: &str) -> Result<String> {
570        let transcriptions = self.list_transcriptions(session_id, None)?;
571        Ok(transcriptions
572            .into_iter()
573            .map(|(_, _, text, _)| text)
574            .collect::<Vec<_>>()
575            .join(" "))
576    }
577
578    pub fn delete_transcriptions(&self, session_id: &str) -> Result<()> {
579        let conn = self.conn();
580        conn.execute(
581            "DELETE FROM transcriptions WHERE session_id = ?",
582            params![session_id],
583        )?;
584        Ok(())
585    }
586
587    pub fn get_transcription_by_embedding(&self, embedding_id: i64) -> Result<Option<String>> {
588        let conn = self.conn();
589        let mut stmt =
590            conn.prepare("SELECT text FROM transcriptions WHERE embedding_id = ? LIMIT 1")?;
591        let result: Result<String, _> = stmt.query_row(params![embedding_id], |row| row.get(0));
592        match result {
593            Ok(text) => Ok(Some(text)),
594            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
595            Err(e) => Err(e.into()),
596        }
597    }
598
599    // ---------- Tokenized Files Cache ----------
600
601    /// Persist tokenization metadata for a file, replacing any existing entry for the path.
602    #[allow(clippy::too_many_arguments)]
603    pub fn upsert_tokenized_file(
604        &self,
605        session_id: &str,
606        path: &str,
607        file_hash: &str,
608        raw_tokens: usize,
609        cleaned_tokens: usize,
610        bytes_captured: usize,
611        truncated: bool,
612        embedding_id: Option<i64>,
613    ) -> Result<i64> {
614        let conn = self.conn();
615        conn.execute(
616            "DELETE FROM tokenized_files WHERE session_id = ? AND path = ?",
617            params![session_id, path],
618        )?;
619        let mut stmt = conn.prepare("INSERT INTO tokenized_files (session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
620        let id: i64 = stmt.query_row(
621            params![
622                session_id,
623                path,
624                file_hash,
625                raw_tokens as i64,
626                cleaned_tokens as i64,
627                bytes_captured as i64,
628                truncated,
629                embedding_id
630            ],
631            |row| row.get(0),
632        )?;
633        Ok(id)
634    }
635
636    pub fn get_tokenized_file(
637        &self,
638        session_id: &str,
639        path: &str,
640    ) -> Result<Option<TokenizedFileRecord>> {
641        let conn = self.conn();
642        let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? AND path = ? LIMIT 1")?;
643        let mut rows = stmt.query(params![session_id, path])?;
644        if let Some(row) = rows.next()? {
645            let record = TokenizedFileRecord::from_row(row)?;
646            Ok(Some(record))
647        } else {
648            Ok(None)
649        }
650    }
651
652    pub fn list_tokenized_files(&self, session_id: &str) -> Result<Vec<TokenizedFileRecord>> {
653        let conn = self.conn();
654        let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? ORDER BY path")?;
655        let mut rows = stmt.query(params![session_id])?;
656        let mut out = Vec::new();
657        while let Some(row) = rows.next()? {
658            out.push(TokenizedFileRecord::from_row(row)?);
659        }
660        Ok(out)
661    }
662
663    // ========== Mesh Message Persistence ==========
664
665    /// Store a mesh message in the database
666    pub fn mesh_message_store(
667        &self,
668        message_id: &str,
669        source_instance: &str,
670        target_instance: Option<&str>,
671        message_type: &str,
672        payload: &JsonValue,
673        status: &str,
674    ) -> Result<i64> {
675        let conn = self.conn();
676        let payload_json = serde_json::to_string(payload)?;
677        conn.execute(
678            "INSERT INTO mesh_messages (message_id, source_instance, target_instance, message_type, payload, status) VALUES (?, ?, ?, ?, ?, ?)",
679            params![message_id, source_instance, target_instance, message_type, payload_json, status],
680        )?;
681        // Get the last inserted ID
682        let id: i64 = conn.query_row("SELECT last_insert_rowid()", params![], |row| row.get(0))?;
683        Ok(id)
684    }
685
686    /// Check if a message with this ID already exists (for duplicate detection)
687    pub fn mesh_message_exists(&self, message_id: &str) -> Result<bool> {
688        let conn = self.conn();
689        let count: i64 = conn.query_row(
690            "SELECT COUNT(*) FROM mesh_messages WHERE message_id = ?",
691            params![message_id],
692            |row| row.get(0),
693        )?;
694        Ok(count > 0)
695    }
696
697    /// Update message status (e.g., delivered, failed)
698    pub fn mesh_message_update_status(&self, message_id: i64, status: &str) -> Result<()> {
699        let conn = self.conn();
700        conn.execute(
701            "UPDATE mesh_messages SET status = ?, delivered_at = CURRENT_TIMESTAMP WHERE id = ?",
702            params![status, message_id],
703        )?;
704        Ok(())
705    }
706
707    /// Get pending messages for a target instance
708    pub fn mesh_message_get_pending(
709        &self,
710        target_instance: &str,
711    ) -> Result<Vec<MeshMessageRecord>> {
712        let conn = self.conn();
713        let mut stmt = conn.prepare(
714            "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
715             FROM mesh_messages
716             WHERE (target_instance = ? OR target_instance IS NULL) AND status = 'pending'
717             ORDER BY created_at",
718        )?;
719        let mut rows = stmt.query(params![target_instance])?;
720        let mut out = Vec::new();
721        while let Some(row) = rows.next()? {
722            out.push(MeshMessageRecord::from_row(row)?);
723        }
724        Ok(out)
725    }
726
727    /// Get message history for analytics
728    pub fn mesh_message_get_history(
729        &self,
730        instance_id: Option<&str>,
731        limit: usize,
732    ) -> Result<Vec<MeshMessageRecord>> {
733        let conn = self.conn();
734        let query = if instance_id.is_some() {
735            format!(
736                "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
737                 FROM mesh_messages
738                 WHERE source_instance = ? OR target_instance = ?
739                 ORDER BY created_at DESC LIMIT {}",
740                limit
741            )
742        } else {
743            format!(
744                "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
745                 FROM mesh_messages
746                 ORDER BY created_at DESC LIMIT {}",
747                limit
748            )
749        };
750
751        let mut stmt = conn.prepare(&query)?;
752        let mut rows = if let Some(inst) = instance_id {
753            stmt.query(params![inst, inst])?
754        } else {
755            stmt.query(params![])?
756        };
757
758        let mut out = Vec::new();
759        while let Some(row) = rows.next()? {
760            out.push(MeshMessageRecord::from_row(row)?);
761        }
762        Ok(out)
763    }
764
765    // ===== Graph Synchronization Methods =====
766
767    /// Append an entry to the graph changelog
768    #[allow(clippy::too_many_arguments)]
769    pub fn graph_changelog_append(
770        &self,
771        session_id: &str,
772        instance_id: &str,
773        entity_type: &str,
774        entity_id: i64,
775        operation: &str,
776        vector_clock: &str,
777        data: Option<&str>,
778    ) -> Result<i64> {
779        self.graph_store.graph_changelog_append(
780            session_id,
781            instance_id,
782            entity_type,
783            entity_id,
784            operation,
785            vector_clock,
786            data,
787        )
788    }
789
790    /// Get changelog entries since a given timestamp for a session
791    pub fn graph_changelog_get_since(
792        &self,
793        session_id: &str,
794        since_timestamp: &str,
795    ) -> Result<Vec<ChangelogEntry>> {
796        self.graph_store
797            .graph_changelog_get_since(session_id, since_timestamp)
798            .map(|entries| {
799                entries
800                    .into_iter()
801                    .map(|e| ChangelogEntry {
802                        id: e.id,
803                        session_id: e.session_id,
804                        instance_id: e.instance_id,
805                        entity_type: e.entity_type,
806                        entity_id: e.entity_id,
807                        operation: e.operation,
808                        vector_clock: e.vector_clock,
809                        data: e.data,
810                        created_at: e.created_at,
811                    })
812                    .collect()
813            })
814    }
815
816    /// List conflict entries stored in the changelog
817    pub fn graph_list_conflicts(&self, session_id: Option<&str>) -> Result<Vec<ChangelogEntry>> {
818        self.graph_store
819            .graph_list_conflicts(session_id, 100)
820            .map(|entries| {
821                entries
822                    .into_iter()
823                    .map(|e| ChangelogEntry {
824                        id: e.id,
825                        session_id: e.session_id,
826                        instance_id: e.instance_id,
827                        entity_type: e.entity_type,
828                        entity_id: e.entity_id,
829                        operation: e.operation,
830                        vector_clock: e.vector_clock,
831                        data: e.data,
832                        created_at: e.created_at,
833                    })
834                    .collect()
835            })
836    }
837
838    /// Prune old changelog entries (keep last N days)
839    pub fn graph_changelog_prune(&self, days_to_keep: i64) -> Result<usize> {
840        self.graph_store.graph_changelog_prune(days_to_keep)
841    }
842
843    /// Get the vector clock for an instance/session/graph combination
844    pub fn graph_sync_state_get(
845        &self,
846        instance_id: &str,
847        session_id: &str,
848        graph_name: &str,
849    ) -> Result<Option<String>> {
850        self.graph_store
851            .graph_sync_state_get(instance_id, session_id, graph_name)
852    }
853
854    /// Get sync state metadata including last_sync_at
855    pub fn graph_sync_state_get_metadata(
856        &self,
857        instance_id: &str,
858        session_id: &str,
859        graph_name: &str,
860    ) -> Result<Option<SyncStateRecord>> {
861        self.graph_store
862            .graph_sync_state_get_metadata(instance_id, session_id, graph_name)
863            .map(|opt| {
864                opt.map(|r| SyncStateRecord {
865                    vector_clock: r.vector_clock,
866                    last_sync_at: r.last_sync_at,
867                })
868            })
869    }
870
871    /// Update the vector clock for an instance/session/graph combination
872    pub fn graph_sync_state_update(
873        &self,
874        instance_id: &str,
875        session_id: &str,
876        graph_name: &str,
877        vector_clock: &str,
878    ) -> Result<()> {
879        self.graph_store
880            .graph_sync_state_update(instance_id, session_id, graph_name, vector_clock)
881    }
882
883    /// Persist sync configuration for a graph
884    pub fn graph_set_sync_config(
885        &self,
886        session_id: &str,
887        graph_name: &str,
888        sync_enabled: bool,
889        conflict_resolution_strategy: Option<&str>,
890        sync_interval_seconds: Option<u64>,
891    ) -> Result<GraphSyncConfig> {
892        self.graph_store
893            .graph_set_sync_config(
894                session_id,
895                graph_name,
896                sync_enabled,
897                conflict_resolution_strategy,
898                sync_interval_seconds,
899            )
900            .map(|cfg| GraphSyncConfig {
901                sync_enabled: cfg.sync_enabled,
902                conflict_resolution_strategy: cfg.conflict_resolution_strategy,
903                sync_interval_seconds: cfg.sync_interval_seconds,
904            })
905    }
906
907    /// Retrieve sync configuration for a graph
908    pub fn graph_get_sync_config(
909        &self,
910        session_id: &str,
911        graph_name: &str,
912    ) -> Result<GraphSyncConfig> {
913        self.graph_store
914            .graph_get_sync_config(session_id, graph_name)
915            .map(|cfg| GraphSyncConfig {
916                sync_enabled: cfg.sync_enabled,
917                conflict_resolution_strategy: cfg.conflict_resolution_strategy,
918                sync_interval_seconds: cfg.sync_interval_seconds,
919            })
920    }
921
922    /// Enable or disable sync for a graph
923    pub fn graph_set_sync_enabled(
924        &self,
925        session_id: &str,
926        graph_name: &str,
927        enabled: bool,
928    ) -> Result<()> {
929        self.graph_store
930            .graph_set_sync_enabled(session_id, graph_name, enabled)
931    }
932
933    /// Check if sync is enabled for a graph
934    pub fn graph_get_sync_enabled(&self, session_id: &str, graph_name: &str) -> Result<bool> {
935        self.graph_store
936            .graph_get_sync_enabled(session_id, graph_name)
937    }
938
939    /// List all graphs for a session
940    pub fn graph_list(&self, session_id: &str) -> Result<Vec<String>> {
941        self.graph_store.graph_list(session_id)
942    }
943
944    /// List all sync-enabled graphs across all sessions
945    pub fn graph_list_sync_enabled(&self) -> Result<Vec<(String, String)>> {
946        self.graph_store.graph_list_sync_enabled()
947    }
948
949    /// Get a node with its sync metadata
950    pub fn graph_get_node_with_sync(&self, node_id: i64) -> Result<Option<SyncedNodeRecord>> {
951        self.graph_store
952            .graph_get_node_with_sync(node_id)
953            .map(|opt| {
954                opt.map(|r| SyncedNodeRecord {
955                    id: r.id,
956                    session_id: r.session_id,
957                    node_type: r.node_type,
958                    label: r.label,
959                    properties: r.properties,
960                    embedding_id: r.embedding_id,
961                    created_at: r.created_at,
962                    updated_at: r.updated_at,
963                    vector_clock: r.vector_clock,
964                    last_modified_by: r.last_modified_by,
965                    is_deleted: r.is_deleted,
966                    sync_enabled: r.sync_enabled,
967                })
968            })
969    }
970
971    /// Get all synced nodes for a session with optional filters
972    pub fn graph_list_nodes_with_sync(
973        &self,
974        session_id: &str,
975        sync_enabled_only: bool,
976        include_deleted: bool,
977    ) -> Result<Vec<SyncedNodeRecord>> {
978        self.graph_store
979            .graph_list_nodes_with_sync(session_id, sync_enabled_only, include_deleted)
980            .map(|nodes| {
981                nodes
982                    .into_iter()
983                    .map(|r| SyncedNodeRecord {
984                        id: r.id,
985                        session_id: r.session_id,
986                        node_type: r.node_type,
987                        label: r.label,
988                        properties: r.properties,
989                        embedding_id: r.embedding_id,
990                        created_at: r.created_at,
991                        updated_at: r.updated_at,
992                        vector_clock: r.vector_clock,
993                        last_modified_by: r.last_modified_by,
994                        is_deleted: r.is_deleted,
995                        sync_enabled: r.sync_enabled,
996                    })
997                    .collect()
998            })
999    }
1000
1001    /// Get an edge with its sync metadata
1002    pub fn graph_get_edge_with_sync(&self, edge_id: i64) -> Result<Option<SyncedEdgeRecord>> {
1003        self.graph_store
1004            .graph_get_edge_with_sync(edge_id)
1005            .map(|opt| {
1006                opt.map(|r| SyncedEdgeRecord {
1007                    id: r.id,
1008                    session_id: r.session_id,
1009                    source_id: r.source_id,
1010                    target_id: r.target_id,
1011                    edge_type: r.edge_type,
1012                    predicate: r.predicate,
1013                    properties: r.properties,
1014                    weight: r.weight,
1015                    temporal_start: r.temporal_start,
1016                    temporal_end: r.temporal_end,
1017                    created_at: r.created_at,
1018                    vector_clock: r.vector_clock,
1019                    last_modified_by: r.last_modified_by,
1020                    is_deleted: r.is_deleted,
1021                    sync_enabled: r.sync_enabled,
1022                })
1023            })
1024    }
1025
1026    /// Get all synced edges for a session with optional filters
1027    pub fn graph_list_edges_with_sync(
1028        &self,
1029        session_id: &str,
1030        sync_enabled_only: bool,
1031        include_deleted: bool,
1032    ) -> Result<Vec<SyncedEdgeRecord>> {
1033        self.graph_store
1034            .graph_list_edges_with_sync(session_id, sync_enabled_only, include_deleted)
1035            .map(|edges| {
1036                edges
1037                    .into_iter()
1038                    .map(|r| SyncedEdgeRecord {
1039                        id: r.id,
1040                        session_id: r.session_id,
1041                        source_id: r.source_id,
1042                        target_id: r.target_id,
1043                        edge_type: r.edge_type,
1044                        predicate: r.predicate,
1045                        properties: r.properties,
1046                        weight: r.weight,
1047                        temporal_start: r.temporal_start,
1048                        temporal_end: r.temporal_end,
1049                        created_at: r.created_at,
1050                        vector_clock: r.vector_clock,
1051                        last_modified_by: r.last_modified_by,
1052                        is_deleted: r.is_deleted,
1053                        sync_enabled: r.sync_enabled,
1054                    })
1055                    .collect()
1056            })
1057    }
1058
1059    /// Update a node's sync metadata
1060    pub fn graph_update_node_sync_metadata(
1061        &self,
1062        node_id: i64,
1063        vector_clock: &str,
1064        last_modified_by: &str,
1065        sync_enabled: bool,
1066    ) -> Result<()> {
1067        self.graph_store.graph_update_node_sync_metadata(
1068            node_id,
1069            vector_clock,
1070            last_modified_by,
1071            sync_enabled,
1072        )
1073    }
1074
1075    /// Update an edge's sync metadata
1076    pub fn graph_update_edge_sync_metadata(
1077        &self,
1078        edge_id: i64,
1079        vector_clock: &str,
1080        last_modified_by: &str,
1081        sync_enabled: bool,
1082    ) -> Result<()> {
1083        self.graph_store.graph_update_edge_sync_metadata(
1084            edge_id,
1085            vector_clock,
1086            last_modified_by,
1087            sync_enabled,
1088        )
1089    }
1090
1091    /// Mark a node as deleted (tombstone pattern)
1092    pub fn graph_mark_node_deleted(
1093        &self,
1094        node_id: i64,
1095        vector_clock: &str,
1096        deleted_by: &str,
1097    ) -> Result<()> {
1098        self.graph_store
1099            .graph_mark_node_deleted(node_id, vector_clock, deleted_by)
1100    }
1101
1102    /// Mark an edge as deleted (tombstone pattern)
1103    pub fn graph_mark_edge_deleted(
1104        &self,
1105        edge_id: i64,
1106        vector_clock: &str,
1107        deleted_by: &str,
1108    ) -> Result<()> {
1109        self.graph_store
1110            .graph_mark_edge_deleted(edge_id, vector_clock, deleted_by)
1111    }
1112}
1113
/// One tokenized-file row, as decoded by `TokenizedFileRecord::from_row`.
///
/// The column positions noted on each field are the ones `from_row` reads.
/// Field meanings beyond that are inferred from the names and should be
/// confirmed against the table schema.
#[derive(Debug, Clone)]
pub struct TokenizedFileRecord {
    /// Row identifier (column 0).
    pub id: i64,
    /// Session the row belongs to (column 1).
    pub session_id: String,
    /// File path as stored (column 2).
    pub path: String,
    /// Stored hash string for the file (column 3); the algorithm is not visible here.
    pub file_hash: String,
    /// Column 4, read as `i64` and clamped to zero before conversion to `usize`.
    pub raw_tokens: usize,
    /// Column 5, read as `i64` and clamped to zero before conversion to `usize`.
    pub cleaned_tokens: usize,
    /// Column 6, read as `i64` and clamped to zero before conversion to `usize`.
    pub bytes_captured: usize,
    /// Truncation flag (column 7).
    pub truncated: bool,
    /// Optional embedding reference (column 8); `None` when the column is NULL.
    pub embedding_id: Option<i64>,
    /// Column 9 parsed as a UTC timestamp; `from_row` substitutes the current
    /// time when the stored text cannot be parsed.
    pub updated_at: DateTime<Utc>,
}
1127
1128impl TokenizedFileRecord {
1129    fn from_row(row: &duckdb::Row) -> Result<Self> {
1130        let id: i64 = row.get(0)?;
1131        let session_id: String = row.get(1)?;
1132        let path: String = row.get(2)?;
1133        let file_hash: String = row.get(3)?;
1134        let raw_tokens: i64 = row.get(4)?;
1135        let cleaned_tokens: i64 = row.get(5)?;
1136        let bytes_captured: i64 = row.get(6)?;
1137        let truncated: bool = row.get(7)?;
1138        let embedding_id: Option<i64> = row.get(8)?;
1139        let updated_at: String = row.get(9)?;
1140
1141        Ok(Self {
1142            id,
1143            session_id,
1144            path,
1145            file_hash,
1146            raw_tokens: raw_tokens.max(0) as usize,
1147            cleaned_tokens: cleaned_tokens.max(0) as usize,
1148            bytes_captured: bytes_captured.max(0) as usize,
1149            truncated,
1150            embedding_id,
1151            updated_at: updated_at.parse().unwrap_or_else(|_| Utc::now()),
1152        })
1153    }
1154}
1155
/// One mesh-message row, as decoded by `MeshMessageRecord::from_row`.
///
/// The column positions noted on each field are the ones `from_row` reads.
/// Field meanings beyond that are inferred from the names and should be
/// confirmed against the table schema.
#[derive(Debug, Clone)]
pub struct MeshMessageRecord {
    /// Row identifier (column 0).
    pub id: i64,
    /// Sending instance (column 1).
    pub source_instance: String,
    /// Receiving instance (column 2); `None` when the column is NULL.
    pub target_instance: Option<String>,
    /// Message type string (column 3); the set of valid values is not visible here.
    pub message_type: String,
    /// Column 4, stored as text and parsed as JSON by `from_row`.
    pub payload: JsonValue,
    /// Status string (column 5); the set of valid values is not visible here.
    pub status: String,
    /// Column 6 parsed as a UTC timestamp; `from_row` substitutes the current
    /// time when the stored text cannot be parsed.
    pub created_at: DateTime<Utc>,
    /// Column 7 parsed as a UTC timestamp; `None` when the column is NULL or the
    /// stored text cannot be parsed.
    pub delivered_at: Option<DateTime<Utc>>,
}
1167
1168impl MeshMessageRecord {
1169    fn from_row(row: &duckdb::Row) -> Result<Self> {
1170        let id: i64 = row.get(0)?;
1171        let source_instance: String = row.get(1)?;
1172        let target_instance: Option<String> = row.get(2)?;
1173        let message_type: String = row.get(3)?;
1174        let payload_str: String = row.get(4)?;
1175        let payload: JsonValue = serde_json::from_str(&payload_str)?;
1176        let status: String = row.get(5)?;
1177        let created_at_str: String = row.get(6)?;
1178        let delivered_at_str: Option<String> = row.get(7)?;
1179
1180        Ok(MeshMessageRecord {
1181            id,
1182            source_instance,
1183            target_instance,
1184            message_type,
1185            payload,
1186            status,
1187            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1188            delivered_at: delivered_at_str.and_then(|s| s.parse().ok()),
1189        })
1190    }
1191}
1192
1193// ===== Graph Sync Record Types =====
1194
/// Sync state for one instance/session/graph combination, as returned by
/// `Persistence::graph_sync_state_get_metadata`.
///
/// Both fields are copied verbatim from the graph store's record.
#[derive(Debug, Clone)]
pub struct SyncStateRecord {
    /// Vector clock as a string; the encoding is defined by the graph store.
    pub vector_clock: String,
    /// Last sync timestamp as a string, if the store reported one; the format is
    /// defined by the graph store.
    pub last_sync_at: Option<String>,
}
1200
/// Sync configuration for a graph, as returned by
/// `Persistence::graph_set_sync_config` and `Persistence::graph_get_sync_config`.
///
/// The derived `Default` yields `sync_enabled: false` with both optional
/// fields set to `None`.
#[derive(Debug, Clone, Default)]
pub struct GraphSyncConfig {
    /// Whether sync is enabled for the graph.
    pub sync_enabled: bool,
    /// Name of the conflict resolution strategy, if one is set; the set of valid
    /// names is not visible here.
    pub conflict_resolution_strategy: Option<String>,
    /// Sync interval in seconds, if one is set.
    pub sync_interval_seconds: Option<u64>,
}
1207
/// One graph changelog entry.
///
/// Produced by the `Persistence` changelog accessors (for example
/// `graph_list_conflicts`), which copy each field from the graph store's
/// record, and by `ChangelogEntry::from_row`. Field meanings beyond what those
/// show are inferred from the names.
#[derive(Debug, Clone)]
pub struct ChangelogEntry {
    /// Entry identifier.
    pub id: i64,
    /// Session the entry belongs to.
    pub session_id: String,
    /// Instance recorded on the entry.
    pub instance_id: String,
    /// Entity type string; the set of valid values is not visible here.
    pub entity_type: String,
    /// Identifier of the affected entity.
    pub entity_id: i64,
    /// Operation string; the set of valid values is not visible here.
    pub operation: String,
    /// Vector clock as a string; the encoding is defined by the graph store.
    pub vector_clock: String,
    /// Optional payload string attached to the entry.
    pub data: Option<String>,
    /// Creation timestamp in UTC.
    pub created_at: DateTime<Utc>,
}
1220
1221impl ChangelogEntry {
1222    #[allow(dead_code)]
1223    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1224        let id: i64 = row.get(0)?;
1225        let session_id: String = row.get(1)?;
1226        let instance_id: String = row.get(2)?;
1227        let entity_type: String = row.get(3)?;
1228        let entity_id: i64 = row.get(4)?;
1229        let operation: String = row.get(5)?;
1230        let vector_clock: String = row.get(6)?;
1231        let data: Option<String> = row.get(7)?;
1232        let created_at_str: String = row.get(8)?;
1233
1234        Ok(ChangelogEntry {
1235            id,
1236            session_id,
1237            instance_id,
1238            entity_type,
1239            entity_id,
1240            operation,
1241            vector_clock,
1242            data,
1243            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1244        })
1245    }
1246}
1247
/// A graph node together with its sync metadata.
///
/// Produced by `Persistence::graph_get_node_with_sync` and
/// `Persistence::graph_list_nodes_with_sync`, which copy each field from the
/// graph store's record, and by `SyncedNodeRecord::from_row`. Field meanings
/// beyond what those show are inferred from the names.
#[derive(Debug, Clone)]
pub struct SyncedNodeRecord {
    /// Node identifier.
    pub id: i64,
    /// Session the node belongs to.
    pub session_id: String,
    /// Node type string; the set of valid values is not visible here.
    pub node_type: String,
    /// Node label.
    pub label: String,
    /// Node properties as JSON (`from_row` parses them from stored text).
    pub properties: serde_json::Value,
    /// Optional embedding reference.
    pub embedding_id: Option<i64>,
    /// Creation timestamp in UTC.
    pub created_at: DateTime<Utc>,
    /// Last-update timestamp in UTC.
    pub updated_at: DateTime<Utc>,
    /// Vector clock as a string; the encoding is defined by the graph store.
    pub vector_clock: String,
    /// Last modifier recorded on the node, if any (presumably an instance id).
    pub last_modified_by: Option<String>,
    /// Whether the node is flagged as deleted.
    pub is_deleted: bool,
    /// Whether sync is enabled for the node.
    pub sync_enabled: bool,
}
1263
1264impl SyncedNodeRecord {
1265    #[allow(dead_code)]
1266    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1267        let id: i64 = row.get(0)?;
1268        let session_id: String = row.get(1)?;
1269        let node_type: String = row.get(2)?;
1270        let label: String = row.get(3)?;
1271        let properties_str: String = row.get(4)?;
1272        let properties: serde_json::Value = serde_json::from_str(&properties_str).map_err(|e| {
1273            duckdb::Error::FromSqlConversionFailure(4, duckdb::types::Type::Text, Box::new(e))
1274        })?;
1275        let embedding_id: Option<i64> = row.get(5)?;
1276        let created_at_str: String = row.get(6)?;
1277        let updated_at_str: String = row.get(7)?;
1278        let vector_clock: String = row.get(8)?;
1279        let last_modified_by: Option<String> = row.get(9)?;
1280        let is_deleted: bool = row.get(10)?;
1281        let sync_enabled: bool = row.get(11)?;
1282
1283        Ok(SyncedNodeRecord {
1284            id,
1285            session_id,
1286            node_type,
1287            label,
1288            properties,
1289            embedding_id,
1290            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1291            updated_at: updated_at_str.parse().unwrap_or_else(|_| Utc::now()),
1292            vector_clock,
1293            last_modified_by,
1294            is_deleted,
1295            sync_enabled,
1296        })
1297    }
1298}
1299
/// A graph edge together with its sync metadata.
///
/// Produced by `Persistence::graph_get_edge_with_sync` and
/// `Persistence::graph_list_edges_with_sync`, which copy each field from the
/// graph store's record, and by `SyncedEdgeRecord::from_row`. Field meanings
/// beyond what those show are inferred from the names.
#[derive(Debug, Clone)]
pub struct SyncedEdgeRecord {
    /// Edge identifier.
    pub id: i64,
    /// Session the edge belongs to.
    pub session_id: String,
    /// Identifier of the source node.
    pub source_id: i64,
    /// Identifier of the target node.
    pub target_id: i64,
    /// Edge type string; the set of valid values is not visible here.
    pub edge_type: String,
    /// Optional predicate string.
    pub predicate: Option<String>,
    /// Optional edge properties as JSON; `from_row` yields `None` when the
    /// column is NULL or its text is not valid JSON.
    pub properties: Option<serde_json::Value>,
    /// Edge weight.
    pub weight: f32,
    /// Optional start of the edge's temporal range, in UTC.
    pub temporal_start: Option<DateTime<Utc>>,
    /// Optional end of the edge's temporal range, in UTC.
    pub temporal_end: Option<DateTime<Utc>>,
    /// Creation timestamp in UTC.
    pub created_at: DateTime<Utc>,
    /// Vector clock as a string; the encoding is defined by the graph store.
    pub vector_clock: String,
    /// Last modifier recorded on the edge, if any (presumably an instance id).
    pub last_modified_by: Option<String>,
    /// Whether the edge is flagged as deleted.
    pub is_deleted: bool,
    /// Whether sync is enabled for the edge.
    pub sync_enabled: bool,
}
1318
1319impl SyncedEdgeRecord {
1320    #[allow(dead_code)]
1321    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1322        let id: i64 = row.get(0)?;
1323        let session_id: String = row.get(1)?;
1324        let source_id: i64 = row.get(2)?;
1325        let target_id: i64 = row.get(3)?;
1326        let edge_type: String = row.get(4)?;
1327        let predicate: Option<String> = row.get(5)?;
1328        let properties_str: Option<String> = row.get(6)?;
1329        let properties: Option<serde_json::Value> = properties_str
1330            .as_ref()
1331            .and_then(|s| serde_json::from_str(s).ok());
1332        let weight: f32 = row.get(7)?;
1333        let temporal_start_str: Option<String> = row.get(8)?;
1334        let temporal_end_str: Option<String> = row.get(9)?;
1335        let created_at_str: String = row.get(10)?;
1336        let vector_clock: String = row.get(11)?;
1337        let last_modified_by: Option<String> = row.get(12)?;
1338        let is_deleted: bool = row.get(13)?;
1339        let sync_enabled: bool = row.get(14)?;
1340
1341        Ok(SyncedEdgeRecord {
1342            id,
1343            session_id,
1344            source_id,
1345            target_id,
1346            edge_type,
1347            predicate,
1348            properties,
1349            weight,
1350            temporal_start: temporal_start_str.and_then(|s| s.parse().ok()),
1351            temporal_end: temporal_end_str.and_then(|s| s.parse().ok()),
1352            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1353            vector_clock,
1354            last_modified_by,
1355            is_deleted,
1356            sync_enabled,
1357        })
1358    }
1359}