spec_ai_config/persistence/mod.rs

1pub mod migrations;
2
3use anyhow::{Context, Result};
4use chrono::{DateTime, Utc};
5use directories::BaseDirs;
6use duckdb::{params, Connection};
7use serde_json::Value as JsonValue;
8use std::path::{Path, PathBuf};
9use std::sync::{Arc, Mutex};
10
11use crate::types::{
12    EdgeType, GraphEdge, GraphNode, GraphPath, MemoryVector, Message, MessageRole, NodeType,
13    PolicyEntry, TraversalDirection,
14};
15
16#[derive(Clone)]
17pub struct Persistence {
18    conn: Arc<Mutex<Connection>>,
19    instance_id: String,
20}
21
22impl Persistence {
23    /// Create or open the database at the provided path and run migrations.
24    pub fn new<P: AsRef<Path>>(db_path: P) -> Result<Self> {
25        Self::with_instance_id(db_path, generate_instance_id())
26    }
27
28    /// Create with a specific instance_id
29    pub fn with_instance_id<P: AsRef<Path>>(db_path: P, instance_id: String) -> Result<Self> {
30        let db_path = expand_tilde(db_path.as_ref())?;
31        if let Some(dir) = db_path.parent() {
32            std::fs::create_dir_all(dir).context("creating DB directory")?;
33        }
34        let conn = Connection::open(&db_path).context("opening DuckDB")?;
35        migrations::run(&conn).context("running migrations")?;
36        Ok(Self {
37            conn: Arc::new(Mutex::new(conn)),
38            instance_id,
39        })
40    }
41
42    /// Get the instance ID for this persistence instance
43    pub fn instance_id(&self) -> &str {
44        &self.instance_id
45    }
46
47    /// Checkpoint the database to ensure all WAL data is written to the main database file.
48    /// Call this before shutdown to ensure clean database state.
49    pub fn checkpoint(&self) -> Result<()> {
50        let conn = self.conn();
51        conn.execute_batch("CHECKPOINT;")
52            .context("checkpointing database")
53    }
54
55    /// Creates or opens the default database at ~/.spec-ai/agent_data.duckdb
56    pub fn new_default() -> Result<Self> {
57        let base = BaseDirs::new().context("base directories not available")?;
58        let path = base.home_dir().join(".agent_cli").join("agent_data.duckdb");
59        Self::new(path)
60    }
61
62    /// Get access to the pooled database connection.
63    /// Returns a MutexGuard that provides exclusive access to the connection.
64    pub fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
65        self.conn
66            .lock()
67            .expect("database connection mutex poisoned")
68    }
69
70    // ---------- Messages ----------
71
72    pub fn insert_message(
73        &self,
74        session_id: &str,
75        role: MessageRole,
76        content: &str,
77    ) -> Result<i64> {
78        let conn = self.conn();
79        let mut stmt = conn.prepare(
80            "INSERT INTO messages (session_id, role, content) VALUES (?, ?, ?) RETURNING id",
81        )?;
82        let id: i64 = stmt.query_row(params![session_id, role.as_str(), content], |row| {
83            row.get(0)
84        })?;
85        Ok(id)
86    }
87
88    pub fn list_messages(&self, session_id: &str, limit: i64) -> Result<Vec<Message>> {
89        let conn = self.conn();
90        let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?")?;
91        let mut rows = stmt.query(params![session_id, limit])?;
92        let mut out = Vec::new();
93        while let Some(row) = rows.next()? {
94            let id: i64 = row.get(0)?;
95            let sid: String = row.get(1)?;
96            let role: String = row.get(2)?;
97            let content: String = row.get(3)?;
98            let created_at: String = row.get(4)?; // DuckDB returns TIMESTAMP as string
99            let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
100            out.push(Message {
101                id,
102                session_id: sid,
103                role: MessageRole::from_str(&role),
104                content,
105                created_at,
106            });
107        }
108        out.reverse();
109        Ok(out)
110    }
111
112    pub fn get_message(&self, message_id: i64) -> Result<Option<Message>> {
113        let conn = self.conn();
114        let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE id = ?")?;
115        let mut rows = stmt.query(params![message_id])?;
116        if let Some(row) = rows.next()? {
117            let id: i64 = row.get(0)?;
118            let sid: String = row.get(1)?;
119            let role: String = row.get(2)?;
120            let content: String = row.get(3)?;
121            let created_at: String = row.get(4)?;
122            let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
123            Ok(Some(Message {
124                id,
125                session_id: sid,
126                role: MessageRole::from_str(&role),
127                content,
128                created_at,
129            }))
130        } else {
131            Ok(None)
132        }
133    }
134
135    /// Simple pruning by keeping only the most recent `keep_latest` messages.
136    pub fn prune_messages(&self, session_id: &str, keep_latest: i64) -> Result<u64> {
137        let conn = self.conn();
138        let mut stmt = conn.prepare("DELETE FROM messages WHERE session_id = ? AND id NOT IN (SELECT id FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?)")?;
139        let changed = stmt.execute(params![session_id, session_id, keep_latest])? as u64;
140        Ok(changed)
141    }
142
143    // ---------- Memory Vectors ----------
144
145    pub fn insert_memory_vector(
146        &self,
147        session_id: &str,
148        message_id: Option<i64>,
149        embedding: &[f32],
150    ) -> Result<i64> {
151        let conn = self.conn();
152        let embedding_json = serde_json::to_string(embedding)?;
153        let mut stmt = conn.prepare("INSERT INTO memory_vectors (session_id, message_id, embedding) VALUES (?, ?, ?) RETURNING id")?;
154        let id: i64 = stmt.query_row(params![session_id, message_id, embedding_json], |row| {
155            row.get(0)
156        })?;
157        Ok(id)
158    }
159
160    pub fn recall_top_k(
161        &self,
162        session_id: &str,
163        query_embedding: &[f32],
164        k: usize,
165    ) -> Result<Vec<(MemoryVector, f32)>> {
166        let conn = self.conn();
167        let mut stmt = conn.prepare("SELECT id, session_id, message_id, embedding, CAST(created_at AS TEXT) as created_at FROM memory_vectors WHERE session_id = ?")?;
168        let mut rows = stmt.query(params![session_id])?;
169        let mut scored: Vec<(MemoryVector, f32)> = Vec::new();
170        while let Some(row) = rows.next()? {
171            let id: i64 = row.get(0)?;
172            let sid: String = row.get(1)?;
173            let message_id: Option<i64> = row.get(2)?;
174            let embedding_text: String = row.get(3)?;
175            let created_at: String = row.get(4)?;
176            let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
177            let embedding: Vec<f32> = serde_json::from_str(&embedding_text).unwrap_or_default();
178            let score = cosine_similarity(query_embedding, &embedding);
179            scored.push((
180                MemoryVector {
181                    id,
182                    session_id: sid,
183                    message_id,
184                    embedding,
185                    created_at,
186                },
187                score,
188            ));
189        }
190        scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
191        scored.truncate(k);
192        Ok(scored)
193    }
194
195    /// List known session IDs ordered by most recent activity
196    pub fn list_sessions(&self) -> Result<Vec<String>> {
197        let conn = self.conn();
198        let mut stmt = conn.prepare(
199            "SELECT session_id, MAX(created_at) as last FROM messages GROUP BY session_id ORDER BY last DESC"
200        )?;
201        let mut rows = stmt.query([])?;
202        let mut out = Vec::new();
203        while let Some(row) = rows.next()? {
204            let sid: String = row.get(0)?;
205            out.push(sid);
206        }
207        Ok(out)
208    }
209
210    // ---------- Tool Log ----------
211
212    pub fn log_tool(
213        &self,
214        session_id: &str,
215        agent_name: &str,
216        run_id: &str,
217        tool_name: &str,
218        arguments: &JsonValue,
219        result: &JsonValue,
220        success: bool,
221        error: Option<&str>,
222    ) -> Result<i64> {
223        let conn = self.conn();
224        let mut stmt = conn.prepare("INSERT INTO tool_log (session_id, agent, run_id, tool_name, arguments, result, success, error) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
225        let id: i64 = stmt.query_row(
226            params![
227                session_id,
228                agent_name,
229                run_id,
230                tool_name,
231                arguments.to_string(),
232                result.to_string(),
233                success,
234                error.unwrap_or("")
235            ],
236            |row| row.get(0),
237        )?;
238        Ok(id)
239    }
240
241    // ---------- Policy Cache ----------
242
243    pub fn policy_upsert(&self, key: &str, value: &JsonValue) -> Result<()> {
244        let conn = self.conn();
245        // DuckDB upsert workaround: delete then insert atomically within a transaction.
246        conn.execute_batch("BEGIN TRANSACTION;")?;
247        {
248            let mut del = conn.prepare("DELETE FROM policy_cache WHERE key = ?")?;
249            let _ = del.execute(params![key])?;
250            let mut ins = conn.prepare("INSERT INTO policy_cache (key, value, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP)")?;
251            let _ = ins.execute(params![key, value.to_string()])?;
252        }
253        conn.execute_batch("COMMIT;")?;
254        Ok(())
255    }
256
257    pub fn policy_get(&self, key: &str) -> Result<Option<PolicyEntry>> {
258        let conn = self.conn();
259        let mut stmt = conn.prepare("SELECT key, value, CAST(updated_at AS TEXT) as updated_at FROM policy_cache WHERE key = ?")?;
260        let mut rows = stmt.query(params![key])?;
261        if let Some(row) = rows.next()? {
262            let key: String = row.get(0)?;
263            let value_text: String = row.get(1)?;
264            let updated_at: String = row.get(2)?;
265            let updated_at: DateTime<Utc> = updated_at.parse().unwrap_or_else(|_| Utc::now());
266            let value: JsonValue = serde_json::from_str(&value_text).unwrap_or(JsonValue::Null);
267            Ok(Some(PolicyEntry {
268                key,
269                value,
270                updated_at,
271            }))
272        } else {
273            Ok(None)
274        }
275    }
276}
277
278fn generate_instance_id() -> String {
279    let hostname = hostname::get()
280        .ok()
281        .and_then(|h| h.into_string().ok())
282        .unwrap_or_else(|| "unknown".to_string());
283    let uuid = uuid::Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext));
284    format!("{}-{}", hostname, uuid)
285}
286
287fn expand_tilde(path: &Path) -> Result<PathBuf> {
288    let path_str = path.to_string_lossy();
289    if path_str == "~" {
290        let base = BaseDirs::new().context("base directories not available")?;
291        Ok(base.home_dir().to_path_buf())
292    } else if let Some(stripped) = path_str.strip_prefix("~/") {
293        let base = BaseDirs::new().context("base directories not available")?;
294        Ok(base.home_dir().join(stripped))
295    } else {
296        Ok(path.to_path_buf())
297    }
298}
299
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    #[test]
    fn expands_home_directory_prefix() {
        let base = BaseDirs::new().expect("home directory available");
        let expected = base.home_dir().join("demo.db");
        let result = expand_tilde(Path::new("~/demo.db")).expect("path expansion succeeds");
        assert_eq!(result, expected);
    }

    #[test]
    fn expands_bare_tilde_to_home_directory() {
        let base = BaseDirs::new().expect("home directory available");
        let result = expand_tilde(Path::new("~")).expect("path expansion succeeds");
        assert_eq!(result, base.home_dir());
    }

    #[test]
    fn leaves_regular_paths_unchanged() {
        let input = Path::new("relative/path.db");
        let result = expand_tilde(input).expect("path expansion succeeds");
        assert_eq!(result, input);
    }

    #[test]
    fn leaves_other_users_home_unexpanded() {
        // `~user` syntax is not supported; such paths must pass through untouched.
        let input = Path::new("~someone/data.db");
        let result = expand_tilde(input).expect("path expansion succeeds");
        assert_eq!(result, input);
    }

    #[test]
    fn cosine_similarity_handles_degenerate_inputs() {
        assert_eq!(cosine_similarity(&[], &[]), 0.0);
        assert_eq!(cosine_similarity(&[1.0], &[1.0, 2.0]), 0.0);
        assert_eq!(cosine_similarity(&[0.0, 0.0], &[1.0, 2.0]), 0.0);
        assert_eq!(cosine_similarity(&[3.0, 4.0], &[3.0, 4.0]), 1.0);
        assert_eq!(cosine_similarity(&[1.0, 0.0], &[-1.0, 0.0]), -1.0);
    }
}
320
/// Cosine similarity of two equal-length vectors.
///
/// Returns `0.0` when either slice is empty, when the lengths differ, or when
/// either vector has zero magnitude (where the ratio would be undefined).
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    let comparable = !a.is_empty() && !b.is_empty() && a.len() == b.len();
    if !comparable {
        return 0.0;
    }

    // Accumulate dot product and both squared norms in a single pass.
    let (dot, norm_a_sq, norm_b_sq) = a.iter().zip(b).fold(
        (0.0f32, 0.0f32, 0.0f32),
        |(dot, na, nb), (&x, &y)| (dot + x * y, na + x * x, nb + y * y),
    );

    if norm_a_sq == 0.0 || norm_b_sq == 0.0 {
        0.0
    } else {
        dot / (norm_a_sq.sqrt() * norm_b_sq.sqrt())
    }
}
338
339// ========== Knowledge Graph Methods ==========
340
341impl Persistence {
342    // ---------- Graph Node Operations ----------
343
344    pub fn insert_graph_node(
345        &self,
346        session_id: &str,
347        node_type: NodeType,
348        label: &str,
349        properties: &JsonValue,
350        embedding_id: Option<i64>,
351    ) -> Result<i64> {
352        use crate::sync::VectorClock;
353
354        // Check if sync is enabled BEFORE locking the connection to avoid deadlock
355        let sync_enabled = self
356            .graph_get_sync_enabled(session_id, "default")
357            .unwrap_or(false);
358
359        // Create initial vector clock for this node
360        let mut vector_clock = VectorClock::new();
361        vector_clock.increment(&self.instance_id);
362        let vc_json = vector_clock.to_json()?;
363
364        let conn = self.conn();
365
366        // Insert the node with sync metadata
367        let mut stmt = conn.prepare(
368            "INSERT INTO graph_nodes (session_id, node_type, label, properties, embedding_id,
369                                     vector_clock, last_modified_by, sync_enabled)
370             VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id",
371        )?;
372        let id: i64 = stmt.query_row(
373            params![
374                session_id,
375                node_type.as_str(),
376                label,
377                properties.to_string(),
378                embedding_id,
379                vc_json,
380                self.instance_id,
381                sync_enabled,
382            ],
383            |row| row.get(0),
384        )?;
385
386        // If sync is enabled, append to changelog
387        if sync_enabled {
388            let node_data = serde_json::json!({
389                "id": id,
390                "session_id": session_id,
391                "node_type": node_type.as_str(),
392                "label": label,
393                "properties": properties,
394                "embedding_id": embedding_id,
395            });
396
397            self.graph_changelog_append(
398                session_id,
399                &self.instance_id,
400                "node",
401                id,
402                "create",
403                &vc_json,
404                Some(&node_data.to_string()),
405            )?;
406        }
407
408        Ok(id)
409    }
410
411    pub fn get_graph_node(&self, node_id: i64) -> Result<Option<GraphNode>> {
412        let conn = self.conn();
413        let mut stmt = conn.prepare(
414            "SELECT id, session_id, node_type, label, properties, embedding_id,
415                    CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
416             FROM graph_nodes WHERE id = ?",
417        )?;
418        let mut rows = stmt.query(params![node_id])?;
419        if let Some(row) = rows.next()? {
420            Ok(Some(Self::row_to_graph_node(row)?))
421        } else {
422            Ok(None)
423        }
424    }
425
426    pub fn list_graph_nodes(
427        &self,
428        session_id: &str,
429        node_type: Option<NodeType>,
430        limit: Option<i64>,
431    ) -> Result<Vec<GraphNode>> {
432        let conn = self.conn();
433
434        let nodes = if let Some(nt) = node_type {
435            let mut stmt = conn.prepare(
436                "SELECT id, session_id, node_type, label, properties, embedding_id,
437                        CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
438                 FROM graph_nodes WHERE session_id = ? AND node_type = ?
439                 ORDER BY id DESC LIMIT ?",
440            )?;
441            let query = stmt.query(params![session_id, nt.as_str(), limit.unwrap_or(100)])?;
442            Self::collect_graph_nodes(query)?
443        } else {
444            let mut stmt = conn.prepare(
445                "SELECT id, session_id, node_type, label, properties, embedding_id,
446                        CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
447                 FROM graph_nodes WHERE session_id = ?
448                 ORDER BY id DESC LIMIT ?",
449            )?;
450            let query = stmt.query(params![session_id, limit.unwrap_or(100)])?;
451            Self::collect_graph_nodes(query)?
452        };
453
454        Ok(nodes)
455    }
456
457    pub fn count_graph_nodes(&self, session_id: &str) -> Result<i64> {
458        let conn = self.conn();
459        let mut stmt = conn.prepare("SELECT COUNT(*) FROM graph_nodes WHERE session_id = ?")?;
460        let count: i64 = stmt.query_row(params![session_id], |row| row.get(0))?;
461        Ok(count)
462    }
463
464    pub fn update_graph_node(&self, node_id: i64, properties: &JsonValue) -> Result<()> {
465        use crate::sync::VectorClock;
466
467        let conn = self.conn();
468
469        // First get the current node data and vector clock
470        let mut stmt = conn.prepare(
471            "SELECT session_id, node_type, label, vector_clock, sync_enabled
472             FROM graph_nodes WHERE id = ?",
473        )?;
474
475        let (session_id, node_type, label, current_vc_json, sync_enabled): (
476            String,
477            String,
478            String,
479            Option<String>,
480            bool,
481        ) = stmt.query_row(params![node_id], |row| {
482            Ok((
483                row.get(0)?,
484                row.get(1)?,
485                row.get(2)?,
486                row.get(3)?,
487                row.get(4).unwrap_or(false),
488            ))
489        })?;
490
491        // Update the vector clock
492        let mut vector_clock = if let Some(vc_json) = current_vc_json {
493            VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
494        } else {
495            VectorClock::new()
496        };
497        vector_clock.increment(&self.instance_id);
498        let vc_json = vector_clock.to_json()?;
499
500        // Update the node with new properties and sync metadata
501        conn.execute(
502            "UPDATE graph_nodes
503             SET properties = ?,
504                 vector_clock = ?,
505                 last_modified_by = ?,
506                 updated_at = CURRENT_TIMESTAMP
507             WHERE id = ?",
508            params![properties.to_string(), vc_json, self.instance_id, node_id],
509        )?;
510
511        // If sync is enabled, append to changelog
512        if sync_enabled {
513            let node_data = serde_json::json!({
514                "id": node_id,
515                "session_id": session_id,
516                "node_type": node_type,
517                "label": label,
518                "properties": properties,
519            });
520
521            self.graph_changelog_append(
522                &session_id,
523                &self.instance_id,
524                "node",
525                node_id,
526                "update",
527                &vc_json,
528                Some(&node_data.to_string()),
529            )?;
530        }
531
532        Ok(())
533    }
534
535    pub fn delete_graph_node(&self, node_id: i64) -> Result<()> {
536        use crate::sync::VectorClock;
537
538        let conn = self.conn();
539
540        // First get the node data before deletion
541        let mut stmt = conn.prepare(
542            "SELECT session_id, node_type, label, properties, vector_clock, sync_enabled
543             FROM graph_nodes WHERE id = ?",
544        )?;
545
546        let result = stmt.query_row(params![node_id], |row| {
547            Ok((
548                row.get::<_, String>(0)?,
549                row.get::<_, String>(1)?,
550                row.get::<_, String>(2)?,
551                row.get::<_, String>(3)?,
552                row.get::<_, Option<String>>(4)?,
553                row.get::<_, bool>(5).unwrap_or(false),
554            ))
555        });
556
557        // If node exists, handle sync tracking
558        if let Ok((session_id, node_type, label, properties, current_vc_json, sync_enabled)) =
559            result
560        {
561            if sync_enabled {
562                // Update the vector clock for the deletion
563                let mut vector_clock = if let Some(vc_json) = current_vc_json {
564                    VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
565                } else {
566                    VectorClock::new()
567                };
568                vector_clock.increment(&self.instance_id);
569                let vc_json = vector_clock.to_json()?;
570
571                // Create a tombstone entry for sync
572                conn.execute(
573                    "INSERT INTO graph_tombstones
574                     (session_id, entity_type, entity_id, deleted_by, vector_clock)
575                     VALUES (?, ?, ?, ?, ?)",
576                    params![session_id, "node", node_id, self.instance_id, vc_json],
577                )?;
578
579                // Append deletion to changelog
580                let node_data = serde_json::json!({
581                    "id": node_id,
582                    "session_id": session_id,
583                    "node_type": node_type,
584                    "label": label,
585                    "properties": properties,
586                });
587
588                self.graph_changelog_append(
589                    &session_id,
590                    &self.instance_id,
591                    "node",
592                    node_id,
593                    "delete",
594                    &vc_json,
595                    Some(&node_data.to_string()),
596                )?;
597            }
598        }
599
600        // Now delete the node
601        conn.execute("DELETE FROM graph_nodes WHERE id = ?", params![node_id])?;
602        Ok(())
603    }
604
605    // ---------- Graph Edge Operations ----------
606
607    pub fn insert_graph_edge(
608        &self,
609        session_id: &str,
610        source_id: i64,
611        target_id: i64,
612        edge_type: EdgeType,
613        predicate: Option<&str>,
614        properties: Option<&JsonValue>,
615        weight: f32,
616    ) -> Result<i64> {
617        use crate::sync::VectorClock;
618
619        // Check if sync is enabled BEFORE locking the connection to avoid deadlock
620        let sync_enabled = self
621            .graph_get_sync_enabled(session_id, "default")
622            .unwrap_or(false);
623
624        // Create initial vector clock for this edge
625        let mut vector_clock = VectorClock::new();
626        vector_clock.increment(&self.instance_id);
627        let vc_json = vector_clock.to_json()?;
628
629        let conn = self.conn();
630
631        // Insert the edge with sync metadata
632        let mut stmt = conn.prepare(
633            "INSERT INTO graph_edges (session_id, source_id, target_id, edge_type, predicate, properties, weight,
634                                     vector_clock, last_modified_by, sync_enabled)
635             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING id",
636        )?;
637        let props_str = properties.map(|p| p.to_string());
638        let id: i64 = stmt.query_row(
639            params![
640                session_id,
641                source_id,
642                target_id,
643                edge_type.as_str(),
644                predicate,
645                props_str,
646                weight,
647                vc_json,
648                self.instance_id,
649                sync_enabled,
650            ],
651            |row| row.get(0),
652        )?;
653
654        // If sync is enabled, append to changelog
655        if sync_enabled {
656            let edge_data = serde_json::json!({
657                "id": id,
658                "session_id": session_id,
659                "source_id": source_id,
660                "target_id": target_id,
661                "edge_type": edge_type.as_str(),
662                "predicate": predicate,
663                "properties": properties,
664                "weight": weight,
665            });
666
667            self.graph_changelog_append(
668                session_id,
669                &self.instance_id,
670                "edge",
671                id,
672                "insert",
673                &vc_json,
674                Some(&edge_data.to_string()),
675            )?;
676        }
677
678        Ok(id)
679    }
680
681    pub fn get_graph_edge(&self, edge_id: i64) -> Result<Option<GraphEdge>> {
682        let conn = self.conn();
683        let mut stmt = conn.prepare(
684            "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
685                    CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
686             FROM graph_edges WHERE id = ?",
687        )?;
688        let mut rows = stmt.query(params![edge_id])?;
689        if let Some(row) = rows.next()? {
690            Ok(Some(Self::row_to_graph_edge(row)?))
691        } else {
692            Ok(None)
693        }
694    }
695
696    pub fn list_graph_edges(
697        &self,
698        session_id: &str,
699        source_id: Option<i64>,
700        target_id: Option<i64>,
701    ) -> Result<Vec<GraphEdge>> {
702        let conn = self.conn();
703
704        let edges = match (source_id, target_id) {
705            (Some(src), Some(tgt)) => {
706                let mut stmt = conn.prepare(
707                    "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
708                            CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
709                     FROM graph_edges WHERE session_id = ? AND source_id = ? AND target_id = ?",
710                )?;
711                let query = stmt.query(params![session_id, src, tgt])?;
712                Self::collect_graph_edges(query)?
713            }
714            (Some(src), None) => {
715                let mut stmt = conn.prepare(
716                    "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
717                            CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
718                     FROM graph_edges WHERE session_id = ? AND source_id = ?",
719                )?;
720                let query = stmt.query(params![session_id, src])?;
721                Self::collect_graph_edges(query)?
722            }
723            (None, Some(tgt)) => {
724                let mut stmt = conn.prepare(
725                    "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
726                            CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
727                     FROM graph_edges WHERE session_id = ? AND target_id = ?",
728                )?;
729                let query = stmt.query(params![session_id, tgt])?;
730                Self::collect_graph_edges(query)?
731            }
732            (None, None) => {
733                let mut stmt = conn.prepare(
734                    "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
735                            CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
736                     FROM graph_edges WHERE session_id = ?",
737                )?;
738                let query = stmt.query(params![session_id])?;
739                Self::collect_graph_edges(query)?
740            }
741        };
742
743        Ok(edges)
744    }
745
746    pub fn count_graph_edges(&self, session_id: &str) -> Result<i64> {
747        let conn = self.conn();
748        let mut stmt = conn.prepare("SELECT COUNT(*) FROM graph_edges WHERE session_id = ?")?;
749        let count: i64 = stmt.query_row(params![session_id], |row| row.get(0))?;
750        Ok(count)
751    }
752
753    pub fn delete_graph_edge(&self, edge_id: i64) -> Result<()> {
754        use crate::sync::VectorClock;
755
756        let conn = self.conn();
757
758        // First get the edge data before deletion
759        let mut stmt = conn.prepare(
760            "SELECT session_id, source_id, target_id, edge_type, predicate, properties, weight,
761                    vector_clock, sync_enabled
762             FROM graph_edges WHERE id = ?",
763        )?;
764
765        let result = stmt.query_row(params![edge_id], |row| {
766            Ok((
767                row.get::<_, String>(0)?,
768                row.get::<_, i64>(1)?,
769                row.get::<_, i64>(2)?,
770                row.get::<_, String>(3)?,
771                row.get::<_, Option<String>>(4)?,
772                row.get::<_, Option<String>>(5)?,
773                row.get::<_, f32>(6)?,
774                row.get::<_, Option<String>>(7)?,
775                row.get::<_, bool>(8).unwrap_or(false),
776            ))
777        });
778
779        // If edge exists, handle sync tracking
780        if let Ok((
781            session_id,
782            source_id,
783            target_id,
784            edge_type,
785            predicate,
786            properties,
787            weight,
788            current_vc_json,
789            sync_enabled,
790        )) = result
791        {
792            if sync_enabled {
793                // Update the vector clock for the deletion
794                let mut vector_clock = if let Some(vc_json) = current_vc_json {
795                    VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
796                } else {
797                    VectorClock::new()
798                };
799                vector_clock.increment(&self.instance_id);
800                let vc_json = vector_clock.to_json()?;
801
802                // Create a tombstone entry for sync
803                conn.execute(
804                    "INSERT INTO graph_tombstones
805                     (session_id, entity_type, entity_id, deleted_by, vector_clock)
806                     VALUES (?, ?, ?, ?, ?)",
807                    params![session_id, "edge", edge_id, self.instance_id, vc_json],
808                )?;
809
810                // Append deletion to changelog
811                let edge_data = serde_json::json!({
812                    "id": edge_id,
813                    "session_id": session_id,
814                    "source_id": source_id,
815                    "target_id": target_id,
816                    "edge_type": edge_type,
817                    "predicate": predicate,
818                    "properties": properties,
819                    "weight": weight,
820                });
821
822                self.graph_changelog_append(
823                    &session_id,
824                    &self.instance_id,
825                    "edge",
826                    edge_id,
827                    "delete",
828                    &vc_json,
829                    Some(&edge_data.to_string()),
830                )?;
831            }
832        }
833
834        // Now delete the edge
835        conn.execute("DELETE FROM graph_edges WHERE id = ?", params![edge_id])?;
836        Ok(())
837    }
838
839    // ---------- Graph Traversal Operations ----------
840
841    pub fn find_shortest_path(
842        &self,
843        session_id: &str,
844        source_id: i64,
845        target_id: i64,
846        max_hops: Option<usize>,
847    ) -> Result<Option<GraphPath>> {
848        // Simple BFS implementation for finding shortest path
849        // In production, this would use DuckPGQ's ANY SHORTEST functionality
850        let max_depth = max_hops.unwrap_or(10);
851
852        let mut visited = std::collections::HashSet::new();
853        let mut queue = std::collections::VecDeque::new();
854        let mut parent_map = std::collections::HashMap::new();
855
856        queue.push_back((source_id, 0));
857        visited.insert(source_id);
858
859        while let Some((current_id, depth)) = queue.pop_front() {
860            if current_id == target_id {
861                // Reconstruct path
862                let path = self.reconstruct_path(&parent_map, source_id, target_id)?;
863                return Ok(Some(path));
864            }
865
866            if depth >= max_depth {
867                continue;
868            }
869
870            // Get outgoing edges
871            let edges = self.list_graph_edges(session_id, Some(current_id), None)?;
872            for edge in edges {
873                let target = edge.target_id;
874                if !visited.contains(&target) {
875                    visited.insert(target);
876                    parent_map.insert(target, (current_id, edge));
877                    queue.push_back((target, depth + 1));
878                }
879            }
880        }
881
882        Ok(None)
883    }
884
885    pub fn traverse_neighbors(
886        &self,
887        session_id: &str,
888        node_id: i64,
889        direction: TraversalDirection,
890        depth: usize,
891    ) -> Result<Vec<GraphNode>> {
892        if depth == 0 {
893            return Ok(vec![]);
894        }
895
896        let mut visited = std::collections::HashSet::new();
897        let mut result = Vec::new();
898        let mut queue = std::collections::VecDeque::new();
899
900        queue.push_back((node_id, 0));
901        visited.insert(node_id);
902
903        while let Some((current_id, current_depth)) = queue.pop_front() {
904            if current_depth > 0 {
905                if let Some(node) = self.get_graph_node(current_id)? {
906                    result.push(node);
907                }
908            }
909
910            if current_depth >= depth {
911                continue;
912            }
913
914            // Get edges based on direction
915            let edges = match direction {
916                TraversalDirection::Outgoing => {
917                    self.list_graph_edges(session_id, Some(current_id), None)?
918                }
919                TraversalDirection::Incoming => {
920                    self.list_graph_edges(session_id, None, Some(current_id))?
921                }
922                TraversalDirection::Both => {
923                    let mut out_edges =
924                        self.list_graph_edges(session_id, Some(current_id), None)?;
925                    let in_edges = self.list_graph_edges(session_id, None, Some(current_id))?;
926                    out_edges.extend(in_edges);
927                    out_edges
928                }
929            };
930
931            for edge in edges {
932                let next_id = match direction {
933                    TraversalDirection::Outgoing => edge.target_id,
934                    TraversalDirection::Incoming => edge.source_id,
935                    TraversalDirection::Both => {
936                        if edge.source_id == current_id {
937                            edge.target_id
938                        } else {
939                            edge.source_id
940                        }
941                    }
942                };
943
944                if !visited.contains(&next_id) {
945                    visited.insert(next_id);
946                    queue.push_back((next_id, current_depth + 1));
947                }
948            }
949        }
950
951        Ok(result)
952    }
953
954    // ---------- Helper Methods ----------
955
956    fn row_to_graph_node(row: &duckdb::Row) -> Result<GraphNode> {
957        let id: i64 = row.get(0)?;
958        let session_id: String = row.get(1)?;
959        let node_type: String = row.get(2)?;
960        let label: String = row.get(3)?;
961        let properties: String = row.get(4)?;
962        let embedding_id: Option<i64> = row.get(5)?;
963        let created_at: String = row.get(6)?;
964        let updated_at: String = row.get(7)?;
965
966        Ok(GraphNode {
967            id,
968            session_id,
969            node_type: NodeType::from_str(&node_type),
970            label,
971            properties: serde_json::from_str(&properties).unwrap_or(JsonValue::Null),
972            embedding_id,
973            created_at: created_at.parse().unwrap_or_else(|_| Utc::now()),
974            updated_at: updated_at.parse().unwrap_or_else(|_| Utc::now()),
975        })
976    }
977
978    fn row_to_graph_edge(row: &duckdb::Row) -> Result<GraphEdge> {
979        let id: i64 = row.get(0)?;
980        let session_id: String = row.get(1)?;
981        let source_id: i64 = row.get(2)?;
982        let target_id: i64 = row.get(3)?;
983        let edge_type: String = row.get(4)?;
984        let predicate: Option<String> = row.get(5)?;
985        let properties: Option<String> = row.get(6)?;
986        let weight: f32 = row.get(7)?;
987        let temporal_start: Option<String> = row.get(8)?;
988        let temporal_end: Option<String> = row.get(9)?;
989        let created_at: String = row.get(10)?;
990
991        Ok(GraphEdge {
992            id,
993            session_id,
994            source_id,
995            target_id,
996            edge_type: EdgeType::from_str(&edge_type),
997            predicate,
998            properties: properties.and_then(|p| serde_json::from_str(&p).ok()),
999            weight,
1000            temporal_start: temporal_start.and_then(|s| s.parse().ok()),
1001            temporal_end: temporal_end.and_then(|s| s.parse().ok()),
1002            created_at: created_at.parse().unwrap_or_else(|_| Utc::now()),
1003        })
1004    }
1005
1006    fn collect_graph_nodes(mut rows: duckdb::Rows) -> Result<Vec<GraphNode>> {
1007        let mut nodes = Vec::new();
1008        while let Some(row) = rows.next()? {
1009            nodes.push(Self::row_to_graph_node(row)?);
1010        }
1011        Ok(nodes)
1012    }
1013
1014    fn collect_graph_edges(mut rows: duckdb::Rows) -> Result<Vec<GraphEdge>> {
1015        let mut edges = Vec::new();
1016        while let Some(row) = rows.next()? {
1017            edges.push(Self::row_to_graph_edge(row)?);
1018        }
1019        Ok(edges)
1020    }
1021
1022    fn reconstruct_path(
1023        &self,
1024        parent_map: &std::collections::HashMap<i64, (i64, GraphEdge)>,
1025        source_id: i64,
1026        target_id: i64,
1027    ) -> Result<GraphPath> {
1028        let mut path_edges = Vec::new();
1029        let mut path_nodes = Vec::new();
1030        let mut current = target_id;
1031        let mut total_weight = 0.0;
1032
1033        // Collect edges in reverse order
1034        while current != source_id {
1035            if let Some((parent, edge)) = parent_map.get(&current) {
1036                path_edges.push(edge.clone());
1037                total_weight += edge.weight;
1038                current = *parent;
1039            } else {
1040                break;
1041            }
1042        }
1043
1044        // Reverse to get correct order
1045        path_edges.reverse();
1046
1047        // Collect nodes
1048        if let Some(node) = self.get_graph_node(source_id)? {
1049            path_nodes.push(node);
1050        }
1051        for edge in &path_edges {
1052            if let Some(node) = self.get_graph_node(edge.target_id)? {
1053                path_nodes.push(node);
1054            }
1055        }
1056
1057        Ok(GraphPath {
1058            length: path_edges.len(),
1059            weight: total_weight,
1060            nodes: path_nodes,
1061            edges: path_edges,
1062        })
1063    }
1064
1065    // ---------- Transcriptions ----------
1066
1067    pub fn insert_transcription(
1068        &self,
1069        session_id: &str,
1070        chunk_id: i64,
1071        text: &str,
1072        timestamp: chrono::DateTime<Utc>,
1073    ) -> Result<i64> {
1074        let conn = self.conn();
1075        let mut stmt = conn.prepare(
1076            "INSERT INTO transcriptions (session_id, chunk_id, text, timestamp, embedding_id) VALUES (?, ?, ?, ?, NULL) RETURNING id",
1077        )?;
1078        let id: i64 = stmt.query_row(
1079            params![session_id, chunk_id, text, timestamp.to_rfc3339()],
1080            |row| row.get(0),
1081        )?;
1082        Ok(id)
1083    }
1084
1085    pub fn update_transcription_embedding(
1086        &self,
1087        transcription_id: i64,
1088        embedding_id: i64,
1089    ) -> Result<()> {
1090        let conn = self.conn();
1091        conn.execute(
1092            "UPDATE transcriptions SET embedding_id = ? WHERE id = ?",
1093            params![embedding_id, transcription_id],
1094        )?;
1095        Ok(())
1096    }
1097
1098    pub fn list_transcriptions(
1099        &self,
1100        session_id: &str,
1101        limit: Option<i64>,
1102    ) -> Result<Vec<(i64, i64, String, DateTime<Utc>)>> {
1103        let conn = self.conn();
1104        let query = if let Some(lim) = limit {
1105            format!(
1106                "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC LIMIT {}",
1107                lim
1108            )
1109        } else {
1110            "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC".to_string()
1111        };
1112
1113        let mut stmt = conn.prepare(&query)?;
1114        let mut rows = stmt.query(params![session_id])?;
1115        let mut out = Vec::new();
1116
1117        while let Some(row) = rows.next()? {
1118            let id: i64 = row.get(0)?;
1119            let chunk_id: i64 = row.get(1)?;
1120            let text: String = row.get(2)?;
1121            let timestamp_str: String = row.get(3)?;
1122            let timestamp: DateTime<Utc> = timestamp_str.parse().unwrap_or_else(|_| Utc::now());
1123            out.push((id, chunk_id, text, timestamp));
1124        }
1125
1126        Ok(out)
1127    }
1128
1129    pub fn get_full_transcription(&self, session_id: &str) -> Result<String> {
1130        let transcriptions = self.list_transcriptions(session_id, None)?;
1131        Ok(transcriptions
1132            .into_iter()
1133            .map(|(_, _, text, _)| text)
1134            .collect::<Vec<_>>()
1135            .join(" "))
1136    }
1137
1138    pub fn delete_transcriptions(&self, session_id: &str) -> Result<()> {
1139        let conn = self.conn();
1140        conn.execute(
1141            "DELETE FROM transcriptions WHERE session_id = ?",
1142            params![session_id],
1143        )?;
1144        Ok(())
1145    }
1146
1147    pub fn get_transcription_by_embedding(&self, embedding_id: i64) -> Result<Option<String>> {
1148        let conn = self.conn();
1149        let mut stmt =
1150            conn.prepare("SELECT text FROM transcriptions WHERE embedding_id = ? LIMIT 1")?;
1151        let result: Result<String, _> = stmt.query_row(params![embedding_id], |row| row.get(0));
1152        match result {
1153            Ok(text) => Ok(Some(text)),
1154            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
1155            Err(e) => Err(e.into()),
1156        }
1157    }
1158
1159    // ---------- Tokenized Files Cache ----------
1160
1161    /// Persist tokenization metadata for a file, replacing any existing entry for the path.
1162    pub fn upsert_tokenized_file(
1163        &self,
1164        session_id: &str,
1165        path: &str,
1166        file_hash: &str,
1167        raw_tokens: usize,
1168        cleaned_tokens: usize,
1169        bytes_captured: usize,
1170        truncated: bool,
1171        embedding_id: Option<i64>,
1172    ) -> Result<i64> {
1173        let conn = self.conn();
1174        conn.execute(
1175            "DELETE FROM tokenized_files WHERE session_id = ? AND path = ?",
1176            params![session_id, path],
1177        )?;
1178        let mut stmt = conn.prepare("INSERT INTO tokenized_files (session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
1179        let id: i64 = stmt.query_row(
1180            params![
1181                session_id,
1182                path,
1183                file_hash,
1184                raw_tokens as i64,
1185                cleaned_tokens as i64,
1186                bytes_captured as i64,
1187                truncated,
1188                embedding_id
1189            ],
1190            |row| row.get(0),
1191        )?;
1192        Ok(id)
1193    }
1194
1195    pub fn get_tokenized_file(
1196        &self,
1197        session_id: &str,
1198        path: &str,
1199    ) -> Result<Option<TokenizedFileRecord>> {
1200        let conn = self.conn();
1201        let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? AND path = ? LIMIT 1")?;
1202        let mut rows = stmt.query(params![session_id, path])?;
1203        if let Some(row) = rows.next()? {
1204            let record = TokenizedFileRecord::from_row(row)?;
1205            Ok(Some(record))
1206        } else {
1207            Ok(None)
1208        }
1209    }
1210
1211    pub fn list_tokenized_files(&self, session_id: &str) -> Result<Vec<TokenizedFileRecord>> {
1212        let conn = self.conn();
1213        let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? ORDER BY path")?;
1214        let mut rows = stmt.query(params![session_id])?;
1215        let mut out = Vec::new();
1216        while let Some(row) = rows.next()? {
1217            out.push(TokenizedFileRecord::from_row(row)?);
1218        }
1219        Ok(out)
1220    }
1221
1222    // ========== Mesh Message Persistence ==========
1223
1224    /// Store a mesh message in the database
1225    pub fn mesh_message_store(
1226        &self,
1227        message_id: &str,
1228        source_instance: &str,
1229        target_instance: Option<&str>,
1230        message_type: &str,
1231        payload: &JsonValue,
1232        status: &str,
1233    ) -> Result<i64> {
1234        let conn = self.conn();
1235        let payload_json = serde_json::to_string(payload)?;
1236        conn.execute(
1237            "INSERT INTO mesh_messages (message_id, source_instance, target_instance, message_type, payload, status) VALUES (?, ?, ?, ?, ?, ?)",
1238            params![message_id, source_instance, target_instance, message_type, payload_json, status],
1239        )?;
1240        // Get the last inserted ID
1241        let id: i64 = conn.query_row("SELECT last_insert_rowid()", params![], |row| row.get(0))?;
1242        Ok(id)
1243    }
1244
1245    /// Check if a message with this ID already exists (for duplicate detection)
1246    pub fn mesh_message_exists(&self, message_id: &str) -> Result<bool> {
1247        let conn = self.conn();
1248        let count: i64 = conn.query_row(
1249            "SELECT COUNT(*) FROM mesh_messages WHERE message_id = ?",
1250            params![message_id],
1251            |row| row.get(0),
1252        )?;
1253        Ok(count > 0)
1254    }
1255
1256    /// Update message status (e.g., delivered, failed)
1257    pub fn mesh_message_update_status(&self, message_id: i64, status: &str) -> Result<()> {
1258        let conn = self.conn();
1259        conn.execute(
1260            "UPDATE mesh_messages SET status = ?, delivered_at = CURRENT_TIMESTAMP WHERE id = ?",
1261            params![status, message_id],
1262        )?;
1263        Ok(())
1264    }
1265
1266    /// Get pending messages for a target instance
1267    pub fn mesh_message_get_pending(
1268        &self,
1269        target_instance: &str,
1270    ) -> Result<Vec<MeshMessageRecord>> {
1271        let conn = self.conn();
1272        let mut stmt = conn.prepare(
1273            "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
1274             FROM mesh_messages
1275             WHERE (target_instance = ? OR target_instance IS NULL) AND status = 'pending'
1276             ORDER BY created_at",
1277        )?;
1278        let mut rows = stmt.query(params![target_instance])?;
1279        let mut out = Vec::new();
1280        while let Some(row) = rows.next()? {
1281            out.push(MeshMessageRecord::from_row(row)?);
1282        }
1283        Ok(out)
1284    }
1285
1286    /// Get message history for analytics
1287    pub fn mesh_message_get_history(
1288        &self,
1289        instance_id: Option<&str>,
1290        limit: usize,
1291    ) -> Result<Vec<MeshMessageRecord>> {
1292        let conn = self.conn();
1293        let query = if instance_id.is_some() {
1294            format!(
1295                "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
1296                 FROM mesh_messages
1297                 WHERE source_instance = ? OR target_instance = ?
1298                 ORDER BY created_at DESC LIMIT {}",
1299                limit
1300            )
1301        } else {
1302            format!(
1303                "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
1304                 FROM mesh_messages
1305                 ORDER BY created_at DESC LIMIT {}",
1306                limit
1307            )
1308        };
1309
1310        let mut stmt = conn.prepare(&query)?;
1311        let mut rows = if let Some(inst) = instance_id {
1312            stmt.query(params![inst, inst])?
1313        } else {
1314            stmt.query(params![])?
1315        };
1316
1317        let mut out = Vec::new();
1318        while let Some(row) = rows.next()? {
1319            out.push(MeshMessageRecord::from_row(row)?);
1320        }
1321        Ok(out)
1322    }
1323
1324    // ===== Graph Synchronization Methods =====
1325
1326    /// Append an entry to the graph changelog
1327    pub fn graph_changelog_append(
1328        &self,
1329        session_id: &str,
1330        instance_id: &str,
1331        entity_type: &str,
1332        entity_id: i64,
1333        operation: &str,
1334        vector_clock: &str,
1335        data: Option<&str>,
1336    ) -> Result<i64> {
1337        let conn = self.conn();
1338        conn.execute(
1339            "INSERT INTO graph_changelog (session_id, instance_id, entity_type, entity_id, operation, vector_clock, data)
1340             VALUES (?, ?, ?, ?, ?, ?, ?)",
1341            params![session_id, instance_id, entity_type, entity_id, operation, vector_clock, data],
1342        )?;
1343        let id: i64 = conn.query_row("SELECT last_insert_rowid()", params![], |row| row.get(0))?;
1344        Ok(id)
1345    }
1346
1347    /// Get changelog entries since a given timestamp for a session
1348    pub fn graph_changelog_get_since(
1349        &self,
1350        session_id: &str,
1351        since_timestamp: &str,
1352    ) -> Result<Vec<ChangelogEntry>> {
1353        let conn = self.conn();
1354        let mut stmt = conn.prepare(
1355            "SELECT id, session_id, instance_id, entity_type, entity_id, operation, vector_clock, data, CAST(created_at AS TEXT)
1356             FROM graph_changelog
1357             WHERE session_id = ? AND created_at > ?
1358             ORDER BY created_at ASC",
1359        )?;
1360        let mut rows = stmt.query(params![session_id, since_timestamp])?;
1361        let mut entries = Vec::new();
1362        while let Some(row) = rows.next()? {
1363            entries.push(ChangelogEntry::from_row(row)?);
1364        }
1365        Ok(entries)
1366    }
1367
1368    /// Prune old changelog entries (keep last N days)
1369    pub fn graph_changelog_prune(&self, days_to_keep: i64) -> Result<usize> {
1370        let conn = self.conn();
1371        let cutoff = chrono::Utc::now() - chrono::Duration::days(days_to_keep);
1372        let cutoff_str = cutoff.to_rfc3339();
1373        let deleted = conn.execute(
1374            "DELETE FROM graph_changelog WHERE created_at < ?",
1375            params![cutoff_str],
1376        )?;
1377        Ok(deleted)
1378    }
1379
1380    /// Get the vector clock for an instance/session/graph combination
1381    pub fn graph_sync_state_get(
1382        &self,
1383        instance_id: &str,
1384        session_id: &str,
1385        graph_name: &str,
1386    ) -> Result<Option<String>> {
1387        let conn = self.conn();
1388        let result: Result<String, _> = conn.query_row(
1389            "SELECT vector_clock FROM graph_sync_state WHERE instance_id = ? AND session_id = ? AND graph_name = ?",
1390            params![instance_id, session_id, graph_name],
1391            |row| row.get(0),
1392        );
1393        match result {
1394            Ok(vc) => Ok(Some(vc)),
1395            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
1396            Err(e) => Err(e.into()),
1397        }
1398    }
1399
1400    /// Update the vector clock for an instance/session/graph combination
1401    pub fn graph_sync_state_update(
1402        &self,
1403        instance_id: &str,
1404        session_id: &str,
1405        graph_name: &str,
1406        vector_clock: &str,
1407    ) -> Result<()> {
1408        let conn = self.conn();
1409        // Upsert pattern
1410        conn.execute("BEGIN TRANSACTION", params![])?;
1411        conn.execute(
1412            "DELETE FROM graph_sync_state WHERE instance_id = ? AND session_id = ? AND graph_name = ?",
1413            params![instance_id, session_id, graph_name],
1414        )?;
1415        conn.execute(
1416            "INSERT INTO graph_sync_state (instance_id, session_id, graph_name, vector_clock) VALUES (?, ?, ?, ?)",
1417            params![instance_id, session_id, graph_name, vector_clock],
1418        )?;
1419        conn.execute("COMMIT", params![])?;
1420        Ok(())
1421    }
1422
1423    /// Enable or disable sync for a graph
1424    pub fn graph_set_sync_enabled(
1425        &self,
1426        session_id: &str,
1427        graph_name: &str,
1428        enabled: bool,
1429    ) -> Result<()> {
1430        let conn = self.conn();
1431        conn.execute(
1432            "UPDATE graph_metadata SET sync_enabled = ? WHERE session_id = ? AND graph_name = ?",
1433            params![enabled, session_id, graph_name],
1434        )?;
1435        Ok(())
1436    }
1437
1438    /// Check if sync is enabled for a graph
1439    pub fn graph_get_sync_enabled(&self, session_id: &str, graph_name: &str) -> Result<bool> {
1440        let conn = self.conn();
1441        let result: Result<bool, _> = conn.query_row(
1442            "SELECT sync_enabled FROM graph_metadata WHERE session_id = ? AND graph_name = ?",
1443            params![session_id, graph_name],
1444            |row| row.get(0),
1445        );
1446        match result {
1447            Ok(enabled) => Ok(enabled),
1448            Err(duckdb::Error::QueryReturnedNoRows) => Ok(false),
1449            Err(e) => Err(e.into()),
1450        }
1451    }
1452
1453    /// List all graphs for a session
1454    pub fn graph_list(&self, session_id: &str) -> Result<Vec<String>> {
1455        let conn = self.conn();
1456        let mut stmt = conn.prepare(
1457            "SELECT DISTINCT graph_name FROM graph_metadata WHERE session_id = ?
1458             UNION
1459             SELECT DISTINCT 'default' as graph_name
1460             FROM graph_nodes WHERE session_id = ?
1461             ORDER BY graph_name",
1462        )?;
1463
1464        let mut graphs = Vec::new();
1465        let mut rows = stmt.query(params![session_id, session_id])?;
1466        while let Some(row) = rows.next()? {
1467            let graph_name: String = row.get(0)?;
1468            graphs.push(graph_name);
1469        }
1470
1471        // Always include "default" if we have any nodes
1472        if graphs.is_empty() {
1473            let node_count: i64 = conn.query_row(
1474                "SELECT COUNT(*) FROM graph_nodes WHERE session_id = ?",
1475                params![session_id],
1476                |row| row.get(0),
1477            )?;
1478            if node_count > 0 {
1479                graphs.push("default".to_string());
1480            }
1481        }
1482
1483        Ok(graphs)
1484    }
1485
1486    /// Get a node with its sync metadata
1487    pub fn graph_get_node_with_sync(&self, node_id: i64) -> Result<Option<SyncedNodeRecord>> {
1488        let conn = self.conn();
1489        let result: Result<SyncedNodeRecord, _> = conn.query_row(
1490            "SELECT id, session_id, node_type, label, properties, embedding_id,
1491                    CAST(created_at AS TEXT), CAST(updated_at AS TEXT),
1492                    COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1493             FROM graph_nodes WHERE id = ?",
1494            params![node_id],
1495            SyncedNodeRecord::from_row,
1496        );
1497        match result {
1498            Ok(node) => Ok(Some(node)),
1499            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
1500            Err(e) => Err(e.into()),
1501        }
1502    }
1503
1504    /// Get all synced nodes for a session with optional filters
1505    pub fn graph_list_nodes_with_sync(
1506        &self,
1507        session_id: &str,
1508        sync_enabled_only: bool,
1509        include_deleted: bool,
1510    ) -> Result<Vec<SyncedNodeRecord>> {
1511        let conn = self.conn();
1512        let mut query = String::from(
1513            "SELECT id, session_id, node_type, label, properties, embedding_id,
1514                    CAST(created_at AS TEXT), CAST(updated_at AS TEXT),
1515                    COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1516             FROM graph_nodes WHERE session_id = ?",
1517        );
1518
1519        if sync_enabled_only {
1520            query.push_str(" AND sync_enabled = TRUE");
1521        }
1522        if !include_deleted {
1523            query.push_str(" AND is_deleted = FALSE");
1524        }
1525        query.push_str(" ORDER BY created_at ASC");
1526
1527        let mut stmt = conn.prepare(&query)?;
1528        let mut rows = stmt.query(params![session_id])?;
1529        let mut nodes = Vec::new();
1530        while let Some(row) = rows.next()? {
1531            nodes.push(SyncedNodeRecord::from_row(row)?);
1532        }
1533        Ok(nodes)
1534    }
1535
1536    /// Get an edge with its sync metadata
1537    pub fn graph_get_edge_with_sync(&self, edge_id: i64) -> Result<Option<SyncedEdgeRecord>> {
1538        let conn = self.conn();
1539        let result: Result<SyncedEdgeRecord, _> = conn.query_row(
1540            "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
1541                    CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT),
1542                    COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1543             FROM graph_edges WHERE id = ?",
1544            params![edge_id],
1545            SyncedEdgeRecord::from_row,
1546        );
1547        match result {
1548            Ok(edge) => Ok(Some(edge)),
1549            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
1550            Err(e) => Err(e.into()),
1551        }
1552    }
1553
1554    /// Get all synced edges for a session with optional filters
1555    pub fn graph_list_edges_with_sync(
1556        &self,
1557        session_id: &str,
1558        sync_enabled_only: bool,
1559        include_deleted: bool,
1560    ) -> Result<Vec<SyncedEdgeRecord>> {
1561        let conn = self.conn();
1562        let mut query = String::from(
1563            "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
1564                    CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT),
1565                    COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1566             FROM graph_edges WHERE session_id = ?",
1567        );
1568
1569        if sync_enabled_only {
1570            query.push_str(" AND sync_enabled = TRUE");
1571        }
1572        if !include_deleted {
1573            query.push_str(" AND is_deleted = FALSE");
1574        }
1575        query.push_str(" ORDER BY created_at ASC");
1576
1577        let mut stmt = conn.prepare(&query)?;
1578        let mut rows = stmt.query(params![session_id])?;
1579        let mut edges = Vec::new();
1580        while let Some(row) = rows.next()? {
1581            edges.push(SyncedEdgeRecord::from_row(row)?);
1582        }
1583        Ok(edges)
1584    }
1585
1586    /// Update a node's sync metadata
1587    pub fn graph_update_node_sync_metadata(
1588        &self,
1589        node_id: i64,
1590        vector_clock: &str,
1591        last_modified_by: &str,
1592        sync_enabled: bool,
1593    ) -> Result<()> {
1594        let conn = self.conn();
1595        conn.execute(
1596            "UPDATE graph_nodes SET vector_clock = ?, last_modified_by = ?, sync_enabled = ?, updated_at = CURRENT_TIMESTAMP
1597             WHERE id = ?",
1598            params![vector_clock, last_modified_by, sync_enabled, node_id],
1599        )?;
1600        Ok(())
1601    }
1602
1603    /// Update an edge's sync metadata
1604    pub fn graph_update_edge_sync_metadata(
1605        &self,
1606        edge_id: i64,
1607        vector_clock: &str,
1608        last_modified_by: &str,
1609        sync_enabled: bool,
1610    ) -> Result<()> {
1611        let conn = self.conn();
1612        conn.execute(
1613            "UPDATE graph_edges SET vector_clock = ?, last_modified_by = ?, sync_enabled = ?
1614             WHERE id = ?",
1615            params![vector_clock, last_modified_by, sync_enabled, edge_id],
1616        )?;
1617        Ok(())
1618    }
1619
1620    /// Mark a node as deleted (tombstone pattern)
1621    pub fn graph_mark_node_deleted(
1622        &self,
1623        node_id: i64,
1624        vector_clock: &str,
1625        deleted_by: &str,
1626    ) -> Result<()> {
1627        let conn = self.conn();
1628        conn.execute(
1629            "UPDATE graph_nodes SET is_deleted = TRUE, vector_clock = ?, last_modified_by = ?, updated_at = CURRENT_TIMESTAMP
1630             WHERE id = ?",
1631            params![vector_clock, deleted_by, node_id],
1632        )?;
1633        Ok(())
1634    }
1635
1636    /// Mark an edge as deleted (tombstone pattern)
1637    pub fn graph_mark_edge_deleted(
1638        &self,
1639        edge_id: i64,
1640        vector_clock: &str,
1641        deleted_by: &str,
1642    ) -> Result<()> {
1643        let conn = self.conn();
1644        conn.execute(
1645            "UPDATE graph_edges SET is_deleted = TRUE, vector_clock = ?, last_modified_by = ?
1646             WHERE id = ?",
1647            params![vector_clock, deleted_by, edge_id],
1648        )?;
1649        Ok(())
1650    }
1651}
1652
1653#[derive(Debug, Clone)]
1654pub struct TokenizedFileRecord {
1655    pub id: i64,
1656    pub session_id: String,
1657    pub path: String,
1658    pub file_hash: String,
1659    pub raw_tokens: usize,
1660    pub cleaned_tokens: usize,
1661    pub bytes_captured: usize,
1662    pub truncated: bool,
1663    pub embedding_id: Option<i64>,
1664    pub updated_at: DateTime<Utc>,
1665}
1666
1667impl TokenizedFileRecord {
1668    fn from_row(row: &duckdb::Row) -> Result<Self> {
1669        let id: i64 = row.get(0)?;
1670        let session_id: String = row.get(1)?;
1671        let path: String = row.get(2)?;
1672        let file_hash: String = row.get(3)?;
1673        let raw_tokens: i64 = row.get(4)?;
1674        let cleaned_tokens: i64 = row.get(5)?;
1675        let bytes_captured: i64 = row.get(6)?;
1676        let truncated: bool = row.get(7)?;
1677        let embedding_id: Option<i64> = row.get(8)?;
1678        let updated_at: String = row.get(9)?;
1679
1680        Ok(Self {
1681            id,
1682            session_id,
1683            path,
1684            file_hash,
1685            raw_tokens: raw_tokens.max(0) as usize,
1686            cleaned_tokens: cleaned_tokens.max(0) as usize,
1687            bytes_captured: bytes_captured.max(0) as usize,
1688            truncated,
1689            embedding_id,
1690            updated_at: updated_at.parse().unwrap_or_else(|_| Utc::now()),
1691        })
1692    }
1693}
1694
1695#[derive(Debug, Clone)]
1696pub struct MeshMessageRecord {
1697    pub id: i64,
1698    pub source_instance: String,
1699    pub target_instance: Option<String>,
1700    pub message_type: String,
1701    pub payload: JsonValue,
1702    pub status: String,
1703    pub created_at: DateTime<Utc>,
1704    pub delivered_at: Option<DateTime<Utc>>,
1705}
1706
1707impl MeshMessageRecord {
1708    fn from_row(row: &duckdb::Row) -> Result<Self> {
1709        let id: i64 = row.get(0)?;
1710        let source_instance: String = row.get(1)?;
1711        let target_instance: Option<String> = row.get(2)?;
1712        let message_type: String = row.get(3)?;
1713        let payload_str: String = row.get(4)?;
1714        let payload: JsonValue = serde_json::from_str(&payload_str)?;
1715        let status: String = row.get(5)?;
1716        let created_at_str: String = row.get(6)?;
1717        let delivered_at_str: Option<String> = row.get(7)?;
1718
1719        Ok(MeshMessageRecord {
1720            id,
1721            source_instance,
1722            target_instance,
1723            message_type,
1724            payload,
1725            status,
1726            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1727            delivered_at: delivered_at_str.and_then(|s| s.parse().ok()),
1728        })
1729    }
1730}
1731
1732// ===== Graph Sync Record Types =====
1733
1734#[derive(Debug, Clone)]
1735pub struct ChangelogEntry {
1736    pub id: i64,
1737    pub session_id: String,
1738    pub instance_id: String,
1739    pub entity_type: String,
1740    pub entity_id: i64,
1741    pub operation: String,
1742    pub vector_clock: String,
1743    pub data: Option<String>,
1744    pub created_at: DateTime<Utc>,
1745}
1746
1747impl ChangelogEntry {
1748    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1749        let id: i64 = row.get(0)?;
1750        let session_id: String = row.get(1)?;
1751        let instance_id: String = row.get(2)?;
1752        let entity_type: String = row.get(3)?;
1753        let entity_id: i64 = row.get(4)?;
1754        let operation: String = row.get(5)?;
1755        let vector_clock: String = row.get(6)?;
1756        let data: Option<String> = row.get(7)?;
1757        let created_at_str: String = row.get(8)?;
1758
1759        Ok(ChangelogEntry {
1760            id,
1761            session_id,
1762            instance_id,
1763            entity_type,
1764            entity_id,
1765            operation,
1766            vector_clock,
1767            data,
1768            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1769        })
1770    }
1771}
1772
1773#[derive(Debug, Clone)]
1774pub struct SyncedNodeRecord {
1775    pub id: i64,
1776    pub session_id: String,
1777    pub node_type: String,
1778    pub label: String,
1779    pub properties: serde_json::Value,
1780    pub embedding_id: Option<i64>,
1781    pub created_at: DateTime<Utc>,
1782    pub updated_at: DateTime<Utc>,
1783    pub vector_clock: String,
1784    pub last_modified_by: Option<String>,
1785    pub is_deleted: bool,
1786    pub sync_enabled: bool,
1787}
1788
1789impl SyncedNodeRecord {
1790    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1791        let id: i64 = row.get(0)?;
1792        let session_id: String = row.get(1)?;
1793        let node_type: String = row.get(2)?;
1794        let label: String = row.get(3)?;
1795        let properties_str: String = row.get(4)?;
1796        let properties: serde_json::Value = serde_json::from_str(&properties_str).map_err(|e| {
1797            duckdb::Error::FromSqlConversionFailure(4, duckdb::types::Type::Text, Box::new(e))
1798        })?;
1799        let embedding_id: Option<i64> = row.get(5)?;
1800        let created_at_str: String = row.get(6)?;
1801        let updated_at_str: String = row.get(7)?;
1802        let vector_clock: String = row.get(8)?;
1803        let last_modified_by: Option<String> = row.get(9)?;
1804        let is_deleted: bool = row.get(10)?;
1805        let sync_enabled: bool = row.get(11)?;
1806
1807        Ok(SyncedNodeRecord {
1808            id,
1809            session_id,
1810            node_type,
1811            label,
1812            properties,
1813            embedding_id,
1814            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1815            updated_at: updated_at_str.parse().unwrap_or_else(|_| Utc::now()),
1816            vector_clock,
1817            last_modified_by,
1818            is_deleted,
1819            sync_enabled,
1820        })
1821    }
1822}
1823
1824#[derive(Debug, Clone)]
1825pub struct SyncedEdgeRecord {
1826    pub id: i64,
1827    pub session_id: String,
1828    pub source_id: i64,
1829    pub target_id: i64,
1830    pub edge_type: String,
1831    pub predicate: Option<String>,
1832    pub properties: Option<serde_json::Value>,
1833    pub weight: f32,
1834    pub temporal_start: Option<DateTime<Utc>>,
1835    pub temporal_end: Option<DateTime<Utc>>,
1836    pub created_at: DateTime<Utc>,
1837    pub vector_clock: String,
1838    pub last_modified_by: Option<String>,
1839    pub is_deleted: bool,
1840    pub sync_enabled: bool,
1841}
1842
1843impl SyncedEdgeRecord {
1844    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1845        let id: i64 = row.get(0)?;
1846        let session_id: String = row.get(1)?;
1847        let source_id: i64 = row.get(2)?;
1848        let target_id: i64 = row.get(3)?;
1849        let edge_type: String = row.get(4)?;
1850        let predicate: Option<String> = row.get(5)?;
1851        let properties_str: Option<String> = row.get(6)?;
1852        let properties: Option<serde_json::Value> = properties_str
1853            .as_ref()
1854            .and_then(|s| serde_json::from_str(s).ok());
1855        let weight: f32 = row.get(7)?;
1856        let temporal_start_str: Option<String> = row.get(8)?;
1857        let temporal_end_str: Option<String> = row.get(9)?;
1858        let created_at_str: String = row.get(10)?;
1859        let vector_clock: String = row.get(11)?;
1860        let last_modified_by: Option<String> = row.get(12)?;
1861        let is_deleted: bool = row.get(13)?;
1862        let sync_enabled: bool = row.get(14)?;
1863
1864        Ok(SyncedEdgeRecord {
1865            id,
1866            session_id,
1867            source_id,
1868            target_id,
1869            edge_type,
1870            predicate,
1871            properties,
1872            weight,
1873            temporal_start: temporal_start_str.and_then(|s| s.parse().ok()),
1874            temporal_end: temporal_end_str.and_then(|s| s.parse().ok()),
1875            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1876            vector_clock,
1877            last_modified_by,
1878            is_deleted,
1879            sync_enabled,
1880        })
1881    }
1882}