use std::{
    collections::{HashMap, HashSet},
    fmt::Debug,
    future::{Future, IntoFuture},
    io,
    ops::Deref,
    sync::Arc,
    time::{Duration, SystemTime},
};

use anyhow::bail;
use genawaiter::sync::Gen;
use iroh::{endpoint::Connection, Endpoint, NodeId};
use irpc::{channel::mpsc, rpc_requests};
use n0_future::{future, stream, BufferedStreamExt, Stream, StreamExt};
use rand::seq::SliceRandom;
use serde::{de::Error, Deserialize, Serialize};
use tokio::{sync::Mutex, task::JoinSet};
use tokio_util::time::FutureExt;
use tracing::{info, instrument::Instrument, warn};

use super::{remote::GetConnection, Store};
use crate::{
    protocol::{GetManyRequest, GetRequest},
    util::sink::{Drain, IrpcSenderRefSink, Sink, TokioMpscSenderSink},
    BlobFormat, Hash, HashAndFormat,
};

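/// Client handle for the swarm downloader.
///
/// A `Downloader` talks to a background [`DownloaderActor`] over an irpc
/// channel and hands back a [`DownloadProgress`] for each download request.
///
/// A minimal usage sketch, assuming a `Store`, an `Endpoint`, a `hash` to
/// fetch and a candidate provider `node_id` are already available:
///
/// ```ignore
/// let downloader = Downloader::new(&store, &endpoint);
/// // download a single blob from a shuffled set of candidate providers
/// downloader
///     .download(hash, Shuffled::new(vec![node_id]))
///     .await?;
/// ```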
#[derive(Debug, Clone)]
pub struct Downloader {
    client: irpc::Client<SwarmProtocol>,
}

#[rpc_requests(message = SwarmMsg, alias = "Msg")]
#[derive(Debug, Serialize, Deserialize)]
enum SwarmProtocol {
    #[rpc(tx = mpsc::Sender<DownloadProgessItem>)]
    Download(DownloadRequest),
}

struct DownloaderActor {
    store: Store,
    pool: ConnectionPool,
    tasks: JoinSet<()>,
    running: HashSet<tokio::task::Id>,
}

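/// Progress items emitted while a download request is being executed.
///
/// The `Error` variant is not serializable and is skipped when the progress
/// stream crosses an rpc boundary.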
#[derive(Debug, Serialize, Deserialize)]
pub enum DownloadProgessItem {
    #[serde(skip)]
    Error(anyhow::Error),
    TryProvider {
        id: NodeId,
        request: Arc<GetRequest>,
    },
    ProviderFailed {
        id: NodeId,
        request: Arc<GetRequest>,
    },
    PartComplete {
        request: Arc<GetRequest>,
    },
    Progress(u64),
    DownloadError,
}

impl DownloaderActor {
    fn new(store: Store, endpoint: Endpoint) -> Self {
        Self {
            store,
            pool: ConnectionPool::new(endpoint, crate::ALPN.to_vec()),
            tasks: JoinSet::new(),
            running: HashSet::new(),
        }
    }

    async fn run(mut self, mut rx: tokio::sync::mpsc::Receiver<SwarmMsg>) {
        while let Some(msg) = rx.recv().await {
            match msg {
                SwarmMsg::Download(request) => {
                    self.spawn(handle_download(
                        self.store.clone(),
                        self.pool.clone(),
                        request,
                    ));
                }
            }
        }
    }

    fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
        let span = tracing::Span::current();
        let id = self.tasks.spawn(fut.instrument(span)).id();
        self.running.insert(id);
    }
}

async fn handle_download(store: Store, pool: ConnectionPool, msg: DownloadMsg) {
    let DownloadMsg { inner, mut tx, .. } = msg;
    if let Err(cause) = handle_download_impl(store, pool, inner, &mut tx).await {
        tx.send(DownloadProgessItem::Error(cause)).await.ok();
    }
}

async fn handle_download_impl(
    store: Store,
    pool: ConnectionPool,
    request: DownloadRequest,
    tx: &mut mpsc::Sender<DownloadProgessItem>,
) -> anyhow::Result<()> {
    match request.strategy {
        SplitStrategy::Split => handle_download_split_impl(store, pool, request, tx).await?,
        SplitStrategy::None => match request.request {
            FiniteRequest::Get(get) => {
                let sink = IrpcSenderRefSink(tx).with_map_err(io::Error::other);
                execute_get(&pool, Arc::new(get), &request.providers, &store, sink).await?;
            }
            FiniteRequest::GetMany(_) => {
                handle_download_split_impl(store, pool, request, tx).await?
            }
        },
    }
    Ok(())
}

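/// Split a request into per-blob subrequests, run them concurrently against the
/// available providers, and forward a single aggregated progress stream to `tx`.
///
/// Per-subrequest progress offsets are summed into one running total so the
/// caller sees overall progress rather than interleaved per-blob offsets.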
async fn handle_download_split_impl(
    store: Store,
    pool: ConnectionPool,
    request: DownloadRequest,
    tx: &mut mpsc::Sender<DownloadProgessItem>,
) -> anyhow::Result<()> {
    let providers = request.providers;
    let requests = split_request(&request.request, &providers, &pool, &store, Drain).await?;
    let (progress_tx, progress_rx) = tokio::sync::mpsc::channel(32);
    let mut futs = stream::iter(requests.into_iter().enumerate())
        .map(|(id, request)| {
            let pool = pool.clone();
            let providers = providers.clone();
            let store = store.clone();
            let progress_tx = progress_tx.clone();
            async move {
                let hash = request.hash;
                let (tx, rx) = tokio::sync::mpsc::channel::<(usize, DownloadProgessItem)>(16);
                progress_tx.send(rx).await.ok();
                let sink = TokioMpscSenderSink(tx)
                    .with_map_err(io::Error::other)
                    .with_map(move |x| (id, x));
                let res = execute_get(&pool, Arc::new(request), &providers, &store, sink).await;
                (hash, res)
            }
        })
        .buffered_unordered(32);
    let mut progress_stream = {
        let mut offsets = HashMap::new();
        let mut total = 0;
        into_stream(progress_rx)
            .flat_map(into_stream)
            .map(move |(id, item)| match item {
                DownloadProgessItem::Progress(offset) => {
                    total += offset;
                    if let Some(prev) = offsets.insert(id, offset) {
                        total -= prev;
                    }
                    DownloadProgessItem::Progress(total)
                }
                x => x,
            })
    };
    loop {
        tokio::select! {
            Some(item) = progress_stream.next() => {
                tx.send(item).await?;
            },
            res = futs.next() => {
                match res {
                    Some((_hash, Ok(()))) => {}
                    Some((_hash, Err(_e))) => {
                        tx.send(DownloadProgessItem::DownloadError).await?;
                    }
                    None => break,
                }
            }
            _ = tx.closed() => {
                break;
            }
        }
    }
    Ok(())
}

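/// Adapt a tokio mpsc receiver into a [`Stream`] that yields items until the
/// channel is closed.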
fn into_stream<T>(mut recv: tokio::sync::mpsc::Receiver<T>) -> impl Stream<Item = T> {
    Gen::new(|co| async move {
        while let Some(item) = recv.recv().await {
            co.yield_(item).await;
        }
    })
}

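/// A download request that covers a finite set of blobs: either a single
/// [`GetRequest`] or a [`GetManyRequest`].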
#[derive(Debug, Serialize, Deserialize, derive_more::From)]
pub enum FiniteRequest {
    Get(GetRequest),
    GetMany(GetManyRequest),
}

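/// Conversion trait for everything that can be turned into a [`FiniteRequest`],
/// e.g. a [`Hash`], a [`HashAndFormat`], or an iterator of hashes.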
pub trait SupportedRequest {
    fn into_request(self) -> FiniteRequest;
}

impl<I: Into<Hash>, T: IntoIterator<Item = I>> SupportedRequest for T {
    fn into_request(self) -> FiniteRequest {
        let hashes = self.into_iter().map(Into::into).collect::<GetManyRequest>();
        FiniteRequest::GetMany(hashes)
    }
}

impl SupportedRequest for GetRequest {
    fn into_request(self) -> FiniteRequest {
        self.into()
    }
}

impl SupportedRequest for GetManyRequest {
    fn into_request(self) -> FiniteRequest {
        self.into()
    }
}

impl SupportedRequest for Hash {
    fn into_request(self) -> FiniteRequest {
        GetRequest::blob(self).into()
    }
}

impl SupportedRequest for HashAndFormat {
    fn into_request(self) -> FiniteRequest {
        (match self.format {
            BlobFormat::Raw => GetRequest::blob(self.hash),
            BlobFormat::HashSeq => GetRequest::all(self.hash),
        })
        .into()
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub struct AddProviderRequest {
    pub hash: Hash,
    pub providers: Vec<NodeId>,
}

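/// Options for a download: what to fetch, where to look for providers, and how
/// to split the work.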
#[derive(Debug)]
pub struct DownloadRequest {
    pub request: FiniteRequest,
    pub providers: Arc<dyn ContentDiscovery>,
    pub strategy: SplitStrategy,
}

impl DownloadRequest {
    pub fn new(
        request: impl SupportedRequest,
        providers: impl ContentDiscovery,
        strategy: SplitStrategy,
    ) -> Self {
        Self {
            request: request.into_request(),
            providers: Arc::new(providers),
            strategy,
        }
    }
}

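/// Whether a request should be executed as a single request (`None`) or split
/// into per-blob subrequests that can end up being served by different
/// providers (`Split`).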
#[derive(Debug, Serialize, Deserialize)]
pub enum SplitStrategy {
    None,
    Split,
}

impl Serialize for DownloadRequest {
    fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        Err(serde::ser::Error::custom(
            "cannot serialize DownloadRequest",
        ))
    }
}

impl<'de> Deserialize<'de> for DownloadRequest {
    fn deserialize<D>(_deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        Err(D::Error::custom("cannot deserialize DownloadRequest"))
    }
}

pub type DownloadOptions = DownloadRequest;

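/// Handle for an in-flight download.
///
/// Await it (via [`IntoFuture`]) to just wait for completion, or call
/// [`DownloadProgress::stream`] to observe individual [`DownloadProgessItem`]s.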
pub struct DownloadProgress {
    fut: future::Boxed<irpc::Result<mpsc::Receiver<DownloadProgessItem>>>,
}

impl DownloadProgress {
    fn new(fut: future::Boxed<irpc::Result<mpsc::Receiver<DownloadProgessItem>>>) -> Self {
        Self { fut }
    }

    pub async fn stream(self) -> irpc::Result<impl Stream<Item = DownloadProgessItem> + Unpin> {
        let rx = self.fut.await?;
        Ok(Box::pin(rx.into_stream().map(|item| match item {
            Ok(item) => item,
            Err(e) => DownloadProgessItem::Error(e.into()),
        })))
    }

    async fn complete(self) -> anyhow::Result<()> {
        let rx = self.fut.await?;
        let stream = rx.into_stream();
        tokio::pin!(stream);
        while let Some(item) = stream.next().await {
            match item? {
                DownloadProgessItem::Error(e) => Err(e)?,
                DownloadProgessItem::DownloadError => anyhow::bail!("Download error"),
                _ => {}
            }
        }
        Ok(())
    }
}

impl IntoFuture for DownloadProgress {
    type Output = anyhow::Result<()>;
    type IntoFuture = future::Boxed<Self::Output>;

    fn into_future(self) -> Self::IntoFuture {
        Box::pin(self.complete())
    }
}

impl Downloader {
    pub fn new(store: &Store, endpoint: &Endpoint) -> Self {
        let (tx, rx) = tokio::sync::mpsc::channel::<SwarmMsg>(32);
        let actor = DownloaderActor::new(store.clone(), endpoint.clone());
        tokio::spawn(actor.run(rx));
        Self { client: tx.into() }
    }

    pub fn download(
        &self,
        request: impl SupportedRequest,
        providers: impl ContentDiscovery,
    ) -> DownloadProgress {
        let request = request.into_request();
        let providers = Arc::new(providers);
        self.download_with_opts(DownloadOptions {
            request,
            providers,
            strategy: SplitStrategy::None,
        })
    }

    pub fn download_with_opts(&self, options: DownloadOptions) -> DownloadProgress {
        let fut = self.client.server_streaming(options, 32);
        DownloadProgress::new(Box::pin(fut))
    }
}

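/// Split a finite request into one [`GetRequest`] per blob.
///
/// For a [`GetRequest`] over a hash sequence this first fetches the root blob
/// so the number of children is known, then emits one request per non-empty
/// child range; for a [`GetManyRequest`] it emits one request per hash.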
async fn split_request<'a>(
    request: &'a FiniteRequest,
    providers: &Arc<dyn ContentDiscovery>,
    pool: &ConnectionPool,
    store: &Store,
    progress: impl Sink<DownloadProgessItem, Error = io::Error>,
) -> anyhow::Result<Box<dyn Iterator<Item = GetRequest> + Send + 'a>> {
    Ok(match request {
        FiniteRequest::Get(req) => {
            let Some(_first) = req.ranges.iter_infinite().next() else {
                return Ok(Box::new(std::iter::empty()));
            };
            let first = GetRequest::blob(req.hash);
            execute_get(pool, Arc::new(first), providers, store, progress).await?;
            let size = store.observe(req.hash).await?.size();
            anyhow::ensure!(size % 32 == 0, "Size is not a multiple of 32");
            let n = size / 32;
            Box::new(
                req.ranges
                    .iter_infinite()
                    .take(n as usize + 1)
                    .enumerate()
                    .filter_map(|(i, ranges)| {
                        if i != 0 && !ranges.is_empty() {
                            Some(
                                GetRequest::builder()
                                    .offset(i as u64, ranges.clone())
                                    .build(req.hash),
                            )
                        } else {
                            None
                        }
                    }),
            )
        }
        FiniteRequest::GetMany(req) => Box::new(
            req.hashes
                .iter()
                .enumerate()
                .map(|(i, hash)| GetRequest::blob_ranges(*hash, req.ranges[i as u64].clone())),
        ),
    })
}

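/// Shared state behind a [`ConnectionPool`]: one lazily created connection slot
/// per [`NodeId`], plus the dial timeout and the delay to wait before retrying
/// a failed connection attempt.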
#[derive(Debug)]
struct ConnectionPoolInner {
    alpn: Vec<u8>,
    endpoint: Endpoint,
    connections: Mutex<HashMap<NodeId, Arc<Mutex<SlotState>>>>,
    retry_delay: Duration,
    connect_timeout: Duration,
}

#[derive(Debug, Clone)]
struct ConnectionPool(Arc<ConnectionPoolInner>);

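/// State of a single connection slot in the [`ConnectionPool`].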
#[derive(Debug, Default)]
enum SlotState {
    #[default]
    Initial,
    Connected(Connection),
    AttemptFailed(SystemTime),
    #[allow(dead_code)]
    Evil(String),
}

impl ConnectionPool {
    fn new(endpoint: Endpoint, alpn: Vec<u8>) -> Self {
        Self(
            ConnectionPoolInner {
                endpoint,
                alpn,
                connections: Default::default(),
                retry_delay: Duration::from_secs(5),
                connect_timeout: Duration::from_secs(2),
            }
            .into(),
        )
    }

    pub fn alpn(&self) -> &[u8] {
        &self.0.alpn
    }

    pub fn endpoint(&self) -> &Endpoint {
        &self.0.endpoint
    }

    pub fn retry_delay(&self) -> Duration {
        self.0.retry_delay
    }

    fn dial(&self, id: NodeId) -> DialNode {
        DialNode {
            pool: self.clone(),
            id,
        }
    }

    #[allow(dead_code)]
    async fn mark_evil(&self, id: NodeId, reason: String) {
        let slot = self
            .0
            .connections
            .lock()
            .await
            .entry(id)
            .or_default()
            .clone();
        let mut t = slot.lock().await;
        *t = SlotState::Evil(reason)
    }

    #[allow(dead_code)]
    async fn mark_closed(&self, id: NodeId) {
        let slot = self
            .0
            .connections
            .lock()
            .await
            .entry(id)
            .or_default()
            .clone();
        let mut t = slot.lock().await;
        *t = SlotState::Initial
    }
}

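/// Execute a single get request, trying the discovered providers in order until
/// one of them can serve the remaining data or the provider stream is exhausted.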
async fn execute_get(
    pool: &ConnectionPool,
    request: Arc<GetRequest>,
    providers: &Arc<dyn ContentDiscovery>,
    store: &Store,
    mut progress: impl Sink<DownloadProgessItem, Error = io::Error>,
) -> anyhow::Result<()> {
    let remote = store.remote();
    let mut providers = providers.find_providers(request.content());
    while let Some(provider) = providers.next().await {
        progress
            .send(DownloadProgessItem::TryProvider {
                id: provider,
                request: request.clone(),
            })
            .await?;
        let mut conn = pool.dial(provider);
        let local = remote.local_for_request(request.clone()).await?;
        if local.is_complete() {
            return Ok(());
        }
        let local_bytes = local.local_bytes();
        let Ok(conn) = conn.connection().await else {
            progress
                .send(DownloadProgessItem::ProviderFailed {
                    id: provider,
                    request: request.clone(),
                })
                .await?;
            continue;
        };
        match remote
            .execute_get_sink(
                conn,
                local.missing(),
                (&mut progress).with_map(move |x| DownloadProgessItem::Progress(x + local_bytes)),
            )
            .await
        {
            Ok(_stats) => {
                progress
                    .send(DownloadProgessItem::PartComplete {
                        request: request.clone(),
                    })
                    .await?;
                return Ok(());
            }
            Err(_cause) => {
                progress
                    .send(DownloadProgessItem::ProviderFailed {
                        id: provider,
                        request: request.clone(),
                    })
                    .await?;
                continue;
            }
        }
    }
    bail!("Unable to download {}", request.hash);
}

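/// A cheaply cloneable handle that dials a single node through the
/// [`ConnectionPool`], reusing a cached connection when one exists.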
#[derive(Debug, Clone)]
struct DialNode {
    pool: ConnectionPool,
    id: NodeId,
}

impl DialNode {
    async fn connection_impl(&self) -> anyhow::Result<Connection> {
        info!("Getting connection for node {}", self.id);
        let slot = self
            .pool
            .0
            .connections
            .lock()
            .await
            .entry(self.id)
            .or_default()
            .clone();
        info!("Dialing node {}", self.id);
        let mut guard = slot.lock().await;
        match guard.deref() {
            SlotState::Connected(conn) => {
                return Ok(conn.clone());
            }
            SlotState::AttemptFailed(time) => {
                let elapsed = time.elapsed().unwrap_or_default();
                if elapsed <= self.pool.retry_delay() {
                    bail!(
                        "Connection attempt failed {} seconds ago",
                        elapsed.as_secs_f64()
                    );
                }
            }
            SlotState::Evil(reason) => {
                bail!("Node is banned due to evil behavior: {reason}");
            }
            SlotState::Initial => {}
        }
        let res = self
            .pool
            .endpoint()
            .connect(self.id, self.pool.alpn())
            .timeout(self.pool.0.connect_timeout)
            .await;
        match res {
            Ok(Ok(conn)) => {
                info!("Connected to node {}", self.id);
                *guard = SlotState::Connected(conn.clone());
                Ok(conn)
            }
            Ok(Err(e)) => {
                warn!("Failed to connect to node {}: {}", self.id, e);
                *guard = SlotState::AttemptFailed(SystemTime::now());
                Err(e.into())
            }
            Err(e) => {
                warn!("Failed to connect to node {}: {}", self.id, e);
                *guard = SlotState::AttemptFailed(SystemTime::now());
                bail!("Failed to connect to node: {}", e);
            }
        }
    }
}

impl GetConnection for DialNode {
    fn connection(&mut self) -> impl Future<Output = Result<Connection, anyhow::Error>> + '_ {
        let this = self.clone();
        async move { this.connection_impl().await }
    }
}

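/// Source of candidate providers for a given piece of content.
///
/// The blanket impl below lets any cloneable iterator of node ids (e.g. a
/// `Vec<NodeId>` or an array) be used directly as a static provider list.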
pub trait ContentDiscovery: Debug + Send + Sync + 'static {
    fn find_providers(&self, hash: HashAndFormat) -> n0_future::stream::Boxed<NodeId>;
}

impl<C, I> ContentDiscovery for C
where
    C: Debug + Clone + IntoIterator<Item = I> + Send + Sync + 'static,
    C::IntoIter: Send + Sync + 'static,
    I: Into<NodeId> + Send + Sync + 'static,
{
    fn find_providers(&self, _: HashAndFormat) -> n0_future::stream::Boxed<NodeId> {
        let providers = self.clone();
        n0_future::stream::iter(providers.into_iter().map(Into::into)).boxed()
    }
}

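/// A static provider list that is shuffled on every `find_providers` call, so
/// load is spread randomly across the given nodes.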
#[derive(derive_more::Debug)]
pub struct Shuffled {
    nodes: Vec<NodeId>,
}

impl Shuffled {
    pub fn new(nodes: Vec<NodeId>) -> Self {
        Self { nodes }
    }
}

impl ContentDiscovery for Shuffled {
    fn find_providers(&self, _: HashAndFormat) -> n0_future::stream::Boxed<NodeId> {
        let mut nodes = self.nodes.clone();
        nodes.shuffle(&mut rand::thread_rng());
        n0_future::stream::iter(nodes).boxed()
    }
}

#[cfg(test)]
mod tests {
    use std::ops::Deref;

    use bao_tree::ChunkRanges;
    use iroh::Watcher;
    use n0_future::StreamExt;
    use testresult::TestResult;

    use crate::{
        api::{
            blobs::AddBytesOptions,
            downloader::{DownloadOptions, Downloader, Shuffled, SplitStrategy},
        },
        hashseq::HashSeq,
        protocol::{GetManyRequest, GetRequest},
        tests::node_test_setup_fs,
    };

    #[tokio::test]
    #[ignore = "todo"]
    async fn downloader_get_many_smoke() -> TestResult<()> {
        let testdir = tempfile::tempdir()?;
        let (r1, store1, _) = node_test_setup_fs(testdir.path().join("a")).await?;
        let (r2, store2, _) = node_test_setup_fs(testdir.path().join("b")).await?;
        let (r3, store3, _) = node_test_setup_fs(testdir.path().join("c")).await?;
        let tt1 = store1.add_slice("hello world").await?;
        let tt2 = store2.add_slice("hello world 2").await?;
        let node1_addr = r1.endpoint().node_addr().initialized().await;
        let node1_id = node1_addr.node_id;
        let node2_addr = r2.endpoint().node_addr().initialized().await;
        let node2_id = node2_addr.node_id;
        let swarm = Downloader::new(&store3, r3.endpoint());
        r3.endpoint().add_node_addr(node1_addr.clone())?;
        r3.endpoint().add_node_addr(node2_addr.clone())?;
        let request = GetManyRequest::builder()
            .hash(tt1.hash, ChunkRanges::all())
            .hash(tt2.hash, ChunkRanges::all())
            .build();
        let mut progress = swarm
            .download(request, Shuffled::new(vec![node1_id, node2_id]))
            .stream()
            .await?;
        while let Some(item) = progress.next().await {
            println!("Got item: {item:?}");
        }
        assert_eq!(store3.get_bytes(tt1.hash).await?.deref(), b"hello world");
        assert_eq!(store3.get_bytes(tt2.hash).await?.deref(), b"hello world 2");
        Ok(())
    }

    #[tokio::test]
    async fn downloader_get_smoke() -> TestResult<()> {
        let testdir = tempfile::tempdir()?;
        let (r1, store1, _) = node_test_setup_fs(testdir.path().join("a")).await?;
        let (r2, store2, _) = node_test_setup_fs(testdir.path().join("b")).await?;
        let (r3, store3, _) = node_test_setup_fs(testdir.path().join("c")).await?;
        let tt1 = store1.add_slice(vec![1; 10000000]).await?;
        let tt2 = store2.add_slice(vec![2; 10000000]).await?;
        let hs = [tt1.hash, tt2.hash].into_iter().collect::<HashSeq>();
        let root = store1
            .add_bytes_with_opts(AddBytesOptions {
                data: hs.clone().into(),
                format: crate::BlobFormat::HashSeq,
            })
            .await?;
        let node1_addr = r1.endpoint().node_addr().initialized().await;
        let node1_id = node1_addr.node_id;
        let node2_addr = r2.endpoint().node_addr().initialized().await;
        let node2_id = node2_addr.node_id;
        let swarm = Downloader::new(&store3, r3.endpoint());
        r3.endpoint().add_node_addr(node1_addr.clone())?;
        r3.endpoint().add_node_addr(node2_addr.clone())?;
        let request = GetRequest::builder()
            .root(ChunkRanges::all())
            .next(ChunkRanges::all())
            .next(ChunkRanges::all())
            .build(root.hash);
        if true {
            let mut progress = swarm
                .download_with_opts(DownloadOptions::new(
                    request,
                    [node1_id, node2_id],
                    SplitStrategy::Split,
                ))
                .stream()
                .await?;
            while let Some(item) = progress.next().await {
                println!("Got item: {item:?}");
            }
        }
        if false {
            let conn = r3.endpoint().connect(node1_addr, crate::ALPN).await?;
            let remote = store3.remote();
            let _rh = remote
                .execute_get(
                    conn.clone(),
                    GetRequest::builder()
                        .root(ChunkRanges::all())
                        .build(root.hash),
                )
                .await?;
            let h1 = remote.execute_get(
                conn.clone(),
                GetRequest::builder()
                    .child(0, ChunkRanges::all())
                    .build(root.hash),
            );
            let h2 = remote.execute_get(
                conn.clone(),
                GetRequest::builder()
                    .child(1, ChunkRanges::all())
                    .build(root.hash),
            );
            h1.await?;
            h2.await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn downloader_get_all() -> TestResult<()> {
        let testdir = tempfile::tempdir()?;
        let (r1, store1, _) = node_test_setup_fs(testdir.path().join("a")).await?;
        let (r2, store2, _) = node_test_setup_fs(testdir.path().join("b")).await?;
        let (r3, store3, _) = node_test_setup_fs(testdir.path().join("c")).await?;
        let tt1 = store1.add_slice(vec![1; 10000000]).await?;
        let tt2 = store2.add_slice(vec![2; 10000000]).await?;
        let hs = [tt1.hash, tt2.hash].into_iter().collect::<HashSeq>();
        let root = store1
            .add_bytes_with_opts(AddBytesOptions {
                data: hs.clone().into(),
                format: crate::BlobFormat::HashSeq,
            })
            .await?;
        let node1_addr = r1.endpoint().node_addr().initialized().await;
        let node1_id = node1_addr.node_id;
        let node2_addr = r2.endpoint().node_addr().initialized().await;
        let node2_id = node2_addr.node_id;
        let swarm = Downloader::new(&store3, r3.endpoint());
        r3.endpoint().add_node_addr(node1_addr.clone())?;
        r3.endpoint().add_node_addr(node2_addr.clone())?;
        let request = GetRequest::all(root.hash);
        let mut progress = swarm
            .download_with_opts(DownloadOptions::new(
                request,
                [node1_id, node2_id],
                SplitStrategy::Split,
            ))
            .stream()
            .await?;
        while let Some(item) = progress.next().await {
            println!("Got item: {item:?}");
        }
        Ok(())
    }
}