use cp_core::{AssembledContext, CPError, Chunk, ContextAssembler, Result, ScoredChunk};
use cp_tor::types::{
MergedSearchResult, RemoteSearchResult, ResultSource, SearchResponse, SearchStatus, MAX_RESULTS,
};
use glob::Pattern;
use lru::LruCache;
use std::collections::HashMap;
use std::fmt::Write as _;
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex, RwLock};
use tracing::{info, warn};
use uuid::Uuid;
/// Predicate over a document's metadata, used to narrow search results.
#[derive(Debug, Clone)]
pub enum Filter {
/// Glob pattern that the document path must match.
DocumentPath(String),
/// MIME type the document must have (exact string equality).
MimeType(String),
/// Keeps documents whose `mtime` is strictly greater than this value.
ModifiedAfter(i64),
/// Keeps documents whose `mtime` is strictly less than this value.
ModifiedBefore(i64),
}
impl Filter {
    /// Reports whether `doc` satisfies this filter.
    ///
    /// A `DocumentPath` pattern that cannot be compiled as a glob matches
    /// nothing. Both time filters use strict comparisons, so a document whose
    /// `mtime` equals the bound is rejected.
    pub fn matches(&self, doc: &cp_core::Document) -> bool {
        match self {
            Self::DocumentPath(pattern) => match Pattern::new(pattern) {
                Ok(compiled) => compiled.matches(doc.path.to_string_lossy().as_ref()),
                Err(_) => false,
            },
            Self::MimeType(expected) => doc.mime_type == *expected,
            Self::ModifiedAfter(bound) => doc.mtime > *bound,
            Self::ModifiedBefore(bound) => doc.mtime < *bound,
        }
    }
}
/// LRU cache of query results, tied to a 32-byte state root.
pub struct QueryCache {
// Maps a 32-byte hash of `(query, k)` to the chunk ids returned for it.
cache: RwLock<LruCache<[u8; 32], Vec<Uuid>>>,
// State root the cached entries were recorded against; starts as all zeros.
state_root: RwLock<[u8; 32]>,
}
impl QueryCache {
    /// Builds a cache holding at most `capacity` entries.
    ///
    /// A `capacity` of zero falls back to 100 entries. The recorded state
    /// root starts as all zeros.
    pub fn new(capacity: usize) -> Self {
        let fallback = NonZeroUsize::new(100).unwrap();
        let size = NonZeroUsize::new(capacity).unwrap_or(fallback);
        Self {
            cache: RwLock::new(LruCache::new(size)),
            state_root: RwLock::new([0u8; 32]),
        }
    }

    /// Looks up the chunk ids cached for `(query, k)`.
    ///
    /// Returns `None` on a miss or when the lock is poisoned. A write lock is
    /// taken because an LRU lookup updates recency.
    pub fn get(&self, query: &str, k: usize) -> Option<Vec<Uuid>> {
        let key = Self::hash_key(query, k);
        let mut entries = self.cache.write().ok()?;
        entries.get(&key).cloned()
    }

    /// Stores `results` for `(query, k)`; does nothing if the lock is poisoned.
    pub fn put(&self, query: &str, k: usize, results: Vec<Uuid>) {
        let key = Self::hash_key(query, k);
        let Ok(mut entries) = self.cache.write() else {
            return;
        };
        entries.put(key, results);
    }

    /// Returns `true` when the recorded state root equals `current_root`.
    ///
    /// A poisoned lock is reported as "not valid".
    pub fn is_valid(&self, current_root: &[u8; 32]) -> bool {
        match self.state_root.read() {
            Ok(recorded) => *recorded == *current_root,
            Err(_) => false,
        }
    }

    /// Drops every cached entry, then records `new_root` as the state root.
    ///
    /// Each step is skipped independently if its lock is poisoned.
    pub fn invalidate(&self, new_root: [u8; 32]) {
        if let Ok(mut entries) = self.cache.write() {
            entries.clear();
        }
        if let Ok(mut recorded) = self.state_root.write() {
            *recorded = new_root;
        }
    }

    /// Derives the cache key: BLAKE3 over the query bytes followed by the
    /// little-endian bytes of `k`.
    fn hash_key(query: &str, k: usize) -> [u8; 32] {
        let mut hasher = blake3::Hasher::new();
        hasher.update(query.as_bytes());
        hasher.update(&k.to_le_bytes());
        let digest = hasher.finalize();
        *digest.as_bytes()
    }
}
impl Default for QueryCache {
/// Creates a cache with a capacity of 100 entries.
fn default() -> Self {
Self::new(100)
}
}
/// One search hit: a chunk, its score, and the path of its parent document.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SearchResult {
/// The matched chunk.
pub chunk: Chunk,
/// Relevance score; its scale depends on which search method produced the hit.
pub score: f32,
/// Path of the document the chunk belongs to (lossy UTF-8 conversion).
pub doc_path: String,
}
/// Output of `QueryEngine::generate_answer`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct GenerationResult {
/// Text returned by the intelligence engine.
pub answer: String,
/// The formatted context that was handed to the engine.
pub context: String,
/// Wall-clock time for the whole call, in milliseconds.
pub latency_ms: u64,
}
/// Link between a span of a response and the context chunk it overlaps with.
#[derive(Debug, Clone, serde::Serialize)]
pub struct Citation {
/// Id of the context chunk that shares text with the response.
pub chunk_id: Uuid,
/// `(start, end)` byte offsets into the response.
pub span: (usize, usize),
/// Fraction of the response's five-word sequences that also occur in the chunk.
pub confidence: f32,
}
/// Outcome of `validate_response`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ValidationResult {
/// `true` when there are no warnings, or the response states that information is missing.
pub is_valid: bool,
/// Human-readable descriptions of each problem found.
pub warnings: Vec<String>,
/// Fraction of the response's bytes covered by citation spans, capped at 1.0.
pub citation_coverage: f32,
/// Citations extracted from the response.
pub citations: Vec<Citation>,
}
/// Lowercase phrases that `validate_response` treats as hallucination indicators.
///
/// They are matched as plain substrings of the lowercased response, so a phrase
/// also matches inside a longer word.
const HALLUCINATION_PHRASES: &[&str] = &[
"from my knowledge",
"i recall that",
"as far as i know",
"i believe that",
"in my experience",
"typically",
"generally speaking",
"it's commonly known",
"as everyone knows",
"i think that",
"probably",
"most likely",
"i assume",
"based on my understanding",
"from what i've learned",
];
/// Finds the context chunks a response quotes from, using shared five-word sequences.
///
/// Both the response and each chunk are lowercased and split on whitespace.
/// A chunk is cited when at least one run of five consecutive response words
/// also appears, in order, in the chunk.
///
/// For each cited chunk:
/// * `confidence` is the number of matching response five-grams divided by the
///   total number of five-grams in the response;
/// * `span` is the byte range in the *original* response from the start of the
///   first matching word to the end of the last matching word.
///
/// Responses or chunks with fewer than five words produce no citation. The
/// result is sorted by descending confidence.
pub fn extract_citations(response: &str, context: &AssembledContext) -> Vec<Citation> {
    const NGRAM: usize = 5;

    let response_lower = response.to_lowercase();
    let response_words: Vec<&str> = response_lower.split_whitespace().collect();
    if response_words.len() < NGRAM {
        return Vec::new();
    }

    // Word boundaries are taken from the original text so that spans stay
    // correct with repeated, leading, or multi-byte whitespace, and when
    // lowercasing changes byte lengths.
    let word_spans = word_byte_spans(response);
    let total_ngrams = response_words.len() - NGRAM + 1;

    let mut citations = Vec::new();
    for chunk in &context.chunks {
        let chunk_lower = chunk.text.to_lowercase();
        let chunk_words: Vec<&str> = chunk_lower.split_whitespace().collect();
        if chunk_words.len() < NGRAM {
            continue;
        }

        // Index the chunk's five-grams once instead of rescanning per response position.
        let chunk_ngrams: std::collections::HashSet<&[&str]> =
            chunk_words.windows(NGRAM).collect();

        let matched: Vec<usize> = response_words
            .windows(NGRAM)
            .enumerate()
            .filter(|(_, window)| chunk_ngrams.contains(window))
            .map(|(idx, _)| idx)
            .collect();

        let (Some(&first), Some(&last)) = (matched.first(), matched.last()) else {
            continue;
        };

        let confidence = matched.len() as f32 / total_ngrams as f32;
        // Fall back to the whole response if the word tables ever disagree.
        let byte_start = word_spans.get(first).map_or(0, |span| span.0);
        let byte_end = word_spans
            .get(last + NGRAM - 1)
            .map_or(response.len(), |span| span.1);

        citations.push(Citation {
            chunk_id: chunk.chunk_id,
            span: (byte_start, byte_end),
            confidence,
        });
    }

    citations.sort_by(|a, b| {
        b.confidence
            .partial_cmp(&a.confidence)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    citations
}

/// Returns the `(start, end)` byte range of every whitespace-separated word in `text`.
fn word_byte_spans(text: &str) -> Vec<(usize, usize)> {
    let mut spans = Vec::new();
    let mut word_start: Option<usize> = None;
    for (idx, ch) in text.char_indices() {
        if ch.is_whitespace() {
            if let Some(start) = word_start.take() {
                spans.push((start, idx));
            }
        } else if word_start.is_none() {
            word_start = Some(idx);
        }
    }
    if let Some(start) = word_start {
        spans.push((start, text.len()));
    }
    spans
}
/// Checks a generated response against the context it was produced from.
///
/// Three signals are combined:
/// * citation coverage — the share of response bytes covered by the merged
///   spans from `extract_citations`, capped at 1.0;
/// * hallucination indicators — one warning per `HALLUCINATION_PHRASES` entry
///   found in the lowercased response;
/// * low coverage — a warning when coverage is below 30% and the response is
///   not empty.
///
/// The response is valid when there are no warnings, or when it states that
/// the requested information is missing.
pub fn validate_response(response: &str, context: &AssembledContext) -> ValidationResult {
    let citations = extract_citations(response, context);

    // Merge overlapping or touching spans so shared bytes are counted once.
    let mut ordered: Vec<(usize, usize)> = citations.iter().map(|c| c.span).collect();
    ordered.sort_by_key(|&(start, _)| start);
    let mut disjoint: Vec<(usize, usize)> = Vec::new();
    for (start, end) in ordered {
        match disjoint.last_mut() {
            Some(prev) if start <= prev.1 => prev.1 = prev.1.max(end),
            _ => disjoint.push((start, end)),
        }
    }
    let covered_bytes: usize = disjoint
        .iter()
        .map(|&(start, end)| end.saturating_sub(start))
        .sum();

    let response_len = response.len() as f32;
    let citation_coverage = if response_len > 0.0 {
        (covered_bytes as f32 / response_len).min(1.0)
    } else {
        0.0
    };

    let lowered = response.to_lowercase();
    let mut warnings: Vec<String> = HALLUCINATION_PHRASES
        .iter()
        .filter(|phrase| lowered.contains(**phrase))
        .map(|phrase| format!("Response contains hallucination indicator: '{phrase}'"))
        .collect();

    if !response.is_empty() && citation_coverage < 0.3 {
        warnings.push(format!(
            "Low citation coverage: {:.1}% (threshold: 30%)",
            citation_coverage * 100.0
        ));
    }

    // A response that admits the context lacks the answer is accepted even with warnings.
    let admits_missing = [
        "information is missing",
        "not found in the context",
        "cannot find",
    ];
    let claims_missing = admits_missing.iter().any(|p| lowered.contains(*p));

    ValidationResult {
        is_valid: warnings.is_empty() || claims_missing,
        warnings,
        citation_coverage,
        citations,
    }
}
/// Backend that turns an assembled context plus a query into an answer string.
#[async_trait::async_trait]
pub trait IntelligenceEngine: Send + Sync {
/// Produces an answer to `query` given `context`.
async fn generate(&self, context: &AssembledContext, query: &str) -> Result<String>;
}
/// `IntelligenceEngine` implementation that calls an Ollama server's chat endpoint.
pub struct OllamaGenerator {
// Server base URL; `/api/chat` is appended to it when sending a request.
base_url: String,
// Model name sent in the request payload.
model: String,
}
impl OllamaGenerator {
/// Creates a generator targeting the server at `base_url` with the given `model`.
pub fn new(base_url: String, model: String) -> Self {
Self { base_url, model }
}
}
#[async_trait::async_trait]
impl IntelligenceEngine for OllamaGenerator {
    /// Sends the formatted context and `query` to `{base_url}/api/chat` and
    /// returns the assistant message content.
    ///
    /// The request is non-streaming and times out after 120 seconds.
    ///
    /// # Errors
    ///
    /// * `CPError::Inference` — the HTTP client could not be built, the
    ///   request failed, or the server replied with a non-success status
    ///   (the status and response body are included in the message).
    /// * `CPError::Parse` — the body is not JSON or has no string at
    ///   `message.content`.
    async fn generate(&self, context: &AssembledContext, query: &str) -> Result<String> {
        let formatted_context = ContextAssembler::format(context);
        let client = reqwest::Client::builder()
            .timeout(std::time::Duration::from_secs(120))
            .build()
            .map_err(|e| CPError::Inference(format!("Failed to create HTTP client: {e}")))?;
        let payload = serde_json::json!({
            "model": self.model,
            "stream": false,
            "messages": [
                {
                    "role": "system",
                    "content": "You are a document summarization assistant. The user provides retrieved documents and a question. Summarize the relevant information from the documents to answer the question. Always answer based on the documents provided. Be direct and cite document names."
                },
                {
                    "role": "user",
                    "content": format!("Here are the retrieved documents:\n\n{formatted_context}\n\nBased on these documents, {query}")
                }
            ]
        });
        // Tolerate a base URL configured with a trailing slash.
        let url = format!("{}/api/chat", self.base_url.trim_end_matches('/'));
        let res = client
            .post(&url)
            .json(&payload)
            .send()
            .await
            .map_err(|e| CPError::Inference(format!("Ollama request failed: {e}")))?;

        // Report server-side failures (unknown model, overload, ...) as such,
        // instead of letting them surface as a confusing parse error below.
        let status = res.status();
        if !status.is_success() {
            let body = res.text().await.unwrap_or_default();
            return Err(CPError::Inference(format!(
                "Ollama returned HTTP {status}: {}",
                body.trim()
            )));
        }

        let json: serde_json::Value = res
            .json()
            .await
            .map_err(|e| CPError::Parse(e.to_string()))?;
        let answer = json["message"]["content"]
            .as_str()
            .ok_or_else(|| CPError::Parse("Invalid Ollama chat response".into()))?
            .to_string();
        Ok(answer)
    }
}
/// Search and answer-generation front end over a graph store and an embedding engine.
pub struct QueryEngine {
// Shared graph store; every access takes the mutex.
graph: Arc<Mutex<cp_graph::GraphStore>>,
// Embedding engine used to embed queries.
embedder: Arc<cp_embeddings::EmbeddingEngine>,
// Optional answer generator; `generate_answer` and `chat` fail without it.
intelligence: Option<Box<dyn IntelligenceEngine>>,
// Result cache used by `search_cached`.
cache: QueryCache,
// Budget handed to `ContextAssembler::with_budget`; defaults to 2000.
context_budget: usize,
}
impl QueryEngine {
/// Creates an engine with the default cache, no intelligence engine, and a
/// context budget of 2000.
pub fn new(
graph: Arc<Mutex<cp_graph::GraphStore>>,
embedder: Arc<cp_embeddings::EmbeddingEngine>,
) -> Self {
Self {
graph,
embedder,
intelligence: None,
cache: QueryCache::default(),
context_budget: 2000,
}
}
/// Same as `new`, but with a query cache sized to `cache_capacity` entries.
pub fn with_cache_capacity(
graph: Arc<Mutex<cp_graph::GraphStore>>,
embedder: Arc<cp_embeddings::EmbeddingEngine>,
cache_capacity: usize,
) -> Self {
Self {
graph,
embedder,
intelligence: None,
cache: QueryCache::new(cache_capacity),
context_budget: 2000,
}
}
/// Builder-style setter for the context budget.
pub fn with_context_budget(mut self, budget: usize) -> Self {
self.context_budget = budget;
self
}
/// Builder-style setter for the intelligence engine.
pub fn with_intelligence(mut self, intelligence: Box<dyn IntelligenceEngine>) -> Self {
self.intelligence = Some(intelligence);
self
}
/// Installs or replaces the intelligence engine on an existing instance.
pub fn set_intelligence(&mut self, intelligence: Box<dyn IntelligenceEngine>) {
self.intelligence = Some(intelligence);
}
/// Hybrid search: fuses semantic (vector) and lexical hits with reciprocal-rank scoring.
///
/// Each list contributes `RRF_SCALE / (RRF_K + rank + 1)` (integer division,
/// rank starting at 0) per hit, and contributions for the same chunk are summed.
/// The fused list is ordered by score descending, then chunk id ascending, and
/// truncated to `k`.
///
/// # Errors
///
/// Fails if the query cannot be embedded, the vector search fails, or a chunk or
/// document lookup for a fused hit fails. A failing lexical search is only
/// logged and the search continues with semantic hits alone.
///
/// # Panics
///
/// Panics if the graph mutex is poisoned.
pub fn search(&self, query: &str, k: usize) -> Result<Vec<SearchResult>> {
info!("Hybrid search for: '{}'", query);
let query_vec = self
.embedder
.embed_query(query)
.map_err(|e| CPError::Embedding(format!("Failed to embed query: {e}")))?;
// The lock is taken in short scopes and released between steps.
let semantic_results = {
let graph = self.graph.lock().expect("graph lock poisoned");
graph.search(&query_vec, k)?
};
let lexical_results = {
let graph = self.graph.lock().expect("graph lock poisoned");
let fts_query = sanitize_fts5_query(query);
graph.search_lexical(&fts_query, k).unwrap_or_else(|e| {
warn!(
"Lexical search failed: {}. Falling back to semantic only.",
e
);
Vec::new()
})
};
// Integer scores keep the fused ordering exact and reproducible.
const RRF_K: u64 = 60;
const RRF_SCALE: u64 = 1_000_000;
let mut scores: HashMap<Uuid, u64> = HashMap::new();
{
let graph = self.graph.lock().expect("graph lock poisoned");
for (i, (emb_id, _)) in semantic_results.iter().enumerate() {
// Embeddings whose chunk lookup errors or returns nothing are skipped.
if let Ok(Some(chunk_id)) = graph.get_chunk_id_for_embedding(*emb_id) {
let score = RRF_SCALE / (RRF_K + i as u64 + 1);
*scores.entry(chunk_id).or_insert(0) += score;
}
}
for (i, (chunk_id, _)) in lexical_results.iter().enumerate() {
let score = RRF_SCALE / (RRF_K + i as u64 + 1);
*scores.entry(*chunk_id).or_insert(0) += score;
}
}
let mut fused: Vec<(Uuid, u64)> = scores.into_iter().collect();
// Ties are broken by chunk id so the order does not depend on HashMap iteration.
fused.sort_by(|a, b| {
b.1.cmp(&a.1) .then_with(|| a.0.cmp(&b.0)) });
fused.truncate(k);
let mut search_results = Vec::with_capacity(fused.len());
let graph = self.graph.lock().expect("graph lock poisoned");
for (chunk_id, fused_score) in fused {
// Hits whose chunk or document no longer exists are dropped.
let Some(chunk) = graph.get_chunk(chunk_id)? else {
continue;
};
let Some(doc) = graph.get_document(chunk.doc_id)? else {
continue;
};
// Divides by twice the scale, one share per ranking list.
let normalized_score = fused_score as f32 / (RRF_SCALE * 2) as f32;
search_results.push(SearchResult {
chunk,
score: normalized_score,
doc_path: doc.path.to_string_lossy().to_string(),
});
}
Ok(search_results)
}
/// Vector-only search: embeds `query` and returns up to `k` nearest chunks.
///
/// Scores are passed through unchanged from the graph store. Hits whose
/// embedding, chunk, or document cannot be resolved are skipped.
///
/// # Errors
///
/// Fails if the query cannot be embedded or any graph lookup returns an error.
///
/// # Panics
///
/// Panics if the graph mutex is poisoned.
pub fn search_semantic(&self, query: &str, k: usize) -> Result<Vec<SearchResult>> {
    info!("Semantic search for: '{}'", query);
    let query_vec = self
        .embedder
        .embed_query(query)
        .map_err(|e| CPError::Embedding(format!("Failed to embed query: {e}")))?;

    let hits = {
        let graph = self.graph.lock().expect("graph lock poisoned");
        graph.search(&query_vec, k)?
    };

    let mut search_results = Vec::with_capacity(hits.len());
    let graph = self.graph.lock().expect("graph lock poisoned");
    for (emb_id, score) in hits {
        let Some(chunk_id) = graph.get_chunk_id_for_embedding(emb_id)? else {
            continue;
        };
        let Some(chunk) = graph.get_chunk(chunk_id)? else {
            continue;
        };
        let Some(doc) = graph.get_document(chunk.doc_id)? else {
            continue;
        };
        let doc_path = doc.path.to_string_lossy().to_string();
        search_results.push(SearchResult {
            chunk,
            score,
            doc_path,
        });
    }
    Ok(search_results)
}
/// Lexical-only search: runs the sanitized query against the full-text index.
///
/// Scores are passed through unchanged from the graph store. Hits whose chunk
/// or document cannot be resolved are skipped.
///
/// # Errors
///
/// Unlike `search`, a failing lexical query is returned to the caller, as is
/// any failing chunk or document lookup.
///
/// # Panics
///
/// Panics if the graph mutex is poisoned.
pub fn search_lexical(&self, query: &str, k: usize) -> Result<Vec<SearchResult>> {
    info!("Lexical search for: '{}'", query);

    let hits = {
        let graph = self.graph.lock().expect("graph lock poisoned");
        let fts_query = sanitize_fts5_query(query);
        graph.search_lexical(&fts_query, k)?
    };

    let mut search_results = Vec::with_capacity(hits.len());
    let graph = self.graph.lock().expect("graph lock poisoned");
    for (chunk_id, score) in hits {
        let Some(chunk) = graph.get_chunk(chunk_id)? else {
            continue;
        };
        let Some(doc) = graph.get_document(chunk.doc_id)? else {
            continue;
        };
        let doc_path = doc.path.to_string_lossy().to_string();
        search_results.push(SearchResult {
            chunk,
            score,
            doc_path,
        });
    }
    Ok(search_results)
}
/// Hybrid search restricted to documents that satisfy every filter.
///
/// All documents are loaded and tested against `filters` first. If none
/// match, an empty list is returned without searching. Otherwise the hybrid
/// search is asked for three times `k` hits, and the first `k` belonging to a
/// matching document are kept. Fewer than `k` results may come back even when
/// more matching chunks exist beyond the over-fetched window.
///
/// # Errors
///
/// Fails if the documents cannot be listed or the underlying search fails.
///
/// # Panics
///
/// Panics if the graph mutex is poisoned.
pub fn search_filtered(
    &self,
    query: &str,
    k: usize,
    filters: &[Filter],
) -> Result<Vec<SearchResult>> {
    info!(
        "Filtered search for: '{}' with {} filters",
        query,
        filters.len()
    );
    let matching_doc_ids: std::collections::HashSet<Uuid> = {
        let graph = self.graph.lock().expect("graph lock poisoned");
        let all_docs = graph.get_all_documents()?;
        all_docs
            .into_iter()
            .filter(|doc| filters.iter().all(|f| f.matches(doc)))
            .map(|doc| doc.id)
            .collect()
    };
    if matching_doc_ids.is_empty() {
        info!("No documents match filters");
        return Ok(Vec::new());
    }
    // Over-fetch so that filtering still leaves enough hits. Saturating
    // multiplication keeps a very large `k` from overflowing.
    let all_results = self.search(query, k.saturating_mul(3))?;
    let filtered_results: Vec<SearchResult> = all_results
        .into_iter()
        .filter(|r| matching_doc_ids.contains(&r.chunk.doc_id))
        .take(k)
        .collect();
    info!(
        "Filtered search returned {} results",
        filtered_results.len()
    );
    Ok(filtered_results)
}
/// Hybrid search with result caching keyed on `(query, k)`.
///
/// The cache is tied to the graph's Merkle root: if the root has changed
/// since the cache was last filled, every cached entry is discarded first.
///
/// On a cache hit the chunks are re-read from the graph. The cache stores
/// chunk ids only, so every hit is returned with a score of `0.0`. On a miss
/// the regular hybrid search runs and its chunk ids are cached.
///
/// # Errors
///
/// Fails if the Merkle root cannot be computed, a chunk or document lookup
/// fails, or the underlying search fails.
///
/// # Panics
///
/// Panics if the graph mutex is poisoned.
pub fn search_cached(&self, query: &str, k: usize) -> Result<Vec<SearchResult>> {
    let current_root = {
        let graph = self.graph.lock().expect("graph lock poisoned");
        graph.compute_merkle_root()?
    };
    if !self.cache.is_valid(&current_root) {
        self.cache.invalidate(current_root);
    }
    if let Some(chunk_ids) = self.cache.get(query, k) {
        info!("Cache hit for query: '{}' (k={})", query, k);
        let graph = self.graph.lock().expect("graph lock poisoned");
        let mut results = Vec::new();
        for chunk_id in chunk_ids.iter().take(k) {
            if let Some(chunk) = graph.get_chunk(*chunk_id)? {
                if let Some(doc) = graph.get_document(chunk.doc_id)? {
                    results.push(SearchResult {
                        chunk,
                        // Scores are not cached.
                        score: 0.0,
                        doc_path: doc.path.to_string_lossy().to_string(),
                    });
                }
            }
        }
        return Ok(results);
    }
    let results = self.search(query, k)?;
    let chunk_ids: Vec<Uuid> = results.iter().map(|r| r.chunk.id).collect();
    self.cache.put(query, k, chunk_ids);
    Ok(results)
}
/// Clears the query cache and records the graph's current Merkle root.
///
/// Returns an error if the Merkle root cannot be computed; panics if the graph
/// mutex is poisoned.
pub fn invalidate_cache(&self) -> Result<()> {
// The guard is dropped before the cache locks are taken.
let root = {
let graph = self.graph.lock().expect("graph lock poisoned");
graph.compute_merkle_root()?
};
self.cache.invalidate(root);
Ok(())
}
/// Returns every chunk of the document `doc_id`, each wrapped as a
/// `SearchResult` with a score of `0.0`.
///
/// # Errors
///
/// Returns `CPError::Database` when the document does not exist, and passes
/// on any error from the graph store.
///
/// # Panics
///
/// Panics if the graph mutex is poisoned.
pub fn get_chunks_for_document(&self, doc_id: Uuid) -> Result<Vec<SearchResult>> {
    let graph = self.graph.lock().expect("graph lock poisoned");
    let Some(doc) = graph.get_document(doc_id)? else {
        return Err(CPError::Database(format!("Doc {doc_id} not found")));
    };
    let chunks = graph.get_chunks_for_doc(doc_id)?;

    let mut results = Vec::new();
    for chunk in chunks {
        results.push(SearchResult {
            chunk,
            score: 0.0,
            doc_path: doc.path.to_string_lossy().to_string(),
        });
    }
    Ok(results)
}
/// Returns another handle to the shared graph store.
pub fn graph(&self) -> Arc<Mutex<cp_graph::GraphStore>> {
    Arc::clone(&self.graph)
}
/// Answers `query` from the top five hybrid search hits.
///
/// The hits are assembled into a context under the configured budget,
/// stamped with the graph's current Merkle root, and passed to the
/// intelligence engine.
///
/// # Errors
///
/// Returns `CPError::NotFound` when no intelligence engine is configured (the
/// search and context assembly have already run by then), and passes on any
/// search, graph, or generation error.
///
/// # Panics
///
/// Panics if the graph mutex is poisoned.
pub async fn generate_answer(&self, query: &str) -> Result<GenerationResult> {
    let started = std::time::Instant::now();
    info!("Generating answer for: '{}'", query);

    let hits = self.search(query, 5)?;
    let assembler = ContextAssembler::with_budget(self.context_budget);
    let mut scored_chunks: Vec<ScoredChunk> = Vec::with_capacity(hits.len());
    for hit in &hits {
        scored_chunks.push(ScoredChunk {
            chunk: hit.chunk.clone(),
            score: hit.score,
            document_path: hit.doc_path.clone(),
        });
    }

    // The guard lives only inside this block, so it is never held across an await.
    let state_root = {
        let graph = self.graph.lock().expect("graph lock poisoned");
        graph.compute_merkle_root()?
    };
    let assembled_context = assembler.assemble(scored_chunks, query, state_root);

    let Some(engine) = self.intelligence.as_ref() else {
        return Err(CPError::NotFound(
            "Intelligence engine not configured".into(),
        ));
    };
    let answer = engine.generate(&assembled_context, query).await?;

    Ok(GenerationResult {
        answer,
        context: ContextAssembler::format(&assembled_context),
        latency_ms: started.elapsed().as_millis() as u64,
    })
}
/// Builds a signed receipt tying a query and its search results to the current graph state.
///
/// The receipt holds BLAKE3 hashes of the query and of the formatted context, the
/// graph's Merkle root, the chunk-tree root with one proof per result found in the
/// sorted chunk list, a source entry for every result, and a signature from `identity`.
///
/// Returns an error if the Merkle root or the sorted chunk hashes cannot be read;
/// panics if the graph mutex is poisoned.
pub fn generate_proof_receipt(
&self,
query: &str,
search_results: &[SearchResult],
identity: &cp_sync::DeviceIdentity,
) -> Result<cp_core::ProofReceipt> {
let query_hash = *blake3::hash(query.as_bytes()).as_bytes();
// Uses twice the configured context budget.
let assembler = ContextAssembler::with_budget(self.context_budget * 2);
let scored_chunks: Vec<ScoredChunk> = search_results
.iter()
.map(|r| ScoredChunk {
chunk: r.chunk.clone(),
score: r.score,
document_path: r.doc_path.clone(),
})
.collect();
let state_root = {
let graph = self.graph.lock().expect("graph lock poisoned");
graph.compute_merkle_root()?
};
let assembled = assembler.assemble(scored_chunks, query, state_root);
let context_string = ContextAssembler::format(&assembled);
let context_hash = *blake3::hash(context_string.as_bytes()).as_bytes();
let (sorted_chunk_ids, sorted_chunk_hashes, chunk_tree_root) = {
let graph = self.graph.lock().expect("graph lock poisoned");
let sorted = graph.get_sorted_chunk_hashes()?;
let hashes: Vec<[u8; 32]> = sorted.iter().map(|(_, h)| *h).collect();
let root = cp_core::proof::compute_chunk_tree_root(&hashes);
(sorted, hashes, root)
};
let mut chunk_proofs = Vec::new();
let mut sources = Vec::new();
for result in search_results {
let chunk_id_bytes = *result.chunk.id.as_bytes();
// A result missing from the sorted list gets no proof but is still listed as a source.
if let Some(idx) = sorted_chunk_ids
.iter()
.position(|(id, _)| *id == chunk_id_bytes)
{
let proof = cp_core::proof::build_chunk_proof(
chunk_id_bytes,
result.chunk.text_hash,
idx,
&sorted_chunk_hashes,
);
chunk_proofs.push(proof);
}
sources.push(cp_core::SourceRef {
document_path: result.doc_path.clone(),
chunk_id: chunk_id_bytes,
chunk_text: result.chunk.text.clone(),
chunk_sequence: result.chunk.sequence,
relevance_score: result.score,
});
}
// A system clock set before the Unix epoch yields a zero duration.
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default();
let secs = now.as_secs();
let timestamp = format_unix_timestamp(secs);
// The receipt is built with a zeroed signature, which is then replaced by the
// signature over `signing_bytes()`.
let mut receipt = cp_core::ProofReceipt {
version: 1,
query: query.to_string(),
query_hash,
timestamp,
context_hash,
state_root,
chunk_tree_root,
chunk_proofs,
sources,
signature: [0u8; 64],
signer_public_key: identity.public_key,
device_id: identity.device_id,
};
let sig = identity.sign(&receipt.signing_bytes());
receipt.signature = sig;
Ok(receipt)
}
/// Answers `query` in the context of a prior conversation.
///
/// Retrieval uses only the latest `query` (top five hybrid hits). The prompt
/// sent to the intelligence engine is the history rendered as
/// `"<Role>: <content>"` lines, followed by `"User: <query>"`.
///
/// The context is assembled under the engine's configured context budget,
/// the same one `generate_answer` uses.
///
/// # Errors
///
/// Returns `CPError::NotFound` when no intelligence engine is configured, and
/// passes on any search, graph, or generation error.
///
/// # Panics
///
/// Panics if the graph mutex is poisoned.
pub async fn chat(&self, query: &str, history: &[Message]) -> Result<String> {
    let results = self.search(query, 5)?;
    let assembler = ContextAssembler::with_budget(self.context_budget);
    let scored_chunks: Vec<ScoredChunk> = results
        .iter()
        .map(|r| ScoredChunk {
            chunk: r.chunk.clone(),
            score: r.score,
            document_path: r.doc_path.clone(),
        })
        .collect();
    let state_root = {
        let graph = self.graph.lock().expect("graph lock poisoned");
        graph.compute_merkle_root()?
    };
    let assembled_context = assembler.assemble(scored_chunks, query, state_root);

    let mut full_prompt = String::new();
    for msg in history {
        let role = match msg.role {
            Role::User => "User",
            Role::Assistant => "Assistant",
            Role::System => "System",
        };
        // Writing into a String cannot fail.
        let _ = writeln!(full_prompt, "{}: {}", role, msg.content);
    }
    let _ = writeln!(full_prompt, "User: {query}");

    let Some(engine) = self.intelligence.as_ref() else {
        return Err(CPError::NotFound(
            "Intelligence engine not configured".into(),
        ));
    };
    engine.generate(&assembled_context, &full_prompt).await
}
}
/// One turn of a conversation passed to `QueryEngine::chat`.
#[derive(Debug, Clone)]
pub struct Message {
/// Who produced the message.
pub role: Role,
/// The message text.
pub content: String,
}
/// Speaker of a chat `Message`; rendered as "User", "Assistant", or "System" in prompts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Role {
User,
Assistant,
System,
}
/// A remote search result together with the peer it came from, as produced by
/// `verify_and_extract`.
pub struct VerifiedRemoteResult {
/// The result as received from the peer.
pub result: RemoteSearchResult,
/// Multiplier applied to this result's rank score in `merge_results`.
pub weight: f64,
/// Id of the peer that returned the result.
pub peer_node_id: [u8; 16],
/// State root copied from the peer's response.
pub peer_state_root: [u8; 32],
/// Signature copied from the peer's response.
pub peer_signature: [u8; 64],
}
/// Merges local and remote hits into one ranked list using reciprocal-rank fusion.
///
/// A local hit at zero-based rank `r` contributes `1 / (60 + r + 1)`. A
/// remote hit contributes `weight / (60 + r + 1)`, where `r` is its position
/// in `remote`. Contributions for the same chunk id are summed. A non-finite
/// remote contribution (for example from a NaN weight) counts as zero.
///
/// Provenance rules:
/// * a chunk seen only locally is `Local`; only remotely, `Remote`;
/// * a local chunk also returned by a peer becomes `Both`, carrying that
///   peer's id, state root, signature, and Merkle proof;
/// * later peers returning the same chunk add to its score but do not change
///   its provenance, so the peer id and signature always refer to the same peer.
///
/// The result is ordered by score descending, then chunk id ascending, so it
/// is deterministic. It is truncated to the smaller of `max_results` and
/// `MAX_RESULTS`.
pub fn merge_results(
    local: &[SearchResult],
    remote: &[VerifiedRemoteResult],
    max_results: usize,
) -> Vec<MergedSearchResult> {
    const K: f64 = 60.0;
    let cap = max_results.min(MAX_RESULTS as usize);
    let mut scores: HashMap<[u8; 16], (f64, MergedSearchResult)> = HashMap::new();

    for (rank, result) in local.iter().enumerate() {
        let rrf = 1.0 / (K + rank as f64 + 1.0);
        let chunk_id = *result.chunk.id.as_bytes();
        let entry = scores.entry(chunk_id).or_insert_with(|| {
            (
                0.0,
                MergedSearchResult {
                    chunk_id,
                    chunk_text: result.chunk.text.clone(),
                    document_path: result.doc_path.clone(),
                    score: 0.0,
                    source: ResultSource::Local,
                    merkle_proof: None,
                    peer_state_root: None,
                    peer_signature: None,
                },
            )
        });
        entry.0 += rrf;
    }

    for (rank, verified) in remote.iter().enumerate() {
        let raw = verified.weight / (K + rank as f64 + 1.0);
        // A NaN or infinite weight must not poison the score or the sort below.
        let rrf = if raw.is_finite() { raw } else { 0.0 };
        let chunk_id = verified.result.chunk_id;
        if let Some(entry) = scores.get_mut(&chunk_id) {
            entry.0 += rrf;
            // Only a locally-found chunk can become `Both`. Entries that are
            // already `Remote` or `Both` keep the first peer's provenance.
            if matches!(entry.1.source, ResultSource::Local) {
                entry.1.source = ResultSource::Both {
                    peer_node_id: verified.peer_node_id,
                };
                entry.1.peer_state_root = Some(verified.peer_state_root);
                entry.1.peer_signature = Some(verified.peer_signature);
                entry
                    .1
                    .merkle_proof
                    .clone_from(&verified.result.merkle_proof);
            }
        } else {
            scores.insert(
                chunk_id,
                (
                    rrf,
                    MergedSearchResult {
                        chunk_id,
                        chunk_text: verified.result.chunk_text.clone(),
                        document_path: verified.result.document_path.clone(),
                        score: 0.0,
                        source: ResultSource::Remote {
                            peer_node_id: verified.peer_node_id,
                        },
                        merkle_proof: verified.result.merkle_proof.clone(),
                        peer_state_root: Some(verified.peer_state_root),
                        peer_signature: Some(verified.peer_signature),
                    },
                ),
            );
        }
    }

    let mut merged: Vec<(f64, MergedSearchResult)> = scores.into_values().collect();
    // Break ties on chunk id so the order does not depend on HashMap iteration.
    merged.sort_by(|a, b| {
        b.0.total_cmp(&a.0)
            .then_with(|| a.1.chunk_id.cmp(&b.1.chunk_id))
    });
    merged.truncate(cap);
    merged
        .into_iter()
        .map(|(score, mut result)| {
            result.score = score;
            result
        })
        .collect()
}
/// Validates a peer's search response and tags each result with its origin.
///
/// Returns `None` when the response status is not `Ok` or its signature does
/// not verify against `peer_public_key` (a warning is logged in that case).
///
/// Every returned result gets the same weight, derived from `peer_rating`:
/// * `None` → `1.0`;
/// * a number → clamped to `[0.3, 1.0]`;
/// * NaN → `0.3`, the lowest trust level, because clamping would leave it NaN
///   and the weight feeds directly into ranking arithmetic.
pub fn verify_and_extract(
    response: &SearchResponse,
    peer_public_key: &[u8; 32],
    peer_node_id: [u8; 16],
    peer_rating: Option<f64>,
) -> Option<Vec<VerifiedRemoteResult>> {
    const MIN_WEIGHT: f64 = 0.3;
    const MAX_WEIGHT: f64 = 1.0;

    if response.status != SearchStatus::Ok {
        return None;
    }
    if cp_tor::verify_response(response, peer_public_key).is_err() {
        warn!(
            "Invalid response signature from peer {}",
            hex::encode(&peer_node_id[..4])
        );
        return None;
    }
    let weight = match peer_rating {
        None => MAX_WEIGHT,
        Some(rating) if rating.is_nan() => MIN_WEIGHT,
        Some(rating) => rating.clamp(MIN_WEIGHT, MAX_WEIGHT),
    };
    let results = response
        .results
        .iter()
        .map(|r| VerifiedRemoteResult {
            result: r.clone(),
            weight,
            peer_node_id,
            peer_state_root: response.peer_state_root,
            peer_signature: response.signature,
        })
        .collect();
    Some(results)
}
impl QueryEngine {
    /// Runs a local hybrid search and merges it with already-verified remote results.
    ///
    /// `max_results` is used both as the local search size and as the cap on
    /// the merged list.
    ///
    /// # Errors
    ///
    /// Passes on any error from the local search.
    pub fn live_search(
        &self,
        query: &str,
        remote_results: &[VerifiedRemoteResult],
        max_results: usize,
    ) -> Result<Vec<MergedSearchResult>> {
        let remote_count = remote_results.len();
        info!(
            "Live search for '{}' with {} remote result(s)",
            query, remote_count
        );

        let local_results = self.search(query, max_results)?;
        let local_count = local_results.len();
        let merged = merge_results(&local_results, remote_results, max_results);

        info!(
            "Live search returned {} merged results ({} local, {} remote)",
            merged.len(),
            local_count,
            remote_count
        );
        Ok(merged)
    }
}
/// Converts free-form user text into a safe FTS5 phrase query.
///
/// FTS5 operator characters (`" * ^ + - ( ) { } :`) are replaced with spaces,
/// runs of whitespace are collapsed, and the result is wrapped in double
/// quotes so it is parsed as a single phrase. An input with nothing left after
/// cleaning yields an empty string.
///
/// The characters are replaced rather than deleted so that adjacent words are
/// not fused: `TF-IDF` becomes `"TF IDF"`, not `"TFIDF"`. This presumably
/// matches how the index tokenizer splits such text — confirm against the
/// tokenizer configured in the graph store.
fn sanitize_fts5_query(query: &str) -> String {
    let spaced: String = query
        .chars()
        .map(|c| {
            if matches!(c, '"' | '*' | '^' | '+' | '-' | '(' | ')' | '{' | '}' | ':') {
                ' '
            } else {
                c
            }
        })
        .collect();
    let words: Vec<&str> = spaced.split_whitespace().collect();
    if words.is_empty() {
        String::new()
    } else {
        format!("\"{}\"", words.join(" "))
    }
}
/// Formats seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ`.
fn format_unix_timestamp(secs: u64) -> String {
    const SECS_PER_DAY: u64 = 86400;

    let (year, month, day) = days_to_date(secs / SECS_PER_DAY);

    let second_of_day = secs % SECS_PER_DAY;
    let hours = second_of_day / 3600;
    let minutes = second_of_day % 3600 / 60;
    let seconds = second_of_day % 60;

    format!("{year:04}-{month:02}-{day:02}T{hours:02}:{minutes:02}:{seconds:02}Z")
}
/// Converts a count of days since 1970-01-01 into a `(year, month, day)` tuple.
///
/// Internally the year is treated as starting on 1 March, which puts the leap
/// day at the end of the year. January and February are moved into the next
/// calendar year at the end.
fn days_to_date(days: u64) -> (u64, u64, u64) {
    const DAYS_PER_ERA: u64 = 146097; // days in one 400-year cycle

    // Move the origin from 1970-01-01 to 0000-03-01.
    let shifted = days + 719468;
    let era = shifted / DAYS_PER_ERA;
    let day_of_era = shifted % DAYS_PER_ERA;

    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36524 - day_of_era / 146096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index counted from March (0 = March, ..., 11 = February).
    let march_based_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * march_based_month + 2) / 5 + 1;
    let month = if march_based_month < 10 {
        march_based_month + 3
    } else {
        march_based_month - 9
    };

    let march_based_year = year_of_era + era * 400;
    let year = if month <= 2 {
        march_based_year + 1
    } else {
        march_based_year
    };
    (year, month, day)
}
#[cfg(test)]
mod tests {
use super::*;
use cp_core::{Chunk, Document};
use std::sync::{Arc, Mutex};
use tempfile::TempDir;
// Inserts one document with one chunk and checks that
// `get_chunks_for_document` returns exactly that chunk.
#[tokio::test]
async fn test_get_chunks_for_document() {
let temp = TempDir::new().unwrap();
let db_path = temp.path().join("test.db");
let mut graph = cp_graph::GraphStore::open(db_path.to_str().unwrap()).unwrap();
let doc = Document::new("test.md".into(), b"Hello world", 100);
graph.insert_document(&doc).unwrap();
let chunk = Chunk {
id: Uuid::new_v4(),
doc_id: doc.id,
text: "Hello world".to_string(),
byte_offset: 0,
byte_length: 11,
sequence: 0,
text_hash: [0; 32],
};
graph.insert_chunk(&chunk).unwrap();
let embedder = Arc::new(cp_embeddings::EmbeddingEngine::new().unwrap());
let qe = QueryEngine::new(Arc::new(Mutex::new(graph)), embedder);
let results = qe.get_chunks_for_document(doc.id).unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].chunk.text, "Hello world");
}
// Indexes a single embedded chunk and checks that hybrid search finds it both for a
// query that shares words with the chunk and for one that shares none.
#[tokio::test]
async fn test_hybrid_search() {
let temp = TempDir::new().unwrap();
let db_path = temp.path().join("test_hybrid.db");
let mut graph = cp_graph::GraphStore::open(db_path.to_str().unwrap()).unwrap();
let doc = Document::new(
"test.md".into(),
b"The quick brown fox jumps over the lazy dog",
100,
);
graph.insert_document(&doc).unwrap();
let chunk = Chunk {
id: Uuid::new_v4(),
doc_id: doc.id,
text: "The quick brown fox jumps over the lazy dog".to_string(),
byte_offset: 0,
byte_length: 43,
sequence: 0,
text_hash: [0; 32],
};
graph.insert_chunk(&chunk).unwrap();
let embedder = Arc::new(cp_embeddings::EmbeddingEngine::new().unwrap());
let vec = embedder.embed(&chunk.text).unwrap();
let emb = cp_core::Embedding::new(chunk.id, &vec, embedder.model_hash().unwrap(), 0);
graph.insert_embedding(&emb).unwrap();
let qe = QueryEngine::new(Arc::new(Mutex::new(graph)), embedder);
// Query words appear verbatim in the chunk.
let results = qe.search("quick brown fox", 5).unwrap();
assert!(!results.is_empty());
assert!(results[0].chunk.text.contains("quick brown fox"));
// No query word appears in the chunk text.
let results_sem = qe.search("fast auburn canine", 5).unwrap();
assert!(!results_sem.is_empty());
assert!(results_sem[0].chunk.text.contains("quick brown fox"));
}
// Compares lexical, vector, and hybrid search over three documents for the query "fox":
// lexical search must return exactly one hit, vector search at least two, and the hybrid
// results must include both the fox text and its paraphrase.
#[tokio::test]
async fn test_search_comparison_proof() {
let temp = TempDir::new().unwrap();
let db_path = temp.path().join("comparison_proof.db");
let mut graph = cp_graph::GraphStore::open(db_path.to_str().unwrap()).unwrap();
let t1 = "The quick brown fox jumps over the lazy dog"; let t2 = "Artificial intelligence is transforming the modern world"; let t3 = "A fast auburn canine leaps across an idle hound";
let texts = [t1, t2, t3];
for (i, text) in texts.iter().enumerate() {
let doc = Document::new(format!("doc_{i}.md").into(), text.as_bytes(), 100);
graph.insert_document(&doc).unwrap();
let chunk = Chunk {
id: Uuid::new_v4(),
doc_id: doc.id,
text: text.to_string(),
byte_offset: 0,
byte_length: text.len() as u64,
sequence: 0,
text_hash: [0; 32],
};
graph.insert_chunk(&chunk).unwrap();
// NOTE(review): a new embedding engine is built on every iteration; it could be created once.
let embedder = Arc::new(cp_embeddings::EmbeddingEngine::new().unwrap());
let vec = embedder.embed(text).unwrap();
let emb = cp_core::Embedding::new(chunk.id, &vec, embedder.model_hash().unwrap(), 0);
graph.insert_embedding(&emb).unwrap();
}
let embedder = Arc::new(cp_embeddings::EmbeddingEngine::new().unwrap());
let qe = QueryEngine::new(Arc::new(Mutex::new(graph)), embedder);
let query = "fox";
// Lexical search is called on the graph store directly, bypassing query sanitizing.
let lexical = {
let graph_lock = qe.graph();
let g = graph_lock.lock().expect("graph lock poisoned");
g.search_lexical(query, 5).unwrap()
};
assert_eq!(lexical.len(), 1);
let vector = {
let graph_lock = qe.graph();
let g = graph_lock.lock().expect("graph lock poisoned");
let e = cp_embeddings::EmbeddingEngine::new().unwrap();
let q_vec = e.embed_query(query).unwrap();
g.search(&q_vec, 5).unwrap()
};
assert!(vector.len() >= 2);
let hybrid = qe.search(query, 5).unwrap();
println!("\n--- SEARCH PROOF FOR '{query}' ---");
println!("LEXICAL HITS: {}", lexical.len());
println!("VECTOR HITS: {}", vector.len());
println!("HYBRID HITS: {}", hybrid.len());
let texts_found: Vec<String> = hybrid.iter().map(|r| r.chunk.text.clone()).collect();
assert!(texts_found.contains(&t1.to_string())); assert!(texts_found.contains(&t3.to_string())); }
// A `MimeType("text/markdown")` filter must accept the `.md` document and reject the `.pdf` one.
#[test]
fn test_filter_by_mime_type() {
use cp_core::Document;
use std::path::PathBuf;
let doc_md = Document::new(PathBuf::from("test.md"), b"content", 1000);
let doc_pdf = Document::new(PathBuf::from("test.pdf"), b"pdf content", 1000);
let filter = Filter::MimeType("text/markdown".to_string());
assert!(filter.matches(&doc_md));
assert!(!filter.matches(&doc_pdf));
}
// A `DocumentPath` glob must match paths that fit the pattern and reject the others.
#[test]
fn test_filter_by_path_glob() {
use cp_core::Document;
use std::path::PathBuf;
let doc1 = Document::new(PathBuf::from("docs/readme.md"), b"content", 1000);
let doc2 = Document::new(PathBuf::from("src/main.rs"), b"code", 1000);
let filter = Filter::DocumentPath("docs/*.md".to_string());
assert!(filter.matches(&doc1));
assert!(!filter.matches(&doc2));
}
// `ModifiedAfter` / `ModifiedBefore` must split documents around the given bound.
#[test]
fn test_filter_by_modified_time() {
use cp_core::Document;
use std::path::PathBuf;
let old_doc = Document::new(PathBuf::from("old.md"), b"content", 1000);
let new_doc = Document::new(PathBuf::from("new.md"), b"content", 2000);
let filter_after = Filter::ModifiedAfter(1500);
let filter_before = Filter::ModifiedBefore(1500);
assert!(!filter_after.matches(&old_doc));
assert!(filter_after.matches(&new_doc));
assert!(filter_before.matches(&old_doc));
assert!(!filter_before.matches(&new_doc));
}
// A response that repeats a long run of words from the context chunk must yield at least
// one citation with positive confidence.
#[test]
fn test_citation_extraction() {
use cp_core::{ContextChunk, ContextMetadata};
let context = AssembledContext {
chunks: vec![ContextChunk {
chunk_id: Uuid::new_v4(),
document_path: "test.md".to_string(),
text: "The quick brown fox jumps over the lazy dog".to_string(),
score: 1.0,
sequence: 0,
}],
total_tokens: 10,
truncated: false,
metadata: ContextMetadata {
query_hash: [0u8; 32],
state_root: [0u8; 32],
},
};
let response = "As mentioned, the quick brown fox jumps over the lazy dog in the story.";
let citations = extract_citations(response, &context);
assert!(
!citations.is_empty(),
"Should find citations for overlapping text"
);
assert!(citations[0].confidence > 0.0);
}
// A response containing hallucination phrases must produce warnings, while a response that
// admits the information is missing must still be considered valid.
#[test]
fn test_hallucination_detection() {
use cp_core::{ContextChunk, ContextMetadata};
let context = AssembledContext {
chunks: vec![ContextChunk {
chunk_id: Uuid::new_v4(),
document_path: "test.md".to_string(),
text: "The capital of France is Paris".to_string(),
score: 1.0,
sequence: 0,
}],
total_tokens: 10,
truncated: false,
metadata: ContextMetadata {
query_hash: [0u8; 32],
state_root: [0u8; 32],
},
};
let bad_response = "From my knowledge, I believe that Paris is a beautiful city.";
let result = validate_response(bad_response, &context);
assert!(
!result.warnings.is_empty(),
"Should detect hallucination phrases"
);
assert!(result.warnings.iter().any(|w| w.contains("hallucination")));
let good_response = "Information is missing from the substrate.";
let result2 = validate_response(good_response, &context);
assert!(
result2.is_valid,
"Should be valid when admitting missing info"
);
}
// Ingests every `.md` file from a corpus directory as one chunk per document and checks that
// three topical queries each rank the expected document first.
//
// The directory comes from the `CANON_TEST_CORPUS` environment variable, falling back to
// `../../test_corpus` relative to the crate manifest. The test returns early, without failing,
// when that directory does not exist.
#[tokio::test]
async fn test_real_corpus_proof() {
let temp = TempDir::new().unwrap();
let db_path = temp.path().join("real_corpus.db");
let mut graph = cp_graph::GraphStore::open(db_path.to_str().unwrap()).unwrap();
let corpus_dir = match std::env::var("CANON_TEST_CORPUS") {
Ok(p) => std::path::PathBuf::from(p),
Err(_) => {
std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../test_corpus")
}
};
if !corpus_dir.exists() {
println!("Skipping test: corpus directory not found at {corpus_dir:?}");
return;
}
let embedder = Arc::new(cp_embeddings::EmbeddingEngine::new().unwrap());
let mut ingested = 0u32;
for entry in std::fs::read_dir(&corpus_dir).unwrap() {
let entry = entry.unwrap();
let path = entry.path();
// Only Markdown files are ingested.
if path.extension().and_then(|e| e.to_str()) != Some("md") {
continue;
}
let content = std::fs::read_to_string(&path).unwrap();
let doc = Document::new(path.clone(), content.as_bytes(), 0);
graph.insert_document(&doc).unwrap();
// The whole file becomes a single chunk.
let chunk = Chunk {
id: Uuid::new_v4(),
doc_id: doc.id,
text: content.clone(),
byte_offset: 0,
byte_length: content.len() as u64,
sequence: 0,
text_hash: [0; 32],
};
graph.insert_chunk(&chunk).unwrap();
let vec = embedder.embed(&content).unwrap();
let emb = cp_core::Embedding::new(chunk.id, &vec, embedder.model_hash().unwrap(), 0);
graph.insert_embedding(&emb).unwrap();
ingested += 1;
}
assert!(
ingested >= 2,
"Need at least 2 corpus files, found {ingested}"
);
let qe = QueryEngine::new(Arc::new(Mutex::new(graph)), embedder);
let q1 = "public key encryption and digital signatures";
let res1 = qe.search(q1, 3).unwrap();
println!("\n--- QUERY: '{q1}' ---");
for (i, r) in res1.iter().enumerate() {
println!("Rank {}: [Score: {:.4}] {}", i + 1, r.score, r.doc_path);
}
assert!(!res1.is_empty(), "Should return results");
assert!(
res1[0].doc_path.contains("cryptography"),
"Top result should be cryptography.md, got: {}",
res1[0].doc_path
);
let q2 = "qubit superposition entanglement error correction";
let res2 = qe.search(q2, 3).unwrap();
println!("\n--- QUERY: '{q2}' ---");
for (i, r) in res2.iter().enumerate() {
println!("Rank {}: [Score: {:.4}] {}", i + 1, r.score, r.doc_path);
}
assert!(!res2.is_empty(), "Should return results");
assert!(
res2[0].doc_path.contains("quantum"),
"Top result should be quantum_computing.md, got: {}",
res2[0].doc_path
);
let q3 = "BM25 inverted index TF-IDF";
let res3 = qe.search(q3, 5).unwrap();
println!("\n--- QUERY: '{q3}' ---");
for (i, r) in res3.iter().enumerate() {
println!("Rank {}: [Score: {:.4}] {}", i + 1, r.score, r.doc_path);
}
assert!(!res3.is_empty(), "Should return results");
assert!(
res3[0].doc_path.contains("information_retrieval"),
"Top result should be information_retrieval.md, got: {}",
res3[0].doc_path
);
}
/// Exercises the semantic-only and lexical-only search entry points against a
/// store that holds a single document with one chunk.
#[tokio::test]
async fn test_search_modes() {
    let sentence = "The quick brown fox jumps over the lazy dog";

    let tmp_dir = TempDir::new().unwrap();
    let store_path = tmp_dir.path().join("modes.db");
    let mut store = cp_graph::GraphStore::open(store_path.to_str().unwrap()).unwrap();

    let document = Document::new("test.md".into(), sentence.as_bytes(), 100);
    store.insert_document(&document).unwrap();

    // The single chunk spans the whole sentence (43 bytes).
    let only_chunk = Chunk {
        id: Uuid::new_v4(),
        doc_id: document.id,
        text: sentence.to_string(),
        byte_offset: 0,
        byte_length: sentence.len() as u64,
        sequence: 0,
        text_hash: [0; 32],
    };
    store.insert_chunk(&only_chunk).unwrap();

    let engine = Arc::new(cp_embeddings::EmbeddingEngine::new().unwrap());
    let vector = engine.embed(&only_chunk.text).unwrap();
    let embedding =
        cp_core::Embedding::new(only_chunk.id, &vector, engine.model_hash().unwrap(), 0);
    store.insert_embedding(&embedding).unwrap();

    let qe = QueryEngine::new(Arc::new(Mutex::new(store)), engine);

    // Semantic mode: the query word does not occur verbatim in the chunk.
    let semantic_hits = qe.search_semantic("canine", 5).unwrap();
    assert!(!semantic_hits.is_empty());

    // Lexical mode: the query word occurs verbatim in the chunk.
    let lexical_hits = qe.search_lexical("fox", 5).unwrap();
    assert!(!lexical_hits.is_empty());
    assert!(lexical_hits[0].chunk.text.contains("fox"));
}
/// Builds a local `SearchResult` fixture.
///
/// The chunk id is `chunk_id_byte` repeated sixteen times, the document id is
/// all zeros, and the chunk spans the whole of `text` starting at offset 0.
fn make_local_result(
    chunk_id_byte: u8,
    text: &str,
    doc_path: &str,
    score: f32,
) -> SearchResult {
    let chunk = Chunk {
        id: Uuid::from_bytes([chunk_id_byte; 16]),
        doc_id: Uuid::from_bytes([0u8; 16]),
        text: text.to_owned(),
        byte_offset: 0,
        byte_length: text.len() as u64,
        sequence: 0,
        text_hash: [0u8; 32],
    };
    SearchResult {
        chunk,
        score,
        doc_path: doc_path.to_owned(),
    }
}
/// Builds a `RemoteSearchResult` fixture without a Merkle proof.
///
/// The chunk id is `chunk_id_byte` repeated sixteen times; `score` is stored
/// unchanged.
fn make_remote_result(
    chunk_id_byte: u8,
    text: &str,
    doc_path: &str,
    score: u32,
) -> RemoteSearchResult {
    let chunk_id = [chunk_id_byte; 16];
    RemoteSearchResult {
        chunk_id,
        chunk_text: String::from(text),
        document_path: String::from(doc_path),
        score,
        merkle_proof: None,
    }
}
/// Wraps `result` in a `VerifiedRemoteResult` fixture.
///
/// The peer node id is `peer_byte` repeated sixteen times; the peer state root
/// and signature are zero-filled placeholders.
fn make_verified(
    result: RemoteSearchResult,
    weight: f64,
    peer_byte: u8,
) -> VerifiedRemoteResult {
    let peer_node_id = [peer_byte; 16];
    VerifiedRemoteResult {
        result,
        weight,
        peer_node_id,
        peer_state_root: [0u8; 32],
        peer_signature: [0u8; 64],
    }
}
/// With no remote input, both local hits survive the merge in score order and
/// are tagged as local.
#[test]
fn test_merge_local_only() {
    let local_hits = [
        make_local_result(1, "chunk A", "doc_a.md", 0.9),
        make_local_result(2, "chunk B", "doc_b.md", 0.8),
    ];

    let merged = merge_results(&local_hits, &[], 10);
    assert_eq!(merged.len(), 2);

    let (best, runner_up) = (&merged[0], &merged[1]);
    assert!(best.score > runner_up.score);
    assert!(matches!(best.source, ResultSource::Local));
    assert_eq!(best.chunk_text, "chunk A");
}
/// With no local input, a single verified remote hit is returned tagged as
/// remote and carries the peer's state root.
#[test]
fn test_merge_remote_only() {
    let remote_hit = make_remote_result(3, "remote chunk", "remote.md", 16000);
    let verified = [make_verified(remote_hit, 0.8, 99)];

    let merged = merge_results(&[], &verified, 10);
    assert_eq!(merged.len(), 1);

    let only = &merged[0];
    assert!(matches!(only.source, ResultSource::Remote { .. }));
    assert_eq!(only.chunk_text, "remote chunk");
    assert!(only.peer_state_root.is_some());
}
/// A chunk id reported both locally and remotely collapses into one entry
/// tagged `Both`, scoring higher than the local hit on its own.
#[test]
fn test_merge_deduplication() {
    // Local and remote share chunk id byte 1.
    let local_hits = [make_local_result(1, "shared chunk local", "doc.md", 0.9)];
    let remote_hits = [make_verified(
        make_remote_result(1, "shared chunk remote", "doc.md", 16000),
        0.7,
        50,
    )];

    let combined = merge_results(&local_hits, &remote_hits, 10);
    assert_eq!(combined.len(), 1, "Duplicate chunk should be merged");
    assert!(matches!(combined[0].source, ResultSource::Both { .. }));

    // Baseline: the same local hit merged without any remote contribution.
    let baseline = merge_results(&local_hits, &[], 10);
    let (combined_score, baseline_score) = (combined[0].score, baseline[0].score);
    assert!(
        combined_score > baseline_score,
        "Merged score ({}) should exceed local-only score ({})",
        combined_score,
        baseline_score
    );
}
/// Two remote hits with the same raw score but different peer weights must
/// merge to different scores, the higher weight winning.
#[test]
fn test_merge_weight_affects_score() {
    let heavy = [make_verified(
        make_remote_result(1, "high weight", "doc.md", 16000),
        1.0,
        10,
    )];
    let light = [make_verified(
        make_remote_result(1, "low weight", "doc.md", 16000),
        0.3,
        20,
    )];

    let from_heavy = merge_results(&[], &heavy, 10);
    let from_light = merge_results(&[], &light, 10);

    assert!(
        from_heavy[0].score > from_light[0].score,
        "Higher weight ({}) should produce higher score ({}) vs ({})",
        1.0,
        from_heavy[0].score,
        from_light[0].score,
    );
}
/// Thirty distinct candidates (15 local, 15 remote) merged with a limit of 10
/// must yield exactly 10 results.
#[test]
fn test_merge_respects_max_results() {
    // Local chunk ids 0..15 with slowly decreasing scores.
    let mut local: Vec<SearchResult> = Vec::with_capacity(15);
    for i in 0u8..15 {
        let text = format!("chunk {i}");
        let score = 0.9 - f32::from(i) * 0.01;
        local.push(make_local_result(i, &text, "doc.md", score));
    }

    // Remote chunk ids 15..30, so nothing overlaps with the local set.
    let mut remote: Vec<VerifiedRemoteResult> = Vec::with_capacity(15);
    for i in 15u8..30 {
        let text = format!("remote {i}");
        let hit = make_remote_result(i, &text, "remote.md", 10000);
        remote.push(make_verified(hit, 0.5, 99));
    }

    let merged = merge_results(&local, &remote, 10);
    assert_eq!(merged.len(), 10, "Should cap at max_results");
}
/// Even when the caller asks for 100 results, 25 local candidates are cut down
/// to 20, the hard cap the assertion message attributes to `MAX_RESULTS`.
#[test]
fn test_merge_cap_at_20() {
    let mut local: Vec<SearchResult> = Vec::with_capacity(25);
    for i in 0u8..25 {
        local.push(make_local_result(i, &format!("c{i}"), "d.md", 0.5));
    }

    let merged = merge_results(&local, &[], 100);
    assert_eq!(merged.len(), 20, "Should cap at MAX_RESULTS (20)");
}
/// The merged list of local and remote hits must be ordered by non-increasing
/// score.
#[test]
fn test_merge_scores_decrease() {
    let local_hits = [
        make_local_result(1, "first", "a.md", 0.9),
        make_local_result(2, "second", "b.md", 0.8),
        make_local_result(3, "third", "c.md", 0.7),
    ];
    let remote_hits = [
        make_verified(make_remote_result(4, "r1", "d.md", 16000), 0.8, 10),
        make_verified(make_remote_result(5, "r2", "e.md", 15000), 0.8, 10),
    ];

    let merged = merge_results(&local_hits, &remote_hits, 20);

    // Compare every entry with its immediate successor.
    for (earlier, later) in merged.iter().zip(merged.iter().skip(1)) {
        assert!(
            earlier.score >= later.score,
            "Scores should be in descending order: {} >= {}",
            earlier.score,
            later.score
        );
    }
}
/// Merging two empty inputs produces an empty result list.
#[test]
fn test_merge_empty_inputs() {
    assert!(merge_results(&[], &[], 10).is_empty());
}
/// A correctly signed `Ok` response yields its single result, weighted with
/// the supplied peer rating of 0.9.
#[test]
fn test_verify_and_extract_valid() {
    let key = ed25519_dalek::SigningKey::from_bytes(&[88u8; 32]);
    let verifying = key.verifying_key().to_bytes();

    let hit = RemoteSearchResult {
        chunk_id: [2u8; 16],
        chunk_text: "test chunk".to_string(),
        document_path: "test.md".to_string(),
        score: 16000,
        merkle_proof: None,
    };
    let mut response = SearchResponse {
        request_id: [1u8; 16],
        status: SearchStatus::Ok,
        results: vec![hit],
        peer_state_root: [3u8; 32],
        search_latency_ms: 50,
        timestamp: 1000,
        // Placeholder; replaced by the real signature below.
        signature: [0u8; 64],
    };
    let payload = response.signing_bytes();
    response.signature = ed25519_dalek::Signer::sign(&key, &payload).to_bytes();

    let extracted = verify_and_extract(&response, &verifying, [10u8; 16], Some(0.9));
    assert!(extracted.is_some());

    let verified = extracted.unwrap();
    assert_eq!(verified.len(), 1);
    assert!((verified[0].weight - 0.9).abs() < 0.001);
}
/// An all-zero signature checked against an all-zero public key must be
/// rejected.
#[test]
fn test_verify_and_extract_bad_signature() {
    let unsigned = SearchResponse {
        request_id: [1u8; 16],
        status: SearchStatus::Ok,
        results: Vec::new(),
        peer_state_root: [0u8; 32],
        search_latency_ms: 0,
        timestamp: 1000,
        signature: [0u8; 64],
    };
    let bogus_key = [0u8; 32];

    let outcome = verify_and_extract(&unsigned, &bogus_key, [10u8; 16], None);
    assert!(outcome.is_none());
}
/// A response whose status is `ModelMismatch` rather than `Ok` yields `None`.
#[test]
fn test_verify_and_extract_non_ok_status() {
    let mismatch = SearchResponse {
        request_id: [1u8; 16],
        status: SearchStatus::ModelMismatch,
        results: Vec::new(),
        peer_state_root: [0u8; 32],
        search_latency_ms: 0,
        timestamp: 1000,
        signature: [0u8; 64],
    };
    let public_key = [0u8; 32];

    let outcome = verify_and_extract(&mismatch, &public_key, [10u8; 16], None);
    assert!(outcome.is_none(), "Non-Ok status should return None");
}
/// When no rating is supplied for the peer (`None`), the extracted result
/// carries a weight of 0.5.
#[test]
fn test_verify_and_extract_unrated_peer() {
    let key = ed25519_dalek::SigningKey::from_bytes(&[77u8; 32]);
    let verifying = key.verifying_key().to_bytes();

    let hit = RemoteSearchResult {
        chunk_id: [5u8; 16],
        chunk_text: "unrated".to_string(),
        document_path: "doc.md".to_string(),
        score: 14000,
        merkle_proof: None,
    };
    let mut response = SearchResponse {
        request_id: [1u8; 16],
        status: SearchStatus::Ok,
        results: vec![hit],
        peer_state_root: [0u8; 32],
        search_latency_ms: 10,
        timestamp: 2000,
        // Placeholder; replaced by the real signature below.
        signature: [0u8; 64],
    };
    let payload = response.signing_bytes();
    response.signature = ed25519_dalek::Signer::sign(&key, &payload).to_bytes();

    let outcome = verify_and_extract(&response, &verifying, [20u8; 16], None);
    assert!(outcome.is_some());
    let weight = outcome.unwrap()[0].weight;
    assert!((weight - 0.5).abs() < 0.001);
}
/// Peer ratings outside the accepted range are clamped: 0.01 comes back as 0.3
/// and 5.0 comes back as 1.0.
#[test]
fn test_verify_and_extract_clamps_weight() {
    let key = ed25519_dalek::SigningKey::from_bytes(&[77u8; 32]);
    let verifying = key.verifying_key().to_bytes();

    let hit = RemoteSearchResult {
        chunk_id: [5u8; 16],
        chunk_text: "clamped".to_string(),
        document_path: "doc.md".to_string(),
        score: 14000,
        merkle_proof: None,
    };
    let mut response = SearchResponse {
        request_id: [1u8; 16],
        status: SearchStatus::Ok,
        results: vec![hit],
        peer_state_root: [0u8; 32],
        search_latency_ms: 10,
        timestamp: 2000,
        // Placeholder; replaced by the real signature below.
        signature: [0u8; 64],
    };
    let payload = response.signing_bytes();
    response.signature = ed25519_dalek::Signer::sign(&key, &payload).to_bytes();

    // Rating below the range: expect the lower bound.
    let too_low = verify_and_extract(&response, &verifying, [20u8; 16], Some(0.01));
    assert!((too_low.unwrap()[0].weight - 0.3).abs() < 0.001);

    // Rating above the range: expect the upper bound. The response is cloned
    // and signed again before this second check.
    let mut resigned = response.clone();
    let payload_again = resigned.signing_bytes();
    resigned.signature = ed25519_dalek::Signer::sign(&key, &payload_again).to_bytes();
    let too_high = verify_and_extract(&resigned, &verifying, [20u8; 16], Some(5.0));
    assert!((too_high.unwrap()[0].weight - 1.0).abs() < 0.001);
}
}