1use std::io::{Read, Write};
5use std::path::{Path, PathBuf};
6
7use bytes::Bytes;
8use reqwest::Method;
9use serde::Deserialize;
10
11use crate::api::{
12 CollectionUploadOptions, DownloadOptions, FileHeaders, FileUploadOptions, UploadProgress,
13 UploadResult, prepare_collection_upload_headers, prepare_download_headers,
14 prepare_file_upload_headers,
15};
16use crate::client::{Inner, request};
17use crate::manifest::{MantarayNode, populate_self_addresses};
18use crate::swarm::file_chunker::FileChunker;
19use crate::swarm::{BatchId, Error, Reference};
20
21use super::FileApi;
22
/// One named file inside a collection upload: a relative `path` plus the
/// raw bytes stored at that path.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CollectionEntry {
    /// Path of the entry inside the collection (forward-slash separated).
    pub path: String,
    /// Raw file contents.
    pub data: Vec<u8>,
}

impl CollectionEntry {
    /// Builds an entry from anything convertible into a `String` path and
    /// a `Vec<u8>` payload.
    pub fn new(path: impl Into<String>, data: impl Into<Vec<u8>>) -> Self {
        let (path, data) = (path.into(), data.into());
        CollectionEntry { path, data }
    }
}

/// Total number of payload bytes across all `entries`.
pub fn collection_size(entries: &[CollectionEntry]) -> u64 {
    entries
        .iter()
        .fold(0u64, |total, entry| total + entry.data.len() as u64)
}
49
/// Minimal deserialization target for the node's upload response JSON
/// (`{"reference": "…"}`); only the `reference` field is consumed.
#[derive(Deserialize)]
struct UploadBody {
    reference: String,
}
54
55impl FileApi {
56 pub async fn upload_file(
62 &self,
63 batch_id: &BatchId,
64 data: impl Into<Bytes>,
65 name: &str,
66 content_type: &str,
67 opts: Option<&FileUploadOptions>,
68 ) -> Result<UploadResult, Error> {
69 let mut builder = request(&self.inner, Method::POST, "bzz")?;
70 if !name.is_empty() {
71 builder = builder.query(&[("name", name)]);
72 }
73 let ct = match opts.and_then(|o| o.content_type.as_deref()) {
74 Some(s) => s.to_string(),
75 None if !content_type.is_empty() => content_type.to_string(),
76 None => "application/octet-stream".to_string(),
77 };
78 let builder = builder.header("Content-Type", ct).body(data.into());
79 let builder = Inner::apply_headers(builder, prepare_file_upload_headers(batch_id, opts));
80 let resp = self.inner.send(builder).await?;
81 let headers = resp.headers().clone();
82 let body: UploadBody = serde_json::from_slice(&resp.bytes().await?)?;
83 UploadResult::from_response(&body.reference, &headers)
84 }
85
86 pub async fn download_file(
89 &self,
90 reference: &Reference,
91 opts: Option<&DownloadOptions>,
92 ) -> Result<(Bytes, FileHeaders), Error> {
93 let resp = self.download_file_response(reference, opts).await?;
94 let headers = FileHeaders::from_response(resp.headers());
95 Ok((resp.bytes().await?, headers))
96 }
97
98 pub async fn download_file_response(
102 &self,
103 reference: &Reference,
104 opts: Option<&DownloadOptions>,
105 ) -> Result<reqwest::Response, Error> {
106 let path = format!("bzz/{}", reference.to_hex());
107 let builder = request(&self.inner, Method::GET, &path)?;
108 let builder = Inner::apply_headers(builder, prepare_download_headers(opts));
109 self.inner.send(builder).await
110 }
111
112 pub async fn download_file_path(
116 &self,
117 reference: &Reference,
118 path: &str,
119 opts: Option<&DownloadOptions>,
120 ) -> Result<(Bytes, FileHeaders), Error> {
121 let p = format!(
122 "bzz/{}/{}",
123 reference.to_hex(),
124 path.trim_start_matches('/')
125 );
126 let builder = request(&self.inner, Method::GET, &p)?;
127 let builder = Inner::apply_headers(builder, prepare_download_headers(opts));
128 let resp = self.inner.send(builder).await?;
129 let headers = FileHeaders::from_response(resp.headers());
130 Ok((resp.bytes().await?, headers))
131 }
132
133 pub async fn upload_collection_entries(
138 &self,
139 batch_id: &BatchId,
140 entries: &[CollectionEntry],
141 opts: Option<&CollectionUploadOptions>,
142 ) -> Result<UploadResult, Error> {
143 if let Some(cb) = opts.and_then(|o| o.on_entry.as_ref()) {
144 let total = entries.len();
145 for (i, entry) in entries.iter().enumerate() {
146 cb(UploadProgress {
147 path: &entry.path,
148 size: entry.data.len() as u64,
149 index: i,
150 total,
151 });
152 }
153 }
154 let tar_bytes = build_tar_archive(entries)?;
155 let builder = request(&self.inner, Method::POST, "bzz")?
156 .header("Content-Type", "application/x-tar")
157 .header("Swarm-Collection", "true")
158 .body(Bytes::from(tar_bytes));
159 let builder =
160 Inner::apply_headers(builder, prepare_collection_upload_headers(batch_id, opts));
161 let resp = self.inner.send(builder).await?;
162 let headers = resp.headers().clone();
163 let body: UploadBody = serde_json::from_slice(&resp.bytes().await?)?;
164 UploadResult::from_response(&body.reference, &headers)
165 }
166
167 pub async fn upload_collection(
172 &self,
173 batch_id: &BatchId,
174 dir: impl AsRef<Path>,
175 opts: Option<&CollectionUploadOptions>,
176 ) -> Result<UploadResult, Error> {
177 let entries = read_directory_entries(dir.as_ref())?;
178 self.upload_collection_entries(batch_id, &entries, opts)
179 .await
180 }
181}
182
183pub fn hash_collection_entries(entries: &[CollectionEntry]) -> Result<Reference, Error> {
192 let mut manifest = MantarayNode::new();
193 for entry in entries {
194 let mut chunker = FileChunker::new();
195 chunker.write(&entry.data)?;
196 let root = chunker.finalize()?;
197 manifest.add_fork(entry.path.as_bytes(), Some(&root.address), None);
198 }
199 let addr = populate_self_addresses(&mut manifest)?;
200 Reference::new(&addr)
201}
202
203pub fn hash_directory(dir: impl AsRef<Path>) -> Result<Reference, Error> {
206 let entries = read_directory_entries(dir.as_ref())?;
207 hash_collection_entries(&entries)
208}
209
210pub fn read_directory_entries(dir: &Path) -> Result<Vec<CollectionEntry>, Error> {
214 let canonical = dir
215 .canonicalize()
216 .map_err(|e| Error::argument(format!("invalid dir {dir:?}: {e}")))?;
217 let mut out = Vec::new();
218 walk(&canonical, &canonical, &mut out)?;
219 out.sort_by(|a, b| a.path.cmp(&b.path));
220 Ok(out)
221}
222
223fn walk(root: &Path, here: &Path, out: &mut Vec<CollectionEntry>) -> Result<(), Error> {
224 let read =
225 std::fs::read_dir(here).map_err(|e| Error::argument(format!("read_dir {here:?}: {e}")))?;
226 for entry in read {
227 let entry = entry.map_err(|e| Error::argument(format!("dir entry: {e}")))?;
228 let path: PathBuf = entry.path();
229 let ty = entry
230 .file_type()
231 .map_err(|e| Error::argument(format!("file_type {path:?}: {e}")))?;
232 if ty.is_dir() {
233 walk(root, &path, out)?;
234 continue;
235 }
236 if !ty.is_file() {
237 continue;
238 }
239 let rel = path
240 .strip_prefix(root)
241 .map_err(|e| Error::argument(format!("strip_prefix {path:?}: {e}")))?;
242 let rel_str = rel
243 .to_str()
244 .ok_or_else(|| Error::argument(format!("non-UTF-8 path {rel:?}")))?
245 .replace(std::path::MAIN_SEPARATOR, "/");
246 let mut data = Vec::new();
247 std::fs::File::open(&path)
248 .and_then(|mut f| f.read_to_end(&mut data))
249 .map_err(|e| Error::argument(format!("open {path:?}: {e}")))?;
250 out.push(CollectionEntry::new(rel_str, data));
251 }
252 Ok(())
253}
254
255fn build_tar_archive(entries: &[CollectionEntry]) -> Result<Vec<u8>, Error> {
259 let mut buf =
260 Vec::with_capacity(entries.iter().map(|e| e.data.len() + 512).sum::<usize>() + 1024);
261 {
262 let mut tw = tar::Builder::new(&mut buf);
263 for e in entries {
264 let mut header = tar::Header::new_ustar();
265 header
266 .set_path(&e.path)
267 .map_err(|err| Error::argument(format!("invalid tar path {:?}: {err}", e.path)))?;
268 header.set_size(e.data.len() as u64);
269 header.set_mode(0o644);
270 header.set_cksum();
271 tw.append(&header, e.data.as_slice())
272 .map_err(|err| Error::argument(format!("tar append failed: {err}")))?;
273 }
274 tw.finish()
275 .map_err(|err| Error::argument(format!("tar finish failed: {err}")))?;
276 }
277 let _ = std::io::sink().flush();
279 Ok(buf)
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    // `collection_size` sums the byte lengths of all entry payloads (3 + 5).
    #[test]
    fn collection_size_sums_entries() {
        let entries = vec![
            CollectionEntry::new("a.txt", b"abc".to_vec()),
            CollectionEntry::new("b.txt", b"defgh".to_vec()),
        ];
        assert_eq!(collection_size(&entries), 8);
    }

    // Local hashing must be a pure function of (path, data) pairs: equal
    // inputs yield equal references, and the path participates in the hash.
    #[test]
    fn hash_collection_entries_is_deterministic_and_path_sensitive() {
        let a = vec![CollectionEntry::new("index.html", b"x".to_vec())];
        let b = vec![CollectionEntry::new("index.html", b"x".to_vec())];
        let c = vec![CollectionEntry::new("other.html", b"x".to_vec())];

        let ra = hash_collection_entries(&a).unwrap();
        let rb = hash_collection_entries(&b).unwrap();
        let rc = hash_collection_entries(&c).unwrap();

        assert_eq!(ra, rb, "same entries → same address");
        assert_ne!(ra, rc, "different paths → different address");
    }

    // Traversal should recurse into subdirectories, return paths relative to
    // the root with `/` separators, and sort the result by path.
    #[test]
    fn read_directory_entries_walks_recursively() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::create_dir_all(dir.path().join("nested")).unwrap();
        std::fs::write(dir.path().join("a.txt"), b"alpha").unwrap();
        std::fs::write(dir.path().join("nested/b.bin"), [0u8, 1, 2]).unwrap();

        let entries = read_directory_entries(dir.path()).unwrap();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].path, "a.txt");
        assert_eq!(&entries[0].data, b"alpha");
        assert_eq!(entries[1].path, "nested/b.bin");
        assert_eq!(&entries[1].data, &[0, 1, 2]);
    }

    // Archive built by `build_tar_archive` must be readable back by the tar
    // crate with identical paths and payloads, in insertion order.
    #[test]
    fn build_tar_archive_round_trip() {
        let entries = vec![
            CollectionEntry::new("index.html", b"<html/>".to_vec()),
            CollectionEntry::new("nested/data.bin", b"\x00\x01\x02".to_vec()),
        ];
        let bytes = build_tar_archive(&entries).unwrap();

        let mut found: Vec<(String, Vec<u8>)> = Vec::new();
        let mut ar = tar::Archive::new(bytes.as_slice());
        for entry in ar.entries().unwrap() {
            let mut e = entry.unwrap();
            let path = e.path().unwrap().to_string_lossy().into_owned();
            let mut data = Vec::new();
            std::io::Read::read_to_end(&mut e, &mut data).unwrap();
            found.push((path, data));
        }
        assert_eq!(found.len(), 2);
        assert_eq!(found[0].0, "index.html");
        assert_eq!(&found[0].1, b"<html/>");
        assert_eq!(found[1].0, "nested/data.bin");
        assert_eq!(&found[1].1, b"\x00\x01\x02");
    }
}