pub mod migrations;

use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use directories::BaseDirs;
use duckdb::{params, Connection};
use serde_json::Value as JsonValue;
use spec_ai_knowledge_graph::KnowledgeGraphStore;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

use crate::types::{
    EdgeType, GraphEdge, GraphNode, GraphPath, MemoryVector, Message, MessageRole, NodeType,
    PolicyEntry, TraversalDirection,
};

#[derive(Clone)]
pub struct Persistence {
    conn: Arc<Mutex<Connection>>,
    instance_id: String,
    graph_store: KnowledgeGraphStore,
}

impl Persistence {
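    /// Opens (or creates) the DuckDB database at `db_path`, running migrations
    /// and generating a fresh instance id for this process.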
    pub fn new<P: AsRef<Path>>(db_path: P) -> Result<Self> {
        Self::with_instance_id(db_path, generate_instance_id())
    }

    pub fn with_instance_id<P: AsRef<Path>>(db_path: P, instance_id: String) -> Result<Self> {
        let db_path = expand_tilde(db_path.as_ref())?;
        if let Some(dir) = db_path.parent() {
            std::fs::create_dir_all(dir).context("creating DB directory")?;
        }
        let conn = Connection::open(&db_path).context("opening DuckDB")?;
        migrations::run(&conn).context("running migrations")?;
        let conn_arc = Arc::new(Mutex::new(conn));
        let graph_store = KnowledgeGraphStore::new(conn_arc.clone(), instance_id.clone());
        Ok(Self {
            conn: conn_arc,
            instance_id,
            graph_store,
        })
    }

    pub fn instance_id(&self) -> &str {
        &self.instance_id
    }

    pub fn graph_store(&self) -> &KnowledgeGraphStore {
        &self.graph_store
    }

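    /// Forces a DuckDB `CHECKPOINT`, flushing pending WAL data into the database file.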
    pub fn checkpoint(&self) -> Result<()> {
        let conn = self.conn();
        conn.execute_batch("CHECKPOINT;")
            .context("checkpointing database")
    }

    pub fn new_default() -> Result<Self> {
        let base = BaseDirs::new().context("base directories not available")?;
        let path = base.home_dir().join(".agent_cli").join("agent_data.duckdb");
        Self::new(path)
    }

    pub fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
        self.conn
            .lock()
            .expect("database connection mutex poisoned")
    }

    pub fn insert_message(
        &self,
        session_id: &str,
        role: MessageRole,
        content: &str,
    ) -> Result<i64> {
        let conn = self.conn();
        let mut stmt = conn.prepare(
            "INSERT INTO messages (session_id, role, content) VALUES (?, ?, ?) RETURNING id",
        )?;
        let id: i64 = stmt.query_row(params![session_id, role.as_str(), content], |row| {
            row.get(0)
        })?;
        Ok(id)
    }

    pub fn list_messages(&self, session_id: &str, limit: i64) -> Result<Vec<Message>> {
        let conn = self.conn();
        let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?")?;
        let mut rows = stmt.query(params![session_id, limit])?;
        let mut out = Vec::new();
        while let Some(row) = rows.next()? {
            let id: i64 = row.get(0)?;
            let sid: String = row.get(1)?;
            let role: String = row.get(2)?;
            let content: String = row.get(3)?;
            let created_at: String = row.get(4)?;
            let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
            out.push(Message {
                id,
                session_id: sid,
                role: MessageRole::from_str(&role),
                content,
                created_at,
            });
        }
        out.reverse();
        Ok(out)
    }

    pub fn get_message(&self, message_id: i64) -> Result<Option<Message>> {
        let conn = self.conn();
        let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE id = ?")?;
        let mut rows = stmt.query(params![message_id])?;
        if let Some(row) = rows.next()? {
            let id: i64 = row.get(0)?;
            let sid: String = row.get(1)?;
            let role: String = row.get(2)?;
            let content: String = row.get(3)?;
            let created_at: String = row.get(4)?;
            let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
            Ok(Some(Message {
                id,
                session_id: sid,
                role: MessageRole::from_str(&role),
                content,
                created_at,
            }))
        } else {
            Ok(None)
        }
    }

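    /// Deletes all but the `keep_latest` most recent messages for a session,
    /// returning the number of rows removed.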
    pub fn prune_messages(&self, session_id: &str, keep_latest: i64) -> Result<u64> {
        let conn = self.conn();
        let mut stmt = conn.prepare("DELETE FROM messages WHERE session_id = ? AND id NOT IN (SELECT id FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?)")?;
        let changed = stmt.execute(params![session_id, session_id, keep_latest])? as u64;
        Ok(changed)
    }

    pub fn insert_memory_vector(
        &self,
        session_id: &str,
        message_id: Option<i64>,
        embedding: &[f32],
    ) -> Result<i64> {
        let conn = self.conn();
        let embedding_json = serde_json::to_string(embedding)?;
        let mut stmt = conn.prepare("INSERT INTO memory_vectors (session_id, message_id, embedding) VALUES (?, ?, ?) RETURNING id")?;
        let id: i64 = stmt.query_row(params![session_id, message_id, embedding_json], |row| {
            row.get(0)
        })?;
        Ok(id)
    }

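    /// Brute-force recall: loads every embedding stored for the session, scores
    /// it against `query_embedding` with cosine similarity, and returns the top
    /// `k` matches ordered from most to least similar.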
    pub fn recall_top_k(
        &self,
        session_id: &str,
        query_embedding: &[f32],
        k: usize,
    ) -> Result<Vec<(MemoryVector, f32)>> {
        let conn = self.conn();
        let mut stmt = conn.prepare("SELECT id, session_id, message_id, embedding, CAST(created_at AS TEXT) as created_at FROM memory_vectors WHERE session_id = ?")?;
        let mut rows = stmt.query(params![session_id])?;
        let mut scored: Vec<(MemoryVector, f32)> = Vec::new();
        while let Some(row) = rows.next()? {
            let id: i64 = row.get(0)?;
            let sid: String = row.get(1)?;
            let message_id: Option<i64> = row.get(2)?;
            let embedding_text: String = row.get(3)?;
            let created_at: String = row.get(4)?;
            let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
            let embedding: Vec<f32> = serde_json::from_str(&embedding_text).unwrap_or_default();
            let score = cosine_similarity(query_embedding, &embedding);
            scored.push((
                MemoryVector {
                    id,
                    session_id: sid,
                    message_id,
                    embedding,
                    created_at,
                },
                score,
            ));
        }
        scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        scored.truncate(k);
        Ok(scored)
    }

    pub fn list_sessions(&self) -> Result<Vec<String>> {
        let conn = self.conn();
        let mut stmt = conn.prepare(
            "SELECT session_id, MAX(created_at) as last FROM messages GROUP BY session_id ORDER BY last DESC"
        )?;
        let mut rows = stmt.query([])?;
        let mut out = Vec::new();
        while let Some(row) = rows.next()? {
            let sid: String = row.get(0)?;
            out.push(sid);
        }
        Ok(out)
    }

    pub fn log_tool(
        &self,
        session_id: &str,
        agent_name: &str,
        run_id: &str,
        tool_name: &str,
        arguments: &JsonValue,
        result: &JsonValue,
        success: bool,
        error: Option<&str>,
    ) -> Result<i64> {
        let conn = self.conn();
        let mut stmt = conn.prepare("INSERT INTO tool_log (session_id, agent, run_id, tool_name, arguments, result, success, error) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
        let id: i64 = stmt.query_row(
            params![
                session_id,
                agent_name,
                run_id,
                tool_name,
                arguments.to_string(),
                result.to_string(),
                success,
                error.unwrap_or("")
            ],
            |row| row.get(0),
        )?;
        Ok(id)
    }

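    /// Upserts a policy entry by deleting any existing row for `key` and
    /// inserting the new value inside a single explicit transaction.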
    pub fn policy_upsert(&self, key: &str, value: &JsonValue) -> Result<()> {
        let conn = self.conn();
        conn.execute_batch("BEGIN TRANSACTION;")?;
        {
            let mut del = conn.prepare("DELETE FROM policy_cache WHERE key = ?")?;
            let _ = del.execute(params![key])?;
            let mut ins = conn.prepare("INSERT INTO policy_cache (key, value, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP)")?;
            let _ = ins.execute(params![key, value.to_string()])?;
        }
        conn.execute_batch("COMMIT;")?;
        Ok(())
    }

    pub fn policy_get(&self, key: &str) -> Result<Option<PolicyEntry>> {
        let conn = self.conn();
        let mut stmt = conn.prepare("SELECT key, value, CAST(updated_at AS TEXT) as updated_at FROM policy_cache WHERE key = ?")?;
        let mut rows = stmt.query(params![key])?;
        if let Some(row) = rows.next()? {
            let key: String = row.get(0)?;
            let value_text: String = row.get(1)?;
            let updated_at: String = row.get(2)?;
            let updated_at: DateTime<Utc> = updated_at.parse().unwrap_or_else(|_| Utc::now());
            let value: JsonValue = serde_json::from_str(&value_text).unwrap_or(JsonValue::Null);
            Ok(Some(PolicyEntry {
                key,
                value,
                updated_at,
            }))
        } else {
            Ok(None)
        }
    }
}

fn generate_instance_id() -> String {
    let hostname = hostname::get()
        .ok()
        .and_then(|h| h.into_string().ok())
        .unwrap_or_else(|| "unknown".to_string());
    let uuid = uuid::Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext));
    format!("{}-{}", hostname, uuid)
}

fn expand_tilde(path: &Path) -> Result<PathBuf> {
    let path_str = path.to_string_lossy();
    if path_str == "~" {
        let base = BaseDirs::new().context("base directories not available")?;
        Ok(base.home_dir().to_path_buf())
    } else if let Some(stripped) = path_str.strip_prefix("~/") {
        let base = BaseDirs::new().context("base directories not available")?;
        Ok(base.home_dir().join(stripped))
    } else {
        Ok(path.to_path_buf())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    #[test]
    fn expands_home_directory_prefix() {
        let base = BaseDirs::new().expect("home directory available");
        let expected = base.home_dir().join("demo.db");
        let result = expand_tilde(Path::new("~/demo.db")).expect("path expansion succeeds");
        assert_eq!(result, expected);
    }

    #[test]
    fn leaves_regular_paths_unchanged() {
        let input = Path::new("relative/path.db");
        let result = expand_tilde(input).expect("path expansion succeeds");
        assert_eq!(result, input);
    }
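
    // Sanity checks for the cosine_similarity helper defined below: identical
    // vectors should score 1.0, orthogonal vectors 0.0, and empty or
    // length-mismatched inputs fall back to 0.0.
    #[test]
    fn cosine_similarity_handles_edge_cases() {
        let a = [1.0f32, 0.0, 0.0];
        let b = [0.0f32, 1.0, 0.0];
        assert!((cosine_similarity(&a, &a) - 1.0).abs() < 1e-6);
        assert!(cosine_similarity(&a, &b).abs() < 1e-6);
        assert_eq!(cosine_similarity(&a, &[1.0f32]), 0.0);
        assert_eq!(cosine_similarity(&[], &[]), 0.0);
    }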
329}
330
331fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
332 if a.is_empty() || b.is_empty() || a.len() != b.len() {
333 return 0.0;
334 }
335 let mut dot = 0.0f32;
336 let mut na = 0.0f32;
337 let mut nb = 0.0f32;
338 for i in 0..a.len() {
339 dot += a[i] * b[i];
340 na += a[i] * a[i];
341 nb += b[i] * b[i];
342 }
343 if na == 0.0 || nb == 0.0 {
344 return 0.0;
345 }
346 dot / (na.sqrt() * nb.sqrt())
347}
348
349fn from_kg_node(node: spec_ai_knowledge_graph::GraphNode) -> GraphNode {
354 GraphNode {
355 id: node.id,
356 session_id: node.session_id,
357 node_type: node.node_type,
358 label: node.label,
359 properties: node.properties,
360 embedding_id: node.embedding_id,
361 created_at: node.created_at,
362 updated_at: node.updated_at,
363 }
364}
365
366fn from_kg_edge(edge: spec_ai_knowledge_graph::GraphEdge) -> GraphEdge {
367 GraphEdge {
368 id: edge.id,
369 session_id: edge.session_id,
370 source_id: edge.source_id,
371 target_id: edge.target_id,
372 edge_type: edge.edge_type,
373 predicate: edge.predicate,
374 properties: edge.properties,
375 weight: edge.weight,
376 temporal_start: edge.temporal_start,
377 temporal_end: edge.temporal_end,
378 created_at: edge.created_at,
379 }
380}
381
382fn from_kg_path(path: spec_ai_knowledge_graph::GraphPath) -> GraphPath {
383 GraphPath {
384 nodes: path.nodes.into_iter().map(from_kg_node).collect(),
385 edges: path.edges.into_iter().map(from_kg_edge).collect(),
386 length: path.length,
387 weight: path.weight,
388 }
389}
390
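// Delegation wrappers around the knowledge-graph store, plus persistence
// helpers for transcriptions, tokenized files, mesh messages, and sync metadata.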
impl Persistence {
    pub fn insert_graph_node(
        &self,
        session_id: &str,
        node_type: spec_ai_knowledge_graph::NodeType,
        label: &str,
        properties: &JsonValue,
        embedding_id: Option<i64>,
    ) -> Result<i64> {
        self.graph_store
            .insert_graph_node(session_id, node_type, label, properties, embedding_id)
    }

    pub fn get_graph_node(&self, node_id: i64) -> Result<Option<GraphNode>> {
        self.graph_store
            .get_graph_node(node_id)
            .map(|opt| opt.map(from_kg_node))
    }

    pub fn list_graph_nodes(
        &self,
        session_id: &str,
        node_type: Option<spec_ai_knowledge_graph::NodeType>,
        limit: Option<i64>,
    ) -> Result<Vec<GraphNode>> {
        self.graph_store
            .list_graph_nodes(session_id, node_type, limit)
            .map(|nodes| nodes.into_iter().map(from_kg_node).collect())
    }

    pub fn count_graph_nodes(&self, session_id: &str) -> Result<i64> {
        self.graph_store.count_graph_nodes(session_id)
    }

    pub fn update_graph_node(&self, node_id: i64, properties: &JsonValue) -> Result<()> {
        self.graph_store.update_graph_node(node_id, properties)
    }

    pub fn delete_graph_node(&self, node_id: i64) -> Result<()> {
        self.graph_store.delete_graph_node(node_id)
    }

    pub fn insert_graph_edge(
        &self,
        session_id: &str,
        source_id: i64,
        target_id: i64,
        edge_type: spec_ai_knowledge_graph::EdgeType,
        predicate: Option<&str>,
        properties: Option<&JsonValue>,
        weight: f32,
    ) -> Result<i64> {
        self.graph_store.insert_graph_edge(
            session_id, source_id, target_id, edge_type, predicate, properties, weight,
        )
    }

    pub fn get_graph_edge(&self, edge_id: i64) -> Result<Option<GraphEdge>> {
        self.graph_store
            .get_graph_edge(edge_id)
            .map(|opt| opt.map(from_kg_edge))
    }

    pub fn list_graph_edges(
        &self,
        session_id: &str,
        source_id: Option<i64>,
        target_id: Option<i64>,
    ) -> Result<Vec<GraphEdge>> {
        self.graph_store
            .list_graph_edges(session_id, source_id, target_id)
            .map(|edges| edges.into_iter().map(from_kg_edge).collect())
    }

    pub fn count_graph_edges(&self, session_id: &str) -> Result<i64> {
        self.graph_store.count_graph_edges(session_id)
    }

    pub fn delete_graph_edge(&self, edge_id: i64) -> Result<()> {
        self.graph_store.delete_graph_edge(edge_id)
    }

    pub fn find_shortest_path(
        &self,
        session_id: &str,
        source_id: i64,
        target_id: i64,
        max_hops: Option<usize>,
    ) -> Result<Option<GraphPath>> {
        self.graph_store
            .find_shortest_path(session_id, source_id, target_id, max_hops)
            .map(|opt| opt.map(from_kg_path))
    }

    pub fn traverse_neighbors(
        &self,
        session_id: &str,
        node_id: i64,
        direction: spec_ai_knowledge_graph::TraversalDirection,
        depth: usize,
    ) -> Result<Vec<GraphNode>> {
        self.graph_store
            .traverse_neighbors(session_id, node_id, direction, depth)
            .map(|nodes| nodes.into_iter().map(from_kg_node).collect())
    }

    pub fn insert_transcription(
        &self,
        session_id: &str,
        chunk_id: i64,
        text: &str,
        timestamp: chrono::DateTime<Utc>,
    ) -> Result<i64> {
        let conn = self.conn();
        let mut stmt = conn.prepare(
            "INSERT INTO transcriptions (session_id, chunk_id, text, timestamp, embedding_id) VALUES (?, ?, ?, ?, NULL) RETURNING id",
        )?;
        let id: i64 = stmt.query_row(
            params![session_id, chunk_id, text, timestamp.to_rfc3339()],
            |row| row.get(0),
        )?;
        Ok(id)
    }

    pub fn update_transcription_embedding(
        &self,
        transcription_id: i64,
        embedding_id: i64,
    ) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "UPDATE transcriptions SET embedding_id = ? WHERE id = ?",
            params![embedding_id, transcription_id],
        )?;
        Ok(())
    }

    pub fn list_transcriptions(
        &self,
        session_id: &str,
        limit: Option<i64>,
    ) -> Result<Vec<(i64, i64, String, DateTime<Utc>)>> {
        let conn = self.conn();
        let query = if let Some(lim) = limit {
            format!(
                "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC LIMIT {}",
                lim
            )
        } else {
            "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC".to_string()
        };

        let mut stmt = conn.prepare(&query)?;
        let mut rows = stmt.query(params![session_id])?;
        let mut out = Vec::new();

        while let Some(row) = rows.next()? {
            let id: i64 = row.get(0)?;
            let chunk_id: i64 = row.get(1)?;
            let text: String = row.get(2)?;
            let timestamp_str: String = row.get(3)?;
            let timestamp: DateTime<Utc> = timestamp_str.parse().unwrap_or_else(|_| Utc::now());
            out.push((id, chunk_id, text, timestamp));
        }

        Ok(out)
    }

    pub fn get_full_transcription(&self, session_id: &str) -> Result<String> {
        let transcriptions = self.list_transcriptions(session_id, None)?;
        Ok(transcriptions
            .into_iter()
            .map(|(_, _, text, _)| text)
            .collect::<Vec<_>>()
            .join(" "))
    }

    pub fn delete_transcriptions(&self, session_id: &str) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "DELETE FROM transcriptions WHERE session_id = ?",
            params![session_id],
        )?;
        Ok(())
    }

    pub fn get_transcription_by_embedding(&self, embedding_id: i64) -> Result<Option<String>> {
        let conn = self.conn();
        let mut stmt =
            conn.prepare("SELECT text FROM transcriptions WHERE embedding_id = ? LIMIT 1")?;
        let result: Result<String, _> = stmt.query_row(params![embedding_id], |row| row.get(0));
        match result {
            Ok(text) => Ok(Some(text)),
            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }
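
    /// Replaces any existing tokenized-file record for `(session_id, path)`
    /// with the new statistics, returning the id of the freshly inserted row.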
    pub fn upsert_tokenized_file(
        &self,
        session_id: &str,
        path: &str,
        file_hash: &str,
        raw_tokens: usize,
        cleaned_tokens: usize,
        bytes_captured: usize,
        truncated: bool,
        embedding_id: Option<i64>,
    ) -> Result<i64> {
        let conn = self.conn();
        conn.execute(
            "DELETE FROM tokenized_files WHERE session_id = ? AND path = ?",
            params![session_id, path],
        )?;
        let mut stmt = conn.prepare("INSERT INTO tokenized_files (session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
        let id: i64 = stmt.query_row(
            params![
                session_id,
                path,
                file_hash,
                raw_tokens as i64,
                cleaned_tokens as i64,
                bytes_captured as i64,
                truncated,
                embedding_id
            ],
            |row| row.get(0),
        )?;
        Ok(id)
    }

    pub fn get_tokenized_file(
        &self,
        session_id: &str,
        path: &str,
    ) -> Result<Option<TokenizedFileRecord>> {
        let conn = self.conn();
        let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? AND path = ? LIMIT 1")?;
        let mut rows = stmt.query(params![session_id, path])?;
        if let Some(row) = rows.next()? {
            let record = TokenizedFileRecord::from_row(row)?;
            Ok(Some(record))
        } else {
            Ok(None)
        }
    }

    pub fn list_tokenized_files(&self, session_id: &str) -> Result<Vec<TokenizedFileRecord>> {
        let conn = self.conn();
        let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? ORDER BY path")?;
        let mut rows = stmt.query(params![session_id])?;
        let mut out = Vec::new();
        while let Some(row) = rows.next()? {
            out.push(TokenizedFileRecord::from_row(row)?);
        }
        Ok(out)
    }

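    /// Records an inter-instance mesh message and returns the id of the stored row.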
    pub fn mesh_message_store(
        &self,
        message_id: &str,
        source_instance: &str,
        target_instance: Option<&str>,
        message_type: &str,
        payload: &JsonValue,
        status: &str,
    ) -> Result<i64> {
        let conn = self.conn();
        let payload_json = serde_json::to_string(payload)?;
        // DuckDB has no SQLite-style last_insert_rowid(); return the generated
        // id directly from the INSERT, as the other insert helpers do.
        let mut stmt = conn.prepare(
            "INSERT INTO mesh_messages (message_id, source_instance, target_instance, message_type, payload, status) VALUES (?, ?, ?, ?, ?, ?) RETURNING id",
        )?;
        let id: i64 = stmt.query_row(
            params![
                message_id,
                source_instance,
                target_instance,
                message_type,
                payload_json,
                status
            ],
            |row| row.get(0),
        )?;
        Ok(id)
    }

    pub fn mesh_message_exists(&self, message_id: &str) -> Result<bool> {
        let conn = self.conn();
        let count: i64 = conn.query_row(
            "SELECT COUNT(*) FROM mesh_messages WHERE message_id = ?",
            params![message_id],
            |row| row.get(0),
        )?;
        Ok(count > 0)
    }

    pub fn mesh_message_update_status(&self, message_id: i64, status: &str) -> Result<()> {
        let conn = self.conn();
        conn.execute(
            "UPDATE mesh_messages SET status = ?, delivered_at = CURRENT_TIMESTAMP WHERE id = ?",
            params![status, message_id],
        )?;
        Ok(())
    }

    pub fn mesh_message_get_pending(
        &self,
        target_instance: &str,
    ) -> Result<Vec<MeshMessageRecord>> {
        let conn = self.conn();
        let mut stmt = conn.prepare(
            "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
             FROM mesh_messages
             WHERE (target_instance = ? OR target_instance IS NULL) AND status = 'pending'
             ORDER BY created_at",
        )?;
        let mut rows = stmt.query(params![target_instance])?;
        let mut out = Vec::new();
        while let Some(row) = rows.next()? {
            out.push(MeshMessageRecord::from_row(row)?);
        }
        Ok(out)
    }

    pub fn mesh_message_get_history(
        &self,
        instance_id: Option<&str>,
        limit: usize,
    ) -> Result<Vec<MeshMessageRecord>> {
        let conn = self.conn();
        let query = if instance_id.is_some() {
            format!(
                "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
                 FROM mesh_messages
                 WHERE source_instance = ? OR target_instance = ?
                 ORDER BY created_at DESC LIMIT {}",
                limit
            )
        } else {
            format!(
                "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
                 FROM mesh_messages
                 ORDER BY created_at DESC LIMIT {}",
                limit
            )
        };

        let mut stmt = conn.prepare(&query)?;
        let mut rows = if let Some(inst) = instance_id {
            stmt.query(params![inst, inst])?
        } else {
            stmt.query(params![])?
        };

        let mut out = Vec::new();
        while let Some(row) = rows.next()? {
            out.push(MeshMessageRecord::from_row(row)?);
        }
        Ok(out)
    }

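    /// Appends an entry to the graph changelog via the underlying knowledge-graph store.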
    pub fn graph_changelog_append(
        &self,
        session_id: &str,
        instance_id: &str,
        entity_type: &str,
        entity_id: i64,
        operation: &str,
        vector_clock: &str,
        data: Option<&str>,
    ) -> Result<i64> {
        self.graph_store.graph_changelog_append(
            session_id,
            instance_id,
            entity_type,
            entity_id,
            operation,
            vector_clock,
            data,
        )
    }

    pub fn graph_changelog_get_since(
        &self,
        session_id: &str,
        since_timestamp: &str,
    ) -> Result<Vec<ChangelogEntry>> {
        self.graph_store
            .graph_changelog_get_since(session_id, since_timestamp)
            .map(|entries| {
                entries
                    .into_iter()
                    .map(|e| ChangelogEntry {
                        id: e.id,
                        session_id: e.session_id,
                        instance_id: e.instance_id,
                        entity_type: e.entity_type,
                        entity_id: e.entity_id,
                        operation: e.operation,
                        vector_clock: e.vector_clock,
                        data: e.data,
                        created_at: e.created_at,
                    })
                    .collect()
            })
    }

    pub fn graph_list_conflicts(&self, session_id: Option<&str>) -> Result<Vec<ChangelogEntry>> {
        self.graph_store
            .graph_list_conflicts(session_id, 100)
            .map(|entries| {
                entries
                    .into_iter()
                    .map(|e| ChangelogEntry {
                        id: e.id,
                        session_id: e.session_id,
                        instance_id: e.instance_id,
                        entity_type: e.entity_type,
                        entity_id: e.entity_id,
                        operation: e.operation,
                        vector_clock: e.vector_clock,
                        data: e.data,
                        created_at: e.created_at,
                    })
                    .collect()
            })
    }

    pub fn graph_changelog_prune(&self, days_to_keep: i64) -> Result<usize> {
        self.graph_store.graph_changelog_prune(days_to_keep)
    }

    pub fn graph_sync_state_get(
        &self,
        instance_id: &str,
        session_id: &str,
        graph_name: &str,
    ) -> Result<Option<String>> {
        self.graph_store
            .graph_sync_state_get(instance_id, session_id, graph_name)
    }

    pub fn graph_sync_state_get_metadata(
        &self,
        instance_id: &str,
        session_id: &str,
        graph_name: &str,
    ) -> Result<Option<SyncStateRecord>> {
        self.graph_store
            .graph_sync_state_get_metadata(instance_id, session_id, graph_name)
            .map(|opt| {
                opt.map(|r| SyncStateRecord {
                    vector_clock: r.vector_clock,
                    last_sync_at: r.last_sync_at,
                })
            })
    }

    pub fn graph_sync_state_update(
        &self,
        instance_id: &str,
        session_id: &str,
        graph_name: &str,
        vector_clock: &str,
    ) -> Result<()> {
        self.graph_store
            .graph_sync_state_update(instance_id, session_id, graph_name, vector_clock)
    }

    pub fn graph_set_sync_config(
        &self,
        session_id: &str,
        graph_name: &str,
        sync_enabled: bool,
        conflict_resolution_strategy: Option<&str>,
        sync_interval_seconds: Option<u64>,
    ) -> Result<GraphSyncConfig> {
        self.graph_store
            .graph_set_sync_config(
                session_id,
                graph_name,
                sync_enabled,
                conflict_resolution_strategy,
                sync_interval_seconds,
            )
            .map(|cfg| GraphSyncConfig {
                sync_enabled: cfg.sync_enabled,
                conflict_resolution_strategy: cfg.conflict_resolution_strategy,
                sync_interval_seconds: cfg.sync_interval_seconds,
            })
    }

    pub fn graph_get_sync_config(
        &self,
        session_id: &str,
        graph_name: &str,
    ) -> Result<GraphSyncConfig> {
        self.graph_store
            .graph_get_sync_config(session_id, graph_name)
            .map(|cfg| GraphSyncConfig {
                sync_enabled: cfg.sync_enabled,
                conflict_resolution_strategy: cfg.conflict_resolution_strategy,
                sync_interval_seconds: cfg.sync_interval_seconds,
            })
    }

    pub fn graph_set_sync_enabled(
        &self,
        session_id: &str,
        graph_name: &str,
        enabled: bool,
    ) -> Result<()> {
        self.graph_store
            .graph_set_sync_enabled(session_id, graph_name, enabled)
    }

    pub fn graph_get_sync_enabled(&self, session_id: &str, graph_name: &str) -> Result<bool> {
        self.graph_store
            .graph_get_sync_enabled(session_id, graph_name)
    }

    pub fn graph_list(&self, session_id: &str) -> Result<Vec<String>> {
        self.graph_store.graph_list(session_id)
    }

    pub fn graph_list_sync_enabled(&self) -> Result<Vec<(String, String)>> {
        self.graph_store.graph_list_sync_enabled()
    }

    pub fn graph_get_node_with_sync(&self, node_id: i64) -> Result<Option<SyncedNodeRecord>> {
        self.graph_store
            .graph_get_node_with_sync(node_id)
            .map(|opt| {
                opt.map(|r| SyncedNodeRecord {
                    id: r.id,
                    session_id: r.session_id,
                    node_type: r.node_type,
                    label: r.label,
                    properties: r.properties,
                    embedding_id: r.embedding_id,
                    created_at: r.created_at,
                    updated_at: r.updated_at,
                    vector_clock: r.vector_clock,
                    last_modified_by: r.last_modified_by,
                    is_deleted: r.is_deleted,
                    sync_enabled: r.sync_enabled,
                })
            })
    }

    pub fn graph_list_nodes_with_sync(
        &self,
        session_id: &str,
        sync_enabled_only: bool,
        include_deleted: bool,
    ) -> Result<Vec<SyncedNodeRecord>> {
        self.graph_store
            .graph_list_nodes_with_sync(session_id, sync_enabled_only, include_deleted)
            .map(|nodes| {
                nodes
                    .into_iter()
                    .map(|r| SyncedNodeRecord {
                        id: r.id,
                        session_id: r.session_id,
                        node_type: r.node_type,
                        label: r.label,
                        properties: r.properties,
                        embedding_id: r.embedding_id,
                        created_at: r.created_at,
                        updated_at: r.updated_at,
                        vector_clock: r.vector_clock,
                        last_modified_by: r.last_modified_by,
                        is_deleted: r.is_deleted,
                        sync_enabled: r.sync_enabled,
                    })
                    .collect()
            })
    }

    pub fn graph_get_edge_with_sync(&self, edge_id: i64) -> Result<Option<SyncedEdgeRecord>> {
        self.graph_store
            .graph_get_edge_with_sync(edge_id)
            .map(|opt| {
                opt.map(|r| SyncedEdgeRecord {
                    id: r.id,
                    session_id: r.session_id,
                    source_id: r.source_id,
                    target_id: r.target_id,
                    edge_type: r.edge_type,
                    predicate: r.predicate,
                    properties: r.properties,
                    weight: r.weight,
                    temporal_start: r.temporal_start,
                    temporal_end: r.temporal_end,
                    created_at: r.created_at,
                    vector_clock: r.vector_clock,
                    last_modified_by: r.last_modified_by,
                    is_deleted: r.is_deleted,
                    sync_enabled: r.sync_enabled,
                })
            })
    }

    pub fn graph_list_edges_with_sync(
        &self,
        session_id: &str,
        sync_enabled_only: bool,
        include_deleted: bool,
    ) -> Result<Vec<SyncedEdgeRecord>> {
        self.graph_store
            .graph_list_edges_with_sync(session_id, sync_enabled_only, include_deleted)
            .map(|edges| {
                edges
                    .into_iter()
                    .map(|r| SyncedEdgeRecord {
                        id: r.id,
                        session_id: r.session_id,
                        source_id: r.source_id,
                        target_id: r.target_id,
                        edge_type: r.edge_type,
                        predicate: r.predicate,
                        properties: r.properties,
                        weight: r.weight,
                        temporal_start: r.temporal_start,
                        temporal_end: r.temporal_end,
                        created_at: r.created_at,
                        vector_clock: r.vector_clock,
                        last_modified_by: r.last_modified_by,
                        is_deleted: r.is_deleted,
                        sync_enabled: r.sync_enabled,
                    })
                    .collect()
            })
    }

    pub fn graph_update_node_sync_metadata(
        &self,
        node_id: i64,
        vector_clock: &str,
        last_modified_by: &str,
        sync_enabled: bool,
    ) -> Result<()> {
        self.graph_store.graph_update_node_sync_metadata(
            node_id,
            vector_clock,
            last_modified_by,
            sync_enabled,
        )
    }

    pub fn graph_update_edge_sync_metadata(
        &self,
        edge_id: i64,
        vector_clock: &str,
        last_modified_by: &str,
        sync_enabled: bool,
    ) -> Result<()> {
        self.graph_store.graph_update_edge_sync_metadata(
            edge_id,
            vector_clock,
            last_modified_by,
            sync_enabled,
        )
    }

    pub fn graph_mark_node_deleted(
        &self,
        node_id: i64,
        vector_clock: &str,
        deleted_by: &str,
    ) -> Result<()> {
        self.graph_store
            .graph_mark_node_deleted(node_id, vector_clock, deleted_by)
    }

    pub fn graph_mark_edge_deleted(
        &self,
        edge_id: i64,
        vector_clock: &str,
        deleted_by: &str,
    ) -> Result<()> {
        self.graph_store
            .graph_mark_edge_deleted(edge_id, vector_clock, deleted_by)
    }
}

#[derive(Debug, Clone)]
pub struct TokenizedFileRecord {
    pub id: i64,
    pub session_id: String,
    pub path: String,
    pub file_hash: String,
    pub raw_tokens: usize,
    pub cleaned_tokens: usize,
    pub bytes_captured: usize,
    pub truncated: bool,
    pub embedding_id: Option<i64>,
    pub updated_at: DateTime<Utc>,
}

impl TokenizedFileRecord {
    fn from_row(row: &duckdb::Row) -> Result<Self> {
        let id: i64 = row.get(0)?;
        let session_id: String = row.get(1)?;
        let path: String = row.get(2)?;
        let file_hash: String = row.get(3)?;
        let raw_tokens: i64 = row.get(4)?;
        let cleaned_tokens: i64 = row.get(5)?;
        let bytes_captured: i64 = row.get(6)?;
        let truncated: bool = row.get(7)?;
        let embedding_id: Option<i64> = row.get(8)?;
        let updated_at: String = row.get(9)?;

        Ok(Self {
            id,
            session_id,
            path,
            file_hash,
            raw_tokens: raw_tokens.max(0) as usize,
            cleaned_tokens: cleaned_tokens.max(0) as usize,
            bytes_captured: bytes_captured.max(0) as usize,
            truncated,
            embedding_id,
            updated_at: updated_at.parse().unwrap_or_else(|_| Utc::now()),
        })
    }
}

#[derive(Debug, Clone)]
pub struct MeshMessageRecord {
    pub id: i64,
    pub source_instance: String,
    pub target_instance: Option<String>,
    pub message_type: String,
    pub payload: JsonValue,
    pub status: String,
    pub created_at: DateTime<Utc>,
    pub delivered_at: Option<DateTime<Utc>>,
}

impl MeshMessageRecord {
    fn from_row(row: &duckdb::Row) -> Result<Self> {
        let id: i64 = row.get(0)?;
        let source_instance: String = row.get(1)?;
        let target_instance: Option<String> = row.get(2)?;
        let message_type: String = row.get(3)?;
        let payload_str: String = row.get(4)?;
        let payload: JsonValue = serde_json::from_str(&payload_str)?;
        let status: String = row.get(5)?;
        let created_at_str: String = row.get(6)?;
        let delivered_at_str: Option<String> = row.get(7)?;

        Ok(MeshMessageRecord {
            id,
            source_instance,
            target_instance,
            message_type,
            payload,
            status,
            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
            delivered_at: delivered_at_str.and_then(|s| s.parse().ok()),
        })
    }
}

#[derive(Debug, Clone)]
pub struct SyncStateRecord {
    pub vector_clock: String,
    pub last_sync_at: Option<String>,
}

#[derive(Debug, Clone, Default)]
pub struct GraphSyncConfig {
    pub sync_enabled: bool,
    pub conflict_resolution_strategy: Option<String>,
    pub sync_interval_seconds: Option<u64>,
}

#[derive(Debug, Clone)]
pub struct ChangelogEntry {
    pub id: i64,
    pub session_id: String,
    pub instance_id: String,
    pub entity_type: String,
    pub entity_id: i64,
    pub operation: String,
    pub vector_clock: String,
    pub data: Option<String>,
    pub created_at: DateTime<Utc>,
}

impl ChangelogEntry {
    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
        let id: i64 = row.get(0)?;
        let session_id: String = row.get(1)?;
        let instance_id: String = row.get(2)?;
        let entity_type: String = row.get(3)?;
        let entity_id: i64 = row.get(4)?;
        let operation: String = row.get(5)?;
        let vector_clock: String = row.get(6)?;
        let data: Option<String> = row.get(7)?;
        let created_at_str: String = row.get(8)?;

        Ok(ChangelogEntry {
            id,
            session_id,
            instance_id,
            entity_type,
            entity_id,
            operation,
            vector_clock,
            data,
            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
        })
    }
}

#[derive(Debug, Clone)]
pub struct SyncedNodeRecord {
    pub id: i64,
    pub session_id: String,
    pub node_type: String,
    pub label: String,
    pub properties: serde_json::Value,
    pub embedding_id: Option<i64>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub vector_clock: String,
    pub last_modified_by: Option<String>,
    pub is_deleted: bool,
    pub sync_enabled: bool,
}

impl SyncedNodeRecord {
    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
        let id: i64 = row.get(0)?;
        let session_id: String = row.get(1)?;
        let node_type: String = row.get(2)?;
        let label: String = row.get(3)?;
        let properties_str: String = row.get(4)?;
        let properties: serde_json::Value = serde_json::from_str(&properties_str).map_err(|e| {
            duckdb::Error::FromSqlConversionFailure(4, duckdb::types::Type::Text, Box::new(e))
        })?;
        let embedding_id: Option<i64> = row.get(5)?;
        let created_at_str: String = row.get(6)?;
        let updated_at_str: String = row.get(7)?;
        let vector_clock: String = row.get(8)?;
        let last_modified_by: Option<String> = row.get(9)?;
        let is_deleted: bool = row.get(10)?;
        let sync_enabled: bool = row.get(11)?;

        Ok(SyncedNodeRecord {
            id,
            session_id,
            node_type,
            label,
            properties,
            embedding_id,
            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
            updated_at: updated_at_str.parse().unwrap_or_else(|_| Utc::now()),
            vector_clock,
            last_modified_by,
            is_deleted,
            sync_enabled,
        })
    }
}

#[derive(Debug, Clone)]
pub struct SyncedEdgeRecord {
    pub id: i64,
    pub session_id: String,
    pub source_id: i64,
    pub target_id: i64,
    pub edge_type: String,
    pub predicate: Option<String>,
    pub properties: Option<serde_json::Value>,
    pub weight: f32,
    pub temporal_start: Option<DateTime<Utc>>,
    pub temporal_end: Option<DateTime<Utc>>,
    pub created_at: DateTime<Utc>,
    pub vector_clock: String,
    pub last_modified_by: Option<String>,
    pub is_deleted: bool,
    pub sync_enabled: bool,
}

impl SyncedEdgeRecord {
    fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
        let id: i64 = row.get(0)?;
        let session_id: String = row.get(1)?;
        let source_id: i64 = row.get(2)?;
        let target_id: i64 = row.get(3)?;
        let edge_type: String = row.get(4)?;
        let predicate: Option<String> = row.get(5)?;
        let properties_str: Option<String> = row.get(6)?;
        let properties: Option<serde_json::Value> = properties_str
            .as_ref()
            .and_then(|s| serde_json::from_str(s).ok());
        let weight: f32 = row.get(7)?;
        let temporal_start_str: Option<String> = row.get(8)?;
        let temporal_end_str: Option<String> = row.get(9)?;
        let created_at_str: String = row.get(10)?;
        let vector_clock: String = row.get(11)?;
        let last_modified_by: Option<String> = row.get(12)?;
        let is_deleted: bool = row.get(13)?;
        let sync_enabled: bool = row.get(14)?;

        Ok(SyncedEdgeRecord {
            id,
            session_id,
            source_id,
            target_id,
            edge_type,
            predicate,
            properties,
            weight,
            temporal_start: temporal_start_str.and_then(|s| s.parse().ok()),
            temporal_end: temporal_end_str.and_then(|s| s.parse().ok()),
            created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
            vector_clock,
            last_modified_by,
            is_deleted,
            sync_enabled,
        })
    }
}