use std::time::Duration;
use async_trait::async_trait;
use crate::backend::{
AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy,
FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal, PendingWaitpoint,
PrepareOutcome, ReclaimToken, ResumeSignal, SummaryDocument, TailVisibility,
};
use crate::contracts::{
CancelFlowResult, ExecutionSnapshot, FlowSnapshot, ReportUsageResult,
RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
};
#[cfg(feature = "core")]
use crate::contracts::{
AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
ApplyDependencyToChildResult, BudgetStatus, CancelExecutionArgs, CancelExecutionResult,
CancelFlowArgs, ChangePriorityArgs, ChangePriorityResult, ClaimForWorkerArgs,
ClaimForWorkerOutcome, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
DeliverSignalArgs, DeliverSignalResult, EdgeDirection, EdgeSnapshot, ExecutionInfo,
ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
ListPendingWaitpointsResult, ListSuspendedPage, ReplayExecutionArgs, ReplayExecutionResult,
ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult, RevokeLeaseArgs, RevokeLeaseResult,
StageDependencyEdgeArgs, StageDependencyEdgeResult,
};
#[cfg(feature = "core")]
use crate::state::PublicState;
#[cfg(feature = "core")]
use crate::partition::PartitionKey;
#[cfg(feature = "streaming")]
use crate::contracts::{StreamCursor, StreamFrames};
use crate::engine_error::EngineError;
#[cfg(feature = "streaming")]
use crate::types::AttemptIndex;
#[cfg(feature = "core")]
use crate::types::EdgeId;
use crate::types::{BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
/// Async interface implemented by every engine backend.
///
/// Methods declared without a body are required. Methods with a body are optional: unless a
/// method's comment says otherwise, the default returns `EngineError::Unavailable { op }` with
/// `op` set to the method's own name, so a backend overrides only what it supports.
/// Methods behind `#[cfg(feature = "core")]` / `#[cfg(feature = "streaming")]` exist only when
/// that crate feature is enabled.
///
/// NOTE(review): the one-line summaries on required methods are read off their names and
/// signatures; the authoritative semantics live in the backend implementations.
#[async_trait]
pub trait EngineBackend: Send + Sync + 'static {
// ---- Required: worker-side operations driven through a `Handle` ----
/// Required. Attempts to claim work on `lane` for a worker advertising `capabilities`, under
/// `policy`. Yields a `Handle` on success; `None` presumably means nothing was claimable
/// (confirm against implementations).
async fn claim(
&self,
lane: &LaneId,
capabilities: &CapabilitySet,
policy: ClaimPolicy,
) -> Result<Option<Handle>, EngineError>;
/// Required. Renews the lease behind `handle`, returning a `LeaseRenewal`.
async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError>;
/// Required. Reports progress for `handle`; both the percentage and the message are optional.
async fn progress(
&self,
handle: &Handle,
percent: Option<u8>,
message: Option<String>,
) -> Result<(), EngineError>;
/// Required. Appends `frame` on behalf of `handle`.
async fn append_frame(
&self,
handle: &Handle,
frame: Frame,
) -> Result<AppendFrameOutcome, EngineError>;
/// Required. Completes the work behind `handle`, with an optional result payload in bytes.
async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError>;
/// Required. Fails the work behind `handle` with a `reason` and a failure `classification`;
/// the `FailOutcome` reports what the backend did with the failure.
async fn fail(
&self,
handle: &Handle,
reason: FailureReason,
classification: FailureClass,
) -> Result<FailOutcome, EngineError>;
/// Required. Cancels the work behind `handle`, recording a free-form `reason`.
async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError>;
/// Required. Suspends the work behind `handle` according to `args`.
async fn suspend(
&self,
handle: &Handle,
args: SuspendArgs,
) -> Result<SuspendOutcome, EngineError>;
/// Optional. Same argument and return types as `suspend`, but addressed by execution id plus
/// a `LeaseFence` instead of a `Handle`. Default: `Unavailable`.
async fn suspend_by_triple(
&self,
exec_id: ExecutionId,
triple: LeaseFence,
args: SuspendArgs,
) -> Result<SuspendOutcome, EngineError> {
// Consume the arguments so the unimplemented default raises no unused-variable warnings.
let _ = (exec_id, triple, args);
Err(EngineError::Unavailable {
op: "suspend_by_triple",
})
}
/// Required. Creates a waitpoint named `waitpoint_key` for `handle`; `expires_in` is
/// presumably its time-to-live (confirm).
async fn create_waitpoint(
&self,
handle: &Handle,
waitpoint_key: &str,
expires_in: Duration,
) -> Result<PendingWaitpoint, EngineError>;
/// Required. Returns the resume signals currently observable for `handle`.
async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError>;
/// Required. Claims using a `ReclaimToken` instead of a lane; yields a `Handle` when the
/// claim succeeds.
async fn claim_from_reclaim(&self, token: ReclaimToken) -> Result<Option<Handle>, EngineError>;
/// Required. Delays the work behind `handle` until the timestamp `delay_until`.
async fn delay(&self, handle: &Handle, delay_until: TimestampMs) -> Result<(), EngineError>;
/// Required. Puts the work behind `handle` into a wait on its children (presumably child
/// executions — confirm).
async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError>;
// ---- Required: read-side snapshots ----
/// Required. Returns a snapshot of execution `id`, or `None` (presumably when unknown).
async fn describe_execution(
&self,
id: &ExecutionId,
) -> Result<Option<ExecutionSnapshot>, EngineError>;
/// Required. Returns a snapshot of flow `id`, or `None` (presumably when unknown).
async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError>;
/// Required (`core`). Lists the edges of `flow_id` in the given `direction`.
#[cfg(feature = "core")]
async fn list_edges(
&self,
flow_id: &FlowId,
direction: EdgeDirection,
) -> Result<Vec<EdgeSnapshot>, EngineError>;
/// Required (`core`). Returns a snapshot of one edge of `flow_id`, or `None`.
#[cfg(feature = "core")]
async fn describe_edge(
&self,
flow_id: &FlowId,
edge_id: &EdgeId,
) -> Result<Option<EdgeSnapshot>, EngineError>;
/// Required (`core`). Resolves the flow that execution `eid` belongs to, if any.
#[cfg(feature = "core")]
async fn resolve_execution_flow_id(
&self,
eid: &ExecutionId,
) -> Result<Option<FlowId>, EngineError>;
// ---- Required (`core`): cursor-paginated listings ----
/// Required (`core`). Returns one page of flows in `partition`; `cursor` is an optional
/// `FlowId` to continue from and `limit` bounds the page size (presumably — confirm).
#[cfg(feature = "core")]
async fn list_flows(
&self,
partition: PartitionKey,
cursor: Option<FlowId>,
limit: usize,
) -> Result<ListFlowsPage, EngineError>;
/// Required (`core`). Returns one page of lanes; unlike the other listings it takes no
/// partition.
#[cfg(feature = "core")]
async fn list_lanes(
&self,
cursor: Option<LaneId>,
limit: usize,
) -> Result<ListLanesPage, EngineError>;
/// Required (`core`). Returns one page of suspended executions in `partition`.
#[cfg(feature = "core")]
async fn list_suspended(
&self,
partition: PartitionKey,
cursor: Option<ExecutionId>,
limit: usize,
) -> Result<ListSuspendedPage, EngineError>;
/// Required (`core`). Returns one page of executions in `partition`.
#[cfg(feature = "core")]
async fn list_executions(
&self,
partition: PartitionKey,
cursor: Option<ExecutionId>,
limit: usize,
) -> Result<ListExecutionsPage, EngineError>;
// ---- Required: signals, flow cancellation, edge policy ----
/// Required (`core`). Delivers a signal described by `args`.
#[cfg(feature = "core")]
async fn deliver_signal(
&self,
args: DeliverSignalArgs,
) -> Result<DeliverSignalResult, EngineError>;
/// Required (`core`). Claims a resumed execution described by `args`.
#[cfg(feature = "core")]
async fn claim_resumed_execution(
&self,
args: ClaimResumedExecutionArgs,
) -> Result<ClaimResumedExecutionResult, EngineError>;
/// Required. Cancels flow `id` under `policy`. `wait` is one of `NoWait`, `WaitTimeout(d)` or
/// `WaitIndefinite`; presumably it controls whether the call blocks until the cancellation
/// is observed (confirm).
async fn cancel_flow(
&self,
id: &FlowId,
policy: CancelFlowPolicy,
wait: CancelFlowWait,
) -> Result<CancelFlowResult, EngineError>;
/// Required (`core`). Sets the edge-dependency `policy` for the edge group feeding
/// `downstream_execution_id` within `flow_id`.
#[cfg(feature = "core")]
async fn set_edge_group_policy(
&self,
flow_id: &FlowId,
downstream_execution_id: &ExecutionId,
policy: crate::contracts::EdgeDependencyPolicy,
) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError>;
// ---- Optional: waitpoint HMAC secret management ----
/// Optional. Rotates the waitpoint HMAC secret (the `_all` suffix presumably means across all
/// partitions — confirm). Default: `Unavailable`.
async fn rotate_waitpoint_hmac_secret_all(
&self,
_args: RotateWaitpointHmacSecretAllArgs,
) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
Err(EngineError::Unavailable {
op: "rotate_waitpoint_hmac_secret_all",
})
}
/// Optional. Seeds the waitpoint HMAC secret. Default: `Unavailable`.
async fn seed_waitpoint_hmac_secret(
&self,
_args: SeedWaitpointHmacSecretArgs,
) -> Result<SeedOutcome, EngineError> {
Err(EngineError::Unavailable {
op: "seed_waitpoint_hmac_secret",
})
}
/// Required. Reports usage `dimensions` against `budget` on behalf of `handle`.
async fn report_usage(
&self,
handle: &Handle,
budget: &BudgetId,
dimensions: crate::backend::UsageDimensions,
) -> Result<ReportUsageResult, EngineError>;
// ---- Required (`streaming`): per-attempt stream access ----
/// Required (`streaming`). Reads frames of one attempt of `execution_id` between the cursors
/// `from` and `to`, returning at most `count_limit` frames (presumably — confirm bounds
/// inclusivity with implementations).
#[cfg(feature = "streaming")]
async fn read_stream(
&self,
execution_id: &ExecutionId,
attempt_index: AttemptIndex,
from: StreamCursor,
to: StreamCursor,
count_limit: u64,
) -> Result<StreamFrames, EngineError>;
/// Required (`streaming`). Tails the stream of one attempt for frames after `after`;
/// `block_ms` is presumably the maximum blocking time in milliseconds and `visibility`
/// selects which frames are returned (confirm).
#[cfg(feature = "streaming")]
async fn tail_stream(
&self,
execution_id: &ExecutionId,
attempt_index: AttemptIndex,
after: StreamCursor,
block_ms: u64,
count_limit: u64,
visibility: TailVisibility,
) -> Result<StreamFrames, EngineError>;
/// Required (`streaming`). Returns the summary document of one attempt, if there is one.
#[cfg(feature = "streaming")]
async fn read_summary(
&self,
execution_id: &ExecutionId,
attempt_index: AttemptIndex,
) -> Result<Option<SummaryDocument>, EngineError>;
// ---- Optional (`core`): administrative / control-plane operations ----
// Every method in this group defaults to `Unavailable` with its own name as `op`.
/// Optional (`core`). Creates an execution. Default: `Unavailable`.
#[cfg(feature = "core")]
async fn create_execution(
&self,
_args: CreateExecutionArgs,
) -> Result<CreateExecutionResult, EngineError> {
Err(EngineError::Unavailable {
op: "create_execution",
})
}
/// Optional (`core`). Creates a flow. Default: `Unavailable`.
#[cfg(feature = "core")]
async fn create_flow(
&self,
_args: CreateFlowArgs,
) -> Result<CreateFlowResult, EngineError> {
Err(EngineError::Unavailable { op: "create_flow" })
}
/// Optional (`core`). Adds an execution to a flow. Default: `Unavailable`.
#[cfg(feature = "core")]
async fn add_execution_to_flow(
&self,
_args: AddExecutionToFlowArgs,
) -> Result<AddExecutionToFlowResult, EngineError> {
Err(EngineError::Unavailable {
op: "add_execution_to_flow",
})
}
/// Optional (`core`). Stages a dependency edge. Default: `Unavailable`.
#[cfg(feature = "core")]
async fn stage_dependency_edge(
&self,
_args: StageDependencyEdgeArgs,
) -> Result<StageDependencyEdgeResult, EngineError> {
Err(EngineError::Unavailable {
op: "stage_dependency_edge",
})
}
/// Optional (`core`). Applies a dependency to a child execution. Default: `Unavailable`.
#[cfg(feature = "core")]
async fn apply_dependency_to_child(
&self,
_args: ApplyDependencyToChildArgs,
) -> Result<ApplyDependencyToChildResult, EngineError> {
Err(EngineError::Unavailable {
op: "apply_dependency_to_child",
})
}
/// Optional (`core`). Cancels an execution identified by `_args` rather than by a `Handle`.
/// Default: `Unavailable`.
#[cfg(feature = "core")]
async fn cancel_execution(
&self,
_args: CancelExecutionArgs,
) -> Result<CancelExecutionResult, EngineError> {
Err(EngineError::Unavailable {
op: "cancel_execution",
})
}
/// Optional (`core`). Changes an execution's priority. Default: `Unavailable`.
#[cfg(feature = "core")]
async fn change_priority(
&self,
_args: ChangePriorityArgs,
) -> Result<ChangePriorityResult, EngineError> {
Err(EngineError::Unavailable {
op: "change_priority",
})
}
/// Optional (`core`). Replays an execution. Default: `Unavailable`.
#[cfg(feature = "core")]
async fn replay_execution(
&self,
_args: ReplayExecutionArgs,
) -> Result<ReplayExecutionResult, EngineError> {
Err(EngineError::Unavailable {
op: "replay_execution",
})
}
/// Optional (`core`). Revokes a lease. Default: `Unavailable`.
#[cfg(feature = "core")]
async fn revoke_lease(
&self,
_args: RevokeLeaseArgs,
) -> Result<RevokeLeaseResult, EngineError> {
Err(EngineError::Unavailable { op: "revoke_lease" })
}
/// Optional (`core`). Creates a budget. Default: `Unavailable`.
#[cfg(feature = "core")]
async fn create_budget(
&self,
_args: CreateBudgetArgs,
) -> Result<CreateBudgetResult, EngineError> {
Err(EngineError::Unavailable {
op: "create_budget",
})
}
/// Optional (`core`). Resets a budget. Default: `Unavailable`.
#[cfg(feature = "core")]
async fn reset_budget(
&self,
_args: ResetBudgetArgs,
) -> Result<ResetBudgetResult, EngineError> {
Err(EngineError::Unavailable { op: "reset_budget" })
}
/// Optional (`core`). Creates a quota policy. Default: `Unavailable`.
#[cfg(feature = "core")]
async fn create_quota_policy(
&self,
_args: CreateQuotaPolicyArgs,
) -> Result<CreateQuotaPolicyResult, EngineError> {
Err(EngineError::Unavailable {
op: "create_quota_policy",
})
}
/// Optional (`core`). Returns the status of budget `_id`. Default: `Unavailable`.
#[cfg(feature = "core")]
async fn get_budget_status(
&self,
_id: &BudgetId,
) -> Result<BudgetStatus, EngineError> {
Err(EngineError::Unavailable {
op: "get_budget_status",
})
}
/// Optional (`core`). Handle-less counterpart of `report_usage`: reports usage against
/// `_budget` from `_args` and returns the same `ReportUsageResult`. Default: `Unavailable`.
#[cfg(feature = "core")]
async fn report_usage_admin(
&self,
_budget: &BudgetId,
_args: ReportUsageAdminArgs,
) -> Result<ReportUsageResult, EngineError> {
Err(EngineError::Unavailable {
op: "report_usage_admin",
})
}
/// Optional (not feature-gated). Returns the result bytes of execution `_id`, if any.
/// Default: `Unavailable`.
async fn get_execution_result(
&self,
_id: &ExecutionId,
) -> Result<Option<Vec<u8>>, EngineError> {
Err(EngineError::Unavailable {
op: "get_execution_result",
})
}
/// Optional (`core`). Lists pending waitpoints selected by `_args`. Default: `Unavailable`.
#[cfg(feature = "core")]
async fn list_pending_waitpoints(
&self,
_args: ListPendingWaitpointsArgs,
) -> Result<ListPendingWaitpointsResult, EngineError> {
Err(EngineError::Unavailable {
op: "list_pending_waitpoints",
})
}
/// Optional. Liveness probe. Note the default is an error (`Unavailable`), not `Ok(())`: a
/// backend that does not override `ping` reports as unavailable.
async fn ping(&self) -> Result<(), EngineError> {
Err(EngineError::Unavailable { op: "ping" })
}
/// Optional (`core`). Claim variant driven by `ClaimForWorkerArgs`. Default: `Unavailable`.
#[cfg(feature = "core")]
async fn claim_for_worker(
&self,
_args: ClaimForWorkerArgs,
) -> Result<ClaimForWorkerOutcome, EngineError> {
Err(EngineError::Unavailable {
op: "claim_for_worker",
})
}
// ---- Optional: identity, capability discovery, lifecycle (non-error defaults) ----
/// Optional. Static label naming the backend. Default: `"unknown"`.
fn backend_label(&self) -> &'static str {
"unknown"
}
/// Optional. Describes the backend's identity and what it supports. The default advertises
/// an `"unknown"` identity at version `0.0.0` with `Supports::none()`, i.e. nothing is
/// claimed as supported until a backend overrides this.
fn capabilities(&self) -> crate::capability::Capabilities {
crate::capability::Capabilities::new(
crate::capability::BackendIdentity::new(
"unknown",
crate::capability::Version::new(0, 0, 0),
"unknown",
),
crate::capability::Supports::none(),
)
}
/// Optional. Preparation hook. Default: succeeds with `PrepareOutcome::NoOp`.
async fn prepare(&self) -> Result<PrepareOutcome, EngineError> {
Ok(PrepareOutcome::NoOp)
}
/// Optional. Shutdown-preparation hook given a grace period. Default: succeeds immediately
/// and ignores `_grace`.
async fn shutdown_prepare(&self, _grace: Duration) -> Result<(), EngineError> {
Ok(())
}
// ---- Optional (`core`): flow-cancel building blocks and direct reads ----
/// Optional (`core`). Takes `CancelFlowArgs` and returns a `CancelFlowHeader`.
/// Default: `Unavailable`.
#[cfg(feature = "core")]
async fn cancel_flow_header(
&self,
_args: CancelFlowArgs,
) -> Result<crate::contracts::CancelFlowHeader, EngineError> {
Err(EngineError::Unavailable {
op: "cancel_flow_header",
})
}
/// Optional (`core`). Acknowledges cancellation of one member execution of a flow.
/// Default: `Unavailable`.
#[cfg(feature = "core")]
async fn ack_cancel_member(
&self,
_flow_id: &FlowId,
_execution_id: &ExecutionId,
) -> Result<(), EngineError> {
Err(EngineError::Unavailable {
op: "ack_cancel_member",
})
}
/// Optional (`core`). Reads the `ExecutionInfo` of execution `_id`, if any.
/// Default: `Unavailable`.
#[cfg(feature = "core")]
async fn read_execution_info(
&self,
_id: &ExecutionId,
) -> Result<Option<ExecutionInfo>, EngineError> {
Err(EngineError::Unavailable {
op: "read_execution_info",
})
}
/// Optional (`core`). Reads the `PublicState` of execution `_id`, if any.
/// Default: `Unavailable`.
#[cfg(feature = "core")]
async fn read_execution_state(
&self,
_id: &ExecutionId,
) -> Result<Option<PublicState>, EngineError> {
Err(EngineError::Unavailable {
op: "read_execution_state",
})
}
// ---- Optional: event subscriptions ----
// These take `crate::stream_subscribe::StreamCursor`, which is a different type from the
// `StreamCursor` imported from `crate::contracts` for the `streaming` read methods above —
// hence the fully qualified path.
/// Optional. Subscribes to lease-history events from `_cursor`, narrowed by `_filter`.
/// Default: `Unavailable`.
async fn subscribe_lease_history(
&self,
_cursor: crate::stream_subscribe::StreamCursor,
_filter: &crate::backend::ScannerFilter,
) -> Result<crate::stream_events::LeaseHistorySubscription, EngineError> {
Err(EngineError::Unavailable {
op: "subscribe_lease_history",
})
}
/// Optional. Subscribes to completion events from `_cursor`, narrowed by `_filter`.
/// Default: `Unavailable`.
async fn subscribe_completion(
&self,
_cursor: crate::stream_subscribe::StreamCursor,
_filter: &crate::backend::ScannerFilter,
) -> Result<crate::stream_events::CompletionSubscription, EngineError> {
Err(EngineError::Unavailable {
op: "subscribe_completion",
})
}
/// Optional. Subscribes to signal-delivery events from `_cursor`, narrowed by `_filter`.
/// Default: `Unavailable`.
async fn subscribe_signal_delivery(
&self,
_cursor: crate::stream_subscribe::StreamCursor,
_filter: &crate::backend::ScannerFilter,
) -> Result<crate::stream_events::SignalDeliverySubscription, EngineError> {
Err(EngineError::Unavailable {
op: "subscribe_signal_delivery",
})
}
/// Optional. Subscribes to instance-tag events from `_cursor`; unlike the other
/// subscriptions it takes no filter. Default: `Unavailable`.
async fn subscribe_instance_tags(
&self,
_cursor: crate::stream_subscribe::StreamCursor,
) -> Result<crate::stream_events::InstanceTagSubscription, EngineError> {
Err(EngineError::Unavailable {
op: "subscribe_instance_tags",
})
}
}
/// Compile-time guard: this signature only type-checks while `EngineBackend` can be used as a
/// trait object, so a change that breaks `dyn EngineBackend` fails the build here.
// Never called — it exists purely for the type check — hence the dead-code allowance.
#[allow(dead_code)]
fn _assert_dyn_compatible(_backend: &dyn EngineBackend) {
}
/// Pause between successive `describe_flow` polls in `wait_for_flow_cancellation`.
const CANCEL_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(100);
/// Deadline that `cancel_flow_wait_deadline` substitutes for `CancelFlowWait::WaitIndefinite`,
/// so an "indefinite" wait is in fact capped at five minutes.
const CANCEL_WAIT_INDEFINITE_CEILING: Duration = Duration::from_secs(300);
/// Polls `backend.describe_flow(flow_id)` until the flow is observed cancelled or `deadline`
/// has elapsed.
///
/// Returns `Ok(())` as soon as a snapshot's `public_flow_state` equals `"cancelled"`, and also
/// when the backend returns no snapshot at all (`None`), which is treated as nothing left to
/// wait for.
///
/// The flow is always polled at least once, even with a zero `deadline`. Between polls the task
/// sleeps for `CANCEL_WAIT_POLL_INTERVAL`, shortened to the time remaining, so the wait does not
/// run a whole poll interval past `deadline` and a final poll happens right at the deadline.
///
/// # Errors
///
/// * Any error from `describe_flow` is propagated unchanged.
/// * `EngineError::Timeout { op: "cancel_flow", elapsed: deadline }` when the flow is still not
///   cancelled once `deadline` has elapsed. `elapsed` carries the configured deadline, not the
///   measured wall-clock time.
pub async fn wait_for_flow_cancellation<B: EngineBackend + ?Sized>(
    backend: &B,
    flow_id: &crate::types::FlowId,
    deadline: Duration,
) -> Result<(), EngineError> {
    let started = std::time::Instant::now();
    loop {
        match backend.describe_flow(flow_id).await? {
            None => return Ok(()),
            Some(snap) if snap.public_flow_state == "cancelled" => return Ok(()),
            Some(_) => {}
        }

        let elapsed = started.elapsed();
        if elapsed >= deadline {
            return Err(EngineError::Timeout {
                op: "cancel_flow",
                elapsed: deadline,
            });
        }

        // Clamp the nap to the remaining budget. A fixed interval would overshoot a short
        // deadline (e.g. a 10 ms deadline used to take ~100 ms to time out). The subtraction
        // cannot underflow because `elapsed < deadline` was just established.
        let nap = CANCEL_WAIT_POLL_INTERVAL.min(deadline - elapsed);
        tokio::time::sleep(nap).await;
    }
}
/// Translates a `CancelFlowWait` mode into the deadline to wait for, if any.
///
/// * `NoWait` yields `None`: the caller should not wait at all.
/// * `WaitTimeout(d)` yields `Some(d)`.
/// * `WaitIndefinite` yields `Some(CANCEL_WAIT_INDEFINITE_CEILING)`, so even an "indefinite"
///   wait is bounded.
pub fn cancel_flow_wait_deadline(wait: CancelFlowWait) -> Option<Duration> {
    let limit = match wait {
        // Nothing to wait for: bail out before wrapping in `Some`.
        CancelFlowWait::NoWait => return None,
        CancelFlowWait::WaitIndefinite => CANCEL_WAIT_INDEFINITE_CEILING,
        CancelFlowWait::WaitTimeout(timeout) => timeout,
    };
    Some(limit)
}
// Unit tests for the trait's default method implementations.
#[cfg(test)]
mod tests {
use super::*;
/// Minimal backend that implements only the required trait methods, so every optional method
/// falls through to the trait's default body.
struct DefaultBackend;
// Every required method panics with `unreachable!()`: the test in this module only calls
// `capabilities()`, so none of these stubs is expected to run. No optional method is
// overridden here — that is the point of the fixture.
#[async_trait]
impl EngineBackend for DefaultBackend {
async fn claim(
&self,
_lane: &LaneId,
_capabilities: &CapabilitySet,
_policy: ClaimPolicy,
) -> Result<Option<Handle>, EngineError> {
unreachable!()
}
async fn renew(&self, _handle: &Handle) -> Result<LeaseRenewal, EngineError> {
unreachable!()
}
async fn progress(
&self,
_handle: &Handle,
_percent: Option<u8>,
_message: Option<String>,
) -> Result<(), EngineError> {
unreachable!()
}
async fn append_frame(
&self,
_handle: &Handle,
_frame: Frame,
) -> Result<AppendFrameOutcome, EngineError> {
unreachable!()
}
async fn complete(
&self,
_handle: &Handle,
_payload: Option<Vec<u8>>,
) -> Result<(), EngineError> {
unreachable!()
}
async fn fail(
&self,
_handle: &Handle,
_reason: FailureReason,
_classification: FailureClass,
) -> Result<FailOutcome, EngineError> {
unreachable!()
}
async fn cancel(&self, _handle: &Handle, _reason: &str) -> Result<(), EngineError> {
unreachable!()
}
async fn suspend(
&self,
_handle: &Handle,
_args: SuspendArgs,
) -> Result<SuspendOutcome, EngineError> {
unreachable!()
}
async fn create_waitpoint(
&self,
_handle: &Handle,
_waitpoint_key: &str,
_expires_in: Duration,
) -> Result<PendingWaitpoint, EngineError> {
unreachable!()
}
async fn observe_signals(
&self,
_handle: &Handle,
) -> Result<Vec<ResumeSignal>, EngineError> {
unreachable!()
}
async fn claim_from_reclaim(
&self,
_token: ReclaimToken,
) -> Result<Option<Handle>, EngineError> {
unreachable!()
}
async fn delay(
&self,
_handle: &Handle,
_delay_until: TimestampMs,
) -> Result<(), EngineError> {
unreachable!()
}
async fn wait_children(&self, _handle: &Handle) -> Result<(), EngineError> {
unreachable!()
}
async fn describe_execution(
&self,
_id: &ExecutionId,
) -> Result<Option<ExecutionSnapshot>, EngineError> {
unreachable!()
}
async fn describe_flow(
&self,
_id: &FlowId,
) -> Result<Option<FlowSnapshot>, EngineError> {
unreachable!()
}
// Required methods that exist only with the `core` feature.
#[cfg(feature = "core")]
async fn list_edges(
&self,
_flow_id: &FlowId,
_direction: EdgeDirection,
) -> Result<Vec<EdgeSnapshot>, EngineError> {
unreachable!()
}
#[cfg(feature = "core")]
async fn describe_edge(
&self,
_flow_id: &FlowId,
_edge_id: &EdgeId,
) -> Result<Option<EdgeSnapshot>, EngineError> {
unreachable!()
}
#[cfg(feature = "core")]
async fn resolve_execution_flow_id(
&self,
_eid: &ExecutionId,
) -> Result<Option<FlowId>, EngineError> {
unreachable!()
}
#[cfg(feature = "core")]
async fn list_flows(
&self,
_partition: PartitionKey,
_cursor: Option<FlowId>,
_limit: usize,
) -> Result<ListFlowsPage, EngineError> {
unreachable!()
}
#[cfg(feature = "core")]
async fn list_lanes(
&self,
_cursor: Option<LaneId>,
_limit: usize,
) -> Result<ListLanesPage, EngineError> {
unreachable!()
}
#[cfg(feature = "core")]
async fn list_suspended(
&self,
_partition: PartitionKey,
_cursor: Option<ExecutionId>,
_limit: usize,
) -> Result<ListSuspendedPage, EngineError> {
unreachable!()
}
#[cfg(feature = "core")]
async fn list_executions(
&self,
_partition: PartitionKey,
_cursor: Option<ExecutionId>,
_limit: usize,
) -> Result<ListExecutionsPage, EngineError> {
unreachable!()
}
#[cfg(feature = "core")]
async fn deliver_signal(
&self,
_args: DeliverSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
unreachable!()
}
#[cfg(feature = "core")]
async fn claim_resumed_execution(
&self,
_args: ClaimResumedExecutionArgs,
) -> Result<ClaimResumedExecutionResult, EngineError> {
unreachable!()
}
async fn cancel_flow(
&self,
_id: &FlowId,
_policy: CancelFlowPolicy,
_wait: CancelFlowWait,
) -> Result<CancelFlowResult, EngineError> {
unreachable!()
}
#[cfg(feature = "core")]
async fn set_edge_group_policy(
&self,
_flow_id: &FlowId,
_downstream_execution_id: &ExecutionId,
_policy: crate::contracts::EdgeDependencyPolicy,
) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
unreachable!()
}
async fn report_usage(
&self,
_handle: &Handle,
_budget: &BudgetId,
_dimensions: crate::backend::UsageDimensions,
) -> Result<ReportUsageResult, EngineError> {
unreachable!()
}
// Required methods that exist only with the `streaming` feature.
#[cfg(feature = "streaming")]
async fn read_stream(
&self,
_execution_id: &ExecutionId,
_attempt_index: AttemptIndex,
_from: StreamCursor,
_to: StreamCursor,
_count_limit: u64,
) -> Result<StreamFrames, EngineError> {
unreachable!()
}
#[cfg(feature = "streaming")]
async fn tail_stream(
&self,
_execution_id: &ExecutionId,
_attempt_index: AttemptIndex,
_after: StreamCursor,
_block_ms: u64,
_count_limit: u64,
_visibility: TailVisibility,
) -> Result<StreamFrames, EngineError> {
unreachable!()
}
#[cfg(feature = "streaming")]
async fn read_summary(
&self,
_execution_id: &ExecutionId,
_attempt_index: AttemptIndex,
) -> Result<Option<SummaryDocument>, EngineError> {
unreachable!()
}
}
/// A backend that does not override `capabilities()` must report the placeholder identity
/// (family and RFC-017 stage `"unknown"`, version `0.0.0`) and `Supports::none()`.
#[test]
fn default_capabilities_is_unknown_family_all_false() {
let b = DefaultBackend;
let caps = b.capabilities();
assert_eq!(caps.identity.family, "unknown");
assert_eq!(
caps.identity.version,
crate::capability::Version::new(0, 0, 0)
);
assert_eq!(caps.identity.rfc017_stage, "unknown");
assert_eq!(caps.supports, crate::capability::Supports::none());
}
}