use super::{
communication::{setup_comm, SetupCommunicationError, WorkerSender},
watchmap::WatchMap,
};
use crate::{
runtime::communication::CoordinatorWorkerComm, snapshot::SnapshotVersion, types::WorkerId,
};
use indexmap::{IndexMap, IndexSet};
use serde::{Deserialize, Serialize};
use thiserror::Error;
/// Phase of a single worker, stored in `WorkerState::phase`.
///
/// The type is serializable because worker states are part of
/// `SerializableCoordinatorState`. Depending on the serialization format,
/// renaming or reordering variants can change the serialized representation.
///
/// NOTE(review): the exact meaning of each phase is defined by the code that
/// sets it, which is not visible in this file; the variant names are the only
/// description available here.
#[derive(Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub(super) enum WorkerPhase {
    /// Default phase: `WorkerPhase::default()` returns this variant.
    #[default]
    Unknown,
    BuildComplete,
    Running,
    Snapshotting,
    Reconfiguring,
    Suspended,
    Completed,
}
/// State recorded for a single worker: its phase and an optional snapshot version.
///
/// The derived `Default` yields the default `WorkerPhase` and no snapshot version.
#[derive(Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub(super) struct WorkerState {
/// Phase of the worker.
pub phase: WorkerPhase,
/// Snapshot version attached to this worker, if any.
/// NOTE(review): presumably the version of the snapshot the worker last took
/// or is currently taking — confirm against the code that sets it.
pub snapshot_version: Option<SnapshotVersion>,
}
/// Runtime state held by the coordinator about its workers.
///
/// It is built from persisted data with `CoordinatorState::from_serialized` and
/// converted back with `CoordinatorState::get_serializable`.
#[derive(Clone)]
pub(crate) struct CoordinatorState {
/// Per-worker state, keyed by worker id.
pub(super) worker_states: WatchMap<WorkerId, WorkerState>,
/// One `WorkerSender` per active worker, keyed by worker id.
/// Only the keys of this map are kept when the state is serialized.
pub(super) active_workers: IndexMap<WorkerId, WorkerSender>,
/// Configuration version, if one is set.
/// NOTE(review): what `None` means is not visible in this file — confirm.
pub(super) config_version: Option<u64>,
}
impl CoordinatorState {
pub(crate) async fn from_serialized<C>(
ser: SerializableCoordinatorState,
comm: &C,
) -> Result<Self, FromSerializedError>
where
C: CoordinatorWorkerComm,
{
let worker_states = WatchMap::from(ser.worker_states);
let (active_workers, _recv_tasks) =
setup_comm(comm, &ser.active_workers, &worker_states).await?;
Ok(Self {
worker_states,
active_workers,
config_version: ser.config_version,
})
}
pub(crate) async fn get_serializable(&self) -> SerializableCoordinatorState {
SerializableCoordinatorState {
worker_states: self.worker_states.clone_inner_map().await,
active_workers: self.active_workers.keys().cloned().collect(),
config_version: self.config_version,
}
}
}
/// Error returned by `CoordinatorState::from_serialized`.
#[derive(Debug, Error)]
pub enum FromSerializedError {
/// Setting up communication with the workers failed. The underlying
/// `SetupCommunicationError` is converted via `From` and kept as the source.
#[error("Error setting up communication")]
SetupCommunication(#[from] SetupCommunicationError),
}
/// Serializable form of `CoordinatorState`.
///
/// It holds plain collections instead of the runtime `WatchMap` and worker
/// senders: for active workers only their ids are stored.
#[derive(Default, Serialize, Deserialize)]
pub(crate) struct SerializableCoordinatorState {
/// Per-worker state, keyed by worker id.
pub(super) worker_states: IndexMap<WorkerId, WorkerState>,
/// Ids of the workers that were active when the state was captured.
pub(super) active_workers: IndexSet<WorkerId>,
/// Configuration version, copied from `CoordinatorState::config_version`.
pub(super) config_version: Option<u64>,
}