ant_core/data/client/mod.rs
1//! Client operations for the Autonomi network.
2//!
3//! Provides high-level APIs for storing and retrieving data
4//! on the Autonomi decentralized network.
5
6pub mod adaptive;
7pub mod batch;
8pub mod cache;
9pub(crate) mod cached_merkle;
10pub(crate) mod cached_single;
11pub mod chunk;
12pub mod data;
13pub mod file;
14pub mod merkle;
15pub mod payment;
16pub mod quote;
17
18use crate::data::client::adaptive::{AdaptiveConfig, AdaptiveController, ChannelStart, Outcome};
19use crate::data::client::cache::ChunkCache;
20use crate::data::error::{Error, Result};
21use crate::data::network::Network;
22use crate::data::peer_cache;
23use ant_protocol::evm::Wallet;
24use ant_protocol::transport::{MultiAddr, P2PNode, PeerId};
25use ant_protocol::{XorName, CLOSE_GROUP_SIZE};
26use std::path::PathBuf;
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tracing::debug;
30
/// Width of the chunk PUT-target set (initial writes plus fallback): the
/// closest `PUT_TARGET_WIDTH` peers to the address.
///
/// Mirrors the node-side `K_BUCKET_SIZE` / `PAID_QUOTE_ISSUER_CLOSENESS_WIDTH`
/// (20): a node accepts a reused payment proof only when one of the proof's
/// closest-`CLOSE_GROUP_SIZE` quote issuers is within its own local 20-closest,
/// so trying peers past this width is pointless.
///
/// Within this module it is the `count` that `Client::put_target_peers`
/// hands to `Client::closest_peers`. The value is a literal here rather than
/// being derived from the node-side constants, so it has to be updated by
/// hand if those ever change.
pub(crate) const PUT_TARGET_WIDTH: usize = 20;
39
40/// Classify a `data::error::Error` into a controller `Outcome`.
41///
42/// Capacity signals (Timeout / NetworkError) drive the controller
43/// down; application errors do not. The mapping is conservative:
44/// anything that COULD be transport-related is treated as a network
45/// signal, because under-classifying a real network failure as
46/// "application error" makes the controller blind to genuine stress.
47///
48/// Mapping policy:
49/// - `Timeout` -> `Timeout` (per-op deadline elapsed)
50/// - `Network`, `InsufficientPeers`, `Io` -> `NetworkError` (transport
51/// layer reported failure)
52/// - `Protocol`, `Storage` -> `NetworkError` (these wrap remote errors
53/// that frequently include peer disconnects mid-stream — under
54/// network stress these are how transport failures surface)
55/// - `PartialUpload` -> `NetworkError` (literal capacity signal: some
56/// chunks could not be stored)
57/// - `AlreadyStored`, `Encryption`, `Crypto`, `Payment`,
58/// `Serialization`, `InvalidData`, `SignatureVerification`,
59/// `Config`, `InsufficientDiskSpace`, `CostEstimationInconclusive`,
60/// `Cancelled` -> `ApplicationError` (would happen on a perfectly
61/// healthy link; `Cancelled` is caller-initiated and must not be retried
62/// as a transport failure)
63/// - `RemotePut` -> `ApplicationError` (the remote node responded with a
64/// structured rejection — the transport succeeded, so the node declined
65/// at the application layer; not a local capacity signal)
66pub(crate) fn classify_error(err: &Error) -> Outcome {
67 match err {
68 Error::Timeout(_) => Outcome::Timeout,
69 Error::Network(_)
70 | Error::InsufficientPeers(_)
71 | Error::Io(_)
72 | Error::Protocol(_)
73 | Error::Storage(_)
74 | Error::PartialUpload { .. } => Outcome::NetworkError,
75 Error::AlreadyStored
76 | Error::Encryption(_)
77 | Error::Crypto(_)
78 | Error::Payment(_)
79 | Error::Serialization(_)
80 | Error::InvalidData(_)
81 | Error::SignatureVerification(_)
82 | Error::Config(_)
83 | Error::InsufficientDiskSpace(_)
84 | Error::CostEstimationInconclusive(_)
85 | Error::Cancelled(_)
86 | Error::BadQuoteBinding { .. }
87 // A remote node responded with a structured rejection — the
88 // transport round-trip succeeded, so the node declined at the
89 // application layer (payment/disk/quote/pool). Not a local
90 // capacity signal; recorded but must not push the limiter down.
91 | Error::RemotePut { .. } => Outcome::ApplicationError,
92 }
93}
94
95/// Compute XOR distance between a peer's ID bytes and a target address.
96///
97/// Uses the first 32 bytes of the peer ID (or fewer if shorter) XORed
98/// with the target address. The returned byte array sorts
99/// lexicographically from closest to furthest.
100pub(crate) fn peer_xor_distance(peer_id: &PeerId, target: &[u8; 32]) -> [u8; 32] {
101 let peer_bytes = peer_id.as_bytes();
102 let mut distance = [0u8; 32];
103 for (i, d) in distance.iter_mut().enumerate() {
104 let peer_byte = peer_bytes.get(i).copied().unwrap_or(0);
105 *d = peer_byte ^ target[i];
106 }
107 distance
108}
109
/// Default timeout for lightweight network operations (quotes, DHT lookups) in seconds.
///
/// Used as the default for `ClientConfig::quote_timeout_secs`.
const DEFAULT_QUOTE_TIMEOUT_SECS: u64 = 10;

/// Default timeout for the per-peer chunk GET response and any other
/// caller that explicitly reads `store_timeout_secs`, in seconds.
///
/// Used as the default for `ClientConfig::store_timeout_secs`.
///
/// Note despite the name: this knob does **not** govern the non-merkle
/// chunk PUT response timeout — that path uses the
/// `STORE_RESPONSE_TIMEOUT` constant in `chunk.rs` directly. Nor does
/// it govern the merkle batch PUT timeout — see
/// `DEFAULT_MERKLE_STORE_TIMEOUT_SECS`.
///
/// 10 s matches the pre-existing `main` default and intentionally
/// excludes residential-upload tuning, which is Mick's PR #78
/// territory (splitting GET into its own field).
///
/// NOTE(review): a dedicated `DEFAULT_CHUNK_GET_TIMEOUT_SECS` /
/// `ClientConfig::chunk_get_timeout_secs` pair already exists in this
/// file, so the "GET response" wording above and the PR #78 remark look
/// stale — confirm which call sites still read `store_timeout_secs` and
/// update this comment accordingly.
const DEFAULT_STORE_TIMEOUT_SECS: u64 = 10;

/// Default timeout for **merkle batch** chunk store operations in seconds.
///
/// Used as the default for `ClientConfig::merkle_store_timeout_secs`.
///
/// Separate from `DEFAULT_STORE_TIMEOUT_SECS` because merkle PUTs carry
/// an extra storer-side cost: the payment verifier runs an iterative
/// DHT lookup (`CLOSENESS_LOOKUP_TIMEOUT` in `ant-node`, **240 s**
/// post-PR #89) before accepting the proof.
///
/// This timeout MUST be >= the storer-side `CLOSENESS_LOOKUP_TIMEOUT`
/// plus padding for the store-response round-trip and storer-local
/// I/O. Otherwise the client gives up while the storer is still
/// verifying, the storer wastes CPU/bandwidth on a chunk the client
/// has already discarded, and the client re-targets a different
/// close-K member — potentially double-storing the same chunk and
/// polluting routing.
///
/// 270 s = 240 s (storer lookup) + 30 s padding (network RTT + LMDB
/// put + fsync + clock skew tolerance).
///
/// This invariant must be re-validated if either side's timeout
/// changes; nothing in this file enforces it. Empirically surfaced as
/// "every cross-region merkle chunk times out at 10 s" on a 210-node
/// 7-region testnet run on 2026-05-12; bumping to 270 s flipped that
/// 0/22 -> 9/9 pass rate.
const DEFAULT_MERKLE_STORE_TIMEOUT_SECS: u64 = 270;

/// Default timeout for chunk GET response operations in seconds.
///
/// Used as the default for `ClientConfig::chunk_get_timeout_secs`.
const DEFAULT_CHUNK_GET_TIMEOUT_SECS: u64 = 10;
153
/// Default quote concurrency: high because quoting is pure network I/O
/// (DHT lookups + small request/response messages) with no CPU-bound work.
///
/// This is the legacy (pre-adaptive) static default. `build_controller`
/// also uses it as a sentinel: a `ClientConfig::quote_concurrency` equal
/// to this value is treated as "not pinned by the user" and does not
/// lower the adaptive controller's quote ceiling.
const DEFAULT_QUOTE_CONCURRENCY: usize = 32;

/// Default store concurrency: moderate because each chunk PUT sends ~4MB
/// to 7 close-group peers. At 8 concurrent stores, ~225MB of outbound
/// traffic can be in flight. Users on fast connections can increase this
/// with --store-concurrency; users on slow connections can decrease it.
///
/// This is the legacy (pre-adaptive) static default. `build_controller`
/// also uses it as a sentinel: a `ClientConfig::store_concurrency` equal
/// to this value is treated as "not pinned by the user" and does not
/// lower the adaptive controller's store ceiling.
const DEFAULT_STORE_CONCURRENCY: usize = 8;
163
/// Configuration for the Autonomi client.
///
/// `ClientConfig::default()` fills every field from the `DEFAULT_*`
/// constants in this module (see the `Default` impl). The adaptive
/// controller is built from this config once, at client construction.
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Per-op timeout for lightweight network operations (quotes,
    /// DHT lookups), in seconds. The adaptive controller does NOT
    /// currently size timeouts; this remains a static knob.
    /// Default: 10.
    pub quote_timeout_secs: u64,
    /// Per-op timeout, in seconds, for the chunk GET response path
    /// (`chunk_get_from_peer`) and any other caller that reads this
    /// field directly. Default: 10.
    ///
    /// Note despite the historical name `store_timeout_secs`: this
    /// knob does **not** govern the non-merkle chunk PUT response
    /// timeout (that path uses the `STORE_RESPONSE_TIMEOUT` constant
    /// in `chunk.rs`) and does **not** govern the merkle batch PUT
    /// timeout (see `merkle_store_timeout_secs`). Rename pending in
    /// Mick's PR #78 which adds a dedicated `chunk_get_timeout_secs`.
    ///
    /// NOTE(review): `chunk_get_timeout_secs` is already a field of
    /// this struct, so the GET-path description and the "rename
    /// pending" remark above may be stale — confirm which callers
    /// still read `store_timeout_secs` before relying on this text.
    ///
    /// The adaptive controller does NOT currently size timeouts;
    /// this remains a static knob.
    pub store_timeout_secs: u64,
    /// Per-op timeout for **merkle batch** chunk store (PUT)
    /// operations, in seconds. Separate from `store_timeout_secs`
    /// because merkle PUTs incur the storer-side
    /// `CLOSENESS_LOOKUP_TIMEOUT` (240 s post-PR #89) on top of the
    /// usual store path; the client must wait at least that long
    /// plus padding, or the storer wastes work on a chunk the client
    /// has already given up on. Default 270 s.
    pub merkle_store_timeout_secs: u64,
    /// Per-peer response timeout for chunk GET operations, in seconds.
    /// This is intentionally independent from `store_timeout_secs`: PUTs
    /// and GETs have different payload direction and performance profiles.
    /// Default: 10.
    pub chunk_get_timeout_secs: u64,
    /// Number of closest peers to consider for routing.
    /// Defaults to `ant_protocol::CLOSE_GROUP_SIZE`.
    pub close_group_size: usize,
    /// **Deprecated.** Pre-adaptive ceiling for quote concurrency.
    ///
    /// The adaptive controller now sizes quote fan-out from observed
    /// signals. `build_controller` honours this field as a cap on the
    /// **quote channel only** (it does NOT bleed into store or fetch),
    /// and only when all of the following hold: the value is non-zero,
    /// it differs from the legacy default (32), and it is smaller than
    /// the configured `adaptive.max.quote`. Setting it to exactly 32 is
    /// therefore indistinguishable from leaving it at the default.
    /// Removed in a future release.
    pub quote_concurrency: usize,
    /// **Deprecated.** Pre-adaptive ceiling for store concurrency.
    ///
    /// The adaptive controller now sizes store fan-out from observed
    /// signals. `build_controller` honours this field as a cap on the
    /// **store channel only** (it does NOT bleed into quote or fetch),
    /// and only when all of the following hold: the value is non-zero,
    /// it differs from the legacy default (8), and it is smaller than
    /// the configured `adaptive.max.store`. Setting it to exactly 8 is
    /// therefore indistinguishable from leaving it at the default.
    /// Removed in a future release.
    pub store_concurrency: usize,
    /// Adaptive controller configuration. Defaults are tuned to match
    /// or exceed the prior static behavior — disabling adaptation
    /// (`adaptive.enabled = false`) reverts to the controller's
    /// `initial` values without re-evaluation.
    pub adaptive: AdaptiveConfig,
    /// Allow loopback (`127.0.0.1`) connections in the saorsa-transport
    /// layer. Set to `true` only for devnet / local testing. Production
    /// peers on the public Autonomi network reject the QUIC handshake
    /// variant produced when this is `true`, so the default is `false`.
    ///
    /// This mirrors the `--allow-loopback` flag in `ant-cli`, which already
    /// defaults to `false` and threads through to the same
    /// `CoreNodeConfig::builder().local(...)` call.
    ///
    /// Read by `Client::connect` only; it has no effect on a client built
    /// from an existing node via `Client::from_node*`.
    pub allow_loopback: bool,
    /// Bind a dual-stack IPv6 socket (`true`) or an IPv4-only socket
    /// (`false`). Defaults to `true`, matching the CLI default.
    ///
    /// Set to `false` only when running on hosts without a working IPv6
    /// stack, to avoid advertising unreachable v6 addresses to the DHT
    /// (which causes slow connects and junk DHT address records). This
    /// mirrors the `--ipv4-only` flag in `ant-cli`.
    ///
    /// Read by `Client::connect` only; it has no effect on a client built
    /// from an existing node via `Client::from_node*`.
    pub ipv6: bool,
}
238
239impl Default for ClientConfig {
240 fn default() -> Self {
241 Self {
242 quote_timeout_secs: DEFAULT_QUOTE_TIMEOUT_SECS,
243 store_timeout_secs: DEFAULT_STORE_TIMEOUT_SECS,
244 merkle_store_timeout_secs: DEFAULT_MERKLE_STORE_TIMEOUT_SECS,
245 chunk_get_timeout_secs: DEFAULT_CHUNK_GET_TIMEOUT_SECS,
246 close_group_size: CLOSE_GROUP_SIZE,
247 quote_concurrency: DEFAULT_QUOTE_CONCURRENCY,
248 store_concurrency: DEFAULT_STORE_CONCURRENCY,
249 adaptive: AdaptiveConfig::default(),
250 allow_loopback: false,
251 ipv6: true,
252 }
253 }
254}
255
256/// Build the adaptive controller for a `Client`. Loads any persisted
257/// snapshot, clamps cold-start values into the deprecated-flag bounds
258/// **per channel** (so a pin on `--store-concurrency` does NOT bleed
259/// into the fetch / quote channels), and returns the persistence path
260/// so callers can save back at shutdown.
261fn build_controller(config: &ClientConfig) -> (AdaptiveController, Option<PathBuf>) {
262 let mut adaptive_cfg = config.adaptive.clone();
263
264 // Per-channel ceilings: each legacy field is interpreted as a cap
265 // for ONLY its matching channel. The fetch channel has no
266 // pre-existing legacy field; it always uses the controller's
267 // default ceiling.
268 //
269 // The legacy fields are non-zero by ClientConfig::default(), but
270 // we honor them as bounds only when they would actually CONSTRAIN
271 // the controller — i.e. when smaller than the per-channel default
272 // max. A default ClientConfig must not silently lower the
273 // controller's ceilings.
274 // A value equal to the historic legacy default is treated as
275 // "not pinned by the user" — without this, every default
276 // ClientConfig would silently lower the controller's per-channel
277 // ceilings to the prior static values (32/8) and the controller
278 // could never grow above them.
279 let user_quote_max = config.quote_concurrency;
280 let user_store_max = config.store_concurrency;
281 let quote_pinned = user_quote_max > 0 && user_quote_max != DEFAULT_QUOTE_CONCURRENCY;
282 let store_pinned = user_store_max > 0 && user_store_max != DEFAULT_STORE_CONCURRENCY;
283 if quote_pinned && user_quote_max < adaptive_cfg.max.quote {
284 adaptive_cfg.max.quote = user_quote_max;
285 }
286 if store_pinned && user_store_max < adaptive_cfg.max.store {
287 adaptive_cfg.max.store = user_store_max;
288 }
289
290 // Cold-start values: matched to the prior static defaults. If the
291 // legacy field caps the channel below the cold-start, lower the
292 // start to match — never start above the channel's max.
293 let mut start = ChannelStart::default();
294 start.quote = start.quote.min(adaptive_cfg.max.quote);
295 start.store = start.store.min(adaptive_cfg.max.store);
296 start.fetch = start.fetch.min(adaptive_cfg.max.fetch);
297
298 let adaptive_enabled = adaptive_cfg.enabled;
299 let controller = AdaptiveController::new(start, adaptive_cfg);
300 // Skip disk warm-start entirely when adaptation is disabled —
301 // fixed-concurrency mode means the user wants exactly the cold
302 // start, no surprises from prior runs. (warm_start is also a
303 // no-op when disabled, but skipping the load avoids file I/O
304 // and the path-resolution side effects.)
305 let persist_path = if adaptive_enabled {
306 let p = adaptive::default_persist_path();
307 if let Some(ref path) = p {
308 if let Some(snap) = adaptive::load_snapshot(path) {
309 debug!(path = %path.display(), "adaptive: warm-start from disk");
310 controller.warm_start(snap);
311 }
312 }
313 p
314 } else {
315 // Even with adaptation off, persist_path is computed so
316 // explicit save_adaptive_snapshot() calls still work — but
317 // the controller currently never moves, so saving the cold
318 // start is harmless.
319 adaptive::default_persist_path()
320 };
321
322 // File downloads choose a stream-decrypt batch size per download
323 // from the current fetch cap and usable RAM, then pass it into
324 // self_encryption's runtime batch-size API. The adaptive controller
325 // still drives fan-out inside each batch by re-reading
326 // `controller.fetch.current()` in the decrypt callback.
327
328 (controller, persist_path)
329}
330
/// Client for the Autonomi decentralized network.
///
/// Provides high-level APIs for storing and retrieving chunks
/// and files on the network.
pub struct Client {
    /// Configuration supplied at construction; exposed through
    /// `config()` / `config_mut()`.
    config: ClientConfig,
    /// Network layer, either wrapped around an existing node
    /// (`from_node*`) or created by bootstrapping (`connect`).
    network: Network,
    /// Wallet for payment operations; `None` until `with_wallet` is called.
    wallet: Option<Arc<Wallet>>,
    /// EVM network used for token approvals and contract calls. Set by
    /// `with_evm_network`, or copied from the wallet by `with_wallet`.
    evm_network: Option<ant_protocol::evm::Network>,
    /// Chunk cache, created with `ChunkCache::default()`.
    chunk_cache: ChunkCache,
    /// Source of protocol request IDs; starts at 1 and is advanced
    /// atomically by `next_request_id()`.
    next_request_id: AtomicU64,
    /// Adaptive concurrency controller: replaces the static
    /// quote/store concurrency knobs. See `adaptive` module.
    controller: AdaptiveController,
    /// Path the controller persists its snapshot to. `None` disables
    /// persistence (useful for tests / non-disk environments).
    persist_path: Option<PathBuf>,
    /// Path for the persistent client peer cache. `None` disables the cache.
    peer_cache_path: Option<PathBuf>,
}
351
352impl Client {
353 /// Create a client connected to the given P2P node.
354 #[must_use]
355 pub fn from_node(node: Arc<P2PNode>, config: ClientConfig) -> Self {
356 Self::from_node_with_peer_cache(node, config, None)
357 }
358
359 /// Create a client connected to the given P2P node and attach an optional
360 /// persistent peer cache path.
361 #[must_use]
362 pub fn from_node_with_peer_cache(
363 node: Arc<P2PNode>,
364 config: ClientConfig,
365 peer_cache_path: Option<PathBuf>,
366 ) -> Self {
367 let network = Network::from_node(node);
368 let (controller, persist_path) = build_controller(&config);
369 Self {
370 config,
371 network,
372 wallet: None,
373 evm_network: None,
374 chunk_cache: ChunkCache::default(),
375 next_request_id: AtomicU64::new(1),
376 controller,
377 persist_path,
378 peer_cache_path,
379 }
380 }
381
382 /// Create a client connected to bootstrap peers.
383 ///
384 /// Threads `config.allow_loopback` and `config.ipv6` through to
385 /// `Network::new`, which controls the saorsa-transport `local` and
386 /// `ipv6` flags on the underlying `CoreNodeConfig`. See
387 /// `ClientConfig::allow_loopback` and `ClientConfig::ipv6` for details.
388 ///
389 /// # Errors
390 ///
391 /// Returns an error if the P2P node cannot be created or bootstrapping fails.
392 pub async fn connect(
393 bootstrap_peers: &[std::net::SocketAddr],
394 config: ClientConfig,
395 ) -> Result<Self> {
396 debug!(
397 "Connecting to Autonomi network with {} bootstrap peers (allow_loopback={}, ipv6={})",
398 bootstrap_peers.len(),
399 config.allow_loopback,
400 config.ipv6,
401 );
402 let network = Network::new(bootstrap_peers, config.allow_loopback, config.ipv6).await?;
403 let (controller, persist_path) = build_controller(&config);
404 Ok(Self {
405 config,
406 network,
407 wallet: None,
408 evm_network: None,
409 chunk_cache: ChunkCache::default(),
410 next_request_id: AtomicU64::new(1),
411 controller,
412 persist_path,
413 peer_cache_path: None,
414 })
415 }
416
417 /// Set the wallet for payment operations.
418 ///
419 /// Also populates the EVM network from the wallet so that
420 /// token approvals work without a separate `with_evm_network` call.
421 #[must_use]
422 pub fn with_wallet(mut self, wallet: Wallet) -> Self {
423 self.evm_network = Some(wallet.network().clone());
424 self.wallet = Some(Arc::new(wallet));
425 self
426 }
427
428 /// Set the EVM network without requiring a wallet.
429 ///
430 /// This enables token approval and contract interactions
431 /// for external-signer flows where the private key lives outside Rust.
432 #[must_use]
433 pub fn with_evm_network(mut self, network: ant_protocol::evm::Network) -> Self {
434 self.evm_network = Some(network);
435 self
436 }
437
438 /// Get the EVM network, falling back to the wallet's network if available.
439 ///
440 /// # Errors
441 ///
442 /// Returns an error if neither `with_evm_network` nor `with_wallet` was called.
443 pub(crate) fn require_evm_network(&self) -> Result<&ant_protocol::evm::Network> {
444 if let Some(ref net) = self.evm_network {
445 return Ok(net);
446 }
447 if let Some(ref wallet) = self.wallet {
448 return Ok(wallet.network());
449 }
450 Err(Error::Payment(
451 "EVM network not configured — call with_evm_network() or with_wallet() first"
452 .to_string(),
453 ))
454 }
455
456 /// Get the client configuration.
457 #[must_use]
458 pub fn config(&self) -> &ClientConfig {
459 &self.config
460 }
461
462 /// Get a mutable reference to the client configuration.
463 pub fn config_mut(&mut self) -> &mut ClientConfig {
464 &mut self.config
465 }
466
467 /// Get a reference to the network layer.
468 #[must_use]
469 pub fn network(&self) -> &Network {
470 &self.network
471 }
472
473 /// Get the wallet, if configured.
474 #[must_use]
475 pub fn wallet(&self) -> Option<&Arc<Wallet>> {
476 self.wallet.as_ref()
477 }
478
479 /// Get a reference to the chunk cache.
480 #[must_use]
481 pub fn chunk_cache(&self) -> &ChunkCache {
482 &self.chunk_cache
483 }
484
485 /// Adaptive concurrency controller. Hot loops read
486 /// `controller().<channel>.current()` to size their fan-out and
487 /// call `.observe(...)` on each completion.
488 #[must_use]
489 pub fn controller(&self) -> &AdaptiveController {
490 &self.controller
491 }
492
493 /// Persist the current adaptive snapshot to disk so the next
494 /// `Client::connect` warm-starts at the learned values instead of
495 /// cold defaults. Best effort — failures log and are discarded.
496 /// Idempotent. Safe to call from a Drop impl or an explicit
497 /// shutdown hook.
498 pub fn save_adaptive_snapshot(&self) {
499 if let Some(ref path) = self.persist_path {
500 adaptive::save_snapshot(path, self.controller.snapshot());
501 }
502 }
503
504 /// Persist currently connected peers that have Direct-tagged addresses in
505 /// the DHT. Best effort; failures are logged and do not affect the client
506 /// operation that just completed.
507 pub async fn save_peer_cache(&self) {
508 if let Some(ref path) = self.peer_cache_path {
509 let node = self.network().node();
510 peer_cache::promote_connected_direct_peers(node.as_ref(), path, node.dht().k_value())
511 .await;
512 }
513 }
514
515 /// Get the next request ID for protocol messages.
516 pub(crate) fn next_request_id(&self) -> u64 {
517 self.next_request_id.fetch_add(1, Ordering::Relaxed)
518 }
519
520 /// Return the chunk PUT-target set: the closest [`PUT_TARGET_WIDTH`] peers
521 /// to the address, each paired with its known network addresses.
522 ///
523 /// Used by the merkle store path, which — unlike single-node payment — has
524 /// no witnessed put-target list to forward, so it fetches the closest-K
525 /// neighbourhood locally.
526 pub(crate) async fn put_target_peers(
527 &self,
528 target: &XorName,
529 ) -> Result<Vec<(PeerId, Vec<MultiAddr>)>> {
530 self.closest_peers(target, PUT_TARGET_WIDTH).await
531 }
532
533 /// Return the requested number of closest peers for a target address.
534 ///
535 /// Queries the DHT for peers by XOR distance. Returns each peer
536 /// paired with its known network addresses.
537 pub(crate) async fn closest_peers(
538 &self,
539 target: &XorName,
540 count: usize,
541 ) -> Result<Vec<(PeerId, Vec<MultiAddr>)>> {
542 let peers = self.network().find_closest_peers(target, count).await?;
543
544 if peers.is_empty() {
545 return Err(Error::InsufficientPeers(
546 "DHT returned no peers for target address".to_string(),
547 ));
548 }
549 Ok(peers)
550 }
551}
552
553/// Persist the adaptive snapshot when the `Client` is dropped, so any
554/// caller — CLI, daemon, library user, integration test — gets
555/// warm-start carry-over for free without remembering to call
556/// `save_adaptive_snapshot()` explicitly. Best effort, sync `std::fs`,
557/// no panic risk on a poisoned mutex (the inner helper handles it).
558///
559/// We deliberately write SYNCHRONOUSLY (not via `spawn_blocking`)
560/// because Drop runs during process shutdown / runtime teardown,
561/// when fire-and-forget background tasks can be dropped before they
562/// complete and the snapshot is silently lost. A small synchronous
563/// stall on a tokio worker (typically <1ms for a local-disk JSON
564/// write of ~50 bytes) is the right tradeoff for guaranteed
565/// persistence — BOUNDED by `DROP_SAVE_TIMEOUT` so a stalled
566/// network-mounted data dir cannot block process shutdown.
567const DROP_SAVE_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(500);
568
569impl Drop for Client {
570 fn drop(&mut self) {
571 let Some(path) = self.persist_path.clone() else {
572 return;
573 };
574 let snap = self.controller.snapshot();
575 adaptive::save_snapshot_with_timeout(path, snap, DROP_SAVE_TIMEOUT);
576 }
577}
578
#[cfg(test)]
// Kept from the original module; no test below currently calls `unwrap`.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Table-driven check of `classify_error`: build one instance per
    /// listed `data::error::Error` variant, classify it, and assert the
    /// resulting `Outcome`.
    ///
    /// Mapping policy (mirrors `classify_error` doc):
    /// - `Timeout` -> `Outcome::Timeout`
    /// - `Network`, `InsufficientPeers`, `Io`, `Protocol`, `Storage`,
    ///   `PartialUpload` -> `Outcome::NetworkError` (transport-related
    ///   or literal capacity failure)
    /// - everything else -> `Outcome::ApplicationError` (would happen
    ///   on a perfectly healthy network)
    ///
    /// The exhaustive closure at the end makes this test stop compiling
    /// when a variant is added to `Error`, as a reminder to extend the
    /// table.
    ///
    /// TODO(review): `Error::Cancelled` appears in the guard but has no
    /// row in the table, so it is the one variant whose mapping is not
    /// asserted. Add e.g.
    /// `(Error::Cancelled(/* payload */), Outcome::ApplicationError)`
    /// once the payload type is confirmed against `data::error`.
    #[test]
    fn classify_error_covers_all_variants() {
        let cases: Vec<(Error, Outcome)> = vec![
            (Error::Timeout("t".to_string()), Outcome::Timeout),
            (Error::Network("n".to_string()), Outcome::NetworkError),
            (
                Error::InsufficientPeers("p".to_string()),
                Outcome::NetworkError,
            ),
            (Error::Storage("s".to_string()), Outcome::NetworkError),
            (Error::Payment("p".to_string()), Outcome::ApplicationError),
            (Error::Protocol("p".to_string()), Outcome::NetworkError),
            (
                Error::InvalidData("d".to_string()),
                Outcome::ApplicationError,
            ),
            (
                Error::Serialization("s".to_string()),
                Outcome::ApplicationError,
            ),
            (Error::Crypto("c".to_string()), Outcome::ApplicationError),
            (
                Error::Io(std::io::Error::other("io")),
                Outcome::NetworkError,
            ),
            (Error::Config("c".to_string()), Outcome::ApplicationError),
            (
                Error::SignatureVerification("s".to_string()),
                Outcome::ApplicationError,
            ),
            (
                Error::Encryption("e".to_string()),
                Outcome::ApplicationError,
            ),
            (Error::AlreadyStored, Outcome::ApplicationError),
            (
                Error::InsufficientDiskSpace("d".to_string()),
                Outcome::ApplicationError,
            ),
            (
                Error::CostEstimationInconclusive("c".to_string()),
                Outcome::ApplicationError,
            ),
            (
                Error::PartialUpload {
                    stored: vec![],
                    stored_count: 0,
                    failed: vec![],
                    failed_count: 0,
                    total_chunks: 0,
                    spend: Box::new(crate::data::error::PartialUploadSpend {
                        storage_cost_atto: "0".to_string(),
                        gas_cost_wei: 0,
                    }),
                    reason: "r".to_string(),
                },
                Outcome::NetworkError,
            ),
            (
                Error::BadQuoteBinding {
                    peer_id: "peer".to_string(),
                    detail: "mismatch".to_string(),
                },
                Outcome::ApplicationError,
            ),
            // A remote application rejection: the node responded with a
            // structured `ProtocolError`, so the transport succeeded and
            // this must NOT register as a capacity signal (V2-468).
            (
                Error::RemotePut {
                    address: "abcd".to_string(),
                    source: ant_protocol::ProtocolError::PaymentFailed("stale quote".to_string()),
                },
                Outcome::ApplicationError,
            ),
        ];
        for (err, expected) in &cases {
            let got = classify_error(err);
            assert_eq!(
                got, *expected,
                "classify_error({err:?}) = {got:?}, expected {expected:?}",
            );
        }

        // Compile-time-ish guard: if a new variant is added to Error,
        // this match forces an update here. The closure is never called;
        // only its exhaustiveness matters.
        let _ = |e: &Error| match e {
            Error::Timeout(_)
            | Error::Network(_)
            | Error::InsufficientPeers(_)
            | Error::Storage(_)
            | Error::Payment(_)
            | Error::Protocol(_)
            | Error::InvalidData(_)
            | Error::Serialization(_)
            | Error::Crypto(_)
            | Error::Io(_)
            | Error::Config(_)
            | Error::SignatureVerification(_)
            | Error::Encryption(_)
            | Error::AlreadyStored
            | Error::InsufficientDiskSpace(_)
            | Error::CostEstimationInconclusive(_)
            | Error::Cancelled(_)
            | Error::PartialUpload { .. }
            | Error::BadQuoteBinding { .. }
            | Error::RemotePut { .. } => (),
        };
    }

    /// C4 fix guard: pinning the legacy `quote_concurrency` /
    /// `store_concurrency` ClientConfig fields must clamp ONLY the
    /// matching channel's max in the resulting controller. The fetch
    /// (download) channel must keep its full default ceiling.
    #[test]
    fn legacy_concurrency_pin_does_not_bleed_across_channels() {
        let cfg = ClientConfig {
            quote_concurrency: 4,
            store_concurrency: 2,
            ..ClientConfig::default()
        };
        let (controller, _) = build_controller(&cfg);
        // The store/quote caps must be clamped to the user's pin.
        assert_eq!(controller.config.max.quote, 4, "quote pin not respected");
        assert_eq!(controller.config.max.store, 2, "store pin not respected");
        // The fetch cap must NOT have been lowered — that's the
        // regression C4 was about.
        let default_fetch_max = adaptive::ChannelMax::default().fetch;
        assert_eq!(
            controller.config.max.fetch, default_fetch_max,
            "fetch cap was lowered by store/quote pin (C4 regression)"
        );
        // Cold-start values must respect the lowered ceilings.
        assert!(
            controller.quote.current() <= 4,
            "quote start exceeds its cap"
        );
        assert!(
            controller.store.current() <= 2,
            "store start exceeds its cap"
        );
    }

    /// Default ClientConfig must NOT silently lower the controller's
    /// per-channel ceilings — the adaptive defaults give every channel
    /// real headroom to grow. This guards against future commits
    /// re-introducing a global clamp.
    #[test]
    fn default_client_config_does_not_clamp_controller_max() {
        let cfg = ClientConfig::default();
        let (controller, _) = build_controller(&cfg);
        let defaults = adaptive::ChannelMax::default();
        // The legacy fields default to 32/8 (the prior static knobs),
        // which the original note describes as <= the per-channel
        // adaptive defaults (128/64). build_controller must keep the
        // larger, not clobber with the legacy values.
        assert_eq!(controller.config.max.quote, defaults.quote);
        assert_eq!(controller.config.max.store, defaults.store);
        assert_eq!(controller.config.max.fetch, defaults.fetch);
    }
}