use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use dora_daemon::Daemon;
pub use dora_daemon::LogDestination;
use dora_message::common::LogMessage;
use dora_message::coordinator_to_cli::DataflowResult as DoraDataflowResult;
use dora_message::{BuildId, SessionId};
use eyre::{Context, Result};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::info;
use uuid::Uuid;
/// Selects which execution path `DoraRuntime::run` dispatches to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum RuntimeMode {
/// Run the configured dataflow file through `Daemon::run_dataflow` (the default).
#[default]
Embedded,
/// Run `Daemon::run` against the configured coordinator address.
Distributed,
}
/// Complete configuration for a `DoraRuntime`.
///
/// Both mode-specific sections are always present; `mode` decides which one
/// is consulted when the runtime is run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeConfig {
/// Execution path chosen by `DoraRuntime::run`.
pub mode: RuntimeMode,
/// Path of the dataflow file; an embedded run fails if it does not exist.
pub dataflow_path: PathBuf,
/// Settings read by the embedded execution path.
pub embedded: EmbeddedConfig,
/// Settings read by the distributed execution path.
pub distributed: DistributedConfig,
}
impl RuntimeConfig {
pub fn embedded<P: AsRef<Path>>(dataflow_path: P) -> Self {
Self {
mode: RuntimeMode::Embedded,
dataflow_path: dataflow_path.as_ref().to_path_buf(),
embedded: EmbeddedConfig::default(),
distributed: DistributedConfig::default(),
}
}
pub fn distributed<P: AsRef<Path>>(dataflow_path: P, coordinator_addr: SocketAddr) -> Self {
Self {
mode: RuntimeMode::Distributed,
dataflow_path: dataflow_path.as_ref().to_path_buf(),
embedded: EmbeddedConfig::default(),
distributed: DistributedConfig {
coordinator_addr,
..Default::default()
},
}
}
pub fn with_mode(mut self, mode: RuntimeMode) -> Self {
self.mode = mode;
self
}
pub fn with_uv(mut self, uv: bool) -> Self {
self.embedded.uv = uv;
self
}
pub fn with_events_output<P: AsRef<Path>>(mut self, path: P) -> Self {
self.embedded.write_events_to = Some(path.as_ref().to_path_buf());
self
}
pub fn with_log_destination(mut self, dest: LogDestinationType) -> Self {
self.embedded.log_destination = dest;
self
}
}
/// Settings used when the runtime executes in `RuntimeMode::Embedded`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmbeddedConfig {
/// Forwarded as the `uv` argument of `Daemon::run_dataflow`.
pub uv: bool,
/// Optional output path for events.
/// NOTE(review): no code visible in this file reads this field — confirm it is consumed elsewhere.
pub write_events_to: Option<PathBuf>,
/// Where log messages are routed during an embedded run.
pub log_destination: LogDestinationType,
/// When `Some`, the embedded run passes a build id to the daemon.
/// NOTE(review): the run path generates a fresh `BuildId` instead of using this UUID — confirm intent.
pub build_id: Option<Uuid>,
}
impl Default for EmbeddedConfig {
fn default() -> Self {
Self {
uv: false,
write_events_to: None,
log_destination: LogDestinationType::Channel,
build_id: None,
}
}
}
/// Serializable choice of log destination for an embedded run.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub enum LogDestinationType {
/// Mapped to `LogDestination::Tracing` when the dataflow is run.
Tracing,
/// Mapped to `LogDestination::Channel`, backed by a bounded `flume` channel
/// whose receiving half is exposed by `DoraRuntime` (the default).
#[default]
Channel,
}
/// Settings used when the runtime executes in `RuntimeMode::Distributed`.
///
/// The three fields are passed, in order, to `Daemon::run`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedConfig {
/// Address of the coordinator to connect to.
pub coordinator_addr: SocketAddr,
/// Optional machine identifier handed to the daemon.
pub machine_id: Option<String>,
/// Local listen port handed to the daemon.
pub local_listen_port: u16,
}
impl Default for DistributedConfig {
fn default() -> Self {
Self {
coordinator_addr: "127.0.0.1:5000".parse().unwrap(),
machine_id: None,
local_listen_port: 5001,
}
}
}
/// Coarse lifecycle state tracked by `DoraRuntime`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum RuntimeState {
/// Initial state of a freshly constructed runtime (the default).
#[default]
Created,
/// Set at the start of `DoraRuntime::run`.
Running,
/// Set after `DoraRuntime::run` finished with `Ok`.
Stopped,
/// Set after `DoraRuntime::run` finished with `Err`.
Error,
}
/// Outcome of a dataflow run, expressed with crate-local types.
#[derive(Debug, Clone)]
pub struct DataflowResult {
/// UUID reported for the dataflow (the nil UUID for distributed runs).
pub uuid: Uuid,
/// Per-node outcome, keyed by the node id's string form.
pub node_results: BTreeMap<String, NodeResult>,
/// `true` when every entry of `node_results` is `NodeResult::Success`
/// (which includes the case of an empty map).
pub success: bool,
}
/// Outcome of a single node within a dataflow run.
#[derive(Debug, Clone)]
pub enum NodeResult {
/// The node reported `Ok(())`.
Success,
/// The node reported an error; holds the error's `Debug` rendering.
Error(String),
}
impl From<DoraDataflowResult> for DataflowResult {
    /// Converts the coordinator-level result into the crate-local
    /// representation: node ids become strings, node errors are rendered
    /// with `Debug`, and `success` is derived from the converted entries.
    fn from(result: DoraDataflowResult) -> Self {
        let mut node_results: BTreeMap<String, NodeResult> = BTreeMap::new();
        for (node_id, outcome) in result.node_results {
            let converted = match outcome {
                Ok(()) => NodeResult::Success,
                Err(err) => NodeResult::Error(format!("{:?}", err)),
            };
            node_results.insert(node_id.to_string(), converted);
        }

        // Evaluated over the final map so it reflects exactly what is returned.
        let success = node_results
            .values()
            .all(|entry| matches!(entry, NodeResult::Success));

        Self {
            uuid: result.uuid,
            node_results,
            success,
        }
    }
}
pub struct DoraRuntime {
config: RuntimeConfig,
state: Arc<RwLock<RuntimeState>>,
log_receiver: Option<flume::Receiver<LogMessage>>,
}
impl DoraRuntime {
pub fn new(config: RuntimeConfig) -> Self {
Self {
config,
state: Arc::new(RwLock::new(RuntimeState::Created)),
log_receiver: None,
}
}
pub fn embedded<P: AsRef<Path>>(dataflow_path: P) -> Self {
Self::new(RuntimeConfig::embedded(dataflow_path))
}
pub fn distributed<P: AsRef<Path>>(dataflow_path: P, coordinator_addr: SocketAddr) -> Self {
Self::new(RuntimeConfig::distributed(dataflow_path, coordinator_addr))
}
pub async fn state(&self) -> RuntimeState {
*self.state.read().await
}
pub fn config(&self) -> &RuntimeConfig {
&self.config
}
pub fn log_receiver(&self) -> Option<&flume::Receiver<LogMessage>> {
self.log_receiver.as_ref()
}
pub fn take_log_receiver(&mut self) -> Option<flume::Receiver<LogMessage>> {
self.log_receiver.take()
}
pub async fn run(&mut self) -> Result<DataflowResult> {
*self.state.write().await = RuntimeState::Running;
let result = match self.config.mode {
RuntimeMode::Embedded => self.run_embedded().await,
RuntimeMode::Distributed => self.run_distributed().await,
};
match &result {
Ok(_) => *self.state.write().await = RuntimeState::Stopped,
Err(_) => *self.state.write().await = RuntimeState::Error,
}
result
}
async fn run_embedded(&mut self) -> Result<DataflowResult> {
let dataflow_path = &self.config.dataflow_path;
info!("Running dataflow in embedded mode: {:?}", dataflow_path);
if !dataflow_path.exists() {
return Err(eyre::eyre!("Dataflow file not found: {:?}", dataflow_path));
}
let session_id = SessionId::generate();
let (log_destination, log_rx) = match self.config.embedded.log_destination {
LogDestinationType::Tracing => (LogDestination::Tracing, None),
LogDestinationType::Channel => {
let (tx, rx) = flume::bounded(100);
(LogDestination::Channel { sender: tx }, Some(rx))
}
};
self.log_receiver = log_rx;
let build_id = self.config.embedded.build_id.map(|uuid| {
BuildId::generate()
});
let result = Daemon::run_dataflow(
dataflow_path,
build_id,
None, session_id,
self.config.embedded.uv,
log_destination,
)
.await
.context("Failed to run dataflow")?;
info!("Dataflow {} completed", result.uuid);
Ok(DataflowResult::from(result))
}
async fn run_distributed(&mut self) -> Result<DataflowResult> {
let coordinator_addr = self.config.distributed.coordinator_addr;
let machine_id = self.config.distributed.machine_id.clone();
let local_listen_port = self.config.distributed.local_listen_port;
info!(
"Connecting to dora-coordinator at {} in distributed mode",
coordinator_addr
);
Daemon::run(coordinator_addr, machine_id, local_listen_port)
.await
.context("Failed to run daemon in distributed mode")?;
Ok(DataflowResult {
uuid: Uuid::nil(),
node_results: BTreeMap::new(),
success: true,
})
}
}
/// Fluent builder that accumulates a `RuntimeConfig` and turns it into a
/// `DoraRuntime` via `build`.
pub struct DoraRuntimeBuilder {
config: RuntimeConfig,
}
impl DoraRuntimeBuilder {
    /// Starts a builder for `dataflow_path`, initially in embedded mode with
    /// default settings.
    pub fn new<P: AsRef<Path>>(dataflow_path: P) -> Self {
        let config = RuntimeConfig::embedded(dataflow_path);
        Self { config }
    }

    /// Selects embedded mode.
    pub fn embedded(self) -> Self {
        Self {
            config: self.config.with_mode(RuntimeMode::Embedded),
        }
    }

    /// Selects distributed mode and sets the coordinator address.
    pub fn distributed(self, coordinator_addr: SocketAddr) -> Self {
        let mut config = self.config.with_mode(RuntimeMode::Distributed);
        config.distributed.coordinator_addr = coordinator_addr;
        Self { config }
    }

    /// Sets the embedded `uv` flag.
    pub fn uv(self, uv: bool) -> Self {
        Self {
            config: self.config.with_uv(uv),
        }
    }

    /// Sets the embedded events output path.
    pub fn write_events_to<P: AsRef<Path>>(self, path: P) -> Self {
        Self {
            config: self.config.with_events_output(path),
        }
    }

    /// Sets the embedded log destination.
    pub fn log_destination(self, dest: LogDestinationType) -> Self {
        Self {
            config: self.config.with_log_destination(dest),
        }
    }

    /// Sets the machine id used in distributed mode.
    pub fn machine_id(self, id: String) -> Self {
        let Self { mut config } = self;
        config.distributed.machine_id = Some(id);
        Self { config }
    }

    /// Sets the local listen port used in distributed mode.
    pub fn local_listen_port(self, port: u16) -> Self {
        let Self { mut config } = self;
        config.distributed.local_listen_port = port;
        Self { config }
    }

    /// Consumes the builder and creates the runtime.
    pub fn build(self) -> DoraRuntime {
        let Self { config } = self;
        DoraRuntime::new(config)
    }
}
/// Runs the dataflow at `dataflow_path` in embedded mode with default
/// settings and returns its result.
pub async fn run_dataflow<P: AsRef<Path>>(dataflow_path: P) -> Result<DataflowResult> {
    DoraRuntime::embedded(dataflow_path).run().await
}
pub async fn run_dataflow_with_logs<P: AsRef<Path>>(dataflow_path: P) -> Result<DataflowResult> {
let config =
RuntimeConfig::embedded(dataflow_path).with_log_destination(LogDestinationType::Channel);
let mut runtime = DoraRuntime::new(config);
if let Some(rx) = runtime.take_log_receiver() {
tokio::spawn(async move {
while let Ok(msg) = rx.recv_async().await {
info!("[{:?}] {}", msg.level, msg.message);
}
});
}
let result = runtime.run().await;
result
}
#[cfg(test)]
mod tests {
    use super::*;

    // `RuntimeConfig::embedded` selects embedded mode and stores the path as given.
    #[test]
    fn test_runtime_config_embedded() {
        let RuntimeConfig {
            mode,
            dataflow_path,
            ..
        } = RuntimeConfig::embedded("test.yml");
        assert_eq!(mode, RuntimeMode::Embedded);
        assert_eq!(dataflow_path, PathBuf::from("test.yml"));
    }

    // `RuntimeConfig::distributed` selects distributed mode and keeps the coordinator address.
    #[test]
    fn test_runtime_config_distributed() {
        let coordinator: SocketAddr = "127.0.0.1:5000".parse().unwrap();
        let cfg = RuntimeConfig::distributed("test.yml", coordinator);
        assert_eq!(cfg.mode, RuntimeMode::Distributed);
        assert_eq!(cfg.distributed.coordinator_addr, coordinator);
    }

    // Builder settings end up in the built runtime's configuration.
    #[test]
    fn test_runtime_builder() {
        let built = DoraRuntimeBuilder::new("test.yml")
            .embedded()
            .uv(true)
            .log_destination(LogDestinationType::Channel)
            .build();
        let cfg = built.config();
        assert_eq!(cfg.mode, RuntimeMode::Embedded);
        assert!(cfg.embedded.uv);
    }

    // A freshly constructed runtime reports the `Created` state.
    #[tokio::test]
    async fn test_runtime_state() {
        let fresh = DoraRuntime::embedded("test.yml");
        let observed = fresh.state().await;
        assert_eq!(observed, RuntimeState::Created);
    }
}