1use crate::data::client::peer_xor_distance;
7use crate::data::client::Client;
8use crate::data::client::PUT_TARGET_WIDTH;
9use crate::data::error::{Error, Result};
10use ant_protocol::evm::{Amount, PaymentQuote};
11use ant_protocol::transport::{
12 DHTNode, MultiAddr, P2PNode, PeerId, ResponderView, WitnessedCloseGroup,
13};
14use ant_protocol::{
15 compute_address, send_and_await_chunk_response, ChunkMessage, ChunkMessageBody,
16 ChunkQuoteRequest, ChunkQuoteResponse, CLOSE_GROUP_MAJORITY, CLOSE_GROUP_SIZE,
17};
18use futures::stream::{FuturesUnordered, StreamExt};
19use std::collections::{HashMap, HashSet};
20use std::sync::Arc;
21use std::time::Duration;
22use tracing::{debug, info, warn};
23
24const FAULT_TOLERANT_QUOTE_QUERY_MULTIPLIER: usize = 2;
29
30const WITNESSED_QUORUM_NUMERATOR: usize = 2;
34const WITNESSED_QUORUM_DENOMINATOR: usize = 3;
35
36const SINGLE_NODE_WITNESSED_VIEW_COUNT: usize = 20;
38
39const MEDIAN_QUOTE_INDEX: usize = CLOSE_GROUP_SIZE / 2;
41
42const QUOTE_COLLECTION_TIMEOUT_SECS: u64 = 120;
46
47const ML_DSA_PUB_KEY_LEN: usize = 1952;
56
57fn quote_binding_is_valid(peer_id: &PeerId, quote: &PaymentQuote) -> bool {
73 if quote.pub_key.len() != ML_DSA_PUB_KEY_LEN {
74 return false;
75 }
76 compute_address("e.pub_key) == *peer_id.as_bytes()
77}
78
79fn classify_quote_response(
106 peer_id: &PeerId,
107 quote_bytes: &[u8],
108 already_stored: bool,
109) -> std::result::Result<(PaymentQuote, Amount), Error> {
110 let payment_quote = rmp_serde::from_slice::<PaymentQuote>(quote_bytes).map_err(|e| {
111 Error::Serialization(format!("Failed to deserialize quote from {peer_id}: {e}"))
112 })?;
113
114 if !quote_binding_is_valid(peer_id, &payment_quote) {
119 let derived = compute_address(&payment_quote.pub_key);
120 warn!(
121 "Dropping response from {peer_id} — quote.pub_key BLAKE3 mismatch \
122 (peer is signing quotes with another peer's key); the storer \
123 would reject this proof"
124 );
125 return Err(Error::BadQuoteBinding {
126 peer_id: peer_id.to_string(),
127 detail: format!(
128 "BLAKE3(pub_key)={} pub_key_len={}",
129 hex::encode(derived),
130 payment_quote.pub_key.len(),
131 ),
132 });
133 }
134
135 if already_stored {
136 debug!("Peer {peer_id} already has chunk");
137 return Err(Error::AlreadyStored);
138 }
139 let price = payment_quote.price;
140 debug!("Received quote from {peer_id}: price = {price}");
141 Ok((payment_quote, price))
142}
143
144fn drop_quotes_with_bad_bindings(
147 quotes: &mut Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>,
148) -> usize {
149 let before = quotes.len();
150 quotes.retain(|(peer_id, _, quote, _)| {
151 if quote_binding_is_valid(peer_id, quote) {
152 true
153 } else {
154 warn!(
155 "Dropping quote from peer {peer_id} — quote.pub_key BLAKE3 mismatch \
156 (peer is signing quotes with another peer's key); the storer would \
157 reject this proof"
158 );
159 false
160 }
161 });
162 before - quotes.len()
163}
164
165#[allow(clippy::too_many_arguments)]
166async fn request_store_quote_from_peer(
167 node: Arc<P2PNode>,
168 peer_id: PeerId,
169 peer_addrs: Vec<MultiAddr>,
170 request_id: u64,
171 address: [u8; 32],
172 data_size: u64,
173 data_type: u32,
174 per_peer_timeout: Duration,
175) -> StoreQuoteRequestResult {
176 let request = ChunkQuoteRequest {
177 address,
178 data_size,
179 data_type,
180 };
181 let message = ChunkMessage {
182 request_id,
183 body: ChunkMessageBody::QuoteRequest(request),
184 };
185
186 let message_bytes = match message.encode() {
187 Ok(bytes) => bytes,
188 Err(e) => {
189 return (
190 peer_id,
191 peer_addrs,
192 Err(Error::Protocol(format!(
193 "Failed to encode quote request for {peer_id}: {e}"
194 ))),
195 );
196 }
197 };
198
199 let result = send_and_await_chunk_response(
200 &node,
201 &peer_id,
202 message_bytes,
203 request_id,
204 per_peer_timeout,
205 &peer_addrs,
206 |body| match body {
207 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
208 quote,
209 already_stored,
210 }) => Some(classify_quote_response(&peer_id, "e, already_stored)),
211 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(e)) => Some(Err(
212 Error::Protocol(format!("Quote error from {peer_id}: {e}")),
213 )),
214 _ => None,
215 },
216 |e| Error::Network(format!("Failed to send quote request to {peer_id}: {e}")),
217 || Error::Timeout(format!("Timeout waiting for quote from {peer_id}")),
218 )
219 .await;
220
221 (peer_id, peer_addrs, result)
222}
223
224#[allow(clippy::too_many_arguments)]
225fn record_store_quote_result(
226 peer_id: PeerId,
227 addrs: Vec<MultiAddr>,
228 quote_result: Result<(PaymentQuote, Amount)>,
229 address: &[u8; 32],
230 quotes: &mut Vec<StoreQuote>,
231 already_stored_peers: &mut Vec<(PeerId, [u8; 32])>,
232 failures: &mut Vec<String>,
233 bad_quote_count: &mut usize,
234) {
235 match quote_result {
236 Ok((quote, price)) => {
237 quotes.push((peer_id, addrs, quote, price));
238 }
239 Err(Error::AlreadyStored) => {
240 info!("Peer {peer_id} reports chunk already stored");
241 let dist = peer_xor_distance(&peer_id, address);
242 already_stored_peers.push((peer_id, dist));
243 }
244 Err(e) => {
245 if matches!(&e, Error::BadQuoteBinding { .. }) {
246 *bad_quote_count += 1;
247 }
248 warn!("Failed to get quote from {peer_id}: {e}");
249 failures.push(format!("{peer_id}: {e}"));
250 }
251 }
252}
253
254fn witnessed_quote_launch_budget(
255 successful_quotes: usize,
256 in_flight: usize,
257 remaining_peers: usize,
258) -> usize {
259 CLOSE_GROUP_SIZE
260 .saturating_sub(successful_quotes.saturating_add(in_flight))
261 .min(remaining_peers)
262}
263
264fn single_node_quote_query_count() -> usize {
265 CLOSE_GROUP_SIZE
266}
267
268fn fault_tolerant_quote_query_count() -> usize {
269 CLOSE_GROUP_SIZE * FAULT_TOLERANT_QUOTE_QUERY_MULTIPLIER
270}
271
272fn witnessed_close_group_quorum() -> usize {
273 (CLOSE_GROUP_SIZE * WITNESSED_QUORUM_NUMERATOR).div_ceil(WITNESSED_QUORUM_DENOMINATOR)
274}
275
276fn witnessed_close_group_quorum_for_missing_views(missing_views: usize) -> usize {
277 witnessed_close_group_quorum()
278 .saturating_sub(missing_views)
279 .max(1)
280}
281
282fn missing_witnessed_responder_views(witnessed: &WitnessedCloseGroup) -> usize {
283 witnessed
284 .initial_closest
285 .len()
286 .saturating_sub(witnessed.responder_views.len())
287}
288
289fn witnessed_close_group_quorum_for_transcript(witnessed: &WitnessedCloseGroup) -> usize {
290 witnessed_close_group_quorum_for_missing_views(missing_witnessed_responder_views(witnessed))
291}
292
293fn scope_witnessed_to_close_group(witnessed: &WitnessedCloseGroup) -> WitnessedCloseGroup {
302 let initial_closest: Vec<DHTNode> = witnessed
303 .initial_closest
304 .iter()
305 .take(CLOSE_GROUP_SIZE)
306 .cloned()
307 .collect();
308 let scope: HashSet<PeerId> = initial_closest.iter().map(|node| node.peer_id).collect();
309 let responder_views: Vec<ResponderView> = witnessed
310 .responder_views
311 .iter()
312 .filter(|view| scope.contains(&view.responder))
313 .cloned()
314 .collect();
315 WitnessedCloseGroup {
316 target: witnessed.target,
317 k: CLOSE_GROUP_SIZE,
318 initial_closest,
319 responder_views,
320 }
321}
322
323fn peer_list(peers: &[PeerId]) -> Vec<String> {
324 peers.iter().map(ToString::to_string).collect()
325}
326
327pub(crate) type StoreQuote = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);
328type StoreQuoteRequestResult = (PeerId, Vec<MultiAddr>, Result<(PaymentQuote, Amount)>);
329type VotersByPeer = HashMap<PeerId, HashSet<PeerId>>;
330type WitnessedVoteData = (HashMap<PeerId, DHTNode>, VotersByPeer, Vec<(PeerId, usize)>);
331
332pub(crate) struct StoreQuotePlan {
333 pub(crate) quotes: Vec<StoreQuote>,
334 pub(crate) put_peers: Vec<(PeerId, Vec<MultiAddr>)>,
335}
336
337#[derive(Debug, Clone)]
338struct WitnessedQuoteCandidate {
339 node: DHTNode,
340 votes: usize,
341 voters: HashSet<PeerId>,
342}
343
344#[derive(Debug, Clone)]
345struct WitnessedQuotePeer {
346 peer_id: PeerId,
347 addrs: Vec<MultiAddr>,
348 voters: HashSet<PeerId>,
349}
350
351#[derive(Debug, Clone)]
352struct WitnessedQuoteSelection {
353 quote_peers: Vec<WitnessedQuotePeer>,
354 initial_put_peers: Vec<(PeerId, Vec<MultiAddr>)>,
355 quorum: usize,
356}
357
358enum QuoteSelectionPolicy {
359 ClosestByDistance,
360 WitnessedMedianVoters {
361 voters_by_peer: VotersByPeer,
362 quorum: usize,
363 },
364}
365
366fn witnessed_initial_peers(witnessed: &WitnessedCloseGroup) -> Vec<String> {
367 witnessed
368 .initial_closest
369 .iter()
370 .map(|node| node.peer_id.to_string())
371 .collect()
372}
373
374fn witnessed_responder_views(witnessed: &WitnessedCloseGroup) -> Vec<String> {
375 witnessed
376 .responder_views
377 .iter()
378 .map(|view| {
379 let peers = view
380 .closest
381 .iter()
382 .map(|node| node.peer_id)
383 .collect::<Vec<_>>();
384 format!("{}=>{:?}", view.responder, peer_list(&peers))
385 })
386 .collect()
387}
388
389fn merge_witnessed_node(nodes: &mut HashMap<PeerId, DHTNode>, node: DHTNode) {
390 match nodes.entry(node.peer_id) {
391 std::collections::hash_map::Entry::Occupied(mut entry) => {
392 entry.get_mut().merge_from(node);
393 }
394 std::collections::hash_map::Entry::Vacant(entry) => {
395 entry.insert(node);
396 }
397 }
398}
399
400fn sort_vote_counts_by_distance(vote_counts: &mut [(PeerId, usize)], address: &[u8; 32]) {
401 vote_counts.sort_by(|left, right| {
402 peer_xor_distance(&left.0, address)
403 .cmp(&peer_xor_distance(&right.0, address))
404 .then_with(|| left.0.as_bytes().cmp(right.0.as_bytes()))
405 });
406}
407
408fn witnessed_vote_counts_and_nodes(
409 witnessed: &WitnessedCloseGroup,
410 address: &[u8; 32],
411) -> WitnessedVoteData {
412 let mut known_nodes = HashMap::new();
413 for node in &witnessed.initial_closest {
414 merge_witnessed_node(&mut known_nodes, node.clone());
415 }
416
417 let mut voters_by_peer: HashMap<PeerId, HashSet<PeerId>> = HashMap::new();
418 for view in &witnessed.responder_views {
419 let mut voted = HashSet::new();
420 for node in &view.closest {
421 merge_witnessed_node(&mut known_nodes, node.clone());
422 if voted.insert(node.peer_id) {
423 voters_by_peer
424 .entry(node.peer_id)
425 .or_default()
426 .insert(view.responder);
427 }
428 }
429 }
430
431 let mut vote_counts: Vec<(PeerId, usize)> = voters_by_peer
432 .iter()
433 .map(|(peer_id, voters)| (*peer_id, voters.len()))
434 .collect();
435 sort_vote_counts_by_distance(&mut vote_counts, address);
436 (known_nodes, voters_by_peer, vote_counts)
437}
438
439fn witnessed_consensus_candidates(
440 witnessed: &WitnessedCloseGroup,
441 address: &[u8; 32],
442 quorum: usize,
443) -> Vec<WitnessedQuoteCandidate> {
444 let (known_nodes, voters_by_peer, vote_counts) =
445 witnessed_vote_counts_and_nodes(witnessed, address);
446 let mut candidates = vote_counts
447 .iter()
448 .filter_map(|(peer_id, votes)| {
449 if *votes < quorum {
450 return None;
451 }
452 known_nodes.get(peer_id).cloned().and_then(|node| {
453 voters_by_peer
454 .get(peer_id)
455 .cloned()
456 .map(|voters| WitnessedQuoteCandidate {
457 node,
458 votes: *votes,
459 voters,
460 })
461 })
462 })
463 .collect::<Vec<_>>();
464
465 candidates.sort_by(|left, right| {
466 peer_xor_distance(&left.node.peer_id, address)
467 .cmp(&peer_xor_distance(&right.node.peer_id, address))
468 .then_with(|| right.votes.cmp(&left.votes))
469 .then_with(|| {
470 left.node
471 .peer_id
472 .as_bytes()
473 .cmp(right.node.peer_id.as_bytes())
474 })
475 });
476 candidates
477}
478
479fn witnessed_vote_counts(witnessed: &WitnessedCloseGroup, address: &[u8; 32]) -> Vec<String> {
480 let (_, _, vote_counts) = witnessed_vote_counts_and_nodes(witnessed, address);
481 vote_counts
482 .iter()
483 .map(|(peer_id, votes)| format!("{peer_id}:{votes}"))
484 .collect()
485}
486
487fn witnessed_consensus(
488 witnessed: &WitnessedCloseGroup,
489 address: &[u8; 32],
490 quorum: usize,
491) -> Vec<String> {
492 witnessed_consensus_candidates(witnessed, address, quorum)
493 .iter()
494 .map(|candidate| format!("{}:{}", candidate.node.peer_id, candidate.votes))
495 .collect()
496}
497
498fn witnessed_close_group_diagnostics(
499 address: &[u8; 32],
500 witnessed: &WitnessedCloseGroup,
501 quorum: usize,
502) -> String {
503 format!(
504 "target={}, initial={:?}, responder_views={:?}, vote_counts={:?}, quorum={}, final={:?}",
505 hex::encode(address),
506 witnessed_initial_peers(witnessed),
507 witnessed_responder_views(witnessed),
508 witnessed_vote_counts(witnessed, address),
509 quorum,
510 witnessed_consensus(witnessed, address, quorum)
511 )
512}
513
514fn witnessed_quote_selection_or_error(
515 address: &[u8; 32],
516 witnessed: &WitnessedCloseGroup,
517 required: usize,
518 quorum: usize,
519) -> Result<WitnessedQuoteSelection> {
520 let candidates = witnessed_consensus_candidates(witnessed, address, quorum);
521 if candidates.len() < required {
522 return Err(Error::InsufficientPeers(format!(
523 "Witnessed close group inconclusive before payment: got {}/{} quorum-recognised peers. {}",
524 candidates.len(),
525 required,
526 witnessed_close_group_diagnostics(address, witnessed, quorum)
527 )));
528 }
529
530 let initial_put_peers = witnessed
531 .initial_closest
532 .iter()
533 .take(CLOSE_GROUP_SIZE)
534 .map(|node| (node.peer_id, node.addresses_by_priority()))
535 .collect::<Vec<_>>();
536
537 if initial_put_peers.len() < CLOSE_GROUP_SIZE {
538 return Err(Error::InsufficientPeers(format!(
539 "Witnessed close group returned only {}/{} initial PUT peers before payment. {}",
540 initial_put_peers.len(),
541 CLOSE_GROUP_SIZE,
542 witnessed_close_group_diagnostics(address, witnessed, quorum)
543 )));
544 }
545
546 let quote_peers = candidates
547 .into_iter()
548 .map(|candidate| WitnessedQuotePeer {
549 peer_id: candidate.node.peer_id,
550 addrs: candidate.node.addresses_by_priority(),
551 voters: candidate.voters,
552 })
553 .collect();
554
555 Ok(WitnessedQuoteSelection {
556 quote_peers,
557 initial_put_peers,
558 quorum,
559 })
560}
561
562pub(crate) fn median_paid_quote_issuer(
563 quotes: &[(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)],
564) -> Option<(PeerId, Amount)> {
565 if quotes.len() <= MEDIAN_QUOTE_INDEX {
566 return None;
567 }
568
569 let mut by_price: Vec<(usize, PeerId, Amount)> = quotes
570 .iter()
571 .enumerate()
572 .map(|(index, (peer_id, _, _, price))| (index, *peer_id, *price))
573 .collect();
574 by_price.sort_by_key(|(index, _, price)| (*price, *index));
575 by_price
576 .get(MEDIAN_QUOTE_INDEX)
577 .map(|(_, peer_id, price)| (*peer_id, *price))
578}
579
580fn sort_quotes_by_distance(quotes: &mut [StoreQuote], address: &[u8; 32]) {
581 quotes.sort_by(|left, right| {
582 peer_xor_distance(&left.0, address)
583 .cmp(&peer_xor_distance(&right.0, address))
584 .then_with(|| left.0.as_bytes().cmp(right.0.as_bytes()))
585 });
586}
587
588fn median_paid_quote_issuer_for_indices(
589 quotes: &[StoreQuote],
590 indices: &[usize],
591) -> Option<(PeerId, Amount)> {
592 if indices.len() <= MEDIAN_QUOTE_INDEX {
593 return None;
594 }
595
596 let mut by_price: Vec<(usize, PeerId, Amount)> = indices
597 .iter()
598 .enumerate()
599 .map(|(selected_index, quote_index)| {
600 let (peer_id, _, _, price) = "es[*quote_index];
601 (selected_index, *peer_id, *price)
602 })
603 .collect();
604 by_price.sort_by_key(|(selected_index, _, price)| (*price, *selected_index));
605 by_price
606 .get(MEDIAN_QUOTE_INDEX)
607 .map(|(_, peer_id, price)| (*peer_id, *price))
608}
609
610fn median_issuer_voter_support(
611 quotes: &[StoreQuote],
612 indices: &[usize],
613 voters_by_peer: &VotersByPeer,
614) -> Option<(PeerId, usize)> {
615 let (median_peer_id, _) = median_paid_quote_issuer_for_indices(quotes, indices)?;
616 let voters = voters_by_peer.get(&median_peer_id)?;
617 Some((median_peer_id, voters.len()))
618}
619
620fn visit_quote_subsets<F>(
621 quote_count: usize,
622 subset_size: usize,
623 start_index: usize,
624 current: &mut Vec<usize>,
625 visit: &mut F,
626) where
627 F: FnMut(&[usize]),
628{
629 if current.len() == subset_size {
630 visit(current);
631 return;
632 }
633
634 let remaining = subset_size - current.len();
635 let last_start = quote_count - remaining;
636 for index in start_index..=last_start {
637 current.push(index);
638 visit_quote_subsets(quote_count, subset_size, index + 1, current, visit);
639 current.pop();
640 }
641}
642
643fn select_closest_quotes(mut quotes: Vec<StoreQuote>, address: &[u8; 32]) -> Vec<StoreQuote> {
644 sort_quotes_by_distance(&mut quotes, address);
645 quotes.truncate(CLOSE_GROUP_SIZE);
646 quotes
647}
648
649fn select_witnessed_median_voter_quotes(
650 mut quotes: Vec<StoreQuote>,
651 address: &[u8; 32],
652 voters_by_peer: &VotersByPeer,
653 required_support: usize,
654) -> Option<Vec<StoreQuote>> {
655 if quotes.len() < CLOSE_GROUP_SIZE {
656 return None;
657 }
658
659 sort_quotes_by_distance(&mut quotes, address);
660
661 let mut best_indices: Option<(usize, Vec<usize>)> = None;
662 let mut current_indices = Vec::with_capacity(CLOSE_GROUP_SIZE);
663 visit_quote_subsets(
664 quotes.len(),
665 CLOSE_GROUP_SIZE,
666 0,
667 &mut current_indices,
668 &mut |indices| {
669 let Some((_, support)) = median_issuer_voter_support("es, indices, voters_by_peer)
670 else {
671 return;
672 };
673 if support < required_support {
674 return;
675 }
676 match &best_indices {
677 Some((best_support, best)) if *best_support > support => {}
678 Some((best_support, best))
679 if *best_support == support && best.as_slice() <= indices => {}
680 _ => best_indices = Some((support, indices.to_vec())),
681 }
682 },
683 );
684
685 best_indices.map(|(_, indices)| {
686 indices
687 .into_iter()
688 .map(|index| quotes[index].clone())
689 .collect()
690 })
691}
692
693fn put_peers_with_median_voters_first(
694 quotes: &[StoreQuote],
695 put_peers: &[(PeerId, Vec<MultiAddr>)],
696 voters_by_peer: &VotersByPeer,
697 required_support: usize,
698) -> Option<Vec<(PeerId, Vec<MultiAddr>)>> {
699 let (median_peer_id, _) = median_paid_quote_issuer(quotes)?;
700 let voters = voters_by_peer.get(&median_peer_id)?;
701
702 let mut supporting_peers = Vec::new();
703 let mut fallback_peers = Vec::new();
704 for (peer_id, addrs) in put_peers {
705 let peer = (*peer_id, addrs.clone());
706 if voters.contains(peer_id) {
707 supporting_peers.push(peer);
708 } else {
709 fallback_peers.push(peer);
710 }
711 }
712
713 if supporting_peers.len() < required_support {
714 return None;
715 }
716
717 supporting_peers.extend(fallback_peers);
718 Some(supporting_peers)
719}
720
721impl Client {
722 pub async fn get_store_quotes(
738 &self,
739 address: &[u8; 32],
740 data_size: u64,
741 data_type: u32,
742 ) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
743 Ok(self
744 .get_store_quote_plan(address, data_size, data_type)
745 .await?
746 .quotes)
747 }
748
749 pub(crate) async fn get_store_quote_plan(
756 &self,
757 address: &[u8; 32],
758 data_size: u64,
759 data_type: u32,
760 ) -> Result<StoreQuotePlan> {
761 let witnessed_selection = self.select_witnessed_quote_selection(address).await?;
762 let voters_by_peer: VotersByPeer = witnessed_selection
763 .quote_peers
764 .iter()
765 .map(|peer| (peer.peer_id, peer.voters.clone()))
766 .collect();
767 let remote_peers = witnessed_selection
768 .quote_peers
769 .into_iter()
770 .map(|peer| (peer.peer_id, peer.addrs))
771 .collect();
772 let initial_put_peers = witnessed_selection.initial_put_peers;
773 let quorum = witnessed_selection.quorum;
774 let quotes = self
775 .collect_store_quotes_from_remote_peers(
776 address,
777 data_size,
778 data_type,
779 remote_peers,
780 QuoteSelectionPolicy::WitnessedMedianVoters {
781 voters_by_peer: voters_by_peer.clone(),
782 quorum,
783 },
784 )
785 .await?;
786 let put_peers = put_peers_with_median_voters_first(
787 "es,
788 &initial_put_peers,
789 &voters_by_peer,
790 quorum,
791 )
792 .ok_or_else(|| {
793 Error::InsufficientPeers(format!(
794 "Collected {} witnessed quotes, but fewer than {} initial witness PUT peers \
795 voted for the paid median issuer for {}",
796 quotes.len(),
797 quorum,
798 hex::encode(address)
799 ))
800 })?;
801
802 Ok(StoreQuotePlan { quotes, put_peers })
803 }
804
805 pub(crate) async fn get_store_quotes_with_fault_tolerance(
812 &self,
813 address: &[u8; 32],
814 data_size: u64,
815 data_type: u32,
816 ) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
817 let peer_query_count = fault_tolerant_quote_query_count();
818 let remote_peers = self
819 .network()
820 .find_closest_peers(address, peer_query_count)
821 .await?;
822
823 self.collect_store_quotes_from_remote_peers(
824 address,
825 data_size,
826 data_type,
827 remote_peers,
828 QuoteSelectionPolicy::ClosestByDistance,
829 )
830 .await
831 }
832
833 async fn select_witnessed_quote_selection(
834 &self,
835 address: &[u8; 32],
836 ) -> Result<WitnessedQuoteSelection> {
837 let required = single_node_quote_query_count();
839 let witnessed = match self
845 .network()
846 .find_witnessed_close_group_with_view_count(
847 address,
848 PUT_TARGET_WIDTH,
849 SINGLE_NODE_WITNESSED_VIEW_COUNT,
850 )
851 .await
852 {
853 Ok(witnessed) => witnessed,
854 Err(wide_err) => {
855 debug!(
856 target = %hex::encode(address),
857 "Wide witnessed lookup ({PUT_TARGET_WIDTH}) failed ({wide_err}); \
858 retrying at close-group width ({required})"
859 );
860 self.network()
861 .find_witnessed_close_group_with_view_count(
862 address,
863 required,
864 SINGLE_NODE_WITNESSED_VIEW_COUNT,
865 )
866 .await
867 .map_err(|e| {
868 Error::InsufficientPeers(format!(
869 "Witnessed close group lookup failed before payment for target {}: {e}",
870 hex::encode(address)
871 ))
872 })?
873 }
874 };
875 let witnessed_quote = scope_witnessed_to_close_group(&witnessed);
878 let base_quorum = witnessed_close_group_quorum();
879 let missing_views = missing_witnessed_responder_views(&witnessed_quote);
880 let quorum = witnessed_close_group_quorum_for_transcript(&witnessed_quote);
881
882 if missing_views > 0 {
883 warn!(
884 target = %hex::encode(address),
885 initial = witnessed_quote.initial_closest.len(),
886 responder_views = witnessed_quote.responder_views.len(),
887 missing_views = missing_views,
888 base_quorum = base_quorum,
889 adjusted_quorum = quorum,
890 "Witnessed close group transcript is missing responder views; lowering SNP witness quorum"
891 );
892 }
893
894 debug!(
895 target = %hex::encode(address),
896 quorum = quorum,
897 view_count = SINGLE_NODE_WITNESSED_VIEW_COUNT,
898 initial = ?witnessed_initial_peers(&witnessed_quote),
899 responder_views = ?witnessed_responder_views(&witnessed_quote),
900 vote_counts = ?witnessed_vote_counts(&witnessed_quote, address),
901 final_witnessed_set = ?witnessed_consensus(&witnessed_quote, address, quorum),
902 "Witnessed close group selected for SNP quote collection"
903 );
904
905 let mut selection =
906 witnessed_quote_selection_or_error(address, &witnessed_quote, required, quorum)?;
907 selection.initial_put_peers = witnessed
911 .initial_closest
912 .iter()
913 .take(PUT_TARGET_WIDTH)
914 .map(|node| (node.peer_id, node.addresses_by_priority()))
915 .collect();
916 Ok(selection)
917 }
918
919 #[allow(clippy::too_many_lines)]
920 async fn collect_store_quotes_from_remote_peers(
921 &self,
922 address: &[u8; 32],
923 data_size: u64,
924 data_type: u32,
925 remote_peers: Vec<(PeerId, Vec<MultiAddr>)>,
926 quote_selection_policy: QuoteSelectionPolicy,
927 ) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
928 let peer_query_count = remote_peers.len();
929
930 let node = self.network().node();
931
932 debug!(
933 "Requesting quotes from up to {peer_query_count} peers for address {} (size: {data_size})",
934 hex::encode(address)
935 );
936
937 if remote_peers.len() < CLOSE_GROUP_SIZE {
938 return Err(Error::InsufficientPeers(format!(
939 "Found {} peers, need {CLOSE_GROUP_SIZE}",
940 remote_peers.len()
941 )));
942 }
943 debug_assert!(peer_query_count >= CLOSE_GROUP_SIZE);
944
945 let per_peer_timeout = Duration::from_secs(self.config().quote_timeout_secs);
946 let overall_timeout = Duration::from_secs(QUOTE_COLLECTION_TIMEOUT_SECS);
947
948 let mut quotes = Vec::with_capacity(peer_query_count);
952 let mut already_stored_peers: Vec<(PeerId, [u8; 32])> = Vec::new();
953 let mut failures: Vec<String> = Vec::new();
954
955 let mut bad_quote_count = 0usize;
960
961 let staged_witnessed_collection = matches!(
962 "e_selection_policy,
963 QuoteSelectionPolicy::WitnessedMedianVoters { .. }
964 );
965
966 if staged_witnessed_collection {
967 let mut quote_futures = FuturesUnordered::new();
968 let mut next_peer_index = 0usize;
969 let collect_result: std::result::Result<std::result::Result<(), Error>, _> =
970 tokio::time::timeout(overall_timeout, async {
971 loop {
972 let launch_count = witnessed_quote_launch_budget(
973 quotes.len(),
974 quote_futures.len(),
975 remote_peers.len().saturating_sub(next_peer_index),
976 );
977 for _ in 0..launch_count {
978 let (peer_id, peer_addrs) = &remote_peers[next_peer_index];
979 next_peer_index += 1;
980 quote_futures.push(request_store_quote_from_peer(
981 node.clone(),
982 *peer_id,
983 peer_addrs.clone(),
984 self.next_request_id(),
985 *address,
986 data_size,
987 data_type,
988 per_peer_timeout,
989 ));
990 }
991
992 if quotes.len() >= CLOSE_GROUP_SIZE || quote_futures.is_empty() {
993 break;
994 }
995
996 let Some((peer_id, addrs, quote_result)) = quote_futures.next().await
997 else {
998 break;
999 };
1000 record_store_quote_result(
1001 peer_id,
1002 addrs,
1003 quote_result,
1004 address,
1005 &mut quotes,
1006 &mut already_stored_peers,
1007 &mut failures,
1008 &mut bad_quote_count,
1009 );
1010 }
1011 Ok(())
1012 })
1013 .await;
1014
1015 match collect_result {
1016 Err(_elapsed) => {
1017 warn!(
1018 "Quote collection timed out after {overall_timeout:?} for address {}",
1019 hex::encode(address)
1020 );
1021 }
1022 Ok(Err(e)) => return Err(e),
1023 Ok(Ok(())) => {}
1024 }
1025 } else {
1026 let mut quote_futures = FuturesUnordered::new();
1030
1031 for (peer_id, peer_addrs) in &remote_peers {
1032 quote_futures.push(request_store_quote_from_peer(
1033 node.clone(),
1034 *peer_id,
1035 peer_addrs.clone(),
1036 self.next_request_id(),
1037 *address,
1038 data_size,
1039 data_type,
1040 per_peer_timeout,
1041 ));
1042 }
1043
1044 let collect_result: std::result::Result<std::result::Result<(), Error>, _> =
1045 tokio::time::timeout(overall_timeout, async {
1046 while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await {
1047 record_store_quote_result(
1048 peer_id,
1049 addrs,
1050 quote_result,
1051 address,
1052 &mut quotes,
1053 &mut already_stored_peers,
1054 &mut failures,
1055 &mut bad_quote_count,
1056 );
1057 }
1058 Ok(())
1059 })
1060 .await;
1061
1062 match collect_result {
1063 Err(_elapsed) => {
1064 warn!(
1065 "Quote collection timed out after {overall_timeout:?} for address {}",
1066 hex::encode(address)
1067 );
1068 }
1072 Ok(Err(e)) => return Err(e),
1073 Ok(Ok(())) => {}
1074 }
1075 }
1076
1077 let bad_dropped = drop_quotes_with_bad_bindings(&mut quotes);
1083 if bad_dropped > 0 {
1084 warn!(
1085 "Defensive filter dropped {bad_dropped} quotes with mismatched peer bindings \
1086 for address {} — the per-peer handler should have caught these earlier \
1087 (this indicates an upstream regression)",
1088 hex::encode(address),
1089 );
1090 bad_quote_count += bad_dropped;
1091 }
1092
1093 if !already_stored_peers.is_empty() {
1095 let mut all_peers_by_distance: Vec<(bool, [u8; 32])> = Vec::new();
1096 for (peer_id, _, _, _) in "es {
1097 all_peers_by_distance.push((false, peer_xor_distance(peer_id, address)));
1098 }
1099 for (_, dist) in &already_stored_peers {
1100 all_peers_by_distance.push((true, *dist));
1101 }
1102 all_peers_by_distance.sort_by_key(|a| a.1);
1103
1104 let close_group_stored = all_peers_by_distance
1105 .iter()
1106 .take(CLOSE_GROUP_SIZE)
1107 .filter(|(is_stored, _)| *is_stored)
1108 .count();
1109
1110 if close_group_stored >= CLOSE_GROUP_MAJORITY {
1111 debug!(
1112 "Chunk {} already stored ({close_group_stored}/{CLOSE_GROUP_SIZE} close-group peers confirm)",
1113 hex::encode(address)
1114 );
1115 return Err(Error::AlreadyStored);
1116 }
1117 }
1118
1119 let already_stored_count = already_stored_peers.len();
1120 let failure_count = failures.len();
1121 let quote_count = quotes.len();
1122 let total_responses = quote_count + failure_count + already_stored_count;
1123
1124 if quotes.len() >= CLOSE_GROUP_SIZE {
1125 let selected_quotes = match quote_selection_policy {
1126 QuoteSelectionPolicy::ClosestByDistance => select_closest_quotes(quotes, address),
1127 QuoteSelectionPolicy::WitnessedMedianVoters {
1128 voters_by_peer,
1129 quorum,
1130 } => select_witnessed_median_voter_quotes(quotes, address, &voters_by_peer, quorum)
1131 .ok_or_else(|| {
1132 Error::InsufficientPeers(format!(
1133 "Got {quote_count} quotes, need {CLOSE_GROUP_SIZE} whose paid \
1134 median issuer is recognised by at least {} \
1135 selected witness peers ({total_responses} responses: \
1136 {already_stored_count} already_stored, {failure_count} failed \
1137 including {bad_quote_count} with mismatched peer bindings). \
1138 Failures: [{}]",
1139 quorum,
1140 failures.join("; ")
1141 ))
1142 })?,
1143 };
1144
1145 info!(
1146 "Collected {} quotes for address {} ({total_responses} responses: \
1147 {quote_count} ok, {already_stored_count} already_stored, {failure_count} failed, \
1148 {bad_quote_count} bad-binding)",
1149 selected_quotes.len(),
1150 hex::encode(address),
1151 );
1152 return Ok(selected_quotes);
1153 }
1154
1155 Err(Error::InsufficientPeers(format!(
1156 "Got {quote_count} quotes, need {CLOSE_GROUP_SIZE} ({total_responses} responses: \
1157 {already_stored_count} already_stored, {failure_count} failed including \
1158 {bad_quote_count} with mismatched peer bindings). Failures: [{}]",
1159 failures.join("; ")
1160 )))
1161 }
1162}
1163
1164#[cfg(test)]
1165#[allow(clippy::unwrap_used, clippy::expect_used)]
1166mod tests {
1167 use super::*;
1179 use ant_protocol::evm::RewardsAddress;
1180 use ant_protocol::pqc::ops::{MlDsaOperations, MlDsaPublicKey};
1181 use ant_protocol::transport::{DHTNode, MlDsa65, ResponderView, WitnessedCloseGroup};
1182 use std::time::SystemTime;
1183 use xor_name::XorName;
1184
1185 struct Keypair {
1187 peer_id: PeerId,
1188 pub_key_bytes: Vec<u8>,
1189 }
1190
1191 fn gen_keypair() -> Keypair {
1192 let ml_dsa = MlDsa65::new();
1193 let (pub_key, _sk) = ml_dsa.generate_keypair().expect("ML-DSA-65 keygen");
1194 let pub_key_bytes = pub_key.as_bytes().to_vec();
1195 let peer_id = PeerId::from_bytes(compute_address(&pub_key_bytes));
1196 Keypair {
1197 peer_id,
1198 pub_key_bytes,
1199 }
1200 }
1201
1202 fn good_quote_real() -> (PeerId, Vec<MultiAddr>, PaymentQuote, Amount) {
1205 let kp = gen_keypair();
1206 let quote = PaymentQuote {
1207 content: XorName([0u8; 32]),
1208 timestamp: SystemTime::UNIX_EPOCH,
1209 price: Amount::ZERO,
1210 rewards_address: RewardsAddress::new([0u8; 20]),
1211 pub_key: kp.pub_key_bytes,
1212 signature: Vec::new(),
1213 };
1214 (kp.peer_id, Vec::new(), quote, Amount::ZERO)
1215 }
1216
1217 fn bad_quote_real() -> (PeerId, Vec<MultiAddr>, PaymentQuote, Amount) {
1222 let claimed = gen_keypair();
1223 let signing = gen_keypair();
1224 assert_ne!(claimed.pub_key_bytes, signing.pub_key_bytes);
1225 assert_ne!(claimed.peer_id.as_bytes(), signing.peer_id.as_bytes());
1226 let quote = PaymentQuote {
1227 content: XorName([0u8; 32]),
1228 timestamp: SystemTime::UNIX_EPOCH,
1229 price: Amount::ZERO,
1230 rewards_address: RewardsAddress::new([0u8; 20]),
1231 pub_key: signing.pub_key_bytes,
1232 signature: Vec::new(),
1233 };
1234 (claimed.peer_id, Vec::new(), quote, Amount::ZERO)
1235 }
1236
1237 fn witnessed_test_node(seed: u8) -> DHTNode {
1238 DHTNode {
1239 peer_id: PeerId::from_bytes([seed; 32]),
1240 addresses: Vec::new(),
1241 address_types: Vec::new(),
1242 distance: None,
1243 reliability: 1.0,
1244 }
1245 }
1246
1247 fn witnessed_test_nodes(seeds: &[u8]) -> Vec<DHTNode> {
1248 seeds.iter().copied().map(witnessed_test_node).collect()
1249 }
1250
1251 fn witnessed_test_view(responder: u8, closest: &[u8]) -> ResponderView {
1252 ResponderView {
1253 responder: PeerId::from_bytes([responder; 32]),
1254 closest: witnessed_test_nodes(closest),
1255 }
1256 }
1257
1258 fn synthetic_peer(seed: u8) -> PeerId {
1259 PeerId::from_bytes([seed; 32])
1260 }
1261
1262 fn synthetic_quote(seed: u8, price: u64) -> (PeerId, Vec<MultiAddr>, PaymentQuote, Amount) {
1263 let amount = Amount::from(price);
1264 let quote = PaymentQuote {
1265 content: XorName([0u8; 32]),
1266 timestamp: SystemTime::UNIX_EPOCH,
1267 price: amount,
1268 rewards_address: RewardsAddress::new([0u8; 20]),
1269 pub_key: Vec::new(),
1270 signature: Vec::new(),
1271 };
1272 (synthetic_peer(seed), Vec::new(), quote, amount)
1273 }
1274
1275 fn synthetic_voters(seeds: &[u8]) -> HashSet<PeerId> {
1276 seeds.iter().copied().map(synthetic_peer).collect()
1277 }
1278
1279 fn quote_peer_seeds(quotes: &[(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)]) -> Vec<u8> {
1280 quotes
1281 .iter()
1282 .map(|(peer_id, _, _, _)| peer_id.as_bytes()[0])
1283 .collect()
1284 }
1285
1286 fn put_peer_seeds(peers: &[(PeerId, Vec<MultiAddr>)]) -> Vec<u8> {
1287 peers
1288 .iter()
1289 .map(|(peer_id, _)| peer_id.as_bytes()[0])
1290 .collect()
1291 }
1292
1293 fn put_peers_from_seeds(seeds: &[u8]) -> Vec<(PeerId, Vec<MultiAddr>)> {
1294 seeds
1295 .iter()
1296 .copied()
1297 .map(|seed| (synthetic_peer(seed), Vec::new()))
1298 .collect()
1299 }
1300
1301 fn storer_binding_would_accept(peer_id: &PeerId, quote: &PaymentQuote) -> bool {
1310 if MlDsaPublicKey::from_bytes("e.pub_key).is_err() {
1311 return false;
1312 }
1313 compute_address("e.pub_key) == *peer_id.as_bytes()
1314 }
1315
1316 #[test]
1321 fn binding_accepts_real_self_consistent_keypair() {
1322 let (peer_id, _, quote, _) = good_quote_real();
1323 assert!(quote_binding_is_valid(&peer_id, "e));
1326 assert!(storer_binding_would_accept(&peer_id, "e));
1328 }
1329
1330 #[test]
1331 fn binding_rejects_real_crossed_keypair() {
1332 let (peer_id, _, quote, _) = bad_quote_real();
1333 assert!(!quote_binding_is_valid(&peer_id, "e));
1334 assert!(!storer_binding_would_accept(&peer_id, "e));
1335 }
1336
1337 #[test]
1338 fn binding_rejects_oversize_pubkey() {
1339 let oversized = vec![0u8; ML_DSA_PUB_KEY_LEN + 1];
1343 let peer_id = PeerId::from_bytes(compute_address(&oversized));
1344 let quote = PaymentQuote {
1345 content: XorName([0u8; 32]),
1346 timestamp: SystemTime::UNIX_EPOCH,
1347 price: Amount::ZERO,
1348 rewards_address: RewardsAddress::new([0u8; 20]),
1349 pub_key: oversized,
1350 signature: Vec::new(),
1351 };
1352 assert_eq!(compute_address("e.pub_key), *peer_id.as_bytes());
1355 assert!(
1356 !quote_binding_is_valid(&peer_id, "e),
1357 "predicate must reject oversize pub_key even when BLAKE3 happens to match"
1358 );
1359 assert!(!storer_binding_would_accept(&peer_id, "e));
1360 }
1361
1362 #[test]
1363 fn binding_rejects_undersize_pubkey() {
1364 let undersized = vec![0u8; ML_DSA_PUB_KEY_LEN - 1];
1365 let peer_id = PeerId::from_bytes(compute_address(&undersized));
1366 let quote = PaymentQuote {
1367 content: XorName([0u8; 32]),
1368 timestamp: SystemTime::UNIX_EPOCH,
1369 price: Amount::ZERO,
1370 rewards_address: RewardsAddress::new([0u8; 20]),
1371 pub_key: undersized,
1372 signature: Vec::new(),
1373 };
1374 assert!(!quote_binding_is_valid(&peer_id, "e));
1375 assert!(!storer_binding_would_accept(&peer_id, "e));
1376 }
1377
1378 #[test]
1383 fn quote_query_counts_keep_single_node_close_group_only() {
1384 assert_eq!(single_node_quote_query_count(), CLOSE_GROUP_SIZE);
1385 assert_eq!(SINGLE_NODE_WITNESSED_VIEW_COUNT, 20);
1386 assert!(SINGLE_NODE_WITNESSED_VIEW_COUNT > single_node_quote_query_count());
1387 assert_eq!(witnessed_close_group_quorum(), 5);
1388 assert_eq!(witnessed_close_group_quorum_for_missing_views(0), 5);
1389 assert_eq!(witnessed_close_group_quorum_for_missing_views(1), 4);
1390 assert_eq!(witnessed_close_group_quorum_for_missing_views(2), 3);
1391 assert_eq!(
1392 fault_tolerant_quote_query_count(),
1393 CLOSE_GROUP_SIZE * FAULT_TOLERANT_QUOTE_QUERY_MULTIPLIER
1394 );
1395 assert!(fault_tolerant_quote_query_count() > single_node_quote_query_count());
1396 }
1397
1398 #[test]
1399 fn witnessed_quote_launch_budget_keeps_exact_quote_window() {
1400 assert_eq!(
1401 witnessed_quote_launch_budget(0, 0, CLOSE_GROUP_SIZE * 2),
1402 CLOSE_GROUP_SIZE,
1403 "initial SNP quote fetch should launch the closest seven peers"
1404 );
1405 assert_eq!(
1406 witnessed_quote_launch_budget(1, CLOSE_GROUP_SIZE - 1, CLOSE_GROUP_SIZE),
1407 0,
1408 "a successful quote should not launch an extra fallback"
1409 );
1410 assert_eq!(
1411 witnessed_quote_launch_budget(0, CLOSE_GROUP_SIZE - 1, CLOSE_GROUP_SIZE),
1412 1,
1413 "a failed in-flight quote should launch the next closest fallback"
1414 );
1415 assert_eq!(
1416 witnessed_quote_launch_budget(CLOSE_GROUP_SIZE - 1, 0, 3),
1417 1,
1418 "only one more peer is needed for the seventh quote"
1419 );
1420 assert_eq!(
1421 witnessed_quote_launch_budget(0, 0, CLOSE_GROUP_SIZE - 1),
1422 CLOSE_GROUP_SIZE - 1,
1423 "launch budget is capped by remaining candidates"
1424 );
1425 }
1426
1427 #[test]
1428 fn witnessed_candidates_sort_by_xor_distance_then_votes() {
1429 let address = [0u8; 32];
1430 let witnessed = WitnessedCloseGroup {
1431 target: address,
1432 k: CLOSE_GROUP_SIZE,
1433 initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
1434 responder_views: vec![
1435 witnessed_test_view(1, &[1, 9]),
1436 witnessed_test_view(2, &[1, 9]),
1437 witnessed_test_view(3, &[1, 9]),
1438 witnessed_test_view(4, &[1, 9]),
1439 witnessed_test_view(5, &[1, 9]),
1440 witnessed_test_view(6, &[9]),
1441 witnessed_test_view(7, &[9]),
1442 ],
1443 };
1444
1445 let candidates =
1446 witnessed_consensus_candidates(&witnessed, &address, witnessed_close_group_quorum());
1447
1448 assert_eq!(
1449 candidates
1450 .iter()
1451 .map(|candidate| candidate.node.peer_id.as_bytes()[0])
1452 .collect::<Vec<_>>(),
1453 vec![1, 9],
1454 "XOR closeness must be the primary sort before quote collection"
1455 );
1456 }
1457
1458 fn ascending_seeds(count: usize) -> Vec<u8> {
1460 (1..=count)
1461 .map(|n| u8::try_from(n).expect("test seed fits in u8"))
1462 .collect()
1463 }
1464
1465 #[test]
1466 fn scope_witnessed_to_close_group_matches_native_close_group_query() {
1467 const RESPONDED_IN_SCOPE: usize = 5;
1471 const OUT_OF_SCOPE_RESPONDERS: usize = 2;
1473
1474 let address = [0u8; 32];
1475 let close_seeds = ascending_seeds(CLOSE_GROUP_SIZE);
1476 let view_closest = [1, 2, 8, 9];
1479 let in_scope_views = || -> Vec<ResponderView> {
1480 ascending_seeds(RESPONDED_IN_SCOPE)
1481 .into_iter()
1482 .map(|responder| witnessed_test_view(responder, &view_closest))
1483 .collect()
1484 };
1485
1486 let mut wide_views = in_scope_views();
1490 for offset in 1..=OUT_OF_SCOPE_RESPONDERS {
1491 let responder =
1492 u8::try_from(CLOSE_GROUP_SIZE + offset).expect("out-of-scope seed fits in u8");
1493 wide_views.push(witnessed_test_view(responder, &[1, 2, 3]));
1494 }
1495 let wide = WitnessedCloseGroup {
1496 target: address,
1497 k: PUT_TARGET_WIDTH,
1498 initial_closest: witnessed_test_nodes(&ascending_seeds(PUT_TARGET_WIDTH)),
1499 responder_views: wide_views,
1500 };
1501
1502 let native = WitnessedCloseGroup {
1505 target: address,
1506 k: CLOSE_GROUP_SIZE,
1507 initial_closest: witnessed_test_nodes(&close_seeds),
1508 responder_views: in_scope_views(),
1509 };
1510
1511 let scoped = scope_witnessed_to_close_group(&wide);
1512
1513 assert_eq!(scoped.target, wide.target);
1515 assert_eq!(scoped.k, CLOSE_GROUP_SIZE);
1516 assert_eq!(
1517 scoped
1518 .initial_closest
1519 .iter()
1520 .map(|node| node.peer_id.as_bytes()[0])
1521 .collect::<Vec<_>>(),
1522 close_seeds,
1523 "initial set must be the closest CLOSE_GROUP_SIZE, in order"
1524 );
1525
1526 assert_eq!(
1529 scoped
1530 .responder_views
1531 .iter()
1532 .map(|view| view.responder.as_bytes()[0])
1533 .collect::<Vec<_>>(),
1534 ascending_seeds(RESPONDED_IN_SCOPE),
1535 "only responders inside the close group survive"
1536 );
1537 assert_eq!(
1538 scoped.responder_views[0]
1539 .closest
1540 .iter()
1541 .map(|node| node.peer_id.as_bytes()[0])
1542 .collect::<Vec<_>>(),
1543 view_closest.to_vec(),
1544 "a surviving view's closest set must be preserved verbatim"
1545 );
1546
1547 assert_eq!(
1550 missing_witnessed_responder_views(&scoped),
1551 missing_witnessed_responder_views(&native),
1552 );
1553 let quorum = witnessed_close_group_quorum_for_transcript(&scoped);
1554 assert_eq!(quorum, witnessed_close_group_quorum_for_transcript(&native));
1555 let candidate_seeds = |group: &WitnessedCloseGroup| {
1556 witnessed_consensus_candidates(group, &address, quorum)
1557 .iter()
1558 .map(|candidate| candidate.node.peer_id.as_bytes()[0])
1559 .collect::<Vec<_>>()
1560 };
1561 assert_eq!(
1562 candidate_seeds(&scoped),
1563 candidate_seeds(&native),
1564 "scoped consensus must match a native close-group query"
1565 );
1566 }
1567
1568 #[test]
1569 fn witnessed_quote_peers_error_is_typed_and_pre_payment_when_consensus_is_short() {
1570 let address = [0u8; 32];
1571 let responder_views = (1..=7)
1572 .map(|responder| witnessed_test_view(responder, &[1, 2, 3, 4]))
1573 .collect();
1574 let witnessed = WitnessedCloseGroup {
1575 target: address,
1576 k: CLOSE_GROUP_SIZE,
1577 initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
1578 responder_views,
1579 };
1580
1581 let err = witnessed_quote_selection_or_error(
1582 &address,
1583 &witnessed,
1584 CLOSE_GROUP_SIZE,
1585 witnessed_close_group_quorum(),
1586 )
1587 .expect_err("short witnessed consensus must fail before payment");
1588
1589 match err {
1590 Error::InsufficientPeers(message) => {
1591 assert!(message.contains("before payment"));
1592 assert!(message.contains("vote_counts"));
1593 assert!(message.contains("quorum"));
1594 }
1595 other => panic!("expected typed InsufficientPeers error, got {other:?}"),
1596 }
1597 }
1598
1599 #[test]
1600 fn witnessed_quote_peers_include_quorum_fallback_candidates() {
1601 const EXTRA_QUORUM_CANDIDATES: usize = 1;
1602
1603 let address = [0u8; 32];
1604 let witnessed = WitnessedCloseGroup {
1605 target: address,
1606 k: CLOSE_GROUP_SIZE,
1607 initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
1608 responder_views: vec![
1609 witnessed_test_view(1, &[1, 2, 3, 4, 5, 6, 7]),
1610 witnessed_test_view(2, &[1, 2, 3, 4, 5, 6, 8]),
1611 witnessed_test_view(3, &[1, 2, 3, 4, 5, 7, 8]),
1612 witnessed_test_view(4, &[1, 2, 3, 4, 6, 7, 8]),
1613 witnessed_test_view(5, &[1, 2, 3, 5, 6, 7, 8]),
1614 witnessed_test_view(6, &[1, 2, 4, 5, 6, 7, 8]),
1615 witnessed_test_view(7, &[1, 3, 4, 5, 6, 7, 8]),
1616 ],
1617 };
1618
1619 let selection = witnessed_quote_selection_or_error(
1620 &address,
1621 &witnessed,
1622 CLOSE_GROUP_SIZE,
1623 witnessed_close_group_quorum(),
1624 )
1625 .expect("fallback candidates should be retained for quote collection");
1626
1627 assert_eq!(
1628 selection.quote_peers.len(),
1629 CLOSE_GROUP_SIZE + EXTRA_QUORUM_CANDIDATES
1630 );
1631 assert_eq!(
1632 selection
1633 .quote_peers
1634 .iter()
1635 .map(|peer| peer.peer_id.as_bytes()[0])
1636 .collect::<Vec<_>>(),
1637 vec![1, 2, 3, 4, 5, 6, 7, 8]
1638 );
1639 assert_eq!(
1640 put_peer_seeds(&selection.initial_put_peers),
1641 vec![1, 2, 3, 4, 5, 6, 7]
1642 );
1643 }
1644
1645 #[test]
1646 fn witnessed_quote_peers_lower_quorum_for_missing_responder_views() {
1647 let address = [0u8; 32];
1648 let witnessed = WitnessedCloseGroup {
1649 target: address,
1650 k: CLOSE_GROUP_SIZE,
1651 initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
1652 responder_views: vec![
1653 witnessed_test_view(1, &[1, 2, 3, 4, 5, 6, 7]),
1654 witnessed_test_view(2, &[1, 2, 3, 4, 5, 6, 8]),
1655 witnessed_test_view(3, &[1, 2, 3, 4, 5, 7, 8]),
1656 witnessed_test_view(4, &[1, 2, 3, 4, 6, 7, 8]),
1657 witnessed_test_view(5, &[1, 2, 3, 5, 6, 7, 8]),
1658 witnessed_test_view(6, &[1, 2, 4, 5, 6, 7, 8]),
1659 ],
1660 };
1661 let quorum = witnessed_close_group_quorum_for_transcript(&witnessed);
1662
1663 assert_eq!(missing_witnessed_responder_views(&witnessed), 1);
1664 assert_eq!(quorum, 4);
1665
1666 let selection =
1667 witnessed_quote_selection_or_error(&address, &witnessed, CLOSE_GROUP_SIZE, quorum)
1668 .expect(
1669 "one missing responder view should lower quorum and still select candidates",
1670 );
1671
1672 assert_eq!(
1673 selection
1674 .quote_peers
1675 .iter()
1676 .map(|peer| peer.peer_id.as_bytes()[0])
1677 .collect::<Vec<_>>(),
1678 vec![1, 2, 3, 4, 5, 6, 7, 8]
1679 );
1680 assert_eq!(selection.quorum, quorum);
1681 }
1682
1683 #[test]
1684 fn witnessed_quote_selection_keeps_closest_set_with_median_voter_quorum() {
1685 const MEDIAN_ISSUER_SEED: u8 = 7;
1686 const FAR_SUPPORTING_VOTER_SEED: u8 = 20;
1687 const UNSUCCESSFUL_SUPPORTING_VOTER_SEED: u8 = 21;
1688
1689 let address = [0u8; 32];
1690 let quotes = vec![
1691 synthetic_quote(1, 10),
1692 synthetic_quote(2, 20),
1693 synthetic_quote(3, 30),
1694 synthetic_quote(6, 50),
1695 synthetic_quote(MEDIAN_ISSUER_SEED, 40),
1696 synthetic_quote(8, 60),
1697 synthetic_quote(9, 70),
1698 synthetic_quote(FAR_SUPPORTING_VOTER_SEED, 80),
1699 ];
1700 let mut voters_by_peer = HashMap::new();
1701 voters_by_peer.insert(
1702 synthetic_peer(MEDIAN_ISSUER_SEED),
1703 synthetic_voters(&[
1704 1,
1705 2,
1706 3,
1707 MEDIAN_ISSUER_SEED,
1708 FAR_SUPPORTING_VOTER_SEED,
1709 UNSUCCESSFUL_SUPPORTING_VOTER_SEED,
1710 ]),
1711 );
1712
1713 let quorum = witnessed_close_group_quorum();
1714 let selected =
1715 select_witnessed_median_voter_quotes(quotes, &address, &voters_by_peer, quorum)
1716 .expect("a supported close-group quote set should be selected");
1717
1718 assert_eq!(quote_peer_seeds(&selected), vec![1, 2, 3, 6, 7, 8, 9]);
1719 let (median_peer_id, _) =
1720 median_paid_quote_issuer(&selected).expect("selected quotes have a median");
1721 assert_eq!(median_peer_id, synthetic_peer(MEDIAN_ISSUER_SEED));
1722 assert!(voters_by_peer[&median_peer_id].len() >= quorum);
1723 }
1724
1725 #[test]
1726 fn witnessed_quote_selection_uses_direct_median_witness_recognition() {
1727 const MEDIAN_ISSUER_SEED: u8 = 7;
1728
1729 let address = [0u8; 32];
1730 let quotes = vec![
1731 synthetic_quote(1, 10),
1732 synthetic_quote(2, 20),
1733 synthetic_quote(3, 30),
1734 synthetic_quote(4, 50),
1735 synthetic_quote(MEDIAN_ISSUER_SEED, 40),
1736 synthetic_quote(8, 60),
1737 synthetic_quote(9, 70),
1738 ];
1739 let mut voters_by_peer = HashMap::new();
1740 voters_by_peer.insert(
1741 synthetic_peer(MEDIAN_ISSUER_SEED),
1742 synthetic_voters(&[20, 21, 22, 23, 24]),
1743 );
1744
1745 let quorum = witnessed_close_group_quorum();
1746 let selected =
1747 select_witnessed_median_voter_quotes(quotes, &address, &voters_by_peer, quorum)
1748 .expect("direct witness recognition should support the paid median issuer");
1749
1750 let (median_peer_id, _) =
1751 median_paid_quote_issuer(&selected).expect("selected quotes have a median");
1752 let selected_peers = selected
1753 .iter()
1754 .map(|(peer_id, _, _, _)| *peer_id)
1755 .collect::<HashSet<_>>();
1756 assert_eq!(median_peer_id, synthetic_peer(MEDIAN_ISSUER_SEED));
1757 assert_eq!(
1758 voters_by_peer[&median_peer_id]
1759 .intersection(&selected_peers)
1760 .count(),
1761 0,
1762 "recognising witnesses need not also be selected quote issuers"
1763 );
1764 assert_eq!(voters_by_peer[&median_peer_id].len(), quorum);
1765 }
1766
1767 #[test]
1768 fn witnessed_quote_selection_rejects_median_without_witness_quorum() {
1769 const MEDIAN_ISSUER_SEED: u8 = 7;
1770
1771 let address = [0u8; 32];
1772 let quotes = vec![
1773 synthetic_quote(1, 10),
1774 synthetic_quote(2, 20),
1775 synthetic_quote(3, 30),
1776 synthetic_quote(6, 50),
1777 synthetic_quote(MEDIAN_ISSUER_SEED, 40),
1778 synthetic_quote(8, 60),
1779 synthetic_quote(9, 70),
1780 synthetic_quote(10, 80),
1781 ];
1782 let mut voters_by_peer = HashMap::new();
1783 voters_by_peer.insert(
1784 synthetic_peer(MEDIAN_ISSUER_SEED),
1785 synthetic_voters(&[1, 2, 3, 20]),
1786 );
1787
1788 let selected = select_witnessed_median_voter_quotes(
1789 quotes,
1790 &address,
1791 &voters_by_peer,
1792 witnessed_close_group_quorum(),
1793 );
1794
1795 assert!(
1796 selected.is_none(),
1797 "the selector must not return a paid quote set when fewer than the \
1798 witnessed median voter quorum recognised the paid median issuer"
1799 );
1800 }
1801
1802 #[test]
1803 fn put_peers_prioritise_median_voters_without_reordering_quotes() {
1804 const MEDIAN_ISSUER_SEED: u8 = 7;
1805
1806 let quotes = vec![
1807 synthetic_quote(1, 10),
1808 synthetic_quote(2, 20),
1809 synthetic_quote(3, 30),
1810 synthetic_quote(4, 50),
1811 synthetic_quote(5, 60),
1812 synthetic_quote(6, 70),
1813 synthetic_quote(MEDIAN_ISSUER_SEED, 40),
1814 ];
1815 let mut voters_by_peer = HashMap::new();
1816 voters_by_peer.insert(
1817 synthetic_peer(MEDIAN_ISSUER_SEED),
1818 synthetic_voters(&[3, 4, 5, 6, MEDIAN_ISSUER_SEED]),
1819 );
1820
1821 let put_candidates = put_peers_from_seeds(&[1, 2, 3, 4, 5, 6, 7]);
1822 let put_peers = put_peers_with_median_voters_first(
1823 "es,
1824 &put_candidates,
1825 &voters_by_peer,
1826 witnessed_close_group_quorum(),
1827 )
1828 .expect("median voters should produce an ordered PUT set");
1829
1830 assert_eq!(quote_peer_seeds("es), vec![1, 2, 3, 4, 5, 6, 7]);
1831 let (median_peer_id, _) =
1832 median_paid_quote_issuer("es).expect("selected quotes have a median");
1833 assert_eq!(median_peer_id, synthetic_peer(MEDIAN_ISSUER_SEED));
1834 assert_eq!(put_peer_seeds(&put_peers), vec![3, 4, 5, 6, 7, 1, 2]);
1835 }
1836
1837 #[test]
1838 fn filter_drops_only_bad_bindings_and_leaves_storer_acceptable_quotes() {
1839 let mut quotes = vec![
1840 good_quote_real(),
1841 bad_quote_real(),
1842 good_quote_real(),
1843 bad_quote_real(),
1844 good_quote_real(),
1845 ];
1846
1847 let dropped = drop_quotes_with_bad_bindings(&mut quotes);
1848
1849 assert_eq!(dropped, 2, "two crossed-key quotes must be dropped");
1850 assert_eq!(quotes.len(), 3, "three real-key quotes must remain");
1851
1852 for (peer_id, _, quote, _) in "es {
1858 assert!(
1859 storer_binding_would_accept(peer_id, quote),
1860 "every retained quote must satisfy the full storer-side spec"
1861 );
1862 }
1863 }
1864
1865 #[test]
1866 fn filter_is_noop_when_all_quotes_are_storer_acceptable() {
1867 let mut quotes: Vec<_> = (0..5).map(|_| good_quote_real()).collect();
1868 let before = quotes.len();
1869 let dropped = drop_quotes_with_bad_bindings(&mut quotes);
1870 assert_eq!(dropped, 0);
1871 assert_eq!(quotes.len(), before);
1872 for (peer_id, _, quote, _) in "es {
1873 assert!(storer_binding_would_accept(peer_id, quote));
1874 }
1875 }
1876
1877 #[test]
1878 fn filter_drops_all_when_every_responder_is_bad() {
1879 let mut quotes: Vec<_> = (0..fault_tolerant_quote_query_count())
1884 .map(|_| bad_quote_real())
1885 .collect();
1886 let dropped = drop_quotes_with_bad_bindings(&mut quotes);
1887 assert_eq!(dropped, fault_tolerant_quote_query_count());
1888 assert!(quotes.is_empty());
1889 }
1890
1891 #[test]
1892 fn filter_preserves_quote_payload_byte_for_byte() {
1893 let (peer_id, addrs, original_quote, amount) = good_quote_real();
1898 let mut quotes = vec![(peer_id, addrs.clone(), original_quote.clone(), amount)];
1899 let _ = drop_quotes_with_bad_bindings(&mut quotes);
1900
1901 let (kept_peer, kept_addrs, kept_quote, kept_amount) =
1902 quotes.pop().expect("the good quote must survive filtering");
1903 assert_eq!(kept_peer.as_bytes(), peer_id.as_bytes());
1904 assert_eq!(kept_addrs.len(), addrs.len());
1905 assert_eq!(kept_amount, amount);
1906 assert_eq!(kept_quote.pub_key, original_quote.pub_key);
1907 assert_eq!(kept_quote.signature, original_quote.signature);
1908 assert_eq!(kept_quote.content.0, original_quote.content.0);
1909 assert_eq!(kept_quote.timestamp, original_quote.timestamp);
1910 assert_eq!(kept_quote.price, original_quote.price);
1911 assert_eq!(kept_quote.rewards_address, original_quote.rewards_address);
1912 }
1913
1914 #[test]
1945 fn repro_apr_30_storer_would_have_rejected_pre_filter_and_accepts_post_filter() {
1946 let over_query_count = fault_tolerant_quote_query_count();
1947 let mut quotes: Vec<_> = (0..over_query_count - 1)
1948 .map(|_| good_quote_real())
1949 .collect();
1950 quotes.insert(over_query_count / 2, bad_quote_real());
1953 assert_eq!(quotes.len(), over_query_count);
1954
1955 let storer_would_reject_count = quotes
1957 .iter()
1958 .filter(|(p, _, q, _)| !storer_binding_would_accept(p, q))
1959 .count();
1960 assert_eq!(
1961 storer_would_reject_count, 1,
1962 "exactly one quote (the crossed-key one) must be rejected by the storer spec"
1963 );
1964
1965 let dropped = drop_quotes_with_bad_bindings(&mut quotes);
1967 assert_eq!(dropped, 1, "exactly the crossed-key quote must be filtered");
1968
1969 for (peer_id, _, quote, _) in "es {
1971 assert!(
1972 storer_binding_would_accept(peer_id, quote),
1973 "every post-filter quote must be accepted by the storer spec — \
1974 this is what the filter guarantees before any quote set is used"
1975 );
1976 }
1977
1978 assert!(
1980 quotes.len() >= CLOSE_GROUP_SIZE,
1981 "after filtering, at least CLOSE_GROUP_SIZE good quotes must remain \
1982 so a fault-tolerant probe can still return a full close group"
1983 );
1984 }
1985
1986 #[test]
1991 fn filter_leaves_short_set_when_too_many_bad_peers() {
1992 let good_count = CLOSE_GROUP_SIZE - 1;
1993 let bad_count = fault_tolerant_quote_query_count() - good_count;
1994 let mut quotes: Vec<_> = std::iter::repeat_with(bad_quote_real)
1995 .take(bad_count)
1996 .chain(std::iter::repeat_with(good_quote_real).take(good_count))
1997 .collect();
1998
1999 let dropped = drop_quotes_with_bad_bindings(&mut quotes);
2000 assert_eq!(dropped, bad_count);
2001 assert!(
2002 quotes.len() < CLOSE_GROUP_SIZE,
2003 "this is the precondition for InsufficientPeers downstream"
2004 );
2005 for (peer_id, _, quote, _) in "es {
2007 assert!(storer_binding_would_accept(peer_id, quote));
2008 }
2009 }
2010
2011 fn serialize_quote(quote: &PaymentQuote) -> Vec<u8> {
2026 rmp_serde::to_vec(quote).expect("serialize quote")
2027 }
2028
2029 #[test]
2030 fn classifier_accepts_real_self_consistent_quote() {
2031 let (peer_id, _, quote, _) = good_quote_real();
2032 let bytes = serialize_quote("e);
2033 let result = classify_quote_response(&peer_id, &bytes, false);
2034 match result {
2035 Ok((q, price)) => {
2036 assert_eq!(q.pub_key, quote.pub_key);
2037 assert_eq!(price, quote.price);
2038 }
2039 Err(e) => panic!("expected Ok, got {e}"),
2040 }
2041 }
2042
2043 #[test]
2044 fn classifier_rejects_crossed_keypair_with_typed_error() {
2045 let (peer_id, _, quote, _) = bad_quote_real();
2046 let bytes = serialize_quote("e);
2047 let result = classify_quote_response(&peer_id, &bytes, false);
2048 match result {
2049 Err(Error::BadQuoteBinding {
2050 peer_id: pid,
2051 detail,
2052 }) => {
2053 assert_eq!(pid, peer_id.to_string());
2054 assert!(
2055 detail.contains("BLAKE3(pub_key)="),
2056 "diagnostic detail must include the derived peer id: {detail}"
2057 );
2058 }
2059 other => panic!("expected BadQuoteBinding for crossed-key quote, got {other:?}"),
2060 }
2061 }
2062
2063 #[test]
2074 fn classifier_rejects_already_stored_vote_from_bad_binding_peer() {
2075 let (peer_id, _, quote, _) = bad_quote_real();
2076 let bytes = serialize_quote("e);
2077 let result = classify_quote_response(&peer_id, &bytes, true);
2079 assert!(
2080 matches!(result, Err(Error::BadQuoteBinding { .. })),
2081 "crossed-key peer must be classified BadQuoteBinding even when \
2082 voting already_stored=true; got {result:?}"
2083 );
2084 }
2085
2086 #[test]
2089 fn classifier_honours_already_stored_vote_from_good_binding_peer() {
2090 let (peer_id, _, quote, _) = good_quote_real();
2091 let bytes = serialize_quote("e);
2092 let result = classify_quote_response(&peer_id, &bytes, true);
2093 assert!(
2094 matches!(result, Err(Error::AlreadyStored)),
2095 "honest peer's already_stored vote must be honoured; got {result:?}"
2096 );
2097 }
2098
2099 #[test]
2100 fn classifier_returns_serialization_error_on_bad_bytes() {
2101 let (peer_id, _, _, _) = good_quote_real();
2102 let garbage = b"this is not a valid msgpack PaymentQuote".to_vec();
2103 let result = classify_quote_response(&peer_id, &garbage, false);
2104 assert!(
2105 matches!(result, Err(Error::Serialization(_))),
2106 "garbage bytes must produce a Serialization error; got {result:?}"
2107 );
2108 }
2109
2110 #[test]
2113 fn classifier_verdict_matches_storer_binding_spec_for_mixed_responders() {
2114 let mut responders: Vec<(PeerId, PaymentQuote)> = (0..12)
2115 .map(|_| {
2116 let (p, _, q, _) = good_quote_real();
2117 (p, q)
2118 })
2119 .collect();
2120 for _ in 0..4 {
2121 let (p, _, q, _) = bad_quote_real();
2122 responders.push((p, q));
2123 }
2124
2125 for (peer_id, quote) in &responders {
2126 let bytes = serialize_quote(quote);
2127 let storer_verdict = storer_binding_would_accept(peer_id, quote);
2128 let classifier_verdict = classify_quote_response(peer_id, &bytes, false).is_ok();
2129 assert_eq!(
2130 classifier_verdict, storer_verdict,
2131 "classifier and storer-binding-spec must agree on every responder \
2132 (peer_id={}, storer={storer_verdict}, classifier={classifier_verdict})",
2133 peer_id
2134 );
2135 }
2136 }
2137}