mod blocking;
mod concurrency;
mod query;
mod recovery;
mod status;
#[cfg(test)]
mod tests;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::Mutex;
use chrono::{DateTime, Utc};
use rustvello_core::orchestrator::AtomicServiceExecution;
use rustvello_proto::identifiers::InvocationId;
use rustvello_proto::invocation::InvocationDTO;
use rustvello_proto::status::InvocationStatusRecord;
/// Mutable bookkeeping tables owned by `MemOrchestrator` and guarded by its mutex.
///
/// `status_records` is looked up by invocation id string (see
/// `backdate_status_for_testing`). The meaning of the remaining tables is not
/// established in this file; the per-field notes below are reviewer assumptions
/// drawn from names and types and should be confirmed against the submodules
/// (`blocking`, `concurrency`, `query`, `recovery`, `status`) that use them.
pub struct OrchestratorState {
/// Stored invocation DTOs. NOTE(review): presumably keyed by invocation id -- confirm.
pub invocations: HashMap<Arc<str>, InvocationDTO>,
/// Status record per invocation, keyed by the invocation id string.
pub status_records: HashMap<Arc<str>, InvocationStatusRecord>,
/// NOTE(review): looks like task id -> set of invocation ids; verify against the writer.
pub(crate) task_invocations: HashMap<String, HashSet<Arc<str>>>,
/// NOTE(review): looks like call id -> set of invocation ids; verify against the writer.
pub(crate) call_invocations: HashMap<String, HashSet<Arc<str>>>,
/// NOTE(review): presumably waiter id -> the id it is blocked on; confirm in `blocking`.
pub(crate) waiting_for: HashMap<Arc<str>, Arc<str>>,
/// NOTE(review): presumably the reverse of `waiting_for` (id -> ids waiting on it); confirm.
pub(crate) waiters: HashMap<Arc<str>, HashSet<Arc<str>>>,
/// Index from a three-string key to a set of ids.
/// NOTE(review): "cc" presumably stands for concurrency control -- confirm in `concurrency`.
pub(crate) cc_index: HashMap<(String, String, String), HashSet<Arc<str>>>,
/// NOTE(review): presumably the reverse of `cc_index` (id -> keys it is filed under); confirm.
pub(crate) cc_reverse: HashMap<Arc<str>, Vec<(String, String, String)>>,
/// A `u32` counter per key. NOTE(review): presumably retry count per invocation -- confirm.
pub(crate) retries: HashMap<Arc<str>, u32>,
/// A UTC timestamp per key. NOTE(review): presumably last heartbeat per runner -- confirm.
pub(crate) heartbeats: HashMap<Arc<str>, DateTime<Utc>>,
/// A UTC timestamp per key. NOTE(review): presumably when each runner was first seen -- confirm.
pub(crate) runner_created: HashMap<Arc<str>, DateTime<Utc>>,
/// A boolean flag per key. NOTE(review): presumably whether a runner may run atomic services -- confirm.
pub(crate) runner_atomic_eligible: HashMap<Arc<str>, bool>,
/// A UTC timestamp per key. NOTE(review): presumably start of the runner's latest service run -- confirm.
pub(crate) runner_last_service_start: HashMap<Arc<str>, DateTime<Utc>>,
/// A UTC timestamp per key. NOTE(review): presumably end of the runner's latest service run -- confirm.
pub(crate) runner_last_service_end: HashMap<Arc<str>, DateTime<Utc>>,
/// Recorded atomic service executions. NOTE(review): ordering/retention policy not visible here -- confirm.
pub(crate) atomic_timeline: Vec<AtomicServiceExecution>,
/// A UTC timestamp per key. NOTE(review): presumably when each entry was queued for purge -- confirm.
pub(crate) auto_purge_queue: HashMap<Arc<str>, DateTime<Utc>>,
}
/// Orchestrator that keeps all of its state in process memory.
///
/// The whole `OrchestratorState` sits behind a single `tokio::sync::Mutex`,
/// so every access goes through `state.lock().await`.
pub struct MemOrchestrator {
/// The complete orchestrator state, guarded by one async mutex.
pub(crate) state: Mutex<OrchestratorState>,
}
impl MemOrchestrator {
    /// Builds an orchestrator whose state tables are all empty.
    ///
    /// Some tables are created with a small initial capacity; the others are
    /// created with `new()` and allocate only once something is inserted.
    pub fn new() -> Self {
        // Initial capacity hints only; every table still grows on demand.
        const LARGE: usize = 64;
        const MEDIUM: usize = 8;
        const SMALL: usize = 4;

        let initial = OrchestratorState {
            invocations: HashMap::with_capacity(LARGE),
            status_records: HashMap::with_capacity(LARGE),
            task_invocations: HashMap::with_capacity(MEDIUM),
            call_invocations: HashMap::with_capacity(LARGE),
            waiting_for: HashMap::new(),
            waiters: HashMap::new(),
            cc_index: HashMap::with_capacity(LARGE),
            cc_reverse: HashMap::with_capacity(LARGE),
            retries: HashMap::new(),
            heartbeats: HashMap::with_capacity(SMALL),
            runner_created: HashMap::with_capacity(SMALL),
            runner_atomic_eligible: HashMap::with_capacity(SMALL),
            runner_last_service_start: HashMap::with_capacity(SMALL),
            runner_last_service_end: HashMap::with_capacity(SMALL),
            atomic_timeline: Vec::new(),
            auto_purge_queue: HashMap::new(),
        };

        Self {
            state: Mutex::new(initial),
        }
    }
}
impl Default for MemOrchestrator {
fn default() -> Self {
Self::new()
}
}
impl MemOrchestrator {
    /// Test helper: rewrites the timestamp of an invocation's status record so
    /// that it appears to have been set `offset` before the current time.
    ///
    /// Acquires the state lock, looks the record up by `inv_id`, and sets its
    /// `timestamp` to `Utc::now() - offset`. If no status record exists for
    /// `inv_id`, nothing is changed.
    pub async fn backdate_status_for_testing(
        &self,
        inv_id: &InvocationId,
        offset: chrono::Duration,
    ) {
        let mut guard = self.state.lock().await;

        let entry = match guard.status_records.get_mut(inv_id.as_str()) {
            Some(found) => found,
            // Unknown invocation: leave the state untouched.
            None => return,
        };

        // The clock is read only once the record is known to exist, while the lock is held.
        entry.timestamp = chrono::Utc::now() - offset;
    }
}