use std::borrow::Borrow;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures_util::stream::BoxStream;
use futures_util::{FutureExt, Stream, StreamExt};
use p2panda_core::cbor::{DecodeError, EncodeError, decode_cbor, encode_cbor};
use p2panda_core::traits::Digest;
use p2panda_core::{Hash, Topic, VerifyingKey};
use p2panda_net::NodeId;
use p2panda_net::sync::SyncHandle;
use p2panda_net::utils::ShortFormat;
use p2panda_store::SqliteStore;
use p2panda_store::operations::OperationStore;
use p2panda_sync::protocols::TopicLogSyncEvent;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use tracing::warn;
use crate::forge::{Forge, ForgeError, OperationForge};
use crate::node::{AckPolicy, CreateStreamError};
use crate::operation::{Extensions, Header, LogId, Operation};
use crate::processor::{Event, Pipeline, ProcessorError};
use crate::streams::acked::{Acked, AckedError};
use crate::streams::external_stream::{
ExternalStream, ExternalStreamEvent, ExternalStreamFuture, SessionId,
};
use crate::streams::replay::{ReplayError, StreamFrom, replay_log_ranges};
use crate::streams::sync_metrics::{self, Aggregator, SessionPhase, SyncError};
/// Capacity of the channel carrying `StreamEvent`s from the processing task to the subscription.
const BUFFER_SIZE: usize = 16;
/// Capacity of the channel carrying locally published operations to the processing task.
const PUBLISH_BUFFER_SIZE: usize = 128;
/// Capacity of the channel carrying import requests (external operation streams) to the
/// processing task.
const IMPORT_BUFFER_SIZE: usize = 16;
/// Builds the publisher/subscription pair for `topic` and spawns the task feeding the
/// subscription.
///
/// Before returning, this subscribes to `sync_handle` and loads the log ranges returned by
/// `Acked::nacked_log_ranges(from)`; a failure of either step is returned as
/// `CreateStreamError`.
///
/// The spawned task first replays those log ranges via `replay_log_ranges` and then loops over
/// four inputs: items of the sync stream, locally published operations, newly registered import
/// streams and items of already registered import streams. Resulting `StreamEvent`s are sent to
/// the subscription. The task ends when the sync stream ends.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn processed_stream<M>(
topic: Topic,
ack_policy: AckPolicy,
sync_handle: SyncHandle<Operation, TopicLogSyncEvent<Extensions>>,
store: SqliteStore,
forge: OperationForge,
pipeline: Pipeline<LogId, Extensions, Topic>,
from: StreamFrom,
) -> Result<(StreamPublisher<M>, StreamSubscription<M>), CreateStreamError>
where
M: Serialize + for<'a> Deserialize<'a> + Send + 'static,
{
// Handle used below to acknowledge events and to look up the log ranges to replay.
let acked = Acked::new(store.clone(), topic);
let mut sync_stream = sync_handle
.subscribe()
.await
.map_err(|err| CreateStreamError(err.to_string()))?;
// Processing task -> subscription.
let (app_tx, app_rx) = mpsc::channel::<StreamEvent<M>>(BUFFER_SIZE);
// Publisher -> processing task: the operation, its (optional) already decoded message and a
// one-shot channel on which the processing result is handed back to the publisher.
let (publish_tx, mut publish_rx) = mpsc::channel::<(
Operation,
Option<M>,
oneshot::Sender<Event<LogId, Extensions, Topic>>,
)>(PUBLISH_BUFFER_SIZE);
// Publisher -> processing task: a stream of operations to import and a one-shot channel on
// which the resulting `ExternalStreamFuture` is handed back.
let (import_tx, mut import_rx) = mpsc::channel::<(
BoxStream<'static, Operation>,
oneshot::Sender<ExternalStreamFuture>,
)>(IMPORT_BUFFER_SIZE);
let mut external_stream = ExternalStream::default();
// Looked up before spawning so that a failure can still be reported to the caller.
let nacked_log_ranges = acked
.nacked_log_ranges(from)
.await
.map_err(|err| CreateStreamError(err.to_string()))?;
{
let pipeline = pipeline.clone();
let acked = acked.clone();
let store = store.clone();
tokio::spawn(async move {
{
// The replay runs to completion before any live input is handled.
let replay_result = replay_log_ranges(
topic,
&store,
&app_tx,
&pipeline,
ack_policy,
&acked,
nacked_log_ranges,
)
.await;
// A failed replay is logged and reported to the subscription; the task keeps going.
if let Err(error) = replay_result {
warn!(
topic = %topic.fmt_short(),
"error occurred in replay task: {error}"
);
let _ = app_tx
.send(StreamEvent::ReplayFailed {
error: Arc::new(error),
})
.await;
}
}
let mut aggregator = Aggregator::new();
loop {
// Every arm either yields the next `StreamEvent` to forward or `continue`s the loop.
let event = tokio::select! {
item = sync_stream.next() => {
// End of the sync stream terminates the task.
let Some(result) = item else {
break;
};
// Erroneous stream items are skipped without being reported.
let Ok(from_sync) = result else {
continue;
};
// The aggregator may consume an item without producing an event.
let Some(event) = aggregator.process(from_sync) else {
continue;
};
match event {
sync_metrics::SyncEvent::SyncStarted { .. } => event.into(),
sync_metrics::SyncEvent::SyncEnded { .. } => event.into(),
sync_metrics::SyncEvent::OperationReceived { operation, source } => {
let Some(event) = process_operation::<M>(
*operation,
topic,
&pipeline,
ack_policy,
&acked,
source
).await else {
continue;
};
event
},
}
}
Some((operation, message, processed_tx)) = publish_rx.recv() => {
let event = process_published_operation(
operation,
topic,
&pipeline,
).await;
// Hand the result back to the publisher; a failed send (receiver gone) is ignored.
let _ = processed_tx.send(event.clone());
// NOTE(review): `event` is passed on to the ack helpers without checking
// `event.is_failed()` — confirm that this is intended for local operations.
let result = match message {
Some(message) => {
ack_published_operation(message, event, topic, ack_policy, &acked).await
},
None => {
ack_published_operation_wo_body(event, &acked).await
},
};
let Some(event) = result else {
continue;
};
event
}
Some((stream, ready_tx)) = import_rx.recv() => {
// Register the stream; its items are picked up by the `external_stream.next()` arm.
let external_stream_future = external_stream.insert(stream);
let session_id = external_stream_future.session_id();
if ready_tx.send(external_stream_future).is_err() {
warn!(session_id = session_id, "failed sending on import ready channel")
};
// Registration itself produces no event for the subscription.
continue;
}
Some(event) = external_stream.next() => {
match event {
ExternalStreamEvent::Start { session_id } => StreamEvent::ImportStarted { session_id },
ExternalStreamEvent::Operation { session_id, operation } => {
let Some(event) = process_operation::<M>(
*operation,
topic,
&pipeline,
ack_policy,
&acked,
Source::ExternalStream { session_id },
).await else {
continue;
};
event
},
ExternalStreamEvent::End { session_id } => StreamEvent::ImportEnded { session_id },
}
}
};
// Send errors (subscription side closed) are ignored; the loop keeps running.
let _ = app_tx.send(event).await;
}
});
}
// Publisher and subscription share one sync handle.
let sync_handle = Arc::new(sync_handle);
let tx = StreamPublisher {
topic,
sync_handle: sync_handle.clone(),
forge,
publish_tx,
import_tx,
_marker: PhantomData,
};
let rx = StreamSubscription {
topic,
store,
sync_handle,
acked,
stream: ReceiverStream::new(app_rx),
};
Ok((tx, rx))
}
pub(crate) async fn process_operation<M>(
operation: Operation,
topic: Topic,
pipeline: &Pipeline<LogId, Extensions, Topic>,
ack_policy: AckPolicy,
acked: &Acked,
source: Source,
) -> Option<StreamEvent<M>>
where
M: Serialize + for<'a> Deserialize<'a> + Send + 'static,
{
let log_id = LogId::from_topic(topic);
let prune_flag = operation.header.extensions.prune_flag;
let event = pipeline
.process(Event::new(operation, log_id, topic, prune_flag))
.await;
if event.is_failed() {
let error = event.failure_reason().expect("event has failed");
warn!(
id = %event.hash(),
"processing operation failed: {}",
error,
);
return Some(StreamEvent::ProcessingFailed {
event,
error,
source,
});
}
let Some(body) = event.body() else {
if let Err(error) = acked.ack(&event).await {
return Some(StreamEvent::AckFailed {
event,
error: Arc::new(error),
});
}
return None;
};
let event = match decode_cbor::<M, _>(body.as_bytes()) {
Ok(message) => {
if ack_policy == AckPolicy::Automatic
&& let Err(error) = acked.ack(&event).await
{
return Some(StreamEvent::AckFailed {
event,
error: Arc::new(error),
});
}
StreamEvent::Processed {
operation: ProcessedOperation {
event,
topic,
acked: acked.clone(),
message,
},
source,
}
}
Err(error) => StreamEvent::DecodeFailed { event, error },
};
Some(event)
}
/// Runs a locally published operation through the processing pipeline.
///
/// The processed event is returned in every case; a failed event is only logged here, so
/// callers have to inspect the returned event themselves.
pub(crate) async fn process_published_operation(
    operation: Operation,
    topic: Topic,
    pipeline: &Pipeline<LogId, Extensions, Topic>,
) -> Event<LogId, Extensions, Topic> {
    // Read the flag first: `operation` is moved into the event right below.
    let prune_flag = operation.header.extensions.prune_flag;
    let unprocessed = Event::new(operation, LogId::from_topic(topic), topic, prune_flag);
    let processed = pipeline.process(unprocessed).await;

    if !processed.is_failed() {
        return processed;
    }

    warn!(
        id = %processed.hash(),
        "processing local operation failed: {}",
        processed.failure_reason().expect("error")
    );
    processed
}
pub(crate) async fn ack_published_operation_wo_body<M>(
event: Event<LogId, Extensions, Topic>,
acked: &Acked,
) -> Option<StreamEvent<M>> {
if let Err(error) = acked.ack(&event).await {
return Some(StreamEvent::AckFailed {
event,
error: Arc::new(error),
});
}
None
}
pub(crate) async fn ack_published_operation<M>(
message: M,
event: Event<LogId, Extensions, Topic>,
topic: Topic,
ack_policy: AckPolicy,
acked: &Acked,
) -> Option<StreamEvent<M>> {
if ack_policy == AckPolicy::Automatic
&& let Err(error) = acked.ack(&event).await
{
return Some(StreamEvent::AckFailed {
event,
error: Arc::new(error),
});
}
Some(StreamEvent::Processed {
operation: ProcessedOperation {
event,
topic,
acked: acked.clone(),
message,
},
source: Source::LocalStore,
})
}
/// Handle for publishing messages and importing operations on a single topic.
///
/// Clones share the same sync handle and the same channels to the processing task.
///
/// `Clone` and `Debug` are implemented by hand instead of derived: the derives would require
/// `M: Clone` / `M: Debug` although no value of type `M` is stored in this struct (only channel
/// senders and a `PhantomData<M>`), which would needlessly prevent cloning a publisher for
/// message types that are not `Clone`.
pub struct StreamPublisher<M> {
    /// Topic all published operations belong to.
    topic: Topic,
    /// Sync handle the created operations are published on.
    sync_handle: Arc<SyncHandle<Operation, TopicLogSyncEvent<Extensions>>>,
    /// Creates the operations for published messages.
    forge: OperationForge,
    /// Sends a created operation, its (optional) message and a one-shot sender for the
    /// processing result to the processing task.
    #[allow(clippy::type_complexity)]
    publish_tx: mpsc::Sender<(
        Operation,
        Option<M>,
        oneshot::Sender<Event<LogId, Extensions, Topic>>,
    )>,
    /// Sends a stream of operations to import and a one-shot sender for the resulting
    /// `ExternalStreamFuture` to the processing task.
    import_tx: mpsc::Sender<(
        BoxStream<'static, Operation>,
        oneshot::Sender<ExternalStreamFuture>,
    )>,
    _marker: PhantomData<M>,
}

impl<M> Clone for StreamPublisher<M> {
    fn clone(&self) -> Self {
        Self {
            topic: self.topic,
            sync_handle: Arc::clone(&self.sync_handle),
            forge: self.forge.clone(),
            publish_tx: self.publish_tx.clone(),
            import_tx: self.import_tx.clone(),
            _marker: PhantomData,
        }
    }
}

impl<M> std::fmt::Debug for StreamPublisher<M> {
    // Same field order and names as the derived implementation would print.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StreamPublisher")
            .field("topic", &self.topic)
            .field("sync_handle", &self.sync_handle)
            .field("forge", &self.forge)
            .field("publish_tx", &self.publish_tx)
            .field("import_tx", &self.import_tx)
            .field("_marker", &self._marker)
            .finish()
    }
}
impl<M> StreamPublisher<M>
where
M: Serialize,
{
pub fn topic(&self) -> Topic {
self.topic
}
pub async fn publish(&self, message: M) -> Result<PublishFuture, PublishError> {
self.publish_inner(Some(message), false).await
}
pub async fn prune(&self, message: Option<M>) -> Result<PublishFuture, PublishError> {
self.publish_inner(message, true).await
}
pub async fn import(
&self,
stream: impl Stream<Item = Operation> + Send + 'static,
) -> Result<ExternalStreamFuture, ImportError> {
let stream = Box::pin(stream);
let (ready_tx, ready_rx) = oneshot::channel::<ExternalStreamFuture>();
self.import_tx
.send((stream, ready_tx))
.await
.map_err(|err| ImportError::SendToProcessor(err.to_string()))?;
ready_rx
.await
.map_err(|err| ImportError::ReceiveFromProcessor(err.to_string()))
}
async fn publish_inner(
&self,
message: Option<M>,
prune_flag: bool,
) -> Result<PublishFuture, PublishError> {
let extensions = Extensions::from_topic(self.topic()).prune_flag(prune_flag);
let body_bytes = match message {
Some(ref message) => Some(encode_cbor(&message)?),
None => None,
};
let operation = self
.forge
.create_operation(self.topic(), extensions.log_id, body_bytes, extensions)
.await?;
let hash = operation.hash;
let (processed_tx, processed_rx) = oneshot::channel();
self.publish_tx
.send((operation.clone(), message, processed_tx))
.await
.map_err(|err| PublishError::SendToProcessor(err.to_string()))?;
self.sync_handle
.publish(operation)
.await
.map_err(|err| PublishError::SyncHandle(err.to_string()))?;
Ok(PublishFuture { hash, processed_rx })
}
}
/// Returned by the publishing methods of `StreamPublisher`; awaiting it yields the event the
/// processing task produced for the published operation.
#[derive(Debug)]
pub struct PublishFuture {
/// Hash of the published operation.
hash: Hash,
/// Receives the processed event from the processing task.
processed_rx: oneshot::Receiver<Event<LogId, Extensions, Topic>>,
}
impl PublishFuture {
    /// Hash of the published operation; available without awaiting the future.
    pub fn hash(&self) -> Hash {
        let Self { hash, .. } = self;
        *hash
    }
}
impl Future for PublishFuture {
    /// The processed event, or a `RecvError` when the sending half was dropped without
    /// sending.
    type Output = Result<Event<LogId, Extensions, Topic>, oneshot::error::RecvError>;

    /// Delegates to the inner one-shot receiver.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        Pin::new(&mut this.processed_rx).poll(cx)
    }
}
/// Receiving side of a processed stream: yields the `StreamEvent`s produced by the processing
/// task and allows acknowledging operations by hash.
pub struct StreamSubscription<M> {
/// Topic this subscription belongs to.
topic: Topic,
/// Store used to look up operations when acknowledging by hash.
store: SqliteStore,
/// Handle used to acknowledge operations.
acked: Acked,
// Never read; presumably held only to keep the sync handle alive as long as the
// subscription exists — TODO confirm.
#[allow(unused)]
sync_handle: Arc<SyncHandle<Operation, TopicLogSyncEvent<Extensions>>>,
/// Events sent by the processing task.
stream: ReceiverStream<StreamEvent<M>>,
}
impl<M> StreamSubscription<M> {
    /// Topic this subscription belongs to.
    pub fn topic(&self) -> Topic {
        self.topic
    }

    /// Acknowledges the operation with the given hash.
    ///
    /// The operation is looked up in the store first. When the store does not contain it,
    /// nothing is acknowledged and `Ok(())` is returned.
    pub async fn ack(&self, id: Hash) -> Result<(), AckedError> {
        let lookup = OperationStore::<_, _, LogId>::get_operation(&self.store, &id).await?;
        let Some(operation) = lookup else {
            return Ok(());
        };

        self.acked.ack(&operation.header).await?;
        Ok(())
    }
}
impl<M> Stream for StreamSubscription<M>
where
M: Serialize + for<'a> Deserialize<'a> + Send + 'static,
{
type Item = StreamEvent<M>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}
/// Events delivered to a `StreamSubscription`.
#[derive(Clone, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum StreamEvent<M> {
/// An operation passed the pipeline and its body was decoded into a message of type `M`.
Processed {
operation: ProcessedOperation<M>,
/// Where the operation came from.
source: Source,
},
/// Converted from `sync_metrics::SyncEvent::SyncStarted`. The exact meaning of the counters
/// is defined by the sync metrics aggregator, which is not part of this file.
SyncStarted {
remote_node_id: NodeId,
session_id: u64,
incoming_operations: u64,
outgoing_operations: u64,
incoming_bytes: u64,
outgoing_bytes: u64,
topic_sessions: u64,
},
/// Converted from `sync_metrics::SyncEvent::SyncEnded`. The exact meaning of the counters is
/// defined by the sync metrics aggregator, which is not part of this file.
SyncEnded {
remote_node_id: NodeId,
session_id: u64,
sent_operations: u64,
received_operations: u64,
sent_bytes: u64,
received_bytes: u64,
sent_bytes_topic_total: u64,
received_bytes_topic_total: u64,
error: Option<SyncError>,
},
/// An imported operation stream reported its start.
ImportStarted {
session_id: SessionId,
},
/// An imported operation stream reported its end.
ImportEnded {
session_id: SessionId,
},
/// The pipeline marked the event as failed.
ProcessingFailed {
event: Event<LogId, Extensions, Topic>,
error: ProcessorError,
source: Source,
},
/// Not constructed in this file; presumably emitted by the replay routine — TODO confirm.
ReplayStarted {
total_operations: u64,
},
/// Not constructed in this file; presumably emitted by the replay routine — TODO confirm.
ReplayEnded,
/// Replaying the log ranges at the start of the processing task returned an error.
ReplayFailed { error: Arc<ReplayError> },
/// The operation passed the pipeline but its body could not be decoded as `M`.
DecodeFailed {
event: Event<LogId, Extensions, Topic>,
error: DecodeError,
},
/// Acknowledging the event failed.
AckFailed {
event: Event<LogId, Extensions, Topic>,
error: Arc<AckedError>,
},
}
/// A processed operation together with the message decoded from its body.
#[derive(Clone, Debug, PartialEq)]
pub struct ProcessedOperation<M> {
/// Event as returned by the processing pipeline.
event: Event<LogId, Extensions, Topic>,
/// Topic of the stream the operation was processed on.
topic: Topic,
/// Handle used by `ack` to acknowledge this operation.
acked: Acked,
/// Message decoded from the operation body (or handed over directly by the publisher).
message: M,
}
impl<M> ProcessedOperation<M> {
    /// Topic of the stream the operation was processed on.
    pub fn topic(&self) -> Topic {
        self.topic
    }

    /// Hash of the underlying event.
    pub fn id(&self) -> Hash {
        self.event.hash()
    }

    /// Verifying key stored in the operation header.
    pub fn author(&self) -> VerifyingKey {
        let header = self.event.header();
        header.verifying_key
    }

    /// Timestamp stored in the operation header, converted to `u64`.
    pub fn timestamp(&self) -> u64 {
        let header = self.event.header();
        header.timestamp.into()
    }

    /// The decoded message.
    pub fn message(&self) -> &M {
        &self.message
    }

    /// The event as returned by the processing pipeline.
    pub fn processed(&self) -> &Event<LogId, Extensions, Topic> {
        &self.event
    }

    /// Acknowledges this operation.
    pub async fn ack(&self) -> Result<(), AckedError> {
        let this: &Self = self;
        this.acked.ack(this).await?;
        Ok(())
    }
}
/// Gives access to the operation header; used to pass a `&ProcessedOperation` to
/// `Acked::ack`.
impl<M> Borrow<Header> for &ProcessedOperation<M> {
    fn borrow(&self) -> &Header {
        let operation: &ProcessedOperation<M> = self;
        operation.event.header()
    }
}
/// Origin of an operation reported in a `StreamEvent`.
#[derive(Clone, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Source {
/// Attached to operations arriving via `sync_metrics::SyncEvent::OperationReceived`. The
/// exact meaning of the counters is defined by the sync metrics aggregator, which is not
/// part of this file.
SyncSession {
remote_node_id: NodeId,
session_id: u64,
sent_operations: u64,
received_operations: u64,
sent_bytes: u64,
received_bytes: u64,
sent_bytes_topic_total: u64,
received_bytes_topic_total: u64,
phase: SessionPhase,
},
/// The operation came from a stream handed to `StreamPublisher::import`.
ExternalStream {
/// Session id reported by the import stream.
session_id: u64,
},
/// Used for operations published through the local `StreamPublisher`.
LocalStore,
}
/// Errors returned when publishing through a `StreamPublisher`.
#[derive(Debug, Error)]
pub enum PublishError {
/// The message could not be encoded as CBOR.
#[error("an error occurred while serializing the message for publication: {0}")]
MessageEncoding(#[from] EncodeError),
/// The forge failed to create the operation.
#[error("an error occurred while creating an operation in the forge: {0}")]
Forge(#[from] ForgeError),
/// Publishing on the sync handle failed; holds the stringified underlying error.
#[error("an error occurred while publishing an operation to the log sync stream: {0}")]
SyncHandle(String),
/// The operation could not be sent to the processing task; holds the stringified send error.
#[error("could not send to processor pipeline: {0}")]
SendToProcessor(String),
}
/// Errors returned by `StreamPublisher::import`.
#[derive(Debug, Error)]
pub enum ImportError {
/// The import request could not be sent to the processing task; holds the stringified send
/// error.
#[error("could not send to processor pipeline: {0}")]
SendToProcessor(String),
/// The processing task dropped the reply channel without answering; holds the stringified
/// receive error.
#[error("an error occurred awaiting message from processor: {0}")]
ReceiveFromProcessor(String),
}