use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use bytes::Bytes;
use reqwest::Method;
use serde::Deserialize;
use crate::api::{DownloadOptions, UploadResult};
use crate::client::{Inner, request};
use crate::swarm::{
BatchId, Error, EthAddress, Identifier, PrivateKey, Reference, Topic, bmt::keccak256,
make_single_owner_chunk,
};
use super::FileApi;
/// A feed update fetched from the node, paired with the sequence indices
/// reported by the response headers (see `fetch_latest_feed_update`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FeedUpdate {
// Raw response body bytes for this update.
pub payload: Bytes,
// Index decoded from the `swarm-feed-index` header.
pub index: u64,
// Next free index, decoded from the `swarm-feed-index-next` header.
pub index_next: u64,
}
/// Wire shape for node responses carrying a single hex-encoded `reference`.
#[derive(Deserialize)]
struct ReferenceBody {
reference: String,
}
impl FileApi {
    /// Creates a feed manifest for `owner`/`topic`, paid for by `batch_id`,
    /// and returns the reference of the manifest.
    pub async fn create_feed_manifest(
        &self,
        batch_id: &BatchId,
        owner: &EthAddress,
        topic: &Topic,
    ) -> Result<Reference, Error> {
        let path = format!("feeds/{}/{}", owner.to_hex(), topic.to_hex());
        let builder = request(&self.inner, Method::POST, &path)?
            .header("Swarm-Postage-Batch-Id", batch_id.to_hex());
        let body: ReferenceBody = self.inner.send_json(builder).await?;
        Reference::from_hex(&body.reference)
    }

    /// Resolves the feed to the reference of its current update.
    pub async fn get_feed_lookup(
        &self,
        owner: &EthAddress,
        topic: &Topic,
    ) -> Result<Reference, Error> {
        let path = format!("feeds/{}/{}", owner.to_hex(), topic.to_hex());
        let builder = request(&self.inner, Method::GET, &path)?;
        let body: ReferenceBody = self.inner.send_json(builder).await?;
        Reference::from_hex(&body.reference)
    }

    /// Fetches the latest feed update together with its current and next
    /// sequence indices.
    pub async fn fetch_latest_feed_update(
        &self,
        owner: &EthAddress,
        topic: &Topic,
    ) -> Result<FeedUpdate, Error> {
        let path = format!("feeds/{}/{}", owner.to_hex(), topic.to_hex());
        let builder = request(&self.inner, Method::GET, &path)?;
        let resp = self.inner.send(builder).await?;
        // Decode the index headers before consuming the body.
        let index = decode_feed_index_header(&resp, "swarm-feed-index")?;
        let index_next = decode_feed_index_header(&resp, "swarm-feed-index-next")?;
        let payload = resp.bytes().await?;
        Ok(FeedUpdate {
            payload,
            index,
            index_next,
        })
    }

    /// Returns the next free sequence index for the feed.
    ///
    /// A 404 or 500 response is treated as "no update exists yet", yielding 0.
    /// NOTE(review): 500 is presumably the node's historical answer to a
    /// failed feed lookup — confirm against the target Bee version.
    pub async fn find_next_index(&self, owner: &EthAddress, topic: &Topic) -> Result<u64, Error> {
        match self.fetch_latest_feed_update(owner, topic).await {
            Ok(upd) => Ok(upd.index_next),
            Err(e) => match e.status() {
                Some(404) | Some(500) => Ok(0),
                _ => Err(e),
            },
        }
    }

    /// Uploads `data` as the next sequential update of the signer's feed.
    pub async fn update_feed(
        &self,
        batch_id: &BatchId,
        signer: &PrivateKey,
        topic: &Topic,
        data: &[u8],
    ) -> Result<UploadResult, Error> {
        let owner = signer.public_key()?.address();
        let index = self.find_next_index(&owner, topic).await?;
        self.update_feed_with_index(batch_id, signer, topic, index, data)
            .await
    }

    /// Uploads `reference` as a feed update, at the given `index` or at the
    /// next free index when `index` is `None`.
    pub async fn update_feed_with_reference(
        &self,
        batch_id: &BatchId,
        signer: &PrivateKey,
        topic: &Topic,
        reference: &Reference,
        index: Option<u64>,
    ) -> Result<UploadResult, Error> {
        let owner = signer.public_key()?.address();
        let idx = match index {
            Some(i) => i,
            None => self.find_next_index(&owner, topic).await?,
        };
        self.update_feed_with_index(batch_id, signer, topic, idx, reference.as_bytes())
            .await
    }

    /// Uploads `data` as the feed update at an explicit `index`.
    ///
    /// The update is wrapped in a single-owner chunk whose payload is an
    /// 8-byte big-endian seconds-since-epoch timestamp followed by `data`.
    pub async fn update_feed_with_index(
        &self,
        batch_id: &BatchId,
        signer: &PrivateKey,
        topic: &Topic,
        index: u64,
        data: &[u8],
    ) -> Result<UploadResult, Error> {
        let identifier = make_feed_identifier(topic, index);
        let mut payload = Vec::with_capacity(8 + data.len());
        // A pre-epoch clock is a broken environment; fall back to 0 rather
        // than failing the upload.
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        // Fix: this line previously contained a mojibake token (`×tamp`,
        // an `&times;` entity corruption of `&timestamp`) and did not compile.
        payload.extend_from_slice(&timestamp.to_be_bytes());
        payload.extend_from_slice(data);
        let soc = make_single_owner_chunk(&identifier, &payload, signer)?;
        // The uploaded body is span || payload.
        let mut full = Vec::with_capacity(soc.span.as_bytes().len() + soc.payload.len());
        full.extend_from_slice(soc.span.as_bytes());
        full.extend_from_slice(&soc.payload);
        let owner = signer.public_key()?.address();
        self.upload_soc(batch_id, &owner, &identifier, &soc.signature, full, None)
            .await
    }

    /// Checks whether the feed is retrievable.
    ///
    /// With `index = Some(i)` every sequential update `0..=i` is verified;
    /// otherwise only the latest update is probed, with 404/500 mapped to
    /// `Ok(false)`.
    pub async fn is_feed_retrievable(
        &self,
        owner: &EthAddress,
        topic: &Topic,
        index: Option<u64>,
        opts: Option<&DownloadOptions>,
    ) -> Result<bool, Error> {
        if let Some(i) = index {
            return self
                .are_all_sequential_feeds_update_retrievable(owner, topic, i, opts)
                .await;
        }
        match self.fetch_latest_feed_update(owner, topic).await {
            Ok(_) => Ok(true),
            Err(e) => match e.status() {
                Some(404) | Some(500) => Ok(false),
                _ => Err(e),
            },
        }
    }

    /// Downloads every sequential update chunk `0..=index`; returns `false`
    /// as soon as one of them is missing (404/500), propagating other errors.
    pub async fn are_all_sequential_feeds_update_retrievable(
        &self,
        owner: &EthAddress,
        topic: &Topic,
        index: u64,
        opts: Option<&DownloadOptions>,
    ) -> Result<bool, Error> {
        for i in 0..=index {
            let r = feed_update_chunk_reference(owner, topic, i)?;
            match self.download_chunk(&r, opts).await {
                Ok(_) => continue,
                Err(e) => match e.status() {
                    Some(404) | Some(500) => return Ok(false),
                    _ => return Err(e),
                },
            }
        }
        Ok(true)
    }

    /// Creates a read-only handle bound to `owner`/`topic`, sharing this
    /// client's connection state.
    pub fn make_feed_reader(&self, owner: EthAddress, topic: Topic) -> FeedReader {
        FeedReader {
            inner: self.inner.clone(),
            owner,
            topic,
        }
    }

    /// Creates a writing handle for `topic` signed by `signer`; the owner
    /// address is derived from the signer's public key.
    pub fn make_feed_writer(&self, signer: PrivateKey, topic: Topic) -> Result<FeedWriter, Error> {
        let owner = signer.public_key()?.address();
        Ok(FeedWriter {
            reader: FeedReader {
                inner: self.inner.clone(),
                owner,
                topic,
            },
            signer,
        })
    }
}
/// Read-only handle bound to a single feed (owner address + topic).
#[derive(Clone, Debug)]
pub struct FeedReader {
// Shared client state used to issue requests.
inner: Arc<Inner>,
// Ethereum address of the feed owner.
owner: EthAddress,
// Topic identifying the feed.
topic: Topic,
}
impl FeedReader {
pub fn owner(&self) -> &EthAddress {
&self.owner
}
pub fn topic(&self) -> &Topic {
&self.topic
}
fn api(&self) -> FileApi {
FileApi {
inner: self.inner.clone(),
}
}
pub async fn download(&self) -> Result<FeedUpdate, Error> {
self.api()
.fetch_latest_feed_update(&self.owner, &self.topic)
.await
}
pub async fn lookup(&self) -> Result<Reference, Error> {
self.api().get_feed_lookup(&self.owner, &self.topic).await
}
pub async fn next_index(&self) -> Result<u64, Error> {
self.api().find_next_index(&self.owner, &self.topic).await
}
pub async fn is_retrievable(
&self,
index: Option<u64>,
opts: Option<&DownloadOptions>,
) -> Result<bool, Error> {
self.api()
.is_feed_retrievable(&self.owner, &self.topic, index, opts)
.await
}
}
/// Writing handle for a feed: a read-only handle plus the signing key whose
/// address owns the feed.
#[derive(Clone, Debug)]
pub struct FeedWriter {
// Underlying read-only handle (owner derived from `signer`).
reader: FeedReader,
// Key used to sign single-owner chunks for updates.
signer: PrivateKey,
}
impl FeedWriter {
pub fn owner(&self) -> &EthAddress {
self.reader.owner()
}
pub fn topic(&self) -> &Topic {
self.reader.topic()
}
pub fn reader(&self) -> &FeedReader {
&self.reader
}
fn api(&self) -> FileApi {
FileApi {
inner: self.reader.inner.clone(),
}
}
pub async fn upload_payload(
&self,
batch_id: &BatchId,
data: &[u8],
) -> Result<UploadResult, Error> {
self.api()
.update_feed(batch_id, &self.signer, self.reader.topic(), data)
.await
}
pub async fn upload_reference(
&self,
batch_id: &BatchId,
reference: &Reference,
index: Option<u64>,
) -> Result<UploadResult, Error> {
self.api()
.update_feed_with_reference(
batch_id,
&self.signer,
self.reader.topic(),
reference,
index,
)
.await
}
pub async fn upload_payload_at(
&self,
batch_id: &BatchId,
index: u64,
data: &[u8],
) -> Result<UploadResult, Error> {
self.api()
.update_feed_with_index(batch_id, &self.signer, self.reader.topic(), index, data)
.await
}
}
/// Derives the single-owner-chunk identifier for a feed update:
/// `keccak256(topic || index_be)`.
pub fn make_feed_identifier(topic: &Topic, index: u64) -> Identifier {
    let topic_bytes = topic.as_bytes();
    let mut preimage = Vec::with_capacity(topic_bytes.len() + 8);
    preimage.extend_from_slice(topic_bytes);
    preimage.extend_from_slice(&index.to_be_bytes());
    let digest = keccak256(&preimage);
    // A keccak256 digest is always 32 bytes — exactly an Identifier.
    Identifier::new(&digest).expect("keccak256 returns 32 bytes")
}
/// Computes the content address of the feed-update chunk at `index`:
/// `keccak256(identifier || owner)`.
pub fn feed_update_chunk_reference(
    owner: &EthAddress,
    topic: &Topic,
    index: u64,
) -> Result<Reference, Error> {
    let identifier = make_feed_identifier(topic, index);
    let id_bytes = identifier.as_bytes();
    let owner_bytes = owner.as_bytes();
    let mut preimage = Vec::with_capacity(id_bytes.len() + owner_bytes.len());
    preimage.extend_from_slice(id_bytes);
    preimage.extend_from_slice(owner_bytes);
    Reference::new(&keccak256(&preimage))
}
fn decode_feed_index_header(resp: &reqwest::Response, name: &str) -> Result<u64, Error> {
let s = resp
.headers()
.get(name)
.and_then(|v| v.to_str().ok())
.ok_or_else(|| Error::argument(format!("missing {name} header")))?;
let bytes = hex::decode(s)?;
if bytes.len() != 8 {
return Err(Error::argument(format!(
"{name}: expected 8 bytes, got {}",
bytes.len()
)));
}
let mut arr = [0u8; 8];
arr.copy_from_slice(&bytes);
Ok(u64::from_be_bytes(arr))
}