ant-core 0.2.8

Headless Rust library for the Autonomi network: data storage and retrieval with self-encryption and EVM payments, plus node lifecycle management.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
//! Chunk storage operations.
//!
//! Chunks are immutable, content-addressed data blocks where the address
//! is the BLAKE3 hash of the content.

use crate::data::client::adaptive::Outcome;
use crate::data::client::batch::{finalize_batch_payment, PreparedChunk};
use crate::data::client::peer_xor_distance;
use crate::data::client::Client;
use crate::data::error::{Error, Result};
use ant_protocol::evm::{QuoteHash, TxHash};
use ant_protocol::transport::{MultiAddr, PeerId};
use ant_protocol::{
    compute_address, detect_proof_type, send_and_await_chunk_response, ChunkGetRequest,
    ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, DataChunk,
    ProofType, XorName, CLOSE_GROUP_MAJORITY,
};
use bytes::Bytes;
use futures::stream::{self, FuturesUnordered, StreamExt};
use std::collections::HashMap;
use std::future::Future;
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};

/// Data type identifier for chunks (used in quote requests).
const CHUNK_DATA_TYPE: u32 = 0;

/// Result of one sweep over a chunk's close group.
///
/// Either we got the chunk from some peer, or every peer in the group
/// returned NotFound, timed out, or hit a transport / protocol error.
/// The counts feed the retry decision (`is_authoritative_not_found`):
/// only a *unanimous* NotFound from a *well-sampled* close group counts
/// as authoritative data absence — anything else (a non-unanimous
/// result, or a thin/under-sampled DHT walk) leaves room for the actual
/// storer to be in the timeout / network-error / protocol-error bucket
/// or outside the sampled view, and is worth a retry against a freshly
/// re-walked close group.
struct CloseGroupOutcome {
    chunk: Option<DataChunk>,
    queried: usize,
    not_found: usize,
    timeout: usize,
    network_err: usize,
    /// Counts peers that responded with a remote `Error` (e.g.
    /// "Chunk verification failed") or any other protocol-level error
    /// that classifies as `Error::Protocol`. Treated the same as
    /// `timeout` / `network_err` for retry decisions: one peer's bad
    /// response must not abort the whole close-group sweep — the
    /// remaining peers might still have a clean copy.
    protocol_err: usize,
}

/// `true` if the close-group sweep is strong enough evidence to
/// conclude the chunk is genuinely absent, so retrying is pointless.
///
/// Two conditions, both required:
///
/// 1. *Unanimous*: every peer we managed to query responded with an
///    authoritative NotFound (`not_found == queried`). An earlier
///    version used a majority quorum (`not_found >= close_group_size /
///    2 + 1`), but production traffic disproved that: storage
///    replicates to `CLOSE_GROUP_MAJORITY` (4) of the K=7 close-group
///    peers, so up to 3 peers legitimately don't store any given chunk
///    and a `not_found=4 timeout=3` result is "3 storers we couldn't
///    reach" plus "4 non-storers," not data loss.
///
/// 2. *Well-sampled*: at least `CLOSE_GROUP_MAJORITY` peers were
///    queried. `close_group_peers` (via `find_closest_peers`) accepts
///    any non-empty DHT result, so a thin/under-sampled walk can return
///    1 or 2 peers. A `1/1` or `3/3` NotFound from such a walk is NOT
///    authoritative — the real replica majority may sit entirely
///    outside that narrow view. Requiring a majority-sized sample means
///    a thin lookup falls through to the retry (which re-walks the DHT)
///    instead of being declared a final absence.
fn is_authoritative_not_found(not_found: usize, queried: usize) -> bool {
    queried >= CLOSE_GROUP_MAJORITY && not_found == queried
}

/// Store-response timeout for non-merkle chunk PUTs.
const STORE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);

/// Extra waves allowed after the computed diagnostic peer-sweep deadline.
const DIAGNOSTIC_TIMEOUT_PADDING_WAVES: usize = 1;

/// Result of fetching one chunk address from one close-group peer.
pub struct ChunkPeerGetResult {
    /// Peer queried for the chunk.
    pub peer_id: PeerId,
    /// Known network addresses used for the peer.
    pub peer_addrs: Vec<MultiAddr>,
    /// XOR distance from `peer_id` to the chunk address.
    pub xor_distance: [u8; 32],
    /// Per-peer fetch result.
    pub chunk_result: Result<Option<DataChunk>>,
}

#[derive(Clone)]
struct ChunkPeerGetTarget {
    index: usize,
    peer_id: PeerId,
    peer_addrs: Vec<MultiAddr>,
    xor_distance: [u8; 32],
}

fn chunk_peer_get_targets(
    peers: Vec<(PeerId, Vec<MultiAddr>)>,
    address: &XorName,
) -> Vec<ChunkPeerGetTarget> {
    peers
        .into_iter()
        .enumerate()
        .map(|(index, (peer_id, peer_addrs))| ChunkPeerGetTarget {
            index,
            peer_id,
            peer_addrs,
            xor_distance: peer_xor_distance(&peer_id, address),
        })
        .collect()
}

fn sort_chunk_peer_get_results(results: &mut [ChunkPeerGetResult]) {
    results.sort_by_key(|result| result.xor_distance);
}

fn diagnostic_peer_get_concurrency(peer_count: usize, close_group_size: usize) -> usize {
    peer_count.min(close_group_size.max(1))
}

fn diagnostic_peer_get_overall_timeout(
    per_peer_timeout: Duration,
    target_count: usize,
    concurrency_limit: usize,
) -> Duration {
    let concurrency_limit = concurrency_limit.max(1);
    let peer_get_waves = target_count.div_ceil(concurrency_limit);
    let timeout_waves = peer_get_waves.saturating_add(DIAGNOSTIC_TIMEOUT_PADDING_WAVES);
    let timeout_waves = u32::try_from(timeout_waves).unwrap_or(u32::MAX);

    per_peer_timeout.saturating_mul(timeout_waves)
}

fn timed_out_chunk_peer_get_result(
    target: &ChunkPeerGetTarget,
    address: &XorName,
    timeout: Duration,
) -> ChunkPeerGetResult {
    let addr_hex = hex::encode(address);
    let timeout_secs = timeout.as_secs();
    ChunkPeerGetResult {
        peer_id: target.peer_id,
        peer_addrs: target.peer_addrs.clone(),
        xor_distance: target.xor_distance,
        chunk_result: Err(Error::Timeout(format!(
            "Diagnostic chunk GET sweep timed out before peer {} completed for chunk {addr_hex} after {timeout_secs}s",
            target.peer_id
        ))),
    }
}

fn store_response_timeout_for_proof(proof: &[u8], merkle_timeout_secs: u64) -> Duration {
    match detect_proof_type(proof) {
        Some(ProofType::Merkle) => Duration::from_secs(merkle_timeout_secs),
        _ => STORE_RESPONSE_TIMEOUT,
    }
}

impl Client {
    /// Run `chunk_get` and feed one byte-aware observation per call to
    /// the adaptive fetch limiter. Use this from any consumer that
    /// drives chunk-fetch concurrency from `controller().fetch.current()`
    /// — the controller's window relies on every call along the hot
    /// path producing an observation.
    ///
    /// Classifier semantics: see `chunk_get_outcome`. Most importantly,
    /// `Ok(None)` is treated as `Outcome::Timeout`, not Success, so a
    /// sustained run of close-group exhaustions correctly drives the
    /// cap down rather than silently inflating it.
    pub(crate) async fn chunk_get_observed(&self, address: &XorName) -> Result<Option<DataChunk>> {
        self.chunk_get_observed_from_closest_peers(address, self.config().close_group_size)
            .await
    }

    pub(crate) async fn chunk_get_observed_from_closest_peers(
        &self,
        address: &XorName,
        peer_count: usize,
    ) -> Result<Option<DataChunk>> {
        let started = Instant::now();
        let result = self.chunk_get_from_closest_peers(address, peer_count).await;
        let latency = started.elapsed();
        let bytes = result
            .as_ref()
            .ok()
            .and_then(Option::as_ref)
            .map_or(0, |chunk| chunk.content.len() as u64);
        self.controller()
            .fetch
            .observe_with_bytes(chunk_get_outcome(&result), latency, bytes);
        result
    }
}

/// Map a `chunk_get` outcome to an adaptive controller `Outcome`.
///
/// This is the result-aware classifier used by the file-download paths.
/// It differs from `classify_error` in one critical way: an `Ok(None)`
/// from `chunk_get` is `Outcome::Timeout`, not `Outcome::Success`. By
/// the time `chunk_get` returns `Ok(None)` it has already exhausted
/// the close group across its first attempt + retry sweep, so
/// `Ok(None)` is the controller's load-shedding signal — a sustained
/// run of them on a saturated home link is exactly the case where the
/// cap should shrink.
///
/// Healthy returns (`Ok(Some(_))`) are Success regardless of how many
/// internal peer attempts the chunk_get had to make. The controller
/// does not need to see internal peer noise; that's noise about the
/// production network's natural peer-side variability, not about the
/// client's effective capacity.
pub(crate) fn chunk_get_outcome(result: &Result<Option<DataChunk>>) -> Outcome {
    match result {
        Ok(Some(_)) => Outcome::Success,
        Ok(None) => Outcome::Timeout,
        Err(Error::Timeout(_)) => Outcome::Timeout,
        Err(Error::Network(_)) => Outcome::NetworkError,
        Err(_) => Outcome::ApplicationError,
    }
}

impl Client {
    /// Store a chunk on the Autonomi network with payment.
    ///
    /// Checks if the chunk already exists before paying. If it does,
    /// returns the address immediately without incurring on-chain costs.
    /// Otherwise collects quotes, pays on-chain, then stores with proof
    /// to `CLOSE_GROUP_MAJORITY` peers.
    ///
    /// # Errors
    ///
    /// Returns an error if payment or the network operation fails.
    pub async fn chunk_put(&self, content: Bytes) -> Result<XorName> {
        let address = compute_address(&content);
        let data_size = u64::try_from(content.len())
            .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;

        match self
            .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
            .await
        {
            Ok((proof, peers)) => self.chunk_put_to_close_group(content, proof, &peers).await,
            Err(Error::AlreadyStored) => {
                debug!(
                    "Chunk {} already stored on network, skipping payment",
                    hex::encode(address)
                );
                Ok(address)
            }
            Err(e) => Err(e),
        }
    }

    /// Store a chunk to `CLOSE_GROUP_MAJORITY` peers from the quoted set.
    ///
    /// Initially sends the PUT concurrently to the first
    /// `CLOSE_GROUP_MAJORITY` peers. If any fail, falls back to the
    /// remaining peers in the quoted set until majority is reached or
    /// all peers are exhausted.
    ///
    /// # Errors
    ///
    /// Returns an error if fewer than `CLOSE_GROUP_MAJORITY` peers accept
    /// the chunk.
    pub(crate) async fn chunk_put_to_close_group(
        &self,
        content: Bytes,
        proof: Vec<u8>,
        peers: &[(PeerId, Vec<MultiAddr>)],
    ) -> Result<XorName> {
        let address = compute_address(&content);

        let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY);
        let (initial_peers, fallback_peers) = peers.split_at(initial_count);

        let mut put_futures = FuturesUnordered::new();
        for (peer_id, addrs) in initial_peers {
            put_futures.push(self.spawn_chunk_put(content.clone(), proof.clone(), peer_id, addrs));
        }

        let mut success_count = 0usize;
        let mut failures: Vec<String> = Vec::new();
        // Distinguish the *cause* of a quorum shortfall so it feeds the
        // store AIMD limiter correctly (V2-468). If every failure was a
        // structured remote application rejection (`Error::RemotePut` — the
        // node responded and declined: pool-rejected / quote-stale /
        // disk-full), the shortfall is not evidence the client is sending
        // too fast and must not push the limiter down. Anything else
        // (transport failure, or a different error) keeps it a real
        // capacity signal. Hold the first remote rejection as the
        // representative reason to surface when the shortfall is app-only.
        let mut had_non_rejection_failure = false;
        let mut first_remote_rejection: Option<Error> = None;
        let mut fallback_iter = fallback_peers.iter();

        while let Some((peer_id, result)) = put_futures.next().await {
            match result {
                Ok(_) => {
                    success_count += 1;
                    if success_count >= CLOSE_GROUP_MAJORITY {
                        debug!(
                            "Chunk {} stored on {success_count} peers (majority reached)",
                            hex::encode(address)
                        );
                        return Ok(address);
                    }
                }
                Err(e) => {
                    warn!("Failed to store chunk on {peer_id}: {e}");
                    failures.push(format!("{peer_id}: {e}"));
                    if matches!(e, Error::RemotePut { .. }) {
                        if first_remote_rejection.is_none() {
                            first_remote_rejection = Some(e);
                        }
                    } else {
                        had_non_rejection_failure = true;
                    }

                    if let Some((fb_peer, fb_addrs)) = fallback_iter.next() {
                        debug!(
                            "Falling back to peer {fb_peer} for chunk {}",
                            hex::encode(address)
                        );
                        put_futures.push(self.spawn_chunk_put(
                            content.clone(),
                            proof.clone(),
                            fb_peer,
                            fb_addrs,
                        ));
                    }
                }
            }
        }

        // Quorum not reached. If the only failures were structured remote
        // rejections, surface a representative `RemotePut` (classifies
        // `ApplicationError`, still recoverable in the merkle retry path)
        // so the shortfall doesn't suppress the store limiter. Otherwise
        // it's a real capacity shortfall.
        if !had_non_rejection_failure {
            if let Some(remote_rejection) = first_remote_rejection {
                return Err(remote_rejection);
            }
        }

        Err(Error::InsufficientPeers(format!(
            "Stored on {success_count} peers, need {CLOSE_GROUP_MAJORITY}. Failures: [{}]",
            failures.join("; ")
        )))
    }

    /// Spawn a chunk PUT future for a single peer.
    fn spawn_chunk_put<'a>(
        &'a self,
        content: Bytes,
        proof: Vec<u8>,
        peer_id: &'a PeerId,
        addrs: &'a [MultiAddr],
    ) -> impl Future<Output = (PeerId, Result<XorName>)> + 'a {
        let peer_id_owned = *peer_id;
        async move {
            let result = self
                .chunk_put_with_proof(content, proof, &peer_id_owned, addrs)
                .await;
            (peer_id_owned, result)
        }
    }

    /// Store a chunk on the Autonomi network with a pre-built payment proof.
    ///
    /// Sends to a single peer. Callers that need replication across the
    /// close group should use `chunk_put_to_close_group` instead.
    ///
    /// # Errors
    ///
    /// Returns an error if the network operation fails.
    pub async fn chunk_put_with_proof(
        &self,
        content: Bytes,
        proof: Vec<u8>,
        target_peer: &PeerId,
        peer_addrs: &[MultiAddr],
    ) -> Result<XorName> {
        let address = compute_address(&content);
        let node = self.network().node();
        let timeout =
            store_response_timeout_for_proof(&proof, self.config().merkle_store_timeout_secs);
        let timeout_secs = timeout.as_secs();

        let request_id = self.next_request_id();
        // `content` is a refcounted `Bytes` shared with the sibling
        // close-group sends; pass it through directly so each peer shares
        // the same backing buffer instead of deep-copying the 4 MB payload.
        let request = ChunkPutRequest::with_payment(address, content, proof);
        let message = ChunkMessage {
            request_id,
            body: ChunkMessageBody::PutRequest(request),
        };
        let message_bytes = message
            .encode()
            .map_err(|e| Error::Protocol(format!("Failed to encode PUT request: {e}")))?;

        let addr_hex = hex::encode(address);

        let result = send_and_await_chunk_response(
            node,
            target_peer,
            message_bytes,
            request_id,
            timeout,
            peer_addrs,
            |body| match body {
                ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
                    debug!("Chunk stored at {}", hex::encode(addr));
                    Some(Ok(addr))
                }
                ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
                    address: addr,
                }) => {
                    debug!("Chunk already exists at {}", hex::encode(addr));
                    Some(Ok(addr))
                }
                ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
                    Some(Err(Error::Payment(format!("Payment required: {message}"))))
                }
                ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => {
                    // Preserve the structured remote reason instead of
                    // flattening it into `Error::Protocol`. The node
                    // responded, so the transport round-trip succeeded —
                    // this is an application-level rejection and must not
                    // suppress the store AIMD limiter (V2-468).
                    Some(Err(Error::RemotePut {
                        address: addr_hex.clone(),
                        source: e,
                    }))
                }
                _ => None,
            },
            |e| Error::Network(format!("Failed to send PUT to peer: {e}")),
            || {
                Error::Timeout(format!(
                    "Timeout waiting for store response after {timeout_secs}s"
                ))
            },
        )
        .await;

        result
    }

    /// Retrieve a chunk from the Autonomi network.
    ///
    /// Queries all peers in the close group for the chunk address,
    /// returning the first successful response. This handles the case
    /// where the storing peer differs from the first peer returned by
    /// DHT routing.
    ///
    /// ## Adaptive controller feedback
    ///
    /// Each per-peer GET attempt is fed individually to the adaptive
    /// fetch limiter via `controller().fetch.observe(...)`. This is
    /// deliberately finer-grained than wrapping the outer `chunk_get`
    /// with `observe_op`: when a chunk takes 6 peer tries to land,
    /// 5 of them are real capacity signals (timeouts / network errors)
    /// that should pull the cap down even if the chunk eventually
    /// succeeds. The outer `Ok(_)` would mask all five as a single
    /// `Outcome::Success`. See `adaptive::Outcome` for the per-attempt
    /// classification rules used below.
    ///
    /// Callers should therefore NOT wrap `chunk_get` in `observe_op`.
    ///
    /// # Errors
    ///
    /// Returns an error if the network operation fails.
    pub async fn chunk_get(&self, address: &XorName) -> Result<Option<DataChunk>> {
        self.chunk_get_from_closest_peers(address, self.config().close_group_size)
            .await
    }

    /// Retrieve a chunk from the requested number of closest peers.
    ///
    /// Queries peers in XOR-distance order for the chunk address,
    /// returning the first successful response. This handles the case
    /// where the storing peer differs from the first peer returned by
    /// DHT routing.
    ///
    /// # Errors
    ///
    /// Returns an error if the network operation fails.
    pub async fn chunk_get_from_closest_peers(
        &self,
        address: &XorName,
        peer_count: usize,
    ) -> Result<Option<DataChunk>> {
        // Check cache first, with integrity verification.
        if let Some(cached) = self.chunk_cache().get(address) {
            let computed = compute_address(&cached);
            if computed == *address {
                debug!("Cache hit for chunk {}", hex::encode(address));
                return Ok(Some(DataChunk::new(*address, cached)));
            }
            // Cache entry corrupted — evict and fall through to network fetch.
            debug!(
                "Cache corruption detected for {}: evicting",
                hex::encode(address)
            );
            self.chunk_cache().remove(address);
        }

        let addr_hex = hex::encode(address);

        // First attempt against the current close-group view. A
        // lookup/transport error here (e.g. close_group_peers' DHT walk
        // momentarily returning an error, or InsufficientPeers from a
        // thin routing table) is NOT fatal: fall through to the retry
        // path exactly as a non-authoritative miss would. Otherwise one
        // transient error on the *initial* close-group walk for a single
        // chunk would fail an entire multi-hundred-chunk download. A
        // zeroed outcome (queried=0) is never authoritative, so it flows
        // straight to the retry below.
        let first = match self.chunk_get_try_closest_peers(address, peer_count).await {
            Ok(outcome) => outcome,
            Err(e) => {
                info!("chunk_get first close-group lookup failed for {addr_hex}: {e}; will retry");
                CloseGroupOutcome {
                    chunk: None,
                    queried: 0,
                    not_found: 0,
                    timeout: 0,
                    network_err: 0,
                    protocol_err: 0,
                }
            }
        };
        if let Some(chunk) = first.chunk {
            self.chunk_cache().put(chunk.address, chunk.content.clone());
            return Ok(Some(chunk));
        }

        // Only treat as authoritative absence when *every* queried peer
        // responded NotFound. Anything less leaves the actual storer
        // possibly in the timeout / network-error bucket, which a retry
        // could reach.
        if is_authoritative_not_found(first.not_found, first.queried) {
            info!(
                "chunk_get giving up on {addr_hex} (unanimous NotFound): \
                 queried={} not_found={} timeout={} network_err={} protocol_err={}",
                first.queried,
                first.not_found,
                first.timeout,
                first.network_err,
                first.protocol_err,
            );
            return Ok(None);
        }

        // Otherwise the failure looks like reachability (most peers timed out
        // or hit transport errors). The chunk is most likely still on the
        // network but the current close-group view either (a) caught a
        // transient transport blip or (b) converged on the wrong neighbourhood
        // because the routing table is thin. One retry against a freshly
        // re-walked close group is the cheapest defence against both.
        info!(
            "chunk_get retrying {addr_hex} after reachability failure: \
             queried={} not_found={} timeout={} network_err={} protocol_err={}",
            first.queried, first.not_found, first.timeout, first.network_err, first.protocol_err,
        );

        // Brief settle so any in-flight transport state can quiesce before
        // we re-walk the DHT. Keep this small so we don't add meaningful
        // latency to the genuinely-lost case (we already paid for one full
        // close-group sweep before getting here).
        tokio::time::sleep(Duration::from_secs(1)).await;

        // If the retry's DHT lookup itself fails, treat that as "still
        // couldn't find" rather than escalating the error — matches the
        // semantics of the first attempt when peers are unreachable.
        let retry = match self.chunk_get_try_closest_peers(address, peer_count).await {
            Ok(o) => o,
            Err(e) => {
                info!(
                    "chunk_get retry close-group lookup failed for {addr_hex}: {e}; \
                     first(queried={} not_found={} timeout={} network_err={} protocol_err={})",
                    first.queried,
                    first.not_found,
                    first.timeout,
                    first.network_err,
                    first.protocol_err,
                );
                return Ok(None);
            }
        };
        if let Some(chunk) = retry.chunk {
            info!("chunk_get retry succeeded for {addr_hex}");
            self.chunk_cache().put(chunk.address, chunk.content.clone());
            return Ok(Some(chunk));
        }

        info!(
            "chunk_get exhausted close group after retry for {addr_hex}: \
             first(queried={} not_found={} timeout={} network_err={} protocol_err={}) \
             retry(queried={} not_found={} timeout={} network_err={} protocol_err={})",
            first.queried,
            first.not_found,
            first.timeout,
            first.network_err,
            first.protocol_err,
            retry.queried,
            retry.not_found,
            retry.timeout,
            retry.network_err,
            retry.protocol_err,
        );
        Ok(None)
    }

    /// One sweep of the requested closest peers: fetch the closest peers
    /// for `address` from the DHT and ask each for the chunk in turn,
    /// returning on the first success.
    async fn chunk_get_try_closest_peers(
        &self,
        address: &XorName,
        peer_count: usize,
    ) -> Result<CloseGroupOutcome> {
        let peers = self.closest_peers(address, peer_count).await?;
        let addr_hex = hex::encode(address);
        let queried = peers.len();
        let mut not_found = 0usize;
        let mut timeout = 0usize;
        let mut network_err = 0usize;
        let mut protocol_err = 0usize;

        for (peer, addrs) in &peers {
            match self.chunk_get_from_peer(address, peer, addrs).await {
                Ok(Some(chunk)) => {
                    return Ok(CloseGroupOutcome {
                        chunk: Some(chunk),
                        queried,
                        not_found,
                        timeout,
                        network_err,
                        protocol_err,
                    });
                }
                Ok(None) => {
                    not_found += 1;
                    debug!("Chunk {addr_hex} not found on peer {peer}, trying next");
                }
                Err(Error::Timeout(_)) => {
                    timeout += 1;
                    debug!("Peer {peer} timed out for chunk {addr_hex}, trying next");
                }
                Err(Error::Network(_)) => {
                    network_err += 1;
                    debug!("Peer {peer} unreachable for chunk {addr_hex}, trying next");
                }
                // A `Protocol` error here is the storer responding with
                // `ChunkGetResponse::Error(...)` — e.g. "Chunk verification
                // failed" from a peer that has a corrupted local copy.
                // That's a per-peer problem, not a per-chunk one: the
                // remaining peers might still have a clean copy, so
                // continue the sweep rather than aborting it. Counted
                // separately from network_err so the summary log still
                // distinguishes "peer corrupted" from "peer unreachable".
                Err(Error::Protocol(ref e)) => {
                    protocol_err += 1;
                    debug!(
                        "Peer {peer} returned protocol error for chunk {addr_hex} ({e}), trying next"
                    );
                }
                Err(e) => return Err(e),
            }
        }

        Ok(CloseGroupOutcome {
            chunk: None,
            queried,
            not_found,
            timeout,
            network_err,
            protocol_err,
        })
    }

    /// Retrieve a chunk from every peer in the close group.
    ///
    /// Unlike [`Client::chunk_get`], this method does not return early
    /// after the first successful response. It returns one result per
    /// close-group peer, sorted from closest XOR distance to furthest.
    ///
    /// # Errors
    ///
    /// Returns an error if the close-group lookup fails.
    pub async fn chunk_get_from_close_group(
        &self,
        address: &XorName,
    ) -> Result<Vec<ChunkPeerGetResult>> {
        self.chunk_get_from_closest_peer_group(address, self.config().close_group_size)
            .await
    }

    /// Retrieve a chunk from the requested number of closest peers.
    ///
    /// Unlike [`Client::chunk_get_from_closest_peers`], this method does
    /// not return early after the first successful response. It returns
    /// one result per queried peer, sorted from closest XOR distance to
    /// furthest.
    ///
    /// # Errors
    ///
    /// Returns an error if the DHT lookup fails.
    pub async fn chunk_get_from_closest_peer_group(
        &self,
        address: &XorName,
        peer_count: usize,
    ) -> Result<Vec<ChunkPeerGetResult>> {
        let peers = self.closest_peers(address, peer_count).await?;
        let targets = chunk_peer_get_targets(peers, address);
        let concurrency_limit =
            diagnostic_peer_get_concurrency(peer_count, self.config().close_group_size);
        let per_peer_timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
        let overall_timeout =
            diagnostic_peer_get_overall_timeout(per_peer_timeout, targets.len(), concurrency_limit);

        let mut completed = vec![false; targets.len()];
        let mut results = Vec::with_capacity(targets.len());
        let mut get_results = stream::iter(targets.iter().cloned())
            .map(|target| async move {
                let chunk_result = self
                    .chunk_get_from_peer(address, &target.peer_id, &target.peer_addrs)
                    .await;

                if let Ok(Some(chunk)) = &chunk_result {
                    self.chunk_cache().put(chunk.address, chunk.content.clone());
                }

                (
                    target.index,
                    ChunkPeerGetResult {
                        peer_id: target.peer_id,
                        peer_addrs: target.peer_addrs,
                        xor_distance: target.xor_distance,
                        chunk_result,
                    },
                )
            })
            .buffer_unordered(concurrency_limit);

        let collect_results = async {
            while let Some((index, result)) = get_results.next().await {
                completed[index] = true;
                results.push(result);
            }
        };

        if tokio::time::timeout(overall_timeout, collect_results)
            .await
            .is_err()
        {
            for target in &targets {
                if !completed[target.index] {
                    results.push(timed_out_chunk_peer_get_result(
                        target,
                        address,
                        overall_timeout,
                    ));
                }
            }
        }

        sort_chunk_peer_get_results(&mut results);
        Ok(results)
    }

    /// Fetch a chunk from a specific peer.
    async fn chunk_get_from_peer(
        &self,
        address: &XorName,
        peer: &PeerId,
        peer_addrs: &[MultiAddr],
    ) -> Result<Option<DataChunk>> {
        let node = self.network().node();
        let request_id = self.next_request_id();
        let request = ChunkGetRequest::new(*address);
        let message = ChunkMessage {
            request_id,
            body: ChunkMessageBody::GetRequest(request),
        };
        let message_bytes = message
            .encode()
            .map_err(|e| Error::Protocol(format!("Failed to encode GET request: {e}")))?;

        let timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
        let addr_hex = hex::encode(address);
        let timeout_secs = self.config().chunk_get_timeout_secs;

        let result = send_and_await_chunk_response(
            node,
            peer,
            message_bytes,
            request_id,
            timeout,
            peer_addrs,
            |body| match body {
                ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
                    address: addr,
                    content,
                }) => {
                    if addr != *address {
                        return Some(Err(Error::InvalidData(format!(
                            "Mismatched chunk address: expected {addr_hex}, got {}",
                            hex::encode(addr)
                        ))));
                    }

                    let computed = compute_address(&content);
                    if computed != addr {
                        return Some(Err(Error::InvalidData(format!(
                            "Invalid chunk content: expected hash {addr_hex}, got {}",
                            hex::encode(computed)
                        ))));
                    }

                    debug!(
                        "Retrieved chunk {} ({} bytes) from peer {peer}",
                        hex::encode(addr),
                        content.len()
                    );
                    Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
                }
                ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => Some(Ok(None)),
                ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
                    Error::Protocol(format!("Remote GET error for {addr_hex}: {e}")),
                )),
                _ => None,
            },
            |e| Error::Network(format!("Failed to send GET to peer {peer}: {e}")),
            || {
                Error::Timeout(format!(
                    "Timeout waiting for chunk {addr_hex} from {peer} after {timeout_secs}s"
                ))
            },
        )
        .await;

        result
    }

    /// Check if a chunk exists on the network.
    ///
    /// # Errors
    ///
    /// Returns an error if the network operation fails.
    pub async fn chunk_exists(&self, address: &XorName) -> Result<bool> {
        self.chunk_get(address).await.map(|opt| opt.is_some())
    }

    /// Finalize a single-chunk publish after an external signer has paid.
    ///
    /// Single-chunk analogue of [`Client::finalize_upload`]. Takes a
    /// [`PreparedChunk`] (from [`Client::prepare_chunk_payment`]) and a
    /// `quote_hash -> tx_hash` map containing receipts for every non-zero
    /// quote in the chunk's payment. Builds the `PaymentProof` and stores
    /// the chunk on `CLOSE_GROUP_MAJORITY` peers, returning its address.
    ///
    /// Wave-batch payment shape only. Single-chunk publishes don't need
    /// Merkle batching: one chunk's worth of quotes is well below the
    /// wave-batch threshold.
    ///
    /// # Errors
    ///
    /// Returns an error if the proof construction fails (e.g. missing
    /// `tx_hash` for a non-zero quote) or if fewer than
    /// `CLOSE_GROUP_MAJORITY` peers accept the chunk.
    pub async fn finalize_chunk(
        &self,
        prepared: PreparedChunk,
        tx_hash_map: &HashMap<QuoteHash, TxHash>,
    ) -> Result<XorName> {
        let mut paid = finalize_batch_payment(vec![prepared], tx_hash_map)?;
        // finalize_batch_payment returns one PaidChunk per PreparedChunk
        // input; we passed exactly one. If that invariant is ever violated
        // it's an upstream bug — fail loudly rather than silently address-0.
        let chunk = paid.pop().ok_or_else(|| {
            Error::Payment(
                "finalize_batch_payment returned no paid chunks for a single \
                 prepared chunk — internal invariant violated"
                    .into(),
            )
        })?;
        self.chunk_put_to_close_group(chunk.content, chunk.proof_bytes, &chunk.quoted_peers)
            .await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use ant_protocol::{PROOF_TAG_MERKLE, PROOF_TAG_SINGLE_NODE};

    /// Arbitrary configured Merkle store timeout used by the timeout-selection tests.
    const TEST_MERKLE_TIMEOUT_SECS: u64 = 60;
    /// Sentinel byte used to represent an unknown/unrecognized proof tag.
    const UNKNOWN_PROOF_TAG: u8 = 0xff;
    /// XorName byte width used by test peer IDs and distances.
    const TEST_XORNAME_BYTE_LEN: usize = 32;
    /// Last byte position in the test XOR distance arrays.
    const TEST_DISTANCE_TAIL_INDEX: usize = TEST_XORNAME_BYTE_LEN - 1;

    fn chunk_peer_get_result(peer_seed: u8, distance_tail: u8) -> ChunkPeerGetResult {
        let mut xor_distance = [0; TEST_XORNAME_BYTE_LEN];
        xor_distance[TEST_DISTANCE_TAIL_INDEX] = distance_tail;

        ChunkPeerGetResult {
            peer_id: PeerId::from_bytes([peer_seed; TEST_XORNAME_BYTE_LEN]),
            peer_addrs: Vec::new(),
            xor_distance,
            chunk_result: Ok(None),
        }
    }

    #[test]
    fn authoritative_not_found_requires_unanimous_well_sampled_response() {
        // Unanimous AND well-sampled: every queried peer of a full
        // close group said NotFound. The only safe stop.
        assert!(is_authoritative_not_found(7, 7));
        // Unanimous with exactly a majority-sized sample is also
        // authoritative.
        assert!(is_authoritative_not_found(
            CLOSE_GROUP_MAJORITY,
            CLOSE_GROUP_MAJORITY
        ));

        // Unanimous but UNDER-sampled: a thin DHT walk returning 1 or 3
        // peers, all NotFound, is NOT authoritative — the real replica
        // majority may sit entirely outside that narrow view. Must
        // retry (re-walk the DHT).
        assert!(!is_authoritative_not_found(1, 1));
        assert!(!is_authoritative_not_found(3, 3));
        assert!(!is_authoritative_not_found(
            CLOSE_GROUP_MAJORITY - 1,
            CLOSE_GROUP_MAJORITY - 1
        ));

        // Not unanimous: 4-of-7 / 6-of-7 NotFound leaves storers in the
        // timeout bucket. Must retry.
        assert!(!is_authoritative_not_found(4, 7));
        assert!(!is_authoritative_not_found(6, 7));

        // Pure-reachability failure — must retry.
        assert!(!is_authoritative_not_found(0, 7));

        // Defensive: a zeroed outcome (e.g. the first attempt's
        // close-group lookup errored) is never authoritative.
        assert!(!is_authoritative_not_found(0, 0));
    }

    #[test]
    fn chunk_get_outcome_classifies_each_result_kind() {
        // Success: chunk_get returned a chunk, regardless of how many
        // internal peer attempts it took.
        let chunk = DataChunk::new([0u8; 32], Bytes::from_static(b"x"));
        assert_eq!(
            chunk_get_outcome(&Ok(Some(chunk))),
            Outcome::Success,
            "found-chunk must be Success",
        );

        // Ok(None): chunk_get exhausted the close group across first
        // attempt + retry. This is the load-shedding signal — count it
        // as Timeout so a sustained run of them on a saturated link
        // shrinks the cap.
        assert_eq!(
            chunk_get_outcome(&Ok(None)),
            Outcome::Timeout,
            "Ok(None) must be Timeout — that's the controller's load-shedding signal",
        );

        // Capacity signals from explicit error variants.
        assert_eq!(
            chunk_get_outcome(&Err(Error::Timeout("t".into()))),
            Outcome::Timeout,
        );
        assert_eq!(
            chunk_get_outcome(&Err(Error::Network("n".into()))),
            Outcome::NetworkError,
        );

        // Unexpected error variant (e.g. Protocol) — propagates out of
        // chunk_get to the caller and is not a capacity signal.
        assert_eq!(
            chunk_get_outcome(&Err(Error::Protocol("p".into()))),
            Outcome::ApplicationError,
        );
    }

    #[test]
    fn single_node_proof_uses_store_response_timeout() {
        let timeout =
            store_response_timeout_for_proof(&[PROOF_TAG_SINGLE_NODE], TEST_MERKLE_TIMEOUT_SECS);

        assert_eq!(timeout, STORE_RESPONSE_TIMEOUT);
    }

    #[test]
    fn unknown_proof_uses_store_response_timeout() {
        let timeout =
            store_response_timeout_for_proof(&[UNKNOWN_PROOF_TAG], TEST_MERKLE_TIMEOUT_SECS);

        assert_eq!(timeout, STORE_RESPONSE_TIMEOUT);
    }

    #[test]
    fn merkle_proof_uses_configured_store_timeout() {
        let timeout =
            store_response_timeout_for_proof(&[PROOF_TAG_MERKLE], TEST_MERKLE_TIMEOUT_SECS);

        assert_eq!(timeout, Duration::from_secs(TEST_MERKLE_TIMEOUT_SECS));
    }

    #[test]
    fn chunk_peer_get_results_sort_by_xor_distance() {
        let mut results = vec![
            chunk_peer_get_result(3, 3),
            chunk_peer_get_result(1, 1),
            chunk_peer_get_result(2, 2),
        ];

        sort_chunk_peer_get_results(&mut results);

        let ordered_distances = results
            .iter()
            .map(|result| result.xor_distance[TEST_DISTANCE_TAIL_INDEX])
            .collect::<Vec<_>>();
        assert_eq!(ordered_distances, vec![1, 2, 3]);
    }

    #[test]
    fn diagnostic_peer_get_overall_timeout_allows_one_wave_plus_padding() {
        const PER_PEER_TIMEOUT_SECS: u64 = 10;
        const EXPECTED_WAVES_WITH_PADDING: u64 = 2;
        const TARGET_COUNT: usize = 7;
        const CONCURRENCY_LIMIT: usize = 7;

        let timeout = diagnostic_peer_get_overall_timeout(
            Duration::from_secs(PER_PEER_TIMEOUT_SECS),
            TARGET_COUNT,
            CONCURRENCY_LIMIT,
        );

        assert_eq!(
            timeout,
            Duration::from_secs(PER_PEER_TIMEOUT_SECS * EXPECTED_WAVES_WITH_PADDING)
        );
    }

    #[test]
    fn diagnostic_peer_get_overall_timeout_scales_with_peer_count() {
        const PER_PEER_TIMEOUT_SECS: u64 = 10;
        const TARGET_COUNT: usize = 20;
        const CLOSE_GROUP_SIZE: usize = 7;
        const EXPECTED_WAVES_WITH_PADDING: u64 = 4;

        let concurrency_limit = diagnostic_peer_get_concurrency(TARGET_COUNT, CLOSE_GROUP_SIZE);
        let timeout = diagnostic_peer_get_overall_timeout(
            Duration::from_secs(PER_PEER_TIMEOUT_SECS),
            TARGET_COUNT,
            concurrency_limit,
        );

        assert_eq!(
            timeout,
            Duration::from_secs(PER_PEER_TIMEOUT_SECS * EXPECTED_WAVES_WITH_PADDING)
        );
    }

    /// Regression: the default `merkle_store_timeout_secs` must be at
    /// least the storer-side `CLOSENESS_LOOKUP_TIMEOUT` (240 s) plus
    /// padding. If either side moves and this invariant breaks, the
    /// client will give up on chunks the storer is still verifying.
    /// See `DEFAULT_MERKLE_STORE_TIMEOUT_SECS` doc comment for the
    /// derivation.
    #[test]
    fn default_merkle_store_timeout_satisfies_storer_invariant() {
        use crate::data::client::ClientConfig;
        const STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS: u64 = 240;
        const MIN_PADDING_SECS: u64 = 30;
        let config = ClientConfig::default();
        assert!(
            config.merkle_store_timeout_secs
                >= STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS + MIN_PADDING_SECS,
            "merkle_store_timeout_secs ({}) must be >= storer CLOSENESS_LOOKUP_TIMEOUT ({}) + padding ({})",
            config.merkle_store_timeout_secs,
            STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS,
            MIN_PADDING_SECS,
        );
    }

    /// Regression: the non-merkle PUT path uses the hardcoded
    /// `STORE_RESPONSE_TIMEOUT` constant, not the per-config
    /// `merkle_store_timeout_secs`. If a future refactor accidentally
    /// routes non-merkle PUTs through the merkle field they'd inherit
    /// the 270 s value and silently regress non-merkle latency.
    /// `store_response_timeout_for_proof` with a non-merkle proof tag
    /// must return the const regardless of what merkle timeout is
    /// passed.
    #[test]
    fn non_merkle_put_ignores_merkle_timeout_value() {
        let absurd_merkle_timeout = 9_999;
        for tag in [PROOF_TAG_SINGLE_NODE, UNKNOWN_PROOF_TAG] {
            let timeout = store_response_timeout_for_proof(&[tag], absurd_merkle_timeout);
            assert_eq!(
                timeout, STORE_RESPONSE_TIMEOUT,
                "non-merkle proof tag {tag:#x} should ignore merkle timeout {absurd_merkle_timeout}",
            );
        }
    }
}