1use std::io;
4use std::path::PathBuf;
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7use clap::Parser;
8use tokio_util::sync::CancellationToken;
9use tracing::{error, info, warn};
10use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
11
12use crate::cache::{self, CachedConfig};
13use crate::client::{self, RegistrationResult};
14use crate::collector::TrafficCollector;
15use crate::config::AgentConfig;
16use crate::error::AgentError;
17use crate::protocol::{AgentMessage, PanelMessage, ServiceState};
18use crate::reporter;
19use crate::runner;
20
// Command-line arguments for the trojan-agent binary, parsed by clap.
//
// NOTE: plain `//` comments are used here deliberately — `///` doc comments
// on clap-derived structs/fields are lifted into the generated `--help`
// output, which would change runtime behavior.
#[derive(Parser, Debug, Clone)]
#[command(
    name = "trojan-agent",
    version,
    about = "Panel agent — connects to management panel, receives config, boots services"
)]
pub struct AgentArgs {
    // Path to the agent's TOML configuration file (default: ./agent.toml).
    #[arg(short, long, default_value = "agent.toml")]
    pub config: PathBuf,

    // Optional log-level override. When absent, `run` falls back to the
    // config file's `log_level`, then to "info".
    #[arg(long)]
    pub log_level: Option<String>,
}
37
38pub async fn run(args: AgentArgs) -> Result<(), Box<dyn std::error::Error>> {
40 let config_str = std::fs::read_to_string(&args.config)
41 .map_err(|e| format!("failed to read config file {:?}: {e}", args.config))?;
42 let config: AgentConfig =
43 toml::from_str(&config_str).map_err(|e| format!("failed to parse agent config: {e}"))?;
44
45 let log_level = args
46 .log_level
47 .as_deref()
48 .or(config.log_level.as_deref())
49 .unwrap_or("info");
50 init_tracing(log_level);
51
52 info!(
53 version = trojan_core::VERSION,
54 panel_url = %config.panel_url,
55 "trojan agent starting"
56 );
57
58 let shutdown = CancellationToken::new();
60 let shutdown_signal = shutdown.clone();
61 tokio::spawn(async move {
62 shutdown_signal_handler().await;
63 info!("shutdown signal received");
64 shutdown_signal.cancel();
65 });
66
67 agent_loop(config, shutdown).await;
68 Ok(())
69}
70
71async fn agent_loop(config: AgentConfig, shutdown: CancellationToken) {
75 let mut delay_ms = config.reconnect.initial_delay_ms;
76
77 loop {
78 match run_session(&config, shutdown.clone()).await {
79 Ok(()) => {
80 info!("session ended cleanly");
81 if shutdown.is_cancelled() {
82 return;
83 }
84 delay_ms = config.reconnect.initial_delay_ms;
86 }
87 Err(e) => {
88 if shutdown.is_cancelled() {
89 return;
90 }
91 warn!(error = %e, "session failed");
92 }
93 }
94
95 if shutdown.is_cancelled() {
96 return;
97 }
98
99 let jitter_factor = 1.0 + config.reconnect.jitter * (2.0 * rand_f64() - 1.0);
101 #[expect(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
102 let actual_delay = (delay_ms as f64 * jitter_factor) as u64;
103 let delay = Duration::from_millis(actual_delay);
104
105 info!(delay_ms = actual_delay, "reconnecting after delay");
106
107 tokio::select! {
108 _ = shutdown.cancelled() => return,
109 _ = tokio::time::sleep(delay) => {}
110 }
111
112 #[expect(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
114 let next = (delay_ms as f64 * config.reconnect.multiplier) as u64;
115 delay_ms = next.min(config.reconnect.max_delay_ms);
116 }
117}
118
/// Runs one complete panel session: connect and register, boot the managed
/// service, start the traffic reporter, then react to panel messages until
/// shutdown, service exit, a panel-requested restart, or connection loss.
///
/// Returns `Ok(())` on clean termination (including a restart request — the
/// caller reconnects and re-registers, picking up the new config) and an
/// `AgentError` on failure. If the initial connection fails, falls back to
/// `run_degraded_mode` using the locally cached config.
async fn run_session(config: &AgentConfig, shutdown: CancellationToken) -> Result<(), AgentError> {
    let cache_dir = cache::resolve_cache_dir(config.cache_dir.as_deref());

    // Connect to the panel; on failure, try to boot from the local cache
    // instead of failing the session outright.
    let (reg, agent_tx, mut panel_rx) =
        match client::connect_and_register(config, shutdown.clone()).await {
            Ok(result) => result,
            Err(e) => {
                warn!(error = %e, "failed to connect to panel, checking local cache");
                return run_degraded_mode(&cache_dir, shutdown).await;
            }
        };

    let RegistrationResult {
        node_id,
        node_type,
        config_version,
        report_interval_secs,
        config: service_config,
    } = reg;

    // Persist the freshly received config so a later panel outage can be
    // survived in degraded mode; cache failures are deliberately non-fatal.
    let cached = CachedConfig {
        version: config_version,
        node_type,
        report_interval_secs,
        config: service_config.clone(),
        cached_at: unix_now(),
    };
    if let Err(e) = cache::write_cache(&cache_dir, &cached).await {
        warn!(error = %e, "failed to cache config (non-fatal)");
    }

    // Locally configured report interval takes precedence over the
    // panel-supplied one.
    let report_interval = Duration::from_secs(
        config
            .report_interval_secs
            .unwrap_or_else(|| u64::from(report_interval_secs)),
    );

    // Boot the managed service on its own task with a dedicated cancellation
    // token, so it can be drained independently of the session's token.
    let service_shutdown = CancellationToken::new();
    let service_config_clone = service_config.clone();
    let service_shutdown_clone = service_shutdown.clone();
    let service_handle = tokio::spawn(async move {
        runner::run_service(node_type, &service_config_clone, service_shutdown_clone).await
    });

    let started_at = unix_now();
    // Best-effort status report; a send error is ignored because a dead
    // connection will surface as `panel_rx.recv() == None` in the loop below.
    let _ = agent_tx
        .send(AgentMessage::ServiceStatus {
            status: ServiceState::Running,
            started_at,
            config_version,
        })
        .await;

    // Periodic traffic reporter on its own task/token so it can be stopped
    // before the session returns.
    let collector = TrafficCollector::new();
    let reporter_shutdown = CancellationToken::new();
    let reporter_handle = tokio::spawn(reporter::run_reporter(
        agent_tx.clone(),
        collector.clone(),
        report_interval,
        reporter_shutdown.clone(),
    ));

    // Tracks the version reported in subsequent status messages; updated on
    // non-restart config pushes.
    let mut current_config_version = config_version;

    // Pinned so the select arm below can poll it by `&mut` repeatedly
    // without consuming the handle.
    let mut service_handle = std::pin::pin!(service_handle);

    let result = loop {
        tokio::select! {
            // `biased` gives the shutdown arm highest priority.
            biased;

            _ = shutdown.cancelled() => {
                info!("shutdown requested, stopping service");
                service_shutdown.cancel();
                reporter_shutdown.cancel();
                break Ok(());
            }

            // Service task finished on its own: clean exit, error, or panic.
            service_result = &mut *service_handle => {
                reporter_shutdown.cancel();
                match service_result {
                    Ok(Ok(())) => {
                        info!(node_id = %node_id, "service exited cleanly");
                        break Ok(());
                    }
                    Ok(Err(e)) => {
                        error!(node_id = %node_id, error = %e, "service exited with error");
                        let _ = agent_tx.send(AgentMessage::ServiceStatus {
                            status: ServiceState::Error,
                            started_at,
                            config_version: current_config_version,
                        }).await;
                        break Err(e);
                    }
                    Err(e) => {
                        // JoinError: the spawned service task itself panicked.
                        error!(error = %e, "service task panicked");
                        break Err(AgentError::Service("service task panicked".to_string()));
                    }
                }
            }

            panel_msg = panel_rx.recv() => {
                match panel_msg {
                    Some(PanelMessage::ConfigPush { version, restart_required, drain_timeout_secs, config: config_bytes }) => {
                        info!(
                            version,
                            restart_required,
                            "received config push from panel"
                        );

                        // Validate the pushed payload before caching it; a
                        // malformed push is NACKed and otherwise ignored.
                        let new_config: serde_json::Value = match serde_json::from_slice(&config_bytes) {
                            Ok(v) => v,
                            Err(e) => {
                                error!(error = %e, "invalid config JSON in config push");
                                let _ = agent_tx.send(AgentMessage::ConfigAck {
                                    version,
                                    ok: false,
                                    message: Some(format!("invalid config JSON: {e}")),
                                }).await;
                                continue;
                            }
                        };

                        // Cache the new config first so it survives a crash
                        // during the restart sequence below.
                        let cached = CachedConfig {
                            version,
                            node_type,
                            report_interval_secs,
                            config: new_config,
                            cached_at: unix_now(),
                        };
                        if let Err(e) = cache::write_cache(&cache_dir, &cached).await {
                            warn!(error = %e, "failed to cache updated config");
                        }

                        if restart_required {
                            let _ = agent_tx.send(AgentMessage::ServiceStatus {
                                status: ServiceState::Restarting,
                                started_at,
                                config_version: current_config_version,
                            }).await;

                            // Drain the service: cancel its token and give it
                            // up to drain_timeout (default 30s) to finish.
                            let drain_timeout = drain_timeout_secs
                                .map(|s| Duration::from_secs(u64::from(s)))
                                .unwrap_or(Duration::from_secs(30));
                            service_shutdown.cancel();
                            let _ = tokio::time::timeout(drain_timeout, &mut *service_handle).await;

                            let _ = agent_tx.send(AgentMessage::ConfigAck {
                                version,
                                ok: true,
                                message: None,
                            }).await;

                            // Break Ok so the caller reconnects; the new
                            // session registers and boots with the new config.
                            reporter_shutdown.cancel();
                            break Ok(());
                        }

                        // NOTE(review): non-restart pushes only update the
                        // tracked version and the cache — the running service
                        // is not visibly reloaded here; presumably it picks
                        // the config up on the next restart. TODO confirm.
                        current_config_version = version;

                        let _ = agent_tx.send(AgentMessage::ConfigAck {
                            version,
                            ok: true,
                            message: None,
                        }).await;
                    }

                    Some(PanelMessage::Error { code, message }) => {
                        error!(?code, %message, "received error from panel");
                    }

                    Some(PanelMessage::Registered { .. }) => {
                        warn!("unexpected duplicate registration message");
                    }

                    // Keepalive only; no response is sent from here.
                    Some(PanelMessage::Ping) => {
                    }

                    // Channel closed: the panel connection dropped.
                    None => {
                        warn!("panel connection lost");
                        reporter_shutdown.cancel();
                        break Err(AgentError::ConnectionClosed);
                    }
                }
            }
        }
    };

    // Wait for the reporter task to observe its cancellation and finish
    // before returning, so no report is sent after the session ends.
    let _ = reporter_handle.await;

    result
}
334
335async fn run_degraded_mode(
337 cache_dir: &std::path::Path,
338 shutdown: CancellationToken,
339) -> Result<(), AgentError> {
340 let cached = match cache::read_cache(cache_dir).await {
341 Some(c) => c,
342 None => {
343 return Err(AgentError::Registration(
344 "panel unreachable and no cached config available".to_string(),
345 ));
346 }
347 };
348
349 warn!(
350 node_type = %cached.node_type,
351 config_version = cached.version,
352 cached_at = cached.cached_at,
353 "running in degraded mode from cached config (no panel connection)"
354 );
355
356 runner::run_service(cached.node_type, &cached.config, shutdown).await
357}
358
359async fn shutdown_signal_handler() {
361 let ctrl_c = async {
362 if let Err(e) = tokio::signal::ctrl_c().await {
363 warn!("failed to listen for Ctrl+C: {e}");
364 std::future::pending::<()>().await;
365 }
366 };
367
368 #[cfg(unix)]
369 let terminate = async {
370 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
371 Ok(mut sig) => {
372 sig.recv().await;
373 }
374 Err(e) => {
375 warn!("failed to listen for SIGTERM: {e}");
376 std::future::pending::<()>().await;
377 }
378 }
379 };
380
381 #[cfg(not(unix))]
382 let terminate = std::future::pending::<()>();
383
384 tokio::select! {
385 _ = ctrl_c => {}
386 _ = terminate => {}
387 }
388}
389
390fn init_tracing(level: &str) {
391 let filter = EnvFilter::try_new(level).unwrap_or_else(|_| EnvFilter::new("info"));
392
393 tracing_subscriber::registry()
394 .with(filter)
395 .with(fmt::layer().with_writer(io::stderr))
396 .init();
397}
398
/// Current wall-clock time as whole seconds since the Unix epoch.
/// Returns 0 if the system clock reads earlier than the epoch.
fn unix_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
405
/// Returns a pseudo-random value in `[0.0, 1.0)` derived from the system
/// clock's sub-second nanoseconds.
///
/// This is NOT a uniform or cryptographic RNG — it only adds jitter to the
/// reconnect backoff, where rough unpredictability is enough.
fn rand_f64() -> f64 {
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .subsec_nanos();
    // subsec_nanos() is always < 1_000_000_000, so dividing by 1e9 yields a
    // value spanning [0, 1). The previous divisor, u32::MAX, capped the
    // result near 0.233 — which made `2.0 * rand_f64() - 1.0` in the caller
    // always negative, so reconnect jitter only ever SHORTENED delays.
    f64::from(nanos) / 1_000_000_000.0
}