// oxigdal_workflow/integrations/mod.rs
#[cfg(feature = "integrations")]
11pub mod airflow;
12#[cfg(feature = "integrations")]
13pub mod prefect;
14#[cfg(feature = "integrations")]
15pub mod temporal;
16
17use crate::engine::WorkflowDefinition;
18use crate::error::{Result, WorkflowError};
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21
22#[cfg(feature = "integrations")]
23pub use airflow::AirflowIntegration;
24#[cfg(feature = "integrations")]
25pub use prefect::PrefectIntegration;
26#[cfg(feature = "integrations")]
27pub use temporal::TemporalIntegration;
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
31pub enum IntegrationType {
32 Airflow,
34 Prefect,
36 Temporal,
38 Webhook,
40 Kafka,
42 RabbitMq,
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct IntegrationConfig {
49 pub integration_type: IntegrationType,
51 pub endpoint: String,
53 pub auth: Option<AuthConfig>,
55 pub extra_config: HashMap<String, String>,
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
61pub enum AuthConfig {
62 ApiKey {
64 key: String,
66 },
67 Basic {
69 username: String,
71 password: String,
73 },
74 OAuth2 {
76 token: String,
78 },
79 None,
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct WebhookConfig {
86 pub url: String,
88 pub method: HttpMethod,
90 pub headers: HashMap<String, String>,
92 pub auth: Option<AuthConfig>,
94 pub retry: Option<RetryConfig>,
96}
97
98#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
100pub enum HttpMethod {
101 Get,
103 Post,
105 Put,
107 Delete,
109 Patch,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct RetryConfig {
116 pub max_retries: usize,
118 pub initial_delay_ms: u64,
120 pub max_delay_ms: u64,
122 pub backoff_multiplier: f64,
124}
125
126impl Default for RetryConfig {
127 fn default() -> Self {
128 Self {
129 max_retries: 3,
130 initial_delay_ms: 1000,
131 max_delay_ms: 30000,
132 backoff_multiplier: 2.0,
133 }
134 }
135}
136
137pub struct IntegrationManager {
139 configs: HashMap<String, IntegrationConfig>,
140}
141
142impl IntegrationManager {
143 pub fn new() -> Self {
145 Self {
146 configs: HashMap::new(),
147 }
148 }
149
150 pub fn register(&mut self, name: String, config: IntegrationConfig) {
152 self.configs.insert(name, config);
153 }
154
155 pub fn get(&self, name: &str) -> Option<&IntegrationConfig> {
157 self.configs.get(name)
158 }
159
160 pub fn remove(&mut self, name: &str) -> Option<IntegrationConfig> {
162 self.configs.remove(name)
163 }
164
165 pub fn list(&self) -> Vec<String> {
167 self.configs.keys().cloned().collect()
168 }
169
170 pub fn export_workflow(
172 &self,
173 workflow: &WorkflowDefinition,
174 integration_type: IntegrationType,
175 ) -> Result<String> {
176 match integration_type {
177 #[cfg(feature = "integrations")]
178 IntegrationType::Airflow => AirflowIntegration::export_workflow(workflow),
179 #[cfg(feature = "integrations")]
180 IntegrationType::Prefect => PrefectIntegration::export_workflow(workflow),
181 #[cfg(feature = "integrations")]
182 IntegrationType::Temporal => TemporalIntegration::export_workflow(workflow),
183 _ => Err(WorkflowError::integration(
184 integration_type.as_str(),
185 "Export not implemented for this integration type",
186 )),
187 }
188 }
189
190 #[cfg(feature = "integrations")]
192 pub async fn trigger_webhook(
193 &self,
194 config: &WebhookConfig,
195 payload: &serde_json::Value,
196 ) -> Result<String> {
197 use reqwest::Client;
198
199 let client = Client::new();
200 let mut request = match config.method {
201 HttpMethod::Get => client.get(&config.url),
202 HttpMethod::Post => client.post(&config.url),
203 HttpMethod::Put => client.put(&config.url),
204 HttpMethod::Delete => client.delete(&config.url),
205 HttpMethod::Patch => client.patch(&config.url),
206 };
207
208 for (key, value) in &config.headers {
210 request = request.header(key, value);
211 }
212
213 if let Some(auth) = &config.auth {
215 request = match auth {
216 AuthConfig::ApiKey { key } => request.header("X-API-Key", key),
217 AuthConfig::Basic { username, password } => {
218 request.basic_auth(username, Some(password))
219 }
220 AuthConfig::OAuth2 { token } => request.bearer_auth(token),
221 AuthConfig::None => request,
222 };
223 }
224
225 let response =
227 request.json(payload).send().await.map_err(|e| {
228 WorkflowError::integration("webhook", format!("Request failed: {}", e))
229 })?;
230
231 let status = response.status();
232 let body = response.text().await.map_err(|e| {
233 WorkflowError::integration("webhook", format!("Failed to read response: {}", e))
234 })?;
235
236 if !status.is_success() {
237 return Err(WorkflowError::integration(
238 "webhook",
239 format!("Request failed with status {}: {}", status, body),
240 ));
241 }
242
243 Ok(body)
244 }
245}
246
247impl Default for IntegrationManager {
248 fn default() -> Self {
249 Self::new()
250 }
251}
252
253impl IntegrationType {
254 pub fn as_str(&self) -> &'static str {
256 match self {
257 Self::Airflow => "airflow",
258 Self::Prefect => "prefect",
259 Self::Temporal => "temporal",
260 Self::Webhook => "webhook",
261 Self::Kafka => "kafka",
262 Self::RabbitMq => "rabbitmq",
263 }
264 }
265}
266
#[cfg(test)]
mod tests {
    use super::*;

    /// A registered configuration can be looked up by name and is listed.
    #[test]
    fn test_integration_manager() {
        let webhook = IntegrationConfig {
            integration_type: IntegrationType::Webhook,
            endpoint: String::from("https://example.com/webhook"),
            auth: Some(AuthConfig::ApiKey {
                key: String::from("test-key"),
            }),
            extra_config: HashMap::new(),
        };

        let mut registry = IntegrationManager::new();
        registry.register(String::from("test-integration"), webhook);

        assert!(registry.get("test-integration").is_some());
        assert_eq!(registry.list().len(), 1);
    }

    /// `as_str` yields the lowercase identifier of the variant.
    #[test]
    fn test_integration_type_str() {
        let airflow = IntegrationType::Airflow;
        let webhook = IntegrationType::Webhook;

        assert_eq!(airflow.as_str(), "airflow");
        assert_eq!(webhook.as_str(), "webhook");
    }

    /// The default retry policy uses 3 retries and a 1000 ms initial delay.
    #[test]
    fn test_retry_config_default() {
        let RetryConfig {
            max_retries,
            initial_delay_ms,
            ..
        } = RetryConfig::default();

        assert_eq!(max_retries, 3);
        assert_eq!(initial_delay_ms, 1000);
    }
}
302}