iroh_blobs/api/
downloader.rs

1//! API for downloads from multiple nodes.
2use std::{
3    collections::{HashMap, HashSet},
4    fmt::Debug,
5    future::{Future, IntoFuture},
6    io,
7    ops::Deref,
8    sync::Arc,
9    time::{Duration, SystemTime},
10};
11
12use anyhow::bail;
13use genawaiter::sync::Gen;
14use iroh::{endpoint::Connection, Endpoint, NodeId};
15use irpc::{channel::mpsc, rpc_requests};
16use n0_future::{future, stream, BufferedStreamExt, Stream, StreamExt};
17use rand::seq::SliceRandom;
18use serde::{de::Error, Deserialize, Serialize};
19use tokio::{sync::Mutex, task::JoinSet};
20use tokio_util::time::FutureExt;
21use tracing::{info, instrument::Instrument, warn};
22
23use super::{remote::GetConnection, Store};
24use crate::{
25    protocol::{GetManyRequest, GetRequest},
26    util::sink::{Drain, IrpcSenderRefSink, Sink, TokioMpscSenderSink},
27    BlobFormat, Hash, HashAndFormat,
28};
29
30#[derive(Debug, Clone)]
31pub struct Downloader {
32    client: irpc::Client<SwarmProtocol>,
33}
34
35#[rpc_requests(message = SwarmMsg, alias = "Msg")]
36#[derive(Debug, Serialize, Deserialize)]
37enum SwarmProtocol {
38    #[rpc(tx = mpsc::Sender<DownloadProgessItem>)]
39    Download(DownloadRequest),
40}
41
42struct DownloaderActor {
43    store: Store,
44    pool: ConnectionPool,
45    tasks: JoinSet<()>,
46    running: HashSet<tokio::task::Id>,
47}
48
49#[derive(Debug, Serialize, Deserialize)]
50pub enum DownloadProgessItem {
51    #[serde(skip)]
52    Error(anyhow::Error),
53    TryProvider {
54        id: NodeId,
55        request: Arc<GetRequest>,
56    },
57    ProviderFailed {
58        id: NodeId,
59        request: Arc<GetRequest>,
60    },
61    PartComplete {
62        request: Arc<GetRequest>,
63    },
64    Progress(u64),
65    DownloadError,
66}
67
68impl DownloaderActor {
69    fn new(store: Store, endpoint: Endpoint) -> Self {
70        Self {
71            store,
72            pool: ConnectionPool::new(endpoint, crate::ALPN.to_vec()),
73            tasks: JoinSet::new(),
74            running: HashSet::new(),
75        }
76    }
77
78    async fn run(mut self, mut rx: tokio::sync::mpsc::Receiver<SwarmMsg>) {
79        while let Some(msg) = rx.recv().await {
80            match msg {
81                SwarmMsg::Download(request) => {
82                    self.spawn(handle_download(
83                        self.store.clone(),
84                        self.pool.clone(),
85                        request,
86                    ));
87                }
88            }
89        }
90    }
91
92    fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
93        let span = tracing::Span::current();
94        let id = self.tasks.spawn(fut.instrument(span)).id();
95        self.running.insert(id);
96    }
97}
98
99async fn handle_download(store: Store, pool: ConnectionPool, msg: DownloadMsg) {
100    let DownloadMsg { inner, mut tx, .. } = msg;
101    if let Err(cause) = handle_download_impl(store, pool, inner, &mut tx).await {
102        tx.send(DownloadProgessItem::Error(cause)).await.ok();
103    }
104}
105
106async fn handle_download_impl(
107    store: Store,
108    pool: ConnectionPool,
109    request: DownloadRequest,
110    tx: &mut mpsc::Sender<DownloadProgessItem>,
111) -> anyhow::Result<()> {
112    match request.strategy {
113        SplitStrategy::Split => handle_download_split_impl(store, pool, request, tx).await?,
114        SplitStrategy::None => match request.request {
115            FiniteRequest::Get(get) => {
116                let sink = IrpcSenderRefSink(tx).with_map_err(io::Error::other);
117                execute_get(&pool, Arc::new(get), &request.providers, &store, sink).await?;
118            }
119            FiniteRequest::GetMany(_) => {
120                handle_download_split_impl(store, pool, request, tx).await?
121            }
122        },
123    }
124    Ok(())
125}
126
127async fn handle_download_split_impl(
128    store: Store,
129    pool: ConnectionPool,
130    request: DownloadRequest,
131    tx: &mut mpsc::Sender<DownloadProgessItem>,
132) -> anyhow::Result<()> {
133    let providers = request.providers;
134    let requests = split_request(&request.request, &providers, &pool, &store, Drain).await?;
135    let (progress_tx, progress_rx) = tokio::sync::mpsc::channel(32);
136    let mut futs = stream::iter(requests.into_iter().enumerate())
137        .map(|(id, request)| {
138            let pool = pool.clone();
139            let providers = providers.clone();
140            let store = store.clone();
141            let progress_tx = progress_tx.clone();
142            async move {
143                let hash = request.hash;
144                let (tx, rx) = tokio::sync::mpsc::channel::<(usize, DownloadProgessItem)>(16);
145                progress_tx.send(rx).await.ok();
146                let sink = TokioMpscSenderSink(tx)
147                    .with_map_err(io::Error::other)
148                    .with_map(move |x| (id, x));
149                let res = execute_get(&pool, Arc::new(request), &providers, &store, sink).await;
150                (hash, res)
151            }
152        })
153        .buffered_unordered(32);
154    let mut progress_stream = {
155        let mut offsets = HashMap::new();
156        let mut total = 0;
157        into_stream(progress_rx)
158            .flat_map(into_stream)
159            .map(move |(id, item)| match item {
160                DownloadProgessItem::Progress(offset) => {
161                    total += offset;
162                    if let Some(prev) = offsets.insert(id, offset) {
163                        total -= prev;
164                    }
165                    DownloadProgessItem::Progress(total)
166                }
167                x => x,
168            })
169    };
170    loop {
171        tokio::select! {
172            Some(item) = progress_stream.next() => {
173                tx.send(item).await?;
174            },
175            res = futs.next() => {
176                match res {
177                    Some((_hash, Ok(()))) => {
178                    }
179                    Some((_hash, Err(_e))) => {
180                        tx.send(DownloadProgessItem::DownloadError).await?;
181                    }
182                    None => break,
183                }
184            }
185            _ = tx.closed() => {
186                // The sender has been closed, we should stop processing.
187                break;
188            }
189        }
190    }
191    Ok(())
192}
193
194fn into_stream<T>(mut recv: tokio::sync::mpsc::Receiver<T>) -> impl Stream<Item = T> {
195    Gen::new(|co| async move {
196        while let Some(item) = recv.recv().await {
197            co.yield_(item).await;
198        }
199    })
200}
201
202#[derive(Debug, Serialize, Deserialize, derive_more::From)]
203pub enum FiniteRequest {
204    Get(GetRequest),
205    GetMany(GetManyRequest),
206}
207
208pub trait SupportedRequest {
209    fn into_request(self) -> FiniteRequest;
210}
211
212impl<I: Into<Hash>, T: IntoIterator<Item = I>> SupportedRequest for T {
213    fn into_request(self) -> FiniteRequest {
214        let hashes = self.into_iter().map(Into::into).collect::<GetManyRequest>();
215        FiniteRequest::GetMany(hashes)
216    }
217}
218
219impl SupportedRequest for GetRequest {
220    fn into_request(self) -> FiniteRequest {
221        self.into()
222    }
223}
224
225impl SupportedRequest for GetManyRequest {
226    fn into_request(self) -> FiniteRequest {
227        self.into()
228    }
229}
230
231impl SupportedRequest for Hash {
232    fn into_request(self) -> FiniteRequest {
233        GetRequest::blob(self).into()
234    }
235}
236
237impl SupportedRequest for HashAndFormat {
238    fn into_request(self) -> FiniteRequest {
239        (match self.format {
240            BlobFormat::Raw => GetRequest::blob(self.hash),
241            BlobFormat::HashSeq => GetRequest::all(self.hash),
242        })
243        .into()
244    }
245}
246
247#[derive(Debug, Serialize, Deserialize)]
248pub struct AddProviderRequest {
249    pub hash: Hash,
250    pub providers: Vec<NodeId>,
251}
252
253#[derive(Debug)]
254pub struct DownloadRequest {
255    pub request: FiniteRequest,
256    pub providers: Arc<dyn ContentDiscovery>,
257    pub strategy: SplitStrategy,
258}
259
260impl DownloadRequest {
261    pub fn new(
262        request: impl SupportedRequest,
263        providers: impl ContentDiscovery,
264        strategy: SplitStrategy,
265    ) -> Self {
266        Self {
267            request: request.into_request(),
268            providers: Arc::new(providers),
269            strategy,
270        }
271    }
272}
273
274#[derive(Debug, Serialize, Deserialize)]
275pub enum SplitStrategy {
276    None,
277    Split,
278}
279
280impl Serialize for DownloadRequest {
281    fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
282    where
283        S: serde::Serializer,
284    {
285        Err(serde::ser::Error::custom(
286            "cannot serialize DownloadRequest",
287        ))
288    }
289}
290
291// Implement Deserialize to always fail
292impl<'de> Deserialize<'de> for DownloadRequest {
293    fn deserialize<D>(_deserializer: D) -> Result<Self, D::Error>
294    where
295        D: serde::Deserializer<'de>,
296    {
297        Err(D::Error::custom("cannot deserialize DownloadRequest"))
298    }
299}
300
301pub type DownloadOptions = DownloadRequest;
302
303pub struct DownloadProgress {
304    fut: future::Boxed<irpc::Result<mpsc::Receiver<DownloadProgessItem>>>,
305}
306
307impl DownloadProgress {
308    fn new(fut: future::Boxed<irpc::Result<mpsc::Receiver<DownloadProgessItem>>>) -> Self {
309        Self { fut }
310    }
311
312    pub async fn stream(self) -> irpc::Result<impl Stream<Item = DownloadProgessItem> + Unpin> {
313        let rx = self.fut.await?;
314        Ok(Box::pin(rx.into_stream().map(|item| match item {
315            Ok(item) => item,
316            Err(e) => DownloadProgessItem::Error(e.into()),
317        })))
318    }
319
320    async fn complete(self) -> anyhow::Result<()> {
321        let rx = self.fut.await?;
322        let stream = rx.into_stream();
323        tokio::pin!(stream);
324        while let Some(item) = stream.next().await {
325            match item? {
326                DownloadProgessItem::Error(e) => Err(e)?,
327                DownloadProgessItem::DownloadError => anyhow::bail!("Download error"),
328                _ => {}
329            }
330        }
331        Ok(())
332    }
333}
334
335impl IntoFuture for DownloadProgress {
336    type Output = anyhow::Result<()>;
337    type IntoFuture = future::Boxed<Self::Output>;
338
339    fn into_future(self) -> Self::IntoFuture {
340        Box::pin(self.complete())
341    }
342}
343
344impl Downloader {
345    pub fn new(store: &Store, endpoint: &Endpoint) -> Self {
346        let (tx, rx) = tokio::sync::mpsc::channel::<SwarmMsg>(32);
347        let actor = DownloaderActor::new(store.clone(), endpoint.clone());
348        tokio::spawn(actor.run(rx));
349        Self { client: tx.into() }
350    }
351
352    pub fn download(
353        &self,
354        request: impl SupportedRequest,
355        providers: impl ContentDiscovery,
356    ) -> DownloadProgress {
357        let request = request.into_request();
358        let providers = Arc::new(providers);
359        self.download_with_opts(DownloadOptions {
360            request,
361            providers,
362            strategy: SplitStrategy::None,
363        })
364    }
365
366    pub fn download_with_opts(&self, options: DownloadOptions) -> DownloadProgress {
367        let fut = self.client.server_streaming(options, 32);
368        DownloadProgress::new(Box::pin(fut))
369    }
370}
371
372/// Split a request into multiple requests that can be run in parallel.
373async fn split_request<'a>(
374    request: &'a FiniteRequest,
375    providers: &Arc<dyn ContentDiscovery>,
376    pool: &ConnectionPool,
377    store: &Store,
378    progress: impl Sink<DownloadProgessItem, Error = io::Error>,
379) -> anyhow::Result<Box<dyn Iterator<Item = GetRequest> + Send + 'a>> {
380    Ok(match request {
381        FiniteRequest::Get(req) => {
382            let Some(_first) = req.ranges.iter_infinite().next() else {
383                return Ok(Box::new(std::iter::empty()));
384            };
385            let first = GetRequest::blob(req.hash);
386            execute_get(pool, Arc::new(first), providers, store, progress).await?;
387            let size = store.observe(req.hash).await?.size();
388            anyhow::ensure!(size % 32 == 0, "Size is not a multiple of 32");
389            let n = size / 32;
390            Box::new(
391                req.ranges
392                    .iter_infinite()
393                    .take(n as usize + 1)
394                    .enumerate()
395                    .filter_map(|(i, ranges)| {
396                        if i != 0 && !ranges.is_empty() {
397                            Some(
398                                GetRequest::builder()
399                                    .offset(i as u64, ranges.clone())
400                                    .build(req.hash),
401                            )
402                        } else {
403                            None
404                        }
405                    }),
406            )
407        }
408        FiniteRequest::GetMany(req) => Box::new(
409            req.hashes
410                .iter()
411                .enumerate()
412                .map(|(i, hash)| GetRequest::blob_ranges(*hash, req.ranges[i as u64].clone())),
413        ),
414    })
415}
416
417#[derive(Debug)]
418struct ConnectionPoolInner {
419    alpn: Vec<u8>,
420    endpoint: Endpoint,
421    connections: Mutex<HashMap<NodeId, Arc<Mutex<SlotState>>>>,
422    retry_delay: Duration,
423    connect_timeout: Duration,
424}
425
426#[derive(Debug, Clone)]
427struct ConnectionPool(Arc<ConnectionPoolInner>);
428
429#[derive(Debug, Default)]
430enum SlotState {
431    #[default]
432    Initial,
433    Connected(Connection),
434    AttemptFailed(SystemTime),
435    #[allow(dead_code)]
436    Evil(String),
437}
438
439impl ConnectionPool {
440    fn new(endpoint: Endpoint, alpn: Vec<u8>) -> Self {
441        Self(
442            ConnectionPoolInner {
443                endpoint,
444                alpn,
445                connections: Default::default(),
446                retry_delay: Duration::from_secs(5),
447                connect_timeout: Duration::from_secs(2),
448            }
449            .into(),
450        )
451    }
452
453    pub fn alpn(&self) -> &[u8] {
454        &self.0.alpn
455    }
456
457    pub fn endpoint(&self) -> &Endpoint {
458        &self.0.endpoint
459    }
460
461    pub fn retry_delay(&self) -> Duration {
462        self.0.retry_delay
463    }
464
465    fn dial(&self, id: NodeId) -> DialNode {
466        DialNode {
467            pool: self.clone(),
468            id,
469        }
470    }
471
472    #[allow(dead_code)]
473    async fn mark_evil(&self, id: NodeId, reason: String) {
474        let slot = self
475            .0
476            .connections
477            .lock()
478            .await
479            .entry(id)
480            .or_default()
481            .clone();
482        let mut t = slot.lock().await;
483        *t = SlotState::Evil(reason)
484    }
485
486    #[allow(dead_code)]
487    async fn mark_closed(&self, id: NodeId) {
488        let slot = self
489            .0
490            .connections
491            .lock()
492            .await
493            .entry(id)
494            .or_default()
495            .clone();
496        let mut t = slot.lock().await;
497        *t = SlotState::Initial
498    }
499}
500
501/// Execute a get request sequentially for multiple providers.
502///
503/// It will try each provider in order
504/// until it finds one that can fulfill the request. When trying a new provider,
505/// it takes the progress from the previous providers into account, so e.g.
506/// if the first provider had the first 10% of the data, it will only ask the next
507/// provider for the remaining 90%.
508///
509/// This is fully sequential, so there will only be one request in flight at a time.
510///
511/// If the request is not complete after trying all providers, it will return an error.
512/// If the provider stream never ends, it will try indefinitely.
513async fn execute_get(
514    pool: &ConnectionPool,
515    request: Arc<GetRequest>,
516    providers: &Arc<dyn ContentDiscovery>,
517    store: &Store,
518    mut progress: impl Sink<DownloadProgessItem, Error = io::Error>,
519) -> anyhow::Result<()> {
520    let remote = store.remote();
521    let mut providers = providers.find_providers(request.content());
522    while let Some(provider) = providers.next().await {
523        progress
524            .send(DownloadProgessItem::TryProvider {
525                id: provider,
526                request: request.clone(),
527            })
528            .await?;
529        let mut conn = pool.dial(provider);
530        let local = remote.local_for_request(request.clone()).await?;
531        if local.is_complete() {
532            return Ok(());
533        }
534        let local_bytes = local.local_bytes();
535        let Ok(conn) = conn.connection().await else {
536            progress
537                .send(DownloadProgessItem::ProviderFailed {
538                    id: provider,
539                    request: request.clone(),
540                })
541                .await?;
542            continue;
543        };
544        match remote
545            .execute_get_sink(
546                conn,
547                local.missing(),
548                (&mut progress).with_map(move |x| DownloadProgessItem::Progress(x + local_bytes)),
549            )
550            .await
551        {
552            Ok(_stats) => {
553                progress
554                    .send(DownloadProgessItem::PartComplete {
555                        request: request.clone(),
556                    })
557                    .await?;
558                return Ok(());
559            }
560            Err(_cause) => {
561                progress
562                    .send(DownloadProgessItem::ProviderFailed {
563                        id: provider,
564                        request: request.clone(),
565                    })
566                    .await?;
567                continue;
568            }
569        }
570    }
571    bail!("Unable to download {}", request.hash);
572}
573
574#[derive(Debug, Clone)]
575struct DialNode {
576    pool: ConnectionPool,
577    id: NodeId,
578}
579
580impl DialNode {
581    async fn connection_impl(&self) -> anyhow::Result<Connection> {
582        info!("Getting connection for node {}", self.id);
583        let slot = self
584            .pool
585            .0
586            .connections
587            .lock()
588            .await
589            .entry(self.id)
590            .or_default()
591            .clone();
592        info!("Dialing node {}", self.id);
593        let mut guard = slot.lock().await;
594        match guard.deref() {
595            SlotState::Connected(conn) => {
596                return Ok(conn.clone());
597            }
598            SlotState::AttemptFailed(time) => {
599                let elapsed = time.elapsed().unwrap_or_default();
600                if elapsed <= self.pool.retry_delay() {
601                    bail!(
602                        "Connection attempt failed {} seconds ago",
603                        elapsed.as_secs_f64()
604                    );
605                }
606            }
607            SlotState::Evil(reason) => {
608                bail!("Node is banned due to evil behavior: {reason}");
609            }
610            SlotState::Initial => {}
611        }
612        let res = self
613            .pool
614            .endpoint()
615            .connect(self.id, self.pool.alpn())
616            .timeout(self.pool.0.connect_timeout)
617            .await;
618        match res {
619            Ok(Ok(conn)) => {
620                info!("Connected to node {}", self.id);
621                *guard = SlotState::Connected(conn.clone());
622                Ok(conn)
623            }
624            Ok(Err(e)) => {
625                warn!("Failed to connect to node {}: {}", self.id, e);
626                *guard = SlotState::AttemptFailed(SystemTime::now());
627                Err(e.into())
628            }
629            Err(e) => {
630                warn!("Failed to connect to node {}: {}", self.id, e);
631                *guard = SlotState::AttemptFailed(SystemTime::now());
632                bail!("Failed to connect to node: {}", e);
633            }
634        }
635    }
636}
637
638impl GetConnection for DialNode {
639    fn connection(&mut self) -> impl Future<Output = Result<Connection, anyhow::Error>> + '_ {
640        let this = self.clone();
641        async move { this.connection_impl().await }
642    }
643}
644
645/// Trait for pluggable content discovery strategies.
646pub trait ContentDiscovery: Debug + Send + Sync + 'static {
647    fn find_providers(&self, hash: HashAndFormat) -> n0_future::stream::Boxed<NodeId>;
648}
649
650impl<C, I> ContentDiscovery for C
651where
652    C: Debug + Clone + IntoIterator<Item = I> + Send + Sync + 'static,
653    C::IntoIter: Send + Sync + 'static,
654    I: Into<NodeId> + Send + Sync + 'static,
655{
656    fn find_providers(&self, _: HashAndFormat) -> n0_future::stream::Boxed<NodeId> {
657        let providers = self.clone();
658        n0_future::stream::iter(providers.into_iter().map(Into::into)).boxed()
659    }
660}
661
662#[derive(derive_more::Debug)]
663pub struct Shuffled {
664    nodes: Vec<NodeId>,
665}
666
667impl Shuffled {
668    pub fn new(nodes: Vec<NodeId>) -> Self {
669        Self { nodes }
670    }
671}
672
673impl ContentDiscovery for Shuffled {
674    fn find_providers(&self, _: HashAndFormat) -> n0_future::stream::Boxed<NodeId> {
675        let mut nodes = self.nodes.clone();
676        nodes.shuffle(&mut rand::thread_rng());
677        n0_future::stream::iter(nodes).boxed()
678    }
679}
680
681#[cfg(test)]
682mod tests {
683    use std::ops::Deref;
684
685    use bao_tree::ChunkRanges;
686    use iroh::Watcher;
687    use n0_future::StreamExt;
688    use testresult::TestResult;
689
690    use crate::{
691        api::{
692            blobs::AddBytesOptions,
693            downloader::{DownloadOptions, Downloader, Shuffled, SplitStrategy},
694        },
695        hashseq::HashSeq,
696        protocol::{GetManyRequest, GetRequest},
697        tests::node_test_setup_fs,
698    };
699
700    #[tokio::test]
701    #[ignore = "todo"]
702    async fn downloader_get_many_smoke() -> TestResult<()> {
703        let testdir = tempfile::tempdir()?;
704        let (r1, store1, _) = node_test_setup_fs(testdir.path().join("a")).await?;
705        let (r2, store2, _) = node_test_setup_fs(testdir.path().join("b")).await?;
706        let (r3, store3, _) = node_test_setup_fs(testdir.path().join("c")).await?;
707        let tt1 = store1.add_slice("hello world").await?;
708        let tt2 = store2.add_slice("hello world 2").await?;
709        let node1_addr = r1.endpoint().node_addr().initialized().await;
710        let node1_id = node1_addr.node_id;
711        let node2_addr = r2.endpoint().node_addr().initialized().await;
712        let node2_id = node2_addr.node_id;
713        let swarm = Downloader::new(&store3, r3.endpoint());
714        r3.endpoint().add_node_addr(node1_addr.clone())?;
715        r3.endpoint().add_node_addr(node2_addr.clone())?;
716        let request = GetManyRequest::builder()
717            .hash(tt1.hash, ChunkRanges::all())
718            .hash(tt2.hash, ChunkRanges::all())
719            .build();
720        let mut progress = swarm
721            .download(request, Shuffled::new(vec![node1_id, node2_id]))
722            .stream()
723            .await?;
724        while let Some(item) = progress.next().await {
725            println!("Got item: {item:?}");
726        }
727        assert_eq!(store3.get_bytes(tt1.hash).await?.deref(), b"hello world");
728        assert_eq!(store3.get_bytes(tt2.hash).await?.deref(), b"hello world 2");
729        Ok(())
730    }
731
732    #[tokio::test]
733    async fn downloader_get_smoke() -> TestResult<()> {
734        // tracing_subscriber::fmt::try_init().ok();
735        let testdir = tempfile::tempdir()?;
736        let (r1, store1, _) = node_test_setup_fs(testdir.path().join("a")).await?;
737        let (r2, store2, _) = node_test_setup_fs(testdir.path().join("b")).await?;
738        let (r3, store3, _) = node_test_setup_fs(testdir.path().join("c")).await?;
739        let tt1 = store1.add_slice(vec![1; 10000000]).await?;
740        let tt2 = store2.add_slice(vec![2; 10000000]).await?;
741        let hs = [tt1.hash, tt2.hash].into_iter().collect::<HashSeq>();
742        let root = store1
743            .add_bytes_with_opts(AddBytesOptions {
744                data: hs.clone().into(),
745                format: crate::BlobFormat::HashSeq,
746            })
747            .await?;
748        let node1_addr = r1.endpoint().node_addr().initialized().await;
749        let node1_id = node1_addr.node_id;
750        let node2_addr = r2.endpoint().node_addr().initialized().await;
751        let node2_id = node2_addr.node_id;
752        let swarm = Downloader::new(&store3, r3.endpoint());
753        r3.endpoint().add_node_addr(node1_addr.clone())?;
754        r3.endpoint().add_node_addr(node2_addr.clone())?;
755        let request = GetRequest::builder()
756            .root(ChunkRanges::all())
757            .next(ChunkRanges::all())
758            .next(ChunkRanges::all())
759            .build(root.hash);
760        if true {
761            let mut progress = swarm
762                .download_with_opts(DownloadOptions::new(
763                    request,
764                    [node1_id, node2_id],
765                    SplitStrategy::Split,
766                ))
767                .stream()
768                .await?;
769            while let Some(item) = progress.next().await {
770                println!("Got item: {item:?}");
771            }
772        }
773        if false {
774            let conn = r3.endpoint().connect(node1_addr, crate::ALPN).await?;
775            let remote = store3.remote();
776            let _rh = remote
777                .execute_get(
778                    conn.clone(),
779                    GetRequest::builder()
780                        .root(ChunkRanges::all())
781                        .build(root.hash),
782                )
783                .await?;
784            let h1 = remote.execute_get(
785                conn.clone(),
786                GetRequest::builder()
787                    .child(0, ChunkRanges::all())
788                    .build(root.hash),
789            );
790            let h2 = remote.execute_get(
791                conn.clone(),
792                GetRequest::builder()
793                    .child(1, ChunkRanges::all())
794                    .build(root.hash),
795            );
796            h1.await?;
797            h2.await?;
798        }
799        Ok(())
800    }
801
802    #[tokio::test]
803    async fn downloader_get_all() -> TestResult<()> {
804        let testdir = tempfile::tempdir()?;
805        let (r1, store1, _) = node_test_setup_fs(testdir.path().join("a")).await?;
806        let (r2, store2, _) = node_test_setup_fs(testdir.path().join("b")).await?;
807        let (r3, store3, _) = node_test_setup_fs(testdir.path().join("c")).await?;
808        let tt1 = store1.add_slice(vec![1; 10000000]).await?;
809        let tt2 = store2.add_slice(vec![2; 10000000]).await?;
810        let hs = [tt1.hash, tt2.hash].into_iter().collect::<HashSeq>();
811        let root = store1
812            .add_bytes_with_opts(AddBytesOptions {
813                data: hs.clone().into(),
814                format: crate::BlobFormat::HashSeq,
815            })
816            .await?;
817        let node1_addr = r1.endpoint().node_addr().initialized().await;
818        let node1_id = node1_addr.node_id;
819        let node2_addr = r2.endpoint().node_addr().initialized().await;
820        let node2_id = node2_addr.node_id;
821        let swarm = Downloader::new(&store3, r3.endpoint());
822        r3.endpoint().add_node_addr(node1_addr.clone())?;
823        r3.endpoint().add_node_addr(node2_addr.clone())?;
824        let request = GetRequest::all(root.hash);
825        let mut progress = swarm
826            .download_with_opts(DownloadOptions::new(
827                request,
828                [node1_id, node2_id],
829                SplitStrategy::Split,
830            ))
831            .stream()
832            .await?;
833        while let Some(item) = progress.next().await {
834            println!("Got item: {item:?}");
835        }
836        Ok(())
837    }
838}