#[cfg(feature = "integrations")]
pub mod airflow;
#[cfg(feature = "integrations")]
pub mod prefect;
#[cfg(feature = "integrations")]
pub mod temporal;
use crate::engine::WorkflowDefinition;
use crate::error::{Result, WorkflowError};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[cfg(feature = "integrations")]
pub use airflow::AirflowIntegration;
#[cfg(feature = "integrations")]
pub use prefect::PrefectIntegration;
#[cfg(feature = "integrations")]
pub use temporal::TemporalIntegration;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum IntegrationType {
Airflow,
Prefect,
Temporal,
Webhook,
Kafka,
RabbitMq,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IntegrationConfig {
pub integration_type: IntegrationType,
pub endpoint: String,
pub auth: Option<AuthConfig>,
pub extra_config: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AuthConfig {
ApiKey {
key: String,
},
Basic {
username: String,
password: String,
},
OAuth2 {
token: String,
},
None,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookConfig {
pub url: String,
pub method: HttpMethod,
pub headers: HashMap<String, String>,
pub auth: Option<AuthConfig>,
pub retry: Option<RetryConfig>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum HttpMethod {
Get,
Post,
Put,
Delete,
Patch,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
pub max_retries: usize,
pub initial_delay_ms: u64,
pub max_delay_ms: u64,
pub backoff_multiplier: f64,
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_retries: 3,
initial_delay_ms: 1000,
max_delay_ms: 30000,
backoff_multiplier: 2.0,
}
}
}
pub struct IntegrationManager {
configs: HashMap<String, IntegrationConfig>,
}
impl IntegrationManager {
pub fn new() -> Self {
Self {
configs: HashMap::new(),
}
}
pub fn register(&mut self, name: String, config: IntegrationConfig) {
self.configs.insert(name, config);
}
pub fn get(&self, name: &str) -> Option<&IntegrationConfig> {
self.configs.get(name)
}
pub fn remove(&mut self, name: &str) -> Option<IntegrationConfig> {
self.configs.remove(name)
}
pub fn list(&self) -> Vec<String> {
self.configs.keys().cloned().collect()
}
pub fn export_workflow(
&self,
workflow: &WorkflowDefinition,
integration_type: IntegrationType,
) -> Result<String> {
match integration_type {
#[cfg(feature = "integrations")]
IntegrationType::Airflow => AirflowIntegration::export_workflow(workflow),
#[cfg(feature = "integrations")]
IntegrationType::Prefect => PrefectIntegration::export_workflow(workflow),
#[cfg(feature = "integrations")]
IntegrationType::Temporal => TemporalIntegration::export_workflow(workflow),
_ => Err(WorkflowError::integration(
integration_type.as_str(),
"Export not implemented for this integration type",
)),
}
}
#[cfg(feature = "integrations")]
pub async fn trigger_webhook(
&self,
config: &WebhookConfig,
payload: &serde_json::Value,
) -> Result<String> {
use reqwest::Client;
let client = Client::new();
let mut request = match config.method {
HttpMethod::Get => client.get(&config.url),
HttpMethod::Post => client.post(&config.url),
HttpMethod::Put => client.put(&config.url),
HttpMethod::Delete => client.delete(&config.url),
HttpMethod::Patch => client.patch(&config.url),
};
for (key, value) in &config.headers {
request = request.header(key, value);
}
if let Some(auth) = &config.auth {
request = match auth {
AuthConfig::ApiKey { key } => request.header("X-API-Key", key),
AuthConfig::Basic { username, password } => {
request.basic_auth(username, Some(password))
}
AuthConfig::OAuth2 { token } => request.bearer_auth(token),
AuthConfig::None => request,
};
}
let response =
request.json(payload).send().await.map_err(|e| {
WorkflowError::integration("webhook", format!("Request failed: {}", e))
})?;
let status = response.status();
let body = response.text().await.map_err(|e| {
WorkflowError::integration("webhook", format!("Failed to read response: {}", e))
})?;
if !status.is_success() {
return Err(WorkflowError::integration(
"webhook",
format!("Request failed with status {}: {}", status, body),
));
}
Ok(body)
}
}
impl Default for IntegrationManager {
fn default() -> Self {
Self::new()
}
}
impl IntegrationType {
pub fn as_str(&self) -> &'static str {
match self {
Self::Airflow => "airflow",
Self::Prefect => "prefect",
Self::Temporal => "temporal",
Self::Webhook => "webhook",
Self::Kafka => "kafka",
Self::RabbitMq => "rabbitmq",
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_integration_manager() {
let mut manager = IntegrationManager::new();
let config = IntegrationConfig {
integration_type: IntegrationType::Webhook,
endpoint: "https://example.com/webhook".to_string(),
auth: Some(AuthConfig::ApiKey {
key: "test-key".to_string(),
}),
extra_config: HashMap::new(),
};
manager.register("test-integration".to_string(), config);
assert!(manager.get("test-integration").is_some());
assert_eq!(manager.list().len(), 1);
}
#[test]
fn test_integration_type_str() {
assert_eq!(IntegrationType::Airflow.as_str(), "airflow");
assert_eq!(IntegrationType::Webhook.as_str(), "webhook");
}
#[test]
fn test_retry_config_default() {
let config = RetryConfig::default();
assert_eq!(config.max_retries, 3);
assert_eq!(config.initial_delay_ms, 1000);
}
}