trojan_agent/cli.rs

//! CLI entry point for the agent subcommand.
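//!
//! Invoked as the agent subcommand, e.g. `trojan-agent --config agent.toml`.
//!
//! A minimal `agent.toml` sketch, assuming `AgentConfig` derives
//! `Deserialize` with the field names this module reads (`panel_url`,
//! `log_level`, `cache_dir`, `report_interval_secs`, and a `reconnect`
//! table); all values below are illustrative, not defaults:
//!
//! ```toml
//! panel_url = "wss://panel.example.com/agent"  # hypothetical endpoint
//! log_level = "info"                           # optional; --log-level overrides
//! cache_dir = "/var/lib/trojan-agent"          # optional
//! report_interval_secs = 30                    # optional; else the panel's value
//!
//! [reconnect]
//! initial_delay_ms = 1000
//! max_delay_ms = 60000
//! multiplier = 2.0
//! jitter = 0.2
//! ```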

use std::io;
use std::path::PathBuf;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use clap::Parser;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};

use crate::cache::{self, CachedConfig};
use crate::client::{self, RegistrationResult};
use crate::collector::TrafficCollector;
use crate::config::AgentConfig;
use crate::error::AgentError;
use crate::protocol::{AgentMessage, PanelMessage, ServiceState};
use crate::reporter;
use crate::runner;

/// CLI arguments for the agent subcommand.
#[derive(Parser, Debug, Clone)]
#[command(
    name = "trojan-agent",
    version,
    about = "Panel agent — connects to management panel, receives config, boots services"
)]
pub struct AgentArgs {
    /// Config file path (TOML).
    #[arg(short, long, default_value = "agent.toml")]
    pub config: PathBuf,

    /// Log level override (e.g. "info", "debug", "trace").
    #[arg(long)]
    pub log_level: Option<String>,
}

/// Run the agent with the given CLI arguments.
pub async fn run(args: AgentArgs) -> Result<(), Box<dyn std::error::Error>> {
    let config_str = std::fs::read_to_string(&args.config)
        .map_err(|e| format!("failed to read config file {:?}: {e}", args.config))?;
    let config: AgentConfig =
        toml::from_str(&config_str).map_err(|e| format!("failed to parse agent config: {e}"))?;

    let log_level = args
        .log_level
        .as_deref()
        .or(config.log_level.as_deref())
        .unwrap_or("info");
    init_tracing(log_level);

    info!(
        version = trojan_core::VERSION,
        panel_url = %config.panel_url,
        "trojan agent starting"
    );

    // Set up graceful shutdown on SIGTERM/SIGINT
    let shutdown = CancellationToken::new();
    let shutdown_signal = shutdown.clone();
    tokio::spawn(async move {
        shutdown_signal_handler().await;
        info!("shutdown signal received");
        shutdown_signal.cancel();
    });

    agent_loop(config, shutdown).await;
    Ok(())
}

/// Outer reconnect loop with exponential backoff.
///
/// Keeps reconnecting to the panel until shutdown is signalled.
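///
/// For example, with `initial_delay_ms = 1000`, `multiplier = 2.0`, and
/// `max_delay_ms = 60_000` (illustrative values, not defaults), the base
/// delay grows 1s → 2s → 4s → … and is capped at 60s; each sleep is also
/// scaled by a jitter factor drawn from `[1 - jitter, 1 + jitter]`.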
async fn agent_loop(config: AgentConfig, shutdown: CancellationToken) {
    let mut delay_ms = config.reconnect.initial_delay_ms;

    loop {
        match run_session(&config, shutdown.clone()).await {
            Ok(()) => {
                info!("session ended cleanly");
                if shutdown.is_cancelled() {
                    return;
                }
                // Reset backoff on clean exit
                delay_ms = config.reconnect.initial_delay_ms;
            }
            Err(e) => {
                if shutdown.is_cancelled() {
                    return;
                }
                warn!(error = %e, "session failed");
            }
        }

        if shutdown.is_cancelled() {
            return;
        }

        // Apply jitter: delay * (1 ± jitter)
        let jitter_factor = 1.0 + config.reconnect.jitter * (2.0 * rand_f64() - 1.0);
        #[expect(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
        let actual_delay = (delay_ms as f64 * jitter_factor) as u64;
        let delay = Duration::from_millis(actual_delay);

        info!(delay_ms = actual_delay, "reconnecting after delay");

        tokio::select! {
            _ = shutdown.cancelled() => return,
            _ = tokio::time::sleep(delay) => {}
        }

        // Exponential backoff
        #[expect(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
        let next = (delay_ms as f64 * config.reconnect.multiplier) as u64;
        delay_ms = next.min(config.reconnect.max_delay_ms);
    }
}

/// Run a single agent session (connect → register → run service → event loop).
async fn run_session(config: &AgentConfig, shutdown: CancellationToken) -> Result<(), AgentError> {
    let cache_dir = cache::resolve_cache_dir(config.cache_dir.as_deref());

    // Try to connect and register with the panel
    let (reg, agent_tx, mut panel_rx) =
        match client::connect_and_register(config, shutdown.clone()).await {
            Ok(result) => result,
            Err(e) => {
                // Connection failed — try cached config for degraded mode
                warn!(error = %e, "failed to connect to panel, checking local cache");
                return run_degraded_mode(&cache_dir, shutdown).await;
            }
        };

    let RegistrationResult {
        node_id,
        node_type,
        config_version,
        report_interval_secs,
        config: service_config,
    } = reg;

    // Cache the received config
    let cached = CachedConfig {
        version: config_version,
        node_type,
        report_interval_secs,
        config: service_config.clone(),
        cached_at: unix_now(),
    };
    if let Err(e) = cache::write_cache(&cache_dir, &cached).await {
        warn!(error = %e, "failed to cache config (non-fatal)");
    }

    // Determine report interval
    let report_interval = Duration::from_secs(
        config
            .report_interval_secs
            .unwrap_or_else(|| u64::from(report_interval_secs)),
    );

    // Spawn the service
    let service_shutdown = CancellationToken::new();
    let service_config_clone = service_config.clone();
    let service_shutdown_clone = service_shutdown.clone();
    let service_handle = tokio::spawn(async move {
        runner::run_service(node_type, &service_config_clone, service_shutdown_clone).await
    });

    // Send initial service status
    let started_at = unix_now();
    let _ = agent_tx
        .send(AgentMessage::ServiceStatus {
            status: ServiceState::Running,
            started_at,
            config_version,
        })
        .await;

    // Spawn reporter
    let collector = TrafficCollector::new();
    let reporter_shutdown = CancellationToken::new();
    let reporter_handle = tokio::spawn(reporter::run_reporter(
        agent_tx.clone(),
        collector.clone(),
        report_interval,
        reporter_shutdown.clone(),
    ));

    // Event loop: handle panel messages, service exit, and shutdown
    let mut current_config_version = config_version;

    // Pin the service handle so we can poll it across loop iterations
    let mut service_handle = std::pin::pin!(service_handle);

    let result = loop {
        tokio::select! {
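            // `biased` polls branches in declaration order: shutdown takes
            // priority over service exit, which takes priority over panel
            // messages.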
            biased;

            _ = shutdown.cancelled() => {
                info!("shutdown requested, stopping service");
                service_shutdown.cancel();
                reporter_shutdown.cancel();
                break Ok(());
            }

            // Service exited on its own
            service_result = &mut *service_handle => {
                reporter_shutdown.cancel();
                match service_result {
                    Ok(Ok(())) => {
                        info!(node_id = %node_id, "service exited cleanly");
                        break Ok(());
                    }
                    Ok(Err(e)) => {
                        error!(node_id = %node_id, error = %e, "service exited with error");
                        let _ = agent_tx.send(AgentMessage::ServiceStatus {
                            status: ServiceState::Error,
                            started_at,
                            config_version: current_config_version,
                        }).await;
                        break Err(e);
                    }
                    Err(e) => {
                        error!(error = %e, "service task panicked");
                        break Err(AgentError::Service("service task panicked".to_string()));
                    }
                }
            }

            // Panel messages
            panel_msg = panel_rx.recv() => {
                match panel_msg {
                    Some(PanelMessage::ConfigPush { version, restart_required, drain_timeout_secs, config: config_bytes }) => {
                        info!(
                            version,
                            restart_required,
                            "received config push from panel"
                        );

                        // Parse opaque JSON bytes into Value
                        let new_config: serde_json::Value = match serde_json::from_slice(&config_bytes) {
                            Ok(v) => v,
                            Err(e) => {
                                error!(error = %e, "invalid config JSON in config push");
                                let _ = agent_tx.send(AgentMessage::ConfigAck {
                                    version,
                                    ok: false,
                                    message: Some(format!("invalid config JSON: {e}")),
                                }).await;
                                continue;
                            }
                        };

                        // Cache new config
                        let cached = CachedConfig {
                            version,
                            node_type,
                            report_interval_secs,
                            config: new_config,
                            cached_at: unix_now(),
                        };
                        if let Err(e) = cache::write_cache(&cache_dir, &cached).await {
                            warn!(error = %e, "failed to cache updated config");
                        }

                        if restart_required {
                            // Restart the service with the new config
                            let _ = agent_tx.send(AgentMessage::ServiceStatus {
                                status: ServiceState::Restarting,
                                started_at,
                                config_version: current_config_version,
                            }).await;

                            // Graceful shutdown of the current service
                            let drain_timeout = drain_timeout_secs
                                .map(|s| Duration::from_secs(u64::from(s)))
                                .unwrap_or(Duration::from_secs(30));
                            service_shutdown.cancel();
                            let _ = tokio::time::timeout(drain_timeout, &mut *service_handle).await;

                            let _ = agent_tx.send(AgentMessage::ConfigAck {
                                version,
                                ok: true,
                                message: None,
                            }).await;

                            // The session ends here; the outer loop will reconnect
                            // and re-register, which boots the new config
                            reporter_shutdown.cancel();
                            break Ok(());
                        }

                        // Non-restart config push — just ack for now
                        // TODO: hot-reload auth via ReloadableAuth
                        current_config_version = version;

                        let _ = agent_tx.send(AgentMessage::ConfigAck {
                            version,
                            ok: true,
                            message: None,
                        }).await;
                    }

                    Some(PanelMessage::Error { code, message }) => {
                        error!(?code, %message, "received error from panel");
                        // Continue running — the panel might recover
                    }

                    Some(PanelMessage::Registered { .. }) => {
                        warn!("unexpected duplicate registration message");
                    }

                    Some(PanelMessage::Ping) => {
                        // Handled by the recv task in client.rs
                    }

                    None => {
                        warn!("panel connection lost");
                        // Don't stop the service — let it keep running;
                        // the outer loop will reconnect
                        reporter_shutdown.cancel();
                        break Err(AgentError::ConnectionClosed);
                    }
                }
            }
        }
    };

    // Wait for the reporter to finish
    let _ = reporter_handle.await;

    result
}

/// Run in degraded mode from cached config (no panel connection).
async fn run_degraded_mode(
    cache_dir: &std::path::Path,
    shutdown: CancellationToken,
) -> Result<(), AgentError> {
    let cached = match cache::read_cache(cache_dir).await {
        Some(c) => c,
        None => {
            return Err(AgentError::Registration(
                "panel unreachable and no cached config available".to_string(),
            ));
        }
    };

    warn!(
        node_type = %cached.node_type,
        config_version = cached.version,
        cached_at = cached.cached_at,
        "running in degraded mode from cached config (no panel connection)"
    );

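    // No reporter or traffic collector here: without a panel connection
    // there is nowhere to send reports, so only the service itself runs.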
    runner::run_service(cached.node_type, &cached.config, shutdown).await
}

/// Wait for shutdown signals (SIGTERM, SIGINT).
async fn shutdown_signal_handler() {
    let ctrl_c = async {
        if let Err(e) = tokio::signal::ctrl_c().await {
            warn!("failed to listen for Ctrl+C: {e}");
            std::future::pending::<()>().await;
        }
    };

    #[cfg(unix)]
    let terminate = async {
        match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
            Ok(mut sig) => {
                sig.recv().await;
            }
            Err(e) => {
                warn!("failed to listen for SIGTERM: {e}");
                std::future::pending::<()>().await;
            }
        }
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {}
        _ = terminate => {}
    }
}

fn init_tracing(level: &str) {
    let filter = EnvFilter::try_new(level).unwrap_or_else(|_| EnvFilter::new("info"));

    tracing_subscriber::registry()
        .with(filter)
        .with(fmt::layer().with_writer(io::stderr))
        .init();
}

fn unix_now() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs()
}

/// Simple pseudo-random f64 in [0, 1) for jitter.
/// Uses system time nanoseconds, which is good enough for backoff jitter.
fn rand_f64() -> f64 {
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .subsec_nanos();
    // subsec_nanos() is always < 1_000_000_000, so dividing by 1e9 keeps
    // the result in [0, 1); dividing by u32::MAX would compress it into
    // roughly [0, 0.23) and silently shrink the jitter range.
    f64::from(nanos) / 1_000_000_000.0
}
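
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sanity sketch (not exhaustive): the jitter source must stay
    // in [0, 1) or the backoff factor computed in agent_loop drifts outside
    // [1 - jitter, 1 + jitter].
    #[test]
    fn rand_f64_stays_in_unit_interval() {
        for _ in 0..1_000 {
            let r = rand_f64();
            assert!((0.0..1.0).contains(&r), "jitter source out of range: {r}");
        }
    }

    #[test]
    fn unix_now_is_after_epoch() {
        assert!(unix_now() > 0);
    }
}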