1use std::collections::HashMap;
22use std::time::{SystemTime, UNIX_EPOCH};
23
24use ff_core::backend::{BackendTag, Handle, HandleKind, HandleOpaque, ResumeSignal, WaitpointHmac};
25use ff_core::contracts::{
26 AdditionalWaitpointBinding, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
27 ClaimedResumedExecution, DeliverSignalArgs, DeliverSignalResult, ResumeCondition, SuspendArgs,
28 SuspendOutcome, SuspendOutcomeDetails, WaitpointBinding,
29};
30use ff_core::engine_error::{ContentionKind, EngineError, ValidationKind};
31use ff_core::handle_codec::{encode as encode_opaque, HandlePayload};
32use ff_core::partition::PartitionConfig;
33use ff_core::types::{
34 AttemptId, AttemptIndex, ExecutionId, LeaseEpoch, LeaseFence, SignalId, SuspensionId,
35 TimestampMs, WaitpointId,
36};
37use serde_json::{json, Value as JsonValue};
38use sqlx::{PgPool, Postgres, Transaction};
39use uuid::Uuid;
40
41use crate::error::map_sqlx_error;
42use crate::lease_event;
43use crate::signal::{hmac_sign, hmac_verify, is_retryable_serialization, SERIALIZABLE_RETRY_BUDGET};
44use crate::signal_event;
45use crate::suspend::evaluate;
46
/// Wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the error from
/// `duration_since` is swallowed and `0` is returned. The result is never
/// negative.
fn now_ms() -> i64 {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis())
        .unwrap_or(0);
    std::cmp::max(millis as i64, 0)
}
55
56fn split_exec_id(eid: &ExecutionId) -> Result<(i16, Uuid), EngineError> {
57 let s = eid.as_str();
58 let rest = s.strip_prefix("{fp:").ok_or_else(|| EngineError::Validation {
59 kind: ValidationKind::InvalidInput,
60 detail: format!("execution_id missing `{{fp:` prefix: {s}"),
61 })?;
62 let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
63 kind: ValidationKind::InvalidInput,
64 detail: format!("execution_id missing `}}:`: {s}"),
65 })?;
66 let part: i16 = rest[..close]
67 .parse()
68 .map_err(|_| EngineError::Validation {
69 kind: ValidationKind::InvalidInput,
70 detail: format!("execution_id partition index not u16: {s}"),
71 })?;
72 let uuid = Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
73 kind: ValidationKind::InvalidInput,
74 detail: format!("execution_id UUID invalid: {s}"),
75 })?;
76 Ok((part, uuid))
77}
78
79fn decode_handle(handle: &Handle) -> Result<HandlePayload, EngineError> {
80 if handle.backend != BackendTag::Postgres {
81 return Err(EngineError::Validation {
82 kind: ValidationKind::HandleFromOtherBackend,
83 detail: format!("expected Postgres, got {:?}", handle.backend),
84 });
85 }
86 let decoded = ff_core::handle_codec::decode(&handle.opaque)?;
87 if decoded.tag != BackendTag::Postgres {
88 return Err(EngineError::Validation {
89 kind: ValidationKind::HandleFromOtherBackend,
90 detail: format!("embedded tag {:?}", decoded.tag),
91 });
92 }
93 Ok(decoded.payload)
94}
95
96fn wp_uuid(w: &WaitpointId) -> Result<Uuid, EngineError> {
97 Uuid::parse_str(&w.to_string()).map_err(|e| EngineError::Validation {
98 kind: ValidationKind::InvalidInput,
99 detail: format!("waitpoint_id not a UUID: {e}"),
100 })
101}
102
103fn susp_uuid(s: &SuspensionId) -> Result<Uuid, EngineError> {
104 Uuid::parse_str(&s.to_string()).map_err(|e| EngineError::Validation {
105 kind: ValidationKind::InvalidInput,
106 detail: format!("suspension_id not a UUID: {e}"),
107 })
108}
109
110fn is_retryable_engine(err: &EngineError) -> bool {
117 match err {
118 EngineError::Transport { source, .. } => {
119 let s = source.to_string();
120 s.contains("40001")
121 || s.contains("40P01")
122 || s.contains("serialization_failure")
123 || s.contains("deadlock_detected")
124 }
125 EngineError::Contention(ContentionKind::LeaseConflict) => true,
135 _ => false,
136 }
137}
138
139async fn run_serializable<T, F>(pool: &PgPool, mut op: F) -> Result<T, EngineError>
143where
144 T: Send,
145 F: for<'a> FnMut(
146 &'a mut Transaction<'_, Postgres>,
147 ) -> std::pin::Pin<
148 Box<dyn std::future::Future<Output = Result<T, EngineError>> + Send + 'a>,
149 > + Send,
150{
151 for _ in 0..SERIALIZABLE_RETRY_BUDGET {
152 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
153 sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
154 .execute(&mut *tx)
155 .await
156 .map_err(map_sqlx_error)?;
157 let body_res = op(&mut tx).await;
158 match body_res {
159 Ok(v) => match tx.commit().await {
160 Ok(()) => return Ok(v),
161 Err(e) if is_retryable_serialization(&e) => continue,
162 Err(e) => return Err(map_sqlx_error(e)),
163 },
164 Err(e) if is_retryable_engine(&e) => {
165 let _ = tx.rollback().await;
166 continue;
167 }
168 Err(e) => {
169 let _ = tx.rollback().await;
170 return Err(e);
171 }
172 }
173 }
174 Err(EngineError::Contention(ContentionKind::RetryExhausted))
175}
176
177fn outcome_to_dedup_json(outcome: &SuspendOutcome) -> JsonValue {
180 let details = outcome.details();
181 let extras: Vec<JsonValue> = details
182 .additional_waitpoints
183 .iter()
184 .map(|e| {
185 json!({
186 "waitpoint_id": e.waitpoint_id.to_string(),
187 "waitpoint_key": e.waitpoint_key,
188 "token": e.waitpoint_token.as_str(),
189 })
190 })
191 .collect();
192 let (variant, handle_opaque) = match outcome {
193 SuspendOutcome::Suspended { handle, .. } => {
194 ("Suspended", Some(hex::encode(handle.opaque.as_bytes())))
195 }
196 SuspendOutcome::AlreadySatisfied { .. } => ("AlreadySatisfied", None),
197 _ => ("Suspended", None),
198 };
199 json!({
200 "variant": variant,
201 "details": {
202 "suspension_id": details.suspension_id.to_string(),
203 "waitpoint_id": details.waitpoint_id.to_string(),
204 "waitpoint_key": details.waitpoint_key,
205 "token": details.waitpoint_token.as_str(),
206 "extras": extras,
207 },
208 "handle_opaque_hex": handle_opaque,
209 })
210}
211
212fn outcome_from_dedup_json(v: &JsonValue) -> Result<SuspendOutcome, EngineError> {
213 let corrupt = |s: String| EngineError::Validation {
214 kind: ValidationKind::Corruption,
215 detail: s,
216 };
217 let det = &v["details"];
218 let suspension_id = SuspensionId::parse(det["suspension_id"].as_str().unwrap_or(""))
219 .map_err(|e| corrupt(format!("dedup suspension_id: {e}")))?;
220 let waitpoint_id = WaitpointId::parse(det["waitpoint_id"].as_str().unwrap_or(""))
221 .map_err(|e| corrupt(format!("dedup waitpoint_id: {e}")))?;
222 let waitpoint_key = det["waitpoint_key"].as_str().unwrap_or("").to_owned();
223 let token = det["token"].as_str().unwrap_or("").to_owned();
224 let mut extras: Vec<AdditionalWaitpointBinding> = Vec::new();
225 if let Some(arr) = det["extras"].as_array() {
226 for e in arr {
227 let wid = WaitpointId::parse(e["waitpoint_id"].as_str().unwrap_or(""))
228 .map_err(|err| corrupt(format!("dedup extra wp_id: {err}")))?;
229 let wkey = e["waitpoint_key"].as_str().unwrap_or("").to_owned();
230 let tok = e["token"].as_str().unwrap_or("").to_owned();
231 extras.push(AdditionalWaitpointBinding::new(
232 wid,
233 wkey,
234 WaitpointHmac::new(tok),
235 ));
236 }
237 }
238 let details = SuspendOutcomeDetails::new(
239 suspension_id,
240 waitpoint_id,
241 waitpoint_key,
242 WaitpointHmac::new(token),
243 )
244 .with_additional_waitpoints(extras);
245
246 match v["variant"].as_str().unwrap_or("Suspended") {
247 "AlreadySatisfied" => Ok(SuspendOutcome::AlreadySatisfied { details }),
248 _ => {
249 let opaque_hex = v["handle_opaque_hex"].as_str().unwrap_or("");
250 let bytes = hex::decode(opaque_hex)
251 .map_err(|e| corrupt(format!("dedup handle hex: {e}")))?;
252 let opaque = HandleOpaque::new(bytes.into_boxed_slice());
253 let handle = Handle::new(BackendTag::Postgres, HandleKind::Suspended, opaque);
254 Ok(SuspendOutcome::Suspended { details, handle })
255 }
256 }
257}
258
259pub(crate) async fn suspend_impl(
262 pool: &PgPool,
263 _partition_config: &PartitionConfig,
264 handle: &Handle,
265 args: SuspendArgs,
266) -> Result<SuspendOutcome, EngineError> {
267 let payload = decode_handle(handle)?;
268 suspend_core(pool, payload, args).await
269}
270
271pub(crate) async fn suspend_by_triple_impl(
276 pool: &PgPool,
277 _partition_config: &PartitionConfig,
278 exec_id: ExecutionId,
279 triple: LeaseFence,
280 args: SuspendArgs,
281) -> Result<SuspendOutcome, EngineError> {
282 let (part, exec_uuid) = split_exec_id(&exec_id)?;
283 let row: Option<(i32,)> = sqlx::query_as(
291 "SELECT attempt_index FROM ff_exec_core \
292 WHERE partition_key = $1 AND execution_id = $2",
293 )
294 .bind(part)
295 .bind(exec_uuid)
296 .fetch_optional(pool)
297 .await
298 .map_err(map_sqlx_error)?;
299 let attempt_index_i = match row {
300 Some((i,)) => i,
301 None => return Err(EngineError::NotFound { entity: "execution" }),
302 };
303 let attempt_index =
304 AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
305
306 let payload = HandlePayload::new(
311 exec_id,
312 attempt_index,
313 triple.attempt_id,
314 triple.lease_id,
315 triple.lease_epoch,
316 0,
317 ff_core::types::LaneId::new(""),
318 ff_core::types::WorkerInstanceId::new(""),
319 );
320 suspend_core(pool, payload, args).await
321}
322
323async fn suspend_core(
324 pool: &PgPool,
325 payload: HandlePayload,
326 args: SuspendArgs,
327) -> Result<SuspendOutcome, EngineError> {
328 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
329 let attempt_index_i = i32::try_from(payload.attempt_index.0).unwrap_or(0);
330 let expected_epoch = payload.lease_epoch.0;
331 let idem_key = args.idempotency_key.as_ref().map(|k| k.as_str().to_owned());
332
333 run_serializable(pool, move |tx| {
334 let args = args.clone();
335 let idem = idem_key.clone();
336 let payload = payload.clone();
337 Box::pin(async move {
338 if let Some(key) = idem.as_deref() {
340 let row: Option<(JsonValue,)> = sqlx::query_as(
341 "SELECT outcome_json FROM ff_suspend_dedup \
342 WHERE partition_key = $1 AND idempotency_key = $2",
343 )
344 .bind(part)
345 .bind(key)
346 .fetch_optional(&mut **tx)
347 .await
348 .map_err(map_sqlx_error)?;
349 if let Some((cached,)) = row {
350 return outcome_from_dedup_json(&cached);
351 }
352 }
353
354 let epoch_row: Option<(i64,)> = sqlx::query_as(
356 "SELECT lease_epoch FROM ff_attempt \
357 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3 \
358 FOR UPDATE",
359 )
360 .bind(part)
361 .bind(exec_uuid)
362 .bind(attempt_index_i)
363 .fetch_optional(&mut **tx)
364 .await
365 .map_err(map_sqlx_error)?;
366 let observed_epoch: u64 = match epoch_row {
367 Some((e,)) => u64::try_from(e).unwrap_or(0),
368 None => return Err(EngineError::NotFound { entity: "attempt" }),
369 };
370 if observed_epoch != expected_epoch {
371 return Err(EngineError::Contention(ContentionKind::LeaseConflict));
372 }
373
374 let kid_row: Option<(String, Vec<u8>)> = sqlx::query_as(
376 "SELECT kid, secret FROM ff_waitpoint_hmac \
377 WHERE active = TRUE \
378 ORDER BY rotated_at_ms DESC LIMIT 1",
379 )
380 .fetch_optional(&mut **tx)
381 .await
382 .map_err(map_sqlx_error)?;
383 let (kid, secret) = kid_row.ok_or_else(|| EngineError::Validation {
384 kind: ValidationKind::InvalidInput,
385 detail: "ff_waitpoint_hmac empty — rotate a kid before suspend".into(),
386 })?;
387
388 let now = args.now.0;
390 let mut signed: Vec<(WaitpointId, String, String)> = Vec::new();
391 for binding in args.waitpoints.iter() {
392 let (wp_id, wp_key) = match binding {
393 WaitpointBinding::Fresh {
394 waitpoint_id,
395 waitpoint_key,
396 } => (waitpoint_id.clone(), waitpoint_key.clone()),
397 WaitpointBinding::UsePending { waitpoint_id } => {
398 let row: Option<(String,)> = sqlx::query_as(
399 "SELECT waitpoint_key FROM ff_waitpoint_pending \
400 WHERE partition_key = $1 AND waitpoint_id = $2",
401 )
402 .bind(part)
403 .bind(wp_uuid(waitpoint_id)?)
404 .fetch_optional(&mut **tx)
405 .await
406 .map_err(map_sqlx_error)?;
407 let wp_key = row.map(|(k,)| k).unwrap_or_default();
408 (waitpoint_id.clone(), wp_key)
409 }
410 _ => {
411 return Err(EngineError::Validation {
412 kind: ValidationKind::InvalidInput,
413 detail: "unsupported WaitpointBinding variant".into(),
414 });
415 }
416 };
417 let msg = format!("{}:{}", payload.execution_id, wp_id);
418 let token = hmac_sign(&secret, &kid, msg.as_bytes());
419 sqlx::query(
420 "INSERT INTO ff_waitpoint_pending \
421 (partition_key, waitpoint_id, execution_id, token_kid, token, \
422 created_at_ms, expires_at_ms, waitpoint_key) \
423 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \
424 ON CONFLICT (partition_key, waitpoint_id) DO UPDATE SET \
425 token_kid = EXCLUDED.token_kid, token = EXCLUDED.token, \
426 waitpoint_key = EXCLUDED.waitpoint_key",
427 )
428 .bind(part)
429 .bind(wp_uuid(&wp_id)?)
430 .bind(exec_uuid)
431 .bind(&kid)
432 .bind(&token)
433 .bind(now)
434 .bind(args.timeout_at.map(|t| t.0))
435 .bind(&wp_key)
436 .execute(&mut **tx)
437 .await
438 .map_err(map_sqlx_error)?;
439 signed.push((wp_id, wp_key, token));
440 }
441
442 let condition_json =
444 serde_json::to_value(&args.resume_condition).map_err(|e| {
445 EngineError::Validation {
446 kind: ValidationKind::Corruption,
447 detail: format!("resume_condition serialize: {e}"),
448 }
449 })?;
450 sqlx::query(
451 "INSERT INTO ff_suspension_current \
452 (partition_key, execution_id, suspension_id, suspended_at_ms, \
453 timeout_at_ms, reason_code, condition, satisfied_set, member_map, \
454 timeout_behavior) \
455 VALUES ($1, $2, $3, $4, $5, $6, $7, '[]'::jsonb, '{}'::jsonb, $8) \
456 ON CONFLICT (partition_key, execution_id) DO UPDATE SET \
457 suspension_id = EXCLUDED.suspension_id, \
458 suspended_at_ms = EXCLUDED.suspended_at_ms, \
459 timeout_at_ms = EXCLUDED.timeout_at_ms, \
460 reason_code = EXCLUDED.reason_code, \
461 condition = EXCLUDED.condition, \
462 satisfied_set = '[]'::jsonb, \
463 member_map = '{}'::jsonb, \
464 timeout_behavior = EXCLUDED.timeout_behavior",
465 )
466 .bind(part)
467 .bind(exec_uuid)
468 .bind(susp_uuid(&args.suspension_id)?)
469 .bind(now)
470 .bind(args.timeout_at.map(|t| t.0))
471 .bind(args.reason_code.as_wire_str())
472 .bind(&condition_json)
473 .bind(args.timeout_behavior.as_wire_str())
474 .execute(&mut **tx)
475 .await
476 .map_err(map_sqlx_error)?;
477
478 sqlx::query(
480 "UPDATE ff_exec_core \
481 SET lifecycle_phase = 'suspended', \
482 ownership_state = 'released', \
483 eligibility_state = 'not_applicable', \
484 public_state = 'suspended', \
485 attempt_state = 'attempt_interrupted' \
486 WHERE partition_key = $1 AND execution_id = $2",
487 )
488 .bind(part)
489 .bind(exec_uuid)
490 .execute(&mut **tx)
491 .await
492 .map_err(map_sqlx_error)?;
493
494 sqlx::query(
496 "UPDATE ff_attempt \
497 SET worker_id = NULL, \
498 worker_instance_id = NULL, \
499 lease_expires_at_ms = NULL, \
500 lease_epoch = lease_epoch + 1, \
501 outcome = 'attempt_interrupted' \
502 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3",
503 )
504 .bind(part)
505 .bind(exec_uuid)
506 .bind(attempt_index_i)
507 .execute(&mut **tx)
508 .await
509 .map_err(map_sqlx_error)?;
510
511 lease_event::emit(
513 tx,
514 part,
515 exec_uuid,
516 None,
517 lease_event::EVENT_REVOKED,
518 now,
519 )
520 .await?;
521
522 let (primary_id, primary_key, primary_token) = signed[0].clone();
524 let extras: Vec<AdditionalWaitpointBinding> = signed
525 .iter()
526 .skip(1)
527 .map(|(id, key, tok)| {
528 AdditionalWaitpointBinding::new(
529 id.clone(),
530 key.clone(),
531 WaitpointHmac::new(tok.clone()),
532 )
533 })
534 .collect();
535 let details = SuspendOutcomeDetails::new(
536 args.suspension_id.clone(),
537 primary_id,
538 primary_key,
539 WaitpointHmac::new(primary_token),
540 )
541 .with_additional_waitpoints(extras);
542
543 let opaque = encode_opaque(BackendTag::Postgres, &payload);
544 let suspended_handle =
545 Handle::new(BackendTag::Postgres, HandleKind::Suspended, opaque);
546 let outcome = SuspendOutcome::Suspended {
547 details,
548 handle: suspended_handle,
549 };
550
551 if let Some(key) = idem.as_deref() {
553 let cached = outcome_to_dedup_json(&outcome);
554 sqlx::query(
555 "INSERT INTO ff_suspend_dedup \
556 (partition_key, idempotency_key, outcome_json, created_at_ms) \
557 VALUES ($1, $2, $3, $4) \
558 ON CONFLICT DO NOTHING",
559 )
560 .bind(part)
561 .bind(key)
562 .bind(&cached)
563 .bind(now)
564 .execute(&mut **tx)
565 .await
566 .map_err(map_sqlx_error)?;
567 }
568
569 Ok(outcome)
570 })
571 })
572 .await
573}
574
575pub(crate) async fn deliver_signal_impl(
578 pool: &PgPool,
579 _partition_config: &PartitionConfig,
580 args: DeliverSignalArgs,
581) -> Result<DeliverSignalResult, EngineError> {
582 let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
583 let wp_u = wp_uuid(&args.waitpoint_id)?;
584
585 run_serializable(pool, move |tx| {
586 let args = args.clone();
587 Box::pin(async move {
588 let row: Option<(String, String, String, Uuid)> = sqlx::query_as(
590 "SELECT token_kid, token, waitpoint_key, execution_id \
591 FROM ff_waitpoint_pending \
592 WHERE partition_key = $1 AND waitpoint_id = $2 \
593 FOR UPDATE",
594 )
595 .bind(part)
596 .bind(wp_u)
597 .fetch_optional(&mut **tx)
598 .await
599 .map_err(map_sqlx_error)?;
600 let (kid, stored_token, wp_key, stored_exec) = match row {
601 Some(r) => r,
602 None => return Err(EngineError::NotFound { entity: "waitpoint" }),
603 };
604 if stored_exec != exec_uuid {
605 return Err(EngineError::Validation {
606 kind: ValidationKind::InvalidInput,
607 detail: "waitpoint belongs to a different execution".into(),
608 });
609 }
610
611 let secret_row: Option<(Vec<u8>,)> = sqlx::query_as(
615 "SELECT secret FROM ff_waitpoint_hmac WHERE kid = $1",
616 )
617 .bind(&kid)
618 .fetch_optional(&mut **tx)
619 .await
620 .map_err(map_sqlx_error)?;
621 let (secret,) = secret_row.ok_or_else(|| EngineError::Validation {
622 kind: ValidationKind::InvalidInput,
623 detail: format!("kid {kid} missing from keystore"),
624 })?;
625 let presented = args.waitpoint_token.as_str();
626 let msg = format!("{}:{}", args.execution_id, args.waitpoint_id);
627 hmac_verify(&secret, &kid, msg.as_bytes(), presented).map_err(|e| {
628 EngineError::Validation {
629 kind: ValidationKind::InvalidInput,
630 detail: format!("waitpoint_token verify: {e}"),
631 }
632 })?;
633 if presented != stored_token {
634 return Err(EngineError::Validation {
635 kind: ValidationKind::InvalidInput,
636 detail: "waitpoint_token does not match minted token".into(),
637 });
638 }
639
640 let susp_row: Option<(JsonValue, JsonValue)> = sqlx::query_as(
643 "SELECT condition, member_map FROM ff_suspension_current \
644 WHERE partition_key = $1 AND execution_id = $2 \
645 FOR UPDATE",
646 )
647 .bind(part)
648 .bind(exec_uuid)
649 .fetch_optional(&mut **tx)
650 .await
651 .map_err(map_sqlx_error)?;
652 let (condition_json, mut member_map) = match susp_row {
653 Some(r) => r,
654 None => return Err(EngineError::NotFound { entity: "suspension" }),
655 };
656
657 let signal_blob = json!({
659 "signal_id": args.signal_id.to_string(),
660 "signal_name": args.signal_name,
661 "signal_category": args.signal_category,
662 "source_type": args.source_type,
663 "source_identity": args.source_identity,
664 "correlation_id": args.correlation_id.clone().unwrap_or_default(),
665 "accepted_at": args.now.0,
666 "payload_hex": args.payload.as_ref().map(hex::encode),
667 });
668 let map_obj = member_map.as_object_mut().ok_or_else(|| {
669 EngineError::Validation {
670 kind: ValidationKind::Corruption,
671 detail: "member_map not a JSON object".into(),
672 }
673 })?;
674 let entry = map_obj.entry(wp_key.clone()).or_insert_with(|| json!([]));
675 entry
676 .as_array_mut()
677 .ok_or_else(|| EngineError::Validation {
678 kind: ValidationKind::Corruption,
679 detail: "member_map[wp_key] not a JSON array".into(),
680 })?
681 .push(signal_blob);
682
683 let condition: ResumeCondition = serde_json::from_value(condition_json)
685 .map_err(|e| EngineError::Validation {
686 kind: ValidationKind::Corruption,
687 detail: format!("condition deserialize: {e}"),
688 })?;
689 let signals_by_wp: HashMap<String, Vec<ResumeSignal>> = map_obj
690 .iter()
691 .map(|(k, v)| {
692 let sigs: Vec<ResumeSignal> = v
693 .as_array()
694 .map(|arr| arr.iter().filter_map(resume_signal_from_json).collect())
695 .unwrap_or_default();
696 (k.clone(), sigs)
697 })
698 .collect();
699 let borrowed: HashMap<&str, &[ResumeSignal]> = signals_by_wp
700 .iter()
701 .map(|(k, v)| (k.as_str(), v.as_slice()))
702 .collect();
703 let satisfied = evaluate(&condition, &borrowed);
704
705 sqlx::query(
708 "UPDATE ff_suspension_current SET member_map = $1 \
709 WHERE partition_key = $2 AND execution_id = $3",
710 )
711 .bind(&member_map)
712 .bind(part)
713 .bind(exec_uuid)
714 .execute(&mut **tx)
715 .await
716 .map_err(map_sqlx_error)?;
717
718 let effect = if satisfied {
719 sqlx::query(
720 "UPDATE ff_exec_core \
721 SET public_state = 'resumable', \
722 lifecycle_phase = 'runnable', \
723 eligibility_state = 'eligible_now' \
724 WHERE partition_key = $1 AND execution_id = $2",
725 )
726 .bind(part)
727 .bind(exec_uuid)
728 .execute(&mut **tx)
729 .await
730 .map_err(map_sqlx_error)?;
731
732 sqlx::query(
733 "DELETE FROM ff_waitpoint_pending \
734 WHERE partition_key = $1 AND execution_id = $2",
735 )
736 .bind(part)
737 .bind(exec_uuid)
738 .execute(&mut **tx)
739 .await
740 .map_err(map_sqlx_error)?;
741
742 sqlx::query(
743 "INSERT INTO ff_completion_event \
744 (partition_key, execution_id, outcome, occurred_at_ms) \
745 VALUES ($1, $2, 'resumable', $3)",
746 )
747 .bind(part)
748 .bind(exec_uuid)
749 .bind(args.now.0)
750 .execute(&mut **tx)
751 .await
752 .map_err(map_sqlx_error)?;
753
754 "resume_condition_satisfied"
755 } else {
756 "appended_to_waitpoint"
757 };
758
759 let wp_id_str = args.waitpoint_id.to_string();
763 signal_event::emit(
764 tx,
765 part,
766 exec_uuid,
767 &args.signal_id.to_string(),
768 Some(wp_id_str.as_str()),
769 Some(args.source_identity.as_str()),
770 args.now.0,
771 )
772 .await?;
773
774 Ok(DeliverSignalResult::Accepted {
775 signal_id: args.signal_id.clone(),
776 effect: effect.to_owned(),
777 })
778 })
779 })
780 .await
781}
782
783fn resume_signal_from_json(v: &JsonValue) -> Option<ResumeSignal> {
784 let signal_id = SignalId::parse(v["signal_id"].as_str()?).ok()?;
785 Some(ResumeSignal {
786 signal_id,
787 signal_name: v["signal_name"].as_str()?.to_owned(),
788 signal_category: v["signal_category"].as_str().unwrap_or("").to_owned(),
789 source_type: v["source_type"].as_str().unwrap_or("").to_owned(),
790 source_identity: v["source_identity"].as_str().unwrap_or("").to_owned(),
791 correlation_id: v["correlation_id"].as_str().unwrap_or("").to_owned(),
792 accepted_at: TimestampMs::from_millis(v["accepted_at"].as_i64().unwrap_or(0)),
793 payload: v["payload_hex"].as_str().and_then(|h| hex::decode(h).ok()),
794 })
795}
796
797pub(crate) async fn claim_resumed_execution_impl(
800 pool: &PgPool,
801 _partition_config: &PartitionConfig,
802 args: ClaimResumedExecutionArgs,
803) -> Result<ClaimResumedExecutionResult, EngineError> {
804 let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
805 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
806
807 let row: Option<(String, i32)> = sqlx::query_as(
808 "SELECT public_state, attempt_index FROM ff_exec_core \
809 WHERE partition_key = $1 AND execution_id = $2 \
810 FOR UPDATE",
811 )
812 .bind(part)
813 .bind(exec_uuid)
814 .fetch_optional(&mut *tx)
815 .await
816 .map_err(map_sqlx_error)?;
817 let (public_state, attempt_index_i) = match row {
818 Some(r) => r,
819 None => {
820 tx.rollback().await.ok();
821 return Err(EngineError::NotFound { entity: "execution" });
822 }
823 };
824 if public_state != "resumable" {
825 tx.rollback().await.ok();
826 return Err(EngineError::Contention(
827 ContentionKind::NotAResumedExecution,
828 ));
829 }
830
831 let now = now_ms();
832 let lease_ttl = i64::try_from(args.lease_ttl_ms).unwrap_or(0);
833 let new_expires = now.saturating_add(lease_ttl);
834
835 sqlx::query(
836 "UPDATE ff_attempt \
837 SET worker_id = $1, worker_instance_id = $2, \
838 lease_epoch = lease_epoch + 1, \
839 lease_expires_at_ms = $3, started_at_ms = $4, outcome = NULL \
840 WHERE partition_key = $5 AND execution_id = $6 AND attempt_index = $7",
841 )
842 .bind(args.worker_id.as_str())
843 .bind(args.worker_instance_id.as_str())
844 .bind(new_expires)
845 .bind(now)
846 .bind(part)
847 .bind(exec_uuid)
848 .bind(attempt_index_i)
849 .execute(&mut *tx)
850 .await
851 .map_err(map_sqlx_error)?;
852
853 sqlx::query(
854 "UPDATE ff_exec_core \
855 SET lifecycle_phase = 'active', ownership_state = 'leased', \
856 eligibility_state = 'not_applicable', \
857 public_state = 'running', attempt_state = 'running_attempt' \
858 WHERE partition_key = $1 AND execution_id = $2",
859 )
860 .bind(part)
861 .bind(exec_uuid)
862 .execute(&mut *tx)
863 .await
864 .map_err(map_sqlx_error)?;
865
866 let epoch_row: (i64,) = sqlx::query_as(
867 "SELECT lease_epoch FROM ff_attempt \
868 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3",
869 )
870 .bind(part)
871 .bind(exec_uuid)
872 .bind(attempt_index_i)
873 .fetch_one(&mut *tx)
874 .await
875 .map_err(map_sqlx_error)?;
876
877 let lease_id_str = args.lease_id.to_string();
879 lease_event::emit(
880 &mut tx,
881 part,
882 exec_uuid,
883 Some(&lease_id_str),
884 lease_event::EVENT_ACQUIRED,
885 now,
886 )
887 .await?;
888
889 tx.commit().await.map_err(map_sqlx_error)?;
890
891 let attempt_index = AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
892 let lease_epoch = LeaseEpoch(u64::try_from(epoch_row.0).unwrap_or(0));
893 let attempt_id = AttemptId::new();
894
895 Ok(ClaimResumedExecutionResult::Claimed(
896 ClaimedResumedExecution {
897 execution_id: args.execution_id.clone(),
898 lease_id: args.lease_id.clone(),
899 lease_epoch,
900 attempt_index,
901 attempt_id,
902 lease_expires_at: TimestampMs::from_millis(new_expires),
903 },
904 ))
905}
906
907pub(crate) async fn observe_signals_impl(
910 pool: &PgPool,
911 handle: &Handle,
912) -> Result<Vec<ResumeSignal>, EngineError> {
913 let payload = decode_handle(handle)?;
914 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
915
916 let row: Option<(JsonValue,)> = sqlx::query_as(
917 "SELECT member_map FROM ff_suspension_current \
918 WHERE partition_key = $1 AND execution_id = $2",
919 )
920 .bind(part)
921 .bind(exec_uuid)
922 .fetch_optional(pool)
923 .await
924 .map_err(map_sqlx_error)?;
925
926 let Some((member_map,)) = row else {
927 return Ok(Vec::new());
928 };
929 let mut out: Vec<ResumeSignal> = Vec::new();
930 if let Some(map) = member_map.as_object() {
931 for (_wp_key, arr) in map {
932 if let Some(sigs) = arr.as_array() {
933 for v in sigs {
934 if let Some(s) = resume_signal_from_json(v) {
935 out.push(s);
936 }
937 }
938 }
939 }
940 }
941 Ok(out)
942}