1use std::collections::HashMap;
11use std::collections::HashSet;
12use std::fs;
13use std::net::SocketAddr;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::sync::atomic::AtomicU64;
17use std::sync::atomic::Ordering;
18
19use axum::Router;
20use axum::routing::delete;
21use axum::routing::get;
22use axum::routing::post;
23use axum::routing::put;
24use bollard::Docker;
25use bollard::query_parameters::EventsOptions;
26use bollard::query_parameters::ListContainersOptionsBuilder;
27use color_eyre::Result;
28use color_eyre::eyre::eyre;
29use futures_util::stream::StreamExt;
30use hyper_util::rt::TokioExecutor;
31use hyper_util::rt::TokioIo;
32use hyper_util::server::conn::auto::Builder;
33use lab_ops_lab_lib::port::PortAllocator;
34use serde::Serialize;
35use tokio::process::Command;
36use tokio::sync::RwLock;
37use tower_service::Service;
38use tracing::info;
39
40use crate::api::add_dnat;
41use crate::api::add_hairpin;
42use crate::api::add_mapping;
43use crate::api::add_snat;
44use crate::api::clear_all;
45use crate::api::list_mappings;
46use crate::api::remap_port;
47use crate::api::remove_dnat;
48use crate::api::remove_hairpin;
49use crate::api::remove_mapping;
50use crate::api::remove_mapping_by_id;
51use crate::api::remove_snat;
52use crate::api::unbind_ports;
53use crate::docker;
54use crate::iptables::IptablesManager;
55use crate::models::DaemonState;
56use crate::models::DockerPortMap;
57
58#[derive(Clone)]
60pub struct AppState {
61 pub daemon_state: Arc<RwLock<DaemonState>>,
63 pub iptables: Arc<IptablesManager>,
65 pub docker: Option<Docker>,
67 pub state_path: PathBuf,
69 pub socket_path: PathBuf,
71 pub socket_group: String,
73 pub next_id: Arc<AtomicU64>,
75 pub ports: Arc<PortAllocator>,
77}
78
79impl AppState {
80 pub fn allocate_id(&self) -> u64 {
82 self.next_id.fetch_add(1, Ordering::SeqCst)
83 }
84
85 pub async fn persist(&self) {
87 let data = {
88 let lock = self.daemon_state.read().await;
89 serde_json::to_string(&*lock).unwrap_or_default()
90 };
91 let tmp = self.state_path.with_extension("tmp");
92 if fs::write(&tmp, data).is_ok() {
93 let _ = fs::rename(&tmp, &self.state_path);
94 }
95 }
96}
97
98#[derive(Serialize)]
100pub struct ErrorResponse {
101 pub error: String,
102}
103
104#[derive(Clone)]
105pub struct Daemon {
106 state: AppState,
107 app: Router<()>,
108}
109
110impl Daemon {
111 pub async fn new(
112 socket_path: PathBuf,
113 state_path: PathBuf,
114 socket_group: String,
115 ) -> Result<Self> {
116 tracing::info!(daemon = "natmap", "starting natmap daemon");
117
118 let docker = docker::connect().ok();
119 if docker.is_none() {
120 tracing::info!(
121 "failed connecting to Docker daemon via Unix socket — running without Docker support"
122 );
123 }
124
125 let state_dir = state_path.parent().unwrap();
126 if !state_dir.exists() {
127 fs::create_dir_all(state_dir).map_err(|e| {
128 eyre!(
129 "Failed to create state directory {}: {e}",
130 state_dir.display()
131 )
132 })?;
133 }
134
135 let iptables = Arc::new(IptablesManager::new());
136 iptables
137 .setup()
138 .map_err(|e| eyre!("Failed to set up iptables chains: {e}"))?;
139
140 let ports = Arc::new(PortAllocator::new());
141 let daemon_state = Arc::new(RwLock::new(DaemonState::default()));
142
143 let state = AppState {
144 daemon_state,
145 iptables,
146 docker,
147 state_path,
148 next_id: Arc::new(AtomicU64::new(1)),
149 ports,
150 socket_group,
151 socket_path,
152 };
153
154 let app = Router::new()
155 .route("/mappings", get(list_mappings))
156 .route("/remap/:container_id", put(remap_port))
157 .route("/mapping/:container_id", post(add_mapping))
158 .route("/mapping/{container_id}/{port}", delete(remove_mapping))
159 .route("/mapping/by-id/:id", delete(remove_mapping_by_id))
160 .route("/dnat", post(add_dnat))
161 .route("/dnat", delete(remove_dnat))
162 .route("/snat", post(add_snat))
163 .route("/snat", delete(remove_snat))
164 .route("/hairpin", post(add_hairpin))
165 .route("/hairpin", delete(remove_hairpin))
166 .route("/clear", delete(clear_all))
167 .with_state(state.clone());
168
169 Ok(Self { state, app })
170 }
171
172 #[tracing::instrument(skip_all, fields(daemon = "natmap", socket.path = %self.state.socket_path.display()))]
177 pub async fn run(&self) -> Result<()> {
178 self.reload().await?;
179
180 let state = self.state.clone();
181
182 if state.docker.is_some() {
183 let self_clone = self.clone();
184 tokio::spawn(async move {
185 if let Err(e) = self_clone.listen_docker_events().await {
186 tracing::error!(error = %e, "docker listener exited with error");
187 }
188 });
189 }
190
191 tokio::spawn(async move {
192 tokio::signal::ctrl_c().await.ok();
193 tracing::info!("shutting down: flushing iptables rules");
194 let _ = state.iptables.flush_all_natmap();
195 state.ports.deallocate_all().await;
196 tracing::info!("shutdown complete");
197 std::process::exit(0);
198 });
199
200 if state.socket_path.exists() {
201 let _ = fs::remove_file(&state.socket_path);
202 }
203
204 let socket_path_str = state.socket_path.display().to_string();
205 let listener = tokio::net::UnixListener::bind(state.socket_path)
206 .map_err(|e| eyre!("Failed to bind Unix socket at {}: {e}", socket_path_str))?;
207
208 let _ = Command::new("chown")
209 .args([
210 format!("root:{}", state.socket_group),
211 socket_path_str.to_string(),
212 ])
213 .status()
214 .await;
215 let _ = Command::new("chmod")
216 .args(["660", &socket_path_str])
217 .status()
218 .await;
219
220 tracing::info!(socket.path = %socket_path_str, "listening on unix socket");
221
222 loop {
223 let (socket, _) = listener.accept().await?;
224 let app = self.app.clone();
225
226 tokio::spawn(async move {
227 let socket = TokioIo::new(socket);
228
229 let srv = hyper::service::service_fn(
230 move |request: hyper::Request<hyper::body::Incoming>| app.clone().call(request),
231 );
232
233 if let Err(err) = Builder::new(TokioExecutor::new())
234 .serve_connection_with_upgrades(socket, srv)
235 .await
236 {
237 tracing::error!(error = %err, "failed to serve connection");
238 }
239 });
240 }
241
242 #[allow(unreachable_code)]
243 Ok(())
244 }
245
246 #[tracing::instrument(skip_all, fields(mappings.count = tracing::field::Empty, dnats.count = tracing::field::Empty))]
251 pub async fn reload(&self) -> Result<()> {
252 info!("crash recovery: flushing stale iptables rules");
253 let state = self.state.clone();
254 let ports = self.state.ports.clone();
255 let iptables = self.state.iptables.clone();
256
257 let _ = iptables.flush_all_natmap();
259 ports.deallocate_all().await;
260
261 let mut daemon_state = self.create_daemon_state();
262
263 let _ = self
265 .reconcile_docker_portmaps(&mut daemon_state)
266 .await
267 .map_err(|e| tracing::error!(error = %e, "error when reconciling docker portmaps"));
268
269 self.reconcile_hairpins(&mut daemon_state).await;
271 self.reconcile_dnats(&mut daemon_state).await;
272 self.reconcile_snats(&daemon_state).await;
273
274 let mappings_count: usize = daemon_state.mapping.values().map(|m| m.len()).sum();
275 let dnats_count = daemon_state.dnats.len();
276
277 let span = tracing::Span::current();
278 span.record("mappings.count", mappings_count);
279 span.record("dnats.count", dnats_count);
280
281 *state.daemon_state.write().await = daemon_state;
282 self.state.persist().await;
283
284 tracing::info!("reload complete");
285 Ok(())
286 }
287
288 #[tracing::instrument(skip_all, fields(container.id = tracing::field::Empty, event.action = tracing::field::Empty))]
290 pub async fn handle_docker_event(&self, event: bollard::models::EventMessage, docker: &Docker) {
291 tracing::trace!(?event, "raw docker event");
292
293 let Some(action) = event.action else {
294 return;
295 };
296 let Some(actor) = event.actor else {
297 return;
298 };
299 let Some(container_id) = actor.id else {
300 return;
301 };
302
303 use bollard::plugin::EventMessageTypeEnum::*;
304 let Some(typ) = event.typ else {
305 return;
306 };
307
308 let span = tracing::Span::current();
309 span.record("container.id", &container_id);
310 span.record("event.action", &action);
311
312 match (typ, action.as_str()) {
313 (CONTAINER, "start") | (NETWORK, "connect") => {
314 self.on_container_start(container_id, docker).await
315 }
316 (CONTAINER, "die" | "kill") | (NETWORK, "disconnect") => {
317 self.on_container_stop(container_id).await
318 }
319 _ => {}
320 }
321 }
322
323 async fn listen_docker_events(&self) -> Result<()> {
328 let docker = self
329 .state
330 .docker
331 .as_ref()
332 .ok_or_else(|| eyre!("Docker not available"))?;
333 let opts = EventsOptions {
334 since: None,
335 until: None,
336 filters: Some(
337 [("type".to_string(), vec!["container".to_string()])]
338 .into_iter()
339 .collect(),
340 ),
341 };
342 let mut stream = docker.events(Some(opts));
343
344 while let Some(msg) = stream.next().await {
345 let Ok(event) = msg else { continue };
346 self.handle_docker_event(event, docker).await;
347 }
348 Ok(())
349 }
350
351 async fn on_container_stop(&self, container_id: String) {
352 tracing::debug!("container died, flushing rules");
353 let state = &self.state;
354 let mut lock = state.daemon_state.write().await;
355
356 let Some(mappings) = lock.mapping.remove(&container_id) else {
357 return;
358 };
359
360 for m in &mappings {
361 let _ = state.iptables.remove_mapping(m);
362 state.ports.deallocate(m.request.host_addr).await;
363 }
364 drop(lock);
365 state.persist().await;
366 }
367
368 async fn on_container_start(&self, container_id: String, docker: &Docker) {
369 tracing::debug!("container started, parsing mappings");
370 let state = &self.state;
371
372 let Ok(discovered) = docker::get_port_mappings(docker, &container_id).await else {
373 return;
374 };
375
376 let mut assigned = Vec::new();
377 for mut m in discovered {
378 m.id = state.allocate_id();
379 let host_addr = m.request.host_addr;
380 if state.ports.is_allocated(host_addr).await {
381 tracing::warn!(host.addr = %host_addr, "address already allocated, skipping");
382 continue;
383 }
384 if let Err(e) = state.ports.allocate(host_addr).await {
385 tracing::warn!(host.addr = %host_addr, error = %e, "failed allocating, skipping");
386 continue;
387 }
388 if let Err(e) = state.iptables.install_dockermap(&m) {
389 tracing::error!(mapping = ?m, error = %e, "failed to install mapping");
390 state.ports.deallocate(host_addr).await;
391 continue;
392 }
393 assigned.push(m);
394 }
395 let mut lock = state.daemon_state.write().await;
396 let existing = lock.mapping.entry(container_id.clone()).or_default();
397 let auto_comments: HashSet<String> =
398 assigned.iter().map(|m| m.rule_comment.clone()).collect();
399 existing.retain(|m| !auto_comments.contains(&m.rule_comment));
400 existing.extend(assigned);
401 drop(lock);
402 state.persist().await;
403 }
404
405 fn create_daemon_state(&self) -> DaemonState {
407 if self.state.state_path.exists()
408 && let Ok(data) = fs::read_to_string(&self.state.state_path)
409 {
410 serde_json::from_str(&data).unwrap_or_default()
411 } else {
412 DaemonState::default()
413 }
414 }
415
416 async fn reconcile_docker_portmaps(&self, daemon_state: &mut DaemonState) -> Result<()> {
417 let state = &self.state;
418 let ports = &self.state.ports;
419 let iptables = &self.state.iptables;
420
421 if let Some(docker) = &state.docker {
422 let opt = ListContainersOptionsBuilder::new().build();
423 let running_ids: HashSet<String> = docker
424 .list_containers(Some(opt))
425 .await?
426 .into_iter()
427 .filter_map(|c| c.id)
428 .collect();
429
430 let mut max_id: u64 = 0;
431 let old_maps: Vec<(String, Vec<DockerPortMap>)> =
432 daemon_state.mapping.drain().collect();
433 let mut new_docker = HashMap::new();
434
435 for (id, maps) in old_maps {
437 if !running_ids.contains(&id) {
438 tracing::info!(container.id = %id, "container gone, removing mappings");
439 continue;
440 }
441 let mut kept = Vec::new();
442 for m in maps {
444 let host_addr = m.request.host_addr;
445
446 if ports.is_allocated(host_addr).await {
448 tracing::warn!(host.addr = %host_addr, "address already held, removing stale mapping");
449 continue;
450 }
451 if let Err(e) = ports.allocate(host_addr).await {
452 tracing::warn!(host.addr = %host_addr, error = %e, "address in use, dropping mapping");
453 continue;
454 }
455
456 let _ = iptables.install_dockermap(&m);
458 max_id = max_id.max(m.id);
459 kept.push(m);
460 }
461 if !kept.is_empty() {
462 new_docker.insert(id, kept);
463 }
464 }
465 daemon_state.mapping = new_docker;
466 state
467 .next_id
468 .store(max_id.saturating_add(1), Ordering::SeqCst);
469 }
470 Ok(())
471 }
472
473 async fn reconcile_hairpins(&self, daemon_state: &mut DaemonState) {
474 let mut keep = Vec::new();
475 for config in daemon_state.hairpins.drain(..) {
476 if Self::should_reconcile(&config.ports, &config.ext_ip, &self.state.ports).await {
477 let _ = self.state.iptables.install_hairpin(&config);
478 keep.push(config);
479 } else {
480 unbind_ports(self.state.ports.clone(), &config.ext_ip, &config.ports).await;
481 }
482 }
483 daemon_state.hairpins = keep;
484 }
485
486 async fn reconcile_dnats(&self, daemon_state: &mut DaemonState) {
487 let mut keep = Vec::new();
488 for config in daemon_state.dnats.drain(..) {
489 if Self::should_reconcile(&config.ports, &config.ext_ip, &self.state.ports).await {
490 let _ = self.state.iptables.install_dnat(&config);
491 keep.push(config);
492 } else {
493 unbind_ports(self.state.ports.clone(), &config.ext_ip, &config.ports).await;
494 }
495 }
496 daemon_state.dnats = keep;
497 }
498
499 async fn reconcile_snats(&self, daemon_state: &DaemonState) {
500 for config in &daemon_state.snats {
501 let _ = self.state.iptables.install_snat(config);
502 }
503 }
504
505 async fn should_reconcile(configs_ports: &str, ext_ip: &str, ports: &PortAllocator) -> bool {
507 let ip = match ext_ip.parse() {
508 Ok(ip) => ip,
509 Err(e) => {
510 tracing::error!(ext.ip = %ext_ip, error = %e, "invalid IP");
511 return false;
512 }
513 };
514
515 for port in configs_ports
516 .split(',')
517 .filter_map(|p| p.trim().parse::<u16>().ok())
518 {
519 let addr = SocketAddr::new(ip, port);
520 if ports.is_allocated(addr).await {
521 continue;
522 }
523 if let Err(e) = ports.allocate(addr).await {
524 tracing::warn!(port = %port, error = %e, "port in use, dropping");
525 return false;
526 }
527 }
528 true
529 }
530}
531
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::sync::Arc;
    use std::sync::atomic::AtomicU64;

    use axum::Router;
    use bollard::Docker;
    use bollard::models::EventActor;
    use bollard::models::EventMessage;
    use bollard::plugin::EventMessageTypeEnum;
    use lab_ops_lab_lib::port::PortAllocator;
    use tokio::sync::RwLock;
    use tracing_test::traced_test;

    use super::AppState;
    use super::Daemon;
    use crate::iptables::IptablesManager;
    use crate::models::DaemonState;

    /// Creates a temporary directory plus the state-file path inside it.
    ///
    /// The `TempDir` is returned so the caller keeps the directory alive for the whole test.
    fn temp_state_path() -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("state.json");
        (dir, path)
    }

    /// Builds a daemon without Docker and with an empty router, so individual methods can be
    /// exercised without binding the Unix socket.
    fn create_test_daemon(state_path: PathBuf) -> Daemon {
        let iptables = Arc::new(IptablesManager::new());
        let ports = Arc::new(PortAllocator::new());

        Daemon {
            state: AppState {
                daemon_state: Arc::new(RwLock::new(DaemonState::default())),
                iptables,
                ports,
                docker: None,
                next_id: Arc::new(AtomicU64::new(1)),
                state_path,
                socket_path: PathBuf::from("/tmp/natmap.sock"),
                socket_group: "root".to_string(),
            },
            app: Router::new(),
        }
    }

    #[tokio::test]
    #[traced_test]
    async fn reload_state_logs_mapping_count() {
        let (_dir, state_path) = temp_state_path();
        let daemon = create_test_daemon(state_path);

        let _ = daemon.reload().await;

        assert!(logs_contain("mappings.count="));
    }

    #[tokio::test]
    #[traced_test]
    async fn handle_docker_event_span_has_container_id() {
        let (_dir, state_path) = temp_state_path();
        let daemon = create_test_daemon(state_path);
        let docker = Docker::connect_with_local_defaults().unwrap();

        let actor = EventActor {
            id: Some("1234567890".to_string()),
            ..Default::default()
        };
        let event = EventMessage {
            typ: Some(EventMessageTypeEnum::CONTAINER),
            action: Some("start".to_string()),
            actor: Some(actor),
            ..Default::default()
        };

        daemon.handle_docker_event(event, &docker).await;

        assert!(logs_contain("container.id=\"1234567890\""));
    }
}