use crate::engine::state::{TaskState, WorkflowState};
use crate::error::{Result, WorkflowError};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};
/// Configuration for the outbound REST/HTTP client.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpClientConfig {
    /// Prefix prepended to request paths that are not already absolute URLs.
    pub base_url: String,
    /// Client-wide request timeout, in seconds.
    pub timeout_secs: u64,
    /// Number of retries after the initial attempt.
    pub max_retries: u32,
    /// Base delay between retries, in milliseconds (scaled exponentially).
    pub retry_delay_ms: u64,
    /// Headers added to every request.
    pub default_headers: HashMap<String, String>,
    /// Optional authentication applied to every request.
    pub auth: Option<HttpAuth>,
    /// When true, successful requests are logged at info level.
    pub enable_logging: bool,
}
impl Default for HttpClientConfig {
fn default() -> Self {
Self {
base_url: String::new(),
timeout_secs: 30,
max_retries: 3,
retry_delay_ms: 1000,
default_headers: HashMap::new(),
auth: None,
enable_logging: false,
}
}
}
/// Authentication schemes for outbound HTTP calls.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum HttpAuth {
    /// `Authorization: Bearer <token>`.
    Bearer { token: String },
    /// HTTP basic authentication.
    Basic { username: String, password: String },
    /// A single API-key header with a custom name.
    ApiKey { header_name: String, key: String },
    /// Arbitrary authentication headers supplied verbatim.
    Custom { headers: HashMap<String, String> },
}
/// A single HTTP request to be executed by the REST client.
#[derive(Debug, Clone)]
pub struct HttpRequest {
    /// HTTP verb.
    pub method: HttpMethod,
    /// Absolute URL (anything starting with "http") or a path appended to
    /// the client's `base_url`.
    pub path: String,
    /// Query-string parameters.
    pub query_params: HashMap<String, String>,
    /// Per-request headers, applied after the client's default headers.
    pub headers: HashMap<String, String>,
    /// Optional JSON body.
    pub body: Option<serde_json::Value>,
    /// Optional per-request timeout overriding the client-wide one.
    pub timeout: Option<Duration>,
}
/// Supported HTTP verbs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum HttpMethod {
    Get,
    Post,
    Put,
    Patch,
    Delete,
    Head,
    Options,
}
impl HttpMethod {
pub fn as_str(&self) -> &'static str {
match self {
Self::Get => "GET",
Self::Post => "POST",
Self::Put => "PUT",
Self::Patch => "PATCH",
Self::Delete => "DELETE",
Self::Head => "HEAD",
Self::Options => "OPTIONS",
}
}
}
/// Result of an executed HTTP request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpResponse {
    /// Numeric HTTP status code.
    pub status_code: u16,
    /// Response headers (values that are not valid UTF-8 are dropped).
    pub headers: HashMap<String, String>,
    /// Body parsed as JSON, or `None` when parsing failed or there was no body.
    pub body: Option<serde_json::Value>,
    /// Wall-clock duration of the request in milliseconds.
    pub response_time_ms: u64,
    /// Snapshot of "status is 2xx" taken at construction time.
    /// NOTE(review): duplicates the `is_success()` method; keeping the field
    /// consistent is the constructor's responsibility.
    pub is_success: bool,
}
impl HttpResponse {
    /// True for 2xx status codes.
    pub fn is_success(&self) -> bool {
        self.status_code >= 200 && self.status_code < 300
    }

    /// True for 4xx status codes.
    pub fn is_client_error(&self) -> bool {
        self.status_code >= 400 && self.status_code < 500
    }

    /// True for 5xx status codes.
    pub fn is_server_error(&self) -> bool {
        self.status_code >= 500 && self.status_code < 600
    }
}
/// Thin wrapper around a `reqwest::Client` driven by [`HttpClientConfig`].
#[cfg(feature = "integrations")]
pub struct RestApiClient {
    // Immutable configuration captured at construction time.
    config: HttpClientConfig,
    // Shared connection pool; cheap to clone internally.
    client: reqwest::Client,
}
#[cfg(feature = "integrations")]
impl RestApiClient {
    /// Builds a client with the configured timeout.
    ///
    /// # Errors
    /// Returns an integration error if the underlying HTTP client cannot be
    /// constructed.
    pub fn new(config: HttpClientConfig) -> Result<Self> {
        let client = reqwest::Client::builder()
            .timeout(Duration::from_secs(config.timeout_secs))
            .build()
            .map_err(|e| WorkflowError::integration("rest", format!("Failed to create client: {}", e)))?;
        Ok(Self { config, client })
    }

    /// Executes `request`, retrying up to `max_retries` times with exponential
    /// backoff. Paths starting with "http" are treated as absolute URLs and
    /// bypass `base_url`.
    ///
    /// # Errors
    /// Returns the last attempt's error once all attempts are exhausted.
    pub async fn execute(&self, request: HttpRequest) -> Result<HttpResponse> {
        let url = if request.path.starts_with("http") {
            request.path.clone()
        } else {
            format!("{}{}", self.config.base_url, request.path)
        };
        // Fix: removed an unused `start_time` here — latency is measured
        // per-attempt inside `do_request`.
        let mut last_error = None;
        for attempt in 0..=self.config.max_retries {
            if attempt > 0 {
                debug!("Retrying request (attempt {})", attempt + 1);
                // Exponential backoff: the first retry waits the base delay,
                // doubling thereafter, capped at 32x to bound the sleep.
                // (Previously `1 << attempt` made the first retry wait 2x.)
                tokio::time::sleep(Duration::from_millis(
                    self.config.retry_delay_ms * (1 << (attempt - 1).min(5)),
                ))
                .await;
            }
            match self.do_request(&url, &request).await {
                Ok(response) => {
                    if self.config.enable_logging {
                        info!(
                            "HTTP {} {} -> {} ({}ms)",
                            request.method.as_str(),
                            url,
                            response.status_code,
                            response.response_time_ms
                        );
                    }
                    return Ok(response);
                }
                Err(e) => {
                    last_error = Some(e);
                    if attempt < self.config.max_retries {
                        warn!("Request failed, will retry: {:?}", last_error);
                    }
                }
            }
        }
        Err(last_error.unwrap_or_else(|| {
            WorkflowError::integration("rest", "Request failed after all retries")
        }))
    }

    /// Performs one HTTP attempt and converts the response into an
    /// [`HttpResponse`], capturing status, UTF-8 headers, an optional JSON
    /// body and the attempt's latency (including body download).
    async fn do_request(&self, url: &str, request: &HttpRequest) -> Result<HttpResponse> {
        let start_time = std::time::Instant::now();
        let mut req_builder = match request.method {
            HttpMethod::Get => self.client.get(url),
            HttpMethod::Post => self.client.post(url),
            HttpMethod::Put => self.client.put(url),
            HttpMethod::Patch => self.client.patch(url),
            HttpMethod::Delete => self.client.delete(url),
            HttpMethod::Head => self.client.head(url),
            HttpMethod::Options => self.client.request(reqwest::Method::OPTIONS, url),
        };
        // Client-wide defaults first, then per-request headers.
        for (key, value) in &self.config.default_headers {
            req_builder = req_builder.header(key, value);
        }
        for (key, value) in &request.headers {
            req_builder = req_builder.header(key, value);
        }
        if !request.query_params.is_empty() {
            req_builder = req_builder.query(&request.query_params);
        }
        if let Some(auth) = &self.config.auth {
            req_builder = match auth {
                HttpAuth::Bearer { token } => req_builder.bearer_auth(token),
                HttpAuth::Basic { username, password } => {
                    req_builder.basic_auth(username, Some(password))
                }
                HttpAuth::ApiKey { header_name, key } => req_builder.header(header_name, key),
                HttpAuth::Custom { headers } => {
                    for (k, v) in headers {
                        req_builder = req_builder.header(k, v);
                    }
                    req_builder
                }
            };
        }
        if let Some(body) = &request.body {
            req_builder = req_builder.json(body);
        }
        // Per-request timeout overrides the client-wide one when set.
        if let Some(timeout) = request.timeout {
            req_builder = req_builder.timeout(timeout);
        }
        let response = req_builder
            .send()
            .await
            .map_err(|e| WorkflowError::integration("rest", format!("Request failed: {}", e)))?;
        let status_code = response.status().as_u16();
        let mut headers = HashMap::new();
        for (key, value) in response.headers() {
            // Non-UTF-8 header values are silently skipped.
            if let Ok(v) = value.to_str() {
                headers.insert(key.to_string(), v.to_string());
            }
        }
        // Best-effort JSON parse: non-JSON or empty bodies become `None`.
        let body = response
            .json::<serde_json::Value>()
            .await
            .ok();
        let response_time_ms = start_time.elapsed().as_millis() as u64;
        Ok(HttpResponse {
            is_success: (200..300).contains(&status_code),
            status_code,
            headers,
            body,
            response_time_ms,
        })
    }
}
/// A message published to or consumed from a message queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueMessage {
    /// Unique message id (a UUID when created via [`QueueMessage::new`]).
    pub id: String,
    /// Destination topic or queue name.
    pub topic: String,
    /// Arbitrary JSON payload.
    pub payload: serde_json::Value,
    /// Transport-level headers.
    pub headers: HashMap<String, String>,
    /// Creation time (UTC).
    pub timestamp: DateTime<Utc>,
    /// Priority in 0..=9; higher presumably means more urgent — broker-specific.
    pub priority: u8,
    /// Correlates responses with the originating request.
    pub correlation_id: Option<String>,
    /// Topic a reply should be published to, if any.
    pub reply_to: Option<String>,
}
impl QueueMessage {
pub fn new(topic: impl Into<String>, payload: serde_json::Value) -> Self {
Self {
id: uuid::Uuid::new_v4().to_string(),
topic: topic.into(),
payload,
headers: HashMap::new(),
timestamp: Utc::now(),
priority: 5,
correlation_id: None,
reply_to: None,
}
}
pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
self.correlation_id = Some(correlation_id.into());
self
}
pub fn with_reply_to(mut self, reply_to: impl Into<String>) -> Self {
self.reply_to = Some(reply_to.into());
self
}
pub fn with_priority(mut self, priority: u8) -> Self {
self.priority = priority.min(9);
self
}
pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.headers.insert(key.into(), value.into());
self
}
}
/// Outcome reported back to the broker for a consumed message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckStatus {
    /// Message processed successfully.
    Ack,
    /// Negative acknowledgement (redelivery semantics are broker-specific).
    Nack,
    /// Message rejected (dead-letter/drop semantics are broker-specific).
    Reject,
}
/// Producer side of a message-queue integration.
#[async_trait]
pub trait MessageQueueProducer: Send + Sync {
    /// Sends a single message.
    async fn send(&self, message: QueueMessage) -> Result<()>;
    /// Sends messages in order, returning a per-message outcome.
    /// The default implementation sends sequentially; implementors may
    /// override it with a real batch API.
    async fn send_batch(&self, messages: Vec<QueueMessage>) -> Result<Vec<Result<()>>> {
        let mut results = Vec::with_capacity(messages.len());
        for msg in messages {
            results.push(self.send(msg).await);
        }
        Ok(results)
    }
    /// Flushes any messages buffered by the implementation.
    async fn flush(&self) -> Result<()>;
}
/// Consumer side of a message-queue integration.
#[async_trait]
pub trait MessageQueueConsumer: Send + Sync {
    /// Starts receiving messages published to `topic`.
    async fn subscribe(&self, topic: &str) -> Result<()>;
    /// Stops receiving messages from `topic`.
    async fn unsubscribe(&self, topic: &str) -> Result<()>;
    /// Waits up to `timeout` for the next message; `Ok(None)` on timeout.
    async fn receive(&self, timeout: Duration) -> Result<Option<QueueMessage>>;
    /// Reports the processing outcome of a previously received message.
    async fn acknowledge(&self, message_id: &str, status: AckStatus) -> Result<()>;
    /// Commits consumer progress (e.g. offsets) — implementation-specific.
    async fn commit(&self) -> Result<()>;
}
/// Connection settings for a message-queue backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageQueueConfig {
    /// Which broker implementation to use.
    pub queue_type: MessageQueueType,
    /// Broker endpoints (host:port or URLs, backend-specific).
    pub endpoints: Vec<String>,
    /// Optional broker authentication.
    pub auth: Option<QueueAuth>,
    /// Consumer-group name for backends that support it.
    pub consumer_group: Option<String>,
    /// Message retention period, in seconds.
    pub retention_secs: u64,
    /// Maximum accepted message size, in bytes.
    pub max_message_size: usize,
    /// Whether payload compression is enabled.
    pub compression: bool,
}
/// Supported message-queue backends.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MessageQueueType {
    Kafka,
    RabbitMq,
    /// AWS SQS.
    Sqs,
    /// Google Cloud Pub/Sub.
    PubSub,
    /// Azure Service Bus.
    ServiceBus,
    Redis,
}
/// Authentication options for message-queue brokers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum QueueAuth {
    /// Username/password (SASL plain or similar).
    Credentials { username: String, password: String },
    /// Single API key.
    ApiKey { key: String },
    /// OAuth2 client-credentials flow.
    OAuth2 { client_id: String, client_secret: String, token_url: String },
    /// Mutual TLS with client certificate and key files.
    Tls { cert_path: String, key_path: String, ca_path: Option<String> },
}
/// Result of a database query or statement.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResult {
    /// Rows affected by a write statement.
    pub rows_affected: u64,
    /// Column names, in select order.
    pub columns: Vec<String>,
    /// Row data as JSON values, one inner vec per row (aligned with `columns`).
    pub rows: Vec<Vec<serde_json::Value>>,
    /// Query execution time in milliseconds.
    pub execution_time_ms: u64,
}
impl QueryResult {
pub fn empty() -> Self {
Self {
rows_affected: 0,
columns: Vec::new(),
rows: Vec::new(),
execution_time_ms: 0,
}
}
pub fn row_count(&self) -> usize {
self.rows.len()
}
pub fn is_empty(&self) -> bool {
self.rows.is_empty()
}
}
/// Abstraction over a database connection (or pool).
#[async_trait]
pub trait DatabaseConnection: Send + Sync {
    /// Runs a read query with positional JSON-encoded parameters.
    async fn query(&self, sql: &str, params: &[serde_json::Value]) -> Result<QueryResult>;
    /// Runs a write statement; returns the number of affected rows.
    async fn execute(&self, sql: &str, params: &[serde_json::Value]) -> Result<u64>;
    /// Opens a transaction; commit/rollback consume the returned handle.
    async fn begin_transaction(&self) -> Result<Box<dyn DatabaseTransaction>>;
    /// Cheap liveness check.
    async fn ping(&self) -> Result<()>;
    /// Closes the connection.
    async fn close(&self) -> Result<()>;
}
/// An open database transaction; consumed by commit or rollback.
#[async_trait]
pub trait DatabaseTransaction: Send + Sync {
    /// Runs a read query inside the transaction.
    async fn query(&self, sql: &str, params: &[serde_json::Value]) -> Result<QueryResult>;
    /// Runs a write statement inside the transaction.
    async fn execute(&self, sql: &str, params: &[serde_json::Value]) -> Result<u64>;
    /// Commits the transaction, consuming it.
    async fn commit(self: Box<Self>) -> Result<()>;
    /// Rolls the transaction back, consuming it.
    async fn rollback(self: Box<Self>) -> Result<()>;
}
/// Connection settings for a database backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseConfig {
    /// Which database engine to target.
    pub db_type: DatabaseType,
    /// Engine-specific connection string / DSN.
    pub connection_string: String,
    /// Database name, when not part of the connection string.
    pub database: Option<String>,
    /// Minimum pooled connections.
    pub pool_min: u32,
    /// Maximum pooled connections.
    pub pool_max: u32,
    /// Connection-establishment timeout, in seconds.
    pub connect_timeout_secs: u64,
    /// Per-query timeout, in seconds.
    pub query_timeout_secs: u64,
    /// Whether to require SSL/TLS.
    pub ssl: bool,
}
impl Default for DatabaseConfig {
fn default() -> Self {
Self {
db_type: DatabaseType::PostgreSql,
connection_string: String::new(),
database: None,
pool_min: 1,
pool_max: 10,
connect_timeout_secs: 30,
query_timeout_secs: 60,
ssl: false,
}
}
}
/// Supported database engines.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum DatabaseType {
    PostgreSql,
    MySql,
    Sqlite,
    MsSql,
    MongoDb,
    ClickHouse,
}
/// Metadata describing one object in cloud storage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObjectMetadata {
    /// Object key (path within the bucket).
    pub key: String,
    /// Object size in bytes.
    pub size: u64,
    /// MIME type, when known.
    pub content_type: Option<String>,
    /// Last-modification time, when reported by the provider.
    pub last_modified: Option<DateTime<Utc>>,
    /// Provider entity tag, when reported.
    pub etag: Option<String>,
    /// User-defined metadata key/value pairs.
    pub metadata: HashMap<String, String>,
}
/// Optional settings applied to a cloud-storage upload.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct UploadOptions {
    /// MIME type to store with the object.
    pub content_type: Option<String>,
    /// Content-Encoding header value (e.g. "gzip").
    pub content_encoding: Option<String>,
    /// Cache-Control header value.
    pub cache_control: Option<String>,
    /// User-defined metadata attached to the object.
    pub metadata: HashMap<String, String>,
    /// Server-side encryption settings.
    pub encryption: Option<StorageEncryption>,
    /// Provider-specific storage class (e.g. "STANDARD", "GLACIER").
    pub storage_class: Option<String>,
}
/// Server-side encryption modes for cloud storage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StorageEncryption {
    /// Provider-managed keys.
    ServerManaged,
    /// Customer-managed key identified by `key_id`.
    CustomerManaged { key_id: String },
}
/// Abstraction over an object-storage bucket.
#[async_trait]
pub trait CloudStorage: Send + Sync {
    /// Uploads `data` under `key`, returning the stored object's metadata.
    async fn upload(&self, key: &str, data: &[u8], options: UploadOptions) -> Result<ObjectMetadata>;
    /// Downloads the full object body.
    async fn download(&self, key: &str) -> Result<Vec<u8>>;
    /// Fetches metadata without downloading the body.
    async fn head(&self, key: &str) -> Result<ObjectMetadata>;
    /// Deletes the object.
    async fn delete(&self, key: &str) -> Result<()>;
    /// Lists objects under `prefix`, optionally capped at `max_keys`.
    async fn list(&self, prefix: &str, max_keys: Option<usize>) -> Result<Vec<ObjectMetadata>>;
    /// Copies an object within the bucket.
    async fn copy(&self, source_key: &str, dest_key: &str) -> Result<ObjectMetadata>;
    /// Returns a time-limited URL for downloading `key`.
    async fn presigned_download_url(&self, key: &str, expires_in: Duration) -> Result<String>;
    /// Returns a time-limited URL for uploading to `key`.
    async fn presigned_upload_url(&self, key: &str, expires_in: Duration) -> Result<String>;
}
/// Connection settings for a cloud-storage provider.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CloudStorageConfig {
    /// Which provider implementation to use.
    pub provider: StorageProvider,
    /// Target bucket (or container) name.
    pub bucket: String,
    /// Provider region, when applicable.
    pub region: Option<String>,
    /// Custom endpoint (e.g. for MinIO or S3-compatible stores).
    pub endpoint: Option<String>,
    /// Credentials; `None` presumably falls back to ambient/provider defaults.
    pub credentials: Option<StorageCredentials>,
    /// Request timeout, in seconds.
    pub timeout_secs: u64,
}
/// Supported object-storage providers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum StorageProvider {
    /// Amazon S3.
    S3,
    /// Google Cloud Storage.
    Gcs,
    /// Azure Blob Storage.
    AzureBlob,
    /// MinIO (S3-compatible).
    Minio,
}
/// Credential options for cloud storage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StorageCredentials {
    /// Static access-key pair.
    AccessKey { access_key: String, secret_key: String },
    /// Service-account key file on disk.
    ServiceAccount { key_file: String },
    /// Use the ambient instance profile / workload identity.
    InstanceProfile,
}
/// Lifecycle events that can trigger callbacks.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CallbackEventType {
    WorkflowStarted,
    WorkflowCompleted,
    WorkflowFailed,
    WorkflowCancelled,
    TaskStarted,
    TaskCompleted,
    TaskFailed,
    TaskRetry,
    /// User-defined event.
    Custom,
}
impl CallbackEventType {
pub fn as_str(&self) -> &'static str {
match self {
Self::WorkflowStarted => "workflow.started",
Self::WorkflowCompleted => "workflow.completed",
Self::WorkflowFailed => "workflow.failed",
Self::WorkflowCancelled => "workflow.cancelled",
Self::TaskStarted => "task.started",
Self::TaskCompleted => "task.completed",
Self::TaskFailed => "task.failed",
Self::TaskRetry => "task.retry",
Self::Custom => "custom",
}
}
}
/// JSON payload delivered to callback handlers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CallbackPayload {
    /// Which lifecycle event occurred.
    pub event_type: CallbackEventType,
    /// Workflow definition id.
    pub workflow_id: String,
    /// Concrete execution id.
    pub execution_id: String,
    /// Task id for task-level events; `None` for workflow-level ones.
    pub task_id: Option<String>,
    /// When the payload was created (UTC).
    pub timestamp: DateTime<Utc>,
    /// Event-specific data.
    pub data: serde_json::Value,
    /// Optional serialized workflow state.
    pub state_snapshot: Option<serde_json::Value>,
}
impl CallbackPayload {
pub fn for_workflow(
event_type: CallbackEventType,
workflow_id: impl Into<String>,
execution_id: impl Into<String>,
data: serde_json::Value,
) -> Self {
Self {
event_type,
workflow_id: workflow_id.into(),
execution_id: execution_id.into(),
task_id: None,
timestamp: Utc::now(),
data,
state_snapshot: None,
}
}
pub fn for_task(
event_type: CallbackEventType,
workflow_id: impl Into<String>,
execution_id: impl Into<String>,
task_id: impl Into<String>,
data: serde_json::Value,
) -> Self {
Self {
event_type,
workflow_id: workflow_id.into(),
execution_id: execution_id.into(),
task_id: Some(task_id.into()),
timestamp: Utc::now(),
data,
state_snapshot: None,
}
}
pub fn with_state_snapshot(mut self, state: &WorkflowState) -> Self {
self.state_snapshot = serde_json::to_value(state).ok();
self
}
}
/// A sink for lifecycle callback payloads.
#[async_trait]
pub trait CallbackHandler: Send + Sync {
    /// Delivers one payload; errors are surfaced to the dispatcher.
    async fn handle(&self, payload: CallbackPayload) -> Result<()>;
    /// Stable handler name, used for logging and deregistration.
    fn name(&self) -> &str;
    /// Whether this handler wants payloads of the given event type.
    fn is_enabled_for(&self, event_type: CallbackEventType) -> bool;
}
/// Configuration for an HTTP callback destination.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CallbackConfig {
    /// Destination URL.
    pub url: String,
    /// HTTP verb used for delivery.
    pub method: HttpMethod,
    /// Extra headers sent with every callback.
    pub headers: HashMap<String, String>,
    /// Optional authentication.
    pub auth: Option<HttpAuth>,
    /// Event types this callback should receive.
    pub enabled_events: Vec<CallbackEventType>,
    /// Whether to include a workflow-state snapshot in payloads.
    /// NOTE(review): not read in this module — presumably consulted by the
    /// code that builds payloads; confirm.
    pub include_state: bool,
    /// Retry/backoff policy for failed deliveries.
    pub retry: RetryConfig,
}
/// Exponential-backoff retry policy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    /// Retries after the initial attempt.
    pub max_retries: u32,
    /// Delay before the first retry, in milliseconds.
    pub initial_delay_ms: u64,
    /// Upper bound on any single delay, in milliseconds.
    pub max_delay_ms: u64,
    /// Multiplier applied per attempt (delay = initial * multiplier^attempt).
    pub backoff_multiplier: f64,
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_retries: 3,
initial_delay_ms: 1000,
max_delay_ms: 30000,
backoff_multiplier: 2.0,
}
}
}
/// Callback handler that delivers payloads as JSON over HTTP.
pub struct HttpCallbackHandler {
    // Destination, auth and retry policy.
    config: CallbackConfig,
    // HTTP client; only compiled in with the `integrations` feature.
    #[cfg(feature = "integrations")]
    client: reqwest::Client,
}
impl HttpCallbackHandler {
    /// Builds a handler with a fixed 30-second request timeout.
    ///
    /// # Errors
    /// Returns an integration error if the HTTP client cannot be built.
    ///
    /// NOTE(review): this constructor only exists with the `integrations`
    /// feature; without it the struct cannot be created from this module.
    #[cfg(feature = "integrations")]
    pub fn new(config: CallbackConfig) -> Result<Self> {
        let client = reqwest::Client::builder()
            .timeout(Duration::from_secs(30))
            .build()
            .map_err(|e| WorkflowError::integration("callback", format!("Failed to create client: {}", e)))?;
        Ok(Self { config, client })
    }

    /// Read-only access to the handler's configuration.
    pub fn config(&self) -> &CallbackConfig {
        &self.config
    }
}
#[cfg(feature = "integrations")]
#[async_trait]
impl CallbackHandler for HttpCallbackHandler {
    /// Delivers `payload` as JSON to the configured URL.
    ///
    /// Transport failures and 5xx responses are retried with exponential
    /// backoff (initial_delay * multiplier^attempt, capped at max_delay_ms);
    /// 4xx responses abort immediately since retrying a client error is
    /// pointless.
    async fn handle(&self, payload: CallbackPayload) -> Result<()> {
        // Build the base request once; each attempt clones it below.
        let mut request = match self.config.method {
            HttpMethod::Get => self.client.get(&self.config.url),
            HttpMethod::Post => self.client.post(&self.config.url),
            HttpMethod::Put => self.client.put(&self.config.url),
            HttpMethod::Patch => self.client.patch(&self.config.url),
            HttpMethod::Delete => self.client.delete(&self.config.url),
            HttpMethod::Head => self.client.head(&self.config.url),
            HttpMethod::Options => self.client.request(reqwest::Method::OPTIONS, &self.config.url),
        };
        for (key, value) in &self.config.headers {
            request = request.header(key, value);
        }
        if let Some(auth) = &self.config.auth {
            request = match auth {
                HttpAuth::Bearer { token } => request.bearer_auth(token),
                HttpAuth::Basic { username, password } => {
                    request.basic_auth(username, Some(password))
                }
                HttpAuth::ApiKey { header_name, key } => request.header(header_name, key),
                HttpAuth::Custom { headers } => {
                    for (k, v) in headers {
                        request = request.header(k, v);
                    }
                    request
                }
            };
        }
        let mut last_error = None;
        for attempt in 0..=self.config.retry.max_retries {
            if attempt > 0 {
                // delay = initial * multiplier^attempt, capped at max_delay_ms.
                let delay = std::cmp::min(
                    (self.config.retry.initial_delay_ms as f64
                        * self.config.retry.backoff_multiplier.powi(attempt as i32))
                        as u64,
                    self.config.retry.max_delay_ms,
                );
                tokio::time::sleep(Duration::from_millis(delay)).await;
            }
            // Clone the prepared builder for this attempt; try_clone can fail
            // for non-cloneable (streaming) bodies, which becomes an error.
            let req = request.try_clone().ok_or_else(|| {
                WorkflowError::integration("callback", "Failed to clone request")
            })?;
            match req.json(&payload).send().await {
                Ok(response) => {
                    if response.status().is_success() {
                        debug!(
                            "Callback delivered successfully: {} -> {}",
                            payload.event_type.as_str(),
                            self.config.url
                        );
                        return Ok(());
                    } else if response.status().is_server_error() {
                        // 5xx: remember the error and keep retrying.
                        last_error = Some(WorkflowError::integration(
                            "callback",
                            format!("Server error: {}", response.status()),
                        ));
                    } else {
                        // 4xx (or other non-success): fail fast, no retry.
                        return Err(WorkflowError::integration(
                            "callback",
                            format!("Client error: {}", response.status()),
                        ));
                    }
                }
                Err(e) => {
                    // Transport-level failure: retry.
                    last_error = Some(WorkflowError::integration(
                        "callback",
                        format!("Request failed: {}", e),
                    ));
                }
            }
        }
        Err(last_error.unwrap_or_else(|| {
            WorkflowError::integration("callback", "Callback failed after all retries")
        }))
    }

    /// Fixed handler name used in registry logs.
    fn name(&self) -> &str {
        "http_callback"
    }

    /// Enabled iff the event type is in the configured allow-list.
    fn is_enabled_for(&self, event_type: CallbackEventType) -> bool {
        self.config.enabled_events.contains(&event_type)
    }
}
/// An inbound webhook endpoint that starts a workflow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookTrigger {
    /// Unique trigger id (a UUID when created via [`WebhookTrigger::new`]).
    pub id: String,
    /// Workflow started when the webhook fires.
    pub workflow_id: String,
    /// Shared secret for request-signature validation; `None` disables it.
    pub secret: Option<String>,
    /// Source-IP allow-list. NOTE(review): not enforced in this module —
    /// presumably checked by the HTTP layer; confirm.
    pub allowed_ips: Vec<String>,
    /// Maps webhook payload fields to workflow parameter names.
    pub parameter_mapping: HashMap<String, String>,
    /// Inactive triggers are excluded from workflow lookups.
    pub active: bool,
}
impl WebhookTrigger {
pub fn new(workflow_id: impl Into<String>) -> Self {
Self {
id: uuid::Uuid::new_v4().to_string(),
workflow_id: workflow_id.into(),
secret: None,
allowed_ips: Vec::new(),
parameter_mapping: HashMap::new(),
active: true,
}
}
pub fn with_secret(mut self, secret: impl Into<String>) -> Self {
self.secret = Some(secret.into());
self
}
pub fn with_allowed_ips(mut self, ips: Vec<String>) -> Self {
self.allowed_ips = ips;
self
}
pub fn with_parameter(mut self, webhook_field: impl Into<String>, workflow_param: impl Into<String>) -> Self {
self.parameter_mapping.insert(webhook_field.into(), workflow_param.into());
self
}
pub fn validate_signature(&self, payload: &[u8], signature: &str) -> bool {
use std::fmt::Write;
if let Some(secret) = &self.secret {
let expected = format!("sha256={}", hex_encode(payload, secret.as_bytes()));
constant_time_compare(&expected, signature)
} else {
true }
}
}
/// Compares two strings in time independent of *where* they differ.
///
/// The length check returns early, so only the content comparison is
/// constant-time; the lengths themselves are not concealed.
fn constant_time_compare(a: &str, b: &str) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // OR together the XOR of every byte pair; any mismatch leaves a set bit.
    let mut diff = 0u8;
    for (x, y) in a.bytes().zip(b.bytes()) {
        diff |= x ^ y;
    }
    diff == 0
}
/// Hex-encodes at most the first 32 bytes of `data`.
///
/// SECURITY(review): despite being used to build "sha256=..." webhook
/// signatures, this performs no hashing and ignores `_key` completely — it
/// is NOT an HMAC, and signatures derived from it are trivially forgeable.
/// Replace with a real HMAC-SHA256 before trusting signature validation.
fn hex_encode(data: &[u8], _key: &[u8]) -> String {
    let mut out = String::with_capacity(data.len().min(32) * 2);
    for byte in data.iter().take(32) {
        out.push_str(&format!("{:02x}", byte));
    }
    out
}
/// Thread-safe in-memory store of webhook triggers, keyed by trigger id.
pub struct WebhookRegistry {
    triggers: Arc<RwLock<HashMap<String, WebhookTrigger>>>,
}
impl WebhookRegistry {
pub fn new() -> Self {
Self {
triggers: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn register(&self, trigger: WebhookTrigger) -> String {
let id = trigger.id.clone();
let mut triggers = self.triggers.write().await;
triggers.insert(id.clone(), trigger);
info!("Registered webhook trigger: {}", id);
id
}
pub async fn unregister(&self, trigger_id: &str) -> Option<WebhookTrigger> {
let mut triggers = self.triggers.write().await;
let removed = triggers.remove(trigger_id);
if removed.is_some() {
info!("Unregistered webhook trigger: {}", trigger_id);
}
removed
}
pub async fn get(&self, trigger_id: &str) -> Option<WebhookTrigger> {
let triggers = self.triggers.read().await;
triggers.get(trigger_id).cloned()
}
pub async fn list(&self) -> Vec<WebhookTrigger> {
let triggers = self.triggers.read().await;
triggers.values().cloned().collect()
}
pub async fn find_by_workflow(&self, workflow_id: &str) -> Vec<WebhookTrigger> {
let triggers = self.triggers.read().await;
triggers
.values()
.filter(|t| t.workflow_id == workflow_id && t.active)
.cloned()
.collect()
}
}
impl Default for WebhookRegistry {
fn default() -> Self {
Self::new()
}
}
/// An event published to external systems (CloudEvents-like shape).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExternalEvent {
    /// Unique event id (a UUID when created via [`ExternalEvent::new`]).
    pub id: String,
    /// Dotted event name, e.g. "workflow.started".
    pub event_type: String,
    /// Origin of the event, e.g. "workflow/<id>".
    pub source: String,
    /// Optional entity the event is about.
    pub subject: Option<String>,
    /// Creation time (UTC).
    pub timestamp: DateTime<Utc>,
    /// Event-specific JSON data.
    pub data: serde_json::Value,
    /// Free-form string metadata.
    pub metadata: HashMap<String, String>,
}
impl ExternalEvent {
    /// Creates an event with a random UUID id and the current UTC timestamp.
    pub fn new(event_type: impl Into<String>, source: impl Into<String>, data: serde_json::Value) -> Self {
        Self {
            id: uuid::Uuid::new_v4().to_string(),
            event_type: event_type.into(),
            source: source.into(),
            subject: None,
            timestamp: Utc::now(),
            data,
            metadata: HashMap::new(),
        }
    }

    /// Sets the subject (the entity the event is about).
    pub fn with_subject(mut self, subject: impl Into<String>) -> Self {
        self.subject = Some(subject.into());
        self
    }

    /// Adds (or overwrites) one metadata entry.
    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.metadata.insert(key.into(), value.into());
        self
    }

    /// Builds an event from a workflow's state: source is
    /// "workflow/<workflow_id>", subject is the execution id, and the data
    /// carries ids plus the debug-formatted status.
    pub fn from_workflow_state(state: &WorkflowState, event_type: impl Into<String>) -> Self {
        Self::new(
            event_type,
            format!("workflow/{}", state.workflow_id),
            serde_json::json!({
                "workflow_id": state.workflow_id,
                "execution_id": state.execution_id,
                "status": format!("{:?}", state.status),
            }),
        )
        .with_subject(state.execution_id.clone())
    }

    /// Builds an event from a task's state: source is
    /// "workflow/<id>/task/<task_id>", subject is the task id, and the data
    /// carries the task id, debug-formatted status and attempt count.
    pub fn from_task_state(workflow_id: &str, task: &TaskState, event_type: impl Into<String>) -> Self {
        Self::new(
            event_type,
            format!("workflow/{}/task/{}", workflow_id, task.task_id),
            serde_json::json!({
                "task_id": task.task_id,
                "status": format!("{:?}", task.status),
                "attempts": task.attempts,
            }),
        )
        .with_subject(task.task_id.clone())
    }
}
/// A sink that publishes [`ExternalEvent`]s to some external system.
#[async_trait]
pub trait EventEmitter: Send + Sync {
    /// Publishes a single event.
    async fn emit(&self, event: ExternalEvent) -> Result<()>;
    /// Publishes events in order, returning a per-event outcome.
    /// The default implementation emits sequentially; implementors may batch.
    async fn emit_batch(&self, events: Vec<ExternalEvent>) -> Result<Vec<Result<()>>> {
        let mut results = Vec::with_capacity(events.len());
        for event in events {
            results.push(self.emit(event).await);
        }
        Ok(results)
    }
    /// Stable emitter name, used for logging and removal.
    fn name(&self) -> &str;
}
/// Configuration for one event-emitter target.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventEmitterConfig {
    /// Which emitter implementation to use.
    pub emitter_type: EventEmitterType,
    /// Destination (URL, topic, etc. — emitter-specific).
    pub target: String,
    /// Optional authentication.
    pub auth: Option<EmitterAuth>,
    /// Events buffered before a flush.
    pub batch_size: usize,
    /// Maximum time between flushes, in milliseconds.
    pub flush_interval_ms: u64,
}
/// Kinds of event-emitter backends.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum EventEmitterType {
    Webhook,
    MessageQueue,
    CloudEvents,
    Custom,
}
/// Authentication options for event emitters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EmitterAuth {
    /// Bearer token.
    Bearer { token: String },
    /// Single API key.
    ApiKey { key: String },
    /// OAuth2 client credentials.
    OAuth2 { client_id: String, client_secret: String },
}
/// Fans each event out to every registered emitter.
pub struct MultiEventEmitter {
    emitters: Vec<Arc<dyn EventEmitter>>,
}
impl MultiEventEmitter {
pub fn new() -> Self {
Self {
emitters: Vec::new(),
}
}
pub fn add_emitter(&mut self, emitter: Arc<dyn EventEmitter>) {
self.emitters.push(emitter);
}
pub fn remove_emitter(&mut self, name: &str) {
self.emitters.retain(|e| e.name() != name);
}
pub fn emitter_count(&self) -> usize {
self.emitters.len()
}
}
impl Default for MultiEventEmitter {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl EventEmitter for MultiEventEmitter {
    /// Delivers `event` to every target sequentially. All targets are
    /// attempted even when some fail; each failure is logged, and a single
    /// summary error is returned if at least one target failed.
    async fn emit(&self, event: ExternalEvent) -> Result<()> {
        let mut failed = 0usize;
        for emitter in &self.emitters {
            if let Err(e) = emitter.emit(event.clone()).await {
                error!("Failed to emit event via {}: {}", emitter.name(), e);
                failed += 1;
            }
        }
        if failed == 0 {
            return Ok(());
        }
        Err(WorkflowError::integration(
            "multi_emitter",
            format!("Failed to emit to {} targets", failed),
        ))
    }

    /// Fixed name of the fan-out emitter.
    fn name(&self) -> &str {
        "multi_emitter"
    }
}
/// Central hub bundling callback handlers, webhook triggers and event
/// emitters for workflow lifecycle notifications.
pub struct ExternalIntegrationRegistry {
    callbacks: Arc<RwLock<Vec<Arc<dyn CallbackHandler>>>>,
    webhooks: Arc<WebhookRegistry>,
    emitters: Arc<RwLock<MultiEventEmitter>>,
}
impl ExternalIntegrationRegistry {
    /// Creates a registry with no callbacks, no webhook triggers and no
    /// event emitters.
    pub fn new() -> Self {
        Self {
            callbacks: Arc::new(RwLock::new(Vec::new())),
            webhooks: Arc::new(WebhookRegistry::new()),
            emitters: Arc::new(RwLock::new(MultiEventEmitter::new())),
        }
    }

    /// Adds a callback handler; it will receive every dispatched payload
    /// whose event type it reports as enabled.
    pub async fn register_callback(&self, handler: Arc<dyn CallbackHandler>) {
        let mut callbacks = self.callbacks.write().await;
        info!("Registering callback handler: {}", handler.name());
        callbacks.push(handler);
    }

    /// Delivers `payload` to every enabled handler, sequentially.
    ///
    /// All handlers are attempted even if some fail; a single summary error
    /// is returned when at least one handler failed.
    pub async fn dispatch_callback(&self, payload: CallbackPayload) -> Result<()> {
        let callbacks = self.callbacks.read().await;
        let mut errors = Vec::new();
        for handler in callbacks.iter() {
            if handler.is_enabled_for(payload.event_type) {
                if let Err(e) = handler.handle(payload.clone()).await {
                    error!("Callback handler {} failed: {}", handler.name(), e);
                    errors.push(e);
                }
            }
        }
        if errors.is_empty() {
            Ok(())
        } else {
            Err(WorkflowError::integration(
                "callbacks",
                format!("{} callback handlers failed", errors.len()),
            ))
        }
    }

    /// Access to the webhook trigger registry.
    pub fn webhooks(&self) -> &Arc<WebhookRegistry> {
        &self.webhooks
    }

    /// Adds an event-emitter target to the internal fan-out emitter.
    pub async fn register_emitter(&self, emitter: Arc<dyn EventEmitter>) {
        let mut emitters = self.emitters.write().await;
        info!("Registering event emitter: {}", emitter.name());
        emitters.add_emitter(emitter);
    }

    /// Fans `event` out to every registered emitter.
    pub async fn emit_event(&self, event: ExternalEvent) -> Result<()> {
        let emitters = self.emitters.read().await;
        emitters.emit(event).await
    }

    /// Best-effort "workflow started" notification: emits an event and
    /// dispatches callbacks; failures are only logged, never returned.
    pub async fn emit_workflow_started(&self, state: &WorkflowState) -> Result<()> {
        let event = ExternalEvent::from_workflow_state(state, "workflow.started");
        let callback = CallbackPayload::for_workflow(
            CallbackEventType::WorkflowStarted,
            &state.workflow_id,
            &state.execution_id,
            serde_json::json!({"status": "started"}),
        );
        let emit_result = self.emit_event(event).await;
        let callback_result = self.dispatch_callback(callback).await;
        if emit_result.is_err() || callback_result.is_err() {
            warn!("Some integrations failed for workflow.started event");
        }
        Ok(())
    }

    /// Best-effort "workflow completed" notification, including a state
    /// snapshot in the callback payload. Failures are silently discarded.
    pub async fn emit_workflow_completed(&self, state: &WorkflowState) -> Result<()> {
        let event = ExternalEvent::from_workflow_state(state, "workflow.completed");
        let callback = CallbackPayload::for_workflow(
            CallbackEventType::WorkflowCompleted,
            &state.workflow_id,
            &state.execution_id,
            serde_json::json!({"status": "completed"}),
        ).with_state_snapshot(state);
        let _ = self.emit_event(event).await;
        let _ = self.dispatch_callback(callback).await;
        Ok(())
    }

    /// Best-effort "workflow failed" notification carrying the error string
    /// in both the event metadata and the callback data.
    pub async fn emit_workflow_failed(&self, state: &WorkflowState, error: &str) -> Result<()> {
        let event = ExternalEvent::from_workflow_state(state, "workflow.failed")
            .with_metadata("error", error);
        let callback = CallbackPayload::for_workflow(
            CallbackEventType::WorkflowFailed,
            &state.workflow_id,
            &state.execution_id,
            serde_json::json!({"status": "failed", "error": error}),
        ).with_state_snapshot(state);
        let _ = self.emit_event(event).await;
        let _ = self.dispatch_callback(callback).await;
        Ok(())
    }

    /// Best-effort "task started" notification.
    ///
    /// NOTE(review): the execution id is not available here, so an empty
    /// string is sent in the callback payload — confirm downstream consumers
    /// tolerate this.
    pub async fn emit_task_started(&self, workflow_id: &str, task: &TaskState) -> Result<()> {
        let event = ExternalEvent::from_task_state(workflow_id, task, "task.started");
        let callback = CallbackPayload::for_task(
            CallbackEventType::TaskStarted,
            workflow_id,
            "",
            &task.task_id,
            serde_json::json!({"status": "started"}),
        );
        let _ = self.emit_event(event).await;
        let _ = self.dispatch_callback(callback).await;
        Ok(())
    }

    /// Best-effort "task completed" notification carrying the task output.
    /// (Same empty-execution-id caveat as `emit_task_started`.)
    pub async fn emit_task_completed(&self, workflow_id: &str, task: &TaskState) -> Result<()> {
        let event = ExternalEvent::from_task_state(workflow_id, task, "task.completed");
        let callback = CallbackPayload::for_task(
            CallbackEventType::TaskCompleted,
            workflow_id,
            "",
            &task.task_id,
            serde_json::json!({"status": "completed", "output": task.output}),
        );
        let _ = self.emit_event(event).await;
        let _ = self.dispatch_callback(callback).await;
        Ok(())
    }

    /// Best-effort "task failed" notification; the error defaults to
    /// "unknown" in event metadata when the task recorded none.
    /// (Same empty-execution-id caveat as `emit_task_started`.)
    pub async fn emit_task_failed(&self, workflow_id: &str, task: &TaskState) -> Result<()> {
        let event = ExternalEvent::from_task_state(workflow_id, task, "task.failed")
            .with_metadata("error", task.error.as_deref().unwrap_or("unknown"));
        let callback = CallbackPayload::for_task(
            CallbackEventType::TaskFailed,
            workflow_id,
            "",
            &task.task_id,
            serde_json::json!({"status": "failed", "error": task.error}),
        );
        let _ = self.emit_event(event).await;
        let _ = self.dispatch_callback(callback).await;
        Ok(())
    }
}
impl Default for ExternalIntegrationRegistry {
fn default() -> Self {
Self::new()
}
}
// Unit tests for the integration primitives (pure data types and the
// in-memory registries; no network I/O).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_http_method_as_str() {
        assert_eq!(HttpMethod::Get.as_str(), "GET");
        assert_eq!(HttpMethod::Post.as_str(), "POST");
        assert_eq!(HttpMethod::Put.as_str(), "PUT");
        assert_eq!(HttpMethod::Delete.as_str(), "DELETE");
    }

    #[test]
    fn test_http_response_status_checks() {
        let success_response = HttpResponse {
            status_code: 200,
            headers: HashMap::new(),
            body: None,
            response_time_ms: 100,
            is_success: true,
        };
        assert!(success_response.is_success());
        assert!(!success_response.is_client_error());
        assert!(!success_response.is_server_error());
        let client_error = HttpResponse {
            status_code: 404,
            headers: HashMap::new(),
            body: None,
            response_time_ms: 50,
            is_success: false,
        };
        assert!(!client_error.is_success());
        assert!(client_error.is_client_error());
        let server_error = HttpResponse {
            status_code: 500,
            headers: HashMap::new(),
            body: None,
            response_time_ms: 75,
            is_success: false,
        };
        assert!(!server_error.is_success());
        assert!(server_error.is_server_error());
    }

    #[test]
    fn test_queue_message_builder() {
        let msg = QueueMessage::new("test-topic", serde_json::json!({"key": "value"}))
            .with_correlation_id("corr-123")
            .with_reply_to("reply-topic")
            .with_priority(8)
            .with_header("custom", "header");
        assert_eq!(msg.topic, "test-topic");
        assert_eq!(msg.correlation_id, Some("corr-123".to_string()));
        assert_eq!(msg.reply_to, Some("reply-topic".to_string()));
        assert_eq!(msg.priority, 8);
        assert_eq!(msg.headers.get("custom"), Some(&"header".to_string()));
    }

    #[test]
    fn test_query_result() {
        let result = QueryResult::empty();
        assert!(result.is_empty());
        assert_eq!(result.row_count(), 0);
        let result_with_data = QueryResult {
            rows_affected: 1,
            columns: vec!["id".to_string(), "name".to_string()],
            rows: vec![vec![serde_json::json!(1), serde_json::json!("test")]],
            execution_time_ms: 10,
        };
        assert!(!result_with_data.is_empty());
        assert_eq!(result_with_data.row_count(), 1);
    }

    #[test]
    fn test_callback_event_type_as_str() {
        assert_eq!(CallbackEventType::WorkflowStarted.as_str(), "workflow.started");
        assert_eq!(CallbackEventType::TaskCompleted.as_str(), "task.completed");
        assert_eq!(CallbackEventType::Custom.as_str(), "custom");
    }

    #[test]
    fn test_callback_payload_creation() {
        let workflow_payload = CallbackPayload::for_workflow(
            CallbackEventType::WorkflowStarted,
            "wf-1",
            "exec-1",
            serde_json::json!({}),
        );
        assert_eq!(workflow_payload.workflow_id, "wf-1");
        assert_eq!(workflow_payload.execution_id, "exec-1");
        assert!(workflow_payload.task_id.is_none());
        let task_payload = CallbackPayload::for_task(
            CallbackEventType::TaskStarted,
            "wf-1",
            "exec-1",
            "task-1",
            serde_json::json!({}),
        );
        assert_eq!(task_payload.task_id, Some("task-1".to_string()));
    }

    #[test]
    fn test_webhook_trigger_creation() {
        let trigger = WebhookTrigger::new("workflow-1")
            .with_secret("my-secret")
            .with_allowed_ips(vec!["192.168.1.1".to_string()])
            .with_parameter("payload.id", "workflow_param");
        assert_eq!(trigger.workflow_id, "workflow-1");
        assert!(trigger.secret.is_some());
        assert_eq!(trigger.allowed_ips.len(), 1);
        assert!(trigger.parameter_mapping.contains_key("payload.id"));
    }

    #[tokio::test]
    async fn test_webhook_registry() {
        // Full register/lookup/list/unregister round-trip.
        let registry = WebhookRegistry::new();
        let trigger = WebhookTrigger::new("workflow-1");
        let id = registry.register(trigger).await;
        assert!(registry.get(&id).await.is_some());
        let triggers = registry.list().await;
        assert_eq!(triggers.len(), 1);
        let workflow_triggers = registry.find_by_workflow("workflow-1").await;
        assert_eq!(workflow_triggers.len(), 1);
        assert!(registry.unregister(&id).await.is_some());
        assert!(registry.get(&id).await.is_none());
    }

    #[test]
    fn test_external_event_creation() {
        let event = ExternalEvent::new(
            "test.event",
            "test-source",
            serde_json::json!({"data": "value"}),
        )
        .with_subject("subject-1")
        .with_metadata("key", "value");
        assert_eq!(event.event_type, "test.event");
        assert_eq!(event.source, "test-source");
        assert_eq!(event.subject, Some("subject-1".to_string()));
        assert_eq!(event.metadata.get("key"), Some(&"value".to_string()));
    }

    #[test]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.initial_delay_ms, 1000);
        assert_eq!(config.max_delay_ms, 30000);
        assert!((config.backoff_multiplier - 2.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_http_client_config_default() {
        let config = HttpClientConfig::default();
        assert_eq!(config.timeout_secs, 30);
        assert_eq!(config.max_retries, 3);
        assert!(config.base_url.is_empty());
    }

    #[test]
    fn test_database_config_default() {
        let config = DatabaseConfig::default();
        assert_eq!(config.db_type, DatabaseType::PostgreSql);
        assert_eq!(config.pool_min, 1);
        assert_eq!(config.pool_max, 10);
    }

    #[tokio::test]
    async fn test_multi_event_emitter() {
        let emitter = MultiEventEmitter::new();
        assert_eq!(emitter.emitter_count(), 0);
    }

    #[tokio::test]
    async fn test_external_integration_registry() {
        let registry = ExternalIntegrationRegistry::new();
        let trigger = WebhookTrigger::new("workflow-1");
        let _ = registry.webhooks().register(trigger).await;
        let triggers = registry.webhooks().list().await;
        assert_eq!(triggers.len(), 1);
    }

    #[test]
    fn test_constant_time_compare() {
        assert!(constant_time_compare("test", "test"));
        assert!(!constant_time_compare("test", "Test"));
        assert!(!constant_time_compare("test", "test1"));
    }
}