use crate::policy::ExecutionPolicy;
use crate::state::{AttemptType, PublicState, StateVector};
use crate::types::{
AttemptId, AttemptIndex, CancelSource, EdgeId, ExecutionId, FlowId, LaneId, LeaseEpoch,
LeaseFence, LeaseId, Namespace, SignalId, SuspensionId, TimestampMs, WaitpointId,
WaitpointToken, WorkerId, WorkerInstanceId,
};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet, HashMap};
/// Input record for the create-execution operation.
///
/// Every field marked `#[serde(default)]` may be omitted from the serialized
/// form and then takes its type's `Default` value (`None`, or an empty map
/// for `tags`).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateExecutionArgs {
pub execution_id: ExecutionId,
pub namespace: Namespace,
pub lane_id: LaneId,
pub execution_kind: String,
/// Raw input bytes; presumably interpreted according to `payload_encoding`
/// (confirm against the consumer).
pub input_payload: Vec<u8>,
#[serde(default)]
pub payload_encoding: Option<String>,
pub priority: i32,
pub creator_identity: String,
#[serde(default)]
pub idempotency_key: Option<String>,
/// Free-form string tags; an absent field deserializes to an empty map.
#[serde(default)]
pub tags: HashMap<String, String>,
#[serde(default)]
pub policy: Option<ExecutionPolicy>,
#[serde(default)]
pub delay_until: Option<TimestampMs>,
#[serde(default)]
pub execution_deadline_at: Option<TimestampMs>,
pub partition_id: u16,
/// Timestamp supplied by the caller with the request.
pub now: TimestampMs,
}
/// Outcome reported by the create-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateExecutionResult {
/// A new execution was created; carries its id and resulting public state.
Created {
execution_id: ExecutionId,
public_state: PublicState,
},
/// Carries the id of an execution reported as a duplicate (presumably
/// detected through the idempotency key — confirm with the implementation).
Duplicate { execution_id: ExecutionId },
}
/// Input record for the issue-claim-grant operation.
///
/// The optional string fields and `worker_capabilities` are all
/// `#[serde(default)]`: when absent they deserialize to `None` / an empty set.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IssueClaimGrantArgs {
pub execution_id: ExecutionId,
pub lane_id: LaneId,
pub worker_id: WorkerId,
pub worker_instance_id: WorkerInstanceId,
#[serde(default)]
pub capability_hash: Option<String>,
#[serde(default)]
pub route_snapshot_json: Option<String>,
#[serde(default)]
pub admission_summary: Option<String>,
/// Ordered set of capability strings; empty when the field is omitted.
#[serde(default)]
pub worker_capabilities: BTreeSet<String>,
/// Grant time-to-live (milliseconds, going by the `_ms` suffix).
pub grant_ttl_ms: u64,
pub now: TimestampMs,
}
/// Outcome reported by the issue-claim-grant operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum IssueClaimGrantResult {
/// The grant was issued for the given execution.
Granted { execution_id: ExecutionId },
}
/// In-process description of a claim grant.
///
/// Note that this type derives neither `Serialize` nor `Deserialize`, unlike
/// the `*Args` / `*Result` types in this file.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ClaimGrant {
pub execution_id: ExecutionId,
/// Unparsed partition key; see `ClaimGrant::partition` for the parsed form.
pub partition_key: crate::partition::PartitionKey,
pub grant_key: String,
/// Expiry as a plain `u64` (milliseconds, going by the `_ms` suffix).
pub expires_at_ms: u64,
}
impl ClaimGrant {
    /// Parses the stored `partition_key` into a typed partition.
    ///
    /// # Errors
    ///
    /// Returns `PartitionKeyParseError` when the key cannot be parsed.
    pub fn partition(
        &self,
    ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
        let key = &self.partition_key;
        key.parse()
    }
}
/// In-process description of a reclaim grant.
///
/// Same shape as `ClaimGrant` plus the `lane_id`; it likewise has no serde
/// derives.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReclaimGrant {
pub execution_id: ExecutionId,
/// Unparsed partition key; see `ReclaimGrant::partition` for the parsed form.
pub partition_key: crate::partition::PartitionKey,
pub grant_key: String,
/// Expiry as a plain `u64` (milliseconds, going by the `_ms` suffix).
pub expires_at_ms: u64,
pub lane_id: LaneId,
}
impl ReclaimGrant {
    /// Parses the stored `partition_key` into a typed partition.
    ///
    /// # Errors
    ///
    /// Returns `PartitionKeyParseError` when the key cannot be parsed.
    pub fn partition(
        &self,
    ) -> Result<crate::partition::Partition, crate::partition::PartitionKeyParseError> {
        let key = &self.partition_key;
        key.parse()
    }
}
/// Input record for the claim-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ClaimExecutionArgs {
pub execution_id: ExecutionId,
pub worker_id: WorkerId,
pub worker_instance_id: WorkerInstanceId,
pub lane_id: LaneId,
pub lease_id: LeaseId,
pub lease_ttl_ms: u64,
pub attempt_id: AttemptId,
pub expected_attempt_index: AttemptIndex,
/// JSON text carried as a plain string; an absent field deserializes to
/// the empty string.
#[serde(default)]
pub attempt_policy_json: String,
#[serde(default)]
pub attempt_timeout_ms: Option<u64>,
// NOTE(review): this is a raw `i64`, whereas
// `CreateExecutionArgs::execution_deadline_at` uses `TimestampMs` —
// confirm whether the difference is intentional.
#[serde(default)]
pub execution_deadline_at: Option<i64>,
pub now: TimestampMs,
}
/// Payload of `ClaimExecutionResult::Claimed`: identifies the execution, the
/// lease and the attempt that were handed out.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ClaimedExecution {
pub execution_id: ExecutionId,
pub lease_id: LeaseId,
pub lease_epoch: LeaseEpoch,
pub attempt_index: AttemptIndex,
pub attempt_id: AttemptId,
pub attempt_type: AttemptType,
pub lease_expires_at: TimestampMs,
}
/// Outcome reported by the claim-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ClaimExecutionResult {
/// The claim succeeded; details are in the wrapped `ClaimedExecution`.
Claimed(ClaimedExecution),
}
/// Input record for the complete-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CompleteExecutionArgs {
pub execution_id: ExecutionId,
/// Optional lease fence; `None` when omitted.
#[serde(default)]
pub fence: Option<LeaseFence>,
pub attempt_index: AttemptIndex,
#[serde(default)]
pub result_payload: Option<Vec<u8>>,
#[serde(default)]
pub result_encoding: Option<String>,
/// Falls back to `CancelSource::default()` when omitted.
#[serde(default)]
pub source: CancelSource,
pub now: TimestampMs,
}
/// Outcome reported by the complete-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CompleteExecutionResult {
/// Carries the execution id and its public state after completion.
Completed {
execution_id: ExecutionId,
public_state: PublicState,
},
}
/// Input record for the renew-lease operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RenewLeaseArgs {
pub execution_id: ExecutionId,
pub attempt_index: AttemptIndex,
// NOTE(review): no `#[serde(default)]` here, unlike e.g.
// `CompleteExecutionArgs::fence` — confirm whether that is deliberate.
pub fence: Option<LeaseFence>,
pub lease_ttl_ms: u64,
/// When omitted, filled in by `default_lease_history_grace_ms()`.
#[serde(default = "default_lease_history_grace_ms")]
pub lease_history_grace_ms: u64,
}
/// Serde fallback for `RenewLeaseArgs::lease_history_grace_ms`: 60 000
/// (milliseconds, going by the field's `_ms` suffix).
fn default_lease_history_grace_ms() -> u64 {
    const GRACE_MS: u64 = 60 * 1_000;
    GRACE_MS
}
/// Outcome reported by the renew-lease operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum RenewLeaseResult {
/// The lease was renewed; `expires_at` is the reported expiry timestamp.
Renewed { expires_at: TimestampMs },
}
/// Input record for the mark-lease-expired operation; only the execution id
/// is required.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MarkLeaseExpiredArgs {
pub execution_id: ExecutionId,
}
/// Outcome reported by the mark-lease-expired operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum MarkLeaseExpiredResult {
/// The lease was marked expired.
MarkedExpired,
/// Nothing was changed; `reason` is a free-form explanation string.
AlreadySatisfied { reason: String },
}
/// Input record for the cancel-execution operation.
///
/// `lease_id`, `lease_epoch` and `attempt_id` are optional and default to
/// `None`; `source` defaults to `CancelSource::default()`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CancelExecutionArgs {
pub execution_id: ExecutionId,
pub reason: String,
#[serde(default)]
pub source: CancelSource,
#[serde(default)]
pub lease_id: Option<LeaseId>,
#[serde(default)]
pub lease_epoch: Option<LeaseEpoch>,
#[serde(default)]
pub attempt_id: Option<AttemptId>,
pub now: TimestampMs,
}
/// Outcome reported by the cancel-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CancelExecutionResult {
/// Carries the execution id and its public state after cancellation.
Cancelled {
execution_id: ExecutionId,
public_state: PublicState,
},
}
/// Input record for the revoke-lease operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RevokeLeaseArgs {
pub execution_id: ExecutionId,
/// Optional lease id as a plain string (not a `LeaseId`); `None` when
/// omitted.
#[serde(default)]
pub expected_lease_id: Option<String>,
pub worker_instance_id: WorkerInstanceId,
pub reason: String,
}
/// Outcome reported by the revoke-lease operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum RevokeLeaseResult {
/// The lease was revoked; both identifiers are reported as plain strings.
Revoked {
lease_id: String,
lease_epoch: String,
},
/// Nothing was changed; `reason` is a free-form explanation string.
AlreadySatisfied { reason: String },
}
/// Input record for the delay-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DelayExecutionArgs {
pub execution_id: ExecutionId,
/// Optional lease fence; `None` when omitted.
#[serde(default)]
pub fence: Option<LeaseFence>,
pub attempt_index: AttemptIndex,
pub delay_until: TimestampMs,
/// Falls back to `CancelSource::default()` when omitted.
#[serde(default)]
pub source: CancelSource,
pub now: TimestampMs,
}
/// Outcome reported by the delay-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum DelayExecutionResult {
/// Carries the execution id and its public state after the delay was set.
Delayed {
execution_id: ExecutionId,
public_state: PublicState,
},
}
/// Input record for the move-to-waiting-children operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MoveToWaitingChildrenArgs {
pub execution_id: ExecutionId,
/// Optional lease fence; `None` when omitted.
#[serde(default)]
pub fence: Option<LeaseFence>,
pub attempt_index: AttemptIndex,
/// Falls back to `CancelSource::default()` when omitted.
#[serde(default)]
pub source: CancelSource,
pub now: TimestampMs,
}
/// Outcome reported by the move-to-waiting-children operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum MoveToWaitingChildrenResult {
/// Carries the execution id and its public state after the move.
Moved {
execution_id: ExecutionId,
public_state: PublicState,
},
}
/// Input record for the change-priority operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChangePriorityArgs {
pub execution_id: ExecutionId,
/// Replacement priority; same `i32` type as `CreateExecutionArgs::priority`.
pub new_priority: i32,
pub lane_id: LaneId,
pub now: TimestampMs,
}
/// Outcome reported by the change-priority operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangePriorityResult {
/// The priority of the given execution was changed.
Changed { execution_id: ExecutionId },
}
/// Input record for the update-progress operation.
///
/// Both progress fields are optional and default to `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UpdateProgressArgs {
pub execution_id: ExecutionId,
pub lease_id: LeaseId,
pub lease_epoch: LeaseEpoch,
pub attempt_id: AttemptId,
/// Progress as a `u8`; presumably a 0–100 percentage, but no range is
/// enforced by this type — TODO confirm.
#[serde(default)]
pub progress_pct: Option<u8>,
#[serde(default)]
pub progress_message: Option<String>,
pub now: TimestampMs,
}
/// Outcome reported by the update-progress operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum UpdateProgressResult {
/// The progress update was applied.
Updated,
}
/// Input record for the fail-execution operation.
///
/// Unlike most `*Args` types in this file it carries no `now` timestamp.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FailExecutionArgs {
pub execution_id: ExecutionId,
/// Optional lease fence; `None` when omitted.
#[serde(default)]
pub fence: Option<LeaseFence>,
pub attempt_index: AttemptIndex,
pub failure_reason: String,
pub failure_category: String,
/// JSON text as a plain string; empty string when omitted.
#[serde(default)]
pub retry_policy_json: String,
/// JSON text as a plain string; empty string when omitted.
#[serde(default)]
pub next_attempt_policy_json: String,
/// Falls back to `CancelSource::default()` when omitted.
#[serde(default)]
pub source: CancelSource,
}
/// Outcome reported by the fail-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum FailExecutionResult {
/// A retry was scheduled; carries the delay timestamp and the index of the
/// next attempt.
RetryScheduled {
delay_until: TimestampMs,
next_attempt_index: AttemptIndex,
},
/// No retry was scheduled; the failure is terminal.
TerminalFailed,
}
/// Input record for the issue-reclaim-grant operation.
///
/// The three optional string fields default to `None` when omitted.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IssueReclaimGrantArgs {
pub execution_id: ExecutionId,
pub worker_id: WorkerId,
pub worker_instance_id: WorkerInstanceId,
pub lane_id: LaneId,
#[serde(default)]
pub capability_hash: Option<String>,
/// Grant time-to-live (milliseconds, going by the `_ms` suffix).
pub grant_ttl_ms: u64,
#[serde(default)]
pub route_snapshot_json: Option<String>,
#[serde(default)]
pub admission_summary: Option<String>,
pub now: TimestampMs,
}
/// Outcome reported by the issue-reclaim-grant operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum IssueReclaimGrantResult {
/// The grant was issued; carries its expiry as a `TimestampMs`.
Granted { expires_at_ms: TimestampMs },
}
/// Input record for the reclaim-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReclaimExecutionArgs {
pub execution_id: ExecutionId,
pub worker_id: WorkerId,
pub worker_instance_id: WorkerInstanceId,
pub lane_id: LaneId,
#[serde(default)]
pub capability_hash: Option<String>,
pub lease_id: LeaseId,
pub lease_ttl_ms: u64,
pub attempt_id: AttemptId,
/// JSON text as a plain string; empty string when omitted.
#[serde(default)]
pub attempt_policy_json: String,
/// When omitted, filled in by `default_max_reclaim_count()`.
#[serde(default = "default_max_reclaim_count")]
pub max_reclaim_count: u32,
pub old_worker_instance_id: WorkerInstanceId,
pub current_attempt_index: AttemptIndex,
}
/// Serde fallback for `ReclaimExecutionArgs::max_reclaim_count`: 100.
fn default_max_reclaim_count() -> u32 {
    const MAX_RECLAIMS: u32 = 100;
    MAX_RECLAIMS
}
/// Outcome reported by the reclaim-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReclaimExecutionResult {
/// The execution was reclaimed; carries the new attempt and lease
/// identifiers plus the lease expiry.
Reclaimed {
new_attempt_index: AttemptIndex,
new_attempt_id: AttemptId,
new_lease_id: LeaseId,
new_lease_epoch: LeaseEpoch,
lease_expires_at: TimestampMs,
},
/// The reclaim was refused (presumably because the count reached
/// `ReclaimExecutionArgs::max_reclaim_count` — confirm with the implementation).
MaxReclaimsExceeded,
}
/// Input record for the expire-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExpireExecutionArgs {
pub execution_id: ExecutionId,
/// Free-form reason string.
pub expire_reason: String,
}
/// Outcome reported by the expire-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ExpireExecutionResult {
/// The given execution was expired.
Expired { execution_id: ExecutionId },
/// The execution was already in a terminal state.
AlreadyTerminal,
}
/// Input record for the suspend-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SuspendExecutionArgs {
pub execution_id: ExecutionId,
// NOTE(review): no `#[serde(default)]` here, unlike e.g.
// `CompleteExecutionArgs::fence` — confirm whether that is deliberate.
pub fence: Option<LeaseFence>,
pub attempt_index: AttemptIndex,
pub suspension_id: SuspensionId,
pub waitpoint_id: WaitpointId,
pub waitpoint_key: String,
pub reason_code: String,
pub requested_by: String,
/// JSON text carried as a plain string (required).
pub resume_condition_json: String,
/// JSON text carried as a plain string (required).
pub resume_policy_json: String,
#[serde(default)]
pub continuation_metadata_pointer: Option<String>,
#[serde(default)]
pub timeout_at: Option<TimestampMs>,
/// `false` when omitted.
#[serde(default)]
pub use_pending_waitpoint: bool,
/// When omitted, filled in by `default_timeout_behavior()`.
#[serde(default = "default_timeout_behavior")]
pub timeout_behavior: String,
}
/// Serde fallback for `SuspendExecutionArgs::timeout_behavior`: `"fail"`.
fn default_timeout_behavior() -> String {
    String::from("fail")
}
/// Outcome reported by the suspend-execution operation.
///
/// Both variants carry the same four fields.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SuspendExecutionResult {
/// The execution was suspended on the given waitpoint.
Suspended {
suspension_id: SuspensionId,
waitpoint_id: WaitpointId,
waitpoint_key: String,
waitpoint_token: WaitpointToken,
},
/// Reported instead of `Suspended` (presumably when the resume condition
/// was already met — confirm with the implementation).
AlreadySatisfied {
suspension_id: SuspensionId,
waitpoint_id: WaitpointId,
waitpoint_key: String,
waitpoint_token: WaitpointToken,
},
}
/// Input record for the resume-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResumeExecutionArgs {
pub execution_id: ExecutionId,
/// When omitted, filled in by `default_trigger_type()`.
#[serde(default = "default_trigger_type")]
pub trigger_type: String,
/// `0` when omitted.
#[serde(default)]
pub resume_delay_ms: u64,
}
/// Serde fallback for `ResumeExecutionArgs::trigger_type`: `"signal"`.
fn default_trigger_type() -> String {
    String::from("signal")
}
/// Outcome reported by the resume-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResumeExecutionResult {
/// Carries the public state after resuming.
Resumed { public_state: PublicState },
}
/// Input record for the create-pending-waitpoint operation.
///
/// All fields are required; none carries a serde default.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreatePendingWaitpointArgs {
pub execution_id: ExecutionId,
pub lease_id: LeaseId,
pub lease_epoch: LeaseEpoch,
pub attempt_index: AttemptIndex,
pub attempt_id: AttemptId,
pub waitpoint_id: WaitpointId,
pub waitpoint_key: String,
/// Relative lifetime (milliseconds, going by the `_ms` suffix).
pub expires_in_ms: u64,
}
/// Outcome reported by the create-pending-waitpoint operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreatePendingWaitpointResult {
/// The waitpoint was created; carries its id, key and token.
Created {
waitpoint_id: WaitpointId,
waitpoint_key: String,
waitpoint_token: WaitpointToken,
},
}
/// Input record for the close-waitpoint operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CloseWaitpointArgs {
pub waitpoint_id: WaitpointId,
/// Free-form reason string.
pub reason: String,
}
/// Outcome reported by the close-waitpoint operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CloseWaitpointResult {
/// The waitpoint was closed.
Closed,
}
/// Input record for the deliver-signal operation.
///
/// Every `Option` field below is `#[serde(default)]` and deserializes to
/// `None` when omitted; the remaining fields are required.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DeliverSignalArgs {
pub execution_id: ExecutionId,
pub waitpoint_id: WaitpointId,
pub signal_id: SignalId,
pub signal_name: String,
pub signal_category: String,
pub source_type: String,
pub source_identity: String,
/// Optional raw payload bytes.
#[serde(default)]
pub payload: Option<Vec<u8>>,
#[serde(default)]
pub payload_encoding: Option<String>,
#[serde(default)]
pub correlation_id: Option<String>,
#[serde(default)]
pub idempotency_key: Option<String>,
pub target_scope: String,
#[serde(default)]
pub created_at: Option<TimestampMs>,
#[serde(default)]
pub dedup_ttl_ms: Option<u64>,
#[serde(default)]
pub resume_delay_ms: Option<u64>,
#[serde(default)]
pub max_signals_per_execution: Option<u64>,
#[serde(default)]
pub signal_maxlen: Option<u64>,
pub waitpoint_token: WaitpointToken,
pub now: TimestampMs,
}
/// Outcome reported by the deliver-signal operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum DeliverSignalResult {
/// The signal was accepted; `effect` is a free-form string describing
/// what happened as a result.
Accepted { signal_id: SignalId, effect: String },
/// The signal was treated as a duplicate of `existing_signal_id`.
Duplicate { existing_signal_id: SignalId },
}
/// Input record for the buffer-signal operation.
///
/// Carries a subset of the `DeliverSignalArgs` fields; the three `Option`
/// fields default to `None` when omitted.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BufferSignalArgs {
pub execution_id: ExecutionId,
pub waitpoint_id: WaitpointId,
pub signal_id: SignalId,
pub signal_name: String,
pub signal_category: String,
pub source_type: String,
pub source_identity: String,
/// Optional raw payload bytes.
#[serde(default)]
pub payload: Option<Vec<u8>>,
#[serde(default)]
pub payload_encoding: Option<String>,
#[serde(default)]
pub idempotency_key: Option<String>,
pub target_scope: String,
pub waitpoint_token: WaitpointToken,
pub now: TimestampMs,
}
/// Outcome reported by the buffer-signal operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum BufferSignalResult {
/// The signal was buffered under the given id.
Buffered { signal_id: SignalId },
/// The signal was treated as a duplicate of `existing_signal_id`.
Duplicate { existing_signal_id: SignalId },
}
/// Serializable description of a waitpoint.
///
/// `activated_at` and `expires_at` are left out of the serialized output when
/// they are `None`, and read back as `None` when missing.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PendingWaitpointInfo {
pub waitpoint_id: WaitpointId,
pub waitpoint_key: String,
/// State name as a free-form string (the set of values is not defined here).
pub state: String,
pub waitpoint_token: WaitpointToken,
/// Empty list when omitted.
#[serde(default)]
pub required_signal_names: Vec<String>,
pub created_at: TimestampMs,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub activated_at: Option<TimestampMs>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub expires_at: Option<TimestampMs>,
}
/// Input record for the expire-suspension operation; only the execution id
/// is required.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExpireSuspensionArgs {
pub execution_id: ExecutionId,
}
/// Outcome reported by the expire-suspension operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ExpireSuspensionResult {
/// The suspension was expired; `behavior_applied` names the behaviour used
/// (presumably one of the `timeout_behavior` values — TODO confirm).
Expired { behavior_applied: String },
/// Nothing was changed; `reason` is a free-form explanation string.
AlreadySatisfied { reason: String },
}
/// Input record for the claim-resumed-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ClaimResumedExecutionArgs {
pub execution_id: ExecutionId,
pub worker_id: WorkerId,
pub worker_instance_id: WorkerInstanceId,
pub lane_id: LaneId,
pub lease_id: LeaseId,
pub lease_ttl_ms: u64,
pub current_attempt_index: AttemptIndex,
/// Optional remaining timeout; `None` when omitted.
#[serde(default)]
pub remaining_attempt_timeout_ms: Option<u64>,
pub now: TimestampMs,
}
/// Payload of `ClaimResumedExecutionResult::Claimed`.
///
/// Same fields as `ClaimedExecution` except that there is no `attempt_type`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ClaimedResumedExecution {
pub execution_id: ExecutionId,
pub lease_id: LeaseId,
pub lease_epoch: LeaseEpoch,
pub attempt_index: AttemptIndex,
pub attempt_id: AttemptId,
pub lease_expires_at: TimestampMs,
}
/// Outcome reported by the claim-resumed-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ClaimResumedExecutionResult {
/// The claim succeeded; details are in the wrapped `ClaimedResumedExecution`.
Claimed(ClaimedResumedExecution),
}
/// Input record for the append-frame operation.
///
/// Every `Option` field is `#[serde(default)]` and deserializes to `None`
/// when omitted.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AppendFrameArgs {
pub execution_id: ExecutionId,
pub attempt_index: AttemptIndex,
pub lease_id: LeaseId,
pub lease_epoch: LeaseEpoch,
pub attempt_id: AttemptId,
pub frame_type: String,
pub timestamp: TimestampMs,
/// Raw frame bytes.
pub payload: Vec<u8>,
#[serde(default)]
pub encoding: Option<String>,
#[serde(default)]
pub metadata_json: Option<String>,
#[serde(default)]
pub correlation_id: Option<String>,
#[serde(default)]
pub source: Option<String>,
#[serde(default)]
pub retention_maxlen: Option<u32>,
#[serde(default)]
pub max_payload_bytes: Option<u32>,
}
/// Outcome reported by the append-frame operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AppendFrameResult {
/// The frame was appended; carries the entry id (as a string) and a frame
/// count.
Appended {
entry_id: String,
frame_count: u64,
},
}
/// Position inside a stream: one of the two open-ended markers or a concrete
/// entry id.
///
/// Two textual forms exist and must not be confused:
/// * the *wire* form (`to_wire` / `into_wire_string`): `-`, `+`, or the id;
/// * the *display* form (`Display`): `start`, `end`, or the id.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum StreamCursor {
    /// Open-ended lower marker (wire form `-`, display form `start`).
    Start,
    /// Open-ended upper marker (wire form `+`, display form `end`).
    End,
    /// A concrete entry id, stored verbatim.
    At(String),
}

impl StreamCursor {
    /// Returns the concrete cursor `At("0-0")`.
    ///
    /// Note that this is *not* the same value as [`StreamCursor::start`].
    pub fn from_beginning() -> Self {
        Self::At(String::from("0-0"))
    }

    /// Returns the [`StreamCursor::Start`] marker.
    pub fn start() -> Self {
        StreamCursor::Start
    }

    /// Returns the [`StreamCursor::End`] marker.
    pub fn end() -> Self {
        StreamCursor::End
    }

    /// Alias for [`StreamCursor::from_beginning`].
    pub fn beginning() -> Self {
        Self::from_beginning()
    }

    /// Borrows the wire form: `-` for `Start`, `+` for `End`, the id otherwise.
    #[doc(hidden)]
    pub fn to_wire(&self) -> &str {
        match self {
            Self::At(id) => id,
            Self::Start => "-",
            Self::End => "+",
        }
    }

    /// Consumes the cursor and returns the wire form as an owned string.
    ///
    /// The id of an `At` cursor is moved out without copying.
    #[doc(hidden)]
    pub fn into_wire_string(self) -> String {
        match self {
            Self::At(id) => id,
            marker => marker.to_wire().to_owned(),
        }
    }

    /// `true` only for `At(_)`; both markers are non-concrete.
    pub fn is_concrete(&self) -> bool {
        match self {
            Self::At(_) => true,
            Self::Start | Self::End => false,
        }
    }
}

/// Writes the display form: `start`, `end`, or the id verbatim.
impl std::fmt::Display for StreamCursor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::Start => "start",
            Self::End => "end",
            Self::At(id) => id.as_str(),
        };
        f.write_str(text)
    }
}
/// Reasons a string is rejected when parsed as a `StreamCursor`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StreamCursorParseError {
    /// The input was the empty string.
    Empty,
    /// The input was a bare marker (payload holds the offending text).
    BareMarkerRejected(String),
    /// The input matched none of the accepted shapes (payload holds the input).
    Malformed(String),
}

/// Human-readable message for each rejection reason.
impl std::fmt::Display for StreamCursorParseError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Malformed(s) => f.write_fmt(format_args!(
                "invalid stream cursor '{s}' (expected 'start', 'end', '<ms>', or '<ms>-<seq>')"
            )),
            Self::BareMarkerRejected(s) => f.write_fmt(format_args!(
                "bare marker '{s}' is not a valid stream cursor; use 'start' or 'end'"
            )),
            Self::Empty => f.write_str("stream cursor must not be empty"),
        }
    }
}

// No `source()` override: the error never wraps another error.
impl std::error::Error for StreamCursorParseError {}
/// Private classification of a raw cursor string, shared by the parsing
/// entry points so they apply identical rules.
enum StreamCursorClass {
    Start,
    End,
    Concrete,
    BareMarker,
    Empty,
    Malformed,
}

/// Classifies `s` without allocating.
///
/// Rules, checked in order:
/// * `""` → `Empty`
/// * `"-"` or `"+"` → `BareMarker`
/// * `"start"` → `Start`, `"end"` → `End`
/// * `<digits>` or `<digits>-<digits>` (ASCII digits, each part non-empty)
///   → `Concrete`
/// * anything else → `Malformed`
///
/// Only the shape is checked: the digit runs are not range-checked.
fn classify_stream_cursor(s: &str) -> StreamCursorClass {
    // A component is acceptable when it is a non-empty run of ASCII digits;
    // any non-ASCII byte fails this test, so no separate ASCII check is needed.
    fn is_decimal(part: &str) -> bool {
        !part.is_empty() && part.bytes().all(|b| b.is_ascii_digit())
    }

    match s {
        "" => StreamCursorClass::Empty,
        "-" | "+" => StreamCursorClass::BareMarker,
        "start" => StreamCursorClass::Start,
        "end" => StreamCursorClass::End,
        other => {
            // Split on the FIRST '-': a second '-' ends up in the sequence
            // part and makes it non-decimal.
            let well_formed = match other.split_once('-') {
                Some((ms, seq)) => is_decimal(ms) && is_decimal(seq),
                None => is_decimal(other),
            };
            if well_formed {
                StreamCursorClass::Concrete
            } else {
                StreamCursorClass::Malformed
            }
        }
    }
}
/// Parses the display form of a cursor from a borrowed string.
impl std::str::FromStr for StreamCursor {
    type Err = StreamCursorParseError;

    /// Accepts `start`, `end`, `<ms>` or `<ms>-<seq>` as classified by
    /// `classify_stream_cursor`.
    ///
    /// # Errors
    ///
    /// `Empty` for `""`, `BareMarkerRejected` for `-` / `+`, and `Malformed`
    /// for everything else that is not accepted. The rejected input is copied
    /// into the error.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let cursor = match classify_stream_cursor(s) {
            // Rejections first: each returns early with the matching error.
            StreamCursorClass::Empty => return Err(StreamCursorParseError::Empty),
            StreamCursorClass::BareMarker => {
                return Err(StreamCursorParseError::BareMarkerRejected(s.to_owned()));
            }
            StreamCursorClass::Malformed => {
                return Err(StreamCursorParseError::Malformed(s.to_owned()));
            }
            // Accepted shapes.
            StreamCursorClass::Start => Self::Start,
            StreamCursorClass::End => Self::End,
            StreamCursorClass::Concrete => Self::At(s.to_owned()),
        };
        Ok(cursor)
    }
}
/// Parses the display form of a cursor from an owned string, reusing the
/// allocation for the `At` payload or for the error payload.
impl TryFrom<String> for StreamCursor {
    type Error = StreamCursorParseError;

    /// Applies the same classification as `FromStr`, but moves `s` into the
    /// result instead of copying it.
    ///
    /// # Errors
    ///
    /// `Empty` for `""`, `BareMarkerRejected` for `-` / `+`, and `Malformed`
    /// for everything else that is not accepted.
    fn try_from(s: String) -> Result<Self, Self::Error> {
        // The classification borrows `s` only for the duration of the call,
        // so the arms below are free to move it.
        let parsed = match classify_stream_cursor(s.as_str()) {
            StreamCursorClass::Empty => return Err(StreamCursorParseError::Empty),
            StreamCursorClass::BareMarker => {
                return Err(StreamCursorParseError::BareMarkerRejected(s));
            }
            StreamCursorClass::Malformed => {
                return Err(StreamCursorParseError::Malformed(s));
            }
            StreamCursorClass::Start => Self::Start,
            StreamCursorClass::End => Self::End,
            StreamCursorClass::Concrete => Self::At(s),
        };
        Ok(parsed)
    }
}
impl From<StreamCursor> for String {
fn from(c: StreamCursor) -> Self {
c.to_string()
}
}
/// Serializes the cursor as a single string in its display form (`start`,
/// `end`, or the id), by handing `self` to `Serializer::collect_str`, which
/// serializes the value's `Display` output.
impl Serialize for StreamCursor {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
// Delegating to `Display` keeps the serialized text and `to_string()` in
// lock-step.
serializer.collect_str(self)
}
}
impl<'de> Deserialize<'de> for StreamCursor {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let s = String::deserialize(deserializer)?;
Self::try_from(s).map_err(serde::de::Error::custom)
}
}
/// Hard cap of 10 000 for stream reads.
///
/// NOTE(review): presumably the upper bound enforced on
/// `ReadFramesArgs::count_limit`; the enforcement site is not in this part of
/// the file — confirm.
pub const STREAM_READ_HARD_CAP: u64 = 10_000;
/// One stream entry: its id plus a string-to-string field map kept in key
/// order (`BTreeMap`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamFrame {
pub id: String,
pub fields: std::collections::BTreeMap<String, String>,
}
/// Input record for the read-frames operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReadFramesArgs {
pub execution_id: ExecutionId,
pub attempt_index: AttemptIndex,
/// Lower bound as a raw string (not a `StreamCursor`).
pub from_id: String,
/// Upper bound as a raw string (not a `StreamCursor`).
pub to_id: String,
/// Requested maximum number of frames.
pub count_limit: u64,
}
/// A batch of frames plus optional closure information.
///
/// `closed_at` and `closed_reason` are left out of the serialized output when
/// `None` and read back as `None` when missing.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamFrames {
pub frames: Vec<StreamFrame>,
/// Set when the stream is closed; `StreamFrames::is_closed` tests this field.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub closed_at: Option<TimestampMs>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub closed_reason: Option<String>,
}
impl StreamFrames {
    /// Builds a value with no frames and no closure information.
    pub fn empty_open() -> Self {
        let frames = Vec::new();
        Self {
            frames,
            closed_at: None,
            closed_reason: None,
        }
    }

    /// Returns `true` when `closed_at` is set; `closed_reason` is not consulted.
    pub fn is_closed(&self) -> bool {
        let Self { closed_at, .. } = self;
        closed_at.is_some()
    }
}
/// Outcome reported by the read-frames operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReadFramesResult {
/// The frames that were read, wrapped in a `StreamFrames`.
Frames(StreamFrames),
}
/// Input record for the create-budget operation.
///
/// All fields are required; none carries a serde default.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateBudgetArgs {
pub budget_id: crate::types::BudgetId,
pub scope_type: String,
pub scope_id: String,
pub enforcement_mode: String,
pub on_hard_limit: String,
pub on_soft_limit: String,
pub reset_interval_ms: u64,
// NOTE(review): `dimensions`, `hard_limits` and `soft_limits` look like
// parallel lists matched by index; nothing in this type enforces equal
// lengths — confirm against the consumer.
pub dimensions: Vec<String>,
pub hard_limits: Vec<u64>,
pub soft_limits: Vec<u64>,
pub now: TimestampMs,
}
/// Outcome reported by the create-budget operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateBudgetResult {
/// A new budget was created.
Created { budget_id: crate::types::BudgetId },
/// Nothing was created (presumably the budget already existed — confirm).
AlreadySatisfied { budget_id: crate::types::BudgetId },
}
/// Input record for the create-quota-policy operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateQuotaPolicyArgs {
pub quota_policy_id: crate::types::QuotaPolicyId,
/// Window length in seconds (note: seconds, not milliseconds).
pub window_seconds: u64,
pub max_requests_per_window: u64,
pub max_concurrent: u64,
pub now: TimestampMs,
}
/// Outcome reported by the create-quota-policy operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateQuotaPolicyResult {
/// A new quota policy was created.
Created {
quota_policy_id: crate::types::QuotaPolicyId,
},
/// Nothing was created (presumably the policy already existed — confirm).
AlreadySatisfied {
quota_policy_id: crate::types::QuotaPolicyId,
},
}
/// Read-side view of a budget.
///
/// Identifiers and timestamps are carried as plain strings here rather than
/// as `BudgetId` / `TimestampMs`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BudgetStatus {
pub budget_id: String,
pub scope_type: String,
pub scope_id: String,
pub enforcement_mode: String,
/// Map keyed by string (presumably the dimension name — confirm).
pub usage: HashMap<String, u64>,
/// Map keyed by string (presumably the dimension name — confirm).
pub hard_limits: HashMap<String, u64>,
/// Map keyed by string (presumably the dimension name — confirm).
pub soft_limits: HashMap<String, u64>,
pub breach_count: u64,
pub soft_breach_count: u64,
pub last_breach_at: Option<String>,
pub last_breach_dim: Option<String>,
pub next_reset_at: Option<String>,
pub created_at: Option<String>,
}
/// Input record for the report-usage operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReportUsageArgs {
// NOTE(review): `dimensions` and `deltas` look like parallel lists matched
// by index; nothing in this type enforces equal lengths — confirm.
pub dimensions: Vec<String>,
pub deltas: Vec<u64>,
pub now: TimestampMs,
/// Optional de-duplication key; `None` when omitted.
#[serde(default)]
pub dedup_key: Option<String>,
}
/// Outcome reported by the report-usage operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReportUsageResult {
/// No breach was reported.
Ok,
/// A soft limit was breached on `dimension`.
SoftBreach {
dimension: String,
current_usage: u64,
soft_limit: u64,
},
/// A hard limit was breached on `dimension`.
HardBreach {
dimension: String,
current_usage: u64,
hard_limit: u64,
},
/// The report had already been applied (presumably matched through
/// `ReportUsageArgs::dedup_key` — confirm).
AlreadyApplied,
}
/// Input record for the reset-budget operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResetBudgetArgs {
pub budget_id: crate::types::BudgetId,
pub now: TimestampMs,
}
/// Outcome reported by the reset-budget operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResetBudgetResult {
/// The budget was reset; carries the timestamp of the next reset.
Reset { next_reset_at: TimestampMs },
}
/// Input record for the check-admission operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CheckAdmissionArgs {
pub execution_id: ExecutionId,
pub now: TimestampMs,
/// Window length in seconds (note: seconds, not milliseconds).
pub window_seconds: u64,
pub rate_limit: u64,
pub concurrency_cap: u64,
/// Optional jitter; `None` when omitted.
#[serde(default)]
pub jitter_ms: Option<u64>,
}
/// Outcome reported by the check-admission operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CheckAdmissionResult {
/// The execution was admitted.
Admitted,
/// The execution had already been admitted.
AlreadyAdmitted,
/// The rate limit was exceeded; carries a suggested retry delay.
RateExceeded { retry_after_ms: u64 },
/// The concurrency cap was exceeded.
ConcurrencyExceeded,
}
/// Input record for the release-admission operation; only the execution id
/// is required.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReleaseAdmissionArgs {
pub execution_id: ExecutionId,
}
/// Outcome reported by the release-admission operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReleaseAdmissionResult {
/// The admission was released.
Released,
}
/// Input record for the block-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BlockExecutionArgs {
pub execution_id: ExecutionId,
pub blocking_reason: String,
/// Optional extra detail; `None` when omitted.
#[serde(default)]
pub blocking_detail: Option<String>,
pub now: TimestampMs,
}
/// Outcome reported by the block-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlockExecutionResult {
/// The execution was blocked.
Blocked,
}
/// Input record for the unblock-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UnblockExecutionArgs {
pub execution_id: ExecutionId,
pub now: TimestampMs,
/// Optional expected reason; `None` when omitted (presumably compared with
/// the stored blocking reason — confirm with the implementation).
#[serde(default)]
pub expected_blocking_reason: Option<String>,
}
/// Outcome reported by the unblock-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum UnblockExecutionResult {
/// The execution was unblocked.
Unblocked,
}
/// Input record for the create-flow operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateFlowArgs {
pub flow_id: crate::types::FlowId,
pub flow_kind: String,
pub namespace: Namespace,
pub now: TimestampMs,
}
/// Outcome reported by the create-flow operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CreateFlowResult {
/// A new flow was created.
Created { flow_id: crate::types::FlowId },
/// Nothing was created (presumably the flow already existed — confirm).
AlreadySatisfied { flow_id: crate::types::FlowId },
}
/// Input record for the add-execution-to-flow operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AddExecutionToFlowArgs {
pub flow_id: crate::types::FlowId,
pub execution_id: ExecutionId,
pub now: TimestampMs,
}
/// Outcome reported by the add-execution-to-flow operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AddExecutionToFlowResult {
/// The execution was added; carries the node count after the addition.
Added {
execution_id: ExecutionId,
new_node_count: u32,
},
/// The execution was already a member; carries the current node count.
AlreadyMember {
execution_id: ExecutionId,
node_count: u32,
},
}
/// Input record for the cancel-flow operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CancelFlowArgs {
pub flow_id: crate::types::FlowId,
pub reason: String,
/// Policy name as a free-form string (the set of values is not defined here).
pub cancellation_policy: String,
pub now: TimestampMs,
}
/// Outcome reported by the cancel-flow operation.
///
/// Member execution ids are reported as plain strings in every variant.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CancelFlowResult {
/// The flow was cancelled.
Cancelled {
cancellation_policy: String,
member_execution_ids: Vec<String>,
},
/// Cancellation was scheduled; additionally carries a member count.
CancellationScheduled {
cancellation_policy: String,
member_count: u32,
member_execution_ids: Vec<String>,
},
/// Cancellation only partly succeeded; `failed_member_execution_ids` lists
/// the members for which it failed.
PartiallyCancelled {
cancellation_policy: String,
member_execution_ids: Vec<String>,
failed_member_execution_ids: Vec<String>,
},
}
/// Input record for the stage-dependency-edge operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StageDependencyEdgeArgs {
pub flow_id: crate::types::FlowId,
pub edge_id: crate::types::EdgeId,
pub upstream_execution_id: ExecutionId,
pub downstream_execution_id: ExecutionId,
/// When omitted, filled in by `default_dependency_kind()`.
#[serde(default = "default_dependency_kind")]
pub dependency_kind: String,
#[serde(default)]
pub data_passing_ref: Option<String>,
/// Graph revision the caller expects (presumably used as an optimistic
/// concurrency check — confirm with the implementation).
pub expected_graph_revision: u64,
pub now: TimestampMs,
}
/// Serde fallback for the `dependency_kind` field of
/// `StageDependencyEdgeArgs` and `ApplyDependencyToChildArgs`:
/// `"success_only"`.
fn default_dependency_kind() -> String {
    String::from("success_only")
}
/// Outcome reported by the stage-dependency-edge operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum StageDependencyEdgeResult {
/// The edge was staged; carries the graph revision after staging.
Staged {
edge_id: crate::types::EdgeId,
new_graph_revision: u64,
},
}
/// Input record for the apply-dependency-to-child operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ApplyDependencyToChildArgs {
pub flow_id: crate::types::FlowId,
pub edge_id: crate::types::EdgeId,
pub downstream_execution_id: ExecutionId,
pub upstream_execution_id: ExecutionId,
pub graph_revision: u64,
/// When omitted, filled in by `default_dependency_kind()`.
#[serde(default = "default_dependency_kind")]
pub dependency_kind: String,
#[serde(default)]
pub data_passing_ref: Option<String>,
pub now: TimestampMs,
}
/// Outcome reported by the apply-dependency-to-child operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ApplyDependencyToChildResult {
/// The dependency was applied; carries a count of unsatisfied dependencies.
Applied { unsatisfied_count: u32 },
/// The dependency had already been applied.
AlreadyApplied,
}
/// Input record for the resolve-dependency operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResolveDependencyArgs {
pub edge_id: crate::types::EdgeId,
/// Outcome name as a free-form string (the set of values is not defined here).
pub upstream_outcome: String,
pub now: TimestampMs,
}
/// Outcome reported by the resolve-dependency operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResolveDependencyResult {
/// The dependency was resolved as satisfied.
Satisfied,
/// The dependency was resolved as impossible to satisfy.
Impossible,
/// The dependency had already been resolved.
AlreadyResolved,
}
/// Input record for the promote-blocked-to-eligible operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PromoteBlockedToEligibleArgs {
pub execution_id: ExecutionId,
pub now: TimestampMs,
}
/// Outcome reported by the promote-blocked-to-eligible operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum PromoteBlockedToEligibleResult {
/// The execution was promoted.
Promoted,
}
/// Input record for the evaluate-flow-eligibility operation; only the
/// execution id is required.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EvaluateFlowEligibilityArgs {
pub execution_id: ExecutionId,
}
/// Outcome reported by the evaluate-flow-eligibility operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum EvaluateFlowEligibilityResult {
/// Carries the evaluated status as a free-form string.
Status { status: String },
}
/// Input record for the replay-execution operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReplayExecutionArgs {
pub execution_id: ExecutionId,
pub now: TimestampMs,
}
/// Outcome reported by the replay-execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplayExecutionResult {
/// Carries the public state after the replay.
Replayed { public_state: PublicState },
}
/// Serializable read-side view of an execution.
///
/// Apart from `execution_id`, identifiers and timestamps are plain strings
/// here (compare `ExecutionSnapshot`, which uses the typed wrappers).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionInfo {
pub execution_id: ExecutionId,
pub namespace: String,
pub lane_id: String,
pub priority: i32,
pub execution_kind: String,
pub state_vector: StateVector,
pub public_state: PublicState,
pub created_at: String,
/// Omitted from the serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub started_at: Option<String>,
/// Omitted from the serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub completed_at: Option<String>,
pub current_attempt_index: u32,
/// Always serialized, even when `None` (no `skip_serializing_if`).
pub flow_id: Option<String>,
/// Required string (not an `Option`, unlike `BlockExecutionArgs::blocking_detail`).
pub blocking_detail: String,
}
/// Input record for the set-execution-tags operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SetExecutionTagsArgs {
pub execution_id: ExecutionId,
/// Tags to set, kept in key order (`BTreeMap`).
pub tags: BTreeMap<String, String>,
}
/// Outcome reported by the set-execution-tags operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SetExecutionTagsResult {
/// Success; carries a count (presumably the number of tags written — confirm).
Ok { count: u32 },
}
/// Input record for the set-flow-tags operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SetFlowTagsArgs {
pub flow_id: FlowId,
/// Tags to set, kept in key order (`BTreeMap`).
pub tags: BTreeMap<String, String>,
}
/// Outcome reported by the set-flow-tags operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SetFlowTagsResult {
/// Success; carries a count (presumably the number of tags written — confirm).
Ok { count: u32 },
}
/// Typed point-in-time view of an execution.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build
/// it with a struct literal and must use `ExecutionSnapshot::new`. It has no
/// serde derives.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ExecutionSnapshot {
pub execution_id: ExecutionId,
pub flow_id: Option<FlowId>,
pub lane_id: LaneId,
pub namespace: Namespace,
pub public_state: PublicState,
pub blocking_reason: Option<String>,
pub blocking_detail: Option<String>,
pub current_attempt: Option<AttemptSummary>,
pub current_lease: Option<LeaseSummary>,
pub current_waitpoint: Option<WaitpointId>,
pub created_at: TimestampMs,
pub last_mutation_at: TimestampMs,
pub total_attempt_count: u32,
/// Tags kept in key order (`BTreeMap`).
pub tags: BTreeMap<String, String>,
}
impl ExecutionSnapshot {
#[allow(clippy::too_many_arguments)]
pub fn new(
execution_id: ExecutionId,
flow_id: Option<FlowId>,
lane_id: LaneId,
namespace: Namespace,
public_state: PublicState,
blocking_reason: Option<String>,
blocking_detail: Option<String>,
current_attempt: Option<AttemptSummary>,
current_lease: Option<LeaseSummary>,
current_waitpoint: Option<WaitpointId>,
created_at: TimestampMs,
last_mutation_at: TimestampMs,
total_attempt_count: u32,
tags: BTreeMap<String, String>,
) -> Self {
Self {
execution_id,
flow_id,
lane_id,
namespace,
public_state,
blocking_reason,
blocking_detail,
current_attempt,
current_lease,
current_waitpoint,
created_at,
last_mutation_at,
total_attempt_count,
tags,
}
}
}
/// Identifies an attempt by id and index.
///
/// `#[non_exhaustive]`: outside this crate, build it with `AttemptSummary::new`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct AttemptSummary {
pub attempt_id: AttemptId,
pub attempt_index: AttemptIndex,
}
impl AttemptSummary {
pub fn new(attempt_id: AttemptId, attempt_index: AttemptIndex) -> Self {
Self {
attempt_id,
attempt_index,
}
}
}
/// Summary of a lease: its epoch, the holding worker instance and the expiry.
///
/// `#[non_exhaustive]`: outside this crate, build it with `LeaseSummary::new`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct LeaseSummary {
pub lease_epoch: LeaseEpoch,
pub worker_instance_id: WorkerInstanceId,
pub expires_at: TimestampMs,
}
impl LeaseSummary {
pub fn new(
lease_epoch: LeaseEpoch,
worker_instance_id: WorkerInstanceId,
expires_at: TimestampMs,
) -> Self {
Self {
lease_epoch,
worker_instance_id,
expires_at,
}
}
}
/// Typed point-in-time view of a flow.
///
/// `#[non_exhaustive]`: outside this crate, build it with `FlowSnapshot::new`.
/// It has no serde derives.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct FlowSnapshot {
pub flow_id: FlowId,
pub flow_kind: String,
pub namespace: Namespace,
/// State name as a free-form string (the set of values is not defined here).
pub public_flow_state: String,
pub graph_revision: u64,
pub node_count: u32,
pub edge_count: u32,
pub created_at: TimestampMs,
pub last_mutation_at: TimestampMs,
pub cancelled_at: Option<TimestampMs>,
pub cancel_reason: Option<String>,
pub cancellation_policy: Option<String>,
/// Tags kept in key order (`BTreeMap`).
pub tags: BTreeMap<String, String>,
}
impl FlowSnapshot {
#[allow(clippy::too_many_arguments)]
pub fn new(
flow_id: FlowId,
flow_kind: String,
namespace: Namespace,
public_flow_state: String,
graph_revision: u64,
node_count: u32,
edge_count: u32,
created_at: TimestampMs,
last_mutation_at: TimestampMs,
cancelled_at: Option<TimestampMs>,
cancel_reason: Option<String>,
cancellation_policy: Option<String>,
tags: BTreeMap<String, String>,
) -> Self {
Self {
flow_id,
flow_kind,
namespace,
public_flow_state,
graph_revision,
node_count,
edge_count,
created_at,
last_mutation_at,
cancelled_at,
cancel_reason,
cancellation_policy,
tags,
}
}
}
/// Point-in-time description of one edge of a flow, linking an upstream
/// execution to a downstream execution.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build
/// it with a struct literal and has to go through [`EdgeSnapshot::new`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct EdgeSnapshot {
/// Identifier of the edge.
pub edge_id: EdgeId,
/// Identifier of the flow the edge belongs to.
pub flow_id: FlowId,
/// Execution on the upstream end of the edge.
pub upstream_execution_id: ExecutionId,
/// Execution on the downstream end of the edge.
pub downstream_execution_id: ExecutionId,
/// Kind of dependency, carried as a plain string.
pub dependency_kind: String,
/// Satisfaction condition, carried as a plain string.
/// NOTE(review): presumably the upstream outcome that satisfies the edge — confirm.
pub satisfaction_condition: String,
/// Optional data-passing reference.
/// NOTE(review): presumably points at data handed from upstream to downstream — confirm.
pub data_passing_ref: Option<String>,
/// State of the edge, carried as a plain string.
pub edge_state: String,
/// Creation timestamp.
pub created_at: TimestampMs,
/// Identity recorded as the creator of the edge.
pub created_by: String,
}
impl EdgeSnapshot {
#[allow(clippy::too_many_arguments)]
pub fn new(
edge_id: EdgeId,
flow_id: FlowId,
upstream_execution_id: ExecutionId,
downstream_execution_id: ExecutionId,
dependency_kind: String,
satisfaction_condition: String,
data_passing_ref: Option<String>,
edge_state: String,
created_at: TimestampMs,
created_by: String,
) -> Self {
Self {
edge_id,
flow_id,
upstream_execution_id,
downstream_execution_id,
dependency_kind,
satisfaction_condition,
data_passing_ref,
edge_state,
created_at,
created_by,
}
}
}
/// A state vector paired with the current attempt index.
///
/// Derives `Serialize`/`Deserialize`, so the field names are part of its
/// serialized form.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StateSummary {
/// The state vector.
pub state_vector: StateVector,
/// Index of the current attempt.
pub current_attempt_index: AttemptIndex,
}
// Unit tests for this module: JSON round-trips for `CreateExecutionArgs` and
// `ClaimExecutionResult`, plus the Display / wire / FromStr / serde forms of
// `StreamCursor`.
// NOTE(review): further items are declared after this module in the file, which
// clippy's `items_after_test_module` lint flags — consider moving the module to
// the end of the file.
#[cfg(test)]
mod tests {
use super::*;
use crate::types::FlowId;
// `CreateExecutionArgs` survives a JSON round-trip. The type does not derive
// `PartialEq`, so only `execution_id` is compared.
#[test]
fn create_execution_args_serde() {
let config = crate::partition::PartitionConfig::default();
let args = CreateExecutionArgs {
execution_id: ExecutionId::for_flow(&FlowId::new(), &config),
namespace: Namespace::new("test"),
lane_id: LaneId::new("default"),
execution_kind: "llm_call".to_owned(),
input_payload: b"hello".to_vec(),
payload_encoding: Some("json".to_owned()),
priority: 0,
creator_identity: "test-user".to_owned(),
idempotency_key: None,
tags: HashMap::new(),
policy: None,
delay_until: None,
execution_deadline_at: None,
partition_id: 42,
now: TimestampMs::now(),
};
let json = serde_json::to_string(&args).unwrap();
let parsed: CreateExecutionArgs = serde_json::from_str(&json).unwrap();
assert_eq!(args.execution_id, parsed.execution_id);
}
// The `Claimed` variant of `ClaimExecutionResult` round-trips through JSON and
// compares equal to the original value.
#[test]
fn claim_result_serde() {
let config = crate::partition::PartitionConfig::default();
let result = ClaimExecutionResult::Claimed(ClaimedExecution {
execution_id: ExecutionId::for_flow(&FlowId::new(), &config),
lease_id: LeaseId::new(),
lease_epoch: LeaseEpoch::new(1),
attempt_index: AttemptIndex::new(0),
attempt_id: AttemptId::new(),
attempt_type: AttemptType::Initial,
lease_expires_at: TimestampMs::from_millis(1000),
});
let json = serde_json::to_string(&result).unwrap();
let parsed: ClaimExecutionResult = serde_json::from_str(&json).unwrap();
assert_eq!(result, parsed);
}
// `Display` renders `Start` as "start", `End` as "end", and `At(s)` as `s` itself.
#[test]
fn stream_cursor_display_matches_wire_tokens() {
assert_eq!(StreamCursor::Start.to_string(), "start");
assert_eq!(StreamCursor::End.to_string(), "end");
assert_eq!(StreamCursor::At("123".into()).to_string(), "123");
assert_eq!(StreamCursor::At("123-4".into()).to_string(), "123-4");
}
// `to_wire` maps `Start` to "-" and `End` to "+" (the Valkey markers, per the
// test name) and passes the id held by `At` through unchanged.
#[test]
fn stream_cursor_to_wire_maps_to_valkey_markers() {
assert_eq!(StreamCursor::Start.to_wire(), "-");
assert_eq!(StreamCursor::End.to_wire(), "+");
assert_eq!(StreamCursor::At("0-0".into()).to_wire(), "0-0");
assert_eq!(StreamCursor::At("17-3".into()).to_wire(), "17-3");
}
// `FromStr` accepts the keywords "start" / "end" and ids written either as
// `<digits>` or `<digits>-<digits>`.
#[test]
fn stream_cursor_from_str_accepts_wire_tokens() {
use std::str::FromStr;
assert_eq!(StreamCursor::from_str("start").unwrap(), StreamCursor::Start);
assert_eq!(StreamCursor::from_str("end").unwrap(), StreamCursor::End);
assert_eq!(
StreamCursor::from_str("123").unwrap(),
StreamCursor::At("123".into())
);
assert_eq!(
StreamCursor::from_str("0-0").unwrap(),
StreamCursor::At("0-0".into())
);
assert_eq!(
StreamCursor::from_str("1713100800150-0").unwrap(),
StreamCursor::At("1713100800150-0".into())
);
}
// The raw markers "-" and "+" (what `to_wire` emits for `Start` / `End`) are
// not accepted as input; each yields `BareMarkerRejected` carrying the marker.
#[test]
fn stream_cursor_from_str_rejects_bare_markers() {
use std::str::FromStr;
assert!(matches!(
StreamCursor::from_str("-"),
Err(StreamCursorParseError::BareMarkerRejected(s)) if s == "-"
));
assert!(matches!(
StreamCursor::from_str("+"),
Err(StreamCursorParseError::BareMarkerRejected(s)) if s == "+"
));
}
// The empty string has its own error variant, `Empty`.
#[test]
fn stream_cursor_from_str_rejects_empty() {
use std::str::FromStr;
assert_eq!(
StreamCursor::from_str(""),
Err(StreamCursorParseError::Empty)
);
}
// Anything that is neither a keyword nor a well-formed id is `Malformed`.
// "Start" and "END" are in the list, so keyword matching is case-sensitive.
#[test]
fn stream_cursor_from_str_rejects_malformed() {
use std::str::FromStr;
for bad in ["abc", "-1", "1-", "-1-2", "1-2-3", "1.2", "1 2", "Start", "END"] {
assert!(
matches!(
StreamCursor::from_str(bad),
Err(StreamCursorParseError::Malformed(_))
),
"must reject {bad:?}",
);
}
}
// U+2013 (EN DASH) between the digits is not accepted as the id separator.
#[test]
fn stream_cursor_from_str_rejects_non_ascii() {
use std::str::FromStr;
assert!(matches!(
StreamCursor::from_str("1\u{2013}2"),
Err(StreamCursorParseError::Malformed(_))
));
}
// Every variant survives a JSON serialize/deserialize round-trip.
#[test]
fn stream_cursor_serde_round_trip() {
for c in [
StreamCursor::Start,
StreamCursor::End,
StreamCursor::At("0-0".into()),
StreamCursor::At("1713100800150-0".into()),
] {
let json = serde_json::to_string(&c).unwrap();
let back: StreamCursor = serde_json::from_str(&json).unwrap();
assert_eq!(back, c);
}
}
// The JSON form is a plain string (the same text `Display` produces), not a
// tagged enum object.
#[test]
fn stream_cursor_serializes_as_bare_string() {
assert_eq!(serde_json::to_string(&StreamCursor::Start).unwrap(), r#""start""#);
assert_eq!(serde_json::to_string(&StreamCursor::End).unwrap(), r#""end""#);
assert_eq!(
serde_json::to_string(&StreamCursor::At("123-0".into())).unwrap(),
r#""123-0""#
);
}
// Deserialization also refuses the raw "-" / "+" markers.
#[test]
fn stream_cursor_deserialize_rejects_bare_markers() {
assert!(serde_json::from_str::<StreamCursor>(r#""-""#).is_err());
assert!(serde_json::from_str::<StreamCursor>(r#""+""#).is_err());
}
// `from_beginning()` is the concrete cursor `At("0-0")`, not the `Start` variant.
#[test]
fn stream_cursor_from_beginning_is_zero_zero() {
assert_eq!(
StreamCursor::from_beginning(),
StreamCursor::At("0-0".into())
);
}
// `is_concrete()` is true for `At(..)` only; `Start` and `End` are not concrete.
#[test]
fn stream_cursor_is_concrete_classifies_variants() {
assert!(!StreamCursor::Start.is_concrete());
assert!(!StreamCursor::End.is_concrete());
assert!(StreamCursor::At("0-0".into()).is_concrete());
assert!(StreamCursor::At("123-0".into()).is_concrete());
assert!(StreamCursor::from_beginning().is_concrete());
}
// `into_wire_string` yields the same text as `to_wire`. Only the returned
// values are checked here; the "without cloning" part of the name is not
// observable from these assertions.
#[test]
fn stream_cursor_into_wire_string_moves_without_cloning() {
assert_eq!(StreamCursor::Start.into_wire_string(), "-");
assert_eq!(StreamCursor::End.into_wire_string(), "+");
assert_eq!(
StreamCursor::At("17-3".into()).into_wire_string(),
"17-3"
);
}
}
/// One row of an execution listing (see [`ListExecutionsResult`]).
///
/// Apart from `execution_id` and `priority`, every field is carried as a plain
/// `String` rather than the typed wrappers (`Namespace`, `LaneId`,
/// `PublicState`, `TimestampMs`) used by other types in this file.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionSummary {
/// Identifier of the execution.
pub execution_id: ExecutionId,
/// Namespace, as a string.
pub namespace: String,
/// Lane id, as a string.
pub lane_id: String,
/// Kind label of the execution.
pub execution_kind: String,
/// Public state, as a string.
pub public_state: String,
/// Priority of the execution.
pub priority: i32,
/// Creation time, as a string.
/// NOTE(review): the textual format is not visible here — confirm with the producer.
pub created_at: String,
}
/// Result of listing executions: the rows plus a count.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ListExecutionsResult {
/// The listed executions.
pub executions: Vec<ExecutionSummary>,
/// Number of executions returned.
/// NOTE(review): presumably equal to `executions.len()` — confirm with the producer.
pub total_returned: usize,
}
/// Arguments for rotating the waitpoint HMAC secret.
///
/// `Debug` is implemented by hand rather than derived so that the secret in
/// `new_secret_hex` never ends up in logs, traces, or panic messages. All
/// other fields are printed as a derived impl would print them.
#[derive(Clone)]
pub struct RotateWaitpointHmacSecretArgs {
    /// Key id (`kid`) for the new secret.
    pub new_kid: String,
    /// The new secret, hex-encoded per the field name. Sensitive: it is
    /// replaced by a placeholder in `Debug` output.
    pub new_secret_hex: String,
    /// Grace duration in milliseconds.
    /// NOTE(review): presumably how long the previous secret keeps verifying — confirm.
    pub grace_ms: u64,
}

impl std::fmt::Debug for RotateWaitpointHmacSecretArgs {
    /// Formats the arguments with `new_secret_hex` shown as `"<redacted>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RotateWaitpointHmacSecretArgs")
            .field("new_kid", &self.new_kid)
            // Never print key material; keep the field name so readers can
            // still see that a secret was supplied.
            .field("new_secret_hex", &"<redacted>")
            .field("grace_ms", &self.grace_ms)
            .finish()
    }
}
/// Outcome of a waitpoint HMAC secret rotation request.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RotateWaitpointHmacSecretOutcome {
/// A rotation took place.
Rotated {
/// Key id that was in use before the rotation, if there was one.
previous_kid: Option<String>,
/// Key id now in use.
new_kid: String,
/// NOTE(review): presumably the number of expired kids garbage-collected
/// during the rotation — confirm.
gc_count: u32,
},
/// Nothing was changed.
/// NOTE(review): presumably returned when `kid` is already the current key id — confirm.
Noop { kid: String },
}
/// Arguments for listing waitpoint HMAC key ids. The struct currently has no
/// fields.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ListWaitpointHmacKidsArgs {}
/// The waitpoint HMAC key ids: the current one, if any, plus a list of
/// verifying key ids.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WaitpointHmacKids {
/// Current key id; `None` when there is none.
pub current_kid: Option<String>,
/// Key ids listed as verifying, each with its expiry.
/// NOTE(review): presumably older kids still accepted for verification during
/// a grace window — confirm.
pub verifying: Vec<VerifyingKid>,
}
/// A key id together with its expiry, as listed in [`WaitpointHmacKids::verifying`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct VerifyingKid {
/// The key id.
pub kid: String,
/// Expiry in milliseconds, held as a raw `i64` rather than `TimestampMs`.
/// NOTE(review): presumably Unix-epoch milliseconds — confirm.
pub expires_at_ms: i64,
}