1use std::sync::Arc;
18use std::time::SystemTime;
19
20use bee::file::feed_update_chunk_reference;
21use bee::swarm::{EthAddress, Reference, Topic, soc::unmarshal_single_owner_chunk};
22
23use crate::api::ApiClient;
24
/// Default number of feed updates to walk back from the latest index.
pub const DEFAULT_MAX_ENTRIES: u64 = 50;

/// Absolute upper bound on entries per walk; `walk` clamps every request to this.
pub const HARD_MAX_ENTRIES: u64 = 1_000;
34
/// One fetched (or failed) feed update in a walked timeline.
#[derive(Debug, Clone)]
pub struct TimelineEntry {
    // Feed index this entry was fetched at.
    pub index: u64,
    // Unix timestamp read as big-endian u64 from the first 8 payload bytes,
    // when the payload is at least 8 bytes long.
    pub timestamp_unix: Option<u64>,
    // Total SOC payload size in bytes (0 when the fetch or parse failed).
    pub payload_bytes: usize,
    // Hex reference carried after the timestamp, when the payload is exactly
    // 8+32 or 8+64 bytes.
    pub reference_hex: Option<String>,
    // Set when the chunk reference could not be derived, the chunk could not
    // be downloaded, or the SOC failed to parse.
    pub error: Option<String>,
}
54
/// Result of walking a feed backwards from its latest update.
#[derive(Debug, Clone)]
pub struct Timeline {
    // Hex form of the feed owner's Ethereum address.
    pub owner_hex: String,
    // Hex form of the feed topic.
    pub topic_hex: String,
    // Index of the most recent update reported by the node.
    pub latest_index: u64,
    // Next index reported by the node (as returned alongside the latest update).
    pub index_next: u64,
    // Entries in descending index order (latest first).
    pub entries: Vec<TimelineEntry>,
    // True when the walk covered the requested span: either the clamped
    // `max_entries` quota was filled or index 0 was reached.
    pub reached_requested: bool,
}
71
72impl Timeline {
73 pub fn summary(&self) -> String {
76 let lost = self.entries.iter().filter(|e| e.error.is_some()).count();
77 format!(
78 "feed-timeline owner={} · {} entries · latest=idx{} · {} gaps",
79 short_hex(&self.owner_hex, 8),
80 self.entries.len(),
81 self.latest_index,
82 lost,
83 )
84 }
85}
86
/// Shorten a hex string for display: drop any leading "0x" prefix(es) and,
/// when the remainder is longer than `len`, keep the first `len` characters
/// followed by an ellipsis.
fn short_hex(hex: &str, len: usize) -> String {
    let stripped = hex.trim_start_matches("0x");
    if stripped.len() <= len {
        return stripped.to_string();
    }
    format!("{}…", &stripped[..len])
}
95
// Maximum number of chunk downloads issued concurrently per batch in `walk`.
const MAX_PARALLEL: usize = 8;
100
101pub async fn walk(
102 api: Arc<ApiClient>,
103 owner: EthAddress,
104 topic: Topic,
105 max_entries: u64,
106) -> Result<Timeline, String> {
107 let latest = api
110 .bee()
111 .file()
112 .fetch_latest_feed_update(&owner, &topic)
113 .await
114 .map_err(|e| format!("/feeds/{}/{} failed: {e}", owner.to_hex(), topic.to_hex(),))?;
115
116 let max_entries = max_entries.clamp(1, HARD_MAX_ENTRIES);
117 let cap = (latest.index + 1).min(max_entries);
118 let start = latest.index + 1 - cap; let reached_requested = cap >= max_entries || start == 0;
120
121 let mut entries: Vec<TimelineEntry> = Vec::with_capacity(cap as usize);
122 let mut idx = latest.index + 1;
123 while idx > start {
124 let batch_size = (idx - start).min(MAX_PARALLEL as u64);
126 let batch_indexes: Vec<u64> = (idx - batch_size..idx).rev().collect();
127 let mut futs = Vec::with_capacity(batch_indexes.len());
128 for i in &batch_indexes {
129 let api_c = api.clone();
130 let i = *i;
131 futs.push(async move { fetch_one(api_c, owner, topic, i).await });
132 }
133 let results = futures::future::join_all(futs).await;
134 for r in results {
135 entries.push(r);
136 }
137 idx -= batch_size;
138 }
139
140 Ok(Timeline {
141 owner_hex: owner.to_hex(),
142 topic_hex: topic.to_hex(),
143 latest_index: latest.index,
144 index_next: latest.index_next,
145 entries,
146 reached_requested,
147 })
148}
149
150async fn fetch_one(
151 api: Arc<ApiClient>,
152 owner: EthAddress,
153 topic: Topic,
154 index: u64,
155) -> TimelineEntry {
156 let soc_ref = match feed_update_chunk_reference(&owner, &topic, index) {
157 Ok(r) => r,
158 Err(e) => {
159 return TimelineEntry {
160 index,
161 timestamp_unix: None,
162 payload_bytes: 0,
163 reference_hex: None,
164 error: Some(format!("ref calc: {e}")),
165 };
166 }
167 };
168 let bytes = match api.bee().file().download_chunk(&soc_ref, None).await {
169 Ok(b) => b,
170 Err(e) => {
171 let s = e.to_string();
172 let kind = if s.contains("404") { "lost" } else { "error" };
173 return TimelineEntry {
174 index,
175 timestamp_unix: None,
176 payload_bytes: 0,
177 reference_hex: None,
178 error: Some(format!("{kind}: {e}")),
179 };
180 }
181 };
182 parse_soc_bytes(&bytes, &soc_ref, index)
183}
184
185pub fn parse_soc_bytes(bytes: &[u8], expected: &Reference, index: u64) -> TimelineEntry {
189 let soc = match unmarshal_single_owner_chunk(bytes, expected) {
190 Ok(s) => s,
191 Err(e) => {
192 return TimelineEntry {
193 index,
194 timestamp_unix: None,
195 payload_bytes: 0,
196 reference_hex: None,
197 error: Some(format!("soc parse: {e}")),
198 };
199 }
200 };
201 let payload = soc.payload.as_slice();
202 let payload_bytes = payload.len();
203 let timestamp_unix = if payload_bytes >= 8 {
204 let mut ts = [0u8; 8];
205 ts.copy_from_slice(&payload[..8]);
206 Some(u64::from_be_bytes(ts))
207 } else {
208 None
209 };
210 let reference_hex = if payload_bytes == 8 + 32 || payload_bytes == 8 + 64 {
211 Reference::new(&payload[8..]).ok().map(|r| r.to_hex())
212 } else {
213 None
214 };
215 TimelineEntry {
216 index,
217 timestamp_unix,
218 payload_bytes,
219 reference_hex,
220 error: None,
221 }
222}
223
/// Format an age (in seconds) for display.
///
/// Thin re-export-style wrapper delegating to the shared implementation in
/// `crate::feed_probe`, so this module renders ages the same way it does.
pub fn format_age_secs(age: u64) -> String {
    crate::feed_probe::format_age_secs(age)
}
230
231pub fn render_table(timeline: &Timeline) -> Vec<String> {
234 let now = SystemTime::now()
235 .duration_since(SystemTime::UNIX_EPOCH)
236 .map(|d| d.as_secs())
237 .unwrap_or(0);
238 let mut out = Vec::with_capacity(timeline.entries.len() + 1);
239 out.push(format!(
240 " {:>6} {:>10} {:>4} {:<8} {}",
241 "INDEX", "AGE", "SIZE", "TYPE", "REF / ERROR",
242 ));
243 for e in &timeline.entries {
244 let age = e
245 .timestamp_unix
246 .map(|t| format_age_secs(now.saturating_sub(t)))
247 .unwrap_or_else(|| "—".to_string());
248 let body = match (&e.error, &e.reference_hex) {
249 (Some(err), _) => format!("[{err}]"),
250 (_, Some(r)) => short_hex(r, 12).to_string(),
251 (_, None) => format!("payload {}B", e.payload_bytes.saturating_sub(8)),
252 };
253 let kind = if e.error.is_some() {
254 "miss"
255 } else if e.reference_hex.is_some() {
256 "ref"
257 } else {
258 "raw"
259 };
260 out.push(format!(
261 " {:>6} {:>10} {:>4} {:<8} {}",
262 e.index, age, e.payload_bytes, kind, body,
263 ));
264 }
265 out
266}
267
#[cfg(test)]
mod tests {
    use super::*;

    // A syntactically valid 32-byte reference for parse tests.
    fn fake_ref() -> Reference {
        Reference::from_hex(&"a".repeat(64)).unwrap()
    }

    #[test]
    fn parse_soc_bytes_returns_error_on_short_input() {
        // 16 bytes is far too short to be a valid single-owner chunk.
        let entry = parse_soc_bytes(&[0u8; 16], &fake_ref(), 7);
        assert!(entry.error.is_some(), "{entry:?}");
        assert_eq!(entry.index, 7);
    }

    #[test]
    fn timeline_summary_includes_index_and_gap_count() {
        let resolved = TimelineEntry {
            index: 10,
            timestamp_unix: Some(1_700_000_000),
            payload_bytes: 40,
            reference_hex: Some("ab".repeat(32)),
            error: None,
        };
        let lost = TimelineEntry {
            index: 9,
            timestamp_unix: None,
            payload_bytes: 0,
            reference_hex: None,
            error: Some("lost: 404".into()),
        };
        let timeline = Timeline {
            owner_hex: "1234567890abcdef".repeat(2),
            topic_hex: "a".repeat(64),
            latest_index: 10,
            index_next: 11,
            entries: vec![resolved, lost],
            reached_requested: true,
        };

        let summary = timeline.summary();
        assert!(summary.contains("idx10"), "{summary}");
        assert!(summary.contains("1 gaps"), "{summary}");
        assert!(summary.contains("2 entries"), "{summary}");
    }

    #[test]
    fn render_table_prints_one_row_per_entry_plus_header() {
        // Three entries covering all row kinds: ref, raw payload, and miss.
        let with_ref = TimelineEntry {
            index: 2,
            timestamp_unix: Some(1_700_000_000),
            payload_bytes: 40,
            reference_hex: Some("c".repeat(64)),
            error: None,
        };
        let raw_payload = TimelineEntry {
            index: 1,
            timestamp_unix: Some(1_699_900_000),
            payload_bytes: 18,
            reference_hex: None,
            error: None,
        };
        let missing = TimelineEntry {
            index: 0,
            timestamp_unix: None,
            payload_bytes: 0,
            reference_hex: None,
            error: Some("lost: 404".into()),
        };
        let timeline = Timeline {
            owner_hex: "0xowner".into(),
            topic_hex: "topic".into(),
            latest_index: 2,
            index_next: 3,
            entries: vec![with_ref, raw_payload, missing],
            reached_requested: true,
        };

        let rows = render_table(&timeline);
        // One header row plus one row per entry.
        assert_eq!(rows.len(), 4);
        assert!(rows[0].contains("INDEX"), "{}", rows[0]);
        assert!(rows[1].contains("ref"), "{}", rows[1]);
        assert!(rows[2].contains("raw"), "{}", rows[2]);
        assert!(rows[3].contains("miss"), "{}", rows[3]);
        assert!(rows[3].contains("lost: 404"), "{}", rows[3]);
    }
}