1#![allow(clippy::result_large_err)]
20
21use std::sync::Arc;
22use std::time::Duration;
23
24use async_trait::async_trait;
25
26use ff_core::backend::{
27 AppendFrameOutcome, BackendConfig, CancelFlowPolicy, CancelFlowWait, CapabilitySet,
28 ClaimPolicy, FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal,
29 PendingWaitpoint, ReclaimToken, ResumeSignal, SummaryDocument, TailVisibility,
30 UsageDimensions,
31};
32#[cfg(feature = "core")]
33use ff_core::contracts::{
34 AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
35 ApplyDependencyToChildResult, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
36 CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
37 DeliverSignalArgs, DeliverSignalResult, EdgeDependencyPolicy, EdgeDirection, EdgeSnapshot,
38 ListExecutionsPage, ListFlowsPage, ListLanesPage, ListSuspendedPage,
39 SetEdgeGroupPolicyResult, StageDependencyEdgeArgs, StageDependencyEdgeResult,
40};
41#[cfg(feature = "core")]
42use ff_core::state::PublicState;
43use ff_core::contracts::{
44 CancelFlowResult, ExecutionSnapshot, FlowSnapshot, ReportUsageResult,
45 RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
46 SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
47};
48#[cfg(feature = "streaming")]
49use ff_core::contracts::{StreamCursor, StreamFrames};
50use ff_core::engine_backend::EngineBackend;
51use ff_core::engine_error::EngineError;
52#[cfg(feature = "core")]
53use ff_core::partition::PartitionKey;
54use ff_core::partition::PartitionConfig;
55#[cfg(feature = "streaming")]
56use ff_core::types::AttemptIndex;
57#[cfg(feature = "core")]
58use ff_core::types::EdgeId;
59use ff_core::types::{BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
60pub use sqlx::PgPool;
64
65#[cfg(feature = "core")]
66mod admin;
67pub mod attempt;
68pub mod budget;
69pub mod completion;
70#[cfg(feature = "core")]
71pub mod dispatch;
72pub mod error;
73pub mod exec_core;
74pub mod flow;
75#[cfg(feature = "core")]
76pub mod flow_staging;
77pub mod handle_codec;
78mod lease_event;
79mod lease_event_subscribe;
80pub mod listener;
81pub mod migrate;
82pub mod pool;
83#[cfg(feature = "core")]
84pub mod reconcilers;
85#[cfg(feature = "core")]
86pub mod scanner_supervisor;
87#[cfg(feature = "core")]
88pub mod scheduler;
89pub mod signal;
90mod signal_delivery_subscribe;
91mod signal_event;
92#[cfg(feature = "streaming")]
93pub mod stream;
94pub mod suspend;
95pub mod suspend_ops;
96pub mod version;
97
98pub use completion::{PostgresCompletionStream, COMPLETION_CHANNEL};
99pub use error::{map_sqlx_error, PostgresTransportError};
100pub use listener::StreamNotifier;
101pub use migrate::{apply_migrations, MigrationError};
102#[cfg(feature = "core")]
103pub use scanner_supervisor::{PostgresScannerConfig, PostgresScannerHandle};
104pub use version::check_schema_version;
105
106pub use ff_core::backend::PostgresConnection;
112
113fn postgres_supports_base() -> ff_core::capability::Supports {
143 let mut s = ff_core::capability::Supports::none();
144
145 s.cancel_flow_wait_timeout = true;
147 s.cancel_flow_wait_indefinite = true;
148
149 s.rotate_waitpoint_hmac_secret_all = true;
151 s.seed_waitpoint_hmac_secret = true;
152
153 s.claim_for_worker = true;
155
156 s.subscribe_lease_history = true;
158 s.subscribe_completion = true;
159 s.subscribe_signal_delivery = true;
160 s.subscribe_instance_tags = false;
161
162 s.stream_durable_summary = true;
164 s.stream_best_effort_live = true;
165
166 s.prepare = true;
168
169 s
174}
175
176pub struct PostgresBackend {
177 #[allow(dead_code)] pool: PgPool,
179 #[allow(dead_code)]
180 partition_config: PartitionConfig,
181 #[allow(dead_code)]
182 metrics: Option<Arc<ff_observability::Metrics>>,
183 #[allow(dead_code)]
187 stream_notifier: Option<Arc<StreamNotifier>>,
188 #[cfg(feature = "core")]
194 scanner_handle: Option<Arc<scanner_supervisor::PostgresScannerHandle>>,
195}
196
197impl PostgresBackend {
198 pub async fn connect(config: BackendConfig) -> Result<Arc<dyn EngineBackend>, EngineError> {
213 let pool = pool::build_pool(&config).await?;
214 warn_if_max_locks_low(&pool).await;
215 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
216 let backend = Self {
217 pool,
218 partition_config: PartitionConfig::default(),
219 metrics: None,
220 stream_notifier,
221 #[cfg(feature = "core")]
222 scanner_handle: None,
223 };
224 Ok(Arc::new(backend))
225 }
226
227 pub fn from_pool(pool: PgPool, partition_config: PartitionConfig) -> Arc<Self> {
233 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
234 Arc::new(Self {
235 pool,
236 partition_config,
237 metrics: None,
238 stream_notifier,
239 #[cfg(feature = "core")]
240 scanner_handle: None,
241 })
242 }
243
244 pub async fn connect_with_metrics(
258 config: BackendConfig,
259 partition_config: PartitionConfig,
260 metrics: Arc<ff_observability::Metrics>,
261 ) -> Result<Arc<Self>, EngineError> {
262 let pool = pool::build_pool(&config).await?;
263 warn_if_max_locks_low(&pool).await;
264 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
265 Ok(Arc::new(Self {
266 pool,
267 partition_config,
268 metrics: Some(metrics),
269 stream_notifier,
270 #[cfg(feature = "core")]
271 scanner_handle: None,
272 }))
273 }
274
275 #[cfg(feature = "core")]
282 pub fn with_scanners(
283 self: &mut Arc<Self>,
284 cfg: scanner_supervisor::PostgresScannerConfig,
285 ) -> bool {
286 let Some(inner) = Arc::get_mut(self) else {
287 return false;
288 };
289 let handle = scanner_supervisor::spawn_scanners(inner.pool.clone(), cfg);
290 inner.scanner_handle = Some(Arc::new(handle));
291 true
292 }
293
294 pub fn pool(&self) -> &PgPool {
299 &self.pool
300 }
301
302 #[cfg(feature = "core")]
315 #[tracing::instrument(name = "pg.create_execution", skip_all)]
316 pub async fn create_execution(
317 &self,
318 args: CreateExecutionArgs,
319 ) -> Result<ExecutionId, EngineError> {
320 exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await
321 }
322
323 #[cfg(feature = "core")]
331 #[tracing::instrument(name = "pg.create_flow.inherent", skip_all)]
332 pub async fn create_flow(
333 &self,
334 args: &CreateFlowArgs,
335 ) -> Result<CreateFlowResult, EngineError> {
336 flow_staging::create_flow(&self.pool, &self.partition_config, args).await
337 }
338
339 #[cfg(feature = "core")]
340 #[tracing::instrument(name = "pg.add_execution_to_flow.inherent", skip_all)]
341 pub async fn add_execution_to_flow(
342 &self,
343 args: &AddExecutionToFlowArgs,
344 ) -> Result<AddExecutionToFlowResult, EngineError> {
345 flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, args).await
346 }
347
348 #[cfg(feature = "core")]
349 #[tracing::instrument(name = "pg.stage_dependency_edge.inherent", skip_all)]
350 pub async fn stage_dependency_edge(
351 &self,
352 args: &StageDependencyEdgeArgs,
353 ) -> Result<StageDependencyEdgeResult, EngineError> {
354 flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, args).await
355 }
356
357 #[cfg(feature = "core")]
358 #[tracing::instrument(name = "pg.apply_dependency_to_child.inherent", skip_all)]
359 pub async fn apply_dependency_to_child(
360 &self,
361 args: &ApplyDependencyToChildArgs,
362 ) -> Result<ApplyDependencyToChildResult, EngineError> {
363 flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, args).await
364 }
365}
366
367#[inline]
371fn unavailable<T>(op: &'static str) -> Result<T, EngineError> {
372 Err(EngineError::Unavailable { op })
373}
374
375#[async_trait]
376impl EngineBackend for PostgresBackend {
377 #[tracing::instrument(name = "pg.claim", skip_all)]
380 async fn claim(
381 &self,
382 lane: &LaneId,
383 capabilities: &CapabilitySet,
384 policy: ClaimPolicy,
385 ) -> Result<Option<Handle>, EngineError> {
386 attempt::claim(&self.pool, lane, capabilities, &policy).await
387 }
388
389 #[tracing::instrument(name = "pg.renew", skip_all)]
390 async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError> {
391 attempt::renew(&self.pool, handle).await
392 }
393
394 #[tracing::instrument(name = "pg.progress", skip_all)]
395 async fn progress(
396 &self,
397 handle: &Handle,
398 percent: Option<u8>,
399 message: Option<String>,
400 ) -> Result<(), EngineError> {
401 attempt::progress(&self.pool, handle, percent, message).await
402 }
403
404 #[tracing::instrument(name = "pg.append_frame", skip_all)]
405 async fn append_frame(
406 &self,
407 handle: &Handle,
408 frame: Frame,
409 ) -> Result<AppendFrameOutcome, EngineError> {
410 #[cfg(feature = "streaming")]
411 {
412 stream::append_frame(&self.pool, &self.partition_config, handle, frame).await
413 }
414 #[cfg(not(feature = "streaming"))]
415 {
416 let _ = (handle, frame);
417 unavailable("pg.append_frame")
418 }
419 }
420
421 #[tracing::instrument(name = "pg.complete", skip_all)]
422 async fn complete(
423 &self,
424 handle: &Handle,
425 payload: Option<Vec<u8>>,
426 ) -> Result<(), EngineError> {
427 attempt::complete(&self.pool, handle, payload).await
428 }
429
430 #[tracing::instrument(name = "pg.fail", skip_all)]
431 async fn fail(
432 &self,
433 handle: &Handle,
434 reason: FailureReason,
435 classification: FailureClass,
436 ) -> Result<FailOutcome, EngineError> {
437 attempt::fail(&self.pool, handle, reason, classification).await
438 }
439
440 #[tracing::instrument(name = "pg.cancel", skip_all)]
441 async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError> {
442 let payload = handle_codec::decode_handle(handle)?;
443 exec_core::cancel_impl(
444 &self.pool,
445 &self.partition_config,
446 &payload.execution_id,
447 reason,
448 )
449 .await
450 }
451
452 #[tracing::instrument(name = "pg.suspend", skip_all)]
453 async fn suspend(
454 &self,
455 handle: &Handle,
456 args: SuspendArgs,
457 ) -> Result<SuspendOutcome, EngineError> {
458 suspend_ops::suspend_impl(&self.pool, &self.partition_config, handle, args).await
459 }
460
461 #[tracing::instrument(name = "pg.suspend_by_triple", skip_all)]
462 async fn suspend_by_triple(
463 &self,
464 exec_id: ExecutionId,
465 triple: LeaseFence,
466 args: SuspendArgs,
467 ) -> Result<SuspendOutcome, EngineError> {
468 suspend_ops::suspend_by_triple_impl(
469 &self.pool,
470 &self.partition_config,
471 exec_id,
472 triple,
473 args,
474 )
475 .await
476 }
477
478 #[tracing::instrument(name = "pg.create_waitpoint", skip_all)]
479 async fn create_waitpoint(
480 &self,
481 _handle: &Handle,
482 _waitpoint_key: &str,
483 _expires_in: Duration,
484 ) -> Result<PendingWaitpoint, EngineError> {
485 unavailable("pg.create_waitpoint")
486 }
487
488 #[tracing::instrument(name = "pg.observe_signals", skip_all)]
489 async fn observe_signals(
490 &self,
491 handle: &Handle,
492 ) -> Result<Vec<ResumeSignal>, EngineError> {
493 suspend_ops::observe_signals_impl(&self.pool, handle).await
494 }
495
496 #[tracing::instrument(name = "pg.claim_from_reclaim", skip_all)]
497 async fn claim_from_reclaim(
498 &self,
499 token: ReclaimToken,
500 ) -> Result<Option<Handle>, EngineError> {
501 attempt::claim_from_reclaim(&self.pool, token).await
502 }
503
504 #[tracing::instrument(name = "pg.delay", skip_all)]
505 async fn delay(
506 &self,
507 handle: &Handle,
508 delay_until: TimestampMs,
509 ) -> Result<(), EngineError> {
510 attempt::delay(&self.pool, handle, delay_until).await
511 }
512
513 #[tracing::instrument(name = "pg.wait_children", skip_all)]
514 async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError> {
515 attempt::wait_children(&self.pool, handle).await
516 }
517
518 #[tracing::instrument(name = "pg.describe_execution", skip_all)]
521 async fn describe_execution(
522 &self,
523 id: &ExecutionId,
524 ) -> Result<Option<ExecutionSnapshot>, EngineError> {
525 exec_core::describe_execution_impl(&self.pool, &self.partition_config, id).await
526 }
527
528 #[tracing::instrument(name = "pg.describe_flow", skip_all)]
529 async fn describe_flow(
530 &self,
531 id: &FlowId,
532 ) -> Result<Option<FlowSnapshot>, EngineError> {
533 flow::describe_flow(&self.pool, &self.partition_config, id).await
534 }
535
536 #[cfg(feature = "core")]
537 #[tracing::instrument(name = "pg.list_edges", skip_all)]
538 async fn list_edges(
539 &self,
540 flow_id: &FlowId,
541 direction: EdgeDirection,
542 ) -> Result<Vec<EdgeSnapshot>, EngineError> {
543 flow::list_edges(&self.pool, &self.partition_config, flow_id, direction).await
544 }
545
546 #[cfg(feature = "core")]
547 #[tracing::instrument(name = "pg.describe_edge", skip_all)]
548 async fn describe_edge(
549 &self,
550 flow_id: &FlowId,
551 edge_id: &EdgeId,
552 ) -> Result<Option<EdgeSnapshot>, EngineError> {
553 flow::describe_edge(&self.pool, &self.partition_config, flow_id, edge_id).await
554 }
555
556 #[cfg(feature = "core")]
557 #[tracing::instrument(name = "pg.resolve_execution_flow_id", skip_all)]
558 async fn resolve_execution_flow_id(
559 &self,
560 eid: &ExecutionId,
561 ) -> Result<Option<FlowId>, EngineError> {
562 exec_core::resolve_execution_flow_id_impl(&self.pool, &self.partition_config, eid).await
563 }
564
565 #[cfg(feature = "core")]
566 #[tracing::instrument(name = "pg.list_flows", skip_all)]
567 async fn list_flows(
568 &self,
569 partition: PartitionKey,
570 cursor: Option<FlowId>,
571 limit: usize,
572 ) -> Result<ListFlowsPage, EngineError> {
573 flow::list_flows(&self.pool, partition, cursor, limit).await
574 }
575
576 #[cfg(feature = "core")]
577 #[tracing::instrument(name = "pg.list_lanes", skip_all)]
578 async fn list_lanes(
579 &self,
580 cursor: Option<LaneId>,
581 limit: usize,
582 ) -> Result<ListLanesPage, EngineError> {
583 admin::list_lanes_impl(&self.pool, cursor, limit).await
584 }
585
586 #[cfg(feature = "core")]
587 #[tracing::instrument(name = "pg.list_suspended", skip_all)]
588 async fn list_suspended(
589 &self,
590 partition: PartitionKey,
591 cursor: Option<ExecutionId>,
592 limit: usize,
593 ) -> Result<ListSuspendedPage, EngineError> {
594 admin::list_suspended_impl(&self.pool, partition, cursor, limit).await
595 }
596
597 #[cfg(feature = "core")]
598 #[tracing::instrument(name = "pg.list_executions", skip_all)]
599 async fn list_executions(
600 &self,
601 partition: PartitionKey,
602 cursor: Option<ExecutionId>,
603 limit: usize,
604 ) -> Result<ListExecutionsPage, EngineError> {
605 exec_core::list_executions_impl(
606 &self.pool,
607 &self.partition_config,
608 partition,
609 cursor,
610 limit,
611 )
612 .await
613 }
614
615 #[cfg(feature = "core")]
618 #[tracing::instrument(name = "pg.deliver_signal", skip_all)]
619 async fn deliver_signal(
620 &self,
621 args: DeliverSignalArgs,
622 ) -> Result<DeliverSignalResult, EngineError> {
623 suspend_ops::deliver_signal_impl(&self.pool, &self.partition_config, args).await
624 }
625
626 #[cfg(feature = "core")]
627 #[tracing::instrument(name = "pg.claim_resumed_execution", skip_all)]
628 async fn claim_resumed_execution(
629 &self,
630 args: ClaimResumedExecutionArgs,
631 ) -> Result<ClaimResumedExecutionResult, EngineError> {
632 suspend_ops::claim_resumed_execution_impl(&self.pool, &self.partition_config, args).await
633 }
634
635 #[cfg(feature = "core")]
646 #[tracing::instrument(name = "pg.create_execution.trait", skip_all)]
647 async fn create_execution(
648 &self,
649 args: CreateExecutionArgs,
650 ) -> Result<CreateExecutionResult, EngineError> {
651 let eid = args.execution_id.clone();
652 exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await?;
653 Ok(CreateExecutionResult::Created {
654 execution_id: eid,
655 public_state: PublicState::Waiting,
656 })
657 }
658
659 #[cfg(feature = "core")]
660 #[tracing::instrument(name = "pg.create_flow", skip_all)]
661 async fn create_flow(
662 &self,
663 args: CreateFlowArgs,
664 ) -> Result<CreateFlowResult, EngineError> {
665 flow_staging::create_flow(&self.pool, &self.partition_config, &args).await
666 }
667
668 #[cfg(feature = "core")]
669 #[tracing::instrument(name = "pg.add_execution_to_flow", skip_all)]
670 async fn add_execution_to_flow(
671 &self,
672 args: AddExecutionToFlowArgs,
673 ) -> Result<AddExecutionToFlowResult, EngineError> {
674 flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, &args).await
675 }
676
677 #[cfg(feature = "core")]
678 #[tracing::instrument(name = "pg.stage_dependency_edge", skip_all)]
679 async fn stage_dependency_edge(
680 &self,
681 args: StageDependencyEdgeArgs,
682 ) -> Result<StageDependencyEdgeResult, EngineError> {
683 flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, &args).await
684 }
685
686 #[cfg(feature = "core")]
687 #[tracing::instrument(name = "pg.apply_dependency_to_child", skip_all)]
688 async fn apply_dependency_to_child(
689 &self,
690 args: ApplyDependencyToChildArgs,
691 ) -> Result<ApplyDependencyToChildResult, EngineError> {
692 flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, &args).await
693 }
694
695 fn backend_label(&self) -> &'static str {
696 "postgres"
697 }
698
699 fn capabilities(&self) -> ff_core::capability::Capabilities {
707 ff_core::capability::Capabilities::new(
708 ff_core::capability::BackendIdentity::new(
709 "postgres",
710 ff_core::capability::Version::new(0, 10, 0),
711 "E-shipped",
712 ),
713 postgres_supports_base(),
714 )
715 }
716
717 async fn prepare(
725 &self,
726 ) -> Result<ff_core::backend::PrepareOutcome, EngineError> {
727 Ok(ff_core::backend::PrepareOutcome::NoOp)
728 }
729
730 #[cfg(feature = "core")]
738 #[tracing::instrument(name = "pg.claim_for_worker", skip_all)]
739 async fn claim_for_worker(
740 &self,
741 args: ff_core::contracts::ClaimForWorkerArgs,
742 ) -> Result<ff_core::contracts::ClaimForWorkerOutcome, EngineError> {
743 let sched = scheduler::PostgresScheduler::new(self.pool.clone());
744 let grant_opt = sched
745 .claim_for_worker(
746 &args.lane_id,
747 &args.worker_id,
748 &args.worker_instance_id,
749 &args.worker_capabilities,
750 args.grant_ttl_ms,
751 )
752 .await?;
753 Ok(match grant_opt {
754 Some(g) => ff_core::contracts::ClaimForWorkerOutcome::granted(g),
755 None => ff_core::contracts::ClaimForWorkerOutcome::no_work(),
756 })
757 }
758
759 async fn ping(&self) -> Result<(), EngineError> {
760 let _ = sqlx::query_scalar::<_, i32>("SELECT 1")
764 .fetch_one(&self.pool)
765 .await
766 .map_err(error::map_sqlx_error)?;
767 Ok(())
768 }
769
770 async fn shutdown_prepare(&self, grace: Duration) -> Result<(), EngineError> {
775 #[cfg(feature = "core")]
776 if let Some(handle) = self.scanner_handle.as_ref() {
777 let timed_out = handle.shutdown(grace).await;
778 if timed_out > 0 {
779 tracing::warn!(
780 timed_out,
781 ?grace,
782 "postgres scanner supervisor exceeded grace on shutdown"
783 );
784 }
785 }
786 Ok(())
787 }
788
789 #[tracing::instrument(name = "pg.cancel_flow", skip_all)]
790 async fn cancel_flow(
791 &self,
792 id: &FlowId,
793 policy: CancelFlowPolicy,
794 wait: CancelFlowWait,
795 ) -> Result<CancelFlowResult, EngineError> {
796 let result = flow::cancel_flow(&self.pool, &self.partition_config, id, policy).await?;
797 if let Some(deadline) = ff_core::engine_backend::cancel_flow_wait_deadline(wait) {
798 ff_core::engine_backend::wait_for_flow_cancellation(self, id, deadline).await?;
799 }
800 Ok(result)
801 }
802
803 #[cfg(feature = "core")]
804 #[tracing::instrument(name = "pg.set_edge_group_policy", skip_all)]
805 async fn set_edge_group_policy(
806 &self,
807 flow_id: &FlowId,
808 downstream_execution_id: &ExecutionId,
809 policy: EdgeDependencyPolicy,
810 ) -> Result<SetEdgeGroupPolicyResult, EngineError> {
811 flow::set_edge_group_policy(
812 &self.pool,
813 &self.partition_config,
814 flow_id,
815 downstream_execution_id,
816 policy,
817 )
818 .await
819 }
820
821 #[tracing::instrument(name = "pg.report_usage", skip_all)]
824 async fn report_usage(
825 &self,
826 _handle: &Handle,
827 budget: &BudgetId,
828 dimensions: UsageDimensions,
829 ) -> Result<ReportUsageResult, EngineError> {
830 budget::report_usage_impl(&self.pool, &self.partition_config, budget, dimensions).await
831 }
832
833 #[tracing::instrument(name = "pg.rotate_waitpoint_hmac_secret_all", skip_all)]
840 async fn rotate_waitpoint_hmac_secret_all(
841 &self,
842 args: RotateWaitpointHmacSecretAllArgs,
843 ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
844 let now_ms = std::time::SystemTime::now()
849 .duration_since(std::time::UNIX_EPOCH)
850 .map(|d| d.as_millis() as i64)
851 .unwrap_or(0);
852 signal::rotate_waitpoint_hmac_secret_all_impl(&self.pool, args, now_ms).await
853 }
854
855 #[tracing::instrument(name = "pg.seed_waitpoint_hmac_secret", skip_all)]
856 async fn seed_waitpoint_hmac_secret(
857 &self,
858 args: SeedWaitpointHmacSecretArgs,
859 ) -> Result<SeedOutcome, EngineError> {
860 let now_ms = std::time::SystemTime::now()
864 .duration_since(std::time::UNIX_EPOCH)
865 .map(|d| d.as_millis() as i64)
866 .unwrap_or(0);
867 signal::seed_waitpoint_hmac_secret_impl(&self.pool, args, now_ms).await
868 }
869
870 #[cfg(feature = "streaming")]
873 #[tracing::instrument(name = "pg.read_stream", skip_all)]
874 async fn read_stream(
875 &self,
876 execution_id: &ExecutionId,
877 attempt_index: AttemptIndex,
878 from: StreamCursor,
879 to: StreamCursor,
880 count_limit: u64,
881 ) -> Result<StreamFrames, EngineError> {
882 stream::read_stream(&self.pool, execution_id, attempt_index, from, to, count_limit).await
883 }
884
885 #[cfg(feature = "streaming")]
886 #[tracing::instrument(name = "pg.tail_stream", skip_all)]
887 async fn tail_stream(
888 &self,
889 execution_id: &ExecutionId,
890 attempt_index: AttemptIndex,
891 after: StreamCursor,
892 block_ms: u64,
893 count_limit: u64,
894 visibility: TailVisibility,
895 ) -> Result<StreamFrames, EngineError> {
896 let notifier = self
897 .stream_notifier
898 .as_ref()
899 .ok_or(EngineError::Unavailable {
900 op: "pg.tail_stream (notifier not initialised)",
901 })?;
902 stream::tail_stream(
903 &self.pool,
904 notifier,
905 execution_id,
906 attempt_index,
907 after,
908 block_ms,
909 count_limit,
910 visibility,
911 )
912 .await
913 }
914
915 #[cfg(feature = "streaming")]
916 #[tracing::instrument(name = "pg.read_summary", skip_all)]
917 async fn read_summary(
918 &self,
919 execution_id: &ExecutionId,
920 attempt_index: AttemptIndex,
921 ) -> Result<Option<SummaryDocument>, EngineError> {
922 stream::read_summary(&self.pool, execution_id, attempt_index).await
923 }
924
925 #[tracing::instrument(name = "pg.subscribe_completion", skip_all)]
939 async fn subscribe_completion(
940 &self,
941 _cursor: ff_core::stream_subscribe::StreamCursor,
942 filter: &ff_core::backend::ScannerFilter,
943 ) -> Result<ff_core::stream_events::CompletionSubscription, EngineError> {
944 use ff_core::stream_events::{CompletionEvent, CompletionOutcome};
945 use ff_core::stream_subscribe::encode_postgres_event_cursor;
946 use futures_core::Stream;
947 use std::pin::Pin;
948 use std::task::{Context, Poll};
949
950 let inner = if filter.is_noop() {
957 ff_core::completion_backend::CompletionBackend::subscribe_completions(self).await?
958 } else {
959 ff_core::completion_backend::CompletionBackend::subscribe_completions_filtered(
960 self, filter,
961 )
962 .await?
963 };
964
965 struct Adapter {
966 inner: ff_core::completion_backend::CompletionStream,
967 }
968
969 impl Stream for Adapter {
970 type Item = Result<CompletionEvent, EngineError>;
971 fn poll_next(
972 mut self: Pin<&mut Self>,
973 cx: &mut Context<'_>,
974 ) -> Poll<Option<Self::Item>> {
975 match Pin::new(&mut self.inner).poll_next(cx) {
976 Poll::Pending => Poll::Pending,
977 Poll::Ready(None) => Poll::Ready(None),
978 Poll::Ready(Some(payload)) => {
979 let cursor = encode_postgres_event_cursor(0);
984 let event = CompletionEvent::new(
985 cursor,
986 payload.execution_id.clone(),
987 CompletionOutcome::from_wire(&payload.outcome),
988 payload.produced_at_ms,
989 );
990 Poll::Ready(Some(Ok(event)))
991 }
992 }
993 }
994 }
995
996 Ok(Box::pin(Adapter { inner }))
997 }
998
999 #[tracing::instrument(name = "pg.subscribe_lease_history", skip_all)]
1013 async fn subscribe_lease_history(
1014 &self,
1015 cursor: ff_core::stream_subscribe::StreamCursor,
1016 filter: &ff_core::backend::ScannerFilter,
1017 ) -> Result<ff_core::stream_events::LeaseHistorySubscription, EngineError> {
1018 lease_event_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
1019 }
1020
1021 #[tracing::instrument(name = "pg.subscribe_signal_delivery", skip_all)]
1031 async fn subscribe_signal_delivery(
1032 &self,
1033 cursor: ff_core::stream_subscribe::StreamCursor,
1034 filter: &ff_core::backend::ScannerFilter,
1035 ) -> Result<ff_core::stream_events::SignalDeliverySubscription, EngineError> {
1036 signal_delivery_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
1037 }
1038}
1039
1040const MIN_MAX_LOCKS_PER_TRANSACTION: i64 = 256;
1049
1050async fn warn_if_max_locks_low(pool: &PgPool) {
1055 let row: Result<(String,), sqlx::Error> =
1056 sqlx::query_as("SHOW max_locks_per_transaction")
1057 .fetch_one(pool)
1058 .await;
1059 match row {
1060 Ok((raw,)) => emit_max_locks_decision(&raw),
1061 Err(e) => {
1062 tracing::debug!("failed to probe max_locks_per_transaction: {e}");
1063 }
1064 }
1065}
1066
1067fn max_locks_warn_value(raw: &str) -> Option<i64> {
1073 match raw.parse::<i64>() {
1074 Ok(v) if v < MIN_MAX_LOCKS_PER_TRANSACTION => Some(v),
1075 Ok(_) => None,
1076 Err(e) => {
1077 tracing::debug!(raw, "failed to parse max_locks_per_transaction: {e}");
1078 None
1079 }
1080 }
1081}
1082
1083fn emit_max_locks_decision(raw: &str) {
1084 if let Some(v) = max_locks_warn_value(raw) {
1085 tracing::warn!(
1086 current = v,
1087 recommended = MIN_MAX_LOCKS_PER_TRANSACTION,
1088 "postgres max_locks_per_transaction={v} is below the recommended \
1089 minimum ({MIN_MAX_LOCKS_PER_TRANSACTION}); partition-heavy workloads \
1090 may hit 'out of shared memory' under concurrent load. \
1091 See docs/operator-guide-postgres.md."
1092 );
1093 }
1094}
1095
/// Unit tests for the `max_locks_per_transaction` threshold decision.
#[cfg(test)]
mod max_locks_tests {
    use super::{max_locks_warn_value, MIN_MAX_LOCKS_PER_TRANSACTION};

    #[test]
    fn warns_when_below_threshold() {
        let just_below = MIN_MAX_LOCKS_PER_TRANSACTION - 1;
        let cases = [("64".to_string(), 64), (just_below.to_string(), just_below)];
        for (raw, expected) in cases {
            assert_eq!(max_locks_warn_value(&raw), Some(expected), "raw = {raw}");
        }
    }

    #[test]
    fn silent_at_or_above_threshold() {
        let cases = [MIN_MAX_LOCKS_PER_TRANSACTION.to_string(), "1024".to_string()];
        for raw in cases {
            assert_eq!(max_locks_warn_value(&raw), None, "raw = {raw}");
        }
    }

    #[test]
    fn silent_for_unparseable_raw() {
        assert!(max_locks_warn_value("not-a-number").is_none());
    }
}
1122}