use p2panda_core::logs::LogRanges;
use p2panda_core::{Cursor, Topic, VerifyingKey};
use p2panda_store::logs::LogStore;
use p2panda_store::{SqliteError, SqliteStore};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::mpsc;
use tracing::debug;
use crate::node::AckPolicy;
use crate::operation::{Extensions, LogId, Operation};
use crate::processor::Pipeline;
use crate::streams::StreamEvent;
use crate::streams::acked::Acked;
use crate::streams::stream::{Source, process_operation};
/// Selects the position a stream starts from.
///
/// NOTE(review): this file only declares the type; the exact meaning of each variant is
/// inferred from its name and should be confirmed against the code consuming `StreamFrom`.
#[derive(Clone, Default, Debug, PartialEq, Eq)]
pub enum StreamFrom {
/// Presumably: start from the very beginning of the logs — confirm against the consumer.
Start,
/// Presumably: start at the current frontier of the logs — confirm against the consumer.
///
/// This is the variant produced by `StreamFrom::default()`.
#[default]
Frontier,
/// Start from an explicitly provided cursor.
Cursor(Cursor<VerifyingKey, LogId>),
}
impl From<Cursor<VerifyingKey, LogId>> for StreamFrom {
fn from(cursor: Cursor<VerifyingKey, LogId>) -> Self {
Self::Cursor(cursor)
}
}
/// Replays the operations covered by `log_ranges` from the local store to the application.
///
/// When the ranges cover at least one operation (as counted by `total_operations`), the
/// replay is bracketed by a `StreamEvent::ReplayStarted` event, carrying that count, and a
/// `StreamEvent::ReplayEnded` event. In between, every stored operation is run through
/// `process_operation` with `Source::LocalStore`, and each event it yields is forwarded on
/// `app_tx`. When the ranges cover no operations, nothing is sent at all.
///
/// # Errors
///
/// - [`ReplayError::Store`] when querying the log entries from the store fails.
/// - [`ReplayError::CriticalError`] when an event can not be sent on `app_tx`.
pub(crate) async fn replay_log_ranges<M>(
    topic: Topic,
    store: &SqliteStore,
    app_tx: &mpsc::Sender<StreamEvent<M>>,
    pipeline: &Pipeline<LogId, Extensions, Topic>,
    ack_policy: AckPolicy,
    acked: &Acked,
    log_ranges: LogRanges<VerifyingKey, LogId>,
) -> Result<(), ReplayError>
where
    M: Serialize + for<'a> Deserialize<'a> + Send + 'static,
{
    let total_operations = total_operations(&log_ranges);
    debug!("replay {total_operations} operations");

    // Nothing to replay: stay silent instead of emitting an empty started / ended pair.
    if total_operations == 0 {
        return Ok(());
    }

    app_tx
        .send(StreamEvent::ReplayStarted { total_operations })
        .await
        .map_err(|_| ReplayError::CriticalError)?;

    for (author, logs) in log_ranges {
        for (log_id, (after, until)) in logs {
            let entries: Option<Vec<(Operation, _)>> = store
                .get_log_entries(&author, &log_id, after, until)
                .await?;

            // A `None` result from the store yields no iterations, so that log is skipped.
            for (operation, _) in entries.into_iter().flatten() {
                let maybe_event = process_operation::<M>(
                    operation,
                    topic,
                    pipeline,
                    ack_policy,
                    acked,
                    Source::LocalStore,
                )
                .await;

                // Operations for which processing yields no event are skipped.
                if let Some(event) = maybe_event {
                    app_tx
                        .send(event)
                        .await
                        .map_err(|_| ReplayError::CriticalError)?;
                }
            }
        }
    }

    app_tx
        .send(StreamEvent::ReplayEnded)
        .await
        .map_err(|_| ReplayError::CriticalError)?;

    debug!("finished replaying {total_operations} operations");
    Ok(())
}
/// Counts how many operations the given log ranges cover.
///
/// Every log contributes according to its `(after, until)` pair:
///
/// - `(None, Some(until))` counts `until + 1` operations (everything from `0` up to and
///   including `until`).
/// - `(Some(after), Some(until))` counts `until - after` operations (`after` itself is
///   excluded, `until` is included). A range with `after >= until` counts as empty
///   instead of underflowing.
/// - A range without an upper bound (`until` is `None`) contributes nothing.
///
/// All arithmetic saturates, so malformed or extreme ranges neither panic nor wrap around.
fn total_operations<A, L>(log_ranges: &LogRanges<A, L>) -> u64 {
    log_ranges
        .iter()
        .flat_map(|(_, logs)| logs.iter())
        .map(|(_, (after, until))| match (after, until) {
            (_, None) => 0,
            (None, Some(until)) => until.saturating_add(1),
            (Some(after), Some(until)) => until.saturating_sub(*after),
        })
        .fold(0, u64::saturating_add)
}
/// Errors returned by `replay_log_ranges`.
#[derive(Debug, Error)]
pub enum ReplayError {
/// Querying the store failed; wraps the underlying `SqliteError` (converted via `From`).
#[error("an error occurred while querying the store: {0}")]
Store(#[from] SqliteError),
/// Returned by `replay_log_ranges` when sending an event on the application channel fails.
#[error("a critical error occurred in the replay task")]
CriticalError,
}
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use p2panda_core::logs::LogRanges;
    use p2panda_core::{SigningKey, VerifyingKey};

    use super::total_operations;

    /// Builds log ranges holding a single log (id `0`) of `author` with the given bounds.
    fn single_log(
        author: VerifyingKey,
        after: Option<u64>,
        until: Option<u64>,
    ) -> LogRanges<VerifyingKey, usize> {
        let mut ranges: LogRanges<VerifyingKey, usize> = BTreeMap::new();
        ranges.entry(author).or_default().insert(0, (after, until));
        ranges
    }

    #[test]
    fn calculate_total_operations() {
        let author_1 = SigningKey::generate().verifying_key();
        let author_2 = SigningKey::generate().verifying_key();
        let author_3 = SigningKey::generate().verifying_key();

        // Single-log scenarios as `(after, until, expected total)`.
        let single_log_cases = [
            // No lower bound: everything from 0 up to and including 12.
            (None, Some(12), 13),
            // No upper bound: nothing is counted.
            (Some(100), None, 0),
            // No lower bound, only the very first operation.
            (None, Some(0), 1),
            // Lower and upper bound are equal: empty range.
            (Some(0), Some(0), 0),
            // Completely unbounded: nothing is counted.
            (None, None, 0),
            (Some(1), Some(1), 0),
        ];
        for (after, until, expected) in single_log_cases {
            assert_eq!(
                total_operations(&single_log(author_1, after, until)),
                expected
            );
        }

        // Several authors with several logs each are summed up.
        let mut multiple_logs: LogRanges<VerifyingKey, usize> = BTreeMap::new();
        for (author, log_id, after, until) in [
            (author_1, 0, Some(0), Some(12)),
            (author_1, 1, Some(5), Some(7)),
            (author_1, 2, Some(77), Some(80)),
            (author_2, 3, None, Some(3)),
            (author_3, 4, Some(10), Some(20)),
        ] {
            multiple_logs
                .entry(author)
                .or_default()
                .insert(log_id, (after, until));
        }
        assert_eq!(total_operations(&multiple_logs), 12 + 2 + 3 + 4 + 10);
    }
}