Skip to main content

ff_backend_sqlite/
backend.rs

1//! `SqliteBackend` — RFC-023 dev-only SQLite [`EngineBackend`] impl.
2//!
3//! Phase 1a lands the scaffolding: construction guard, registry
4//! dedup, pool setup, WARN banner, and Unavailable stubs for every
5//! required trait method. Phase 2+ progressively replaces the stubs
6//! with real bodies paralleling `ff-backend-postgres`.
7
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use async_trait::async_trait;
13use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
14use sqlx::{Row, SqlitePool};
15use uuid::Uuid;
16
17use ff_core::backend::PrepareOutcome;
18use ff_core::backend::{
19    AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy, FailOutcome,
20    FailureClass, FailureReason, Frame, FrameKind, Handle, HandleKind, LeaseRenewal, PatchKind,
21    PendingWaitpoint, ResumeToken, ResumeSignal, SUMMARY_NULL_SENTINEL, StreamMode,
22    UsageDimensions,
23};
24#[cfg(feature = "streaming")]
25use ff_core::backend::{SummaryDocument, TailVisibility};
26use ff_core::capability::{BackendIdentity, Capabilities, Supports, Version};
27use ff_core::caps::{CapabilityRequirement, matches as caps_matches};
28use ff_core::contracts::{
29    BudgetStatus, CancelFlowResult, CreateBudgetArgs, CreateBudgetResult, CreateQuotaPolicyArgs,
30    CreateQuotaPolicyResult, ExecutionContext, ExecutionSnapshot, FlowSnapshot, IssueReclaimGrantArgs,
31    IssueReclaimGrantOutcome, ReclaimExecutionArgs, ReclaimExecutionOutcome, ReportUsageAdminArgs,
32    ReportUsageResult, ResetBudgetArgs, ResetBudgetResult, RotateWaitpointHmacSecretAllArgs,
33    RotateWaitpointHmacSecretAllResult, SeedOutcome, SeedWaitpointHmacSecretArgs, SuspendArgs,
34    SuspendOutcome,
35};
36#[cfg(feature = "core")]
37use ff_core::contracts::{
38    ClaimResumedExecutionArgs, ClaimResumedExecutionResult, DeliverSignalArgs, DeliverSignalResult,
39    EdgeDependencyPolicy, EdgeDirection, EdgeSnapshot, ListExecutionsPage, ListFlowsPage,
40    ListLanesPage, ListSuspendedPage, SetEdgeGroupPolicyResult,
41};
42#[cfg(feature = "streaming")]
43use ff_core::contracts::{STREAM_READ_HARD_CAP, StreamCursor, StreamFrame, StreamFrames};
44use ff_core::engine_backend::EngineBackend;
45use ff_core::engine_error::{BackendError, ContentionKind, EngineError, ValidationKind};
46use ff_core::handle_codec::HandlePayload;
47use ff_core::types::{AttemptId, AttemptIndex, LeaseEpoch, LeaseFence, LeaseId};
48
49use crate::errors::map_sqlx_error;
50use crate::handle_codec::{decode_handle, encode_handle};
51use crate::queries::{
52    attempt as q_attempt, dispatch as q_dispatch, exec_core as q_exec, flow as q_flow,
53    flow_staging as q_flow_staging, lease as q_lease, stream as q_stream,
54};
55use crate::retry::retry_serializable;
56#[cfg(feature = "core")]
57use ff_core::partition::PartitionKey;
58#[cfg(feature = "core")]
59use ff_core::types::EdgeId;
60use ff_core::types::{BudgetId, ExecutionId, FlowId, LaneId, TimestampMs};
61
62use crate::pubsub::{OutboxEvent, PubSub};
63use crate::registry;
64#[cfg(feature = "core")]
65use ff_core::contracts::{
66    AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
67    ApplyDependencyToChildResult, CancelExecutionArgs, CancelExecutionResult, CancelFlowArgs,
68    CancelFlowHeader, ChangePriorityArgs, ChangePriorityResult, CreateExecutionArgs,
69    CreateExecutionResult, CreateFlowArgs, CreateFlowResult, ExecutionInfo,
70    ListPendingWaitpointsArgs, ListPendingWaitpointsResult, ReplayExecutionArgs,
71    ReplayExecutionResult, RevokeLeaseArgs, RevokeLeaseResult, StageDependencyEdgeArgs,
72    StageDependencyEdgeResult,
73};
74#[cfg(feature = "core")]
75use ff_core::state::PublicState;
76use tokio::sync::broadcast;
77
78/// Phase-1a-wide `Unavailable` helper. Each stubbed method names
79/// itself here so call-site errors carry a stable identifier.
80#[inline]
81fn unavailable<T>(op: &'static str) -> Result<T, EngineError> {
82    Err(EngineError::Unavailable { op })
83}
84
85// ── Phase 2b.1: post-commit broadcast emit support ─────────────────────
86
87/// Enum selector for the 5 broadcast channels. Inner transaction bodies
88/// accumulate `(OutboxChannel, OutboxEvent)` pairs in a `Vec` and the
89/// outer wrapper dispatches them AFTER `tx.commit()` succeeds. This
90/// preserves the RFC-023 §4.2 ordering invariant: broadcast wakeup
91/// fires only for events that genuinely committed.
92#[derive(Clone, Copy, Debug)]
93pub(crate) enum OutboxChannel {
94    LeaseHistory,
95    Completion,
96    #[allow(dead_code)] // wired in Phase 2b.2 deliver_signal
97    SignalDelivery,
98    StreamFrame,
99    #[allow(dead_code)] // wired in Phase 2b.2 operator ops
100    OperatorEvent,
101}
102
103/// A pending post-commit broadcast emit. See [`OutboxChannel`].
104pub(crate) type PendingEmit = (OutboxChannel, OutboxEvent);
105
106/// Dispatch every pending emit via the appropriate broadcast channel.
107/// Called AFTER `tx.commit()` returns OK so consumers only observe
108/// wakeups for genuinely-committed events.
109fn dispatch_pending_emits(pubsub: &PubSub, emits: &[PendingEmit]) {
110    for (channel, ev) in emits {
111        let sender: &broadcast::Sender<OutboxEvent> = match channel {
112            OutboxChannel::LeaseHistory => &pubsub.lease_history,
113            OutboxChannel::Completion => &pubsub.completion,
114            OutboxChannel::SignalDelivery => &pubsub.signal_delivery,
115            OutboxChannel::StreamFrame => &pubsub.stream_frame,
116            OutboxChannel::OperatorEvent => &pubsub.operator_event,
117        };
118        PubSub::emit(sender, ev.clone());
119    }
120}
121
122/// Read `last_insert_rowid()` inside the open txn and turn it into
123/// an [`OutboxEvent`]. SQLite's AUTOINCREMENT outbox tables use the
124/// rowid alias as the `event_id`, so this read is correct for every
125/// outbox table defined under `migrations/000{1,6,7,10}_*.sql`.
126async fn last_outbox_event(
127    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
128    partition_key: i64,
129) -> Result<OutboxEvent, EngineError> {
130    let event_id: i64 = sqlx::query_scalar("SELECT last_insert_rowid()")
131        .fetch_one(&mut **conn)
132        .await
133        .map_err(map_sqlx_error)?;
134    Ok(OutboxEvent {
135        event_id,
136        partition_key,
137    })
138}
139
140// ── Phase 2a.2 helpers: hot-path shared logic ──────────────────────────
141
/// Classify a sqlite path/URI as in-memory (RFC-023 §4.6).
///
/// Matches the three on-disk-free forms the backend supports:
///   - bare `":memory:"` (rewritten internally to a shared-cache URI)
///   - `"file::memory:..."` (the short-form shared-cache URI)
///   - `"file:<name>?...mode=memory..."` (the §4.6-recommended named
///     form, e.g. `file:ff-test-<uuid>?mode=memory&cache=shared`)
///
/// For the third form the URI is decomposed the way SQLite does it:
/// everything from the first `#` on is the fragment and is discarded
/// first, then the query string is whatever follows the first `?` of
/// the remainder. The query must contain an exact `mode=memory`
/// `key=value` pair (pairs are delimited by `&`). A substring-only
/// check would mis-classify filesystem paths whose filename happens to
/// contain the substring (e.g. `file:my_mode=memory_db.sqlite`) or a
/// longer value (`?mode=memory_extra`) as in-memory. Looking for `?`
/// before dropping the fragment would likewise mis-classify
/// `file:db.sqlite#notes?mode=memory`, where the `?` belongs to the
/// fragment and the database is really on disk.
///
/// A #372 miss on the third form caused `is_memory = false` for the
/// §4.6 test-isolation URIs: WAL mode was applied inappropriately
/// and no sentinel connection was held, so pool-idle cycles dropped
/// the shared cache mid-test.
fn is_memory_uri(path: &str) -> bool {
    if path == ":memory:" || path.starts_with("file::memory:") {
        return true;
    }
    if !path.starts_with("file:") {
        return false;
    }
    // Drop the fragment BEFORE looking for the query separator: a `?`
    // that appears after `#` is fragment text, not a query string.
    let without_fragment = path.split_once('#').map_or(path, |(head, _)| head);
    let Some((_, query)) = without_fragment.split_once('?') else {
        return false;
    };
    // Exact pair match: rejects `mode=memory_extra` and `xmode=memory`.
    query.split('&').any(|kv| kv == "mode=memory")
}
182
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Same shape as the PG reference
/// (`ff-backend-postgres/src/attempt.rs:55-63`); SQLite stores the same
/// `*_ms` fields, so the values are directly comparable.
///
/// A system clock set before the epoch yields `0`; a millisecond count
/// that does not fit in `i64` saturates to `i64::MAX`.
fn now_ms() -> i64 {
    let elapsed_ms: u128 = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    };
    i64::try_from(elapsed_ms).unwrap_or(i64::MAX)
}
195
196/// Decompose an [`ff_core::types::ExecutionId`] formatted `{fp:N}:<uuid>`
197/// into `(partition_index, uuid_bytes)` — SQLite stores the UUID as a
198/// 16-byte `BLOB` (§4.1) so we bind via `uuid::Uuid`.
199pub(crate) fn split_exec_id(eid: &ff_core::types::ExecutionId) -> Result<(i64, Uuid), EngineError> {
200    let s = eid.as_str();
201    let rest = s
202        .strip_prefix("{fp:")
203        .ok_or_else(|| EngineError::Validation {
204            kind: ValidationKind::InvalidInput,
205            detail: format!("execution_id missing `{{fp:` prefix: {s}"),
206        })?;
207    let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
208        kind: ValidationKind::InvalidInput,
209        detail: format!("execution_id missing `}}:`: {s}"),
210    })?;
211    let part: i64 = rest[..close].parse().map_err(|_| EngineError::Validation {
212        kind: ValidationKind::InvalidInput,
213        detail: format!("execution_id partition index not u16: {s}"),
214    })?;
215    let uuid = Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
216        kind: ValidationKind::InvalidInput,
217        detail: format!("execution_id UUID invalid: {s}"),
218    })?;
219    Ok((part, uuid))
220}
221
222/// Acquire a pooled connection and issue `BEGIN IMMEDIATE`, escalating
223/// the txn to RESERVED so §4.1 A3's single-writer invariant holds for
224/// the full read-modify-write window.
225///
226/// The caller MUST arrange an explicit `commit()` on success and a
227/// `rollback_quiet()` on every error path. Use
228/// [`commit_or_rollback`] as the single tail-call so a `COMMIT`
229/// failure deterministically rolls back — otherwise a half-open txn
230/// could return to the pool and poison a later borrower.
231///
232/// sqlx's `Transaction` abstraction opens a plain `BEGIN` on SQLite
233/// (no `IMMEDIATE` escalation on the public API today); we manage
234/// the lock here manually and the per-op helpers in this file close
235/// the rollback loop.
236pub(crate) async fn begin_immediate(
237    pool: &SqlitePool,
238) -> Result<sqlx::pool::PoolConnection<sqlx::Sqlite>, EngineError> {
239    let mut conn = pool.acquire().await.map_err(map_sqlx_error)?;
240    sqlx::query("BEGIN IMMEDIATE")
241        .execute(&mut *conn)
242        .await
243        .map_err(map_sqlx_error)?;
244    Ok(conn)
245}
246
247/// Commit the pending txn; on `COMMIT` failure issue a best-effort
248/// `ROLLBACK` so the connection is returned to the pool in a clean
249/// state (otherwise a pool-reuse borrower observes a half-open txn).
250/// A secondary rollback error is swallowed — SQLite auto-rolls-back
251/// on connection close, which happens when the pool drops an
252/// unhealthy connection, so correctness is preserved.
253pub(crate) async fn commit_or_rollback(
254    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
255) -> Result<(), EngineError> {
256    if let Err(e) = sqlx::query("COMMIT")
257        .execute(&mut **conn)
258        .await
259        .map_err(map_sqlx_error)
260    {
261        let _ = sqlx::query("ROLLBACK").execute(&mut **conn).await;
262        return Err(e);
263    }
264    Ok(())
265}
266
267/// Best-effort rollback on an error path. A failed rollback is
268/// swallowed so the original error surfaces unchanged.
269pub(crate) async fn rollback_quiet(conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>) {
270    let _ = sqlx::query("ROLLBACK").execute(&mut **conn).await;
271}
272
273/// Fence check: under the `BEGIN IMMEDIATE` lock, read the attempt
274/// row's `lease_epoch` and compare against the handle-embedded epoch.
275/// Mismatch ⇒ [`ContentionKind::LeaseConflict`] (terminal for this
276/// call; caller does not retry a fence mismatch).
277async fn fence_check(
278    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
279    part: i64,
280    exec_uuid: Uuid,
281    attempt_index: i64,
282    expected_epoch: u64,
283) -> Result<(), EngineError> {
284    let row = sqlx::query(q_attempt::SELECT_ATTEMPT_EPOCH_SQL)
285        .bind(part)
286        .bind(exec_uuid)
287        .bind(attempt_index)
288        .fetch_optional(&mut **conn)
289        .await
290        .map_err(map_sqlx_error)?;
291    let Some(row) = row else {
292        return Err(EngineError::NotFound { entity: "attempt" });
293    };
294    let epoch_i: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
295    let observed = u64::try_from(epoch_i).unwrap_or(0);
296    if observed != expected_epoch {
297        return Err(EngineError::Contention(ContentionKind::LeaseConflict));
298    }
299    Ok(())
300}
301
302// ── Phase 2a.2 hot-path bodies ─────────────────────────────────────────
303
304async fn claim_impl(
305    pool: &SqlitePool,
306    pubsub: &PubSub,
307    lane: &ff_core::types::LaneId,
308    capabilities: &CapabilitySet,
309    policy: &ClaimPolicy,
310) -> Result<Option<Handle>, EngineError> {
311    // RFC-023 §4.1 A3: SQLite is single-writer with
312    // `num_flow_partitions = 1`, so we scan only partition 0 rather
313    // than iterating 0..256 as the PG path does.
314    let part: i64 = 0;
315
316    let mut conn = begin_immediate(pool).await?;
317    let result = claim_inner(&mut conn, part, lane, capabilities, policy).await;
318    match result {
319        Ok(Some((handle, emits))) => {
320            commit_or_rollback(&mut conn).await?;
321            dispatch_pending_emits(pubsub, &emits);
322            Ok(Some(handle))
323        }
324        Ok(None) => {
325            rollback_quiet(&mut conn).await;
326            Ok(None)
327        }
328        Err(e) => {
329            rollback_quiet(&mut conn).await;
330            Err(e)
331        }
332    }
333}
334
335/// Inside-txn body of [`claim_impl`] — any `?` short-circuit surfaces
336/// to the caller which guarantees `rollback_quiet` via
337/// [`claim_impl`]'s match arms.
338async fn claim_inner(
339    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
340    part: i64,
341    lane: &ff_core::types::LaneId,
342    capabilities: &CapabilitySet,
343    policy: &ClaimPolicy,
344) -> Result<Option<(Handle, Vec<PendingEmit>)>, EngineError> {
345    // Scan up to CAP_SCAN_BATCH eligible rows in priority order and
346    // walk until we find the first capability-satisfying one. Under
347    // §4.1 A3 SQLite runs on a single partition, so a high-priority
348    // row whose required caps the worker lacks would starve
349    // downstream-priority matches if we only inspected the top
350    // candidate (caught in PR-375 review). Bounded scan budget keeps
351    // the lock window predictable.
352    const CAP_SCAN_BATCH: i64 = 16;
353
354    let candidate_rows = sqlx::query(q_attempt::SELECT_ELIGIBLE_EXEC_SQL)
355        .bind(part)
356        .bind(lane.as_str())
357        .bind(CAP_SCAN_BATCH)
358        .fetch_all(&mut **conn)
359        .await
360        .map_err(map_sqlx_error)?;
361
362    if candidate_rows.is_empty() {
363        return Ok(None);
364    }
365
366    let mut claimable: Option<(Uuid, i64)> = None;
367    for row in &candidate_rows {
368        let exec_uuid: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
369        let attempt_index_i: i64 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
370
371        // Capability subset check (§4.1 A4): junction table read +
372        // Rust-side `caps::matches`. Same post-lock Rust match as the
373        // PG path at `ff-backend-postgres/src/attempt.rs:170-182`.
374        let cap_rows = sqlx::query(q_attempt::SELECT_EXEC_CAPABILITIES_SQL)
375            .bind(exec_uuid)
376            .fetch_all(&mut **conn)
377            .await
378            .map_err(map_sqlx_error)?;
379        let tokens: Vec<String> = cap_rows
380            .iter()
381            .map(|r| r.try_get::<String, _>("capability"))
382            .collect::<Result<Vec<_>, _>>()
383            .map_err(map_sqlx_error)?;
384        let req = CapabilityRequirement::new(tokens);
385        if caps_matches(&req, capabilities) {
386            claimable = Some((exec_uuid, attempt_index_i));
387            break;
388        }
389    }
390
391    let Some((exec_uuid, attempt_index_i)) = claimable else {
392        // Every candidate in the batch required a capability the
393        // worker lacks; surface `None` so the caller's retry cadence
394        // re-enters later. A different-caps worker takes the batch
395        // when it claims.
396        return Ok(None);
397    };
398
399    let now = now_ms();
400    let lease_ttl_ms = i64::from(policy.lease_ttl_ms);
401    let expires = now.saturating_add(lease_ttl_ms);
402
403    // UPSERT the attempt row. `RETURNING lease_epoch` round-trips
404    // the post-UPSERT epoch in one statement (SQLite >= 3.35).
405    let epoch_row = sqlx::query(q_attempt::UPSERT_ATTEMPT_ON_CLAIM_SQL)
406        .bind(part)
407        .bind(exec_uuid)
408        .bind(attempt_index_i)
409        .bind(policy.worker_id.as_str())
410        .bind(policy.worker_instance_id.as_str())
411        .bind(expires)
412        .bind(now)
413        .fetch_one(&mut **conn)
414        .await
415        .map_err(map_sqlx_error)?;
416    let epoch_i: i64 = epoch_row.try_get("lease_epoch").map_err(map_sqlx_error)?;
417
418    // #356: `started_at_ms` is set-once on ff_exec_core (migration
419    // 0016); COALESCE preserves the first-claim timestamp across
420    // reclaim + retry attempts, matching Valkey's dedicated
421    // `exec_core["started_at"]` semantics.
422    sqlx::query(q_exec::UPDATE_EXEC_CORE_CLAIM_SQL)
423        .bind(part)
424        .bind(exec_uuid)
425        .bind(now)
426        .execute(&mut **conn)
427        .await
428        .map_err(map_sqlx_error)?;
429
430    // RFC-019 Stage B outbox parity (PG reference at
431    // `ff-backend-postgres/src/lease_event.rs`): record a lease
432    // lifecycle event so a later `subscribe_lease_history` reader
433    // observes the acquisition. Post-commit broadcast emit wired in
434    // Phase 2b.1 per RFC-023 §4.2.
435    let mut emits: Vec<PendingEmit> = Vec::new();
436    let ev = insert_lease_event(conn, part, exec_uuid, "acquired", now).await?;
437    emits.push((OutboxChannel::LeaseHistory, ev));
438
439    let attempt_index = AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
440    let exec_id = ff_core::types::ExecutionId::parse(&format!("{{fp:{part}}}:{exec_uuid}"))
441        .map_err(|e| EngineError::Validation {
442            kind: ValidationKind::InvalidInput,
443            detail: format!("reassembling exec id: {e}"),
444        })?;
445    let payload = HandlePayload::new(
446        exec_id,
447        attempt_index,
448        AttemptId::new(),
449        LeaseId::new(),
450        LeaseEpoch(u64::try_from(epoch_i).unwrap_or(1)),
451        u64::from(policy.lease_ttl_ms),
452        lane.clone(),
453        policy.worker_instance_id.clone(),
454    );
455    Ok(Some((encode_handle(&payload, HandleKind::Fresh), emits)))
456}
457
458async fn complete_impl(
459    pool: &SqlitePool,
460    pubsub: &PubSub,
461    handle: &Handle,
462    payload_bytes: Option<Vec<u8>>,
463) -> Result<(), EngineError> {
464    let payload = decode_handle(handle)?;
465    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
466    let attempt_index = i64::from(payload.attempt_index.0);
467    let expected_epoch = payload.lease_epoch.0;
468
469    let mut conn = begin_immediate(pool).await?;
470    let result = complete_inner(
471        &mut conn,
472        part,
473        exec_uuid,
474        attempt_index,
475        expected_epoch,
476        payload_bytes,
477    )
478    .await;
479    match result {
480        Ok(emits) => {
481            commit_or_rollback(&mut conn).await?;
482            dispatch_pending_emits(pubsub, &emits);
483            Ok(())
484        }
485        Err(e) => {
486            rollback_quiet(&mut conn).await;
487            Err(e)
488        }
489    }
490}
491
492async fn complete_inner(
493    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
494    part: i64,
495    exec_uuid: Uuid,
496    attempt_index: i64,
497    expected_epoch: u64,
498    payload_bytes: Option<Vec<u8>>,
499) -> Result<Vec<PendingEmit>, EngineError> {
500    fence_check(conn, part, exec_uuid, attempt_index, expected_epoch).await?;
501    let now = now_ms();
502
503    sqlx::query(q_attempt::UPDATE_ATTEMPT_COMPLETE_SQL)
504        .bind(now)
505        .bind(part)
506        .bind(exec_uuid)
507        .bind(attempt_index)
508        .execute(&mut **conn)
509        .await
510        .map_err(map_sqlx_error)?;
511
512    sqlx::query(q_exec::UPDATE_EXEC_CORE_COMPLETE_SQL)
513        .bind(now)
514        .bind(payload_bytes.as_deref())
515        .bind(part)
516        .bind(exec_uuid)
517        .execute(&mut **conn)
518        .await
519        .map_err(map_sqlx_error)?;
520
521    let mut emits: Vec<PendingEmit> = Vec::new();
522    let completion_ev = insert_completion_event_ev(conn, part, exec_uuid, "success", now).await?;
523    emits.push((OutboxChannel::Completion, completion_ev));
524
525    let lease_ev = insert_lease_event(conn, part, exec_uuid, "revoked", now).await?;
526    emits.push((OutboxChannel::LeaseHistory, lease_ev));
527    Ok(emits)
528}
529
530/// Classify whether a `fail()` call reschedules a retry or transitions
531/// to terminal. Mirrors the PG reference behaviour
532/// (`ff-backend-postgres/src/attempt.rs:622-626` — Transient /
533/// InfraCrash → retry; Permanent / Timeout / Cancelled → terminal),
534/// and handles the `#[non_exhaustive]` catch-all by defaulting future
535/// variants to the **least-destructive** retry path per the project's
536/// non-exhaustive-enum rule: terminal-failed is irreversible, so an
537/// unknown classification MUST NOT silently burn the attempt.
538fn classify_retryable(classification: FailureClass) -> bool {
539    match classification {
540        FailureClass::Transient | FailureClass::InfraCrash => true,
541        FailureClass::Permanent | FailureClass::Timeout | FailureClass::Cancelled => false,
542        // #[non_exhaustive]: unknown future variant → retry (least
543        // destructive). A deliberate terminal variant is fine to add
544        // here alongside Permanent in a follow-up PR; defaulting
545        // unknowns to terminal would regress outcomes on backend
546        // upgrades where a new variant lands before this classifier
547        // is taught about it.
548        _ => true,
549    }
550}
551
552async fn fail_impl(
553    pool: &SqlitePool,
554    pubsub: &PubSub,
555    handle: &Handle,
556    reason: FailureReason,
557    classification: FailureClass,
558) -> Result<FailOutcome, EngineError> {
559    let payload = decode_handle(handle)?;
560    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
561    let attempt_index = i64::from(payload.attempt_index.0);
562    let expected_epoch = payload.lease_epoch.0;
563    let retryable = classify_retryable(classification);
564
565    let mut conn = begin_immediate(pool).await?;
566    let result = fail_inner(
567        &mut conn,
568        part,
569        exec_uuid,
570        attempt_index,
571        expected_epoch,
572        retryable,
573        &reason,
574        classification,
575    )
576    .await;
577    match result {
578        Ok((outcome, emits)) => {
579            commit_or_rollback(&mut conn).await?;
580            dispatch_pending_emits(pubsub, &emits);
581            Ok(outcome)
582        }
583        Err(e) => {
584            rollback_quiet(&mut conn).await;
585            Err(e)
586        }
587    }
588}
589
590#[allow(clippy::too_many_arguments)] // every arg is load-bearing attempt state
591async fn fail_inner(
592    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
593    part: i64,
594    exec_uuid: Uuid,
595    attempt_index: i64,
596    expected_epoch: u64,
597    retryable: bool,
598    reason: &FailureReason,
599    classification: FailureClass,
600) -> Result<(FailOutcome, Vec<PendingEmit>), EngineError> {
601    fence_check(conn, part, exec_uuid, attempt_index, expected_epoch).await?;
602    let now = now_ms();
603    let mut emits: Vec<PendingEmit> = Vec::new();
604
605    if retryable {
606        sqlx::query(q_attempt::UPDATE_ATTEMPT_FAIL_RETRY_SQL)
607            .bind(now)
608            .bind(part)
609            .bind(exec_uuid)
610            .bind(attempt_index)
611            .execute(&mut **conn)
612            .await
613            .map_err(map_sqlx_error)?;
614
615        sqlx::query(q_exec::UPDATE_EXEC_CORE_FAIL_RETRY_SQL)
616            .bind(&reason.message)
617            .bind(part)
618            .bind(exec_uuid)
619            .execute(&mut **conn)
620            .await
621            .map_err(map_sqlx_error)?;
622
623        let lease_ev = insert_lease_event(conn, part, exec_uuid, "revoked", now).await?;
624        emits.push((OutboxChannel::LeaseHistory, lease_ev));
625        // Log the transient failure so operators tracing a retry loop
626        // can correlate cause without re-reading the attempt row
627        // themselves (Gemini review #1).
628        tracing::warn!(
629            error.message = %reason.message,
630            classification = ?classification,
631            execution_id = %exec_uuid,
632            attempt_index = attempt_index,
633            "sqlite.fail: scheduling retry"
634        );
635        Ok((
636            FailOutcome::RetryScheduled {
637                delay_until: ff_core::types::TimestampMs::from_millis(now),
638            },
639            emits,
640        ))
641    } else {
642        sqlx::query(q_attempt::UPDATE_ATTEMPT_FAIL_TERMINAL_SQL)
643            .bind(now)
644            .bind(part)
645            .bind(exec_uuid)
646            .bind(attempt_index)
647            .execute(&mut **conn)
648            .await
649            .map_err(map_sqlx_error)?;
650
651        sqlx::query(q_exec::UPDATE_EXEC_CORE_FAIL_TERMINAL_SQL)
652            .bind(now)
653            .bind(&reason.message)
654            .bind(part)
655            .bind(exec_uuid)
656            .execute(&mut **conn)
657            .await
658            .map_err(map_sqlx_error)?;
659
660        let completion_ev =
661            insert_completion_event_ev(conn, part, exec_uuid, "failed", now).await?;
662        emits.push((OutboxChannel::Completion, completion_ev));
663
664        let lease_ev = insert_lease_event(conn, part, exec_uuid, "revoked", now).await?;
665        emits.push((OutboxChannel::LeaseHistory, lease_ev));
666        Ok((FailOutcome::TerminalFailed, emits))
667    }
668}
669
670/// Emit one RFC-019 Stage B lease-lifecycle outbox row + return the
671/// generated outbox `event_id` wrapped in an [`OutboxEvent`] for the
672/// caller to queue as a post-commit broadcast.
673///
674/// Mirrors `ff-backend-postgres/src/lease_event.rs`. The PG
675/// `pg_notify` trigger is dropped per RFC-023 §4.2 — broadcast moves
676/// into the Rust post-commit dispatch landed in Phase 2b.1; durable
677/// replay via `event_id > cursor` continues to ride against this
678/// table.
679pub(crate) async fn insert_lease_event(
680    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
681    part: i64,
682    exec_uuid: Uuid,
683    event_type: &str,
684    now: i64,
685) -> Result<OutboxEvent, EngineError> {
686    sqlx::query(q_dispatch::INSERT_LEASE_EVENT_SQL)
687        .bind(exec_uuid.to_string())
688        .bind(event_type)
689        .bind(now)
690        .bind(part)
691        // BLOB bind for the co-transactional exec_core lookup that
692        // back-fills namespace + instance_tag (Phase 3.2 fix).
693        .bind(exec_uuid)
694        .execute(&mut **conn)
695        .await
696        .map_err(map_sqlx_error)?;
697    last_outbox_event(conn, part).await
698}
699
700/// Insert one completion outbox row (success / failed / cancelled /
701/// retry) and return the `event_id` wrapped in an [`OutboxEvent`].
702pub(crate) async fn insert_completion_event_ev(
703    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
704    part: i64,
705    exec_uuid: Uuid,
706    outcome: &str,
707    now: i64,
708) -> Result<OutboxEvent, EngineError> {
709    sqlx::query(q_attempt::INSERT_COMPLETION_EVENT_SQL)
710        .bind(outcome)
711        .bind(now)
712        .bind(part)
713        .bind(exec_uuid)
714        .execute(&mut **conn)
715        .await
716        .map_err(map_sqlx_error)?;
717    last_outbox_event(conn, part).await
718}
719
720// ── Phase 2a.3 hot-path bodies ────────────────────────────────────────
721
722async fn renew_impl(
723    pool: &SqlitePool,
724    pubsub: &PubSub,
725    handle: &Handle,
726) -> Result<LeaseRenewal, EngineError> {
727    let payload = decode_handle(handle)?;
728    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
729    let attempt_index = i64::from(payload.attempt_index.0);
730    let expected_epoch = payload.lease_epoch.0;
731    let lease_ttl_ms = i64::try_from(payload.lease_ttl_ms).unwrap_or(0);
732
733    let mut conn = begin_immediate(pool).await?;
734    let result = renew_inner(
735        &mut conn,
736        part,
737        exec_uuid,
738        attempt_index,
739        expected_epoch,
740        lease_ttl_ms,
741    )
742    .await;
743    match result {
744        Ok((renewal, emits)) => {
745            commit_or_rollback(&mut conn).await?;
746            dispatch_pending_emits(pubsub, &emits);
747            Ok(renewal)
748        }
749        Err(e) => {
750            rollback_quiet(&mut conn).await;
751            Err(e)
752        }
753    }
754}
755
756async fn renew_inner(
757    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
758    part: i64,
759    exec_uuid: Uuid,
760    attempt_index: i64,
761    expected_epoch: u64,
762    lease_ttl_ms: i64,
763) -> Result<(LeaseRenewal, Vec<PendingEmit>), EngineError> {
764    fence_check(conn, part, exec_uuid, attempt_index, expected_epoch).await?;
765    let now = now_ms();
766    let new_expires = now.saturating_add(lease_ttl_ms);
767
768    sqlx::query(q_lease::UPDATE_ATTEMPT_RENEW_SQL)
769        .bind(new_expires)
770        .bind(part)
771        .bind(exec_uuid)
772        .bind(attempt_index)
773        .execute(&mut **conn)
774        .await
775        .map_err(map_sqlx_error)?;
776
777    // RFC-019 Stage B outbox parity: lease renewed event.
778    let ev = insert_lease_event(conn, part, exec_uuid, "renewed", now).await?;
779    let emits = vec![(OutboxChannel::LeaseHistory, ev)];
780
781    Ok((
782        LeaseRenewal::new(u64::try_from(new_expires).unwrap_or(0), expected_epoch),
783        emits,
784    ))
785}
786
787async fn progress_impl(
788    pool: &SqlitePool,
789    handle: &Handle,
790    percent: Option<u8>,
791    message: Option<String>,
792) -> Result<(), EngineError> {
793    let payload = decode_handle(handle)?;
794    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
795    let attempt_index = i64::from(payload.attempt_index.0);
796    let expected_epoch = payload.lease_epoch.0;
797
798    let mut conn = begin_immediate(pool).await?;
799    let result = progress_inner(
800        &mut conn,
801        part,
802        exec_uuid,
803        attempt_index,
804        expected_epoch,
805        percent,
806        message,
807    )
808    .await;
809    match result {
810        Ok(()) => commit_or_rollback(&mut conn).await,
811        Err(e) => {
812            rollback_quiet(&mut conn).await;
813            Err(e)
814        }
815    }
816}
817
818async fn progress_inner(
819    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
820    part: i64,
821    exec_uuid: Uuid,
822    attempt_index: i64,
823    expected_epoch: u64,
824    percent: Option<u8>,
825    message: Option<String>,
826) -> Result<(), EngineError> {
827    fence_check(conn, part, exec_uuid, attempt_index, expected_epoch).await?;
828
829    // `UPDATE_EXEC_CORE_PROGRESS_SQL` is self-correct for any NULL/
830    // non-NULL combination of the two binds (PR #376 Copilot review) —
831    // its nested `CASE WHEN ? IS NULL` shape treats each field as
832    // independent and leaves the corresponding JSON path absent when
833    // the caller passed None. No Rust-side short-circuit needed.
834    sqlx::query(q_exec::UPDATE_EXEC_CORE_PROGRESS_SQL)
835        .bind(percent.map(i64::from))
836        .bind(message)
837        .bind(part)
838        .bind(exec_uuid)
839        .execute(&mut **conn)
840        .await
841        .map_err(map_sqlx_error)?;
842    Ok(())
843}
844
845// ── append_frame (RFC-015 write surface) ──────────────────────────────
846
847/// Apply one RFC 7396 JSON Merge Patch in-place. Mirrors the PG helper
848/// at `ff-backend-postgres/src/stream.rs::apply_json_merge_patch`;
849/// both implementations must honour the [`SUMMARY_NULL_SENTINEL`]
850/// rewrite (leaf `"__ff_null__"` → JSON `null`) so the round-trip
851/// invariant holds across backends.
852fn apply_json_merge_patch(target: &mut serde_json::Value, patch: &serde_json::Value) {
853    use serde_json::Value;
854    if let Value::Object(patch_map) = patch {
855        if !target.is_object() {
856            *target = Value::Object(serde_json::Map::new());
857        }
858        let target_map = target.as_object_mut().expect("just ensured object");
859        for (k, v) in patch_map {
860            match v {
861                Value::Null => {
862                    target_map.remove(k);
863                }
864                Value::String(s) if s == SUMMARY_NULL_SENTINEL => {
865                    target_map.insert(k.clone(), Value::Null);
866                }
867                Value::Object(_) => {
868                    let entry = target_map.entry(k.clone()).or_insert(Value::Null);
869                    apply_json_merge_patch(entry, v);
870                }
871                other => {
872                    target_map.insert(k.clone(), other.clone());
873                }
874            }
875        }
876    } else {
877        *target = patch.clone();
878    }
879}
880
881/// Build the `fields` JSON TEXT blob for a frame — mirrors the PG
882/// helper at `ff-backend-postgres/src/stream.rs::build_fields_json`
883/// so downstream readers observe the same shape on both backends.
884fn build_fields_json(frame: &Frame) -> String {
885    use serde_json::{Map, Value};
886    let payload_str = String::from_utf8_lossy(&frame.bytes).into_owned();
887    let mut map = Map::new();
888    let frame_type = if frame.frame_type.is_empty() {
889        match frame.kind {
890            FrameKind::Stdout => "stdout",
891            FrameKind::Stderr => "stderr",
892            FrameKind::Event => "event",
893            FrameKind::Blob => "blob",
894            _ => "event",
895        }
896        .to_owned()
897    } else {
898        frame.frame_type.clone()
899    };
900    map.insert("frame_type".into(), Value::String(frame_type));
901    map.insert("payload".into(), Value::String(payload_str));
902    map.insert("encoding".into(), Value::String("utf8".into()));
903    map.insert("source".into(), Value::String("worker".into()));
904    if let Some(corr) = &frame.correlation_id {
905        map.insert("correlation_id".into(), Value::String(corr.clone()));
906    }
907    Value::Object(map).to_string()
908}
909
910async fn append_frame_impl(
911    pool: &SqlitePool,
912    pubsub: &PubSub,
913    handle: &Handle,
914    frame: Frame,
915) -> Result<AppendFrameOutcome, EngineError> {
916    let payload = decode_handle(handle)?;
917    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
918    let attempt_index = i64::from(payload.attempt_index.0);
919    let expected_epoch = payload.lease_epoch.0;
920
921    let mut conn = begin_immediate(pool).await?;
922    let result = append_frame_inner(
923        &mut conn,
924        part,
925        exec_uuid,
926        attempt_index,
927        expected_epoch,
928        frame,
929    )
930    .await;
931    match result {
932        Ok((outcome, emits)) => {
933            commit_or_rollback(&mut conn).await?;
934            dispatch_pending_emits(pubsub, &emits);
935            Ok(outcome)
936        }
937        Err(e) => {
938            rollback_quiet(&mut conn).await;
939            Err(e)
940        }
941    }
942}
943
944async fn append_frame_inner(
945    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
946    part: i64,
947    exec_uuid: Uuid,
948    attempt_index: i64,
949    expected_epoch: u64,
950    frame: Frame,
951) -> Result<(AppendFrameOutcome, Vec<PendingEmit>), EngineError> {
952    fence_check(conn, part, exec_uuid, attempt_index, expected_epoch).await?;
953
954    let ts_ms = now_ms();
955    let mode_wire = frame.mode.wire_str();
956    let fields_text = build_fields_json(&frame);
957
958    // Mint `seq` as MAX(seq) + 1 under the txn lock. `BEGIN IMMEDIATE`
959    // serializes writers so there is no need for an additional advisory
960    // lock (the PG path uses `pg_advisory_xact_lock` because READ
961    // COMMITTED isolates less strictly).
962    let max_seq: Option<i64> = sqlx::query_scalar(q_stream::SELECT_MAX_SEQ_SQL)
963        .bind(part)
964        .bind(exec_uuid)
965        .bind(attempt_index)
966        .bind(ts_ms)
967        .fetch_one(&mut **conn)
968        .await
969        .map_err(map_sqlx_error)?;
970    let next_seq: i64 = max_seq.map(|s| s + 1).unwrap_or(0);
971
972    sqlx::query(q_stream::INSERT_STREAM_FRAME_SQL)
973        .bind(part)
974        .bind(exec_uuid)
975        .bind(attempt_index)
976        .bind(ts_ms)
977        .bind(next_seq)
978        .bind(&fields_text)
979        .bind(mode_wire)
980        .bind(ts_ms)
981        .execute(&mut **conn)
982        .await
983        .map_err(map_sqlx_error)?;
984
985    // Post-commit broadcast on the stream_frame channel. `ff_stream_frame`
986    // uses a composite primary key, not AUTOINCREMENT — `last_insert_rowid()`
987    // still returns the rowid of the just-inserted row (SQLite assigns
988    // one for every non-WITHOUT-ROWID table), so the outbox-event id is
989    // unique per append within the table's rowid sequence.
990    let stream_ev = last_outbox_event(conn, part).await?;
991    let emits: Vec<PendingEmit> = vec![(OutboxChannel::StreamFrame, stream_ev)];
992
993    let mut summary_version: Option<u64> = None;
994
995    // DurableSummary: JSON Merge Patch applied in Rust, TEXT in/out.
996    if let StreamMode::DurableSummary { patch_kind } = &frame.mode {
997        let patch: serde_json::Value =
998            serde_json::from_slice(&frame.bytes).map_err(|e| EngineError::Validation {
999                kind: ValidationKind::InvalidInput,
1000                detail: format!("summary patch not valid JSON: {e}"),
1001            })?;
1002
1003        let existing: Option<(String, i64)> = sqlx::query_as(q_stream::SELECT_STREAM_SUMMARY_SQL)
1004            .bind(part)
1005            .bind(exec_uuid)
1006            .bind(attempt_index)
1007            .fetch_optional(&mut **conn)
1008            .await
1009            .map_err(map_sqlx_error)?;
1010
1011        let (mut doc, prev_version): (serde_json::Value, i64) = match existing {
1012            Some((text, v)) => {
1013                // Strict-parse posture (PR #376 gemini review): a stored
1014                // `document_json` that no longer round-trips indicates
1015                // DB corruption. Surface loudly via `Corruption` rather
1016                // than silently overwriting with an empty object.
1017                let parsed: serde_json::Value =
1018                    serde_json::from_str(&text).map_err(|e| EngineError::Validation {
1019                        kind: ValidationKind::Corruption,
1020                        detail: format!("corrupt summary document in ff_stream_summary: {e}"),
1021                    })?;
1022                (parsed, v)
1023            }
1024            None => (serde_json::Value::Object(serde_json::Map::new()), 0),
1025        };
1026
1027        match patch_kind {
1028            PatchKind::JsonMergePatch => apply_json_merge_patch(&mut doc, &patch),
1029            _ => apply_json_merge_patch(&mut doc, &patch),
1030        }
1031
1032        let new_version = prev_version + 1;
1033        let patch_kind_wire = "json-merge-patch";
1034        let doc_text = doc.to_string();
1035        if prev_version == 0 {
1036            sqlx::query(q_stream::INSERT_STREAM_SUMMARY_SQL)
1037                .bind(part)
1038                .bind(exec_uuid)
1039                .bind(attempt_index)
1040                .bind(&doc_text)
1041                .bind(new_version)
1042                .bind(patch_kind_wire)
1043                .bind(ts_ms)
1044                .bind(ts_ms)
1045                .execute(&mut **conn)
1046                .await
1047                .map_err(map_sqlx_error)?;
1048        } else {
1049            sqlx::query(q_stream::UPDATE_STREAM_SUMMARY_SQL)
1050                .bind(part)
1051                .bind(exec_uuid)
1052                .bind(attempt_index)
1053                .bind(&doc_text)
1054                .bind(new_version)
1055                .bind(patch_kind_wire)
1056                .bind(ts_ms)
1057                .execute(&mut **conn)
1058                .await
1059                .map_err(map_sqlx_error)?;
1060        }
1061        summary_version = Some(u64::try_from(new_version).unwrap_or(0));
1062    }
1063
1064    // BestEffortLive: EMA + trim. Computation ports from the PG helper
1065    // at `ff-backend-postgres/src/stream.rs:272-339`.
1066    if let StreamMode::BestEffortLive { config } = &frame.mode {
1067        let meta: Option<(f64, i64)> = sqlx::query_as(q_stream::SELECT_STREAM_META_SQL)
1068            .bind(part)
1069            .bind(exec_uuid)
1070            .bind(attempt_index)
1071            .fetch_optional(&mut **conn)
1072            .await
1073            .map_err(map_sqlx_error)?;
1074
1075        let (ema_prev, last_ts) = meta.unwrap_or((0.0, 0));
1076        let inst_rate: f64 = if last_ts > 0 && ts_ms > last_ts {
1077            1000.0 / ((ts_ms - last_ts) as f64)
1078        } else {
1079            0.0
1080        };
1081        let alpha = config.ema_alpha;
1082        let ema_new = alpha * inst_rate + (1.0 - alpha) * ema_prev;
1083        let k_raw = (ema_new * (f64::from(config.ttl_ms)) / 1000.0).ceil() as i64 * 2;
1084        let k = k_raw
1085            .max(i64::from(config.maxlen_floor))
1086            .min(i64::from(config.maxlen_ceiling));
1087
1088        sqlx::query(q_stream::UPSERT_STREAM_META_SQL)
1089            .bind(part)
1090            .bind(exec_uuid)
1091            .bind(attempt_index)
1092            .bind(ema_new)
1093            .bind(ts_ms)
1094            .bind(k)
1095            .execute(&mut **conn)
1096            .await
1097            .map_err(map_sqlx_error)?;
1098
1099        sqlx::query(q_stream::TRIM_STREAM_FRAMES_SQL)
1100            .bind(part)
1101            .bind(exec_uuid)
1102            .bind(attempt_index)
1103            .bind(k)
1104            .execute(&mut **conn)
1105            .await
1106            .map_err(map_sqlx_error)?;
1107    }
1108
1109    let frame_count: i64 = sqlx::query_scalar(q_stream::COUNT_STREAM_FRAMES_SQL)
1110        .bind(part)
1111        .bind(exec_uuid)
1112        .bind(attempt_index)
1113        .fetch_one(&mut **conn)
1114        .await
1115        .map_err(map_sqlx_error)?;
1116
1117    let stream_id = format!("{ts_ms}-{next_seq}");
1118    let mut out = AppendFrameOutcome::new(stream_id, u64::try_from(frame_count).unwrap_or(0));
1119    if let Some(v) = summary_version {
1120        out = out.with_summary_version(v);
1121    }
1122    Ok((out, emits))
1123}
1124
1125// ── Phase 2b.2.2 stream readers (Group C) ─────────────────────────────
1126
/// Resolve a [`StreamCursor`] to a `(ts_ms, seq)` range bound. Mirror of
/// PG at `ff-backend-postgres/src/stream.rs:365-395`.
///
/// `Start` becomes `(i64::MIN, i64::MIN)` — the smallest representable
/// tuple, so a lower bound in `read_stream` is inclusive from the
/// earliest frame — and `End` becomes `(i64::MAX, i64::MAX)` for the
/// symmetric upper bound. `At` defers to [`parse_concrete_cursor`] and
/// returns its error unchanged.
#[cfg(feature = "streaming")]
fn parse_cursor_bound(c: &StreamCursor) -> Result<(i64, i64), EngineError> {
    const EARLIEST: (i64, i64) = (i64::MIN, i64::MIN);
    const LATEST: (i64, i64) = (i64::MAX, i64::MAX);

    let bound = match c {
        StreamCursor::At(raw) => parse_concrete_cursor(raw)?,
        StreamCursor::Start => EARLIEST,
        StreamCursor::End => LATEST,
    };
    Ok(bound)
}
1140
/// Parse a concrete cursor of the form `"<ms>-<seq>"` into `(ms, seq)`.
///
/// The string is split at the first `-`; when there is no `-` the whole
/// string is the millisecond part and the sequence defaults to `0`.
///
/// # Errors
/// `Validation { InvalidInput }` when either part is not a valid `i64`;
/// the detail names the offending cursor and which part failed.
#[cfg(feature = "streaming")]
fn parse_concrete_cursor(s: &str) -> Result<(i64, i64), EngineError> {
    let (ms_text, seq_text) = s.split_once('-').unwrap_or((s, "0"));

    let ms = match ms_text.parse::<i64>() {
        Ok(value) => value,
        Err(_) => {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("bad stream cursor '{s}' (ms)"),
            });
        }
    };
    let sq = match seq_text.parse::<i64>() {
        Ok(value) => value,
        Err(_) => {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("bad stream cursor '{s}' (seq)"),
            });
        }
    };

    Ok((ms, sq))
}
1157
/// Build a [`StreamFrame`] with id `"{ts_ms}-{seq}"` from a stored row.
///
/// `fields_text` is expected to be a JSON object. String members are
/// copied verbatim and every other member is rendered through its JSON
/// text form. If the text does not parse, or parses to something other
/// than an object, the frame is returned with an empty field map.
#[cfg(feature = "streaming")]
fn row_to_frame(ts_ms: i64, seq: i64, fields_text: &str) -> StreamFrame {
    use serde_json::Value;
    use std::collections::BTreeMap;

    let fields: BTreeMap<String, String> = match serde_json::from_str::<Value>(fields_text) {
        Ok(Value::Object(members)) => members
            .into_iter()
            .map(|(key, value)| {
                let text = match value {
                    Value::String(inner) => inner,
                    non_string => non_string.to_string(),
                };
                (key, text)
            })
            .collect(),
        _ => BTreeMap::new(),
    };

    StreamFrame {
        id: format!("{ts_ms}-{seq}"),
        fields,
    }
}
1178
/// Read a bounded range of stream frames for one attempt.
///
/// `from` / `to` are resolved with [`parse_cursor_bound`], and
/// `count_limit` is capped at `STREAM_READ_HARD_CAP` before being bound
/// as the row limit. The query runs directly against `pool`.
///
/// The returned [`StreamFrames`] always has `closed_at` and
/// `closed_reason` set to `None`.
///
/// # Errors
/// Returns the id-split or cursor-parse error, or the mapped sqlx error
/// from the query or from decoding any row column.
#[cfg(feature = "streaming")]
async fn read_stream_impl(
    pool: &SqlitePool,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    from: StreamCursor,
    to: StreamCursor,
    count_limit: u64,
) -> Result<StreamFrames, EngineError> {
    let (part, exec_uuid) = split_exec_id(execution_id)?;
    let aidx: i64 = i64::from(attempt_index.0);
    let (lower_ms, lower_seq) = parse_cursor_bound(&from)?;
    let (upper_ms, upper_seq) = parse_cursor_bound(&to)?;
    let capped = count_limit.min(STREAM_READ_HARD_CAP);
    let row_limit = i64::try_from(capped).unwrap_or(i64::MAX);

    let fetched = sqlx::query(q_stream::READ_STREAM_RANGE_SQL)
        .bind(part)
        .bind(exec_uuid)
        .bind(aidx)
        .bind(lower_ms)
        .bind(lower_seq)
        .bind(upper_ms)
        .bind(upper_seq)
        .bind(row_limit)
        .fetch_all(pool)
        .await
        .map_err(map_sqlx_error)?;

    // Strict decode: the first column that fails aborts the whole read.
    let frames = fetched
        .iter()
        .map(|row| -> Result<StreamFrame, EngineError> {
            let ts: i64 = row.try_get("ts_ms").map_err(map_sqlx_error)?;
            let seq: i64 = row.try_get("seq").map_err(map_sqlx_error)?;
            let fields_text: String = row.try_get("fields").map_err(map_sqlx_error)?;
            Ok(row_to_frame(ts, seq, &fields_text))
        })
        .collect::<Result<Vec<StreamFrame>, EngineError>>()?;

    Ok(StreamFrames {
        frames,
        closed_at: None,
        closed_reason: None,
    })
}
1220
/// Tail frames strictly after a concrete cursor, optionally blocking for
/// up to `block_ms` milliseconds until some appear.
///
/// Flow:
/// 1. Validate that `after` is `StreamCursor::At` and parse it.
/// 2. Subscribe to the `stream_frame` broadcast, then run the first
///    SELECT. A non-empty result, or `block_ms == 0`, returns at once.
/// 3. Otherwise wait on the broadcast receiver and re-SELECT on every
///    wake until rows appear or the time budget is used up, in which
///    case `StreamFrames::empty_open()` is returned.
///
/// `visibility` selects the SQL variant: `ExcludeBestEffort` uses the
/// filtering statement and every other value uses the plain one.
///
/// # Errors
/// `Validation { InvalidInput }` for a non-concrete or malformed cursor;
/// otherwise the id-split error or a mapped sqlx error from a SELECT.
#[cfg(feature = "streaming")]
#[allow(clippy::too_many_arguments)] // mirrors the trait signature
async fn tail_stream_impl(
    pool: &SqlitePool,
    pubsub: &PubSub,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    after: StreamCursor,
    block_ms: u64,
    count_limit: u64,
    visibility: TailVisibility,
) -> Result<StreamFrames, EngineError> {
    let (part, exec_uuid) = split_exec_id(execution_id)?;
    let aidx: i64 = i64::from(attempt_index.0);

    let StreamCursor::At(cursor_text) = &after else {
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: "tail_stream requires concrete after cursor".into(),
        });
    };
    let (after_ms, after_seq) = parse_concrete_cursor(cursor_text)?;

    let row_limit = i64::try_from(count_limit.min(STREAM_READ_HARD_CAP)).unwrap_or(i64::MAX);
    let sql = if matches!(visibility, TailVisibility::ExcludeBestEffort) {
        q_stream::TAIL_STREAM_AFTER_EXCLUDE_BE_SQL
    } else {
        q_stream::TAIL_STREAM_AFTER_SQL
    };

    // The subscription must exist BEFORE the first SELECT, otherwise a
    // broadcast fired between SELECT and park would be lost. Same
    // handshake as PG's LISTEN-then-SELECT at
    // `ff-backend-postgres/src/stream.rs:496-498`.
    let mut wake_rx = pubsub.stream_frame.subscribe();

    let fetch_after = || async {
        sqlx::query(sql)
            .bind(part)
            .bind(exec_uuid)
            .bind(aidx)
            .bind(after_ms)
            .bind(after_seq)
            .bind(row_limit)
            .fetch_all(pool)
            .await
            .map_err(map_sqlx_error)
    };

    let first_batch = fetch_after().await?;
    if block_ms == 0 || !first_batch.is_empty() {
        return Ok(rows_to_frames(first_batch));
    }

    // Parked on the broadcast receiver only — no SQLite connection is
    // held while waiting. A wake can belong to a frame the visibility
    // filter rejects, so an empty re-SELECT parks again for whatever is
    // left of the budget.
    let started = std::time::Instant::now();
    let budget = Duration::from_millis(block_ms);
    loop {
        let remaining = budget.saturating_sub(started.elapsed());
        if remaining.is_zero() {
            break;
        }

        let wake = match tokio::time::timeout(remaining, wake_rx.recv()).await {
            // Budget exhausted while parked.
            Err(_) => break,
            Ok(wake) => wake,
        };
        // Producer closed → one final re-select, returned as-is.
        if let Err(broadcast::error::RecvError::Closed) = wake {
            return Ok(rows_to_frames(fetch_after().await?));
        }
        // A normal wake and a `Lagged` receiver are handled alike: the
        // outbox is durable, so re-selecting recovers anything missed.

        let batch = fetch_after().await?;
        if !batch.is_empty() {
            return Ok(rows_to_frames(batch));
        }
        if started.elapsed() >= budget {
            break;
        }
    }

    Ok(StreamFrames::empty_open())
}
1306
/// Convert fetched rows into a [`StreamFrames`] whose `closed_at` and
/// `closed_reason` are both `None`.
///
/// Decoding is lenient: a `ts_ms` or `seq` column that fails to decode
/// falls back to `0`, and a failing `fields` column falls back to the
/// empty string, rather than surfacing an error.
///
/// NOTE(review): `read_stream_impl` decodes the same columns strictly
/// and propagates the error. Confirm the leniency here is intentional —
/// a fallback produces a frame with id `"0-0"`.
#[cfg(feature = "streaming")]
fn rows_to_frames(rows: Vec<sqlx::sqlite::SqliteRow>) -> StreamFrames {
    let frames = rows
        .into_iter()
        .map(|row| {
            let ts: i64 = row.try_get("ts_ms").unwrap_or(0);
            let seq: i64 = row.try_get("seq").unwrap_or(0);
            let fields_text: String = row.try_get("fields").unwrap_or_default();
            row_to_frame(ts, seq, &fields_text)
        })
        .collect();

    StreamFrames {
        frames,
        closed_at: None,
        closed_reason: None,
    }
}
1322
/// Load the summary document for one attempt, or `Ok(None)` when no
/// summary row exists.
///
/// The stored text is parsed and re-serialized through `serde_json`
/// before being returned, so equivalent stored state yields the same
/// bytes regardless of how the text was originally formatted.
///
/// # Errors
/// * `Validation { Corruption }` when the stored document does not parse.
/// * `Validation { InvalidInput }` when re-serialization fails.
/// * The id-split error, or a mapped sqlx error from the query or from a
///   required column.
#[cfg(feature = "streaming")]
async fn read_summary_impl(
    pool: &SqlitePool,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
) -> Result<Option<SummaryDocument>, EngineError> {
    let (part, exec_uuid) = split_exec_id(execution_id)?;
    let aidx: i64 = i64::from(attempt_index.0);

    let fetched = sqlx::query(q_stream::READ_SUMMARY_FULL_SQL)
        .bind(part)
        .bind(exec_uuid)
        .bind(aidx)
        .fetch_optional(pool)
        .await
        .map_err(map_sqlx_error)?;
    let row = match fetched {
        Some(found) => found,
        None => return Ok(None),
    };

    let doc_text: String = row.try_get("document_json").map_err(map_sqlx_error)?;
    let version: i64 = row.try_get("version").map_err(map_sqlx_error)?;
    // `patch_kind` is the one tolerant column: a decode failure is
    // treated the same as a NULL value.
    let patch_kind_wire: Option<String> = row
        .try_get::<Option<String>, _>("patch_kind")
        .ok()
        .flatten();
    let last_updated: i64 = row.try_get("last_updated_ms").map_err(map_sqlx_error)?;
    let first_applied: i64 = row.try_get("first_applied_ms").map_err(map_sqlx_error)?;

    // Normalize whitespace by round-tripping through serde_json so that
    // SQLite and PG observers get byte-identical documents. A blob that
    // fails to parse is reported as corruption, in line with the Phase
    // 2a.3 `append_frame` strict-parse posture.
    let parsed: serde_json::Value = match serde_json::from_str(&doc_text) {
        Ok(value) => value,
        Err(e) => {
            return Err(EngineError::Validation {
                kind: ValidationKind::Corruption,
                detail: format!("corrupt summary document in ff_stream_summary: {e}"),
            });
        }
    };
    let bytes = match serde_json::to_vec(&parsed) {
        Ok(encoded) => encoded,
        Err(e) => {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("summary document not serialisable: {e}"),
            });
        }
    };

    // Every wire value — including NULL and unknown strings — currently
    // maps to JSON Merge Patch.
    let patch_kind = match patch_kind_wire.as_deref() {
        Some("json-merge-patch") => PatchKind::JsonMergePatch,
        _ => PatchKind::JsonMergePatch,
    };

    let document = SummaryDocument::new(
        bytes,
        u64::try_from(version).unwrap_or(0),
        patch_kind,
        u64::try_from(last_updated).unwrap_or(0),
        u64::try_from(first_applied).unwrap_or(0),
    );
    Ok(Some(document))
}
1374
1375// ── claim_from_resume_grant ────────────────────────────────────────────────
1376
1377async fn claim_from_reclaim_impl(
1378    pool: &SqlitePool,
1379    pubsub: &PubSub,
1380    token: &ResumeToken,
1381) -> Result<Option<Handle>, EngineError> {
1382    let eid = &token.grant.execution_id;
1383    let (part, exec_uuid) = split_exec_id(eid)?;
1384
1385    let mut conn = begin_immediate(pool).await?;
1386    let result = claim_from_reclaim_inner(&mut conn, part, exec_uuid, token).await;
1387    match result {
1388        Ok(Some((handle, emits))) => {
1389            commit_or_rollback(&mut conn).await?;
1390            dispatch_pending_emits(pubsub, &emits);
1391            Ok(Some(handle))
1392        }
1393        Ok(None) => {
1394            rollback_quiet(&mut conn).await;
1395            Ok(None)
1396        }
1397        Err(e) => {
1398            rollback_quiet(&mut conn).await;
1399            Err(e)
1400        }
1401    }
1402}
1403
1404async fn claim_from_reclaim_inner(
1405    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
1406    part: i64,
1407    exec_uuid: Uuid,
1408    token: &ResumeToken,
1409) -> Result<Option<(Handle, Vec<PendingEmit>)>, EngineError> {
1410    // Latest attempt under the partition/exec. Mirror of PG at
1411    // `ff-backend-postgres/src/attempt.rs:294-308`.
1412    let row = sqlx::query(q_lease::SELECT_LATEST_ATTEMPT_FOR_RECLAIM_SQL)
1413        .bind(part)
1414        .bind(exec_uuid)
1415        .fetch_optional(&mut **conn)
1416        .await
1417        .map_err(map_sqlx_error)?;
1418    let Some(row) = row else {
1419        return Err(EngineError::NotFound { entity: "attempt" });
1420    };
1421    let attempt_index_i: i64 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
1422    let current_epoch: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
1423    let expires_at: Option<i64> = row
1424        .try_get::<Option<i64>, _>("lease_expires_at_ms")
1425        .map_err(map_sqlx_error)?;
1426
1427    let now = now_ms();
1428    // Live-lease → grant no longer honour-able.
1429    let live = matches!(expires_at, Some(exp) if exp > now);
1430    if live {
1431        return Ok(None);
1432    }
1433
1434    let lease_ttl_ms = i64::from(token.lease_ttl_ms);
1435    let new_expires = now.saturating_add(lease_ttl_ms);
1436
1437    sqlx::query(q_lease::UPDATE_ATTEMPT_RECLAIM_SQL)
1438        .bind(token.worker_id.as_str())
1439        .bind(token.worker_instance_id.as_str())
1440        .bind(new_expires)
1441        .bind(now)
1442        .bind(part)
1443        .bind(exec_uuid)
1444        .bind(attempt_index_i)
1445        .execute(&mut **conn)
1446        .await
1447        .map_err(map_sqlx_error)?;
1448
1449    sqlx::query(q_lease::UPDATE_EXEC_CORE_RECLAIM_SQL)
1450        .bind(part)
1451        .bind(exec_uuid)
1452        .execute(&mut **conn)
1453        .await
1454        .map_err(map_sqlx_error)?;
1455
1456    let ev = insert_lease_event(conn, part, exec_uuid, "reclaimed", now).await?;
1457    let emits = vec![(OutboxChannel::LeaseHistory, ev)];
1458
1459    let new_epoch = current_epoch.saturating_add(1);
1460    let payload = HandlePayload::new(
1461        token.grant.execution_id.clone(),
1462        AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0)),
1463        AttemptId::new(),
1464        LeaseId::new(),
1465        LeaseEpoch(u64::try_from(new_epoch).unwrap_or(0)),
1466        u64::from(token.lease_ttl_ms),
1467        token.grant.lane_id.clone(),
1468        token.worker_instance_id.clone(),
1469    );
1470    Ok(Some((encode_handle(&payload, HandleKind::Resumed), emits)))
1471}
1472
1473// ── Phase 2b.1 producer-side bodies (Group A) ─────────────────────────
1474
/// Encode an optional [`ff_core::policy::ExecutionPolicy`] as the TEXT
/// JSON stored in `ff_exec_core.policy`. Counterpart of PG at
/// `ff-backend-postgres/src/exec_core.rs:144-150`: PG stores jsonb,
/// SQLite stores the same JSON in a TEXT column.
///
/// Returns `Ok(None)` when no policy was supplied.
///
/// # Errors
/// `Validation { InvalidInput }` when serialization fails.
#[cfg(feature = "core")]
fn encode_policy_json(
    policy: Option<&ff_core::policy::ExecutionPolicy>,
) -> Result<Option<String>, EngineError> {
    let Some(p) = policy else {
        return Ok(None);
    };

    match serde_json::to_string(p) {
        Ok(text) => Ok(Some(text)),
        Err(e) => Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: format!("create_execution: policy: serialize failed: {e}"),
        }),
    }
}
1493
/// Render the `raw_fields` JSON for a new `ff_exec_core` row, using the
/// same shape as PG's `create_execution_impl` so read paths decode both
/// backends identically (TEXT JSON here, jsonb in PG).
///
/// Always written: `namespace`, `execution_kind`, `creator_identity`,
/// `last_mutation_at` (the `args.now` value as a decimal string),
/// `total_attempt_count` (the string `"0"`) and `tags` (an object of
/// string values). `idempotency_key` and `payload_encoding` are written
/// only when present on `args`.
#[cfg(feature = "core")]
fn build_create_execution_raw_fields(args: &CreateExecutionArgs) -> String {
    use serde_json::{Map, Value};

    let mut fields: Map<String, Value> = Map::new();

    let namespace = args.namespace.as_str().to_owned();
    fields.insert("namespace".into(), Value::String(namespace));
    fields.insert(
        "execution_kind".into(),
        Value::String(args.execution_kind.clone()),
    );
    fields.insert(
        "creator_identity".into(),
        Value::String(args.creator_identity.clone()),
    );

    if let Some(key) = args.idempotency_key.as_ref() {
        fields.insert("idempotency_key".into(), Value::String(key.clone()));
    }
    if let Some(encoding) = args.payload_encoding.as_ref() {
        fields.insert("payload_encoding".into(), Value::String(encoding.clone()));
    }

    let mutated_at = args.now.0.to_string();
    fields.insert("last_mutation_at".into(), Value::String(mutated_at));
    fields.insert("total_attempt_count".into(), Value::String("0".to_owned()));

    let mut tags: Map<String, Value> = Map::new();
    for (tag, value) in args.tags.iter() {
        tags.insert(tag.clone(), Value::String(value.clone()));
    }
    fields.insert("tags".into(), Value::Object(tags));

    Value::Object(fields).to_string()
}
1532
/// Create an execution row, its capability junction rows, and the lane
/// registry entry inside one write transaction.
///
/// The execution id must contain a `}:` separator followed by a UUID.
/// Whether the row was newly created is decided by the rows-affected
/// count of the exec-core INSERT:
/// * affected → capability rows are written and
///   `CreateExecutionResult::Created` (with `PublicState::Waiting`) is
///   returned;
/// * not affected → `CreateExecutionResult::Duplicate`.
///
/// The lane registry insert runs in both cases, and the transaction is
/// committed in both cases.
///
/// # Errors
/// `Validation { InvalidInput }` for a malformed execution id or an
/// unserializable policy; otherwise a mapped sqlx error from any
/// statement (after a quiet rollback) or the error from
/// `begin_immediate` / `commit_or_rollback`.
#[cfg(feature = "core")]
async fn create_execution_impl(
    pool: &SqlitePool,
    args: &CreateExecutionArgs,
) -> Result<CreateExecutionResult, EngineError> {
    let part: i64 = i64::from(args.execution_id.partition());

    let s = args.execution_id.as_str();
    let Some((_, uuid_text)) = s.split_once("}:") else {
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: format!("execution_id missing `}}:` separator: {s}"),
        });
    };
    let exec_uuid = match Uuid::parse_str(uuid_text) {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("execution_id UUID invalid: {e}"),
            });
        }
    };

    let lane_id = args.lane_id.as_str().to_owned();
    let priority: i64 = i64::from(args.priority);
    let created_at_ms: i64 = args.now.0;
    let deadline_at_ms: Option<i64> = args.execution_deadline_at.map(|t| t.0);
    let raw_fields = build_create_execution_raw_fields(args);
    let policy_json = encode_policy_json(args.policy.as_ref())?;

    let mut conn = begin_immediate(pool).await?;

    // All writes run inside this block so that any failure funnels into
    // the single rollback path below.
    let staged = async {
        let insert_outcome = sqlx::query(q_exec::INSERT_EXEC_CORE_SQL)
            .bind(part)
            .bind(exec_uuid)
            .bind(&lane_id)
            .bind(priority)
            .bind(created_at_ms)
            .bind(deadline_at_ms)
            .bind(args.input_payload.as_slice())
            .bind(policy_json.as_deref())
            .bind(&raw_fields)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;
        let inserted = insert_outcome.rows_affected() > 0;

        if inserted {
            // Capability junction — RFC-023 §4.1 A4. Required caps come
            // from `ExecutionPolicy.routing_requirements.required_capabilities`.
            // With no policy or no routing requirements nothing is
            // written, matching PG's empty `text[]` default (see
            // `ff-backend-postgres/src/exec_core.rs:157-188`, which also
            // stores an empty `text[]` array when the policy is None).
            let mut required: Vec<String> = Vec::new();
            let routing = args
                .policy
                .as_ref()
                .and_then(|p| p.routing_requirements.as_ref());
            if let Some(routing) = routing {
                required.extend(routing.required_capabilities.iter().cloned());
            }
            for capability in &required {
                sqlx::query(q_exec::INSERT_EXEC_CAPABILITY_SQL)
                    .bind(exec_uuid)
                    .bind(capability)
                    .execute(&mut *conn)
                    .await
                    .map_err(map_sqlx_error)?;
            }
        }

        // The lane-registry seed is idempotent and deliberately runs on
        // every call, so a dynamic lane first seen on a duplicate
        // create_execution is still registered.
        sqlx::query(q_exec::INSERT_LANE_REGISTRY_SQL)
            .bind(&lane_id)
            .bind(created_at_ms)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;

        Ok::<bool, EngineError>(inserted)
    }
    .await;

    let inserted = match staged {
        Ok(flag) => flag,
        Err(e) => {
            rollback_quiet(&mut conn).await;
            return Err(e);
        }
    };

    commit_or_rollback(&mut conn).await?;

    let execution_id = args.execution_id.clone();
    Ok(if inserted {
        CreateExecutionResult::Created {
            execution_id,
            public_state: PublicState::Waiting,
        }
    } else {
        CreateExecutionResult::Duplicate { execution_id }
    })
}
1638
/// Inserts the flow-core row for `args.flow_id` inside one immediate
/// transaction.
///
/// The row is seeded with a JSON `raw_fields` document carrying the flow
/// kind, namespace, zeroed node/edge counters and `args.now` as the last
/// mutation timestamp.
///
/// Returns [`CreateFlowResult::Created`] when the insert affected a row and
/// [`CreateFlowResult::AlreadySatisfied`] when it affected none.
///
/// # Errors
///
/// Any sqlx failure (mapped through `map_sqlx_error`) rolls the transaction
/// back and is returned to the caller; a failed commit is surfaced by
/// `commit_or_rollback`.
#[cfg(feature = "core")]
async fn create_flow_impl(
    pool: &SqlitePool,
    args: &CreateFlowArgs,
) -> Result<CreateFlowResult, EngineError> {
    // Single-writer SQLite keeps every flow on partition 0 (§4.1 A3).
    let part: i64 = 0;
    let flow_uuid: Uuid = args.flow_id.0;
    let created_at_ms = args.now.0;

    let seed_fields = serde_json::json!({
        "flow_kind": args.flow_kind,
        "namespace": args.namespace.as_str(),
        "node_count": 0,
        "edge_count": 0,
        "last_mutation_at_ms": created_at_ms,
    })
    .to_string();

    let mut conn = begin_immediate(pool).await?;
    let outcome = sqlx::query(q_flow::INSERT_FLOW_CORE_SQL)
        .bind(part)
        .bind(flow_uuid)
        .bind(created_at_ms)
        .bind(&seed_fields)
        .execute(&mut *conn)
        .await
        .map_err(map_sqlx_error);

    // Failure path first: undo the transaction and hand the error back.
    let done = match outcome {
        Ok(done) => done,
        Err(err) => {
            rollback_quiet(&mut conn).await;
            return Err(err);
        }
    };
    commit_or_rollback(&mut conn).await?;

    // Zero affected rows means the insert was a no-op for this flow id.
    let flow_id = args.flow_id.clone();
    Ok(if done.rows_affected() > 0 {
        CreateFlowResult::Created { flow_id }
    } else {
        CreateFlowResult::AlreadySatisfied { flow_id }
    })
}
1686
/// Attaches an existing execution to an existing flow inside one immediate
/// transaction.
///
/// Outcomes:
///
/// * `AlreadyMember` — the execution's `flow_id` already equals this flow;
///   `node_count` is read from the flow's cached `raw_fields` JSON
///   (falling back to 0 when absent or unparseable).
/// * `Added` — the execution's `flow_id` was stamped and the flow's node
///   counter bumped; `new_node_count` is read back after the bump.
///
/// # Errors
///
/// `Validation(InvalidInput)` with one of the details
/// `execution partition mismatch…`, `flow_not_found`,
/// `flow_already_terminal`, `execution_not_found` or
/// `already_member_of_different_flow:<uuid>`; sqlx failures are mapped via
/// `map_sqlx_error`. Every error rolls the transaction back.
#[cfg(feature = "core")]
async fn add_execution_to_flow_impl(
    pool: &SqlitePool,
    args: &AddExecutionToFlowArgs,
) -> Result<AddExecutionToFlowResult, EngineError> {
    /// Builds the `Validation(InvalidInput)` error used by every refusal below.
    fn invalid_input(detail: impl Into<String>) -> EngineError {
        EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: detail.into(),
        }
    }

    /// Flow states that refuse new members.
    const TERMINAL_FLOW_STATES: [&str; 4] = ["cancelled", "completed", "failed", "terminal"];

    let part: i64 = 0;
    let flow_uuid: Uuid = args.flow_id.0;
    let (exec_part, exec_uuid) = split_exec_id(&args.execution_id)?;
    // Single-writer SQLite stores every entity on partition 0 (§4.1 A3);
    // an execution id naming any other partition can never be reached.
    if exec_part != part {
        return Err(invalid_input(format!(
            "execution partition mismatch: expected 0, got {exec_part}"
        )));
    }
    let now_ms = args.now.0;

    let mut conn = begin_immediate(pool).await?;
    let staged: Result<AddExecutionToFlowResult, EngineError> = async {
        // Flow header: must exist and must not be terminal.
        let flow_row = sqlx::query(q_flow_staging::SELECT_FLOW_CORE_FOR_STAGE_SQL)
            .bind(part)
            .bind(flow_uuid)
            .fetch_optional(&mut *conn)
            .await
            .map_err(map_sqlx_error)?
            .ok_or_else(|| invalid_input("flow_not_found"))?;
        let flow_state: String = flow_row
            .try_get("public_flow_state")
            .map_err(map_sqlx_error)?;
        if TERMINAL_FLOW_STATES.contains(&flow_state.as_str()) {
            return Err(invalid_input("flow_already_terminal"));
        }
        let flow_raw_fields: String = flow_row.try_get("raw_fields").map_err(map_sqlx_error)?;

        // Execution row: must exist; its `flow_id` is the membership back-pointer.
        let exec_row = sqlx::query(q_flow_staging::SELECT_EXEC_FLOW_ID_SQL)
            .bind(part)
            .bind(exec_uuid)
            .fetch_optional(&mut *conn)
            .await
            .map_err(map_sqlx_error)?
            .ok_or_else(|| invalid_input("execution_not_found"))?;
        let current_flow: Option<Uuid> = exec_row.try_get("flow_id").map_err(map_sqlx_error)?;

        match current_flow {
            // Idempotent replay: already on this flow. The node count comes
            // from the raw_fields we already loaded, sparing a second SELECT.
            Some(owner) if owner == flow_uuid => {
                let node_count = serde_json::from_str::<serde_json::Value>(&flow_raw_fields)
                    .ok()
                    .and_then(|doc| doc.get("node_count").and_then(serde_json::Value::as_u64))
                    .and_then(|n| u32::try_from(n).ok())
                    .unwrap_or(0);
                return Ok(AddExecutionToFlowResult::AlreadyMember {
                    execution_id: args.execution_id.clone(),
                    node_count,
                });
            }
            // Owned by some other flow: refuse rather than re-parent.
            Some(other) => {
                return Err(invalid_input(format!(
                    "already_member_of_different_flow:{other}"
                )));
            }
            None => {}
        }

        // Unattached execution: stamp the back-pointer, bump the flow's
        // node counter, then read the counter back for the result.
        sqlx::query(q_flow_staging::UPDATE_EXEC_SET_FLOW_ID_SQL)
            .bind(part)
            .bind(exec_uuid)
            .bind(flow_uuid)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;
        sqlx::query(q_flow_staging::BUMP_FLOW_NODE_COUNT_SQL)
            .bind(part)
            .bind(flow_uuid)
            .bind(now_ms)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;
        let bumped: i64 = sqlx::query_scalar(q_flow_staging::SELECT_FLOW_NODE_COUNT_SQL)
            .bind(part)
            .bind(flow_uuid)
            .fetch_one(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;

        Ok(AddExecutionToFlowResult::Added {
            execution_id: args.execution_id.clone(),
            new_node_count: u32::try_from(bumped.max(0)).unwrap_or(0),
        })
    }
    .await;

    let outcome = match staged {
        Ok(outcome) => outcome,
        Err(err) => {
            rollback_quiet(&mut conn).await;
            return Err(err);
        }
    };
    commit_or_rollback(&mut conn).await?;
    Ok(outcome)
}
1815
/// Stages a dependency edge between two members of a flow inside one
/// immediate transaction.
///
/// The flow's graph revision is bumped with a compare-and-swap against
/// `args.expected_graph_revision`; both executions must already be
/// members of the flow; the edge row is then inserted with a freshly
/// built policy document (`edge_state = "pending"`, `applied_at_ms = null`).
/// On success returns `Staged` with the post-bump graph revision.
///
/// # Errors
///
/// * `Validation(InvalidInput)` — `self_referencing_edge`, partition
///   mismatch, `flow_not_found`, `flow_already_terminal`,
///   `execution_not_in_flow`, or `dependency_already_exists:edge_id=<uuid>`.
/// * `Contention(StaleGraphRevision)` — the CAS matched no row although the
///   flow exists and is not terminal.
/// * sqlx failures mapped via `map_sqlx_error`.
///
/// Every error rolls the transaction back, including the revision bump.
#[cfg(feature = "core")]
async fn stage_dependency_edge_impl(
    pool: &SqlitePool,
    args: &StageDependencyEdgeArgs,
) -> Result<StageDependencyEdgeResult, EngineError> {
    /// Builds the `Validation(InvalidInput)` error used by every refusal below.
    fn invalid_input(detail: impl Into<String>) -> EngineError {
        EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: detail.into(),
        }
    }

    /// Flow states that refuse new edges.
    const TERMINAL_FLOW_STATES: [&str; 4] = ["cancelled", "completed", "failed", "terminal"];

    if args.upstream_execution_id == args.downstream_execution_id {
        return Err(invalid_input("self_referencing_edge"));
    }

    let part: i64 = 0;
    let flow_uuid: Uuid = args.flow_id.0;
    let edge_uuid: Uuid = args.edge_id.0;
    let (up_part, upstream_uuid) = split_exec_id(&args.upstream_execution_id)?;
    let (down_part, downstream_uuid) = split_exec_id(&args.downstream_execution_id)?;
    if !(up_part == part && down_part == part) {
        return Err(invalid_input(
            "execution partition mismatch under single-writer SQLite",
        ));
    }
    let now_ms = args.now.0;
    // A revision too large for i64 saturates; it then cannot match any stored value.
    let expected_rev = i64::try_from(args.expected_graph_revision).unwrap_or(i64::MAX);

    let mut conn = begin_immediate(pool).await?;
    let staged: Result<StageDependencyEdgeResult, EngineError> = async {
        // CAS-bump the flow revision; zero affected rows means the WHERE
        // clause did not match.
        let bump = sqlx::query(q_flow_staging::CAS_BUMP_FLOW_REV_SQL)
            .bind(part)
            .bind(flow_uuid)
            .bind(expected_rev)
            .bind(now_ms)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;
        if bump.rows_affected() == 0 {
            // Probe the header to tell apart: missing flow, terminal flow,
            // or a stale expected revision.
            let probe = sqlx::query(q_flow_staging::SELECT_FLOW_REV_AND_STATE_SQL)
                .bind(part)
                .bind(flow_uuid)
                .fetch_optional(&mut *conn)
                .await
                .map_err(map_sqlx_error)?;
            let Some(header) = probe else {
                return Err(invalid_input("flow_not_found"));
            };
            let state: String = header
                .try_get("public_flow_state")
                .map_err(map_sqlx_error)?;
            return Err(if TERMINAL_FLOW_STATES.contains(&state.as_str()) {
                invalid_input("flow_already_terminal")
            } else {
                EngineError::Contention(ContentionKind::StaleGraphRevision)
            });
        }

        // Both endpoints must already belong to this flow.
        let members =
            sqlx::query_scalar::<_, Uuid>(q_flow_staging::SELECT_FLOW_MEMBERSHIP_PAIR_SQL)
                .bind(part)
                .bind(flow_uuid)
                .bind(upstream_uuid)
                .bind(downstream_uuid)
                .fetch_all(&mut *conn)
                .await
                .map_err(map_sqlx_error)?;
        let both_present = members.contains(&upstream_uuid) && members.contains(&downstream_uuid);
        if !both_present {
            return Err(invalid_input("execution_not_in_flow"));
        }

        // Insert the edge with its initial policy document.
        let edge_policy = serde_json::json!({
            "dependency_kind": args.dependency_kind,
            "satisfaction_condition": "all_required",
            "data_passing_ref": args.data_passing_ref.clone().unwrap_or_default(),
            "edge_state": "pending",
            "created_at_ms": now_ms,
            "created_by": "engine",
            "staged_at_ms": now_ms,
            "applied_at_ms": serde_json::Value::Null,
        })
        .to_string();
        let inserted = sqlx::query(q_flow_staging::INSERT_EDGE_SQL)
            .bind(part)
            .bind(flow_uuid)
            .bind(edge_uuid)
            .bind(upstream_uuid)
            .bind(downstream_uuid)
            .bind(&edge_policy)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;
        if inserted.rows_affected() == 0 {
            // The edge already exists. PG reports this as
            // `Conflict(DependencyAlreadyExists { existing })`, which needs
            // the existing `EdgeSnapshot`; SQLite has no `describe_edge`
            // reader wired yet (Phase 2b.2), so the conflict is surfaced as
            // a Validation error naming the edge id. Tighten to the Conflict
            // variant once `describe_edge` lands.
            return Err(invalid_input(format!(
                "dependency_already_exists:edge_id={edge_uuid}"
            )));
        }

        // Report the revision as stored after the bump.
        let stored_rev: i64 = sqlx::query_scalar::<_, i64>(
            "SELECT graph_revision FROM ff_flow_core \
             WHERE partition_key = ?1 AND flow_id = ?2",
        )
        .bind(part)
        .bind(flow_uuid)
        .fetch_one(&mut *conn)
        .await
        .map_err(map_sqlx_error)?;

        Ok(StageDependencyEdgeResult::Staged {
            edge_id: args.edge_id.clone(),
            new_graph_revision: u64::try_from(stored_rev).unwrap_or(0),
        })
    }
    .await;

    let outcome = match staged {
        Ok(outcome) => outcome,
        Err(err) => {
            rollback_quiet(&mut conn).await;
            return Err(err);
        }
    };
    commit_or_rollback(&mut conn).await?;
    Ok(outcome)
}
1967
/// Marks a staged dependency edge as applied and registers it with the
/// downstream execution's edge group, inside one immediate transaction.
///
/// When the stored edge policy already carries an integer `applied_at_ms`
/// the call is a no-op and returns `AlreadyApplied`. Otherwise the policy
/// is stamped with `applied_at_ms = args.now` and `edge_state = "applied"`,
/// the edge group for the downstream execution is upserted (default group
/// policy `{"kind": "all_of"}`), and the count read back through
/// `SELECT_EDGE_GROUP_RUNNING_COUNT_SQL` is returned as
/// `Applied { unsatisfied_count }` (negative values clamp to 0).
///
/// # Errors
///
/// * `Validation(InvalidInput)` — the downstream execution id is not on
///   partition 0, or the edge row does not exist (`edge_not_found`).
/// * `Validation(Corruption)` — the stored policy is not valid JSON, or is
///   valid JSON but not an object.
/// * sqlx failures mapped via `map_sqlx_error`.
///
/// Every error rolls the transaction back.
#[cfg(feature = "core")]
async fn apply_dependency_to_child_impl(
    pool: &SqlitePool,
    args: &ApplyDependencyToChildArgs,
) -> Result<ApplyDependencyToChildResult, EngineError> {
    let part: i64 = 0;
    let flow_uuid: Uuid = args.flow_id.0;
    let edge_uuid: Uuid = args.edge_id.0;
    let (down_part, downstream_uuid) = split_exec_id(&args.downstream_execution_id)?;
    if down_part != part {
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: "execution partition mismatch under single-writer SQLite".into(),
        });
    }
    let now_ms = args.now.0;

    let mut conn = begin_immediate(pool).await?;
    let work: Result<ApplyDependencyToChildResult, EngineError> = async {
        // 1. Load the edge row.
        let row = sqlx::query(q_flow_staging::SELECT_EDGE_POLICY_SQL)
            .bind(part)
            .bind(flow_uuid)
            .bind(edge_uuid)
            .fetch_optional(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;
        let Some(row) = row else {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: "edge_not_found".into(),
            });
        };
        let policy_text: String = row.try_get("policy").map_err(map_sqlx_error)?;
        let mut policy: serde_json::Value =
            serde_json::from_str(&policy_text).map_err(|e| EngineError::Validation {
                kind: ValidationKind::Corruption,
                detail: format!("ff_edge.policy: {e}"),
            })?;

        // 2. Idempotency: an integer `applied_at_ms` means a previous call
        //    already applied this edge.
        let already_applied = policy
            .get("applied_at_ms")
            .and_then(|v| v.as_i64())
            .is_some();
        if already_applied {
            return Ok(ApplyDependencyToChildResult::AlreadyApplied);
        }

        // 3. Mutate policy JSON. A policy that parses but is not an object
        //    cannot carry the `applied_at_ms` marker; continuing would
        //    rewrite it unchanged and upsert the edge group again on every
        //    retry, so treat it as corruption like the parse failure above.
        let Some(obj) = policy.as_object_mut() else {
            return Err(EngineError::Validation {
                kind: ValidationKind::Corruption,
                detail: "ff_edge.policy: expected a JSON object".into(),
            });
        };
        obj.insert("applied_at_ms".into(), serde_json::json!(now_ms));
        obj.insert("edge_state".into(), serde_json::json!("applied"));
        let new_policy_text = policy.to_string();
        sqlx::query(q_flow_staging::UPDATE_EDGE_POLICY_SQL)
            .bind(part)
            .bind(flow_uuid)
            .bind(edge_uuid)
            .bind(&new_policy_text)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;

        // 4. Upsert edge_group for the downstream execution.
        let default_group_policy = serde_json::json!({ "kind": "all_of" }).to_string();
        sqlx::query(q_flow_staging::UPSERT_EDGE_GROUP_APPLY_SQL)
            .bind(part)
            .bind(flow_uuid)
            .bind(downstream_uuid)
            .bind(&default_group_policy)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;

        // 5. Read post-upsert running_count.
        let unsatisfied: i64 =
            sqlx::query_scalar(q_flow_staging::SELECT_EDGE_GROUP_RUNNING_COUNT_SQL)
                .bind(part)
                .bind(flow_uuid)
                .bind(downstream_uuid)
                .fetch_one(&mut *conn)
                .await
                .map_err(map_sqlx_error)?;

        Ok(ApplyDependencyToChildResult::Applied {
            unsatisfied_count: u32::try_from(unsatisfied.max(0)).unwrap_or(0),
        })
    }
    .await;

    match work {
        Ok(res) => {
            commit_or_rollback(&mut conn).await?;
            Ok(res)
        }
        Err(e) => {
            rollback_quiet(&mut conn).await;
            Err(e)
        }
    }
}
2070
2071fn cancel_policy_to_str(p: CancelFlowPolicy) -> &'static str {
2072    match p {
2073        CancelFlowPolicy::FlowOnly => "cancel_flow_only",
2074        CancelFlowPolicy::CancelAll => "cancel_all",
2075        CancelFlowPolicy::CancelPending => "cancel_pending",
2076        // Forward-compat for additive `CancelFlowPolicy` variants.
2077        // Per the project's non-exhaustive-enum rule (confirmed by
2078        // cursor-bugbot learned-rule #dc768b31; cf. Valkey backend fix
2079        // PR #114), destructive-action wildcards MUST default to the
2080        // LEAST-destructive variant — widening cancel scope
2081        // irreversibly destroys work, while narrowing is safely
2082        // retryable by the caller. The PG reference at
2083        // `ff-backend-postgres/src/flow.rs:525-534` takes the wider
2084        // default; SQLite intentionally diverges here to match
2085        // Valkey's correctness posture.
2086        _ => "cancel_flow_only",
2087    }
2088}
2089
2090async fn cancel_flow_impl(
2091    pool: &SqlitePool,
2092    pubsub: &PubSub,
2093    id: &FlowId,
2094    policy: CancelFlowPolicy,
2095) -> Result<CancelFlowResult, EngineError> {
2096    let part: i64 = 0;
2097    let flow_uuid: Uuid = id.0;
2098    let policy_str = cancel_policy_to_str(policy);
2099    let now_ms = now_ms();
2100
2101    let mut conn = begin_immediate(pool).await?;
2102    let work: Result<(CancelFlowResult, Vec<PendingEmit>), EngineError> = async {
2103        // Step 1 — flip flow_core.
2104        let flip = sqlx::query(q_flow::UPDATE_FLOW_CORE_CANCEL_SQL)
2105            .bind(part)
2106            .bind(flow_uuid)
2107            .bind(now_ms)
2108            .bind(policy_str)
2109            .execute(&mut *conn)
2110            .await
2111            .map_err(map_sqlx_error)?;
2112
2113        if flip.rows_affected() == 0 {
2114            // Flow not found — return idempotent empty-member success
2115            // matching PG at `ff-backend-postgres/src/flow.rs:635-641`.
2116            return Ok((
2117                CancelFlowResult::Cancelled {
2118                    cancellation_policy: policy_str.to_owned(),
2119                    member_execution_ids: Vec::new(),
2120                },
2121                Vec::new(),
2122            ));
2123        }
2124
2125        // Step 2 — enumerate + cancel members.
2126        let member_rows: Vec<Uuid> = if matches!(policy, CancelFlowPolicy::FlowOnly) {
2127            Vec::new()
2128        } else {
2129            let sql = match policy {
2130                CancelFlowPolicy::CancelPending => q_flow::SELECT_FLOW_MEMBERS_CANCEL_PENDING_SQL,
2131                _ => q_flow::SELECT_FLOW_MEMBERS_CANCEL_ALL_SQL,
2132            };
2133            sqlx::query_scalar::<_, Uuid>(sql)
2134                .bind(part)
2135                .bind(flow_uuid)
2136                .fetch_all(&mut *conn)
2137                .await
2138                .map_err(map_sqlx_error)?
2139        };
2140
2141        let mut member_execution_ids: Vec<String> = Vec::with_capacity(member_rows.len());
2142        let mut emits: Vec<PendingEmit> = Vec::new();
2143        for exec_uuid in &member_rows {
2144            sqlx::query(q_flow::UPDATE_EXEC_CORE_CANCEL_MEMBER_SQL)
2145                .bind(part)
2146                .bind(exec_uuid)
2147                .bind(now_ms)
2148                .execute(&mut *conn)
2149                .await
2150                .map_err(map_sqlx_error)?;
2151
2152            // #355: clear the current attempt's `outcome` so a later
2153            // `read_execution_info` doesn't surface a stale
2154            // terminal-outcome on a cancelled row. PG parallel in
2155            // `ff-backend-postgres/src/flow.rs` cancel-member loop.
2156            sqlx::query(q_flow::UPDATE_ATTEMPT_CLEAR_OUTCOME_FOR_CURRENT_SQL)
2157                .bind(part)
2158                .bind(exec_uuid)
2159                .execute(&mut *conn)
2160                .await
2161                .map_err(map_sqlx_error)?;
2162
2163            // Completion outbox + lease-revoked outbox — mirror of PG
2164            // at `ff-backend-postgres/src/flow.rs:688-716`.
2165            let completion_ev =
2166                insert_completion_event_ev(&mut conn, part, *exec_uuid, "cancelled", now_ms)
2167                    .await?;
2168            emits.push((OutboxChannel::Completion, completion_ev));
2169            let lease_ev =
2170                insert_lease_event(&mut conn, part, *exec_uuid, "revoked", now_ms).await?;
2171            emits.push((OutboxChannel::LeaseHistory, lease_ev));
2172
2173            member_execution_ids.push(format!("{{fp:{part}}}:{exec_uuid}"));
2174        }
2175
2176        // Step 3 — CancelPending bookkeeping.
2177        if matches!(policy, CancelFlowPolicy::CancelPending) {
2178            sqlx::query(q_flow::INSERT_PENDING_CANCEL_GROUPS_SQL)
2179                .bind(part)
2180                .bind(flow_uuid)
2181                .bind(now_ms)
2182                .execute(&mut *conn)
2183                .await
2184                .map_err(map_sqlx_error)?;
2185        }
2186
2187        Ok((
2188            CancelFlowResult::Cancelled {
2189                cancellation_policy: policy_str.to_owned(),
2190                member_execution_ids,
2191            },
2192            emits,
2193        ))
2194    }
2195    .await;
2196
2197    match work {
2198        Ok((res, emits)) => {
2199            commit_or_rollback(&mut conn).await?;
2200            dispatch_pending_emits(pubsub, &emits);
2201            Ok(res)
2202        }
2203        Err(e) => {
2204            rollback_quiet(&mut conn).await;
2205            Err(e)
2206        }
2207    }
2208}
2209
/// Internal shared state behind every [`SqliteBackend`] handle.
///
/// `Arc<SqliteBackendInner>` is what the registry stores weak
/// references to and what `SqliteBackend` wraps, so all handles for
/// the same registry key share one pool, one pubsub and one scanner
/// slot.
pub(crate) struct SqliteBackendInner {
    /// Connection pool. Held live even when the trait-object surface
    /// isn't exercising it so Phase 2+ can migrate bodies without
    /// re-plumbing construction (hence the `dead_code` allowance).
    #[allow(dead_code)]
    pub(crate) pool: SqlitePool,
    /// Per-backend post-commit wakeup channels (Phase 2b.1 wiring).
    pub(crate) pubsub: PubSub,
    /// Registry key (canonical path or verbatim `:memory:` URI).
    /// Held for Drop-time cleanup if we need it in a future phase;
    /// today the `Weak` entries decay naturally, so outside of `Debug`
    /// output the field may go unread (hence the `dead_code` allowance).
    #[allow(dead_code)]
    pub(crate) key: PathBuf,
    /// Sentinel connection for shared-cache `:memory:` databases.
    /// SQLite drops a shared-cache in-memory DB the moment the last
    /// connection referencing it closes; the pool recycles idle
    /// connections, so without a pinned sentinel the schema + data
    /// would silently reset between pool checkouts. `None` for
    /// filesystem-backed databases where the file itself is the
    /// durable backing store.
    ///
    /// Held in a `Mutex` so `Drop` can take ownership — sqlx's
    /// `SqliteConnection::close` is async + consumes `self`, but we
    /// don't need graceful close here: process exit drops the
    /// in-memory DB regardless. Presence alone is what keeps the
    /// shared cache alive, which is why the field is never read
    /// (hence the `dead_code` allowance).
    #[allow(dead_code)]
    pub(crate) memory_sentinel: Option<std::sync::Mutex<Option<sqlx::SqliteConnection>>>,
    /// RFC-023 Phase 3.5: scanner supervisor handle. Installed at
    /// most once per backend instance (registry-shared inner) via
    /// [`SqliteBackend::with_scanners`]; drained on
    /// `EngineBackend::shutdown_prepare`. `OnceLock` so dedup clones
    /// that race on `with_scanners` produce at most one supervisor.
    pub(crate) scanner_handle: std::sync::OnceLock<crate::scanner_supervisor::SqliteScannerHandle>,
}
2248
2249/// RFC-023 §6.3 — v0.12 atomic Supports flag flip for the SQLite
2250/// dev-only backend.
2251///
2252/// Every flag whose backing trait method shipped in Phase 1-3 flips
2253/// `true` here; two flags remain `false`:
2254///
2255/// - `claim_for_worker` — RFC-023 §5 permanent non-goal (scheduler
2256///   routing is out of scope for the dev-only backend; SqliteBackend
2257///   exposes `claim` via handle but not the scheduler-routed surface).
2258/// - `subscribe_instance_tags` — #311 deferred on all backends;
2259///   cairn's `instance_tag_backfill` is served by `list_executions`
2260///   + `ScannerFilter::with_instance_tag`.
2261///
2262/// Mirrors the `postgres_supports_base()` shape in
2263/// `ff-backend-postgres/src/lib.rs` for consumer-copy-paste parity
2264/// per cairn #277. `Supports` is `#[non_exhaustive]` so we start from
2265/// [`Supports::none`] and mutate named fields.
2266fn sqlite_supports_base() -> Supports {
2267    let mut s = Supports::none();
2268
2269    // ── Flow bulk cancel (Phase 2b.1 Group A) ──
2270    // SqliteBackend::cancel_flow is always synchronous under the
2271    // single-writer model: every member flips in the same transaction
2272    // as the header. Both wait axes are callable (the `_wait` arg is
2273    // ignored — the result is always `Cancelled {..}` immediately).
2274    s.cancel_flow_wait_timeout = true;
2275    s.cancel_flow_wait_indefinite = true;
2276
2277    // ── Admin seed + rotate HMAC (Phase 2b.1) ──
2278    s.rotate_waitpoint_hmac_secret_all = true;
2279    s.seed_waitpoint_hmac_secret = true;
2280
2281    // ── RFC-019 subscriptions (Phase 3.1) ──
2282    s.subscribe_lease_history = true;
2283    s.subscribe_completion = true;
2284    s.subscribe_signal_delivery = true;
2285
2286    // ── Streaming (Phase 2b.2.2) ──
2287    s.stream_durable_summary = true;
2288    s.stream_best_effort_live = true;
2289
2290    // ── Boot ──
2291    // SqliteBackend::prepare() returns NoOp (migrations run inside
2292    // `SqliteBackend::new`, matching the PG posture); NoOp is a
2293    // callable + correct outcome, not Unavailable.
2294    s.prepare = true;
2295
2296    // ── Wave 9 (Phase 3.2-3.5) ──
2297    s.cancel_execution = true;
2298    s.change_priority = true;
2299    s.replay_execution = true;
2300    s.revoke_lease = true;
2301    s.read_execution_state = true;
2302    s.read_execution_info = true;
2303    s.get_execution_result = true;
2304    s.budget_admin = true;
2305    s.quota_admin = true;
2306    s.list_pending_waitpoints = true;
2307    s.cancel_flow_header = true;
2308    s.ack_cancel_member = true;
2309
2310    // ── RFC-025 worker registry (Phase 4) ──
2311    // SQLite bodies live in `crate::worker_registry`; all five flip
2312    // on here, matching the PG Phase-3 posture.
2313    s.register_worker = true;
2314    s.heartbeat_worker = true;
2315    s.mark_worker_dead = true;
2316    s.list_expired_leases = true;
2317    s.list_workers = true;
2318
2319    // ── FF #511 Phase 1 scheduler primitive ──
2320    s.release_admission = true;
2321    // ── FF #511 Phase 2a scheduler primitive ──
2322    s.read_quota_policy_limits = true;
2323
2324    // ── Stay `false` (see struct-level rustdoc above) ──
2325    // s.claim_for_worker — RFC-023 §5 non-goal
2326    // s.subscribe_instance_tags — #311 all-backends
2327
2328    s
2329}
2330
/// RFC-023 SQLite dev-only backend.
///
/// Construction demands `FF_DEV_MODE=1` (§4.5). Identical paths
/// within a process return the same handle via the §4.2 B6
/// registry.
///
/// Cloning is cheap: a clone only bumps the reference count of the
/// shared `Arc<SqliteBackendInner>`.
#[derive(Clone)]
pub struct SqliteBackend {
    // Shared state (pool, pubsub, registry key, …); every clone of this
    // handle points at the same `SqliteBackendInner`.
    inner: Arc<SqliteBackendInner>,
}
2340
2341impl std::fmt::Debug for SqliteBackend {
2342    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2343        f.debug_struct("SqliteBackend")
2344            .field("key", &self.inner.key)
2345            .finish()
2346    }
2347}
2348
2349impl SqliteBackend {
2350    /// RFC-023 Phase 1a entry point. `path` accepts a filesystem
2351    /// path, `:memory:`, or a `file:...?mode=memory&cache=shared`
2352    /// URI.
2353    ///
2354    /// Uses the [`SqliteServerConfig`] defaults (pool size 4, WAL on
2355    /// for file paths). For operator-tuned pool/WAL settings, call
2356    /// [`SqliteBackend::new_with_config`].
2357    ///
2358    /// [`SqliteServerConfig`]: ff_server::config::SqliteServerConfig
2359    ///
2360    /// # Errors
2361    ///
2362    /// * [`BackendError::RequiresDevMode`] when `FF_DEV_MODE` is
2363    ///   unset or not `"1"`.
2364    /// * [`BackendError::Valkey`] (historical name — the classifier
2365    ///   is backend-agnostic despite the variant name) when the
2366    ///   pool cannot be constructed.
2367    pub async fn new(path: &str) -> Result<Arc<Self>, BackendError> {
2368        Self::new_with_tuning(path, 4, true).await
2369    }
2370
2371    /// Operator-tuned entry point. `pool_size` sets the pool's max
2372    /// connections; `wal_mode` enables `PRAGMA journal_mode=WAL` for
2373    /// filesystem-backed databases (ignored for `:memory:` variants
2374    /// per RFC-023 §4.6).
2375    pub async fn new_with_tuning(
2376        path: &str,
2377        pool_size: u32,
2378        wal_mode: bool,
2379    ) -> Result<Arc<Self>, BackendError> {
2380        // §4.5 production guard — TYPE-level emission point (§3.3 A3).
2381        if std::env::var("FF_DEV_MODE").as_deref() != Ok("1") {
2382            return Err(BackendError::RequiresDevMode);
2383        }
2384
2385        // §4.2 B6: canonicalize the key. `:memory:` and
2386        // `file::memory:...` pass through verbatim (distinct per-URI
2387        // entries via embedded UUIDs). Filesystem paths resolve via
2388        // `fs::canonicalize` when the file exists; absent files fall
2389        // back to the raw path so two concurrent constructions before
2390        // file creation still dedup.
2391        //
2392        // F1: bare `:memory:` is rewritten to
2393        // `file::memory:?cache=shared` so a multi-connection pool shares
2394        // ONE in-memory database. Without this rewrite, each pool
2395        // connection opens its own private DB and tests see schema
2396        // mismatches silently. (sqlx infers URI mode from the `file:`
2397        // prefix, so no explicit `uri=true` query parameter is needed.)
2398        let is_memory = is_memory_uri(path);
2399        let effective_path: std::borrow::Cow<'_, str> = if path == ":memory:" {
2400            std::borrow::Cow::Borrowed("file::memory:?cache=shared")
2401        } else {
2402            std::borrow::Cow::Borrowed(path)
2403        };
2404
2405        let key = if is_memory {
2406            PathBuf::from(effective_path.as_ref())
2407        } else {
2408            std::fs::canonicalize(path).unwrap_or_else(|_| PathBuf::from(path))
2409        };
2410
2411        if let Some(existing) = registry::lookup(&key) {
2412            // F6: emit WARN only on first-time construction. Registry
2413            // hits are dedup clones; operators already saw the banner
2414            // when the original handle was built.
2415            return Ok(Arc::new(Self { inner: existing }));
2416        }
2417
2418        // Build the pool. sqlx's SqliteConnectOptions parses the full
2419        // URI form as well as plain paths. `create_if_missing` is
2420        // what embedded-test consumers expect.
2421        let opts: SqliteConnectOptions = effective_path
2422            .parse::<SqliteConnectOptions>()
2423            .map_err(|e| BackendError::Valkey {
2424                kind: ff_core::engine_error::BackendErrorKind::Protocol,
2425                message: format!("sqlite connect-opts parse for {path:?}: {e}"),
2426            })?
2427            .create_if_missing(true);
2428
2429        // F2: apply WAL for filesystem-backed DBs only. SQLite's WAL
2430        // is a no-op (and warns) for `:memory:` variants per RFC §4.6.
2431        let opts = if wal_mode && !is_memory {
2432            opts.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
2433        } else {
2434            opts
2435        };
2436
2437        // F2: pool size from config, default 4. Minimum 1 — sqlx
2438        // rejects 0 at pool-build time anyway.
2439        let pool_max = pool_size.max(1);
2440        let pool = SqlitePoolOptions::new()
2441            .max_connections(pool_max)
2442            .connect_with(opts.clone())
2443            .await
2444            .map_err(|e| BackendError::Valkey {
2445                kind: ff_core::engine_error::BackendErrorKind::Transport,
2446                message: format!("sqlite pool connect for {path:?}: {e}"),
2447            })?;
2448
2449        // F1: for shared-cache `:memory:` DBs, open a standalone
2450        // sentinel connection and hold it for the `Arc`'s lifetime.
2451        // The shared cache is torn down the moment the last connection
2452        // closes; without the sentinel, a pool-idle cycle (all 4
2453        // connections temporarily returned) would drop the DB between
2454        // test assertions.
2455        let memory_sentinel = if is_memory {
2456            use sqlx::ConnectOptions;
2457            let conn = opts.connect().await.map_err(|e| BackendError::Valkey {
2458                kind: ff_core::engine_error::BackendErrorKind::Transport,
2459                message: format!("sqlite sentinel connect for {path:?}: {e}"),
2460            })?;
2461            Some(std::sync::Mutex::new(Some(conn)))
2462        } else {
2463            None
2464        };
2465
2466        // F6: §3.3 WARN banner — now emitted AFTER registry-miss is
2467        // confirmed so dedup clones don't spam the log.
2468        tracing::warn!(
2469            "FlowFabric SQLite backend active (FF_DEV_MODE=1). \
2470             This backend is dev-only; single-writer, single-process, \
2471             not supported in production. See RFC-023."
2472        );
2473
2474        // RFC-023 Phase 1b: apply the 14 hand-ported SQLite-dialect
2475        // migrations against the freshly-constructed pool. `sqlx::migrate!`
2476        // embeds the files at compile time and records applied versions
2477        // in `_sqlx_migrations` so reruns are idempotent.
2478        sqlx::migrate!("./migrations")
2479            .run(&pool)
2480            .await
2481            .map_err(|e| BackendError::Valkey {
2482                kind: ff_core::engine_error::BackendErrorKind::Protocol,
2483                message: format!("sqlite migrate for {path:?}: {e}"),
2484            })?;
2485
2486        let inner = Arc::new(SqliteBackendInner {
2487            pool,
2488            pubsub: PubSub::new(),
2489            key: key.clone(),
2490            memory_sentinel,
2491            scanner_handle: std::sync::OnceLock::new(),
2492        });
2493        let inner = registry::insert(key, inner);
2494        Ok(Arc::new(Self { inner }))
2495    }
2496
2497    /// Accessor for Phase 2+ code that needs direct pool access
2498    /// without re-routing through the trait surface.
2499    #[allow(dead_code)]
2500    pub(crate) fn pool(&self) -> &SqlitePool {
2501        &self.inner.pool
2502    }
2503
2504    /// RFC-023 Phase 3.5: spawn the N=1 scanner supervisor
2505    /// (currently `budget_reset` only) as a background tick loop.
2506    /// Idempotent: the first caller wins; subsequent calls on the
2507    /// same registry-shared backend no-op. Drained on
2508    /// [`EngineBackend::shutdown_prepare`].
2509    ///
2510    /// Returns `true` if this call installed the supervisor,
2511    /// `false` if a supervisor was already present.
2512    pub fn with_scanners(&self, cfg: crate::scanner_supervisor::SqliteScannerConfig) -> bool {
2513        // Build outside the `OnceLock::set` call so we only spawn
2514        // tasks if we actually win the race to install.
2515        let mut result = false;
2516        let _ = self.inner.scanner_handle.get_or_init(|| {
2517            result = true;
2518            crate::scanner_supervisor::spawn_scanners(self.inner.pool.clone(), cfg)
2519        });
2520        result
2521    }
2522
2523    /// Test-only hook to drive the `budget_reset` reconciler
2524    /// synchronously against a fixed `now`. Hidden from rustdoc;
2525    /// exists so Phase 3.5 integration tests can verify reconciler
2526    /// semantics without waiting on wall-clock cadence ticks.
2527    #[doc(hidden)]
2528    pub async fn budget_reset_scan_tick_for_test(
2529        &self,
2530        now_ms: i64,
2531    ) -> Result<(u32, u32), EngineError> {
2532        let report = crate::reconcilers::budget_reset::scan_tick(&self.inner.pool, now_ms).await?;
2533        Ok((report.processed, report.errors))
2534    }
2535
2536    /// Test-only pool accessor. Hidden from rustdoc; not a stable
2537    /// API. Exists so the in-crate integration tests can verify
2538    /// pool-level behaviour (F1 shared-cache sentinel) without
2539    /// waiting for Phase 2 data-plane methods to land.
2540    #[doc(hidden)]
2541    pub fn pool_for_test(&self) -> &SqlitePool {
2542        &self.inner.pool
2543    }
2544
2545    /// Test-only subscribe helper — returns a `Receiver` for the
2546    /// completion-outbox broadcast channel so integration tests can
2547    /// assert that a `complete()` / `fail()` / `cancel_flow()` call
2548    /// wakes subscribers post-commit. The production `subscribe_*`
2549    /// surface lands in Phase 2b.2; this accessor is narrow + hidden
2550    /// so it doesn't leak into the public API.
2551    #[doc(hidden)]
2552    pub fn subscribe_completion_for_test(
2553        &self,
2554    ) -> tokio::sync::broadcast::Receiver<crate::pubsub::OutboxEvent> {
2555        self.inner.pubsub.completion.subscribe()
2556    }
2557
2558    /// Test-only broadcast accessor for the `stream_frame` channel.
2559    /// Exposed so Phase 2b.2.2 `outbox_cursor::tests` can subscribe
2560    /// before driving `append_frame`.
2561    #[doc(hidden)]
2562    #[cfg(test)]
2563    pub(crate) fn stream_frame_receiver_for_test(
2564        &self,
2565    ) -> tokio::sync::broadcast::Receiver<crate::pubsub::OutboxEvent> {
2566        self.inner.pubsub.stream_frame.subscribe()
2567    }
2568}
2569
2570#[async_trait]
2571impl EngineBackend for SqliteBackend {
2572    // ── Lifecycle ──
2573
2574    /// RFC-023 Phase 3.5: drain the scanner supervisor (if
2575    /// installed) up to `grace`. Matches the PG backend's
2576    /// `shutdown_prepare` contract — bounded best-effort drain,
2577    /// never returns an error.
2578    async fn shutdown_prepare(&self, grace: Duration) -> Result<(), EngineError> {
2579        if let Some(handle) = self.inner.scanner_handle.get() {
2580            let timed_out = handle.shutdown(grace).await;
2581            if timed_out > 0 {
2582                tracing::warn!(
2583                    timed_out,
2584                    ?grace,
2585                    "sqlite scanner supervisor exceeded grace on shutdown"
2586                );
2587            }
2588        }
2589        Ok(())
2590    }
2591
2592    // ── Claim + lifecycle ──
2593
2594    async fn claim(
2595        &self,
2596        lane: &LaneId,
2597        capabilities: &CapabilitySet,
2598        policy: ClaimPolicy,
2599    ) -> Result<Option<Handle>, EngineError> {
2600        let pool = &self.inner.pool;
2601        let pubsub = &self.inner.pubsub;
2602        retry_serializable(|| claim_impl(pool, pubsub, lane, capabilities, &policy)).await
2603    }
2604
2605    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError> {
2606        let pool = &self.inner.pool;
2607        let pubsub = &self.inner.pubsub;
2608        retry_serializable(|| renew_impl(pool, pubsub, handle)).await
2609    }
2610
2611    // ── PR-7b / #453: typed-FCALL trait methods ──
2612
2613    async fn renew_lease(
2614        &self,
2615        args: ff_core::contracts::RenewLeaseArgs,
2616    ) -> Result<ff_core::contracts::RenewLeaseResult, EngineError> {
2617        let pool = &self.inner.pool;
2618        let pubsub = &self.inner.pubsub;
2619        retry_serializable(|| crate::typed_ops::renew_lease(pool, pubsub, args.clone())).await
2620    }
2621
2622    async fn complete_execution(
2623        &self,
2624        args: ff_core::contracts::CompleteExecutionArgs,
2625    ) -> Result<ff_core::contracts::CompleteExecutionResult, EngineError> {
2626        let pool = &self.inner.pool;
2627        let pubsub = &self.inner.pubsub;
2628        retry_serializable(|| crate::typed_ops::complete_execution(pool, pubsub, args.clone()))
2629            .await
2630    }
2631
2632    async fn fail_execution(
2633        &self,
2634        args: ff_core::contracts::FailExecutionArgs,
2635    ) -> Result<ff_core::contracts::FailExecutionResult, EngineError> {
2636        let pool = &self.inner.pool;
2637        let pubsub = &self.inner.pubsub;
2638        retry_serializable(|| crate::typed_ops::fail_execution(pool, pubsub, args.clone())).await
2639    }
2640
2641    async fn resume_execution(
2642        &self,
2643        args: ff_core::contracts::ResumeExecutionArgs,
2644    ) -> Result<ff_core::contracts::ResumeExecutionResult, EngineError> {
2645        let pool = &self.inner.pool;
2646        let pubsub = &self.inner.pubsub;
2647        retry_serializable(|| crate::typed_ops::resume_execution(pool, pubsub, args.clone())).await
2648    }
2649
2650    async fn evaluate_flow_eligibility(
2651        &self,
2652        args: ff_core::contracts::EvaluateFlowEligibilityArgs,
2653    ) -> Result<ff_core::contracts::EvaluateFlowEligibilityResult, EngineError> {
2654        // Read-only; no retry_serializable needed.
2655        crate::typed_ops::evaluate_flow_eligibility(&self.inner.pool, args).await
2656    }
2657
2658    async fn claim_execution(
2659        &self,
2660        args: ff_core::contracts::ClaimExecutionArgs,
2661    ) -> Result<ff_core::contracts::ClaimExecutionResult, EngineError> {
2662        let pool = &self.inner.pool;
2663        let pubsub = &self.inner.pubsub;
2664        let pc = ff_core::partition::PartitionConfig::default();
2665        retry_serializable(|| {
2666            crate::typed_ops::claim_execution(pool, &pc, pubsub, args.clone())
2667        })
2668        .await
2669    }
2670
2671    async fn check_admission(
2672        &self,
2673        quota_policy_id: &ff_core::types::QuotaPolicyId,
2674        _dimension: &str,
2675        args: ff_core::contracts::CheckAdmissionArgs,
2676    ) -> Result<ff_core::contracts::CheckAdmissionResult, EngineError> {
2677        let pool = &self.inner.pool;
2678        // `partition_config` is ignored inside the body on SQLite
2679        // (single-writer, partition_key=0); accepted for cross-backend
2680        // signature parity. See typed_ops::check_admission rustdoc.
2681        let pc = ff_core::partition::PartitionConfig::default();
2682        retry_serializable(|| {
2683            crate::typed_ops::check_admission(pool, &pc, quota_policy_id, args.clone())
2684        })
2685        .await
2686    }
2687
2688    async fn progress(
2689        &self,
2690        handle: &Handle,
2691        percent: Option<u8>,
2692        message: Option<String>,
2693    ) -> Result<(), EngineError> {
2694        let pool = &self.inner.pool;
2695        retry_serializable(|| progress_impl(pool, handle, percent, message.clone())).await
2696    }
2697
2698    async fn append_frame(
2699        &self,
2700        handle: &Handle,
2701        frame: Frame,
2702    ) -> Result<AppendFrameOutcome, EngineError> {
2703        let pool = &self.inner.pool;
2704        let pubsub = &self.inner.pubsub;
2705        retry_serializable(|| append_frame_impl(pool, pubsub, handle, frame.clone())).await
2706    }
2707
2708    async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError> {
2709        let pool = &self.inner.pool;
2710        let pubsub = &self.inner.pubsub;
2711        retry_serializable(|| complete_impl(pool, pubsub, handle, payload.clone())).await
2712    }
2713
2714    async fn fail(
2715        &self,
2716        handle: &Handle,
2717        reason: FailureReason,
2718        classification: FailureClass,
2719    ) -> Result<FailOutcome, EngineError> {
2720        let pool = &self.inner.pool;
2721        let pubsub = &self.inner.pubsub;
2722        retry_serializable(|| fail_impl(pool, pubsub, handle, reason.clone(), classification)).await
2723    }
2724
2725    async fn cancel(&self, _handle: &Handle, _reason: &str) -> Result<(), EngineError> {
2726        unavailable("sqlite.cancel")
2727    }
2728
2729    async fn suspend(
2730        &self,
2731        handle: &Handle,
2732        args: SuspendArgs,
2733    ) -> Result<SuspendOutcome, EngineError> {
2734        let pool = &self.inner.pool;
2735        let pubsub = &self.inner.pubsub;
2736        retry_serializable(|| crate::suspend_ops::suspend_impl(pool, pubsub, handle, args.clone()))
2737            .await
2738    }
2739
2740    async fn suspend_by_triple(
2741        &self,
2742        exec_id: ExecutionId,
2743        triple: LeaseFence,
2744        args: SuspendArgs,
2745    ) -> Result<SuspendOutcome, EngineError> {
2746        let pool = &self.inner.pool;
2747        let pubsub = &self.inner.pubsub;
2748        retry_serializable(|| {
2749            crate::suspend_ops::suspend_by_triple_impl(
2750                pool,
2751                pubsub,
2752                exec_id.clone(),
2753                triple.clone(),
2754                args.clone(),
2755            )
2756        })
2757        .await
2758    }
2759
2760    async fn create_waitpoint(
2761        &self,
2762        handle: &Handle,
2763        waitpoint_key: &str,
2764        expires_in: Duration,
2765    ) -> Result<PendingWaitpoint, EngineError> {
2766        let pool = &self.inner.pool;
2767        retry_serializable(|| {
2768            crate::suspend_ops::create_waitpoint_impl(pool, handle, waitpoint_key, expires_in)
2769        })
2770        .await
2771    }
2772
2773    #[cfg(feature = "core")]
2774    async fn read_waitpoint_token(
2775        &self,
2776        partition: PartitionKey,
2777        waitpoint_id: &ff_core::types::WaitpointId,
2778    ) -> Result<Option<String>, EngineError> {
2779        crate::reads::read_waitpoint_token_impl(&self.inner.pool, &partition, waitpoint_id).await
2780    }
2781
2782    async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError> {
2783        let pool = &self.inner.pool;
2784        retry_serializable(|| crate::suspend_ops::observe_signals_impl(pool, handle)).await
2785    }
2786
2787    async fn claim_from_resume_grant(&self, token: ResumeToken) -> Result<Option<Handle>, EngineError> {
2788        let pool = &self.inner.pool;
2789        let pubsub = &self.inner.pubsub;
2790        retry_serializable(|| claim_from_reclaim_impl(pool, pubsub, &token)).await
2791    }
2792
2793    async fn issue_reclaim_grant(
2794        &self,
2795        args: IssueReclaimGrantArgs,
2796    ) -> Result<IssueReclaimGrantOutcome, EngineError> {
2797        let pool = &self.inner.pool;
2798        retry_serializable(|| crate::reclaim::issue_reclaim_grant_impl(pool, &args)).await
2799    }
2800
2801    async fn reclaim_execution(
2802        &self,
2803        args: ReclaimExecutionArgs,
2804    ) -> Result<ReclaimExecutionOutcome, EngineError> {
2805        let pool = &self.inner.pool;
2806        let pubsub = &self.inner.pubsub;
2807        retry_serializable(|| crate::reclaim::reclaim_execution_impl(pool, pubsub, &args)).await
2808    }
2809
2810    async fn delay(&self, _handle: &Handle, _delay_until: TimestampMs) -> Result<(), EngineError> {
2811        unavailable("sqlite.delay")
2812    }
2813
2814    async fn wait_children(&self, _handle: &Handle) -> Result<(), EngineError> {
2815        unavailable("sqlite.wait_children")
2816    }
2817
2818    // ── Read / admin ──
2819
2820    async fn describe_execution(
2821        &self,
2822        _id: &ExecutionId,
2823    ) -> Result<Option<ExecutionSnapshot>, EngineError> {
2824        unavailable("sqlite.describe_execution")
2825    }
2826
2827    async fn read_execution_context(
2828        &self,
2829        execution_id: &ExecutionId,
2830    ) -> Result<ExecutionContext, EngineError> {
2831        crate::reads::read_execution_context_impl(&self.inner.pool, execution_id).await
2832    }
2833
2834    async fn read_current_attempt_index(
2835        &self,
2836        execution_id: &ExecutionId,
2837    ) -> Result<ff_core::types::AttemptIndex, EngineError> {
2838        crate::reads::read_current_attempt_index_impl(&self.inner.pool, execution_id).await
2839    }
2840
2841    async fn read_total_attempt_count(
2842        &self,
2843        execution_id: &ExecutionId,
2844    ) -> Result<ff_core::types::AttemptIndex, EngineError> {
2845        crate::reads::read_total_attempt_count_impl(&self.inner.pool, execution_id).await
2846    }
2847
2848    async fn describe_flow(&self, _id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError> {
2849        unavailable("sqlite.describe_flow")
2850    }
2851
2852    async fn set_execution_tag(
2853        &self,
2854        execution_id: &ExecutionId,
2855        key: &str,
2856        value: &str,
2857    ) -> Result<(), EngineError> {
2858        ff_core::engine_backend::validate_tag_key(key)?;
2859        crate::reads::set_execution_tag_impl(&self.inner.pool, execution_id, key, value).await
2860    }
2861
2862    async fn set_flow_tag(
2863        &self,
2864        flow_id: &FlowId,
2865        key: &str,
2866        value: &str,
2867    ) -> Result<(), EngineError> {
2868        ff_core::engine_backend::validate_tag_key(key)?;
2869        crate::reads::set_flow_tag_impl(&self.inner.pool, flow_id, key, value).await
2870    }
2871
2872    async fn get_execution_tag(
2873        &self,
2874        execution_id: &ExecutionId,
2875        key: &str,
2876    ) -> Result<Option<String>, EngineError> {
2877        ff_core::engine_backend::validate_tag_key(key)?;
2878        crate::reads::get_execution_tag_impl(&self.inner.pool, execution_id, key).await
2879    }
2880
2881    async fn get_flow_tag(
2882        &self,
2883        flow_id: &FlowId,
2884        key: &str,
2885    ) -> Result<Option<String>, EngineError> {
2886        ff_core::engine_backend::validate_tag_key(key)?;
2887        crate::reads::get_flow_tag_impl(&self.inner.pool, flow_id, key).await
2888    }
2889
2890    async fn get_execution_namespace(
2891        &self,
2892        execution_id: &ExecutionId,
2893    ) -> Result<Option<String>, EngineError> {
2894        crate::reads::get_execution_namespace_impl(&self.inner.pool, execution_id).await
2895    }
2896
2897    #[cfg(feature = "core")]
2898    async fn list_edges(
2899        &self,
2900        _flow_id: &FlowId,
2901        _direction: EdgeDirection,
2902    ) -> Result<Vec<EdgeSnapshot>, EngineError> {
2903        unavailable("sqlite.list_edges")
2904    }
2905
2906    #[cfg(feature = "core")]
2907    async fn describe_edge(
2908        &self,
2909        _flow_id: &FlowId,
2910        _edge_id: &EdgeId,
2911    ) -> Result<Option<EdgeSnapshot>, EngineError> {
2912        unavailable("sqlite.describe_edge")
2913    }
2914
2915    #[cfg(feature = "core")]
2916    async fn resolve_execution_flow_id(
2917        &self,
2918        _eid: &ExecutionId,
2919    ) -> Result<Option<FlowId>, EngineError> {
2920        unavailable("sqlite.resolve_execution_flow_id")
2921    }
2922
2923    #[cfg(feature = "core")]
2924    async fn list_flows(
2925        &self,
2926        _partition: PartitionKey,
2927        _cursor: Option<FlowId>,
2928        _limit: usize,
2929    ) -> Result<ListFlowsPage, EngineError> {
2930        unavailable("sqlite.list_flows")
2931    }
2932
2933    #[cfg(feature = "core")]
2934    async fn list_lanes(
2935        &self,
2936        _cursor: Option<LaneId>,
2937        _limit: usize,
2938    ) -> Result<ListLanesPage, EngineError> {
2939        unavailable("sqlite.list_lanes")
2940    }
2941
2942    #[cfg(feature = "core")]
2943    async fn list_suspended(
2944        &self,
2945        _partition: PartitionKey,
2946        _cursor: Option<ExecutionId>,
2947        _limit: usize,
2948    ) -> Result<ListSuspendedPage, EngineError> {
2949        unavailable("sqlite.list_suspended")
2950    }
2951
2952    #[cfg(feature = "core")]
2953    async fn list_executions(
2954        &self,
2955        _partition: PartitionKey,
2956        _cursor: Option<ExecutionId>,
2957        _limit: usize,
2958    ) -> Result<ListExecutionsPage, EngineError> {
2959        unavailable("sqlite.list_executions")
2960    }
2961
2962    #[cfg(feature = "core")]
2963    async fn deliver_signal(
2964        &self,
2965        args: DeliverSignalArgs,
2966    ) -> Result<DeliverSignalResult, EngineError> {
2967        let pool = &self.inner.pool;
2968        let pubsub = &self.inner.pubsub;
2969        retry_serializable(|| crate::suspend_ops::deliver_signal_impl(pool, pubsub, args.clone()))
2970            .await
2971    }
2972
2973    #[cfg(feature = "core")]
2974    async fn claim_resumed_execution(
2975        &self,
2976        args: ClaimResumedExecutionArgs,
2977    ) -> Result<ClaimResumedExecutionResult, EngineError> {
2978        let pool = &self.inner.pool;
2979        let pubsub = &self.inner.pubsub;
2980        retry_serializable(|| {
2981            crate::suspend_ops::claim_resumed_execution_impl(pool, pubsub, args.clone())
2982        })
2983        .await
2984    }
2985
2986    async fn cancel_flow(
2987        &self,
2988        id: &FlowId,
2989        policy: CancelFlowPolicy,
2990        _wait: CancelFlowWait,
2991    ) -> Result<CancelFlowResult, EngineError> {
2992        // RFC-023 Phase 2b.1 Group A — classic cancel_flow only. The
2993        // `wait` axis is a Valkey/PG async-dispatch concern (member
2994        // cancel fan-out); under single-writer SQLite every member
2995        // flip happens in the same transaction as the header flip, so
2996        // the result is always synchronous `Cancelled {..}`.
2997        let pool = &self.inner.pool;
2998        let pubsub = &self.inner.pubsub;
2999        retry_serializable(|| cancel_flow_impl(pool, pubsub, id, policy)).await
3000    }
3001
3002    #[cfg(feature = "core")]
3003    async fn set_edge_group_policy(
3004        &self,
3005        _flow_id: &FlowId,
3006        _downstream_execution_id: &ExecutionId,
3007        _policy: EdgeDependencyPolicy,
3008    ) -> Result<SetEdgeGroupPolicyResult, EngineError> {
3009        unavailable("sqlite.set_edge_group_policy")
3010    }
3011
3012    // ── RFC-020 Wave 9 — Budget + quota admin (Phase 3.4) ───────────
3013    //
3014    // Five admin methods (§4.4.1-§4.4.7) + `report_usage` hot-path are
3015    // extended to maintain the 0013 breach-counter columns
3016    // incrementally (RFC-020 Rev 6 §7.2 pin-lift). All write paths run
3017    // under `BEGIN IMMEDIATE` + `retry_serializable`; the single-writer
3018    // envelope replaces PG's `FOR NO KEY UPDATE` lock discipline.
3019
3020    async fn report_usage(
3021        &self,
3022        _handle: &Handle,
3023        budget: &BudgetId,
3024        dimensions: UsageDimensions,
3025    ) -> Result<ReportUsageResult, EngineError> {
3026        crate::budget::report_usage_impl(&self.inner.pool, budget, dimensions).await
3027    }
3028
3029    #[cfg(feature = "core")]
3030    async fn create_budget(
3031        &self,
3032        args: CreateBudgetArgs,
3033    ) -> Result<CreateBudgetResult, EngineError> {
3034        crate::budget::create_budget_impl(&self.inner.pool, args).await
3035    }
3036
3037    #[cfg(feature = "core")]
3038    async fn reset_budget(
3039        &self,
3040        args: ResetBudgetArgs,
3041    ) -> Result<ResetBudgetResult, EngineError> {
3042        crate::budget::reset_budget_impl(&self.inner.pool, args).await
3043    }
3044
3045    #[cfg(feature = "core")]
3046    async fn create_quota_policy(
3047        &self,
3048        args: CreateQuotaPolicyArgs,
3049    ) -> Result<CreateQuotaPolicyResult, EngineError> {
3050        crate::budget::create_quota_policy_impl(&self.inner.pool, args).await
3051    }
3052
3053    #[cfg(feature = "core")]
3054    async fn get_budget_status(
3055        &self,
3056        id: &BudgetId,
3057    ) -> Result<BudgetStatus, EngineError> {
3058        crate::budget::get_budget_status_impl(&self.inner.pool, id).await
3059    }
3060
3061    #[cfg(feature = "core")]
3062    async fn report_usage_admin(
3063        &self,
3064        budget_id: &BudgetId,
3065        args: ReportUsageAdminArgs,
3066    ) -> Result<ReportUsageResult, EngineError> {
3067        crate::budget::report_usage_admin_impl(&self.inner.pool, budget_id, args).await
3068    }
3069
3070    // ── cairn #454 Phase 5 — typed-FCALL bodies for SQLite ──────────
3071
3072    #[cfg(feature = "core")]
3073    async fn record_spend(
3074        &self,
3075        args: ff_core::contracts::RecordSpendArgs,
3076    ) -> Result<ReportUsageResult, EngineError> {
3077        let pool = &self.inner.pool;
3078        retry_serializable(|| crate::typed_ops::record_spend(pool, args.clone())).await
3079    }
3080
3081    #[cfg(feature = "core")]
3082    async fn release_budget(
3083        &self,
3084        args: ff_core::contracts::ReleaseBudgetArgs,
3085    ) -> Result<(), EngineError> {
3086        let pool = &self.inner.pool;
3087        retry_serializable(|| crate::typed_ops::release_budget(pool, args.clone())).await
3088    }
3089
3090    #[cfg(feature = "core")]
3091    async fn release_admission(
3092        &self,
3093        args: ff_core::contracts::ReleaseAdmissionArgs,
3094    ) -> Result<ff_core::contracts::ReleaseAdmissionResult, EngineError> {
3095        let pool = &self.inner.pool;
3096        retry_serializable(|| crate::typed_ops::release_admission(pool, args.clone())).await
3097    }
3098
3099    #[cfg(feature = "core")]
3100    async fn read_quota_policy_limits(
3101        &self,
3102        quota_policy_id: &ff_core::types::QuotaPolicyId,
3103    ) -> Result<Option<ff_core::contracts::QuotaPolicyLimits>, EngineError> {
3104        let pool = &self.inner.pool;
3105        crate::typed_ops::read_quota_policy_limits(pool, quota_policy_id).await
3106    }
3107
3108    #[cfg(feature = "core")]
3109    async fn deliver_approval_signal(
3110        &self,
3111        args: ff_core::contracts::DeliverApprovalSignalArgs,
3112    ) -> Result<ff_core::contracts::DeliverSignalResult, EngineError> {
3113        let pool = &self.inner.pool;
3114        let pubsub = &self.inner.pubsub;
3115        retry_serializable(|| {
3116            crate::typed_ops::deliver_approval_signal(pool, pubsub, args.clone())
3117        })
3118        .await
3119    }
3120
3121    #[cfg(feature = "core")]
3122    async fn issue_grant_and_claim(
3123        &self,
3124        args: ff_core::contracts::IssueGrantAndClaimArgs,
3125    ) -> Result<ff_core::contracts::ClaimGrantOutcome, EngineError> {
3126        let pool = &self.inner.pool;
3127        let pubsub = &self.inner.pubsub;
3128        retry_serializable(|| crate::typed_ops::issue_grant_and_claim(pool, pubsub, args.clone()))
3129            .await
3130    }
3131
3132    #[cfg(feature = "streaming")]
3133    async fn read_stream(
3134        &self,
3135        execution_id: &ExecutionId,
3136        attempt_index: AttemptIndex,
3137        from: StreamCursor,
3138        to: StreamCursor,
3139        count_limit: u64,
3140    ) -> Result<StreamFrames, EngineError> {
3141        let pool = &self.inner.pool;
3142        read_stream_impl(pool, execution_id, attempt_index, from, to, count_limit).await
3143    }
3144
3145    #[cfg(feature = "streaming")]
3146    async fn tail_stream(
3147        &self,
3148        execution_id: &ExecutionId,
3149        attempt_index: AttemptIndex,
3150        after: StreamCursor,
3151        block_ms: u64,
3152        count_limit: u64,
3153        visibility: TailVisibility,
3154    ) -> Result<StreamFrames, EngineError> {
3155        let pool = &self.inner.pool;
3156        let pubsub = &self.inner.pubsub;
3157        tail_stream_impl(
3158            pool,
3159            pubsub,
3160            execution_id,
3161            attempt_index,
3162            after,
3163            block_ms,
3164            count_limit,
3165            visibility,
3166        )
3167        .await
3168    }
3169
3170    #[cfg(feature = "streaming")]
3171    async fn read_summary(
3172        &self,
3173        execution_id: &ExecutionId,
3174        attempt_index: AttemptIndex,
3175    ) -> Result<Option<SummaryDocument>, EngineError> {
3176        let pool = &self.inner.pool;
3177        read_summary_impl(pool, execution_id, attempt_index).await
3178    }
3179
3180    // ── RFC-017 Stage A — Ingress (create + flow staging) ──
3181    //
3182    // Phase 2b.1 Group A lands 5 of the 9 ingress methods. The
3183    // remaining 4 (cancel_execution / change_priority /
3184    // replay_execution / plus the operator-event reads) land in
3185    // Phase 2b.2 alongside the Group B/C/D.2 scope.
3186
3187    #[cfg(feature = "core")]
3188    async fn create_execution(
3189        &self,
3190        args: CreateExecutionArgs,
3191    ) -> Result<CreateExecutionResult, EngineError> {
3192        let pool = &self.inner.pool;
3193        retry_serializable(|| create_execution_impl(pool, &args)).await
3194    }
3195
3196    #[cfg(feature = "core")]
3197    async fn create_flow(&self, args: CreateFlowArgs) -> Result<CreateFlowResult, EngineError> {
3198        let pool = &self.inner.pool;
3199        retry_serializable(|| create_flow_impl(pool, &args)).await
3200    }
3201
3202    #[cfg(feature = "core")]
3203    async fn add_execution_to_flow(
3204        &self,
3205        args: AddExecutionToFlowArgs,
3206    ) -> Result<AddExecutionToFlowResult, EngineError> {
3207        let pool = &self.inner.pool;
3208        retry_serializable(|| add_execution_to_flow_impl(pool, &args)).await
3209    }
3210
/// Stages a dependency edge by running `stage_dependency_edge_impl`
/// through `retry_serializable`; `args` is borrowed so the closure
/// stays re-invocable.
#[cfg(feature = "core")]
async fn stage_dependency_edge(
    &self,
    args: StageDependencyEdgeArgs,
) -> Result<StageDependencyEdgeResult, EngineError> {
    let db = &self.inner.pool;
    let request = &args;
    retry_serializable(|| stage_dependency_edge_impl(db, request)).await
}
3219
/// Applies a dependency to a child execution by running
/// `apply_dependency_to_child_impl` through `retry_serializable`;
/// `args` is borrowed so the closure stays re-invocable.
#[cfg(feature = "core")]
async fn apply_dependency_to_child(
    &self,
    args: ApplyDependencyToChildArgs,
) -> Result<ApplyDependencyToChildResult, EngineError> {
    let db = &self.inner.pool;
    let request = &args;
    retry_serializable(|| apply_dependency_to_child_impl(db, request)).await
}
3228
3229    // ── RFC-020 Wave 9 — Operator control (Phase 3.2) ────────────────
3230    //
3231    // Each body lives in `crate::operator` and follows the §4.2 shared
3232    // spine adapted for SQLite (BEGIN IMMEDIATE + WHERE-clause CAS +
3233    // post-commit broadcast). Outbox rows populate namespace +
3234    // instance_tag via co-transactional SELECT so tag-filtered
3235    // subscribers do not silently drop events.
3236
/// Cancels an execution.
///
/// Forwards to `crate::operator::cancel_execution_impl`, handing it
/// the backend's pool and pubsub handle along with the owned `args`.
#[cfg(feature = "core")]
async fn cancel_execution(
    &self,
    args: CancelExecutionArgs,
) -> Result<CancelExecutionResult, EngineError> {
    let inner = &self.inner;
    crate::operator::cancel_execution_impl(&inner.pool, &inner.pubsub, args).await
}
3244
/// Revokes a lease.
///
/// Forwards to `crate::operator::revoke_lease_impl`, handing it the
/// backend's pool and pubsub handle along with the owned `args`.
#[cfg(feature = "core")]
async fn revoke_lease(
    &self,
    args: RevokeLeaseArgs,
) -> Result<RevokeLeaseResult, EngineError> {
    let inner = &self.inner;
    crate::operator::revoke_lease_impl(&inner.pool, &inner.pubsub, args).await
}
3252
/// Changes an execution's priority.
///
/// Forwards to `crate::operator::change_priority_impl`, handing it
/// the backend's pool and pubsub handle along with the owned `args`.
#[cfg(feature = "core")]
async fn change_priority(
    &self,
    args: ChangePriorityArgs,
) -> Result<ChangePriorityResult, EngineError> {
    let inner = &self.inner;
    crate::operator::change_priority_impl(&inner.pool, &inner.pubsub, args).await
}
3260
/// Replays an execution.
///
/// Forwards to `crate::operator::replay_execution_impl`, handing it
/// the backend's pool and pubsub handle along with the owned `args`.
#[cfg(feature = "core")]
async fn replay_execution(
    &self,
    args: ReplayExecutionArgs,
) -> Result<ReplayExecutionResult, EngineError> {
    let inner = &self.inner;
    crate::operator::replay_execution_impl(&inner.pool, &inner.pubsub, args).await
}
3268
3269    // ── RFC-020 Wave 9 — Read model (Phase 3.3) ──────────────────────
3270    //
3271    // Three read-only methods paralleling PG §4.1. Normalisation
3272    // helpers collapse storage-tier lifecycle/state literals to the
3273    // `serde_snake_case` wire form; unknown tokens surface
3274    // `Corruption`. `get_execution_result` has current-attempt
3275    // semantics per RFC-020 Rev 7 Fork 3.
3276
/// Looks up the public state of an execution.
///
/// Forwards to `crate::reads::read_execution_state_impl` with the
/// backend's pool; the `Option` in the result is passed through
/// untouched.
#[cfg(feature = "core")]
async fn read_execution_state(
    &self,
    id: &ExecutionId,
) -> Result<Option<PublicState>, EngineError> {
    let db = &self.inner.pool;
    crate::reads::read_execution_state_impl(db, id).await
}
3284
/// Looks up the `ExecutionInfo` record of an execution.
///
/// Forwards to `crate::reads::read_execution_info_impl` with the
/// backend's pool; the `Option` in the result is passed through
/// untouched.
#[cfg(feature = "core")]
async fn read_execution_info(
    &self,
    id: &ExecutionId,
) -> Result<Option<ExecutionInfo>, EngineError> {
    let db = &self.inner.pool;
    crate::reads::read_execution_info_impl(db, id).await
}
3292
3293    async fn get_execution_result(
3294        &self,
3295        id: &ExecutionId,
3296    ) -> Result<Option<Vec<u8>>, EngineError> {
3297        crate::reads::get_execution_result_impl(&self.inner.pool, id).await
3298    }
3299
3300    // ── RFC-020 Wave 9 — Cancel-flow split (Phase 3.3) ───────────────
3301    //
3302    // `cancel_flow_header` is the atomic flow-state flip + member
3303    // enumeration; `ack_cancel_member` is the drain of one member +
3304    // parent-delete when empty. The Server composes these with its
3305    // wait/async machinery to build the wire-level
3306    // [`CancelFlowResult`]. `ack_cancel_member` is silent on the
3307    // outbox (RFC-020 §4.2.7 Valkey-parity).
3308
/// Runs the header half of the cancel-flow split.
///
/// Forwards to `crate::operator::cancel_flow_header_impl`, handing
/// it the backend's pool and pubsub handle along with the owned
/// `args`.
#[cfg(feature = "core")]
async fn cancel_flow_header(
    &self,
    args: CancelFlowArgs,
) -> Result<CancelFlowHeader, EngineError> {
    let inner = &self.inner;
    crate::operator::cancel_flow_header_impl(&inner.pool, &inner.pubsub, args).await
}
3316
/// Acknowledges cancellation of one flow member.
///
/// Forwards to `crate::operator::ack_cancel_member_impl`. That
/// helper takes the identifiers by value, so both borrowed ids are
/// cloned first. No pubsub handle is passed on this path.
#[cfg(feature = "core")]
async fn ack_cancel_member(
    &self,
    flow_id: &FlowId,
    execution_id: &ExecutionId,
) -> Result<(), EngineError> {
    let db = &self.inner.pool;
    let flow = flow_id.clone();
    let member = execution_id.clone();
    crate::operator::ack_cancel_member_impl(db, flow, member).await
}
3330
3331    // ── RFC-020 Wave 9 — list_pending_waitpoints (Phase 3.3) ─────────
3332
/// Lists pending waitpoints.
///
/// Forwards to `crate::suspend_ops::list_pending_waitpoints_impl`
/// with the backend's pool and the owned `args`.
#[cfg(feature = "core")]
async fn list_pending_waitpoints(
    &self,
    args: ListPendingWaitpointsArgs,
) -> Result<ListPendingWaitpointsResult, EngineError> {
    let db = &self.inner.pool;
    crate::suspend_ops::list_pending_waitpoints_impl(db, args).await
}
3340
3341    // ── RFC-019 Stage B/C — subscribe_* (Phase 3.1) ──────────────────
3342    //
3343    // Each method wraps the Phase 2b.2.2
3344    // [`crate::outbox_cursor::OutboxCursorReader`] primitive against
3345    // the matching outbox table + broadcast channel. Cursor encoding,
3346    // `ScannerFilter` semantics, and event-type → typed-variant
3347    // mapping all mirror the Postgres reference in
3348    // `ff-backend-postgres/src/{lease,signal_delivery}_subscribe.rs`
3349    // so cross-backend consumers see identical shapes.
3350    //
3351    // `subscribe_instance_tags` stays on the trait default
3352    // (`Unavailable`) per RFC-020 §3.2 / the #311 deferral.
3353
3354    async fn subscribe_completion(
3355        &self,
3356        cursor: ff_core::stream_subscribe::StreamCursor,
3357        filter: &ff_core::backend::ScannerFilter,
3358    ) -> Result<ff_core::stream_events::CompletionSubscription, EngineError> {
3359        let pool = self.inner.pool.clone();
3360        let wakeup = self.inner.pubsub.completion.subscribe();
3361        crate::completion_subscribe::subscribe(pool, wakeup, cursor, filter.clone()).await
3362    }
3363
3364    async fn subscribe_lease_history(
3365        &self,
3366        cursor: ff_core::stream_subscribe::StreamCursor,
3367        filter: &ff_core::backend::ScannerFilter,
3368    ) -> Result<ff_core::stream_events::LeaseHistorySubscription, EngineError> {
3369        let pool = self.inner.pool.clone();
3370        let wakeup = self.inner.pubsub.lease_history.subscribe();
3371        crate::lease_event_subscribe::subscribe(pool, wakeup, cursor, filter.clone()).await
3372    }
3373
3374    async fn subscribe_signal_delivery(
3375        &self,
3376        cursor: ff_core::stream_subscribe::StreamCursor,
3377        filter: &ff_core::backend::ScannerFilter,
3378    ) -> Result<ff_core::stream_events::SignalDeliverySubscription, EngineError> {
3379        let pool = self.inner.pool.clone();
3380        let wakeup = self.inner.pubsub.signal_delivery.subscribe();
3381        crate::signal_delivery_subscribe::subscribe(pool, wakeup, cursor, filter.clone()).await
3382    }
3383
3384    // ── HMAC secret management (RFC-023 Phase 2b.2.1) ──
3385
3386    async fn seed_waitpoint_hmac_secret(
3387        &self,
3388        args: SeedWaitpointHmacSecretArgs,
3389    ) -> Result<SeedOutcome, EngineError> {
3390        let pool = &self.inner.pool;
3391        retry_serializable(|| {
3392            crate::suspend_ops::seed_waitpoint_hmac_secret_impl(pool, args.clone())
3393        })
3394        .await
3395    }
3396
3397    async fn rotate_waitpoint_hmac_secret_all(
3398        &self,
3399        args: RotateWaitpointHmacSecretAllArgs,
3400    ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
3401        let pool = &self.inner.pool;
3402        retry_serializable(|| {
3403            crate::suspend_ops::rotate_waitpoint_hmac_secret_all_impl(pool, args.clone())
3404        })
3405        .await
3406    }
3407
3408    // ── RFC-018 capability discovery ──
3409
3410    fn backend_label(&self) -> &'static str {
3411        "sqlite"
3412    }
3413
3414    fn capabilities(&self) -> Capabilities {
3415        // RFC-023 §6.3: atomic flag-flip at the v0.12 release PR.
3416        // Phase 1-3 trait bodies shipped on `main` ahead of this PR; this
3417        // capabilities() snapshot flips every `Supports::*` flag whose
3418        // backing method ships (Wave 10 live), EXCEPT `claim_for_worker`
3419        // (§5 permanent non-goal — scheduler routing is out of scope for
3420        // the dev-only backend) and `subscribe_instance_tags` (#311 —
3421        // deferred on all backends; cairn's `instance_tag_backfill` is
3422        // served by `list_executions` + `ScannerFilter::with_instance_tag`).
3423        Capabilities::new(
3424            BackendIdentity::new(
3425                "sqlite",
3426                Version::new(
3427                    env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap_or(0),
3428                    env!("CARGO_PKG_VERSION_MINOR").parse().unwrap_or(0),
3429                    env!("CARGO_PKG_VERSION_PATCH").parse().unwrap_or(0),
3430                ),
3431                "Phase-4",
3432            ),
3433            sqlite_supports_base(),
3434        )
3435    }
3436
3437    async fn prepare(&self) -> Result<PrepareOutcome, EngineError> {
3438        // Phase 1a: no boot-time prep (no migrations yet). Phase 1b
3439        // applies the migrations inside `SqliteBackend::new` itself
3440        // rather than here, matching the PG posture.
3441        Ok(PrepareOutcome::NoOp)
3442    }
3443
3444    // ── PR-7b Wave 0a: exec_core field read ──
3445
3446    async fn read_exec_core_fields(
3447        &self,
3448        partition: ff_core::partition::Partition,
3449        execution_id: &ff_core::types::ExecutionId,
3450        fields: &[&str],
3451    ) -> Result<std::collections::HashMap<String, Option<String>>, EngineError> {
3452        if fields.is_empty() {
3453            return Ok(std::collections::HashMap::new());
3454        }
3455        let (part, exec_uuid) = split_exec_id(execution_id)?;
3456        if part as u16 != partition.index {
3457            return Err(EngineError::Validation {
3458                kind: ff_core::engine_error::ValidationKind::InvalidInput,
3459                detail: format!(
3460                    "read_exec_core_fields: partition mismatch (arg={}, eid={})",
3461                    partition.index, part
3462                ),
3463            });
3464        }
3465
3466        // Classify fields: canonical columns CAST to text; known
3467        // raw_fields JSON names via json_extract; unknown names → NULL.
3468        let mut projections: Vec<String> = Vec::with_capacity(fields.len());
3469        for field in fields {
3470            let expr = match *field {
3471                "lane_id" | "lifecycle_phase" | "ownership_state" | "eligibility_state"
3472                | "public_state" | "attempt_state" | "blocking_reason" | "cancellation_reason"
3473                | "cancelled_by" => format!("CAST({field} AS TEXT)"),
3474                "attempt_index" => "CAST(attempt_index AS TEXT)".to_string(),
3475                "flow_id" => "CAST(flow_id AS TEXT)".to_string(),
3476                "priority" => "CAST(priority AS TEXT)".to_string(),
3477                "created_at_ms" => "CAST(created_at_ms AS TEXT)".to_string(),
3478                "terminal_at_ms" => "CAST(terminal_at_ms AS TEXT)".to_string(),
3479                "deadline_at_ms" => "CAST(deadline_at_ms AS TEXT)".to_string(),
3480                "current_attempt_index" => "CAST(attempt_index AS TEXT)".to_string(),
3481                "completed_at" => "CAST(terminal_at_ms AS TEXT)".to_string(),
3482                "cancel_reason" => "CAST(cancellation_reason AS TEXT)".to_string(),
3483                "required_capabilities" => {
3484                    // Mirror PG CSV projection from junction table.
3485                    "(SELECT group_concat(capability, ',') \
3486                      FROM ff_execution_capabilities \
3487                      WHERE execution_id = ff_exec_core.execution_id)"
3488                        .to_string()
3489                }
3490                other => match other {
3491                    "current_waitpoint_id"
3492                    | "current_worker_instance_id"
3493                    | "budget_ids"
3494                    | "quota_policy_id" => {
3495                        format!("json_extract(raw_fields, '$.{other}')")
3496                    }
3497                    _ => "NULL".to_string(),
3498                },
3499            };
3500            projections.push(expr);
3501        }
3502        let projection_sql = projections.join(", ");
3503        let query = format!(
3504            "SELECT {projection_sql} FROM ff_exec_core \
3505             WHERE partition_key = ?1 AND execution_id = ?2"
3506        );
3507        let row_opt = sqlx::query(&query)
3508            .bind(part)
3509            .bind(exec_uuid)
3510            .fetch_optional(self.pool())
3511            .await
3512            .map_err(|e| EngineError::Transport {
3513                backend: "sqlite",
3514                source: format!("read_exec_core_fields: {e}").into(),
3515            })?;
3516
3517        let mut out = std::collections::HashMap::with_capacity(fields.len());
3518        if let Some(row) = row_opt {
3519            use sqlx::Row;
3520            for (idx, field) in fields.iter().enumerate() {
3521                let val: Option<String> =
3522                    row.try_get(idx).map_err(|e| EngineError::Transport {
3523                        backend: "sqlite",
3524                        source: format!("read_exec_core_fields[{field}]: {e}").into(),
3525                    })?;
3526                out.insert((*field).to_string(), val);
3527            }
3528        } else {
3529            for field in fields {
3530                out.insert((*field).to_string(), None);
3531            }
3532        }
3533        Ok(out)
3534    }
3535
3536    // ── PR-7b Wave 0a: clock primitive ──
3537
3538    async fn server_time_ms(&self) -> Result<u64, EngineError> {
3539        // julianday('now') returns UT1 days since 4714 BC noon.
3540        // 2440587.5 is julianday at Unix epoch.
3541        let ms: i64 = sqlx::query_scalar(
3542            "SELECT CAST((julianday('now') - 2440587.5) * 86400000 AS INTEGER)",
3543        )
3544        .fetch_one(self.pool())
3545        .await
3546        .map_err(|e| EngineError::Transport {
3547            backend: "sqlite",
3548            source: format!("server_time_ms: {e}").into(),
3549        })?;
3550        if ms < 0 {
3551            return Err(EngineError::Transport {
3552                backend: "sqlite",
3553                source: "server_time_ms: negative epoch".into(),
3554            });
3555        }
3556        Ok(ms as u64)
3557    }
3558
3559    // ── RFC-025 Phase 4 — worker registry ───────────────────────
3560    //
3561    // Bodies live in `crate::worker_registry`; overrides here
3562    // forward to those free functions. `#[cfg(feature = ...)]`
3563    // gates match the trait declarations in ff-core.
3564
/// Registers a worker.
///
/// Forwards to `crate::worker_registry::register_worker` with the
/// backend's pool; runs inside the `sqlite.register_worker` tracing
/// span with all arguments skipped.
#[cfg(feature = "core")]
#[tracing::instrument(name = "sqlite.register_worker", skip_all)]
async fn register_worker(
    &self,
    args: ff_core::contracts::RegisterWorkerArgs,
) -> Result<ff_core::contracts::RegisterWorkerOutcome, EngineError> {
    let db = &self.inner.pool;
    crate::worker_registry::register_worker(db, args).await
}
3573
/// Records a worker heartbeat.
///
/// Forwards to `crate::worker_registry::heartbeat_worker` with the
/// backend's pool; runs inside the `sqlite.heartbeat_worker` tracing
/// span with all arguments skipped.
#[cfg(feature = "core")]
#[tracing::instrument(name = "sqlite.heartbeat_worker", skip_all)]
async fn heartbeat_worker(
    &self,
    args: ff_core::contracts::HeartbeatWorkerArgs,
) -> Result<ff_core::contracts::HeartbeatWorkerOutcome, EngineError> {
    let db = &self.inner.pool;
    crate::worker_registry::heartbeat_worker(db, args).await
}
3582
/// Marks a worker as dead.
///
/// Forwards to `crate::worker_registry::mark_worker_dead` with the
/// backend's pool; runs inside the `sqlite.mark_worker_dead` tracing
/// span with all arguments skipped.
#[cfg(feature = "core")]
#[tracing::instrument(name = "sqlite.mark_worker_dead", skip_all)]
async fn mark_worker_dead(
    &self,
    args: ff_core::contracts::MarkWorkerDeadArgs,
) -> Result<ff_core::contracts::MarkWorkerDeadOutcome, EngineError> {
    let db = &self.inner.pool;
    crate::worker_registry::mark_worker_dead(db, args).await
}
3591
/// Lists leases that have expired.
///
/// Forwards to `crate::worker_registry::list_expired_leases` with
/// the backend's pool; runs inside the `sqlite.list_expired_leases`
/// tracing span with all arguments skipped.
// The body joins ff_attempt + ff_exec_core, which live under `core`.
// Both features are required to keep the body's dep chain intact,
// mirroring the PG `#[cfg(all(...))]` posture.
#[cfg(all(feature = "core", feature = "suspension"))]
#[tracing::instrument(name = "sqlite.list_expired_leases", skip_all)]
async fn list_expired_leases(
    &self,
    args: ff_core::contracts::ListExpiredLeasesArgs,
) -> Result<ff_core::contracts::ListExpiredLeasesResult, EngineError> {
    let db = &self.inner.pool;
    crate::worker_registry::list_expired_leases(db, args).await
}
3603
/// Lists registered workers.
///
/// Forwards to `crate::worker_registry::list_workers` with the
/// backend's pool; runs inside the `sqlite.list_workers` tracing
/// span with all arguments skipped.
#[cfg(feature = "core")]
#[tracing::instrument(name = "sqlite.list_workers", skip_all)]
async fn list_workers(
    &self,
    args: ff_core::contracts::ListWorkersArgs,
) -> Result<ff_core::contracts::ListWorkersResult, EngineError> {
    let db = &self.inner.pool;
    crate::worker_registry::list_workers(db, args).await
}
3612}
3613
#[cfg(test)]
mod tests {
    use super::is_memory_uri;

    /// Asserts that every URI in `uris` is classified as in-memory.
    fn expect_memory(uris: &[&str]) {
        for &uri in uris {
            assert!(is_memory_uri(uri), "expected in-memory: {uri}");
        }
    }

    /// Asserts that no URI in `uris` is classified as in-memory.
    fn expect_not_memory(uris: &[&str]) {
        for &uri in uris {
            assert!(!is_memory_uri(uri), "expected NOT in-memory: {uri}");
        }
    }

    /// #372 regression: `is_memory_uri` must detect the three in-memory
    /// URI forms the backend supports, including the RFC-023 §4.6
    /// recommended `file:<name>?mode=memory&cache=shared` test-
    /// isolation form. A miss on the third form caused WAL to be
    /// applied inappropriately and no sentinel connection to be held,
    /// so pool-idle cycles dropped the shared cache mid-test.
    #[test]
    fn is_memory_detects_all_uri_forms() {
        expect_memory(&[
            // Bare.
            ":memory:",
            // Short-form shared-cache URI.
            "file::memory:",
            "file::memory:?cache=shared",
            // §4.6 named form (the one #372 missed).
            "file:ff-test-abc123?mode=memory&cache=shared",
            "file:ff-test-00000000-0000-0000-0000-000000000000?mode=memory&cache=shared",
            // `mode` is not the first query parameter (`&` delimiter).
            "file:ff-test?cache=shared&mode=memory",
        ]);
        expect_not_memory(&[
            // Filesystem paths and unrelated URIs.
            "/tmp/ff.sqlite",
            "./ff.sqlite",
            "file:/tmp/ff.sqlite",
            "file:ff-test?cache=shared",
            // `mode=memory` appears in the filename, not as a query
            // parameter.
            "file:my_mode=memory_db.sqlite",
        ]);
    }

    /// A filename that merely contains the substring `mode=memory` is
    /// a persistent file path, not a URI query parameter, and must
    /// NOT be classified as in-memory.
    #[test]
    fn is_memory_uri_rejects_filename_with_mode_memory() {
        expect_not_memory(&[
            "file:my_mode=memory_db.sqlite",
            // Value-prefix mismatch: the parameter value starts with
            // `memory` but is not exactly `memory`.
            "file:foo?mode=memory_extra",
        ]);
    }

    /// Simple `?mode=memory` query parameter — the canonical form.
    #[test]
    fn is_memory_uri_accepts_query_param() {
        expect_memory(&["file:test?mode=memory"]);
    }

    /// RFC-023 §4.6 recommended test-isolation form — the original
    /// #372 regression case.
    #[test]
    fn is_memory_uri_accepts_shared_cache_form() {
        expect_memory(&[
            "file:ff-test-00000000-0000-0000-0000-000000000000?mode=memory&cache=shared",
        ]);
    }

    /// Plain file path (no query string) must not match.
    #[test]
    fn is_memory_uri_rejects_plain_file() {
        expect_not_memory(&["file:./data.db"]);
    }
}