Skip to main content

cp_validator/
lib.rs

1//! Validator protocol for empirical quality verification (CP-015).
2//!
3//! Any Canon node can opt into validation. Validators download content from
4//! Arweave, index it locally, run test queries against it, measure retrieval
5//! quality, and publish `OpenSkill` Bayesian ratings for content contributors.
6//! These ratings help other nodes prioritize what to download from Arweave.
7//!
8//! Validators also test live search peers by connecting over Tor, sending
9//! test queries, and rating peer quality for circuit selection.
10//!
11//! The approach adapts Templar/Covenant (tplr.ai), which uses empirical
12//! quality verification for decentralized pre-training. `OpenSkill` Bayesian
13//! ratings provide a robust, game-resistant ranking that converges even
14//! with sparse observations. Window-based coordination ensures validators
15//! test the same content without requiring blockchain synchronization.
16//!
17//! Data flow:
18//!   1. Load test query corpus (from Arweave or local file)
19//!   2. Compute current validation window
20//!   3. Select test queries deterministically from corpus
21//!   4. For each contributor: download from Arweave, index, run queries, measure metrics
22//!   5. Rank contributors by composite score, update `OpenSkill` ratings pairwise
23//!   6. For each peer: connect over Tor, send queries, measure quality + latency
24//!   7. Rank peers by composite score, update `OpenSkill` ratings pairwise
25//!   8. Publish results to Arweave
26
27pub mod corpus;
28pub mod evaluation;
29pub mod rating;
30pub mod window;
31
32pub use corpus::{TestCorpus, TestQuery};
33pub use evaluation::{
34    composite_score, evaluate_contributor, evaluate_local_graph, evaluate_peer, mrr, ndcg_at_k,
35    peer_composite_score, precision_at_k, update_contributor_ratings, update_peer_ratings,
36    EvaluationResult, PeerQualityMetrics, PeerTestResult, QualityMetrics,
37};
38pub use rating::{pairwise_update, Rating, DEFAULT_MU, DEFAULT_SIGMA};
39pub use window::{select_test_queries, ValidationWindow};
40
41use cp_arweave::ArweaveClient;
42use ed25519_dalek::SigningKey;
43use serde::{Deserialize, Serialize};
44use std::collections::HashMap;
45use std::path::PathBuf;
46use std::sync::Arc;
47use tokio::sync::RwLock;
48use tracing::{error, info, warn};
49
50/// Errors specific to the validator protocol.
51#[derive(Debug, thiserror::Error)]
52pub enum ValidatorError {
53    #[error("Arweave error: {0}")]
54    Arweave(String),
55
56    #[error("No content found for contributor: {0}")]
57    NoContent(String),
58
59    #[error("Corpus error: {0}")]
60    Corpus(String),
61
62    #[error("Tor not available")]
63    TorNotAvailable,
64
65    #[error("Tor error: {0}")]
66    Tor(String),
67
68    #[error("Internal error: {0}")]
69    Internal(String),
70}
71
72/// Configuration for the validator.
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct ValidatorConfig {
75    /// How often to run the evaluation loop (seconds).
76    /// Defaults to 3600 (once per validation window).
77    pub evaluation_interval_secs: u64,
78
79    /// Path to a local test corpus file. If set, this is used instead of
80    /// downloading from Arweave. Useful for bootstrapping and testing.
81    pub corpus_path: Option<PathBuf>,
82
83    /// Number of test queries to select per evaluation window.
84    pub queries_per_window: usize,
85
86    /// Maximum number of contributors to evaluate per window.
87    pub max_contributors_per_window: usize,
88
89    /// Maximum number of peers to test per window.
90    pub max_peers_per_window: usize,
91
92    /// Whether to test live peers over Tor.
93    pub test_peers: bool,
94}
95
96impl Default for ValidatorConfig {
97    fn default() -> Self {
98        Self {
99            evaluation_interval_secs: 3600,
100            corpus_path: None,
101            queries_per_window: 20,
102            max_contributors_per_window: 50,
103            max_peers_per_window: 20,
104            test_peers: true,
105        }
106    }
107}
108
109/// `OpenSkill` Bayesian rating for a content contributor.
110///
111/// Per CP-015 section 2.
112#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct ContributorRating {
114    /// Ed25519 public key of the contributor.
115    pub contributor_key: [u8; 32],
116    /// `OpenSkill` mean skill estimate.
117    pub mu: f64,
118    /// `OpenSkill` uncertainty.
119    pub sigma: f64,
120    /// Number of times this contributor has been validated.
121    pub validation_count: u64,
122    /// Last updated timestamp (Unix milliseconds).
123    pub last_updated: i64,
124}
125
126impl ContributorRating {
127    /// Create a new contributor rating with default `OpenSkill` values.
128    pub fn new(contributor_key: [u8; 32]) -> Self {
129        Self {
130            contributor_key,
131            mu: DEFAULT_MU,
132            sigma: DEFAULT_SIGMA,
133            validation_count: 0,
134            last_updated: 0,
135        }
136    }
137
138    /// Conservative score: mu - 2*sigma.
139    pub fn conservative_score(&self) -> f64 {
140        self.mu - 2.0 * self.sigma
141    }
142
143    /// Convert to a Rating struct for the `OpenSkill` update functions.
144    pub fn to_rating(&self) -> Rating {
145        Rating::new(self.mu, self.sigma)
146    }
147
148    /// Update from a Rating struct after an `OpenSkill` update.
149    pub fn apply_rating(&mut self, rating: &Rating, now_ms: i64) {
150        self.mu = rating.mu;
151        self.sigma = rating.sigma;
152        self.validation_count += 1;
153        self.last_updated = now_ms;
154    }
155}
156
157/// `OpenSkill` Bayesian rating for a live search peer.
158///
159/// Per CP-015 section 14. Same `OpenSkill` model as `ContributorRating` but
160/// tracked independently. A contributor's content quality (measured by
161/// downloading and indexing) is distinct from their peer quality
162/// (measured by connecting and querying live).
163#[derive(Debug, Clone, Serialize, Deserialize)]
164pub struct PeerRating {
165    /// Node ID of the peer.
166    pub peer_node_id: [u8; 16],
167    /// `OpenSkill` mean skill estimate.
168    pub mu: f64,
169    /// `OpenSkill` uncertainty.
170    pub sigma: f64,
171    /// Number of peer tests.
172    pub test_count: u64,
173    /// Last updated timestamp (Unix milliseconds).
174    pub last_updated: i64,
175}
176
177impl PeerRating {
178    /// Create a new peer rating with default `OpenSkill` values.
179    pub fn new(peer_node_id: [u8; 16]) -> Self {
180        Self {
181            peer_node_id,
182            mu: DEFAULT_MU,
183            sigma: DEFAULT_SIGMA,
184            test_count: 0,
185            last_updated: 0,
186        }
187    }
188
189    /// Conservative score: mu - 2*sigma.
190    pub fn conservative_score(&self) -> f64 {
191        self.mu - 2.0 * self.sigma
192    }
193
194    /// Convert to a Rating struct.
195    pub fn to_rating(&self) -> Rating {
196        Rating::new(self.mu, self.sigma)
197    }
198
199    /// Update from a Rating struct.
200    pub fn apply_rating(&mut self, rating: &Rating, now_ms: i64) {
201        self.mu = rating.mu;
202        self.sigma = rating.sigma;
203        self.test_count += 1;
204        self.last_updated = now_ms;
205    }
206}
207
208/// The main validator that runs evaluation loops.
209///
210/// Orchestrates the full validation pipeline: loading the test corpus,
211/// selecting queries, evaluating contributors, updating ratings.
212/// Peer testing requires a `TorRuntime` to be passed in.
213pub struct Validator {
214    /// Validator configuration.
215    config: ValidatorConfig,
216
217    /// Arweave client for content download.
218    arweave: Arc<ArweaveClient>,
219
220    /// Ed25519 signing key for this validator node.
221    signing_key: SigningKey,
222
223    /// Node ID derived from the signing key.
224    node_id: [u8; 16],
225
226    /// Contributor ratings indexed by public key.
227    contributor_ratings: Arc<RwLock<HashMap<[u8; 32], ContributorRating>>>,
228
229    /// Peer ratings indexed by node ID.
230    peer_ratings: Arc<RwLock<HashMap<[u8; 16], PeerRating>>>,
231
232    /// The test query corpus.
233    corpus: Arc<RwLock<Option<TestCorpus>>>,
234
235    /// BLAKE3 hash of the embedding model, for compatibility checking.
236    model_hash: [u8; 32],
237}
238
239impl Validator {
240    /// Create a new validator.
241    pub fn new(
242        config: ValidatorConfig,
243        arweave: Arc<ArweaveClient>,
244        signing_key: SigningKey,
245        model_hash: [u8; 32],
246    ) -> Self {
247        let public_key = signing_key.verifying_key().to_bytes();
248        let node_id_hash = blake3::hash(&public_key);
249        let mut node_id = [0u8; 16];
250        node_id.copy_from_slice(&node_id_hash.as_bytes()[..16]);
251
252        Self {
253            config,
254            arweave,
255            signing_key,
256            node_id,
257            contributor_ratings: Arc::new(RwLock::new(HashMap::new())),
258            peer_ratings: Arc::new(RwLock::new(HashMap::new())),
259            corpus: Arc::new(RwLock::new(None)),
260            model_hash,
261        }
262    }
263
264    /// Get a read reference to the contributor ratings.
265    pub fn contributor_ratings(&self) -> &Arc<RwLock<HashMap<[u8; 32], ContributorRating>>> {
266        &self.contributor_ratings
267    }
268
269    /// Get a read reference to the peer ratings.
270    pub fn peer_ratings(&self) -> &Arc<RwLock<HashMap<[u8; 16], PeerRating>>> {
271        &self.peer_ratings
272    }
273
274    /// Get this validator's node ID.
275    pub fn node_id(&self) -> [u8; 16] {
276        self.node_id
277    }
278
279    /// Load or refresh the test corpus.
280    ///
281    /// Uses the local file if configured, otherwise downloads from Arweave.
282    pub async fn load_corpus(&self) -> Result<(), ValidatorError> {
283        let new_corpus = if let Some(ref path) = self.config.corpus_path {
284            TestCorpus::load_from_file(path)?
285        } else {
286            TestCorpus::load_from_arweave(&self.arweave).await?
287        };
288
289        info!(queries = new_corpus.len(), "Loaded test corpus");
290        *self.corpus.write().await = Some(new_corpus);
291        Ok(())
292    }
293
294    /// Run a single evaluation cycle for the current validation window.
295    ///
296    /// This is the core validation loop:
297    /// 1. Ensure corpus is loaded
298    /// 2. Select test queries for the current window
299    /// 3. Discover contributors on Arweave
300    /// 4. Evaluate each contributor
301    /// 5. Update contributor ratings
302    ///
303    /// Returns the evaluation results for optional publication to Arweave.
304    pub async fn run_evaluation_cycle(&self) -> Result<Vec<EvaluationResult>, ValidatorError> {
305        // 1. Ensure corpus is loaded
306        {
307            let corpus_guard = self.corpus.read().await;
308            if corpus_guard.is_none() {
309                drop(corpus_guard);
310                self.load_corpus().await?;
311            }
312        }
313
314        let corpus = self.corpus.read().await;
315        let corpus = corpus
316            .as_ref()
317            .ok_or_else(|| ValidatorError::Corpus("Corpus not loaded".to_string()))?;
318
319        // 2. Select test queries for current window
320        let window = ValidationWindow::current();
321        let test_queries = window.select_test_queries(corpus, self.config.queries_per_window);
322
323        if test_queries.is_empty() {
324            warn!("No test queries selected for window {}", window.window_id);
325            return Ok(Vec::new());
326        }
327
328        info!(
329            window_id = window.window_id,
330            queries = test_queries.len(),
331            "Starting evaluation cycle"
332        );
333
334        // 3. Discover contributors on Arweave
335        let (discovered, _cursor) = cp_arweave::discover_diffs(&self.arweave, None, 100, None)
336            .await
337            .map_err(|e| ValidatorError::Arweave(e.to_string()))?;
338
339        // Extract unique contributor keys
340        let mut contributor_keys: Vec<[u8; 32]> = Vec::new();
341        for diff in &discovered {
342            if let Ok(key_bytes) = hex::decode(&diff.contributor_key) {
343                if key_bytes.len() == 32 {
344                    let mut key = [0u8; 32];
345                    key.copy_from_slice(&key_bytes);
346                    if !contributor_keys.contains(&key) {
347                        contributor_keys.push(key);
348                    }
349                }
350            }
351        }
352
353        contributor_keys.truncate(self.config.max_contributors_per_window);
354
355        info!(
356            contributors = contributor_keys.len(),
357            "Discovered contributors to evaluate"
358        );
359
360        // 4. Evaluate each contributor
361        let mut eval_results = Vec::new();
362        for contributor_key in &contributor_keys {
363            match evaluate_contributor(
364                *contributor_key,
365                &self.arweave,
366                &test_queries,
367                self.node_id,
368                &self.signing_key,
369            )
370            .await
371            {
372                Ok(result) => {
373                    eval_results.push(result);
374                }
375                Err(e) => {
376                    warn!(
377                        contributor = hex::encode(contributor_key),
378                        error = %e,
379                        "Failed to evaluate contributor"
380                    );
381                }
382            }
383        }
384
385        // 5. Update contributor ratings
386        if eval_results.len() >= 2 {
387            let mut raw_ratings: HashMap<[u8; 32], (Rating, u64, i64)> = HashMap::new();
388
389            {
390                let existing = self.contributor_ratings.read().await;
391                for (key, cr) in existing.iter() {
392                    raw_ratings
393                        .insert(*key, (cr.to_rating(), cr.validation_count, cr.last_updated));
394                }
395            }
396
397            update_contributor_ratings(&eval_results, &mut raw_ratings);
398
399            {
400                let mut ratings = self.contributor_ratings.write().await;
401                for (key, (r, count, updated)) in &raw_ratings {
402                    let entry = ratings
403                        .entry(*key)
404                        .or_insert_with(|| ContributorRating::new(*key));
405                    entry.mu = r.mu;
406                    entry.sigma = r.sigma;
407                    entry.validation_count = *count;
408                    entry.last_updated = *updated;
409                }
410            }
411
412            info!(
413                evaluated = eval_results.len(),
414                "Updated contributor ratings"
415            );
416        }
417
418        Ok(eval_results)
419    }
420
421    /// Test live peers by connecting over Tor and running test queries.
422    ///
423    /// Requires a running `TorRuntime`. Returns the peer test results
424    /// for optional publication to Arweave.
425    pub async fn test_live_peers(
426        &self,
427        tor_runtime: &cp_tor::TorRuntime,
428    ) -> Result<Vec<PeerTestResult>, ValidatorError> {
429        // Ensure corpus is loaded
430        let corpus = self.corpus.read().await;
431        let corpus = corpus
432            .as_ref()
433            .ok_or_else(|| ValidatorError::Corpus("Corpus not loaded".to_string()))?;
434
435        let window = ValidationWindow::current();
436        let test_queries = window.select_test_queries(corpus, self.config.queries_per_window);
437
438        // Discover peers on Arweave
439        let model_hash_hex = hex::encode(self.model_hash);
440        let (peers, _cursor) =
441            cp_arweave::discover_peers(&self.arweave, Some(&model_hash_hex), 50, None)
442                .await
443                .map_err(|e| ValidatorError::Arweave(e.to_string()))?;
444
445        let peers_to_test: Vec<_> = peers
446            .into_iter()
447            .take(self.config.max_peers_per_window)
448            .collect();
449
450        info!(peers = peers_to_test.len(), "Testing live peers");
451
452        let mut peer_results = Vec::new();
453        for peer_meta in &peers_to_test {
454            match cp_arweave::download_peer_registration(&self.arweave, peer_meta).await {
455                Ok(registration) => {
456                    match evaluate_peer(
457                        &registration,
458                        &test_queries,
459                        self.node_id,
460                        &self.signing_key,
461                        self.model_hash,
462                        tor_runtime,
463                    )
464                    .await
465                    {
466                        Ok(result) => {
467                            peer_results.push(result);
468                        }
469                        Err(e) => {
470                            warn!(
471                                peer = hex::encode(registration.node_id),
472                                error = %e,
473                                "Failed to test peer"
474                            );
475                        }
476                    }
477                }
478                Err(e) => {
479                    warn!(tx_id = &peer_meta.tx_id, error = %e, "Failed to download peer registration");
480                }
481            }
482        }
483
484        // Update peer ratings
485        if peer_results.len() >= 2 {
486            let mut raw_ratings: HashMap<[u8; 16], (Rating, u64, i64)> = HashMap::new();
487
488            {
489                let existing = self.peer_ratings.read().await;
490                for (key, pr) in existing.iter() {
491                    raw_ratings.insert(*key, (pr.to_rating(), pr.test_count, pr.last_updated));
492                }
493            }
494
495            update_peer_ratings(&peer_results, &mut raw_ratings);
496
497            {
498                let mut ratings = self.peer_ratings.write().await;
499                for (key, (r, count, updated)) in &raw_ratings {
500                    let entry = ratings.entry(*key).or_insert_with(|| PeerRating::new(*key));
501                    entry.mu = r.mu;
502                    entry.sigma = r.sigma;
503                    entry.test_count = *count;
504                    entry.last_updated = *updated;
505                }
506            }
507
508            info!(tested = peer_results.len(), "Updated peer ratings");
509        }
510
511        Ok(peer_results)
512    }
513
514    /// Run the validator loop continuously.
515    ///
516    /// Runs evaluation cycles at the configured interval, sleeping
517    /// between cycles. If peer testing is enabled and a `TorRuntime` is
518    /// provided, also tests live peers each cycle.
519    pub async fn run_loop(
520        &self,
521        tor_runtime: Option<&cp_tor::TorRuntime>,
522    ) -> Result<(), ValidatorError> {
523        info!(
524            interval_secs = self.config.evaluation_interval_secs,
525            test_peers = self.config.test_peers,
526            "Starting validator loop"
527        );
528
529        loop {
530            // Contributor evaluation
531            match self.run_evaluation_cycle().await {
532                Ok(results) => {
533                    info!(results = results.len(), "Evaluation cycle completed");
534                }
535                Err(e) => {
536                    error!(error = %e, "Evaluation cycle failed");
537                }
538            }
539
540            // Live peer testing (if enabled and Tor is available)
541            if self.config.test_peers {
542                if let Some(runtime) = tor_runtime {
543                    match self.test_live_peers(runtime).await {
544                        Ok(results) => {
545                            info!(peers_tested = results.len(), "Peer testing completed");
546                        }
547                        Err(e) => {
548                            warn!(error = %e, "Peer testing failed");
549                        }
550                    }
551                } else {
552                    warn!("Peer testing enabled but no TorRuntime provided");
553                }
554            }
555
556            tokio::time::sleep(std::time::Duration::from_secs(
557                self.config.evaluation_interval_secs,
558            ))
559            .await;
560        }
561    }
562}
563
564/// Prioritize Arweave downloads by contributor rating.
565///
566/// Per CP-015 section 9: content from well-rated contributors is
567/// downloaded first. Uses conservative score (mu - 2*sigma) so
568/// uncertain contributors are ranked lower than confidently-good ones.
569pub fn prioritize_downloads(
570    available_tx_ids: &[(String, Option<[u8; 32]>)],
571    contributor_ratings: &HashMap<[u8; 32], ContributorRating>,
572) -> Vec<String> {
573    let mut scored: Vec<(&str, f64)> = available_tx_ids
574        .iter()
575        .map(|(tx_id, contributor_key)| {
576            let score = contributor_key
577                .and_then(|key| contributor_ratings.get(&key))
578                .map_or(
579                    DEFAULT_MU - 2.0 * DEFAULT_SIGMA,
580                    ContributorRating::conservative_score,
581                );
582            (tx_id.as_str(), score)
583        })
584        .collect();
585
586    scored.sort_by(|(_, a), (_, b)| b.partial_cmp(a).unwrap_or(std::cmp::Ordering::Equal));
587
588    scored
589        .into_iter()
590        .map(|(tx_id, _)| tx_id.to_string())
591        .collect()
592}
593
594#[cfg(test)]
595mod tests {
596    use super::*;
597
598    #[test]
599    fn test_contributor_rating_default() {
600        let cr = ContributorRating::new([1u8; 32]);
601        assert!((cr.mu - 25.0).abs() < 1e-6);
602        assert!((cr.sigma - DEFAULT_SIGMA).abs() < 1e-6);
603        assert_eq!(cr.validation_count, 0);
604    }
605
606    #[test]
607    fn test_contributor_rating_conservative_score() {
608        let cr = ContributorRating {
609            contributor_key: [1u8; 32],
610            mu: 30.0,
611            sigma: 5.0,
612            validation_count: 10,
613            last_updated: 0,
614        };
615        assert!((cr.conservative_score() - 20.0).abs() < 1e-6);
616    }
617
618    #[test]
619    fn test_contributor_rating_apply() {
620        let mut cr = ContributorRating::new([1u8; 32]);
621        let new_rating = Rating::new(28.0, 6.5);
622        cr.apply_rating(&new_rating, 1000);
623
624        assert!((cr.mu - 28.0).abs() < 1e-6);
625        assert!((cr.sigma - 6.5).abs() < 1e-6);
626        assert_eq!(cr.validation_count, 1);
627        assert_eq!(cr.last_updated, 1000);
628    }
629
630    #[test]
631    fn test_peer_rating_default() {
632        let pr = PeerRating::new([1u8; 16]);
633        assert!((pr.mu - 25.0).abs() < 1e-6);
634        assert!((pr.sigma - DEFAULT_SIGMA).abs() < 1e-6);
635        assert_eq!(pr.test_count, 0);
636    }
637
638    #[test]
639    fn test_peer_rating_apply() {
640        let mut pr = PeerRating::new([1u8; 16]);
641        let new_rating = Rating::new(27.0, 7.0);
642        pr.apply_rating(&new_rating, 2000);
643
644        assert!((pr.mu - 27.0).abs() < 1e-6);
645        assert!((pr.sigma - 7.0).abs() < 1e-6);
646        assert_eq!(pr.test_count, 1);
647        assert_eq!(pr.last_updated, 2000);
648    }
649
650    #[test]
651    fn test_validator_config_default() {
652        let config = ValidatorConfig::default();
653        assert_eq!(config.evaluation_interval_secs, 3600);
654        assert_eq!(config.queries_per_window, 20);
655        assert_eq!(config.max_contributors_per_window, 50);
656        assert_eq!(config.max_peers_per_window, 20);
657        assert!(config.test_peers);
658    }
659
660    #[test]
661    fn test_prioritize_downloads_rated() {
662        let mut ratings = HashMap::new();
663
664        ratings.insert(
665            [1u8; 32],
666            ContributorRating {
667                contributor_key: [1u8; 32],
668                mu: 35.0,
669                sigma: 3.0,
670                validation_count: 20,
671                last_updated: 0,
672            },
673        );
674
675        ratings.insert(
676            [2u8; 32],
677            ContributorRating {
678                contributor_key: [2u8; 32],
679                mu: 15.0,
680                sigma: 3.0,
681                validation_count: 20,
682                last_updated: 0,
683            },
684        );
685
686        let txs = vec![
687            ("tx_bad".to_string(), Some([2u8; 32])),
688            ("tx_good".to_string(), Some([1u8; 32])),
689            ("tx_unknown".to_string(), None),
690        ];
691
692        let prioritized = prioritize_downloads(&txs, &ratings);
693
694        assert_eq!(prioritized[0], "tx_good");
695        // Bad contributor has conservative_score = 15.0 - 6.0 = 9.0
696        // Unknown contributor has default = 25.0 - 16.666 = 8.333
697        // So bad comes before unknown
698        assert_eq!(prioritized[1], "tx_bad");
699        assert_eq!(prioritized[2], "tx_unknown");
700    }
701
702    #[test]
703    fn test_prioritize_downloads_empty() {
704        let ratings = HashMap::new();
705        let txs: Vec<(String, Option<[u8; 32]>)> = Vec::new();
706        let prioritized = prioritize_downloads(&txs, &ratings);
707        assert!(prioritized.is_empty());
708    }
709
710    #[test]
711    fn test_prioritize_downloads_all_unrated() {
712        let ratings = HashMap::new();
713        let txs = vec![
714            ("tx1".to_string(), Some([1u8; 32])),
715            ("tx2".to_string(), Some([2u8; 32])),
716        ];
717
718        let prioritized = prioritize_downloads(&txs, &ratings);
719        assert_eq!(prioritized.len(), 2);
720    }
721}