use std::{
collections::{BTreeMap, BTreeSet},
path::PathBuf,
time::Duration,
};
use crate::{
BuildId, DataflowId, SessionId,
common::{DaemonId, GitSource},
descriptor::{Descriptor, ResolvedNode},
id::{NodeId, OperatorId},
};
pub use crate::common::Timestamped;
/// Outcome of a daemon registration attempt.
///
/// Serializable via serde so it can be exchanged as a message.
/// NOTE(review): presumably the reply sent to a daemon after it asked to
/// register — confirm against the sender/receiver of this type.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum RegisterResult {
/// Successful outcome.
Ok {
/// ID carried by the successful outcome (presumably the ID assigned to
/// the registering daemon — TODO confirm).
daemon_id: DaemonId,
},
/// Failed outcome; the payload is the error message as a plain string.
Err(String),
}
impl RegisterResult {
    /// Converts this registration outcome into an `eyre::Result`.
    ///
    /// Consumes `self` and yields the contained `DaemonId` for the `Ok`
    /// variant.
    ///
    /// # Errors
    ///
    /// For the `Err` variant, returns an `eyre` report created from the
    /// contained message string.
    pub fn to_result(self) -> eyre::Result<DaemonId> {
        // Handle the success case first so only the failure message remains.
        let message = match self {
            Self::Ok { daemon_id } => return Ok(daemon_id),
            Self::Err(message) => message,
        };
        Err(eyre::eyre!(message))
    }
}
/// Request payload of the `DaemonControl::build` RPC.
///
/// Field order is left as-is: it may be significant for non-self-describing
/// serde formats.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct BuildDataflowNodes {
/// Identifier of the build this request refers to.
pub build_id: BuildId,
/// Identifier of the session this request refers to.
pub session_id: SessionId,
/// Optional working directory path.
/// NOTE(review): presumably only meaningful when the path exists on the
/// receiving machine — confirm with the sender.
pub local_working_dir: Option<PathBuf>,
/// Git source per node, keyed by node ID.
pub git_sources: BTreeMap<NodeId, GitSource>,
/// Git source per node, keyed by node ID (presumably the sources of a
/// previous build, judging by the name — TODO confirm).
pub prev_git_sources: BTreeMap<NodeId, GitSource>,
/// Descriptor of the dataflow.
pub dataflow_descriptor: Descriptor,
/// Set of node IDs (presumably the nodes located on the receiving
/// daemon's machine — TODO confirm).
pub nodes_on_machine: BTreeSet<NodeId>,
/// Boolean flag (presumably enables use of the `uv` Python tool —
/// TODO confirm).
pub uv: bool,
}
/// Request payload of the `DaemonControl::spawn` RPC.
///
/// Field order is left as-is: it may be significant for non-self-describing
/// serde formats.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct SpawnDataflowNodes {
/// Optional build identifier (presumably `None` when the dataflow was not
/// built through a prior build request — TODO confirm).
pub build_id: Option<BuildId>,
/// Identifier of the session this request refers to.
pub session_id: SessionId,
/// Identifier of the dataflow this request refers to.
pub dataflow_id: DataflowId,
/// Optional working directory path.
/// NOTE(review): presumably only meaningful when the path exists on the
/// receiving machine — confirm with the sender.
pub local_working_dir: Option<PathBuf>,
/// Resolved node definitions, keyed by node ID.
pub nodes: BTreeMap<NodeId, ResolvedNode>,
/// Descriptor of the dataflow.
pub dataflow_descriptor: Descriptor,
/// Set of node IDs (presumably the subset of `nodes` that the receiving
/// daemon should spawn — TODO confirm).
pub spawn_nodes: BTreeSet<NodeId>,
/// Boolean flag (presumably enables use of the `uv` Python tool —
/// TODO confirm).
pub uv: bool,
/// Optional path (presumably a location to record events to —
/// TODO confirm).
pub write_events_to: Option<PathBuf>,
}
/// Result type used by the fallible `DaemonControl` RPCs; errors are carried
/// as plain `String` messages.
// NOTE(review): presumably a `String` is used so the error can be serialized
// as part of the RPC response — confirm.
type DaemonResult<T> = std::result::Result<T, String>;
/// RPC service definition for controlling a daemon.
///
/// The `#[tarpc::service]` attribute macro processes this trait.
/// Fallible calls report errors as `String` messages via `DaemonResult`.
/// NOTE(review): presumably the daemon implements this service and the
/// coordinator is the client — confirm against the call sites.
#[tarpc::service]
pub trait DaemonControl {
/// Build request described by a `BuildDataflowNodes` payload.
async fn build(request: BuildDataflowNodes) -> DaemonResult<()>;
/// Spawn request described by a `SpawnDataflowNodes` payload.
async fn spawn(request: SpawnDataflowNodes) -> DaemonResult<()>;
/// Notification for the given dataflow; has no return value.
///
/// `exited_before_subscribe` is a list of node IDs (presumably nodes that
/// exited before subscribing — TODO confirm exact semantics).
async fn all_nodes_ready(dataflow_id: DataflowId, exited_before_subscribe: Vec<NodeId>);
/// Stop request for the given dataflow.
///
/// Takes an optional `grace_duration` and a `force` flag.
/// NOTE(review): presumably the grace period before nodes are killed and
/// whether to skip graceful shutdown — confirm with the implementation.
async fn stop_dataflow(
dataflow_id: DataflowId,
grace_duration: Option<Duration>,
force: bool,
) -> DaemonResult<()>;
/// Reload request for a node of the given dataflow, optionally narrowed
/// to a single operator via `operator_id`.
async fn reload_dataflow(
dataflow_id: DataflowId,
node_id: NodeId,
operator_id: Option<OperatorId>,
) -> DaemonResult<()>;
/// Log request for a node of the given dataflow; returns raw bytes.
///
/// `tail` is optional (presumably limits output to the last N lines —
/// TODO confirm the unit).
async fn logs(
dataflow_id: DataflowId,
node_id: NodeId,
tail: Option<usize>,
) -> DaemonResult<Vec<u8>>;
/// Destroy request; takes no arguments.
async fn destroy() -> DaemonResult<()>;
/// Heartbeat call; takes no arguments and has no return value.
async fn heartbeat();
/// Returns the daemon's version information.
async fn get_version() -> DaemonVersionInfo;
}
/// Version information returned by the `DaemonControl::get_version` RPC.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DaemonVersionInfo {
/// Daemon version, as a string.
pub daemon_version: String,
/// Message format version, as a string (presumably used to check
/// protocol compatibility — TODO confirm).
pub message_format_version: String,
}