ff-backend-sqlite 0.15.0

FlowFabric EngineBackend impl — SQLite dev-only backend (RFC-023, Phase 1a scaffold)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
//! RFC-025 Phase 4 — SQLite worker-registry bodies + TTL-sweep.
//!
//! Backs the five `EngineBackend` worker-registry methods
//! (`register_worker`, `heartbeat_worker`, `mark_worker_dead`,
//! `list_expired_leases`, `list_workers`) on SQLite, plus the
//! `worker_registry_ttl_sweep` reconciler that prunes rows whose
//! `last_heartbeat_ms + liveness_ttl_ms` has elapsed.
//!
//! Schema lives in `migrations/0021_worker_registry.sql`. Unlike
//! Postgres, SQLite is single-writer flat (RFC-023 §4.1 A3): no
//! `partition_key`, no HASH partitioning. `lanes` is stored as a
//! sorted-joined CSV since SQLite lacks `text[]`.
//!
//! Implementation notes mirror `ff-backend-postgres::worker_registry`
//! for behavioural parity; divergences are callouts on the call site.

use std::collections::BTreeSet;

use sqlx::{Row, SqlitePool};
#[cfg(feature = "suspension")]
use uuid::Uuid;

#[cfg(feature = "suspension")]
use ff_core::contracts::{
    ExpiredLeaseInfo, ExpiredLeasesCursor, LIST_EXPIRED_LEASES_DEFAULT_LIMIT,
    LIST_EXPIRED_LEASES_MAX_LIMIT, ListExpiredLeasesArgs, ListExpiredLeasesResult,
};
use ff_core::contracts::{
    HeartbeatWorkerArgs, HeartbeatWorkerOutcome, ListWorkersArgs, ListWorkersResult,
    MARK_WORKER_DEAD_REASON_MAX_BYTES, MarkWorkerDeadArgs, MarkWorkerDeadOutcome,
    RegisterWorkerArgs, RegisterWorkerOutcome, WorkerInfo,
};
use ff_core::engine_error::{EngineError, ValidationKind};
#[cfg(feature = "suspension")]
use ff_core::types::{AttemptIndex, ExecutionId, LeaseEpoch, LeaseId};
use ff_core::types::{LaneId, Namespace, TimestampMs, WorkerId, WorkerInstanceId};

use crate::errors::map_sqlx_error;
use crate::reconcilers::ScanReport;

/// SQLite has no stable `lease_id` column on `ff_attempt` (identity
/// is `(execution_id, attempt_index, lease_epoch)`). For
/// `list_expired_leases` we surface a `LeaseId` derived from that
/// triple so two callers (or cross-backend fixtures) observe the
/// same id. Must match the PG sibling byte-for-byte.
#[cfg(feature = "suspension")]
fn synthetic_lease_uuid(exec_uuid: Uuid, attempt_index: i32, lease_epoch: i64) -> Uuid {
    let mut bytes = *exec_uuid.as_bytes();
    let ai = attempt_index.to_be_bytes();
    let le = lease_epoch.to_be_bytes();
    for i in 0..4 {
        bytes[8 + i] ^= ai[i];
    }
    for i in 0..8 {
        bytes[i] ^= le[i];
    }
    Uuid::from_bytes(bytes)
}

// ── register_worker ──────────────────────────────────────────────

pub async fn register_worker(
    pool: &SqlitePool,
    args: RegisterWorkerArgs,
) -> Result<RegisterWorkerOutcome, EngineError> {
    let lanes_csv: String = args
        .lanes
        .iter()
        .map(|l| l.0.as_str())
        .collect::<Vec<&str>>()
        .join(",");
    let caps_csv: String = args
        .capabilities
        .iter()
        .cloned()
        .collect::<Vec<String>>()
        .join(",");

    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;

    // Preflight: (a) detect instance_id reassignment under a
    // different `worker_id` (reject per RFC-025 §4); (b) decide
    // Registered-vs-Refreshed from row presence — SQLite lacks the
    // PG `xmax = 0` upsert-origin signal, so the preflight SELECT
    // IS the signal.
    let existing_worker_id: Option<String> = sqlx::query_scalar(
        "SELECT worker_id FROM ff_worker_registry \
         WHERE namespace = ? AND worker_instance_id = ?",
    )
    .bind(args.namespace.as_str())
    .bind(args.worker_instance_id.as_str())
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    if let Some(existing) = existing_worker_id.as_deref()
        && existing != args.worker_id.as_str()
    {
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: "instance_id reassigned".into(),
        });
    }
    let was_present = existing_worker_id.is_some();

    sqlx::query(
        "INSERT INTO ff_worker_registry (\
             namespace, worker_instance_id, worker_id, \
             lanes, capabilities_csv, last_heartbeat_ms, liveness_ttl_ms, \
             registered_at_ms\
         ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) \
         ON CONFLICT(namespace, worker_instance_id) DO UPDATE SET \
             worker_id = excluded.worker_id, \
             lanes = excluded.lanes, \
             capabilities_csv = excluded.capabilities_csv, \
             last_heartbeat_ms = excluded.last_heartbeat_ms, \
             liveness_ttl_ms = excluded.liveness_ttl_ms",
    )
    .bind(args.namespace.as_str())
    .bind(args.worker_instance_id.as_str())
    .bind(args.worker_id.as_str())
    .bind(lanes_csv.as_str())
    .bind(caps_csv.as_str())
    .bind(args.now.0)
    .bind(args.liveness_ttl_ms as i64)
    .bind(args.now.0)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    sqlx::query(
        "INSERT OR IGNORE INTO ff_worker_registry_event \
             (namespace, worker_instance_id, event_kind, event_at_ms, reason) \
         VALUES (?, ?, 'registered', ?, NULL)",
    )
    .bind(args.namespace.as_str())
    .bind(args.worker_instance_id.as_str())
    .bind(args.now.0)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    tx.commit().await.map_err(map_sqlx_error)?;

    Ok(if was_present {
        RegisterWorkerOutcome::Refreshed
    } else {
        RegisterWorkerOutcome::Registered
    })
}

// ── heartbeat_worker ─────────────────────────────────────────────

pub async fn heartbeat_worker(
    pool: &SqlitePool,
    args: HeartbeatWorkerArgs,
) -> Result<HeartbeatWorkerOutcome, EngineError> {
    // Refresh `last_heartbeat_ms` only when the row is still within
    // its declared TTL window. If the TTL has lapsed, surface
    // `NotRegistered` — the sweep will drop it on its next tick.
    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;

    let ttl_row: Option<i64> = sqlx::query_scalar(
        "UPDATE ff_worker_registry SET last_heartbeat_ms = ?3 \
         WHERE namespace = ?1 AND worker_instance_id = ?2 \
           AND last_heartbeat_ms + liveness_ttl_ms > ?3 \
         RETURNING liveness_ttl_ms",
    )
    .bind(args.namespace.as_str())
    .bind(args.worker_instance_id.as_str())
    .bind(args.now.0)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    let Some(ttl_ms) = ttl_row else {
        tx.commit().await.map_err(map_sqlx_error)?;
        return Ok(HeartbeatWorkerOutcome::NotRegistered);
    };

    sqlx::query(
        "INSERT OR IGNORE INTO ff_worker_registry_event \
             (namespace, worker_instance_id, event_kind, event_at_ms, reason) \
         VALUES (?, ?, 'heartbeat', ?, NULL)",
    )
    .bind(args.namespace.as_str())
    .bind(args.worker_instance_id.as_str())
    .bind(args.now.0)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    tx.commit().await.map_err(map_sqlx_error)?;

    let next_expiry_ms = TimestampMs::from_millis(args.now.0.saturating_add(ttl_ms));
    Ok(HeartbeatWorkerOutcome::Refreshed { next_expiry_ms })
}

// ── mark_worker_dead ─────────────────────────────────────────────

pub async fn mark_worker_dead(
    pool: &SqlitePool,
    args: MarkWorkerDeadArgs,
) -> Result<MarkWorkerDeadOutcome, EngineError> {
    // Mirrors PG + Valkey validation: 256-byte cap + no control
    // characters.
    if args.reason.len() > MARK_WORKER_DEAD_REASON_MAX_BYTES {
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: format!(
                "reason: exceeds {} bytes (got {})",
                MARK_WORKER_DEAD_REASON_MAX_BYTES,
                args.reason.len()
            ),
        });
    }
    if args.reason.chars().any(|c| c.is_control()) {
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: "reason: must not contain control characters".into(),
        });
    }

    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;

    let deleted: Option<i64> = sqlx::query_scalar(
        "DELETE FROM ff_worker_registry \
         WHERE namespace = ? AND worker_instance_id = ? \
         RETURNING 1",
    )
    .bind(args.namespace.as_str())
    .bind(args.worker_instance_id.as_str())
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    if deleted.is_none() {
        tx.commit().await.map_err(map_sqlx_error)?;
        return Ok(MarkWorkerDeadOutcome::NotRegistered);
    }

    sqlx::query(
        "INSERT OR IGNORE INTO ff_worker_registry_event \
             (namespace, worker_instance_id, event_kind, event_at_ms, reason) \
         VALUES (?, ?, 'marked_dead', ?, ?)",
    )
    .bind(args.namespace.as_str())
    .bind(args.worker_instance_id.as_str())
    .bind(args.now.0)
    .bind(args.reason.as_str())
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    tx.commit().await.map_err(map_sqlx_error)?;
    Ok(MarkWorkerDeadOutcome::Marked)
}

// ── list_expired_leases ──────────────────────────────────────────

#[cfg(feature = "suspension")]
pub async fn list_expired_leases(
    pool: &SqlitePool,
    args: ListExpiredLeasesArgs,
) -> Result<ListExpiredLeasesResult, EngineError> {
    let limit = args
        .limit
        .unwrap_or(LIST_EXPIRED_LEASES_DEFAULT_LIMIT)
        .min(LIST_EXPIRED_LEASES_MAX_LIMIT) as i64;
    // SQLite has no ZSET fan-out knob; accepted-ignored.
    let _ = args.max_partitions_per_call;

    let (cursor_expiry, cursor_eid_str): (Option<i64>, Option<String>) = match args.after.as_ref() {
        Some(c) => (Some(c.expires_at_ms.0), Some(c.execution_id.to_string())),
        None => (None, None),
    };
    let namespace_filter: Option<&str> = args.namespace.as_ref().map(|n| n.as_str());

    // Join `ff_attempt` (lease state) with `ff_exec_core`
    // (partition-key reconstruction + namespace filter). Unlike PG
    // where `ff_exec_core.namespace` is a column in the outstanding
    // review trail, SQLite stashes `namespace` inside the
    // `raw_fields` JSON blob (matching `ff-backend-sqlite/queries/*`
    // convention); we project it via `json_extract`. The
    // `ff_attempt_lease_expiry_idx` partial index keys the scan.
    //
    // Cursor tuple `(lease_expires_at_ms, execution_id_text)` is
    // strict-greater so pagination is stable under equal-expiry.
    // Execution id is BLOB (16 bytes); cursor compare uses the same
    // textual form the caller receives (`"{fp:N}:uuid"`) — rebuild
    // before comparing.
    let rows = sqlx::query(
        "SELECT a.partition_key, a.execution_id, a.attempt_index, a.lease_epoch, \
                a.worker_instance_id, a.lease_expires_at_ms, \
                json_extract(c.raw_fields, '$.namespace') AS namespace \
           FROM ff_attempt a \
           JOIN ff_exec_core c \
             ON c.partition_key = a.partition_key AND c.execution_id = a.execution_id \
          WHERE a.lease_expires_at_ms IS NOT NULL \
            AND a.lease_expires_at_ms <= ?1 \
            AND a.worker_instance_id IS NOT NULL \
            AND c.public_state IN ('claimed', 'running') \
            AND (?2 IS NULL OR json_extract(c.raw_fields, '$.namespace') = ?2) \
            AND (?3 IS NULL \
                 OR (a.lease_expires_at_ms > ?3) \
                 OR (a.lease_expires_at_ms = ?3 \
                     AND ('{fp:' || a.partition_key || '}:' || \
                          substr(lower(hex(a.execution_id)), 1, 8) || '-' || \
                          substr(lower(hex(a.execution_id)), 9, 4) || '-' || \
                          substr(lower(hex(a.execution_id)), 13, 4) || '-' || \
                          substr(lower(hex(a.execution_id)), 17, 4) || '-' || \
                          substr(lower(hex(a.execution_id)), 21, 12)) > ?4)) \
          ORDER BY a.lease_expires_at_ms ASC, a.partition_key ASC, a.execution_id ASC \
          LIMIT ?5",
    )
    .bind(args.as_of.0)
    .bind(namespace_filter)
    .bind(cursor_expiry)
    .bind(cursor_eid_str.as_deref().unwrap_or(""))
    .bind(limit)
    .fetch_all(pool)
    .await
    .map_err(map_sqlx_error)?;

    let mut entries: Vec<ExpiredLeaseInfo> = Vec::with_capacity(rows.len());
    for row in &rows {
        let partition_key: i64 = row.try_get("partition_key").map_err(map_sqlx_error)?;
        let exec_uuid: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
        let attempt_index: i64 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
        let lease_epoch: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
        let worker_inst: String = row.try_get("worker_instance_id").map_err(map_sqlx_error)?;
        let expires_at_ms: i64 = row.try_get("lease_expires_at_ms").map_err(map_sqlx_error)?;

        let eid_str = format!("{{fp:{}}}:{}", partition_key, exec_uuid);
        let execution_id = ExecutionId::parse(&eid_str).map_err(|e| EngineError::Validation {
            kind: ValidationKind::Corruption,
            detail: format!("list_expired_leases: bad execution_id {eid_str:?}: {e}"),
        })?;

        let attempt_index_i32 = i32::try_from(attempt_index).unwrap_or(i32::MAX);
        let lease_id = LeaseId::from_uuid(synthetic_lease_uuid(
            exec_uuid,
            attempt_index_i32,
            lease_epoch,
        ));
        let attempt_index_u = u32::try_from(attempt_index.max(0)).unwrap_or(0);
        let lease_epoch_u = u64::try_from(lease_epoch.max(0)).unwrap_or(0);

        entries.push(ExpiredLeaseInfo::new(
            execution_id,
            lease_id,
            LeaseEpoch::new(lease_epoch_u),
            WorkerInstanceId::new(worker_inst),
            TimestampMs::from_millis(expires_at_ms),
            AttemptIndex::new(attempt_index_u),
        ));
    }

    let page_full = rows.len() as i64 >= limit;
    let cursor = if page_full {
        entries
            .last()
            .map(|e| ExpiredLeasesCursor::new(e.expires_at_ms, e.execution_id.clone()))
    } else {
        None
    };
    Ok(ListExpiredLeasesResult::new(entries, cursor))
}

// ── list_workers ─────────────────────────────────────────────────

pub async fn list_workers(
    pool: &SqlitePool,
    args: ListWorkersArgs,
) -> Result<ListWorkersResult, EngineError> {
    // RFC-025 §9.4: cross-namespace enumeration requires a two-key
    // cursor (namespace, worker_instance_id) because instance_ids
    // collide across namespaces. Phase-1's cursor is single-key
    // `Option<WorkerInstanceId>`. Mirror the PG Phase-3 decision:
    // reject cross-ns with `Unavailable` and let operator tooling
    // loop per namespace until a tuple-cursor RFC lands.
    let Some(ns) = args.namespace.as_ref() else {
        return Err(EngineError::Unavailable {
            op: "list_workers (cross-namespace on SQLite — pass namespace explicitly)",
        });
    };
    let limit = args.limit.unwrap_or(1000) as i64;
    let namespace_filter: &str = ns.as_str();
    let after_cursor: Option<&str> = args.after.as_ref().map(|w| w.as_str());

    let rows = sqlx::query(
        "SELECT worker_id, worker_instance_id, namespace, lanes, \
                capabilities_csv, last_heartbeat_ms, liveness_ttl_ms, registered_at_ms \
           FROM ff_worker_registry \
          WHERE namespace = ?1 \
            AND (?2 IS NULL OR worker_instance_id > ?2) \
          ORDER BY worker_instance_id ASC \
          LIMIT ?3",
    )
    .bind(namespace_filter)
    .bind(after_cursor)
    .bind(limit)
    .fetch_all(pool)
    .await
    .map_err(map_sqlx_error)?;

    let mut entries: Vec<WorkerInfo> = Vec::with_capacity(rows.len());
    for row in &rows {
        let worker_id: String = row.try_get("worker_id").map_err(map_sqlx_error)?;
        let worker_inst: String = row.try_get("worker_instance_id").map_err(map_sqlx_error)?;
        let namespace: String = row.try_get("namespace").map_err(map_sqlx_error)?;
        let lanes_csv: String = row.try_get("lanes").map_err(map_sqlx_error)?;
        let caps_csv: String = row.try_get("capabilities_csv").map_err(map_sqlx_error)?;
        let last_hb_ms: i64 = row.try_get("last_heartbeat_ms").map_err(map_sqlx_error)?;
        let liveness_ttl_ms: i64 = row.try_get("liveness_ttl_ms").map_err(map_sqlx_error)?;
        let registered_at_ms: i64 = row.try_get("registered_at_ms").map_err(map_sqlx_error)?;

        let lanes: BTreeSet<LaneId> = lanes_csv
            .split(',')
            .filter(|s| !s.is_empty())
            .map(|s| LaneId(s.to_owned()))
            .collect();
        let capabilities: BTreeSet<String> = caps_csv
            .split(',')
            .filter(|s| !s.is_empty())
            .map(str::to_owned)
            .collect();

        entries.push(WorkerInfo::new(
            WorkerId::new(worker_id),
            WorkerInstanceId::new(worker_inst),
            Namespace::new(namespace),
            lanes,
            capabilities,
            TimestampMs::from_millis(last_hb_ms),
            u64::try_from(liveness_ttl_ms.max(0)).unwrap_or(0),
            TimestampMs::from_millis(registered_at_ms),
        ));
    }

    let page_full = rows.len() as i64 >= limit;
    let cursor = if page_full {
        entries.last().map(|w| w.worker_instance_id.clone())
    } else {
        None
    };
    Ok(ListWorkersResult::new(entries, cursor))
}

// ── TTL-sweep scanner ────────────────────────────────────────────

/// Single-writer TTL sweep. Rows whose
/// `last_heartbeat_ms + liveness_ttl_ms < now_ms` are deleted and
/// a `ttl_swept` event appended to the audit log. SQLite's
/// WITH-CTE-DELETE-INSERT parses, but sqlite's WITH + DML support
/// on INSERTs with source CTEs via `INSERT ... SELECT FROM cte` is
/// tight; we use a two-statement transaction for simplicity.
///
/// Returns a `ScanReport` for the supervisor's per-tick log.
pub async fn ttl_sweep_tick(pool: &SqlitePool) -> Result<ScanReport, EngineError> {
    let now_ms: i64 = i64::try_from(
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis())
            .unwrap_or(0),
    )
    .unwrap_or(i64::MAX);

    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;

    // Capture swept rows via DELETE ... RETURNING for audit
    // logging, then fan them into ff_worker_registry_event. One
    // transaction = atomic: concurrent `mark_worker_dead` on the
    // same row either beats the sweep (DELETE finds zero rows) or
    // races after it (same outcome, different `event_kind`).
    let swept = sqlx::query(
        "DELETE FROM ff_worker_registry \
         WHERE last_heartbeat_ms + liveness_ttl_ms < ? \
         RETURNING namespace, worker_instance_id",
    )
    .bind(now_ms)
    .fetch_all(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    for row in &swept {
        let namespace: String = row.try_get("namespace").map_err(map_sqlx_error)?;
        let worker_inst: String = row.try_get("worker_instance_id").map_err(map_sqlx_error)?;
        sqlx::query(
            "INSERT OR IGNORE INTO ff_worker_registry_event \
                 (namespace, worker_instance_id, event_kind, event_at_ms, reason) \
             VALUES (?, ?, 'ttl_swept', ?, NULL)",
        )
        .bind(namespace)
        .bind(worker_inst)
        .bind(now_ms)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
    }

    tx.commit().await.map_err(map_sqlx_error)?;

    Ok(ScanReport {
        processed: u32::try_from(swept.len()).unwrap_or(u32::MAX),
        errors: 0,
    })
}