// sia_storage/hosts.rs

use std::collections::{HashMap, VecDeque};
use std::fmt::{Debug, Display};
use std::sync::{Arc, Mutex, RwLock};

use chrono::Utc;
use log::debug;
use priority_queue::PriorityQueue;
use serde::{Deserialize, Serialize};
use sia_core::rhp4::HostPrices;
use sia_core::signing::{PrivateKey, PublicKey};
use sia_core::types::Hash256;
use sia_core::types::v2::NetAddress;
use thiserror::Error;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;

use crate::rhp4::{HostEndpoint, Transport};
use crate::time::{Duration, Elapsed, Instant, timeout};
20
21/// Represents a host in the Sia network. The
22/// addresses can be used to connect to the host.
23#[derive(Debug, PartialEq, Serialize, Deserialize)]
24#[serde(rename_all = "camelCase")]
25/// A storage host on the Sia network.
26pub struct Host {
27    /// The host's public key.
28    pub public_key: PublicKey,
29    /// The host's network addresses.
30    pub addresses: Vec<NetAddress>,
31    /// The host's ISO 3166-1 alpha-2 country code.
32    pub country_code: String,
33    /// The host's latitude coordinate.
34    pub latitude: f64,
35    /// The host's longitude coordinate.
36    pub longitude: f64,
37    /// Whether the host is currently suitable for uploading data.
38    pub good_for_upload: bool,
39}
40
/// Exponentially-weighted moving average of RPC latency, stored in
/// milliseconds. `None` until the first sample is recorded.
#[derive(Debug, Default, Clone)]
struct RPCAverage(Option<f64>); // exponential moving average of latency in milliseconds
43
44impl RPCAverage {
45    const ALPHA: f64 = 0.2;
46    fn add_sample(&mut self, sample: Duration) {
47        match self.0 {
48            Some(avg) => {
49                self.0 =
50                    Some(Self::ALPHA * (sample.as_millis() as f64) + (1.0 - Self::ALPHA) * avg);
51            }
52            None => {
53                self.0 = Some(sample.as_millis() as f64);
54            }
55        }
56    }
57
58    fn avg(&self) -> Duration {
59        match self.0 {
60            Some(avg) => Duration::from_millis(avg as u64),
61            None => Duration::from_secs(3600), // 1h if no samples
62        }
63    }
64}
65
impl Display for RPCAverage {
    /// Delegates to the formatting of the averaged `Duration`.
    // NOTE(review): `fmt` here resolves to whichever `fmt` trait method
    // `crate::time::Duration` provides via the traits in scope (`Debug`,
    // `Display` are both imported) — confirm which one is intended.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.avg().fmt(f)
    }
}
71
72impl PartialEq for RPCAverage {
73    fn eq(&self, other: &Self) -> bool {
74        self.avg() == other.avg()
75    }
76}
77
78impl Eq for RPCAverage {}
79
80impl Ord for RPCAverage {
81    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
82        self.avg().cmp(&other.avg())
83    }
84}
85
86impl PartialOrd for RPCAverage {
87    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
88        Some(self.cmp(other))
89    }
90}
91
/// Exponentially-weighted moving average of RPC failures, where a sample of
/// 1.0 records a failure and 0.0 a success. `None` until the first sample.
#[derive(Debug, Default, Clone)]
struct FailureRate(Option<f64>); // exponential moving average of failure rate
94
95impl FailureRate {
96    const ALPHA: f64 = 0.2;
97
98    fn add_sample(&mut self, success: bool) {
99        let sample = if success { 0.0 } else { 1.0 };
100        match self.0 {
101            Some(rate) => {
102                self.0 = Some(Self::ALPHA * sample + (1.0 - Self::ALPHA) * rate);
103            }
104            None => {
105                self.0 = Some(sample);
106            }
107        }
108    }
109
110    // Computes the failure rate as an integer percentage (0-100)
111    fn rate(&self) -> i64 {
112        match self.0 {
113            Some(rate) => (rate * 100.0).round() as i64,
114            None => 0, // presume no failures if no samples
115        }
116    }
117}
118
impl Display for FailureRate {
    /// Renders the failure rate as an integer percentage, e.g. `12%`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}%", self.rate())
    }
}
124
125impl PartialEq for FailureRate {
126    fn eq(&self, other: &Self) -> bool {
127        self.rate() == other.rate()
128    }
129}
130
131impl Eq for FailureRate {}
132
133impl PartialOrd for FailureRate {
134    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
135        Some(self.cmp(other))
136    }
137}
138
139impl Ord for FailureRate {
140    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
141        self.rate().cmp(&other.rate())
142    }
143}
144
/// Aggregated performance metrics for a single host; used as the priority
/// value in the preferred-hosts queue.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
struct HostMetric {
    // Moving-average latency of settings (price) RPCs.
    rpc_settings_avg: RPCAverage,
    // Moving-average latency of write-sector RPCs.
    rpc_write_avg: RPCAverage,
    // Moving-average latency of read-sector RPCs.
    rpc_read_avg: RPCAverage,
    // Moving-average failure rate across all RPC kinds.
    failure_rate: FailureRate,
}
152
153impl HostMetric {
154    fn add_write_sample(&mut self, d: Duration) {
155        self.rpc_write_avg.add_sample(d);
156        self.failure_rate.add_sample(true);
157    }
158
159    fn add_read_sample(&mut self, d: Duration) {
160        self.rpc_read_avg.add_sample(d);
161        self.failure_rate.add_sample(true);
162    }
163
164    fn add_settings_sample(&mut self, d: Duration) {
165        self.rpc_settings_avg.add_sample(d);
166        self.failure_rate.add_sample(true);
167    }
168
169    fn add_failure(&mut self) {
170        self.failure_rate.add_sample(false);
171    }
172}
173
174impl Ord for HostMetric {
175    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
176        match other.failure_rate.cmp(&self.failure_rate) {
177            // lower failure rate is higher priority
178            std::cmp::Ordering::Equal => {
179                // use average of read, write, and settings RPC times as tiebreaker
180                let avg_self = (self
181                    .rpc_write_avg
182                    .avg()
183                    .saturating_add(self.rpc_read_avg.avg()))
184                .saturating_add(self.rpc_settings_avg.avg())
185                    / 3;
186                let avg_other = (other
187                    .rpc_write_avg
188                    .avg()
189                    .saturating_add(other.rpc_read_avg.avg()))
190                .saturating_add(other.rpc_settings_avg.avg())
191                    / 3;
192                avg_other.cmp(&avg_self) // lower average latency is higher priority
193            }
194            ord => ord,
195        }
196    }
197}
198
199impl PartialOrd for HostMetric {
200    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
201        Some(self.cmp(other))
202    }
203}
204
/// Connection information tracked per known host.
#[derive(Debug)]
struct HostInfo {
    // Network addresses used to reach the host.
    addresses: Vec<NetAddress>,
    // Whether the host is currently suitable for uploads.
    good_for_upload: bool,
}
210
/// Thread-safe registry of known hosts plus a priority queue ordering them
/// by their performance metrics.
#[derive(Debug)]
struct HostList {
    // Known hosts keyed by public key.
    hosts: RwLock<HashMap<PublicKey, HostInfo>>,
    // Hosts ordered by `HostMetric`; entries survive host-list refreshes so
    // metrics are retained for hosts that reappear.
    preferred_hosts: RwLock<PriorityQueue<PublicKey, HostMetric>>,
}
216
217impl HostList {
218    fn new() -> Self {
219        Self {
220            preferred_hosts: RwLock::new(PriorityQueue::new()),
221            hosts: RwLock::new(HashMap::new()),
222        }
223    }
224
225    fn addresses(&self, host_key: &PublicKey) -> Option<Vec<NetAddress>> {
226        let hosts = self.hosts.read().unwrap();
227        hosts.get(host_key).map(|h| h.addresses.clone())
228    }
229
230    /// Sorts a list of hosts according to their priority in the client's
231    /// preferred hosts queue. The function `f` is used to extract the
232    /// public key from each item.
233    fn prioritize<H, F>(&self, hosts: &mut [H], f: F)
234    where
235        F: Fn(&H) -> &PublicKey,
236    {
237        let preferred_hosts = self.preferred_hosts.read().unwrap();
238        hosts.sort_by(|a, b| {
239            preferred_hosts
240                .get_priority(f(b))
241                .cmp(&preferred_hosts.get_priority(f(a)))
242        });
243    }
244
245    /// Adds new hosts to the list if they don't already exist.
246    ///
247    /// If `clear` is true, existing hosts not in the new list are removed, but
248    /// their metrics are retained in case they reappear later.
249    fn update(&self, new_hosts: Vec<Host>, clear: bool) {
250        let mut hosts = self.hosts.write().unwrap();
251        if clear {
252            hosts.clear();
253        }
254        let mut priority = self.preferred_hosts.write().unwrap();
255        for host in new_hosts {
256            hosts.insert(
257                host.public_key,
258                HostInfo {
259                    addresses: host.addresses,
260                    good_for_upload: host.good_for_upload,
261                },
262            );
263            if !priority.contains(&host.public_key) {
264                priority.push(host.public_key, HostMetric::default());
265            }
266        }
267    }
268
269    /// Returns the number of known hosts that are good for upload.
270    fn available_for_upload(&self) -> usize {
271        self.hosts
272            .read()
273            .unwrap()
274            .iter()
275            .filter(|(_, h)| h.good_for_upload)
276            .count()
277    }
278
279    /// Creates a queue of hosts that are good to upload to for sequential
280    /// access sorted by priority.
281    fn upload_queue(&self) -> HostQueue {
282        let mut available_hosts = self
283            .hosts
284            .read()
285            .unwrap()
286            .iter()
287            .filter_map(|(hk, h)| h.good_for_upload.then_some(*hk))
288            .collect::<Vec<_>>();
289
290        self.prioritize(&mut available_hosts, |hk| hk);
291        HostQueue::new(available_hosts)
292    }
293
294    /// Adds a failure for the given host, updating its metrics and priority.
295    fn add_failure(&self, host_key: PublicKey) {
296        self.preferred_hosts
297            .write()
298            .unwrap()
299            .change_priority_by(&host_key, |metric| {
300                metric.add_failure();
301            });
302    }
303
304    /// Adds a read sample for the given host, updating its metrics and priority.
305    fn add_read_sample(&self, host_key: PublicKey, duration: Duration) {
306        self.preferred_hosts
307            .write()
308            .unwrap()
309            .change_priority_by(&host_key, |metric| {
310                metric.add_read_sample(duration);
311            });
312    }
313
314    /// Adds a write sample for the given host, updating its metrics and priority.
315    fn add_write_sample(&self, host_key: PublicKey, duration: Duration) {
316        self.preferred_hosts
317            .write()
318            .unwrap()
319            .change_priority_by(&host_key, |metric| {
320                metric.add_write_sample(duration);
321            });
322    }
323
324    fn add_settings_sample(&self, host_key: PublicKey, duration: Duration) {
325        self.preferred_hosts
326            .write()
327            .unwrap()
328            .change_priority_by(&host_key, |metric| {
329                metric.add_settings_sample(duration);
330            });
331    }
332}
333
334#[derive(Debug)]
335struct HostCache<T> {
336    items: RwLock<HashMap<PublicKey, T>>,
337}
338
339impl<T> HostCache<T> {
340    fn new() -> Self {
341        Self {
342            items: RwLock::new(HashMap::new()),
343        }
344    }
345
346    fn get(&self, host_key: &PublicKey) -> Option<T>
347    where
348        T: Clone,
349    {
350        let cache = self.items.read().unwrap();
351        cache.get(host_key).cloned()
352    }
353
354    fn set(&self, host_key: PublicKey, item: T) {
355        let mut cache = self.items.write().unwrap();
356        cache.insert(host_key, item);
357    }
358}
359
360/// Errors that can occur during host RPCs.
361#[derive(Debug, Error)]
362pub enum RPCError {
363    /// The host is not known to the SDK.
364    #[error("unknown host: {0}")]
365    UnknownHost(PublicKey),
366
367    /// An error in the RHP4 protocol.
368    #[error("RHP error: {0}")]
369    Rhp(#[from] crate::rhp4::Error),
370
371    /// The RPC timed out.
372    #[error("RPC time out after {0:?}")]
373    Elapsed(#[from] Elapsed),
374}
375
/// Manages a list of known hosts and their performance metrics.
///
/// It allows updating the list of hosts, recording performance samples,
/// and prioritizing hosts based on their metrics.
///
/// It can be safely shared across threads and cloned.
// NOTE(review): an earlier note claimed this type is public for criterion
// benchmarks, but it is declared `pub(crate)` — confirm whether benchmarks
// still need it exported.
#[derive(Clone)]
pub(crate) struct Hosts<T: Transport> {
    // Transport used to reach hosts; cloned per request/spawned task.
    transport: T,
    // Cached host prices, refreshed when expired (see `fetch_prices`).
    price_cache: Arc<HostCache<HostPrices>>,
    // Shared host registry and performance metrics.
    hosts: Arc<HostList>,
}
390
impl<T: Transport> Hosts<T> {
    /// Creates an empty host manager backed by the given transport.
    pub fn new(transport: T) -> Self {
        Self {
            transport,
            hosts: Arc::new(HostList::new()),
            price_cache: Arc::new(HostCache::new()),
        }
    }

    /// Resolves a host key to a connectable endpoint.
    ///
    /// # Errors
    /// [`RPCError::UnknownHost`] if the key is not in the host list.
    fn host_endpoint(&self, host_key: PublicKey) -> Result<HostEndpoint, RPCError> {
        let addresses = self.hosts.addresses(&host_key);
        match addresses {
            Some(addresses) => Ok(HostEndpoint {
                public_key: host_key,
                addresses,
            }),
            None => Err(RPCError::UnknownHost(host_key)),
        }
    }

    /// Sorts a list of hosts according to their priority in the client's
    /// preferred hosts queue. The function `f` is used to extract the
    /// public key from each item.
    pub fn prioritize<H, F>(&self, hosts: &mut [H], f: F)
    where
        F: Fn(&H) -> &PublicKey,
    {
        self.hosts.prioritize(hosts, f)
    }

    /// Adds new hosts to the list if they don't already exist
    ///
    /// If `clear` is true, existing hosts not in the new list are removed, but their metrics are retained.
    pub fn update(&self, new_hosts: Vec<Host>, clear: bool) {
        self.hosts.update(new_hosts, clear);
    }

    /// Returns the number of known hosts that are good for upload.
    pub fn available_for_upload(&self) -> usize {
        self.hosts.available_for_upload()
    }

    /// Creates a queue of hosts that are good to upload to for sequential
    /// access sorted by priority.
    pub fn upload_queue(&self) -> HostQueue {
        self.hosts.upload_queue()
    }

    /// Adds a failure for the given host, updating its metrics and priority.
    pub fn add_failure(&self, host_key: PublicKey) {
        self.hosts.add_failure(host_key);
    }

    /// Warms connections to the given hosts by prefetching their prices. This can help seed
    /// the RPC performance metrics for new hosts before they're used for actual uploads
    /// or downloads.
    pub async fn warm_connections(&self, hosts: Vec<HostEndpoint>) {
        let hosts_len = hosts.len();
        let mut warmed_conns: usize = 0;
        let mut inflight_scans = JoinSet::new();
        // Bounds concurrent prefetches; 15 looks empirically chosen — TODO confirm.
        let sema = Arc::new(Semaphore::new(15));
        for host in hosts {
            let transport = self.transport.clone();
            let price_cache = self.price_cache.clone();
            let hosts = self.hosts.clone();

            let sema = sema.clone();
            join_set_spawn!(inflight_scans, async move {
                let _permit = sema.acquire().await.unwrap();
                let start = Instant::now();

                // Only a fresh fetch (`pulled == true`) counts as warming a
                // connection; a cache hit performs no network round trip.
                match Self::fetch_prices(
                    transport,
                    &price_cache,
                    &hosts,
                    &host,
                    Duration::from_secs(1),
                    false,
                )
                .await
                {
                    Ok((_, pulled)) if pulled => {
                        debug!(
                            "warmed connection to host {} in {:?}",
                            host.public_key,
                            start.elapsed()
                        );
                        true
                    }
                    _ => false,
                }
            });
        }

        while let Some(res) = inflight_scans.join_next().await {
            if let Ok(warmed) = res
                && warmed
            {
                warmed_conns += 1;
            }
        }
        debug!("warmed {warmed_conns}/{hosts_len} connections");
    }

    /// Returns the host's prices, preferring a still-valid cached copy unless
    /// `refresh` is set. The returned bool is true when the prices were
    /// fetched from the host rather than served from the cache.
    ///
    /// Records a failure metric on timeout or RPC error, and a settings
    /// latency sample on success.
    async fn fetch_prices(
        transport: T,
        cache: &HostCache<HostPrices>,
        hosts: &HostList,
        host_endpoint: &HostEndpoint,
        fetch_timeout: Duration,
        refresh: bool,
    ) -> Result<(HostPrices, bool), RPCError> {
        if !refresh
            && let Some(prices) = cache.get(&host_endpoint.public_key)
            && prices.valid_until > Utc::now()
        {
            Ok((prices, false))
        } else {
            let start = Instant::now();
            // The first `?` unwraps the timeout layer, the second the RPC
            // result; a failure metric is recorded in either error case.
            let prices = timeout(fetch_timeout, transport.host_prices(host_endpoint))
                .await
                .inspect_err(|_| hosts.add_failure(host_endpoint.public_key))?
                .inspect_err(|_| hosts.add_failure(host_endpoint.public_key))
                .inspect(|_| {
                    hosts.add_settings_sample(host_endpoint.public_key, start.elapsed())
                })?;
            cache.set(host_endpoint.public_key, prices.clone());
            Ok((prices, true))
        }
    }

    /// Writes a sector to the host and returns its root hash.
    ///
    /// Fetches (possibly cached) prices first, then performs the write RPC
    /// under `write_timeout`, recording success latency or failure metrics.
    ///
    /// # Errors
    /// [`RPCError::UnknownHost`], [`RPCError::Elapsed`] on timeout, or
    /// [`RPCError::Rhp`] on protocol errors.
    pub async fn write_sector(
        &self,
        host_key: PublicKey,
        account_key: &PrivateKey,
        sector: bytes::Bytes,
        write_timeout: Duration,
    ) -> Result<Hash256, RPCError> {
        let host = self.host_endpoint(host_key)?;
        let (prices, _) = Self::fetch_prices(
            self.transport.clone(),
            &self.price_cache,
            &self.hosts,
            &host,
            Duration::from_secs(1),
            false,
        )
        .await?;
        let start = Instant::now();
        timeout(
            write_timeout,
            self.transport
                .write_sector(&host, prices, account_key, sector),
        )
        .await
        .inspect_err(|_| self.hosts.add_failure(host_key))?
        .inspect_err(|_| self.hosts.add_failure(host_key))
        .inspect(|_| {
            self.hosts.add_write_sample(host_key, start.elapsed());
        })
        .map_err(RPCError::Rhp)
    }

    /// Reads `length` bytes at `offset` from the sector with the given root.
    ///
    /// Fetches (possibly cached) prices first, then performs the read RPC
    /// under `read_timeout`, recording success latency or failure metrics.
    ///
    /// # Errors
    /// [`RPCError::UnknownHost`], [`RPCError::Elapsed`] on timeout, or
    /// [`RPCError::Rhp`] on protocol errors.
    pub async fn read_sector(
        &self,
        host_key: PublicKey,
        account_key: &PrivateKey,
        root: Hash256,
        offset: usize,
        length: usize,
        read_timeout: Duration,
    ) -> Result<bytes::Bytes, RPCError> {
        let host = self.host_endpoint(host_key)?;
        let (prices, _) = Self::fetch_prices(
            self.transport.clone(),
            &self.price_cache,
            &self.hosts,
            &host,
            Duration::from_secs(1),
            false,
        )
        .await?;
        let start = Instant::now();
        timeout(
            read_timeout,
            self.transport
                .read_sector(&host, prices, account_key, root, offset, length),
        )
        .await
        .inspect_err(|_| self.hosts.add_failure(host_key))?
        .inspect_err(|_| self.hosts.add_failure(host_key))
        .inspect(|_| {
            self.hosts.add_read_sample(host_key, start.elapsed());
        })
        .map_err(RPCError::Rhp)
    }
}
588
/// Errors from the host selection queue.
#[derive(Debug, Error)]
pub enum QueueError {
    /// All available hosts have been tried and failed.
    #[error("no more hosts available")]
    NoMoreHosts,
    /// Not enough hosts are available to meet the required shard count.
    // NOTE(review): `pop_n` returns `NoMoreHosts` for a short queue; confirm
    // whether this variant was intended there (tests currently pin NoMoreHosts).
    #[error("not enough initial hosts")]
    InsufficientHosts,
    /// The host queue has been closed.
    #[error("client closed")]
    Closed,
    /// An internal mutex was poisoned.
    #[error("internal mutex error")]
    MutexError,
}
605
/// State guarded by the `HostQueue` mutex.
#[derive(Debug)]
struct HostQueueInner {
    // Hosts still available, consumed from the front.
    hosts: VecDeque<PublicKey>,
    // Completed attempt count per host, bumped by `retry`.
    attempts: HashMap<PublicKey, usize>,
}
611
/// A thread-safe queue of host public keys.
///
/// Cloning is cheap and shares the same underlying queue state.
#[derive(Debug, Clone)]
pub(crate) struct HostQueue {
    inner: Arc<Mutex<HostQueueInner>>,
}
617
618impl Iterator for HostQueue {
619    type Item = PublicKey;
620
621    fn next(&mut self) -> Option<Self::Item> {
622        self.pop_front().ok().map(|(host, _)| host)
623    }
624}
625
626impl HostQueue {
627    pub(crate) fn new(hosts: Vec<PublicKey>) -> Self {
628        Self {
629            inner: Arc::new(Mutex::new(HostQueueInner {
630                hosts: VecDeque::from(hosts),
631                attempts: HashMap::new(),
632            })),
633        }
634    }
635
636    pub fn pop_front(&self) -> Result<(PublicKey, usize), QueueError> {
637        let mut inner = self.inner.lock().map_err(|_| QueueError::MutexError)?;
638        let host_key = inner.hosts.pop_front().ok_or(QueueError::NoMoreHosts)?;
639
640        let attempts = inner.attempts.get(&host_key).cloned().unwrap_or(0);
641        Ok((host_key, attempts + 1))
642    }
643
644    pub fn pop_n(&self, n: usize) -> Result<Vec<(PublicKey, usize)>, QueueError> {
645        let mut inner = self.inner.lock().map_err(|_| QueueError::MutexError)?;
646        if inner.hosts.len() < n {
647            return Err(QueueError::NoMoreHosts);
648        }
649        let mut result = Vec::with_capacity(n);
650        for _ in 0..n {
651            let host_key = inner.hosts.pop_front().ok_or(QueueError::NoMoreHosts)?;
652            let attempts = inner.attempts.get(&host_key).cloned().unwrap_or(0);
653            result.push((host_key, attempts + 1));
654        }
655        Ok(result)
656    }
657
658    pub fn retry(&self, host: PublicKey) -> Result<(), QueueError> {
659        let mut inner = self.inner.lock().map_err(|_| QueueError::MutexError)?;
660        inner.hosts.push_back(host);
661        inner
662            .attempts
663            .entry(host)
664            .and_modify(|e| *e += 1)
665            .or_insert(1);
666        Ok(())
667    }
668}
669
#[cfg(test)]
mod test {
    use sia_core::signing::PrivateKey;

    use crate::mock::MockRHP4Transport;

    use super::*;

    /// Derives a fresh public key from random seed bytes.
    fn random_pubkey() -> sia_core::signing::PublicKey {
        let mut seed = [0u8; 32];
        getrandom::fill(&mut seed).unwrap();
        PrivateKey::from_seed(&seed).public_key()
    }

    // Tests are wrapped in `cross_target_tests!` — presumably to run on both
    // native and wasm targets; confirm against the macro definition.
    cross_target_tests! {
    async fn test_failure_rate() {
        let mut fr = FailureRate::default();
        assert_eq!(fr.rate(), 0, "initial failure rate should be 0%");
        fr.add_sample(false);
        assert_eq!(fr.rate(), 100, "initial failure should be 100%");

        for _ in 0..5 {
            fr.add_sample(true);
        }
        assert!(
            fr.rate() < 100,
            "failure rate should decrease after successes"
        );

        let mut fr2 = FailureRate::default();
        for _ in 0..5 {
            fr2.add_sample(true);
        }
        assert_eq!(
            fr2.rate(),
            0,
            "failure rate should be 0% after only successes"
        );
        assert_eq!(
            fr.cmp(&fr2),
            std::cmp::Ordering::Greater,
            "higher failure rate should be greater"
        );
    }

    async fn test_rpc_average() {
        let mut avg = RPCAverage::default();
        avg.add_sample(Duration::from_millis(100));
        assert_eq!(
            avg.avg(),
            Duration::from_millis(100),
            "initial average should be first sample"
        );

        avg.add_sample(Duration::from_millis(200));
        assert!(
            avg.avg() > Duration::from_millis(100),
            "average should increase after higher sample"
        );

        avg.add_sample(Duration::from_millis(50));
        assert!(
            avg.avg() < Duration::from_millis(200),
            "average should decrease after lower sample"
        );

        let mut avg2 = RPCAverage::default();
        avg2.add_sample(Duration::from_millis(150));
        assert_eq!(
            avg.cmp(&avg2),
            std::cmp::Ordering::Less,
            "lower average should be lesser"
        );
    }

    async fn test_host_metric_ordering() {
        let mut hosts = vec![
            HostMetric::default(),
            HostMetric::default(),
            HostMetric::default(),
        ];
        hosts[0].failure_rate.add_sample(false);
        hosts[1].failure_rate.add_sample(false);
        hosts[2].failure_rate.add_sample(false);
        for _ in 0..10 {
            hosts[0].failure_rate.add_sample(true);
        }
        for _ in 0..5 {
            hosts[1].failure_rate.add_sample(true);
        }
        hosts.sort();

        let rates = hosts
            .into_iter()
            .rev()
            .map(|h| h.failure_rate)
            .collect::<Vec<FailureRate>>();
        assert!(
            rates.is_sorted(),
            "hosts should be sorted by failure rate desc"
        );

        let mut hosts = vec![
            HostMetric::default(),
            HostMetric::default(),
            HostMetric::default(),
        ];
        hosts[0]
            .rpc_write_avg
            .add_sample(Duration::from_millis(100));
        hosts[1]
            .rpc_write_avg
            .add_sample(Duration::from_millis(1000));
        hosts[2]
            .rpc_write_avg
            .add_sample(Duration::from_millis(500));
        hosts.sort();

        let rates = hosts
            .into_iter()
            .rev()
            .map(|h| h.rpc_write_avg)
            .collect::<Vec<RPCAverage>>();
        assert!(
            rates.is_sorted(),
            "hosts should be sorted by rpc write avg desc"
        );
    }

    async fn test_host_priority_queue() {
        let mut pq = PriorityQueue::<PublicKey, HostMetric>::new();
        let mut hosts = vec![];
        for _ in 0..5 {
            let pk = random_pubkey();
            pq.push(pk, HostMetric::default());
            hosts.push(pk);
        }

        // initially, the order is the same as insertion
        assert_eq!(pq.peek().unwrap().0, &hosts[0]);

        // fourth host has a sample, should have highest priority
        pq.change_priority_by(&hosts[3], |metric| {
            metric.add_write_sample(Duration::from_millis(100));
        });
        assert_eq!(pq.peek().unwrap().0, &hosts[3]);

        // add a faster sample to second host, should have higher priority than fourth
        pq.change_priority_by(&hosts[1], |metric| {
            metric.add_read_sample(Duration::from_millis(50));
        });
        assert_eq!(pq.peek().unwrap().0, &hosts[1]);

        // add a failure to the second host, should lower its priority below fourth
        pq.change_priority_by(&hosts[1], |metric| {
            metric.add_failure();
        });
        assert_eq!(pq.peek().unwrap().0, &hosts[3]);
    }

    async fn test_upload_queue() {
        let hosts_manager = Hosts::new(MockRHP4Transport::new());

        let hk1 = random_pubkey();
        let hk2 = random_pubkey();
        let hk3 = random_pubkey();

        hosts_manager.update(
            vec![
                Host {
                    public_key: hk1,
                    addresses: vec![],
                    country_code: String::new(),
                    latitude: 0.0,
                    longitude: 0.0,
                    good_for_upload: false,
                },
                Host {
                    public_key: hk2,
                    addresses: vec![],
                    country_code: String::new(),
                    latitude: 0.0,
                    longitude: 0.0,
                    good_for_upload: true,
                },
                Host {
                    public_key: hk3,
                    addresses: vec![],
                    country_code: String::new(),
                    latitude: 0.0,
                    longitude: 0.0,
                    good_for_upload: false,
                },
            ],
            true,
        );

        // only hk2 is good_for_upload, so the queue holds exactly one host
        let queue = hosts_manager.upload_queue();
        let (first, _) = queue.pop_front().unwrap();
        assert_eq!(first, hk2);
        assert!(
            queue.pop_front().is_err(),
            "queue should only have one host"
        );
    }

    async fn test_host_queue_pop_n() {
        let hosts: Vec<_> = (0..5)
            .map(|_| random_pubkey())
            .collect();
        let queue = HostQueue::new(hosts.clone());

        // pop 3 hosts
        let popped = queue.pop_n(3).expect("should pop 3 hosts");
        assert_eq!(popped.len(), 3);
        assert_eq!(popped[0].0, hosts[0]);
        assert_eq!(popped[1].0, hosts[1]);
        assert_eq!(popped[2].0, hosts[2]);

        // all should have attempts = 1
        assert!(popped.iter().all(|(_, attempts)| *attempts == 1));

        // pop remaining 2
        let popped = queue.pop_n(2).expect("should pop 2 hosts");
        assert_eq!(popped.len(), 2);
        assert_eq!(popped[0].0, hosts[3]);
        assert_eq!(popped[1].0, hosts[4]);

        // queue should be empty
        assert!(matches!(queue.pop_front(), Err(QueueError::NoMoreHosts)));
    }

    async fn test_host_queue_pop_n_not_enough_hosts() {
        let hosts: Vec<_> = (0..3)
            .map(|_| random_pubkey())
            .collect();
        let queue = HostQueue::new(hosts.clone());

        // try to pop more than available
        let result = queue.pop_n(5);
        assert!(matches!(result, Err(QueueError::NoMoreHosts)));

        // queue should be unchanged - can still pop all 3
        let popped = queue.pop_n(3).expect("should pop 3");
        assert_eq!(popped.len(), 3);
    }

    async fn test_host_queue_pop_n_zero() {
        let hosts: Vec<_> = (0..3)
            .map(|_| random_pubkey())
            .collect();
        let queue = HostQueue::new(hosts);

        // pop 0 hosts should succeed with empty vec
        let popped = queue.pop_n(0).expect("should succeed");
        assert!(popped.is_empty());

        // queue should be unchanged - can still pop 3
        let popped = queue.pop_n(3).expect("should pop 3");
        assert_eq!(popped.len(), 3);
    }
    }
}