rustvello_mem/orchestrator/
mod.rs1mod blocking;
2mod concurrency;
3mod query;
4mod recovery;
5mod status;
6
7#[cfg(test)]
8mod tests;
9
10use std::collections::{HashMap, HashSet};
11use std::sync::Arc;
12use tokio::sync::Mutex;
13
14use chrono::{DateTime, Utc};
15
16use rustvello_core::orchestrator::AtomicServiceExecution;
17use rustvello_proto::identifiers::InvocationId;
18use rustvello_proto::invocation::InvocationDTO;
19use rustvello_proto::status::InvocationStatusRecord;
20
21pub struct OrchestratorState {
26 pub invocations: HashMap<Arc<str>, InvocationDTO>,
27 pub status_records: HashMap<Arc<str>, InvocationStatusRecord>,
28 pub(crate) task_invocations: HashMap<String, HashSet<Arc<str>>>,
30 pub(crate) call_invocations: HashMap<String, HashSet<Arc<str>>>,
32 pub(crate) waiting_for: HashMap<Arc<str>, Arc<str>>,
34 pub(crate) waiters: HashMap<Arc<str>, HashSet<Arc<str>>>,
36 pub(crate) cc_index: HashMap<(String, String, String), HashSet<Arc<str>>>,
39 pub(crate) cc_reverse: HashMap<Arc<str>, Vec<(String, String, String)>>,
41 pub(crate) retries: HashMap<Arc<str>, u32>,
43 pub(crate) heartbeats: HashMap<Arc<str>, DateTime<Utc>>,
45 pub(crate) runner_created: HashMap<Arc<str>, DateTime<Utc>>,
47 pub(crate) runner_atomic_eligible: HashMap<Arc<str>, bool>,
49 pub(crate) runner_last_service_start: HashMap<Arc<str>, DateTime<Utc>>,
51 pub(crate) runner_last_service_end: HashMap<Arc<str>, DateTime<Utc>>,
53 pub(crate) atomic_timeline: Vec<AtomicServiceExecution>,
55 pub(crate) auto_purge_queue: HashMap<Arc<str>, DateTime<Utc>>,
57}
58
59pub struct MemOrchestrator {
60 pub(crate) state: Mutex<OrchestratorState>,
62}
63
64impl MemOrchestrator {
65 pub fn new() -> Self {
66 Self {
67 state: Mutex::new(OrchestratorState {
68 invocations: HashMap::with_capacity(64),
69 status_records: HashMap::with_capacity(64),
70 task_invocations: HashMap::with_capacity(8),
71 call_invocations: HashMap::with_capacity(64),
72 waiting_for: HashMap::new(),
73 waiters: HashMap::new(),
74 cc_index: HashMap::with_capacity(64),
75 cc_reverse: HashMap::with_capacity(64),
76 retries: HashMap::new(),
77 heartbeats: HashMap::with_capacity(4),
78 runner_created: HashMap::with_capacity(4),
79 runner_atomic_eligible: HashMap::with_capacity(4),
80 runner_last_service_start: HashMap::with_capacity(4),
81 runner_last_service_end: HashMap::with_capacity(4),
82 atomic_timeline: Vec::new(),
83 auto_purge_queue: HashMap::new(),
84 }),
85 }
86 }
87}
88
89impl Default for MemOrchestrator {
90 fn default() -> Self {
91 Self::new()
92 }
93}
94
95impl MemOrchestrator {
96 pub async fn backdate_status_for_testing(
101 &self,
102 inv_id: &InvocationId,
103 offset: chrono::Duration,
104 ) {
105 let mut state = self.state.lock().await;
106 if let Some(record) = state.status_records.get_mut(inv_id.as_str()) {
107 record.timestamp = chrono::Utc::now() - offset;
108 }
109 }
110}