1use std::collections::HashMap;
22use std::time::{SystemTime, UNIX_EPOCH};
23
24use ff_core::backend::{BackendTag, Handle, HandleKind, HandleOpaque, ResumeSignal, WaitpointHmac};
25use ff_core::contracts::{
26 AdditionalWaitpointBinding, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
27 ClaimedResumedExecution, CompositeBody, DeliverSignalArgs, DeliverSignalResult,
28 ListPendingWaitpointsArgs, ListPendingWaitpointsResult, PendingWaitpointInfo, ResumeCondition,
29 SignalMatcher, SuspendArgs, SuspendOutcome, SuspendOutcomeDetails, WaitpointBinding,
30};
31use ff_core::engine_error::{ContentionKind, EngineError, ValidationKind};
32use ff_core::handle_codec::{encode as encode_opaque, HandlePayload};
33use ff_core::partition::PartitionConfig;
34use ff_core::types::{
35 AttemptId, AttemptIndex, ExecutionId, LeaseEpoch, LeaseFence, SignalId, SuspensionId,
36 TimestampMs, WaitpointId,
37};
38use serde_json::{json, Value as JsonValue};
39use sqlx::{PgPool, Postgres, Transaction};
40use uuid::Uuid;
41
42use crate::error::map_sqlx_error;
43use crate::lease_event;
44use crate::signal::{hmac_sign, hmac_verify, is_retryable_serialization, SERIALIZABLE_RETRY_BUDGET};
45use crate::signal_event;
46use crate::suspend::evaluate;
47
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock reading earlier than the epoch is treated as the epoch itself, and
/// the result is never negative.
fn now_ms() -> i64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // System clock is set before 1970: fall back to a zero duration.
        Err(_) => std::time::Duration::default(),
    };
    let millis = since_epoch.as_millis() as i64;
    if millis < 0 {
        0
    } else {
        millis
    }
}
56
57fn split_exec_id(eid: &ExecutionId) -> Result<(i16, Uuid), EngineError> {
58 let s = eid.as_str();
59 let rest = s.strip_prefix("{fp:").ok_or_else(|| EngineError::Validation {
60 kind: ValidationKind::InvalidInput,
61 detail: format!("execution_id missing `{{fp:` prefix: {s}"),
62 })?;
63 let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
64 kind: ValidationKind::InvalidInput,
65 detail: format!("execution_id missing `}}:`: {s}"),
66 })?;
67 let part: i16 = rest[..close]
68 .parse()
69 .map_err(|_| EngineError::Validation {
70 kind: ValidationKind::InvalidInput,
71 detail: format!("execution_id partition index not u16: {s}"),
72 })?;
73 let uuid = Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
74 kind: ValidationKind::InvalidInput,
75 detail: format!("execution_id UUID invalid: {s}"),
76 })?;
77 Ok((part, uuid))
78}
79
80fn decode_handle(handle: &Handle) -> Result<HandlePayload, EngineError> {
81 if handle.backend != BackendTag::Postgres {
82 return Err(EngineError::Validation {
83 kind: ValidationKind::HandleFromOtherBackend,
84 detail: format!("expected Postgres, got {:?}", handle.backend),
85 });
86 }
87 let decoded = ff_core::handle_codec::decode(&handle.opaque)?;
88 if decoded.tag != BackendTag::Postgres {
89 return Err(EngineError::Validation {
90 kind: ValidationKind::HandleFromOtherBackend,
91 detail: format!("embedded tag {:?}", decoded.tag),
92 });
93 }
94 Ok(decoded.payload)
95}
96
97fn wp_uuid(w: &WaitpointId) -> Result<Uuid, EngineError> {
98 Uuid::parse_str(&w.to_string()).map_err(|e| EngineError::Validation {
99 kind: ValidationKind::InvalidInput,
100 detail: format!("waitpoint_id not a UUID: {e}"),
101 })
102}
103
104fn susp_uuid(s: &SuspensionId) -> Result<Uuid, EngineError> {
105 Uuid::parse_str(&s.to_string()).map_err(|e| EngineError::Validation {
106 kind: ValidationKind::InvalidInput,
107 detail: format!("suspension_id not a UUID: {e}"),
108 })
109}
110
111fn derive_required_signal_names(cond: &ResumeCondition, wp_key: &str) -> Vec<String> {
129 const OPERATOR_ONLY_SENTINEL: &str = "__operator_only__";
130 const TIMEOUT_ONLY_SENTINEL: &str = "__timeout_only__";
131
132 let mut out: Vec<String> = Vec::new();
133 let mut push = |name: &str| {
134 if !name.is_empty() && !out.iter().any(|e| e == name) {
135 out.push(name.to_owned());
136 }
137 };
138 fn walk(cond: &ResumeCondition, target: &str, push: &mut dyn FnMut(&str)) {
139 match cond {
140 ResumeCondition::Single {
141 waitpoint_key,
142 matcher,
143 } => {
144 if waitpoint_key == target
145 && let SignalMatcher::ByName(name) = matcher
146 {
147 push(name.as_str());
148 }
149 }
150 ResumeCondition::OperatorOnly => push(OPERATOR_ONLY_SENTINEL),
151 ResumeCondition::TimeoutOnly => push(TIMEOUT_ONLY_SENTINEL),
152 ResumeCondition::Composite(body) => walk_body(body, target, push),
153 _ => {}
154 }
155 }
156 fn walk_body(body: &CompositeBody, target: &str, push: &mut dyn FnMut(&str)) {
157 match body {
158 CompositeBody::AllOf { members } => {
159 for m in members {
160 walk(m, target, push);
161 }
162 }
163 CompositeBody::Count {
164 matcher, waitpoints, ..
165 } => {
166 if waitpoints.iter().any(|w| w == target)
167 && let Some(SignalMatcher::ByName(name)) = matcher
168 {
169 push(name.as_str());
170 }
171 }
172 _ => {}
173 }
174 }
175 walk(cond, wp_key, &mut push);
176 out
177}
178
/// Splits a raw waitpoint token of the form `<kid>:<hex>` into the key id and
/// a short fingerprint of the hex part.
///
/// The fingerprint is the leading bytes of the hex part, capped at 16 bytes.
/// If the 16-byte cut would land inside a multi-byte UTF-8 character (which a
/// well-formed hex token never contains), the cut is moved back to the
/// previous character boundary instead of panicking.
///
/// Returns two empty strings when there is no `:` separator or when either
/// side of the first `:` is empty.
fn parse_waitpoint_token_kid_fp(raw: &str) -> (String, String) {
    const FINGERPRINT_MAX_BYTES: usize = 16;

    match raw.split_once(':') {
        Some((kid, hex)) if !kid.is_empty() && !hex.is_empty() => {
            let mut fp_len = hex.len().min(FINGERPRINT_MAX_BYTES);
            // Index 0 is always a boundary, so this loop terminates.
            while !hex.is_char_boundary(fp_len) {
                fp_len -= 1;
            }
            (kid.to_owned(), hex[..fp_len].to_owned())
        }
        _ => (String::new(), String::new()),
    }
}
194
195fn is_retryable_engine(err: &EngineError) -> bool {
202 match err {
203 EngineError::Transport { source, .. } => {
204 let s = source.to_string();
205 s.contains("40001")
206 || s.contains("40P01")
207 || s.contains("serialization_failure")
208 || s.contains("deadlock_detected")
209 }
210 EngineError::Contention(ContentionKind::LeaseConflict) => true,
220 _ => false,
221 }
222}
223
224async fn run_serializable<T, F>(pool: &PgPool, mut op: F) -> Result<T, EngineError>
228where
229 T: Send,
230 F: for<'a> FnMut(
231 &'a mut Transaction<'_, Postgres>,
232 ) -> std::pin::Pin<
233 Box<dyn std::future::Future<Output = Result<T, EngineError>> + Send + 'a>,
234 > + Send,
235{
236 for _ in 0..SERIALIZABLE_RETRY_BUDGET {
237 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
238 sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
239 .execute(&mut *tx)
240 .await
241 .map_err(map_sqlx_error)?;
242 let body_res = op(&mut tx).await;
243 match body_res {
244 Ok(v) => match tx.commit().await {
245 Ok(()) => return Ok(v),
246 Err(e) if is_retryable_serialization(&e) => continue,
247 Err(e) => return Err(map_sqlx_error(e)),
248 },
249 Err(e) if is_retryable_engine(&e) => {
250 let _ = tx.rollback().await;
251 continue;
252 }
253 Err(e) => {
254 let _ = tx.rollback().await;
255 return Err(e);
256 }
257 }
258 }
259 Err(EngineError::Contention(ContentionKind::RetryExhausted))
260}
261
262fn outcome_to_dedup_json(outcome: &SuspendOutcome) -> JsonValue {
265 let details = outcome.details();
266 let extras: Vec<JsonValue> = details
267 .additional_waitpoints
268 .iter()
269 .map(|e| {
270 json!({
271 "waitpoint_id": e.waitpoint_id.to_string(),
272 "waitpoint_key": e.waitpoint_key,
273 "token": e.waitpoint_token.as_str(),
274 })
275 })
276 .collect();
277 let (variant, handle_opaque) = match outcome {
278 SuspendOutcome::Suspended { handle, .. } => {
279 ("Suspended", Some(hex::encode(handle.opaque.as_bytes())))
280 }
281 SuspendOutcome::AlreadySatisfied { .. } => ("AlreadySatisfied", None),
282 _ => ("Suspended", None),
283 };
284 json!({
285 "variant": variant,
286 "details": {
287 "suspension_id": details.suspension_id.to_string(),
288 "waitpoint_id": details.waitpoint_id.to_string(),
289 "waitpoint_key": details.waitpoint_key,
290 "token": details.waitpoint_token.as_str(),
291 "extras": extras,
292 },
293 "handle_opaque_hex": handle_opaque,
294 })
295}
296
297fn outcome_from_dedup_json(v: &JsonValue) -> Result<SuspendOutcome, EngineError> {
298 let corrupt = |s: String| EngineError::Validation {
299 kind: ValidationKind::Corruption,
300 detail: s,
301 };
302 let det = &v["details"];
303 let suspension_id = SuspensionId::parse(det["suspension_id"].as_str().unwrap_or(""))
304 .map_err(|e| corrupt(format!("dedup suspension_id: {e}")))?;
305 let waitpoint_id = WaitpointId::parse(det["waitpoint_id"].as_str().unwrap_or(""))
306 .map_err(|e| corrupt(format!("dedup waitpoint_id: {e}")))?;
307 let waitpoint_key = det["waitpoint_key"].as_str().unwrap_or("").to_owned();
308 let token = det["token"].as_str().unwrap_or("").to_owned();
309 let mut extras: Vec<AdditionalWaitpointBinding> = Vec::new();
310 if let Some(arr) = det["extras"].as_array() {
311 for e in arr {
312 let wid = WaitpointId::parse(e["waitpoint_id"].as_str().unwrap_or(""))
313 .map_err(|err| corrupt(format!("dedup extra wp_id: {err}")))?;
314 let wkey = e["waitpoint_key"].as_str().unwrap_or("").to_owned();
315 let tok = e["token"].as_str().unwrap_or("").to_owned();
316 extras.push(AdditionalWaitpointBinding::new(
317 wid,
318 wkey,
319 WaitpointHmac::new(tok),
320 ));
321 }
322 }
323 let details = SuspendOutcomeDetails::new(
324 suspension_id,
325 waitpoint_id,
326 waitpoint_key,
327 WaitpointHmac::new(token),
328 )
329 .with_additional_waitpoints(extras);
330
331 match v["variant"].as_str().unwrap_or("Suspended") {
332 "AlreadySatisfied" => Ok(SuspendOutcome::AlreadySatisfied { details }),
333 _ => {
334 let opaque_hex = v["handle_opaque_hex"].as_str().unwrap_or("");
335 let bytes = hex::decode(opaque_hex)
336 .map_err(|e| corrupt(format!("dedup handle hex: {e}")))?;
337 let opaque = HandleOpaque::new(bytes.into_boxed_slice());
338 let handle = Handle::new(BackendTag::Postgres, HandleKind::Suspended, opaque);
339 Ok(SuspendOutcome::Suspended { details, handle })
340 }
341 }
342}
343
344pub(crate) async fn suspend_impl(
347 pool: &PgPool,
348 _partition_config: &PartitionConfig,
349 handle: &Handle,
350 args: SuspendArgs,
351) -> Result<SuspendOutcome, EngineError> {
352 let payload = decode_handle(handle)?;
353 suspend_core(pool, payload, args).await
354}
355
356pub(crate) async fn suspend_by_triple_impl(
361 pool: &PgPool,
362 _partition_config: &PartitionConfig,
363 exec_id: ExecutionId,
364 triple: LeaseFence,
365 args: SuspendArgs,
366) -> Result<SuspendOutcome, EngineError> {
367 let (part, exec_uuid) = split_exec_id(&exec_id)?;
368 let row: Option<(i32,)> = sqlx::query_as(
376 "SELECT attempt_index FROM ff_exec_core \
377 WHERE partition_key = $1 AND execution_id = $2",
378 )
379 .bind(part)
380 .bind(exec_uuid)
381 .fetch_optional(pool)
382 .await
383 .map_err(map_sqlx_error)?;
384 let attempt_index_i = match row {
385 Some((i,)) => i,
386 None => return Err(EngineError::NotFound { entity: "execution" }),
387 };
388 let attempt_index =
389 AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
390
391 let payload = HandlePayload::new(
396 exec_id,
397 attempt_index,
398 triple.attempt_id,
399 triple.lease_id,
400 triple.lease_epoch,
401 0,
402 ff_core::types::LaneId::new(""),
403 ff_core::types::WorkerInstanceId::new(""),
404 );
405 suspend_core(pool, payload, args).await
406}
407
408async fn suspend_core(
409 pool: &PgPool,
410 payload: HandlePayload,
411 args: SuspendArgs,
412) -> Result<SuspendOutcome, EngineError> {
413 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
414 let attempt_index_i = i32::try_from(payload.attempt_index.0).unwrap_or(0);
415 let expected_epoch = payload.lease_epoch.0;
416 let idem_key = args.idempotency_key.as_ref().map(|k| k.as_str().to_owned());
417
418 run_serializable(pool, move |tx| {
419 let args = args.clone();
420 let idem = idem_key.clone();
421 let payload = payload.clone();
422 Box::pin(async move {
423 if let Some(key) = idem.as_deref() {
425 let row: Option<(JsonValue,)> = sqlx::query_as(
426 "SELECT outcome_json FROM ff_suspend_dedup \
427 WHERE partition_key = $1 AND idempotency_key = $2",
428 )
429 .bind(part)
430 .bind(key)
431 .fetch_optional(&mut **tx)
432 .await
433 .map_err(map_sqlx_error)?;
434 if let Some((cached,)) = row {
435 return outcome_from_dedup_json(&cached);
436 }
437 }
438
439 let epoch_row: Option<(i64,)> = sqlx::query_as(
441 "SELECT lease_epoch FROM ff_attempt \
442 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3 \
443 FOR UPDATE",
444 )
445 .bind(part)
446 .bind(exec_uuid)
447 .bind(attempt_index_i)
448 .fetch_optional(&mut **tx)
449 .await
450 .map_err(map_sqlx_error)?;
451 let observed_epoch: u64 = match epoch_row {
452 Some((e,)) => u64::try_from(e).unwrap_or(0),
453 None => return Err(EngineError::NotFound { entity: "attempt" }),
454 };
455 if observed_epoch != expected_epoch {
456 return Err(EngineError::Contention(ContentionKind::LeaseConflict));
457 }
458
459 let kid_row: Option<(String, Vec<u8>)> = sqlx::query_as(
461 "SELECT kid, secret FROM ff_waitpoint_hmac \
462 WHERE active = TRUE \
463 ORDER BY rotated_at_ms DESC LIMIT 1",
464 )
465 .fetch_optional(&mut **tx)
466 .await
467 .map_err(map_sqlx_error)?;
468 let (kid, secret) = kid_row.ok_or_else(|| EngineError::Validation {
469 kind: ValidationKind::InvalidInput,
470 detail: "ff_waitpoint_hmac empty — rotate a kid before suspend".into(),
471 })?;
472
473 let now = args.now.0;
475 let mut signed: Vec<(WaitpointId, String, String)> = Vec::new();
476 for binding in args.waitpoints.iter() {
477 let (wp_id, wp_key) = match binding {
478 WaitpointBinding::Fresh {
479 waitpoint_id,
480 waitpoint_key,
481 } => (waitpoint_id.clone(), waitpoint_key.clone()),
482 WaitpointBinding::UsePending { waitpoint_id } => {
483 let row: Option<(String,)> = sqlx::query_as(
484 "SELECT waitpoint_key FROM ff_waitpoint_pending \
485 WHERE partition_key = $1 AND waitpoint_id = $2",
486 )
487 .bind(part)
488 .bind(wp_uuid(waitpoint_id)?)
489 .fetch_optional(&mut **tx)
490 .await
491 .map_err(map_sqlx_error)?;
492 let wp_key = row.map(|(k,)| k).unwrap_or_default();
493 (waitpoint_id.clone(), wp_key)
494 }
495 _ => {
496 return Err(EngineError::Validation {
497 kind: ValidationKind::InvalidInput,
498 detail: "unsupported WaitpointBinding variant".into(),
499 });
500 }
501 };
502 let msg = format!("{}:{}", payload.execution_id, wp_id);
503 let token = hmac_sign(&secret, &kid, msg.as_bytes());
504 let required_names =
516 derive_required_signal_names(&args.resume_condition, &wp_key);
517 sqlx::query(
518 "INSERT INTO ff_waitpoint_pending \
519 (partition_key, waitpoint_id, execution_id, token_kid, token, \
520 created_at_ms, expires_at_ms, waitpoint_key, \
521 state, required_signal_names, activated_at_ms) \
522 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'active', $9, $6) \
523 ON CONFLICT (partition_key, waitpoint_id) DO UPDATE SET \
524 token_kid = EXCLUDED.token_kid, token = EXCLUDED.token, \
525 waitpoint_key = EXCLUDED.waitpoint_key, \
526 state = EXCLUDED.state, \
527 required_signal_names = EXCLUDED.required_signal_names, \
528 activated_at_ms = EXCLUDED.activated_at_ms",
529 )
530 .bind(part)
531 .bind(wp_uuid(&wp_id)?)
532 .bind(exec_uuid)
533 .bind(&kid)
534 .bind(&token)
535 .bind(now)
536 .bind(args.timeout_at.map(|t| t.0))
537 .bind(&wp_key)
538 .bind(&required_names)
539 .execute(&mut **tx)
540 .await
541 .map_err(map_sqlx_error)?;
542 signed.push((wp_id, wp_key, token));
543 }
544
545 let condition_json =
547 serde_json::to_value(&args.resume_condition).map_err(|e| {
548 EngineError::Validation {
549 kind: ValidationKind::Corruption,
550 detail: format!("resume_condition serialize: {e}"),
551 }
552 })?;
553 sqlx::query(
554 "INSERT INTO ff_suspension_current \
555 (partition_key, execution_id, suspension_id, suspended_at_ms, \
556 timeout_at_ms, reason_code, condition, satisfied_set, member_map, \
557 timeout_behavior) \
558 VALUES ($1, $2, $3, $4, $5, $6, $7, '[]'::jsonb, '{}'::jsonb, $8) \
559 ON CONFLICT (partition_key, execution_id) DO UPDATE SET \
560 suspension_id = EXCLUDED.suspension_id, \
561 suspended_at_ms = EXCLUDED.suspended_at_ms, \
562 timeout_at_ms = EXCLUDED.timeout_at_ms, \
563 reason_code = EXCLUDED.reason_code, \
564 condition = EXCLUDED.condition, \
565 satisfied_set = '[]'::jsonb, \
566 member_map = '{}'::jsonb, \
567 timeout_behavior = EXCLUDED.timeout_behavior",
568 )
569 .bind(part)
570 .bind(exec_uuid)
571 .bind(susp_uuid(&args.suspension_id)?)
572 .bind(now)
573 .bind(args.timeout_at.map(|t| t.0))
574 .bind(args.reason_code.as_wire_str())
575 .bind(&condition_json)
576 .bind(args.timeout_behavior.as_wire_str())
577 .execute(&mut **tx)
578 .await
579 .map_err(map_sqlx_error)?;
580
581 sqlx::query(
583 "UPDATE ff_exec_core \
584 SET lifecycle_phase = 'suspended', \
585 ownership_state = 'released', \
586 eligibility_state = 'not_applicable', \
587 public_state = 'suspended', \
588 attempt_state = 'attempt_interrupted' \
589 WHERE partition_key = $1 AND execution_id = $2",
590 )
591 .bind(part)
592 .bind(exec_uuid)
593 .execute(&mut **tx)
594 .await
595 .map_err(map_sqlx_error)?;
596
597 sqlx::query(
599 "UPDATE ff_attempt \
600 SET worker_id = NULL, \
601 worker_instance_id = NULL, \
602 lease_expires_at_ms = NULL, \
603 lease_epoch = lease_epoch + 1, \
604 outcome = 'attempt_interrupted' \
605 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3",
606 )
607 .bind(part)
608 .bind(exec_uuid)
609 .bind(attempt_index_i)
610 .execute(&mut **tx)
611 .await
612 .map_err(map_sqlx_error)?;
613
614 lease_event::emit(
616 tx,
617 part,
618 exec_uuid,
619 None,
620 lease_event::EVENT_REVOKED,
621 now,
622 )
623 .await?;
624
625 let (primary_id, primary_key, primary_token) = signed[0].clone();
627 let extras: Vec<AdditionalWaitpointBinding> = signed
628 .iter()
629 .skip(1)
630 .map(|(id, key, tok)| {
631 AdditionalWaitpointBinding::new(
632 id.clone(),
633 key.clone(),
634 WaitpointHmac::new(tok.clone()),
635 )
636 })
637 .collect();
638 let details = SuspendOutcomeDetails::new(
639 args.suspension_id.clone(),
640 primary_id,
641 primary_key,
642 WaitpointHmac::new(primary_token),
643 )
644 .with_additional_waitpoints(extras);
645
646 let opaque = encode_opaque(BackendTag::Postgres, &payload);
647 let suspended_handle =
648 Handle::new(BackendTag::Postgres, HandleKind::Suspended, opaque);
649 let outcome = SuspendOutcome::Suspended {
650 details,
651 handle: suspended_handle,
652 };
653
654 if let Some(key) = idem.as_deref() {
656 let cached = outcome_to_dedup_json(&outcome);
657 sqlx::query(
658 "INSERT INTO ff_suspend_dedup \
659 (partition_key, idempotency_key, outcome_json, created_at_ms) \
660 VALUES ($1, $2, $3, $4) \
661 ON CONFLICT DO NOTHING",
662 )
663 .bind(part)
664 .bind(key)
665 .bind(&cached)
666 .bind(now)
667 .execute(&mut **tx)
668 .await
669 .map_err(map_sqlx_error)?;
670 }
671
672 Ok(outcome)
673 })
674 })
675 .await
676}
677
/// Delivers a signal to a waitpoint of a suspended execution, inside one
/// serializable transaction (retried by `run_serializable`).
///
/// The waitpoint row is locked and checked to belong to `args.execution_id`,
/// the presented token is HMAC-verified and compared with the stored token,
/// and the signal is appended to the suspension's `member_map` under the
/// waitpoint's key. The resume condition is then evaluated against all
/// recorded signals; when it is satisfied the execution is made resumable,
/// its pending waitpoints are deleted and a completion event row is inserted.
/// A signal event is emitted in both cases.
///
/// Returns `DeliverSignalResult::Accepted` whose `effect` is either
/// `"resume_condition_satisfied"` or `"appended_to_waitpoint"`.
/// `_partition_config` is unused.
///
/// # Errors
/// * `NotFound` for a missing waitpoint or suspension row.
/// * `Validation` (`InvalidInput`) when the waitpoint belongs to another
///   execution, the signing key is missing, or the token fails verification
///   or does not equal the stored token.
/// * `Validation` (`Corruption`) when `member_map` or the stored condition
///   has an unexpected shape.
pub(crate) async fn deliver_signal_impl(
    pool: &PgPool,
    _partition_config: &PartitionConfig,
    args: DeliverSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
    let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
    let wp_u = wp_uuid(&args.waitpoint_id)?;

    run_serializable(pool, move |tx| {
        // The closure can run once per retry, so each run clones the args.
        let args = args.clone();
        Box::pin(async move {
            // Lock the waitpoint row for the rest of the transaction.
            let row: Option<(String, String, String, Uuid)> = sqlx::query_as(
                "SELECT token_kid, token, waitpoint_key, execution_id \
                 FROM ff_waitpoint_pending \
                 WHERE partition_key = $1 AND waitpoint_id = $2 \
                 FOR UPDATE",
            )
            .bind(part)
            .bind(wp_u)
            .fetch_optional(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;
            let (kid, stored_token, wp_key, stored_exec) = match row {
                Some(r) => r,
                None => return Err(EngineError::NotFound { entity: "waitpoint" }),
            };
            if stored_exec != exec_uuid {
                return Err(EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: "waitpoint belongs to a different execution".into(),
                });
            }

            // Look the secret up by the kid recorded on the waitpoint; the
            // query does not filter on `active`.
            let secret_row: Option<(Vec<u8>,)> = sqlx::query_as(
                "SELECT secret FROM ff_waitpoint_hmac WHERE kid = $1",
            )
            .bind(&kid)
            .fetch_optional(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;
            let (secret,) = secret_row.ok_or_else(|| EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("kid {kid} missing from keystore"),
            })?;
            let presented = args.waitpoint_token.as_str();
            // Same message layout that suspend signs: "<execution_id>:<waitpoint_id>".
            let msg = format!("{}:{}", args.execution_id, args.waitpoint_id);
            hmac_verify(&secret, &kid, msg.as_bytes(), presented).map_err(|e| {
                EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: format!("waitpoint_token verify: {e}"),
                }
            })?;
            // Beyond verification, the token must equal the one stored on the row.
            if presented != stored_token {
                return Err(EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: "waitpoint_token does not match minted token".into(),
                });
            }

            // Lock the suspension row and load its condition and signal map.
            let susp_row: Option<(JsonValue, JsonValue)> = sqlx::query_as(
                "SELECT condition, member_map FROM ff_suspension_current \
                 WHERE partition_key = $1 AND execution_id = $2 \
                 FOR UPDATE",
            )
            .bind(part)
            .bind(exec_uuid)
            .fetch_optional(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;
            let (condition_json, mut member_map) = match susp_row {
                Some(r) => r,
                None => return Err(EngineError::NotFound { entity: "suspension" }),
            };

            // JSON form of the signal; `resume_signal_from_json` reads the
            // same keys back.
            let signal_blob = json!({
                "signal_id": args.signal_id.to_string(),
                "signal_name": args.signal_name,
                "signal_category": args.signal_category,
                "source_type": args.source_type,
                "source_identity": args.source_identity,
                "correlation_id": args.correlation_id.clone().unwrap_or_default(),
                "accepted_at": args.now.0,
                "payload_hex": args.payload.as_ref().map(hex::encode),
            });
            let map_obj = member_map.as_object_mut().ok_or_else(|| {
                EngineError::Validation {
                    kind: ValidationKind::Corruption,
                    detail: "member_map not a JSON object".into(),
                }
            })?;
            // Append under the waitpoint's key, creating the array on first use.
            let entry = map_obj.entry(wp_key.clone()).or_insert_with(|| json!([]));
            entry
                .as_array_mut()
                .ok_or_else(|| EngineError::Validation {
                    kind: ValidationKind::Corruption,
                    detail: "member_map[wp_key] not a JSON array".into(),
                })?
                .push(signal_blob);

            let condition: ResumeCondition = serde_json::from_value(condition_json)
                .map_err(|e| EngineError::Validation {
                    kind: ValidationKind::Corruption,
                    detail: format!("condition deserialize: {e}"),
                })?;
            // Decode every recorded signal (including the one just appended);
            // entries that fail to decode are skipped.
            let signals_by_wp: HashMap<String, Vec<ResumeSignal>> = map_obj
                .iter()
                .map(|(k, v)| {
                    let sigs: Vec<ResumeSignal> = v
                        .as_array()
                        .map(|arr| arr.iter().filter_map(resume_signal_from_json).collect())
                        .unwrap_or_default();
                    (k.clone(), sigs)
                })
                .collect();
            // Borrowed view in the shape `evaluate` takes.
            let borrowed: HashMap<&str, &[ResumeSignal]> = signals_by_wp
                .iter()
                .map(|(k, v)| (k.as_str(), v.as_slice()))
                .collect();
            let satisfied = evaluate(&condition, &borrowed);

            // Persist the updated map whether or not the condition is satisfied.
            sqlx::query(
                "UPDATE ff_suspension_current SET member_map = $1 \
                 WHERE partition_key = $2 AND execution_id = $3",
            )
            .bind(&member_map)
            .bind(part)
            .bind(exec_uuid)
            .execute(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;

            let effect = if satisfied {
                // Make the execution claimable again.
                sqlx::query(
                    "UPDATE ff_exec_core \
                     SET public_state = 'resumable', \
                     lifecycle_phase = 'runnable', \
                     eligibility_state = 'eligible_now' \
                     WHERE partition_key = $1 AND execution_id = $2",
                )
                .bind(part)
                .bind(exec_uuid)
                .execute(&mut **tx)
                .await
                .map_err(map_sqlx_error)?;

                // Drop every pending waitpoint of this execution, not only
                // the one that was signalled.
                sqlx::query(
                    "DELETE FROM ff_waitpoint_pending \
                     WHERE partition_key = $1 AND execution_id = $2",
                )
                .bind(part)
                .bind(exec_uuid)
                .execute(&mut **tx)
                .await
                .map_err(map_sqlx_error)?;

                sqlx::query(
                    "INSERT INTO ff_completion_event \
                     (partition_key, execution_id, outcome, occurred_at_ms) \
                     VALUES ($1, $2, 'resumable', $3)",
                )
                .bind(part)
                .bind(exec_uuid)
                .bind(args.now.0)
                .execute(&mut **tx)
                .await
                .map_err(map_sqlx_error)?;

                "resume_condition_satisfied"
            } else {
                "appended_to_waitpoint"
            };

            let wp_id_str = args.waitpoint_id.to_string();
            signal_event::emit(
                tx,
                part,
                exec_uuid,
                &args.signal_id.to_string(),
                Some(wp_id_str.as_str()),
                Some(args.source_identity.as_str()),
                args.now.0,
            )
            .await?;

            Ok(DeliverSignalResult::Accepted {
                signal_id: args.signal_id.clone(),
                effect: effect.to_owned(),
            })
        })
    })
    .await
}
885
886fn resume_signal_from_json(v: &JsonValue) -> Option<ResumeSignal> {
887 let signal_id = SignalId::parse(v["signal_id"].as_str()?).ok()?;
888 Some(ResumeSignal {
889 signal_id,
890 signal_name: v["signal_name"].as_str()?.to_owned(),
891 signal_category: v["signal_category"].as_str().unwrap_or("").to_owned(),
892 source_type: v["source_type"].as_str().unwrap_or("").to_owned(),
893 source_identity: v["source_identity"].as_str().unwrap_or("").to_owned(),
894 correlation_id: v["correlation_id"].as_str().unwrap_or("").to_owned(),
895 accepted_at: TimestampMs::from_millis(v["accepted_at"].as_i64().unwrap_or(0)),
896 payload: v["payload_hex"].as_str().and_then(|h| hex::decode(h).ok()),
897 })
898}
899
900pub(crate) async fn claim_resumed_execution_impl(
903 pool: &PgPool,
904 _partition_config: &PartitionConfig,
905 args: ClaimResumedExecutionArgs,
906) -> Result<ClaimResumedExecutionResult, EngineError> {
907 let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
908 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
909
910 let row: Option<(String, i32)> = sqlx::query_as(
911 "SELECT public_state, attempt_index FROM ff_exec_core \
912 WHERE partition_key = $1 AND execution_id = $2 \
913 FOR UPDATE",
914 )
915 .bind(part)
916 .bind(exec_uuid)
917 .fetch_optional(&mut *tx)
918 .await
919 .map_err(map_sqlx_error)?;
920 let (public_state, attempt_index_i) = match row {
921 Some(r) => r,
922 None => {
923 tx.rollback().await.ok();
924 return Err(EngineError::NotFound { entity: "execution" });
925 }
926 };
927 if public_state != "resumable" {
928 tx.rollback().await.ok();
929 return Err(EngineError::Contention(
930 ContentionKind::NotAResumedExecution,
931 ));
932 }
933
934 let now = now_ms();
935 let lease_ttl = i64::try_from(args.lease_ttl_ms).unwrap_or(0);
936 let new_expires = now.saturating_add(lease_ttl);
937
938 sqlx::query(
939 "UPDATE ff_attempt \
940 SET worker_id = $1, worker_instance_id = $2, \
941 lease_epoch = lease_epoch + 1, \
942 lease_expires_at_ms = $3, started_at_ms = $4, outcome = NULL \
943 WHERE partition_key = $5 AND execution_id = $6 AND attempt_index = $7",
944 )
945 .bind(args.worker_id.as_str())
946 .bind(args.worker_instance_id.as_str())
947 .bind(new_expires)
948 .bind(now)
949 .bind(part)
950 .bind(exec_uuid)
951 .bind(attempt_index_i)
952 .execute(&mut *tx)
953 .await
954 .map_err(map_sqlx_error)?;
955
956 sqlx::query(
957 "UPDATE ff_exec_core \
958 SET lifecycle_phase = 'active', ownership_state = 'leased', \
959 eligibility_state = 'not_applicable', \
960 public_state = 'running', attempt_state = 'running_attempt' \
961 WHERE partition_key = $1 AND execution_id = $2",
962 )
963 .bind(part)
964 .bind(exec_uuid)
965 .execute(&mut *tx)
966 .await
967 .map_err(map_sqlx_error)?;
968
969 let epoch_row: (i64,) = sqlx::query_as(
970 "SELECT lease_epoch FROM ff_attempt \
971 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3",
972 )
973 .bind(part)
974 .bind(exec_uuid)
975 .bind(attempt_index_i)
976 .fetch_one(&mut *tx)
977 .await
978 .map_err(map_sqlx_error)?;
979
980 let lease_id_str = args.lease_id.to_string();
982 lease_event::emit(
983 &mut tx,
984 part,
985 exec_uuid,
986 Some(&lease_id_str),
987 lease_event::EVENT_ACQUIRED,
988 now,
989 )
990 .await?;
991
992 tx.commit().await.map_err(map_sqlx_error)?;
993
994 let attempt_index = AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
995 let lease_epoch = LeaseEpoch(u64::try_from(epoch_row.0).unwrap_or(0));
996 let attempt_id = AttemptId::new();
997
998 Ok(ClaimResumedExecutionResult::Claimed(
999 ClaimedResumedExecution {
1000 execution_id: args.execution_id.clone(),
1001 lease_id: args.lease_id.clone(),
1002 lease_epoch,
1003 attempt_index,
1004 attempt_id,
1005 lease_expires_at: TimestampMs::from_millis(new_expires),
1006 },
1007 ))
1008}
1009
1010pub(crate) async fn observe_signals_impl(
1013 pool: &PgPool,
1014 handle: &Handle,
1015) -> Result<Vec<ResumeSignal>, EngineError> {
1016 let payload = decode_handle(handle)?;
1017 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
1018
1019 let row: Option<(JsonValue,)> = sqlx::query_as(
1020 "SELECT member_map FROM ff_suspension_current \
1021 WHERE partition_key = $1 AND execution_id = $2",
1022 )
1023 .bind(part)
1024 .bind(exec_uuid)
1025 .fetch_optional(pool)
1026 .await
1027 .map_err(map_sqlx_error)?;
1028
1029 let Some((member_map,)) = row else {
1030 return Ok(Vec::new());
1031 };
1032 let mut out: Vec<ResumeSignal> = Vec::new();
1033 if let Some(map) = member_map.as_object() {
1034 for (_wp_key, arr) in map {
1035 if let Some(sigs) = arr.as_array() {
1036 for v in sigs {
1037 if let Some(s) = resume_signal_from_json(v) {
1038 out.push(s);
1039 }
1040 }
1041 }
1042 }
1043 }
1044 Ok(out)
1045}
1046
1047pub(crate) async fn list_pending_waitpoints_impl(
1065 pool: &PgPool,
1066 args: ListPendingWaitpointsArgs,
1067) -> Result<ListPendingWaitpointsResult, EngineError> {
1068 const DEFAULT_LIMIT: u32 = 100;
1069 const MAX_LIMIT: u32 = 1000;
1070
1071 let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
1072 let limit = args.limit.unwrap_or(DEFAULT_LIMIT).clamp(1, MAX_LIMIT) as i64;
1073 let after_uuid = match args.after.as_ref() {
1074 Some(wp) => Some(wp_uuid(wp)?),
1075 None => None,
1076 };
1077
1078 let exists: Option<(i16,)> = sqlx::query_as(
1081 "SELECT 1::smallint FROM ff_exec_core \
1082 WHERE partition_key = $1 AND execution_id = $2",
1083 )
1084 .bind(part)
1085 .bind(exec_uuid)
1086 .fetch_optional(pool)
1087 .await
1088 .map_err(map_sqlx_error)?;
1089 if exists.is_none() {
1090 return Err(EngineError::NotFound { entity: "execution" });
1091 }
1092
1093 type Row = (
1100 Uuid,
1101 String,
1102 String,
1103 Vec<String>,
1104 i64,
1105 Option<i64>,
1106 Option<i64>,
1107 String,
1108 String,
1109 );
1110 let rows: Vec<Row> = sqlx::query_as(
1111 "SELECT waitpoint_id, waitpoint_key, state, required_signal_names, \
1112 created_at_ms, activated_at_ms, expires_at_ms, token_kid, token \
1113 FROM ff_waitpoint_pending \
1114 WHERE partition_key = $1 \
1115 AND execution_id = $2 \
1116 AND state IN ('pending', 'active') \
1117 AND ($3::uuid IS NULL OR waitpoint_id > $3) \
1118 ORDER BY waitpoint_id \
1119 LIMIT $4",
1120 )
1121 .bind(part)
1122 .bind(exec_uuid)
1123 .bind(after_uuid)
1124 .bind(limit + 1)
1125 .fetch_all(pool)
1126 .await
1127 .map_err(map_sqlx_error)?;
1128
1129 let has_more = rows.len() as i64 > limit;
1130 let take_n = if has_more { limit as usize } else { rows.len() };
1131
1132 let mut entries: Vec<PendingWaitpointInfo> = Vec::with_capacity(take_n);
1133 for (wp_uid, wp_key, state, req_names, created_ms, activated_ms, expires_ms, _kid, token)
1134 in rows.into_iter().take(take_n)
1135 {
1136 let wp_id = WaitpointId::from_uuid(wp_uid);
1137 let (token_kid, token_fingerprint) = parse_waitpoint_token_kid_fp(&token);
1141 let mut info = PendingWaitpointInfo::new(
1142 wp_id,
1143 wp_key,
1144 state,
1145 TimestampMs(created_ms),
1146 args.execution_id.clone(),
1147 token_kid,
1148 token_fingerprint,
1149 );
1150 if !req_names.is_empty() {
1151 info = info.with_required_signal_names(req_names);
1152 }
1153 if let Some(ms) = activated_ms {
1154 info = info.with_activated_at(TimestampMs(ms));
1155 }
1156 if let Some(ms) = expires_ms {
1157 info = info.with_expires_at(TimestampMs(ms));
1158 }
1159 entries.push(info);
1160 }
1161
1162 let next_cursor = if has_more {
1163 entries.last().map(|e| e.waitpoint_id.clone())
1164 } else {
1165 None
1166 };
1167 let mut result = ListPendingWaitpointsResult::new(entries);
1168 if let Some(cursor) = next_cursor {
1169 result = result.with_next_cursor(cursor);
1170 }
1171 Ok(result)
1172}