Skip to main content

ant_core/data/client/
mod.rs

1//! Client operations for the Autonomi network.
2//!
3//! Provides high-level APIs for storing and retrieving data
4//! on the Autonomi decentralized network.
5
6pub mod adaptive;
7pub mod batch;
8pub mod cache;
9pub(crate) mod cached_merkle;
10pub(crate) mod cached_single;
11pub mod chunk;
12pub mod data;
13pub mod file;
14pub mod merkle;
15pub mod payment;
16pub mod quote;
17
18use crate::data::client::adaptive::{AdaptiveConfig, AdaptiveController, ChannelStart, Outcome};
19use crate::data::client::cache::ChunkCache;
20use crate::data::error::{Error, Result};
21use crate::data::network::Network;
22use crate::data::peer_cache;
23use ant_protocol::evm::Wallet;
24use ant_protocol::transport::{MultiAddr, P2PNode, PeerId};
25use ant_protocol::{XorName, CLOSE_GROUP_SIZE};
26use std::path::PathBuf;
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tracing::debug;
30
31/// Classify a `data::error::Error` into a controller `Outcome`.
32///
33/// Capacity signals (Timeout / NetworkError) drive the controller
34/// down; application errors do not. The mapping is conservative:
35/// anything that COULD be transport-related is treated as a network
36/// signal, because under-classifying a real network failure as
37/// "application error" makes the controller blind to genuine stress.
38///
39/// Mapping policy:
40/// - `Timeout` -> `Timeout` (per-op deadline elapsed)
41/// - `Network`, `InsufficientPeers`, `Io` -> `NetworkError` (transport
42///   layer reported failure)
43/// - `Protocol`, `Storage` -> `NetworkError` (these wrap remote errors
44///   that frequently include peer disconnects mid-stream — under
45///   network stress these are how transport failures surface)
46/// - `PartialUpload` -> `NetworkError` (literal capacity signal: some
47///   chunks could not be stored)
48/// - `AlreadyStored`, `Encryption`, `Crypto`, `Payment`,
49///   `Serialization`, `InvalidData`, `SignatureVerification`,
50///   `Config`, `InsufficientDiskSpace`, `CostEstimationInconclusive`,
51///   `Cancelled` -> `ApplicationError` (would happen on a perfectly
52///   healthy link; `Cancelled` is caller-initiated and must not be retried
53///   as a transport failure)
54/// - `RemotePut` -> `ApplicationError` (the remote node responded with a
55///   structured rejection — the transport succeeded, so the node declined
56///   at the application layer; not a local capacity signal)
57pub(crate) fn classify_error(err: &Error) -> Outcome {
58    match err {
59        Error::Timeout(_) => Outcome::Timeout,
60        Error::Network(_)
61        | Error::InsufficientPeers(_)
62        | Error::Io(_)
63        | Error::Protocol(_)
64        | Error::Storage(_)
65        | Error::PartialUpload { .. } => Outcome::NetworkError,
66        Error::AlreadyStored
67        | Error::Encryption(_)
68        | Error::Crypto(_)
69        | Error::Payment(_)
70        | Error::Serialization(_)
71        | Error::InvalidData(_)
72        | Error::SignatureVerification(_)
73        | Error::Config(_)
74        | Error::InsufficientDiskSpace(_)
75        | Error::CostEstimationInconclusive(_)
76        | Error::Cancelled(_)
77        | Error::BadQuoteBinding { .. }
78        // A remote node responded with a structured rejection — the
79        // transport round-trip succeeded, so the node declined at the
80        // application layer (payment/disk/quote/pool). Not a local
81        // capacity signal; recorded but must not push the limiter down.
82        | Error::RemotePut { .. } => Outcome::ApplicationError,
83    }
84}
85
86/// Compute XOR distance between a peer's ID bytes and a target address.
87///
88/// Uses the first 32 bytes of the peer ID (or fewer if shorter) XORed
89/// with the target address. The returned byte array sorts
90/// lexicographically from closest to furthest.
91pub(crate) fn peer_xor_distance(peer_id: &PeerId, target: &[u8; 32]) -> [u8; 32] {
92    let peer_bytes = peer_id.as_bytes();
93    let mut distance = [0u8; 32];
94    for (i, d) in distance.iter_mut().enumerate() {
95        let peer_byte = peer_bytes.get(i).copied().unwrap_or(0);
96        *d = peer_byte ^ target[i];
97    }
98    distance
99}
100
101/// Default timeout for lightweight network operations (quotes, DHT lookups) in seconds.
102const DEFAULT_QUOTE_TIMEOUT_SECS: u64 = 10;
103
104/// Default timeout for the per-peer chunk GET response and any other
105/// caller that explicitly reads `store_timeout_secs`, in seconds.
106///
107/// Note despite the name: this knob does **not** govern the non-merkle
108/// chunk PUT response timeout — that path uses the
109/// `STORE_RESPONSE_TIMEOUT` constant in `chunk.rs` directly. Nor does
110/// it govern the merkle batch PUT timeout — see
111/// `DEFAULT_MERKLE_STORE_TIMEOUT_SECS`.
112///
113/// 10 s matches the pre-existing `main` default and intentionally
114/// excludes residential-upload tuning, which is Mick's PR #78
115/// territory (splitting GET into its own field).
116const DEFAULT_STORE_TIMEOUT_SECS: u64 = 10;
117
118/// Default timeout for **merkle batch** chunk store operations in seconds.
119///
120/// Separate from `DEFAULT_STORE_TIMEOUT_SECS` because merkle PUTs carry
121/// an extra storer-side cost: the payment verifier runs an iterative
122/// DHT lookup (`CLOSENESS_LOOKUP_TIMEOUT` in `ant-node`, **240 s**
123/// post-PR #89) before accepting the proof.
124///
125/// This timeout MUST be >= the storer-side `CLOSENESS_LOOKUP_TIMEOUT`
126/// plus padding for the store-response round-trip and storer-local
127/// I/O. Otherwise the client gives up while the storer is still
128/// happily verifying, the storer wastes CPU/bandwidth on a chunk the
129/// client has already discarded, and the client re-targets a
130/// different close-K member — potentially double-storing the same
131/// chunk and polluting routing.
132///
133/// 270 s = 240 s (storer lookup) + 30 s padding (network RTT + LMDB
134/// put + fsync + clock skew tolerance).
135///
136/// This invariant must be re-validated if either side's timeout
137/// changes. Empirically surfaced as "every cross-region merkle chunk
138/// times out at 10 s" on a 210-node 7-region testnet run on
139/// 2026-05-12; bumping to 270 s flipped that 0/22 -> 9/9 pass rate.
140const DEFAULT_MERKLE_STORE_TIMEOUT_SECS: u64 = 270;
141
142/// Default timeout for chunk GET response operations in seconds.
143const DEFAULT_CHUNK_GET_TIMEOUT_SECS: u64 = 10;
144
145/// Default quote concurrency: high because quoting is pure network I/O
146/// (DHT lookups + small request/response messages) with no CPU-bound work.
147const DEFAULT_QUOTE_CONCURRENCY: usize = 32;
148
149/// Default store concurrency: moderate because each chunk PUT sends ~4MB
150/// to 7 close-group peers. At 8 concurrent stores, ~225MB of outbound
151/// traffic can be in flight. Users on fast connections can increase this
152/// with --store-concurrency; users on slow connections can decrease it.
153const DEFAULT_STORE_CONCURRENCY: usize = 8;
154
155/// Configuration for the Autonomi client.
156#[derive(Debug, Clone)]
157pub struct ClientConfig {
158    /// Per-op timeout for lightweight network operations (quotes,
159    /// DHT lookups), in seconds. The adaptive controller does NOT
160    /// currently size timeouts; this remains a static knob.
161    pub quote_timeout_secs: u64,
162    /// Per-op timeout, in seconds, for the chunk GET response path
163    /// (`chunk_get_from_peer`) and any other caller that reads this
164    /// field directly.
165    ///
166    /// Note despite the historical name `store_timeout_secs`: this
167    /// knob does **not** govern the non-merkle chunk PUT response
168    /// timeout (that path uses the `STORE_RESPONSE_TIMEOUT` constant
169    /// in `chunk.rs`) and does **not** govern the merkle batch PUT
170    /// timeout (see `merkle_store_timeout_secs`). Rename pending in
171    /// Mick's PR #78 which adds a dedicated `chunk_get_timeout_secs`.
172    ///
173    /// The adaptive controller does NOT currently size timeouts;
174    /// this remains a static knob.
175    pub store_timeout_secs: u64,
176    /// Per-op timeout for **merkle batch** chunk store (PUT)
177    /// operations, in seconds. Separate from `store_timeout_secs`
178    /// because merkle PUTs incur the storer-side
179    /// `CLOSENESS_LOOKUP_TIMEOUT` (240 s post-PR #89) on top of the
180    /// usual store path; the client must wait at least that long
181    /// plus padding, or the storer wastes work on a chunk the client
182    /// has already given up on. Default 270 s.
183    pub merkle_store_timeout_secs: u64,
184    /// Per-peer response timeout for chunk GET operations, in seconds.
185    /// This is intentionally independent from `store_timeout_secs`: PUTs
186    /// and GETs have different payload direction and performance profiles.
187    pub chunk_get_timeout_secs: u64,
188    /// Number of closest peers to consider for routing.
189    pub close_group_size: usize,
190    /// **Deprecated.** Pre-adaptive ceiling for quote concurrency.
191    ///
192    /// The adaptive controller now sizes quote fan-out from observed
193    /// signals. This field, when non-zero and smaller than the
194    /// controller's per-channel default, clamps the **quote channel
195    /// only** (it does NOT bleed into store or fetch). Removed in a
196    /// future release.
197    pub quote_concurrency: usize,
198    /// **Deprecated.** Pre-adaptive ceiling for store concurrency.
199    ///
200    /// The adaptive controller now sizes store fan-out from observed
201    /// signals. This field, when non-zero and smaller than the
202    /// controller's per-channel default, clamps the **store channel
203    /// only** (it does NOT bleed into quote or fetch). Removed in a
204    /// future release.
205    pub store_concurrency: usize,
206    /// Adaptive controller configuration. Defaults are tuned to match
207    /// or exceed the prior static behavior — disabling adaptation
208    /// (`adaptive.enabled = false`) reverts to the controller's
209    /// `initial` values without re-evaluation.
210    pub adaptive: AdaptiveConfig,
211    /// Allow loopback (`127.0.0.1`) connections in the saorsa-transport
212    /// layer. Set to `true` only for devnet / local testing. Production
213    /// peers on the public Autonomi network reject the QUIC handshake
214    /// variant produced when this is `true`, so the default is `false`.
215    ///
216    /// This mirrors the `--allow-loopback` flag in `ant-cli`, which already
217    /// defaults to `false` and threads through to the same
218    /// `CoreNodeConfig::builder().local(...)` call.
219    pub allow_loopback: bool,
220    /// Bind a dual-stack IPv6 socket (`true`) or an IPv4-only socket
221    /// (`false`). Defaults to `true`, matching the CLI default.
222    ///
223    /// Set to `false` only when running on hosts without a working IPv6
224    /// stack, to avoid advertising unreachable v6 addresses to the DHT
225    /// (which causes slow connects and junk DHT address records). This
226    /// mirrors the `--ipv4-only` flag in `ant-cli`.
227    pub ipv6: bool,
228}
229
230impl Default for ClientConfig {
231    fn default() -> Self {
232        Self {
233            quote_timeout_secs: DEFAULT_QUOTE_TIMEOUT_SECS,
234            store_timeout_secs: DEFAULT_STORE_TIMEOUT_SECS,
235            merkle_store_timeout_secs: DEFAULT_MERKLE_STORE_TIMEOUT_SECS,
236            chunk_get_timeout_secs: DEFAULT_CHUNK_GET_TIMEOUT_SECS,
237            close_group_size: CLOSE_GROUP_SIZE,
238            quote_concurrency: DEFAULT_QUOTE_CONCURRENCY,
239            store_concurrency: DEFAULT_STORE_CONCURRENCY,
240            adaptive: AdaptiveConfig::default(),
241            allow_loopback: false,
242            ipv6: true,
243        }
244    }
245}
246
247/// Build the adaptive controller for a `Client`. Loads any persisted
248/// snapshot, clamps cold-start values into the deprecated-flag bounds
249/// **per channel** (so a pin on `--store-concurrency` does NOT bleed
250/// into the fetch / quote channels), and returns the persistence path
251/// so callers can save back at shutdown.
252fn build_controller(config: &ClientConfig) -> (AdaptiveController, Option<PathBuf>) {
253    let mut adaptive_cfg = config.adaptive.clone();
254
255    // Per-channel ceilings: each legacy field is interpreted as a cap
256    // for ONLY its matching channel. The fetch channel has no
257    // pre-existing legacy field; it always uses the controller's
258    // default ceiling.
259    //
260    // The legacy fields are non-zero by ClientConfig::default(), but
261    // we honor them as bounds only when they would actually CONSTRAIN
262    // the controller — i.e. when smaller than the per-channel default
263    // max. A default ClientConfig must not silently lower the
264    // controller's ceilings.
265    // A value equal to the historic legacy default is treated as
266    // "not pinned by the user" — without this, every default
267    // ClientConfig would silently lower the controller's per-channel
268    // ceilings to the prior static values (32/8) and the controller
269    // could never grow above them.
270    let user_quote_max = config.quote_concurrency;
271    let user_store_max = config.store_concurrency;
272    let quote_pinned = user_quote_max > 0 && user_quote_max != DEFAULT_QUOTE_CONCURRENCY;
273    let store_pinned = user_store_max > 0 && user_store_max != DEFAULT_STORE_CONCURRENCY;
274    if quote_pinned && user_quote_max < adaptive_cfg.max.quote {
275        adaptive_cfg.max.quote = user_quote_max;
276    }
277    if store_pinned && user_store_max < adaptive_cfg.max.store {
278        adaptive_cfg.max.store = user_store_max;
279    }
280
281    // Cold-start values: matched to the prior static defaults. If the
282    // legacy field caps the channel below the cold-start, lower the
283    // start to match — never start above the channel's max.
284    let mut start = ChannelStart::default();
285    start.quote = start.quote.min(adaptive_cfg.max.quote);
286    start.store = start.store.min(adaptive_cfg.max.store);
287    start.fetch = start.fetch.min(adaptive_cfg.max.fetch);
288
289    let adaptive_enabled = adaptive_cfg.enabled;
290    let controller = AdaptiveController::new(start, adaptive_cfg);
291    // Skip disk warm-start entirely when adaptation is disabled —
292    // fixed-concurrency mode means the user wants exactly the cold
293    // start, no surprises from prior runs. (warm_start is also a
294    // no-op when disabled, but skipping the load avoids file I/O
295    // and the path-resolution side effects.)
296    let persist_path = if adaptive_enabled {
297        let p = adaptive::default_persist_path();
298        if let Some(ref path) = p {
299            if let Some(snap) = adaptive::load_snapshot(path) {
300                debug!(path = %path.display(), "adaptive: warm-start from disk");
301                controller.warm_start(snap);
302            }
303        }
304        p
305    } else {
306        // Even with adaptation off, persist_path is computed so
307        // explicit save_adaptive_snapshot() calls still work — but
308        // the controller currently never moves, so saving the cold
309        // start is harmless.
310        adaptive::default_persist_path()
311    };
312
313    // File downloads choose a stream-decrypt batch size per download
314    // from the current fetch cap and usable RAM, then pass it into
315    // self_encryption's runtime batch-size API. The adaptive controller
316    // still drives fan-out inside each batch by re-reading
317    // `controller.fetch.current()` in the decrypt callback.
318
319    (controller, persist_path)
320}
321
322/// Client for the Autonomi decentralized network.
323///
324/// Provides high-level APIs for storing and retrieving chunks
325/// and files on the network.
326pub struct Client {
327    config: ClientConfig,
328    network: Network,
329    wallet: Option<Arc<Wallet>>,
330    evm_network: Option<ant_protocol::evm::Network>,
331    chunk_cache: ChunkCache,
332    next_request_id: AtomicU64,
333    /// Adaptive concurrency controller: replaces the static
334    /// quote/store concurrency knobs. See `adaptive` module.
335    controller: AdaptiveController,
336    /// Path the controller persists its snapshot to. `None` disables
337    /// persistence (useful for tests / non-disk environments).
338    persist_path: Option<PathBuf>,
339    /// Path for the persistent client peer cache. `None` disables the cache.
340    peer_cache_path: Option<PathBuf>,
341}
342
343impl Client {
344    /// Create a client connected to the given P2P node.
345    #[must_use]
346    pub fn from_node(node: Arc<P2PNode>, config: ClientConfig) -> Self {
347        Self::from_node_with_peer_cache(node, config, None)
348    }
349
350    /// Create a client connected to the given P2P node and attach an optional
351    /// persistent peer cache path.
352    #[must_use]
353    pub fn from_node_with_peer_cache(
354        node: Arc<P2PNode>,
355        config: ClientConfig,
356        peer_cache_path: Option<PathBuf>,
357    ) -> Self {
358        let network = Network::from_node(node);
359        let (controller, persist_path) = build_controller(&config);
360        Self {
361            config,
362            network,
363            wallet: None,
364            evm_network: None,
365            chunk_cache: ChunkCache::default(),
366            next_request_id: AtomicU64::new(1),
367            controller,
368            persist_path,
369            peer_cache_path,
370        }
371    }
372
373    /// Create a client connected to bootstrap peers.
374    ///
375    /// Threads `config.allow_loopback` and `config.ipv6` through to
376    /// `Network::new`, which controls the saorsa-transport `local` and
377    /// `ipv6` flags on the underlying `CoreNodeConfig`. See
378    /// `ClientConfig::allow_loopback` and `ClientConfig::ipv6` for details.
379    ///
380    /// # Errors
381    ///
382    /// Returns an error if the P2P node cannot be created or bootstrapping fails.
383    pub async fn connect(
384        bootstrap_peers: &[std::net::SocketAddr],
385        config: ClientConfig,
386    ) -> Result<Self> {
387        debug!(
388            "Connecting to Autonomi network with {} bootstrap peers (allow_loopback={}, ipv6={})",
389            bootstrap_peers.len(),
390            config.allow_loopback,
391            config.ipv6,
392        );
393        let network = Network::new(bootstrap_peers, config.allow_loopback, config.ipv6).await?;
394        let (controller, persist_path) = build_controller(&config);
395        Ok(Self {
396            config,
397            network,
398            wallet: None,
399            evm_network: None,
400            chunk_cache: ChunkCache::default(),
401            next_request_id: AtomicU64::new(1),
402            controller,
403            persist_path,
404            peer_cache_path: None,
405        })
406    }
407
408    /// Set the wallet for payment operations.
409    ///
410    /// Also populates the EVM network from the wallet so that
411    /// token approvals work without a separate `with_evm_network` call.
412    #[must_use]
413    pub fn with_wallet(mut self, wallet: Wallet) -> Self {
414        self.evm_network = Some(wallet.network().clone());
415        self.wallet = Some(Arc::new(wallet));
416        self
417    }
418
419    /// Set the EVM network without requiring a wallet.
420    ///
421    /// This enables token approval and contract interactions
422    /// for external-signer flows where the private key lives outside Rust.
423    #[must_use]
424    pub fn with_evm_network(mut self, network: ant_protocol::evm::Network) -> Self {
425        self.evm_network = Some(network);
426        self
427    }
428
429    /// Get the EVM network, falling back to the wallet's network if available.
430    ///
431    /// # Errors
432    ///
433    /// Returns an error if neither `with_evm_network` nor `with_wallet` was called.
434    pub(crate) fn require_evm_network(&self) -> Result<&ant_protocol::evm::Network> {
435        if let Some(ref net) = self.evm_network {
436            return Ok(net);
437        }
438        if let Some(ref wallet) = self.wallet {
439            return Ok(wallet.network());
440        }
441        Err(Error::Payment(
442            "EVM network not configured — call with_evm_network() or with_wallet() first"
443                .to_string(),
444        ))
445    }
446
447    /// Get the client configuration.
448    #[must_use]
449    pub fn config(&self) -> &ClientConfig {
450        &self.config
451    }
452
453    /// Get a mutable reference to the client configuration.
454    pub fn config_mut(&mut self) -> &mut ClientConfig {
455        &mut self.config
456    }
457
458    /// Get a reference to the network layer.
459    #[must_use]
460    pub fn network(&self) -> &Network {
461        &self.network
462    }
463
464    /// Get the wallet, if configured.
465    #[must_use]
466    pub fn wallet(&self) -> Option<&Arc<Wallet>> {
467        self.wallet.as_ref()
468    }
469
470    /// Get a reference to the chunk cache.
471    #[must_use]
472    pub fn chunk_cache(&self) -> &ChunkCache {
473        &self.chunk_cache
474    }
475
476    /// Adaptive concurrency controller. Hot loops read
477    /// `controller().<channel>.current()` to size their fan-out and
478    /// call `.observe(...)` on each completion.
479    #[must_use]
480    pub fn controller(&self) -> &AdaptiveController {
481        &self.controller
482    }
483
484    /// Persist the current adaptive snapshot to disk so the next
485    /// `Client::connect` warm-starts at the learned values instead of
486    /// cold defaults. Best effort — failures log and are discarded.
487    /// Idempotent. Safe to call from a Drop impl or an explicit
488    /// shutdown hook.
489    pub fn save_adaptive_snapshot(&self) {
490        if let Some(ref path) = self.persist_path {
491            adaptive::save_snapshot(path, self.controller.snapshot());
492        }
493    }
494
495    /// Persist currently connected peers that have Direct-tagged addresses in
496    /// the DHT. Best effort; failures are logged and do not affect the client
497    /// operation that just completed.
498    pub async fn save_peer_cache(&self) {
499        if let Some(ref path) = self.peer_cache_path {
500            let node = self.network().node();
501            peer_cache::promote_connected_direct_peers(node.as_ref(), path, node.dht().k_value())
502                .await;
503        }
504    }
505
506    /// Get the next request ID for protocol messages.
507    pub(crate) fn next_request_id(&self) -> u64 {
508        self.next_request_id.fetch_add(1, Ordering::Relaxed)
509    }
510
511    /// Return all peers in the close group for a target address.
512    ///
513    /// Queries the DHT for the closest peers by XOR distance.
514    /// Returns each peer paired with its known network addresses.
515    pub(crate) async fn close_group_peers(
516        &self,
517        target: &XorName,
518    ) -> Result<Vec<(PeerId, Vec<MultiAddr>)>> {
519        self.closest_peers(target, self.config().close_group_size)
520            .await
521    }
522
523    /// Return the requested number of closest peers for a target address.
524    ///
525    /// Queries the DHT for peers by XOR distance. Returns each peer
526    /// paired with its known network addresses.
527    pub(crate) async fn closest_peers(
528        &self,
529        target: &XorName,
530        count: usize,
531    ) -> Result<Vec<(PeerId, Vec<MultiAddr>)>> {
532        let peers = self.network().find_closest_peers(target, count).await?;
533
534        if peers.is_empty() {
535            return Err(Error::InsufficientPeers(
536                "DHT returned no peers for target address".to_string(),
537            ));
538        }
539        Ok(peers)
540    }
541}
542
543/// Persist the adaptive snapshot when the `Client` is dropped, so any
544/// caller — CLI, daemon, library user, integration test — gets
545/// warm-start carry-over for free without remembering to call
546/// `save_adaptive_snapshot()` explicitly. Best effort, sync `std::fs`,
547/// no panic risk on a poisoned mutex (the inner helper handles it).
548///
549/// We deliberately write SYNCHRONOUSLY (not via `spawn_blocking`)
550/// because Drop runs during process shutdown / runtime teardown,
551/// when fire-and-forget background tasks can be dropped before they
552/// complete and the snapshot is silently lost. A small synchronous
553/// stall on a tokio worker (typically <1ms for a local-disk JSON
554/// write of ~50 bytes) is the right tradeoff for guaranteed
555/// persistence — BOUNDED by `DROP_SAVE_TIMEOUT` so a stalled
556/// network-mounted data dir cannot block process shutdown.
557const DROP_SAVE_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(500);
558
559impl Drop for Client {
560    fn drop(&mut self) {
561        let Some(path) = self.persist_path.clone() else {
562            return;
563        };
564        let snap = self.controller.snapshot();
565        adaptive::save_snapshot_with_timeout(path, snap, DROP_SAVE_TIMEOUT);
566    }
567}
568
569#[cfg(test)]
570#[allow(clippy::unwrap_used)]
571mod tests {
572    use super::*;
573
574    /// Cover EVERY variant of `data::error::Error`. Build an instance of
575    /// each, classify it, and assert the resulting `Outcome` matches the
576    /// only sensible mapping. If a future commit adds a new error variant
577    /// without updating `classify_error`, this test fails to ensure the
578    /// adaptive controller always sees correct capacity signals.
579    ///
580    /// Mapping policy (mirrors `classify_error` doc):
581    /// - `Timeout` -> `Outcome::Timeout`
582    /// - `Network`, `InsufficientPeers`, `Io`, `Protocol`, `Storage`,
583    ///   `PartialUpload` -> `Outcome::NetworkError` (transport-related
584    ///   or literal capacity failure)
585    /// - everything else -> `Outcome::ApplicationError` (would happen
586    ///   on a perfectly healthy network)
587    #[test]
588    fn classify_error_covers_all_variants() {
589        let cases: Vec<(Error, Outcome)> = vec![
590            (Error::Timeout("t".to_string()), Outcome::Timeout),
591            (Error::Network("n".to_string()), Outcome::NetworkError),
592            (
593                Error::InsufficientPeers("p".to_string()),
594                Outcome::NetworkError,
595            ),
596            (Error::Storage("s".to_string()), Outcome::NetworkError),
597            (Error::Payment("p".to_string()), Outcome::ApplicationError),
598            (Error::Protocol("p".to_string()), Outcome::NetworkError),
599            (
600                Error::InvalidData("d".to_string()),
601                Outcome::ApplicationError,
602            ),
603            (
604                Error::Serialization("s".to_string()),
605                Outcome::ApplicationError,
606            ),
607            (Error::Crypto("c".to_string()), Outcome::ApplicationError),
608            (
609                Error::Io(std::io::Error::other("io")),
610                Outcome::NetworkError,
611            ),
612            (Error::Config("c".to_string()), Outcome::ApplicationError),
613            (
614                Error::SignatureVerification("s".to_string()),
615                Outcome::ApplicationError,
616            ),
617            (
618                Error::Encryption("e".to_string()),
619                Outcome::ApplicationError,
620            ),
621            (Error::AlreadyStored, Outcome::ApplicationError),
622            (
623                Error::InsufficientDiskSpace("d".to_string()),
624                Outcome::ApplicationError,
625            ),
626            (
627                Error::CostEstimationInconclusive("c".to_string()),
628                Outcome::ApplicationError,
629            ),
630            (
631                Error::PartialUpload {
632                    stored: vec![],
633                    stored_count: 0,
634                    failed: vec![],
635                    failed_count: 0,
636                    total_chunks: 0,
637                    spend: Box::new(crate::data::error::PartialUploadSpend {
638                        storage_cost_atto: "0".to_string(),
639                        gas_cost_wei: 0,
640                    }),
641                    reason: "r".to_string(),
642                },
643                Outcome::NetworkError,
644            ),
645            (
646                Error::BadQuoteBinding {
647                    peer_id: "peer".to_string(),
648                    detail: "mismatch".to_string(),
649                },
650                Outcome::ApplicationError,
651            ),
652            // A remote application rejection: the node responded with a
653            // structured `ProtocolError`, so the transport succeeded and
654            // this must NOT register as a capacity signal (V2-468).
655            (
656                Error::RemotePut {
657                    address: "abcd".to_string(),
658                    source: ant_protocol::ProtocolError::PaymentFailed("stale quote".to_string()),
659                },
660                Outcome::ApplicationError,
661            ),
662        ];
663        for (err, expected) in &cases {
664            let got = classify_error(err);
665            assert_eq!(
666                got, *expected,
667                "classify_error({err:?}) = {got:?}, expected {expected:?}",
668            );
669        }
670    }
671
672    /// C4 fix guard: pinning the legacy `quote_concurrency` /
673    /// `store_concurrency` ClientConfig fields must clamp ONLY the
674    /// matching channel's max in the resulting controller. The fetch
675    /// (download) channel must keep its full default ceiling.
676    #[test]
677    fn legacy_concurrency_pin_does_not_bleed_across_channels() {
678        let cfg = ClientConfig {
679            quote_concurrency: 4,
680            store_concurrency: 2,
681            ..ClientConfig::default()
682        };
683        let (controller, _) = build_controller(&cfg);
684        // The store/quote caps must be clamped to the user's pin.
685        assert_eq!(controller.config.max.quote, 4, "quote pin not respected");
686        assert_eq!(controller.config.max.store, 2, "store pin not respected");
687        // The fetch cap must NOT have been lowered — that's the
688        // regression C4 was about.
689        let default_fetch_max = adaptive::ChannelMax::default().fetch;
690        assert_eq!(
691            controller.config.max.fetch, default_fetch_max,
692            "fetch cap was lowered by store/quote pin (C4 regression)"
693        );
694        // Cold-start values must respect the lowered ceilings.
695        assert!(
696            controller.quote.current() <= 4,
697            "quote start exceeds its cap"
698        );
699        assert!(
700            controller.store.current() <= 2,
701            "store start exceeds its cap"
702        );
703    }
704
705    /// Default ClientConfig must NOT silently lower the controller's
706    /// per-channel ceilings — the adaptive defaults give every channel
707    /// real headroom to grow. This guards against future commits
708    /// re-introducing a global clamp.
709    #[test]
710    fn default_client_config_does_not_clamp_controller_max() {
711        let cfg = ClientConfig::default();
712        let (controller, _) = build_controller(&cfg);
713        let defaults = adaptive::ChannelMax::default();
714        // The legacy fields default to 32/8 (the prior static knobs),
715        // both of which are <= the per-channel adaptive defaults
716        // (128/64). build_controller must keep the larger, not clobber
717        // with the legacy values.
718        assert_eq!(controller.config.max.quote, defaults.quote);
719        assert_eq!(controller.config.max.store, defaults.store);
720        assert_eq!(controller.config.max.fetch, defaults.fetch);
721        // Compile-time-ish guard: if a new variant is added to Error,
722        // this match forces an update here.
723        let _ = |e: &Error| match e {
724            Error::Timeout(_)
725            | Error::Network(_)
726            | Error::InsufficientPeers(_)
727            | Error::Storage(_)
728            | Error::Payment(_)
729            | Error::Protocol(_)
730            | Error::InvalidData(_)
731            | Error::Serialization(_)
732            | Error::Crypto(_)
733            | Error::Io(_)
734            | Error::Config(_)
735            | Error::SignatureVerification(_)
736            | Error::Encryption(_)
737            | Error::AlreadyStored
738            | Error::InsufficientDiskSpace(_)
739            | Error::CostEstimationInconclusive(_)
740            | Error::Cancelled(_)
741            | Error::PartialUpload { .. }
742            | Error::BadQuoteBinding { .. }
743            | Error::RemotePut { .. } => (),
744        };
745    }
746}