use std::fmt::Debug;
use std::thread;
use futures_util::StreamExt;
use p2panda_core::traits::Digest;
use p2panda_core::{Extensions, Hash, LogId, Operation, SeqNum, VerifyingKey};
use p2panda_store::Transaction;
use p2panda_store::logs::LogStore;
use p2panda_store::operations::OperationStore;
use p2panda_store::topics::TopicStore;
use p2panda_stream::StreamLayerExt;
use p2panda_stream::ingest::Ingest;
use p2panda_stream::log_prune::LogPrune;
use tokio::pin;
use tokio::runtime::Builder;
use tokio::sync::mpsc;
use tokio::task::LocalSet;
use tokio_stream::wrappers::ReceiverStream;
use crate::processor::tasks::TaskTracker;
use crate::processor::{Event, ProcessorStatus};
/// Capacity of the bounded `mpsc` channel through which events are handed to the pipeline
/// worker. Once this many events are queued, further sends wait for free capacity.
const PUBLISH_BUFFER_SIZE: usize = 128;
/// Handle for submitting events to a processing pipeline running on a background thread.
///
/// The handle only holds the sending half of the event channel and the task tracker used to
/// wait for results; the worker itself is started by `Pipeline::new`.
#[derive(Clone, Debug)]
pub struct Pipeline<L, E, TP> {
/// Sending half of the bounded channel that feeds events to the worker.
pipeline_tx: mpsc::Sender<Event<L, E, TP>>,
/// Tracker keyed by hash; used to register interest in an event before it is submitted and
/// to await the processed event afterwards.
tasks: TaskTracker<Event<L, E, TP>, Hash>,
}
impl<L, E, TP> Pipeline<L, E, TP>
where
L: LogId + Send + 'static,
E: Extensions + Send + 'static,
TP: Clone + Send + 'static,
{
pub fn new<S>(store: S, tasks: TaskTracker<Event<L, E, TP>, Hash>) -> Self
where
S: Clone
+ Transaction
+ OperationStore<Operation<E>, Hash, L>
+ LogStore<Operation<E>, VerifyingKey, L, SeqNum, Hash>
+ TopicStore<TP, VerifyingKey, L>
+ Send
+ 'static,
{
let (pipeline_tx, pipeline_rx) = mpsc::channel(PUBLISH_BUFFER_SIZE);
{
let tasks = tasks.clone();
let rt = Builder::new_current_thread()
.enable_all()
.build()
.expect("runtime for current thread");
thread::spawn(move || {
let local = LocalSet::new();
local.spawn_local(async move {
let ingest = Ingest::<S, Event<L, E, TP>, L, E, TP>::new(store.clone());
let log_prune = LogPrune::<S, Event<L, E, TP>, L, E>::new(store);
let pipeline = ReceiverStream::new(pipeline_rx)
.layer(ingest)
.map(|result| match result {
Ok((mut event, result)) => {
event.ingest = ProcessorStatus::Completed(result);
event
}
Err((mut event, err)) => {
event.ingest = ProcessorStatus::Failed(err);
event
}
})
.layer(log_prune)
.map(|result| match result {
Ok((mut event, result)) => {
event.log_prune = ProcessorStatus::Completed(result);
event
}
Err((mut event, err)) => {
event.log_prune = ProcessorStatus::Failed(err);
event
}
});
pin!(pipeline);
while let Some(operation) = pipeline.next().await {
tasks.mark_as_done(operation.hash(), operation).await;
}
});
rt.block_on(local);
});
}
Self { pipeline_tx, tasks }
}
pub async fn process(&self, input: Event<L, E, TP>) -> Event<L, E, TP> {
let task = self.tasks.track(input.hash()).await;
let _ = self.pipeline_tx.send(input).await;
task.ready().await
}
}
#[cfg(test)]
mod tests {
    use p2panda_core::test_utils::TestLog;
    use p2panda_core::traits::Digest;
    use p2panda_core::{PruneFlag, SigningKey, Topic};
    use p2panda_store::SqliteStore;

    use crate::operation::LogId;
    use crate::processor::TaskTracker;

    use super::{Event, Pipeline};

    /// Sends one untouched and one tampered operation through the pipeline and checks the
    /// status reported on the returned events.
    #[tokio::test]
    async fn processing_operations() {
        let store = SqliteStore::temporary().await;
        let tasks = TaskTracker::new();
        let pipeline = Pipeline::<LogId, (), Topic>::new(store, tasks);

        let log = TestLog::new();
        let topic = Topic::random();
        let mut operation = log.operation(b"test", ());

        // The untouched operation is expected to come back as completed.
        let valid_event = Event::new(
            operation.clone(),
            LogId::from_topic(topic),
            topic,
            PruneFlag::default(),
        );
        let processed = pipeline.process(valid_event).await;
        assert_eq!(processed.hash(), operation.hash());
        assert!(processed.is_completed());
        assert!(!processed.is_failed());

        // Replace the author key with an unrelated one; the pipeline is expected to report
        // this operation as failed.
        operation.header.verifying_key = SigningKey::generate().verifying_key();
        let tampered_event = Event::new(
            operation.clone(),
            LogId::from_topic(topic),
            topic,
            PruneFlag::default(),
        );
        let rejected = pipeline.process(tampered_event).await;
        assert_eq!(rejected.hash(), operation.hash());
        assert!(!rejected.is_completed());
        assert!(rejected.is_failed());
    }
}