// lab_ops_natmap/daemon.rs

//! Natmap daemon — HTTP API server over a Unix socket.
//!
//! The daemon is the central authority for all iptables NAT rules. It:
//!
//! - Hosts an HTTP API on a Unix socket (e.g. `/run/natmap.sock`; the path is configurable)
//! - Auto-discovers Docker container ports on start/stop events
//! - Persists state to JSON and recovers after crashes
//! - Prevents port conflicts using [`PortAllocator`]

use std::collections::HashMap;
use std::collections::HashSet;
use std::fs;
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;

use axum::Router;
use axum::routing::delete;
use axum::routing::get;
use axum::routing::post;
use axum::routing::put;
use bollard::Docker;
use bollard::query_parameters::EventsOptions;
use bollard::query_parameters::ListContainersOptionsBuilder;
use color_eyre::Result;
use color_eyre::eyre::eyre;
use futures_util::stream::StreamExt;
use hyper_util::rt::TokioExecutor;
use hyper_util::rt::TokioIo;
use hyper_util::server::conn::auto::Builder;
use lab_ops_lab_lib::port::PortAllocator;
use serde::Serialize;
use tokio::process::Command;
use tokio::sync::RwLock;
use tower_service::Service;
use tracing::info;

use crate::api::add_dnat;
use crate::api::add_hairpin;
use crate::api::add_mapping;
use crate::api::add_policy_route;
use crate::api::add_snat;
use crate::api::clear_all;
use crate::api::list_mappings;
use crate::api::remap_port;
use crate::api::remove_dnat;
use crate::api::remove_hairpin;
use crate::api::remove_mapping;
use crate::api::remove_mapping_by_id;
use crate::api::remove_policy_route;
use crate::api::remove_snat;
use crate::api::unbind_ports;
use crate::docker;
use crate::iptables::IptablesManager;
use crate::models::DaemonState;
use crate::models::DockerPortMap;
use crate::policy_route::PolicyRouteManager;

61/// Shared application state held by all Axum route handlers.
62#[derive(Clone)]
63pub struct AppState {
64    /// The in-memory daemon state.
65    pub daemon_state: Arc<RwLock<DaemonState>>,
66    /// iptables rule manager.
67    pub iptables: Arc<IptablesManager>,
68    /// Policy routing manager.
69    pub policy_route: Arc<PolicyRouteManager>,
70    /// Docker client (None if Docker is unavailable).
71    pub docker: Option<Docker>,
72    /// Filesystem path for persisting state to JSON.
73    pub state_path: PathBuf,
74    /// Path to natmap socket.
75    pub socket_path: PathBuf,
76    /// Group name owning the natmap socket.
77    pub socket_group: String,
78    /// Auto-incrementing ID counter for mapping entries.
79    pub next_id: Arc<AtomicU64>,
80    /// Port reservation system for conflict prevention.
81    pub ports: Arc<PortAllocator>,
82}
83
84impl AppState {
85    /// Returns the next unique mapping ID and advances the counter.
86    pub fn allocate_id(&self) -> u64 {
87        self.next_id.fetch_add(1, Ordering::SeqCst)
88    }
89
90    /// Writes the current daemon state to disk (atomically via a temp file).
91    pub async fn persist(&self) {
92        let data = {
93            let lock = self.daemon_state.read().await;
94            serde_json::to_string(&*lock).unwrap_or_default()
95        };
96        let tmp = self.state_path.with_extension("tmp");
97        if fs::write(&tmp, data).is_ok() {
98            let _ = fs::rename(&tmp, &self.state_path);
99        }
100    }
101}
102
103/// JSON error response returned by the daemon API on failures.
104#[derive(Serialize)]
105pub struct ErrorResponse {
106    pub error: String,
107}
108
109#[derive(Clone)]
110pub struct Daemon {
111    state: AppState,
112    app: Router<()>,
113}
114
115impl Daemon {
116    pub async fn new(
117        socket_path: PathBuf,
118        state_path: PathBuf,
119        socket_group: String,
120    ) -> Result<Self> {
121        tracing::info!(daemon = "natmap", "starting natmap daemon");
122
123        let docker = docker::connect().ok();
124        if docker.is_none() {
125            tracing::info!(
126                "failed connecting to Docker daemon via Unix socket — running without Docker support"
127            );
128        }
129
130        let state_dir = state_path.parent().unwrap();
131        if !state_dir.exists() {
132            fs::create_dir_all(state_dir).map_err(|e| {
133                eyre!(
134                    "Failed to create state directory {}: {e}",
135                    state_dir.display()
136                )
137            })?;
138        }
139
140        let iptables = Arc::new(IptablesManager::new());
141        iptables
142            .setup()
143            .map_err(|e| eyre!("Failed to set up iptables chains: {e}"))?;
144
145        let ports = Arc::new(PortAllocator::new());
146        let daemon_state = Arc::new(RwLock::new(DaemonState::default()));
147        let policy_route = Arc::new(PolicyRouteManager::new());
148
149        let state = AppState {
150            daemon_state,
151            iptables,
152            policy_route,
153            docker,
154            state_path,
155            next_id: Arc::new(AtomicU64::new(1)),
156            ports,
157            socket_group,
158            socket_path,
159        };
160
161        let app = Router::new()
162            .route("/mappings", get(list_mappings))
163            .route("/remap/:container_id", put(remap_port))
164            .route("/mapping/:container_id", post(add_mapping))
165            .route("/mapping/{container_id}/{port}", delete(remove_mapping))
166            .route("/mapping/by-id/:id", delete(remove_mapping_by_id))
167            .route("/dnat", post(add_dnat))
168            .route("/dnat", delete(remove_dnat))
169            .route("/snat", post(add_snat))
170            .route("/snat", delete(remove_snat))
171            .route("/hairpin", post(add_hairpin))
172            .route("/hairpin", delete(remove_hairpin))
173            .route("/policy-route", post(add_policy_route))
174            .route("/policy-route", delete(remove_policy_route))
175            .route("/clear", delete(clear_all))
176            .with_state(state.clone());
177
178        Ok(Self { state, app })
179    }
180
181    /// Runs the natmap daemon with explicit paths for the socket, state file, and group.
182    ///
183    /// Sets up iptables chains, loads persisted state, spawns Docker event listeners,
184    /// installs a Ctrl-C handler for clean shutdown, and starts the HTTP API server.
185    #[tracing::instrument(skip_all, fields(daemon = "natmap", socket.path = %self.state.socket_path.display()))]
186    pub async fn run(&self) -> Result<()> {
187        self.reload().await?;
188
189        let state = self.state.clone();
190
191        if state.docker.is_some() {
192            let self_clone = self.clone();
193            tokio::spawn(async move {
194                if let Err(e) = self_clone.listen_docker_events().await {
195                    tracing::error!(error = %e, "docker listener exited with error");
196                }
197            });
198        }
199
200        tokio::spawn(async move {
201            tokio::signal::ctrl_c().await.ok();
202            tracing::info!("shutting down: flushing iptables rules");
203            let _ = state.iptables.flush_all_natmap();
204            let daemon_state = state.daemon_state.read().await;
205            let _ = state.policy_route.flush_all(&daemon_state.policy_routes);
206            drop(daemon_state);
207            state.ports.deallocate_all().await;
208            tracing::info!("shutdown complete");
209            std::process::exit(0);
210        });
211
212        if state.socket_path.exists() {
213            let _ = fs::remove_file(&state.socket_path);
214        }
215
216        let socket_path_str = state.socket_path.display().to_string();
217        let listener = tokio::net::UnixListener::bind(state.socket_path)
218            .map_err(|e| eyre!("Failed to bind Unix socket at {}: {e}", socket_path_str))?;
219
220        let _ = Command::new("chown")
221            .args([
222                format!("root:{}", state.socket_group),
223                socket_path_str.to_string(),
224            ])
225            .status()
226            .await;
227        let _ = Command::new("chmod")
228            .args(["660", &socket_path_str])
229            .status()
230            .await;
231
232        tracing::info!(socket.path = %socket_path_str, "listening on unix socket");
233
234        loop {
235            let (socket, _) = listener.accept().await?;
236            let app = self.app.clone();
237
238            tokio::spawn(async move {
239                let socket = TokioIo::new(socket);
240
241                let srv = hyper::service::service_fn(
242                    move |request: hyper::Request<hyper::body::Incoming>| app.clone().call(request),
243                );
244
245                if let Err(err) = Builder::new(TokioExecutor::new())
246                    .serve_connection_with_upgrades(socket, srv)
247                    .await
248                {
249                    tracing::error!(error = %err, "failed to serve connection");
250                }
251            });
252        }
253
254        #[allow(unreachable_code)]
255        Ok(())
256    }
257
258    /// Loads persisted state from disk and reconciles with the current system state.
259    ///
260    /// Flushes iptables rules, releases old port reservations, and re-installs
261    /// rules for surviving containers and static configurations.
262    #[tracing::instrument(skip_all, fields(mappings.count = tracing::field::Empty, dnats.count = tracing::field::Empty))]
263    pub async fn reload(&self) -> Result<()> {
264        info!("crash recovery: flushing stale iptables rules");
265        let state = self.state.clone();
266        let ports = self.state.ports.clone();
267        let iptables = self.state.iptables.clone();
268        let policy_route = self.state.policy_route.clone();
269
270        // ignore flush fail. we still have more cleanup to do independent from flush
271        let _ = iptables.flush_all_natmap();
272        let _ = policy_route.flush_all(&state.daemon_state.read().await.policy_routes);
273        ports.deallocate_all().await;
274
275        let mut daemon_state = self.create_daemon_state();
276
277        // Reconcile Docker mappings
278        let _ = self
279            .reconcile_docker_portmaps(&mut daemon_state)
280            .await
281            .map_err(|e| tracing::error!(error = %e, "error when reconciling docker portmaps"));
282
283        // Reconcile NAT rules
284        self.reconcile_hairpins(&mut daemon_state).await;
285        self.reconcile_dnats(&mut daemon_state).await;
286        self.reconcile_snats(&daemon_state).await;
287        self.reconcile_policy_routes(&mut daemon_state).await;
288
289        let mappings_count: usize = daemon_state.mapping.values().map(|m| m.len()).sum();
290        let dnats_count = daemon_state.dnats.len();
291
292        let span = tracing::Span::current();
293        span.record("mappings.count", mappings_count);
294        span.record("dnats.count", dnats_count);
295
296        *state.daemon_state.write().await = daemon_state;
297        self.state.persist().await;
298
299        tracing::info!("reload complete");
300        Ok(())
301    }
302
303    /// Handles a single Docker event, creating the required span.
304    #[tracing::instrument(skip_all, fields(container.id = tracing::field::Empty, event.action = tracing::field::Empty))]
305    pub async fn handle_docker_event(&self, event: bollard::models::EventMessage, docker: &Docker) {
306        tracing::trace!(?event, "raw docker event");
307
308        let Some(action) = event.action else {
309            return;
310        };
311        let Some(actor) = event.actor else {
312            return;
313        };
314        let Some(container_id) = actor.id else {
315            return;
316        };
317
318        use bollard::plugin::EventMessageTypeEnum::*;
319        let Some(typ) = event.typ else {
320            return;
321        };
322
323        let span = tracing::Span::current();
324        span.record("container.id", &container_id);
325        span.record("event.action", &action);
326
327        match (typ, action.as_str()) {
328            (CONTAINER, "start") | (NETWORK, "connect") => {
329                self.on_container_start(container_id, docker).await
330            }
331            (CONTAINER, "die" | "kill") | (NETWORK, "disconnect") => {
332                self.on_container_stop(container_id).await
333            }
334            _ => {}
335        }
336    }
337
338    /// Listens for Docker container events and automatically manages port mappings.
339    ///
340    /// On `start` / `network connect`: discovers published ports and installs rules.
341    /// On `die` / `kill` / `network disconnect`: removes all rules for the container.
342    async fn listen_docker_events(&self) -> Result<()> {
343        let docker = self
344            .state
345            .docker
346            .as_ref()
347            .ok_or_else(|| eyre!("Docker not available"))?;
348        let opts = EventsOptions {
349            since: None,
350            until: None,
351            filters: Some(
352                [("type".to_string(), vec!["container".to_string()])]
353                    .into_iter()
354                    .collect(),
355            ),
356        };
357        let mut stream = docker.events(Some(opts));
358
359        while let Some(msg) = stream.next().await {
360            let Ok(event) = msg else { continue };
361            self.handle_docker_event(event, docker).await;
362        }
363        Ok(())
364    }
365
366    async fn on_container_stop(&self, container_id: String) {
367        tracing::debug!("container died, flushing rules");
368        let state = &self.state;
369        let mut lock = state.daemon_state.write().await;
370
371        let Some(mappings) = lock.mapping.remove(&container_id) else {
372            return;
373        };
374
375        for m in &mappings {
376            let _ = state.iptables.remove_mapping(m);
377            state.ports.deallocate(m.request.host_addr).await;
378        }
379        drop(lock);
380        state.persist().await;
381    }
382
383    async fn on_container_start(&self, container_id: String, docker: &Docker) {
384        tracing::debug!("container started, parsing mappings");
385        let state = &self.state;
386
387        let Ok(discovered) = docker::get_port_mappings(docker, &container_id).await else {
388            return;
389        };
390
391        let mut assigned = Vec::new();
392        for mut m in discovered {
393            m.id = state.allocate_id();
394            let host_addr = m.request.host_addr;
395            if state.ports.is_allocated(host_addr).await {
396                tracing::warn!(host.addr = %host_addr, "address already allocated, skipping");
397                continue;
398            }
399            if let Err(e) = state.ports.allocate(host_addr).await {
400                tracing::warn!(host.addr = %host_addr, error = %e, "failed allocating, skipping");
401                continue;
402            }
403            if let Err(e) = state.iptables.install_dockermap(&m) {
404                tracing::error!(mapping = ?m, error = %e, "failed to install mapping");
405                state.ports.deallocate(host_addr).await;
406                continue;
407            }
408            assigned.push(m);
409        }
410        let mut lock = state.daemon_state.write().await;
411        let existing = lock.mapping.entry(container_id.clone()).or_default();
412        let auto_comments: HashSet<String> =
413            assigned.iter().map(|m| m.rule_comment.clone()).collect();
414        existing.retain(|m| !auto_comments.contains(&m.rule_comment));
415        existing.extend(assigned);
416        drop(lock);
417        state.persist().await;
418    }
419
420    /// Create daemon state from [`AppState::state_path`] if exists, otherwise create default.
421    fn create_daemon_state(&self) -> DaemonState {
422        if self.state.state_path.exists()
423            && let Ok(data) = fs::read_to_string(&self.state.state_path)
424        {
425            serde_json::from_str(&data).unwrap_or_default()
426        } else {
427            DaemonState::default()
428        }
429    }
430
431    async fn reconcile_docker_portmaps(&self, daemon_state: &mut DaemonState) -> Result<()> {
432        let state = &self.state;
433        let ports = &self.state.ports;
434        let iptables = &self.state.iptables;
435
436        if let Some(docker) = &state.docker {
437            let opt = ListContainersOptionsBuilder::new().build();
438            let running_ids: HashSet<String> = docker
439                .list_containers(Some(opt))
440                .await?
441                .into_iter()
442                .filter_map(|c| c.id)
443                .collect();
444
445            let mut max_id: u64 = 0;
446            let old_maps: Vec<(String, Vec<DockerPortMap>)> =
447                daemon_state.mapping.drain().collect();
448            let mut new_docker = HashMap::new();
449
450            // iter containers
451            for (id, maps) in old_maps {
452                if !running_ids.contains(&id) {
453                    tracing::info!(container.id = %id, "container gone, removing mappings");
454                    continue;
455                }
456                let mut kept = Vec::new();
457                // iter port mappings for this container
458                for m in maps {
459                    let host_addr = m.request.host_addr;
460
461                    // check this map
462                    if ports.is_allocated(host_addr).await {
463                        tracing::warn!(host.addr = %host_addr, "address already held, removing stale mapping");
464                        continue;
465                    }
466                    if let Err(e) = ports.allocate(host_addr).await {
467                        tracing::warn!(host.addr = %host_addr, error = %e, "address in use, dropping mapping");
468                        continue;
469                    }
470
471                    // keep this port mapping
472                    let _ = iptables.install_dockermap(&m);
473                    max_id = max_id.max(m.id);
474                    kept.push(m);
475                }
476                if !kept.is_empty() {
477                    new_docker.insert(id, kept);
478                }
479            }
480            daemon_state.mapping = new_docker;
481            state
482                .next_id
483                .store(max_id.saturating_add(1), Ordering::SeqCst);
484        }
485        Ok(())
486    }
487
488    async fn reconcile_hairpins(&self, daemon_state: &mut DaemonState) {
489        let mut keep = Vec::new();
490        for config in daemon_state.hairpins.drain(..) {
491            if Self::should_reconcile(&config.ports, &config.ext_ip, &self.state.ports).await {
492                let _ = self.state.iptables.install_hairpin(&config);
493                keep.push(config);
494            } else {
495                unbind_ports(self.state.ports.clone(), &config.ext_ip, &config.ports).await;
496            }
497        }
498        daemon_state.hairpins = keep;
499    }
500
501    async fn reconcile_dnats(&self, daemon_state: &mut DaemonState) {
502        let mut keep = Vec::new();
503        for config in daemon_state.dnats.drain(..) {
504            if Self::should_reconcile(&config.ports, &config.ext_ip, &self.state.ports).await {
505                let _ = self.state.iptables.install_dnat(&config);
506                keep.push(config);
507            } else {
508                unbind_ports(self.state.ports.clone(), &config.ext_ip, &config.ports).await;
509            }
510        }
511        daemon_state.dnats = keep;
512    }
513
514    async fn reconcile_snats(&self, daemon_state: &DaemonState) {
515        for config in &daemon_state.snats {
516            let _ = self.state.iptables.install_snat(config);
517        }
518    }
519
520    async fn reconcile_policy_routes(&self, daemon_state: &mut DaemonState) {
521        let mut keep = Vec::new();
522        for config in daemon_state.policy_routes.drain(..) {
523            if let Err(e) = self.state.policy_route.install(&config) {
524                tracing::error!(error = %e, "failed to install policy route");
525            } else {
526                keep.push(config);
527            }
528        }
529        daemon_state.policy_routes = keep;
530    }
531
532    /// Check if this port can be reconciled.
533    async fn should_reconcile(configs_ports: &str, ext_ip: &str, ports: &PortAllocator) -> bool {
534        let ip = match ext_ip.parse() {
535            Ok(ip) => ip,
536            Err(e) => {
537                tracing::error!(ext.ip = %ext_ip, error = %e, "invalid IP");
538                return false;
539            }
540        };
541
542        for port in configs_ports
543            .split(',')
544            .filter_map(|p| p.trim().parse::<u16>().ok())
545        {
546            let addr = SocketAddr::new(ip, port);
547            if ports.is_allocated(addr).await {
548                continue;
549            }
550            if let Err(e) = ports.allocate(addr).await {
551                tracing::warn!(port = %port, error = %e, "port in use, dropping");
552                return false;
553            }
554        }
555        true
556    }
557}
558
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::sync::Arc;
    use std::sync::atomic::AtomicU64;

    use axum::Router;
    use bollard::Docker;
    use bollard::models::EventActor;
    use bollard::models::EventMessage;
    use bollard::plugin::EventMessageTypeEnum;
    use lab_ops_lab_lib::port::PortAllocator;
    use tokio::sync::RwLock;
    use tracing_test::traced_test;

    use super::AppState;
    use super::Daemon;
    use crate::iptables::IptablesManager;
    use crate::models::DaemonState;
    use crate::policy_route::PolicyRouteManager;

    /// Builds a Docker-less [`Daemon`] with an empty router that persists to `state_path`.
    fn create_test_daemon(state_path: PathBuf) -> Daemon {
        Daemon {
            state: AppState {
                iptables: Arc::new(IptablesManager::new()),
                ports: Arc::new(PortAllocator::new()),
                daemon_state: Arc::new(RwLock::new(DaemonState::default())),
                policy_route: Arc::new(PolicyRouteManager::new()),
                docker: None,
                state_path,
                next_id: Arc::new(AtomicU64::new(1)),
                socket_group: "root".to_string(),
                socket_path: PathBuf::from("/tmp/natmap.sock"),
            },
            app: Router::new(),
        }
    }

    #[tokio::test]
    #[traced_test]
    async fn reload_state_logs_mapping_count() {
        let dir = tempfile::tempdir().unwrap();
        let daemon = create_test_daemon(dir.path().join("state.json"));

        // Only the span field recorded by `reload` matters here, not its result.
        let _ = daemon.reload().await;

        assert!(logs_contain("mappings.count="));
    }

    #[tokio::test]
    #[traced_test]
    async fn handle_docker_event_span_has_container_id() {
        let dir = tempfile::tempdir().unwrap();
        let daemon = create_test_daemon(dir.path().join("state.json"));
        let docker = Docker::connect_with_local_defaults().unwrap();

        let actor = EventActor {
            id: Some("1234567890".to_string()),
            ..Default::default()
        };
        let start_event = EventMessage {
            action: Some("start".to_string()),
            actor: Some(actor),
            typ: Some(EventMessageTypeEnum::CONTAINER),
            ..Default::default()
        };

        daemon.handle_docker_event(start_event, &docker).await;

        assert!(logs_contain("container.id=\"1234567890\""));
    }
}