use datasize::DataSize;
use futures::{Stream, StreamExt};
use once_cell::sync::Lazy;
use semver::Version;
use serde::{Deserialize, Serialize};
use tokio::sync::{
broadcast::{self, RecvError},
mpsc,
};
use tracing::{error, info, trace};
use warp::{
filters::BoxedFilter,
sse::{self, ServerSentEvent as WarpServerSentEvent},
Filter, Reply,
};
use casper_types::{ExecutionResult, PublicKey};
use crate::{
components::{consensus::EraId, CLIENT_API_VERSION},
types::{BlockHash, BlockHeader, DeployHash, FinalitySignature, TimeDiff, Timestamp},
};
/// The URL path segment on which the SSE filter built by `create_channels_and_filter` matches.
pub const SSE_API_PATH: &str = "events";
/// A lazily-built event with no ID whose data is `SseData::ApiVersion` holding a clone of
/// `CLIENT_API_VERSION`.
///
/// NOTE(review): judging by the name this is the first event delivered to each new client, but the
/// code that sends it is not in this part of the file - confirm against the sender.
pub(super) static SSE_INITIAL_EVENT: Lazy<ServerSentEvent> = Lazy::new(|| ServerSentEvent {
id: None,
data: SseData::ApiVersion(CLIENT_API_VERSION.clone()),
});
/// The numeric type used for event IDs (`ServerSentEvent::id`) and for the `start_from` value
/// supplied by a client.
type Id = u32;
/// The payload carried by a server-sent event.
///
/// `stream_to_client` serializes this value to JSON (via `sse::json`) when building the event sent
/// to the client.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug, DataSize)]
pub enum SseData {
/// The API version. This is the only variant `stream_to_client` accepts without an event ID.
// The `Version` is excluded from the derived `DataSize` estimate.
#[data_size(skip)]
ApiVersion(Version),
/// Reports a block by its hash together with its header.
BlockAdded {
/// Hash of the block.
block_hash: BlockHash,
/// Header of the block, boxed.
block_header: Box<BlockHeader>,
},
/// Reports a deploy together with its metadata and execution result.
DeployProcessed {
/// Hash of the deploy, boxed.
deploy_hash: Box<DeployHash>,
/// Public key of an account - presumably the one that created the deploy; confirm against the
/// producer of this event.
account: PublicKey,
/// Timestamp associated with the deploy.
timestamp: Timestamp,
/// Time-to-live associated with the deploy.
ttl: TimeDiff,
/// Hashes of other deploys listed as dependencies.
dependencies: Vec<DeployHash>,
/// Hash of a block - presumably the one in which the deploy was executed; confirm against the
/// producer of this event.
block_hash: Box<BlockHash>,
/// Result of executing the deploy, boxed.
// Excluded from the derived `DataSize` estimate.
#[data_size(skip)]
execution_result: Box<ExecutionResult>,
},
/// Reports a fault attributed to a public key in a given era.
Fault {
/// ID of the era.
era_id: EraId,
/// Public key the fault is attributed to - presumably a validator's; TODO confirm.
public_key: PublicKey,
/// Timestamp associated with the fault.
timestamp: Timestamp,
},
/// Reports a finality signature, boxed.
FinalitySignature(Box<FinalitySignature>),
}
/// A single event to be delivered to SSE clients: an optional ID plus its payload.
#[derive(Clone, PartialEq, Eq, Debug)]
pub(super) struct ServerSentEvent {
/// The event's ID. `stream_to_client` requires it to be `None` for `SseData::ApiVersion` and
/// `Some` for every other variant, and panics otherwise.
pub(super) id: Option<Id>,
/// The payload, serialized to JSON when the event is sent to a client.
pub(super) data: SseData,
}
/// The message type carried by the broadcast channel that every client stream subscribes to.
#[derive(Clone, PartialEq, Eq, Debug)]
pub(super) enum BroadcastChannelMessage {
/// An event to forward to the client.
ServerSentEvent(ServerSentEvent),
/// A shutdown notice; `stream_to_client` turns it into `Err(RecvError::Closed)`, i.e. it is
/// handled the same way as the channel being closed.
Shutdown,
}
/// Information about a newly-connected client, sent by the SSE filter over the unbounded channel
/// whose receiver is returned from `create_channels_and_filter`.
pub(super) struct NewSubscriberInfo {
/// The `start_from` value taken from the client's query string, if one was given.
/// NOTE(review): how the receiver interprets this ID is not visible here - confirm.
pub(super) start_from: Option<Id>,
/// Sender for the client's initial events; whatever is sent here is streamed to that client
/// ahead of the events from the broadcast channel.
pub(super) initial_events_sender: mpsc::UnboundedSender<ServerSentEvent>,
}
/// The query-string parameters accepted by the SSE endpoint, deserialized by `warp::query()`.
#[derive(Deserialize, Debug)]
struct Query {
/// Optional event ID, forwarded unchanged as `NewSubscriberInfo::start_from`.
start_from: Option<Id>,
}
pub(super) fn create_channels_and_filter(
broadcast_channel_size: usize,
) -> (
broadcast::Sender<BroadcastChannelMessage>,
mpsc::UnboundedReceiver<NewSubscriberInfo>,
BoxedFilter<(impl Reply,)>,
) {
let (broadcaster, _) = broadcast::channel(broadcast_channel_size);
let cloned_broadcaster = broadcaster.clone();
let (new_subscriber_info_sender, new_subscriber_info_receiver) = mpsc::unbounded_channel();
let filter = warp::get()
.and(warp::path(SSE_API_PATH))
.and(warp::query().map(move |query: Query| {
let (initial_events_sender, initial_events_receiver) = mpsc::unbounded_channel();
let new_subscriber_info = NewSubscriberInfo {
start_from: query.start_from,
initial_events_sender,
};
if new_subscriber_info_sender
.send(new_subscriber_info)
.is_err()
{
error!("failed to send new subscriber info");
}
let ongoing_events_receiver = cloned_broadcaster.subscribe();
sse::reply(sse::keep_alive().stream(stream_to_client(
initial_events_receiver,
ongoing_events_receiver,
)))
}))
.boxed();
(broadcaster, new_subscriber_info_receiver, filter)
}
fn stream_to_client(
initial_events: mpsc::UnboundedReceiver<ServerSentEvent>,
ongoing_events: broadcast::Receiver<BroadcastChannelMessage>,
) -> impl Stream<Item = Result<impl WarpServerSentEvent, RecvError>> + 'static {
initial_events
.map(|event| Ok(BroadcastChannelMessage::ServerSentEvent(event)))
.chain(ongoing_events)
.map(|result| {
trace!(?result);
match result {
Ok(BroadcastChannelMessage::ServerSentEvent(event)) => {
match (event.id, &event.data) {
(None, &SseData::ApiVersion { .. }) => Ok(sse::json(event.data).boxed()),
(Some(id), &SseData::BlockAdded { .. })
| (Some(id), &SseData::DeployProcessed { .. })
| (Some(id), &SseData::FinalitySignature(_))
| (Some(id), &SseData::Fault { .. }) => {
Ok((sse::id(id), sse::json(event.data)).boxed())
}
_ => unreachable!("only ApiVersion may have no event ID"),
}
}
Ok(BroadcastChannelMessage::Shutdown) | Err(RecvError::Closed) => {
Err(RecvError::Closed)
}
Err(RecvError::Lagged(amount)) => {
info!(
"client lagged by {} events - dropping event stream connection to client",
amount
);
Err(RecvError::Lagged(amount))
}
}
})
}