Skip to main content

oxigdal_workflow/integrations/
temporal.rs

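//! Temporal.io integration: exports workflow definitions as Go source for the
//! Temporal SDK and, with the `integrations` feature, starts workflows over HTTP.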
2
3use crate::engine::WorkflowDefinition;
4use crate::error::{Result, WorkflowError};
5
/// Temporal.io integration.
///
/// A stateless namespace type: every operation is an associated function, so
/// no instance is required. The derives make the type usable in generic and
/// diagnostic contexts (`{:?}`, `Default::default()`, by-value copies).
#[derive(Debug, Clone, Copy, Default)]
pub struct TemporalIntegration;
8
9impl TemporalIntegration {
10    /// Export workflow to Temporal workflow format (Go).
11    pub fn export_workflow(workflow: &WorkflowDefinition) -> Result<String> {
12        let mut go_code = String::new();
13
14        // Add package and imports
15        go_code.push_str("package workflows\n\n");
16        go_code.push_str("import (\n");
17        go_code.push_str("    \"time\"\n");
18        go_code.push_str("    \"go.temporal.io/sdk/workflow\"\n");
19        go_code.push_str(")\n\n");
20
21        // Define activity interfaces
22        for (idx, _task) in workflow.dag.tasks().iter().enumerate() {
23            go_code.push_str(&format!(
24                "func Task{}Activity(ctx workflow.Context) error {{\n",
25                idx
26            ));
27            go_code.push_str("    // TODO: Implement activity logic\n");
28            go_code.push_str("    return nil\n");
29            go_code.push_str("}\n\n");
30        }
31
32        // Define workflow
33        go_code.push_str(&format!(
34            "func {}Workflow(ctx workflow.Context) error {{\n",
35            Self::to_camel_case(&workflow.id)
36        ));
37
38        for (idx, _task) in workflow.dag.tasks().iter().enumerate() {
39            go_code.push_str("    ao := workflow.ActivityOptions{\n");
40            go_code.push_str("        StartToCloseTimeout: 1 * time.Minute,\n");
41            go_code.push_str("    }\n");
42            go_code.push_str(&format!(
43                "    ctx{} := workflow.WithActivityOptions(ctx, ao)\n",
44                idx
45            ));
46            go_code.push_str(&format!(
47                "    err{} := workflow.ExecuteActivity(ctx{}, Task{}Activity).Get(ctx{}, nil)\n",
48                idx, idx, idx, idx
49            ));
50            go_code.push_str(&format!("    if err{} != nil {{\n", idx));
51            go_code.push_str(&format!("        return err{}\n", idx));
52            go_code.push_str("    }\n\n");
53        }
54
55        go_code.push_str("    return nil\n");
56        go_code.push_str("}\n");
57
58        Ok(go_code)
59    }
60
61    /// Import workflow from Temporal workflow.
62    pub fn import_workflow(_workflow_code: &str) -> Result<WorkflowDefinition> {
63        Err(WorkflowError::integration(
64            "temporal",
65            "Import from Temporal not yet implemented",
66        ))
67    }
68
69    /// Convert to CamelCase for Go naming.
70    fn to_camel_case(s: &str) -> String {
71        s.split(['-', '_'])
72            .filter(|s| !s.is_empty())
73            .enumerate()
74            .map(|(i, s)| {
75                if i == 0 {
76                    s.chars()
77                        .enumerate()
78                        .map(|(j, c)| if j == 0 { c.to_ascii_uppercase() } else { c })
79                        .collect()
80                } else {
81                    let mut chars = s.chars();
82                    match chars.next() {
83                        None => String::new(),
84                        Some(first) => first.to_uppercase().chain(chars).collect(),
85                    }
86                }
87            })
88            .collect()
89    }
90
91    /// Start a Temporal workflow via API.
92    #[cfg(feature = "integrations")]
93    pub async fn start_workflow(
94        base_url: &str,
95        namespace: &str,
96        workflow_id: &str,
97        workflow_type: &str,
98    ) -> Result<String> {
99        use reqwest::Client;
100
101        let url = format!(
102            "{}/api/v1/namespaces/{}/workflows/{}",
103            base_url, namespace, workflow_id
104        );
105        let client = Client::new();
106
107        let response = client
108            .post(&url)
109            .header("Content-Type", "application/json")
110            .json(&serde_json::json!({
111                "workflowId": workflow_id,
112                "workflowType": {
113                    "name": workflow_type
114                },
115                "input": {}
116            }))
117            .send()
118            .await
119            .map_err(|e| {
120                WorkflowError::integration("temporal", format!("Request failed: {}", e))
121            })?;
122
123        let body = response.text().await.map_err(|e| {
124            WorkflowError::integration("temporal", format!("Failed to read response: {}", e))
125        })?;
126
127        Ok(body)
128    }
129}
130
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dag::WorkflowDag;

    /// Exporting a definition built around a freshly constructed DAG succeeds
    /// and produces the Go package header plus the Temporal SDK import.
    #[test]
    fn test_export_to_temporal() {
        let definition = WorkflowDefinition {
            id: String::from("test-workflow"),
            name: String::from("Test Workflow"),
            description: None,
            version: String::from("1.0.0"),
            dag: WorkflowDag::new(),
        };

        let exported = TemporalIntegration::export_workflow(&definition);
        assert!(exported.is_ok());

        let source = exported.expect("Failed to export");
        for needle in ["package workflows", "go.temporal.io/sdk/workflow"] {
            assert!(source.contains(needle));
        }
    }

    /// Both `-` and `_` act as word separators when building the Go name.
    #[test]
    fn test_to_camel_case() {
        let cases = [
            ("test-workflow-id", "TestWorkflowId"),
            ("my_workflow", "MyWorkflow"),
        ];

        for (input, expected) in cases {
            assert_eq!(TemporalIntegration::to_camel_case(input), expected);
        }
    }
}