1pub mod migrations;
2
3use anyhow::{Context, Result};
4use chrono::{DateTime, Utc};
5use directories::BaseDirs;
6use duckdb::{Connection, params};
7use serde_json::Value as JsonValue;
8use crate::spec_ai_knowledge_graph::KnowledgeGraphStore;
9use std::path::{Path, PathBuf};
10use std::sync::{Arc, Mutex};
11
12use crate::spec_ai_config::types::{
13 GraphEdge, GraphNode, GraphPath, MemoryVector, Message, MessageRole, PolicyEntry,
14};
15
/// Handle to the agent's DuckDB-backed store (messages, memory vectors, tool log,
/// policies, transcriptions, tokenized files, mesh messages and the knowledge graph).
///
/// `Clone` is derived: every clone shares the same `Arc<Mutex<Connection>>`, so all
/// clones serialize their database access through one mutex.
#[derive(Clone)]
pub struct Persistence {
    /// The single DuckDB connection; lock it via `Persistence::conn`.
    conn: Arc<Mutex<Connection>>,
    /// Identifier of this process/instance, fixed at construction.
    instance_id: String,
    /// Knowledge-graph store constructed over a clone of the same connection `Arc`.
    graph_store: KnowledgeGraphStore,
}
22
23impl Persistence {
24 pub fn new<P: AsRef<Path>>(db_path: P) -> Result<Self> {
26 Self::with_instance_id(db_path, generate_instance_id())
27 }
28
29 pub fn with_instance_id<P: AsRef<Path>>(db_path: P, instance_id: String) -> Result<Self> {
31 let db_path = expand_tilde(db_path.as_ref())?;
32 if let Some(dir) = db_path.parent() {
33 std::fs::create_dir_all(dir).context("creating DB directory")?;
34 }
35 let conn = Connection::open(&db_path).context("opening DuckDB")?;
36 migrations::run(&conn).context("running migrations")?;
37 let conn_arc = Arc::new(Mutex::new(conn));
38 let graph_store = KnowledgeGraphStore::new(conn_arc.clone(), instance_id.clone());
39 Ok(Self {
40 conn: conn_arc,
41 instance_id,
42 graph_store,
43 })
44 }
45
46 pub fn instance_id(&self) -> &str {
48 &self.instance_id
49 }
50
51 pub fn graph_store(&self) -> &KnowledgeGraphStore {
53 &self.graph_store
54 }
55
56 pub fn checkpoint(&self) -> Result<()> {
59 let conn = self.conn();
60 conn.execute_batch("CHECKPOINT;")
61 .context("checkpointing database")
62 }
63
64 pub fn new_default() -> Result<Self> {
66 let base = BaseDirs::new().context("base directories not available")?;
67 let path = base.home_dir().join(".agent_cli").join("agent_data.duckdb");
68 Self::new(path)
69 }
70
71 pub fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
74 self.conn
75 .lock()
76 .expect("database connection mutex poisoned")
77 }
78
79 pub fn insert_message(
82 &self,
83 session_id: &str,
84 role: MessageRole,
85 content: &str,
86 ) -> Result<i64> {
87 let conn = self.conn();
88 let mut stmt = conn.prepare(
89 "INSERT INTO messages (session_id, role, content) VALUES (?, ?, ?) RETURNING id",
90 )?;
91 let id: i64 = stmt.query_row(params![session_id, role.as_str(), content], |row| {
92 row.get(0)
93 })?;
94 Ok(id)
95 }
96
97 pub fn list_messages(&self, session_id: &str, limit: i64) -> Result<Vec<Message>> {
98 let conn = self.conn();
99 let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?")?;
100 let mut rows = stmt.query(params![session_id, limit])?;
101 let mut out = Vec::new();
102 while let Some(row) = rows.next()? {
103 let id: i64 = row.get(0)?;
104 let sid: String = row.get(1)?;
105 let role: String = row.get(2)?;
106 let content: String = row.get(3)?;
107 let created_at: String = row.get(4)?; let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
109 out.push(Message {
110 id,
111 session_id: sid,
112 role: MessageRole::from_str(&role),
113 content,
114 created_at,
115 });
116 }
117 out.reverse();
118 Ok(out)
119 }
120
121 pub fn get_message(&self, message_id: i64) -> Result<Option<Message>> {
122 let conn = self.conn();
123 let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE id = ?")?;
124 let mut rows = stmt.query(params![message_id])?;
125 if let Some(row) = rows.next()? {
126 let id: i64 = row.get(0)?;
127 let sid: String = row.get(1)?;
128 let role: String = row.get(2)?;
129 let content: String = row.get(3)?;
130 let created_at: String = row.get(4)?;
131 let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
132 Ok(Some(Message {
133 id,
134 session_id: sid,
135 role: MessageRole::from_str(&role),
136 content,
137 created_at,
138 }))
139 } else {
140 Ok(None)
141 }
142 }
143
144 pub fn prune_messages(&self, session_id: &str, keep_latest: i64) -> Result<u64> {
146 let conn = self.conn();
147 let mut stmt = conn.prepare("DELETE FROM messages WHERE session_id = ? AND id NOT IN (SELECT id FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?)")?;
148 let changed = stmt.execute(params![session_id, session_id, keep_latest])? as u64;
149 Ok(changed)
150 }
151
152 pub fn insert_memory_vector(
155 &self,
156 session_id: &str,
157 message_id: Option<i64>,
158 embedding: &[f32],
159 ) -> Result<i64> {
160 let conn = self.conn();
161 let embedding_json = serde_json::to_string(embedding)?;
162 let mut stmt = conn.prepare("INSERT INTO memory_vectors (session_id, message_id, embedding) VALUES (?, ?, ?) RETURNING id")?;
163 let id: i64 = stmt.query_row(params![session_id, message_id, embedding_json], |row| {
164 row.get(0)
165 })?;
166 Ok(id)
167 }
168
169 pub fn recall_top_k(
170 &self,
171 session_id: &str,
172 query_embedding: &[f32],
173 k: usize,
174 ) -> Result<Vec<(MemoryVector, f32)>> {
175 let conn = self.conn();
176 let mut stmt = conn.prepare("SELECT id, session_id, message_id, embedding, CAST(created_at AS TEXT) as created_at FROM memory_vectors WHERE session_id = ?")?;
177 let mut rows = stmt.query(params![session_id])?;
178 let mut scored: Vec<(MemoryVector, f32)> = Vec::new();
179 while let Some(row) = rows.next()? {
180 let id: i64 = row.get(0)?;
181 let sid: String = row.get(1)?;
182 let message_id: Option<i64> = row.get(2)?;
183 let embedding_text: String = row.get(3)?;
184 let created_at: String = row.get(4)?;
185 let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
186 let embedding: Vec<f32> = serde_json::from_str(&embedding_text).unwrap_or_default();
187 let score = cosine_similarity(query_embedding, &embedding);
188 scored.push((
189 MemoryVector {
190 id,
191 session_id: sid,
192 message_id,
193 embedding,
194 created_at,
195 },
196 score,
197 ));
198 }
199 scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
200 scored.truncate(k);
201 Ok(scored)
202 }
203
204 pub fn list_sessions(&self) -> Result<Vec<String>> {
206 let conn = self.conn();
207 let mut stmt = conn.prepare(
208 "SELECT session_id, MAX(created_at) as last FROM messages GROUP BY session_id ORDER BY last DESC"
209 )?;
210 let mut rows = stmt.query([])?;
211 let mut out = Vec::new();
212 while let Some(row) = rows.next()? {
213 let sid: String = row.get(0)?;
214 out.push(sid);
215 }
216 Ok(out)
217 }
218
219 #[allow(clippy::too_many_arguments)]
222 pub fn log_tool(
223 &self,
224 session_id: &str,
225 agent_name: &str,
226 run_id: &str,
227 tool_name: &str,
228 arguments: &JsonValue,
229 result: &JsonValue,
230 success: bool,
231 error: Option<&str>,
232 ) -> Result<i64> {
233 let conn = self.conn();
234 let mut stmt = conn.prepare("INSERT INTO tool_log (session_id, agent, run_id, tool_name, arguments, result, success, error) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
235 let id: i64 = stmt.query_row(
236 params![
237 session_id,
238 agent_name,
239 run_id,
240 tool_name,
241 arguments.to_string(),
242 result.to_string(),
243 success,
244 error.unwrap_or("")
245 ],
246 |row| row.get(0),
247 )?;
248 Ok(id)
249 }
250
251 pub fn policy_upsert(&self, key: &str, value: &JsonValue) -> Result<()> {
254 let conn = self.conn();
255 conn.execute_batch("BEGIN TRANSACTION;")?;
257 {
258 let mut del = conn.prepare("DELETE FROM policy_cache WHERE key = ?")?;
259 let _ = del.execute(params![key])?;
260 let mut ins = conn.prepare("INSERT INTO policy_cache (key, value, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP)")?;
261 let _ = ins.execute(params![key, value.to_string()])?;
262 }
263 conn.execute_batch("COMMIT;")?;
264 Ok(())
265 }
266
267 pub fn policy_get(&self, key: &str) -> Result<Option<PolicyEntry>> {
268 let conn = self.conn();
269 let mut stmt = conn.prepare("SELECT key, value, CAST(updated_at AS TEXT) as updated_at FROM policy_cache WHERE key = ?")?;
270 let mut rows = stmt.query(params![key])?;
271 if let Some(row) = rows.next()? {
272 let key: String = row.get(0)?;
273 let value_text: String = row.get(1)?;
274 let updated_at: String = row.get(2)?;
275 let updated_at: DateTime<Utc> = updated_at.parse().unwrap_or_else(|_| Utc::now());
276 let value: JsonValue = serde_json::from_str(&value_text).unwrap_or(JsonValue::Null);
277 Ok(Some(PolicyEntry {
278 key,
279 value,
280 updated_at,
281 }))
282 } else {
283 Ok(None)
284 }
285 }
286}
287
288fn generate_instance_id() -> String {
289 let hostname = hostname::get()
290 .ok()
291 .and_then(|h| h.into_string().ok())
292 .unwrap_or_else(|| "unknown".to_string());
293 let uuid = uuid::Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext));
294 format!("{}-{}", hostname, uuid)
295}
296
297fn expand_tilde(path: &Path) -> Result<PathBuf> {
298 let path_str = path.to_string_lossy();
299 if path_str == "~" {
300 let base = BaseDirs::new().context("base directories not available")?;
301 Ok(base.home_dir().to_path_buf())
302 } else if let Some(stripped) = path_str.strip_prefix("~/") {
303 let base = BaseDirs::new().context("base directories not available")?;
304 Ok(base.home_dir().join(stripped))
305 } else {
306 Ok(path.to_path_buf())
307 }
308}
309
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    /// `~/file` resolves under the home directory.
    #[test]
    fn expands_home_directory_prefix() {
        let base = BaseDirs::new().expect("home directory available");
        let expected = base.home_dir().join("demo.db");
        let result = expand_tilde(Path::new("~/demo.db")).expect("path expansion succeeds");
        assert_eq!(result, expected);
    }

    /// A bare `~` is the home directory itself.
    #[test]
    fn expands_bare_tilde_to_home_directory() {
        let base = BaseDirs::new().expect("home directory available");
        let result = expand_tilde(Path::new("~")).expect("path expansion succeeds");
        assert_eq!(result, base.home_dir().to_path_buf());
    }

    /// Relative paths without a leading `~` are untouched.
    #[test]
    fn leaves_regular_paths_unchanged() {
        let input = Path::new("relative/path.db");
        let result = expand_tilde(input).expect("path expansion succeeds");
        assert_eq!(result, input);
    }

    /// Absolute paths are untouched.
    #[test]
    fn leaves_absolute_paths_unchanged() {
        let input = std::env::temp_dir().join("path.db");
        let result = expand_tilde(&input).expect("path expansion succeeds");
        assert_eq!(result, input);
    }

    /// `~user/...` syntax is not expanded (only the current user's `~` is supported).
    #[test]
    fn does_not_expand_other_users_home() {
        let input = Path::new("~someone/data.db");
        let result = expand_tilde(input).expect("path expansion succeeds");
        assert_eq!(result, input);
    }
}
330
/// Cosine similarity of two equal-length vectors.
///
/// Returns `0.0` when either slice is empty, the lengths differ, or either vector has
/// zero magnitude, so callers never divide by zero.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    // Equal lengths plus a non-empty `a` implies a non-empty `b`.
    let comparable = !a.is_empty() && a.len() == b.len();
    if !comparable {
        return 0.0;
    }
    // Single pass accumulating the dot product and both squared norms, in element order.
    let (dot, norm_a_sq, norm_b_sq) = a.iter().zip(b).fold(
        (0.0f32, 0.0f32, 0.0f32),
        |(dot, sum_a, sum_b), (&x, &y)| (dot + x * y, sum_a + x * x, sum_b + y * y),
    );
    if norm_a_sq == 0.0 || norm_b_sq == 0.0 {
        0.0
    } else {
        dot / (norm_a_sq.sqrt() * norm_b_sq.sqrt())
    }
}
348
349fn from_kg_node(node: crate::spec_ai_knowledge_graph::GraphNode) -> GraphNode {
354 GraphNode {
355 id: node.id,
356 session_id: node.session_id,
357 node_type: node.node_type,
358 label: node.label,
359 properties: node.properties,
360 embedding_id: node.embedding_id,
361 created_at: node.created_at,
362 updated_at: node.updated_at,
363 }
364}
365
366fn from_kg_edge(edge: crate::spec_ai_knowledge_graph::GraphEdge) -> GraphEdge {
367 GraphEdge {
368 id: edge.id,
369 session_id: edge.session_id,
370 source_id: edge.source_id,
371 target_id: edge.target_id,
372 edge_type: edge.edge_type,
373 predicate: edge.predicate,
374 properties: edge.properties,
375 weight: edge.weight,
376 temporal_start: edge.temporal_start,
377 temporal_end: edge.temporal_end,
378 created_at: edge.created_at,
379 }
380}
381
382fn from_kg_path(path: crate::spec_ai_knowledge_graph::GraphPath) -> GraphPath {
383 GraphPath {
384 nodes: path.nodes.into_iter().map(from_kg_node).collect(),
385 edges: path.edges.into_iter().map(from_kg_edge).collect(),
386 length: path.length,
387 weight: path.weight,
388 }
389}
390
391impl Persistence {
392 pub fn insert_graph_node(
395 &self,
396 session_id: &str,
397 node_type: crate::spec_ai_knowledge_graph::NodeType,
398 label: &str,
399 properties: &JsonValue,
400 embedding_id: Option<i64>,
401 ) -> Result<i64> {
402 self.graph_store
403 .insert_graph_node(session_id, node_type, label, properties, embedding_id)
404 }
405
406 pub fn get_graph_node(&self, node_id: i64) -> Result<Option<GraphNode>> {
407 self.graph_store
408 .get_graph_node(node_id)
409 .map(|opt| opt.map(from_kg_node))
410 }
411
412 pub fn list_graph_nodes(
413 &self,
414 session_id: &str,
415 node_type: Option<crate::spec_ai_knowledge_graph::NodeType>,
416 limit: Option<i64>,
417 ) -> Result<Vec<GraphNode>> {
418 self.graph_store
419 .list_graph_nodes(session_id, node_type, limit)
420 .map(|nodes| nodes.into_iter().map(from_kg_node).collect())
421 }
422
423 pub fn count_graph_nodes(&self, session_id: &str) -> Result<i64> {
424 self.graph_store.count_graph_nodes(session_id)
425 }
426
427 pub fn update_graph_node(&self, node_id: i64, properties: &JsonValue) -> Result<()> {
428 self.graph_store.update_graph_node(node_id, properties)
429 }
430
431 pub fn delete_graph_node(&self, node_id: i64) -> Result<()> {
432 self.graph_store.delete_graph_node(node_id)
433 }
434
435 #[allow(clippy::too_many_arguments)]
438 pub fn insert_graph_edge(
439 &self,
440 session_id: &str,
441 source_id: i64,
442 target_id: i64,
443 edge_type: crate::spec_ai_knowledge_graph::EdgeType,
444 predicate: Option<&str>,
445 properties: Option<&JsonValue>,
446 weight: f32,
447 ) -> Result<i64> {
448 self.graph_store.insert_graph_edge(
449 session_id, source_id, target_id, edge_type, predicate, properties, weight,
450 )
451 }
452
453 pub fn get_graph_edge(&self, edge_id: i64) -> Result<Option<GraphEdge>> {
454 self.graph_store
455 .get_graph_edge(edge_id)
456 .map(|opt| opt.map(from_kg_edge))
457 }
458
459 pub fn list_graph_edges(
460 &self,
461 session_id: &str,
462 source_id: Option<i64>,
463 target_id: Option<i64>,
464 ) -> Result<Vec<GraphEdge>> {
465 self.graph_store
466 .list_graph_edges(session_id, source_id, target_id)
467 .map(|edges| edges.into_iter().map(from_kg_edge).collect())
468 }
469
470 pub fn count_graph_edges(&self, session_id: &str) -> Result<i64> {
471 self.graph_store.count_graph_edges(session_id)
472 }
473
474 pub fn delete_graph_edge(&self, edge_id: i64) -> Result<()> {
475 self.graph_store.delete_graph_edge(edge_id)
476 }
477
478 pub fn find_shortest_path(
481 &self,
482 session_id: &str,
483 source_id: i64,
484 target_id: i64,
485 max_hops: Option<usize>,
486 ) -> Result<Option<GraphPath>> {
487 self.graph_store
488 .find_shortest_path(session_id, source_id, target_id, max_hops)
489 .map(|opt| opt.map(from_kg_path))
490 }
491
492 pub fn traverse_neighbors(
493 &self,
494 session_id: &str,
495 node_id: i64,
496 direction: crate::spec_ai_knowledge_graph::TraversalDirection,
497 depth: usize,
498 ) -> Result<Vec<GraphNode>> {
499 self.graph_store
500 .traverse_neighbors(session_id, node_id, direction, depth)
501 .map(|nodes| nodes.into_iter().map(from_kg_node).collect())
502 }
503
504 pub fn insert_transcription(
507 &self,
508 session_id: &str,
509 chunk_id: i64,
510 text: &str,
511 timestamp: chrono::DateTime<Utc>,
512 ) -> Result<i64> {
513 let conn = self.conn();
514 let mut stmt = conn.prepare(
515 "INSERT INTO transcriptions (session_id, chunk_id, text, timestamp, embedding_id) VALUES (?, ?, ?, ?, NULL) RETURNING id",
516 )?;
517 let id: i64 = stmt.query_row(
518 params![session_id, chunk_id, text, timestamp.to_rfc3339()],
519 |row| row.get(0),
520 )?;
521 Ok(id)
522 }
523
524 pub fn update_transcription_embedding(
525 &self,
526 transcription_id: i64,
527 embedding_id: i64,
528 ) -> Result<()> {
529 let conn = self.conn();
530 conn.execute(
531 "UPDATE transcriptions SET embedding_id = ? WHERE id = ?",
532 params![embedding_id, transcription_id],
533 )?;
534 Ok(())
535 }
536
537 #[allow(clippy::type_complexity)]
538 pub fn list_transcriptions(
539 &self,
540 session_id: &str,
541 limit: Option<i64>,
542 ) -> Result<Vec<(i64, i64, String, DateTime<Utc>)>> {
543 let conn = self.conn();
544 let query = if let Some(lim) = limit {
545 format!(
546 "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC LIMIT {}",
547 lim
548 )
549 } else {
550 "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC".to_string()
551 };
552
553 let mut stmt = conn.prepare(&query)?;
554 let mut rows = stmt.query(params![session_id])?;
555 let mut out = Vec::new();
556
557 while let Some(row) = rows.next()? {
558 let id: i64 = row.get(0)?;
559 let chunk_id: i64 = row.get(1)?;
560 let text: String = row.get(2)?;
561 let timestamp_str: String = row.get(3)?;
562 let timestamp: DateTime<Utc> = timestamp_str.parse().unwrap_or_else(|_| Utc::now());
563 out.push((id, chunk_id, text, timestamp));
564 }
565
566 Ok(out)
567 }
568
569 pub fn get_full_transcription(&self, session_id: &str) -> Result<String> {
570 let transcriptions = self.list_transcriptions(session_id, None)?;
571 Ok(transcriptions
572 .into_iter()
573 .map(|(_, _, text, _)| text)
574 .collect::<Vec<_>>()
575 .join(" "))
576 }
577
578 pub fn delete_transcriptions(&self, session_id: &str) -> Result<()> {
579 let conn = self.conn();
580 conn.execute(
581 "DELETE FROM transcriptions WHERE session_id = ?",
582 params![session_id],
583 )?;
584 Ok(())
585 }
586
587 pub fn get_transcription_by_embedding(&self, embedding_id: i64) -> Result<Option<String>> {
588 let conn = self.conn();
589 let mut stmt =
590 conn.prepare("SELECT text FROM transcriptions WHERE embedding_id = ? LIMIT 1")?;
591 let result: Result<String, _> = stmt.query_row(params![embedding_id], |row| row.get(0));
592 match result {
593 Ok(text) => Ok(Some(text)),
594 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
595 Err(e) => Err(e.into()),
596 }
597 }
598
599 #[allow(clippy::too_many_arguments)]
603 pub fn upsert_tokenized_file(
604 &self,
605 session_id: &str,
606 path: &str,
607 file_hash: &str,
608 raw_tokens: usize,
609 cleaned_tokens: usize,
610 bytes_captured: usize,
611 truncated: bool,
612 embedding_id: Option<i64>,
613 ) -> Result<i64> {
614 let conn = self.conn();
615 conn.execute(
616 "DELETE FROM tokenized_files WHERE session_id = ? AND path = ?",
617 params![session_id, path],
618 )?;
619 let mut stmt = conn.prepare("INSERT INTO tokenized_files (session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
620 let id: i64 = stmt.query_row(
621 params![
622 session_id,
623 path,
624 file_hash,
625 raw_tokens as i64,
626 cleaned_tokens as i64,
627 bytes_captured as i64,
628 truncated,
629 embedding_id
630 ],
631 |row| row.get(0),
632 )?;
633 Ok(id)
634 }
635
636 pub fn get_tokenized_file(
637 &self,
638 session_id: &str,
639 path: &str,
640 ) -> Result<Option<TokenizedFileRecord>> {
641 let conn = self.conn();
642 let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? AND path = ? LIMIT 1")?;
643 let mut rows = stmt.query(params![session_id, path])?;
644 if let Some(row) = rows.next()? {
645 let record = TokenizedFileRecord::from_row(row)?;
646 Ok(Some(record))
647 } else {
648 Ok(None)
649 }
650 }
651
652 pub fn list_tokenized_files(&self, session_id: &str) -> Result<Vec<TokenizedFileRecord>> {
653 let conn = self.conn();
654 let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? ORDER BY path")?;
655 let mut rows = stmt.query(params![session_id])?;
656 let mut out = Vec::new();
657 while let Some(row) = rows.next()? {
658 out.push(TokenizedFileRecord::from_row(row)?);
659 }
660 Ok(out)
661 }
662
663 pub fn mesh_message_store(
667 &self,
668 message_id: &str,
669 source_instance: &str,
670 target_instance: Option<&str>,
671 message_type: &str,
672 payload: &JsonValue,
673 status: &str,
674 ) -> Result<i64> {
675 let conn = self.conn();
676 let payload_json = serde_json::to_string(payload)?;
677 conn.execute(
678 "INSERT INTO mesh_messages (message_id, source_instance, target_instance, message_type, payload, status) VALUES (?, ?, ?, ?, ?, ?)",
679 params![message_id, source_instance, target_instance, message_type, payload_json, status],
680 )?;
681 let id: i64 = conn.query_row("SELECT last_insert_rowid()", params![], |row| row.get(0))?;
683 Ok(id)
684 }
685
686 pub fn mesh_message_exists(&self, message_id: &str) -> Result<bool> {
688 let conn = self.conn();
689 let count: i64 = conn.query_row(
690 "SELECT COUNT(*) FROM mesh_messages WHERE message_id = ?",
691 params![message_id],
692 |row| row.get(0),
693 )?;
694 Ok(count > 0)
695 }
696
697 pub fn mesh_message_update_status(&self, message_id: i64, status: &str) -> Result<()> {
699 let conn = self.conn();
700 conn.execute(
701 "UPDATE mesh_messages SET status = ?, delivered_at = CURRENT_TIMESTAMP WHERE id = ?",
702 params![status, message_id],
703 )?;
704 Ok(())
705 }
706
707 pub fn mesh_message_get_pending(
709 &self,
710 target_instance: &str,
711 ) -> Result<Vec<MeshMessageRecord>> {
712 let conn = self.conn();
713 let mut stmt = conn.prepare(
714 "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
715 FROM mesh_messages
716 WHERE (target_instance = ? OR target_instance IS NULL) AND status = 'pending'
717 ORDER BY created_at",
718 )?;
719 let mut rows = stmt.query(params![target_instance])?;
720 let mut out = Vec::new();
721 while let Some(row) = rows.next()? {
722 out.push(MeshMessageRecord::from_row(row)?);
723 }
724 Ok(out)
725 }
726
727 pub fn mesh_message_get_history(
729 &self,
730 instance_id: Option<&str>,
731 limit: usize,
732 ) -> Result<Vec<MeshMessageRecord>> {
733 let conn = self.conn();
734 let query = if instance_id.is_some() {
735 format!(
736 "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
737 FROM mesh_messages
738 WHERE source_instance = ? OR target_instance = ?
739 ORDER BY created_at DESC LIMIT {}",
740 limit
741 )
742 } else {
743 format!(
744 "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
745 FROM mesh_messages
746 ORDER BY created_at DESC LIMIT {}",
747 limit
748 )
749 };
750
751 let mut stmt = conn.prepare(&query)?;
752 let mut rows = if let Some(inst) = instance_id {
753 stmt.query(params![inst, inst])?
754 } else {
755 stmt.query(params![])?
756 };
757
758 let mut out = Vec::new();
759 while let Some(row) = rows.next()? {
760 out.push(MeshMessageRecord::from_row(row)?);
761 }
762 Ok(out)
763 }
764
765 #[allow(clippy::too_many_arguments)]
769 pub fn graph_changelog_append(
770 &self,
771 session_id: &str,
772 instance_id: &str,
773 entity_type: &str,
774 entity_id: i64,
775 operation: &str,
776 vector_clock: &str,
777 data: Option<&str>,
778 ) -> Result<i64> {
779 self.graph_store.graph_changelog_append(
780 session_id,
781 instance_id,
782 entity_type,
783 entity_id,
784 operation,
785 vector_clock,
786 data,
787 )
788 }
789
790 pub fn graph_changelog_get_since(
792 &self,
793 session_id: &str,
794 since_timestamp: &str,
795 ) -> Result<Vec<ChangelogEntry>> {
796 self.graph_store
797 .graph_changelog_get_since(session_id, since_timestamp)
798 .map(|entries| {
799 entries
800 .into_iter()
801 .map(|e| ChangelogEntry {
802 id: e.id,
803 session_id: e.session_id,
804 instance_id: e.instance_id,
805 entity_type: e.entity_type,
806 entity_id: e.entity_id,
807 operation: e.operation,
808 vector_clock: e.vector_clock,
809 data: e.data,
810 created_at: e.created_at,
811 })
812 .collect()
813 })
814 }
815
816 pub fn graph_list_conflicts(&self, session_id: Option<&str>) -> Result<Vec<ChangelogEntry>> {
818 self.graph_store
819 .graph_list_conflicts(session_id, 100)
820 .map(|entries| {
821 entries
822 .into_iter()
823 .map(|e| ChangelogEntry {
824 id: e.id,
825 session_id: e.session_id,
826 instance_id: e.instance_id,
827 entity_type: e.entity_type,
828 entity_id: e.entity_id,
829 operation: e.operation,
830 vector_clock: e.vector_clock,
831 data: e.data,
832 created_at: e.created_at,
833 })
834 .collect()
835 })
836 }
837
838 pub fn graph_changelog_prune(&self, days_to_keep: i64) -> Result<usize> {
840 self.graph_store.graph_changelog_prune(days_to_keep)
841 }
842
843 pub fn graph_sync_state_get(
845 &self,
846 instance_id: &str,
847 session_id: &str,
848 graph_name: &str,
849 ) -> Result<Option<String>> {
850 self.graph_store
851 .graph_sync_state_get(instance_id, session_id, graph_name)
852 }
853
854 pub fn graph_sync_state_get_metadata(
856 &self,
857 instance_id: &str,
858 session_id: &str,
859 graph_name: &str,
860 ) -> Result<Option<SyncStateRecord>> {
861 self.graph_store
862 .graph_sync_state_get_metadata(instance_id, session_id, graph_name)
863 .map(|opt| {
864 opt.map(|r| SyncStateRecord {
865 vector_clock: r.vector_clock,
866 last_sync_at: r.last_sync_at,
867 })
868 })
869 }
870
871 pub fn graph_sync_state_update(
873 &self,
874 instance_id: &str,
875 session_id: &str,
876 graph_name: &str,
877 vector_clock: &str,
878 ) -> Result<()> {
879 self.graph_store
880 .graph_sync_state_update(instance_id, session_id, graph_name, vector_clock)
881 }
882
883 pub fn graph_set_sync_config(
885 &self,
886 session_id: &str,
887 graph_name: &str,
888 sync_enabled: bool,
889 conflict_resolution_strategy: Option<&str>,
890 sync_interval_seconds: Option<u64>,
891 ) -> Result<GraphSyncConfig> {
892 self.graph_store
893 .graph_set_sync_config(
894 session_id,
895 graph_name,
896 sync_enabled,
897 conflict_resolution_strategy,
898 sync_interval_seconds,
899 )
900 .map(|cfg| GraphSyncConfig {
901 sync_enabled: cfg.sync_enabled,
902 conflict_resolution_strategy: cfg.conflict_resolution_strategy,
903 sync_interval_seconds: cfg.sync_interval_seconds,
904 })
905 }
906
907 pub fn graph_get_sync_config(
909 &self,
910 session_id: &str,
911 graph_name: &str,
912 ) -> Result<GraphSyncConfig> {
913 self.graph_store
914 .graph_get_sync_config(session_id, graph_name)
915 .map(|cfg| GraphSyncConfig {
916 sync_enabled: cfg.sync_enabled,
917 conflict_resolution_strategy: cfg.conflict_resolution_strategy,
918 sync_interval_seconds: cfg.sync_interval_seconds,
919 })
920 }
921
922 pub fn graph_set_sync_enabled(
924 &self,
925 session_id: &str,
926 graph_name: &str,
927 enabled: bool,
928 ) -> Result<()> {
929 self.graph_store
930 .graph_set_sync_enabled(session_id, graph_name, enabled)
931 }
932
933 pub fn graph_get_sync_enabled(&self, session_id: &str, graph_name: &str) -> Result<bool> {
935 self.graph_store
936 .graph_get_sync_enabled(session_id, graph_name)
937 }
938
939 pub fn graph_list(&self, session_id: &str) -> Result<Vec<String>> {
941 self.graph_store.graph_list(session_id)
942 }
943
944 pub fn graph_list_sync_enabled(&self) -> Result<Vec<(String, String)>> {
946 self.graph_store.graph_list_sync_enabled()
947 }
948
949 pub fn graph_get_node_with_sync(&self, node_id: i64) -> Result<Option<SyncedNodeRecord>> {
951 self.graph_store
952 .graph_get_node_with_sync(node_id)
953 .map(|opt| {
954 opt.map(|r| SyncedNodeRecord {
955 id: r.id,
956 session_id: r.session_id,
957 node_type: r.node_type,
958 label: r.label,
959 properties: r.properties,
960 embedding_id: r.embedding_id,
961 created_at: r.created_at,
962 updated_at: r.updated_at,
963 vector_clock: r.vector_clock,
964 last_modified_by: r.last_modified_by,
965 is_deleted: r.is_deleted,
966 sync_enabled: r.sync_enabled,
967 })
968 })
969 }
970
971 pub fn graph_list_nodes_with_sync(
973 &self,
974 session_id: &str,
975 sync_enabled_only: bool,
976 include_deleted: bool,
977 ) -> Result<Vec<SyncedNodeRecord>> {
978 self.graph_store
979 .graph_list_nodes_with_sync(session_id, sync_enabled_only, include_deleted)
980 .map(|nodes| {
981 nodes
982 .into_iter()
983 .map(|r| SyncedNodeRecord {
984 id: r.id,
985 session_id: r.session_id,
986 node_type: r.node_type,
987 label: r.label,
988 properties: r.properties,
989 embedding_id: r.embedding_id,
990 created_at: r.created_at,
991 updated_at: r.updated_at,
992 vector_clock: r.vector_clock,
993 last_modified_by: r.last_modified_by,
994 is_deleted: r.is_deleted,
995 sync_enabled: r.sync_enabled,
996 })
997 .collect()
998 })
999 }
1000
1001 pub fn graph_get_edge_with_sync(&self, edge_id: i64) -> Result<Option<SyncedEdgeRecord>> {
1003 self.graph_store
1004 .graph_get_edge_with_sync(edge_id)
1005 .map(|opt| {
1006 opt.map(|r| SyncedEdgeRecord {
1007 id: r.id,
1008 session_id: r.session_id,
1009 source_id: r.source_id,
1010 target_id: r.target_id,
1011 edge_type: r.edge_type,
1012 predicate: r.predicate,
1013 properties: r.properties,
1014 weight: r.weight,
1015 temporal_start: r.temporal_start,
1016 temporal_end: r.temporal_end,
1017 created_at: r.created_at,
1018 vector_clock: r.vector_clock,
1019 last_modified_by: r.last_modified_by,
1020 is_deleted: r.is_deleted,
1021 sync_enabled: r.sync_enabled,
1022 })
1023 })
1024 }
1025
1026 pub fn graph_list_edges_with_sync(
1028 &self,
1029 session_id: &str,
1030 sync_enabled_only: bool,
1031 include_deleted: bool,
1032 ) -> Result<Vec<SyncedEdgeRecord>> {
1033 self.graph_store
1034 .graph_list_edges_with_sync(session_id, sync_enabled_only, include_deleted)
1035 .map(|edges| {
1036 edges
1037 .into_iter()
1038 .map(|r| SyncedEdgeRecord {
1039 id: r.id,
1040 session_id: r.session_id,
1041 source_id: r.source_id,
1042 target_id: r.target_id,
1043 edge_type: r.edge_type,
1044 predicate: r.predicate,
1045 properties: r.properties,
1046 weight: r.weight,
1047 temporal_start: r.temporal_start,
1048 temporal_end: r.temporal_end,
1049 created_at: r.created_at,
1050 vector_clock: r.vector_clock,
1051 last_modified_by: r.last_modified_by,
1052 is_deleted: r.is_deleted,
1053 sync_enabled: r.sync_enabled,
1054 })
1055 .collect()
1056 })
1057 }
1058
1059 pub fn graph_update_node_sync_metadata(
1061 &self,
1062 node_id: i64,
1063 vector_clock: &str,
1064 last_modified_by: &str,
1065 sync_enabled: bool,
1066 ) -> Result<()> {
1067 self.graph_store.graph_update_node_sync_metadata(
1068 node_id,
1069 vector_clock,
1070 last_modified_by,
1071 sync_enabled,
1072 )
1073 }
1074
1075 pub fn graph_update_edge_sync_metadata(
1077 &self,
1078 edge_id: i64,
1079 vector_clock: &str,
1080 last_modified_by: &str,
1081 sync_enabled: bool,
1082 ) -> Result<()> {
1083 self.graph_store.graph_update_edge_sync_metadata(
1084 edge_id,
1085 vector_clock,
1086 last_modified_by,
1087 sync_enabled,
1088 )
1089 }
1090
1091 pub fn graph_mark_node_deleted(
1093 &self,
1094 node_id: i64,
1095 vector_clock: &str,
1096 deleted_by: &str,
1097 ) -> Result<()> {
1098 self.graph_store
1099 .graph_mark_node_deleted(node_id, vector_clock, deleted_by)
1100 }
1101
1102 pub fn graph_mark_edge_deleted(
1104 &self,
1105 edge_id: i64,
1106 vector_clock: &str,
1107 deleted_by: &str,
1108 ) -> Result<()> {
1109 self.graph_store
1110 .graph_mark_edge_deleted(edge_id, vector_clock, deleted_by)
1111 }
1112}
1113
/// One row of the `tokenized_files` table, as returned by
/// `Persistence::get_tokenized_file` and `Persistence::list_tokenized_files`.
#[derive(Debug, Clone)]
pub struct TokenizedFileRecord {
    /// Row id (`tokenized_files.id`).
    pub id: i64,
    /// Session the file was captured in.
    pub session_id: String,
    /// File path as passed to `upsert_tokenized_file`; an upsert replaces any existing
    /// row with the same `(session_id, path)`.
    pub path: String,
    /// Hash string supplied by the caller at upsert time.
    pub file_hash: String,
    /// Raw token count supplied by the caller (negative stored values read back as 0).
    pub raw_tokens: usize,
    /// Cleaned token count supplied by the caller (negative stored values read back as 0).
    pub cleaned_tokens: usize,
    /// Byte count supplied by the caller (negative stored values read back as 0).
    pub bytes_captured: usize,
    /// Truncation flag supplied by the caller.
    pub truncated: bool,
    /// Optional id of an associated embedding.
    pub embedding_id: Option<i64>,
    /// Last update time; falls back to the read time when the stored text cannot be parsed.
    pub updated_at: DateTime<Utc>,
}
1127
1128impl TokenizedFileRecord {
1129 fn from_row(row: &duckdb::Row) -> Result<Self> {
1130 let id: i64 = row.get(0)?;
1131 let session_id: String = row.get(1)?;
1132 let path: String = row.get(2)?;
1133 let file_hash: String = row.get(3)?;
1134 let raw_tokens: i64 = row.get(4)?;
1135 let cleaned_tokens: i64 = row.get(5)?;
1136 let bytes_captured: i64 = row.get(6)?;
1137 let truncated: bool = row.get(7)?;
1138 let embedding_id: Option<i64> = row.get(8)?;
1139 let updated_at: String = row.get(9)?;
1140
1141 Ok(Self {
1142 id,
1143 session_id,
1144 path,
1145 file_hash,
1146 raw_tokens: raw_tokens.max(0) as usize,
1147 cleaned_tokens: cleaned_tokens.max(0) as usize,
1148 bytes_captured: bytes_captured.max(0) as usize,
1149 truncated,
1150 embedding_id,
1151 updated_at: updated_at.parse().unwrap_or_else(|_| Utc::now()),
1152 })
1153 }
1154}
1155
/// One row of the `mesh_messages` table, as returned by
/// `Persistence::mesh_message_get_pending` and `Persistence::mesh_message_get_history`.
#[derive(Debug, Clone)]
pub struct MeshMessageRecord {
    /// Row id; this is the id `mesh_message_update_status` takes.
    pub id: i64,
    /// Instance that sent the message.
    pub source_instance: String,
    /// Addressed instance; `None` is treated as "any instance" by `mesh_message_get_pending`.
    pub target_instance: Option<String>,
    /// Caller-defined message type.
    pub message_type: String,
    /// Decoded JSON payload.
    pub payload: JsonValue,
    /// Delivery status; `'pending'` is what `mesh_message_get_pending` selects.
    pub status: String,
    /// Creation time (falls back to the read time when the stored text cannot be parsed).
    pub created_at: DateTime<Utc>,
    /// Set when `mesh_message_update_status` runs; `None` if unset or unparseable.
    pub delivered_at: Option<DateTime<Utc>>,
}
1167
1168impl MeshMessageRecord {
1169 fn from_row(row: &duckdb::Row) -> Result<Self> {
1170 let id: i64 = row.get(0)?;
1171 let source_instance: String = row.get(1)?;
1172 let target_instance: Option<String> = row.get(2)?;
1173 let message_type: String = row.get(3)?;
1174 let payload_str: String = row.get(4)?;
1175 let payload: JsonValue = serde_json::from_str(&payload_str)?;
1176 let status: String = row.get(5)?;
1177 let created_at_str: String = row.get(6)?;
1178 let delivered_at_str: Option<String> = row.get(7)?;
1179
1180 Ok(MeshMessageRecord {
1181 id,
1182 source_instance,
1183 target_instance,
1184 message_type,
1185 payload,
1186 status,
1187 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1188 delivered_at: delivered_at_str.and_then(|s| s.parse().ok()),
1189 })
1190 }
1191}
1192
/// Sync bookkeeping state: a vector clock plus the time of the last sync.
#[derive(Clone, Debug)]
pub struct SyncStateRecord {
    /// Vector clock kept as its serialized text form.
    pub vector_clock: String,
    /// Last sync timestamp as raw text; `None` when absent
    /// (presumably "never synced" — confirm).
    pub last_sync_at: Option<String>,
}
1200
/// Per-graph sync settings.
///
/// The derived `Default` yields sync disabled, no conflict-resolution strategy and
/// no sync interval.
#[derive(Clone, Debug, Default)]
pub struct GraphSyncConfig {
    /// Whether syncing is turned on.
    pub sync_enabled: bool,
    /// Name of the conflict-resolution strategy, if one is configured.
    pub conflict_resolution_strategy: Option<String>,
    /// Interval between syncs in seconds, if configured.
    pub sync_interval_seconds: Option<u64>,
}
1207
1208#[derive(Debug, Clone)]
1209pub struct ChangelogEntry {
1210 pub id: i64,
1211 pub session_id: String,
1212 pub instance_id: String,
1213 pub entity_type: String,
1214 pub entity_id: i64,
1215 pub operation: String,
1216 pub vector_clock: String,
1217 pub data: Option<String>,
1218 pub created_at: DateTime<Utc>,
1219}
1220
1221impl ChangelogEntry {
1222 #[allow(dead_code)]
1223 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1224 let id: i64 = row.get(0)?;
1225 let session_id: String = row.get(1)?;
1226 let instance_id: String = row.get(2)?;
1227 let entity_type: String = row.get(3)?;
1228 let entity_id: i64 = row.get(4)?;
1229 let operation: String = row.get(5)?;
1230 let vector_clock: String = row.get(6)?;
1231 let data: Option<String> = row.get(7)?;
1232 let created_at_str: String = row.get(8)?;
1233
1234 Ok(ChangelogEntry {
1235 id,
1236 session_id,
1237 instance_id,
1238 entity_type,
1239 entity_id,
1240 operation,
1241 vector_clock,
1242 data,
1243 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1244 })
1245 }
1246}
1247
1248#[derive(Debug, Clone)]
1249pub struct SyncedNodeRecord {
1250 pub id: i64,
1251 pub session_id: String,
1252 pub node_type: String,
1253 pub label: String,
1254 pub properties: serde_json::Value,
1255 pub embedding_id: Option<i64>,
1256 pub created_at: DateTime<Utc>,
1257 pub updated_at: DateTime<Utc>,
1258 pub vector_clock: String,
1259 pub last_modified_by: Option<String>,
1260 pub is_deleted: bool,
1261 pub sync_enabled: bool,
1262}
1263
1264impl SyncedNodeRecord {
1265 #[allow(dead_code)]
1266 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1267 let id: i64 = row.get(0)?;
1268 let session_id: String = row.get(1)?;
1269 let node_type: String = row.get(2)?;
1270 let label: String = row.get(3)?;
1271 let properties_str: String = row.get(4)?;
1272 let properties: serde_json::Value = serde_json::from_str(&properties_str).map_err(|e| {
1273 duckdb::Error::FromSqlConversionFailure(4, duckdb::types::Type::Text, Box::new(e))
1274 })?;
1275 let embedding_id: Option<i64> = row.get(5)?;
1276 let created_at_str: String = row.get(6)?;
1277 let updated_at_str: String = row.get(7)?;
1278 let vector_clock: String = row.get(8)?;
1279 let last_modified_by: Option<String> = row.get(9)?;
1280 let is_deleted: bool = row.get(10)?;
1281 let sync_enabled: bool = row.get(11)?;
1282
1283 Ok(SyncedNodeRecord {
1284 id,
1285 session_id,
1286 node_type,
1287 label,
1288 properties,
1289 embedding_id,
1290 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1291 updated_at: updated_at_str.parse().unwrap_or_else(|_| Utc::now()),
1292 vector_clock,
1293 last_modified_by,
1294 is_deleted,
1295 sync_enabled,
1296 })
1297 }
1298}
1299
1300#[derive(Debug, Clone)]
1301pub struct SyncedEdgeRecord {
1302 pub id: i64,
1303 pub session_id: String,
1304 pub source_id: i64,
1305 pub target_id: i64,
1306 pub edge_type: String,
1307 pub predicate: Option<String>,
1308 pub properties: Option<serde_json::Value>,
1309 pub weight: f32,
1310 pub temporal_start: Option<DateTime<Utc>>,
1311 pub temporal_end: Option<DateTime<Utc>>,
1312 pub created_at: DateTime<Utc>,
1313 pub vector_clock: String,
1314 pub last_modified_by: Option<String>,
1315 pub is_deleted: bool,
1316 pub sync_enabled: bool,
1317}
1318
1319impl SyncedEdgeRecord {
1320 #[allow(dead_code)]
1321 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1322 let id: i64 = row.get(0)?;
1323 let session_id: String = row.get(1)?;
1324 let source_id: i64 = row.get(2)?;
1325 let target_id: i64 = row.get(3)?;
1326 let edge_type: String = row.get(4)?;
1327 let predicate: Option<String> = row.get(5)?;
1328 let properties_str: Option<String> = row.get(6)?;
1329 let properties: Option<serde_json::Value> = properties_str
1330 .as_ref()
1331 .and_then(|s| serde_json::from_str(s).ok());
1332 let weight: f32 = row.get(7)?;
1333 let temporal_start_str: Option<String> = row.get(8)?;
1334 let temporal_end_str: Option<String> = row.get(9)?;
1335 let created_at_str: String = row.get(10)?;
1336 let vector_clock: String = row.get(11)?;
1337 let last_modified_by: Option<String> = row.get(12)?;
1338 let is_deleted: bool = row.get(13)?;
1339 let sync_enabled: bool = row.get(14)?;
1340
1341 Ok(SyncedEdgeRecord {
1342 id,
1343 session_id,
1344 source_id,
1345 target_id,
1346 edge_type,
1347 predicate,
1348 properties,
1349 weight,
1350 temporal_start: temporal_start_str.and_then(|s| s.parse().ok()),
1351 temporal_end: temporal_end_str.and_then(|s| s.parse().ok()),
1352 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1353 vector_clock,
1354 last_modified_by,
1355 is_deleted,
1356 sync_enabled,
1357 })
1358 }
1359}