Skip to main content

lab_ops_natmap/
daemon.rs

1//! Natmap daemon — HTTP API server over Unix socket.
2//!
3//! The daemon is the central authority for all iptables NAT rules. It:
4//!
5//! - Hosts an HTTP API on a Unix socket (`/run/natmap.sock`)
6//! - Auto-discovers Docker container ports on start/stop events
7//! - Persists state to JSON and recovers after crashes
8//! - Prevents port conflicts using [`PortAllocator`]
9
10use std::collections::HashMap;
11use std::collections::HashSet;
12use std::fs;
13use std::net::SocketAddr;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::sync::atomic::AtomicU64;
17use std::sync::atomic::Ordering;
18
19use axum::Router;
20use axum::routing::delete;
21use axum::routing::get;
22use axum::routing::post;
23use axum::routing::put;
24use bollard::Docker;
25use bollard::query_parameters::EventsOptions;
26use bollard::query_parameters::ListContainersOptionsBuilder;
27use color_eyre::Result;
28use color_eyre::eyre::eyre;
29use futures_util::stream::StreamExt;
30use hyper_util::rt::TokioExecutor;
31use hyper_util::rt::TokioIo;
32use hyper_util::server::conn::auto::Builder;
33use lab_ops_lab_lib::port::PortAllocator;
34use serde::Serialize;
35use tokio::process::Command;
36use tokio::sync::RwLock;
37use tower_service::Service;
38use tracing::info;
39
40use crate::api::add_dnat;
41use crate::api::add_hairpin;
42use crate::api::add_mapping;
43use crate::api::add_snat;
44use crate::api::clear_all;
45use crate::api::list_mappings;
46use crate::api::remap_port;
47use crate::api::remove_dnat;
48use crate::api::remove_hairpin;
49use crate::api::remove_mapping;
50use crate::api::remove_mapping_by_id;
51use crate::api::remove_snat;
52use crate::api::unbind_ports;
53use crate::docker;
54use crate::iptables::IptablesManager;
55use crate::models::DaemonState;
56use crate::models::DockerPortMap;
57
/// Shared application state held by all Axum route handlers.
///
/// The type is `Clone`; the daemon state, iptables manager, ID counter and
/// port allocator sit behind `Arc`, so clones refer to the same instances.
#[derive(Clone)]
pub struct AppState {
    /// The in-memory daemon state.
    pub daemon_state: Arc<RwLock<DaemonState>>,
    /// iptables rule manager.
    pub iptables: Arc<IptablesManager>,
    /// Docker client (None if Docker is unavailable).
    pub docker: Option<Docker>,
    /// Filesystem path for persisting state to JSON.
    pub state_path: PathBuf,
    /// Path to natmap socket.
    pub socket_path: PathBuf,
    /// Group name owning the natmap socket.
    pub socket_group: String,
    /// Auto-incrementing ID counter for mapping entries.
    pub next_id: Arc<AtomicU64>,
    /// Port reservation system for conflict prevention.
    pub ports: Arc<PortAllocator>,
}
78
79impl AppState {
80    /// Returns the next unique mapping ID and advances the counter.
81    pub fn allocate_id(&self) -> u64 {
82        self.next_id.fetch_add(1, Ordering::SeqCst)
83    }
84
85    /// Writes the current daemon state to disk (atomically via a temp file).
86    pub async fn persist(&self) {
87        let data = {
88            let lock = self.daemon_state.read().await;
89            serde_json::to_string(&*lock).unwrap_or_default()
90        };
91        let tmp = self.state_path.with_extension("tmp");
92        if fs::write(&tmp, data).is_ok() {
93            let _ = fs::rename(&tmp, &self.state_path);
94        }
95    }
96}
97
/// JSON error response returned by the daemon API on failures.
#[derive(Serialize)]
pub struct ErrorResponse {
    /// Error message; serialized under the `error` key.
    pub error: String,
}
103
/// The natmap daemon: the shared [`AppState`] plus the Axum router that
/// serves the HTTP API.
#[derive(Clone)]
pub struct Daemon {
    /// Shared state; `Daemon::new` also hands a clone of it to the router.
    state: AppState,
    /// Router cloned for every accepted connection in `Daemon::run`.
    app: Router<()>,
}
109
110impl Daemon {
111    pub async fn new(
112        socket_path: PathBuf,
113        state_path: PathBuf,
114        socket_group: String,
115    ) -> Result<Self> {
116        tracing::info!(daemon = "natmap", "starting natmap daemon");
117
118        let docker = docker::connect().ok();
119        if docker.is_none() {
120            tracing::info!(
121                "failed connecting to Docker daemon via Unix socket — running without Docker support"
122            );
123        }
124
125        let state_dir = state_path.parent().unwrap();
126        if !state_dir.exists() {
127            fs::create_dir_all(state_dir).map_err(|e| {
128                eyre!(
129                    "Failed to create state directory {}: {e}",
130                    state_dir.display()
131                )
132            })?;
133        }
134
135        let iptables = Arc::new(IptablesManager::new());
136        iptables
137            .setup()
138            .map_err(|e| eyre!("Failed to set up iptables chains: {e}"))?;
139
140        let ports = Arc::new(PortAllocator::new());
141        let daemon_state = Arc::new(RwLock::new(DaemonState::default()));
142
143        let state = AppState {
144            daemon_state,
145            iptables,
146            docker,
147            state_path,
148            next_id: Arc::new(AtomicU64::new(1)),
149            ports,
150            socket_group,
151            socket_path,
152        };
153
154        let app = Router::new()
155            .route("/mappings", get(list_mappings))
156            .route("/remap/:container_id", put(remap_port))
157            .route("/mapping/:container_id", post(add_mapping))
158            .route("/mapping/{container_id}/{port}", delete(remove_mapping))
159            .route("/mapping/by-id/:id", delete(remove_mapping_by_id))
160            .route("/dnat", post(add_dnat))
161            .route("/dnat", delete(remove_dnat))
162            .route("/snat", post(add_snat))
163            .route("/snat", delete(remove_snat))
164            .route("/hairpin", post(add_hairpin))
165            .route("/hairpin", delete(remove_hairpin))
166            .route("/clear", delete(clear_all))
167            .with_state(state.clone());
168
169        Ok(Self { state, app })
170    }
171
172    /// Runs the natmap daemon with explicit paths for the socket, state file, and group.
173    ///
174    /// Sets up iptables chains, loads persisted state, spawns Docker event listeners,
175    /// installs a Ctrl-C handler for clean shutdown, and starts the HTTP API server.
176    #[tracing::instrument(skip_all, fields(daemon = "natmap", socket.path = %self.state.socket_path.display()))]
177    pub async fn run(&self) -> Result<()> {
178        self.reload().await?;
179
180        let state = self.state.clone();
181
182        if state.docker.is_some() {
183            let self_clone = self.clone();
184            tokio::spawn(async move {
185                if let Err(e) = self_clone.listen_docker_events().await {
186                    tracing::error!(error = %e, "docker listener exited with error");
187                }
188            });
189        }
190
191        tokio::spawn(async move {
192            tokio::signal::ctrl_c().await.ok();
193            tracing::info!("shutting down: flushing iptables rules");
194            let _ = state.iptables.flush_all_natmap();
195            state.ports.deallocate_all().await;
196            tracing::info!("shutdown complete");
197            std::process::exit(0);
198        });
199
200        if state.socket_path.exists() {
201            let _ = fs::remove_file(&state.socket_path);
202        }
203
204        let socket_path_str = state.socket_path.display().to_string();
205        let listener = tokio::net::UnixListener::bind(state.socket_path)
206            .map_err(|e| eyre!("Failed to bind Unix socket at {}: {e}", socket_path_str))?;
207
208        let _ = Command::new("chown")
209            .args([
210                format!("root:{}", state.socket_group),
211                socket_path_str.to_string(),
212            ])
213            .status()
214            .await;
215        let _ = Command::new("chmod")
216            .args(["660", &socket_path_str])
217            .status()
218            .await;
219
220        tracing::info!(socket.path = %socket_path_str, "listening on unix socket");
221
222        loop {
223            let (socket, _) = listener.accept().await?;
224            let app = self.app.clone();
225
226            tokio::spawn(async move {
227                let socket = TokioIo::new(socket);
228
229                let srv = hyper::service::service_fn(
230                    move |request: hyper::Request<hyper::body::Incoming>| app.clone().call(request),
231                );
232
233                if let Err(err) = Builder::new(TokioExecutor::new())
234                    .serve_connection_with_upgrades(socket, srv)
235                    .await
236                {
237                    tracing::error!(error = %err, "failed to serve connection");
238                }
239            });
240        }
241
242        #[allow(unreachable_code)]
243        Ok(())
244    }
245
246    /// Loads persisted state from disk and reconciles with the current system state.
247    ///
248    /// Flushes iptables rules, releases old port reservations, and re-installs
249    /// rules for surviving containers and static configurations.
250    #[tracing::instrument(skip_all, fields(mappings.count = tracing::field::Empty, dnats.count = tracing::field::Empty))]
251    pub async fn reload(&self) -> Result<()> {
252        info!("crash recovery: flushing stale iptables rules");
253        let state = self.state.clone();
254        let ports = self.state.ports.clone();
255        let iptables = self.state.iptables.clone();
256
257        // ignore flush fail. we still have more cleanup to do independent from flush
258        let _ = iptables.flush_all_natmap();
259        ports.deallocate_all().await;
260
261        let mut daemon_state = self.create_daemon_state();
262
263        // Reconcile Docker mappings
264        let _ = self
265            .reconcile_docker_portmaps(&mut daemon_state)
266            .await
267            .map_err(|e| tracing::error!(error = %e, "error when reconciling docker portmaps"));
268
269        // Reconcile NAT rules
270        self.reconcile_hairpins(&mut daemon_state).await;
271        self.reconcile_dnats(&mut daemon_state).await;
272        self.reconcile_snats(&daemon_state).await;
273
274        let mappings_count: usize = daemon_state.mapping.values().map(|m| m.len()).sum();
275        let dnats_count = daemon_state.dnats.len();
276
277        let span = tracing::Span::current();
278        span.record("mappings.count", mappings_count);
279        span.record("dnats.count", dnats_count);
280
281        *state.daemon_state.write().await = daemon_state;
282        self.state.persist().await;
283
284        tracing::info!("reload complete");
285        Ok(())
286    }
287
288    /// Handles a single Docker event, creating the required span.
289    #[tracing::instrument(skip_all, fields(container.id = tracing::field::Empty, event.action = tracing::field::Empty))]
290    pub async fn handle_docker_event(&self, event: bollard::models::EventMessage, docker: &Docker) {
291        tracing::trace!(?event, "raw docker event");
292
293        let Some(action) = event.action else {
294            return;
295        };
296        let Some(actor) = event.actor else {
297            return;
298        };
299        let Some(container_id) = actor.id else {
300            return;
301        };
302
303        use bollard::plugin::EventMessageTypeEnum::*;
304        let Some(typ) = event.typ else {
305            return;
306        };
307
308        let span = tracing::Span::current();
309        span.record("container.id", &container_id);
310        span.record("event.action", &action);
311
312        match (typ, action.as_str()) {
313            (CONTAINER, "start") | (NETWORK, "connect") => {
314                self.on_container_start(container_id, docker).await
315            }
316            (CONTAINER, "die" | "kill") | (NETWORK, "disconnect") => {
317                self.on_container_stop(container_id).await
318            }
319            _ => {}
320        }
321    }
322
323    /// Listens for Docker container events and automatically manages port mappings.
324    ///
325    /// On `start` / `network connect`: discovers published ports and installs rules.
326    /// On `die` / `kill` / `network disconnect`: removes all rules for the container.
327    async fn listen_docker_events(&self) -> Result<()> {
328        let docker = self
329            .state
330            .docker
331            .as_ref()
332            .ok_or_else(|| eyre!("Docker not available"))?;
333        let opts = EventsOptions {
334            since: None,
335            until: None,
336            filters: Some(
337                [("type".to_string(), vec!["container".to_string()])]
338                    .into_iter()
339                    .collect(),
340            ),
341        };
342        let mut stream = docker.events(Some(opts));
343
344        while let Some(msg) = stream.next().await {
345            let Ok(event) = msg else { continue };
346            self.handle_docker_event(event, docker).await;
347        }
348        Ok(())
349    }
350
351    async fn on_container_stop(&self, container_id: String) {
352        tracing::debug!("container died, flushing rules");
353        let state = &self.state;
354        let mut lock = state.daemon_state.write().await;
355
356        let Some(mappings) = lock.mapping.remove(&container_id) else {
357            return;
358        };
359
360        for m in &mappings {
361            let _ = state.iptables.remove_mapping(m);
362            state.ports.deallocate(m.request.host_addr).await;
363        }
364        drop(lock);
365        state.persist().await;
366    }
367
368    async fn on_container_start(&self, container_id: String, docker: &Docker) {
369        tracing::debug!("container started, parsing mappings");
370        let state = &self.state;
371
372        let Ok(discovered) = docker::get_port_mappings(docker, &container_id).await else {
373            return;
374        };
375
376        let mut assigned = Vec::new();
377        for mut m in discovered {
378            m.id = state.allocate_id();
379            let host_addr = m.request.host_addr;
380            if state.ports.is_allocated(host_addr).await {
381                tracing::warn!(host.addr = %host_addr, "address already allocated, skipping");
382                continue;
383            }
384            if let Err(e) = state.ports.allocate(host_addr).await {
385                tracing::warn!(host.addr = %host_addr, error = %e, "failed allocating, skipping");
386                continue;
387            }
388            if let Err(e) = state.iptables.install_dockermap(&m) {
389                tracing::error!(mapping = ?m, error = %e, "failed to install mapping");
390                state.ports.deallocate(host_addr).await;
391                continue;
392            }
393            assigned.push(m);
394        }
395        let mut lock = state.daemon_state.write().await;
396        let existing = lock.mapping.entry(container_id.clone()).or_default();
397        let auto_comments: HashSet<String> =
398            assigned.iter().map(|m| m.rule_comment.clone()).collect();
399        existing.retain(|m| !auto_comments.contains(&m.rule_comment));
400        existing.extend(assigned);
401        drop(lock);
402        state.persist().await;
403    }
404
405    /// Create daemon state from [`AppState::state_path`] if exists, otherwise create default.
406    fn create_daemon_state(&self) -> DaemonState {
407        if self.state.state_path.exists()
408            && let Ok(data) = fs::read_to_string(&self.state.state_path)
409        {
410            serde_json::from_str(&data).unwrap_or_default()
411        } else {
412            DaemonState::default()
413        }
414    }
415
416    async fn reconcile_docker_portmaps(&self, daemon_state: &mut DaemonState) -> Result<()> {
417        let state = &self.state;
418        let ports = &self.state.ports;
419        let iptables = &self.state.iptables;
420
421        if let Some(docker) = &state.docker {
422            let opt = ListContainersOptionsBuilder::new().build();
423            let running_ids: HashSet<String> = docker
424                .list_containers(Some(opt))
425                .await?
426                .into_iter()
427                .filter_map(|c| c.id)
428                .collect();
429
430            let mut max_id: u64 = 0;
431            let old_maps: Vec<(String, Vec<DockerPortMap>)> =
432                daemon_state.mapping.drain().collect();
433            let mut new_docker = HashMap::new();
434
435            // iter containers
436            for (id, maps) in old_maps {
437                if !running_ids.contains(&id) {
438                    tracing::info!(container.id = %id, "container gone, removing mappings");
439                    continue;
440                }
441                let mut kept = Vec::new();
442                // iter port mappings for this container
443                for m in maps {
444                    let host_addr = m.request.host_addr;
445
446                    // check this map
447                    if ports.is_allocated(host_addr).await {
448                        tracing::warn!(host.addr = %host_addr, "address already held, removing stale mapping");
449                        continue;
450                    }
451                    if let Err(e) = ports.allocate(host_addr).await {
452                        tracing::warn!(host.addr = %host_addr, error = %e, "address in use, dropping mapping");
453                        continue;
454                    }
455
456                    // keep this port mapping
457                    let _ = iptables.install_dockermap(&m);
458                    max_id = max_id.max(m.id);
459                    kept.push(m);
460                }
461                if !kept.is_empty() {
462                    new_docker.insert(id, kept);
463                }
464            }
465            daemon_state.mapping = new_docker;
466            state
467                .next_id
468                .store(max_id.saturating_add(1), Ordering::SeqCst);
469        }
470        Ok(())
471    }
472
473    async fn reconcile_hairpins(&self, daemon_state: &mut DaemonState) {
474        let mut keep = Vec::new();
475        for config in daemon_state.hairpins.drain(..) {
476            if Self::should_reconcile(&config.ports, &config.ext_ip, &self.state.ports).await {
477                let _ = self.state.iptables.install_hairpin(&config);
478                keep.push(config);
479            } else {
480                unbind_ports(self.state.ports.clone(), &config.ext_ip, &config.ports).await;
481            }
482        }
483        daemon_state.hairpins = keep;
484    }
485
486    async fn reconcile_dnats(&self, daemon_state: &mut DaemonState) {
487        let mut keep = Vec::new();
488        for config in daemon_state.dnats.drain(..) {
489            if Self::should_reconcile(&config.ports, &config.ext_ip, &self.state.ports).await {
490                let _ = self.state.iptables.install_dnat(&config);
491                keep.push(config);
492            } else {
493                unbind_ports(self.state.ports.clone(), &config.ext_ip, &config.ports).await;
494            }
495        }
496        daemon_state.dnats = keep;
497    }
498
499    async fn reconcile_snats(&self, daemon_state: &DaemonState) {
500        for config in &daemon_state.snats {
501            let _ = self.state.iptables.install_snat(config);
502        }
503    }
504
505    /// Check if this port can be reconciled.
506    async fn should_reconcile(configs_ports: &str, ext_ip: &str, ports: &PortAllocator) -> bool {
507        let ip = match ext_ip.parse() {
508            Ok(ip) => ip,
509            Err(e) => {
510                tracing::error!(ext.ip = %ext_ip, error = %e, "invalid IP");
511                return false;
512            }
513        };
514
515        for port in configs_ports
516            .split(',')
517            .filter_map(|p| p.trim().parse::<u16>().ok())
518        {
519            let addr = SocketAddr::new(ip, port);
520            if ports.is_allocated(addr).await {
521                continue;
522            }
523            if let Err(e) = ports.allocate(addr).await {
524                tracing::warn!(port = %port, error = %e, "port in use, dropping");
525                return false;
526            }
527        }
528        true
529    }
530}
531
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::sync::Arc;
    use std::sync::atomic::AtomicU64;

    use axum::Router;
    use bollard::Docker;
    use bollard::models::EventActor;
    use bollard::models::EventMessage;
    use lab_ops_lab_lib::port::PortAllocator;
    use tokio::sync::RwLock;
    use tracing_test::traced_test;

    use super::AppState;
    use super::Daemon;
    use crate::iptables::IptablesManager;
    use crate::models::DaemonState;

    /// Builds a [`Daemon`] with default state, no Docker client and an empty
    /// router, persisting to `state_path`.
    fn create_test_daemon(state_path: PathBuf) -> Daemon {
        let iptables = Arc::new(IptablesManager::new());
        let ports = Arc::new(PortAllocator::new());
        let daemon_state = Arc::new(RwLock::new(DaemonState::default()));

        Daemon {
            state: AppState {
                daemon_state,
                iptables,
                docker: None,
                state_path,
                next_id: Arc::new(AtomicU64::new(1)),
                ports,
                socket_group: "root".to_string(),
                socket_path: PathBuf::from("/tmp/natmap.sock"),
            },
            app: Router::new(),
        }
    }

    /// `reload` records the mapping count on its span.
    #[tokio::test]
    #[traced_test]
    async fn reload_state_logs_mapping_count() {
        // The temp dir must stay alive for the whole test.
        let scratch = tempfile::tempdir().unwrap();
        let daemon = create_test_daemon(scratch.path().join("state.json"));

        let _ = daemon.reload().await;

        assert!(logs_contain("mappings.count="));
    }

    /// `handle_docker_event` records the container ID on its span.
    #[tokio::test]
    #[traced_test]
    async fn handle_docker_event_span_has_container_id() {
        // The temp dir must stay alive for the whole test.
        let scratch = tempfile::tempdir().unwrap();
        let daemon = create_test_daemon(scratch.path().join("state.json"));
        let docker = Docker::connect_with_local_defaults().unwrap();

        let actor = EventActor {
            id: Some("1234567890".to_string()),
            ..Default::default()
        };
        let event = EventMessage {
            action: Some("start".to_string()),
            actor: Some(actor),
            typ: Some(bollard::plugin::EventMessageTypeEnum::CONTAINER),
            ..Default::default()
        };

        daemon.handle_docker_event(event, &docker).await;

        assert!(logs_contain("container.id=\"1234567890\""));
    }
}
609}