Struct PostgresOrchestrator 

pub struct PostgresOrchestrator { /* private fields */ }
PostgreSQL-backed orchestrator implementation.

Implementations§

Trait Implementations§

Source§

impl OrchestratorBlocking for PostgresOrchestrator

Source§

fn set_waiting_for<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, waiter: &'life1 InvocationId, waited_on: &'life2 InvocationId, ) -> Pin<Box<dyn Future<Output = RustvelloResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Mark that waiter is waiting for waited_on to complete.
Source§

fn get_waiters<'life0, 'life1, 'async_trait>( &'life0 self, waited_on: &'life1 InvocationId, ) -> Pin<Box<dyn Future<Output = RustvelloResult<Vec<InvocationId>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Get invocations that are waiting on the given invocation.
Source§

fn release_waiters<'life0, 'life1, 'async_trait>( &'life0 self, completed: &'life1 InvocationId, ) -> Pin<Box<dyn Future<Output = RustvelloResult<Vec<InvocationId>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Release all invocations waiting on the given completed invocation.
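The three waiter methods above can be pictured as operations on a map from a waited-on invocation to its waiters. The following is a minimal in-memory sketch, not the PostgreSQL implementation: `WaitIndex` and the local `InvocationId` newtype are hypothetical stand-ins, and it assumes that releasing waiters also clears the stored entries.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the crate's `InvocationId`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct InvocationId(pub String);

/// Hypothetical in-memory wait index: waited_on -> waiters, in insertion order.
#[derive(Default)]
pub struct WaitIndex {
    waiters_of: HashMap<InvocationId, Vec<InvocationId>>,
}

impl WaitIndex {
    /// Record that `waiter` is waiting for `waited_on`; duplicates are ignored.
    pub fn set_waiting_for(&mut self, waiter: &InvocationId, waited_on: &InvocationId) {
        let waiters = self.waiters_of.entry(waited_on.clone()).or_default();
        if !waiters.contains(waiter) {
            waiters.push(waiter.clone());
        }
    }

    /// Read-only lookup of the invocations waiting on `waited_on`.
    pub fn get_waiters(&self, waited_on: &InvocationId) -> Vec<InvocationId> {
        self.waiters_of.get(waited_on).cloned().unwrap_or_default()
    }

    /// Return the waiters of `completed` and forget them
    /// (assumption: release removes the stored entries).
    pub fn release_waiters(&mut self, completed: &InvocationId) -> Vec<InvocationId> {
        self.waiters_of.remove(completed).unwrap_or_default()
    }
}
```

In this sketch `get_waiters` is a pure read, while `release_waiters` hands back the same list once and leaves nothing behind for a second call.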
Source§

impl OrchestratorConcurrency for PostgresOrchestrator

Source§

fn check_running_concurrency<'life0, 'life1, 'life2, 'life3, 'async_trait>( &'life0 self, task_id: &'life1 TaskId, task_config: &'life2 TaskConfig, cc_args: Option<&'life3 SerializedArguments>, ) -> Pin<Box<dyn Future<Output = RustvelloResult<bool>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait,

Note: This check-then-decide pattern is inherently subject to time-of-check to time-of-use (TOCTOU) races in multi-node PostgreSQL deployments. Two concurrent callers may both read the same count and both admit a new invocation, briefly exceeding the concurrency limit. Strict enforcement would need an advisory lock or an INSERT … WHERE (SELECT COUNT …) < limit statement, which is a trait-level design change.

Source§

fn try_acquire_concurrency_slot<'life0, 'life1, 'life2, 'life3, 'life4, 'async_trait>( &'life0 self, invocation_id: &'life1 InvocationId, task_id: &'life2 TaskId, task_config: &'life3 TaskConfig, cc_args: Option<&'life4 SerializedArguments>, ) -> Pin<Box<dyn Future<Output = RustvelloResult<bool>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait, 'life4: 'async_trait,

Atomic check-and-index via a single INSERT … SELECT … WHERE count < limit.

For per-pair concurrency control (CC), each argument pair is indexed atomically. All pairs are checked collectively (a GROUP BY/HAVING intersection) before the insert is allowed.
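The difference between a separate check and an atomic check-and-index can be illustrated without a database. The sketch below is an in-memory analogue only: `SlotTable` is a hypothetical type, and a `Mutex` plays the role that the single INSERT … SELECT … WHERE statement plays in PostgreSQL, so the count check and the insert cannot be interleaved by another caller.

```rust
use std::collections::HashSet;
use std::sync::Mutex;

/// Hypothetical in-memory stand-in for the concurrency index.
pub struct SlotTable {
    limit: usize,
    holders: Mutex<HashSet<String>>,
}

impl SlotTable {
    pub fn new(limit: usize) -> Self {
        SlotTable { limit, holders: Mutex::new(HashSet::new()) }
    }

    /// Check the count and insert under one lock acquisition, so no other
    /// caller can observe the count between the check and the insert.
    pub fn try_acquire(&self, invocation_id: &str) -> bool {
        let mut holders = self.holders.lock().unwrap();
        if holders.contains(invocation_id) {
            // Assumption: re-acquiring a slot that is already held succeeds.
            return true;
        }
        if holders.len() < self.limit {
            holders.insert(invocation_id.to_string());
            true
        } else {
            false
        }
    }

    /// Free the slot held by `invocation_id`, if any.
    pub fn release(&self, invocation_id: &str) {
        self.holders.lock().unwrap().remove(invocation_id);
    }
}
```

If the count were read in one call and the insert done in a later one, two threads could both see room for one more holder; folding both steps into `try_acquire` is what rules that out in this sketch.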

Source§

fn index_for_concurrency_control<'life0, 'life1, 'life2, 'life3, 'async_trait>( &'life0 self, invocation_id: &'life1 InvocationId, task_id: &'life2 TaskId, cc_args: Option<&'life3 SerializedArguments>, ) -> Pin<Box<dyn Future<Output = RustvelloResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait,

Index an invocation’s arguments for concurrency control tracking.
Source§

fn remove_from_concurrency_index<'life0, 'life1, 'async_trait>( &'life0 self, invocation_id: &'life1 InvocationId, ) -> Pin<Box<dyn Future<Output = RustvelloResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Remove an invocation from the concurrency control index.
Source§

impl OrchestratorQuery for PostgresOrchestrator

Source§

fn get_invocations_by_task<'life0, 'life1, 'async_trait>( &'life0 self, task_id: &'life1 TaskId, ) -> Pin<Box<dyn Future<Output = RustvelloResult<Vec<InvocationId>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Get all invocation IDs for a given task.
Source§

fn get_invocations_by_call<'life0, 'life1, 'async_trait>( &'life0 self, call_id: &'life1 CallId, ) -> Pin<Box<dyn Future<Output = RustvelloResult<Vec<InvocationId>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Get all invocation IDs for a given call.
Source§

fn get_invocations_by_status<'life0, 'life1, 'async_trait>( &'life0 self, status: InvocationStatus, task_id: Option<&'life1 TaskId>, ) -> Pin<Box<dyn Future<Output = RustvelloResult<Vec<InvocationId>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Get invocations with a specific status, optionally filtered by task.
Source§

fn count_invocations<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, task_id: Option<&'life1 TaskId>, statuses: Option<&'life2 [InvocationStatus]>, ) -> Pin<Box<dyn Future<Output = RustvelloResult<usize>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Count invocations, optionally filtered by task and/or statuses.
Source§

fn get_invocation_ids_paginated<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, task_id: Option<&'life1 TaskId>, statuses: Option<&'life2 [InvocationStatus]>, limit: usize, offset: usize, ) -> Pin<Box<dyn Future<Output = RustvelloResult<Vec<InvocationId>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Get paginated invocation IDs, optionally filtered by task and statuses.
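A caller that wants every matching ID would typically walk the pages by advancing `offset` until a short page comes back. The helper below is a hypothetical, synchronous sketch of that loop: `fetch_all` and its `fetch_page(limit, offset)` closure are illustration-only names standing in for repeated calls to a paginated query, and it assumes the backend returns rows in a stable order between calls.

```rust
/// Collect all IDs by requesting pages of `page_size` until a short page
/// signals the end. `fetch_page(limit, offset)` is a hypothetical stand-in
/// for one paginated query.
pub fn fetch_all<F>(page_size: usize, mut fetch_page: F) -> Vec<String>
where
    F: FnMut(usize, usize) -> Vec<String>,
{
    let mut all = Vec::new();
    if page_size == 0 {
        // A zero-sized page can never make progress, so return early.
        return all;
    }
    let mut offset = 0;
    loop {
        let page = fetch_page(page_size, offset);
        let received = page.len();
        all.extend(page);
        if received < page_size {
            break; // short (or empty) page: nothing left to read
        }
        offset += received;
    }
    all
}
```

Offset-based paging can skip or repeat rows if invocations are inserted or removed between calls, so this pattern suits reporting better than exact bookkeeping.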
Source§

fn get_blocking_invocations<'life0, 'async_trait>( &'life0 self, max_num: usize, ) -> Pin<Box<dyn Future<Output = RustvelloResult<Vec<InvocationId>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get invocations that are blocking other invocations but are not blocked themselves.
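One way to read "blocking but not blocked" is as a set difference over the waiting relation: invocations that appear as a `waited_on` target but never as a `waiter`. The sketch below works on plain `(waiter, waited_on)` pairs with `u32` IDs; the function name, the ID type, and the ascending result order are assumptions made for illustration.

```rust
use std::collections::BTreeSet;

/// Given `(waiter, waited_on)` edges, return up to `max_num` IDs that are
/// waited on by someone but are not themselves waiting on anything.
pub fn unblocked_blockers(edges: &[(u32, u32)], max_num: usize) -> Vec<u32> {
    let waiters: BTreeSet<u32> = edges.iter().map(|(waiter, _)| *waiter).collect();
    let blockers: BTreeSet<u32> = edges.iter().map(|(_, waited_on)| *waited_on).collect();
    // Set difference: blockers that never appear on the waiting side.
    blockers.difference(&waiters).copied().take(max_num).collect()
}
```

For the edges `1→2`, `2→3`, `4→3`, `5→6`, invocation 2 blocks 1 but is itself waiting on 3, so only 3 and 6 qualify.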
Source§

fn get_existing_invocations<'life0, 'life1, 'life2, 'life3, 'async_trait>( &'life0 self, task_id: &'life1 TaskId, cc_args: Option<&'life2 SerializedArguments>, statuses: &'life3 [InvocationStatus], ) -> Pin<Box<dyn Future<Output = RustvelloResult<Vec<InvocationId>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait,

Find existing invocations for a task, filtered by concurrency-control key arguments and statuses.
Source§

fn filter_by_status<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, invocation_ids: &'life1 [InvocationId], statuses: &'life2 [InvocationStatus], ) -> Pin<Box<dyn Future<Output = Result<Vec<InvocationId>, RustvelloError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, Self: Sync + 'async_trait,

Filter a set of invocation IDs by status.
Source§

impl OrchestratorRecovery for PostgresOrchestrator

Source§

fn register_heartbeat<'life0, 'life1, 'async_trait>( &'life0 self, runner_id: &'life1 RunnerId, _can_run_atomic_service: bool, ) -> Pin<Box<dyn Future<Output = RustvelloResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Register a heartbeat for a runner, indicating it is still alive.
Source§

fn get_stale_pending_invocations<'life0, 'async_trait>( &'life0 self, max_pending_seconds: u64, ) -> Pin<Box<dyn Future<Output = RustvelloResult<Vec<InvocationId>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get invocations stuck in Pending beyond the configured threshold.
Source§

fn get_stale_running_invocations<'life0, 'async_trait>( &'life0 self, runner_dead_after_seconds: u64, ) -> Pin<Box<dyn Future<Output = RustvelloResult<Vec<InvocationId>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get Running invocations owned by runners that haven’t sent a heartbeat within runner_dead_after_seconds.
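The staleness rule can be sketched as a filter over running invocations joined with each runner's last heartbeat. This is an illustrative in-memory version, not the SQL the backend runs: `RunningInvocation` and `stale_running` are hypothetical names, IDs are plain strings, and it assumes that a runner with no recorded heartbeat counts as dead and that a heartbeat exactly at the threshold is still alive.

```rust
use std::collections::HashMap;
use std::time::{Duration, SystemTime};

/// Hypothetical row: an invocation in `Running` and the runner that owns it.
pub struct RunningInvocation {
    pub invocation_id: String,
    pub runner_id: String,
}

/// Return IDs of running invocations whose runner's last heartbeat is older
/// than `runner_dead_after_seconds`, measured from `now`.
pub fn stale_running(
    running: &[RunningInvocation],
    last_heartbeat: &HashMap<String, SystemTime>,
    now: SystemTime,
    runner_dead_after_seconds: u64,
) -> Vec<String> {
    let threshold = Duration::from_secs(runner_dead_after_seconds);
    running
        .iter()
        .filter(|inv| match last_heartbeat.get(&inv.runner_id) {
            // A heartbeat in the future (clock skew) is treated as fresh.
            Some(seen) => now
                .duration_since(*seen)
                .map(|age| age > threshold)
                .unwrap_or(false),
            // Assumption: a runner that never sent a heartbeat is dead.
            None => true,
        })
        .map(|inv| inv.invocation_id.clone())
        .collect()
}
```

Passing `now` explicitly keeps the function deterministic; a database-backed version would more likely compare against the server clock to avoid skew between nodes.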
Source§

fn get_active_runner_ids<'life0, 'async_trait>( &'life0 self, timeout_seconds: u64, ) -> Pin<Box<dyn Future<Output = RustvelloResult<Vec<RunnerId>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get the IDs of all runners that have registered a heartbeat within the given timeout window.
Source§

fn get_active_runners<'life0, 'async_trait>( &'life0 self, timeout_seconds: u64, _can_run_atomic_service: Option<bool>, ) -> Pin<Box<dyn Future<Output = RustvelloResult<Vec<ActiveRunnerInfo>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get detailed info for active runners, optionally filtered by can_run_atomic_service eligibility.
Source§

fn record_atomic_service_execution<'life0, 'life1, 'async_trait>( &'life0 self, _runner_id: &'life1 RunnerId, _start: DateTime<Utc>, _end: DateTime<Utc>, ) -> Pin<Box<dyn Future<Output = RustvelloResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Record that this runner executed the atomic global service.
Source§

fn get_atomic_service_timeline<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = RustvelloResult<Vec<AtomicServiceExecution>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get the timeline of atomic service executions (most recent first).
Source§

impl OrchestratorStatus for PostgresOrchestrator

Source§

fn register_invocation<'life0, 'life1, 'async_trait>( &'life0 self, call: &'life1 CallDTO, ) -> Pin<Box<dyn Future<Output = RustvelloResult<InvocationId>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Register a new invocation for the given call. Sets initial status to Registered.
Source§

fn get_invocation_status<'life0, 'life1, 'async_trait>( &'life0 self, invocation_id: &'life1 InvocationId, ) -> Pin<Box<dyn Future<Output = RustvelloResult<InvocationStatusRecord>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Get the current status of an invocation.
Source§

fn set_invocation_status<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, invocation_id: &'life1 InvocationId, status: InvocationStatus, runner_id: Option<&'life2 RunnerId>, ) -> Pin<Box<dyn Future<Output = RustvelloResult<InvocationStatusRecord>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Atomically transition an invocation to a new status. Validates the transition against the state machine. runner_id is required for Running/RunningRecovery transitions.
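The validation step can be sketched as a pure function that checks the requested transition against a table and then enforces the `runner_id` requirement. The transition table below is hypothetical and covers only the statuses named on this page; the crate's real state machine is not shown here and will have more states and edges. The sketch also leaves out atomicity, which the backend has to provide at the database level.

```rust
/// Subset of statuses named on this page (the real enum has more variants).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Registered,
    Pending,
    Running,
    RunningRecovery,
}

#[derive(Debug, PartialEq, Eq)]
pub enum TransitionError {
    /// The (from, to) pair is not in the transition table.
    Invalid { from: Status, to: Status },
    /// The target status needs a runner ID and none was given.
    MissingRunner(Status),
}

/// Hypothetical transition table, for illustration only.
fn is_allowed(from: Status, to: Status) -> bool {
    matches!(
        (from, to),
        (Status::Registered, Status::Pending)
            | (Status::Pending, Status::Running)
            | (Status::Pending, Status::RunningRecovery)
    )
}

/// Validate a transition; `runner_id` is required for the running states.
pub fn transition(
    from: Status,
    to: Status,
    runner_id: Option<&str>,
) -> Result<Status, TransitionError> {
    if !is_allowed(from, to) {
        return Err(TransitionError::Invalid { from, to });
    }
    let needs_runner = matches!(to, Status::Running | Status::RunningRecovery);
    if needs_runner && runner_id.is_none() {
        return Err(TransitionError::MissingRunner(to));
    }
    Ok(to)
}
```

Keeping the table in one `matches!` expression makes the allowed edges easy to audit; a database-backed version would additionally need to compare against the stored status in the same statement that updates it.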
Source§

fn register_invocation_with_id<'life0, 'life1, 'life2, 'life3, 'async_trait>( &'life0 self, invocation_id: &'life1 InvocationId, call: &'life2 CallDTO, runner_id: Option<&'life3 RunnerId>, ) -> Pin<Box<dyn Future<Output = RustvelloResult<InvocationStatusRecord>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait,

Register an invocation with a pre-existing ID and call.
Source§

fn increment_invocation_retries<'life0, 'life1, 'async_trait>( &'life0 self, invocation_id: &'life1 InvocationId, ) -> Pin<Box<dyn Future<Output = RustvelloResult<u32>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Increment the retry counter for an invocation. Returns the new count.
Source§

fn get_invocation_retries<'life0, 'life1, 'async_trait>( &'life0 self, invocation_id: &'life1 InvocationId, ) -> Pin<Box<dyn Future<Output = RustvelloResult<u32>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Get the current retry count for an invocation.
Source§

fn remove_invocation<'life0, 'life1, 'async_trait>( &'life0 self, invocation_id: &'life1 InvocationId, ) -> Pin<Box<dyn Future<Output = RustvelloResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Remove an invocation from all indexes (status, task, call, retries, CC). Used during auto-purge of terminal invocations.
Source§

fn purge<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = RustvelloResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Remove all orchestrator data (invocations, statuses, CC index, etc.).
Source§

fn schedule_auto_purge<'life0, 'life1, 'async_trait>( &'life0 self, _invocation_id: &'life1 InvocationId, ) -> Pin<Box<dyn Future<Output = RustvelloResult<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Mark an invocation for future auto-purge.
Source§

fn run_auto_purge<'life0, 'async_trait>( &'life0 self, _max_age_secs: u64, ) -> Pin<Box<dyn Future<Output = RustvelloResult<Vec<InvocationId>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Execute scheduled auto-purges older than max_age_secs.
Source§

fn backend_name(&self) -> &'static str

Human-readable name of the orchestrator backend (e.g. “In-Memory”, “SQLite”).
Source§

fn usage_stats<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Vec<(&'static str, String)>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

Key-value statistics about this orchestrator’s current state.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.
Source§

impl<T> Orchestrator for T