use crate::event_bus::{Event, EventBus, STREAM_END_SCOPE};
/// Reason attached to the termination of an event stream.
///
/// The reason is rendered into a single-line `key=value` message by
/// [`StreamEndReason::format_message`].
pub(crate) enum StreamEndReason {
    /// The stream finished normally.
    Completed {
        /// Step value reported in the formatted message.
        step: u64,
    },
    /// The stream ended because of an error.
    Error {
        /// Step value reported in the formatted message, when one is available.
        step: Option<u64>,
        /// Error text reported in the formatted message.
        error: String,
    },
}

impl StreamEndReason {
    /// Renders this reason as a `key=value` line prefixed with `session=<session_id>`.
    ///
    /// Produces one of:
    /// - `session=<id> status=completed step=<step>`
    /// - `session=<id> status=error step=<step> error=<error>`
    /// - `session=<id> status=error error=<error>` (no step recorded)
    pub fn format_message(&self, session_id: &str) -> String {
        match self {
            Self::Completed { step } => {
                format!("session={session_id} status=completed step={step}")
            }
            Self::Error {
                step: Some(s),
                error,
            } => {
                format!("session={session_id} status=error step={s} error={error}")
            }
            // Without a step the `step=` field is left out entirely.
            Self::Error { step: None, error } => {
                format!("session={session_id} status=error error={error}")
            }
        }
    }
}
/// Announces the end of an event stream and closes its channel if needed.
///
/// A diagnostic event under `STREAM_END_SCOPE` is emitted on `event_bus`,
/// carrying `reason` formatted for `session_id`. A failed emission is only
/// logged at debug level; it does not stop the rest of the function.
///
/// When `*event_stream_taken` is `true`, the bus channel is closed and the
/// flag is reset to `false`. When it is already `false`, the channel is left
/// untouched.
pub(crate) fn finalize_event_stream(
    event_bus: &EventBus,
    session_id: &str,
    reason: StreamEndReason,
    event_stream_taken: &mut bool,
) {
    let summary = reason.format_message(session_id);

    // The event takes a clone because the summary is still needed for the
    // log entry below if emission fails.
    if let Err(emit_failure) = event_bus
        .get_emitter()
        .emit(Event::diagnostic(STREAM_END_SCOPE, summary.clone()))
    {
        tracing::debug!(
            session = %session_id,
            scope = STREAM_END_SCOPE,
            completion_message = %summary,
            error = ?emit_failure,
            "failed to emit stream termination event"
        );
    }

    // Nothing to close unless the flag is set.
    if !*event_stream_taken {
        return;
    }

    event_bus.close_channel();
    *event_stream_taken = false;
}