#![allow(clippy::result_large_err)]
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use ff_core::backend::{
AppendFrameOutcome, BackendConfig, CancelFlowPolicy, CancelFlowWait, CapabilitySet,
ClaimPolicy, FailOutcome, FailureClass, FailureReason, Frame, Handle, LeaseRenewal,
PendingWaitpoint, ReclaimToken, ResumeSignal, SummaryDocument, TailVisibility,
UsageDimensions,
};
#[cfg(feature = "core")]
use ff_core::contracts::{
AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
ApplyDependencyToChildResult, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs, CreateFlowResult,
DeliverSignalArgs, DeliverSignalResult, EdgeDependencyPolicy, EdgeDirection, EdgeSnapshot,
ListExecutionsPage, ListFlowsPage, ListLanesPage, ListPendingWaitpointsArgs,
ListPendingWaitpointsResult, ListSuspendedPage, SetEdgeGroupPolicyResult,
StageDependencyEdgeArgs, StageDependencyEdgeResult,
};
#[cfg(feature = "core")]
use ff_core::state::PublicState;
use ff_core::contracts::{
CancelFlowResult, ExecutionSnapshot, FlowSnapshot, ReportUsageResult,
RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretAllResult, SeedOutcome,
SeedWaitpointHmacSecretArgs, SuspendArgs, SuspendOutcome,
};
#[cfg(feature = "core")]
use ff_core::contracts::ExecutionInfo;
#[cfg(feature = "core")]
use ff_core::contracts::{
CancelExecutionArgs, CancelExecutionResult, RevokeLeaseArgs, RevokeLeaseResult,
};
#[cfg(feature = "core")]
use ff_core::contracts::{
CancelFlowArgs, CancelFlowHeader, ChangePriorityArgs, ChangePriorityResult,
ReplayExecutionArgs, ReplayExecutionResult,
};
#[cfg(feature = "core")]
use ff_core::contracts::{
BudgetStatus, CreateBudgetArgs, CreateBudgetResult, CreateQuotaPolicyArgs,
CreateQuotaPolicyResult, ReportUsageAdminArgs, ResetBudgetArgs, ResetBudgetResult,
};
#[cfg(feature = "streaming")]
use ff_core::contracts::{StreamCursor, StreamFrames};
use ff_core::engine_backend::EngineBackend;
use ff_core::engine_error::EngineError;
#[cfg(feature = "core")]
use ff_core::partition::PartitionKey;
use ff_core::partition::PartitionConfig;
#[cfg(feature = "streaming")]
use ff_core::types::AttemptIndex;
#[cfg(feature = "core")]
use ff_core::types::EdgeId;
use ff_core::types::{BudgetId, ExecutionId, FlowId, LaneId, LeaseFence, TimestampMs};
pub use sqlx::PgPool;
#[cfg(feature = "core")]
mod admin;
pub mod attempt;
pub mod budget;
pub mod completion;
#[cfg(feature = "core")]
pub mod dispatch;
pub mod error;
pub mod exec_core;
pub mod flow;
#[cfg(feature = "core")]
pub mod flow_staging;
pub mod handle_codec;
mod lease_event;
mod lease_event_subscribe;
pub mod listener;
pub mod migrate;
#[cfg(feature = "core")]
pub mod operator;
#[cfg(feature = "core")]
mod operator_event;
pub mod pool;
#[cfg(feature = "core")]
pub mod reconcilers;
#[cfg(feature = "core")]
pub mod scanner_supervisor;
#[cfg(feature = "core")]
pub mod scheduler;
pub mod signal;
mod signal_delivery_subscribe;
mod signal_event;
#[cfg(feature = "streaming")]
pub mod stream;
pub mod suspend;
pub mod suspend_ops;
pub mod version;
pub use completion::{PostgresCompletionStream, COMPLETION_CHANNEL};
pub use error::{map_sqlx_error, PostgresTransportError};
pub use listener::StreamNotifier;
pub use migrate::{apply_migrations, MigrationError};
#[cfg(feature = "core")]
pub use scanner_supervisor::{PostgresScannerConfig, PostgresScannerHandle};
pub use version::check_schema_version;
pub use ff_core::backend::PostgresConnection;
/// Build the static capability matrix advertised by the Postgres backend.
///
/// Starts from `Supports::none()` and switches on every operation this
/// backend implements. `subscribe_instance_tags` is the one flag that is
/// explicitly left disabled.
fn postgres_supports_base() -> ff_core::capability::Supports {
    let mut supports = ff_core::capability::Supports::none();
    // Flow / execution cancellation surface.
    supports.cancel_flow_wait_timeout = true;
    supports.cancel_flow_wait_indefinite = true;
    supports.cancel_flow_header = true;
    supports.ack_cancel_member = true;
    supports.cancel_execution = true;
    // Waitpoint secret management.
    supports.rotate_waitpoint_hmac_secret_all = true;
    supports.seed_waitpoint_hmac_secret = true;
    supports.list_pending_waitpoints = true;
    // Scheduling, leases, and operator actions.
    supports.claim_for_worker = true;
    supports.revoke_lease = true;
    supports.change_priority = true;
    supports.replay_execution = true;
    // Event subscriptions (instance tags intentionally unsupported).
    supports.subscribe_lease_history = true;
    supports.subscribe_completion = true;
    supports.subscribe_signal_delivery = true;
    supports.subscribe_instance_tags = false;
    // Frame streaming.
    supports.stream_durable_summary = true;
    supports.stream_best_effort_live = true;
    // Lifecycle and read paths.
    supports.prepare = true;
    supports.read_execution_state = true;
    supports.read_execution_info = true;
    supports.get_execution_result = true;
    // Budget / quota administration.
    supports.budget_admin = true;
    supports.quota_admin = true;
    supports
}
/// Postgres-backed implementation of the engine backend.
///
/// Cheap to share: `PgPool` is internally reference counted and the
/// optional collaborators are `Arc`-wrapped. Fields carry
/// `#[allow(dead_code)]` because some feature combinations compile the
/// crate without the code paths that read them.
pub struct PostgresBackend {
    /// Connection pool used for every query issued by this backend.
    #[allow(dead_code)] pool: PgPool,
    /// Partition-routing configuration threaded into most submodule calls.
    #[allow(dead_code)]
    partition_config: PartitionConfig,
    /// Optional metrics sink; `None` unless built via
    /// `connect_with_metrics`.
    #[allow(dead_code)]
    metrics: Option<Arc<ff_observability::Metrics>>,
    /// LISTEN/NOTIFY bridge required by `tail_stream`; populated by all
    /// constructors in this file.
    #[allow(dead_code)]
    stream_notifier: Option<Arc<StreamNotifier>>,
    /// Background scanner supervisor; installed lazily via
    /// `with_scanners` (core feature only).
    #[cfg(feature = "core")]
    scanner_handle: Option<Arc<scanner_supervisor::PostgresScannerHandle>>,
}
impl PostgresBackend {
    /// Build a pool from `config`, probe `max_locks_per_transaction`
    /// (warning only), start the stream notifier, and return the backend
    /// as a trait object. No metrics, no scanners.
    pub async fn connect(config: BackendConfig) -> Result<Arc<dyn EngineBackend>, EngineError> {
        let pool = pool::build_pool(&config).await?;
        warn_if_max_locks_low(&pool).await;
        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
        let backend = Self {
            pool,
            partition_config: PartitionConfig::default(),
            metrics: None,
            stream_notifier,
            #[cfg(feature = "core")]
            scanner_handle: None,
        };
        Ok(Arc::new(backend))
    }
    /// Wrap an existing pool (e.g. shared with the application or tests).
    /// Skips the `max_locks_per_transaction` probe; still spawns the
    /// stream notifier.
    pub fn from_pool(pool: PgPool, partition_config: PartitionConfig) -> Arc<Self> {
        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
        Arc::new(Self {
            pool,
            partition_config,
            metrics: None,
            stream_notifier,
            #[cfg(feature = "core")]
            scanner_handle: None,
        })
    }
    /// Like [`Self::connect`] but with an explicit partition config and a
    /// metrics sink, and returning the concrete type (so callers can still
    /// invoke `with_scanners`).
    pub async fn connect_with_metrics(
        config: BackendConfig,
        partition_config: PartitionConfig,
        metrics: Arc<ff_observability::Metrics>,
    ) -> Result<Arc<Self>, EngineError> {
        let pool = pool::build_pool(&config).await?;
        warn_if_max_locks_low(&pool).await;
        let stream_notifier = Some(StreamNotifier::spawn(pool.clone()));
        Ok(Arc::new(Self {
            pool,
            partition_config,
            metrics: Some(metrics),
            stream_notifier,
            #[cfg(feature = "core")]
            scanner_handle: None,
        }))
    }
    /// Spawn the background scanners and attach their handle.
    ///
    /// Returns `false` (without spawning anything) when the `Arc` is
    /// already shared — `Arc::get_mut` requires unique ownership, so this
    /// must be called before the backend is handed out.
    #[cfg(feature = "core")]
    pub fn with_scanners(
        self: &mut Arc<Self>,
        cfg: scanner_supervisor::PostgresScannerConfig,
    ) -> bool {
        let Some(inner) = Arc::get_mut(self) else {
            return false;
        };
        let handle = scanner_supervisor::spawn_scanners(inner.pool.clone(), cfg);
        inner.scanner_handle = Some(Arc::new(handle));
        true
    }
    /// Borrow the underlying connection pool.
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }
    // The inherent methods below mirror their trait counterparts but take
    // args by reference where the trait takes them by value; both funnel
    // into the same submodule implementations.
    /// Create a standalone execution; returns its id.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_execution", skip_all)]
    pub async fn create_execution(
        &self,
        args: CreateExecutionArgs,
    ) -> Result<ExecutionId, EngineError> {
        exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await
    }
    /// Create a flow header (staging phase).
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_flow.inherent", skip_all)]
    pub async fn create_flow(
        &self,
        args: &CreateFlowArgs,
    ) -> Result<CreateFlowResult, EngineError> {
        flow_staging::create_flow(&self.pool, &self.partition_config, args).await
    }
    /// Attach an execution to a staged flow.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.add_execution_to_flow.inherent", skip_all)]
    pub async fn add_execution_to_flow(
        &self,
        args: &AddExecutionToFlowArgs,
    ) -> Result<AddExecutionToFlowResult, EngineError> {
        flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, args).await
    }
    /// Stage a dependency edge between two flow members.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.stage_dependency_edge.inherent", skip_all)]
    pub async fn stage_dependency_edge(
        &self,
        args: &StageDependencyEdgeArgs,
    ) -> Result<StageDependencyEdgeResult, EngineError> {
        flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, args).await
    }
    /// Apply a staged dependency to the downstream (child) execution.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.apply_dependency_to_child.inherent", skip_all)]
    pub async fn apply_dependency_to_child(
        &self,
        args: &ApplyDependencyToChildArgs,
    ) -> Result<ApplyDependencyToChildResult, EngineError> {
        flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, args).await
    }
}
#[inline]
fn unavailable<T>(op: &'static str) -> Result<T, EngineError> {
Err(EngineError::Unavailable { op })
}
// `EngineBackend` surface. Each operation delegates to the submodule that
// owns the corresponding SQL (attempt, flow, flow_staging, suspend_ops,
// budget, operator, stream, ...). `cfg(feature = "core")` gates the
// operator/admin methods; `cfg(feature = "streaming")` gates frame
// streaming.
#[async_trait]
impl EngineBackend for PostgresBackend {
    /// Claim the next runnable attempt on `lane` matching `capabilities`.
    #[tracing::instrument(name = "pg.claim", skip_all)]
    async fn claim(
        &self,
        lane: &LaneId,
        capabilities: &CapabilitySet,
        policy: ClaimPolicy,
    ) -> Result<Option<Handle>, EngineError> {
        attempt::claim(&self.pool, lane, capabilities, &policy).await
    }
    /// Renew the lease behind `handle`.
    #[tracing::instrument(name = "pg.renew", skip_all)]
    async fn renew(&self, handle: &Handle) -> Result<LeaseRenewal, EngineError> {
        attempt::renew(&self.pool, handle).await
    }
    /// Record a progress update for the running attempt.
    #[tracing::instrument(name = "pg.progress", skip_all)]
    async fn progress(
        &self,
        handle: &Handle,
        percent: Option<u8>,
        message: Option<String>,
    ) -> Result<(), EngineError> {
        attempt::progress(&self.pool, handle, percent, message).await
    }
    /// Append a stream frame for the attempt. Without the `streaming`
    /// feature this backend reports the operation as unavailable.
    #[tracing::instrument(name = "pg.append_frame", skip_all)]
    async fn append_frame(
        &self,
        handle: &Handle,
        frame: Frame,
    ) -> Result<AppendFrameOutcome, EngineError> {
        #[cfg(feature = "streaming")]
        {
            stream::append_frame(&self.pool, &self.partition_config, handle, frame).await
        }
        #[cfg(not(feature = "streaming"))]
        {
            // Silence unused-argument warnings in the non-streaming build.
            let _ = (handle, frame);
            unavailable("pg.append_frame")
        }
    }
    /// Complete the attempt, optionally storing a result payload.
    #[tracing::instrument(name = "pg.complete", skip_all)]
    async fn complete(
        &self,
        handle: &Handle,
        payload: Option<Vec<u8>>,
    ) -> Result<(), EngineError> {
        attempt::complete(&self.pool, handle, payload).await
    }
    /// Fail the attempt with a reason and classification; the outcome says
    /// what happens next (e.g. retry vs terminal).
    #[tracing::instrument(name = "pg.fail", skip_all)]
    async fn fail(
        &self,
        handle: &Handle,
        reason: FailureReason,
        classification: FailureClass,
    ) -> Result<FailOutcome, EngineError> {
        attempt::fail(&self.pool, handle, reason, classification).await
    }
    /// Cancel the execution addressed by `handle`. The handle is decoded
    /// locally to recover the execution id before delegating.
    #[tracing::instrument(name = "pg.cancel", skip_all)]
    async fn cancel(&self, handle: &Handle, reason: &str) -> Result<(), EngineError> {
        let payload = handle_codec::decode_handle(handle)?;
        exec_core::cancel_impl(
            &self.pool,
            &self.partition_config,
            &payload.execution_id,
            reason,
        )
        .await
    }
    /// Suspend the attempt addressed by `handle`.
    #[tracing::instrument(name = "pg.suspend", skip_all)]
    async fn suspend(
        &self,
        handle: &Handle,
        args: SuspendArgs,
    ) -> Result<SuspendOutcome, EngineError> {
        suspend_ops::suspend_impl(&self.pool, &self.partition_config, handle, args).await
    }
    /// Suspend addressed by (execution id, lease fence) instead of a
    /// handle.
    #[tracing::instrument(name = "pg.suspend_by_triple", skip_all)]
    async fn suspend_by_triple(
        &self,
        exec_id: ExecutionId,
        triple: LeaseFence,
        args: SuspendArgs,
    ) -> Result<SuspendOutcome, EngineError> {
        suspend_ops::suspend_by_triple_impl(
            &self.pool,
            &self.partition_config,
            exec_id,
            triple,
            args,
        )
        .await
    }
    /// Not implemented by the Postgres backend; always reports
    /// unavailable.
    #[tracing::instrument(name = "pg.create_waitpoint", skip_all)]
    async fn create_waitpoint(
        &self,
        _handle: &Handle,
        _waitpoint_key: &str,
        _expires_in: Duration,
    ) -> Result<PendingWaitpoint, EngineError> {
        unavailable("pg.create_waitpoint")
    }
    /// Fetch the resume signals currently visible to this attempt.
    #[tracing::instrument(name = "pg.observe_signals", skip_all)]
    async fn observe_signals(
        &self,
        handle: &Handle,
    ) -> Result<Vec<ResumeSignal>, EngineError> {
        suspend_ops::observe_signals_impl(&self.pool, handle).await
    }
    /// Re-claim an attempt using a reclaim token issued earlier.
    #[tracing::instrument(name = "pg.claim_from_reclaim", skip_all)]
    async fn claim_from_reclaim(
        &self,
        token: ReclaimToken,
    ) -> Result<Option<Handle>, EngineError> {
        attempt::claim_from_reclaim(&self.pool, token).await
    }
    /// Park the attempt until `delay_until`.
    #[tracing::instrument(name = "pg.delay", skip_all)]
    async fn delay(
        &self,
        handle: &Handle,
        delay_until: TimestampMs,
    ) -> Result<(), EngineError> {
        attempt::delay(&self.pool, handle, delay_until).await
    }
    /// Park the attempt until its child executions finish.
    #[tracing::instrument(name = "pg.wait_children", skip_all)]
    async fn wait_children(&self, handle: &Handle) -> Result<(), EngineError> {
        attempt::wait_children(&self.pool, handle).await
    }
    /// Read a snapshot of one execution; `None` when unknown.
    #[tracing::instrument(name = "pg.describe_execution", skip_all)]
    async fn describe_execution(
        &self,
        id: &ExecutionId,
    ) -> Result<Option<ExecutionSnapshot>, EngineError> {
        exec_core::describe_execution_impl(&self.pool, &self.partition_config, id).await
    }
    /// Read a snapshot of one flow; `None` when unknown.
    #[tracing::instrument(name = "pg.describe_flow", skip_all)]
    async fn describe_flow(
        &self,
        id: &FlowId,
    ) -> Result<Option<FlowSnapshot>, EngineError> {
        flow::describe_flow(&self.pool, &self.partition_config, id).await
    }
    /// List dependency edges of a flow in the given direction.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_edges", skip_all)]
    async fn list_edges(
        &self,
        flow_id: &FlowId,
        direction: EdgeDirection,
    ) -> Result<Vec<EdgeSnapshot>, EngineError> {
        flow::list_edges(&self.pool, &self.partition_config, flow_id, direction).await
    }
    /// Read one dependency edge of a flow; `None` when unknown.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.describe_edge", skip_all)]
    async fn describe_edge(
        &self,
        flow_id: &FlowId,
        edge_id: &EdgeId,
    ) -> Result<Option<EdgeSnapshot>, EngineError> {
        flow::describe_edge(&self.pool, &self.partition_config, flow_id, edge_id).await
    }
    /// Map an execution id to its owning flow, if any.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.resolve_execution_flow_id", skip_all)]
    async fn resolve_execution_flow_id(
        &self,
        eid: &ExecutionId,
    ) -> Result<Option<FlowId>, EngineError> {
        exec_core::resolve_execution_flow_id_impl(&self.pool, &self.partition_config, eid).await
    }
    /// Page through flows in one partition (cursor = last flow id seen).
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_flows", skip_all)]
    async fn list_flows(
        &self,
        partition: PartitionKey,
        cursor: Option<FlowId>,
        limit: usize,
    ) -> Result<ListFlowsPage, EngineError> {
        flow::list_flows(&self.pool, partition, cursor, limit).await
    }
    /// Page through known lanes.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_lanes", skip_all)]
    async fn list_lanes(
        &self,
        cursor: Option<LaneId>,
        limit: usize,
    ) -> Result<ListLanesPage, EngineError> {
        admin::list_lanes_impl(&self.pool, cursor, limit).await
    }
    /// Page through suspended executions in one partition.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_suspended", skip_all)]
    async fn list_suspended(
        &self,
        partition: PartitionKey,
        cursor: Option<ExecutionId>,
        limit: usize,
    ) -> Result<ListSuspendedPage, EngineError> {
        admin::list_suspended_impl(&self.pool, partition, cursor, limit).await
    }
    /// Page through executions in one partition.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_executions", skip_all)]
    async fn list_executions(
        &self,
        partition: PartitionKey,
        cursor: Option<ExecutionId>,
        limit: usize,
    ) -> Result<ListExecutionsPage, EngineError> {
        exec_core::list_executions_impl(
            &self.pool,
            &self.partition_config,
            partition,
            cursor,
            limit,
        )
        .await
    }
    /// Deliver a signal to a suspended execution's waitpoint.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.deliver_signal", skip_all)]
    async fn deliver_signal(
        &self,
        args: DeliverSignalArgs,
    ) -> Result<DeliverSignalResult, EngineError> {
        suspend_ops::deliver_signal_impl(&self.pool, &self.partition_config, args).await
    }
    /// Claim an execution that has been resumed after suspension.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.claim_resumed_execution", skip_all)]
    async fn claim_resumed_execution(
        &self,
        args: ClaimResumedExecutionArgs,
    ) -> Result<ClaimResumedExecutionResult, EngineError> {
        suspend_ops::claim_resumed_execution_impl(&self.pool, &self.partition_config, args).await
    }
    /// Read the public state of an execution; `None` when unknown.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.read_execution_state", skip_all)]
    async fn read_execution_state(
        &self,
        id: &ExecutionId,
    ) -> Result<Option<PublicState>, EngineError> {
        exec_core::read_execution_state_impl(&self.pool, &self.partition_config, id).await
    }
    /// Read extended execution info; `None` when unknown.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.read_execution_info", skip_all)]
    async fn read_execution_info(
        &self,
        id: &ExecutionId,
    ) -> Result<Option<ExecutionInfo>, EngineError> {
        exec_core::read_execution_info_impl(&self.pool, &self.partition_config, id).await
    }
    /// Fetch the stored result payload of a finished execution, if any.
    #[tracing::instrument(name = "pg.get_execution_result", skip_all)]
    async fn get_execution_result(
        &self,
        id: &ExecutionId,
    ) -> Result<Option<Vec<u8>>, EngineError> {
        exec_core::get_execution_result_impl(&self.pool, &self.partition_config, id).await
    }
    /// List waitpoints still awaiting signals.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.list_pending_waitpoints", skip_all)]
    async fn list_pending_waitpoints(
        &self,
        args: ListPendingWaitpointsArgs,
    ) -> Result<ListPendingWaitpointsResult, EngineError> {
        suspend_ops::list_pending_waitpoints_impl(&self.pool, args).await
    }
    /// Operator action: cancel an execution.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.cancel_execution", skip_all)]
    async fn cancel_execution(
        &self,
        args: CancelExecutionArgs,
    ) -> Result<CancelExecutionResult, EngineError> {
        operator::cancel_execution_impl(&self.pool, args).await
    }
    /// Operator action: revoke a lease.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.revoke_lease", skip_all)]
    async fn revoke_lease(
        &self,
        args: RevokeLeaseArgs,
    ) -> Result<RevokeLeaseResult, EngineError> {
        operator::revoke_lease_impl(&self.pool, args).await
    }
    /// Operator action: change an execution's priority.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.change_priority", skip_all)]
    async fn change_priority(
        &self,
        args: ChangePriorityArgs,
    ) -> Result<ChangePriorityResult, EngineError> {
        operator::change_priority_impl(&self.pool, args).await
    }
    /// Operator action: replay an execution.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.replay_execution", skip_all)]
    async fn replay_execution(
        &self,
        args: ReplayExecutionArgs,
    ) -> Result<ReplayExecutionResult, EngineError> {
        operator::replay_execution_impl(&self.pool, args).await
    }
    /// Mark a flow's header as cancelled (members are acked separately via
    /// `ack_cancel_member`).
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.cancel_flow_header", skip_all)]
    async fn cancel_flow_header(
        &self,
        args: CancelFlowArgs,
    ) -> Result<CancelFlowHeader, EngineError> {
        operator::cancel_flow_header_impl(&self.pool, &self.partition_config, args).await
    }
    /// Acknowledge cancellation of one member execution of a flow.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.ack_cancel_member", skip_all)]
    async fn ack_cancel_member(
        &self,
        flow_id: &FlowId,
        execution_id: &ExecutionId,
    ) -> Result<(), EngineError> {
        // The impl takes owned ids; clone the borrowed trait arguments.
        operator::ack_cancel_member_impl(
            &self.pool,
            &self.partition_config,
            flow_id.clone(),
            execution_id.clone(),
        )
        .await
    }
    /// Trait-level create: wraps the inherent impl and reports the new
    /// execution as created in the `Waiting` state.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_execution.trait", skip_all)]
    async fn create_execution(
        &self,
        args: CreateExecutionArgs,
    ) -> Result<CreateExecutionResult, EngineError> {
        // Keep a copy of the id before `args` is moved into the impl.
        let eid = args.execution_id.clone();
        exec_core::create_execution_impl(&self.pool, &self.partition_config, args).await?;
        Ok(CreateExecutionResult::Created {
            execution_id: eid,
            public_state: PublicState::Waiting,
        })
    }
    /// Trait-level flow creation (by-value args; same impl as the
    /// inherent method).
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_flow", skip_all)]
    async fn create_flow(
        &self,
        args: CreateFlowArgs,
    ) -> Result<CreateFlowResult, EngineError> {
        flow_staging::create_flow(&self.pool, &self.partition_config, &args).await
    }
    /// Trait-level add-to-flow (by-value args).
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.add_execution_to_flow", skip_all)]
    async fn add_execution_to_flow(
        &self,
        args: AddExecutionToFlowArgs,
    ) -> Result<AddExecutionToFlowResult, EngineError> {
        flow_staging::add_execution_to_flow(&self.pool, &self.partition_config, &args).await
    }
    /// Trait-level edge staging (by-value args).
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.stage_dependency_edge", skip_all)]
    async fn stage_dependency_edge(
        &self,
        args: StageDependencyEdgeArgs,
    ) -> Result<StageDependencyEdgeResult, EngineError> {
        flow_staging::stage_dependency_edge(&self.pool, &self.partition_config, &args).await
    }
    /// Trait-level dependency application (by-value args).
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.apply_dependency_to_child", skip_all)]
    async fn apply_dependency_to_child(
        &self,
        args: ApplyDependencyToChildArgs,
    ) -> Result<ApplyDependencyToChildResult, EngineError> {
        flow_staging::apply_dependency_to_child(&self.pool, &self.partition_config, &args).await
    }
    /// Stable label used for metrics/diagnostics.
    fn backend_label(&self) -> &'static str {
        "postgres"
    }
    /// Advertise identity and the static capability matrix.
    // NOTE(review): version 0.11.0 / stage "E-shipped" are hard-coded here;
    // confirm they are kept in sync with the crate version on release.
    fn capabilities(&self) -> ff_core::capability::Capabilities {
        ff_core::capability::Capabilities::new(
            ff_core::capability::BackendIdentity::new(
                "postgres",
                ff_core::capability::Version::new(0, 11, 0),
                "E-shipped",
            ),
            postgres_supports_base(),
        )
    }
    /// No preparation step is needed for this backend.
    async fn prepare(
        &self,
    ) -> Result<ff_core::backend::PrepareOutcome, EngineError> {
        Ok(ff_core::backend::PrepareOutcome::NoOp)
    }
    /// Claim work on behalf of a named worker via the scheduler; returns a
    /// grant or a no-work outcome.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.claim_for_worker", skip_all)]
    async fn claim_for_worker(
        &self,
        args: ff_core::contracts::ClaimForWorkerArgs,
    ) -> Result<ff_core::contracts::ClaimForWorkerOutcome, EngineError> {
        // Scheduler is constructed per call; the pool clone is a cheap
        // refcount bump.
        let sched = scheduler::PostgresScheduler::new(self.pool.clone());
        let grant_opt = sched
            .claim_for_worker(
                &args.lane_id,
                &args.worker_id,
                &args.worker_instance_id,
                &args.worker_capabilities,
                args.grant_ttl_ms,
            )
            .await?;
        Ok(match grant_opt {
            Some(g) => ff_core::contracts::ClaimForWorkerOutcome::granted(g),
            None => ff_core::contracts::ClaimForWorkerOutcome::no_work(),
        })
    }
    /// Liveness probe: round-trip a trivial query through the pool.
    async fn ping(&self) -> Result<(), EngineError> {
        let _ = sqlx::query_scalar::<_, i32>("SELECT 1")
            .fetch_one(&self.pool)
            .await
            .map_err(error::map_sqlx_error)?;
        Ok(())
    }
    /// Stop background scanners (if any), allowing them `grace` time to
    /// finish; tasks that exceed the grace period are logged, not killed
    /// here.
    async fn shutdown_prepare(&self, grace: Duration) -> Result<(), EngineError> {
        #[cfg(feature = "core")]
        if let Some(handle) = self.scanner_handle.as_ref() {
            // `shutdown` returns how many scanner tasks missed the deadline.
            let timed_out = handle.shutdown(grace).await;
            if timed_out > 0 {
                tracing::warn!(
                    timed_out,
                    ?grace,
                    "postgres scanner supervisor exceeded grace on shutdown"
                );
            }
        }
        Ok(())
    }
    /// Cancel a flow, then (depending on `wait`) optionally block until the
    /// cancellation has fully propagated.
    #[tracing::instrument(name = "pg.cancel_flow", skip_all)]
    async fn cancel_flow(
        &self,
        id: &FlowId,
        policy: CancelFlowPolicy,
        wait: CancelFlowWait,
    ) -> Result<CancelFlowResult, EngineError> {
        let result = flow::cancel_flow(&self.pool, &self.partition_config, id, policy).await?;
        // A `None` deadline means "don't wait"; otherwise poll until done
        // or the deadline passes (shared helper handles both).
        if let Some(deadline) = ff_core::engine_backend::cancel_flow_wait_deadline(wait) {
            ff_core::engine_backend::wait_for_flow_cancellation(self, id, deadline).await?;
        }
        Ok(result)
    }
    /// Set the dependency-group policy on a downstream flow member.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.set_edge_group_policy", skip_all)]
    async fn set_edge_group_policy(
        &self,
        flow_id: &FlowId,
        downstream_execution_id: &ExecutionId,
        policy: EdgeDependencyPolicy,
    ) -> Result<SetEdgeGroupPolicyResult, EngineError> {
        flow::set_edge_group_policy(
            &self.pool,
            &self.partition_config,
            flow_id,
            downstream_execution_id,
            policy,
        )
        .await
    }
    /// Record usage against a budget. The handle is unused here — usage is
    /// attributed purely by budget id.
    #[tracing::instrument(name = "pg.report_usage", skip_all)]
    async fn report_usage(
        &self,
        _handle: &Handle,
        budget: &BudgetId,
        dimensions: UsageDimensions,
    ) -> Result<ReportUsageResult, EngineError> {
        budget::report_usage_impl(&self.pool, &self.partition_config, budget, dimensions).await
    }
    /// Admin: create a budget.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_budget", skip_all)]
    async fn create_budget(
        &self,
        args: CreateBudgetArgs,
    ) -> Result<CreateBudgetResult, EngineError> {
        budget::create_budget_impl(&self.pool, &self.partition_config, args).await
    }
    /// Admin: reset a budget.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.reset_budget", skip_all)]
    async fn reset_budget(
        &self,
        args: ResetBudgetArgs,
    ) -> Result<ResetBudgetResult, EngineError> {
        budget::reset_budget_impl(&self.pool, &self.partition_config, args).await
    }
    /// Admin: create a quota policy.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.create_quota_policy", skip_all)]
    async fn create_quota_policy(
        &self,
        args: CreateQuotaPolicyArgs,
    ) -> Result<CreateQuotaPolicyResult, EngineError> {
        budget::create_quota_policy_impl(&self.pool, &self.partition_config, args).await
    }
    /// Admin: read a budget's current status.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.get_budget_status", skip_all)]
    async fn get_budget_status(
        &self,
        id: &BudgetId,
    ) -> Result<BudgetStatus, EngineError> {
        budget::get_budget_status_impl(&self.pool, &self.partition_config, id).await
    }
    /// Admin: record usage against a budget without an attempt handle.
    #[cfg(feature = "core")]
    #[tracing::instrument(name = "pg.report_usage_admin", skip_all)]
    async fn report_usage_admin(
        &self,
        budget_id: &BudgetId,
        args: ReportUsageAdminArgs,
    ) -> Result<ReportUsageResult, EngineError> {
        budget::report_usage_admin_impl(&self.pool, &self.partition_config, budget_id, args).await
    }
    /// Rotate the HMAC secret for all waitpoints, timestamped with wall
    /// clock millis.
    #[tracing::instrument(name = "pg.rotate_waitpoint_hmac_secret_all", skip_all)]
    async fn rotate_waitpoint_hmac_secret_all(
        &self,
        args: RotateWaitpointHmacSecretAllArgs,
    ) -> Result<RotateWaitpointHmacSecretAllResult, EngineError> {
        // Wall-clock epoch millis; a pre-epoch system clock degrades to 0
        // rather than erroring.
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as i64)
            .unwrap_or(0);
        signal::rotate_waitpoint_hmac_secret_all_impl(&self.pool, args, now_ms).await
    }
    /// Seed the initial waitpoint HMAC secret, timestamped with wall clock
    /// millis.
    #[tracing::instrument(name = "pg.seed_waitpoint_hmac_secret", skip_all)]
    async fn seed_waitpoint_hmac_secret(
        &self,
        args: SeedWaitpointHmacSecretArgs,
    ) -> Result<SeedOutcome, EngineError> {
        // Same clock handling as rotate above.
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as i64)
            .unwrap_or(0);
        signal::seed_waitpoint_hmac_secret_impl(&self.pool, args, now_ms).await
    }
    /// Read stored frames in the cursor range [from, to), capped at
    /// `count_limit` frames.
    #[cfg(feature = "streaming")]
    #[tracing::instrument(name = "pg.read_stream", skip_all)]
    async fn read_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        from: StreamCursor,
        to: StreamCursor,
        count_limit: u64,
    ) -> Result<StreamFrames, EngineError> {
        stream::read_stream(&self.pool, execution_id, attempt_index, from, to, count_limit).await
    }
    /// Tail a live stream after `after`, blocking up to `block_ms` for new
    /// frames. Requires the LISTEN/NOTIFY notifier spawned at construction.
    #[cfg(feature = "streaming")]
    #[tracing::instrument(name = "pg.tail_stream", skip_all)]
    async fn tail_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        after: StreamCursor,
        block_ms: u64,
        count_limit: u64,
        visibility: TailVisibility,
    ) -> Result<StreamFrames, EngineError> {
        // All constructors in this file populate the notifier, so this
        // error path guards against future construction paths that don't.
        let notifier = self
            .stream_notifier
            .as_ref()
            .ok_or(EngineError::Unavailable {
                op: "pg.tail_stream (notifier not initialised)",
            })?;
        stream::tail_stream(
            &self.pool,
            notifier,
            execution_id,
            attempt_index,
            after,
            block_ms,
            count_limit,
            visibility,
        )
        .await
    }
    /// Read the durable summary document for an attempt, if one exists.
    #[cfg(feature = "streaming")]
    #[tracing::instrument(name = "pg.read_summary", skip_all)]
    async fn read_summary(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
    ) -> Result<Option<SummaryDocument>, EngineError> {
        stream::read_summary(&self.pool, execution_id, attempt_index).await
    }
    /// Subscribe to completion events, adapting the internal completion
    /// stream payloads into `CompletionEvent`s.
    // NOTE(review): the `_cursor` argument is ignored — the underlying
    // completion stream does not support resuming from a cursor here, and
    // emitted events carry a fixed cursor of 0. Confirm downstream
    // consumers do not rely on these cursors for resumption.
    #[tracing::instrument(name = "pg.subscribe_completion", skip_all)]
    async fn subscribe_completion(
        &self,
        _cursor: ff_core::stream_subscribe::StreamCursor,
        filter: &ff_core::backend::ScannerFilter,
    ) -> Result<ff_core::stream_events::CompletionSubscription, EngineError> {
        use ff_core::stream_events::{CompletionEvent, CompletionOutcome};
        use ff_core::stream_subscribe::encode_postgres_event_cursor;
        use futures_core::Stream;
        use std::pin::Pin;
        use std::task::{Context, Poll};
        // A no-op filter takes the cheaper unfiltered subscription path.
        let inner = if filter.is_noop() {
            ff_core::completion_backend::CompletionBackend::subscribe_completions(self).await?
        } else {
            ff_core::completion_backend::CompletionBackend::subscribe_completions_filtered(
                self, filter,
            )
            .await?
        };
        // Adapter: maps raw completion payloads into the public event type.
        struct Adapter {
            inner: ff_core::completion_backend::CompletionStream,
        }
        impl Stream for Adapter {
            type Item = Result<CompletionEvent, EngineError>;
            fn poll_next(
                mut self: Pin<&mut Self>,
                cx: &mut Context<'_>,
            ) -> Poll<Option<Self::Item>> {
                match Pin::new(&mut self.inner).poll_next(cx) {
                    Poll::Pending => Poll::Pending,
                    Poll::Ready(None) => Poll::Ready(None),
                    Poll::Ready(Some(payload)) => {
                        // Placeholder cursor (position tracking not
                        // implemented for this stream).
                        let cursor = encode_postgres_event_cursor(0);
                        let event = CompletionEvent::new(
                            cursor,
                            payload.execution_id.clone(),
                            CompletionOutcome::from_wire(&payload.outcome),
                            payload.produced_at_ms,
                        );
                        Poll::Ready(Some(Ok(event)))
                    }
                }
            }
        }
        Ok(Box::pin(Adapter { inner }))
    }
    /// Subscribe to lease-history events from `cursor` onward.
    // NOTE(review): the literal `0` is passed as the second argument in
    // both subscribe calls below — presumably a shard/partition index;
    // confirm against the subscribe modules.
    #[tracing::instrument(name = "pg.subscribe_lease_history", skip_all)]
    async fn subscribe_lease_history(
        &self,
        cursor: ff_core::stream_subscribe::StreamCursor,
        filter: &ff_core::backend::ScannerFilter,
    ) -> Result<ff_core::stream_events::LeaseHistorySubscription, EngineError> {
        lease_event_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
    }
    /// Subscribe to signal-delivery events from `cursor` onward.
    #[tracing::instrument(name = "pg.subscribe_signal_delivery", skip_all)]
    async fn subscribe_signal_delivery(
        &self,
        cursor: ff_core::stream_subscribe::StreamCursor,
        filter: &ff_core::backend::ScannerFilter,
    ) -> Result<ff_core::stream_events::SignalDeliverySubscription, EngineError> {
        signal_delivery_subscribe::subscribe(&self.pool, 0, cursor, filter.clone()).await
    }
}
/// Recommended lower bound for Postgres `max_locks_per_transaction`;
/// settings below this are flagged at connect time because
/// partition-heavy workloads can exhaust the lock table.
const MIN_MAX_LOCKS_PER_TRANSACTION: i64 = 256;
async fn warn_if_max_locks_low(pool: &PgPool) {
let row: Result<(String,), sqlx::Error> =
sqlx::query_as("SHOW max_locks_per_transaction")
.fetch_one(pool)
.await;
match row {
Ok((raw,)) => emit_max_locks_decision(&raw),
Err(e) => {
tracing::debug!("failed to probe max_locks_per_transaction: {e}");
}
}
}
/// Parse a raw `max_locks_per_transaction` reading and return `Some(v)`
/// only when `v` is below [`MIN_MAX_LOCKS_PER_TRANSACTION`], i.e. when a
/// warning should be emitted.
///
/// Unparseable input is logged at debug level and treated as "no warning".
fn max_locks_warn_value(raw: &str) -> Option<i64> {
    match raw.parse::<i64>() {
        Ok(value) => (value < MIN_MAX_LOCKS_PER_TRANSACTION).then_some(value),
        Err(e) => {
            tracing::debug!(raw, "failed to parse max_locks_per_transaction: {e}");
            None
        }
    }
}
/// Emit the operator-facing warning when the raw
/// `max_locks_per_transaction` reading is below the recommended minimum;
/// stays silent otherwise (including for unparseable input).
fn emit_max_locks_decision(raw: &str) {
    let Some(v) = max_locks_warn_value(raw) else {
        return;
    };
    tracing::warn!(
        current = v,
        recommended = MIN_MAX_LOCKS_PER_TRANSACTION,
        "postgres max_locks_per_transaction={v} is below the recommended \
minimum ({MIN_MAX_LOCKS_PER_TRANSACTION}); partition-heavy workloads \
may hit 'out of shared memory' under concurrent load. \
See docs/operator-guide-postgres.md."
    );
}
// Unit tests for the pure decision function behind the
// max_locks_per_transaction startup warning.
#[cfg(test)]
mod max_locks_tests {
    use super::{max_locks_warn_value, MIN_MAX_LOCKS_PER_TRANSACTION};
    // Values strictly below the threshold must be reported for warning.
    #[test]
    fn warns_when_below_threshold() {
        assert_eq!(max_locks_warn_value("64"), Some(64));
        // Boundary: one below the threshold still warns.
        assert_eq!(
            max_locks_warn_value(&(MIN_MAX_LOCKS_PER_TRANSACTION - 1).to_string()),
            Some(MIN_MAX_LOCKS_PER_TRANSACTION - 1)
        );
    }
    // Exactly at the threshold (and above) must NOT warn.
    #[test]
    fn silent_at_or_above_threshold() {
        assert_eq!(
            max_locks_warn_value(&MIN_MAX_LOCKS_PER_TRANSACTION.to_string()),
            None
        );
        assert_eq!(max_locks_warn_value("1024"), None);
    }
    // Garbage input is tolerated: no warning, no panic.
    #[test]
    fn silent_for_unparseable_raw() {
        assert_eq!(max_locks_warn_value("not-a-number"), None);
    }
}