Skip to main content

rust_ipfs/unixfs/
ls.rs

1use connexa::prelude::PeerId;
2use either::Either;
3use futures::{
4    future::BoxFuture,
5    stream::{BoxStream, FusedStream},
6    FutureExt, Stream, StreamExt,
7};
8use ipld_core::cid::Cid;
9use rust_unixfs::walk::{ContinuedWalk, Walker};
10use std::pin::Pin;
11use std::task::Context;
12use std::{task::Poll, time::Duration};
13use tracing::{Instrument, Span};
14
15use crate::{
16    dag::IpldDag,
17    repo::{DefaultStorage, Repo},
18    Ipfs, IpfsPath,
19};
20
21#[derive(Debug)]
22pub enum Entry {
23    Error { error: anyhow::Error },
24    RootDirectory { cid: Cid, path: String },
25    Directory { cid: Cid, path: String },
26    File { cid: Cid, file: String, size: usize },
27}
28
29#[must_use = "does nothing unless you `.await` or poll the stream"]
30pub struct UnixfsLs {
31    core: Option<Either<Ipfs, Repo<DefaultStorage>>>,
32    span: Span,
33    path: Option<IpfsPath>,
34    providers: Vec<PeerId>,
35    local_only: bool,
36    timeout: Option<Duration>,
37    stream: Option<BoxStream<'static, Entry>>,
38}
39
40impl UnixfsLs {
41    pub fn with_ipfs(ipfs: &Ipfs, path: impl Into<IpfsPath>) -> Self {
42        Self::with_either(Either::Left(ipfs.clone()), path)
43    }
44
45    pub fn with_repo(repo: &Repo<DefaultStorage>, path: impl Into<IpfsPath>) -> Self {
46        Self::with_either(Either::Right(repo.clone()), path)
47    }
48
49    fn with_either(core: Either<Ipfs, Repo<DefaultStorage>>, path: impl Into<IpfsPath>) -> Self {
50        let path = path.into();
51        Self {
52            core: Some(core),
53            path: Some(path),
54            span: Span::current(),
55            providers: Vec::new(),
56            local_only: false,
57            timeout: None,
58            stream: None,
59        }
60    }
61
62    pub fn span(mut self, span: Span) -> Self {
63        self.span = span;
64        self
65    }
66
67    pub fn provider(mut self, peer_id: PeerId) -> Self {
68        if !self.providers.contains(&peer_id) {
69            self.providers.push(peer_id);
70        }
71        self
72    }
73
74    pub fn providers(mut self, list: &[PeerId]) -> Self {
75        self.providers = list.to_vec();
76        self
77    }
78
79    pub fn timeout(mut self, timeout: Duration) -> Self {
80        self.timeout = Some(timeout);
81        self
82    }
83
84    pub fn local(mut self) -> Self {
85        self.local_only = true;
86        self
87    }
88
89    pub fn set_local(mut self, local: bool) -> Self {
90        self.local_only = local;
91        self
92    }
93}
94
95impl Stream for UnixfsLs {
96    type Item = Entry;
97    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
98        if self.core.is_none() && self.stream.is_none() {
99            return Poll::Ready(None);
100        }
101        loop {
102            match &mut self.stream {
103                Some(stream) => match futures::ready!(stream.poll_next_unpin(cx)) {
104                    None => {
105                        self.stream.take();
106                        return Poll::Ready(None);
107                    }
108                    task => return Poll::Ready(task),
109                },
110                None => {
111                    let Some(core) = self.core.take() else {
112                        return Poll::Ready(None);
113                    };
114
115                    let (repo, dag) = match core {
116                        Either::Left(ipfs) => (ipfs.repo().clone(), ipfs.dag()),
117                        Either::Right(repo) => (repo.clone(), IpldDag::from(repo.clone())),
118                    };
119
120                    let path = self.path.take().expect("path exist");
121                    let providers = std::mem::take(&mut self.providers);
122                    let local_only = self.local_only;
123                    let timeout = self.timeout;
124
125                    // using async_stream here at least to get on faster; writing custom streams is not too easy
126                    // but this might be easy enough to write open.
127                    let stream = async_stream::stream! {
128
129                        let resolved = match dag
130                            ._resolve(path, true, &providers, local_only, timeout)
131                            .await {
132                                Ok((resolved, _)) => resolved,
133                                Err(e) => {
134                                    yield Entry::Error { error: e.into() };
135                                    return;
136                                }
137                            };
138
139                        let block = match resolved.into_unixfs_block() {
140                            Ok(block) => block,
141                            Err(e) => {
142                                yield Entry::Error { error: e.into() };
143                                return;
144                            }
145                        };
146
147                        let cid = block.cid();
148                        let root_name = cid.to_string();
149
150                        let mut walker = Walker::new(*cid, root_name);
151                        let mut cache = None;
152                        let mut root_directory = String::new();
153                        while walker.should_continue() {
154                            let (next, _) = walker.pending_links();
155                            let block = match repo.get_block(next).providers(&providers).set_local(local_only).timeout(timeout).await {
156                                Ok(block) => block,
157                                Err(error) => {
158                                    yield Entry::Error { error };
159                                    return;
160                                }
161                            };
162                            let block_data = block.data();
163
164                            match walker.next(block_data, &mut cache) {
165                                Ok(ContinuedWalk::Bucket(..)) => {}
166                                Ok(ContinuedWalk::File(_, cid, path, _, size)) => {
167                                    let file = path.to_string_lossy().to_string().replace(&format!("{root_directory}/"), "");
168                                    yield Entry::File { cid: *cid, file, size: size as _ };
169                                },
170                                Ok(ContinuedWalk::RootDirectory( cid, path, _)) => {
171                                    let path = path.to_string_lossy().to_string();
172                                    root_directory.clone_from(&path);
173                                    yield Entry::RootDirectory { cid: *cid, path };
174                                }
175                                Ok(ContinuedWalk::Directory( cid, path, _)) => {
176                                    let path = path.to_string_lossy().to_string().replace(&format!("{root_directory}/"), "");
177                                    yield Entry::Directory { cid: *cid, path };
178                                }
179                                Ok(ContinuedWalk::Symlink( .. )) => {},
180                                Err(error) => {
181                                    yield Entry::Error { error: anyhow::Error::from(error) };
182                                    return;
183                                }
184                            };
185                        };
186
187                    }.boxed();
188
189                    self.stream.replace(stream);
190                }
191            }
192        }
193    }
194}
195
196impl std::future::IntoFuture for UnixfsLs {
197    type Output = Result<Vec<Entry>, anyhow::Error>;
198
199    type IntoFuture = BoxFuture<'static, Self::Output>;
200
201    fn into_future(mut self) -> Self::IntoFuture {
202        let span = self.span.clone();
203        async move {
204            let mut items = vec![];
205            while let Some(status) = self.next().await {
206                match status {
207                    Entry::Error { error } => return Err(error),
208                    item => items.push(item),
209                }
210            }
211            Ok(items)
212        }
213        .instrument(span)
214        .boxed()
215    }
216}
217
218impl FusedStream for UnixfsLs {
219    fn is_terminated(&self) -> bool {
220        self.stream.is_none() && self.core.is_none()
221    }
222}