Skip to main content

cp_validator/
lib.rs

1//! Validator protocol for empirical quality verification (CP-015).
2//!
3//! Any Canon node can opt into validation. Validators download content from
4//! Arweave, index it locally, run test queries against it, measure retrieval
5//! quality, and publish OpenSkill Bayesian ratings for content contributors.
6//! These ratings help other nodes prioritize what to download from Arweave.
7//!
8//! Validators also test live search peers by connecting over Tor, sending
9//! test queries, and rating peer quality for circuit selection.
10//!
11//! The approach adapts Templar/Covenant (tplr.ai), which uses empirical
12//! quality verification for decentralized pre-training. OpenSkill Bayesian
13//! ratings provide a robust, game-resistant ranking that converges even
14//! with sparse observations. Window-based coordination ensures validators
15//! test the same content without requiring blockchain synchronization.
16//!
17//! Data flow:
18//!   1. Load test query corpus (from Arweave or local file)
19//!   2. Compute current validation window
20//!   3. Select test queries deterministically from corpus
21//!   4. For each contributor: download from Arweave, index, run queries, measure metrics
22//!   5. Rank contributors by composite score, update OpenSkill ratings pairwise
23//!   6. For each peer: connect over Tor, send queries, measure quality + latency
24//!   7. Rank peers by composite score, update OpenSkill ratings pairwise
25//!   8. Publish results to Arweave
26
27pub mod corpus;
28pub mod evaluation;
29pub mod rating;
30pub mod window;
31
32pub use corpus::{TestCorpus, TestQuery};
33pub use evaluation::{
34    composite_score, evaluate_contributor, evaluate_peer, mrr, ndcg_at_k, peer_composite_score,
35    precision_at_k, update_contributor_ratings, update_peer_ratings, EvaluationResult,
36    PeerQualityMetrics, PeerTestResult, QualityMetrics,
37};
38pub use rating::{pairwise_update, Rating, DEFAULT_MU, DEFAULT_SIGMA};
39pub use window::{select_test_queries, ValidationWindow};
40
41use cp_arweave::ArweaveClient;
42use ed25519_dalek::SigningKey;
43use serde::{Deserialize, Serialize};
44use std::collections::HashMap;
45use std::path::PathBuf;
46use std::sync::Arc;
47use tokio::sync::RwLock;
48use tracing::{error, info, warn};
49
50/// Errors specific to the validator protocol.
51#[derive(Debug, thiserror::Error)]
52pub enum ValidatorError {
53    #[error("Arweave error: {0}")]
54    Arweave(String),
55
56    #[error("No content found for contributor: {0}")]
57    NoContent(String),
58
59    #[error("Corpus error: {0}")]
60    Corpus(String),
61
62    #[error("Tor not available")]
63    TorNotAvailable,
64
65    #[error("Tor error: {0}")]
66    Tor(String),
67
68    #[error("Internal error: {0}")]
69    Internal(String),
70}
71
72/// Configuration for the validator.
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct ValidatorConfig {
75    /// How often to run the evaluation loop (seconds).
76    /// Defaults to 3600 (once per validation window).
77    pub evaluation_interval_secs: u64,
78
79    /// Path to a local test corpus file. If set, this is used instead of
80    /// downloading from Arweave. Useful for bootstrapping and testing.
81    pub corpus_path: Option<PathBuf>,
82
83    /// Number of test queries to select per evaluation window.
84    pub queries_per_window: usize,
85
86    /// Maximum number of contributors to evaluate per window.
87    pub max_contributors_per_window: usize,
88
89    /// Maximum number of peers to test per window.
90    pub max_peers_per_window: usize,
91
92    /// Whether to test live peers over Tor.
93    pub test_peers: bool,
94}
95
96impl Default for ValidatorConfig {
97    fn default() -> Self {
98        Self {
99            evaluation_interval_secs: 3600,
100            corpus_path: None,
101            queries_per_window: 20,
102            max_contributors_per_window: 50,
103            max_peers_per_window: 20,
104            test_peers: true,
105        }
106    }
107}
108
109/// OpenSkill Bayesian rating for a content contributor.
110///
111/// Per CP-015 section 2.
112#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct ContributorRating {
114    /// Ed25519 public key of the contributor.
115    pub contributor_key: [u8; 32],
116    /// OpenSkill mean skill estimate.
117    pub mu: f64,
118    /// OpenSkill uncertainty.
119    pub sigma: f64,
120    /// Number of times this contributor has been validated.
121    pub validation_count: u64,
122    /// Last updated timestamp (Unix milliseconds).
123    pub last_updated: i64,
124}
125
126impl ContributorRating {
127    /// Create a new contributor rating with default OpenSkill values.
128    pub fn new(contributor_key: [u8; 32]) -> Self {
129        Self {
130            contributor_key,
131            mu: DEFAULT_MU,
132            sigma: DEFAULT_SIGMA,
133            validation_count: 0,
134            last_updated: 0,
135        }
136    }
137
138    /// Conservative score: mu - 2*sigma.
139    pub fn conservative_score(&self) -> f64 {
140        self.mu - 2.0 * self.sigma
141    }
142
143    /// Convert to a Rating struct for the OpenSkill update functions.
144    pub fn to_rating(&self) -> Rating {
145        Rating::new(self.mu, self.sigma)
146    }
147
148    /// Update from a Rating struct after an OpenSkill update.
149    pub fn apply_rating(&mut self, rating: &Rating, now_ms: i64) {
150        self.mu = rating.mu;
151        self.sigma = rating.sigma;
152        self.validation_count += 1;
153        self.last_updated = now_ms;
154    }
155}
156
157/// OpenSkill Bayesian rating for a live search peer.
158///
159/// Per CP-015 section 14. Same OpenSkill model as ContributorRating but
160/// tracked independently. A contributor's content quality (measured by
161/// downloading and indexing) is distinct from their peer quality
162/// (measured by connecting and querying live).
163#[derive(Debug, Clone, Serialize, Deserialize)]
164pub struct PeerRating {
165    /// Node ID of the peer.
166    pub peer_node_id: [u8; 16],
167    /// OpenSkill mean skill estimate.
168    pub mu: f64,
169    /// OpenSkill uncertainty.
170    pub sigma: f64,
171    /// Number of peer tests.
172    pub test_count: u64,
173    /// Last updated timestamp (Unix milliseconds).
174    pub last_updated: i64,
175}
176
177impl PeerRating {
178    /// Create a new peer rating with default OpenSkill values.
179    pub fn new(peer_node_id: [u8; 16]) -> Self {
180        Self {
181            peer_node_id,
182            mu: DEFAULT_MU,
183            sigma: DEFAULT_SIGMA,
184            test_count: 0,
185            last_updated: 0,
186        }
187    }
188
189    /// Conservative score: mu - 2*sigma.
190    pub fn conservative_score(&self) -> f64 {
191        self.mu - 2.0 * self.sigma
192    }
193
194    /// Convert to a Rating struct.
195    pub fn to_rating(&self) -> Rating {
196        Rating::new(self.mu, self.sigma)
197    }
198
199    /// Update from a Rating struct.
200    pub fn apply_rating(&mut self, rating: &Rating, now_ms: i64) {
201        self.mu = rating.mu;
202        self.sigma = rating.sigma;
203        self.test_count += 1;
204        self.last_updated = now_ms;
205    }
206}
207
208/// The main validator that runs evaluation loops.
209///
210/// Orchestrates the full validation pipeline: loading the test corpus,
211/// selecting queries, evaluating contributors, updating ratings.
212/// Peer testing requires a TorRuntime to be passed in.
213pub struct Validator {
214    /// Validator configuration.
215    config: ValidatorConfig,
216
217    /// Arweave client for content download.
218    arweave: Arc<ArweaveClient>,
219
220    /// Ed25519 signing key for this validator node.
221    signing_key: SigningKey,
222
223    /// Node ID derived from the signing key.
224    node_id: [u8; 16],
225
226    /// Contributor ratings indexed by public key.
227    contributor_ratings: Arc<RwLock<HashMap<[u8; 32], ContributorRating>>>,
228
229    /// Peer ratings indexed by node ID.
230    peer_ratings: Arc<RwLock<HashMap<[u8; 16], PeerRating>>>,
231
232    /// The test query corpus.
233    corpus: Arc<RwLock<Option<TestCorpus>>>,
234
235    /// BLAKE3 hash of the embedding model, for compatibility checking.
236    model_hash: [u8; 32],
237}
238
239impl Validator {
240    /// Create a new validator.
241    pub fn new(
242        config: ValidatorConfig,
243        arweave: Arc<ArweaveClient>,
244        signing_key: SigningKey,
245        model_hash: [u8; 32],
246    ) -> Self {
247        let public_key = signing_key.verifying_key().to_bytes();
248        let node_id_hash = blake3::hash(&public_key);
249        let mut node_id = [0u8; 16];
250        node_id.copy_from_slice(&node_id_hash.as_bytes()[..16]);
251
252        Self {
253            config,
254            arweave,
255            signing_key,
256            node_id,
257            contributor_ratings: Arc::new(RwLock::new(HashMap::new())),
258            peer_ratings: Arc::new(RwLock::new(HashMap::new())),
259            corpus: Arc::new(RwLock::new(None)),
260            model_hash,
261        }
262    }
263
264    /// Get a read reference to the contributor ratings.
265    pub fn contributor_ratings(&self) -> &Arc<RwLock<HashMap<[u8; 32], ContributorRating>>> {
266        &self.contributor_ratings
267    }
268
269    /// Get a read reference to the peer ratings.
270    pub fn peer_ratings(&self) -> &Arc<RwLock<HashMap<[u8; 16], PeerRating>>> {
271        &self.peer_ratings
272    }
273
274    /// Get this validator's node ID.
275    pub fn node_id(&self) -> [u8; 16] {
276        self.node_id
277    }
278
279    /// Load or refresh the test corpus.
280    ///
281    /// Uses the local file if configured, otherwise downloads from Arweave.
282    pub async fn load_corpus(&self) -> Result<(), ValidatorError> {
283        let new_corpus = if let Some(ref path) = self.config.corpus_path {
284            TestCorpus::load_from_file(path)?
285        } else {
286            TestCorpus::load_from_arweave(&self.arweave).await?
287        };
288
289        info!(queries = new_corpus.len(), "Loaded test corpus");
290        *self.corpus.write().await = Some(new_corpus);
291        Ok(())
292    }
293
294    /// Run a single evaluation cycle for the current validation window.
295    ///
296    /// This is the core validation loop:
297    /// 1. Ensure corpus is loaded
298    /// 2. Select test queries for the current window
299    /// 3. Discover contributors on Arweave
300    /// 4. Evaluate each contributor
301    /// 5. Update contributor ratings
302    ///
303    /// Returns the evaluation results for optional publication to Arweave.
304    pub async fn run_evaluation_cycle(&self) -> Result<Vec<EvaluationResult>, ValidatorError> {
305        // 1. Ensure corpus is loaded
306        {
307            let corpus_guard = self.corpus.read().await;
308            if corpus_guard.is_none() {
309                drop(corpus_guard);
310                self.load_corpus().await?;
311            }
312        }
313
314        let corpus = self.corpus.read().await;
315        let corpus = corpus.as_ref().ok_or_else(|| {
316            ValidatorError::Corpus("Corpus not loaded".to_string())
317        })?;
318
319        // 2. Select test queries for current window
320        let window = ValidationWindow::current();
321        let test_queries = window.select_test_queries(corpus, self.config.queries_per_window);
322
323        if test_queries.is_empty() {
324            warn!("No test queries selected for window {}", window.window_id);
325            return Ok(Vec::new());
326        }
327
328        info!(
329            window_id = window.window_id,
330            queries = test_queries.len(),
331            "Starting evaluation cycle"
332        );
333
334        // 3. Discover contributors on Arweave
335        let (discovered, _cursor) = cp_arweave::discover_diffs(&self.arweave, None, 100, None)
336            .await
337            .map_err(|e| ValidatorError::Arweave(e.to_string()))?;
338
339        // Extract unique contributor keys
340        let mut contributor_keys: Vec<[u8; 32]> = Vec::new();
341        for diff in &discovered {
342            if let Ok(key_bytes) = hex::decode(&diff.contributor_key) {
343                if key_bytes.len() == 32 {
344                    let mut key = [0u8; 32];
345                    key.copy_from_slice(&key_bytes);
346                    if !contributor_keys.contains(&key) {
347                        contributor_keys.push(key);
348                    }
349                }
350            }
351        }
352
353        contributor_keys.truncate(self.config.max_contributors_per_window);
354
355        info!(
356            contributors = contributor_keys.len(),
357            "Discovered contributors to evaluate"
358        );
359
360        // 4. Evaluate each contributor
361        let mut eval_results = Vec::new();
362        for contributor_key in &contributor_keys {
363            match evaluate_contributor(
364                *contributor_key,
365                &self.arweave,
366                &test_queries,
367                self.node_id,
368                &self.signing_key,
369            )
370            .await
371            {
372                Ok(result) => {
373                    eval_results.push(result);
374                }
375                Err(e) => {
376                    warn!(
377                        contributor = hex::encode(contributor_key),
378                        error = %e,
379                        "Failed to evaluate contributor"
380                    );
381                }
382            }
383        }
384
385        // 5. Update contributor ratings
386        if eval_results.len() >= 2 {
387            let mut raw_ratings: HashMap<[u8; 32], (Rating, u64, i64)> = HashMap::new();
388
389            {
390                let existing = self.contributor_ratings.read().await;
391                for (key, cr) in existing.iter() {
392                    raw_ratings.insert(*key, (cr.to_rating(), cr.validation_count, cr.last_updated));
393                }
394            }
395
396            update_contributor_ratings(&eval_results, &mut raw_ratings);
397
398            {
399                let mut ratings = self.contributor_ratings.write().await;
400                for (key, (r, count, updated)) in &raw_ratings {
401                    let entry = ratings.entry(*key).or_insert_with(|| ContributorRating::new(*key));
402                    entry.mu = r.mu;
403                    entry.sigma = r.sigma;
404                    entry.validation_count = *count;
405                    entry.last_updated = *updated;
406                }
407            }
408
409            info!(
410                evaluated = eval_results.len(),
411                "Updated contributor ratings"
412            );
413        }
414
415        Ok(eval_results)
416    }
417
418    /// Test live peers by connecting over Tor and running test queries.
419    ///
420    /// Requires a running TorRuntime. Returns the peer test results
421    /// for optional publication to Arweave.
422    pub async fn test_live_peers(
423        &self,
424        tor_runtime: &cp_tor::TorRuntime,
425    ) -> Result<Vec<PeerTestResult>, ValidatorError> {
426        // Ensure corpus is loaded
427        let corpus = self.corpus.read().await;
428        let corpus = corpus.as_ref().ok_or_else(|| {
429            ValidatorError::Corpus("Corpus not loaded".to_string())
430        })?;
431
432        let window = ValidationWindow::current();
433        let test_queries = window.select_test_queries(corpus, self.config.queries_per_window);
434
435        // Discover peers on Arweave
436        let model_hash_hex = hex::encode(self.model_hash);
437        let (peers, _cursor) =
438            cp_arweave::discover_peers(&self.arweave, Some(&model_hash_hex), 50, None)
439                .await
440                .map_err(|e| ValidatorError::Arweave(e.to_string()))?;
441
442        let peers_to_test: Vec<_> = peers
443            .into_iter()
444            .take(self.config.max_peers_per_window)
445            .collect();
446
447        info!(peers = peers_to_test.len(), "Testing live peers");
448
449        let mut peer_results = Vec::new();
450        for peer_meta in &peers_to_test {
451            match cp_arweave::download_peer_registration(&self.arweave, peer_meta).await {
452                Ok(registration) => {
453                    match evaluate_peer(
454                        &registration,
455                        &test_queries,
456                        self.node_id,
457                        &self.signing_key,
458                        self.model_hash,
459                        tor_runtime,
460                    )
461                    .await
462                    {
463                        Ok(result) => {
464                            peer_results.push(result);
465                        }
466                        Err(e) => {
467                            warn!(
468                                peer = hex::encode(registration.node_id),
469                                error = %e,
470                                "Failed to test peer"
471                            );
472                        }
473                    }
474                }
475                Err(e) => {
476                    warn!(tx_id = &peer_meta.tx_id, error = %e, "Failed to download peer registration");
477                }
478            }
479        }
480
481        // Update peer ratings
482        if peer_results.len() >= 2 {
483            let mut raw_ratings: HashMap<[u8; 16], (Rating, u64, i64)> = HashMap::new();
484
485            {
486                let existing = self.peer_ratings.read().await;
487                for (key, pr) in existing.iter() {
488                    raw_ratings.insert(*key, (pr.to_rating(), pr.test_count, pr.last_updated));
489                }
490            }
491
492            update_peer_ratings(&peer_results, &mut raw_ratings);
493
494            {
495                let mut ratings = self.peer_ratings.write().await;
496                for (key, (r, count, updated)) in &raw_ratings {
497                    let entry = ratings.entry(*key).or_insert_with(|| PeerRating::new(*key));
498                    entry.mu = r.mu;
499                    entry.sigma = r.sigma;
500                    entry.test_count = *count;
501                    entry.last_updated = *updated;
502                }
503            }
504
505            info!(tested = peer_results.len(), "Updated peer ratings");
506        }
507
508        Ok(peer_results)
509    }
510
511    /// Run the validator loop continuously.
512    ///
513    /// Runs evaluation cycles at the configured interval, sleeping
514    /// between cycles. If peer testing is enabled and a TorRuntime is
515    /// provided, also tests live peers each cycle.
516    pub async fn run_loop(
517        &self,
518        tor_runtime: Option<&cp_tor::TorRuntime>,
519    ) -> Result<(), ValidatorError> {
520        info!(
521            interval_secs = self.config.evaluation_interval_secs,
522            test_peers = self.config.test_peers,
523            "Starting validator loop"
524        );
525
526        loop {
527            // Contributor evaluation
528            match self.run_evaluation_cycle().await {
529                Ok(results) => {
530                    info!(results = results.len(), "Evaluation cycle completed");
531                }
532                Err(e) => {
533                    error!(error = %e, "Evaluation cycle failed");
534                }
535            }
536
537            // Live peer testing (if enabled and Tor is available)
538            if self.config.test_peers {
539                if let Some(runtime) = tor_runtime {
540                    match self.test_live_peers(runtime).await {
541                        Ok(results) => {
542                            info!(peers_tested = results.len(), "Peer testing completed");
543                        }
544                        Err(e) => {
545                            warn!(error = %e, "Peer testing failed");
546                        }
547                    }
548                } else {
549                    warn!("Peer testing enabled but no TorRuntime provided");
550                }
551            }
552
553            tokio::time::sleep(std::time::Duration::from_secs(
554                self.config.evaluation_interval_secs,
555            ))
556            .await;
557        }
558    }
559}
560
561/// Prioritize Arweave downloads by contributor rating.
562///
563/// Per CP-015 section 9: content from well-rated contributors is
564/// downloaded first. Uses conservative score (mu - 2*sigma) so
565/// uncertain contributors are ranked lower than confidently-good ones.
566pub fn prioritize_downloads(
567    available_tx_ids: &[(String, Option<[u8; 32]>)],
568    contributor_ratings: &HashMap<[u8; 32], ContributorRating>,
569) -> Vec<String> {
570    let mut scored: Vec<(&str, f64)> = available_tx_ids
571        .iter()
572        .map(|(tx_id, contributor_key)| {
573            let score = contributor_key
574                .and_then(|key| contributor_ratings.get(&key))
575                .map(|cr| cr.conservative_score())
576                .unwrap_or(DEFAULT_MU - 2.0 * DEFAULT_SIGMA);
577            (tx_id.as_str(), score)
578        })
579        .collect();
580
581    scored.sort_by(|(_, a), (_, b)| b.partial_cmp(a).unwrap_or(std::cmp::Ordering::Equal));
582
583    scored.into_iter().map(|(tx_id, _)| tx_id.to_string()).collect()
584}
585
#[cfg(test)]
mod tests {
    use super::*;

    /// Tolerance used for every floating-point comparison below.
    const EPS: f64 = 1e-6;

    /// True when `actual` is within `EPS` of `expected`.
    fn close(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < EPS
    }

    /// Contributor rating with the given key byte, mean and uncertainty.
    fn contributor(key_byte: u8, mu: f64, sigma: f64, validation_count: u64) -> ContributorRating {
        ContributorRating {
            contributor_key: [key_byte; 32],
            mu,
            sigma,
            validation_count,
            last_updated: 0,
        }
    }

    #[test]
    fn test_contributor_rating_default() {
        let cr = ContributorRating::new([1u8; 32]);
        assert!(close(cr.mu, 25.0));
        assert!(close(cr.sigma, DEFAULT_SIGMA));
        assert_eq!(cr.validation_count, 0);
        assert_eq!(cr.last_updated, 0);
    }

    #[test]
    fn test_contributor_rating_conservative_score() {
        let cr = contributor(1, 30.0, 5.0, 10);
        assert!(close(cr.conservative_score(), 20.0));
    }

    #[test]
    fn test_contributor_rating_apply() {
        let mut cr = ContributorRating::new([1u8; 32]);
        cr.apply_rating(&Rating::new(28.0, 6.5), 1000);

        assert!(close(cr.mu, 28.0));
        assert!(close(cr.sigma, 6.5));
        assert_eq!(cr.validation_count, 1);
        assert_eq!(cr.last_updated, 1000);
    }

    #[test]
    fn test_peer_rating_default() {
        let pr = PeerRating::new([1u8; 16]);
        assert!(close(pr.mu, 25.0));
        assert!(close(pr.sigma, DEFAULT_SIGMA));
        assert_eq!(pr.test_count, 0);
        assert_eq!(pr.last_updated, 0);
    }

    #[test]
    fn test_peer_rating_conservative_score() {
        let pr = PeerRating {
            peer_node_id: [1u8; 16],
            mu: 30.0,
            sigma: 5.0,
            test_count: 10,
            last_updated: 0,
        };
        assert!(close(pr.conservative_score(), 20.0));
    }

    #[test]
    fn test_peer_rating_apply() {
        let mut pr = PeerRating::new([1u8; 16]);
        pr.apply_rating(&Rating::new(27.0, 7.0), 2000);

        assert!(close(pr.mu, 27.0));
        assert!(close(pr.sigma, 7.0));
        assert_eq!(pr.test_count, 1);
        assert_eq!(pr.last_updated, 2000);
    }

    #[test]
    fn test_validator_config_default() {
        let config = ValidatorConfig::default();
        assert_eq!(config.evaluation_interval_secs, 3600);
        assert!(config.corpus_path.is_none());
        assert_eq!(config.queries_per_window, 20);
        assert_eq!(config.max_contributors_per_window, 50);
        assert_eq!(config.max_peers_per_window, 20);
        assert!(config.test_peers);
    }

    #[test]
    fn test_prioritize_downloads_rated() {
        let mut ratings = HashMap::new();
        ratings.insert([1u8; 32], contributor(1, 35.0, 3.0, 20));
        ratings.insert([2u8; 32], contributor(2, 15.0, 3.0, 20));

        let txs = vec![
            ("tx_bad".to_string(), Some([2u8; 32])),
            ("tx_good".to_string(), Some([1u8; 32])),
            ("tx_unknown".to_string(), None),
        ];

        let prioritized = prioritize_downloads(&txs, &ratings);

        // Good contributor: 35.0 - 6.0 = 29.0
        // Bad contributor:  15.0 - 6.0 = 9.0
        // Unknown contributor gets the default: 25.0 - 16.666 = 8.333
        // So the order is good, bad, unknown.
        assert_eq!(prioritized, vec!["tx_good", "tx_bad", "tx_unknown"]);
    }

    #[test]
    fn test_prioritize_downloads_empty() {
        let ratings = HashMap::new();
        let txs: Vec<(String, Option<[u8; 32]>)> = Vec::new();
        let prioritized = prioritize_downloads(&txs, &ratings);
        assert!(prioritized.is_empty());
    }

    #[test]
    fn test_prioritize_downloads_all_unrated() {
        let ratings = HashMap::new();
        let txs = vec![
            ("tx1".to_string(), Some([1u8; 32])),
            ("tx2".to_string(), Some([2u8; 32])),
        ];

        let prioritized = prioritize_downloads(&txs, &ratings);

        // Every entry gets the same default score, and the sort is stable,
        // so the input order must survive unchanged.
        assert_eq!(prioritized, vec!["tx1", "tx2"]);
    }
}