1pub mod corpus;
28pub mod evaluation;
29pub mod rating;
30pub mod window;
31
32pub use corpus::{TestCorpus, TestQuery};
33pub use evaluation::{
34 composite_score, evaluate_contributor, evaluate_peer, mrr, ndcg_at_k, peer_composite_score,
35 precision_at_k, update_contributor_ratings, update_peer_ratings, EvaluationResult,
36 PeerQualityMetrics, PeerTestResult, QualityMetrics,
37};
38pub use rating::{pairwise_update, Rating, DEFAULT_MU, DEFAULT_SIGMA};
39pub use window::{select_test_queries, ValidationWindow};
40
41use cp_arweave::ArweaveClient;
42use ed25519_dalek::SigningKey;
43use serde::{Deserialize, Serialize};
44use std::collections::HashMap;
45use std::path::PathBuf;
46use std::sync::Arc;
47use tokio::sync::RwLock;
48use tracing::{error, info, warn};
49
/// Errors produced by the validator.
#[derive(Debug, thiserror::Error)]
pub enum ValidatorError {
    /// An Arweave discovery call failed; holds the underlying error's message.
    #[error("Arweave error: {0}")]
    Arweave(String),

    /// No content was found for the given contributor (carried as a string).
    /// NOTE(review): not constructed in this file — presumably raised by evaluation code.
    #[error("No content found for contributor: {0}")]
    NoContent(String),

    /// The test corpus is unavailable (e.g. "Corpus not loaded") or failed to load.
    #[error("Corpus error: {0}")]
    Corpus(String),

    /// Tor support is not available.
    /// NOTE(review): not constructed in this file — presumably raised by peer-testing code.
    #[error("Tor not available")]
    TorNotAvailable,

    /// A Tor operation failed; holds a description of the failure.
    #[error("Tor error: {0}")]
    Tor(String),

    /// Catch-all for unexpected internal failures.
    #[error("Internal error: {0}")]
    Internal(String),
}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct ValidatorConfig {
75 pub evaluation_interval_secs: u64,
78
79 pub corpus_path: Option<PathBuf>,
82
83 pub queries_per_window: usize,
85
86 pub max_contributors_per_window: usize,
88
89 pub max_peers_per_window: usize,
91
92 pub test_peers: bool,
94}
95
96impl Default for ValidatorConfig {
97 fn default() -> Self {
98 Self {
99 evaluation_interval_secs: 3600,
100 corpus_path: None,
101 queries_per_window: 20,
102 max_contributors_per_window: 50,
103 max_peers_per_window: 20,
104 test_peers: true,
105 }
106 }
107}
108
/// Stored skill rating for a single content contributor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContributorRating {
    /// The contributor's 32-byte key (decoded from hex during diff discovery).
    pub contributor_key: [u8; 32],
    /// Mean of the rating.
    pub mu: f64,
    /// Uncertainty of the rating; `conservative_score` subtracts two of these from `mu`.
    pub sigma: f64,
    /// Number of rating updates folded into this entry (starts at 0).
    pub validation_count: u64,
    /// Timestamp of the last update in milliseconds (0 = never updated).
    /// NOTE(review): presumably Unix-epoch milliseconds — confirm with callers.
    pub last_updated: i64,
}
125
126impl ContributorRating {
127 pub fn new(contributor_key: [u8; 32]) -> Self {
129 Self {
130 contributor_key,
131 mu: DEFAULT_MU,
132 sigma: DEFAULT_SIGMA,
133 validation_count: 0,
134 last_updated: 0,
135 }
136 }
137
138 pub fn conservative_score(&self) -> f64 {
140 self.mu - 2.0 * self.sigma
141 }
142
143 pub fn to_rating(&self) -> Rating {
145 Rating::new(self.mu, self.sigma)
146 }
147
148 pub fn apply_rating(&mut self, rating: &Rating, now_ms: i64) {
150 self.mu = rating.mu;
151 self.sigma = rating.sigma;
152 self.validation_count += 1;
153 self.last_updated = now_ms;
154 }
155}
156
/// Stored quality rating for a single live peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerRating {
    /// The peer's 16-byte node id.
    pub peer_node_id: [u8; 16],
    /// Mean of the rating.
    pub mu: f64,
    /// Uncertainty of the rating; `conservative_score` subtracts two of these from `mu`.
    pub sigma: f64,
    /// Number of rating updates folded into this entry (starts at 0).
    pub test_count: u64,
    /// Timestamp of the last update in milliseconds (0 = never updated).
    /// NOTE(review): presumably Unix-epoch milliseconds — confirm with callers.
    pub last_updated: i64,
}
176
177impl PeerRating {
178 pub fn new(peer_node_id: [u8; 16]) -> Self {
180 Self {
181 peer_node_id,
182 mu: DEFAULT_MU,
183 sigma: DEFAULT_SIGMA,
184 test_count: 0,
185 last_updated: 0,
186 }
187 }
188
189 pub fn conservative_score(&self) -> f64 {
191 self.mu - 2.0 * self.sigma
192 }
193
194 pub fn to_rating(&self) -> Rating {
196 Rating::new(self.mu, self.sigma)
197 }
198
199 pub fn apply_rating(&mut self, rating: &Rating, now_ms: i64) {
201 self.mu = rating.mu;
202 self.sigma = rating.sigma;
203 self.test_count += 1;
204 self.last_updated = now_ms;
205 }
206}
207
/// Periodically evaluates contributors' published content and live peers, keeping a
/// rating for each in shared, lock-protected tables.
pub struct Validator {
    /// Scheduling and sizing knobs.
    config: ValidatorConfig,

    /// Arweave client used for corpus download and diff/peer discovery.
    arweave: Arc<ArweaveClient>,

    /// This node's key; passed to the evaluation routines (presumably to sign results).
    signing_key: SigningKey,

    /// First 16 bytes of the BLAKE3 hash of the verifying key (computed in `new`).
    node_id: [u8; 16],

    /// Contributor key -> rating.
    contributor_ratings: Arc<RwLock<HashMap<[u8; 32], ContributorRating>>>,

    /// Peer node id -> rating.
    peer_ratings: Arc<RwLock<HashMap<[u8; 16], PeerRating>>>,

    /// Test corpus; `None` until first loaded.
    corpus: Arc<RwLock<Option<TestCorpus>>>,

    /// Hash of the model this node runs; filters peer discovery and is passed to `evaluate_peer`.
    model_hash: [u8; 32],
}
238
239impl Validator {
240 pub fn new(
242 config: ValidatorConfig,
243 arweave: Arc<ArweaveClient>,
244 signing_key: SigningKey,
245 model_hash: [u8; 32],
246 ) -> Self {
247 let public_key = signing_key.verifying_key().to_bytes();
248 let node_id_hash = blake3::hash(&public_key);
249 let mut node_id = [0u8; 16];
250 node_id.copy_from_slice(&node_id_hash.as_bytes()[..16]);
251
252 Self {
253 config,
254 arweave,
255 signing_key,
256 node_id,
257 contributor_ratings: Arc::new(RwLock::new(HashMap::new())),
258 peer_ratings: Arc::new(RwLock::new(HashMap::new())),
259 corpus: Arc::new(RwLock::new(None)),
260 model_hash,
261 }
262 }
263
264 pub fn contributor_ratings(&self) -> &Arc<RwLock<HashMap<[u8; 32], ContributorRating>>> {
266 &self.contributor_ratings
267 }
268
269 pub fn peer_ratings(&self) -> &Arc<RwLock<HashMap<[u8; 16], PeerRating>>> {
271 &self.peer_ratings
272 }
273
274 pub fn node_id(&self) -> [u8; 16] {
276 self.node_id
277 }
278
279 pub async fn load_corpus(&self) -> Result<(), ValidatorError> {
283 let new_corpus = if let Some(ref path) = self.config.corpus_path {
284 TestCorpus::load_from_file(path)?
285 } else {
286 TestCorpus::load_from_arweave(&self.arweave).await?
287 };
288
289 info!(queries = new_corpus.len(), "Loaded test corpus");
290 *self.corpus.write().await = Some(new_corpus);
291 Ok(())
292 }
293
294 pub async fn run_evaluation_cycle(&self) -> Result<Vec<EvaluationResult>, ValidatorError> {
305 {
307 let corpus_guard = self.corpus.read().await;
308 if corpus_guard.is_none() {
309 drop(corpus_guard);
310 self.load_corpus().await?;
311 }
312 }
313
314 let corpus = self.corpus.read().await;
315 let corpus = corpus.as_ref().ok_or_else(|| {
316 ValidatorError::Corpus("Corpus not loaded".to_string())
317 })?;
318
319 let window = ValidationWindow::current();
321 let test_queries = window.select_test_queries(corpus, self.config.queries_per_window);
322
323 if test_queries.is_empty() {
324 warn!("No test queries selected for window {}", window.window_id);
325 return Ok(Vec::new());
326 }
327
328 info!(
329 window_id = window.window_id,
330 queries = test_queries.len(),
331 "Starting evaluation cycle"
332 );
333
334 let (discovered, _cursor) = cp_arweave::discover_diffs(&self.arweave, None, 100, None)
336 .await
337 .map_err(|e| ValidatorError::Arweave(e.to_string()))?;
338
339 let mut contributor_keys: Vec<[u8; 32]> = Vec::new();
341 for diff in &discovered {
342 if let Ok(key_bytes) = hex::decode(&diff.contributor_key) {
343 if key_bytes.len() == 32 {
344 let mut key = [0u8; 32];
345 key.copy_from_slice(&key_bytes);
346 if !contributor_keys.contains(&key) {
347 contributor_keys.push(key);
348 }
349 }
350 }
351 }
352
353 contributor_keys.truncate(self.config.max_contributors_per_window);
354
355 info!(
356 contributors = contributor_keys.len(),
357 "Discovered contributors to evaluate"
358 );
359
360 let mut eval_results = Vec::new();
362 for contributor_key in &contributor_keys {
363 match evaluate_contributor(
364 *contributor_key,
365 &self.arweave,
366 &test_queries,
367 self.node_id,
368 &self.signing_key,
369 )
370 .await
371 {
372 Ok(result) => {
373 eval_results.push(result);
374 }
375 Err(e) => {
376 warn!(
377 contributor = hex::encode(contributor_key),
378 error = %e,
379 "Failed to evaluate contributor"
380 );
381 }
382 }
383 }
384
385 if eval_results.len() >= 2 {
387 let mut raw_ratings: HashMap<[u8; 32], (Rating, u64, i64)> = HashMap::new();
388
389 {
390 let existing = self.contributor_ratings.read().await;
391 for (key, cr) in existing.iter() {
392 raw_ratings.insert(*key, (cr.to_rating(), cr.validation_count, cr.last_updated));
393 }
394 }
395
396 update_contributor_ratings(&eval_results, &mut raw_ratings);
397
398 {
399 let mut ratings = self.contributor_ratings.write().await;
400 for (key, (r, count, updated)) in &raw_ratings {
401 let entry = ratings.entry(*key).or_insert_with(|| ContributorRating::new(*key));
402 entry.mu = r.mu;
403 entry.sigma = r.sigma;
404 entry.validation_count = *count;
405 entry.last_updated = *updated;
406 }
407 }
408
409 info!(
410 evaluated = eval_results.len(),
411 "Updated contributor ratings"
412 );
413 }
414
415 Ok(eval_results)
416 }
417
418 pub async fn test_live_peers(
423 &self,
424 tor_runtime: &cp_tor::TorRuntime,
425 ) -> Result<Vec<PeerTestResult>, ValidatorError> {
426 let corpus = self.corpus.read().await;
428 let corpus = corpus.as_ref().ok_or_else(|| {
429 ValidatorError::Corpus("Corpus not loaded".to_string())
430 })?;
431
432 let window = ValidationWindow::current();
433 let test_queries = window.select_test_queries(corpus, self.config.queries_per_window);
434
435 let model_hash_hex = hex::encode(self.model_hash);
437 let (peers, _cursor) =
438 cp_arweave::discover_peers(&self.arweave, Some(&model_hash_hex), 50, None)
439 .await
440 .map_err(|e| ValidatorError::Arweave(e.to_string()))?;
441
442 let peers_to_test: Vec<_> = peers
443 .into_iter()
444 .take(self.config.max_peers_per_window)
445 .collect();
446
447 info!(peers = peers_to_test.len(), "Testing live peers");
448
449 let mut peer_results = Vec::new();
450 for peer_meta in &peers_to_test {
451 match cp_arweave::download_peer_registration(&self.arweave, peer_meta).await {
452 Ok(registration) => {
453 match evaluate_peer(
454 ®istration,
455 &test_queries,
456 self.node_id,
457 &self.signing_key,
458 self.model_hash,
459 tor_runtime,
460 )
461 .await
462 {
463 Ok(result) => {
464 peer_results.push(result);
465 }
466 Err(e) => {
467 warn!(
468 peer = hex::encode(registration.node_id),
469 error = %e,
470 "Failed to test peer"
471 );
472 }
473 }
474 }
475 Err(e) => {
476 warn!(tx_id = &peer_meta.tx_id, error = %e, "Failed to download peer registration");
477 }
478 }
479 }
480
481 if peer_results.len() >= 2 {
483 let mut raw_ratings: HashMap<[u8; 16], (Rating, u64, i64)> = HashMap::new();
484
485 {
486 let existing = self.peer_ratings.read().await;
487 for (key, pr) in existing.iter() {
488 raw_ratings.insert(*key, (pr.to_rating(), pr.test_count, pr.last_updated));
489 }
490 }
491
492 update_peer_ratings(&peer_results, &mut raw_ratings);
493
494 {
495 let mut ratings = self.peer_ratings.write().await;
496 for (key, (r, count, updated)) in &raw_ratings {
497 let entry = ratings.entry(*key).or_insert_with(|| PeerRating::new(*key));
498 entry.mu = r.mu;
499 entry.sigma = r.sigma;
500 entry.test_count = *count;
501 entry.last_updated = *updated;
502 }
503 }
504
505 info!(tested = peer_results.len(), "Updated peer ratings");
506 }
507
508 Ok(peer_results)
509 }
510
511 pub async fn run_loop(
517 &self,
518 tor_runtime: Option<&cp_tor::TorRuntime>,
519 ) -> Result<(), ValidatorError> {
520 info!(
521 interval_secs = self.config.evaluation_interval_secs,
522 test_peers = self.config.test_peers,
523 "Starting validator loop"
524 );
525
526 loop {
527 match self.run_evaluation_cycle().await {
529 Ok(results) => {
530 info!(results = results.len(), "Evaluation cycle completed");
531 }
532 Err(e) => {
533 error!(error = %e, "Evaluation cycle failed");
534 }
535 }
536
537 if self.config.test_peers {
539 if let Some(runtime) = tor_runtime {
540 match self.test_live_peers(runtime).await {
541 Ok(results) => {
542 info!(peers_tested = results.len(), "Peer testing completed");
543 }
544 Err(e) => {
545 warn!(error = %e, "Peer testing failed");
546 }
547 }
548 } else {
549 warn!("Peer testing enabled but no TorRuntime provided");
550 }
551 }
552
553 tokio::time::sleep(std::time::Duration::from_secs(
554 self.config.evaluation_interval_secs,
555 ))
556 .await;
557 }
558 }
559}
560
561pub fn prioritize_downloads(
567 available_tx_ids: &[(String, Option<[u8; 32]>)],
568 contributor_ratings: &HashMap<[u8; 32], ContributorRating>,
569) -> Vec<String> {
570 let mut scored: Vec<(&str, f64)> = available_tx_ids
571 .iter()
572 .map(|(tx_id, contributor_key)| {
573 let score = contributor_key
574 .and_then(|key| contributor_ratings.get(&key))
575 .map(|cr| cr.conservative_score())
576 .unwrap_or(DEFAULT_MU - 2.0 * DEFAULT_SIGMA);
577 (tx_id.as_str(), score)
578 })
579 .collect();
580
581 scored.sort_by(|(_, a), (_, b)| b.partial_cmp(a).unwrap_or(std::cmp::Ordering::Equal));
582
583 scored.into_iter().map(|(tx_id, _)| tx_id.to_string()).collect()
584}
585
#[cfg(test)]
mod tests {
    use super::*;

    /// Absolute tolerance for the floating-point assertions in this module.
    const TOLERANCE: f64 = 1e-6;

    /// True when `actual` lies strictly within `TOLERANCE` of `expected`.
    fn close(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < TOLERANCE
    }

    /// Builds a contributor rating with the given parameters and no timestamp.
    fn contributor(key: [u8; 32], mu: f64, sigma: f64, validation_count: u64) -> ContributorRating {
        ContributorRating {
            contributor_key: key,
            mu,
            sigma,
            validation_count,
            last_updated: 0,
        }
    }

    #[test]
    fn test_contributor_rating_default() {
        let fresh = ContributorRating::new([1u8; 32]);
        assert!(close(fresh.mu, 25.0));
        assert!(close(fresh.sigma, DEFAULT_SIGMA));
        assert_eq!(fresh.validation_count, 0);
    }

    #[test]
    fn test_contributor_rating_conservative_score() {
        // 30 - 2 * 5 = 20
        let seasoned = contributor([1u8; 32], 30.0, 5.0, 10);
        assert!(close(seasoned.conservative_score(), 20.0));
    }

    #[test]
    fn test_contributor_rating_apply() {
        let mut updated = ContributorRating::new([1u8; 32]);
        updated.apply_rating(&Rating::new(28.0, 6.5), 1000);

        assert!(close(updated.mu, 28.0));
        assert!(close(updated.sigma, 6.5));
        assert_eq!(updated.validation_count, 1);
        assert_eq!(updated.last_updated, 1000);
    }

    #[test]
    fn test_peer_rating_default() {
        let fresh = PeerRating::new([1u8; 16]);
        assert!(close(fresh.mu, 25.0));
        assert!(close(fresh.sigma, DEFAULT_SIGMA));
        assert_eq!(fresh.test_count, 0);
    }

    #[test]
    fn test_peer_rating_apply() {
        let mut updated = PeerRating::new([1u8; 16]);
        updated.apply_rating(&Rating::new(27.0, 7.0), 2000);

        assert!(close(updated.mu, 27.0));
        assert!(close(updated.sigma, 7.0));
        assert_eq!(updated.test_count, 1);
        assert_eq!(updated.last_updated, 2000);
    }

    #[test]
    fn test_validator_config_default() {
        let ValidatorConfig {
            evaluation_interval_secs,
            queries_per_window,
            max_contributors_per_window,
            max_peers_per_window,
            test_peers,
            ..
        } = ValidatorConfig::default();

        assert_eq!(evaluation_interval_secs, 3600);
        assert_eq!(queries_per_window, 20);
        assert_eq!(max_contributors_per_window, 50);
        assert_eq!(max_peers_per_window, 20);
        assert!(test_peers);
    }

    #[test]
    fn test_prioritize_downloads_rated() {
        let ratings: HashMap<[u8; 32], ContributorRating> = vec![
            contributor([1u8; 32], 35.0, 3.0, 20),
            contributor([2u8; 32], 15.0, 3.0, 20),
        ]
        .into_iter()
        .map(|cr| (cr.contributor_key, cr))
        .collect();

        let txs = vec![
            ("tx_bad".to_string(), Some([2u8; 32])),
            ("tx_good".to_string(), Some([1u8; 32])),
            ("tx_unknown".to_string(), None),
        ];

        let order = prioritize_downloads(&txs, &ratings);

        // good: 35 - 2*3 = 29, bad: 15 - 2*3 = 9, unknown: the default prior's score.
        assert_eq!(order, vec!["tx_good", "tx_bad", "tx_unknown"]);
    }

    #[test]
    fn test_prioritize_downloads_empty() {
        let no_txs: Vec<(String, Option<[u8; 32]>)> = Vec::new();
        assert!(prioritize_downloads(&no_txs, &HashMap::new()).is_empty());
    }

    #[test]
    fn test_prioritize_downloads_all_unrated() {
        let txs = vec![
            ("tx1".to_string(), Some([1u8; 32])),
            ("tx2".to_string(), Some([2u8; 32])),
        ];

        let order = prioritize_downloads(&txs, &HashMap::new());
        assert_eq!(order.len(), 2);
    }
}