use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, watch};
use super::{DrainOutcome, GapGuard, PendingCommit, apply_outcome, send_or_cancel};
use crate::state::{Checkpoint, ResumeKey, StateStore};
use crate::watch::wire::{
WireCloudEvent, WireConnectionClosing, WireConnectionEstablished, WireErrorEvent,
WireReplayControl,
};
use crate::watch::{
FatalKind, GapReason, ReconnectPolicy, ServerCloseReason, WatchEvent, WatchState,
};
use crate::{ClientError, Notification, parse_cloudevent_id};
#[allow(
clippy::too_many_arguments,
clippy::too_many_lines,
reason = "the SSE-event-type dispatch is a flat match over six wire variants; splitting each branch into its own helper trades readability for line count, and reviewers want to see the full mapping table in one place"
)]
pub(super) async fn drain_frames(
parser: &mut finesse::Parser,
state: &mut WatchState,
last_reconnect_policy: &mut Option<ReconnectPolicy>,
gap_guard: &mut GapGuard,
commit_cursor: &mut Option<u64>,
pending_commit: &mut Option<PendingCommit>,
state_store: Option<&Arc<dyn StateStore>>,
resume_key: &ResumeKey,
triggers: &[crate::watch::Trigger],
trigger_states: &mut [crate::watch::trigger::TriggerState],
http: &reqwest::Client,
tx: &mpsc::Sender<Result<Notification, ClientError>>,
cancel: &mut oneshot::Receiver<()>,
parent_cancel: &mut watch::Receiver<bool>,
) -> Result<DrainOutcome, ClientError> {
while let Some(frame) = parser.next_frame() {
let finesse::Frame::Message(msg) = frame else {
continue;
};
match msg.event.as_str() {
"live-notification" | "replay" => {
let raw: serde_json::Value = serde_json::from_str(&msg.data)?;
let top_type = raw.get("type").and_then(|v| v.as_str());
if top_type == Some("connection_established") {
let _: WireConnectionEstablished = serde_json::from_value(raw)?;
apply_outcome(
last_reconnect_policy,
state.transition(WatchEvent::ConnectionEstablished),
);
continue;
}
let raw_envelope = raw.clone();
let wire: WireCloudEvent = serde_json::from_value(raw)?;
let (event_type, sequence) = match parse_cloudevent_id(&wire.id) {
Ok(v) => v,
Err(e) => {
apply_outcome(
last_reconnect_policy,
state.transition(WatchEvent::Fatal(FatalKind::MalformedEvent)),
);
return Err(e);
}
};
apply_outcome(
last_reconnect_policy,
state.transition(WatchEvent::NotificationReceived { sequence }),
);
match gap_guard.observe(sequence) {
Ok(()) => {
let notification = Notification {
event_type: event_type.clone(),
sequence,
identifier: wire.data.identifier,
payload: wire.data.payload,
cloudevent: Some(raw_envelope),
};
if let Some(prev) = pending_commit.as_ref() {
if let Some(store) = state_store {
let checkpoint =
Checkpoint::new(prev.sequence, Some(prev.event_id.clone()));
if let Err(e) = store.put(resume_key, checkpoint).await {
return Err(ClientError::from(e));
}
}
*commit_cursor = Some(prev.sequence);
}
match crate::watch::trigger::dispatch_triggers(
triggers,
trigger_states,
¬ification,
parent_cancel,
cancel,
http,
)
.await
{
Ok(()) => {}
Err(crate::watch::trigger::DispatchOutcome::Cancelled) => {
return Ok(DrainOutcome::StopRequested);
}
Err(crate::watch::trigger::DispatchOutcome::RequiredFailed {
kind,
source,
}) => {
let err = ClientError::TriggerFailed { kind, source };
let _ = send_or_cancel(tx, Err(err), cancel, parent_cancel).await;
return Ok(DrainOutcome::StopRequested);
}
}
if send_or_cancel(tx, Ok(notification), cancel, parent_cancel)
.await
.is_err()
{
return Ok(DrainOutcome::StopRequested);
}
*pending_commit = Some(PendingCommit {
sequence,
event_id: format!("{event_type}@{sequence}"),
});
}
Err(reason) => {
apply_outcome(
last_reconnect_policy,
state.transition(WatchEvent::GapDetected(reason)),
);
let _ = send_or_cancel(
tx,
Err(ClientError::HistoryGap { reason }),
cancel,
parent_cancel,
)
.await;
return Ok(DrainOutcome::StopRequested);
}
}
}
"heartbeat" => {
apply_outcome(
last_reconnect_policy,
state.transition(WatchEvent::HeartbeatReceived),
);
}
"replay-control" => {
let wire: WireReplayControl = serde_json::from_str(&msg.data)?;
match wire.tag.as_str() {
"replay_completed" => {
apply_outcome(
last_reconnect_policy,
state.transition(WatchEvent::ReplayCompleted),
);
}
"notification_replay_limit_reached" => {
let Some(max_allowed) = wire.max_allowed else {
let message =
"replay-control: notification_replay_limit_reached missing max_allowed"
.to_string();
apply_outcome(
last_reconnect_policy,
state.transition(WatchEvent::Fatal(FatalKind::ProtocolViolation(
message.clone(),
))),
);
return Err(ClientError::StreamProtocol {
message,
request_id: None,
});
};
let reason = GapReason::ReplayLimitReached { max_allowed };
apply_outcome(
last_reconnect_policy,
state.transition(WatchEvent::GapDetected(reason)),
);
let _ = send_or_cancel(
tx,
Err(ClientError::HistoryGap { reason }),
cancel,
parent_cancel,
)
.await;
return Ok(DrainOutcome::StopRequested);
}
_other => {}
}
}
"connection-closing" => {
let wire: WireConnectionClosing = serde_json::from_str(&msg.data)?;
let reason = match wire.reason.as_str() {
"server_shutdown" => ServerCloseReason::ServerShutdown,
"max_duration_reached" => ServerCloseReason::MaxDurationReached,
"end_of_stream" => ServerCloseReason::EndOfStream,
other => {
let message = format!("unknown connection-closing reason: {other}");
apply_outcome(
last_reconnect_policy,
state.transition(WatchEvent::Fatal(FatalKind::ProtocolViolation(
message.clone(),
))),
);
return Err(ClientError::StreamProtocol {
message,
request_id: wire.request_id,
});
}
};
apply_outcome(
last_reconnect_policy,
state.transition(WatchEvent::ServerClose { reason }),
);
return Ok(DrainOutcome::ServerClosed);
}
"error" => {
let wire: WireErrorEvent = serde_json::from_str(&msg.data)?;
let message = wire
.message
.clone()
.or_else(|| wire.error.clone())
.unwrap_or_else(|| "server error event".to_string());
apply_outcome(
last_reconnect_policy,
state.transition(WatchEvent::Fatal(FatalKind::ProtocolViolation(
message.clone(),
))),
);
return Err(ClientError::StreamProtocol {
message,
request_id: wire.request_id,
});
}
_other => {
}
}
}
Ok(DrainOutcome::Continue)
}