1use std::collections::BTreeMap;
15use std::time::Duration;
16
17use ff_core::backend::{
18 AppendFrameOutcome, Frame, FrameKind, Handle, PatchKind, StreamMode, SummaryDocument,
19 TailVisibility,
20};
21use ff_core::contracts::{StreamCursor, StreamFrame, StreamFrames};
22use ff_core::engine_error::{EngineError, ValidationKind};
23use ff_core::partition::PartitionConfig;
24use ff_core::types::{AttemptIndex, ExecutionId};
25use serde_json::Value as Json;
26use sqlx::{PgPool, Row};
27use uuid::Uuid;
28
29use crate::error::map_sqlx_error;
30use crate::handle_codec::decode_handle;
31use crate::listener::{channel_name, StreamNotifier};
32
33fn exec_uuid(eid: &ExecutionId) -> Result<Uuid, EngineError> {
35 let s = eid.as_str();
36 let after = s.split_once("}:").map(|(_, r)| r).unwrap_or(s);
38 Uuid::parse_str(after).map_err(|e| EngineError::Validation {
39 kind: ValidationKind::InvalidInput,
40 detail: format!("invalid execution_id uuid tail: {e}"),
41 })
42}
43
44fn partition_key_of(eid: &ExecutionId) -> i16 {
45 (eid.partition() % 256) as i16
49}
50
51fn frame_type_of(frame: &Frame) -> String {
52 if frame.frame_type.is_empty() {
53 match frame.kind {
54 FrameKind::Stdout => "stdout",
55 FrameKind::Stderr => "stderr",
56 FrameKind::Event => "event",
57 FrameKind::Blob => "blob",
58 _ => "event",
59 }
60 .to_owned()
61 } else {
62 frame.frame_type.clone()
63 }
64}
65
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time earlier than the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}
73
74fn build_fields_json(frame: &Frame) -> Json {
77 let payload_str = String::from_utf8_lossy(&frame.bytes).into_owned();
78 let mut map = serde_json::Map::new();
79 map.insert("frame_type".into(), Json::String(frame_type_of(frame)));
80 map.insert("payload".into(), Json::String(payload_str));
81 map.insert("encoding".into(), Json::String("utf8".into()));
82 map.insert("source".into(), Json::String("worker".into()));
83 if let Some(corr) = &frame.correlation_id {
84 map.insert("correlation_id".into(), Json::String(corr.clone()));
85 }
86 Json::Object(map)
87}
88
89pub(crate) fn apply_json_merge_patch(target: &mut Json, patch: &Json) {
96 if let Json::Object(patch_map) = patch {
97 if !target.is_object() {
98 *target = Json::Object(serde_json::Map::new());
99 }
100 let target_map = target.as_object_mut().expect("just ensured object");
101 for (k, v) in patch_map {
102 match v {
103 Json::Null => {
104 target_map.remove(k);
105 }
106 Json::String(s) if s == ff_core::backend::SUMMARY_NULL_SENTINEL => {
107 target_map.insert(k.clone(), Json::Null);
108 }
109 Json::Object(_) => {
110 let entry = target_map.entry(k.clone()).or_insert(Json::Null);
111 apply_json_merge_patch(entry, v);
112 }
113 other => {
114 target_map.insert(k.clone(), other.clone());
115 }
116 }
117 }
118 } else {
119 *target = patch.clone();
120 }
121}
122
123pub async fn append_frame(
126 pool: &PgPool,
127 _partition_config: &PartitionConfig,
128 handle: &Handle,
129 frame: Frame,
130) -> Result<AppendFrameOutcome, EngineError> {
131 let payload = decode_handle(handle)?;
132 let eid_uuid = exec_uuid(&payload.execution_id)?;
133 let pkey = partition_key_of(&payload.execution_id);
134 let aidx = payload.attempt_index.0 as i32;
135
136 let mode_wire = frame.mode.wire_str();
137 let fields = build_fields_json(&frame);
138
139 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
140
141 let ts_ms: i64 = now_ms();
146
147 let lock_key: i64 = {
150 use std::collections::hash_map::DefaultHasher;
151 use std::hash::{Hash, Hasher};
152 let mut h = DefaultHasher::new();
153 (eid_uuid.as_bytes(), aidx).hash(&mut h);
154 h.finish() as i64
155 };
156 sqlx::query("SELECT pg_advisory_xact_lock($1)")
157 .bind(lock_key)
158 .execute(&mut *tx)
159 .await
160 .map_err(map_sqlx_error)?;
161
162 let next_seq: i32 = sqlx::query_scalar::<_, Option<i32>>(
164 "SELECT MAX(seq) FROM ff_stream_frame \
165 WHERE partition_key=$1 AND execution_id=$2 \
166 AND attempt_index=$3 AND ts_ms=$4",
167 )
168 .bind(pkey)
169 .bind(eid_uuid)
170 .bind(aidx)
171 .bind(ts_ms)
172 .fetch_one(&mut *tx)
173 .await
174 .map_err(map_sqlx_error)?
175 .map(|s| s + 1)
176 .unwrap_or(0);
177
178 sqlx::query(
179 "INSERT INTO ff_stream_frame \
180 (partition_key, execution_id, attempt_index, ts_ms, seq, fields, mode, created_at_ms) \
181 VALUES ($1,$2,$3,$4,$5,$6,$7,$8)",
182 )
183 .bind(pkey)
184 .bind(eid_uuid)
185 .bind(aidx)
186 .bind(ts_ms)
187 .bind(next_seq)
188 .bind(&fields)
189 .bind(mode_wire)
190 .bind(ts_ms)
191 .execute(&mut *tx)
192 .await
193 .map_err(map_sqlx_error)?;
194
195 let mut summary_version: Option<u64> = None;
196
197 if let StreamMode::DurableSummary { patch_kind } = &frame.mode {
200 let patch: Json = serde_json::from_slice(&frame.bytes).map_err(|e| {
203 EngineError::Validation {
204 kind: ValidationKind::InvalidInput,
205 detail: format!("summary patch not valid JSON: {e}"),
206 }
207 })?;
208
209 let existing: Option<(Json, i32)> = sqlx::query_as(
211 "SELECT document_json, version FROM ff_stream_summary \
212 WHERE partition_key=$1 AND execution_id=$2 AND attempt_index=$3 \
213 FOR UPDATE",
214 )
215 .bind(pkey)
216 .bind(eid_uuid)
217 .bind(aidx)
218 .fetch_optional(&mut *tx)
219 .await
220 .map_err(map_sqlx_error)?;
221
222 let (mut doc, prev_version): (Json, i32) = match existing {
223 Some((d, v)) => (d, v),
224 None => (Json::Object(serde_json::Map::new()), 0),
225 };
226
227 match patch_kind {
228 PatchKind::JsonMergePatch => apply_json_merge_patch(&mut doc, &patch),
229 _ => apply_json_merge_patch(&mut doc, &patch),
230 }
231
232 let new_version = prev_version + 1;
233 let patch_kind_wire = "json-merge-patch";
234 if prev_version == 0 {
235 sqlx::query(
236 "INSERT INTO ff_stream_summary \
237 (partition_key, execution_id, attempt_index, document_json, \
238 version, patch_kind, last_updated_ms, first_applied_ms) \
239 VALUES ($1,$2,$3,$4,$5,$6,$7,$8)",
240 )
241 .bind(pkey)
242 .bind(eid_uuid)
243 .bind(aidx)
244 .bind(&doc)
245 .bind(new_version)
246 .bind(patch_kind_wire)
247 .bind(ts_ms)
248 .bind(ts_ms)
249 .execute(&mut *tx)
250 .await
251 .map_err(map_sqlx_error)?;
252 } else {
253 sqlx::query(
254 "UPDATE ff_stream_summary SET document_json=$4, version=$5, \
255 patch_kind=$6, last_updated_ms=$7 \
256 WHERE partition_key=$1 AND execution_id=$2 AND attempt_index=$3",
257 )
258 .bind(pkey)
259 .bind(eid_uuid)
260 .bind(aidx)
261 .bind(&doc)
262 .bind(new_version)
263 .bind(patch_kind_wire)
264 .bind(ts_ms)
265 .execute(&mut *tx)
266 .await
267 .map_err(map_sqlx_error)?;
268 }
269 summary_version = Some(new_version as u64);
270 }
271
272 if let StreamMode::BestEffortLive { config } = &frame.mode {
274 let meta: Option<(f64, i64)> = sqlx::query_as(
275 "SELECT ema_rate_hz, last_append_ts_ms FROM ff_stream_meta \
276 WHERE partition_key=$1 AND execution_id=$2 AND attempt_index=$3 \
277 FOR UPDATE",
278 )
279 .bind(pkey)
280 .bind(eid_uuid)
281 .bind(aidx)
282 .fetch_optional(&mut *tx)
283 .await
284 .map_err(map_sqlx_error)?;
285
286 let (ema_prev, last_ts) = meta.unwrap_or((0.0, 0));
287 let inst_rate: f64 = if last_ts > 0 && ts_ms > last_ts {
288 1000.0 / ((ts_ms - last_ts) as f64)
289 } else {
290 0.0
291 };
292 let alpha = config.ema_alpha;
293 let ema_new = alpha * inst_rate + (1.0 - alpha) * ema_prev;
294 let k_raw = (ema_new * (config.ttl_ms as f64) / 1000.0).ceil() as i64 * 2;
295 let k = k_raw
296 .max(config.maxlen_floor as i64)
297 .min(config.maxlen_ceiling as i64);
298
299 sqlx::query(
301 "INSERT INTO ff_stream_meta \
302 (partition_key, execution_id, attempt_index, ema_rate_hz, \
303 last_append_ts_ms, maxlen_applied_last) \
304 VALUES ($1,$2,$3,$4,$5,$6) \
305 ON CONFLICT (partition_key, execution_id, attempt_index) DO UPDATE \
306 SET ema_rate_hz = EXCLUDED.ema_rate_hz, \
307 last_append_ts_ms = EXCLUDED.last_append_ts_ms, \
308 maxlen_applied_last = EXCLUDED.maxlen_applied_last",
309 )
310 .bind(pkey)
311 .bind(eid_uuid)
312 .bind(aidx)
313 .bind(ema_new)
314 .bind(ts_ms)
315 .bind(k as i32)
316 .execute(&mut *tx)
317 .await
318 .map_err(map_sqlx_error)?;
319
320 sqlx::query(
324 "DELETE FROM ff_stream_frame \
325 WHERE partition_key=$1 AND execution_id=$2 AND attempt_index=$3 \
326 AND (ts_ms, seq) NOT IN ( \
327 SELECT ts_ms, seq FROM ff_stream_frame \
328 WHERE partition_key=$1 AND execution_id=$2 AND attempt_index=$3 \
329 ORDER BY ts_ms DESC, seq DESC \
330 LIMIT $4)",
331 )
332 .bind(pkey)
333 .bind(eid_uuid)
334 .bind(aidx)
335 .bind(k)
336 .execute(&mut *tx)
337 .await
338 .map_err(map_sqlx_error)?;
339 }
340
341 let frame_count: i64 = sqlx::query_scalar(
343 "SELECT COUNT(*)::bigint FROM ff_stream_frame \
344 WHERE partition_key=$1 AND execution_id=$2 AND attempt_index=$3",
345 )
346 .bind(pkey)
347 .bind(eid_uuid)
348 .bind(aidx)
349 .fetch_one(&mut *tx)
350 .await
351 .map_err(map_sqlx_error)?;
352
353 tx.commit().await.map_err(map_sqlx_error)?;
354
355 let stream_id = format!("{ts_ms}-{next_seq}");
356 let mut out = AppendFrameOutcome::new(stream_id, frame_count as u64);
357 if let Some(v) = summary_version {
358 out = out.with_summary_version(v);
359 }
360 Ok(out)
361}
362
363fn parse_cursor_lower(c: &StreamCursor) -> Result<(i64, i32), EngineError> {
366 match c {
367 StreamCursor::Start => Ok((i64::MIN, i32::MIN)),
368 StreamCursor::End => Ok((i64::MAX, i32::MAX)),
369 StreamCursor::At(s) => parse_at(s),
370 }
371}
372
373fn parse_cursor_upper(c: &StreamCursor) -> Result<(i64, i32), EngineError> {
374 match c {
375 StreamCursor::Start => Ok((i64::MIN, i32::MIN)),
376 StreamCursor::End => Ok((i64::MAX, i32::MAX)),
377 StreamCursor::At(s) => parse_at(s),
378 }
379}
380
381fn parse_at(s: &str) -> Result<(i64, i32), EngineError> {
382 let (ms, seq) = match s.split_once('-') {
383 Some((a, b)) => (a, b),
384 None => (s, "0"),
385 };
386 let ms: i64 = ms.parse().map_err(|_| EngineError::Validation {
387 kind: ValidationKind::InvalidInput,
388 detail: format!("bad stream cursor '{s}' (ms)"),
389 })?;
390 let sq: i32 = seq.parse().map_err(|_| EngineError::Validation {
391 kind: ValidationKind::InvalidInput,
392 detail: format!("bad stream cursor '{s}' (seq)"),
393 })?;
394 Ok((ms, sq))
395}
396
397pub async fn read_stream(
400 pool: &PgPool,
401 execution_id: &ExecutionId,
402 attempt_index: AttemptIndex,
403 from: StreamCursor,
404 to: StreamCursor,
405 count_limit: u64,
406) -> Result<StreamFrames, EngineError> {
407 let eid_uuid = exec_uuid(execution_id)?;
408 let pkey = partition_key_of(execution_id);
409 let aidx = attempt_index.0 as i32;
410 let (from_ms, from_seq) = parse_cursor_lower(&from)?;
411 let (to_ms, to_seq) = parse_cursor_upper(&to)?;
412 let lim = count_limit.min(ff_core::contracts::STREAM_READ_HARD_CAP) as i64;
413
414 let rows = sqlx::query(
415 "SELECT ts_ms, seq, fields FROM ff_stream_frame \
416 WHERE partition_key=$1 AND execution_id=$2 AND attempt_index=$3 \
417 AND (ts_ms, seq) >= ($4, $5) AND (ts_ms, seq) <= ($6, $7) \
418 ORDER BY ts_ms, seq LIMIT $8",
419 )
420 .bind(pkey)
421 .bind(eid_uuid)
422 .bind(aidx)
423 .bind(from_ms)
424 .bind(from_seq)
425 .bind(to_ms)
426 .bind(to_seq)
427 .bind(lim)
428 .fetch_all(pool)
429 .await
430 .map_err(map_sqlx_error)?;
431
432 let mut frames = Vec::with_capacity(rows.len());
433 for row in rows {
434 let ts: i64 = row.get("ts_ms");
435 let seq: i32 = row.get("seq");
436 let fields: Json = row.get("fields");
437 frames.push(row_to_frame(ts, seq, fields));
438 }
439 Ok(StreamFrames {
440 frames,
441 closed_at: None,
442 closed_reason: None,
443 })
444}
445
446fn row_to_frame(ts_ms: i64, seq: i32, fields: Json) -> StreamFrame {
447 let mut out = BTreeMap::new();
448 if let Json::Object(map) = fields {
449 for (k, v) in map {
450 let s = match v {
451 Json::String(s) => s,
452 other => other.to_string(),
453 };
454 out.insert(k, s);
455 }
456 }
457 StreamFrame {
458 id: format!("{ts_ms}-{seq}"),
459 fields: out,
460 }
461}
462
463#[allow(clippy::too_many_arguments)] pub async fn tail_stream(
467 pool: &PgPool,
468 notifier: &StreamNotifier,
469 execution_id: &ExecutionId,
470 attempt_index: AttemptIndex,
471 after: StreamCursor,
472 block_ms: u64,
473 count_limit: u64,
474 visibility: TailVisibility,
475) -> Result<StreamFrames, EngineError> {
476 let eid_uuid = exec_uuid(execution_id)?;
477 let pkey = partition_key_of(execution_id);
478 let aidx = attempt_index.0 as i32;
479 let (after_ms, after_seq) = match &after {
480 StreamCursor::At(s) => parse_at(s)?,
481 _ => {
482 return Err(EngineError::Validation {
483 kind: ValidationKind::InvalidInput,
484 detail: "tail_stream requires concrete after cursor".into(),
485 });
486 }
487 };
488 let lim = count_limit.min(ff_core::contracts::STREAM_READ_HARD_CAP) as i64;
489 let visibility_filter = match visibility {
490 TailVisibility::ExcludeBestEffort => "AND mode <> 'best_effort'",
491 _ => "",
492 };
493
494 let chan = channel_name(&eid_uuid, aidx as u32);
497 let mut rx = notifier.subscribe(&chan).await;
498
499 let do_select = |pool: PgPool| async move {
500 let sql = format!(
501 "SELECT ts_ms, seq, fields FROM ff_stream_frame \
502 WHERE partition_key=$1 AND execution_id=$2 AND attempt_index=$3 \
503 AND (ts_ms, seq) > ($4, $5) {visibility_filter} \
504 ORDER BY ts_ms, seq LIMIT $6"
505 );
506 sqlx::query(&sql)
507 .bind(pkey)
508 .bind(eid_uuid)
509 .bind(aidx)
510 .bind(after_ms)
511 .bind(after_seq)
512 .bind(lim)
513 .fetch_all(&pool)
514 .await
515 .map_err(map_sqlx_error)
516 };
517
518 let rows = do_select(pool.clone()).await?;
521 if !rows.is_empty() || block_ms == 0 {
522 return Ok(rows_to_frames(rows));
523 }
524
525 let start = std::time::Instant::now();
530 let total = Duration::from_millis(block_ms);
531 loop {
532 let remaining = match total.checked_sub(start.elapsed()) {
533 Some(r) if !r.is_zero() => r,
534 _ => break,
535 };
536 let _ = tokio::time::timeout(remaining, rx.recv()).await;
537 let rows = do_select(pool.clone()).await?;
538 if !rows.is_empty() {
539 return Ok(rows_to_frames(rows));
540 }
541 if start.elapsed() >= total {
543 break;
544 }
545 }
546
547 Ok(StreamFrames::empty_open())
548}
549
550fn rows_to_frames(rows: Vec<sqlx::postgres::PgRow>) -> StreamFrames {
551 let mut frames = Vec::with_capacity(rows.len());
552 for row in rows {
553 let ts: i64 = row.get("ts_ms");
554 let seq: i32 = row.get("seq");
555 let fields: Json = row.get("fields");
556 frames.push(row_to_frame(ts, seq, fields));
557 }
558 StreamFrames {
559 frames,
560 closed_at: None,
561 closed_reason: None,
562 }
563}
564
565pub async fn read_summary(
568 pool: &PgPool,
569 execution_id: &ExecutionId,
570 attempt_index: AttemptIndex,
571) -> Result<Option<SummaryDocument>, EngineError> {
572 let eid_uuid = exec_uuid(execution_id)?;
573 let pkey = partition_key_of(execution_id);
574 let aidx = attempt_index.0 as i32;
575
576 let row = sqlx::query(
577 "SELECT document_json, version, patch_kind, last_updated_ms, first_applied_ms \
578 FROM ff_stream_summary \
579 WHERE partition_key=$1 AND execution_id=$2 AND attempt_index=$3",
580 )
581 .bind(pkey)
582 .bind(eid_uuid)
583 .bind(aidx)
584 .fetch_optional(pool)
585 .await
586 .map_err(map_sqlx_error)?;
587
588 let Some(row) = row else { return Ok(None) };
589 let doc: Json = row.get("document_json");
590 let version: i32 = row.get("version");
591 let patch_kind_wire: Option<String> = row.get("patch_kind");
592 let last_updated: i64 = row.get("last_updated_ms");
593 let first_applied: i64 = row.get("first_applied_ms");
594
595 let bytes = serde_json::to_vec(&doc).map_err(|e| EngineError::Validation {
596 kind: ValidationKind::InvalidInput,
597 detail: format!("summary document not serialisable: {e}"),
598 })?;
599 let patch_kind = match patch_kind_wire.as_deref() {
600 Some("json-merge-patch") => PatchKind::JsonMergePatch,
601 _ => PatchKind::JsonMergePatch,
602 };
603 Ok(Some(SummaryDocument::new(
604 bytes,
605 version as u64,
606 patch_kind,
607 last_updated as u64,
608 first_applied as u64,
609 )))
610}
611