1pub mod migrations;
2
3use anyhow::{Context, Result};
4use chrono::{DateTime, Utc};
5use directories::BaseDirs;
6use duckdb::{params, Connection};
7use serde_json::Value as JsonValue;
8use std::path::{Path, PathBuf};
9use std::sync::{Arc, Mutex};
10
11use crate::types::{
12 EdgeType, GraphEdge, GraphNode, GraphPath, MemoryVector, Message, MessageRole, NodeType,
13 PolicyEntry, TraversalDirection,
14};
15
16#[derive(Clone)]
17pub struct Persistence {
18 conn: Arc<Mutex<Connection>>,
19 instance_id: String,
20}
21
22impl Persistence {
23 pub fn new<P: AsRef<Path>>(db_path: P) -> Result<Self> {
25 Self::with_instance_id(db_path, generate_instance_id())
26 }
27
28 pub fn with_instance_id<P: AsRef<Path>>(db_path: P, instance_id: String) -> Result<Self> {
30 let db_path = expand_tilde(db_path.as_ref())?;
31 if let Some(dir) = db_path.parent() {
32 std::fs::create_dir_all(dir).context("creating DB directory")?;
33 }
34 let conn = Connection::open(&db_path).context("opening DuckDB")?;
35 migrations::run(&conn).context("running migrations")?;
36 Ok(Self {
37 conn: Arc::new(Mutex::new(conn)),
38 instance_id,
39 })
40 }
41
42 pub fn instance_id(&self) -> &str {
44 &self.instance_id
45 }
46
47 pub fn checkpoint(&self) -> Result<()> {
50 let conn = self.conn();
51 conn.execute_batch("CHECKPOINT;")
52 .context("checkpointing database")
53 }
54
55 pub fn new_default() -> Result<Self> {
57 let base = BaseDirs::new().context("base directories not available")?;
58 let path = base.home_dir().join(".agent_cli").join("agent_data.duckdb");
59 Self::new(path)
60 }
61
62 pub fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
65 self.conn
66 .lock()
67 .expect("database connection mutex poisoned")
68 }
69
70 pub fn insert_message(
73 &self,
74 session_id: &str,
75 role: MessageRole,
76 content: &str,
77 ) -> Result<i64> {
78 let conn = self.conn();
79 let mut stmt = conn.prepare(
80 "INSERT INTO messages (session_id, role, content) VALUES (?, ?, ?) RETURNING id",
81 )?;
82 let id: i64 = stmt.query_row(params![session_id, role.as_str(), content], |row| {
83 row.get(0)
84 })?;
85 Ok(id)
86 }
87
88 pub fn list_messages(&self, session_id: &str, limit: i64) -> Result<Vec<Message>> {
89 let conn = self.conn();
90 let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?")?;
91 let mut rows = stmt.query(params![session_id, limit])?;
92 let mut out = Vec::new();
93 while let Some(row) = rows.next()? {
94 let id: i64 = row.get(0)?;
95 let sid: String = row.get(1)?;
96 let role: String = row.get(2)?;
97 let content: String = row.get(3)?;
98 let created_at: String = row.get(4)?; let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
100 out.push(Message {
101 id,
102 session_id: sid,
103 role: MessageRole::from_str(&role),
104 content,
105 created_at,
106 });
107 }
108 out.reverse();
109 Ok(out)
110 }
111
112 pub fn get_message(&self, message_id: i64) -> Result<Option<Message>> {
113 let conn = self.conn();
114 let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE id = ?")?;
115 let mut rows = stmt.query(params![message_id])?;
116 if let Some(row) = rows.next()? {
117 let id: i64 = row.get(0)?;
118 let sid: String = row.get(1)?;
119 let role: String = row.get(2)?;
120 let content: String = row.get(3)?;
121 let created_at: String = row.get(4)?;
122 let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
123 Ok(Some(Message {
124 id,
125 session_id: sid,
126 role: MessageRole::from_str(&role),
127 content,
128 created_at,
129 }))
130 } else {
131 Ok(None)
132 }
133 }
134
135 pub fn prune_messages(&self, session_id: &str, keep_latest: i64) -> Result<u64> {
137 let conn = self.conn();
138 let mut stmt = conn.prepare("DELETE FROM messages WHERE session_id = ? AND id NOT IN (SELECT id FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?)")?;
139 let changed = stmt.execute(params![session_id, session_id, keep_latest])? as u64;
140 Ok(changed)
141 }
142
143 pub fn insert_memory_vector(
146 &self,
147 session_id: &str,
148 message_id: Option<i64>,
149 embedding: &[f32],
150 ) -> Result<i64> {
151 let conn = self.conn();
152 let embedding_json = serde_json::to_string(embedding)?;
153 let mut stmt = conn.prepare("INSERT INTO memory_vectors (session_id, message_id, embedding) VALUES (?, ?, ?) RETURNING id")?;
154 let id: i64 = stmt.query_row(params![session_id, message_id, embedding_json], |row| {
155 row.get(0)
156 })?;
157 Ok(id)
158 }
159
160 pub fn recall_top_k(
161 &self,
162 session_id: &str,
163 query_embedding: &[f32],
164 k: usize,
165 ) -> Result<Vec<(MemoryVector, f32)>> {
166 let conn = self.conn();
167 let mut stmt = conn.prepare("SELECT id, session_id, message_id, embedding, CAST(created_at AS TEXT) as created_at FROM memory_vectors WHERE session_id = ?")?;
168 let mut rows = stmt.query(params![session_id])?;
169 let mut scored: Vec<(MemoryVector, f32)> = Vec::new();
170 while let Some(row) = rows.next()? {
171 let id: i64 = row.get(0)?;
172 let sid: String = row.get(1)?;
173 let message_id: Option<i64> = row.get(2)?;
174 let embedding_text: String = row.get(3)?;
175 let created_at: String = row.get(4)?;
176 let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
177 let embedding: Vec<f32> = serde_json::from_str(&embedding_text).unwrap_or_default();
178 let score = cosine_similarity(query_embedding, &embedding);
179 scored.push((
180 MemoryVector {
181 id,
182 session_id: sid,
183 message_id,
184 embedding,
185 created_at,
186 },
187 score,
188 ));
189 }
190 scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
191 scored.truncate(k);
192 Ok(scored)
193 }
194
195 pub fn list_sessions(&self) -> Result<Vec<String>> {
197 let conn = self.conn();
198 let mut stmt = conn.prepare(
199 "SELECT session_id, MAX(created_at) as last FROM messages GROUP BY session_id ORDER BY last DESC"
200 )?;
201 let mut rows = stmt.query([])?;
202 let mut out = Vec::new();
203 while let Some(row) = rows.next()? {
204 let sid: String = row.get(0)?;
205 out.push(sid);
206 }
207 Ok(out)
208 }
209
210 pub fn log_tool(
213 &self,
214 session_id: &str,
215 agent_name: &str,
216 run_id: &str,
217 tool_name: &str,
218 arguments: &JsonValue,
219 result: &JsonValue,
220 success: bool,
221 error: Option<&str>,
222 ) -> Result<i64> {
223 let conn = self.conn();
224 let mut stmt = conn.prepare("INSERT INTO tool_log (session_id, agent, run_id, tool_name, arguments, result, success, error) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
225 let id: i64 = stmt.query_row(
226 params![
227 session_id,
228 agent_name,
229 run_id,
230 tool_name,
231 arguments.to_string(),
232 result.to_string(),
233 success,
234 error.unwrap_or("")
235 ],
236 |row| row.get(0),
237 )?;
238 Ok(id)
239 }
240
241 pub fn policy_upsert(&self, key: &str, value: &JsonValue) -> Result<()> {
244 let conn = self.conn();
245 conn.execute_batch("BEGIN TRANSACTION;")?;
247 {
248 let mut del = conn.prepare("DELETE FROM policy_cache WHERE key = ?")?;
249 let _ = del.execute(params![key])?;
250 let mut ins = conn.prepare("INSERT INTO policy_cache (key, value, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP)")?;
251 let _ = ins.execute(params![key, value.to_string()])?;
252 }
253 conn.execute_batch("COMMIT;")?;
254 Ok(())
255 }
256
257 pub fn policy_get(&self, key: &str) -> Result<Option<PolicyEntry>> {
258 let conn = self.conn();
259 let mut stmt = conn.prepare("SELECT key, value, CAST(updated_at AS TEXT) as updated_at FROM policy_cache WHERE key = ?")?;
260 let mut rows = stmt.query(params![key])?;
261 if let Some(row) = rows.next()? {
262 let key: String = row.get(0)?;
263 let value_text: String = row.get(1)?;
264 let updated_at: String = row.get(2)?;
265 let updated_at: DateTime<Utc> = updated_at.parse().unwrap_or_else(|_| Utc::now());
266 let value: JsonValue = serde_json::from_str(&value_text).unwrap_or(JsonValue::Null);
267 Ok(Some(PolicyEntry {
268 key,
269 value,
270 updated_at,
271 }))
272 } else {
273 Ok(None)
274 }
275 }
276}
277
278fn generate_instance_id() -> String {
279 let hostname = hostname::get()
280 .ok()
281 .and_then(|h| h.into_string().ok())
282 .unwrap_or_else(|| "unknown".to_string());
283 let uuid = uuid::Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext));
284 format!("{}-{}", hostname, uuid)
285}
286
287fn expand_tilde(path: &Path) -> Result<PathBuf> {
288 let path_str = path.to_string_lossy();
289 if path_str == "~" {
290 let base = BaseDirs::new().context("base directories not available")?;
291 Ok(base.home_dir().to_path_buf())
292 } else if let Some(stripped) = path_str.strip_prefix("~/") {
293 let base = BaseDirs::new().context("base directories not available")?;
294 Ok(base.home_dir().join(stripped))
295 } else {
296 Ok(path.to_path_buf())
297 }
298}
299
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    /// A `~/` prefix must resolve against the user's home directory.
    #[test]
    fn expands_home_directory_prefix() {
        let dirs = BaseDirs::new().expect("home directory available");
        let expanded = expand_tilde(Path::new("~/demo.db")).expect("path expansion succeeds");
        assert_eq!(expanded, dirs.home_dir().join("demo.db"));
    }

    /// Paths without a tilde prefix pass through untouched.
    #[test]
    fn leaves_regular_paths_unchanged() {
        let original = Path::new("relative/path.db");
        let expanded = expand_tilde(original).expect("path expansion succeeds");
        assert_eq!(expanded, original);
    }
}
320
/// Cosine similarity of two vectors, accumulated element by element in `f32`.
///
/// Returns `0.0` when either slice is empty, when their lengths differ, or when either
/// vector has zero magnitude.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    let comparable = !a.is_empty() && !b.is_empty() && a.len() == b.len();
    if !comparable {
        return 0.0;
    }
    let (dot, norm_a_sq, norm_b_sq) = a.iter().zip(b.iter()).fold(
        (0.0f32, 0.0f32, 0.0f32),
        |(dot, na, nb), (&x, &y)| (dot + x * y, na + x * x, nb + y * y),
    );
    if norm_a_sq == 0.0 || norm_b_sq == 0.0 {
        return 0.0;
    }
    dot / (norm_a_sq.sqrt() * norm_b_sq.sqrt())
}
338
339impl Persistence {
342 pub fn insert_graph_node(
345 &self,
346 session_id: &str,
347 node_type: NodeType,
348 label: &str,
349 properties: &JsonValue,
350 embedding_id: Option<i64>,
351 ) -> Result<i64> {
352 use crate::sync::VectorClock;
353
354 let sync_enabled = self
356 .graph_get_sync_enabled(session_id, "default")
357 .unwrap_or(false);
358
359 let mut vector_clock = VectorClock::new();
361 vector_clock.increment(&self.instance_id);
362 let vc_json = vector_clock.to_json()?;
363
364 let conn = self.conn();
365
366 let mut stmt = conn.prepare(
368 "INSERT INTO graph_nodes (session_id, node_type, label, properties, embedding_id,
369 vector_clock, last_modified_by, sync_enabled)
370 VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id",
371 )?;
372 let id: i64 = stmt.query_row(
373 params![
374 session_id,
375 node_type.as_str(),
376 label,
377 properties.to_string(),
378 embedding_id,
379 vc_json,
380 self.instance_id,
381 sync_enabled,
382 ],
383 |row| row.get(0),
384 )?;
385
386 if sync_enabled {
388 let node_data = serde_json::json!({
389 "id": id,
390 "session_id": session_id,
391 "node_type": node_type.as_str(),
392 "label": label,
393 "properties": properties,
394 "embedding_id": embedding_id,
395 });
396
397 self.graph_changelog_append(
398 session_id,
399 &self.instance_id,
400 "node",
401 id,
402 "create",
403 &vc_json,
404 Some(&node_data.to_string()),
405 )?;
406 }
407
408 Ok(id)
409 }
410
411 pub fn get_graph_node(&self, node_id: i64) -> Result<Option<GraphNode>> {
412 let conn = self.conn();
413 let mut stmt = conn.prepare(
414 "SELECT id, session_id, node_type, label, properties, embedding_id,
415 CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
416 FROM graph_nodes WHERE id = ?",
417 )?;
418 let mut rows = stmt.query(params![node_id])?;
419 if let Some(row) = rows.next()? {
420 Ok(Some(Self::row_to_graph_node(row)?))
421 } else {
422 Ok(None)
423 }
424 }
425
426 pub fn list_graph_nodes(
427 &self,
428 session_id: &str,
429 node_type: Option<NodeType>,
430 limit: Option<i64>,
431 ) -> Result<Vec<GraphNode>> {
432 let conn = self.conn();
433
434 let nodes = if let Some(nt) = node_type {
435 let mut stmt = conn.prepare(
436 "SELECT id, session_id, node_type, label, properties, embedding_id,
437 CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
438 FROM graph_nodes WHERE session_id = ? AND node_type = ?
439 ORDER BY id DESC LIMIT ?",
440 )?;
441 let query = stmt.query(params![session_id, nt.as_str(), limit.unwrap_or(100)])?;
442 Self::collect_graph_nodes(query)?
443 } else {
444 let mut stmt = conn.prepare(
445 "SELECT id, session_id, node_type, label, properties, embedding_id,
446 CAST(created_at AS TEXT), CAST(updated_at AS TEXT)
447 FROM graph_nodes WHERE session_id = ?
448 ORDER BY id DESC LIMIT ?",
449 )?;
450 let query = stmt.query(params![session_id, limit.unwrap_or(100)])?;
451 Self::collect_graph_nodes(query)?
452 };
453
454 Ok(nodes)
455 }
456
457 pub fn count_graph_nodes(&self, session_id: &str) -> Result<i64> {
458 let conn = self.conn();
459 let mut stmt = conn.prepare("SELECT COUNT(*) FROM graph_nodes WHERE session_id = ?")?;
460 let count: i64 = stmt.query_row(params![session_id], |row| row.get(0))?;
461 Ok(count)
462 }
463
464 pub fn update_graph_node(&self, node_id: i64, properties: &JsonValue) -> Result<()> {
465 use crate::sync::VectorClock;
466
467 let conn = self.conn();
468
469 let mut stmt = conn.prepare(
471 "SELECT session_id, node_type, label, vector_clock, sync_enabled
472 FROM graph_nodes WHERE id = ?",
473 )?;
474
475 let (session_id, node_type, label, current_vc_json, sync_enabled): (
476 String,
477 String,
478 String,
479 Option<String>,
480 bool,
481 ) = stmt.query_row(params![node_id], |row| {
482 Ok((
483 row.get(0)?,
484 row.get(1)?,
485 row.get(2)?,
486 row.get(3)?,
487 row.get(4).unwrap_or(false),
488 ))
489 })?;
490
491 let mut vector_clock = if let Some(vc_json) = current_vc_json {
493 VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
494 } else {
495 VectorClock::new()
496 };
497 vector_clock.increment(&self.instance_id);
498 let vc_json = vector_clock.to_json()?;
499
500 conn.execute(
502 "UPDATE graph_nodes
503 SET properties = ?,
504 vector_clock = ?,
505 last_modified_by = ?,
506 updated_at = CURRENT_TIMESTAMP
507 WHERE id = ?",
508 params![properties.to_string(), vc_json, self.instance_id, node_id],
509 )?;
510
511 if sync_enabled {
513 let node_data = serde_json::json!({
514 "id": node_id,
515 "session_id": session_id,
516 "node_type": node_type,
517 "label": label,
518 "properties": properties,
519 });
520
521 self.graph_changelog_append(
522 &session_id,
523 &self.instance_id,
524 "node",
525 node_id,
526 "update",
527 &vc_json,
528 Some(&node_data.to_string()),
529 )?;
530 }
531
532 Ok(())
533 }
534
535 pub fn delete_graph_node(&self, node_id: i64) -> Result<()> {
536 use crate::sync::VectorClock;
537
538 let conn = self.conn();
539
540 let mut stmt = conn.prepare(
542 "SELECT session_id, node_type, label, properties, vector_clock, sync_enabled
543 FROM graph_nodes WHERE id = ?",
544 )?;
545
546 let result = stmt.query_row(params![node_id], |row| {
547 Ok((
548 row.get::<_, String>(0)?,
549 row.get::<_, String>(1)?,
550 row.get::<_, String>(2)?,
551 row.get::<_, String>(3)?,
552 row.get::<_, Option<String>>(4)?,
553 row.get::<_, bool>(5).unwrap_or(false),
554 ))
555 });
556
557 if let Ok((session_id, node_type, label, properties, current_vc_json, sync_enabled)) =
559 result
560 {
561 if sync_enabled {
562 let mut vector_clock = if let Some(vc_json) = current_vc_json {
564 VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
565 } else {
566 VectorClock::new()
567 };
568 vector_clock.increment(&self.instance_id);
569 let vc_json = vector_clock.to_json()?;
570
571 conn.execute(
573 "INSERT INTO graph_tombstones
574 (session_id, entity_type, entity_id, deleted_by, vector_clock)
575 VALUES (?, ?, ?, ?, ?)",
576 params![session_id, "node", node_id, self.instance_id, vc_json],
577 )?;
578
579 let node_data = serde_json::json!({
581 "id": node_id,
582 "session_id": session_id,
583 "node_type": node_type,
584 "label": label,
585 "properties": properties,
586 });
587
588 self.graph_changelog_append(
589 &session_id,
590 &self.instance_id,
591 "node",
592 node_id,
593 "delete",
594 &vc_json,
595 Some(&node_data.to_string()),
596 )?;
597 }
598 }
599
600 conn.execute("DELETE FROM graph_nodes WHERE id = ?", params![node_id])?;
602 Ok(())
603 }
604
605 pub fn insert_graph_edge(
608 &self,
609 session_id: &str,
610 source_id: i64,
611 target_id: i64,
612 edge_type: EdgeType,
613 predicate: Option<&str>,
614 properties: Option<&JsonValue>,
615 weight: f32,
616 ) -> Result<i64> {
617 use crate::sync::VectorClock;
618
619 let sync_enabled = self
621 .graph_get_sync_enabled(session_id, "default")
622 .unwrap_or(false);
623
624 let mut vector_clock = VectorClock::new();
626 vector_clock.increment(&self.instance_id);
627 let vc_json = vector_clock.to_json()?;
628
629 let conn = self.conn();
630
631 let mut stmt = conn.prepare(
633 "INSERT INTO graph_edges (session_id, source_id, target_id, edge_type, predicate, properties, weight,
634 vector_clock, last_modified_by, sync_enabled)
635 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING id",
636 )?;
637 let props_str = properties.map(|p| p.to_string());
638 let id: i64 = stmt.query_row(
639 params![
640 session_id,
641 source_id,
642 target_id,
643 edge_type.as_str(),
644 predicate,
645 props_str,
646 weight,
647 vc_json,
648 self.instance_id,
649 sync_enabled,
650 ],
651 |row| row.get(0),
652 )?;
653
654 if sync_enabled {
656 let edge_data = serde_json::json!({
657 "id": id,
658 "session_id": session_id,
659 "source_id": source_id,
660 "target_id": target_id,
661 "edge_type": edge_type.as_str(),
662 "predicate": predicate,
663 "properties": properties,
664 "weight": weight,
665 });
666
667 self.graph_changelog_append(
668 session_id,
669 &self.instance_id,
670 "edge",
671 id,
672 "insert",
673 &vc_json,
674 Some(&edge_data.to_string()),
675 )?;
676 }
677
678 Ok(id)
679 }
680
681 pub fn get_graph_edge(&self, edge_id: i64) -> Result<Option<GraphEdge>> {
682 let conn = self.conn();
683 let mut stmt = conn.prepare(
684 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
685 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
686 FROM graph_edges WHERE id = ?",
687 )?;
688 let mut rows = stmt.query(params![edge_id])?;
689 if let Some(row) = rows.next()? {
690 Ok(Some(Self::row_to_graph_edge(row)?))
691 } else {
692 Ok(None)
693 }
694 }
695
696 pub fn list_graph_edges(
697 &self,
698 session_id: &str,
699 source_id: Option<i64>,
700 target_id: Option<i64>,
701 ) -> Result<Vec<GraphEdge>> {
702 let conn = self.conn();
703
704 let edges = match (source_id, target_id) {
705 (Some(src), Some(tgt)) => {
706 let mut stmt = conn.prepare(
707 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
708 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
709 FROM graph_edges WHERE session_id = ? AND source_id = ? AND target_id = ?",
710 )?;
711 let query = stmt.query(params![session_id, src, tgt])?;
712 Self::collect_graph_edges(query)?
713 }
714 (Some(src), None) => {
715 let mut stmt = conn.prepare(
716 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
717 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
718 FROM graph_edges WHERE session_id = ? AND source_id = ?",
719 )?;
720 let query = stmt.query(params![session_id, src])?;
721 Self::collect_graph_edges(query)?
722 }
723 (None, Some(tgt)) => {
724 let mut stmt = conn.prepare(
725 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
726 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
727 FROM graph_edges WHERE session_id = ? AND target_id = ?",
728 )?;
729 let query = stmt.query(params![session_id, tgt])?;
730 Self::collect_graph_edges(query)?
731 }
732 (None, None) => {
733 let mut stmt = conn.prepare(
734 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
735 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT)
736 FROM graph_edges WHERE session_id = ?",
737 )?;
738 let query = stmt.query(params![session_id])?;
739 Self::collect_graph_edges(query)?
740 }
741 };
742
743 Ok(edges)
744 }
745
746 pub fn count_graph_edges(&self, session_id: &str) -> Result<i64> {
747 let conn = self.conn();
748 let mut stmt = conn.prepare("SELECT COUNT(*) FROM graph_edges WHERE session_id = ?")?;
749 let count: i64 = stmt.query_row(params![session_id], |row| row.get(0))?;
750 Ok(count)
751 }
752
753 pub fn delete_graph_edge(&self, edge_id: i64) -> Result<()> {
754 use crate::sync::VectorClock;
755
756 let conn = self.conn();
757
758 let mut stmt = conn.prepare(
760 "SELECT session_id, source_id, target_id, edge_type, predicate, properties, weight,
761 vector_clock, sync_enabled
762 FROM graph_edges WHERE id = ?",
763 )?;
764
765 let result = stmt.query_row(params![edge_id], |row| {
766 Ok((
767 row.get::<_, String>(0)?,
768 row.get::<_, i64>(1)?,
769 row.get::<_, i64>(2)?,
770 row.get::<_, String>(3)?,
771 row.get::<_, Option<String>>(4)?,
772 row.get::<_, Option<String>>(5)?,
773 row.get::<_, f32>(6)?,
774 row.get::<_, Option<String>>(7)?,
775 row.get::<_, bool>(8).unwrap_or(false),
776 ))
777 });
778
779 if let Ok((
781 session_id,
782 source_id,
783 target_id,
784 edge_type,
785 predicate,
786 properties,
787 weight,
788 current_vc_json,
789 sync_enabled,
790 )) = result
791 {
792 if sync_enabled {
793 let mut vector_clock = if let Some(vc_json) = current_vc_json {
795 VectorClock::from_json(&vc_json).unwrap_or_else(|_| VectorClock::new())
796 } else {
797 VectorClock::new()
798 };
799 vector_clock.increment(&self.instance_id);
800 let vc_json = vector_clock.to_json()?;
801
802 conn.execute(
804 "INSERT INTO graph_tombstones
805 (session_id, entity_type, entity_id, deleted_by, vector_clock)
806 VALUES (?, ?, ?, ?, ?)",
807 params![session_id, "edge", edge_id, self.instance_id, vc_json],
808 )?;
809
810 let edge_data = serde_json::json!({
812 "id": edge_id,
813 "session_id": session_id,
814 "source_id": source_id,
815 "target_id": target_id,
816 "edge_type": edge_type,
817 "predicate": predicate,
818 "properties": properties,
819 "weight": weight,
820 });
821
822 self.graph_changelog_append(
823 &session_id,
824 &self.instance_id,
825 "edge",
826 edge_id,
827 "delete",
828 &vc_json,
829 Some(&edge_data.to_string()),
830 )?;
831 }
832 }
833
834 conn.execute("DELETE FROM graph_edges WHERE id = ?", params![edge_id])?;
836 Ok(())
837 }
838
839 pub fn find_shortest_path(
842 &self,
843 session_id: &str,
844 source_id: i64,
845 target_id: i64,
846 max_hops: Option<usize>,
847 ) -> Result<Option<GraphPath>> {
848 let max_depth = max_hops.unwrap_or(10);
851
852 let mut visited = std::collections::HashSet::new();
853 let mut queue = std::collections::VecDeque::new();
854 let mut parent_map = std::collections::HashMap::new();
855
856 queue.push_back((source_id, 0));
857 visited.insert(source_id);
858
859 while let Some((current_id, depth)) = queue.pop_front() {
860 if current_id == target_id {
861 let path = self.reconstruct_path(&parent_map, source_id, target_id)?;
863 return Ok(Some(path));
864 }
865
866 if depth >= max_depth {
867 continue;
868 }
869
870 let edges = self.list_graph_edges(session_id, Some(current_id), None)?;
872 for edge in edges {
873 let target = edge.target_id;
874 if !visited.contains(&target) {
875 visited.insert(target);
876 parent_map.insert(target, (current_id, edge));
877 queue.push_back((target, depth + 1));
878 }
879 }
880 }
881
882 Ok(None)
883 }
884
885 pub fn traverse_neighbors(
886 &self,
887 session_id: &str,
888 node_id: i64,
889 direction: TraversalDirection,
890 depth: usize,
891 ) -> Result<Vec<GraphNode>> {
892 if depth == 0 {
893 return Ok(vec![]);
894 }
895
896 let mut visited = std::collections::HashSet::new();
897 let mut result = Vec::new();
898 let mut queue = std::collections::VecDeque::new();
899
900 queue.push_back((node_id, 0));
901 visited.insert(node_id);
902
903 while let Some((current_id, current_depth)) = queue.pop_front() {
904 if current_depth > 0 {
905 if let Some(node) = self.get_graph_node(current_id)? {
906 result.push(node);
907 }
908 }
909
910 if current_depth >= depth {
911 continue;
912 }
913
914 let edges = match direction {
916 TraversalDirection::Outgoing => {
917 self.list_graph_edges(session_id, Some(current_id), None)?
918 }
919 TraversalDirection::Incoming => {
920 self.list_graph_edges(session_id, None, Some(current_id))?
921 }
922 TraversalDirection::Both => {
923 let mut out_edges =
924 self.list_graph_edges(session_id, Some(current_id), None)?;
925 let in_edges = self.list_graph_edges(session_id, None, Some(current_id))?;
926 out_edges.extend(in_edges);
927 out_edges
928 }
929 };
930
931 for edge in edges {
932 let next_id = match direction {
933 TraversalDirection::Outgoing => edge.target_id,
934 TraversalDirection::Incoming => edge.source_id,
935 TraversalDirection::Both => {
936 if edge.source_id == current_id {
937 edge.target_id
938 } else {
939 edge.source_id
940 }
941 }
942 };
943
944 if !visited.contains(&next_id) {
945 visited.insert(next_id);
946 queue.push_back((next_id, current_depth + 1));
947 }
948 }
949 }
950
951 Ok(result)
952 }
953
954 fn row_to_graph_node(row: &duckdb::Row) -> Result<GraphNode> {
957 let id: i64 = row.get(0)?;
958 let session_id: String = row.get(1)?;
959 let node_type: String = row.get(2)?;
960 let label: String = row.get(3)?;
961 let properties: String = row.get(4)?;
962 let embedding_id: Option<i64> = row.get(5)?;
963 let created_at: String = row.get(6)?;
964 let updated_at: String = row.get(7)?;
965
966 Ok(GraphNode {
967 id,
968 session_id,
969 node_type: NodeType::from_str(&node_type),
970 label,
971 properties: serde_json::from_str(&properties).unwrap_or(JsonValue::Null),
972 embedding_id,
973 created_at: created_at.parse().unwrap_or_else(|_| Utc::now()),
974 updated_at: updated_at.parse().unwrap_or_else(|_| Utc::now()),
975 })
976 }
977
978 fn row_to_graph_edge(row: &duckdb::Row) -> Result<GraphEdge> {
979 let id: i64 = row.get(0)?;
980 let session_id: String = row.get(1)?;
981 let source_id: i64 = row.get(2)?;
982 let target_id: i64 = row.get(3)?;
983 let edge_type: String = row.get(4)?;
984 let predicate: Option<String> = row.get(5)?;
985 let properties: Option<String> = row.get(6)?;
986 let weight: f32 = row.get(7)?;
987 let temporal_start: Option<String> = row.get(8)?;
988 let temporal_end: Option<String> = row.get(9)?;
989 let created_at: String = row.get(10)?;
990
991 Ok(GraphEdge {
992 id,
993 session_id,
994 source_id,
995 target_id,
996 edge_type: EdgeType::from_str(&edge_type),
997 predicate,
998 properties: properties.and_then(|p| serde_json::from_str(&p).ok()),
999 weight,
1000 temporal_start: temporal_start.and_then(|s| s.parse().ok()),
1001 temporal_end: temporal_end.and_then(|s| s.parse().ok()),
1002 created_at: created_at.parse().unwrap_or_else(|_| Utc::now()),
1003 })
1004 }
1005
1006 fn collect_graph_nodes(mut rows: duckdb::Rows) -> Result<Vec<GraphNode>> {
1007 let mut nodes = Vec::new();
1008 while let Some(row) = rows.next()? {
1009 nodes.push(Self::row_to_graph_node(row)?);
1010 }
1011 Ok(nodes)
1012 }
1013
1014 fn collect_graph_edges(mut rows: duckdb::Rows) -> Result<Vec<GraphEdge>> {
1015 let mut edges = Vec::new();
1016 while let Some(row) = rows.next()? {
1017 edges.push(Self::row_to_graph_edge(row)?);
1018 }
1019 Ok(edges)
1020 }
1021
1022 fn reconstruct_path(
1023 &self,
1024 parent_map: &std::collections::HashMap<i64, (i64, GraphEdge)>,
1025 source_id: i64,
1026 target_id: i64,
1027 ) -> Result<GraphPath> {
1028 let mut path_edges = Vec::new();
1029 let mut path_nodes = Vec::new();
1030 let mut current = target_id;
1031 let mut total_weight = 0.0;
1032
1033 while current != source_id {
1035 if let Some((parent, edge)) = parent_map.get(¤t) {
1036 path_edges.push(edge.clone());
1037 total_weight += edge.weight;
1038 current = *parent;
1039 } else {
1040 break;
1041 }
1042 }
1043
1044 path_edges.reverse();
1046
1047 if let Some(node) = self.get_graph_node(source_id)? {
1049 path_nodes.push(node);
1050 }
1051 for edge in &path_edges {
1052 if let Some(node) = self.get_graph_node(edge.target_id)? {
1053 path_nodes.push(node);
1054 }
1055 }
1056
1057 Ok(GraphPath {
1058 length: path_edges.len(),
1059 weight: total_weight,
1060 nodes: path_nodes,
1061 edges: path_edges,
1062 })
1063 }
1064
1065 pub fn insert_transcription(
1068 &self,
1069 session_id: &str,
1070 chunk_id: i64,
1071 text: &str,
1072 timestamp: chrono::DateTime<Utc>,
1073 ) -> Result<i64> {
1074 let conn = self.conn();
1075 let mut stmt = conn.prepare(
1076 "INSERT INTO transcriptions (session_id, chunk_id, text, timestamp, embedding_id) VALUES (?, ?, ?, ?, NULL) RETURNING id",
1077 )?;
1078 let id: i64 = stmt.query_row(
1079 params![session_id, chunk_id, text, timestamp.to_rfc3339()],
1080 |row| row.get(0),
1081 )?;
1082 Ok(id)
1083 }
1084
1085 pub fn update_transcription_embedding(
1086 &self,
1087 transcription_id: i64,
1088 embedding_id: i64,
1089 ) -> Result<()> {
1090 let conn = self.conn();
1091 conn.execute(
1092 "UPDATE transcriptions SET embedding_id = ? WHERE id = ?",
1093 params![embedding_id, transcription_id],
1094 )?;
1095 Ok(())
1096 }
1097
1098 pub fn list_transcriptions(
1099 &self,
1100 session_id: &str,
1101 limit: Option<i64>,
1102 ) -> Result<Vec<(i64, i64, String, DateTime<Utc>)>> {
1103 let conn = self.conn();
1104 let query = if let Some(lim) = limit {
1105 format!(
1106 "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC LIMIT {}",
1107 lim
1108 )
1109 } else {
1110 "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC".to_string()
1111 };
1112
1113 let mut stmt = conn.prepare(&query)?;
1114 let mut rows = stmt.query(params![session_id])?;
1115 let mut out = Vec::new();
1116
1117 while let Some(row) = rows.next()? {
1118 let id: i64 = row.get(0)?;
1119 let chunk_id: i64 = row.get(1)?;
1120 let text: String = row.get(2)?;
1121 let timestamp_str: String = row.get(3)?;
1122 let timestamp: DateTime<Utc> = timestamp_str.parse().unwrap_or_else(|_| Utc::now());
1123 out.push((id, chunk_id, text, timestamp));
1124 }
1125
1126 Ok(out)
1127 }
1128
1129 pub fn get_full_transcription(&self, session_id: &str) -> Result<String> {
1130 let transcriptions = self.list_transcriptions(session_id, None)?;
1131 Ok(transcriptions
1132 .into_iter()
1133 .map(|(_, _, text, _)| text)
1134 .collect::<Vec<_>>()
1135 .join(" "))
1136 }
1137
1138 pub fn delete_transcriptions(&self, session_id: &str) -> Result<()> {
1139 let conn = self.conn();
1140 conn.execute(
1141 "DELETE FROM transcriptions WHERE session_id = ?",
1142 params![session_id],
1143 )?;
1144 Ok(())
1145 }
1146
1147 pub fn get_transcription_by_embedding(&self, embedding_id: i64) -> Result<Option<String>> {
1148 let conn = self.conn();
1149 let mut stmt =
1150 conn.prepare("SELECT text FROM transcriptions WHERE embedding_id = ? LIMIT 1")?;
1151 let result: Result<String, _> = stmt.query_row(params![embedding_id], |row| row.get(0));
1152 match result {
1153 Ok(text) => Ok(Some(text)),
1154 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
1155 Err(e) => Err(e.into()),
1156 }
1157 }
1158
1159 pub fn upsert_tokenized_file(
1163 &self,
1164 session_id: &str,
1165 path: &str,
1166 file_hash: &str,
1167 raw_tokens: usize,
1168 cleaned_tokens: usize,
1169 bytes_captured: usize,
1170 truncated: bool,
1171 embedding_id: Option<i64>,
1172 ) -> Result<i64> {
1173 let conn = self.conn();
1174 conn.execute(
1175 "DELETE FROM tokenized_files WHERE session_id = ? AND path = ?",
1176 params![session_id, path],
1177 )?;
1178 let mut stmt = conn.prepare("INSERT INTO tokenized_files (session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
1179 let id: i64 = stmt.query_row(
1180 params![
1181 session_id,
1182 path,
1183 file_hash,
1184 raw_tokens as i64,
1185 cleaned_tokens as i64,
1186 bytes_captured as i64,
1187 truncated,
1188 embedding_id
1189 ],
1190 |row| row.get(0),
1191 )?;
1192 Ok(id)
1193 }
1194
1195 pub fn get_tokenized_file(
1196 &self,
1197 session_id: &str,
1198 path: &str,
1199 ) -> Result<Option<TokenizedFileRecord>> {
1200 let conn = self.conn();
1201 let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? AND path = ? LIMIT 1")?;
1202 let mut rows = stmt.query(params![session_id, path])?;
1203 if let Some(row) = rows.next()? {
1204 let record = TokenizedFileRecord::from_row(row)?;
1205 Ok(Some(record))
1206 } else {
1207 Ok(None)
1208 }
1209 }
1210
1211 pub fn list_tokenized_files(&self, session_id: &str) -> Result<Vec<TokenizedFileRecord>> {
1212 let conn = self.conn();
1213 let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? ORDER BY path")?;
1214 let mut rows = stmt.query(params![session_id])?;
1215 let mut out = Vec::new();
1216 while let Some(row) = rows.next()? {
1217 out.push(TokenizedFileRecord::from_row(row)?);
1218 }
1219 Ok(out)
1220 }
1221
1222 pub fn mesh_message_store(
1226 &self,
1227 message_id: &str,
1228 source_instance: &str,
1229 target_instance: Option<&str>,
1230 message_type: &str,
1231 payload: &JsonValue,
1232 status: &str,
1233 ) -> Result<i64> {
1234 let conn = self.conn();
1235 let payload_json = serde_json::to_string(payload)?;
1236 conn.execute(
1237 "INSERT INTO mesh_messages (message_id, source_instance, target_instance, message_type, payload, status) VALUES (?, ?, ?, ?, ?, ?)",
1238 params![message_id, source_instance, target_instance, message_type, payload_json, status],
1239 )?;
1240 let id: i64 = conn.query_row("SELECT last_insert_rowid()", params![], |row| row.get(0))?;
1242 Ok(id)
1243 }
1244
1245 pub fn mesh_message_exists(&self, message_id: &str) -> Result<bool> {
1247 let conn = self.conn();
1248 let count: i64 = conn.query_row(
1249 "SELECT COUNT(*) FROM mesh_messages WHERE message_id = ?",
1250 params![message_id],
1251 |row| row.get(0),
1252 )?;
1253 Ok(count > 0)
1254 }
1255
1256 pub fn mesh_message_update_status(&self, message_id: i64, status: &str) -> Result<()> {
1258 let conn = self.conn();
1259 conn.execute(
1260 "UPDATE mesh_messages SET status = ?, delivered_at = CURRENT_TIMESTAMP WHERE id = ?",
1261 params![status, message_id],
1262 )?;
1263 Ok(())
1264 }
1265
1266 pub fn mesh_message_get_pending(
1268 &self,
1269 target_instance: &str,
1270 ) -> Result<Vec<MeshMessageRecord>> {
1271 let conn = self.conn();
1272 let mut stmt = conn.prepare(
1273 "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
1274 FROM mesh_messages
1275 WHERE (target_instance = ? OR target_instance IS NULL) AND status = 'pending'
1276 ORDER BY created_at",
1277 )?;
1278 let mut rows = stmt.query(params![target_instance])?;
1279 let mut out = Vec::new();
1280 while let Some(row) = rows.next()? {
1281 out.push(MeshMessageRecord::from_row(row)?);
1282 }
1283 Ok(out)
1284 }
1285
1286 pub fn mesh_message_get_history(
1288 &self,
1289 instance_id: Option<&str>,
1290 limit: usize,
1291 ) -> Result<Vec<MeshMessageRecord>> {
1292 let conn = self.conn();
1293 let query = if instance_id.is_some() {
1294 format!(
1295 "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
1296 FROM mesh_messages
1297 WHERE source_instance = ? OR target_instance = ?
1298 ORDER BY created_at DESC LIMIT {}",
1299 limit
1300 )
1301 } else {
1302 format!(
1303 "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
1304 FROM mesh_messages
1305 ORDER BY created_at DESC LIMIT {}",
1306 limit
1307 )
1308 };
1309
1310 let mut stmt = conn.prepare(&query)?;
1311 let mut rows = if let Some(inst) = instance_id {
1312 stmt.query(params![inst, inst])?
1313 } else {
1314 stmt.query(params![])?
1315 };
1316
1317 let mut out = Vec::new();
1318 while let Some(row) = rows.next()? {
1319 out.push(MeshMessageRecord::from_row(row)?);
1320 }
1321 Ok(out)
1322 }
1323
1324 pub fn graph_changelog_append(
1328 &self,
1329 session_id: &str,
1330 instance_id: &str,
1331 entity_type: &str,
1332 entity_id: i64,
1333 operation: &str,
1334 vector_clock: &str,
1335 data: Option<&str>,
1336 ) -> Result<i64> {
1337 let conn = self.conn();
1338 conn.execute(
1339 "INSERT INTO graph_changelog (session_id, instance_id, entity_type, entity_id, operation, vector_clock, data)
1340 VALUES (?, ?, ?, ?, ?, ?, ?)",
1341 params![session_id, instance_id, entity_type, entity_id, operation, vector_clock, data],
1342 )?;
1343 let id: i64 = conn.query_row("SELECT last_insert_rowid()", params![], |row| row.get(0))?;
1344 Ok(id)
1345 }
1346
1347 pub fn graph_changelog_get_since(
1349 &self,
1350 session_id: &str,
1351 since_timestamp: &str,
1352 ) -> Result<Vec<ChangelogEntry>> {
1353 let conn = self.conn();
1354 let mut stmt = conn.prepare(
1355 "SELECT id, session_id, instance_id, entity_type, entity_id, operation, vector_clock, data, CAST(created_at AS TEXT)
1356 FROM graph_changelog
1357 WHERE session_id = ? AND created_at > ?
1358 ORDER BY created_at ASC",
1359 )?;
1360 let mut rows = stmt.query(params![session_id, since_timestamp])?;
1361 let mut entries = Vec::new();
1362 while let Some(row) = rows.next()? {
1363 entries.push(ChangelogEntry::from_row(row)?);
1364 }
1365 Ok(entries)
1366 }
1367
1368 pub fn graph_changelog_prune(&self, days_to_keep: i64) -> Result<usize> {
1370 let conn = self.conn();
1371 let cutoff = chrono::Utc::now() - chrono::Duration::days(days_to_keep);
1372 let cutoff_str = cutoff.to_rfc3339();
1373 let deleted = conn.execute(
1374 "DELETE FROM graph_changelog WHERE created_at < ?",
1375 params![cutoff_str],
1376 )?;
1377 Ok(deleted)
1378 }
1379
1380 pub fn graph_sync_state_get(
1382 &self,
1383 instance_id: &str,
1384 session_id: &str,
1385 graph_name: &str,
1386 ) -> Result<Option<String>> {
1387 let conn = self.conn();
1388 let result: Result<String, _> = conn.query_row(
1389 "SELECT vector_clock FROM graph_sync_state WHERE instance_id = ? AND session_id = ? AND graph_name = ?",
1390 params![instance_id, session_id, graph_name],
1391 |row| row.get(0),
1392 );
1393 match result {
1394 Ok(vc) => Ok(Some(vc)),
1395 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
1396 Err(e) => Err(e.into()),
1397 }
1398 }
1399
1400 pub fn graph_sync_state_update(
1402 &self,
1403 instance_id: &str,
1404 session_id: &str,
1405 graph_name: &str,
1406 vector_clock: &str,
1407 ) -> Result<()> {
1408 let conn = self.conn();
1409 conn.execute("BEGIN TRANSACTION", params![])?;
1411 conn.execute(
1412 "DELETE FROM graph_sync_state WHERE instance_id = ? AND session_id = ? AND graph_name = ?",
1413 params![instance_id, session_id, graph_name],
1414 )?;
1415 conn.execute(
1416 "INSERT INTO graph_sync_state (instance_id, session_id, graph_name, vector_clock) VALUES (?, ?, ?, ?)",
1417 params![instance_id, session_id, graph_name, vector_clock],
1418 )?;
1419 conn.execute("COMMIT", params![])?;
1420 Ok(())
1421 }
1422
1423 pub fn graph_set_sync_enabled(
1425 &self,
1426 session_id: &str,
1427 graph_name: &str,
1428 enabled: bool,
1429 ) -> Result<()> {
1430 let conn = self.conn();
1431 conn.execute(
1432 "UPDATE graph_metadata SET sync_enabled = ? WHERE session_id = ? AND graph_name = ?",
1433 params![enabled, session_id, graph_name],
1434 )?;
1435 Ok(())
1436 }
1437
1438 pub fn graph_get_sync_enabled(&self, session_id: &str, graph_name: &str) -> Result<bool> {
1440 let conn = self.conn();
1441 let result: Result<bool, _> = conn.query_row(
1442 "SELECT sync_enabled FROM graph_metadata WHERE session_id = ? AND graph_name = ?",
1443 params![session_id, graph_name],
1444 |row| row.get(0),
1445 );
1446 match result {
1447 Ok(enabled) => Ok(enabled),
1448 Err(duckdb::Error::QueryReturnedNoRows) => Ok(false),
1449 Err(e) => Err(e.into()),
1450 }
1451 }
1452
1453 pub fn graph_list(&self, session_id: &str) -> Result<Vec<String>> {
1455 let conn = self.conn();
1456 let mut stmt = conn.prepare(
1457 "SELECT DISTINCT graph_name FROM graph_metadata WHERE session_id = ?
1458 UNION
1459 SELECT DISTINCT 'default' as graph_name
1460 FROM graph_nodes WHERE session_id = ?
1461 ORDER BY graph_name",
1462 )?;
1463
1464 let mut graphs = Vec::new();
1465 let mut rows = stmt.query(params![session_id, session_id])?;
1466 while let Some(row) = rows.next()? {
1467 let graph_name: String = row.get(0)?;
1468 graphs.push(graph_name);
1469 }
1470
1471 if graphs.is_empty() {
1473 let node_count: i64 = conn.query_row(
1474 "SELECT COUNT(*) FROM graph_nodes WHERE session_id = ?",
1475 params![session_id],
1476 |row| row.get(0),
1477 )?;
1478 if node_count > 0 {
1479 graphs.push("default".to_string());
1480 }
1481 }
1482
1483 Ok(graphs)
1484 }
1485
1486 pub fn graph_get_node_with_sync(&self, node_id: i64) -> Result<Option<SyncedNodeRecord>> {
1488 let conn = self.conn();
1489 let result: Result<SyncedNodeRecord, _> = conn.query_row(
1490 "SELECT id, session_id, node_type, label, properties, embedding_id,
1491 CAST(created_at AS TEXT), CAST(updated_at AS TEXT),
1492 COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1493 FROM graph_nodes WHERE id = ?",
1494 params![node_id],
1495 SyncedNodeRecord::from_row,
1496 );
1497 match result {
1498 Ok(node) => Ok(Some(node)),
1499 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
1500 Err(e) => Err(e.into()),
1501 }
1502 }
1503
1504 pub fn graph_list_nodes_with_sync(
1506 &self,
1507 session_id: &str,
1508 sync_enabled_only: bool,
1509 include_deleted: bool,
1510 ) -> Result<Vec<SyncedNodeRecord>> {
1511 let conn = self.conn();
1512 let mut query = String::from(
1513 "SELECT id, session_id, node_type, label, properties, embedding_id,
1514 CAST(created_at AS TEXT), CAST(updated_at AS TEXT),
1515 COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1516 FROM graph_nodes WHERE session_id = ?",
1517 );
1518
1519 if sync_enabled_only {
1520 query.push_str(" AND sync_enabled = TRUE");
1521 }
1522 if !include_deleted {
1523 query.push_str(" AND is_deleted = FALSE");
1524 }
1525 query.push_str(" ORDER BY created_at ASC");
1526
1527 let mut stmt = conn.prepare(&query)?;
1528 let mut rows = stmt.query(params![session_id])?;
1529 let mut nodes = Vec::new();
1530 while let Some(row) = rows.next()? {
1531 nodes.push(SyncedNodeRecord::from_row(row)?);
1532 }
1533 Ok(nodes)
1534 }
1535
1536 pub fn graph_get_edge_with_sync(&self, edge_id: i64) -> Result<Option<SyncedEdgeRecord>> {
1538 let conn = self.conn();
1539 let result: Result<SyncedEdgeRecord, _> = conn.query_row(
1540 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
1541 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT),
1542 COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1543 FROM graph_edges WHERE id = ?",
1544 params![edge_id],
1545 SyncedEdgeRecord::from_row,
1546 );
1547 match result {
1548 Ok(edge) => Ok(Some(edge)),
1549 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
1550 Err(e) => Err(e.into()),
1551 }
1552 }
1553
1554 pub fn graph_list_edges_with_sync(
1556 &self,
1557 session_id: &str,
1558 sync_enabled_only: bool,
1559 include_deleted: bool,
1560 ) -> Result<Vec<SyncedEdgeRecord>> {
1561 let conn = self.conn();
1562 let mut query = String::from(
1563 "SELECT id, session_id, source_id, target_id, edge_type, predicate, properties, weight,
1564 CAST(temporal_start AS TEXT), CAST(temporal_end AS TEXT), CAST(created_at AS TEXT),
1565 COALESCE(vector_clock, '{}'), last_modified_by, is_deleted, sync_enabled
1566 FROM graph_edges WHERE session_id = ?",
1567 );
1568
1569 if sync_enabled_only {
1570 query.push_str(" AND sync_enabled = TRUE");
1571 }
1572 if !include_deleted {
1573 query.push_str(" AND is_deleted = FALSE");
1574 }
1575 query.push_str(" ORDER BY created_at ASC");
1576
1577 let mut stmt = conn.prepare(&query)?;
1578 let mut rows = stmt.query(params![session_id])?;
1579 let mut edges = Vec::new();
1580 while let Some(row) = rows.next()? {
1581 edges.push(SyncedEdgeRecord::from_row(row)?);
1582 }
1583 Ok(edges)
1584 }
1585
1586 pub fn graph_update_node_sync_metadata(
1588 &self,
1589 node_id: i64,
1590 vector_clock: &str,
1591 last_modified_by: &str,
1592 sync_enabled: bool,
1593 ) -> Result<()> {
1594 let conn = self.conn();
1595 conn.execute(
1596 "UPDATE graph_nodes SET vector_clock = ?, last_modified_by = ?, sync_enabled = ?, updated_at = CURRENT_TIMESTAMP
1597 WHERE id = ?",
1598 params![vector_clock, last_modified_by, sync_enabled, node_id],
1599 )?;
1600 Ok(())
1601 }
1602
1603 pub fn graph_update_edge_sync_metadata(
1605 &self,
1606 edge_id: i64,
1607 vector_clock: &str,
1608 last_modified_by: &str,
1609 sync_enabled: bool,
1610 ) -> Result<()> {
1611 let conn = self.conn();
1612 conn.execute(
1613 "UPDATE graph_edges SET vector_clock = ?, last_modified_by = ?, sync_enabled = ?
1614 WHERE id = ?",
1615 params![vector_clock, last_modified_by, sync_enabled, edge_id],
1616 )?;
1617 Ok(())
1618 }
1619
1620 pub fn graph_mark_node_deleted(
1622 &self,
1623 node_id: i64,
1624 vector_clock: &str,
1625 deleted_by: &str,
1626 ) -> Result<()> {
1627 let conn = self.conn();
1628 conn.execute(
1629 "UPDATE graph_nodes SET is_deleted = TRUE, vector_clock = ?, last_modified_by = ?, updated_at = CURRENT_TIMESTAMP
1630 WHERE id = ?",
1631 params![vector_clock, deleted_by, node_id],
1632 )?;
1633 Ok(())
1634 }
1635
1636 pub fn graph_mark_edge_deleted(
1638 &self,
1639 edge_id: i64,
1640 vector_clock: &str,
1641 deleted_by: &str,
1642 ) -> Result<()> {
1643 let conn = self.conn();
1644 conn.execute(
1645 "UPDATE graph_edges SET is_deleted = TRUE, vector_clock = ?, last_modified_by = ?
1646 WHERE id = ?",
1647 params![vector_clock, deleted_by, edge_id],
1648 )?;
1649 Ok(())
1650 }
1651}
1652
1653#[derive(Debug, Clone)]
1654pub struct TokenizedFileRecord {
1655 pub id: i64,
1656 pub session_id: String,
1657 pub path: String,
1658 pub file_hash: String,
1659 pub raw_tokens: usize,
1660 pub cleaned_tokens: usize,
1661 pub bytes_captured: usize,
1662 pub truncated: bool,
1663 pub embedding_id: Option<i64>,
1664 pub updated_at: DateTime<Utc>,
1665}
1666
1667impl TokenizedFileRecord {
1668 fn from_row(row: &duckdb::Row) -> Result<Self> {
1669 let id: i64 = row.get(0)?;
1670 let session_id: String = row.get(1)?;
1671 let path: String = row.get(2)?;
1672 let file_hash: String = row.get(3)?;
1673 let raw_tokens: i64 = row.get(4)?;
1674 let cleaned_tokens: i64 = row.get(5)?;
1675 let bytes_captured: i64 = row.get(6)?;
1676 let truncated: bool = row.get(7)?;
1677 let embedding_id: Option<i64> = row.get(8)?;
1678 let updated_at: String = row.get(9)?;
1679
1680 Ok(Self {
1681 id,
1682 session_id,
1683 path,
1684 file_hash,
1685 raw_tokens: raw_tokens.max(0) as usize,
1686 cleaned_tokens: cleaned_tokens.max(0) as usize,
1687 bytes_captured: bytes_captured.max(0) as usize,
1688 truncated,
1689 embedding_id,
1690 updated_at: updated_at.parse().unwrap_or_else(|_| Utc::now()),
1691 })
1692 }
1693}
1694
1695#[derive(Debug, Clone)]
1696pub struct MeshMessageRecord {
1697 pub id: i64,
1698 pub source_instance: String,
1699 pub target_instance: Option<String>,
1700 pub message_type: String,
1701 pub payload: JsonValue,
1702 pub status: String,
1703 pub created_at: DateTime<Utc>,
1704 pub delivered_at: Option<DateTime<Utc>>,
1705}
1706
1707impl MeshMessageRecord {
1708 fn from_row(row: &duckdb::Row) -> Result<Self> {
1709 let id: i64 = row.get(0)?;
1710 let source_instance: String = row.get(1)?;
1711 let target_instance: Option<String> = row.get(2)?;
1712 let message_type: String = row.get(3)?;
1713 let payload_str: String = row.get(4)?;
1714 let payload: JsonValue = serde_json::from_str(&payload_str)?;
1715 let status: String = row.get(5)?;
1716 let created_at_str: String = row.get(6)?;
1717 let delivered_at_str: Option<String> = row.get(7)?;
1718
1719 Ok(MeshMessageRecord {
1720 id,
1721 source_instance,
1722 target_instance,
1723 message_type,
1724 payload,
1725 status,
1726 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1727 delivered_at: delivered_at_str.and_then(|s| s.parse().ok()),
1728 })
1729 }
1730}
1731
1732#[derive(Debug, Clone)]
1735pub struct ChangelogEntry {
1736 pub id: i64,
1737 pub session_id: String,
1738 pub instance_id: String,
1739 pub entity_type: String,
1740 pub entity_id: i64,
1741 pub operation: String,
1742 pub vector_clock: String,
1743 pub data: Option<String>,
1744 pub created_at: DateTime<Utc>,
1745}
1746
1747impl ChangelogEntry {
1748 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1749 let id: i64 = row.get(0)?;
1750 let session_id: String = row.get(1)?;
1751 let instance_id: String = row.get(2)?;
1752 let entity_type: String = row.get(3)?;
1753 let entity_id: i64 = row.get(4)?;
1754 let operation: String = row.get(5)?;
1755 let vector_clock: String = row.get(6)?;
1756 let data: Option<String> = row.get(7)?;
1757 let created_at_str: String = row.get(8)?;
1758
1759 Ok(ChangelogEntry {
1760 id,
1761 session_id,
1762 instance_id,
1763 entity_type,
1764 entity_id,
1765 operation,
1766 vector_clock,
1767 data,
1768 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1769 })
1770 }
1771}
1772
1773#[derive(Debug, Clone)]
1774pub struct SyncedNodeRecord {
1775 pub id: i64,
1776 pub session_id: String,
1777 pub node_type: String,
1778 pub label: String,
1779 pub properties: serde_json::Value,
1780 pub embedding_id: Option<i64>,
1781 pub created_at: DateTime<Utc>,
1782 pub updated_at: DateTime<Utc>,
1783 pub vector_clock: String,
1784 pub last_modified_by: Option<String>,
1785 pub is_deleted: bool,
1786 pub sync_enabled: bool,
1787}
1788
1789impl SyncedNodeRecord {
1790 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1791 let id: i64 = row.get(0)?;
1792 let session_id: String = row.get(1)?;
1793 let node_type: String = row.get(2)?;
1794 let label: String = row.get(3)?;
1795 let properties_str: String = row.get(4)?;
1796 let properties: serde_json::Value = serde_json::from_str(&properties_str).map_err(|e| {
1797 duckdb::Error::FromSqlConversionFailure(4, duckdb::types::Type::Text, Box::new(e))
1798 })?;
1799 let embedding_id: Option<i64> = row.get(5)?;
1800 let created_at_str: String = row.get(6)?;
1801 let updated_at_str: String = row.get(7)?;
1802 let vector_clock: String = row.get(8)?;
1803 let last_modified_by: Option<String> = row.get(9)?;
1804 let is_deleted: bool = row.get(10)?;
1805 let sync_enabled: bool = row.get(11)?;
1806
1807 Ok(SyncedNodeRecord {
1808 id,
1809 session_id,
1810 node_type,
1811 label,
1812 properties,
1813 embedding_id,
1814 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1815 updated_at: updated_at_str.parse().unwrap_or_else(|_| Utc::now()),
1816 vector_clock,
1817 last_modified_by,
1818 is_deleted,
1819 sync_enabled,
1820 })
1821 }
1822}
1823
1824#[derive(Debug, Clone)]
1825pub struct SyncedEdgeRecord {
1826 pub id: i64,
1827 pub session_id: String,
1828 pub source_id: i64,
1829 pub target_id: i64,
1830 pub edge_type: String,
1831 pub predicate: Option<String>,
1832 pub properties: Option<serde_json::Value>,
1833 pub weight: f32,
1834 pub temporal_start: Option<DateTime<Utc>>,
1835 pub temporal_end: Option<DateTime<Utc>>,
1836 pub created_at: DateTime<Utc>,
1837 pub vector_clock: String,
1838 pub last_modified_by: Option<String>,
1839 pub is_deleted: bool,
1840 pub sync_enabled: bool,
1841}
1842
1843impl SyncedEdgeRecord {
1844 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1845 let id: i64 = row.get(0)?;
1846 let session_id: String = row.get(1)?;
1847 let source_id: i64 = row.get(2)?;
1848 let target_id: i64 = row.get(3)?;
1849 let edge_type: String = row.get(4)?;
1850 let predicate: Option<String> = row.get(5)?;
1851 let properties_str: Option<String> = row.get(6)?;
1852 let properties: Option<serde_json::Value> = properties_str
1853 .as_ref()
1854 .and_then(|s| serde_json::from_str(s).ok());
1855 let weight: f32 = row.get(7)?;
1856 let temporal_start_str: Option<String> = row.get(8)?;
1857 let temporal_end_str: Option<String> = row.get(9)?;
1858 let created_at_str: String = row.get(10)?;
1859 let vector_clock: String = row.get(11)?;
1860 let last_modified_by: Option<String> = row.get(12)?;
1861 let is_deleted: bool = row.get(13)?;
1862 let sync_enabled: bool = row.get(14)?;
1863
1864 Ok(SyncedEdgeRecord {
1865 id,
1866 session_id,
1867 source_id,
1868 target_id,
1869 edge_type,
1870 predicate,
1871 properties,
1872 weight,
1873 temporal_start: temporal_start_str.and_then(|s| s.parse().ok()),
1874 temporal_end: temporal_end_str.and_then(|s| s.parse().ok()),
1875 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1876 vector_clock,
1877 last_modified_by,
1878 is_deleted,
1879 sync_enabled,
1880 })
1881 }
1882}