Skip to main content

ant_core/data/client/
mod.rs

1//! Client operations for the Autonomi network.
2//!
3//! Provides high-level APIs for storing and retrieving data
4//! on the Autonomi decentralized network.
5
6pub mod adaptive;
7pub mod batch;
8pub mod cache;
9pub(crate) mod cached_merkle;
10pub(crate) mod cached_single;
11pub mod chunk;
12pub mod data;
13pub mod file;
14pub mod merkle;
15pub mod payment;
16pub mod quote;
17
18use crate::data::client::adaptive::{AdaptiveConfig, AdaptiveController, ChannelStart, Outcome};
19use crate::data::client::cache::ChunkCache;
20use crate::data::error::{Error, Result};
21use crate::data::network::Network;
22use crate::data::peer_cache;
23use ant_protocol::evm::Wallet;
24use ant_protocol::transport::{MultiAddr, P2PNode, PeerId};
25use ant_protocol::{XorName, CLOSE_GROUP_SIZE};
26use std::path::PathBuf;
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tracing::debug;
30
31/// Width of the chunk PUT-target set (initial writes plus fallback): the
32/// closest `PUT_TARGET_WIDTH` peers to the address.
33///
34/// Mirrors the node-side `K_BUCKET_SIZE` / `PAID_QUOTE_ISSUER_CLOSENESS_WIDTH`
35/// (20): a node accepts a reused payment proof only when one of the proof's
36/// closest-`CLOSE_GROUP_SIZE` quote issuers is within its own local 20-closest,
37/// so trying peers past this width is pointless.
38pub(crate) const PUT_TARGET_WIDTH: usize = 20;
39
40/// Classify a `data::error::Error` into a controller `Outcome`.
41///
42/// Capacity signals (Timeout / NetworkError) drive the controller
43/// down; application errors do not. The mapping is conservative:
44/// anything that COULD be transport-related is treated as a network
45/// signal, because under-classifying a real network failure as
46/// "application error" makes the controller blind to genuine stress.
47///
48/// Mapping policy:
49/// - `Timeout` -> `Timeout` (per-op deadline elapsed)
50/// - `Network`, `InsufficientPeers`, `Io` -> `NetworkError` (transport
51///   layer reported failure)
52/// - `Protocol`, `Storage` -> `NetworkError` (these wrap remote errors
53///   that frequently include peer disconnects mid-stream — under
54///   network stress these are how transport failures surface)
55/// - `PartialUpload` -> `NetworkError` (literal capacity signal: some
56///   chunks could not be stored)
57/// - `AlreadyStored`, `Encryption`, `Crypto`, `Payment`,
58///   `Serialization`, `InvalidData`, `SignatureVerification`,
59///   `Config`, `InsufficientDiskSpace`, `CostEstimationInconclusive`,
60///   `Cancelled` -> `ApplicationError` (would happen on a perfectly
61///   healthy link; `Cancelled` is caller-initiated and must not be retried
62///   as a transport failure)
63/// - `RemotePut` -> `ApplicationError` (the remote node responded with a
64///   structured rejection — the transport succeeded, so the node declined
65///   at the application layer; not a local capacity signal)
66/// - `CloseGroupShortfall` -> `ApplicationError` (a quorum shortfall caused
67///   by close-group dial/relay churn with no PUT-response timeouts — remote
68///   peer churn, not local backpressure; a timeout-bearing shortfall keeps
69///   `InsufficientPeers`/`NetworkError` instead, so genuine congestion still
70///   cuts the cap — V2-554)
71pub(crate) fn classify_error(err: &Error) -> Outcome {
72    match err {
73        Error::Timeout(_) => Outcome::Timeout,
74        Error::Network(_)
75        | Error::InsufficientPeers(_)
76        | Error::Io(_)
77        | Error::Protocol(_)
78        | Error::Storage(_)
79        | Error::PartialUpload { .. } => Outcome::NetworkError,
80        Error::AlreadyStored
81        | Error::Encryption(_)
82        | Error::Crypto(_)
83        | Error::Payment(_)
84        | Error::Serialization(_)
85        | Error::InvalidData(_)
86        | Error::SignatureVerification(_)
87        | Error::Config(_)
88        | Error::InsufficientDiskSpace(_)
89        | Error::CostEstimationInconclusive(_)
90        | Error::Cancelled(_)
91        | Error::BadQuoteBinding { .. }
92        // A remote node responded with a structured rejection — the
93        // transport round-trip succeeded, so the node declined at the
94        // application layer (payment/disk/quote/pool). Not a local
95        // capacity signal; recorded but must not push the limiter down.
96        | Error::RemotePut { .. }
97        // A close-group PUT shortfall caused purely by dial/relay churn
98        // (dead/stale relayed peer addresses), with no PUT-response
99        // timeouts to signal local backpressure. Remote peer churn, not
100        // "client sending too fast" — must not push the limiter down
101        // (V2-554). A shortfall that DID time out keeps `InsufficientPeers`
102        // (`NetworkError`) so real congestion still cuts the cap.
103        | Error::CloseGroupShortfall(_) => Outcome::ApplicationError,
104    }
105}
106
107/// Compute XOR distance between a peer's ID bytes and a target address.
108///
109/// Uses the first 32 bytes of the peer ID (or fewer if shorter) XORed
110/// with the target address. The returned byte array sorts
111/// lexicographically from closest to furthest.
112pub(crate) fn peer_xor_distance(peer_id: &PeerId, target: &[u8; 32]) -> [u8; 32] {
113    let peer_bytes = peer_id.as_bytes();
114    let mut distance = [0u8; 32];
115    for (i, d) in distance.iter_mut().enumerate() {
116        let peer_byte = peer_bytes.get(i).copied().unwrap_or(0);
117        *d = peer_byte ^ target[i];
118    }
119    distance
120}
121
122/// Default timeout for lightweight network operations (quotes, DHT lookups) in seconds.
123const DEFAULT_QUOTE_TIMEOUT_SECS: u64 = 10;
124
125/// Default timeout for the per-peer chunk GET response and any other
126/// caller that explicitly reads `store_timeout_secs`, in seconds.
127///
128/// Note despite the name: this knob does **not** govern the non-merkle
129/// chunk PUT response timeout — that path uses the
130/// `STORE_RESPONSE_TIMEOUT` constant in `chunk.rs` directly. Nor does
131/// it govern the merkle batch PUT timeout — see
132/// `DEFAULT_MERKLE_STORE_TIMEOUT_SECS`.
133///
134/// 10 s matches the pre-existing `main` default and intentionally
135/// excludes residential-upload tuning, which is Mick's PR #78
136/// territory (splitting GET into its own field).
137const DEFAULT_STORE_TIMEOUT_SECS: u64 = 10;
138
139/// Default timeout for **merkle batch** chunk store operations in seconds.
140///
141/// Separate from `DEFAULT_STORE_TIMEOUT_SECS` because merkle PUTs carry
142/// an extra storer-side cost: the payment verifier runs an iterative
143/// DHT lookup (`CLOSENESS_LOOKUP_TIMEOUT` in `ant-node`, **240 s**
144/// post-PR #89) before accepting the proof.
145///
146/// This timeout MUST be >= the storer-side `CLOSENESS_LOOKUP_TIMEOUT`
147/// plus padding for the store-response round-trip and storer-local
148/// I/O. Otherwise the client gives up while the storer is still
149/// happily verifying, the storer wastes CPU/bandwidth on a chunk the
150/// client has already discarded, and the client re-targets a
151/// different close-K member — potentially double-storing the same
152/// chunk and polluting routing.
153///
154/// 270 s = 240 s (storer lookup) + 30 s padding (network RTT + LMDB
155/// put + fsync + clock skew tolerance).
156///
157/// This invariant must be re-validated if either side's timeout
158/// changes. Empirically surfaced as "every cross-region merkle chunk
159/// times out at 10 s" on a 210-node 7-region testnet run on
160/// 2026-05-12; bumping to 270 s flipped that 0/22 -> 9/9 pass rate.
161const DEFAULT_MERKLE_STORE_TIMEOUT_SECS: u64 = 270;
162
163/// Default timeout for chunk GET response operations in seconds.
164const DEFAULT_CHUNK_GET_TIMEOUT_SECS: u64 = 10;
165
166/// Default quote concurrency: high because quoting is pure network I/O
167/// (DHT lookups + small request/response messages) with no CPU-bound work.
168const DEFAULT_QUOTE_CONCURRENCY: usize = 32;
169
170/// Default store concurrency: moderate because each chunk PUT sends ~4MB
171/// to 7 close-group peers. At 8 concurrent stores, ~225MB of outbound
172/// traffic can be in flight. Users on fast connections can increase this
173/// with --store-concurrency; users on slow connections can decrease it.
174const DEFAULT_STORE_CONCURRENCY: usize = 8;
175
176/// Configuration for the Autonomi client.
177#[derive(Debug, Clone)]
178pub struct ClientConfig {
179    /// Per-op timeout for lightweight network operations (quotes,
180    /// DHT lookups), in seconds. The adaptive controller does NOT
181    /// currently size timeouts; this remains a static knob.
182    pub quote_timeout_secs: u64,
183    /// Per-op timeout, in seconds, for the chunk GET response path
184    /// (`chunk_get_from_peer`) and any other caller that reads this
185    /// field directly.
186    ///
187    /// Note despite the historical name `store_timeout_secs`: this
188    /// knob does **not** govern the non-merkle chunk PUT response
189    /// timeout (that path uses the `STORE_RESPONSE_TIMEOUT` constant
190    /// in `chunk.rs`) and does **not** govern the merkle batch PUT
191    /// timeout (see `merkle_store_timeout_secs`). Rename pending in
192    /// Mick's PR #78 which adds a dedicated `chunk_get_timeout_secs`.
193    ///
194    /// The adaptive controller does NOT currently size timeouts;
195    /// this remains a static knob.
196    pub store_timeout_secs: u64,
197    /// Per-op timeout for **merkle batch** chunk store (PUT)
198    /// operations, in seconds. Separate from `store_timeout_secs`
199    /// because merkle PUTs incur the storer-side
200    /// `CLOSENESS_LOOKUP_TIMEOUT` (240 s post-PR #89) on top of the
201    /// usual store path; the client must wait at least that long
202    /// plus padding, or the storer wastes work on a chunk the client
203    /// has already given up on. Default 270 s.
204    pub merkle_store_timeout_secs: u64,
205    /// Per-peer response timeout for chunk GET operations, in seconds.
206    /// This is intentionally independent from `store_timeout_secs`: PUTs
207    /// and GETs have different payload direction and performance profiles.
208    pub chunk_get_timeout_secs: u64,
209    /// Number of closest peers to consider for routing.
210    pub close_group_size: usize,
211    /// **Deprecated.** Pre-adaptive ceiling for quote concurrency.
212    ///
213    /// The adaptive controller now sizes quote fan-out from observed
214    /// signals. This field, when non-zero and smaller than the
215    /// controller's per-channel default, clamps the **quote channel
216    /// only** (it does NOT bleed into store or fetch). Removed in a
217    /// future release.
218    pub quote_concurrency: usize,
219    /// **Deprecated.** Pre-adaptive ceiling for store concurrency.
220    ///
221    /// The adaptive controller now sizes store fan-out from observed
222    /// signals. This field, when non-zero and smaller than the
223    /// controller's per-channel default, clamps the **store channel
224    /// only** (it does NOT bleed into quote or fetch). Removed in a
225    /// future release.
226    pub store_concurrency: usize,
227    /// Adaptive controller configuration. Defaults are tuned to match
228    /// or exceed the prior static behavior — disabling adaptation
229    /// (`adaptive.enabled = false`) reverts to the controller's
230    /// `initial` values without re-evaluation.
231    pub adaptive: AdaptiveConfig,
232    /// Allow loopback (`127.0.0.1`) connections in the saorsa-transport
233    /// layer. Set to `true` only for devnet / local testing. Production
234    /// peers on the public Autonomi network reject the QUIC handshake
235    /// variant produced when this is `true`, so the default is `false`.
236    ///
237    /// This mirrors the `--allow-loopback` flag in `ant-cli`, which already
238    /// defaults to `false` and threads through to the same
239    /// `CoreNodeConfig::builder().local(...)` call.
240    pub allow_loopback: bool,
241    /// Bind a dual-stack IPv6 socket (`true`) or an IPv4-only socket
242    /// (`false`). Defaults to `true`, matching the CLI default.
243    ///
244    /// Set to `false` only when running on hosts without a working IPv6
245    /// stack, to avoid advertising unreachable v6 addresses to the DHT
246    /// (which causes slow connects and junk DHT address records). This
247    /// mirrors the `--ipv4-only` flag in `ant-cli`.
248    pub ipv6: bool,
249}
250
251impl Default for ClientConfig {
252    fn default() -> Self {
253        Self {
254            quote_timeout_secs: DEFAULT_QUOTE_TIMEOUT_SECS,
255            store_timeout_secs: DEFAULT_STORE_TIMEOUT_SECS,
256            merkle_store_timeout_secs: DEFAULT_MERKLE_STORE_TIMEOUT_SECS,
257            chunk_get_timeout_secs: DEFAULT_CHUNK_GET_TIMEOUT_SECS,
258            close_group_size: CLOSE_GROUP_SIZE,
259            quote_concurrency: DEFAULT_QUOTE_CONCURRENCY,
260            store_concurrency: DEFAULT_STORE_CONCURRENCY,
261            adaptive: AdaptiveConfig::default(),
262            allow_loopback: false,
263            ipv6: true,
264        }
265    }
266}
267
268/// Build the adaptive controller for a `Client`. Loads any persisted
269/// snapshot, clamps cold-start values into the deprecated-flag bounds
270/// **per channel** (so a pin on `--store-concurrency` does NOT bleed
271/// into the fetch / quote channels), and returns the persistence path
272/// so callers can save back at shutdown.
273fn build_controller(config: &ClientConfig) -> (AdaptiveController, Option<PathBuf>) {
274    let mut adaptive_cfg = config.adaptive.clone();
275
276    // Per-channel ceilings: each legacy field is interpreted as a cap
277    // for ONLY its matching channel. The fetch channel has no
278    // pre-existing legacy field; it always uses the controller's
279    // default ceiling.
280    //
281    // The legacy fields are non-zero by ClientConfig::default(), but
282    // we honor them as bounds only when they would actually CONSTRAIN
283    // the controller — i.e. when smaller than the per-channel default
284    // max. A default ClientConfig must not silently lower the
285    // controller's ceilings.
286    // A value equal to the historic legacy default is treated as
287    // "not pinned by the user" — without this, every default
288    // ClientConfig would silently lower the controller's per-channel
289    // ceilings to the prior static values (32/8) and the controller
290    // could never grow above them.
291    let user_quote_max = config.quote_concurrency;
292    let user_store_max = config.store_concurrency;
293    let quote_pinned = user_quote_max > 0 && user_quote_max != DEFAULT_QUOTE_CONCURRENCY;
294    let store_pinned = user_store_max > 0 && user_store_max != DEFAULT_STORE_CONCURRENCY;
295    if quote_pinned && user_quote_max < adaptive_cfg.max.quote {
296        adaptive_cfg.max.quote = user_quote_max;
297    }
298    if store_pinned && user_store_max < adaptive_cfg.max.store {
299        adaptive_cfg.max.store = user_store_max;
300    }
301
302    // Cold-start values: matched to the prior static defaults. If the
303    // legacy field caps the channel below the cold-start, lower the
304    // start to match — never start above the channel's max.
305    let mut start = ChannelStart::default();
306    start.quote = start.quote.min(adaptive_cfg.max.quote);
307    start.store = start.store.min(adaptive_cfg.max.store);
308    start.fetch = start.fetch.min(adaptive_cfg.max.fetch);
309
310    let adaptive_enabled = adaptive_cfg.enabled;
311    let controller = AdaptiveController::new(start, adaptive_cfg);
312    // Skip disk warm-start entirely when adaptation is disabled —
313    // fixed-concurrency mode means the user wants exactly the cold
314    // start, no surprises from prior runs. (warm_start is also a
315    // no-op when disabled, but skipping the load avoids file I/O
316    // and the path-resolution side effects.)
317    let persist_path = if adaptive_enabled {
318        let p = adaptive::default_persist_path();
319        if let Some(ref path) = p {
320            if let Some(snap) = adaptive::load_snapshot(path) {
321                debug!(path = %path.display(), "adaptive: warm-start from disk");
322                controller.warm_start(snap);
323            }
324        }
325        p
326    } else {
327        // Even with adaptation off, persist_path is computed so
328        // explicit save_adaptive_snapshot() calls still work — but
329        // the controller currently never moves, so saving the cold
330        // start is harmless.
331        adaptive::default_persist_path()
332    };
333
334    // File downloads choose a stream-decrypt batch size per download
335    // from the current fetch cap and usable RAM, then pass it into
336    // self_encryption's runtime batch-size API. The adaptive controller
337    // still drives fan-out inside each batch by re-reading
338    // `controller.fetch.current()` in the decrypt callback.
339
340    (controller, persist_path)
341}
342
343/// Client for the Autonomi decentralized network.
344///
345/// Provides high-level APIs for storing and retrieving chunks
346/// and files on the network.
347pub struct Client {
348    config: ClientConfig,
349    network: Network,
350    wallet: Option<Arc<Wallet>>,
351    evm_network: Option<ant_protocol::evm::Network>,
352    chunk_cache: ChunkCache,
353    next_request_id: AtomicU64,
354    /// Adaptive concurrency controller: replaces the static
355    /// quote/store concurrency knobs. See `adaptive` module.
356    controller: AdaptiveController,
357    /// Path the controller persists its snapshot to. `None` disables
358    /// persistence (useful for tests / non-disk environments).
359    persist_path: Option<PathBuf>,
360    /// Path for the persistent client peer cache. `None` disables the cache.
361    peer_cache_path: Option<PathBuf>,
362}
363
364impl Client {
365    /// Create a client connected to the given P2P node.
366    #[must_use]
367    pub fn from_node(node: Arc<P2PNode>, config: ClientConfig) -> Self {
368        Self::from_node_with_peer_cache(node, config, None)
369    }
370
371    /// Create a client connected to the given P2P node and attach an optional
372    /// persistent peer cache path.
373    #[must_use]
374    pub fn from_node_with_peer_cache(
375        node: Arc<P2PNode>,
376        config: ClientConfig,
377        peer_cache_path: Option<PathBuf>,
378    ) -> Self {
379        let network = Network::from_node(node);
380        let (controller, persist_path) = build_controller(&config);
381        Self {
382            config,
383            network,
384            wallet: None,
385            evm_network: None,
386            chunk_cache: ChunkCache::default(),
387            next_request_id: AtomicU64::new(1),
388            controller,
389            persist_path,
390            peer_cache_path,
391        }
392    }
393
394    /// Create a client connected to bootstrap peers.
395    ///
396    /// Threads `config.allow_loopback` and `config.ipv6` through to
397    /// `Network::new`, which controls the saorsa-transport `local` and
398    /// `ipv6` flags on the underlying `CoreNodeConfig`. See
399    /// `ClientConfig::allow_loopback` and `ClientConfig::ipv6` for details.
400    ///
401    /// # Errors
402    ///
403    /// Returns an error if the P2P node cannot be created or bootstrapping fails.
404    pub async fn connect(
405        bootstrap_peers: &[std::net::SocketAddr],
406        config: ClientConfig,
407    ) -> Result<Self> {
408        debug!(
409            "Connecting to Autonomi network with {} bootstrap peers (allow_loopback={}, ipv6={})",
410            bootstrap_peers.len(),
411            config.allow_loopback,
412            config.ipv6,
413        );
414        let network = Network::new(bootstrap_peers, config.allow_loopback, config.ipv6).await?;
415        let (controller, persist_path) = build_controller(&config);
416        Ok(Self {
417            config,
418            network,
419            wallet: None,
420            evm_network: None,
421            chunk_cache: ChunkCache::default(),
422            next_request_id: AtomicU64::new(1),
423            controller,
424            persist_path,
425            peer_cache_path: None,
426        })
427    }
428
429    /// Set the wallet for payment operations.
430    ///
431    /// Also populates the EVM network from the wallet so that
432    /// token approvals work without a separate `with_evm_network` call.
433    #[must_use]
434    pub fn with_wallet(mut self, wallet: Wallet) -> Self {
435        self.evm_network = Some(wallet.network().clone());
436        self.wallet = Some(Arc::new(wallet));
437        self
438    }
439
440    /// Set the EVM network without requiring a wallet.
441    ///
442    /// This enables token approval and contract interactions
443    /// for external-signer flows where the private key lives outside Rust.
444    #[must_use]
445    pub fn with_evm_network(mut self, network: ant_protocol::evm::Network) -> Self {
446        self.evm_network = Some(network);
447        self
448    }
449
450    /// Get the EVM network, falling back to the wallet's network if available.
451    ///
452    /// # Errors
453    ///
454    /// Returns an error if neither `with_evm_network` nor `with_wallet` was called.
455    pub(crate) fn require_evm_network(&self) -> Result<&ant_protocol::evm::Network> {
456        if let Some(ref net) = self.evm_network {
457            return Ok(net);
458        }
459        if let Some(ref wallet) = self.wallet {
460            return Ok(wallet.network());
461        }
462        Err(Error::Payment(
463            "EVM network not configured — call with_evm_network() or with_wallet() first"
464                .to_string(),
465        ))
466    }
467
468    /// Get the client configuration.
469    #[must_use]
470    pub fn config(&self) -> &ClientConfig {
471        &self.config
472    }
473
474    /// Get a mutable reference to the client configuration.
475    pub fn config_mut(&mut self) -> &mut ClientConfig {
476        &mut self.config
477    }
478
479    /// Get a reference to the network layer.
480    #[must_use]
481    pub fn network(&self) -> &Network {
482        &self.network
483    }
484
485    /// Get the wallet, if configured.
486    #[must_use]
487    pub fn wallet(&self) -> Option<&Arc<Wallet>> {
488        self.wallet.as_ref()
489    }
490
491    /// Get a reference to the chunk cache.
492    #[must_use]
493    pub fn chunk_cache(&self) -> &ChunkCache {
494        &self.chunk_cache
495    }
496
497    /// Adaptive concurrency controller. Hot loops read
498    /// `controller().<channel>.current()` to size their fan-out and
499    /// call `.observe(...)` on each completion.
500    #[must_use]
501    pub fn controller(&self) -> &AdaptiveController {
502        &self.controller
503    }
504
505    /// Persist the current adaptive snapshot to disk so the next
506    /// `Client::connect` warm-starts at the learned values instead of
507    /// cold defaults. Best effort — failures log and are discarded.
508    /// Idempotent. Safe to call from a Drop impl or an explicit
509    /// shutdown hook.
510    pub fn save_adaptive_snapshot(&self) {
511        if let Some(ref path) = self.persist_path {
512            adaptive::save_snapshot(path, self.controller.snapshot());
513        }
514    }
515
516    /// Persist currently connected peers that have Direct-tagged addresses in
517    /// the DHT. Best effort; failures are logged and do not affect the client
518    /// operation that just completed.
519    pub async fn save_peer_cache(&self) {
520        if let Some(ref path) = self.peer_cache_path {
521            let node = self.network().node();
522            peer_cache::promote_connected_direct_peers(node.as_ref(), path, node.dht().k_value())
523                .await;
524        }
525    }
526
527    /// Get the next request ID for protocol messages.
528    pub(crate) fn next_request_id(&self) -> u64 {
529        self.next_request_id.fetch_add(1, Ordering::Relaxed)
530    }
531
532    /// Return the chunk PUT-target set: the closest [`PUT_TARGET_WIDTH`] peers
533    /// to the address, each paired with its known network addresses.
534    ///
535    /// Used by the merkle store path, which — unlike single-node payment — has
536    /// no witnessed put-target list to forward, so it fetches the closest-K
537    /// neighbourhood locally.
538    pub(crate) async fn put_target_peers(
539        &self,
540        target: &XorName,
541    ) -> Result<Vec<(PeerId, Vec<MultiAddr>)>> {
542        self.closest_peers(target, PUT_TARGET_WIDTH).await
543    }
544
545    /// Return the requested number of closest peers for a target address.
546    ///
547    /// Queries the DHT for peers by XOR distance. Returns each peer
548    /// paired with its known network addresses.
549    pub(crate) async fn closest_peers(
550        &self,
551        target: &XorName,
552        count: usize,
553    ) -> Result<Vec<(PeerId, Vec<MultiAddr>)>> {
554        let peers = self.network().find_closest_peers(target, count).await?;
555
556        if peers.is_empty() {
557            return Err(Error::InsufficientPeers(
558                "DHT returned no peers for target address".to_string(),
559            ));
560        }
561        Ok(peers)
562    }
563}
564
565/// Persist the adaptive snapshot when the `Client` is dropped, so any
566/// caller — CLI, daemon, library user, integration test — gets
567/// warm-start carry-over for free without remembering to call
568/// `save_adaptive_snapshot()` explicitly. Best effort, sync `std::fs`,
569/// no panic risk on a poisoned mutex (the inner helper handles it).
570///
571/// We deliberately write SYNCHRONOUSLY (not via `spawn_blocking`)
572/// because Drop runs during process shutdown / runtime teardown,
573/// when fire-and-forget background tasks can be dropped before they
574/// complete and the snapshot is silently lost. A small synchronous
575/// stall on a tokio worker (typically <1ms for a local-disk JSON
576/// write of ~50 bytes) is the right tradeoff for guaranteed
577/// persistence — BOUNDED by `DROP_SAVE_TIMEOUT` so a stalled
578/// network-mounted data dir cannot block process shutdown.
579const DROP_SAVE_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(500);
580
581impl Drop for Client {
582    fn drop(&mut self) {
583        let Some(path) = self.persist_path.clone() else {
584            return;
585        };
586        let snap = self.controller.snapshot();
587        adaptive::save_snapshot_with_timeout(path, snap, DROP_SAVE_TIMEOUT);
588    }
589}
590
591#[cfg(test)]
592#[allow(clippy::unwrap_used)]
593mod tests {
594    use super::*;
595
596    /// Cover EVERY variant of `data::error::Error`. Build an instance of
597    /// each, classify it, and assert the resulting `Outcome` matches the
598    /// only sensible mapping. If a future commit adds a new error variant
599    /// without updating `classify_error`, this test fails to ensure the
600    /// adaptive controller always sees correct capacity signals.
601    ///
602    /// Mapping policy (mirrors `classify_error` doc):
603    /// - `Timeout` -> `Outcome::Timeout`
604    /// - `Network`, `InsufficientPeers`, `Io`, `Protocol`, `Storage`,
605    ///   `PartialUpload` -> `Outcome::NetworkError` (transport-related
606    ///   or literal capacity failure)
607    /// - everything else -> `Outcome::ApplicationError` (would happen
608    ///   on a perfectly healthy network)
609    #[test]
610    fn classify_error_covers_all_variants() {
611        let cases: Vec<(Error, Outcome)> = vec![
612            (Error::Timeout("t".to_string()), Outcome::Timeout),
613            (Error::Network("n".to_string()), Outcome::NetworkError),
614            (
615                Error::InsufficientPeers("p".to_string()),
616                Outcome::NetworkError,
617            ),
618            (Error::Storage("s".to_string()), Outcome::NetworkError),
619            (Error::Payment("p".to_string()), Outcome::ApplicationError),
620            (Error::Protocol("p".to_string()), Outcome::NetworkError),
621            (
622                Error::InvalidData("d".to_string()),
623                Outcome::ApplicationError,
624            ),
625            (
626                Error::Serialization("s".to_string()),
627                Outcome::ApplicationError,
628            ),
629            (Error::Crypto("c".to_string()), Outcome::ApplicationError),
630            (
631                Error::Io(std::io::Error::other("io")),
632                Outcome::NetworkError,
633            ),
634            (Error::Config("c".to_string()), Outcome::ApplicationError),
635            (
636                Error::SignatureVerification("s".to_string()),
637                Outcome::ApplicationError,
638            ),
639            (
640                Error::Encryption("e".to_string()),
641                Outcome::ApplicationError,
642            ),
643            (Error::AlreadyStored, Outcome::ApplicationError),
644            (
645                Error::InsufficientDiskSpace("d".to_string()),
646                Outcome::ApplicationError,
647            ),
648            (
649                Error::CostEstimationInconclusive("c".to_string()),
650                Outcome::ApplicationError,
651            ),
652            (
653                Error::PartialUpload {
654                    stored: vec![],
655                    stored_count: 0,
656                    failed: vec![],
657                    failed_count: 0,
658                    total_chunks: 0,
659                    spend: Box::new(crate::data::error::PartialUploadSpend {
660                        storage_cost_atto: "0".to_string(),
661                        gas_cost_wei: 0,
662                    }),
663                    reason: "r".to_string(),
664                },
665                Outcome::NetworkError,
666            ),
667            (
668                Error::BadQuoteBinding {
669                    peer_id: "peer".to_string(),
670                    detail: "mismatch".to_string(),
671                },
672                Outcome::ApplicationError,
673            ),
674            // A remote application rejection: the node responded with a
675            // structured `ProtocolError`, so the transport succeeded and
676            // this must NOT register as a capacity signal (V2-468).
677            (
678                Error::RemotePut {
679                    address: "abcd".to_string(),
680                    source: ant_protocol::ProtocolError::PaymentFailed("stale quote".to_string()),
681                },
682                Outcome::ApplicationError,
683            ),
684            // A close-group quorum shortfall caused by dial/relay churn with
685            // no PUT-response timeouts — remote peer churn, not local
686            // backpressure, so it must NOT register as a capacity signal
687            // (V2-554). A timeout-bearing shortfall keeps `InsufficientPeers`.
688            (
689                Error::CloseGroupShortfall("Stored on 3 peers, need 4".to_string()),
690                Outcome::ApplicationError,
691            ),
692        ];
693        for (err, expected) in &cases {
694            let got = classify_error(err);
695            assert_eq!(
696                got, *expected,
697                "classify_error({err:?}) = {got:?}, expected {expected:?}",
698            );
699        }
700    }
701
702    /// C4 fix guard: pinning the legacy `quote_concurrency` /
703    /// `store_concurrency` ClientConfig fields must clamp ONLY the
704    /// matching channel's max in the resulting controller. The fetch
705    /// (download) channel must keep its full default ceiling.
706    #[test]
707    fn legacy_concurrency_pin_does_not_bleed_across_channels() {
708        let cfg = ClientConfig {
709            quote_concurrency: 4,
710            store_concurrency: 2,
711            ..ClientConfig::default()
712        };
713        let (controller, _) = build_controller(&cfg);
714        // The store/quote caps must be clamped to the user's pin.
715        assert_eq!(controller.config.max.quote, 4, "quote pin not respected");
716        assert_eq!(controller.config.max.store, 2, "store pin not respected");
717        // The fetch cap must NOT have been lowered — that's the
718        // regression C4 was about.
719        let default_fetch_max = adaptive::ChannelMax::default().fetch;
720        assert_eq!(
721            controller.config.max.fetch, default_fetch_max,
722            "fetch cap was lowered by store/quote pin (C4 regression)"
723        );
724        // Cold-start values must respect the lowered ceilings.
725        assert!(
726            controller.quote.current() <= 4,
727            "quote start exceeds its cap"
728        );
729        assert!(
730            controller.store.current() <= 2,
731            "store start exceeds its cap"
732        );
733    }
734
735    /// Default ClientConfig must NOT silently lower the controller's
736    /// per-channel ceilings — the adaptive defaults give every channel
737    /// real headroom to grow. This guards against future commits
738    /// re-introducing a global clamp.
739    #[test]
740    fn default_client_config_does_not_clamp_controller_max() {
741        let cfg = ClientConfig::default();
742        let (controller, _) = build_controller(&cfg);
743        let defaults = adaptive::ChannelMax::default();
744        // The legacy fields default to 32/8 (the prior static knobs),
745        // both of which are <= the per-channel adaptive defaults
746        // (128/64). build_controller must keep the larger, not clobber
747        // with the legacy values.
748        assert_eq!(controller.config.max.quote, defaults.quote);
749        assert_eq!(controller.config.max.store, defaults.store);
750        assert_eq!(controller.config.max.fetch, defaults.fetch);
751        // Compile-time-ish guard: if a new variant is added to Error,
752        // this match forces an update here.
753        let _ = |e: &Error| match e {
754            Error::Timeout(_)
755            | Error::Network(_)
756            | Error::InsufficientPeers(_)
757            | Error::Storage(_)
758            | Error::Payment(_)
759            | Error::Protocol(_)
760            | Error::InvalidData(_)
761            | Error::Serialization(_)
762            | Error::Crypto(_)
763            | Error::Io(_)
764            | Error::Config(_)
765            | Error::SignatureVerification(_)
766            | Error::Encryption(_)
767            | Error::AlreadyStored
768            | Error::InsufficientDiskSpace(_)
769            | Error::CostEstimationInconclusive(_)
770            | Error::Cancelled(_)
771            | Error::PartialUpload { .. }
772            | Error::BadQuoteBinding { .. }
773            | Error::RemotePut { .. }
774            | Error::CloseGroupShortfall(_) => (),
775        };
776    }
777}