use std::sync::Arc;
use std::time::SystemTime;
use bee::file::feed_update_chunk_reference;
use bee::swarm::{EthAddress, Reference, Topic, soc::unmarshal_single_owner_chunk};
use crate::api::ApiClient;
/// Default number of feed entries to walk when the caller does not specify one.
pub const DEFAULT_MAX_ENTRIES: u64 = 50;
/// Absolute upper bound on entries walked per call; `walk` clamps its
/// `max_entries` argument to `1..=HARD_MAX_ENTRIES`.
pub const HARD_MAX_ENTRIES: u64 = 1_000;
/// One walked feed update; appears newest-first in `Timeline::entries`.
#[derive(Debug, Clone)]
pub struct TimelineEntry {
    // Feed index this entry was fetched at.
    pub index: u64,
    // Unix timestamp decoded big-endian from the first 8 payload bytes, when present.
    pub timestamp_unix: Option<u64>,
    // Total SOC payload length in bytes (0 when the fetch or parse failed).
    pub payload_bytes: usize,
    // Hex reference parsed from the payload when it is 8-byte timestamp + 32 or 64 bytes.
    pub reference_hex: Option<String>,
    // Failure description ("ref calc: …", "lost: …", "error: …", "soc parse: …");
    // `Some` marks this index as a gap in the timeline.
    pub error: Option<String>,
}
/// Result of walking a feed backwards from its latest update.
#[derive(Debug, Clone)]
pub struct Timeline {
    // Feed owner address as hex.
    pub owner_hex: String,
    // Feed topic as hex.
    pub topic_hex: String,
    // Index of the most recent update found by the latest-update lookup.
    pub latest_index: u64,
    // Next index reported by the node for this feed.
    pub index_next: u64,
    // Walked entries, newest first.
    pub entries: Vec<TimelineEntry>,
    // True when the walk covered the requested count or reached index 0.
    pub reached_requested: bool,
}
impl Timeline {
    /// One-line human-readable summary: abbreviated owner, entry count,
    /// latest index, and the number of missing/unreadable entries ("gaps").
    pub fn summary(&self) -> String {
        let gap_count = self
            .entries
            .iter()
            .filter(|entry| entry.error.is_some())
            .count();
        format!(
            "feed-timeline owner={} · {} entries · latest=idx{} · {} gaps",
            short_hex(&self.owner_hex, 8),
            self.entries.len(),
            self.latest_index,
            gap_count,
        )
    }
}
/// Shortens a hex-ish string for display: strips an optional `0x` prefix and
/// truncates to at most `len` characters, appending `…` when truncated.
///
/// Truncation is character-boundary safe: the original `&s[..len]` sliced by
/// byte offset and would panic if `len` landed inside a multi-byte codepoint
/// (harmless for pure-ASCII hex, but this helper also receives arbitrary
/// display strings such as `owner_hex` from callers).
fn short_hex(hex: &str, len: usize) -> String {
    let s = hex.trim_start_matches("0x");
    match s.char_indices().nth(len) {
        // A (len+1)-th char exists, so the string is longer than `len` chars;
        // `byte_idx` is the byte offset of that char, i.e. a valid boundary.
        Some((byte_idx, _)) => format!("{}…", &s[..byte_idx]),
        None => s.to_string(),
    }
}
/// Upper bound on concurrent chunk downloads per batch in `walk`.
const MAX_PARALLEL: usize = 8;
/// Walks a feed backwards from its latest update, collecting up to
/// `max_entries` entries (clamped to `1..=HARD_MAX_ENTRIES`).
///
/// Entries are returned newest-first. Downloads run in batches of at most
/// `MAX_PARALLEL` concurrent requests. Per-entry failures are recorded in
/// each entry's `error` field instead of aborting the walk; only the initial
/// latest-update lookup can fail the whole call.
pub async fn walk(
    api: Arc<ApiClient>,
    owner: EthAddress,
    topic: Topic,
    max_entries: u64,
) -> Result<Timeline, String> {
    let latest = api
        .bee()
        .file()
        .fetch_latest_feed_update(&owner, &topic)
        .await
        .map_err(|e| format!("/feeds/{}/{} failed: {e}", owner.to_hex(), topic.to_hex(),))?;
    let max_entries = max_entries.clamp(1, HARD_MAX_ENTRIES);
    // Never request more entries than actually exist (indexes 0..=latest.index).
    let cap = (latest.index + 1).min(max_entries);
    let start = latest.index + 1 - cap;
    let reached_requested = cap >= max_entries || start == 0;

    // Descending index list (newest first), processed in fixed-size batches so
    // at most MAX_PARALLEL downloads are in flight at any time.
    let indexes: Vec<u64> = (start..=latest.index).rev().collect();
    let mut entries: Vec<TimelineEntry> = Vec::with_capacity(cap as usize);
    for batch in indexes.chunks(MAX_PARALLEL) {
        let futs: Vec<_> = batch
            .iter()
            .map(|&i| {
                let api_c = api.clone();
                async move { fetch_one(api_c, owner, topic, i).await }
            })
            .collect();
        entries.extend(futures::future::join_all(futs).await);
    }

    Ok(Timeline {
        owner_hex: owner.to_hex(),
        topic_hex: topic.to_hex(),
        latest_index: latest.index,
        index_next: latest.index_next,
        entries,
        reached_requested,
    })
}
/// Downloads and parses a single feed update at `index`.
///
/// Never fails: any error (reference derivation, chunk download, SOC parse)
/// is captured in the returned entry's `error` field so the walk continues.
async fn fetch_one(
    api: Arc<ApiClient>,
    owner: EthAddress,
    topic: Topic,
    index: u64,
) -> TimelineEntry {
    // Placeholder entry carrying only an error description for this index.
    let failed = |message: String| TimelineEntry {
        index,
        timestamp_unix: None,
        payload_bytes: 0,
        reference_hex: None,
        error: Some(message),
    };

    let soc_ref = match feed_update_chunk_reference(&owner, &topic, index) {
        Ok(reference) => reference,
        Err(e) => return failed(format!("ref calc: {e}")),
    };

    match api.bee().file().download_chunk(&soc_ref, None).await {
        Ok(bytes) => parse_soc_bytes(&bytes, &soc_ref, index),
        Err(e) => {
            // A 404 means the chunk is genuinely missing ("lost"); anything
            // else is treated as a transport/node error.
            let text = e.to_string();
            let kind = if text.contains("404") { "lost" } else { "error" };
            failed(format!("{kind}: {e}"))
        }
    }
}
/// Unmarshals SOC chunk bytes into a timeline entry.
///
/// Payload layout as consumed here: the first 8 bytes (when present) are a
/// big-endian unix timestamp; when exactly 32 or 64 bytes follow the
/// timestamp, they are interpreted as a swarm reference. Any other layout is
/// reported as raw payload (no reference).
pub fn parse_soc_bytes(bytes: &[u8], expected: &Reference, index: u64) -> TimelineEntry {
    let soc = match unmarshal_single_owner_chunk(bytes, expected) {
        Ok(chunk) => chunk,
        Err(e) => {
            return TimelineEntry {
                index,
                timestamp_unix: None,
                payload_bytes: 0,
                reference_hex: None,
                error: Some(format!("soc parse: {e}")),
            };
        }
    };
    let payload = soc.payload.as_slice();
    let payload_bytes = payload.len();
    // `get(..8)` is None for payloads shorter than 8 bytes.
    let timestamp_unix = payload
        .get(..8)
        .map(|head| u64::from_be_bytes(head.try_into().expect("slice of length 8")));
    // 8-byte timestamp + 32- or 64-byte body ⇒ treat the tail as a reference.
    let reference_hex = if matches!(payload_bytes, 40 | 72) {
        Reference::new(&payload[8..]).ok().map(|r| r.to_hex())
    } else {
        None
    };
    TimelineEntry {
        index,
        timestamp_unix,
        payload_bytes,
        reference_hex,
        error: None,
    }
}
/// Formats an age in seconds for display; delegates to the shared
/// implementation in `crate::feed_probe` so both modules render ages
/// identically. Kept as a local re-export for `render_table` and callers.
pub fn format_age_secs(age: u64) -> String {
    crate::feed_probe::format_age_secs(age)
}
/// Renders the timeline as aligned text rows: one header line followed by
/// one row per entry, in `timeline.entries` order (newest first).
///
/// Columns: INDEX, AGE (relative to now; `—` when no timestamp), SIZE
/// (payload bytes), TYPE (`ref` / `raw` / `miss`), then either the shortened
/// reference, the raw payload size, or the bracketed error text.
///
/// Fix: the `ref` branch called `.to_string()` on the `String` already
/// returned by `short_hex`, a redundant reallocation (clippy would flag it);
/// removed with no behavior change.
pub fn render_table(timeline: &Timeline) -> Vec<String> {
    // A pre-epoch system clock degrades to age 0 rather than panicking.
    let now = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    let mut out = Vec::with_capacity(timeline.entries.len() + 1);
    out.push(format!(
        " {:>6} {:>10} {:>4} {:<8} {}",
        "INDEX", "AGE", "SIZE", "TYPE", "REF / ERROR",
    ));
    for e in &timeline.entries {
        let age = e
            .timestamp_unix
            .map(|t| format_age_secs(now.saturating_sub(t)))
            .unwrap_or_else(|| "—".to_string());
        // Body column: error text wins, then a shortened reference, then the
        // raw payload size minus the 8-byte timestamp header.
        let body = match (&e.error, &e.reference_hex) {
            (Some(err), _) => format!("[{err}]"),
            (_, Some(r)) => short_hex(r, 12),
            (_, None) => format!("payload {}B", e.payload_bytes.saturating_sub(8)),
        };
        let kind = if e.error.is_some() {
            "miss"
        } else if e.reference_hex.is_some() {
            "ref"
        } else {
            "raw"
        };
        out.push(format!(
            " {:>6} {:>10} {:>4} {:<8} {}",
            e.index, age, e.payload_bytes, kind, body,
        ));
    }
    out
}
#[cfg(test)]
mod tests {
    use super::*;

    /// 64-char hex reference used wherever a concrete reference is required.
    fn fake_ref() -> Reference {
        Reference::from_hex(&"a".repeat(64)).unwrap()
    }

    #[test]
    fn parse_soc_bytes_returns_error_on_short_input() {
        let r = parse_soc_bytes(&[0u8; 16], &fake_ref(), 7);
        assert!(r.error.is_some(), "{r:?}");
        assert_eq!(r.index, 7);
    }

    #[test]
    fn timeline_summary_includes_index_and_gap_count() {
        let ok_entry = TimelineEntry {
            index: 10,
            timestamp_unix: Some(1_700_000_000),
            payload_bytes: 40,
            reference_hex: Some("ab".repeat(32)),
            error: None,
        };
        let lost_entry = TimelineEntry {
            index: 9,
            timestamp_unix: None,
            payload_bytes: 0,
            reference_hex: None,
            error: Some("lost: 404".into()),
        };
        let t = Timeline {
            owner_hex: "1234567890abcdef".repeat(2),
            topic_hex: "a".repeat(64),
            latest_index: 10,
            index_next: 11,
            entries: vec![ok_entry, lost_entry],
            reached_requested: true,
        };
        let s = t.summary();
        assert!(s.contains("idx10"), "{s}");
        assert!(s.contains("1 gaps"), "{s}");
        assert!(s.contains("2 entries"), "{s}");
    }

    #[test]
    fn render_table_prints_one_row_per_entry_plus_header() {
        let with_ref = TimelineEntry {
            index: 2,
            timestamp_unix: Some(1_700_000_000),
            payload_bytes: 40,
            reference_hex: Some("c".repeat(64)),
            error: None,
        };
        let raw_payload = TimelineEntry {
            index: 1,
            timestamp_unix: Some(1_699_900_000),
            payload_bytes: 18,
            reference_hex: None,
            error: None,
        };
        let missing = TimelineEntry {
            index: 0,
            timestamp_unix: None,
            payload_bytes: 0,
            reference_hex: None,
            error: Some("lost: 404".into()),
        };
        let t = Timeline {
            owner_hex: "0xowner".into(),
            topic_hex: "topic".into(),
            latest_index: 2,
            index_next: 3,
            entries: vec![with_ref, raw_payload, missing],
            reached_requested: true,
        };
        let rows = render_table(&t);
        // One header row plus one row per entry.
        assert_eq!(rows.len(), 4);
        assert!(rows[0].contains("INDEX"), "{}", rows[0]);
        assert!(rows[1].contains("ref"), "{}", rows[1]);
        assert!(rows[2].contains("raw"), "{}", rows[2]);
        assert!(rows[3].contains("miss"), "{}", rows[3]);
        assert!(rows[3].contains("lost: 404"), "{}", rows[3]);
    }
}