1use crate::seeds::SEEDS;
2pub use crate::{
3 AnchorSet, EpochHint, HintsResponse, Message, PeerInfo, Query, QueryRequest, TrustId,
4};
5use libveritas::cert::CertificateChain;
6use libveritas::msg::QueryContext;
7use libveritas::spaces_protocol::sname::{NameLike, SName};
8use libveritas::{
9 MessageError, ProvableOption, SovereigntyState, TrustSet, VerifiedMessage, Veritas, Zone,
10 compute_trust_set,
11};
12use rand::seq::SliceRandom;
13use serde::{Deserialize, Serialize};
14use std::collections::{HashMap, HashSet};
15use std::fmt;
16use std::str::FromStr;
17use std::sync::Mutex;
18use std::sync::atomic::{AtomicBool, Ordering};
19
20#[cfg(feature = "signing")]
21use libveritas::{
22 builder::MessageBuilder,
23 msg::ChainProof,
24 sip7::{RecordSet, SIG_PRIMARY_ZONE},
25};
26
27pub type Result<T> = std::result::Result<T, Error>;
28
29pub struct AnchorBundle {
30 pub trust_set: TrustSet,
31 pub anchors: Vec<spaces_nums::RootAnchor>,
32}
33
34pub struct ScanParams {
36 pub id: TrustId,
37}
38
39impl ScanParams {
40 pub fn parse(uri: &str) -> Result<Self> {
42 let uri = uri.trim();
43 let query = uri.strip_prefix("veritas://scan?").ok_or_else(|| {
44 std::io::Error::new(
45 std::io::ErrorKind::InvalidData,
46 "expected veritas://scan?... URI",
47 )
48 })?;
49
50 let mut id = None;
51 for pair in query.split('&') {
52 if let Some((key, value)) = pair.split_once('=') {
53 if key == "id" {
54 id = Some(TrustId::from_str(value)?);
55 }
56 }
57 }
58
59 Ok(Self {
60 id: id.ok_or_else(|| {
61 std::io::Error::new(std::io::ErrorKind::InvalidData, "missing id parameter")
62 })?,
63 })
64 }
65}
66
67pub struct Fabric {
68 http: reqwest::Client,
69 pool: RelayPool,
70 veritas: Mutex<Veritas>,
71 dev_mode: bool,
72 root_cache: dashmap::DashMap<String, Zone>,
73 seeds: Vec<String>,
74 trusted: Mutex<Option<TrustSet>>,
76 observed: Mutex<Option<TrustSet>>,
78 semi_trusted: Mutex<Option<TrustSet>>,
81 anchor_pool: Mutex<AnchorPool>,
83 prefer_latest: AtomicBool,
85}
86
87#[derive(Default)]
89struct AnchorPool {
90 trusted: Vec<spaces_nums::RootAnchor>,
91 semi_trusted: Vec<spaces_nums::RootAnchor>,
92 observed: Vec<spaces_nums::RootAnchor>,
93}
94
95impl AnchorPool {
96 fn merged(&self) -> Vec<spaces_nums::RootAnchor> {
97 let mut all = Vec::new();
98 all.extend_from_slice(&self.trusted);
99 all.extend_from_slice(&self.semi_trusted);
100 all.extend_from_slice(&self.observed);
101 all.sort_by_key(|a| std::cmp::Reverse(a.block.height));
102 all.dedup_by_key(|a| a.block.height);
103 all
104 }
105}
106
107#[derive(Serialize, Deserialize)]
109pub struct FabricState {
110 pub version: u32,
111 pub relays: Vec<String>,
112 pub anchors: AnchorPoolState,
113 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
114 pub zone_cache: HashMap<String, Zone>,
115}
116
117#[derive(Serialize, Deserialize, Default)]
119pub struct AnchorPoolState {
120 #[serde(default, skip_serializing_if = "Vec::is_empty")]
121 pub trusted: Vec<spaces_nums::RootAnchor>,
122 #[serde(default, skip_serializing_if = "Vec::is_empty")]
123 pub semi_trusted: Vec<spaces_nums::RootAnchor>,
124 #[serde(default, skip_serializing_if = "Vec::is_empty")]
125 pub observed: Vec<spaces_nums::RootAnchor>,
126}
127
128pub struct RelayPool {
129 inner: Mutex<Vec<RelayEntry>>,
130}
131
132pub struct RelayEntry {
133 pub url: String,
134 pub failures: u32,
135}
136
137enum TrustKind {
138 Trusted(TrustId),
140 SemiTrusted(TrustId),
142 Observed,
144}
145
146pub enum Badge {
148 Orange,
150 Unverified,
153 None,
155}
156
157impl fmt::Display for Badge {
158 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
159 match self {
160 Badge::Orange => write!(f, "orange"),
161 Badge::Unverified => write!(f, "unverified"),
162 Badge::None => write!(f, "none"),
163 }
164 }
165}
166
167impl Default for Fabric {
168 fn default() -> Self {
169 Self::new()
170 }
171}
172
173impl Fabric {
174 pub fn new() -> Self {
176 Self::with_seeds(SEEDS)
177 }
178
179 pub fn with_seeds(seeds: &[&str]) -> Self {
181 Self {
182 http: reqwest::Client::new(),
183 pool: RelayPool::new(std::iter::empty::<String>()),
184 veritas: Mutex::new(Veritas::new()),
185 dev_mode: false,
186 root_cache: Default::default(),
187 seeds: seeds.iter().map(|s| s.to_string()).collect(),
188 observed: Mutex::new(None),
189 trusted: Mutex::new(None),
190 semi_trusted: Mutex::new(None),
191 anchor_pool: Mutex::new(AnchorPool::default()),
192 prefer_latest: AtomicBool::new(true),
193 }
194 }
195
196 pub fn with_dev_mode(mut self) -> Self {
197 self.dev_mode = true;
198 self
199 }
200
201 pub fn save_state(&self) -> FabricState {
203 let pool = self.anchor_pool.lock().unwrap();
204 let zone_cache: HashMap<String, Zone> = self
205 .root_cache
206 .iter()
207 .map(|entry| (entry.key().clone(), entry.value().clone()))
208 .collect();
209
210 FabricState {
211 version: 1,
212 relays: self.pool.urls(),
213 anchors: AnchorPoolState {
214 trusted: pool.trusted.clone(),
215 semi_trusted: pool.semi_trusted.clone(),
216 observed: pool.observed.clone(),
217 },
218 zone_cache,
219 }
220 }
221
222 pub fn from_state(state: FabricState) -> Result<Self> {
225 let fabric = Self::new();
226
227 if !state.relays.is_empty() {
228 fabric.pool.refresh(state.relays);
229 }
230
231 {
232 let mut pool = fabric.anchor_pool.lock().unwrap();
233 pool.trusted = state.anchors.trusted;
234 pool.semi_trusted = state.anchors.semi_trusted;
235 pool.observed = state.anchors.observed;
236
237 if !pool.trusted.is_empty() {
238 *fabric.trusted.lock().unwrap() = Some(compute_trust_set(&pool.trusted));
239 }
240 if !pool.semi_trusted.is_empty() {
241 *fabric.semi_trusted.lock().unwrap() = Some(compute_trust_set(&pool.semi_trusted));
242 }
243 if !pool.observed.is_empty() {
244 *fabric.observed.lock().unwrap() = Some(compute_trust_set(&pool.observed));
245 }
246
247 let merged = pool.merged();
248 if !merged.is_empty() {
249 let v = Veritas::new().with_anchors(merged).map_err(|e| {
250 std::io::Error::new(std::io::ErrorKind::InvalidData, format!("{e:?}"))
251 })?;
252 *fabric.veritas.lock().unwrap() = v;
253 }
254 }
255
256 for (key, zone) in state.zone_cache {
257 fabric.root_cache.insert(key, zone);
258 }
259
260 Ok(fabric)
261 }
262
263 fn are_roots_trusted(&self, roots: &[TrustId]) -> bool {
264 let set = self.trusted.lock().unwrap();
265 let Some(trusted_set) = set.as_ref() else {
266 return false;
267 };
268 for root in roots {
269 if !trusted_set.roots.contains(&root.to_bytes()) {
270 return false;
271 }
272 }
273 true
274 }
275
276 fn are_roots_observed(&self, roots: &[TrustId]) -> bool {
277 let set = self.observed.lock().unwrap();
278 let Some(trusted_set) = set.as_ref() else {
279 return false;
280 };
281 for root in roots {
282 if !trusted_set.roots.contains(&root.to_bytes()) {
283 return false;
284 }
285 }
286 true
287 }
288
289 fn are_roots_semi_trusted(&self, roots: &[TrustId]) -> bool {
290 let set = self.semi_trusted.lock().unwrap();
291 let Some(semi_set) = set.as_ref() else {
292 return false;
293 };
294 for root in roots {
295 if !semi_set.roots.contains(&root.to_bytes()) {
296 return false;
297 }
298 }
299 true
300 }
301
302 pub fn badge(&self, zone: &Zone) -> Badge {
303 let has_any_pool = self.trusted.lock().unwrap().is_some()
304 || self.observed.lock().unwrap().is_some()
305 || self.semi_trusted.lock().unwrap().is_some();
306 if !has_any_pool {
307 return Badge::Unverified;
308 }
309
310 let root = TrustId::from(zone.anchor_hash);
311 let is_trusted = self.are_roots_trusted(&[root]);
312 let is_observed = is_trusted || self.are_roots_observed(&[root]);
313 let is_semi_trusted = is_trusted || self.are_roots_semi_trusted(&[root]);
314
315 if is_trusted && matches!(zone.sovereignty, SovereigntyState::Sovereign) {
316 Badge::Orange
317 } else if is_observed && !is_trusted && !is_semi_trusted {
318 Badge::Unverified
319 } else {
320 Badge::None
321 }
322 }
323
324 pub async fn trust(&self, trust_id: TrustId) -> Result<()> {
326 if self.needs_peers() {
327 self.bootstrap_peers().await?;
328 }
329 self.update_anchors(TrustKind::Trusted(trust_id)).await
330 }
331
332 pub async fn observe(&self) -> Result<()> {
334 if self.needs_peers() {
335 self.bootstrap_peers().await?;
336 }
337 self.update_anchors(TrustKind::Observed).await
338 }
339
340 pub async fn semi_trust(&self, trust_id: TrustId) -> Result<()> {
342 if self.needs_peers() {
343 self.bootstrap_peers().await?;
344 }
345 self.update_anchors(TrustKind::SemiTrusted(trust_id)).await
346 }
347
348 pub fn trusted(&self) -> Option<TrustId> {
350 self.trusted
351 .lock()
352 .unwrap()
353 .as_ref()
354 .map(|t| TrustId::from(t.id))
355 }
356
357 pub fn observed(&self) -> Option<TrustId> {
359 self.observed
360 .lock()
361 .unwrap()
362 .as_ref()
363 .map(|t| TrustId::from(t.id))
364 }
365
366 pub fn semi_trusted(&self) -> Option<TrustId> {
368 self.semi_trusted
369 .lock()
370 .unwrap()
371 .as_ref()
372 .map(|t| TrustId::from(t.id))
373 }
374
375 pub fn trust_from_set(&self, set: &crate::AnchorSet) -> Result<TrustId> {
378 let trust_set = compute_trust_set(&set.entries);
379 let id = TrustId::from(trust_set.id);
380 let mut pool = self.anchor_pool.lock().unwrap();
381 pool.trusted = set.entries.clone();
382 let v = Veritas::new()
383 .with_anchors(pool.merged())
384 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, format!("{e:?}")))?;
385 *self.veritas.lock().unwrap() = v;
386 *self.trusted.lock().unwrap() = Some(trust_set);
387 Ok(id)
388 }
389
390 pub async fn trust_from_qr(&self, payload: &str) -> Result<()> {
392 let params = ScanParams::parse(payload)?;
393 self.trust(params.id).await
394 }
395
396 pub async fn semi_trust_from_qr(&self, payload: &str) -> Result<()> {
398 let params = ScanParams::parse(payload)?;
399 self.semi_trust(params.id).await
400 }
401
402 pub fn clear_trusted(&self) {
404 *self.trusted.lock().unwrap() = None;
405 }
406
407 pub fn clear_semi_trusted(&self) {
409 *self.semi_trusted.lock().unwrap() = None;
410 }
411
412 pub fn set_prefer_latest(&self, latest: bool) {
414 self.prefer_latest.store(latest, Ordering::Relaxed);
415 }
416
417 async fn update_anchors(&self, kind: TrustKind) -> Result<()> {
418 let (id, peers) = match &kind {
419 TrustKind::Trusted(id) | TrustKind::SemiTrusted(id) => {
420 let peers = self.pool.shuffled_urls_n(4);
421 (*id, peers)
422 }
423 TrustKind::Observed => fetch_latest_trust_id(&self.http, &self.seeds).await?,
424 };
425
426 let ab = fetch_anchor_set(&self.http, id, &peers).await?;
427
428 let mut pool = self.anchor_pool.lock().unwrap();
429 match &kind {
430 TrustKind::Trusted(_) => pool.trusted = ab.anchors,
431 TrustKind::SemiTrusted(_) => pool.semi_trusted = ab.anchors,
432 TrustKind::Observed => pool.observed = ab.anchors,
433 }
434 if let Ok(v) = Veritas::new().with_anchors(pool.merged()) {
435 *self.veritas.lock().unwrap() = v;
436 }
437 drop(pool);
438
439 match kind {
440 TrustKind::Trusted(_) => *self.trusted.lock().unwrap() = Some(ab.trust_set),
441 TrustKind::SemiTrusted(_) => *self.semi_trusted.lock().unwrap() = Some(ab.trust_set),
442 TrustKind::Observed => *self.observed.lock().unwrap() = Some(ab.trust_set),
443 }
444 Ok(())
445 }
446
447 fn needs_peers(&self) -> bool {
449 self.pool.is_empty()
450 }
451
452 fn needs_anchors(&self) -> bool {
454 self.veritas.lock().unwrap().newest_anchor() == 0
455 }
456
457 pub async fn bootstrap(&self) -> Result<()> {
459 if self.needs_peers() {
460 self.bootstrap_peers().await?;
461 }
462 if self.needs_anchors() {
463 self.update_anchors(TrustKind::Observed).await?;
464 }
465 Ok(())
466 }
467
468 async fn bootstrap_peers(&self) -> Result<()> {
470 let mut urls: HashSet<String> = self.seeds.iter().cloned().collect();
471 let mut last_err: Option<Error> = None;
472
473 for seed in &self.seeds {
474 match fetch_peers(&self.http, seed).await {
475 Ok(peers) => {
476 for peer in peers {
477 urls.insert(peer.url);
478 }
479 }
480 Err(e) => {
481 last_err = Some(e);
482 }
483 }
484 }
485
486 if urls.is_empty() {
487 if last_err.is_none() {
488 self.pool.refresh(self.seeds.clone());
489 return Ok(());
490 }
491 return Err(last_err.unwrap_or(Error::NoPeers));
492 }
493
494 self.pool.refresh(urls);
495 Ok(())
496 }
497
498 pub async fn resolve(&self, handle: &str) -> Result<Option<Zone>> {
502 let zones = self.resolve_all(&[handle]).await?;
503 Ok(zones.into_iter().find(|z| z.handle.to_string() == handle))
504 }
505
506 pub async fn resolve_by_id(&self, num_id: &str) -> Result<Option<Zone>> {
511 self.bootstrap().await?;
512 let relays = self.pool.shuffled_urls_n(4);
513 let mut last_err: Option<Error> = None;
514 let mut any_responded = false;
515
516 for url in &relays {
517 let reverse_url = format!("{url}/reverse?ids={num_id}");
518 let records: Vec<crate::ReverseRecord> = match self.http.get(&reverse_url).send().await
519 {
520 Ok(resp) if resp.status().is_success() => match resp.json().await {
521 Ok(r) => r,
522 Err(_) => continue,
523 },
524 _ => continue,
525 };
526
527 any_responded = true;
528
529 let Some(entry) = records.iter().find(|r| r.id == num_id) else {
530 continue;
531 };
532
533 let zone = match self.resolve(&entry.name).await {
534 Ok(Some(z)) => z,
535 Ok(None) => continue,
536 Err(e) => {
537 last_err = Some(e);
538 continue;
539 }
540 };
541
542 let zone_num_id = zone.num_id.as_ref().map(|id| id.to_string());
543 if zone_num_id.as_deref() != Some(num_id) {
544 last_err = Some(Error::Decode(std::io::Error::new(
545 std::io::ErrorKind::InvalidData,
546 format!("reverse mismatch: expected {num_id}, got {:?}", zone_num_id),
547 )));
548 continue;
549 }
550
551 return Ok(Some(zone));
552 }
553
554 if any_responded && last_err.is_none() {
555 return Ok(None);
556 }
557
558 Err(last_err.unwrap_or(Error::NoPeers))
559 }
560
561 pub async fn search_addr(&self, name: &str, addr: &str) -> Result<Vec<Zone>> {
566 self.bootstrap().await?;
567 let relays = self.pool.shuffled_urls_n(4);
568 let mut last_err = Error::NoPeers;
569
570 for url in &relays {
571 let addr_url = format!("{url}/addrs?name={name}&addr={addr}");
572 let addr_match: crate::AddrMatch = match self.http.get(&addr_url).send().await {
573 Ok(resp) if resp.status().is_success() => match resp.json().await {
574 Ok(r) => r,
575 Err(_) => continue,
576 },
577 _ => continue,
578 };
579
580 if addr_match.handles.is_empty() {
581 continue;
582 }
583
584 let rev_names: Vec<String> = addr_match.handles.iter().map(|e| e.rev.clone()).collect();
585 let refs: Vec<&str> = rev_names.iter().map(|s| s.as_str()).collect();
586 let zones = match self.resolve_all(&refs).await {
587 Ok(z) => z,
588 Err(e) => {
589 last_err = e;
590 continue;
591 }
592 };
593
594 let matching: Vec<Zone> = zones
595 .into_iter()
596 .filter(|zone| {
597 zone.records
598 .iter()
599 .map(|mut rrs| {
600 rrs.any(|r| {
601 matches!(r, libveritas::sip7::ParsedRecord::Addr { key, value }
602 if key == name && value.iter().next() == Some(addr))
603 })
604 })
605 .unwrap_or(false)
606 })
607 .collect();
608
609 if matching.is_empty() {
610 last_err = Error::Decode(std::io::Error::new(
611 std::io::ErrorKind::NotFound,
612 "no verified matches",
613 ));
614 continue;
615 }
616
617 return Ok(matching);
618 }
619
620 Err(last_err)
621 }
622
623 pub async fn resolve_all(&self, handles: &[&str]) -> Result<Vec<Zone>> {
625 let snames: Vec<SName> = handles
626 .iter()
627 .filter_map(|h| SName::try_from(*h).ok())
628 .collect();
629
630 let lookup = libveritas::names::Lookup::new(snames);
631 let mut all_zones: Vec<Zone> = Vec::new();
632
633 let mut prev_batch: Vec<SName> = Vec::new();
634 let mut batch: Vec<SName> = lookup.start();
635 while !batch.is_empty() {
636 if batch == prev_batch {
637 break;
638 }
639 let strs: Vec<String> = batch.iter().map(|s| s.to_string()).collect();
640 let refs: Vec<&str> = strs.iter().map(|s| s.as_str()).collect();
641 let (verified, _relay_url) = self.resolve_flat(&refs, true).await?;
642 prev_batch = batch;
643 batch = lookup.advance(&verified.zones);
644 all_zones.extend(verified.zones);
645 }
646
647 lookup.expand_zones(&mut all_zones);
648 Ok(all_zones)
649 }
650
651 pub async fn export(&self, handle: &str) -> Result<Vec<u8>> {
653 let sname = SName::try_from(handle)
654 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e.to_string()))?;
655
656 let lookup = libveritas::names::Lookup::new(vec![sname.clone()]);
657 let mut all_verified: Vec<VerifiedMessage> = Vec::new();
658
659 let mut prev_batch: Vec<SName> = Vec::new();
660 let mut batch: Vec<SName> = lookup.start();
661 while !batch.is_empty() {
662 if batch == prev_batch {
663 break;
664 }
665 let strs: Vec<String> = batch.iter().map(|s| s.to_string()).collect();
666 let refs: Vec<&str> = strs.iter().map(|s| s.as_str()).collect();
667 let (verified, _relay_url) = self.resolve_flat(&refs, false).await?;
668 prev_batch = batch;
669 batch = lookup.advance(&verified.zones);
670 all_verified.push(verified);
671 }
672
673 let mut certs = Vec::new();
674 for msg in &all_verified {
675 certs.extend(msg.certificates());
676 }
677
678 let chain = CertificateChain::new(sname, certs);
679 Ok(chain.to_bytes())
680 }
681
682 async fn resolve_flat(
684 &self,
685 handles: &[&str],
686 hints: bool,
687 ) -> Result<(VerifiedMessage, String)> {
688 let mut by_space: HashMap<String, Vec<String>> = HashMap::new();
689 for &h in handles {
690 let sname = SName::try_from(h).map_err(|e| {
691 std::io::Error::new(std::io::ErrorKind::InvalidInput, e.to_string())
692 })?;
693 let space = sname
694 .space()
695 .ok_or_else(|| {
696 std::io::Error::new(std::io::ErrorKind::InvalidInput, format!("{h}: no space"))
697 })?
698 .to_string();
699 let subspace = sname.subspace().map(|l| l.to_string()).unwrap_or_default();
700 by_space.entry(space).or_default().push(subspace);
701 }
702
703 let queries = by_space
704 .into_iter()
705 .map(|(space, handles)| {
706 let mut q = Query::new(space.clone(), handles);
707 if hints {
708 if let Some(zone) = self.root_cache.get(&space) {
709 if let Some(hint) = epoch_hint_from_zone(&zone) {
710 q = q.with_epoch_hint(hint);
711 }
712 }
713 }
714 q
715 })
716 .collect();
717 let request = QueryRequest::new(queries);
718 self.query(&request).await
719 }
720
721 async fn query(&self, request: &QueryRequest) -> Result<(VerifiedMessage, String)> {
722 self.bootstrap().await?;
723 let mut ctx = QueryContext::new();
724 request
725 .queries
726 .iter()
727 .filter_map(|q| self.root_cache.get(&q.space))
728 .map(|z| z.clone())
729 .for_each(|z| {
730 ctx.add_zone(z);
731 });
732
733 let relays = if self.prefer_latest.load(Ordering::Relaxed) {
734 self.pick_relays(request, 4).await
735 } else {
736 self.pool.shuffled_urls_n(4)
737 };
738
739 let (res, relay_url) = self.send_query(&ctx, request, &relays).await?;
740 res.zones
741 .iter()
742 .filter(|z| z.handle.is_single_label())
743 .for_each(|z| {
744 self.root_cache.insert(z.handle.to_string(), z.clone());
745 });
746 Ok((res, relay_url))
747 }
748
749 async fn send_query(
752 &self,
753 ctx: &QueryContext,
754 request: &QueryRequest,
755 relays: &[String],
756 ) -> Result<(VerifiedMessage, String)> {
757 let mut q_parts: Vec<String> = Vec::new();
759 let mut hint_parts: Vec<String> = Vec::new();
760 for q in &request.queries {
761 q_parts.push(q.space.clone());
762 for h in &q.handles {
763 if !h.is_empty() {
764 q_parts.push(format!("{}{}", h, q.space));
765 }
766 }
767 if let Some(ref hint) = q.epoch_hint {
768 hint_parts.push(format!("{}:{}:{}", q.space, hint.root, hint.height));
769 }
770 }
771 let q_param = q_parts.join(",");
772 let hints_param = hint_parts.join(",");
773
774 let mut last_err = Error::NoPeers;
775 for url in relays {
776 let mut req = self
777 .http
778 .get(format!("{url}/query"))
779 .query(&[("q", &q_param)]);
780 if !hints_param.is_empty() {
781 req = req.query(&[("hints", &hints_param)]);
782 }
783 let resp = match req.send().await {
784 Ok(r) => r,
785 Err(e) => {
786 self.pool.mark_failed(url);
787 last_err = Error::Decode(std::io::Error::new(
788 std::io::ErrorKind::ConnectionRefused,
789 format!("GET {url}/query: {e}"),
790 ));
791 continue;
792 }
793 };
794 if !resp.status().is_success() {
795 self.pool.mark_failed(url);
796 let status = resp.status().as_u16();
797 let body = resp.text().await.unwrap_or_default();
798 last_err = Error::Relay { status, body };
799 continue;
800 }
801 let bytes = match resp.bytes().await {
802 Ok(b) => b,
803 Err(e) => {
804 self.pool.mark_failed(url);
805 last_err = Error::Decode(std::io::Error::new(
806 std::io::ErrorKind::InvalidData,
807 format!("GET {url}/query: reading response: {e}"),
808 ));
809 continue;
810 }
811 };
812 let msg = Message::from_slice(&bytes).map_err(|e| {
813 Error::Decode(std::io::Error::new(
814 e.kind(),
815 format!("{url}/query: decoding message: {e}"),
816 ))
817 })?;
818 let options = if self.dev_mode {
819 libveritas::VERIFY_DEV_MODE
820 } else {
821 0
822 };
823 match self
824 .veritas
825 .lock()
826 .unwrap()
827 .verify_with_options(ctx, msg, options)
828 {
829 Ok(res) => {
830 self.pool.mark_alive(url);
831 return Ok((res, url.clone()));
832 }
833 Err(e) => {
834 self.pool.mark_failed(url);
835 last_err = Error::Verify(e);
836 }
837 }
838 }
839 Err(last_err)
840 }
841
842 async fn pick_relays(&self, request: &QueryRequest, count: usize) -> Vec<String> {
844 let hints_query = hints_query_string(request);
845 let shuffled = self.pool.shuffled_urls();
846
847 let mut ranked: Vec<(String, HintsResponse)> = Vec::new();
848
849 for batch in shuffled.chunks(count) {
850 if ranked.len() >= count {
851 break;
852 }
853
854 let mut tasks: Vec<(String, tokio::task::JoinHandle<Option<HintsResponse>>)> =
855 Vec::with_capacity(batch.len());
856 for url in batch {
857 let http = self.http.clone();
858 let hints_url = format!("{url}/hints");
859 let q = hints_query.clone();
860 tasks.push((
861 url.clone(),
862 tokio::spawn(async move {
863 let resp = http.get(&hints_url).query(&[("q", &q)]).send().await.ok()?;
864 if !resp.status().is_success() {
865 return None;
866 }
867 resp.json::<HintsResponse>().await.ok()
868 }),
869 ));
870 }
871
872 for (url, task) in tasks {
873 match task.await {
874 Ok(Some(hints)) => ranked.push((url, hints)),
875 _ => {
876 self.pool.mark_failed(&url);
877 }
878 }
879 }
880 }
881
882 ranked.sort_by(|a, b| b.1.cmp(&a.1));
883 ranked.into_iter().map(|(url, _)| url).collect()
884 }
885
886 pub async fn prove(&self, request: &spaces_nums::ChainProofRequest) -> Result<Vec<u8>> {
889 self.bootstrap().await?;
890 let body = serde_json::to_vec(request)
891 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
892
893 let urls = self.pool.shuffled_urls_n(4);
894 let mut last_err = Error::NoPeers;
895
896 for url in &urls {
897 let prove_url = format!("{url}/chain-proof");
898 let result = self
899 .http
900 .post(&prove_url)
901 .body(body.clone())
902 .header("content-type", "application/json")
903 .send()
904 .await;
905
906 match result {
907 Ok(resp) if resp.status().is_success() => {
908 self.pool.mark_alive(url);
909 return resp.bytes().await.map(|b| b.to_vec()).map_err(|e| {
910 Error::Decode(std::io::Error::new(
911 std::io::ErrorKind::InvalidData,
912 format!("POST {prove_url}: reading response: {e}"),
913 ))
914 });
915 }
916 Ok(resp) => {
917 self.pool.mark_failed(url);
918 let status = resp.status().as_u16();
919 let body = resp.text().await.unwrap_or_default();
920 last_err = Error::Relay { status, body };
921 }
922 Err(e) => {
923 self.pool.mark_failed(url);
924 last_err = Error::Decode(std::io::Error::new(
925 std::io::ErrorKind::ConnectionRefused,
926 format!("POST {prove_url}: {e}"),
927 ));
928 }
929 }
930 }
931
932 Err(last_err)
933 }
934
935 pub async fn broadcast(&self, msg_bytes: &[u8]) -> Result<()> {
938 self.bootstrap().await?;
939 let urls = self.pool.shuffled_urls_n(4);
940 if urls.is_empty() {
941 return Err(Error::NoPeers);
942 }
943
944 let mut any_ok = false;
945 let mut last_err = None;
946 for url in &urls {
947 let msg_url = format!("{url}/message");
948 let result = self
949 .http
950 .post(&msg_url)
951 .body(msg_bytes.to_vec())
952 .header("content-type", "application/octet-stream")
953 .send()
954 .await;
955
956 match result {
957 Ok(resp) if resp.status().is_success() => any_ok = true,
958 Ok(resp) => {
959 let status = resp.status().as_u16();
960 let body = resp.text().await.unwrap_or_default();
961 last_err = Some(Error::Relay { status, body });
962 }
963 Err(e) => {
964 last_err = Some(Error::Decode(std::io::Error::new(
965 std::io::ErrorKind::ConnectionRefused,
966 format!("POST {msg_url}: {e}"),
967 )))
968 }
969 }
970 }
971
972 if any_ok {
973 Ok(())
974 } else {
975 Err(last_err.unwrap())
976 }
977 }
978
979 #[cfg(feature = "signing")]
986 pub async fn publish(
987 &self,
988 cert: &[u8],
989 records: RecordSet,
990 secret_key: &[u8; 32],
991 primary: bool,
992 ) -> Result<()> {
993 let msg = self.sign(cert, records, secret_key, primary).await?;
994 self.broadcast(&msg).await
995 }
996
997 #[cfg(feature = "signing")]
1001 pub async fn sign(
1002 &self,
1003 cert: &[u8],
1004 records: RecordSet,
1005 secret_key: &[u8; 32],
1006 primary: bool,
1007 ) -> Result<Vec<u8>> {
1008 let chain = CertificateChain::from_slice(cert)?;
1009 let mut builder = MessageBuilder::new();
1010 builder.add_handle(chain, records);
1011 let proof_bytes = self.prove(&builder.chain_proof_request()).await?;
1012 let proof = ChainProof::from_slice(&proof_bytes)?;
1013 let (mut message, mut unsigned) = builder.build(proof)?;
1014
1015 for u in &mut unsigned {
1016 if primary {
1017 u.flags |= SIG_PRIMARY_ZONE;
1018 }
1019 let sig = crate::signing::sign_schnorr(&u.signing_id(), secret_key)
1020 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
1021 let signed = u.pack_sig(sig.to_vec());
1022 message.set_records(&u.canonical, signed);
1023 }
1024
1025 Ok(message.to_bytes())
1026 }
1027
1028 pub async fn peers(&self) -> Result<Vec<PeerInfo>> {
1030 let urls = self.pool.shuffled_urls_n(1);
1031 let url = urls.first().ok_or(Error::NoPeers)?;
1032 fetch_peers(&self.http, url).await
1033 }
1034
1035 pub async fn refresh_peers(&self) -> Result<()> {
1037 let current = self.pool.urls();
1038 let mut new_urls: HashSet<String> = HashSet::new();
1039
1040 for url in ¤t {
1041 if let Ok(peers) = fetch_peers(&self.http, url).await {
1042 for peer in peers {
1043 new_urls.insert(peer.url);
1044 }
1045 }
1046 }
1047
1048 self.pool.refresh(new_urls);
1049 if self.pool.is_empty() {
1050 return Err(Error::NoPeers);
1051 }
1052 Ok(())
1053 }
1054
1055 pub fn relays(&self) -> Vec<String> {
1057 self.pool.urls()
1058 }
1059
1060 pub fn veritas(&self) -> Veritas {
1062 self.veritas.lock().unwrap().clone()
1063 }
1064}
1065
1066fn hints_query_string(request: &QueryRequest) -> String {
1069 let mut parts = HashSet::new();
1070 for query in &request.queries {
1071 parts.insert(query.space.clone());
1072 for handle in &query.handles {
1073 parts.insert(format!("{}{}", handle, query.space));
1074 }
1075 }
1076 parts.into_iter().collect::<Vec<_>>().join(",")
1077}
1078
1079fn epoch_hint_from_zone(zone: &Zone) -> Option<EpochHint> {
1080 if let ProvableOption::Exists { value: c } = &zone.commitment {
1081 Some(EpochHint {
1082 root: hex::encode(c.onchain.state_root),
1083 height: c.onchain.block_height,
1084 })
1085 } else {
1086 None
1087 }
1088}
1089
1090async fn fetch_peers(http: &reqwest::Client, relay_url: &str) -> Result<Vec<PeerInfo>> {
1091 let url = format!("{relay_url}/peers");
1092 let resp = http.get(&url).send().await.map_err(|e| {
1093 Error::Decode(std::io::Error::new(
1094 std::io::ErrorKind::ConnectionRefused,
1095 format!("GET {url}: {e}"),
1096 ))
1097 })?;
1098 if !resp.status().is_success() {
1099 let status = resp.status().as_u16();
1100 let body = resp.text().await.unwrap_or_default();
1101 return Err(Error::Relay { status, body });
1102 }
1103 resp.json().await.map_err(|e| {
1104 Error::Decode(std::io::Error::new(
1105 std::io::ErrorKind::InvalidData,
1106 format!("GET {url}: {e}"),
1107 ))
1108 })
1109}
1110
1111impl RelayPool {
1112 fn new(urls: impl IntoIterator<Item = String>) -> Self {
1113 let entries = urls
1114 .into_iter()
1115 .map(|url| RelayEntry { url, failures: 0 })
1116 .collect();
1117 Self {
1118 inner: Mutex::new(entries),
1119 }
1120 }
1121
1122 pub fn shuffled_urls(&self) -> Vec<String> {
1124 self.shuffled_urls_n(usize::MAX)
1125 }
1126
1127 pub fn shuffled_urls_n(&self, n: usize) -> Vec<String> {
1129 let mut entries = self.inner.lock().unwrap();
1130 entries.shuffle(&mut rand::rng());
1131 entries.sort_by_key(|e| e.failures);
1132 entries.iter().take(n).map(|e| e.url.clone()).collect()
1133 }
1134
1135 pub fn mark_failed(&self, url: &str) {
1136 let mut entries = self.inner.lock().unwrap();
1137 if let Some(e) = entries.iter_mut().find(|e| e.url == url) {
1138 e.failures = e.failures.saturating_add(1);
1139 }
1140 }
1141
1142 pub fn mark_alive(&self, url: &str) {
1143 let mut entries = self.inner.lock().unwrap();
1144 if let Some(e) = entries.iter_mut().find(|e| e.url == url) {
1145 e.failures = 0;
1146 }
1147 }
1148
1149 pub fn refresh(&self, new_urls: impl IntoIterator<Item = String>) {
1151 let mut entries = self.inner.lock().unwrap();
1152 let existing: HashSet<String> = entries.iter().map(|e| e.url.clone()).collect();
1153 for url in new_urls {
1154 if !existing.contains(url.as_str()) {
1155 entries.push(RelayEntry { url, failures: 0 });
1156 }
1157 }
1158 }
1159
1160 pub fn is_empty(&self) -> bool {
1161 self.inner.lock().unwrap().is_empty()
1162 }
1163
1164 pub fn urls(&self) -> Vec<String> {
1165 self.inner
1166 .lock()
1167 .unwrap()
1168 .iter()
1169 .map(|e| e.url.clone())
1170 .collect()
1171 }
1172}
1173
1174#[derive(Debug)]
1175pub enum Error {
1176 Decode(std::io::Error),
1177 Verify(MessageError),
1178 Relay { status: u16, body: String },
1179 NoPeers,
1180}
1181
1182impl fmt::Display for Error {
1183 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1184 match self {
1185 Error::Decode(e) => write!(f, "decode error: {e}"),
1186 Error::Verify(e) => write!(f, "verification error: {e}"),
1187 Error::Relay { status, body } => write!(f, "relay error ({status}): {body}"),
1188 Error::NoPeers => write!(f, "no peers available"),
1189 }
1190 }
1191}
1192
1193impl std::error::Error for Error {}
1194
1195impl From<std::io::Error> for Error {
1196 fn from(e: std::io::Error) -> Self {
1197 Error::Decode(e)
1198 }
1199}
1200
1201impl From<MessageError> for Error {
1202 fn from(e: MessageError) -> Self {
1203 Error::Verify(e)
1204 }
1205}
1206
1207impl From<hex::FromHexError> for Error {
1208 fn from(e: hex::FromHexError) -> Self {
1209 Error::Decode(std::io::Error::new(
1210 std::io::ErrorKind::InvalidData,
1211 e.to_string(),
1212 ))
1213 }
1214}
1215
1216async fn fetch_latest_trust_id(
1220 http: &reqwest::Client,
1221 peers: &[String],
1222) -> Result<(TrustId, Vec<String>)> {
1223 let mut votes: HashMap<(String, u32), Vec<String>> = HashMap::new();
1224 let mut last_err: Option<Error> = None;
1225
1226 for url in peers {
1227 let resp = match http.head(format!("{url}/anchors")).send().await {
1228 Ok(r) => r,
1229 Err(e) => {
1230 last_err = Some(Error::Decode(std::io::Error::new(
1231 std::io::ErrorKind::ConnectionRefused,
1232 format!("HEAD {url}/anchors: {e}"),
1233 )));
1234 continue;
1235 }
1236 };
1237
1238 if !resp.status().is_success() {
1239 let status = resp.status().as_u16();
1240 last_err = Some(Error::Relay {
1241 status,
1242 body: format!("HEAD {url}/anchors: {status}"),
1243 });
1244 continue;
1245 }
1246
1247 let root = resp
1248 .headers()
1249 .get("x-anchor-root")
1250 .and_then(|v| v.to_str().ok())
1251 .map(|s| s.to_string());
1252
1253 let height: u32 = resp
1254 .headers()
1255 .get("x-anchor-height")
1256 .and_then(|v| v.to_str().ok())
1257 .and_then(|s| s.parse().ok())
1258 .unwrap_or(0);
1259
1260 if let Some(root) = root {
1261 votes
1262 .entry((root, height))
1263 .or_default()
1264 .push(url.to_string());
1265 }
1266 }
1267
1268 let (hash, peers) = votes
1269 .into_iter()
1270 .max_by_key(|((_, height), peers)| (peers.len(), *height))
1271 .map(|((root, _), peers)| (root, peers))
1272 .ok_or_else(|| last_err.unwrap_or(Error::NoPeers))?;
1273
1274 Ok((TrustId::from_str(&hash)?, peers))
1275}
1276
1277async fn fetch_anchor_set(
1278 http: &reqwest::Client,
1279 trust_id: TrustId,
1280 peers: &[String],
1281) -> Result<AnchorBundle> {
1282 let mut last_err: Option<Error> = None;
1283 for url in peers {
1284 let anchor_url = format!("{url}/anchors?root={trust_id}");
1285 let resp = match http.get(&anchor_url).send().await {
1286 Ok(r) => r,
1287 Err(e) => {
1288 last_err = Some(Error::Decode(std::io::Error::new(
1289 std::io::ErrorKind::ConnectionRefused,
1290 format!("GET {anchor_url}: {e}"),
1291 )));
1292 continue;
1293 }
1294 };
1295
1296 if !resp.status().is_success() {
1297 let status = resp.status().as_u16();
1298 let body = resp.text().await.unwrap_or_default();
1299 last_err = Some(Error::Relay { status, body });
1300 continue;
1301 }
1302
1303 let anchor_set: AnchorSet = match resp.json().await {
1304 Ok(a) => a,
1305 Err(e) => {
1306 last_err = Some(Error::Decode(std::io::Error::new(
1307 std::io::ErrorKind::InvalidData,
1308 format!("GET {anchor_url}: {e}"),
1309 )));
1310 continue;
1311 }
1312 };
1313
1314 let ab = AnchorBundle {
1315 trust_set: compute_trust_set(&anchor_set.entries),
1316 anchors: anchor_set.entries,
1317 };
1318
1319 if TrustId::from(ab.trust_set.id) != trust_id {
1320 last_err = Some(Error::Decode(std::io::Error::new(
1321 std::io::ErrorKind::InvalidData,
1322 format!("GET {anchor_url}: anchor root mismatch"),
1323 )));
1324 continue;
1325 }
1326
1327 return Ok(ab);
1328 }
1329
1330 Err(last_err.unwrap_or(Error::NoPeers))
1331}