use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use bee::Client;
use bee::api::CollectionUploadOptions;
use bee::file::StreamProgress;
use bee::postage::Stamper;
use bee::swarm::{
BatchId, CidType, EthAddress, PrivateKey, Reference, Topic, convert_cid_to_reference,
convert_reference_to_cid,
};
use serde_json::json;
use wiremock::matchers::{header_exists, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
fn batch() -> BatchId {
BatchId::new(&[0xab; 32]).unwrap()
}
fn signer() -> PrivateKey {
PrivateKey::new(&[0x33; 32]).unwrap()
}
#[tokio::test]
async fn feed_reader_download_returns_latest_update() {
let server = MockServer::start().await;
let owner = EthAddress::new(&[0xee; 20]).unwrap();
let topic = Topic::from_string("feed-reader-test");
Mock::given(method("GET"))
.and(path(format!(
"/feeds/{}/{}",
owner.to_hex(),
topic.to_hex()
)))
.respond_with(
ResponseTemplate::new(200)
.insert_header("swarm-feed-index", "0000000000000007")
.insert_header("swarm-feed-index-next", "0000000000000008")
.set_body_bytes(b"payload".to_vec()),
)
.mount(&server)
.await;
let client = Client::new(&server.uri()).unwrap();
let reader = client.file().make_feed_reader(owner, topic);
let upd = reader.download().await.unwrap();
assert_eq!(upd.index, 7);
assert_eq!(upd.index_next, 8);
assert_eq!(upd.payload.as_ref(), b"payload");
assert_eq!(reader.next_index().await.unwrap(), 8);
}
#[tokio::test]
async fn feed_writer_upload_payload_uses_signer_owner() {
let server = MockServer::start().await;
let topic = Topic::from_string("feed-writer-test");
let pk = signer();
let owner = pk.public_key().unwrap().address();
Mock::given(method("GET"))
.and(path(format!(
"/feeds/{}/{}",
owner.to_hex(),
topic.to_hex()
)))
.respond_with(ResponseTemplate::new(404))
.mount(&server)
.await;
let owner_hex = owner.to_hex();
Mock::given(method("POST"))
.and(header_exists("Swarm-Postage-Batch-Id"))
.and(wiremock::matchers::path_regex(format!(
r"^/soc/{}/[0-9a-f]+$",
owner_hex
)))
.respond_with(
ResponseTemplate::new(201).set_body_json(json!({ "reference": "aa".repeat(32) })),
)
.mount(&server)
.await;
let client = Client::new(&server.uri()).unwrap();
let writer = client.file().make_feed_writer(pk, topic).unwrap();
let result = writer
.upload_payload(&batch(), b"hello-feed")
.await
.unwrap();
assert_eq!(result.reference.to_hex(), "aa".repeat(32));
assert_eq!(writer.owner(), &owner);
}
#[tokio::test]
async fn feed_writer_upload_reference_with_explicit_index_skips_lookup() {
let server = MockServer::start().await;
let topic = Topic::from_string("feed-writer-explicit");
let pk = signer();
let owner = pk.public_key().unwrap().address();
let target = Reference::from_hex(&"cd".repeat(32)).unwrap();
Mock::given(method("POST"))
.and(header_exists("Swarm-Postage-Batch-Id"))
.and(wiremock::matchers::path_regex(format!(
r"^/soc/{}/[0-9a-f]+$",
owner.to_hex()
)))
.respond_with(
ResponseTemplate::new(201).set_body_json(json!({ "reference": "ef".repeat(32) })),
)
.mount(&server)
.await;
let client = Client::new(&server.uri()).unwrap();
let writer = client.file().make_feed_writer(pk, topic).unwrap();
let result = writer
.upload_reference(&batch(), &target, Some(42))
.await
.unwrap();
assert_eq!(result.reference.to_hex(), "ef".repeat(32));
}
#[test]
fn cid_round_trip_feed_and_manifest() {
let hex = "ca6357a08e317d15ec560fef34e4c45f8f19f01c75d6f20a7021602e9575a617";
let r = Reference::from_hex(hex).unwrap();
let feed_cid = convert_reference_to_cid(&r, CidType::Feed).unwrap();
let dec = convert_cid_to_reference(&feed_cid).unwrap();
assert_eq!(dec.kind, CidType::Feed);
assert_eq!(dec.reference.to_hex(), hex);
let manifest_cid = convert_reference_to_cid(&r, CidType::Manifest).unwrap();
let dec = convert_cid_to_reference(&manifest_cid).unwrap();
assert_eq!(dec.kind, CidType::Manifest);
assert_eq!(dec.reference.to_hex(), hex);
assert_ne!(feed_cid, manifest_cid);
assert!(feed_cid.starts_with('b'));
assert!(manifest_cid.starts_with('b'));
}
#[test]
fn stamper_persists_and_resumes_state() {
let mut a = Stamper::from_blank(signer(), batch(), 18).unwrap();
a.stamp(&[0u8; 32]).unwrap();
a.stamp(&[1u8; 32]).unwrap();
let snapshot = a.state().to_vec();
let b = Stamper::from_state(signer(), batch(), snapshot, 18).unwrap();
assert_eq!(b.state()[0], 1);
assert_eq!(b.state()[0x0101], 1);
assert_eq!(b.depth(), 18);
assert_eq!(b.max_slot(), 4);
}
#[tokio::test]
async fn upload_collection_invokes_progress_callback() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/bzz"))
.and(header_exists("Swarm-Postage-Batch-Id"))
.respond_with(
ResponseTemplate::new(201).set_body_json(json!({ "reference": "11".repeat(32) })),
)
.mount(&server)
.await;
let dir = tempfile::tempdir().unwrap();
std::fs::write(dir.path().join("a.txt"), b"alpha").unwrap();
std::fs::write(dir.path().join("b.txt"), b"beta").unwrap();
let observed = Arc::new(AtomicUsize::new(0));
let observed_clone = observed.clone();
let names: Arc<std::sync::Mutex<Vec<String>>> = Arc::new(std::sync::Mutex::new(Vec::new()));
let names_clone = names.clone();
let opts = CollectionUploadOptions::default().with_on_entry(move |entry| {
observed_clone.fetch_add(1, Ordering::SeqCst);
names_clone.lock().unwrap().push(entry.path.to_string());
});
let client = Client::new(&server.uri()).unwrap();
let path: PathBuf = dir.path().to_path_buf();
client
.file()
.upload_collection(&batch(), &path, Some(&opts))
.await
.unwrap();
assert_eq!(observed.load(Ordering::SeqCst), 2);
let mut got = names.lock().unwrap().clone();
got.sort();
assert_eq!(got, vec!["a.txt".to_string(), "b.txt".to_string()]);
}
#[tokio::test]
async fn stream_directory_uploads_chunks_then_manifest() {
let server = MockServer::start().await;
let chunks_seen = Arc::new(AtomicUsize::new(0));
let chunks_seen_clone = chunks_seen.clone();
Mock::given(method("POST"))
.and(path("/chunks"))
.and(header_exists("Swarm-Postage-Batch-Id"))
.respond_with(move |_: &wiremock::Request| {
chunks_seen_clone.fetch_add(1, Ordering::SeqCst);
ResponseTemplate::new(201).set_body_json(json!({ "reference": "cc".repeat(32) }))
})
.mount(&server)
.await;
let bytes_seen = Arc::new(AtomicUsize::new(0));
let bytes_seen_clone = bytes_seen.clone();
Mock::given(method("POST"))
.and(path("/bytes"))
.and(header_exists("Swarm-Postage-Batch-Id"))
.respond_with(move |_: &wiremock::Request| {
bytes_seen_clone.fetch_add(1, Ordering::SeqCst);
ResponseTemplate::new(201).set_body_json(json!({ "reference": "ee".repeat(32) }))
})
.mount(&server)
.await;
let dir = tempfile::tempdir().unwrap();
std::fs::write(dir.path().join("a.txt"), b"alpha").unwrap();
std::fs::write(dir.path().join("index.html"), b"<html/>").unwrap();
let last: Arc<std::sync::Mutex<Option<StreamProgress>>> = Arc::new(std::sync::Mutex::new(None));
let last_clone = last.clone();
let on_progress: bee::file::OnStreamProgressFn = Arc::new(move |p: StreamProgress| {
*last_clone.lock().unwrap() = Some(p);
});
let client = Client::new(&server.uri()).unwrap();
let result = client
.file()
.stream_directory(&batch(), dir.path(), None, Some(on_progress))
.await
.unwrap();
assert_eq!(result.reference.to_hex(), "ee".repeat(32));
assert_eq!(chunks_seen.load(Ordering::SeqCst), 2);
assert!(bytes_seen.load(Ordering::SeqCst) >= 1);
let last = last.lock().unwrap().expect("progress callback fired");
assert_eq!(last.total, 2);
assert_eq!(last.processed, 2);
}