1pub mod corpus;
28pub mod evaluation;
29pub mod rating;
30pub mod window;
31
32pub use corpus::{TestCorpus, TestQuery};
33pub use evaluation::{
34 composite_score, evaluate_contributor, evaluate_local_graph, evaluate_peer, mrr, ndcg_at_k,
35 peer_composite_score, precision_at_k, update_contributor_ratings, update_peer_ratings,
36 EvaluationResult, PeerQualityMetrics, PeerTestResult, QualityMetrics,
37};
38pub use rating::{pairwise_update, Rating, DEFAULT_MU, DEFAULT_SIGMA};
39pub use window::{select_test_queries, ValidationWindow};
40
41use cp_arweave::ArweaveClient;
42use ed25519_dalek::SigningKey;
43use serde::{Deserialize, Serialize};
44use std::collections::HashMap;
45use std::path::PathBuf;
46use std::sync::Arc;
47use tokio::sync::RwLock;
48use tracing::{error, info, warn};
49
/// Errors returned by the validator.
#[derive(Debug, thiserror::Error)]
pub enum ValidatorError {
    /// An Arweave operation failed; the payload is the underlying error
    /// rendered as text.
    #[error("Arweave error: {0}")]
    Arweave(String),

    /// No content was found for a contributor. The payload is interpolated
    /// into the message (presumably a contributor identifier — verify at the
    /// construction sites).
    #[error("No content found for contributor: {0}")]
    NoContent(String),

    /// The test corpus is missing or unusable; the payload describes the problem.
    #[error("Corpus error: {0}")]
    Corpus(String),

    /// Tor is not available.
    #[error("Tor not available")]
    TorNotAvailable,

    /// A Tor-related failure, described by the payload.
    #[error("Tor error: {0}")]
    Tor(String),

    /// Any other failure, described by the payload.
    #[error("Internal error: {0}")]
    Internal(String),
}
71
/// Tunable settings for a [`Validator`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidatorConfig {
    /// Seconds the validator loop sleeps between iterations.
    pub evaluation_interval_secs: u64,

    /// When set, the test corpus is loaded from this file; when `None` it is
    /// loaded from Arweave instead.
    pub corpus_path: Option<PathBuf>,

    /// Query count handed to the validation window when selecting test queries.
    pub queries_per_window: usize,

    /// Upper bound on the number of distinct contributors evaluated per cycle.
    pub max_contributors_per_window: usize,

    /// Upper bound on the number of peers tested per cycle.
    pub max_peers_per_window: usize,

    /// Whether the validator loop also runs live peer testing.
    pub test_peers: bool,
}
95
96impl Default for ValidatorConfig {
97 fn default() -> Self {
98 Self {
99 evaluation_interval_secs: 3600,
100 corpus_path: None,
101 queries_per_window: 20,
102 max_contributors_per_window: 50,
103 max_peers_per_window: 20,
104 test_peers: true,
105 }
106 }
107}
108
/// Rating state tracked for a single content contributor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContributorRating {
    /// 32-byte key identifying the contributor.
    pub contributor_key: [u8; 32],
    /// `mu` component of the rating (presumably the estimated mean — confirm
    /// against the `rating` module).
    pub mu: f64,
    /// `sigma` component of the rating (presumably the uncertainty — confirm
    /// against the `rating` module).
    pub sigma: f64,
    /// Number of rating updates applied so far.
    pub validation_count: u64,
    /// Timestamp of the most recent update; `0` for a fresh rating.
    /// `apply_rating` stores its `now_ms` argument here.
    pub last_updated: i64,
}
125
126impl ContributorRating {
127 pub fn new(contributor_key: [u8; 32]) -> Self {
129 Self {
130 contributor_key,
131 mu: DEFAULT_MU,
132 sigma: DEFAULT_SIGMA,
133 validation_count: 0,
134 last_updated: 0,
135 }
136 }
137
138 pub fn conservative_score(&self) -> f64 {
140 self.mu - 2.0 * self.sigma
141 }
142
143 pub fn to_rating(&self) -> Rating {
145 Rating::new(self.mu, self.sigma)
146 }
147
148 pub fn apply_rating(&mut self, rating: &Rating, now_ms: i64) {
150 self.mu = rating.mu;
151 self.sigma = rating.sigma;
152 self.validation_count += 1;
153 self.last_updated = now_ms;
154 }
155}
156
/// Rating state tracked for a single live peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerRating {
    /// 16-byte node id identifying the peer.
    pub peer_node_id: [u8; 16],
    /// `mu` component of the rating (presumably the estimated mean — confirm
    /// against the `rating` module).
    pub mu: f64,
    /// `sigma` component of the rating (presumably the uncertainty — confirm
    /// against the `rating` module).
    pub sigma: f64,
    /// Number of rating updates applied so far.
    pub test_count: u64,
    /// Timestamp of the most recent update; `0` for a fresh rating.
    /// `apply_rating` stores its `now_ms` argument here.
    pub last_updated: i64,
}
176
177impl PeerRating {
178 pub fn new(peer_node_id: [u8; 16]) -> Self {
180 Self {
181 peer_node_id,
182 mu: DEFAULT_MU,
183 sigma: DEFAULT_SIGMA,
184 test_count: 0,
185 last_updated: 0,
186 }
187 }
188
189 pub fn conservative_score(&self) -> f64 {
191 self.mu - 2.0 * self.sigma
192 }
193
194 pub fn to_rating(&self) -> Rating {
196 Rating::new(self.mu, self.sigma)
197 }
198
199 pub fn apply_rating(&mut self, rating: &Rating, now_ms: i64) {
201 self.mu = rating.mu;
202 self.sigma = rating.sigma;
203 self.test_count += 1;
204 self.last_updated = now_ms;
205 }
206}
207
/// Evaluates contributors and live peers against a test corpus and keeps
/// their ratings.
pub struct Validator {
    /// Runtime settings (intervals, limits, corpus location).
    config: ValidatorConfig,

    /// Shared Arweave client used for corpus loading and discovery.
    arweave: Arc<ArweaveClient>,

    /// Ed25519 signing key; passed to the evaluation routines and used to
    /// derive `node_id`.
    signing_key: SigningKey,

    /// First 16 bytes of the BLAKE3 hash of the signing key's public key.
    node_id: [u8; 16],

    /// Contributor ratings keyed by 32-byte contributor key.
    contributor_ratings: Arc<RwLock<HashMap<[u8; 32], ContributorRating>>>,

    /// Peer ratings keyed by 16-byte peer node id.
    peer_ratings: Arc<RwLock<HashMap<[u8; 16], PeerRating>>>,

    /// Test corpus; `None` until `load_corpus` has succeeded.
    corpus: Arc<RwLock<Option<TestCorpus>>>,

    /// 32-byte model hash; hex-encoded for peer discovery and passed to peer
    /// evaluation.
    model_hash: [u8; 32],
}
238
239impl Validator {
240 pub fn new(
242 config: ValidatorConfig,
243 arweave: Arc<ArweaveClient>,
244 signing_key: SigningKey,
245 model_hash: [u8; 32],
246 ) -> Self {
247 let public_key = signing_key.verifying_key().to_bytes();
248 let node_id_hash = blake3::hash(&public_key);
249 let mut node_id = [0u8; 16];
250 node_id.copy_from_slice(&node_id_hash.as_bytes()[..16]);
251
252 Self {
253 config,
254 arweave,
255 signing_key,
256 node_id,
257 contributor_ratings: Arc::new(RwLock::new(HashMap::new())),
258 peer_ratings: Arc::new(RwLock::new(HashMap::new())),
259 corpus: Arc::new(RwLock::new(None)),
260 model_hash,
261 }
262 }
263
264 pub fn contributor_ratings(&self) -> &Arc<RwLock<HashMap<[u8; 32], ContributorRating>>> {
266 &self.contributor_ratings
267 }
268
269 pub fn peer_ratings(&self) -> &Arc<RwLock<HashMap<[u8; 16], PeerRating>>> {
271 &self.peer_ratings
272 }
273
274 pub fn node_id(&self) -> [u8; 16] {
276 self.node_id
277 }
278
279 pub async fn load_corpus(&self) -> Result<(), ValidatorError> {
283 let new_corpus = if let Some(ref path) = self.config.corpus_path {
284 TestCorpus::load_from_file(path)?
285 } else {
286 TestCorpus::load_from_arweave(&self.arweave).await?
287 };
288
289 info!(queries = new_corpus.len(), "Loaded test corpus");
290 *self.corpus.write().await = Some(new_corpus);
291 Ok(())
292 }
293
294 pub async fn run_evaluation_cycle(&self) -> Result<Vec<EvaluationResult>, ValidatorError> {
305 {
307 let corpus_guard = self.corpus.read().await;
308 if corpus_guard.is_none() {
309 drop(corpus_guard);
310 self.load_corpus().await?;
311 }
312 }
313
314 let corpus = self.corpus.read().await;
315 let corpus = corpus
316 .as_ref()
317 .ok_or_else(|| ValidatorError::Corpus("Corpus not loaded".to_string()))?;
318
319 let window = ValidationWindow::current();
321 let test_queries = window.select_test_queries(corpus, self.config.queries_per_window);
322
323 if test_queries.is_empty() {
324 warn!("No test queries selected for window {}", window.window_id);
325 return Ok(Vec::new());
326 }
327
328 info!(
329 window_id = window.window_id,
330 queries = test_queries.len(),
331 "Starting evaluation cycle"
332 );
333
334 let (discovered, _cursor) = cp_arweave::discover_diffs(&self.arweave, None, 100, None)
336 .await
337 .map_err(|e| ValidatorError::Arweave(e.to_string()))?;
338
339 let mut contributor_keys: Vec<[u8; 32]> = Vec::new();
341 for diff in &discovered {
342 if let Ok(key_bytes) = hex::decode(&diff.contributor_key) {
343 if key_bytes.len() == 32 {
344 let mut key = [0u8; 32];
345 key.copy_from_slice(&key_bytes);
346 if !contributor_keys.contains(&key) {
347 contributor_keys.push(key);
348 }
349 }
350 }
351 }
352
353 contributor_keys.truncate(self.config.max_contributors_per_window);
354
355 info!(
356 contributors = contributor_keys.len(),
357 "Discovered contributors to evaluate"
358 );
359
360 let mut eval_results = Vec::new();
362 for contributor_key in &contributor_keys {
363 match evaluate_contributor(
364 *contributor_key,
365 &self.arweave,
366 &test_queries,
367 self.node_id,
368 &self.signing_key,
369 )
370 .await
371 {
372 Ok(result) => {
373 eval_results.push(result);
374 }
375 Err(e) => {
376 warn!(
377 contributor = hex::encode(contributor_key),
378 error = %e,
379 "Failed to evaluate contributor"
380 );
381 }
382 }
383 }
384
385 if eval_results.len() >= 2 {
387 let mut raw_ratings: HashMap<[u8; 32], (Rating, u64, i64)> = HashMap::new();
388
389 {
390 let existing = self.contributor_ratings.read().await;
391 for (key, cr) in existing.iter() {
392 raw_ratings
393 .insert(*key, (cr.to_rating(), cr.validation_count, cr.last_updated));
394 }
395 }
396
397 update_contributor_ratings(&eval_results, &mut raw_ratings);
398
399 {
400 let mut ratings = self.contributor_ratings.write().await;
401 for (key, (r, count, updated)) in &raw_ratings {
402 let entry = ratings
403 .entry(*key)
404 .or_insert_with(|| ContributorRating::new(*key));
405 entry.mu = r.mu;
406 entry.sigma = r.sigma;
407 entry.validation_count = *count;
408 entry.last_updated = *updated;
409 }
410 }
411
412 info!(
413 evaluated = eval_results.len(),
414 "Updated contributor ratings"
415 );
416 }
417
418 Ok(eval_results)
419 }
420
421 pub async fn test_live_peers(
426 &self,
427 tor_runtime: &cp_tor::TorRuntime,
428 ) -> Result<Vec<PeerTestResult>, ValidatorError> {
429 let corpus = self.corpus.read().await;
431 let corpus = corpus
432 .as_ref()
433 .ok_or_else(|| ValidatorError::Corpus("Corpus not loaded".to_string()))?;
434
435 let window = ValidationWindow::current();
436 let test_queries = window.select_test_queries(corpus, self.config.queries_per_window);
437
438 let model_hash_hex = hex::encode(self.model_hash);
440 let (peers, _cursor) =
441 cp_arweave::discover_peers(&self.arweave, Some(&model_hash_hex), 50, None)
442 .await
443 .map_err(|e| ValidatorError::Arweave(e.to_string()))?;
444
445 let peers_to_test: Vec<_> = peers
446 .into_iter()
447 .take(self.config.max_peers_per_window)
448 .collect();
449
450 info!(peers = peers_to_test.len(), "Testing live peers");
451
452 let mut peer_results = Vec::new();
453 for peer_meta in &peers_to_test {
454 match cp_arweave::download_peer_registration(&self.arweave, peer_meta).await {
455 Ok(registration) => {
456 match evaluate_peer(
457 ®istration,
458 &test_queries,
459 self.node_id,
460 &self.signing_key,
461 self.model_hash,
462 tor_runtime,
463 )
464 .await
465 {
466 Ok(result) => {
467 peer_results.push(result);
468 }
469 Err(e) => {
470 warn!(
471 peer = hex::encode(registration.node_id),
472 error = %e,
473 "Failed to test peer"
474 );
475 }
476 }
477 }
478 Err(e) => {
479 warn!(tx_id = &peer_meta.tx_id, error = %e, "Failed to download peer registration");
480 }
481 }
482 }
483
484 if peer_results.len() >= 2 {
486 let mut raw_ratings: HashMap<[u8; 16], (Rating, u64, i64)> = HashMap::new();
487
488 {
489 let existing = self.peer_ratings.read().await;
490 for (key, pr) in existing.iter() {
491 raw_ratings.insert(*key, (pr.to_rating(), pr.test_count, pr.last_updated));
492 }
493 }
494
495 update_peer_ratings(&peer_results, &mut raw_ratings);
496
497 {
498 let mut ratings = self.peer_ratings.write().await;
499 for (key, (r, count, updated)) in &raw_ratings {
500 let entry = ratings.entry(*key).or_insert_with(|| PeerRating::new(*key));
501 entry.mu = r.mu;
502 entry.sigma = r.sigma;
503 entry.test_count = *count;
504 entry.last_updated = *updated;
505 }
506 }
507
508 info!(tested = peer_results.len(), "Updated peer ratings");
509 }
510
511 Ok(peer_results)
512 }
513
514 pub async fn run_loop(
520 &self,
521 tor_runtime: Option<&cp_tor::TorRuntime>,
522 ) -> Result<(), ValidatorError> {
523 info!(
524 interval_secs = self.config.evaluation_interval_secs,
525 test_peers = self.config.test_peers,
526 "Starting validator loop"
527 );
528
529 loop {
530 match self.run_evaluation_cycle().await {
532 Ok(results) => {
533 info!(results = results.len(), "Evaluation cycle completed");
534 }
535 Err(e) => {
536 error!(error = %e, "Evaluation cycle failed");
537 }
538 }
539
540 if self.config.test_peers {
542 if let Some(runtime) = tor_runtime {
543 match self.test_live_peers(runtime).await {
544 Ok(results) => {
545 info!(peers_tested = results.len(), "Peer testing completed");
546 }
547 Err(e) => {
548 warn!(error = %e, "Peer testing failed");
549 }
550 }
551 } else {
552 warn!("Peer testing enabled but no TorRuntime provided");
553 }
554 }
555
556 tokio::time::sleep(std::time::Duration::from_secs(
557 self.config.evaluation_interval_secs,
558 ))
559 .await;
560 }
561 }
562}
563
564pub fn prioritize_downloads(
570 available_tx_ids: &[(String, Option<[u8; 32]>)],
571 contributor_ratings: &HashMap<[u8; 32], ContributorRating>,
572) -> Vec<String> {
573 let mut scored: Vec<(&str, f64)> = available_tx_ids
574 .iter()
575 .map(|(tx_id, contributor_key)| {
576 let score = contributor_key
577 .and_then(|key| contributor_ratings.get(&key))
578 .map_or(
579 DEFAULT_MU - 2.0 * DEFAULT_SIGMA,
580 ContributorRating::conservative_score,
581 );
582 (tx_id.as_str(), score)
583 })
584 .collect();
585
586 scored.sort_by(|(_, a), (_, b)| b.partial_cmp(a).unwrap_or(std::cmp::Ordering::Equal));
587
588 scored
589 .into_iter()
590 .map(|(tx_id, _)| tx_id.to_string())
591 .collect()
592}
593
#[cfg(test)]
mod tests {
    use super::*;

    /// Tolerance for floating-point comparisons.
    const EPS: f64 = 1e-6;

    /// True when `actual` is within `EPS` of `expected`.
    fn close(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < EPS
    }

    /// Builds a contributor rating whose key is 32 copies of `byte`.
    fn contributor(byte: u8, mu: f64, sigma: f64, validation_count: u64) -> ContributorRating {
        ContributorRating {
            contributor_key: [byte; 32],
            mu,
            sigma,
            validation_count,
            last_updated: 0,
        }
    }

    // A new contributor rating starts from the default mu/sigma with no validations.
    #[test]
    fn test_contributor_rating_default() {
        let fresh = ContributorRating::new([1u8; 32]);
        assert!(close(fresh.mu, 25.0));
        assert!(close(fresh.sigma, DEFAULT_SIGMA));
        assert_eq!(fresh.validation_count, 0);
    }

    // The conservative score is mu - 2 * sigma.
    #[test]
    fn test_contributor_rating_conservative_score() {
        let rating = contributor(1, 30.0, 5.0, 10);
        assert!(close(rating.conservative_score(), 20.0));
    }

    // Applying a rating copies mu/sigma, bumps the counter and stamps the time.
    #[test]
    fn test_contributor_rating_apply() {
        let mut rating = ContributorRating::new([1u8; 32]);
        rating.apply_rating(&Rating::new(28.0, 6.5), 1000);

        assert!(close(rating.mu, 28.0));
        assert!(close(rating.sigma, 6.5));
        assert_eq!(rating.validation_count, 1);
        assert_eq!(rating.last_updated, 1000);
    }

    // A new peer rating starts from the default mu/sigma with no tests.
    #[test]
    fn test_peer_rating_default() {
        let fresh = PeerRating::new([1u8; 16]);
        assert!(close(fresh.mu, 25.0));
        assert!(close(fresh.sigma, DEFAULT_SIGMA));
        assert_eq!(fresh.test_count, 0);
    }

    // Applying a rating copies mu/sigma, bumps the counter and stamps the time.
    #[test]
    fn test_peer_rating_apply() {
        let mut rating = PeerRating::new([1u8; 16]);
        rating.apply_rating(&Rating::new(27.0, 7.0), 2000);

        assert!(close(rating.mu, 27.0));
        assert!(close(rating.sigma, 7.0));
        assert_eq!(rating.test_count, 1);
        assert_eq!(rating.last_updated, 2000);
    }

    // The default configuration carries the documented values.
    #[test]
    fn test_validator_config_default() {
        let defaults = ValidatorConfig::default();
        assert_eq!(defaults.evaluation_interval_secs, 3600);
        assert_eq!(defaults.queries_per_window, 20);
        assert_eq!(defaults.max_contributors_per_window, 50);
        assert_eq!(defaults.max_peers_per_window, 20);
        assert!(defaults.test_peers);
    }

    // Rated contributors are ordered by conservative score, and a
    // transaction without a contributor key comes after both of them.
    #[test]
    fn test_prioritize_downloads_rated() {
        let mut ratings = HashMap::new();
        ratings.insert([1u8; 32], contributor(1, 35.0, 3.0, 20));
        ratings.insert([2u8; 32], contributor(2, 15.0, 3.0, 20));

        let txs = vec![
            ("tx_bad".to_string(), Some([2u8; 32])),
            ("tx_good".to_string(), Some([1u8; 32])),
            ("tx_unknown".to_string(), None),
        ];

        let order = prioritize_downloads(&txs, &ratings);

        assert_eq!(order[0], "tx_good");
        assert_eq!(order[1], "tx_bad");
        assert_eq!(order[2], "tx_unknown");
    }

    // No transactions in, no transactions out.
    #[test]
    fn test_prioritize_downloads_empty() {
        let ratings = HashMap::new();
        let txs: Vec<(String, Option<[u8; 32]>)> = Vec::new();
        assert!(prioritize_downloads(&txs, &ratings).is_empty());
    }

    // Unrated contributors are all kept in the output.
    #[test]
    fn test_prioritize_downloads_all_unrated() {
        let ratings = HashMap::new();
        let txs = vec![
            ("tx1".to_string(), Some([1u8; 32])),
            ("tx2".to_string(), Some([2u8; 32])),
        ];

        assert_eq!(prioritize_downloads(&txs, &ratings).len(), 2);
    }
}