kotoba_workflow/
integrations.rs

1//! External System Integrations - Phase 3
2//!
3//! 外部システムとの統合を提供します。
4//! HTTP, データベース, メッセージング, クラウドサービスなどの統合。
5
6use std::collections::HashMap;
7use serde::{Deserialize, Serialize};
8use async_trait::async_trait;
9
10/// 統合設定
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct IntegrationConfig {
13    pub name: String,
14    pub integration_type: IntegrationType,
15    pub config: HashMap<String, serde_json::Value>,
16    pub timeout: Option<std::time::Duration>,
17    pub retry_config: Option<RetryConfig>,
18}
19
20/// 統合種別
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub enum IntegrationType {
23    Http,
24    Database,
25    MessageQueue,
26    CloudStorage,
27    Email,
28    SMS,
29    Webhook,
30    GraphQL,
31    REST,
32}
33
34/// リトライ設定
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct RetryConfig {
37    pub max_attempts: u32,
38    pub initial_delay: std::time::Duration,
39    pub max_delay: std::time::Duration,
40    pub backoff_multiplier: f64,
41}
42
43/// 統合マネージャー
44pub struct IntegrationManager {
45    integrations: HashMap<String, Box<dyn Integration>>,
46}
47
48impl IntegrationManager {
49    pub fn new() -> Self {
50        Self {
51            integrations: HashMap::new(),
52        }
53    }
54
55    /// 統合を登録
56    pub fn register_integration(&mut self, name: &str, integration: Box<dyn Integration>) {
57        self.integrations.insert(name.to_string(), integration);
58    }
59
60    /// 統合を取得
61    pub fn get_integration(&self, name: &str) -> Option<&Box<dyn Integration>> {
62        self.integrations.get(name)
63    }
64
65    /// 統合を実行
66    pub async fn execute_integration(
67        &self,
68        name: &str,
69        operation: &str,
70        params: HashMap<String, serde_json::Value>,
71    ) -> Result<serde_json::Value, IntegrationError> {
72        if let Some(integration) = self.integrations.get(name) {
73            integration.execute(operation, params).await
74        } else {
75            Err(IntegrationError::IntegrationNotFound(name.to_string()))
76        }
77    }
78}
79
80/// 統合インターフェース
81#[async_trait]
82pub trait Integration: Send + Sync {
83    /// 統合を実行
84    async fn execute(
85        &self,
86        operation: &str,
87        params: HashMap<String, serde_json::Value>,
88    ) -> Result<serde_json::Value, IntegrationError>;
89
90    /// 統合のヘルスチェック
91    async fn health_check(&self) -> Result<(), IntegrationError>;
92
93    /// 統合の種類を取得
94    fn integration_type(&self) -> IntegrationType;
95}
96
/// Integration that calls an HTTP service rooted at a base URL.
#[cfg(feature = "activities-http")]
pub struct HttpIntegration {
    /// HTTP client used for every request.
    client: reqwest::Client,
    /// Root URL that operation names are appended to.
    base_url: String,
    /// Headers attached to each request sent by `execute`.
    headers: HashMap<String, String>,
    /// Per-request timeout applied by `execute`.
    timeout: std::time::Duration,
}
105
#[cfg(feature = "activities-http")]
impl HttpIntegration {
    /// Builds an integration for `base_url` with a fresh client and no extra headers.
    ///
    /// `timeout` is the per-request timeout used by `execute`.
    pub fn new(base_url: &str, timeout: std::time::Duration) -> Self {
        let client = reqwest::Client::new();
        let headers = HashMap::new();

        Self {
            client,
            base_url: base_url.to_owned(),
            headers,
            timeout,
        }
    }

    /// Adds (or overwrites) the header `key` with `value` and returns the integration.
    pub fn with_header(mut self, key: &str, value: &str) -> Self {
        self.headers.insert(key.to_owned(), value.to_owned());
        self
    }

    /// Sets the `Authorization` header to `Bearer <token>` and returns the integration.
    pub fn with_bearer_token(self, token: &str) -> Self {
        let credential = format!("Bearer {}", token);
        self.with_header("Authorization", &credential)
    }
}
127
#[cfg(feature = "activities-http")]
#[async_trait]
impl Integration for HttpIntegration {
    /// POSTs `params` as a JSON body to `<base_url>/<operation>`.
    ///
    /// Exactly one `/` separates the base URL and the operation, whatever
    /// slashes either side carries. All configured headers and the
    /// configured timeout are applied to the request.
    ///
    /// Returns the response body parsed as JSON. A successful response with
    /// an empty (or whitespace-only) body — for example `204 No Content` —
    /// yields `serde_json::Value::Null` instead of a parse error.
    ///
    /// # Errors
    /// * [`IntegrationError::HttpError`] when the request cannot be sent or
    ///   the status is not 2xx (the message carries status and body).
    /// * [`IntegrationError::ParseError`] when the body cannot be read or is
    ///   not valid JSON.
    async fn execute(
        &self,
        operation: &str,
        params: HashMap<String, serde_json::Value>,
    ) -> Result<serde_json::Value, IntegrationError> {
        let url = format!(
            "{}/{}",
            self.base_url.trim_end_matches('/'),
            operation.trim_start_matches('/')
        );

        let mut request = self.client.post(&url).timeout(self.timeout);

        // Headers are applied before the JSON body, as in the original ordering.
        for (key, value) in &self.headers {
            request = request.header(key, value);
        }
        request = request.json(&params);

        let response = request
            .send()
            .await
            .map_err(|e| IntegrationError::HttpError(e.to_string()))?;

        let status = response.status();
        if !status.is_success() {
            // Best effort: an unreadable error body is reported as empty.
            let body = response.text().await.unwrap_or_default();
            return Err(IntegrationError::HttpError(format!("HTTP {}: {}", status, body)));
        }

        let body = response
            .bytes()
            .await
            .map_err(|e| IntegrationError::ParseError(e.to_string()))?;

        // A success without content is not a failure; report it as JSON null.
        if body.iter().all(u8::is_ascii_whitespace) {
            return Ok(serde_json::Value::Null);
        }

        serde_json::from_slice(&body).map_err(|e| IntegrationError::ParseError(e.to_string()))
    }

    /// Sends a GET to the base URL with a fixed 5-second timeout.
    ///
    /// The configured headers are not sent with this request.
    ///
    /// # Errors
    /// * [`IntegrationError::HttpError`] when the request cannot be sent.
    /// * [`IntegrationError::HealthCheckFailed`] when the status is not 2xx.
    async fn health_check(&self) -> Result<(), IntegrationError> {
        let response = self
            .client
            .get(&self.base_url)
            .timeout(std::time::Duration::from_secs(5))
            .send()
            .await
            .map_err(|e| IntegrationError::HttpError(e.to_string()))?;

        if response.status().is_success() {
            Ok(())
        } else {
            Err(IntegrationError::HealthCheckFailed)
        }
    }

    /// Always [`IntegrationType::Http`].
    fn integration_type(&self) -> IntegrationType {
        IntegrationType::Http
    }
}
176
/// Integration that runs SQL against a PostgreSQL database through `sqlx`.
#[cfg(feature = "activities-db")]
pub struct DatabaseIntegration {
    /// Connection string used by `initialize` to open the pool.
    connection_string: String,
    /// Connection pool; `None` until `initialize` has succeeded.
    pool: Option<sqlx::PgPool>, // PostgreSQL is used as the example backend
}
183
#[cfg(feature = "activities-db")]
impl DatabaseIntegration {
    /// Stores `connection_string` without connecting; call `initialize` before use.
    pub fn new(connection_string: &str) -> Self {
        Self {
            connection_string: connection_string.to_owned(),
            pool: None,
        }
    }

    /// Opens the connection pool.
    ///
    /// # Errors
    /// Returns [`IntegrationError::DatabaseError`] when the connection fails;
    /// the existing pool (if any) is left untouched in that case.
    pub async fn initialize(&mut self) -> Result<(), IntegrationError> {
        let pool = sqlx::PgPool::connect(&self.connection_string)
            .await
            .map_err(|e| IntegrationError::DatabaseError(e.to_string()))?;

        self.pool = Some(pool);
        Ok(())
    }
}
200
#[cfg(feature = "activities-db")]
#[async_trait]
impl Integration for DatabaseIntegration {
    /// Runs the SQL text found under the `"sql"` key of `params`.
    ///
    /// Supported operations:
    /// * `"query"` — fetches all rows. Row-to-JSON conversion is not
    ///   implemented yet, so the result is an array holding one empty
    ///   object per returned row.
    /// * `"execute"` — runs the statement and returns
    ///   `{"rows_affected": <n>}`.
    ///
    /// NOTE(review): the SQL string is executed verbatim and no bind
    /// parameters are supported, so callers must never build it from
    /// untrusted input.
    ///
    /// # Errors
    /// * [`IntegrationError::DatabaseError`] when the pool is not
    ///   initialized or the statement fails.
    /// * [`IntegrationError::InvalidParams`] when `"sql"` is missing or not
    ///   a string.
    /// * [`IntegrationError::UnsupportedOperation`] for any other operation.
    async fn execute(
        &self,
        operation: &str,
        params: HashMap<String, serde_json::Value>,
    ) -> Result<serde_json::Value, IntegrationError> {
        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| IntegrationError::DatabaseError("Not initialized".to_string()))?;

        match operation {
            "query" => {
                let sql = params
                    .get("sql")
                    .and_then(|v| v.as_str())
                    .ok_or(IntegrationError::InvalidParams)?;
                let rows = sqlx::query(sql)
                    .fetch_all(pool)
                    .await
                    .map_err(|e| IntegrationError::DatabaseError(e.to_string()))?;

                // TODO: convert the actual row data to JSON; for now each row
                // is represented by an empty object.
                let result: Vec<HashMap<String, serde_json::Value>> =
                    rows.iter().map(|_row| HashMap::new()).collect();

                Ok(serde_json::json!(result))
            }
            "execute" => {
                let sql = params
                    .get("sql")
                    .and_then(|v| v.as_str())
                    .ok_or(IntegrationError::InvalidParams)?;
                let result = sqlx::query(sql)
                    .execute(pool)
                    .await
                    .map_err(|e| IntegrationError::DatabaseError(e.to_string()))?;

                Ok(serde_json::json!({
                    "rows_affected": result.rows_affected()
                }))
            }
            _ => Err(IntegrationError::UnsupportedOperation(operation.to_string())),
        }
    }

    /// Runs `SELECT 1` against the pool.
    ///
    /// # Errors
    /// Returns [`IntegrationError::DatabaseError`] when the pool is not
    /// initialized or the probe query fails.
    async fn health_check(&self) -> Result<(), IntegrationError> {
        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| IntegrationError::DatabaseError("Not initialized".to_string()))?;

        sqlx::query("SELECT 1")
            .fetch_one(pool)
            .await
            .map_err(|e| IntegrationError::DatabaseError(e.to_string()))?;

        Ok(())
    }

    /// Always [`IntegrationType::Database`].
    fn integration_type(&self) -> IntegrationType {
        IntegrationType::Database
    }
}
255
/// Integration that talks to an AMQP broker through `lapin`.
// NOTE(review): this type is gated on the `activities-db` feature, the same
// gate as the database integration — possibly a copy-paste; confirm the
// intended feature name.
#[cfg(feature = "activities-db")]
pub struct MessageQueueIntegration {
    /// Broker URL used by `initialize` to connect.
    broker_url: String,
    /// Broker connection; `None` until `initialize` has succeeded.
    client: Option<lapin::Connection>,
}
262
#[cfg(feature = "activities-db")]
impl MessageQueueIntegration {
    /// Stores `broker_url` without connecting; call `initialize` before use.
    pub fn new(broker_url: &str) -> Self {
        Self {
            broker_url: broker_url.to_owned(),
            client: None,
        }
    }

    /// Connects to the broker using default connection properties.
    ///
    /// # Errors
    /// Returns [`IntegrationError::MessageQueueError`] when the connection
    /// fails; the existing connection (if any) is left untouched in that case.
    pub async fn initialize(&mut self) -> Result<(), IntegrationError> {
        let properties = lapin::ConnectionProperties::default();

        match lapin::Connection::connect(&self.broker_url, properties).await {
            Ok(connection) => {
                self.client = Some(connection);
                Ok(())
            }
            Err(e) => Err(IntegrationError::MessageQueueError(e.to_string())),
        }
    }
}
280
#[cfg(feature = "activities-db")]
#[async_trait]
impl Integration for MessageQueueIntegration {
    /// Performs a queue operation on a freshly opened channel.
    ///
    /// Supported operations:
    /// * `"publish"` — declares the queue named by `params["queue"]` with
    ///   default options and publishes `params["message"]`, serialized as
    ///   JSON, to the default exchange using the queue name as routing key.
    ///   Returns `{"status": "published"}`.
    /// * `"consume"` — declares the queue named by `params["queue"]`.
    ///   Actual consumption is not implemented yet; returns
    ///   `{"status": "consuming"}`.
    ///
    /// The operation and its parameters are validated before a channel is
    /// opened, so invalid requests do not cost a broker round trip.
    ///
    /// # Errors
    /// * [`IntegrationError::MessageQueueError`] when the connection is not
    ///   initialized, the message cannot be serialized, or a broker call fails.
    /// * [`IntegrationError::InvalidParams`] when a required parameter is
    ///   missing or has the wrong type.
    /// * [`IntegrationError::UnsupportedOperation`] for any other operation.
    async fn execute(
        &self,
        operation: &str,
        params: HashMap<String, serde_json::Value>,
    ) -> Result<serde_json::Value, IntegrationError> {
        let connection = self
            .client
            .as_ref()
            .ok_or_else(|| IntegrationError::MessageQueueError("Not initialized".to_string()))?;

        match operation {
            "publish" => {
                let queue = params
                    .get("queue")
                    .and_then(|v| v.as_str())
                    .ok_or(IntegrationError::InvalidParams)?;
                let message = params.get("message").ok_or(IntegrationError::InvalidParams)?;

                // Fail loudly instead of publishing an empty payload when
                // serialization fails.
                let payload = serde_json::to_vec(message).map_err(|e| {
                    IntegrationError::MessageQueueError(format!(
                        "failed to serialize message: {}",
                        e
                    ))
                })?;

                let channel = connection
                    .create_channel()
                    .await
                    .map_err(|e| IntegrationError::MessageQueueError(e.to_string()))?;

                channel
                    .queue_declare(queue, Default::default(), Default::default())
                    .await
                    .map_err(|e| IntegrationError::MessageQueueError(e.to_string()))?;

                channel
                    .basic_publish(
                        "",
                        queue,
                        Default::default(),
                        &payload,
                        Default::default(),
                    )
                    .await
                    .map_err(|e| IntegrationError::MessageQueueError(e.to_string()))?;

                Ok(serde_json::json!({"status": "published"}))
            }
            "consume" => {
                let queue = params
                    .get("queue")
                    .and_then(|v| v.as_str())
                    .ok_or(IntegrationError::InvalidParams)?;

                let channel = connection
                    .create_channel()
                    .await
                    .map_err(|e| IntegrationError::MessageQueueError(e.to_string()))?;

                channel
                    .queue_declare(queue, Default::default(), Default::default())
                    .await
                    .map_err(|e| IntegrationError::MessageQueueError(e.to_string()))?;

                // TODO: implement message consumption.
                Ok(serde_json::json!({"status": "consuming"}))
            }
            _ => Err(IntegrationError::UnsupportedOperation(operation.to_string())),
        }
    }

    /// Reports whether the broker connection is currently connected.
    ///
    /// # Errors
    /// * [`IntegrationError::MessageQueueError`] when the connection is not
    ///   initialized.
    /// * [`IntegrationError::HealthCheckFailed`] when the connection reports
    ///   that it is not connected.
    async fn health_check(&self) -> Result<(), IntegrationError> {
        let connection = self
            .client
            .as_ref()
            .ok_or_else(|| IntegrationError::MessageQueueError("Not initialized".to_string()))?;

        if connection.status().connected() {
            Ok(())
        } else {
            Err(IntegrationError::HealthCheckFailed)
        }
    }

    /// Always [`IntegrationType::MessageQueue`].
    fn integration_type(&self) -> IntegrationType {
        IntegrationType::MessageQueue
    }
}
339
340/// クラウドストレージ統合
341pub struct CloudStorageIntegration {
342    provider: CloudProvider,
343    bucket_name: String,
344    credentials: HashMap<String, String>,
345}
346
347#[derive(Debug, Clone, Serialize, Deserialize)]
348pub enum CloudProvider {
349    AWS,
350    GCP,
351    Azure,
352}
353
354impl CloudStorageIntegration {
355    pub fn new(provider: CloudProvider, bucket_name: &str, credentials: HashMap<String, String>) -> Self {
356        Self {
357            provider,
358            bucket_name: bucket_name.to_string(),
359            credentials,
360        }
361    }
362}
363
364#[async_trait]
365impl Integration for CloudStorageIntegration {
366    async fn execute(
367        &self,
368        operation: &str,
369        params: HashMap<String, serde_json::Value>,
370    ) -> Result<serde_json::Value, IntegrationError> {
371        match operation {
372            "upload" => {
373                let key = params.get("key").and_then(|v| v.as_str()).ok_or(IntegrationError::InvalidParams)?;
374                let data = params.get("data").and_then(|v| v.as_str()).ok_or(IntegrationError::InvalidParams)?;
375
376                // TODO: 実際のクラウドストレージアップロード実装
377                println!("Uploading {} to {} in {}", key, self.bucket_name, format!("{:?}", self.provider));
378
379                Ok(serde_json::json!({
380                    "status": "uploaded",
381                    "key": key,
382                    "bucket": self.bucket_name
383                }))
384            }
385            "download" => {
386                let key = params.get("key").and_then(|v| v.as_str()).ok_or(IntegrationError::InvalidParams)?;
387
388                // TODO: 実際のクラウドストレージダウンロード実装
389                println!("Downloading {} from {}", key, self.bucket_name);
390
391                Ok(serde_json::json!({
392                    "status": "downloaded",
393                    "key": key,
394                    "data": "downloaded_content"
395                }))
396            }
397            _ => Err(IntegrationError::UnsupportedOperation(operation.to_string())),
398        }
399    }
400
401    async fn health_check(&self) -> Result<(), IntegrationError> {
402        // TODO: 実際のヘルスチェック実装
403        Ok(())
404    }
405
406    fn integration_type(&self) -> IntegrationType {
407        IntegrationType::CloudStorage
408    }
409}
410
/// Integration holding the SMTP settings used to send email.
pub struct EmailIntegration {
    /// Host name of the SMTP server.
    smtp_server: String,
    /// Port of the SMTP server.
    smtp_port: u16,
    /// SMTP account user name.
    username: String,
    /// SMTP account password, kept in memory as plain text.
    password: String,
    /// Sender address.
    from_email: String,
}
419
420impl EmailIntegration {
421    pub fn new(smtp_server: &str, smtp_port: u16, username: &str, password: &str, from_email: &str) -> Self {
422        Self {
423            smtp_server: smtp_server.to_string(),
424            smtp_port,
425            username: username.to_string(),
426            password: password.to_string(),
427            from_email: from_email.to_string(),
428        }
429    }
430}
431
432#[async_trait]
433impl Integration for EmailIntegration {
434    async fn execute(
435        &self,
436        operation: &str,
437        params: HashMap<String, serde_json::Value>,
438    ) -> Result<serde_json::Value, IntegrationError> {
439        match operation {
440            "send" => {
441                let to = params.get("to").and_then(|v| v.as_str()).ok_or(IntegrationError::InvalidParams)?;
442                let subject = params.get("subject").and_then(|v| v.as_str()).ok_or(IntegrationError::InvalidParams)?;
443                let body = params.get("body").and_then(|v| v.as_str()).ok_or(IntegrationError::InvalidParams)?;
444
445                // TODO: 実際のEmail送信実装
446                println!("Sending email to {} with subject: {}", to, subject);
447
448                Ok(serde_json::json!({
449                    "status": "sent",
450                    "to": to,
451                    "subject": subject
452                }))
453            }
454            _ => Err(IntegrationError::UnsupportedOperation(operation.to_string())),
455        }
456    }
457
458    async fn health_check(&self) -> Result<(), IntegrationError> {
459        // TODO: SMTP接続テスト
460        Ok(())
461    }
462
463    fn integration_type(&self) -> IntegrationType {
464        IntegrationType::Email
465    }
466}
467
/// Integration that delivers JSON payloads to caller-supplied webhook URLs.
#[cfg(feature = "activities-http")]
pub struct WebhookIntegration {
    /// HTTP client used for every delivery.
    client: reqwest::Client,
    /// Per-request timeout applied to each delivery.
    timeout: std::time::Duration,
}
474
#[cfg(feature = "activities-http")]
impl WebhookIntegration {
    /// Builds an integration with a fresh HTTP client and the given per-request `timeout`.
    pub fn new(timeout: std::time::Duration) -> Self {
        let client = reqwest::Client::new();

        Self { client, timeout }
    }
}
484
#[cfg(feature = "activities-http")]
#[async_trait]
impl Integration for WebhookIntegration {
    /// Delivers a webhook.
    ///
    /// The only supported operation is `"post"`: `params["payload"]` is sent
    /// as a JSON body to the URL in `params["url"]` using the configured
    /// timeout.
    ///
    /// On a 2xx response the body is returned as follows:
    /// * empty or whitespace-only body → `serde_json::Value::Null`;
    /// * valid JSON → the parsed value;
    /// * anything else (e.g. a plain `ok`) → the text as a JSON string.
    ///
    /// A delivered webhook is therefore never reported as failed just
    /// because the receiver answered without JSON, which would otherwise
    /// invite duplicate deliveries on retry.
    ///
    /// # Errors
    /// * [`IntegrationError::InvalidParams`] when `"url"` is missing or not
    ///   a string, or `"payload"` is missing.
    /// * [`IntegrationError::HttpError`] when the request cannot be sent or
    ///   the status is not 2xx.
    /// * [`IntegrationError::ParseError`] when the response body cannot be read.
    /// * [`IntegrationError::UnsupportedOperation`] for any other operation.
    async fn execute(
        &self,
        operation: &str,
        params: HashMap<String, serde_json::Value>,
    ) -> Result<serde_json::Value, IntegrationError> {
        match operation {
            "post" => {
                let url = params
                    .get("url")
                    .and_then(|v| v.as_str())
                    .ok_or(IntegrationError::InvalidParams)?;
                let payload = params.get("payload").ok_or(IntegrationError::InvalidParams)?;

                let response = self
                    .client
                    .post(url)
                    .timeout(self.timeout)
                    .json(payload)
                    .send()
                    .await
                    .map_err(|e| IntegrationError::HttpError(e.to_string()))?;

                let status = response.status();
                if !status.is_success() {
                    return Err(IntegrationError::HttpError(format!(
                        "Webhook failed with status: {}",
                        status
                    )));
                }

                let body = response
                    .text()
                    .await
                    .map_err(|e| IntegrationError::ParseError(e.to_string()))?;

                if body.trim().is_empty() {
                    return Ok(serde_json::Value::Null);
                }

                // Receivers are free to answer with plain text; keep it
                // rather than failing a delivery that already succeeded.
                match serde_json::from_str::<serde_json::Value>(&body) {
                    Ok(json) => Ok(json),
                    Err(_) => Ok(serde_json::Value::String(body)),
                }
            }
            _ => Err(IntegrationError::UnsupportedOperation(operation.to_string())),
        }
    }

    /// Always succeeds; the integration holds no fixed endpoint to probe.
    async fn health_check(&self) -> Result<(), IntegrationError> {
        Ok(())
    }

    /// Always [`IntegrationType::Webhook`].
    fn integration_type(&self) -> IntegrationType {
        IntegrationType::Webhook
    }
}
524
525#[derive(Debug, thiserror::Error)]
526pub enum IntegrationError {
527    #[error("Integration not found: {0}")]
528    IntegrationNotFound(String),
529    #[error("HTTP error: {0}")]
530    HttpError(String),
531    #[error("Database error: {0}")]
532    DatabaseError(String),
533    #[error("Message queue error: {0}")]
534    MessageQueueError(String),
535    #[error("Parse error: {0}")]
536    ParseError(String),
537    #[error("Invalid parameters")]
538    InvalidParams,
539    #[error("Unsupported operation: {0}")]
540    UnsupportedOperation(String),
541    #[error("Health check failed")]
542    HealthCheckFailed,
543}