1mod migrations;
6mod index;
7
8pub use migrations::{get_schema_version, needs_migration, run_migrations};
9pub use index::{IndexConfig, PersistentHnswIndex, SharedPersistentIndex};
10
11use cp_core::{Chunk, CPError, Document, Edge, EdgeKind, Embedding, Result};
12use rusqlite::{params, Connection, OptionalExtension};
13use std::path::PathBuf;
14use tracing::info;
15use uuid::Uuid;
16
17
18pub struct GraphStore {
20 db: Connection,
22 hnsw: SharedPersistentIndex,
24 db_path: Option<PathBuf>,
26 index_config: IndexConfig,
28}
29
30impl GraphStore {
31 pub fn open(db_path: &str) -> Result<Self> {
33 Self::open_with_config(db_path, IndexConfig::default())
34 }
35
36 pub fn open_with_config(db_path: &str, index_config: IndexConfig) -> Result<Self> {
38 let (db, path) = if db_path == ":memory:" {
39 (Connection::open_in_memory().map_err(|e| CPError::Database(e.to_string()))?, None)
40 } else {
41 let path = PathBuf::from(db_path);
42 (Connection::open(&path).map_err(|e| CPError::Database(e.to_string()))?, Some(path))
43 };
44
45 run_migrations(&db)?;
47
48 let hnsw = if let Some(ref p) = path {
50 let index_path = p.with_extension("usearch");
51 SharedPersistentIndex::open(index_path, index_config.clone())?
52 } else {
53 SharedPersistentIndex::new(index_config.clone())?
54 };
55
56 let store = Self {
57 db,
58 hnsw,
59 db_path: path,
60 index_config,
61 };
62
63 if store.hnsw.needs_rebuild() {
65 store.rebuild_hnsw_index()?;
66 } else {
67 let current_root = store.compute_merkle_root()?;
69 if !store.hnsw.is_valid(¤t_root) {
70 info!("Index checkpoint mismatch, rebuilding...");
71 store.rebuild_hnsw_index()?;
72 }
73 }
74
75 info!("Opened graph store at {} and loaded index", db_path);
76
77 Ok(store)
78 }
79
80 pub fn in_memory() -> Result<Self> {
82 Self::open(":memory:")
83 }
84
85 fn rebuild_hnsw_index(&self) -> Result<()> {
87 info!("Rebuilding HNSW index from database...");
88 self.hnsw.clear()?;
89
90 let embs = self.get_all_embeddings()?;
91 for emb in embs {
92 self.hnsw.insert(emb.id, emb.to_f32())?;
93 }
94
95 let root = self.compute_merkle_root()?;
97 self.hnsw.checkpoint(root)?;
98
99 info!("HNSW index rebuilt with {} vectors", self.hnsw.len());
100 Ok(())
101 }
102
103 pub fn insert_document(&mut self, doc: &Document) -> Result<()> {
107 self.db
108 .execute(
109 "INSERT OR REPLACE INTO documents (id, path, hash, hierarchical_hash, mtime, size, mime_type)
110 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
111 params![
112 doc.id.as_bytes().as_slice(),
113 doc.path.to_string_lossy().as_ref(),
114 doc.hash.as_slice(),
115 doc.hierarchical_hash.as_slice(),
116 doc.mtime,
117 doc.size as i64,
118 &doc.mime_type,
119 ],
120 )
121 .map_err(|e| CPError::Database(e.to_string()))?;
122
123 Ok(())
124 }
125
126 pub fn get_document(&self, id: Uuid) -> Result<Option<Document>> {
128 self.db
129 .query_row(
130 "SELECT id, path, hash, hierarchical_hash, mtime, size, mime_type FROM documents WHERE id = ?1",
131 params![id.as_bytes().as_slice()],
132 |row| {
133 let id_bytes: Vec<u8> = row.get(0)?;
134 let path_str: String = row.get(1)?;
135 let hash_bytes: Vec<u8> = row.get(2)?;
136 let hh_bytes: Vec<u8> = row.get(3)?;
137 let mtime: i64 = row.get(4)?;
138 let size: i64 = row.get(5)?;
139 let mime_type: String = row.get(6)?;
140
141 let id = Uuid::from_slice(&id_bytes).expect("invalid uuid");
142 let mut hash = [0u8; 32];
143 hash.copy_from_slice(&hash_bytes);
144 let mut hierarchical_hash = [0u8; 32];
145 hierarchical_hash.copy_from_slice(&hh_bytes);
146
147 Ok(Document {
148 id,
149 path_id: Uuid::nil(), path: std::path::PathBuf::from(path_str),
152 hash,
153 hierarchical_hash,
154 mtime,
155 size: size as u64,
156 mime_type,
157 })
158 },
159 )
160 .optional()
161 .map_err(|e| CPError::Database(e.to_string()))
162 }
163
164 pub fn get_document_by_path(&self, path: &std::path::Path) -> Result<Option<Document>> {
166 self.db
167 .query_row(
168 "SELECT id, path, hash, hierarchical_hash, mtime, size, mime_type FROM documents WHERE path = ?1",
169 params![path.to_string_lossy().as_ref()],
170 |row| {
171 let id_bytes: Vec<u8> = row.get(0)?;
172 let path_str: String = row.get(1)?;
173 let hash_bytes: Vec<u8> = row.get(2)?;
174 let hh_bytes: Vec<u8> = row.get(3)?;
175 let mtime: i64 = row.get(4)?;
176 let size: i64 = row.get(5)?;
177 let mime_type: String = row.get(6)?;
178
179 let id = Uuid::from_slice(&id_bytes).expect("invalid uuid");
180 let mut hash = [0u8; 32];
181 hash.copy_from_slice(&hash_bytes);
182 let mut hierarchical_hash = [0u8; 32];
183 hierarchical_hash.copy_from_slice(&hh_bytes);
184
185 Ok(Document {
186 id,
187 path_id: Uuid::nil(), path: std::path::PathBuf::from(path_str),
190 hash,
191 hierarchical_hash,
192 mtime,
193 size: size as u64,
194 mime_type,
195 })
196 },
197 )
198 .optional()
199 .map_err(|e| CPError::Database(e.to_string()))
200 }
201
202 pub fn delete_document(&mut self, id: Uuid) -> Result<()> {
204 self.db
206 .execute(
207 "DELETE FROM embeddings WHERE chunk_id IN (SELECT id FROM chunks WHERE doc_id = ?1)",
208 params![id.as_bytes().as_slice()],
209 )
210 .map_err(|e| CPError::Database(e.to_string()))?;
211
212 self.db
213 .execute(
214 "DELETE FROM chunks WHERE doc_id = ?1",
215 params![id.as_bytes().as_slice()],
216 )
217 .map_err(|e| CPError::Database(e.to_string()))?;
218
219 self.db
220 .execute(
221 "DELETE FROM documents WHERE id = ?1",
222 params![id.as_bytes().as_slice()],
223 )
224 .map_err(|e| CPError::Database(e.to_string()))?;
225
226 Ok(())
227 }
228
229 pub fn get_all_documents(&self) -> Result<Vec<Document>> {
231 let mut stmt = self
232 .db
233 .prepare("SELECT id, path, hash, hierarchical_hash, mtime, size, mime_type FROM documents")
234 .map_err(|e| CPError::Database(e.to_string()))?;
235
236 let docs = stmt
237 .query_map([], |row| {
238 let id_bytes: Vec<u8> = row.get(0)?;
239 let path_str: String = row.get(1)?;
240 let hash_bytes: Vec<u8> = row.get(2)?;
241 let hh_bytes: Vec<u8> = row.get(3)?;
242 let mtime: i64 = row.get(4)?;
243 let size: i64 = row.get(5)?;
244 let mime_type: String = row.get(6)?;
245
246 let id = Uuid::from_slice(&id_bytes).expect("invalid uuid");
247 let mut hash = [0u8; 32];
248 hash.copy_from_slice(&hash_bytes);
249 let mut hierarchical_hash = [0u8; 32];
250 hierarchical_hash.copy_from_slice(&hh_bytes);
251
252 Ok(Document {
253 id,
254 path_id: Uuid::nil(), path: std::path::PathBuf::from(path_str),
257 hash,
258 hierarchical_hash,
259 mtime,
260 size: size as u64,
261 mime_type,
262 })
263 })
264 .map_err(|e| CPError::Database(e.to_string()))?
265 .collect::<std::result::Result<Vec<_>, _>>()
266 .map_err(|e| CPError::Database(e.to_string()))?;
267
268 Ok(docs)
269 }
270
271 pub fn insert_chunk(&mut self, chunk: &Chunk) -> Result<()> {
275 self.db
276 .execute(
277 "INSERT OR REPLACE INTO chunks (id, doc_id, text, byte_offset, byte_length, sequence, text_hash)
278 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
279 params![
280 chunk.id.as_bytes().as_slice(),
281 chunk.doc_id.as_bytes().as_slice(),
282 &chunk.text,
283 chunk.byte_offset as i64,
284 chunk.byte_length as i64,
285 chunk.sequence,
286 chunk.text_hash.as_slice(),
287 ],
288 )
289 .map_err(|e| CPError::Database(e.to_string()))?;
290
291 Ok(())
292 }
293
294 pub fn get_chunks_for_doc(&self, doc_id: Uuid) -> Result<Vec<Chunk>> {
296 let mut stmt = self
297 .db
298 .prepare(
299 "SELECT id, doc_id, text, byte_offset, byte_length, sequence, text_hash
300 FROM chunks WHERE doc_id = ?1 ORDER BY sequence",
301 )
302 .map_err(|e| CPError::Database(e.to_string()))?;
303
304 let chunks = stmt
305 .query_map(params![doc_id.as_bytes().as_slice()], |row| {
306 let id_bytes: Vec<u8> = row.get(0)?;
307 let doc_id_bytes: Vec<u8> = row.get(1)?;
308 let text: String = row.get(2)?;
309 let byte_offset: i64 = row.get(3)?;
310 let byte_length: i64 = row.get(4)?;
311 let sequence: u32 = row.get(5)?;
312 let text_hash_bytes: Vec<u8> = row.get(6)?;
313
314 let id = Uuid::from_slice(&id_bytes).expect("invalid uuid");
315 let doc_id = Uuid::from_slice(&doc_id_bytes).expect("invalid uuid");
316 let mut text_hash = [0u8; 32];
317 text_hash.copy_from_slice(&text_hash_bytes);
318
319 Ok(Chunk {
320 id,
321 doc_id,
322 text,
323 byte_offset: byte_offset as u64,
324 byte_length: byte_length as u64,
325 sequence,
326 text_hash,
327 })
328 })
329 .map_err(|e| CPError::Database(e.to_string()))?
330 .collect::<std::result::Result<Vec<_>, _>>()
331 .map_err(|e| CPError::Database(e.to_string()))?;
332
333 Ok(chunks)
334 }
335
336 pub fn get_chunk(&self, id: Uuid) -> Result<Option<Chunk>> {
338 self.db
339 .query_row(
340 "SELECT id, doc_id, text, byte_offset, byte_length, sequence, text_hash FROM chunks WHERE id = ?1",
341 params![id.as_bytes().as_slice()],
342 |row| {
343 let id_bytes: Vec<u8> = row.get(0)?;
344 let doc_id_bytes: Vec<u8> = row.get(1)?;
345 let text: String = row.get(2)?;
346 let byte_offset: i64 = row.get(3)?;
347 let byte_length: i64 = row.get(4)?;
348 let sequence: u32 = row.get(5)?;
349 let text_hash_bytes: Vec<u8> = row.get(6)?;
350
351 let id = Uuid::from_slice(&id_bytes).expect("invalid uuid");
352 let doc_id = Uuid::from_slice(&doc_id_bytes).expect("invalid uuid");
353 let mut text_hash = [0u8; 32];
354 text_hash.copy_from_slice(&text_hash_bytes);
355
356 Ok(Chunk {
357 id,
358 doc_id,
359 text,
360 byte_offset: byte_offset as u64,
361 byte_length: byte_length as u64,
362 sequence,
363 text_hash,
364 })
365 },
366 )
367 .optional()
368 .map_err(|e| CPError::Database(e.to_string()))
369 }
370
371 pub fn insert_embedding(&mut self, emb: &Embedding) -> Result<()> {
375 let vector_bytes: Vec<u8> = emb
378 .vector
379 .iter()
380 .flat_map(|val| val.to_le_bytes())
381 .collect();
382
383 self.db
384 .execute(
385 "INSERT OR REPLACE INTO embeddings (id, chunk_id, vector, model_hash, dim, l2_norm)
386 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
387 params![
388 emb.id.as_bytes().as_slice(),
389 emb.chunk_id.as_bytes().as_slice(),
390 &vector_bytes,
391 emb.model_hash.as_slice(),
392 emb.dim as i32,
393 emb.l2_norm,
394 ],
395 )
396 .map_err(|e| CPError::Database(e.to_string()))?;
397
398 let vector_f32 = emb.to_f32();
400 self.hnsw.insert(emb.id, vector_f32)?;
401
402 self.hnsw.invalidate();
404
405 Ok(())
406 }
407
408 pub fn get_embedding_for_chunk(&self, chunk_id: Uuid) -> Result<Option<Embedding>> {
410 self.db
411 .query_row(
412 "SELECT id, chunk_id, vector, model_hash, dim, l2_norm
413 FROM embeddings WHERE chunk_id = ?1",
414 params![chunk_id.as_bytes().as_slice()],
415 |row| self.row_to_embedding(row),
416 )
417 .optional()
418 .map_err(|e| CPError::Database(e.to_string()))
419 }
420
421 pub fn get_embedding(&self, id: Uuid) -> Result<Option<Embedding>> {
423 self.db
424 .query_row(
425 "SELECT id, chunk_id, vector, model_hash, dim, l2_norm
426 FROM embeddings WHERE id = ?1",
427 params![id.as_bytes().as_slice()],
428 |row| self.row_to_embedding(row),
429 )
430 .optional()
431 .map_err(|e| CPError::Database(e.to_string()))
432 }
433
434 fn row_to_embedding(&self, row: &rusqlite::Row) -> rusqlite::Result<Embedding> {
435 let id_bytes: Vec<u8> = row.get(0)?;
436 let chunk_id_bytes: Vec<u8> = row.get(1)?;
437 let vector_bytes: Vec<u8> = row.get(2)?;
438 let model_hash_bytes: Vec<u8> = row.get(3)?;
439 let _dim: i32 = row.get(4)?;
440 let l2_norm: f32 = row.get(5).unwrap_or(0.0);
441
442 let _id = Uuid::from_slice(&id_bytes).expect("invalid uuid");
443 let chunk_id = Uuid::from_slice(&chunk_id_bytes).expect("invalid uuid");
444 let mut model_hash = [0u8; 32];
445 model_hash.copy_from_slice(&model_hash_bytes);
446
447 let vector: Vec<i16> = vector_bytes
449 .chunks(2)
450 .map(|bytes| i16::from_le_bytes([bytes[0], bytes[1]]))
451 .collect();
452
453 Ok(Embedding::from_quantized_with_norm(
454 chunk_id,
455 vector,
456 model_hash,
457 l2_norm,
458 0, ))
460 }
461
462 pub fn get_chunk_id_for_embedding(&self, embedding_id: Uuid) -> Result<Option<Uuid>> {
464 self.db
465 .query_row(
466 "SELECT chunk_id FROM embeddings WHERE id = ?1",
467 params![embedding_id.as_bytes().as_slice()],
468 |row| {
469 let id_bytes: Vec<u8> = row.get(0)?;
470 Ok(Uuid::from_slice(&id_bytes).expect("invalid uuid"))
471 },
472 )
473 .optional()
474 .map_err(|e| CPError::Database(e.to_string()))
475 }
476
477 pub fn get_all_embeddings(&self) -> Result<Vec<Embedding>> {
479 let mut stmt = self
480 .db
481 .prepare("SELECT id, chunk_id, vector, model_hash, dim, l2_norm FROM embeddings")
482 .map_err(|e| CPError::Database(e.to_string()))?;
483
484 let embs = stmt
485 .query_map([], |row| self.row_to_embedding(row))
486 .map_err(|e| CPError::Database(e.to_string()))?
487 .collect::<std::result::Result<Vec<_>, _>>()
488 .map_err(|e| CPError::Database(e.to_string()))?;
489
490 Ok(embs)
491 }
492
493 pub fn checkpoint_index(&self) -> Result<()> {
495 let root = self.compute_merkle_root()?;
496 self.hnsw.checkpoint(root)
497 }
498
499 pub fn save_index(&self) -> Result<()> {
501 self.hnsw.save()
502 }
503
504 pub fn add_edge(&mut self, edge: &Edge) -> Result<()> {
508 self.db
509 .execute(
510 "INSERT OR REPLACE INTO edges (source, target, kind, weight, metadata) VALUES (?1, ?2, ?3, ?4, ?5)",
511 params![
512 edge.source.as_bytes().as_slice(),
513 edge.target.as_bytes().as_slice(),
514 edge.kind as i32,
515 edge.weight,
516 edge.metadata.as_deref(),
517 ],
518 )
519 .map_err(|e| CPError::Database(e.to_string()))?;
520
521 Ok(())
522 }
523
524 pub fn get_edges(&self, node_id: Uuid) -> Result<Vec<Edge>> {
526 let mut stmt = self
527 .db
528 .prepare("SELECT source, target, kind, weight, metadata FROM edges WHERE source = ?1")
529 .map_err(|e| CPError::Database(e.to_string()))?;
530
531 let edges = stmt
532 .query_map(params![node_id.as_bytes().as_slice()], |row| {
533 let source_bytes: Vec<u8> = row.get(0)?;
534 let target_bytes: Vec<u8> = row.get(1)?;
535 let kind: i32 = row.get(2)?;
536 let weight: Option<f64> = row.get(3)?;
537 let metadata: Option<String> = row.get(4)?;
538
539 let source = Uuid::from_slice(&source_bytes).expect("invalid uuid");
540 let target = Uuid::from_slice(&target_bytes).expect("invalid uuid");
541 let kind = EdgeKind::from_u8(kind as u8).unwrap_or(EdgeKind::DocToChunk);
542
543 Ok(Edge {
544 source,
545 target,
546 kind,
547 weight: weight.map(|w| w as f32),
548 metadata,
549 })
550 })
551 .map_err(|e| CPError::Database(e.to_string()))?
552 .collect::<std::result::Result<Vec<_>, _>>()
553 .map_err(|e| CPError::Database(e.to_string()))?;
554
555 Ok(edges)
556 }
557
558 pub fn edges_to(&self, node_id: Uuid) -> Result<Vec<Edge>> {
560 let mut stmt = self
561 .db
562 .prepare("SELECT source, target, kind, weight, metadata FROM edges WHERE target = ?1")
563 .map_err(|e| CPError::Database(e.to_string()))?;
564
565 let edges = stmt
566 .query_map(params![node_id.as_bytes().as_slice()], |row| {
567 let source_bytes: Vec<u8> = row.get(0)?;
568 let target_bytes: Vec<u8> = row.get(1)?;
569 let kind: i32 = row.get(2)?;
570 let weight: Option<f64> = row.get(3)?;
571 let metadata: Option<String> = row.get(4)?;
572
573 let source = Uuid::from_slice(&source_bytes).expect("invalid uuid");
574 let target = Uuid::from_slice(&target_bytes).expect("invalid uuid");
575 let kind = EdgeKind::from_u8(kind as u8).unwrap_or(EdgeKind::DocToChunk);
576
577 Ok(Edge {
578 source,
579 target,
580 kind,
581 weight: weight.map(|w| w as f32),
582 metadata,
583 })
584 })
585 .map_err(|e| CPError::Database(e.to_string()))?
586 .collect::<std::result::Result<Vec<_>, _>>()
587 .map_err(|e| CPError::Database(e.to_string()))?;
588
589 Ok(edges)
590 }
591
592 pub fn all_edges(&self) -> Result<Vec<Edge>> {
594 let mut stmt = self
595 .db
596 .prepare("SELECT source, target, kind, weight, metadata FROM edges")
597 .map_err(|e| CPError::Database(e.to_string()))?;
598
599 let edges = stmt
600 .query_map([], |row| {
601 let source_bytes: Vec<u8> = row.get(0)?;
602 let target_bytes: Vec<u8> = row.get(1)?;
603 let kind: i32 = row.get(2)?;
604 let weight: Option<f64> = row.get(3)?;
605 let metadata: Option<String> = row.get(4)?;
606
607 let source = Uuid::from_slice(&source_bytes).expect("invalid uuid");
608 let target = Uuid::from_slice(&target_bytes).expect("invalid uuid");
609 let kind = EdgeKind::from_u8(kind as u8).unwrap_or(EdgeKind::DocToChunk);
610
611 Ok(Edge {
612 source,
613 target,
614 kind,
615 weight: weight.map(|w| w as f32),
616 metadata,
617 })
618 })
619 .map_err(|e| CPError::Database(e.to_string()))?
620 .collect::<std::result::Result<Vec<_>, _>>()
621 .map_err(|e| CPError::Database(e.to_string()))?;
622
623 Ok(edges)
624 }
625
626 pub fn search(&self, query_vec: &[f32], k: usize) -> Result<Vec<(Uuid, f32)>> {
630 Ok(self.hnsw.search(query_vec, k))
631 }
632
633 pub fn search_lexical(&self, query: &str, k: usize) -> Result<Vec<(Uuid, f32)>> {
636 let mut stmt = self
637 .db
638 .prepare(
639 "SELECT id, rank FROM fts_chunks
640 WHERE fts_chunks MATCH ?1
641 ORDER BY rank LIMIT ?2",
642 )
643 .map_err(|e| CPError::Database(e.to_string()))?;
644
645 let results = stmt
646 .query_map(params![query, k as i64], |row| {
647 let id_bytes: Vec<u8> = row.get(0)?;
648 let rank: f64 = row.get(1)?;
649
650 let id = Uuid::from_slice(&id_bytes).expect("invalid uuid");
651 Ok((id, -rank as f32))
655 })
656 .map_err(|e| CPError::Database(e.to_string()))?
657 .collect::<std::result::Result<Vec<_>, _>>()
658 .map_err(|e| CPError::Database(e.to_string()))?;
659
660 Ok(results)
661 }
662
663 pub fn get_sorted_chunk_hashes(&self) -> Result<Vec<([u8; 16], [u8; 32])>> {
669 let mut stmt = self
670 .db
671 .prepare("SELECT id, text_hash FROM chunks ORDER BY id")
672 .map_err(|e| CPError::Database(e.to_string()))?;
673
674 let results = stmt
675 .query_map([], |row| {
676 let id_bytes: Vec<u8> = row.get(0)?;
677 let hash_bytes: Vec<u8> = row.get(1)?;
678
679 let mut id = [0u8; 16];
680 id.copy_from_slice(&id_bytes);
681 let mut hash = [0u8; 32];
682 hash.copy_from_slice(&hash_bytes);
683
684 Ok((id, hash))
685 })
686 .map_err(|e| CPError::Database(e.to_string()))?
687 .collect::<std::result::Result<Vec<_>, _>>()
688 .map_err(|e| CPError::Database(e.to_string()))?;
689
690 Ok(results)
691 }
692
693 pub fn compute_chunk_tree_root(&self) -> Result<[u8; 32]> {
698 let sorted = self.get_sorted_chunk_hashes()?;
699 let hashes: Vec<[u8; 32]> = sorted.iter().map(|(_, h)| *h).collect();
700 Ok(cp_core::state::compute_merkle_root(&hashes))
701 }
702
703 pub fn compute_merkle_root(&self) -> Result<[u8; 32]> {
710 let mut hasher = blake3::Hasher::new();
711
712 let mut docs = self.get_all_documents()?;
714 docs.sort_by_key(|d| d.id);
715 for doc in docs {
716 hasher.update(doc.id.as_bytes());
717 hasher.update(&doc.hash);
718 hasher.update(&doc.hierarchical_hash);
719 }
721
722 let mut stmt = self.db.prepare("SELECT id, text_hash FROM chunks ORDER BY id")
726 .map_err(|e| CPError::Database(e.to_string()))?;
727 let chunk_iter = stmt.query_map([], |row| {
728 let id_bytes: Vec<u8> = row.get(0)?;
729 let hash_bytes: Vec<u8> = row.get(1)?;
730 Ok((id_bytes, hash_bytes))
731 }).map_err(|e| CPError::Database(e.to_string()))?;
732
733 for chunk in chunk_iter {
734 let (id, hash) = chunk.map_err(|e| CPError::Database(e.to_string()))?;
735 hasher.update(&id);
736 hasher.update(&hash);
737 }
738
739 let mut stmt = self.db.prepare("SELECT id, vector, model_hash FROM embeddings ORDER BY id")
741 .map_err(|e| CPError::Database(e.to_string()))?;
742 let emb_iter = stmt.query_map([], |row| {
743 let id_bytes: Vec<u8> = row.get(0)?;
744 let vec_bytes: Vec<u8> = row.get(1)?;
745 let model_hash: Vec<u8> = row.get(2)?;
746 Ok((id_bytes, vec_bytes, model_hash))
747 }).map_err(|e| CPError::Database(e.to_string()))?;
748
749 for emb in emb_iter {
750 let (id, vec, model) = emb.map_err(|e| CPError::Database(e.to_string()))?;
751 hasher.update(&id);
752 hasher.update(&vec);
753 hasher.update(&model);
754 }
755
756 let mut stmt = self.db.prepare("SELECT source, target, kind, weight FROM edges ORDER BY source, target, kind")
758 .map_err(|e| CPError::Database(e.to_string()))?;
759 let edge_iter = stmt.query_map([], |row| {
760 let s: Vec<u8> = row.get(0)?;
761 let t: Vec<u8> = row.get(1)?;
762 let k: i32 = row.get(2)?;
763 let w: Option<i32> = row.get(3)?;
764 Ok((s, t, k, w))
765 }).map_err(|e| CPError::Database(e.to_string()))?;
766
767 for edge in edge_iter {
768 let (s, t, k, w) = edge.map_err(|e| CPError::Database(e.to_string()))?;
769 hasher.update(&s);
770 hasher.update(&t);
771 hasher.update(&k.to_le_bytes());
772 if let Some(weight) = w {
773 hasher.update(&weight.to_le_bytes());
774 }
775 }
776
777 Ok(*hasher.finalize().as_bytes())
778 }
779
780 pub fn get_state_root_by_hash(&self, hash: &[u8; 32]) -> Result<Option<cp_core::StateRoot>> {
782 self.db
783 .query_row(
784 "SELECT hash, parent, hlc_wall_ms, hlc_counter, hlc_node_id, device_id, signature, seq
785 FROM state_roots WHERE hash = ?1",
786 params![hash.as_slice()],
787 |row| {
788 let hash_bytes: Vec<u8> = row.get(0)?;
789 let parent_bytes: Option<Vec<u8>> = row.get(1)?;
790 let hlc_wall_ms: i64 = row.get(2)?;
791 let hlc_counter: i32 = row.get(3)?;
792 let hlc_node_id_bytes: Vec<u8> = row.get(4)?;
793 let device_id_bytes: Vec<u8> = row.get(5)?;
794 let signature_bytes: Vec<u8> = row.get(6)?;
795 let seq: i64 = row.get(7)?;
796
797 let mut hash = [0u8; 32];
798 hash.copy_from_slice(&hash_bytes);
799
800 let parent = parent_bytes.map(|b| {
801 let mut p = [0u8; 32];
802 p.copy_from_slice(&b);
803 p
804 });
805
806 let mut hlc_node_id = [0u8; 16];
807 hlc_node_id.copy_from_slice(&hlc_node_id_bytes);
808 let hlc = cp_core::Hlc {
809 wall_ms: hlc_wall_ms as u64,
810 counter: hlc_counter as u16,
811 node_id: hlc_node_id,
812 };
813
814 let device_id = Uuid::from_slice(&device_id_bytes).expect("invalid uuid");
815
816 let mut signature = [0u8; 64];
817 signature.copy_from_slice(&signature_bytes);
818
819 Ok(cp_core::StateRoot {
820 hash,
821 parent,
822 hlc,
823 device_id,
824 signature,
825 seq: seq as u64,
826 })
827 },
828 )
829 .optional()
830 .map_err(|e| CPError::Database(e.to_string()))
831 }
832
833 pub fn get_latest_root(&self) -> Result<Option<cp_core::StateRoot>> {
835 use rusqlite::OptionalExtension;
836 self.db
837 .query_row(
838 "SELECT hash, parent, hlc_wall_ms, hlc_counter, hlc_node_id, device_id, signature, seq
839 FROM state_roots ORDER BY seq DESC LIMIT 1",
840 [],
841 |row| {
842 let hash_bytes: Vec<u8> = row.get(0)?;
843 let parent_bytes: Option<Vec<u8>> = row.get(1)?;
844 let hlc_wall_ms: i64 = row.get(2)?;
845 let hlc_counter: i32 = row.get(3)?;
846 let hlc_node_id_bytes: Vec<u8> = row.get(4)?;
847 let device_id_bytes: Vec<u8> = row.get(5)?;
848 let signature_bytes: Vec<u8> = row.get(6)?;
849 let seq: i64 = row.get(7)?;
850
851 let mut hash = [0u8; 32];
852 hash.copy_from_slice(&hash_bytes);
853
854 let parent = parent_bytes.map(|b| {
855 let mut p = [0u8; 32];
856 p.copy_from_slice(&b);
857 p
858 });
859
860 let mut hlc_node_id = [0u8; 16];
861 hlc_node_id.copy_from_slice(&hlc_node_id_bytes);
862 let hlc = cp_core::Hlc {
863 wall_ms: hlc_wall_ms as u64,
864 counter: hlc_counter as u16,
865 node_id: hlc_node_id,
866 };
867
868 let device_id = Uuid::from_slice(&device_id_bytes).expect("invalid uuid");
869
870 let mut signature = [0u8; 64];
871 signature.copy_from_slice(&signature_bytes);
872
873 Ok(cp_core::StateRoot {
874 hash,
875 parent,
876 hlc,
877 device_id,
878 signature,
879 seq: seq as u64,
880 })
881 },
882 )
883 .optional()
884 .map_err(|e| CPError::Database(e.to_string()))
885 }
886
887 pub fn set_latest_root(&mut self, root: &cp_core::StateRoot) -> Result<()> {
889 self.db
890 .execute(
891 "INSERT OR REPLACE INTO state_roots (hash, parent, hlc_wall_ms, hlc_counter, hlc_node_id, device_id, signature, seq)
892 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
893 params![
894 root.hash.as_slice(),
895 root.parent.as_ref().map(|p| p.as_slice()),
896 root.hlc.wall_ms as i64,
897 root.hlc.counter as i32,
898 root.hlc.node_id.as_slice(),
899 root.device_id.as_bytes().as_slice(),
900 root.signature.as_slice(),
901 root.seq as i64,
902 ],
903 )
904 .map_err(|e| CPError::Database(e.to_string()))?;
905
906 Ok(())
907 }
908
909 pub fn apply_diff(&mut self, diff: &cp_core::CognitiveDiff) -> Result<()> {
916 let latest_root = self.get_latest_root()?;
918
919 let (resolved_diff, has_conflict) = self.resolve_conflicts(diff, latest_root.as_ref())?;
921
922 if has_conflict {
923 info!("Conflict detected and resolved using LWW/HLC");
924 }
925
926 let tx = self.db.transaction().map_err(|e| CPError::Database(e.to_string()))?;
927
928 for id in &resolved_diff.removed_doc_ids {
930 tx.execute("DELETE FROM documents WHERE id = ?1", params![id.as_bytes().as_slice()])
931 .map_err(|e| CPError::Database(e.to_string()))?;
932 }
933 for id in &resolved_diff.removed_chunk_ids {
934 tx.execute("DELETE FROM chunks WHERE id = ?1", params![id.as_bytes().as_slice()])
935 .map_err(|e| CPError::Database(e.to_string()))?;
936 }
937 for id in &resolved_diff.removed_embedding_ids {
938 tx.execute("DELETE FROM embeddings WHERE id = ?1", params![id.as_bytes().as_slice()])
939 .map_err(|e| CPError::Database(e.to_string()))?;
940 }
941
942 if !resolved_diff.removed_embedding_ids.is_empty() {
944 self.hnsw.invalidate();
945 }
946 for (source, target, kind) in &resolved_diff.removed_edges {
947 tx.execute(
948 "DELETE FROM edges WHERE source = ?1 AND target = ?2 AND kind = ?3",
949 params![source.as_bytes().as_slice(), target.as_bytes().as_slice(), *kind as i32],
950 )
951 .map_err(|e| CPError::Database(e.to_string()))?;
952 }
953
954 for doc in &resolved_diff.added_docs {
956 tx.execute(
957 "INSERT OR REPLACE INTO documents (id, path, hash, hierarchical_hash, mtime, size, mime_type)
958 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
959 params![
960 doc.id.as_bytes().as_slice(),
961 doc.path.to_string_lossy().as_ref(),
962 doc.hash.as_slice(),
963 doc.hierarchical_hash.as_slice(),
964 doc.mtime,
965 doc.size as i64,
966 &doc.mime_type,
967 ],
968 ).map_err(|e| CPError::Database(e.to_string()))?;
969 }
970
971 for chunk in &resolved_diff.added_chunks {
972 tx.execute(
973 "INSERT OR REPLACE INTO chunks (id, doc_id, text, byte_offset, byte_length, sequence, text_hash)
974 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
975 params![
976 chunk.id.as_bytes().as_slice(),
977 chunk.doc_id.as_bytes().as_slice(),
978 &chunk.text,
979 chunk.byte_offset as i64,
980 chunk.byte_length as i64,
981 chunk.sequence,
982 chunk.text_hash.as_slice(),
983 ],
984 ).map_err(|e| CPError::Database(e.to_string()))?;
985 }
986
987 for emb in &resolved_diff.added_embeddings {
988 let vector_bytes: Vec<u8> = emb.vector.iter().flat_map(|f| f.to_le_bytes()).collect();
989 tx.execute(
990 "INSERT OR REPLACE INTO embeddings (id, chunk_id, vector, model_hash, dim)
991 VALUES (?1, ?2, ?3, ?4, ?5)",
992 params![
993 emb.id.as_bytes().as_slice(),
994 emb.chunk_id.as_bytes().as_slice(),
995 &vector_bytes,
996 emb.model_hash.as_slice(),
997 emb.dim as i32,
998 ],
999 ).map_err(|e| CPError::Database(e.to_string()))?;
1000 }
1001
1002 for edge in &resolved_diff.added_edges {
1003 tx.execute(
1004 "INSERT OR REPLACE INTO edges (source, target, kind, weight) VALUES (?1, ?2, ?3, ?4)",
1005 params![
1006 edge.source.as_bytes().as_slice(),
1007 edge.target.as_bytes().as_slice(),
1008 edge.kind as i32,
1009 edge.weight.map(|w| w as i32),
1010 ],
1011 ).map_err(|e| CPError::Database(e.to_string()))?;
1012 }
1013
1014 tx.execute(
1016 "INSERT OR REPLACE INTO state_roots (hash, parent, hlc_wall_ms, hlc_counter, hlc_node_id, device_id, signature, seq)
1017 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1018 params![
1019 resolved_diff.metadata.new_root.as_slice(),
1020 if resolved_diff.metadata.prev_root == [0u8; 32] { None } else { Some(resolved_diff.metadata.prev_root.as_slice()) },
1021 resolved_diff.metadata.hlc.wall_ms as i64,
1022 resolved_diff.metadata.hlc.counter as i32,
1023 resolved_diff.metadata.hlc.node_id.as_slice(),
1024 resolved_diff.metadata.device_id.as_bytes().as_slice(),
1025 [0u8; 64].as_slice(),
1026 resolved_diff.metadata.seq as i64,
1027 ],
1028 ).map_err(|e| CPError::Database(e.to_string()))?;
1029
1030 tx.commit().map_err(|e| CPError::Database(e.to_string()))?;
1031
1032 for emb in &resolved_diff.added_embeddings {
1034 let vec_f32 = emb.to_f32();
1035 let _ = self.hnsw.insert(emb.id, vec_f32);
1036 }
1037
1038 self.hnsw.invalidate();
1040
1041 Ok(())
1042 }
1043
1044 fn resolve_conflicts(
1052 &self,
1053 incoming_diff: &cp_core::CognitiveDiff,
1054 current_root: Option<&cp_core::StateRoot>,
1055 ) -> Result<(cp_core::CognitiveDiff, bool)> {
1056 let mut has_conflict = false;
1057
1058 let current_root = match current_root {
1060 Some(root) => root,
1061 None => return Ok((incoming_diff.clone(), false)),
1062 };
1063
1064 let incoming_hlc = &incoming_diff.metadata.hlc;
1066 let current_hlc = ¤t_root.hlc;
1067
1068 let incoming_is_newer = incoming_hlc > current_hlc;
1070
1071 if !incoming_is_newer && incoming_hlc != current_hlc {
1073 info!(
1074 "Conflict: incoming diff HLC {:?} is older than current {:?}",
1075 incoming_hlc, current_hlc
1076 );
1077 has_conflict = true;
1078 return Ok((cp_core::CognitiveDiff::empty(
1080 current_root.hash,
1081 incoming_diff.metadata.device_id,
1082 incoming_diff.metadata.seq,
1083 incoming_diff.metadata.hlc.clone(),
1084 ), has_conflict));
1085 }
1086
1087 if incoming_hlc == current_hlc {
1089 let incoming_hash = blake3::hash(&incoming_diff.metadata.new_root);
1090 let current_hash = blake3::hash(¤t_root.hash);
1091
1092 if incoming_hash.as_bytes() <= current_hash.as_bytes() {
1094 info!("Conflict: equal HLC, current hash wins (tiebreaker)");
1095 has_conflict = true;
1096 return Ok((cp_core::CognitiveDiff::empty(
1097 current_root.hash,
1098 incoming_diff.metadata.device_id,
1099 incoming_diff.metadata.seq,
1100 incoming_diff.metadata.hlc.clone(),
1101 ), has_conflict));
1102 }
1103 }
1104
1105 Ok((incoming_diff.clone(), has_conflict))
1107 }
1108
1109 pub fn clear_state_roots(&mut self) -> Result<()> {
1111 self.db.execute_batch("DELETE FROM state_roots;")
1112 .map_err(|e| CPError::Database(e.to_string()))?;
1113 info!("Cleared state roots");
1114 Ok(())
1115 }
1116
1117 pub fn clear_all(&mut self) -> Result<()> {
1119 self.db.execute_batch(
1120 r#"
1121 DELETE FROM embeddings;
1122 DELETE FROM chunks;
1123 DELETE FROM documents;
1124 DELETE FROM edges;
1125 DELETE FROM state_roots;
1126 "#,
1127 )
1128 .map_err(|e| CPError::Database(e.to_string()))?;
1129
1130 self.hnsw.clear()?;
1132
1133 info!("Cleared all data from graph store for fresh sync");
1134 Ok(())
1135 }
1136
1137 pub fn stats(&self) -> Result<GraphStats> {
1139 let doc_count: i64 = self
1140 .db
1141 .query_row("SELECT COUNT(*) FROM documents", [], |row| row.get(0))
1142 .map_err(|e| CPError::Database(e.to_string()))?;
1143
1144 let chunk_count: i64 = self
1145 .db
1146 .query_row("SELECT COUNT(*) FROM chunks", [], |row| row.get(0))
1147 .map_err(|e| CPError::Database(e.to_string()))?;
1148
1149 let embedding_count: i64 = self
1150 .db
1151 .query_row("SELECT COUNT(*) FROM embeddings", [], |row| row.get(0))
1152 .map_err(|e| CPError::Database(e.to_string()))?;
1153
1154 let edge_count: i64 = self
1155 .db
1156 .query_row("SELECT COUNT(*) FROM edges", [], |row| row.get(0))
1157 .map_err(|e| CPError::Database(e.to_string()))?;
1158
1159 Ok(GraphStats {
1160 documents: doc_count as usize,
1161 chunks: chunk_count as usize,
1162 embeddings: embedding_count as usize,
1163 edges: edge_count as usize,
1164 })
1165 }
1166}
1167
/// Row counts per table, as returned by `GraphStore::stats`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GraphStats {
    /// Number of rows in `documents`.
    pub documents: usize,
    /// Number of rows in `chunks`.
    pub chunks: usize,
    /// Number of rows in `embeddings`.
    pub embeddings: usize,
    /// Number of rows in `edges`.
    pub edges: usize,
}
1176
1177#[cfg(test)]
1178mod tests {
1179 use super::*;
1180 use std::path::PathBuf;
1181
1182 fn fresh_store() -> GraphStore {
1183 GraphStore::in_memory().unwrap()
1184 }
1185
1186 fn test_hlc() -> cp_core::Hlc {
1187 cp_core::Hlc::new(1234567890, [0u8; 16])
1188 }
1189
#[test]
fn test_graph_store_new() {
    // Opening an in-memory store must leave it queryable.
    let store = GraphStore::in_memory().unwrap();
    let stats = store.stats();
    assert!(stats.is_ok());
}

#[test]
fn test_graph_store_in_memory() {
    // A fresh store has no rows in any counted table.
    let GraphStats {
        documents,
        chunks,
        embeddings,
        edges,
    } = GraphStore::in_memory().unwrap().stats().unwrap();
    assert_eq!(documents, 0);
    assert_eq!(chunks, 0);
    assert_eq!(embeddings, 0);
    assert_eq!(edges, 0);
}
1207
1208 #[test]
1209 fn test_graph_store_insert_document() {
1210 let mut store = fresh_store();
1211 let doc = Document::new(PathBuf::from("test.md"), b"Hello, world!", 12345);
1212 store.insert_document(&doc).unwrap();
1213
1214 let retrieved = store.get_document(doc.id).unwrap().unwrap();
1215 assert_eq!(retrieved.id, doc.id);
1216 assert_eq!(retrieved.path, doc.path);
1217 assert_eq!(retrieved.hash, doc.hash);
1218 }
1219
1220 #[test]
1221 fn test_graph_store_get_document_by_id() {
1222 let mut store = fresh_store();
1223 let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
1224 store.insert_document(&doc).unwrap();
1225
1226 let retrieved = store.get_document(doc.id).unwrap();
1227 assert!(retrieved.is_some());
1228 assert_eq!(retrieved.unwrap().id, doc.id);
1229 }
1230
1231 #[test]
1232 fn test_graph_store_get_document_by_path() {
1233 let mut store = fresh_store();
1234 let path = PathBuf::from("test.md");
1235 let doc = Document::new(path.clone(), b"Content", 12345);
1236 store.insert_document(&doc).unwrap();
1237
1238 let retrieved = store.get_document_by_path(&path).unwrap();
1239 assert!(retrieved.is_some());
1240 assert_eq!(retrieved.unwrap().path, path);
1241 }
1242
1243 #[test]
1244 fn test_graph_store_document_not_found() {
1245 let store = fresh_store();
1246 let non_existent_id = Uuid::new_v4();
1247 let retrieved = store.get_document(non_existent_id).unwrap();
1248 assert!(retrieved.is_none());
1249
1250 let non_existent_path = PathBuf::from("non_existent.md");
1251 let retrieved_by_path = store.get_document_by_path(&non_existent_path).unwrap();
1252 assert!(retrieved_by_path.is_none());
1253 }
1254
1255 #[test]
1256 fn test_graph_store_update_document() {
1257 let mut store = fresh_store();
1258
1259 let doc = Document::new(PathBuf::from("test.md"), b"Original content", 12345);
1261 let original_id = doc.id;
1262 store.insert_document(&doc).unwrap();
1263
1264 store.insert_document(&doc).unwrap();
1266
1267 let docs = store.get_all_documents().unwrap();
1269 assert_eq!(docs.len(), 1);
1270
1271 let retrieved = store.get_document(original_id).unwrap();
1273 assert!(retrieved.is_some());
1274 }
1275
1276 #[test]
1277 fn test_graph_store_delete_document() {
1278 let mut store = fresh_store();
1279
1280 let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
1282 store.insert_document(&doc).unwrap();
1283
1284 let chunk = Chunk::new(doc.id, "Test chunk".to_string(), 0, 0);
1285 store.insert_chunk(&chunk).unwrap();
1286
1287 let vector: Vec<f32> = (0..384).map(|i| i as f32 * 0.01).collect();
1288 let emb = Embedding::new(chunk.id, &vector, [0u8; 32], 0);
1289 store.insert_embedding(&emb).unwrap();
1290
1291 store.delete_document(doc.id).unwrap();
1293
1294 let retrieved = store.get_document(doc.id).unwrap();
1296 assert!(retrieved.is_none());
1297
1298 let chunks = store.get_chunks_for_doc(doc.id).unwrap();
1300 assert!(chunks.is_empty());
1301
1302 let embedding = store.get_embedding(emb.id).unwrap();
1304 assert!(embedding.is_none());
1305 }
1306
1307 #[test]
1308 fn test_graph_store_all_documents() {
1309 let mut store = fresh_store();
1310
1311 let doc1 = Document::new(PathBuf::from("test1.md"), b"Content 1", 12345);
1312 let doc2 = Document::new(PathBuf::from("test2.md"), b"Content 2", 12346);
1313
1314 store.insert_document(&doc1).unwrap();
1315 store.insert_document(&doc2).unwrap();
1316
1317 let all_docs = store.get_all_documents().unwrap();
1318 assert_eq!(all_docs.len(), 2);
1319 }
1320
1321 #[test]
1322 fn test_graph_store_insert_chunk() {
1323 let mut store = fresh_store();
1324
1325 let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
1326 store.insert_document(&doc).unwrap();
1327
1328 let chunk = Chunk::new(doc.id, "Test chunk text".to_string(), 0, 0);
1329 store.insert_chunk(&chunk).unwrap();
1330
1331 let retrieved = store.get_chunk(chunk.id).unwrap().unwrap();
1332 assert_eq!(retrieved.id, chunk.id);
1333 assert_eq!(retrieved.doc_id, doc.id);
1334 }
1335
1336 #[test]
1337 fn test_graph_store_get_chunk() {
1338 let mut store = fresh_store();
1339
1340 let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
1341 store.insert_document(&doc).unwrap();
1342
1343 let chunk = Chunk::new(doc.id, "Test chunk".to_string(), 0, 0);
1344 store.insert_chunk(&chunk).unwrap();
1345
1346 let retrieved = store.get_chunk(chunk.id).unwrap();
1347 assert!(retrieved.is_some());
1348 assert_eq!(retrieved.unwrap().id, chunk.id);
1349 }
1350
1351 #[test]
1352 fn test_graph_store_chunks_for_document() {
1353 let mut store = fresh_store();
1354
1355 let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
1356 store.insert_document(&doc).unwrap();
1357
1358 let chunk1 = Chunk::new(doc.id, "Chunk 1".to_string(), 0, 10);
1359 let chunk2 = Chunk::new(doc.id, "Chunk 2".to_string(), 10, 20);
1360
1361 store.insert_chunk(&chunk1).unwrap();
1362 store.insert_chunk(&chunk2).unwrap();
1363
1364 let chunks = store.get_chunks_for_doc(doc.id).unwrap();
1365 assert_eq!(chunks.len(), 2);
1366 }
1367
1368 #[test]
1369 fn test_graph_store_delete_chunks_for_document() {
1370 let mut store = fresh_store();
1371
1372 let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
1373 store.insert_document(&doc).unwrap();
1374
1375 let chunk = Chunk::new(doc.id, "Test chunk".to_string(), 0, 0);
1376 store.insert_chunk(&chunk).unwrap();
1377
1378 store.delete_document(doc.id).unwrap();
1380
1381 let chunks = store.get_chunks_for_doc(doc.id).unwrap();
1382 assert!(chunks.is_empty());
1383 }
1384
1385 #[test]
1386 fn test_graph_store_insert_embedding() {
1387 let mut store = fresh_store();
1388
1389 let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
1390 store.insert_document(&doc).unwrap();
1391
1392 let chunk = Chunk::new(doc.id, "Test chunk".to_string(), 0, 0);
1393 store.insert_chunk(&chunk).unwrap();
1394
1395 let vector: Vec<f32> = (0..384).map(|i| i as f32 * 0.01).collect();
1396 let emb = Embedding::new(chunk.id, &vector, [0u8; 32], 0);
1397
1398 store.insert_embedding(&emb).unwrap();
1399
1400 let retrieved = store.get_embedding(emb.id).unwrap().unwrap();
1401 assert_eq!(retrieved.id, emb.id);
1402 }
1403
1404 #[test]
1405 fn test_graph_store_get_embedding() {
1406 let mut store = fresh_store();
1407
1408 let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
1409 store.insert_document(&doc).unwrap();
1410
1411 let chunk = Chunk::new(doc.id, "Test chunk".to_string(), 0, 0);
1412 store.insert_chunk(&chunk).unwrap();
1413
1414 let vector: Vec<f32> = (0..384).map(|i| i as f32 * 0.01).collect();
1415 let emb = Embedding::new(chunk.id, &vector, [0u8; 32], 0);
1416
1417 store.insert_embedding(&emb).unwrap();
1418
1419 let retrieved = store.get_embedding(emb.id).unwrap();
1420 assert!(retrieved.is_some());
1421 assert_eq!(retrieved.unwrap().id, emb.id);
1422 }
1423
1424 #[test]
1425 fn test_graph_store_embeddings_for_chunk() {
1426 let mut store = fresh_store();
1427
1428 let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
1429 store.insert_document(&doc).unwrap();
1430
1431 let chunk = Chunk::new(doc.id, "Test chunk".to_string(), 0, 0);
1432 store.insert_chunk(&chunk).unwrap();
1433
1434 let vector: Vec<f32> = (0..384).map(|i| i as f32 * 0.01).collect();
1435 let emb = Embedding::new(chunk.id, &vector, [0u8; 32], 0);
1436
1437 store.insert_embedding(&emb).unwrap();
1438
1439 let retrieved = store.get_embedding_for_chunk(chunk.id).unwrap();
1440 assert!(retrieved.is_some());
1441 assert_eq!(retrieved.unwrap().chunk_id, chunk.id);
1442 }
1443
1444 #[test]
1445 fn test_graph_store_all_embeddings() {
1446 let mut store = fresh_store();
1447
1448 let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
1449 store.insert_document(&doc).unwrap();
1450
1451 let chunk = Chunk::new(doc.id, "Test chunk".to_string(), 0, 0);
1452 store.insert_chunk(&chunk).unwrap();
1453
1454 let vector: Vec<f32> = (0..384).map(|i| i as f32 * 0.01).collect();
1455 let emb = Embedding::new(chunk.id, &vector, [0u8; 32], 0);
1456
1457 store.insert_embedding(&emb).unwrap();
1458
1459 let all_embeddings = store.get_all_embeddings().unwrap();
1460 assert_eq!(all_embeddings.len(), 1);
1461 }
1462
1463 #[test]
1464 fn test_graph_store_insert_edge() {
1465 let mut store = fresh_store();
1466
1467 let source = Uuid::new_v4();
1468 let target = Uuid::new_v4();
1469 let edge = Edge::new(source, target, EdgeKind::DocToChunk);
1470
1471 store.add_edge(&edge).unwrap();
1472
1473 let edges = store.get_edges(source).unwrap();
1474 assert_eq!(edges.len(), 1);
1475 assert_eq!(edges[0].target, target);
1476 }
1477
1478 #[test]
1479 fn test_graph_store_edges_from() {
1480 let mut store = fresh_store();
1481
1482 let source = Uuid::new_v4();
1483 let target1 = Uuid::new_v4();
1484 let target2 = Uuid::new_v4();
1485
1486 let edge1 = Edge::new(source, target1, EdgeKind::DocToChunk);
1487 let edge2 = Edge::new(source, target2, EdgeKind::ChunkToChunk);
1488
1489 store.add_edge(&edge1).unwrap();
1490 store.add_edge(&edge2).unwrap();
1491
1492 let edges = store.get_edges(source).unwrap();
1493 assert_eq!(edges.len(), 2);
1494 }
1495
1496 #[test]
1497 fn test_graph_store_edges_to() {
1498 let mut store = fresh_store();
1499
1500 let source1 = Uuid::new_v4();
1501 let source2 = Uuid::new_v4();
1502 let target = Uuid::new_v4();
1503
1504 let edge1 = Edge::new(source1, target, EdgeKind::DocToChunk);
1505 let edge2 = Edge::new(source2, target, EdgeKind::ChunkToChunk);
1506
1507 store.add_edge(&edge1).unwrap();
1508 store.add_edge(&edge2).unwrap();
1509
1510 let edges_from_source1 = store.get_edges(source1).unwrap();
1512 assert_eq!(edges_from_source1.len(), 1);
1513
1514 let edges_from_source2 = store.get_edges(source2).unwrap();
1515 assert_eq!(edges_from_source2.len(), 1);
1516 }
1517
1518 #[test]
1519 fn test_graph_store_all_edges() {
1520 let mut store = fresh_store();
1521
1522 let edge1 = Edge::new(Uuid::new_v4(), Uuid::new_v4(), EdgeKind::DocToChunk);
1523 let edge2 = Edge::new(Uuid::new_v4(), Uuid::new_v4(), EdgeKind::ChunkToChunk);
1524
1525 store.add_edge(&edge1).unwrap();
1526 store.add_edge(&edge2).unwrap();
1527
1528 let stats = store.stats().unwrap();
1530 assert_eq!(stats.edges, 2);
1531 }
1532
1533 #[test]
1534 fn test_graph_store_compute_merkle_root() {
1535 let mut store = fresh_store();
1536
1537 let root1 = store.compute_merkle_root().unwrap();
1538
1539 let doc = Document::new(PathBuf::from("test.md"), b"Hello", 0);
1540 store.insert_document(&doc).unwrap();
1541
1542 let root2 = store.compute_merkle_root().unwrap();
1543
1544 assert_ne!(root1, root2);
1546 }
1547
1548 #[test]
1549 fn test_graph_store_insert_state_root() {
1550 let mut store = fresh_store();
1551
1552 let state_root = cp_core::StateRoot::new([1u8; 32], Some([2u8; 32]), test_hlc(), Uuid::new_v4(), 1);
1553 store.set_latest_root(&state_root).unwrap();
1554
1555 let retrieved = store.get_latest_root().unwrap();
1556 assert!(retrieved.is_some());
1557 assert_eq!(retrieved.unwrap().hash, state_root.hash);
1558 }
1559
1560 #[test]
1561 fn test_graph_store_get_latest_state_root() {
1562 let mut store = fresh_store();
1563
1564 let latest = store.get_latest_root().unwrap();
1566 assert!(latest.is_none());
1567
1568 let state_root = cp_core::StateRoot::new([1u8; 32], None, test_hlc(), Uuid::new_v4(), 0);
1570 store.set_latest_root(&state_root).unwrap();
1571
1572 let retrieved = store.get_latest_root().unwrap();
1573 assert!(retrieved.is_some());
1574 }
1575
1576 #[test]
1577 fn test_graph_store_stats() {
1578 let mut store = fresh_store();
1579
1580 let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
1581 store.insert_document(&doc).unwrap();
1582
1583 let stats = store.stats().unwrap();
1584 assert_eq!(stats.documents, 1);
1585 assert_eq!(stats.chunks, 0);
1586 assert_eq!(stats.embeddings, 0);
1587 assert_eq!(stats.edges, 0);
1588 }
1589
1590 #[test]
1591 fn test_graph_store_stats_empty() {
1592 let store = fresh_store();
1593
1594 let stats = store.stats().unwrap();
1595 assert_eq!(stats.documents, 0);
1596 assert_eq!(stats.chunks, 0);
1597 assert_eq!(stats.embeddings, 0);
1598 assert_eq!(stats.edges, 0);
1599 }
1600
1601 #[test]
1604 fn test_transaction_begin() {
1605 let store = fresh_store();
1606 assert!(store.stats().is_ok());
1608 }
1609
1610 #[test]
1611 fn test_transaction_commit() {
1612 let mut store = fresh_store();
1613
1614 let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
1616 store.insert_document(&doc).unwrap();
1617
1618 let retrieved = store.get_document(doc.id).unwrap();
1620 assert!(retrieved.is_some());
1621 }
1622
1623 #[test]
1624 fn test_transaction_rollback() {
1625 let mut store = fresh_store();
1628
1629 let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
1630
1631 store.insert_document(&doc).unwrap();
1633
1634 let retrieved = store.get_document(doc.id).unwrap();
1636 assert!(retrieved.is_some());
1637 }
1638
1639 #[test]
1640 fn test_transaction_atomicity() {
1641 let mut store = fresh_store();
1642
1643 let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
1644 store.insert_document(&doc).unwrap();
1645
1646 let stats = store.stats().unwrap();
1647 assert_eq!(stats.documents, 1);
1648 }
1649
1650 #[test]
1651 fn test_transaction_isolation() {
1652 let store = fresh_store();
1653 let stats1 = store.stats().unwrap();
1656 let stats2 = store.stats().unwrap();
1657 assert_eq!(stats1.documents, stats2.documents);
1658 }
1659
1660 #[test]
1663 fn test_fts_search_basic() {
1664 let mut store = fresh_store();
1665
1666 let doc = Document::new(PathBuf::from("test.md"), b"Some test content", 0);
1667 store.insert_document(&doc).unwrap();
1668
1669 let chunk = Chunk::new(doc.id, "Hello world this is a test".to_string(), 0, 0);
1670 store.insert_chunk(&chunk).unwrap();
1671
1672 std::thread::sleep(std::time::Duration::from_millis(100));
1674
1675 let results = store.search_lexical("test", 10).unwrap();
1676 }
1679
1680 #[test]
1681 fn test_fts_search_multiple_terms() {
1682 let mut store = fresh_store();
1683
1684 let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
1685 store.insert_document(&doc).unwrap();
1686
1687 let chunk = Chunk::new(doc.id, "Hello world test Rust programming".to_string(), 0, 0);
1688 store.insert_chunk(&chunk).unwrap();
1689
1690 std::thread::sleep(std::time::Duration::from_millis(100));
1691
1692 let results = store.search_lexical("hello world", 10).unwrap();
1694 }
1696
1697 #[test]
1698 fn test_fts_search_no_results() {
1699 let mut store = fresh_store();
1700
1701 let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
1702 store.insert_document(&doc).unwrap();
1703
1704 let chunk = Chunk::new(doc.id, "Some content here".to_string(), 0, 0);
1705 store.insert_chunk(&chunk).unwrap();
1706
1707 std::thread::sleep(std::time::Duration::from_millis(100));
1708
1709 let results = store.search_lexical("nonexistentterm12345", 10).unwrap();
1710 }
1712
1713 #[test]
1714 fn test_fts_search_ranking() {
1715 let mut store = fresh_store();
1716
1717 let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
1718 store.insert_document(&doc).unwrap();
1719
1720 let chunk1 = Chunk::new(doc.id, "test word test".to_string(), 0, 0);
1721
1722 store.insert_chunk(&chunk1).unwrap();
1723
1724 std::thread::sleep(std::time::Duration::from_millis(100));
1725
1726 let results = store.search_lexical("test", 10);
1728 assert!(results.is_ok());
1730 }
1731
1732 #[test]
1733 fn test_fts_search_unicode() {
1734 let mut store = fresh_store();
1735
1736 let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
1737 store.insert_document(&doc).unwrap();
1738
1739 let chunk = Chunk::new(doc.id, "Hello 世界 unicode".to_string(), 0, 0);
1740 store.insert_chunk(&chunk).unwrap();
1741
1742 std::thread::sleep(std::time::Duration::from_millis(100));
1743
1744 let results = store.search_lexical("世界", 10).unwrap();
1746 }
1747
1748 #[test]
1749 fn test_fts_trigger_insert() {
1750 let mut store = fresh_store();
1751
1752 let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
1753 store.insert_document(&doc).unwrap();
1754
1755 let chunk = Chunk::new(doc.id, "Trigger test content".to_string(), 0, 0);
1757 store.insert_chunk(&chunk).unwrap();
1758
1759 std::thread::sleep(std::time::Duration::from_millis(100));
1761
1762 let results = store.search_lexical("trigger", 10).unwrap();
1764 }
1765
1766 #[test]
1767 fn test_fts_trigger_delete() {
1768 let mut store = fresh_store();
1769
1770 let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
1771 store.insert_document(&doc).unwrap();
1772
1773 let chunk = Chunk::new(doc.id, "Delete test content".to_string(), 0, 0);
1774 store.insert_chunk(&chunk).unwrap();
1775
1776 std::thread::sleep(std::time::Duration::from_millis(100));
1777
1778 store.delete_document(doc.id).unwrap();
1780
1781 std::thread::sleep(std::time::Duration::from_millis(100));
1782
1783 let results = store.search_lexical("delete", 10).unwrap();
1785 }
1786
1787 #[test]
1788 fn test_fts_trigger_update() {
1789 let mut store = fresh_store();
1791
1792 let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
1793 store.insert_document(&doc).unwrap();
1794
1795 let chunk = Chunk::new(doc.id, "Original content".to_string(), 0, 0);
1796 store.insert_chunk(&chunk).unwrap();
1797
1798 std::thread::sleep(std::time::Duration::from_millis(100));
1799
1800 store.delete_document(doc.id).unwrap();
1802
1803 let updated_chunk = Chunk::new(doc.id, "Updated content".to_string(), 0, 0);
1804 store.insert_document(&doc).unwrap();
1805 store.insert_chunk(&updated_chunk).unwrap();
1806
1807 std::thread::sleep(std::time::Duration::from_millis(100));
1808
1809 let results = store.search_lexical("updated", 10).unwrap();
1811 }
1812
1813 #[test]
1816 fn test_document_roundtrip() {
1817 let mut store = fresh_store();
1818
1819 let doc = Document::new(PathBuf::from("test.md"), b"Hello, world!", 12345);
1820 store.insert_document(&doc).unwrap();
1821
1822 let retrieved = store.get_document(doc.id).unwrap().unwrap();
1823 assert_eq!(retrieved.id, doc.id);
1824 assert_eq!(retrieved.path, doc.path);
1825 assert_eq!(retrieved.hash, doc.hash);
1826 }
1827
1828 #[test]
1829 fn test_chunk_operations() {
1830 let mut store = fresh_store();
1831
1832 let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
1833 store.insert_document(&doc).unwrap();
1834
1835 let chunk = Chunk::new(doc.id, "Test chunk text".to_string(), 0, 0);
1837 store.insert_chunk(&chunk).unwrap();
1838
1839 let chunks = store.get_chunks_for_doc(doc.id).unwrap();
1840 assert_eq!(chunks.len(), 1);
1841 assert_eq!(chunks[0].text, "Test chunk text\n");
1843 }
1844
1845 #[test]
1846 fn test_embedding_and_search() {
1847 let mut store = fresh_store();
1848
1849 let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
1850 store.insert_document(&doc).unwrap();
1851
1852 let chunk = Chunk::new(doc.id, "Test text".to_string(), 0, 0);
1853 store.insert_chunk(&chunk).unwrap();
1854
1855 let vector: Vec<f32> = (0..384).map(|i| (i as f32) * 0.01).collect();
1857 let emb = Embedding::new(chunk.id, &vector, [0u8; 32], 0);
1858 store.insert_embedding(&emb).unwrap();
1859
1860 let results = store.search(&vector, 1).unwrap();
1862 assert_eq!(results.len(), 1);
1863 assert!(results[0].1 > 0.99); }
1865
1866 #[test]
1867 fn test_edge_operations() {
1868 let mut store = fresh_store();
1869
1870 let source = Uuid::new_v4();
1871 let target = Uuid::new_v4();
1872 let edge = Edge::new(source, target, EdgeKind::DocToChunk);
1873
1874 store.add_edge(&edge).unwrap();
1875
1876 let edges = store.get_edges(source).unwrap();
1877 assert_eq!(edges.len(), 1);
1878 assert_eq!(edges[0].target, target);
1879 }
1880
1881 #[test]
1882 fn test_merkle_root() {
1883 let mut store = fresh_store();
1884
1885 let root1 = store.compute_merkle_root().unwrap();
1886
1887 let doc = Document::new(PathBuf::from("test.md"), b"Hello", 0);
1888 store.insert_document(&doc).unwrap();
1889
1890 let root2 = store.compute_merkle_root().unwrap();
1891
1892 assert_ne!(root1, root2);
1894 }
1895
1896 #[test]
1897 fn test_stats() {
1898 let mut store = fresh_store();
1899
1900 let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
1901 store.insert_document(&doc).unwrap();
1902
1903 let stats = store.stats().unwrap();
1904 assert_eq!(stats.documents, 1);
1905 assert_eq!(stats.chunks, 0);
1906 }
1907}