1#![allow(clippy::result_large_err)]
20
21use std::sync::Arc;
22use std::time::Duration;
23
24use async_trait::async_trait;
25
26use ff_core::backend::{
27 AppendFrameOutcome, BackendConfig, CancelFlowPolicy, CancelFlowWait, CapabilitySet,
28 ClaimPolicy, FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal,
29 PendingWaitpoint, ResumeToken, ResumeSignal, SummaryDocument, TailVisibility,
30 UsageDimensions,
31};
32#[cfg(feature = "core")]
33use ff_core::contracts::{
34 AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
35 ApplyDependencyToChildResult, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
36 CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
37 DeliverApprovalSignalArgs, DeliverSignalArgs, DeliverSignalResult, EdgeDependencyPolicy,
38 EdgeDirection, EdgeSnapshot,
39 ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
40 ListPendingWaitpointsResult, ListSuspendedPage, SetEdgeGroupPolicyResult,
41 StageDependencyEdgeArgs, StageDependencyEdgeResult,
42};
43#[cfg(feature = "core")]
44use ff_core::state::PublicState;
45use ff_core::contracts::{
46 CancelFlowResult, ExecutionContext, ExecutionSnapshot, FlowSnapshot, IssueReclaimGrantArgs,
47 IssueReclaimGrantOutcome, ReclaimExecutionArgs, ReclaimExecutionOutcome, ReportUsageResult,
48 RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
49 SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
50};
51#[cfg(feature = "core")]
52use ff_core::contracts::ExecutionInfo;
53#[cfg(feature = "core")]
55use ff_core::contracts::{
56 CancelExecutionArgs, CancelExecutionResult, RevokeLeaseArgs, RevokeLeaseResult,
57};
58#[cfg(feature = "core")]
60use ff_core::contracts::{
61 CancelFlowArgs, CancelFlowHeader, ChangePriorityArgs, ChangePriorityResult,
62 ReplayExecutionArgs, ReplayExecutionResult,
63};
64#[cfg(feature = "core")]
66use ff_core::contracts::{
67 BudgetStatus, CreateBudgetArgs, CreateBudgetResult, CreateQuotaPolicyArgs,
68 CreateQuotaPolicyResult, ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult,
69};
70#[cfg(feature = "streaming")]
71use ff_core::contracts::{StreamCursor, StreamFrames};
72use ff_core::engine_backend::{EngineBackend, ExpirePhase};
73use ff_core::engine_error::EngineError;
74#[cfg(feature = "core")]
75use ff_core::partition::PartitionKey;
76use ff_core::partition::{Partition, PartitionConfig};
77#[cfg(feature = "streaming")]
78use ff_core::types::AttemptIndex;
79#[cfg(feature = "core")]
80use ff_core::types::EdgeId;
81use ff_core::types::{BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
82pub use sqlx::PgPool;
86
87#[cfg(feature = "core")]
88mod admin;
89pub mod attempt;
90pub mod budget;
91pub mod claim_grant;
92pub mod completion;
93#[cfg(feature = "core")]
94pub mod dispatch;
95pub mod error;
96pub mod exec_core;
97pub mod flow;
98#[cfg(feature = "core")]
99pub mod flow_staging;
100pub mod handle_codec;
101mod lease_event;
102mod lease_event_subscribe;
103pub mod listener;
104pub mod migrate;
105#[cfg(feature = "core")]
106pub mod operator;
107#[cfg(feature = "core")]
108mod operator_event;
109pub mod pool;
110#[cfg(feature = "core")]
111pub mod reconcilers;
112#[cfg(feature = "core")]
113pub mod scanner_supervisor;
114#[cfg(feature = "core")]
115pub mod scheduler;
116pub mod signal;
117mod signal_delivery_subscribe;
118mod signal_event;
119#[cfg(feature = "streaming")]
120pub mod stream;
121pub mod suspend;
122pub mod suspend_ops;
123#[cfg(feature = "core")]
124pub(crate) mod typed_ops;
125pub mod version;
126
127pub use completion::{PostgresCompletionStream, COMPLETION_CHANNEL};
128pub use error::{map_sqlx_error, PostgresTransportError};
129pub use listener::StreamNotifier;
130pub use migrate::{apply_migrations, MigrationError};
131#[cfg(feature = "core")]
132pub use scanner_supervisor::{PostgresScannerConfig, PostgresScannerHandle};
133pub use version::check_schema_version;
134
135pub use ff_core::backend::PostgresConnection;
141
142fn postgres_supports_base() -> ff_core::capability::Supports {
172 let mut s = ff_core::capability::Supports::none();
173
174 s.cancel_flow_wait_timeout = true;
176 s.cancel_flow_wait_indefinite = true;
177
178 s.rotate_waitpoint_hmac_secret_all = true;
180 s.seed_waitpoint_hmac_secret = true;
181
182 s.claim_for_worker = true;
184
185 s.subscribe_lease_history = true;
187 s.subscribe_completion = true;
188 s.subscribe_signal_delivery = true;
189 s.subscribe_instance_tags = false;
190
191 s.stream_durable_summary = true;
193 s.stream_best_effort_live = true;
194
195 s.prepare = true;
197
198 s.cancel_execution = true;
205 s.change_priority = true;
206 s.replay_execution = true;
207 s.revoke_lease = true;
208 s.read_execution_state = true;
209 s.read_execution_info = true;
210 s.get_execution_result = true;
211 s.budget_admin = true;
212 s.quota_admin = true;
213 s.list_pending_waitpoints = true;
214 s.cancel_flow_header = true;
215 s.ack_cancel_member = true;
216
217 s
218}
219
220pub struct PostgresBackend {
221 #[allow(dead_code)] pool: PgPool,
223 #[allow(dead_code)]
224 partition_config: PartitionConfig,
225 #[allow(dead_code)]
226 metrics: Option<Arc<ff_observability::Metrics>>,
227 #[allow(dead_code)]
231 stream_notifier: Option<Arc<StreamNotifier>>,
232 #[cfg(feature = "core")]
238 scanner_handle: Option<Arc<scanner_supervisor::PostgresScannerHandle>>,
239}
240
241impl PostgresBackend {
242 pub async fn connect(config: BackendConfig) -> Result<Arc<dyn EngineBackend>, EngineError> {
257 let pool = pool::build_pool(&config).await?;
258 warn_if_max_locks_low(&pool).await;
259 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
260 let backend = Self {
261 pool,
262 partition_config: PartitionConfig::default(),
263 metrics: None,
264 stream_notifier,
265 #[cfg(feature = "core")]
266 scanner_handle: None,
267 };
268 Ok(Arc::new(backend))
269 }
270
271 pub fn from_pool(pool: PgPool, partition_config: PartitionConfig) -> Arc<Self> {
277 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
278 Arc::new(Self {
279 pool,
280 partition_config,
281 metrics: None,
282 stream_notifier,
283 #[cfg(feature = "core")]
284 scanner_handle: None,
285 })
286 }
287
288 pub async fn connect_with_metrics(
302 config: BackendConfig,
303 partition_config: PartitionConfig,
304 metrics: Arc<ff_observability::Metrics>,
305 ) -> Result<Arc<Self>, EngineError> {
306 let pool = pool::build_pool(&config).await?;
307 warn_if_max_locks_low(&pool).await;
308 let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
309 Ok(Arc::new(Self {
310 pool,
311 partition_config,
312 metrics: Some(metrics),
313 stream_notifier,
314 #[cfg(feature = "core")]
315 scanner_handle: None,
316 }))
317 }
318
319 #[cfg(feature = "core")]
326 pub fn with_scanners(
327 self: &mut Arc<Self>,
328 cfg: scanner_supervisor::PostgresScannerConfig,
329 ) -> bool {
330 let Some(inner) = Arc::get_mut(self) else {
331 return false;
332 };
333 let handle = scanner_supervisor::spawn_scanners(inner.pool.clone(), cfg);
334 inner.scanner_handle = Some(Arc::new(handle));
335 true
336 }
337
338 pub fn pool(&self) -> &PgPool {
343 &self.pool
344 }
345
346 #[cfg(feature = "core")]
359 #[tracing::instrument(name = "pg.create_execution", skip_all)]
360 pub async fn create_execution(
361 &self,
362 args: CreateExecutionArgs,
363 ) -> Result<ExecutionId, EngineError> {
364 exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await
365 }
366
367 #[cfg(feature = "core")]
375 #[tracing::instrument(name = "pg.create_flow.inherent", skip_all)]
376 pub async fn create_flow(
377 &self,
378 args: &CreateFlowArgs,
379 ) -> Result<CreateFlowResult, EngineError> {
380 flow_staging::create_flow(&self.pool, &self.partition_config, args).await
381 }
382
383 #[cfg(feature = "core")]
384 #[tracing::instrument(name = "pg.add_execution_to_flow.inherent", skip_all)]
385 pub async fn add_execution_to_flow(
386 &self,
387 args: &AddExecutionToFlowArgs,
388 ) -> Result<AddExecutionToFlowResult, EngineError> {
389 flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, args).await
390 }
391
392 #[cfg(feature = "core")]
393 #[tracing::instrument(name = "pg.stage_dependency_edge.inherent", skip_all)]
394 pub async fn stage_dependency_edge(
395 &self,
396 args: &StageDependencyEdgeArgs,
397 ) -> Result<StageDependencyEdgeResult, EngineError> {
398 flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, args).await
399 }
400
401 #[cfg(feature = "core")]
402 #[tracing::instrument(name = "pg.apply_dependency_to_child.inherent", skip_all)]
403 pub async fn apply_dependency_to_child(
404 &self,
405 args: &ApplyDependencyToChildArgs,
406 ) -> Result<ApplyDependencyToChildResult, EngineError> {
407 flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, args).await
408 }
409}
410
411#[inline]
415#[cfg_attr(feature = "streaming", allow(dead_code))]
416fn unavailable<T>(op: &'static str) -> Result<T, EngineError> {
417 Err(EngineError::Unavailable { op })
418}
419
420#[async_trait]
421impl EngineBackend for PostgresBackend {
422 #[tracing::instrument(name = "pg.claim", skip_all)]
425 async fn claim(
426 &self,
427 lane: &LaneId,
428 capabilities: &CapabilitySet,
429 policy: ClaimPolicy,
430 ) -> Result<Option<Handle>, EngineError> {
431 attempt::claim(&self.pool, lane, capabilities, &policy).await
432 }
433
434 #[tracing::instrument(name = "pg.renew", skip_all)]
435 async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError> {
436 attempt::renew(&self.pool, handle).await
437 }
438
439 #[tracing::instrument(name = "pg.progress", skip_all)]
440 async fn progress(
441 &self,
442 handle: &Handle,
443 percent: Option<u8>,
444 message: Option<String>,
445 ) -> Result<(), EngineError> {
446 attempt::progress(&self.pool, handle, percent, message).await
447 }
448
449 #[tracing::instrument(name = "pg.append_frame", skip_all)]
450 async fn append_frame(
451 &self,
452 handle: &Handle,
453 frame: Frame,
454 ) -> Result<AppendFrameOutcome, EngineError> {
455 #[cfg(feature = "streaming")]
456 {
457 stream::append_frame(&self.pool, &self.partition_config, handle, frame).await
458 }
459 #[cfg(not(feature = "streaming"))]
460 {
461 let _ = (handle, frame);
462 unavailable("pg.append_frame")
463 }
464 }
465
466 #[tracing::instrument(name = "pg.complete", skip_all)]
467 async fn complete(
468 &self,
469 handle: &Handle,
470 payload: Option<Vec<u8>>,
471 ) -> Result<(), EngineError> {
472 attempt::complete(&self.pool, handle, payload).await
473 }
474
475 #[tracing::instrument(name = "pg.fail", skip_all)]
476 async fn fail(
477 &self,
478 handle: &Handle,
479 reason: FailureReason,
480 classification: FailureClass,
481 ) -> Result<FailOutcome, EngineError> {
482 attempt::fail(&self.pool, handle, reason, classification).await
483 }
484
485 #[tracing::instrument(name = "pg.cancel", skip_all)]
486 async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError> {
487 let payload = handle_codec::decode_handle(handle)?;
488 exec_core::cancel_impl(
489 &self.pool,
490 &self.partition_config,
491 &payload.execution_id,
492 reason,
493 )
494 .await
495 }
496
497 #[tracing::instrument(name = "pg.suspend", skip_all)]
498 async fn suspend(
499 &self,
500 handle: &Handle,
501 args: SuspendArgs,
502 ) -> Result<SuspendOutcome, EngineError> {
503 suspend_ops::suspend_impl(&self.pool, &self.partition_config, handle, args).await
504 }
505
506 #[tracing::instrument(name = "pg.suspend_by_triple", skip_all)]
507 async fn suspend_by_triple(
508 &self,
509 exec_id: ExecutionId,
510 triple: LeaseFence,
511 args: SuspendArgs,
512 ) -> Result<SuspendOutcome, EngineError> {
513 suspend_ops::suspend_by_triple_impl(
514 &self.pool,
515 &self.partition_config,
516 exec_id,
517 triple,
518 args,
519 )
520 .await
521 }
522
523 #[tracing::instrument(name = "pg.create_waitpoint", skip_all)]
524 async fn create_waitpoint(
525 &self,
526 handle: &Handle,
527 waitpoint_key: &str,
528 expires_in: Duration,
529 ) -> Result<PendingWaitpoint, EngineError> {
530 suspend_ops::create_waitpoint_impl(&self.pool, handle, waitpoint_key, expires_in).await
531 }
532
533 #[cfg(feature = "core")]
534 #[tracing::instrument(name = "pg.read_waitpoint_token", skip_all)]
535 async fn read_waitpoint_token(
536 &self,
537 partition: PartitionKey,
538 waitpoint_id: &ff_core::types::WaitpointId,
539 ) -> Result<Option<String>, EngineError> {
540 suspend_ops::read_waitpoint_token_impl(&self.pool, &partition, waitpoint_id).await
541 }
542
543 #[tracing::instrument(name = "pg.observe_signals", skip_all)]
544 async fn observe_signals(
545 &self,
546 handle: &Handle,
547 ) -> Result<Vec<ResumeSignal>, EngineError> {
548 suspend_ops::observe_signals_impl(&self.pool, handle).await
549 }
550
551 #[tracing::instrument(name = "pg.claim_from_resume_grant", skip_all)]
552 async fn claim_from_resume_grant(
553 &self,
554 token: ResumeToken,
555 ) -> Result<Option<Handle>, EngineError> {
556 attempt::claim_from_resume_grant(&self.pool, token).await
557 }
558
559 #[tracing::instrument(name = "pg.issue_reclaim_grant", skip_all)]
562 async fn issue_reclaim_grant(
563 &self,
564 args: IssueReclaimGrantArgs,
565 ) -> Result<IssueReclaimGrantOutcome, EngineError> {
566 claim_grant::issue_reclaim_grant_impl(&self.pool, args).await
567 }
568
569 #[tracing::instrument(name = "pg.reclaim_execution", skip_all)]
570 async fn reclaim_execution(
571 &self,
572 args: ReclaimExecutionArgs,
573 ) -> Result<ReclaimExecutionOutcome, EngineError> {
574 claim_grant::reclaim_execution_impl(&self.pool, args).await
575 }
576
577 #[tracing::instrument(name = "pg.delay", skip_all)]
578 async fn delay(
579 &self,
580 handle: &Handle,
581 delay_until: TimestampMs,
582 ) -> Result<(), EngineError> {
583 attempt::delay(&self.pool, handle, delay_until).await
584 }
585
586 #[tracing::instrument(name = "pg.wait_children", skip_all)]
587 async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError> {
588 attempt::wait_children(&self.pool, handle).await
589 }
590
591 #[tracing::instrument(name = "pg.describe_execution", skip_all)]
594 async fn describe_execution(
595 &self,
596 id: &ExecutionId,
597 ) -> Result<Option<ExecutionSnapshot>, EngineError> {
598 exec_core::describe_execution_impl(&self.pool, &self.partition_config, id).await
599 }
600
601 #[tracing::instrument(name = "pg.read_execution_context", skip_all)]
602 async fn read_execution_context(
603 &self,
604 execution_id: &ExecutionId,
605 ) -> Result<ExecutionContext, EngineError> {
606 exec_core::read_execution_context_impl(&self.pool, &self.partition_config, execution_id)
607 .await
608 }
609
610 #[tracing::instrument(name = "pg.read_current_attempt_index", skip_all)]
611 async fn read_current_attempt_index(
612 &self,
613 execution_id: &ExecutionId,
614 ) -> Result<ff_core::types::AttemptIndex, EngineError> {
615 exec_core::read_current_attempt_index_impl(
616 &self.pool,
617 &self.partition_config,
618 execution_id,
619 )
620 .await
621 }
622
623 #[tracing::instrument(name = "pg.read_total_attempt_count", skip_all)]
624 async fn read_total_attempt_count(
625 &self,
626 execution_id: &ExecutionId,
627 ) -> Result<ff_core::types::AttemptIndex, EngineError> {
628 exec_core::read_total_attempt_count_impl(
629 &self.pool,
630 &self.partition_config,
631 execution_id,
632 )
633 .await
634 }
635
636 #[tracing::instrument(name = "pg.describe_flow", skip_all)]
637 async fn describe_flow(
638 &self,
639 id: &FlowId,
640 ) -> Result<Option<FlowSnapshot>, EngineError> {
641 flow::describe_flow(&self.pool, &self.partition_config, id).await
642 }
643
644 #[tracing::instrument(name = "pg.set_execution_tag", skip_all)]
645 async fn set_execution_tag(
646 &self,
647 execution_id: &ExecutionId,
648 key: &str,
649 value: &str,
650 ) -> Result<(), EngineError> {
651 ff_core::engine_backend::validate_tag_key(key)?;
652 exec_core::set_execution_tag_impl(&self.pool, execution_id, key, value).await
653 }
654
655 #[tracing::instrument(name = "pg.set_flow_tag", skip_all)]
656 async fn set_flow_tag(
657 &self,
658 flow_id: &FlowId,
659 key: &str,
660 value: &str,
661 ) -> Result<(), EngineError> {
662 ff_core::engine_backend::validate_tag_key(key)?;
663 flow::set_flow_tag_impl(&self.pool, &self.partition_config, flow_id, key, value).await
664 }
665
666 #[tracing::instrument(name = "pg.get_execution_tag", skip_all)]
667 async fn get_execution_tag(
668 &self,
669 execution_id: &ExecutionId,
670 key: &str,
671 ) -> Result<Option<String>, EngineError> {
672 ff_core::engine_backend::validate_tag_key(key)?;
673 exec_core::get_execution_tag_impl(&self.pool, execution_id, key).await
674 }
675
676 #[tracing::instrument(name = "pg.get_flow_tag", skip_all)]
677 async fn get_flow_tag(
678 &self,
679 flow_id: &FlowId,
680 key: &str,
681 ) -> Result<Option<String>, EngineError> {
682 ff_core::engine_backend::validate_tag_key(key)?;
683 flow::get_flow_tag_impl(&self.pool, &self.partition_config, flow_id, key).await
684 }
685
686 #[tracing::instrument(name = "pg.get_execution_namespace", skip_all)]
687 async fn get_execution_namespace(
688 &self,
689 execution_id: &ExecutionId,
690 ) -> Result<Option<String>, EngineError> {
691 exec_core::get_execution_namespace_impl(&self.pool, execution_id).await
692 }
693
694 #[cfg(feature = "core")]
695 #[tracing::instrument(name = "pg.list_edges", skip_all)]
696 async fn list_edges(
697 &self,
698 flow_id: &FlowId,
699 direction: EdgeDirection,
700 ) -> Result<Vec<EdgeSnapshot>, EngineError> {
701 flow::list_edges(&self.pool, &self.partition_config, flow_id, direction).await
702 }
703
704 #[cfg(feature = "core")]
705 #[tracing::instrument(name = "pg.describe_edge", skip_all)]
706 async fn describe_edge(
707 &self,
708 flow_id: &FlowId,
709 edge_id: &EdgeId,
710 ) -> Result<Option<EdgeSnapshot>, EngineError> {
711 flow::describe_edge(&self.pool, &self.partition_config, flow_id, edge_id).await
712 }
713
714 #[cfg(feature = "core")]
715 #[tracing::instrument(name = "pg.resolve_execution_flow_id", skip_all)]
716 async fn resolve_execution_flow_id(
717 &self,
718 eid: &ExecutionId,
719 ) -> Result<Option<FlowId>, EngineError> {
720 exec_core::resolve_execution_flow_id_impl(&self.pool, &self.partition_config, eid).await
721 }
722
723 #[cfg(feature = "core")]
724 #[tracing::instrument(name = "pg.list_flows", skip_all)]
725 async fn list_flows(
726 &self,
727 partition: PartitionKey,
728 cursor: Option<FlowId>,
729 limit: usize,
730 ) -> Result<ListFlowsPage, EngineError> {
731 flow::list_flows(&self.pool, partition, cursor, limit).await
732 }
733
734 #[cfg(feature = "core")]
735 #[tracing::instrument(name = "pg.list_lanes", skip_all)]
736 async fn list_lanes(
737 &self,
738 cursor: Option<LaneId>,
739 limit: usize,
740 ) -> Result<ListLanesPage, EngineError> {
741 admin::list_lanes_impl(&self.pool, cursor, limit).await
742 }
743
744 #[cfg(feature = "core")]
745 #[tracing::instrument(name = "pg.list_suspended", skip_all)]
746 async fn list_suspended(
747 &self,
748 partition: PartitionKey,
749 cursor: Option<ExecutionId>,
750 limit: usize,
751 ) -> Result<ListSuspendedPage, EngineError> {
752 admin::list_suspended_impl(&self.pool, partition, cursor, limit).await
753 }
754
755 #[cfg(feature = "core")]
756 #[tracing::instrument(name = "pg.list_executions", skip_all)]
757 async fn list_executions(
758 &self,
759 partition: PartitionKey,
760 cursor: Option<ExecutionId>,
761 limit: usize,
762 ) -> Result<ListExecutionsPage, EngineError> {
763 exec_core::list_executions_impl(
764 &self.pool,
765 &self.partition_config,
766 partition,
767 cursor,
768 limit,
769 )
770 .await
771 }
772
773 #[cfg(feature = "core")]
776 #[tracing::instrument(name = "pg.deliver_signal", skip_all)]
777 async fn deliver_signal(
778 &self,
779 args: DeliverSignalArgs,
780 ) -> Result<DeliverSignalResult, EngineError> {
781 suspend_ops::deliver_signal_impl(&self.pool, &self.partition_config, args).await
782 }
783
784 #[cfg(feature = "core")]
785 #[tracing::instrument(name = "pg.deliver_approval_signal", skip_all)]
786 async fn deliver_approval_signal(
787 &self,
788 args: DeliverApprovalSignalArgs,
789 ) -> Result<DeliverSignalResult, EngineError> {
790 suspend_ops::deliver_approval_signal_impl(&self.pool, &self.partition_config, args).await
791 }
792
793 #[cfg(feature = "core")]
794 #[tracing::instrument(name = "pg.claim_resumed_execution", skip_all)]
795 async fn claim_resumed_execution(
796 &self,
797 args: ClaimResumedExecutionArgs,
798 ) -> Result<ClaimResumedExecutionResult, EngineError> {
799 suspend_ops::claim_resumed_execution_impl(&self.pool, &self.partition_config, args).await
800 }
801
802 #[cfg(feature = "core")]
812 #[tracing::instrument(name = "pg.read_execution_state", skip_all)]
813 async fn read_execution_state(
814 &self,
815 id: &ExecutionId,
816 ) -> Result<Option<PublicState>, EngineError> {
817 exec_core::read_execution_state_impl(&self.pool, &self.partition_config, id).await
818 }
819
820 #[cfg(feature = "core")]
821 #[tracing::instrument(name = "pg.read_execution_info", skip_all)]
822 async fn read_execution_info(
823 &self,
824 id: &ExecutionId,
825 ) -> Result<Option<ExecutionInfo>, EngineError> {
826 exec_core::read_execution_info_impl(&self.pool, &self.partition_config, id).await
827 }
828
829 #[tracing::instrument(name = "pg.get_execution_result", skip_all)]
830 async fn get_execution_result(
831 &self,
832 id: &ExecutionId,
833 ) -> Result<Option<Vec<u8>>, EngineError> {
834 exec_core::get_execution_result_impl(&self.pool, &self.partition_config, id).await
835 }
836
837 #[cfg(feature = "core")]
846 #[tracing::instrument(name = "pg.list_pending_waitpoints", skip_all)]
847 async fn list_pending_waitpoints(
848 &self,
849 args: ListPendingWaitpointsArgs,
850 ) -> Result<ListPendingWaitpointsResult, EngineError> {
851 suspend_ops::list_pending_waitpoints_impl(&self.pool, args).await
852 }
853
854 #[cfg(feature = "core")]
862 #[tracing::instrument(name = "pg.cancel_execution", skip_all)]
863 async fn cancel_execution(
864 &self,
865 args: CancelExecutionArgs,
866 ) -> Result<CancelExecutionResult, EngineError> {
867 operator::cancel_execution_impl(&self.pool, args).await
868 }
869
870 #[cfg(feature = "core")]
871 #[tracing::instrument(name = "pg.revoke_lease", skip_all)]
872 async fn revoke_lease(
873 &self,
874 args: RevokeLeaseArgs,
875 ) -> Result<RevokeLeaseResult, EngineError> {
876 operator::revoke_lease_impl(&self.pool, args).await
877 }
878
879 #[cfg(feature = "core")]
889 #[tracing::instrument(name = "pg.change_priority", skip_all)]
890 async fn change_priority(
891 &self,
892 args: ChangePriorityArgs,
893 ) -> Result<ChangePriorityResult, EngineError> {
894 operator::change_priority_impl(&self.pool, args).await
895 }
896
897 #[cfg(feature = "core")]
898 #[tracing::instrument(name = "pg.replay_execution", skip_all)]
899 async fn replay_execution(
900 &self,
901 args: ReplayExecutionArgs,
902 ) -> Result<ReplayExecutionResult, EngineError> {
903 operator::replay_execution_impl(&self.pool, args).await
904 }
905
906 #[cfg(feature = "core")]
907 #[tracing::instrument(name = "pg.cancel_flow_header", skip_all)]
908 async fn cancel_flow_header(
909 &self,
910 args: CancelFlowArgs,
911 ) -> Result<CancelFlowHeader, EngineError> {
912 operator::cancel_flow_header_impl(&self.pool, &self.partition_config, args).await
913 }
914
915 #[cfg(feature = "core")]
916 #[tracing::instrument(name = "pg.ack_cancel_member", skip_all)]
917 async fn ack_cancel_member(
918 &self,
919 flow_id: &FlowId,
920 execution_id: &ExecutionId,
921 ) -> Result<(), EngineError> {
922 operator::ack_cancel_member_impl(
923 &self.pool,
924 &self.partition_config,
925 flow_id.clone(),
926 execution_id.clone(),
927 )
928 .await
929 }
930
931 #[cfg(feature = "core")]
942 #[tracing::instrument(name = "pg.create_execution.trait", skip_all)]
943 async fn create_execution(
944 &self,
945 args: CreateExecutionArgs,
946 ) -> Result<CreateExecutionResult, EngineError> {
947 let eid = args.execution_id.clone();
948 exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await?;
949 Ok(CreateExecutionResult::Created {
950 execution_id: eid,
951 public_state: PublicState::Waiting,
952 })
953 }
954
955 #[cfg(feature = "core")]
956 #[tracing::instrument(name = "pg.create_flow", skip_all)]
957 async fn create_flow(
958 &self,
959 args: CreateFlowArgs,
960 ) -> Result<CreateFlowResult, EngineError> {
961 flow_staging::create_flow(&self.pool, &self.partition_config, &args).await
962 }
963
964 #[cfg(feature = "core")]
965 #[tracing::instrument(name = "pg.add_execution_to_flow", skip_all)]
966 async fn add_execution_to_flow(
967 &self,
968 args: AddExecutionToFlowArgs,
969 ) -> Result<AddExecutionToFlowResult, EngineError> {
970 flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, &args).await
971 }
972
973 #[cfg(feature = "core")]
974 #[tracing::instrument(name = "pg.stage_dependency_edge", skip_all)]
975 async fn stage_dependency_edge(
976 &self,
977 args: StageDependencyEdgeArgs,
978 ) -> Result<StageDependencyEdgeResult, EngineError> {
979 flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, &args).await
980 }
981
982 #[cfg(feature = "core")]
983 #[tracing::instrument(name = "pg.apply_dependency_to_child", skip_all)]
984 async fn apply_dependency_to_child(
985 &self,
986 args: ApplyDependencyToChildArgs,
987 ) -> Result<ApplyDependencyToChildResult, EngineError> {
988 flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, &args).await
989 }
990
991 #[cfg(feature = "core")]
997 async fn cascade_completion(
998 &self,
999 payload: &ff_core::backend::CompletionPayload,
1000 ) -> Result<ff_core::contracts::CascadeOutcome, EngineError> {
1001 let event_id = match resolve_event_id(&self.pool, payload).await {
1002 Some(id) => id,
1003 None => {
1004 tracing::warn!(
1007 execution_id = %payload.execution_id,
1008 produced_at_ms = payload.produced_at_ms.0,
1009 "pg.cascade_completion: could not resolve event_id; reconciler will claim"
1010 );
1011 return Ok(ff_core::contracts::CascadeOutcome::async_dispatched(0));
1012 }
1013 };
1014 let outcome = crate::dispatch::dispatch_completion(&self.pool, event_id).await?;
1015 let advanced = match outcome {
1016 crate::dispatch::DispatchOutcome::NoOp => 0,
1017 crate::dispatch::DispatchOutcome::Advanced(n) => n,
1018 };
1019 Ok(ff_core::contracts::CascadeOutcome::async_dispatched(advanced))
1020 }
1021
    /// Static label identifying this backend implementation (`"postgres"`).
    fn backend_label(&self) -> &'static str {
        "postgres"
    }
1025
1026 fn capabilities(&self) -> ff_core::capability::Capabilities {
1034 ff_core::capability::Capabilities::new(
1035 ff_core::capability::BackendIdentity::new(
1036 "postgres",
1037 ff_core::capability::Version::new(0, 11, 0),
1038 "E-shipped",
1039 ),
1040 postgres_supports_base(),
1041 )
1042 }
1043
1044 async fn prepare(
1052 &self,
1053 ) -> Result<ff_core::backend::PrepareOutcome, EngineError> {
1054 Ok(ff_core::backend::PrepareOutcome::NoOp)
1055 }
1056
1057 #[cfg(feature = "core")]
1065 #[tracing::instrument(name = "pg.claim_for_worker", skip_all)]
1066 async fn claim_for_worker(
1067 &self,
1068 args: ff_core::contracts::ClaimForWorkerArgs,
1069 ) -> Result<ff_core::contracts::ClaimForWorkerOutcome, EngineError> {
1070 let sched = scheduler::PostgresScheduler::new(self.pool.clone());
1071 let grant_opt = sched
1072 .claim_for_worker(
1073 &args.lane_id,
1074 &args.worker_id,
1075 &args.worker_instance_id,
1076 &args.worker_capabilities,
1077 args.grant_ttl_ms,
1078 )
1079 .await?;
1080 Ok(match grant_opt {
1081 Some(g) => ff_core::contracts::ClaimForWorkerOutcome::granted(g),
1082 None => ff_core::contracts::ClaimForWorkerOutcome::no_work(),
1083 })
1084 }
1085
1086 async fn ping(&self) -> Result<(), EngineError> {
1087 let _ = sqlx::query_scalar::<_, i32>("SELECT 1")
1091 .fetch_one(&self.pool)
1092 .await
1093 .map_err(error::map_sqlx_error)?;
1094 Ok(())
1095 }
1096
1097 async fn shutdown_prepare(&self, grace: Duration) -> Result<(), EngineError> {
1102 #[cfg(feature = "core")]
1103 if let Some(handle) = self.scanner_handle.as_ref() {
1104 let timed_out = handle.shutdown(grace).await;
1105 if timed_out > 0 {
1106 tracing::warn!(
1107 timed_out,
1108 ?grace,
1109 "postgres scanner supervisor exceeded grace on shutdown"
1110 );
1111 }
1112 }
1113 Ok(())
1114 }
1115
1116 #[tracing::instrument(name = "pg.cancel_flow", skip_all)]
1117 async fn cancel_flow(
1118 &self,
1119 id: &FlowId,
1120 policy: CancelFlowPolicy,
1121 wait: CancelFlowWait,
1122 ) -> Result<CancelFlowResult, EngineError> {
1123 let result = flow::cancel_flow(&self.pool, &self.partition_config, id, policy).await?;
1124 if let Some(deadline) = ff_core::engine_backend::cancel_flow_wait_deadline(wait) {
1125 ff_core::engine_backend::wait_for_flow_cancellation(self, id, deadline).await?;
1126 }
1127 Ok(result)
1128 }
1129
1130 #[cfg(feature = "core")]
1131 #[tracing::instrument(name = "pg.set_edge_group_policy", skip_all)]
1132 async fn set_edge_group_policy(
1133 &self,
1134 flow_id: &FlowId,
1135 downstream_execution_id: &ExecutionId,
1136 policy: EdgeDependencyPolicy,
1137 ) -> Result<SetEdgeGroupPolicyResult, EngineError> {
1138 flow::set_edge_group_policy(
1139 &self.pool,
1140 &self.partition_config,
1141 flow_id,
1142 downstream_execution_id,
1143 policy,
1144 )
1145 .await
1146 }
1147
1148 #[tracing::instrument(name = "pg.report_usage", skip_all)]
1151 async fn report_usage(
1152 &self,
1153 _handle: &Handle,
1154 budget: &BudgetId,
1155 dimensions: UsageDimensions,
1156 ) -> Result<ReportUsageResult, EngineError> {
1157 budget::report_usage_impl(&self.pool, &self.partition_config, budget, dimensions).await
1158 }
1159
1160 #[cfg(feature = "core")]
1168 #[tracing::instrument(name = "pg.create_budget", skip_all)]
1169 async fn create_budget(
1170 &self,
1171 args: CreateBudgetArgs,
1172 ) -> Result<CreateBudgetResult, EngineError> {
1173 budget::create_budget_impl(&self.pool, &self.partition_config, args).await
1174 }
1175
1176 #[cfg(feature = "core")]
1177 #[tracing::instrument(name = "pg.reset_budget", skip_all)]
1178 async fn reset_budget(
1179 &self,
1180 args: ResetBudgetArgs,
1181 ) -> Result<ResetBudgetResult, EngineError> {
1182 budget::reset_budget_impl(&self.pool, &self.partition_config, args).await
1183 }
1184
1185 #[cfg(feature = "core")]
1186 #[tracing::instrument(name = "pg.create_quota_policy", skip_all)]
1187 async fn create_quota_policy(
1188 &self,
1189 args: CreateQuotaPolicyArgs,
1190 ) -> Result<CreateQuotaPolicyResult, EngineError> {
1191 budget::create_quota_policy_impl(&self.pool, &self.partition_config, args).await
1192 }
1193
1194 #[cfg(feature = "core")]
1195 #[tracing::instrument(name = "pg.get_budget_status", skip_all)]
1196 async fn get_budget_status(
1197 &self,
1198 id: &BudgetId,
1199 ) -> Result<BudgetStatus, EngineError> {
1200 budget::get_budget_status_impl(&self.pool, &self.partition_config, id).await
1201 }
1202
1203 #[cfg(feature = "core")]
1204 #[tracing::instrument(name = "pg.report_usage_admin", skip_all)]
1205 async fn report_usage_admin(
1206 &self,
1207 budget_id: &BudgetId,
1208 args: ReportUsageAdminArgs,
1209 ) -> Result<ReportUsageResult, EngineError> {
1210 budget::report_usage_admin_impl(&self.pool, &self.partition_config, budget_id, args).await
1211 }
1212
1213 #[cfg(feature = "core")]
1216 #[tracing::instrument(name = "pg.record_spend", skip_all)]
1217 async fn record_spend(
1218 &self,
1219 args: ff_core::contracts::RecordSpendArgs,
1220 ) -> Result<ReportUsageResult, EngineError> {
1221 budget::record_spend_impl(&self.pool, &self.partition_config, args).await
1222 }
1223
1224 #[cfg(feature = "core")]
1225 #[tracing::instrument(name = "pg.release_budget", skip_all)]
1226 async fn release_budget(
1227 &self,
1228 args: ff_core::contracts::ReleaseBudgetArgs,
1229 ) -> Result<(), EngineError> {
1230 budget::release_budget_impl(&self.pool, &self.partition_config, args).await
1231 }
1232
1233 #[tracing::instrument(name = "pg.rotate_waitpoint_hmac_secret_all", skip_all)]
1240 async fn rotate_waitpoint_hmac_secret_all(
1241 &self,
1242 args: RotateWaitpointHmacSecretAllArgs,
1243 ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
1244 let now_ms = std::time::SystemTime::now()
1249 .duration_since(std::time::UNIX_EPOCH)
1250 .map(|d| d.as_millis() as i64)
1251 .unwrap_or(0);
1252 signal::rotate_waitpoint_hmac_secret_all_impl(&self.pool, args, now_ms).await
1253 }
1254
1255 #[tracing::instrument(name = "pg.seed_waitpoint_hmac_secret", skip_all)]
1256 async fn seed_waitpoint_hmac_secret(
1257 &self,
1258 args: SeedWaitpointHmacSecretArgs,
1259 ) -> Result<SeedOutcome, EngineError> {
1260 let now_ms = std::time::SystemTime::now()
1264 .duration_since(std::time::UNIX_EPOCH)
1265 .map(|d| d.as_millis() as i64)
1266 .unwrap_or(0);
1267 signal::seed_waitpoint_hmac_secret_impl(&self.pool, args, now_ms).await
1268 }
1269
1270 #[cfg(feature = "streaming")]
1273 #[tracing::instrument(name = "pg.read_stream", skip_all)]
1274 async fn read_stream(
1275 &self,
1276 execution_id: &ExecutionId,
1277 attempt_index: AttemptIndex,
1278 from: StreamCursor,
1279 to: StreamCursor,
1280 count_limit: u64,
1281 ) -> Result<StreamFrames, EngineError> {
1282 stream::read_stream(&self.pool, execution_id, attempt_index, from, to, count_limit).await
1283 }
1284
1285 #[cfg(feature = "streaming")]
1286 #[tracing::instrument(name = "pg.tail_stream", skip_all)]
1287 async fn tail_stream(
1288 &self,
1289 execution_id: &ExecutionId,
1290 attempt_index: AttemptIndex,
1291 after: StreamCursor,
1292 block_ms: u64,
1293 count_limit: u64,
1294 visibility: TailVisibility,
1295 ) -> Result<StreamFrames, EngineError> {
1296 let notifier = self
1297 .stream_notifier
1298 .as_ref()
1299 .ok_or(EngineError::Unavailable {
1300 op: "pg.tail_stream (notifier not initialised)",
1301 })?;
1302 stream::tail_stream(
1303 &self.pool,
1304 notifier,
1305 execution_id,
1306 attempt_index,
1307 after,
1308 block_ms,
1309 count_limit,
1310 visibility,
1311 )
1312 .await
1313 }
1314
1315 #[cfg(feature = "streaming")]
1316 #[tracing::instrument(name = "pg.read_summary", skip_all)]
1317 async fn read_summary(
1318 &self,
1319 execution_id: &ExecutionId,
1320 attempt_index: AttemptIndex,
1321 ) -> Result<Option<SummaryDocument>, EngineError> {
1322 stream::read_summary(&self.pool, execution_id, attempt_index).await
1323 }
1324
1325 #[tracing::instrument(name = "pg.subscribe_completion", skip_all)]
1339 async fn subscribe_completion(
1340 &self,
1341 _cursor: ff_core::stream_subscribe::StreamCursor,
1342 filter: &ff_core::backend::ScannerFilter,
1343 ) -> Result<ff_core::stream_events::CompletionSubscription, EngineError> {
1344 use ff_core::stream_events::{CompletionEvent, CompletionOutcome};
1345 use ff_core::stream_subscribe::encode_postgres_event_cursor;
1346 use futures_core::Stream;
1347 use std::pin::Pin;
1348 use std::task::{Context, Poll};
1349
1350 let inner = if filter.is_noop() {
1357 ff_core::completion_backend::CompletionBackend::subscribe_completions(self).await?
1358 } else {
1359 ff_core::completion_backend::CompletionBackend::subscribe_completions_filtered(
1360 self, filter,
1361 )
1362 .await?
1363 };
1364
1365 struct Adapter {
1366 inner: ff_core::completion_backend::CompletionStream,
1367 }
1368
1369 impl Stream for Adapter {
1370 type Item = Result<CompletionEvent, EngineError>;
1371 fn poll_next(
1372 mut self: Pin<&mut Self>,
1373 cx: &mut Context<'_>,
1374 ) -> Poll<Option<Self::Item>> {
1375 match Pin::new(&mut self.inner).poll_next(cx) {
1376 Poll::Pending => Poll::Pending,
1377 Poll::Ready(None) => Poll::Ready(None),
1378 Poll::Ready(Some(payload)) => {
1379 let cursor = encode_postgres_event_cursor(0);
1384 let event = CompletionEvent::new(
1385 cursor,
1386 payload.execution_id.clone(),
1387 CompletionOutcome::from_wire(&payload.outcome),
1388 payload.produced_at_ms,
1389 );
1390 Poll::Ready(Some(Ok(event)))
1391 }
1392 }
1393 }
1394 }
1395
1396 Ok(Box::pin(Adapter { inner }))
1397 }
1398
1399 #[tracing::instrument(name = "pg.subscribe_lease_history", skip_all)]
1413 async fn subscribe_lease_history(
1414 &self,
1415 cursor: ff_core::stream_subscribe::StreamCursor,
1416 filter: &ff_core::backend::ScannerFilter,
1417 ) -> Result<ff_core::stream_events::LeaseHistorySubscription, EngineError> {
1418 lease_event_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
1419 }
1420
1421 #[tracing::instrument(name = "pg.subscribe_signal_delivery", skip_all)]
1431 async fn subscribe_signal_delivery(
1432 &self,
1433 cursor: ff_core::stream_subscribe::StreamCursor,
1434 filter: &ff_core::backend::ScannerFilter,
1435 ) -> Result<ff_core::stream_events::SignalDeliverySubscription, EngineError> {
1436 signal_delivery_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
1437 }
1438
1439 #[cfg(feature = "core")]
1461 async fn mark_lease_expired_if_due(
1462 &self,
1463 partition: Partition,
1464 execution_id: &ExecutionId,
1465 ) -> Result<(), EngineError> {
1466 let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1467 let partition_key = partition_index_to_i16(partition)?;
1468 reconcilers::lease_expiry::release_for_execution(&self.pool, partition_key, exec_uuid)
1469 .await
1470 }
1471
1472 #[cfg(feature = "core")]
1473 async fn promote_delayed(
1474 &self,
1475 partition: Partition,
1476 _lane: &LaneId,
1477 execution_id: &ExecutionId,
1478 now_ms: TimestampMs,
1479 ) -> Result<(), EngineError> {
1480 let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1481 let partition_key = partition_index_to_i16(partition)?;
1482 reconcilers::delayed_promoter::promote_for_execution(
1488 &self.pool,
1489 partition_key,
1490 exec_uuid,
1491 now_ms.0,
1492 )
1493 .await
1494 }
1495
1496 #[cfg(feature = "core")]
1497 async fn close_waitpoint(
1498 &self,
1499 partition: Partition,
1500 _execution_id: &ExecutionId,
1501 waitpoint_id: &str,
1502 now_ms: TimestampMs,
1503 ) -> Result<(), EngineError> {
1504 let partition_key = partition_index_to_i16(partition)?;
1510 let waitpoint_uuid = uuid::Uuid::parse_str(waitpoint_id).map_err(|e| {
1511 EngineError::Validation {
1512 kind: ff_core::engine_error::ValidationKind::InvalidInput,
1513 detail: format!("waitpoint_id not a UUID: {e}"),
1514 }
1515 })?;
1516 reconcilers::pending_wp_expiry::close_for_execution(
1517 &self.pool,
1518 partition_key,
1519 waitpoint_uuid,
1520 now_ms.0,
1521 )
1522 .await
1523 }
1524
1525 #[cfg(feature = "core")]
1526 async fn expire_execution(
1527 &self,
1528 partition: Partition,
1529 execution_id: &ExecutionId,
1530 phase: ExpirePhase,
1531 now_ms: TimestampMs,
1532 ) -> Result<(), EngineError> {
1533 let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1534 let partition_key = partition_index_to_i16(partition)?;
1535 match phase {
1536 ExpirePhase::AttemptTimeout => {
1537 reconcilers::attempt_timeout::expire_for_execution(
1538 &self.pool,
1539 partition_key,
1540 exec_uuid,
1541 )
1542 .await
1543 }
1544 ExpirePhase::ExecutionDeadline => {
1545 reconcilers::execution_deadline::expire_for_execution(
1546 &self.pool,
1547 partition_key,
1548 exec_uuid,
1549 now_ms.0,
1550 )
1551 .await
1552 }
1553 }
1554 }
1555
1556 #[cfg(feature = "core")]
1557 async fn expire_suspension(
1558 &self,
1559 partition: Partition,
1560 execution_id: &ExecutionId,
1561 _now_ms: TimestampMs,
1562 ) -> Result<(), EngineError> {
1563 let (_pk_from_eid, exec_uuid) = attempt::split_exec_id(execution_id)?;
1564 let partition_key = partition_index_to_i16(partition)?;
1565 reconcilers::suspension_timeout::expire_for_execution(
1566 &self.pool,
1567 partition_key,
1568 exec_uuid,
1569 )
1570 .await
1571 }
1572
1573 #[cfg(feature = "core")]
1578 async fn project_flow_summary(
1579 &self,
1580 partition: Partition,
1581 flow_id: &FlowId,
1582 now_ms: TimestampMs,
1583 ) -> Result<bool, EngineError> {
1584 let partition_key = partition_index_to_i16(partition)?;
1585 let flow_uuid: sqlx::types::Uuid = flow_id.0;
1586 flow::project_flow_summary_impl(
1587 &self.pool,
1588 partition_key,
1589 flow_uuid,
1590 now_ms.0,
1591 )
1592 .await
1593 }
1594
1595 #[cfg(feature = "core")]
1600 async fn trim_retention(
1601 &self,
1602 partition: Partition,
1603 lane_id: &LaneId,
1604 retention_ms: u64,
1605 now_ms: TimestampMs,
1606 batch_size: u32,
1607 filter: &ff_core::backend::ScannerFilter,
1608 ) -> Result<u32, EngineError> {
1609 let partition_key = partition_index_to_i16(partition)?;
1610 exec_core::trim_retention_impl(
1611 &self.pool,
1612 partition_key,
1613 lane_id.as_str(),
1614 retention_ms,
1615 now_ms.0,
1616 batch_size,
1617 filter,
1618 )
1619 .await
1620 }
1621
1622 #[cfg(feature = "core")]
1625 async fn renew_lease(
1626 &self,
1627 args: ff_core::contracts::RenewLeaseArgs,
1628 ) -> Result<ff_core::contracts::RenewLeaseResult, EngineError> {
1629 crate::typed_ops::renew_lease(self.pool(), args).await
1630 }
1631
1632 #[cfg(feature = "core")]
1633 async fn complete_execution(
1634 &self,
1635 args: ff_core::contracts::CompleteExecutionArgs,
1636 ) -> Result<ff_core::contracts::CompleteExecutionResult, EngineError> {
1637 crate::typed_ops::complete_execution(self.pool(), args).await
1638 }
1639
1640 #[cfg(feature = "core")]
1641 async fn fail_execution(
1642 &self,
1643 args: ff_core::contracts::FailExecutionArgs,
1644 ) -> Result<ff_core::contracts::FailExecutionResult, EngineError> {
1645 crate::typed_ops::fail_execution(self.pool(), args).await
1646 }
1647
1648 #[cfg(feature = "core")]
1649 async fn resume_execution(
1650 &self,
1651 args: ff_core::contracts::ResumeExecutionArgs,
1652 ) -> Result<ff_core::contracts::ResumeExecutionResult, EngineError> {
1653 crate::typed_ops::resume_execution(self.pool(), args).await
1654 }
1655
1656 #[cfg(feature = "core")]
1657 async fn check_admission(
1658 &self,
1659 quota_policy_id: &ff_core::types::QuotaPolicyId,
1660 _dimension: &str,
1661 args: ff_core::contracts::CheckAdmissionArgs,
1662 ) -> Result<ff_core::contracts::CheckAdmissionResult, EngineError> {
1663 crate::typed_ops::check_admission(
1664 self.pool(),
1665 &self.partition_config,
1666 quota_policy_id,
1667 args,
1668 )
1669 .await
1670 }
1671
1672 #[cfg(feature = "core")]
1673 async fn evaluate_flow_eligibility(
1674 &self,
1675 args: ff_core::contracts::EvaluateFlowEligibilityArgs,
1676 ) -> Result<ff_core::contracts::EvaluateFlowEligibilityResult, EngineError> {
1677 crate::typed_ops::evaluate_flow_eligibility(self.pool(), args).await
1678 }
1679
1680 #[cfg(feature = "core")]
1681 async fn claim_execution(
1682 &self,
1683 args: ff_core::contracts::ClaimExecutionArgs,
1684 ) -> Result<ff_core::contracts::ClaimExecutionResult, EngineError> {
1685 crate::typed_ops::claim_execution(self.pool(), &self.partition_config, args).await
1686 }
1687
1688 #[cfg(feature = "core")]
1692 #[tracing::instrument(name = "pg.issue_grant_and_claim", skip_all)]
1693 async fn issue_grant_and_claim(
1694 &self,
1695 args: ff_core::contracts::IssueGrantAndClaimArgs,
1696 ) -> Result<ff_core::contracts::ClaimGrantOutcome, EngineError> {
1697 crate::typed_ops::issue_grant_and_claim(self.pool(), &self.partition_config, args).await
1698 }
1699
1700 async fn read_exec_core_fields(
1703 &self,
1704 partition: ff_core::partition::Partition,
1705 execution_id: &ff_core::types::ExecutionId,
1706 fields: &[&str],
1707 ) -> Result<std::collections::HashMap<String, Option<String>>, EngineError> {
1708 if fields.is_empty() {
1709 return Ok(std::collections::HashMap::new());
1710 }
1711 let derived: u16 = execution_id.partition();
1716 if partition.index != derived {
1717 return Err(EngineError::Validation {
1718 kind: ff_core::engine_error::ValidationKind::InvalidInput,
1719 detail: format!(
1720 "read_exec_core_fields: partition mismatch (arg={}, eid={})",
1721 partition.index, derived
1722 ),
1723 });
1724 }
1725 let partition_key: i16 = partition.index as i16;
1726 let exec_uuid = crate::exec_core::eid_uuid(execution_id);
1727
1728 let mut projections: Vec<String> = Vec::with_capacity(fields.len());
1748 for field in fields {
1749 let expr = match *field {
1750 "lane_id" | "lifecycle_phase" | "ownership_state" | "eligibility_state"
1752 | "public_state" | "attempt_state" | "blocking_reason" | "cancellation_reason"
1753 | "cancelled_by" => format!("{f}::text", f = field),
1754 "attempt_index" => "attempt_index::text".to_string(),
1755 "flow_id" => "flow_id::text".to_string(),
1756 "priority" => "priority::text".to_string(),
1757 "created_at_ms" => "created_at_ms::text".to_string(),
1758 "terminal_at_ms" => "terminal_at_ms::text".to_string(),
1759 "deadline_at_ms" => "deadline_at_ms::text".to_string(),
1760 "current_attempt_index" => "attempt_index::text".to_string(),
1763 "completed_at" => "terminal_at_ms::text".to_string(),
1764 "cancel_reason" => "cancellation_reason::text".to_string(),
1765 "required_capabilities" => {
1768 "array_to_string(required_capabilities, ',')".to_string()
1769 }
1770 other => {
1772 match other {
1775 "current_waitpoint_id"
1776 | "current_worker_instance_id"
1777 | "budget_ids"
1778 | "quota_policy_id" => format!("raw_fields ->> '{other}'"),
1779 _ => "NULL".to_string(),
1780 }
1781 }
1782 };
1783 projections.push(expr);
1784 }
1785 let projection_sql = projections.join(", ");
1786 let query = format!(
1787 "SELECT {projection_sql} FROM ff_exec_core \
1788 WHERE partition_key = $1 AND execution_id = $2"
1789 );
1790 let row_opt = sqlx::query(&query)
1791 .bind(partition_key)
1792 .bind(exec_uuid)
1793 .fetch_optional(self.pool())
1794 .await
1795 .map_err(|e| EngineError::Transport {
1796 backend: "postgres",
1797 source: format!("read_exec_core_fields: {e}").into(),
1798 })?;
1799
1800 let mut out = std::collections::HashMap::with_capacity(fields.len());
1801 if let Some(row) = row_opt {
1802 use sqlx::Row;
1803 for (idx, field) in fields.iter().enumerate() {
1804 let val: Option<String> =
1805 row.try_get(idx).map_err(|e| EngineError::Transport {
1806 backend: "postgres",
1807 source: format!("read_exec_core_fields[{field}]: {e}").into(),
1808 })?;
1809 out.insert((*field).to_string(), val);
1810 }
1811 } else {
1812 for field in fields {
1813 out.insert((*field).to_string(), None);
1814 }
1815 }
1816 Ok(out)
1817 }
1818
1819 async fn server_time_ms(&self) -> Result<u64, EngineError> {
1822 let ms: i64 = sqlx::query_scalar(
1827 "SELECT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint",
1828 )
1829 .fetch_one(self.pool())
1830 .await
1831 .map_err(|e| EngineError::Transport {
1832 backend: "postgres",
1833 source: format!("server_time_ms: {e}").into(),
1834 })?;
1835 if ms < 0 {
1836 return Err(EngineError::Transport {
1837 backend: "postgres",
1838 source: "server_time_ms: negative epoch".into(),
1839 });
1840 }
1841 Ok(ms as u64)
1842 }
1843}
1844
1845async fn resolve_event_id(
1864 pool: &PgPool,
1865 payload: &ff_core::backend::CompletionPayload,
1866) -> Option<i64> {
1867 let eid_str = payload.execution_id.as_str();
1868 let uuid_str = eid_str.rsplit_once(':').map(|(_, u)| u)?;
1871 let uuid = uuid::Uuid::parse_str(uuid_str).ok()?;
1872 let partition_key = i16::try_from(payload.execution_id.partition()).ok()?;
1876 let occurred_at_ms = payload.produced_at_ms.0;
1877
1878 match sqlx::query_scalar::<_, i64>(
1879 "SELECT event_id FROM ff_completion_event \
1880 WHERE partition_key = $1 AND execution_id = $2 AND occurred_at_ms = $3 \
1881 ORDER BY event_id ASC LIMIT 1",
1882 )
1883 .bind(partition_key)
1884 .bind(uuid)
1885 .bind(occurred_at_ms)
1886 .fetch_optional(pool)
1887 .await
1888 {
1889 Ok(row) => row,
1890 Err(err) => {
1891 tracing::warn!(
1892 partition_key,
1893 execution_id = %uuid,
1894 occurred_at_ms,
1895 error = %err,
1896 "resolve_event_id: ff_completion_event lookup failed; falling back to \
1897 dependency_reconciler backstop"
1898 );
1899 None
1900 }
1901 }
1902}
1903
1904fn partition_index_to_i16(partition: Partition) -> Result<i16, EngineError> {
1913 i16::try_from(partition.index).map_err(|_| EngineError::Validation {
1914 kind: ff_core::engine_error::ValidationKind::InvalidInput,
1915 detail: format!(
1916 "partition index {} exceeds i16 range (max {})",
1917 partition.index,
1918 i16::MAX
1919 ),
1920 })
1921}
1922
/// Recommended lower bound for PostgreSQL's `max_locks_per_transaction`.
///
/// `max_locks_warn_value` reports any parsed setting strictly below this value,
/// and `emit_max_locks_decision` then logs it as a warning.
const MIN_MAX_LOCKS_PER_TRANSACTION: i64 = 256;
1932
1933async fn warn_if_max_locks_low(pool: &PgPool) {
1938 let row: Result<(String,), sqlx::Error> =
1939 sqlx::query_as("SHOW max_locks_per_transaction")
1940 .fetch_one(pool)
1941 .await;
1942 match row {
1943 Ok((raw,)) => emit_max_locks_decision(&raw),
1944 Err(e) => {
1945 tracing::debug!("failed to probe max_locks_per_transaction: {e}");
1946 }
1947 }
1948}
1949
1950fn max_locks_warn_value(raw: &str) -> Option<i64> {
1956 match raw.parse::<i64>() {
1957 Ok(v) if v < MIN_MAX_LOCKS_PER_TRANSACTION => Some(v),
1958 Ok(_) => None,
1959 Err(e) => {
1960 tracing::debug!(raw, "failed to parse max_locks_per_transaction: {e}");
1961 None
1962 }
1963 }
1964}
1965
1966fn emit_max_locks_decision(raw: &str) {
1967 if let Some(v) = max_locks_warn_value(raw) {
1968 tracing::warn!(
1969 current = v,
1970 recommended = MIN_MAX_LOCKS_PER_TRANSACTION,
1971 "postgres max_locks_per_transaction={v} is below the recommended \
1972 minimum ({MIN_MAX_LOCKS_PER_TRANSACTION}); partition-heavy workloads \
1973 may hit 'out of shared memory' under concurrent load. \
1974 See docs/operator-guide-postgres.md."
1975 );
1976 }
1977}
1978
#[cfg(test)]
mod max_locks_tests {
    use super::{max_locks_warn_value, MIN_MAX_LOCKS_PER_TRANSACTION};

    /// Settings strictly below the recommended minimum are returned so the caller warns.
    #[test]
    fn warns_when_below_threshold() {
        let just_below = MIN_MAX_LOCKS_PER_TRANSACTION - 1;
        for (raw, expected) in [("64".to_owned(), 64), (just_below.to_string(), just_below)] {
            assert_eq!(max_locks_warn_value(&raw), Some(expected));
        }
    }

    /// The minimum itself and anything above it produce no warning.
    #[test]
    fn silent_at_or_above_threshold() {
        for raw in [MIN_MAX_LOCKS_PER_TRANSACTION.to_string(), "1024".to_owned()] {
            assert_eq!(max_locks_warn_value(&raw), None);
        }
    }

    /// Non-integer settings are ignored rather than reported.
    #[test]
    fn silent_for_unparseable_raw() {
        let outcome = max_locks_warn_value("not-a-number");
        assert_eq!(outcome, None);
    }
}
2006
#[cfg(test)]
mod partition_index_tests {
    use super::partition_index_to_i16;
    use ff_core::engine_error::{EngineError, ValidationKind};
    use ff_core::partition::{Partition, PartitionFamily};

    /// Indices from 0 up to and including `i16::MAX` convert unchanged.
    #[test]
    fn accepts_values_within_i16_range() {
        let cases = [
            (PartitionFamily::Flow, 0u16, 0i16),
            (PartitionFamily::Flow, 255, 255),
            (PartitionFamily::Budget, i16::MAX as u16, i16::MAX),
        ];
        for (family, index, expected) in cases {
            let converted = partition_index_to_i16(Partition { family, index }).unwrap();
            assert_eq!(converted, expected);
        }
    }

    /// The first index past `i16::MAX` is rejected with an `InvalidInput` validation error.
    #[test]
    fn rejects_overflow_above_i16_max() {
        let first_out_of_range = (i16::MAX as u16) + 1;
        let partition = Partition { family: PartitionFamily::Flow, index: first_out_of_range };
        match partition_index_to_i16(partition).unwrap_err() {
            EngineError::Validation { kind, detail } => {
                assert_eq!(kind, ValidationKind::InvalidInput);
                assert!(detail.contains("exceeds i16 range"), "unexpected detail: {detail}");
            }
            other => panic!("expected Validation error, got {other:?}"),
        }
    }

    /// The largest possible `u16` index is rejected as well.
    #[test]
    fn rejects_u16_max() {
        let partition = Partition { family: PartitionFamily::Quota, index: u16::MAX };
        assert!(matches!(
            partition_index_to_i16(partition),
            Err(EngineError::Validation { kind: ValidationKind::InvalidInput, .. })
        ));
    }
}
2046}