Skip to main content

oxigdal_workflow/integrations/
airflow.rs

1//! Apache Airflow integration.
2
3use crate::engine::WorkflowDefinition;
4use crate::error::{Result, WorkflowError};
5
/// Apache Airflow integration.
///
/// A stateless namespace type: all functionality is exposed through
/// associated functions (for example exporting a workflow as Airflow DAG
/// Python source), so no instance data is stored.
#[derive(Debug, Clone, Copy, Default)]
pub struct AirflowIntegration;
8
9impl AirflowIntegration {
10    /// Export workflow to Airflow DAG format (Python).
11    pub fn export_workflow(workflow: &WorkflowDefinition) -> Result<String> {
12        let mut python_code = String::new();
13
14        // Add imports
15        python_code.push_str("from airflow import DAG\n");
16        python_code.push_str("from airflow.operators.python import PythonOperator\n");
17        python_code.push_str("from datetime import datetime, timedelta\n\n");
18
19        // Define default args
20        python_code.push_str("default_args = {\n");
21        python_code.push_str("    'owner': 'oxigdal',\n");
22        python_code.push_str("    'depends_on_past': False,\n");
23        python_code.push_str("    'retries': 1,\n");
24        python_code.push_str("    'retry_delay': timedelta(minutes=5),\n");
25        python_code.push_str("}\n\n");
26
27        // Define DAG
28        python_code.push_str(&format!(
29            "dag = DAG(\n    '{}',\n    default_args=default_args,\n",
30            Self::sanitize_id(&workflow.id)
31        ));
32        python_code.push_str(&format!(
33            "    description='{}',\n",
34            workflow.description.as_deref().unwrap_or("")
35        ));
36        python_code.push_str("    schedule_interval=None,\n");
37        python_code.push_str("    start_date=datetime(2024, 1, 1),\n");
38        python_code.push_str(")\n\n");
39
40        // Define tasks
41        for (idx, _task) in workflow.dag.tasks().iter().enumerate() {
42            python_code.push_str(&format!("task{} = PythonOperator(\n", idx));
43            python_code.push_str(&format!("    task_id='task_{}',\n", idx));
44            python_code.push_str("    python_callable=lambda: print('Task executed'),\n");
45            python_code.push_str("    dag=dag,\n");
46            python_code.push_str(")\n\n");
47        }
48
49        // Define dependencies
50        if !workflow.dag.dependency_count() == 0 {
51            python_code.push_str("# Define task dependencies\n");
52            // Dependencies would be mapped here
53        }
54
55        Ok(python_code)
56    }
57
58    /// Import workflow from Airflow DAG.
59    pub fn import_workflow(_dag_code: &str) -> Result<WorkflowDefinition> {
60        Err(WorkflowError::integration(
61            "airflow",
62            "Import from Airflow not yet implemented",
63        ))
64    }
65
66    /// Sanitize ID for Airflow compatibility.
67    fn sanitize_id(id: &str) -> String {
68        id.replace(['-', ' '], "_")
69    }
70
71    /// Trigger an Airflow DAG via REST API.
72    #[cfg(feature = "integrations")]
73    pub async fn trigger_dag(
74        base_url: &str,
75        dag_id: &str,
76        api_key: Option<&str>,
77    ) -> Result<String> {
78        use reqwest::Client;
79
80        let url = format!("{}/api/v1/dags/{}/dagRuns", base_url, dag_id);
81        let client = Client::new();
82
83        let mut request = client
84            .post(&url)
85            .header("Content-Type", "application/json")
86            .json(&serde_json::json!({
87                "conf": {}
88            }));
89
90        if let Some(key) = api_key {
91            request = request.bearer_auth(key);
92        }
93
94        let response = request
95            .send()
96            .await
97            .map_err(|e| WorkflowError::integration("airflow", format!("Request failed: {}", e)))?;
98
99        let body = response.text().await.map_err(|e| {
100            WorkflowError::integration("airflow", format!("Failed to read response: {}", e))
101        })?;
102
103        Ok(body)
104    }
105}
106
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dag::WorkflowDag;

    /// Exporting a workflow built on `WorkflowDag::new()` succeeds and the
    /// output contains the Airflow import plus the sanitized workflow ID.
    #[test]
    fn test_export_to_airflow() {
        let definition = WorkflowDefinition {
            id: String::from("test-workflow"),
            name: String::from("Test Workflow"),
            description: Some(String::from("Test description")),
            version: String::from("1.0.0"),
            dag: WorkflowDag::new(),
        };

        let exported = AirflowIntegration::export_workflow(&definition);
        assert!(exported.is_ok());

        let generated = exported.expect("Failed to export");
        assert!(generated.contains("from airflow import DAG"));
        assert!(generated.contains("test_workflow"));
    }

    /// Dashes in an ID are turned into underscores.
    #[test]
    fn test_sanitize_id() {
        let sanitized = AirflowIntegration::sanitize_id("test-workflow-id");
        assert_eq!(sanitized, "test_workflow_id");
    }
}