use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use ff_core::backend::{
AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy, FailOutcome,
FailureClass, FailureReason, Frame, Handle, LeaseRenewal, PendingWaitpoint, ReclaimToken,
ResumeSignal,
};
use ff_core::contracts::{
CancelFlowResult, ClaimResumedExecutionArgs, ClaimResumedExecutionResult, DeliverSignalArgs,
DeliverSignalResult, EdgeDirection, EdgeSnapshot, ExecutionSnapshot, FlowSnapshot,
ListExecutionsPage, ListFlowsPage, ListLanesPage, ListSuspendedPage, ReportUsageResult,
RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SuspendArgs,
SuspendOutcome,
};
use ff_core::partition::PartitionKey;
#[cfg(feature = "valkey-default")]
use ff_core::contracts::{StreamCursor, StreamFrames};
use ff_core::engine_backend::EngineBackend;
use ff_core::engine_error::EngineError;
#[cfg(feature = "valkey-default")]
use ff_core::types::AttemptIndex;
use ff_core::types::{BudgetId, EdgeId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
#[derive(Clone, Copy, Debug)]
pub enum HookOutcome<'a> {
Ok,
Err(&'a EngineError),
}
pub enum Admit {
Proceed,
Reject(Box<EngineError>),
}
impl Admit {
pub fn reject(err: EngineError) -> Self {
Self::Reject(Box::new(err))
}
}
pub trait LayerHooks: Send + Sync + 'static {
fn before(&self, method_name: &'static str) -> Admit;
fn after(&self, method_name: &'static str, elapsed: Duration, outcome: HookOutcome<'_>);
}
pub struct HookedBackend<H: LayerHooks> {
pub(crate) inner: Arc<dyn EngineBackend>,
pub(crate) hooks: H,
}
impl<H: LayerHooks> HookedBackend<H> {
pub(crate) fn new(inner: Arc<dyn EngineBackend>, hooks: H) -> Self {
Self { inner, hooks }
}
}
macro_rules! with_hooks {
($self:ident, $method_name:literal, $delegate:expr) => {{
match $self.hooks.before($method_name) {
Admit::Proceed => {
let start = Instant::now();
let result = $delegate;
let elapsed = start.elapsed();
let hook_outcome = match &result {
Ok(_) => HookOutcome::Ok,
Err(e) => HookOutcome::Err(e),
};
$self.hooks.after($method_name, elapsed, hook_outcome);
result
}
Admit::Reject(err) => {
let elapsed = Duration::ZERO;
$self
.hooks
.after($method_name, elapsed, HookOutcome::Err(&err));
Err(*err)
}
}
}};
}
#[async_trait]
impl<H: LayerHooks> EngineBackend for HookedBackend<H> {
async fn claim(
&self,
lane: &LaneId,
capabilities: &CapabilitySet,
policy: ClaimPolicy,
) -> Result<Option<Handle>, EngineError> {
with_hooks!(
self,
"claim",
self.inner.claim(lane, capabilities, policy).await
)
}
async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError> {
with_hooks!(self, "renew", self.inner.renew(handle).await)
}
async fn progress(
&self,
handle: &Handle,
percent: Option<u8>,
message: Option<String>,
) -> Result<(), EngineError> {
with_hooks!(
self,
"progress",
self.inner.progress(handle, percent, message).await
)
}
async fn append_frame(
&self,
handle: &Handle,
frame: Frame,
) -> Result<AppendFrameOutcome, EngineError> {
with_hooks!(
self,
"append_frame",
self.inner.append_frame(handle, frame).await
)
}
async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError> {
with_hooks!(self, "complete", self.inner.complete(handle, payload).await)
}
async fn fail(
&self,
handle: &Handle,
reason: FailureReason,
classification: FailureClass,
) -> Result<FailOutcome, EngineError> {
with_hooks!(
self,
"fail",
self.inner.fail(handle, reason, classification).await
)
}
async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError> {
with_hooks!(self, "cancel", self.inner.cancel(handle, reason).await)
}
async fn suspend(
&self,
handle: &Handle,
args: SuspendArgs,
) -> Result<SuspendOutcome, EngineError> {
with_hooks!(self, "suspend", self.inner.suspend(handle, args).await)
}
async fn suspend_by_triple(
&self,
exec_id: ExecutionId,
triple: LeaseFence,
args: SuspendArgs,
) -> Result<SuspendOutcome, EngineError> {
with_hooks!(
self,
"suspend_by_triple",
self.inner.suspend_by_triple(exec_id, triple, args).await
)
}
async fn create_waitpoint(
&self,
handle: &Handle,
waitpoint_key: &str,
expires_in: Duration,
) -> Result<PendingWaitpoint, EngineError> {
with_hooks!(
self,
"create_waitpoint",
self.inner
.create_waitpoint(handle, waitpoint_key, expires_in)
.await
)
}
async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError> {
with_hooks!(
self,
"observe_signals",
self.inner.observe_signals(handle).await
)
}
async fn claim_from_reclaim(&self, token: ReclaimToken) -> Result<Option<Handle>, EngineError> {
with_hooks!(
self,
"claim_from_reclaim",
self.inner.claim_from_reclaim(token).await
)
}
async fn delay(&self, handle: &Handle, delay_until: TimestampMs) -> Result<(), EngineError> {
with_hooks!(self, "delay", self.inner.delay(handle, delay_until).await)
}
async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError> {
with_hooks!(
self,
"wait_children",
self.inner.wait_children(handle).await
)
}
async fn describe_execution(
&self,
id: &ExecutionId,
) -> Result<Option<ExecutionSnapshot>, EngineError> {
with_hooks!(
self,
"describe_execution",
self.inner.describe_execution(id).await
)
}
async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError> {
with_hooks!(self, "describe_flow", self.inner.describe_flow(id).await)
}
async fn list_edges(
&self,
flow_id: &FlowId,
direction: EdgeDirection,
) -> Result<Vec<EdgeSnapshot>, EngineError> {
with_hooks!(
self,
"list_edges",
self.inner.list_edges(flow_id, direction).await
)
}
async fn describe_edge(
&self,
flow_id: &FlowId,
edge_id: &EdgeId,
) -> Result<Option<EdgeSnapshot>, EngineError> {
with_hooks!(
self,
"describe_edge",
self.inner.describe_edge(flow_id, edge_id).await
)
}
async fn resolve_execution_flow_id(
&self,
eid: &ExecutionId,
) -> Result<Option<FlowId>, EngineError> {
with_hooks!(
self,
"resolve_execution_flow_id",
self.inner.resolve_execution_flow_id(eid).await
)
}
async fn list_flows(
&self,
partition: ff_core::partition::PartitionKey,
cursor: Option<FlowId>,
limit: usize,
) -> Result<ListFlowsPage, EngineError> {
with_hooks!(
self,
"list_flows",
self.inner.list_flows(partition, cursor, limit).await
)
}
async fn list_lanes(
&self,
cursor: Option<LaneId>,
limit: usize,
) -> Result<ListLanesPage, EngineError> {
with_hooks!(
self,
"list_lanes",
self.inner.list_lanes(cursor, limit).await
)
}
async fn list_suspended(
&self,
partition: PartitionKey,
cursor: Option<ExecutionId>,
limit: usize,
) -> Result<ListSuspendedPage, EngineError> {
with_hooks!(
self,
"list_suspended",
self.inner.list_suspended(partition, cursor, limit).await
)
}
async fn cancel_flow(
&self,
id: &FlowId,
policy: CancelFlowPolicy,
wait: CancelFlowWait,
) -> Result<CancelFlowResult, EngineError> {
with_hooks!(
self,
"cancel_flow",
self.inner.cancel_flow(id, policy, wait).await
)
}
async fn set_edge_group_policy(
&self,
flow_id: &FlowId,
downstream_execution_id: &ff_core::types::ExecutionId,
policy: ff_core::contracts::EdgeDependencyPolicy,
) -> Result<ff_core::contracts::SetEdgeGroupPolicyResult, EngineError> {
with_hooks!(
self,
"set_edge_group_policy",
self.inner
.set_edge_group_policy(flow_id, downstream_execution_id, policy)
.await
)
}
async fn report_usage(
&self,
handle: &Handle,
budget: &BudgetId,
dimensions: ff_core::backend::UsageDimensions,
) -> Result<ReportUsageResult, EngineError> {
with_hooks!(
self,
"report_usage",
self.inner.report_usage(handle, budget, dimensions).await
)
}
async fn rotate_waitpoint_hmac_secret_all(
&self,
args: RotateWaitpointHmacSecretAllArgs,
) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
with_hooks!(
self,
"rotate_waitpoint_hmac_secret_all",
self.inner.rotate_waitpoint_hmac_secret_all(args).await
)
}
async fn list_executions(
&self,
partition: PartitionKey,
cursor: Option<ExecutionId>,
limit: usize,
) -> Result<ListExecutionsPage, EngineError> {
with_hooks!(
self,
"list_executions",
self.inner.list_executions(partition, cursor, limit).await
)
}
async fn deliver_signal(
&self,
args: DeliverSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
with_hooks!(
self,
"deliver_signal",
self.inner.deliver_signal(args).await
)
}
async fn claim_resumed_execution(
&self,
args: ClaimResumedExecutionArgs,
) -> Result<ClaimResumedExecutionResult, EngineError> {
with_hooks!(
self,
"claim_resumed_execution",
self.inner.claim_resumed_execution(args).await
)
}
#[cfg(feature = "valkey-default")]
async fn read_stream(
&self,
execution_id: &ExecutionId,
attempt_index: AttemptIndex,
from: StreamCursor,
to: StreamCursor,
count_limit: u64,
) -> Result<StreamFrames, EngineError> {
with_hooks!(
self,
"read_stream",
self.inner
.read_stream(execution_id, attempt_index, from, to, count_limit)
.await
)
}
#[cfg(feature = "valkey-default")]
async fn tail_stream(
&self,
execution_id: &ExecutionId,
attempt_index: AttemptIndex,
after: StreamCursor,
block_ms: u64,
count_limit: u64,
visibility: ff_core::backend::TailVisibility,
) -> Result<StreamFrames, EngineError> {
with_hooks!(
self,
"tail_stream",
self.inner
.tail_stream(
execution_id,
attempt_index,
after,
block_ms,
count_limit,
visibility,
)
.await
)
}
#[cfg(feature = "valkey-default")]
async fn read_summary(
&self,
execution_id: &ExecutionId,
attempt_index: AttemptIndex,
) -> Result<Option<ff_core::backend::SummaryDocument>, EngineError> {
with_hooks!(
self,
"read_summary",
self.inner.read_summary(execution_id, attempt_index).await
)
}
}