Skip to main content

oxigdal_workflow/integrations/
prefect.rs

//! Prefect integration: exports workflow definitions as Prefect flow scripts (Python)
//! and triggers flow runs over HTTP.
2
3use crate::engine::WorkflowDefinition;
4use crate::error::{Result, WorkflowError};
5
/// Prefect integration.
///
/// Stateless namespace type: all functionality is exposed through associated
/// functions, so no instance is required to use it.
#[derive(Debug, Clone, Copy, Default)]
pub struct PrefectIntegration;
8
9impl PrefectIntegration {
10    /// Export workflow to Prefect flow format (Python).
11    pub fn export_workflow(workflow: &WorkflowDefinition) -> Result<String> {
12        let mut python_code = String::new();
13
14        // Add imports
15        python_code.push_str("from prefect import flow, task\n");
16        python_code.push_str("from datetime import timedelta\n\n");
17
18        // Define tasks
19        for (idx, _task) in workflow.dag.tasks().iter().enumerate() {
20            python_code.push_str(&format!("@task(name='task_{}')\n", idx));
21            python_code.push_str(&format!("def task_{}():\n", idx));
22            python_code.push_str("    print('Task executed')\n");
23            python_code.push_str("    return True\n\n");
24        }
25
26        // Define flow
27        python_code.push_str(&format!("@flow(name='{}')\n", workflow.name));
28        python_code.push_str(&format!("def {}():\n", Self::sanitize_id(&workflow.id)));
29
30        for (idx, _task) in workflow.dag.tasks().iter().enumerate() {
31            python_code.push_str(&format!("    result_{} = task_{}()\n", idx, idx));
32        }
33
34        python_code.push('\n');
35        python_code.push_str("if __name__ == '__main__':\n");
36        python_code.push_str(&format!("    {}()\n", Self::sanitize_id(&workflow.id)));
37
38        Ok(python_code)
39    }
40
41    /// Import workflow from Prefect flow.
42    pub fn import_workflow(_flow_code: &str) -> Result<WorkflowDefinition> {
43        Err(WorkflowError::integration(
44            "prefect",
45            "Import from Prefect not yet implemented",
46        ))
47    }
48
49    /// Sanitize ID for Prefect compatibility.
50    fn sanitize_id(id: &str) -> String {
51        id.replace(['-', ' '], "_")
52    }
53
54    /// Trigger a Prefect flow via API.
55    #[cfg(feature = "integrations")]
56    pub async fn trigger_flow(
57        base_url: &str,
58        flow_id: &str,
59        api_key: Option<&str>,
60    ) -> Result<String> {
61        use reqwest::Client;
62
63        let url = format!("{}/api/flows/{}/runs", base_url, flow_id);
64        let client = Client::new();
65
66        let mut request = client
67            .post(&url)
68            .header("Content-Type", "application/json")
69            .json(&serde_json::json!({
70                "parameters": {}
71            }));
72
73        if let Some(key) = api_key {
74            request = request.bearer_auth(key);
75        }
76
77        let response = request
78            .send()
79            .await
80            .map_err(|e| WorkflowError::integration("prefect", format!("Request failed: {}", e)))?;
81
82        let body = response.text().await.map_err(|e| {
83            WorkflowError::integration("prefect", format!("Failed to read response: {}", e))
84        })?;
85
86        Ok(body)
87    }
88}
89
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dag::WorkflowDag;

    /// Exporting a workflow with an empty DAG yields a script with the Prefect
    /// imports, the decorated flow function, and the `__main__` entry point.
    #[test]
    fn test_export_to_prefect() {
        let workflow = WorkflowDefinition {
            id: "test-workflow".to_string(),
            name: "Test Workflow".to_string(),
            description: None,
            version: "1.0.0".to_string(),
            dag: WorkflowDag::new(),
        };

        let python_code =
            PrefectIntegration::export_workflow(&workflow).expect("Failed to export");

        assert!(python_code.contains("from prefect import flow, task"));
        assert!(python_code.contains("@flow(name='Test Workflow')"));
        // The flow function is named after the sanitized workflow ID.
        assert!(python_code.contains("def test_workflow():"));
        assert!(python_code.contains("if __name__ == '__main__':"));
        assert!(python_code.contains("    test_workflow()\n"));
    }

    /// Import is a stub and must report an error rather than a workflow.
    #[test]
    fn test_import_not_implemented() {
        assert!(PrefectIntegration::import_workflow("").is_err());
    }

    /// Hyphens and spaces are replaced with underscores.
    #[test]
    fn test_sanitize_id() {
        assert_eq!(
            PrefectIntegration::sanitize_id("test-workflow-id"),
            "test_workflow_id"
        );
        assert_eq!(PrefectIntegration::sanitize_id("my flow"), "my_flow");
    }
}
120}