1use std::{task::Poll, time::Duration};
2
3use either::Either;
4use futures::{
5 future::BoxFuture,
6 stream::{BoxStream, FusedStream},
7 FutureExt, Stream, StreamExt,
8};
9use ipld_core::cid::Cid;
10use libp2p::PeerId;
11use rust_unixfs::walk::{ContinuedWalk, Walker};
12use tracing::{Instrument, Span};
13
14use crate::{dag::IpldDag, repo::Repo, Ipfs, IpfsPath};
15
16#[derive(Debug)]
17pub enum Entry {
18 Error { error: anyhow::Error },
19 RootDirectory { cid: Cid, path: String },
20 Directory { cid: Cid, path: String },
21 File { cid: Cid, file: String, size: usize },
22}
23
24#[must_use = "does nothing unless you `.await` or poll the stream"]
25pub struct UnixfsLs {
26 core: Option<Either<Ipfs, Repo>>,
27 span: Span,
28 path: Option<IpfsPath>,
29 providers: Vec<PeerId>,
30 local_only: bool,
31 timeout: Option<Duration>,
32 stream: Option<BoxStream<'static, Entry>>,
33}
34
35impl UnixfsLs {
36 pub fn with_ipfs(ipfs: &Ipfs, path: impl Into<IpfsPath>) -> Self {
37 Self::with_either(Either::Left(ipfs.clone()), path)
38 }
39
40 pub fn with_repo(repo: &Repo, path: impl Into<IpfsPath>) -> Self {
41 Self::with_either(Either::Right(repo.clone()), path)
42 }
43
44 fn with_either(core: Either<Ipfs, Repo>, path: impl Into<IpfsPath>) -> Self {
45 let path = path.into();
46 Self {
47 core: Some(core),
48 path: Some(path),
49 span: Span::current(),
50 providers: Vec::new(),
51 local_only: false,
52 timeout: None,
53 stream: None,
54 }
55 }
56
57 pub fn span(mut self, span: Span) -> Self {
58 self.span = span;
59 self
60 }
61
62 pub fn provider(mut self, peer_id: PeerId) -> Self {
63 if !self.providers.contains(&peer_id) {
64 self.providers.push(peer_id);
65 }
66 self
67 }
68
69 pub fn providers(mut self, list: &[PeerId]) -> Self {
70 self.providers = list.to_vec();
71 self
72 }
73
74 pub fn timeout(mut self, timeout: Duration) -> Self {
75 self.timeout = Some(timeout);
76 self
77 }
78
79 pub fn local(mut self) -> Self {
80 self.local_only = true;
81 self
82 }
83
84 pub fn set_local(mut self, local: bool) -> Self {
85 self.local_only = local;
86 self
87 }
88}
89
90impl Stream for UnixfsLs {
91 type Item = Entry;
92 fn poll_next(
93 mut self: std::pin::Pin<&mut Self>,
94 cx: &mut std::task::Context<'_>,
95 ) -> Poll<Option<Self::Item>> {
96 if self.core.is_none() && self.stream.is_none() {
97 return Poll::Ready(None);
98 }
99 loop {
100 match &mut self.stream {
101 Some(stream) => match futures::ready!(stream.poll_next_unpin(cx)) {
102 None => {
103 self.stream.take();
104 return Poll::Ready(None);
105 }
106 task => return Poll::Ready(task),
107 },
108 None => {
109 let Some(core) = self.core.take() else {
110 return Poll::Ready(None);
111 };
112
113 let (repo, dag) = match core {
114 Either::Left(ipfs) => (ipfs.repo().clone(), ipfs.dag()),
115 Either::Right(repo) => (repo.clone(), IpldDag::from(repo.clone())),
116 };
117
118 let path = self.path.take().expect("path exist");
119 let providers = std::mem::take(&mut self.providers);
120 let local_only = self.local_only;
121 let timeout = self.timeout;
122
123 let stream = async_stream::stream! {
126
127 let resolved = match dag
128 ._resolve(path, true, &providers, local_only, timeout)
129 .await {
130 Ok((resolved, _)) => resolved,
131 Err(e) => {
132 yield Entry::Error { error: e.into() };
133 return;
134 }
135 };
136
137 let block = match resolved.into_unixfs_block() {
138 Ok(block) => block,
139 Err(e) => {
140 yield Entry::Error { error: e.into() };
141 return;
142 }
143 };
144
145 let cid = block.cid();
146 let root_name = cid.to_string();
147
148 let mut walker = Walker::new(*cid, root_name);
149 let mut cache = None;
150 let mut root_directory = String::new();
151 while walker.should_continue() {
152 let (next, _) = walker.pending_links();
153 let block = match repo.get_block(next).providers(&providers).set_local(local_only).timeout(timeout).await {
154 Ok(block) => block,
155 Err(error) => {
156 yield Entry::Error { error };
157 return;
158 }
159 };
160 let block_data = block.data();
161
162 match walker.next(block_data, &mut cache) {
163 Ok(ContinuedWalk::Bucket(..)) => {}
164 Ok(ContinuedWalk::File(_, cid, path, _, size)) => {
165 let file = path.to_string_lossy().to_string().replace(&format!("{root_directory}/"), "");
166 yield Entry::File { cid: *cid, file, size: size as _ };
167 },
168 Ok(ContinuedWalk::RootDirectory( cid, path, _)) => {
169 let path = path.to_string_lossy().to_string();
170 root_directory.clone_from(&path);
171 yield Entry::RootDirectory { cid: *cid, path };
172 }
173 Ok(ContinuedWalk::Directory( cid, path, _)) => {
174 let path = path.to_string_lossy().to_string().replace(&format!("{root_directory}/"), "");
175 yield Entry::Directory { cid: *cid, path };
176 }
177 Ok(ContinuedWalk::Symlink( .. )) => {},
178 Err(error) => {
179 yield Entry::Error { error: anyhow::Error::from(error) };
180 return;
181 }
182 };
183 };
184
185 }.boxed();
186
187 self.stream.replace(stream);
188 }
189 }
190 }
191 }
192}
193
194impl std::future::IntoFuture for UnixfsLs {
195 type Output = Result<Vec<Entry>, anyhow::Error>;
196
197 type IntoFuture = BoxFuture<'static, Self::Output>;
198
199 fn into_future(mut self) -> Self::IntoFuture {
200 let span = self.span.clone();
201 async move {
202 let mut items = vec![];
203 while let Some(status) = self.next().await {
204 match status {
205 Entry::Error { error } => return Err(error),
206 item => items.push(item),
207 }
208 }
209 Ok(items)
210 }
211 .instrument(span)
212 .boxed()
213 }
214}
215
216impl FusedStream for UnixfsLs {
217 fn is_terminated(&self) -> bool {
218 self.stream.is_none() && self.core.is_none()
219 }
220}