1use std::collections::HashMap;
11use std::collections::HashSet;
12use std::fs;
13use std::net::SocketAddr;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::sync::atomic::AtomicU64;
17use std::sync::atomic::Ordering;
18
19use axum::Router;
20use axum::routing::delete;
21use axum::routing::get;
22use axum::routing::post;
23use axum::routing::put;
24use bollard::Docker;
25use bollard::query_parameters::EventsOptions;
26use bollard::query_parameters::ListContainersOptionsBuilder;
27use color_eyre::Result;
28use color_eyre::eyre::eyre;
29use futures_util::stream::StreamExt;
30use hyper_util::rt::TokioExecutor;
31use hyper_util::rt::TokioIo;
32use hyper_util::server::conn::auto::Builder;
33use lab_ops_lab_lib::port::PortAllocator;
34use serde::Serialize;
35use tokio::process::Command;
36use tokio::sync::RwLock;
37use tower_service::Service;
38use tracing::info;
39
40use crate::api::add_dnat;
41use crate::api::add_hairpin;
42use crate::api::add_mapping;
43use crate::api::add_policy_route;
44use crate::api::add_snat;
45use crate::api::clear_all;
46use crate::api::list_mappings;
47use crate::api::remap_port;
48use crate::api::remove_dnat;
49use crate::api::remove_hairpin;
50use crate::api::remove_mapping;
51use crate::api::remove_mapping_by_id;
52use crate::api::remove_policy_route;
53use crate::api::remove_snat;
54use crate::api::unbind_ports;
55use crate::docker;
56use crate::iptables::IptablesManager;
57use crate::models::DaemonState;
58use crate::models::DockerPortMap;
59use crate::policy_route::PolicyRouteManager;
60
/// State shared by the HTTP handlers and the daemon's background tasks.
///
/// It is cloned to give the router (`with_state`) and each spawned task
/// its own handle.
#[derive(Clone)]
pub struct AppState {
    /// In-memory daemon state (Docker port mappings plus DNAT, SNAT, hairpin
    /// and policy-route configs); written to `state_path` by `persist`.
    pub daemon_state: Arc<RwLock<DaemonState>>,
    /// Installs and removes the natmap iptables rules.
    pub iptables: Arc<IptablesManager>,
    /// Installs and flushes policy routes.
    pub policy_route: Arc<PolicyRouteManager>,
    /// Docker client; `None` when the daemon runs without Docker support.
    pub docker: Option<Docker>,
    /// Path of the JSON state file.
    pub state_path: PathBuf,
    /// Path of the Unix socket the HTTP API is served on.
    pub socket_path: PathBuf,
    /// Group that is given ownership of the API socket (`root:<group>`).
    pub socket_group: String,
    /// Counter behind `allocate_id`.
    pub next_id: Arc<AtomicU64>,
    /// Records which host socket addresses are currently claimed.
    pub ports: Arc<PortAllocator>,
}
83
84impl AppState {
85 pub fn allocate_id(&self) -> u64 {
87 self.next_id.fetch_add(1, Ordering::SeqCst)
88 }
89
90 pub async fn persist(&self) {
92 let data = {
93 let lock = self.daemon_state.read().await;
94 serde_json::to_string(&*lock).unwrap_or_default()
95 };
96 let tmp = self.state_path.with_extension("tmp");
97 if fs::write(&tmp, data).is_ok() {
98 let _ = fs::rename(&tmp, &self.state_path);
99 }
100 }
101}
102
103#[derive(Serialize)]
105pub struct ErrorResponse {
106 pub error: String,
107}
108
/// The natmap daemon: shared state plus the HTTP router it serves on the
/// Unix socket.
#[derive(Clone)]
pub struct Daemon {
    /// State shared with the request handlers and background tasks.
    state: AppState,
    /// Router with every API route; its state was bound via `with_state`.
    app: Router<()>,
}
114
115impl Daemon {
116 pub async fn new(
117 socket_path: PathBuf,
118 state_path: PathBuf,
119 socket_group: String,
120 ) -> Result<Self> {
121 tracing::info!(daemon = "natmap", "starting natmap daemon");
122
123 let docker = docker::connect().ok();
124 if docker.is_none() {
125 tracing::info!(
126 "failed connecting to Docker daemon via Unix socket — running without Docker support"
127 );
128 }
129
130 let state_dir = state_path.parent().unwrap();
131 if !state_dir.exists() {
132 fs::create_dir_all(state_dir).map_err(|e| {
133 eyre!(
134 "Failed to create state directory {}: {e}",
135 state_dir.display()
136 )
137 })?;
138 }
139
140 let iptables = Arc::new(IptablesManager::new());
141 iptables
142 .setup()
143 .map_err(|e| eyre!("Failed to set up iptables chains: {e}"))?;
144
145 let ports = Arc::new(PortAllocator::new());
146 let daemon_state = Arc::new(RwLock::new(DaemonState::default()));
147 let policy_route = Arc::new(PolicyRouteManager::new());
148
149 let state = AppState {
150 daemon_state,
151 iptables,
152 policy_route,
153 docker,
154 state_path,
155 next_id: Arc::new(AtomicU64::new(1)),
156 ports,
157 socket_group,
158 socket_path,
159 };
160
161 let app = Router::new()
162 .route("/mappings", get(list_mappings))
163 .route("/remap/:container_id", put(remap_port))
164 .route("/mapping/:container_id", post(add_mapping))
165 .route("/mapping/{container_id}/{port}", delete(remove_mapping))
166 .route("/mapping/by-id/:id", delete(remove_mapping_by_id))
167 .route("/dnat", post(add_dnat))
168 .route("/dnat", delete(remove_dnat))
169 .route("/snat", post(add_snat))
170 .route("/snat", delete(remove_snat))
171 .route("/hairpin", post(add_hairpin))
172 .route("/hairpin", delete(remove_hairpin))
173 .route("/policy-route", post(add_policy_route))
174 .route("/policy-route", delete(remove_policy_route))
175 .route("/clear", delete(clear_all))
176 .with_state(state.clone());
177
178 Ok(Self { state, app })
179 }
180
181 #[tracing::instrument(skip_all, fields(daemon = "natmap", socket.path = %self.state.socket_path.display()))]
186 pub async fn run(&self) -> Result<()> {
187 self.reload().await?;
188
189 let state = self.state.clone();
190
191 if state.docker.is_some() {
192 let self_clone = self.clone();
193 tokio::spawn(async move {
194 if let Err(e) = self_clone.listen_docker_events().await {
195 tracing::error!(error = %e, "docker listener exited with error");
196 }
197 });
198 }
199
200 tokio::spawn(async move {
201 tokio::signal::ctrl_c().await.ok();
202 tracing::info!("shutting down: flushing iptables rules");
203 let _ = state.iptables.flush_all_natmap();
204 let daemon_state = state.daemon_state.read().await;
205 let _ = state.policy_route.flush_all(&daemon_state.policy_routes);
206 drop(daemon_state);
207 state.ports.deallocate_all().await;
208 tracing::info!("shutdown complete");
209 std::process::exit(0);
210 });
211
212 if state.socket_path.exists() {
213 let _ = fs::remove_file(&state.socket_path);
214 }
215
216 let socket_path_str = state.socket_path.display().to_string();
217 let listener = tokio::net::UnixListener::bind(state.socket_path)
218 .map_err(|e| eyre!("Failed to bind Unix socket at {}: {e}", socket_path_str))?;
219
220 let _ = Command::new("chown")
221 .args([
222 format!("root:{}", state.socket_group),
223 socket_path_str.to_string(),
224 ])
225 .status()
226 .await;
227 let _ = Command::new("chmod")
228 .args(["660", &socket_path_str])
229 .status()
230 .await;
231
232 tracing::info!(socket.path = %socket_path_str, "listening on unix socket");
233
234 loop {
235 let (socket, _) = listener.accept().await?;
236 let app = self.app.clone();
237
238 tokio::spawn(async move {
239 let socket = TokioIo::new(socket);
240
241 let srv = hyper::service::service_fn(
242 move |request: hyper::Request<hyper::body::Incoming>| app.clone().call(request),
243 );
244
245 if let Err(err) = Builder::new(TokioExecutor::new())
246 .serve_connection_with_upgrades(socket, srv)
247 .await
248 {
249 tracing::error!(error = %err, "failed to serve connection");
250 }
251 });
252 }
253
254 #[allow(unreachable_code)]
255 Ok(())
256 }
257
258 #[tracing::instrument(skip_all, fields(mappings.count = tracing::field::Empty, dnats.count = tracing::field::Empty))]
263 pub async fn reload(&self) -> Result<()> {
264 info!("crash recovery: flushing stale iptables rules");
265 let state = self.state.clone();
266 let ports = self.state.ports.clone();
267 let iptables = self.state.iptables.clone();
268 let policy_route = self.state.policy_route.clone();
269
270 let _ = iptables.flush_all_natmap();
272 let _ = policy_route.flush_all(&state.daemon_state.read().await.policy_routes);
273 ports.deallocate_all().await;
274
275 let mut daemon_state = self.create_daemon_state();
276
277 let _ = self
279 .reconcile_docker_portmaps(&mut daemon_state)
280 .await
281 .map_err(|e| tracing::error!(error = %e, "error when reconciling docker portmaps"));
282
283 self.reconcile_hairpins(&mut daemon_state).await;
285 self.reconcile_dnats(&mut daemon_state).await;
286 self.reconcile_snats(&daemon_state).await;
287 self.reconcile_policy_routes(&mut daemon_state).await;
288
289 let mappings_count: usize = daemon_state.mapping.values().map(|m| m.len()).sum();
290 let dnats_count = daemon_state.dnats.len();
291
292 let span = tracing::Span::current();
293 span.record("mappings.count", mappings_count);
294 span.record("dnats.count", dnats_count);
295
296 *state.daemon_state.write().await = daemon_state;
297 self.state.persist().await;
298
299 tracing::info!("reload complete");
300 Ok(())
301 }
302
303 #[tracing::instrument(skip_all, fields(container.id = tracing::field::Empty, event.action = tracing::field::Empty))]
305 pub async fn handle_docker_event(&self, event: bollard::models::EventMessage, docker: &Docker) {
306 tracing::trace!(?event, "raw docker event");
307
308 let Some(action) = event.action else {
309 return;
310 };
311 let Some(actor) = event.actor else {
312 return;
313 };
314 let Some(container_id) = actor.id else {
315 return;
316 };
317
318 use bollard::plugin::EventMessageTypeEnum::*;
319 let Some(typ) = event.typ else {
320 return;
321 };
322
323 let span = tracing::Span::current();
324 span.record("container.id", &container_id);
325 span.record("event.action", &action);
326
327 match (typ, action.as_str()) {
328 (CONTAINER, "start") | (NETWORK, "connect") => {
329 self.on_container_start(container_id, docker).await
330 }
331 (CONTAINER, "die" | "kill") | (NETWORK, "disconnect") => {
332 self.on_container_stop(container_id).await
333 }
334 _ => {}
335 }
336 }
337
338 async fn listen_docker_events(&self) -> Result<()> {
343 let docker = self
344 .state
345 .docker
346 .as_ref()
347 .ok_or_else(|| eyre!("Docker not available"))?;
348 let opts = EventsOptions {
349 since: None,
350 until: None,
351 filters: Some(
352 [("type".to_string(), vec!["container".to_string()])]
353 .into_iter()
354 .collect(),
355 ),
356 };
357 let mut stream = docker.events(Some(opts));
358
359 while let Some(msg) = stream.next().await {
360 let Ok(event) = msg else { continue };
361 self.handle_docker_event(event, docker).await;
362 }
363 Ok(())
364 }
365
366 async fn on_container_stop(&self, container_id: String) {
367 tracing::debug!("container died, flushing rules");
368 let state = &self.state;
369 let mut lock = state.daemon_state.write().await;
370
371 let Some(mappings) = lock.mapping.remove(&container_id) else {
372 return;
373 };
374
375 for m in &mappings {
376 let _ = state.iptables.remove_mapping(m);
377 state.ports.deallocate(m.request.host_addr).await;
378 }
379 drop(lock);
380 state.persist().await;
381 }
382
383 async fn on_container_start(&self, container_id: String, docker: &Docker) {
384 tracing::debug!("container started, parsing mappings");
385 let state = &self.state;
386
387 let Ok(discovered) = docker::get_port_mappings(docker, &container_id).await else {
388 return;
389 };
390
391 let mut assigned = Vec::new();
392 for mut m in discovered {
393 m.id = state.allocate_id();
394 let host_addr = m.request.host_addr;
395 if state.ports.is_allocated(host_addr).await {
396 tracing::warn!(host.addr = %host_addr, "address already allocated, skipping");
397 continue;
398 }
399 if let Err(e) = state.ports.allocate(host_addr).await {
400 tracing::warn!(host.addr = %host_addr, error = %e, "failed allocating, skipping");
401 continue;
402 }
403 if let Err(e) = state.iptables.install_dockermap(&m) {
404 tracing::error!(mapping = ?m, error = %e, "failed to install mapping");
405 state.ports.deallocate(host_addr).await;
406 continue;
407 }
408 assigned.push(m);
409 }
410 let mut lock = state.daemon_state.write().await;
411 let existing = lock.mapping.entry(container_id.clone()).or_default();
412 let auto_comments: HashSet<String> =
413 assigned.iter().map(|m| m.rule_comment.clone()).collect();
414 existing.retain(|m| !auto_comments.contains(&m.rule_comment));
415 existing.extend(assigned);
416 drop(lock);
417 state.persist().await;
418 }
419
420 fn create_daemon_state(&self) -> DaemonState {
422 if self.state.state_path.exists()
423 && let Ok(data) = fs::read_to_string(&self.state.state_path)
424 {
425 serde_json::from_str(&data).unwrap_or_default()
426 } else {
427 DaemonState::default()
428 }
429 }
430
431 async fn reconcile_docker_portmaps(&self, daemon_state: &mut DaemonState) -> Result<()> {
432 let state = &self.state;
433 let ports = &self.state.ports;
434 let iptables = &self.state.iptables;
435
436 if let Some(docker) = &state.docker {
437 let opt = ListContainersOptionsBuilder::new().build();
438 let running_ids: HashSet<String> = docker
439 .list_containers(Some(opt))
440 .await?
441 .into_iter()
442 .filter_map(|c| c.id)
443 .collect();
444
445 let mut max_id: u64 = 0;
446 let old_maps: Vec<(String, Vec<DockerPortMap>)> =
447 daemon_state.mapping.drain().collect();
448 let mut new_docker = HashMap::new();
449
450 for (id, maps) in old_maps {
452 if !running_ids.contains(&id) {
453 tracing::info!(container.id = %id, "container gone, removing mappings");
454 continue;
455 }
456 let mut kept = Vec::new();
457 for m in maps {
459 let host_addr = m.request.host_addr;
460
461 if ports.is_allocated(host_addr).await {
463 tracing::warn!(host.addr = %host_addr, "address already held, removing stale mapping");
464 continue;
465 }
466 if let Err(e) = ports.allocate(host_addr).await {
467 tracing::warn!(host.addr = %host_addr, error = %e, "address in use, dropping mapping");
468 continue;
469 }
470
471 let _ = iptables.install_dockermap(&m);
473 max_id = max_id.max(m.id);
474 kept.push(m);
475 }
476 if !kept.is_empty() {
477 new_docker.insert(id, kept);
478 }
479 }
480 daemon_state.mapping = new_docker;
481 state
482 .next_id
483 .store(max_id.saturating_add(1), Ordering::SeqCst);
484 }
485 Ok(())
486 }
487
488 async fn reconcile_hairpins(&self, daemon_state: &mut DaemonState) {
489 let mut keep = Vec::new();
490 for config in daemon_state.hairpins.drain(..) {
491 if Self::should_reconcile(&config.ports, &config.ext_ip, &self.state.ports).await {
492 let _ = self.state.iptables.install_hairpin(&config);
493 keep.push(config);
494 } else {
495 unbind_ports(self.state.ports.clone(), &config.ext_ip, &config.ports).await;
496 }
497 }
498 daemon_state.hairpins = keep;
499 }
500
501 async fn reconcile_dnats(&self, daemon_state: &mut DaemonState) {
502 let mut keep = Vec::new();
503 for config in daemon_state.dnats.drain(..) {
504 if Self::should_reconcile(&config.ports, &config.ext_ip, &self.state.ports).await {
505 let _ = self.state.iptables.install_dnat(&config);
506 keep.push(config);
507 } else {
508 unbind_ports(self.state.ports.clone(), &config.ext_ip, &config.ports).await;
509 }
510 }
511 daemon_state.dnats = keep;
512 }
513
514 async fn reconcile_snats(&self, daemon_state: &DaemonState) {
515 for config in &daemon_state.snats {
516 let _ = self.state.iptables.install_snat(config);
517 }
518 }
519
520 async fn reconcile_policy_routes(&self, daemon_state: &mut DaemonState) {
521 let mut keep = Vec::new();
522 for config in daemon_state.policy_routes.drain(..) {
523 if let Err(e) = self.state.policy_route.install(&config) {
524 tracing::error!(error = %e, "failed to install policy route");
525 } else {
526 keep.push(config);
527 }
528 }
529 daemon_state.policy_routes = keep;
530 }
531
532 async fn should_reconcile(configs_ports: &str, ext_ip: &str, ports: &PortAllocator) -> bool {
534 let ip = match ext_ip.parse() {
535 Ok(ip) => ip,
536 Err(e) => {
537 tracing::error!(ext.ip = %ext_ip, error = %e, "invalid IP");
538 return false;
539 }
540 };
541
542 for port in configs_ports
543 .split(',')
544 .filter_map(|p| p.trim().parse::<u16>().ok())
545 {
546 let addr = SocketAddr::new(ip, port);
547 if ports.is_allocated(addr).await {
548 continue;
549 }
550 if let Err(e) = ports.allocate(addr).await {
551 tracing::warn!(port = %port, error = %e, "port in use, dropping");
552 return false;
553 }
554 }
555 true
556 }
557}
558
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::sync::Arc;
    use std::sync::atomic::AtomicU64;

    use axum::Router;
    use bollard::Docker;
    use bollard::models::EventActor;
    use bollard::models::EventMessage;
    use bollard::plugin::EventMessageTypeEnum;
    use lab_ops_lab_lib::port::PortAllocator;
    use tempfile::TempDir;
    use tokio::sync::RwLock;
    use tracing_test::traced_test;

    use super::AppState;
    use super::Daemon;
    use crate::iptables::IptablesManager;
    use crate::models::DaemonState;
    use crate::policy_route::PolicyRouteManager;

    /// Creates a throwaway directory and a `state.json` path inside it.
    /// Keep the returned `TempDir` alive while the path is in use; dropping
    /// it deletes the directory.
    fn temp_state_path() -> (TempDir, PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("state.json");
        (dir, path)
    }

    /// Builds a `Daemon` without Docker and with an empty router, writing
    /// its state to `state_path`.
    fn create_test_daemon(state_path: PathBuf) -> Daemon {
        let state = AppState {
            iptables: Arc::new(IptablesManager::new()),
            ports: Arc::new(PortAllocator::new()),
            daemon_state: Arc::new(RwLock::new(DaemonState::default())),
            policy_route: Arc::new(PolicyRouteManager::new()),
            docker: None,
            state_path,
            next_id: Arc::new(AtomicU64::new(1)),
            socket_group: "root".to_string(),
            socket_path: PathBuf::from("/tmp/natmap.sock"),
        };
        Daemon {
            state,
            app: Router::new(),
        }
    }

    #[tokio::test]
    #[traced_test]
    async fn reload_state_logs_mapping_count() {
        let (_dir, state_path) = temp_state_path();
        let daemon = create_test_daemon(state_path);

        let _ = daemon.reload().await;

        assert!(logs_contain("mappings.count="));
    }

    #[tokio::test]
    #[traced_test]
    async fn handle_docker_event_span_has_container_id() {
        let (_dir, state_path) = temp_state_path();
        let daemon = create_test_daemon(state_path);
        let docker = Docker::connect_with_local_defaults().unwrap();

        let actor = EventActor {
            id: Some("1234567890".to_string()),
            ..Default::default()
        };
        let event = EventMessage {
            action: Some("start".to_string()),
            actor: Some(actor),
            typ: Some(EventMessageTypeEnum::CONTAINER),
            ..Default::default()
        };

        daemon.handle_docker_event(event, &docker).await;

        assert!(logs_contain("container.id=\"1234567890\""));
    }
}