1use std::collections::HashMap;
22use std::time::{SystemTime, UNIX_EPOCH};
23
24use ff_core::backend::{BackendTag, Handle, HandleKind, HandleOpaque, ResumeSignal, WaitpointHmac};
25use ff_core::contracts::{
26 AdditionalWaitpointBinding, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
27 ClaimedResumedExecution, DeliverSignalArgs, DeliverSignalResult, ResumeCondition, SuspendArgs,
28 SuspendOutcome, SuspendOutcomeDetails, WaitpointBinding,
29};
30use ff_core::engine_error::{ContentionKind, EngineError, ValidationKind};
31use ff_core::handle_codec::{encode as encode_opaque, HandlePayload};
32use ff_core::partition::PartitionConfig;
33use ff_core::types::{
34 AttemptId, AttemptIndex, ExecutionId, LeaseEpoch, SignalId, SuspensionId, TimestampMs,
35 WaitpointId,
36};
37use serde_json::{json, Value as JsonValue};
38use sqlx::{PgPool, Postgres, Transaction};
39use uuid::Uuid;
40
41use crate::error::map_sqlx_error;
42use crate::signal::{hmac_sign, hmac_verify, is_retryable_serialization, SERIALIZABLE_RETRY_BUDGET};
43use crate::suspend::evaluate;
44
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0`. A millisecond count that
/// does not fit in `i64` saturates at `i64::MAX` rather than being truncated
/// by an `as` cast.
fn now_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            // `as_millis` is a `u128`; convert with a checked narrowing.
            i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX)
        })
}
53
54fn split_exec_id(eid: &ExecutionId) -> Result<(i16, Uuid), EngineError> {
55 let s = eid.as_str();
56 let rest = s.strip_prefix("{fp:").ok_or_else(|| EngineError::Validation {
57 kind: ValidationKind::InvalidInput,
58 detail: format!("execution_id missing `{{fp:` prefix: {s}"),
59 })?;
60 let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
61 kind: ValidationKind::InvalidInput,
62 detail: format!("execution_id missing `}}:`: {s}"),
63 })?;
64 let part: i16 = rest[..close]
65 .parse()
66 .map_err(|_| EngineError::Validation {
67 kind: ValidationKind::InvalidInput,
68 detail: format!("execution_id partition index not u16: {s}"),
69 })?;
70 let uuid = Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
71 kind: ValidationKind::InvalidInput,
72 detail: format!("execution_id UUID invalid: {s}"),
73 })?;
74 Ok((part, uuid))
75}
76
77fn decode_handle(handle: &Handle) -> Result<HandlePayload, EngineError> {
78 if handle.backend != BackendTag::Postgres {
79 return Err(EngineError::Validation {
80 kind: ValidationKind::HandleFromOtherBackend,
81 detail: format!("expected Postgres, got {:?}", handle.backend),
82 });
83 }
84 let decoded = ff_core::handle_codec::decode(&handle.opaque)?;
85 if decoded.tag != BackendTag::Postgres {
86 return Err(EngineError::Validation {
87 kind: ValidationKind::HandleFromOtherBackend,
88 detail: format!("embedded tag {:?}", decoded.tag),
89 });
90 }
91 Ok(decoded.payload)
92}
93
94fn wp_uuid(w: &WaitpointId) -> Result<Uuid, EngineError> {
95 Uuid::parse_str(&w.to_string()).map_err(|e| EngineError::Validation {
96 kind: ValidationKind::InvalidInput,
97 detail: format!("waitpoint_id not a UUID: {e}"),
98 })
99}
100
101fn susp_uuid(s: &SuspensionId) -> Result<Uuid, EngineError> {
102 Uuid::parse_str(&s.to_string()).map_err(|e| EngineError::Validation {
103 kind: ValidationKind::InvalidInput,
104 detail: format!("suspension_id not a UUID: {e}"),
105 })
106}
107
108fn is_retryable_engine(err: &EngineError) -> bool {
115 match err {
116 EngineError::Transport { source, .. } => {
117 let s = source.to_string();
118 s.contains("40001")
119 || s.contains("40P01")
120 || s.contains("serialization_failure")
121 || s.contains("deadlock_detected")
122 }
123 EngineError::Contention(ContentionKind::LeaseConflict) => true,
133 _ => false,
134 }
135}
136
/// Runs `op` inside a `SERIALIZABLE` transaction, retrying retryable failures.
///
/// Each attempt begins a fresh transaction on `pool`, switches it to
/// `SERIALIZABLE` isolation, and calls `op` with the open transaction. `op` is
/// an `FnMut` because it is invoked once per attempt.
///
/// - `op` succeeds and the commit succeeds: the value is returned.
/// - `op` succeeds but the commit fails with an error accepted by
///   `is_retryable_serialization`: the attempt is retried.
/// - `op` fails with an error accepted by `is_retryable_engine`: the
///   transaction is rolled back and the attempt is retried.
/// - Any other failure is returned to the caller (after a rollback when the
///   failure came from `op`).
///
/// # Errors
///
/// Returns `EngineError::Contention(ContentionKind::RetryExhausted)` once
/// `SERIALIZABLE_RETRY_BUDGET` attempts have been used. Errors from beginning
/// the transaction or setting the isolation level are mapped through
/// `map_sqlx_error` and returned immediately without a retry.
async fn run_serializable<T, F>(pool: &PgPool, mut op: F) -> Result<T, EngineError>
where
    T: Send,
    F: for<'a> FnMut(
            &'a mut Transaction<'_, Postgres>,
        ) -> std::pin::Pin<
            Box<dyn std::future::Future<Output = Result<T, EngineError>> + Send + 'a>,
        > + Send,
{
    for _ in 0..SERIALIZABLE_RETRY_BUDGET {
        let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
        // Issued as the first statement of the new transaction.
        sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
            .execute(&mut *tx)
            .await
            .map_err(map_sqlx_error)?;
        let body_res = op(&mut tx).await;
        match body_res {
            Ok(v) => match tx.commit().await {
                Ok(()) => return Ok(v),
                // The commit itself can fail with a retryable error; start over.
                Err(e) if is_retryable_serialization(&e) => continue,
                Err(e) => return Err(map_sqlx_error(e)),
            },
            Err(e) if is_retryable_engine(&e) => {
                // Best-effort rollback: its result is ignored because the
                // attempt is being abandoned either way.
                let _ = tx.rollback().await;
                continue;
            }
            Err(e) => {
                let _ = tx.rollback().await;
                return Err(e);
            }
        }
    }
    // Every attempt in the budget ended in a retryable failure.
    Err(EngineError::Contention(ContentionKind::RetryExhausted))
}
174
175fn outcome_to_dedup_json(outcome: &SuspendOutcome) -> JsonValue {
178 let details = outcome.details();
179 let extras: Vec<JsonValue> = details
180 .additional_waitpoints
181 .iter()
182 .map(|e| {
183 json!({
184 "waitpoint_id": e.waitpoint_id.to_string(),
185 "waitpoint_key": e.waitpoint_key,
186 "token": e.waitpoint_token.as_str(),
187 })
188 })
189 .collect();
190 let (variant, handle_opaque) = match outcome {
191 SuspendOutcome::Suspended { handle, .. } => {
192 ("Suspended", Some(hex::encode(handle.opaque.as_bytes())))
193 }
194 SuspendOutcome::AlreadySatisfied { .. } => ("AlreadySatisfied", None),
195 _ => ("Suspended", None),
196 };
197 json!({
198 "variant": variant,
199 "details": {
200 "suspension_id": details.suspension_id.to_string(),
201 "waitpoint_id": details.waitpoint_id.to_string(),
202 "waitpoint_key": details.waitpoint_key,
203 "token": details.waitpoint_token.as_str(),
204 "extras": extras,
205 },
206 "handle_opaque_hex": handle_opaque,
207 })
208}
209
210fn outcome_from_dedup_json(v: &JsonValue) -> Result<SuspendOutcome, EngineError> {
211 let corrupt = |s: String| EngineError::Validation {
212 kind: ValidationKind::Corruption,
213 detail: s,
214 };
215 let det = &v["details"];
216 let suspension_id = SuspensionId::parse(det["suspension_id"].as_str().unwrap_or(""))
217 .map_err(|e| corrupt(format!("dedup suspension_id: {e}")))?;
218 let waitpoint_id = WaitpointId::parse(det["waitpoint_id"].as_str().unwrap_or(""))
219 .map_err(|e| corrupt(format!("dedup waitpoint_id: {e}")))?;
220 let waitpoint_key = det["waitpoint_key"].as_str().unwrap_or("").to_owned();
221 let token = det["token"].as_str().unwrap_or("").to_owned();
222 let mut extras: Vec<AdditionalWaitpointBinding> = Vec::new();
223 if let Some(arr) = det["extras"].as_array() {
224 for e in arr {
225 let wid = WaitpointId::parse(e["waitpoint_id"].as_str().unwrap_or(""))
226 .map_err(|err| corrupt(format!("dedup extra wp_id: {err}")))?;
227 let wkey = e["waitpoint_key"].as_str().unwrap_or("").to_owned();
228 let tok = e["token"].as_str().unwrap_or("").to_owned();
229 extras.push(AdditionalWaitpointBinding::new(
230 wid,
231 wkey,
232 WaitpointHmac::new(tok),
233 ));
234 }
235 }
236 let details = SuspendOutcomeDetails::new(
237 suspension_id,
238 waitpoint_id,
239 waitpoint_key,
240 WaitpointHmac::new(token),
241 )
242 .with_additional_waitpoints(extras);
243
244 match v["variant"].as_str().unwrap_or("Suspended") {
245 "AlreadySatisfied" => Ok(SuspendOutcome::AlreadySatisfied { details }),
246 _ => {
247 let opaque_hex = v["handle_opaque_hex"].as_str().unwrap_or("");
248 let bytes = hex::decode(opaque_hex)
249 .map_err(|e| corrupt(format!("dedup handle hex: {e}")))?;
250 let opaque = HandleOpaque::new(bytes.into_boxed_slice());
251 let handle = Handle::new(BackendTag::Postgres, HandleKind::Suspended, opaque);
252 Ok(SuspendOutcome::Suspended { details, handle })
253 }
254 }
255}
256
257pub(crate) async fn suspend_impl(
260 pool: &PgPool,
261 _partition_config: &PartitionConfig,
262 handle: &Handle,
263 args: SuspendArgs,
264) -> Result<SuspendOutcome, EngineError> {
265 let payload = decode_handle(handle)?;
266 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
267 let attempt_index_i = i32::try_from(payload.attempt_index.0).unwrap_or(0);
268 let expected_epoch = payload.lease_epoch.0;
269 let idem_key = args.idempotency_key.as_ref().map(|k| k.as_str().to_owned());
270
271 run_serializable(pool, move |tx| {
272 let args = args.clone();
273 let idem = idem_key.clone();
274 let payload = payload.clone();
275 Box::pin(async move {
276 if let Some(key) = idem.as_deref() {
278 let row: Option<(JsonValue,)> = sqlx::query_as(
279 "SELECT outcome_json FROM ff_suspend_dedup \
280 WHERE partition_key = $1 AND idempotency_key = $2",
281 )
282 .bind(part)
283 .bind(key)
284 .fetch_optional(&mut **tx)
285 .await
286 .map_err(map_sqlx_error)?;
287 if let Some((cached,)) = row {
288 return outcome_from_dedup_json(&cached);
289 }
290 }
291
292 let epoch_row: Option<(i64,)> = sqlx::query_as(
294 "SELECT lease_epoch FROM ff_attempt \
295 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3 \
296 FOR UPDATE",
297 )
298 .bind(part)
299 .bind(exec_uuid)
300 .bind(attempt_index_i)
301 .fetch_optional(&mut **tx)
302 .await
303 .map_err(map_sqlx_error)?;
304 let observed_epoch: u64 = match epoch_row {
305 Some((e,)) => u64::try_from(e).unwrap_or(0),
306 None => return Err(EngineError::NotFound { entity: "attempt" }),
307 };
308 if observed_epoch != expected_epoch {
309 return Err(EngineError::Contention(ContentionKind::LeaseConflict));
310 }
311
312 let kid_row: Option<(String, Vec<u8>)> = sqlx::query_as(
314 "SELECT kid, secret FROM ff_waitpoint_hmac \
315 WHERE active = TRUE \
316 ORDER BY rotated_at_ms DESC LIMIT 1",
317 )
318 .fetch_optional(&mut **tx)
319 .await
320 .map_err(map_sqlx_error)?;
321 let (kid, secret) = kid_row.ok_or_else(|| EngineError::Validation {
322 kind: ValidationKind::InvalidInput,
323 detail: "ff_waitpoint_hmac empty — rotate a kid before suspend".into(),
324 })?;
325
326 let now = args.now.0;
328 let mut signed: Vec<(WaitpointId, String, String)> = Vec::new();
329 for binding in args.waitpoints.iter() {
330 let (wp_id, wp_key) = match binding {
331 WaitpointBinding::Fresh {
332 waitpoint_id,
333 waitpoint_key,
334 } => (waitpoint_id.clone(), waitpoint_key.clone()),
335 WaitpointBinding::UsePending { waitpoint_id } => {
336 let row: Option<(String,)> = sqlx::query_as(
337 "SELECT waitpoint_key FROM ff_waitpoint_pending \
338 WHERE partition_key = $1 AND waitpoint_id = $2",
339 )
340 .bind(part)
341 .bind(wp_uuid(waitpoint_id)?)
342 .fetch_optional(&mut **tx)
343 .await
344 .map_err(map_sqlx_error)?;
345 let wp_key = row.map(|(k,)| k).unwrap_or_default();
346 (waitpoint_id.clone(), wp_key)
347 }
348 _ => {
349 return Err(EngineError::Validation {
350 kind: ValidationKind::InvalidInput,
351 detail: "unsupported WaitpointBinding variant".into(),
352 });
353 }
354 };
355 let msg = format!("{}:{}", payload.execution_id, wp_id);
356 let token = hmac_sign(&secret, &kid, msg.as_bytes());
357 sqlx::query(
358 "INSERT INTO ff_waitpoint_pending \
359 (partition_key, waitpoint_id, execution_id, token_kid, token, \
360 created_at_ms, expires_at_ms, waitpoint_key) \
361 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \
362 ON CONFLICT (partition_key, waitpoint_id) DO UPDATE SET \
363 token_kid = EXCLUDED.token_kid, token = EXCLUDED.token, \
364 waitpoint_key = EXCLUDED.waitpoint_key",
365 )
366 .bind(part)
367 .bind(wp_uuid(&wp_id)?)
368 .bind(exec_uuid)
369 .bind(&kid)
370 .bind(&token)
371 .bind(now)
372 .bind(args.timeout_at.map(|t| t.0))
373 .bind(&wp_key)
374 .execute(&mut **tx)
375 .await
376 .map_err(map_sqlx_error)?;
377 signed.push((wp_id, wp_key, token));
378 }
379
380 let condition_json =
382 serde_json::to_value(&args.resume_condition).map_err(|e| {
383 EngineError::Validation {
384 kind: ValidationKind::Corruption,
385 detail: format!("resume_condition serialize: {e}"),
386 }
387 })?;
388 sqlx::query(
389 "INSERT INTO ff_suspension_current \
390 (partition_key, execution_id, suspension_id, suspended_at_ms, \
391 timeout_at_ms, reason_code, condition, satisfied_set, member_map, \
392 timeout_behavior) \
393 VALUES ($1, $2, $3, $4, $5, $6, $7, '[]'::jsonb, '{}'::jsonb, $8) \
394 ON CONFLICT (partition_key, execution_id) DO UPDATE SET \
395 suspension_id = EXCLUDED.suspension_id, \
396 suspended_at_ms = EXCLUDED.suspended_at_ms, \
397 timeout_at_ms = EXCLUDED.timeout_at_ms, \
398 reason_code = EXCLUDED.reason_code, \
399 condition = EXCLUDED.condition, \
400 satisfied_set = '[]'::jsonb, \
401 member_map = '{}'::jsonb, \
402 timeout_behavior = EXCLUDED.timeout_behavior",
403 )
404 .bind(part)
405 .bind(exec_uuid)
406 .bind(susp_uuid(&args.suspension_id)?)
407 .bind(now)
408 .bind(args.timeout_at.map(|t| t.0))
409 .bind(args.reason_code.as_wire_str())
410 .bind(&condition_json)
411 .bind(args.timeout_behavior.as_wire_str())
412 .execute(&mut **tx)
413 .await
414 .map_err(map_sqlx_error)?;
415
416 sqlx::query(
418 "UPDATE ff_exec_core \
419 SET lifecycle_phase = 'suspended', \
420 ownership_state = 'released', \
421 eligibility_state = 'not_applicable', \
422 public_state = 'suspended', \
423 attempt_state = 'attempt_interrupted' \
424 WHERE partition_key = $1 AND execution_id = $2",
425 )
426 .bind(part)
427 .bind(exec_uuid)
428 .execute(&mut **tx)
429 .await
430 .map_err(map_sqlx_error)?;
431
432 sqlx::query(
434 "UPDATE ff_attempt \
435 SET worker_id = NULL, \
436 worker_instance_id = NULL, \
437 lease_expires_at_ms = NULL, \
438 lease_epoch = lease_epoch + 1, \
439 outcome = 'attempt_interrupted' \
440 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3",
441 )
442 .bind(part)
443 .bind(exec_uuid)
444 .bind(attempt_index_i)
445 .execute(&mut **tx)
446 .await
447 .map_err(map_sqlx_error)?;
448
449 let (primary_id, primary_key, primary_token) = signed[0].clone();
451 let extras: Vec<AdditionalWaitpointBinding> = signed
452 .iter()
453 .skip(1)
454 .map(|(id, key, tok)| {
455 AdditionalWaitpointBinding::new(
456 id.clone(),
457 key.clone(),
458 WaitpointHmac::new(tok.clone()),
459 )
460 })
461 .collect();
462 let details = SuspendOutcomeDetails::new(
463 args.suspension_id.clone(),
464 primary_id,
465 primary_key,
466 WaitpointHmac::new(primary_token),
467 )
468 .with_additional_waitpoints(extras);
469
470 let opaque = encode_opaque(BackendTag::Postgres, &payload);
471 let suspended_handle =
472 Handle::new(BackendTag::Postgres, HandleKind::Suspended, opaque);
473 let outcome = SuspendOutcome::Suspended {
474 details,
475 handle: suspended_handle,
476 };
477
478 if let Some(key) = idem.as_deref() {
480 let cached = outcome_to_dedup_json(&outcome);
481 sqlx::query(
482 "INSERT INTO ff_suspend_dedup \
483 (partition_key, idempotency_key, outcome_json, created_at_ms) \
484 VALUES ($1, $2, $3, $4) \
485 ON CONFLICT DO NOTHING",
486 )
487 .bind(part)
488 .bind(key)
489 .bind(&cached)
490 .bind(now)
491 .execute(&mut **tx)
492 .await
493 .map_err(map_sqlx_error)?;
494 }
495
496 Ok(outcome)
497 })
498 })
499 .await
500}
501
/// Delivers one signal to a pending waitpoint and makes the execution
/// resumable when its resume condition becomes satisfied.
///
/// All work happens inside one `run_serializable` transaction:
///
/// 1. The `ff_waitpoint_pending` row is locked and must belong to
///    `args.execution_id`.
/// 2. The presented token is verified with `hmac_verify` against the secret
///    stored for the row's `token_kid`, and must also equal the stored token.
/// 3. The `ff_suspension_current` row is locked, the signal is appended to
///    `member_map[waitpoint_key]`, and the stored condition is evaluated
///    against all recorded signals.
/// 4. The updated `member_map` is persisted. When the condition is
///    satisfied, `ff_exec_core` is moved to `resumable`, every pending
///    waitpoint of the execution is deleted, and a `resumable` row is added
///    to `ff_completion_event`.
///
/// Returns `DeliverSignalResult::Accepted` whose `effect` is
/// `"resume_condition_satisfied"` or `"appended_to_waitpoint"`.
///
/// # Errors
///
/// - `EngineError::NotFound` for a missing waitpoint or suspension row.
/// - `EngineError::Validation` with `InvalidInput` when the waitpoint belongs
///   to another execution, the kid is missing from the keystore, or the token
///   fails verification or differs from the stored token.
/// - `EngineError::Validation` with `Corruption` when the stored `member_map`
///   or condition has an unexpected shape.
/// - Any database error, mapped through `map_sqlx_error`.
pub(crate) async fn deliver_signal_impl(
    pool: &PgPool,
    _partition_config: &PartitionConfig,
    args: DeliverSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
    let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
    let wp_u = wp_uuid(&args.waitpoint_id)?;

    run_serializable(pool, move |tx| {
        // The closure runs once per retry, so each attempt gets its own copy.
        let args = args.clone();
        Box::pin(async move {
            // Lock the pending waitpoint row for the rest of the transaction.
            let row: Option<(String, String, String, Uuid)> = sqlx::query_as(
                "SELECT token_kid, token, waitpoint_key, execution_id \
                 FROM ff_waitpoint_pending \
                 WHERE partition_key = $1 AND waitpoint_id = $2 \
                 FOR UPDATE",
            )
            .bind(part)
            .bind(wp_u)
            .fetch_optional(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;
            let (kid, stored_token, wp_key, stored_exec) = match row {
                Some(r) => r,
                None => return Err(EngineError::NotFound { entity: "waitpoint" }),
            };
            if stored_exec != exec_uuid {
                return Err(EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: "waitpoint belongs to a different execution".into(),
                });
            }

            // Look the secret up by the kid recorded on the row; this query
            // does not filter on `active`.
            let secret_row: Option<(Vec<u8>,)> = sqlx::query_as(
                "SELECT secret FROM ff_waitpoint_hmac WHERE kid = $1",
            )
            .bind(&kid)
            .fetch_optional(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;
            let (secret,) = secret_row.ok_or_else(|| EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("kid {kid} missing from keystore"),
            })?;
            let presented = args.waitpoint_token.as_str();
            // Same `<execution_id>:<waitpoint_id>` message that was signed
            // when the token was minted.
            let msg = format!("{}:{}", args.execution_id, args.waitpoint_id);
            hmac_verify(&secret, &kid, msg.as_bytes(), presented).map_err(|e| {
                EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: format!("waitpoint_token verify: {e}"),
                }
            })?;
            // A token that verifies must additionally equal the stored one.
            if presented != stored_token {
                return Err(EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: "waitpoint_token does not match minted token".into(),
                });
            }

            // Lock the suspension row before mutating its member map.
            let susp_row: Option<(JsonValue, JsonValue)> = sqlx::query_as(
                "SELECT condition, member_map FROM ff_suspension_current \
                 WHERE partition_key = $1 AND execution_id = $2 \
                 FOR UPDATE",
            )
            .bind(part)
            .bind(exec_uuid)
            .fetch_optional(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;
            let (condition_json, mut member_map) = match susp_row {
                Some(r) => r,
                None => return Err(EngineError::NotFound { entity: "suspension" }),
            };

            // Stored shape of one signal; `resume_signal_from_json` reads
            // the same keys back.
            let signal_blob = json!({
                "signal_id": args.signal_id.to_string(),
                "signal_name": args.signal_name,
                "signal_category": args.signal_category,
                "source_type": args.source_type,
                "source_identity": args.source_identity,
                "correlation_id": args.correlation_id.clone().unwrap_or_default(),
                "accepted_at": args.now.0,
                "payload_hex": args.payload.as_ref().map(hex::encode),
            });
            let map_obj = member_map.as_object_mut().ok_or_else(|| {
                EngineError::Validation {
                    kind: ValidationKind::Corruption,
                    detail: "member_map not a JSON object".into(),
                }
            })?;
            // Append to the waitpoint's signal list, creating it if absent.
            let entry = map_obj.entry(wp_key.clone()).or_insert_with(|| json!([]));
            entry
                .as_array_mut()
                .ok_or_else(|| EngineError::Validation {
                    kind: ValidationKind::Corruption,
                    detail: "member_map[wp_key] not a JSON array".into(),
                })?
                .push(signal_blob);

            let condition: ResumeCondition = serde_json::from_value(condition_json)
                .map_err(|e| EngineError::Validation {
                    kind: ValidationKind::Corruption,
                    detail: format!("condition deserialize: {e}"),
                })?;
            // Decode every recorded signal per waitpoint key; entries that
            // fail to decode are skipped by `filter_map`.
            let signals_by_wp: HashMap<String, Vec<ResumeSignal>> = map_obj
                .iter()
                .map(|(k, v)| {
                    let sigs: Vec<ResumeSignal> = v
                        .as_array()
                        .map(|arr| arr.iter().filter_map(resume_signal_from_json).collect())
                        .unwrap_or_default();
                    (k.clone(), sigs)
                })
                .collect();
            // Borrowed view of the same data, in the form passed to `evaluate`.
            let borrowed: HashMap<&str, &[ResumeSignal]> = signals_by_wp
                .iter()
                .map(|(k, v)| (k.as_str(), v.as_slice()))
                .collect();
            let satisfied = evaluate(&condition, &borrowed);

            // Persist the appended signal whether or not the condition is met.
            sqlx::query(
                "UPDATE ff_suspension_current SET member_map = $1 \
                 WHERE partition_key = $2 AND execution_id = $3",
            )
            .bind(&member_map)
            .bind(part)
            .bind(exec_uuid)
            .execute(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;

            let effect = if satisfied {
                sqlx::query(
                    "UPDATE ff_exec_core \
                     SET public_state = 'resumable', \
                     lifecycle_phase = 'runnable', \
                     eligibility_state = 'eligible_now' \
                     WHERE partition_key = $1 AND execution_id = $2",
                )
                .bind(part)
                .bind(exec_uuid)
                .execute(&mut **tx)
                .await
                .map_err(map_sqlx_error)?;

                // Removes every pending waitpoint of this execution, not
                // only the one that was signalled.
                sqlx::query(
                    "DELETE FROM ff_waitpoint_pending \
                     WHERE partition_key = $1 AND execution_id = $2",
                )
                .bind(part)
                .bind(exec_uuid)
                .execute(&mut **tx)
                .await
                .map_err(map_sqlx_error)?;

                sqlx::query(
                    "INSERT INTO ff_completion_event \
                     (partition_key, execution_id, outcome, occurred_at_ms) \
                     VALUES ($1, $2, 'resumable', $3)",
                )
                .bind(part)
                .bind(exec_uuid)
                .bind(args.now.0)
                .execute(&mut **tx)
                .await
                .map_err(map_sqlx_error)?;

                "resume_condition_satisfied"
            } else {
                "appended_to_waitpoint"
            };

            Ok(DeliverSignalResult::Accepted {
                signal_id: args.signal_id.clone(),
                effect: effect.to_owned(),
            })
        })
    })
    .await
}
694
695fn resume_signal_from_json(v: &JsonValue) -> Option<ResumeSignal> {
696 let signal_id = SignalId::parse(v["signal_id"].as_str()?).ok()?;
697 Some(ResumeSignal {
698 signal_id,
699 signal_name: v["signal_name"].as_str()?.to_owned(),
700 signal_category: v["signal_category"].as_str().unwrap_or("").to_owned(),
701 source_type: v["source_type"].as_str().unwrap_or("").to_owned(),
702 source_identity: v["source_identity"].as_str().unwrap_or("").to_owned(),
703 correlation_id: v["correlation_id"].as_str().unwrap_or("").to_owned(),
704 accepted_at: TimestampMs::from_millis(v["accepted_at"].as_i64().unwrap_or(0)),
705 payload: v["payload_hex"].as_str().and_then(|h| hex::decode(h).ok()),
706 })
707}
708
709pub(crate) async fn claim_resumed_execution_impl(
712 pool: &PgPool,
713 _partition_config: &PartitionConfig,
714 args: ClaimResumedExecutionArgs,
715) -> Result<ClaimResumedExecutionResult, EngineError> {
716 let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
717 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
718
719 let row: Option<(String, i32)> = sqlx::query_as(
720 "SELECT public_state, attempt_index FROM ff_exec_core \
721 WHERE partition_key = $1 AND execution_id = $2 \
722 FOR UPDATE",
723 )
724 .bind(part)
725 .bind(exec_uuid)
726 .fetch_optional(&mut *tx)
727 .await
728 .map_err(map_sqlx_error)?;
729 let (public_state, attempt_index_i) = match row {
730 Some(r) => r,
731 None => {
732 tx.rollback().await.ok();
733 return Err(EngineError::NotFound { entity: "execution" });
734 }
735 };
736 if public_state != "resumable" {
737 tx.rollback().await.ok();
738 return Err(EngineError::Contention(
739 ContentionKind::NotAResumedExecution,
740 ));
741 }
742
743 let now = now_ms();
744 let lease_ttl = i64::try_from(args.lease_ttl_ms).unwrap_or(0);
745 let new_expires = now.saturating_add(lease_ttl);
746
747 sqlx::query(
748 "UPDATE ff_attempt \
749 SET worker_id = $1, worker_instance_id = $2, \
750 lease_epoch = lease_epoch + 1, \
751 lease_expires_at_ms = $3, started_at_ms = $4, outcome = NULL \
752 WHERE partition_key = $5 AND execution_id = $6 AND attempt_index = $7",
753 )
754 .bind(args.worker_id.as_str())
755 .bind(args.worker_instance_id.as_str())
756 .bind(new_expires)
757 .bind(now)
758 .bind(part)
759 .bind(exec_uuid)
760 .bind(attempt_index_i)
761 .execute(&mut *tx)
762 .await
763 .map_err(map_sqlx_error)?;
764
765 sqlx::query(
766 "UPDATE ff_exec_core \
767 SET lifecycle_phase = 'active', ownership_state = 'leased', \
768 eligibility_state = 'not_applicable', \
769 public_state = 'running', attempt_state = 'running_attempt' \
770 WHERE partition_key = $1 AND execution_id = $2",
771 )
772 .bind(part)
773 .bind(exec_uuid)
774 .execute(&mut *tx)
775 .await
776 .map_err(map_sqlx_error)?;
777
778 let epoch_row: (i64,) = sqlx::query_as(
779 "SELECT lease_epoch FROM ff_attempt \
780 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3",
781 )
782 .bind(part)
783 .bind(exec_uuid)
784 .bind(attempt_index_i)
785 .fetch_one(&mut *tx)
786 .await
787 .map_err(map_sqlx_error)?;
788
789 tx.commit().await.map_err(map_sqlx_error)?;
790
791 let attempt_index = AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
792 let lease_epoch = LeaseEpoch(u64::try_from(epoch_row.0).unwrap_or(0));
793 let attempt_id = AttemptId::new();
794
795 Ok(ClaimResumedExecutionResult::Claimed(
796 ClaimedResumedExecution {
797 execution_id: args.execution_id.clone(),
798 lease_id: args.lease_id.clone(),
799 lease_epoch,
800 attempt_index,
801 attempt_id,
802 lease_expires_at: TimestampMs::from_millis(new_expires),
803 },
804 ))
805}
806
807pub(crate) async fn observe_signals_impl(
810 pool: &PgPool,
811 handle: &Handle,
812) -> Result<Vec<ResumeSignal>, EngineError> {
813 let payload = decode_handle(handle)?;
814 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
815
816 let row: Option<(JsonValue,)> = sqlx::query_as(
817 "SELECT member_map FROM ff_suspension_current \
818 WHERE partition_key = $1 AND execution_id = $2",
819 )
820 .bind(part)
821 .bind(exec_uuid)
822 .fetch_optional(pool)
823 .await
824 .map_err(map_sqlx_error)?;
825
826 let Some((member_map,)) = row else {
827 return Ok(Vec::new());
828 };
829 let mut out: Vec<ResumeSignal> = Vec::new();
830 if let Some(map) = member_map.as_object() {
831 for (_wp_key, arr) in map {
832 if let Some(sigs) = arr.as_array() {
833 for v in sigs {
834 if let Some(s) = resume_signal_from_json(v) {
835 out.push(s);
836 }
837 }
838 }
839 }
840 }
841 Ok(out)
842}