// ff_backend_postgres/stream.rs
//! Stream-family trait methods on the Postgres backend (RFC-015).
//!
//! This module houses the four stream methods called from
//! `EngineBackend` on [`crate::PostgresBackend`]:
//!
//! 1. [`append_frame`] — Durable, DurableSummary (with JSON Merge
//!    Patch apply), and BestEffortLive (EMA-driven dynamic MAXLEN).
//! 2. [`read_stream`] — XRANGE-equivalent over `(ts_ms, seq)`.
//! 3. [`tail_stream`] — blocking tail via the shared
//!    [`crate::listener::StreamNotifier`]; NO pg connection is held
//!    while parked (Q2).
//! 4. [`read_summary`] — reads the rolling DurableSummary document.

use std::collections::BTreeMap;
use std::time::Duration;

use ff_core::backend::{
    AppendFrameOutcome, Frame, FrameKind, Handle, PatchKind, StreamMode, SummaryDocument,
    TailVisibility,
};
use ff_core::contracts::{StreamCursor, StreamFrame, StreamFrames};
use ff_core::engine_error::{EngineError, ValidationKind};
use ff_core::partition::PartitionConfig;
use ff_core::types::{AttemptIndex, ExecutionId};
use serde_json::Value as Json;
use sqlx::{PgPool, Row};
use uuid::Uuid;

use crate::error::map_sqlx_error;
use crate::handle_codec::decode_handle;
use crate::listener::{channel_name, StreamNotifier};

33/// Extract the UUID tail of an `ExecutionId` ({fp:N}:<uuid>).
34fn exec_uuid(eid: &ExecutionId) -> Result<Uuid, EngineError> {
35    let s = eid.as_str();
36    // After the "}:" the rest is the uuid.
37    let after = s.split_once("}:").map(|(_, r)| r).unwrap_or(s);
38    Uuid::parse_str(after).map_err(|e| EngineError::Validation {
39        kind: ValidationKind::InvalidInput,
40        detail: format!("invalid execution_id uuid tail: {e}"),
41    })
42}
43
44fn partition_key_of(eid: &ExecutionId) -> i16 {
45    // The partition index is mod'd by 256 (migration schema uses
46    // HASH 256). Our PartitionConfig default is 256, so partition()
47    // already fits in smallint; downcast is safe.
48    (eid.partition() % 256) as i16
49}
50
51fn frame_type_of(frame: &Frame) -> String {
52    if frame.frame_type.is_empty() {
53        match frame.kind {
54            FrameKind::Stdout => "stdout",
55            FrameKind::Stderr => "stderr",
56            FrameKind::Event => "event",
57            FrameKind::Blob => "blob",
58            _ => "event",
59        }
60        .to_owned()
61    } else {
62        frame.frame_type.clone()
63    }
64}

/// Wall-clock milliseconds since the Unix epoch (0 if the clock reads
/// pre-epoch, which `duration_since` reports as an error).
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
73
74/// Build the `fields` jsonb for a frame — mirrors the valkey
75/// stream entry shape so downstream parsers stay symmetric.
76fn build_fields_json(frame: &Frame) -> Json {
77    let payload_str = String::from_utf8_lossy(&frame.bytes).into_owned();
78    let mut map = serde_json::Map::new();
79    map.insert("frame_type".into(), Json::String(frame_type_of(frame)));
80    map.insert("payload".into(), Json::String(payload_str));
81    map.insert("encoding".into(), Json::String("utf8".into()));
82    map.insert("source".into(), Json::String("worker".into()));
83    if let Some(corr) = &frame.correlation_id {
84        map.insert("correlation_id".into(), Json::String(corr.clone()));
85    }
86    Json::Object(map)
87}
88
89// ── JSON Merge Patch (RFC 7396) with ff sentinel rewrite ──
90
91/// Apply an RFC 7396 JSON Merge Patch. The `__ff_null__` sentinel is
92/// rewritten to JSON `null` at leaf positions so callers can express
93/// "set leaf to null" vs. "delete key" (7396 uses bare null for
94/// delete). Sentinel handling matches the Lua implementation.
95pub(crate) fn apply_json_merge_patch(target: &mut Json, patch: &Json) {
96    if let Json::Object(patch_map) = patch {
97        if !target.is_object() {
98            *target = Json::Object(serde_json::Map::new());
99        }
100        let target_map = target.as_object_mut().expect("just ensured object");
101        for (k, v) in patch_map {
102            match v {
103                Json::Null => {
104                    target_map.remove(k);
105                }
106                Json::String(s) if s == ff_core::backend::SUMMARY_NULL_SENTINEL => {
107                    target_map.insert(k.clone(), Json::Null);
108                }
109                Json::Object(_) => {
110                    let entry = target_map.entry(k.clone()).or_insert(Json::Null);
111                    apply_json_merge_patch(entry, v);
112                }
113                other => {
114                    target_map.insert(k.clone(), other.clone());
115                }
116            }
117        }
118    } else {
119        *target = patch.clone();
120    }
121}

// ── append_frame ──

/// Append one frame for `(execution_id, attempt_index)`, minting an
/// authoritative `(ts_ms, seq)` stream id inside a single transaction.
///
/// Mode-specific side effects, all inside the same transaction:
/// * `DurableSummary` — parses `frame.bytes` as a JSON Merge Patch and
///   folds it into `ff_stream_summary`, bumping `version`.
/// * `BestEffortLive` — updates the append-rate EMA in `ff_stream_meta`
///   and trims `ff_stream_frame` to a dynamic MAXLEN derived from it.
///
/// Returns the minted id (`"{ts_ms}-{seq}"`), the frame count after the
/// append, and — for summaries — the new summary version.
///
/// # Errors
/// `EngineError::Validation` when the handle cannot be decoded, the
/// execution-id tail is not a UUID, or summary patch bytes are not valid
/// JSON; otherwise sqlx errors mapped via `map_sqlx_error`.
pub async fn append_frame(
    pool: &PgPool,
    _partition_config: &PartitionConfig,
    handle: &Handle,
    frame: Frame,
) -> Result<AppendFrameOutcome, EngineError> {
    let payload = decode_handle(handle)?;
    let eid_uuid = exec_uuid(&payload.execution_id)?;
    let pkey = partition_key_of(&payload.execution_id);
    let aidx = payload.attempt_index.0 as i32;

    let mode_wire = frame.mode.wire_str();
    let fields = build_fields_json(&frame);

    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;

    // Authoritative (ts_ms, seq) minting: take the per-(eid, aidx)
    // advisory lock below, look up the current max seq for this ts_ms,
    // then INSERT. The lock — not a retry loop — is what prevents
    // duplicate (ts_ms, seq) pairs from concurrent appends within the
    // same millisecond; there is no unique-violation retry here.
    // NOTE(review): if the wall clock steps backwards between appends the
    // minted id can sort before already-inserted ones — confirm readers
    // tolerate non-monotonic ids.
    let ts_ms: i64 = now_ms();

    // Row-level serialisation via advisory xact lock keyed on
    // hash(eid_uuid, aidx). Cheap; released automatically at COMMIT.
    // NOTE(review): DefaultHasher's algorithm is not guaranteed stable
    // across Rust releases, so binaries built with different toolchains
    // could derive different lock keys for the same stream — verify this
    // is acceptable for mixed-version deployments.
    let lock_key: i64 = {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};
        let mut h = DefaultHasher::new();
        (eid_uuid.as_bytes(), aidx).hash(&mut h);
        h.finish() as i64
    };
    sqlx::query("SELECT pg_advisory_xact_lock($1)")
        .bind(lock_key)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

    // Seq = max(seq WHERE ts_ms = $ts_ms) + 1, default 0.
    let next_seq: i32 = sqlx::query_scalar::<_, Option<i32>>(
        "SELECT MAX(seq) FROM ff_stream_frame \
         WHERE partition_key=$1 AND execution_id=$2 \
         AND attempt_index=$3 AND ts_ms=$4",
    )
    .bind(pkey)
    .bind(eid_uuid)
    .bind(aidx)
    .bind(ts_ms)
    .fetch_one(&mut *tx)
    .await
    .map_err(map_sqlx_error)?
    .map(|s| s + 1)
    .unwrap_or(0);

    // The frame row proper; created_at_ms reuses the minted ts_ms.
    sqlx::query(
        "INSERT INTO ff_stream_frame \
         (partition_key, execution_id, attempt_index, ts_ms, seq, fields, mode, created_at_ms) \
         VALUES ($1,$2,$3,$4,$5,$6,$7,$8)",
    )
    .bind(pkey)
    .bind(eid_uuid)
    .bind(aidx)
    .bind(ts_ms)
    .bind(next_seq)
    .bind(&fields)
    .bind(mode_wire)
    .bind(ts_ms)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // Set only on the DurableSummary path below.
    let mut summary_version: Option<u64> = None;

    // DurableSummary: merge the frame's payload (as JSON) into
    // ff_stream_summary.document_json; bump version.
    if let StreamMode::DurableSummary { patch_kind } = &frame.mode {
        // Parse the frame payload as JSON — callers supply JSON
        // Merge Patch bytes.
        let patch: Json = serde_json::from_slice(&frame.bytes).map_err(|e| {
            EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("summary patch not valid JSON: {e}"),
            }
        })?;

        // UPSERT: fetch existing (FOR UPDATE, covered by the txn), merge
        // in-Rust, write back.
        let existing: Option<(Json, i32)> = sqlx::query_as(
            "SELECT document_json, version FROM ff_stream_summary \
             WHERE partition_key=$1 AND execution_id=$2 AND attempt_index=$3 \
             FOR UPDATE",
        )
        .bind(pkey)
        .bind(eid_uuid)
        .bind(aidx)
        .fetch_optional(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        // First patch starts from an empty document at version 0.
        let (mut doc, prev_version): (Json, i32) = match existing {
            Some((d, v)) => (d, v),
            None => (Json::Object(serde_json::Map::new()), 0),
        };

        // Only one patch kind exists on the wire today; any future kind
        // currently falls back to JSON Merge Patch semantics.
        match patch_kind {
            PatchKind::JsonMergePatch => apply_json_merge_patch(&mut doc, &patch),
            _ => apply_json_merge_patch(&mut doc, &patch),
        }

        let new_version = prev_version + 1;
        let patch_kind_wire = "json-merge-patch";
        // prev_version == 0 implies no row existed (stored versions start
        // at 1), so the INSERT/UPDATE choice branches on it.
        if prev_version == 0 {
            sqlx::query(
                "INSERT INTO ff_stream_summary \
                 (partition_key, execution_id, attempt_index, document_json, \
                  version, patch_kind, last_updated_ms, first_applied_ms) \
                 VALUES ($1,$2,$3,$4,$5,$6,$7,$8)",
            )
            .bind(pkey)
            .bind(eid_uuid)
            .bind(aidx)
            .bind(&doc)
            .bind(new_version)
            .bind(patch_kind_wire)
            .bind(ts_ms)
            .bind(ts_ms)
            .execute(&mut *tx)
            .await
            .map_err(map_sqlx_error)?;
        } else {
            sqlx::query(
                "UPDATE ff_stream_summary SET document_json=$4, version=$5, \
                  patch_kind=$6, last_updated_ms=$7 \
                 WHERE partition_key=$1 AND execution_id=$2 AND attempt_index=$3",
            )
            .bind(pkey)
            .bind(eid_uuid)
            .bind(aidx)
            .bind(&doc)
            .bind(new_version)
            .bind(patch_kind_wire)
            .bind(ts_ms)
            .execute(&mut *tx)
            .await
            .map_err(map_sqlx_error)?;
        }
        summary_version = Some(new_version as u64);
    }

    // BestEffortLive: update EMA + trim beyond dynamic MAXLEN.
    if let StreamMode::BestEffortLive { config } = &frame.mode {
        let meta: Option<(f64, i64)> = sqlx::query_as(
            "SELECT ema_rate_hz, last_append_ts_ms FROM ff_stream_meta \
             WHERE partition_key=$1 AND execution_id=$2 AND attempt_index=$3 \
             FOR UPDATE",
        )
        .bind(pkey)
        .bind(eid_uuid)
        .bind(aidx)
        .fetch_optional(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        // First append (or non-advancing clock) contributes a 0 Hz sample.
        let (ema_prev, last_ts) = meta.unwrap_or((0.0, 0));
        let inst_rate: f64 = if last_ts > 0 && ts_ms > last_ts {
            1000.0 / ((ts_ms - last_ts) as f64)
        } else {
            0.0
        };
        // Standard EMA; then MAXLEN K = 2 × ceil(rate_hz × ttl_s), clamped
        // to [maxlen_floor, maxlen_ceiling].
        let alpha = config.ema_alpha;
        let ema_new = alpha * inst_rate + (1.0 - alpha) * ema_prev;
        let k_raw = (ema_new * (config.ttl_ms as f64) / 1000.0).ceil() as i64 * 2;
        let k = k_raw
            .max(config.maxlen_floor as i64)
            .min(config.maxlen_ceiling as i64);

        // Upsert meta.
        sqlx::query(
            "INSERT INTO ff_stream_meta \
             (partition_key, execution_id, attempt_index, ema_rate_hz, \
              last_append_ts_ms, maxlen_applied_last) \
             VALUES ($1,$2,$3,$4,$5,$6) \
             ON CONFLICT (partition_key, execution_id, attempt_index) DO UPDATE \
             SET ema_rate_hz = EXCLUDED.ema_rate_hz, \
                 last_append_ts_ms = EXCLUDED.last_append_ts_ms, \
                 maxlen_applied_last = EXCLUDED.maxlen_applied_last",
        )
        .bind(pkey)
        .bind(eid_uuid)
        .bind(aidx)
        .bind(ema_new)
        .bind(ts_ms)
        .bind(k as i32)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        // Trim: delete frames beyond the most recent K.
        // Piggy-back on append (per K-round-2: prefer this over
        // per-row triggers).
        sqlx::query(
            "DELETE FROM ff_stream_frame \
             WHERE partition_key=$1 AND execution_id=$2 AND attempt_index=$3 \
             AND (ts_ms, seq) NOT IN ( \
                 SELECT ts_ms, seq FROM ff_stream_frame \
                 WHERE partition_key=$1 AND execution_id=$2 AND attempt_index=$3 \
                 ORDER BY ts_ms DESC, seq DESC \
                 LIMIT $4)",
        )
        .bind(pkey)
        .bind(eid_uuid)
        .bind(aidx)
        .bind(k)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
    }

    // Frame count after the append (post-trim for BestEffortLive).
    let frame_count: i64 = sqlx::query_scalar(
        "SELECT COUNT(*)::bigint FROM ff_stream_frame \
         WHERE partition_key=$1 AND execution_id=$2 AND attempt_index=$3",
    )
    .bind(pkey)
    .bind(eid_uuid)
    .bind(aidx)
    .fetch_one(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    tx.commit().await.map_err(map_sqlx_error)?;

    // Wire id mirrors the valkey stream-id shape: "<ms>-<seq>".
    let stream_id = format!("{ts_ms}-{next_seq}");
    let mut out = AppendFrameOutcome::new(stream_id, frame_count as u64);
    if let Some(v) = summary_version {
        out = out.with_summary_version(v);
    }
    Ok(out)
}
362
363// ── Cursor parsing ──
364
365fn parse_cursor_lower(c: &StreamCursor) -> Result<(i64, i32), EngineError> {
366    match c {
367        StreamCursor::Start => Ok((i64::MIN, i32::MIN)),
368        StreamCursor::End => Ok((i64::MAX, i32::MAX)),
369        StreamCursor::At(s) => parse_at(s),
370    }
371}
372
373fn parse_cursor_upper(c: &StreamCursor) -> Result<(i64, i32), EngineError> {
374    match c {
375        StreamCursor::Start => Ok((i64::MIN, i32::MIN)),
376        StreamCursor::End => Ok((i64::MAX, i32::MAX)),
377        StreamCursor::At(s) => parse_at(s),
378    }
379}
380
381fn parse_at(s: &str) -> Result<(i64, i32), EngineError> {
382    let (ms, seq) = match s.split_once('-') {
383        Some((a, b)) => (a, b),
384        None => (s, "0"),
385    };
386    let ms: i64 = ms.parse().map_err(|_| EngineError::Validation {
387        kind: ValidationKind::InvalidInput,
388        detail: format!("bad stream cursor '{s}' (ms)"),
389    })?;
390    let sq: i32 = seq.parse().map_err(|_| EngineError::Validation {
391        kind: ValidationKind::InvalidInput,
392        detail: format!("bad stream cursor '{s}' (seq)"),
393    })?;
394    Ok((ms, sq))
395}
396
397// ── read_stream ──
398
399pub async fn read_stream(
400    pool: &PgPool,
401    execution_id: &ExecutionId,
402    attempt_index: AttemptIndex,
403    from: StreamCursor,
404    to: StreamCursor,
405    count_limit: u64,
406) -> Result<StreamFrames, EngineError> {
407    let eid_uuid = exec_uuid(execution_id)?;
408    let pkey = partition_key_of(execution_id);
409    let aidx = attempt_index.0 as i32;
410    let (from_ms, from_seq) = parse_cursor_lower(&from)?;
411    let (to_ms, to_seq) = parse_cursor_upper(&to)?;
412    let lim = count_limit.min(ff_core::contracts::STREAM_READ_HARD_CAP) as i64;
413
414    let rows = sqlx::query(
415        "SELECT ts_ms, seq, fields FROM ff_stream_frame \
416         WHERE partition_key=$1 AND execution_id=$2 AND attempt_index=$3 \
417         AND (ts_ms, seq) >= ($4, $5) AND (ts_ms, seq) <= ($6, $7) \
418         ORDER BY ts_ms, seq LIMIT $8",
419    )
420    .bind(pkey)
421    .bind(eid_uuid)
422    .bind(aidx)
423    .bind(from_ms)
424    .bind(from_seq)
425    .bind(to_ms)
426    .bind(to_seq)
427    .bind(lim)
428    .fetch_all(pool)
429    .await
430    .map_err(map_sqlx_error)?;
431
432    let mut frames = Vec::with_capacity(rows.len());
433    for row in rows {
434        let ts: i64 = row.get("ts_ms");
435        let seq: i32 = row.get("seq");
436        let fields: Json = row.get("fields");
437        frames.push(row_to_frame(ts, seq, fields));
438    }
439    Ok(StreamFrames {
440        frames,
441        closed_at: None,
442        closed_reason: None,
443    })
444}
445
446fn row_to_frame(ts_ms: i64, seq: i32, fields: Json) -> StreamFrame {
447    let mut out = BTreeMap::new();
448    if let Json::Object(map) = fields {
449        for (k, v) in map {
450            let s = match v {
451                Json::String(s) => s,
452                other => other.to_string(),
453            };
454            out.insert(k, s);
455        }
456    }
457    StreamFrame {
458        id: format!("{ts_ms}-{seq}"),
459        fields: out,
460    }
461}

// ── tail_stream ──

/// Blocking tail over one attempt's stream (XREAD-BLOCK analogue).
///
/// Subscribes to the notify channel *before* the first SELECT so a frame
/// appended between that SELECT and the park cannot be missed. While
/// parked, the task holds only a broadcast receiver — NO pg connection
/// (Q2).
///
/// `after` must be a concrete `StreamCursor::At("<ms>-<seq>")`; frames
/// strictly greater than it are returned, capped at
/// `min(count_limit, STREAM_READ_HARD_CAP)`. `block_ms == 0` degrades to
/// a single non-blocking poll. `TailVisibility::ExcludeBestEffort`
/// filters best-effort frames in SQL.
///
/// # Errors
/// `EngineError::Validation` for a non-`At` cursor or malformed ids;
/// mapped sqlx errors for database failures.
#[allow(clippy::too_many_arguments)] // mirrors the EngineBackend trait method signature
pub async fn tail_stream(
    pool: &PgPool,
    notifier: &StreamNotifier,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    after: StreamCursor,
    block_ms: u64,
    count_limit: u64,
    visibility: TailVisibility,
) -> Result<StreamFrames, EngineError> {
    let eid_uuid = exec_uuid(execution_id)?;
    let pkey = partition_key_of(execution_id);
    let aidx = attempt_index.0 as i32;
    let (after_ms, after_seq) = match &after {
        StreamCursor::At(s) => parse_at(s)?,
        _ => {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: "tail_stream requires concrete after cursor".into(),
            });
        }
    };
    let lim = count_limit.min(ff_core::contracts::STREAM_READ_HARD_CAP) as i64;
    // NOTE(review): the 'best_effort' literal must match what
    // StreamMode::wire_str() stores for BestEffortLive frames — confirm
    // against ff_core.
    let visibility_filter = match visibility {
        TailVisibility::ExcludeBestEffort => "AND mode <> 'best_effort'",
        _ => "",
    };

    // Subscribe BEFORE the first SELECT so we never miss a NOTIFY
    // that fires between the SELECT and our park.
    let chan = channel_name(&eid_uuid, aidx as u32);
    let mut rx = notifier.subscribe(&chan).await;

    // Reusable SELECT of frames strictly after the cursor. The filter
    // fragment comes from the fixed match arms above — never from caller
    // input — so the format! is injection-safe.
    let do_select = |pool: PgPool| async move {
        let sql = format!(
            "SELECT ts_ms, seq, fields FROM ff_stream_frame \
             WHERE partition_key=$1 AND execution_id=$2 AND attempt_index=$3 \
             AND (ts_ms, seq) > ($4, $5) {visibility_filter} \
             ORDER BY ts_ms, seq LIMIT $6"
        );
        sqlx::query(&sql)
            .bind(pkey)
            .bind(eid_uuid)
            .bind(aidx)
            .bind(after_ms)
            .bind(after_seq)
            .bind(lim)
            .fetch_all(&pool)
            .await
            .map_err(map_sqlx_error)
    };

    // First opportunistic SELECT — returns fast-path if frames
    // already exist (or unconditionally when block_ms == 0).
    let rows = do_select(pool.clone()).await?;
    if !rows.is_empty() || block_ms == 0 {
        return Ok(rows_to_frames(rows));
    }

    // Park on the broadcast receiver — NO pg conn held here.
    // Loop until timeout OR the re-SELECT returns a non-empty set:
    // wake-ups may come from `Reconnect` signals that carry no new
    // frames, so we must re-check the cursor and keep waiting.
    // NOTE(review): recv() results, including errors, are discarded. If
    // the broadcast channel were closed, recv() would return immediately
    // each pass and this loop would spin on re-SELECTs until block_ms
    // elapses — confirm StreamNotifier keeps the sender alive for the
    // tail's lifetime.
    let start = std::time::Instant::now();
    let total = Duration::from_millis(block_ms);
    loop {
        let remaining = match total.checked_sub(start.elapsed()) {
            Some(r) if !r.is_zero() => r,
            _ => break,
        };
        let _ = tokio::time::timeout(remaining, rx.recv()).await;
        let rows = do_select(pool.clone()).await?;
        if !rows.is_empty() {
            return Ok(rows_to_frames(rows));
        }
        // If timed out (elapsed >= total), break and return empty.
        if start.elapsed() >= total {
            break;
        }
    }

    Ok(StreamFrames::empty_open())
}
549
550fn rows_to_frames(rows: Vec<sqlx::postgres::PgRow>) -> StreamFrames {
551    let mut frames = Vec::with_capacity(rows.len());
552    for row in rows {
553        let ts: i64 = row.get("ts_ms");
554        let seq: i32 = row.get("seq");
555        let fields: Json = row.get("fields");
556        frames.push(row_to_frame(ts, seq, fields));
557    }
558    StreamFrames {
559        frames,
560        closed_at: None,
561        closed_reason: None,
562    }
563}
564
565// ── read_summary ──
566
567pub async fn read_summary(
568    pool: &PgPool,
569    execution_id: &ExecutionId,
570    attempt_index: AttemptIndex,
571) -> Result<Option<SummaryDocument>, EngineError> {
572    let eid_uuid = exec_uuid(execution_id)?;
573    let pkey = partition_key_of(execution_id);
574    let aidx = attempt_index.0 as i32;
575
576    let row = sqlx::query(
577        "SELECT document_json, version, patch_kind, last_updated_ms, first_applied_ms \
578         FROM ff_stream_summary \
579         WHERE partition_key=$1 AND execution_id=$2 AND attempt_index=$3",
580    )
581    .bind(pkey)
582    .bind(eid_uuid)
583    .bind(aidx)
584    .fetch_optional(pool)
585    .await
586    .map_err(map_sqlx_error)?;
587
588    let Some(row) = row else { return Ok(None) };
589    let doc: Json = row.get("document_json");
590    let version: i32 = row.get("version");
591    let patch_kind_wire: Option<String> = row.get("patch_kind");
592    let last_updated: i64 = row.get("last_updated_ms");
593    let first_applied: i64 = row.get("first_applied_ms");
594
595    let bytes = serde_json::to_vec(&doc).map_err(|e| EngineError::Validation {
596        kind: ValidationKind::InvalidInput,
597        detail: format!("summary document not serialisable: {e}"),
598    })?;
599    let patch_kind = match patch_kind_wire.as_deref() {
600        Some("json-merge-patch") => PatchKind::JsonMergePatch,
601        _ => PatchKind::JsonMergePatch,
602    };
603    Ok(Some(SummaryDocument::new(
604        bytes,
605        version as u64,
606        patch_kind,
607        last_updated as u64,
608        first_applied as u64,
609    )))
610}
611