1use std::collections::HashMap;
22use std::time::{SystemTime, UNIX_EPOCH};
23
24use ff_core::backend::{
25 BackendTag, Handle, HandleKind, HandleOpaque, PendingWaitpoint, ResumeSignal, WaitpointHmac,
26};
27use ff_core::contracts::{
28 AdditionalWaitpointBinding, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
29 ClaimedResumedExecution, CompositeBody, DeliverApprovalSignalArgs, DeliverSignalArgs,
30 DeliverSignalResult, ListPendingWaitpointsArgs, ListPendingWaitpointsResult,
31 PendingWaitpointInfo, ResumeCondition, SignalMatcher, SuspendArgs, SuspendOutcome,
32 SuspendOutcomeDetails, WaitpointBinding,
33};
34use ff_core::engine_error::{ContentionKind, EngineError, ValidationKind};
35use ff_core::handle_codec::{encode as encode_opaque, HandlePayload};
36use ff_core::partition::PartitionConfig;
37use ff_core::types::{
38 AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseFence, SignalId, SuspensionId,
39 TimestampMs, WaitpointId, WaitpointToken,
40};
41use serde_json::{json, Value as JsonValue};
42use sqlx::{PgPool, Postgres, Transaction};
43use uuid::Uuid;
44
45use crate::error::map_sqlx_error;
46use crate::lease_event;
47use crate::signal::{hmac_sign, hmac_verify, is_retryable_serialization, SERIALIZABLE_RETRY_BUDGET};
48use crate::signal_event;
49use crate::suspend::evaluate;
50
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A clock that reads earlier than the epoch yields `0`
/// (`unwrap_or_default` produces a zero duration). The `u128` millisecond
/// count is converted with a checked conversion and saturates at
/// `i64::MAX` instead of wrapping the way an `as` cast would.
fn now_ms() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
59
60fn split_exec_id(eid: &ExecutionId) -> Result<(i16, Uuid), EngineError> {
61 let s = eid.as_str();
62 let rest = s.strip_prefix("{fp:").ok_or_else(|| EngineError::Validation {
63 kind: ValidationKind::InvalidInput,
64 detail: format!("execution_id missing `{{fp:` prefix: {s}"),
65 })?;
66 let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
67 kind: ValidationKind::InvalidInput,
68 detail: format!("execution_id missing `}}:`: {s}"),
69 })?;
70 let part: i16 = rest[..close]
71 .parse()
72 .map_err(|_| EngineError::Validation {
73 kind: ValidationKind::InvalidInput,
74 detail: format!("execution_id partition index not u16: {s}"),
75 })?;
76 let uuid = Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
77 kind: ValidationKind::InvalidInput,
78 detail: format!("execution_id UUID invalid: {s}"),
79 })?;
80 Ok((part, uuid))
81}
82
83fn decode_handle(handle: &Handle) -> Result<HandlePayload, EngineError> {
84 if handle.backend != BackendTag::Postgres {
85 return Err(EngineError::Validation {
86 kind: ValidationKind::HandleFromOtherBackend,
87 detail: format!("expected Postgres, got {:?}", handle.backend),
88 });
89 }
90 let decoded = ff_core::handle_codec::decode(&handle.opaque)?;
91 if decoded.tag != BackendTag::Postgres {
92 return Err(EngineError::Validation {
93 kind: ValidationKind::HandleFromOtherBackend,
94 detail: format!("embedded tag {:?}", decoded.tag),
95 });
96 }
97 Ok(decoded.payload)
98}
99
100fn wp_uuid(w: &WaitpointId) -> Result<Uuid, EngineError> {
101 Uuid::parse_str(&w.to_string()).map_err(|e| EngineError::Validation {
102 kind: ValidationKind::InvalidInput,
103 detail: format!("waitpoint_id not a UUID: {e}"),
104 })
105}
106
107fn susp_uuid(s: &SuspensionId) -> Result<Uuid, EngineError> {
108 Uuid::parse_str(&s.to_string()).map_err(|e| EngineError::Validation {
109 kind: ValidationKind::InvalidInput,
110 detail: format!("suspension_id not a UUID: {e}"),
111 })
112}
113
114fn derive_required_signal_names(cond: &ResumeCondition, wp_key: &str) -> Vec<String> {
132 const OPERATOR_ONLY_SENTINEL: &str = "__operator_only__";
133 const TIMEOUT_ONLY_SENTINEL: &str = "__timeout_only__";
134
135 let mut out: Vec<String> = Vec::new();
136 let mut push = |name: &str| {
137 if !name.is_empty() && !out.iter().any(|e| e == name) {
138 out.push(name.to_owned());
139 }
140 };
141 fn walk(cond: &ResumeCondition, target: &str, push: &mut dyn FnMut(&str)) {
142 match cond {
143 ResumeCondition::Single {
144 waitpoint_key,
145 matcher,
146 } => {
147 if waitpoint_key == target
148 && let SignalMatcher::ByName(name) = matcher
149 {
150 push(name.as_str());
151 }
152 }
153 ResumeCondition::OperatorOnly => push(OPERATOR_ONLY_SENTINEL),
154 ResumeCondition::TimeoutOnly => push(TIMEOUT_ONLY_SENTINEL),
155 ResumeCondition::Composite(body) => walk_body(body, target, push),
156 _ => {}
157 }
158 }
159 fn walk_body(body: &CompositeBody, target: &str, push: &mut dyn FnMut(&str)) {
160 match body {
161 CompositeBody::AllOf { members } => {
162 for m in members {
163 walk(m, target, push);
164 }
165 }
166 CompositeBody::Count {
167 matcher, waitpoints, ..
168 } => {
169 if waitpoints.iter().any(|w| w == target)
170 && let Some(SignalMatcher::ByName(name)) = matcher
171 {
172 push(name.as_str());
173 }
174 }
175 _ => {}
176 }
177 }
178 walk(cond, wp_key, &mut push);
179 out
180}
181
/// Splits a raw waitpoint token of the shape `<kid>:<hex>` into
/// `(kid, fingerprint)`, where the fingerprint is the leading part of the
/// hex segment, at most 16 bytes long.
///
/// Returns two empty strings when there is no `:` or when either side of
/// the first `:` is empty.
///
/// The cut point is moved back to the nearest UTF-8 character boundary, so
/// a token whose second segment contains non-ASCII text cannot panic the
/// slice operation. For ASCII input the result is exactly the first 16
/// bytes.
fn parse_waitpoint_token_kid_fp(raw: &str) -> (String, String) {
    const FINGERPRINT_MAX_BYTES: usize = 16;

    match raw.split_once(':') {
        Some((kid, hex)) if !kid.is_empty() && !hex.is_empty() => {
            let mut end = hex.len().min(FINGERPRINT_MAX_BYTES);
            // Index 0 is always a boundary, so this loop terminates.
            while !hex.is_char_boundary(end) {
                end -= 1;
            }
            (kid.to_owned(), hex[..end].to_owned())
        }
        _ => (String::new(), String::new()),
    }
}
197
198fn is_retryable_engine(err: &EngineError) -> bool {
205 match err {
206 EngineError::Transport { source, .. } => {
207 let s = source.to_string();
208 s.contains("40001")
209 || s.contains("40P01")
210 || s.contains("serialization_failure")
211 || s.contains("deadlock_detected")
212 }
213 EngineError::Contention(ContentionKind::LeaseConflict) => true,
223 _ => false,
224 }
225}
226
227async fn run_serializable<T, F>(pool: &PgPool, mut op: F) -> Result<T, EngineError>
231where
232 T: Send,
233 F: for<'a> FnMut(
234 &'a mut Transaction<'_, Postgres>,
235 ) -> std::pin::Pin<
236 Box<dyn std::future::Future<Output = Result<T, EngineError>> + Send + 'a>,
237 > + Send,
238{
239 for _ in 0..SERIALIZABLE_RETRY_BUDGET {
240 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
241 sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
242 .execute(&mut *tx)
243 .await
244 .map_err(map_sqlx_error)?;
245 let body_res = op(&mut tx).await;
246 match body_res {
247 Ok(v) => match tx.commit().await {
248 Ok(()) => return Ok(v),
249 Err(e) if is_retryable_serialization(&e) => continue,
250 Err(e) => return Err(map_sqlx_error(e)),
251 },
252 Err(e) if is_retryable_engine(&e) => {
253 let _ = tx.rollback().await;
254 continue;
255 }
256 Err(e) => {
257 let _ = tx.rollback().await;
258 return Err(e);
259 }
260 }
261 }
262 Err(EngineError::Contention(ContentionKind::RetryExhausted))
263}
264
265fn outcome_to_dedup_json(outcome: &SuspendOutcome) -> JsonValue {
268 let details = outcome.details();
269 let extras: Vec<JsonValue> = details
270 .additional_waitpoints
271 .iter()
272 .map(|e| {
273 json!({
274 "waitpoint_id": e.waitpoint_id.to_string(),
275 "waitpoint_key": e.waitpoint_key,
276 "token": e.waitpoint_token.as_str(),
277 })
278 })
279 .collect();
280 let (variant, handle_opaque) = match outcome {
281 SuspendOutcome::Suspended { handle, .. } => {
282 ("Suspended", Some(hex::encode(handle.opaque.as_bytes())))
283 }
284 SuspendOutcome::AlreadySatisfied { .. } => ("AlreadySatisfied", None),
285 _ => ("Suspended", None),
286 };
287 json!({
288 "variant": variant,
289 "details": {
290 "suspension_id": details.suspension_id.to_string(),
291 "waitpoint_id": details.waitpoint_id.to_string(),
292 "waitpoint_key": details.waitpoint_key,
293 "token": details.waitpoint_token.as_str(),
294 "extras": extras,
295 },
296 "handle_opaque_hex": handle_opaque,
297 })
298}
299
300fn outcome_from_dedup_json(v: &JsonValue) -> Result<SuspendOutcome, EngineError> {
301 let corrupt = |s: String| EngineError::Validation {
302 kind: ValidationKind::Corruption,
303 detail: s,
304 };
305 let det = &v["details"];
306 let suspension_id = SuspensionId::parse(det["suspension_id"].as_str().unwrap_or(""))
307 .map_err(|e| corrupt(format!("dedup suspension_id: {e}")))?;
308 let waitpoint_id = WaitpointId::parse(det["waitpoint_id"].as_str().unwrap_or(""))
309 .map_err(|e| corrupt(format!("dedup waitpoint_id: {e}")))?;
310 let waitpoint_key = det["waitpoint_key"].as_str().unwrap_or("").to_owned();
311 let token = det["token"].as_str().unwrap_or("").to_owned();
312 let mut extras: Vec<AdditionalWaitpointBinding> = Vec::new();
313 if let Some(arr) = det["extras"].as_array() {
314 for e in arr {
315 let wid = WaitpointId::parse(e["waitpoint_id"].as_str().unwrap_or(""))
316 .map_err(|err| corrupt(format!("dedup extra wp_id: {err}")))?;
317 let wkey = e["waitpoint_key"].as_str().unwrap_or("").to_owned();
318 let tok = e["token"].as_str().unwrap_or("").to_owned();
319 extras.push(AdditionalWaitpointBinding::new(
320 wid,
321 wkey,
322 WaitpointHmac::new(tok),
323 ));
324 }
325 }
326 let details = SuspendOutcomeDetails::new(
327 suspension_id,
328 waitpoint_id,
329 waitpoint_key,
330 WaitpointHmac::new(token),
331 )
332 .with_additional_waitpoints(extras);
333
334 match v["variant"].as_str().unwrap_or("Suspended") {
335 "AlreadySatisfied" => Ok(SuspendOutcome::AlreadySatisfied { details }),
336 _ => {
337 let opaque_hex = v["handle_opaque_hex"].as_str().unwrap_or("");
338 let bytes = hex::decode(opaque_hex)
339 .map_err(|e| corrupt(format!("dedup handle hex: {e}")))?;
340 let opaque = HandleOpaque::new(bytes.into_boxed_slice());
341 let handle = Handle::new(BackendTag::Postgres, HandleKind::Suspended, opaque);
342 Ok(SuspendOutcome::Suspended { details, handle })
343 }
344 }
345}
346
347pub(crate) async fn suspend_impl(
350 pool: &PgPool,
351 _partition_config: &PartitionConfig,
352 handle: &Handle,
353 args: SuspendArgs,
354) -> Result<SuspendOutcome, EngineError> {
355 let payload = decode_handle(handle)?;
356 suspend_core(pool, payload, args).await
357}
358
359pub(crate) async fn suspend_by_triple_impl(
364 pool: &PgPool,
365 _partition_config: &PartitionConfig,
366 exec_id: ExecutionId,
367 triple: LeaseFence,
368 args: SuspendArgs,
369) -> Result<SuspendOutcome, EngineError> {
370 let (part, exec_uuid) = split_exec_id(&exec_id)?;
371 let row: Option<(i32,)> = sqlx::query_as(
379 "SELECT attempt_index FROM ff_exec_core \
380 WHERE partition_key = $1 AND execution_id = $2",
381 )
382 .bind(part)
383 .bind(exec_uuid)
384 .fetch_optional(pool)
385 .await
386 .map_err(map_sqlx_error)?;
387 let attempt_index_i = match row {
388 Some((i,)) => i,
389 None => return Err(EngineError::NotFound { entity: "execution" }),
390 };
391 let attempt_index =
392 AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
393
394 let payload = HandlePayload::new(
399 exec_id,
400 attempt_index,
401 triple.attempt_id,
402 triple.lease_id,
403 triple.lease_epoch,
404 0,
405 ff_core::types::LaneId::new(""),
406 ff_core::types::WorkerInstanceId::new(""),
407 );
408 suspend_core(pool, payload, args).await
409}
410
411async fn suspend_core(
412 pool: &PgPool,
413 payload: HandlePayload,
414 args: SuspendArgs,
415) -> Result<SuspendOutcome, EngineError> {
416 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
417 let attempt_index_i = i32::try_from(payload.attempt_index.0).unwrap_or(0);
418 let expected_epoch = payload.lease_epoch.0;
419 let idem_key = args.idempotency_key.as_ref().map(|k| k.as_str().to_owned());
420
421 run_serializable(pool, move |tx| {
422 let args = args.clone();
423 let idem = idem_key.clone();
424 let payload = payload.clone();
425 Box::pin(async move {
426 if let Some(key) = idem.as_deref() {
428 let row: Option<(JsonValue,)> = sqlx::query_as(
429 "SELECT outcome_json FROM ff_suspend_dedup \
430 WHERE partition_key = $1 AND idempotency_key = $2",
431 )
432 .bind(part)
433 .bind(key)
434 .fetch_optional(&mut **tx)
435 .await
436 .map_err(map_sqlx_error)?;
437 if let Some((cached,)) = row {
438 return outcome_from_dedup_json(&cached);
439 }
440 }
441
442 let epoch_row: Option<(i64,)> = sqlx::query_as(
444 "SELECT lease_epoch FROM ff_attempt \
445 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3 \
446 FOR UPDATE",
447 )
448 .bind(part)
449 .bind(exec_uuid)
450 .bind(attempt_index_i)
451 .fetch_optional(&mut **tx)
452 .await
453 .map_err(map_sqlx_error)?;
454 let observed_epoch: u64 = match epoch_row {
455 Some((e,)) => u64::try_from(e).unwrap_or(0),
456 None => return Err(EngineError::NotFound { entity: "attempt" }),
457 };
458 if observed_epoch != expected_epoch {
459 return Err(EngineError::Contention(ContentionKind::LeaseConflict));
460 }
461
462 let kid_row: Option<(String, Vec<u8>)> = sqlx::query_as(
464 "SELECT kid, secret FROM ff_waitpoint_hmac \
465 WHERE active = TRUE \
466 ORDER BY rotated_at_ms DESC LIMIT 1",
467 )
468 .fetch_optional(&mut **tx)
469 .await
470 .map_err(map_sqlx_error)?;
471 let (kid, secret) = kid_row.ok_or_else(|| EngineError::Validation {
472 kind: ValidationKind::InvalidInput,
473 detail: "ff_waitpoint_hmac empty — rotate a kid before suspend".into(),
474 })?;
475
476 let now = args.now.0;
478 let mut signed: Vec<(WaitpointId, String, String)> = Vec::new();
479 for binding in args.waitpoints.iter() {
480 let (wp_id, wp_key) = match binding {
481 WaitpointBinding::Fresh {
482 waitpoint_id,
483 waitpoint_key,
484 } => (waitpoint_id.clone(), waitpoint_key.clone()),
485 WaitpointBinding::UsePending { waitpoint_id } => {
486 let row: Option<(String,)> = sqlx::query_as(
487 "SELECT waitpoint_key FROM ff_waitpoint_pending \
488 WHERE partition_key = $1 AND waitpoint_id = $2",
489 )
490 .bind(part)
491 .bind(wp_uuid(waitpoint_id)?)
492 .fetch_optional(&mut **tx)
493 .await
494 .map_err(map_sqlx_error)?;
495 let wp_key = row.map(|(k,)| k).unwrap_or_default();
496 (waitpoint_id.clone(), wp_key)
497 }
498 _ => {
499 return Err(EngineError::Validation {
500 kind: ValidationKind::InvalidInput,
501 detail: "unsupported WaitpointBinding variant".into(),
502 });
503 }
504 };
505 let msg = format!("{}:{}", payload.execution_id, wp_id);
506 let token = hmac_sign(&secret, &kid, msg.as_bytes());
507 let required_names =
519 derive_required_signal_names(&args.resume_condition, &wp_key);
520 sqlx::query(
521 "INSERT INTO ff_waitpoint_pending \
522 (partition_key, waitpoint_id, execution_id, token_kid, token, \
523 created_at_ms, expires_at_ms, waitpoint_key, \
524 state, required_signal_names, activated_at_ms) \
525 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'active', $9, $6) \
526 ON CONFLICT (partition_key, waitpoint_id) DO UPDATE SET \
527 token_kid = EXCLUDED.token_kid, token = EXCLUDED.token, \
528 waitpoint_key = EXCLUDED.waitpoint_key, \
529 state = EXCLUDED.state, \
530 required_signal_names = EXCLUDED.required_signal_names, \
531 activated_at_ms = EXCLUDED.activated_at_ms",
532 )
533 .bind(part)
534 .bind(wp_uuid(&wp_id)?)
535 .bind(exec_uuid)
536 .bind(&kid)
537 .bind(&token)
538 .bind(now)
539 .bind(args.timeout_at.map(|t| t.0))
540 .bind(&wp_key)
541 .bind(&required_names)
542 .execute(&mut **tx)
543 .await
544 .map_err(map_sqlx_error)?;
545 signed.push((wp_id, wp_key, token));
546 }
547
548 let condition_json =
550 serde_json::to_value(&args.resume_condition).map_err(|e| {
551 EngineError::Validation {
552 kind: ValidationKind::Corruption,
553 detail: format!("resume_condition serialize: {e}"),
554 }
555 })?;
556 sqlx::query(
557 "INSERT INTO ff_suspension_current \
558 (partition_key, execution_id, suspension_id, suspended_at_ms, \
559 timeout_at_ms, reason_code, condition, satisfied_set, member_map, \
560 timeout_behavior) \
561 VALUES ($1, $2, $3, $4, $5, $6, $7, '[]'::jsonb, '{}'::jsonb, $8) \
562 ON CONFLICT (partition_key, execution_id) DO UPDATE SET \
563 suspension_id = EXCLUDED.suspension_id, \
564 suspended_at_ms = EXCLUDED.suspended_at_ms, \
565 timeout_at_ms = EXCLUDED.timeout_at_ms, \
566 reason_code = EXCLUDED.reason_code, \
567 condition = EXCLUDED.condition, \
568 satisfied_set = '[]'::jsonb, \
569 member_map = '{}'::jsonb, \
570 timeout_behavior = EXCLUDED.timeout_behavior",
571 )
572 .bind(part)
573 .bind(exec_uuid)
574 .bind(susp_uuid(&args.suspension_id)?)
575 .bind(now)
576 .bind(args.timeout_at.map(|t| t.0))
577 .bind(args.reason_code.as_wire_str())
578 .bind(&condition_json)
579 .bind(args.timeout_behavior.as_wire_str())
580 .execute(&mut **tx)
581 .await
582 .map_err(map_sqlx_error)?;
583
584 sqlx::query(
586 "UPDATE ff_exec_core \
587 SET lifecycle_phase = 'suspended', \
588 ownership_state = 'released', \
589 eligibility_state = 'not_applicable', \
590 public_state = 'suspended', \
591 attempt_state = 'attempt_interrupted' \
592 WHERE partition_key = $1 AND execution_id = $2",
593 )
594 .bind(part)
595 .bind(exec_uuid)
596 .execute(&mut **tx)
597 .await
598 .map_err(map_sqlx_error)?;
599
600 sqlx::query(
602 "UPDATE ff_attempt \
603 SET worker_id = NULL, \
604 worker_instance_id = NULL, \
605 lease_expires_at_ms = NULL, \
606 lease_epoch = lease_epoch + 1, \
607 outcome = 'attempt_interrupted' \
608 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3",
609 )
610 .bind(part)
611 .bind(exec_uuid)
612 .bind(attempt_index_i)
613 .execute(&mut **tx)
614 .await
615 .map_err(map_sqlx_error)?;
616
617 lease_event::emit(
619 tx,
620 part,
621 exec_uuid,
622 None,
623 lease_event::EVENT_REVOKED,
624 now,
625 )
626 .await?;
627
628 let (primary_id, primary_key, primary_token) = signed[0].clone();
630 let extras: Vec<AdditionalWaitpointBinding> = signed
631 .iter()
632 .skip(1)
633 .map(|(id, key, tok)| {
634 AdditionalWaitpointBinding::new(
635 id.clone(),
636 key.clone(),
637 WaitpointHmac::new(tok.clone()),
638 )
639 })
640 .collect();
641 let details = SuspendOutcomeDetails::new(
642 args.suspension_id.clone(),
643 primary_id,
644 primary_key,
645 WaitpointHmac::new(primary_token),
646 )
647 .with_additional_waitpoints(extras);
648
649 let opaque = encode_opaque(BackendTag::Postgres, &payload);
650 let suspended_handle =
651 Handle::new(BackendTag::Postgres, HandleKind::Suspended, opaque);
652 let outcome = SuspendOutcome::Suspended {
653 details,
654 handle: suspended_handle,
655 };
656
657 if let Some(key) = idem.as_deref() {
659 let cached = outcome_to_dedup_json(&outcome);
660 sqlx::query(
661 "INSERT INTO ff_suspend_dedup \
662 (partition_key, idempotency_key, outcome_json, created_at_ms) \
663 VALUES ($1, $2, $3, $4) \
664 ON CONFLICT DO NOTHING",
665 )
666 .bind(part)
667 .bind(key)
668 .bind(&cached)
669 .bind(now)
670 .execute(&mut **tx)
671 .await
672 .map_err(map_sqlx_error)?;
673 }
674
675 Ok(outcome)
676 })
677 })
678 .await
679}
680
681pub(crate) async fn create_waitpoint_impl(
697 pool: &PgPool,
698 handle: &Handle,
699 waitpoint_key: &str,
700 expires_in: std::time::Duration,
701) -> Result<PendingWaitpoint, EngineError> {
702 let payload = decode_handle(handle)?;
703 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
704
705 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
706
707 let kid_row: Option<(String, Vec<u8>)> = sqlx::query_as(
708 "SELECT kid, secret FROM ff_waitpoint_hmac \
709 WHERE active = TRUE \
710 ORDER BY rotated_at_ms DESC LIMIT 1",
711 )
712 .fetch_optional(&mut *tx)
713 .await
714 .map_err(map_sqlx_error)?;
715 let (kid, secret) = kid_row.ok_or_else(|| EngineError::Validation {
716 kind: ValidationKind::InvalidInput,
717 detail: "ff_waitpoint_hmac empty — seed a kid before create_waitpoint".into(),
718 })?;
719
720 let wp_id = WaitpointId::new();
721 let wp_u = wp_uuid(&wp_id)?;
722 let now = now_ms();
723 let expires_ms = i64::try_from(expires_in.as_millis()).unwrap_or(i64::MAX);
733 let expires_at = now.saturating_add(expires_ms);
734 let msg = format!("{}:{}", payload.execution_id, wp_id);
735 let token = hmac_sign(&secret, &kid, msg.as_bytes());
736
737 sqlx::query(
741 "INSERT INTO ff_waitpoint_pending \
742 (partition_key, waitpoint_id, execution_id, token_kid, token, \
743 created_at_ms, expires_at_ms, waitpoint_key, state, required_signal_names) \
744 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'pending', '{}')",
745 )
746 .bind(part)
747 .bind(wp_u)
748 .bind(exec_uuid)
749 .bind(&kid)
750 .bind(&token)
751 .bind(now)
752 .bind(expires_at)
753 .bind(waitpoint_key)
754 .execute(&mut *tx)
755 .await
756 .map_err(map_sqlx_error)?;
757
758 tx.commit().await.map_err(map_sqlx_error)?;
759
760 Ok(PendingWaitpoint::new(wp_id, WaitpointHmac::new(token)))
761}
762
763pub(crate) async fn deliver_signal_impl(
766 pool: &PgPool,
767 _partition_config: &PartitionConfig,
768 args: DeliverSignalArgs,
769) -> Result<DeliverSignalResult, EngineError> {
770 let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
771 let wp_u = wp_uuid(&args.waitpoint_id)?;
772
773 run_serializable(pool, move |tx| {
774 let args = args.clone();
775 Box::pin(async move {
776 let row: Option<(String, String, String, Uuid)> = sqlx::query_as(
778 "SELECT token_kid, token, waitpoint_key, execution_id \
779 FROM ff_waitpoint_pending \
780 WHERE partition_key = $1 AND waitpoint_id = $2 \
781 FOR UPDATE",
782 )
783 .bind(part)
784 .bind(wp_u)
785 .fetch_optional(&mut **tx)
786 .await
787 .map_err(map_sqlx_error)?;
788 let (kid, stored_token, wp_key, stored_exec) = match row {
789 Some(r) => r,
790 None => return Err(EngineError::NotFound { entity: "waitpoint" }),
791 };
792 if stored_exec != exec_uuid {
793 return Err(EngineError::Validation {
794 kind: ValidationKind::InvalidInput,
795 detail: "waitpoint belongs to a different execution".into(),
796 });
797 }
798
799 let secret_row: Option<(Vec<u8>,)> = sqlx::query_as(
803 "SELECT secret FROM ff_waitpoint_hmac WHERE kid = $1",
804 )
805 .bind(&kid)
806 .fetch_optional(&mut **tx)
807 .await
808 .map_err(map_sqlx_error)?;
809 let (secret,) = secret_row.ok_or_else(|| EngineError::Validation {
810 kind: ValidationKind::InvalidInput,
811 detail: format!("kid {kid} missing from keystore"),
812 })?;
813 let presented = args.waitpoint_token.as_str();
814 let msg = format!("{}:{}", args.execution_id, args.waitpoint_id);
815 hmac_verify(&secret, &kid, msg.as_bytes(), presented).map_err(|e| {
816 EngineError::Validation {
817 kind: ValidationKind::InvalidInput,
818 detail: format!("waitpoint_token verify: {e}"),
819 }
820 })?;
821 if presented != stored_token {
822 return Err(EngineError::Validation {
823 kind: ValidationKind::InvalidInput,
824 detail: "waitpoint_token does not match minted token".into(),
825 });
826 }
827
828 let susp_row: Option<(JsonValue, JsonValue)> = sqlx::query_as(
831 "SELECT condition, member_map FROM ff_suspension_current \
832 WHERE partition_key = $1 AND execution_id = $2 \
833 FOR UPDATE",
834 )
835 .bind(part)
836 .bind(exec_uuid)
837 .fetch_optional(&mut **tx)
838 .await
839 .map_err(map_sqlx_error)?;
840 let (condition_json, mut member_map) = match susp_row {
841 Some(r) => r,
842 None => return Err(EngineError::NotFound { entity: "suspension" }),
843 };
844
845 let signal_blob = json!({
847 "signal_id": args.signal_id.to_string(),
848 "signal_name": args.signal_name,
849 "signal_category": args.signal_category,
850 "source_type": args.source_type,
851 "source_identity": args.source_identity,
852 "correlation_id": args.correlation_id.clone().unwrap_or_default(),
853 "accepted_at": args.now.0,
854 "payload_hex": args.payload.as_ref().map(hex::encode),
855 });
856 let map_obj = member_map.as_object_mut().ok_or_else(|| {
857 EngineError::Validation {
858 kind: ValidationKind::Corruption,
859 detail: "member_map not a JSON object".into(),
860 }
861 })?;
862 let entry = map_obj.entry(wp_key.clone()).or_insert_with(|| json!([]));
863 entry
864 .as_array_mut()
865 .ok_or_else(|| EngineError::Validation {
866 kind: ValidationKind::Corruption,
867 detail: "member_map[wp_key] not a JSON array".into(),
868 })?
869 .push(signal_blob);
870
871 let condition: ResumeCondition = serde_json::from_value(condition_json)
873 .map_err(|e| EngineError::Validation {
874 kind: ValidationKind::Corruption,
875 detail: format!("condition deserialize: {e}"),
876 })?;
877 let signals_by_wp: HashMap<String, Vec<ResumeSignal>> = map_obj
878 .iter()
879 .map(|(k, v)| {
880 let sigs: Vec<ResumeSignal> = v
881 .as_array()
882 .map(|arr| arr.iter().filter_map(resume_signal_from_json).collect())
883 .unwrap_or_default();
884 (k.clone(), sigs)
885 })
886 .collect();
887 let borrowed: HashMap<&str, &[ResumeSignal]> = signals_by_wp
888 .iter()
889 .map(|(k, v)| (k.as_str(), v.as_slice()))
890 .collect();
891 let satisfied = evaluate(&condition, &borrowed);
892
893 sqlx::query(
896 "UPDATE ff_suspension_current SET member_map = $1 \
897 WHERE partition_key = $2 AND execution_id = $3",
898 )
899 .bind(&member_map)
900 .bind(part)
901 .bind(exec_uuid)
902 .execute(&mut **tx)
903 .await
904 .map_err(map_sqlx_error)?;
905
906 let effect = if satisfied {
907 sqlx::query(
908 "UPDATE ff_exec_core \
909 SET public_state = 'resumable', \
910 lifecycle_phase = 'runnable', \
911 eligibility_state = 'eligible_now' \
912 WHERE partition_key = $1 AND execution_id = $2",
913 )
914 .bind(part)
915 .bind(exec_uuid)
916 .execute(&mut **tx)
917 .await
918 .map_err(map_sqlx_error)?;
919
920 sqlx::query(
921 "DELETE FROM ff_waitpoint_pending \
922 WHERE partition_key = $1 AND execution_id = $2",
923 )
924 .bind(part)
925 .bind(exec_uuid)
926 .execute(&mut **tx)
927 .await
928 .map_err(map_sqlx_error)?;
929
930 sqlx::query(
931 "INSERT INTO ff_completion_event \
932 (partition_key, execution_id, outcome, occurred_at_ms) \
933 VALUES ($1, $2, 'resumable', $3)",
934 )
935 .bind(part)
936 .bind(exec_uuid)
937 .bind(args.now.0)
938 .execute(&mut **tx)
939 .await
940 .map_err(map_sqlx_error)?;
941
942 "resume_condition_satisfied"
943 } else {
944 "appended_to_waitpoint"
945 };
946
947 let wp_id_str = args.waitpoint_id.to_string();
951 signal_event::emit(
952 tx,
953 part,
954 exec_uuid,
955 &args.signal_id.to_string(),
956 Some(wp_id_str.as_str()),
957 Some(args.source_identity.as_str()),
958 args.now.0,
959 )
960 .await?;
961
962 Ok(DeliverSignalResult::Accepted {
963 signal_id: args.signal_id.clone(),
964 effect: effect.to_owned(),
965 })
966 })
967 })
968 .await
969}
970
971fn resume_signal_from_json(v: &JsonValue) -> Option<ResumeSignal> {
972 let signal_id = SignalId::parse(v["signal_id"].as_str()?).ok()?;
973 Some(ResumeSignal {
974 signal_id,
975 signal_name: v["signal_name"].as_str()?.to_owned(),
976 signal_category: v["signal_category"].as_str().unwrap_or("").to_owned(),
977 source_type: v["source_type"].as_str().unwrap_or("").to_owned(),
978 source_identity: v["source_identity"].as_str().unwrap_or("").to_owned(),
979 correlation_id: v["correlation_id"].as_str().unwrap_or("").to_owned(),
980 accepted_at: TimestampMs::from_millis(v["accepted_at"].as_i64().unwrap_or(0)),
981 payload: v["payload_hex"].as_str().and_then(|h| hex::decode(h).ok()),
982 })
983}
984
985pub(crate) async fn claim_resumed_execution_impl(
988 pool: &PgPool,
989 _partition_config: &PartitionConfig,
990 args: ClaimResumedExecutionArgs,
991) -> Result<ClaimResumedExecutionResult, EngineError> {
992 let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
993 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
994
995 let row: Option<(String, i32)> = sqlx::query_as(
996 "SELECT public_state, attempt_index FROM ff_exec_core \
997 WHERE partition_key = $1 AND execution_id = $2 \
998 FOR UPDATE",
999 )
1000 .bind(part)
1001 .bind(exec_uuid)
1002 .fetch_optional(&mut *tx)
1003 .await
1004 .map_err(map_sqlx_error)?;
1005 let (public_state, attempt_index_i) = match row {
1006 Some(r) => r,
1007 None => {
1008 tx.rollback().await.ok();
1009 return Err(EngineError::NotFound { entity: "execution" });
1010 }
1011 };
1012 if public_state != "resumable" {
1013 tx.rollback().await.ok();
1014 return Err(EngineError::Contention(
1015 ContentionKind::NotAResumedExecution,
1016 ));
1017 }
1018
1019 let now = now_ms();
1020 let lease_ttl = i64::try_from(args.lease_ttl_ms).unwrap_or(0);
1021 let new_expires = now.saturating_add(lease_ttl);
1022
1023 sqlx::query(
1024 "UPDATE ff_attempt \
1025 SET worker_id = $1, worker_instance_id = $2, \
1026 lease_epoch = lease_epoch + 1, \
1027 lease_expires_at_ms = $3, started_at_ms = $4, outcome = NULL \
1028 WHERE partition_key = $5 AND execution_id = $6 AND attempt_index = $7",
1029 )
1030 .bind(args.worker_id.as_str())
1031 .bind(args.worker_instance_id.as_str())
1032 .bind(new_expires)
1033 .bind(now)
1034 .bind(part)
1035 .bind(exec_uuid)
1036 .bind(attempt_index_i)
1037 .execute(&mut *tx)
1038 .await
1039 .map_err(map_sqlx_error)?;
1040
1041 sqlx::query(
1048 "UPDATE ff_exec_core \
1049 SET lifecycle_phase = 'active', ownership_state = 'leased', \
1050 eligibility_state = 'not_applicable', \
1051 public_state = 'running', attempt_state = 'running_attempt', \
1052 started_at_ms = COALESCE(started_at_ms, $3) \
1053 WHERE partition_key = $1 AND execution_id = $2",
1054 )
1055 .bind(part)
1056 .bind(exec_uuid)
1057 .bind(now)
1058 .execute(&mut *tx)
1059 .await
1060 .map_err(map_sqlx_error)?;
1061
1062 let epoch_row: (i64,) = sqlx::query_as(
1063 "SELECT lease_epoch FROM ff_attempt \
1064 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3",
1065 )
1066 .bind(part)
1067 .bind(exec_uuid)
1068 .bind(attempt_index_i)
1069 .fetch_one(&mut *tx)
1070 .await
1071 .map_err(map_sqlx_error)?;
1072
1073 let lease_id_str = args.lease_id.to_string();
1075 lease_event::emit(
1076 &mut tx,
1077 part,
1078 exec_uuid,
1079 Some(&lease_id_str),
1080 lease_event::EVENT_ACQUIRED,
1081 now,
1082 )
1083 .await?;
1084
1085 tx.commit().await.map_err(map_sqlx_error)?;
1086
1087 let attempt_index = AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
1088 let lease_epoch = LeaseEpoch(u64::try_from(epoch_row.0).unwrap_or(0));
1089 let attempt_id = AttemptId::new();
1090
1091 let payload = HandlePayload::new(
1097 args.execution_id.clone(),
1098 attempt_index,
1099 attempt_id.clone(),
1100 args.lease_id.clone(),
1101 lease_epoch,
1102 args.lease_ttl_ms,
1103 args.lane_id.clone(),
1104 args.worker_instance_id.clone(),
1105 );
1106 let handle = Handle::new(
1107 BackendTag::Postgres,
1108 HandleKind::Resumed,
1109 encode_opaque(BackendTag::Postgres, &payload),
1110 );
1111
1112 Ok(ClaimResumedExecutionResult::Claimed(
1113 ClaimedResumedExecution::new(
1114 args.execution_id.clone(),
1115 args.lease_id.clone(),
1116 lease_epoch,
1117 attempt_index,
1118 attempt_id,
1119 TimestampMs::from_millis(new_expires),
1120 handle,
1121 ),
1122 ))
1123}
1124
1125pub(crate) async fn observe_signals_impl(
1128 pool: &PgPool,
1129 handle: &Handle,
1130) -> Result<Vec<ResumeSignal>, EngineError> {
1131 let payload = decode_handle(handle)?;
1132 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
1133
1134 let row: Option<(JsonValue,)> = sqlx::query_as(
1135 "SELECT member_map FROM ff_suspension_current \
1136 WHERE partition_key = $1 AND execution_id = $2",
1137 )
1138 .bind(part)
1139 .bind(exec_uuid)
1140 .fetch_optional(pool)
1141 .await
1142 .map_err(map_sqlx_error)?;
1143
1144 let Some((member_map,)) = row else {
1145 return Ok(Vec::new());
1146 };
1147 let mut out: Vec<ResumeSignal> = Vec::new();
1148 if let Some(map) = member_map.as_object() {
1149 for (_wp_key, arr) in map {
1150 if let Some(sigs) = arr.as_array() {
1151 for v in sigs {
1152 if let Some(s) = resume_signal_from_json(v) {
1153 out.push(s);
1154 }
1155 }
1156 }
1157 }
1158 }
1159 Ok(out)
1160}
1161
1162pub(crate) async fn list_pending_waitpoints_impl(
1180 pool: &PgPool,
1181 args: ListPendingWaitpointsArgs,
1182) -> Result<ListPendingWaitpointsResult, EngineError> {
1183 const DEFAULT_LIMIT: u32 = 100;
1184 const MAX_LIMIT: u32 = 1000;
1185
1186 let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
1187 let limit = args.limit.unwrap_or(DEFAULT_LIMIT).clamp(1, MAX_LIMIT) as i64;
1188 let after_uuid = match args.after.as_ref() {
1189 Some(wp) => Some(wp_uuid(wp)?),
1190 None => None,
1191 };
1192
1193 let exists: Option<(i16,)> = sqlx::query_as(
1196 "SELECT 1::smallint FROM ff_exec_core \
1197 WHERE partition_key = $1 AND execution_id = $2",
1198 )
1199 .bind(part)
1200 .bind(exec_uuid)
1201 .fetch_optional(pool)
1202 .await
1203 .map_err(map_sqlx_error)?;
1204 if exists.is_none() {
1205 return Err(EngineError::NotFound { entity: "execution" });
1206 }
1207
1208 type Row = (
1215 Uuid,
1216 String,
1217 String,
1218 Vec<String>,
1219 i64,
1220 Option<i64>,
1221 Option<i64>,
1222 String,
1223 String,
1224 );
1225 let rows: Vec<Row> = sqlx::query_as(
1226 "SELECT waitpoint_id, waitpoint_key, state, required_signal_names, \
1227 created_at_ms, activated_at_ms, expires_at_ms, token_kid, token \
1228 FROM ff_waitpoint_pending \
1229 WHERE partition_key = $1 \
1230 AND execution_id = $2 \
1231 AND state IN ('pending', 'active') \
1232 AND ($3::uuid IS NULL OR waitpoint_id > $3) \
1233 ORDER BY waitpoint_id \
1234 LIMIT $4",
1235 )
1236 .bind(part)
1237 .bind(exec_uuid)
1238 .bind(after_uuid)
1239 .bind(limit + 1)
1240 .fetch_all(pool)
1241 .await
1242 .map_err(map_sqlx_error)?;
1243
1244 let has_more = rows.len() as i64 > limit;
1245 let take_n = if has_more { limit as usize } else { rows.len() };
1246
1247 let mut entries: Vec<PendingWaitpointInfo> = Vec::with_capacity(take_n);
1248 for (wp_uid, wp_key, state, req_names, created_ms, activated_ms, expires_ms, _kid, token)
1249 in rows.into_iter().take(take_n)
1250 {
1251 let wp_id = WaitpointId::from_uuid(wp_uid);
1252 let (token_kid, token_fingerprint) = parse_waitpoint_token_kid_fp(&token);
1256 let mut info = PendingWaitpointInfo::new(
1257 wp_id,
1258 wp_key,
1259 state,
1260 TimestampMs(created_ms),
1261 args.execution_id.clone(),
1262 token_kid,
1263 token_fingerprint,
1264 );
1265 if !req_names.is_empty() {
1266 info = info.with_required_signal_names(req_names);
1267 }
1268 if let Some(ms) = activated_ms {
1269 info = info.with_activated_at(TimestampMs(ms));
1270 }
1271 if let Some(ms) = expires_ms {
1272 info = info.with_expires_at(TimestampMs(ms));
1273 }
1274 entries.push(info);
1275 }
1276
1277 let next_cursor = if has_more {
1278 entries.last().map(|e| e.waitpoint_id.clone())
1279 } else {
1280 None
1281 };
1282 let mut result = ListPendingWaitpointsResult::new(entries);
1283 if let Some(cursor) = next_cursor {
1284 result = result.with_next_cursor(cursor);
1285 }
1286 Ok(result)
1287}
1288
1289pub(crate) async fn read_waitpoint_token_impl(
1299 pool: &PgPool,
1300 partition: &ff_core::partition::PartitionKey,
1301 waitpoint_id: &WaitpointId,
1302) -> Result<Option<String>, EngineError> {
1303 let part: i16 = partition
1304 .parse()
1305 .map_err(|e| EngineError::Validation {
1306 kind: ValidationKind::InvalidInput,
1307 detail: format!("partition_key: {e}"),
1308 })?
1309 .index as i16;
1310 let wp_uuid = wp_uuid(waitpoint_id)?;
1311 let row: Option<(String,)> = sqlx::query_as(
1312 "SELECT token FROM ff_waitpoint_pending \
1313 WHERE partition_key = $1 AND waitpoint_id = $2 \
1314 LIMIT 1",
1315 )
1316 .bind(part)
1317 .bind(wp_uuid)
1318 .fetch_optional(pool)
1319 .await
1320 .map_err(map_sqlx_error)?;
1321 Ok(row.map(|(t,)| t).filter(|s| !s.is_empty()))
1322}
1323
1324pub(crate) async fn deliver_approval_signal_impl(
1339 pool: &PgPool,
1340 partition_config: &PartitionConfig,
1341 args: DeliverApprovalSignalArgs,
1342) -> Result<DeliverSignalResult, EngineError> {
1343 let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
1344
1345 let lane_row: Option<(String,)> = sqlx::query_as(
1348 "SELECT lane_id FROM ff_exec_core \
1349 WHERE partition_key = $1 AND execution_id = $2",
1350 )
1351 .bind(part)
1352 .bind(exec_uuid)
1353 .fetch_optional(pool)
1354 .await
1355 .map_err(map_sqlx_error)?;
1356 let authoritative_lane = match lane_row {
1357 Some((s,)) => LaneId::new(s),
1358 None => {
1359 return Err(EngineError::NotFound {
1360 entity: "execution",
1361 });
1362 }
1363 };
1364 if authoritative_lane.as_str() != args.lane_id.as_str() {
1365 return Err(EngineError::Validation {
1366 kind: ValidationKind::InvalidInput,
1367 detail: format!(
1368 "lane_mismatch: args.lane_id={} exec_core.lane_id={}",
1369 args.lane_id.as_str(),
1370 authoritative_lane.as_str()
1371 ),
1372 });
1373 }
1374
1375 let partition = ff_core::partition::Partition {
1377 family: ff_core::partition::PartitionFamily::Execution,
1378 index: part as u16,
1379 };
1380 let pkey = ff_core::partition::PartitionKey::from(&partition);
1381 let token_str = read_waitpoint_token_impl(pool, &pkey, &args.waitpoint_id)
1382 .await?
1383 .ok_or(EngineError::NotFound {
1384 entity: "waitpoint",
1385 })?;
1386
1387 let now = TimestampMs::now();
1390 let idempotency_key =
1391 format!("approval:{}:{}", args.signal_name, args.idempotency_suffix);
1392 let ds = DeliverSignalArgs {
1393 execution_id: args.execution_id.clone(),
1394 waitpoint_id: args.waitpoint_id.clone(),
1395 signal_id: SignalId::new(),
1396 signal_name: args.signal_name.clone(),
1397 signal_category: "approval".to_owned(),
1398 source_type: "operator".to_owned(),
1399 source_identity: String::new(),
1400 payload: None,
1401 payload_encoding: None,
1402 correlation_id: None,
1403 idempotency_key: Some(idempotency_key),
1404 target_scope: "execution".to_owned(),
1405 created_at: Some(now),
1406 dedup_ttl_ms: Some(args.signal_dedup_ttl_ms),
1407 resume_delay_ms: None,
1408 max_signals_per_execution: args.max_signals_per_execution,
1409 signal_maxlen: args.maxlen,
1410 waitpoint_token: WaitpointToken::new(token_str),
1411 now,
1412 };
1413
1414 deliver_signal_impl(pool, partition_config, ds).await
1415}