Skip to main content

msg_gateway/
adapter.rs

1//! External Adapter Management
2//!
3//! Handles discovery, spawning, and communication with external adapter processes.
4
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::path::Path;
8use std::process::Stdio;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::process::{Child, Command};
12use tokio::sync::RwLock;
13
14use crate::error::AppError;
15
16/// HTTP client for adapter health checks
17static HEALTH_CLIENT: std::sync::OnceLock<reqwest::Client> = std::sync::OnceLock::new();
18
19fn get_health_client() -> &'static reqwest::Client {
20    HEALTH_CLIENT.get_or_init(|| {
21        reqwest::Client::builder()
22            .timeout(Duration::from_secs(5))
23            .build()
24            .expect("Failed to create HTTP client")
25    })
26}
27
28/// Adapter definition from adapter.json
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct AdapterDef {
31    pub name: String,
32    pub version: String,
33    pub command: String,
34    #[serde(default)]
35    pub args: Vec<String>,
36}
37
38/// Load adapter definition from a directory
39pub fn load_adapter_def(adapter_dir: &Path) -> Result<AdapterDef, AppError> {
40    let adapter_json = adapter_dir.join("adapter.json");
41    let content = std::fs::read_to_string(&adapter_json).map_err(|e| {
42        AppError::Config(format!("Failed to read {}: {}", adapter_json.display(), e))
43    })?;
44
45    serde_json::from_str(&content)
46        .map_err(|e| AppError::Config(format!("Failed to parse {}: {}", adapter_json.display(), e)))
47}
48
49/// Discover all adapters in the adapters directory
50pub fn discover_adapters(adapters_dir: &Path) -> Result<HashMap<String, AdapterDef>, AppError> {
51    let mut adapters = HashMap::new();
52
53    if !adapters_dir.exists() {
54        tracing::warn!(
55            path = %adapters_dir.display(),
56            "Adapters directory does not exist"
57        );
58        return Ok(adapters);
59    }
60
61    let entries = std::fs::read_dir(adapters_dir).map_err(|e| {
62        AppError::Config(format!(
63            "Failed to read adapters directory {}: {}",
64            adapters_dir.display(),
65            e
66        ))
67    })?;
68
69    for entry in entries {
70        let entry = entry
71            .map_err(|e| AppError::Config(format!("Failed to read directory entry: {}", e)))?;
72
73        let path = entry.path();
74        if path.is_dir() {
75            let adapter_json = path.join("adapter.json");
76            if adapter_json.exists() {
77                match load_adapter_def(&path) {
78                    Ok(def) => {
79                        tracing::info!(
80                            adapter = %def.name,
81                            version = %def.version,
82                            "Discovered adapter"
83                        );
84                        adapters.insert(def.name.clone(), def);
85                    }
86                    Err(e) => {
87                        tracing::warn!(
88                            path = %path.display(),
89                            error = %e,
90                            "Failed to load adapter definition"
91                        );
92                    }
93                }
94            }
95        }
96    }
97
98    Ok(adapters)
99}
100
101/// Adapter health state
102#[derive(Debug, Clone, Copy, PartialEq, Eq)]
103pub enum AdapterHealth {
104    /// Initial state after spawn, waiting for health check
105    Starting,
106    /// Adapter is healthy and responding
107    Healthy,
108    /// Adapter failed to respond to health check
109    Unhealthy,
110    /// Adapter process has exited
111    Dead,
112}
113
114/// Running adapter process info
115#[allow(dead_code)]
116pub struct AdapterProcess {
117    pub instance_id: String,
118    pub credential_id: String,
119    pub adapter_name: String,
120    pub port: u16,
121    pub process: Child,
122    pub health: AdapterHealth,
123    pub consecutive_failures: u32,
124    pub restart_count: u32,
125    pub last_restart: Option<std::time::Instant>,
126    /// Stored for restart
127    pub token: String,
128    pub config: Option<serde_json::Value>,
129}
130
131/// Port allocator for adapter processes
132pub struct PortAllocator {
133    range_start: u16,
134    range_end: u16,
135    allocated: RwLock<Vec<u16>>,
136}
137
138impl PortAllocator {
139    pub fn new(range: (u16, u16)) -> Self {
140        Self {
141            range_start: range.0,
142            range_end: range.1,
143            allocated: RwLock::new(Vec::new()),
144        }
145    }
146
147    pub async fn allocate(&self) -> Option<u16> {
148        let mut allocated = self.allocated.write().await;
149        for port in self.range_start..=self.range_end {
150            if !allocated.contains(&port) {
151                allocated.push(port);
152                return Some(port);
153            }
154        }
155        None
156    }
157
158    pub async fn release(&self, port: u16) {
159        let mut allocated = self.allocated.write().await;
160        allocated.retain(|&p| p != port);
161    }
162}
163
164/// Adapter Instance Manager
165pub struct AdapterInstanceManager {
166    /// Discovered adapter definitions
167    pub adapters: HashMap<String, AdapterDef>,
168    /// Adapters directory path
169    pub adapters_dir: String,
170    /// Running adapter processes (credential_id -> process info)
171    processes: RwLock<HashMap<String, AdapterProcess>>,
172    /// Port allocator
173    port_allocator: PortAllocator,
174    /// Gateway URL for adapters to call back
175    gateway_url: String,
176}
177
178impl AdapterInstanceManager {
179    pub fn new(
180        adapters_dir: String,
181        port_range: (u16, u16),
182        gateway_listen: &str,
183    ) -> Result<Self, AppError> {
184        let adapters_path = Path::new(&adapters_dir);
185        let adapters = discover_adapters(adapters_path)?;
186
187        // Construct gateway URL from listen address
188        let gateway_url = if gateway_listen.starts_with("0.0.0.0") {
189            format!(
190                "http://127.0.0.1:{}",
191                gateway_listen.split(':').next_back().unwrap_or("8080")
192            )
193        } else {
194            format!("http://{}", gateway_listen)
195        };
196
197        Ok(Self {
198            adapters,
199            adapters_dir,
200            processes: RwLock::new(HashMap::new()),
201            port_allocator: PortAllocator::new(port_range),
202            gateway_url,
203        })
204    }
205
206    /// Check if an adapter exists
207    #[allow(dead_code)]
208    pub fn has_adapter(&self, name: &str) -> bool {
209        name == "generic" || self.adapters.contains_key(name)
210    }
211
212    /// Spawn an adapter process for a credential
213    pub async fn spawn(
214        &self,
215        credential_id: &str,
216        adapter_name: &str,
217        token: &str,
218        config: Option<&serde_json::Value>,
219    ) -> Result<(String, u16), AppError> {
220        // Generic adapter is built-in, no process to spawn
221        if adapter_name == "generic" {
222            let instance_id = format!("generic_{}", uuid::Uuid::new_v4());
223            return Ok((instance_id, 0));
224        }
225
226        // Get adapter definition
227        let adapter_def = self
228            .adapters
229            .get(adapter_name)
230            .ok_or_else(|| AppError::Config(format!("Adapter not found: {}", adapter_name)))?;
231
232        // Allocate port
233        let port = self
234            .port_allocator
235            .allocate()
236            .await
237            .ok_or_else(|| AppError::Internal("No available ports for adapter".to_string()))?;
238
239        let instance_id = format!("{}_{}", adapter_name, uuid::Uuid::new_v4());
240
241        // Build command
242        let adapter_path = Path::new(&self.adapters_dir).join(adapter_name);
243        let mut cmd = Command::new(&adapter_def.command);
244
245        cmd.args(&adapter_def.args)
246            .current_dir(&adapter_path)
247            .env("INSTANCE_ID", &instance_id)
248            .env("ADAPTER_PORT", port.to_string())
249            .env("GATEWAY_URL", &self.gateway_url)
250            .env("CREDENTIAL_ID", credential_id)
251            .env("CREDENTIAL_TOKEN", token)
252            .stdin(Stdio::null())
253            .stdout(Stdio::piped())
254            .stderr(Stdio::piped());
255
256        if let Some(cfg) = config {
257            cmd.env(
258                "CREDENTIAL_CONFIG",
259                serde_json::to_string(cfg).unwrap_or_default(),
260            );
261        }
262
263        tracing::info!(
264            credential_id = %credential_id,
265            adapter = %adapter_name,
266            port = %port,
267            instance_id = %instance_id,
268            "Spawning adapter process"
269        );
270
271        let process = cmd
272            .spawn()
273            .map_err(|e| AppError::Internal(format!("Failed to spawn adapter process: {}", e)))?;
274
275        // Store process info
276        let mut processes = self.processes.write().await;
277        processes.insert(
278            credential_id.to_string(),
279            AdapterProcess {
280                instance_id: instance_id.clone(),
281                credential_id: credential_id.to_string(),
282                adapter_name: adapter_name.to_string(),
283                port,
284                process,
285                health: AdapterHealth::Starting,
286                consecutive_failures: 0,
287                restart_count: 0,
288                last_restart: None,
289                token: token.to_string(),
290                config: config.cloned(),
291            },
292        );
293
294        Ok((instance_id, port))
295    }
296
297    /// Stop an adapter process
298    pub async fn stop(&self, credential_id: &str) -> Result<(), AppError> {
299        let mut processes = self.processes.write().await;
300
301        if let Some(mut process_info) = processes.remove(credential_id) {
302            // Release port
303            if process_info.port > 0 {
304                self.port_allocator.release(process_info.port).await;
305            }
306
307            // Kill process
308            if let Err(e) = process_info.process.kill().await {
309                tracing::warn!(
310                    credential_id = %credential_id,
311                    error = %e,
312                    "Failed to kill adapter process (may have already exited)"
313                );
314            }
315
316            tracing::info!(
317                credential_id = %credential_id,
318                adapter = %process_info.adapter_name,
319                "Adapter process stopped"
320            );
321        }
322
323        Ok(())
324    }
325
326    /// Get the port for a credential's adapter
327    pub async fn get_port(&self, credential_id: &str) -> Option<u16> {
328        let processes = self.processes.read().await;
329        processes.get(credential_id).map(|p| p.port)
330    }
331
332    /// Get instance_id for a credential
333    #[allow(dead_code)]
334    pub async fn get_instance_id(&self, credential_id: &str) -> Option<String> {
335        let processes = self.processes.read().await;
336        processes.get(credential_id).map(|p| p.instance_id.clone())
337    }
338
339    /// Check if adapter process is running for a credential
340    #[allow(dead_code)]
341    pub async fn is_running(&self, credential_id: &str) -> bool {
342        let processes = self.processes.read().await;
343        processes.contains_key(credential_id)
344    }
345
346    /// Stop all adapter processes
347    pub async fn stop_all(&self) {
348        let processes = self.processes.read().await;
349        let ids: Vec<_> = processes.keys().cloned().collect();
350        drop(processes);
351
352        for id in ids {
353            if let Err(e) = self.stop(&id).await {
354                tracing::error!(
355                    credential_id = %id,
356                    error = %e,
357                    "Failed to stop adapter"
358                );
359            }
360        }
361    }
362
363    /// Find credential_id by instance_id
364    pub async fn get_credential_id(&self, instance_id: &str) -> Option<String> {
365        let processes = self.processes.read().await;
366        for (cred_id, process) in processes.iter() {
367            if process.instance_id == instance_id {
368                return Some(cred_id.clone());
369            }
370        }
371        None
372    }
373
374    /// Check health of a specific adapter by credential_id
375    pub async fn check_health(&self, credential_id: &str) -> AdapterHealth {
376        let port = {
377            let processes = self.processes.read().await;
378            match processes.get(credential_id) {
379                Some(p) if p.port > 0 => p.port,
380                _ => return AdapterHealth::Dead, // Generic adapter or not found
381            }
382        };
383
384        let client = get_health_client();
385        let url = format!("http://127.0.0.1:{}/health", port);
386
387        match client.get(&url).send().await {
388            Ok(resp) if resp.status().is_success() => AdapterHealth::Healthy,
389            Ok(resp) => {
390                tracing::warn!(
391                    credential_id = %credential_id,
392                    status = %resp.status(),
393                    "Adapter health check returned non-success status"
394                );
395                AdapterHealth::Unhealthy
396            }
397            Err(e) => {
398                tracing::warn!(
399                    credential_id = %credential_id,
400                    error = %e,
401                    "Adapter health check failed"
402                );
403                AdapterHealth::Unhealthy
404            }
405        }
406    }
407
408    /// Update health state for a credential
409    pub async fn update_health(
410        &self,
411        credential_id: &str,
412        health: AdapterHealth,
413        reset_failures: bool,
414    ) {
415        let mut processes = self.processes.write().await;
416        if let Some(process) = processes.get_mut(credential_id) {
417            process.health = health;
418            if reset_failures {
419                process.consecutive_failures = 0;
420            } else if health == AdapterHealth::Unhealthy {
421                process.consecutive_failures += 1;
422            }
423        }
424    }
425
426    /// Get health state for a credential
427    #[allow(dead_code)]
428    pub async fn get_health(&self, credential_id: &str) -> Option<(AdapterHealth, u32)> {
429        let processes = self.processes.read().await;
430        processes
431            .get(credential_id)
432            .map(|p| (p.health, p.consecutive_failures))
433    }
434
435    /// Check if adapter process has exited
436    pub async fn check_process_alive(&self, credential_id: &str) -> bool {
437        let mut processes = self.processes.write().await;
438        if let Some(process) = processes.get_mut(credential_id) {
439            if process.port == 0 {
440                return true; // Generic adapter, always "alive"
441            }
442            match process.process.try_wait() {
443                Ok(Some(status)) => {
444                    tracing::warn!(
445                        credential_id = %credential_id,
446                        status = ?status,
447                        "Adapter process has exited"
448                    );
449                    process.health = AdapterHealth::Dead;
450                    false
451                }
452                Ok(None) => true, // Still running
453                Err(e) => {
454                    tracing::error!(
455                        credential_id = %credential_id,
456                        error = %e,
457                        "Failed to check process status"
458                    );
459                    false
460                }
461            }
462        } else {
463            false
464        }
465    }
466
467    /// Get all credentials with their health status
468    pub async fn get_all_health(&self) -> HashMap<String, (String, AdapterHealth, u32)> {
469        let processes = self.processes.read().await;
470        processes
471            .iter()
472            .map(|(cred_id, p)| {
473                (
474                    cred_id.clone(),
475                    (p.adapter_name.clone(), p.health, p.consecutive_failures),
476                )
477            })
478            .collect()
479    }
480
481    /// Restart an adapter process
482    /// Returns Ok(true) if restart succeeded, Ok(false) if should wait (backoff), Err on failure
483    pub async fn restart(&self, credential_id: &str, max_restarts: u32) -> Result<bool, AppError> {
484        // Get info needed for restart
485        let (adapter_name, token, config, restart_count, last_restart, _old_port) = {
486            let processes = self.processes.read().await;
487            let process = processes.get(credential_id).ok_or_else(|| {
488                AppError::Internal(format!(
489                    "Process not found for credential: {}",
490                    credential_id
491                ))
492            })?;
493
494            (
495                process.adapter_name.clone(),
496                process.token.clone(),
497                process.config.clone(),
498                process.restart_count,
499                process.last_restart,
500                process.port,
501            )
502        };
503
504        // Check if we've exceeded max restarts
505        if restart_count >= max_restarts {
506            tracing::error!(
507                credential_id = %credential_id,
508                restart_count = %restart_count,
509                max_restarts = %max_restarts,
510                "Max restarts exceeded, not restarting"
511            );
512            return Err(AppError::Internal("Max restarts exceeded".to_string()));
513        }
514
515        // Calculate backoff delay (exponential: 1s, 2s, 4s, 8s, etc. up to 60s)
516        let backoff_secs = std::cmp::min(60, 1u64 << restart_count);
517        let backoff = Duration::from_secs(backoff_secs);
518
519        // Check if we need to wait
520        if let Some(last) = last_restart {
521            let elapsed = last.elapsed();
522            if elapsed < backoff {
523                let remaining = backoff - elapsed;
524                tracing::info!(
525                    credential_id = %credential_id,
526                    remaining_secs = remaining.as_secs(),
527                    "Backoff in effect, waiting before restart"
528                );
529                return Ok(false);
530            }
531        }
532
533        tracing::info!(
534            credential_id = %credential_id,
535            adapter = %adapter_name,
536            restart_count = restart_count + 1,
537            "Restarting adapter"
538        );
539
540        // Stop the old process first
541        self.stop(credential_id).await?;
542
543        // Respawn with same settings
544        let result = self
545            .spawn(credential_id, &adapter_name, &token, config.as_ref())
546            .await;
547
548        // Update restart tracking
549        if result.is_ok() {
550            let mut processes = self.processes.write().await;
551            if let Some(process) = processes.get_mut(credential_id) {
552                process.restart_count = restart_count + 1;
553                process.last_restart = Some(std::time::Instant::now());
554            }
555        }
556
557        result.map(|_| true)
558    }
559
560    /// Reset restart count for a credential (called when adapter is healthy for a while)
561    pub async fn reset_restart_count(&self, credential_id: &str) {
562        let mut processes = self.processes.write().await;
563        if let Some(process) = processes.get_mut(credential_id)
564            && process.restart_count > 0
565        {
566            tracing::debug!(
567                credential_id = %credential_id,
568                old_count = %process.restart_count,
569                "Resetting restart count"
570            );
571            process.restart_count = 0;
572        }
573    }
574
575    /// Get restart info for a credential
576    #[allow(dead_code)]
577    pub async fn get_restart_info(&self, credential_id: &str) -> Option<(u32, Option<Duration>)> {
578        let processes = self.processes.read().await;
579        processes.get(credential_id).map(|p| {
580            let time_since_restart = p.last_restart.map(|t| t.elapsed());
581            (p.restart_count, time_since_restart)
582        })
583    }
584}
585
586/// Configuration for adapter health monitor
587pub struct HealthMonitorConfig {
588    /// How often to check adapter health (seconds)
589    pub interval_secs: u64,
590    /// Number of consecutive failures before restart
591    pub max_failures: u32,
592    /// Maximum number of restart attempts
593    pub max_restarts: u32,
594    /// How long an adapter must be healthy before resetting restart count (seconds)
595    pub healthy_reset_secs: u64,
596}
597
598impl Default for HealthMonitorConfig {
599    fn default() -> Self {
600        Self {
601            interval_secs: 30,
602            max_failures: 3,
603            max_restarts: 5,
604            healthy_reset_secs: 300, // 5 minutes
605        }
606    }
607}
608
609/// Start health monitoring for all adapters
610/// This runs in a background task and periodically checks adapter health
611pub async fn start_adapter_health_monitor(
612    manager: Arc<AdapterInstanceManager>,
613    interval_secs: u64,
614    max_failures: u32,
615) {
616    let config = HealthMonitorConfig {
617        interval_secs,
618        max_failures,
619        ..Default::default()
620    };
621
622    start_adapter_health_monitor_with_config(manager, config).await;
623}
624
625/// Start health monitoring with full configuration
626pub async fn start_adapter_health_monitor_with_config(
627    manager: Arc<AdapterInstanceManager>,
628    config: HealthMonitorConfig,
629) {
630    let interval = Duration::from_secs(config.interval_secs);
631    let healthy_reset = Duration::from_secs(config.healthy_reset_secs);
632
633    tracing::info!(
634        interval_secs = %config.interval_secs,
635        max_failures = %config.max_failures,
636        max_restarts = %config.max_restarts,
637        "Starting adapter health monitor"
638    );
639
640    // Track how long each adapter has been healthy
641    let mut healthy_since: HashMap<String, std::time::Instant> = HashMap::new();
642
643    loop {
644        tokio::time::sleep(interval).await;
645
646        let health_status = manager.get_all_health().await;
647
648        for (credential_id, (adapter_name, current_health, consecutive_failures)) in health_status {
649            // Skip generic adapter (built-in)
650            if adapter_name == "generic" {
651                continue;
652            }
653
654            // First check if process is still running
655            if !manager.check_process_alive(&credential_id).await {
656                tracing::warn!(
657                    credential_id = %credential_id,
658                    adapter = %adapter_name,
659                    "Adapter process died, attempting restart"
660                );
661                healthy_since.remove(&credential_id);
662
663                match manager.restart(&credential_id, config.max_restarts).await {
664                    Ok(true) => {
665                        tracing::info!(
666                            credential_id = %credential_id,
667                            "Adapter restart initiated"
668                        );
669                        // Wait for adapter to become ready
670                        let ready = wait_for_adapter_ready(
671                            &manager,
672                            &credential_id,
673                            Duration::from_secs(30),
674                            Duration::from_millis(500),
675                        )
676                        .await;
677                        if ready {
678                            tracing::info!(
679                                credential_id = %credential_id,
680                                "Restarted adapter is ready"
681                            );
682                        }
683                    }
684                    Ok(false) => {
685                        tracing::debug!(
686                            credential_id = %credential_id,
687                            "Restart postponed due to backoff"
688                        );
689                    }
690                    Err(e) => {
691                        tracing::error!(
692                            credential_id = %credential_id,
693                            error = %e,
694                            "Failed to restart adapter"
695                        );
696                    }
697                }
698                continue;
699            }
700
701            // Run health check
702            let health = manager.check_health(&credential_id).await;
703
704            match health {
705                AdapterHealth::Healthy => {
706                    if current_health != AdapterHealth::Healthy {
707                        tracing::info!(
708                            credential_id = %credential_id,
709                            adapter = %adapter_name,
710                            "Adapter is now healthy"
711                        );
712                        healthy_since.insert(credential_id.clone(), std::time::Instant::now());
713                    }
714                    manager
715                        .update_health(&credential_id, AdapterHealth::Healthy, true)
716                        .await;
717
718                    // Check if we should reset restart count
719                    if let Some(since) = healthy_since.get(&credential_id) {
720                        if since.elapsed() >= healthy_reset {
721                            manager.reset_restart_count(&credential_id).await;
722                            // Reset the timer
723                            healthy_since.insert(credential_id.clone(), std::time::Instant::now());
724                        }
725                    } else {
726                        healthy_since.insert(credential_id.clone(), std::time::Instant::now());
727                    }
728                }
729                AdapterHealth::Unhealthy => {
730                    healthy_since.remove(&credential_id);
731                    manager
732                        .update_health(&credential_id, AdapterHealth::Unhealthy, false)
733                        .await;
734
735                    let new_failures = consecutive_failures + 1;
736                    if new_failures >= config.max_failures {
737                        tracing::warn!(
738                            credential_id = %credential_id,
739                            adapter = %adapter_name,
740                            consecutive_failures = %new_failures,
741                            "Adapter exceeded max failures, attempting restart"
742                        );
743
744                        match manager.restart(&credential_id, config.max_restarts).await {
745                            Ok(true) => {
746                                tracing::info!(
747                                    credential_id = %credential_id,
748                                    "Adapter restart initiated due to health failures"
749                                );
750                                // Wait for adapter to become ready
751                                let ready = wait_for_adapter_ready(
752                                    &manager,
753                                    &credential_id,
754                                    Duration::from_secs(30),
755                                    Duration::from_millis(500),
756                                )
757                                .await;
758                                if ready {
759                                    tracing::info!(
760                                        credential_id = %credential_id,
761                                        "Restarted adapter is ready"
762                                    );
763                                }
764                            }
765                            Ok(false) => {
766                                tracing::debug!(
767                                    credential_id = %credential_id,
768                                    "Restart postponed due to backoff"
769                                );
770                            }
771                            Err(e) => {
772                                tracing::error!(
773                                    credential_id = %credential_id,
774                                    error = %e,
775                                    "Failed to restart adapter"
776                                );
777                            }
778                        }
779                    } else {
780                        tracing::warn!(
781                            credential_id = %credential_id,
782                            adapter = %adapter_name,
783                            consecutive_failures = %new_failures,
784                            "Adapter health check failed"
785                        );
786                    }
787                }
788                _ => {}
789            }
790        }
791    }
792}
793
794/// Wait for an adapter to become healthy after spawn
795pub async fn wait_for_adapter_ready(
796    manager: &AdapterInstanceManager,
797    credential_id: &str,
798    timeout: Duration,
799    poll_interval: Duration,
800) -> bool {
801    let start = std::time::Instant::now();
802
803    while start.elapsed() < timeout {
804        let health = manager.check_health(credential_id).await;
805        if health == AdapterHealth::Healthy {
806            manager
807                .update_health(credential_id, AdapterHealth::Healthy, true)
808                .await;
809            return true;
810        }
811        tokio::time::sleep(poll_interval).await;
812    }
813
814    tracing::warn!(
815        credential_id = %credential_id,
816        timeout_secs = timeout.as_secs(),
817        "Adapter did not become ready in time"
818    );
819    false
820}
821
822/// Request body for adapter inbound messages
823#[derive(Debug, Deserialize)]
824pub struct AdapterInboundRequest {
825    pub instance_id: String,
826    pub chat_id: String,
827    pub message_id: String,
828    #[serde(default)]
829    pub reply_to_message_id: Option<String>,
830    pub text: String,
831    pub from: AdapterUser,
832    /// Single file attachment (v0.1 compat, deprecated — use `files` instead)
833    #[serde(default)]
834    pub file: Option<AdapterFileInfo>,
835    /// Multiple file attachments (v0.2+)
836    #[serde(default)]
837    pub files: Vec<AdapterFileInfo>,
838    #[serde(default)]
839    pub timestamp: Option<String>,
840    #[serde(default)]
841    pub extra_data: Option<serde_json::Value>,
842}
843
844#[derive(Debug, Deserialize)]
845pub struct AdapterUser {
846    pub id: String,
847    #[serde(default)]
848    pub username: Option<String>,
849    #[serde(default)]
850    pub display_name: Option<String>,
851}
852
853#[derive(Debug, Deserialize)]
854pub struct AdapterFileInfo {
855    pub url: String,
856    #[serde(default)]
857    pub auth_header: Option<String>,
858    pub filename: String,
859    pub mime_type: String,
860}
861
862/// Request body for sending to adapter
863#[derive(Debug, Serialize)]
864pub struct AdapterSendRequest {
865    pub chat_id: String,
866    pub text: String,
867    #[serde(skip_serializing_if = "Option::is_none")]
868    pub reply_to_message_id: Option<String>,
869    /// Single file path (v0.1 compat, deprecated — use `file_paths` instead)
870    #[serde(skip_serializing_if = "Option::is_none")]
871    pub file_path: Option<String>,
872    /// Multiple file paths (v0.2+)
873    #[serde(default, skip_serializing_if = "Vec::is_empty")]
874    pub file_paths: Vec<String>,
875    #[serde(default, skip_serializing_if = "Option::is_none")]
876    pub extra_data: Option<serde_json::Value>,
877}
878
879/// Response from adapter send
880#[derive(Debug, Deserialize)]
881pub struct AdapterSendResponse {
882    pub protocol_message_id: String,
883}
884
885#[cfg(test)]
886mod tests {
887    use super::*;
888    use std::fs;
889    use tempfile::TempDir;
890
891    // ==================== PortAllocator Tests ====================
892
893    #[tokio::test]
894    async fn test_port_allocator_basic() {
895        let allocator = PortAllocator::new((9000, 9002));
896
897        // Allocate first port
898        let port1 = allocator.allocate().await;
899        assert_eq!(port1, Some(9000));
900
901        // Allocate second port
902        let port2 = allocator.allocate().await;
903        assert_eq!(port2, Some(9001));
904
905        // Allocate third port
906        let port3 = allocator.allocate().await;
907        assert_eq!(port3, Some(9002));
908
909        // No more ports available
910        let port4 = allocator.allocate().await;
911        assert_eq!(port4, None);
912    }
913
914    #[tokio::test]
915    async fn test_port_allocator_release() {
916        let allocator = PortAllocator::new((9000, 9001));
917
918        // Allocate all ports
919        let port1 = allocator.allocate().await.unwrap();
920        let _port2 = allocator.allocate().await.unwrap();
921        assert!(allocator.allocate().await.is_none());
922
923        // Release first port
924        allocator.release(port1).await;
925
926        // Should be able to allocate again
927        let port3 = allocator.allocate().await;
928        assert_eq!(port3, Some(9000));
929    }
930
931    #[tokio::test]
932    async fn test_port_allocator_release_unallocated() {
933        let allocator = PortAllocator::new((9000, 9001));
934
935        // Release a port that was never allocated - should not panic
936        allocator.release(9000).await;
937
938        // Should still be able to allocate normally
939        let port = allocator.allocate().await;
940        assert_eq!(port, Some(9000));
941    }
942
943    #[tokio::test]
944    async fn test_port_allocator_single_port_range() {
945        let allocator = PortAllocator::new((9000, 9000));
946
947        let port1 = allocator.allocate().await;
948        assert_eq!(port1, Some(9000));
949
950        let port2 = allocator.allocate().await;
951        assert_eq!(port2, None);
952
953        allocator.release(9000).await;
954
955        let port3 = allocator.allocate().await;
956        assert_eq!(port3, Some(9000));
957    }
958
959    // ==================== AdapterDef Tests ====================
960
961    #[test]
962    fn test_adapter_def_parse() {
963        let json = r#"{
964            "name": "telegram",
965            "version": "1.0.0",
966            "command": "python3",
967            "args": ["main.py"]
968        }"#;
969
970        let def: AdapterDef = serde_json::from_str(json).unwrap();
971        assert_eq!(def.name, "telegram");
972        assert_eq!(def.version, "1.0.0");
973        assert_eq!(def.command, "python3");
974        assert_eq!(def.args, vec!["main.py"]);
975    }
976
977    #[test]
978    fn test_adapter_def_parse_minimal() {
979        let json = r#"{
980            "name": "test",
981            "version": "0.1.0",
982            "command": "node"
983        }"#;
984
985        let def: AdapterDef = serde_json::from_str(json).unwrap();
986        assert_eq!(def.name, "test");
987        assert!(def.args.is_empty());
988    }
989
990    #[test]
991    fn test_adapter_def_serialize() {
992        let def = AdapterDef {
993            name: "test".to_string(),
994            version: "1.0.0".to_string(),
995            command: "python3".to_string(),
996            args: vec!["main.py".to_string()],
997        };
998
999        let json = serde_json::to_string(&def).unwrap();
1000        assert!(json.contains("\"name\":\"test\""));
1001        assert!(json.contains("\"version\":\"1.0.0\""));
1002    }
1003
1004    // ==================== load_adapter_def Tests ====================
1005
1006    #[test]
1007    fn test_load_adapter_def_success() {
1008        let temp_dir = TempDir::new().unwrap();
1009        let adapter_dir = temp_dir.path().join("test_adapter");
1010        fs::create_dir(&adapter_dir).unwrap();
1011
1012        let adapter_json = adapter_dir.join("adapter.json");
1013        fs::write(
1014            &adapter_json,
1015            r#"{
1016            "name": "test_adapter",
1017            "version": "1.0.0",
1018            "command": "python3",
1019            "args": ["main.py"]
1020        }"#,
1021        )
1022        .unwrap();
1023
1024        let def = load_adapter_def(&adapter_dir).unwrap();
1025        assert_eq!(def.name, "test_adapter");
1026        assert_eq!(def.version, "1.0.0");
1027    }
1028
1029    #[test]
1030    fn test_load_adapter_def_file_not_found() {
1031        let temp_dir = TempDir::new().unwrap();
1032        let adapter_dir = temp_dir.path().join("nonexistent");
1033        fs::create_dir(&adapter_dir).unwrap();
1034
1035        let result = load_adapter_def(&adapter_dir);
1036        assert!(result.is_err());
1037        let err = result.unwrap_err();
1038        assert!(matches!(err, AppError::Config(_)));
1039    }
1040
1041    #[test]
1042    fn test_load_adapter_def_invalid_json() {
1043        let temp_dir = TempDir::new().unwrap();
1044        let adapter_dir = temp_dir.path().join("invalid_adapter");
1045        fs::create_dir(&adapter_dir).unwrap();
1046
1047        let adapter_json = adapter_dir.join("adapter.json");
1048        fs::write(&adapter_json, "{ invalid json }").unwrap();
1049
1050        let result = load_adapter_def(&adapter_dir);
1051        assert!(result.is_err());
1052        let err = result.unwrap_err();
1053        assert!(matches!(err, AppError::Config(_)));
1054    }
1055
1056    // ==================== discover_adapters Tests ====================
1057
1058    #[test]
1059    fn test_discover_adapters_empty_dir() {
1060        let temp_dir = TempDir::new().unwrap();
1061
1062        let adapters = discover_adapters(temp_dir.path()).unwrap();
1063        assert!(adapters.is_empty());
1064    }
1065
1066    #[test]
1067    fn test_discover_adapters_nonexistent_dir() {
1068        let result = discover_adapters(Path::new("/nonexistent/dir"));
1069        assert!(result.is_ok());
1070        assert!(result.unwrap().is_empty());
1071    }
1072
1073    #[test]
1074    fn test_discover_adapters_with_adapters() {
1075        let temp_dir = TempDir::new().unwrap();
1076
1077        // Create adapter 1
1078        let adapter1_dir = temp_dir.path().join("adapter1");
1079        fs::create_dir(&adapter1_dir).unwrap();
1080        fs::write(
1081            adapter1_dir.join("adapter.json"),
1082            r#"{
1083            "name": "adapter1",
1084            "version": "1.0.0",
1085            "command": "python3"
1086        }"#,
1087        )
1088        .unwrap();
1089
1090        // Create adapter 2
1091        let adapter2_dir = temp_dir.path().join("adapter2");
1092        fs::create_dir(&adapter2_dir).unwrap();
1093        fs::write(
1094            adapter2_dir.join("adapter.json"),
1095            r#"{
1096            "name": "adapter2",
1097            "version": "2.0.0",
1098            "command": "node"
1099        }"#,
1100        )
1101        .unwrap();
1102
1103        let adapters = discover_adapters(temp_dir.path()).unwrap();
1104        assert_eq!(adapters.len(), 2);
1105        assert!(adapters.contains_key("adapter1"));
1106        assert!(adapters.contains_key("adapter2"));
1107    }
1108
1109    #[test]
1110    fn test_discover_adapters_skips_files() {
1111        let temp_dir = TempDir::new().unwrap();
1112
1113        // Create a file (not a directory)
1114        fs::write(temp_dir.path().join("not_a_dir.txt"), "content").unwrap();
1115
1116        let adapters = discover_adapters(temp_dir.path()).unwrap();
1117        assert!(adapters.is_empty());
1118    }
1119
1120    #[test]
1121    fn test_discover_adapters_skips_invalid() {
1122        let temp_dir = TempDir::new().unwrap();
1123
1124        // Create valid adapter
1125        let valid_dir = temp_dir.path().join("valid");
1126        fs::create_dir(&valid_dir).unwrap();
1127        fs::write(
1128            valid_dir.join("adapter.json"),
1129            r#"{
1130            "name": "valid",
1131            "version": "1.0.0",
1132            "command": "python3"
1133        }"#,
1134        )
1135        .unwrap();
1136
1137        // Create invalid adapter (bad JSON)
1138        let invalid_dir = temp_dir.path().join("invalid");
1139        fs::create_dir(&invalid_dir).unwrap();
1140        fs::write(invalid_dir.join("adapter.json"), "{ bad json }").unwrap();
1141
1142        // Create dir without adapter.json
1143        let no_json_dir = temp_dir.path().join("no_json");
1144        fs::create_dir(&no_json_dir).unwrap();
1145
1146        let adapters = discover_adapters(temp_dir.path()).unwrap();
1147        // Only valid adapter should be discovered
1148        assert_eq!(adapters.len(), 1);
1149        assert!(adapters.contains_key("valid"));
1150    }
1151
1152    // ==================== AdapterInstanceManager Tests ====================
1153
1154    #[tokio::test]
1155    async fn test_adapter_instance_manager_new() {
1156        let temp_dir = TempDir::new().unwrap();
1157
1158        let manager = AdapterInstanceManager::new(
1159            temp_dir.path().to_string_lossy().to_string(),
1160            (9000, 9100),
1161            "127.0.0.1:8080",
1162        )
1163        .unwrap();
1164
1165        assert_eq!(
1166            manager.adapters_dir,
1167            temp_dir.path().to_string_lossy().to_string()
1168        );
1169        assert!(manager.adapters.is_empty());
1170    }
1171
1172    #[tokio::test]
1173    async fn test_adapter_instance_manager_new_with_adapters() {
1174        let temp_dir = TempDir::new().unwrap();
1175
1176        // Create an adapter
1177        let adapter_dir = temp_dir.path().join("test_adapter");
1178        fs::create_dir(&adapter_dir).unwrap();
1179        fs::write(
1180            adapter_dir.join("adapter.json"),
1181            r#"{
1182            "name": "test_adapter",
1183            "version": "1.0.0",
1184            "command": "echo"
1185        }"#,
1186        )
1187        .unwrap();
1188
1189        let manager = AdapterInstanceManager::new(
1190            temp_dir.path().to_string_lossy().to_string(),
1191            (9000, 9100),
1192            "127.0.0.1:8080",
1193        )
1194        .unwrap();
1195
1196        assert_eq!(manager.adapters.len(), 1);
1197        assert!(manager.adapters.contains_key("test_adapter"));
1198    }
1199
1200    #[tokio::test]
1201    async fn test_adapter_instance_manager_gateway_url_conversion() {
1202        let temp_dir = TempDir::new().unwrap();
1203
1204        // Test with 0.0.0.0 address
1205        let manager = AdapterInstanceManager::new(
1206            temp_dir.path().to_string_lossy().to_string(),
1207            (9000, 9100),
1208            "0.0.0.0:8080",
1209        )
1210        .unwrap();
1211
1212        assert_eq!(manager.gateway_url, "http://127.0.0.1:8080");
1213    }
1214
1215    #[tokio::test]
1216    async fn test_adapter_instance_manager_has_adapter() {
1217        let temp_dir = TempDir::new().unwrap();
1218
1219        let adapter_dir = temp_dir.path().join("telegram");
1220        fs::create_dir(&adapter_dir).unwrap();
1221        fs::write(
1222            adapter_dir.join("adapter.json"),
1223            r#"{
1224            "name": "telegram",
1225            "version": "1.0.0",
1226            "command": "python3"
1227        }"#,
1228        )
1229        .unwrap();
1230
1231        let manager = AdapterInstanceManager::new(
1232            temp_dir.path().to_string_lossy().to_string(),
1233            (9000, 9100),
1234            "127.0.0.1:8080",
1235        )
1236        .unwrap();
1237
1238        // Generic is always available
1239        assert!(manager.has_adapter("generic"));
1240        // Discovered adapter
1241        assert!(manager.has_adapter("telegram"));
1242        // Non-existent adapter
1243        assert!(!manager.has_adapter("nonexistent"));
1244    }
1245
1246    #[tokio::test]
1247    async fn test_adapter_instance_manager_spawn_generic() {
1248        let temp_dir = TempDir::new().unwrap();
1249
1250        let manager = AdapterInstanceManager::new(
1251            temp_dir.path().to_string_lossy().to_string(),
1252            (9000, 9100),
1253            "127.0.0.1:8080",
1254        )
1255        .unwrap();
1256
1257        let result = manager.spawn("cred1", "generic", "token123", None).await;
1258        assert!(result.is_ok());
1259
1260        let (instance_id, port) = result.unwrap();
1261        assert!(instance_id.starts_with("generic_"));
1262        assert_eq!(port, 0); // Generic adapter uses port 0
1263    }
1264
1265    #[tokio::test]
1266    async fn test_adapter_instance_manager_spawn_nonexistent() {
1267        let temp_dir = TempDir::new().unwrap();
1268
1269        let manager = AdapterInstanceManager::new(
1270            temp_dir.path().to_string_lossy().to_string(),
1271            (9000, 9100),
1272            "127.0.0.1:8080",
1273        )
1274        .unwrap();
1275
1276        let result = manager
1277            .spawn("cred1", "nonexistent", "token123", None)
1278            .await;
1279        assert!(result.is_err());
1280        let err = result.unwrap_err();
1281        assert!(matches!(err, AppError::Config(_)));
1282    }
1283
1284    #[tokio::test]
1285    async fn test_adapter_instance_manager_stop_nonexistent() {
1286        let temp_dir = TempDir::new().unwrap();
1287
1288        let manager = AdapterInstanceManager::new(
1289            temp_dir.path().to_string_lossy().to_string(),
1290            (9000, 9100),
1291            "127.0.0.1:8080",
1292        )
1293        .unwrap();
1294
1295        // Should return Ok even for nonexistent credential
1296        let result = manager.stop("nonexistent").await;
1297        assert!(result.is_ok());
1298    }
1299
1300    #[tokio::test]
1301    async fn test_adapter_instance_manager_get_port() {
1302        let temp_dir = TempDir::new().unwrap();
1303
1304        let manager = AdapterInstanceManager::new(
1305            temp_dir.path().to_string_lossy().to_string(),
1306            (9000, 9100),
1307            "127.0.0.1:8080",
1308        )
1309        .unwrap();
1310
1311        // Non-existent credential
1312        assert_eq!(manager.get_port("nonexistent").await, None);
1313    }
1314
1315    #[tokio::test]
1316    async fn test_adapter_instance_manager_get_instance_id() {
1317        let temp_dir = TempDir::new().unwrap();
1318
1319        let manager = AdapterInstanceManager::new(
1320            temp_dir.path().to_string_lossy().to_string(),
1321            (9000, 9100),
1322            "127.0.0.1:8080",
1323        )
1324        .unwrap();
1325
1326        // Non-existent credential
1327        assert_eq!(manager.get_instance_id("nonexistent").await, None);
1328    }
1329
1330    #[tokio::test]
1331    async fn test_adapter_instance_manager_is_running() {
1332        let temp_dir = TempDir::new().unwrap();
1333
1334        let manager = AdapterInstanceManager::new(
1335            temp_dir.path().to_string_lossy().to_string(),
1336            (9000, 9100),
1337            "127.0.0.1:8080",
1338        )
1339        .unwrap();
1340
1341        // Non-existent credential
1342        assert!(!manager.is_running("nonexistent").await);
1343    }
1344
1345    #[tokio::test]
1346    async fn test_adapter_instance_manager_get_credential_id() {
1347        let temp_dir = TempDir::new().unwrap();
1348
1349        let manager = AdapterInstanceManager::new(
1350            temp_dir.path().to_string_lossy().to_string(),
1351            (9000, 9100),
1352            "127.0.0.1:8080",
1353        )
1354        .unwrap();
1355
1356        // Non-existent instance
1357        assert_eq!(
1358            manager.get_credential_id("nonexistent_instance").await,
1359            None
1360        );
1361    }
1362
1363    #[tokio::test]
1364    async fn test_adapter_instance_manager_get_all_health_empty() {
1365        let temp_dir = TempDir::new().unwrap();
1366
1367        let manager = AdapterInstanceManager::new(
1368            temp_dir.path().to_string_lossy().to_string(),
1369            (9000, 9100),
1370            "127.0.0.1:8080",
1371        )
1372        .unwrap();
1373
1374        let health = manager.get_all_health().await;
1375        assert!(health.is_empty());
1376    }
1377
1378    #[tokio::test]
1379    async fn test_adapter_instance_manager_check_health_nonexistent() {
1380        let temp_dir = TempDir::new().unwrap();
1381
1382        let manager = AdapterInstanceManager::new(
1383            temp_dir.path().to_string_lossy().to_string(),
1384            (9000, 9100),
1385            "127.0.0.1:8080",
1386        )
1387        .unwrap();
1388
1389        // Non-existent credential should return Dead
1390        let health = manager.check_health("nonexistent").await;
1391        assert_eq!(health, AdapterHealth::Dead);
1392    }
1393
1394    #[tokio::test]
1395    async fn test_adapter_instance_manager_get_health_nonexistent() {
1396        let temp_dir = TempDir::new().unwrap();
1397
1398        let manager = AdapterInstanceManager::new(
1399            temp_dir.path().to_string_lossy().to_string(),
1400            (9000, 9100),
1401            "127.0.0.1:8080",
1402        )
1403        .unwrap();
1404
1405        let health = manager.get_health("nonexistent").await;
1406        assert!(health.is_none());
1407    }
1408
1409    #[tokio::test]
1410    async fn test_adapter_instance_manager_check_process_alive_nonexistent() {
1411        let temp_dir = TempDir::new().unwrap();
1412
1413        let manager = AdapterInstanceManager::new(
1414            temp_dir.path().to_string_lossy().to_string(),
1415            (9000, 9100),
1416            "127.0.0.1:8080",
1417        )
1418        .unwrap();
1419
1420        // Non-existent credential should return false
1421        assert!(!manager.check_process_alive("nonexistent").await);
1422    }
1423
1424    #[tokio::test]
1425    async fn test_adapter_instance_manager_stop_all_empty() {
1426        let temp_dir = TempDir::new().unwrap();
1427
1428        let manager = AdapterInstanceManager::new(
1429            temp_dir.path().to_string_lossy().to_string(),
1430            (9000, 9100),
1431            "127.0.0.1:8080",
1432        )
1433        .unwrap();
1434
1435        // Should not panic with no processes
1436        manager.stop_all().await;
1437    }
1438
1439    #[tokio::test]
1440    async fn test_adapter_instance_manager_restart_nonexistent() {
1441        let temp_dir = TempDir::new().unwrap();
1442
1443        let manager = AdapterInstanceManager::new(
1444            temp_dir.path().to_string_lossy().to_string(),
1445            (9000, 9100),
1446            "127.0.0.1:8080",
1447        )
1448        .unwrap();
1449
1450        let result = manager.restart("nonexistent", 5).await;
1451        assert!(result.is_err());
1452    }
1453
1454    #[tokio::test]
1455    async fn test_adapter_instance_manager_reset_restart_count_nonexistent() {
1456        let temp_dir = TempDir::new().unwrap();
1457
1458        let manager = AdapterInstanceManager::new(
1459            temp_dir.path().to_string_lossy().to_string(),
1460            (9000, 9100),
1461            "127.0.0.1:8080",
1462        )
1463        .unwrap();
1464
1465        // Should not panic
1466        manager.reset_restart_count("nonexistent").await;
1467    }
1468
1469    #[tokio::test]
1470    async fn test_adapter_instance_manager_get_restart_info_nonexistent() {
1471        let temp_dir = TempDir::new().unwrap();
1472
1473        let manager = AdapterInstanceManager::new(
1474            temp_dir.path().to_string_lossy().to_string(),
1475            (9000, 9100),
1476            "127.0.0.1:8080",
1477        )
1478        .unwrap();
1479
1480        let info = manager.get_restart_info("nonexistent").await;
1481        assert!(info.is_none());
1482    }
1483
1484    // ==================== AdapterHealth Tests ====================
1485
1486    #[test]
1487    fn test_adapter_health_eq() {
1488        assert_eq!(AdapterHealth::Starting, AdapterHealth::Starting);
1489        assert_eq!(AdapterHealth::Healthy, AdapterHealth::Healthy);
1490        assert_eq!(AdapterHealth::Unhealthy, AdapterHealth::Unhealthy);
1491        assert_eq!(AdapterHealth::Dead, AdapterHealth::Dead);
1492        assert_ne!(AdapterHealth::Healthy, AdapterHealth::Unhealthy);
1493    }
1494
1495    #[test]
1496    fn test_adapter_health_clone() {
1497        let health = AdapterHealth::Healthy;
1498        let cloned = health;
1499        assert_eq!(health, cloned);
1500    }
1501
1502    #[test]
1503    fn test_adapter_health_debug() {
1504        let health = AdapterHealth::Healthy;
1505        let debug_str = format!("{:?}", health);
1506        assert!(debug_str.contains("Healthy"));
1507    }
1508
1509    // ==================== HealthMonitorConfig Tests ====================
1510
1511    #[test]
1512    fn test_health_monitor_config_default() {
1513        let config = HealthMonitorConfig::default();
1514        assert_eq!(config.interval_secs, 30);
1515        assert_eq!(config.max_failures, 3);
1516        assert_eq!(config.max_restarts, 5);
1517        assert_eq!(config.healthy_reset_secs, 300);
1518    }
1519
1520    #[test]
1521    fn test_health_monitor_config_custom() {
1522        let config = HealthMonitorConfig {
1523            interval_secs: 60,
1524            max_failures: 5,
1525            max_restarts: 10,
1526            healthy_reset_secs: 600,
1527        };
1528        assert_eq!(config.interval_secs, 60);
1529        assert_eq!(config.max_failures, 5);
1530    }
1531
1532    // ==================== Request/Response Types Tests ====================
1533
1534    #[test]
1535    fn test_adapter_inbound_request_parse() {
1536        let json = r#"{
1537            "instance_id": "telegram_abc123",
1538            "chat_id": "12345",
1539            "message_id": "msg_001",
1540            "from": {
1541                "id": "user_1",
1542                "username": "testuser",
1543                "display_name": "Test User"
1544            },
1545            "text": "Hello, world!"
1546        }"#;
1547
1548        let req: AdapterInboundRequest = serde_json::from_str(json).unwrap();
1549        assert_eq!(req.instance_id, "telegram_abc123");
1550        assert_eq!(req.chat_id, "12345");
1551        assert_eq!(req.text, "Hello, world!");
1552        assert_eq!(req.from.id, "user_1");
1553        assert_eq!(req.from.username, Some("testuser".to_string()));
1554    }
1555
1556    #[test]
1557    fn test_adapter_inbound_request_with_file() {
1558        let json = r#"{
1559            "instance_id": "telegram_abc123",
1560            "chat_id": "12345",
1561            "message_id": "msg_001",
1562            "from": {
1563                "id": "user_1"
1564            },
1565            "text": "File attached",
1566            "file": {
1567                "url": "https://example.com/file.pdf",
1568                "filename": "document.pdf",
1569                "mime_type": "application/pdf"
1570            }
1571        }"#;
1572
1573        let req: AdapterInboundRequest = serde_json::from_str(json).unwrap();
1574        assert!(req.file.is_some());
1575        let file = req.file.unwrap();
1576        assert_eq!(file.filename, "document.pdf");
1577        assert_eq!(file.mime_type, "application/pdf");
1578    }
1579
1580    #[test]
1581    fn test_adapter_inbound_request_with_timestamp() {
1582        let json = r#"{
1583            "instance_id": "telegram_abc123",
1584            "chat_id": "12345",
1585            "message_id": "msg_001",
1586            "from": {"id": "user_1"},
1587            "text": "Hello",
1588            "timestamp": "2024-01-15T10:30:00Z"
1589        }"#;
1590
1591        let req: AdapterInboundRequest = serde_json::from_str(json).unwrap();
1592        assert_eq!(req.timestamp, Some("2024-01-15T10:30:00Z".to_string()));
1593    }
1594
1595    #[test]
1596    fn test_adapter_user_minimal() {
1597        let json = r#"{"id": "user_123"}"#;
1598        let user: AdapterUser = serde_json::from_str(json).unwrap();
1599        assert_eq!(user.id, "user_123");
1600        assert!(user.username.is_none());
1601        assert!(user.display_name.is_none());
1602    }
1603
1604    #[test]
1605    fn test_adapter_file_info_with_auth() {
1606        let json = r#"{
1607            "url": "https://example.com/file.pdf",
1608            "auth_header": "Bearer token123",
1609            "filename": "doc.pdf",
1610            "mime_type": "application/pdf"
1611        }"#;
1612
1613        let file: AdapterFileInfo = serde_json::from_str(json).unwrap();
1614        assert_eq!(file.auth_header, Some("Bearer token123".to_string()));
1615    }
1616
1617    #[test]
1618    fn test_adapter_send_request_serialize() {
1619        let req = AdapterSendRequest {
1620            chat_id: "12345".to_string(),
1621            text: "Hello!".to_string(),
1622            reply_to_message_id: None,
1623            file_path: None,
1624            file_paths: vec![],
1625            extra_data: None,
1626        };
1627
1628        let json = serde_json::to_string(&req).unwrap();
1629        assert!(json.contains("\"chat_id\":\"12345\""));
1630        assert!(json.contains("\"text\":\"Hello!\""));
1631        // Optional fields should be skipped
1632        assert!(!json.contains("reply_to_message_id"));
1633        assert!(!json.contains("file_path"));
1634        assert!(!json.contains("file_paths"));
1635        assert!(!json.contains("extra_data"));
1636    }
1637
1638    #[test]
1639    fn test_adapter_send_request_with_reply() {
1640        let req = AdapterSendRequest {
1641            chat_id: "12345".to_string(),
1642            text: "Reply!".to_string(),
1643            reply_to_message_id: Some("msg_001".to_string()),
1644            file_path: None,
1645            file_paths: vec![],
1646            extra_data: None,
1647        };
1648
1649        let json = serde_json::to_string(&req).unwrap();
1650        assert!(json.contains("\"reply_to_message_id\":\"msg_001\""));
1651    }
1652
1653    #[test]
1654    fn test_adapter_send_request_with_file() {
1655        let req = AdapterSendRequest {
1656            chat_id: "12345".to_string(),
1657            text: "File!".to_string(),
1658            reply_to_message_id: None,
1659            file_path: Some("/tmp/file.pdf".to_string()),
1660            file_paths: vec![],
1661            extra_data: None,
1662        };
1663
1664        let json = serde_json::to_string(&req).unwrap();
1665        assert!(json.contains("\"file_path\":\"/tmp/file.pdf\""));
1666    }
1667
1668    #[test]
1669    fn test_adapter_send_request_with_file_paths() {
1670        let req = AdapterSendRequest {
1671            chat_id: "12345".to_string(),
1672            text: "Multiple files!".to_string(),
1673            reply_to_message_id: None,
1674            file_path: None,
1675            file_paths: vec!["/tmp/a.pdf".to_string(), "/tmp/b.png".to_string()],
1676            extra_data: Some(serde_json::json!({"thread_id": "123"})),
1677        };
1678
1679        let json = serde_json::to_string(&req).unwrap();
1680        assert!(json.contains("\"file_paths\""));
1681        assert!(json.contains("/tmp/a.pdf"));
1682        assert!(json.contains("/tmp/b.png"));
1683        assert!(json.contains("\"extra_data\""));
1684        assert!(json.contains("\"thread_id\""));
1685    }
1686
1687    #[test]
1688    fn test_adapter_send_response_parse() {
1689        let json = r#"{"protocol_message_id": "msg_123456"}"#;
1690        let resp: AdapterSendResponse = serde_json::from_str(json).unwrap();
1691        assert_eq!(resp.protocol_message_id, "msg_123456");
1692    }
1693
1694    // ==================== wait_for_adapter_ready Tests ====================
1695
1696    #[tokio::test]
1697    async fn test_wait_for_adapter_ready_timeout() {
1698        let temp_dir = TempDir::new().unwrap();
1699
1700        let manager = Arc::new(
1701            AdapterInstanceManager::new(
1702                temp_dir.path().to_string_lossy().to_string(),
1703                (9000, 9100),
1704                "127.0.0.1:8080",
1705            )
1706            .unwrap(),
1707        );
1708
1709        // Non-existent credential should timeout
1710        let ready = wait_for_adapter_ready(
1711            &manager,
1712            "nonexistent",
1713            Duration::from_millis(100),
1714            Duration::from_millis(20),
1715        )
1716        .await;
1717
1718        assert!(!ready);
1719    }
1720}