pub mod decode;
use crate::policy::ExecutionPolicy;
use crate::state::{AttemptType, PublicState, StateVector};
use crate::types::{
AttemptId, AttemptIndex, CancelSource, EdgeId, ExecutionId, FlowId, LaneId, LeaseEpoch,
LeaseFence, LeaseId, Namespace, SignalId, SuspensionId, TimestampMs, WaitpointId,
WaitpointToken, WorkerId, WorkerInstanceId,
};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet, HashMap};
/// Arguments for the create-execution operation.
///
/// Fields carrying `#[serde(default)]` may be absent from the serialized
/// form; on deserialization they fall back to their type's `Default`
/// (`None` for options, an empty map for `tags`).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateExecutionArgs {
pub execution_id: ExecutionId,
pub namespace: Namespace,
pub lane_id: LaneId,
pub execution_kind: String,
/// Raw input bytes; how they are encoded is described by `payload_encoding`.
pub input_payload: Vec<u8>,
#[serde(default)]
pub payload_encoding: Option<String>,
pub priority: i32,
pub creator_identity: String,
// NOTE(review): presumably drives `CreateExecutionResult::Duplicate` — confirm against the implementation.
#[serde(default)]
pub idempotency_key: Option<String>,
#[serde(default)]
pub tags: HashMap<String, String>,
#[serde(default)]
pub policy: Option<ExecutionPolicy>,
#[serde(default)]
pub delay_until: Option<TimestampMs>,
#[serde(default)]
pub execution_deadline_at: Option<TimestampMs>,
pub partition_id: u16,
/// Caller-supplied timestamp for the operation.
pub now: TimestampMs,
}
/// Outcome of the create-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateExecutionResult {
/// A new execution was created; carries its id and public state.
Created {
execution_id: ExecutionId,
public_state: PublicState,
},
/// Reported as a duplicate; only the execution id is returned.
Duplicate { execution_id: ExecutionId },
}
/// Arguments for the issue-claim-grant operation.
///
/// The `#[serde(default)]` fields are optional on the wire and default to
/// `None` / an empty set when missing.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IssueClaimGrantArgs {
pub execution_id: ExecutionId,
pub lane_id: LaneId,
pub worker_id: WorkerId,
pub worker_instance_id: WorkerInstanceId,
#[serde(default)]
pub capability_hash: Option<String>,
#[serde(default)]
pub route_snapshot_json: Option<String>,
#[serde(default)]
pub admission_summary: Option<String>,
/// Ordered set (`BTreeSet`) of capability strings for the worker.
#[serde(default)]
pub worker_capabilities: BTreeSet<String>,
/// Grant time-to-live, in milliseconds per the field name.
pub grant_ttl_ms: u64,
pub now: TimestampMs,
}
/// Outcome of the issue-claim-grant operation; the only variant is `Granted`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum IssueClaimGrantResult {
Granted { execution_id: ExecutionId },
}
/// A claim grant for one execution.
///
/// Unlike the `*Args` / `*Result` types in this module, this struct derives
/// no serde traits.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ClaimGrant {
pub execution_id: ExecutionId,
/// Partition key; parse it with [`ClaimGrant::partition`].
pub partition_key: crate::partition::PartitionKey,
pub grant_key: String,
/// Expiry as a raw `u64` (milliseconds per the field name).
pub expires_at_ms: u64,
}
impl ClaimGrant {
/// Parses `partition_key` into a `Partition`.
///
/// # Errors
///
/// Returns whatever `PartitionKeyParseError` the key's `parse()` produces.
pub fn partition(
&self,
) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
self.partition_key.parse()
}
}
/// A reclaim grant for one execution.
///
/// Same fields as `ClaimGrant` plus `lane_id`; derives no serde traits.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReclaimGrant {
pub execution_id: ExecutionId,
/// Partition key; parse it with [`ReclaimGrant::partition`].
pub partition_key: crate::partition::PartitionKey,
pub grant_key: String,
/// Expiry as a raw `u64` (milliseconds per the field name).
pub expires_at_ms: u64,
pub lane_id: LaneId,
}
impl ReclaimGrant {
/// Parses `partition_key` into a `Partition`.
///
/// # Errors
///
/// Returns whatever `PartitionKeyParseError` the key's `parse()` produces.
pub fn partition(
&self,
) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
self.partition_key.parse()
}
}
/// Arguments for the claim-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ClaimExecutionArgs {
pub execution_id: ExecutionId,
pub worker_id: WorkerId,
pub worker_instance_id: WorkerInstanceId,
pub lane_id: LaneId,
pub lease_id: LeaseId,
pub lease_ttl_ms: u64,
pub attempt_id: AttemptId,
pub expected_attempt_index: AttemptIndex,
/// Defaults to the empty string when missing from the serialized form.
#[serde(default)]
pub attempt_policy_json: String,
#[serde(default)]
pub attempt_timeout_ms: Option<u64>,
// NOTE(review): typed `Option<i64>` here, while `CreateExecutionArgs`
// uses `Option<TimestampMs>` for the same-named field — confirm intent.
#[serde(default)]
pub execution_deadline_at: Option<i64>,
pub now: TimestampMs,
}
/// Payload of `ClaimExecutionResult::Claimed`: identifiers of the lease and
/// attempt, plus the lease expiry timestamp.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ClaimedExecution {
pub execution_id: ExecutionId,
pub lease_id: LeaseId,
pub lease_epoch: LeaseEpoch,
pub attempt_index: AttemptIndex,
pub attempt_id: AttemptId,
pub attempt_type: AttemptType,
pub lease_expires_at: TimestampMs,
}
/// Outcome of the claim-execution operation; the only variant wraps a
/// [`ClaimedExecution`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ClaimExecutionResult {
Claimed(ClaimedExecution),
}
/// Arguments for the complete-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CompleteExecutionArgs {
pub execution_id: ExecutionId,
/// Optional lease fence; `None` when missing from the serialized form.
#[serde(default)]
pub fence: Option<LeaseFence>,
pub attempt_index: AttemptIndex,
#[serde(default)]
pub result_payload: Option<Vec<u8>>,
#[serde(default)]
pub result_encoding: Option<String>,
/// Falls back to `CancelSource::default()` when missing.
#[serde(default)]
pub source: CancelSource,
pub now: TimestampMs,
}
/// Outcome of the complete-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CompleteExecutionResult {
/// Carries the execution id and its public state.
Completed {
execution_id: ExecutionId,
public_state: PublicState,
},
}
/// Arguments for the renew-lease operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RenewLeaseArgs {
pub execution_id: ExecutionId,
pub attempt_index: AttemptIndex,
// NOTE(review): no `#[serde(default)]` here, unlike the `fence` field of
// `CompleteExecutionArgs` — confirm whether that is deliberate.
pub fence: Option<LeaseFence>,
pub lease_ttl_ms: u64,
/// Defaults to `default_lease_history_grace_ms()` (60_000) when missing.
#[serde(default = "default_lease_history_grace_ms")]
pub lease_history_grace_ms: u64,
}
/// Serde default for `RenewLeaseArgs::lease_history_grace_ms`: 60 000.
fn default_lease_history_grace_ms() -> u64 {
    const GRACE_MS: u64 = 60_000;
    GRACE_MS
}
/// Outcome of the renew-lease operation; carries an `expires_at` timestamp.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum RenewLeaseResult {
Renewed { expires_at: TimestampMs },
}
/// Arguments for the mark-lease-expired operation; only the execution id.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MarkLeaseExpiredArgs {
pub execution_id: ExecutionId,
}
/// Outcome of the mark-lease-expired operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum MarkLeaseExpiredResult {
MarkedExpired,
/// Nothing to do; `reason` is a free-form string.
AlreadySatisfied { reason: String },
}
/// Arguments for the cancel-execution operation.
///
/// `lease_id`, `lease_epoch` and `attempt_id` are each optional on the wire
/// (`None` when missing); `source` falls back to `CancelSource::default()`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CancelExecutionArgs {
pub execution_id: ExecutionId,
pub reason: String,
#[serde(default)]
pub source: CancelSource,
#[serde(default)]
pub lease_id: Option<LeaseId>,
#[serde(default)]
pub lease_epoch: Option<LeaseEpoch>,
#[serde(default)]
pub attempt_id: Option<AttemptId>,
pub now: TimestampMs,
}
/// Outcome of the cancel-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CancelExecutionResult {
/// Carries the execution id and its public state.
Cancelled {
execution_id: ExecutionId,
public_state: PublicState,
},
}
/// Arguments for the revoke-lease operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RevokeLeaseArgs {
pub execution_id: ExecutionId,
/// Optional lease id as a plain `String` (not `LeaseId`); `None` when missing.
#[serde(default)]
pub expected_lease_id: Option<String>,
pub worker_instance_id: WorkerInstanceId,
pub reason: String,
}
/// Outcome of the revoke-lease operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum RevokeLeaseResult {
/// Lease id and epoch are carried as plain strings in this variant.
Revoked {
lease_id: String,
lease_epoch: String,
},
/// Nothing to do; `reason` is a free-form string.
AlreadySatisfied { reason: String },
}
/// Arguments for the delay-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DelayExecutionArgs {
pub execution_id: ExecutionId,
/// Optional lease fence; `None` when missing from the serialized form.
#[serde(default)]
pub fence: Option<LeaseFence>,
pub attempt_index: AttemptIndex,
pub delay_until: TimestampMs,
/// Falls back to `CancelSource::default()` when missing.
#[serde(default)]
pub source: CancelSource,
pub now: TimestampMs,
}
/// Outcome of the delay-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum DelayExecutionResult {
/// Carries the execution id and its public state.
Delayed {
execution_id: ExecutionId,
public_state: PublicState,
},
}
/// Arguments for the move-to-waiting-children operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MoveToWaitingChildrenArgs {
pub execution_id: ExecutionId,
/// Optional lease fence; `None` when missing from the serialized form.
#[serde(default)]
pub fence: Option<LeaseFence>,
pub attempt_index: AttemptIndex,
/// Falls back to `CancelSource::default()` when missing.
#[serde(default)]
pub source: CancelSource,
pub now: TimestampMs,
}
/// Outcome of the move-to-waiting-children operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum MoveToWaitingChildrenResult {
/// Carries the execution id and its public state.
Moved {
execution_id: ExecutionId,
public_state: PublicState,
},
}
/// Arguments for the change-priority operation. All fields are required.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChangePriorityArgs {
pub execution_id: ExecutionId,
pub new_priority: i32,
pub lane_id: LaneId,
pub now: TimestampMs,
}
/// Outcome of the change-priority operation; the only variant is `Changed`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangePriorityResult {
Changed { execution_id: ExecutionId },
}
/// Arguments for the update-progress operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UpdateProgressArgs {
pub execution_id: ExecutionId,
pub lease_id: LeaseId,
pub lease_epoch: LeaseEpoch,
pub attempt_id: AttemptId,
// NOTE(review): presumably 0–100; the `u8` type alone does not enforce that — confirm.
#[serde(default)]
pub progress_pct: Option<u8>,
#[serde(default)]
pub progress_message: Option<String>,
pub now: TimestampMs,
}
/// Outcome of the update-progress operation; the only variant is `Updated`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum UpdateProgressResult {
Updated,
}
/// Arguments for the fail-execution operation.
///
/// Note that, unlike most `*Args` structs in this module, there is no `now`
/// field here.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FailExecutionArgs {
pub execution_id: ExecutionId,
/// Optional lease fence; `None` when missing from the serialized form.
#[serde(default)]
pub fence: Option<LeaseFence>,
pub attempt_index: AttemptIndex,
pub failure_reason: String,
pub failure_category: String,
/// Defaults to the empty string when missing.
#[serde(default)]
pub retry_policy_json: String,
/// Defaults to the empty string when missing.
#[serde(default)]
pub next_attempt_policy_json: String,
/// Falls back to `CancelSource::default()` when missing.
#[serde(default)]
pub source: CancelSource,
}
/// Outcome of the fail-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum FailExecutionResult {
/// Carries a `delay_until` timestamp and the next attempt index.
RetryScheduled {
delay_until: TimestampMs,
next_attempt_index: AttemptIndex,
},
TerminalFailed,
}
/// Arguments for the issue-reclaim-grant operation.
///
/// The `#[serde(default)]` fields are optional on the wire and default to
/// `None` when missing.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IssueReclaimGrantArgs {
pub execution_id: ExecutionId,
pub worker_id: WorkerId,
pub worker_instance_id: WorkerInstanceId,
pub lane_id: LaneId,
#[serde(default)]
pub capability_hash: Option<String>,
pub grant_ttl_ms: u64,
#[serde(default)]
pub route_snapshot_json: Option<String>,
#[serde(default)]
pub admission_summary: Option<String>,
pub now: TimestampMs,
}
/// Outcome of the issue-reclaim-grant operation; carries the grant expiry
/// as a `TimestampMs`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum IssueReclaimGrantResult {
Granted { expires_at_ms: TimestampMs },
}
/// Arguments for the reclaim-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReclaimExecutionArgs {
pub execution_id: ExecutionId,
pub worker_id: WorkerId,
pub worker_instance_id: WorkerInstanceId,
pub lane_id: LaneId,
#[serde(default)]
pub capability_hash: Option<String>,
pub lease_id: LeaseId,
pub lease_ttl_ms: u64,
pub attempt_id: AttemptId,
/// Defaults to the empty string when missing.
#[serde(default)]
pub attempt_policy_json: String,
/// Defaults to `default_max_reclaim_count()` (100) when missing.
#[serde(default = "default_max_reclaim_count")]
pub max_reclaim_count: u32,
pub old_worker_instance_id: WorkerInstanceId,
pub current_attempt_index: AttemptIndex,
}
/// Serde default for `ReclaimExecutionArgs::max_reclaim_count`: 100.
fn default_max_reclaim_count() -> u32 {
    const MAX_RECLAIMS: u32 = 100;
    MAX_RECLAIMS
}
/// Outcome of the reclaim-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReclaimExecutionResult {
/// Carries the new attempt and lease identifiers and the lease expiry.
Reclaimed {
new_attempt_index: AttemptIndex,
new_attempt_id: AttemptId,
new_lease_id: LeaseId,
new_lease_epoch: LeaseEpoch,
lease_expires_at: TimestampMs,
},
// NOTE(review): presumably tied to `ReclaimExecutionArgs::max_reclaim_count` — confirm.
MaxReclaimsExceeded,
}
/// Arguments for the expire-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExpireExecutionArgs {
pub execution_id: ExecutionId,
/// Free-form reason string.
pub expire_reason: String,
}
/// Outcome of the expire-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ExpireExecutionResult {
Expired { execution_id: ExecutionId },
AlreadyTerminal,
}
/// Arguments for the suspend-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SuspendExecutionArgs {
pub execution_id: ExecutionId,
// NOTE(review): no `#[serde(default)]` here, unlike the `fence` field of
// `CompleteExecutionArgs` — confirm whether that is deliberate.
pub fence: Option<LeaseFence>,
pub attempt_index: AttemptIndex,
pub suspension_id: SuspensionId,
pub waitpoint_id: WaitpointId,
pub waitpoint_key: String,
pub reason_code: String,
pub requested_by: String,
pub resume_condition_json: String,
pub resume_policy_json: String,
#[serde(default)]
pub continuation_metadata_pointer: Option<String>,
#[serde(default)]
pub timeout_at: Option<TimestampMs>,
/// Defaults to `false` when missing.
#[serde(default)]
pub use_pending_waitpoint: bool,
/// Defaults to `default_timeout_behavior()` (`"fail"`) when missing.
#[serde(default = "default_timeout_behavior")]
pub timeout_behavior: String,
}
/// Serde default for `SuspendExecutionArgs::timeout_behavior`: `"fail"`.
fn default_timeout_behavior() -> String {
    String::from("fail")
}
/// Outcome of the suspend-execution operation.
///
/// Both variants carry the same four fields.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SuspendExecutionResult {
Suspended {
suspension_id: SuspensionId,
waitpoint_id: WaitpointId,
waitpoint_key: String,
waitpoint_token: WaitpointToken,
},
AlreadySatisfied {
suspension_id: SuspensionId,
waitpoint_id: WaitpointId,
waitpoint_key: String,
waitpoint_token: WaitpointToken,
},
}
/// Arguments for the resume-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResumeExecutionArgs {
pub execution_id: ExecutionId,
/// Defaults to `default_trigger_type()` (`"signal"`) when missing.
#[serde(default = "default_trigger_type")]
pub trigger_type: String,
/// Defaults to `0` when missing.
#[serde(default)]
pub resume_delay_ms: u64,
}
/// Serde default for `ResumeExecutionArgs::trigger_type`: `"signal"`.
fn default_trigger_type() -> String {
    String::from("signal")
}
/// Outcome of the resume-execution operation; carries the public state.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResumeExecutionResult {
Resumed { public_state: PublicState },
}
/// Arguments for the create-pending-waitpoint operation. All fields are
/// required.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreatePendingWaitpointArgs {
pub execution_id: ExecutionId,
pub lease_id: LeaseId,
pub lease_epoch: LeaseEpoch,
pub attempt_index: AttemptIndex,
pub attempt_id: AttemptId,
pub waitpoint_id: WaitpointId,
pub waitpoint_key: String,
/// A duration (milliseconds per the field name), not an absolute timestamp type.
pub expires_in_ms: u64,
}
/// Outcome of the create-pending-waitpoint operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreatePendingWaitpointResult {
/// Carries the waitpoint id, key and token.
Created {
waitpoint_id: WaitpointId,
waitpoint_key: String,
waitpoint_token: WaitpointToken,
},
}
/// Arguments for the close-waitpoint operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CloseWaitpointArgs {
pub waitpoint_id: WaitpointId,
/// Free-form reason string.
pub reason: String,
}
/// Outcome of the close-waitpoint operation; the only variant is `Closed`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CloseWaitpointResult {
Closed,
}
/// Arguments for the deliver-signal operation.
///
/// Every `#[serde(default)]` field here is an `Option` and deserializes to
/// `None` when missing.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DeliverSignalArgs {
pub execution_id: ExecutionId,
pub waitpoint_id: WaitpointId,
pub signal_id: SignalId,
pub signal_name: String,
pub signal_category: String,
pub source_type: String,
pub source_identity: String,
#[serde(default)]
pub payload: Option<Vec<u8>>,
#[serde(default)]
pub payload_encoding: Option<String>,
#[serde(default)]
pub correlation_id: Option<String>,
// NOTE(review): presumably drives `DeliverSignalResult::Duplicate` — confirm.
#[serde(default)]
pub idempotency_key: Option<String>,
pub target_scope: String,
#[serde(default)]
pub created_at: Option<TimestampMs>,
#[serde(default)]
pub dedup_ttl_ms: Option<u64>,
#[serde(default)]
pub resume_delay_ms: Option<u64>,
#[serde(default)]
pub max_signals_per_execution: Option<u64>,
#[serde(default)]
pub signal_maxlen: Option<u64>,
pub waitpoint_token: WaitpointToken,
pub now: TimestampMs,
}
/// Outcome of the deliver-signal operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum DeliverSignalResult {
/// `effect` is a free-form string; its vocabulary is not defined in this file.
Accepted { signal_id: SignalId, effect: String },
/// Carries the id of the already-existing signal.
Duplicate { existing_signal_id: SignalId },
}
/// Arguments for the buffer-signal operation.
///
/// Field set is a subset of `DeliverSignalArgs`; the `#[serde(default)]`
/// options deserialize to `None` when missing.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BufferSignalArgs {
pub execution_id: ExecutionId,
pub waitpoint_id: WaitpointId,
pub signal_id: SignalId,
pub signal_name: String,
pub signal_category: String,
pub source_type: String,
pub source_identity: String,
#[serde(default)]
pub payload: Option<Vec<u8>>,
#[serde(default)]
pub payload_encoding: Option<String>,
#[serde(default)]
pub idempotency_key: Option<String>,
pub target_scope: String,
pub waitpoint_token: WaitpointToken,
pub now: TimestampMs,
}
/// Outcome of the buffer-signal operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum BufferSignalResult {
Buffered { signal_id: SignalId },
/// Carries the id of the already-existing signal.
Duplicate { existing_signal_id: SignalId },
}
/// Description of a waitpoint: id, key, state string, token and timestamps.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PendingWaitpointInfo {
pub waitpoint_id: WaitpointId,
pub waitpoint_key: String,
/// State as a free-form string; its vocabulary is not defined in this file.
pub state: String,
pub waitpoint_token: WaitpointToken,
/// Defaults to an empty vector when missing.
#[serde(default)]
pub required_signal_names: Vec<String>,
pub created_at: TimestampMs,
/// Omitted from serialized output when `None`; `None` when missing on input.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub activated_at: Option<TimestampMs>,
/// Omitted from serialized output when `None`; `None` when missing on input.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub expires_at: Option<TimestampMs>,
}
/// Arguments for the expire-suspension operation; only the execution id.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExpireSuspensionArgs {
pub execution_id: ExecutionId,
}
/// Outcome of the expire-suspension operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ExpireSuspensionResult {
// NOTE(review): `behavior_applied` presumably echoes
// `SuspendExecutionArgs::timeout_behavior` — confirm.
Expired { behavior_applied: String },
/// Nothing to do; `reason` is a free-form string.
AlreadySatisfied { reason: String },
}
/// Arguments for the claim-resumed-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ClaimResumedExecutionArgs {
pub execution_id: ExecutionId,
pub worker_id: WorkerId,
pub worker_instance_id: WorkerInstanceId,
pub lane_id: LaneId,
pub lease_id: LeaseId,
pub lease_ttl_ms: u64,
pub current_attempt_index: AttemptIndex,
/// `None` when missing from the serialized form.
#[serde(default)]
pub remaining_attempt_timeout_ms: Option<u64>,
pub now: TimestampMs,
}
/// Payload of `ClaimResumedExecutionResult::Claimed`.
///
/// Same fields as `ClaimedExecution` minus `attempt_type`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ClaimedResumedExecution {
pub execution_id: ExecutionId,
pub lease_id: LeaseId,
pub lease_epoch: LeaseEpoch,
pub attempt_index: AttemptIndex,
pub attempt_id: AttemptId,
pub lease_expires_at: TimestampMs,
}
/// Outcome of the claim-resumed-execution operation; the only variant wraps
/// a [`ClaimedResumedExecution`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ClaimResumedExecutionResult {
Claimed(ClaimedResumedExecution),
}
/// Arguments for the append-frame operation.
///
/// Every `#[serde(default)]` field here is an `Option` and deserializes to
/// `None` when missing.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AppendFrameArgs {
pub execution_id: ExecutionId,
pub attempt_index: AttemptIndex,
pub lease_id: LeaseId,
pub lease_epoch: LeaseEpoch,
pub attempt_id: AttemptId,
pub frame_type: String,
pub timestamp: TimestampMs,
/// Raw frame bytes; how they are encoded is described by `encoding`.
pub payload: Vec<u8>,
#[serde(default)]
pub encoding: Option<String>,
#[serde(default)]
pub metadata_json: Option<String>,
#[serde(default)]
pub correlation_id: Option<String>,
#[serde(default)]
pub source: Option<String>,
#[serde(default)]
pub retention_maxlen: Option<u32>,
#[serde(default)]
pub max_payload_bytes: Option<u32>,
}
/// Outcome of the append-frame operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AppendFrameResult {
/// Carries the entry id (as a string) and a frame count.
Appended {
entry_id: String,
frame_count: u64,
},
}
/// Position within a frame stream.
///
/// `Start` and `End` are symbolic markers; `At` holds a concrete entry id
/// string. The wire representation (`to_wire`) uses `"-"` / `"+"` for the
/// markers, which differs from the `"start"` / `"end"` text accepted by the
/// parser and produced by `Display`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum StreamCursor {
    Start,
    End,
    At(String),
}

impl StreamCursor {
    /// Returns the concrete cursor `At("0-0")`.
    pub fn from_beginning() -> Self {
        StreamCursor::At(String::from("0-0"))
    }

    /// Returns the symbolic `Start` marker.
    pub fn start() -> Self {
        StreamCursor::Start
    }

    /// Returns the symbolic `End` marker.
    pub fn end() -> Self {
        StreamCursor::End
    }

    /// Alias for [`StreamCursor::from_beginning`].
    pub fn beginning() -> Self {
        StreamCursor::from_beginning()
    }

    /// Borrowed wire form: `"-"` for `Start`, `"+"` for `End`, the id itself
    /// for `At`.
    #[doc(hidden)]
    pub fn to_wire(&self) -> &str {
        match self {
            StreamCursor::At(id) => id.as_str(),
            StreamCursor::Start => "-",
            StreamCursor::End => "+",
        }
    }

    /// Owned wire form; consumes the cursor so an `At` id is moved out
    /// rather than copied.
    #[doc(hidden)]
    pub fn into_wire_string(self) -> String {
        match self {
            StreamCursor::At(id) => id,
            marker @ (StreamCursor::Start | StreamCursor::End) => String::from(marker.to_wire()),
        }
    }

    /// `true` only for the `At` variant.
    pub fn is_concrete(&self) -> bool {
        match self {
            StreamCursor::At(_) => true,
            StreamCursor::Start | StreamCursor::End => false,
        }
    }
}
/// Human-readable form: `"start"`, `"end"`, or the concrete id.
///
/// This is the same text the parser accepts, so it round-trips through
/// `FromStr` / `TryFrom<String>`.
impl std::fmt::Display for StreamCursor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::At(id) => id.as_str(),
            Self::Start => "start",
            Self::End => "end",
        };
        f.write_str(text)
    }
}
/// Reasons a string can fail to parse as a `StreamCursor`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StreamCursorParseError {
    /// The input was the empty string.
    Empty,
    /// The input was a bare `"-"` or `"+"`; the payload is the rejected text.
    BareMarkerRejected(String),
    /// The input matched none of the accepted shapes; the payload is the
    /// rejected text.
    Malformed(String),
}

impl std::fmt::Display for StreamCursorParseError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Malformed(raw) => write!(
                f,
                "invalid stream cursor '{raw}' (expected 'start', 'end', '<ms>', or '<ms>-<seq>')"
            ),
            Self::BareMarkerRejected(marker) => write!(
                f,
                "bare marker '{marker}' is not a valid stream cursor; use 'start' or 'end'"
            ),
            Self::Empty => write!(f, "stream cursor must not be empty"),
        }
    }
}

impl std::error::Error for StreamCursorParseError {}
/// Result of classifying raw cursor text, shared by the borrowing
/// (`FromStr`) and owning (`TryFrom<String>`) parsers.
enum StreamCursorClass {
    Start,
    End,
    Concrete,
    BareMarker,
    Empty,
    Malformed,
}

/// Classifies `s` without allocating.
///
/// Accepted shapes: the keywords `start` / `end`, or `<ms>` / `<ms>-<seq>`
/// where each part is a non-empty run of ASCII digits. Bare `-` / `+` and
/// the empty string get their own classes so callers can report them
/// specifically; everything else is `Malformed`.
fn classify_stream_cursor(s: &str) -> StreamCursorClass {
    match s {
        "" => return StreamCursorClass::Empty,
        "-" | "+" => return StreamCursorClass::BareMarker,
        "start" => return StreamCursorClass::Start,
        "end" => return StreamCursorClass::End,
        _ => {}
    }
    if !s.is_ascii() {
        return StreamCursorClass::Malformed;
    }

    let digits_only = |part: &str| !part.is_empty() && part.bytes().all(|b| b.is_ascii_digit());
    // Only the first '-' separates ms from seq; any further '-' lands in the
    // seq part and fails the digit check.
    let well_formed = match s.split_once('-') {
        Some((ms, seq)) => digits_only(ms) && digits_only(seq),
        None => digits_only(s),
    };

    if well_formed {
        StreamCursorClass::Concrete
    } else {
        StreamCursorClass::Malformed
    }
}
impl std::str::FromStr for StreamCursor {
type Err = StreamCursorParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match classify_stream_cursor(s) {
StreamCursorClass::Start => Ok(Self::Start),
StreamCursorClass::End => Ok(Self::End),
StreamCursorClass::Concrete => Ok(Self::At(s.to_owned())),
StreamCursorClass::BareMarker => {
Err(StreamCursorParseError::BareMarkerRejected(s.to_owned()))
}
StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
StreamCursorClass::Malformed => Err(StreamCursorParseError::Malformed(s.to_owned())),
}
}
}
impl TryFrom<String> for StreamCursor {
type Error = StreamCursorParseError;
fn try_from(s: String) -> Result<Self, Self::Error> {
match classify_stream_cursor(&s) {
StreamCursorClass::Start => Ok(Self::Start),
StreamCursorClass::End => Ok(Self::End),
StreamCursorClass::Concrete => Ok(Self::At(s)),
StreamCursorClass::BareMarker => Err(StreamCursorParseError::BareMarkerRejected(s)),
StreamCursorClass::Empty => Err(StreamCursorParseError::Empty),
StreamCursorClass::Malformed => Err(StreamCursorParseError::Malformed(s)),
}
}
}
impl From<StreamCursor> for String {
fn from(c: StreamCursor) -> Self {
c.to_string()
}
}
/// Serializes the cursor as a string, using its `Display` output
/// (`"start"`, `"end"`, or the concrete id).
impl Serialize for StreamCursor {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        serializer.collect_str(self)
    }
}
impl<'de> Deserialize<'de> for StreamCursor {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let s = String::deserialize(deserializer)?;
Self::try_from(s).map_err(serde::de::Error::custom)
}
}
/// Hard cap constant for stream reads: 10 000.
// NOTE(review): presumably the upper bound for `ReadFramesArgs::count_limit`;
// nothing in this file enforces it — confirm against the caller.
pub const STREAM_READ_HARD_CAP: u64 = 10_000;
/// One stream entry: its id plus an ordered (`BTreeMap`) string-to-string
/// field map.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamFrame {
pub id: String,
pub fields: std::collections::BTreeMap<String, String>,
}
/// Arguments for the read-frames operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReadFramesArgs {
pub execution_id: ExecutionId,
pub attempt_index: AttemptIndex,
// NOTE(review): range bounds are plain strings here, not `StreamCursor`;
// presumably already in wire form — confirm against the caller.
pub from_id: String,
pub to_id: String,
pub count_limit: u64,
}
/// A batch of frames plus optional stream-closure information.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamFrames {
pub frames: Vec<StreamFrame>,
/// Omitted from serialized output when `None`; `is_closed()` reports
/// whether this is set.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub closed_at: Option<TimestampMs>,
/// Omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub closed_reason: Option<String>,
}
impl StreamFrames {
pub fn empty_open() -> Self {
Self {
frames: Vec::new(),
closed_at: None,
closed_reason: None,
}
}
pub fn is_closed(&self) -> bool {
self.closed_at.is_some()
}
}
/// Outcome of the read-frames operation; the only variant wraps
/// [`StreamFrames`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReadFramesResult {
Frames(StreamFrames),
}
/// Arguments for the create-budget operation. All fields are required.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateBudgetArgs {
pub budget_id: crate::types::BudgetId,
pub scope_type: String,
pub scope_id: String,
pub enforcement_mode: String,
pub on_hard_limit: String,
pub on_soft_limit: String,
pub reset_interval_ms: u64,
// NOTE(review): `dimensions`, `hard_limits` and `soft_limits` look like
// parallel vectors matched by index; nothing here enforces equal lengths
// — confirm against the consumer.
pub dimensions: Vec<String>,
pub hard_limits: Vec<u64>,
pub soft_limits: Vec<u64>,
pub now: TimestampMs,
}
/// Outcome of the create-budget operation; both variants carry the budget id.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateBudgetResult {
Created { budget_id: crate::types::BudgetId },
AlreadySatisfied { budget_id: crate::types::BudgetId },
}
/// Arguments for the create-quota-policy operation. All fields are required.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateQuotaPolicyArgs {
pub quota_policy_id: crate::types::QuotaPolicyId,
/// Window length; seconds per the field name (other durations in this
/// module use `_ms`).
pub window_seconds: u64,
pub max_requests_per_window: u64,
pub max_concurrent: u64,
pub now: TimestampMs,
}
/// Outcome of the create-quota-policy operation; both variants carry the
/// quota policy id.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateQuotaPolicyResult {
Created {
quota_policy_id: crate::types::QuotaPolicyId,
},
AlreadySatisfied {
quota_policy_id: crate::types::QuotaPolicyId,
},
}
/// Status report for a budget.
///
/// Identifiers and timestamps are carried as plain strings here
/// (`budget_id: String`, `Option<String>` timestamps) rather than as
/// `BudgetId` / `TimestampMs`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BudgetStatus {
pub budget_id: String,
pub scope_type: String,
pub scope_id: String,
pub enforcement_mode: String,
// NOTE(review): the three maps are presumably keyed by dimension name — confirm.
pub usage: HashMap<String, u64>,
pub hard_limits: HashMap<String, u64>,
pub soft_limits: HashMap<String, u64>,
pub breach_count: u64,
pub soft_breach_count: u64,
pub last_breach_at: Option<String>,
pub last_breach_dim: Option<String>,
pub next_reset_at: Option<String>,
pub created_at: Option<String>,
}
/// Arguments for the report-usage operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReportUsageArgs {
// NOTE(review): `dimensions` and `deltas` look like parallel vectors
// matched by index; nothing here enforces equal lengths — confirm.
pub dimensions: Vec<String>,
pub deltas: Vec<u64>,
pub now: TimestampMs,
// NOTE(review): presumably drives `ReportUsageResult::AlreadyApplied` — confirm.
#[serde(default)]
pub dedup_key: Option<String>,
}
/// Outcome of the report-usage operation.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum ReportUsageResult {
Ok,
/// Carries the dimension name, its current usage and the soft limit.
SoftBreach {
dimension: String,
current_usage: u64,
soft_limit: u64,
},
/// Carries the dimension name, its current usage and the hard limit.
HardBreach {
dimension: String,
current_usage: u64,
hard_limit: u64,
},
AlreadyApplied,
}
/// Arguments for the reset-budget operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResetBudgetArgs {
pub budget_id: crate::types::BudgetId,
pub now: TimestampMs,
}
/// Outcome of the reset-budget operation; carries a `next_reset_at`
/// timestamp.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResetBudgetResult {
Reset { next_reset_at: TimestampMs },
}
/// Arguments for the check-admission operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CheckAdmissionArgs {
pub execution_id: ExecutionId,
pub now: TimestampMs,
/// Window length; seconds per the field name.
pub window_seconds: u64,
pub rate_limit: u64,
pub concurrency_cap: u64,
/// `None` when missing from the serialized form.
#[serde(default)]
pub jitter_ms: Option<u64>,
}
/// Outcome of the check-admission operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CheckAdmissionResult {
Admitted,
AlreadyAdmitted,
/// Carries a retry delay (milliseconds per the field name).
RateExceeded { retry_after_ms: u64 },
ConcurrencyExceeded,
}
/// Arguments for the release-admission operation; only the execution id.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReleaseAdmissionArgs {
pub execution_id: ExecutionId,
}
/// Outcome of the release-admission operation; the only variant is
/// `Released`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReleaseAdmissionResult {
Released,
}
/// Arguments for the block-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BlockExecutionArgs {
pub execution_id: ExecutionId,
pub blocking_reason: String,
/// `None` when missing from the serialized form.
#[serde(default)]
pub blocking_detail: Option<String>,
pub now: TimestampMs,
}
/// Outcome of the block-execution operation; the only variant is `Blocked`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlockExecutionResult {
Blocked,
}
/// Arguments for the unblock-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UnblockExecutionArgs {
pub execution_id: ExecutionId,
pub now: TimestampMs,
/// `None` when missing from the serialized form.
// NOTE(review): presumably a guard compared against the current blocking
// reason — confirm against the implementation.
#[serde(default)]
pub expected_blocking_reason: Option<String>,
}
/// Outcome of the unblock-execution operation; the only variant is
/// `Unblocked`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum UnblockExecutionResult {
Unblocked,
}
/// Arguments for the create-flow operation. All fields are required.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateFlowArgs {
pub flow_id: crate::types::FlowId,
pub flow_kind: String,
pub namespace: Namespace,
pub now: TimestampMs,
}
/// Outcome of the create-flow operation; both variants carry the flow id.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateFlowResult {
Created { flow_id: crate::types::FlowId },
AlreadySatisfied { flow_id: crate::types::FlowId },
}
/// Arguments for the add-execution-to-flow operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AddExecutionToFlowArgs {
pub flow_id: crate::types::FlowId,
pub execution_id: ExecutionId,
pub now: TimestampMs,
}
/// Outcome of the add-execution-to-flow operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AddExecutionToFlowResult {
/// Carries the execution id and the new node count.
Added {
execution_id: ExecutionId,
new_node_count: u32,
},
/// Carries the execution id and the node count.
AlreadyMember {
execution_id: ExecutionId,
node_count: u32,
},
}
/// Arguments for the cancel-flow operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CancelFlowArgs {
pub flow_id: crate::types::FlowId,
pub reason: String,
/// Policy as a free-form string; its vocabulary is not defined in this file.
pub cancellation_policy: String,
pub now: TimestampMs,
}
/// Outcome of the cancel-flow operation.
///
/// Member execution ids are carried as plain strings, not `ExecutionId`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CancelFlowResult {
Cancelled {
cancellation_policy: String,
member_execution_ids: Vec<String>,
},
/// Additionally carries a `member_count`.
CancellationScheduled {
cancellation_policy: String,
member_count: u32,
member_execution_ids: Vec<String>,
},
/// Additionally carries the ids of members that failed.
PartiallyCancelled {
cancellation_policy: String,
member_execution_ids: Vec<String>,
failed_member_execution_ids: Vec<String>,
},
}
/// Arguments for the stage-dependency-edge operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StageDependencyEdgeArgs {
pub flow_id: crate::types::FlowId,
pub edge_id: crate::types::EdgeId,
pub upstream_execution_id: ExecutionId,
pub downstream_execution_id: ExecutionId,
/// Defaults to `default_dependency_kind()` (`"success_only"`) when missing.
#[serde(default = "default_dependency_kind")]
pub dependency_kind: String,
/// `None` when missing from the serialized form.
#[serde(default)]
pub data_passing_ref: Option<String>,
// NOTE(review): presumably an optimistic-concurrency check against the
// flow's graph revision — confirm against the implementation.
pub expected_graph_revision: u64,
pub now: TimestampMs,
}
/// Serde default for the `dependency_kind` fields: `"success_only"`.
fn default_dependency_kind() -> String {
    String::from("success_only")
}
/// Outcome of the stage-dependency-edge operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum StageDependencyEdgeResult {
/// Carries the edge id and the new graph revision.
Staged {
edge_id: crate::types::EdgeId,
new_graph_revision: u64,
},
}
/// Arguments for the apply-dependency-to-child operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ApplyDependencyToChildArgs {
pub flow_id: crate::types::FlowId,
pub edge_id: crate::types::EdgeId,
pub downstream_execution_id: ExecutionId,
pub upstream_execution_id: ExecutionId,
pub graph_revision: u64,
/// Defaults to `default_dependency_kind()` (`"success_only"`) when missing.
#[serde(default = "default_dependency_kind")]
pub dependency_kind: String,
/// `None` when missing from the serialized form.
#[serde(default)]
pub data_passing_ref: Option<String>,
pub now: TimestampMs,
}
/// Outcome of the apply-dependency-to-child operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ApplyDependencyToChildResult {
/// Carries an `unsatisfied_count`.
Applied { unsatisfied_count: u32 },
AlreadyApplied,
}
/// Arguments for the resolve-dependency operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResolveDependencyArgs {
pub edge_id: crate::types::EdgeId,
/// Outcome as a free-form string; its vocabulary is not defined in this file.
pub upstream_outcome: String,
pub now: TimestampMs,
}
/// Outcome of the resolve-dependency operation; all variants are data-free.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResolveDependencyResult {
Satisfied,
Impossible,
AlreadyResolved,
}
/// Arguments for the promote-blocked-to-eligible operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PromoteBlockedToEligibleArgs {
pub execution_id: ExecutionId,
pub now: TimestampMs,
}
/// Outcome of the promote-blocked-to-eligible operation; the only variant
/// is `Promoted`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromoteBlockedToEligibleResult {
Promoted,
}
/// Arguments for the evaluate-flow-eligibility operation; only the
/// execution id.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EvaluateFlowEligibilityArgs {
pub execution_id: ExecutionId,
}
/// Outcome of the evaluate-flow-eligibility operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum EvaluateFlowEligibilityResult {
/// `status` is a free-form string; its vocabulary is not defined in this file.
Status { status: String },
}
/// Arguments for the replay-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReplayExecutionArgs {
pub execution_id: ExecutionId,
pub now: TimestampMs,
}
/// Outcome of the replay-execution operation; carries the public state.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplayExecutionResult {
Replayed { public_state: PublicState },
}
/// Serializable description of an execution.
///
/// Namespace, lane, flow id and timestamps are plain strings here, and
/// `current_attempt_index` is a raw `u32`, rather than the typed wrappers
/// used elsewhere in this module.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionInfo {
pub execution_id: ExecutionId,
pub namespace: String,
pub lane_id: String,
pub priority: i32,
pub execution_kind: String,
pub state_vector: StateVector,
pub public_state: PublicState,
pub created_at: String,
/// Omitted from serialized output when `None`; `None` when missing on input.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub started_at: Option<String>,
/// Omitted from serialized output when `None`; `None` when missing on input.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub completed_at: Option<String>,
pub current_attempt_index: u32,
pub flow_id: Option<String>,
/// A plain `String` here, whereas `BlockExecutionArgs::blocking_detail`
/// is an `Option<String>`.
pub blocking_detail: String,
}
/// Arguments for the set-execution-tags operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SetExecutionTagsArgs {
pub execution_id: ExecutionId,
/// Tags in an ordered map (`BTreeMap`), so iteration is sorted by key.
pub tags: BTreeMap<String, String>,
}
/// Outcome of the set-execution-tags operation; carries a `count`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SetExecutionTagsResult {
Ok { count: u32 },
}
/// Arguments for the set-flow-tags operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SetFlowTagsArgs {
pub flow_id: FlowId,
/// Tags in an ordered map (`BTreeMap`), so iteration is sorted by key.
pub tags: BTreeMap<String, String>,
}
/// Outcome of the set-flow-tags operation; carries a `count`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SetFlowTagsResult {
Ok { count: u32 },
}
/// Typed snapshot of an execution.
///
/// Derives no serde traits. Because the struct is `#[non_exhaustive]`, code
/// outside this crate cannot build it with a struct literal and must use
/// [`ExecutionSnapshot::new`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ExecutionSnapshot {
pub execution_id: ExecutionId,
pub flow_id: Option<FlowId>,
pub lane_id: LaneId,
pub namespace: Namespace,
pub public_state: PublicState,
pub blocking_reason: Option<String>,
pub blocking_detail: Option<String>,
pub current_attempt: Option<AttemptSummary>,
pub current_lease: Option<LeaseSummary>,
pub current_waitpoint: Option<WaitpointId>,
pub created_at: TimestampMs,
pub last_mutation_at: TimestampMs,
pub total_attempt_count: u32,
pub tags: BTreeMap<String, String>,
}
impl ExecutionSnapshot {
#[allow(clippy::too_many_arguments)]
pub fn new(
execution_id: ExecutionId,
flow_id: Option<FlowId>,
lane_id: LaneId,
namespace: Namespace,
public_state: PublicState,
blocking_reason: Option<String>,
blocking_detail: Option<String>,
current_attempt: Option<AttemptSummary>,
current_lease: Option<LeaseSummary>,
current_waitpoint: Option<WaitpointId>,
created_at: TimestampMs,
last_mutation_at: TimestampMs,
total_attempt_count: u32,
tags: BTreeMap<String, String>,
) -> Self {
Self {
execution_id,
flow_id,
lane_id,
namespace,
public_state,
blocking_reason,
blocking_detail,
current_attempt,
current_lease,
current_waitpoint,
created_at,
last_mutation_at,
total_attempt_count,
tags,
}
}
}
/// Attempt id and index pair, used by `ExecutionSnapshot::current_attempt`.
///
/// `#[non_exhaustive]`: build it with [`AttemptSummary::new`] from outside
/// this crate.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct AttemptSummary {
pub attempt_id: AttemptId,
pub attempt_index: AttemptIndex,
}
impl AttemptSummary {
pub fn new(attempt_id: AttemptId, attempt_index: AttemptIndex) -> Self {
Self {
attempt_id,
attempt_index,
}
}
}
/// Lease epoch, holder and expiry, used by `ExecutionSnapshot::current_lease`.
///
/// `#[non_exhaustive]`: build it with [`LeaseSummary::new`] from outside
/// this crate.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct LeaseSummary {
pub lease_epoch: LeaseEpoch,
pub worker_instance_id: WorkerInstanceId,
pub expires_at: TimestampMs,
}
impl LeaseSummary {
pub fn new(
lease_epoch: LeaseEpoch,
worker_instance_id: WorkerInstanceId,
expires_at: TimestampMs,
) -> Self {
Self {
lease_epoch,
worker_instance_id,
expires_at,
}
}
}
/// Typed snapshot of a flow.
///
/// Derives no serde traits. `#[non_exhaustive]`: build it with
/// [`FlowSnapshot::new`] from outside this crate.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct FlowSnapshot {
pub flow_id: FlowId,
pub flow_kind: String,
pub namespace: Namespace,
/// State as a free-form string; its vocabulary is not defined in this file.
pub public_flow_state: String,
pub graph_revision: u64,
pub node_count: u32,
pub edge_count: u32,
pub created_at: TimestampMs,
pub last_mutation_at: TimestampMs,
pub cancelled_at: Option<TimestampMs>,
pub cancel_reason: Option<String>,
pub cancellation_policy: Option<String>,
pub tags: BTreeMap<String, String>,
pub edge_groups: Vec<EdgeGroupSnapshot>,
}
impl FlowSnapshot {
#[allow(clippy::too_many_arguments)]
pub fn new(
flow_id: FlowId,
flow_kind: String,
namespace: Namespace,
public_flow_state: String,
graph_revision: u64,
node_count: u32,
edge_count: u32,
created_at: TimestampMs,
last_mutation_at: TimestampMs,
cancelled_at: Option<TimestampMs>,
cancel_reason: Option<String>,
cancellation_policy: Option<String>,
tags: BTreeMap<String, String>,
edge_groups: Vec<EdgeGroupSnapshot>,
) -> Self {
Self {
flow_id,
flow_kind,
namespace,
public_flow_state,
graph_revision,
node_count,
edge_count,
created_at,
last_mutation_at,
cancelled_at,
cancel_reason,
cancellation_policy,
tags,
edge_groups,
}
}
}
/// Snapshot of one edge between an upstream and a downstream execution
/// inside a flow. String-typed fields are stored raw; their vocabulary is
/// not defined in this struct.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct EdgeSnapshot {
/// Identifier of the edge.
pub edge_id: EdgeId,
/// Flow the edge belongs to.
pub flow_id: FlowId,
/// Execution on the upstream end of the edge.
pub upstream_execution_id: ExecutionId,
/// Execution on the downstream end of the edge.
pub downstream_execution_id: ExecutionId,
/// Dependency kind, as a raw string.
pub dependency_kind: String,
/// Satisfaction condition, as a raw string.
pub satisfaction_condition: String,
/// Optional data-passing reference.
pub data_passing_ref: Option<String>,
/// Edge state, as a raw string.
pub edge_state: String,
/// Creation timestamp.
pub created_at: TimestampMs,
/// Creator of the edge, as a raw string.
pub created_by: String,
}
/// Direction of an edge relative to one execution, carrying that execution's
/// id in either variant.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EdgeDirection {
    /// Edges leaving `from_node`.
    Outgoing { from_node: ExecutionId },
    /// Edges arriving at `to_node`.
    Incoming { to_node: ExecutionId },
}

impl EdgeDirection {
    /// Returns the execution id stored in the variant (`from_node` for
    /// `Outgoing`, `to_node` for `Incoming`).
    pub fn subject(&self) -> &ExecutionId {
        // Both variants carry exactly one id, so a single irrefutable
        // or-pattern extracts it.
        let (Self::Outgoing { from_node: node } | Self::Incoming { to_node: node }) = self;
        node
    }
}
impl EdgeSnapshot {
#[allow(clippy::too_many_arguments)]
pub fn new(
edge_id: EdgeId,
flow_id: FlowId,
upstream_execution_id: ExecutionId,
downstream_execution_id: ExecutionId,
dependency_kind: String,
satisfaction_condition: String,
data_passing_ref: Option<String>,
edge_state: String,
created_at: TimestampMs,
created_by: String,
) -> Self {
Self {
edge_id,
flow_id,
upstream_execution_id,
downstream_execution_id,
dependency_kind,
satisfaction_condition,
data_passing_ref,
edge_state,
created_at,
created_by,
}
}
}
/// Policy describing how a group of incoming dependencies is evaluated.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// snake_case under the `kind` key.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum EdgeDependencyPolicy {
/// Variant without parameters.
AllOf,
/// Variant carrying only the follow-up action.
AnyOf {
/// Action to apply once the policy is satisfied.
#[serde(rename = "on_satisfied")]
on_satisfied: OnSatisfied,
},
/// Variant carrying a threshold `k` plus the follow-up action.
Quorum {
/// Threshold value. NOTE(review): presumably the number of dependencies
/// that must be satisfied — confirm against the evaluator.
k: u32,
/// Action to apply once the policy is satisfied.
#[serde(rename = "on_satisfied")]
on_satisfied: OnSatisfied,
},
}
impl EdgeDependencyPolicy {
pub fn all_of() -> Self {
Self::AllOf
}
pub fn any_of(on_satisfied: OnSatisfied) -> Self {
Self::AnyOf { on_satisfied }
}
pub fn quorum(k: u32, on_satisfied: OnSatisfied) -> Self {
Self::Quorum { k, on_satisfied }
}
pub fn variant_str(&self) -> &'static str {
match self {
Self::AllOf => "all_of",
Self::AnyOf { .. } => "any_of",
Self::Quorum { .. } => "quorum",
}
}
}
/// Follow-up action attached to `AnyOf` / `Quorum` dependency policies.
///
/// Serialized with snake_case variant names.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
#[serde(rename_all = "snake_case")]
pub enum OnSatisfied {
/// NOTE(review): presumably cancels the dependencies still outstanding —
/// confirm against the evaluator.
CancelRemaining,
/// NOTE(review): presumably leaves the remaining dependencies running —
/// confirm against the evaluator.
LetRun,
}
impl OnSatisfied {
pub fn cancel_remaining() -> Self {
Self::CancelRemaining
}
pub fn let_run() -> Self {
Self::LetRun
}
pub fn variant_str(&self) -> &'static str {
match self {
Self::CancelRemaining => "cancel_remaining",
Self::LetRun => "let_run",
}
}
}
/// State of an edge group. Serialized with snake_case variant names.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
#[serde(rename_all = "snake_case")]
pub enum EdgeGroupState {
/// Also the fallback produced by `from_literal` for unrecognised input.
Pending,
Satisfied,
Impossible,
Cancelled,
}
impl EdgeGroupState {
pub fn from_literal(s: &str) -> Self {
match s {
"satisfied" => Self::Satisfied,
"impossible" => Self::Impossible,
"cancelled" => Self::Cancelled,
_ => Self::Pending,
}
}
pub fn as_str(&self) -> &'static str {
match self {
Self::Pending => "pending",
Self::Satisfied => "satisfied",
Self::Impossible => "impossible",
Self::Cancelled => "cancelled",
}
}
}
/// Snapshot of the edge group feeding one downstream execution: its policy,
/// dependency counters and current group state.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct EdgeGroupSnapshot {
    /// Execution the group's edges point at.
    pub downstream_execution_id: ExecutionId,
    /// Dependency policy of the group.
    pub policy: EdgeDependencyPolicy,
    /// Total number of dependencies in the group.
    pub total_deps: u32,
    /// Number of satisfied dependencies.
    pub satisfied_count: u32,
    /// Number of failed dependencies.
    pub failed_count: u32,
    /// Number of skipped dependencies.
    pub skipped_count: u32,
    /// Number of running dependencies.
    pub running_count: u32,
    /// Current state of the group.
    pub group_state: EdgeGroupState,
}

impl EdgeGroupSnapshot {
    /// Builds an `EdgeGroupSnapshot` from one value per field.
    ///
    /// The counters are stored as given; their consistency is not checked.
    // One parameter per struct field, hence the lint exemption.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        downstream_execution_id: ExecutionId,
        policy: EdgeDependencyPolicy,
        total_deps: u32,
        satisfied_count: u32,
        failed_count: u32,
        skipped_count: u32,
        running_count: u32,
        group_state: EdgeGroupState,
    ) -> Self {
        EdgeGroupSnapshot {
            downstream_execution_id,
            policy,
            total_deps,
            satisfied_count,
            failed_count,
            skipped_count,
            running_count,
            group_state,
        }
    }
}
/// Arguments for setting the dependency policy of one edge group.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SetEdgeGroupPolicyArgs {
/// Flow containing the edge group.
pub flow_id: FlowId,
/// Downstream execution identifying the edge group.
pub downstream_execution_id: ExecutionId,
/// Policy to set.
pub policy: EdgeDependencyPolicy,
/// Caller-supplied current time.
pub now: TimestampMs,
}
/// Outcome of a set-edge-group-policy request.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SetEdgeGroupPolicyResult {
/// The policy was set.
Set,
/// NOTE(review): presumably the same policy was already in place — confirm
/// against the backend implementation.
AlreadySet,
}
/// Coarse flow status derived from a raw public flow state string.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum FlowStatus {
    /// Raw state was `"open"`, `"running"` or `"blocked"`.
    Active,
    /// Raw state was `"completed"`.
    Completed,
    /// Raw state was `"failed"`.
    Failed,
    /// Raw state was `"cancelled"`.
    Cancelled,
    /// Raw state was not one of the recognised literals.
    Unknown,
}

impl FlowStatus {
    /// Classifies a raw public flow state string.
    ///
    /// Matching is exact and case-sensitive; unrecognised input (including
    /// the empty string) yields `Unknown`.
    pub fn from_public_flow_state(raw: &str) -> Self {
        // The three in-progress literals collapse into one status.
        if matches!(raw, "open" | "running" | "blocked") {
            return FlowStatus::Active;
        }
        match raw {
            "completed" => FlowStatus::Completed,
            "failed" => FlowStatus::Failed,
            "cancelled" => FlowStatus::Cancelled,
            _ => FlowStatus::Unknown,
        }
    }
}
/// Compact listing entry for a flow.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct FlowSummary {
    /// Identifier of the flow.
    pub flow_id: FlowId,
    /// Creation timestamp.
    pub created_at: TimestampMs,
    /// Coarse status of the flow.
    pub status: FlowStatus,
}

impl FlowSummary {
    /// Builds a summary holding the three given values unchanged.
    pub fn new(flow_id: FlowId, created_at: TimestampMs, status: FlowStatus) -> Self {
        FlowSummary {
            flow_id,
            created_at,
            status,
        }
    }
}
/// One page of a flow listing.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ListFlowsPage {
    /// Flows on this page.
    pub flows: Vec<FlowSummary>,
    /// Cursor for the following page, if one was supplied.
    pub next_cursor: Option<FlowId>,
}

impl ListFlowsPage {
    /// Builds a page from its entries and optional continuation cursor.
    pub fn new(flows: Vec<FlowSummary>, next_cursor: Option<FlowId>) -> Self {
        ListFlowsPage { flows, next_cursor }
    }
}
/// Pairs a state vector with the index of the current attempt.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StateSummary {
/// State vector of the execution.
pub state_vector: StateVector,
/// Index of the current attempt.
pub current_attempt_index: AttemptIndex,
}
// Unit tests for argument/result serde round-trips and for `StreamCursor`
// parsing, formatting and wire mapping.
#[cfg(test)]
mod tests {
use super::*;
use crate::types::FlowId;
// JSON round-trip of `CreateExecutionArgs`; only `execution_id` is compared.
#[test]
fn create_execution_args_serde() {
let config = crate::partition::PartitionConfig::default();
let args = CreateExecutionArgs {
execution_id: ExecutionId::for_flow(&FlowId::new(), &config),
namespace: Namespace::new("test"),
lane_id: LaneId::new("default"),
execution_kind: "llm_call".to_owned(),
input_payload: b"hello".to_vec(),
payload_encoding: Some("json".to_owned()),
priority: 0,
creator_identity: "test-user".to_owned(),
idempotency_key: None,
tags: HashMap::new(),
policy: None,
delay_until: None,
execution_deadline_at: None,
partition_id: 42,
now: TimestampMs::now(),
};
let json = serde_json::to_string(&args).unwrap();
let parsed: CreateExecutionArgs = serde_json::from_str(&json).unwrap();
assert_eq!(args.execution_id, parsed.execution_id);
}
// JSON round-trip of `ClaimExecutionResult::Claimed`, compared in full.
#[test]
fn claim_result_serde() {
let config = crate::partition::PartitionConfig::default();
let result = ClaimExecutionResult::Claimed(ClaimedExecution {
execution_id: ExecutionId::for_flow(&FlowId::new(), &config),
lease_id: LeaseId::new(),
lease_epoch: LeaseEpoch::new(1),
attempt_index: AttemptIndex::new(0),
attempt_id: AttemptId::new(),
attempt_type: AttemptType::Initial,
lease_expires_at: TimestampMs::from_millis(1000),
});
let json = serde_json::to_string(&result).unwrap();
let parsed: ClaimExecutionResult = serde_json::from_str(&json).unwrap();
assert_eq!(result, parsed);
}
// `Display` prints "start" / "end" / the raw id.
#[test]
fn stream_cursor_display_matches_wire_tokens() {
assert_eq!(StreamCursor::Start.to_string(), "start");
assert_eq!(StreamCursor::End.to_string(), "end");
assert_eq!(StreamCursor::At("123".into()).to_string(), "123");
assert_eq!(StreamCursor::At("123-4".into()).to_string(), "123-4");
}
// `to_wire` maps Start/End to "-" / "+" and passes ids through.
#[test]
fn stream_cursor_to_wire_maps_to_valkey_markers() {
assert_eq!(StreamCursor::Start.to_wire(), "-");
assert_eq!(StreamCursor::End.to_wire(), "+");
assert_eq!(StreamCursor::At("0-0".into()).to_wire(), "0-0");
assert_eq!(StreamCursor::At("17-3".into()).to_wire(), "17-3");
}
// `from_str` accepts "start", "end", and ids with or without a "-seq" part.
#[test]
fn stream_cursor_from_str_accepts_wire_tokens() {
use std::str::FromStr;
assert_eq!(
StreamCursor::from_str("start").unwrap(),
StreamCursor::Start
);
assert_eq!(StreamCursor::from_str("end").unwrap(), StreamCursor::End);
assert_eq!(
StreamCursor::from_str("123").unwrap(),
StreamCursor::At("123".into())
);
assert_eq!(
StreamCursor::from_str("0-0").unwrap(),
StreamCursor::At("0-0".into())
);
assert_eq!(
StreamCursor::from_str("1713100800150-0").unwrap(),
StreamCursor::At("1713100800150-0".into())
);
}
// The bare markers "-" and "+" are rejected with `BareMarkerRejected`.
#[test]
fn stream_cursor_from_str_rejects_bare_markers() {
use std::str::FromStr;
assert!(matches!(
StreamCursor::from_str("-"),
Err(StreamCursorParseError::BareMarkerRejected(s)) if s == "-"
));
assert!(matches!(
StreamCursor::from_str("+"),
Err(StreamCursorParseError::BareMarkerRejected(s)) if s == "+"
));
}
// The empty string is rejected with `Empty`.
#[test]
fn stream_cursor_from_str_rejects_empty() {
use std::str::FromStr;
assert_eq!(
StreamCursor::from_str(""),
Err(StreamCursorParseError::Empty)
);
}
// Non-numeric, mis-delimited and wrongly-cased inputs are `Malformed`.
#[test]
fn stream_cursor_from_str_rejects_malformed() {
use std::str::FromStr;
for bad in [
"abc", "-1", "1-", "-1-2", "1-2-3", "1.2", "1 2", "Start", "END",
] {
assert!(
matches!(
StreamCursor::from_str(bad),
Err(StreamCursorParseError::Malformed(_))
),
"must reject {bad:?}",
);
}
}
// An en dash (U+2013) in place of the ASCII hyphen is `Malformed`.
#[test]
fn stream_cursor_from_str_rejects_non_ascii() {
use std::str::FromStr;
assert!(matches!(
StreamCursor::from_str("1\u{2013}2"),
Err(StreamCursorParseError::Malformed(_))
));
}
// Serialize-then-deserialize yields the original cursor for every variant.
#[test]
fn stream_cursor_serde_round_trip() {
for c in [
StreamCursor::Start,
StreamCursor::End,
StreamCursor::At("0-0".into()),
StreamCursor::At("1713100800150-0".into()),
] {
let json = serde_json::to_string(&c).unwrap();
let back: StreamCursor = serde_json::from_str(&json).unwrap();
assert_eq!(back, c);
}
}
// The JSON form is a plain string, not a tagged object.
#[test]
fn stream_cursor_serializes_as_bare_string() {
assert_eq!(
serde_json::to_string(&StreamCursor::Start).unwrap(),
r#""start""#
);
assert_eq!(
serde_json::to_string(&StreamCursor::End).unwrap(),
r#""end""#
);
assert_eq!(
serde_json::to_string(&StreamCursor::At("123-0".into())).unwrap(),
r#""123-0""#
);
}
// Deserialization also refuses the bare "-" / "+" markers.
#[test]
fn stream_cursor_deserialize_rejects_bare_markers() {
assert!(serde_json::from_str::<StreamCursor>(r#""-""#).is_err());
assert!(serde_json::from_str::<StreamCursor>(r#""+""#).is_err());
}
// `from_beginning()` is the concrete cursor "0-0".
#[test]
fn stream_cursor_from_beginning_is_zero_zero() {
assert_eq!(
StreamCursor::from_beginning(),
StreamCursor::At("0-0".into())
);
}
// Only `At(..)` cursors count as concrete.
#[test]
fn stream_cursor_is_concrete_classifies_variants() {
assert!(!StreamCursor::Start.is_concrete());
assert!(!StreamCursor::End.is_concrete());
assert!(StreamCursor::At("0-0".into()).is_concrete());
assert!(StreamCursor::At("123-0".into()).is_concrete());
assert!(StreamCursor::from_beginning().is_concrete());
}
// `into_wire_string` consumes the cursor and yields the same text as `to_wire`.
#[test]
fn stream_cursor_into_wire_string_moves_without_cloning() {
assert_eq!(StreamCursor::Start.into_wire_string(), "-");
assert_eq!(StreamCursor::End.into_wire_string(), "+");
assert_eq!(StreamCursor::At("17-3".into()).into_wire_string(), "17-3");
}
}
/// Listing entry for an execution. Apart from `execution_id` and `priority`,
/// every field is carried as a plain string.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionSummary {
/// Identifier of the execution.
pub execution_id: ExecutionId,
/// Namespace, as a string.
pub namespace: String,
/// Lane id, as a string.
pub lane_id: String,
/// Execution kind, as a string.
pub execution_kind: String,
/// Public state, as a string.
pub public_state: String,
/// Priority value.
pub priority: i32,
/// Creation time, as a string. NOTE(review): the format is not defined
/// here — confirm with the producer.
pub created_at: String,
}
/// Result of an execution listing.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ListExecutionsResult {
/// Listed executions.
pub executions: Vec<ExecutionSummary>,
/// Count reported alongside the entries. NOTE(review): presumably equal to
/// `executions.len()` — nothing here enforces it.
pub total_returned: usize,
}
/// One page of a lane listing.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ListLanesPage {
    /// Lanes on this page.
    pub lanes: Vec<LaneId>,
    /// Cursor for the following page, if one was supplied.
    pub next_cursor: Option<LaneId>,
}

impl ListLanesPage {
    /// Builds a page from its entries and optional continuation cursor.
    pub fn new(lanes: Vec<LaneId>, next_cursor: Option<LaneId>) -> Self {
        ListLanesPage { lanes, next_cursor }
    }
}
/// Listing entry for a suspended execution.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct SuspendedExecutionEntry {
    /// Identifier of the suspended execution.
    pub execution_id: ExecutionId,
    /// Suspension time in milliseconds, as a raw `i64`.
    pub suspended_at_ms: i64,
    /// Suspension reason, as a raw string.
    pub reason: String,
}

impl SuspendedExecutionEntry {
    /// Builds an entry holding the three given values unchanged.
    pub fn new(execution_id: ExecutionId, suspended_at_ms: i64, reason: String) -> Self {
        SuspendedExecutionEntry {
            execution_id,
            suspended_at_ms,
            reason,
        }
    }
}
/// One page of a suspended-execution listing.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct ListSuspendedPage {
    /// Entries on this page.
    pub entries: Vec<SuspendedExecutionEntry>,
    /// Cursor for the following page, if one was supplied.
    pub next_cursor: Option<ExecutionId>,
}

impl ListSuspendedPage {
    /// Builds a page from its entries and optional continuation cursor.
    pub fn new(entries: Vec<SuspendedExecutionEntry>, next_cursor: Option<ExecutionId>) -> Self {
        ListSuspendedPage {
            entries,
            next_cursor,
        }
    }
}
/// One page of an execution-id listing.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct ListExecutionsPage {
    /// Execution ids on this page.
    pub executions: Vec<ExecutionId>,
    /// Cursor for the following page, if one was supplied.
    pub next_cursor: Option<ExecutionId>,
}

impl ListExecutionsPage {
    /// Builds a page from its ids and optional continuation cursor.
    pub fn new(executions: Vec<ExecutionId>, next_cursor: Option<ExecutionId>) -> Self {
        ListExecutionsPage {
            executions,
            next_cursor,
        }
    }
}
/// Arguments for rotating the waitpoint HMAC secret.
///
/// `Debug` is implemented by hand rather than derived so that
/// `new_secret_hex` never ends up in logs, panic messages or test output.
#[derive(Clone)]
pub struct RotateWaitpointHmacSecretArgs {
    /// Key id of the new secret.
    pub new_kid: String,
    /// New secret as a hex string. Secret material: redacted from `Debug`.
    pub new_secret_hex: String,
    /// Grace period in milliseconds. NOTE(review): presumably how long the
    /// previous key keeps verifying — confirm against the backend.
    pub grace_ms: u64,
}

impl std::fmt::Debug for RotateWaitpointHmacSecretArgs {
    /// Formats like the derived `Debug`, except that the secret is replaced
    /// by a fixed placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RotateWaitpointHmacSecretArgs")
            .field("new_kid", &self.new_kid)
            .field("new_secret_hex", &"<redacted>")
            .field("grace_ms", &self.grace_ms)
            .finish()
    }
}
/// Outcome of a waitpoint HMAC secret rotation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RotateWaitpointHmacSecretOutcome {
/// A rotation took place.
Rotated {
/// Key id that was current before the rotation, if any.
previous_kid: Option<String>,
/// Key id that is current after the rotation.
new_kid: String,
/// Count reported by the rotation. NOTE(review): presumably the number of
/// expired keys garbage-collected — confirm against the backend.
gc_count: u32,
},
/// Nothing changed; carries the key id involved.
Noop { kid: String },
}
/// Arguments for listing waitpoint HMAC key ids. Currently has no fields.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ListWaitpointHmacKidsArgs {}
/// Waitpoint HMAC key ids: the current one plus those still verifying.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WaitpointHmacKids {
/// Current key id, if any.
pub current_kid: Option<String>,
/// Key ids listed as verifying, each with an expiry.
pub verifying: Vec<VerifyingKid>,
}
/// A key id together with its expiry time.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct VerifyingKid {
/// The key id.
pub kid: String,
/// Expiry time in milliseconds, as a raw `i64`.
pub expires_at_ms: i64,
}
use crate::backend::WaitpointHmac;
/// Newtype around a `String` used as an idempotency key.
///
/// `#[serde(transparent)]` makes it serialize exactly like the inner string.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct IdempotencyKey(String);
impl IdempotencyKey {
pub fn new(key: impl Into<String>) -> Self {
Self(key.into())
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl std::fmt::Display for IdempotencyKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
/// Selects which signals a resume condition matches.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum SignalMatcher {
/// Match by the carried name.
ByName(String),
/// Wildcard matcher; used by `ResumeCondition::all_of_waitpoints`.
Wildcard,
}
/// Maximum nesting depth of composite resume conditions accepted by
/// `ResumeCondition::validate_composite`; the outermost composite counts as
/// depth 1.
pub const MAX_COMPOSITE_DEPTH: usize = 4;
/// Body of a composite resume condition.
///
/// Serialized as an internally tagged enum with the variant name under the
/// `kind` key.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind")]
#[non_exhaustive]
pub enum CompositeBody {
/// Conjunction over nested conditions. Validation rejects an empty list.
AllOf {
/// Nested member conditions.
members: Vec<ResumeCondition>,
},
/// Count-based condition over a set of waitpoints.
Count {
/// Required count. Validation rejects zero.
n: u32,
/// What is being counted.
count_kind: CountKind,
/// Optional signal matcher; omitted from the serialized form when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
matcher: Option<SignalMatcher>,
/// Waitpoint keys the count ranges over. Validation rejects an empty list.
waitpoints: Vec<String>,
},
}
/// Unit counted by `CompositeBody::Count`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum CountKind {
/// The only kind for which validation caps `n` at the number of listed
/// waitpoints.
DistinctWaitpoints,
DistinctSignals,
DistinctSources,
}
impl CompositeBody {
pub fn all_of(members: impl IntoIterator<Item = ResumeCondition>) -> Self {
Self::AllOf {
members: members.into_iter().collect(),
}
}
pub fn count(
n: u32,
count_kind: CountKind,
matcher: Option<SignalMatcher>,
waitpoints: impl IntoIterator<Item = String>,
) -> Self {
Self::Count {
n,
count_kind,
matcher,
waitpoints: waitpoints.into_iter().collect(),
}
}
}
/// Condition under which a suspended execution resumes.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum ResumeCondition {
/// One waitpoint key paired with a signal matcher.
Single {
/// Key of the waitpoint.
waitpoint_key: String,
/// Matcher applied to signals.
matcher: SignalMatcher,
},
/// References no waitpoint keys. NOTE(review): presumably resumable only by
/// an operator — confirm against the backend.
OperatorOnly,
/// References no waitpoint keys. NOTE(review): presumably resumable only via
/// timeout — confirm against the backend.
TimeoutOnly,
/// Composite condition; the only variant checked by `validate_composite`.
Composite(CompositeBody),
}
/// Error returned when a composite resume condition fails validation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CompositeValidationError {
    /// Human-readable description of the failed check and where it occurred.
    pub detail: String,
}

impl CompositeValidationError {
    /// Builds an error from anything convertible into a `String`.
    fn new(detail: impl Into<String>) -> Self {
        let detail: String = detail.into();
        CompositeValidationError { detail }
    }
}
impl ResumeCondition {
    /// Builds a composite `AllOf` condition with one wildcard-matched
    /// `Single` member per waitpoint key, in iteration order.
    ///
    /// An empty iterator yields an `AllOf` with no members, which
    /// `validate_composite` rejects.
    pub fn all_of_waitpoints<I, S>(waitpoint_keys: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let mut members: Vec<ResumeCondition> = Vec::new();
        for key in waitpoint_keys {
            members.push(ResumeCondition::Single {
                waitpoint_key: key.into(),
                matcher: SignalMatcher::Wildcard,
            });
        }
        ResumeCondition::Composite(CompositeBody::AllOf { members })
    }

    /// Collects every waitpoint key referenced by this condition.
    ///
    /// Keys appear in first-seen order (depth-first through `AllOf` members
    /// and `Count` waitpoint lists) and each key appears once. `OperatorOnly`
    /// and `TimeoutOnly` contribute nothing.
    pub fn referenced_waitpoint_keys(&self) -> Vec<String> {
        // Appends `key` unless it has been recorded already.
        fn remember(keys: &mut Vec<String>, key: &str) {
            if keys.iter().all(|seen| seen.as_str() != key) {
                keys.push(key.to_owned());
            }
        }

        // Depth-first traversal of one condition.
        fn visit(cond: &ResumeCondition, keys: &mut Vec<String>) {
            match cond {
                ResumeCondition::Single { waitpoint_key, .. } => remember(keys, waitpoint_key),
                ResumeCondition::Composite(body) => match body {
                    CompositeBody::AllOf { members } => {
                        members.iter().for_each(|member| visit(member, keys));
                    }
                    CompositeBody::Count { waitpoints, .. } => {
                        waitpoints.iter().for_each(|key| remember(keys, key));
                    }
                },
                _ => {}
            }
        }

        let mut keys: Vec<String> = Vec::new();
        visit(self, &mut keys);
        keys
    }

    /// Validates the structure of a composite condition.
    ///
    /// Non-composite variants always pass. For `Composite`, the body is
    /// checked starting at depth 1 with an empty path.
    pub fn validate_composite(&self) -> Result<(), CompositeValidationError> {
        if let ResumeCondition::Composite(body) = self {
            validate_body(body, 1, "")
        } else {
            Ok(())
        }
    }
}
/// Recursively validates one composite body.
///
/// `depth` is the nesting level of `body` (1 for the outermost composite) and
/// `path` its location for error messages (empty at the root, rendered as
/// `<root>`). Checks, in order:
///
/// * `depth` must not exceed `MAX_COMPOSITE_DEPTH`;
/// * `AllOf` must have at least one member, and every `Composite` member is
///   validated one level deeper (other member kinds are not inspected);
/// * `Count` must have `n > 0` and a non-empty waitpoint list, and for
///   `CountKind::DistinctWaitpoints` `n` must not exceed the list length.
///
/// The first failed check is returned as the error.
fn validate_body(
    body: &CompositeBody,
    depth: usize,
    path: &str,
) -> Result<(), CompositeValidationError> {
    // Single place that turns a message into the error result.
    fn reject(detail: String) -> Result<(), CompositeValidationError> {
        Err(CompositeValidationError::new(detail))
    }

    // Location label shared by every message emitted at this level.
    let shown = if path.is_empty() { "<root>" } else { path };

    if depth > MAX_COMPOSITE_DEPTH {
        return reject(format!(
            "depth {depth} exceeds cap {} at path {shown}",
            MAX_COMPOSITE_DEPTH
        ));
    }

    match body {
        CompositeBody::AllOf { members } if members.is_empty() => {
            reject(format!("allof_empty_members at path {shown}"))
        }
        CompositeBody::AllOf { members } => {
            members
                .iter()
                .enumerate()
                .try_for_each(|(i, member)| match member {
                    ResumeCondition::Composite(inner) => {
                        let child_path = if path.is_empty() {
                            format!("members[{i}]")
                        } else {
                            format!("{path}.members[{i}]")
                        };
                        validate_body(inner, depth + 1, &child_path)
                    }
                    _ => Ok(()),
                })
        }
        CompositeBody::Count {
            n,
            count_kind,
            waitpoints,
            ..
        } => {
            let wanted = *n;
            let available = waitpoints.len();
            if wanted == 0 {
                return reject(format!("count_n_zero at path {shown}"));
            }
            if available == 0 {
                return reject(format!("count_waitpoints_empty at path {shown}"));
            }
            // Only distinct-waitpoint counts are bounded by the list length.
            let bounded = matches!(count_kind, CountKind::DistinctWaitpoints);
            if bounded && (wanted as usize) > available {
                return reject(format!(
                    "count_exceeds_waitpoint_set: n={wanted} > waitpoints.len()={available} at path {shown}"
                ));
            }
            Ok(())
        }
    }
}
/// Target selected by a resume policy. Currently has a single variant.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum ResumeTarget {
/// The target used by `ResumePolicy::normal`.
Runnable,
}
/// Settings applied when a suspended execution resumes. See
/// `ResumePolicy::normal` for the standard values.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct ResumePolicy {
/// Target of the resume.
pub resume_target: ResumeTarget,
/// Whether matched signals are consumed.
pub consume_matched_signals: bool,
/// Whether the signal buffer is retained until closed.
pub retain_signal_buffer_until_closed: bool,
/// Optional resume delay in milliseconds; omitted from the serialized form
/// when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub resume_delay_ms: Option<u64>,
/// Whether the waitpoint is closed on resume.
pub close_waitpoint_on_resume: bool,
}
impl ResumePolicy {
pub fn normal() -> Self {
Self {
resume_target: ResumeTarget::Runnable,
consume_matched_signals: true,
retain_signal_buffer_until_closed: false,
resume_delay_ms: None,
close_waitpoint_on_resume: true,
}
}
}
/// Behavior selected for a suspension timeout. `as_wire_str` gives the
/// snake_case wire string of each variant.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum TimeoutBehavior {
/// Default used by `SuspendArgs::new`.
Fail,
Cancel,
Expire,
AutoResumeWithTimeoutSignal,
Escalate,
}
impl TimeoutBehavior {
pub fn as_wire_str(self) -> &'static str {
match self {
Self::Fail => "fail",
Self::Cancel => "cancel",
Self::Expire => "expire",
Self::AutoResumeWithTimeoutSignal => "auto_resume_with_timeout_signal",
Self::Escalate => "escalate",
}
}
}
/// Reason code recorded for a suspension. `as_wire_str` gives the snake_case
/// wire string of each variant.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum SuspensionReasonCode {
WaitingForSignal,
WaitingForApproval,
WaitingForCallback,
WaitingForToolResult,
WaitingForOperatorReview,
PausedByPolicy,
PausedByBudget,
StepBoundary,
ManualPause,
}
impl SuspensionReasonCode {
pub fn as_wire_str(self) -> &'static str {
match self {
Self::WaitingForSignal => "waiting_for_signal",
Self::WaitingForApproval => "waiting_for_approval",
Self::WaitingForCallback => "waiting_for_callback",
Self::WaitingForToolResult => "waiting_for_tool_result",
Self::WaitingForOperatorReview => "waiting_for_operator_review",
Self::PausedByPolicy => "paused_by_policy",
Self::PausedByBudget => "paused_by_budget",
Self::StepBoundary => "step_boundary",
Self::ManualPause => "manual_pause",
}
}
}
/// Who requested a suspension. `as_wire_str` gives the snake_case wire string
/// of each variant.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum SuspensionRequester {
/// Default used by `SuspendArgs::new`.
Worker,
Operator,
Policy,
SystemTimeoutPolicy,
}
impl SuspensionRequester {
pub fn as_wire_str(self) -> &'static str {
match self {
Self::Worker => "worker",
Self::Operator => "operator",
Self::Policy => "policy",
Self::SystemTimeoutPolicy => "system_timeout_policy",
}
}
}
/// How a suspension binds to a waitpoint.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum WaitpointBinding {
/// A new waitpoint identified by both id and key (see `fresh`).
Fresh {
/// Identifier of the waitpoint.
waitpoint_id: WaitpointId,
/// Key of the waitpoint.
waitpoint_key: String,
},
/// An already-pending waitpoint, referenced by id only (see `use_pending`).
UsePending {
/// Identifier of the pending waitpoint.
waitpoint_id: WaitpointId,
},
}
impl WaitpointBinding {
pub fn fresh() -> Self {
let wp_id = WaitpointId::new();
let key = format!("wpk:{wp_id}");
Self::Fresh {
waitpoint_id: wp_id,
waitpoint_key: key,
}
}
pub fn use_pending(pending: &crate::backend::PendingWaitpoint) -> Self {
Self::UsePending {
waitpoint_id: pending.waitpoint_id.clone(),
}
}
}
/// Arguments for suspending an execution.
///
/// Optional fields are omitted from the serialized form when `None` and
/// default to `None` when absent on input.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct SuspendArgs {
/// Identifier of the suspension.
pub suspension_id: SuspensionId,
/// Waitpoint bindings; `primary()` returns the first one.
pub waitpoints: Vec<WaitpointBinding>,
/// Condition under which the execution resumes.
pub resume_condition: ResumeCondition,
/// Policy applied on resume.
pub resume_policy: ResumePolicy,
/// Reason code for the suspension.
pub reason_code: SuspensionReasonCode,
/// Who requested the suspension.
pub requested_by: SuspensionRequester,
/// Optional timeout timestamp.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub timeout_at: Option<TimestampMs>,
/// Behavior selected for the timeout.
pub timeout_behavior: TimeoutBehavior,
/// Optional continuation metadata pointer.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub continuation_metadata_pointer: Option<String>,
/// Caller-supplied current time.
pub now: TimestampMs,
/// Optional idempotency key.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub idempotency_key: Option<IdempotencyKey>,
}
impl SuspendArgs {
    /// Creates suspension arguments with exactly one waitpoint binding.
    ///
    /// Fields not covered by a parameter get these defaults: `requested_by`
    /// is `Worker`, `timeout_behavior` is `Fail`, and `timeout_at`,
    /// `continuation_metadata_pointer` and `idempotency_key` are `None`.
    /// Use the `with_*` builders to override them.
    pub fn new(
        suspension_id: SuspensionId,
        waitpoint: WaitpointBinding,
        resume_condition: ResumeCondition,
        resume_policy: ResumePolicy,
        reason_code: SuspensionReasonCode,
        now: TimestampMs,
    ) -> Self {
        Self {
            suspension_id,
            waitpoints: vec![waitpoint],
            resume_condition,
            resume_policy,
            reason_code,
            requested_by: SuspensionRequester::Worker,
            timeout_at: None,
            timeout_behavior: TimeoutBehavior::Fail,
            continuation_metadata_pointer: None,
            now,
            idempotency_key: None,
        }
    }

    /// Returns the first waitpoint binding.
    ///
    /// # Panics
    ///
    /// Panics if `waitpoints` is empty. `new` always stores one binding, but
    /// the field is public, `with_waitpoints` replaces the whole list, and
    /// the type is deserializable, so an empty list can be constructed.
    pub fn primary(&self) -> &WaitpointBinding {
        self.waitpoints
            .first()
            .expect("SuspendArgs::primary requires at least one waitpoint binding")
    }

    /// Sets both the timeout timestamp and the timeout behavior.
    pub fn with_timeout(mut self, at: TimestampMs, behavior: TimeoutBehavior) -> Self {
        self.timeout_at = Some(at);
        self.timeout_behavior = behavior;
        self
    }

    /// Overrides who requested the suspension.
    pub fn with_requester(mut self, requester: SuspensionRequester) -> Self {
        self.requested_by = requester;
        self
    }

    /// Sets the continuation metadata pointer.
    pub fn with_continuation_metadata_pointer(mut self, p: String) -> Self {
        self.continuation_metadata_pointer = Some(p);
        self
    }

    /// Sets the idempotency key.
    pub fn with_idempotency_key(mut self, key: IdempotencyKey) -> Self {
        self.idempotency_key = Some(key);
        self
    }

    /// Appends one more waitpoint binding after the existing ones.
    pub fn with_waitpoint(mut self, binding: WaitpointBinding) -> Self {
        self.waitpoints.push(binding);
        self
    }

    /// Replaces the entire binding list (including the one given to `new`).
    ///
    /// Passing an empty vector makes a later `primary()` call panic.
    pub fn with_waitpoints(mut self, bindings: Vec<WaitpointBinding>) -> Self {
        self.waitpoints = bindings;
        self
    }
}
/// Details common to every suspend outcome: the suspension id, one waitpoint
/// (id, key, token) and any additional waitpoint bindings.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct SuspendOutcomeDetails {
/// Identifier of the suspension.
pub suspension_id: SuspensionId,
/// Identifier of the waitpoint.
pub waitpoint_id: WaitpointId,
/// Key of the waitpoint.
pub waitpoint_key: String,
/// Token for the waitpoint.
pub waitpoint_token: WaitpointHmac,
/// Further waitpoint bindings; empty when built with `new`.
pub additional_waitpoints: Vec<AdditionalWaitpointBinding>,
}
/// One extra waitpoint reported alongside a suspend outcome: id, key, token.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct AdditionalWaitpointBinding {
    /// Identifier of the waitpoint.
    pub waitpoint_id: WaitpointId,
    /// Key of the waitpoint.
    pub waitpoint_key: String,
    /// Token for the waitpoint.
    pub waitpoint_token: WaitpointHmac,
}

impl AdditionalWaitpointBinding {
    /// Builds a binding holding the three given values unchanged.
    pub fn new(
        waitpoint_id: WaitpointId,
        waitpoint_key: String,
        waitpoint_token: WaitpointHmac,
    ) -> Self {
        AdditionalWaitpointBinding {
            waitpoint_id,
            waitpoint_key,
            waitpoint_token,
        }
    }
}
impl SuspendOutcomeDetails {
pub fn new(
suspension_id: SuspensionId,
waitpoint_id: WaitpointId,
waitpoint_key: String,
waitpoint_token: WaitpointHmac,
) -> Self {
Self {
suspension_id,
waitpoint_id,
waitpoint_key,
waitpoint_token,
additional_waitpoints: Vec::new(),
}
}
pub fn with_additional_waitpoints(
mut self,
extras: Vec<AdditionalWaitpointBinding>,
) -> Self {
self.additional_waitpoints = extras;
self
}
}
/// Result of a suspend request.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum SuspendOutcome {
    /// Carries the details together with a backend handle.
    Suspended {
        details: SuspendOutcomeDetails,
        handle: crate::backend::Handle,
    },
    /// Carries the details only. NOTE(review): presumably the resume
    /// condition was already met at suspend time — confirm against the
    /// backend.
    AlreadySatisfied { details: SuspendOutcomeDetails },
}

impl SuspendOutcome {
    /// Returns the details carried by either variant.
    pub fn details(&self) -> &SuspendOutcomeDetails {
        // Both variants hold a `details` field, so one irrefutable
        // or-pattern covers them.
        let (Self::Suspended { details, .. } | Self::AlreadySatisfied { details }) = self;
        details
    }
}
// Unit tests for `ResumeCondition::validate_composite`: each structural
// rejection plus the nesting-depth boundary.
#[cfg(test)]
mod rfc_014_validation_tests {
use super::*;
// Helper: a `Single` condition on `wp` matching the signal name "x".
fn single(wp: &str) -> ResumeCondition {
ResumeCondition::Single {
waitpoint_key: wp.to_owned(),
matcher: SignalMatcher::ByName("x".to_owned()),
}
}
// Non-composite conditions always validate.
#[test]
fn single_passes_validate() {
assert!(single("wpk:a").validate_composite().is_ok());
}
// `AllOf` with no members is rejected.
#[test]
fn allof_empty_members_rejected() {
let c = ResumeCondition::Composite(CompositeBody::AllOf { members: vec![] });
let e = c.validate_composite().unwrap_err();
assert!(e.detail.contains("allof_empty_members"), "{}", e.detail);
}
// `Count` with `n == 0` is rejected.
#[test]
fn count_n_zero_rejected() {
let c = ResumeCondition::Composite(CompositeBody::Count {
n: 0,
count_kind: CountKind::DistinctWaitpoints,
matcher: None,
waitpoints: vec!["wpk:a".to_owned()],
});
let e = c.validate_composite().unwrap_err();
assert!(e.detail.contains("count_n_zero"), "{}", e.detail);
}
// `Count` with an empty waitpoint list is rejected.
#[test]
fn count_waitpoints_empty_rejected() {
let c = ResumeCondition::Composite(CompositeBody::Count {
n: 1,
count_kind: CountKind::DistinctSources,
matcher: None,
waitpoints: vec![],
});
let e = c.validate_composite().unwrap_err();
assert!(e.detail.contains("count_waitpoints_empty"), "{}", e.detail);
}
// `n` larger than the waitpoint list is rejected for `DistinctWaitpoints`
// but accepted for `DistinctSignals`.
#[test]
fn count_exceeds_waitpoint_set_rejected_only_for_distinct_waitpoints() {
let c = ResumeCondition::Composite(CompositeBody::Count {
n: 3,
count_kind: CountKind::DistinctWaitpoints,
matcher: None,
waitpoints: vec!["a".into(), "b".into()],
});
let e = c.validate_composite().unwrap_err();
assert!(e.detail.contains("count_exceeds_waitpoint_set"), "{}", e.detail);
let c2 = ResumeCondition::Composite(CompositeBody::Count {
n: 3,
count_kind: CountKind::DistinctSignals,
matcher: None,
waitpoints: vec!["a".into(), "b".into()],
});
assert!(c2.validate_composite().is_ok());
}
// Four nested composites pass; wrapping once more exceeds the depth cap.
#[test]
fn depth_4_accepted_depth_5_rejected() {
let leaf = single("wpk:leaf");
let d4 = ResumeCondition::Composite(CompositeBody::AllOf {
members: vec![ResumeCondition::Composite(CompositeBody::AllOf {
members: vec![ResumeCondition::Composite(CompositeBody::AllOf {
members: vec![ResumeCondition::Composite(CompositeBody::AllOf {
members: vec![leaf.clone()],
})],
})],
})],
});
assert!(d4.validate_composite().is_ok());
let d5 = ResumeCondition::Composite(CompositeBody::AllOf {
members: vec![d4],
});
let e = d5.validate_composite().unwrap_err();
assert!(e.detail.contains("exceeds cap"), "{}", e.detail);
}
}