1mod index;
6mod migrations;
7
8pub use index::{IndexConfig, PersistentHnswIndex, SharedPersistentIndex};
9pub use migrations::{get_schema_version, needs_migration, run_migrations};
10
11use cp_core::{CPError, Chunk, Document, Edge, EdgeKind, Embedding, Result};
12use rusqlite::{params, Connection, OptionalExtension};
13use std::path::PathBuf;
14use tracing::info;
15use uuid::Uuid;
16
17fn uuid_from_bytes(bytes: &[u8]) -> rusqlite::Result<Uuid> {
19 Uuid::from_slice(bytes).map_err(|e| {
20 rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Blob, Box::new(e))
21 })
22}
23
24#[allow(dead_code)]
26pub struct GraphStore {
27 db: Connection,
29 hnsw: SharedPersistentIndex,
31 db_path: Option<PathBuf>,
33 index_config: IndexConfig,
35}
36
37impl GraphStore {
38 pub fn open(db_path: &str) -> Result<Self> {
40 Self::open_with_config(db_path, IndexConfig::default())
41 }
42
43 pub fn open_with_config(db_path: &str, index_config: IndexConfig) -> Result<Self> {
45 let (db, path) = if db_path == ":memory:" {
46 (
47 Connection::open_in_memory().map_err(|e| CPError::Database(e.to_string()))?,
48 None,
49 )
50 } else {
51 let path = PathBuf::from(db_path);
52 (
53 Connection::open(&path).map_err(|e| CPError::Database(e.to_string()))?,
54 Some(path),
55 )
56 };
57
58 run_migrations(&db)?;
60
61 let hnsw = if let Some(ref p) = path {
63 let index_path = p.with_extension("usearch");
64 SharedPersistentIndex::open(&index_path, index_config.clone())?
65 } else {
66 SharedPersistentIndex::new(index_config.clone())?
67 };
68
69 let store = Self {
70 db,
71 hnsw,
72 db_path: path,
73 index_config,
74 };
75
76 if store.hnsw.needs_rebuild() {
78 store.rebuild_hnsw_index()?;
79 } else {
80 let current_root = store.compute_merkle_root()?;
82 if !store.hnsw.is_valid(¤t_root) {
83 info!("Index checkpoint mismatch, rebuilding...");
84 store.rebuild_hnsw_index()?;
85 }
86 }
87
88 info!("Opened graph store at {} and loaded index", db_path);
89
90 Ok(store)
91 }
92
93 pub fn in_memory() -> Result<Self> {
95 Self::open(":memory:")
96 }
97
98 fn rebuild_hnsw_index(&self) -> Result<()> {
100 info!("Rebuilding HNSW index from database...");
101 self.hnsw.clear()?;
102
103 let embs = self.get_all_embeddings()?;
104 for emb in embs {
105 self.hnsw.insert(emb.id, &emb.to_f32())?;
106 }
107
108 let root = self.compute_merkle_root()?;
110 self.hnsw.checkpoint(root)?;
111
112 info!("HNSW index rebuilt with {} vectors", self.hnsw.len());
113 Ok(())
114 }
115
116 pub fn insert_document(&mut self, doc: &Document) -> Result<()> {
120 self.db
121 .execute(
122 "INSERT OR REPLACE INTO documents (id, path, hash, hierarchical_hash, mtime, size, mime_type, path_id, arweave_tx)
123 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
124 params![
125 doc.id.as_bytes().as_slice(),
126 doc.path.to_string_lossy().as_ref(),
127 doc.hash.as_slice(),
128 doc.hierarchical_hash.as_slice(),
129 doc.mtime,
130 doc.size as i64,
131 &doc.mime_type,
132 doc.path_id.as_bytes().as_slice(),
133 doc.arweave_tx.as_deref(),
134 ],
135 )
136 .map_err(|e| CPError::Database(e.to_string()))?;
137
138 Ok(())
139 }
140
/// Column list used by every `SELECT` whose rows are decoded by
/// `row_to_document`; the order must match the column indices read there.
const DOC_COLUMNS: &'static str = concat!(
    "id, path, hash, hierarchical_hash, ",
    "mtime, size, mime_type, path_id, arweave_tx",
);
144
145 fn row_to_document(row: &rusqlite::Row) -> rusqlite::Result<Document> {
147 let id_bytes: Vec<u8> = row.get(0)?;
148 let path_str: String = row.get(1)?;
149 let hash_bytes: Vec<u8> = row.get(2)?;
150 let hh_bytes: Vec<u8> = row.get(3)?;
151 let mtime: i64 = row.get(4)?;
152 let size: i64 = row.get(5)?;
153 let mime_type: String = row.get(6)?;
154 let path_id_bytes: Option<Vec<u8>> = row.get(7)?;
155 let arweave_tx: Option<String> = row.get(8)?;
156
157 let id = uuid_from_bytes(&id_bytes)?;
158 let mut hash = [0u8; 32];
159 hash.copy_from_slice(&hash_bytes);
160 let mut hierarchical_hash = [0u8; 32];
161 hierarchical_hash.copy_from_slice(&hh_bytes);
162
163 let path_id = path_id_bytes
164 .and_then(|b| Uuid::from_slice(&b).ok())
165 .unwrap_or(Uuid::nil());
166
167 Ok(Document {
168 id,
169 path_id,
170 path: PathBuf::from(path_str),
171 hash,
172 hierarchical_hash,
173 mtime,
174 size: size as u64,
175 mime_type,
176 arweave_tx,
177 })
178 }
179
180 pub fn set_document_arweave_tx(&self, id: Uuid, tx_id: &str) -> Result<()> {
182 self.db
183 .execute(
184 "UPDATE documents SET arweave_tx = ?1 WHERE id = ?2",
185 params![tx_id, id.as_bytes().as_slice()],
186 )
187 .map_err(|e| CPError::Database(e.to_string()))?;
188 Ok(())
189 }
190
191 pub fn get_document(&self, id: Uuid) -> Result<Option<Document>> {
193 let sql = format!("SELECT {} FROM documents WHERE id = ?1", Self::DOC_COLUMNS);
194 self.db
195 .query_row(
196 &sql,
197 params![id.as_bytes().as_slice()],
198 Self::row_to_document,
199 )
200 .optional()
201 .map_err(|e| CPError::Database(e.to_string()))
202 }
203
204 pub fn get_document_by_path(&self, path: &std::path::Path) -> Result<Option<Document>> {
206 let sql = format!(
207 "SELECT {} FROM documents WHERE path = ?1",
208 Self::DOC_COLUMNS
209 );
210 self.db
211 .query_row(
212 &sql,
213 params![path.to_string_lossy().as_ref()],
214 Self::row_to_document,
215 )
216 .optional()
217 .map_err(|e| CPError::Database(e.to_string()))
218 }
219
220 pub fn delete_document(&mut self, id: Uuid) -> Result<()> {
222 let tx = self
223 .db
224 .transaction()
225 .map_err(|e| CPError::Database(e.to_string()))?;
226
227 tx.execute(
229 "DELETE FROM edges WHERE source = ?1 OR target = ?1",
230 params![id.as_bytes().as_slice()],
231 )
232 .map_err(|e| CPError::Database(e.to_string()))?;
233 tx.execute(
234 "DELETE FROM edges WHERE source IN (SELECT id FROM chunks WHERE doc_id = ?1)
235 OR target IN (SELECT id FROM chunks WHERE doc_id = ?1)",
236 params![id.as_bytes().as_slice()],
237 )
238 .map_err(|e| CPError::Database(e.to_string()))?;
239 tx.execute(
240 "DELETE FROM edges WHERE source IN (SELECT id FROM embeddings WHERE chunk_id IN (SELECT id FROM chunks WHERE doc_id = ?1))
241 OR target IN (SELECT id FROM embeddings WHERE chunk_id IN (SELECT id FROM chunks WHERE doc_id = ?1))",
242 params![id.as_bytes().as_slice()],
243 )
244 .map_err(|e| CPError::Database(e.to_string()))?;
245
246 tx.execute(
248 "DELETE FROM embeddings WHERE chunk_id IN (SELECT id FROM chunks WHERE doc_id = ?1)",
249 params![id.as_bytes().as_slice()],
250 )
251 .map_err(|e| CPError::Database(e.to_string()))?;
252
253 tx.execute(
254 "DELETE FROM chunks WHERE doc_id = ?1",
255 params![id.as_bytes().as_slice()],
256 )
257 .map_err(|e| CPError::Database(e.to_string()))?;
258
259 tx.execute(
260 "DELETE FROM documents WHERE id = ?1",
261 params![id.as_bytes().as_slice()],
262 )
263 .map_err(|e| CPError::Database(e.to_string()))?;
264
265 tx.commit().map_err(|e| CPError::Database(e.to_string()))?;
266
267 self.hnsw.invalidate();
269
270 Ok(())
271 }
272
273 pub fn delete_chunk(&mut self, id: Uuid) -> Result<()> {
275 let tx = self
276 .db
277 .transaction()
278 .map_err(|e| CPError::Database(e.to_string()))?;
279
280 tx.execute(
282 "DELETE FROM embeddings WHERE chunk_id = ?1",
283 params![id.as_bytes().as_slice()],
284 )
285 .map_err(|e| CPError::Database(e.to_string()))?;
286 tx.execute(
288 "DELETE FROM edges WHERE source = ?1 OR target = ?1",
289 params![id.as_bytes().as_slice()],
290 )
291 .map_err(|e| CPError::Database(e.to_string()))?;
292 tx.execute(
293 "DELETE FROM chunks WHERE id = ?1",
294 params![id.as_bytes().as_slice()],
295 )
296 .map_err(|e| CPError::Database(e.to_string()))?;
297
298 tx.commit().map_err(|e| CPError::Database(e.to_string()))?;
299 self.hnsw.invalidate();
300 Ok(())
301 }
302
303 pub fn delete_embedding(&mut self, id: Uuid) -> Result<()> {
305 let tx = self
306 .db
307 .transaction()
308 .map_err(|e| CPError::Database(e.to_string()))?;
309
310 tx.execute(
311 "DELETE FROM edges WHERE source = ?1 OR target = ?1",
312 params![id.as_bytes().as_slice()],
313 )
314 .map_err(|e| CPError::Database(e.to_string()))?;
315 tx.execute(
316 "DELETE FROM embeddings WHERE id = ?1",
317 params![id.as_bytes().as_slice()],
318 )
319 .map_err(|e| CPError::Database(e.to_string()))?;
320
321 tx.commit().map_err(|e| CPError::Database(e.to_string()))?;
322 self.hnsw.invalidate();
323 Ok(())
324 }
325
326 pub fn get_all_documents(&self) -> Result<Vec<Document>> {
328 let sql = format!("SELECT {} FROM documents", Self::DOC_COLUMNS);
329 let mut stmt = self
330 .db
331 .prepare(&sql)
332 .map_err(|e| CPError::Database(e.to_string()))?;
333
334 let docs = stmt
335 .query_map([], Self::row_to_document)
336 .map_err(|e| CPError::Database(e.to_string()))?
337 .collect::<std::result::Result<Vec<_>, _>>()
338 .map_err(|e| CPError::Database(e.to_string()))?;
339
340 Ok(docs)
341 }
342
343 pub fn get_all_chunk_ids(&self) -> Result<Vec<Uuid>> {
347 let mut stmt = self
348 .db
349 .prepare("SELECT id FROM chunks ORDER BY id")
350 .map_err(|e| CPError::Database(e.to_string()))?;
351
352 let ids = stmt
353 .query_map([], |row| {
354 let id_bytes: Vec<u8> = row.get(0)?;
355 uuid_from_bytes(&id_bytes)
356 })
357 .map_err(|e| CPError::Database(e.to_string()))?
358 .collect::<std::result::Result<Vec<_>, _>>()
359 .map_err(|e| CPError::Database(e.to_string()))?;
360
361 Ok(ids)
362 }
363
364 pub fn insert_chunk(&mut self, chunk: &Chunk) -> Result<()> {
366 self.db
367 .execute(
368 "INSERT OR REPLACE INTO chunks (id, doc_id, text, byte_offset, byte_length, sequence, text_hash)
369 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
370 params![
371 chunk.id.as_bytes().as_slice(),
372 chunk.doc_id.as_bytes().as_slice(),
373 &chunk.text,
374 chunk.byte_offset as i64,
375 chunk.byte_length as i64,
376 chunk.sequence,
377 chunk.text_hash.as_slice(),
378 ],
379 )
380 .map_err(|e| CPError::Database(e.to_string()))?;
381
382 Ok(())
383 }
384
385 pub fn get_chunks_for_doc(&self, doc_id: Uuid) -> Result<Vec<Chunk>> {
387 let mut stmt = self
388 .db
389 .prepare(
390 "SELECT id, doc_id, text, byte_offset, byte_length, sequence, text_hash
391 FROM chunks WHERE doc_id = ?1 ORDER BY sequence",
392 )
393 .map_err(|e| CPError::Database(e.to_string()))?;
394
395 let chunks = stmt
396 .query_map(params![doc_id.as_bytes().as_slice()], |row| {
397 let id_bytes: Vec<u8> = row.get(0)?;
398 let doc_id_bytes: Vec<u8> = row.get(1)?;
399 let text: String = row.get(2)?;
400 let byte_offset: i64 = row.get(3)?;
401 let byte_length: i64 = row.get(4)?;
402 let sequence: u32 = row.get(5)?;
403 let text_hash_bytes: Vec<u8> = row.get(6)?;
404
405 let id = uuid_from_bytes(&id_bytes)?;
406 let doc_id = uuid_from_bytes(&doc_id_bytes)?;
407 let mut text_hash = [0u8; 32];
408 text_hash.copy_from_slice(&text_hash_bytes);
409
410 Ok(Chunk {
411 id,
412 doc_id,
413 text,
414 byte_offset: byte_offset as u64,
415 byte_length: byte_length as u64,
416 sequence,
417 text_hash,
418 })
419 })
420 .map_err(|e| CPError::Database(e.to_string()))?
421 .collect::<std::result::Result<Vec<_>, _>>()
422 .map_err(|e| CPError::Database(e.to_string()))?;
423
424 Ok(chunks)
425 }
426
427 pub fn get_chunk(&self, id: Uuid) -> Result<Option<Chunk>> {
429 self.db
430 .query_row(
431 "SELECT id, doc_id, text, byte_offset, byte_length, sequence, text_hash FROM chunks WHERE id = ?1",
432 params![id.as_bytes().as_slice()],
433 |row| {
434 let id_bytes: Vec<u8> = row.get(0)?;
435 let doc_id_bytes: Vec<u8> = row.get(1)?;
436 let text: String = row.get(2)?;
437 let byte_offset: i64 = row.get(3)?;
438 let byte_length: i64 = row.get(4)?;
439 let sequence: u32 = row.get(5)?;
440 let text_hash_bytes: Vec<u8> = row.get(6)?;
441
442 let id = uuid_from_bytes(&id_bytes)?;
443 let doc_id = uuid_from_bytes(&doc_id_bytes)?;
444 let mut text_hash = [0u8; 32];
445 text_hash.copy_from_slice(&text_hash_bytes);
446
447 Ok(Chunk {
448 id,
449 doc_id,
450 text,
451 byte_offset: byte_offset as u64,
452 byte_length: byte_length as u64,
453 sequence,
454 text_hash,
455 })
456 },
457 )
458 .optional()
459 .map_err(|e| CPError::Database(e.to_string()))
460 }
461
462 pub fn insert_embedding(&mut self, emb: &Embedding) -> Result<()> {
466 let vector_bytes: Vec<u8> = emb
469 .vector
470 .iter()
471 .flat_map(|val| val.to_le_bytes())
472 .collect();
473
474 self.db
475 .execute(
476 "INSERT OR REPLACE INTO embeddings (id, chunk_id, vector, model_hash, dim, l2_norm)
477 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
478 params![
479 emb.id.as_bytes().as_slice(),
480 emb.chunk_id.as_bytes().as_slice(),
481 &vector_bytes,
482 emb.model_hash.as_slice(),
483 i32::from(emb.dim),
484 emb.l2_norm,
485 ],
486 )
487 .map_err(|e| CPError::Database(e.to_string()))?;
488
489 let vector_f32 = emb.to_f32();
491 self.hnsw.insert(emb.id, &vector_f32)?;
492
493 self.hnsw.invalidate();
495
496 Ok(())
497 }
498
499 pub fn get_embedding_for_chunk(&self, chunk_id: Uuid) -> Result<Option<Embedding>> {
501 self.db
502 .query_row(
503 "SELECT id, chunk_id, vector, model_hash, dim, l2_norm, embedding_version
504 FROM embeddings WHERE chunk_id = ?1",
505 params![chunk_id.as_bytes().as_slice()],
506 Self::row_to_embedding,
507 )
508 .optional()
509 .map_err(|e| CPError::Database(e.to_string()))
510 }
511
512 pub fn get_embedding(&self, id: Uuid) -> Result<Option<Embedding>> {
514 self.db
515 .query_row(
516 "SELECT id, chunk_id, vector, model_hash, dim, l2_norm, embedding_version
517 FROM embeddings WHERE id = ?1",
518 params![id.as_bytes().as_slice()],
519 Self::row_to_embedding,
520 )
521 .optional()
522 .map_err(|e| CPError::Database(e.to_string()))
523 }
524
525 fn row_to_embedding(row: &rusqlite::Row) -> rusqlite::Result<Embedding> {
526 let id_bytes: Vec<u8> = row.get(0)?;
527 let chunk_id_bytes: Vec<u8> = row.get(1)?;
528 let vector_bytes: Vec<u8> = row.get(2)?;
529 let model_hash_bytes: Vec<u8> = row.get(3)?;
530 let _dim: i32 = row.get(4)?;
531 let l2_norm: f32 = row.get(5).unwrap_or(0.0);
532 let embedding_version: u32 = row.get(6).unwrap_or(0);
533
534 let _id = uuid_from_bytes(&id_bytes)?;
535 let chunk_id = uuid_from_bytes(&chunk_id_bytes)?;
536 let mut model_hash = [0u8; 32];
537 model_hash.copy_from_slice(&model_hash_bytes);
538
539 let vector: Vec<i16> = vector_bytes
541 .chunks(2)
542 .map(|bytes| i16::from_le_bytes([bytes[0], bytes[1]]))
543 .collect();
544
545 Ok(Embedding::from_quantized_with_norm(
546 chunk_id,
547 vector,
548 model_hash,
549 l2_norm,
550 embedding_version,
551 ))
552 }
553
554 pub fn get_chunk_id_for_embedding(&self, embedding_id: Uuid) -> Result<Option<Uuid>> {
556 self.db
557 .query_row(
558 "SELECT chunk_id FROM embeddings WHERE id = ?1",
559 params![embedding_id.as_bytes().as_slice()],
560 |row| {
561 let id_bytes: Vec<u8> = row.get(0)?;
562 uuid_from_bytes(&id_bytes)
563 },
564 )
565 .optional()
566 .map_err(|e| CPError::Database(e.to_string()))
567 }
568
569 pub fn get_all_embeddings(&self) -> Result<Vec<Embedding>> {
571 let mut stmt = self
572 .db
573 .prepare("SELECT id, chunk_id, vector, model_hash, dim, l2_norm, embedding_version FROM embeddings")
574 .map_err(|e| CPError::Database(e.to_string()))?;
575
576 let embs = stmt
577 .query_map([], Self::row_to_embedding)
578 .map_err(|e| CPError::Database(e.to_string()))?
579 .collect::<std::result::Result<Vec<_>, _>>()
580 .map_err(|e| CPError::Database(e.to_string()))?;
581
582 Ok(embs)
583 }
584
585 pub fn checkpoint_index(&self) -> Result<()> {
587 let root = self.compute_merkle_root()?;
588 self.hnsw.checkpoint(root)
589 }
590
591 pub fn save_index(&self) -> Result<()> {
593 self.hnsw.save()
594 }
595
596 pub fn add_edge(&mut self, edge: &Edge) -> Result<()> {
600 self.db
601 .execute(
602 "INSERT OR REPLACE INTO edges (source, target, kind, weight, metadata) VALUES (?1, ?2, ?3, ?4, ?5)",
603 params![
604 edge.source.as_bytes().as_slice(),
605 edge.target.as_bytes().as_slice(),
606 edge.kind as i32,
607 edge.weight,
608 edge.metadata.as_deref(),
609 ],
610 )
611 .map_err(|e| CPError::Database(e.to_string()))?;
612
613 Ok(())
614 }
615
616 pub fn get_edges(&self, node_id: Uuid) -> Result<Vec<Edge>> {
618 let mut stmt = self
619 .db
620 .prepare("SELECT source, target, kind, weight, metadata FROM edges WHERE source = ?1")
621 .map_err(|e| CPError::Database(e.to_string()))?;
622
623 let edges = stmt
624 .query_map(params![node_id.as_bytes().as_slice()], |row| {
625 let source_bytes: Vec<u8> = row.get(0)?;
626 let target_bytes: Vec<u8> = row.get(1)?;
627 let kind: i32 = row.get(2)?;
628 let weight: Option<f64> = row.get(3)?;
629 let metadata: Option<String> = row.get(4)?;
630
631 let source = uuid_from_bytes(&source_bytes)?;
632 let target = uuid_from_bytes(&target_bytes)?;
633 let kind = EdgeKind::from_u8(kind as u8).ok_or_else(|| {
634 rusqlite::Error::FromSqlConversionFailure(
635 2,
636 rusqlite::types::Type::Integer,
637 format!("unknown edge kind: {kind}").into(),
638 )
639 })?;
640
641 Ok(Edge {
642 source,
643 target,
644 kind,
645 weight: weight.map(|w| w as f32),
646 metadata,
647 })
648 })
649 .map_err(|e| CPError::Database(e.to_string()))?
650 .collect::<std::result::Result<Vec<_>, _>>()
651 .map_err(|e| CPError::Database(e.to_string()))?;
652
653 Ok(edges)
654 }
655
656 pub fn edges_to(&self, node_id: Uuid) -> Result<Vec<Edge>> {
658 let mut stmt = self
659 .db
660 .prepare("SELECT source, target, kind, weight, metadata FROM edges WHERE target = ?1")
661 .map_err(|e| CPError::Database(e.to_string()))?;
662
663 let edges = stmt
664 .query_map(params![node_id.as_bytes().as_slice()], |row| {
665 let source_bytes: Vec<u8> = row.get(0)?;
666 let target_bytes: Vec<u8> = row.get(1)?;
667 let kind: i32 = row.get(2)?;
668 let weight: Option<f64> = row.get(3)?;
669 let metadata: Option<String> = row.get(4)?;
670
671 let source = uuid_from_bytes(&source_bytes)?;
672 let target = uuid_from_bytes(&target_bytes)?;
673 let kind = EdgeKind::from_u8(kind as u8).ok_or_else(|| {
674 rusqlite::Error::FromSqlConversionFailure(
675 2,
676 rusqlite::types::Type::Integer,
677 format!("unknown edge kind: {kind}").into(),
678 )
679 })?;
680
681 Ok(Edge {
682 source,
683 target,
684 kind,
685 weight: weight.map(|w| w as f32),
686 metadata,
687 })
688 })
689 .map_err(|e| CPError::Database(e.to_string()))?
690 .collect::<std::result::Result<Vec<_>, _>>()
691 .map_err(|e| CPError::Database(e.to_string()))?;
692
693 Ok(edges)
694 }
695
696 pub fn all_edges(&self) -> Result<Vec<Edge>> {
698 let mut stmt = self
699 .db
700 .prepare("SELECT source, target, kind, weight, metadata FROM edges")
701 .map_err(|e| CPError::Database(e.to_string()))?;
702
703 let edges = stmt
704 .query_map([], |row| {
705 let source_bytes: Vec<u8> = row.get(0)?;
706 let target_bytes: Vec<u8> = row.get(1)?;
707 let kind: i32 = row.get(2)?;
708 let weight: Option<f64> = row.get(3)?;
709 let metadata: Option<String> = row.get(4)?;
710
711 let source = uuid_from_bytes(&source_bytes)?;
712 let target = uuid_from_bytes(&target_bytes)?;
713 let kind = EdgeKind::from_u8(kind as u8).ok_or_else(|| {
714 rusqlite::Error::FromSqlConversionFailure(
715 2,
716 rusqlite::types::Type::Integer,
717 format!("unknown edge kind: {kind}").into(),
718 )
719 })?;
720
721 Ok(Edge {
722 source,
723 target,
724 kind,
725 weight: weight.map(|w| w as f32),
726 metadata,
727 })
728 })
729 .map_err(|e| CPError::Database(e.to_string()))?
730 .collect::<std::result::Result<Vec<_>, _>>()
731 .map_err(|e| CPError::Database(e.to_string()))?;
732
733 Ok(edges)
734 }
735
736 pub fn search(&self, query_vec: &[f32], k: usize) -> Result<Vec<(Uuid, f32)>> {
740 Ok(self.hnsw.search(query_vec, k))
741 }
742
743 pub fn search_lexical(&self, query: &str, k: usize) -> Result<Vec<(Uuid, f32)>> {
745 let mut stmt = self
746 .db
747 .prepare(
748 "SELECT id, rank FROM fts_chunks
749 WHERE fts_chunks MATCH ?1
750 ORDER BY rank LIMIT ?2",
751 )
752 .map_err(|e| CPError::Database(e.to_string()))?;
753
754 let results = stmt
755 .query_map(params![query, k as i64], |row| {
756 let id_bytes: Vec<u8> = row.get(0)?;
757 let rank: f64 = row.get(1)?;
758
759 let id = uuid_from_bytes(&id_bytes)?;
760 Ok((id, -rank as f32))
764 })
765 .map_err(|e| CPError::Database(e.to_string()))?
766 .collect::<std::result::Result<Vec<_>, _>>()
767 .map_err(|e| CPError::Database(e.to_string()))?;
768
769 Ok(results)
770 }
771
772 pub fn get_sorted_chunk_hashes(&self) -> Result<Vec<([u8; 16], [u8; 32])>> {
778 let mut stmt = self
779 .db
780 .prepare("SELECT id, text_hash FROM chunks ORDER BY id")
781 .map_err(|e| CPError::Database(e.to_string()))?;
782
783 let results = stmt
784 .query_map([], |row| {
785 let id_bytes: Vec<u8> = row.get(0)?;
786 let hash_bytes: Vec<u8> = row.get(1)?;
787
788 let mut id = [0u8; 16];
789 id.copy_from_slice(&id_bytes);
790 let mut hash = [0u8; 32];
791 hash.copy_from_slice(&hash_bytes);
792
793 Ok((id, hash))
794 })
795 .map_err(|e| CPError::Database(e.to_string()))?
796 .collect::<std::result::Result<Vec<_>, _>>()
797 .map_err(|e| CPError::Database(e.to_string()))?;
798
799 Ok(results)
800 }
801
802 pub fn compute_chunk_tree_root(&self) -> Result<[u8; 32]> {
807 let sorted = self.get_sorted_chunk_hashes()?;
808 let hashes: Vec<[u8; 32]> = sorted.iter().map(|(_, h)| *h).collect();
809 Ok(cp_core::state::compute_merkle_root(&hashes))
810 }
811
812 pub fn compute_merkle_root(&self) -> Result<[u8; 32]> {
819 let mut hasher = blake3::Hasher::new();
820
821 let mut docs = self.get_all_documents()?;
823 docs.sort_by_key(|d| d.id);
824 for doc in docs {
825 hasher.update(doc.id.as_bytes());
826 hasher.update(&doc.hash);
827 hasher.update(&doc.hierarchical_hash);
828 }
830
831 let mut stmt = self
835 .db
836 .prepare("SELECT id, text_hash FROM chunks ORDER BY id")
837 .map_err(|e| CPError::Database(e.to_string()))?;
838 let chunk_iter = stmt
839 .query_map([], |row| {
840 let id_bytes: Vec<u8> = row.get(0)?;
841 let hash_bytes: Vec<u8> = row.get(1)?;
842 Ok((id_bytes, hash_bytes))
843 })
844 .map_err(|e| CPError::Database(e.to_string()))?;
845
846 for chunk in chunk_iter {
847 let (id, hash) = chunk.map_err(|e| CPError::Database(e.to_string()))?;
848 hasher.update(&id);
849 hasher.update(&hash);
850 }
851
852 let mut stmt = self
854 .db
855 .prepare("SELECT id, vector, model_hash FROM embeddings ORDER BY id")
856 .map_err(|e| CPError::Database(e.to_string()))?;
857 let emb_iter = stmt
858 .query_map([], |row| {
859 let id_bytes: Vec<u8> = row.get(0)?;
860 let vec_bytes: Vec<u8> = row.get(1)?;
861 let model_hash: Vec<u8> = row.get(2)?;
862 Ok((id_bytes, vec_bytes, model_hash))
863 })
864 .map_err(|e| CPError::Database(e.to_string()))?;
865
866 for emb in emb_iter {
867 let (id, vec, model) = emb.map_err(|e| CPError::Database(e.to_string()))?;
868 hasher.update(&id);
869 hasher.update(&vec);
870 hasher.update(&model);
871 }
872
873 let mut stmt = self
875 .db
876 .prepare("SELECT source, target, kind, weight FROM edges ORDER BY source, target, kind")
877 .map_err(|e| CPError::Database(e.to_string()))?;
878 let edge_iter = stmt
879 .query_map([], |row| {
880 let s: Vec<u8> = row.get(0)?;
881 let t: Vec<u8> = row.get(1)?;
882 let k: i32 = row.get(2)?;
883 let w: Option<f64> = row.get(3)?;
884 Ok((s, t, k, w))
885 })
886 .map_err(|e| CPError::Database(e.to_string()))?;
887
888 for edge in edge_iter {
889 let (s, t, k, w) = edge.map_err(|e| CPError::Database(e.to_string()))?;
890 hasher.update(&s);
891 hasher.update(&t);
892 hasher.update(&k.to_le_bytes());
893 if let Some(weight) = w {
894 hasher.update(&weight.to_le_bytes());
895 }
896 }
897
898 Ok(*hasher.finalize().as_bytes())
899 }
900
901 pub fn get_state_root_by_hash(&self, hash: &[u8; 32]) -> Result<Option<cp_core::StateRoot>> {
903 self.db
904 .query_row(
905 "SELECT hash, parent, hlc_wall_ms, hlc_counter, hlc_node_id, device_id, signature, seq
906 FROM state_roots WHERE hash = ?1",
907 params![hash.as_slice()],
908 |row| {
909 let hash_bytes: Vec<u8> = row.get(0)?;
910 let parent_bytes: Option<Vec<u8>> = row.get(1)?;
911 let hlc_wall_ms: i64 = row.get(2)?;
912 let hlc_counter: i32 = row.get(3)?;
913 let hlc_node_id_bytes: Vec<u8> = row.get(4)?;
914 let device_id_bytes: Vec<u8> = row.get(5)?;
915 let signature_bytes: Vec<u8> = row.get(6)?;
916 let seq: i64 = row.get(7)?;
917
918 let mut hash = [0u8; 32];
919 hash.copy_from_slice(&hash_bytes);
920
921 let parent = parent_bytes.map(|b| {
922 let mut p = [0u8; 32];
923 p.copy_from_slice(&b);
924 p
925 });
926
927 let mut hlc_node_id = [0u8; 16];
928 hlc_node_id.copy_from_slice(&hlc_node_id_bytes);
929 let hlc = cp_core::Hlc {
930 wall_ms: hlc_wall_ms as u64,
931 counter: hlc_counter as u16,
932 node_id: hlc_node_id,
933 };
934
935 let device_id = uuid_from_bytes(&device_id_bytes)?;
936
937 let mut signature = [0u8; 64];
938 signature.copy_from_slice(&signature_bytes);
939
940 Ok(cp_core::StateRoot {
941 hash,
942 parent,
943 hlc,
944 device_id,
945 signature,
946 seq: seq as u64,
947 })
948 },
949 )
950 .optional()
951 .map_err(|e| CPError::Database(e.to_string()))
952 }
953
954 pub fn get_latest_root(&self) -> Result<Option<cp_core::StateRoot>> {
956 use rusqlite::OptionalExtension;
957 self.db
958 .query_row(
959 "SELECT hash, parent, hlc_wall_ms, hlc_counter, hlc_node_id, device_id, signature, seq
960 FROM state_roots ORDER BY seq DESC LIMIT 1",
961 [],
962 |row| {
963 let hash_bytes: Vec<u8> = row.get(0)?;
964 let parent_bytes: Option<Vec<u8>> = row.get(1)?;
965 let hlc_wall_ms: i64 = row.get(2)?;
966 let hlc_counter: i32 = row.get(3)?;
967 let hlc_node_id_bytes: Vec<u8> = row.get(4)?;
968 let device_id_bytes: Vec<u8> = row.get(5)?;
969 let signature_bytes: Vec<u8> = row.get(6)?;
970 let seq: i64 = row.get(7)?;
971
972 let mut hash = [0u8; 32];
973 hash.copy_from_slice(&hash_bytes);
974
975 let parent = parent_bytes.map(|b| {
976 let mut p = [0u8; 32];
977 p.copy_from_slice(&b);
978 p
979 });
980
981 let mut hlc_node_id = [0u8; 16];
982 hlc_node_id.copy_from_slice(&hlc_node_id_bytes);
983 let hlc = cp_core::Hlc {
984 wall_ms: hlc_wall_ms as u64,
985 counter: hlc_counter as u16,
986 node_id: hlc_node_id,
987 };
988
989 let device_id = uuid_from_bytes(&device_id_bytes)?;
990
991 let mut signature = [0u8; 64];
992 signature.copy_from_slice(&signature_bytes);
993
994 Ok(cp_core::StateRoot {
995 hash,
996 parent,
997 hlc,
998 device_id,
999 signature,
1000 seq: seq as u64,
1001 })
1002 },
1003 )
1004 .optional()
1005 .map_err(|e| CPError::Database(e.to_string()))
1006 }
1007
1008 pub fn set_latest_root(&mut self, root: &cp_core::StateRoot) -> Result<()> {
1010 self.db
1011 .execute(
1012 "INSERT OR REPLACE INTO state_roots (hash, parent, hlc_wall_ms, hlc_counter, hlc_node_id, device_id, signature, seq)
1013 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1014 params![
1015 root.hash.as_slice(),
1016 root.parent.as_ref().map(<[u8; 32]>::as_slice),
1017 root.hlc.wall_ms as i64,
1018 i32::from(root.hlc.counter),
1019 root.hlc.node_id.as_slice(),
1020 root.device_id.as_bytes().as_slice(),
1021 root.signature.as_slice(),
1022 root.seq as i64,
1023 ],
1024 )
1025 .map_err(|e| CPError::Database(e.to_string()))?;
1026
1027 Ok(())
1028 }
1029
1030 pub fn apply_diff(&mut self, diff: &cp_core::CognitiveDiff) -> Result<()> {
1037 let latest_root = self.get_latest_root()?;
1039
1040 let (resolved_diff, has_conflict) = Self::resolve_conflicts(diff, latest_root.as_ref());
1042
1043 if has_conflict {
1044 info!("Conflict detected and resolved using LWW/HLC");
1045 }
1046
1047 let tx = self
1048 .db
1049 .transaction()
1050 .map_err(|e| CPError::Database(e.to_string()))?;
1051
1052 for id in &resolved_diff.removed_doc_ids {
1054 tx.execute(
1055 "DELETE FROM documents WHERE id = ?1",
1056 params![id.as_bytes().as_slice()],
1057 )
1058 .map_err(|e| CPError::Database(e.to_string()))?;
1059 }
1060 for id in &resolved_diff.removed_chunk_ids {
1061 tx.execute(
1062 "DELETE FROM chunks WHERE id = ?1",
1063 params![id.as_bytes().as_slice()],
1064 )
1065 .map_err(|e| CPError::Database(e.to_string()))?;
1066 }
1067 for id in &resolved_diff.removed_embedding_ids {
1068 tx.execute(
1069 "DELETE FROM embeddings WHERE id = ?1",
1070 params![id.as_bytes().as_slice()],
1071 )
1072 .map_err(|e| CPError::Database(e.to_string()))?;
1073 }
1074
1075 if !resolved_diff.removed_embedding_ids.is_empty() {
1077 self.hnsw.invalidate();
1078 }
1079 for (source, target, kind) in &resolved_diff.removed_edges {
1080 tx.execute(
1081 "DELETE FROM edges WHERE source = ?1 AND target = ?2 AND kind = ?3",
1082 params![
1083 source.as_bytes().as_slice(),
1084 target.as_bytes().as_slice(),
1085 *kind as i32
1086 ],
1087 )
1088 .map_err(|e| CPError::Database(e.to_string()))?;
1089 }
1090
1091 for doc in &resolved_diff.added_docs {
1093 tx.execute(
1094 "INSERT OR REPLACE INTO documents (id, path, hash, hierarchical_hash, mtime, size, mime_type, path_id, arweave_tx)
1095 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
1096 params![
1097 doc.id.as_bytes().as_slice(),
1098 doc.path.to_string_lossy().as_ref(),
1099 doc.hash.as_slice(),
1100 doc.hierarchical_hash.as_slice(),
1101 doc.mtime,
1102 doc.size as i64,
1103 &doc.mime_type,
1104 doc.path_id.as_bytes().as_slice(),
1105 doc.arweave_tx.as_deref(),
1106 ],
1107 ).map_err(|e| CPError::Database(e.to_string()))?;
1108 }
1109
1110 for chunk in &resolved_diff.added_chunks {
1111 tx.execute(
1112 "INSERT OR REPLACE INTO chunks (id, doc_id, text, byte_offset, byte_length, sequence, text_hash)
1113 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1114 params![
1115 chunk.id.as_bytes().as_slice(),
1116 chunk.doc_id.as_bytes().as_slice(),
1117 &chunk.text,
1118 chunk.byte_offset as i64,
1119 chunk.byte_length as i64,
1120 chunk.sequence,
1121 chunk.text_hash.as_slice(),
1122 ],
1123 ).map_err(|e| CPError::Database(e.to_string()))?;
1124 }
1125
1126 for emb in &resolved_diff.added_embeddings {
1127 let vector_bytes: Vec<u8> = emb.vector.iter().flat_map(|f| f.to_le_bytes()).collect();
1128 tx.execute(
1129 "INSERT OR REPLACE INTO embeddings (id, chunk_id, vector, model_hash, dim, l2_norm)
1130 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1131 params![
1132 emb.id.as_bytes().as_slice(),
1133 emb.chunk_id.as_bytes().as_slice(),
1134 &vector_bytes,
1135 emb.model_hash.as_slice(),
1136 i32::from(emb.dim),
1137 emb.l2_norm,
1138 ],
1139 )
1140 .map_err(|e| CPError::Database(e.to_string()))?;
1141 }
1142
1143 for edge in &resolved_diff.added_edges {
1144 tx.execute(
1145 "INSERT OR REPLACE INTO edges (source, target, kind, weight) VALUES (?1, ?2, ?3, ?4)",
1146 params![
1147 edge.source.as_bytes().as_slice(),
1148 edge.target.as_bytes().as_slice(),
1149 edge.kind as i32,
1150 edge.weight,
1151 ],
1152 ).map_err(|e| CPError::Database(e.to_string()))?;
1153 }
1154
1155 tx.execute(
1157 "INSERT OR REPLACE INTO state_roots (hash, parent, hlc_wall_ms, hlc_counter, hlc_node_id, device_id, signature, seq)
1158 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
1159 params![
1160 resolved_diff.metadata.new_root.as_slice(),
1161 if resolved_diff.metadata.prev_root == [0u8; 32] { None } else { Some(resolved_diff.metadata.prev_root.as_slice()) },
1162 resolved_diff.metadata.hlc.wall_ms as i64,
1163 i32::from(resolved_diff.metadata.hlc.counter),
1164 resolved_diff.metadata.hlc.node_id.as_slice(),
1165 resolved_diff.metadata.device_id.as_bytes().as_slice(),
1166 [0u8; 64].as_slice(),
1167 resolved_diff.metadata.seq as i64,
1168 ],
1169 ).map_err(|e| CPError::Database(e.to_string()))?;
1170
1171 tx.commit().map_err(|e| CPError::Database(e.to_string()))?;
1172
1173 for emb in &resolved_diff.added_embeddings {
1175 let vec_f32 = emb.to_f32();
1176 if let Err(e) = self.hnsw.insert(emb.id, &vec_f32) {
1177 tracing::warn!(embedding_id = %emb.id, error = %e, "HNSW insert failed after diff apply");
1178 }
1179 }
1180
1181 self.hnsw.invalidate();
1183
1184 Ok(())
1185 }
1186
1187 fn resolve_conflicts(
1195 incoming_diff: &cp_core::CognitiveDiff,
1196 current_root: Option<&cp_core::StateRoot>,
1197 ) -> (cp_core::CognitiveDiff, bool) {
1198 let mut has_conflict = false;
1199
1200 let Some(current_root) = current_root else {
1202 return (incoming_diff.clone(), false);
1203 };
1204
1205 let incoming_hlc = &incoming_diff.metadata.hlc;
1207 let current_hlc = ¤t_root.hlc;
1208
1209 let incoming_is_newer = incoming_hlc > current_hlc;
1211
1212 if !incoming_is_newer && incoming_hlc != current_hlc {
1214 info!(
1215 "Conflict: incoming diff HLC {:?} is older than current {:?}",
1216 incoming_hlc, current_hlc
1217 );
1218 has_conflict = true;
1219 return (
1221 cp_core::CognitiveDiff::empty(
1222 current_root.hash,
1223 incoming_diff.metadata.device_id,
1224 incoming_diff.metadata.seq,
1225 incoming_diff.metadata.hlc.clone(),
1226 ),
1227 has_conflict,
1228 );
1229 }
1230
1231 if incoming_hlc == current_hlc {
1233 let incoming_hash = blake3::hash(&incoming_diff.metadata.new_root);
1234 let current_hash = blake3::hash(¤t_root.hash);
1235
1236 if incoming_hash.as_bytes() <= current_hash.as_bytes() {
1238 info!("Conflict: equal HLC, current hash wins (tiebreaker)");
1239 has_conflict = true;
1240 return (
1241 cp_core::CognitiveDiff::empty(
1242 current_root.hash,
1243 incoming_diff.metadata.device_id,
1244 incoming_diff.metadata.seq,
1245 incoming_diff.metadata.hlc.clone(),
1246 ),
1247 has_conflict,
1248 );
1249 }
1250 }
1251
1252 (incoming_diff.clone(), has_conflict)
1254 }
1255
1256 pub fn clear_state_roots(&mut self) -> Result<()> {
1258 self.db
1259 .execute_batch("DELETE FROM state_roots;")
1260 .map_err(|e| CPError::Database(e.to_string()))?;
1261 info!("Cleared state roots");
1262 Ok(())
1263 }
1264
1265 pub fn clear_all(&mut self) -> Result<()> {
1267 self.db
1268 .execute_batch(
1269 r"
1270 DELETE FROM embeddings;
1271 DELETE FROM chunks;
1272 DELETE FROM documents;
1273 DELETE FROM edges;
1274 DELETE FROM state_roots;
1275 ",
1276 )
1277 .map_err(|e| CPError::Database(e.to_string()))?;
1278
1279 self.hnsw.clear()?;
1281
1282 info!("Cleared all data from graph store for fresh sync");
1283 Ok(())
1284 }
1285
1286 pub fn stats(&self) -> Result<GraphStats> {
1288 let doc_count: i64 = self
1289 .db
1290 .query_row("SELECT COUNT(*) FROM documents", [], |row| row.get(0))
1291 .map_err(|e| CPError::Database(e.to_string()))?;
1292
1293 let chunk_count: i64 = self
1294 .db
1295 .query_row("SELECT COUNT(*) FROM chunks", [], |row| row.get(0))
1296 .map_err(|e| CPError::Database(e.to_string()))?;
1297
1298 let embedding_count: i64 = self
1299 .db
1300 .query_row("SELECT COUNT(*) FROM embeddings", [], |row| row.get(0))
1301 .map_err(|e| CPError::Database(e.to_string()))?;
1302
1303 let edge_count: i64 = self
1304 .db
1305 .query_row("SELECT COUNT(*) FROM edges", [], |row| row.get(0))
1306 .map_err(|e| CPError::Database(e.to_string()))?;
1307
1308 Ok(GraphStats {
1309 documents: doc_count as usize,
1310 chunks: chunk_count as usize,
1311 embeddings: embedding_count as usize,
1312 edges: edge_count as usize,
1313 })
1314 }
1315}
1316
/// Row counts for the graph store, as produced by `GraphStore::stats`.
#[derive(Debug, Clone)]
pub struct GraphStats {
    /// Number of rows in the `documents` table.
    pub documents: usize,
    /// Number of rows in the `chunks` table.
    pub chunks: usize,
    /// Number of rows in the `embeddings` table.
    pub embeddings: usize,
    /// Number of rows in the `edges` table.
    pub edges: usize,
}
1325
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Opens a fresh in-memory store; panics if it cannot be created.
    fn fresh_store() -> GraphStore {
        GraphStore::in_memory().unwrap()
    }

    /// Builds the fixed `Hlc` value used by the state-root tests.
    fn test_hlc() -> cp_core::Hlc {
        cp_core::Hlc::new(1234567890, [0u8; 16])
    }

    /// Builds the deterministic 1536-element vector shared by the embedding tests.
    fn sample_vector() -> Vec<f32> {
        (0..1536).map(|i| i as f32 * 0.01).collect()
    }

    // ---- Store construction ----

    /// A new in-memory store can report stats.
    #[test]
    fn test_graph_store_new() {
        let store = GraphStore::in_memory().unwrap();
        assert!(store.stats().is_ok());
    }

    /// A new in-memory store starts with all counts at zero.
    #[test]
    fn test_graph_store_in_memory() {
        let store = GraphStore::in_memory().unwrap();
        let stats = store.stats().unwrap();
        assert_eq!(stats.documents, 0);
        assert_eq!(stats.chunks, 0);
        assert_eq!(stats.embeddings, 0);
        assert_eq!(stats.edges, 0);
    }

    // ---- Documents ----

    /// An inserted document is returned with the same id, path and hash.
    #[test]
    fn test_graph_store_insert_document() {
        let mut store = fresh_store();
        let doc = Document::new(PathBuf::from("test.md"), b"Hello, world!", 12345);
        store.insert_document(&doc).unwrap();

        let retrieved = store.get_document(doc.id).unwrap().unwrap();
        assert_eq!(retrieved.id, doc.id);
        assert_eq!(retrieved.path, doc.path);
        assert_eq!(retrieved.hash, doc.hash);
    }

    /// Lookup by id finds an inserted document.
    #[test]
    fn test_graph_store_get_document_by_id() {
        let mut store = fresh_store();
        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let retrieved = store.get_document(doc.id).unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().id, doc.id);
    }

    /// Lookup by path finds an inserted document.
    #[test]
    fn test_graph_store_get_document_by_path() {
        let mut store = fresh_store();
        let path = PathBuf::from("test.md");
        let doc = Document::new(path.clone(), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let retrieved = store.get_document_by_path(&path).unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().path, path);
    }

    /// Lookups for an unknown id or path return `None` rather than an error.
    #[test]
    fn test_graph_store_document_not_found() {
        let store = fresh_store();
        let non_existent_id = Uuid::new_v4();
        let retrieved = store.get_document(non_existent_id).unwrap();
        assert!(retrieved.is_none());

        let non_existent_path = PathBuf::from("non_existent.md");
        let retrieved_by_path = store.get_document_by_path(&non_existent_path).unwrap();
        assert!(retrieved_by_path.is_none());
    }

    /// Inserting the same document twice leaves exactly one stored document.
    #[test]
    fn test_graph_store_update_document() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Original content", 12345);
        let original_id = doc.id;
        store.insert_document(&doc).unwrap();

        // Second insert of the identical document must not create a duplicate.
        store.insert_document(&doc).unwrap();

        let docs = store.get_all_documents().unwrap();
        assert_eq!(docs.len(), 1);

        let retrieved = store.get_document(original_id).unwrap();
        assert!(retrieved.is_some());
    }

    /// Deleting a document also removes its chunks and their embeddings.
    #[test]
    fn test_graph_store_delete_document() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test chunk", 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let vector = sample_vector();
        let emb = Embedding::new(chunk.id, &vector, [0u8; 32], 0);
        store.insert_embedding(&emb).unwrap();

        store.delete_document(doc.id).unwrap();

        let retrieved = store.get_document(doc.id).unwrap();
        assert!(retrieved.is_none());

        let chunks = store.get_chunks_for_doc(doc.id).unwrap();
        assert!(chunks.is_empty());

        let embedding = store.get_embedding(emb.id).unwrap();
        assert!(embedding.is_none());
    }

    /// `get_all_documents` returns every inserted document.
    #[test]
    fn test_graph_store_all_documents() {
        let mut store = fresh_store();

        let doc1 = Document::new(PathBuf::from("test1.md"), b"Content 1", 12345);
        let doc2 = Document::new(PathBuf::from("test2.md"), b"Content 2", 12346);

        store.insert_document(&doc1).unwrap();
        store.insert_document(&doc2).unwrap();

        let all_docs = store.get_all_documents().unwrap();
        assert_eq!(all_docs.len(), 2);
    }

    // ---- Chunks ----

    /// An inserted chunk is returned with the same id and owning document id.
    #[test]
    fn test_graph_store_insert_chunk() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test chunk text", 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let retrieved = store.get_chunk(chunk.id).unwrap().unwrap();
        assert_eq!(retrieved.id, chunk.id);
        assert_eq!(retrieved.doc_id, doc.id);
    }

    /// Lookup by id finds an inserted chunk.
    #[test]
    fn test_graph_store_get_chunk() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test chunk", 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let retrieved = store.get_chunk(chunk.id).unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().id, chunk.id);
    }

    /// `get_chunks_for_doc` returns every chunk inserted for the document.
    #[test]
    fn test_graph_store_chunks_for_document() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let chunk1 = Chunk::new(doc.id, "Chunk 1", 0, 10);
        let chunk2 = Chunk::new(doc.id, "Chunk 2", 10, 20);

        store.insert_chunk(&chunk1).unwrap();
        store.insert_chunk(&chunk2).unwrap();

        let chunks = store.get_chunks_for_doc(doc.id).unwrap();
        assert_eq!(chunks.len(), 2);
    }

    /// Deleting a document leaves no chunks behind for it.
    #[test]
    fn test_graph_store_delete_chunks_for_document() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test chunk", 0, 0);
        store.insert_chunk(&chunk).unwrap();

        store.delete_document(doc.id).unwrap();

        let chunks = store.get_chunks_for_doc(doc.id).unwrap();
        assert!(chunks.is_empty());
    }

    // ---- Embeddings ----

    /// An inserted embedding is returned with the same id.
    #[test]
    fn test_graph_store_insert_embedding() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test chunk", 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let vector = sample_vector();
        let emb = Embedding::new(chunk.id, &vector, [0u8; 32], 0);

        store.insert_embedding(&emb).unwrap();

        let retrieved = store.get_embedding(emb.id).unwrap().unwrap();
        assert_eq!(retrieved.id, emb.id);
    }

    /// Lookup by id finds an inserted embedding.
    #[test]
    fn test_graph_store_get_embedding() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test chunk", 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let vector = sample_vector();
        let emb = Embedding::new(chunk.id, &vector, [0u8; 32], 0);

        store.insert_embedding(&emb).unwrap();

        let retrieved = store.get_embedding(emb.id).unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().id, emb.id);
    }

    /// Lookup by chunk id finds the embedding attached to that chunk.
    #[test]
    fn test_graph_store_embeddings_for_chunk() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test chunk", 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let vector = sample_vector();
        let emb = Embedding::new(chunk.id, &vector, [0u8; 32], 0);

        store.insert_embedding(&emb).unwrap();

        let retrieved = store.get_embedding_for_chunk(chunk.id).unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().chunk_id, chunk.id);
    }

    /// `get_all_embeddings` returns the single inserted embedding.
    #[test]
    fn test_graph_store_all_embeddings() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 12345);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test chunk", 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let vector = sample_vector();
        let emb = Embedding::new(chunk.id, &vector, [0u8; 32], 0);

        store.insert_embedding(&emb).unwrap();

        let all_embeddings = store.get_all_embeddings().unwrap();
        assert_eq!(all_embeddings.len(), 1);
    }

    // ---- Edges ----

    /// An added edge is returned by `get_edges` for its source.
    #[test]
    fn test_graph_store_insert_edge() {
        let mut store = fresh_store();

        let source = Uuid::new_v4();
        let target = Uuid::new_v4();
        let edge = Edge::new(source, target, EdgeKind::DocToChunk);

        store.add_edge(&edge).unwrap();

        let edges = store.get_edges(source).unwrap();
        assert_eq!(edges.len(), 1);
        assert_eq!(edges[0].target, target);
    }

    /// Two edges sharing a source are both returned for that source.
    #[test]
    fn test_graph_store_edges_from() {
        let mut store = fresh_store();

        let source = Uuid::new_v4();
        let target1 = Uuid::new_v4();
        let target2 = Uuid::new_v4();

        let edge1 = Edge::new(source, target1, EdgeKind::DocToChunk);
        let edge2 = Edge::new(source, target2, EdgeKind::ChunkToChunk);

        store.add_edge(&edge1).unwrap();
        store.add_edge(&edge2).unwrap();

        let edges = store.get_edges(source).unwrap();
        assert_eq!(edges.len(), 2);
    }

    /// Two edges sharing a target are each returned once for their own source.
    ///
    /// Despite the name, this queries by source via `get_edges`; no lookup by
    /// target is exercised here.
    #[test]
    fn test_graph_store_edges_to() {
        let mut store = fresh_store();

        let source1 = Uuid::new_v4();
        let source2 = Uuid::new_v4();
        let target = Uuid::new_v4();

        let edge1 = Edge::new(source1, target, EdgeKind::DocToChunk);
        let edge2 = Edge::new(source2, target, EdgeKind::ChunkToChunk);

        store.add_edge(&edge1).unwrap();
        store.add_edge(&edge2).unwrap();

        let edges_from_source1 = store.get_edges(source1).unwrap();
        assert_eq!(edges_from_source1.len(), 1);

        let edges_from_source2 = store.get_edges(source2).unwrap();
        assert_eq!(edges_from_source2.len(), 1);
    }

    /// The edge count in `stats` reflects every added edge.
    #[test]
    fn test_graph_store_all_edges() {
        let mut store = fresh_store();

        let edge1 = Edge::new(Uuid::new_v4(), Uuid::new_v4(), EdgeKind::DocToChunk);
        let edge2 = Edge::new(Uuid::new_v4(), Uuid::new_v4(), EdgeKind::ChunkToChunk);

        store.add_edge(&edge1).unwrap();
        store.add_edge(&edge2).unwrap();

        let stats = store.stats().unwrap();
        assert_eq!(stats.edges, 2);
    }

    // ---- Merkle root and state roots ----

    /// Inserting a document changes the computed Merkle root.
    #[test]
    fn test_graph_store_compute_merkle_root() {
        let mut store = fresh_store();

        let root1 = store.compute_merkle_root().unwrap();

        let doc = Document::new(PathBuf::from("test.md"), b"Hello", 0);
        store.insert_document(&doc).unwrap();

        let root2 = store.compute_merkle_root().unwrap();

        assert_ne!(root1, root2);
    }

    /// A stored state root is returned as the latest root with the same hash.
    #[test]
    fn test_graph_store_insert_state_root() {
        let mut store = fresh_store();

        let state_root =
            cp_core::StateRoot::new([1u8; 32], Some([2u8; 32]), test_hlc(), Uuid::new_v4(), 1);
        store.set_latest_root(&state_root).unwrap();

        let retrieved = store.get_latest_root().unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().hash, state_root.hash);
    }

    /// The latest root is `None` on an empty store and `Some` after one is set.
    #[test]
    fn test_graph_store_get_latest_state_root() {
        let mut store = fresh_store();

        let latest = store.get_latest_root().unwrap();
        assert!(latest.is_none());

        let state_root = cp_core::StateRoot::new([1u8; 32], None, test_hlc(), Uuid::new_v4(), 0);
        store.set_latest_root(&state_root).unwrap();

        let retrieved = store.get_latest_root().unwrap();
        assert!(retrieved.is_some());
    }

    // ---- Stats ----

    /// Stats count one document and nothing else after a single insert.
    #[test]
    fn test_graph_store_stats() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let stats = store.stats().unwrap();
        assert_eq!(stats.documents, 1);
        assert_eq!(stats.chunks, 0);
        assert_eq!(stats.embeddings, 0);
        assert_eq!(stats.edges, 0);
    }

    /// Stats are all zero on an empty store.
    #[test]
    fn test_graph_store_stats_empty() {
        let store = fresh_store();

        let stats = store.stats().unwrap();
        assert_eq!(stats.documents, 0);
        assert_eq!(stats.chunks, 0);
        assert_eq!(stats.embeddings, 0);
        assert_eq!(stats.edges, 0);
    }

    // ---- Transaction-named tests ----
    // NOTE(review): none of these call a transaction API directly; they only
    // observe the store through its ordinary insert/get/stats methods.

    /// A fresh store is usable: `stats` succeeds.
    #[test]
    fn test_transaction_begin() {
        let store = fresh_store();
        assert!(store.stats().is_ok());
    }

    /// A document is visible after `insert_document` returns.
    #[test]
    fn test_transaction_commit() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let retrieved = store.get_document(doc.id).unwrap();
        assert!(retrieved.is_some());
    }

    /// A successful insert stays visible.
    ///
    /// NOTE(review): no failure is induced, so no rollback path is exercised.
    #[test]
    fn test_transaction_rollback() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);

        store.insert_document(&doc).unwrap();

        let retrieved = store.get_document(doc.id).unwrap();
        assert!(retrieved.is_some());
    }

    /// One insert yields a document count of exactly one.
    #[test]
    fn test_transaction_atomicity() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let stats = store.stats().unwrap();
        assert_eq!(stats.documents, 1);
    }

    /// Two consecutive `stats` calls without writes report the same document count.
    #[test]
    fn test_transaction_isolation() {
        let store = fresh_store();
        let stats1 = store.stats().unwrap();
        let stats2 = store.stats().unwrap();
        assert_eq!(stats1.documents, stats2.documents);
    }

    // ---- Lexical (FTS) search ----
    // These tests query immediately after writing. SQLite runs triggers as part
    // of the writing statement, so, assuming the FTS table is trigger-maintained
    // as the test names indicate, no delay is needed before searching.
    // Most of them only check that `search_lexical` succeeds; the returned hits
    // are deliberately not inspected.

    /// A single-term search succeeds after a chunk is inserted.
    #[test]
    fn test_fts_search_basic() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Some test content", 0);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Hello world this is a test", 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let _results = store.search_lexical("test", 10).unwrap();
    }

    /// A multi-term search succeeds.
    #[test]
    fn test_fts_search_multiple_terms() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Hello world test Rust programming", 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let _results = store.search_lexical("hello world", 10).unwrap();
    }

    /// A search for a term that appears nowhere succeeds without error.
    #[test]
    fn test_fts_search_no_results() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Some content here", 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let _results = store.search_lexical("nonexistentterm12345", 10).unwrap();
    }

    /// A search over a chunk that repeats the term returns `Ok`.
    #[test]
    fn test_fts_search_ranking() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let chunk1 = Chunk::new(doc.id, "test word test", 0, 0);

        store.insert_chunk(&chunk1).unwrap();

        let results = store.search_lexical("test", 10);
        assert!(results.is_ok());
    }

    /// A search with a non-ASCII query succeeds.
    #[test]
    fn test_fts_search_unicode() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Hello 世界 unicode", 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let _results = store.search_lexical("世界", 10).unwrap();
    }

    /// Searching right after a chunk insert succeeds.
    #[test]
    fn test_fts_trigger_insert() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Trigger test content", 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let _results = store.search_lexical("trigger", 10).unwrap();
    }

    /// Searching after the owning document is deleted succeeds.
    #[test]
    fn test_fts_trigger_delete() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Delete test content", 0, 0);
        store.insert_chunk(&chunk).unwrap();

        store.delete_document(doc.id).unwrap();

        let _results = store.search_lexical("delete", 10).unwrap();
    }

    /// Searching succeeds after a document is deleted and re-inserted with a new chunk.
    #[test]
    fn test_fts_trigger_update() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Original content", 0, 0);
        store.insert_chunk(&chunk).unwrap();

        // The "update" is modelled as delete followed by re-insert.
        store.delete_document(doc.id).unwrap();

        let updated_chunk = Chunk::new(doc.id, "Updated content", 0, 0);
        store.insert_document(&doc).unwrap();
        store.insert_chunk(&updated_chunk).unwrap();

        let _results = store.search_lexical("updated", 10).unwrap();
    }

    // ---- End-to-end round trips ----

    /// A document survives an insert/get round trip with id, path and hash intact.
    #[test]
    fn test_document_roundtrip() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Hello, world!", 12345);
        store.insert_document(&doc).unwrap();

        let retrieved = store.get_document(doc.id).unwrap().unwrap();
        assert_eq!(retrieved.id, doc.id);
        assert_eq!(retrieved.path, doc.path);
        assert_eq!(retrieved.hash, doc.hash);
    }

    /// A stored chunk is returned for its document with the expected text.
    #[test]
    fn test_chunk_operations() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test chunk text", 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let chunks = store.get_chunks_for_doc(doc.id).unwrap();
        assert_eq!(chunks.len(), 1);
        // NOTE(review): the expected text has a trailing newline that the input
        // lacks; presumably `Chunk::new` or the store normalizes it. Confirm.
        assert_eq!(chunks[0].text, "Test chunk text\n");
    }

    /// Searching with the stored vector returns one hit with a score above 0.99.
    #[test]
    fn test_embedding_and_search() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let chunk = Chunk::new(doc.id, "Test text", 0, 0);
        store.insert_chunk(&chunk).unwrap();

        let vector = sample_vector();
        let emb = Embedding::new(chunk.id, &vector, [0u8; 32], 0);
        store.insert_embedding(&emb).unwrap();

        let results = store.search(&vector, 1).unwrap();
        assert_eq!(results.len(), 1);
        assert!(results[0].1 > 0.99);
    }

    /// An added edge is returned for its source with the right target.
    #[test]
    fn test_edge_operations() {
        let mut store = fresh_store();

        let source = Uuid::new_v4();
        let target = Uuid::new_v4();
        let edge = Edge::new(source, target, EdgeKind::DocToChunk);

        store.add_edge(&edge).unwrap();

        let edges = store.get_edges(source).unwrap();
        assert_eq!(edges.len(), 1);
        assert_eq!(edges[0].target, target);
    }

    /// Inserting a document changes the computed Merkle root.
    #[test]
    fn test_merkle_root() {
        let mut store = fresh_store();

        let root1 = store.compute_merkle_root().unwrap();

        let doc = Document::new(PathBuf::from("test.md"), b"Hello", 0);
        store.insert_document(&doc).unwrap();

        let root2 = store.compute_merkle_root().unwrap();

        assert_ne!(root1, root2);
    }

    /// Stats report one document and zero chunks after a single document insert.
    #[test]
    fn test_stats() {
        let mut store = fresh_store();

        let doc = Document::new(PathBuf::from("test.md"), b"Content", 0);
        store.insert_document(&doc).unwrap();

        let stats = store.stats().unwrap();
        assert_eq!(stats.documents, 1);
        assert_eq!(stats.chunks, 0);
    }
}