1use std::collections::{HashMap, VecDeque};
2use std::fmt::{Debug, Display};
3use std::sync::{Arc, RwLock};
4
5use chrono::Utc;
6use log::debug;
7use priority_queue::PriorityQueue;
8use serde::{Deserialize, Serialize};
9use sia_core::rhp4::HostPrices;
10use sia_core::signing::{PrivateKey, PublicKey};
11use sia_core::types::Hash256;
12use sia_core::types::v2::NetAddress;
13use std::sync::Mutex;
14use thiserror::Error;
15use tokio::sync::Semaphore;
16use tokio::task::JoinSet;
17
18use crate::rhp4::{HostEndpoint, Transport};
19use crate::time::{Duration, Elapsed, Instant, timeout};
20
21#[derive(Debug, PartialEq, Serialize, Deserialize)]
24#[serde(rename_all = "camelCase")]
25pub struct Host {
27 pub public_key: PublicKey,
29 pub addresses: Vec<NetAddress>,
31 pub country_code: String,
33 pub latitude: f64,
35 pub longitude: f64,
37 pub good_for_upload: bool,
39}
40
41#[derive(Debug, Default, Clone)]
42struct RPCAverage(Option<f64>); impl RPCAverage {
45 const ALPHA: f64 = 0.2;
46 fn add_sample(&mut self, sample: Duration) {
47 match self.0 {
48 Some(avg) => {
49 self.0 =
50 Some(Self::ALPHA * (sample.as_millis() as f64) + (1.0 - Self::ALPHA) * avg);
51 }
52 None => {
53 self.0 = Some(sample.as_millis() as f64);
54 }
55 }
56 }
57
58 fn avg(&self) -> Duration {
59 match self.0 {
60 Some(avg) => Duration::from_millis(avg as u64),
61 None => Duration::from_secs(3600), }
63 }
64}
65
66impl Display for RPCAverage {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 self.avg().fmt(f)
69 }
70}
71
72impl PartialEq for RPCAverage {
73 fn eq(&self, other: &Self) -> bool {
74 self.avg() == other.avg()
75 }
76}
77
78impl Eq for RPCAverage {}
79
80impl Ord for RPCAverage {
81 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
82 self.avg().cmp(&other.avg())
83 }
84}
85
86impl PartialOrd for RPCAverage {
87 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
88 Some(self.cmp(other))
89 }
90}
91
92#[derive(Debug, Default, Clone)]
93struct FailureRate(Option<f64>); impl FailureRate {
96 const ALPHA: f64 = 0.2;
97
98 fn add_sample(&mut self, success: bool) {
99 let sample = if success { 0.0 } else { 1.0 };
100 match self.0 {
101 Some(rate) => {
102 self.0 = Some(Self::ALPHA * sample + (1.0 - Self::ALPHA) * rate);
103 }
104 None => {
105 self.0 = Some(sample);
106 }
107 }
108 }
109
110 fn rate(&self) -> i64 {
112 match self.0 {
113 Some(rate) => (rate * 100.0).round() as i64,
114 None => 0, }
116 }
117}
118
119impl Display for FailureRate {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 write!(f, "{}%", self.rate())
122 }
123}
124
125impl PartialEq for FailureRate {
126 fn eq(&self, other: &Self) -> bool {
127 self.rate() == other.rate()
128 }
129}
130
131impl Eq for FailureRate {}
132
133impl PartialOrd for FailureRate {
134 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
135 Some(self.cmp(other))
136 }
137}
138
139impl Ord for FailureRate {
140 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
141 self.rate().cmp(&other.rate())
142 }
143}
144
145#[derive(Debug, Default, Clone, Eq, PartialEq)]
146struct HostMetric {
147 rpc_settings_avg: RPCAverage,
148 rpc_write_avg: RPCAverage,
149 rpc_read_avg: RPCAverage,
150 failure_rate: FailureRate,
151}
152
153impl HostMetric {
154 fn add_write_sample(&mut self, d: Duration) {
155 self.rpc_write_avg.add_sample(d);
156 self.failure_rate.add_sample(true);
157 }
158
159 fn add_read_sample(&mut self, d: Duration) {
160 self.rpc_read_avg.add_sample(d);
161 self.failure_rate.add_sample(true);
162 }
163
164 fn add_settings_sample(&mut self, d: Duration) {
165 self.rpc_settings_avg.add_sample(d);
166 self.failure_rate.add_sample(true);
167 }
168
169 fn add_failure(&mut self) {
170 self.failure_rate.add_sample(false);
171 }
172}
173
174impl Ord for HostMetric {
175 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
176 match other.failure_rate.cmp(&self.failure_rate) {
177 std::cmp::Ordering::Equal => {
179 let avg_self = (self
181 .rpc_write_avg
182 .avg()
183 .saturating_add(self.rpc_read_avg.avg()))
184 .saturating_add(self.rpc_settings_avg.avg())
185 / 3;
186 let avg_other = (other
187 .rpc_write_avg
188 .avg()
189 .saturating_add(other.rpc_read_avg.avg()))
190 .saturating_add(other.rpc_settings_avg.avg())
191 / 3;
192 avg_other.cmp(&avg_self) }
194 ord => ord,
195 }
196 }
197}
198
199impl PartialOrd for HostMetric {
200 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
201 Some(self.cmp(other))
202 }
203}
204
205#[derive(Debug)]
206struct HostInfo {
207 addresses: Vec<NetAddress>,
208 good_for_upload: bool,
209}
210
211#[derive(Debug)]
212struct HostList {
213 hosts: RwLock<HashMap<PublicKey, HostInfo>>,
214 preferred_hosts: RwLock<PriorityQueue<PublicKey, HostMetric>>,
215}
216
217impl HostList {
218 fn new() -> Self {
219 Self {
220 preferred_hosts: RwLock::new(PriorityQueue::new()),
221 hosts: RwLock::new(HashMap::new()),
222 }
223 }
224
225 fn addresses(&self, host_key: &PublicKey) -> Option<Vec<NetAddress>> {
226 let hosts = self.hosts.read().unwrap();
227 hosts.get(host_key).map(|h| h.addresses.clone())
228 }
229
230 fn prioritize<H, F>(&self, hosts: &mut [H], f: F)
234 where
235 F: Fn(&H) -> &PublicKey,
236 {
237 let preferred_hosts = self.preferred_hosts.read().unwrap();
238 hosts.sort_by(|a, b| {
239 preferred_hosts
240 .get_priority(f(b))
241 .cmp(&preferred_hosts.get_priority(f(a)))
242 });
243 }
244
245 fn update(&self, new_hosts: Vec<Host>, clear: bool) {
250 let mut hosts = self.hosts.write().unwrap();
251 if clear {
252 hosts.clear();
253 }
254 let mut priority = self.preferred_hosts.write().unwrap();
255 for host in new_hosts {
256 hosts.insert(
257 host.public_key,
258 HostInfo {
259 addresses: host.addresses,
260 good_for_upload: host.good_for_upload,
261 },
262 );
263 if !priority.contains(&host.public_key) {
264 priority.push(host.public_key, HostMetric::default());
265 }
266 }
267 }
268
269 fn available_for_upload(&self) -> usize {
271 self.hosts
272 .read()
273 .unwrap()
274 .iter()
275 .filter(|(_, h)| h.good_for_upload)
276 .count()
277 }
278
279 fn upload_queue(&self) -> HostQueue {
282 let mut available_hosts = self
283 .hosts
284 .read()
285 .unwrap()
286 .iter()
287 .filter_map(|(hk, h)| h.good_for_upload.then_some(*hk))
288 .collect::<Vec<_>>();
289
290 self.prioritize(&mut available_hosts, |hk| hk);
291 HostQueue::new(available_hosts)
292 }
293
294 fn add_failure(&self, host_key: PublicKey) {
296 self.preferred_hosts
297 .write()
298 .unwrap()
299 .change_priority_by(&host_key, |metric| {
300 metric.add_failure();
301 });
302 }
303
304 fn add_read_sample(&self, host_key: PublicKey, duration: Duration) {
306 self.preferred_hosts
307 .write()
308 .unwrap()
309 .change_priority_by(&host_key, |metric| {
310 metric.add_read_sample(duration);
311 });
312 }
313
314 fn add_write_sample(&self, host_key: PublicKey, duration: Duration) {
316 self.preferred_hosts
317 .write()
318 .unwrap()
319 .change_priority_by(&host_key, |metric| {
320 metric.add_write_sample(duration);
321 });
322 }
323
324 fn add_settings_sample(&self, host_key: PublicKey, duration: Duration) {
325 self.preferred_hosts
326 .write()
327 .unwrap()
328 .change_priority_by(&host_key, |metric| {
329 metric.add_settings_sample(duration);
330 });
331 }
332}
333
334#[derive(Debug)]
335struct HostCache<T> {
336 items: RwLock<HashMap<PublicKey, T>>,
337}
338
339impl<T> HostCache<T> {
340 fn new() -> Self {
341 Self {
342 items: RwLock::new(HashMap::new()),
343 }
344 }
345
346 fn get(&self, host_key: &PublicKey) -> Option<T>
347 where
348 T: Clone,
349 {
350 let cache = self.items.read().unwrap();
351 cache.get(host_key).cloned()
352 }
353
354 fn set(&self, host_key: PublicKey, item: T) {
355 let mut cache = self.items.write().unwrap();
356 cache.insert(host_key, item);
357 }
358}
359
360#[derive(Debug, Error)]
362pub enum RPCError {
363 #[error("unknown host: {0}")]
365 UnknownHost(PublicKey),
366
367 #[error("RHP error: {0}")]
369 Rhp(#[from] crate::rhp4::Error),
370
371 #[error("RPC time out after {0:?}")]
373 Elapsed(#[from] Elapsed),
374}
375
376#[derive(Clone)]
385pub(crate) struct Hosts<T: Transport> {
386 transport: T,
387 price_cache: Arc<HostCache<HostPrices>>,
388 hosts: Arc<HostList>,
389}
390
391impl<T: Transport> Hosts<T> {
392 pub fn new(transport: T) -> Self {
393 Self {
394 transport,
395 hosts: Arc::new(HostList::new()),
396 price_cache: Arc::new(HostCache::new()),
397 }
398 }
399
400 fn host_endpoint(&self, host_key: PublicKey) -> Result<HostEndpoint, RPCError> {
401 let addresses = self.hosts.addresses(&host_key);
402 match addresses {
403 Some(addresses) => Ok(HostEndpoint {
404 public_key: host_key,
405 addresses,
406 }),
407 None => Err(RPCError::UnknownHost(host_key)),
408 }
409 }
410
411 pub fn prioritize<H, F>(&self, hosts: &mut [H], f: F)
415 where
416 F: Fn(&H) -> &PublicKey,
417 {
418 self.hosts.prioritize(hosts, f)
419 }
420
421 pub fn update(&self, new_hosts: Vec<Host>, clear: bool) {
425 self.hosts.update(new_hosts, clear);
426 }
427
428 pub fn available_for_upload(&self) -> usize {
430 self.hosts.available_for_upload()
431 }
432
433 pub fn upload_queue(&self) -> HostQueue {
436 self.hosts.upload_queue()
437 }
438
439 pub fn add_failure(&self, host_key: PublicKey) {
441 self.hosts.add_failure(host_key);
442 }
443
444 pub async fn warm_connections(&self, hosts: Vec<HostEndpoint>) {
448 let hosts_len = hosts.len();
449 let mut warmed_conns: usize = 0;
450 let mut inflight_scans = JoinSet::new();
451 let sema = Arc::new(Semaphore::new(15));
452 for host in hosts {
453 let transport = self.transport.clone();
454 let price_cache = self.price_cache.clone();
455 let hosts = self.hosts.clone();
456
457 let sema = sema.clone();
458 join_set_spawn!(inflight_scans, async move {
459 let _permit = sema.acquire().await.unwrap();
460 let start = Instant::now();
461
462 match Self::fetch_prices(
463 transport,
464 &price_cache,
465 &hosts,
466 &host,
467 Duration::from_secs(1),
468 false,
469 )
470 .await
471 {
472 Ok((_, pulled)) if pulled => {
473 debug!(
474 "warmed connection to host {} in {:?}",
475 host.public_key,
476 start.elapsed()
477 );
478 true
479 }
480 _ => false,
481 }
482 });
483 }
484
485 while let Some(res) = inflight_scans.join_next().await {
486 if let Ok(warmed) = res
487 && warmed
488 {
489 warmed_conns += 1;
490 }
491 }
492 debug!("warmed {warmed_conns}/{hosts_len} connections");
493 }
494
495 async fn fetch_prices(
496 transport: T,
497 cache: &HostCache<HostPrices>,
498 hosts: &HostList,
499 host_endpoint: &HostEndpoint,
500 fetch_timeout: Duration,
501 refresh: bool,
502 ) -> Result<(HostPrices, bool), RPCError> {
503 if !refresh
504 && let Some(prices) = cache.get(&host_endpoint.public_key)
505 && prices.valid_until > Utc::now()
506 {
507 Ok((prices, false))
508 } else {
509 let start = Instant::now();
510 let prices = timeout(fetch_timeout, transport.host_prices(host_endpoint))
511 .await
512 .inspect_err(|_| hosts.add_failure(host_endpoint.public_key))?
513 .inspect_err(|_| hosts.add_failure(host_endpoint.public_key))
514 .inspect(|_| {
515 hosts.add_settings_sample(host_endpoint.public_key, start.elapsed())
516 })?;
517 cache.set(host_endpoint.public_key, prices.clone());
518 Ok((prices, true))
519 }
520 }
521
522 pub async fn write_sector(
523 &self,
524 host_key: PublicKey,
525 account_key: &PrivateKey,
526 sector: bytes::Bytes,
527 write_timeout: Duration,
528 ) -> Result<Hash256, RPCError> {
529 let host = self.host_endpoint(host_key)?;
530 let (prices, _) = Self::fetch_prices(
531 self.transport.clone(),
532 &self.price_cache,
533 &self.hosts,
534 &host,
535 Duration::from_secs(1),
536 false,
537 )
538 .await?;
539 let start = Instant::now();
540 timeout(
541 write_timeout,
542 self.transport
543 .write_sector(&host, prices, account_key, sector),
544 )
545 .await
546 .inspect_err(|_| self.hosts.add_failure(host_key))?
547 .inspect_err(|_| self.hosts.add_failure(host_key))
548 .inspect(|_| {
549 self.hosts.add_write_sample(host_key, start.elapsed());
550 })
551 .map_err(RPCError::Rhp)
552 }
553
554 pub async fn read_sector(
555 &self,
556 host_key: PublicKey,
557 account_key: &PrivateKey,
558 root: Hash256,
559 offset: usize,
560 length: usize,
561 read_timeout: Duration,
562 ) -> Result<bytes::Bytes, RPCError> {
563 let host = self.host_endpoint(host_key)?;
564 let (prices, _) = Self::fetch_prices(
565 self.transport.clone(),
566 &self.price_cache,
567 &self.hosts,
568 &host,
569 Duration::from_secs(1),
570 false,
571 )
572 .await?;
573 let start = Instant::now();
574 timeout(
575 read_timeout,
576 self.transport
577 .read_sector(&host, prices, account_key, root, offset, length),
578 )
579 .await
580 .inspect_err(|_| self.hosts.add_failure(host_key))?
581 .inspect_err(|_| self.hosts.add_failure(host_key))
582 .inspect(|_| {
583 self.hosts.add_read_sample(host_key, start.elapsed());
584 })
585 .map_err(RPCError::Rhp)
586 }
587}
588
589#[derive(Debug, Error)]
591pub enum QueueError {
592 #[error("no more hosts available")]
594 NoMoreHosts,
595 #[error("not enough initial hosts")]
597 InsufficientHosts,
598 #[error("client closed")]
600 Closed,
601 #[error("internal mutex error")]
603 MutexError,
604}
605
606#[derive(Debug)]
607struct HostQueueInner {
608 hosts: VecDeque<PublicKey>,
609 attempts: HashMap<PublicKey, usize>,
610}
611
612#[derive(Debug, Clone)]
614pub(crate) struct HostQueue {
615 inner: Arc<Mutex<HostQueueInner>>,
616}
617
618impl Iterator for HostQueue {
619 type Item = PublicKey;
620
621 fn next(&mut self) -> Option<Self::Item> {
622 self.pop_front().ok().map(|(host, _)| host)
623 }
624}
625
626impl HostQueue {
627 pub(crate) fn new(hosts: Vec<PublicKey>) -> Self {
628 Self {
629 inner: Arc::new(Mutex::new(HostQueueInner {
630 hosts: VecDeque::from(hosts),
631 attempts: HashMap::new(),
632 })),
633 }
634 }
635
636 pub fn pop_front(&self) -> Result<(PublicKey, usize), QueueError> {
637 let mut inner = self.inner.lock().map_err(|_| QueueError::MutexError)?;
638 let host_key = inner.hosts.pop_front().ok_or(QueueError::NoMoreHosts)?;
639
640 let attempts = inner.attempts.get(&host_key).cloned().unwrap_or(0);
641 Ok((host_key, attempts + 1))
642 }
643
644 pub fn pop_n(&self, n: usize) -> Result<Vec<(PublicKey, usize)>, QueueError> {
645 let mut inner = self.inner.lock().map_err(|_| QueueError::MutexError)?;
646 if inner.hosts.len() < n {
647 return Err(QueueError::NoMoreHosts);
648 }
649 let mut result = Vec::with_capacity(n);
650 for _ in 0..n {
651 let host_key = inner.hosts.pop_front().ok_or(QueueError::NoMoreHosts)?;
652 let attempts = inner.attempts.get(&host_key).cloned().unwrap_or(0);
653 result.push((host_key, attempts + 1));
654 }
655 Ok(result)
656 }
657
658 pub fn retry(&self, host: PublicKey) -> Result<(), QueueError> {
659 let mut inner = self.inner.lock().map_err(|_| QueueError::MutexError)?;
660 inner.hosts.push_back(host);
661 inner
662 .attempts
663 .entry(host)
664 .and_modify(|e| *e += 1)
665 .or_insert(1);
666 Ok(())
667 }
668}
669
670#[cfg(test)]
671mod test {
672 use sia_core::signing::PrivateKey;
673
674 use crate::mock::MockRHP4Transport;
675
676 use super::*;
677
678 fn random_pubkey() -> sia_core::signing::PublicKey {
679 let mut seed = [0u8; 32];
680 getrandom::fill(&mut seed).unwrap();
681 PrivateKey::from_seed(&seed).public_key()
682 }
683
684 cross_target_tests! {
685 async fn test_failure_rate() {
686 let mut fr = FailureRate::default();
687 assert_eq!(fr.rate(), 0, "initial failure rate should be 0%");
688 fr.add_sample(false);
689 assert_eq!(fr.rate(), 100, "initial failure should be 100%");
690
691 for _ in 0..5 {
692 fr.add_sample(true);
693 }
694 assert!(
695 fr.rate() < 100,
696 "failure rate should decrease after successes"
697 );
698
699 let mut fr2 = FailureRate::default();
700 for _ in 0..5 {
701 fr2.add_sample(true);
702 }
703 assert_eq!(
704 fr2.rate(),
705 0,
706 "failure rate should be 0% after only successes"
707 );
708 assert_eq!(
709 fr.cmp(&fr2),
710 std::cmp::Ordering::Greater,
711 "higher failure rate should be greater"
712 );
713 }
714
715 async fn test_rpc_average() {
716 let mut avg = RPCAverage::default();
717 avg.add_sample(Duration::from_millis(100));
718 assert_eq!(
719 avg.avg(),
720 Duration::from_millis(100),
721 "initial average should be first sample"
722 );
723
724 avg.add_sample(Duration::from_millis(200));
725 assert!(
726 avg.avg() > Duration::from_millis(100),
727 "average should increase after higher sample"
728 );
729
730 avg.add_sample(Duration::from_millis(50));
731 assert!(
732 avg.avg() < Duration::from_millis(200),
733 "average should decrease after lower sample"
734 );
735
736 let mut avg2 = RPCAverage::default();
737 avg2.add_sample(Duration::from_millis(150));
738 assert_eq!(
739 avg.cmp(&avg2),
740 std::cmp::Ordering::Less,
741 "lower average should be lesser"
742 );
743 }
744
745 async fn test_host_metric_ordering() {
746 let mut hosts = vec![
747 HostMetric::default(),
748 HostMetric::default(),
749 HostMetric::default(),
750 ];
751 hosts[0].failure_rate.add_sample(false);
752 hosts[1].failure_rate.add_sample(false);
753 hosts[2].failure_rate.add_sample(false);
754 for _ in 0..10 {
755 hosts[0].failure_rate.add_sample(true);
756 }
757 for _ in 0..5 {
758 hosts[1].failure_rate.add_sample(true);
759 }
760 hosts.sort();
761
762 let rates = hosts
763 .into_iter()
764 .rev()
765 .map(|h| h.failure_rate)
766 .collect::<Vec<FailureRate>>();
767 assert!(
768 rates.is_sorted(),
769 "hosts should be sorted by failure rate desc"
770 );
771
772 let mut hosts = vec![
773 HostMetric::default(),
774 HostMetric::default(),
775 HostMetric::default(),
776 ];
777 hosts[0]
778 .rpc_write_avg
779 .add_sample(Duration::from_millis(100));
780 hosts[1]
781 .rpc_write_avg
782 .add_sample(Duration::from_millis(1000));
783 hosts[2]
784 .rpc_write_avg
785 .add_sample(Duration::from_millis(500));
786 hosts.sort();
787
788 let rates = hosts
789 .into_iter()
790 .rev()
791 .map(|h| h.rpc_write_avg)
792 .collect::<Vec<RPCAverage>>();
793 assert!(
794 rates.is_sorted(),
795 "hosts should be sorted by rpc write avg desc"
796 );
797 }
798
799 async fn test_host_priority_queue() {
800 let mut pq = PriorityQueue::<PublicKey, HostMetric>::new();
801 let mut hosts = vec![];
802 for _ in 0..5 {
803 let pk = random_pubkey();
804 pq.push(pk, HostMetric::default());
805 hosts.push(pk);
806 }
807
808 assert_eq!(pq.peek().unwrap().0, &hosts[0]);
810
811 pq.change_priority_by(&hosts[3], |metric| {
813 metric.add_write_sample(Duration::from_millis(100));
814 });
815 assert_eq!(pq.peek().unwrap().0, &hosts[3]);
816
817 pq.change_priority_by(&hosts[1], |metric| {
819 metric.add_read_sample(Duration::from_millis(50));
820 });
821 assert_eq!(pq.peek().unwrap().0, &hosts[1]);
822
823 pq.change_priority_by(&hosts[1], |metric| {
825 metric.add_failure();
826 });
827 assert_eq!(pq.peek().unwrap().0, &hosts[3]);
828 }
829
830 async fn test_upload_queue() {
831 let hosts_manager = Hosts::new(MockRHP4Transport::new());
832
833 let hk1 = random_pubkey();
834 let hk2 = random_pubkey();
835 let hk3 = random_pubkey();
836
837 hosts_manager.update(
838 vec![
839 Host {
840 public_key: hk1,
841 addresses: vec![],
842 country_code: String::new(),
843 latitude: 0.0,
844 longitude: 0.0,
845 good_for_upload: false,
846 },
847 Host {
848 public_key: hk2,
849 addresses: vec![],
850 country_code: String::new(),
851 latitude: 0.0,
852 longitude: 0.0,
853 good_for_upload: true,
854 },
855 Host {
856 public_key: hk3,
857 addresses: vec![],
858 country_code: String::new(),
859 latitude: 0.0,
860 longitude: 0.0,
861 good_for_upload: false,
862 },
863 ],
864 true,
865 );
866
867 let queue = hosts_manager.upload_queue();
868 let (first, _) = queue.pop_front().unwrap();
869 assert_eq!(first, hk2);
870 assert!(
871 queue.pop_front().is_err(),
872 "queue should only have one host"
873 );
874 }
875
876 async fn test_host_queue_pop_n() {
877 let hosts: Vec<_> = (0..5)
878 .map(|_| random_pubkey())
879 .collect();
880 let queue = HostQueue::new(hosts.clone());
881
882 let popped = queue.pop_n(3).expect("should pop 3 hosts");
884 assert_eq!(popped.len(), 3);
885 assert_eq!(popped[0].0, hosts[0]);
886 assert_eq!(popped[1].0, hosts[1]);
887 assert_eq!(popped[2].0, hosts[2]);
888
889 assert!(popped.iter().all(|(_, attempts)| *attempts == 1));
891
892 let popped = queue.pop_n(2).expect("should pop 2 hosts");
894 assert_eq!(popped.len(), 2);
895 assert_eq!(popped[0].0, hosts[3]);
896 assert_eq!(popped[1].0, hosts[4]);
897
898 assert!(matches!(queue.pop_front(), Err(QueueError::NoMoreHosts)));
900 }
901
902 async fn test_host_queue_pop_n_not_enough_hosts() {
903 let hosts: Vec<_> = (0..3)
904 .map(|_| random_pubkey())
905 .collect();
906 let queue = HostQueue::new(hosts.clone());
907
908 let result = queue.pop_n(5);
910 assert!(matches!(result, Err(QueueError::NoMoreHosts)));
911
912 let popped = queue.pop_n(3).expect("should pop 3");
914 assert_eq!(popped.len(), 3);
915 }
916
917 async fn test_host_queue_pop_n_zero() {
918 let hosts: Vec<_> = (0..3)
919 .map(|_| random_pubkey())
920 .collect();
921 let queue = HostQueue::new(hosts);
922
923 let popped = queue.pop_n(0).expect("should succeed");
925 assert!(popped.is_empty());
926
927 let popped = queue.pop_n(3).expect("should pop 3");
929 assert_eq!(popped.len(), 3);
930 }
931 }
932}