1use crate::data::client::peer_xor_distance;
7use crate::data::client::Client;
8use crate::data::error::{Error, Result};
9use ant_protocol::evm::{Amount, PaymentQuote};
10use ant_protocol::transport::{DHTNode, MultiAddr, P2PNode, PeerId, WitnessedCloseGroup};
11use ant_protocol::{
12 compute_address, send_and_await_chunk_response, ChunkMessage, ChunkMessageBody,
13 ChunkQuoteRequest, ChunkQuoteResponse, CLOSE_GROUP_MAJORITY, CLOSE_GROUP_SIZE,
14};
15use futures::stream::{FuturesUnordered, StreamExt};
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18use std::time::Duration;
19use tracing::{debug, info, warn};
20
21const FAULT_TOLERANT_QUOTE_QUERY_MULTIPLIER: usize = 2;
26
27const WITNESSED_QUORUM_NUMERATOR: usize = 2;
31const WITNESSED_QUORUM_DENOMINATOR: usize = 3;
32
33const SINGLE_NODE_WITNESSED_VIEW_COUNT: usize = 20;
35
36const MEDIAN_QUOTE_INDEX: usize = CLOSE_GROUP_SIZE / 2;
38
39const QUOTE_COLLECTION_TIMEOUT_SECS: u64 = 120;
43
44const ML_DSA_PUB_KEY_LEN: usize = 1952;
53
54fn quote_binding_is_valid(peer_id: &PeerId, quote: &PaymentQuote) -> bool {
70 if quote.pub_key.len() != ML_DSA_PUB_KEY_LEN {
71 return false;
72 }
73 compute_address("e.pub_key) == *peer_id.as_bytes()
74}
75
76fn classify_quote_response(
103 peer_id: &PeerId,
104 quote_bytes: &[u8],
105 already_stored: bool,
106) -> std::result::Result<(PaymentQuote, Amount), Error> {
107 let payment_quote = rmp_serde::from_slice::<PaymentQuote>(quote_bytes).map_err(|e| {
108 Error::Serialization(format!("Failed to deserialize quote from {peer_id}: {e}"))
109 })?;
110
111 if !quote_binding_is_valid(peer_id, &payment_quote) {
116 let derived = compute_address(&payment_quote.pub_key);
117 warn!(
118 "Dropping response from {peer_id} — quote.pub_key BLAKE3 mismatch \
119 (peer is signing quotes with another peer's key); the storer \
120 would reject this proof"
121 );
122 return Err(Error::BadQuoteBinding {
123 peer_id: peer_id.to_string(),
124 detail: format!(
125 "BLAKE3(pub_key)={} pub_key_len={}",
126 hex::encode(derived),
127 payment_quote.pub_key.len(),
128 ),
129 });
130 }
131
132 if already_stored {
133 debug!("Peer {peer_id} already has chunk");
134 return Err(Error::AlreadyStored);
135 }
136 let price = payment_quote.price;
137 debug!("Received quote from {peer_id}: price = {price}");
138 Ok((payment_quote, price))
139}
140
141fn drop_quotes_with_bad_bindings(
144 quotes: &mut Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>,
145) -> usize {
146 let before = quotes.len();
147 quotes.retain(|(peer_id, _, quote, _)| {
148 if quote_binding_is_valid(peer_id, quote) {
149 true
150 } else {
151 warn!(
152 "Dropping quote from peer {peer_id} — quote.pub_key BLAKE3 mismatch \
153 (peer is signing quotes with another peer's key); the storer would \
154 reject this proof"
155 );
156 false
157 }
158 });
159 before - quotes.len()
160}
161
162#[allow(clippy::too_many_arguments)]
163async fn request_store_quote_from_peer(
164 node: Arc<P2PNode>,
165 peer_id: PeerId,
166 peer_addrs: Vec<MultiAddr>,
167 request_id: u64,
168 address: [u8; 32],
169 data_size: u64,
170 data_type: u32,
171 per_peer_timeout: Duration,
172) -> StoreQuoteRequestResult {
173 let request = ChunkQuoteRequest {
174 address,
175 data_size,
176 data_type,
177 };
178 let message = ChunkMessage {
179 request_id,
180 body: ChunkMessageBody::QuoteRequest(request),
181 };
182
183 let message_bytes = match message.encode() {
184 Ok(bytes) => bytes,
185 Err(e) => {
186 return (
187 peer_id,
188 peer_addrs,
189 Err(Error::Protocol(format!(
190 "Failed to encode quote request for {peer_id}: {e}"
191 ))),
192 );
193 }
194 };
195
196 let result = send_and_await_chunk_response(
197 &node,
198 &peer_id,
199 message_bytes,
200 request_id,
201 per_peer_timeout,
202 &peer_addrs,
203 |body| match body {
204 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
205 quote,
206 already_stored,
207 }) => Some(classify_quote_response(&peer_id, "e, already_stored)),
208 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(e)) => Some(Err(
209 Error::Protocol(format!("Quote error from {peer_id}: {e}")),
210 )),
211 _ => None,
212 },
213 |e| Error::Network(format!("Failed to send quote request to {peer_id}: {e}")),
214 || Error::Timeout(format!("Timeout waiting for quote from {peer_id}")),
215 )
216 .await;
217
218 (peer_id, peer_addrs, result)
219}
220
221#[allow(clippy::too_many_arguments)]
222fn record_store_quote_result(
223 peer_id: PeerId,
224 addrs: Vec<MultiAddr>,
225 quote_result: Result<(PaymentQuote, Amount)>,
226 address: &[u8; 32],
227 quotes: &mut Vec<StoreQuote>,
228 already_stored_peers: &mut Vec<(PeerId, [u8; 32])>,
229 failures: &mut Vec<String>,
230 bad_quote_count: &mut usize,
231) {
232 match quote_result {
233 Ok((quote, price)) => {
234 quotes.push((peer_id, addrs, quote, price));
235 }
236 Err(Error::AlreadyStored) => {
237 info!("Peer {peer_id} reports chunk already stored");
238 let dist = peer_xor_distance(&peer_id, address);
239 already_stored_peers.push((peer_id, dist));
240 }
241 Err(e) => {
242 if matches!(&e, Error::BadQuoteBinding { .. }) {
243 *bad_quote_count += 1;
244 }
245 warn!("Failed to get quote from {peer_id}: {e}");
246 failures.push(format!("{peer_id}: {e}"));
247 }
248 }
249}
250
251fn witnessed_quote_launch_budget(
252 successful_quotes: usize,
253 in_flight: usize,
254 remaining_peers: usize,
255) -> usize {
256 CLOSE_GROUP_SIZE
257 .saturating_sub(successful_quotes.saturating_add(in_flight))
258 .min(remaining_peers)
259}
260
261fn single_node_quote_query_count() -> usize {
262 CLOSE_GROUP_SIZE
263}
264
265fn fault_tolerant_quote_query_count() -> usize {
266 CLOSE_GROUP_SIZE * FAULT_TOLERANT_QUOTE_QUERY_MULTIPLIER
267}
268
269fn witnessed_close_group_quorum() -> usize {
270 (CLOSE_GROUP_SIZE * WITNESSED_QUORUM_NUMERATOR).div_ceil(WITNESSED_QUORUM_DENOMINATOR)
271}
272
273fn witnessed_close_group_quorum_for_missing_views(missing_views: usize) -> usize {
274 witnessed_close_group_quorum()
275 .saturating_sub(missing_views)
276 .max(1)
277}
278
279fn missing_witnessed_responder_views(witnessed: &WitnessedCloseGroup) -> usize {
280 witnessed
281 .initial_closest
282 .len()
283 .saturating_sub(witnessed.responder_views.len())
284}
285
286fn witnessed_close_group_quorum_for_transcript(witnessed: &WitnessedCloseGroup) -> usize {
287 witnessed_close_group_quorum_for_missing_views(missing_witnessed_responder_views(witnessed))
288}
289
290fn peer_list(peers: &[PeerId]) -> Vec<String> {
291 peers.iter().map(ToString::to_string).collect()
292}
293
294pub(crate) type StoreQuote = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);
295type StoreQuoteRequestResult = (PeerId, Vec<MultiAddr>, Result<(PaymentQuote, Amount)>);
296type VotersByPeer = HashMap<PeerId, HashSet<PeerId>>;
297type WitnessedVoteData = (HashMap<PeerId, DHTNode>, VotersByPeer, Vec<(PeerId, usize)>);
298
299pub(crate) struct StoreQuotePlan {
300 pub(crate) quotes: Vec<StoreQuote>,
301 pub(crate) put_peers: Vec<(PeerId, Vec<MultiAddr>)>,
302}
303
304#[derive(Debug, Clone)]
305struct WitnessedQuoteCandidate {
306 node: DHTNode,
307 votes: usize,
308 voters: HashSet<PeerId>,
309}
310
311#[derive(Debug, Clone)]
312struct WitnessedQuotePeer {
313 peer_id: PeerId,
314 addrs: Vec<MultiAddr>,
315 voters: HashSet<PeerId>,
316}
317
318#[derive(Debug, Clone)]
319struct WitnessedQuoteSelection {
320 quote_peers: Vec<WitnessedQuotePeer>,
321 initial_put_peers: Vec<(PeerId, Vec<MultiAddr>)>,
322 quorum: usize,
323}
324
325enum QuoteSelectionPolicy {
326 ClosestByDistance,
327 WitnessedMedianVoters {
328 voters_by_peer: VotersByPeer,
329 quorum: usize,
330 },
331}
332
333fn witnessed_initial_peers(witnessed: &WitnessedCloseGroup) -> Vec<String> {
334 witnessed
335 .initial_closest
336 .iter()
337 .map(|node| node.peer_id.to_string())
338 .collect()
339}
340
341fn witnessed_responder_views(witnessed: &WitnessedCloseGroup) -> Vec<String> {
342 witnessed
343 .responder_views
344 .iter()
345 .map(|view| {
346 let peers = view
347 .closest
348 .iter()
349 .map(|node| node.peer_id)
350 .collect::<Vec<_>>();
351 format!("{}=>{:?}", view.responder, peer_list(&peers))
352 })
353 .collect()
354}
355
356fn merge_witnessed_node(nodes: &mut HashMap<PeerId, DHTNode>, node: DHTNode) {
357 match nodes.entry(node.peer_id) {
358 std::collections::hash_map::Entry::Occupied(mut entry) => {
359 entry.get_mut().merge_from(node);
360 }
361 std::collections::hash_map::Entry::Vacant(entry) => {
362 entry.insert(node);
363 }
364 }
365}
366
367fn sort_vote_counts_by_distance(vote_counts: &mut [(PeerId, usize)], address: &[u8; 32]) {
368 vote_counts.sort_by(|left, right| {
369 peer_xor_distance(&left.0, address)
370 .cmp(&peer_xor_distance(&right.0, address))
371 .then_with(|| left.0.as_bytes().cmp(right.0.as_bytes()))
372 });
373}
374
375fn witnessed_vote_counts_and_nodes(
376 witnessed: &WitnessedCloseGroup,
377 address: &[u8; 32],
378) -> WitnessedVoteData {
379 let mut known_nodes = HashMap::new();
380 for node in &witnessed.initial_closest {
381 merge_witnessed_node(&mut known_nodes, node.clone());
382 }
383
384 let mut voters_by_peer: HashMap<PeerId, HashSet<PeerId>> = HashMap::new();
385 for view in &witnessed.responder_views {
386 let mut voted = HashSet::new();
387 for node in &view.closest {
388 merge_witnessed_node(&mut known_nodes, node.clone());
389 if voted.insert(node.peer_id) {
390 voters_by_peer
391 .entry(node.peer_id)
392 .or_default()
393 .insert(view.responder);
394 }
395 }
396 }
397
398 let mut vote_counts: Vec<(PeerId, usize)> = voters_by_peer
399 .iter()
400 .map(|(peer_id, voters)| (*peer_id, voters.len()))
401 .collect();
402 sort_vote_counts_by_distance(&mut vote_counts, address);
403 (known_nodes, voters_by_peer, vote_counts)
404}
405
406fn witnessed_consensus_candidates(
407 witnessed: &WitnessedCloseGroup,
408 address: &[u8; 32],
409 quorum: usize,
410) -> Vec<WitnessedQuoteCandidate> {
411 let (known_nodes, voters_by_peer, vote_counts) =
412 witnessed_vote_counts_and_nodes(witnessed, address);
413 let mut candidates = vote_counts
414 .iter()
415 .filter_map(|(peer_id, votes)| {
416 if *votes < quorum {
417 return None;
418 }
419 known_nodes.get(peer_id).cloned().and_then(|node| {
420 voters_by_peer
421 .get(peer_id)
422 .cloned()
423 .map(|voters| WitnessedQuoteCandidate {
424 node,
425 votes: *votes,
426 voters,
427 })
428 })
429 })
430 .collect::<Vec<_>>();
431
432 candidates.sort_by(|left, right| {
433 peer_xor_distance(&left.node.peer_id, address)
434 .cmp(&peer_xor_distance(&right.node.peer_id, address))
435 .then_with(|| right.votes.cmp(&left.votes))
436 .then_with(|| {
437 left.node
438 .peer_id
439 .as_bytes()
440 .cmp(right.node.peer_id.as_bytes())
441 })
442 });
443 candidates
444}
445
446fn witnessed_vote_counts(witnessed: &WitnessedCloseGroup, address: &[u8; 32]) -> Vec<String> {
447 let (_, _, vote_counts) = witnessed_vote_counts_and_nodes(witnessed, address);
448 vote_counts
449 .iter()
450 .map(|(peer_id, votes)| format!("{peer_id}:{votes}"))
451 .collect()
452}
453
454fn witnessed_consensus(
455 witnessed: &WitnessedCloseGroup,
456 address: &[u8; 32],
457 quorum: usize,
458) -> Vec<String> {
459 witnessed_consensus_candidates(witnessed, address, quorum)
460 .iter()
461 .map(|candidate| format!("{}:{}", candidate.node.peer_id, candidate.votes))
462 .collect()
463}
464
465fn witnessed_close_group_diagnostics(
466 address: &[u8; 32],
467 witnessed: &WitnessedCloseGroup,
468 quorum: usize,
469) -> String {
470 format!(
471 "target={}, initial={:?}, responder_views={:?}, vote_counts={:?}, quorum={}, final={:?}",
472 hex::encode(address),
473 witnessed_initial_peers(witnessed),
474 witnessed_responder_views(witnessed),
475 witnessed_vote_counts(witnessed, address),
476 quorum,
477 witnessed_consensus(witnessed, address, quorum)
478 )
479}
480
481fn witnessed_quote_selection_or_error(
482 address: &[u8; 32],
483 witnessed: &WitnessedCloseGroup,
484 required: usize,
485 quorum: usize,
486) -> Result<WitnessedQuoteSelection> {
487 let candidates = witnessed_consensus_candidates(witnessed, address, quorum);
488 if candidates.len() < required {
489 return Err(Error::InsufficientPeers(format!(
490 "Witnessed close group inconclusive before payment: got {}/{} quorum-recognised peers. {}",
491 candidates.len(),
492 required,
493 witnessed_close_group_diagnostics(address, witnessed, quorum)
494 )));
495 }
496
497 let initial_put_peers = witnessed
498 .initial_closest
499 .iter()
500 .take(CLOSE_GROUP_SIZE)
501 .map(|node| (node.peer_id, node.addresses_by_priority()))
502 .collect::<Vec<_>>();
503
504 if initial_put_peers.len() < CLOSE_GROUP_SIZE {
505 return Err(Error::InsufficientPeers(format!(
506 "Witnessed close group returned only {}/{} initial PUT peers before payment. {}",
507 initial_put_peers.len(),
508 CLOSE_GROUP_SIZE,
509 witnessed_close_group_diagnostics(address, witnessed, quorum)
510 )));
511 }
512
513 let quote_peers = candidates
514 .into_iter()
515 .map(|candidate| WitnessedQuotePeer {
516 peer_id: candidate.node.peer_id,
517 addrs: candidate.node.addresses_by_priority(),
518 voters: candidate.voters,
519 })
520 .collect();
521
522 Ok(WitnessedQuoteSelection {
523 quote_peers,
524 initial_put_peers,
525 quorum,
526 })
527}
528
529pub(crate) fn median_paid_quote_issuer(
530 quotes: &[(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)],
531) -> Option<(PeerId, Amount)> {
532 if quotes.len() <= MEDIAN_QUOTE_INDEX {
533 return None;
534 }
535
536 let mut by_price: Vec<(usize, PeerId, Amount)> = quotes
537 .iter()
538 .enumerate()
539 .map(|(index, (peer_id, _, _, price))| (index, *peer_id, *price))
540 .collect();
541 by_price.sort_by_key(|(index, _, price)| (*price, *index));
542 by_price
543 .get(MEDIAN_QUOTE_INDEX)
544 .map(|(_, peer_id, price)| (*peer_id, *price))
545}
546
547fn sort_quotes_by_distance(quotes: &mut [StoreQuote], address: &[u8; 32]) {
548 quotes.sort_by(|left, right| {
549 peer_xor_distance(&left.0, address)
550 .cmp(&peer_xor_distance(&right.0, address))
551 .then_with(|| left.0.as_bytes().cmp(right.0.as_bytes()))
552 });
553}
554
555fn median_paid_quote_issuer_for_indices(
556 quotes: &[StoreQuote],
557 indices: &[usize],
558) -> Option<(PeerId, Amount)> {
559 if indices.len() <= MEDIAN_QUOTE_INDEX {
560 return None;
561 }
562
563 let mut by_price: Vec<(usize, PeerId, Amount)> = indices
564 .iter()
565 .enumerate()
566 .map(|(selected_index, quote_index)| {
567 let (peer_id, _, _, price) = "es[*quote_index];
568 (selected_index, *peer_id, *price)
569 })
570 .collect();
571 by_price.sort_by_key(|(selected_index, _, price)| (*price, *selected_index));
572 by_price
573 .get(MEDIAN_QUOTE_INDEX)
574 .map(|(_, peer_id, price)| (*peer_id, *price))
575}
576
577fn median_issuer_voter_support(
578 quotes: &[StoreQuote],
579 indices: &[usize],
580 voters_by_peer: &VotersByPeer,
581) -> Option<(PeerId, usize)> {
582 let (median_peer_id, _) = median_paid_quote_issuer_for_indices(quotes, indices)?;
583 let voters = voters_by_peer.get(&median_peer_id)?;
584 Some((median_peer_id, voters.len()))
585}
586
587fn visit_quote_subsets<F>(
588 quote_count: usize,
589 subset_size: usize,
590 start_index: usize,
591 current: &mut Vec<usize>,
592 visit: &mut F,
593) where
594 F: FnMut(&[usize]),
595{
596 if current.len() == subset_size {
597 visit(current);
598 return;
599 }
600
601 let remaining = subset_size - current.len();
602 let last_start = quote_count - remaining;
603 for index in start_index..=last_start {
604 current.push(index);
605 visit_quote_subsets(quote_count, subset_size, index + 1, current, visit);
606 current.pop();
607 }
608}
609
610fn select_closest_quotes(mut quotes: Vec<StoreQuote>, address: &[u8; 32]) -> Vec<StoreQuote> {
611 sort_quotes_by_distance(&mut quotes, address);
612 quotes.truncate(CLOSE_GROUP_SIZE);
613 quotes
614}
615
616fn select_witnessed_median_voter_quotes(
617 mut quotes: Vec<StoreQuote>,
618 address: &[u8; 32],
619 voters_by_peer: &VotersByPeer,
620 required_support: usize,
621) -> Option<Vec<StoreQuote>> {
622 if quotes.len() < CLOSE_GROUP_SIZE {
623 return None;
624 }
625
626 sort_quotes_by_distance(&mut quotes, address);
627
628 let mut best_indices: Option<(usize, Vec<usize>)> = None;
629 let mut current_indices = Vec::with_capacity(CLOSE_GROUP_SIZE);
630 visit_quote_subsets(
631 quotes.len(),
632 CLOSE_GROUP_SIZE,
633 0,
634 &mut current_indices,
635 &mut |indices| {
636 let Some((_, support)) = median_issuer_voter_support("es, indices, voters_by_peer)
637 else {
638 return;
639 };
640 if support < required_support {
641 return;
642 }
643 match &best_indices {
644 Some((best_support, best)) if *best_support > support => {}
645 Some((best_support, best))
646 if *best_support == support && best.as_slice() <= indices => {}
647 _ => best_indices = Some((support, indices.to_vec())),
648 }
649 },
650 );
651
652 best_indices.map(|(_, indices)| {
653 indices
654 .into_iter()
655 .map(|index| quotes[index].clone())
656 .collect()
657 })
658}
659
660fn put_peers_with_median_voters_first(
661 quotes: &[StoreQuote],
662 put_peers: &[(PeerId, Vec<MultiAddr>)],
663 voters_by_peer: &VotersByPeer,
664 required_support: usize,
665) -> Option<Vec<(PeerId, Vec<MultiAddr>)>> {
666 let (median_peer_id, _) = median_paid_quote_issuer(quotes)?;
667 let voters = voters_by_peer.get(&median_peer_id)?;
668
669 let mut supporting_peers = Vec::new();
670 let mut fallback_peers = Vec::new();
671 for (peer_id, addrs) in put_peers {
672 let peer = (*peer_id, addrs.clone());
673 if voters.contains(peer_id) {
674 supporting_peers.push(peer);
675 } else {
676 fallback_peers.push(peer);
677 }
678 }
679
680 if supporting_peers.len() < required_support {
681 return None;
682 }
683
684 supporting_peers.extend(fallback_peers);
685 Some(supporting_peers)
686}
687
688impl Client {
689 pub async fn get_store_quotes(
705 &self,
706 address: &[u8; 32],
707 data_size: u64,
708 data_type: u32,
709 ) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
710 Ok(self
711 .get_store_quote_plan(address, data_size, data_type)
712 .await?
713 .quotes)
714 }
715
716 pub(crate) async fn get_store_quote_plan(
723 &self,
724 address: &[u8; 32],
725 data_size: u64,
726 data_type: u32,
727 ) -> Result<StoreQuotePlan> {
728 let witnessed_selection = self.select_witnessed_quote_selection(address).await?;
729 let voters_by_peer: VotersByPeer = witnessed_selection
730 .quote_peers
731 .iter()
732 .map(|peer| (peer.peer_id, peer.voters.clone()))
733 .collect();
734 let remote_peers = witnessed_selection
735 .quote_peers
736 .into_iter()
737 .map(|peer| (peer.peer_id, peer.addrs))
738 .collect();
739 let initial_put_peers = witnessed_selection.initial_put_peers;
740 let quorum = witnessed_selection.quorum;
741 let quotes = self
742 .collect_store_quotes_from_remote_peers(
743 address,
744 data_size,
745 data_type,
746 remote_peers,
747 QuoteSelectionPolicy::WitnessedMedianVoters {
748 voters_by_peer: voters_by_peer.clone(),
749 quorum,
750 },
751 )
752 .await?;
753 let put_peers = put_peers_with_median_voters_first(
754 "es,
755 &initial_put_peers,
756 &voters_by_peer,
757 quorum,
758 )
759 .ok_or_else(|| {
760 Error::InsufficientPeers(format!(
761 "Collected {} witnessed quotes, but fewer than {} initial witness PUT peers \
762 voted for the paid median issuer for {}",
763 quotes.len(),
764 quorum,
765 hex::encode(address)
766 ))
767 })?;
768
769 Ok(StoreQuotePlan { quotes, put_peers })
770 }
771
772 pub(crate) async fn get_store_quotes_with_fault_tolerance(
779 &self,
780 address: &[u8; 32],
781 data_size: u64,
782 data_type: u32,
783 ) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
784 let peer_query_count = fault_tolerant_quote_query_count();
785 let remote_peers = self
786 .network()
787 .find_closest_peers(address, peer_query_count)
788 .await?;
789
790 self.collect_store_quotes_from_remote_peers(
791 address,
792 data_size,
793 data_type,
794 remote_peers,
795 QuoteSelectionPolicy::ClosestByDistance,
796 )
797 .await
798 }
799
800 async fn select_witnessed_quote_selection(
801 &self,
802 address: &[u8; 32],
803 ) -> Result<WitnessedQuoteSelection> {
804 let required = single_node_quote_query_count();
805 let witnessed = self
806 .network()
807 .find_witnessed_close_group_with_view_count(
808 address,
809 required,
810 SINGLE_NODE_WITNESSED_VIEW_COUNT,
811 )
812 .await
813 .map_err(|e| {
814 Error::InsufficientPeers(format!(
815 "Witnessed close group lookup failed before payment for target {}: {e}",
816 hex::encode(address)
817 ))
818 })?;
819 let base_quorum = witnessed_close_group_quorum();
820 let missing_views = missing_witnessed_responder_views(&witnessed);
821 let quorum = witnessed_close_group_quorum_for_transcript(&witnessed);
822
823 if missing_views > 0 {
824 warn!(
825 target = %hex::encode(address),
826 initial = witnessed.initial_closest.len(),
827 responder_views = witnessed.responder_views.len(),
828 missing_views = missing_views,
829 base_quorum = base_quorum,
830 adjusted_quorum = quorum,
831 "Witnessed close group transcript is missing responder views; lowering SNP witness quorum"
832 );
833 }
834
835 debug!(
836 target = %hex::encode(address),
837 quorum = quorum,
838 view_count = SINGLE_NODE_WITNESSED_VIEW_COUNT,
839 initial = ?witnessed_initial_peers(&witnessed),
840 responder_views = ?witnessed_responder_views(&witnessed),
841 vote_counts = ?witnessed_vote_counts(&witnessed, address),
842 final_witnessed_set = ?witnessed_consensus(&witnessed, address, quorum),
843 "Witnessed close group selected for SNP quote collection"
844 );
845
846 witnessed_quote_selection_or_error(address, &witnessed, required, quorum)
847 }
848
849 #[allow(clippy::too_many_lines)]
850 async fn collect_store_quotes_from_remote_peers(
851 &self,
852 address: &[u8; 32],
853 data_size: u64,
854 data_type: u32,
855 remote_peers: Vec<(PeerId, Vec<MultiAddr>)>,
856 quote_selection_policy: QuoteSelectionPolicy,
857 ) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
858 let peer_query_count = remote_peers.len();
859
860 let node = self.network().node();
861
862 debug!(
863 "Requesting quotes from up to {peer_query_count} peers for address {} (size: {data_size})",
864 hex::encode(address)
865 );
866
867 if remote_peers.len() < CLOSE_GROUP_SIZE {
868 return Err(Error::InsufficientPeers(format!(
869 "Found {} peers, need {CLOSE_GROUP_SIZE}",
870 remote_peers.len()
871 )));
872 }
873 debug_assert!(peer_query_count >= CLOSE_GROUP_SIZE);
874
875 let per_peer_timeout = Duration::from_secs(self.config().quote_timeout_secs);
876 let overall_timeout = Duration::from_secs(QUOTE_COLLECTION_TIMEOUT_SECS);
877
878 let mut quotes = Vec::with_capacity(peer_query_count);
882 let mut already_stored_peers: Vec<(PeerId, [u8; 32])> = Vec::new();
883 let mut failures: Vec<String> = Vec::new();
884
885 let mut bad_quote_count = 0usize;
890
891 let staged_witnessed_collection = matches!(
892 "e_selection_policy,
893 QuoteSelectionPolicy::WitnessedMedianVoters { .. }
894 );
895
896 if staged_witnessed_collection {
897 let mut quote_futures = FuturesUnordered::new();
898 let mut next_peer_index = 0usize;
899 let collect_result: std::result::Result<std::result::Result<(), Error>, _> =
900 tokio::time::timeout(overall_timeout, async {
901 loop {
902 let launch_count = witnessed_quote_launch_budget(
903 quotes.len(),
904 quote_futures.len(),
905 remote_peers.len().saturating_sub(next_peer_index),
906 );
907 for _ in 0..launch_count {
908 let (peer_id, peer_addrs) = &remote_peers[next_peer_index];
909 next_peer_index += 1;
910 quote_futures.push(request_store_quote_from_peer(
911 node.clone(),
912 *peer_id,
913 peer_addrs.clone(),
914 self.next_request_id(),
915 *address,
916 data_size,
917 data_type,
918 per_peer_timeout,
919 ));
920 }
921
922 if quotes.len() >= CLOSE_GROUP_SIZE || quote_futures.is_empty() {
923 break;
924 }
925
926 let Some((peer_id, addrs, quote_result)) = quote_futures.next().await
927 else {
928 break;
929 };
930 record_store_quote_result(
931 peer_id,
932 addrs,
933 quote_result,
934 address,
935 &mut quotes,
936 &mut already_stored_peers,
937 &mut failures,
938 &mut bad_quote_count,
939 );
940 }
941 Ok(())
942 })
943 .await;
944
945 match collect_result {
946 Err(_elapsed) => {
947 warn!(
948 "Quote collection timed out after {overall_timeout:?} for address {}",
949 hex::encode(address)
950 );
951 }
952 Ok(Err(e)) => return Err(e),
953 Ok(Ok(())) => {}
954 }
955 } else {
956 let mut quote_futures = FuturesUnordered::new();
960
961 for (peer_id, peer_addrs) in &remote_peers {
962 quote_futures.push(request_store_quote_from_peer(
963 node.clone(),
964 *peer_id,
965 peer_addrs.clone(),
966 self.next_request_id(),
967 *address,
968 data_size,
969 data_type,
970 per_peer_timeout,
971 ));
972 }
973
974 let collect_result: std::result::Result<std::result::Result<(), Error>, _> =
975 tokio::time::timeout(overall_timeout, async {
976 while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await {
977 record_store_quote_result(
978 peer_id,
979 addrs,
980 quote_result,
981 address,
982 &mut quotes,
983 &mut already_stored_peers,
984 &mut failures,
985 &mut bad_quote_count,
986 );
987 }
988 Ok(())
989 })
990 .await;
991
992 match collect_result {
993 Err(_elapsed) => {
994 warn!(
995 "Quote collection timed out after {overall_timeout:?} for address {}",
996 hex::encode(address)
997 );
998 }
1002 Ok(Err(e)) => return Err(e),
1003 Ok(Ok(())) => {}
1004 }
1005 }
1006
1007 let bad_dropped = drop_quotes_with_bad_bindings(&mut quotes);
1013 if bad_dropped > 0 {
1014 warn!(
1015 "Defensive filter dropped {bad_dropped} quotes with mismatched peer bindings \
1016 for address {} — the per-peer handler should have caught these earlier \
1017 (this indicates an upstream regression)",
1018 hex::encode(address),
1019 );
1020 bad_quote_count += bad_dropped;
1021 }
1022
1023 if !already_stored_peers.is_empty() {
1025 let mut all_peers_by_distance: Vec<(bool, [u8; 32])> = Vec::new();
1026 for (peer_id, _, _, _) in "es {
1027 all_peers_by_distance.push((false, peer_xor_distance(peer_id, address)));
1028 }
1029 for (_, dist) in &already_stored_peers {
1030 all_peers_by_distance.push((true, *dist));
1031 }
1032 all_peers_by_distance.sort_by_key(|a| a.1);
1033
1034 let close_group_stored = all_peers_by_distance
1035 .iter()
1036 .take(CLOSE_GROUP_SIZE)
1037 .filter(|(is_stored, _)| *is_stored)
1038 .count();
1039
1040 if close_group_stored >= CLOSE_GROUP_MAJORITY {
1041 debug!(
1042 "Chunk {} already stored ({close_group_stored}/{CLOSE_GROUP_SIZE} close-group peers confirm)",
1043 hex::encode(address)
1044 );
1045 return Err(Error::AlreadyStored);
1046 }
1047 }
1048
1049 let already_stored_count = already_stored_peers.len();
1050 let failure_count = failures.len();
1051 let quote_count = quotes.len();
1052 let total_responses = quote_count + failure_count + already_stored_count;
1053
1054 if quotes.len() >= CLOSE_GROUP_SIZE {
1055 let selected_quotes = match quote_selection_policy {
1056 QuoteSelectionPolicy::ClosestByDistance => select_closest_quotes(quotes, address),
1057 QuoteSelectionPolicy::WitnessedMedianVoters {
1058 voters_by_peer,
1059 quorum,
1060 } => select_witnessed_median_voter_quotes(quotes, address, &voters_by_peer, quorum)
1061 .ok_or_else(|| {
1062 Error::InsufficientPeers(format!(
1063 "Got {quote_count} quotes, need {CLOSE_GROUP_SIZE} whose paid \
1064 median issuer is recognised by at least {} \
1065 selected witness peers ({total_responses} responses: \
1066 {already_stored_count} already_stored, {failure_count} failed \
1067 including {bad_quote_count} with mismatched peer bindings). \
1068 Failures: [{}]",
1069 quorum,
1070 failures.join("; ")
1071 ))
1072 })?,
1073 };
1074
1075 info!(
1076 "Collected {} quotes for address {} ({total_responses} responses: \
1077 {quote_count} ok, {already_stored_count} already_stored, {failure_count} failed, \
1078 {bad_quote_count} bad-binding)",
1079 selected_quotes.len(),
1080 hex::encode(address),
1081 );
1082 return Ok(selected_quotes);
1083 }
1084
1085 Err(Error::InsufficientPeers(format!(
1086 "Got {quote_count} quotes, need {CLOSE_GROUP_SIZE} ({total_responses} responses: \
1087 {already_stored_count} already_stored, {failure_count} failed including \
1088 {bad_quote_count} with mismatched peer bindings). Failures: [{}]",
1089 failures.join("; ")
1090 )))
1091 }
1092}
1093
1094#[cfg(test)]
1095#[allow(clippy::unwrap_used, clippy::expect_used)]
1096mod tests {
1097 use super::*;
1109 use ant_protocol::evm::RewardsAddress;
1110 use ant_protocol::pqc::ops::{MlDsaOperations, MlDsaPublicKey};
1111 use ant_protocol::transport::{DHTNode, MlDsa65, ResponderView, WitnessedCloseGroup};
1112 use std::time::SystemTime;
1113 use xor_name::XorName;
1114
1115 struct Keypair {
1117 peer_id: PeerId,
1118 pub_key_bytes: Vec<u8>,
1119 }
1120
1121 fn gen_keypair() -> Keypair {
1122 let ml_dsa = MlDsa65::new();
1123 let (pub_key, _sk) = ml_dsa.generate_keypair().expect("ML-DSA-65 keygen");
1124 let pub_key_bytes = pub_key.as_bytes().to_vec();
1125 let peer_id = PeerId::from_bytes(compute_address(&pub_key_bytes));
1126 Keypair {
1127 peer_id,
1128 pub_key_bytes,
1129 }
1130 }
1131
1132 fn good_quote_real() -> (PeerId, Vec<MultiAddr>, PaymentQuote, Amount) {
1135 let kp = gen_keypair();
1136 let quote = PaymentQuote {
1137 content: XorName([0u8; 32]),
1138 timestamp: SystemTime::UNIX_EPOCH,
1139 price: Amount::ZERO,
1140 rewards_address: RewardsAddress::new([0u8; 20]),
1141 pub_key: kp.pub_key_bytes,
1142 signature: Vec::new(),
1143 };
1144 (kp.peer_id, Vec::new(), quote, Amount::ZERO)
1145 }
1146
1147 fn bad_quote_real() -> (PeerId, Vec<MultiAddr>, PaymentQuote, Amount) {
1152 let claimed = gen_keypair();
1153 let signing = gen_keypair();
1154 assert_ne!(claimed.pub_key_bytes, signing.pub_key_bytes);
1155 assert_ne!(claimed.peer_id.as_bytes(), signing.peer_id.as_bytes());
1156 let quote = PaymentQuote {
1157 content: XorName([0u8; 32]),
1158 timestamp: SystemTime::UNIX_EPOCH,
1159 price: Amount::ZERO,
1160 rewards_address: RewardsAddress::new([0u8; 20]),
1161 pub_key: signing.pub_key_bytes,
1162 signature: Vec::new(),
1163 };
1164 (claimed.peer_id, Vec::new(), quote, Amount::ZERO)
1165 }
1166
1167 fn witnessed_test_node(seed: u8) -> DHTNode {
1168 DHTNode {
1169 peer_id: PeerId::from_bytes([seed; 32]),
1170 addresses: Vec::new(),
1171 address_types: Vec::new(),
1172 distance: None,
1173 reliability: 1.0,
1174 }
1175 }
1176
1177 fn witnessed_test_nodes(seeds: &[u8]) -> Vec<DHTNode> {
1178 seeds.iter().copied().map(witnessed_test_node).collect()
1179 }
1180
1181 fn witnessed_test_view(responder: u8, closest: &[u8]) -> ResponderView {
1182 ResponderView {
1183 responder: PeerId::from_bytes([responder; 32]),
1184 closest: witnessed_test_nodes(closest),
1185 }
1186 }
1187
1188 fn synthetic_peer(seed: u8) -> PeerId {
1189 PeerId::from_bytes([seed; 32])
1190 }
1191
1192 fn synthetic_quote(seed: u8, price: u64) -> (PeerId, Vec<MultiAddr>, PaymentQuote, Amount) {
1193 let amount = Amount::from(price);
1194 let quote = PaymentQuote {
1195 content: XorName([0u8; 32]),
1196 timestamp: SystemTime::UNIX_EPOCH,
1197 price: amount,
1198 rewards_address: RewardsAddress::new([0u8; 20]),
1199 pub_key: Vec::new(),
1200 signature: Vec::new(),
1201 };
1202 (synthetic_peer(seed), Vec::new(), quote, amount)
1203 }
1204
1205 fn synthetic_voters(seeds: &[u8]) -> HashSet<PeerId> {
1206 seeds.iter().copied().map(synthetic_peer).collect()
1207 }
1208
1209 fn quote_peer_seeds(quotes: &[(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)]) -> Vec<u8> {
1210 quotes
1211 .iter()
1212 .map(|(peer_id, _, _, _)| peer_id.as_bytes()[0])
1213 .collect()
1214 }
1215
1216 fn put_peer_seeds(peers: &[(PeerId, Vec<MultiAddr>)]) -> Vec<u8> {
1217 peers
1218 .iter()
1219 .map(|(peer_id, _)| peer_id.as_bytes()[0])
1220 .collect()
1221 }
1222
1223 fn put_peers_from_seeds(seeds: &[u8]) -> Vec<(PeerId, Vec<MultiAddr>)> {
1224 seeds
1225 .iter()
1226 .copied()
1227 .map(|seed| (synthetic_peer(seed), Vec::new()))
1228 .collect()
1229 }
1230
1231 fn storer_binding_would_accept(peer_id: &PeerId, quote: &PaymentQuote) -> bool {
1240 if MlDsaPublicKey::from_bytes("e.pub_key).is_err() {
1241 return false;
1242 }
1243 compute_address("e.pub_key) == *peer_id.as_bytes()
1244 }
1245
1246 #[test]
1251 fn binding_accepts_real_self_consistent_keypair() {
1252 let (peer_id, _, quote, _) = good_quote_real();
1253 assert!(quote_binding_is_valid(&peer_id, "e));
1256 assert!(storer_binding_would_accept(&peer_id, "e));
1258 }
1259
1260 #[test]
1261 fn binding_rejects_real_crossed_keypair() {
1262 let (peer_id, _, quote, _) = bad_quote_real();
1263 assert!(!quote_binding_is_valid(&peer_id, "e));
1264 assert!(!storer_binding_would_accept(&peer_id, "e));
1265 }
1266
1267 #[test]
1268 fn binding_rejects_oversize_pubkey() {
1269 let oversized = vec![0u8; ML_DSA_PUB_KEY_LEN + 1];
1273 let peer_id = PeerId::from_bytes(compute_address(&oversized));
1274 let quote = PaymentQuote {
1275 content: XorName([0u8; 32]),
1276 timestamp: SystemTime::UNIX_EPOCH,
1277 price: Amount::ZERO,
1278 rewards_address: RewardsAddress::new([0u8; 20]),
1279 pub_key: oversized,
1280 signature: Vec::new(),
1281 };
1282 assert_eq!(compute_address("e.pub_key), *peer_id.as_bytes());
1285 assert!(
1286 !quote_binding_is_valid(&peer_id, "e),
1287 "predicate must reject oversize pub_key even when BLAKE3 happens to match"
1288 );
1289 assert!(!storer_binding_would_accept(&peer_id, "e));
1290 }
1291
1292 #[test]
1293 fn binding_rejects_undersize_pubkey() {
1294 let undersized = vec![0u8; ML_DSA_PUB_KEY_LEN - 1];
1295 let peer_id = PeerId::from_bytes(compute_address(&undersized));
1296 let quote = PaymentQuote {
1297 content: XorName([0u8; 32]),
1298 timestamp: SystemTime::UNIX_EPOCH,
1299 price: Amount::ZERO,
1300 rewards_address: RewardsAddress::new([0u8; 20]),
1301 pub_key: undersized,
1302 signature: Vec::new(),
1303 };
1304 assert!(!quote_binding_is_valid(&peer_id, "e));
1305 assert!(!storer_binding_would_accept(&peer_id, "e));
1306 }
1307
1308 #[test]
1313 fn quote_query_counts_keep_single_node_close_group_only() {
1314 assert_eq!(single_node_quote_query_count(), CLOSE_GROUP_SIZE);
1315 assert_eq!(SINGLE_NODE_WITNESSED_VIEW_COUNT, 20);
1316 assert!(SINGLE_NODE_WITNESSED_VIEW_COUNT > single_node_quote_query_count());
1317 assert_eq!(witnessed_close_group_quorum(), 5);
1318 assert_eq!(witnessed_close_group_quorum_for_missing_views(0), 5);
1319 assert_eq!(witnessed_close_group_quorum_for_missing_views(1), 4);
1320 assert_eq!(witnessed_close_group_quorum_for_missing_views(2), 3);
1321 assert_eq!(
1322 fault_tolerant_quote_query_count(),
1323 CLOSE_GROUP_SIZE * FAULT_TOLERANT_QUOTE_QUERY_MULTIPLIER
1324 );
1325 assert!(fault_tolerant_quote_query_count() > single_node_quote_query_count());
1326 }
1327
1328 #[test]
1329 fn witnessed_quote_launch_budget_keeps_exact_quote_window() {
1330 assert_eq!(
1331 witnessed_quote_launch_budget(0, 0, CLOSE_GROUP_SIZE * 2),
1332 CLOSE_GROUP_SIZE,
1333 "initial SNP quote fetch should launch the closest seven peers"
1334 );
1335 assert_eq!(
1336 witnessed_quote_launch_budget(1, CLOSE_GROUP_SIZE - 1, CLOSE_GROUP_SIZE),
1337 0,
1338 "a successful quote should not launch an extra fallback"
1339 );
1340 assert_eq!(
1341 witnessed_quote_launch_budget(0, CLOSE_GROUP_SIZE - 1, CLOSE_GROUP_SIZE),
1342 1,
1343 "a failed in-flight quote should launch the next closest fallback"
1344 );
1345 assert_eq!(
1346 witnessed_quote_launch_budget(CLOSE_GROUP_SIZE - 1, 0, 3),
1347 1,
1348 "only one more peer is needed for the seventh quote"
1349 );
1350 assert_eq!(
1351 witnessed_quote_launch_budget(0, 0, CLOSE_GROUP_SIZE - 1),
1352 CLOSE_GROUP_SIZE - 1,
1353 "launch budget is capped by remaining candidates"
1354 );
1355 }
1356
1357 #[test]
1358 fn witnessed_candidates_sort_by_xor_distance_then_votes() {
1359 let address = [0u8; 32];
1360 let witnessed = WitnessedCloseGroup {
1361 target: address,
1362 k: CLOSE_GROUP_SIZE,
1363 initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
1364 responder_views: vec![
1365 witnessed_test_view(1, &[1, 9]),
1366 witnessed_test_view(2, &[1, 9]),
1367 witnessed_test_view(3, &[1, 9]),
1368 witnessed_test_view(4, &[1, 9]),
1369 witnessed_test_view(5, &[1, 9]),
1370 witnessed_test_view(6, &[9]),
1371 witnessed_test_view(7, &[9]),
1372 ],
1373 };
1374
1375 let candidates =
1376 witnessed_consensus_candidates(&witnessed, &address, witnessed_close_group_quorum());
1377
1378 assert_eq!(
1379 candidates
1380 .iter()
1381 .map(|candidate| candidate.node.peer_id.as_bytes()[0])
1382 .collect::<Vec<_>>(),
1383 vec![1, 9],
1384 "XOR closeness must be the primary sort before quote collection"
1385 );
1386 }
1387
1388 #[test]
1389 fn witnessed_quote_peers_error_is_typed_and_pre_payment_when_consensus_is_short() {
1390 let address = [0u8; 32];
1391 let responder_views = (1..=7)
1392 .map(|responder| witnessed_test_view(responder, &[1, 2, 3, 4]))
1393 .collect();
1394 let witnessed = WitnessedCloseGroup {
1395 target: address,
1396 k: CLOSE_GROUP_SIZE,
1397 initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
1398 responder_views,
1399 };
1400
1401 let err = witnessed_quote_selection_or_error(
1402 &address,
1403 &witnessed,
1404 CLOSE_GROUP_SIZE,
1405 witnessed_close_group_quorum(),
1406 )
1407 .expect_err("short witnessed consensus must fail before payment");
1408
1409 match err {
1410 Error::InsufficientPeers(message) => {
1411 assert!(message.contains("before payment"));
1412 assert!(message.contains("vote_counts"));
1413 assert!(message.contains("quorum"));
1414 }
1415 other => panic!("expected typed InsufficientPeers error, got {other:?}"),
1416 }
1417 }
1418
1419 #[test]
1420 fn witnessed_quote_peers_include_quorum_fallback_candidates() {
1421 const EXTRA_QUORUM_CANDIDATES: usize = 1;
1422
1423 let address = [0u8; 32];
1424 let witnessed = WitnessedCloseGroup {
1425 target: address,
1426 k: CLOSE_GROUP_SIZE,
1427 initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
1428 responder_views: vec![
1429 witnessed_test_view(1, &[1, 2, 3, 4, 5, 6, 7]),
1430 witnessed_test_view(2, &[1, 2, 3, 4, 5, 6, 8]),
1431 witnessed_test_view(3, &[1, 2, 3, 4, 5, 7, 8]),
1432 witnessed_test_view(4, &[1, 2, 3, 4, 6, 7, 8]),
1433 witnessed_test_view(5, &[1, 2, 3, 5, 6, 7, 8]),
1434 witnessed_test_view(6, &[1, 2, 4, 5, 6, 7, 8]),
1435 witnessed_test_view(7, &[1, 3, 4, 5, 6, 7, 8]),
1436 ],
1437 };
1438
1439 let selection = witnessed_quote_selection_or_error(
1440 &address,
1441 &witnessed,
1442 CLOSE_GROUP_SIZE,
1443 witnessed_close_group_quorum(),
1444 )
1445 .expect("fallback candidates should be retained for quote collection");
1446
1447 assert_eq!(
1448 selection.quote_peers.len(),
1449 CLOSE_GROUP_SIZE + EXTRA_QUORUM_CANDIDATES
1450 );
1451 assert_eq!(
1452 selection
1453 .quote_peers
1454 .iter()
1455 .map(|peer| peer.peer_id.as_bytes()[0])
1456 .collect::<Vec<_>>(),
1457 vec![1, 2, 3, 4, 5, 6, 7, 8]
1458 );
1459 assert_eq!(
1460 put_peer_seeds(&selection.initial_put_peers),
1461 vec![1, 2, 3, 4, 5, 6, 7]
1462 );
1463 }
1464
1465 #[test]
1466 fn witnessed_quote_peers_lower_quorum_for_missing_responder_views() {
1467 let address = [0u8; 32];
1468 let witnessed = WitnessedCloseGroup {
1469 target: address,
1470 k: CLOSE_GROUP_SIZE,
1471 initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
1472 responder_views: vec![
1473 witnessed_test_view(1, &[1, 2, 3, 4, 5, 6, 7]),
1474 witnessed_test_view(2, &[1, 2, 3, 4, 5, 6, 8]),
1475 witnessed_test_view(3, &[1, 2, 3, 4, 5, 7, 8]),
1476 witnessed_test_view(4, &[1, 2, 3, 4, 6, 7, 8]),
1477 witnessed_test_view(5, &[1, 2, 3, 5, 6, 7, 8]),
1478 witnessed_test_view(6, &[1, 2, 4, 5, 6, 7, 8]),
1479 ],
1480 };
1481 let quorum = witnessed_close_group_quorum_for_transcript(&witnessed);
1482
1483 assert_eq!(missing_witnessed_responder_views(&witnessed), 1);
1484 assert_eq!(quorum, 4);
1485
1486 let selection =
1487 witnessed_quote_selection_or_error(&address, &witnessed, CLOSE_GROUP_SIZE, quorum)
1488 .expect(
1489 "one missing responder view should lower quorum and still select candidates",
1490 );
1491
1492 assert_eq!(
1493 selection
1494 .quote_peers
1495 .iter()
1496 .map(|peer| peer.peer_id.as_bytes()[0])
1497 .collect::<Vec<_>>(),
1498 vec![1, 2, 3, 4, 5, 6, 7, 8]
1499 );
1500 assert_eq!(selection.quorum, quorum);
1501 }
1502
1503 #[test]
1504 fn witnessed_quote_selection_keeps_closest_set_with_median_voter_quorum() {
1505 const MEDIAN_ISSUER_SEED: u8 = 7;
1506 const FAR_SUPPORTING_VOTER_SEED: u8 = 20;
1507 const UNSUCCESSFUL_SUPPORTING_VOTER_SEED: u8 = 21;
1508
1509 let address = [0u8; 32];
1510 let quotes = vec![
1511 synthetic_quote(1, 10),
1512 synthetic_quote(2, 20),
1513 synthetic_quote(3, 30),
1514 synthetic_quote(6, 50),
1515 synthetic_quote(MEDIAN_ISSUER_SEED, 40),
1516 synthetic_quote(8, 60),
1517 synthetic_quote(9, 70),
1518 synthetic_quote(FAR_SUPPORTING_VOTER_SEED, 80),
1519 ];
1520 let mut voters_by_peer = HashMap::new();
1521 voters_by_peer.insert(
1522 synthetic_peer(MEDIAN_ISSUER_SEED),
1523 synthetic_voters(&[
1524 1,
1525 2,
1526 3,
1527 MEDIAN_ISSUER_SEED,
1528 FAR_SUPPORTING_VOTER_SEED,
1529 UNSUCCESSFUL_SUPPORTING_VOTER_SEED,
1530 ]),
1531 );
1532
1533 let quorum = witnessed_close_group_quorum();
1534 let selected =
1535 select_witnessed_median_voter_quotes(quotes, &address, &voters_by_peer, quorum)
1536 .expect("a supported close-group quote set should be selected");
1537
1538 assert_eq!(quote_peer_seeds(&selected), vec![1, 2, 3, 6, 7, 8, 9]);
1539 let (median_peer_id, _) =
1540 median_paid_quote_issuer(&selected).expect("selected quotes have a median");
1541 assert_eq!(median_peer_id, synthetic_peer(MEDIAN_ISSUER_SEED));
1542 assert!(voters_by_peer[&median_peer_id].len() >= quorum);
1543 }
1544
1545 #[test]
1546 fn witnessed_quote_selection_uses_direct_median_witness_recognition() {
1547 const MEDIAN_ISSUER_SEED: u8 = 7;
1548
1549 let address = [0u8; 32];
1550 let quotes = vec![
1551 synthetic_quote(1, 10),
1552 synthetic_quote(2, 20),
1553 synthetic_quote(3, 30),
1554 synthetic_quote(4, 50),
1555 synthetic_quote(MEDIAN_ISSUER_SEED, 40),
1556 synthetic_quote(8, 60),
1557 synthetic_quote(9, 70),
1558 ];
1559 let mut voters_by_peer = HashMap::new();
1560 voters_by_peer.insert(
1561 synthetic_peer(MEDIAN_ISSUER_SEED),
1562 synthetic_voters(&[20, 21, 22, 23, 24]),
1563 );
1564
1565 let quorum = witnessed_close_group_quorum();
1566 let selected =
1567 select_witnessed_median_voter_quotes(quotes, &address, &voters_by_peer, quorum)
1568 .expect("direct witness recognition should support the paid median issuer");
1569
1570 let (median_peer_id, _) =
1571 median_paid_quote_issuer(&selected).expect("selected quotes have a median");
1572 let selected_peers = selected
1573 .iter()
1574 .map(|(peer_id, _, _, _)| *peer_id)
1575 .collect::<HashSet<_>>();
1576 assert_eq!(median_peer_id, synthetic_peer(MEDIAN_ISSUER_SEED));
1577 assert_eq!(
1578 voters_by_peer[&median_peer_id]
1579 .intersection(&selected_peers)
1580 .count(),
1581 0,
1582 "recognising witnesses need not also be selected quote issuers"
1583 );
1584 assert_eq!(voters_by_peer[&median_peer_id].len(), quorum);
1585 }
1586
1587 #[test]
1588 fn witnessed_quote_selection_rejects_median_without_witness_quorum() {
1589 const MEDIAN_ISSUER_SEED: u8 = 7;
1590
1591 let address = [0u8; 32];
1592 let quotes = vec![
1593 synthetic_quote(1, 10),
1594 synthetic_quote(2, 20),
1595 synthetic_quote(3, 30),
1596 synthetic_quote(6, 50),
1597 synthetic_quote(MEDIAN_ISSUER_SEED, 40),
1598 synthetic_quote(8, 60),
1599 synthetic_quote(9, 70),
1600 synthetic_quote(10, 80),
1601 ];
1602 let mut voters_by_peer = HashMap::new();
1603 voters_by_peer.insert(
1604 synthetic_peer(MEDIAN_ISSUER_SEED),
1605 synthetic_voters(&[1, 2, 3, 20]),
1606 );
1607
1608 let selected = select_witnessed_median_voter_quotes(
1609 quotes,
1610 &address,
1611 &voters_by_peer,
1612 witnessed_close_group_quorum(),
1613 );
1614
1615 assert!(
1616 selected.is_none(),
1617 "the selector must not return a paid quote set when fewer than the \
1618 witnessed median voter quorum recognised the paid median issuer"
1619 );
1620 }
1621
1622 #[test]
1623 fn put_peers_prioritise_median_voters_without_reordering_quotes() {
1624 const MEDIAN_ISSUER_SEED: u8 = 7;
1625
1626 let quotes = vec![
1627 synthetic_quote(1, 10),
1628 synthetic_quote(2, 20),
1629 synthetic_quote(3, 30),
1630 synthetic_quote(4, 50),
1631 synthetic_quote(5, 60),
1632 synthetic_quote(6, 70),
1633 synthetic_quote(MEDIAN_ISSUER_SEED, 40),
1634 ];
1635 let mut voters_by_peer = HashMap::new();
1636 voters_by_peer.insert(
1637 synthetic_peer(MEDIAN_ISSUER_SEED),
1638 synthetic_voters(&[3, 4, 5, 6, MEDIAN_ISSUER_SEED]),
1639 );
1640
1641 let put_candidates = put_peers_from_seeds(&[1, 2, 3, 4, 5, 6, 7]);
1642 let put_peers = put_peers_with_median_voters_first(
1643 "es,
1644 &put_candidates,
1645 &voters_by_peer,
1646 witnessed_close_group_quorum(),
1647 )
1648 .expect("median voters should produce an ordered PUT set");
1649
1650 assert_eq!(quote_peer_seeds("es), vec![1, 2, 3, 4, 5, 6, 7]);
1651 let (median_peer_id, _) =
1652 median_paid_quote_issuer("es).expect("selected quotes have a median");
1653 assert_eq!(median_peer_id, synthetic_peer(MEDIAN_ISSUER_SEED));
1654 assert_eq!(put_peer_seeds(&put_peers), vec![3, 4, 5, 6, 7, 1, 2]);
1655 }
1656
1657 #[test]
1658 fn filter_drops_only_bad_bindings_and_leaves_storer_acceptable_quotes() {
1659 let mut quotes = vec![
1660 good_quote_real(),
1661 bad_quote_real(),
1662 good_quote_real(),
1663 bad_quote_real(),
1664 good_quote_real(),
1665 ];
1666
1667 let dropped = drop_quotes_with_bad_bindings(&mut quotes);
1668
1669 assert_eq!(dropped, 2, "two crossed-key quotes must be dropped");
1670 assert_eq!(quotes.len(), 3, "three real-key quotes must remain");
1671
1672 for (peer_id, _, quote, _) in "es {
1678 assert!(
1679 storer_binding_would_accept(peer_id, quote),
1680 "every retained quote must satisfy the full storer-side spec"
1681 );
1682 }
1683 }
1684
1685 #[test]
1686 fn filter_is_noop_when_all_quotes_are_storer_acceptable() {
1687 let mut quotes: Vec<_> = (0..5).map(|_| good_quote_real()).collect();
1688 let before = quotes.len();
1689 let dropped = drop_quotes_with_bad_bindings(&mut quotes);
1690 assert_eq!(dropped, 0);
1691 assert_eq!(quotes.len(), before);
1692 for (peer_id, _, quote, _) in "es {
1693 assert!(storer_binding_would_accept(peer_id, quote));
1694 }
1695 }
1696
1697 #[test]
1698 fn filter_drops_all_when_every_responder_is_bad() {
1699 let mut quotes: Vec<_> = (0..fault_tolerant_quote_query_count())
1704 .map(|_| bad_quote_real())
1705 .collect();
1706 let dropped = drop_quotes_with_bad_bindings(&mut quotes);
1707 assert_eq!(dropped, fault_tolerant_quote_query_count());
1708 assert!(quotes.is_empty());
1709 }
1710
1711 #[test]
1712 fn filter_preserves_quote_payload_byte_for_byte() {
1713 let (peer_id, addrs, original_quote, amount) = good_quote_real();
1718 let mut quotes = vec![(peer_id, addrs.clone(), original_quote.clone(), amount)];
1719 let _ = drop_quotes_with_bad_bindings(&mut quotes);
1720
1721 let (kept_peer, kept_addrs, kept_quote, kept_amount) =
1722 quotes.pop().expect("the good quote must survive filtering");
1723 assert_eq!(kept_peer.as_bytes(), peer_id.as_bytes());
1724 assert_eq!(kept_addrs.len(), addrs.len());
1725 assert_eq!(kept_amount, amount);
1726 assert_eq!(kept_quote.pub_key, original_quote.pub_key);
1727 assert_eq!(kept_quote.signature, original_quote.signature);
1728 assert_eq!(kept_quote.content.0, original_quote.content.0);
1729 assert_eq!(kept_quote.timestamp, original_quote.timestamp);
1730 assert_eq!(kept_quote.price, original_quote.price);
1731 assert_eq!(kept_quote.rewards_address, original_quote.rewards_address);
1732 }
1733
1734 #[test]
1765 fn repro_apr_30_storer_would_have_rejected_pre_filter_and_accepts_post_filter() {
1766 let over_query_count = fault_tolerant_quote_query_count();
1767 let mut quotes: Vec<_> = (0..over_query_count - 1)
1768 .map(|_| good_quote_real())
1769 .collect();
1770 quotes.insert(over_query_count / 2, bad_quote_real());
1773 assert_eq!(quotes.len(), over_query_count);
1774
1775 let storer_would_reject_count = quotes
1777 .iter()
1778 .filter(|(p, _, q, _)| !storer_binding_would_accept(p, q))
1779 .count();
1780 assert_eq!(
1781 storer_would_reject_count, 1,
1782 "exactly one quote (the crossed-key one) must be rejected by the storer spec"
1783 );
1784
1785 let dropped = drop_quotes_with_bad_bindings(&mut quotes);
1787 assert_eq!(dropped, 1, "exactly the crossed-key quote must be filtered");
1788
1789 for (peer_id, _, quote, _) in "es {
1791 assert!(
1792 storer_binding_would_accept(peer_id, quote),
1793 "every post-filter quote must be accepted by the storer spec — \
1794 this is what the filter guarantees before any quote set is used"
1795 );
1796 }
1797
1798 assert!(
1800 quotes.len() >= CLOSE_GROUP_SIZE,
1801 "after filtering, at least CLOSE_GROUP_SIZE good quotes must remain \
1802 so a fault-tolerant probe can still return a full close group"
1803 );
1804 }
1805
1806 #[test]
1811 fn filter_leaves_short_set_when_too_many_bad_peers() {
1812 let good_count = CLOSE_GROUP_SIZE - 1;
1813 let bad_count = fault_tolerant_quote_query_count() - good_count;
1814 let mut quotes: Vec<_> = std::iter::repeat_with(bad_quote_real)
1815 .take(bad_count)
1816 .chain(std::iter::repeat_with(good_quote_real).take(good_count))
1817 .collect();
1818
1819 let dropped = drop_quotes_with_bad_bindings(&mut quotes);
1820 assert_eq!(dropped, bad_count);
1821 assert!(
1822 quotes.len() < CLOSE_GROUP_SIZE,
1823 "this is the precondition for InsufficientPeers downstream"
1824 );
1825 for (peer_id, _, quote, _) in "es {
1827 assert!(storer_binding_would_accept(peer_id, quote));
1828 }
1829 }
1830
1831 fn serialize_quote(quote: &PaymentQuote) -> Vec<u8> {
1846 rmp_serde::to_vec(quote).expect("serialize quote")
1847 }
1848
1849 #[test]
1850 fn classifier_accepts_real_self_consistent_quote() {
1851 let (peer_id, _, quote, _) = good_quote_real();
1852 let bytes = serialize_quote("e);
1853 let result = classify_quote_response(&peer_id, &bytes, false);
1854 match result {
1855 Ok((q, price)) => {
1856 assert_eq!(q.pub_key, quote.pub_key);
1857 assert_eq!(price, quote.price);
1858 }
1859 Err(e) => panic!("expected Ok, got {e}"),
1860 }
1861 }
1862
1863 #[test]
1864 fn classifier_rejects_crossed_keypair_with_typed_error() {
1865 let (peer_id, _, quote, _) = bad_quote_real();
1866 let bytes = serialize_quote("e);
1867 let result = classify_quote_response(&peer_id, &bytes, false);
1868 match result {
1869 Err(Error::BadQuoteBinding {
1870 peer_id: pid,
1871 detail,
1872 }) => {
1873 assert_eq!(pid, peer_id.to_string());
1874 assert!(
1875 detail.contains("BLAKE3(pub_key)="),
1876 "diagnostic detail must include the derived peer id: {detail}"
1877 );
1878 }
1879 other => panic!("expected BadQuoteBinding for crossed-key quote, got {other:?}"),
1880 }
1881 }
1882
1883 #[test]
1894 fn classifier_rejects_already_stored_vote_from_bad_binding_peer() {
1895 let (peer_id, _, quote, _) = bad_quote_real();
1896 let bytes = serialize_quote("e);
1897 let result = classify_quote_response(&peer_id, &bytes, true);
1899 assert!(
1900 matches!(result, Err(Error::BadQuoteBinding { .. })),
1901 "crossed-key peer must be classified BadQuoteBinding even when \
1902 voting already_stored=true; got {result:?}"
1903 );
1904 }
1905
1906 #[test]
1909 fn classifier_honours_already_stored_vote_from_good_binding_peer() {
1910 let (peer_id, _, quote, _) = good_quote_real();
1911 let bytes = serialize_quote("e);
1912 let result = classify_quote_response(&peer_id, &bytes, true);
1913 assert!(
1914 matches!(result, Err(Error::AlreadyStored)),
1915 "honest peer's already_stored vote must be honoured; got {result:?}"
1916 );
1917 }
1918
1919 #[test]
1920 fn classifier_returns_serialization_error_on_bad_bytes() {
1921 let (peer_id, _, _, _) = good_quote_real();
1922 let garbage = b"this is not a valid msgpack PaymentQuote".to_vec();
1923 let result = classify_quote_response(&peer_id, &garbage, false);
1924 assert!(
1925 matches!(result, Err(Error::Serialization(_))),
1926 "garbage bytes must produce a Serialization error; got {result:?}"
1927 );
1928 }
1929
1930 #[test]
1933 fn classifier_verdict_matches_storer_binding_spec_for_mixed_responders() {
1934 let mut responders: Vec<(PeerId, PaymentQuote)> = (0..12)
1935 .map(|_| {
1936 let (p, _, q, _) = good_quote_real();
1937 (p, q)
1938 })
1939 .collect();
1940 for _ in 0..4 {
1941 let (p, _, q, _) = bad_quote_real();
1942 responders.push((p, q));
1943 }
1944
1945 for (peer_id, quote) in &responders {
1946 let bytes = serialize_quote(quote);
1947 let storer_verdict = storer_binding_would_accept(peer_id, quote);
1948 let classifier_verdict = classify_quote_response(peer_id, &bytes, false).is_ok();
1949 assert_eq!(
1950 classifier_verdict, storer_verdict,
1951 "classifier and storer-binding-spec must agree on every responder \
1952 (peer_id={}, storer={storer_verdict}, classifier={classifier_verdict})",
1953 peer_id
1954 );
1955 }
1956 }
1957}