oxigdal_workflow/integrations/
prefect.rs1use crate::engine::WorkflowDefinition;
4use crate::error::{Result, WorkflowError};
5
/// Namespace type for the Prefect integration.
///
/// The type carries no data; its functionality is exposed through associated
/// functions such as `PrefectIntegration::export_workflow`.
#[derive(Debug, Clone, Copy, Default)]
pub struct PrefectIntegration;
8
9impl PrefectIntegration {
10 pub fn export_workflow(workflow: &WorkflowDefinition) -> Result<String> {
12 let mut python_code = String::new();
13
14 python_code.push_str("from prefect import flow, task\n");
16 python_code.push_str("from datetime import timedelta\n\n");
17
18 for (idx, _task) in workflow.dag.tasks().iter().enumerate() {
20 python_code.push_str(&format!("@task(name='task_{}')\n", idx));
21 python_code.push_str(&format!("def task_{}():\n", idx));
22 python_code.push_str(" print('Task executed')\n");
23 python_code.push_str(" return True\n\n");
24 }
25
26 python_code.push_str(&format!("@flow(name='{}')\n", workflow.name));
28 python_code.push_str(&format!("def {}():\n", Self::sanitize_id(&workflow.id)));
29
30 for (idx, _task) in workflow.dag.tasks().iter().enumerate() {
31 python_code.push_str(&format!(" result_{} = task_{}()\n", idx, idx));
32 }
33
34 python_code.push('\n');
35 python_code.push_str("if __name__ == '__main__':\n");
36 python_code.push_str(&format!(" {}()\n", Self::sanitize_id(&workflow.id)));
37
38 Ok(python_code)
39 }
40
41 pub fn import_workflow(_flow_code: &str) -> Result<WorkflowDefinition> {
43 Err(WorkflowError::integration(
44 "prefect",
45 "Import from Prefect not yet implemented",
46 ))
47 }
48
/// Converts a workflow identifier into a name usable as a Python function name.
///
/// Every character other than an ASCII letter, an ASCII digit or `_` is
/// replaced by `_` (this includes `-` and the space character). When `id` is
/// empty or starts with an ASCII digit, a `_` is prepended, because a Python
/// identifier can neither be empty nor begin with a digit.
///
/// Python reserved words (for example `class`) are not detected.
fn sanitize_id(id: &str) -> String {
    // +1 leaves room for the optional leading underscore.
    let mut ident = String::with_capacity(id.len() + 1);

    // UUID-style ids frequently start with a digit; an empty id has no first char.
    let needs_prefix = id.chars().next().map_or(true, |first| first.is_ascii_digit());
    if needs_prefix {
        ident.push('_');
    }

    ident.extend(id.chars().map(|ch| {
        if ch.is_ascii_alphanumeric() || ch == '_' {
            ch
        } else {
            '_'
        }
    }));

    ident
}
53
/// Sends a `POST` request to `{base_url}/api/flows/{flow_id}/runs` with an empty
/// `parameters` object as JSON body and returns the response body as text.
///
/// # Arguments
///
/// * `base_url` - root URL of the server; trailing `/` characters are ignored
///   so the request path never contains a doubled slash.
/// * `flow_id` - inserted verbatim into the request path (it is not
///   percent-encoded, so callers must pass a path-safe value).
/// * `api_key` - when `Some`, sent as a bearer token.
///
/// # Errors
///
/// Returns a `prefect` integration error when the request cannot be sent, when
/// the response body cannot be read, or when the server answers with a
/// non-success HTTP status (the status and body are included in the message).
#[cfg(feature = "integrations")]
pub async fn trigger_flow(
    base_url: &str,
    flow_id: &str,
    api_key: Option<&str>,
) -> Result<String> {
    use reqwest::Client;

    let url = format!(
        "{}/api/flows/{}/runs",
        base_url.trim_end_matches('/'),
        flow_id
    );
    let client = Client::new();

    let mut request = client
        .post(&url)
        .header("Content-Type", "application/json")
        .json(&serde_json::json!({
            "parameters": {}
        }));

    if let Some(key) = api_key {
        request = request.bearer_auth(key);
    }

    let response = request
        .send()
        .await
        .map_err(|e| WorkflowError::integration("prefect", format!("Request failed: {}", e)))?;

    // Capture the status first: reading the body consumes the response.
    let status = response.status();

    let body = response.text().await.map_err(|e| {
        WorkflowError::integration("prefect", format!("Failed to read response: {}", e))
    })?;

    // Without this check an HTTP error page would be handed back as a
    // successful result.
    if !status.is_success() {
        return Err(WorkflowError::integration(
            "prefect",
            format!("Request returned HTTP {}: {}", status, body),
        ));
    }

    Ok(body)
}
88}
89
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dag::WorkflowDag;

    /// Exporting a workflow with a freshly created DAG succeeds and the script
    /// contains the Prefect import line and a flow decorator.
    #[test]
    fn test_export_to_prefect() {
        let definition = WorkflowDefinition {
            id: String::from("test-workflow"),
            name: String::from("Test Workflow"),
            description: None,
            version: String::from("1.0.0"),
            dag: WorkflowDag::new(),
        };

        let exported = PrefectIntegration::export_workflow(&definition);
        assert!(exported.is_ok());

        let script = exported.expect("Failed to export");
        assert!(script.contains("from prefect import flow, task"));
        assert!(script.contains("@flow"));
    }

    /// Dashes in an identifier are turned into underscores.
    #[test]
    fn test_sanitize_id() {
        let sanitized = PrefectIntegration::sanitize_id("test-workflow-id");
        assert_eq!(sanitized, "test_workflow_id");
    }
}