use crate::engine::WorkflowDefinition;
use crate::error::{Result, WorkflowError};
/// Stateless namespace type for the Prefect integration helpers.
///
/// It has no fields; the functions implemented on it in this file are all
/// associated functions that take no `self`.
pub struct PrefectIntegration;
impl PrefectIntegration {
/// Renders `workflow` as the source text of a Python script that uses Prefect.
///
/// The generated script contains, in order:
/// - the `prefect` / `datetime` import lines;
/// - one `@task`-decorated stub `task_<idx>` per item yielded by
///   `workflow.dag.tasks().iter()`; each stub prints a message and returns `True`;
/// - a `@flow`-decorated function, named after the sanitized workflow id, that
///   calls every stub in index order (or contains `pass` when there are no tasks,
///   so the function body is never empty);
/// - an `if __name__ == '__main__':` guard that invokes the flow function.
///
/// The workflow name is embedded as a single-quoted Python string literal with
/// backslashes, single quotes and control characters escaped, so arbitrary names
/// cannot terminate the literal early.
///
/// # Errors
///
/// This implementation has no failing path and always returns `Ok`; the
/// `Result` return type is kept so the signature stays unchanged.
pub fn export_workflow(workflow: &WorkflowDefinition) -> Result<String> {
    /// Quotes `raw` as a single-quoted Python string literal.
    fn py_single_quoted(raw: &str) -> String {
        let mut literal = String::with_capacity(raw.len() + 2);
        literal.push('\'');
        for ch in raw.chars() {
            match ch {
                '\\' => literal.push_str("\\\\"),
                '\'' => literal.push_str("\\'"),
                // All Unicode control characters are below U+0100, so a
                // two-digit `\xhh` escape always suffices.
                c if c.is_control() => literal.push_str(&format!("\\x{:02x}", c as u32)),
                c => literal.push(c),
            }
        }
        literal.push('\'');
        literal
    }

    // Computed once: both the `def` line and the `__main__` guard use it.
    let flow_fn = Self::sanitize_id(&workflow.id);
    // Only the number of tasks matters here; the stubs are identified by index.
    let task_count = workflow.dag.tasks().iter().count();

    let mut script = String::new();
    script.push_str("from prefect import flow, task\n");
    script.push_str("from datetime import timedelta\n\n");

    for idx in 0..task_count {
        script.push_str(&format!("@task(name='task_{}')\n", idx));
        script.push_str(&format!("def task_{}():\n", idx));
        script.push_str(" print('Task executed')\n");
        script.push_str(" return True\n\n");
    }

    script.push_str(&format!(
        "@flow(name={})\n",
        py_single_quoted(&workflow.name)
    ));
    script.push_str(&format!("def {}():\n", flow_fn));
    if task_count == 0 {
        // A Python `def` with no statements is a syntax error.
        script.push_str(" pass\n");
    }
    for idx in 0..task_count {
        script.push_str(&format!(" result_{} = task_{}()\n", idx, idx));
    }

    script.push('\n');
    script.push_str("if __name__ == '__main__':\n");
    script.push_str(&format!(" {}()\n", flow_fn));

    Ok(script)
}
/// Placeholder for building a [`WorkflowDefinition`] from Prefect flow source.
///
/// The `_flow_code` argument is currently ignored.
///
/// # Errors
///
/// Always returns a `"prefect"` integration error, because importing is not
/// implemented.
pub fn import_workflow(_flow_code: &str) -> Result<WorkflowDefinition> {
    let not_implemented =
        WorkflowError::integration("prefect", "Import from Prefect not yet implemented");
    Err(not_implemented)
}
/// Converts a workflow id into a string usable as a Python function name.
///
/// Every character other than an ASCII letter, ASCII digit or `_` is replaced
/// by `_`. If the result is empty or does not start with an ASCII letter or
/// `_`, a leading `_` is added, since a Python identifier cannot start with a
/// digit.
///
/// Python reserved words (e.g. `class`) are not treated specially.
fn sanitize_id(id: &str) -> String {
    let mut ident: String = id
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || c == '_' {
                c
            } else {
                '_'
            }
        })
        .collect();

    // Covers both the empty id and ids that begin with a digit.
    if !ident.starts_with(|c: char| c.is_ascii_alphabetic() || c == '_') {
        ident.insert(0, '_');
    }
    ident
}
/// Sends a POST request to `<base_url>/api/flows/<flow_id>/runs` with the JSON
/// body `{"parameters": {}}` and returns the response body as text.
///
/// A trailing `/` on `base_url` is ignored so the URL never contains `//api`.
/// When `api_key` is `Some`, it is sent as a bearer token. `flow_id` is
/// inserted into the path as-is (it is not percent-encoded).
///
/// # Errors
///
/// Returns a `"prefect"` integration error when:
/// - the request cannot be sent,
/// - the response body cannot be read, or
/// - the server answers with a non-2xx status (the status and body are
///   included in the message).
#[cfg(feature = "integrations")]
pub async fn trigger_flow(
    base_url: &str,
    flow_id: &str,
    api_key: Option<&str>,
) -> Result<String> {
    use reqwest::Client;

    let url = format!(
        "{}/api/flows/{}/runs",
        base_url.trim_end_matches('/'),
        flow_id
    );

    // `.json(..)` already sets `Content-Type: application/json`, so no explicit
    // header is needed.
    let mut request = Client::new()
        .post(&url)
        .json(&serde_json::json!({
            "parameters": {}
        }));
    if let Some(key) = api_key {
        request = request.bearer_auth(key);
    }

    let response = request
        .send()
        .await
        .map_err(|e| WorkflowError::integration("prefect", format!("Request failed: {}", e)))?;

    // Capture the status before `text()` consumes the response.
    let status = response.status();
    let body = response.text().await.map_err(|e| {
        WorkflowError::integration("prefect", format!("Failed to read response: {}", e))
    })?;

    if !status.is_success() {
        return Err(WorkflowError::integration(
            "prefect",
            format!("Request returned HTTP {}: {}", status, body),
        ));
    }
    Ok(body)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dag::WorkflowDag;

    /// Builds the workflow fixture used by the export test, with a freshly
    /// constructed DAG.
    fn sample_workflow() -> WorkflowDefinition {
        WorkflowDefinition {
            id: String::from("test-workflow"),
            name: String::from("Test Workflow"),
            description: None,
            version: String::from("1.0.0"),
            dag: WorkflowDag::new(),
        }
    }

    /// Exporting succeeds and the script has the Prefect import and a flow
    /// decorator.
    #[test]
    fn test_export_to_prefect() {
        let definition = sample_workflow();
        let exported = PrefectIntegration::export_workflow(&definition);
        assert!(exported.is_ok());

        let script = exported.expect("Failed to export");
        assert!(script.contains("from prefect import flow, task"));
        assert!(script.contains("@flow"));
    }

    /// Hyphens in an id are turned into underscores.
    #[test]
    fn test_sanitize_id() {
        let sanitized = PrefectIntegration::sanitize_id("test-workflow-id");
        assert_eq!(sanitized, "test_workflow_id");
    }
}