use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};
use bytes::Bytes;
use chrono::Utc;
use sia_core::rhp4::HostPrices;
use sia_core::signing::{PrivateKey, PublicKey, Signature};
use sia_core::types::{Currency, Hash256};
use super::{Error as RHP4Error, HostEndpoint, Transport};
use crate::time::{Duration, Instant, sleep};
/// In-memory mock client that stores sectors per host and can inject
/// artificial latency into reads and writes.
///
/// Every field is wrapped in an `Arc`, so cloning a `Client` yields a handle
/// that shares the same sectors and delay configuration.
#[derive(Clone)]
pub struct Client {
/// Stored sector data, keyed first by host public key and then by sector root.
sectors: Arc<RwLock<HashMap<PublicKey, HashMap<Hash256, Bytes>>>>,
/// Hosts whose reads and writes first sleep for `slow_delay`.
slow_hosts: Arc<RwLock<HashSet<PublicKey>>>,
/// Extra delay applied to operations on hosts listed in `slow_hosts`.
slow_delay: Arc<RwLock<Duration>>,
/// Per-sector-root delay applied to reads; the stored value is halved each
/// time the root is read.
read_delays: Arc<RwLock<HashMap<Hash256, Duration>>>,
/// When set, every written sector gets this value recorded as its read delay.
initial_read_delay: Arc<RwLock<Option<Duration>>>,
}
impl Default for Client {
fn default() -> Self {
Self::new()
}
}
impl Client {
pub fn new() -> Self {
Self {
sectors: Arc::new(RwLock::new(HashMap::new())),
slow_hosts: Arc::new(RwLock::new(HashSet::new())),
slow_delay: Arc::new(RwLock::new(Duration::ZERO)),
read_delays: Arc::new(RwLock::new(HashMap::new())),
initial_read_delay: Arc::new(RwLock::new(None)),
}
}
#[allow(dead_code)] pub fn set_initial_read_delay(&self, delay: Duration) {
*self.initial_read_delay.write().unwrap() = Some(delay);
}
pub fn clear(&self) {
self.sectors.write().unwrap().clear();
}
pub fn set_slow_hosts(&self, hosts: impl IntoIterator<Item = PublicKey>, delay: Duration) {
let mut slow = self.slow_hosts.write().unwrap();
slow.clear();
slow.extend(hosts);
*self.slow_delay.write().unwrap() = delay;
}
pub fn reset_slow_hosts(&self) {
self.slow_hosts.write().unwrap().clear();
*self.slow_delay.write().unwrap() = Duration::ZERO;
}
}
impl Client {
    /// Returns the configured slow-host delay if `host` is currently marked as
    /// slow, or `None` otherwise.
    ///
    /// Both lock guards are released before this returns, so callers never
    /// hold one across an `.await`.
    fn slow_delay_for(&self, host: &HostEndpoint) -> Option<Duration> {
        let slow_hosts = self.slow_hosts.read().unwrap();
        slow_hosts
            .contains(&host.public_key)
            .then(|| *self.slow_delay.read().unwrap())
    }
}

/// In-memory `Transport` implementation that stores sectors in maps and
/// injects configurable latency instead of talking to a real host.
impl Transport for Client {
    /// Returns a fixed price table in which every price is zero, the tip
    /// height is 1, the signature is the default value and the prices stay
    /// valid for one day from now, together with the elapsed time.
    async fn host_prices(&self, _: &HostEndpoint) -> Result<(HostPrices, Duration), RHP4Error> {
        let start = Instant::now();
        let prices = HostPrices {
            contract_price: Currency::zero(),
            collateral: Currency::zero(),
            ingress_price: Currency::zero(),
            egress_price: Currency::zero(),
            storage_price: Currency::zero(),
            free_sector_price: Currency::zero(),
            tip_height: 1,
            signature: Signature::default(),
            valid_until: Utc::now() + chrono::Duration::days(1),
        };
        Ok((prices, start.elapsed()))
    }

    /// Stores `sector` for `host` and returns its root plus the elapsed time.
    ///
    /// Sleeps for the slow-host delay when `host` is marked slow, then for a
    /// fixed 3 ms. If an initial read delay is configured it is recorded for
    /// the new sector root.
    ///
    /// # Errors
    ///
    /// Returns `RHP4Error::Transport` if `host` has no addresses.
    ///
    /// # Panics
    ///
    /// Panics if any of the internal locks is poisoned.
    async fn write_sector(
        &self,
        host: &HostEndpoint,
        _: HostPrices,
        _: &PrivateKey,
        sector: Bytes,
    ) -> Result<(Hash256, Duration), RHP4Error> {
        if host.addresses.is_empty() {
            return Err(RHP4Error::Transport("host has no addresses".to_string()));
        }
        let start = Instant::now();
        if let Some(delay) = self.slow_delay_for(host) {
            sleep(delay).await;
        }
        // Fixed latency applied to every write.
        sleep(Duration::from_millis(3)).await;

        let sector_root = sia_core::rhp4::sector_root(&sector);
        // Each guard below is a statement-scoped temporary, so no two locks
        // are ever held at the same time.
        self.sectors
            .write()
            .unwrap()
            .entry(host.public_key)
            .or_default()
            .insert(sector_root, sector);

        let initial_delay = *self.initial_read_delay.read().unwrap();
        if let Some(delay) = initial_delay {
            self.read_delays.write().unwrap().insert(sector_root, delay);
        }
        Ok((sector_root, start.elapsed()))
    }

    /// Reads `length` bytes starting at `offset` from the sector `root` stored
    /// for `host`, returning a copy of the bytes plus the elapsed time.
    ///
    /// Sleeps for the slow-host delay when `host` is marked slow, then for the
    /// per-root read delay if one is recorded (halving the recorded value for
    /// the next read), and finally for a time proportional to the number of
    /// bytes returned.
    ///
    /// # Errors
    ///
    /// Returns `RHP4Error::Transport` if `host` has no addresses, if no
    /// sectors are stored for the host, if the sector root is unknown, or if
    /// `offset..offset + length` overflows or lies outside the sector.
    ///
    /// # Panics
    ///
    /// Panics if any of the internal locks is poisoned.
    async fn read_sector(
        &self,
        host: &HostEndpoint,
        _: HostPrices,
        _: &PrivateKey,
        root: Hash256,
        offset: usize,
        length: usize,
    ) -> Result<(Bytes, Duration), RHP4Error> {
        if host.addresses.is_empty() {
            return Err(RHP4Error::Transport("host has no addresses".to_string()));
        }
        let start = Instant::now();
        if let Some(delay) = self.slow_delay_for(host) {
            sleep(delay).await;
        }

        // Take the current delay for this root and store half of it for the
        // next read, using a single map lookup.
        let read_delay = {
            let mut delays = self.read_delays.write().unwrap();
            delays.get_mut(&root).map(|remaining| {
                let current = *remaining;
                *remaining = current / 2;
                current
            })
        };
        if let Some(delay) = read_delay {
            sleep(delay).await;
        }

        let data = {
            let sectors = self.sectors.read().unwrap();
            let sector = sectors
                .get(&host.public_key)
                .ok_or_else(|| RHP4Error::Transport("host not found".to_string()))?
                .get(&root)
                .ok_or_else(|| RHP4Error::Transport("sector not found".to_string()))?;
            // Checked arithmetic and checked slicing: a bad range is reported
            // as an error rather than panicking.
            let segment = offset
                .checked_add(length)
                .and_then(|end| sector.get(offset..end))
                .ok_or_else(|| RHP4Error::Transport("read out of bounds".to_string()))?;
            Bytes::copy_from_slice(segment)
        };

        // 0.8 ns per returned byte (8 bits per byte at 10 bits per nanosecond).
        // NOTE(review): presumably models a ~10 Gbit/s link; confirm.
        sleep(Duration::from_nanos(data.len() as u64 * 8 / 10)).await;
        Ok((data, start.elapsed()))
    }
}