1use std::collections::HashMap;
7use serde::{Deserialize, Serialize};
8use async_trait::async_trait;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct IntegrationConfig {
13 pub name: String,
14 pub integration_type: IntegrationType,
15 pub config: HashMap<String, serde_json::Value>,
16 pub timeout: Option<std::time::Duration>,
17 pub retry_config: Option<RetryConfig>,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22pub enum IntegrationType {
23 Http,
24 Database,
25 MessageQueue,
26 CloudStorage,
27 Email,
28 SMS,
29 Webhook,
30 GraphQL,
31 REST,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct RetryConfig {
37 pub max_attempts: u32,
38 pub initial_delay: std::time::Duration,
39 pub max_delay: std::time::Duration,
40 pub backoff_multiplier: f64,
41}
42
43pub struct IntegrationManager {
45 integrations: HashMap<String, Box<dyn Integration>>,
46}
47
48impl IntegrationManager {
49 pub fn new() -> Self {
50 Self {
51 integrations: HashMap::new(),
52 }
53 }
54
55 pub fn register_integration(&mut self, name: &str, integration: Box<dyn Integration>) {
57 self.integrations.insert(name.to_string(), integration);
58 }
59
60 pub fn get_integration(&self, name: &str) -> Option<&Box<dyn Integration>> {
62 self.integrations.get(name)
63 }
64
65 pub async fn execute_integration(
67 &self,
68 name: &str,
69 operation: &str,
70 params: HashMap<String, serde_json::Value>,
71 ) -> Result<serde_json::Value, IntegrationError> {
72 if let Some(integration) = self.integrations.get(name) {
73 integration.execute(operation, params).await
74 } else {
75 Err(IntegrationError::IntegrationNotFound(name.to_string()))
76 }
77 }
78}
79
80#[async_trait]
82pub trait Integration: Send + Sync {
83 async fn execute(
85 &self,
86 operation: &str,
87 params: HashMap<String, serde_json::Value>,
88 ) -> Result<serde_json::Value, IntegrationError>;
89
90 async fn health_check(&self) -> Result<(), IntegrationError>;
92
93 fn integration_type(&self) -> IntegrationType;
95}
96
97#[cfg(feature = "activities-http")]
99pub struct HttpIntegration {
100 client: reqwest::Client,
101 base_url: String,
102 headers: HashMap<String, String>,
103 timeout: std::time::Duration,
104}
105
106#[cfg(feature = "activities-http")]
107impl HttpIntegration {
108 pub fn new(base_url: &str, timeout: std::time::Duration) -> Self {
109 Self {
110 client: reqwest::Client::new(),
111 base_url: base_url.to_string(),
112 headers: HashMap::new(),
113 timeout,
114 }
115 }
116
117 pub fn with_header(mut self, key: &str, value: &str) -> Self {
118 self.headers.insert(key.to_string(), value.to_string());
119 self
120 }
121
122 pub fn with_bearer_token(mut self, token: &str) -> Self {
123 self.headers.insert("Authorization".to_string(), format!("Bearer {}", token));
124 self
125 }
126}
127
128#[cfg(feature = "activities-http")]
129#[async_trait]
130impl Integration for HttpIntegration {
131 async fn execute(
132 &self,
133 operation: &str,
134 params: HashMap<String, serde_json::Value>,
135 ) -> Result<serde_json::Value, IntegrationError> {
136 let url = format!("{}/{}", self.base_url.trim_end_matches('/'), operation.trim_start_matches('/'));
137
138 let mut request = self.client.post(&url).timeout(self.timeout);
139
140 for (key, value) in &self.headers {
142 request = request.header(key, value);
143 }
144
145 request = request.json(¶ms);
147
148 let response = request.send().await
149 .map_err(|e| IntegrationError::HttpError(e.to_string()))?;
150
151 if !response.status().is_success() {
152 return Err(IntegrationError::HttpError(format!("HTTP {}: {}", response.status(), response.text().await.unwrap_or_default())));
153 }
154
155 let json_response = response.json().await
156 .map_err(|e| IntegrationError::ParseError(e.to_string()))?;
157
158 Ok(json_response)
159 }
160
161 async fn health_check(&self) -> Result<(), IntegrationError> {
162 let response = self.client.get(&self.base_url).timeout(std::time::Duration::from_secs(5)).send().await
163 .map_err(|e| IntegrationError::HttpError(e.to_string()))?;
164
165 if response.status().is_success() {
166 Ok(())
167 } else {
168 Err(IntegrationError::HealthCheckFailed)
169 }
170 }
171
172 fn integration_type(&self) -> IntegrationType {
173 IntegrationType::Http
174 }
175}
176
177#[cfg(feature = "activities-db")]
179pub struct DatabaseIntegration {
180 connection_string: String,
181 pool: Option<sqlx::PgPool>, }
183
184#[cfg(feature = "activities-db")]
185impl DatabaseIntegration {
186 pub fn new(connection_string: &str) -> Self {
187 Self {
188 connection_string: connection_string.to_string(),
189 pool: None,
190 }
191 }
192
193 pub async fn initialize(&mut self) -> Result<(), IntegrationError> {
195 self.pool = Some(sqlx::PgPool::connect(&self.connection_string).await
196 .map_err(|e| IntegrationError::DatabaseError(e.to_string()))?);
197 Ok(())
198 }
199}
200
201#[cfg(feature = "activities-db")]
202#[async_trait]
203impl Integration for DatabaseIntegration {
204 async fn execute(
205 &self,
206 operation: &str,
207 params: HashMap<String, serde_json::Value>,
208 ) -> Result<serde_json::Value, IntegrationError> {
209 let pool = self.pool.as_ref().ok_or(IntegrationError::DatabaseError("Not initialized".to_string()))?;
210
211 match operation {
212 #[cfg(feature = "activities-db")]
213 "query" => {
214 let sql = params.get("sql").and_then(|v| v.as_str()).ok_or(IntegrationError::InvalidParams)?;
215 let rows = sqlx::query(sql).fetch_all(pool).await
216 .map_err(|e| IntegrationError::DatabaseError(e.to_string()))?;
217
218 let result: Vec<HashMap<String, serde_json::Value>> = rows.iter().map(|row| {
219 HashMap::new()
221 }).collect();
222
223 Ok(serde_json::json!(result))
224 }
225 #[cfg(feature = "activities-db")]
226 "execute" => {
227 let sql = params.get("sql").and_then(|v| v.as_str()).ok_or(IntegrationError::InvalidParams)?;
228 let result = sqlx::query(sql).execute(pool).await
229 .map_err(|e| IntegrationError::DatabaseError(e.to_string()))?;
230
231 Ok(serde_json::json!({
232 "rows_affected": result.rows_affected()
233 }))
234 }
235 _ => Err(IntegrationError::UnsupportedOperation(operation.to_string())),
236 }
237 }
238
239 async fn health_check(&self) -> Result<(), IntegrationError> {
240 #[cfg(feature = "activities-db")]
241 {
242 let pool = self.pool.as_ref().ok_or(IntegrationError::DatabaseError("Not initialized".to_string()))?;
243
244 sqlx::query("SELECT 1").fetch_one(pool).await
245 .map_err(|e| IntegrationError::DatabaseError(e.to_string()))?;
246 }
247
248 Ok(())
249 }
250
251 fn integration_type(&self) -> IntegrationType {
252 IntegrationType::Database
253 }
254}
255
256#[cfg(feature = "activities-db")]
258pub struct MessageQueueIntegration {
259 broker_url: String,
260 client: Option<lapin::Connection>,
261}
262
263#[cfg(feature = "activities-db")]
264impl MessageQueueIntegration {
265 pub fn new(broker_url: &str) -> Self {
266 Self {
267 broker_url: broker_url.to_string(),
268 client: None,
269 }
270 }
271
272 pub async fn initialize(&mut self) -> Result<(), IntegrationError> {
274 let connection = lapin::Connection::connect(&self.broker_url, lapin::ConnectionProperties::default()).await
275 .map_err(|e| IntegrationError::MessageQueueError(e.to_string()))?;
276 self.client = Some(connection);
277 Ok(())
278 }
279}
280
281#[cfg(feature = "activities-db")]
282#[async_trait]
283impl Integration for MessageQueueIntegration {
284 async fn execute(
285 &self,
286 operation: &str,
287 params: HashMap<String, serde_json::Value>,
288 ) -> Result<serde_json::Value, IntegrationError> {
289 let connection = self.client.as_ref().ok_or(IntegrationError::MessageQueueError("Not initialized".to_string()))?;
290 let channel = connection.create_channel().await
291 .map_err(|e| IntegrationError::MessageQueueError(e.to_string()))?;
292
293 match operation {
294 "publish" => {
295 let queue = params.get("queue").and_then(|v| v.as_str()).ok_or(IntegrationError::InvalidParams)?;
296 let message = params.get("message").ok_or(IntegrationError::InvalidParams)?;
297
298 channel.queue_declare(queue, Default::default(), Default::default()).await
299 .map_err(|e| IntegrationError::MessageQueueError(e.to_string()))?;
300
301 channel.basic_publish(
302 "",
303 queue,
304 Default::default(),
305 &serde_json::to_vec(message).unwrap_or_default(),
306 Default::default(),
307 ).await
308 .map_err(|e| IntegrationError::MessageQueueError(e.to_string()))?;
309
310 Ok(serde_json::json!({"status": "published"}))
311 }
312 "consume" => {
313 let queue = params.get("queue").and_then(|v| v.as_str()).ok_or(IntegrationError::InvalidParams)?;
314
315 channel.queue_declare(queue, Default::default(), Default::default()).await
316 .map_err(|e| IntegrationError::MessageQueueError(e.to_string()))?;
317
318 Ok(serde_json::json!({"status": "consuming"}))
320 }
321 _ => Err(IntegrationError::UnsupportedOperation(operation.to_string())),
322 }
323 }
324
325 async fn health_check(&self) -> Result<(), IntegrationError> {
326 let connection = self.client.as_ref().ok_or(IntegrationError::MessageQueueError("Not initialized".to_string()))?;
327
328 if connection.status().connected() {
329 Ok(())
330 } else {
331 Err(IntegrationError::HealthCheckFailed)
332 }
333 }
334
335 fn integration_type(&self) -> IntegrationType {
336 IntegrationType::MessageQueue
337 }
338}
339
340pub struct CloudStorageIntegration {
342 provider: CloudProvider,
343 bucket_name: String,
344 credentials: HashMap<String, String>,
345}
346
347#[derive(Debug, Clone, Serialize, Deserialize)]
348pub enum CloudProvider {
349 AWS,
350 GCP,
351 Azure,
352}
353
354impl CloudStorageIntegration {
355 pub fn new(provider: CloudProvider, bucket_name: &str, credentials: HashMap<String, String>) -> Self {
356 Self {
357 provider,
358 bucket_name: bucket_name.to_string(),
359 credentials,
360 }
361 }
362}
363
364#[async_trait]
365impl Integration for CloudStorageIntegration {
366 async fn execute(
367 &self,
368 operation: &str,
369 params: HashMap<String, serde_json::Value>,
370 ) -> Result<serde_json::Value, IntegrationError> {
371 match operation {
372 "upload" => {
373 let key = params.get("key").and_then(|v| v.as_str()).ok_or(IntegrationError::InvalidParams)?;
374 let data = params.get("data").and_then(|v| v.as_str()).ok_or(IntegrationError::InvalidParams)?;
375
376 println!("Uploading {} to {} in {}", key, self.bucket_name, format!("{:?}", self.provider));
378
379 Ok(serde_json::json!({
380 "status": "uploaded",
381 "key": key,
382 "bucket": self.bucket_name
383 }))
384 }
385 "download" => {
386 let key = params.get("key").and_then(|v| v.as_str()).ok_or(IntegrationError::InvalidParams)?;
387
388 println!("Downloading {} from {}", key, self.bucket_name);
390
391 Ok(serde_json::json!({
392 "status": "downloaded",
393 "key": key,
394 "data": "downloaded_content"
395 }))
396 }
397 _ => Err(IntegrationError::UnsupportedOperation(operation.to_string())),
398 }
399 }
400
401 async fn health_check(&self) -> Result<(), IntegrationError> {
402 Ok(())
404 }
405
406 fn integration_type(&self) -> IntegrationType {
407 IntegrationType::CloudStorage
408 }
409}
410
411pub struct EmailIntegration {
413 smtp_server: String,
414 smtp_port: u16,
415 username: String,
416 password: String,
417 from_email: String,
418}
419
420impl EmailIntegration {
421 pub fn new(smtp_server: &str, smtp_port: u16, username: &str, password: &str, from_email: &str) -> Self {
422 Self {
423 smtp_server: smtp_server.to_string(),
424 smtp_port,
425 username: username.to_string(),
426 password: password.to_string(),
427 from_email: from_email.to_string(),
428 }
429 }
430}
431
432#[async_trait]
433impl Integration for EmailIntegration {
434 async fn execute(
435 &self,
436 operation: &str,
437 params: HashMap<String, serde_json::Value>,
438 ) -> Result<serde_json::Value, IntegrationError> {
439 match operation {
440 "send" => {
441 let to = params.get("to").and_then(|v| v.as_str()).ok_or(IntegrationError::InvalidParams)?;
442 let subject = params.get("subject").and_then(|v| v.as_str()).ok_or(IntegrationError::InvalidParams)?;
443 let body = params.get("body").and_then(|v| v.as_str()).ok_or(IntegrationError::InvalidParams)?;
444
445 println!("Sending email to {} with subject: {}", to, subject);
447
448 Ok(serde_json::json!({
449 "status": "sent",
450 "to": to,
451 "subject": subject
452 }))
453 }
454 _ => Err(IntegrationError::UnsupportedOperation(operation.to_string())),
455 }
456 }
457
458 async fn health_check(&self) -> Result<(), IntegrationError> {
459 Ok(())
461 }
462
463 fn integration_type(&self) -> IntegrationType {
464 IntegrationType::Email
465 }
466}
467
468#[cfg(feature = "activities-http")]
470pub struct WebhookIntegration {
471 client: reqwest::Client,
472 timeout: std::time::Duration,
473}
474
475#[cfg(feature = "activities-http")]
476impl WebhookIntegration {
477 pub fn new(timeout: std::time::Duration) -> Self {
478 Self {
479 client: reqwest::Client::new(),
480 timeout,
481 }
482 }
483}
484
485#[cfg(feature = "activities-http")]
486#[async_trait]
487impl Integration for WebhookIntegration {
488 async fn execute(
489 &self,
490 operation: &str,
491 params: HashMap<String, serde_json::Value>,
492 ) -> Result<serde_json::Value, IntegrationError> {
493 match operation {
494 "post" => {
495 let url = params.get("url").and_then(|v| v.as_str()).ok_or(IntegrationError::InvalidParams)?;
496 let payload = params.get("payload").ok_or(IntegrationError::InvalidParams)?;
497
498 let response = self.client.post(url)
499 .timeout(self.timeout)
500 .json(payload)
501 .send().await
502 .map_err(|e| IntegrationError::HttpError(e.to_string()))?;
503
504 if response.status().is_success() {
505 let result = response.json().await
506 .map_err(|e| IntegrationError::ParseError(e.to_string()))?;
507 Ok(result)
508 } else {
509 Err(IntegrationError::HttpError(format!("Webhook failed with status: {}", response.status())))
510 }
511 }
512 _ => Err(IntegrationError::UnsupportedOperation(operation.to_string())),
513 }
514 }
515
516 async fn health_check(&self) -> Result<(), IntegrationError> {
517 Ok(())
518 }
519
520 fn integration_type(&self) -> IntegrationType {
521 IntegrationType::Webhook
522 }
523}
524
525#[derive(Debug, thiserror::Error)]
526pub enum IntegrationError {
527 #[error("Integration not found: {0}")]
528 IntegrationNotFound(String),
529 #[error("HTTP error: {0}")]
530 HttpError(String),
531 #[error("Database error: {0}")]
532 DatabaseError(String),
533 #[error("Message queue error: {0}")]
534 MessageQueueError(String),
535 #[error("Parse error: {0}")]
536 ParseError(String),
537 #[error("Invalid parameters")]
538 InvalidParams,
539 #[error("Unsupported operation: {0}")]
540 UnsupportedOperation(String),
541 #[error("Health check failed")]
542 HealthCheckFailed,
543}