1pub mod migrations;
2
3use anyhow::{Context, Result};
4use chrono::{DateTime, Utc};
5use directories::BaseDirs;
6use duckdb::{params, Connection};
7use serde_json::Value as JsonValue;
8use spec_ai_knowledge_graph::KnowledgeGraphStore;
9use std::path::{Path, PathBuf};
10use std::sync::{Arc, Mutex};
11
12use crate::types::{
13 EdgeType, GraphEdge, GraphNode, GraphPath, MemoryVector, Message, MessageRole, NodeType,
14 PolicyEntry, TraversalDirection,
15};
16
17#[derive(Clone)]
18pub struct Persistence {
19 conn: Arc<Mutex<Connection>>,
20 instance_id: String,
21 graph_store: KnowledgeGraphStore,
22}
23
24impl Persistence {
25 pub fn new<P: AsRef<Path>>(db_path: P) -> Result<Self> {
27 Self::with_instance_id(db_path, generate_instance_id())
28 }
29
30 pub fn with_instance_id<P: AsRef<Path>>(db_path: P, instance_id: String) -> Result<Self> {
32 let db_path = expand_tilde(db_path.as_ref())?;
33 if let Some(dir) = db_path.parent() {
34 std::fs::create_dir_all(dir).context("creating DB directory")?;
35 }
36 let conn = Connection::open(&db_path).context("opening DuckDB")?;
37 migrations::run(&conn).context("running migrations")?;
38 let conn_arc = Arc::new(Mutex::new(conn));
39 let graph_store = KnowledgeGraphStore::new(conn_arc.clone(), instance_id.clone());
40 Ok(Self {
41 conn: conn_arc,
42 instance_id,
43 graph_store,
44 })
45 }
46
47 pub fn instance_id(&self) -> &str {
49 &self.instance_id
50 }
51
52 pub fn graph_store(&self) -> &KnowledgeGraphStore {
54 &self.graph_store
55 }
56
57 pub fn checkpoint(&self) -> Result<()> {
60 let conn = self.conn();
61 conn.execute_batch("CHECKPOINT;")
62 .context("checkpointing database")
63 }
64
65 pub fn new_default() -> Result<Self> {
67 let base = BaseDirs::new().context("base directories not available")?;
68 let path = base.home_dir().join(".agent_cli").join("agent_data.duckdb");
69 Self::new(path)
70 }
71
72 pub fn conn(&self) -> std::sync::MutexGuard<'_, Connection> {
75 self.conn
76 .lock()
77 .expect("database connection mutex poisoned")
78 }
79
80 pub fn insert_message(
83 &self,
84 session_id: &str,
85 role: MessageRole,
86 content: &str,
87 ) -> Result<i64> {
88 let conn = self.conn();
89 let mut stmt = conn.prepare(
90 "INSERT INTO messages (session_id, role, content) VALUES (?, ?, ?) RETURNING id",
91 )?;
92 let id: i64 = stmt.query_row(params![session_id, role.as_str(), content], |row| {
93 row.get(0)
94 })?;
95 Ok(id)
96 }
97
98 pub fn list_messages(&self, session_id: &str, limit: i64) -> Result<Vec<Message>> {
99 let conn = self.conn();
100 let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?")?;
101 let mut rows = stmt.query(params![session_id, limit])?;
102 let mut out = Vec::new();
103 while let Some(row) = rows.next()? {
104 let id: i64 = row.get(0)?;
105 let sid: String = row.get(1)?;
106 let role: String = row.get(2)?;
107 let content: String = row.get(3)?;
108 let created_at: String = row.get(4)?; let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
110 out.push(Message {
111 id,
112 session_id: sid,
113 role: MessageRole::from_str(&role),
114 content,
115 created_at,
116 });
117 }
118 out.reverse();
119 Ok(out)
120 }
121
122 pub fn get_message(&self, message_id: i64) -> Result<Option<Message>> {
123 let conn = self.conn();
124 let mut stmt = conn.prepare("SELECT id, session_id, role, content, CAST(created_at AS TEXT) as created_at FROM messages WHERE id = ?")?;
125 let mut rows = stmt.query(params![message_id])?;
126 if let Some(row) = rows.next()? {
127 let id: i64 = row.get(0)?;
128 let sid: String = row.get(1)?;
129 let role: String = row.get(2)?;
130 let content: String = row.get(3)?;
131 let created_at: String = row.get(4)?;
132 let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
133 Ok(Some(Message {
134 id,
135 session_id: sid,
136 role: MessageRole::from_str(&role),
137 content,
138 created_at,
139 }))
140 } else {
141 Ok(None)
142 }
143 }
144
145 pub fn prune_messages(&self, session_id: &str, keep_latest: i64) -> Result<u64> {
147 let conn = self.conn();
148 let mut stmt = conn.prepare("DELETE FROM messages WHERE session_id = ? AND id NOT IN (SELECT id FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT ?)")?;
149 let changed = stmt.execute(params![session_id, session_id, keep_latest])? as u64;
150 Ok(changed)
151 }
152
153 pub fn insert_memory_vector(
156 &self,
157 session_id: &str,
158 message_id: Option<i64>,
159 embedding: &[f32],
160 ) -> Result<i64> {
161 let conn = self.conn();
162 let embedding_json = serde_json::to_string(embedding)?;
163 let mut stmt = conn.prepare("INSERT INTO memory_vectors (session_id, message_id, embedding) VALUES (?, ?, ?) RETURNING id")?;
164 let id: i64 = stmt.query_row(params![session_id, message_id, embedding_json], |row| {
165 row.get(0)
166 })?;
167 Ok(id)
168 }
169
170 pub fn recall_top_k(
171 &self,
172 session_id: &str,
173 query_embedding: &[f32],
174 k: usize,
175 ) -> Result<Vec<(MemoryVector, f32)>> {
176 let conn = self.conn();
177 let mut stmt = conn.prepare("SELECT id, session_id, message_id, embedding, CAST(created_at AS TEXT) as created_at FROM memory_vectors WHERE session_id = ?")?;
178 let mut rows = stmt.query(params![session_id])?;
179 let mut scored: Vec<(MemoryVector, f32)> = Vec::new();
180 while let Some(row) = rows.next()? {
181 let id: i64 = row.get(0)?;
182 let sid: String = row.get(1)?;
183 let message_id: Option<i64> = row.get(2)?;
184 let embedding_text: String = row.get(3)?;
185 let created_at: String = row.get(4)?;
186 let created_at: DateTime<Utc> = created_at.parse().unwrap_or_else(|_| Utc::now());
187 let embedding: Vec<f32> = serde_json::from_str(&embedding_text).unwrap_or_default();
188 let score = cosine_similarity(query_embedding, &embedding);
189 scored.push((
190 MemoryVector {
191 id,
192 session_id: sid,
193 message_id,
194 embedding,
195 created_at,
196 },
197 score,
198 ));
199 }
200 scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
201 scored.truncate(k);
202 Ok(scored)
203 }
204
205 pub fn list_sessions(&self) -> Result<Vec<String>> {
207 let conn = self.conn();
208 let mut stmt = conn.prepare(
209 "SELECT session_id, MAX(created_at) as last FROM messages GROUP BY session_id ORDER BY last DESC"
210 )?;
211 let mut rows = stmt.query([])?;
212 let mut out = Vec::new();
213 while let Some(row) = rows.next()? {
214 let sid: String = row.get(0)?;
215 out.push(sid);
216 }
217 Ok(out)
218 }
219
220 pub fn log_tool(
223 &self,
224 session_id: &str,
225 agent_name: &str,
226 run_id: &str,
227 tool_name: &str,
228 arguments: &JsonValue,
229 result: &JsonValue,
230 success: bool,
231 error: Option<&str>,
232 ) -> Result<i64> {
233 let conn = self.conn();
234 let mut stmt = conn.prepare("INSERT INTO tool_log (session_id, agent, run_id, tool_name, arguments, result, success, error) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
235 let id: i64 = stmt.query_row(
236 params![
237 session_id,
238 agent_name,
239 run_id,
240 tool_name,
241 arguments.to_string(),
242 result.to_string(),
243 success,
244 error.unwrap_or("")
245 ],
246 |row| row.get(0),
247 )?;
248 Ok(id)
249 }
250
251 pub fn policy_upsert(&self, key: &str, value: &JsonValue) -> Result<()> {
254 let conn = self.conn();
255 conn.execute_batch("BEGIN TRANSACTION;")?;
257 {
258 let mut del = conn.prepare("DELETE FROM policy_cache WHERE key = ?")?;
259 let _ = del.execute(params![key])?;
260 let mut ins = conn.prepare("INSERT INTO policy_cache (key, value, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP)")?;
261 let _ = ins.execute(params![key, value.to_string()])?;
262 }
263 conn.execute_batch("COMMIT;")?;
264 Ok(())
265 }
266
267 pub fn policy_get(&self, key: &str) -> Result<Option<PolicyEntry>> {
268 let conn = self.conn();
269 let mut stmt = conn.prepare("SELECT key, value, CAST(updated_at AS TEXT) as updated_at FROM policy_cache WHERE key = ?")?;
270 let mut rows = stmt.query(params![key])?;
271 if let Some(row) = rows.next()? {
272 let key: String = row.get(0)?;
273 let value_text: String = row.get(1)?;
274 let updated_at: String = row.get(2)?;
275 let updated_at: DateTime<Utc> = updated_at.parse().unwrap_or_else(|_| Utc::now());
276 let value: JsonValue = serde_json::from_str(&value_text).unwrap_or(JsonValue::Null);
277 Ok(Some(PolicyEntry {
278 key,
279 value,
280 updated_at,
281 }))
282 } else {
283 Ok(None)
284 }
285 }
286}
287
288fn generate_instance_id() -> String {
289 let hostname = hostname::get()
290 .ok()
291 .and_then(|h| h.into_string().ok())
292 .unwrap_or_else(|| "unknown".to_string());
293 let uuid = uuid::Uuid::new_v7(uuid::Timestamp::now(uuid::NoContext));
294 format!("{}-{}", hostname, uuid)
295}
296
297fn expand_tilde(path: &Path) -> Result<PathBuf> {
298 let path_str = path.to_string_lossy();
299 if path_str == "~" {
300 let base = BaseDirs::new().context("base directories not available")?;
301 Ok(base.home_dir().to_path_buf())
302 } else if let Some(stripped) = path_str.strip_prefix("~/") {
303 let base = BaseDirs::new().context("base directories not available")?;
304 Ok(base.home_dir().join(stripped))
305 } else {
306 Ok(path.to_path_buf())
307 }
308}
309
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    /// `~/name` must resolve to `<home>/name`.
    #[test]
    fn expands_home_directory_prefix() {
        let home = BaseDirs::new()
            .expect("home directory available")
            .home_dir()
            .to_path_buf();
        let expanded = expand_tilde(Path::new("~/demo.db")).expect("path expansion succeeds");
        assert_eq!(expanded, home.join("demo.db"));
    }

    /// Paths without a leading tilde come back untouched.
    #[test]
    fn leaves_regular_paths_unchanged() {
        let original = Path::new("relative/path.db");
        let expanded = expand_tilde(original).expect("path expansion succeeds");
        assert_eq!(expanded, original);
    }
}
330
/// Cosine similarity of two equal-length vectors.
///
/// Returns `0.0` when either slice is empty, when the lengths differ, or when either vector
/// has zero magnitude.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    if a.len() != b.len() || a.is_empty() {
        return 0.0;
    }

    let (mut dot, mut norm_a_sq, mut norm_b_sq) = (0.0f32, 0.0f32, 0.0f32);
    for (&x, &y) in a.iter().zip(b) {
        dot += x * y;
        norm_a_sq += x * x;
        norm_b_sq += y * y;
    }

    if norm_a_sq == 0.0 || norm_b_sq == 0.0 {
        return 0.0;
    }
    dot / (norm_a_sq.sqrt() * norm_b_sq.sqrt())
}
348
349fn from_kg_node(node: spec_ai_knowledge_graph::GraphNode) -> GraphNode {
354 GraphNode {
355 id: node.id,
356 session_id: node.session_id,
357 node_type: node.node_type,
358 label: node.label,
359 properties: node.properties,
360 embedding_id: node.embedding_id,
361 created_at: node.created_at,
362 updated_at: node.updated_at,
363 }
364}
365
366fn from_kg_edge(edge: spec_ai_knowledge_graph::GraphEdge) -> GraphEdge {
367 GraphEdge {
368 id: edge.id,
369 session_id: edge.session_id,
370 source_id: edge.source_id,
371 target_id: edge.target_id,
372 edge_type: edge.edge_type,
373 predicate: edge.predicate,
374 properties: edge.properties,
375 weight: edge.weight,
376 temporal_start: edge.temporal_start,
377 temporal_end: edge.temporal_end,
378 created_at: edge.created_at,
379 }
380}
381
382fn from_kg_path(path: spec_ai_knowledge_graph::GraphPath) -> GraphPath {
383 GraphPath {
384 nodes: path.nodes.into_iter().map(from_kg_node).collect(),
385 edges: path.edges.into_iter().map(from_kg_edge).collect(),
386 length: path.length,
387 weight: path.weight,
388 }
389}
390
391impl Persistence {
392 pub fn insert_graph_node(
395 &self,
396 session_id: &str,
397 node_type: spec_ai_knowledge_graph::NodeType,
398 label: &str,
399 properties: &JsonValue,
400 embedding_id: Option<i64>,
401 ) -> Result<i64> {
402 self.graph_store
403 .insert_graph_node(session_id, node_type, label, properties, embedding_id)
404 }
405
406 pub fn get_graph_node(&self, node_id: i64) -> Result<Option<GraphNode>> {
407 self.graph_store
408 .get_graph_node(node_id)
409 .map(|opt| opt.map(from_kg_node))
410 }
411
412 pub fn list_graph_nodes(
413 &self,
414 session_id: &str,
415 node_type: Option<spec_ai_knowledge_graph::NodeType>,
416 limit: Option<i64>,
417 ) -> Result<Vec<GraphNode>> {
418 self.graph_store
419 .list_graph_nodes(session_id, node_type, limit)
420 .map(|nodes| nodes.into_iter().map(from_kg_node).collect())
421 }
422
423 pub fn count_graph_nodes(&self, session_id: &str) -> Result<i64> {
424 self.graph_store.count_graph_nodes(session_id)
425 }
426
427 pub fn update_graph_node(&self, node_id: i64, properties: &JsonValue) -> Result<()> {
428 self.graph_store.update_graph_node(node_id, properties)
429 }
430
431 pub fn delete_graph_node(&self, node_id: i64) -> Result<()> {
432 self.graph_store.delete_graph_node(node_id)
433 }
434
435 pub fn insert_graph_edge(
438 &self,
439 session_id: &str,
440 source_id: i64,
441 target_id: i64,
442 edge_type: spec_ai_knowledge_graph::EdgeType,
443 predicate: Option<&str>,
444 properties: Option<&JsonValue>,
445 weight: f32,
446 ) -> Result<i64> {
447 self.graph_store.insert_graph_edge(
448 session_id, source_id, target_id, edge_type, predicate, properties, weight,
449 )
450 }
451
452 pub fn get_graph_edge(&self, edge_id: i64) -> Result<Option<GraphEdge>> {
453 self.graph_store
454 .get_graph_edge(edge_id)
455 .map(|opt| opt.map(from_kg_edge))
456 }
457
458 pub fn list_graph_edges(
459 &self,
460 session_id: &str,
461 source_id: Option<i64>,
462 target_id: Option<i64>,
463 ) -> Result<Vec<GraphEdge>> {
464 self.graph_store
465 .list_graph_edges(session_id, source_id, target_id)
466 .map(|edges| edges.into_iter().map(from_kg_edge).collect())
467 }
468
469 pub fn count_graph_edges(&self, session_id: &str) -> Result<i64> {
470 self.graph_store.count_graph_edges(session_id)
471 }
472
473 pub fn delete_graph_edge(&self, edge_id: i64) -> Result<()> {
474 self.graph_store.delete_graph_edge(edge_id)
475 }
476
477 pub fn find_shortest_path(
480 &self,
481 session_id: &str,
482 source_id: i64,
483 target_id: i64,
484 max_hops: Option<usize>,
485 ) -> Result<Option<GraphPath>> {
486 self.graph_store
487 .find_shortest_path(session_id, source_id, target_id, max_hops)
488 .map(|opt| opt.map(from_kg_path))
489 }
490
491 pub fn traverse_neighbors(
492 &self,
493 session_id: &str,
494 node_id: i64,
495 direction: spec_ai_knowledge_graph::TraversalDirection,
496 depth: usize,
497 ) -> Result<Vec<GraphNode>> {
498 self.graph_store
499 .traverse_neighbors(session_id, node_id, direction, depth)
500 .map(|nodes| nodes.into_iter().map(from_kg_node).collect())
501 }
502
503 pub fn insert_transcription(
506 &self,
507 session_id: &str,
508 chunk_id: i64,
509 text: &str,
510 timestamp: chrono::DateTime<Utc>,
511 ) -> Result<i64> {
512 let conn = self.conn();
513 let mut stmt = conn.prepare(
514 "INSERT INTO transcriptions (session_id, chunk_id, text, timestamp, embedding_id) VALUES (?, ?, ?, ?, NULL) RETURNING id",
515 )?;
516 let id: i64 = stmt.query_row(
517 params![session_id, chunk_id, text, timestamp.to_rfc3339()],
518 |row| row.get(0),
519 )?;
520 Ok(id)
521 }
522
523 pub fn update_transcription_embedding(
524 &self,
525 transcription_id: i64,
526 embedding_id: i64,
527 ) -> Result<()> {
528 let conn = self.conn();
529 conn.execute(
530 "UPDATE transcriptions SET embedding_id = ? WHERE id = ?",
531 params![embedding_id, transcription_id],
532 )?;
533 Ok(())
534 }
535
536 pub fn list_transcriptions(
537 &self,
538 session_id: &str,
539 limit: Option<i64>,
540 ) -> Result<Vec<(i64, i64, String, DateTime<Utc>)>> {
541 let conn = self.conn();
542 let query = if let Some(lim) = limit {
543 format!(
544 "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC LIMIT {}",
545 lim
546 )
547 } else {
548 "SELECT id, chunk_id, text, CAST(timestamp AS TEXT) FROM transcriptions WHERE session_id = ? ORDER BY chunk_id ASC".to_string()
549 };
550
551 let mut stmt = conn.prepare(&query)?;
552 let mut rows = stmt.query(params![session_id])?;
553 let mut out = Vec::new();
554
555 while let Some(row) = rows.next()? {
556 let id: i64 = row.get(0)?;
557 let chunk_id: i64 = row.get(1)?;
558 let text: String = row.get(2)?;
559 let timestamp_str: String = row.get(3)?;
560 let timestamp: DateTime<Utc> = timestamp_str.parse().unwrap_or_else(|_| Utc::now());
561 out.push((id, chunk_id, text, timestamp));
562 }
563
564 Ok(out)
565 }
566
567 pub fn get_full_transcription(&self, session_id: &str) -> Result<String> {
568 let transcriptions = self.list_transcriptions(session_id, None)?;
569 Ok(transcriptions
570 .into_iter()
571 .map(|(_, _, text, _)| text)
572 .collect::<Vec<_>>()
573 .join(" "))
574 }
575
576 pub fn delete_transcriptions(&self, session_id: &str) -> Result<()> {
577 let conn = self.conn();
578 conn.execute(
579 "DELETE FROM transcriptions WHERE session_id = ?",
580 params![session_id],
581 )?;
582 Ok(())
583 }
584
585 pub fn get_transcription_by_embedding(&self, embedding_id: i64) -> Result<Option<String>> {
586 let conn = self.conn();
587 let mut stmt =
588 conn.prepare("SELECT text FROM transcriptions WHERE embedding_id = ? LIMIT 1")?;
589 let result: Result<String, _> = stmt.query_row(params![embedding_id], |row| row.get(0));
590 match result {
591 Ok(text) => Ok(Some(text)),
592 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
593 Err(e) => Err(e.into()),
594 }
595 }
596
597 pub fn upsert_tokenized_file(
601 &self,
602 session_id: &str,
603 path: &str,
604 file_hash: &str,
605 raw_tokens: usize,
606 cleaned_tokens: usize,
607 bytes_captured: usize,
608 truncated: bool,
609 embedding_id: Option<i64>,
610 ) -> Result<i64> {
611 let conn = self.conn();
612 conn.execute(
613 "DELETE FROM tokenized_files WHERE session_id = ? AND path = ?",
614 params![session_id, path],
615 )?;
616 let mut stmt = conn.prepare("INSERT INTO tokenized_files (session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING id")?;
617 let id: i64 = stmt.query_row(
618 params![
619 session_id,
620 path,
621 file_hash,
622 raw_tokens as i64,
623 cleaned_tokens as i64,
624 bytes_captured as i64,
625 truncated,
626 embedding_id
627 ],
628 |row| row.get(0),
629 )?;
630 Ok(id)
631 }
632
633 pub fn get_tokenized_file(
634 &self,
635 session_id: &str,
636 path: &str,
637 ) -> Result<Option<TokenizedFileRecord>> {
638 let conn = self.conn();
639 let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? AND path = ? LIMIT 1")?;
640 let mut rows = stmt.query(params![session_id, path])?;
641 if let Some(row) = rows.next()? {
642 let record = TokenizedFileRecord::from_row(row)?;
643 Ok(Some(record))
644 } else {
645 Ok(None)
646 }
647 }
648
649 pub fn list_tokenized_files(&self, session_id: &str) -> Result<Vec<TokenizedFileRecord>> {
650 let conn = self.conn();
651 let mut stmt = conn.prepare("SELECT id, session_id, path, file_hash, raw_tokens, cleaned_tokens, bytes_captured, truncated, embedding_id, CAST(updated_at AS TEXT) FROM tokenized_files WHERE session_id = ? ORDER BY path")?;
652 let mut rows = stmt.query(params![session_id])?;
653 let mut out = Vec::new();
654 while let Some(row) = rows.next()? {
655 out.push(TokenizedFileRecord::from_row(row)?);
656 }
657 Ok(out)
658 }
659
660 pub fn mesh_message_store(
664 &self,
665 message_id: &str,
666 source_instance: &str,
667 target_instance: Option<&str>,
668 message_type: &str,
669 payload: &JsonValue,
670 status: &str,
671 ) -> Result<i64> {
672 let conn = self.conn();
673 let payload_json = serde_json::to_string(payload)?;
674 conn.execute(
675 "INSERT INTO mesh_messages (message_id, source_instance, target_instance, message_type, payload, status) VALUES (?, ?, ?, ?, ?, ?)",
676 params![message_id, source_instance, target_instance, message_type, payload_json, status],
677 )?;
678 let id: i64 = conn.query_row("SELECT last_insert_rowid()", params![], |row| row.get(0))?;
680 Ok(id)
681 }
682
683 pub fn mesh_message_exists(&self, message_id: &str) -> Result<bool> {
685 let conn = self.conn();
686 let count: i64 = conn.query_row(
687 "SELECT COUNT(*) FROM mesh_messages WHERE message_id = ?",
688 params![message_id],
689 |row| row.get(0),
690 )?;
691 Ok(count > 0)
692 }
693
694 pub fn mesh_message_update_status(&self, message_id: i64, status: &str) -> Result<()> {
696 let conn = self.conn();
697 conn.execute(
698 "UPDATE mesh_messages SET status = ?, delivered_at = CURRENT_TIMESTAMP WHERE id = ?",
699 params![status, message_id],
700 )?;
701 Ok(())
702 }
703
704 pub fn mesh_message_get_pending(
706 &self,
707 target_instance: &str,
708 ) -> Result<Vec<MeshMessageRecord>> {
709 let conn = self.conn();
710 let mut stmt = conn.prepare(
711 "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
712 FROM mesh_messages
713 WHERE (target_instance = ? OR target_instance IS NULL) AND status = 'pending'
714 ORDER BY created_at",
715 )?;
716 let mut rows = stmt.query(params![target_instance])?;
717 let mut out = Vec::new();
718 while let Some(row) = rows.next()? {
719 out.push(MeshMessageRecord::from_row(row)?);
720 }
721 Ok(out)
722 }
723
724 pub fn mesh_message_get_history(
726 &self,
727 instance_id: Option<&str>,
728 limit: usize,
729 ) -> Result<Vec<MeshMessageRecord>> {
730 let conn = self.conn();
731 let query = if instance_id.is_some() {
732 format!(
733 "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
734 FROM mesh_messages
735 WHERE source_instance = ? OR target_instance = ?
736 ORDER BY created_at DESC LIMIT {}",
737 limit
738 )
739 } else {
740 format!(
741 "SELECT id, source_instance, target_instance, message_type, payload, status, CAST(created_at AS TEXT), CAST(delivered_at AS TEXT)
742 FROM mesh_messages
743 ORDER BY created_at DESC LIMIT {}",
744 limit
745 )
746 };
747
748 let mut stmt = conn.prepare(&query)?;
749 let mut rows = if let Some(inst) = instance_id {
750 stmt.query(params![inst, inst])?
751 } else {
752 stmt.query(params![])?
753 };
754
755 let mut out = Vec::new();
756 while let Some(row) = rows.next()? {
757 out.push(MeshMessageRecord::from_row(row)?);
758 }
759 Ok(out)
760 }
761
762 pub fn graph_changelog_append(
766 &self,
767 session_id: &str,
768 instance_id: &str,
769 entity_type: &str,
770 entity_id: i64,
771 operation: &str,
772 vector_clock: &str,
773 data: Option<&str>,
774 ) -> Result<i64> {
775 self.graph_store.graph_changelog_append(
776 session_id,
777 instance_id,
778 entity_type,
779 entity_id,
780 operation,
781 vector_clock,
782 data,
783 )
784 }
785
786 pub fn graph_changelog_get_since(
788 &self,
789 session_id: &str,
790 since_timestamp: &str,
791 ) -> Result<Vec<ChangelogEntry>> {
792 self.graph_store
793 .graph_changelog_get_since(session_id, since_timestamp)
794 .map(|entries| {
795 entries
796 .into_iter()
797 .map(|e| ChangelogEntry {
798 id: e.id,
799 session_id: e.session_id,
800 instance_id: e.instance_id,
801 entity_type: e.entity_type,
802 entity_id: e.entity_id,
803 operation: e.operation,
804 vector_clock: e.vector_clock,
805 data: e.data,
806 created_at: e.created_at,
807 })
808 .collect()
809 })
810 }
811
812 pub fn graph_changelog_prune(&self, days_to_keep: i64) -> Result<usize> {
814 self.graph_store.graph_changelog_prune(days_to_keep)
815 }
816
817 pub fn graph_sync_state_get(
819 &self,
820 instance_id: &str,
821 session_id: &str,
822 graph_name: &str,
823 ) -> Result<Option<String>> {
824 self.graph_store
825 .graph_sync_state_get(instance_id, session_id, graph_name)
826 }
827
828 pub fn graph_sync_state_update(
830 &self,
831 instance_id: &str,
832 session_id: &str,
833 graph_name: &str,
834 vector_clock: &str,
835 ) -> Result<()> {
836 self.graph_store
837 .graph_sync_state_update(instance_id, session_id, graph_name, vector_clock)
838 }
839
840 pub fn graph_set_sync_enabled(
842 &self,
843 session_id: &str,
844 graph_name: &str,
845 enabled: bool,
846 ) -> Result<()> {
847 self.graph_store
848 .graph_set_sync_enabled(session_id, graph_name, enabled)
849 }
850
851 pub fn graph_get_sync_enabled(&self, session_id: &str, graph_name: &str) -> Result<bool> {
853 self.graph_store
854 .graph_get_sync_enabled(session_id, graph_name)
855 }
856
857 pub fn graph_list(&self, session_id: &str) -> Result<Vec<String>> {
859 self.graph_store.graph_list(session_id)
860 }
861
862 pub fn graph_list_sync_enabled(&self) -> Result<Vec<(String, String)>> {
864 self.graph_store.graph_list_sync_enabled()
865 }
866
867 pub fn graph_get_node_with_sync(&self, node_id: i64) -> Result<Option<SyncedNodeRecord>> {
869 self.graph_store
870 .graph_get_node_with_sync(node_id)
871 .map(|opt| {
872 opt.map(|r| SyncedNodeRecord {
873 id: r.id,
874 session_id: r.session_id,
875 node_type: r.node_type,
876 label: r.label,
877 properties: r.properties,
878 embedding_id: r.embedding_id,
879 created_at: r.created_at,
880 updated_at: r.updated_at,
881 vector_clock: r.vector_clock,
882 last_modified_by: r.last_modified_by,
883 is_deleted: r.is_deleted,
884 sync_enabled: r.sync_enabled,
885 })
886 })
887 }
888
889 pub fn graph_list_nodes_with_sync(
891 &self,
892 session_id: &str,
893 sync_enabled_only: bool,
894 include_deleted: bool,
895 ) -> Result<Vec<SyncedNodeRecord>> {
896 self.graph_store
897 .graph_list_nodes_with_sync(session_id, sync_enabled_only, include_deleted)
898 .map(|nodes| {
899 nodes
900 .into_iter()
901 .map(|r| SyncedNodeRecord {
902 id: r.id,
903 session_id: r.session_id,
904 node_type: r.node_type,
905 label: r.label,
906 properties: r.properties,
907 embedding_id: r.embedding_id,
908 created_at: r.created_at,
909 updated_at: r.updated_at,
910 vector_clock: r.vector_clock,
911 last_modified_by: r.last_modified_by,
912 is_deleted: r.is_deleted,
913 sync_enabled: r.sync_enabled,
914 })
915 .collect()
916 })
917 }
918
919 pub fn graph_get_edge_with_sync(&self, edge_id: i64) -> Result<Option<SyncedEdgeRecord>> {
921 self.graph_store
922 .graph_get_edge_with_sync(edge_id)
923 .map(|opt| {
924 opt.map(|r| SyncedEdgeRecord {
925 id: r.id,
926 session_id: r.session_id,
927 source_id: r.source_id,
928 target_id: r.target_id,
929 edge_type: r.edge_type,
930 predicate: r.predicate,
931 properties: r.properties,
932 weight: r.weight,
933 temporal_start: r.temporal_start,
934 temporal_end: r.temporal_end,
935 created_at: r.created_at,
936 vector_clock: r.vector_clock,
937 last_modified_by: r.last_modified_by,
938 is_deleted: r.is_deleted,
939 sync_enabled: r.sync_enabled,
940 })
941 })
942 }
943
944 pub fn graph_list_edges_with_sync(
946 &self,
947 session_id: &str,
948 sync_enabled_only: bool,
949 include_deleted: bool,
950 ) -> Result<Vec<SyncedEdgeRecord>> {
951 self.graph_store
952 .graph_list_edges_with_sync(session_id, sync_enabled_only, include_deleted)
953 .map(|edges| {
954 edges
955 .into_iter()
956 .map(|r| SyncedEdgeRecord {
957 id: r.id,
958 session_id: r.session_id,
959 source_id: r.source_id,
960 target_id: r.target_id,
961 edge_type: r.edge_type,
962 predicate: r.predicate,
963 properties: r.properties,
964 weight: r.weight,
965 temporal_start: r.temporal_start,
966 temporal_end: r.temporal_end,
967 created_at: r.created_at,
968 vector_clock: r.vector_clock,
969 last_modified_by: r.last_modified_by,
970 is_deleted: r.is_deleted,
971 sync_enabled: r.sync_enabled,
972 })
973 .collect()
974 })
975 }
976
977 pub fn graph_update_node_sync_metadata(
979 &self,
980 node_id: i64,
981 vector_clock: &str,
982 last_modified_by: &str,
983 sync_enabled: bool,
984 ) -> Result<()> {
985 self.graph_store.graph_update_node_sync_metadata(
986 node_id,
987 vector_clock,
988 last_modified_by,
989 sync_enabled,
990 )
991 }
992
993 pub fn graph_update_edge_sync_metadata(
995 &self,
996 edge_id: i64,
997 vector_clock: &str,
998 last_modified_by: &str,
999 sync_enabled: bool,
1000 ) -> Result<()> {
1001 self.graph_store.graph_update_edge_sync_metadata(
1002 edge_id,
1003 vector_clock,
1004 last_modified_by,
1005 sync_enabled,
1006 )
1007 }
1008
1009 pub fn graph_mark_node_deleted(
1011 &self,
1012 node_id: i64,
1013 vector_clock: &str,
1014 deleted_by: &str,
1015 ) -> Result<()> {
1016 self.graph_store
1017 .graph_mark_node_deleted(node_id, vector_clock, deleted_by)
1018 }
1019
1020 pub fn graph_mark_edge_deleted(
1022 &self,
1023 edge_id: i64,
1024 vector_clock: &str,
1025 deleted_by: &str,
1026 ) -> Result<()> {
1027 self.graph_store
1028 .graph_mark_edge_deleted(edge_id, vector_clock, deleted_by)
1029 }
1030}
1031
1032#[derive(Debug, Clone)]
1033pub struct TokenizedFileRecord {
1034 pub id: i64,
1035 pub session_id: String,
1036 pub path: String,
1037 pub file_hash: String,
1038 pub raw_tokens: usize,
1039 pub cleaned_tokens: usize,
1040 pub bytes_captured: usize,
1041 pub truncated: bool,
1042 pub embedding_id: Option<i64>,
1043 pub updated_at: DateTime<Utc>,
1044}
1045
1046impl TokenizedFileRecord {
1047 fn from_row(row: &duckdb::Row) -> Result<Self> {
1048 let id: i64 = row.get(0)?;
1049 let session_id: String = row.get(1)?;
1050 let path: String = row.get(2)?;
1051 let file_hash: String = row.get(3)?;
1052 let raw_tokens: i64 = row.get(4)?;
1053 let cleaned_tokens: i64 = row.get(5)?;
1054 let bytes_captured: i64 = row.get(6)?;
1055 let truncated: bool = row.get(7)?;
1056 let embedding_id: Option<i64> = row.get(8)?;
1057 let updated_at: String = row.get(9)?;
1058
1059 Ok(Self {
1060 id,
1061 session_id,
1062 path,
1063 file_hash,
1064 raw_tokens: raw_tokens.max(0) as usize,
1065 cleaned_tokens: cleaned_tokens.max(0) as usize,
1066 bytes_captured: bytes_captured.max(0) as usize,
1067 truncated,
1068 embedding_id,
1069 updated_at: updated_at.parse().unwrap_or_else(|_| Utc::now()),
1070 })
1071 }
1072}
1073
1074#[derive(Debug, Clone)]
1075pub struct MeshMessageRecord {
1076 pub id: i64,
1077 pub source_instance: String,
1078 pub target_instance: Option<String>,
1079 pub message_type: String,
1080 pub payload: JsonValue,
1081 pub status: String,
1082 pub created_at: DateTime<Utc>,
1083 pub delivered_at: Option<DateTime<Utc>>,
1084}
1085
1086impl MeshMessageRecord {
1087 fn from_row(row: &duckdb::Row) -> Result<Self> {
1088 let id: i64 = row.get(0)?;
1089 let source_instance: String = row.get(1)?;
1090 let target_instance: Option<String> = row.get(2)?;
1091 let message_type: String = row.get(3)?;
1092 let payload_str: String = row.get(4)?;
1093 let payload: JsonValue = serde_json::from_str(&payload_str)?;
1094 let status: String = row.get(5)?;
1095 let created_at_str: String = row.get(6)?;
1096 let delivered_at_str: Option<String> = row.get(7)?;
1097
1098 Ok(MeshMessageRecord {
1099 id,
1100 source_instance,
1101 target_instance,
1102 message_type,
1103 payload,
1104 status,
1105 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1106 delivered_at: delivered_at_str.and_then(|s| s.parse().ok()),
1107 })
1108 }
1109}
1110
1111#[derive(Debug, Clone)]
1114pub struct ChangelogEntry {
1115 pub id: i64,
1116 pub session_id: String,
1117 pub instance_id: String,
1118 pub entity_type: String,
1119 pub entity_id: i64,
1120 pub operation: String,
1121 pub vector_clock: String,
1122 pub data: Option<String>,
1123 pub created_at: DateTime<Utc>,
1124}
1125
1126impl ChangelogEntry {
1127 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1128 let id: i64 = row.get(0)?;
1129 let session_id: String = row.get(1)?;
1130 let instance_id: String = row.get(2)?;
1131 let entity_type: String = row.get(3)?;
1132 let entity_id: i64 = row.get(4)?;
1133 let operation: String = row.get(5)?;
1134 let vector_clock: String = row.get(6)?;
1135 let data: Option<String> = row.get(7)?;
1136 let created_at_str: String = row.get(8)?;
1137
1138 Ok(ChangelogEntry {
1139 id,
1140 session_id,
1141 instance_id,
1142 entity_type,
1143 entity_id,
1144 operation,
1145 vector_clock,
1146 data,
1147 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1148 })
1149 }
1150}
1151
1152#[derive(Debug, Clone)]
1153pub struct SyncedNodeRecord {
1154 pub id: i64,
1155 pub session_id: String,
1156 pub node_type: String,
1157 pub label: String,
1158 pub properties: serde_json::Value,
1159 pub embedding_id: Option<i64>,
1160 pub created_at: DateTime<Utc>,
1161 pub updated_at: DateTime<Utc>,
1162 pub vector_clock: String,
1163 pub last_modified_by: Option<String>,
1164 pub is_deleted: bool,
1165 pub sync_enabled: bool,
1166}
1167
1168impl SyncedNodeRecord {
1169 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1170 let id: i64 = row.get(0)?;
1171 let session_id: String = row.get(1)?;
1172 let node_type: String = row.get(2)?;
1173 let label: String = row.get(3)?;
1174 let properties_str: String = row.get(4)?;
1175 let properties: serde_json::Value = serde_json::from_str(&properties_str).map_err(|e| {
1176 duckdb::Error::FromSqlConversionFailure(4, duckdb::types::Type::Text, Box::new(e))
1177 })?;
1178 let embedding_id: Option<i64> = row.get(5)?;
1179 let created_at_str: String = row.get(6)?;
1180 let updated_at_str: String = row.get(7)?;
1181 let vector_clock: String = row.get(8)?;
1182 let last_modified_by: Option<String> = row.get(9)?;
1183 let is_deleted: bool = row.get(10)?;
1184 let sync_enabled: bool = row.get(11)?;
1185
1186 Ok(SyncedNodeRecord {
1187 id,
1188 session_id,
1189 node_type,
1190 label,
1191 properties,
1192 embedding_id,
1193 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1194 updated_at: updated_at_str.parse().unwrap_or_else(|_| Utc::now()),
1195 vector_clock,
1196 last_modified_by,
1197 is_deleted,
1198 sync_enabled,
1199 })
1200 }
1201}
1202
1203#[derive(Debug, Clone)]
1204pub struct SyncedEdgeRecord {
1205 pub id: i64,
1206 pub session_id: String,
1207 pub source_id: i64,
1208 pub target_id: i64,
1209 pub edge_type: String,
1210 pub predicate: Option<String>,
1211 pub properties: Option<serde_json::Value>,
1212 pub weight: f32,
1213 pub temporal_start: Option<DateTime<Utc>>,
1214 pub temporal_end: Option<DateTime<Utc>>,
1215 pub created_at: DateTime<Utc>,
1216 pub vector_clock: String,
1217 pub last_modified_by: Option<String>,
1218 pub is_deleted: bool,
1219 pub sync_enabled: bool,
1220}
1221
1222impl SyncedEdgeRecord {
1223 fn from_row(row: &duckdb::Row) -> Result<Self, duckdb::Error> {
1224 let id: i64 = row.get(0)?;
1225 let session_id: String = row.get(1)?;
1226 let source_id: i64 = row.get(2)?;
1227 let target_id: i64 = row.get(3)?;
1228 let edge_type: String = row.get(4)?;
1229 let predicate: Option<String> = row.get(5)?;
1230 let properties_str: Option<String> = row.get(6)?;
1231 let properties: Option<serde_json::Value> = properties_str
1232 .as_ref()
1233 .and_then(|s| serde_json::from_str(s).ok());
1234 let weight: f32 = row.get(7)?;
1235 let temporal_start_str: Option<String> = row.get(8)?;
1236 let temporal_end_str: Option<String> = row.get(9)?;
1237 let created_at_str: String = row.get(10)?;
1238 let vector_clock: String = row.get(11)?;
1239 let last_modified_by: Option<String> = row.get(12)?;
1240 let is_deleted: bool = row.get(13)?;
1241 let sync_enabled: bool = row.get(14)?;
1242
1243 Ok(SyncedEdgeRecord {
1244 id,
1245 session_id,
1246 source_id,
1247 target_id,
1248 edge_type,
1249 predicate,
1250 properties,
1251 weight,
1252 temporal_start: temporal_start_str.and_then(|s| s.parse().ok()),
1253 temporal_end: temporal_end_str.and_then(|s| s.parse().ok()),
1254 created_at: created_at_str.parse().unwrap_or_else(|_| Utc::now()),
1255 vector_clock,
1256 last_modified_by,
1257 is_deleted,
1258 sync_enabled,
1259 })
1260 }
1261}