1#![allow(clippy::result_large_err)]
20
21use std::sync::Arc;
22use std::time::Duration;
23
24use async_trait::async_trait;
25
26use ff_core::backend::{
27 AppendFrameOutcome, BackendConfig, CancelFlowPolicy, CancelFlowWait, CapabilitySet,
28 ClaimPolicy, FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal,
29 PendingWaitpoint, ReclaimToken, ResumeSignal, SummaryDocument, TailVisibility,
30 UsageDimensions,
31};
32#[cfg(feature = "core")]
33use ff_core::contracts::{
34 AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
35 ApplyDependencyToChildResult, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
36 CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
37 DeliverSignalArgs, DeliverSignalResult, EdgeDependencyPolicy, EdgeDirection, EdgeSnapshot,
38 ListExecutionsPage, ListFlowsPage, ListLanesPage, ListSuspendedPage,
39 SetEdgeGroupPolicyResult, StageDependencyEdgeArgs, StageDependencyEdgeResult,
40};
41#[cfg(feature = "core")]
42use ff_core::state::PublicState;
43use ff_core::contracts::{
44 CancelFlowResult, ExecutionSnapshot, FlowSnapshot, ReportUsageResult,
45 RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
46 SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
47};
48#[cfg(feature = "streaming")]
49use ff_core::contracts::{StreamCursor, StreamFrames};
50use ff_core::engine_backend::EngineBackend;
51use ff_core::engine_error::EngineError;
52#[cfg(feature = "core")]
53use ff_core::partition::PartitionKey;
54use ff_core::partition::PartitionConfig;
55#[cfg(feature = "streaming")]
56use ff_core::types::AttemptIndex;
57#[cfg(feature = "core")]
58use ff_core::types::EdgeId;
59use ff_core::types::{BudgetId, ExecutionId, FlowId, LaneId, TimestampMs};
60pub use sqlx::PgPool;
64
65#[cfg(feature = "core")]
66mod admin;
67pub mod attempt;
68pub mod budget;
69pub mod completion;
70#[cfg(feature = "core")]
71pub mod dispatch;
72pub mod error;
73pub mod exec_core;
74pub mod flow;
75#[cfg(feature = "core")]
76pub mod flow_staging;
77pub mod handle_codec;
78pub mod listener;
79pub mod migrate;
80pub mod pool;
81#[cfg(feature = "core")]
82pub mod reconcilers;
83#[cfg(feature = "core")]
84pub mod scanner_supervisor;
85#[cfg(feature = "core")]
86pub mod scheduler;
87pub mod signal;
88#[cfg(feature = "streaming")]
89pub mod stream;
90pub mod suspend;
91pub mod suspend_ops;
92pub mod version;
93
94pub use completion::{PostgresCompletionStream, COMPLETION_CHANNEL};
95pub use error::{map_sqlx_error, PostgresTransportError};
96pub use listener::StreamNotifier;
97pub use migrate::{apply_migrations, MigrationError};
98#[cfg(feature = "core")]
99pub use scanner_supervisor::{PostgresScannerConfig, PostgresScannerHandle};
100pub use version::check_schema_version;
101
102pub use ff_core::backend::PostgresConnection;
108
/// Capability table advertised by the Postgres backend.
///
/// Each entry pairs a `Capability` with the `CapabilityStatus` that
/// `capabilities_matrix` copies verbatim into the matrix it returns.
///
/// NOTE(review): the table is unconditional, while several trait methods in
/// this file are gated on the `core` / `streaming` features, and others
/// (e.g. `cancel`, `report_usage`, `rotate_waitpoint_hmac_secret_all`,
/// `subscribe_completion`) forward to real implementations even though a
/// similarly named capability is listed as `Unsupported` here. Confirm the
/// statuses match what the backend is meant to advertise.
static POSTGRES_CAPS: &[(
    ff_core::capability::Capability,
    ff_core::capability::CapabilityStatus,
)] = &[
    (
        ff_core::capability::Capability::ClaimForWorker,
        ff_core::capability::CapabilityStatus::Supported,
    ),
    (
        ff_core::capability::Capability::ClaimFromReclaim,
        ff_core::capability::CapabilityStatus::Unsupported,
    ),
    (
        ff_core::capability::Capability::SuspendResumeByCount,
        ff_core::capability::CapabilityStatus::Unsupported,
    ),
    (
        ff_core::capability::Capability::CancelExecution,
        ff_core::capability::CapabilityStatus::Unsupported,
    ),
    (
        ff_core::capability::Capability::CancelFlow,
        ff_core::capability::CapabilityStatus::Supported,
    ),
    (
        ff_core::capability::Capability::CancelFlowWaitTimeout,
        ff_core::capability::CapabilityStatus::Supported,
    ),
    (
        ff_core::capability::Capability::CancelFlowWaitIndefinite,
        ff_core::capability::CapabilityStatus::Supported,
    ),
    (
        ff_core::capability::Capability::StreamRead,
        ff_core::capability::CapabilityStatus::Unsupported,
    ),
    (
        ff_core::capability::Capability::StreamBestEffortLive,
        ff_core::capability::CapabilityStatus::Unsupported,
    ),
    (
        ff_core::capability::Capability::StreamDurableSummary,
        ff_core::capability::CapabilityStatus::Unsupported,
    ),
    (
        ff_core::capability::Capability::DeliverSignal,
        ff_core::capability::CapabilityStatus::Unsupported,
    ),
    (
        ff_core::capability::Capability::ListPendingWaitpoints,
        ff_core::capability::CapabilityStatus::Unsupported,
    ),
    (
        ff_core::capability::Capability::RotateWaitpointHmac,
        ff_core::capability::CapabilityStatus::Unsupported,
    ),
    (
        ff_core::capability::Capability::SeedWaitpointHmac,
        ff_core::capability::CapabilityStatus::Supported,
    ),
    (
        ff_core::capability::Capability::ReportUsage,
        ff_core::capability::CapabilityStatus::Unsupported,
    ),
    (
        ff_core::capability::Capability::ReportUsageAdminPath,
        ff_core::capability::CapabilityStatus::Unsupported,
    ),
    (
        ff_core::capability::Capability::ResetBudget,
        ff_core::capability::CapabilityStatus::Unsupported,
    ),
    (
        ff_core::capability::Capability::CreateFlow,
        ff_core::capability::CapabilityStatus::Supported,
    ),
    (
        ff_core::capability::Capability::CreateExecution,
        ff_core::capability::CapabilityStatus::Supported,
    ),
    (
        ff_core::capability::Capability::StageDependencyEdge,
        ff_core::capability::CapabilityStatus::Supported,
    ),
    (
        ff_core::capability::Capability::ApplyDependencyToChild,
        ff_core::capability::CapabilityStatus::Supported,
    ),
    (
        ff_core::capability::Capability::PreparableBoot,
        // `prepare` in this file returns `PrepareOutcome::NoOp`.
        ff_core::capability::CapabilityStatus::Unsupported,
    ),
    (
        ff_core::capability::Capability::SubscribeLeaseHistory,
        ff_core::capability::CapabilityStatus::Unsupported,
    ),
    (
        ff_core::capability::Capability::SubscribeCompletion,
        ff_core::capability::CapabilityStatus::Unsupported,
    ),
    (
        ff_core::capability::Capability::SubscribeSignalDelivery,
        ff_core::capability::CapabilityStatus::Unsupported,
    ),
    (
        ff_core::capability::Capability::Ping,
        ff_core::capability::CapabilityStatus::Supported,
    ),
];
243
/// Postgres-backed implementation of [`EngineBackend`].
///
/// Bundles the connection pool with the optional collaborators that the
/// constructors wire up (metrics, stream notifier, scanner supervisor).
pub struct PostgresBackend {
    /// Connection pool every operation in this file runs against; exposed
    /// read-only through [`PostgresBackend::pool`].
    #[allow(dead_code)] pool: PgPool,
    /// Partition configuration forwarded to the per-module helpers.
    #[allow(dead_code)]
    partition_config: PartitionConfig,
    /// Metrics sink; only `connect_with_metrics` sets it to `Some`. It is not
    /// read anywhere in this file.
    #[allow(dead_code)]
    metrics: Option<Arc<ff_observability::Metrics>>,
    /// Notifier spawned by every constructor; `tail_stream` requires it when
    /// the `streaming` feature is enabled.
    #[allow(dead_code)]
    stream_notifier: Option<Arc<StreamNotifier>>,
    /// Handle to the scanner supervisor; populated by `with_scanners` and
    /// shut down by `shutdown_prepare`.
    #[cfg(feature = "core")]
    scanner_handle: Option<Arc<scanner_supervisor::PostgresScannerHandle>>,
}
264
265impl PostgresBackend {
266 pub async fn connect(config: BackendConfig) -> Result<Arc<dyn EngineBackend>, EngineError> {
281 let pool = pool::build_pool(&config).await?;
282 warn_if_max_locks_low(&pool).await;
283 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
284 let backend = Self {
285 pool,
286 partition_config: PartitionConfig::default(),
287 metrics: None,
288 stream_notifier,
289 #[cfg(feature = "core")]
290 scanner_handle: None,
291 };
292 Ok(Arc::new(backend))
293 }
294
295 pub fn from_pool(pool: PgPool, partition_config: PartitionConfig) -> Arc<Self> {
301 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
302 Arc::new(Self {
303 pool,
304 partition_config,
305 metrics: None,
306 stream_notifier,
307 #[cfg(feature = "core")]
308 scanner_handle: None,
309 })
310 }
311
312 pub async fn connect_with_metrics(
326 config: BackendConfig,
327 partition_config: PartitionConfig,
328 metrics: Arc<ff_observability::Metrics>,
329 ) -> Result<Arc<Self>, EngineError> {
330 let pool = pool::build_pool(&config).await?;
331 warn_if_max_locks_low(&pool).await;
332 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
333 Ok(Arc::new(Self {
334 pool,
335 partition_config,
336 metrics: Some(metrics),
337 stream_notifier,
338 #[cfg(feature = "core")]
339 scanner_handle: None,
340 }))
341 }
342
343 #[cfg(feature = "core")]
350 pub fn with_scanners(
351 self: &mut Arc<Self>,
352 cfg: scanner_supervisor::PostgresScannerConfig,
353 ) -> bool {
354 let Some(inner) = Arc::get_mut(self) else {
355 return false;
356 };
357 let handle = scanner_supervisor::spawn_scanners(inner.pool.clone(), cfg);
358 inner.scanner_handle = Some(Arc::new(handle));
359 true
360 }
361
362 pub fn pool(&self) -> &PgPool {
367 &self.pool
368 }
369
370 #[cfg(feature = "core")]
383 #[tracing::instrument(name = "pg.create_execution", skip_all)]
384 pub async fn create_execution(
385 &self,
386 args: CreateExecutionArgs,
387 ) -> Result<ExecutionId, EngineError> {
388 exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await
389 }
390
391 #[cfg(feature = "core")]
399 #[tracing::instrument(name = "pg.create_flow.inherent", skip_all)]
400 pub async fn create_flow(
401 &self,
402 args: &CreateFlowArgs,
403 ) -> Result<CreateFlowResult, EngineError> {
404 flow_staging::create_flow(&self.pool, &self.partition_config, args).await
405 }
406
407 #[cfg(feature = "core")]
408 #[tracing::instrument(name = "pg.add_execution_to_flow.inherent", skip_all)]
409 pub async fn add_execution_to_flow(
410 &self,
411 args: &AddExecutionToFlowArgs,
412 ) -> Result<AddExecutionToFlowResult, EngineError> {
413 flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, args).await
414 }
415
416 #[cfg(feature = "core")]
417 #[tracing::instrument(name = "pg.stage_dependency_edge.inherent", skip_all)]
418 pub async fn stage_dependency_edge(
419 &self,
420 args: &StageDependencyEdgeArgs,
421 ) -> Result<StageDependencyEdgeResult, EngineError> {
422 flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, args).await
423 }
424
425 #[cfg(feature = "core")]
426 #[tracing::instrument(name = "pg.apply_dependency_to_child.inherent", skip_all)]
427 pub async fn apply_dependency_to_child(
428 &self,
429 args: &ApplyDependencyToChildArgs,
430 ) -> Result<ApplyDependencyToChildResult, EngineError> {
431 flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, args).await
432 }
433}
434
435#[inline]
439fn unavailable<T>(op: &'static str) -> Result<T, EngineError> {
440 Err(EngineError::Unavailable { op })
441}
442
443#[async_trait]
444impl EngineBackend for PostgresBackend {
445 #[tracing::instrument(name = "pg.claim", skip_all)]
448 async fn claim(
449 &self,
450 lane: &LaneId,
451 capabilities: &CapabilitySet,
452 policy: ClaimPolicy,
453 ) -> Result<Option<Handle>, EngineError> {
454 attempt::claim(&self.pool, lane, capabilities, &policy).await
455 }
456
457 #[tracing::instrument(name = "pg.renew", skip_all)]
458 async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError> {
459 attempt::renew(&self.pool, handle).await
460 }
461
462 #[tracing::instrument(name = "pg.progress", skip_all)]
463 async fn progress(
464 &self,
465 handle: &Handle,
466 percent: Option<u8>,
467 message: Option<String>,
468 ) -> Result<(), EngineError> {
469 attempt::progress(&self.pool, handle, percent, message).await
470 }
471
472 #[tracing::instrument(name = "pg.append_frame", skip_all)]
473 async fn append_frame(
474 &self,
475 handle: &Handle,
476 frame: Frame,
477 ) -> Result<AppendFrameOutcome, EngineError> {
478 #[cfg(feature = "streaming")]
479 {
480 stream::append_frame(&self.pool, &self.partition_config, handle, frame).await
481 }
482 #[cfg(not(feature = "streaming"))]
483 {
484 let _ = (handle, frame);
485 unavailable("pg.append_frame")
486 }
487 }
488
489 #[tracing::instrument(name = "pg.complete", skip_all)]
490 async fn complete(
491 &self,
492 handle: &Handle,
493 payload: Option<Vec<u8>>,
494 ) -> Result<(), EngineError> {
495 attempt::complete(&self.pool, handle, payload).await
496 }
497
498 #[tracing::instrument(name = "pg.fail", skip_all)]
499 async fn fail(
500 &self,
501 handle: &Handle,
502 reason: FailureReason,
503 classification: FailureClass,
504 ) -> Result<FailOutcome, EngineError> {
505 attempt::fail(&self.pool, handle, reason, classification).await
506 }
507
508 #[tracing::instrument(name = "pg.cancel", skip_all)]
509 async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError> {
510 let payload = handle_codec::decode_handle(handle)?;
511 exec_core::cancel_impl(
512 &self.pool,
513 &self.partition_config,
514 &payload.execution_id,
515 reason,
516 )
517 .await
518 }
519
520 #[tracing::instrument(name = "pg.suspend", skip_all)]
521 async fn suspend(
522 &self,
523 handle: &Handle,
524 args: SuspendArgs,
525 ) -> Result<SuspendOutcome, EngineError> {
526 suspend_ops::suspend_impl(&self.pool, &self.partition_config, handle, args).await
527 }
528
529 #[tracing::instrument(name = "pg.create_waitpoint", skip_all)]
530 async fn create_waitpoint(
531 &self,
532 _handle: &Handle,
533 _waitpoint_key: &str,
534 _expires_in: Duration,
535 ) -> Result<PendingWaitpoint, EngineError> {
536 unavailable("pg.create_waitpoint")
537 }
538
539 #[tracing::instrument(name = "pg.observe_signals", skip_all)]
540 async fn observe_signals(
541 &self,
542 handle: &Handle,
543 ) -> Result<Vec<ResumeSignal>, EngineError> {
544 suspend_ops::observe_signals_impl(&self.pool, handle).await
545 }
546
547 #[tracing::instrument(name = "pg.claim_from_reclaim", skip_all)]
548 async fn claim_from_reclaim(
549 &self,
550 token: ReclaimToken,
551 ) -> Result<Option<Handle>, EngineError> {
552 attempt::claim_from_reclaim(&self.pool, token).await
553 }
554
555 #[tracing::instrument(name = "pg.delay", skip_all)]
556 async fn delay(
557 &self,
558 handle: &Handle,
559 delay_until: TimestampMs,
560 ) -> Result<(), EngineError> {
561 attempt::delay(&self.pool, handle, delay_until).await
562 }
563
564 #[tracing::instrument(name = "pg.wait_children", skip_all)]
565 async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError> {
566 attempt::wait_children(&self.pool, handle).await
567 }
568
569 #[tracing::instrument(name = "pg.describe_execution", skip_all)]
572 async fn describe_execution(
573 &self,
574 id: &ExecutionId,
575 ) -> Result<Option<ExecutionSnapshot>, EngineError> {
576 exec_core::describe_execution_impl(&self.pool, &self.partition_config, id).await
577 }
578
579 #[tracing::instrument(name = "pg.describe_flow", skip_all)]
580 async fn describe_flow(
581 &self,
582 id: &FlowId,
583 ) -> Result<Option<FlowSnapshot>, EngineError> {
584 flow::describe_flow(&self.pool, &self.partition_config, id).await
585 }
586
587 #[cfg(feature = "core")]
588 #[tracing::instrument(name = "pg.list_edges", skip_all)]
589 async fn list_edges(
590 &self,
591 flow_id: &FlowId,
592 direction: EdgeDirection,
593 ) -> Result<Vec<EdgeSnapshot>, EngineError> {
594 flow::list_edges(&self.pool, &self.partition_config, flow_id, direction).await
595 }
596
597 #[cfg(feature = "core")]
598 #[tracing::instrument(name = "pg.describe_edge", skip_all)]
599 async fn describe_edge(
600 &self,
601 flow_id: &FlowId,
602 edge_id: &EdgeId,
603 ) -> Result<Option<EdgeSnapshot>, EngineError> {
604 flow::describe_edge(&self.pool, &self.partition_config, flow_id, edge_id).await
605 }
606
607 #[cfg(feature = "core")]
608 #[tracing::instrument(name = "pg.resolve_execution_flow_id", skip_all)]
609 async fn resolve_execution_flow_id(
610 &self,
611 eid: &ExecutionId,
612 ) -> Result<Option<FlowId>, EngineError> {
613 exec_core::resolve_execution_flow_id_impl(&self.pool, &self.partition_config, eid).await
614 }
615
616 #[cfg(feature = "core")]
617 #[tracing::instrument(name = "pg.list_flows", skip_all)]
618 async fn list_flows(
619 &self,
620 partition: PartitionKey,
621 cursor: Option<FlowId>,
622 limit: usize,
623 ) -> Result<ListFlowsPage, EngineError> {
624 flow::list_flows(&self.pool, partition, cursor, limit).await
625 }
626
627 #[cfg(feature = "core")]
628 #[tracing::instrument(name = "pg.list_lanes", skip_all)]
629 async fn list_lanes(
630 &self,
631 cursor: Option<LaneId>,
632 limit: usize,
633 ) -> Result<ListLanesPage, EngineError> {
634 admin::list_lanes_impl(&self.pool, cursor, limit).await
635 }
636
637 #[cfg(feature = "core")]
638 #[tracing::instrument(name = "pg.list_suspended", skip_all)]
639 async fn list_suspended(
640 &self,
641 partition: PartitionKey,
642 cursor: Option<ExecutionId>,
643 limit: usize,
644 ) -> Result<ListSuspendedPage, EngineError> {
645 admin::list_suspended_impl(&self.pool, partition, cursor, limit).await
646 }
647
648 #[cfg(feature = "core")]
649 #[tracing::instrument(name = "pg.list_executions", skip_all)]
650 async fn list_executions(
651 &self,
652 partition: PartitionKey,
653 cursor: Option<ExecutionId>,
654 limit: usize,
655 ) -> Result<ListExecutionsPage, EngineError> {
656 exec_core::list_executions_impl(
657 &self.pool,
658 &self.partition_config,
659 partition,
660 cursor,
661 limit,
662 )
663 .await
664 }
665
666 #[cfg(feature = "core")]
669 #[tracing::instrument(name = "pg.deliver_signal", skip_all)]
670 async fn deliver_signal(
671 &self,
672 args: DeliverSignalArgs,
673 ) -> Result<DeliverSignalResult, EngineError> {
674 suspend_ops::deliver_signal_impl(&self.pool, &self.partition_config, args).await
675 }
676
677 #[cfg(feature = "core")]
678 #[tracing::instrument(name = "pg.claim_resumed_execution", skip_all)]
679 async fn claim_resumed_execution(
680 &self,
681 args: ClaimResumedExecutionArgs,
682 ) -> Result<ClaimResumedExecutionResult, EngineError> {
683 suspend_ops::claim_resumed_execution_impl(&self.pool, &self.partition_config, args).await
684 }
685
686 #[cfg(feature = "core")]
697 #[tracing::instrument(name = "pg.create_execution.trait", skip_all)]
698 async fn create_execution(
699 &self,
700 args: CreateExecutionArgs,
701 ) -> Result<CreateExecutionResult, EngineError> {
702 let eid = args.execution_id.clone();
703 exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await?;
704 Ok(CreateExecutionResult::Created {
705 execution_id: eid,
706 public_state: PublicState::Waiting,
707 })
708 }
709
710 #[cfg(feature = "core")]
711 #[tracing::instrument(name = "pg.create_flow", skip_all)]
712 async fn create_flow(
713 &self,
714 args: CreateFlowArgs,
715 ) -> Result<CreateFlowResult, EngineError> {
716 flow_staging::create_flow(&self.pool, &self.partition_config, &args).await
717 }
718
719 #[cfg(feature = "core")]
720 #[tracing::instrument(name = "pg.add_execution_to_flow", skip_all)]
721 async fn add_execution_to_flow(
722 &self,
723 args: AddExecutionToFlowArgs,
724 ) -> Result<AddExecutionToFlowResult, EngineError> {
725 flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, &args).await
726 }
727
728 #[cfg(feature = "core")]
729 #[tracing::instrument(name = "pg.stage_dependency_edge", skip_all)]
730 async fn stage_dependency_edge(
731 &self,
732 args: StageDependencyEdgeArgs,
733 ) -> Result<StageDependencyEdgeResult, EngineError> {
734 flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, &args).await
735 }
736
737 #[cfg(feature = "core")]
738 #[tracing::instrument(name = "pg.apply_dependency_to_child", skip_all)]
739 async fn apply_dependency_to_child(
740 &self,
741 args: ApplyDependencyToChildArgs,
742 ) -> Result<ApplyDependencyToChildResult, EngineError> {
743 flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, &args).await
744 }
745
746 fn backend_label(&self) -> &'static str {
747 "postgres"
748 }
749
750 fn capabilities_matrix(&self) -> ff_core::capability::CapabilityMatrix {
759 let mut matrix = ff_core::capability::CapabilityMatrix::new(
760 ff_core::capability::BackendIdentity::new(
761 "postgres",
762 ff_core::capability::Version::new(0, 8, 1),
763 "E-shipped",
764 ),
765 );
766 for (cap, status) in POSTGRES_CAPS.iter() {
767 matrix.set(*cap, status.clone());
768 }
769 matrix
770 }
771
772 async fn prepare(
780 &self,
781 ) -> Result<ff_core::backend::PrepareOutcome, EngineError> {
782 Ok(ff_core::backend::PrepareOutcome::NoOp)
783 }
784
785 #[cfg(feature = "core")]
793 #[tracing::instrument(name = "pg.claim_for_worker", skip_all)]
794 async fn claim_for_worker(
795 &self,
796 args: ff_core::contracts::ClaimForWorkerArgs,
797 ) -> Result<ff_core::contracts::ClaimForWorkerOutcome, EngineError> {
798 let sched = scheduler::PostgresScheduler::new(self.pool.clone());
799 let grant_opt = sched
800 .claim_for_worker(
801 &args.lane_id,
802 &args.worker_id,
803 &args.worker_instance_id,
804 &args.worker_capabilities,
805 args.grant_ttl_ms,
806 )
807 .await?;
808 Ok(match grant_opt {
809 Some(g) => ff_core::contracts::ClaimForWorkerOutcome::granted(g),
810 None => ff_core::contracts::ClaimForWorkerOutcome::no_work(),
811 })
812 }
813
814 async fn ping(&self) -> Result<(), EngineError> {
815 let _ = sqlx::query_scalar::<_, i32>("SELECT 1")
819 .fetch_one(&self.pool)
820 .await
821 .map_err(error::map_sqlx_error)?;
822 Ok(())
823 }
824
825 async fn shutdown_prepare(&self, grace: Duration) -> Result<(), EngineError> {
830 #[cfg(feature = "core")]
831 if let Some(handle) = self.scanner_handle.as_ref() {
832 let timed_out = handle.shutdown(grace).await;
833 if timed_out > 0 {
834 tracing::warn!(
835 timed_out,
836 ?grace,
837 "postgres scanner supervisor exceeded grace on shutdown"
838 );
839 }
840 }
841 Ok(())
842 }
843
844 #[tracing::instrument(name = "pg.cancel_flow", skip_all)]
845 async fn cancel_flow(
846 &self,
847 id: &FlowId,
848 policy: CancelFlowPolicy,
849 wait: CancelFlowWait,
850 ) -> Result<CancelFlowResult, EngineError> {
851 let result = flow::cancel_flow(&self.pool, &self.partition_config, id, policy).await?;
852 if let Some(deadline) = ff_core::engine_backend::cancel_flow_wait_deadline(wait) {
853 ff_core::engine_backend::wait_for_flow_cancellation(self, id, deadline).await?;
854 }
855 Ok(result)
856 }
857
858 #[cfg(feature = "core")]
859 #[tracing::instrument(name = "pg.set_edge_group_policy", skip_all)]
860 async fn set_edge_group_policy(
861 &self,
862 flow_id: &FlowId,
863 downstream_execution_id: &ExecutionId,
864 policy: EdgeDependencyPolicy,
865 ) -> Result<SetEdgeGroupPolicyResult, EngineError> {
866 flow::set_edge_group_policy(
867 &self.pool,
868 &self.partition_config,
869 flow_id,
870 downstream_execution_id,
871 policy,
872 )
873 .await
874 }
875
876 #[tracing::instrument(name = "pg.report_usage", skip_all)]
879 async fn report_usage(
880 &self,
881 _handle: &Handle,
882 budget: &BudgetId,
883 dimensions: UsageDimensions,
884 ) -> Result<ReportUsageResult, EngineError> {
885 budget::report_usage_impl(&self.pool, &self.partition_config, budget, dimensions).await
886 }
887
888 #[tracing::instrument(name = "pg.rotate_waitpoint_hmac_secret_all", skip_all)]
895 async fn rotate_waitpoint_hmac_secret_all(
896 &self,
897 args: RotateWaitpointHmacSecretAllArgs,
898 ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
899 let now_ms = std::time::SystemTime::now()
904 .duration_since(std::time::UNIX_EPOCH)
905 .map(|d| d.as_millis() as i64)
906 .unwrap_or(0);
907 signal::rotate_waitpoint_hmac_secret_all_impl(&self.pool, args, now_ms).await
908 }
909
910 #[tracing::instrument(name = "pg.seed_waitpoint_hmac_secret", skip_all)]
911 async fn seed_waitpoint_hmac_secret(
912 &self,
913 args: SeedWaitpointHmacSecretArgs,
914 ) -> Result<SeedOutcome, EngineError> {
915 let now_ms = std::time::SystemTime::now()
919 .duration_since(std::time::UNIX_EPOCH)
920 .map(|d| d.as_millis() as i64)
921 .unwrap_or(0);
922 signal::seed_waitpoint_hmac_secret_impl(&self.pool, args, now_ms).await
923 }
924
925 #[cfg(feature = "streaming")]
928 #[tracing::instrument(name = "pg.read_stream", skip_all)]
929 async fn read_stream(
930 &self,
931 execution_id: &ExecutionId,
932 attempt_index: AttemptIndex,
933 from: StreamCursor,
934 to: StreamCursor,
935 count_limit: u64,
936 ) -> Result<StreamFrames, EngineError> {
937 stream::read_stream(&self.pool, execution_id, attempt_index, from, to, count_limit).await
938 }
939
940 #[cfg(feature = "streaming")]
941 #[tracing::instrument(name = "pg.tail_stream", skip_all)]
942 async fn tail_stream(
943 &self,
944 execution_id: &ExecutionId,
945 attempt_index: AttemptIndex,
946 after: StreamCursor,
947 block_ms: u64,
948 count_limit: u64,
949 visibility: TailVisibility,
950 ) -> Result<StreamFrames, EngineError> {
951 let notifier = self
952 .stream_notifier
953 .as_ref()
954 .ok_or(EngineError::Unavailable {
955 op: "pg.tail_stream (notifier not initialised)",
956 })?;
957 stream::tail_stream(
958 &self.pool,
959 notifier,
960 execution_id,
961 attempt_index,
962 after,
963 block_ms,
964 count_limit,
965 visibility,
966 )
967 .await
968 }
969
970 #[cfg(feature = "streaming")]
971 #[tracing::instrument(name = "pg.read_summary", skip_all)]
972 async fn read_summary(
973 &self,
974 execution_id: &ExecutionId,
975 attempt_index: AttemptIndex,
976 ) -> Result<Option<SummaryDocument>, EngineError> {
977 stream::read_summary(&self.pool, execution_id, attempt_index).await
978 }
979
980 #[tracing::instrument(name = "pg.subscribe_completion", skip_all)]
994 async fn subscribe_completion(
995 &self,
996 _cursor: ff_core::stream_subscribe::StreamCursor,
997 ) -> Result<ff_core::stream_subscribe::StreamSubscription, EngineError> {
998 use ff_core::stream_subscribe::{
999 encode_postgres_event_cursor, StreamEvent, StreamFamily,
1000 };
1001 use futures_core::Stream;
1002 use std::pin::Pin;
1003 use std::task::{Context, Poll};
1004
1005 let inner = ff_core::completion_backend::CompletionBackend::subscribe_completions(self)
1010 .await?;
1011
1012 struct Adapter {
1016 inner: ff_core::completion_backend::CompletionStream,
1017 }
1018
1019 impl Stream for Adapter {
1020 type Item = Result<StreamEvent, EngineError>;
1021 fn poll_next(
1022 mut self: Pin<&mut Self>,
1023 cx: &mut Context<'_>,
1024 ) -> Poll<Option<Self::Item>> {
1025 match Pin::new(&mut self.inner).poll_next(cx) {
1026 Poll::Pending => Poll::Pending,
1027 Poll::Ready(None) => Poll::Ready(None),
1028 Poll::Ready(Some(payload)) => {
1029 let cursor = encode_postgres_event_cursor(0);
1039 let event = StreamEvent::new(
1040 StreamFamily::Completion,
1041 cursor,
1042 payload.produced_at_ms,
1043 bytes::Bytes::from(payload.outcome.clone().into_bytes()),
1044 )
1045 .with_execution_id(payload.execution_id.clone());
1046 Poll::Ready(Some(Ok(event)))
1047 }
1048 }
1049 }
1050 }
1051
1052 Ok(Box::pin(Adapter { inner }))
1053 }
1054}
1055
/// Recommended lower bound for the server's `max_locks_per_transaction`
/// setting; a probed value strictly below this triggers the startup warning.
const MIN_MAX_LOCKS_PER_TRANSACTION: i64 = 256;
1065
1066async fn warn_if_max_locks_low(pool: &PgPool) {
1071 let row: Result<(String,), sqlx::Error> =
1072 sqlx::query_as("SHOW max_locks_per_transaction")
1073 .fetch_one(pool)
1074 .await;
1075 match row {
1076 Ok((raw,)) => emit_max_locks_decision(&raw),
1077 Err(e) => {
1078 tracing::debug!("failed to probe max_locks_per_transaction: {e}");
1079 }
1080 }
1081}
1082
1083fn max_locks_warn_value(raw: &str) -> Option<i64> {
1089 match raw.parse::<i64>() {
1090 Ok(v) if v < MIN_MAX_LOCKS_PER_TRANSACTION => Some(v),
1091 Ok(_) => None,
1092 Err(e) => {
1093 tracing::debug!(raw, "failed to parse max_locks_per_transaction: {e}");
1094 None
1095 }
1096 }
1097}
1098
1099fn emit_max_locks_decision(raw: &str) {
1100 if let Some(v) = max_locks_warn_value(raw) {
1101 tracing::warn!(
1102 current = v,
1103 recommended = MIN_MAX_LOCKS_PER_TRANSACTION,
1104 "postgres max_locks_per_transaction={v} is below the recommended \
1105 minimum ({MIN_MAX_LOCKS_PER_TRANSACTION}); partition-heavy workloads \
1106 may hit 'out of shared memory' under concurrent load. \
1107 See docs/operator-guide-postgres.md."
1108 );
1109 }
1110}
1111
/// Unit tests for the threshold decision in `max_locks_warn_value`.
#[cfg(test)]
mod max_locks_tests {
    use super::{max_locks_warn_value, MIN_MAX_LOCKS_PER_TRANSACTION};

    /// Values strictly below the threshold are echoed back.
    #[test]
    fn warns_when_below_threshold() {
        let just_under = MIN_MAX_LOCKS_PER_TRANSACTION - 1;
        let cases = [("64".to_string(), 64), (just_under.to_string(), just_under)];
        for (raw, expected) in cases {
            assert_eq!(max_locks_warn_value(&raw), Some(expected));
        }
    }

    /// The threshold itself, and anything larger, produces no warning value.
    #[test]
    fn silent_at_or_above_threshold() {
        let at_threshold = MIN_MAX_LOCKS_PER_TRANSACTION.to_string();
        assert_eq!(max_locks_warn_value(&at_threshold), None);
        assert_eq!(max_locks_warn_value("1024"), None);
    }

    /// Non-numeric input is treated as "no warning".
    #[test]
    fn silent_for_unparseable_raw() {
        let garbage = "not-a-number";
        assert_eq!(max_locks_warn_value(garbage), None);
    }
}
1138}