1use std::collections::HashMap;
36use std::time::{SystemTime, UNIX_EPOCH};
37
38use ff_core::contracts::decode::build_execution_snapshot;
39use ff_core::contracts::{CreateExecutionArgs, ExecutionSnapshot, ListExecutionsPage};
40use ff_core::engine_error::{EngineError, ValidationKind};
41use ff_core::partition::{PartitionConfig, PartitionKey};
42use ff_core::types::{ExecutionId, FlowId};
43use serde_json::Value as JsonValue;
44use sqlx::{PgPool, Row};
45use uuid::Uuid;
46
47use crate::error::map_sqlx_error;
48
49fn eid_uuid(eid: &ExecutionId) -> Uuid {
54 let s = eid.as_str();
55 let suffix = s
57 .split_once("}:")
58 .map(|(_, u)| u)
59 .expect("ExecutionId has `}:` separator (invariant)");
60 Uuid::parse_str(suffix).expect("ExecutionId suffix is a valid UUID (invariant)")
61}
62
63fn eid_from_parts(partition: u16, uuid: Uuid) -> Result<ExecutionId, EngineError> {
67 let s = format!("{{fp:{partition}}}:{uuid}");
68 ExecutionId::parse(&s).map_err(|e| EngineError::Validation {
69 kind: ValidationKind::Corruption,
70 detail: format!("exec_core: execution_id: could not reassemble '{s}': {e}"),
71 })
72}
73
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// `Duration::as_millis` yields a `u128`. The value is clamped to `i64::MAX`
/// before narrowing, so the conversion saturates instead of silently
/// truncating high bits (which a bare `as i64` cast would do).
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch.
fn now_ms() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock is after UNIX_EPOCH");

    // `i64::MAX as u128` is a lossless widening of a non-negative constant, and
    // after the clamp the narrowing cast below cannot lose information.
    let clamped: u128 = since_epoch.as_millis().min(i64::MAX as u128);
    clamped as i64
}
80
81pub(super) async fn create_execution_impl(
90 pool: &PgPool,
91 _partition_config: &PartitionConfig,
92 args: CreateExecutionArgs,
93) -> Result<ExecutionId, EngineError> {
94 let partition_key: i16 = args.execution_id.partition() as i16;
95 let execution_id = eid_uuid(&args.execution_id);
96 let lane_id = args.lane_id.as_str().to_owned();
97 let priority: i32 = args.priority;
98 let created_at_ms: i64 = args.now.0;
99 let deadline_at_ms: Option<i64> = args.execution_deadline_at.map(|t| t.0);
100
101 let mut raw: serde_json::Map<String, JsonValue> = serde_json::Map::new();
106 raw.insert(
107 "namespace".into(),
108 JsonValue::String(args.namespace.as_str().to_owned()),
109 );
110 raw.insert("execution_kind".into(), JsonValue::String(args.execution_kind));
111 raw.insert(
112 "creator_identity".into(),
113 JsonValue::String(args.creator_identity),
114 );
115 if let Some(k) = args.idempotency_key {
116 raw.insert("idempotency_key".into(), JsonValue::String(k));
117 }
118 if let Some(enc) = args.payload_encoding {
119 raw.insert("payload_encoding".into(), JsonValue::String(enc));
120 }
121 raw.insert(
124 "last_mutation_at".into(),
125 JsonValue::String(created_at_ms.to_string()),
126 );
127 raw.insert(
128 "total_attempt_count".into(),
129 JsonValue::String("0".to_owned()),
130 );
131 let tags_json: serde_json::Map<String, JsonValue> = args
133 .tags
134 .into_iter()
135 .map(|(k, v)| (k, JsonValue::String(v)))
136 .collect();
137 raw.insert("tags".into(), JsonValue::Object(tags_json));
138
139 let raw_fields = JsonValue::Object(raw);
140 let policy_json: Option<JsonValue> = match args.policy {
141 Some(p) => Some(serde_json::to_value(&p).map_err(|e| EngineError::Validation {
142 kind: ValidationKind::InvalidInput,
143 detail: format!("create_execution: policy: serialize failed: {e}"),
144 })?),
145 None => None,
146 };
147
148 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
152
153 sqlx::query(
154 r#"
155 INSERT INTO ff_exec_core (
156 partition_key, execution_id, flow_id, lane_id,
157 required_capabilities, attempt_index,
158 lifecycle_phase, ownership_state, eligibility_state,
159 public_state, attempt_state,
160 priority, created_at_ms, deadline_at_ms,
161 payload, policy, raw_fields
162 ) VALUES (
163 $1, $2, NULL, $3,
164 '{}'::text[], 0,
165 'submitted', 'unowned', 'eligible_now',
166 'waiting', 'pending',
167 $4, $5, $6,
168 $7, $8, $9
169 )
170 ON CONFLICT (partition_key, execution_id) DO NOTHING
171 "#,
172 )
173 .bind(partition_key)
174 .bind(execution_id)
175 .bind(&lane_id)
176 .bind(priority)
177 .bind(created_at_ms)
178 .bind(deadline_at_ms)
179 .bind(&args.input_payload)
180 .bind(policy_json)
181 .bind(&raw_fields)
182 .execute(&mut *tx)
183 .await
184 .map_err(map_sqlx_error)?;
185
186 sqlx::query(
188 r#"
189 INSERT INTO ff_lane_registry (lane_id, registered_at_ms, registered_by)
190 VALUES ($1, $2, $3)
191 ON CONFLICT (lane_id) DO NOTHING
192 "#,
193 )
194 .bind(&lane_id)
195 .bind(created_at_ms)
196 .bind("create_execution")
197 .execute(&mut *tx)
198 .await
199 .map_err(map_sqlx_error)?;
200
201 tx.commit().await.map_err(map_sqlx_error)?;
202
203 Ok(args.execution_id)
204}
205
206pub(super) async fn describe_execution_impl(
209 pool: &PgPool,
210 _partition_config: &PartitionConfig,
211 id: &ExecutionId,
212) -> Result<Option<ExecutionSnapshot>, EngineError> {
213 let partition_key: i16 = id.partition() as i16;
214 let execution_id = eid_uuid(id);
215
216 let row = sqlx::query(
217 r#"
218 SELECT flow_id, lane_id, public_state, blocking_reason,
219 created_at_ms, raw_fields
220 FROM ff_exec_core
221 WHERE partition_key = $1 AND execution_id = $2
222 "#,
223 )
224 .bind(partition_key)
225 .bind(execution_id)
226 .fetch_optional(pool)
227 .await
228 .map_err(map_sqlx_error)?;
229
230 let Some(row) = row else {
231 return Ok(None);
232 };
233
234 let flow_id_uuid: Option<Uuid> = row.try_get("flow_id").map_err(map_sqlx_error)?;
235 let lane_id: String = row.try_get("lane_id").map_err(map_sqlx_error)?;
236 let public_state: String = row.try_get("public_state").map_err(map_sqlx_error)?;
237 let blocking_reason: Option<String> =
238 row.try_get("blocking_reason").map_err(map_sqlx_error)?;
239 let created_at_ms: i64 = row.try_get("created_at_ms").map_err(map_sqlx_error)?;
240 let raw_fields: JsonValue = row.try_get("raw_fields").map_err(map_sqlx_error)?;
241
242 let mut core: HashMap<String, String> = HashMap::new();
246 core.insert("public_state".into(), public_state);
247 core.insert("lane_id".into(), lane_id);
248 if let Some(fid) = flow_id_uuid {
249 core.insert(
252 "flow_id".into(),
253 format!("{{fp:{part}}}:{fid}", part = id.partition()),
254 );
255 }
256 if let Some(r) = blocking_reason {
257 core.insert("blocking_reason".into(), r);
258 }
259 core.insert("created_at".into(), created_at_ms.to_string());
260
261 if let JsonValue::Object(map) = &raw_fields {
265 for key in [
266 "namespace",
267 "last_mutation_at",
268 "total_attempt_count",
269 "current_attempt_id",
270 "current_attempt_index",
271 "current_waitpoint_id",
272 "blocking_detail",
273 ] {
274 if let Some(JsonValue::String(s)) = map.get(key) {
275 core.insert(key.to_owned(), s.clone());
276 }
277 }
278 }
279
280 let tags_raw: HashMap<String, String> = match &raw_fields {
282 JsonValue::Object(map) => match map.get("tags") {
283 Some(JsonValue::Object(tag_map)) => tag_map
284 .iter()
285 .filter_map(|(k, v)| {
286 v.as_str().map(|s| (k.clone(), s.to_owned()))
287 })
288 .collect(),
289 _ => HashMap::new(),
290 },
291 _ => HashMap::new(),
292 };
293
294 build_execution_snapshot(id.clone(), &core, tags_raw)
295}
296
297pub(super) async fn list_executions_impl(
300 pool: &PgPool,
301 _partition_config: &PartitionConfig,
302 partition: PartitionKey,
303 cursor: Option<ExecutionId>,
304 limit: usize,
305) -> Result<ListExecutionsPage, EngineError> {
306 if limit == 0 {
307 return Ok(ListExecutionsPage::new(Vec::new(), None));
308 }
309 let parsed = partition.parse().map_err(|e| EngineError::Validation {
311 kind: ValidationKind::InvalidInput,
312 detail: format!("list_executions: partition: '{partition}': {e}"),
313 })?;
314 let partition_key: i16 = parsed.index as i16;
315 let cursor_uuid: Option<Uuid> = cursor.as_ref().map(eid_uuid);
316
317 let effective_limit = limit.min(1000);
319 let fetch_limit: i64 = (effective_limit as i64).saturating_add(1);
320
321 let rows = sqlx::query(
322 r#"
323 SELECT execution_id
324 FROM ff_exec_core
325 WHERE partition_key = $1
326 AND ($2::uuid IS NULL OR execution_id > $2)
327 ORDER BY execution_id ASC
328 LIMIT $3
329 "#,
330 )
331 .bind(partition_key)
332 .bind(cursor_uuid)
333 .bind(fetch_limit)
334 .fetch_all(pool)
335 .await
336 .map_err(map_sqlx_error)?;
337
338 let mut ids: Vec<ExecutionId> = Vec::with_capacity(rows.len());
339 for row in &rows {
340 let u: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
341 ids.push(eid_from_parts(parsed.index, u)?);
342 }
343
344 let has_more = ids.len() > effective_limit;
345 if has_more {
346 ids.truncate(effective_limit);
347 }
348 let next_cursor = if has_more { ids.last().cloned() } else { None };
349 Ok(ListExecutionsPage::new(ids, next_cursor))
350}
351
352pub(super) async fn cancel_impl(
362 pool: &PgPool,
363 _partition_config: &PartitionConfig,
364 execution_id: &ExecutionId,
365 reason: &str,
366) -> Result<(), EngineError> {
367 let partition_key: i16 = execution_id.partition() as i16;
368 let eid_uuid = eid_uuid(execution_id);
369 let now = now_ms();
370
371 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
372
373 let current: Option<(String, String)> = sqlx::query_as(
374 r#"
375 SELECT lifecycle_phase, public_state
376 FROM ff_exec_core
377 WHERE partition_key = $1 AND execution_id = $2
378 FOR UPDATE
379 "#,
380 )
381 .bind(partition_key)
382 .bind(eid_uuid)
383 .fetch_optional(&mut *tx)
384 .await
385 .map_err(map_sqlx_error)?;
386
387 let Some((lifecycle_phase, public_state)) = current else {
388 tx.rollback().await.map_err(map_sqlx_error)?;
391 return Err(EngineError::Validation {
392 kind: ValidationKind::InvalidInput,
393 detail: format!(
394 "cancel: execution_id={execution_id}: row not found on partition_key={partition_key}"
395 ),
396 });
397 };
398
399 if lifecycle_phase == "terminal" {
402 tx.rollback().await.map_err(map_sqlx_error)?;
403 return if public_state == "cancelled" {
407 Ok(())
408 } else {
409 Err(EngineError::Validation {
410 kind: ValidationKind::InvalidInput,
411 detail: format!(
412 "cancel: execution_id={execution_id}: already terminal in state '{public_state}'"
413 ),
414 })
415 };
416 }
417
418 sqlx::query(
419 r#"
420 UPDATE ff_exec_core
421 SET lifecycle_phase = 'terminal',
422 ownership_state = 'unowned',
423 eligibility_state = 'not_applicable',
424 public_state = 'cancelled',
425 attempt_state = 'cancelled',
426 terminal_at_ms = $3,
427 cancellation_reason = $4,
428 cancelled_by = 'worker',
429 raw_fields = jsonb_set(raw_fields, '{last_mutation_at}', to_jsonb($3::text))
430 WHERE partition_key = $1 AND execution_id = $2
431 "#,
432 )
433 .bind(partition_key)
434 .bind(eid_uuid)
435 .bind(now)
436 .bind(reason)
437 .execute(&mut *tx)
438 .await
439 .map_err(map_sqlx_error)?;
440
441 tx.commit().await.map_err(map_sqlx_error)?;
442 Ok(())
443}
444
445pub(super) async fn resolve_execution_flow_id_impl(
448 pool: &PgPool,
449 _partition_config: &PartitionConfig,
450 eid: &ExecutionId,
451) -> Result<Option<FlowId>, EngineError> {
452 let partition_key: i16 = eid.partition() as i16;
453 let execution_id = eid_uuid(eid);
454
455 let row: Option<(Option<Uuid>,)> = sqlx::query_as(
456 r#"
457 SELECT flow_id
458 FROM ff_exec_core
459 WHERE partition_key = $1 AND execution_id = $2
460 "#,
461 )
462 .bind(partition_key)
463 .bind(execution_id)
464 .fetch_optional(pool)
465 .await
466 .map_err(map_sqlx_error)?;
467
468 let Some((maybe_fid,)) = row else {
469 return Ok(None);
470 };
471 let Some(fid_uuid) = maybe_fid else {
472 return Ok(None);
473 };
474 let s = fid_uuid.to_string();
475 FlowId::parse(&s)
476 .map(Some)
477 .map_err(|e| EngineError::Validation {
478 kind: ValidationKind::Corruption,
479 detail: format!(
480 "resolve_execution_flow_id: exec_core.flow_id='{s}' is not a valid FlowId: {e}"
481 ),
482 })
483}
484