use std::time::Duration;
use async_trait::async_trait;
use crate::backend::{
AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy,
FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal, PendingWaitpoint,
PrepareOutcome, ResumeSignal, ResumeToken,
};
#[cfg(feature = "streaming")]
use crate::backend::{SummaryDocument, TailVisibility};
use crate::contracts::{
CancelFlowResult, ExecutionContext, ExecutionSnapshot, FlowSnapshot, IssueReclaimGrantArgs,
IssueReclaimGrantOutcome, ReclaimExecutionArgs, ReclaimExecutionOutcome, ReportUsageResult,
RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
};
#[cfg(feature = "core")]
use crate::contracts::{
AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
ApplyDependencyToChildResult, BudgetStatus, CancelExecutionArgs, CancelExecutionResult,
CancelFlowArgs, ChangePriorityArgs, ChangePriorityResult, ClaimExecutionArgs,
ClaimExecutionResult, ClaimForWorkerArgs, ClaimForWorkerOutcome, ClaimResumedExecutionArgs,
ClaimResumedExecutionResult,
BlockRouteArgs, BlockRouteOutcome, CheckAdmissionArgs, CheckAdmissionResult,
ClaimGrantOutcome,
CompleteExecutionArgs, CompleteExecutionResult, CreateBudgetArgs, CreateBudgetResult,
CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
DeliverApprovalSignalArgs, DeliverSignalArgs, DeliverSignalResult, EdgeDirection, EdgeSnapshot,
EvaluateFlowEligibilityArgs, EvaluateFlowEligibilityResult, ExecutionInfo,
FailExecutionArgs, FailExecutionResult,
IssueClaimGrantArgs, IssueClaimGrantOutcome, IssueGrantAndClaimArgs,
RecordSpendArgs, ReleaseBudgetArgs, ScanEligibleArgs,
ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
ListPendingWaitpointsResult, ListSuspendedPage, RenewLeaseArgs, RenewLeaseResult,
ReplayExecutionArgs, ReplayExecutionResult,
ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult, ResumeExecutionArgs,
ResumeExecutionResult, RevokeLeaseArgs, RevokeLeaseResult,
StageDependencyEdgeArgs, StageDependencyEdgeResult,
};
#[cfg(feature = "core")]
use crate::state::PublicState;
#[cfg(feature = "core")]
use crate::partition::PartitionKey;
#[cfg(feature = "streaming")]
use crate::contracts::{StreamCursor, StreamFrames};
use crate::engine_error::EngineError;
#[cfg(feature = "core")]
use crate::types::EdgeId;
#[cfg(feature = "core")]
use crate::types::WaitpointId;
use crate::types::{AttemptIndex, BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
#[async_trait]
pub trait EngineBackend: Send + Sync + 'static {
async fn claim(
&self,
lane: &LaneId,
capabilities: &CapabilitySet,
policy: ClaimPolicy,
) -> Result<Option<Handle>, EngineError>;
async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError>;
async fn progress(
&self,
handle: &Handle,
percent: Option<u8>,
message: Option<String>,
) -> Result<(), EngineError>;
async fn append_frame(
&self,
handle: &Handle,
frame: Frame,
) -> Result<AppendFrameOutcome, EngineError>;
async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError>;
async fn fail(
&self,
handle: &Handle,
reason: FailureReason,
classification: FailureClass,
) -> Result<FailOutcome, EngineError>;
async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError>;
async fn suspend(
&self,
handle: &Handle,
args: SuspendArgs,
) -> Result<SuspendOutcome, EngineError>;
async fn suspend_by_triple(
&self,
exec_id: ExecutionId,
triple: LeaseFence,
args: SuspendArgs,
) -> Result<SuspendOutcome, EngineError> {
let _ = (exec_id, triple, args);
Err(EngineError::Unavailable {
op: "suspend_by_triple",
})
}
async fn create_waitpoint(
&self,
handle: &Handle,
waitpoint_key: &str,
expires_in: Duration,
) -> Result<PendingWaitpoint, EngineError>;
#[cfg(feature = "core")]
async fn read_waitpoint_token(
&self,
_partition: PartitionKey,
_waitpoint_id: &WaitpointId,
) -> Result<Option<String>, EngineError> {
Err(EngineError::Unavailable {
op: "read_waitpoint_token",
})
}
async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError>;
async fn claim_from_resume_grant(
&self,
token: ResumeToken,
) -> Result<Option<Handle>, EngineError>;
async fn issue_reclaim_grant(
&self,
_args: IssueReclaimGrantArgs,
) -> Result<IssueReclaimGrantOutcome, EngineError> {
Err(EngineError::Unavailable {
op: "issue_reclaim_grant",
})
}
async fn reclaim_execution(
&self,
_args: ReclaimExecutionArgs,
) -> Result<ReclaimExecutionOutcome, EngineError> {
Err(EngineError::Unavailable {
op: "reclaim_execution",
})
}
async fn delay(&self, handle: &Handle, delay_until: TimestampMs) -> Result<(), EngineError>;
async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError>;
async fn describe_execution(
&self,
id: &ExecutionId,
) -> Result<Option<ExecutionSnapshot>, EngineError>;
async fn read_execution_context(
&self,
execution_id: &ExecutionId,
) -> Result<ExecutionContext, EngineError>;
async fn read_current_attempt_index(
&self,
_execution_id: &ExecutionId,
) -> Result<AttemptIndex, EngineError> {
Err(EngineError::Unavailable {
op: "read_current_attempt_index",
})
}
async fn read_total_attempt_count(
&self,
_execution_id: &ExecutionId,
) -> Result<AttemptIndex, EngineError> {
Err(EngineError::Unavailable {
op: "read_total_attempt_count",
})
}
async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError>;
async fn set_execution_tag(
&self,
_execution_id: &ExecutionId,
_key: &str,
_value: &str,
) -> Result<(), EngineError> {
Err(EngineError::Unavailable {
op: "set_execution_tag",
})
}
async fn set_flow_tag(
&self,
_flow_id: &FlowId,
_key: &str,
_value: &str,
) -> Result<(), EngineError> {
Err(EngineError::Unavailable {
op: "set_flow_tag",
})
}
async fn get_execution_tag(
&self,
_execution_id: &ExecutionId,
_key: &str,
) -> Result<Option<String>, EngineError> {
Err(EngineError::Unavailable {
op: "get_execution_tag",
})
}
async fn get_execution_namespace(
&self,
_execution_id: &ExecutionId,
) -> Result<Option<String>, EngineError> {
Err(EngineError::Unavailable {
op: "get_execution_namespace",
})
}
async fn get_flow_tag(
&self,
_flow_id: &FlowId,
_key: &str,
) -> Result<Option<String>, EngineError> {
Err(EngineError::Unavailable {
op: "get_flow_tag",
})
}
#[cfg(feature = "core")]
async fn list_edges(
&self,
_flow_id: &FlowId,
_direction: EdgeDirection,
) -> Result<Vec<EdgeSnapshot>, EngineError> {
Err(EngineError::Unavailable { op: "list_edges" })
}
#[cfg(feature = "core")]
async fn describe_edge(
&self,
_flow_id: &FlowId,
_edge_id: &EdgeId,
) -> Result<Option<EdgeSnapshot>, EngineError> {
Err(EngineError::Unavailable {
op: "describe_edge",
})
}
#[cfg(feature = "core")]
async fn resolve_execution_flow_id(
&self,
_eid: &ExecutionId,
) -> Result<Option<FlowId>, EngineError> {
Err(EngineError::Unavailable {
op: "resolve_execution_flow_id",
})
}
#[cfg(feature = "core")]
async fn list_flows(
&self,
_partition: PartitionKey,
_cursor: Option<FlowId>,
_limit: usize,
) -> Result<ListFlowsPage, EngineError> {
Err(EngineError::Unavailable { op: "list_flows" })
}
#[cfg(feature = "core")]
async fn list_lanes(
&self,
_cursor: Option<LaneId>,
_limit: usize,
) -> Result<ListLanesPage, EngineError> {
Err(EngineError::Unavailable { op: "list_lanes" })
}
#[cfg(feature = "core")]
async fn list_suspended(
&self,
_partition: PartitionKey,
_cursor: Option<ExecutionId>,
_limit: usize,
) -> Result<ListSuspendedPage, EngineError> {
Err(EngineError::Unavailable {
op: "list_suspended",
})
}
#[cfg(feature = "core")]
async fn list_executions(
&self,
_partition: PartitionKey,
_cursor: Option<ExecutionId>,
_limit: usize,
) -> Result<ListExecutionsPage, EngineError> {
Err(EngineError::Unavailable {
op: "list_executions",
})
}
#[cfg(feature = "core")]
async fn deliver_signal(
&self,
_args: DeliverSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
Err(EngineError::Unavailable {
op: "deliver_signal",
})
}
#[cfg(feature = "core")]
async fn claim_resumed_execution(
&self,
_args: ClaimResumedExecutionArgs,
) -> Result<ClaimResumedExecutionResult, EngineError> {
Err(EngineError::Unavailable {
op: "claim_resumed_execution",
})
}
#[cfg(feature = "core")]
async fn scan_eligible_executions(
&self,
_args: ScanEligibleArgs,
) -> Result<Vec<ExecutionId>, EngineError> {
Err(EngineError::Unavailable {
op: "scan_eligible_executions",
})
}
#[cfg(feature = "core")]
async fn issue_claim_grant(
&self,
_args: IssueClaimGrantArgs,
) -> Result<IssueClaimGrantOutcome, EngineError> {
Err(EngineError::Unavailable {
op: "issue_claim_grant",
})
}
#[cfg(feature = "core")]
async fn block_route(
&self,
_args: BlockRouteArgs,
) -> Result<BlockRouteOutcome, EngineError> {
Err(EngineError::Unavailable { op: "block_route" })
}
#[cfg(feature = "core")]
async fn claim_execution(
&self,
_args: ClaimExecutionArgs,
) -> Result<ClaimExecutionResult, EngineError> {
Err(EngineError::Unavailable {
op: "claim_execution",
})
}
async fn cancel_flow(
&self,
id: &FlowId,
policy: CancelFlowPolicy,
wait: CancelFlowWait,
) -> Result<CancelFlowResult, EngineError>;
#[cfg(feature = "core")]
async fn set_edge_group_policy(
&self,
_flow_id: &FlowId,
_downstream_execution_id: &ExecutionId,
_policy: crate::contracts::EdgeDependencyPolicy,
) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
Err(EngineError::Unavailable {
op: "set_edge_group_policy",
})
}
async fn rotate_waitpoint_hmac_secret_all(
&self,
_args: RotateWaitpointHmacSecretAllArgs,
) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
Err(EngineError::Unavailable {
op: "rotate_waitpoint_hmac_secret_all",
})
}
async fn seed_waitpoint_hmac_secret(
&self,
_args: SeedWaitpointHmacSecretArgs,
) -> Result<SeedOutcome, EngineError> {
Err(EngineError::Unavailable {
op: "seed_waitpoint_hmac_secret",
})
}
async fn report_usage(
&self,
handle: &Handle,
budget: &BudgetId,
dimensions: crate::backend::UsageDimensions,
) -> Result<ReportUsageResult, EngineError>;
#[cfg(feature = "streaming")]
async fn read_stream(
&self,
_execution_id: &ExecutionId,
_attempt_index: AttemptIndex,
_from: StreamCursor,
_to: StreamCursor,
_count_limit: u64,
) -> Result<StreamFrames, EngineError> {
Err(EngineError::Unavailable { op: "read_stream" })
}
#[cfg(feature = "streaming")]
async fn tail_stream(
&self,
_execution_id: &ExecutionId,
_attempt_index: AttemptIndex,
_after: StreamCursor,
_block_ms: u64,
_count_limit: u64,
_visibility: TailVisibility,
) -> Result<StreamFrames, EngineError> {
Err(EngineError::Unavailable { op: "tail_stream" })
}
#[cfg(feature = "streaming")]
async fn read_summary(
&self,
_execution_id: &ExecutionId,
_attempt_index: AttemptIndex,
) -> Result<Option<SummaryDocument>, EngineError> {
Err(EngineError::Unavailable {
op: "read_summary",
})
}
#[cfg(feature = "core")]
async fn create_execution(
&self,
_args: CreateExecutionArgs,
) -> Result<CreateExecutionResult, EngineError> {
Err(EngineError::Unavailable {
op: "create_execution",
})
}
#[cfg(feature = "core")]
async fn create_flow(
&self,
_args: CreateFlowArgs,
) -> Result<CreateFlowResult, EngineError> {
Err(EngineError::Unavailable { op: "create_flow" })
}
#[cfg(feature = "core")]
async fn add_execution_to_flow(
&self,
_args: AddExecutionToFlowArgs,
) -> Result<AddExecutionToFlowResult, EngineError> {
Err(EngineError::Unavailable {
op: "add_execution_to_flow",
})
}
#[cfg(feature = "core")]
async fn stage_dependency_edge(
&self,
_args: StageDependencyEdgeArgs,
) -> Result<StageDependencyEdgeResult, EngineError> {
Err(EngineError::Unavailable {
op: "stage_dependency_edge",
})
}
#[cfg(feature = "core")]
async fn apply_dependency_to_child(
&self,
_args: ApplyDependencyToChildArgs,
) -> Result<ApplyDependencyToChildResult, EngineError> {
Err(EngineError::Unavailable {
op: "apply_dependency_to_child",
})
}
#[cfg(feature = "core")]
async fn resolve_dependency(
&self,
_args: crate::contracts::ResolveDependencyArgs,
) -> Result<crate::contracts::ResolveDependencyOutcome, EngineError> {
Err(EngineError::Unavailable {
op: "resolve_dependency",
})
}
#[cfg(feature = "core")]
async fn cascade_completion(
&self,
_payload: &crate::backend::CompletionPayload,
) -> Result<crate::contracts::CascadeOutcome, EngineError> {
Err(EngineError::Unavailable {
op: "cascade_completion",
})
}
#[cfg(feature = "core")]
async fn cancel_execution(
&self,
_args: CancelExecutionArgs,
) -> Result<CancelExecutionResult, EngineError> {
Err(EngineError::Unavailable {
op: "cancel_execution",
})
}
#[cfg(feature = "core")]
async fn change_priority(
&self,
_args: ChangePriorityArgs,
) -> Result<ChangePriorityResult, EngineError> {
Err(EngineError::Unavailable {
op: "change_priority",
})
}
#[cfg(feature = "core")]
async fn replay_execution(
&self,
_args: ReplayExecutionArgs,
) -> Result<ReplayExecutionResult, EngineError> {
Err(EngineError::Unavailable {
op: "replay_execution",
})
}
#[cfg(feature = "core")]
async fn revoke_lease(
&self,
_args: RevokeLeaseArgs,
) -> Result<RevokeLeaseResult, EngineError> {
Err(EngineError::Unavailable { op: "revoke_lease" })
}
#[cfg(feature = "core")]
async fn complete_execution(
&self,
_args: CompleteExecutionArgs,
) -> Result<CompleteExecutionResult, EngineError> {
Err(EngineError::Unavailable {
op: "complete_execution",
})
}
#[cfg(feature = "core")]
async fn fail_execution(
&self,
_args: FailExecutionArgs,
) -> Result<FailExecutionResult, EngineError> {
Err(EngineError::Unavailable {
op: "fail_execution",
})
}
#[cfg(feature = "core")]
async fn renew_lease(
&self,
_args: RenewLeaseArgs,
) -> Result<RenewLeaseResult, EngineError> {
Err(EngineError::Unavailable { op: "renew_lease" })
}
#[cfg(feature = "core")]
async fn resume_execution(
&self,
_args: ResumeExecutionArgs,
) -> Result<ResumeExecutionResult, EngineError> {
Err(EngineError::Unavailable {
op: "resume_execution",
})
}
#[cfg(feature = "core")]
async fn check_admission(
&self,
_quota_policy_id: &crate::types::QuotaPolicyId,
_dimension: &str,
_args: CheckAdmissionArgs,
) -> Result<CheckAdmissionResult, EngineError> {
Err(EngineError::Unavailable {
op: "check_admission",
})
}
#[cfg(feature = "core")]
async fn evaluate_flow_eligibility(
&self,
_args: EvaluateFlowEligibilityArgs,
) -> Result<EvaluateFlowEligibilityResult, EngineError> {
Err(EngineError::Unavailable {
op: "evaluate_flow_eligibility",
})
}
#[cfg(feature = "core")]
async fn record_spend(
&self,
_args: RecordSpendArgs,
) -> Result<ReportUsageResult, EngineError> {
Err(EngineError::Unavailable {
op: "record_spend",
})
}
#[cfg(feature = "core")]
async fn release_budget(
&self,
_args: ReleaseBudgetArgs,
) -> Result<(), EngineError> {
Err(EngineError::Unavailable {
op: "release_budget",
})
}
#[cfg(feature = "core")]
async fn deliver_approval_signal(
&self,
_args: DeliverApprovalSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
Err(EngineError::Unavailable {
op: "deliver_approval_signal",
})
}
#[cfg(feature = "core")]
async fn issue_grant_and_claim(
&self,
_args: IssueGrantAndClaimArgs,
) -> Result<ClaimGrantOutcome, EngineError> {
Err(EngineError::Unavailable {
op: "issue_grant_and_claim",
})
}
#[cfg(feature = "core")]
async fn create_budget(
&self,
_args: CreateBudgetArgs,
) -> Result<CreateBudgetResult, EngineError> {
Err(EngineError::Unavailable {
op: "create_budget",
})
}
#[cfg(feature = "core")]
async fn reset_budget(
&self,
_args: ResetBudgetArgs,
) -> Result<ResetBudgetResult, EngineError> {
Err(EngineError::Unavailable { op: "reset_budget" })
}
#[cfg(feature = "core")]
async fn create_quota_policy(
&self,
_args: CreateQuotaPolicyArgs,
) -> Result<CreateQuotaPolicyResult, EngineError> {
Err(EngineError::Unavailable {
op: "create_quota_policy",
})
}
#[cfg(feature = "core")]
async fn get_budget_status(
&self,
_id: &BudgetId,
) -> Result<BudgetStatus, EngineError> {
Err(EngineError::Unavailable {
op: "get_budget_status",
})
}
#[cfg(feature = "core")]
async fn report_usage_admin(
&self,
_budget: &BudgetId,
_args: ReportUsageAdminArgs,
) -> Result<ReportUsageResult, EngineError> {
Err(EngineError::Unavailable {
op: "report_usage_admin",
})
}
async fn get_execution_result(
&self,
_id: &ExecutionId,
) -> Result<Option<Vec<u8>>, EngineError> {
Err(EngineError::Unavailable {
op: "get_execution_result",
})
}
#[cfg(feature = "core")]
async fn list_pending_waitpoints(
&self,
_args: ListPendingWaitpointsArgs,
) -> Result<ListPendingWaitpointsResult, EngineError> {
Err(EngineError::Unavailable {
op: "list_pending_waitpoints",
})
}
async fn ping(&self) -> Result<(), EngineError> {
Err(EngineError::Unavailable { op: "ping" })
}
#[cfg(feature = "core")]
async fn claim_for_worker(
&self,
_args: ClaimForWorkerArgs,
) -> Result<ClaimForWorkerOutcome, EngineError> {
Err(EngineError::Unavailable {
op: "claim_for_worker",
})
}
fn backend_label(&self) -> &'static str {
"unknown"
}
fn as_any(&self) -> &(dyn std::any::Any + 'static) {
&()
}
fn capabilities(&self) -> crate::capability::Capabilities {
crate::capability::Capabilities::new(
crate::capability::BackendIdentity::new(
"unknown",
crate::capability::Version::new(0, 0, 0),
"unknown",
),
crate::capability::Supports::none(),
)
}
async fn prepare(&self) -> Result<PrepareOutcome, EngineError> {
Ok(PrepareOutcome::NoOp)
}
async fn shutdown_prepare(&self, _grace: Duration) -> Result<(), EngineError> {
Ok(())
}
#[cfg(feature = "core")]
async fn cancel_flow_header(
&self,
_args: CancelFlowArgs,
) -> Result<crate::contracts::CancelFlowHeader, EngineError> {
Err(EngineError::Unavailable {
op: "cancel_flow_header",
})
}
#[cfg(feature = "core")]
async fn ack_cancel_member(
&self,
_flow_id: &FlowId,
_execution_id: &ExecutionId,
) -> Result<(), EngineError> {
Err(EngineError::Unavailable {
op: "ack_cancel_member",
})
}
#[cfg(feature = "core")]
async fn read_execution_info(
&self,
_id: &ExecutionId,
) -> Result<Option<ExecutionInfo>, EngineError> {
Err(EngineError::Unavailable {
op: "read_execution_info",
})
}
#[cfg(feature = "core")]
async fn read_execution_state(
&self,
_id: &ExecutionId,
) -> Result<Option<PublicState>, EngineError> {
Err(EngineError::Unavailable {
op: "read_execution_state",
})
}
async fn subscribe_lease_history(
&self,
_cursor: crate::stream_subscribe::StreamCursor,
_filter: &crate::backend::ScannerFilter,
) -> Result<crate::stream_events::LeaseHistorySubscription, EngineError> {
Err(EngineError::Unavailable {
op: "subscribe_lease_history",
})
}
async fn subscribe_completion(
&self,
_cursor: crate::stream_subscribe::StreamCursor,
_filter: &crate::backend::ScannerFilter,
) -> Result<crate::stream_events::CompletionSubscription, EngineError> {
Err(EngineError::Unavailable {
op: "subscribe_completion",
})
}
async fn subscribe_signal_delivery(
&self,
_cursor: crate::stream_subscribe::StreamCursor,
_filter: &crate::backend::ScannerFilter,
) -> Result<crate::stream_events::SignalDeliverySubscription, EngineError> {
Err(EngineError::Unavailable {
op: "subscribe_signal_delivery",
})
}
async fn subscribe_instance_tags(
&self,
_cursor: crate::stream_subscribe::StreamCursor,
) -> Result<crate::stream_events::InstanceTagSubscription, EngineError> {
Err(EngineError::Unavailable {
op: "subscribe_instance_tags",
})
}
async fn mark_lease_expired_if_due(
&self,
_partition: crate::partition::Partition,
_execution_id: &ExecutionId,
) -> Result<(), EngineError> {
Err(EngineError::Unavailable {
op: "mark_lease_expired_if_due",
})
}
async fn promote_delayed(
&self,
_partition: crate::partition::Partition,
_lane: &LaneId,
_execution_id: &ExecutionId,
_now_ms: TimestampMs,
) -> Result<(), EngineError> {
Err(EngineError::Unavailable {
op: "promote_delayed",
})
}
async fn close_waitpoint(
&self,
_partition: crate::partition::Partition,
_execution_id: &ExecutionId,
_waitpoint_id: &str,
_now_ms: TimestampMs,
) -> Result<(), EngineError> {
Err(EngineError::Unavailable {
op: "close_waitpoint",
})
}
async fn expire_execution(
&self,
_partition: crate::partition::Partition,
_execution_id: &ExecutionId,
_phase: ExpirePhase,
_now_ms: TimestampMs,
) -> Result<(), EngineError> {
Err(EngineError::Unavailable {
op: "expire_execution",
})
}
async fn expire_suspension(
&self,
_partition: crate::partition::Partition,
_execution_id: &ExecutionId,
_now_ms: TimestampMs,
) -> Result<(), EngineError> {
Err(EngineError::Unavailable {
op: "expire_suspension",
})
}
async fn unblock_execution(
&self,
_partition: crate::partition::Partition,
_lane_id: &LaneId,
_execution_id: &ExecutionId,
_expected_blocking_reason: &str,
_now_ms: TimestampMs,
) -> Result<(), EngineError> {
Err(EngineError::Unavailable {
op: "unblock_execution",
})
}
#[cfg(feature = "core")]
async fn drain_sibling_cancel_group(
&self,
_flow_partition: crate::partition::Partition,
_flow_id: &FlowId,
_downstream_eid: &ExecutionId,
) -> Result<(), EngineError> {
Err(EngineError::Unavailable {
op: "drain_sibling_cancel_group",
})
}
#[cfg(feature = "core")]
async fn reconcile_sibling_cancel_group(
&self,
_flow_partition: crate::partition::Partition,
_flow_id: &FlowId,
_downstream_eid: &ExecutionId,
) -> Result<SiblingCancelReconcileAction, EngineError> {
Err(EngineError::Unavailable {
op: "reconcile_sibling_cancel_group",
})
}
async fn reconcile_execution_index(
&self,
_partition: crate::partition::Partition,
_lanes: &[LaneId],
_filter: &crate::backend::ScannerFilter,
) -> Result<ReconcileCounts, EngineError> {
Err(EngineError::Unavailable {
op: "reconcile_execution_index",
})
}
async fn reconcile_budget_counters(
&self,
_partition: crate::partition::Partition,
_now_ms: TimestampMs,
) -> Result<ReconcileCounts, EngineError> {
Err(EngineError::Unavailable {
op: "reconcile_budget_counters",
})
}
async fn reconcile_quota_counters(
&self,
_partition: crate::partition::Partition,
_now_ms: TimestampMs,
) -> Result<ReconcileCounts, EngineError> {
Err(EngineError::Unavailable {
op: "reconcile_quota_counters",
})
}
#[cfg(feature = "core")]
async fn project_flow_summary(
&self,
_partition: crate::partition::Partition,
_flow_id: &FlowId,
_now_ms: TimestampMs,
) -> Result<bool, EngineError> {
Err(EngineError::Unavailable {
op: "project_flow_summary",
})
}
#[cfg(feature = "core")]
async fn trim_retention(
&self,
_partition: crate::partition::Partition,
_lane_id: &LaneId,
_retention_ms: u64,
_now_ms: TimestampMs,
_batch_size: u32,
_filter: &crate::backend::ScannerFilter,
) -> Result<u32, EngineError> {
Err(EngineError::Unavailable {
op: "trim_retention",
})
}
#[cfg(feature = "core")]
async fn read_exec_core_fields(
&self,
_partition: crate::partition::Partition,
_execution_id: &crate::types::ExecutionId,
_fields: &[&str],
) -> Result<std::collections::HashMap<String, Option<String>>, EngineError> {
Err(EngineError::Unavailable {
op: "read_exec_core_fields",
})
}
#[cfg(feature = "core")]
async fn server_time_ms(&self) -> Result<u64, EngineError> {
use std::time::{SystemTime, UNIX_EPOCH};
let d = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| EngineError::Transport {
backend: "system-clock",
source: format!("server_time_ms default: {e}").into(),
})?;
Ok(d.as_millis() as u64)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SiblingCancelReconcileAction {
SremmedStale,
CompletedDrain,
NoOp,
}
impl SiblingCancelReconcileAction {
pub fn as_str(&self) -> &'static str {
match self {
Self::SremmedStale => "sremmed_stale",
Self::CompletedDrain => "completed_drain",
Self::NoOp => "no_op",
}
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ReconcileCounts {
pub processed: u32,
pub errors: u32,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExpirePhase {
AttemptTimeout,
ExecutionDeadline,
}
impl ExpirePhase {
pub fn as_str(&self) -> &'static str {
match self {
Self::AttemptTimeout => "attempt_timeout",
Self::ExecutionDeadline => "execution_deadline",
}
}
}
#[allow(dead_code)]
fn _assert_dyn_compatible(_: &dyn EngineBackend) {}
const CANCEL_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(100);
const CANCEL_WAIT_INDEFINITE_CEILING: Duration = Duration::from_secs(300);
pub async fn wait_for_flow_cancellation<B: EngineBackend + ?Sized>(
backend: &B,
flow_id: &crate::types::FlowId,
deadline: Duration,
) -> Result<(), EngineError> {
let start = std::time::Instant::now();
loop {
match backend.describe_flow(flow_id).await? {
Some(snap) if snap.public_flow_state == "cancelled" => return Ok(()),
None => return Ok(()),
Some(_) => {}
}
if start.elapsed() >= deadline {
return Err(EngineError::Timeout {
op: "cancel_flow",
elapsed: deadline,
});
}
tokio::time::sleep(CANCEL_WAIT_POLL_INTERVAL).await;
}
}
pub fn cancel_flow_wait_deadline(wait: CancelFlowWait) -> Option<Duration> {
match wait {
CancelFlowWait::NoWait => None,
CancelFlowWait::WaitTimeout(d) => Some(d),
CancelFlowWait::WaitIndefinite => Some(CANCEL_WAIT_INDEFINITE_CEILING),
}
}
#[allow(clippy::result_large_err)]
pub fn validate_tag_key(key: &str) -> Result<(), EngineError> {
use crate::engine_error::ValidationKind;
let bad = || EngineError::Validation {
kind: ValidationKind::InvalidInput,
detail: format!(
"invalid tag key: {key:?} (must match ^[a-z][a-z0-9_]*\\.[a-z0-9_][a-z0-9_.]*$)"
),
};
let mut chars = key.chars();
let first = chars.next().ok_or_else(bad)?;
if !first.is_ascii_lowercase() {
return Err(bad());
}
let mut saw_dot = false;
for c in chars.by_ref() {
if c == '.' {
saw_dot = true;
break;
}
if !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') {
return Err(bad());
}
}
if !saw_dot {
return Err(bad());
}
let second = chars.next().ok_or_else(bad)?;
if !(second.is_ascii_lowercase() || second.is_ascii_digit() || second == '_') {
return Err(bad());
}
for c in chars {
if !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_' || c == '.') {
return Err(bad());
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
struct DefaultBackend;
#[async_trait]
impl EngineBackend for DefaultBackend {
async fn claim(
&self,
_lane: &LaneId,
_capabilities: &CapabilitySet,
_policy: ClaimPolicy,
) -> Result<Option<Handle>, EngineError> {
unreachable!()
}
async fn renew(&self, _handle: &Handle) -> Result<LeaseRenewal, EngineError> {
unreachable!()
}
async fn progress(
&self,
_handle: &Handle,
_percent: Option<u8>,
_message: Option<String>,
) -> Result<(), EngineError> {
unreachable!()
}
async fn append_frame(
&self,
_handle: &Handle,
_frame: Frame,
) -> Result<AppendFrameOutcome, EngineError> {
unreachable!()
}
async fn complete(
&self,
_handle: &Handle,
_payload: Option<Vec<u8>>,
) -> Result<(), EngineError> {
unreachable!()
}
async fn fail(
&self,
_handle: &Handle,
_reason: FailureReason,
_classification: FailureClass,
) -> Result<FailOutcome, EngineError> {
unreachable!()
}
async fn cancel(&self, _handle: &Handle, _reason: &str) -> Result<(), EngineError> {
unreachable!()
}
async fn suspend(
&self,
_handle: &Handle,
_args: SuspendArgs,
) -> Result<SuspendOutcome, EngineError> {
unreachable!()
}
async fn create_waitpoint(
&self,
_handle: &Handle,
_waitpoint_key: &str,
_expires_in: Duration,
) -> Result<PendingWaitpoint, EngineError> {
unreachable!()
}
async fn observe_signals(
&self,
_handle: &Handle,
) -> Result<Vec<ResumeSignal>, EngineError> {
unreachable!()
}
async fn claim_from_resume_grant(
&self,
_token: ResumeToken,
) -> Result<Option<Handle>, EngineError> {
unreachable!()
}
async fn delay(
&self,
_handle: &Handle,
_delay_until: TimestampMs,
) -> Result<(), EngineError> {
unreachable!()
}
async fn wait_children(&self, _handle: &Handle) -> Result<(), EngineError> {
unreachable!()
}
async fn describe_execution(
&self,
_id: &ExecutionId,
) -> Result<Option<ExecutionSnapshot>, EngineError> {
unreachable!()
}
async fn read_execution_context(
&self,
_execution_id: &ExecutionId,
) -> Result<ExecutionContext, EngineError> {
Ok(ExecutionContext::new(
Vec::new(),
String::new(),
std::collections::HashMap::new(),
))
}
async fn read_current_attempt_index(
&self,
_execution_id: &ExecutionId,
) -> Result<AttemptIndex, EngineError> {
Ok(AttemptIndex::new(0))
}
async fn read_total_attempt_count(
&self,
_execution_id: &ExecutionId,
) -> Result<AttemptIndex, EngineError> {
Ok(AttemptIndex::new(0))
}
async fn describe_flow(
&self,
_id: &FlowId,
) -> Result<Option<FlowSnapshot>, EngineError> {
unreachable!()
}
#[cfg(feature = "core")]
async fn list_edges(
&self,
_flow_id: &FlowId,
_direction: EdgeDirection,
) -> Result<Vec<EdgeSnapshot>, EngineError> {
unreachable!()
}
#[cfg(feature = "core")]
async fn describe_edge(
&self,
_flow_id: &FlowId,
_edge_id: &EdgeId,
) -> Result<Option<EdgeSnapshot>, EngineError> {
unreachable!()
}
#[cfg(feature = "core")]
async fn resolve_execution_flow_id(
&self,
_eid: &ExecutionId,
) -> Result<Option<FlowId>, EngineError> {
unreachable!()
}
#[cfg(feature = "core")]
async fn list_flows(
&self,
_partition: PartitionKey,
_cursor: Option<FlowId>,
_limit: usize,
) -> Result<ListFlowsPage, EngineError> {
unreachable!()
}
#[cfg(feature = "core")]
async fn list_lanes(
&self,
_cursor: Option<LaneId>,
_limit: usize,
) -> Result<ListLanesPage, EngineError> {
unreachable!()
}
#[cfg(feature = "core")]
async fn list_suspended(
&self,
_partition: PartitionKey,
_cursor: Option<ExecutionId>,
_limit: usize,
) -> Result<ListSuspendedPage, EngineError> {
unreachable!()
}
#[cfg(feature = "core")]
async fn list_executions(
&self,
_partition: PartitionKey,
_cursor: Option<ExecutionId>,
_limit: usize,
) -> Result<ListExecutionsPage, EngineError> {
unreachable!()
}
#[cfg(feature = "core")]
async fn deliver_signal(
&self,
_args: DeliverSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
unreachable!()
}
#[cfg(feature = "core")]
async fn claim_resumed_execution(
&self,
_args: ClaimResumedExecutionArgs,
) -> Result<ClaimResumedExecutionResult, EngineError> {
unreachable!()
}
async fn cancel_flow(
&self,
_id: &FlowId,
_policy: CancelFlowPolicy,
_wait: CancelFlowWait,
) -> Result<CancelFlowResult, EngineError> {
unreachable!()
}
#[cfg(feature = "core")]
async fn set_edge_group_policy(
&self,
_flow_id: &FlowId,
_downstream_execution_id: &ExecutionId,
_policy: crate::contracts::EdgeDependencyPolicy,
) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError> {
unreachable!()
}
async fn report_usage(
&self,
_handle: &Handle,
_budget: &BudgetId,
_dimensions: crate::backend::UsageDimensions,
) -> Result<ReportUsageResult, EngineError> {
unreachable!()
}
#[cfg(feature = "streaming")]
async fn read_stream(
&self,
_execution_id: &ExecutionId,
_attempt_index: AttemptIndex,
_from: StreamCursor,
_to: StreamCursor,
_count_limit: u64,
) -> Result<StreamFrames, EngineError> {
unreachable!()
}
#[cfg(feature = "streaming")]
async fn tail_stream(
&self,
_execution_id: &ExecutionId,
_attempt_index: AttemptIndex,
_after: StreamCursor,
_block_ms: u64,
_count_limit: u64,
_visibility: TailVisibility,
) -> Result<StreamFrames, EngineError> {
unreachable!()
}
#[cfg(feature = "streaming")]
async fn read_summary(
&self,
_execution_id: &ExecutionId,
_attempt_index: AttemptIndex,
) -> Result<Option<SummaryDocument>, EngineError> {
unreachable!()
}
}
#[test]
fn default_capabilities_is_unknown_family_all_false() {
let b = DefaultBackend;
let caps = b.capabilities();
assert_eq!(caps.identity.family, "unknown");
assert_eq!(
caps.identity.version,
crate::capability::Version::new(0, 0, 0)
);
assert_eq!(caps.identity.rfc017_stage, "unknown");
assert_eq!(caps.supports, crate::capability::Supports::none());
}
#[cfg(feature = "core")]
#[tokio::test]
async fn default_resolve_dependency_is_unavailable() {
use crate::contracts::ResolveDependencyArgs;
use crate::partition::{Partition, PartitionFamily};
use crate::types::{AttemptIndex, EdgeId, FlowId, LaneId};
let b = DefaultBackend;
let partition = Partition {
family: PartitionFamily::Flow,
index: 0,
};
let args = ResolveDependencyArgs::new(
partition,
FlowId::parse("11111111-1111-1111-1111-111111111111").unwrap(),
ExecutionId::parse("{fp:0}:22222222-2222-2222-2222-222222222222").unwrap(),
ExecutionId::parse("{fp:0}:33333333-3333-3333-3333-333333333333").unwrap(),
EdgeId::parse("44444444-4444-4444-4444-444444444444").unwrap(),
LaneId::new("default"),
AttemptIndex::new(0),
"success".to_owned(),
TimestampMs::now(),
);
match b.resolve_dependency(args).await {
Err(EngineError::Unavailable { op }) => {
assert_eq!(op, "resolve_dependency");
}
other => panic!("expected Unavailable, got {other:?}"),
}
}
#[cfg(feature = "core")]
#[tokio::test]
async fn default_cascade_completion_is_unavailable() {
use crate::backend::CompletionPayload;
let b = DefaultBackend;
let eid = ExecutionId::parse(
"{fp:0}:66666666-6666-6666-6666-666666666666",
)
.unwrap();
let payload = CompletionPayload::new(eid, "success", None, TimestampMs::now());
match b.cascade_completion(&payload).await {
Err(EngineError::Unavailable { op }) => {
assert_eq!(op, "cascade_completion");
}
other => panic!("expected Unavailable, got {other:?}"),
}
}
#[cfg(feature = "core")]
#[tokio::test]
async fn default_unblock_execution_is_unavailable() {
use crate::partition::{Partition, PartitionFamily};
let b = DefaultBackend;
let partition = Partition {
family: PartitionFamily::Execution,
index: 0,
};
let eid = ExecutionId::parse(
"{fp:0}:55555555-5555-5555-5555-555555555555",
)
.unwrap();
let lane = LaneId::new("default");
match b
.unblock_execution(
partition,
&lane,
&eid,
"waiting_for_budget",
TimestampMs::from_millis(0),
)
.await
{
Err(EngineError::Unavailable { op }) => {
assert_eq!(op, "unblock_execution");
}
other => panic!("expected Unavailable, got {other:?}"),
}
}
#[cfg(feature = "core")]
#[tokio::test]
async fn default_project_flow_summary_is_unavailable() {
use crate::partition::{Partition, PartitionFamily};
use crate::types::FlowId;
let b = DefaultBackend;
let partition = Partition {
family: PartitionFamily::Flow,
index: 0,
};
let fid = FlowId::parse("11111111-1111-1111-1111-111111111111").unwrap();
match b
.project_flow_summary(partition, &fid, TimestampMs::from_millis(0))
.await
{
Err(EngineError::Unavailable { op }) => {
assert_eq!(op, "project_flow_summary");
}
other => panic!("expected Unavailable, got {other:?}"),
}
}
#[cfg(feature = "core")]
#[tokio::test]
async fn default_trim_retention_is_unavailable() {
use crate::partition::{Partition, PartitionFamily};
let b = DefaultBackend;
let partition = Partition {
family: PartitionFamily::Execution,
index: 0,
};
let lane = LaneId::new("default");
match b
.trim_retention(
partition,
&lane,
60_000,
TimestampMs::from_millis(0),
20,
&crate::backend::ScannerFilter::NOOP,
)
.await
{
Err(EngineError::Unavailable { op }) => {
assert_eq!(op, "trim_retention");
}
other => panic!("expected Unavailable, got {other:?}"),
}
}
#[cfg(feature = "core")]
#[tokio::test]
async fn default_read_exec_core_fields_is_unavailable() {
use crate::partition::{Partition, PartitionFamily};
let b = DefaultBackend;
let partition = Partition {
family: PartitionFamily::Execution,
index: 0,
};
let eid = ExecutionId::parse(
"{fp:0}:66666666-6666-6666-6666-666666666666",
)
.unwrap();
match b
.read_exec_core_fields(partition, &eid, &["lane_id"])
.await
{
Err(EngineError::Unavailable { op }) => {
assert_eq!(op, "read_exec_core_fields");
}
other => panic!("expected Unavailable, got {other:?}"),
}
}
#[test]
fn validate_tag_key_accepts_valid() {
for k in [
"cairn.session_id",
"cairn.project",
"a.b",
"a1_2.x",
"app.sub.field",
"x.y_z",
] {
validate_tag_key(k).unwrap_or_else(|e| panic!("{k:?} should pass: {e:?}"));
}
}
#[test]
fn validate_tag_key_rejects_invalid() {
for k in [
"", "Cairn.x", "1cairn.x", "cairn", "cairn.", "cairn..x", ".cairn", "cair n.x", "ca-irn.x", "cairn.Foo", "cairn.foo bar", "cairn.foo\"bar", "cairn.foo-bar", "cairn.foo\\bar", ] {
let err = validate_tag_key(k)
.err()
.unwrap_or_else(|| panic!("{k:?} should fail"));
match err {
EngineError::Validation {
kind: crate::engine_error::ValidationKind::InvalidInput,
..
} => {}
other => panic!("{k:?}: unexpected err {other:?}"),
}
}
}
#[cfg(feature = "core")]
#[tokio::test]
async fn default_complete_execution_is_unavailable() {
use crate::contracts::CompleteExecutionArgs;
use crate::types::{ExecutionId, FlowId};
let b = DefaultBackend;
let config = crate::partition::PartitionConfig::default();
let eid = ExecutionId::for_flow(&FlowId::new(), &config);
let args = CompleteExecutionArgs {
execution_id: eid,
fence: None,
attempt_index: AttemptIndex::new(0),
result_payload: None,
result_encoding: None,
source: crate::types::CancelSource::default(),
now: TimestampMs::from_millis(0),
};
match b.complete_execution(args).await.unwrap_err() {
EngineError::Unavailable { op } => assert_eq!(op, "complete_execution"),
other => panic!("expected Unavailable, got {other:?}"),
}
}
#[cfg(feature = "core")]
#[tokio::test]
async fn default_fail_execution_is_unavailable() {
use crate::contracts::FailExecutionArgs;
use crate::types::{ExecutionId, FlowId};
let b = DefaultBackend;
let config = crate::partition::PartitionConfig::default();
let eid = ExecutionId::for_flow(&FlowId::new(), &config);
let args = FailExecutionArgs {
execution_id: eid,
fence: None,
attempt_index: AttemptIndex::new(0),
failure_reason: String::new(),
failure_category: String::new(),
retry_policy_json: String::new(),
next_attempt_policy_json: String::new(),
source: crate::types::CancelSource::default(),
};
match b.fail_execution(args).await.unwrap_err() {
EngineError::Unavailable { op } => assert_eq!(op, "fail_execution"),
other => panic!("expected Unavailable, got {other:?}"),
}
}
#[cfg(feature = "core")]
#[tokio::test]
async fn default_renew_lease_is_unavailable() {
use crate::contracts::RenewLeaseArgs;
use crate::types::{ExecutionId, FlowId};
let b = DefaultBackend;
let config = crate::partition::PartitionConfig::default();
let eid = ExecutionId::for_flow(&FlowId::new(), &config);
let args = RenewLeaseArgs {
execution_id: eid,
attempt_index: AttemptIndex::new(0),
fence: None,
lease_ttl_ms: 1_000,
lease_history_grace_ms: 60_000,
};
match b.renew_lease(args).await.unwrap_err() {
EngineError::Unavailable { op } => assert_eq!(op, "renew_lease"),
other => panic!("expected Unavailable, got {other:?}"),
}
}
#[cfg(feature = "core")]
#[tokio::test]
async fn default_resume_execution_is_unavailable() {
use crate::contracts::ResumeExecutionArgs;
use crate::types::{ExecutionId, FlowId};
let b = DefaultBackend;
let config = crate::partition::PartitionConfig::default();
let eid = ExecutionId::for_flow(&FlowId::new(), &config);
let args = ResumeExecutionArgs {
execution_id: eid,
trigger_type: "signal".to_owned(),
resume_delay_ms: 0,
};
match b.resume_execution(args).await.unwrap_err() {
EngineError::Unavailable { op } => assert_eq!(op, "resume_execution"),
other => panic!("expected Unavailable, got {other:?}"),
}
}
#[cfg(feature = "core")]
#[tokio::test]
async fn default_check_admission_is_unavailable() {
use crate::contracts::CheckAdmissionArgs;
use crate::types::{ExecutionId, FlowId, QuotaPolicyId};
let b = DefaultBackend;
let config = crate::partition::PartitionConfig::default();
let eid = ExecutionId::for_flow(&FlowId::new(), &config);
let args = CheckAdmissionArgs {
execution_id: eid,
now: TimestampMs::from_millis(0),
window_seconds: 60,
rate_limit: 10,
concurrency_cap: 1,
jitter_ms: None,
};
let qid = QuotaPolicyId::new();
match b
.check_admission(&qid, "default", args)
.await
.unwrap_err()
{
EngineError::Unavailable { op } => assert_eq!(op, "check_admission"),
other => panic!("expected Unavailable, got {other:?}"),
}
}
#[cfg(feature = "core")]
#[tokio::test]
async fn default_evaluate_flow_eligibility_is_unavailable() {
use crate::contracts::EvaluateFlowEligibilityArgs;
use crate::types::{ExecutionId, FlowId};
let b = DefaultBackend;
let config = crate::partition::PartitionConfig::default();
let eid = ExecutionId::for_flow(&FlowId::new(), &config);
let args = EvaluateFlowEligibilityArgs { execution_id: eid };
match b.evaluate_flow_eligibility(args).await.unwrap_err() {
EngineError::Unavailable { op } => assert_eq!(op, "evaluate_flow_eligibility"),
other => panic!("expected Unavailable, got {other:?}"),
}
}
#[cfg(feature = "core")]
#[tokio::test]
async fn default_reconcile_execution_index_is_unavailable() {
use crate::backend::ScannerFilter;
use crate::partition::{Partition, PartitionFamily};
let b = DefaultBackend;
let partition = Partition { family: PartitionFamily::Execution, index: 0 };
let lanes = [LaneId::new("default")];
let filter = ScannerFilter::default();
match b
.reconcile_execution_index(partition, &lanes, &filter)
.await
.unwrap_err()
{
EngineError::Unavailable { op } => assert_eq!(op, "reconcile_execution_index"),
other => panic!("expected Unavailable, got {other:?}"),
}
}
#[cfg(feature = "core")]
#[tokio::test]
async fn default_reconcile_budget_counters_is_unavailable() {
use crate::partition::{Partition, PartitionFamily};
let b = DefaultBackend;
let partition = Partition { family: PartitionFamily::Budget, index: 0 };
match b
.reconcile_budget_counters(partition, TimestampMs::from_millis(0))
.await
.unwrap_err()
{
EngineError::Unavailable { op } => assert_eq!(op, "reconcile_budget_counters"),
other => panic!("expected Unavailable, got {other:?}"),
}
}
#[cfg(feature = "core")]
#[tokio::test]
async fn default_reconcile_quota_counters_is_unavailable() {
use crate::partition::{Partition, PartitionFamily};
let b = DefaultBackend;
let partition = Partition { family: PartitionFamily::Quota, index: 0 };
match b
.reconcile_quota_counters(partition, TimestampMs::from_millis(0))
.await
.unwrap_err()
{
EngineError::Unavailable { op } => assert_eq!(op, "reconcile_quota_counters"),
other => panic!("expected Unavailable, got {other:?}"),
}
}
#[cfg(feature = "core")]
#[tokio::test]
async fn default_record_spend_is_unavailable() {
use crate::contracts::RecordSpendArgs;
use crate::types::{BudgetId, ExecutionId, FlowId};
let b = DefaultBackend;
let config = crate::partition::PartitionConfig::default();
let eid = ExecutionId::for_flow(&FlowId::new(), &config);
let args = RecordSpendArgs::new(
BudgetId::new(),
eid,
std::collections::BTreeMap::new(),
"k",
);
match b.record_spend(args).await.unwrap_err() {
EngineError::Unavailable { op } => assert_eq!(op, "record_spend"),
other => panic!("expected Unavailable, got {other:?}"),
}
}
#[cfg(feature = "core")]
#[tokio::test]
async fn default_release_budget_is_unavailable() {
use crate::contracts::ReleaseBudgetArgs;
use crate::types::{BudgetId, ExecutionId, FlowId};
let b = DefaultBackend;
let config = crate::partition::PartitionConfig::default();
let eid = ExecutionId::for_flow(&FlowId::new(), &config);
let args = ReleaseBudgetArgs::new(BudgetId::new(), eid);
match b.release_budget(args).await.unwrap_err() {
EngineError::Unavailable { op } => assert_eq!(op, "release_budget"),
other => panic!("expected Unavailable, got {other:?}"),
}
}
#[cfg(feature = "core")]
#[tokio::test]
async fn default_deliver_approval_signal_is_unavailable() {
use crate::contracts::DeliverApprovalSignalArgs;
use crate::types::{ExecutionId, FlowId, LaneId, WaitpointId};
let b = DefaultBackend;
let config = crate::partition::PartitionConfig::default();
let eid = ExecutionId::for_flow(&FlowId::new(), &config);
let args = DeliverApprovalSignalArgs::new(
eid,
LaneId::new("default"),
WaitpointId::new(),
"approved",
"sfx",
1_000,
Some(100),
Some(10),
);
match b.deliver_approval_signal(args).await.unwrap_err() {
EngineError::Unavailable { op } => assert_eq!(op, "deliver_approval_signal"),
other => panic!("expected Unavailable, got {other:?}"),
}
}
#[cfg(feature = "core")]
#[tokio::test]
async fn default_issue_grant_and_claim_is_unavailable() {
use crate::contracts::IssueGrantAndClaimArgs;
use crate::types::{ExecutionId, FlowId, LaneId};
let b = DefaultBackend;
let config = crate::partition::PartitionConfig::default();
let eid = ExecutionId::for_flow(&FlowId::new(), &config);
let args = IssueGrantAndClaimArgs::new(eid, LaneId::new("default"), 30_000);
match b.issue_grant_and_claim(args).await.unwrap_err() {
EngineError::Unavailable { op } => assert_eq!(op, "issue_grant_and_claim"),
other => panic!("expected Unavailable, got {other:?}"),
}
}
}