1use std::io::{Read, Write};
5use std::path::{Path, PathBuf};
6
7use bytes::Bytes;
8use reqwest::Method;
9use serde::Deserialize;
10
11use crate::api::{
12 CollectionUploadOptions, DownloadOptions, FileHeaders, FileUploadOptions, UploadProgress,
13 UploadResult, prepare_collection_upload_headers, prepare_download_headers,
14 validate_collection_upload_options,
15 prepare_file_upload_headers,
16};
17use crate::client::{Inner, MAX_JSON_RESPONSE_BYTES, request};
18use crate::manifest::{MantarayNode, populate_self_addresses};
19use crate::swarm::file_chunker::FileChunker;
20use crate::swarm::{BatchId, Error, Reference};
21
22use super::FileApi;
23
/// One file inside a collection upload: a relative path paired with its
/// complete in-memory contents.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CollectionEntry {
    /// Path of the entry relative to the collection root. Forward-slash
    /// separated when produced by `read_directory_entries`; callers building
    /// entries by hand are expected to follow the same convention.
    pub path: String,
    /// Raw file payload.
    pub data: Vec<u8>,
}
34
35impl CollectionEntry {
36 pub fn new(path: impl Into<String>, data: impl Into<Vec<u8>>) -> Self {
38 Self {
39 path: path.into(),
40 data: data.into(),
41 }
42 }
43}
44
45pub fn collection_size(entries: &[CollectionEntry]) -> u64 {
48 entries.iter().map(|e| e.data.len() as u64).sum()
49}
50
/// Minimal JSON body returned by the `bzz` upload endpoint; only the
/// `reference` field is consumed.
#[derive(Deserialize)]
struct UploadBody {
    // Hex-encoded Swarm reference of the uploaded content.
    reference: String,
}
55
56impl FileApi {
57 pub async fn upload_file(
63 &self,
64 batch_id: &BatchId,
65 data: impl Into<Bytes>,
66 name: &str,
67 content_type: &str,
68 opts: Option<&FileUploadOptions>,
69 ) -> Result<UploadResult, Error> {
70 let mut builder = request(&self.inner, Method::POST, "bzz")?;
71 if !name.is_empty() {
72 builder = builder.query(&[("name", name)]);
73 }
74 let ct = match opts.and_then(|o| o.content_type.as_deref()) {
75 Some(s) => s.to_string(),
76 None if !content_type.is_empty() => content_type.to_string(),
77 None => "application/octet-stream".to_string(),
78 };
79 let builder = builder.header("Content-Type", ct).body(data.into());
80 let builder = Inner::apply_headers(builder, prepare_file_upload_headers(batch_id, opts));
81 let resp = self.inner.send(builder).await?;
82 let headers = resp.headers().clone();
83 let body: UploadBody = serde_json::from_slice(&Inner::read_capped(resp, MAX_JSON_RESPONSE_BYTES).await?)?;
84 UploadResult::from_response(&body.reference, &headers)
85 }
86
87 pub async fn download_file(
90 &self,
91 reference: &Reference,
92 opts: Option<&DownloadOptions>,
93 ) -> Result<(Bytes, FileHeaders), Error> {
94 let resp = self.download_file_response(reference, opts).await?;
95 let headers = FileHeaders::from_response(resp.headers());
96 Ok((resp.bytes().await?, headers))
97 }
98
99 pub async fn download_file_response(
103 &self,
104 reference: &Reference,
105 opts: Option<&DownloadOptions>,
106 ) -> Result<reqwest::Response, Error> {
107 let path = format!("bzz/{}", reference.to_hex());
108 let builder = request(&self.inner, Method::GET, &path)?;
109 let builder = Inner::apply_headers(builder, prepare_download_headers(opts));
110 self.inner.send(builder).await
111 }
112
113 pub async fn download_file_path(
117 &self,
118 reference: &Reference,
119 path: &str,
120 opts: Option<&DownloadOptions>,
121 ) -> Result<(Bytes, FileHeaders), Error> {
122 let p = format!(
123 "bzz/{}/{}",
124 reference.to_hex(),
125 path.trim_start_matches('/')
126 );
127 let builder = request(&self.inner, Method::GET, &p)?;
128 let builder = Inner::apply_headers(builder, prepare_download_headers(opts));
129 let resp = self.inner.send(builder).await?;
130 let headers = FileHeaders::from_response(resp.headers());
131 Ok((resp.bytes().await?, headers))
132 }
133
134 pub async fn upload_collection_entries(
139 &self,
140 batch_id: &BatchId,
141 entries: &[CollectionEntry],
142 opts: Option<&CollectionUploadOptions>,
143 ) -> Result<UploadResult, Error> {
144 if let Some(cb) = opts.and_then(|o| o.on_entry.as_ref()) {
145 let total = entries.len();
146 for (i, entry) in entries.iter().enumerate() {
147 cb(UploadProgress {
148 path: &entry.path,
149 size: entry.data.len() as u64,
150 index: i,
151 total,
152 });
153 }
154 }
155 let tar_bytes = build_tar_archive(entries)?;
156 validate_collection_upload_options(opts)?;
157 let builder = request(&self.inner, Method::POST, "bzz")?
158 .header("Content-Type", "application/x-tar")
159 .header("Swarm-Collection", "true")
160 .body(Bytes::from(tar_bytes));
161 let builder =
162 Inner::apply_headers(builder, prepare_collection_upload_headers(batch_id, opts));
163 let resp = self.inner.send(builder).await?;
164 let headers = resp.headers().clone();
165 let body: UploadBody = serde_json::from_slice(&Inner::read_capped(resp, MAX_JSON_RESPONSE_BYTES).await?)?;
166 UploadResult::from_response(&body.reference, &headers)
167 }
168
169 pub async fn upload_collection(
174 &self,
175 batch_id: &BatchId,
176 dir: impl AsRef<Path>,
177 opts: Option<&CollectionUploadOptions>,
178 ) -> Result<UploadResult, Error> {
179 let entries = read_directory_entries(dir.as_ref())?;
180 self.upload_collection_entries(batch_id, &entries, opts)
181 .await
182 }
183}
184
185pub fn hash_collection_entries(entries: &[CollectionEntry]) -> Result<Reference, Error> {
194 let mut manifest = MantarayNode::new();
195 for entry in entries {
196 let mut chunker = FileChunker::new();
197 chunker.write(&entry.data)?;
198 let root = chunker.finalize()?;
199 manifest.add_fork(entry.path.as_bytes(), Some(&root.address), None);
200 }
201 let addr = populate_self_addresses(&mut manifest)?;
202 Reference::new(&addr)
203}
204
205pub fn hash_directory(dir: impl AsRef<Path>) -> Result<Reference, Error> {
208 let entries = read_directory_entries(dir.as_ref())?;
209 hash_collection_entries(&entries)
210}
211
212pub fn read_directory_entries(dir: &Path) -> Result<Vec<CollectionEntry>, Error> {
216 let canonical = dir
217 .canonicalize()
218 .map_err(|e| Error::argument(format!("invalid dir {dir:?}: {e}")))?;
219 let mut out = Vec::new();
220 walk(&canonical, &canonical, &mut out)?;
221 out.sort_by(|a, b| a.path.cmp(&b.path));
222 Ok(out)
223}
224
225fn walk(root: &Path, here: &Path, out: &mut Vec<CollectionEntry>) -> Result<(), Error> {
226 let read =
227 std::fs::read_dir(here).map_err(|e| Error::argument(format!("read_dir {here:?}: {e}")))?;
228 for entry in read {
229 let entry = entry.map_err(|e| Error::argument(format!("dir entry: {e}")))?;
230 let path: PathBuf = entry.path();
231 let ty = entry
232 .file_type()
233 .map_err(|e| Error::argument(format!("file_type {path:?}: {e}")))?;
234 if ty.is_dir() {
235 walk(root, &path, out)?;
236 continue;
237 }
238 if !ty.is_file() {
239 continue;
240 }
241 let rel = path
242 .strip_prefix(root)
243 .map_err(|e| Error::argument(format!("strip_prefix {path:?}: {e}")))?;
244 let rel_str = rel
245 .to_str()
246 .ok_or_else(|| Error::argument(format!("non-UTF-8 path {rel:?}")))?
247 .replace(std::path::MAIN_SEPARATOR, "/");
248 let mut data = Vec::new();
249 std::fs::File::open(&path)
250 .and_then(|mut f| f.read_to_end(&mut data))
251 .map_err(|e| Error::argument(format!("open {path:?}: {e}")))?;
252 out.push(CollectionEntry::new(rel_str, data));
253 }
254 Ok(())
255}
256
257fn build_tar_archive(entries: &[CollectionEntry]) -> Result<Vec<u8>, Error> {
261 let mut buf =
262 Vec::with_capacity(entries.iter().map(|e| e.data.len() + 512).sum::<usize>() + 1024);
263 {
264 let mut tw = tar::Builder::new(&mut buf);
265 for e in entries {
266 let mut header = tar::Header::new_ustar();
267 header
268 .set_path(&e.path)
269 .map_err(|err| Error::argument(format!("invalid tar path {:?}: {err}", e.path)))?;
270 header.set_size(e.data.len() as u64);
271 header.set_mode(0o644);
272 header.set_cksum();
273 tw.append(&header, e.data.as_slice())
274 .map_err(|err| Error::argument(format!("tar append failed: {err}")))?;
275 }
276 tw.finish()
277 .map_err(|err| Error::argument(format!("tar finish failed: {err}")))?;
278 }
279 let _ = std::io::sink().flush();
281 Ok(buf)
282}
283
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn collection_size_sums_entries() {
        let files = vec![
            CollectionEntry::new("a.txt", b"abc".to_vec()),
            CollectionEntry::new("b.txt", b"defgh".to_vec()),
        ];
        // 3 bytes + 5 bytes.
        assert_eq!(collection_size(&files), 8);
    }

    #[test]
    fn hash_collection_entries_is_deterministic_and_path_sensitive() {
        let first = vec![CollectionEntry::new("index.html", b"x".to_vec())];
        let repeat = vec![CollectionEntry::new("index.html", b"x".to_vec())];
        let renamed = vec![CollectionEntry::new("other.html", b"x".to_vec())];

        let ref_first = hash_collection_entries(&first).unwrap();
        let ref_repeat = hash_collection_entries(&repeat).unwrap();
        let ref_renamed = hash_collection_entries(&renamed).unwrap();

        assert_eq!(ref_first, ref_repeat, "same entries → same address");
        assert_ne!(ref_first, ref_renamed, "different paths → different address");
    }

    #[test]
    fn read_directory_entries_walks_recursively() {
        let tmp = tempfile::tempdir().unwrap();
        std::fs::create_dir_all(tmp.path().join("nested")).unwrap();
        std::fs::write(tmp.path().join("a.txt"), b"alpha").unwrap();
        std::fs::write(tmp.path().join("nested/b.bin"), [0u8, 1, 2]).unwrap();

        let entries = read_directory_entries(tmp.path()).unwrap();
        assert_eq!(entries.len(), 2);
        // Sorted by relative path, separators normalized to '/'.
        assert_eq!(entries[0].path, "a.txt");
        assert_eq!(&entries[0].data, b"alpha");
        assert_eq!(entries[1].path, "nested/b.bin");
        assert_eq!(&entries[1].data, &[0, 1, 2]);
    }

    #[test]
    fn build_tar_archive_round_trip() {
        let input = vec![
            CollectionEntry::new("index.html", b"<html/>".to_vec()),
            CollectionEntry::new("nested/data.bin", b"\x00\x01\x02".to_vec()),
        ];
        let archive_bytes = build_tar_archive(&input).unwrap();

        // Re-read the archive and confirm both members survive intact.
        let mut extracted: Vec<(String, Vec<u8>)> = Vec::new();
        let mut archive = tar::Archive::new(archive_bytes.as_slice());
        for member in archive.entries().unwrap() {
            let mut member = member.unwrap();
            let name = member.path().unwrap().to_string_lossy().into_owned();
            let mut payload = Vec::new();
            std::io::Read::read_to_end(&mut member, &mut payload).unwrap();
            extracted.push((name, payload));
        }
        assert_eq!(extracted.len(), 2);
        assert_eq!(extracted[0].0, "index.html");
        assert_eq!(&extracted[0].1, b"<html/>");
        assert_eq!(extracted[1].0, "nested/data.bin");
        assert_eq!(&extracted[1].1, b"\x00\x01\x02");
    }
}