// oxigdal_workflow/integrations/mod.rs

//! External workflow system integrations.
//!
//! Provides integration points for external workflow orchestration platforms
//! and delivery channels:
//! - Apache Airflow
//! - Prefect
//! - Temporal.io
//! - Webhooks
//! - Message queues (Kafka, RabbitMQ)
9
10#[cfg(feature = "integrations")]
11pub mod airflow;
12#[cfg(feature = "integrations")]
13pub mod prefect;
14#[cfg(feature = "integrations")]
15pub mod temporal;
16
17use crate::engine::WorkflowDefinition;
18use crate::error::{Result, WorkflowError};
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21
22#[cfg(feature = "integrations")]
23pub use airflow::AirflowIntegration;
24#[cfg(feature = "integrations")]
25pub use prefect::PrefectIntegration;
26#[cfg(feature = "integrations")]
27pub use temporal::TemporalIntegration;
28
29/// Integration type enumeration.
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
31pub enum IntegrationType {
32    /// Apache Airflow.
33    Airflow,
34    /// Prefect.
35    Prefect,
36    /// Temporal.io.
37    Temporal,
38    /// Webhook.
39    Webhook,
40    /// Kafka message queue.
41    Kafka,
42    /// RabbitMQ message queue.
43    RabbitMq,
44}
45
46/// Integration configuration.
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct IntegrationConfig {
49    /// Integration type.
50    pub integration_type: IntegrationType,
51    /// Endpoint URL.
52    pub endpoint: String,
53    /// Authentication credentials.
54    pub auth: Option<AuthConfig>,
55    /// Additional configuration.
56    pub extra_config: HashMap<String, String>,
57}
58
59/// Authentication configuration.
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub enum AuthConfig {
62    /// API key authentication.
63    ApiKey {
64        /// API key.
65        key: String,
66    },
67    /// Basic authentication.
68    Basic {
69        /// Username.
70        username: String,
71        /// Password.
72        password: String,
73    },
74    /// OAuth2 authentication.
75    OAuth2 {
76        /// Access token.
77        token: String,
78    },
79    /// None (no authentication).
80    None,
81}
82
83/// Webhook configuration.
84#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct WebhookConfig {
86    /// Webhook URL.
87    pub url: String,
88    /// HTTP method.
89    pub method: HttpMethod,
90    /// Headers to include.
91    pub headers: HashMap<String, String>,
92    /// Authentication.
93    pub auth: Option<AuthConfig>,
94    /// Retry configuration.
95    pub retry: Option<RetryConfig>,
96}
97
98/// HTTP method enumeration.
99#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
100pub enum HttpMethod {
101    /// GET method.
102    Get,
103    /// POST method.
104    Post,
105    /// PUT method.
106    Put,
107    /// DELETE method.
108    Delete,
109    /// PATCH method.
110    Patch,
111}
112
113/// Retry configuration.
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct RetryConfig {
116    /// Maximum number of retries.
117    pub max_retries: usize,
118    /// Initial retry delay in milliseconds.
119    pub initial_delay_ms: u64,
120    /// Maximum retry delay in milliseconds.
121    pub max_delay_ms: u64,
122    /// Backoff multiplier.
123    pub backoff_multiplier: f64,
124}
125
126impl Default for RetryConfig {
127    fn default() -> Self {
128        Self {
129            max_retries: 3,
130            initial_delay_ms: 1000,
131            max_delay_ms: 30000,
132            backoff_multiplier: 2.0,
133        }
134    }
135}
136
137/// Integration manager for external systems.
138pub struct IntegrationManager {
139    configs: HashMap<String, IntegrationConfig>,
140}
141
142impl IntegrationManager {
143    /// Create a new integration manager.
144    pub fn new() -> Self {
145        Self {
146            configs: HashMap::new(),
147        }
148    }
149
150    /// Register an integration.
151    pub fn register(&mut self, name: String, config: IntegrationConfig) {
152        self.configs.insert(name, config);
153    }
154
155    /// Get an integration configuration.
156    pub fn get(&self, name: &str) -> Option<&IntegrationConfig> {
157        self.configs.get(name)
158    }
159
160    /// Remove an integration.
161    pub fn remove(&mut self, name: &str) -> Option<IntegrationConfig> {
162        self.configs.remove(name)
163    }
164
165    /// List all integrations.
166    pub fn list(&self) -> Vec<String> {
167        self.configs.keys().cloned().collect()
168    }
169
170    /// Export workflow to external format.
171    pub fn export_workflow(
172        &self,
173        workflow: &WorkflowDefinition,
174        integration_type: IntegrationType,
175    ) -> Result<String> {
176        match integration_type {
177            #[cfg(feature = "integrations")]
178            IntegrationType::Airflow => AirflowIntegration::export_workflow(workflow),
179            #[cfg(feature = "integrations")]
180            IntegrationType::Prefect => PrefectIntegration::export_workflow(workflow),
181            #[cfg(feature = "integrations")]
182            IntegrationType::Temporal => TemporalIntegration::export_workflow(workflow),
183            _ => Err(WorkflowError::integration(
184                integration_type.as_str(),
185                "Export not implemented for this integration type",
186            )),
187        }
188    }
189
190    /// Trigger workflow via webhook.
191    #[cfg(feature = "integrations")]
192    pub async fn trigger_webhook(
193        &self,
194        config: &WebhookConfig,
195        payload: &serde_json::Value,
196    ) -> Result<String> {
197        use reqwest::Client;
198
199        let client = Client::new();
200        let mut request = match config.method {
201            HttpMethod::Get => client.get(&config.url),
202            HttpMethod::Post => client.post(&config.url),
203            HttpMethod::Put => client.put(&config.url),
204            HttpMethod::Delete => client.delete(&config.url),
205            HttpMethod::Patch => client.patch(&config.url),
206        };
207
208        // Add headers
209        for (key, value) in &config.headers {
210            request = request.header(key, value);
211        }
212
213        // Add authentication
214        if let Some(auth) = &config.auth {
215            request = match auth {
216                AuthConfig::ApiKey { key } => request.header("X-API-Key", key),
217                AuthConfig::Basic { username, password } => {
218                    request.basic_auth(username, Some(password))
219                }
220                AuthConfig::OAuth2 { token } => request.bearer_auth(token),
221                AuthConfig::None => request,
222            };
223        }
224
225        // Send request
226        let response =
227            request.json(payload).send().await.map_err(|e| {
228                WorkflowError::integration("webhook", format!("Request failed: {}", e))
229            })?;
230
231        let status = response.status();
232        let body = response.text().await.map_err(|e| {
233            WorkflowError::integration("webhook", format!("Failed to read response: {}", e))
234        })?;
235
236        if !status.is_success() {
237            return Err(WorkflowError::integration(
238                "webhook",
239                format!("Request failed with status {}: {}", status, body),
240            ));
241        }
242
243        Ok(body)
244    }
245}
246
247impl Default for IntegrationManager {
248    fn default() -> Self {
249        Self::new()
250    }
251}
252
253impl IntegrationType {
254    /// Get string representation.
255    pub fn as_str(&self) -> &'static str {
256        match self {
257            Self::Airflow => "airflow",
258            Self::Prefect => "prefect",
259            Self::Temporal => "temporal",
260            Self::Webhook => "webhook",
261            Self::Kafka => "kafka",
262            Self::RabbitMq => "rabbitmq",
263        }
264    }
265}
266
#[cfg(test)]
mod tests {
    use super::*;

    /// A registered configuration can be fetched by name and shows up in the
    /// listing.
    #[test]
    fn test_integration_manager() {
        let webhook = IntegrationConfig {
            integration_type: IntegrationType::Webhook,
            endpoint: String::from("https://example.com/webhook"),
            auth: Some(AuthConfig::ApiKey {
                key: String::from("test-key"),
            }),
            extra_config: HashMap::new(),
        };

        let mut registry = IntegrationManager::new();
        registry.register(String::from("test-integration"), webhook);

        assert_eq!(registry.list().len(), 1);
        assert!(registry.get("test-integration").is_some());
    }

    /// `as_str` yields the lowercase identifier of the variant.
    #[test]
    fn test_integration_type_str() {
        let expectations = [
            (IntegrationType::Airflow, "airflow"),
            (IntegrationType::Webhook, "webhook"),
        ];

        for &(kind, expected) in expectations.iter() {
            assert_eq!(kind.as_str(), expected);
        }
    }

    /// The default retry policy uses 3 retries and a 1000 ms initial delay.
    #[test]
    fn test_retry_config_default() {
        let RetryConfig {
            max_retries,
            initial_delay_ms,
            ..
        } = RetryConfig::default();

        assert_eq!(max_retries, 3);
        assert_eq!(initial_delay_ms, 1000);
    }
}