Skip to main content

cp_validator/
evaluation.rs

1//! Evaluation pipeline for content contributors and live peers.
2//!
3//! Per CP-015 section 5: Validators download content from Arweave, index it
4//! locally in an isolated substrate, run test queries, and measure retrieval
5//! quality using precision@10, NDCG@10, and MRR.
6//!
7//! Per CP-015 section 13: Validators test live peers by connecting over Tor,
8//! sending test queries, and measuring search quality and latency.
9
10use crate::corpus::TestQuery;
11use crate::rating::Rating;
12use crate::window::ValidationWindow;
13use cp_arweave::ArweaveClient;
14use cp_graph::GraphStore;
15use cp_tor::types::{PeerRegistration, SearchResponse, SearchStatus};
16use ed25519_dalek::{Signer, SigningKey};
17use serde::{Deserialize, Serialize};
18use serde_big_array::BigArray;
19use std::collections::HashMap;
20use std::sync::Arc;
21use std::time::Instant;
22use tracing::{debug, info, warn};
23use uuid::Uuid;
24
/// Quality metrics for a contributor's content, measured against ground truth.
///
/// Per CP-015 section 1. `composite_score` folds these fields into a single
/// ranking score.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityMetrics {
    /// Fraction of top-10 results that are relevant.
    pub precision_at_10: f32,
    /// Normalized discounted cumulative gain at 10.
    pub ndcg_at_10: f32,
    /// Mean reciprocal rank of first relevant result.
    pub mrr: f32,
    /// Number of valid Merkle proofs in content.
    ///
    /// The evaluators in this file count one per document whose stored
    /// `hierarchical_hash` equals the hash recomputed from its chunks.
    pub merkle_proofs_valid: u32,
    /// Number of invalid or missing proofs.
    ///
    /// The evaluators in this file count one per document with an all-zero
    /// hash, with no chunks, or with a hash mismatch.
    pub merkle_proofs_invalid: u32,
    /// Adapter perplexity delta (negative means improvement).
    ///
    /// A positive value triggers the 0.8 penalty in `composite_score`. The
    /// evaluators in this file always set `None`.
    pub adapter_perplexity_delta: Option<f32>,
}
43
/// Result of evaluating a single contributor's content.
///
/// Per CP-015 section 10.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EvaluationResult {
    /// Ed25519 public key of the contributor.
    ///
    /// `evaluate_local_graph` fills this with all zeros because no
    /// contributor is involved in a local evaluation.
    pub contributor_key: [u8; 32],
    /// Node ID of the validator that performed this evaluation.
    ///
    /// Not included in the payload hashed by `signing_bytes`.
    pub validator_node_id: [u8; 16],
    /// Window ID when this evaluation was performed.
    pub window_id: u64,
    /// Aggregated quality metrics.
    pub metrics: QualityMetrics,
    /// Timestamp of the evaluation (Unix milliseconds).
    pub timestamp: i64,
    /// Ed25519 signature over the 32-byte digest returned by `signing_bytes`,
    /// which covers `contributor_key`, `window_id`, `metrics` and `timestamp`.
    #[serde(with = "BigArray")]
    pub signature: [u8; 64],
}
63
64impl EvaluationResult {
65    /// Compute the bytes to sign: `BLAKE3(CBOR(contributor_key` || `window_id` || metrics)).
66    pub fn signing_bytes(&self) -> [u8; 32] {
67        let signable = EvaluationResultSignable {
68            contributor_key: &self.contributor_key,
69            window_id: self.window_id,
70            metrics: &self.metrics,
71            timestamp: self.timestamp,
72        };
73        let mut buf = Vec::new();
74        ciborium::into_writer(&signable, &mut buf).expect("CBOR serialization cannot fail");
75        *blake3::hash(&buf).as_bytes()
76    }
77}
78
/// Borrowed view of the `EvaluationResult` fields that are hashed by
/// `EvaluationResult::signing_bytes`.
///
/// NOTE(review): the serialized form feeds the signature digest, so changing
/// field names, types or order presumably invalidates existing signatures —
/// confirm before editing.
#[derive(Serialize)]
struct EvaluationResultSignable<'a> {
    contributor_key: &'a [u8; 32],
    window_id: u64,
    metrics: &'a QualityMetrics,
    timestamp: i64,
}
86
/// Quality metrics for a live peer, including latency and availability.
///
/// Per CP-015 section 13. `peer_composite_score` folds these fields into a
/// single ranking score.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerQualityMetrics {
    /// Fraction of top-10 results that are relevant.
    pub precision_at_10: f32,
    /// Normalized discounted cumulative gain at 10.
    pub ndcg_at_10: f32,
    /// Mean reciprocal rank of first relevant result.
    pub mrr: f32,
    /// Average response latency in milliseconds.
    ///
    /// `evaluate_peer` averages over successful queries only and saturates
    /// the value at `u16::MAX`.
    pub response_latency_ms: u16,
    /// Fraction of results with valid Merkle proofs.
    ///
    /// `evaluate_peer` only counts results that carry a proof; it reports
    /// `0.0` when no result carried one.
    pub proof_validity_rate: f32,
    /// Fraction of test windows where peer was reachable.
    ///
    /// `evaluate_peer` sets this to `1.0` if at least one query succeeded
    /// and `0.0` otherwise.
    pub availability_rate: f32,
}
105
/// Result of testing a live peer.
///
/// Per CP-015 section 13.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerTestResult {
    /// Node ID of the tested peer.
    pub peer_node_id: [u8; 16],
    /// Onion address of the tested peer.
    pub peer_onion: String,
    /// Node ID of the validator.
    ///
    /// Not included in the payload hashed by `signing_bytes`.
    pub validator_node_id: [u8; 16],
    /// Window ID when this test was performed.
    pub window_id: u64,
    /// Quality metrics.
    pub metrics: PeerQualityMetrics,
    /// Timestamp (Unix milliseconds).
    pub timestamp: i64,
    /// Ed25519 signature over the 32-byte digest returned by `signing_bytes`,
    /// which covers `peer_node_id`, `peer_onion`, `window_id`, `metrics` and
    /// `timestamp`.
    #[serde(with = "BigArray")]
    pub signature: [u8; 64],
}
127
128impl PeerTestResult {
129    /// Compute the bytes to sign.
130    pub fn signing_bytes(&self) -> [u8; 32] {
131        let signable = PeerTestResultSignable {
132            peer_node_id: &self.peer_node_id,
133            peer_onion: &self.peer_onion,
134            window_id: self.window_id,
135            metrics: &self.metrics,
136            timestamp: self.timestamp,
137        };
138        let mut buf = Vec::new();
139        ciborium::into_writer(&signable, &mut buf).expect("CBOR serialization cannot fail");
140        *blake3::hash(&buf).as_bytes()
141    }
142}
143
/// Borrowed view of the `PeerTestResult` fields that are hashed by
/// `PeerTestResult::signing_bytes`.
///
/// NOTE(review): the serialized form feeds the signature digest, so changing
/// field names, types or order presumably invalidates existing signatures —
/// confirm before editing.
#[derive(Serialize)]
struct PeerTestResultSignable<'a> {
    peer_node_id: &'a [u8; 16],
    peer_onion: &'a str,
    window_id: u64,
    metrics: &'a PeerQualityMetrics,
    timestamp: i64,
}
152
153// ============================================================================
154// Retrieval quality metrics
155// ============================================================================
156
157/// Precision at K: fraction of the top-K results that are relevant.
158///
159/// Given a ranked list of returned IDs and a set of known relevant IDs,
160/// counts how many of the top K returned IDs appear in the relevant set.
161///
162/// Returns 0.0 if k is 0 or returned is empty.
163pub fn precision_at_k(returned: &[Uuid], relevant: &[Uuid], k: usize) -> f32 {
164    if k == 0 || returned.is_empty() {
165        return 0.0;
166    }
167
168    let take = returned.len().min(k);
169    let relevant_count = returned[..take]
170        .iter()
171        .filter(|id| relevant.contains(id))
172        .count();
173
174    relevant_count as f32 / k as f32
175}
176
177/// Normalized Discounted Cumulative Gain at K.
178///
179/// Measures ranking quality accounting for position: relevant results
180/// appearing earlier are more valuable. Uses graded relevance (0-3)
181/// when available, falling back to binary relevance from the relevant set.
182///
183/// The `relevance_grades` map provides graded relevance for each ID.
184/// IDs not in the map but present in the relevant set get grade 1.
185/// IDs in neither get grade 0.
186///
187/// DCG@K = sum_{i=1}^{K} (`2^rel_i` - 1) / log2(i + 1)
188/// NDCG@K = DCG@K / IDCG@K
189///
190/// where IDCG@K is the DCG of the ideal ranking.
191pub fn ndcg_at_k(
192    returned: &[Uuid],
193    relevant: &[Uuid],
194    relevance_grades: &HashMap<Uuid, u8>,
195    k: usize,
196) -> f32 {
197    if k == 0 || returned.is_empty() || relevant.is_empty() {
198        return 0.0;
199    }
200
201    let take = returned.len().min(k);
202
203    // Compute DCG for the returned ranking
204    let dcg = compute_dcg(&returned[..take], relevant, relevance_grades);
205
206    // Compute ideal DCG: sort all known relevant items by their grade, descending
207    let mut ideal_grades: Vec<u8> = relevant
208        .iter()
209        .map(|id| *relevance_grades.get(id).unwrap_or(&1))
210        .collect();
211    ideal_grades.sort_unstable_by(|a, b| b.cmp(a));
212    ideal_grades.truncate(k);
213
214    let idcg: f64 = ideal_grades
215        .iter()
216        .enumerate()
217        .map(|(i, &grade)| {
218            let gain = (2.0_f64).powi(i32::from(grade)) - 1.0;
219            gain / (i as f64 + 2.0).log2()
220        })
221        .sum();
222
223    if idcg == 0.0 {
224        return 0.0;
225    }
226
227    (dcg / idcg) as f32
228}
229
230/// Compute Discounted Cumulative Gain for a ranked list.
231fn compute_dcg(ranked: &[Uuid], relevant: &[Uuid], relevance_grades: &HashMap<Uuid, u8>) -> f64 {
232    ranked
233        .iter()
234        .enumerate()
235        .map(|(i, id)| {
236            let grade = if let Some(&g) = relevance_grades.get(id) {
237                g
238            } else {
239                u8::from(relevant.contains(id))
240            };
241            let gain = (2.0_f64).powi(i32::from(grade)) - 1.0;
242            gain / (i as f64 + 2.0).log2()
243        })
244        .sum()
245}
246
247/// Mean Reciprocal Rank: the reciprocal of the rank of the first relevant result.
248///
249/// If no relevant result is found in the returned list, returns 0.0.
250/// Rank is 1-indexed (first position = rank 1).
251pub fn mrr(returned: &[Uuid], relevant: &[Uuid]) -> f32 {
252    for (i, id) in returned.iter().enumerate() {
253        if relevant.contains(id) {
254            return 1.0 / (i as f32 + 1.0);
255        }
256    }
257    0.0
258}
259
260/// Composite quality score for a contributor evaluation result.
261///
262/// Per CP-015 section 7:
263///   score = 0.4 * ndcg@10 + 0.3 * precision@10 + 0.2 * mrr
264///         + 0.1 * `proof_validity_rate`
265///   With a 0.8 penalty multiplier if `adapter_perplexity_delta` > 0.
266pub fn composite_score(metrics: &QualityMetrics) -> f32 {
267    let proof_rate = if metrics.merkle_proofs_valid + metrics.merkle_proofs_invalid > 0 {
268        metrics.merkle_proofs_valid as f32
269            / (metrics.merkle_proofs_valid + metrics.merkle_proofs_invalid) as f32
270    } else {
271        0.0
272    };
273
274    let mut score = 0.4 * metrics.ndcg_at_10
275        + 0.3 * metrics.precision_at_10
276        + 0.2 * metrics.mrr
277        + 0.1 * proof_rate;
278
279    // Penalty for adapter degradation
280    if let Some(delta) = metrics.adapter_perplexity_delta {
281        if delta > 0.0 {
282            score *= 0.8;
283        }
284    }
285
286    score
287}
288
289/// Composite quality score for a peer test result.
290///
291/// Per CP-015 section 14:
292///   70% quality + 20% reliability + 10% latency
293pub fn peer_composite_score(metrics: &PeerQualityMetrics) -> f32 {
294    let quality = 0.4 * metrics.ndcg_at_10 + 0.2 * metrics.precision_at_10 + 0.1 * metrics.mrr;
295
296    let reliability = metrics.proof_validity_rate * metrics.availability_rate;
297
298    // Latency score: 1.0 at 0ms, 0.0 at 5000ms, linear
299    let latency_score = (1.0 - f32::from(metrics.response_latency_ms) / 5000.0).max(0.0);
300
301    0.7 * quality + 0.2 * reliability + 0.1 * latency_score
302}
303
304/// Evaluate a contributor by downloading their content from Arweave,
305/// indexing it in an isolated substrate, and running test queries.
306///
307/// Per CP-015 section 5.
308pub async fn evaluate_contributor(
309    contributor_key: [u8; 32],
310    arweave: &ArweaveClient,
311    test_queries: &[TestQuery],
312    validator_node_id: [u8; 16],
313    signing_key: &SigningKey,
314) -> Result<EvaluationResult, crate::ValidatorError> {
315    info!(
316        contributor = hex::encode(contributor_key),
317        queries = test_queries.len(),
318        "Evaluating contributor"
319    );
320
321    // 1. Discover contributor's content on Arweave
322    let contributor_hex = hex::encode(contributor_key);
323    let (discovered, _cursor) =
324        cp_arweave::discover_diffs_by_contributor(arweave, &contributor_hex, 50, None)
325            .await
326            .map_err(|e| crate::ValidatorError::Arweave(e.to_string()))?;
327
328    if discovered.is_empty() {
329        return Err(crate::ValidatorError::NoContent(contributor_hex));
330    }
331
332    // 2. Build a temporary local index of the contributor's content
333    let temp_dir = tempfile::tempdir()
334        .map_err(|e| crate::ValidatorError::Internal(format!("Failed to create temp dir: {e}")))?;
335    let db_path = temp_dir.path().join("eval.db");
336    let db_path_str = db_path.to_string_lossy().to_string();
337
338    let mut local_index = GraphStore::open(&db_path_str)
339        .map_err(|e| crate::ValidatorError::Internal(format!("Failed to open graph store: {e}")))?;
340
341    let mut downloaded_count = 0u64;
342    for diff_meta in &discovered {
343        match cp_arweave::download_diff(arweave, diff_meta).await {
344            Ok(downloaded) => {
345                for doc in &downloaded.diff.added_docs {
346                    let _ = local_index.insert_document(doc);
347                }
348                for chunk in &downloaded.diff.added_chunks {
349                    let _ = local_index.insert_chunk(chunk);
350                }
351                for emb in &downloaded.diff.added_embeddings {
352                    let _ = local_index.insert_embedding(emb);
353                }
354                for edge in &downloaded.diff.added_edges {
355                    let _ = local_index.add_edge(edge);
356                }
357                downloaded_count += 1;
358            }
359            Err(e) => {
360                warn!(tx_id = diff_meta.tx_id, error = %e, "Failed to download diff");
361            }
362        }
363    }
364
365    if downloaded_count == 0 {
366        return Err(crate::ValidatorError::NoContent(contributor_hex));
367    }
368
369    info!(
370        contributor = hex::encode(contributor_key),
371        diffs = downloaded_count,
372        "Indexed contributor content"
373    );
374
375    // 3. Run test queries against the contributor's content
376    let mut precision_sum = 0.0f32;
377    let mut ndcg_sum = 0.0f32;
378    let mut mrr_sum = 0.0f32;
379    let mut query_count = 0u32;
380
381    for query in test_queries {
382        // Convert i16 query embedding to f32 for search
383        let query_f32: Vec<f32> = query
384            .query_embedding
385            .iter()
386            .map(|&v| f32::from(v) / 32767.0)
387            .collect();
388
389        let results = local_index.search(&query_f32, 10);
390
391        match results {
392            Ok(result_ids) => {
393                let returned_uuids: Vec<Uuid> = result_ids.iter().map(|(id, _score)| *id).collect();
394
395                let relevant_uuids: Vec<Uuid> = query.relevant_chunk_ids.clone();
396
397                precision_sum += precision_at_k(&returned_uuids, &relevant_uuids, 10);
398                ndcg_sum += ndcg_at_k(
399                    &returned_uuids,
400                    &relevant_uuids,
401                    &query.relevance_grade_map(),
402                    10,
403                );
404                mrr_sum += mrr(&returned_uuids, &relevant_uuids);
405                query_count += 1;
406            }
407            Err(e) => {
408                debug!(error = %e, "Search failed for query");
409            }
410        }
411    }
412
413    // 4. Verify corpus integrity: check that each document's hierarchical_hash
414    // matches the Merkle root of its chunk text_hashes (Bug 7 + 8 fix)
415    let mut proofs_valid = 0u32;
416    let mut proofs_invalid = 0u32;
417    let all_docs = local_index.get_all_documents().unwrap_or_default();
418    for doc in &all_docs {
419        if doc.hierarchical_hash == [0u8; 32] {
420            // Placeholder hash — contributor never set it
421            proofs_invalid += 1;
422            continue;
423        }
424        let chunks = local_index.get_chunks_for_doc(doc.id).unwrap_or_default();
425        if chunks.is_empty() {
426            proofs_invalid += 1;
427            continue;
428        }
429        let chunk_hashes: Vec<[u8; 32]> = chunks.iter().map(|c| c.text_hash).collect();
430        let recomputed = cp_core::Document::compute_hierarchical_hash(&chunk_hashes);
431        if recomputed == doc.hierarchical_hash {
432            proofs_valid += 1;
433        } else {
434            proofs_invalid += 1;
435        }
436    }
437
438    let metrics = if query_count > 0 {
439        QualityMetrics {
440            precision_at_10: precision_sum / query_count as f32,
441            ndcg_at_10: ndcg_sum / query_count as f32,
442            mrr: mrr_sum / query_count as f32,
443            merkle_proofs_valid: proofs_valid,
444            merkle_proofs_invalid: proofs_invalid,
445            adapter_perplexity_delta: None,
446        }
447    } else {
448        QualityMetrics {
449            precision_at_10: 0.0,
450            ndcg_at_10: 0.0,
451            mrr: 0.0,
452            merkle_proofs_valid: proofs_valid,
453            merkle_proofs_invalid: proofs_invalid,
454            adapter_perplexity_delta: None,
455        }
456    };
457
458    let window = ValidationWindow::current();
459    let now = std::time::SystemTime::now()
460        .duration_since(std::time::UNIX_EPOCH)
461        .unwrap()
462        .as_millis() as i64;
463
464    let mut result = EvaluationResult {
465        contributor_key,
466        validator_node_id,
467        window_id: window.window_id,
468        metrics,
469        timestamp: now,
470        signature: [0u8; 64],
471    };
472
473    // Sign the result
474    let hash = result.signing_bytes();
475    let sig = signing_key.sign(&hash);
476    result.signature = sig.to_bytes();
477
478    Ok(result)
479}
480
481/// Evaluate a local graph store directly by running test queries against it.
482///
483/// This extracts the query-running and metrics logic from `evaluate_contributor()`
484/// into a reusable function that works against any `GraphStore` without requiring
485/// Arweave download. Useful for local validation and testing.
486pub fn evaluate_local_graph(
487    graph: &Arc<std::sync::Mutex<GraphStore>>,
488    test_queries: &[TestQuery],
489    validator_node_id: [u8; 16],
490    signing_key: &SigningKey,
491) -> Result<EvaluationResult, crate::ValidatorError> {
492    let local_index = graph
493        .lock()
494        .map_err(|e| crate::ValidatorError::Internal(format!("Graph lock poisoned: {e}")))?;
495
496    let mut precision_sum = 0.0f32;
497    let mut ndcg_sum = 0.0f32;
498    let mut mrr_sum = 0.0f32;
499    let mut query_count = 0u32;
500
501    for query in test_queries {
502        let query_f32: Vec<f32> = query
503            .query_embedding
504            .iter()
505            .map(|&v| f32::from(v) / 32767.0)
506            .collect();
507        let results = local_index.search(&query_f32, 10);
508        match results {
509            Ok(result_ids) => {
510                let returned_uuids: Vec<Uuid> = result_ids.iter().map(|(id, _score)| *id).collect();
511                let relevant_uuids: Vec<Uuid> = query.relevant_chunk_ids.clone();
512
513                precision_sum += precision_at_k(&returned_uuids, &relevant_uuids, 10);
514                ndcg_sum += ndcg_at_k(
515                    &returned_uuids,
516                    &relevant_uuids,
517                    &query.relevance_grade_map(),
518                    10,
519                );
520                mrr_sum += mrr(&returned_uuids, &relevant_uuids);
521                query_count += 1;
522            }
523            Err(e) => {
524                debug!(error = %e, "Search failed for query");
525            }
526        }
527    }
528
529    // Verify corpus integrity
530    let mut proofs_valid = 0u32;
531    let mut proofs_invalid = 0u32;
532    let all_docs = local_index.get_all_documents().unwrap_or_default();
533    for doc in &all_docs {
534        if doc.hierarchical_hash == [0u8; 32] {
535            proofs_invalid += 1;
536            continue;
537        }
538        let chunks = local_index.get_chunks_for_doc(doc.id).unwrap_or_default();
539        if chunks.is_empty() {
540            proofs_invalid += 1;
541            continue;
542        }
543        let chunk_hashes: Vec<[u8; 32]> = chunks.iter().map(|c| c.text_hash).collect();
544        let recomputed = cp_core::Document::compute_hierarchical_hash(&chunk_hashes);
545        if recomputed == doc.hierarchical_hash {
546            proofs_valid += 1;
547        } else {
548            proofs_invalid += 1;
549        }
550    }
551
552    let metrics = if query_count > 0 {
553        QualityMetrics {
554            precision_at_10: precision_sum / query_count as f32,
555            ndcg_at_10: ndcg_sum / query_count as f32,
556            mrr: mrr_sum / query_count as f32,
557            merkle_proofs_valid: proofs_valid,
558            merkle_proofs_invalid: proofs_invalid,
559            adapter_perplexity_delta: None,
560        }
561    } else {
562        QualityMetrics {
563            precision_at_10: 0.0,
564            ndcg_at_10: 0.0,
565            mrr: 0.0,
566            merkle_proofs_valid: proofs_valid,
567            merkle_proofs_invalid: proofs_invalid,
568            adapter_perplexity_delta: None,
569        }
570    };
571
572    let window = ValidationWindow::current();
573    let now = std::time::SystemTime::now()
574        .duration_since(std::time::UNIX_EPOCH)
575        .unwrap()
576        .as_millis() as i64;
577
578    // Use a zero contributor_key since this is local evaluation
579    let mut result = EvaluationResult {
580        contributor_key: [0u8; 32],
581        validator_node_id,
582        window_id: window.window_id,
583        metrics,
584        timestamp: now,
585        signature: [0u8; 64],
586    };
587
588    let hash = result.signing_bytes();
589    let sig = signing_key.sign(&hash);
590    result.signature = sig.to_bytes();
591
592    Ok(result)
593}
594
595/// Evaluate a live peer by connecting over Tor, sending test queries, and
596/// measuring search quality and latency.
597///
598/// Per CP-015 section 13. Requires a running `TorRuntime` to establish
599/// connections to the peer's onion service.
600pub async fn evaluate_peer(
601    peer: &PeerRegistration,
602    test_queries: &[TestQuery],
603    validator_node_id: [u8; 16],
604    signing_key: &SigningKey,
605    model_hash: [u8; 32],
606    tor_runtime: &cp_tor::TorRuntime,
607) -> Result<PeerTestResult, crate::ValidatorError> {
608    info!(
609        peer = hex::encode(peer.node_id),
610        onion = &peer.onion_address,
611        "Testing live peer"
612    );
613
614    // Select up to 5 test queries per validation window
615    let selected = if test_queries.len() > 5 {
616        &test_queries[..5]
617    } else {
618        test_queries
619    };
620
621    let mut precision_sum = 0.0f32;
622    let mut ndcg_sum = 0.0f32;
623    let mut mrr_sum = 0.0f32;
624    let mut latency_sum = 0u64;
625    let mut valid_proofs = 0u32;
626    let mut total_proofs = 0u32;
627    let mut successful_queries = 0u32;
628
629    // Derive an ephemeral session key for these queries
630    let session_key = cp_tor::SessionKey::derive_with_salt(
631        &signing_key.to_bytes(),
632        &blake3::hash(b"validator-peer-test").as_bytes()[..32]
633            .try_into()
634            .unwrap(),
635    );
636
637    for query in selected {
638        let start = Instant::now();
639
640        // Connect to peer and send query
641        let response_result = tokio::time::timeout(
642            std::time::Duration::from_secs(5),
643            send_query_to_peer(
644                tor_runtime,
645                &peer.onion_address,
646                &query.query_embedding,
647                Some(query.query_text.clone()),
648                model_hash,
649                &session_key,
650                &peer.public_key,
651            ),
652        )
653        .await;
654
655        let elapsed_ms = start.elapsed().as_millis() as u64;
656
657        match response_result {
658            Ok(Ok(response)) if response.status == SearchStatus::Ok => {
659                latency_sum += elapsed_ms;
660
661                let returned_uuids: Vec<Uuid> = response
662                    .results
663                    .iter()
664                    .map(|r| Uuid::from_bytes(r.chunk_id))
665                    .collect();
666
667                let relevant_uuids: Vec<Uuid> = query.relevant_chunk_ids.clone();
668
669                precision_sum += precision_at_k(&returned_uuids, &relevant_uuids, 10);
670                ndcg_sum += ndcg_at_k(
671                    &returned_uuids,
672                    &relevant_uuids,
673                    &query.relevance_grade_map(),
674                    10,
675                );
676                mrr_sum += mrr(&returned_uuids, &relevant_uuids);
677
678                // Count proof validity
679                for result in &response.results {
680                    if let Some(proof_hashes) = &result.merkle_proof {
681                        total_proofs += 1;
682                        // The Tor search response provides proof as Vec<[u8; 32]>
683                        // (sibling hashes only). To use cp_core::state::verify_merkle_proof
684                        // we would need (hash, is_left) tuples and a leaf index.
685                        // For now, verify the proof chain manually: hash the chunk_id
686                        // as the leaf and walk up the tree. We assume left-first ordering
687                        // matching the canonical Merkle tree construction.
688                        let leaf = *blake3::hash(&result.chunk_id).as_bytes();
689                        let computed_root = proof_hashes.iter().fold(leaf, |current, sibling| {
690                            let mut hasher = blake3::Hasher::new();
691                            if current <= *sibling {
692                                hasher.update(&current);
693                                hasher.update(sibling);
694                            } else {
695                                hasher.update(sibling);
696                                hasher.update(&current);
697                            }
698                            *hasher.finalize().as_bytes()
699                        });
700                        if computed_root == response.peer_state_root {
701                            valid_proofs += 1;
702                        }
703                    }
704                }
705
706                successful_queries += 1;
707            }
708            Ok(Ok(response)) => {
709                debug!(status = ?response.status, "Peer returned non-OK status");
710            }
711            Ok(Err(e)) => {
712                debug!(error = %e, "Search request failed");
713            }
714            Err(_) => {
715                debug!("Search request timed out");
716            }
717        }
718    }
719
720    let metrics = if successful_queries > 0 {
721        PeerQualityMetrics {
722            precision_at_10: precision_sum / successful_queries as f32,
723            ndcg_at_10: ndcg_sum / successful_queries as f32,
724            mrr: mrr_sum / successful_queries as f32,
725            response_latency_ms: (latency_sum / u64::from(successful_queries))
726                .min(u64::from(u16::MAX)) as u16,
727            proof_validity_rate: if total_proofs > 0 {
728                valid_proofs as f32 / total_proofs as f32
729            } else {
730                0.0
731            },
732            availability_rate: 1.0,
733        }
734    } else {
735        PeerQualityMetrics {
736            precision_at_10: 0.0,
737            ndcg_at_10: 0.0,
738            mrr: 0.0,
739            response_latency_ms: 0,
740            proof_validity_rate: 0.0,
741            availability_rate: 0.0,
742        }
743    };
744
745    let window = ValidationWindow::current();
746    let now = std::time::SystemTime::now()
747        .duration_since(std::time::UNIX_EPOCH)
748        .unwrap()
749        .as_millis() as i64;
750
751    let mut result = PeerTestResult {
752        peer_node_id: peer.node_id,
753        peer_onion: peer.onion_address.clone(),
754        validator_node_id,
755        window_id: window.window_id,
756        metrics,
757        timestamp: now,
758        signature: [0u8; 64],
759    };
760
761    let hash = result.signing_bytes();
762    let sig = signing_key.sign(&hash);
763    result.signature = sig.to_bytes();
764
765    Ok(result)
766}
767
768/// Send a search query to a peer over Tor using the cp-tor client.
769async fn send_query_to_peer(
770    tor_runtime: &cp_tor::TorRuntime,
771    onion_address: &str,
772    query_embedding: &[i16],
773    query_text: Option<String>,
774    model_hash: [u8; 32],
775    session_key: &cp_tor::SessionKey,
776    peer_public_key: &[u8; 32],
777) -> Result<SearchResponse, crate::ValidatorError> {
778    let mut stream = tor_runtime
779        .connect_to_peer(onion_address)
780        .await
781        .map_err(|e| crate::ValidatorError::Tor(e.to_string()))?;
782
783    let response = cp_tor::search(
784        &mut stream,
785        query_embedding.to_vec(),
786        query_text,
787        10,
788        true,
789        model_hash,
790        session_key,
791        peer_public_key,
792    )
793    .await
794    .map_err(|e| crate::ValidatorError::Tor(e.to_string()))?;
795
796    Ok(response)
797}
798
799/// Update contributor ratings based on evaluation results using pairwise
800/// `OpenSkill` comparisons.
801///
802/// Per CP-015 section 7: Results are sorted by composite score, then
803/// adjacent pairs are compared using the `OpenSkill` update rule.
804pub fn update_contributor_ratings(
805    results: &[EvaluationResult],
806    ratings: &mut HashMap<[u8; 32], (Rating, u64, i64)>,
807) {
808    if results.len() < 2 {
809        return;
810    }
811
812    // Sort by composite score, descending
813    let mut ranked: Vec<&EvaluationResult> = results.iter().collect();
814    ranked.sort_by(|a, b| {
815        composite_score(&b.metrics)
816            .partial_cmp(&composite_score(&a.metrics))
817            .unwrap_or(std::cmp::Ordering::Equal)
818    });
819
820    let now = std::time::SystemTime::now()
821        .duration_since(std::time::UNIX_EPOCH)
822        .unwrap()
823        .as_millis() as i64;
824
825    // All-pairs comparisons: every higher-ranked entry beats every lower-ranked one
826    for i in 0..ranked.len() {
827        for j in (i + 1)..ranked.len() {
828            let winner_key = ranked[i].contributor_key;
829            let loser_key = ranked[j].contributor_key;
830
831            let winner_entry = ratings
832                .entry(winner_key)
833                .or_insert((Rating::default(), 0, now));
834            let winner_rating = winner_entry.0;
835
836            let loser_entry = ratings
837                .entry(loser_key)
838                .or_insert((Rating::default(), 0, now));
839            let loser_rating = loser_entry.0;
840
841            let (new_winner, new_loser) =
842                crate::rating::pairwise_update(&winner_rating, &loser_rating);
843
844            ratings.get_mut(&winner_key).unwrap().0 = new_winner;
845            ratings.get_mut(&winner_key).unwrap().1 += 1;
846            ratings.get_mut(&winner_key).unwrap().2 = now;
847
848            ratings.get_mut(&loser_key).unwrap().0 = new_loser;
849            ratings.get_mut(&loser_key).unwrap().1 += 1;
850            ratings.get_mut(&loser_key).unwrap().2 = now;
851        }
852    }
853}
854
855/// Update peer ratings based on peer test results using pairwise
856/// `OpenSkill` comparisons.
857///
858/// Same mechanism as contributor ratings (CP-015 section 14) but uses
859/// `peer_composite_score` for ranking.
860pub fn update_peer_ratings(
861    results: &[PeerTestResult],
862    ratings: &mut HashMap<[u8; 16], (Rating, u64, i64)>,
863) {
864    if results.len() < 2 {
865        return;
866    }
867
868    let mut ranked: Vec<&PeerTestResult> = results.iter().collect();
869    ranked.sort_by(|a, b| {
870        peer_composite_score(&b.metrics)
871            .partial_cmp(&peer_composite_score(&a.metrics))
872            .unwrap_or(std::cmp::Ordering::Equal)
873    });
874
875    let now = std::time::SystemTime::now()
876        .duration_since(std::time::UNIX_EPOCH)
877        .unwrap()
878        .as_millis() as i64;
879
880    // All-pairs comparisons: every higher-ranked entry beats every lower-ranked one
881    for i in 0..ranked.len() {
882        for j in (i + 1)..ranked.len() {
883            let winner_key = ranked[i].peer_node_id;
884            let loser_key = ranked[j].peer_node_id;
885
886            let winner_entry = ratings
887                .entry(winner_key)
888                .or_insert((Rating::default(), 0, now));
889            let winner_rating = winner_entry.0;
890
891            let loser_entry = ratings
892                .entry(loser_key)
893                .or_insert((Rating::default(), 0, now));
894            let loser_rating = loser_entry.0;
895
896            let (new_winner, new_loser) =
897                crate::rating::pairwise_update(&winner_rating, &loser_rating);
898
899            ratings.get_mut(&winner_key).unwrap().0 = new_winner;
900            ratings.get_mut(&winner_key).unwrap().1 += 1;
901            ratings.get_mut(&winner_key).unwrap().2 = now;
902
903            ratings.get_mut(&loser_key).unwrap().0 = new_loser;
904            ratings.get_mut(&loser_key).unwrap().1 += 1;
905            ratings.get_mut(&loser_key).unwrap().2 = now;
906        }
907    }
908}
909
#[cfg(test)]
mod tests {
    use super::*;

    // ---- fixtures ----

    /// Builds `n` UUIDs: byte 0 holds the index (truncated to `u8`), all
    /// other bytes are zero.
    fn make_uuids(n: usize) -> Vec<Uuid> {
        let mut uuids = Vec::with_capacity(n);
        for index in 0..n {
            let mut raw = [0u8; 16];
            raw[0] = index as u8;
            uuids.push(Uuid::from_bytes(raw));
        }
        uuids
    }

    /// A UUID with every byte set to 99; it never equals a `make_uuids`
    /// output because those keep bytes 1..16 at zero.
    fn unrelated_uuid() -> Uuid {
        Uuid::from_bytes([99u8; 16])
    }

    /// Quality metrics whose three retrieval scores all share `score`.
    fn uniform_quality(
        score: f32,
        proofs_valid: u32,
        proofs_invalid: u32,
        perplexity_delta: Option<f32>,
    ) -> QualityMetrics {
        QualityMetrics {
            precision_at_10: score,
            ndcg_at_10: score,
            mrr: score,
            merkle_proofs_valid: proofs_valid,
            merkle_proofs_invalid: proofs_invalid,
            adapter_perplexity_delta: perplexity_delta,
        }
    }

    /// Evaluation result for the contributor whose key is `key_byte`
    /// repeated 32 times; every other field uses a fixed placeholder value.
    fn evaluation_for(key_byte: u8, metrics: QualityMetrics) -> EvaluationResult {
        EvaluationResult {
            contributor_key: [key_byte; 32],
            validator_node_id: [0u8; 16],
            window_id: 100,
            metrics,
            timestamp: 0,
            signature: [0u8; 64],
        }
    }

    // ---- precision_at_k ----

    #[test]
    fn test_precision_at_k_all_relevant() {
        let returned = make_uuids(10);
        let relevant = returned.clone();
        let precision = precision_at_k(&returned, &relevant, 10);
        assert!((precision - 1.0).abs() < 1e-6);
    }

    #[test]
    fn test_precision_at_k_none_relevant() {
        let returned = make_uuids(10);
        let relevant = vec![unrelated_uuid()];
        let precision = precision_at_k(&returned, &relevant, 10);
        assert!(precision.abs() < 1e-6);
    }

    #[test]
    fn test_precision_at_k_half_relevant() {
        let returned = make_uuids(10);
        let relevant: Vec<Uuid> = returned.iter().take(5).copied().collect();
        let precision = precision_at_k(&returned, &relevant, 10);
        assert!((precision - 0.5).abs() < 1e-6);
    }

    #[test]
    fn test_precision_at_k_fewer_results_than_k() {
        let returned = make_uuids(3);
        let relevant = returned.clone();
        // 3 relevant out of k=10 -> 3/10 = 0.3
        let precision = precision_at_k(&returned, &relevant, 10);
        assert!((precision - 0.3).abs() < 1e-6);
    }

    #[test]
    fn test_precision_at_k_zero() {
        let returned = make_uuids(5);
        let relevant = returned.clone();
        let precision = precision_at_k(&returned, &relevant, 0);
        assert!(precision.abs() < 1e-6);
    }

    #[test]
    fn test_precision_at_k_empty_returned() {
        let relevant = make_uuids(5);
        let precision = precision_at_k(&[], &relevant, 10);
        assert!(precision.abs() < 1e-6);
    }

    // ---- ndcg_at_k ----

    #[test]
    fn test_ndcg_at_k_perfect_ranking() {
        let returned = make_uuids(5);
        let relevant = returned.clone();
        // Grades fall from 5 to 1 in returned order, so the ranking is ideal.
        let grades: HashMap<Uuid, u8> = returned
            .iter()
            .enumerate()
            .map(|(rank, id)| (*id, (5 - rank) as u8))
            .collect();

        let score = ndcg_at_k(&returned, &relevant, &grades, 5);
        assert!(
            (score - 1.0).abs() < 1e-5,
            "Perfect NDCG should be 1.0, got {score}"
        );
    }

    #[test]
    fn test_ndcg_at_k_no_relevant() {
        let returned = make_uuids(5);
        let relevant: Vec<Uuid> = Vec::new();
        let grades = HashMap::new();
        let score = ndcg_at_k(&returned, &relevant, &grades, 5);
        assert!(score.abs() < 1e-6);
    }

    #[test]
    fn test_ndcg_at_k_binary_relevance() {
        let returned = make_uuids(5);
        // The only relevant document sits in the last returned position.
        let relevant = vec![returned[4]];
        let grades = HashMap::new();

        let score = ndcg_at_k(&returned, &relevant, &grades, 5);
        assert!(0.3 < score && score < 0.5, "Got NDCG = {score}");
    }

    #[test]
    fn test_ndcg_at_k_zero_k() {
        let ids = make_uuids(5);
        let grades = HashMap::new();
        let score = ndcg_at_k(&ids, &ids, &grades, 0);
        assert!(score.abs() < 1e-6);
    }

    // ---- mrr ----

    #[test]
    fn test_mrr_first_position() {
        let returned = make_uuids(5);
        let relevant = vec![returned[0]];
        let score = mrr(&returned, &relevant);
        assert!((score - 1.0).abs() < 1e-6);
    }

    #[test]
    fn test_mrr_second_position() {
        let returned = make_uuids(5);
        let relevant = vec![returned[1]];
        let score = mrr(&returned, &relevant);
        assert!((score - 0.5).abs() < 1e-6);
    }

    #[test]
    fn test_mrr_third_position() {
        let returned = make_uuids(5);
        let relevant = vec![returned[2]];
        let score = mrr(&returned, &relevant);
        assert!(
            (score - 1.0 / 3.0).abs() < 1e-6,
            "Expected 1/3, got {score}"
        );
    }

    #[test]
    fn test_mrr_no_relevant() {
        let returned = make_uuids(5);
        let relevant = vec![unrelated_uuid()];
        let score = mrr(&returned, &relevant);
        assert!(score.abs() < 1e-6);
    }

    #[test]
    fn test_mrr_empty() {
        let relevant = make_uuids(3);
        let score = mrr(&[], &relevant);
        assert!(score.abs() < 1e-6);
    }

    #[test]
    fn test_mrr_multiple_relevant_returns_first() {
        let returned = make_uuids(5);
        let relevant = vec![returned[1], returned[3]];
        let score = mrr(&returned, &relevant);
        assert!((score - 0.5).abs() < 1e-6);
    }

    // ---- composite_score ----

    #[test]
    fn test_composite_score_perfect() {
        let score = composite_score(&uniform_quality(1.0, 10, 0, None));
        assert!(
            (score - 1.0).abs() < 1e-6,
            "Perfect score should be 1.0, got {score}"
        );
    }

    #[test]
    fn test_composite_score_zero() {
        let score = composite_score(&uniform_quality(0.0, 0, 0, None));
        assert!(score.abs() < 1e-6);
    }

    #[test]
    fn test_composite_score_adapter_penalty() {
        // A positive perplexity delta is expected to cost part of the score.
        let score = composite_score(&uniform_quality(1.0, 10, 0, Some(0.5)));
        assert!(
            (score - 0.8).abs() < 1e-6,
            "Penalized score should be 0.8, got {score}"
        );
    }

    #[test]
    fn test_composite_score_adapter_improvement() {
        // A negative perplexity delta is expected to leave the score intact.
        let score = composite_score(&uniform_quality(1.0, 10, 0, Some(-0.5)));
        assert!((score - 1.0).abs() < 1e-6);
    }

    // ---- peer_composite_score ----

    #[test]
    fn test_peer_composite_score_fast_good_peer() {
        let fast_peer = PeerQualityMetrics {
            precision_at_10: 1.0,
            ndcg_at_10: 1.0,
            mrr: 1.0,
            response_latency_ms: 0,
            proof_validity_rate: 1.0,
            availability_rate: 1.0,
        };
        // quality = 0.4 + 0.2 + 0.1 = 0.7
        // reliability = 1.0
        // latency_score = 1.0
        // 0.7*0.7 + 0.2*1.0 + 0.1*1.0 = 0.49 + 0.2 + 0.1 = 0.79
        let score = peer_composite_score(&fast_peer);
        assert!((score - 0.79).abs() < 0.01, "Got {score}");
    }

    #[test]
    fn test_peer_composite_score_slow_peer() {
        let slow_peer = PeerQualityMetrics {
            precision_at_10: 1.0,
            ndcg_at_10: 1.0,
            mrr: 1.0,
            response_latency_ms: 5000,
            proof_validity_rate: 1.0,
            availability_rate: 1.0,
        };
        let score = peer_composite_score(&slow_peer);
        assert!((score - 0.69).abs() < 0.01, "Got {score}");
    }

    #[test]
    fn test_peer_composite_score_unreachable() {
        let unreachable_peer = PeerQualityMetrics {
            precision_at_10: 0.0,
            ndcg_at_10: 0.0,
            mrr: 0.0,
            response_latency_ms: 0,
            proof_validity_rate: 0.0,
            availability_rate: 0.0,
        };
        let score = peer_composite_score(&unreachable_peer);
        assert!((score - 0.1).abs() < 0.01, "Got {score}");
    }

    // ---- update_contributor_ratings ----

    #[test]
    fn test_update_contributor_ratings_single_result() {
        let only = evaluation_for(
            1,
            QualityMetrics {
                precision_at_10: 0.8,
                ndcg_at_10: 0.7,
                mrr: 0.9,
                merkle_proofs_valid: 5,
                merkle_proofs_invalid: 0,
                adapter_perplexity_delta: None,
            },
        );

        // A lone result has nobody to be compared against.
        let mut ratings = HashMap::new();
        update_contributor_ratings(&[only], &mut ratings);
        assert!(ratings.is_empty());
    }

    #[test]
    fn test_update_contributor_ratings_two_results() {
        let good = evaluation_for(1, uniform_quality(0.9, 10, 0, None));
        let bad = evaluation_for(2, uniform_quality(0.1, 1, 9, None));

        let mut ratings = HashMap::new();
        update_contributor_ratings(&[good, bad], &mut ratings);

        let good_rating = ratings.get(&[1u8; 32]).unwrap();
        let bad_rating = ratings.get(&[2u8; 32]).unwrap();

        assert!(
            good_rating.0.mu > bad_rating.0.mu,
            "Good contributor mu ({}) should exceed bad contributor mu ({})",
            good_rating.0.mu,
            bad_rating.0.mu
        );
    }
}