Skip to main content

bee/file/
feeds.rs

1//! Feed reader / writer + auto-index update logic. Mirrors bee-go's
2//! `pkg/file/feed.go`.
3//!
4//! A Swarm feed stores a sequence of SOC chunks under a fixed
5//! `(owner, topic)` pair. Each update has a 64-bit big-endian index
6//! and is signed by the owner. The chunk identifier is
7//! `keccak256(topic || BE-uint64(index))`; the chunk payload is
8//! `BE-uint64(timestamp) || data`.
9
10use std::sync::Arc;
11use std::time::{SystemTime, UNIX_EPOCH};
12
13use bytes::Bytes;
14use reqwest::Method;
15use serde::Deserialize;
16
17use crate::api::{DownloadOptions, UploadResult};
18use crate::client::{Inner, request};
19use crate::swarm::{
20    BatchId, Error, EthAddress, Identifier, PrivateKey, Reference, Topic, bmt::keccak256,
21    make_single_owner_chunk,
22};
23
24use super::FileApi;
25
/// Outcome of a feed lookup: the wrapped chunk payload together with
/// the indexes parsed from the `swarm-feed-index` and
/// `swarm-feed-index-next` response headers.
///
/// Mirrors bee-js `FeedPayloadResult`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FeedUpdate {
    /// Raw chunk payload (`timestamp(8) || data`), exactly as returned
    /// in the response body.
    pub payload: Bytes,
    /// Index of the returned update (from `swarm-feed-index`).
    pub index: u64,
    /// Index where the *next* update should be written (from
    /// `swarm-feed-index-next`; for sequential feeds: `index + 1`).
    pub index_next: u64,
}
40
/// JSON response body of the `/feeds/{owner}/{topic}` endpoints used
/// in this module: a single hex-encoded `reference` field.
#[derive(Deserialize)]
struct ReferenceBody {
    /// Hex string, parsed by callers with `Reference::from_hex`.
    reference: String,
}
45
46impl FileApi {
47    /// `POST /feeds/{owner}/{topic}` — create a feed manifest for
48    /// the given pair. Returns the manifest reference.
49    pub async fn create_feed_manifest(
50        &self,
51        batch_id: &BatchId,
52        owner: &EthAddress,
53        topic: &Topic,
54    ) -> Result<Reference, Error> {
55        let path = format!("feeds/{}/{}", owner.to_hex(), topic.to_hex());
56        let builder = request(&self.inner, Method::POST, &path)?
57            .header("Swarm-Postage-Batch-Id", batch_id.to_hex());
58        let body: ReferenceBody = self.inner.send_json(builder).await?;
59        Reference::from_hex(&body.reference)
60    }
61
62    /// `GET /feeds/{owner}/{topic}` — return the latest feed lookup.
63    pub async fn get_feed_lookup(
64        &self,
65        owner: &EthAddress,
66        topic: &Topic,
67    ) -> Result<Reference, Error> {
68        let path = format!("feeds/{}/{}", owner.to_hex(), topic.to_hex());
69        let builder = request(&self.inner, Method::GET, &path)?;
70        let body: ReferenceBody = self.inner.send_json(builder).await?;
71        Reference::from_hex(&body.reference)
72    }
73
74    /// Fetch the most recent feed update.
75    ///
76    /// The body is the wrapped chunk payload; the `swarm-feed-index`
77    /// and `swarm-feed-index-next` headers carry the indexes as
78    /// 8-byte big-endian hex.
79    pub async fn fetch_latest_feed_update(
80        &self,
81        owner: &EthAddress,
82        topic: &Topic,
83    ) -> Result<FeedUpdate, Error> {
84        let path = format!("feeds/{}/{}", owner.to_hex(), topic.to_hex());
85        let builder = request(&self.inner, Method::GET, &path)?;
86        let resp = self.inner.send(builder).await?;
87        let index = decode_feed_index_header(&resp, "swarm-feed-index")?;
88        let index_next = decode_feed_index_header(&resp, "swarm-feed-index-next")?;
89        let payload = resp.bytes().await?;
90        Ok(FeedUpdate {
91            payload,
92            index,
93            index_next,
94        })
95    }
96
97    /// Return the index where the next feed update should be written.
98    /// Bee returns 404 / 500 when the feed is empty; this helper
99    /// translates those to `0`.
100    pub async fn find_next_index(&self, owner: &EthAddress, topic: &Topic) -> Result<u64, Error> {
101        match self.fetch_latest_feed_update(owner, topic).await {
102            Ok(upd) => Ok(upd.index_next),
103            Err(e) => match e.status() {
104                Some(404) | Some(500) => Ok(0),
105                _ => Err(e),
106            },
107        }
108    }
109
110    /// Update the feed at the next available index. The chunk payload
111    /// is `BE-uint64(timestamp) || data`. Mirrors bee-js
112    /// `updateFeedWithPayload`.
113    pub async fn update_feed(
114        &self,
115        batch_id: &BatchId,
116        signer: &PrivateKey,
117        topic: &Topic,
118        data: &[u8],
119    ) -> Result<UploadResult, Error> {
120        let owner = signer.public_key()?.address();
121        let index = self.find_next_index(&owner, topic).await?;
122        self.update_feed_with_index(batch_id, signer, topic, index, data)
123            .await
124    }
125
126    /// Update the feed to point at `reference`. The chunk payload is
127    /// `BE-uint64(timestamp) || reference(32 or 64)`. If `index` is
128    /// `None`, [`FileApi::find_next_index`] is called.
129    pub async fn update_feed_with_reference(
130        &self,
131        batch_id: &BatchId,
132        signer: &PrivateKey,
133        topic: &Topic,
134        reference: &Reference,
135        index: Option<u64>,
136    ) -> Result<UploadResult, Error> {
137        let owner = signer.public_key()?.address();
138        let idx = match index {
139            Some(i) => i,
140            None => self.find_next_index(&owner, topic).await?,
141        };
142        self.update_feed_with_index(batch_id, signer, topic, idx, reference.as_bytes())
143            .await
144    }
145
146    /// Update the feed at a specific index.
147    ///
148    /// The chunk identifier is `keccak256(topic || BE-uint64(index))`;
149    /// the payload is `BE-uint64(now_unix_seconds) || data`. The
150    /// chunk is signed via SOC and uploaded to `/soc/{owner}/{id}`.
151    pub async fn update_feed_with_index(
152        &self,
153        batch_id: &BatchId,
154        signer: &PrivateKey,
155        topic: &Topic,
156        index: u64,
157        data: &[u8],
158    ) -> Result<UploadResult, Error> {
159        let identifier = make_feed_identifier(topic, index);
160
161        let mut payload = Vec::with_capacity(8 + data.len());
162        let timestamp = SystemTime::now()
163            .duration_since(UNIX_EPOCH)
164            .map(|d| d.as_secs())
165            .unwrap_or(0);
166        payload.extend_from_slice(&timestamp.to_be_bytes());
167        payload.extend_from_slice(data);
168
169        let soc = make_single_owner_chunk(&identifier, &payload, signer)?;
170
171        // SOC body: span || payload (the live-Bee SOC bug bee-go hit).
172        let mut full = Vec::with_capacity(soc.span.as_bytes().len() + soc.payload.len());
173        full.extend_from_slice(soc.span.as_bytes());
174        full.extend_from_slice(&soc.payload);
175
176        let owner = signer.public_key()?.address();
177        self.upload_soc(batch_id, &owner, &identifier, &soc.signature, full, None)
178            .await
179    }
180
181    /// True iff the feed currently resolves on the network. If
182    /// `index` is `None`, only the latest update is checked. If
183    /// `index` is `Some(i)`, every chunk from 0 through `i` is
184    /// checked via [`FileApi::are_all_sequential_feeds_update_retrievable`].
185    pub async fn is_feed_retrievable(
186        &self,
187        owner: &EthAddress,
188        topic: &Topic,
189        index: Option<u64>,
190        opts: Option<&DownloadOptions>,
191    ) -> Result<bool, Error> {
192        if let Some(i) = index {
193            return self
194                .are_all_sequential_feeds_update_retrievable(owner, topic, i, opts)
195                .await;
196        }
197        match self.fetch_latest_feed_update(owner, topic).await {
198            Ok(_) => Ok(true),
199            Err(e) => match e.status() {
200                Some(404) | Some(500) => Ok(false),
201                _ => Err(e),
202            },
203        }
204    }
205
206    /// True iff every feed-update chunk from `0` through `index`
207    /// (inclusive) is currently retrievable. Used to validate that a
208    /// feed can be replayed from its origin.
209    pub async fn are_all_sequential_feeds_update_retrievable(
210        &self,
211        owner: &EthAddress,
212        topic: &Topic,
213        index: u64,
214        opts: Option<&DownloadOptions>,
215    ) -> Result<bool, Error> {
216        for i in 0..=index {
217            let r = feed_update_chunk_reference(owner, topic, i)?;
218            match self.download_chunk(&r, opts).await {
219                Ok(_) => continue,
220                Err(e) => match e.status() {
221                    Some(404) | Some(500) => return Ok(false),
222                    _ => return Err(e),
223                },
224            }
225        }
226        Ok(true)
227    }
228
229    /// Construct a [`FeedReader`] bound to `(owner, topic)`. Mirrors
230    /// bee-js `Bee.makeFeedReader`.
231    pub fn make_feed_reader(&self, owner: EthAddress, topic: Topic) -> FeedReader {
232        FeedReader {
233            inner: self.inner.clone(),
234            owner,
235            topic,
236        }
237    }
238
239    /// Construct a [`FeedWriter`] bound to `(signer, topic)`. Owner is
240    /// derived from `signer.public_key().address()`. Mirrors bee-js
241    /// `Bee.makeFeedWriter`.
242    pub fn make_feed_writer(&self, signer: PrivateKey, topic: Topic) -> Result<FeedWriter, Error> {
243        let owner = signer.public_key()?.address();
244        Ok(FeedWriter {
245            reader: FeedReader {
246                inner: self.inner.clone(),
247                owner,
248                topic,
249            },
250            signer,
251        })
252    }
253}
254
/// Reader bound to a `(owner, topic)` pair. Wraps the lower-level
/// `FileApi` feed methods for callers that prefer the bee-js
/// `FeedReader` shape.
#[derive(Clone, Debug)]
pub struct FeedReader {
    /// Shared client state, cloned from the `FileApi` that built this
    /// reader; used to rebuild a `FileApi` handle on demand.
    inner: Arc<Inner>,
    /// Address of the feed owner.
    owner: EthAddress,
    /// Topic of the feed.
    topic: Topic,
}
264
265impl FeedReader {
266    /// Owner address.
267    pub fn owner(&self) -> &EthAddress {
268        &self.owner
269    }
270
271    /// Feed topic.
272    pub fn topic(&self) -> &Topic {
273        &self.topic
274    }
275
276    fn api(&self) -> FileApi {
277        FileApi {
278            inner: self.inner.clone(),
279        }
280    }
281
282    /// Fetch the most recent feed update.
283    pub async fn download(&self) -> Result<FeedUpdate, Error> {
284        self.api()
285            .fetch_latest_feed_update(&self.owner, &self.topic)
286            .await
287    }
288
289    /// Resolve the feed manifest reference (`GET /feeds/{owner}/{topic}`).
290    pub async fn lookup(&self) -> Result<Reference, Error> {
291        self.api().get_feed_lookup(&self.owner, &self.topic).await
292    }
293
294    /// Index where the next feed update will be written.
295    pub async fn next_index(&self) -> Result<u64, Error> {
296        self.api().find_next_index(&self.owner, &self.topic).await
297    }
298
299    /// True iff the feed currently resolves on the network.
300    pub async fn is_retrievable(
301        &self,
302        index: Option<u64>,
303        opts: Option<&DownloadOptions>,
304    ) -> Result<bool, Error> {
305        self.api()
306            .is_feed_retrievable(&self.owner, &self.topic, index, opts)
307            .await
308    }
309}
310
/// Writer bound to a `(signer, topic)` pair. Owner is derived from
/// the signer.
///
/// NOTE(review): `Debug` is derived, so formatting a `FeedWriter`
/// formats `signer` through `PrivateKey`'s own `Debug` impl — confirm
/// that impl does not print key material.
#[derive(Clone, Debug)]
pub struct FeedWriter {
    /// Read side; holds the client state, owner address and topic.
    reader: FeedReader,
    /// Key used to sign every update written through this writer.
    signer: PrivateKey,
}
318
319impl FeedWriter {
320    /// Owner address derived from the signer.
321    pub fn owner(&self) -> &EthAddress {
322        self.reader.owner()
323    }
324
325    /// Feed topic.
326    pub fn topic(&self) -> &Topic {
327        self.reader.topic()
328    }
329
330    /// Read side of this writer.
331    pub fn reader(&self) -> &FeedReader {
332        &self.reader
333    }
334
335    fn api(&self) -> FileApi {
336        FileApi {
337            inner: self.reader.inner.clone(),
338        }
339    }
340
341    /// Update the feed at the next available index. Mirrors bee-js
342    /// `FeedWriter.uploadPayload`.
343    pub async fn upload_payload(
344        &self,
345        batch_id: &BatchId,
346        data: &[u8],
347    ) -> Result<UploadResult, Error> {
348        self.api()
349            .update_feed(batch_id, &self.signer, self.reader.topic(), data)
350            .await
351    }
352
353    /// Update the feed to point at `reference`. Mirrors bee-js
354    /// `FeedWriter.uploadReference`.
355    pub async fn upload_reference(
356        &self,
357        batch_id: &BatchId,
358        reference: &Reference,
359        index: Option<u64>,
360    ) -> Result<UploadResult, Error> {
361        self.api()
362            .update_feed_with_reference(
363                batch_id,
364                &self.signer,
365                self.reader.topic(),
366                reference,
367                index,
368            )
369            .await
370    }
371
372    /// Update the feed at a specific index.
373    pub async fn upload_payload_at(
374        &self,
375        batch_id: &BatchId,
376        index: u64,
377        data: &[u8],
378    ) -> Result<UploadResult, Error> {
379        self.api()
380            .update_feed_with_index(batch_id, &self.signer, self.reader.topic(), index, data)
381            .await
382    }
383}
384
385/// Compute the feed identifier for a `(topic, index)` pair —
386/// `keccak256(topic || BE-uint64(index))`.
387pub fn make_feed_identifier(topic: &Topic, index: u64) -> Identifier {
388    let mut input = Vec::with_capacity(topic.as_bytes().len() + 8);
389    input.extend_from_slice(topic.as_bytes());
390    input.extend_from_slice(&index.to_be_bytes());
391    Identifier::new(&keccak256(&input)).expect("keccak256 returns 32 bytes")
392}
393
394/// Compute the SOC chunk address for a feed update at
395/// `(owner, topic, index)` — `keccak256(identifier || owner)`. Use
396/// this with [`FileApi::download_chunk`] to verify retrievability of
397/// past updates.
398pub fn feed_update_chunk_reference(
399    owner: &EthAddress,
400    topic: &Topic,
401    index: u64,
402) -> Result<Reference, Error> {
403    let identifier = make_feed_identifier(topic, index);
404    let mut input = Vec::with_capacity(identifier.as_bytes().len() + owner.as_bytes().len());
405    input.extend_from_slice(identifier.as_bytes());
406    input.extend_from_slice(owner.as_bytes());
407    Reference::new(&keccak256(&input))
408}
409
410fn decode_feed_index_header(resp: &reqwest::Response, name: &str) -> Result<u64, Error> {
411    let s = resp
412        .headers()
413        .get(name)
414        .and_then(|v| v.to_str().ok())
415        .ok_or_else(|| Error::argument(format!("missing {name} header")))?;
416    let bytes = hex::decode(s)?;
417    if bytes.len() != 8 {
418        return Err(Error::argument(format!(
419            "{name}: expected 8 bytes, got {}",
420            bytes.len()
421        )));
422    }
423    let mut arr = [0u8; 8];
424    arr.copy_from_slice(&bytes);
425    Ok(u64::from_be_bytes(arr))
426}