rust_ipfs/unixfs/
ls.rs

1use std::{task::Poll, time::Duration};
2
3use either::Either;
4use futures::{
5    future::BoxFuture,
6    stream::{BoxStream, FusedStream},
7    FutureExt, Stream, StreamExt,
8};
9use ipld_core::cid::Cid;
10use libp2p::PeerId;
11use rust_unixfs::walk::{ContinuedWalk, Walker};
12use tracing::{Instrument, Span};
13
14use crate::{dag::IpldDag, repo::Repo, Ipfs, IpfsPath};
15
16#[derive(Debug)]
17pub enum Entry {
18    Error { error: anyhow::Error },
19    RootDirectory { cid: Cid, path: String },
20    Directory { cid: Cid, path: String },
21    File { cid: Cid, file: String, size: usize },
22}
23
24#[must_use = "does nothing unless you `.await` or poll the stream"]
25pub struct UnixfsLs {
26    core: Option<Either<Ipfs, Repo>>,
27    span: Span,
28    path: Option<IpfsPath>,
29    providers: Vec<PeerId>,
30    local_only: bool,
31    timeout: Option<Duration>,
32    stream: Option<BoxStream<'static, Entry>>,
33}
34
35impl UnixfsLs {
36    pub fn with_ipfs(ipfs: &Ipfs, path: impl Into<IpfsPath>) -> Self {
37        Self::with_either(Either::Left(ipfs.clone()), path)
38    }
39
40    pub fn with_repo(repo: &Repo, path: impl Into<IpfsPath>) -> Self {
41        Self::with_either(Either::Right(repo.clone()), path)
42    }
43
44    fn with_either(core: Either<Ipfs, Repo>, path: impl Into<IpfsPath>) -> Self {
45        let path = path.into();
46        Self {
47            core: Some(core),
48            path: Some(path),
49            span: Span::current(),
50            providers: Vec::new(),
51            local_only: false,
52            timeout: None,
53            stream: None,
54        }
55    }
56
57    pub fn span(mut self, span: Span) -> Self {
58        self.span = span;
59        self
60    }
61
62    pub fn provider(mut self, peer_id: PeerId) -> Self {
63        if !self.providers.contains(&peer_id) {
64            self.providers.push(peer_id);
65        }
66        self
67    }
68
69    pub fn providers(mut self, list: &[PeerId]) -> Self {
70        self.providers = list.to_vec();
71        self
72    }
73
74    pub fn timeout(mut self, timeout: Duration) -> Self {
75        self.timeout = Some(timeout);
76        self
77    }
78
79    pub fn local(mut self) -> Self {
80        self.local_only = true;
81        self
82    }
83
84    pub fn set_local(mut self, local: bool) -> Self {
85        self.local_only = local;
86        self
87    }
88}
89
90impl Stream for UnixfsLs {
91    type Item = Entry;
92    fn poll_next(
93        mut self: std::pin::Pin<&mut Self>,
94        cx: &mut std::task::Context<'_>,
95    ) -> Poll<Option<Self::Item>> {
96        if self.core.is_none() && self.stream.is_none() {
97            return Poll::Ready(None);
98        }
99        loop {
100            match &mut self.stream {
101                Some(stream) => match futures::ready!(stream.poll_next_unpin(cx)) {
102                    None => {
103                        self.stream.take();
104                        return Poll::Ready(None);
105                    }
106                    task => return Poll::Ready(task),
107                },
108                None => {
109                    let Some(core) = self.core.take() else {
110                        return Poll::Ready(None);
111                    };
112
113                    let (repo, dag) = match core {
114                        Either::Left(ipfs) => (ipfs.repo().clone(), ipfs.dag()),
115                        Either::Right(repo) => (repo.clone(), IpldDag::from(repo.clone())),
116                    };
117
118                    let path = self.path.take().expect("path exist");
119                    let providers = std::mem::take(&mut self.providers);
120                    let local_only = self.local_only;
121                    let timeout = self.timeout;
122
123                    // using async_stream here at least to get on faster; writing custom streams is not too easy
124                    // but this might be easy enough to write open.
125                    let stream = async_stream::stream! {
126
127                        let resolved = match dag
128                            ._resolve(path, true, &providers, local_only, timeout)
129                            .await {
130                                Ok((resolved, _)) => resolved,
131                                Err(e) => {
132                                    yield Entry::Error { error: e.into() };
133                                    return;
134                                }
135                            };
136
137                        let block = match resolved.into_unixfs_block() {
138                            Ok(block) => block,
139                            Err(e) => {
140                                yield Entry::Error { error: e.into() };
141                                return;
142                            }
143                        };
144
145                        let cid = block.cid();
146                        let root_name = cid.to_string();
147
148                        let mut walker = Walker::new(*cid, root_name);
149                        let mut cache = None;
150                        let mut root_directory = String::new();
151                        while walker.should_continue() {
152                            let (next, _) = walker.pending_links();
153                            let block = match repo.get_block(next).providers(&providers).set_local(local_only).timeout(timeout).await {
154                                Ok(block) => block,
155                                Err(error) => {
156                                    yield Entry::Error { error };
157                                    return;
158                                }
159                            };
160                            let block_data = block.data();
161
162                            match walker.next(block_data, &mut cache) {
163                                Ok(ContinuedWalk::Bucket(..)) => {}
164                                Ok(ContinuedWalk::File(_, cid, path, _, size)) => {
165                                    let file = path.to_string_lossy().to_string().replace(&format!("{root_directory}/"), "");
166                                    yield Entry::File { cid: *cid, file, size: size as _ };
167                                },
168                                Ok(ContinuedWalk::RootDirectory( cid, path, _)) => {
169                                    let path = path.to_string_lossy().to_string();
170                                    root_directory.clone_from(&path);
171                                    yield Entry::RootDirectory { cid: *cid, path };
172                                }
173                                Ok(ContinuedWalk::Directory( cid, path, _)) => {
174                                    let path = path.to_string_lossy().to_string().replace(&format!("{root_directory}/"), "");
175                                    yield Entry::Directory { cid: *cid, path };
176                                }
177                                Ok(ContinuedWalk::Symlink( .. )) => {},
178                                Err(error) => {
179                                    yield Entry::Error { error: anyhow::Error::from(error) };
180                                    return;
181                                }
182                            };
183                        };
184
185                    }.boxed();
186
187                    self.stream.replace(stream);
188                }
189            }
190        }
191    }
192}
193
194impl std::future::IntoFuture for UnixfsLs {
195    type Output = Result<Vec<Entry>, anyhow::Error>;
196
197    type IntoFuture = BoxFuture<'static, Self::Output>;
198
199    fn into_future(mut self) -> Self::IntoFuture {
200        let span = self.span.clone();
201        async move {
202            let mut items = vec![];
203            while let Some(status) = self.next().await {
204                match status {
205                    Entry::Error { error } => return Err(error),
206                    item => items.push(item),
207                }
208            }
209            Ok(items)
210        }
211        .instrument(span)
212        .boxed()
213    }
214}
215
216impl FusedStream for UnixfsLs {
217    fn is_terminated(&self) -> bool {
218        self.stream.is_none() && self.core.is_none()
219    }
220}