1use connexa::prelude::PeerId;
2use either::Either;
3use futures::{
4 future::BoxFuture,
5 stream::{BoxStream, FusedStream},
6 FutureExt, Stream, StreamExt,
7};
8use ipld_core::cid::Cid;
9use rust_unixfs::walk::{ContinuedWalk, Walker};
10use std::pin::Pin;
11use std::task::Context;
12use std::{task::Poll, time::Duration};
13use tracing::{Instrument, Span};
14
15use crate::{
16 dag::IpldDag,
17 repo::{DefaultStorage, Repo},
18 Ipfs, IpfsPath,
19};
20
21#[derive(Debug)]
22pub enum Entry {
23 Error { error: anyhow::Error },
24 RootDirectory { cid: Cid, path: String },
25 Directory { cid: Cid, path: String },
26 File { cid: Cid, file: String, size: usize },
27}
28
29#[must_use = "does nothing unless you `.await` or poll the stream"]
30pub struct UnixfsLs {
31 core: Option<Either<Ipfs, Repo<DefaultStorage>>>,
32 span: Span,
33 path: Option<IpfsPath>,
34 providers: Vec<PeerId>,
35 local_only: bool,
36 timeout: Option<Duration>,
37 stream: Option<BoxStream<'static, Entry>>,
38}
39
40impl UnixfsLs {
41 pub fn with_ipfs(ipfs: &Ipfs, path: impl Into<IpfsPath>) -> Self {
42 Self::with_either(Either::Left(ipfs.clone()), path)
43 }
44
45 pub fn with_repo(repo: &Repo<DefaultStorage>, path: impl Into<IpfsPath>) -> Self {
46 Self::with_either(Either::Right(repo.clone()), path)
47 }
48
49 fn with_either(core: Either<Ipfs, Repo<DefaultStorage>>, path: impl Into<IpfsPath>) -> Self {
50 let path = path.into();
51 Self {
52 core: Some(core),
53 path: Some(path),
54 span: Span::current(),
55 providers: Vec::new(),
56 local_only: false,
57 timeout: None,
58 stream: None,
59 }
60 }
61
62 pub fn span(mut self, span: Span) -> Self {
63 self.span = span;
64 self
65 }
66
67 pub fn provider(mut self, peer_id: PeerId) -> Self {
68 if !self.providers.contains(&peer_id) {
69 self.providers.push(peer_id);
70 }
71 self
72 }
73
74 pub fn providers(mut self, list: &[PeerId]) -> Self {
75 self.providers = list.to_vec();
76 self
77 }
78
79 pub fn timeout(mut self, timeout: Duration) -> Self {
80 self.timeout = Some(timeout);
81 self
82 }
83
84 pub fn local(mut self) -> Self {
85 self.local_only = true;
86 self
87 }
88
89 pub fn set_local(mut self, local: bool) -> Self {
90 self.local_only = local;
91 self
92 }
93}
94
95impl Stream for UnixfsLs {
96 type Item = Entry;
97 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
98 if self.core.is_none() && self.stream.is_none() {
99 return Poll::Ready(None);
100 }
101 loop {
102 match &mut self.stream {
103 Some(stream) => match futures::ready!(stream.poll_next_unpin(cx)) {
104 None => {
105 self.stream.take();
106 return Poll::Ready(None);
107 }
108 task => return Poll::Ready(task),
109 },
110 None => {
111 let Some(core) = self.core.take() else {
112 return Poll::Ready(None);
113 };
114
115 let (repo, dag) = match core {
116 Either::Left(ipfs) => (ipfs.repo().clone(), ipfs.dag()),
117 Either::Right(repo) => (repo.clone(), IpldDag::from(repo.clone())),
118 };
119
120 let path = self.path.take().expect("path exist");
121 let providers = std::mem::take(&mut self.providers);
122 let local_only = self.local_only;
123 let timeout = self.timeout;
124
125 let stream = async_stream::stream! {
128
129 let resolved = match dag
130 ._resolve(path, true, &providers, local_only, timeout)
131 .await {
132 Ok((resolved, _)) => resolved,
133 Err(e) => {
134 yield Entry::Error { error: e.into() };
135 return;
136 }
137 };
138
139 let block = match resolved.into_unixfs_block() {
140 Ok(block) => block,
141 Err(e) => {
142 yield Entry::Error { error: e.into() };
143 return;
144 }
145 };
146
147 let cid = block.cid();
148 let root_name = cid.to_string();
149
150 let mut walker = Walker::new(*cid, root_name);
151 let mut cache = None;
152 let mut root_directory = String::new();
153 while walker.should_continue() {
154 let (next, _) = walker.pending_links();
155 let block = match repo.get_block(next).providers(&providers).set_local(local_only).timeout(timeout).await {
156 Ok(block) => block,
157 Err(error) => {
158 yield Entry::Error { error };
159 return;
160 }
161 };
162 let block_data = block.data();
163
164 match walker.next(block_data, &mut cache) {
165 Ok(ContinuedWalk::Bucket(..)) => {}
166 Ok(ContinuedWalk::File(_, cid, path, _, size)) => {
167 let file = path.to_string_lossy().to_string().replace(&format!("{root_directory}/"), "");
168 yield Entry::File { cid: *cid, file, size: size as _ };
169 },
170 Ok(ContinuedWalk::RootDirectory( cid, path, _)) => {
171 let path = path.to_string_lossy().to_string();
172 root_directory.clone_from(&path);
173 yield Entry::RootDirectory { cid: *cid, path };
174 }
175 Ok(ContinuedWalk::Directory( cid, path, _)) => {
176 let path = path.to_string_lossy().to_string().replace(&format!("{root_directory}/"), "");
177 yield Entry::Directory { cid: *cid, path };
178 }
179 Ok(ContinuedWalk::Symlink( .. )) => {},
180 Err(error) => {
181 yield Entry::Error { error: anyhow::Error::from(error) };
182 return;
183 }
184 };
185 };
186
187 }.boxed();
188
189 self.stream.replace(stream);
190 }
191 }
192 }
193 }
194}
195
196impl std::future::IntoFuture for UnixfsLs {
197 type Output = Result<Vec<Entry>, anyhow::Error>;
198
199 type IntoFuture = BoxFuture<'static, Self::Output>;
200
201 fn into_future(mut self) -> Self::IntoFuture {
202 let span = self.span.clone();
203 async move {
204 let mut items = vec![];
205 while let Some(status) = self.next().await {
206 match status {
207 Entry::Error { error } => return Err(error),
208 item => items.push(item),
209 }
210 }
211 Ok(items)
212 }
213 .instrument(span)
214 .boxed()
215 }
216}
217
218impl FusedStream for UnixfsLs {
219 fn is_terminated(&self) -> bool {
220 self.stream.is_none() && self.core.is_none()
221 }
222}