use std::pin::Pin;
use tokio_stream::Stream;
use crate::engine_error::EngineError;
use crate::stream_subscribe::StreamCursor;
use crate::types::{ExecutionId, LeaseId, SignalId, TimestampMs, WaitpointId, WorkerInstanceId};
/// One entry in the lease history of an execution.
///
/// Every variant carries a [`StreamCursor`], the [`ExecutionId`] it concerns
/// and an `at` timestamp; those shared fields can also be read through the
/// accessor methods on this type without matching on the variant.
///
/// NOTE(review): the cursor is presumably the position a subscriber can
/// resume from — confirm against `stream_subscribe`.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum LeaseHistoryEvent {
    /// A lease was acquired.
    Acquired {
        /// Stream cursor carried by this event.
        cursor: StreamCursor,
        /// Execution the lease belongs to.
        execution_id: ExecutionId,
        /// Lease identifier, if the event carries one.
        lease_id: Option<LeaseId>,
        /// Worker instance named on the event, if any.
        worker_instance_id: Option<WorkerInstanceId>,
        /// Timestamp carried by the event.
        at: TimestampMs,
    },
    /// A lease was renewed.
    Renewed {
        /// Stream cursor carried by this event.
        cursor: StreamCursor,
        /// Execution the lease belongs to.
        execution_id: ExecutionId,
        /// Lease identifier, if the event carries one.
        lease_id: Option<LeaseId>,
        /// Worker instance named on the event, if any.
        worker_instance_id: Option<WorkerInstanceId>,
        /// Timestamp carried by the event.
        at: TimestampMs,
    },
    /// A lease expired.
    Expired {
        /// Stream cursor carried by this event.
        cursor: StreamCursor,
        /// Execution the lease belongs to.
        execution_id: ExecutionId,
        /// Lease identifier, if the event carries one.
        lease_id: Option<LeaseId>,
        /// Previous owner named on the event, if any.
        prev_owner: Option<WorkerInstanceId>,
        /// Timestamp carried by the event.
        at: TimestampMs,
    },
    /// A lease was reclaimed.
    Reclaimed {
        /// Stream cursor carried by this event.
        cursor: StreamCursor,
        /// Execution the lease belongs to.
        execution_id: ExecutionId,
        /// Identifier of the new lease, if the event carries one.
        new_lease_id: Option<LeaseId>,
        /// New owner named on the event, if any.
        new_owner: Option<WorkerInstanceId>,
        /// Timestamp carried by the event.
        at: TimestampMs,
    },
    /// A lease was revoked.
    Revoked {
        /// Stream cursor carried by this event.
        cursor: StreamCursor,
        /// Execution the lease belongs to.
        execution_id: ExecutionId,
        /// Lease identifier, if the event carries one.
        lease_id: Option<LeaseId>,
        /// Free-form string naming the revoker (format not defined here).
        revoked_by: String,
        /// Timestamp carried by the event.
        at: TimestampMs,
    },
}
impl LeaseHistoryEvent {
    /// Returns the stream cursor stored in this event, whichever variant it is.
    pub fn cursor(&self) -> &StreamCursor {
        // Every variant has a `cursor` field, so the or-pattern is irrefutable.
        let (Self::Acquired { cursor, .. }
        | Self::Renewed { cursor, .. }
        | Self::Expired { cursor, .. }
        | Self::Reclaimed { cursor, .. }
        | Self::Revoked { cursor, .. }) = self;
        cursor
    }

    /// Returns the execution id stored in this event, whichever variant it is.
    pub fn execution_id(&self) -> &ExecutionId {
        let (Self::Acquired { execution_id, .. }
        | Self::Renewed { execution_id, .. }
        | Self::Expired { execution_id, .. }
        | Self::Reclaimed { execution_id, .. }
        | Self::Revoked { execution_id, .. }) = self;
        execution_id
    }

    /// Returns a copy of the `at` timestamp stored in this event.
    pub fn at(&self) -> TimestampMs {
        let (Self::Acquired { at, .. }
        | Self::Renewed { at, .. }
        | Self::Expired { at, .. }
        | Self::Reclaimed { at, .. }
        | Self::Revoked { at, .. }) = self;
        *at
    }
}
/// Pinned, boxed, `Send` stream whose items are
/// `Result<LeaseHistoryEvent, EngineError>`.
pub type LeaseHistorySubscription =
Pin<Box<dyn Stream<Item = Result<LeaseHistoryEvent, EngineError>> + Send>>;
/// Outcome carried by a [`CompletionEvent`], decoded from its wire string.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum CompletionOutcome {
    /// Decoded from the wire string `"success"`.
    Success,
    /// Decoded from the wire string `"failure"`.
    Failure,
    /// Decoded from the wire string `"cancelled"`.
    Cancelled,
    /// Any other wire string, preserved verbatim.
    Other(String),
}

impl CompletionOutcome {
    /// Decodes a wire string into an outcome.
    ///
    /// Matching is exact and case-sensitive. A string that is not one of the
    /// three known values is never rejected; it is copied into
    /// [`CompletionOutcome::Other`] unchanged.
    pub fn from_wire(s: &str) -> Self {
        match s {
            "success" => Self::Success,
            "failure" => Self::Failure,
            "cancelled" => Self::Cancelled,
            _ => Self::Other(String::from(s)),
        }
    }
}
/// Event reporting the completion outcome of one execution.
///
/// The struct is `#[non_exhaustive]`: code outside this crate cannot build it
/// with a struct literal and should call [`CompletionEvent::new`] instead.
///
/// `PartialEq`/`Eq` are derived so completion events can be compared directly,
/// as the enum event types in this module already can. Every field type here
/// is already `Eq`.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CompletionEvent {
    /// Stream cursor carried by this event.
    pub cursor: StreamCursor,
    /// Execution that completed.
    pub execution_id: ExecutionId,
    /// Outcome reported for the execution.
    pub outcome: CompletionOutcome,
    /// Timestamp carried by the event.
    pub at: TimestampMs,
}

impl CompletionEvent {
    /// Builds a completion event from its four fields.
    ///
    /// The arguments are stored as given; nothing is validated or derived.
    pub fn new(
        cursor: StreamCursor,
        execution_id: ExecutionId,
        outcome: CompletionOutcome,
        at: TimestampMs,
    ) -> Self {
        Self {
            cursor,
            execution_id,
            outcome,
            at,
        }
    }
}
/// Pinned, boxed, `Send` stream whose items are
/// `Result<CompletionEvent, EngineError>`.
pub type CompletionSubscription =
Pin<Box<dyn Stream<Item = Result<CompletionEvent, EngineError>> + Send>>;
/// Effect carried by a [`SignalDeliveryEvent`], decoded from its wire string.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum SignalDeliveryEffect {
    /// Decoded from the wire string `"satisfied"`.
    Satisfied,
    /// Decoded from the wire string `"buffered"`.
    Buffered,
    /// Decoded from the wire string `"deduped"`.
    Deduped,
    /// Any other wire string, preserved verbatim.
    Other(String),
}

impl SignalDeliveryEffect {
    /// Decodes a wire string into a delivery effect.
    ///
    /// Matching is exact and case-sensitive. A string that is not one of the
    /// three known values is never rejected; it is copied into
    /// [`SignalDeliveryEffect::Other`] unchanged.
    pub fn from_wire(s: &str) -> Self {
        match s {
            "satisfied" => Self::Satisfied,
            "buffered" => Self::Buffered,
            "deduped" => Self::Deduped,
            _ => Self::Other(String::from(s)),
        }
    }
}
/// Event reporting the delivery of one signal to an execution.
///
/// The struct is `#[non_exhaustive]`: code outside this crate cannot build it
/// with a struct literal and should call [`SignalDeliveryEvent::new`] instead.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct SignalDeliveryEvent {
    /// Stream cursor carried by this event.
    pub cursor: StreamCursor,
    /// Execution the signal was delivered to.
    pub execution_id: ExecutionId,
    /// Identifier of the delivered signal.
    pub signal_id: SignalId,
    /// Waitpoint named on the event, if any.
    pub waitpoint_id: Option<WaitpointId>,
    /// Free-form source identity string, if any (format not defined here).
    pub source_identity: Option<String>,
    /// Effect reported for the delivery.
    pub effect: SignalDeliveryEffect,
    /// Timestamp carried by the event.
    pub at: TimestampMs,
}

impl SignalDeliveryEvent {
    /// Builds a signal-delivery event from its seven fields.
    ///
    /// The arguments are stored as given; nothing is validated or derived.
    pub fn new(
        cursor: StreamCursor,
        execution_id: ExecutionId,
        signal_id: SignalId,
        waitpoint_id: Option<WaitpointId>,
        source_identity: Option<String>,
        effect: SignalDeliveryEffect,
        at: TimestampMs,
    ) -> Self {
        Self {
            cursor,
            execution_id,
            signal_id,
            waitpoint_id,
            source_identity,
            effect,
            at,
        }
    }
}
/// Pinned, boxed, `Send` stream whose items are
/// `Result<SignalDeliveryEvent, EngineError>`.
pub type SignalDeliverySubscription =
Pin<Box<dyn Stream<Item = Result<SignalDeliveryEvent, EngineError>> + Send>>;
/// Event reporting that a tag was attached to, or cleared from, an execution.
///
/// Every variant carries a [`StreamCursor`], the [`ExecutionId`] it concerns
/// and an `at` timestamp; those shared fields can be read through the accessor
/// methods below without matching on the variant.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum InstanceTagEvent {
    /// A tag was attached.
    Attached {
        /// Stream cursor carried by this event.
        cursor: StreamCursor,
        /// Execution the tag applies to.
        execution_id: ExecutionId,
        /// The tag string carried by the event.
        tag: String,
        /// Timestamp carried by the event.
        at: TimestampMs,
    },
    /// A tag was cleared.
    Cleared {
        /// Stream cursor carried by this event.
        cursor: StreamCursor,
        /// Execution the tag applies to.
        execution_id: ExecutionId,
        /// The tag string carried by the event.
        tag: String,
        /// Timestamp carried by the event.
        at: TimestampMs,
    },
}

// Accessors mirror those on `LeaseHistoryEvent`. Because this enum is
// `#[non_exhaustive]`, downstream code cannot destructure variants it does not
// know about; these methods keep the shared fields reachable for every variant.
impl InstanceTagEvent {
    /// Returns the stream cursor stored in this event, whichever variant it is.
    pub fn cursor(&self) -> &StreamCursor {
        match self {
            Self::Attached { cursor, .. } | Self::Cleared { cursor, .. } => cursor,
        }
    }

    /// Returns the execution id stored in this event, whichever variant it is.
    pub fn execution_id(&self) -> &ExecutionId {
        match self {
            Self::Attached { execution_id, .. } | Self::Cleared { execution_id, .. } => {
                execution_id
            }
        }
    }

    /// Returns a copy of the `at` timestamp stored in this event.
    pub fn at(&self) -> TimestampMs {
        match self {
            Self::Attached { at, .. } | Self::Cleared { at, .. } => *at,
        }
    }
}
/// Pinned, boxed, `Send` stream whose items are
/// `Result<InstanceTagEvent, EngineError>`.
pub type InstanceTagSubscription =
Pin<Box<dyn Stream<Item = Result<InstanceTagEvent, EngineError>> + Send>>;
#[cfg(test)]
mod tests {
    use super::*;

    /// Known completion wire strings decode to their variants; an unknown
    /// string is preserved inside `Other`.
    #[test]
    fn completion_outcome_wire_round_trip() {
        let known = [
            ("success", CompletionOutcome::Success),
            ("failure", CompletionOutcome::Failure),
            ("cancelled", CompletionOutcome::Cancelled),
        ];
        for (wire, expected) in known {
            assert_eq!(CompletionOutcome::from_wire(wire), expected);
        }

        let unknown = CompletionOutcome::from_wire("timed_out");
        match unknown {
            CompletionOutcome::Other(s) => assert_eq!(s, "timed_out"),
            other => panic!("expected Other, got {other:?}"),
        }
    }

    /// Known delivery-effect wire strings decode to their variants; an unknown
    /// string is preserved inside `Other`.
    #[test]
    fn signal_delivery_effect_wire_round_trip() {
        let known = [
            ("satisfied", SignalDeliveryEffect::Satisfied),
            ("buffered", SignalDeliveryEffect::Buffered),
            ("deduped", SignalDeliveryEffect::Deduped),
        ];
        for (wire, expected) in known {
            assert_eq!(SignalDeliveryEffect::from_wire(wire), expected);
        }

        let unknown = SignalDeliveryEffect::from_wire("throttled");
        match unknown {
            SignalDeliveryEffect::Other(s) => assert_eq!(s, "throttled"),
            other => panic!("expected Other, got {other:?}"),
        }
    }

    /// The shared-field accessors return exactly what the variant was built with.
    #[test]
    fn lease_history_event_accessors() {
        use crate::types::ExecutionId;

        let exec = ExecutionId::parse("{fp:0}:11111111-1111-4111-8111-111111111111").unwrap();
        let cursor = StreamCursor::new(vec![0x01, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 42]);
        let at = TimestampMs(1_700_000_000_000);

        let ev = LeaseHistoryEvent::Expired {
            cursor: cursor.clone(),
            execution_id: exec.clone(),
            lease_id: None,
            prev_owner: None,
            at,
        };

        assert_eq!(ev.cursor(), &cursor);
        assert_eq!(ev.execution_id(), &exec);
        assert_eq!(ev.at(), TimestampMs(1_700_000_000_000));
    }
}