1use cp_core::{AssembledContext, CPError, Chunk, ContextAssembler, Result, ScoredChunk};
8use cp_tor::types::{
9 MergedSearchResult, RemoteSearchResult, ResultSource, SearchResponse, SearchStatus, MAX_RESULTS,
10};
11use glob::Pattern;
12use lru::LruCache;
13use std::collections::HashMap;
14use std::fmt::Write as _;
15use std::num::NonZeroUsize;
16use std::sync::{Arc, Mutex, RwLock};
17use tracing::{info, warn};
18use uuid::Uuid;
19
/// A predicate applied to documents when narrowing search results.
#[derive(Debug, Clone)]
pub enum Filter {
    /// Glob pattern matched against the document path (e.g. `docs/*.md`).
    DocumentPath(String),
    /// Exact MIME type match (e.g. `text/markdown`).
    MimeType(String),
    /// Keep documents whose `mtime` is strictly greater than this value
    /// (presumably a Unix timestamp — confirm against `Document::new`).
    ModifiedAfter(i64),
    /// Keep documents whose `mtime` is strictly less than this value.
    ModifiedBefore(i64),
}
34
35impl Filter {
36 pub fn matches(&self, doc: &cp_core::Document) -> bool {
38 match self {
39 Filter::DocumentPath(pattern) => {
40 if let Ok(glob) = Pattern::new(pattern) {
41 glob.matches(doc.path.to_string_lossy().as_ref())
42 } else {
43 false
44 }
45 }
46 Filter::MimeType(mime) => doc.mime_type == *mime,
47 Filter::ModifiedAfter(ts) => doc.mtime > *ts,
48 Filter::ModifiedBefore(ts) => doc.mtime < *ts,
49 }
50 }
51}
52
/// LRU cache of search results, keyed by a hash of `(query, k)` and tied
/// to a Merkle state root so stale entries can be detected and flushed.
pub struct QueryCache {
    /// Maps BLAKE3(query, k) -> ordered chunk ids of a prior result set.
    cache: RwLock<LruCache<[u8; 32], Vec<Uuid>>>,
    /// Merkle root of the graph state the cached entries were computed against.
    state_root: RwLock<[u8; 32]>,
}
62
63impl QueryCache {
64 pub fn new(capacity: usize) -> Self {
66 Self {
67 cache: RwLock::new(LruCache::new(
68 NonZeroUsize::new(capacity).unwrap_or(NonZeroUsize::new(100).unwrap()),
69 )),
70 state_root: RwLock::new([0u8; 32]),
71 }
72 }
73
74 pub fn get(&self, query: &str, k: usize) -> Option<Vec<Uuid>> {
76 let hash = Self::hash_key(query, k);
77 self.cache.write().ok()?.get(&hash).cloned()
78 }
79
80 pub fn put(&self, query: &str, k: usize, results: Vec<Uuid>) {
82 let hash = Self::hash_key(query, k);
83 if let Ok(mut cache) = self.cache.write() {
84 cache.put(hash, results);
85 }
86 }
87
88 pub fn is_valid(&self, current_root: &[u8; 32]) -> bool {
90 if let Ok(root) = self.state_root.read() {
91 *root == *current_root
92 } else {
93 false
94 }
95 }
96
97 pub fn invalidate(&self, new_root: [u8; 32]) {
99 if let Ok(mut cache) = self.cache.write() {
100 cache.clear();
101 }
102 if let Ok(mut root) = self.state_root.write() {
103 *root = new_root;
104 }
105 }
106
107 fn hash_key(query: &str, k: usize) -> [u8; 32] {
109 let mut hasher = blake3::Hasher::new();
110 hasher.update(query.as_bytes());
111 hasher.update(&k.to_le_bytes());
112 *hasher.finalize().as_bytes()
113 }
114}
115
impl Default for QueryCache {
    /// Default capacity of 100 cached queries.
    fn default() -> Self {
        Self::new(100)
    }
}
121
/// A single hit returned by the query engine.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SearchResult {
    /// The matched chunk.
    pub chunk: Chunk,
    /// Relevance score. Semantics depend on the search mode (normalized
    /// RRF for hybrid, raw backend score otherwise); 0.0 for cache hits
    /// and direct document listings, where scores are not recomputed.
    pub score: f32,
    /// Path of the document the chunk belongs to.
    pub doc_path: String,
}
132
/// Output of `QueryEngine::generate_answer`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct GenerationResult {
    /// The model-generated answer text.
    pub answer: String,
    /// The formatted context that was supplied to the model.
    pub context: String,
    /// End-to-end latency (search + assembly + generation) in milliseconds.
    pub latency_ms: u64,
}
143
/// Links a span of a generated response back to a context chunk.
#[derive(Debug, Clone, serde::Serialize)]
pub struct Citation {
    /// Id of the context chunk the span was matched against.
    pub chunk_id: Uuid,
    /// Byte range `(start, end)` within the response text.
    pub span: (usize, usize),
    /// Fraction of the response's 5-gram windows found in the chunk (0.0–1.0).
    pub confidence: f32,
}
156
/// Outcome of `validate_response`: grounding checks on a generated answer.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ValidationResult {
    /// True when no warnings fired, or the response honestly admits the
    /// answer is missing from the context.
    pub is_valid: bool,
    /// Human-readable descriptions of each failed check.
    pub warnings: Vec<String>,
    /// Fraction of response bytes covered by merged citation spans (0.0–1.0).
    pub citation_coverage: f32,
    /// All citations extracted from the response.
    pub citations: Vec<Citation>,
}
171
/// Lower-case phrases suggesting the model answered from prior knowledge
/// rather than the supplied context. Matched as plain substrings against
/// the lower-cased response, so they can also fire inside longer words
/// (e.g. "probably" matches "improbably").
const HALLUCINATION_PHRASES: &[&str] = &[
    "from my knowledge",
    "i recall that",
    "as far as i know",
    "i believe that",
    "in my experience",
    "typically",
    "generally speaking",
    "it's commonly known",
    "as everyone knows",
    "i think that",
    "probably",
    "most likely",
    "i assume",
    "based on my understanding",
    "from what i've learned",
];
190
191pub fn extract_citations(response: &str, context: &AssembledContext) -> Vec<Citation> {
195 let mut citations = Vec::new();
196 let response_lower = response.to_lowercase();
197 let response_words: Vec<&str> = response_lower.split_whitespace().collect();
198
199 if response_words.len() < 5 {
200 return citations;
201 }
202
203 for chunk in &context.chunks {
204 let chunk_lower = chunk.text.to_lowercase();
205 let chunk_words: Vec<&str> = chunk_lower.split_whitespace().collect();
206
207 if chunk_words.len() < 5 {
208 continue;
209 }
210
211 let mut overlap_count = 0;
213 let mut matched_positions: Vec<usize> = Vec::new();
214
215 for i in 0..=response_words.len().saturating_sub(5) {
216 let response_ngram: Vec<&str> = response_words[i..i + 5].to_vec();
217
218 for j in 0..=chunk_words.len().saturating_sub(5) {
219 let chunk_ngram: Vec<&str> = chunk_words[j..j + 5].to_vec();
220
221 if response_ngram == chunk_ngram {
222 overlap_count += 1;
223 matched_positions.push(i);
224 break;
225 }
226 }
227 }
228
229 if overlap_count > 0 {
230 let max_ngrams = (response_words.len().saturating_sub(4)).max(1);
232 let confidence = (overlap_count as f32) / (max_ngrams as f32);
233
234 let start_pos = matched_positions.first().copied().unwrap_or(0);
236 let end_pos = matched_positions.last().copied().unwrap_or(0) + 5;
237
238 let mut byte_start = 0;
240 let mut byte_end = response.len();
241
242 let mut word_idx = 0;
243 for (i, c) in response.char_indices() {
244 if c.is_whitespace() {
245 word_idx += 1;
246 if word_idx == start_pos {
247 byte_start = i + 1;
248 }
249 if word_idx == end_pos.min(response_words.len()) {
250 byte_end = i;
251 break;
252 }
253 }
254 }
255
256 citations.push(Citation {
257 chunk_id: chunk.chunk_id,
258 span: (byte_start, byte_end),
259 confidence,
260 });
261 }
262 }
263
264 citations.sort_by(|a, b| {
266 b.confidence
267 .partial_cmp(&a.confidence)
268 .unwrap_or(std::cmp::Ordering::Equal)
269 });
270
271 citations
272}
273
274pub fn validate_response(response: &str, context: &AssembledContext) -> ValidationResult {
278 let mut warnings = Vec::new();
279
280 let citations = extract_citations(response, context);
282
283 let total_response_len = response.len() as f32;
285 let mut spans: Vec<(usize, usize)> = citations.iter().map(|c| c.span).collect();
286 spans.sort_by_key(|s| s.0);
287 let mut merged: Vec<(usize, usize)> = Vec::new();
288 for span in &spans {
289 if let Some(last) = merged.last_mut() {
290 if span.0 <= last.1 {
291 last.1 = last.1.max(span.1);
292 continue;
293 }
294 }
295 merged.push(*span);
296 }
297 let covered_bytes: usize = merged.iter().map(|(a, b)| b.saturating_sub(*a)).sum();
298
299 let citation_coverage = if total_response_len > 0.0 {
300 (covered_bytes as f32 / total_response_len).min(1.0)
301 } else {
302 0.0
303 };
304
305 let response_lower = response.to_lowercase();
307 for phrase in HALLUCINATION_PHRASES {
308 if response_lower.contains(phrase) {
309 warnings.push(format!(
310 "Response contains hallucination indicator: '{phrase}'"
311 ));
312 }
313 }
314
315 if citation_coverage < 0.3 && !response.is_empty() {
317 warnings.push(format!(
318 "Low citation coverage: {:.1}% (threshold: 30%)",
319 citation_coverage * 100.0
320 ));
321 }
322
323 let good_phrases = [
325 "information is missing",
326 "not found in the context",
327 "cannot find",
328 ];
329 let claims_missing = good_phrases.iter().any(|p| response_lower.contains(p));
330
331 let is_valid = warnings.is_empty() || claims_missing;
333
334 ValidationResult {
335 is_valid,
336 warnings,
337 citation_coverage,
338 citations,
339 }
340}
341
/// Abstraction over an LLM backend that turns an assembled context plus a
/// user query into an answer string.
#[async_trait::async_trait]
pub trait IntelligenceEngine: Send + Sync {
    /// Generates an answer to `query` grounded in `context`.
    async fn generate(&self, context: &AssembledContext, query: &str) -> Result<String>;
}
352
/// `IntelligenceEngine` backed by an Ollama server's `/api/chat` endpoint.
pub struct OllamaGenerator {
    /// Base URL of the Ollama server (the `/api/chat` path is appended).
    base_url: String,
    /// Model name passed through to the chat endpoint.
    model: String,
}
358
impl OllamaGenerator {
    /// Creates a generator targeting `base_url` with the given model name.
    pub fn new(base_url: String, model: String) -> Self {
        Self { base_url, model }
    }
}
365
366#[async_trait::async_trait]
367impl IntelligenceEngine for OllamaGenerator {
368 async fn generate(&self, context: &AssembledContext, query: &str) -> Result<String> {
369 let formatted_context = ContextAssembler::format(context);
370
371 let client = reqwest::Client::builder()
372 .timeout(std::time::Duration::from_secs(120))
373 .build()
374 .map_err(|e| CPError::Inference(format!("Failed to create HTTP client: {e}")))?;
375
376 let payload = serde_json::json!({
378 "model": self.model,
379 "stream": false,
380 "messages": [
381 {
382 "role": "system",
383 "content": "You are a document summarization assistant. The user provides retrieved documents and a question. Summarize the relevant information from the documents to answer the question. Always answer based on the documents provided. Be direct and cite document names."
384 },
385 {
386 "role": "user",
387 "content": format!("Here are the retrieved documents:\n\n{formatted_context}\n\nBased on these documents, {query}")
388 }
389 ]
390 });
391
392 let url = format!("{}/api/chat", self.base_url);
393 let res = client
394 .post(&url)
395 .json(&payload)
396 .send()
397 .await
398 .map_err(|e| CPError::Inference(format!("Ollama request failed: {e}")))?;
399
400 let json: serde_json::Value = res
401 .json()
402 .await
403 .map_err(|e| CPError::Parse(e.to_string()))?;
404
405 let answer = json["message"]["content"]
406 .as_str()
407 .ok_or_else(|| CPError::Parse("Invalid Ollama chat response".into()))?
408 .to_string();
409
410 Ok(answer)
411 }
412}
413
/// Orchestrates hybrid retrieval (vector + FTS5), query caching, optional
/// LLM generation, and proof-receipt creation over the graph store.
pub struct QueryEngine {
    /// Shared handle to the persistent graph/vector store.
    graph: Arc<Mutex<cp_graph::GraphStore>>,
    /// Embedding model used to vectorize queries.
    embedder: Arc<cp_embeddings::EmbeddingEngine>,
    /// Optional LLM backend; generation methods error without one.
    intelligence: Option<Box<dyn IntelligenceEngine>>,
    /// Query-result cache, invalidated when the graph's Merkle root changes.
    cache: QueryCache,
    /// Token budget handed to the `ContextAssembler`.
    context_budget: usize,
}
424
425impl QueryEngine {
426 pub fn new(
428 graph: Arc<Mutex<cp_graph::GraphStore>>,
429 embedder: Arc<cp_embeddings::EmbeddingEngine>,
430 ) -> Self {
431 Self {
432 graph,
433 embedder,
434 intelligence: None,
435 cache: QueryCache::default(),
436 context_budget: 2000,
437 }
438 }
439
440 pub fn with_cache_capacity(
442 graph: Arc<Mutex<cp_graph::GraphStore>>,
443 embedder: Arc<cp_embeddings::EmbeddingEngine>,
444 cache_capacity: usize,
445 ) -> Self {
446 Self {
447 graph,
448 embedder,
449 intelligence: None,
450 cache: QueryCache::new(cache_capacity),
451 context_budget: 2000,
452 }
453 }
454
455 pub fn with_context_budget(mut self, budget: usize) -> Self {
457 self.context_budget = budget;
458 self
459 }
460
461 pub fn with_intelligence(mut self, intelligence: Box<dyn IntelligenceEngine>) -> Self {
463 self.intelligence = Some(intelligence);
464 self
465 }
466
467 pub fn set_intelligence(&mut self, intelligence: Box<dyn IntelligenceEngine>) {
469 self.intelligence = Some(intelligence);
470 }
471
472 pub fn search(&self, query: &str, k: usize) -> Result<Vec<SearchResult>> {
477 info!("Hybrid search for: '{}'", query);
478
479 let query_vec = self
481 .embedder
482 .embed_query(query)
483 .map_err(|e| CPError::Embedding(format!("Failed to embed query: {e}")))?;
484
485 let semantic_results = {
486 let graph = self.graph.lock().expect("graph lock poisoned");
487 graph.search(&query_vec, k)?
488 };
489
490 let lexical_results = {
492 let graph = self.graph.lock().expect("graph lock poisoned");
493 let fts_query = sanitize_fts5_query(query);
495 graph.search_lexical(&fts_query, k).unwrap_or_else(|e| {
496 warn!(
497 "Lexical search failed: {}. Falling back to semantic only.",
498 e
499 );
500 Vec::new()
501 })
502 };
503
504 const RRF_K: u64 = 60;
508 const RRF_SCALE: u64 = 1_000_000;
509
510 let mut scores: HashMap<Uuid, u64> = HashMap::new();
512
513 {
514 let graph = self.graph.lock().expect("graph lock poisoned");
515
516 for (i, (emb_id, _)) in semantic_results.iter().enumerate() {
517 if let Ok(Some(chunk_id)) = graph.get_chunk_id_for_embedding(*emb_id) {
518 let score = RRF_SCALE / (RRF_K + i as u64 + 1);
520 *scores.entry(chunk_id).or_insert(0) += score;
521 }
522 }
523
524 for (i, (chunk_id, _)) in lexical_results.iter().enumerate() {
525 let score = RRF_SCALE / (RRF_K + i as u64 + 1);
526 *scores.entry(*chunk_id).or_insert(0) += score;
527 }
528 }
529
530 let mut fused: Vec<(Uuid, u64)> = scores.into_iter().collect();
532 fused.sort_by(|a, b| {
533 b.1.cmp(&a.1) .then_with(|| a.0.cmp(&b.0)) });
536 fused.truncate(k);
537
538 let mut search_results = Vec::with_capacity(fused.len());
540 let graph = self.graph.lock().expect("graph lock poisoned");
541
542 for (chunk_id, fused_score) in fused {
543 let Some(chunk) = graph.get_chunk(chunk_id)? else {
544 continue;
545 };
546
547 let Some(doc) = graph.get_document(chunk.doc_id)? else {
548 continue;
549 };
550
551 let normalized_score = fused_score as f32 / (RRF_SCALE * 2) as f32;
553
554 search_results.push(SearchResult {
555 chunk,
556 score: normalized_score,
557 doc_path: doc.path.to_string_lossy().to_string(),
558 });
559 }
560
561 Ok(search_results)
562 }
563
564 pub fn search_semantic(&self, query: &str, k: usize) -> Result<Vec<SearchResult>> {
566 info!("Semantic search for: '{}'", query);
567
568 let query_vec = self
569 .embedder
570 .embed_query(query)
571 .map_err(|e| CPError::Embedding(format!("Failed to embed query: {e}")))?;
572
573 let raw_results = {
574 let graph = self.graph.lock().expect("graph lock poisoned");
575 graph.search(&query_vec, k)?
576 };
577
578 let mut search_results = Vec::with_capacity(raw_results.len());
579 let graph = self.graph.lock().expect("graph lock poisoned");
580
581 for (emb_id, score) in raw_results {
582 if let Some(chunk_id) = graph.get_chunk_id_for_embedding(emb_id)? {
584 if let Some(chunk) = graph.get_chunk(chunk_id)? {
585 if let Some(doc) = graph.get_document(chunk.doc_id)? {
586 search_results.push(SearchResult {
587 chunk,
588 score,
589 doc_path: doc.path.to_string_lossy().to_string(),
590 });
591 }
592 }
593 }
594 }
595
596 Ok(search_results)
597 }
598
599 pub fn search_lexical(&self, query: &str, k: usize) -> Result<Vec<SearchResult>> {
601 info!("Lexical search for: '{}'", query);
602
603 let raw_results = {
604 let graph = self.graph.lock().expect("graph lock poisoned");
605 let fts_query = sanitize_fts5_query(query);
607 graph.search_lexical(&fts_query, k)?
608 };
609
610 let mut search_results = Vec::with_capacity(raw_results.len());
611 let graph = self.graph.lock().expect("graph lock poisoned");
612
613 for (chunk_id, score) in raw_results {
614 if let Some(chunk) = graph.get_chunk(chunk_id)? {
616 if let Some(doc) = graph.get_document(chunk.doc_id)? {
617 search_results.push(SearchResult {
618 chunk,
619 score,
620 doc_path: doc.path.to_string_lossy().to_string(),
621 });
622 }
623 }
624 }
625
626 Ok(search_results)
627 }
628
629 pub fn search_filtered(
633 &self,
634 query: &str,
635 k: usize,
636 filters: &[Filter],
637 ) -> Result<Vec<SearchResult>> {
638 info!(
639 "Filtered search for: '{}' with {} filters",
640 query,
641 filters.len()
642 );
643
644 let matching_doc_ids: std::collections::HashSet<Uuid> = {
646 let graph = self.graph.lock().expect("graph lock poisoned");
647 let all_docs = graph.get_all_documents()?;
648
649 all_docs
650 .into_iter()
651 .filter(|doc| filters.iter().all(|f| f.matches(doc)))
652 .map(|doc| doc.id)
653 .collect()
654 };
655
656 if matching_doc_ids.is_empty() {
657 info!("No documents match filters");
658 return Ok(Vec::new());
659 }
660
661 let all_results = self.search(query, k * 3)?; let filtered_results: Vec<SearchResult> = all_results
666 .into_iter()
667 .filter(|r| matching_doc_ids.contains(&r.chunk.doc_id))
668 .take(k)
669 .collect();
670
671 info!(
672 "Filtered search returned {} results",
673 filtered_results.len()
674 );
675 Ok(filtered_results)
676 }
677
678 pub fn search_cached(&self, query: &str, k: usize) -> Result<Vec<SearchResult>> {
682 let current_root = {
684 let graph = self.graph.lock().expect("graph lock poisoned");
685 graph.compute_merkle_root()?
686 };
687
688 if !self.cache.is_valid(¤t_root) {
689 self.cache.invalidate(current_root);
690 }
691
692 if let Some(chunk_ids) = self.cache.get(query, k) {
694 info!("Cache hit for query: '{}' (k={})", query, k);
695
696 let graph = self.graph.lock().expect("graph lock poisoned");
697 let mut results = Vec::new();
698
699 for chunk_id in chunk_ids.iter().take(k) {
700 if let Some(chunk) = graph.get_chunk(*chunk_id)? {
701 if let Some(doc) = graph.get_document(chunk.doc_id)? {
702 results.push(SearchResult {
703 chunk,
704 score: 0.0, doc_path: doc.path.to_string_lossy().to_string(),
706 });
707 }
708 }
709 }
710
711 return Ok(results);
712 }
713
714 let results = self.search(query, k)?;
716
717 let chunk_ids: Vec<Uuid> = results.iter().map(|r| r.chunk.id).collect();
719 self.cache.put(query, k, chunk_ids);
720
721 Ok(results)
722 }
723
724 pub fn invalidate_cache(&self) -> Result<()> {
726 let root = {
727 let graph = self.graph.lock().expect("graph lock poisoned");
728 graph.compute_merkle_root()?
729 };
730 self.cache.invalidate(root);
731 Ok(())
732 }
733
734 pub fn get_chunks_for_document(&self, doc_id: Uuid) -> Result<Vec<SearchResult>> {
736 let graph = self.graph.lock().expect("graph lock poisoned");
737
738 let doc = graph
739 .get_document(doc_id)?
740 .ok_or_else(|| CPError::Database(format!("Doc {doc_id} not found")))?;
741
742 let chunks = graph.get_chunks_for_doc(doc_id)?;
743
744 Ok(chunks
745 .into_iter()
746 .map(|c| SearchResult {
747 chunk: c,
748 score: 0.0, doc_path: doc.path.to_string_lossy().to_string(),
750 })
751 .collect())
752 }
753
754 pub fn graph(&self) -> Arc<Mutex<cp_graph::GraphStore>> {
756 self.graph.clone()
757 }
758
759 pub async fn generate_answer(&self, query: &str) -> Result<GenerationResult> {
761 let start = std::time::Instant::now();
762 info!("Generating answer for: '{}'", query);
763
764 let results = self.search(query, 5)?;
766
767 let assembler = ContextAssembler::with_budget(self.context_budget);
769 let scored_chunks: Vec<ScoredChunk> = results
770 .iter()
771 .map(|r| ScoredChunk {
772 chunk: r.chunk.clone(),
773 score: r.score,
774 document_path: r.doc_path.clone(),
775 })
776 .collect();
777
778 let state_root = {
779 let graph = self.graph.lock().expect("graph lock poisoned");
780 graph.compute_merkle_root()?
781 };
782
783 let assembled_context = assembler.assemble(scored_chunks, query, state_root);
784
785 let answer = if let Some(ref engine) = self.intelligence {
787 engine.generate(&assembled_context, query).await?
788 } else {
789 return Err(CPError::NotFound(
790 "Intelligence engine not configured".into(),
791 ));
792 };
793
794 Ok(GenerationResult {
795 answer,
796 context: ContextAssembler::format(&assembled_context),
797 latency_ms: start.elapsed().as_millis() as u64,
798 })
799 }
800
801 pub fn generate_proof_receipt(
807 &self,
808 query: &str,
809 search_results: &[SearchResult],
810 identity: &cp_sync::DeviceIdentity,
811 ) -> Result<cp_core::ProofReceipt> {
812 let query_hash = *blake3::hash(query.as_bytes()).as_bytes();
813
814 let assembler = ContextAssembler::with_budget(self.context_budget * 2);
816 let scored_chunks: Vec<ScoredChunk> = search_results
817 .iter()
818 .map(|r| ScoredChunk {
819 chunk: r.chunk.clone(),
820 score: r.score,
821 document_path: r.doc_path.clone(),
822 })
823 .collect();
824
825 let state_root = {
826 let graph = self.graph.lock().expect("graph lock poisoned");
827 graph.compute_merkle_root()?
828 };
829
830 let assembled = assembler.assemble(scored_chunks, query, state_root);
831 let context_string = ContextAssembler::format(&assembled);
832 let context_hash = *blake3::hash(context_string.as_bytes()).as_bytes();
833
834 let (sorted_chunk_ids, sorted_chunk_hashes, chunk_tree_root) = {
836 let graph = self.graph.lock().expect("graph lock poisoned");
837 let sorted = graph.get_sorted_chunk_hashes()?;
838 let hashes: Vec<[u8; 32]> = sorted.iter().map(|(_, h)| *h).collect();
839 let root = cp_core::proof::compute_chunk_tree_root(&hashes);
840 (sorted, hashes, root)
841 };
842
843 let mut chunk_proofs = Vec::new();
845 let mut sources = Vec::new();
846
847 for result in search_results {
848 let chunk_id_bytes = *result.chunk.id.as_bytes();
849
850 if let Some(idx) = sorted_chunk_ids
852 .iter()
853 .position(|(id, _)| *id == chunk_id_bytes)
854 {
855 let proof = cp_core::proof::build_chunk_proof(
856 chunk_id_bytes,
857 result.chunk.text_hash,
858 idx,
859 &sorted_chunk_hashes,
860 );
861 chunk_proofs.push(proof);
862 }
863
864 sources.push(cp_core::SourceRef {
865 document_path: result.doc_path.clone(),
866 chunk_id: chunk_id_bytes,
867 chunk_text: result.chunk.text.clone(),
868 chunk_sequence: result.chunk.sequence,
869 relevance_score: result.score,
870 });
871 }
872
873 let now = std::time::SystemTime::now()
875 .duration_since(std::time::UNIX_EPOCH)
876 .unwrap_or_default();
877 let secs = now.as_secs();
878 let timestamp = format_unix_timestamp(secs);
879
880 let mut receipt = cp_core::ProofReceipt {
882 version: 1,
883 query: query.to_string(),
884 query_hash,
885 timestamp,
886 context_hash,
887 state_root,
888 chunk_tree_root,
889 chunk_proofs,
890 sources,
891 signature: [0u8; 64],
892 signer_public_key: identity.public_key,
893 device_id: identity.device_id,
894 };
895
896 let sig = identity.sign(&receipt.signing_bytes());
897 receipt.signature = sig;
898
899 Ok(receipt)
900 }
901
902 pub async fn chat(&self, query: &str, history: &[Message]) -> Result<String> {
907 let _start = std::time::Instant::now();
908
909 let results = self.search(query, 5)?;
911
912 let assembler = ContextAssembler::with_budget(2000);
914 let scored_chunks: Vec<ScoredChunk> = results
915 .iter()
916 .map(|r| ScoredChunk {
917 chunk: r.chunk.clone(),
918 score: r.score,
919 document_path: r.doc_path.clone(),
920 })
921 .collect();
922
923 let state_root = {
924 let graph = self.graph.lock().expect("graph lock poisoned");
925 graph.compute_merkle_root()?
926 };
927 let assembled_context = assembler.assemble(scored_chunks, query, state_root);
928
929 let mut full_prompt = String::new();
931 for msg in history {
932 let role = match msg.role {
933 Role::User => "User",
934 Role::Assistant => "Assistant",
935 Role::System => "System",
936 };
937 let _ = writeln!(full_prompt, "{}: {}", role, msg.content);
938 }
939 let _ = writeln!(full_prompt, "User: {query}");
940
941 let answer = if let Some(ref engine) = self.intelligence {
943 engine.generate(&assembled_context, &full_prompt).await?
944 } else {
945 return Err(CPError::NotFound(
946 "Intelligence engine not configured".into(),
947 ));
948 };
949
950 Ok(answer)
951 }
952}
953
/// A single turn of a chat conversation.
#[derive(Debug, Clone)]
pub struct Message {
    /// Who produced this message.
    pub role: Role,
    /// The message text.
    pub content: String,
}
960
/// Author of a chat [`Message`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Role {
    User,
    Assistant,
    System,
}
968
/// A remote search result whose enclosing response signature has already
/// been verified, plus the trust weight applied during result merging.
pub struct VerifiedRemoteResult {
    /// The raw result as returned by the peer.
    pub result: RemoteSearchResult,
    /// Peer trust weight applied to RRF contributions
    /// (1.0 for unrated peers, otherwise clamped to [0.3, 1.0]).
    pub weight: f64,
    /// Node id of the peer that supplied the result.
    pub peer_node_id: [u8; 16],
    /// The peer's advertised Merkle state root.
    pub peer_state_root: [u8; 32],
    /// Signature over the peer's response.
    pub peer_signature: [u8; 64],
}
982
983pub fn merge_results(
990 local: &[SearchResult],
991 remote: &[VerifiedRemoteResult],
992 max_results: usize,
993) -> Vec<MergedSearchResult> {
994 const K: f64 = 60.0;
995 let cap = max_results.min(MAX_RESULTS as usize);
996
997 let mut scores: HashMap<[u8; 16], (f64, MergedSearchResult)> = HashMap::new();
999
1000 for (rank, result) in local.iter().enumerate() {
1002 let rrf = 1.0 / (K + rank as f64 + 1.0);
1003 let chunk_id = *result.chunk.id.as_bytes();
1004
1005 let entry = scores.entry(chunk_id).or_insert_with(|| {
1006 (
1007 0.0,
1008 MergedSearchResult {
1009 chunk_id,
1010 chunk_text: result.chunk.text.clone(),
1011 document_path: result.doc_path.clone(),
1012 score: 0.0,
1013 source: ResultSource::Local,
1014 merkle_proof: None,
1015 peer_state_root: None,
1016 peer_signature: None,
1017 },
1018 )
1019 });
1020 entry.0 += rrf;
1021 }
1022
1023 for (rank, verified) in remote.iter().enumerate() {
1025 let rrf = verified.weight / (K + rank as f64 + 1.0);
1026 let chunk_id = verified.result.chunk_id;
1027
1028 if let Some(entry) = scores.get_mut(&chunk_id) {
1029 entry.0 += rrf;
1031 entry.1.source = ResultSource::Both {
1032 peer_node_id: verified.peer_node_id,
1033 };
1034 if entry.1.peer_state_root.is_none() {
1036 entry.1.peer_state_root = Some(verified.peer_state_root);
1037 entry.1.peer_signature = Some(verified.peer_signature);
1038 entry
1039 .1
1040 .merkle_proof
1041 .clone_from(&verified.result.merkle_proof);
1042 }
1043 } else {
1044 scores.insert(
1045 chunk_id,
1046 (
1047 rrf,
1048 MergedSearchResult {
1049 chunk_id,
1050 chunk_text: verified.result.chunk_text.clone(),
1051 document_path: verified.result.document_path.clone(),
1052 score: 0.0,
1053 source: ResultSource::Remote {
1054 peer_node_id: verified.peer_node_id,
1055 },
1056 merkle_proof: verified.result.merkle_proof.clone(),
1057 peer_state_root: Some(verified.peer_state_root),
1058 peer_signature: Some(verified.peer_signature),
1059 },
1060 ),
1061 );
1062 }
1063 }
1064
1065 let mut merged: Vec<(f64, MergedSearchResult)> = scores.into_values().collect();
1067 merged.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
1068 merged.truncate(cap);
1069
1070 merged
1072 .into_iter()
1073 .map(|(score, mut result)| {
1074 result.score = score;
1075 result
1076 })
1077 .collect()
1078}
1079
1080pub fn verify_and_extract(
1085 response: &SearchResponse,
1086 peer_public_key: &[u8; 32],
1087 peer_node_id: [u8; 16],
1088 peer_rating: Option<f64>,
1089) -> Option<Vec<VerifiedRemoteResult>> {
1090 if response.status != SearchStatus::Ok {
1091 return None;
1092 }
1093
1094 if cp_tor::verify_response(response, peer_public_key).is_err() {
1096 warn!(
1097 "Invalid response signature from peer {}",
1098 hex::encode(&peer_node_id[..4])
1099 );
1100 return None;
1101 }
1102
1103 let weight = peer_rating.map_or(1.0, |r| r.clamp(0.3, 1.0));
1104
1105 let results = response
1106 .results
1107 .iter()
1108 .map(|r| VerifiedRemoteResult {
1109 result: r.clone(),
1110 weight,
1111 peer_node_id,
1112 peer_state_root: response.peer_state_root,
1113 peer_signature: response.signature,
1114 })
1115 .collect();
1116
1117 Some(results)
1118}
1119
impl QueryEngine {
    /// Performs a hybrid local search and fuses it with already-verified
    /// remote results into a single ranked list (see `merge_results`).
    ///
    /// # Errors
    /// Propagates any failure from the local hybrid search.
    pub fn live_search(
        &self,
        query: &str,
        remote_results: &[VerifiedRemoteResult],
        max_results: usize,
    ) -> Result<Vec<MergedSearchResult>> {
        info!(
            "Live search for '{}' with {} remote result(s)",
            query,
            remote_results.len()
        );

        let local_results = self.search(query, max_results)?;

        let merged = merge_results(&local_results, remote_results, max_results);

        info!(
            "Live search returned {} merged results ({} local, {} remote)",
            merged.len(),
            local_results.len(),
            remote_results.len()
        );

        Ok(merged)
    }
}
1155
/// Prepares free-form user text for an FTS5 MATCH expression: strips the
/// operator characters that could break quoting, trims whitespace, and
/// wraps the remainder in double quotes as a single phrase. An input that
/// reduces to nothing yields an empty string.
fn sanitize_fts5_query(query: &str) -> String {
    const SPECIAL: &[char] = &['"', '*', '^', '+', '-', '(', ')', '{', '}', ':'];

    let cleaned: String = query.chars().filter(|c| !SPECIAL.contains(c)).collect();
    match cleaned.trim() {
        "" => String::new(),
        phrase => format!("\"{phrase}\""),
    }
}
1174
/// Formats seconds since the Unix epoch as an ISO-8601 UTC timestamp,
/// e.g. `1970-01-01T00:00:00Z`, without any date/time dependency.
fn format_unix_timestamp(secs: u64) -> String {
    let (year, month, day) = days_to_date(secs / 86400);
    let time_of_day = secs % 86400;
    let hours = time_of_day / 3600;
    let minutes = (time_of_day % 3600) / 60;
    let seconds = time_of_day % 60;
    format!("{year:04}-{month:02}-{day:02}T{hours:02}:{minutes:02}:{seconds:02}Z")
}

/// Converts days since 1970-01-01 to a `(year, month, day)` civil date
/// using the classic era-based "civil_from_days" algorithm (Hinnant).
fn days_to_date(days: u64) -> (u64, u64, u64) {
    // Shift the epoch to 0000-03-01 so leap days land at the end of the
    // internal year; 146097 days = one 400-year era.
    let shifted = days + 719468;
    let era = shifted / 146097;
    let day_of_era = shifted - era * 146097;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36524 - day_of_era / 146096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // Months are counted from March (0 = March … 11 = February).
    let month_index = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * month_index + 2) / 5 + 1;
    let month = if month_index < 10 {
        month_index + 3
    } else {
        month_index - 9
    };
    let raw_year = year_of_era + era * 400;
    // Jan/Feb belong to the next civil year in the March-based reckoning.
    let year = if month <= 2 { raw_year + 1 } else { raw_year };
    (year, month, day)
}
1198
1199#[cfg(test)]
1200mod tests {
1201 use super::*;
1202 use cp_core::{Chunk, Document};
1203 use std::sync::{Arc, Mutex};
1204 use tempfile::TempDir;
1205
    #[tokio::test]
    async fn test_get_chunks_for_document() {
        // Fresh on-disk store in a temp dir.
        let temp = TempDir::new().unwrap();
        let db_path = temp.path().join("test.db");
        let mut graph = cp_graph::GraphStore::open(db_path.to_str().unwrap()).unwrap();

        // One document with a single chunk; no embedding needed for a
        // direct per-document listing.
        let doc = Document::new("test.md".into(), b"Hello world", 100);
        graph.insert_document(&doc).unwrap();

        let chunk = Chunk {
            id: Uuid::new_v4(),
            doc_id: doc.id,
            text: "Hello world".to_string(),
            byte_offset: 0,
            byte_length: 11,
            sequence: 0,
            text_hash: [0; 32],
        };
        graph.insert_chunk(&chunk).unwrap();

        let embedder = Arc::new(cp_embeddings::EmbeddingEngine::new().unwrap());
        let qe = QueryEngine::new(Arc::new(Mutex::new(graph)), embedder);

        // Listing by document id should return exactly that chunk.
        let results = qe.get_chunks_for_document(doc.id).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].chunk.text, "Hello world");
    }
1233
    #[tokio::test]
    async fn test_hybrid_search() {
        // Fresh on-disk store in a temp dir.
        let temp = TempDir::new().unwrap();
        let db_path = temp.path().join("test_hybrid.db");
        let mut graph = cp_graph::GraphStore::open(db_path.to_str().unwrap()).unwrap();

        let doc = Document::new(
            "test.md".into(),
            b"The quick brown fox jumps over the lazy dog",
            100,
        );
        graph.insert_document(&doc).unwrap();

        let chunk = Chunk {
            id: Uuid::new_v4(),
            doc_id: doc.id,
            text: "The quick brown fox jumps over the lazy dog".to_string(),
            byte_offset: 0,
            byte_length: 43,
            sequence: 0,
            text_hash: [0; 32],
        };
        graph.insert_chunk(&chunk).unwrap();

        // Index an embedding so the semantic half of the hybrid search can hit.
        let embedder = Arc::new(cp_embeddings::EmbeddingEngine::new().unwrap());
        let vec = embedder.embed(&chunk.text).unwrap();
        let emb = cp_core::Embedding::new(chunk.id, &vec, embedder.model_hash().unwrap(), 0);
        graph.insert_embedding(&emb).unwrap();

        let qe = QueryEngine::new(Arc::new(Mutex::new(graph)), embedder);

        // Exact keywords: both halves of the hybrid search can match.
        let results = qe.search("quick brown fox", 5).unwrap();
        assert!(!results.is_empty());
        assert!(results[0].chunk.text.contains("quick brown fox"));

        // Paraphrase with no keyword overlap: only the semantic half can hit.
        let results_sem = qe.search("fast auburn canine", 5).unwrap();
        assert!(!results_sem.is_empty());
        assert!(results_sem[0].chunk.text.contains("quick brown fox"));
    }
1276
    #[tokio::test]
    async fn test_search_comparison_proof() {
        let temp = TempDir::new().unwrap();
        let db_path = temp.path().join("comparison_proof.db");
        let mut graph = cp_graph::GraphStore::open(db_path.to_str().unwrap()).unwrap();

        // t1: literal match for "fox"; t3: paraphrase of t1; t2: unrelated.
        let t1 = "The quick brown fox jumps over the lazy dog";
        let t2 = "Artificial intelligence is transforming the modern world";
        let t3 = "A fast auburn canine leaps across an idle hound";
        let texts = [t1, t2, t3];
        for (i, text) in texts.iter().enumerate() {
            let doc = Document::new(format!("doc_{i}.md").into(), text.as_bytes(), 100);
            graph.insert_document(&doc).unwrap();
            let chunk = Chunk {
                id: Uuid::new_v4(),
                doc_id: doc.id,
                text: text.to_string(),
                byte_offset: 0,
                byte_length: text.len() as u64,
                sequence: 0,
                text_hash: [0; 32],
            };
            graph.insert_chunk(&chunk).unwrap();
            let embedder = Arc::new(cp_embeddings::EmbeddingEngine::new().unwrap());
            let vec = embedder.embed(text).unwrap();
            let emb = cp_core::Embedding::new(chunk.id, &vec, embedder.model_hash().unwrap(), 0);
            graph.insert_embedding(&emb).unwrap();
        }

        let embedder = Arc::new(cp_embeddings::EmbeddingEngine::new().unwrap());
        let qe = QueryEngine::new(Arc::new(Mutex::new(graph)), embedder);

        let query = "fox";

        // Lexical: only the text containing the literal word "fox" matches.
        let lexical = {
            let graph_lock = qe.graph();
            let g = graph_lock.lock().expect("graph lock poisoned");
            g.search_lexical(query, 5).unwrap()
        };
        assert_eq!(lexical.len(), 1);

        // Vector: semantic similarity should also surface the paraphrase.
        let vector = {
            let graph_lock = qe.graph();
            let g = graph_lock.lock().expect("graph lock poisoned");
            let e = cp_embeddings::EmbeddingEngine::new().unwrap();
            let q_vec = e.embed_query(query).unwrap();
            g.search(&q_vec, 5).unwrap()
        };
        assert!(vector.len() >= 2);

        // Hybrid fuses both lists.
        let hybrid = qe.search(query, 5).unwrap();

        println!("\n--- SEARCH PROOF FOR '{query}' ---");
        println!("LEXICAL HITS: {}", lexical.len());
        println!("VECTOR HITS: {}", vector.len());
        println!("HYBRID HITS: {}", hybrid.len());

        // Hybrid must contain both the literal hit and the paraphrase.
        let texts_found: Vec<String> = hybrid.iter().map(|r| r.chunk.text.clone()).collect();
        assert!(texts_found.contains(&t1.to_string()));
        assert!(texts_found.contains(&t3.to_string()));
    }
1347
1348 #[test]
1349 fn test_filter_by_mime_type() {
1350 use cp_core::Document;
1351 use std::path::PathBuf;
1352
1353 let doc_md = Document::new(PathBuf::from("test.md"), b"content", 1000);
1354 let doc_pdf = Document::new(PathBuf::from("test.pdf"), b"pdf content", 1000);
1355
1356 let filter = Filter::MimeType("text/markdown".to_string());
1357
1358 assert!(filter.matches(&doc_md));
1359 assert!(!filter.matches(&doc_pdf));
1360 }
1361
1362 #[test]
1363 fn test_filter_by_path_glob() {
1364 use cp_core::Document;
1365 use std::path::PathBuf;
1366
1367 let doc1 = Document::new(PathBuf::from("docs/readme.md"), b"content", 1000);
1368 let doc2 = Document::new(PathBuf::from("src/main.rs"), b"code", 1000);
1369
1370 let filter = Filter::DocumentPath("docs/*.md".to_string());
1371
1372 assert!(filter.matches(&doc1));
1373 assert!(!filter.matches(&doc2));
1374 }
1375
1376 #[test]
1377 fn test_filter_by_modified_time() {
1378 use cp_core::Document;
1379 use std::path::PathBuf;
1380
1381 let old_doc = Document::new(PathBuf::from("old.md"), b"content", 1000);
1382 let new_doc = Document::new(PathBuf::from("new.md"), b"content", 2000);
1383
1384 let filter_after = Filter::ModifiedAfter(1500);
1385 let filter_before = Filter::ModifiedBefore(1500);
1386
1387 assert!(!filter_after.matches(&old_doc));
1388 assert!(filter_after.matches(&new_doc));
1389
1390 assert!(filter_before.matches(&old_doc));
1391 assert!(!filter_before.matches(&new_doc));
1392 }
1393
1394 #[test]
1395 fn test_citation_extraction() {
1396 use cp_core::{ContextChunk, ContextMetadata};
1397
1398 let context = AssembledContext {
1399 chunks: vec![ContextChunk {
1400 chunk_id: Uuid::new_v4(),
1401 document_path: "test.md".to_string(),
1402 text: "The quick brown fox jumps over the lazy dog".to_string(),
1403 score: 1.0,
1404 sequence: 0,
1405 }],
1406 total_tokens: 10,
1407 truncated: false,
1408 metadata: ContextMetadata {
1409 query_hash: [0u8; 32],
1410 state_root: [0u8; 32],
1411 },
1412 };
1413
1414 let response = "As mentioned, the quick brown fox jumps over the lazy dog in the story.";
1416 let citations = extract_citations(response, &context);
1417
1418 assert!(
1419 !citations.is_empty(),
1420 "Should find citations for overlapping text"
1421 );
1422 assert!(citations[0].confidence > 0.0);
1423 }
1424
1425 #[test]
1426 fn test_hallucination_detection() {
1427 use cp_core::{ContextChunk, ContextMetadata};
1428
1429 let context = AssembledContext {
1430 chunks: vec![ContextChunk {
1431 chunk_id: Uuid::new_v4(),
1432 document_path: "test.md".to_string(),
1433 text: "The capital of France is Paris".to_string(),
1434 score: 1.0,
1435 sequence: 0,
1436 }],
1437 total_tokens: 10,
1438 truncated: false,
1439 metadata: ContextMetadata {
1440 query_hash: [0u8; 32],
1441 state_root: [0u8; 32],
1442 },
1443 };
1444
1445 let bad_response = "From my knowledge, I believe that Paris is a beautiful city.";
1447 let result = validate_response(bad_response, &context);
1448
1449 assert!(
1450 !result.warnings.is_empty(),
1451 "Should detect hallucination phrases"
1452 );
1453 assert!(result.warnings.iter().any(|w| w.contains("hallucination")));
1454
1455 let good_response = "Information is missing from the substrate.";
1457 let result2 = validate_response(good_response, &context);
1458
1459 assert!(
1460 result2.is_valid,
1461 "Should be valid when admitting missing info"
1462 );
1463 }
1464
1465 #[tokio::test]
1466 async fn test_real_corpus_proof() {
1467 let temp = TempDir::new().unwrap();
1468 let db_path = temp.path().join("real_corpus.db");
1469 let mut graph = cp_graph::GraphStore::open(db_path.to_str().unwrap()).unwrap();
1470
1471 let corpus_dir = match std::env::var("CANON_TEST_CORPUS") {
1473 Ok(p) => std::path::PathBuf::from(p),
1474 Err(_) => {
1475 std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../test_corpus")
1476 }
1477 };
1478
1479 if !corpus_dir.exists() {
1481 println!("Skipping test: corpus directory not found at {corpus_dir:?}");
1482 return;
1483 }
1484
1485 let embedder = Arc::new(cp_embeddings::EmbeddingEngine::new().unwrap());
1486
1487 let mut ingested = 0u32;
1489 for entry in std::fs::read_dir(&corpus_dir).unwrap() {
1490 let entry = entry.unwrap();
1491 let path = entry.path();
1492 if path.extension().and_then(|e| e.to_str()) != Some("md") {
1493 continue;
1494 }
1495
1496 let content = std::fs::read_to_string(&path).unwrap();
1497 let doc = Document::new(path.clone(), content.as_bytes(), 0);
1498 graph.insert_document(&doc).unwrap();
1499
1500 let chunk = Chunk {
1501 id: Uuid::new_v4(),
1502 doc_id: doc.id,
1503 text: content.clone(),
1504 byte_offset: 0,
1505 byte_length: content.len() as u64,
1506 sequence: 0,
1507 text_hash: [0; 32],
1508 };
1509 graph.insert_chunk(&chunk).unwrap();
1510
1511 let vec = embedder.embed(&content).unwrap();
1512 let emb = cp_core::Embedding::new(chunk.id, &vec, embedder.model_hash().unwrap(), 0);
1513 graph.insert_embedding(&emb).unwrap();
1514 ingested += 1;
1515 }
1516
1517 assert!(
1518 ingested >= 2,
1519 "Need at least 2 corpus files, found {ingested}"
1520 );
1521
1522 let qe = QueryEngine::new(Arc::new(Mutex::new(graph)), embedder);
1523
1524 let q1 = "public key encryption and digital signatures";
1526 let res1 = qe.search(q1, 3).unwrap();
1527 println!("\n--- QUERY: '{q1}' ---");
1528 for (i, r) in res1.iter().enumerate() {
1529 println!("Rank {}: [Score: {:.4}] {}", i + 1, r.score, r.doc_path);
1530 }
1531 assert!(!res1.is_empty(), "Should return results");
1532 assert!(
1533 res1[0].doc_path.contains("cryptography"),
1534 "Top result should be cryptography.md, got: {}",
1535 res1[0].doc_path
1536 );
1537
1538 let q2 = "qubit superposition entanglement error correction";
1540 let res2 = qe.search(q2, 3).unwrap();
1541 println!("\n--- QUERY: '{q2}' ---");
1542 for (i, r) in res2.iter().enumerate() {
1543 println!("Rank {}: [Score: {:.4}] {}", i + 1, r.score, r.doc_path);
1544 }
1545 assert!(!res2.is_empty(), "Should return results");
1546 assert!(
1547 res2[0].doc_path.contains("quantum"),
1548 "Top result should be quantum_computing.md, got: {}",
1549 res2[0].doc_path
1550 );
1551
1552 let q3 = "BM25 inverted index TF-IDF";
1554 let res3 = qe.search(q3, 5).unwrap();
1555 println!("\n--- QUERY: '{q3}' ---");
1556 for (i, r) in res3.iter().enumerate() {
1557 println!("Rank {}: [Score: {:.4}] {}", i + 1, r.score, r.doc_path);
1558 }
1559 assert!(!res3.is_empty(), "Should return results");
1560 assert!(
1561 res3[0].doc_path.contains("information_retrieval"),
1562 "Top result should be information_retrieval.md, got: {}",
1563 res3[0].doc_path
1564 );
1565 }
1566
1567 #[tokio::test]
1568 async fn test_search_modes() {
1569 let temp = TempDir::new().unwrap();
1570 let db_path = temp.path().join("modes.db");
1571 let mut graph = cp_graph::GraphStore::open(db_path.to_str().unwrap()).unwrap();
1572
1573 let doc = Document::new(
1574 "test.md".into(),
1575 b"The quick brown fox jumps over the lazy dog",
1576 100,
1577 );
1578 graph.insert_document(&doc).unwrap();
1579 let chunk = Chunk {
1580 id: Uuid::new_v4(),
1581 doc_id: doc.id,
1582 text: "The quick brown fox jumps over the lazy dog".to_string(),
1583 byte_offset: 0,
1584 byte_length: 43,
1585 sequence: 0,
1586 text_hash: [0; 32],
1587 };
1588 graph.insert_chunk(&chunk).unwrap();
1589 let embedder = Arc::new(cp_embeddings::EmbeddingEngine::new().unwrap());
1590 let vec = embedder.embed(&chunk.text).unwrap();
1591 let emb = cp_core::Embedding::new(chunk.id, &vec, embedder.model_hash().unwrap(), 0);
1592 graph.insert_embedding(&emb).unwrap();
1593
1594 let qe = QueryEngine::new(Arc::new(Mutex::new(graph)), embedder);
1595
1596 let results = qe.search_semantic("canine", 5).unwrap();
1599 assert!(!results.is_empty());
1600
1601 let results = qe.search_lexical("fox", 5).unwrap();
1604 assert!(!results.is_empty());
1605 assert!(results[0].chunk.text.contains("fox"));
1606 }
1607
1608 fn make_local_result(
1613 chunk_id_byte: u8,
1614 text: &str,
1615 doc_path: &str,
1616 score: f32,
1617 ) -> SearchResult {
1618 SearchResult {
1619 chunk: Chunk {
1620 id: Uuid::from_bytes([chunk_id_byte; 16]),
1621 doc_id: Uuid::from_bytes([0u8; 16]),
1622 text: text.to_string(),
1623 byte_offset: 0,
1624 byte_length: text.len() as u64,
1625 sequence: 0,
1626 text_hash: [0u8; 32],
1627 },
1628 score,
1629 doc_path: doc_path.to_string(),
1630 }
1631 }
1632
1633 fn make_remote_result(
1634 chunk_id_byte: u8,
1635 text: &str,
1636 doc_path: &str,
1637 score: u32,
1638 ) -> RemoteSearchResult {
1639 RemoteSearchResult {
1640 chunk_id: [chunk_id_byte; 16],
1641 chunk_text: text.to_string(),
1642 document_path: doc_path.to_string(),
1643 score,
1644 merkle_proof: None,
1645 }
1646 }
1647
1648 fn make_verified(
1649 result: RemoteSearchResult,
1650 weight: f64,
1651 peer_byte: u8,
1652 ) -> VerifiedRemoteResult {
1653 VerifiedRemoteResult {
1654 result,
1655 weight,
1656 peer_node_id: [peer_byte; 16],
1657 peer_state_root: [0u8; 32],
1658 peer_signature: [0u8; 64],
1659 }
1660 }
1661
1662 #[test]
1663 fn test_merge_local_only() {
1664 let local = vec![
1665 make_local_result(1, "chunk A", "doc_a.md", 0.9),
1666 make_local_result(2, "chunk B", "doc_b.md", 0.8),
1667 ];
1668
1669 let merged = merge_results(&local, &[], 10);
1670 assert_eq!(merged.len(), 2);
1671 assert!(merged[0].score > merged[1].score);
1672 assert!(matches!(merged[0].source, ResultSource::Local));
1673 assert_eq!(merged[0].chunk_text, "chunk A");
1674 }
1675
1676 #[test]
1677 fn test_merge_remote_only() {
1678 let remote = vec![make_verified(
1679 make_remote_result(3, "remote chunk", "remote.md", 16000),
1680 0.8,
1681 99,
1682 )];
1683
1684 let merged = merge_results(&[], &remote, 10);
1685 assert_eq!(merged.len(), 1);
1686 assert!(matches!(merged[0].source, ResultSource::Remote { .. }));
1687 assert_eq!(merged[0].chunk_text, "remote chunk");
1688 assert!(merged[0].peer_state_root.is_some());
1689 }
1690
1691 #[test]
1692 fn test_merge_deduplication() {
1693 let local = vec![make_local_result(1, "shared chunk local", "doc.md", 0.9)];
1695 let remote = vec![make_verified(
1696 make_remote_result(1, "shared chunk remote", "doc.md", 16000),
1697 0.7,
1698 50,
1699 )];
1700
1701 let merged = merge_results(&local, &remote, 10);
1702 assert_eq!(merged.len(), 1, "Duplicate chunk should be merged");
1703 assert!(matches!(merged[0].source, ResultSource::Both { .. }));
1704
1705 let local_only = merge_results(&local, &[], 10);
1707 assert!(
1708 merged[0].score > local_only[0].score,
1709 "Merged score ({}) should exceed local-only score ({})",
1710 merged[0].score,
1711 local_only[0].score
1712 );
1713 }
1714
1715 #[test]
1716 fn test_merge_weight_affects_score() {
1717 let remote_high = vec![make_verified(
1718 make_remote_result(1, "high weight", "doc.md", 16000),
1719 1.0,
1720 10,
1721 )];
1722 let remote_low = vec![make_verified(
1723 make_remote_result(1, "low weight", "doc.md", 16000),
1724 0.3,
1725 20,
1726 )];
1727
1728 let merged_high = merge_results(&[], &remote_high, 10);
1729 let merged_low = merge_results(&[], &remote_low, 10);
1730
1731 assert!(
1732 merged_high[0].score > merged_low[0].score,
1733 "Higher weight ({}) should produce higher score ({}) vs ({})",
1734 1.0,
1735 merged_high[0].score,
1736 merged_low[0].score,
1737 );
1738 }
1739
1740 #[test]
1741 fn test_merge_respects_max_results() {
1742 let local: Vec<SearchResult> = (0..15)
1743 .map(|i| {
1744 make_local_result(
1745 i,
1746 &format!("chunk {i}"),
1747 "doc.md",
1748 0.9 - f32::from(i) * 0.01,
1749 )
1750 })
1751 .collect();
1752 let remote: Vec<VerifiedRemoteResult> = (15..30)
1753 .map(|i| {
1754 make_verified(
1755 make_remote_result(i, &format!("remote {i}"), "remote.md", 10000),
1756 0.5,
1757 99,
1758 )
1759 })
1760 .collect();
1761
1762 let merged = merge_results(&local, &remote, 10);
1763 assert_eq!(merged.len(), 10, "Should cap at max_results");
1764 }
1765
1766 #[test]
1767 fn test_merge_cap_at_20() {
1768 let local: Vec<SearchResult> = (0..25)
1769 .map(|i| make_local_result(i, &format!("c{i}"), "d.md", 0.5))
1770 .collect();
1771
1772 let merged = merge_results(&local, &[], 100);
1774 assert_eq!(merged.len(), 20, "Should cap at MAX_RESULTS (20)");
1775 }
1776
1777 #[test]
1778 fn test_merge_scores_decrease() {
1779 let local = vec![
1780 make_local_result(1, "first", "a.md", 0.9),
1781 make_local_result(2, "second", "b.md", 0.8),
1782 make_local_result(3, "third", "c.md", 0.7),
1783 ];
1784 let remote = vec![
1785 make_verified(make_remote_result(4, "r1", "d.md", 16000), 0.8, 10),
1786 make_verified(make_remote_result(5, "r2", "e.md", 15000), 0.8, 10),
1787 ];
1788
1789 let merged = merge_results(&local, &remote, 20);
1790 for w in merged.windows(2) {
1791 assert!(
1792 w[0].score >= w[1].score,
1793 "Scores should be in descending order: {} >= {}",
1794 w[0].score,
1795 w[1].score
1796 );
1797 }
1798 }
1799
1800 #[test]
1801 fn test_merge_empty_inputs() {
1802 let merged = merge_results(&[], &[], 10);
1803 assert!(merged.is_empty());
1804 }
1805
1806 #[test]
1807 fn test_verify_and_extract_valid() {
1808 let signing_key = ed25519_dalek::SigningKey::from_bytes(&[88u8; 32]);
1810 let public_key = signing_key.verifying_key().to_bytes();
1811
1812 let mut response = SearchResponse {
1813 request_id: [1u8; 16],
1814 status: SearchStatus::Ok,
1815 results: vec![RemoteSearchResult {
1816 chunk_id: [2u8; 16],
1817 chunk_text: "test chunk".to_string(),
1818 document_path: "test.md".to_string(),
1819 score: 16000,
1820 merkle_proof: None,
1821 }],
1822 peer_state_root: [3u8; 32],
1823 search_latency_ms: 50,
1824 timestamp: 1000,
1825 signature: [0u8; 64],
1826 };
1827
1828 let signing_bytes = response.signing_bytes();
1829 response.signature = ed25519_dalek::Signer::sign(&signing_key, &signing_bytes).to_bytes();
1830
1831 let extracted = verify_and_extract(&response, &public_key, [10u8; 16], Some(0.9));
1832 assert!(extracted.is_some());
1833 let results = extracted.unwrap();
1834 assert_eq!(results.len(), 1);
1835 assert!((results[0].weight - 0.9).abs() < 0.001);
1836 }
1837
1838 #[test]
1839 fn test_verify_and_extract_bad_signature() {
1840 let response = SearchResponse {
1841 request_id: [1u8; 16],
1842 status: SearchStatus::Ok,
1843 results: vec![],
1844 peer_state_root: [0u8; 32],
1845 search_latency_ms: 0,
1846 timestamp: 1000,
1847 signature: [0u8; 64], };
1849
1850 let fake_key = [0u8; 32];
1851 let extracted = verify_and_extract(&response, &fake_key, [10u8; 16], None);
1852 assert!(extracted.is_none());
1853 }
1854
1855 #[test]
1856 fn test_verify_and_extract_non_ok_status() {
1857 let response = SearchResponse {
1858 request_id: [1u8; 16],
1859 status: SearchStatus::ModelMismatch,
1860 results: vec![],
1861 peer_state_root: [0u8; 32],
1862 search_latency_ms: 0,
1863 timestamp: 1000,
1864 signature: [0u8; 64],
1865 };
1866
1867 let key = [0u8; 32];
1868 let extracted = verify_and_extract(&response, &key, [10u8; 16], None);
1869 assert!(extracted.is_none(), "Non-Ok status should return None");
1870 }
1871
1872 #[test]
1873 fn test_verify_and_extract_unrated_peer() {
1874 let signing_key = ed25519_dalek::SigningKey::from_bytes(&[77u8; 32]);
1875 let public_key = signing_key.verifying_key().to_bytes();
1876
1877 let mut response = SearchResponse {
1878 request_id: [1u8; 16],
1879 status: SearchStatus::Ok,
1880 results: vec![RemoteSearchResult {
1881 chunk_id: [5u8; 16],
1882 chunk_text: "unrated".to_string(),
1883 document_path: "doc.md".to_string(),
1884 score: 14000,
1885 merkle_proof: None,
1886 }],
1887 peer_state_root: [0u8; 32],
1888 search_latency_ms: 10,
1889 timestamp: 2000,
1890 signature: [0u8; 64],
1891 };
1892
1893 let signing_bytes = response.signing_bytes();
1894 response.signature = ed25519_dalek::Signer::sign(&signing_key, &signing_bytes).to_bytes();
1895
1896 let extracted = verify_and_extract(&response, &public_key, [20u8; 16], None);
1898 assert!(extracted.is_some());
1899 assert!((extracted.unwrap()[0].weight - 0.5).abs() < 0.001);
1900 }
1901
1902 #[test]
1903 fn test_verify_and_extract_clamps_weight() {
1904 let signing_key = ed25519_dalek::SigningKey::from_bytes(&[77u8; 32]);
1905 let public_key = signing_key.verifying_key().to_bytes();
1906
1907 let mut response = SearchResponse {
1908 request_id: [1u8; 16],
1909 status: SearchStatus::Ok,
1910 results: vec![RemoteSearchResult {
1911 chunk_id: [5u8; 16],
1912 chunk_text: "clamped".to_string(),
1913 document_path: "doc.md".to_string(),
1914 score: 14000,
1915 merkle_proof: None,
1916 }],
1917 peer_state_root: [0u8; 32],
1918 search_latency_ms: 10,
1919 timestamp: 2000,
1920 signature: [0u8; 64],
1921 };
1922
1923 let signing_bytes = response.signing_bytes();
1924 response.signature = ed25519_dalek::Signer::sign(&signing_key, &signing_bytes).to_bytes();
1925
1926 let extracted = verify_and_extract(&response, &public_key, [20u8; 16], Some(0.01));
1928 assert!((extracted.unwrap()[0].weight - 0.3).abs() < 0.001);
1929
1930 let mut response2 = response.clone();
1932 let signing_bytes2 = response2.signing_bytes();
1933 response2.signature = ed25519_dalek::Signer::sign(&signing_key, &signing_bytes2).to_bytes();
1934
1935 let extracted2 = verify_and_extract(&response2, &public_key, [20u8; 16], Some(5.0));
1936 assert!((extracted2.unwrap()[0].weight - 1.0).abs() < 0.001);
1937 }
1938}