use crate::LOG_MODULE;
use crate::SharedError;
use crate::epochs::slot_to_epoch;
use crate::firehose::FirehoseError;
use crate::index::{SlotOffsetIndexError, slot_to_offset};
use crate::node::{Node, NodeWithCid, NodesWithCids, parse_any_from_cbordata};
use crate::utils;
use cid::Cid;
use once_cell::sync::Lazy;
use reqwest::RequestBuilder;
use rseek::Seekable;
use std::io;
use std::io::SeekFrom;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use std::vec::Vec;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use tokio::task::yield_now;
/// Maximum number of bytes a varint-encoded `u64` can occupy (7 payload bits per byte,
/// rounded up: 64 / 7 -> 10).
const MAX_VARINT_LEN_64: usize = u64::BITS.div_ceil(7) as usize;
/// Minimum spacing, in milliseconds, that `wait_for_seek_hit_slot` enforces between seeks.
const MIN_SEEK_SPACING_MS: u64 = 51;
/// Reference point for `seek_monotonic_millis`. It is initialised lazily on first access, so
/// "time zero" is whenever the seek throttle is first consulted.
static SEEK_START_INSTANT: Lazy<Instant> = Lazy::new(Instant::now);
/// Milliseconds (relative to `SEEK_START_INSTANT`) at which `wait_for_seek_hit_slot` last let a
/// seek through; 0 until the first seek is recorded.
///
/// NOTE(review): because both this and the first `seek_monotonic_millis()` reading start at ~0,
/// the very first seek in the process waits about `MIN_SEEK_SPACING_MS` before proceeding.
static LAST_SEEK_HIT_TIME: AtomicU64 = AtomicU64::new(0);
/// Reads an unsigned varint (7 data bits per byte, least-significant group first, high bit set
/// on every byte except the last) from `reader`, one byte at a time.
///
/// # Errors
/// * Any I/O error from the reader is propagated, including `UnexpectedEof` when the stream
///   ends before the terminating byte.
/// * `InvalidData` ("uvarint overflow") when the tenth byte carries more than the one bit that
///   still fits into a `u64`.
/// * `InvalidData` ("uvarint too long") when the tenth byte still has its continuation bit set.
pub async fn read_uvarint<R: AsyncRead + Unpin>(reader: &mut R) -> io::Result<u64> {
    let mut value: u64 = 0;
    let mut shift: u32 = 0;
    let mut byte = [0u8; 1];
    for index in 0..MAX_VARINT_LEN_64 {
        reader.read_exact(&mut byte).await?;
        let current = byte[0];
        if (current & 0x80) == 0 {
            // Final byte. In the last permitted position only bit 63 is left, so it may be 0 or 1.
            let overflows = index + 1 == MAX_VARINT_LEN_64 && current > 1;
            return if overflows {
                Err(io::Error::new(io::ErrorKind::InvalidData, "uvarint overflow"))
            } else {
                Ok(value | (u64::from(current) << shift))
            };
        }
        value |= u64::from(current & 0x7f) << shift;
        shift += 7;
        if shift > 63 {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "uvarint too long"));
        }
    }
    Err(io::Error::new(io::ErrorKind::InvalidData, "uvarint overflow"))
}
/// One undecoded section of the node stream: its CID plus the raw payload bytes that followed
/// the CID inside the section.
///
/// `Debug` is derived; it prints both fields in full, exactly as the previous hand-written
/// implementation did.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct RawNode {
    /// Content identifier decoded from the start of the section.
    pub cid: Cid,
    /// Remaining section bytes after the CID, not yet decoded (see `RawNode::parse`).
    pub data: Vec<u8>,
}
impl RawNode {
pub const fn new(cid: Cid, data: Vec<u8>) -> RawNode {
RawNode { cid, data }
}
pub fn parse(&self) -> Result<Node, SharedError> {
match parse_any_from_cbordata(self.data.clone()) {
Ok(node) => Ok(node),
Err(err) => {
println!("Error: {:?}", err);
Err(Box::new(std::io::Error::other("Unknown type".to_owned())))
}
}
}
pub async fn from_cursor(cursor: &mut io::Cursor<Vec<u8>>) -> Result<RawNode, SharedError> {
let cid_version = read_uvarint(cursor).await?;
let multicodec = read_uvarint(cursor).await?;
let hash_function = read_uvarint(cursor).await?;
let digest_length = read_uvarint(cursor).await?;
if digest_length > 64 {
return Err(Box::new(std::io::Error::other(format!(
"Digest length too long, position={}",
cursor.position()
))));
}
let mut digest = vec![0u8; digest_length as usize];
cursor.read_exact(&mut digest).await?;
let mut data = vec![];
cursor.read_to_end(&mut data).await?;
let ha = multihash::Multihash::wrap(hash_function, digest.as_slice())?;
match cid_version {
0 => {
let cid = Cid::new_v0(ha)?;
let raw_node = RawNode::new(cid, data);
Ok(raw_node)
}
1 => {
let cid = Cid::new_v1(multicodec, ha);
let raw_node = RawNode::new(cid, data);
Ok(raw_node)
}
_ => Err(Box::new(std::io::Error::other(
"Unknown CID version".to_owned(),
))),
}
}
}
/// A source that can report its total length up front.
pub trait Len {
    /// Returns the total length of the source (for `Seekable`, its reported file size).
    fn len(&self) -> u64;

    /// Returns `true` exactly when [`Len::len`] reports zero.
    fn is_empty(&self) -> bool {
        matches!(self.len(), 0)
    }
}
impl<F> Len for Seekable<F>
where
    F: Fn() -> RequestBuilder + Send + Sync + 'static,
{
    /// Returns the `Seekable`'s known file size, or `0` when `file_size` is `None`.
    ///
    /// An unknown size is therefore indistinguishable from an empty source via this trait.
    fn len(&self) -> u64 {
        match self.file_size {
            Some(size) => size,
            None => 0,
        }
    }
}
/// Sequential reader over a node stream laid out as a varint-length-prefixed header followed by
/// varint-length-prefixed sections, each holding a CID and its payload (this matches the CARv1
/// layout).
///
/// The trait bounds live on the `impl` block rather than on the struct, so the type itself can be
/// named without repeating them.
pub struct NodeReader<R> {
    /// Underlying byte source.
    pub reader: R,
    /// Cached raw header bytes; empty until the header has been read.
    pub header: Vec<u8>,
    /// Incremented once per call to `next`.
    pub item_index: u64,
}
impl<R: AsyncRead + Unpin + AsyncSeek + Len> NodeReader<R> {
pub const fn new(reader: R) -> NodeReader<R> {
NodeReader {
reader,
header: vec![],
item_index: 0,
}
}
pub async fn read_raw_header(&mut self) -> Result<Vec<u8>, SharedError> {
if !self.header.is_empty() {
return Ok(self.header.clone());
};
let header_length = read_uvarint(&mut self.reader).await?;
if header_length > 1024 {
return Err(Box::new(std::io::Error::other(
"Header length too long".to_owned(),
)));
}
let mut header = vec![0u8; header_length as usize];
self.reader.read_exact(&mut header).await?;
self.header.clone_from(&header);
let clone = header.clone();
Ok(clone.as_slice().to_owned())
}
pub async fn seek_to_slot(&mut self, slot: u64) -> Result<(), FirehoseError> {
self.seek_to_slot_inner(slot).await
}
async fn seek_to_slot_inner(&mut self, slot: u64) -> Result<(), FirehoseError> {
if self.header.is_empty() {
self.read_raw_header()
.await
.map_err(FirehoseError::SeekToSlotError)?;
};
let mut current = slot;
loop {
let epoch = slot_to_epoch(current);
match slot_to_offset(current).await {
Ok(offset) => {
log::info!(
target: LOG_MODULE,
"Seeking to slot {} in epoch {} @ offset {}",
current,
epoch,
offset
);
wait_for_seek_hit_slot().await;
self.reader
.seek(SeekFrom::Start(offset))
.await
.map_err(|e| FirehoseError::SeekToSlotError(Box::new(e)))?;
return Ok(());
}
Err(SlotOffsetIndexError::SlotNotFound(..)) => {
log::warn!(
target: LOG_MODULE,
"Slot {} not found in index, seeking to next slot",
current
);
if current == u64::MAX {
return Err(FirehoseError::SeekToSlotError(Box::new(
std::io::Error::other("slot search exhausted u64 range"),
)));
}
current = current.saturating_add(1);
}
Err(err) => return Err(FirehoseError::SeekToSlotError(Box::new(err))),
}
}
}
#[allow(clippy::should_implement_trait)]
pub async fn next(&mut self) -> Result<RawNode, SharedError> {
if self.header.is_empty() {
self.read_raw_header().await?;
};
self.item_index += 1;
let section_size = read_uvarint(&mut self.reader).await?;
if section_size > utils::MAX_ALLOWED_SECTION_SIZE as u64 {
return Err(Box::new(std::io::Error::other(
"Section size too long".to_owned(),
)));
}
let mut item = vec![0u8; section_size as usize];
self.reader.read_exact(&mut item).await?;
let mut cursor = io::Cursor::new(item);
RawNode::from_cursor(&mut cursor).await
}
pub async fn next_parsed(&mut self) -> Result<NodeWithCid, SharedError> {
let raw_node = self.next().await?;
let cid = raw_node.cid;
Ok(NodeWithCid::new(cid, raw_node.parse()?))
}
pub async fn read_until_block(&mut self) -> Result<NodesWithCids, SharedError> {
let mut nodes = NodesWithCids::new();
loop {
let node = match self.next_parsed().await {
Ok(node) => node,
Err(e)
if e.downcast_ref::<io::Error>()
.is_some_and(|io_err| io_err.kind() == io::ErrorKind::UnexpectedEof) =>
{
break;
}
Err(e) => return Err(e),
};
if node.get_node().is_block() {
nodes.push(node);
break;
}
nodes.push(node);
}
Ok(nodes)
}
pub const fn get_item_index(&self) -> u64 {
self.item_index
}
}
/// Milliseconds elapsed since `SEEK_START_INSTANT`, saturating at `u64::MAX` if the `u128`
/// value would not fit.
fn seek_monotonic_millis() -> u64 {
    let elapsed_ms: u128 = SEEK_START_INSTANT.elapsed().as_millis();
    u64::try_from(elapsed_ms).unwrap_or(u64::MAX)
}
/// Process-wide seek throttle.
///
/// Returns once at least `MIN_SEEK_SPACING_MS` milliseconds have passed since the last recorded
/// seek, and records the current time as the new "last seek". The timestamp is claimed with a
/// compare-and-swap, so among concurrent waiters that observed the same previous value only one
/// proceeds; the others retry. While waiting, the task calls `yield_now` in a loop instead of
/// sleeping.
///
/// NOTE(review): this is a cooperative spin that keeps a worker busy for up to
/// `MIN_SEEK_SPACING_MS`. Sleeping for the remaining time (e.g. `tokio::time::sleep`) would
/// avoid that if tokio's `time` feature is enabled; confirm.
async fn wait_for_seek_hit_slot() {
    loop {
        let now_ms = seek_monotonic_millis();
        let previous_hit = LAST_SEEK_HIT_TIME.load(Ordering::Relaxed);
        let spacing_elapsed = now_ms.saturating_sub(previous_hit) >= MIN_SEEK_SPACING_MS;
        // Only attempt to claim the slot once the spacing has elapsed (short-circuit `&&`).
        if spacing_elapsed
            && LAST_SEEK_HIT_TIME
                .compare_exchange(previous_hit, now_ms, Ordering::SeqCst, Ordering::SeqCst)
                .is_ok()
        {
            return;
        }
        yield_now().await;
    }
}
/// Decodes a DAG-CBOR link that has been surfaced as a CBOR byte string.
///
/// The bytes must start with the `0x00` prefix used for DAG-CBOR links, followed by a binary
/// CID. Any other value shape, or a byte string with a different (or missing) first byte, is
/// rejected.
///
/// # Errors
/// Returns an error for an unexpected encoding, or if the remaining bytes are not a valid CID.
pub fn cid_from_cbor_link(val: &serde_cbor::Value) -> Result<cid::Cid, SharedError> {
    let cid_bytes = match val {
        serde_cbor::Value::Bytes(bytes) => match bytes.split_first() {
            Some((&0, rest)) => Some(rest),
            _ => None,
        },
        _ => None,
    };
    match cid_bytes {
        Some(rest) => Ok(cid::Cid::try_from(rest.to_vec())?),
        None => Err("invalid DAG‑CBOR link encoding".into()),
    }
}
/// Reads epoch 670 from the start up to and including its first block node and checks the node
/// count.
///
/// NOTE(review): this fetches epoch data through an HTTP client (`create_http_client` +
/// `fetch_epoch_stream`), so it presumably needs network access.
#[tokio::test]
async fn test_async_node_reader() {
    use crate::epochs::fetch_epoch_stream;
    let http_client = crate::network::create_http_client();
    let epoch_stream = fetch_epoch_stream(670, &http_client).await;
    let first_block_nodes = NodeReader::new(epoch_stream)
        .read_until_block()
        .await
        .unwrap();
    assert_eq!(first_block_nodes.len(), 117);
}