// oxigdal_workflow/integrations/airflow.rs
use crate::engine::WorkflowDefinition;
use crate::error::{Result, WorkflowError};

/// Bridge between oxigdal workflow definitions and Apache Airflow DAGs.
///
/// A stateless namespace type: every method is an associated function, so
/// the struct itself carries no data (zero-sized unit struct).
pub struct AirflowIntegration;
8
9impl AirflowIntegration {
10 pub fn export_workflow(workflow: &WorkflowDefinition) -> Result<String> {
12 let mut python_code = String::new();
13
14 python_code.push_str("from airflow import DAG\n");
16 python_code.push_str("from airflow.operators.python import PythonOperator\n");
17 python_code.push_str("from datetime import datetime, timedelta\n\n");
18
19 python_code.push_str("default_args = {\n");
21 python_code.push_str(" 'owner': 'oxigdal',\n");
22 python_code.push_str(" 'depends_on_past': False,\n");
23 python_code.push_str(" 'retries': 1,\n");
24 python_code.push_str(" 'retry_delay': timedelta(minutes=5),\n");
25 python_code.push_str("}\n\n");
26
27 python_code.push_str(&format!(
29 "dag = DAG(\n '{}',\n default_args=default_args,\n",
30 Self::sanitize_id(&workflow.id)
31 ));
32 python_code.push_str(&format!(
33 " description='{}',\n",
34 workflow.description.as_deref().unwrap_or("")
35 ));
36 python_code.push_str(" schedule_interval=None,\n");
37 python_code.push_str(" start_date=datetime(2024, 1, 1),\n");
38 python_code.push_str(")\n\n");
39
40 for (idx, _task) in workflow.dag.tasks().iter().enumerate() {
42 python_code.push_str(&format!("task{} = PythonOperator(\n", idx));
43 python_code.push_str(&format!(" task_id='task_{}',\n", idx));
44 python_code.push_str(" python_callable=lambda: print('Task executed'),\n");
45 python_code.push_str(" dag=dag,\n");
46 python_code.push_str(")\n\n");
47 }
48
49 if !workflow.dag.dependency_count() == 0 {
51 python_code.push_str("# Define task dependencies\n");
52 }
54
55 Ok(python_code)
56 }
57
58 pub fn import_workflow(_dag_code: &str) -> Result<WorkflowDefinition> {
60 Err(WorkflowError::integration(
61 "airflow",
62 "Import from Airflow not yet implemented",
63 ))
64 }
65
66 fn sanitize_id(id: &str) -> String {
68 id.replace(['-', ' '], "_")
69 }
70
71 #[cfg(feature = "integrations")]
73 pub async fn trigger_dag(
74 base_url: &str,
75 dag_id: &str,
76 api_key: Option<&str>,
77 ) -> Result<String> {
78 use reqwest::Client;
79
80 let url = format!("{}/api/v1/dags/{}/dagRuns", base_url, dag_id);
81 let client = Client::new();
82
83 let mut request = client
84 .post(&url)
85 .header("Content-Type", "application/json")
86 .json(&serde_json::json!({
87 "conf": {}
88 }));
89
90 if let Some(key) = api_key {
91 request = request.bearer_auth(key);
92 }
93
94 let response = request
95 .send()
96 .await
97 .map_err(|e| WorkflowError::integration("airflow", format!("Request failed: {}", e)))?;
98
99 let body = response.text().await.map_err(|e| {
100 WorkflowError::integration("airflow", format!("Failed to read response: {}", e))
101 })?;
102
103 Ok(body)
104 }
105}
106
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dag::WorkflowDag;

    /// Exporting an empty workflow must succeed and include the Airflow
    /// import plus the sanitized (underscored) dag id.
    #[test]
    fn test_export_to_airflow() {
        let workflow = WorkflowDefinition {
            id: "test-workflow".to_string(),
            name: "Test Workflow".to_string(),
            description: Some("Test description".to_string()),
            version: "1.0.0".to_string(),
            dag: WorkflowDag::new(),
        };

        let result = AirflowIntegration::export_workflow(&workflow);
        assert!(result.is_ok());

        let python_code = result.expect("Failed to export");
        assert!(python_code.contains("from airflow import DAG"));
        assert!(python_code.contains("test_workflow"));
    }

    /// Hyphens (and spaces) are replaced with underscores.
    #[test]
    fn test_sanitize_id() {
        assert_eq!(
            AirflowIntegration::sanitize_id("test-workflow-id"),
            "test_workflow_id"
        );
    }
}