rustvello_core/orchestrator.rs
1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use tracing::instrument;
4
5use rustvello_proto::call::{CallDTO, SerializedArguments};
6use rustvello_proto::config::TaskConfig;
7use rustvello_proto::identifiers::{CallId, InvocationId, RunnerId, TaskId};
8use rustvello_proto::status::{InvocationStatus, InvocationStatusRecord};
9
10use crate::error::RustvelloResult;
11
12/// A recorded execution of the atomic global service by a specific runner.
13#[derive(Debug, Clone)]
14pub struct AtomicServiceExecution {
15 pub runner_id: String,
16 pub start: DateTime<Utc>,
17 pub end: DateTime<Utc>,
18}
19
20impl AtomicServiceExecution {
21 pub fn duration_secs(&self) -> f64 {
22 (self.end - self.start).num_milliseconds() as f64 / 1000.0
23 }
24}
25
26/// Information about an active runner, including heartbeat and atomic service metadata.
27///
28/// All timestamps use wall-clock `DateTime<Utc>` so the struct can cross
29/// FFI boundaries and be serialized/persisted by any backend.
30#[derive(Debug, Clone)]
31pub struct ActiveRunnerInfo {
32 pub runner_id: RunnerId,
33 pub creation_time: DateTime<Utc>,
34 pub last_heartbeat: DateTime<Utc>,
35 pub can_run_atomic_service: bool,
36 /// Wall-clock time of the last atomic service execution start.
37 pub last_service_start: Option<DateTime<Utc>>,
38 /// Wall-clock time of the last atomic service execution end.
39 pub last_service_end: Option<DateTime<Utc>>,
40}
41
42/// Orchestrator interface — manages the invocation lifecycle.
43///
44/// Mirrors pynenc's `BaseOrchestrator`. Responsible for:
45/// - Creating and registering invocations
46/// - Managing status transitions (atomic state machine)
47/// - Routing calls to the broker
48/// - Concurrency control
49/// - Blocking control (waiting for results)
50///
51/// This is a composite trait combining five sub-traits:
52/// - [`OrchestratorStatus`] — registration, status, retries, cleanup
53/// - [`OrchestratorConcurrency`] — concurrency control indexing
54/// - [`OrchestratorBlocking`] — waiting/release for blocking invocations
55/// - [`OrchestratorQuery`] — listing, filtering, pagination
56/// - [`OrchestratorRecovery`] — heartbeats, stale detection, atomic service
57///
58/// Implementations should implement the sub-traits directly.
59/// This supertrait is auto-implemented via a blanket impl.
60pub trait Orchestrator:
61 OrchestratorStatus
62 + OrchestratorConcurrency
63 + OrchestratorBlocking
64 + OrchestratorQuery
65 + OrchestratorRecovery
66{
67}
68
69impl<
70 T: OrchestratorStatus
71 + OrchestratorConcurrency
72 + OrchestratorBlocking
73 + OrchestratorQuery
74 + OrchestratorRecovery,
75 > Orchestrator for T
76{
77}
78
79// ===========================================================================
80// OrchestratorStatus — registration, status transitions, retries, cleanup
81// ===========================================================================
82
83/// Invocation lifecycle and status management.
84///
85/// Handles registration, status transitions, retry tracking, and cleanup
86/// (purge / auto-purge).
87#[async_trait]
88pub trait OrchestratorStatus: Send + Sync {
89 // --- Invocation registration ---
90
91 /// Register a new invocation for the given call.
92 /// Sets initial status to `Registered`.
93 async fn register_invocation(&self, call: &CallDTO) -> RustvelloResult<InvocationId>;
94
95 /// Register an invocation with a pre-existing ID and call.
96 ///
97 /// Used when the invocation ID is generated externally (e.g., by a
98 /// language framework such as pynenc). Sets initial status to
99 /// `Registered` and indexes by task and call for later queries.
100 async fn register_invocation_with_id(
101 &self,
102 invocation_id: &InvocationId,
103 call: &CallDTO,
104 runner_id: Option<&RunnerId>,
105 ) -> RustvelloResult<InvocationStatusRecord>;
106
107 // --- Retry tracking ---
108
109 /// Increment the retry counter for an invocation. Returns the new count.
110 async fn increment_invocation_retries(
111 &self,
112 invocation_id: &InvocationId,
113 ) -> RustvelloResult<u32>;
114
115 /// Get the current retry count for an invocation.
116 async fn get_invocation_retries(&self, invocation_id: &InvocationId) -> RustvelloResult<u32>;
117
118 /// Remove an invocation from all indexes (status, task, call, retries, CC).
119 /// Used during auto-purge of terminal invocations.
120 async fn remove_invocation(&self, invocation_id: &InvocationId) -> RustvelloResult<()>;
121
122 // --- Status management ---
123
124 /// Get the current status of an invocation.
125 async fn get_invocation_status(
126 &self,
127 invocation_id: &InvocationId,
128 ) -> RustvelloResult<InvocationStatusRecord>;
129
130 /// Atomically transition an invocation to a new status.
131 /// Validates the transition against the state machine.
132 /// `runner_id` is required for Running/RunningRecovery transitions.
133 async fn set_invocation_status(
134 &self,
135 invocation_id: &InvocationId,
136 status: InvocationStatus,
137 runner_id: Option<&RunnerId>,
138 ) -> RustvelloResult<InvocationStatusRecord>;
139
140 // --- Introspection ---
141
142 /// Human-readable name of the orchestrator backend (e.g. "In-Memory", "SQLite").
143 fn backend_name(&self) -> &'static str {
144 "Unknown"
145 }
146
147 /// Key-value statistics about this orchestrator's current state.
148 ///
149 /// Each entry is a `(label, value)` pair for display. Implementations
150 /// should return counts, sizes, or any relevant runtime metrics.
151 async fn usage_stats(&self) -> Vec<(&'static str, String)> {
152 Vec::new()
153 }
154
155 // --- Cleanup ---
156
157 /// Remove all orchestrator data (invocations, statuses, CC index, etc.).
158 ///
159 /// Used in tests and monitoring dashboard "purge" action.
160 /// Mirrors pynenc's `BaseOrchestrator.purge`.
161 async fn purge(&self) -> RustvelloResult<()>;
162
163 // --- Auto-purge ---
164
165 /// Mark an invocation for future auto-purge.
166 ///
167 /// Stores `(invocation_id, now)` — the timestamp when the invocation
168 /// became eligible for purging. The actual purge decision happens in
169 /// `run_auto_purge`, which uses the *current* max-age configuration.
170 async fn schedule_auto_purge(&self, invocation_id: &InvocationId) -> RustvelloResult<()>;
171
172 /// Execute scheduled auto-purges older than `max_age_secs`.
173 ///
174 /// Purges invocations whose schedule timestamp is ≤ `now - max_age_secs`.
175 /// Returns the IDs of purged invocations.
176 async fn run_auto_purge(&self, max_age_secs: u64) -> RustvelloResult<Vec<InvocationId>>;
177}
178
179// ===========================================================================
180// OrchestratorConcurrency — concurrency control indexing and slot acquisition
181// ===========================================================================
182
183/// Concurrency control operations.
184///
185/// Manages the concurrency index that tracks which invocations are currently
186/// using concurrency-controlled slots.
187#[async_trait]
188pub trait OrchestratorConcurrency: Send + Sync {
189 /// Check if a new invocation is authorized to run given concurrency constraints.
190 ///
191 /// Returns `true` if the invocation can proceed, `false` if it should be
192 /// held back or rejected. Mirrors pynenc's
193 /// `is_candidate_to_run_by_concurrency_control` (checks **Pending + Running**
194 /// invocations). Pynenc also has a stricter `is_authorize_to_run_by_concurrency_control`
195 /// (Running only, used as a safety-net in `DistributedInvocation.run()`); the Rust
196 /// runner replaces that with ownership claims + CC indexing before execution.
197 async fn check_running_concurrency(
198 &self,
199 task_id: &TaskId,
200 task_config: &TaskConfig,
201 cc_args: Option<&SerializedArguments>,
202 ) -> RustvelloResult<bool>;
203
204 /// Index an invocation's arguments for concurrency control tracking.
205 ///
206 /// Called after creating a new invocation so future CC checks can find it.
207 async fn index_for_concurrency_control(
208 &self,
209 invocation_id: &InvocationId,
210 task_id: &TaskId,
211 cc_args: Option<&SerializedArguments>,
212 ) -> RustvelloResult<()>;
213
214 /// Remove an invocation from the concurrency control index.
215 ///
216 /// Called when an invocation reaches a terminal state.
217 async fn remove_from_concurrency_index(
218 &self,
219 invocation_id: &InvocationId,
220 ) -> RustvelloResult<()>;
221
222 /// Atomically check concurrency and index if under the limit.
223 ///
224 /// Returns `true` if the slot was acquired (invocation indexed),
225 /// `false` if the task is already at its concurrency limit.
226 ///
227 /// The default implementation calls `check_running_concurrency` then
228 /// `index_for_concurrency_control` — non-atomic but correct for
229 /// single-process backends (Mem, SQLite). Distributed backends
230 /// (Redis, Postgres, MongoDB) should override this with an atomic
231 /// implementation to prevent TOCTOU races.
232 #[instrument(skip(self, task_config, cc_args), fields(%invocation_id, %task_id))]
233 async fn try_acquire_concurrency_slot(
234 &self,
235 invocation_id: &InvocationId,
236 task_id: &TaskId,
237 task_config: &TaskConfig,
238 cc_args: Option<&SerializedArguments>,
239 ) -> RustvelloResult<bool> {
240 if self
241 .check_running_concurrency(task_id, task_config, cc_args)
242 .await?
243 {
244 self.index_for_concurrency_control(invocation_id, task_id, cc_args)
245 .await?;
246 Ok(true)
247 } else {
248 Ok(false)
249 }
250 }
251}
252
253// ===========================================================================
254// OrchestratorBlocking — waiting and release for blocking invocations
255// ===========================================================================
256
257/// Blocking control operations.
258///
259/// Manages the waiting graph: which invocations are waiting on other
260/// invocations to complete before they can proceed.
261#[async_trait]
262pub trait OrchestratorBlocking: Send + Sync {
263 /// Mark that `waiter` is waiting for `waited_on` to complete.
264 async fn set_waiting_for(
265 &self,
266 waiter: &InvocationId,
267 waited_on: &InvocationId,
268 ) -> RustvelloResult<()>;
269
270 /// Get invocations that are waiting on the given invocation.
271 async fn get_waiters(&self, waited_on: &InvocationId) -> RustvelloResult<Vec<InvocationId>>;
272
273 /// Release all invocations waiting on the given completed invocation.
274 async fn release_waiters(&self, completed: &InvocationId)
275 -> RustvelloResult<Vec<InvocationId>>;
276}
277
278// ===========================================================================
279// OrchestratorQuery — listing, filtering, pagination
280// ===========================================================================
281
282/// Query operations for finding and filtering invocations.
283#[async_trait]
284pub trait OrchestratorQuery: OrchestratorStatus {
285 /// Get all invocation IDs for a given task.
286 async fn get_invocations_by_task(&self, task_id: &TaskId)
287 -> RustvelloResult<Vec<InvocationId>>;
288
289 /// Get all invocation IDs for a given call.
290 async fn get_invocations_by_call(&self, call_id: &CallId)
291 -> RustvelloResult<Vec<InvocationId>>;
292
293 /// Get invocations with a specific status, optionally filtered by task.
294 async fn get_invocations_by_status(
295 &self,
296 status: InvocationStatus,
297 task_id: Option<&TaskId>,
298 ) -> RustvelloResult<Vec<InvocationId>>;
299
300 /// Count invocations, optionally filtered by task and/or statuses.
301 ///
302 /// Mirrors pynenc's `BaseOrchestrator.count_invocations`.
303 async fn count_invocations(
304 &self,
305 task_id: Option<&TaskId>,
306 statuses: Option<&[InvocationStatus]>,
307 ) -> RustvelloResult<usize>;
308
309 /// Get paginated invocation IDs, optionally filtered by task and statuses.
310 ///
311 /// Mirrors pynenc's `BaseOrchestrator.get_invocation_ids_paginated`.
312 async fn get_invocation_ids_paginated(
313 &self,
314 task_id: Option<&TaskId>,
315 statuses: Option<&[InvocationStatus]>,
316 limit: usize,
317 offset: usize,
318 ) -> RustvelloResult<Vec<InvocationId>>;
319
320 /// Filter a set of invocation IDs by status.
321 ///
322 /// Returns only the invocation IDs whose current status matches any status in the filter.
323 /// Mirrors pynenc's `BaseOrchestrator.filter_by_status`.
324 #[instrument(skip(self, invocation_ids, statuses), fields(count = invocation_ids.len()))]
325 async fn filter_by_status(
326 &self,
327 invocation_ids: &[InvocationId],
328 statuses: &[InvocationStatus],
329 ) -> RustvelloResult<Vec<InvocationId>> {
330 // Default: check each invocation individually
331 let mut result = Vec::new();
332 for inv_id in invocation_ids {
333 if let Ok(record) = self.get_invocation_status(inv_id).await {
334 if statuses.contains(&record.status) {
335 result.push(inv_id.clone());
336 }
337 }
338 }
339 Ok(result)
340 }
341
342 /// Get invocations that are blocking other invocations but are not blocked themselves.
343 ///
344 /// Returns up to `max_num` invocations that have waiters but are themselves
345 /// in an available-for-run status.
346 /// Mirrors pynenc's `BaseBlockingControl.get_blocking_invocations`.
347 async fn get_blocking_invocations(&self, max_num: usize) -> RustvelloResult<Vec<InvocationId>>;
348
349 /// Find existing invocations for a task, filtered by concurrency-control
350 /// key arguments and statuses.
351 ///
352 /// Used by `route_call()` / `submit_with_cc()` to implement registration
353 /// concurrency control — deduplicating invocations with the same key args
354 /// that are still in a non-terminal status.
355 ///
356 /// Mirrors pynenc's `BaseOrchestrator.get_existing_invocations`.
357 ///
358 /// - `cc_args`: When `Some`, only return invocations whose CC key matches.
359 /// When `None`, return all invocations for the task (task-level CC).
360 /// - `statuses`: Only return invocations in one of these statuses.
361 async fn get_existing_invocations(
362 &self,
363 task_id: &TaskId,
364 cc_args: Option<&SerializedArguments>,
365 statuses: &[InvocationStatus],
366 ) -> RustvelloResult<Vec<InvocationId>>;
367}
368
// ===========================================================================
// OrchestratorRecovery — heartbeats, stale detection, runner info, atomic service
// ===========================================================================
372
373/// Recovery and heartbeat operations for distributed runner management.
374#[async_trait]
375pub trait OrchestratorRecovery: Send + Sync {
376 /// Register a heartbeat for a runner, indicating it is still alive.
377 ///
378 /// `can_run_atomic_service` marks whether this runner is eligible to run
379 /// shared atomic services (recovery, triggers). Parent/main runners
380 /// typically pass `true`; child workers pass `false`.
381 async fn register_heartbeat(
382 &self,
383 runner_id: &RunnerId,
384 can_run_atomic_service: bool,
385 ) -> RustvelloResult<()>;
386
387 /// Get invocations stuck in Pending beyond the configured threshold.
388 ///
389 /// Returns invocation IDs that have been in Pending status for longer
390 /// than `max_pending_seconds`.
391 async fn get_stale_pending_invocations(
392 &self,
393 max_pending_seconds: u64,
394 ) -> RustvelloResult<Vec<InvocationId>>;
395
396 /// Get Running invocations owned by runners that haven't sent a
397 /// heartbeat within `runner_dead_after_seconds`.
398 async fn get_stale_running_invocations(
399 &self,
400 runner_dead_after_seconds: u64,
401 ) -> RustvelloResult<Vec<InvocationId>>;
402
403 /// Get the IDs of all runners that have registered a heartbeat within the
404 /// given timeout window.
405 ///
406 /// Mirrors pynenc's `BaseOrchestrator.get_active_runners`.
407 async fn get_active_runner_ids(&self, timeout_seconds: u64) -> RustvelloResult<Vec<RunnerId>>;
408
409 /// Get detailed info for active runners, optionally filtered by
410 /// `can_run_atomic_service` eligibility.
411 ///
412 /// Mirrors pynenc's `BaseOrchestrator.get_active_runners`.
413 async fn get_active_runners(
414 &self,
415 timeout_seconds: u64,
416 can_run_atomic_service: Option<bool>,
417 ) -> RustvelloResult<Vec<ActiveRunnerInfo>>;
418
419 // --- Atomic service coordination ---
420
421 /// Record that this runner executed the atomic global service.
422 ///
423 /// Called by the runner after recovery + trigger evaluation completes.
424 async fn record_atomic_service_execution(
425 &self,
426 runner_id: &RunnerId,
427 start: DateTime<Utc>,
428 end: DateTime<Utc>,
429 ) -> RustvelloResult<()>;
430
431 /// Get the timeline of atomic service executions (most recent first).
432 async fn get_atomic_service_timeline(&self) -> RustvelloResult<Vec<AtomicServiceExecution>>;
433}