1#![allow(clippy::result_large_err)]
20
21use std::sync::Arc;
22use std::time::Duration;
23
24use async_trait::async_trait;
25
26use ff_core::backend::{
27 AppendFrameOutcome, BackendConfig, CancelFlowPolicy, CancelFlowWait, CapabilitySet,
28 ClaimPolicy, FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal,
29 PendingWaitpoint, ResumeToken, ResumeSignal, SummaryDocument, TailVisibility,
30 UsageDimensions,
31};
32#[cfg(feature = "core")]
33use ff_core::contracts::{
34 AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
35 ApplyDependencyToChildResult, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
36 CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
37 DeliverApprovalSignalArgs, DeliverSignalArgs, DeliverSignalResult, EdgeDependencyPolicy,
38 EdgeDirection, EdgeSnapshot,
39 ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
40 ListPendingWaitpointsResult, ListSuspendedPage, SetEdgeGroupPolicyResult,
41 StageDependencyEdgeArgs, StageDependencyEdgeResult,
42};
43#[cfg(feature = "core")]
44use ff_core::state::PublicState;
45use ff_core::contracts::{
46 CancelFlowResult, ExecutionContext, ExecutionSnapshot, FlowSnapshot, IssueReclaimGrantArgs,
47 IssueReclaimGrantOutcome, ReclaimExecutionArgs, ReclaimExecutionOutcome, ReportUsageResult,
48 RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
49 SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
50};
51#[cfg(feature = "core")]
52use ff_core::contracts::ExecutionInfo;
53#[cfg(feature = "core")]
55use ff_core::contracts::{
56 CancelExecutionArgs, CancelExecutionResult, RevokeLeaseArgs, RevokeLeaseResult,
57};
58#[cfg(feature = "core")]
60use ff_core::contracts::{
61 CancelFlowArgs, CancelFlowHeader, ChangePriorityArgs, ChangePriorityResult,
62 ReplayExecutionArgs, ReplayExecutionResult,
63};
64#[cfg(feature = "core")]
66use ff_core::contracts::{
67 BudgetStatus, CreateBudgetArgs, CreateBudgetResult, CreateQuotaPolicyArgs,
68 CreateQuotaPolicyResult, ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult,
69};
70#[cfg(feature = "streaming")]
71use ff_core::contracts::{StreamCursor, StreamFrames};
72use ff_core::engine_backend::{EngineBackend, ExpirePhase};
73use ff_core::engine_error::EngineError;
74#[cfg(feature = "core")]
75use ff_core::partition::PartitionKey;
76use ff_core::partition::{Partition, PartitionConfig};
77#[cfg(feature = "streaming")]
78use ff_core::types::AttemptIndex;
79#[cfg(feature = "core")]
80use ff_core::types::EdgeId;
81use ff_core::types::{BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
82pub use sqlx::PgPool;
86
87#[cfg(feature = "core")]
88mod admin;
89pub mod attempt;
90pub mod budget;
91pub mod claim_grant;
92pub mod completion;
93#[cfg(feature = "core")]
94pub mod dispatch;
95pub mod error;
96pub mod exec_core;
97pub mod flow;
98#[cfg(feature = "core")]
99pub mod flow_staging;
100pub mod handle_codec;
101mod lease_event;
102mod lease_event_subscribe;
103pub mod listener;
104pub mod migrate;
105#[cfg(feature = "core")]
106pub mod operator;
107#[cfg(feature = "core")]
108mod operator_event;
109pub mod pool;
110#[cfg(feature = "core")]
111pub mod reconcilers;
112#[cfg(feature = "core")]
113pub mod scanner_supervisor;
114#[cfg(feature = "core")]
115pub mod scheduler;
116pub mod signal;
117mod signal_delivery_subscribe;
118mod signal_event;
119#[cfg(feature = "streaming")]
120pub mod stream;
121pub mod suspend;
122pub mod suspend_ops;
123#[cfg(feature = "core")]
124pub(crate) mod typed_ops;
125pub mod version;
126#[cfg(feature = "core")]
127pub mod worker_registry;
128
129pub use completion::{PostgresCompletionStream, COMPLETION_CHANNEL};
130pub use error::{map_sqlx_error, PostgresTransportError};
131pub use listener::StreamNotifier;
132pub use migrate::{apply_migrations, MigrationError};
133#[cfg(feature = "core")]
134pub use scanner_supervisor::{PostgresScannerConfig, PostgresScannerHandle};
135pub use version::check_schema_version;
136
137pub use ff_core::backend::PostgresConnection;
143
144fn postgres_supports_base() -> ff_core::capability::Supports {
174 let mut s = ff_core::capability::Supports::none();
175
176 s.cancel_flow_wait_timeout = true;
178 s.cancel_flow_wait_indefinite = true;
179
180 s.rotate_waitpoint_hmac_secret_all = true;
182 s.seed_waitpoint_hmac_secret = true;
183
184 s.claim_for_worker = true;
186
187 s.subscribe_lease_history = true;
189 s.subscribe_completion = true;
190 s.subscribe_signal_delivery = true;
191 s.subscribe_instance_tags = false;
192
193 s.stream_durable_summary = true;
195 s.stream_best_effort_live = true;
196
197 s.prepare = true;
199
200 s.cancel_execution = true;
207 s.change_priority = true;
208 s.replay_execution = true;
209 s.revoke_lease = true;
210 s.read_execution_state = true;
211 s.read_execution_info = true;
212 s.get_execution_result = true;
213 s.budget_admin = true;
214 s.quota_admin = true;
215 s.list_pending_waitpoints = true;
216 s.cancel_flow_header = true;
217 s.ack_cancel_member = true;
218
219 s.register_worker = true;
221 s.heartbeat_worker = true;
222 s.mark_worker_dead = true;
223 s.list_expired_leases = true;
224 s.list_workers = true;
225
226 s.release_admission = true;
230 s.read_quota_policy_limits = true;
231
232 s
233}
234
235pub struct PostgresBackend {
236 #[allow(dead_code)] pool: PgPool,
238 #[allow(dead_code)]
239 partition_config: PartitionConfig,
240 #[allow(dead_code)]
241 metrics: Option<Arc<ff_observability::Metrics>>,
242 #[allow(dead_code)]
246 stream_notifier: Option<Arc<StreamNotifier>>,
247 #[cfg(feature = "core")]
253 scanner_handle: Option<Arc<scanner_supervisor::PostgresScannerHandle>>,
254}
255
256impl PostgresBackend {
257 pub async fn connect(config: BackendConfig) -> Result<Arc<dyn EngineBackend>, EngineError> {
272 let pool = pool::build_pool(&config).await?;
273 warn_if_max_locks_low(&pool).await;
274 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
275 let backend = Self {
276 pool,
277 partition_config: PartitionConfig::default(),
278 metrics: None,
279 stream_notifier,
280 #[cfg(feature = "core")]
281 scanner_handle: None,
282 };
283 Ok(Arc::new(backend))
284 }
285
286 pub fn from_pool(pool: PgPool, partition_config: PartitionConfig) -> Arc<Self> {
292 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
293 Arc::new(Self {
294 pool,
295 partition_config,
296 metrics: None,
297 stream_notifier,
298 #[cfg(feature = "core")]
299 scanner_handle: None,
300 })
301 }
302
303 pub async fn connect_with_metrics(
317 config: BackendConfig,
318 partition_config: PartitionConfig,
319 metrics: Arc<ff_observability::Metrics>,
320 ) -> Result<Arc<Self>, EngineError> {
321 let pool = pool::build_pool(&config).await?;
322 warn_if_max_locks_low(&pool).await;
323 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
324 Ok(Arc::new(Self {
325 pool,
326 partition_config,
327 metrics: Some(metrics),
328 stream_notifier,
329 #[cfg(feature = "core")]
330 scanner_handle: None,
331 }))
332 }
333
334 #[cfg(feature = "core")]
341 pub fn with_scanners(
342 self: &mut Arc<Self>,
343 cfg: scanner_supervisor::PostgresScannerConfig,
344 ) -> bool {
345 let Some(inner) = Arc::get_mut(self) else {
346 return false;
347 };
348 let handle = scanner_supervisor::spawn_scanners(inner.pool.clone(), cfg);
349 inner.scanner_handle = Some(Arc::new(handle));
350 true
351 }
352
353 pub fn pool(&self) -> &PgPool {
358 &self.pool
359 }
360
361 #[cfg(feature = "core")]
374 #[tracing::instrument(name = "pg.create_execution", skip_all)]
375 pub async fn create_execution(
376 &self,
377 args: CreateExecutionArgs,
378 ) -> Result<ExecutionId, EngineError> {
379 exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await
380 }
381
382 #[cfg(feature = "core")]
390 #[tracing::instrument(name = "pg.create_flow.inherent", skip_all)]
391 pub async fn create_flow(
392 &self,
393 args: &CreateFlowArgs,
394 ) -> Result<CreateFlowResult, EngineError> {
395 flow_staging::create_flow(&self.pool, &self.partition_config, args).await
396 }
397
398 #[cfg(feature = "core")]
399 #[tracing::instrument(name = "pg.add_execution_to_flow.inherent", skip_all)]
400 pub async fn add_execution_to_flow(
401 &self,
402 args: &AddExecutionToFlowArgs,
403 ) -> Result<AddExecutionToFlowResult, EngineError> {
404 flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, args).await
405 }
406
407 #[cfg(feature = "core")]
408 #[tracing::instrument(name = "pg.stage_dependency_edge.inherent", skip_all)]
409 pub async fn stage_dependency_edge(
410 &self,
411 args: &StageDependencyEdgeArgs,
412 ) -> Result<StageDependencyEdgeResult, EngineError> {
413 flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, args).await
414 }
415
416 #[cfg(feature = "core")]
417 #[tracing::instrument(name = "pg.apply_dependency_to_child.inherent", skip_all)]
418 pub async fn apply_dependency_to_child(
419 &self,
420 args: &ApplyDependencyToChildArgs,
421 ) -> Result<ApplyDependencyToChildResult, EngineError> {
422 flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, args).await
423 }
424}
425
426#[inline]
430#[cfg_attr(feature = "streaming", allow(dead_code))]
431fn unavailable<T>(op: &'static str) -> Result<T, EngineError> {
432 Err(EngineError::Unavailable { op })
433}
434
435#[async_trait]
436impl EngineBackend for PostgresBackend {
437 #[tracing::instrument(name = "pg.claim", skip_all)]
440 async fn claim(
441 &self,
442 lane: &LaneId,
443 capabilities: &CapabilitySet,
444 policy: ClaimPolicy,
445 ) -> Result<Option<Handle>, EngineError> {
446 attempt::claim(&self.pool, lane, capabilities, &policy).await
447 }
448
449 #[tracing::instrument(name = "pg.renew", skip_all)]
450 async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError> {
451 attempt::renew(&self.pool, handle).await
452 }
453
454 #[tracing::instrument(name = "pg.progress", skip_all)]
455 async fn progress(
456 &self,
457 handle: &Handle,
458 percent: Option<u8>,
459 message: Option<String>,
460 ) -> Result<(), EngineError> {
461 attempt::progress(&self.pool, handle, percent, message).await
462 }
463
464 #[tracing::instrument(name = "pg.append_frame", skip_all)]
465 async fn append_frame(
466 &self,
467 handle: &Handle,
468 frame: Frame,
469 ) -> Result<AppendFrameOutcome, EngineError> {
470 #[cfg(feature = "streaming")]
471 {
472 stream::append_frame(&self.pool, &self.partition_config, handle, frame).await
473 }
474 #[cfg(not(feature = "streaming"))]
475 {
476 let _ = (handle, frame);
477 unavailable("pg.append_frame")
478 }
479 }
480
481 #[tracing::instrument(name = "pg.complete", skip_all)]
482 async fn complete(
483 &self,
484 handle: &Handle,
485 payload: Option<Vec<u8>>,
486 ) -> Result<(), EngineError> {
487 attempt::complete(&self.pool, handle, payload).await
488 }
489
490 #[tracing::instrument(name = "pg.fail", skip_all)]
491 async fn fail(
492 &self,
493 handle: &Handle,
494 reason: FailureReason,
495 classification: FailureClass,
496 ) -> Result<FailOutcome, EngineError> {
497 attempt::fail(&self.pool, handle, reason, classification).await
498 }
499
500 #[tracing::instrument(name = "pg.cancel", skip_all)]
501 async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError> {
502 let payload = handle_codec::decode_handle(handle)?;
503 exec_core::cancel_impl(
504 &self.pool,
505 &self.partition_config,
506 &payload.execution_id,
507 reason,
508 )
509 .await
510 }
511
512 #[tracing::instrument(name = "pg.suspend", skip_all)]
513 async fn suspend(
514 &self,
515 handle: &Handle,
516 args: SuspendArgs,
517 ) -> Result<SuspendOutcome, EngineError> {
518 suspend_ops::suspend_impl(&self.pool, &self.partition_config, handle, args).await
519 }
520
521 #[tracing::instrument(name = "pg.suspend_by_triple", skip_all)]
522 async fn suspend_by_triple(
523 &self,
524 exec_id: ExecutionId,
525 triple: LeaseFence,
526 args: SuspendArgs,
527 ) -> Result<SuspendOutcome, EngineError> {
528 suspend_ops::suspend_by_triple_impl(
529 &self.pool,
530 &self.partition_config,
531 exec_id,
532 triple,
533 args,
534 )
535 .await
536 }
537
538 #[tracing::instrument(name = "pg.create_waitpoint", skip_all)]
539 async fn create_waitpoint(
540 &self,
541 handle: &Handle,
542 waitpoint_key: &str,
543 expires_in: Duration,
544 ) -> Result<PendingWaitpoint, EngineError> {
545 suspend_ops::create_waitpoint_impl(&self.pool, handle, waitpoint_key, expires_in).await
546 }
547
/// Reads a waitpoint's token via `suspend_ops::read_waitpoint_token_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.read_waitpoint_token", skip_all)]
async fn read_waitpoint_token(
    &self,
    partition: PartitionKey,
    waitpoint_id: &ff_core::types::WaitpointId,
) -> Result<Option<String>, EngineError> {
    let pool = &self.pool;
    suspend_ops::read_waitpoint_token_impl(pool, &partition, waitpoint_id).await
}
557
558 #[tracing::instrument(name = "pg.observe_signals", skip_all)]
559 async fn observe_signals(
560 &self,
561 handle: &Handle,
562 ) -> Result<Vec<ResumeSignal>, EngineError> {
563 suspend_ops::observe_signals_impl(&self.pool, handle).await
564 }
565
566 #[tracing::instrument(name = "pg.claim_from_resume_grant", skip_all)]
567 async fn claim_from_resume_grant(
568 &self,
569 token: ResumeToken,
570 ) -> Result<Option<Handle>, EngineError> {
571 attempt::claim_from_resume_grant(&self.pool, token).await
572 }
573
574 #[tracing::instrument(name = "pg.issue_reclaim_grant", skip_all)]
577 async fn issue_reclaim_grant(
578 &self,
579 args: IssueReclaimGrantArgs,
580 ) -> Result<IssueReclaimGrantOutcome, EngineError> {
581 claim_grant::issue_reclaim_grant_impl(&self.pool, args).await
582 }
583
584 #[tracing::instrument(name = "pg.reclaim_execution", skip_all)]
585 async fn reclaim_execution(
586 &self,
587 args: ReclaimExecutionArgs,
588 ) -> Result<ReclaimExecutionOutcome, EngineError> {
589 claim_grant::reclaim_execution_impl(&self.pool, args).await
590 }
591
592 #[tracing::instrument(name = "pg.delay", skip_all)]
593 async fn delay(
594 &self,
595 handle: &Handle,
596 delay_until: TimestampMs,
597 ) -> Result<(), EngineError> {
598 attempt::delay(&self.pool, handle, delay_until).await
599 }
600
601 #[tracing::instrument(name = "pg.wait_children", skip_all)]
602 async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError> {
603 attempt::wait_children(&self.pool, handle).await
604 }
605
606 #[tracing::instrument(name = "pg.describe_execution", skip_all)]
609 async fn describe_execution(
610 &self,
611 id: &ExecutionId,
612 ) -> Result<Option<ExecutionSnapshot>, EngineError> {
613 exec_core::describe_execution_impl(&self.pool, &self.partition_config, id).await
614 }
615
616 #[tracing::instrument(name = "pg.read_execution_context", skip_all)]
617 async fn read_execution_context(
618 &self,
619 execution_id: &ExecutionId,
620 ) -> Result<ExecutionContext, EngineError> {
621 exec_core::read_execution_context_impl(&self.pool, &self.partition_config, execution_id)
622 .await
623 }
624
625 #[tracing::instrument(name = "pg.read_current_attempt_index", skip_all)]
626 async fn read_current_attempt_index(
627 &self,
628 execution_id: &ExecutionId,
629 ) -> Result<ff_core::types::AttemptIndex, EngineError> {
630 exec_core::read_current_attempt_index_impl(
631 &self.pool,
632 &self.partition_config,
633 execution_id,
634 )
635 .await
636 }
637
638 #[tracing::instrument(name = "pg.read_total_attempt_count", skip_all)]
639 async fn read_total_attempt_count(
640 &self,
641 execution_id: &ExecutionId,
642 ) -> Result<ff_core::types::AttemptIndex, EngineError> {
643 exec_core::read_total_attempt_count_impl(
644 &self.pool,
645 &self.partition_config,
646 execution_id,
647 )
648 .await
649 }
650
651 #[tracing::instrument(name = "pg.describe_flow", skip_all)]
652 async fn describe_flow(
653 &self,
654 id: &FlowId,
655 ) -> Result<Option<FlowSnapshot>, EngineError> {
656 flow::describe_flow(&self.pool, &self.partition_config, id).await
657 }
658
659 #[tracing::instrument(name = "pg.set_execution_tag", skip_all)]
660 async fn set_execution_tag(
661 &self,
662 execution_id: &ExecutionId,
663 key: &str,
664 value: &str,
665 ) -> Result<(), EngineError> {
666 ff_core::engine_backend::validate_tag_key(key)?;
667 exec_core::set_execution_tag_impl(&self.pool, execution_id, key, value).await
668 }
669
670 #[tracing::instrument(name = "pg.set_flow_tag", skip_all)]
671 async fn set_flow_tag(
672 &self,
673 flow_id: &FlowId,
674 key: &str,
675 value: &str,
676 ) -> Result<(), EngineError> {
677 ff_core::engine_backend::validate_tag_key(key)?;
678 flow::set_flow_tag_impl(&self.pool, &self.partition_config, flow_id, key, value).await
679 }
680
681 #[tracing::instrument(name = "pg.get_execution_tag", skip_all)]
682 async fn get_execution_tag(
683 &self,
684 execution_id: &ExecutionId,
685 key: &str,
686 ) -> Result<Option<String>, EngineError> {
687 ff_core::engine_backend::validate_tag_key(key)?;
688 exec_core::get_execution_tag_impl(&self.pool, execution_id, key).await
689 }
690
691 #[tracing::instrument(name = "pg.get_flow_tag", skip_all)]
692 async fn get_flow_tag(
693 &self,
694 flow_id: &FlowId,
695 key: &str,
696 ) -> Result<Option<String>, EngineError> {
697 ff_core::engine_backend::validate_tag_key(key)?;
698 flow::get_flow_tag_impl(&self.pool, &self.partition_config, flow_id, key).await
699 }
700
701 #[tracing::instrument(name = "pg.get_execution_namespace", skip_all)]
702 async fn get_execution_namespace(
703 &self,
704 execution_id: &ExecutionId,
705 ) -> Result<Option<String>, EngineError> {
706 exec_core::get_execution_namespace_impl(&self.pool, execution_id).await
707 }
708
/// Lists a flow's edges in `direction` via `flow::list_edges`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.list_edges", skip_all)]
async fn list_edges(
    &self,
    flow_id: &FlowId,
    direction: EdgeDirection,
) -> Result<Vec<EdgeSnapshot>, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    flow::list_edges(pool, partitions, flow_id, direction).await
}
718
/// Fetches a single edge snapshot via `flow::describe_edge`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.describe_edge", skip_all)]
async fn describe_edge(
    &self,
    flow_id: &FlowId,
    edge_id: &EdgeId,
) -> Result<Option<EdgeSnapshot>, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    flow::describe_edge(pool, partitions, flow_id, edge_id).await
}
728
/// Resolves an execution's flow id via
/// `exec_core::resolve_execution_flow_id_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.resolve_execution_flow_id", skip_all)]
async fn resolve_execution_flow_id(
    &self,
    eid: &ExecutionId,
) -> Result<Option<FlowId>, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    exec_core::resolve_execution_flow_id_impl(pool, partitions, eid).await
}
737
/// Lists one page of flows in `partition` via `flow::list_flows`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.list_flows", skip_all)]
async fn list_flows(
    &self,
    partition: PartitionKey,
    cursor: Option<FlowId>,
    limit: usize,
) -> Result<ListFlowsPage, EngineError> {
    let pool = &self.pool;
    flow::list_flows(pool, partition, cursor, limit).await
}
748
/// Lists one page of lanes via `admin::list_lanes_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.list_lanes", skip_all)]
async fn list_lanes(
    &self,
    cursor: Option<LaneId>,
    limit: usize,
) -> Result<ListLanesPage, EngineError> {
    let pool = &self.pool;
    admin::list_lanes_impl(pool, cursor, limit).await
}
758
/// Lists one page of suspended executions via `admin::list_suspended_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.list_suspended", skip_all)]
async fn list_suspended(
    &self,
    partition: PartitionKey,
    cursor: Option<ExecutionId>,
    limit: usize,
) -> Result<ListSuspendedPage, EngineError> {
    let pool = &self.pool;
    admin::list_suspended_impl(pool, partition, cursor, limit).await
}
769
/// Lists one page of executions via `exec_core::list_executions_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.list_executions", skip_all)]
async fn list_executions(
    &self,
    partition: PartitionKey,
    cursor: Option<ExecutionId>,
    limit: usize,
) -> Result<ListExecutionsPage, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    exec_core::list_executions_impl(pool, partitions, partition, cursor, limit).await
}
787
/// Delivers a signal via `suspend_ops::deliver_signal_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.deliver_signal", skip_all)]
async fn deliver_signal(
    &self,
    args: DeliverSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    suspend_ops::deliver_signal_impl(pool, partitions, args).await
}
798
/// Delivers an approval signal via `suspend_ops::deliver_approval_signal_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.deliver_approval_signal", skip_all)]
async fn deliver_approval_signal(
    &self,
    args: DeliverApprovalSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    suspend_ops::deliver_approval_signal_impl(pool, partitions, args).await
}
807
/// Claims a resumed execution via `suspend_ops::claim_resumed_execution_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.claim_resumed_execution", skip_all)]
async fn claim_resumed_execution(
    &self,
    args: ClaimResumedExecutionArgs,
) -> Result<ClaimResumedExecutionResult, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    suspend_ops::claim_resumed_execution_impl(pool, partitions, args).await
}
816
/// Reads an execution's public state via `exec_core::read_execution_state_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.read_execution_state", skip_all)]
async fn read_execution_state(
    &self,
    id: &ExecutionId,
) -> Result<Option<PublicState>, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    exec_core::read_execution_state_impl(pool, partitions, id).await
}
834
/// Reads execution info via `exec_core::read_execution_info_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.read_execution_info", skip_all)]
async fn read_execution_info(
    &self,
    id: &ExecutionId,
) -> Result<Option<ExecutionInfo>, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    exec_core::read_execution_info_impl(pool, partitions, id).await
}
843
844 #[tracing::instrument(name = "pg.get_execution_result", skip_all)]
845 async fn get_execution_result(
846 &self,
847 id: &ExecutionId,
848 ) -> Result<Option<Vec<u8>>, EngineError> {
849 exec_core::get_execution_result_impl(&self.pool, &self.partition_config, id).await
850 }
851
/// Lists pending waitpoints via `suspend_ops::list_pending_waitpoints_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.list_pending_waitpoints", skip_all)]
async fn list_pending_waitpoints(
    &self,
    args: ListPendingWaitpointsArgs,
) -> Result<ListPendingWaitpointsResult, EngineError> {
    let pool = &self.pool;
    suspend_ops::list_pending_waitpoints_impl(pool, args).await
}
868
/// Cancels an execution via `operator::cancel_execution_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.cancel_execution", skip_all)]
async fn cancel_execution(
    &self,
    args: CancelExecutionArgs,
) -> Result<CancelExecutionResult, EngineError> {
    let pool = &self.pool;
    operator::cancel_execution_impl(pool, args).await
}
884
/// Revokes a lease via `operator::revoke_lease_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.revoke_lease", skip_all)]
async fn revoke_lease(
    &self,
    args: RevokeLeaseArgs,
) -> Result<RevokeLeaseResult, EngineError> {
    let pool = &self.pool;
    operator::revoke_lease_impl(pool, args).await
}
893
/// Changes an execution's priority via `operator::change_priority_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.change_priority", skip_all)]
async fn change_priority(
    &self,
    args: ChangePriorityArgs,
) -> Result<ChangePriorityResult, EngineError> {
    let pool = &self.pool;
    operator::change_priority_impl(pool, args).await
}
911
/// Replays an execution via `operator::replay_execution_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.replay_execution", skip_all)]
async fn replay_execution(
    &self,
    args: ReplayExecutionArgs,
) -> Result<ReplayExecutionResult, EngineError> {
    let pool = &self.pool;
    operator::replay_execution_impl(pool, args).await
}
920
/// Runs the header step of a flow cancel via
/// `operator::cancel_flow_header_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.cancel_flow_header", skip_all)]
async fn cancel_flow_header(
    &self,
    args: CancelFlowArgs,
) -> Result<CancelFlowHeader, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    operator::cancel_flow_header_impl(pool, partitions, args).await
}
929
/// Acknowledges a cancelled flow member via `operator::ack_cancel_member_impl`.
///
/// The callee takes owned ids, so both references are cloned first.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.ack_cancel_member", skip_all)]
async fn ack_cancel_member(
    &self,
    flow_id: &FlowId,
    execution_id: &ExecutionId,
) -> Result<(), EngineError> {
    let owned_flow_id = flow_id.clone();
    let owned_execution_id = execution_id.clone();
    let pool = &self.pool;
    let partitions = &self.partition_config;
    operator::ack_cancel_member_impl(pool, partitions, owned_flow_id, owned_execution_id).await
}
945
/// Trait-surface execution create.
///
/// Runs `exec_core::create_execution_impl` and, on success, reports
/// `CreateExecutionResult::Created` with the id taken from `args` and
/// `PublicState::Waiting`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.create_execution.trait", skip_all)]
async fn create_execution(
    &self,
    args: CreateExecutionArgs,
) -> Result<CreateExecutionResult, EngineError> {
    // Capture the id before `args` is moved into the implementation call.
    let execution_id = args.execution_id.clone();
    let pool = &self.pool;
    let partitions = &self.partition_config;
    exec_core::create_execution_impl(pool, partitions, args).await?;
    Ok(CreateExecutionResult::Created {
        execution_id,
        public_state: PublicState::Waiting,
    })
}
969
/// Creates a flow via `flow_staging::create_flow`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.create_flow", skip_all)]
async fn create_flow(
    &self,
    args: CreateFlowArgs,
) -> Result<CreateFlowResult, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    flow_staging::create_flow(pool, partitions, &args).await
}
978
/// Adds an execution to a flow via `flow_staging::add_execution_to_flow`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.add_execution_to_flow", skip_all)]
async fn add_execution_to_flow(
    &self,
    args: AddExecutionToFlowArgs,
) -> Result<AddExecutionToFlowResult, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    flow_staging::add_execution_to_flow(pool, partitions, &args).await
}
987
/// Stages a dependency edge via `flow_staging::stage_dependency_edge`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.stage_dependency_edge", skip_all)]
async fn stage_dependency_edge(
    &self,
    args: StageDependencyEdgeArgs,
) -> Result<StageDependencyEdgeResult, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    flow_staging::stage_dependency_edge(pool, partitions, &args).await
}
996
/// Applies a staged dependency to its child via
/// `flow_staging::apply_dependency_to_child`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.apply_dependency_to_child", skip_all)]
async fn apply_dependency_to_child(
    &self,
    args: ApplyDependencyToChildArgs,
) -> Result<ApplyDependencyToChildResult, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    flow_staging::apply_dependency_to_child(pool, partitions, &args).await
}
1005
/// Dispatches the completion described by `payload`.
///
/// Resolves the payload to an event id with `resolve_event_id`. When that
/// yields `None`, a warning is logged and the call reports an async dispatch
/// of `0`. Otherwise `dispatch::dispatch_completion` runs and its outcome is
/// reported (`NoOp` as `0`, `Advanced(n)` as `n`).
#[cfg(feature = "core")]
async fn cascade_completion(
    &self,
    payload: &ff_core::backend::CompletionPayload,
) -> Result<ff_core::contracts::CascadeOutcome, EngineError> {
    let Some(event_id) = resolve_event_id(&self.pool, payload).await else {
        tracing::warn!(
            execution_id = %payload.execution_id,
            produced_at_ms = payload.produced_at_ms.0,
            "pg.cascade_completion: could not resolve event_id; reconciler will claim"
        );
        return Ok(ff_core::contracts::CascadeOutcome::async_dispatched(0));
    };

    let advanced = match crate::dispatch::dispatch_completion(&self.pool, event_id).await? {
        crate::dispatch::DispatchOutcome::NoOp => 0,
        crate::dispatch::DispatchOutcome::Advanced(count) => count,
    };
    Ok(ff_core::contracts::CascadeOutcome::async_dispatched(advanced))
}
1036
1037 fn backend_label(&self) -> &'static str {
1038 "postgres"
1039 }
1040
1041 fn capabilities(&self) -> ff_core::capability::Capabilities {
1049 ff_core::capability::Capabilities::new(
1050 ff_core::capability::BackendIdentity::new(
1051 "postgres",
1052 ff_core::capability::Version::new(0, 11, 0),
1053 "E-shipped",
1054 ),
1055 postgres_supports_base(),
1056 )
1057 }
1058
1059 async fn prepare(
1067 &self,
1068 ) -> Result<ff_core::backend::PrepareOutcome, EngineError> {
1069 Ok(ff_core::backend::PrepareOutcome::NoOp)
1070 }
1071
/// Asks a `PostgresScheduler` built on a clone of the pool for a claim grant.
///
/// Returns `ClaimForWorkerOutcome::granted` when the scheduler yields a
/// grant and `ClaimForWorkerOutcome::no_work` when it yields `None`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.claim_for_worker", skip_all)]
async fn claim_for_worker(
    &self,
    args: ff_core::contracts::ClaimForWorkerArgs,
) -> Result<ff_core::contracts::ClaimForWorkerOutcome, EngineError> {
    let pg_scheduler = scheduler::PostgresScheduler::new(self.pool.clone());
    let maybe_grant = pg_scheduler
        .claim_for_worker(
            &args.lane_id,
            &args.worker_id,
            &args.worker_instance_id,
            &args.worker_capabilities,
            args.grant_ttl_ms,
        )
        .await?;
    let outcome = match maybe_grant {
        None => ff_core::contracts::ClaimForWorkerOutcome::no_work(),
        Some(grant) => ff_core::contracts::ClaimForWorkerOutcome::granted(grant),
    };
    Ok(outcome)
}
1100
1101 async fn ping(&self) -> Result<(), EngineError> {
1102 let _ = sqlx::query_scalar::<_, i32>("SELECT 1")
1106 .fetch_one(&self.pool)
1107 .await
1108 .map_err(error::map_sqlx_error)?;
1109 Ok(())
1110 }
1111
1112 async fn shutdown_prepare(&self, grace: Duration) -> Result<(), EngineError> {
1117 #[cfg(feature = "core")]
1118 if let Some(handle) = self.scanner_handle.as_ref() {
1119 let timed_out = handle.shutdown(grace).await;
1120 if timed_out > 0 {
1121 tracing::warn!(
1122 timed_out,
1123 ?grace,
1124 "postgres scanner supervisor exceeded grace on shutdown"
1125 );
1126 }
1127 }
1128 Ok(())
1129 }
1130
1131 #[tracing::instrument(name = "pg.cancel_flow", skip_all)]
1132 async fn cancel_flow(
1133 &self,
1134 id: &FlowId,
1135 policy: CancelFlowPolicy,
1136 wait: CancelFlowWait,
1137 ) -> Result<CancelFlowResult, EngineError> {
1138 let result = flow::cancel_flow(&self.pool, &self.partition_config, id, policy).await?;
1139 if let Some(deadline) = ff_core::engine_backend::cancel_flow_wait_deadline(wait) {
1140 ff_core::engine_backend::wait_for_flow_cancellation(self, id, deadline).await?;
1141 }
1142 Ok(result)
1143 }
1144
/// Sets the edge-group dependency policy via `flow::set_edge_group_policy`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.set_edge_group_policy", skip_all)]
async fn set_edge_group_policy(
    &self,
    flow_id: &FlowId,
    downstream_execution_id: &ExecutionId,
    policy: EdgeDependencyPolicy,
) -> Result<SetEdgeGroupPolicyResult, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    flow::set_edge_group_policy(pool, partitions, flow_id, downstream_execution_id, policy).await
}
1162
1163 #[tracing::instrument(name = "pg.report_usage", skip_all)]
1166 async fn report_usage(
1167 &self,
1168 _handle: &Handle,
1169 budget: &BudgetId,
1170 dimensions: UsageDimensions,
1171 ) -> Result<ReportUsageResult, EngineError> {
1172 budget::report_usage_impl(&self.pool, &self.partition_config, budget, dimensions).await
1173 }
1174
/// Creates a budget via `budget::create_budget_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.create_budget", skip_all)]
async fn create_budget(
    &self,
    args: CreateBudgetArgs,
) -> Result<CreateBudgetResult, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    budget::create_budget_impl(pool, partitions, args).await
}
1190
/// Resets a budget via `budget::reset_budget_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.reset_budget", skip_all)]
async fn reset_budget(
    &self,
    args: ResetBudgetArgs,
) -> Result<ResetBudgetResult, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    budget::reset_budget_impl(pool, partitions, args).await
}
1199
/// Creates a quota policy via `budget::create_quota_policy_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.create_quota_policy", skip_all)]
async fn create_quota_policy(
    &self,
    args: CreateQuotaPolicyArgs,
) -> Result<CreateQuotaPolicyResult, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    budget::create_quota_policy_impl(pool, partitions, args).await
}
1208
/// Reads a budget's status via `budget::get_budget_status_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.get_budget_status", skip_all)]
async fn get_budget_status(
    &self,
    id: &BudgetId,
) -> Result<BudgetStatus, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    budget::get_budget_status_impl(pool, partitions, id).await
}
1217
/// Admin-path usage report via `budget::report_usage_admin_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.report_usage_admin", skip_all)]
async fn report_usage_admin(
    &self,
    budget_id: &BudgetId,
    args: ReportUsageAdminArgs,
) -> Result<ReportUsageResult, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    budget::report_usage_admin_impl(pool, partitions, budget_id, args).await
}
1227
/// Records spend via `budget::record_spend_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.record_spend", skip_all)]
async fn record_spend(
    &self,
    args: ff_core::contracts::RecordSpendArgs,
) -> Result<ReportUsageResult, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    budget::record_spend_impl(pool, partitions, args).await
}
1238
/// Releases a budget via `budget::release_budget_impl`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.release_budget", skip_all)]
async fn release_budget(
    &self,
    args: ff_core::contracts::ReleaseBudgetArgs,
) -> Result<(), EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    budget::release_budget_impl(pool, partitions, args).await
}
1247
/// Releases an admission by delegating to `crate::typed_ops::release_admission`
/// with this backend's connection pool and partition configuration.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.release_admission", skip_all)]
async fn release_admission(
    &self,
    args: ff_core::contracts::ReleaseAdmissionArgs,
) -> Result<ff_core::contracts::ReleaseAdmissionResult, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    crate::typed_ops::release_admission(pool, partitions, args).await
}
1256
/// Reads the limits of the quota policy identified by `quota_policy_id` by
/// delegating to `crate::typed_ops::read_quota_policy_limits`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.read_quota_policy_limits", skip_all)]
async fn read_quota_policy_limits(
    &self,
    quota_policy_id: &ff_core::types::QuotaPolicyId,
) -> Result<Option<ff_core::contracts::QuotaPolicyLimits>, EngineError> {
    let pool = &self.pool;
    let partitions = &self.partition_config;
    crate::typed_ops::read_quota_policy_limits(pool, partitions, quota_policy_id).await
}
1270
1271 #[tracing::instrument(name = "pg.rotate_waitpoint_hmac_secret_all", skip_all)]
1278 async fn rotate_waitpoint_hmac_secret_all(
1279 &self,
1280 args: RotateWaitpointHmacSecretAllArgs,
1281 ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
1282 let now_ms = std::time::SystemTime::now()
1287 .duration_since(std::time::UNIX_EPOCH)
1288 .map(|d| d.as_millis() as i64)
1289 .unwrap_or(0);
1290 signal::rotate_waitpoint_hmac_secret_all_impl(&self.pool, args, now_ms).await
1291 }
1292
1293 #[tracing::instrument(name = "pg.seed_waitpoint_hmac_secret", skip_all)]
1294 async fn seed_waitpoint_hmac_secret(
1295 &self,
1296 args: SeedWaitpointHmacSecretArgs,
1297 ) -> Result<SeedOutcome, EngineError> {
1298 let now_ms = std::time::SystemTime::now()
1302 .duration_since(std::time::UNIX_EPOCH)
1303 .map(|d| d.as_millis() as i64)
1304 .unwrap_or(0);
1305 signal::seed_waitpoint_hmac_secret_impl(&self.pool, args, now_ms).await
1306 }
1307
/// Reads stream frames for one attempt of an execution between the `from` and
/// `to` cursors by delegating to `stream::read_stream`; `count_limit` is
/// forwarded unchanged.
#[cfg(feature = "streaming")]
#[tracing::instrument(name = "pg.read_stream", skip_all)]
async fn read_stream(
    &self,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    from: StreamCursor,
    to: StreamCursor,
    count_limit: u64,
) -> Result<StreamFrames, EngineError> {
    let pool = &self.pool;
    stream::read_stream(pool, execution_id, attempt_index, from, to, count_limit).await
}
1322
/// Tails the stream of one attempt of an execution by delegating to
/// `stream::tail_stream`.
///
/// # Errors
///
/// Returns `EngineError::Unavailable` when this backend has no stream notifier
/// configured; otherwise returns whatever `stream::tail_stream` returns.
#[cfg(feature = "streaming")]
#[tracing::instrument(name = "pg.tail_stream", skip_all)]
async fn tail_stream(
    &self,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    after: StreamCursor,
    block_ms: u64,
    count_limit: u64,
    visibility: TailVisibility,
) -> Result<StreamFrames, EngineError> {
    // Tailing needs the notifier; bail out before touching the pool without it.
    let notifier = match self.stream_notifier.as_ref() {
        Some(present) => present,
        None => {
            return Err(EngineError::Unavailable {
                op: "pg.tail_stream (notifier not initialised)",
            });
        }
    };
    let pool = &self.pool;
    stream::tail_stream(
        pool,
        notifier,
        execution_id,
        attempt_index,
        after,
        block_ms,
        count_limit,
        visibility,
    )
    .await
}
1352
/// Reads the summary document, if any, for one attempt of an execution by
/// delegating to `stream::read_summary`.
#[cfg(feature = "streaming")]
#[tracing::instrument(name = "pg.read_summary", skip_all)]
async fn read_summary(
    &self,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
) -> Result<Option<SummaryDocument>, EngineError> {
    let pool = &self.pool;
    stream::read_summary(pool, execution_id, attempt_index).await
}
1362
1363 #[tracing::instrument(name = "pg.subscribe_completion", skip_all)]
1377 async fn subscribe_completion(
1378 &self,
1379 _cursor: ff_core::stream_subscribe::StreamCursor,
1380 filter: &ff_core::backend::ScannerFilter,
1381 ) -> Result<ff_core::stream_events::CompletionSubscription, EngineError> {
1382 use ff_core::stream_events::{CompletionEvent, CompletionOutcome};
1383 use ff_core::stream_subscribe::encode_postgres_event_cursor;
1384 use futures_core::Stream;
1385 use std::pin::Pin;
1386 use std::task::{Context, Poll};
1387
1388 let inner = if filter.is_noop() {
1395 ff_core::completion_backend::CompletionBackend::subscribe_completions(self).await?
1396 } else {
1397 ff_core::completion_backend::CompletionBackend::subscribe_completions_filtered(
1398 self, filter,
1399 )
1400 .await?
1401 };
1402
1403 struct Adapter {
1404 inner: ff_core::completion_backend::CompletionStream,
1405 }
1406
1407 impl Stream for Adapter {
1408 type Item = Result<CompletionEvent, EngineError>;
1409 fn poll_next(
1410 mut self: Pin<&mut Self>,
1411 cx: &mut Context<'_>,
1412 ) -> Poll<Option<Self::Item>> {
1413 match Pin::new(&mut self.inner).poll_next(cx) {
1414 Poll::Pending => Poll::Pending,
1415 Poll::Ready(None) => Poll::Ready(None),
1416 Poll::Ready(Some(payload)) => {
1417 let cursor = encode_postgres_event_cursor(0);
1422 let event = CompletionEvent::new(
1423 cursor,
1424 payload.execution_id.clone(),
1425 CompletionOutcome::from_wire(&payload.outcome),
1426 payload.produced_at_ms,
1427 );
1428 Poll::Ready(Some(Ok(event)))
1429 }
1430 }
1431 }
1432 }
1433
1434 Ok(Box::pin(Adapter { inner }))
1435 }
1436
1437 #[tracing::instrument(name = "pg.subscribe_lease_history", skip_all)]
1451 async fn subscribe_lease_history(
1452 &self,
1453 cursor: ff_core::stream_subscribe::StreamCursor,
1454 filter: &ff_core::backend::ScannerFilter,
1455 ) -> Result<ff_core::stream_events::LeaseHistorySubscription, EngineError> {
1456 lease_event_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
1457 }
1458
1459 #[tracing::instrument(name = "pg.subscribe_signal_delivery", skip_all)]
1469 async fn subscribe_signal_delivery(
1470 &self,
1471 cursor: ff_core::stream_subscribe::StreamCursor,
1472 filter: &ff_core::backend::ScannerFilter,
1473 ) -> Result<ff_core::stream_events::SignalDeliverySubscription, EngineError> {
1474 signal_delivery_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
1475 }
1476
/// Runs the lease-expiry reconciler for a single execution via
/// `reconcilers::lease_expiry::release_for_execution`.
///
/// The partition key comes from the `partition` argument; the partition
/// component split out of `execution_id` is discarded.
#[cfg(feature = "core")]
async fn mark_lease_expired_if_due(
    &self,
    partition: Partition,
    execution_id: &ExecutionId,
) -> Result<(), EngineError> {
    let exec_uuid = attempt::split_exec_id(execution_id)?.1;
    let partition_key = partition_index_to_i16(partition)?;
    let pool = &self.pool;
    reconcilers::lease_expiry::release_for_execution(pool, partition_key, exec_uuid).await
}
1509
/// Runs the delayed-promotion reconciler for a single execution via
/// `reconcilers::delayed_promoter::promote_for_execution`.
///
/// `_lane` is not consulted. The partition key comes from the `partition`
/// argument; the partition component split out of `execution_id` is discarded.
#[cfg(feature = "core")]
async fn promote_delayed(
    &self,
    partition: Partition,
    _lane: &LaneId,
    execution_id: &ExecutionId,
    now_ms: TimestampMs,
) -> Result<(), EngineError> {
    let exec_uuid = attempt::split_exec_id(execution_id)?.1;
    let partition_key = partition_index_to_i16(partition)?;
    let pool = &self.pool;
    let now = now_ms.0;
    reconcilers::delayed_promoter::promote_for_execution(pool, partition_key, exec_uuid, now)
        .await
}
1533
/// Closes a pending waitpoint via
/// `reconcilers::pending_wp_expiry::close_for_execution`.
///
/// `_execution_id` is not consulted; the waitpoint is addressed by the
/// partition key and the UUID parsed from `waitpoint_id`.
///
/// # Errors
///
/// Returns `EngineError::Validation` when the partition index does not fit in
/// an `i16` or when `waitpoint_id` is not a valid UUID.
#[cfg(feature = "core")]
async fn close_waitpoint(
    &self,
    partition: Partition,
    _execution_id: &ExecutionId,
    waitpoint_id: &str,
    now_ms: TimestampMs,
) -> Result<(), EngineError> {
    let partition_key = partition_index_to_i16(partition)?;
    let waitpoint_uuid = match uuid::Uuid::parse_str(waitpoint_id) {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(EngineError::Validation {
                kind: ff_core::engine_error::ValidationKind::InvalidInput,
                detail: format!("waitpoint_id not a UUID: {e}"),
            });
        }
    };
    let pool = &self.pool;
    reconcilers::pending_wp_expiry::close_for_execution(
        pool,
        partition_key,
        waitpoint_uuid,
        now_ms.0,
    )
    .await
}
1562
/// Expires a single execution, dispatching on `phase` to the matching
/// reconciler.
///
/// `ExpirePhase::AttemptTimeout` calls
/// `reconcilers::attempt_timeout::expire_for_execution` (which does not receive
/// `now_ms`); `ExpirePhase::ExecutionDeadline` calls
/// `reconcilers::execution_deadline::expire_for_execution` with `now_ms`.
#[cfg(feature = "core")]
async fn expire_execution(
    &self,
    partition: Partition,
    execution_id: &ExecutionId,
    phase: ExpirePhase,
    now_ms: TimestampMs,
) -> Result<(), EngineError> {
    let exec_uuid = attempt::split_exec_id(execution_id)?.1;
    let partition_key = partition_index_to_i16(partition)?;
    let pool = &self.pool;
    match phase {
        ExpirePhase::ExecutionDeadline => {
            let now = now_ms.0;
            reconcilers::execution_deadline::expire_for_execution(
                pool,
                partition_key,
                exec_uuid,
                now,
            )
            .await
        }
        ExpirePhase::AttemptTimeout => {
            reconcilers::attempt_timeout::expire_for_execution(pool, partition_key, exec_uuid)
                .await
        }
    }
}
1593
/// Runs the suspension-timeout reconciler for a single execution via
/// `reconcilers::suspension_timeout::expire_for_execution`.
///
/// `_now_ms` is not consulted. The partition key comes from the `partition`
/// argument; the partition component split out of `execution_id` is discarded.
#[cfg(feature = "core")]
async fn expire_suspension(
    &self,
    partition: Partition,
    execution_id: &ExecutionId,
    _now_ms: TimestampMs,
) -> Result<(), EngineError> {
    let exec_uuid = attempt::split_exec_id(execution_id)?.1;
    let partition_key = partition_index_to_i16(partition)?;
    let pool = &self.pool;
    reconcilers::suspension_timeout::expire_for_execution(pool, partition_key, exec_uuid).await
}
1610
/// Projects the summary for one flow by delegating to
/// `flow::project_flow_summary_impl`, returning the boolean it reports.
///
/// # Errors
///
/// Returns `EngineError::Validation` when the partition index does not fit in
/// an `i16`.
#[cfg(feature = "core")]
async fn project_flow_summary(
    &self,
    partition: Partition,
    flow_id: &FlowId,
    now_ms: TimestampMs,
) -> Result<bool, EngineError> {
    let flow_uuid: sqlx::types::Uuid = flow_id.0;
    let partition_key = partition_index_to_i16(partition)?;
    let pool = &self.pool;
    flow::project_flow_summary_impl(pool, partition_key, flow_uuid, now_ms.0).await
}
1632
/// Trims retained rows for one lane in one partition by delegating to
/// `exec_core::trim_retention_impl`, returning the count it reports.
///
/// # Errors
///
/// Returns `EngineError::Validation` when the partition index does not fit in
/// an `i16`.
#[cfg(feature = "core")]
async fn trim_retention(
    &self,
    partition: Partition,
    lane_id: &LaneId,
    retention_ms: u64,
    now_ms: TimestampMs,
    batch_size: u32,
    filter: &ff_core::backend::ScannerFilter,
) -> Result<u32, EngineError> {
    let partition_key = partition_index_to_i16(partition)?;
    let pool = &self.pool;
    let lane = lane_id.as_str();
    let now = now_ms.0;
    exec_core::trim_retention_impl(
        pool,
        partition_key,
        lane,
        retention_ms,
        now,
        batch_size,
        filter,
    )
    .await
}
1659
/// Renews a lease by delegating to `crate::typed_ops::renew_lease`.
#[cfg(feature = "core")]
async fn renew_lease(
    &self,
    args: ff_core::contracts::RenewLeaseArgs,
) -> Result<ff_core::contracts::RenewLeaseResult, EngineError> {
    let pool = self.pool();
    crate::typed_ops::renew_lease(pool, args).await
}
1669
/// Completes an execution by delegating to
/// `crate::typed_ops::complete_execution`.
#[cfg(feature = "core")]
async fn complete_execution(
    &self,
    args: ff_core::contracts::CompleteExecutionArgs,
) -> Result<ff_core::contracts::CompleteExecutionResult, EngineError> {
    let pool = self.pool();
    crate::typed_ops::complete_execution(pool, args).await
}
1677
/// Fails an execution by delegating to `crate::typed_ops::fail_execution`.
#[cfg(feature = "core")]
async fn fail_execution(
    &self,
    args: ff_core::contracts::FailExecutionArgs,
) -> Result<ff_core::contracts::FailExecutionResult, EngineError> {
    let pool = self.pool();
    crate::typed_ops::fail_execution(pool, args).await
}
1685
/// Resumes an execution by delegating to `crate::typed_ops::resume_execution`.
#[cfg(feature = "core")]
async fn resume_execution(
    &self,
    args: ff_core::contracts::ResumeExecutionArgs,
) -> Result<ff_core::contracts::ResumeExecutionResult, EngineError> {
    let pool = self.pool();
    crate::typed_ops::resume_execution(pool, args).await
}
1693
/// Checks admission against the quota policy `quota_policy_id` by delegating
/// to `crate::typed_ops::check_admission`.
///
/// `_dimension` is not consulted by this implementation.
#[cfg(feature = "core")]
async fn check_admission(
    &self,
    quota_policy_id: &ff_core::types::QuotaPolicyId,
    _dimension: &str,
    args: ff_core::contracts::CheckAdmissionArgs,
) -> Result<ff_core::contracts::CheckAdmissionResult, EngineError> {
    let pool = self.pool();
    let partitions = &self.partition_config;
    crate::typed_ops::check_admission(pool, partitions, quota_policy_id, args).await
}
1709
/// Evaluates flow eligibility by delegating to
/// `crate::typed_ops::evaluate_flow_eligibility`.
#[cfg(feature = "core")]
async fn evaluate_flow_eligibility(
    &self,
    args: ff_core::contracts::EvaluateFlowEligibilityArgs,
) -> Result<ff_core::contracts::EvaluateFlowEligibilityResult, EngineError> {
    let pool = self.pool();
    crate::typed_ops::evaluate_flow_eligibility(pool, args).await
}
1717
/// Claims an execution by delegating to `crate::typed_ops::claim_execution`
/// with this backend's connection pool and partition configuration.
#[cfg(feature = "core")]
async fn claim_execution(
    &self,
    args: ff_core::contracts::ClaimExecutionArgs,
) -> Result<ff_core::contracts::ClaimExecutionResult, EngineError> {
    let pool = self.pool();
    let partitions = &self.partition_config;
    crate::typed_ops::claim_execution(pool, partitions, args).await
}
1725
/// Issues a grant and claims in one call by delegating to
/// `crate::typed_ops::issue_grant_and_claim` with this backend's connection
/// pool and partition configuration.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.issue_grant_and_claim", skip_all)]
async fn issue_grant_and_claim(
    &self,
    args: ff_core::contracts::IssueGrantAndClaimArgs,
) -> Result<ff_core::contracts::ClaimGrantOutcome, EngineError> {
    let pool = self.pool();
    let partitions = &self.partition_config;
    crate::typed_ops::issue_grant_and_claim(pool, partitions, args).await
}
1737
1738 async fn read_exec_core_fields(
1741 &self,
1742 partition: ff_core::partition::Partition,
1743 execution_id: &ff_core::types::ExecutionId,
1744 fields: &[&str],
1745 ) -> Result<std::collections::HashMap<String, Option<String>>, EngineError> {
1746 if fields.is_empty() {
1747 return Ok(std::collections::HashMap::new());
1748 }
1749 let derived: u16 = execution_id.partition();
1754 if partition.index != derived {
1755 return Err(EngineError::Validation {
1756 kind: ff_core::engine_error::ValidationKind::InvalidInput,
1757 detail: format!(
1758 "read_exec_core_fields: partition mismatch (arg={}, eid={})",
1759 partition.index, derived
1760 ),
1761 });
1762 }
1763 let partition_key: i16 = partition.index as i16;
1764 let exec_uuid = crate::exec_core::eid_uuid(execution_id);
1765
1766 let mut projections: Vec<String> = Vec::with_capacity(fields.len());
1786 for field in fields {
1787 let expr = match *field {
1788 "lane_id" | "lifecycle_phase" | "ownership_state" | "eligibility_state"
1790 | "public_state" | "attempt_state" | "blocking_reason" | "cancellation_reason"
1791 | "cancelled_by" => format!("{f}::text", f = field),
1792 "attempt_index" => "attempt_index::text".to_string(),
1793 "flow_id" => "flow_id::text".to_string(),
1794 "priority" => "priority::text".to_string(),
1795 "created_at_ms" => "created_at_ms::text".to_string(),
1796 "terminal_at_ms" => "terminal_at_ms::text".to_string(),
1797 "deadline_at_ms" => "deadline_at_ms::text".to_string(),
1798 "current_attempt_index" => "attempt_index::text".to_string(),
1801 "completed_at" => "terminal_at_ms::text".to_string(),
1802 "cancel_reason" => "cancellation_reason::text".to_string(),
1803 "required_capabilities" => {
1806 "array_to_string(required_capabilities, ',')".to_string()
1807 }
1808 other => {
1810 match other {
1813 "current_waitpoint_id"
1814 | "current_worker_instance_id"
1815 | "budget_ids"
1816 | "quota_policy_id" => format!("raw_fields ->> '{other}'"),
1817 _ => "NULL".to_string(),
1818 }
1819 }
1820 };
1821 projections.push(expr);
1822 }
1823 let projection_sql = projections.join(", ");
1824 let query = format!(
1825 "SELECT {projection_sql} FROM ff_exec_core \
1826 WHERE partition_key = $1 AND execution_id = $2"
1827 );
1828 let row_opt = sqlx::query(&query)
1829 .bind(partition_key)
1830 .bind(exec_uuid)
1831 .fetch_optional(self.pool())
1832 .await
1833 .map_err(|e| EngineError::Transport {
1834 backend: "postgres",
1835 source: format!("read_exec_core_fields: {e}").into(),
1836 })?;
1837
1838 let mut out = std::collections::HashMap::with_capacity(fields.len());
1839 if let Some(row) = row_opt {
1840 use sqlx::Row;
1841 for (idx, field) in fields.iter().enumerate() {
1842 let val: Option<String> =
1843 row.try_get(idx).map_err(|e| EngineError::Transport {
1844 backend: "postgres",
1845 source: format!("read_exec_core_fields[{field}]: {e}").into(),
1846 })?;
1847 out.insert((*field).to_string(), val);
1848 }
1849 } else {
1850 for field in fields {
1851 out.insert((*field).to_string(), None);
1852 }
1853 }
1854 Ok(out)
1855 }
1856
1857 async fn server_time_ms(&self) -> Result<u64, EngineError> {
1860 let ms: i64 = sqlx::query_scalar(
1865 "SELECT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint",
1866 )
1867 .fetch_one(self.pool())
1868 .await
1869 .map_err(|e| EngineError::Transport {
1870 backend: "postgres",
1871 source: format!("server_time_ms: {e}").into(),
1872 })?;
1873 if ms < 0 {
1874 return Err(EngineError::Transport {
1875 backend: "postgres",
1876 source: "server_time_ms: negative epoch".into(),
1877 });
1878 }
1879 Ok(ms as u64)
1880 }
1881
/// Registers a worker by delegating to `worker_registry::register_worker`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.register_worker", skip_all)]
async fn register_worker(
    &self,
    args: ff_core::contracts::RegisterWorkerArgs,
) -> Result<ff_core::contracts::RegisterWorkerOutcome, EngineError> {
    let pool = &self.pool;
    worker_registry::register_worker(pool, args).await
}
1896
/// Records a worker heartbeat by delegating to
/// `worker_registry::heartbeat_worker`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.heartbeat_worker", skip_all)]
async fn heartbeat_worker(
    &self,
    args: ff_core::contracts::HeartbeatWorkerArgs,
) -> Result<ff_core::contracts::HeartbeatWorkerOutcome, EngineError> {
    let pool = &self.pool;
    worker_registry::heartbeat_worker(pool, args).await
}
1905
/// Marks a worker as dead by delegating to
/// `worker_registry::mark_worker_dead`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.mark_worker_dead", skip_all)]
async fn mark_worker_dead(
    &self,
    args: ff_core::contracts::MarkWorkerDeadArgs,
) -> Result<ff_core::contracts::MarkWorkerDeadOutcome, EngineError> {
    let pool = &self.pool;
    worker_registry::mark_worker_dead(pool, args).await
}
1914
/// Lists expired leases by delegating to
/// `worker_registry::list_expired_leases`.
#[cfg(all(feature = "core", feature = "suspension"))]
#[tracing::instrument(name = "pg.list_expired_leases", skip_all)]
async fn list_expired_leases(
    &self,
    args: ff_core::contracts::ListExpiredLeasesArgs,
) -> Result<ff_core::contracts::ListExpiredLeasesResult, EngineError> {
    let pool = &self.pool;
    worker_registry::list_expired_leases(pool, args).await
}
1927
/// Lists workers by delegating to `worker_registry::list_workers`.
#[cfg(feature = "core")]
#[tracing::instrument(name = "pg.list_workers", skip_all)]
async fn list_workers(
    &self,
    args: ff_core::contracts::ListWorkersArgs,
) -> Result<ff_core::contracts::ListWorkersResult, EngineError> {
    let pool = &self.pool;
    worker_registry::list_workers(pool, args).await
}
1936}
1937
1938async fn resolve_event_id(
1957 pool: &PgPool,
1958 payload: &ff_core::backend::CompletionPayload,
1959) -> Option<i64> {
1960 let eid_str = payload.execution_id.as_str();
1961 let uuid_str = eid_str.rsplit_once(':').map(|(_, u)| u)?;
1964 let uuid = uuid::Uuid::parse_str(uuid_str).ok()?;
1965 let partition_key = i16::try_from(payload.execution_id.partition()).ok()?;
1969 let occurred_at_ms = payload.produced_at_ms.0;
1970
1971 match sqlx::query_scalar::<_, i64>(
1972 "SELECT event_id FROM ff_completion_event \
1973 WHERE partition_key = $1 AND execution_id = $2 AND occurred_at_ms = $3 \
1974 ORDER BY event_id ASC LIMIT 1",
1975 )
1976 .bind(partition_key)
1977 .bind(uuid)
1978 .bind(occurred_at_ms)
1979 .fetch_optional(pool)
1980 .await
1981 {
1982 Ok(row) => row,
1983 Err(err) => {
1984 tracing::warn!(
1985 partition_key,
1986 execution_id = %uuid,
1987 occurred_at_ms,
1988 error = %err,
1989 "resolve_event_id: ff_completion_event lookup failed; falling back to \
1990 dependency_reconciler backstop"
1991 );
1992 None
1993 }
1994 }
1995}
1996
1997fn partition_index_to_i16(partition: Partition) -> Result<i16, EngineError> {
2006 i16::try_from(partition.index).map_err(|_| EngineError::Validation {
2007 kind: ff_core::engine_error::ValidationKind::InvalidInput,
2008 detail: format!(
2009 "partition index {} exceeds i16 range (max {})",
2010 partition.index,
2011 i16::MAX
2012 ),
2013 })
2014}
2015
/// Recommended minimum for the Postgres `max_locks_per_transaction` setting.
/// `max_locks_warn_value` flags any parsed value strictly below this.
const MIN_MAX_LOCKS_PER_TRANSACTION: i64 = 256;
2025
2026async fn warn_if_max_locks_low(pool: &PgPool) {
2031 let row: Result<(String,), sqlx::Error> =
2032 sqlx::query_as("SHOW max_locks_per_transaction")
2033 .fetch_one(pool)
2034 .await;
2035 match row {
2036 Ok((raw,)) => emit_max_locks_decision(&raw),
2037 Err(e) => {
2038 tracing::debug!("failed to probe max_locks_per_transaction: {e}");
2039 }
2040 }
2041}
2042
2043fn max_locks_warn_value(raw: &str) -> Option<i64> {
2049 match raw.parse::<i64>() {
2050 Ok(v) if v < MIN_MAX_LOCKS_PER_TRANSACTION => Some(v),
2051 Ok(_) => None,
2052 Err(e) => {
2053 tracing::debug!(raw, "failed to parse max_locks_per_transaction: {e}");
2054 None
2055 }
2056 }
2057}
2058
2059fn emit_max_locks_decision(raw: &str) {
2060 if let Some(v) = max_locks_warn_value(raw) {
2061 tracing::warn!(
2062 current = v,
2063 recommended = MIN_MAX_LOCKS_PER_TRANSACTION,
2064 "postgres max_locks_per_transaction={v} is below the recommended \
2065 minimum ({MIN_MAX_LOCKS_PER_TRANSACTION}); partition-heavy workloads \
2066 may hit 'out of shared memory' under concurrent load. \
2067 See docs/operator-guide-postgres.md."
2068 );
2069 }
2070}
2071
/// Unit tests for the threshold logic in `max_locks_warn_value`.
#[cfg(test)]
mod max_locks_tests {
    use super::{max_locks_warn_value, MIN_MAX_LOCKS_PER_TRANSACTION};

    /// Values strictly below the threshold are returned for warning.
    #[test]
    fn warns_when_below_threshold() {
        assert_eq!(max_locks_warn_value("64"), Some(64));

        let just_below = MIN_MAX_LOCKS_PER_TRANSACTION - 1;
        let raw = just_below.to_string();
        assert_eq!(max_locks_warn_value(&raw), Some(just_below));
    }

    /// The threshold itself and anything above it produce no warning value.
    #[test]
    fn silent_at_or_above_threshold() {
        let at_threshold = MIN_MAX_LOCKS_PER_TRANSACTION.to_string();
        assert_eq!(max_locks_warn_value(&at_threshold), None);
        assert_eq!(max_locks_warn_value("1024"), None);
    }

    /// Input that is not an integer produces no warning value.
    #[test]
    fn silent_for_unparseable_raw() {
        assert_eq!(max_locks_warn_value("not-a-number"), None);
    }
}
2099
/// Unit tests for the checked `u16` to `i16` narrowing in
/// `partition_index_to_i16`.
#[cfg(test)]
mod partition_index_tests {
    use super::partition_index_to_i16;
    use ff_core::engine_error::{EngineError, ValidationKind};
    use ff_core::partition::{Partition, PartitionFamily};

    /// Builds a partition with the given family and index.
    fn partition(family: PartitionFamily, index: u16) -> Partition {
        Partition { family, index }
    }

    /// Indexes from zero up to and including `i16::MAX` convert unchanged.
    #[test]
    fn accepts_values_within_i16_range() {
        let zero = partition(PartitionFamily::Flow, 0);
        assert_eq!(partition_index_to_i16(zero).unwrap(), 0);

        let small = partition(PartitionFamily::Flow, 255);
        assert_eq!(partition_index_to_i16(small).unwrap(), 255);

        let largest = partition(PartitionFamily::Budget, i16::MAX as u16);
        assert_eq!(partition_index_to_i16(largest).unwrap(), i16::MAX);
    }

    /// The first index past `i16::MAX` is rejected with a descriptive detail.
    #[test]
    fn rejects_overflow_above_i16_max() {
        let too_big = partition(PartitionFamily::Flow, (i16::MAX as u16) + 1);
        let err = partition_index_to_i16(too_big).unwrap_err();
        match err {
            EngineError::Validation { kind, detail } => {
                assert_eq!(kind, ValidationKind::InvalidInput);
                assert!(detail.contains("exceeds i16 range"), "unexpected detail: {detail}");
            }
            other => panic!("expected Validation error, got {other:?}"),
        }
    }

    /// The largest `u16` index is rejected as invalid input.
    #[test]
    fn rejects_u16_max() {
        let outcome = partition_index_to_i16(partition(PartitionFamily::Quota, u16::MAX));
        assert!(matches!(
            outcome,
            Err(EngineError::Validation { kind: ValidationKind::InvalidInput, .. })
        ));
    }
}