oxigdal_workflow/integrations/
temporal.rs1use crate::engine::WorkflowDefinition;
4use crate::error::{Result, WorkflowError};
5
/// Stateless entry point for the Temporal.io integration.
///
/// The type holds no data; every operation is exposed as an associated
/// function (Go code export, the not-yet-implemented import, and the optional
/// HTTP workflow start).
#[derive(Debug, Clone, Copy, Default)]
pub struct TemporalIntegration;
8
9impl TemporalIntegration {
10 pub fn export_workflow(workflow: &WorkflowDefinition) -> Result<String> {
12 let mut go_code = String::new();
13
14 go_code.push_str("package workflows\n\n");
16 go_code.push_str("import (\n");
17 go_code.push_str(" \"time\"\n");
18 go_code.push_str(" \"go.temporal.io/sdk/workflow\"\n");
19 go_code.push_str(")\n\n");
20
21 for (idx, _task) in workflow.dag.tasks().iter().enumerate() {
23 go_code.push_str(&format!(
24 "func Task{}Activity(ctx workflow.Context) error {{\n",
25 idx
26 ));
27 go_code.push_str(" // TODO: Implement activity logic\n");
28 go_code.push_str(" return nil\n");
29 go_code.push_str("}\n\n");
30 }
31
32 go_code.push_str(&format!(
34 "func {}Workflow(ctx workflow.Context) error {{\n",
35 Self::to_camel_case(&workflow.id)
36 ));
37
38 for (idx, _task) in workflow.dag.tasks().iter().enumerate() {
39 go_code.push_str(" ao := workflow.ActivityOptions{\n");
40 go_code.push_str(" StartToCloseTimeout: 1 * time.Minute,\n");
41 go_code.push_str(" }\n");
42 go_code.push_str(&format!(
43 " ctx{} := workflow.WithActivityOptions(ctx, ao)\n",
44 idx
45 ));
46 go_code.push_str(&format!(
47 " err{} := workflow.ExecuteActivity(ctx{}, Task{}Activity).Get(ctx{}, nil)\n",
48 idx, idx, idx, idx
49 ));
50 go_code.push_str(&format!(" if err{} != nil {{\n", idx));
51 go_code.push_str(&format!(" return err{}\n", idx));
52 go_code.push_str(" }\n\n");
53 }
54
55 go_code.push_str(" return nil\n");
56 go_code.push_str("}\n");
57
58 Ok(go_code)
59 }
60
61 pub fn import_workflow(_workflow_code: &str) -> Result<WorkflowDefinition> {
63 Err(WorkflowError::integration(
64 "temporal",
65 "Import from Temporal not yet implemented",
66 ))
67 }
68
/// Converts an identifier such as `test-workflow-id` into `TestWorkflowId`.
///
/// Despite the name, every segment (including the first) is capitalised, so
/// the result is UpperCamelCase. The result is used as part of a generated Go
/// function name, therefore:
///
/// * every non-alphanumeric character (not only `-` and `_`) acts as a
///   separator and is dropped, so spaces or dots in the input cannot leak
///   into the identifier;
/// * the first character of each segment is upper-cased with the Unicode-aware
///   `char::to_uppercase`, uniformly for all segments;
/// * the remaining characters of a segment are copied unchanged.
///
/// Empty segments (leading, trailing or repeated separators) are skipped; an
/// input without alphanumeric characters yields an empty string.
fn to_camel_case(s: &str) -> String {
    let mut out = String::with_capacity(s.len());

    for segment in s.split(|c: char| !c.is_alphanumeric()) {
        let mut chars = segment.chars();
        // Empty segments have no first character and contribute nothing.
        if let Some(first) = chars.next() {
            // `to_uppercase` may expand to several chars (e.g. 'ß' -> "SS").
            out.extend(first.to_uppercase());
            out.push_str(chars.as_str());
        }
    }

    out
}
90
/// Starts a workflow execution through an HTTP endpoint of a Temporal server.
///
/// Sends a `POST` request to
/// `{base_url}/api/v1/namespaces/{namespace}/workflows/{workflow_id}` with a
/// JSON body containing `workflowId`, `workflowType.name` and an empty
/// `input` object, and returns the response body as text.
///
/// A trailing `/` on `base_url` is ignored. `namespace` and `workflow_id` are
/// inserted into the URL path verbatim.
/// NOTE(review): callers passing values with reserved URL characters must
/// percent-encode them first — confirm whether that can happen.
///
/// # Errors
///
/// Returns an integration error tagged `"temporal"` when
///
/// * the request cannot be sent,
/// * the response body cannot be read, or
/// * the server answers with a non-success (non-2xx) HTTP status; the status
///   and the response body are included in the message.
#[cfg(feature = "integrations")]
pub async fn start_workflow(
    base_url: &str,
    namespace: &str,
    workflow_id: &str,
    workflow_type: &str,
) -> Result<String> {
    use reqwest::Client;

    // Avoid `//api/...` when the base URL already ends with a slash.
    let url = format!(
        "{}/api/v1/namespaces/{}/workflows/{}",
        base_url.trim_end_matches('/'),
        namespace,
        workflow_id
    );

    let payload = serde_json::json!({
        "workflowId": workflow_id,
        "workflowType": {
            "name": workflow_type
        },
        "input": {}
    });

    let response = Client::new()
        .post(&url)
        .header("Content-Type", "application/json")
        .json(&payload)
        .send()
        .await
        .map_err(|e| {
            WorkflowError::integration("temporal", format!("Request failed: {}", e))
        })?;

    // Capture the status first: reading the body consumes the response.
    let status = response.status();

    let body = response.text().await.map_err(|e| {
        WorkflowError::integration("temporal", format!("Failed to read response: {}", e))
    })?;

    // A delivered request is not a started workflow: surface HTTP-level
    // failures instead of handing the error payload back as a success.
    if !status.is_success() {
        return Err(WorkflowError::integration(
            "temporal",
            format!("Start workflow request returned HTTP {}: {}", status, body),
        ));
    }

    Ok(body)
}
129}
130
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dag::WorkflowDag;

    /// Exporting a workflow built around a freshly constructed DAG succeeds
    /// and the output contains the Go package clause and the SDK import path.
    #[test]
    fn test_export_to_temporal() {
        let definition = WorkflowDefinition {
            id: "test-workflow".to_string(),
            name: "Test Workflow".to_string(),
            description: None,
            version: "1.0.0".to_string(),
            dag: WorkflowDag::new(),
        };

        let exported = TemporalIntegration::export_workflow(&definition);
        assert!(exported.is_ok());

        let source = exported.expect("Failed to export");
        let required_fragments = ["package workflows", "go.temporal.io/sdk/workflow"];
        for fragment in &required_fragments {
            assert!(source.contains(*fragment));
        }
    }

    /// Hyphen- and underscore-separated ids become UpperCamelCase.
    #[test]
    fn test_to_camel_case() {
        let cases = [
            ("test-workflow-id", "TestWorkflowId"),
            ("my_workflow", "MyWorkflow"),
        ];

        for &(input, expected) in &cases {
            assert_eq!(TemporalIntegration::to_camel_case(input), expected);
        }
    }
}