use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use async_trait::async_trait;
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
use sqlx::{Row, SqlitePool};
use uuid::Uuid;
use ff_core::backend::PrepareOutcome;
use ff_core::backend::{
AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy, FailOutcome,
FailureClass, FailureReason, Frame, FrameKind, Handle, HandleKind, LeaseRenewal, PatchKind,
PendingWaitpoint, ResumeToken, ResumeSignal, SUMMARY_NULL_SENTINEL, StreamMode,
UsageDimensions,
};
#[cfg(feature = "streaming")]
use ff_core::backend::{SummaryDocument, TailVisibility};
use ff_core::capability::{BackendIdentity, Capabilities, Supports, Version};
use ff_core::caps::{CapabilityRequirement, matches as caps_matches};
use ff_core::contracts::{
BudgetStatus, CancelFlowResult, CreateBudgetArgs, CreateBudgetResult, CreateQuotaPolicyArgs,
CreateQuotaPolicyResult, ExecutionContext, ExecutionSnapshot, FlowSnapshot, IssueReclaimGrantArgs,
IssueReclaimGrantOutcome, ReclaimExecutionArgs, ReclaimExecutionOutcome, ReportUsageAdminArgs,
ReportUsageResult, ResetBudgetArgs, ResetBudgetResult, RotateWaitpointHmacSecretAllArgs,
RotateWaitpointHmacSecretAllResult, SeedOutcome, SeedWaitpointHmacSecretArgs, SuspendArgs,
SuspendOutcome,
};
#[cfg(feature = "core")]
use ff_core::contracts::{
ClaimResumedExecutionArgs, ClaimResumedExecutionResult, DeliverSignalArgs, DeliverSignalResult,
EdgeDependencyPolicy, EdgeDirection, EdgeSnapshot, ListExecutionsPage, ListFlowsPage,
ListLanesPage, ListSuspendedPage, SetEdgeGroupPolicyResult,
};
#[cfg(feature = "streaming")]
use ff_core::contracts::{STREAM_READ_HARD_CAP, StreamCursor, StreamFrame, StreamFrames};
use ff_core::engine_backend::EngineBackend;
use ff_core::engine_error::{BackendError, ContentionKind, EngineError, ValidationKind};
use ff_core::handle_codec::HandlePayload;
use ff_core::types::{AttemptId, AttemptIndex, LeaseEpoch, LeaseFence, LeaseId};
use crate::errors::map_sqlx_error;
use crate::handle_codec::{decode_handle, encode_handle};
use crate::queries::{
attempt as q_attempt, dispatch as q_dispatch, exec_core as q_exec, flow as q_flow,
flow_staging as q_flow_staging, lease as q_lease, stream as q_stream,
};
use crate::retry::retry_serializable;
#[cfg(feature = "core")]
use ff_core::partition::PartitionKey;
#[cfg(feature = "core")]
use ff_core::types::EdgeId;
use ff_core::types::{BudgetId, ExecutionId, FlowId, LaneId, TimestampMs};
use crate::pubsub::{OutboxEvent, PubSub};
use crate::registry;
#[cfg(feature = "core")]
use ff_core::contracts::{
AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
ApplyDependencyToChildResult, CancelExecutionArgs, CancelExecutionResult, CancelFlowArgs,
CancelFlowHeader, ChangePriorityArgs, ChangePriorityResult, CreateExecutionArgs,
CreateExecutionResult, CreateFlowArgs, CreateFlowResult, ExecutionInfo,
ListPendingWaitpointsArgs, ListPendingWaitpointsResult, ReplayExecutionArgs,
ReplayExecutionResult, RevokeLeaseArgs, RevokeLeaseResult, StageDependencyEdgeArgs,
StageDependencyEdgeResult,
};
#[cfg(feature = "core")]
use ff_core::state::PublicState;
use tokio::sync::broadcast;
/// Returns `Err(EngineError::Unavailable { op })` for any success type `T`.
///
/// `op` is stored verbatim in the error.
#[inline]
fn unavailable<T>(op: &'static str) -> Result<T, EngineError> {
    let err = EngineError::Unavailable { op };
    Err(err)
}
/// Selects which `PubSub` broadcast sender a pending outbox event is sent on.
///
/// `dispatch_pending_emits` maps each variant to the `PubSub` field of the
/// corresponding snake_case name.
#[derive(Clone, Copy, Debug)]
pub(crate) enum OutboxChannel {
/// Routed to `PubSub::lease_history`.
LeaseHistory,
/// Routed to `PubSub::completion`.
Completion,
/// Routed to `PubSub::signal_delivery`.
// NOTE(review): the `dead_code` allow suggests nothing constructs this variant yet — confirm.
#[allow(dead_code)] SignalDelivery,
/// Routed to `PubSub::stream_frame`.
StreamFrame,
/// Routed to `PubSub::operator_event`.
// NOTE(review): the `dead_code` allow suggests nothing constructs this variant yet — confirm.
#[allow(dead_code)] OperatorEvent,
}
/// A broadcast notification queued during a transaction: the channel to use and
/// the event to send on it.
pub(crate) type PendingEmit = (OutboxChannel, OutboxEvent);

/// Sends every queued event, in slice order, on the sender selected by its channel.
///
/// Each event is cloned before being handed to `PubSub::emit`, so `emits` is left
/// untouched.
fn dispatch_pending_emits(pubsub: &PubSub, emits: &[PendingEmit]) {
    emits.iter().for_each(|(channel, event)| {
        let target: &broadcast::Sender<OutboxEvent> = match channel {
            OutboxChannel::LeaseHistory => &pubsub.lease_history,
            OutboxChannel::Completion => &pubsub.completion,
            OutboxChannel::SignalDelivery => &pubsub.signal_delivery,
            OutboxChannel::StreamFrame => &pubsub.stream_frame,
            OutboxChannel::OperatorEvent => &pubsub.operator_event,
        };
        PubSub::emit(target, event.clone());
    });
}
/// Reads SQLite's `last_insert_rowid()` on `conn` and pairs it with `partition_key`
/// as an [`OutboxEvent`].
///
/// # Errors
/// Any sqlx failure is converted with `map_sqlx_error`.
async fn last_outbox_event(
    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
    partition_key: i64,
) -> Result<OutboxEvent, EngineError> {
    let rowid_query = sqlx::query_scalar::<_, i64>("SELECT last_insert_rowid()");
    let event_id = rowid_query
        .fetch_one(&mut **conn)
        .await
        .map_err(map_sqlx_error)?;
    let event = OutboxEvent {
        event_id,
        partition_key,
    };
    Ok(event)
}
/// Reports whether `path` names an in-memory SQLite database.
///
/// Recognised forms:
/// * the literal `:memory:`;
/// * anything starting with `file::memory:`;
/// * a `file:` URI whose query string (text after the first `?`, up to the first
///   `#` that follows) contains the exact parameter `mode=memory`.
fn is_memory_uri(path: &str) -> bool {
    match path {
        ":memory:" => true,
        p if p.starts_with("file::memory:") => true,
        p if !p.starts_with("file:") => false,
        p => match p.split_once('?') {
            None => false,
            Some((_, after_question)) => {
                // Drop the fragment, if any, before looking at the parameters.
                let query = match after_question.split_once('#') {
                    Some((before_fragment, _)) => before_fragment,
                    None => after_question,
                };
                query.split('&').any(|pair| pair == "mode=memory")
            }
        },
    }
}
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch and `i64::MAX`
/// when the millisecond count does not fit in an `i64`.
fn now_ms() -> i64 {
    let millis: u128 = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    };
    match i64::try_from(millis) {
        Ok(ms) => ms,
        Err(_) => i64::MAX,
    }
}
/// Splits an execution id of the form `{fp:<partition>}:<uuid>` into its partition
/// index and UUID.
///
/// The partition index is parsed as a `u16` — the width the error message has always
/// advertised — and widened to `i64` for binding. Previously it was parsed as `i64`,
/// which let negative or oversized indices through.
///
/// # Errors
/// Returns `EngineError::Validation` with `ValidationKind::InvalidInput` when the
/// `{fp:` prefix or the `}:` separator is missing, when the partition index is not a
/// valid `u16`, or when the UUID does not parse.
pub(crate) fn split_exec_id(eid: &ff_core::types::ExecutionId) -> Result<(i64, Uuid), EngineError> {
    let s = eid.as_str();
    let invalid = |detail: String| EngineError::Validation {
        kind: ValidationKind::InvalidInput,
        detail,
    };

    let rest = s
        .strip_prefix("{fp:")
        .ok_or_else(|| invalid(format!("execution_id missing `{{fp:` prefix: {s}")))?;
    // Split at the first `}:`; everything after it is the UUID text.
    let (part_text, uuid_text) = rest
        .split_once("}:")
        .ok_or_else(|| invalid(format!("execution_id missing `}}:`: {s}")))?;
    let part: u16 = part_text
        .parse()
        .map_err(|_| invalid(format!("execution_id partition index not u16: {s}")))?;
    let uuid = Uuid::parse_str(uuid_text)
        .map_err(|_| invalid(format!("execution_id UUID invalid: {s}")))?;

    Ok((i64::from(part), uuid))
}
pub(crate) async fn begin_immediate(
pool: &SqlitePool,
) -> Result<sqlx::pool::PoolConnection<sqlx::Sqlite>, EngineError> {
let mut conn = pool.acquire().await.map_err(map_sqlx_error)?;
sqlx::query("BEGIN IMMEDIATE")
.execute(&mut *conn)
.await
.map_err(map_sqlx_error)?;
Ok(conn)
}
pub(crate) async fn commit_or_rollback(
conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
) -> Result<(), EngineError> {
if let Err(e) = sqlx::query("COMMIT")
.execute(&mut **conn)
.await
.map_err(map_sqlx_error)
{
let _ = sqlx::query("ROLLBACK").execute(&mut **conn).await;
return Err(e);
}
Ok(())
}
/// Issues `ROLLBACK` on `conn` and ignores the outcome.
pub(crate) async fn rollback_quiet(conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>) {
    let rolled_back = sqlx::query("ROLLBACK").execute(&mut **conn).await;
    if rolled_back.is_err() {
        // Deliberately ignored: this is the cleanup path and there is nothing to report to.
    }
}
/// Verifies that the stored `lease_epoch` of an attempt equals `expected_epoch`.
///
/// # Errors
/// * `EngineError::NotFound { entity: "attempt" }` when the attempt row is missing.
/// * `EngineError::Contention(ContentionKind::LeaseConflict)` when the epochs differ.
/// * sqlx failures, converted with `map_sqlx_error`.
///
/// A stored epoch that does not fit in a `u64` (i.e. negative) is compared as `0`.
async fn fence_check(
    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
    part: i64,
    exec_uuid: Uuid,
    attempt_index: i64,
    expected_epoch: u64,
) -> Result<(), EngineError> {
    let maybe_row = sqlx::query(q_attempt::SELECT_ATTEMPT_EPOCH_SQL)
        .bind(part)
        .bind(exec_uuid)
        .bind(attempt_index)
        .fetch_optional(&mut **conn)
        .await
        .map_err(map_sqlx_error)?;
    let row = match maybe_row {
        Some(found) => found,
        None => return Err(EngineError::NotFound { entity: "attempt" }),
    };

    let stored: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
    let observed = u64::try_from(stored).unwrap_or(0);
    if observed == expected_epoch {
        Ok(())
    } else {
        Err(EngineError::Contention(ContentionKind::LeaseConflict))
    }
}
/// Transaction wrapper around `claim_inner`.
///
/// Opens an immediate transaction, runs the claim against partition `0`, then:
/// * on a successful claim, commits and broadcasts the queued events;
/// * on "nothing to claim" or on error, rolls back and passes the result through.
async fn claim_impl(
    pool: &SqlitePool,
    pubsub: &PubSub,
    lane: &ff_core::types::LaneId,
    capabilities: &CapabilitySet,
    policy: &ClaimPolicy,
) -> Result<Option<Handle>, EngineError> {
    let part: i64 = 0;
    let mut conn = begin_immediate(pool).await?;

    let (handle, emits) = match claim_inner(&mut conn, part, lane, capabilities, policy).await {
        Ok(Some(claimed)) => claimed,
        not_claimed => {
            rollback_quiet(&mut conn).await;
            return not_claimed.map(|_| None);
        }
    };

    commit_or_rollback(&mut conn).await?;
    // Only broadcast once the commit has succeeded.
    dispatch_pending_emits(pubsub, &emits);
    Ok(Some(handle))
}
/// Attempts to claim one eligible execution on `lane` using the caller's connection.
///
/// Steps, all executed on `conn`:
/// 1. Fetch up to `CAP_SCAN_BATCH` candidate rows with `SELECT_ELIGIBLE_EXEC_SQL`.
/// 2. For each candidate, in row order, load its capability tokens and stop at the
///    first one for which `caps_matches` accepts `capabilities`.
/// 3. Upsert the attempt row, update the exec-core row and insert an `"acquired"`
///    lease event.
/// 4. Build a `HandlePayload` and encode it as a `HandleKind::Fresh` handle.
///
/// Returns `Ok(None)` when there are no candidates or none of them matches;
/// otherwise the handle plus the events to broadcast. This function neither commits
/// nor rolls back.
async fn claim_inner(
conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
part: i64,
lane: &ff_core::types::LaneId,
capabilities: &CapabilitySet,
policy: &ClaimPolicy,
) -> Result<Option<(Handle, Vec<PendingEmit>)>, EngineError> {
// Upper bound on how many candidates are inspected for a capability match per call.
const CAP_SCAN_BATCH: i64 = 16;
let candidate_rows = sqlx::query(q_attempt::SELECT_ELIGIBLE_EXEC_SQL)
.bind(part)
.bind(lane.as_str())
.bind(CAP_SCAN_BATCH)
.fetch_all(&mut **conn)
.await
.map_err(map_sqlx_error)?;
if candidate_rows.is_empty() {
return Ok(None);
}
// First candidate whose capability requirement is satisfied, as (execution uuid, attempt index).
let mut claimable: Option<(Uuid, i64)> = None;
for row in &candidate_rows {
let exec_uuid: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
let attempt_index_i: i64 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
// One capability query per candidate; note it is keyed by execution uuid only.
let cap_rows = sqlx::query(q_attempt::SELECT_EXEC_CAPABILITIES_SQL)
.bind(exec_uuid)
.fetch_all(&mut **conn)
.await
.map_err(map_sqlx_error)?;
let tokens: Vec<String> = cap_rows
.iter()
.map(|r| r.try_get::<String, _>("capability"))
.collect::<Result<Vec<_>, _>>()
.map_err(map_sqlx_error)?;
let req = CapabilityRequirement::new(tokens);
if caps_matches(&req, capabilities) {
claimable = Some((exec_uuid, attempt_index_i));
break;
}
}
let Some((exec_uuid, attempt_index_i)) = claimable else {
return Ok(None);
};
let now = now_ms();
let lease_ttl_ms = i64::from(policy.lease_ttl_ms);
// Saturating so an extreme TTL cannot overflow the expiry timestamp.
let expires = now.saturating_add(lease_ttl_ms);
// The upsert returns a row from which the resulting `lease_epoch` is read.
let epoch_row = sqlx::query(q_attempt::UPSERT_ATTEMPT_ON_CLAIM_SQL)
.bind(part)
.bind(exec_uuid)
.bind(attempt_index_i)
.bind(policy.worker_id.as_str())
.bind(policy.worker_instance_id.as_str())
.bind(expires)
.bind(now)
.fetch_one(&mut **conn)
.await
.map_err(map_sqlx_error)?;
let epoch_i: i64 = epoch_row.try_get("lease_epoch").map_err(map_sqlx_error)?;
sqlx::query(q_exec::UPDATE_EXEC_CORE_CLAIM_SQL)
.bind(part)
.bind(exec_uuid)
.bind(now)
.execute(&mut **conn)
.await
.map_err(map_sqlx_error)?;
let mut emits: Vec<PendingEmit> = Vec::new();
let ev = insert_lease_event(conn, part, exec_uuid, "acquired", now).await?;
emits.push((OutboxChannel::LeaseHistory, ev));
// A negative or > u32::MAX attempt index is coerced to 0 rather than reported.
let attempt_index = AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
// Rebuild the textual execution id in the `{fp:<part>}:<uuid>` form that `split_exec_id` parses.
let exec_id = ff_core::types::ExecutionId::parse(&format!("{{fp:{part}}}:{exec_uuid}"))
.map_err(|e| EngineError::Validation {
kind: ValidationKind::InvalidInput,
detail: format!("reassembling exec id: {e}"),
})?;
// NOTE(review): the attempt id and lease id are newly generated here and are not
// bound into any statement in this function — confirm they only live in the handle.
// A negative stored epoch falls back to 1.
let payload = HandlePayload::new(
exec_id,
attempt_index,
AttemptId::new(),
LeaseId::new(),
LeaseEpoch(u64::try_from(epoch_i).unwrap_or(1)),
u64::from(policy.lease_ttl_ms),
lane.clone(),
policy.worker_instance_id.clone(),
);
Ok(Some((encode_handle(&payload, HandleKind::Fresh), emits)))
}
/// Transaction wrapper around `complete_inner`.
///
/// Decodes `handle`, runs the completion inside an immediate transaction, commits,
/// and then broadcasts the queued events. Any failure from `complete_inner` rolls
/// the transaction back and is returned unchanged.
async fn complete_impl(
    pool: &SqlitePool,
    pubsub: &PubSub,
    handle: &Handle,
    payload_bytes: Option<Vec<u8>>,
) -> Result<(), EngineError> {
    let payload = decode_handle(handle)?;
    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
    let attempt_index = i64::from(payload.attempt_index.0);
    let expected_epoch = payload.lease_epoch.0;

    let mut conn = begin_immediate(pool).await?;
    let outcome = complete_inner(
        &mut conn,
        part,
        exec_uuid,
        attempt_index,
        expected_epoch,
        payload_bytes,
    )
    .await;

    let emits = match outcome {
        Ok(queued) => queued,
        Err(err) => {
            rollback_quiet(&mut conn).await;
            return Err(err);
        }
    };
    commit_or_rollback(&mut conn).await?;
    dispatch_pending_emits(pubsub, &emits);
    Ok(())
}
/// Marks an attempt and its execution as completed on the caller's connection.
///
/// After the lease fence check it updates the attempt row, updates the exec-core row
/// (storing `payload_bytes` when present), inserts a `"success"` completion event and
/// a `"revoked"` lease event, and returns both events for broadcasting, completion
/// first.
async fn complete_inner(
    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
    part: i64,
    exec_uuid: Uuid,
    attempt_index: i64,
    expected_epoch: u64,
    payload_bytes: Option<Vec<u8>>,
) -> Result<Vec<PendingEmit>, EngineError> {
    fence_check(conn, part, exec_uuid, attempt_index, expected_epoch).await?;
    let now = now_ms();

    let attempt_update = sqlx::query(q_attempt::UPDATE_ATTEMPT_COMPLETE_SQL)
        .bind(now)
        .bind(part)
        .bind(exec_uuid)
        .bind(attempt_index);
    attempt_update
        .execute(&mut **conn)
        .await
        .map_err(map_sqlx_error)?;

    let result_bytes: Option<&[u8]> = payload_bytes.as_deref();
    let exec_update = sqlx::query(q_exec::UPDATE_EXEC_CORE_COMPLETE_SQL)
        .bind(now)
        .bind(result_bytes)
        .bind(part)
        .bind(exec_uuid);
    exec_update
        .execute(&mut **conn)
        .await
        .map_err(map_sqlx_error)?;

    let completion_ev = insert_completion_event_ev(conn, part, exec_uuid, "success", now).await?;
    let lease_ev = insert_lease_event(conn, part, exec_uuid, "revoked", now).await?;
    Ok(vec![
        (OutboxChannel::Completion, completion_ev),
        (OutboxChannel::LeaseHistory, lease_ev),
    ])
}
/// Decides whether a failure of the given class should be retried.
///
/// `Permanent`, `Timeout` and `Cancelled` are not retried; every other class —
/// including any variant not named here — is.
fn classify_retryable(classification: FailureClass) -> bool {
    let terminal = matches!(
        classification,
        FailureClass::Permanent | FailureClass::Timeout | FailureClass::Cancelled
    );
    !terminal
}
/// Transaction wrapper around `fail_inner`.
///
/// Decodes `handle`, derives retryability from `classification`, runs the failure
/// handling inside an immediate transaction, commits, and then broadcasts the queued
/// events. Any failure from `fail_inner` rolls the transaction back and is returned
/// unchanged.
async fn fail_impl(
    pool: &SqlitePool,
    pubsub: &PubSub,
    handle: &Handle,
    reason: FailureReason,
    classification: FailureClass,
) -> Result<FailOutcome, EngineError> {
    let payload = decode_handle(handle)?;
    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
    let attempt_index = i64::from(payload.attempt_index.0);
    let expected_epoch = payload.lease_epoch.0;
    let retryable = classify_retryable(classification);

    let mut conn = begin_immediate(pool).await?;
    let attempt = fail_inner(
        &mut conn,
        part,
        exec_uuid,
        attempt_index,
        expected_epoch,
        retryable,
        &reason,
        classification,
    )
    .await;

    let (outcome, emits) = match attempt {
        Ok(done) => done,
        Err(err) => {
            rollback_quiet(&mut conn).await;
            return Err(err);
        }
    };
    commit_or_rollback(&mut conn).await?;
    dispatch_pending_emits(pubsub, &emits);
    Ok(outcome)
}
/// Records a failed attempt on the caller's connection.
///
/// After the lease fence check:
/// * when `retryable` is false, the attempt and exec-core rows get their terminal
///   updates, a `"failed"` completion event and a `"revoked"` lease event are
///   inserted, and `FailOutcome::TerminalFailed` is returned;
/// * when `retryable` is true, the retry updates run, a `"revoked"` lease event is
///   inserted, a warning is logged, and `FailOutcome::RetryScheduled` is returned
///   with `delay_until` set to the current time.
///
/// The returned events are for broadcasting after the caller commits.
#[allow(clippy::too_many_arguments)]
async fn fail_inner(
    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
    part: i64,
    exec_uuid: Uuid,
    attempt_index: i64,
    expected_epoch: u64,
    retryable: bool,
    reason: &FailureReason,
    classification: FailureClass,
) -> Result<(FailOutcome, Vec<PendingEmit>), EngineError> {
    fence_check(conn, part, exec_uuid, attempt_index, expected_epoch).await?;
    let now = now_ms();

    if !retryable {
        // Terminal path: close out the attempt and the execution.
        sqlx::query(q_attempt::UPDATE_ATTEMPT_FAIL_TERMINAL_SQL)
            .bind(now)
            .bind(part)
            .bind(exec_uuid)
            .bind(attempt_index)
            .execute(&mut **conn)
            .await
            .map_err(map_sqlx_error)?;
        sqlx::query(q_exec::UPDATE_EXEC_CORE_FAIL_TERMINAL_SQL)
            .bind(now)
            .bind(&reason.message)
            .bind(part)
            .bind(exec_uuid)
            .execute(&mut **conn)
            .await
            .map_err(map_sqlx_error)?;
        let completion_ev =
            insert_completion_event_ev(conn, part, exec_uuid, "failed", now).await?;
        let lease_ev = insert_lease_event(conn, part, exec_uuid, "revoked", now).await?;
        let emits: Vec<PendingEmit> = vec![
            (OutboxChannel::Completion, completion_ev),
            (OutboxChannel::LeaseHistory, lease_ev),
        ];
        return Ok((FailOutcome::TerminalFailed, emits));
    }

    // Retry path.
    sqlx::query(q_attempt::UPDATE_ATTEMPT_FAIL_RETRY_SQL)
        .bind(now)
        .bind(part)
        .bind(exec_uuid)
        .bind(attempt_index)
        .execute(&mut **conn)
        .await
        .map_err(map_sqlx_error)?;
    sqlx::query(q_exec::UPDATE_EXEC_CORE_FAIL_RETRY_SQL)
        .bind(&reason.message)
        .bind(part)
        .bind(exec_uuid)
        .execute(&mut **conn)
        .await
        .map_err(map_sqlx_error)?;
    let lease_ev = insert_lease_event(conn, part, exec_uuid, "revoked", now).await?;
    let emits: Vec<PendingEmit> = vec![(OutboxChannel::LeaseHistory, lease_ev)];
    tracing::warn!(
        error.message = %reason.message,
        classification = ?classification,
        execution_id = %exec_uuid,
        attempt_index = attempt_index,
        "sqlite.fail: scheduling retry"
    );
    let outcome = FailOutcome::RetryScheduled {
        delay_until: ff_core::types::TimestampMs::from_millis(now),
    };
    Ok((outcome, emits))
}
/// Inserts a lease event row and returns the matching [`OutboxEvent`].
///
/// Bind order for `INSERT_LEASE_EVENT_SQL`: execution uuid as text, `event_type`,
/// `now`, `part`, execution uuid. The event id is then read with
/// `last_outbox_event` on the same connection.
pub(crate) async fn insert_lease_event(
    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
    part: i64,
    exec_uuid: Uuid,
    event_type: &str,
    now: i64,
) -> Result<OutboxEvent, EngineError> {
    let exec_text = exec_uuid.to_string();
    let insert = sqlx::query(q_dispatch::INSERT_LEASE_EVENT_SQL)
        .bind(exec_text)
        .bind(event_type)
        .bind(now)
        .bind(part)
        .bind(exec_uuid);
    insert
        .execute(&mut **conn)
        .await
        .map_err(map_sqlx_error)?;

    let event = last_outbox_event(conn, part).await?;
    Ok(event)
}
/// Inserts a completion event row and returns the matching [`OutboxEvent`].
///
/// Bind order for `INSERT_COMPLETION_EVENT_SQL`: `outcome`, `now`, `part`,
/// execution uuid. The event id is then read with `last_outbox_event` on the same
/// connection.
pub(crate) async fn insert_completion_event_ev(
    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
    part: i64,
    exec_uuid: Uuid,
    outcome: &str,
    now: i64,
) -> Result<OutboxEvent, EngineError> {
    let insert = sqlx::query(q_attempt::INSERT_COMPLETION_EVENT_SQL)
        .bind(outcome)
        .bind(now)
        .bind(part)
        .bind(exec_uuid);
    insert
        .execute(&mut **conn)
        .await
        .map_err(map_sqlx_error)?;

    let event = last_outbox_event(conn, part).await?;
    Ok(event)
}
/// Transaction wrapper around `renew_inner`.
///
/// The lease TTL comes from the decoded handle; a TTL that does not fit in an `i64`
/// is treated as `0`. On success the transaction is committed and the queued events
/// are broadcast; on failure it is rolled back and the error returned unchanged.
async fn renew_impl(
    pool: &SqlitePool,
    pubsub: &PubSub,
    handle: &Handle,
) -> Result<LeaseRenewal, EngineError> {
    let payload = decode_handle(handle)?;
    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
    let attempt_index = i64::from(payload.attempt_index.0);
    let expected_epoch = payload.lease_epoch.0;
    let lease_ttl_ms = i64::try_from(payload.lease_ttl_ms).unwrap_or(0);

    let mut conn = begin_immediate(pool).await?;
    let attempt = renew_inner(
        &mut conn,
        part,
        exec_uuid,
        attempt_index,
        expected_epoch,
        lease_ttl_ms,
    )
    .await;

    let (renewal, emits) = match attempt {
        Ok(done) => done,
        Err(err) => {
            rollback_quiet(&mut conn).await;
            return Err(err);
        }
    };
    commit_or_rollback(&mut conn).await?;
    dispatch_pending_emits(pubsub, &emits);
    Ok(renewal)
}
/// Extends an attempt's lease on the caller's connection.
///
/// After the lease fence check, the new expiry is `now + lease_ttl_ms` (saturating).
/// It is written with `UPDATE_ATTEMPT_RENEW_SQL`, and a `"renewed"` lease event is
/// inserted. The returned `LeaseRenewal` carries the new expiry (or `0` if it is
/// negative) and the unchanged `expected_epoch`.
async fn renew_inner(
    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
    part: i64,
    exec_uuid: Uuid,
    attempt_index: i64,
    expected_epoch: u64,
    lease_ttl_ms: i64,
) -> Result<(LeaseRenewal, Vec<PendingEmit>), EngineError> {
    fence_check(conn, part, exec_uuid, attempt_index, expected_epoch).await?;

    let now = now_ms();
    let new_expires = now.saturating_add(lease_ttl_ms);
    let update = sqlx::query(q_lease::UPDATE_ATTEMPT_RENEW_SQL)
        .bind(new_expires)
        .bind(part)
        .bind(exec_uuid)
        .bind(attempt_index);
    update
        .execute(&mut **conn)
        .await
        .map_err(map_sqlx_error)?;

    let renewed_ev = insert_lease_event(conn, part, exec_uuid, "renewed", now).await?;
    let expires_unsigned = u64::try_from(new_expires).unwrap_or(0);
    let renewal = LeaseRenewal::new(expires_unsigned, expected_epoch);
    Ok((renewal, vec![(OutboxChannel::LeaseHistory, renewed_ev)]))
}
/// Transaction wrapper around `progress_inner`.
///
/// Commits when the update succeeds; otherwise rolls back and returns the error.
/// No events are broadcast for progress updates.
async fn progress_impl(
    pool: &SqlitePool,
    handle: &Handle,
    percent: Option<u8>,
    message: Option<String>,
) -> Result<(), EngineError> {
    let payload = decode_handle(handle)?;
    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
    let attempt_index = i64::from(payload.attempt_index.0);
    let expected_epoch = payload.lease_epoch.0;

    let mut conn = begin_immediate(pool).await?;
    let updated = progress_inner(
        &mut conn,
        part,
        exec_uuid,
        attempt_index,
        expected_epoch,
        percent,
        message,
    )
    .await;

    if let Err(err) = updated {
        rollback_quiet(&mut conn).await;
        return Err(err);
    }
    commit_or_rollback(&mut conn).await
}
async fn progress_inner(
conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
part: i64,
exec_uuid: Uuid,
attempt_index: i64,
expected_epoch: u64,
percent: Option<u8>,
message: Option<String>,
) -> Result<(), EngineError> {
fence_check(conn, part, exec_uuid, attempt_index, expected_epoch).await?;
sqlx::query(q_exec::UPDATE_EXEC_CORE_PROGRESS_SQL)
.bind(percent.map(i64::from))
.bind(message)
.bind(part)
.bind(exec_uuid)
.execute(&mut **conn)
.await
.map_err(map_sqlx_error)?;
Ok(())
}
/// Applies `patch` to `target` in place, merge-patch style.
///
/// * A non-object `patch` replaces `target` outright.
/// * For an object `patch`, `target` is first reset to `{}` if it is not an object;
///   then for each member: `null` removes the key, a string equal to
///   `SUMMARY_NULL_SENTINEL` stores an explicit `null`, a nested object is merged
///   recursively, and anything else overwrites the key.
fn apply_json_merge_patch(target: &mut serde_json::Value, patch: &serde_json::Value) {
    use serde_json::Value;

    let Value::Object(patch_map) = patch else {
        *target = patch.clone();
        return;
    };

    if !target.is_object() {
        *target = Value::Object(serde_json::Map::new());
    }
    let target_map = target.as_object_mut().expect("just ensured object");

    for (key, value) in patch_map {
        if value.is_null() {
            target_map.remove(key);
        } else if matches!(value, Value::String(s) if s == SUMMARY_NULL_SENTINEL) {
            target_map.insert(key.clone(), Value::Null);
        } else if value.is_object() {
            // Missing keys start as `null`; the recursive call turns that into an object.
            let slot = target_map.entry(key.clone()).or_insert(Value::Null);
            apply_json_merge_patch(slot, value);
        } else {
            target_map.insert(key.clone(), value.clone());
        }
    }
}
/// Serialises a frame's fields into the JSON object text stored with the frame.
///
/// Keys written: `frame_type` (the frame's own type, or a name derived from
/// `frame.kind` when that is empty), `payload` (the bytes decoded as UTF-8, lossily),
/// `encoding` = `"utf8"`, `source` = `"worker"`, and `correlation_id` when present.
fn build_fields_json(frame: &Frame) -> String {
    use serde_json::{Map, Value};

    let payload = String::from_utf8_lossy(&frame.bytes).into_owned();
    let frame_type = if !frame.frame_type.is_empty() {
        frame.frame_type.clone()
    } else {
        let derived = match frame.kind {
            FrameKind::Stdout => "stdout",
            FrameKind::Stderr => "stderr",
            FrameKind::Event => "event",
            FrameKind::Blob => "blob",
            _ => "event",
        };
        derived.to_owned()
    };

    let mut fields: Map<String, Value> = Map::new();
    let fixed = [
        ("frame_type", frame_type),
        ("payload", payload),
        ("encoding", String::from("utf8")),
        ("source", String::from("worker")),
    ];
    for (key, text) in fixed {
        fields.insert(key.to_owned(), Value::String(text));
    }
    if let Some(corr) = frame.correlation_id.as_ref() {
        fields.insert("correlation_id".to_owned(), Value::String(corr.clone()));
    }
    Value::Object(fields).to_string()
}
/// Transaction wrapper around `append_frame_inner`.
///
/// Decodes `handle`, appends the frame inside an immediate transaction, commits, and
/// then broadcasts the queued events. Any failure from `append_frame_inner` rolls
/// the transaction back and is returned unchanged.
async fn append_frame_impl(
    pool: &SqlitePool,
    pubsub: &PubSub,
    handle: &Handle,
    frame: Frame,
) -> Result<AppendFrameOutcome, EngineError> {
    let payload = decode_handle(handle)?;
    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
    let attempt_index = i64::from(payload.attempt_index.0);
    let expected_epoch = payload.lease_epoch.0;

    let mut conn = begin_immediate(pool).await?;
    let attempt = append_frame_inner(
        &mut conn,
        part,
        exec_uuid,
        attempt_index,
        expected_epoch,
        frame,
    )
    .await;

    let (outcome, emits) = match attempt {
        Ok(done) => done,
        Err(err) => {
            rollback_quiet(&mut conn).await;
            return Err(err);
        }
    };
    commit_or_rollback(&mut conn).await?;
    dispatch_pending_emits(pubsub, &emits);
    Ok(outcome)
}
/// Appends one stream frame for an attempt using the caller's connection.
///
/// Steps:
/// 1. Lease fence check.
/// 2. Compute the next sequence number and insert the frame row; queue a
///    `StreamFrame` event from `last_insert_rowid()`.
/// 3. For `StreamMode::DurableSummary`: parse the frame bytes as a JSON patch, apply
///    it to the stored summary document (or to `{}`), and insert/update the summary
///    row with an incremented version.
/// 4. For `StreamMode::BestEffortLive`: update a rate EMA, derive a retention
///    length `k` clamped to the configured floor/ceiling, and trim stored frames.
/// 5. Count the frames and build the outcome with id `"{ts_ms}-{seq}"`.
///
/// # Errors
/// `EngineError::Validation` with `InvalidInput` when a summary patch is not valid
/// JSON, with `Corruption` when the stored summary document does not parse; fence
/// and sqlx errors are passed through.
async fn append_frame_inner(
conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
part: i64,
exec_uuid: Uuid,
attempt_index: i64,
expected_epoch: u64,
frame: Frame,
) -> Result<(AppendFrameOutcome, Vec<PendingEmit>), EngineError> {
fence_check(conn, part, exec_uuid, attempt_index, expected_epoch).await?;
let ts_ms = now_ms();
let mode_wire = frame.mode.wire_str();
let fields_text = build_fields_json(&frame);
// `ts_ms` is bound into the max-seq lookup as well.
// NOTE(review): presumably the sequence is scoped to a single millisecond — confirm against the SQL.
let max_seq: Option<i64> = sqlx::query_scalar(q_stream::SELECT_MAX_SEQ_SQL)
.bind(part)
.bind(exec_uuid)
.bind(attempt_index)
.bind(ts_ms)
.fetch_one(&mut **conn)
.await
.map_err(map_sqlx_error)?;
// First frame gets sequence 0; otherwise one past the current maximum.
let next_seq: i64 = max_seq.map(|s| s + 1).unwrap_or(0);
sqlx::query(q_stream::INSERT_STREAM_FRAME_SQL)
.bind(part)
.bind(exec_uuid)
.bind(attempt_index)
.bind(ts_ms)
.bind(next_seq)
.bind(&fields_text)
.bind(mode_wire)
.bind(ts_ms)
.execute(&mut **conn)
.await
.map_err(map_sqlx_error)?;
// Must run directly after the frame insert: it reads `last_insert_rowid()`.
let stream_ev = last_outbox_event(conn, part).await?;
let emits: Vec<PendingEmit> = vec![(OutboxChannel::StreamFrame, stream_ev)];
// Set only when the frame carried a summary patch.
let mut summary_version: Option<u64> = None;
if let StreamMode::DurableSummary { patch_kind } = &frame.mode {
let patch: serde_json::Value =
serde_json::from_slice(&frame.bytes).map_err(|e| EngineError::Validation {
kind: ValidationKind::InvalidInput,
detail: format!("summary patch not valid JSON: {e}"),
})?;
let existing: Option<(String, i64)> = sqlx::query_as(q_stream::SELECT_STREAM_SUMMARY_SQL)
.bind(part)
.bind(exec_uuid)
.bind(attempt_index)
.fetch_optional(&mut **conn)
.await
.map_err(map_sqlx_error)?;
// No stored row means: start from an empty object at version 0.
let (mut doc, prev_version): (serde_json::Value, i64) = match existing {
Some((text, v)) => {
let parsed: serde_json::Value =
serde_json::from_str(&text).map_err(|e| EngineError::Validation {
kind: ValidationKind::Corruption,
detail: format!("corrupt summary document in ff_stream_summary: {e}"),
})?;
(parsed, v)
}
None => (serde_json::Value::Object(serde_json::Map::new()), 0),
};
// Every patch kind is currently applied as a JSON merge patch.
match patch_kind {
PatchKind::JsonMergePatch => apply_json_merge_patch(&mut doc, &patch),
_ => apply_json_merge_patch(&mut doc, &patch),
}
let new_version = prev_version + 1;
// The stored kind is fixed to this wire name regardless of `patch_kind`.
let patch_kind_wire = "json-merge-patch";
let doc_text = doc.to_string();
// Insert vs. update is chosen by version, not by row presence.
// NOTE(review): a stored row whose version is 0 would take the insert path — confirm that cannot occur.
if prev_version == 0 {
sqlx::query(q_stream::INSERT_STREAM_SUMMARY_SQL)
.bind(part)
.bind(exec_uuid)
.bind(attempt_index)
.bind(&doc_text)
.bind(new_version)
.bind(patch_kind_wire)
.bind(ts_ms)
.bind(ts_ms)
.execute(&mut **conn)
.await
.map_err(map_sqlx_error)?;
} else {
sqlx::query(q_stream::UPDATE_STREAM_SUMMARY_SQL)
.bind(part)
.bind(exec_uuid)
.bind(attempt_index)
.bind(&doc_text)
.bind(new_version)
.bind(patch_kind_wire)
.bind(ts_ms)
.execute(&mut **conn)
.await
.map_err(map_sqlx_error)?;
}
summary_version = Some(u64::try_from(new_version).unwrap_or(0));
}
if let StreamMode::BestEffortLive { config } = &frame.mode {
let meta: Option<(f64, i64)> = sqlx::query_as(q_stream::SELECT_STREAM_META_SQL)
.bind(part)
.bind(exec_uuid)
.bind(attempt_index)
.fetch_optional(&mut **conn)
.await
.map_err(map_sqlx_error)?;
let (ema_prev, last_ts) = meta.unwrap_or((0.0, 0));
// Instantaneous rate in frames per second from the gap to the previous frame;
// 0 when there is no previous timestamp or time did not advance.
let inst_rate: f64 = if last_ts > 0 && ts_ms > last_ts {
1000.0 / ((ts_ms - last_ts) as f64)
} else {
0.0
};
let alpha = config.ema_alpha;
// Exponential moving average of the rate.
let ema_new = alpha * inst_rate + (1.0 - alpha) * ema_prev;
// Frames expected within `ttl_ms` at the averaged rate, rounded up and doubled.
// NOTE(review): the `* 2` is a plain i64 multiply after a saturating float cast — confirm `ema_alpha` is constrained so this cannot overflow.
let k_raw = (ema_new * (f64::from(config.ttl_ms)) / 1000.0).ceil() as i64 * 2;
// Clamp to [maxlen_floor, maxlen_ceiling]; the ceiling wins if the two are inverted.
let k = k_raw
.max(i64::from(config.maxlen_floor))
.min(i64::from(config.maxlen_ceiling));
sqlx::query(q_stream::UPSERT_STREAM_META_SQL)
.bind(part)
.bind(exec_uuid)
.bind(attempt_index)
.bind(ema_new)
.bind(ts_ms)
.bind(k)
.execute(&mut **conn)
.await
.map_err(map_sqlx_error)?;
sqlx::query(q_stream::TRIM_STREAM_FRAMES_SQL)
.bind(part)
.bind(exec_uuid)
.bind(attempt_index)
.bind(k)
.execute(&mut **conn)
.await
.map_err(map_sqlx_error)?;
}
// Counted after any trimming above, so it reflects what is currently stored.
let frame_count: i64 = sqlx::query_scalar(q_stream::COUNT_STREAM_FRAMES_SQL)
.bind(part)
.bind(exec_uuid)
.bind(attempt_index)
.fetch_one(&mut **conn)
.await
.map_err(map_sqlx_error)?;
// Same `<ms>-<seq>` shape that `parse_concrete_cursor` accepts.
let stream_id = format!("{ts_ms}-{next_seq}");
let mut out = AppendFrameOutcome::new(stream_id, u64::try_from(frame_count).unwrap_or(0));
if let Some(v) = summary_version {
out = out.with_summary_version(v);
}
Ok((out, emits))
}
/// Converts a stream cursor into an `(ms, seq)` bound.
///
/// `Start` maps to `(i64::MIN, i64::MIN)`, `End` to `(i64::MAX, i64::MAX)`, and a
/// concrete cursor is parsed with `parse_concrete_cursor`.
#[cfg(feature = "streaming")]
fn parse_cursor_bound(c: &StreamCursor) -> Result<(i64, i64), EngineError> {
    let sentinel = match c {
        StreamCursor::At(raw) => return parse_concrete_cursor(raw),
        StreamCursor::Start => i64::MIN,
        StreamCursor::End => i64::MAX,
    };
    Ok((sentinel, sentinel))
}
/// Parses a cursor of the form `<ms>-<seq>` (or just `<ms>`, with `seq` defaulting
/// to `0`) into a pair of `i64`s.
///
/// The split happens at the first `-`.
///
/// # Errors
/// `EngineError::Validation` with `ValidationKind::InvalidInput` when either part is
/// not a valid `i64`.
#[cfg(feature = "streaming")]
fn parse_concrete_cursor(s: &str) -> Result<(i64, i64), EngineError> {
    let (ms_text, seq_text) = s.split_once('-').unwrap_or((s, "0"));

    let ms = match ms_text.parse::<i64>() {
        Ok(value) => value,
        Err(_) => {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("bad stream cursor '{s}' (ms)"),
            });
        }
    };
    let sq = match seq_text.parse::<i64>() {
        Ok(value) => value,
        Err(_) => {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("bad stream cursor '{s}' (seq)"),
            });
        }
    };
    Ok((ms, sq))
}
/// Builds a [`StreamFrame`] from a stored row.
///
/// The id is `"{ts_ms}-{seq}"`. `fields_text` is parsed as JSON; when it is a JSON
/// object, string members are copied as-is and other members are stored as their
/// JSON text. Anything else (including unparsable text) yields an empty field map.
#[cfg(feature = "streaming")]
fn row_to_frame(ts_ms: i64, seq: i64, fields_text: &str) -> StreamFrame {
    use std::collections::BTreeMap;

    let fields: BTreeMap<String, String> =
        match serde_json::from_str::<serde_json::Value>(fields_text) {
            Ok(serde_json::Value::Object(members)) => members
                .into_iter()
                .map(|(key, value)| {
                    let text = match value {
                        serde_json::Value::String(inner) => inner,
                        non_string => non_string.to_string(),
                    };
                    (key, text)
                })
                .collect(),
            _ => BTreeMap::new(),
        };

    StreamFrame {
        id: format!("{ts_ms}-{seq}"),
        fields,
    }
}
/// Reads stored frames of one attempt between two cursors.
///
/// `count_limit` is capped at `STREAM_READ_HARD_CAP`. Rows are read straight from
/// the pool (no explicit transaction) and converted with `row_to_frame`; a column
/// that fails to decode aborts the read with the mapped sqlx error. The result
/// always has `closed_at` and `closed_reason` set to `None`.
#[cfg(feature = "streaming")]
async fn read_stream_impl(
    pool: &SqlitePool,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    from: StreamCursor,
    to: StreamCursor,
    count_limit: u64,
) -> Result<StreamFrames, EngineError> {
    let (part, exec_uuid) = split_exec_id(execution_id)?;
    let aidx: i64 = i64::from(attempt_index.0);
    let (from_ms, from_seq) = parse_cursor_bound(&from)?;
    let (to_ms, to_seq) = parse_cursor_bound(&to)?;
    let capped = count_limit.min(STREAM_READ_HARD_CAP);
    let lim = i64::try_from(capped).unwrap_or(i64::MAX);

    let rows = sqlx::query(q_stream::READ_STREAM_RANGE_SQL)
        .bind(part)
        .bind(exec_uuid)
        .bind(aidx)
        .bind(from_ms)
        .bind(from_seq)
        .bind(to_ms)
        .bind(to_seq)
        .bind(lim)
        .fetch_all(pool)
        .await
        .map_err(map_sqlx_error)?;

    let frames = rows
        .iter()
        .map(|row| -> Result<StreamFrame, EngineError> {
            let ts: i64 = row.try_get("ts_ms").map_err(map_sqlx_error)?;
            let seq: i64 = row.try_get("seq").map_err(map_sqlx_error)?;
            let fields_text: String = row.try_get("fields").map_err(map_sqlx_error)?;
            Ok(row_to_frame(ts, seq, &fields_text))
        })
        .collect::<Result<Vec<_>, _>>()?;

    Ok(StreamFrames {
        frames,
        closed_at: None,
        closed_reason: None,
    })
}
/// Returns frames stored after `after`, optionally waiting up to `block_ms`
/// milliseconds for new ones.
///
/// * `after` must be `StreamCursor::At`; any other cursor is rejected with
///   `ValidationKind::InvalidInput`.
/// * `count_limit` is capped at `STREAM_READ_HARD_CAP`.
/// * `visibility` selects the query: `ExcludeBestEffort` uses
///   `TAIL_STREAM_AFTER_EXCLUDE_BE_SQL`, everything else `TAIL_STREAM_AFTER_SQL`.
/// * With `block_ms == 0`, or when the first query already finds rows, the result is
///   returned immediately.
/// * Otherwise the function re-queries each time the `stream_frame` broadcast
///   channel wakes it, until rows appear or the time budget is used up, in which
///   case `StreamFrames::empty_open()` is returned.
#[cfg(feature = "streaming")]
#[allow(clippy::too_many_arguments)] async fn tail_stream_impl(
pool: &SqlitePool,
pubsub: &PubSub,
execution_id: &ExecutionId,
attempt_index: AttemptIndex,
after: StreamCursor,
block_ms: u64,
count_limit: u64,
visibility: TailVisibility,
) -> Result<StreamFrames, EngineError> {
let (part, exec_uuid) = split_exec_id(execution_id)?;
let aidx: i64 = i64::from(attempt_index.0);
let (after_ms, after_seq) = match &after {
StreamCursor::At(s) => parse_concrete_cursor(s)?,
_ => {
return Err(EngineError::Validation {
kind: ValidationKind::InvalidInput,
detail: "tail_stream requires concrete after cursor".into(),
});
}
};
let lim = i64::try_from(count_limit.min(STREAM_READ_HARD_CAP)).unwrap_or(i64::MAX);
let sql = match visibility {
TailVisibility::ExcludeBestEffort => q_stream::TAIL_STREAM_AFTER_EXCLUDE_BE_SQL,
_ => q_stream::TAIL_STREAM_AFTER_SQL,
};
// Subscribed before the first query.
// NOTE(review): presumably so a frame committed between the query and the wait still wakes the loop — confirm.
let mut rx = pubsub.stream_frame.subscribe();
// Re-runnable query for rows after the cursor; reads from the pool without a transaction.
let do_select = || async {
sqlx::query(sql)
.bind(part)
.bind(exec_uuid)
.bind(aidx)
.bind(after_ms)
.bind(after_seq)
.bind(lim)
.fetch_all(pool)
.await
.map_err(map_sqlx_error)
};
let rows = do_select().await?;
if !rows.is_empty() || block_ms == 0 {
return Ok(rows_to_frames(rows));
}
let start = std::time::Instant::now();
let total = Duration::from_millis(block_ms);
loop {
// Time left in the blocking budget; stop once it is exhausted.
let remaining = match total.checked_sub(start.elapsed()) {
Some(r) if !r.is_zero() => r,
_ => break,
};
match tokio::time::timeout(remaining, rx.recv()).await {
// Any event on the channel is only a wake-up; the query below decides relevance.
Ok(Ok(_)) => {}
// Missed notifications are treated like a wake-up.
Ok(Err(broadcast::error::RecvError::Lagged(_))) => {}
// No more notifications can arrive: do one final query and return its result.
Ok(Err(broadcast::error::RecvError::Closed)) => {
return Ok(rows_to_frames(do_select().await?));
}
// Timed out waiting.
Err(_) => break,
}
let rows = do_select().await?;
if !rows.is_empty() {
return Ok(rows_to_frames(rows));
}
if start.elapsed() >= total {
break;
}
}
Ok(StreamFrames::empty_open())
}
/// Converts fetched rows into a [`StreamFrames`] with no close information.
///
/// Decoding is lenient: a column that fails to decode falls back to `0` (for
/// `ts_ms` and `seq`) or the empty string (for `fields`) instead of raising an error.
#[cfg(feature = "streaming")]
fn rows_to_frames(rows: Vec<sqlx::sqlite::SqliteRow>) -> StreamFrames {
    let frames = rows
        .into_iter()
        .map(|row| {
            let ts: i64 = row.try_get("ts_ms").unwrap_or(0);
            let seq: i64 = row.try_get("seq").unwrap_or(0);
            let fields_text: String = row.try_get("fields").unwrap_or_default();
            row_to_frame(ts, seq, &fields_text)
        })
        .collect();

    StreamFrames {
        frames,
        closed_at: None,
        closed_reason: None,
    }
}
/// Loads the stored summary document of one attempt, if any.
///
/// Returns `Ok(None)` when no row exists. The stored JSON text is parsed and
/// re-serialised to bytes. The `patch_kind` column is read leniently (decode errors
/// count as absent) and every value currently maps to `PatchKind::JsonMergePatch`.
/// Negative `version` / timestamp values are reported as `0`.
///
/// # Errors
/// `EngineError::Validation` with `Corruption` when the stored text is not valid
/// JSON, with `InvalidInput` when re-serialisation fails; sqlx errors are mapped
/// with `map_sqlx_error`.
#[cfg(feature = "streaming")]
async fn read_summary_impl(
    pool: &SqlitePool,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
) -> Result<Option<SummaryDocument>, EngineError> {
    let (part, exec_uuid) = split_exec_id(execution_id)?;
    let aidx: i64 = i64::from(attempt_index.0);

    let fetched = sqlx::query(q_stream::READ_SUMMARY_FULL_SQL)
        .bind(part)
        .bind(exec_uuid)
        .bind(aidx)
        .fetch_optional(pool)
        .await
        .map_err(map_sqlx_error)?;
    let row = match fetched {
        Some(found) => found,
        None => return Ok(None),
    };

    let doc_text: String = row.try_get("document_json").map_err(map_sqlx_error)?;
    let version: i64 = row.try_get("version").map_err(map_sqlx_error)?;
    let patch_kind_wire: Option<String> = match row.try_get::<Option<String>, _>("patch_kind") {
        Ok(stored) => stored,
        Err(_) => None,
    };
    let last_updated: i64 = row.try_get("last_updated_ms").map_err(map_sqlx_error)?;
    let first_applied: i64 = row.try_get("first_applied_ms").map_err(map_sqlx_error)?;

    let parsed: serde_json::Value = match serde_json::from_str(&doc_text) {
        Ok(value) => value,
        Err(e) => {
            return Err(EngineError::Validation {
                kind: ValidationKind::Corruption,
                detail: format!("corrupt summary document in ff_stream_summary: {e}"),
            });
        }
    };
    let bytes = match serde_json::to_vec(&parsed) {
        Ok(encoded) => encoded,
        Err(e) => {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("summary document not serialisable: {e}"),
            });
        }
    };

    let patch_kind = match patch_kind_wire.as_deref() {
        Some("json-merge-patch") => PatchKind::JsonMergePatch,
        _ => PatchKind::JsonMergePatch,
    };

    let document = SummaryDocument::new(
        bytes,
        u64::try_from(version).unwrap_or(0),
        patch_kind,
        u64::try_from(last_updated).unwrap_or(0),
        u64::try_from(first_applied).unwrap_or(0),
    );
    Ok(Some(document))
}
/// Transaction wrapper around `claim_from_reclaim_inner`.
///
/// The partition and execution uuid are taken from the token's grant. A successful
/// reclaim is committed and its events broadcast; "nothing reclaimed" and errors
/// roll the transaction back and are passed through.
async fn claim_from_reclaim_impl(
    pool: &SqlitePool,
    pubsub: &PubSub,
    token: &ResumeToken,
) -> Result<Option<Handle>, EngineError> {
    let (part, exec_uuid) = split_exec_id(&token.grant.execution_id)?;
    let mut conn = begin_immediate(pool).await?;

    let (handle, emits) = match claim_from_reclaim_inner(&mut conn, part, exec_uuid, token).await {
        Ok(Some(reclaimed)) => reclaimed,
        not_reclaimed => {
            rollback_quiet(&mut conn).await;
            return not_reclaimed.map(|_| None);
        }
    };

    commit_or_rollback(&mut conn).await?;
    dispatch_pending_emits(pubsub, &emits);
    Ok(Some(handle))
}
/// Re-leases the latest attempt of an execution to the worker named in `token`,
/// using the caller's connection.
///
/// Returns `Ok(None)` when the current lease has not expired yet. Otherwise it
/// updates the attempt and exec-core rows, inserts a `"reclaimed"` lease event, and
/// returns a `HandleKind::Resumed` handle plus the event to broadcast.
///
/// # Errors
/// `EngineError::NotFound { entity: "attempt" }` when the execution has no attempt
/// row; sqlx errors are mapped with `map_sqlx_error`.
async fn claim_from_reclaim_inner(
conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
part: i64,
exec_uuid: Uuid,
token: &ResumeToken,
) -> Result<Option<(Handle, Vec<PendingEmit>)>, EngineError> {
let row = sqlx::query(q_lease::SELECT_LATEST_ATTEMPT_FOR_RECLAIM_SQL)
.bind(part)
.bind(exec_uuid)
.fetch_optional(&mut **conn)
.await
.map_err(map_sqlx_error)?;
let Some(row) = row else {
return Err(EngineError::NotFound { entity: "attempt" });
};
let attempt_index_i: i64 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
let current_epoch: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
let expires_at: Option<i64> = row
.try_get::<Option<i64>, _>("lease_expires_at_ms")
.map_err(map_sqlx_error)?;
let now = now_ms();
// A lease is live only when it has an expiry strictly in the future; a NULL expiry counts as not live.
let live = matches!(expires_at, Some(exp) if exp > now);
if live {
return Ok(None);
}
let lease_ttl_ms = i64::from(token.lease_ttl_ms);
let new_expires = now.saturating_add(lease_ttl_ms);
sqlx::query(q_lease::UPDATE_ATTEMPT_RECLAIM_SQL)
.bind(token.worker_id.as_str())
.bind(token.worker_instance_id.as_str())
.bind(new_expires)
.bind(now)
.bind(part)
.bind(exec_uuid)
.bind(attempt_index_i)
.execute(&mut **conn)
.await
.map_err(map_sqlx_error)?;
sqlx::query(q_lease::UPDATE_EXEC_CORE_RECLAIM_SQL)
.bind(part)
.bind(exec_uuid)
.execute(&mut **conn)
.await
.map_err(map_sqlx_error)?;
let ev = insert_lease_event(conn, part, exec_uuid, "reclaimed", now).await?;
let emits = vec![(OutboxChannel::LeaseHistory, ev)];
// The handle's epoch is computed locally rather than read back from the database.
// NOTE(review): this assumes UPDATE_ATTEMPT_RECLAIM_SQL bumps `lease_epoch` by exactly one — confirm, otherwise later fence checks on this handle will fail.
let new_epoch = current_epoch.saturating_add(1);
// Out-of-range attempt index or epoch values are coerced to 0 rather than reported.
let payload = HandlePayload::new(
token.grant.execution_id.clone(),
AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0)),
AttemptId::new(),
LeaseId::new(),
LeaseEpoch(u64::try_from(new_epoch).unwrap_or(0)),
u64::from(token.lease_ttl_ms),
token.grant.lane_id.clone(),
token.worker_instance_id.clone(),
);
Ok(Some((encode_handle(&payload, HandleKind::Resumed), emits)))
}
/// Serialises an optional execution policy to a JSON string.
///
/// `None` stays `None`; a serialisation failure is reported as an
/// `InvalidInput` validation error.
#[cfg(feature = "core")]
fn encode_policy_json(
    policy: Option<&ff_core::policy::ExecutionPolicy>,
) -> Result<Option<String>, EngineError> {
    policy
        .map(|p| {
            serde_json::to_string(p).map_err(|e| EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("create_execution: policy: serialize failed: {e}"),
            })
        })
        .transpose()
}
/// Builds the JSON text stored as the `raw_fields` bind of a new execution row.
///
/// All scalar entries are written as JSON strings (including the timestamp and
/// the initial attempt count); `tags` is a nested object of string values.
/// `idempotency_key` and `payload_encoding` are only present when set.
#[cfg(feature = "core")]
fn build_create_execution_raw_fields(args: &CreateExecutionArgs) -> String {
    use serde_json::{Map, Value};

    let mut fields: Map<String, Value> = Map::new();
    let mut put_str = |name: &str, text: String| {
        fields.insert(name.to_owned(), Value::String(text));
    };

    put_str("namespace", args.namespace.as_str().to_owned());
    put_str("execution_kind", args.execution_kind.clone());
    put_str("creator_identity", args.creator_identity.clone());
    if let Some(key) = &args.idempotency_key {
        put_str("idempotency_key", key.clone());
    }
    if let Some(encoding) = &args.payload_encoding {
        put_str("payload_encoding", encoding.clone());
    }
    put_str("last_mutation_at", args.now.0.to_string());
    put_str("total_attempt_count", "0".to_owned());

    let tags: Map<String, Value> = args
        .tags
        .iter()
        .map(|(name, value)| (name.clone(), Value::String(value.clone())))
        .collect();
    fields.insert("tags".to_owned(), Value::Object(tags));

    Value::Object(fields).to_string()
}
/// Inserts a new execution row (plus its required capabilities and lane
/// registry entry) in one transaction.
///
/// Returns `Created` when the core insert affected a row and `Duplicate`
/// when it did not. Capability rows are only written for a fresh insert; the
/// lane registry statement runs in both cases. Any failure rolls back.
#[cfg(feature = "core")]
async fn create_execution_impl(
    pool: &SqlitePool,
    args: &CreateExecutionArgs,
) -> Result<CreateExecutionResult, EngineError> {
    let part = i64::from(args.execution_id.partition());

    // The UUID is whatever follows the `}:` separator in the textual id.
    let id_text = args.execution_id.as_str();
    let Some((_, uuid_text)) = id_text.split_once("}:") else {
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: format!("execution_id missing `}}:` separator: {id_text}"),
        });
    };
    let exec_uuid = Uuid::parse_str(uuid_text).map_err(|e| EngineError::Validation {
        kind: ValidationKind::InvalidInput,
        detail: format!("execution_id UUID invalid: {e}"),
    })?;

    let lane_id = args.lane_id.as_str().to_owned();
    let priority = i64::from(args.priority);
    let created_at_ms: i64 = args.now.0;
    let deadline_at_ms: Option<i64> = args.execution_deadline_at.map(|t| t.0);
    let raw_fields = build_create_execution_raw_fields(args);
    let policy_json = encode_policy_json(args.policy.as_ref())?;

    let mut conn = begin_immediate(pool).await?;
    let outcome = async {
        let inserted = sqlx::query(q_exec::INSERT_EXEC_CORE_SQL)
            .bind(part)
            .bind(exec_uuid)
            .bind(&lane_id)
            .bind(priority)
            .bind(created_at_ms)
            .bind(deadline_at_ms)
            .bind(args.input_payload.as_slice())
            .bind(policy_json.as_deref())
            .bind(&raw_fields)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?
            .rows_affected()
            > 0;

        if inserted {
            let required: Vec<String> = args
                .policy
                .as_ref()
                .and_then(|p| p.routing_requirements.as_ref())
                .map(|r| r.required_capabilities.iter().cloned().collect())
                .unwrap_or_default();
            for capability in &required {
                sqlx::query(q_exec::INSERT_EXEC_CAPABILITY_SQL)
                    .bind(exec_uuid)
                    .bind(capability)
                    .execute(&mut *conn)
                    .await
                    .map_err(map_sqlx_error)?;
            }
        }

        sqlx::query(q_exec::INSERT_LANE_REGISTRY_SQL)
            .bind(&lane_id)
            .bind(created_at_ms)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;
        Ok::<bool, EngineError>(inserted)
    }
    .await;

    let inserted = match outcome {
        Ok(flag) => flag,
        Err(err) => {
            rollback_quiet(&mut conn).await;
            return Err(err);
        }
    };
    commit_or_rollback(&mut conn).await?;

    let execution_id = args.execution_id.clone();
    Ok(if inserted {
        CreateExecutionResult::Created {
            execution_id,
            public_state: PublicState::Waiting,
        }
    } else {
        CreateExecutionResult::Duplicate { execution_id }
    })
}
/// Inserts a flow row under partition 0.
///
/// Returns `Created` when the insert affected a row and `AlreadySatisfied`
/// when it did not; an insert error rolls the transaction back.
#[cfg(feature = "core")]
async fn create_flow_impl(
    pool: &SqlitePool,
    args: &CreateFlowArgs,
) -> Result<CreateFlowResult, EngineError> {
    let part: i64 = 0;
    let flow_uuid: Uuid = args.flow_id.0;
    let created_ms = args.now.0;
    let raw_fields = serde_json::json!({
        "flow_kind": args.flow_kind,
        "namespace": args.namespace.as_str(),
        "node_count": 0,
        "edge_count": 0,
        "last_mutation_at_ms": created_ms,
    })
    .to_string();

    let mut conn = begin_immediate(pool).await?;
    let insert = sqlx::query(q_flow::INSERT_FLOW_CORE_SQL)
        .bind(part)
        .bind(flow_uuid)
        .bind(created_ms)
        .bind(&raw_fields)
        .execute(&mut *conn)
        .await
        .map_err(map_sqlx_error);

    let inserted = match insert {
        Ok(done) => done.rows_affected() > 0,
        Err(err) => {
            rollback_quiet(&mut conn).await;
            return Err(err);
        }
    };
    commit_or_rollback(&mut conn).await?;

    let flow_id = args.flow_id.clone();
    Ok(if inserted {
        CreateFlowResult::Created { flow_id }
    } else {
        CreateFlowResult::AlreadySatisfied { flow_id }
    })
}
/// Attaches an execution to a flow and bumps the flow's node count.
///
/// Validation failures (`flow_not_found`, `flow_already_terminal`,
/// `execution_not_found`, `already_member_of_different_flow:<id>`, partition
/// mismatch) are reported as `InvalidInput`. An execution already attached to
/// this flow yields `AlreadyMember` with the node count read from the flow's
/// `raw_fields` (0 when that cannot be read). Errors roll the transaction back.
#[cfg(feature = "core")]
async fn add_execution_to_flow_impl(
    pool: &SqlitePool,
    args: &AddExecutionToFlowArgs,
) -> Result<AddExecutionToFlowResult, EngineError> {
    const TERMINAL_FLOW_STATES: [&str; 4] = ["cancelled", "completed", "failed", "terminal"];

    let part: i64 = 0;
    let flow_uuid: Uuid = args.flow_id.0;
    let (exec_part, exec_uuid) = split_exec_id(&args.execution_id)?;
    if exec_part != part {
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: format!("execution partition mismatch: expected 0, got {exec_part}"),
        });
    }
    let now_ms = args.now.0;

    let mut conn = begin_immediate(pool).await?;
    let outcome = async {
        let Some(flow_row) = sqlx::query(q_flow_staging::SELECT_FLOW_CORE_FOR_STAGE_SQL)
            .bind(part)
            .bind(flow_uuid)
            .fetch_optional(&mut *conn)
            .await
            .map_err(map_sqlx_error)?
        else {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: "flow_not_found".into(),
            });
        };

        let flow_state: String = flow_row
            .try_get("public_flow_state")
            .map_err(map_sqlx_error)?;
        if TERMINAL_FLOW_STATES.contains(&flow_state.as_str()) {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: "flow_already_terminal".into(),
            });
        }
        let raw_fields_text: String = flow_row.try_get("raw_fields").map_err(map_sqlx_error)?;

        let Some(exec_row) = sqlx::query(q_flow_staging::SELECT_EXEC_FLOW_ID_SQL)
            .bind(part)
            .bind(exec_uuid)
            .fetch_optional(&mut *conn)
            .await
            .map_err(map_sqlx_error)?
        else {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: "execution_not_found".into(),
            });
        };

        let existing_flow_id: Option<Uuid> =
            exec_row.try_get("flow_id").map_err(map_sqlx_error)?;
        match existing_flow_id {
            Some(current) if current == flow_uuid => {
                // Best-effort read: unparsable or missing counts report as 0.
                let node_count = serde_json::from_str::<serde_json::Value>(&raw_fields_text)
                    .ok()
                    .and_then(|doc| doc.get("node_count").and_then(|v| v.as_u64()))
                    .and_then(|n| u32::try_from(n).ok())
                    .unwrap_or(0);
                return Ok(AddExecutionToFlowResult::AlreadyMember {
                    execution_id: args.execution_id.clone(),
                    node_count,
                });
            }
            Some(other) => {
                return Err(EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: format!("already_member_of_different_flow:{other}"),
                });
            }
            None => {}
        }

        sqlx::query(q_flow_staging::UPDATE_EXEC_SET_FLOW_ID_SQL)
            .bind(part)
            .bind(exec_uuid)
            .bind(flow_uuid)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;
        sqlx::query(q_flow_staging::BUMP_FLOW_NODE_COUNT_SQL)
            .bind(part)
            .bind(flow_uuid)
            .bind(now_ms)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;
        let bumped: i64 = sqlx::query_scalar(q_flow_staging::SELECT_FLOW_NODE_COUNT_SQL)
            .bind(part)
            .bind(flow_uuid)
            .fetch_one(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;

        Ok(AddExecutionToFlowResult::Added {
            execution_id: args.execution_id.clone(),
            new_node_count: u32::try_from(bumped.max(0)).unwrap_or(0),
        })
    }
    .await;

    match outcome {
        Ok(result) => {
            commit_or_rollback(&mut conn).await?;
            Ok(result)
        }
        Err(err) => {
            rollback_quiet(&mut conn).await;
            Err(err)
        }
    }
}
/// Stages a dependency edge between two executions of the same flow.
///
/// The flow's graph revision is bumped with a compare-and-set against
/// `args.expected_graph_revision`. When the CAS touches no row the flow is
/// probed to report `flow_not_found`, `flow_already_terminal`, or
/// `StaleGraphRevision`. Both executions must be flow members, and an insert
/// that affects no row is reported as `dependency_already_exists`. Any error
/// rolls the transaction back (including the revision bump).
#[cfg(feature = "core")]
async fn stage_dependency_edge_impl(
    pool: &SqlitePool,
    args: &StageDependencyEdgeArgs,
) -> Result<StageDependencyEdgeResult, EngineError> {
    const TERMINAL_FLOW_STATES: [&str; 4] = ["cancelled", "completed", "failed", "terminal"];

    if args.upstream_execution_id == args.downstream_execution_id {
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: "self_referencing_edge".into(),
        });
    }

    let part: i64 = 0;
    let flow_uuid: Uuid = args.flow_id.0;
    let edge_uuid: Uuid = args.edge_id.0;
    let (up_part, upstream_uuid) = split_exec_id(&args.upstream_execution_id)?;
    let (down_part, downstream_uuid) = split_exec_id(&args.downstream_execution_id)?;
    if !(up_part == part && down_part == part) {
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: "execution partition mismatch under single-writer SQLite".into(),
        });
    }
    let now_ms = args.now.0;
    let expected_rev = i64::try_from(args.expected_graph_revision).unwrap_or(i64::MAX);

    let mut conn = begin_immediate(pool).await?;
    let outcome = async {
        let bumped = sqlx::query(q_flow_staging::CAS_BUMP_FLOW_REV_SQL)
            .bind(part)
            .bind(flow_uuid)
            .bind(expected_rev)
            .bind(now_ms)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?
            .rows_affected();

        if bumped == 0 {
            // Work out why the CAS missed: no flow, terminal flow, or stale revision.
            let Some(probe) = sqlx::query(q_flow_staging::SELECT_FLOW_REV_AND_STATE_SQL)
                .bind(part)
                .bind(flow_uuid)
                .fetch_optional(&mut *conn)
                .await
                .map_err(map_sqlx_error)?
            else {
                return Err(EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: "flow_not_found".into(),
                });
            };
            let state: String = probe.try_get("public_flow_state").map_err(map_sqlx_error)?;
            return Err(if TERMINAL_FLOW_STATES.contains(&state.as_str()) {
                EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: "flow_already_terminal".into(),
                }
            } else {
                EngineError::Contention(ContentionKind::StaleGraphRevision)
            });
        }

        let members =
            sqlx::query_scalar::<_, Uuid>(q_flow_staging::SELECT_FLOW_MEMBERSHIP_PAIR_SQL)
                .bind(part)
                .bind(flow_uuid)
                .bind(upstream_uuid)
                .bind(downstream_uuid)
                .fetch_all(&mut *conn)
                .await
                .map_err(map_sqlx_error)?;
        let both_present = [upstream_uuid, downstream_uuid]
            .iter()
            .all(|id| members.contains(id));
        if !both_present {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: "execution_not_in_flow".into(),
            });
        }

        let policy_json = serde_json::json!({
            "dependency_kind": args.dependency_kind,
            "satisfaction_condition": "all_required",
            "data_passing_ref": args.data_passing_ref.clone().unwrap_or_default(),
            "edge_state": "pending",
            "created_at_ms": now_ms,
            "created_by": "engine",
            "staged_at_ms": now_ms,
            "applied_at_ms": serde_json::Value::Null,
        })
        .to_string();
        let inserted = sqlx::query(q_flow_staging::INSERT_EDGE_SQL)
            .bind(part)
            .bind(flow_uuid)
            .bind(edge_uuid)
            .bind(upstream_uuid)
            .bind(downstream_uuid)
            .bind(&policy_json)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?
            .rows_affected();
        if inserted == 0 {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("dependency_already_exists:edge_id={edge_uuid}"),
            });
        }

        let new_rev: i64 = sqlx::query_scalar::<_, i64>(
            "SELECT graph_revision FROM ff_flow_core \
             WHERE partition_key = ?1 AND flow_id = ?2",
        )
        .bind(part)
        .bind(flow_uuid)
        .fetch_one(&mut *conn)
        .await
        .map_err(map_sqlx_error)?;

        Ok(StageDependencyEdgeResult::Staged {
            edge_id: args.edge_id.clone(),
            new_graph_revision: u64::try_from(new_rev).unwrap_or(0),
        })
    }
    .await;

    match outcome {
        Ok(result) => {
            commit_or_rollback(&mut conn).await?;
            Ok(result)
        }
        Err(err) => {
            rollback_quiet(&mut conn).await;
            Err(err)
        }
    }
}
/// Marks a staged edge as applied and registers it with the downstream
/// execution's edge group.
///
/// Returns `AlreadyApplied` when the stored policy already carries an integer
/// `applied_at_ms`. Otherwise the policy gets `applied_at_ms = args.now` and
/// `edge_state = "applied"`, the edge-group upsert runs, and the count read by
/// `SELECT_EDGE_GROUP_RUNNING_COUNT_SQL` is returned as `unsatisfied_count`.
///
/// # Errors
/// * `InvalidInput` — partition mismatch or `edge_not_found`.
/// * `Corruption` — the stored policy is not valid JSON, or is valid JSON but
///   not an object. The latter used to be skipped silently, which left the
///   edge unmarked while still running the edge-group upsert on every call.
///
/// Any error rolls the transaction back.
#[cfg(feature = "core")]
async fn apply_dependency_to_child_impl(
    pool: &SqlitePool,
    args: &ApplyDependencyToChildArgs,
) -> Result<ApplyDependencyToChildResult, EngineError> {
    let part: i64 = 0;
    let flow_uuid: Uuid = args.flow_id.0;
    let edge_uuid: Uuid = args.edge_id.0;
    let (down_part, downstream_uuid) = split_exec_id(&args.downstream_execution_id)?;
    if down_part != part {
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: "execution partition mismatch under single-writer SQLite".into(),
        });
    }
    let now_ms = args.now.0;

    let mut conn = begin_immediate(pool).await?;
    let work = async {
        let row = sqlx::query(q_flow_staging::SELECT_EDGE_POLICY_SQL)
            .bind(part)
            .bind(flow_uuid)
            .bind(edge_uuid)
            .fetch_optional(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;
        let Some(row) = row else {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: "edge_not_found".into(),
            });
        };

        let policy_text: String = row.try_get("policy").map_err(map_sqlx_error)?;
        let mut policy: serde_json::Value =
            serde_json::from_str(&policy_text).map_err(|e| EngineError::Validation {
                kind: ValidationKind::Corruption,
                detail: format!("ff_edge.policy: {e}"),
            })?;

        // Only an integer timestamp counts as "applied"; a JSON null does not.
        let already_applied = policy
            .get("applied_at_ms")
            .and_then(|v| v.as_i64())
            .is_some();
        if already_applied {
            return Ok(ApplyDependencyToChildResult::AlreadyApplied);
        }

        // The applied marker can only be recorded on an object. Refuse anything
        // else instead of writing the policy back unchanged and still touching
        // the edge group.
        let Some(obj) = policy.as_object_mut() else {
            return Err(EngineError::Validation {
                kind: ValidationKind::Corruption,
                detail: "ff_edge.policy: expected a JSON object".into(),
            });
        };
        obj.insert("applied_at_ms".into(), serde_json::json!(now_ms));
        obj.insert("edge_state".into(), serde_json::json!("applied"));

        let new_policy_text = policy.to_string();
        sqlx::query(q_flow_staging::UPDATE_EDGE_POLICY_SQL)
            .bind(part)
            .bind(flow_uuid)
            .bind(edge_uuid)
            .bind(&new_policy_text)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;

        let default_group_policy = serde_json::json!({ "kind": "all_of" }).to_string();
        sqlx::query(q_flow_staging::UPSERT_EDGE_GROUP_APPLY_SQL)
            .bind(part)
            .bind(flow_uuid)
            .bind(downstream_uuid)
            .bind(&default_group_policy)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;

        let unsatisfied: i64 =
            sqlx::query_scalar(q_flow_staging::SELECT_EDGE_GROUP_RUNNING_COUNT_SQL)
                .bind(part)
                .bind(flow_uuid)
                .bind(downstream_uuid)
                .fetch_one(&mut *conn)
                .await
                .map_err(map_sqlx_error)?;
        Ok(ApplyDependencyToChildResult::Applied {
            unsatisfied_count: u32::try_from(unsatisfied.max(0)).unwrap_or(0),
        })
    }
    .await;

    match work {
        Ok(res) => {
            commit_or_rollback(&mut conn).await?;
            Ok(res)
        }
        Err(e) => {
            rollback_quiet(&mut conn).await;
            Err(e)
        }
    }
}
/// Maps a cancel policy to the string bound into the cancel statement and
/// echoed back in `CancelFlowResult`.
fn cancel_policy_to_str(p: CancelFlowPolicy) -> &'static str {
    const FLOW_ONLY: &str = "cancel_flow_only";
    match p {
        CancelFlowPolicy::CancelAll => "cancel_all",
        CancelFlowPolicy::CancelPending => "cancel_pending",
        CancelFlowPolicy::FlowOnly => FLOW_ONLY,
        // Any other variant is reported with the flow-only string.
        _ => FLOW_ONLY,
    }
}
async fn cancel_flow_impl(
pool: &SqlitePool,
pubsub: &PubSub,
id: &FlowId,
policy: CancelFlowPolicy,
) -> Result<CancelFlowResult, EngineError> {
let part: i64 = 0;
let flow_uuid: Uuid = id.0;
let policy_str = cancel_policy_to_str(policy);
let now_ms = now_ms();
let mut conn = begin_immediate(pool).await?;
let work: Result<(CancelFlowResult, Vec<PendingEmit>), EngineError> = async {
let flip = sqlx::query(q_flow::UPDATE_FLOW_CORE_CANCEL_SQL)
.bind(part)
.bind(flow_uuid)
.bind(now_ms)
.bind(policy_str)
.execute(&mut *conn)
.await
.map_err(map_sqlx_error)?;
if flip.rows_affected() == 0 {
return Ok((
CancelFlowResult::Cancelled {
cancellation_policy: policy_str.to_owned(),
member_execution_ids: Vec::new(),
},
Vec::new(),
));
}
let member_rows: Vec<Uuid> = if matches!(policy, CancelFlowPolicy::FlowOnly) {
Vec::new()
} else {
let sql = match policy {
CancelFlowPolicy::CancelPending => q_flow::SELECT_FLOW_MEMBERS_CANCEL_PENDING_SQL,
_ => q_flow::SELECT_FLOW_MEMBERS_CANCEL_ALL_SQL,
};
sqlx::query_scalar::<_, Uuid>(sql)
.bind(part)
.bind(flow_uuid)
.fetch_all(&mut *conn)
.await
.map_err(map_sqlx_error)?
};
let mut member_execution_ids: Vec<String> = Vec::with_capacity(member_rows.len());
let mut emits: Vec<PendingEmit> = Vec::new();
for exec_uuid in &member_rows {
sqlx::query(q_flow::UPDATE_EXEC_CORE_CANCEL_MEMBER_SQL)
.bind(part)
.bind(exec_uuid)
.bind(now_ms)
.execute(&mut *conn)
.await
.map_err(map_sqlx_error)?;
sqlx::query(q_flow::UPDATE_ATTEMPT_CLEAR_OUTCOME_FOR_CURRENT_SQL)
.bind(part)
.bind(exec_uuid)
.execute(&mut *conn)
.await
.map_err(map_sqlx_error)?;
let completion_ev =
insert_completion_event_ev(&mut conn, part, *exec_uuid, "cancelled", now_ms)
.await?;
emits.push((OutboxChannel::Completion, completion_ev));
let lease_ev =
insert_lease_event(&mut conn, part, *exec_uuid, "revoked", now_ms).await?;
emits.push((OutboxChannel::LeaseHistory, lease_ev));
member_execution_ids.push(format!("{{fp:{part}}}:{exec_uuid}"));
}
if matches!(policy, CancelFlowPolicy::CancelPending) {
sqlx::query(q_flow::INSERT_PENDING_CANCEL_GROUPS_SQL)
.bind(part)
.bind(flow_uuid)
.bind(now_ms)
.execute(&mut *conn)
.await
.map_err(map_sqlx_error)?;
}
Ok((
CancelFlowResult::Cancelled {
cancellation_policy: policy_str.to_owned(),
member_execution_ids,
},
emits,
))
}
.await;
match work {
Ok((res, emits)) => {
commit_or_rollback(&mut conn).await?;
dispatch_pending_emits(pubsub, &emits);
Ok(res)
}
Err(e) => {
rollback_quiet(&mut conn).await;
Err(e)
}
}
}
/// State shared by every clone of a [`SqliteBackend`] handle.
pub(crate) struct SqliteBackendInner {
/// Connection pool used by all backend operations.
#[allow(dead_code)]
pub(crate) pool: SqlitePool,
/// Pub/sub hub that receives outbox events after a transaction commits.
pub(crate) pubsub: PubSub,
/// Key under which this instance is stored in the backend registry.
#[allow(dead_code)]
pub(crate) key: PathBuf,
/// Extra connection opened by `new_with_tuning` for in-memory databases; `None` otherwise.
/// NOTE(review): presumably held so the in-memory database outlives idle pool connections — confirm.
#[allow(dead_code)]
pub(crate) memory_sentinel: Option<std::sync::Mutex<Option<sqlx::SqliteConnection>>>,
/// Scanner supervisor handle; initialised at most once by `with_scanners`.
pub(crate) scanner_handle: std::sync::OnceLock<crate::scanner_supervisor::SqliteScannerHandle>,
}
/// Capability flags this backend switches on, starting from `Supports::none()`.
///
/// The assignments are independent of each other; they are grouped by area
/// purely for readability.
fn sqlite_supports_base() -> Supports {
    let mut supports = Supports::none();

    // Flow cancellation.
    supports.cancel_flow_wait_timeout = true;
    supports.cancel_flow_wait_indefinite = true;
    supports.cancel_flow_header = true;
    supports.ack_cancel_member = true;

    // Waitpoints and their HMAC secrets.
    supports.rotate_waitpoint_hmac_secret_all = true;
    supports.seed_waitpoint_hmac_secret = true;
    supports.list_pending_waitpoints = true;

    // Subscriptions and streaming.
    supports.subscribe_lease_history = true;
    supports.subscribe_completion = true;
    supports.subscribe_signal_delivery = true;
    supports.stream_durable_summary = true;
    supports.stream_best_effort_live = true;

    // Execution lifecycle and reads.
    supports.prepare = true;
    supports.cancel_execution = true;
    supports.change_priority = true;
    supports.replay_execution = true;
    supports.revoke_lease = true;
    supports.read_execution_state = true;
    supports.read_execution_info = true;
    supports.get_execution_result = true;

    // Budgets, quotas and admission.
    supports.budget_admin = true;
    supports.quota_admin = true;
    supports.release_admission = true;
    supports.read_quota_policy_limits = true;

    // Workers and leases.
    supports.register_worker = true;
    supports.heartbeat_worker = true;
    supports.mark_worker_dead = true;
    supports.list_workers = true;
    supports.list_expired_leases = true;

    supports
}
/// SQLite-backed `EngineBackend` implementation.
///
/// Cloning is cheap: clones share the same `SqliteBackendInner` through an `Arc`.
#[derive(Clone)]
pub struct SqliteBackend {
// Shared pool, pub/sub hub and registry key.
inner: Arc<SqliteBackendInner>,
}
/// Debug output shows only the registry key, not the pool or pub/sub state.
impl std::fmt::Debug for SqliteBackend {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("SqliteBackend");
        out.field("key", &self.inner.key);
        out.finish()
    }
}
impl SqliteBackend {
pub async fn new(path: &str) -> Result<Arc<Self>, BackendError> {
Self::new_with_tuning(path, 4, true).await
}
pub async fn new_with_tuning(
path: &str,
pool_size: u32,
wal_mode: bool,
) -> Result<Arc<Self>, BackendError> {
if std::env::var("FF_DEV_MODE").as_deref() != Ok("1") {
return Err(BackendError::RequiresDevMode);
}
let is_memory = is_memory_uri(path);
let effective_path: std::borrow::Cow<'_, str> = if path == ":memory:" {
std::borrow::Cow::Borrowed("file::memory:?cache=shared")
} else {
std::borrow::Cow::Borrowed(path)
};
let key = if is_memory {
PathBuf::from(effective_path.as_ref())
} else {
std::fs::canonicalize(path).unwrap_or_else(|_| PathBuf::from(path))
};
if let Some(existing) = registry::lookup(&key) {
return Ok(Arc::new(Self { inner: existing }));
}
let opts: SqliteConnectOptions = effective_path
.parse::<SqliteConnectOptions>()
.map_err(|e| BackendError::Valkey {
kind: ff_core::engine_error::BackendErrorKind::Protocol,
message: format!("sqlite connect-opts parse for {path:?}: {e}"),
})?
.create_if_missing(true);
let opts = if wal_mode && !is_memory {
opts.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
} else {
opts
};
let pool_max = pool_size.max(1);
let pool = SqlitePoolOptions::new()
.max_connections(pool_max)
.connect_with(opts.clone())
.await
.map_err(|e| BackendError::Valkey {
kind: ff_core::engine_error::BackendErrorKind::Transport,
message: format!("sqlite pool connect for {path:?}: {e}"),
})?;
let memory_sentinel = if is_memory {
use sqlx::ConnectOptions;
let conn = opts.connect().await.map_err(|e| BackendError::Valkey {
kind: ff_core::engine_error::BackendErrorKind::Transport,
message: format!("sqlite sentinel connect for {path:?}: {e}"),
})?;
Some(std::sync::Mutex::new(Some(conn)))
} else {
None
};
tracing::warn!(
"FlowFabric SQLite backend active (FF_DEV_MODE=1). \
This backend is dev-only; single-writer, single-process, \
not supported in production. See RFC-023."
);
sqlx::migrate!("./migrations")
.run(&pool)
.await
.map_err(|e| BackendError::Valkey {
kind: ff_core::engine_error::BackendErrorKind::Protocol,
message: format!("sqlite migrate for {path:?}: {e}"),
})?;
let inner = Arc::new(SqliteBackendInner {
pool,
pubsub: PubSub::new(),
key: key.clone(),
memory_sentinel,
scanner_handle: std::sync::OnceLock::new(),
});
let inner = registry::insert(key, inner);
Ok(Arc::new(Self { inner }))
}
#[allow(dead_code)]
pub(crate) fn pool(&self) -> &SqlitePool {
&self.inner.pool
}
pub fn with_scanners(&self, cfg: crate::scanner_supervisor::SqliteScannerConfig) -> bool {
let mut result = false;
let _ = self.inner.scanner_handle.get_or_init(|| {
result = true;
crate::scanner_supervisor::spawn_scanners(self.inner.pool.clone(), cfg)
});
result
}
#[doc(hidden)]
pub async fn budget_reset_scan_tick_for_test(
&self,
now_ms: i64,
) -> Result<(u32, u32), EngineError> {
let report = crate::reconcilers::budget_reset::scan_tick(&self.inner.pool, now_ms).await?;
Ok((report.processed, report.errors))
}
#[doc(hidden)]
pub fn pool_for_test(&self) -> &SqlitePool {
&self.inner.pool
}
#[doc(hidden)]
pub fn subscribe_completion_for_test(
&self,
) -> tokio::sync::broadcast::Receiver<crate::pubsub::OutboxEvent> {
self.inner.pubsub.completion.subscribe()
}
#[doc(hidden)]
#[cfg(test)]
pub(crate) fn stream_frame_receiver_for_test(
&self,
) -> tokio::sync::broadcast::Receiver<crate::pubsub::OutboxEvent> {
self.inner.pubsub.stream_frame.subscribe()
}
}
#[async_trait]
impl EngineBackend for SqliteBackend {
/// Asks the scanner supervisor, if one was started, to shut down within `grace`.
///
/// A non-zero value returned by `shutdown` is logged as a warning; the call
/// itself always returns `Ok(())`.
async fn shutdown_prepare(&self, grace: Duration) -> Result<(), EngineError> {
    let Some(handle) = self.inner.scanner_handle.get() else {
        return Ok(());
    };
    let timed_out = handle.shutdown(grace).await;
    if timed_out > 0 {
        tracing::warn!(
            timed_out,
            ?grace,
            "sqlite scanner supervisor exceeded grace on shutdown"
        );
    }
    Ok(())
}
/// Runs `claim_impl` for `lane` under `retry_serializable`.
async fn claim(
    &self,
    lane: &LaneId,
    capabilities: &CapabilitySet,
    policy: ClaimPolicy,
) -> Result<Option<Handle>, EngineError> {
    let inner = &self.inner;
    let (pool, pubsub) = (&inner.pool, &inner.pubsub);
    retry_serializable(|| claim_impl(pool, pubsub, lane, capabilities, &policy)).await
}
/// Runs `renew_impl` for `handle` under `retry_serializable`.
async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError> {
    let inner = &self.inner;
    let (pool, pubsub) = (&inner.pool, &inner.pubsub);
    retry_serializable(|| renew_impl(pool, pubsub, handle)).await
}
/// Runs `typed_ops::renew_lease` under `retry_serializable`, cloning `args` per attempt.
async fn renew_lease(
    &self,
    args: ff_core::contracts::RenewLeaseArgs,
) -> Result<ff_core::contracts::RenewLeaseResult, EngineError> {
    let inner = &self.inner;
    let (pool, pubsub) = (&inner.pool, &inner.pubsub);
    retry_serializable(|| crate::typed_ops::renew_lease(pool, pubsub, args.clone())).await
}
/// Runs `typed_ops::complete_execution` under `retry_serializable`, cloning `args` per attempt.
async fn complete_execution(
    &self,
    args: ff_core::contracts::CompleteExecutionArgs,
) -> Result<ff_core::contracts::CompleteExecutionResult, EngineError> {
    let inner = &self.inner;
    let (pool, pubsub) = (&inner.pool, &inner.pubsub);
    retry_serializable(|| crate::typed_ops::complete_execution(pool, pubsub, args.clone())).await
}
/// Runs `typed_ops::fail_execution` under `retry_serializable`, cloning `args` per attempt.
async fn fail_execution(
    &self,
    args: ff_core::contracts::FailExecutionArgs,
) -> Result<ff_core::contracts::FailExecutionResult, EngineError> {
    let inner = &self.inner;
    let (pool, pubsub) = (&inner.pool, &inner.pubsub);
    retry_serializable(|| crate::typed_ops::fail_execution(pool, pubsub, args.clone())).await
}
/// Runs `typed_ops::resume_execution` under `retry_serializable`, cloning `args` per attempt.
async fn resume_execution(
    &self,
    args: ff_core::contracts::ResumeExecutionArgs,
) -> Result<ff_core::contracts::ResumeExecutionResult, EngineError> {
    let inner = &self.inner;
    let (pool, pubsub) = (&inner.pool, &inner.pubsub);
    retry_serializable(|| crate::typed_ops::resume_execution(pool, pubsub, args.clone())).await
}
/// Forwards to `typed_ops::evaluate_flow_eligibility` (single attempt, no retry wrapper).
async fn evaluate_flow_eligibility(
    &self,
    args: ff_core::contracts::EvaluateFlowEligibilityArgs,
) -> Result<ff_core::contracts::EvaluateFlowEligibilityResult, EngineError> {
    let pool = &self.inner.pool;
    crate::typed_ops::evaluate_flow_eligibility(pool, args).await
}
/// Runs `typed_ops::claim_execution` under `retry_serializable`, using the
/// default `PartitionConfig`.
async fn claim_execution(
    &self,
    args: ff_core::contracts::ClaimExecutionArgs,
) -> Result<ff_core::contracts::ClaimExecutionResult, EngineError> {
    let inner = &self.inner;
    let (pool, pubsub) = (&inner.pool, &inner.pubsub);
    let partitions = ff_core::partition::PartitionConfig::default();
    retry_serializable(|| {
        crate::typed_ops::claim_execution(pool, &partitions, pubsub, args.clone())
    })
    .await
}
/// Runs `typed_ops::check_admission` under `retry_serializable`, using the
/// default `PartitionConfig`. The `_dimension` argument is not consulted here.
async fn check_admission(
    &self,
    quota_policy_id: &ff_core::types::QuotaPolicyId,
    _dimension: &str,
    args: ff_core::contracts::CheckAdmissionArgs,
) -> Result<ff_core::contracts::CheckAdmissionResult, EngineError> {
    let pool = &self.inner.pool;
    let partitions = ff_core::partition::PartitionConfig::default();
    retry_serializable(|| {
        crate::typed_ops::check_admission(pool, &partitions, quota_policy_id, args.clone())
    })
    .await
}
/// Runs `progress_impl` under `retry_serializable`, cloning `message` per attempt.
async fn progress(
    &self,
    handle: &Handle,
    percent: Option<u8>,
    message: Option<String>,
) -> Result<(), EngineError> {
    let db = &self.inner.pool;
    retry_serializable(|| progress_impl(db, handle, percent, message.clone())).await
}
/// Runs `append_frame_impl` under `retry_serializable`, cloning `frame` per attempt.
async fn append_frame(
    &self,
    handle: &Handle,
    frame: Frame,
) -> Result<AppendFrameOutcome, EngineError> {
    let inner = &self.inner;
    let (pool, pubsub) = (&inner.pool, &inner.pubsub);
    retry_serializable(|| append_frame_impl(pool, pubsub, handle, frame.clone())).await
}
/// Runs `complete_impl` under `retry_serializable`, cloning `payload` per attempt.
async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError> {
    let inner = &self.inner;
    let (pool, pubsub) = (&inner.pool, &inner.pubsub);
    retry_serializable(|| complete_impl(pool, pubsub, handle, payload.clone())).await
}
/// Runs `fail_impl` under `retry_serializable`, cloning `reason` per attempt.
async fn fail(
    &self,
    handle: &Handle,
    reason: FailureReason,
    classification: FailureClass,
) -> Result<FailOutcome, EngineError> {
    let inner = &self.inner;
    let (pool, pubsub) = (&inner.pool, &inner.pubsub);
    retry_serializable(|| fail_impl(pool, pubsub, handle, reason.clone(), classification)).await
}
/// Not provided by this backend: returns the result of `unavailable("sqlite.cancel")`.
async fn cancel(&self, _handle: &Handle, _reason: &str) -> Result<(), EngineError> {
unavailable("sqlite.cancel")
}
/// Runs `suspend_ops::suspend_impl` under `retry_serializable`, cloning `args` per attempt.
async fn suspend(
    &self,
    handle: &Handle,
    args: SuspendArgs,
) -> Result<SuspendOutcome, EngineError> {
    let inner = &self.inner;
    let (pool, pubsub) = (&inner.pool, &inner.pubsub);
    retry_serializable(|| crate::suspend_ops::suspend_impl(pool, pubsub, handle, args.clone()))
        .await
}
/// Runs `suspend_ops::suspend_by_triple_impl` under `retry_serializable`;
/// every attempt receives fresh clones of the id, fence and args.
async fn suspend_by_triple(
    &self,
    exec_id: ExecutionId,
    triple: LeaseFence,
    args: SuspendArgs,
) -> Result<SuspendOutcome, EngineError> {
    let inner = &self.inner;
    let (pool, pubsub) = (&inner.pool, &inner.pubsub);
    let attempt = || {
        crate::suspend_ops::suspend_by_triple_impl(
            pool,
            pubsub,
            exec_id.clone(),
            triple.clone(),
            args.clone(),
        )
    };
    retry_serializable(attempt).await
}
/// Runs `suspend_ops::create_waitpoint_impl` under `retry_serializable`.
async fn create_waitpoint(
    &self,
    handle: &Handle,
    waitpoint_key: &str,
    expires_in: Duration,
) -> Result<PendingWaitpoint, EngineError> {
    let db = &self.inner.pool;
    let attempt =
        || crate::suspend_ops::create_waitpoint_impl(db, handle, waitpoint_key, expires_in);
    retry_serializable(attempt).await
}
/// Forwards to `reads::read_waitpoint_token_impl` (single attempt, no retry wrapper).
#[cfg(feature = "core")]
async fn read_waitpoint_token(
    &self,
    partition: PartitionKey,
    waitpoint_id: &ff_core::types::WaitpointId,
) -> Result<Option<String>, EngineError> {
    let pool = &self.inner.pool;
    crate::reads::read_waitpoint_token_impl(pool, &partition, waitpoint_id).await
}
/// Runs `suspend_ops::observe_signals_impl` under `retry_serializable`.
async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError> {
    let db = &self.inner.pool;
    let attempt = || crate::suspend_ops::observe_signals_impl(db, handle);
    retry_serializable(attempt).await
}
/// Runs `claim_from_reclaim_impl` for `token` under `retry_serializable`.
async fn claim_from_resume_grant(&self, token: ResumeToken) -> Result<Option<Handle>, EngineError> {
    let inner = &self.inner;
    let (pool, pubsub) = (&inner.pool, &inner.pubsub);
    retry_serializable(|| claim_from_reclaim_impl(pool, pubsub, &token)).await
}
/// Runs `reclaim::issue_reclaim_grant_impl` under `retry_serializable`.
async fn issue_reclaim_grant(
    &self,
    args: IssueReclaimGrantArgs,
) -> Result<IssueReclaimGrantOutcome, EngineError> {
    let db = &self.inner.pool;
    let attempt = || crate::reclaim::issue_reclaim_grant_impl(db, &args);
    retry_serializable(attempt).await
}
/// Runs `reclaim::reclaim_execution_impl` under `retry_serializable`.
async fn reclaim_execution(
    &self,
    args: ReclaimExecutionArgs,
) -> Result<ReclaimExecutionOutcome, EngineError> {
    let inner = &self.inner;
    let (pool, pubsub) = (&inner.pool, &inner.pubsub);
    retry_serializable(|| crate::reclaim::reclaim_execution_impl(pool, pubsub, &args)).await
}
/// Not provided by this backend: returns the result of `unavailable("sqlite.delay")`.
async fn delay(&self, _handle: &Handle, _delay_until: TimestampMs) -> Result<(), EngineError> {
unavailable("sqlite.delay")
}
/// Not provided by this backend: returns the result of `unavailable("sqlite.wait_children")`.
async fn wait_children(&self, _handle: &Handle) -> Result<(), EngineError> {
unavailable("sqlite.wait_children")
}
/// Not provided by this backend: returns the result of
/// `unavailable("sqlite.describe_execution")`.
async fn describe_execution(
&self,
_id: &ExecutionId,
) -> Result<Option<ExecutionSnapshot>, EngineError> {
unavailable("sqlite.describe_execution")
}
/// Forwards to `reads::read_execution_context_impl`.
async fn read_execution_context(
    &self,
    execution_id: &ExecutionId,
) -> Result<ExecutionContext, EngineError> {
    let pool = &self.inner.pool;
    crate::reads::read_execution_context_impl(pool, execution_id).await
}
/// Forwards to `reads::read_current_attempt_index_impl`.
async fn read_current_attempt_index(
    &self,
    execution_id: &ExecutionId,
) -> Result<ff_core::types::AttemptIndex, EngineError> {
    let pool = &self.inner.pool;
    crate::reads::read_current_attempt_index_impl(pool, execution_id).await
}
/// Forwards to `reads::read_total_attempt_count_impl`.
async fn read_total_attempt_count(
    &self,
    execution_id: &ExecutionId,
) -> Result<ff_core::types::AttemptIndex, EngineError> {
    let pool = &self.inner.pool;
    crate::reads::read_total_attempt_count_impl(pool, execution_id).await
}
/// Not provided by this backend: returns the result of `unavailable("sqlite.describe_flow")`.
async fn describe_flow(&self, _id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError> {
unavailable("sqlite.describe_flow")
}
/// Validates `key` with `validate_tag_key`, then forwards to
/// `reads::set_execution_tag_impl`.
async fn set_execution_tag(
    &self,
    execution_id: &ExecutionId,
    key: &str,
    value: &str,
) -> Result<(), EngineError> {
    ff_core::engine_backend::validate_tag_key(key)?;
    let pool = &self.inner.pool;
    crate::reads::set_execution_tag_impl(pool, execution_id, key, value).await
}
/// Validates `key` with `validate_tag_key`, then forwards to
/// `reads::set_flow_tag_impl`.
async fn set_flow_tag(
    &self,
    flow_id: &FlowId,
    key: &str,
    value: &str,
) -> Result<(), EngineError> {
    ff_core::engine_backend::validate_tag_key(key)?;
    let pool = &self.inner.pool;
    crate::reads::set_flow_tag_impl(pool, flow_id, key, value).await
}
/// Validates `key` with `validate_tag_key`, then forwards to
/// `reads::get_execution_tag_impl`.
async fn get_execution_tag(
    &self,
    execution_id: &ExecutionId,
    key: &str,
) -> Result<Option<String>, EngineError> {
    ff_core::engine_backend::validate_tag_key(key)?;
    let pool = &self.inner.pool;
    crate::reads::get_execution_tag_impl(pool, execution_id, key).await
}
/// Validates `key` with `validate_tag_key`, then forwards to
/// `reads::get_flow_tag_impl`.
async fn get_flow_tag(
    &self,
    flow_id: &FlowId,
    key: &str,
) -> Result<Option<String>, EngineError> {
    ff_core::engine_backend::validate_tag_key(key)?;
    let pool = &self.inner.pool;
    crate::reads::get_flow_tag_impl(pool, flow_id, key).await
}
/// Forwards to `reads::get_execution_namespace_impl`.
async fn get_execution_namespace(
    &self,
    execution_id: &ExecutionId,
) -> Result<Option<String>, EngineError> {
    let pool = &self.inner.pool;
    crate::reads::get_execution_namespace_impl(pool, execution_id).await
}
/// Not provided by this backend: returns the result of `unavailable("sqlite.list_edges")`.
#[cfg(feature = "core")]
async fn list_edges(
&self,
_flow_id: &FlowId,
_direction: EdgeDirection,
) -> Result<Vec<EdgeSnapshot>, EngineError> {
unavailable("sqlite.list_edges")
}
/// Not provided by this backend: returns the result of `unavailable("sqlite.describe_edge")`.
#[cfg(feature = "core")]
async fn describe_edge(
&self,
_flow_id: &FlowId,
_edge_id: &EdgeId,
) -> Result<Option<EdgeSnapshot>, EngineError> {
unavailable("sqlite.describe_edge")
}
/// Not provided by this backend: returns the result of
/// `unavailable("sqlite.resolve_execution_flow_id")`.
#[cfg(feature = "core")]
async fn resolve_execution_flow_id(
&self,
_eid: &ExecutionId,
) -> Result<Option<FlowId>, EngineError> {
unavailable("sqlite.resolve_execution_flow_id")
}
/// Not provided by this backend: returns the result of `unavailable("sqlite.list_flows")`.
#[cfg(feature = "core")]
async fn list_flows(
&self,
_partition: PartitionKey,
_cursor: Option<FlowId>,
_limit: usize,
) -> Result<ListFlowsPage, EngineError> {
unavailable("sqlite.list_flows")
}
/// Stub for the SQLite backend: cursor and limit are ignored and no query
/// is issued. Returns whatever `unavailable` produces for the operation
/// label `"sqlite.list_lanes"`.
#[cfg(feature = "core")]
async fn list_lanes(
    &self,
    _cursor: Option<LaneId>,
    _limit: usize,
) -> Result<ListLanesPage, EngineError> {
    const OPERATION: &str = "sqlite.list_lanes";
    unavailable(OPERATION)
}
/// Stub for the SQLite backend: partition, cursor and limit are ignored and
/// no query is issued. Returns whatever `unavailable` produces for the
/// operation label `"sqlite.list_suspended"`.
#[cfg(feature = "core")]
async fn list_suspended(
    &self,
    _partition: PartitionKey,
    _cursor: Option<ExecutionId>,
    _limit: usize,
) -> Result<ListSuspendedPage, EngineError> {
    const OPERATION: &str = "sqlite.list_suspended";
    unavailable(OPERATION)
}
/// Stub for the SQLite backend: partition, cursor and limit are ignored and
/// no query is issued. Returns whatever `unavailable` produces for the
/// operation label `"sqlite.list_executions"`.
#[cfg(feature = "core")]
async fn list_executions(
    &self,
    _partition: PartitionKey,
    _cursor: Option<ExecutionId>,
    _limit: usize,
) -> Result<ListExecutionsPage, EngineError> {
    const OPERATION: &str = "sqlite.list_executions";
    unavailable(OPERATION)
}
/// Delivers a signal by running `crate::suspend_ops::deliver_signal_impl`
/// under `retry_serializable`.
///
/// `args` is cloned on every invocation of the attempt closure, so each
/// attempt works on its own copy.
#[cfg(feature = "core")]
async fn deliver_signal(
    &self,
    args: DeliverSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
    let db = &self.inner.pool;
    let bus = &self.inner.pubsub;
    let attempt = || crate::suspend_ops::deliver_signal_impl(db, bus, args.clone());
    retry_serializable(attempt).await
}
/// Claims a resumed execution by running
/// `crate::suspend_ops::claim_resumed_execution_impl` under
/// `retry_serializable`.
///
/// `args` is cloned on every invocation of the attempt closure.
#[cfg(feature = "core")]
async fn claim_resumed_execution(
    &self,
    args: ClaimResumedExecutionArgs,
) -> Result<ClaimResumedExecutionResult, EngineError> {
    let db = &self.inner.pool;
    let bus = &self.inner.pubsub;
    let attempt = || crate::suspend_ops::claim_resumed_execution_impl(db, bus, args.clone());
    retry_serializable(attempt).await
}
/// Cancels flow `id` under `policy` by running `cancel_flow_impl` under
/// `retry_serializable`.
///
/// The `_wait` argument is accepted for interface compatibility but is not
/// consulted by this implementation.
async fn cancel_flow(
    &self,
    id: &FlowId,
    policy: CancelFlowPolicy,
    _wait: CancelFlowWait,
) -> Result<CancelFlowResult, EngineError> {
    let db = &self.inner.pool;
    let bus = &self.inner.pubsub;
    let attempt = || cancel_flow_impl(db, bus, id, policy);
    retry_serializable(attempt).await
}
/// Stub for the SQLite backend: all arguments are ignored and nothing is
/// written. Returns whatever `unavailable` produces for the operation label
/// `"sqlite.set_edge_group_policy"`.
#[cfg(feature = "core")]
async fn set_edge_group_policy(
    &self,
    _flow_id: &FlowId,
    _downstream_execution_id: &ExecutionId,
    _policy: EdgeDependencyPolicy,
) -> Result<SetEdgeGroupPolicyResult, EngineError> {
    const OPERATION: &str = "sqlite.set_edge_group_policy";
    unavailable(OPERATION)
}
/// Reports usage `dimensions` against `budget` by delegating to
/// `crate::budget::report_usage_impl`.
///
/// The `_handle` argument is not consulted by this implementation.
async fn report_usage(
    &self,
    _handle: &Handle,
    budget: &BudgetId,
    dimensions: UsageDimensions,
) -> Result<ReportUsageResult, EngineError> {
    let pool = &self.inner.pool;
    crate::budget::report_usage_impl(pool, budget, dimensions).await
}
/// Creates a budget by delegating to `crate::budget::create_budget_impl`
/// on this backend's pool.
#[cfg(feature = "core")]
async fn create_budget(
    &self,
    args: CreateBudgetArgs,
) -> Result<CreateBudgetResult, EngineError> {
    let pool = &self.inner.pool;
    crate::budget::create_budget_impl(pool, args).await
}
/// Resets a budget by delegating to `crate::budget::reset_budget_impl`
/// on this backend's pool.
#[cfg(feature = "core")]
async fn reset_budget(
    &self,
    args: ResetBudgetArgs,
) -> Result<ResetBudgetResult, EngineError> {
    let pool = &self.inner.pool;
    crate::budget::reset_budget_impl(pool, args).await
}
/// Creates a quota policy by delegating to
/// `crate::budget::create_quota_policy_impl` on this backend's pool.
#[cfg(feature = "core")]
async fn create_quota_policy(
    &self,
    args: CreateQuotaPolicyArgs,
) -> Result<CreateQuotaPolicyResult, EngineError> {
    let pool = &self.inner.pool;
    crate::budget::create_quota_policy_impl(pool, args).await
}
/// Fetches the status of budget `id` by delegating to
/// `crate::budget::get_budget_status_impl` on this backend's pool.
#[cfg(feature = "core")]
async fn get_budget_status(
    &self,
    id: &BudgetId,
) -> Result<BudgetStatus, EngineError> {
    let pool = &self.inner.pool;
    crate::budget::get_budget_status_impl(pool, id).await
}
/// Admin variant of usage reporting: delegates to
/// `crate::budget::report_usage_admin_impl` with the budget id and `args`.
#[cfg(feature = "core")]
async fn report_usage_admin(
    &self,
    budget_id: &BudgetId,
    args: ReportUsageAdminArgs,
) -> Result<ReportUsageResult, EngineError> {
    let pool = &self.inner.pool;
    crate::budget::report_usage_admin_impl(pool, budget_id, args).await
}
/// Records spend by running `crate::typed_ops::record_spend` under
/// `retry_serializable`.
///
/// `args` is cloned on every invocation of the attempt closure.
#[cfg(feature = "core")]
async fn record_spend(
    &self,
    args: ff_core::contracts::RecordSpendArgs,
) -> Result<ReportUsageResult, EngineError> {
    let db = &self.inner.pool;
    let attempt = || crate::typed_ops::record_spend(db, args.clone());
    retry_serializable(attempt).await
}
/// Releases a budget by running `crate::typed_ops::release_budget` under
/// `retry_serializable`.
///
/// `args` is cloned on every invocation of the attempt closure.
#[cfg(feature = "core")]
async fn release_budget(
    &self,
    args: ff_core::contracts::ReleaseBudgetArgs,
) -> Result<(), EngineError> {
    let db = &self.inner.pool;
    let attempt = || crate::typed_ops::release_budget(db, args.clone());
    retry_serializable(attempt).await
}
/// Releases an admission by running `crate::typed_ops::release_admission`
/// under `retry_serializable`.
///
/// `args` is cloned on every invocation of the attempt closure.
#[cfg(feature = "core")]
async fn release_admission(
    &self,
    args: ff_core::contracts::ReleaseAdmissionArgs,
) -> Result<ff_core::contracts::ReleaseAdmissionResult, EngineError> {
    let db = &self.inner.pool;
    let attempt = || crate::typed_ops::release_admission(db, args.clone());
    retry_serializable(attempt).await
}
/// Reads the limits of a quota policy by delegating to
/// `crate::typed_ops::read_quota_policy_limits`; the call is made once,
/// without a retry wrapper.
#[cfg(feature = "core")]
async fn read_quota_policy_limits(
    &self,
    quota_policy_id: &ff_core::types::QuotaPolicyId,
) -> Result<Option<ff_core::contracts::QuotaPolicyLimits>, EngineError> {
    crate::typed_ops::read_quota_policy_limits(&self.inner.pool, quota_policy_id).await
}
/// Delivers an approval signal by running
/// `crate::typed_ops::deliver_approval_signal` under `retry_serializable`.
///
/// `args` is cloned on every invocation of the attempt closure.
#[cfg(feature = "core")]
async fn deliver_approval_signal(
    &self,
    args: ff_core::contracts::DeliverApprovalSignalArgs,
) -> Result<ff_core::contracts::DeliverSignalResult, EngineError> {
    let db = &self.inner.pool;
    let bus = &self.inner.pubsub;
    let attempt = || crate::typed_ops::deliver_approval_signal(db, bus, args.clone());
    retry_serializable(attempt).await
}
/// Issues a grant and claims it by running
/// `crate::typed_ops::issue_grant_and_claim` under `retry_serializable`.
///
/// `args` is cloned on every invocation of the attempt closure.
#[cfg(feature = "core")]
async fn issue_grant_and_claim(
    &self,
    args: ff_core::contracts::IssueGrantAndClaimArgs,
) -> Result<ff_core::contracts::ClaimGrantOutcome, EngineError> {
    let db = &self.inner.pool;
    let bus = &self.inner.pubsub;
    let attempt = || crate::typed_ops::issue_grant_and_claim(db, bus, args.clone());
    retry_serializable(attempt).await
}
/// Reads stream frames of one attempt between the `from` and `to` cursors.
///
/// All arguments, including `count_limit`, are forwarded unchanged to
/// `read_stream_impl`.
#[cfg(feature = "streaming")]
async fn read_stream(
    &self,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    from: StreamCursor,
    to: StreamCursor,
    count_limit: u64,
) -> Result<StreamFrames, EngineError> {
    read_stream_impl(
        &self.inner.pool,
        execution_id,
        attempt_index,
        from,
        to,
        count_limit,
    )
    .await
}
/// Tails the stream of one attempt after the `after` cursor.
///
/// All arguments (`block_ms`, `count_limit`, `visibility`, ...) are
/// forwarded unchanged to `tail_stream_impl`, together with this backend's
/// pool and pubsub handle.
#[cfg(feature = "streaming")]
async fn tail_stream(
    &self,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    after: StreamCursor,
    block_ms: u64,
    count_limit: u64,
    visibility: TailVisibility,
) -> Result<StreamFrames, EngineError> {
    let inner = &self.inner;
    tail_stream_impl(
        &inner.pool,
        &inner.pubsub,
        execution_id,
        attempt_index,
        after,
        block_ms,
        count_limit,
        visibility,
    )
    .await
}
/// Reads the summary document of one attempt by delegating to
/// `read_summary_impl` on this backend's pool.
#[cfg(feature = "streaming")]
async fn read_summary(
    &self,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
) -> Result<Option<SummaryDocument>, EngineError> {
    read_summary_impl(&self.inner.pool, execution_id, attempt_index).await
}
/// Creates an execution by running `create_execution_impl` under
/// `retry_serializable`; every attempt borrows the same `args`.
#[cfg(feature = "core")]
async fn create_execution(
    &self,
    args: CreateExecutionArgs,
) -> Result<CreateExecutionResult, EngineError> {
    let db = &self.inner.pool;
    let attempt = || create_execution_impl(db, &args);
    retry_serializable(attempt).await
}
/// Creates a flow by running `create_flow_impl` under `retry_serializable`;
/// every attempt borrows the same `args`.
#[cfg(feature = "core")]
async fn create_flow(&self, args: CreateFlowArgs) -> Result<CreateFlowResult, EngineError> {
    let db = &self.inner.pool;
    let attempt = || create_flow_impl(db, &args);
    retry_serializable(attempt).await
}
/// Adds an execution to a flow by running `add_execution_to_flow_impl`
/// under `retry_serializable`; every attempt borrows the same `args`.
#[cfg(feature = "core")]
async fn add_execution_to_flow(
    &self,
    args: AddExecutionToFlowArgs,
) -> Result<AddExecutionToFlowResult, EngineError> {
    let db = &self.inner.pool;
    let attempt = || add_execution_to_flow_impl(db, &args);
    retry_serializable(attempt).await
}
/// Stages a dependency edge by running `stage_dependency_edge_impl` under
/// `retry_serializable`; every attempt borrows the same `args`.
#[cfg(feature = "core")]
async fn stage_dependency_edge(
    &self,
    args: StageDependencyEdgeArgs,
) -> Result<StageDependencyEdgeResult, EngineError> {
    let db = &self.inner.pool;
    let attempt = || stage_dependency_edge_impl(db, &args);
    retry_serializable(attempt).await
}
/// Applies a dependency to a child execution by running
/// `apply_dependency_to_child_impl` under `retry_serializable`; every
/// attempt borrows the same `args`.
#[cfg(feature = "core")]
async fn apply_dependency_to_child(
    &self,
    args: ApplyDependencyToChildArgs,
) -> Result<ApplyDependencyToChildResult, EngineError> {
    let db = &self.inner.pool;
    let attempt = || apply_dependency_to_child_impl(db, &args);
    retry_serializable(attempt).await
}
/// Cancels an execution by delegating to
/// `crate::operator::cancel_execution_impl` with this backend's pool and
/// pubsub handle. The call is made once, without a retry wrapper here.
#[cfg(feature = "core")]
async fn cancel_execution(
    &self,
    args: CancelExecutionArgs,
) -> Result<CancelExecutionResult, EngineError> {
    let inner = &self.inner;
    crate::operator::cancel_execution_impl(&inner.pool, &inner.pubsub, args).await
}
/// Revokes a lease by delegating to `crate::operator::revoke_lease_impl`
/// with this backend's pool and pubsub handle.
#[cfg(feature = "core")]
async fn revoke_lease(
    &self,
    args: RevokeLeaseArgs,
) -> Result<RevokeLeaseResult, EngineError> {
    let inner = &self.inner;
    crate::operator::revoke_lease_impl(&inner.pool, &inner.pubsub, args).await
}
/// Changes an execution's priority by delegating to
/// `crate::operator::change_priority_impl` with this backend's pool and
/// pubsub handle.
#[cfg(feature = "core")]
async fn change_priority(
    &self,
    args: ChangePriorityArgs,
) -> Result<ChangePriorityResult, EngineError> {
    let inner = &self.inner;
    crate::operator::change_priority_impl(&inner.pool, &inner.pubsub, args).await
}
/// Replays an execution by delegating to
/// `crate::operator::replay_execution_impl` with this backend's pool and
/// pubsub handle.
#[cfg(feature = "core")]
async fn replay_execution(
    &self,
    args: ReplayExecutionArgs,
) -> Result<ReplayExecutionResult, EngineError> {
    let inner = &self.inner;
    crate::operator::replay_execution_impl(&inner.pool, &inner.pubsub, args).await
}
/// Reads the public state of execution `id` by delegating to
/// `crate::reads::read_execution_state_impl`.
#[cfg(feature = "core")]
async fn read_execution_state(
    &self,
    id: &ExecutionId,
) -> Result<Option<PublicState>, EngineError> {
    let pool = &self.inner.pool;
    crate::reads::read_execution_state_impl(pool, id).await
}
/// Reads the info record of execution `id` by delegating to
/// `crate::reads::read_execution_info_impl`.
#[cfg(feature = "core")]
async fn read_execution_info(
    &self,
    id: &ExecutionId,
) -> Result<Option<ExecutionInfo>, EngineError> {
    let pool = &self.inner.pool;
    crate::reads::read_execution_info_impl(pool, id).await
}
/// Fetches the result bytes of execution `id`, if any, by delegating to
/// `crate::reads::get_execution_result_impl`.
async fn get_execution_result(
    &self,
    id: &ExecutionId,
) -> Result<Option<Vec<u8>>, EngineError> {
    let pool = &self.inner.pool;
    crate::reads::get_execution_result_impl(pool, id).await
}
/// Runs the header step of a flow cancellation by delegating to
/// `crate::operator::cancel_flow_header_impl` with this backend's pool and
/// pubsub handle.
#[cfg(feature = "core")]
async fn cancel_flow_header(
    &self,
    args: CancelFlowArgs,
) -> Result<CancelFlowHeader, EngineError> {
    let inner = &self.inner;
    crate::operator::cancel_flow_header_impl(&inner.pool, &inner.pubsub, args).await
}
/// Acknowledges the cancellation of one flow member.
///
/// `crate::operator::ack_cancel_member_impl` takes the ids by value, so
/// both are cloned before the call.
#[cfg(feature = "core")]
async fn ack_cancel_member(
    &self,
    flow_id: &FlowId,
    execution_id: &ExecutionId,
) -> Result<(), EngineError> {
    let owned_flow = flow_id.clone();
    let owned_member = execution_id.clone();
    crate::operator::ack_cancel_member_impl(&self.inner.pool, owned_flow, owned_member).await
}
/// Lists pending waitpoints by delegating to
/// `crate::suspend_ops::list_pending_waitpoints_impl`.
#[cfg(feature = "core")]
async fn list_pending_waitpoints(
    &self,
    args: ListPendingWaitpointsArgs,
) -> Result<ListPendingWaitpointsResult, EngineError> {
    let pool = &self.inner.pool;
    crate::suspend_ops::list_pending_waitpoints_impl(pool, args).await
}
/// Opens a completion subscription starting at `cursor`.
///
/// Hands `crate::completion_subscribe::subscribe` a clone of the pool, a
/// fresh receiver obtained from the `completion` pubsub channel, and a
/// clone of `filter`.
async fn subscribe_completion(
    &self,
    cursor: ff_core::stream_subscribe::StreamCursor,
    filter: &ff_core::backend::ScannerFilter,
) -> Result<ff_core::stream_events::CompletionSubscription, EngineError> {
    let inner = &self.inner;
    crate::completion_subscribe::subscribe(
        inner.pool.clone(),
        inner.pubsub.completion.subscribe(),
        cursor,
        filter.clone(),
    )
    .await
}
/// Opens a lease-history subscription starting at `cursor`.
///
/// Hands `crate::lease_event_subscribe::subscribe` a clone of the pool, a
/// fresh receiver obtained from the `lease_history` pubsub channel, and a
/// clone of `filter`.
async fn subscribe_lease_history(
    &self,
    cursor: ff_core::stream_subscribe::StreamCursor,
    filter: &ff_core::backend::ScannerFilter,
) -> Result<ff_core::stream_events::LeaseHistorySubscription, EngineError> {
    let inner = &self.inner;
    crate::lease_event_subscribe::subscribe(
        inner.pool.clone(),
        inner.pubsub.lease_history.subscribe(),
        cursor,
        filter.clone(),
    )
    .await
}
/// Opens a signal-delivery subscription starting at `cursor`.
///
/// Hands `crate::signal_delivery_subscribe::subscribe` a clone of the pool,
/// a fresh receiver obtained from the `signal_delivery` pubsub channel, and
/// a clone of `filter`.
async fn subscribe_signal_delivery(
    &self,
    cursor: ff_core::stream_subscribe::StreamCursor,
    filter: &ff_core::backend::ScannerFilter,
) -> Result<ff_core::stream_events::SignalDeliverySubscription, EngineError> {
    let inner = &self.inner;
    crate::signal_delivery_subscribe::subscribe(
        inner.pool.clone(),
        inner.pubsub.signal_delivery.subscribe(),
        cursor,
        filter.clone(),
    )
    .await
}
/// Seeds the waitpoint HMAC secret by running
/// `crate::suspend_ops::seed_waitpoint_hmac_secret_impl` under
/// `retry_serializable`.
///
/// `args` is cloned on every invocation of the attempt closure.
async fn seed_waitpoint_hmac_secret(
    &self,
    args: SeedWaitpointHmacSecretArgs,
) -> Result<SeedOutcome, EngineError> {
    let db = &self.inner.pool;
    let attempt = || crate::suspend_ops::seed_waitpoint_hmac_secret_impl(db, args.clone());
    retry_serializable(attempt).await
}
/// Rotates the waitpoint HMAC secret by running
/// `crate::suspend_ops::rotate_waitpoint_hmac_secret_all_impl` under
/// `retry_serializable`.
///
/// `args` is cloned on every invocation of the attempt closure.
async fn rotate_waitpoint_hmac_secret_all(
    &self,
    args: RotateWaitpointHmacSecretAllArgs,
) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
    let db = &self.inner.pool;
    let attempt =
        || crate::suspend_ops::rotate_waitpoint_hmac_secret_all_impl(db, args.clone());
    retry_serializable(attempt).await
}
/// Static label identifying this backend: always `"sqlite"`.
fn backend_label(&self) -> &'static str {
    const LABEL: &str = "sqlite";
    LABEL
}
/// Describes this backend: identity `"sqlite"`, the crate's own version, the
/// stage string `"Phase-4"`, and the support set from `sqlite_supports_base()`.
///
/// The version components come from Cargo's compile-time
/// `CARGO_PKG_VERSION_*` variables; a component that fails to parse falls
/// back to `0`.
fn capabilities(&self) -> Capabilities {
    let major = env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap_or(0);
    let minor = env!("CARGO_PKG_VERSION_MINOR").parse().unwrap_or(0);
    let patch = env!("CARGO_PKG_VERSION_PATCH").parse().unwrap_or(0);
    let identity = BackendIdentity::new("sqlite", Version::new(major, minor, patch), "Phase-4");
    Capabilities::new(identity, sqlite_supports_base())
}
/// Preparation hook: this implementation does no work and always reports
/// `PrepareOutcome::NoOp`.
async fn prepare(&self) -> Result<PrepareOutcome, EngineError> {
    let outcome = PrepareOutcome::NoOp;
    Ok(outcome)
}
/// Reads a selection of `ff_exec_core` columns for one execution, each
/// rendered as text.
///
/// Every requested name in `fields` is mapped to a SQL projection:
/// * plain columns are selected as `CAST(<column> AS TEXT)`;
/// * `current_attempt_index`, `completed_at` and `cancel_reason` are aliases
///   for `attempt_index`, `terminal_at_ms` and `cancellation_reason`;
/// * `required_capabilities` is a comma-joined sub-select over
///   `ff_execution_capabilities`;
/// * `current_waitpoint_id`, `current_worker_instance_id`, `budget_ids` and
///   `quota_policy_id` are extracted from the `raw_fields` JSON;
/// * any other name projects SQL `NULL`.
///
/// Only names from this fixed allow-list are interpolated into the SQL text,
/// so caller-supplied strings never reach the query verbatim.
///
/// # Returns
/// A map keyed by the requested names. An empty `fields` slice yields an
/// empty map without touching the database. When no row matches, every
/// requested name maps to `None`.
///
/// # Errors
/// * whatever `split_exec_id` returns for a malformed execution id;
/// * `EngineError::Validation` (`InvalidInput`) when the partition embedded
///   in `execution_id` differs from `partition.index`, including when it
///   does not fit in `u16` at all;
/// * `EngineError::Transport` when the query or a column decode fails.
async fn read_exec_core_fields(
    &self,
    partition: ff_core::partition::Partition,
    execution_id: &ff_core::types::ExecutionId,
    fields: &[&str],
) -> Result<std::collections::HashMap<String, Option<String>>, EngineError> {
    if fields.is_empty() {
        return Ok(std::collections::HashMap::new());
    }
    let (part, exec_uuid) = split_exec_id(execution_id)?;
    // Checked conversion: a plain `as u16` would wrap an out-of-range
    // partition and could make a mismatching id look like a match.
    let partition_matches = matches!(u16::try_from(part), Ok(p) if p == partition.index);
    if !partition_matches {
        return Err(EngineError::Validation {
            kind: ff_core::engine_error::ValidationKind::InvalidInput,
            detail: format!(
                "read_exec_core_fields: partition mismatch (arg={}, eid={})",
                partition.index, part
            ),
        });
    }
    // One projection per requested field, in request order, so that column
    // index `i` of the result row corresponds to `fields[i]`.
    let projection_sql = fields
        .iter()
        .map(|field| match *field {
            "lane_id" | "lifecycle_phase" | "ownership_state" | "eligibility_state"
            | "public_state" | "attempt_state" | "blocking_reason" | "cancellation_reason"
            | "cancelled_by" | "attempt_index" | "flow_id" | "priority" | "created_at_ms"
            | "terminal_at_ms" | "deadline_at_ms" => format!("CAST({field} AS TEXT)"),
            "current_attempt_index" => "CAST(attempt_index AS TEXT)".to_string(),
            "completed_at" => "CAST(terminal_at_ms AS TEXT)".to_string(),
            "cancel_reason" => "CAST(cancellation_reason AS TEXT)".to_string(),
            "required_capabilities" => "(SELECT group_concat(capability, ',') \
                 FROM ff_execution_capabilities \
                 WHERE execution_id = ff_exec_core.execution_id)"
                .to_string(),
            "current_waitpoint_id"
            | "current_worker_instance_id"
            | "budget_ids"
            | "quota_policy_id" => format!("json_extract(raw_fields, '$.{field}')"),
            // Unknown names are not an error; they read back as NULL / `None`.
            _ => "NULL".to_string(),
        })
        .collect::<Vec<String>>()
        .join(", ");
    let query = format!(
        "SELECT {projection_sql} FROM ff_exec_core \
         WHERE partition_key = ?1 AND execution_id = ?2"
    );
    let row_opt = sqlx::query(&query)
        .bind(part)
        .bind(exec_uuid)
        .fetch_optional(self.pool())
        .await
        .map_err(|e| EngineError::Transport {
            backend: "sqlite",
            source: format!("read_exec_core_fields: {e}").into(),
        })?;
    let mut out = std::collections::HashMap::with_capacity(fields.len());
    match row_opt {
        Some(row) => {
            use sqlx::Row;
            for (idx, field) in fields.iter().enumerate() {
                let val: Option<String> =
                    row.try_get(idx).map_err(|e| EngineError::Transport {
                        backend: "sqlite",
                        source: format!("read_exec_core_fields[{field}]: {e}").into(),
                    })?;
                out.insert((*field).to_string(), val);
            }
        }
        // No such execution: report every requested field as absent.
        None => out.extend(fields.iter().map(|field| ((*field).to_string(), None))),
    }
    Ok(out)
}
/// Returns the database's current time as milliseconds since the Unix epoch.
///
/// The value is computed inside SQLite from `julianday('now')` (2440587.5 is
/// the Julian day number of the Unix epoch, 86400000 the milliseconds per
/// day) and truncated to an integer by the SQL `CAST`.
///
/// # Errors
/// `EngineError::Transport` when the query fails or yields a negative value.
async fn server_time_ms(&self) -> Result<u64, EngineError> {
    const EPOCH_MS_SQL: &str =
        "SELECT CAST((julianday('now') - 2440587.5) * 86400000 AS INTEGER)";
    let epoch_ms: i64 = sqlx::query_scalar(EPOCH_MS_SQL)
        .fetch_one(self.pool())
        .await
        .map_err(|e| EngineError::Transport {
            backend: "sqlite",
            source: format!("server_time_ms: {e}").into(),
        })?;
    // `i64 -> u64` fails exactly when the value is negative.
    u64::try_from(epoch_ms).map_err(|_| EngineError::Transport {
        backend: "sqlite",
        source: "server_time_ms: negative epoch".into(),
    })
}
/// Registers a worker by delegating to
/// `crate::worker_registry::register_worker`. The tracing span is named
/// `sqlite.register_worker` and records no arguments (`skip_all`).
#[cfg(feature = "core")]
#[tracing::instrument(name = "sqlite.register_worker", skip_all)]
async fn register_worker(
    &self,
    args: ff_core::contracts::RegisterWorkerArgs,
) -> Result<ff_core::contracts::RegisterWorkerOutcome, EngineError> {
    let pool = &self.inner.pool;
    crate::worker_registry::register_worker(pool, args).await
}
/// Records a worker heartbeat by delegating to
/// `crate::worker_registry::heartbeat_worker`. The tracing span is named
/// `sqlite.heartbeat_worker` and records no arguments (`skip_all`).
#[cfg(feature = "core")]
#[tracing::instrument(name = "sqlite.heartbeat_worker", skip_all)]
async fn heartbeat_worker(
    &self,
    args: ff_core::contracts::HeartbeatWorkerArgs,
) -> Result<ff_core::contracts::HeartbeatWorkerOutcome, EngineError> {
    let pool = &self.inner.pool;
    crate::worker_registry::heartbeat_worker(pool, args).await
}
/// Marks a worker as dead by delegating to
/// `crate::worker_registry::mark_worker_dead`. The tracing span is named
/// `sqlite.mark_worker_dead` and records no arguments (`skip_all`).
#[cfg(feature = "core")]
#[tracing::instrument(name = "sqlite.mark_worker_dead", skip_all)]
async fn mark_worker_dead(
    &self,
    args: ff_core::contracts::MarkWorkerDeadArgs,
) -> Result<ff_core::contracts::MarkWorkerDeadOutcome, EngineError> {
    let pool = &self.inner.pool;
    crate::worker_registry::mark_worker_dead(pool, args).await
}
/// Lists expired leases by delegating to
/// `crate::worker_registry::list_expired_leases`. Compiled only when both
/// the `core` and `suspension` features are enabled; the tracing span is
/// named `sqlite.list_expired_leases` and records no arguments.
#[cfg(all(feature = "core", feature = "suspension"))]
#[tracing::instrument(name = "sqlite.list_expired_leases", skip_all)]
async fn list_expired_leases(
    &self,
    args: ff_core::contracts::ListExpiredLeasesArgs,
) -> Result<ff_core::contracts::ListExpiredLeasesResult, EngineError> {
    let pool = &self.inner.pool;
    crate::worker_registry::list_expired_leases(pool, args).await
}
/// Lists registered workers by delegating to
/// `crate::worker_registry::list_workers`. The tracing span is named
/// `sqlite.list_workers` and records no arguments (`skip_all`).
#[cfg(feature = "core")]
#[tracing::instrument(name = "sqlite.list_workers", skip_all)]
async fn list_workers(
    &self,
    args: ff_core::contracts::ListWorkersArgs,
) -> Result<ff_core::contracts::ListWorkersResult, EngineError> {
    let pool = &self.inner.pool;
    crate::worker_registry::list_workers(pool, args).await
}
}
#[cfg(test)]
mod tests {
    use super::is_memory_uri;

    /// Covers the accepted in-memory spellings and a set of on-disk URIs
    /// that must not be classified as in-memory.
    #[test]
    fn is_memory_detects_all_uri_forms() {
        let in_memory = [
            ":memory:",
            "file::memory:",
            "file::memory:?cache=shared",
            "file:ff-test-abc123?mode=memory&cache=shared",
            "file:ff-test-00000000-0000-0000-0000-000000000000?mode=memory&cache=shared",
            "file:ff-test?cache=shared&mode=memory",
        ];
        for uri in in_memory {
            assert!(is_memory_uri(uri), "expected in-memory: {uri}");
        }

        let on_disk = [
            "/tmp/ff.sqlite",
            "./ff.sqlite",
            "file:/tmp/ff.sqlite",
            "file:ff-test?cache=shared",
            "file:my_mode=memory_db.sqlite",
        ];
        for uri in on_disk {
            assert!(!is_memory_uri(uri), "expected on-disk: {uri}");
        }
    }

    /// `mode=memory` only counts as a complete query parameter, not as a
    /// substring of the file name or of a longer parameter value.
    #[test]
    fn is_memory_uri_rejects_filename_with_mode_memory() {
        for uri in ["file:my_mode=memory_db.sqlite", "file:foo?mode=memory_extra"] {
            assert!(!is_memory_uri(uri), "expected on-disk: {uri}");
        }
    }

    /// A bare `?mode=memory` query parameter is enough.
    #[test]
    fn is_memory_uri_accepts_query_param() {
        let uri = "file:test?mode=memory";
        assert!(is_memory_uri(uri));
    }

    /// The shared-cache form with a UUID-style database name is accepted.
    #[test]
    fn is_memory_uri_accepts_shared_cache_form() {
        let uri = "file:ff-test-00000000-0000-0000-0000-000000000000?mode=memory&cache=shared";
        assert!(is_memory_uri(uri));
    }

    /// A plain `file:` URI pointing at a path is not in-memory.
    #[test]
    fn is_memory_uri_rejects_plain_file() {
        let uri = "file:./data.db";
        assert!(!is_memory_uri(uri));
    }
}