Skip to main content

rustvello_core/
orchestrator.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use tracing::instrument;
4
5use rustvello_proto::call::{CallDTO, SerializedArguments};
6use rustvello_proto::config::TaskConfig;
7use rustvello_proto::identifiers::{CallId, InvocationId, RunnerId, TaskId};
8use rustvello_proto::status::{InvocationStatus, InvocationStatusRecord};
9
10use crate::error::RustvelloResult;
11
12/// A recorded execution of the atomic global service by a specific runner.
13#[derive(Debug, Clone)]
14pub struct AtomicServiceExecution {
15    pub runner_id: String,
16    pub start: DateTime<Utc>,
17    pub end: DateTime<Utc>,
18}
19
20impl AtomicServiceExecution {
21    pub fn duration_secs(&self) -> f64 {
22        (self.end - self.start).num_milliseconds() as f64 / 1000.0
23    }
24}
25
26/// Information about an active runner, including heartbeat and atomic service metadata.
27///
28/// All timestamps use wall-clock `DateTime<Utc>` so the struct can cross
29/// FFI boundaries and be serialized/persisted by any backend.
30#[derive(Debug, Clone)]
31pub struct ActiveRunnerInfo {
32    pub runner_id: RunnerId,
33    pub creation_time: DateTime<Utc>,
34    pub last_heartbeat: DateTime<Utc>,
35    pub can_run_atomic_service: bool,
36    /// Wall-clock time of the last atomic service execution start.
37    pub last_service_start: Option<DateTime<Utc>>,
38    /// Wall-clock time of the last atomic service execution end.
39    pub last_service_end: Option<DateTime<Utc>>,
40}
41
42/// Orchestrator interface — manages the invocation lifecycle.
43///
44/// Mirrors pynenc's `BaseOrchestrator`. Responsible for:
45/// - Creating and registering invocations
46/// - Managing status transitions (atomic state machine)
47/// - Routing calls to the broker
48/// - Concurrency control
49/// - Blocking control (waiting for results)
50///
51/// This is a composite trait combining five sub-traits:
52/// - [`OrchestratorStatus`] — registration, status, retries, cleanup
53/// - [`OrchestratorConcurrency`] — concurrency control indexing
54/// - [`OrchestratorBlocking`] — waiting/release for blocking invocations
55/// - [`OrchestratorQuery`] — listing, filtering, pagination
56/// - [`OrchestratorRecovery`] — heartbeats, stale detection, atomic service
57///
58/// Implementations should implement the sub-traits directly.
59/// This supertrait is auto-implemented via a blanket impl.
60pub trait Orchestrator:
61    OrchestratorStatus
62    + OrchestratorConcurrency
63    + OrchestratorBlocking
64    + OrchestratorQuery
65    + OrchestratorRecovery
66{
67}
68
69impl<
70        T: OrchestratorStatus
71            + OrchestratorConcurrency
72            + OrchestratorBlocking
73            + OrchestratorQuery
74            + OrchestratorRecovery,
75    > Orchestrator for T
76{
77}
78
79// ===========================================================================
80// OrchestratorStatus — registration, status transitions, retries, cleanup
81// ===========================================================================
82
83/// Invocation lifecycle and status management.
84///
85/// Handles registration, status transitions, retry tracking, and cleanup
86/// (purge / auto-purge).
87#[async_trait]
88pub trait OrchestratorStatus: Send + Sync {
89    // --- Invocation registration ---
90
91    /// Register a new invocation for the given call.
92    /// Sets initial status to `Registered`.
93    async fn register_invocation(&self, call: &CallDTO) -> RustvelloResult<InvocationId>;
94
95    /// Register an invocation with a pre-existing ID and call.
96    ///
97    /// Used when the invocation ID is generated externally (e.g., by a
98    /// language framework such as pynenc). Sets initial status to
99    /// `Registered` and indexes by task and call for later queries.
100    async fn register_invocation_with_id(
101        &self,
102        invocation_id: &InvocationId,
103        call: &CallDTO,
104        runner_id: Option<&RunnerId>,
105    ) -> RustvelloResult<InvocationStatusRecord>;
106
107    // --- Retry tracking ---
108
109    /// Increment the retry counter for an invocation. Returns the new count.
110    async fn increment_invocation_retries(
111        &self,
112        invocation_id: &InvocationId,
113    ) -> RustvelloResult<u32>;
114
115    /// Get the current retry count for an invocation.
116    async fn get_invocation_retries(&self, invocation_id: &InvocationId) -> RustvelloResult<u32>;
117
118    /// Remove an invocation from all indexes (status, task, call, retries, CC).
119    /// Used during auto-purge of terminal invocations.
120    async fn remove_invocation(&self, invocation_id: &InvocationId) -> RustvelloResult<()>;
121
122    // --- Status management ---
123
124    /// Get the current status of an invocation.
125    async fn get_invocation_status(
126        &self,
127        invocation_id: &InvocationId,
128    ) -> RustvelloResult<InvocationStatusRecord>;
129
130    /// Atomically transition an invocation to a new status.
131    /// Validates the transition against the state machine.
132    /// `runner_id` is required for Running/RunningRecovery transitions.
133    async fn set_invocation_status(
134        &self,
135        invocation_id: &InvocationId,
136        status: InvocationStatus,
137        runner_id: Option<&RunnerId>,
138    ) -> RustvelloResult<InvocationStatusRecord>;
139
140    // --- Introspection ---
141
142    /// Human-readable name of the orchestrator backend (e.g. "In-Memory", "SQLite").
143    fn backend_name(&self) -> &'static str {
144        "Unknown"
145    }
146
147    /// Key-value statistics about this orchestrator's current state.
148    ///
149    /// Each entry is a `(label, value)` pair for display. Implementations
150    /// should return counts, sizes, or any relevant runtime metrics.
151    async fn usage_stats(&self) -> Vec<(&'static str, String)> {
152        Vec::new()
153    }
154
155    // --- Cleanup ---
156
157    /// Remove all orchestrator data (invocations, statuses, CC index, etc.).
158    ///
159    /// Used in tests and monitoring dashboard "purge" action.
160    /// Mirrors pynenc's `BaseOrchestrator.purge`.
161    async fn purge(&self) -> RustvelloResult<()>;
162
163    // --- Auto-purge ---
164
165    /// Mark an invocation for future auto-purge.
166    ///
167    /// Stores `(invocation_id, now)` — the timestamp when the invocation
168    /// became eligible for purging. The actual purge decision happens in
169    /// `run_auto_purge`, which uses the *current* max-age configuration.
170    async fn schedule_auto_purge(&self, invocation_id: &InvocationId) -> RustvelloResult<()>;
171
172    /// Execute scheduled auto-purges older than `max_age_secs`.
173    ///
174    /// Purges invocations whose schedule timestamp is ≤ `now - max_age_secs`.
175    /// Returns the IDs of purged invocations.
176    async fn run_auto_purge(&self, max_age_secs: u64) -> RustvelloResult<Vec<InvocationId>>;
177}
178
179// ===========================================================================
180// OrchestratorConcurrency — concurrency control indexing and slot acquisition
181// ===========================================================================
182
183/// Concurrency control operations.
184///
185/// Manages the concurrency index that tracks which invocations are currently
186/// using concurrency-controlled slots.
187#[async_trait]
188pub trait OrchestratorConcurrency: Send + Sync {
189    /// Check if a new invocation is authorized to run given concurrency constraints.
190    ///
191    /// Returns `true` if the invocation can proceed, `false` if it should be
192    /// held back or rejected. Mirrors pynenc's
193    /// `is_candidate_to_run_by_concurrency_control` (checks **Pending + Running**
194    /// invocations). Pynenc also has a stricter `is_authorize_to_run_by_concurrency_control`
195    /// (Running only, used as a safety-net in `DistributedInvocation.run()`); the Rust
196    /// runner replaces that with ownership claims + CC indexing before execution.
197    async fn check_running_concurrency(
198        &self,
199        task_id: &TaskId,
200        task_config: &TaskConfig,
201        cc_args: Option<&SerializedArguments>,
202    ) -> RustvelloResult<bool>;
203
204    /// Index an invocation's arguments for concurrency control tracking.
205    ///
206    /// Called after creating a new invocation so future CC checks can find it.
207    async fn index_for_concurrency_control(
208        &self,
209        invocation_id: &InvocationId,
210        task_id: &TaskId,
211        cc_args: Option<&SerializedArguments>,
212    ) -> RustvelloResult<()>;
213
214    /// Remove an invocation from the concurrency control index.
215    ///
216    /// Called when an invocation reaches a terminal state.
217    async fn remove_from_concurrency_index(
218        &self,
219        invocation_id: &InvocationId,
220    ) -> RustvelloResult<()>;
221
222    /// Atomically check concurrency and index if under the limit.
223    ///
224    /// Returns `true` if the slot was acquired (invocation indexed),
225    /// `false` if the task is already at its concurrency limit.
226    ///
227    /// The default implementation calls `check_running_concurrency` then
228    /// `index_for_concurrency_control` — non-atomic but correct for
229    /// single-process backends (Mem, SQLite). Distributed backends
230    /// (Redis, Postgres, MongoDB) should override this with an atomic
231    /// implementation to prevent TOCTOU races.
232    #[instrument(skip(self, task_config, cc_args), fields(%invocation_id, %task_id))]
233    async fn try_acquire_concurrency_slot(
234        &self,
235        invocation_id: &InvocationId,
236        task_id: &TaskId,
237        task_config: &TaskConfig,
238        cc_args: Option<&SerializedArguments>,
239    ) -> RustvelloResult<bool> {
240        if self
241            .check_running_concurrency(task_id, task_config, cc_args)
242            .await?
243        {
244            self.index_for_concurrency_control(invocation_id, task_id, cc_args)
245                .await?;
246            Ok(true)
247        } else {
248            Ok(false)
249        }
250    }
251}
252
253// ===========================================================================
254// OrchestratorBlocking — waiting and release for blocking invocations
255// ===========================================================================
256
257/// Blocking control operations.
258///
259/// Manages the waiting graph: which invocations are waiting on other
260/// invocations to complete before they can proceed.
261#[async_trait]
262pub trait OrchestratorBlocking: Send + Sync {
263    /// Mark that `waiter` is waiting for `waited_on` to complete.
264    async fn set_waiting_for(
265        &self,
266        waiter: &InvocationId,
267        waited_on: &InvocationId,
268    ) -> RustvelloResult<()>;
269
270    /// Get invocations that are waiting on the given invocation.
271    async fn get_waiters(&self, waited_on: &InvocationId) -> RustvelloResult<Vec<InvocationId>>;
272
273    /// Release all invocations waiting on the given completed invocation.
274    async fn release_waiters(&self, completed: &InvocationId)
275        -> RustvelloResult<Vec<InvocationId>>;
276}
277
278// ===========================================================================
279// OrchestratorQuery — listing, filtering, pagination
280// ===========================================================================
281
282/// Query operations for finding and filtering invocations.
283#[async_trait]
284pub trait OrchestratorQuery: OrchestratorStatus {
285    /// Get all invocation IDs for a given task.
286    async fn get_invocations_by_task(&self, task_id: &TaskId)
287        -> RustvelloResult<Vec<InvocationId>>;
288
289    /// Get all invocation IDs for a given call.
290    async fn get_invocations_by_call(&self, call_id: &CallId)
291        -> RustvelloResult<Vec<InvocationId>>;
292
293    /// Get invocations with a specific status, optionally filtered by task.
294    async fn get_invocations_by_status(
295        &self,
296        status: InvocationStatus,
297        task_id: Option<&TaskId>,
298    ) -> RustvelloResult<Vec<InvocationId>>;
299
300    /// Count invocations, optionally filtered by task and/or statuses.
301    ///
302    /// Mirrors pynenc's `BaseOrchestrator.count_invocations`.
303    async fn count_invocations(
304        &self,
305        task_id: Option<&TaskId>,
306        statuses: Option<&[InvocationStatus]>,
307    ) -> RustvelloResult<usize>;
308
309    /// Get paginated invocation IDs, optionally filtered by task and statuses.
310    ///
311    /// Mirrors pynenc's `BaseOrchestrator.get_invocation_ids_paginated`.
312    async fn get_invocation_ids_paginated(
313        &self,
314        task_id: Option<&TaskId>,
315        statuses: Option<&[InvocationStatus]>,
316        limit: usize,
317        offset: usize,
318    ) -> RustvelloResult<Vec<InvocationId>>;
319
320    /// Filter a set of invocation IDs by status.
321    ///
322    /// Returns only the invocation IDs whose current status matches any status in the filter.
323    /// Mirrors pynenc's `BaseOrchestrator.filter_by_status`.
324    #[instrument(skip(self, invocation_ids, statuses), fields(count = invocation_ids.len()))]
325    async fn filter_by_status(
326        &self,
327        invocation_ids: &[InvocationId],
328        statuses: &[InvocationStatus],
329    ) -> RustvelloResult<Vec<InvocationId>> {
330        // Default: check each invocation individually
331        let mut result = Vec::new();
332        for inv_id in invocation_ids {
333            if let Ok(record) = self.get_invocation_status(inv_id).await {
334                if statuses.contains(&record.status) {
335                    result.push(inv_id.clone());
336                }
337            }
338        }
339        Ok(result)
340    }
341
342    /// Get invocations that are blocking other invocations but are not blocked themselves.
343    ///
344    /// Returns up to `max_num` invocations that have waiters but are themselves
345    /// in an available-for-run status.
346    /// Mirrors pynenc's `BaseBlockingControl.get_blocking_invocations`.
347    async fn get_blocking_invocations(&self, max_num: usize) -> RustvelloResult<Vec<InvocationId>>;
348
349    /// Find existing invocations for a task, filtered by concurrency-control
350    /// key arguments and statuses.
351    ///
352    /// Used by `route_call()` / `submit_with_cc()` to implement registration
353    /// concurrency control — deduplicating invocations with the same key args
354    /// that are still in a non-terminal status.
355    ///
356    /// Mirrors pynenc's `BaseOrchestrator.get_existing_invocations`.
357    ///
358    /// - `cc_args`: When `Some`, only return invocations whose CC key matches.
359    ///   When `None`, return all invocations for the task (task-level CC).
360    /// - `statuses`: Only return invocations in one of these statuses.
361    async fn get_existing_invocations(
362        &self,
363        task_id: &TaskId,
364        cc_args: Option<&SerializedArguments>,
365        statuses: &[InvocationStatus],
366    ) -> RustvelloResult<Vec<InvocationId>>;
367}
368
369// ===========================================================================
370// OrchestratorRecovery — heartbeats, stale detection, runner info
371// ===========================================================================
372
373/// Recovery and heartbeat operations for distributed runner management.
374#[async_trait]
375pub trait OrchestratorRecovery: Send + Sync {
376    /// Register a heartbeat for a runner, indicating it is still alive.
377    ///
378    /// `can_run_atomic_service` marks whether this runner is eligible to run
379    /// shared atomic services (recovery, triggers). Parent/main runners
380    /// typically pass `true`; child workers pass `false`.
381    async fn register_heartbeat(
382        &self,
383        runner_id: &RunnerId,
384        can_run_atomic_service: bool,
385    ) -> RustvelloResult<()>;
386
387    /// Get invocations stuck in Pending beyond the configured threshold.
388    ///
389    /// Returns invocation IDs that have been in Pending status for longer
390    /// than `max_pending_seconds`.
391    async fn get_stale_pending_invocations(
392        &self,
393        max_pending_seconds: u64,
394    ) -> RustvelloResult<Vec<InvocationId>>;
395
396    /// Get Running invocations owned by runners that haven't sent a
397    /// heartbeat within `runner_dead_after_seconds`.
398    async fn get_stale_running_invocations(
399        &self,
400        runner_dead_after_seconds: u64,
401    ) -> RustvelloResult<Vec<InvocationId>>;
402
403    /// Get the IDs of all runners that have registered a heartbeat within the
404    /// given timeout window.
405    ///
406    /// Mirrors pynenc's `BaseOrchestrator.get_active_runners`.
407    async fn get_active_runner_ids(&self, timeout_seconds: u64) -> RustvelloResult<Vec<RunnerId>>;
408
409    /// Get detailed info for active runners, optionally filtered by
410    /// `can_run_atomic_service` eligibility.
411    ///
412    /// Mirrors pynenc's `BaseOrchestrator.get_active_runners`.
413    async fn get_active_runners(
414        &self,
415        timeout_seconds: u64,
416        can_run_atomic_service: Option<bool>,
417    ) -> RustvelloResult<Vec<ActiveRunnerInfo>>;
418
419    // --- Atomic service coordination ---
420
421    /// Record that this runner executed the atomic global service.
422    ///
423    /// Called by the runner after recovery + trigger evaluation completes.
424    async fn record_atomic_service_execution(
425        &self,
426        runner_id: &RunnerId,
427        start: DateTime<Utc>,
428        end: DateTime<Utc>,
429    ) -> RustvelloResult<()>;
430
431    /// Get the timeline of atomic service executions (most recent first).
432    async fn get_atomic_service_timeline(&self) -> RustvelloResult<Vec<AtomicServiceExecution>>;
433}