1#![allow(clippy::result_large_err)]
20
21use std::sync::Arc;
22use std::time::Duration;
23
24use async_trait::async_trait;
25
26use ff_core::backend::{
27 AppendFrameOutcome, BackendConfig, CancelFlowPolicy, CancelFlowWait, CapabilitySet,
28 ClaimPolicy, FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal,
29 PendingWaitpoint, ResumeToken, ResumeSignal, SummaryDocument, TailVisibility,
30 UsageDimensions,
31};
32#[cfg(feature = "core")]
33use ff_core::contracts::{
34 AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
35 ApplyDependencyToChildResult, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
36 CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
37 DeliverApprovalSignalArgs, DeliverSignalArgs, DeliverSignalResult, EdgeDependencyPolicy,
38 EdgeDirection, EdgeSnapshot,
39 ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
40 ListPendingWaitpointsResult, ListSuspendedPage, SetEdgeGroupPolicyResult,
41 StageDependencyEdgeArgs, StageDependencyEdgeResult,
42};
43#[cfg(feature = "core")]
44use ff_core::state::PublicState;
45use ff_core::contracts::{
46 CancelFlowResult, ExecutionContext, ExecutionSnapshot, FlowSnapshot, IssueReclaimGrantArgs,
47 IssueReclaimGrantOutcome, ReclaimExecutionArgs, ReclaimExecutionOutcome, ReportUsageResult,
48 RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
49 SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
50};
51#[cfg(feature = "core")]
52use ff_core::contracts::ExecutionInfo;
53#[cfg(feature = "core")]
55use ff_core::contracts::{
56 CancelExecutionArgs, CancelExecutionResult, RevokeLeaseArgs, RevokeLeaseResult,
57};
58#[cfg(feature = "core")]
60use ff_core::contracts::{
61 CancelFlowArgs, CancelFlowHeader, ChangePriorityArgs, ChangePriorityResult,
62 ReplayExecutionArgs, ReplayExecutionResult,
63};
64#[cfg(feature = "core")]
66use ff_core::contracts::{
67 BudgetStatus, CreateBudgetArgs, CreateBudgetResult, CreateQuotaPolicyArgs,
68 CreateQuotaPolicyResult, ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult,
69};
70#[cfg(feature = "streaming")]
71use ff_core::contracts::{StreamCursor, StreamFrames};
72use ff_core::engine_backend::{EngineBackend, ExpirePhase};
73use ff_core::engine_error::EngineError;
74#[cfg(feature = "core")]
75use ff_core::partition::PartitionKey;
76use ff_core::partition::{Partition, PartitionConfig};
77#[cfg(feature = "streaming")]
78use ff_core::types::AttemptIndex;
79#[cfg(feature = "core")]
80use ff_core::types::EdgeId;
81use ff_core::types::{BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
82pub use sqlx::PgPool;
86
87#[cfg(feature = "core")]
88mod admin;
89pub mod attempt;
90pub mod budget;
91pub mod claim_grant;
92pub mod completion;
93#[cfg(feature = "core")]
94pub mod dispatch;
95pub mod error;
96pub mod exec_core;
97pub mod flow;
98#[cfg(feature = "core")]
99pub mod flow_staging;
100pub mod handle_codec;
101mod lease_event;
102mod lease_event_subscribe;
103pub mod listener;
104pub mod migrate;
105#[cfg(feature = "core")]
106pub mod operator;
107#[cfg(feature = "core")]
108mod operator_event;
109pub mod pool;
110#[cfg(feature = "core")]
111pub mod reconcilers;
112#[cfg(feature = "core")]
113pub mod scanner_supervisor;
114#[cfg(feature = "core")]
115pub mod scheduler;
116pub mod signal;
117mod signal_delivery_subscribe;
118mod signal_event;
119#[cfg(feature = "streaming")]
120pub mod stream;
121pub mod suspend;
122pub mod suspend_ops;
123#[cfg(feature = "core")]
124pub(crate) mod typed_ops;
125pub mod version;
126#[cfg(feature = "core")]
127pub mod worker_registry;
128
129pub use completion::{PostgresCompletionStream, COMPLETION_CHANNEL};
130pub use error::{map_sqlx_error, PostgresTransportError};
131pub use listener::StreamNotifier;
132pub use migrate::{apply_migrations, MigrationError};
133#[cfg(feature = "core")]
134pub use scanner_supervisor::{PostgresScannerConfig, PostgresScannerHandle};
135pub use version::check_schema_version;
136
137pub use ff_core::backend::PostgresConnection;
143
144fn postgres_supports_base() -> ff_core::capability::Supports {
174 let mut s = ff_core::capability::Supports::none();
175
176 s.cancel_flow_wait_timeout = true;
178 s.cancel_flow_wait_indefinite = true;
179
180 s.rotate_waitpoint_hmac_secret_all = true;
182 s.seed_waitpoint_hmac_secret = true;
183
184 s.claim_for_worker = true;
186
187 s.subscribe_lease_history = true;
189 s.subscribe_completion = true;
190 s.subscribe_signal_delivery = true;
191 s.subscribe_instance_tags = false;
192
193 s.stream_durable_summary = true;
195 s.stream_best_effort_live = true;
196
197 s.prepare = true;
199
200 s.cancel_execution = true;
207 s.change_priority = true;
208 s.replay_execution = true;
209 s.revoke_lease = true;
210 s.read_execution_state = true;
211 s.read_execution_info = true;
212 s.get_execution_result = true;
213 s.budget_admin = true;
214 s.quota_admin = true;
215 s.list_pending_waitpoints = true;
216 s.cancel_flow_header = true;
217 s.ack_cancel_member = true;
218
219 s.register_worker = true;
221 s.heartbeat_worker = true;
222 s.mark_worker_dead = true;
223 s.list_expired_leases = true;
224 s.list_workers = true;
225
226 s
227}
228
229pub struct PostgresBackend {
230 #[allow(dead_code)] pool: PgPool,
232 #[allow(dead_code)]
233 partition_config: PartitionConfig,
234 #[allow(dead_code)]
235 metrics: Option<Arc<ff_observability::Metrics>>,
236 #[allow(dead_code)]
240 stream_notifier: Option<Arc<StreamNotifier>>,
241 #[cfg(feature = "core")]
247 scanner_handle: Option<Arc<scanner_supervisor::PostgresScannerHandle>>,
248}
249
250impl PostgresBackend {
251 pub async fn connect(config: BackendConfig) -> Result<Arc<dyn EngineBackend>, EngineError> {
266 let pool = pool::build_pool(&config).await?;
267 warn_if_max_locks_low(&pool).await;
268 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
269 let backend = Self {
270 pool,
271 partition_config: PartitionConfig::default(),
272 metrics: None,
273 stream_notifier,
274 #[cfg(feature = "core")]
275 scanner_handle: None,
276 };
277 Ok(Arc::new(backend))
278 }
279
280 pub fn from_pool(pool: PgPool, partition_config: PartitionConfig) -> Arc<Self> {
286 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
287 Arc::new(Self {
288 pool,
289 partition_config,
290 metrics: None,
291 stream_notifier,
292 #[cfg(feature = "core")]
293 scanner_handle: None,
294 })
295 }
296
297 pub async fn connect_with_metrics(
311 config: BackendConfig,
312 partition_config: PartitionConfig,
313 metrics: Arc<ff_observability::Metrics>,
314 ) -> Result<Arc<Self>, EngineError> {
315 let pool = pool::build_pool(&config).await?;
316 warn_if_max_locks_low(&pool).await;
317 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
318 Ok(Arc::new(Self {
319 pool,
320 partition_config,
321 metrics: Some(metrics),
322 stream_notifier,
323 #[cfg(feature = "core")]
324 scanner_handle: None,
325 }))
326 }
327
328 #[cfg(feature = "core")]
335 pub fn with_scanners(
336 self: &mut Arc<Self>,
337 cfg: scanner_supervisor::PostgresScannerConfig,
338 ) -> bool {
339 let Some(inner) = Arc::get_mut(self) else {
340 return false;
341 };
342 let handle = scanner_supervisor::spawn_scanners(inner.pool.clone(), cfg);
343 inner.scanner_handle = Some(Arc::new(handle));
344 true
345 }
346
347 pub fn pool(&self) -> &PgPool {
352 &self.pool
353 }
354
355 #[cfg(feature = "core")]
368 #[tracing::instrument(name = "pg.create_execution", skip_all)]
369 pub async fn create_execution(
370 &self,
371 args: CreateExecutionArgs,
372 ) -> Result<ExecutionId, EngineError> {
373 exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await
374 }
375
376 #[cfg(feature = "core")]
384 #[tracing::instrument(name = "pg.create_flow.inherent", skip_all)]
385 pub async fn create_flow(
386 &self,
387 args: &CreateFlowArgs,
388 ) -> Result<CreateFlowResult, EngineError> {
389 flow_staging::create_flow(&self.pool, &self.partition_config, args).await
390 }
391
392 #[cfg(feature = "core")]
393 #[tracing::instrument(name = "pg.add_execution_to_flow.inherent", skip_all)]
394 pub async fn add_execution_to_flow(
395 &self,
396 args: &AddExecutionToFlowArgs,
397 ) -> Result<AddExecutionToFlowResult, EngineError> {
398 flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, args).await
399 }
400
401 #[cfg(feature = "core")]
402 #[tracing::instrument(name = "pg.stage_dependency_edge.inherent", skip_all)]
403 pub async fn stage_dependency_edge(
404 &self,
405 args: &StageDependencyEdgeArgs,
406 ) -> Result<StageDependencyEdgeResult, EngineError> {
407 flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, args).await
408 }
409
410 #[cfg(feature = "core")]
411 #[tracing::instrument(name = "pg.apply_dependency_to_child.inherent", skip_all)]
412 pub async fn apply_dependency_to_child(
413 &self,
414 args: &ApplyDependencyToChildArgs,
415 ) -> Result<ApplyDependencyToChildResult, EngineError> {
416 flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, args).await
417 }
418}
419
420#[inline]
424#[cfg_attr(feature = "streaming", allow(dead_code))]
425fn unavailable<T>(op: &'static str) -> Result<T, EngineError> {
426 Err(EngineError::Unavailable { op })
427}
428
429#[async_trait]
430impl EngineBackend for PostgresBackend {
431 #[tracing::instrument(name = "pg.claim", skip_all)]
434 async fn claim(
435 &self,
436 lane: &LaneId,
437 capabilities: &CapabilitySet,
438 policy: ClaimPolicy,
439 ) -> Result<Option<Handle>, EngineError> {
440 attempt::claim(&self.pool, lane, capabilities, &policy).await
441 }
442
443 #[tracing::instrument(name = "pg.renew", skip_all)]
444 async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError> {
445 attempt::renew(&self.pool, handle).await
446 }
447
448 #[tracing::instrument(name = "pg.progress", skip_all)]
449 async fn progress(
450 &self,
451 handle: &Handle,
452 percent: Option<u8>,
453 message: Option<String>,
454 ) -> Result<(), EngineError> {
455 attempt::progress(&self.pool, handle, percent, message).await
456 }
457
458 #[tracing::instrument(name = "pg.append_frame", skip_all)]
459 async fn append_frame(
460 &self,
461 handle: &Handle,
462 frame: Frame,
463 ) -> Result<AppendFrameOutcome, EngineError> {
464 #[cfg(feature = "streaming")]
465 {
466 stream::append_frame(&self.pool, &self.partition_config, handle, frame).await
467 }
468 #[cfg(not(feature = "streaming"))]
469 {
470 let _ = (handle, frame);
471 unavailable("pg.append_frame")
472 }
473 }
474
475 #[tracing::instrument(name = "pg.complete", skip_all)]
476 async fn complete(
477 &self,
478 handle: &Handle,
479 payload: Option<Vec<u8>>,
480 ) -> Result<(), EngineError> {
481 attempt::complete(&self.pool, handle, payload).await
482 }
483
484 #[tracing::instrument(name = "pg.fail", skip_all)]
485 async fn fail(
486 &self,
487 handle: &Handle,
488 reason: FailureReason,
489 classification: FailureClass,
490 ) -> Result<FailOutcome, EngineError> {
491 attempt::fail(&self.pool, handle, reason, classification).await
492 }
493
494 #[tracing::instrument(name = "pg.cancel", skip_all)]
495 async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError> {
496 let payload = handle_codec::decode_handle(handle)?;
497 exec_core::cancel_impl(
498 &self.pool,
499 &self.partition_config,
500 &payload.execution_id,
501 reason,
502 )
503 .await
504 }
505
506 #[tracing::instrument(name = "pg.suspend", skip_all)]
507 async fn suspend(
508 &self,
509 handle: &Handle,
510 args: SuspendArgs,
511 ) -> Result<SuspendOutcome, EngineError> {
512 suspend_ops::suspend_impl(&self.pool, &self.partition_config, handle, args).await
513 }
514
515 #[tracing::instrument(name = "pg.suspend_by_triple", skip_all)]
516 async fn suspend_by_triple(
517 &self,
518 exec_id: ExecutionId,
519 triple: LeaseFence,
520 args: SuspendArgs,
521 ) -> Result<SuspendOutcome, EngineError> {
522 suspend_ops::suspend_by_triple_impl(
523 &self.pool,
524 &self.partition_config,
525 exec_id,
526 triple,
527 args,
528 )
529 .await
530 }
531
532 #[tracing::instrument(name = "pg.create_waitpoint", skip_all)]
533 async fn create_waitpoint(
534 &self,
535 handle: &Handle,
536 waitpoint_key: &str,
537 expires_in: Duration,
538 ) -> Result<PendingWaitpoint, EngineError> {
539 suspend_ops::create_waitpoint_impl(&self.pool, handle, waitpoint_key, expires_in).await
540 }
541
542 #[cfg(feature = "core")]
543 #[tracing::instrument(name = "pg.read_waitpoint_token", skip_all)]
544 async fn read_waitpoint_token(
545 &self,
546 partition: PartitionKey,
547 waitpoint_id: &ff_core::types::WaitpointId,
548 ) -> Result<Option<String>, EngineError> {
549 suspend_ops::read_waitpoint_token_impl(&self.pool, &partition, waitpoint_id).await
550 }
551
552 #[tracing::instrument(name = "pg.observe_signals", skip_all)]
553 async fn observe_signals(
554 &self,
555 handle: &Handle,
556 ) -> Result<Vec<ResumeSignal>, EngineError> {
557 suspend_ops::observe_signals_impl(&self.pool, handle).await
558 }
559
560 #[tracing::instrument(name = "pg.claim_from_resume_grant", skip_all)]
561 async fn claim_from_resume_grant(
562 &self,
563 token: ResumeToken,
564 ) -> Result<Option<Handle>, EngineError> {
565 attempt::claim_from_resume_grant(&self.pool, token).await
566 }
567
568 #[tracing::instrument(name = "pg.issue_reclaim_grant", skip_all)]
571 async fn issue_reclaim_grant(
572 &self,
573 args: IssueReclaimGrantArgs,
574 ) -> Result<IssueReclaimGrantOutcome, EngineError> {
575 claim_grant::issue_reclaim_grant_impl(&self.pool, args).await
576 }
577
578 #[tracing::instrument(name = "pg.reclaim_execution", skip_all)]
579 async fn reclaim_execution(
580 &self,
581 args: ReclaimExecutionArgs,
582 ) -> Result<ReclaimExecutionOutcome, EngineError> {
583 claim_grant::reclaim_execution_impl(&self.pool, args).await
584 }
585
586 #[tracing::instrument(name = "pg.delay", skip_all)]
587 async fn delay(
588 &self,
589 handle: &Handle,
590 delay_until: TimestampMs,
591 ) -> Result<(), EngineError> {
592 attempt::delay(&self.pool, handle, delay_until).await
593 }
594
595 #[tracing::instrument(name = "pg.wait_children", skip_all)]
596 async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError> {
597 attempt::wait_children(&self.pool, handle).await
598 }
599
600 #[tracing::instrument(name = "pg.describe_execution", skip_all)]
603 async fn describe_execution(
604 &self,
605 id: &ExecutionId,
606 ) -> Result<Option<ExecutionSnapshot>, EngineError> {
607 exec_core::describe_execution_impl(&self.pool, &self.partition_config, id).await
608 }
609
610 #[tracing::instrument(name = "pg.read_execution_context", skip_all)]
611 async fn read_execution_context(
612 &self,
613 execution_id: &ExecutionId,
614 ) -> Result<ExecutionContext, EngineError> {
615 exec_core::read_execution_context_impl(&self.pool, &self.partition_config, execution_id)
616 .await
617 }
618
619 #[tracing::instrument(name = "pg.read_current_attempt_index", skip_all)]
620 async fn read_current_attempt_index(
621 &self,
622 execution_id: &ExecutionId,
623 ) -> Result<ff_core::types::AttemptIndex, EngineError> {
624 exec_core::read_current_attempt_index_impl(
625 &self.pool,
626 &self.partition_config,
627 execution_id,
628 )
629 .await
630 }
631
632 #[tracing::instrument(name = "pg.read_total_attempt_count", skip_all)]
633 async fn read_total_attempt_count(
634 &self,
635 execution_id: &ExecutionId,
636 ) -> Result<ff_core::types::AttemptIndex, EngineError> {
637 exec_core::read_total_attempt_count_impl(
638 &self.pool,
639 &self.partition_config,
640 execution_id,
641 )
642 .await
643 }
644
645 #[tracing::instrument(name = "pg.describe_flow", skip_all)]
646 async fn describe_flow(
647 &self,
648 id: &FlowId,
649 ) -> Result<Option<FlowSnapshot>, EngineError> {
650 flow::describe_flow(&self.pool, &self.partition_config, id).await
651 }
652
653 #[tracing::instrument(name = "pg.set_execution_tag", skip_all)]
654 async fn set_execution_tag(
655 &self,
656 execution_id: &ExecutionId,
657 key: &str,
658 value: &str,
659 ) -> Result<(), EngineError> {
660 ff_core::engine_backend::validate_tag_key(key)?;
661 exec_core::set_execution_tag_impl(&self.pool, execution_id, key, value).await
662 }
663
664 #[tracing::instrument(name = "pg.set_flow_tag", skip_all)]
665 async fn set_flow_tag(
666 &self,
667 flow_id: &FlowId,
668 key: &str,
669 value: &str,
670 ) -> Result<(), EngineError> {
671 ff_core::engine_backend::validate_tag_key(key)?;
672 flow::set_flow_tag_impl(&self.pool, &self.partition_config, flow_id, key, value).await
673 }
674
675 #[tracing::instrument(name = "pg.get_execution_tag", skip_all)]
676 async fn get_execution_tag(
677 &self,
678 execution_id: &ExecutionId,
679 key: &str,
680 ) -> Result<Option<String>, EngineError> {
681 ff_core::engine_backend::validate_tag_key(key)?;
682 exec_core::get_execution_tag_impl(&self.pool, execution_id, key).await
683 }
684
685 #[tracing::instrument(name = "pg.get_flow_tag", skip_all)]
686 async fn get_flow_tag(
687 &self,
688 flow_id: &FlowId,
689 key: &str,
690 ) -> Result<Option<String>, EngineError> {
691 ff_core::engine_backend::validate_tag_key(key)?;
692 flow::get_flow_tag_impl(&self.pool, &self.partition_config, flow_id, key).await
693 }
694
695 #[tracing::instrument(name = "pg.get_execution_namespace", skip_all)]
696 async fn get_execution_namespace(
697 &self,
698 execution_id: &ExecutionId,
699 ) -> Result<Option<String>, EngineError> {
700 exec_core::get_execution_namespace_impl(&self.pool, execution_id).await
701 }
702
703 #[cfg(feature = "core")]
704 #[tracing::instrument(name = "pg.list_edges", skip_all)]
705 async fn list_edges(
706 &self,
707 flow_id: &FlowId,
708 direction: EdgeDirection,
709 ) -> Result<Vec<EdgeSnapshot>, EngineError> {
710 flow::list_edges(&self.pool, &self.partition_config, flow_id, direction).await
711 }
712
713 #[cfg(feature = "core")]
714 #[tracing::instrument(name = "pg.describe_edge", skip_all)]
715 async fn describe_edge(
716 &self,
717 flow_id: &FlowId,
718 edge_id: &EdgeId,
719 ) -> Result<Option<EdgeSnapshot>, EngineError> {
720 flow::describe_edge(&self.pool, &self.partition_config, flow_id, edge_id).await
721 }
722
723 #[cfg(feature = "core")]
724 #[tracing::instrument(name = "pg.resolve_execution_flow_id", skip_all)]
725 async fn resolve_execution_flow_id(
726 &self,
727 eid: &ExecutionId,
728 ) -> Result<Option<FlowId>, EngineError> {
729 exec_core::resolve_execution_flow_id_impl(&self.pool, &self.partition_config, eid).await
730 }
731
732 #[cfg(feature = "core")]
733 #[tracing::instrument(name = "pg.list_flows", skip_all)]
734 async fn list_flows(
735 &self,
736 partition: PartitionKey,
737 cursor: Option<FlowId>,
738 limit: usize,
739 ) -> Result<ListFlowsPage, EngineError> {
740 flow::list_flows(&self.pool, partition, cursor, limit).await
741 }
742
743 #[cfg(feature = "core")]
744 #[tracing::instrument(name = "pg.list_lanes", skip_all)]
745 async fn list_lanes(
746 &self,
747 cursor: Option<LaneId>,
748 limit: usize,
749 ) -> Result<ListLanesPage, EngineError> {
750 admin::list_lanes_impl(&self.pool, cursor, limit).await
751 }
752
753 #[cfg(feature = "core")]
754 #[tracing::instrument(name = "pg.list_suspended", skip_all)]
755 async fn list_suspended(
756 &self,
757 partition: PartitionKey,
758 cursor: Option<ExecutionId>,
759 limit: usize,
760 ) -> Result<ListSuspendedPage, EngineError> {
761 admin::list_suspended_impl(&self.pool, partition, cursor, limit).await
762 }
763
764 #[cfg(feature = "core")]
765 #[tracing::instrument(name = "pg.list_executions", skip_all)]
766 async fn list_executions(
767 &self,
768 partition: PartitionKey,
769 cursor: Option<ExecutionId>,
770 limit: usize,
771 ) -> Result<ListExecutionsPage, EngineError> {
772 exec_core::list_executions_impl(
773 &self.pool,
774 &self.partition_config,
775 partition,
776 cursor,
777 limit,
778 )
779 .await
780 }
781
782 #[cfg(feature = "core")]
785 #[tracing::instrument(name = "pg.deliver_signal", skip_all)]
786 async fn deliver_signal(
787 &self,
788 args: DeliverSignalArgs,
789 ) -> Result<DeliverSignalResult, EngineError> {
790 suspend_ops::deliver_signal_impl(&self.pool, &self.partition_config, args).await
791 }
792
793 #[cfg(feature = "core")]
794 #[tracing::instrument(name = "pg.deliver_approval_signal", skip_all)]
795 async fn deliver_approval_signal(
796 &self,
797 args: DeliverApprovalSignalArgs,
798 ) -> Result<DeliverSignalResult, EngineError> {
799 suspend_ops::deliver_approval_signal_impl(&self.pool, &self.partition_config, args).await
800 }
801
802 #[cfg(feature = "core")]
803 #[tracing::instrument(name = "pg.claim_resumed_execution", skip_all)]
804 async fn claim_resumed_execution(
805 &self,
806 args: ClaimResumedExecutionArgs,
807 ) -> Result<ClaimResumedExecutionResult, EngineError> {
808 suspend_ops::claim_resumed_execution_impl(&self.pool, &self.partition_config, args).await
809 }
810
811 #[cfg(feature = "core")]
821 #[tracing::instrument(name = "pg.read_execution_state", skip_all)]
822 async fn read_execution_state(
823 &self,
824 id: &ExecutionId,
825 ) -> Result<Option<PublicState>, EngineError> {
826 exec_core::read_execution_state_impl(&self.pool, &self.partition_config, id).await
827 }
828
829 #[cfg(feature = "core")]
830 #[tracing::instrument(name = "pg.read_execution_info", skip_all)]
831 async fn read_execution_info(
832 &self,
833 id: &ExecutionId,
834 ) -> Result<Option<ExecutionInfo>, EngineError> {
835 exec_core::read_execution_info_impl(&self.pool, &self.partition_config, id).await
836 }
837
838 #[tracing::instrument(name = "pg.get_execution_result", skip_all)]
839 async fn get_execution_result(
840 &self,
841 id: &ExecutionId,
842 ) -> Result<Option<Vec<u8>>, EngineError> {
843 exec_core::get_execution_result_impl(&self.pool, &self.partition_config, id).await
844 }
845
846 #[cfg(feature = "core")]
855 #[tracing::instrument(name = "pg.list_pending_waitpoints", skip_all)]
856 async fn list_pending_waitpoints(
857 &self,
858 args: ListPendingWaitpointsArgs,
859 ) -> Result<ListPendingWaitpointsResult, EngineError> {
860 suspend_ops::list_pending_waitpoints_impl(&self.pool, args).await
861 }
862
863 #[cfg(feature = "core")]
871 #[tracing::instrument(name = "pg.cancel_execution", skip_all)]
872 async fn cancel_execution(
873 &self,
874 args: CancelExecutionArgs,
875 ) -> Result<CancelExecutionResult, EngineError> {
876 operator::cancel_execution_impl(&self.pool, args).await
877 }
878
879 #[cfg(feature = "core")]
880 #[tracing::instrument(name = "pg.revoke_lease", skip_all)]
881 async fn revoke_lease(
882 &self,
883 args: RevokeLeaseArgs,
884 ) -> Result<RevokeLeaseResult, EngineError> {
885 operator::revoke_lease_impl(&self.pool, args).await
886 }
887
888 #[cfg(feature = "core")]
898 #[tracing::instrument(name = "pg.change_priority", skip_all)]
899 async fn change_priority(
900 &self,
901 args: ChangePriorityArgs,
902 ) -> Result<ChangePriorityResult, EngineError> {
903 operator::change_priority_impl(&self.pool, args).await
904 }
905
906 #[cfg(feature = "core")]
907 #[tracing::instrument(name = "pg.replay_execution", skip_all)]
908 async fn replay_execution(
909 &self,
910 args: ReplayExecutionArgs,
911 ) -> Result<ReplayExecutionResult, EngineError> {
912 operator::replay_execution_impl(&self.pool, args).await
913 }
914
915 #[cfg(feature = "core")]
916 #[tracing::instrument(name = "pg.cancel_flow_header", skip_all)]
917 async fn cancel_flow_header(
918 &self,
919 args: CancelFlowArgs,
920 ) -> Result<CancelFlowHeader, EngineError> {
921 operator::cancel_flow_header_impl(&self.pool, &self.partition_config, args).await
922 }
923
924 #[cfg(feature = "core")]
925 #[tracing::instrument(name = "pg.ack_cancel_member", skip_all)]
926 async fn ack_cancel_member(
927 &self,
928 flow_id: &FlowId,
929 execution_id: &ExecutionId,
930 ) -> Result<(), EngineError> {
931 operator::ack_cancel_member_impl(
932 &self.pool,
933 &self.partition_config,
934 flow_id.clone(),
935 execution_id.clone(),
936 )
937 .await
938 }
939
940 #[cfg(feature = "core")]
951 #[tracing::instrument(name = "pg.create_execution.trait", skip_all)]
952 async fn create_execution(
953 &self,
954 args: CreateExecutionArgs,
955 ) -> Result<CreateExecutionResult, EngineError> {
956 let eid = args.execution_id.clone();
957 exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await?;
958 Ok(CreateExecutionResult::Created {
959 execution_id: eid,
960 public_state: PublicState::Waiting,
961 })
962 }
963
964 #[cfg(feature = "core")]
965 #[tracing::instrument(name = "pg.create_flow", skip_all)]
966 async fn create_flow(
967 &self,
968 args: CreateFlowArgs,
969 ) -> Result<CreateFlowResult, EngineError> {
970 flow_staging::create_flow(&self.pool, &self.partition_config, &args).await
971 }
972
973 #[cfg(feature = "core")]
974 #[tracing::instrument(name = "pg.add_execution_to_flow", skip_all)]
975 async fn add_execution_to_flow(
976 &self,
977 args: AddExecutionToFlowArgs,
978 ) -> Result<AddExecutionToFlowResult, EngineError> {
979 flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, &args).await
980 }
981
982 #[cfg(feature = "core")]
983 #[tracing::instrument(name = "pg.stage_dependency_edge", skip_all)]
984 async fn stage_dependency_edge(
985 &self,
986 args: StageDependencyEdgeArgs,
987 ) -> Result<StageDependencyEdgeResult, EngineError> {
988 flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, &args).await
989 }
990
991 #[cfg(feature = "core")]
992 #[tracing::instrument(name = "pg.apply_dependency_to_child", skip_all)]
993 async fn apply_dependency_to_child(
994 &self,
995 args: ApplyDependencyToChildArgs,
996 ) -> Result<ApplyDependencyToChildResult, EngineError> {
997 flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, &args).await
998 }
999
1000 #[cfg(feature = "core")]
1006 async fn cascade_completion(
1007 &self,
1008 payload: &ff_core::backend::CompletionPayload,
1009 ) -> Result<ff_core::contracts::CascadeOutcome, EngineError> {
1010 let event_id = match resolve_event_id(&self.pool, payload).await {
1011 Some(id) => id,
1012 None => {
1013 tracing::warn!(
1016 execution_id = %payload.execution_id,
1017 produced_at_ms = payload.produced_at_ms.0,
1018 "pg.cascade_completion: could not resolve event_id; reconciler will claim"
1019 );
1020 return Ok(ff_core::contracts::CascadeOutcome::async_dispatched(0));
1021 }
1022 };
1023 let outcome = crate::dispatch::dispatch_completion(&self.pool, event_id).await?;
1024 let advanced = match outcome {
1025 crate::dispatch::DispatchOutcome::NoOp => 0,
1026 crate::dispatch::DispatchOutcome::Advanced(n) => n,
1027 };
1028 Ok(ff_core::contracts::CascadeOutcome::async_dispatched(advanced))
1029 }
1030
1031 fn backend_label(&self) -> &'static str {
1032 "postgres"
1033 }
1034
1035 fn capabilities(&self) -> ff_core::capability::Capabilities {
1043 ff_core::capability::Capabilities::new(
1044 ff_core::capability::BackendIdentity::new(
1045 "postgres",
1046 ff_core::capability::Version::new(0, 11, 0),
1047 "E-shipped",
1048 ),
1049 postgres_supports_base(),
1050 )
1051 }
1052
1053 async fn prepare(
1061 &self,
1062 ) -> Result<ff_core::backend::PrepareOutcome, EngineError> {
1063 Ok(ff_core::backend::PrepareOutcome::NoOp)
1064 }
1065
1066 #[cfg(feature = "core")]
1074 #[tracing::instrument(name = "pg.claim_for_worker", skip_all)]
1075 async fn claim_for_worker(
1076 &self,
1077 args: ff_core::contracts::ClaimForWorkerArgs,
1078 ) -> Result<ff_core::contracts::ClaimForWorkerOutcome, EngineError> {
1079 let sched = scheduler::PostgresScheduler::new(self.pool.clone());
1080 let grant_opt = sched
1081 .claim_for_worker(
1082 &args.lane_id,
1083 &args.worker_id,
1084 &args.worker_instance_id,
1085 &args.worker_capabilities,
1086 args.grant_ttl_ms,
1087 )
1088 .await?;
1089 Ok(match grant_opt {
1090 Some(g) => ff_core::contracts::ClaimForWorkerOutcome::granted(g),
1091 None => ff_core::contracts::ClaimForWorkerOutcome::no_work(),
1092 })
1093 }
1094
1095 async fn ping(&self) -> Result<(), EngineError> {
1096 let _ = sqlx::query_scalar::<_, i32>("SELECT 1")
1100 .fetch_one(&self.pool)
1101 .await
1102 .map_err(error::map_sqlx_error)?;
1103 Ok(())
1104 }
1105
1106 async fn shutdown_prepare(&self, grace: Duration) -> Result<(), EngineError> {
1111 #[cfg(feature = "core")]
1112 if let Some(handle) = self.scanner_handle.as_ref() {
1113 let timed_out = handle.shutdown(grace).await;
1114 if timed_out > 0 {
1115 tracing::warn!(
1116 timed_out,
1117 ?grace,
1118 "postgres scanner supervisor exceeded grace on shutdown"
1119 );
1120 }
1121 }
1122 Ok(())
1123 }
1124
1125 #[tracing::instrument(name = "pg.cancel_flow", skip_all)]
1126 async fn cancel_flow(
1127 &self,
1128 id: &FlowId,
1129 policy: CancelFlowPolicy,
1130 wait: CancelFlowWait,
1131 ) -> Result<CancelFlowResult, EngineError> {
1132 let result = flow::cancel_flow(&self.pool, &self.partition_config, id, policy).await?;
1133 if let Some(deadline) = ff_core::engine_backend::cancel_flow_wait_deadline(wait) {
1134 ff_core::engine_backend::wait_for_flow_cancellation(self, id, deadline).await?;
1135 }
1136 Ok(result)
1137 }
1138
1139 #[cfg(feature = "core")]
1140 #[tracing::instrument(name = "pg.set_edge_group_policy", skip_all)]
1141 async fn set_edge_group_policy(
1142 &self,
1143 flow_id: &FlowId,
1144 downstream_execution_id: &ExecutionId,
1145 policy: EdgeDependencyPolicy,
1146 ) -> Result<SetEdgeGroupPolicyResult, EngineError> {
1147 flow::set_edge_group_policy(
1148 &self.pool,
1149 &self.partition_config,
1150 flow_id,
1151 downstream_execution_id,
1152 policy,
1153 )
1154 .await
1155 }
1156
1157 #[tracing::instrument(name = "pg.report_usage", skip_all)]
1160 async fn report_usage(
1161 &self,
1162 _handle: &Handle,
1163 budget: &BudgetId,
1164 dimensions: UsageDimensions,
1165 ) -> Result<ReportUsageResult, EngineError> {
1166 budget::report_usage_impl(&self.pool, &self.partition_config, budget, dimensions).await
1167 }
1168
1169 #[cfg(feature = "core")]
1177 #[tracing::instrument(name = "pg.create_budget", skip_all)]
1178 async fn create_budget(
1179 &self,
1180 args: CreateBudgetArgs,
1181 ) -> Result<CreateBudgetResult, EngineError> {
1182 budget::create_budget_impl(&self.pool, &self.partition_config, args).await
1183 }
1184
1185 #[cfg(feature = "core")]
1186 #[tracing::instrument(name = "pg.reset_budget", skip_all)]
1187 async fn reset_budget(
1188 &self,
1189 args: ResetBudgetArgs,
1190 ) -> Result<ResetBudgetResult, EngineError> {
1191 budget::reset_budget_impl(&self.pool, &self.partition_config, args).await
1192 }
1193
1194 #[cfg(feature = "core")]
1195 #[tracing::instrument(name = "pg.create_quota_policy", skip_all)]
1196 async fn create_quota_policy(
1197 &self,
1198 args: CreateQuotaPolicyArgs,
1199 ) -> Result<CreateQuotaPolicyResult, EngineError> {
1200 budget::create_quota_policy_impl(&self.pool, &self.partition_config, args).await
1201 }
1202
1203 #[cfg(feature = "core")]
1204 #[tracing::instrument(name = "pg.get_budget_status", skip_all)]
1205 async fn get_budget_status(
1206 &self,
1207 id: &BudgetId,
1208 ) -> Result<BudgetStatus, EngineError> {
1209 budget::get_budget_status_impl(&self.pool, &self.partition_config, id).await
1210 }
1211
1212 #[cfg(feature = "core")]
1213 #[tracing::instrument(name = "pg.report_usage_admin", skip_all)]
1214 async fn report_usage_admin(
1215 &self,
1216 budget_id: &BudgetId,
1217 args: ReportUsageAdminArgs,
1218 ) -> Result<ReportUsageResult, EngineError> {
1219 budget::report_usage_admin_impl(&self.pool, &self.partition_config, budget_id, args).await
1220 }
1221
1222 #[cfg(feature = "core")]
1225 #[tracing::instrument(name = "pg.record_spend", skip_all)]
1226 async fn record_spend(
1227 &self,
1228 args: ff_core::contracts::RecordSpendArgs,
1229 ) -> Result<ReportUsageResult, EngineError> {
1230 budget::record_spend_impl(&self.pool, &self.partition_config, args).await
1231 }
1232
1233 #[cfg(feature = "core")]
1234 #[tracing::instrument(name = "pg.release_budget", skip_all)]
1235 async fn release_budget(
1236 &self,
1237 args: ff_core::contracts::ReleaseBudgetArgs,
1238 ) -> Result<(), EngineError> {
1239 budget::release_budget_impl(&self.pool, &self.partition_config, args).await
1240 }
1241
1242 #[tracing::instrument(name = "pg.rotate_waitpoint_hmac_secret_all", skip_all)]
1249 async fn rotate_waitpoint_hmac_secret_all(
1250 &self,
1251 args: RotateWaitpointHmacSecretAllArgs,
1252 ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
1253 let now_ms = std::time::SystemTime::now()
1258 .duration_since(std::time::UNIX_EPOCH)
1259 .map(|d| d.as_millis() as i64)
1260 .unwrap_or(0);
1261 signal::rotate_waitpoint_hmac_secret_all_impl(&self.pool, args, now_ms).await
1262 }
1263
1264 #[tracing::instrument(name = "pg.seed_waitpoint_hmac_secret", skip_all)]
1265 async fn seed_waitpoint_hmac_secret(
1266 &self,
1267 args: SeedWaitpointHmacSecretArgs,
1268 ) -> Result<SeedOutcome, EngineError> {
1269 let now_ms = std::time::SystemTime::now()
1273 .duration_since(std::time::UNIX_EPOCH)
1274 .map(|d| d.as_millis() as i64)
1275 .unwrap_or(0);
1276 signal::seed_waitpoint_hmac_secret_impl(&self.pool, args, now_ms).await
1277 }
1278
1279 #[cfg(feature = "streaming")]
1282 #[tracing::instrument(name = "pg.read_stream", skip_all)]
1283 async fn read_stream(
1284 &self,
1285 execution_id: &ExecutionId,
1286 attempt_index: AttemptIndex,
1287 from: StreamCursor,
1288 to: StreamCursor,
1289 count_limit: u64,
1290 ) -> Result<StreamFrames, EngineError> {
1291 stream::read_stream(&self.pool, execution_id, attempt_index, from, to, count_limit).await
1292 }
1293
1294 #[cfg(feature = "streaming")]
1295 #[tracing::instrument(name = "pg.tail_stream", skip_all)]
1296 async fn tail_stream(
1297 &self,
1298 execution_id: &ExecutionId,
1299 attempt_index: AttemptIndex,
1300 after: StreamCursor,
1301 block_ms: u64,
1302 count_limit: u64,
1303 visibility: TailVisibility,
1304 ) -> Result<StreamFrames, EngineError> {
1305 let notifier = self
1306 .stream_notifier
1307 .as_ref()
1308 .ok_or(EngineError::Unavailable {
1309 op: "pg.tail_stream (notifier not initialised)",
1310 })?;
1311 stream::tail_stream(
1312 &self.pool,
1313 notifier,
1314 execution_id,
1315 attempt_index,
1316 after,
1317 block_ms,
1318 count_limit,
1319 visibility,
1320 )
1321 .await
1322 }
1323
1324 #[cfg(feature = "streaming")]
1325 #[tracing::instrument(name = "pg.read_summary", skip_all)]
1326 async fn read_summary(
1327 &self,
1328 execution_id: &ExecutionId,
1329 attempt_index: AttemptIndex,
1330 ) -> Result<Option<SummaryDocument>, EngineError> {
1331 stream::read_summary(&self.pool, execution_id, attempt_index).await
1332 }
1333
1334 #[tracing::instrument(name = "pg.subscribe_completion", skip_all)]
1348 async fn subscribe_completion(
1349 &self,
1350 _cursor: ff_core::stream_subscribe::StreamCursor,
1351 filter: &ff_core::backend::ScannerFilter,
1352 ) -> Result<ff_core::stream_events::CompletionSubscription, EngineError> {
1353 use ff_core::stream_events::{CompletionEvent, CompletionOutcome};
1354 use ff_core::stream_subscribe::encode_postgres_event_cursor;
1355 use futures_core::Stream;
1356 use std::pin::Pin;
1357 use std::task::{Context, Poll};
1358
1359 let inner = if filter.is_noop() {
1366 ff_core::completion_backend::CompletionBackend::subscribe_completions(self).await?
1367 } else {
1368 ff_core::completion_backend::CompletionBackend::subscribe_completions_filtered(
1369 self, filter,
1370 )
1371 .await?
1372 };
1373
1374 struct Adapter {
1375 inner: ff_core::completion_backend::CompletionStream,
1376 }
1377
1378 impl Stream for Adapter {
1379 type Item = Result<CompletionEvent, EngineError>;
1380 fn poll_next(
1381 mut self: Pin<&mut Self>,
1382 cx: &mut Context<'_>,
1383 ) -> Poll<Option<Self::Item>> {
1384 match Pin::new(&mut self.inner).poll_next(cx) {
1385 Poll::Pending => Poll::Pending,
1386 Poll::Ready(None) => Poll::Ready(None),
1387 Poll::Ready(Some(payload)) => {
1388 let cursor = encode_postgres_event_cursor(0);
1393 let event = CompletionEvent::new(
1394 cursor,
1395 payload.execution_id.clone(),
1396 CompletionOutcome::from_wire(&payload.outcome),
1397 payload.produced_at_ms,
1398 );
1399 Poll::Ready(Some(Ok(event)))
1400 }
1401 }
1402 }
1403 }
1404
1405 Ok(Box::pin(Adapter { inner }))
1406 }
1407
1408 #[tracing::instrument(name = "pg.subscribe_lease_history", skip_all)]
1422 async fn subscribe_lease_history(
1423 &self,
1424 cursor: ff_core::stream_subscribe::StreamCursor,
1425 filter: &ff_core::backend::ScannerFilter,
1426 ) -> Result<ff_core::stream_events::LeaseHistorySubscription, EngineError> {
1427 lease_event_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
1428 }
1429
1430 #[tracing::instrument(name = "pg.subscribe_signal_delivery", skip_all)]
1440 async fn subscribe_signal_delivery(
1441 &self,
1442 cursor: ff_core::stream_subscribe::StreamCursor,
1443 filter: &ff_core::backend::ScannerFilter,
1444 ) -> Result<ff_core::stream_events::SignalDeliverySubscription, EngineError> {
1445 signal_delivery_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
1446 }
1447
1448 #[cfg(feature = "core")]
1470 async fn mark_lease_expired_if_due(
1471 &self,
1472 partition: Partition,
1473 execution_id: &ExecutionId,
1474 ) -> Result<(), EngineError> {
1475 let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1476 let partition_key = partition_index_to_i16(partition)?;
1477 reconcilers::lease_expiry::release_for_execution(&self.pool, partition_key, exec_uuid)
1478 .await
1479 }
1480
1481 #[cfg(feature = "core")]
1482 async fn promote_delayed(
1483 &self,
1484 partition: Partition,
1485 _lane: &LaneId,
1486 execution_id: &ExecutionId,
1487 now_ms: TimestampMs,
1488 ) -> Result<(), EngineError> {
1489 let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1490 let partition_key = partition_index_to_i16(partition)?;
1491 reconcilers::delayed_promoter::promote_for_execution(
1497 &self.pool,
1498 partition_key,
1499 exec_uuid,
1500 now_ms.0,
1501 )
1502 .await
1503 }
1504
1505 #[cfg(feature = "core")]
1506 async fn close_waitpoint(
1507 &self,
1508 partition: Partition,
1509 _execution_id: &ExecutionId,
1510 waitpoint_id: &str,
1511 now_ms: TimestampMs,
1512 ) -> Result<(), EngineError> {
1513 let partition_key = partition_index_to_i16(partition)?;
1519 let waitpoint_uuid = uuid::Uuid::parse_str(waitpoint_id).map_err(|e| {
1520 EngineError::Validation {
1521 kind: ff_core::engine_error::ValidationKind::InvalidInput,
1522 detail: format!("waitpoint_id not a UUID: {e}"),
1523 }
1524 })?;
1525 reconcilers::pending_wp_expiry::close_for_execution(
1526 &self.pool,
1527 partition_key,
1528 waitpoint_uuid,
1529 now_ms.0,
1530 )
1531 .await
1532 }
1533
1534 #[cfg(feature = "core")]
1535 async fn expire_execution(
1536 &self,
1537 partition: Partition,
1538 execution_id: &ExecutionId,
1539 phase: ExpirePhase,
1540 now_ms: TimestampMs,
1541 ) -> Result<(), EngineError> {
1542 let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1543 let partition_key = partition_index_to_i16(partition)?;
1544 match phase {
1545 ExpirePhase::AttemptTimeout => {
1546 reconcilers::attempt_timeout::expire_for_execution(
1547 &self.pool,
1548 partition_key,
1549 exec_uuid,
1550 )
1551 .await
1552 }
1553 ExpirePhase::ExecutionDeadline => {
1554 reconcilers::execution_deadline::expire_for_execution(
1555 &self.pool,
1556 partition_key,
1557 exec_uuid,
1558 now_ms.0,
1559 )
1560 .await
1561 }
1562 }
1563 }
1564
1565 #[cfg(feature = "core")]
1566 async fn expire_suspension(
1567 &self,
1568 partition: Partition,
1569 execution_id: &ExecutionId,
1570 _now_ms: TimestampMs,
1571 ) -> Result<(), EngineError> {
1572 let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1573 let partition_key = partition_index_to_i16(partition)?;
1574 reconcilers::suspension_timeout::expire_for_execution(
1575 &self.pool,
1576 partition_key,
1577 exec_uuid,
1578 )
1579 .await
1580 }
1581
1582 #[cfg(feature = "core")]
1587 async fn project_flow_summary(
1588 &self,
1589 partition: Partition,
1590 flow_id: &FlowId,
1591 now_ms: TimestampMs,
1592 ) -> Result<bool, EngineError> {
1593 let partition_key = partition_index_to_i16(partition)?;
1594 let flow_uuid: sqlx::types::Uuid = flow_id.0;
1595 flow::project_flow_summary_impl(
1596 &self.pool,
1597 partition_key,
1598 flow_uuid,
1599 now_ms.0,
1600 )
1601 .await
1602 }
1603
1604 #[cfg(feature = "core")]
1609 async fn trim_retention(
1610 &self,
1611 partition: Partition,
1612 lane_id: &LaneId,
1613 retention_ms: u64,
1614 now_ms: TimestampMs,
1615 batch_size: u32,
1616 filter: &ff_core::backend::ScannerFilter,
1617 ) -> Result<u32, EngineError> {
1618 let partition_key = partition_index_to_i16(partition)?;
1619 exec_core::trim_retention_impl(
1620 &self.pool,
1621 partition_key,
1622 lane_id.as_str(),
1623 retention_ms,
1624 now_ms.0,
1625 batch_size,
1626 filter,
1627 )
1628 .await
1629 }
1630
1631 #[cfg(feature = "core")]
1634 async fn renew_lease(
1635 &self,
1636 args: ff_core::contracts::RenewLeaseArgs,
1637 ) -> Result<ff_core::contracts::RenewLeaseResult, EngineError> {
1638 crate::typed_ops::renew_lease(self.pool(), args).await
1639 }
1640
1641 #[cfg(feature = "core")]
1642 async fn complete_execution(
1643 &self,
1644 args: ff_core::contracts::CompleteExecutionArgs,
1645 ) -> Result<ff_core::contracts::CompleteExecutionResult, EngineError> {
1646 crate::typed_ops::complete_execution(self.pool(), args).await
1647 }
1648
1649 #[cfg(feature = "core")]
1650 async fn fail_execution(
1651 &self,
1652 args: ff_core::contracts::FailExecutionArgs,
1653 ) -> Result<ff_core::contracts::FailExecutionResult, EngineError> {
1654 crate::typed_ops::fail_execution(self.pool(), args).await
1655 }
1656
1657 #[cfg(feature = "core")]
1658 async fn resume_execution(
1659 &self,
1660 args: ff_core::contracts::ResumeExecutionArgs,
1661 ) -> Result<ff_core::contracts::ResumeExecutionResult, EngineError> {
1662 crate::typed_ops::resume_execution(self.pool(), args).await
1663 }
1664
1665 #[cfg(feature = "core")]
1666 async fn check_admission(
1667 &self,
1668 quota_policy_id: &ff_core::types::QuotaPolicyId,
1669 _dimension: &str,
1670 args: ff_core::contracts::CheckAdmissionArgs,
1671 ) -> Result<ff_core::contracts::CheckAdmissionResult, EngineError> {
1672 crate::typed_ops::check_admission(
1673 self.pool(),
1674 &self.partition_config,
1675 quota_policy_id,
1676 args,
1677 )
1678 .await
1679 }
1680
1681 #[cfg(feature = "core")]
1682 async fn evaluate_flow_eligibility(
1683 &self,
1684 args: ff_core::contracts::EvaluateFlowEligibilityArgs,
1685 ) -> Result<ff_core::contracts::EvaluateFlowEligibilityResult, EngineError> {
1686 crate::typed_ops::evaluate_flow_eligibility(self.pool(), args).await
1687 }
1688
1689 #[cfg(feature = "core")]
1690 async fn claim_execution(
1691 &self,
1692 args: ff_core::contracts::ClaimExecutionArgs,
1693 ) -> Result<ff_core::contracts::ClaimExecutionResult, EngineError> {
1694 crate::typed_ops::claim_execution(self.pool(), &self.partition_config, args).await
1695 }
1696
1697 #[cfg(feature = "core")]
1701 #[tracing::instrument(name = "pg.issue_grant_and_claim", skip_all)]
1702 async fn issue_grant_and_claim(
1703 &self,
1704 args: ff_core::contracts::IssueGrantAndClaimArgs,
1705 ) -> Result<ff_core::contracts::ClaimGrantOutcome, EngineError> {
1706 crate::typed_ops::issue_grant_and_claim(self.pool(), &self.partition_config, args).await
1707 }
1708
1709 async fn read_exec_core_fields(
1712 &self,
1713 partition: ff_core::partition::Partition,
1714 execution_id: &ff_core::types::ExecutionId,
1715 fields: &[&str],
1716 ) -> Result<std::collections::HashMap<String, Option<String>>, EngineError> {
1717 if fields.is_empty() {
1718 return Ok(std::collections::HashMap::new());
1719 }
1720 let derived: u16 = execution_id.partition();
1725 if partition.index != derived {
1726 return Err(EngineError::Validation {
1727 kind: ff_core::engine_error::ValidationKind::InvalidInput,
1728 detail: format!(
1729 "read_exec_core_fields: partition mismatch (arg={}, eid={})",
1730 partition.index, derived
1731 ),
1732 });
1733 }
1734 let partition_key: i16 = partition.index as i16;
1735 let exec_uuid = crate::exec_core::eid_uuid(execution_id);
1736
1737 let mut projections: Vec<String> = Vec::with_capacity(fields.len());
1757 for field in fields {
1758 let expr = match *field {
1759 "lane_id" | "lifecycle_phase" | "ownership_state" | "eligibility_state"
1761 | "public_state" | "attempt_state" | "blocking_reason" | "cancellation_reason"
1762 | "cancelled_by" => format!("{f}::text", f = field),
1763 "attempt_index" => "attempt_index::text".to_string(),
1764 "flow_id" => "flow_id::text".to_string(),
1765 "priority" => "priority::text".to_string(),
1766 "created_at_ms" => "created_at_ms::text".to_string(),
1767 "terminal_at_ms" => "terminal_at_ms::text".to_string(),
1768 "deadline_at_ms" => "deadline_at_ms::text".to_string(),
1769 "current_attempt_index" => "attempt_index::text".to_string(),
1772 "completed_at" => "terminal_at_ms::text".to_string(),
1773 "cancel_reason" => "cancellation_reason::text".to_string(),
1774 "required_capabilities" => {
1777 "array_to_string(required_capabilities, ',')".to_string()
1778 }
1779 other => {
1781 match other {
1784 "current_waitpoint_id"
1785 | "current_worker_instance_id"
1786 | "budget_ids"
1787 | "quota_policy_id" => format!("raw_fields ->> '{other}'"),
1788 _ => "NULL".to_string(),
1789 }
1790 }
1791 };
1792 projections.push(expr);
1793 }
1794 let projection_sql = projections.join(", ");
1795 let query = format!(
1796 "SELECT {projection_sql} FROM ff_exec_core \
1797 WHERE partition_key = $1 AND execution_id = $2"
1798 );
1799 let row_opt = sqlx::query(&query)
1800 .bind(partition_key)
1801 .bind(exec_uuid)
1802 .fetch_optional(self.pool())
1803 .await
1804 .map_err(|e| EngineError::Transport {
1805 backend: "postgres",
1806 source: format!("read_exec_core_fields: {e}").into(),
1807 })?;
1808
1809 let mut out = std::collections::HashMap::with_capacity(fields.len());
1810 if let Some(row) = row_opt {
1811 use sqlx::Row;
1812 for (idx, field) in fields.iter().enumerate() {
1813 let val: Option<String> =
1814 row.try_get(idx).map_err(|e| EngineError::Transport {
1815 backend: "postgres",
1816 source: format!("read_exec_core_fields[{field}]: {e}").into(),
1817 })?;
1818 out.insert((*field).to_string(), val);
1819 }
1820 } else {
1821 for field in fields {
1822 out.insert((*field).to_string(), None);
1823 }
1824 }
1825 Ok(out)
1826 }
1827
1828 async fn server_time_ms(&self) -> Result<u64, EngineError> {
1831 let ms: i64 = sqlx::query_scalar(
1836 "SELECT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint",
1837 )
1838 .fetch_one(self.pool())
1839 .await
1840 .map_err(|e| EngineError::Transport {
1841 backend: "postgres",
1842 source: format!("server_time_ms: {e}").into(),
1843 })?;
1844 if ms < 0 {
1845 return Err(EngineError::Transport {
1846 backend: "postgres",
1847 source: "server_time_ms: negative epoch".into(),
1848 });
1849 }
1850 Ok(ms as u64)
1851 }
1852
1853 #[cfg(feature = "core")]
1860 #[tracing::instrument(name = "pg.register_worker", skip_all)]
1861 async fn register_worker(
1862 &self,
1863 args: ff_core::contracts::RegisterWorkerArgs,
1864 ) -> Result<ff_core::contracts::RegisterWorkerOutcome, EngineError> {
1865 worker_registry::register_worker(&self.pool, args).await
1866 }
1867
1868 #[cfg(feature = "core")]
1869 #[tracing::instrument(name = "pg.heartbeat_worker", skip_all)]
1870 async fn heartbeat_worker(
1871 &self,
1872 args: ff_core::contracts::HeartbeatWorkerArgs,
1873 ) -> Result<ff_core::contracts::HeartbeatWorkerOutcome, EngineError> {
1874 worker_registry::heartbeat_worker(&self.pool, args).await
1875 }
1876
1877 #[cfg(feature = "core")]
1878 #[tracing::instrument(name = "pg.mark_worker_dead", skip_all)]
1879 async fn mark_worker_dead(
1880 &self,
1881 args: ff_core::contracts::MarkWorkerDeadArgs,
1882 ) -> Result<ff_core::contracts::MarkWorkerDeadOutcome, EngineError> {
1883 worker_registry::mark_worker_dead(&self.pool, args).await
1884 }
1885
1886 #[cfg(all(feature = "core", feature = "suspension"))]
1891 #[tracing::instrument(name = "pg.list_expired_leases", skip_all)]
1892 async fn list_expired_leases(
1893 &self,
1894 args: ff_core::contracts::ListExpiredLeasesArgs,
1895 ) -> Result<ff_core::contracts::ListExpiredLeasesResult, EngineError> {
1896 worker_registry::list_expired_leases(&self.pool, args).await
1897 }
1898
1899 #[cfg(feature = "core")]
1900 #[tracing::instrument(name = "pg.list_workers", skip_all)]
1901 async fn list_workers(
1902 &self,
1903 args: ff_core::contracts::ListWorkersArgs,
1904 ) -> Result<ff_core::contracts::ListWorkersResult, EngineError> {
1905 worker_registry::list_workers(&self.pool, args).await
1906 }
1907}
1908
1909async fn resolve_event_id(
1928 pool: &PgPool,
1929 payload: &ff_core::backend::CompletionPayload,
1930) -> Option<i64> {
1931 let eid_str = payload.execution_id.as_str();
1932 let uuid_str = eid_str.rsplit_once(':').map(|(_, u)| u)?;
1935 let uuid = uuid::Uuid::parse_str(uuid_str).ok()?;
1936 let partition_key = i16::try_from(payload.execution_id.partition()).ok()?;
1940 let occurred_at_ms = payload.produced_at_ms.0;
1941
1942 match sqlx::query_scalar::<_, i64>(
1943 "SELECT event_id FROM ff_completion_event \
1944 WHERE partition_key = $1 AND execution_id = $2 AND occurred_at_ms = $3 \
1945 ORDER BY event_id ASC LIMIT 1",
1946 )
1947 .bind(partition_key)
1948 .bind(uuid)
1949 .bind(occurred_at_ms)
1950 .fetch_optional(pool)
1951 .await
1952 {
1953 Ok(row) => row,
1954 Err(err) => {
1955 tracing::warn!(
1956 partition_key,
1957 execution_id = %uuid,
1958 occurred_at_ms,
1959 error = %err,
1960 "resolve_event_id: ff_completion_event lookup failed; falling back to \
1961 dependency_reconciler backstop"
1962 );
1963 None
1964 }
1965 }
1966}
1967
1968fn partition_index_to_i16(partition: Partition) -> Result<i16, EngineError> {
1977 i16::try_from(partition.index).map_err(|_| EngineError::Validation {
1978 kind: ff_core::engine_error::ValidationKind::InvalidInput,
1979 detail: format!(
1980 "partition index {} exceeds i16 range (max {})",
1981 partition.index,
1982 i16::MAX
1983 ),
1984 })
1985}
1986
/// Threshold for the server's `max_locks_per_transaction` setting, as reported
/// by `SHOW max_locks_per_transaction`.
///
/// Values strictly below this cause `emit_max_locks_decision` to log a warning
/// that recommends raising the setting.
const MIN_MAX_LOCKS_PER_TRANSACTION: i64 = 256;
1996
1997async fn warn_if_max_locks_low(pool: &PgPool) {
2002 let row: Result<(String,), sqlx::Error> =
2003 sqlx::query_as("SHOW max_locks_per_transaction")
2004 .fetch_one(pool)
2005 .await;
2006 match row {
2007 Ok((raw,)) => emit_max_locks_decision(&raw),
2008 Err(e) => {
2009 tracing::debug!("failed to probe max_locks_per_transaction: {e}");
2010 }
2011 }
2012}
2013
2014fn max_locks_warn_value(raw: &str) -> Option<i64> {
2020 match raw.parse::<i64>() {
2021 Ok(v) if v < MIN_MAX_LOCKS_PER_TRANSACTION => Some(v),
2022 Ok(_) => None,
2023 Err(e) => {
2024 tracing::debug!(raw, "failed to parse max_locks_per_transaction: {e}");
2025 None
2026 }
2027 }
2028}
2029
2030fn emit_max_locks_decision(raw: &str) {
2031 if let Some(v) = max_locks_warn_value(raw) {
2032 tracing::warn!(
2033 current = v,
2034 recommended = MIN_MAX_LOCKS_PER_TRANSACTION,
2035 "postgres max_locks_per_transaction={v} is below the recommended \
2036 minimum ({MIN_MAX_LOCKS_PER_TRANSACTION}); partition-heavy workloads \
2037 may hit 'out of shared memory' under concurrent load. \
2038 See docs/operator-guide-postgres.md."
2039 );
2040 }
2041}
2042
// Unit tests for the threshold decision in `max_locks_warn_value`.
#[cfg(test)]
mod max_locks_tests {
    use super::{max_locks_warn_value, MIN_MAX_LOCKS_PER_TRANSACTION};

    #[test]
    fn warns_when_below_threshold() {
        let just_below = MIN_MAX_LOCKS_PER_TRANSACTION - 1;
        for (raw, expected) in [("64".to_string(), 64), (just_below.to_string(), just_below)] {
            assert_eq!(max_locks_warn_value(&raw), Some(expected), "raw = {raw}");
        }
    }

    #[test]
    fn silent_at_or_above_threshold() {
        for raw in [MIN_MAX_LOCKS_PER_TRANSACTION.to_string(), "1024".to_string()] {
            assert_eq!(max_locks_warn_value(&raw), None, "raw = {raw}");
        }
    }

    #[test]
    fn silent_for_unparseable_raw() {
        assert!(max_locks_warn_value("not-a-number").is_none());
    }
}
2070
// Unit tests for the checked `u16` -> `i16` narrowing in `partition_index_to_i16`.
#[cfg(test)]
mod partition_index_tests {
    use super::partition_index_to_i16;
    use ff_core::engine_error::{EngineError, ValidationKind};
    use ff_core::partition::{Partition, PartitionFamily};

    fn partition(family: PartitionFamily, index: u16) -> Partition {
        Partition { family, index }
    }

    #[test]
    fn accepts_values_within_i16_range() {
        let cases = [
            (PartitionFamily::Flow, 0_u16, 0_i16),
            (PartitionFamily::Flow, 255, 255),
            (PartitionFamily::Budget, i16::MAX as u16, i16::MAX),
        ];
        for (family, index, expected) in cases {
            assert_eq!(partition_index_to_i16(partition(family, index)).unwrap(), expected);
        }
    }

    #[test]
    fn rejects_overflow_above_i16_max() {
        let first_overflow = (i16::MAX as u16) + 1;
        match partition_index_to_i16(partition(PartitionFamily::Flow, first_overflow)) {
            Err(EngineError::Validation { kind, detail }) => {
                assert_eq!(kind, ValidationKind::InvalidInput);
                assert!(detail.contains("exceeds i16 range"), "unexpected detail: {detail}");
            }
            other => panic!("expected Validation error, got {other:?}"),
        }
    }

    #[test]
    fn rejects_u16_max() {
        let result = partition_index_to_i16(partition(PartitionFamily::Quota, u16::MAX));
        assert!(matches!(
            result,
            Err(EngineError::Validation { kind: ValidationKind::InvalidInput, .. })
        ));
    }
}