use std::sync::{Arc, Mutex};
use crate::airflow::{client::AirFlowClient, model::dag::Dag};
use super::model::popup::taskinstances::mark::MarkState as taskMarkState;
use super::{model::popup::dagruns::mark::MarkState, state::App};
use anyhow::Result;
use futures::future::join_all;
use log::debug;
use tokio::sync::mpsc::Receiver;
use url::{form_urlencoded, Url};
pub struct Worker {
app: Arc<Mutex<App>>,
client: Option<AirFlowClient>,
rx_worker: Receiver<WorkerMessage>,
}
#[derive(Debug)]
pub enum WorkerMessage {
ConfigSelected(usize),
UpdateDags,
ToggleDag {
dag_id: String,
is_paused: bool,
},
UpdateDagRuns {
dag_id: String,
clear: bool,
},
UpdateTaskInstances {
dag_id: String,
dag_run_id: String,
clear: bool,
},
GetDagCode {
dag_id: String,
},
UpdateDagStats {
clear: bool,
},
ClearDagRun {
dag_run_id: String,
dag_id: String,
},
UpdateTaskLogs {
dag_id: String,
dag_run_id: String,
task_id: String,
task_try: u16,
clear: bool,
},
MarkDagRun {
dag_run_id: String,
dag_id: String,
status: MarkState,
},
ClearTaskInstance {
task_id: String,
dag_id: String,
dag_run_id: String,
},
MarkTaskInstance {
task_id: String,
dag_id: String,
dag_run_id: String,
status: taskMarkState,
},
TriggerDagRun {
dag_id: String,
},
OpenItem(OpenItem),
}
#[derive(Debug)]
pub enum OpenItem {
Config(String),
Dag {
dag_id: String,
},
DagRun {
dag_id: String,
dag_run_id: String,
},
TaskInstance {
dag_id: String,
dag_run_id: String,
task_id: String,
},
Log {
dag_id: String,
dag_run_id: String,
task_id: String,
#[allow(dead_code)]
task_try: u16,
},
}
impl Worker {
pub fn new(
app: Arc<Mutex<App>>,
client: Option<AirFlowClient>,
rx_worker: Receiver<WorkerMessage>,
) -> Self {
Worker {
app,
client,
rx_worker,
}
}
pub async fn process_message(&mut self, message: WorkerMessage) -> Result<()> {
if self.client.is_none() {
if let WorkerMessage::ConfigSelected(idx) = message {
self.switch_airflow_client(idx);
};
return Ok(());
}
let client = self.client.as_ref().unwrap();
match message {
WorkerMessage::UpdateDags => {
let dag_list = client.list_dags().await;
let mut app = self.app.lock().unwrap();
match dag_list {
Ok(dag_list) => {
app.dags.all = dag_list.dags;
app.dags.filter_dags();
}
Err(e) => app.dags.errors.push(e),
}
}
WorkerMessage::ToggleDag { dag_id, is_paused } => {
let dag = client.toggle_dag(&dag_id, is_paused).await;
if let Err(e) = dag {
let mut app = self.app.lock().unwrap();
app.dags.errors.push(e);
}
}
WorkerMessage::ConfigSelected(idx) => {
self.switch_airflow_client(idx);
}
WorkerMessage::UpdateDagRuns { dag_id, clear } => {
let dag_runs = client.list_dagruns(&dag_id).await;
let mut app = self.app.lock().unwrap();
if clear {
app.dagruns.dag_id = Some(dag_id);
}
match dag_runs {
Ok(dag_runs) => {
app.dagruns.all = dag_runs.dag_runs;
app.dagruns.filter_dag_runs();
}
Err(e) => app.dagruns.errors.push(e),
}
}
WorkerMessage::UpdateTaskInstances {
dag_id,
dag_run_id,
clear,
} => {
let task_instances = client.list_task_instances(&dag_id, &dag_run_id).await;
let mut app = self.app.lock().unwrap();
if clear {
app.task_instances.dag_run_id = Some(dag_run_id);
app.task_instances.dag_id = Some(dag_id);
}
match task_instances {
Ok(task_instances) => {
app.task_instances.all = task_instances.task_instances;
app.task_instances.filter_task_instances();
}
Err(e) => {
log::error!("Error getting task instances: {:?}", e);
app.task_instances.errors.push(e);
}
}
}
WorkerMessage::GetDagCode { dag_id } => {
let current_dag: Dag;
{
let app = self.app.lock().unwrap();
current_dag = app
.dags
.get_dag_by_id(&dag_id)
.expect("Dag not found")
.clone();
}
let dag_code = client.get_dag_code(¤t_dag.file_token).await;
let mut app = self.app.lock().unwrap();
match dag_code {
Ok(dag_code) => {
app.dagruns.dag_code.code = Some(dag_code);
}
Err(e) => app.dags.errors.push(e),
}
}
WorkerMessage::UpdateDagStats { clear } => {
let dag_ids = {
let app = self.app.lock().unwrap();
let dag_ids = app
.dags
.all
.iter()
.map(|dag| dag.dag_id.clone())
.collect::<Vec<_>>();
dag_ids
};
let dag_ids_str: Vec<&str> = dag_ids.iter().map(|s| s.as_str()).collect();
let dag_stats = client.get_dag_stats(dag_ids_str).await;
let mut app = self.app.lock().unwrap();
if clear {
app.dags.dag_stats = Default::default();
}
match dag_stats {
Ok(dag_stats) => {
for dag_stats in dag_stats.dags {
app.dags.dag_stats.insert(dag_stats.dag_id, dag_stats.stats);
}
}
Err(e) => app.dags.errors.push(e),
}
}
WorkerMessage::ClearDagRun { dag_run_id, dag_id } => {
debug!("Clearing dag_run: {}", dag_run_id);
let dag_run = client.clear_dagrun(&dag_id, &dag_run_id).await;
if let Err(e) = dag_run {
debug!("Error clearing dag_run: {}", e);
let mut app = self.app.lock().unwrap();
app.dagruns.errors.push(e);
}
}
WorkerMessage::UpdateTaskLogs {
dag_id,
dag_run_id,
task_id,
task_try,
clear,
} => {
debug!("Getting logs for task: {task_id}, try number {task_try}");
let logs = join_all(
(1..=task_try)
.map(|i| client.get_task_logs(&dag_id, &dag_run_id, &task_id, i))
.collect::<Vec<_>>(),
)
.await;
if clear {
let mut app = self.app.lock().unwrap();
app.logs.dag_id = Some(dag_id);
app.logs.dag_run_id = Some(dag_run_id);
app.logs.task_id = Some(task_id);
}
let mut app = self.app.lock().unwrap();
app.logs.all.clear();
for log in logs {
match log {
Ok(log) => {
app.logs.all.push(log);
}
Err(e) => {
debug!("Error getting logs: {}", e);
app.logs.errors.push(e);
}
}
}
}
WorkerMessage::MarkDagRun {
dag_run_id,
dag_id,
status,
} => {
debug!("Marking dag_run: {}", dag_run_id);
{
let mut app = self.app.lock().unwrap();
app.dagruns.mark_dag_run(&dag_run_id, &status.to_string());
}
let dag_run = client
.mark_dag_run(&dag_id, &dag_run_id, &status.to_string())
.await;
if let Err(e) = dag_run {
debug!("Error marking dag_run: {}", e);
let mut app = self.app.lock().unwrap();
app.dagruns.errors.push(e);
}
}
WorkerMessage::ClearTaskInstance {
task_id,
dag_id,
dag_run_id,
} => {
debug!("Clearing task_instance: {}", task_id);
let task_instance = client
.clear_task_instance(&dag_id, &dag_run_id, &task_id)
.await;
if let Err(e) = task_instance {
debug!("Error clearing task_instance: {}", e);
let mut app = self.app.lock().unwrap();
app.task_instances.errors.push(e);
}
}
WorkerMessage::MarkTaskInstance {
task_id,
dag_id,
dag_run_id,
status,
} => {
debug!("Marking task_instance: {}", task_id);
{
let mut app = self.app.lock().unwrap();
app.task_instances
.mark_task_instance(&task_id, &status.to_string());
}
let task_instance = client
.mark_task_instance(&dag_id, &dag_run_id, &task_id, &status.to_string())
.await;
if let Err(e) = task_instance {
debug!("Error marking task_instance: {}", e);
let mut app = self.app.lock().unwrap();
app.task_instances.errors.push(e);
}
}
WorkerMessage::TriggerDagRun { dag_id } => {
debug!("Triggering dag_run: {}", dag_id);
let dag_run = client.trigger_dag_run(&dag_id).await;
if let Err(e) = dag_run {
debug!("Error triggering dag_run: {}", e);
let mut app = self.app.lock().unwrap();
app.dagruns.errors.push(e);
}
}
WorkerMessage::OpenItem(item) => {
let mut base_url = Url::parse(&client.config.endpoint)?;
match item {
OpenItem::Config(config_endpoint) => {
base_url = config_endpoint.parse()?;
}
OpenItem::Dag { dag_id } => {
base_url = base_url.join(&format!("dags/{dag_id}"))?;
}
OpenItem::DagRun { dag_id, dag_run_id } => {
base_url = base_url.join(&format!("dags/{dag_id}/grid"))?;
let escaped_dag_run_id: String =
form_urlencoded::byte_serialize(dag_run_id.as_bytes()).collect();
base_url.set_query(Some(&format!("dag_run_id={escaped_dag_run_id}")));
}
OpenItem::TaskInstance {
dag_id,
dag_run_id,
task_id,
} => {
base_url = base_url.join(&format!("dags/{dag_id}/grid"))?;
let escaped_dag_run_id: String =
form_urlencoded::byte_serialize(dag_run_id.as_bytes()).collect();
base_url.set_query(Some(&format!(
"dag_run_id={escaped_dag_run_id}&task_id={task_id}"
)));
}
OpenItem::Log {
dag_id,
dag_run_id,
task_id,
task_try: _,
} => {
base_url = base_url.join(&format!("/dags/{dag_id}/grid"))?;
let escaped_dag_run_id: String =
form_urlencoded::byte_serialize(dag_run_id.as_bytes()).collect();
base_url.set_query(Some(&format!(
"dag_run_id={escaped_dag_run_id}&task_id={task_id}&tab=logs"
)));
}
}
webbrowser::open(base_url.as_str()).unwrap();
}
}
Ok(())
}
pub fn switch_airflow_client(&mut self, idx: usize) {
let selected_config = self.app.lock().unwrap().configs.filtered.items[idx].clone();
self.client = Some(AirFlowClient::from(&selected_config));
let mut app = self.app.lock().unwrap();
app.config.active_server = Some(selected_config.name.clone());
app.clear_state();
}
pub async fn run(&mut self) -> Result<()> {
loop {
if let Some(message) = self.rx_worker.recv().await {
self.process_message(message).await?;
}
}
}
}