use std::{collections::BTreeMap, path::PathBuf, sync::Arc, time::Instant};
use dashmap::DashMap;
use dora_core::{
build::{BuildInfo, GitManager},
uhlc::HLC,
};
use dora_message::{
BuildId, DataflowId, SessionId,
common::{DaemonId, NodeError},
daemon_to_coordinator::{CoordinatorNotifyClient, DataflowDaemonResult},
id::NodeId,
node_to_daemon::Timestamped,
tarpc,
};
use tokio::sync::{Mutex, mpsc};
use uuid::Uuid;
use crate::{Event, InterDaemonEvent, RunningDataflow};
/// Shared state of a running daemon.
///
/// Mutable collections are `DashMap`s or `tokio::sync::Mutex`es, and the
/// one-time fields are `OnceLock`s. As a result, the state is updated through
/// `&self` (see e.g. `finish_dataflow`).
///
/// NOTE: Rust drops fields in declaration order, so reordering the fields
/// below changes teardown order.
pub(crate) struct DaemonState {
    /// Hybrid logical clock; used e.g. to timestamp the `DataflowDaemonResult`
    /// built in `finish_dataflow`.
    pub(crate) clock: Arc<HLC>,
    /// This daemon's ID. It is written at most once (later writes are ignored)
    /// and read via `daemon_id()`, which panics while it is still unset.
    daemon_id: std::sync::OnceLock<DaemonId>,
    /// Sender half of the daemon's event channel.
    /// NOTE(review): presumably drained by the main daemon event loop — confirm.
    pub(crate) events_tx: mpsc::Sender<Timestamped<Event>>,
    /// Dataflows known to be running on this daemon; `finish_dataflow`
    /// removes the entry.
    pub(crate) running: DashMap<DataflowId, RunningDataflow>,
    /// Working directory associated with each dataflow.
    pub(crate) working_dir: DashMap<DataflowId, PathBuf>,
    /// Per-dataflow, per-node results; `finish_dataflow` copies the entry
    /// into the result it reports to the coordinator.
    pub(crate) dataflow_node_results: DashMap<DataflowId, BTreeMap<NodeId, Result<(), NodeError>>>,
    /// Maps a session ID to its associated build ID.
    pub(crate) sessions: DashMap<SessionId, BuildId>,
    /// Build information keyed by build ID (pre-populated by `new_standalone`).
    pub(crate) builds: DashMap<BuildId, BuildInfo>,
    /// RPC client used to notify the coordinator. It is set at most once; while
    /// unset, `finish_dataflow` skips the coordinator notification.
    coordinator_client: std::sync::OnceLock<CoordinatorNotifyClient>,
    /// Instant of the last coordinator heartbeat; initialised to the
    /// construction time.
    /// NOTE(review): presumably refreshed elsewhere when a heartbeat arrives — confirm.
    pub(crate) last_coordinator_heartbeat: Mutex<Instant>,
    /// Git clone bookkeeping; `finish_dataflow` removes a finished dataflow
    /// from every `clones_in_use` entry.
    pub(crate) git_manager: Mutex<GitManager>,
    /// Optional zenoh session (always `Some` when built via `new_standalone`).
    /// NOTE(review): presumably used for inter-daemon communication — confirm.
    pub(crate) zenoh_session: Option<zenoh::Session>,
    /// Optional sender for inter-daemon events (`None` when built via
    /// `new_standalone`).
    /// NOTE(review): presumably forwards events received from remote daemons — confirm.
    pub(crate) remote_daemon_events_tx:
        Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
}
impl DaemonState {
pub(crate) fn new(
clock: Arc<HLC>,
events_tx: mpsc::Sender<Timestamped<Event>>,
zenoh_session: Option<zenoh::Session>,
remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
) -> Self {
Self {
clock,
daemon_id: std::sync::OnceLock::new(),
events_tx,
running: Default::default(),
working_dir: Default::default(),
dataflow_node_results: Default::default(),
sessions: Default::default(),
builds: Default::default(),
coordinator_client: std::sync::OnceLock::new(),
last_coordinator_heartbeat: Mutex::new(Instant::now()),
git_manager: Mutex::new(Default::default()),
zenoh_session,
remote_daemon_events_tx,
}
}
pub(crate) fn new_standalone(
clock: Arc<HLC>,
daemon_id: DaemonId,
events_tx: mpsc::Sender<Timestamped<Event>>,
zenoh_session: zenoh::Session,
builds: BTreeMap<BuildId, BuildInfo>,
) -> Self {
let state = Self {
clock,
daemon_id: std::sync::OnceLock::new(),
events_tx,
running: Default::default(),
working_dir: Default::default(),
dataflow_node_results: Default::default(),
sessions: Default::default(),
builds: {
let map = DashMap::new();
for (k, v) in builds {
map.insert(k, v);
}
map
},
coordinator_client: std::sync::OnceLock::new(),
last_coordinator_heartbeat: Mutex::new(Instant::now()),
git_manager: Mutex::new(Default::default()),
zenoh_session: Some(zenoh_session),
remote_daemon_events_tx: None,
};
let _ = state.daemon_id.set(daemon_id);
state
}
pub(crate) fn set_daemon_id(&self, id: DaemonId) {
let _ = self.daemon_id.set(id);
}
pub(crate) fn daemon_id(&self) -> &DaemonId {
self.daemon_id
.get()
.expect("daemon_id accessed before registration")
}
pub(crate) fn set_coordinator_client(&self, client: CoordinatorNotifyClient) {
let _ = self.coordinator_client.set(client);
}
pub(crate) fn coordinator_client(&self) -> Option<&CoordinatorNotifyClient> {
self.coordinator_client.get()
}
pub(crate) async fn finish_dataflow(&self, dataflow_id: DataflowId) -> eyre::Result<()> {
let result = DataflowDaemonResult {
timestamp: self.clock.new_timestamp(),
node_results: self
.dataflow_node_results
.get(&dataflow_id)
.map(|entry| entry.value().clone())
.unwrap_or_default(),
};
{
let mut git_manager = self.git_manager.lock().await;
git_manager
.clones_in_use
.values_mut()
.for_each(|dataflows| {
dataflows.remove(&dataflow_id);
});
}
if let Some(client) = self.coordinator_client.get() {
let client = client.clone();
tokio::spawn(async move {
let _ = client
.all_nodes_finished(tarpc::context::current(), dataflow_id, result)
.await;
});
}
self.running.remove(&dataflow_id);
Ok(())
}
}