1#![allow(clippy::result_large_err)]
20
21use std::sync::Arc;
22use std::time::Duration;
23
24use async_trait::async_trait;
25
26use ff_core::backend::{
27 AppendFrameOutcome, BackendConfig, CancelFlowPolicy, CancelFlowWait, CapabilitySet,
28 ClaimPolicy, FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal,
29 PendingWaitpoint, ReclaimToken, ResumeSignal, SummaryDocument, TailVisibility,
30 UsageDimensions,
31};
32#[cfg(feature = "core")]
33use ff_core::contracts::{
34 AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
35 ApplyDependencyToChildResult, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
36 CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
37 DeliverSignalArgs, DeliverSignalResult, EdgeDependencyPolicy, EdgeDirection, EdgeSnapshot,
38 ListExecutionsPage, ListFlowsPage, ListLanesPage, ListSuspendedPage,
39 SetEdgeGroupPolicyResult, StageDependencyEdgeArgs, StageDependencyEdgeResult,
40};
41#[cfg(feature = "core")]
42use ff_core::state::PublicState;
43use ff_core::contracts::{
44 CancelFlowResult, ExecutionSnapshot, FlowSnapshot, ReportUsageResult,
45 RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SuspendArgs,
46 SuspendOutcome,
47};
48#[cfg(feature = "streaming")]
49use ff_core::contracts::{StreamCursor, StreamFrames};
50use ff_core::engine_backend::EngineBackend;
51use ff_core::engine_error::EngineError;
52#[cfg(feature = "core")]
53use ff_core::partition::PartitionKey;
54use ff_core::partition::PartitionConfig;
55#[cfg(feature = "streaming")]
56use ff_core::types::AttemptIndex;
57#[cfg(feature = "core")]
58use ff_core::types::EdgeId;
59use ff_core::types::{BudgetId, ExecutionId, FlowId, LaneId, TimestampMs};
60pub use sqlx::PgPool;
64
65#[cfg(feature = "core")]
66mod admin;
67pub mod attempt;
68pub mod budget;
69pub mod completion;
70#[cfg(feature = "core")]
71pub mod dispatch;
72pub mod error;
73pub mod exec_core;
74pub mod flow;
75#[cfg(feature = "core")]
76pub mod flow_staging;
77pub mod handle_codec;
78pub mod listener;
79pub mod migrate;
80pub mod pool;
81#[cfg(feature = "core")]
82pub mod reconcilers;
83#[cfg(feature = "core")]
84pub mod scanner_supervisor;
85#[cfg(feature = "core")]
86pub mod scheduler;
87pub mod signal;
88#[cfg(feature = "streaming")]
89pub mod stream;
90pub mod suspend;
91pub mod suspend_ops;
92pub mod version;
93
94pub use completion::{PostgresCompletionStream, COMPLETION_CHANNEL};
95pub use error::{map_sqlx_error, PostgresTransportError};
96pub use listener::StreamNotifier;
97pub use migrate::{apply_migrations, MigrationError};
98#[cfg(feature = "core")]
99pub use scanner_supervisor::{PostgresScannerConfig, PostgresScannerHandle};
100pub use version::check_schema_version;
101
102pub use ff_core::backend::PostgresConnection;
108
/// Postgres-backed implementation of [`EngineBackend`].
///
/// Holds the shared connection pool plus the optional collaborators (metrics
/// sink, stream notifier, scanner supervisor) that the constructors on this
/// type wire in.
pub struct PostgresBackend {
    /// Connection pool handed to the query helpers this backend calls.
    #[allow(dead_code)] pool: PgPool,
    /// Partition configuration forwarded to the partition-aware helpers.
    #[allow(dead_code)]
    partition_config: PartitionConfig,
    /// Metrics sink; only `Some` when built through `connect_with_metrics`.
    #[allow(dead_code)]
    metrics: Option<Arc<ff_observability::Metrics>>,
    /// Notifier spawned by every constructor; `tail_stream` reports
    /// `Unavailable` when this is `None`.
    #[allow(dead_code)]
    stream_notifier: Option<Arc<StreamNotifier>>,
    /// Scanner supervisor handle; `None` until `with_scanners` succeeds.
    #[cfg(feature = "core")]
    scanner_handle: Option<Arc<scanner_supervisor::PostgresScannerHandle>>,
}
137
138impl PostgresBackend {
139 pub async fn connect(config: BackendConfig) -> Result<Arc<dyn EngineBackend>, EngineError> {
154 let pool = pool::build_pool(&config).await?;
155 warn_if_max_locks_low(&pool).await;
156 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
157 let backend = Self {
158 pool,
159 partition_config: PartitionConfig::default(),
160 metrics: None,
161 stream_notifier,
162 #[cfg(feature = "core")]
163 scanner_handle: None,
164 };
165 Ok(Arc::new(backend))
166 }
167
168 pub fn from_pool(pool: PgPool, partition_config: PartitionConfig) -> Arc<Self> {
174 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
175 Arc::new(Self {
176 pool,
177 partition_config,
178 metrics: None,
179 stream_notifier,
180 #[cfg(feature = "core")]
181 scanner_handle: None,
182 })
183 }
184
185 pub async fn connect_with_metrics(
199 config: BackendConfig,
200 partition_config: PartitionConfig,
201 metrics: Arc<ff_observability::Metrics>,
202 ) -> Result<Arc<Self>, EngineError> {
203 let pool = pool::build_pool(&config).await?;
204 warn_if_max_locks_low(&pool).await;
205 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
206 Ok(Arc::new(Self {
207 pool,
208 partition_config,
209 metrics: Some(metrics),
210 stream_notifier,
211 #[cfg(feature = "core")]
212 scanner_handle: None,
213 }))
214 }
215
216 #[cfg(feature = "core")]
223 pub fn with_scanners(
224 self: &mut Arc<Self>,
225 cfg: scanner_supervisor::PostgresScannerConfig,
226 ) -> bool {
227 let Some(inner) = Arc::get_mut(self) else {
228 return false;
229 };
230 let handle = scanner_supervisor::spawn_scanners(inner.pool.clone(), cfg);
231 inner.scanner_handle = Some(Arc::new(handle));
232 true
233 }
234
235 pub fn pool(&self) -> &PgPool {
240 &self.pool
241 }
242
243 #[cfg(feature = "core")]
256 #[tracing::instrument(name = "pg.create_execution", skip_all)]
257 pub async fn create_execution(
258 &self,
259 args: CreateExecutionArgs,
260 ) -> Result<ExecutionId, EngineError> {
261 exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await
262 }
263
264 #[cfg(feature = "core")]
272 #[tracing::instrument(name = "pg.create_flow.inherent", skip_all)]
273 pub async fn create_flow(
274 &self,
275 args: &CreateFlowArgs,
276 ) -> Result<CreateFlowResult, EngineError> {
277 flow_staging::create_flow(&self.pool, &self.partition_config, args).await
278 }
279
280 #[cfg(feature = "core")]
281 #[tracing::instrument(name = "pg.add_execution_to_flow.inherent", skip_all)]
282 pub async fn add_execution_to_flow(
283 &self,
284 args: &AddExecutionToFlowArgs,
285 ) -> Result<AddExecutionToFlowResult, EngineError> {
286 flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, args).await
287 }
288
289 #[cfg(feature = "core")]
290 #[tracing::instrument(name = "pg.stage_dependency_edge.inherent", skip_all)]
291 pub async fn stage_dependency_edge(
292 &self,
293 args: &StageDependencyEdgeArgs,
294 ) -> Result<StageDependencyEdgeResult, EngineError> {
295 flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, args).await
296 }
297
298 #[cfg(feature = "core")]
299 #[tracing::instrument(name = "pg.apply_dependency_to_child.inherent", skip_all)]
300 pub async fn apply_dependency_to_child(
301 &self,
302 args: &ApplyDependencyToChildArgs,
303 ) -> Result<ApplyDependencyToChildResult, EngineError> {
304 flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, args).await
305 }
306}
307
308#[inline]
312fn unavailable<T>(op: &'static str) -> Result<T, EngineError> {
313 Err(EngineError::Unavailable { op })
314}
315
316#[async_trait]
317impl EngineBackend for PostgresBackend {
318 #[tracing::instrument(name = "pg.claim", skip_all)]
321 async fn claim(
322 &self,
323 lane: &LaneId,
324 capabilities: &CapabilitySet,
325 policy: ClaimPolicy,
326 ) -> Result<Option<Handle>, EngineError> {
327 attempt::claim(&self.pool, lane, capabilities, &policy).await
328 }
329
330 #[tracing::instrument(name = "pg.renew", skip_all)]
331 async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError> {
332 attempt::renew(&self.pool, handle).await
333 }
334
335 #[tracing::instrument(name = "pg.progress", skip_all)]
336 async fn progress(
337 &self,
338 handle: &Handle,
339 percent: Option<u8>,
340 message: Option<String>,
341 ) -> Result<(), EngineError> {
342 attempt::progress(&self.pool, handle, percent, message).await
343 }
344
345 #[tracing::instrument(name = "pg.append_frame", skip_all)]
346 async fn append_frame(
347 &self,
348 handle: &Handle,
349 frame: Frame,
350 ) -> Result<AppendFrameOutcome, EngineError> {
351 #[cfg(feature = "streaming")]
352 {
353 stream::append_frame(&self.pool, &self.partition_config, handle, frame).await
354 }
355 #[cfg(not(feature = "streaming"))]
356 {
357 let _ = (handle, frame);
358 unavailable("pg.append_frame")
359 }
360 }
361
362 #[tracing::instrument(name = "pg.complete", skip_all)]
363 async fn complete(
364 &self,
365 handle: &Handle,
366 payload: Option<Vec<u8>>,
367 ) -> Result<(), EngineError> {
368 attempt::complete(&self.pool, handle, payload).await
369 }
370
371 #[tracing::instrument(name = "pg.fail", skip_all)]
372 async fn fail(
373 &self,
374 handle: &Handle,
375 reason: FailureReason,
376 classification: FailureClass,
377 ) -> Result<FailOutcome, EngineError> {
378 attempt::fail(&self.pool, handle, reason, classification).await
379 }
380
381 #[tracing::instrument(name = "pg.cancel", skip_all)]
382 async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError> {
383 let payload = handle_codec::decode_handle(handle)?;
384 exec_core::cancel_impl(
385 &self.pool,
386 &self.partition_config,
387 &payload.execution_id,
388 reason,
389 )
390 .await
391 }
392
393 #[tracing::instrument(name = "pg.suspend", skip_all)]
394 async fn suspend(
395 &self,
396 handle: &Handle,
397 args: SuspendArgs,
398 ) -> Result<SuspendOutcome, EngineError> {
399 suspend_ops::suspend_impl(&self.pool, &self.partition_config, handle, args).await
400 }
401
402 #[tracing::instrument(name = "pg.create_waitpoint", skip_all)]
403 async fn create_waitpoint(
404 &self,
405 _handle: &Handle,
406 _waitpoint_key: &str,
407 _expires_in: Duration,
408 ) -> Result<PendingWaitpoint, EngineError> {
409 unavailable("pg.create_waitpoint")
410 }
411
412 #[tracing::instrument(name = "pg.observe_signals", skip_all)]
413 async fn observe_signals(
414 &self,
415 handle: &Handle,
416 ) -> Result<Vec<ResumeSignal>, EngineError> {
417 suspend_ops::observe_signals_impl(&self.pool, handle).await
418 }
419
420 #[tracing::instrument(name = "pg.claim_from_reclaim", skip_all)]
421 async fn claim_from_reclaim(
422 &self,
423 token: ReclaimToken,
424 ) -> Result<Option<Handle>, EngineError> {
425 attempt::claim_from_reclaim(&self.pool, token).await
426 }
427
428 #[tracing::instrument(name = "pg.delay", skip_all)]
429 async fn delay(
430 &self,
431 handle: &Handle,
432 delay_until: TimestampMs,
433 ) -> Result<(), EngineError> {
434 attempt::delay(&self.pool, handle, delay_until).await
435 }
436
437 #[tracing::instrument(name = "pg.wait_children", skip_all)]
438 async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError> {
439 attempt::wait_children(&self.pool, handle).await
440 }
441
442 #[tracing::instrument(name = "pg.describe_execution", skip_all)]
445 async fn describe_execution(
446 &self,
447 id: &ExecutionId,
448 ) -> Result<Option<ExecutionSnapshot>, EngineError> {
449 exec_core::describe_execution_impl(&self.pool, &self.partition_config, id).await
450 }
451
452 #[tracing::instrument(name = "pg.describe_flow", skip_all)]
453 async fn describe_flow(
454 &self,
455 id: &FlowId,
456 ) -> Result<Option<FlowSnapshot>, EngineError> {
457 flow::describe_flow(&self.pool, &self.partition_config, id).await
458 }
459
460 #[cfg(feature = "core")]
461 #[tracing::instrument(name = "pg.list_edges", skip_all)]
462 async fn list_edges(
463 &self,
464 flow_id: &FlowId,
465 direction: EdgeDirection,
466 ) -> Result<Vec<EdgeSnapshot>, EngineError> {
467 flow::list_edges(&self.pool, &self.partition_config, flow_id, direction).await
468 }
469
470 #[cfg(feature = "core")]
471 #[tracing::instrument(name = "pg.describe_edge", skip_all)]
472 async fn describe_edge(
473 &self,
474 flow_id: &FlowId,
475 edge_id: &EdgeId,
476 ) -> Result<Option<EdgeSnapshot>, EngineError> {
477 flow::describe_edge(&self.pool, &self.partition_config, flow_id, edge_id).await
478 }
479
480 #[cfg(feature = "core")]
481 #[tracing::instrument(name = "pg.resolve_execution_flow_id", skip_all)]
482 async fn resolve_execution_flow_id(
483 &self,
484 eid: &ExecutionId,
485 ) -> Result<Option<FlowId>, EngineError> {
486 exec_core::resolve_execution_flow_id_impl(&self.pool, &self.partition_config, eid).await
487 }
488
489 #[cfg(feature = "core")]
490 #[tracing::instrument(name = "pg.list_flows", skip_all)]
491 async fn list_flows(
492 &self,
493 partition: PartitionKey,
494 cursor: Option<FlowId>,
495 limit: usize,
496 ) -> Result<ListFlowsPage, EngineError> {
497 flow::list_flows(&self.pool, partition, cursor, limit).await
498 }
499
500 #[cfg(feature = "core")]
501 #[tracing::instrument(name = "pg.list_lanes", skip_all)]
502 async fn list_lanes(
503 &self,
504 cursor: Option<LaneId>,
505 limit: usize,
506 ) -> Result<ListLanesPage, EngineError> {
507 admin::list_lanes_impl(&self.pool, cursor, limit).await
508 }
509
510 #[cfg(feature = "core")]
511 #[tracing::instrument(name = "pg.list_suspended", skip_all)]
512 async fn list_suspended(
513 &self,
514 partition: PartitionKey,
515 cursor: Option<ExecutionId>,
516 limit: usize,
517 ) -> Result<ListSuspendedPage, EngineError> {
518 admin::list_suspended_impl(&self.pool, partition, cursor, limit).await
519 }
520
521 #[cfg(feature = "core")]
522 #[tracing::instrument(name = "pg.list_executions", skip_all)]
523 async fn list_executions(
524 &self,
525 partition: PartitionKey,
526 cursor: Option<ExecutionId>,
527 limit: usize,
528 ) -> Result<ListExecutionsPage, EngineError> {
529 exec_core::list_executions_impl(
530 &self.pool,
531 &self.partition_config,
532 partition,
533 cursor,
534 limit,
535 )
536 .await
537 }
538
539 #[cfg(feature = "core")]
542 #[tracing::instrument(name = "pg.deliver_signal", skip_all)]
543 async fn deliver_signal(
544 &self,
545 args: DeliverSignalArgs,
546 ) -> Result<DeliverSignalResult, EngineError> {
547 suspend_ops::deliver_signal_impl(&self.pool, &self.partition_config, args).await
548 }
549
550 #[cfg(feature = "core")]
551 #[tracing::instrument(name = "pg.claim_resumed_execution", skip_all)]
552 async fn claim_resumed_execution(
553 &self,
554 args: ClaimResumedExecutionArgs,
555 ) -> Result<ClaimResumedExecutionResult, EngineError> {
556 suspend_ops::claim_resumed_execution_impl(&self.pool, &self.partition_config, args).await
557 }
558
559 #[cfg(feature = "core")]
570 #[tracing::instrument(name = "pg.create_execution.trait", skip_all)]
571 async fn create_execution(
572 &self,
573 args: CreateExecutionArgs,
574 ) -> Result<CreateExecutionResult, EngineError> {
575 let eid = args.execution_id.clone();
576 exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await?;
577 Ok(CreateExecutionResult::Created {
578 execution_id: eid,
579 public_state: PublicState::Waiting,
580 })
581 }
582
583 #[cfg(feature = "core")]
584 #[tracing::instrument(name = "pg.create_flow", skip_all)]
585 async fn create_flow(
586 &self,
587 args: CreateFlowArgs,
588 ) -> Result<CreateFlowResult, EngineError> {
589 flow_staging::create_flow(&self.pool, &self.partition_config, &args).await
590 }
591
592 #[cfg(feature = "core")]
593 #[tracing::instrument(name = "pg.add_execution_to_flow", skip_all)]
594 async fn add_execution_to_flow(
595 &self,
596 args: AddExecutionToFlowArgs,
597 ) -> Result<AddExecutionToFlowResult, EngineError> {
598 flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, &args).await
599 }
600
601 #[cfg(feature = "core")]
602 #[tracing::instrument(name = "pg.stage_dependency_edge", skip_all)]
603 async fn stage_dependency_edge(
604 &self,
605 args: StageDependencyEdgeArgs,
606 ) -> Result<StageDependencyEdgeResult, EngineError> {
607 flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, &args).await
608 }
609
610 #[cfg(feature = "core")]
611 #[tracing::instrument(name = "pg.apply_dependency_to_child", skip_all)]
612 async fn apply_dependency_to_child(
613 &self,
614 args: ApplyDependencyToChildArgs,
615 ) -> Result<ApplyDependencyToChildResult, EngineError> {
616 flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, &args).await
617 }
618
619 fn backend_label(&self) -> &'static str {
620 "postgres"
621 }
622
623 #[cfg(feature = "core")]
631 #[tracing::instrument(name = "pg.claim_for_worker", skip_all)]
632 async fn claim_for_worker(
633 &self,
634 args: ff_core::contracts::ClaimForWorkerArgs,
635 ) -> Result<ff_core::contracts::ClaimForWorkerOutcome, EngineError> {
636 let sched = scheduler::PostgresScheduler::new(self.pool.clone());
637 let grant_opt = sched
638 .claim_for_worker(
639 &args.lane_id,
640 &args.worker_id,
641 &args.worker_instance_id,
642 &args.worker_capabilities,
643 args.grant_ttl_ms,
644 )
645 .await?;
646 Ok(match grant_opt {
647 Some(g) => ff_core::contracts::ClaimForWorkerOutcome::granted(g),
648 None => ff_core::contracts::ClaimForWorkerOutcome::no_work(),
649 })
650 }
651
652 async fn ping(&self) -> Result<(), EngineError> {
653 let _ = sqlx::query_scalar::<_, i32>("SELECT 1")
657 .fetch_one(&self.pool)
658 .await
659 .map_err(error::map_sqlx_error)?;
660 Ok(())
661 }
662
663 async fn shutdown_prepare(&self, grace: Duration) -> Result<(), EngineError> {
668 #[cfg(feature = "core")]
669 if let Some(handle) = self.scanner_handle.as_ref() {
670 let timed_out = handle.shutdown(grace).await;
671 if timed_out > 0 {
672 tracing::warn!(
673 timed_out,
674 ?grace,
675 "postgres scanner supervisor exceeded grace on shutdown"
676 );
677 }
678 }
679 Ok(())
680 }
681
682 #[tracing::instrument(name = "pg.cancel_flow", skip_all)]
683 async fn cancel_flow(
684 &self,
685 id: &FlowId,
686 policy: CancelFlowPolicy,
687 wait: CancelFlowWait,
688 ) -> Result<CancelFlowResult, EngineError> {
689 flow::cancel_flow(&self.pool, &self.partition_config, id, policy, wait).await
690 }
691
692 #[cfg(feature = "core")]
693 #[tracing::instrument(name = "pg.set_edge_group_policy", skip_all)]
694 async fn set_edge_group_policy(
695 &self,
696 flow_id: &FlowId,
697 downstream_execution_id: &ExecutionId,
698 policy: EdgeDependencyPolicy,
699 ) -> Result<SetEdgeGroupPolicyResult, EngineError> {
700 flow::set_edge_group_policy(
701 &self.pool,
702 &self.partition_config,
703 flow_id,
704 downstream_execution_id,
705 policy,
706 )
707 .await
708 }
709
710 #[tracing::instrument(name = "pg.report_usage", skip_all)]
713 async fn report_usage(
714 &self,
715 _handle: &Handle,
716 budget: &BudgetId,
717 dimensions: UsageDimensions,
718 ) -> Result<ReportUsageResult, EngineError> {
719 budget::report_usage_impl(&self.pool, &self.partition_config, budget, dimensions).await
720 }
721
722 #[tracing::instrument(name = "pg.rotate_waitpoint_hmac_secret_all", skip_all)]
729 async fn rotate_waitpoint_hmac_secret_all(
730 &self,
731 args: RotateWaitpointHmacSecretAllArgs,
732 ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
733 let now_ms = std::time::SystemTime::now()
738 .duration_since(std::time::UNIX_EPOCH)
739 .map(|d| d.as_millis() as i64)
740 .unwrap_or(0);
741 signal::rotate_waitpoint_hmac_secret_all_impl(&self.pool, args, now_ms).await
742 }
743
744 #[cfg(feature = "streaming")]
747 #[tracing::instrument(name = "pg.read_stream", skip_all)]
748 async fn read_stream(
749 &self,
750 execution_id: &ExecutionId,
751 attempt_index: AttemptIndex,
752 from: StreamCursor,
753 to: StreamCursor,
754 count_limit: u64,
755 ) -> Result<StreamFrames, EngineError> {
756 stream::read_stream(&self.pool, execution_id, attempt_index, from, to, count_limit).await
757 }
758
759 #[cfg(feature = "streaming")]
760 #[tracing::instrument(name = "pg.tail_stream", skip_all)]
761 async fn tail_stream(
762 &self,
763 execution_id: &ExecutionId,
764 attempt_index: AttemptIndex,
765 after: StreamCursor,
766 block_ms: u64,
767 count_limit: u64,
768 visibility: TailVisibility,
769 ) -> Result<StreamFrames, EngineError> {
770 let notifier = self
771 .stream_notifier
772 .as_ref()
773 .ok_or(EngineError::Unavailable {
774 op: "pg.tail_stream (notifier not initialised)",
775 })?;
776 stream::tail_stream(
777 &self.pool,
778 notifier,
779 execution_id,
780 attempt_index,
781 after,
782 block_ms,
783 count_limit,
784 visibility,
785 )
786 .await
787 }
788
789 #[cfg(feature = "streaming")]
790 #[tracing::instrument(name = "pg.read_summary", skip_all)]
791 async fn read_summary(
792 &self,
793 execution_id: &ExecutionId,
794 attempt_index: AttemptIndex,
795 ) -> Result<Option<SummaryDocument>, EngineError> {
796 stream::read_summary(&self.pool, execution_id, attempt_index).await
797 }
798}
799
/// Recommended minimum for the Postgres `max_locks_per_transaction` setting.
/// The connect-time probe logs a warning when the server reports a value
/// strictly below this.
const MIN_MAX_LOCKS_PER_TRANSACTION: i64 = 256;
809
810async fn warn_if_max_locks_low(pool: &PgPool) {
815 let row: Result<(String,), sqlx::Error> =
816 sqlx::query_as("SHOW max_locks_per_transaction")
817 .fetch_one(pool)
818 .await;
819 match row {
820 Ok((raw,)) => emit_max_locks_decision(&raw),
821 Err(e) => {
822 tracing::debug!("failed to probe max_locks_per_transaction: {e}");
823 }
824 }
825}
826
827fn max_locks_warn_value(raw: &str) -> Option<i64> {
833 match raw.parse::<i64>() {
834 Ok(v) if v < MIN_MAX_LOCKS_PER_TRANSACTION => Some(v),
835 Ok(_) => None,
836 Err(e) => {
837 tracing::debug!(raw, "failed to parse max_locks_per_transaction: {e}");
838 None
839 }
840 }
841}
842
843fn emit_max_locks_decision(raw: &str) {
844 if let Some(v) = max_locks_warn_value(raw) {
845 tracing::warn!(
846 current = v,
847 recommended = MIN_MAX_LOCKS_PER_TRANSACTION,
848 "postgres max_locks_per_transaction={v} is below the recommended \
849 minimum ({MIN_MAX_LOCKS_PER_TRANSACTION}); partition-heavy workloads \
850 may hit 'out of shared memory' under concurrent load. \
851 See docs/operator-guide-postgres.md."
852 );
853 }
854}
855
/// Unit tests for the pure threshold decision in `max_locks_warn_value`.
#[cfg(test)]
mod max_locks_tests {
    use super::{max_locks_warn_value, MIN_MAX_LOCKS_PER_TRANSACTION};

    /// Values strictly below the minimum are handed back so the caller warns.
    #[test]
    fn warns_when_below_threshold() {
        assert_eq!(max_locks_warn_value("64"), Some(64));

        let just_below = MIN_MAX_LOCKS_PER_TRANSACTION - 1;
        assert_eq!(
            max_locks_warn_value(&just_below.to_string()),
            Some(just_below)
        );
    }

    /// The minimum itself, and anything larger, produces no warning value.
    #[test]
    fn silent_at_or_above_threshold() {
        let at_threshold = MIN_MAX_LOCKS_PER_TRANSACTION.to_string();
        assert_eq!(max_locks_warn_value(&at_threshold), None);
        assert_eq!(max_locks_warn_value("1024"), None);
    }

    /// Text that is not an integer is treated as "nothing to warn about".
    #[test]
    fn silent_for_unparseable_raw() {
        assert_eq!(max_locks_warn_value("not-a-number"), None);
    }
}