use p2panda_core::logs::LogRanges;
use p2panda_core::{Cursor, Topic, VerifyingKey};
use p2panda_store::logs::LogStore;
use p2panda_store::{SqliteError, SqliteStore};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::mpsc;
use crate::node::AckPolicy;
use crate::operation::{Extensions, LogId, Operation};
use crate::processor::Pipeline;
use crate::streams::StreamEvent;
use crate::streams::acked::Acked;
use crate::streams::stream::{Source, process_operation};
#[derive(Clone, Default, Debug, PartialEq, Eq)]
pub enum StreamFrom {
Start,
#[default]
Frontier,
Cursor(Cursor<VerifyingKey, LogId>),
}
impl From<Cursor<VerifyingKey, LogId>> for StreamFrom {
fn from(cursor: Cursor<VerifyingKey, LogId>) -> Self {
Self::Cursor(cursor)
}
}
pub(crate) async fn replay_log_ranges<M>(
topic: Topic,
store: &SqliteStore,
app_tx: &mpsc::Sender<StreamEvent<M>>,
pipeline: &Pipeline<LogId, Extensions, Topic>,
ack_policy: AckPolicy,
acked: &Acked,
log_ranges: LogRanges<VerifyingKey, LogId>,
) -> Result<(), ReplayError>
where
M: Serialize + for<'a> Deserialize<'a> + Send + 'static,
{
for (author, logs) in log_ranges {
for (log_id, (from, to)) in logs {
let Some(operations): Option<Vec<(Operation, _)>> =
store.get_log_entries(&author, &log_id, from, to).await?
else {
continue;
};
for (operation, _) in operations {
match process_operation::<M>(
operation,
topic,
pipeline,
ack_policy,
acked,
Source::LocalStore,
)
.await
{
Some(event) => {
app_tx
.send(event)
.await
.map_err(|_| ReplayError::CriticalError)?;
}
None => continue,
}
}
}
}
Ok(())
}
#[derive(Debug, Error)]
pub enum ReplayError {
#[error("an error occurred while querying the store: {0}")]
Store(#[from] SqliteError),
#[error("a critical error occurred in the replay task")]
CriticalError,
}