use std::time::Duration;
use async_trait::async_trait;
use crate::backend::{
AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy,
FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal, PendingWaitpoint,
ReclaimToken, ResumeSignal, SummaryDocument, TailVisibility,
};
use crate::contracts::{
CancelFlowResult, ExecutionSnapshot, FlowSnapshot, ReportUsageResult,
RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SuspendArgs,
SuspendOutcome,
};
#[cfg(feature = "core")]
use crate::contracts::{
AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
ApplyDependencyToChildResult, BudgetStatus, CancelExecutionArgs, CancelExecutionResult,
CancelFlowArgs, ChangePriorityArgs, ChangePriorityResult, ClaimForWorkerArgs,
ClaimForWorkerOutcome, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
DeliverSignalArgs, DeliverSignalResult, EdgeDirection, EdgeSnapshot, ExecutionInfo,
ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
ListPendingWaitpointsResult, ListSuspendedPage, ReplayExecutionArgs, ReplayExecutionResult,
ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult, RevokeLeaseArgs, RevokeLeaseResult,
StageDependencyEdgeArgs, StageDependencyEdgeResult,
};
#[cfg(feature = "core")]
use crate::state::PublicState;
#[cfg(feature = "core")]
use crate::partition::PartitionKey;
#[cfg(feature = "streaming")]
use crate::contracts::{StreamCursor, StreamFrames};
use crate::engine_error::EngineError;
#[cfg(feature = "streaming")]
use crate::types::AttemptIndex;
#[cfg(feature = "core")]
use crate::types::EdgeId;
use crate::types::{BudgetId, ExecutionId, FlowId, LaneId, TimestampMs};
#[async_trait]
pub trait EngineBackend: Send + Sync + 'static {
async fn claim(
&self,
lane: &LaneId,
capabilities: &CapabilitySet,
policy: ClaimPolicy,
) -> Result<Option<Handle>, EngineError>;
async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError>;
async fn progress(
&self,
handle: &Handle,
percent: Option<u8>,
message: Option<String>,
) -> Result<(), EngineError>;
async fn append_frame(
&self,
handle: &Handle,
frame: Frame,
) -> Result<AppendFrameOutcome, EngineError>;
async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError>;
async fn fail(
&self,
handle: &Handle,
reason: FailureReason,
classification: FailureClass,
) -> Result<FailOutcome, EngineError>;
async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError>;
async fn suspend(
&self,
handle: &Handle,
args: SuspendArgs,
) -> Result<SuspendOutcome, EngineError>;
async fn create_waitpoint(
&self,
handle: &Handle,
waitpoint_key: &str,
expires_in: Duration,
) -> Result<PendingWaitpoint, EngineError>;
async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError>;
async fn claim_from_reclaim(&self, token: ReclaimToken) -> Result<Option<Handle>, EngineError>;
async fn delay(&self, handle: &Handle, delay_until: TimestampMs) -> Result<(), EngineError>;
async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError>;
async fn describe_execution(
&self,
id: &ExecutionId,
) -> Result<Option<ExecutionSnapshot>, EngineError>;
async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError>;
#[cfg(feature = "core")]
async fn list_edges(
&self,
flow_id: &FlowId,
direction: EdgeDirection,
) -> Result<Vec<EdgeSnapshot>, EngineError>;
#[cfg(feature = "core")]
async fn describe_edge(
&self,
flow_id: &FlowId,
edge_id: &EdgeId,
) -> Result<Option<EdgeSnapshot>, EngineError>;
#[cfg(feature = "core")]
async fn resolve_execution_flow_id(
&self,
eid: &ExecutionId,
) -> Result<Option<FlowId>, EngineError>;
#[cfg(feature = "core")]
async fn list_flows(
&self,
partition: PartitionKey,
cursor: Option<FlowId>,
limit: usize,
) -> Result<ListFlowsPage, EngineError>;
#[cfg(feature = "core")]
async fn list_lanes(
&self,
cursor: Option<LaneId>,
limit: usize,
) -> Result<ListLanesPage, EngineError>;
#[cfg(feature = "core")]
async fn list_suspended(
&self,
partition: PartitionKey,
cursor: Option<ExecutionId>,
limit: usize,
) -> Result<ListSuspendedPage, EngineError>;
#[cfg(feature = "core")]
async fn list_executions(
&self,
partition: PartitionKey,
cursor: Option<ExecutionId>,
limit: usize,
) -> Result<ListExecutionsPage, EngineError>;
#[cfg(feature = "core")]
async fn deliver_signal(
&self,
args: DeliverSignalArgs,
) -> Result<DeliverSignalResult, EngineError>;
#[cfg(feature = "core")]
async fn claim_resumed_execution(
&self,
args: ClaimResumedExecutionArgs,
) -> Result<ClaimResumedExecutionResult, EngineError>;
async fn cancel_flow(
&self,
id: &FlowId,
policy: CancelFlowPolicy,
wait: CancelFlowWait,
) -> Result<CancelFlowResult, EngineError>;
#[cfg(feature = "core")]
async fn set_edge_group_policy(
&self,
flow_id: &FlowId,
downstream_execution_id: &ExecutionId,
policy: crate::contracts::EdgeDependencyPolicy,
) -> Result<crate::contracts::SetEdgeGroupPolicyResult, EngineError>;
async fn rotate_waitpoint_hmac_secret_all(
&self,
_args: RotateWaitpointHmacSecretAllArgs,
) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
Err(EngineError::Unavailable {
op: "rotate_waitpoint_hmac_secret_all",
})
}
async fn report_usage(
&self,
handle: &Handle,
budget: &BudgetId,
dimensions: crate::backend::UsageDimensions,
) -> Result<ReportUsageResult, EngineError>;
#[cfg(feature = "streaming")]
async fn read_stream(
&self,
execution_id: &ExecutionId,
attempt_index: AttemptIndex,
from: StreamCursor,
to: StreamCursor,
count_limit: u64,
) -> Result<StreamFrames, EngineError>;
#[cfg(feature = "streaming")]
async fn tail_stream(
&self,
execution_id: &ExecutionId,
attempt_index: AttemptIndex,
after: StreamCursor,
block_ms: u64,
count_limit: u64,
visibility: TailVisibility,
) -> Result<StreamFrames, EngineError>;
#[cfg(feature = "streaming")]
async fn read_summary(
&self,
execution_id: &ExecutionId,
attempt_index: AttemptIndex,
) -> Result<Option<SummaryDocument>, EngineError>;
#[cfg(feature = "core")]
async fn create_execution(
&self,
_args: CreateExecutionArgs,
) -> Result<CreateExecutionResult, EngineError> {
Err(EngineError::Unavailable {
op: "create_execution",
})
}
#[cfg(feature = "core")]
async fn create_flow(
&self,
_args: CreateFlowArgs,
) -> Result<CreateFlowResult, EngineError> {
Err(EngineError::Unavailable { op: "create_flow" })
}
#[cfg(feature = "core")]
async fn add_execution_to_flow(
&self,
_args: AddExecutionToFlowArgs,
) -> Result<AddExecutionToFlowResult, EngineError> {
Err(EngineError::Unavailable {
op: "add_execution_to_flow",
})
}
#[cfg(feature = "core")]
async fn stage_dependency_edge(
&self,
_args: StageDependencyEdgeArgs,
) -> Result<StageDependencyEdgeResult, EngineError> {
Err(EngineError::Unavailable {
op: "stage_dependency_edge",
})
}
#[cfg(feature = "core")]
async fn apply_dependency_to_child(
&self,
_args: ApplyDependencyToChildArgs,
) -> Result<ApplyDependencyToChildResult, EngineError> {
Err(EngineError::Unavailable {
op: "apply_dependency_to_child",
})
}
#[cfg(feature = "core")]
async fn cancel_execution(
&self,
_args: CancelExecutionArgs,
) -> Result<CancelExecutionResult, EngineError> {
Err(EngineError::Unavailable {
op: "cancel_execution",
})
}
#[cfg(feature = "core")]
async fn change_priority(
&self,
_args: ChangePriorityArgs,
) -> Result<ChangePriorityResult, EngineError> {
Err(EngineError::Unavailable {
op: "change_priority",
})
}
#[cfg(feature = "core")]
async fn replay_execution(
&self,
_args: ReplayExecutionArgs,
) -> Result<ReplayExecutionResult, EngineError> {
Err(EngineError::Unavailable {
op: "replay_execution",
})
}
#[cfg(feature = "core")]
async fn revoke_lease(
&self,
_args: RevokeLeaseArgs,
) -> Result<RevokeLeaseResult, EngineError> {
Err(EngineError::Unavailable { op: "revoke_lease" })
}
#[cfg(feature = "core")]
async fn create_budget(
&self,
_args: CreateBudgetArgs,
) -> Result<CreateBudgetResult, EngineError> {
Err(EngineError::Unavailable {
op: "create_budget",
})
}
#[cfg(feature = "core")]
async fn reset_budget(
&self,
_args: ResetBudgetArgs,
) -> Result<ResetBudgetResult, EngineError> {
Err(EngineError::Unavailable { op: "reset_budget" })
}
#[cfg(feature = "core")]
async fn create_quota_policy(
&self,
_args: CreateQuotaPolicyArgs,
) -> Result<CreateQuotaPolicyResult, EngineError> {
Err(EngineError::Unavailable {
op: "create_quota_policy",
})
}
#[cfg(feature = "core")]
async fn get_budget_status(
&self,
_id: &BudgetId,
) -> Result<BudgetStatus, EngineError> {
Err(EngineError::Unavailable {
op: "get_budget_status",
})
}
#[cfg(feature = "core")]
async fn report_usage_admin(
&self,
_budget: &BudgetId,
_args: ReportUsageAdminArgs,
) -> Result<ReportUsageResult, EngineError> {
Err(EngineError::Unavailable {
op: "report_usage_admin",
})
}
async fn get_execution_result(
&self,
_id: &ExecutionId,
) -> Result<Option<Vec<u8>>, EngineError> {
Err(EngineError::Unavailable {
op: "get_execution_result",
})
}
#[cfg(feature = "core")]
async fn list_pending_waitpoints(
&self,
_args: ListPendingWaitpointsArgs,
) -> Result<ListPendingWaitpointsResult, EngineError> {
Err(EngineError::Unavailable {
op: "list_pending_waitpoints",
})
}
async fn ping(&self) -> Result<(), EngineError> {
Err(EngineError::Unavailable { op: "ping" })
}
#[cfg(feature = "core")]
async fn claim_for_worker(
&self,
_args: ClaimForWorkerArgs,
) -> Result<ClaimForWorkerOutcome, EngineError> {
Err(EngineError::Unavailable {
op: "claim_for_worker",
})
}
fn backend_label(&self) -> &'static str {
"unknown"
}
async fn shutdown_prepare(&self, _grace: Duration) -> Result<(), EngineError> {
Ok(())
}
#[cfg(feature = "core")]
async fn cancel_flow_header(
&self,
_args: CancelFlowArgs,
) -> Result<crate::contracts::CancelFlowHeader, EngineError> {
Err(EngineError::Unavailable {
op: "cancel_flow_header",
})
}
#[cfg(feature = "core")]
async fn ack_cancel_member(
&self,
_flow_id: &FlowId,
_execution_id: &ExecutionId,
) -> Result<(), EngineError> {
Err(EngineError::Unavailable {
op: "ack_cancel_member",
})
}
#[cfg(feature = "core")]
async fn read_execution_info(
&self,
_id: &ExecutionId,
) -> Result<Option<ExecutionInfo>, EngineError> {
Err(EngineError::Unavailable {
op: "read_execution_info",
})
}
#[cfg(feature = "core")]
async fn read_execution_state(
&self,
_id: &ExecutionId,
) -> Result<Option<PublicState>, EngineError> {
Err(EngineError::Unavailable {
op: "read_execution_state",
})
}
}
#[allow(dead_code)]
fn _assert_dyn_compatible(_: &dyn EngineBackend) {}