Skip to main content

rustvello_mem/orchestrator/
mod.rs

1mod blocking;
2mod concurrency;
3mod query;
4mod recovery;
5mod status;
6
7#[cfg(test)]
8mod tests;
9
10use std::collections::{HashMap, HashSet};
11use std::sync::Arc;
12use tokio::sync::Mutex;
13
14use chrono::{DateTime, Utc};
15
16use rustvello_core::orchestrator::AtomicServiceExecution;
17use rustvello_proto::identifiers::InvocationId;
18use rustvello_proto::invocation::InvocationDTO;
19use rustvello_proto::status::InvocationStatusRecord;
20
21/// In-memory orchestrator implementation.
22///
23/// Manages invocation lifecycle with atomic status transitions.
24/// All state is held in process memory.
25pub struct OrchestratorState {
26    pub invocations: HashMap<Arc<str>, InvocationDTO>,
27    pub status_records: HashMap<Arc<str>, InvocationStatusRecord>,
28    /// task_id string -> set of invocation_id strings
29    pub(crate) task_invocations: HashMap<String, HashSet<Arc<str>>>,
30    /// call_id string -> set of invocation_id strings
31    pub(crate) call_invocations: HashMap<String, HashSet<Arc<str>>>,
32    /// waiter_id -> waited_on_id
33    pub(crate) waiting_for: HashMap<Arc<str>, Arc<str>>,
34    /// waited_on_id -> set of waiter_ids
35    pub(crate) waiters: HashMap<Arc<str>, HashSet<Arc<str>>>,
36    /// Concurrency control index: (task_id, arg_key, arg_value) -> set of invocation_ids
37    /// Each argument pair is indexed individually (matching pynenc's per-pair design).
38    pub(crate) cc_index: HashMap<(String, String, String), HashSet<Arc<str>>>,
39    /// Reverse lookup: invocation_id -> list of (task_id, arg_key, arg_value) tuples
40    pub(crate) cc_reverse: HashMap<Arc<str>, Vec<(String, String, String)>>,
41    /// Retry counters: invocation_id -> retry count
42    pub(crate) retries: HashMap<Arc<str>, u32>,
43    /// Runner heartbeats: runner_id -> last heartbeat time
44    pub(crate) heartbeats: HashMap<Arc<str>, DateTime<Utc>>,
45    /// Runner creation times: runner_id -> first seen time
46    pub(crate) runner_created: HashMap<Arc<str>, DateTime<Utc>>,
47    /// Atomic service eligibility per runner
48    pub(crate) runner_atomic_eligible: HashMap<Arc<str>, bool>,
49    /// Last atomic service execution start time per runner
50    pub(crate) runner_last_service_start: HashMap<Arc<str>, DateTime<Utc>>,
51    /// Last atomic service execution end time per runner
52    pub(crate) runner_last_service_end: HashMap<Arc<str>, DateTime<Utc>>,
53    /// Full atomic service execution timeline (up to 200 most recent)
54    pub(crate) atomic_timeline: Vec<AtomicServiceExecution>,
55    /// Auto-purge schedule: invocation_id -> scheduled_at timestamp
56    pub(crate) auto_purge_queue: HashMap<Arc<str>, DateTime<Utc>>,
57}
58
59pub struct MemOrchestrator {
60    /// Internal state. Use query/mutation methods instead of direct access.
61    pub(crate) state: Mutex<OrchestratorState>,
62}
63
64impl MemOrchestrator {
65    pub fn new() -> Self {
66        Self {
67            state: Mutex::new(OrchestratorState {
68                invocations: HashMap::with_capacity(64),
69                status_records: HashMap::with_capacity(64),
70                task_invocations: HashMap::with_capacity(8),
71                call_invocations: HashMap::with_capacity(64),
72                waiting_for: HashMap::new(),
73                waiters: HashMap::new(),
74                cc_index: HashMap::with_capacity(64),
75                cc_reverse: HashMap::with_capacity(64),
76                retries: HashMap::new(),
77                heartbeats: HashMap::with_capacity(4),
78                runner_created: HashMap::with_capacity(4),
79                runner_atomic_eligible: HashMap::with_capacity(4),
80                runner_last_service_start: HashMap::with_capacity(4),
81                runner_last_service_end: HashMap::with_capacity(4),
82                atomic_timeline: Vec::new(),
83                auto_purge_queue: HashMap::new(),
84            }),
85        }
86    }
87}
88
89impl Default for MemOrchestrator {
90    fn default() -> Self {
91        Self::new()
92    }
93}
94
95impl MemOrchestrator {
96    /// Backdate the status record timestamp for a given invocation.
97    ///
98    /// Used in integration tests to simulate stale invocations for recovery
99    /// testing. Not intended for use in production code.
100    pub async fn backdate_status_for_testing(
101        &self,
102        inv_id: &InvocationId,
103        offset: chrono::Duration,
104    ) {
105        let mut state = self.state.lock().await;
106        if let Some(record) = state.status_records.get_mut(inv_id.as_str()) {
107            record.timestamp = chrono::Utc::now() - offset;
108        }
109    }
110}