1use std::sync::Arc;
11use std::time::{SystemTime, UNIX_EPOCH};
12
13use bytes::Bytes;
14use reqwest::Method;
15use serde::Deserialize;
16
17use crate::api::{DownloadOptions, UploadResult};
18use crate::client::{Inner, request};
19use crate::swarm::{
20 BatchId, Error, EthAddress, Identifier, PrivateKey, Reference, Topic, bmt::keccak256,
21 make_single_owner_chunk,
22};
23
24use super::FileApi;
25
/// A single feed update as returned by the node: the raw payload plus the
/// index bookkeeping decoded from the response headers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FeedUpdate {
    // Raw response body. NOTE(review): for updates written by
    // `update_feed_with_index` this appears to start with an 8-byte
    // big-endian unix timestamp prefix — confirm against node behavior.
    pub payload: Bytes,
    // Index at which this update was found (from `swarm-feed-index`).
    pub index: u64,
    // Next writable index (from `swarm-feed-index-next`).
    pub index_next: u64,
}
40
/// JSON response body carrying a single hex-encoded `reference` field,
/// as returned by the feed manifest/lookup endpoints.
#[derive(Deserialize)]
struct ReferenceBody {
    reference: String,
}
45
46impl FileApi {
47 pub async fn create_feed_manifest(
50 &self,
51 batch_id: &BatchId,
52 owner: &EthAddress,
53 topic: &Topic,
54 ) -> Result<Reference, Error> {
55 let path = format!("feeds/{}/{}", owner.to_hex(), topic.to_hex());
56 let builder = request(&self.inner, Method::POST, &path)?
57 .header("Swarm-Postage-Batch-Id", batch_id.to_hex());
58 let body: ReferenceBody = self.inner.send_json(builder).await?;
59 Reference::from_hex(&body.reference)
60 }
61
62 pub async fn get_feed_lookup(
64 &self,
65 owner: &EthAddress,
66 topic: &Topic,
67 ) -> Result<Reference, Error> {
68 let path = format!("feeds/{}/{}", owner.to_hex(), topic.to_hex());
69 let builder = request(&self.inner, Method::GET, &path)?;
70 let body: ReferenceBody = self.inner.send_json(builder).await?;
71 Reference::from_hex(&body.reference)
72 }
73
74 pub async fn fetch_latest_feed_update(
80 &self,
81 owner: &EthAddress,
82 topic: &Topic,
83 ) -> Result<FeedUpdate, Error> {
84 let path = format!("feeds/{}/{}", owner.to_hex(), topic.to_hex());
85 let builder = request(&self.inner, Method::GET, &path)?;
86 let resp = self.inner.send(builder).await?;
87 let index = decode_feed_index_header(&resp, "swarm-feed-index")?;
88 let index_next = decode_feed_index_header(&resp, "swarm-feed-index-next")?;
89 let payload = resp.bytes().await?;
90 Ok(FeedUpdate {
91 payload,
92 index,
93 index_next,
94 })
95 }
96
97 pub async fn find_next_index(&self, owner: &EthAddress, topic: &Topic) -> Result<u64, Error> {
101 match self.fetch_latest_feed_update(owner, topic).await {
102 Ok(upd) => Ok(upd.index_next),
103 Err(e) => match e.status() {
104 Some(404) | Some(500) => Ok(0),
105 _ => Err(e),
106 },
107 }
108 }
109
110 pub async fn update_feed(
114 &self,
115 batch_id: &BatchId,
116 signer: &PrivateKey,
117 topic: &Topic,
118 data: &[u8],
119 ) -> Result<UploadResult, Error> {
120 let owner = signer.public_key()?.address();
121 let index = self.find_next_index(&owner, topic).await?;
122 self.update_feed_with_index(batch_id, signer, topic, index, data)
123 .await
124 }
125
126 pub async fn update_feed_with_reference(
130 &self,
131 batch_id: &BatchId,
132 signer: &PrivateKey,
133 topic: &Topic,
134 reference: &Reference,
135 index: Option<u64>,
136 ) -> Result<UploadResult, Error> {
137 let owner = signer.public_key()?.address();
138 let idx = match index {
139 Some(i) => i,
140 None => self.find_next_index(&owner, topic).await?,
141 };
142 self.update_feed_with_index(batch_id, signer, topic, idx, reference.as_bytes())
143 .await
144 }
145
146 pub async fn update_feed_with_index(
152 &self,
153 batch_id: &BatchId,
154 signer: &PrivateKey,
155 topic: &Topic,
156 index: u64,
157 data: &[u8],
158 ) -> Result<UploadResult, Error> {
159 let identifier = make_feed_identifier(topic, index);
160
161 let mut payload = Vec::with_capacity(8 + data.len());
162 let timestamp = SystemTime::now()
163 .duration_since(UNIX_EPOCH)
164 .map(|d| d.as_secs())
165 .unwrap_or(0);
166 payload.extend_from_slice(×tamp.to_be_bytes());
167 payload.extend_from_slice(data);
168
169 let soc = make_single_owner_chunk(&identifier, &payload, signer)?;
170
171 let mut full = Vec::with_capacity(soc.span.as_bytes().len() + soc.payload.len());
173 full.extend_from_slice(soc.span.as_bytes());
174 full.extend_from_slice(&soc.payload);
175
176 let owner = signer.public_key()?.address();
177 self.upload_soc(batch_id, &owner, &identifier, &soc.signature, full, None)
178 .await
179 }
180
181 pub async fn is_feed_retrievable(
186 &self,
187 owner: &EthAddress,
188 topic: &Topic,
189 index: Option<u64>,
190 opts: Option<&DownloadOptions>,
191 ) -> Result<bool, Error> {
192 if let Some(i) = index {
193 return self
194 .are_all_sequential_feeds_update_retrievable(owner, topic, i, opts)
195 .await;
196 }
197 match self.fetch_latest_feed_update(owner, topic).await {
198 Ok(_) => Ok(true),
199 Err(e) => match e.status() {
200 Some(404) | Some(500) => Ok(false),
201 _ => Err(e),
202 },
203 }
204 }
205
206 pub async fn are_all_sequential_feeds_update_retrievable(
210 &self,
211 owner: &EthAddress,
212 topic: &Topic,
213 index: u64,
214 opts: Option<&DownloadOptions>,
215 ) -> Result<bool, Error> {
216 for i in 0..=index {
217 let r = feed_update_chunk_reference(owner, topic, i)?;
218 match self.download_chunk(&r, opts).await {
219 Ok(_) => continue,
220 Err(e) => match e.status() {
221 Some(404) | Some(500) => return Ok(false),
222 _ => return Err(e),
223 },
224 }
225 }
226 Ok(true)
227 }
228
229 pub fn make_feed_reader(&self, owner: EthAddress, topic: Topic) -> FeedReader {
232 FeedReader {
233 inner: self.inner.clone(),
234 owner,
235 topic,
236 }
237 }
238
239 pub fn make_feed_writer(&self, signer: PrivateKey, topic: Topic) -> Result<FeedWriter, Error> {
243 let owner = signer.public_key()?.address();
244 Ok(FeedWriter {
245 reader: FeedReader {
246 inner: self.inner.clone(),
247 owner,
248 topic,
249 },
250 signer,
251 })
252 }
253}
254
/// Read-only handle to a feed, bound to an owner address and a topic and
/// sharing the client's inner state.
#[derive(Clone, Debug)]
pub struct FeedReader {
    // Shared client internals used to issue requests.
    inner: Arc<Inner>,
    // Address of the feed owner.
    owner: EthAddress,
    // Topic the feed is keyed on.
    topic: Topic,
}
264
265impl FeedReader {
266 pub fn owner(&self) -> &EthAddress {
268 &self.owner
269 }
270
271 pub fn topic(&self) -> &Topic {
273 &self.topic
274 }
275
276 fn api(&self) -> FileApi {
277 FileApi {
278 inner: self.inner.clone(),
279 }
280 }
281
282 pub async fn download(&self) -> Result<FeedUpdate, Error> {
284 self.api()
285 .fetch_latest_feed_update(&self.owner, &self.topic)
286 .await
287 }
288
289 pub async fn lookup(&self) -> Result<Reference, Error> {
291 self.api().get_feed_lookup(&self.owner, &self.topic).await
292 }
293
294 pub async fn next_index(&self) -> Result<u64, Error> {
296 self.api().find_next_index(&self.owner, &self.topic).await
297 }
298
299 pub async fn is_retrievable(
301 &self,
302 index: Option<u64>,
303 opts: Option<&DownloadOptions>,
304 ) -> Result<bool, Error> {
305 self.api()
306 .is_feed_retrievable(&self.owner, &self.topic, index, opts)
307 .await
308 }
309}
310
/// Read/write handle to a feed: a [`FeedReader`] paired with the private
/// key used to sign updates.
#[derive(Clone, Debug)]
pub struct FeedWriter {
    // Read-only half; also carries the shared client state.
    reader: FeedReader,
    // Key that signs uploaded updates; the feed owner address is derived
    // from it.
    signer: PrivateKey,
}
318
319impl FeedWriter {
320 pub fn owner(&self) -> &EthAddress {
322 self.reader.owner()
323 }
324
325 pub fn topic(&self) -> &Topic {
327 self.reader.topic()
328 }
329
330 pub fn reader(&self) -> &FeedReader {
332 &self.reader
333 }
334
335 fn api(&self) -> FileApi {
336 FileApi {
337 inner: self.reader.inner.clone(),
338 }
339 }
340
341 pub async fn upload_payload(
344 &self,
345 batch_id: &BatchId,
346 data: &[u8],
347 ) -> Result<UploadResult, Error> {
348 self.api()
349 .update_feed(batch_id, &self.signer, self.reader.topic(), data)
350 .await
351 }
352
353 pub async fn upload_reference(
356 &self,
357 batch_id: &BatchId,
358 reference: &Reference,
359 index: Option<u64>,
360 ) -> Result<UploadResult, Error> {
361 self.api()
362 .update_feed_with_reference(
363 batch_id,
364 &self.signer,
365 self.reader.topic(),
366 reference,
367 index,
368 )
369 .await
370 }
371
372 pub async fn upload_payload_at(
374 &self,
375 batch_id: &BatchId,
376 index: u64,
377 data: &[u8],
378 ) -> Result<UploadResult, Error> {
379 self.api()
380 .update_feed_with_index(batch_id, &self.signer, self.reader.topic(), index, data)
381 .await
382 }
383}
384
385pub fn make_feed_identifier(topic: &Topic, index: u64) -> Identifier {
388 let mut input = Vec::with_capacity(topic.as_bytes().len() + 8);
389 input.extend_from_slice(topic.as_bytes());
390 input.extend_from_slice(&index.to_be_bytes());
391 Identifier::new(&keccak256(&input)).expect("keccak256 returns 32 bytes")
392}
393
394pub fn feed_update_chunk_reference(
399 owner: &EthAddress,
400 topic: &Topic,
401 index: u64,
402) -> Result<Reference, Error> {
403 let identifier = make_feed_identifier(topic, index);
404 let mut input = Vec::with_capacity(identifier.as_bytes().len() + owner.as_bytes().len());
405 input.extend_from_slice(identifier.as_bytes());
406 input.extend_from_slice(owner.as_bytes());
407 Reference::new(&keccak256(&input))
408}
409
410fn decode_feed_index_header(resp: &reqwest::Response, name: &str) -> Result<u64, Error> {
411 let s = resp
412 .headers()
413 .get(name)
414 .and_then(|v| v.to_str().ok())
415 .ok_or_else(|| Error::argument(format!("missing {name} header")))?;
416 let bytes = hex::decode(s)?;
417 if bytes.len() != 8 {
418 return Err(Error::argument(format!(
419 "{name}: expected 8 bytes, got {}",
420 bytes.len()
421 )));
422 }
423 let mut arr = [0u8; 8];
424 arr.copy_from_slice(&bytes);
425 Ok(u64::from_be_bytes(arr))
426}