// bee_tui/feed_timeline.rs
1//! Feed history walker — the data model behind the S14 Feed
2//! Timeline screen and the `:feed-timeline` verb. Builds on v1.5's
3//! [`crate::feed_probe`]: probe gives us the latest index, then the
4//! walker fetches each prior index's SOC chunk in parallel (bounded
5//! concurrency) and surfaces a [`Vec<TimelineEntry>`].
6//!
7//! ## Why a separate module
8//!
9//! The fetch path can't trivially reuse [`bee::file::FileApi::fetch_latest_feed_update`]:
10//! that endpoint only returns the latest. For older indexes we
11//! compute the SOC chunk address ourselves via
12//! [`bee::file::feed_update_chunk_reference`] and download the chunk
13//! directly, then unmarshal it as a SOC to recover the
14//! `timestamp || payload` body. Keeping that math in one place
15//! lets both the cockpit screen and the `--once` verb share it.
16
17use std::sync::Arc;
18use std::time::SystemTime;
19
20use bee::file::feed_update_chunk_reference;
21use bee::swarm::{EthAddress, Reference, Topic, soc::unmarshal_single_owner_chunk};
22
23use crate::api::ApiClient;
24
/// Default ceiling on how many indexes one walk will fetch before
/// stopping. Operators with thousands of historical updates can
/// raise this with an explicit `[N]` arg or future config knob.
/// Always clamped to at most [`HARD_MAX_ENTRIES`] by the walker.
pub const DEFAULT_MAX_ENTRIES: u64 = 50;
29
/// Cap on `[N]` — even with operator override we don't walk more
/// than this in one verb call. Above this the walk is better suited
/// to a scripted shell loop with `--once feed-probe` per index.
pub const HARD_MAX_ENTRIES: u64 = 1_000;
34
/// One entry in the timeline.
#[derive(Debug, Clone)]
pub struct TimelineEntry {
    /// Feed index this entry describes (0 is the feed's first update).
    pub index: u64,
    /// Unix-epoch seconds parsed from the chunk's first 8 bytes BE.
    /// `None` when the payload is malformed (< 8 bytes).
    pub timestamp_unix: Option<u64>,
    /// Total payload size including the 8-byte timestamp prefix.
    pub payload_bytes: usize,
    /// Embedded Swarm reference when the post-timestamp portion is
    /// exactly 32 or 64 bytes (unencrypted / encrypted reference).
    /// `None` for raw feeds.
    pub reference_hex: Option<String>,
    /// `None` when the chunk fetched cleanly. `Some(reason)` when it
    /// was missing (404), errored, or didn't unmarshal as a SOC.
    /// The screen renders these as a dim "[lost]" / "[error]" row so
    /// gaps in the history are visible.
    pub error: Option<String>,
}
54
/// The full timeline view returned to callers.
#[derive(Debug, Clone)]
pub struct Timeline {
    /// Feed owner address, hex-encoded (from `EthAddress::to_hex`).
    pub owner_hex: String,
    /// Feed topic, hex-encoded (from `Topic::to_hex`).
    pub topic_hex: String,
    /// Index of the most recent update at the time of the walk.
    pub latest_index: u64,
    /// Where the next write would land (=`latest_index + 1` for
    /// sequential feeds).
    pub index_next: u64,
    /// Entries in newest-first order. Length ≤ `requested_count`.
    pub entries: Vec<TimelineEntry>,
    /// `true` when we walked the full requested range; `false` when
    /// we stopped at index 0 first.
    pub reached_requested: bool,
}
71
72impl Timeline {
73    /// Single-line summary used by `--once feed-timeline`'s text
74    /// output and the cockpit verb's "complete" notice.
75    pub fn summary(&self) -> String {
76        let lost = self.entries.iter().filter(|e| e.error.is_some()).count();
77        format!(
78            "feed-timeline owner={} · {} entries · latest=idx{} · {} gaps",
79            short_hex(&self.owner_hex, 8),
80            self.entries.len(),
81            self.latest_index,
82            lost,
83        )
84    }
85}
86
/// Abbreviate a hex string to its first `len` characters plus an
/// ellipsis, dropping any leading `0x`. Strings already ≤ `len`
/// characters come back unchanged.
///
/// Uses [`str::get`] instead of `&s[..len]` so a `len` that falls
/// inside a multi-byte character (possible if a caller ever passes
/// non-ASCII "hex") returns the whole string instead of panicking.
fn short_hex(hex: &str, len: usize) -> String {
    let s = hex.trim_start_matches("0x");
    match s.get(..len) {
        // Only truncate when there is actually more than `len` to show.
        Some(prefix) if s.len() > len => format!("{prefix}…"),
        _ => s.to_string(),
    }
}
95
/// Bounded-concurrency knob for [`walk`]: at most this many
/// in-flight chunk fetches at a time, so a 50-entry walk never
/// opens 50 sockets at once.
const MAX_PARALLEL: usize = 8;
100
101pub async fn walk(
102    api: Arc<ApiClient>,
103    owner: EthAddress,
104    topic: Topic,
105    max_entries: u64,
106) -> Result<Timeline, String> {
107    // First find out where the latest write landed. The cheapest way
108    // is `fetch_latest_feed_update` — same call `:feed-probe` makes.
109    let latest = api
110        .bee()
111        .file()
112        .fetch_latest_feed_update(&owner, &topic)
113        .await
114        .map_err(|e| format!("/feeds/{}/{} failed: {e}", owner.to_hex(), topic.to_hex(),))?;
115
116    let max_entries = max_entries.clamp(1, HARD_MAX_ENTRIES);
117    let cap = (latest.index + 1).min(max_entries);
118    let start = latest.index + 1 - cap; // walk inclusive of `latest.index` back to `start`
119    let reached_requested = cap >= max_entries || start == 0;
120
121    let mut entries: Vec<TimelineEntry> = Vec::with_capacity(cap as usize);
122    let mut idx = latest.index + 1;
123    while idx > start {
124        // Fan out up to MAX_PARALLEL fetches at a time.
125        let batch_size = (idx - start).min(MAX_PARALLEL as u64);
126        let batch_indexes: Vec<u64> = (idx - batch_size..idx).rev().collect();
127        let mut futs = Vec::with_capacity(batch_indexes.len());
128        for i in &batch_indexes {
129            let api_c = api.clone();
130            let i = *i;
131            futs.push(async move { fetch_one(api_c, owner, topic, i).await });
132        }
133        let results = futures::future::join_all(futs).await;
134        for r in results {
135            entries.push(r);
136        }
137        idx -= batch_size;
138    }
139
140    Ok(Timeline {
141        owner_hex: owner.to_hex(),
142        topic_hex: topic.to_hex(),
143        latest_index: latest.index,
144        index_next: latest.index_next,
145        entries,
146        reached_requested,
147    })
148}
149
150async fn fetch_one(
151    api: Arc<ApiClient>,
152    owner: EthAddress,
153    topic: Topic,
154    index: u64,
155) -> TimelineEntry {
156    let soc_ref = match feed_update_chunk_reference(&owner, &topic, index) {
157        Ok(r) => r,
158        Err(e) => {
159            return TimelineEntry {
160                index,
161                timestamp_unix: None,
162                payload_bytes: 0,
163                reference_hex: None,
164                error: Some(format!("ref calc: {e}")),
165            };
166        }
167    };
168    let bytes = match api.bee().file().download_chunk(&soc_ref, None).await {
169        Ok(b) => b,
170        Err(e) => {
171            let s = e.to_string();
172            let kind = if s.contains("404") { "lost" } else { "error" };
173            return TimelineEntry {
174                index,
175                timestamp_unix: None,
176                payload_bytes: 0,
177                reference_hex: None,
178                error: Some(format!("{kind}: {e}")),
179            };
180        }
181    };
182    parse_soc_bytes(&bytes, &soc_ref, index)
183}
184
185/// Pure parser — exposed for unit tests. Takes the raw SOC chunk
186/// bytes and the expected SOC address, returns a [`TimelineEntry`]
187/// with `error=None` when the unmarshal + body shape are clean.
188pub fn parse_soc_bytes(bytes: &[u8], expected: &Reference, index: u64) -> TimelineEntry {
189    let soc = match unmarshal_single_owner_chunk(bytes, expected) {
190        Ok(s) => s,
191        Err(e) => {
192            return TimelineEntry {
193                index,
194                timestamp_unix: None,
195                payload_bytes: 0,
196                reference_hex: None,
197                error: Some(format!("soc parse: {e}")),
198            };
199        }
200    };
201    let payload = soc.payload.as_slice();
202    let payload_bytes = payload.len();
203    let timestamp_unix = if payload_bytes >= 8 {
204        let mut ts = [0u8; 8];
205        ts.copy_from_slice(&payload[..8]);
206        Some(u64::from_be_bytes(ts))
207    } else {
208        None
209    };
210    let reference_hex = if payload_bytes == 8 + 32 || payload_bytes == 8 + 64 {
211        Reference::new(&payload[8..]).ok().map(|r| r.to_hex())
212    } else {
213        None
214    };
215    TimelineEntry {
216        index,
217        timestamp_unix,
218        payload_bytes,
219        reference_hex,
220        error: None,
221    }
222}
223
/// Wall-clock age of a unix timestamp, formatted for display.
/// Re-exposed here so the screen renderer doesn't have to import
/// it from `feed_probe`.
pub fn format_age_secs(age: u64) -> String {
    // Thin delegate — the single formatting implementation lives in
    // `feed_probe`; keep it there so both screens render ages alike.
    crate::feed_probe::format_age_secs(age)
}
230
231/// Helper used by the cockpit verb to map the [`TimelineEntry`]
232/// list into rendered table rows. Pure for testability.
233pub fn render_table(timeline: &Timeline) -> Vec<String> {
234    let now = SystemTime::now()
235        .duration_since(SystemTime::UNIX_EPOCH)
236        .map(|d| d.as_secs())
237        .unwrap_or(0);
238    let mut out = Vec::with_capacity(timeline.entries.len() + 1);
239    out.push(format!(
240        "  {:>6}  {:>10}  {:>4}  {:<8}  {}",
241        "INDEX", "AGE", "SIZE", "TYPE", "REF / ERROR",
242    ));
243    for e in &timeline.entries {
244        let age = e
245            .timestamp_unix
246            .map(|t| format_age_secs(now.saturating_sub(t)))
247            .unwrap_or_else(|| "—".to_string());
248        let body = match (&e.error, &e.reference_hex) {
249            (Some(err), _) => format!("[{err}]"),
250            (_, Some(r)) => short_hex(r, 12).to_string(),
251            (_, None) => format!("payload {}B", e.payload_bytes.saturating_sub(8)),
252        };
253        let kind = if e.error.is_some() {
254            "miss"
255        } else if e.reference_hex.is_some() {
256            "ref"
257        } else {
258            "raw"
259        };
260        out.push(format!(
261            "  {:>6}  {:>10}  {:>4}  {:<8}  {}",
262            e.index, age, e.payload_bytes, kind, body,
263        ));
264    }
265    out
266}
267
#[cfg(test)]
mod tests {
    use super::*;

    // 64 hex chars = a 32-byte reference; the value itself is irrelevant.
    fn fake_ref() -> Reference {
        Reference::from_hex(&"a".repeat(64)).unwrap()
    }

    #[test]
    fn parse_soc_bytes_returns_error_on_short_input() {
        // 16 bytes can never unmarshal as a SOC — expect the error path,
        // with the requested index still echoed back.
        let r = parse_soc_bytes(&[0u8; 16], &fake_ref(), 7);
        assert!(r.error.is_some(), "{r:?}");
        assert_eq!(r.index, 7);
    }

    #[test]
    fn timeline_summary_includes_index_and_gap_count() {
        // One clean entry + one lost entry → "2 entries", "1 gaps".
        let t = Timeline {
            owner_hex: "1234567890abcdef".repeat(2),
            topic_hex: "a".repeat(64),
            latest_index: 10,
            index_next: 11,
            entries: vec![
                TimelineEntry {
                    index: 10,
                    timestamp_unix: Some(1_700_000_000),
                    payload_bytes: 40,
                    reference_hex: Some("ab".repeat(32)),
                    error: None,
                },
                TimelineEntry {
                    index: 9,
                    timestamp_unix: None,
                    payload_bytes: 0,
                    reference_hex: None,
                    error: Some("lost: 404".into()),
                },
            ],
            reached_requested: true,
        };
        let s = t.summary();
        assert!(s.contains("idx10"), "{s}");
        assert!(s.contains("1 gaps"), "{s}");
        assert!(s.contains("2 entries"), "{s}");
    }

    #[test]
    fn render_table_prints_one_row_per_entry_plus_header() {
        // Covers all three row kinds: ref, raw, and miss.
        let t = Timeline {
            owner_hex: "0xowner".into(),
            topic_hex: "topic".into(),
            latest_index: 2,
            index_next: 3,
            entries: vec![
                TimelineEntry {
                    index: 2,
                    timestamp_unix: Some(1_700_000_000),
                    payload_bytes: 40,
                    reference_hex: Some("c".repeat(64)),
                    error: None,
                },
                TimelineEntry {
                    index: 1,
                    timestamp_unix: Some(1_699_900_000),
                    payload_bytes: 18,
                    reference_hex: None,
                    error: None,
                },
                TimelineEntry {
                    index: 0,
                    timestamp_unix: None,
                    payload_bytes: 0,
                    reference_hex: None,
                    error: Some("lost: 404".into()),
                },
            ],
            reached_requested: true,
        };
        let rows = render_table(&t);
        assert_eq!(rows.len(), 4); // header + 3 entries
        assert!(rows[0].contains("INDEX"), "{}", rows[0]);
        assert!(rows[1].contains("ref"), "{}", rows[1]);
        assert!(rows[2].contains("raw"), "{}", rows[2]);
        assert!(rows[3].contains("miss"), "{}", rows[3]);
        assert!(rows[3].contains("lost: 404"), "{}", rows[3]);
    }
}