1#![allow(clippy::result_large_err)]
20
21use std::sync::Arc;
22use std::time::Duration;
23
24use async_trait::async_trait;
25
26use ff_core::backend::{
27 AppendFrameOutcome, BackendConfig, CancelFlowPolicy, CancelFlowWait, CapabilitySet,
28 ClaimPolicy, FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal,
29 PendingWaitpoint, ReclaimToken, ResumeSignal, SummaryDocument, TailVisibility,
30 UsageDimensions,
31};
32#[cfg(feature = "core")]
33use ff_core::contracts::{
34 AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
35 ApplyDependencyToChildResult, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
36 CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
37 DeliverSignalArgs, DeliverSignalResult, EdgeDependencyPolicy, EdgeDirection, EdgeSnapshot,
38 ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
39 ListPendingWaitpointsResult, ListSuspendedPage, SetEdgeGroupPolicyResult,
40 StageDependencyEdgeArgs, StageDependencyEdgeResult,
41};
42#[cfg(feature = "core")]
43use ff_core::state::PublicState;
44use ff_core::contracts::{
45 CancelFlowResult, ExecutionSnapshot, FlowSnapshot, ReportUsageResult,
46 RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
47 SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
48};
49#[cfg(feature = "core")]
50use ff_core::contracts::ExecutionInfo;
51#[cfg(feature = "core")]
53use ff_core::contracts::{
54 CancelExecutionArgs, CancelExecutionResult, RevokeLeaseArgs, RevokeLeaseResult,
55};
56#[cfg(feature = "core")]
58use ff_core::contracts::{
59 CancelFlowArgs, CancelFlowHeader, ChangePriorityArgs, ChangePriorityResult,
60 ReplayExecutionArgs, ReplayExecutionResult,
61};
62#[cfg(feature = "core")]
64use ff_core::contracts::{
65 BudgetStatus, CreateBudgetArgs, CreateBudgetResult, CreateQuotaPolicyArgs,
66 CreateQuotaPolicyResult, ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult,
67};
68#[cfg(feature = "streaming")]
69use ff_core::contracts::{StreamCursor, StreamFrames};
70use ff_core::engine_backend::EngineBackend;
71use ff_core::engine_error::EngineError;
72#[cfg(feature = "core")]
73use ff_core::partition::PartitionKey;
74use ff_core::partition::PartitionConfig;
75#[cfg(feature = "streaming")]
76use ff_core::types::AttemptIndex;
77#[cfg(feature = "core")]
78use ff_core::types::EdgeId;
79use ff_core::types::{BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
80pub use sqlx::PgPool;
84
85#[cfg(feature = "core")]
86mod admin;
87pub mod attempt;
88pub mod budget;
89pub mod completion;
90#[cfg(feature = "core")]
91pub mod dispatch;
92pub mod error;
93pub mod exec_core;
94pub mod flow;
95#[cfg(feature = "core")]
96pub mod flow_staging;
97pub mod handle_codec;
98mod lease_event;
99mod lease_event_subscribe;
100pub mod listener;
101pub mod migrate;
102#[cfg(feature = "core")]
103pub mod operator;
104#[cfg(feature = "core")]
105mod operator_event;
106pub mod pool;
107#[cfg(feature = "core")]
108pub mod reconcilers;
109#[cfg(feature = "core")]
110pub mod scanner_supervisor;
111#[cfg(feature = "core")]
112pub mod scheduler;
113pub mod signal;
114mod signal_delivery_subscribe;
115mod signal_event;
116#[cfg(feature = "streaming")]
117pub mod stream;
118pub mod suspend;
119pub mod suspend_ops;
120pub mod version;
121
122pub use completion::{PostgresCompletionStream, COMPLETION_CHANNEL};
123pub use error::{map_sqlx_error, PostgresTransportError};
124pub use listener::StreamNotifier;
125pub use migrate::{apply_migrations, MigrationError};
126#[cfg(feature = "core")]
127pub use scanner_supervisor::{PostgresScannerConfig, PostgresScannerHandle};
128pub use version::check_schema_version;
129
130pub use ff_core::backend::PostgresConnection;
136
137fn postgres_supports_base() -> ff_core::capability::Supports {
167 let mut s = ff_core::capability::Supports::none();
168
169 s.cancel_flow_wait_timeout = true;
171 s.cancel_flow_wait_indefinite = true;
172
173 s.rotate_waitpoint_hmac_secret_all = true;
175 s.seed_waitpoint_hmac_secret = true;
176
177 s.claim_for_worker = true;
179
180 s.subscribe_lease_history = true;
182 s.subscribe_completion = true;
183 s.subscribe_signal_delivery = true;
184 s.subscribe_instance_tags = false;
185
186 s.stream_durable_summary = true;
188 s.stream_best_effort_live = true;
189
190 s.prepare = true;
192
193 s.cancel_execution = true;
200 s.change_priority = true;
201 s.replay_execution = true;
202 s.revoke_lease = true;
203 s.read_execution_state = true;
204 s.read_execution_info = true;
205 s.get_execution_result = true;
206 s.budget_admin = true;
207 s.quota_admin = true;
208 s.list_pending_waitpoints = true;
209 s.cancel_flow_header = true;
210 s.ack_cancel_member = true;
211
212 s
213}
214
215pub struct PostgresBackend {
216 #[allow(dead_code)] pool: PgPool,
218 #[allow(dead_code)]
219 partition_config: PartitionConfig,
220 #[allow(dead_code)]
221 metrics: Option<Arc<ff_observability::Metrics>>,
222 #[allow(dead_code)]
226 stream_notifier: Option<Arc<StreamNotifier>>,
227 #[cfg(feature = "core")]
233 scanner_handle: Option<Arc<scanner_supervisor::PostgresScannerHandle>>,
234}
235
236impl PostgresBackend {
237 pub async fn connect(config: BackendConfig) -> Result<Arc<dyn EngineBackend>, EngineError> {
252 let pool = pool::build_pool(&config).await?;
253 warn_if_max_locks_low(&pool).await;
254 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
255 let backend = Self {
256 pool,
257 partition_config: PartitionConfig::default(),
258 metrics: None,
259 stream_notifier,
260 #[cfg(feature = "core")]
261 scanner_handle: None,
262 };
263 Ok(Arc::new(backend))
264 }
265
266 pub fn from_pool(pool: PgPool, partition_config: PartitionConfig) -> Arc<Self> {
272 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
273 Arc::new(Self {
274 pool,
275 partition_config,
276 metrics: None,
277 stream_notifier,
278 #[cfg(feature = "core")]
279 scanner_handle: None,
280 })
281 }
282
283 pub async fn connect_with_metrics(
297 config: BackendConfig,
298 partition_config: PartitionConfig,
299 metrics: Arc<ff_observability::Metrics>,
300 ) -> Result<Arc<Self>, EngineError> {
301 let pool = pool::build_pool(&config).await?;
302 warn_if_max_locks_low(&pool).await;
303 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
304 Ok(Arc::new(Self {
305 pool,
306 partition_config,
307 metrics: Some(metrics),
308 stream_notifier,
309 #[cfg(feature = "core")]
310 scanner_handle: None,
311 }))
312 }
313
314 #[cfg(feature = "core")]
321 pub fn with_scanners(
322 self: &mut Arc<Self>,
323 cfg: scanner_supervisor::PostgresScannerConfig,
324 ) -> bool {
325 let Some(inner) = Arc::get_mut(self) else {
326 return false;
327 };
328 let handle = scanner_supervisor::spawn_scanners(inner.pool.clone(), cfg);
329 inner.scanner_handle = Some(Arc::new(handle));
330 true
331 }
332
333 pub fn pool(&self) -> &PgPool {
338 &self.pool
339 }
340
341 #[cfg(feature = "core")]
354 #[tracing::instrument(name = "pg.create_execution", skip_all)]
355 pub async fn create_execution(
356 &self,
357 args: CreateExecutionArgs,
358 ) -> Result<ExecutionId, EngineError> {
359 exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await
360 }
361
362 #[cfg(feature = "core")]
370 #[tracing::instrument(name = "pg.create_flow.inherent", skip_all)]
371 pub async fn create_flow(
372 &self,
373 args: &CreateFlowArgs,
374 ) -> Result<CreateFlowResult, EngineError> {
375 flow_staging::create_flow(&self.pool, &self.partition_config, args).await
376 }
377
378 #[cfg(feature = "core")]
379 #[tracing::instrument(name = "pg.add_execution_to_flow.inherent", skip_all)]
380 pub async fn add_execution_to_flow(
381 &self,
382 args: &AddExecutionToFlowArgs,
383 ) -> Result<AddExecutionToFlowResult, EngineError> {
384 flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, args).await
385 }
386
387 #[cfg(feature = "core")]
388 #[tracing::instrument(name = "pg.stage_dependency_edge.inherent", skip_all)]
389 pub async fn stage_dependency_edge(
390 &self,
391 args: &StageDependencyEdgeArgs,
392 ) -> Result<StageDependencyEdgeResult, EngineError> {
393 flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, args).await
394 }
395
396 #[cfg(feature = "core")]
397 #[tracing::instrument(name = "pg.apply_dependency_to_child.inherent", skip_all)]
398 pub async fn apply_dependency_to_child(
399 &self,
400 args: &ApplyDependencyToChildArgs,
401 ) -> Result<ApplyDependencyToChildResult, EngineError> {
402 flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, args).await
403 }
404}
405
406#[inline]
410fn unavailable<T>(op: &'static str) -> Result<T, EngineError> {
411 Err(EngineError::Unavailable { op })
412}
413
414#[async_trait]
415impl EngineBackend for PostgresBackend {
416 #[tracing::instrument(name = "pg.claim", skip_all)]
419 async fn claim(
420 &self,
421 lane: &LaneId,
422 capabilities: &CapabilitySet,
423 policy: ClaimPolicy,
424 ) -> Result<Option<Handle>, EngineError> {
425 attempt::claim(&self.pool, lane, capabilities, &policy).await
426 }
427
428 #[tracing::instrument(name = "pg.renew", skip_all)]
429 async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError> {
430 attempt::renew(&self.pool, handle).await
431 }
432
433 #[tracing::instrument(name = "pg.progress", skip_all)]
434 async fn progress(
435 &self,
436 handle: &Handle,
437 percent: Option<u8>,
438 message: Option<String>,
439 ) -> Result<(), EngineError> {
440 attempt::progress(&self.pool, handle, percent, message).await
441 }
442
443 #[tracing::instrument(name = "pg.append_frame", skip_all)]
444 async fn append_frame(
445 &self,
446 handle: &Handle,
447 frame: Frame,
448 ) -> Result<AppendFrameOutcome, EngineError> {
449 #[cfg(feature = "streaming")]
450 {
451 stream::append_frame(&self.pool, &self.partition_config, handle, frame).await
452 }
453 #[cfg(not(feature = "streaming"))]
454 {
455 let _ = (handle, frame);
456 unavailable("pg.append_frame")
457 }
458 }
459
460 #[tracing::instrument(name = "pg.complete", skip_all)]
461 async fn complete(
462 &self,
463 handle: &Handle,
464 payload: Option<Vec<u8>>,
465 ) -> Result<(), EngineError> {
466 attempt::complete(&self.pool, handle, payload).await
467 }
468
469 #[tracing::instrument(name = "pg.fail", skip_all)]
470 async fn fail(
471 &self,
472 handle: &Handle,
473 reason: FailureReason,
474 classification: FailureClass,
475 ) -> Result<FailOutcome, EngineError> {
476 attempt::fail(&self.pool, handle, reason, classification).await
477 }
478
479 #[tracing::instrument(name = "pg.cancel", skip_all)]
480 async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError> {
481 let payload = handle_codec::decode_handle(handle)?;
482 exec_core::cancel_impl(
483 &self.pool,
484 &self.partition_config,
485 &payload.execution_id,
486 reason,
487 )
488 .await
489 }
490
491 #[tracing::instrument(name = "pg.suspend", skip_all)]
492 async fn suspend(
493 &self,
494 handle: &Handle,
495 args: SuspendArgs,
496 ) -> Result<SuspendOutcome, EngineError> {
497 suspend_ops::suspend_impl(&self.pool, &self.partition_config, handle, args).await
498 }
499
500 #[tracing::instrument(name = "pg.suspend_by_triple", skip_all)]
501 async fn suspend_by_triple(
502 &self,
503 exec_id: ExecutionId,
504 triple: LeaseFence,
505 args: SuspendArgs,
506 ) -> Result<SuspendOutcome, EngineError> {
507 suspend_ops::suspend_by_triple_impl(
508 &self.pool,
509 &self.partition_config,
510 exec_id,
511 triple,
512 args,
513 )
514 .await
515 }
516
517 #[tracing::instrument(name = "pg.create_waitpoint", skip_all)]
518 async fn create_waitpoint(
519 &self,
520 _handle: &Handle,
521 _waitpoint_key: &str,
522 _expires_in: Duration,
523 ) -> Result<PendingWaitpoint, EngineError> {
524 unavailable("pg.create_waitpoint")
525 }
526
527 #[tracing::instrument(name = "pg.observe_signals", skip_all)]
528 async fn observe_signals(
529 &self,
530 handle: &Handle,
531 ) -> Result<Vec<ResumeSignal>, EngineError> {
532 suspend_ops::observe_signals_impl(&self.pool, handle).await
533 }
534
535 #[tracing::instrument(name = "pg.claim_from_reclaim", skip_all)]
536 async fn claim_from_reclaim(
537 &self,
538 token: ReclaimToken,
539 ) -> Result<Option<Handle>, EngineError> {
540 attempt::claim_from_reclaim(&self.pool, token).await
541 }
542
543 #[tracing::instrument(name = "pg.delay", skip_all)]
544 async fn delay(
545 &self,
546 handle: &Handle,
547 delay_until: TimestampMs,
548 ) -> Result<(), EngineError> {
549 attempt::delay(&self.pool, handle, delay_until).await
550 }
551
552 #[tracing::instrument(name = "pg.wait_children", skip_all)]
553 async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError> {
554 attempt::wait_children(&self.pool, handle).await
555 }
556
557 #[tracing::instrument(name = "pg.describe_execution", skip_all)]
560 async fn describe_execution(
561 &self,
562 id: &ExecutionId,
563 ) -> Result<Option<ExecutionSnapshot>, EngineError> {
564 exec_core::describe_execution_impl(&self.pool, &self.partition_config, id).await
565 }
566
567 #[tracing::instrument(name = "pg.describe_flow", skip_all)]
568 async fn describe_flow(
569 &self,
570 id: &FlowId,
571 ) -> Result<Option<FlowSnapshot>, EngineError> {
572 flow::describe_flow(&self.pool, &self.partition_config, id).await
573 }
574
575 #[cfg(feature = "core")]
576 #[tracing::instrument(name = "pg.list_edges", skip_all)]
577 async fn list_edges(
578 &self,
579 flow_id: &FlowId,
580 direction: EdgeDirection,
581 ) -> Result<Vec<EdgeSnapshot>, EngineError> {
582 flow::list_edges(&self.pool, &self.partition_config, flow_id, direction).await
583 }
584
585 #[cfg(feature = "core")]
586 #[tracing::instrument(name = "pg.describe_edge", skip_all)]
587 async fn describe_edge(
588 &self,
589 flow_id: &FlowId,
590 edge_id: &EdgeId,
591 ) -> Result<Option<EdgeSnapshot>, EngineError> {
592 flow::describe_edge(&self.pool, &self.partition_config, flow_id, edge_id).await
593 }
594
595 #[cfg(feature = "core")]
596 #[tracing::instrument(name = "pg.resolve_execution_flow_id", skip_all)]
597 async fn resolve_execution_flow_id(
598 &self,
599 eid: &ExecutionId,
600 ) -> Result<Option<FlowId>, EngineError> {
601 exec_core::resolve_execution_flow_id_impl(&self.pool, &self.partition_config, eid).await
602 }
603
604 #[cfg(feature = "core")]
605 #[tracing::instrument(name = "pg.list_flows", skip_all)]
606 async fn list_flows(
607 &self,
608 partition: PartitionKey,
609 cursor: Option<FlowId>,
610 limit: usize,
611 ) -> Result<ListFlowsPage, EngineError> {
612 flow::list_flows(&self.pool, partition, cursor, limit).await
613 }
614
615 #[cfg(feature = "core")]
616 #[tracing::instrument(name = "pg.list_lanes", skip_all)]
617 async fn list_lanes(
618 &self,
619 cursor: Option<LaneId>,
620 limit: usize,
621 ) -> Result<ListLanesPage, EngineError> {
622 admin::list_lanes_impl(&self.pool, cursor, limit).await
623 }
624
625 #[cfg(feature = "core")]
626 #[tracing::instrument(name = "pg.list_suspended", skip_all)]
627 async fn list_suspended(
628 &self,
629 partition: PartitionKey,
630 cursor: Option<ExecutionId>,
631 limit: usize,
632 ) -> Result<ListSuspendedPage, EngineError> {
633 admin::list_suspended_impl(&self.pool, partition, cursor, limit).await
634 }
635
636 #[cfg(feature = "core")]
637 #[tracing::instrument(name = "pg.list_executions", skip_all)]
638 async fn list_executions(
639 &self,
640 partition: PartitionKey,
641 cursor: Option<ExecutionId>,
642 limit: usize,
643 ) -> Result<ListExecutionsPage, EngineError> {
644 exec_core::list_executions_impl(
645 &self.pool,
646 &self.partition_config,
647 partition,
648 cursor,
649 limit,
650 )
651 .await
652 }
653
654 #[cfg(feature = "core")]
657 #[tracing::instrument(name = "pg.deliver_signal", skip_all)]
658 async fn deliver_signal(
659 &self,
660 args: DeliverSignalArgs,
661 ) -> Result<DeliverSignalResult, EngineError> {
662 suspend_ops::deliver_signal_impl(&self.pool, &self.partition_config, args).await
663 }
664
665 #[cfg(feature = "core")]
666 #[tracing::instrument(name = "pg.claim_resumed_execution", skip_all)]
667 async fn claim_resumed_execution(
668 &self,
669 args: ClaimResumedExecutionArgs,
670 ) -> Result<ClaimResumedExecutionResult, EngineError> {
671 suspend_ops::claim_resumed_execution_impl(&self.pool, &self.partition_config, args).await
672 }
673
674 #[cfg(feature = "core")]
684 #[tracing::instrument(name = "pg.read_execution_state", skip_all)]
685 async fn read_execution_state(
686 &self,
687 id: &ExecutionId,
688 ) -> Result<Option<PublicState>, EngineError> {
689 exec_core::read_execution_state_impl(&self.pool, &self.partition_config, id).await
690 }
691
692 #[cfg(feature = "core")]
693 #[tracing::instrument(name = "pg.read_execution_info", skip_all)]
694 async fn read_execution_info(
695 &self,
696 id: &ExecutionId,
697 ) -> Result<Option<ExecutionInfo>, EngineError> {
698 exec_core::read_execution_info_impl(&self.pool, &self.partition_config, id).await
699 }
700
701 #[tracing::instrument(name = "pg.get_execution_result", skip_all)]
702 async fn get_execution_result(
703 &self,
704 id: &ExecutionId,
705 ) -> Result<Option<Vec<u8>>, EngineError> {
706 exec_core::get_execution_result_impl(&self.pool, &self.partition_config, id).await
707 }
708
709 #[cfg(feature = "core")]
718 #[tracing::instrument(name = "pg.list_pending_waitpoints", skip_all)]
719 async fn list_pending_waitpoints(
720 &self,
721 args: ListPendingWaitpointsArgs,
722 ) -> Result<ListPendingWaitpointsResult, EngineError> {
723 suspend_ops::list_pending_waitpoints_impl(&self.pool, args).await
724 }
725
726 #[cfg(feature = "core")]
734 #[tracing::instrument(name = "pg.cancel_execution", skip_all)]
735 async fn cancel_execution(
736 &self,
737 args: CancelExecutionArgs,
738 ) -> Result<CancelExecutionResult, EngineError> {
739 operator::cancel_execution_impl(&self.pool, args).await
740 }
741
742 #[cfg(feature = "core")]
743 #[tracing::instrument(name = "pg.revoke_lease", skip_all)]
744 async fn revoke_lease(
745 &self,
746 args: RevokeLeaseArgs,
747 ) -> Result<RevokeLeaseResult, EngineError> {
748 operator::revoke_lease_impl(&self.pool, args).await
749 }
750
751 #[cfg(feature = "core")]
761 #[tracing::instrument(name = "pg.change_priority", skip_all)]
762 async fn change_priority(
763 &self,
764 args: ChangePriorityArgs,
765 ) -> Result<ChangePriorityResult, EngineError> {
766 operator::change_priority_impl(&self.pool, args).await
767 }
768
769 #[cfg(feature = "core")]
770 #[tracing::instrument(name = "pg.replay_execution", skip_all)]
771 async fn replay_execution(
772 &self,
773 args: ReplayExecutionArgs,
774 ) -> Result<ReplayExecutionResult, EngineError> {
775 operator::replay_execution_impl(&self.pool, args).await
776 }
777
778 #[cfg(feature = "core")]
779 #[tracing::instrument(name = "pg.cancel_flow_header", skip_all)]
780 async fn cancel_flow_header(
781 &self,
782 args: CancelFlowArgs,
783 ) -> Result<CancelFlowHeader, EngineError> {
784 operator::cancel_flow_header_impl(&self.pool, &self.partition_config, args).await
785 }
786
787 #[cfg(feature = "core")]
788 #[tracing::instrument(name = "pg.ack_cancel_member", skip_all)]
789 async fn ack_cancel_member(
790 &self,
791 flow_id: &FlowId,
792 execution_id: &ExecutionId,
793 ) -> Result<(), EngineError> {
794 operator::ack_cancel_member_impl(
795 &self.pool,
796 &self.partition_config,
797 flow_id.clone(),
798 execution_id.clone(),
799 )
800 .await
801 }
802
803 #[cfg(feature = "core")]
814 #[tracing::instrument(name = "pg.create_execution.trait", skip_all)]
815 async fn create_execution(
816 &self,
817 args: CreateExecutionArgs,
818 ) -> Result<CreateExecutionResult, EngineError> {
819 let eid = args.execution_id.clone();
820 exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await?;
821 Ok(CreateExecutionResult::Created {
822 execution_id: eid,
823 public_state: PublicState::Waiting,
824 })
825 }
826
827 #[cfg(feature = "core")]
828 #[tracing::instrument(name = "pg.create_flow", skip_all)]
829 async fn create_flow(
830 &self,
831 args: CreateFlowArgs,
832 ) -> Result<CreateFlowResult, EngineError> {
833 flow_staging::create_flow(&self.pool, &self.partition_config, &args).await
834 }
835
836 #[cfg(feature = "core")]
837 #[tracing::instrument(name = "pg.add_execution_to_flow", skip_all)]
838 async fn add_execution_to_flow(
839 &self,
840 args: AddExecutionToFlowArgs,
841 ) -> Result<AddExecutionToFlowResult, EngineError> {
842 flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, &args).await
843 }
844
845 #[cfg(feature = "core")]
846 #[tracing::instrument(name = "pg.stage_dependency_edge", skip_all)]
847 async fn stage_dependency_edge(
848 &self,
849 args: StageDependencyEdgeArgs,
850 ) -> Result<StageDependencyEdgeResult, EngineError> {
851 flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, &args).await
852 }
853
854 #[cfg(feature = "core")]
855 #[tracing::instrument(name = "pg.apply_dependency_to_child", skip_all)]
856 async fn apply_dependency_to_child(
857 &self,
858 args: ApplyDependencyToChildArgs,
859 ) -> Result<ApplyDependencyToChildResult, EngineError> {
860 flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, &args).await
861 }
862
863 fn backend_label(&self) -> &'static str {
864 "postgres"
865 }
866
867 fn capabilities(&self) -> ff_core::capability::Capabilities {
875 ff_core::capability::Capabilities::new(
876 ff_core::capability::BackendIdentity::new(
877 "postgres",
878 ff_core::capability::Version::new(0, 11, 0),
879 "E-shipped",
880 ),
881 postgres_supports_base(),
882 )
883 }
884
885 async fn prepare(
893 &self,
894 ) -> Result<ff_core::backend::PrepareOutcome, EngineError> {
895 Ok(ff_core::backend::PrepareOutcome::NoOp)
896 }
897
898 #[cfg(feature = "core")]
906 #[tracing::instrument(name = "pg.claim_for_worker", skip_all)]
907 async fn claim_for_worker(
908 &self,
909 args: ff_core::contracts::ClaimForWorkerArgs,
910 ) -> Result<ff_core::contracts::ClaimForWorkerOutcome, EngineError> {
911 let sched = scheduler::PostgresScheduler::new(self.pool.clone());
912 let grant_opt = sched
913 .claim_for_worker(
914 &args.lane_id,
915 &args.worker_id,
916 &args.worker_instance_id,
917 &args.worker_capabilities,
918 args.grant_ttl_ms,
919 )
920 .await?;
921 Ok(match grant_opt {
922 Some(g) => ff_core::contracts::ClaimForWorkerOutcome::granted(g),
923 None => ff_core::contracts::ClaimForWorkerOutcome::no_work(),
924 })
925 }
926
927 async fn ping(&self) -> Result<(), EngineError> {
928 let _ = sqlx::query_scalar::<_, i32>("SELECT 1")
932 .fetch_one(&self.pool)
933 .await
934 .map_err(error::map_sqlx_error)?;
935 Ok(())
936 }
937
938 async fn shutdown_prepare(&self, grace: Duration) -> Result<(), EngineError> {
943 #[cfg(feature = "core")]
944 if let Some(handle) = self.scanner_handle.as_ref() {
945 let timed_out = handle.shutdown(grace).await;
946 if timed_out > 0 {
947 tracing::warn!(
948 timed_out,
949 ?grace,
950 "postgres scanner supervisor exceeded grace on shutdown"
951 );
952 }
953 }
954 Ok(())
955 }
956
957 #[tracing::instrument(name = "pg.cancel_flow", skip_all)]
958 async fn cancel_flow(
959 &self,
960 id: &FlowId,
961 policy: CancelFlowPolicy,
962 wait: CancelFlowWait,
963 ) -> Result<CancelFlowResult, EngineError> {
964 let result = flow::cancel_flow(&self.pool, &self.partition_config, id, policy).await?;
965 if let Some(deadline) = ff_core::engine_backend::cancel_flow_wait_deadline(wait) {
966 ff_core::engine_backend::wait_for_flow_cancellation(self, id, deadline).await?;
967 }
968 Ok(result)
969 }
970
971 #[cfg(feature = "core")]
972 #[tracing::instrument(name = "pg.set_edge_group_policy", skip_all)]
973 async fn set_edge_group_policy(
974 &self,
975 flow_id: &FlowId,
976 downstream_execution_id: &ExecutionId,
977 policy: EdgeDependencyPolicy,
978 ) -> Result<SetEdgeGroupPolicyResult, EngineError> {
979 flow::set_edge_group_policy(
980 &self.pool,
981 &self.partition_config,
982 flow_id,
983 downstream_execution_id,
984 policy,
985 )
986 .await
987 }
988
989 #[tracing::instrument(name = "pg.report_usage", skip_all)]
992 async fn report_usage(
993 &self,
994 _handle: &Handle,
995 budget: &BudgetId,
996 dimensions: UsageDimensions,
997 ) -> Result<ReportUsageResult, EngineError> {
998 budget::report_usage_impl(&self.pool, &self.partition_config, budget, dimensions).await
999 }
1000
1001 #[cfg(feature = "core")]
1009 #[tracing::instrument(name = "pg.create_budget", skip_all)]
1010 async fn create_budget(
1011 &self,
1012 args: CreateBudgetArgs,
1013 ) -> Result<CreateBudgetResult, EngineError> {
1014 budget::create_budget_impl(&self.pool, &self.partition_config, args).await
1015 }
1016
1017 #[cfg(feature = "core")]
1018 #[tracing::instrument(name = "pg.reset_budget", skip_all)]
1019 async fn reset_budget(
1020 &self,
1021 args: ResetBudgetArgs,
1022 ) -> Result<ResetBudgetResult, EngineError> {
1023 budget::reset_budget_impl(&self.pool, &self.partition_config, args).await
1024 }
1025
1026 #[cfg(feature = "core")]
1027 #[tracing::instrument(name = "pg.create_quota_policy", skip_all)]
1028 async fn create_quota_policy(
1029 &self,
1030 args: CreateQuotaPolicyArgs,
1031 ) -> Result<CreateQuotaPolicyResult, EngineError> {
1032 budget::create_quota_policy_impl(&self.pool, &self.partition_config, args).await
1033 }
1034
1035 #[cfg(feature = "core")]
1036 #[tracing::instrument(name = "pg.get_budget_status", skip_all)]
1037 async fn get_budget_status(
1038 &self,
1039 id: &BudgetId,
1040 ) -> Result<BudgetStatus, EngineError> {
1041 budget::get_budget_status_impl(&self.pool, &self.partition_config, id).await
1042 }
1043
1044 #[cfg(feature = "core")]
1045 #[tracing::instrument(name = "pg.report_usage_admin", skip_all)]
1046 async fn report_usage_admin(
1047 &self,
1048 budget_id: &BudgetId,
1049 args: ReportUsageAdminArgs,
1050 ) -> Result<ReportUsageResult, EngineError> {
1051 budget::report_usage_admin_impl(&self.pool, &self.partition_config, budget_id, args).await
1052 }
1053
1054 #[tracing::instrument(name = "pg.rotate_waitpoint_hmac_secret_all", skip_all)]
1061 async fn rotate_waitpoint_hmac_secret_all(
1062 &self,
1063 args: RotateWaitpointHmacSecretAllArgs,
1064 ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
1065 let now_ms = std::time::SystemTime::now()
1070 .duration_since(std::time::UNIX_EPOCH)
1071 .map(|d| d.as_millis() as i64)
1072 .unwrap_or(0);
1073 signal::rotate_waitpoint_hmac_secret_all_impl(&self.pool, args, now_ms).await
1074 }
1075
1076 #[tracing::instrument(name = "pg.seed_waitpoint_hmac_secret", skip_all)]
1077 async fn seed_waitpoint_hmac_secret(
1078 &self,
1079 args: SeedWaitpointHmacSecretArgs,
1080 ) -> Result<SeedOutcome, EngineError> {
1081 let now_ms = std::time::SystemTime::now()
1085 .duration_since(std::time::UNIX_EPOCH)
1086 .map(|d| d.as_millis() as i64)
1087 .unwrap_or(0);
1088 signal::seed_waitpoint_hmac_secret_impl(&self.pool, args, now_ms).await
1089 }
1090
1091 #[cfg(feature = "streaming")]
1094 #[tracing::instrument(name = "pg.read_stream", skip_all)]
1095 async fn read_stream(
1096 &self,
1097 execution_id: &ExecutionId,
1098 attempt_index: AttemptIndex,
1099 from: StreamCursor,
1100 to: StreamCursor,
1101 count_limit: u64,
1102 ) -> Result<StreamFrames, EngineError> {
1103 stream::read_stream(&self.pool, execution_id, attempt_index, from, to, count_limit).await
1104 }
1105
1106 #[cfg(feature = "streaming")]
1107 #[tracing::instrument(name = "pg.tail_stream", skip_all)]
1108 async fn tail_stream(
1109 &self,
1110 execution_id: &ExecutionId,
1111 attempt_index: AttemptIndex,
1112 after: StreamCursor,
1113 block_ms: u64,
1114 count_limit: u64,
1115 visibility: TailVisibility,
1116 ) -> Result<StreamFrames, EngineError> {
1117 let notifier = self
1118 .stream_notifier
1119 .as_ref()
1120 .ok_or(EngineError::Unavailable {
1121 op: "pg.tail_stream (notifier not initialised)",
1122 })?;
1123 stream::tail_stream(
1124 &self.pool,
1125 notifier,
1126 execution_id,
1127 attempt_index,
1128 after,
1129 block_ms,
1130 count_limit,
1131 visibility,
1132 )
1133 .await
1134 }
1135
1136 #[cfg(feature = "streaming")]
1137 #[tracing::instrument(name = "pg.read_summary", skip_all)]
1138 async fn read_summary(
1139 &self,
1140 execution_id: &ExecutionId,
1141 attempt_index: AttemptIndex,
1142 ) -> Result<Option<SummaryDocument>, EngineError> {
1143 stream::read_summary(&self.pool, execution_id, attempt_index).await
1144 }
1145
1146 #[tracing::instrument(name = "pg.subscribe_completion", skip_all)]
1160 async fn subscribe_completion(
1161 &self,
1162 _cursor: ff_core::stream_subscribe::StreamCursor,
1163 filter: &ff_core::backend::ScannerFilter,
1164 ) -> Result<ff_core::stream_events::CompletionSubscription, EngineError> {
1165 use ff_core::stream_events::{CompletionEvent, CompletionOutcome};
1166 use ff_core::stream_subscribe::encode_postgres_event_cursor;
1167 use futures_core::Stream;
1168 use std::pin::Pin;
1169 use std::task::{Context, Poll};
1170
1171 let inner = if filter.is_noop() {
1178 ff_core::completion_backend::CompletionBackend::subscribe_completions(self).await?
1179 } else {
1180 ff_core::completion_backend::CompletionBackend::subscribe_completions_filtered(
1181 self, filter,
1182 )
1183 .await?
1184 };
1185
1186 struct Adapter {
1187 inner: ff_core::completion_backend::CompletionStream,
1188 }
1189
1190 impl Stream for Adapter {
1191 type Item = Result<CompletionEvent, EngineError>;
1192 fn poll_next(
1193 mut self: Pin<&mut Self>,
1194 cx: &mut Context<'_>,
1195 ) -> Poll<Option<Self::Item>> {
1196 match Pin::new(&mut self.inner).poll_next(cx) {
1197 Poll::Pending => Poll::Pending,
1198 Poll::Ready(None) => Poll::Ready(None),
1199 Poll::Ready(Some(payload)) => {
1200 let cursor = encode_postgres_event_cursor(0);
1205 let event = CompletionEvent::new(
1206 cursor,
1207 payload.execution_id.clone(),
1208 CompletionOutcome::from_wire(&payload.outcome),
1209 payload.produced_at_ms,
1210 );
1211 Poll::Ready(Some(Ok(event)))
1212 }
1213 }
1214 }
1215 }
1216
1217 Ok(Box::pin(Adapter { inner }))
1218 }
1219
1220 #[tracing::instrument(name = "pg.subscribe_lease_history", skip_all)]
1234 async fn subscribe_lease_history(
1235 &self,
1236 cursor: ff_core::stream_subscribe::StreamCursor,
1237 filter: &ff_core::backend::ScannerFilter,
1238 ) -> Result<ff_core::stream_events::LeaseHistorySubscription, EngineError> {
1239 lease_event_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
1240 }
1241
1242 #[tracing::instrument(name = "pg.subscribe_signal_delivery", skip_all)]
1252 async fn subscribe_signal_delivery(
1253 &self,
1254 cursor: ff_core::stream_subscribe::StreamCursor,
1255 filter: &ff_core::backend::ScannerFilter,
1256 ) -> Result<ff_core::stream_events::SignalDeliverySubscription, EngineError> {
1257 signal_delivery_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
1258 }
1259}
1260
1261const MIN_MAX_LOCKS_PER_TRANSACTION: i64 = 256;
1270
1271async fn warn_if_max_locks_low(pool: &PgPool) {
1276 let row: Result<(String,), sqlx::Error> =
1277 sqlx::query_as("SHOW max_locks_per_transaction")
1278 .fetch_one(pool)
1279 .await;
1280 match row {
1281 Ok((raw,)) => emit_max_locks_decision(&raw),
1282 Err(e) => {
1283 tracing::debug!("failed to probe max_locks_per_transaction: {e}");
1284 }
1285 }
1286}
1287
1288fn max_locks_warn_value(raw: &str) -> Option<i64> {
1294 match raw.parse::<i64>() {
1295 Ok(v) if v < MIN_MAX_LOCKS_PER_TRANSACTION => Some(v),
1296 Ok(_) => None,
1297 Err(e) => {
1298 tracing::debug!(raw, "failed to parse max_locks_per_transaction: {e}");
1299 None
1300 }
1301 }
1302}
1303
1304fn emit_max_locks_decision(raw: &str) {
1305 if let Some(v) = max_locks_warn_value(raw) {
1306 tracing::warn!(
1307 current = v,
1308 recommended = MIN_MAX_LOCKS_PER_TRANSACTION,
1309 "postgres max_locks_per_transaction={v} is below the recommended \
1310 minimum ({MIN_MAX_LOCKS_PER_TRANSACTION}); partition-heavy workloads \
1311 may hit 'out of shared memory' under concurrent load. \
1312 See docs/operator-guide-postgres.md."
1313 );
1314 }
1315}
1316
/// Unit tests for the `max_locks_per_transaction` threshold decision.
#[cfg(test)]
mod max_locks_tests {
    use super::{max_locks_warn_value, MIN_MAX_LOCKS_PER_TRANSACTION};

    #[test]
    fn warns_when_below_threshold() {
        // A clearly-low value, plus the value just under the cut-off.
        let just_below = MIN_MAX_LOCKS_PER_TRANSACTION - 1;
        for expected in [64, just_below] {
            assert_eq!(max_locks_warn_value(&expected.to_string()), Some(expected));
        }
    }

    #[test]
    fn silent_at_or_above_threshold() {
        // The threshold itself is acceptable, as is anything larger.
        let inputs = [MIN_MAX_LOCKS_PER_TRANSACTION.to_string(), String::from("1024")];
        for raw in &inputs {
            assert_eq!(max_locks_warn_value(raw), None);
        }
    }

    #[test]
    fn silent_for_unparseable_raw() {
        assert!(max_locks_warn_value("not-a-number").is_none());
    }
}