pub mod adaptive;
pub mod batch;
pub mod cache;
pub mod chunk;
pub mod data;
pub mod file;
pub mod merkle;
pub mod payment;
pub(crate) mod peer_cache;
pub mod quote;
use crate::data::client::adaptive::{AdaptiveConfig, AdaptiveController, ChannelStart, Outcome};
use crate::data::client::cache::ChunkCache;
use crate::data::error::{Error, Result};
use crate::data::network::Network;
use ant_protocol::evm::Wallet;
use ant_protocol::transport::{MultiAddr, P2PNode, PeerId};
use ant_protocol::{XorName, CLOSE_GROUP_SIZE};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tracing::debug;
pub(crate) fn classify_error(err: &Error) -> Outcome {
match err {
Error::Timeout(_) => Outcome::Timeout,
Error::Network(_)
| Error::InsufficientPeers(_)
| Error::Io(_)
| Error::Protocol(_)
| Error::Storage(_)
| Error::PartialUpload { .. } => Outcome::NetworkError,
Error::AlreadyStored
| Error::Encryption(_)
| Error::Crypto(_)
| Error::Payment(_)
| Error::Serialization(_)
| Error::InvalidData(_)
| Error::SignatureVerification(_)
| Error::Config(_)
| Error::InsufficientDiskSpace(_)
| Error::CostEstimationInconclusive(_) => Outcome::ApplicationError,
}
}
/// Value `ClientConfig::default()` assigns to `quote_timeout_secs` (seconds).
const DEFAULT_QUOTE_TIMEOUT_SECS: u64 = 10;
/// Value `ClientConfig::default()` assigns to `store_timeout_secs` (seconds).
const DEFAULT_STORE_TIMEOUT_SECS: u64 = 10;
/// Value `ClientConfig::default()` assigns to `quote_concurrency`.
/// `build_controller` also compares against it: a `quote_concurrency` equal to
/// this value is not treated as a user pin.
const DEFAULT_QUOTE_CONCURRENCY: usize = 32;
/// Value `ClientConfig::default()` assigns to `store_concurrency`.
/// `build_controller` also compares against it: a `store_concurrency` equal to
/// this value is not treated as a user pin.
const DEFAULT_STORE_CONCURRENCY: usize = 8;
/// Settings a [`Client`] is constructed with.
///
/// `ClientConfig::default()` fills the fields from the module's `DEFAULT_*`
/// constants, `CLOSE_GROUP_SIZE` and `AdaptiveConfig::default()`.
#[derive(Debug, Clone)]
pub struct ClientConfig {
/// Timeout for quote requests, in seconds.
/// NOTE(review): not read in this file; presumably applied by the quote path, confirm there.
pub quote_timeout_secs: u64,
/// Timeout for store requests, in seconds.
/// NOTE(review): not read in this file; presumably applied by the store path, confirm there.
pub store_timeout_secs: u64,
/// Number of closest peers `Client::close_group_peers` asks the network for.
pub close_group_size: usize,
/// Legacy quote concurrency setting. When it is non-zero and differs from the
/// built-in default, it lowers (never raises) the adaptive controller's quote
/// maximum.
pub quote_concurrency: usize,
/// Legacy store concurrency setting. When it is non-zero and differs from the
/// built-in default, it lowers (never raises) the adaptive controller's store
/// maximum.
pub store_concurrency: usize,
/// Configuration handed to the [`AdaptiveController`] when the client is built.
pub adaptive: AdaptiveConfig,
/// Forwarded to `Network::new` by `Client::connect`.
pub allow_loopback: bool,
/// Forwarded to `Network::new` by `Client::connect`.
pub ipv6: bool,
}
impl Default for ClientConfig {
    /// Returns the stock configuration: 10-second quote and store timeouts,
    /// the protocol's `CLOSE_GROUP_SIZE`, the default legacy concurrency
    /// values (32 quote / 8 store), `AdaptiveConfig::default()`, loopback
    /// disallowed and IPv6 enabled.
    fn default() -> Self {
        let adaptive = AdaptiveConfig::default();
        Self {
            // Networking switches.
            allow_loopback: false,
            ipv6: true,
            close_group_size: CLOSE_GROUP_SIZE,
            // Timeouts.
            quote_timeout_secs: DEFAULT_QUOTE_TIMEOUT_SECS,
            store_timeout_secs: DEFAULT_STORE_TIMEOUT_SECS,
            // Concurrency.
            quote_concurrency: DEFAULT_QUOTE_CONCURRENCY,
            store_concurrency: DEFAULT_STORE_CONCURRENCY,
            adaptive,
        }
    }
}
/// Builds the [`AdaptiveController`] for a client and decides where, if
/// anywhere, its state is persisted.
///
/// Steps:
/// 1. Clone `config.adaptive` and apply the legacy `quote_concurrency` /
///    `store_concurrency` fields as per-channel caps. A field only counts as
///    a pin when it is non-zero and differs from its built-in default, and a
///    pin can only lower the corresponding maximum. The fetch maximum is
///    never touched by these fields.
/// 2. Clamp the default [`ChannelStart`] values so that no channel starts
///    above its maximum.
/// 3. If adaptation is enabled, look up the default snapshot path and
///    warm-start the controller from a snapshot loaded from it, when both
///    exist.
///
/// Returns the controller and the snapshot path. The path is `None` when
/// adaptation is disabled or when `adaptive::default_persist_path()` yields
/// nothing.
fn build_controller(config: &ClientConfig) -> (AdaptiveController, Option<PathBuf>) {
    let mut adaptive_cfg = config.adaptive.clone();

    let user_quote_max = config.quote_concurrency;
    let user_store_max = config.store_concurrency;
    // Zero and the built-in default both mean "the user did not pin this".
    let quote_pinned = user_quote_max > 0 && user_quote_max != DEFAULT_QUOTE_CONCURRENCY;
    let store_pinned = user_store_max > 0 && user_store_max != DEFAULT_STORE_CONCURRENCY;
    // A pin may tighten a channel's cap but never loosen it, and each pin
    // affects only its own channel.
    if quote_pinned && user_quote_max < adaptive_cfg.max.quote {
        adaptive_cfg.max.quote = user_quote_max;
    }
    if store_pinned && user_store_max < adaptive_cfg.max.store {
        adaptive_cfg.max.store = user_store_max;
    }

    // Starting values must respect the (possibly lowered) maxima.
    let mut start = ChannelStart::default();
    start.quote = start.quote.min(adaptive_cfg.max.quote);
    start.store = start.store.min(adaptive_cfg.max.store);
    start.fetch = start.fetch.min(adaptive_cfg.max.fetch);

    // Read the flag before `adaptive_cfg` is moved into the controller.
    let adaptive_enabled = adaptive_cfg.enabled;
    let controller = AdaptiveController::new(start, adaptive_cfg);

    let persist_path = if adaptive_enabled {
        let p = adaptive::default_persist_path();
        if let Some(ref path) = p {
            if let Some(snap) = adaptive::load_snapshot(path) {
                debug!(path = %path.display(), "adaptive: warm-start from disk");
                controller.warm_start(snap);
            }
        }
        p
    } else {
        // Persistence follows the adaptive flag. With adaptation disabled
        // nothing is loaded, so nothing may be written either: returning a
        // path here would let `save_adaptive_snapshot` and `Drop` overwrite
        // the snapshot that adaptive-enabled clients warm-start from.
        None
    };

    (controller, persist_path)
}
/// Client handle bundling the network connection, optional payment
/// configuration, a chunk cache and the adaptive controller.
///
/// Built with `Client::from_node` or `Client::connect`; a wallet or EVM
/// network is attached afterwards with `with_wallet` / `with_evm_network`.
pub struct Client {
// Configuration supplied at construction; exposed via `config()` / `config_mut()`.
config: ClientConfig,
// Network handle created from a `P2PNode` or from bootstrap peers.
network: Network,
// Wallet set by `with_wallet`; `None` until then.
wallet: Option<Arc<Wallet>>,
// EVM network set by `with_wallet` (copied from the wallet) or `with_evm_network`.
evm_network: Option<ant_protocol::evm::Network>,
// Chunk cache, default-constructed with the client.
chunk_cache: ChunkCache,
// Counter backing `next_request_id()`; starts at 1.
next_request_id: AtomicU64,
// Controller produced by `build_controller` from `config`.
controller: AdaptiveController,
// Where controller snapshots are written by `save_adaptive_snapshot` and on
// drop; `None` means no snapshot is written.
persist_path: Option<PathBuf>,
}
impl Client {
#[must_use]
pub fn from_node(node: Arc<P2PNode>, config: ClientConfig) -> Self {
let network = Network::from_node(node);
let (controller, persist_path) = build_controller(&config);
Self {
config,
network,
wallet: None,
evm_network: None,
chunk_cache: ChunkCache::default(),
next_request_id: AtomicU64::new(1),
controller,
persist_path,
}
}
pub async fn connect(
bootstrap_peers: &[std::net::SocketAddr],
config: ClientConfig,
) -> Result<Self> {
debug!(
"Connecting to Autonomi network with {} bootstrap peers (allow_loopback={}, ipv6={})",
bootstrap_peers.len(),
config.allow_loopback,
config.ipv6,
);
let network = Network::new(bootstrap_peers, config.allow_loopback, config.ipv6).await?;
let (controller, persist_path) = build_controller(&config);
Ok(Self {
config,
network,
wallet: None,
evm_network: None,
chunk_cache: ChunkCache::default(),
next_request_id: AtomicU64::new(1),
controller,
persist_path,
})
}
#[must_use]
pub fn with_wallet(mut self, wallet: Wallet) -> Self {
self.evm_network = Some(wallet.network().clone());
self.wallet = Some(Arc::new(wallet));
self
}
#[must_use]
pub fn with_evm_network(mut self, network: ant_protocol::evm::Network) -> Self {
self.evm_network = Some(network);
self
}
pub(crate) fn require_evm_network(&self) -> Result<&ant_protocol::evm::Network> {
if let Some(ref net) = self.evm_network {
return Ok(net);
}
if let Some(ref wallet) = self.wallet {
return Ok(wallet.network());
}
Err(Error::Payment(
"EVM network not configured — call with_evm_network() or with_wallet() first"
.to_string(),
))
}
#[must_use]
pub fn config(&self) -> &ClientConfig {
&self.config
}
pub fn config_mut(&mut self) -> &mut ClientConfig {
&mut self.config
}
#[must_use]
pub fn network(&self) -> &Network {
&self.network
}
#[must_use]
pub fn wallet(&self) -> Option<&Arc<Wallet>> {
self.wallet.as_ref()
}
#[must_use]
pub fn chunk_cache(&self) -> &ChunkCache {
&self.chunk_cache
}
#[must_use]
pub fn controller(&self) -> &AdaptiveController {
&self.controller
}
pub fn save_adaptive_snapshot(&self) {
if let Some(ref path) = self.persist_path {
adaptive::save_snapshot(path, self.controller.snapshot());
}
}
pub(crate) fn next_request_id(&self) -> u64 {
self.next_request_id.fetch_add(1, Ordering::Relaxed)
}
pub(crate) async fn close_group_peers(
&self,
target: &XorName,
) -> Result<Vec<(PeerId, Vec<MultiAddr>)>> {
let peers = self
.network()
.find_closest_peers(target, self.config().close_group_size)
.await?;
if peers.is_empty() {
return Err(Error::InsufficientPeers(
"DHT returned no peers for target address".to_string(),
));
}
Ok(peers)
}
}
/// Timeout (500 ms) passed to `adaptive::save_snapshot_with_timeout` when a
/// [`Client`] is dropped.
const DROP_SAVE_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(500);
impl Drop for Client {
    /// Saves the controller's snapshot when the client goes away.
    ///
    /// Nothing happens when the client has no persistence path. Otherwise the
    /// current snapshot is handed to `adaptive::save_snapshot_with_timeout`
    /// together with an owned copy of the path and `DROP_SAVE_TIMEOUT` as the
    /// timeout argument.
    fn drop(&mut self) {
        if let Some(path) = self.persist_path.clone() {
            let snap = self.controller.snapshot();
            adaptive::save_snapshot_with_timeout(path, snap, DROP_SAVE_TIMEOUT);
        }
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Every `Error` variant must map to the expected `Outcome`.
    #[test]
    fn classify_error_covers_all_variants() {
        // Compile-time guard: this closure is never called. Its wildcard-free
        // match stops compiling when `Error` gains a variant, which is the
        // cue to extend the case table below.
        let _ = |e: &Error| match e {
            Error::Timeout(_)
            | Error::Network(_)
            | Error::InsufficientPeers(_)
            | Error::Storage(_)
            | Error::Payment(_)
            | Error::Protocol(_)
            | Error::InvalidData(_)
            | Error::Serialization(_)
            | Error::Crypto(_)
            | Error::Io(_)
            | Error::Config(_)
            | Error::SignatureVerification(_)
            | Error::Encryption(_)
            | Error::AlreadyStored
            | Error::InsufficientDiskSpace(_)
            | Error::CostEstimationInconclusive(_)
            | Error::PartialUpload { .. } => (),
        };

        let cases = [
            // Timeout.
            (Error::Timeout(String::from("t")), Outcome::Timeout),
            // Network-side failures.
            (Error::Network(String::from("n")), Outcome::NetworkError),
            (
                Error::InsufficientPeers(String::from("p")),
                Outcome::NetworkError,
            ),
            (Error::Storage(String::from("s")), Outcome::NetworkError),
            (Error::Protocol(String::from("p")), Outcome::NetworkError),
            (
                Error::Io(std::io::Error::other("io")),
                Outcome::NetworkError,
            ),
            (
                Error::PartialUpload {
                    stored: vec![],
                    stored_count: 0,
                    failed: vec![],
                    failed_count: 0,
                    total_chunks: 0,
                    reason: String::from("r"),
                },
                Outcome::NetworkError,
            ),
            // Application-side failures.
            (Error::Payment(String::from("p")), Outcome::ApplicationError),
            (
                Error::InvalidData(String::from("d")),
                Outcome::ApplicationError,
            ),
            (
                Error::Serialization(String::from("s")),
                Outcome::ApplicationError,
            ),
            (Error::Crypto(String::from("c")), Outcome::ApplicationError),
            (Error::Config(String::from("c")), Outcome::ApplicationError),
            (
                Error::SignatureVerification(String::from("s")),
                Outcome::ApplicationError,
            ),
            (
                Error::Encryption(String::from("e")),
                Outcome::ApplicationError,
            ),
            (Error::AlreadyStored, Outcome::ApplicationError),
            (
                Error::InsufficientDiskSpace(String::from("d")),
                Outcome::ApplicationError,
            ),
            (
                Error::CostEstimationInconclusive(String::from("c")),
                Outcome::ApplicationError,
            ),
        ];

        for (err, expected) in &cases {
            let got = classify_error(err);
            assert_eq!(
                got, *expected,
                "classify_error({err:?}) = {got:?}, expected {expected:?}",
            );
        }
    }

    /// Pinning the legacy quote/store concurrency must cap only those two
    /// channels and leave the fetch cap at its default.
    #[test]
    fn legacy_concurrency_pin_does_not_bleed_across_channels() {
        let cfg = ClientConfig {
            quote_concurrency: 4,
            store_concurrency: 2,
            ..ClientConfig::default()
        };
        let (controller, _) = build_controller(&cfg);
        let max = &controller.config.max;

        assert_eq!(max.quote, 4, "quote pin not respected");
        assert_eq!(max.store, 2, "store pin not respected");

        let default_fetch_max = adaptive::ChannelMax::default().fetch;
        assert_eq!(
            max.fetch, default_fetch_max,
            "fetch cap was lowered by store/quote pin (C4 regression)"
        );

        assert!(
            controller.quote.current() <= 4,
            "quote start exceeds its cap"
        );
        assert!(
            controller.store.current() <= 2,
            "store start exceeds its cap"
        );
    }

    /// A default `ClientConfig` must leave all controller maxima at the
    /// `ChannelMax` defaults.
    #[test]
    fn default_client_config_does_not_clamp_controller_max() {
        let (controller, _) = build_controller(&ClientConfig::default());
        let defaults = adaptive::ChannelMax::default();
        let max = &controller.config.max;

        assert_eq!(max.quote, defaults.quote);
        assert_eq!(max.store, defaults.store);
        assert_eq!(max.fetch, defaults.fetch);
    }
}