1use std::path::PathBuf;
9use std::sync::Arc;
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use async_trait::async_trait;
13use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
14use sqlx::{Row, SqlitePool};
15use uuid::Uuid;
16
17use ff_core::backend::PrepareOutcome;
18use ff_core::backend::{
19 AppendFrameOutcome, CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy, FailOutcome,
20 FailureClass, FailureReason, Frame, FrameKind, Handle, HandleKind, LeaseRenewal, PatchKind,
21 PendingWaitpoint, ResumeToken, ResumeSignal, SUMMARY_NULL_SENTINEL, StreamMode,
22 UsageDimensions,
23};
24#[cfg(feature = "streaming")]
25use ff_core::backend::{SummaryDocument, TailVisibility};
26use ff_core::capability::{BackendIdentity, Capabilities, Supports, Version};
27use ff_core::caps::{CapabilityRequirement, matches as caps_matches};
28use ff_core::contracts::{
29 BudgetStatus, CancelFlowResult, CreateBudgetArgs, CreateBudgetResult, CreateQuotaPolicyArgs,
30 CreateQuotaPolicyResult, ExecutionContext, ExecutionSnapshot, FlowSnapshot, IssueReclaimGrantArgs,
31 IssueReclaimGrantOutcome, ReclaimExecutionArgs, ReclaimExecutionOutcome, ReportUsageAdminArgs,
32 ReportUsageResult, ResetBudgetArgs, ResetBudgetResult, RotateWaitpointHmacSecretAllArgs,
33 RotateWaitpointHmacSecretAllResult, SeedOutcome, SeedWaitpointHmacSecretArgs, SuspendArgs,
34 SuspendOutcome,
35};
36#[cfg(feature = "core")]
37use ff_core::contracts::{
38 ClaimResumedExecutionArgs, ClaimResumedExecutionResult, DeliverSignalArgs, DeliverSignalResult,
39 EdgeDependencyPolicy, EdgeDirection, EdgeSnapshot, ListExecutionsPage, ListFlowsPage,
40 ListLanesPage, ListSuspendedPage, SetEdgeGroupPolicyResult,
41};
42#[cfg(feature = "streaming")]
43use ff_core::contracts::{STREAM_READ_HARD_CAP, StreamCursor, StreamFrame, StreamFrames};
44use ff_core::engine_backend::EngineBackend;
45use ff_core::engine_error::{BackendError, ContentionKind, EngineError, ValidationKind};
46use ff_core::handle_codec::HandlePayload;
47use ff_core::types::{AttemptId, AttemptIndex, LeaseEpoch, LeaseFence, LeaseId};
48
49use crate::errors::map_sqlx_error;
50use crate::handle_codec::{decode_handle, encode_handle};
51use crate::queries::{
52 attempt as q_attempt, dispatch as q_dispatch, exec_core as q_exec, flow as q_flow,
53 flow_staging as q_flow_staging, lease as q_lease, stream as q_stream,
54};
55use crate::retry::retry_serializable;
56#[cfg(feature = "core")]
57use ff_core::partition::PartitionKey;
58#[cfg(feature = "core")]
59use ff_core::types::EdgeId;
60use ff_core::types::{BudgetId, ExecutionId, FlowId, LaneId, TimestampMs};
61
62use crate::pubsub::{OutboxEvent, PubSub};
63use crate::registry;
64#[cfg(feature = "core")]
65use ff_core::contracts::{
66 AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
67 ApplyDependencyToChildResult, CancelExecutionArgs, CancelExecutionResult, CancelFlowArgs,
68 CancelFlowHeader, ChangePriorityArgs, ChangePriorityResult, CreateExecutionArgs,
69 CreateExecutionResult, CreateFlowArgs, CreateFlowResult, ExecutionInfo,
70 ListPendingWaitpointsArgs, ListPendingWaitpointsResult, ReplayExecutionArgs,
71 ReplayExecutionResult, RevokeLeaseArgs, RevokeLeaseResult, StageDependencyEdgeArgs,
72 StageDependencyEdgeResult,
73};
74#[cfg(feature = "core")]
75use ff_core::state::PublicState;
76use tokio::sync::broadcast;
77
78#[inline]
81fn unavailable<T>(op: &'static str) -> Result<T, EngineError> {
82 Err(EngineError::Unavailable { op })
83}
84
/// Selects which `PubSub` broadcast sender a pending outbox event goes to.
///
/// `dispatch_pending_emits` maps each variant to the `PubSub` field of the
/// matching name.
#[derive(Clone, Copy, Debug)]
pub(crate) enum OutboxChannel {
    /// Routed to `pubsub.lease_history`.
    LeaseHistory,
    /// Routed to `pubsub.completion`.
    Completion,
    /// Routed to `pubsub.signal_delivery`. Not constructed in the visible
    /// part of this file, hence the `dead_code` allowance.
    #[allow(dead_code)] SignalDelivery,
    /// Routed to `pubsub.stream_frame`.
    StreamFrame,
    /// Routed to `pubsub.operator_event`. Not constructed in the visible
    /// part of this file, hence the `dead_code` allowance.
    #[allow(dead_code)] OperatorEvent,
}
102
/// An outbox event paired with the channel it should be broadcast on.
///
/// The transactional helpers in this file collect these while the
/// transaction is open and hand them to `dispatch_pending_emits` only after
/// the commit succeeds.
pub(crate) type PendingEmit = (OutboxChannel, OutboxEvent);
105
106fn dispatch_pending_emits(pubsub: &PubSub, emits: &[PendingEmit]) {
110 for (channel, ev) in emits {
111 let sender: &broadcast::Sender<OutboxEvent> = match channel {
112 OutboxChannel::LeaseHistory => &pubsub.lease_history,
113 OutboxChannel::Completion => &pubsub.completion,
114 OutboxChannel::SignalDelivery => &pubsub.signal_delivery,
115 OutboxChannel::StreamFrame => &pubsub.stream_frame,
116 OutboxChannel::OperatorEvent => &pubsub.operator_event,
117 };
118 PubSub::emit(sender, ev.clone());
119 }
120}
121
122async fn last_outbox_event(
127 conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
128 partition_key: i64,
129) -> Result<OutboxEvent, EngineError> {
130 let event_id: i64 = sqlx::query_scalar("SELECT last_insert_rowid()")
131 .fetch_one(&mut **conn)
132 .await
133 .map_err(map_sqlx_error)?;
134 Ok(OutboxEvent {
135 event_id,
136 partition_key,
137 })
138}
139
/// Reports whether `path` names an in-memory SQLite database.
///
/// Recognised forms:
/// * the literal `:memory:`;
/// * anything starting with `file::memory:`;
/// * a `file:` URI whose query string contains the exact pair `mode=memory`.
///
/// The `#fragment` is removed *before* the query string is located. In a URI
/// the fragment starts at the first `#`, so a `?` that appears after it
/// belongs to the fragment and must not be read as the start of a query
/// (e.g. `file:a.db#x?mode=memory` is an on-disk database).
fn is_memory_uri(path: &str) -> bool {
    if path == ":memory:" || path.starts_with("file::memory:") {
        return true;
    }
    if !path.starts_with("file:") {
        return false;
    }
    // Everything from the first '#' onwards is the fragment; drop it first.
    let without_fragment = path.split('#').next().unwrap_or(path);
    // No '?' means no query string, hence no `mode=` parameter at all.
    let Some((_, query)) = without_fragment.split_once('?') else {
        return false;
    };
    query.split('&').any(|kv| kv == "mode=memory")
}
182
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0`; a millisecond count that
/// does not fit in `i64` yields `i64::MAX`.
fn now_ms() -> i64 {
    let elapsed_ms: u128 = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    };
    match i64::try_from(elapsed_ms) {
        Ok(ms) => ms,
        Err(_) => i64::MAX,
    }
}
195
196pub(crate) fn split_exec_id(eid: &ff_core::types::ExecutionId) -> Result<(i64, Uuid), EngineError> {
200 let s = eid.as_str();
201 let rest = s
202 .strip_prefix("{fp:")
203 .ok_or_else(|| EngineError::Validation {
204 kind: ValidationKind::InvalidInput,
205 detail: format!("execution_id missing `{{fp:` prefix: {s}"),
206 })?;
207 let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
208 kind: ValidationKind::InvalidInput,
209 detail: format!("execution_id missing `}}:`: {s}"),
210 })?;
211 let part: i64 = rest[..close].parse().map_err(|_| EngineError::Validation {
212 kind: ValidationKind::InvalidInput,
213 detail: format!("execution_id partition index not u16: {s}"),
214 })?;
215 let uuid = Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
216 kind: ValidationKind::InvalidInput,
217 detail: format!("execution_id UUID invalid: {s}"),
218 })?;
219 Ok((part, uuid))
220}
221
222pub(crate) async fn begin_immediate(
237 pool: &SqlitePool,
238) -> Result<sqlx::pool::PoolConnection<sqlx::Sqlite>, EngineError> {
239 let mut conn = pool.acquire().await.map_err(map_sqlx_error)?;
240 sqlx::query("BEGIN IMMEDIATE")
241 .execute(&mut *conn)
242 .await
243 .map_err(map_sqlx_error)?;
244 Ok(conn)
245}
246
247pub(crate) async fn commit_or_rollback(
254 conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
255) -> Result<(), EngineError> {
256 if let Err(e) = sqlx::query("COMMIT")
257 .execute(&mut **conn)
258 .await
259 .map_err(map_sqlx_error)
260 {
261 let _ = sqlx::query("ROLLBACK").execute(&mut **conn).await;
262 return Err(e);
263 }
264 Ok(())
265}
266
267pub(crate) async fn rollback_quiet(conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>) {
270 let _ = sqlx::query("ROLLBACK").execute(&mut **conn).await;
271}
272
273async fn fence_check(
278 conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
279 part: i64,
280 exec_uuid: Uuid,
281 attempt_index: i64,
282 expected_epoch: u64,
283) -> Result<(), EngineError> {
284 let row = sqlx::query(q_attempt::SELECT_ATTEMPT_EPOCH_SQL)
285 .bind(part)
286 .bind(exec_uuid)
287 .bind(attempt_index)
288 .fetch_optional(&mut **conn)
289 .await
290 .map_err(map_sqlx_error)?;
291 let Some(row) = row else {
292 return Err(EngineError::NotFound { entity: "attempt" });
293 };
294 let epoch_i: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
295 let observed = u64::try_from(epoch_i).unwrap_or(0);
296 if observed != expected_epoch {
297 return Err(EngineError::Contention(ContentionKind::LeaseConflict));
298 }
299 Ok(())
300}
301
302async fn claim_impl(
305 pool: &SqlitePool,
306 pubsub: &PubSub,
307 lane: &ff_core::types::LaneId,
308 capabilities: &CapabilitySet,
309 policy: &ClaimPolicy,
310) -> Result<Option<Handle>, EngineError> {
311 let part: i64 = 0;
315
316 let mut conn = begin_immediate(pool).await?;
317 let result = claim_inner(&mut conn, part, lane, capabilities, policy).await;
318 match result {
319 Ok(Some((handle, emits))) => {
320 commit_or_rollback(&mut conn).await?;
321 dispatch_pending_emits(pubsub, &emits);
322 Ok(Some(handle))
323 }
324 Ok(None) => {
325 rollback_quiet(&mut conn).await;
326 Ok(None)
327 }
328 Err(e) => {
329 rollback_quiet(&mut conn).await;
330 Err(e)
331 }
332 }
333}
334
/// Body of a claim, run inside the transaction opened by `claim_impl`.
///
/// Fetches a batch of eligible executions for `lane`, picks the first one
/// whose capability requirement is satisfied by `capabilities`, upserts its
/// attempt row with the new lease, updates the execution core row, records an
/// `"acquired"` lease event, and encodes a fresh handle.
///
/// Returns `Ok(None)` when there are no candidates or none of them matches;
/// otherwise the handle plus the emits to broadcast after commit.
///
/// # Errors
/// Returns the mapped `sqlx` error for any failed query or column read, and
/// `EngineError::Validation` if the reassembled execution id does not parse.
async fn claim_inner(
    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
    part: i64,
    lane: &ff_core::types::LaneId,
    capabilities: &CapabilitySet,
    policy: &ClaimPolicy,
) -> Result<Option<(Handle, Vec<PendingEmit>)>, EngineError> {
    // Bound as the third parameter of the candidate query — presumably its
    // LIMIT; confirm against SELECT_ELIGIBLE_EXEC_SQL.
    const CAP_SCAN_BATCH: i64 = 16;

    let candidate_rows = sqlx::query(q_attempt::SELECT_ELIGIBLE_EXEC_SQL)
        .bind(part)
        .bind(lane.as_str())
        .bind(CAP_SCAN_BATCH)
        .fetch_all(&mut **conn)
        .await
        .map_err(map_sqlx_error)?;

    if candidate_rows.is_empty() {
        return Ok(None);
    }

    // Walk the candidates in query order and stop at the first one whose
    // required capabilities are matched by this worker.
    let mut claimable: Option<(Uuid, i64)> = None;
    for row in &candidate_rows {
        let exec_uuid: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
        let attempt_index_i: i64 = row.try_get("attempt_index").map_err(map_sqlx_error)?;

        // NOTE(review): only the execution UUID is bound here, unlike the
        // other queries in this file which also bind `part` — confirm this
        // matches SELECT_EXEC_CAPABILITIES_SQL.
        let cap_rows = sqlx::query(q_attempt::SELECT_EXEC_CAPABILITIES_SQL)
            .bind(exec_uuid)
            .fetch_all(&mut **conn)
            .await
            .map_err(map_sqlx_error)?;
        let tokens: Vec<String> = cap_rows
            .iter()
            .map(|r| r.try_get::<String, _>("capability"))
            .collect::<Result<Vec<_>, _>>()
            .map_err(map_sqlx_error)?;
        let req = CapabilityRequirement::new(tokens);
        if caps_matches(&req, capabilities) {
            claimable = Some((exec_uuid, attempt_index_i));
            break;
        }
    }

    let Some((exec_uuid, attempt_index_i)) = claimable else {
        return Ok(None);
    };

    // Lease expiry = now + TTL, saturating rather than overflowing.
    let now = now_ms();
    let lease_ttl_ms = i64::from(policy.lease_ttl_ms);
    let expires = now.saturating_add(lease_ttl_ms);

    // The upsert returns a row carrying the attempt's `lease_epoch`.
    let epoch_row = sqlx::query(q_attempt::UPSERT_ATTEMPT_ON_CLAIM_SQL)
        .bind(part)
        .bind(exec_uuid)
        .bind(attempt_index_i)
        .bind(policy.worker_id.as_str())
        .bind(policy.worker_instance_id.as_str())
        .bind(expires)
        .bind(now)
        .fetch_one(&mut **conn)
        .await
        .map_err(map_sqlx_error)?;
    let epoch_i: i64 = epoch_row.try_get("lease_epoch").map_err(map_sqlx_error)?;

    sqlx::query(q_exec::UPDATE_EXEC_CORE_CLAIM_SQL)
        .bind(part)
        .bind(exec_uuid)
        .bind(now)
        .execute(&mut **conn)
        .await
        .map_err(map_sqlx_error)?;

    let mut emits: Vec<PendingEmit> = Vec::new();
    let ev = insert_lease_event(conn, part, exec_uuid, "acquired", now).await?;
    emits.push((OutboxChannel::LeaseHistory, ev));

    // A negative or out-of-range stored attempt index is encoded as 0.
    let attempt_index = AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
    let exec_id = ff_core::types::ExecutionId::parse(&format!("{{fp:{part}}}:{exec_uuid}"))
        .map_err(|e| EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: format!("reassembling exec id: {e}"),
        })?;
    // A stored epoch that does not fit in u64 is encoded as 1.
    let payload = HandlePayload::new(
        exec_id,
        attempt_index,
        AttemptId::new(),
        LeaseId::new(),
        LeaseEpoch(u64::try_from(epoch_i).unwrap_or(1)),
        u64::from(policy.lease_ttl_ms),
        lane.clone(),
        policy.worker_instance_id.clone(),
    );
    Ok(Some((encode_handle(&payload, HandleKind::Fresh), emits)))
}
457
458async fn complete_impl(
459 pool: &SqlitePool,
460 pubsub: &PubSub,
461 handle: &Handle,
462 payload_bytes: Option<Vec<u8>>,
463) -> Result<(), EngineError> {
464 let payload = decode_handle(handle)?;
465 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
466 let attempt_index = i64::from(payload.attempt_index.0);
467 let expected_epoch = payload.lease_epoch.0;
468
469 let mut conn = begin_immediate(pool).await?;
470 let result = complete_inner(
471 &mut conn,
472 part,
473 exec_uuid,
474 attempt_index,
475 expected_epoch,
476 payload_bytes,
477 )
478 .await;
479 match result {
480 Ok(emits) => {
481 commit_or_rollback(&mut conn).await?;
482 dispatch_pending_emits(pubsub, &emits);
483 Ok(())
484 }
485 Err(e) => {
486 rollback_quiet(&mut conn).await;
487 Err(e)
488 }
489 }
490}
491
492async fn complete_inner(
493 conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
494 part: i64,
495 exec_uuid: Uuid,
496 attempt_index: i64,
497 expected_epoch: u64,
498 payload_bytes: Option<Vec<u8>>,
499) -> Result<Vec<PendingEmit>, EngineError> {
500 fence_check(conn, part, exec_uuid, attempt_index, expected_epoch).await?;
501 let now = now_ms();
502
503 sqlx::query(q_attempt::UPDATE_ATTEMPT_COMPLETE_SQL)
504 .bind(now)
505 .bind(part)
506 .bind(exec_uuid)
507 .bind(attempt_index)
508 .execute(&mut **conn)
509 .await
510 .map_err(map_sqlx_error)?;
511
512 sqlx::query(q_exec::UPDATE_EXEC_CORE_COMPLETE_SQL)
513 .bind(now)
514 .bind(payload_bytes.as_deref())
515 .bind(part)
516 .bind(exec_uuid)
517 .execute(&mut **conn)
518 .await
519 .map_err(map_sqlx_error)?;
520
521 let mut emits: Vec<PendingEmit> = Vec::new();
522 let completion_ev = insert_completion_event_ev(conn, part, exec_uuid, "success", now).await?;
523 emits.push((OutboxChannel::Completion, completion_ev));
524
525 let lease_ev = insert_lease_event(conn, part, exec_uuid, "revoked", now).await?;
526 emits.push((OutboxChannel::LeaseHistory, lease_ev));
527 Ok(emits)
528}
529
530fn classify_retryable(classification: FailureClass) -> bool {
539 match classification {
540 FailureClass::Transient | FailureClass::InfraCrash => true,
541 FailureClass::Permanent | FailureClass::Timeout | FailureClass::Cancelled => false,
542 _ => true,
549 }
550}
551
552async fn fail_impl(
553 pool: &SqlitePool,
554 pubsub: &PubSub,
555 handle: &Handle,
556 reason: FailureReason,
557 classification: FailureClass,
558) -> Result<FailOutcome, EngineError> {
559 let payload = decode_handle(handle)?;
560 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
561 let attempt_index = i64::from(payload.attempt_index.0);
562 let expected_epoch = payload.lease_epoch.0;
563 let retryable = classify_retryable(classification);
564
565 let mut conn = begin_immediate(pool).await?;
566 let result = fail_inner(
567 &mut conn,
568 part,
569 exec_uuid,
570 attempt_index,
571 expected_epoch,
572 retryable,
573 &reason,
574 classification,
575 )
576 .await;
577 match result {
578 Ok((outcome, emits)) => {
579 commit_or_rollback(&mut conn).await?;
580 dispatch_pending_emits(pubsub, &emits);
581 Ok(outcome)
582 }
583 Err(e) => {
584 rollback_quiet(&mut conn).await;
585 Err(e)
586 }
587 }
588}
589
/// Body of a failure report, run inside the caller's transaction.
///
/// After the lease fence check it takes one of two paths:
/// * `retryable == true`: updates the attempt and execution rows with the
///   retry statements, records a `"revoked"` lease event, logs a warning and
///   returns `FailOutcome::RetryScheduled`.
/// * `retryable == false`: updates both rows with the terminal statements,
///   records a `"failed"` completion event and a `"revoked"` lease event, and
///   returns `FailOutcome::TerminalFailed`.
///
/// The second tuple element holds the emits to broadcast after commit.
///
/// # Errors
/// Propagates `fence_check` errors and the mapped `sqlx` error of any failed
/// statement.
#[allow(clippy::too_many_arguments)]
async fn fail_inner(
    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
    part: i64,
    exec_uuid: Uuid,
    attempt_index: i64,
    expected_epoch: u64,
    retryable: bool,
    reason: &FailureReason,
    classification: FailureClass,
) -> Result<(FailOutcome, Vec<PendingEmit>), EngineError> {
    fence_check(conn, part, exec_uuid, attempt_index, expected_epoch).await?;
    let now = now_ms();
    let mut emits: Vec<PendingEmit> = Vec::new();

    if retryable {
        sqlx::query(q_attempt::UPDATE_ATTEMPT_FAIL_RETRY_SQL)
            .bind(now)
            .bind(part)
            .bind(exec_uuid)
            .bind(attempt_index)
            .execute(&mut **conn)
            .await
            .map_err(map_sqlx_error)?;

        // Unlike the terminal statement below, no timestamp is bound here.
        sqlx::query(q_exec::UPDATE_EXEC_CORE_FAIL_RETRY_SQL)
            .bind(&reason.message)
            .bind(part)
            .bind(exec_uuid)
            .execute(&mut **conn)
            .await
            .map_err(map_sqlx_error)?;

        // Only a lease event on this path; no completion event is recorded.
        let lease_ev = insert_lease_event(conn, part, exec_uuid, "revoked", now).await?;
        emits.push((OutboxChannel::LeaseHistory, lease_ev));
        tracing::warn!(
            error.message = %reason.message,
            classification = ?classification,
            execution_id = %exec_uuid,
            attempt_index = attempt_index,
            "sqlite.fail: scheduling retry"
        );
        // `delay_until` is reported as `now`, i.e. no delay is applied here.
        Ok((
            FailOutcome::RetryScheduled {
                delay_until: ff_core::types::TimestampMs::from_millis(now),
            },
            emits,
        ))
    } else {
        sqlx::query(q_attempt::UPDATE_ATTEMPT_FAIL_TERMINAL_SQL)
            .bind(now)
            .bind(part)
            .bind(exec_uuid)
            .bind(attempt_index)
            .execute(&mut **conn)
            .await
            .map_err(map_sqlx_error)?;

        sqlx::query(q_exec::UPDATE_EXEC_CORE_FAIL_TERMINAL_SQL)
            .bind(now)
            .bind(&reason.message)
            .bind(part)
            .bind(exec_uuid)
            .execute(&mut **conn)
            .await
            .map_err(map_sqlx_error)?;

        // Completion event first, then the lease event.
        let completion_ev =
            insert_completion_event_ev(conn, part, exec_uuid, "failed", now).await?;
        emits.push((OutboxChannel::Completion, completion_ev));

        let lease_ev = insert_lease_event(conn, part, exec_uuid, "revoked", now).await?;
        emits.push((OutboxChannel::LeaseHistory, lease_ev));
        Ok((FailOutcome::TerminalFailed, emits))
    }
}
669
670pub(crate) async fn insert_lease_event(
680 conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
681 part: i64,
682 exec_uuid: Uuid,
683 event_type: &str,
684 now: i64,
685) -> Result<OutboxEvent, EngineError> {
686 sqlx::query(q_dispatch::INSERT_LEASE_EVENT_SQL)
687 .bind(exec_uuid.to_string())
688 .bind(event_type)
689 .bind(now)
690 .bind(part)
691 .bind(exec_uuid)
694 .execute(&mut **conn)
695 .await
696 .map_err(map_sqlx_error)?;
697 last_outbox_event(conn, part).await
698}
699
700pub(crate) async fn insert_completion_event_ev(
703 conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
704 part: i64,
705 exec_uuid: Uuid,
706 outcome: &str,
707 now: i64,
708) -> Result<OutboxEvent, EngineError> {
709 sqlx::query(q_attempt::INSERT_COMPLETION_EVENT_SQL)
710 .bind(outcome)
711 .bind(now)
712 .bind(part)
713 .bind(exec_uuid)
714 .execute(&mut **conn)
715 .await
716 .map_err(map_sqlx_error)?;
717 last_outbox_event(conn, part).await
718}
719
720async fn renew_impl(
723 pool: &SqlitePool,
724 pubsub: &PubSub,
725 handle: &Handle,
726) -> Result<LeaseRenewal, EngineError> {
727 let payload = decode_handle(handle)?;
728 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
729 let attempt_index = i64::from(payload.attempt_index.0);
730 let expected_epoch = payload.lease_epoch.0;
731 let lease_ttl_ms = i64::try_from(payload.lease_ttl_ms).unwrap_or(0);
732
733 let mut conn = begin_immediate(pool).await?;
734 let result = renew_inner(
735 &mut conn,
736 part,
737 exec_uuid,
738 attempt_index,
739 expected_epoch,
740 lease_ttl_ms,
741 )
742 .await;
743 match result {
744 Ok((renewal, emits)) => {
745 commit_or_rollback(&mut conn).await?;
746 dispatch_pending_emits(pubsub, &emits);
747 Ok(renewal)
748 }
749 Err(e) => {
750 rollback_quiet(&mut conn).await;
751 Err(e)
752 }
753 }
754}
755
756async fn renew_inner(
757 conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
758 part: i64,
759 exec_uuid: Uuid,
760 attempt_index: i64,
761 expected_epoch: u64,
762 lease_ttl_ms: i64,
763) -> Result<(LeaseRenewal, Vec<PendingEmit>), EngineError> {
764 fence_check(conn, part, exec_uuid, attempt_index, expected_epoch).await?;
765 let now = now_ms();
766 let new_expires = now.saturating_add(lease_ttl_ms);
767
768 sqlx::query(q_lease::UPDATE_ATTEMPT_RENEW_SQL)
769 .bind(new_expires)
770 .bind(part)
771 .bind(exec_uuid)
772 .bind(attempt_index)
773 .execute(&mut **conn)
774 .await
775 .map_err(map_sqlx_error)?;
776
777 let ev = insert_lease_event(conn, part, exec_uuid, "renewed", now).await?;
779 let emits = vec![(OutboxChannel::LeaseHistory, ev)];
780
781 Ok((
782 LeaseRenewal::new(u64::try_from(new_expires).unwrap_or(0), expected_epoch),
783 emits,
784 ))
785}
786
787async fn progress_impl(
788 pool: &SqlitePool,
789 handle: &Handle,
790 percent: Option<u8>,
791 message: Option<String>,
792) -> Result<(), EngineError> {
793 let payload = decode_handle(handle)?;
794 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
795 let attempt_index = i64::from(payload.attempt_index.0);
796 let expected_epoch = payload.lease_epoch.0;
797
798 let mut conn = begin_immediate(pool).await?;
799 let result = progress_inner(
800 &mut conn,
801 part,
802 exec_uuid,
803 attempt_index,
804 expected_epoch,
805 percent,
806 message,
807 )
808 .await;
809 match result {
810 Ok(()) => commit_or_rollback(&mut conn).await,
811 Err(e) => {
812 rollback_quiet(&mut conn).await;
813 Err(e)
814 }
815 }
816}
817
818async fn progress_inner(
819 conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
820 part: i64,
821 exec_uuid: Uuid,
822 attempt_index: i64,
823 expected_epoch: u64,
824 percent: Option<u8>,
825 message: Option<String>,
826) -> Result<(), EngineError> {
827 fence_check(conn, part, exec_uuid, attempt_index, expected_epoch).await?;
828
829 sqlx::query(q_exec::UPDATE_EXEC_CORE_PROGRESS_SQL)
835 .bind(percent.map(i64::from))
836 .bind(message)
837 .bind(part)
838 .bind(exec_uuid)
839 .execute(&mut **conn)
840 .await
841 .map_err(map_sqlx_error)?;
842 Ok(())
843}
844
845fn apply_json_merge_patch(target: &mut serde_json::Value, patch: &serde_json::Value) {
853 use serde_json::Value;
854 if let Value::Object(patch_map) = patch {
855 if !target.is_object() {
856 *target = Value::Object(serde_json::Map::new());
857 }
858 let target_map = target.as_object_mut().expect("just ensured object");
859 for (k, v) in patch_map {
860 match v {
861 Value::Null => {
862 target_map.remove(k);
863 }
864 Value::String(s) if s == SUMMARY_NULL_SENTINEL => {
865 target_map.insert(k.clone(), Value::Null);
866 }
867 Value::Object(_) => {
868 let entry = target_map.entry(k.clone()).or_insert(Value::Null);
869 apply_json_merge_patch(entry, v);
870 }
871 other => {
872 target_map.insert(k.clone(), other.clone());
873 }
874 }
875 }
876 } else {
877 *target = patch.clone();
878 }
879}
880
881fn build_fields_json(frame: &Frame) -> String {
885 use serde_json::{Map, Value};
886 let payload_str = String::from_utf8_lossy(&frame.bytes).into_owned();
887 let mut map = Map::new();
888 let frame_type = if frame.frame_type.is_empty() {
889 match frame.kind {
890 FrameKind::Stdout => "stdout",
891 FrameKind::Stderr => "stderr",
892 FrameKind::Event => "event",
893 FrameKind::Blob => "blob",
894 _ => "event",
895 }
896 .to_owned()
897 } else {
898 frame.frame_type.clone()
899 };
900 map.insert("frame_type".into(), Value::String(frame_type));
901 map.insert("payload".into(), Value::String(payload_str));
902 map.insert("encoding".into(), Value::String("utf8".into()));
903 map.insert("source".into(), Value::String("worker".into()));
904 if let Some(corr) = &frame.correlation_id {
905 map.insert("correlation_id".into(), Value::String(corr.clone()));
906 }
907 Value::Object(map).to_string()
908}
909
910async fn append_frame_impl(
911 pool: &SqlitePool,
912 pubsub: &PubSub,
913 handle: &Handle,
914 frame: Frame,
915) -> Result<AppendFrameOutcome, EngineError> {
916 let payload = decode_handle(handle)?;
917 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
918 let attempt_index = i64::from(payload.attempt_index.0);
919 let expected_epoch = payload.lease_epoch.0;
920
921 let mut conn = begin_immediate(pool).await?;
922 let result = append_frame_inner(
923 &mut conn,
924 part,
925 exec_uuid,
926 attempt_index,
927 expected_epoch,
928 frame,
929 )
930 .await;
931 match result {
932 Ok((outcome, emits)) => {
933 commit_or_rollback(&mut conn).await?;
934 dispatch_pending_emits(pubsub, &emits);
935 Ok(outcome)
936 }
937 Err(e) => {
938 rollback_quiet(&mut conn).await;
939 Err(e)
940 }
941 }
942}
943
/// Body of a frame append, run inside the caller's transaction.
///
/// Steps, in order:
/// 1. Lease fence check.
/// 2. Compute the next sequence number and insert the frame row.
/// 3. Capture `last_insert_rowid()` as the `StreamFrame` outbox event.
/// 4. For `StreamMode::DurableSummary`: parse the frame bytes as a JSON
///    patch, merge it into the stored summary document and write the document
///    back with its version incremented.
/// 5. For `StreamMode::BestEffortLive`: update the rate EMA, derive a
///    retention count `k` clamped by the config floor/ceiling, persist the
///    stream metadata and run the trim statement with `k`.
/// 6. Count the frames and build the `AppendFrameOutcome`, whose id is
///    `"{ts_ms}-{next_seq}"`.
///
/// # Errors
/// Propagates `fence_check` errors and mapped `sqlx` errors. Returns
/// `EngineError::Validation` with `InvalidInput` when a summary patch is not
/// valid JSON, and with `Corruption` when the stored summary document is not
/// valid JSON.
async fn append_frame_inner(
    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
    part: i64,
    exec_uuid: Uuid,
    attempt_index: i64,
    expected_epoch: u64,
    frame: Frame,
) -> Result<(AppendFrameOutcome, Vec<PendingEmit>), EngineError> {
    fence_check(conn, part, exec_uuid, attempt_index, expected_epoch).await?;

    let ts_ms = now_ms();
    let mode_wire = frame.mode.wire_str();
    let fields_text = build_fields_json(&frame);

    // `ts_ms` is bound into the max-seq lookup, so the sequence is presumably
    // scoped to the current millisecond — confirm against SELECT_MAX_SEQ_SQL.
    // No prior row (NULL) starts the sequence at 0.
    let max_seq: Option<i64> = sqlx::query_scalar(q_stream::SELECT_MAX_SEQ_SQL)
        .bind(part)
        .bind(exec_uuid)
        .bind(attempt_index)
        .bind(ts_ms)
        .fetch_one(&mut **conn)
        .await
        .map_err(map_sqlx_error)?;
    let next_seq: i64 = max_seq.map(|s| s + 1).unwrap_or(0);

    sqlx::query(q_stream::INSERT_STREAM_FRAME_SQL)
        .bind(part)
        .bind(exec_uuid)
        .bind(attempt_index)
        .bind(ts_ms)
        .bind(next_seq)
        .bind(&fields_text)
        .bind(mode_wire)
        .bind(ts_ms)
        .execute(&mut **conn)
        .await
        .map_err(map_sqlx_error)?;

    // Must run directly after the frame INSERT: it reads last_insert_rowid(),
    // which any later INSERT on this connection would overwrite.
    let stream_ev = last_outbox_event(conn, part).await?;
    let emits: Vec<PendingEmit> = vec![(OutboxChannel::StreamFrame, stream_ev)];

    let mut summary_version: Option<u64> = None;

    if let StreamMode::DurableSummary { patch_kind } = &frame.mode {
        // The frame bytes are the patch. A parse failure returns an error
        // after the frame row was inserted; the caller is expected to roll
        // back on error.
        let patch: serde_json::Value =
            serde_json::from_slice(&frame.bytes).map_err(|e| EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("summary patch not valid JSON: {e}"),
            })?;

        let existing: Option<(String, i64)> = sqlx::query_as(q_stream::SELECT_STREAM_SUMMARY_SQL)
            .bind(part)
            .bind(exec_uuid)
            .bind(attempt_index)
            .fetch_optional(&mut **conn)
            .await
            .map_err(map_sqlx_error)?;

        // No stored summary: start from an empty object at version 0.
        let (mut doc, prev_version): (serde_json::Value, i64) = match existing {
            Some((text, v)) => {
                let parsed: serde_json::Value =
                    serde_json::from_str(&text).map_err(|e| EngineError::Validation {
                        kind: ValidationKind::Corruption,
                        detail: format!("corrupt summary document in ff_stream_summary: {e}"),
                    })?;
                (parsed, v)
            }
            None => (serde_json::Value::Object(serde_json::Map::new()), 0),
        };

        // Every patch kind is currently applied as a JSON merge patch.
        match patch_kind {
            PatchKind::JsonMergePatch => apply_json_merge_patch(&mut doc, &patch),
            _ => apply_json_merge_patch(&mut doc, &patch),
        }

        let new_version = prev_version + 1;
        let patch_kind_wire = "json-merge-patch";
        let doc_text = doc.to_string();
        // Version 0 is treated as "no row yet" and takes the INSERT path; a
        // stored row whose version is 0 would take it too.
        if prev_version == 0 {
            sqlx::query(q_stream::INSERT_STREAM_SUMMARY_SQL)
                .bind(part)
                .bind(exec_uuid)
                .bind(attempt_index)
                .bind(&doc_text)
                .bind(new_version)
                .bind(patch_kind_wire)
                .bind(ts_ms)
                .bind(ts_ms)
                .execute(&mut **conn)
                .await
                .map_err(map_sqlx_error)?;
        } else {
            sqlx::query(q_stream::UPDATE_STREAM_SUMMARY_SQL)
                .bind(part)
                .bind(exec_uuid)
                .bind(attempt_index)
                .bind(&doc_text)
                .bind(new_version)
                .bind(patch_kind_wire)
                .bind(ts_ms)
                .execute(&mut **conn)
                .await
                .map_err(map_sqlx_error)?;
        }
        summary_version = Some(u64::try_from(new_version).unwrap_or(0));
    }

    if let StreamMode::BestEffortLive { config } = &frame.mode {
        let meta: Option<(f64, i64)> = sqlx::query_as(q_stream::SELECT_STREAM_META_SQL)
            .bind(part)
            .bind(exec_uuid)
            .bind(attempt_index)
            .fetch_optional(&mut **conn)
            .await
            .map_err(map_sqlx_error)?;

        // Instantaneous rate in frames/second from the gap to the previous
        // frame; 0 when there is no previous timestamp or no elapsed time.
        let (ema_prev, last_ts) = meta.unwrap_or((0.0, 0));
        let inst_rate: f64 = if last_ts > 0 && ts_ms > last_ts {
            1000.0 / ((ts_ms - last_ts) as f64)
        } else {
            0.0
        };
        let alpha = config.ema_alpha;
        let ema_new = alpha * inst_rate + (1.0 - alpha) * ema_prev;
        // k = 2 * ceil(rate * ttl seconds), then clamped to [floor, ceiling].
        // NOTE(review): the `* 2` is applied after the `as i64` cast and is
        // not saturating — confirm config bounds keep it from overflowing.
        let k_raw = (ema_new * (f64::from(config.ttl_ms)) / 1000.0).ceil() as i64 * 2;
        let k = k_raw
            .max(i64::from(config.maxlen_floor))
            .min(i64::from(config.maxlen_ceiling));

        sqlx::query(q_stream::UPSERT_STREAM_META_SQL)
            .bind(part)
            .bind(exec_uuid)
            .bind(attempt_index)
            .bind(ema_new)
            .bind(ts_ms)
            .bind(k)
            .execute(&mut **conn)
            .await
            .map_err(map_sqlx_error)?;

        sqlx::query(q_stream::TRIM_STREAM_FRAMES_SQL)
            .bind(part)
            .bind(exec_uuid)
            .bind(attempt_index)
            .bind(k)
            .execute(&mut **conn)
            .await
            .map_err(map_sqlx_error)?;
    }

    // Counted after any trimming above, so the reported count reflects it.
    let frame_count: i64 = sqlx::query_scalar(q_stream::COUNT_STREAM_FRAMES_SQL)
        .bind(part)
        .bind(exec_uuid)
        .bind(attempt_index)
        .fetch_one(&mut **conn)
        .await
        .map_err(map_sqlx_error)?;

    let stream_id = format!("{ts_ms}-{next_seq}");
    let mut out = AppendFrameOutcome::new(stream_id, u64::try_from(frame_count).unwrap_or(0));
    if let Some(v) = summary_version {
        out = out.with_summary_version(v);
    }
    Ok((out, emits))
}
1124
/// Converts a range cursor into an `(ms, seq)` bound.
///
/// `Start` maps to `(i64::MIN, i64::MIN)`, `End` to `(i64::MAX, i64::MAX)`,
/// and a concrete cursor is parsed by `parse_concrete_cursor`.
#[cfg(feature = "streaming")]
fn parse_cursor_bound(c: &StreamCursor) -> Result<(i64, i64), EngineError> {
    let bound = match c {
        StreamCursor::At(raw) => return parse_concrete_cursor(raw),
        StreamCursor::Start => (i64::MIN, i64::MIN),
        StreamCursor::End => (i64::MAX, i64::MAX),
    };
    Ok(bound)
}
1140
/// Parses a concrete stream cursor of the form `<ms>-<seq>` or `<ms>`.
///
/// The string is split at the first `-`; without one the sequence defaults
/// to `0`. Both parts must parse as `i64`.
///
/// # Errors
/// Returns `EngineError::Validation` with `ValidationKind::InvalidInput`,
/// naming the part (`ms` or `seq`) that failed to parse.
#[cfg(feature = "streaming")]
fn parse_concrete_cursor(s: &str) -> Result<(i64, i64), EngineError> {
    let (ms_text, seq_text) = s.split_once('-').unwrap_or((s, "0"));

    let ms = match ms_text.parse::<i64>() {
        Ok(value) => value,
        Err(_) => {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("bad stream cursor '{s}' (ms)"),
            });
        }
    };
    let sq = match seq_text.parse::<i64>() {
        Ok(value) => value,
        Err(_) => {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("bad stream cursor '{s}' (seq)"),
            });
        }
    };
    Ok((ms, sq))
}
1157
/// Builds a `StreamFrame` from a stored row.
///
/// The id is `"{ts_ms}-{seq}"`. `fields_text` is parsed as JSON; when it is
/// an object, each member becomes a field — string values verbatim, every
/// other value as its JSON text. Text that is not a JSON object yields an
/// empty field map instead of an error.
#[cfg(feature = "streaming")]
fn row_to_frame(ts_ms: i64, seq: i64, fields_text: &str) -> StreamFrame {
    use std::collections::BTreeMap;

    let fields: BTreeMap<String, String> =
        match serde_json::from_str::<serde_json::Value>(fields_text) {
            Ok(serde_json::Value::Object(members)) => members
                .into_iter()
                .map(|(key, value)| {
                    let text = match value {
                        serde_json::Value::String(inner) => inner,
                        other => other.to_string(),
                    };
                    (key, text)
                })
                .collect(),
            _ => BTreeMap::new(),
        };

    StreamFrame {
        id: format!("{ts_ms}-{seq}"),
        fields,
    }
}
1178
/// Reads stream frames for one attempt between two cursors.
///
/// `count_limit` is capped at `STREAM_READ_HARD_CAP`. The query runs
/// directly against the pool, outside any explicit transaction. The result
/// always has `closed_at` and `closed_reason` set to `None`.
///
/// # Errors
/// Returns validation errors from id/cursor parsing and the mapped `sqlx`
/// error from the query or from reading any row column.
#[cfg(feature = "streaming")]
async fn read_stream_impl(
    pool: &SqlitePool,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    from: StreamCursor,
    to: StreamCursor,
    count_limit: u64,
) -> Result<StreamFrames, EngineError> {
    let (part, exec_uuid) = split_exec_id(execution_id)?;
    let attempt_i64: i64 = i64::from(attempt_index.0);
    let (from_ms, from_seq) = parse_cursor_bound(&from)?;
    let (to_ms, to_seq) = parse_cursor_bound(&to)?;
    let capped = count_limit.min(STREAM_READ_HARD_CAP);
    let row_limit = i64::try_from(capped).unwrap_or(i64::MAX);

    let rows = sqlx::query(q_stream::READ_STREAM_RANGE_SQL)
        .bind(part)
        .bind(exec_uuid)
        .bind(attempt_i64)
        .bind(from_ms)
        .bind(from_seq)
        .bind(to_ms)
        .bind(to_seq)
        .bind(row_limit)
        .fetch_all(pool)
        .await
        .map_err(map_sqlx_error)?;

    // The first unreadable column aborts the whole read.
    let frames = rows
        .iter()
        .map(|row| -> Result<StreamFrame, EngineError> {
            let ts: i64 = row.try_get("ts_ms").map_err(map_sqlx_error)?;
            let seq: i64 = row.try_get("seq").map_err(map_sqlx_error)?;
            let fields_text: String = row.try_get("fields").map_err(map_sqlx_error)?;
            Ok(row_to_frame(ts, seq, &fields_text))
        })
        .collect::<Result<Vec<_>, _>>()?;

    Ok(StreamFrames {
        frames,
        closed_at: None,
        closed_reason: None,
    })
}
1220
/// Returns frames after a concrete cursor, optionally blocking up to
/// `block_ms` milliseconds for new ones.
///
/// Flow:
/// 1. Subscribe to the `stream_frame` broadcast channel.
/// 2. Run the SELECT once; return at once if it yields rows or
///    `block_ms == 0`.
/// 3. Otherwise wait for broadcast wake-ups until the deadline, re-running
///    the SELECT after each one and returning as soon as it yields rows.
/// 4. On deadline, return `StreamFrames::empty_open()`.
///
/// `visibility` selects the SQL: `ExcludeBestEffort` uses the
/// `TAIL_STREAM_AFTER_EXCLUDE_BE_SQL` statement, anything else
/// `TAIL_STREAM_AFTER_SQL`. `count_limit` is capped at
/// `STREAM_READ_HARD_CAP`.
///
/// # Errors
/// Returns `EngineError::Validation` (`InvalidInput`) when `after` is not a
/// concrete `StreamCursor::At`, plus parsing errors and mapped `sqlx` errors.
#[cfg(feature = "streaming")]
#[allow(clippy::too_many_arguments)]
async fn tail_stream_impl(
    pool: &SqlitePool,
    pubsub: &PubSub,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    after: StreamCursor,
    block_ms: u64,
    count_limit: u64,
    visibility: TailVisibility,
) -> Result<StreamFrames, EngineError> {
    let (part, exec_uuid) = split_exec_id(execution_id)?;
    let aidx: i64 = i64::from(attempt_index.0);
    let (after_ms, after_seq) = match &after {
        StreamCursor::At(s) => parse_concrete_cursor(s)?,
        _ => {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: "tail_stream requires concrete after cursor".into(),
            });
        }
    };
    let lim = i64::try_from(count_limit.min(STREAM_READ_HARD_CAP)).unwrap_or(i64::MAX);
    let sql = match visibility {
        TailVisibility::ExcludeBestEffort => q_stream::TAIL_STREAM_AFTER_EXCLUDE_BE_SQL,
        _ => q_stream::TAIL_STREAM_AFTER_SQL,
    };

    // Subscribed before the first SELECT — presumably so that a frame
    // broadcast between the SELECT and the wait below still wakes this task.
    let mut rx = pubsub.stream_frame.subscribe();

    let do_select = || async {
        sqlx::query(sql)
            .bind(part)
            .bind(exec_uuid)
            .bind(aidx)
            .bind(after_ms)
            .bind(after_seq)
            .bind(lim)
            .fetch_all(pool)
            .await
            .map_err(map_sqlx_error)
    };

    // Fast path: rows already available, or the caller asked not to block.
    let rows = do_select().await?;
    if !rows.is_empty() || block_ms == 0 {
        return Ok(rows_to_frames(rows));
    }

    let start = std::time::Instant::now();
    let total = Duration::from_millis(block_ms);
    loop {
        // Time left until the deadline; stop once nothing is left.
        let remaining = match total.checked_sub(start.elapsed()) {
            Some(r) if !r.is_zero() => r,
            _ => break,
        };
        match tokio::time::timeout(remaining, rx.recv()).await {
            // The received event is not inspected; any message on the channel
            // triggers a re-query.
            Ok(Ok(_)) => {}
            // Missed messages: fall through and re-query.
            Ok(Err(broadcast::error::RecvError::Lagged(_))) => {}
            // Channel closed: no more wake-ups will come, so do one final
            // query and return whatever it yields.
            Ok(Err(broadcast::error::RecvError::Closed)) => {
                return Ok(rows_to_frames(do_select().await?));
            }
            // Timed out waiting.
            Err(_) => break,
        }
        let rows = do_select().await?;
        if !rows.is_empty() {
            return Ok(rows_to_frames(rows));
        }
        if start.elapsed() >= total {
            break;
        }
    }

    Ok(StreamFrames::empty_open())
}
1306
/// Converts raw tail-query rows into a [`StreamFrames`] value with no close marker.
///
/// Decoding is best-effort: a column that cannot be read falls back to `0`
/// (`ts_ms`, `seq`) or an empty string (`fields`) so the caller still gets a frame
/// for every row. Each such fallback is logged at `warn` level so a fabricated
/// value can be traced back to the failing column.
#[cfg(feature = "streaming")]
fn rows_to_frames(rows: Vec<sqlx::sqlite::SqliteRow>) -> StreamFrames {
    let mut frames = Vec::with_capacity(rows.len());
    for row in rows {
        let ts: i64 = row.try_get("ts_ms").unwrap_or_else(|e| {
            tracing::warn!(error = %e, column = "ts_ms", "stream row decode failed; using default");
            0
        });
        let seq: i64 = row.try_get("seq").unwrap_or_else(|e| {
            tracing::warn!(error = %e, column = "seq", "stream row decode failed; using default");
            0
        });
        let fields_text: String = row.try_get("fields").unwrap_or_else(|e| {
            tracing::warn!(error = %e, column = "fields", "stream row decode failed; using default");
            String::new()
        });
        frames.push(row_to_frame(ts, seq, &fields_text));
    }
    StreamFrames {
        frames,
        closed_at: None,
        closed_reason: None,
    }
}
1322
/// Loads the summary document stored for one execution attempt.
///
/// Returns `Ok(None)` when the summary query yields no row. Otherwise the stored
/// JSON text is parsed and re-serialised to bytes, and wrapped in a
/// [`SummaryDocument`] together with its version and timestamps. Negative
/// numeric columns are reported as `0`.
///
/// # Errors
///
/// `EngineError::Validation` with `Corruption` when the stored text is not valid
/// JSON, `InvalidInput` when the parsed value cannot be serialised, and mapped
/// sqlx errors for query or column failures.
#[cfg(feature = "streaming")]
async fn read_summary_impl(
    pool: &SqlitePool,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
) -> Result<Option<SummaryDocument>, EngineError> {
    let (part, exec_uuid) = split_exec_id(execution_id)?;
    let aidx: i64 = i64::from(attempt_index.0);

    let fetched = sqlx::query(q_stream::READ_SUMMARY_FULL_SQL)
        .bind(part)
        .bind(exec_uuid)
        .bind(aidx)
        .fetch_optional(pool)
        .await
        .map_err(map_sqlx_error)?;
    let row = match fetched {
        Some(found) => found,
        None => return Ok(None),
    };

    let doc_text: String = row.try_get("document_json").map_err(map_sqlx_error)?;
    let version: i64 = row.try_get("version").map_err(map_sqlx_error)?;
    // A missing or unreadable patch_kind column is treated the same as NULL.
    let patch_kind_wire: Option<String> = row
        .try_get::<Option<String>, _>("patch_kind")
        .ok()
        .flatten();
    let last_updated: i64 = row.try_get("last_updated_ms").map_err(map_sqlx_error)?;
    let first_applied: i64 = row.try_get("first_applied_ms").map_err(map_sqlx_error)?;

    let document: serde_json::Value = match serde_json::from_str(&doc_text) {
        Ok(value) => value,
        Err(e) => {
            return Err(EngineError::Validation {
                kind: ValidationKind::Corruption,
                detail: format!("corrupt summary document in ff_stream_summary: {e}"),
            });
        }
    };
    let bytes = match serde_json::to_vec(&document) {
        Ok(encoded) => encoded,
        Err(e) => {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("summary document not serialisable: {e}"),
            });
        }
    };

    // Every wire value currently resolves to the same patch kind.
    let patch_kind = match patch_kind_wire.as_deref() {
        Some("json-merge-patch") => PatchKind::JsonMergePatch,
        _ => PatchKind::JsonMergePatch,
    };

    let non_negative = |raw: i64| u64::try_from(raw).unwrap_or(0);
    Ok(Some(SummaryDocument::new(
        bytes,
        non_negative(version),
        patch_kind,
        non_negative(last_updated),
        non_negative(first_applied),
    )))
}
1374
1375async fn claim_from_reclaim_impl(
1378 pool: &SqlitePool,
1379 pubsub: &PubSub,
1380 token: &ResumeToken,
1381) -> Result<Option<Handle>, EngineError> {
1382 let eid = &token.grant.execution_id;
1383 let (part, exec_uuid) = split_exec_id(eid)?;
1384
1385 let mut conn = begin_immediate(pool).await?;
1386 let result = claim_from_reclaim_inner(&mut conn, part, exec_uuid, token).await;
1387 match result {
1388 Ok(Some((handle, emits))) => {
1389 commit_or_rollback(&mut conn).await?;
1390 dispatch_pending_emits(pubsub, &emits);
1391 Ok(Some(handle))
1392 }
1393 Ok(None) => {
1394 rollback_quiet(&mut conn).await;
1395 Ok(None)
1396 }
1397 Err(e) => {
1398 rollback_quiet(&mut conn).await;
1399 Err(e)
1400 }
1401 }
1402}
1403
/// Transaction body for `claim_from_reclaim_impl`.
///
/// Looks up the attempt row returned by `SELECT_LATEST_ATTEMPT_FOR_RECLAIM_SQL` and,
/// unless its lease is still live, re-leases it to the worker named in `token`.
///
/// Returns:
/// * `Ok(None)` when the row's `lease_expires_at_ms` is in the future;
/// * `Ok(Some((handle, emits)))` after the attempt and exec-core rows were updated
///   and a `"reclaimed"` lease event was inserted;
/// * `Err(EngineError::NotFound)` when no attempt row exists.
///
/// The caller owns the transaction on `conn` and is responsible for commit/rollback.
async fn claim_from_reclaim_inner(
    conn: &mut sqlx::pool::PoolConnection<sqlx::Sqlite>,
    part: i64,
    exec_uuid: Uuid,
    token: &ResumeToken,
) -> Result<Option<(Handle, Vec<PendingEmit>)>, EngineError> {
    let row = sqlx::query(q_lease::SELECT_LATEST_ATTEMPT_FOR_RECLAIM_SQL)
        .bind(part)
        .bind(exec_uuid)
        .fetch_optional(&mut **conn)
        .await
        .map_err(map_sqlx_error)?;
    let Some(row) = row else {
        return Err(EngineError::NotFound { entity: "attempt" });
    };
    let attempt_index_i: i64 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
    let current_epoch: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
    let expires_at: Option<i64> = row
        .try_get::<Option<i64>, _>("lease_expires_at_ms")
        .map_err(map_sqlx_error)?;

    // A lease with no expiry, or one that expired at or before `now`, is reclaimable.
    let now = now_ms();
    let live = matches!(expires_at, Some(exp) if exp > now);
    if live {
        return Ok(None);
    }

    let lease_ttl_ms = i64::from(token.lease_ttl_ms);
    let new_expires = now.saturating_add(lease_ttl_ms);

    sqlx::query(q_lease::UPDATE_ATTEMPT_RECLAIM_SQL)
        .bind(token.worker_id.as_str())
        .bind(token.worker_instance_id.as_str())
        .bind(new_expires)
        .bind(now)
        .bind(part)
        .bind(exec_uuid)
        .bind(attempt_index_i)
        .execute(&mut **conn)
        .await
        .map_err(map_sqlx_error)?;

    sqlx::query(q_lease::UPDATE_EXEC_CORE_RECLAIM_SQL)
        .bind(part)
        .bind(exec_uuid)
        .execute(&mut **conn)
        .await
        .map_err(map_sqlx_error)?;

    let ev = insert_lease_event(conn, part, exec_uuid, "reclaimed", now).await?;
    let emits = vec![(OutboxChannel::LeaseHistory, ev)];

    // NOTE(review): the handle carries `current_epoch + 1`; this presumably mirrors an
    // epoch bump performed by UPDATE_ATTEMPT_RECLAIM_SQL — confirm against the SQL.
    let new_epoch = current_epoch.saturating_add(1);
    let payload = HandlePayload::new(
        token.grant.execution_id.clone(),
        AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0)),
        AttemptId::new(),
        LeaseId::new(),
        LeaseEpoch(u64::try_from(new_epoch).unwrap_or(0)),
        u64::from(token.lease_ttl_ms),
        token.grant.lane_id.clone(),
        token.worker_instance_id.clone(),
    );
    Ok(Some((encode_handle(&payload, HandleKind::Resumed), emits)))
}
1472
/// Serialises an optional execution policy to its JSON text.
///
/// `None` stays `None`; a policy that fails to serialise is reported as
/// `EngineError::Validation` with `ValidationKind::InvalidInput`.
#[cfg(feature = "core")]
fn encode_policy_json(
    policy: Option<&ff_core::policy::ExecutionPolicy>,
) -> Result<Option<String>, EngineError> {
    policy
        .map(|p| {
            serde_json::to_string(p).map_err(|e| EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("create_execution: policy: serialize failed: {e}"),
            })
        })
        .transpose()
}
1493
/// Builds the JSON text stored as the `raw_fields` value of a new execution.
///
/// The object always carries `namespace`, `execution_kind`, `creator_identity`,
/// `last_mutation_at`, `total_attempt_count` (the string `"0"`) and `tags`.
/// `idempotency_key` and `payload_encoding` are added only when present in `args`.
/// All scalar values are written as JSON strings.
#[cfg(feature = "core")]
fn build_create_execution_raw_fields(args: &CreateExecutionArgs) -> String {
    use serde_json::{Map, Value};

    let text = |s: &str| Value::String(s.to_owned());

    let mut fields: Map<String, Value> = Map::new();
    fields.insert("namespace".into(), text(args.namespace.as_str()));
    fields.insert("execution_kind".into(), text(args.execution_kind.as_str()));
    fields.insert(
        "creator_identity".into(),
        text(args.creator_identity.as_str()),
    );
    if let Some(key) = &args.idempotency_key {
        fields.insert("idempotency_key".into(), text(key.as_str()));
    }
    if let Some(encoding) = &args.payload_encoding {
        fields.insert("payload_encoding".into(), text(encoding.as_str()));
    }
    fields.insert(
        "last_mutation_at".into(),
        Value::String(args.now.0.to_string()),
    );
    fields.insert("total_attempt_count".into(), text("0"));

    let mut tags: Map<String, Value> = Map::new();
    for (name, value) in args.tags.iter() {
        tags.insert(name.clone(), Value::String(value.clone()));
    }
    fields.insert("tags".into(), Value::Object(tags));

    Value::Object(fields).to_string()
}
1532
/// Creates an execution row (plus capability rows and a lane-registry entry) in one
/// immediate transaction.
///
/// Returns `CreateExecutionResult::Created` when the core insert affected a row and
/// `CreateExecutionResult::Duplicate` when it affected none.
///
/// # Errors
///
/// `EngineError::Validation` (`InvalidInput`) when the execution id lacks the `}:`
/// separator or its tail is not a UUID, or when the policy cannot be serialised;
/// mapped sqlx errors for any failing statement (the transaction is rolled back).
#[cfg(feature = "core")]
async fn create_execution_impl(
    pool: &SqlitePool,
    args: &CreateExecutionArgs,
) -> Result<CreateExecutionResult, EngineError> {
    let part: i64 = i64::from(args.execution_id.partition());
    // The UUID is whatever follows the first `}:` in the execution id string.
    let exec_uuid = {
        let s = args.execution_id.as_str();
        let tail = s
            .split_once("}:")
            .map(|(_, t)| t)
            .ok_or_else(|| EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("execution_id missing `}}:` separator: {s}"),
            })?;
        Uuid::parse_str(tail).map_err(|e| EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: format!("execution_id UUID invalid: {e}"),
        })?
    };
    let lane_id = args.lane_id.as_str().to_owned();
    let priority: i64 = i64::from(args.priority);
    let created_at_ms: i64 = args.now.0;
    let deadline_at_ms: Option<i64> = args.execution_deadline_at.map(|t| t.0);
    let raw_fields = build_create_execution_raw_fields(args);
    let policy_json = encode_policy_json(args.policy.as_ref())?;

    let mut conn = begin_immediate(pool).await?;

    // The insert error is kept as a value so that it flows through the same
    // rollback path as the later statements.
    let insert_result = sqlx::query(q_exec::INSERT_EXEC_CORE_SQL)
        .bind(part)
        .bind(exec_uuid)
        .bind(&lane_id)
        .bind(priority)
        .bind(created_at_ms)
        .bind(deadline_at_ms)
        .bind(args.input_payload.as_slice())
        .bind(policy_json.as_deref())
        .bind(&raw_fields)
        .execute(&mut *conn)
        .await
        .map_err(map_sqlx_error);

    let result = async {
        let res = insert_result?;
        let inserted = res.rows_affected() > 0;

        // Capability rows are written only for a freshly inserted execution.
        if inserted {
            let required: Vec<String> = args
                .policy
                .as_ref()
                .and_then(|p| p.routing_requirements.as_ref())
                .map(|r| r.required_capabilities.iter().cloned().collect())
                .unwrap_or_default();
            for cap in &required {
                sqlx::query(q_exec::INSERT_EXEC_CAPABILITY_SQL)
                    .bind(exec_uuid)
                    .bind(cap)
                    .execute(&mut *conn)
                    .await
                    .map_err(map_sqlx_error)?;
            }
        }

        // The lane registry statement runs for duplicates as well.
        sqlx::query(q_exec::INSERT_LANE_REGISTRY_SQL)
            .bind(&lane_id)
            .bind(created_at_ms)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;

        Ok::<bool, EngineError>(inserted)
    }
    .await;

    match result {
        Ok(inserted) => {
            commit_or_rollback(&mut conn).await?;
            if inserted {
                Ok(CreateExecutionResult::Created {
                    execution_id: args.execution_id.clone(),
                    public_state: PublicState::Waiting,
                })
            } else {
                Ok(CreateExecutionResult::Duplicate {
                    execution_id: args.execution_id.clone(),
                })
            }
        }
        Err(e) => {
            rollback_quiet(&mut conn).await;
            Err(e)
        }
    }
}
1638
/// Inserts a flow-core row for `args.flow_id` under partition key `0`.
///
/// Returns `CreateFlowResult::Created` when the insert affected a row and
/// `CreateFlowResult::AlreadySatisfied` when it affected none. A failing insert
/// rolls the transaction back and surfaces the mapped sqlx error.
#[cfg(feature = "core")]
async fn create_flow_impl(
    pool: &SqlitePool,
    args: &CreateFlowArgs,
) -> Result<CreateFlowResult, EngineError> {
    let partition_key: i64 = 0;
    let flow_uuid: Uuid = args.flow_id.0;
    let created_at_ms = args.now.0;

    // Initial bookkeeping stored alongside the flow row.
    let raw_fields = serde_json::json!({
        "flow_kind": args.flow_kind,
        "namespace": args.namespace.as_str(),
        "node_count": 0,
        "edge_count": 0,
        "last_mutation_at_ms": created_at_ms,
    })
    .to_string();

    let mut conn = begin_immediate(pool).await?;
    let outcome = sqlx::query(q_flow::INSERT_FLOW_CORE_SQL)
        .bind(partition_key)
        .bind(flow_uuid)
        .bind(created_at_ms)
        .bind(&raw_fields)
        .execute(&mut *conn)
        .await
        .map_err(map_sqlx_error);

    let done = match outcome {
        Ok(done) => done,
        Err(e) => {
            rollback_quiet(&mut conn).await;
            return Err(e);
        }
    };

    commit_or_rollback(&mut conn).await?;
    let flow_id = args.flow_id.clone();
    Ok(if done.rows_affected() > 0 {
        CreateFlowResult::Created { flow_id }
    } else {
        CreateFlowResult::AlreadySatisfied { flow_id }
    })
}
1686
/// Attaches an existing execution to a flow inside one immediate transaction.
///
/// Returns `AlreadyMember` (with the `node_count` read from the flow's `raw_fields`)
/// when the execution already belongs to this flow, and `Added` (with the node count
/// re-read after the bump) otherwise.
///
/// # Errors
///
/// `EngineError::Validation` (`InvalidInput`) with one of these details:
/// a partition mismatch message, `flow_not_found`, `flow_already_terminal`,
/// `execution_not_found`, or `already_member_of_different_flow:<uuid>`.
/// Any error rolls the transaction back.
#[cfg(feature = "core")]
async fn add_execution_to_flow_impl(
    pool: &SqlitePool,
    args: &AddExecutionToFlowArgs,
) -> Result<AddExecutionToFlowResult, EngineError> {
    let part: i64 = 0;
    let flow_uuid: Uuid = args.flow_id.0;
    let (exec_part, exec_uuid) = split_exec_id(&args.execution_id)?;
    // Only executions in partition 0 can be attached.
    if exec_part != part {
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: format!("execution partition mismatch: expected 0, got {exec_part}"),
        });
    }
    let now_ms = args.now.0;

    let mut conn = begin_immediate(pool).await?;
    let work = async {
        // 1. The flow must exist and must not be in a terminal public state.
        let flow_row = sqlx::query(q_flow_staging::SELECT_FLOW_CORE_FOR_STAGE_SQL)
            .bind(part)
            .bind(flow_uuid)
            .fetch_optional(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;
        let Some(flow_row) = flow_row else {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: "flow_not_found".into(),
            });
        };
        let public_flow_state: String = flow_row
            .try_get("public_flow_state")
            .map_err(map_sqlx_error)?;
        if matches!(
            public_flow_state.as_str(),
            "cancelled" | "completed" | "failed" | "terminal"
        ) {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: "flow_already_terminal".into(),
            });
        }
        let raw_fields_text: String = flow_row.try_get("raw_fields").map_err(map_sqlx_error)?;

        // 2. The execution must exist; read its current flow membership.
        let exec_row = sqlx::query(q_flow_staging::SELECT_EXEC_FLOW_ID_SQL)
            .bind(part)
            .bind(exec_uuid)
            .fetch_optional(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;
        let Some(exec_row) = exec_row else {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: "execution_not_found".into(),
            });
        };
        let existing_flow_id: Option<Uuid> = exec_row.try_get("flow_id").map_err(map_sqlx_error)?;

        // 3. Already in this flow: report the stored node count without mutating.
        if existing_flow_id == Some(flow_uuid) {
            // Unparseable raw_fields degrade to an empty object, i.e. node_count 0.
            let raw_val: serde_json::Value = serde_json::from_str(&raw_fields_text)
                .unwrap_or_else(|_| serde_json::Value::Object(serde_json::Map::new()));
            let nc = raw_val
                .get("node_count")
                .and_then(|v| v.as_u64())
                .and_then(|n| u32::try_from(n).ok())
                .unwrap_or(0);
            return Ok(AddExecutionToFlowResult::AlreadyMember {
                execution_id: args.execution_id.clone(),
                node_count: nc,
            });
        }

        // 4. Member of some other flow: refuse.
        if let Some(other) = existing_flow_id
            && other != flow_uuid
        {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("already_member_of_different_flow:{other}"),
            });
        }

        // 5. Attach, bump the flow's node count, then read the new count back.
        sqlx::query(q_flow_staging::UPDATE_EXEC_SET_FLOW_ID_SQL)
            .bind(part)
            .bind(exec_uuid)
            .bind(flow_uuid)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;
        sqlx::query(q_flow_staging::BUMP_FLOW_NODE_COUNT_SQL)
            .bind(part)
            .bind(flow_uuid)
            .bind(now_ms)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;
        let new_nc: i64 = sqlx::query_scalar(q_flow_staging::SELECT_FLOW_NODE_COUNT_SQL)
            .bind(part)
            .bind(flow_uuid)
            .fetch_one(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;
        Ok(AddExecutionToFlowResult::Added {
            execution_id: args.execution_id.clone(),
            new_node_count: u32::try_from(new_nc.max(0)).unwrap_or(0),
        })
    }
    .await;

    match work {
        Ok(res) => {
            commit_or_rollback(&mut conn).await?;
            Ok(res)
        }
        Err(e) => {
            rollback_quiet(&mut conn).await;
            Err(e)
        }
    }
}
1815
/// Stages a dependency edge between two executions of the same flow.
///
/// The flow's graph revision is bumped with a compare-and-set against
/// `args.expected_graph_revision`; the edge row is then inserted with a pending
/// policy document. On success returns `Staged` with the revision read back from
/// `ff_flow_core`.
///
/// # Errors
///
/// * `Validation`/`InvalidInput`: `self_referencing_edge`, a partition mismatch,
///   `flow_not_found`, `flow_already_terminal`, `execution_not_in_flow`, or
///   `dependency_already_exists:edge_id=<uuid>`.
/// * `Contention(StaleGraphRevision)` when the CAS matched no row but the flow
///   exists and is not terminal.
///
/// Every error rolls the transaction back, including the revision bump.
#[cfg(feature = "core")]
async fn stage_dependency_edge_impl(
    pool: &SqlitePool,
    args: &StageDependencyEdgeArgs,
) -> Result<StageDependencyEdgeResult, EngineError> {
    if args.upstream_execution_id == args.downstream_execution_id {
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: "self_referencing_edge".into(),
        });
    }

    let part: i64 = 0;
    let flow_uuid: Uuid = args.flow_id.0;
    let edge_uuid: Uuid = args.edge_id.0;
    let (up_part, upstream_uuid) = split_exec_id(&args.upstream_execution_id)?;
    let (down_part, downstream_uuid) = split_exec_id(&args.downstream_execution_id)?;
    if up_part != part || down_part != part {
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: "execution partition mismatch under single-writer SQLite".into(),
        });
    }
    let now_ms = args.now.0;
    // A revision that does not fit in i64 is clamped to i64::MAX.
    let expected_rev = i64::try_from(args.expected_graph_revision).unwrap_or(i64::MAX);

    let mut conn = begin_immediate(pool).await?;
    let work = async {
        // 1. Compare-and-set bump of the flow's graph revision.
        let cas = sqlx::query(q_flow_staging::CAS_BUMP_FLOW_REV_SQL)
            .bind(part)
            .bind(flow_uuid)
            .bind(expected_rev)
            .bind(now_ms)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;
        if cas.rows_affected() == 0 {
            // Nothing matched: probe the flow to pick the precise error.
            let probe = sqlx::query(q_flow_staging::SELECT_FLOW_REV_AND_STATE_SQL)
                .bind(part)
                .bind(flow_uuid)
                .fetch_optional(&mut *conn)
                .await
                .map_err(map_sqlx_error)?;
            return match probe {
                None => Err(EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: "flow_not_found".into(),
                }),
                Some(r) => {
                    let state: String = r.try_get("public_flow_state").map_err(map_sqlx_error)?;
                    if matches!(
                        state.as_str(),
                        "cancelled" | "completed" | "failed" | "terminal"
                    ) {
                        Err(EngineError::Validation {
                            kind: ValidationKind::InvalidInput,
                            detail: "flow_already_terminal".into(),
                        })
                    } else {
                        Err(EngineError::Contention(ContentionKind::StaleGraphRevision))
                    }
                }
            };
        }

        // 2. Both endpoints must be returned by the membership query for this flow.
        let member_rows =
            sqlx::query_scalar::<_, Uuid>(q_flow_staging::SELECT_FLOW_MEMBERSHIP_PAIR_SQL)
                .bind(part)
                .bind(flow_uuid)
                .bind(upstream_uuid)
                .bind(downstream_uuid)
                .fetch_all(&mut *conn)
                .await
                .map_err(map_sqlx_error)?;
        if !member_rows.contains(&upstream_uuid) || !member_rows.contains(&downstream_uuid) {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: "execution_not_in_flow".into(),
            });
        }

        // 3. Insert the edge with its initial (pending, not yet applied) policy.
        let policy_json = serde_json::json!({
            "dependency_kind": args.dependency_kind,
            "satisfaction_condition": "all_required",
            "data_passing_ref": args.data_passing_ref.clone().unwrap_or_default(),
            "edge_state": "pending",
            "created_at_ms": now_ms,
            "created_by": "engine",
            "staged_at_ms": now_ms,
            "applied_at_ms": serde_json::Value::Null,
        })
        .to_string();
        let ins = sqlx::query(q_flow_staging::INSERT_EDGE_SQL)
            .bind(part)
            .bind(flow_uuid)
            .bind(edge_uuid)
            .bind(upstream_uuid)
            .bind(downstream_uuid)
            .bind(&policy_json)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;
        // No row inserted is reported as a duplicate edge.
        if ins.rows_affected() == 0 {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("dependency_already_exists:edge_id={edge_uuid}"),
            });
        }

        // 4. Report the revision as stored after the bump.
        let new_rev: i64 = sqlx::query_scalar::<_, i64>(
            "SELECT graph_revision FROM ff_flow_core \
             WHERE partition_key = ?1 AND flow_id = ?2",
        )
        .bind(part)
        .bind(flow_uuid)
        .fetch_one(&mut *conn)
        .await
        .map_err(map_sqlx_error)?;

        Ok(StageDependencyEdgeResult::Staged {
            edge_id: args.edge_id.clone(),
            new_graph_revision: u64::try_from(new_rev).unwrap_or(0),
        })
    }
    .await;

    match work {
        Ok(res) => {
            commit_or_rollback(&mut conn).await?;
            Ok(res)
        }
        Err(e) => {
            rollback_quiet(&mut conn).await;
            Err(e)
        }
    }
}
1967
/// Marks a staged dependency edge as applied and registers it with the downstream
/// execution's edge group, all inside one immediate transaction.
///
/// Returns `AlreadyApplied` when the stored policy already carries an integer
/// `applied_at_ms`, otherwise `Applied` with the count read back from the edge group.
///
/// # Errors
///
/// * `Validation`/`InvalidInput`: partition mismatch or `edge_not_found`.
/// * `Validation`/`Corruption`: the stored policy is not valid JSON, or is valid JSON
///   but not an object. Without the object check the applied marker could not be
///   written, yet the edge group would still be updated and `Applied` returned, so
///   every retry would apply the same edge again.
///
/// Any error rolls the transaction back.
#[cfg(feature = "core")]
async fn apply_dependency_to_child_impl(
    pool: &SqlitePool,
    args: &ApplyDependencyToChildArgs,
) -> Result<ApplyDependencyToChildResult, EngineError> {
    let part: i64 = 0;
    let flow_uuid: Uuid = args.flow_id.0;
    let edge_uuid: Uuid = args.edge_id.0;
    let (down_part, downstream_uuid) = split_exec_id(&args.downstream_execution_id)?;
    if down_part != part {
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: "execution partition mismatch under single-writer SQLite".into(),
        });
    }
    let now_ms = args.now.0;

    let mut conn = begin_immediate(pool).await?;
    let work = async {
        // 1. Load the edge's policy document.
        let row = sqlx::query(q_flow_staging::SELECT_EDGE_POLICY_SQL)
            .bind(part)
            .bind(flow_uuid)
            .bind(edge_uuid)
            .fetch_optional(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;
        let Some(row) = row else {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: "edge_not_found".into(),
            });
        };
        let policy_text: String = row.try_get("policy").map_err(map_sqlx_error)?;
        let mut policy: serde_json::Value =
            serde_json::from_str(&policy_text).map_err(|e| EngineError::Validation {
                kind: ValidationKind::Corruption,
                detail: format!("ff_edge.policy: {e}"),
            })?;

        // 2. Idempotency: an integer applied_at_ms means the edge was applied before.
        let already_applied = policy
            .get("applied_at_ms")
            .and_then(|v| v.as_i64())
            .is_some();
        if already_applied {
            return Ok(ApplyDependencyToChildResult::AlreadyApplied);
        }

        // 3. Stamp the policy as applied. A non-object policy cannot carry the
        //    stamp, so it is rejected instead of being silently left unmarked.
        let Some(obj) = policy.as_object_mut() else {
            return Err(EngineError::Validation {
                kind: ValidationKind::Corruption,
                detail: "ff_edge.policy: expected a JSON object".into(),
            });
        };
        obj.insert("applied_at_ms".into(), serde_json::json!(now_ms));
        obj.insert("edge_state".into(), serde_json::json!("applied"));

        let new_policy_text = policy.to_string();
        sqlx::query(q_flow_staging::UPDATE_EDGE_POLICY_SQL)
            .bind(part)
            .bind(flow_uuid)
            .bind(edge_uuid)
            .bind(&new_policy_text)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;

        // 4. Upsert the downstream edge group; `all_of` is the policy supplied here.
        let default_group_policy = serde_json::json!({ "kind": "all_of" }).to_string();
        sqlx::query(q_flow_staging::UPSERT_EDGE_GROUP_APPLY_SQL)
            .bind(part)
            .bind(flow_uuid)
            .bind(downstream_uuid)
            .bind(&default_group_policy)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;

        // 5. Read back the group's counter to report to the caller.
        let unsatisfied: i64 =
            sqlx::query_scalar(q_flow_staging::SELECT_EDGE_GROUP_RUNNING_COUNT_SQL)
                .bind(part)
                .bind(flow_uuid)
                .bind(downstream_uuid)
                .fetch_one(&mut *conn)
                .await
                .map_err(map_sqlx_error)?;

        Ok(ApplyDependencyToChildResult::Applied {
            unsatisfied_count: u32::try_from(unsatisfied.max(0)).unwrap_or(0),
        })
    }
    .await;

    match work {
        Ok(res) => {
            commit_or_rollback(&mut conn).await?;
            Ok(res)
        }
        Err(e) => {
            rollback_quiet(&mut conn).await;
            Err(e)
        }
    }
}
2070
2071fn cancel_policy_to_str(p: CancelFlowPolicy) -> &'static str {
2072 match p {
2073 CancelFlowPolicy::FlowOnly => "cancel_flow_only",
2074 CancelFlowPolicy::CancelAll => "cancel_all",
2075 CancelFlowPolicy::CancelPending => "cancel_pending",
2076 _ => "cancel_flow_only",
2087 }
2088}
2089
/// Cancels a flow and, depending on `policy`, its member executions, in one
/// immediate transaction.
///
/// * If the flow-cancel update matches no row, the call returns `Cancelled` with an
///   empty member list and emits nothing.
/// * `FlowOnly` cancels no members. `CancelPending` and all remaining policies select
///   members with their respective SQL and, for each, update the exec-core row, clear
///   the current attempt's outcome, and record a `"cancelled"` completion event and a
///   `"revoked"` lease event.
/// * `CancelPending` additionally runs `INSERT_PENDING_CANCEL_GROUPS_SQL`.
///
/// Collected events are dispatched on `pubsub` only after the commit succeeds; any
/// error rolls the transaction back.
async fn cancel_flow_impl(
    pool: &SqlitePool,
    pubsub: &PubSub,
    id: &FlowId,
    policy: CancelFlowPolicy,
) -> Result<CancelFlowResult, EngineError> {
    let part: i64 = 0;
    let flow_uuid: Uuid = id.0;
    let policy_str = cancel_policy_to_str(policy);
    // Shadows the `now_ms()` helper with the timestamp used for the whole operation.
    let now_ms = now_ms();

    let mut conn = begin_immediate(pool).await?;
    let work: Result<(CancelFlowResult, Vec<PendingEmit>), EngineError> = async {
        let flip = sqlx::query(q_flow::UPDATE_FLOW_CORE_CANCEL_SQL)
            .bind(part)
            .bind(flow_uuid)
            .bind(now_ms)
            .bind(policy_str)
            .execute(&mut *conn)
            .await
            .map_err(map_sqlx_error)?;

        // No row flipped: report success with no members rather than an error.
        if flip.rows_affected() == 0 {
            return Ok((
                CancelFlowResult::Cancelled {
                    cancellation_policy: policy_str.to_owned(),
                    member_execution_ids: Vec::new(),
                },
                Vec::new(),
            ));
        }

        let member_rows: Vec<Uuid> = if matches!(policy, CancelFlowPolicy::FlowOnly) {
            Vec::new()
        } else {
            let sql = match policy {
                CancelFlowPolicy::CancelPending => q_flow::SELECT_FLOW_MEMBERS_CANCEL_PENDING_SQL,
                _ => q_flow::SELECT_FLOW_MEMBERS_CANCEL_ALL_SQL,
            };
            sqlx::query_scalar::<_, Uuid>(sql)
                .bind(part)
                .bind(flow_uuid)
                .fetch_all(&mut *conn)
                .await
                .map_err(map_sqlx_error)?
        };

        let mut member_execution_ids: Vec<String> = Vec::with_capacity(member_rows.len());
        let mut emits: Vec<PendingEmit> = Vec::new();
        for exec_uuid in &member_rows {
            sqlx::query(q_flow::UPDATE_EXEC_CORE_CANCEL_MEMBER_SQL)
                .bind(part)
                .bind(exec_uuid)
                .bind(now_ms)
                .execute(&mut *conn)
                .await
                .map_err(map_sqlx_error)?;

            sqlx::query(q_flow::UPDATE_ATTEMPT_CLEAR_OUTCOME_FOR_CURRENT_SQL)
                .bind(part)
                .bind(exec_uuid)
                .execute(&mut *conn)
                .await
                .map_err(map_sqlx_error)?;

            // Two events per cancelled member: completion, then lease revocation.
            let completion_ev =
                insert_completion_event_ev(&mut conn, part, *exec_uuid, "cancelled", now_ms)
                    .await?;
            emits.push((OutboxChannel::Completion, completion_ev));
            let lease_ev =
                insert_lease_event(&mut conn, part, *exec_uuid, "revoked", now_ms).await?;
            emits.push((OutboxChannel::LeaseHistory, lease_ev));

            // Execution ids are rendered as `{fp:<partition>}:<uuid>`.
            member_execution_ids.push(format!("{{fp:{part}}}:{exec_uuid}"));
        }

        if matches!(policy, CancelFlowPolicy::CancelPending) {
            sqlx::query(q_flow::INSERT_PENDING_CANCEL_GROUPS_SQL)
                .bind(part)
                .bind(flow_uuid)
                .bind(now_ms)
                .execute(&mut *conn)
                .await
                .map_err(map_sqlx_error)?;
        }

        Ok((
            CancelFlowResult::Cancelled {
                cancellation_policy: policy_str.to_owned(),
                member_execution_ids,
            },
            emits,
        ))
    }
    .await;

    match work {
        Ok((res, emits)) => {
            commit_or_rollback(&mut conn).await?;
            dispatch_pending_emits(pubsub, &emits);
            Ok(res)
        }
        Err(e) => {
            rollback_quiet(&mut conn).await;
            Err(e)
        }
    }
}
2209
/// Shared state behind a [`SqliteBackend`] handle.
///
/// One instance is stored per registry key and wrapped in an `Arc`, so several
/// `SqliteBackend` values opened for the same key share it.
pub(crate) struct SqliteBackendInner {
    /// Connection pool that all backend operations run against.
    #[allow(dead_code)]
    pub(crate) pool: SqlitePool,
    /// In-process broadcast channels used to publish events after commits.
    pub(crate) pubsub: PubSub,
    /// Registry key: the canonicalised file path, or the effective URI for
    /// in-memory databases.
    #[allow(dead_code)]
    pub(crate) key: PathBuf,
    /// Extra connection opened only for in-memory databases (`None` otherwise).
    /// NOTE(review): presumably held so the shared in-memory database outlives
    /// idle pool connections — confirm.
    #[allow(dead_code)]
    pub(crate) memory_sentinel: Option<std::sync::Mutex<Option<sqlx::SqliteConnection>>>,
    /// Scanner supervisor handle, set at most once by `SqliteBackend::with_scanners`.
    pub(crate) scanner_handle: std::sync::OnceLock<crate::scanner_supervisor::SqliteScannerHandle>,
}
2248
2249fn sqlite_supports_base() -> Supports {
2267 let mut s = Supports::none();
2268
2269 s.cancel_flow_wait_timeout = true;
2275 s.cancel_flow_wait_indefinite = true;
2276
2277 s.rotate_waitpoint_hmac_secret_all = true;
2279 s.seed_waitpoint_hmac_secret = true;
2280
2281 s.subscribe_lease_history = true;
2283 s.subscribe_completion = true;
2284 s.subscribe_signal_delivery = true;
2285
2286 s.stream_durable_summary = true;
2288 s.stream_best_effort_live = true;
2289
2290 s.prepare = true;
2295
2296 s.cancel_execution = true;
2298 s.change_priority = true;
2299 s.replay_execution = true;
2300 s.revoke_lease = true;
2301 s.read_execution_state = true;
2302 s.read_execution_info = true;
2303 s.get_execution_result = true;
2304 s.budget_admin = true;
2305 s.quota_admin = true;
2306 s.list_pending_waitpoints = true;
2307 s.cancel_flow_header = true;
2308 s.ack_cancel_member = true;
2309
2310 s
2315}
2316
/// SQLite-backed implementation of the engine backend.
///
/// Cloning is cheap: a clone shares the same reference-counted inner state
/// (pool, pub/sub channels, scanner handle).
#[derive(Clone)]
pub struct SqliteBackend {
    /// Shared state; also shared with other handles opened for the same registry key.
    inner: Arc<SqliteBackendInner>,
}
2326
2327impl std::fmt::Debug for SqliteBackend {
2328 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2329 f.debug_struct("SqliteBackend")
2330 .field("key", &self.inner.key)
2331 .finish()
2332 }
2333}
2334
2335impl SqliteBackend {
2336 pub async fn new(path: &str) -> Result<Arc<Self>, BackendError> {
2354 Self::new_with_tuning(path, 4, true).await
2355 }
2356
2357 pub async fn new_with_tuning(
2362 path: &str,
2363 pool_size: u32,
2364 wal_mode: bool,
2365 ) -> Result<Arc<Self>, BackendError> {
2366 if std::env::var("FF_DEV_MODE").as_deref() != Ok("1") {
2368 return Err(BackendError::RequiresDevMode);
2369 }
2370
2371 let is_memory = is_memory_uri(path);
2385 let effective_path: std::borrow::Cow<'_, str> = if path == ":memory:" {
2386 std::borrow::Cow::Borrowed("file::memory:?cache=shared")
2387 } else {
2388 std::borrow::Cow::Borrowed(path)
2389 };
2390
2391 let key = if is_memory {
2392 PathBuf::from(effective_path.as_ref())
2393 } else {
2394 std::fs::canonicalize(path).unwrap_or_else(|_| PathBuf::from(path))
2395 };
2396
2397 if let Some(existing) = registry::lookup(&key) {
2398 return Ok(Arc::new(Self { inner: existing }));
2402 }
2403
2404 let opts: SqliteConnectOptions = effective_path
2408 .parse::<SqliteConnectOptions>()
2409 .map_err(|e| BackendError::Valkey {
2410 kind: ff_core::engine_error::BackendErrorKind::Protocol,
2411 message: format!("sqlite connect-opts parse for {path:?}: {e}"),
2412 })?
2413 .create_if_missing(true);
2414
2415 let opts = if wal_mode && !is_memory {
2418 opts.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
2419 } else {
2420 opts
2421 };
2422
2423 let pool_max = pool_size.max(1);
2426 let pool = SqlitePoolOptions::new()
2427 .max_connections(pool_max)
2428 .connect_with(opts.clone())
2429 .await
2430 .map_err(|e| BackendError::Valkey {
2431 kind: ff_core::engine_error::BackendErrorKind::Transport,
2432 message: format!("sqlite pool connect for {path:?}: {e}"),
2433 })?;
2434
2435 let memory_sentinel = if is_memory {
2442 use sqlx::ConnectOptions;
2443 let conn = opts.connect().await.map_err(|e| BackendError::Valkey {
2444 kind: ff_core::engine_error::BackendErrorKind::Transport,
2445 message: format!("sqlite sentinel connect for {path:?}: {e}"),
2446 })?;
2447 Some(std::sync::Mutex::new(Some(conn)))
2448 } else {
2449 None
2450 };
2451
2452 tracing::warn!(
2455 "FlowFabric SQLite backend active (FF_DEV_MODE=1). \
2456 This backend is dev-only; single-writer, single-process, \
2457 not supported in production. See RFC-023."
2458 );
2459
2460 sqlx::migrate!("./migrations")
2465 .run(&pool)
2466 .await
2467 .map_err(|e| BackendError::Valkey {
2468 kind: ff_core::engine_error::BackendErrorKind::Protocol,
2469 message: format!("sqlite migrate for {path:?}: {e}"),
2470 })?;
2471
2472 let inner = Arc::new(SqliteBackendInner {
2473 pool,
2474 pubsub: PubSub::new(),
2475 key: key.clone(),
2476 memory_sentinel,
2477 scanner_handle: std::sync::OnceLock::new(),
2478 });
2479 let inner = registry::insert(key, inner);
2480 Ok(Arc::new(Self { inner }))
2481 }
2482
2483 #[allow(dead_code)]
2486 pub(crate) fn pool(&self) -> &SqlitePool {
2487 &self.inner.pool
2488 }
2489
2490 pub fn with_scanners(&self, cfg: crate::scanner_supervisor::SqliteScannerConfig) -> bool {
2499 let mut result = false;
2502 let _ = self.inner.scanner_handle.get_or_init(|| {
2503 result = true;
2504 crate::scanner_supervisor::spawn_scanners(self.inner.pool.clone(), cfg)
2505 });
2506 result
2507 }
2508
2509 #[doc(hidden)]
2514 pub async fn budget_reset_scan_tick_for_test(
2515 &self,
2516 now_ms: i64,
2517 ) -> Result<(u32, u32), EngineError> {
2518 let report = crate::reconcilers::budget_reset::scan_tick(&self.inner.pool, now_ms).await?;
2519 Ok((report.processed, report.errors))
2520 }
2521
2522 #[doc(hidden)]
2527 pub fn pool_for_test(&self) -> &SqlitePool {
2528 &self.inner.pool
2529 }
2530
2531 #[doc(hidden)]
2538 pub fn subscribe_completion_for_test(
2539 &self,
2540 ) -> tokio::sync::broadcast::Receiver<crate::pubsub::OutboxEvent> {
2541 self.inner.pubsub.completion.subscribe()
2542 }
2543
2544 #[doc(hidden)]
2548 #[cfg(test)]
2549 pub(crate) fn stream_frame_receiver_for_test(
2550 &self,
2551 ) -> tokio::sync::broadcast::Receiver<crate::pubsub::OutboxEvent> {
2552 self.inner.pubsub.stream_frame.subscribe()
2553 }
2554}
2555
2556#[async_trait]
2557impl EngineBackend for SqliteBackend {
2558 async fn shutdown_prepare(&self, grace: Duration) -> Result<(), EngineError> {
2565 if let Some(handle) = self.inner.scanner_handle.get() {
2566 let timed_out = handle.shutdown(grace).await;
2567 if timed_out > 0 {
2568 tracing::warn!(
2569 timed_out,
2570 ?grace,
2571 "sqlite scanner supervisor exceeded grace on shutdown"
2572 );
2573 }
2574 }
2575 Ok(())
2576 }
2577
2578 async fn claim(
2581 &self,
2582 lane: &LaneId,
2583 capabilities: &CapabilitySet,
2584 policy: ClaimPolicy,
2585 ) -> Result<Option<Handle>, EngineError> {
2586 let pool = &self.inner.pool;
2587 let pubsub = &self.inner.pubsub;
2588 retry_serializable(|| claim_impl(pool, pubsub, lane, capabilities, &policy)).await
2589 }
2590
2591 async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError> {
2592 let pool = &self.inner.pool;
2593 let pubsub = &self.inner.pubsub;
2594 retry_serializable(|| renew_impl(pool, pubsub, handle)).await
2595 }
2596
2597 async fn renew_lease(
2600 &self,
2601 args: ff_core::contracts::RenewLeaseArgs,
2602 ) -> Result<ff_core::contracts::RenewLeaseResult, EngineError> {
2603 let pool = &self.inner.pool;
2604 let pubsub = &self.inner.pubsub;
2605 retry_serializable(|| crate::typed_ops::renew_lease(pool, pubsub, args.clone())).await
2606 }
2607
2608 async fn complete_execution(
2609 &self,
2610 args: ff_core::contracts::CompleteExecutionArgs,
2611 ) -> Result<ff_core::contracts::CompleteExecutionResult, EngineError> {
2612 let pool = &self.inner.pool;
2613 let pubsub = &self.inner.pubsub;
2614 retry_serializable(|| crate::typed_ops::complete_execution(pool, pubsub, args.clone()))
2615 .await
2616 }
2617
2618 async fn fail_execution(
2619 &self,
2620 args: ff_core::contracts::FailExecutionArgs,
2621 ) -> Result<ff_core::contracts::FailExecutionResult, EngineError> {
2622 let pool = &self.inner.pool;
2623 let pubsub = &self.inner.pubsub;
2624 retry_serializable(|| crate::typed_ops::fail_execution(pool, pubsub, args.clone())).await
2625 }
2626
2627 async fn resume_execution(
2628 &self,
2629 args: ff_core::contracts::ResumeExecutionArgs,
2630 ) -> Result<ff_core::contracts::ResumeExecutionResult, EngineError> {
2631 let pool = &self.inner.pool;
2632 let pubsub = &self.inner.pubsub;
2633 retry_serializable(|| crate::typed_ops::resume_execution(pool, pubsub, args.clone())).await
2634 }
2635
2636 async fn evaluate_flow_eligibility(
2637 &self,
2638 args: ff_core::contracts::EvaluateFlowEligibilityArgs,
2639 ) -> Result<ff_core::contracts::EvaluateFlowEligibilityResult, EngineError> {
2640 crate::typed_ops::evaluate_flow_eligibility(&self.inner.pool, args).await
2642 }
2643
2644 async fn claim_execution(
2645 &self,
2646 args: ff_core::contracts::ClaimExecutionArgs,
2647 ) -> Result<ff_core::contracts::ClaimExecutionResult, EngineError> {
2648 let pool = &self.inner.pool;
2649 let pubsub = &self.inner.pubsub;
2650 let pc = ff_core::partition::PartitionConfig::default();
2651 retry_serializable(|| {
2652 crate::typed_ops::claim_execution(pool, &pc, pubsub, args.clone())
2653 })
2654 .await
2655 }
2656
2657 async fn check_admission(
2658 &self,
2659 quota_policy_id: &ff_core::types::QuotaPolicyId,
2660 _dimension: &str,
2661 args: ff_core::contracts::CheckAdmissionArgs,
2662 ) -> Result<ff_core::contracts::CheckAdmissionResult, EngineError> {
2663 let pool = &self.inner.pool;
2664 let pc = ff_core::partition::PartitionConfig::default();
2668 retry_serializable(|| {
2669 crate::typed_ops::check_admission(pool, &pc, quota_policy_id, args.clone())
2670 })
2671 .await
2672 }
2673
2674 async fn progress(
2675 &self,
2676 handle: &Handle,
2677 percent: Option<u8>,
2678 message: Option<String>,
2679 ) -> Result<(), EngineError> {
2680 let pool = &self.inner.pool;
2681 retry_serializable(|| progress_impl(pool, handle, percent, message.clone())).await
2682 }
2683
2684 async fn append_frame(
2685 &self,
2686 handle: &Handle,
2687 frame: Frame,
2688 ) -> Result<AppendFrameOutcome, EngineError> {
2689 let pool = &self.inner.pool;
2690 let pubsub = &self.inner.pubsub;
2691 retry_serializable(|| append_frame_impl(pool, pubsub, handle, frame.clone())).await
2692 }
2693
2694 async fn complete(&self, handle: &Handle, payload: Option<Vec<u8>>) -> Result<(), EngineError> {
2695 let pool = &self.inner.pool;
2696 let pubsub = &self.inner.pubsub;
2697 retry_serializable(|| complete_impl(pool, pubsub, handle, payload.clone())).await
2698 }
2699
2700 async fn fail(
2701 &self,
2702 handle: &Handle,
2703 reason: FailureReason,
2704 classification: FailureClass,
2705 ) -> Result<FailOutcome, EngineError> {
2706 let pool = &self.inner.pool;
2707 let pubsub = &self.inner.pubsub;
2708 retry_serializable(|| fail_impl(pool, pubsub, handle, reason.clone(), classification)).await
2709 }
2710
2711 async fn cancel(&self, _handle: &Handle, _reason: &str) -> Result<(), EngineError> {
2712 unavailable("sqlite.cancel")
2713 }
2714
2715 async fn suspend(
2716 &self,
2717 handle: &Handle,
2718 args: SuspendArgs,
2719 ) -> Result<SuspendOutcome, EngineError> {
2720 let pool = &self.inner.pool;
2721 let pubsub = &self.inner.pubsub;
2722 retry_serializable(|| crate::suspend_ops::suspend_impl(pool, pubsub, handle, args.clone()))
2723 .await
2724 }
2725
2726 async fn suspend_by_triple(
2727 &self,
2728 exec_id: ExecutionId,
2729 triple: LeaseFence,
2730 args: SuspendArgs,
2731 ) -> Result<SuspendOutcome, EngineError> {
2732 let pool = &self.inner.pool;
2733 let pubsub = &self.inner.pubsub;
2734 retry_serializable(|| {
2735 crate::suspend_ops::suspend_by_triple_impl(
2736 pool,
2737 pubsub,
2738 exec_id.clone(),
2739 triple.clone(),
2740 args.clone(),
2741 )
2742 })
2743 .await
2744 }
2745
2746 async fn create_waitpoint(
2747 &self,
2748 handle: &Handle,
2749 waitpoint_key: &str,
2750 expires_in: Duration,
2751 ) -> Result<PendingWaitpoint, EngineError> {
2752 let pool = &self.inner.pool;
2753 retry_serializable(|| {
2754 crate::suspend_ops::create_waitpoint_impl(pool, handle, waitpoint_key, expires_in)
2755 })
2756 .await
2757 }
2758
2759 #[cfg(feature = "core")]
2760 async fn read_waitpoint_token(
2761 &self,
2762 partition: PartitionKey,
2763 waitpoint_id: &ff_core::types::WaitpointId,
2764 ) -> Result<Option<String>, EngineError> {
2765 crate::reads::read_waitpoint_token_impl(&self.inner.pool, &partition, waitpoint_id).await
2766 }
2767
2768 async fn observe_signals(&self, handle: &Handle) -> Result<Vec<ResumeSignal>, EngineError> {
2769 let pool = &self.inner.pool;
2770 retry_serializable(|| crate::suspend_ops::observe_signals_impl(pool, handle)).await
2771 }
2772
2773 async fn claim_from_resume_grant(&self, token: ResumeToken) -> Result<Option<Handle>, EngineError> {
2774 let pool = &self.inner.pool;
2775 let pubsub = &self.inner.pubsub;
2776 retry_serializable(|| claim_from_reclaim_impl(pool, pubsub, &token)).await
2777 }
2778
2779 async fn issue_reclaim_grant(
2780 &self,
2781 args: IssueReclaimGrantArgs,
2782 ) -> Result<IssueReclaimGrantOutcome, EngineError> {
2783 let pool = &self.inner.pool;
2784 retry_serializable(|| crate::reclaim::issue_reclaim_grant_impl(pool, &args)).await
2785 }
2786
2787 async fn reclaim_execution(
2788 &self,
2789 args: ReclaimExecutionArgs,
2790 ) -> Result<ReclaimExecutionOutcome, EngineError> {
2791 let pool = &self.inner.pool;
2792 let pubsub = &self.inner.pubsub;
2793 retry_serializable(|| crate::reclaim::reclaim_execution_impl(pool, pubsub, &args)).await
2794 }
2795
2796 async fn delay(&self, _handle: &Handle, _delay_until: TimestampMs) -> Result<(), EngineError> {
2797 unavailable("sqlite.delay")
2798 }
2799
2800 async fn wait_children(&self, _handle: &Handle) -> Result<(), EngineError> {
2801 unavailable("sqlite.wait_children")
2802 }
2803
2804 async fn describe_execution(
2807 &self,
2808 _id: &ExecutionId,
2809 ) -> Result<Option<ExecutionSnapshot>, EngineError> {
2810 unavailable("sqlite.describe_execution")
2811 }
2812
2813 async fn read_execution_context(
2814 &self,
2815 execution_id: &ExecutionId,
2816 ) -> Result<ExecutionContext, EngineError> {
2817 crate::reads::read_execution_context_impl(&self.inner.pool, execution_id).await
2818 }
2819
2820 async fn read_current_attempt_index(
2821 &self,
2822 execution_id: &ExecutionId,
2823 ) -> Result<ff_core::types::AttemptIndex, EngineError> {
2824 crate::reads::read_current_attempt_index_impl(&self.inner.pool, execution_id).await
2825 }
2826
2827 async fn read_total_attempt_count(
2828 &self,
2829 execution_id: &ExecutionId,
2830 ) -> Result<ff_core::types::AttemptIndex, EngineError> {
2831 crate::reads::read_total_attempt_count_impl(&self.inner.pool, execution_id).await
2832 }
2833
2834 async fn describe_flow(&self, _id: &FlowId) -> Result<Option<FlowSnapshot>, EngineError> {
2835 unavailable("sqlite.describe_flow")
2836 }
2837
2838 async fn set_execution_tag(
2839 &self,
2840 execution_id: &ExecutionId,
2841 key: &str,
2842 value: &str,
2843 ) -> Result<(), EngineError> {
2844 ff_core::engine_backend::validate_tag_key(key)?;
2845 crate::reads::set_execution_tag_impl(&self.inner.pool, execution_id, key, value).await
2846 }
2847
2848 async fn set_flow_tag(
2849 &self,
2850 flow_id: &FlowId,
2851 key: &str,
2852 value: &str,
2853 ) -> Result<(), EngineError> {
2854 ff_core::engine_backend::validate_tag_key(key)?;
2855 crate::reads::set_flow_tag_impl(&self.inner.pool, flow_id, key, value).await
2856 }
2857
2858 async fn get_execution_tag(
2859 &self,
2860 execution_id: &ExecutionId,
2861 key: &str,
2862 ) -> Result<Option<String>, EngineError> {
2863 ff_core::engine_backend::validate_tag_key(key)?;
2864 crate::reads::get_execution_tag_impl(&self.inner.pool, execution_id, key).await
2865 }
2866
2867 async fn get_flow_tag(
2868 &self,
2869 flow_id: &FlowId,
2870 key: &str,
2871 ) -> Result<Option<String>, EngineError> {
2872 ff_core::engine_backend::validate_tag_key(key)?;
2873 crate::reads::get_flow_tag_impl(&self.inner.pool, flow_id, key).await
2874 }
2875
2876 async fn get_execution_namespace(
2877 &self,
2878 execution_id: &ExecutionId,
2879 ) -> Result<Option<String>, EngineError> {
2880 crate::reads::get_execution_namespace_impl(&self.inner.pool, execution_id).await
2881 }
2882
2883 #[cfg(feature = "core")]
2884 async fn list_edges(
2885 &self,
2886 _flow_id: &FlowId,
2887 _direction: EdgeDirection,
2888 ) -> Result<Vec<EdgeSnapshot>, EngineError> {
2889 unavailable("sqlite.list_edges")
2890 }
2891
2892 #[cfg(feature = "core")]
2893 async fn describe_edge(
2894 &self,
2895 _flow_id: &FlowId,
2896 _edge_id: &EdgeId,
2897 ) -> Result<Option<EdgeSnapshot>, EngineError> {
2898 unavailable("sqlite.describe_edge")
2899 }
2900
2901 #[cfg(feature = "core")]
2902 async fn resolve_execution_flow_id(
2903 &self,
2904 _eid: &ExecutionId,
2905 ) -> Result<Option<FlowId>, EngineError> {
2906 unavailable("sqlite.resolve_execution_flow_id")
2907 }
2908
2909 #[cfg(feature = "core")]
2910 async fn list_flows(
2911 &self,
2912 _partition: PartitionKey,
2913 _cursor: Option<FlowId>,
2914 _limit: usize,
2915 ) -> Result<ListFlowsPage, EngineError> {
2916 unavailable("sqlite.list_flows")
2917 }
2918
2919 #[cfg(feature = "core")]
2920 async fn list_lanes(
2921 &self,
2922 _cursor: Option<LaneId>,
2923 _limit: usize,
2924 ) -> Result<ListLanesPage, EngineError> {
2925 unavailable("sqlite.list_lanes")
2926 }
2927
2928 #[cfg(feature = "core")]
2929 async fn list_suspended(
2930 &self,
2931 _partition: PartitionKey,
2932 _cursor: Option<ExecutionId>,
2933 _limit: usize,
2934 ) -> Result<ListSuspendedPage, EngineError> {
2935 unavailable("sqlite.list_suspended")
2936 }
2937
2938 #[cfg(feature = "core")]
2939 async fn list_executions(
2940 &self,
2941 _partition: PartitionKey,
2942 _cursor: Option<ExecutionId>,
2943 _limit: usize,
2944 ) -> Result<ListExecutionsPage, EngineError> {
2945 unavailable("sqlite.list_executions")
2946 }
2947
2948 #[cfg(feature = "core")]
2949 async fn deliver_signal(
2950 &self,
2951 args: DeliverSignalArgs,
2952 ) -> Result<DeliverSignalResult, EngineError> {
2953 let pool = &self.inner.pool;
2954 let pubsub = &self.inner.pubsub;
2955 retry_serializable(|| crate::suspend_ops::deliver_signal_impl(pool, pubsub, args.clone()))
2956 .await
2957 }
2958
2959 #[cfg(feature = "core")]
2960 async fn claim_resumed_execution(
2961 &self,
2962 args: ClaimResumedExecutionArgs,
2963 ) -> Result<ClaimResumedExecutionResult, EngineError> {
2964 let pool = &self.inner.pool;
2965 let pubsub = &self.inner.pubsub;
2966 retry_serializable(|| {
2967 crate::suspend_ops::claim_resumed_execution_impl(pool, pubsub, args.clone())
2968 })
2969 .await
2970 }
2971
2972 async fn cancel_flow(
2973 &self,
2974 id: &FlowId,
2975 policy: CancelFlowPolicy,
2976 _wait: CancelFlowWait,
2977 ) -> Result<CancelFlowResult, EngineError> {
2978 let pool = &self.inner.pool;
2984 let pubsub = &self.inner.pubsub;
2985 retry_serializable(|| cancel_flow_impl(pool, pubsub, id, policy)).await
2986 }
2987
2988 #[cfg(feature = "core")]
2989 async fn set_edge_group_policy(
2990 &self,
2991 _flow_id: &FlowId,
2992 _downstream_execution_id: &ExecutionId,
2993 _policy: EdgeDependencyPolicy,
2994 ) -> Result<SetEdgeGroupPolicyResult, EngineError> {
2995 unavailable("sqlite.set_edge_group_policy")
2996 }
2997
2998 async fn report_usage(
3007 &self,
3008 _handle: &Handle,
3009 budget: &BudgetId,
3010 dimensions: UsageDimensions,
3011 ) -> Result<ReportUsageResult, EngineError> {
3012 crate::budget::report_usage_impl(&self.inner.pool, budget, dimensions).await
3013 }
3014
3015 #[cfg(feature = "core")]
3016 async fn create_budget(
3017 &self,
3018 args: CreateBudgetArgs,
3019 ) -> Result<CreateBudgetResult, EngineError> {
3020 crate::budget::create_budget_impl(&self.inner.pool, args).await
3021 }
3022
3023 #[cfg(feature = "core")]
3024 async fn reset_budget(
3025 &self,
3026 args: ResetBudgetArgs,
3027 ) -> Result<ResetBudgetResult, EngineError> {
3028 crate::budget::reset_budget_impl(&self.inner.pool, args).await
3029 }
3030
3031 #[cfg(feature = "core")]
3032 async fn create_quota_policy(
3033 &self,
3034 args: CreateQuotaPolicyArgs,
3035 ) -> Result<CreateQuotaPolicyResult, EngineError> {
3036 crate::budget::create_quota_policy_impl(&self.inner.pool, args).await
3037 }
3038
3039 #[cfg(feature = "core")]
3040 async fn get_budget_status(
3041 &self,
3042 id: &BudgetId,
3043 ) -> Result<BudgetStatus, EngineError> {
3044 crate::budget::get_budget_status_impl(&self.inner.pool, id).await
3045 }
3046
3047 #[cfg(feature = "core")]
3048 async fn report_usage_admin(
3049 &self,
3050 budget_id: &BudgetId,
3051 args: ReportUsageAdminArgs,
3052 ) -> Result<ReportUsageResult, EngineError> {
3053 crate::budget::report_usage_admin_impl(&self.inner.pool, budget_id, args).await
3054 }
3055
3056 #[cfg(feature = "core")]
3059 async fn record_spend(
3060 &self,
3061 args: ff_core::contracts::RecordSpendArgs,
3062 ) -> Result<ReportUsageResult, EngineError> {
3063 let pool = &self.inner.pool;
3064 retry_serializable(|| crate::typed_ops::record_spend(pool, args.clone())).await
3065 }
3066
3067 #[cfg(feature = "core")]
3068 async fn release_budget(
3069 &self,
3070 args: ff_core::contracts::ReleaseBudgetArgs,
3071 ) -> Result<(), EngineError> {
3072 let pool = &self.inner.pool;
3073 retry_serializable(|| crate::typed_ops::release_budget(pool, args.clone())).await
3074 }
3075
3076 #[cfg(feature = "core")]
3077 async fn deliver_approval_signal(
3078 &self,
3079 args: ff_core::contracts::DeliverApprovalSignalArgs,
3080 ) -> Result<ff_core::contracts::DeliverSignalResult, EngineError> {
3081 let pool = &self.inner.pool;
3082 let pubsub = &self.inner.pubsub;
3083 retry_serializable(|| {
3084 crate::typed_ops::deliver_approval_signal(pool, pubsub, args.clone())
3085 })
3086 .await
3087 }
3088
3089 #[cfg(feature = "core")]
3090 async fn issue_grant_and_claim(
3091 &self,
3092 args: ff_core::contracts::IssueGrantAndClaimArgs,
3093 ) -> Result<ff_core::contracts::ClaimGrantOutcome, EngineError> {
3094 let pool = &self.inner.pool;
3095 let pubsub = &self.inner.pubsub;
3096 retry_serializable(|| crate::typed_ops::issue_grant_and_claim(pool, pubsub, args.clone()))
3097 .await
3098 }
3099
3100 #[cfg(feature = "streaming")]
3101 async fn read_stream(
3102 &self,
3103 execution_id: &ExecutionId,
3104 attempt_index: AttemptIndex,
3105 from: StreamCursor,
3106 to: StreamCursor,
3107 count_limit: u64,
3108 ) -> Result<StreamFrames, EngineError> {
3109 let pool = &self.inner.pool;
3110 read_stream_impl(pool, execution_id, attempt_index, from, to, count_limit).await
3111 }
3112
3113 #[cfg(feature = "streaming")]
3114 async fn tail_stream(
3115 &self,
3116 execution_id: &ExecutionId,
3117 attempt_index: AttemptIndex,
3118 after: StreamCursor,
3119 block_ms: u64,
3120 count_limit: u64,
3121 visibility: TailVisibility,
3122 ) -> Result<StreamFrames, EngineError> {
3123 let pool = &self.inner.pool;
3124 let pubsub = &self.inner.pubsub;
3125 tail_stream_impl(
3126 pool,
3127 pubsub,
3128 execution_id,
3129 attempt_index,
3130 after,
3131 block_ms,
3132 count_limit,
3133 visibility,
3134 )
3135 .await
3136 }
3137
3138 #[cfg(feature = "streaming")]
3139 async fn read_summary(
3140 &self,
3141 execution_id: &ExecutionId,
3142 attempt_index: AttemptIndex,
3143 ) -> Result<Option<SummaryDocument>, EngineError> {
3144 let pool = &self.inner.pool;
3145 read_summary_impl(pool, execution_id, attempt_index).await
3146 }
3147
3148 #[cfg(feature = "core")]
3156 async fn create_execution(
3157 &self,
3158 args: CreateExecutionArgs,
3159 ) -> Result<CreateExecutionResult, EngineError> {
3160 let pool = &self.inner.pool;
3161 retry_serializable(|| create_execution_impl(pool, &args)).await
3162 }
3163
3164 #[cfg(feature = "core")]
3165 async fn create_flow(&self, args: CreateFlowArgs) -> Result<CreateFlowResult, EngineError> {
3166 let pool = &self.inner.pool;
3167 retry_serializable(|| create_flow_impl(pool, &args)).await
3168 }
3169
3170 #[cfg(feature = "core")]
3171 async fn add_execution_to_flow(
3172 &self,
3173 args: AddExecutionToFlowArgs,
3174 ) -> Result<AddExecutionToFlowResult, EngineError> {
3175 let pool = &self.inner.pool;
3176 retry_serializable(|| add_execution_to_flow_impl(pool, &args)).await
3177 }
3178
3179 #[cfg(feature = "core")]
3180 async fn stage_dependency_edge(
3181 &self,
3182 args: StageDependencyEdgeArgs,
3183 ) -> Result<StageDependencyEdgeResult, EngineError> {
3184 let pool = &self.inner.pool;
3185 retry_serializable(|| stage_dependency_edge_impl(pool, &args)).await
3186 }
3187
3188 #[cfg(feature = "core")]
3189 async fn apply_dependency_to_child(
3190 &self,
3191 args: ApplyDependencyToChildArgs,
3192 ) -> Result<ApplyDependencyToChildResult, EngineError> {
3193 let pool = &self.inner.pool;
3194 retry_serializable(|| apply_dependency_to_child_impl(pool, &args)).await
3195 }
3196
3197 #[cfg(feature = "core")]
3206 async fn cancel_execution(
3207 &self,
3208 args: CancelExecutionArgs,
3209 ) -> Result<CancelExecutionResult, EngineError> {
3210 crate::operator::cancel_execution_impl(&self.inner.pool, &self.inner.pubsub, args).await
3211 }
3212
3213 #[cfg(feature = "core")]
3214 async fn revoke_lease(
3215 &self,
3216 args: RevokeLeaseArgs,
3217 ) -> Result<RevokeLeaseResult, EngineError> {
3218 crate::operator::revoke_lease_impl(&self.inner.pool, &self.inner.pubsub, args).await
3219 }
3220
3221 #[cfg(feature = "core")]
3222 async fn change_priority(
3223 &self,
3224 args: ChangePriorityArgs,
3225 ) -> Result<ChangePriorityResult, EngineError> {
3226 crate::operator::change_priority_impl(&self.inner.pool, &self.inner.pubsub, args).await
3227 }
3228
3229 #[cfg(feature = "core")]
3230 async fn replay_execution(
3231 &self,
3232 args: ReplayExecutionArgs,
3233 ) -> Result<ReplayExecutionResult, EngineError> {
3234 crate::operator::replay_execution_impl(&self.inner.pool, &self.inner.pubsub, args).await
3235 }
3236
3237 #[cfg(feature = "core")]
3246 async fn read_execution_state(
3247 &self,
3248 id: &ExecutionId,
3249 ) -> Result<Option<PublicState>, EngineError> {
3250 crate::reads::read_execution_state_impl(&self.inner.pool, id).await
3251 }
3252
3253 #[cfg(feature = "core")]
3254 async fn read_execution_info(
3255 &self,
3256 id: &ExecutionId,
3257 ) -> Result<Option<ExecutionInfo>, EngineError> {
3258 crate::reads::read_execution_info_impl(&self.inner.pool, id).await
3259 }
3260
3261 async fn get_execution_result(
3262 &self,
3263 id: &ExecutionId,
3264 ) -> Result<Option<Vec<u8>>, EngineError> {
3265 crate::reads::get_execution_result_impl(&self.inner.pool, id).await
3266 }
3267
3268 #[cfg(feature = "core")]
3278 async fn cancel_flow_header(
3279 &self,
3280 args: CancelFlowArgs,
3281 ) -> Result<CancelFlowHeader, EngineError> {
3282 crate::operator::cancel_flow_header_impl(&self.inner.pool, &self.inner.pubsub, args).await
3283 }
3284
3285 #[cfg(feature = "core")]
3286 async fn ack_cancel_member(
3287 &self,
3288 flow_id: &FlowId,
3289 execution_id: &ExecutionId,
3290 ) -> Result<(), EngineError> {
3291 crate::operator::ack_cancel_member_impl(
3292 &self.inner.pool,
3293 flow_id.clone(),
3294 execution_id.clone(),
3295 )
3296 .await
3297 }
3298
3299 #[cfg(feature = "core")]
3302 async fn list_pending_waitpoints(
3303 &self,
3304 args: ListPendingWaitpointsArgs,
3305 ) -> Result<ListPendingWaitpointsResult, EngineError> {
3306 crate::suspend_ops::list_pending_waitpoints_impl(&self.inner.pool, args).await
3307 }
3308
3309 async fn subscribe_completion(
3323 &self,
3324 cursor: ff_core::stream_subscribe::StreamCursor,
3325 filter: &ff_core::backend::ScannerFilter,
3326 ) -> Result<ff_core::stream_events::CompletionSubscription, EngineError> {
3327 let pool = self.inner.pool.clone();
3328 let wakeup = self.inner.pubsub.completion.subscribe();
3329 crate::completion_subscribe::subscribe(pool, wakeup, cursor, filter.clone()).await
3330 }
3331
3332 async fn subscribe_lease_history(
3333 &self,
3334 cursor: ff_core::stream_subscribe::StreamCursor,
3335 filter: &ff_core::backend::ScannerFilter,
3336 ) -> Result<ff_core::stream_events::LeaseHistorySubscription, EngineError> {
3337 let pool = self.inner.pool.clone();
3338 let wakeup = self.inner.pubsub.lease_history.subscribe();
3339 crate::lease_event_subscribe::subscribe(pool, wakeup, cursor, filter.clone()).await
3340 }
3341
3342 async fn subscribe_signal_delivery(
3343 &self,
3344 cursor: ff_core::stream_subscribe::StreamCursor,
3345 filter: &ff_core::backend::ScannerFilter,
3346 ) -> Result<ff_core::stream_events::SignalDeliverySubscription, EngineError> {
3347 let pool = self.inner.pool.clone();
3348 let wakeup = self.inner.pubsub.signal_delivery.subscribe();
3349 crate::signal_delivery_subscribe::subscribe(pool, wakeup, cursor, filter.clone()).await
3350 }
3351
3352 async fn seed_waitpoint_hmac_secret(
3355 &self,
3356 args: SeedWaitpointHmacSecretArgs,
3357 ) -> Result<SeedOutcome, EngineError> {
3358 let pool = &self.inner.pool;
3359 retry_serializable(|| {
3360 crate::suspend_ops::seed_waitpoint_hmac_secret_impl(pool, args.clone())
3361 })
3362 .await
3363 }
3364
3365 async fn rotate_waitpoint_hmac_secret_all(
3366 &self,
3367 args: RotateWaitpointHmacSecretAllArgs,
3368 ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
3369 let pool = &self.inner.pool;
3370 retry_serializable(|| {
3371 crate::suspend_ops::rotate_waitpoint_hmac_secret_all_impl(pool, args.clone())
3372 })
3373 .await
3374 }
3375
3376 fn backend_label(&self) -> &'static str {
3379 "sqlite"
3380 }
3381
3382 fn capabilities(&self) -> Capabilities {
3383 Capabilities::new(
3392 BackendIdentity::new(
3393 "sqlite",
3394 Version::new(
3395 env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap_or(0),
3396 env!("CARGO_PKG_VERSION_MINOR").parse().unwrap_or(0),
3397 env!("CARGO_PKG_VERSION_PATCH").parse().unwrap_or(0),
3398 ),
3399 "Phase-4",
3400 ),
3401 sqlite_supports_base(),
3402 )
3403 }
3404
3405 async fn prepare(&self) -> Result<PrepareOutcome, EngineError> {
3406 Ok(PrepareOutcome::NoOp)
3410 }
3411
3412 async fn read_exec_core_fields(
3415 &self,
3416 partition: ff_core::partition::Partition,
3417 execution_id: &ff_core::types::ExecutionId,
3418 fields: &[&str],
3419 ) -> Result<std::collections::HashMap<String, Option<String>>, EngineError> {
3420 if fields.is_empty() {
3421 return Ok(std::collections::HashMap::new());
3422 }
3423 let (part, exec_uuid) = split_exec_id(execution_id)?;
3424 if part as u16 != partition.index {
3425 return Err(EngineError::Validation {
3426 kind: ff_core::engine_error::ValidationKind::InvalidInput,
3427 detail: format!(
3428 "read_exec_core_fields: partition mismatch (arg={}, eid={})",
3429 partition.index, part
3430 ),
3431 });
3432 }
3433
3434 let mut projections: Vec<String> = Vec::with_capacity(fields.len());
3437 for field in fields {
3438 let expr = match *field {
3439 "lane_id" | "lifecycle_phase" | "ownership_state" | "eligibility_state"
3440 | "public_state" | "attempt_state" | "blocking_reason" | "cancellation_reason"
3441 | "cancelled_by" => format!("CAST({field} AS TEXT)"),
3442 "attempt_index" => "CAST(attempt_index AS TEXT)".to_string(),
3443 "flow_id" => "CAST(flow_id AS TEXT)".to_string(),
3444 "priority" => "CAST(priority AS TEXT)".to_string(),
3445 "created_at_ms" => "CAST(created_at_ms AS TEXT)".to_string(),
3446 "terminal_at_ms" => "CAST(terminal_at_ms AS TEXT)".to_string(),
3447 "deadline_at_ms" => "CAST(deadline_at_ms AS TEXT)".to_string(),
3448 "current_attempt_index" => "CAST(attempt_index AS TEXT)".to_string(),
3449 "completed_at" => "CAST(terminal_at_ms AS TEXT)".to_string(),
3450 "cancel_reason" => "CAST(cancellation_reason AS TEXT)".to_string(),
3451 "required_capabilities" => {
3452 "(SELECT group_concat(capability, ',') \
3454 FROM ff_execution_capabilities \
3455 WHERE execution_id = ff_exec_core.execution_id)"
3456 .to_string()
3457 }
3458 other => match other {
3459 "current_waitpoint_id"
3460 | "current_worker_instance_id"
3461 | "budget_ids"
3462 | "quota_policy_id" => {
3463 format!("json_extract(raw_fields, '$.{other}')")
3464 }
3465 _ => "NULL".to_string(),
3466 },
3467 };
3468 projections.push(expr);
3469 }
3470 let projection_sql = projections.join(", ");
3471 let query = format!(
3472 "SELECT {projection_sql} FROM ff_exec_core \
3473 WHERE partition_key = ?1 AND execution_id = ?2"
3474 );
3475 let row_opt = sqlx::query(&query)
3476 .bind(part)
3477 .bind(exec_uuid)
3478 .fetch_optional(self.pool())
3479 .await
3480 .map_err(|e| EngineError::Transport {
3481 backend: "sqlite",
3482 source: format!("read_exec_core_fields: {e}").into(),
3483 })?;
3484
3485 let mut out = std::collections::HashMap::with_capacity(fields.len());
3486 if let Some(row) = row_opt {
3487 use sqlx::Row;
3488 for (idx, field) in fields.iter().enumerate() {
3489 let val: Option<String> =
3490 row.try_get(idx).map_err(|e| EngineError::Transport {
3491 backend: "sqlite",
3492 source: format!("read_exec_core_fields[{field}]: {e}").into(),
3493 })?;
3494 out.insert((*field).to_string(), val);
3495 }
3496 } else {
3497 for field in fields {
3498 out.insert((*field).to_string(), None);
3499 }
3500 }
3501 Ok(out)
3502 }
3503
3504 async fn server_time_ms(&self) -> Result<u64, EngineError> {
3507 let ms: i64 = sqlx::query_scalar(
3510 "SELECT CAST((julianday('now') - 2440587.5) * 86400000 AS INTEGER)",
3511 )
3512 .fetch_one(self.pool())
3513 .await
3514 .map_err(|e| EngineError::Transport {
3515 backend: "sqlite",
3516 source: format!("server_time_ms: {e}").into(),
3517 })?;
3518 if ms < 0 {
3519 return Err(EngineError::Transport {
3520 backend: "sqlite",
3521 source: "server_time_ms: negative epoch".into(),
3522 });
3523 }
3524 Ok(ms as u64)
3525 }
3526}
3527
#[cfg(test)]
mod tests {
    use super::is_memory_uri;

    /// Asserts that `is_memory_uri` yields `expected` for every URI in `uris`,
    /// naming the offending URI when an expectation does not hold.
    fn check_all(uris: &[&str], expected: bool) {
        for &uri in uris {
            assert_eq!(is_memory_uri(uri), expected, "is_memory_uri({uri:?})");
        }
    }

    /// Covers the bare, `file::memory:` and `mode=memory` spellings alongside
    /// on-disk paths that must not be classified as in-memory.
    #[test]
    fn is_memory_detects_all_uri_forms() {
        check_all(
            &[
                ":memory:",
                "file::memory:",
                "file::memory:?cache=shared",
                "file:ff-test-abc123?mode=memory&cache=shared",
                "file:ff-test-00000000-0000-0000-0000-000000000000?mode=memory&cache=shared",
            ],
            true,
        );
        check_all(
            &[
                "/tmp/ff.sqlite",
                "./ff.sqlite",
                "file:/tmp/ff.sqlite",
                "file:ff-test?cache=shared",
                "file:my_mode=memory_db.sqlite",
            ],
            false,
        );
        // `mode=memory` is recognised even when it is not the first parameter.
        check_all(&["file:ff-test?cache=shared&mode=memory"], true);
    }

    /// `mode=memory` inside the file name, or as a prefix of a longer value,
    /// must not count as the in-memory query parameter.
    #[test]
    fn is_memory_uri_rejects_filename_with_mode_memory() {
        check_all(
            &["file:my_mode=memory_db.sqlite", "file:foo?mode=memory_extra"],
            false,
        );
    }

    /// A lone `mode=memory` query parameter marks the URI as in-memory.
    #[test]
    fn is_memory_uri_accepts_query_param() {
        check_all(&["file:test?mode=memory"], true);
    }

    /// The named shared-cache form (`mode=memory&cache=shared`) is in-memory.
    #[test]
    fn is_memory_uri_accepts_shared_cache_form() {
        check_all(
            &["file:ff-test-00000000-0000-0000-0000-000000000000?mode=memory&cache=shared"],
            true,
        );
    }

    /// A plain relative `file:` path is an on-disk database.
    #[test]
    fn is_memory_uri_rejects_plain_file() {
        check_all(&["file:./data.db"], false);
    }
}