1use crate::{logs::LogEntry, process::PeerProcess, Error, Result};
8use bollard::{
9 container::{
10 Config, CreateContainerOptions, LogOutput, LogsOptions, RemoveContainerOptions,
11 StartContainerOptions, StopContainerOptions, UploadToContainerOptions,
12 },
13 exec::{CreateExecOptions, StartExecResults},
14 image::BuildImageOptions,
15 network::CreateNetworkOptions,
16 secret::{ContainerStateStatusEnum, HostConfig, Ipam, IpamConfig, PortBinding},
17 Docker,
18};
19use futures::StreamExt;
20use ipnetwork::Ipv4Network;
21use rand::Rng;
22use std::{
23 collections::HashMap,
24 net::Ipv4Addr,
25 path::{Path, PathBuf},
26 time::Duration,
27};
28
/// Settings for the Docker-based NAT test backend.
#[derive(Debug, Clone)]
pub struct DockerNatConfig {
    /// How peers are grouped behind NAT networks.
    // NOTE(review): nothing in the visible part of this file reads this field — confirm
    // where it is consumed.
    pub topology: NatTopology,
    /// Subnet of the shared "public" bridge network that gateways and NAT routers join.
    pub public_subnet: Ipv4Network,
    /// Base address for the per-peer private networks; the second octet is offset by the
    /// peer index when a NAT network is created.
    pub private_subnet_base: Ipv4Addr,
    /// Whether created resources should be removed when the backend is dropped.
    // NOTE(review): the backend's Drop impl is not visible here — confirm.
    pub cleanup_on_drop: bool,
    /// Prefix used for the name of every network and container the backend creates.
    pub name_prefix: String,
    /// Optional link emulation applied to containers; `None` disables it.
    pub network_emulation: Option<NetworkEmulation>,
}
46
/// Link-quality parameters used to build a `tc ... netem` rule for a container.
#[derive(Debug, Clone)]
pub struct NetworkEmulation {
    /// Added one-way delay, in milliseconds.
    pub delay_ms: u32,
    /// Delay variation, in milliseconds (only meaningful when `delay_ms > 0`).
    pub jitter_ms: u32,
    /// Packet loss, as a percentage (e.g. `0.5` means 0.5%).
    pub loss_percent: f64,
    /// Loss correlation value passed alongside the loss percentage.
    // NOTE(review): presumably also a percentage — confirm against the netem invocation.
    pub loss_correlation: f64,
}

impl Default for NetworkEmulation {
    /// No delay, no jitter, no loss; the loss correlation keeps its baseline value.
    fn default() -> Self {
        Self::profile(0, 0, 0.0)
    }
}

impl NetworkEmulation {
    /// Loss correlation shared by `default()` and every preset except `challenging()`.
    const BASELINE_LOSS_CORRELATION: f64 = 25.0;

    /// Builds a profile that uses the baseline loss correlation.
    fn profile(delay_ms: u32, jitter_ms: u32, loss_percent: f64) -> Self {
        Self {
            delay_ms,
            jitter_ms,
            loss_percent,
            loss_correlation: Self::BASELINE_LOSS_CORRELATION,
        }
    }

    /// Local network: 1 ms delay, 1 ms jitter, no loss.
    pub fn lan() -> Self {
        Self::profile(1, 1, 0.0)
    }

    /// Regional link: 40 ms delay, 10 ms jitter, 0.1% loss.
    pub fn regional() -> Self {
        Self::profile(40, 10, 0.1)
    }

    /// Intercontinental link: 125 ms delay, 25 ms jitter, 0.5% loss.
    pub fn intercontinental() -> Self {
        Self::profile(125, 25, 0.5)
    }

    /// High-latency link: 200 ms delay, 30 ms jitter, 1% loss.
    pub fn high_latency() -> Self {
        Self::profile(200, 30, 1.0)
    }

    /// Degraded link: 150 ms delay, 50 ms jitter, 3% loss with a loss correlation of 50.
    pub fn challenging() -> Self {
        Self {
            loss_correlation: 50.0,
            ..Self::profile(150, 50, 3.0)
        }
    }
}
140
141impl Default for DockerNatConfig {
142 fn default() -> Self {
143 let timestamp = chrono::Utc::now().format("%Y%m%d-%H%M%S").to_string();
145 let random_id = rand::thread_rng().gen::<u16>();
146 let name_prefix = format!("freenet-nat-{}-{}", timestamp, random_id);
147
148 let second_octet = rand::thread_rng().gen_range(16..=31);
154 let public_subnet = format!("172.{}.0.0/16", second_octet).parse().unwrap();
155
156 let private_first_octet = rand::thread_rng().gen_range(1..=250);
159
160 let network_emulation =
162 if let Ok(emulation) = std::env::var("FREENET_TEST_NETWORK_EMULATION") {
163 match emulation.to_lowercase().as_str() {
164 "lan" => Some(NetworkEmulation::lan()),
165 "regional" => Some(NetworkEmulation::regional()),
166 "intercontinental" => Some(NetworkEmulation::intercontinental()),
167 "high_latency" => Some(NetworkEmulation::high_latency()),
168 "challenging" => Some(NetworkEmulation::challenging()),
169 other => {
170 tracing::warn!(
171 "Unknown FREENET_TEST_NETWORK_EMULATION value '{}', ignoring. \
172 Valid options: lan, regional, intercontinental, high_latency, challenging",
173 other
174 );
175 None
176 }
177 }
178 } else {
179 None
180 };
181
182 Self {
183 topology: NatTopology::OnePerNat,
184 public_subnet,
185 private_subnet_base: Ipv4Addr::new(10, private_first_octet, 0, 0),
186 cleanup_on_drop: true,
187 name_prefix,
188 network_emulation,
189 }
190 }
191}
192
/// Describes how peers are distributed across NAT networks.
// NOTE(review): no code in the visible part of this file matches on these variants; the
// variant descriptions below are inferred from their names — confirm against the consumer.
#[derive(Debug, Clone)]
pub enum NatTopology {
    /// Presumably one NAT network per peer. This is the variant chosen by
    /// `DockerNatConfig::default()`.
    OnePerNat,
    /// Explicit list of NAT networks, each naming the peers it contains.
    Custom(Vec<NatNetwork>),
}
201
/// One NAT network entry of a `NatTopology::Custom` layout.
#[derive(Debug, Clone)]
pub struct NatNetwork {
    /// Name of the network.
    pub name: String,
    /// Indices of the peers placed in this network.
    // NOTE(review): presumably the same peer indices passed to `create_peer` — confirm.
    pub peer_indices: Vec<usize>,
    /// NAT behaviour requested for this network.
    pub nat_type: NatType,
}
209
/// NAT behaviour to emulate.
// NOTE(review): in the visible code the router's iptables rules are selected from
// environment variables rather than from this enum — confirm where `NatType` is consumed.
#[derive(Debug, Clone, Default)]
pub enum NatType {
    /// Restricted-cone NAT; the default variant.
    #[default]
    RestrictedCone,
    /// Full-cone NAT, optionally with an explicit list of ports.
    // NOTE(review): presumably `None` means "use a default port set" — confirm.
    FullCone { forwarded_ports: Option<Vec<u16>> },
}
219
/// Creates and tracks the Docker networks and containers that make up a NAT test setup.
pub struct DockerNatBackend {
    /// Docker API client.
    docker: Docker,
    /// Configuration this backend was created with. `public_subnet` may be rewritten when
    /// the public network has to be retried on a different subnet.
    config: DockerNatConfig,
    /// IDs of every network created so far (public and per-peer NAT networks).
    networks: Vec<String>,
    /// IDs of every container created so far (routers, gateways and peers).
    containers: Vec<String>,
    /// Information about each gateway/peer container, keyed by the index it was created with.
    peer_containers: HashMap<usize, DockerPeerInfo>,
    /// ID of the public network once `create_public_network` has succeeded.
    public_network_id: Option<String>,
}
233
/// Describes a gateway or peer container created by `DockerNatBackend`.
#[derive(Debug, Clone)]
pub struct DockerPeerInfo {
    /// Docker container ID.
    pub container_id: String,
    /// Docker container name (also used as the container's hostname).
    pub container_name: String,
    /// Address of the container on its own network. For gateways this equals `public_ip`.
    pub private_ip: Ipv4Addr,
    /// Address on the public network: the gateway's own address, or for a peer the public
    /// address of its NAT router.
    pub public_ip: Ipv4Addr,
    /// Host-side port through which the container's WebSocket API port is reachable.
    pub host_ws_port: u16,
    /// Port passed to the node as `--network-port`.
    pub network_port: u16,
    /// `true` for gateways, `false` for peers behind NAT.
    pub is_gateway: bool,
    /// Container ID of the peer's NAT router; `None` for gateways.
    pub nat_router_id: Option<String>,
}
252
/// Handle to a node running inside a Docker container; implements `PeerProcess`.
pub struct DockerProcess {
    /// Docker API client used to inspect, stop and read logs from the container.
    docker: Docker,
    /// ID of the container this handle controls.
    container_id: String,
    /// Name of the container.
    container_name: String,
    /// Local file that `read_logs` overwrites with the container's log output.
    local_log_cache: PathBuf,
}
260
261impl PeerProcess for DockerProcess {
262 fn is_running(&self) -> bool {
263 let docker = self.docker.clone();
265 let id = self.container_id.clone();
266
267 tokio::task::block_in_place(|| {
268 tokio::runtime::Handle::current().block_on(async {
269 match docker.inspect_container(&id, None).await {
270 Ok(info) => info
271 .state
272 .and_then(|s| s.status)
273 .map(|s| s == ContainerStateStatusEnum::RUNNING)
274 .unwrap_or(false),
275 Err(_) => false,
276 }
277 })
278 })
279 }
280
281 fn kill(&mut self) -> Result<()> {
282 let docker = self.docker.clone();
283 let id = self.container_id.clone();
284
285 tokio::task::block_in_place(|| {
286 tokio::runtime::Handle::current().block_on(async {
287 let _ = docker
289 .stop_container(&id, Some(StopContainerOptions { t: 5 }))
290 .await;
291 Ok(())
292 })
293 })
294 }
295
296 fn log_path(&self) -> PathBuf {
297 self.local_log_cache.clone()
298 }
299
300 fn read_logs(&self) -> Result<Vec<LogEntry>> {
301 let docker = self.docker.clone();
302 let id = self.container_id.clone();
303 let cache_path = self.local_log_cache.clone();
304
305 tokio::task::block_in_place(|| {
306 tokio::runtime::Handle::current().block_on(async {
307 let options = LogsOptions::<String> {
309 stdout: true,
310 stderr: true,
311 timestamps: true,
312 ..Default::default()
313 };
314
315 let mut logs = docker.logs(&id, Some(options));
316 let mut log_content = String::new();
317
318 while let Some(log_result) = logs.next().await {
319 match log_result {
320 Ok(LogOutput::StdOut { message }) | Ok(LogOutput::StdErr { message }) => {
321 log_content.push_str(&String::from_utf8_lossy(&message));
322 }
323 _ => {}
324 }
325 }
326
327 if let Some(parent) = cache_path.parent() {
329 std::fs::create_dir_all(parent)?;
330 }
331 std::fs::write(&cache_path, &log_content)?;
332
333 crate::logs::read_log_file(&cache_path)
335 })
336 })
337 }
338}
339
impl Drop for DockerProcess {
    /// Stops the container when the handle goes away; the result of `kill` is ignored.
    // NOTE(review): `kill` goes through `block_in_place` + `Handle::current()`, so dropping
    // this value where no suitable Tokio runtime is active looks like it would panic —
    // confirm every drop site runs inside the runtime.
    fn drop(&mut self) {
        let _ = self.kill();
    }
}
345
346impl DockerNatBackend {
347 pub async fn new(config: DockerNatConfig) -> Result<Self> {
349 let docker = Docker::connect_with_local_defaults()
350 .map_err(|e| Error::Other(anyhow::anyhow!("Failed to connect to Docker: {}", e)))?;
351
352 docker
354 .ping()
355 .await
356 .map_err(|e| Error::Other(anyhow::anyhow!("Docker ping failed: {}", e)))?;
357
358 Self::cleanup_stale_resources(&docker, Duration::from_secs(10)).await?;
364
365 Ok(Self {
366 docker,
367 config,
368 networks: Vec::new(),
369 containers: Vec::new(),
370 peer_containers: HashMap::new(),
371 public_network_id: None,
372 })
373 }
374
375 async fn cleanup_stale_resources(docker: &Docker, max_age: Duration) -> Result<()> {
381 use bollard::container::ListContainersOptions;
382 use bollard::network::ListNetworksOptions;
383
384 let now = std::time::SystemTime::now();
385 let now_secs = now.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() as i64;
386 let cutoff = if max_age.is_zero() {
388 i64::MAX } else {
390 now_secs - max_age.as_secs() as i64
391 };
392
393 if max_age.is_zero() {
394 tracing::debug!("Cleaning up ALL freenet-nat resources");
395 } else {
396 tracing::debug!(
397 "Cleaning up freenet-nat resources older than {} seconds",
398 max_age.as_secs()
399 );
400 }
401
402 let mut filters = HashMap::new();
404 filters.insert("name".to_string(), vec!["freenet-nat-".to_string()]);
405
406 let options = ListContainersOptions {
407 all: true,
408 filters,
409 ..Default::default()
410 };
411
412 match docker.list_containers(Some(options)).await {
413 Ok(containers) => {
414 let mut removed_count = 0;
415 for container in containers {
416 if let Some(name) = container.names.and_then(|n| n.first().cloned()) {
418 if let Some(created) = container.created {
419 if created < cutoff {
420 if let Some(id) = container.id {
421 tracing::info!(
422 "Removing stale container: {} (age: {}s)",
423 name,
424 now.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs()
425 as i64
426 - created
427 );
428 let _ = docker
429 .stop_container(&id, Some(StopContainerOptions { t: 2 }))
430 .await;
431 let _ = docker
432 .remove_container(
433 &id,
434 Some(RemoveContainerOptions {
435 force: true,
436 ..Default::default()
437 }),
438 )
439 .await;
440 removed_count += 1;
441 }
442 }
443 }
444 }
445 }
446 if removed_count > 0 {
447 tracing::info!("Removed {} stale container(s)", removed_count);
448 }
449 }
450 Err(e) => {
451 tracing::warn!("Failed to list containers for cleanup: {}", e);
452 }
453 }
454
455 let mut filters = HashMap::new();
457 filters.insert("name".to_string(), vec!["freenet-nat-".to_string()]);
458
459 let options = ListNetworksOptions { filters };
460
461 match docker.list_networks(Some(options)).await {
462 Ok(networks) => {
463 let mut removed_count = 0;
464 for network in networks {
465 if let Some(name) = &network.name {
466 if name.starts_with("freenet-nat-") {
467 if let Some(timestamp_str) = name.strip_prefix("freenet-nat-") {
469 let parts: Vec<&str> = timestamp_str.split('-').collect();
471 if parts.len() >= 2 {
472 let date_time = format!("{}-{}", parts[0], parts[1]);
473 if let Ok(created_time) = chrono::NaiveDateTime::parse_from_str(
474 &date_time,
475 "%Y%m%d-%H%M%S",
476 ) {
477 let created_timestamp = created_time.and_utc().timestamp();
478 if created_timestamp < cutoff {
479 if let Some(id) = &network.id {
480 tracing::info!(
481 "Removing stale network: {} (age: {}s)",
482 name,
483 now.duration_since(std::time::UNIX_EPOCH)
484 .unwrap()
485 .as_secs()
486 as i64
487 - created_timestamp
488 );
489 let _ = docker.remove_network(id).await;
490 removed_count += 1;
491 }
492 }
493 }
494 }
495 }
496 }
497 }
498 }
499 if removed_count > 0 {
500 tracing::info!("Removed {} stale network(s)", removed_count);
501 }
502 }
503 Err(e) => {
504 tracing::warn!("Failed to list networks for cleanup: {}", e);
505 }
506 }
507
508 Ok(())
509 }
510
511 pub async fn create_public_network(&mut self) -> Result<String> {
516 const MAX_SUBNET_RETRIES: usize = 10;
517
518 for attempt in 0..MAX_SUBNET_RETRIES {
519 let network_name = format!("{}-public", self.config.name_prefix);
520
521 let options = CreateNetworkOptions {
522 name: network_name.clone(),
523 driver: "bridge".to_string(),
524 ipam: Ipam {
525 config: Some(vec![IpamConfig {
526 subnet: Some(self.config.public_subnet.to_string()),
527 ..Default::default()
528 }]),
529 ..Default::default()
530 },
531 ..Default::default()
532 };
533
534 match self.docker.create_network(options).await {
535 Ok(response) => {
536 let network_id = response.id;
537 self.networks.push(network_id.clone());
538 self.public_network_id = Some(network_id.clone());
539 tracing::info!(
540 "Created public network: {} ({}) with subnet {}",
541 network_name,
542 network_id,
543 self.config.public_subnet
544 );
545 return Ok(network_id);
546 }
547 Err(e) => {
548 let error_msg = e.to_string();
549 if error_msg.contains("Pool overlaps") {
550 let old_subnet = self.config.public_subnet;
552 let new_second_octet = rand::thread_rng().gen_range(16..=31);
553 self.config.public_subnet =
554 format!("172.{}.0.0/16", new_second_octet).parse().unwrap();
555 tracing::warn!(
556 "Subnet {} conflicts with existing network, retrying with {} (attempt {}/{})",
557 old_subnet,
558 self.config.public_subnet,
559 attempt + 1,
560 MAX_SUBNET_RETRIES
561 );
562 continue;
563 }
564 return Err(Error::Other(anyhow::anyhow!(
565 "Failed to create public network: {}",
566 e
567 )));
568 }
569 }
570 }
571
572 Err(Error::Other(anyhow::anyhow!(
573 "Failed to create public network after {} attempts due to subnet conflicts. \
574 This may indicate stale Docker networks. Try running: \
575 docker network ls | grep freenet-nat | awk '{{print $1}}' | xargs -r docker network rm",
576 MAX_SUBNET_RETRIES
577 )))
578 }
579
580 pub async fn create_nat_network(
582 &mut self,
583 peer_index: usize,
584 ) -> Result<(String, String, Ipv4Addr)> {
585 let network_name = format!("{}-nat-{}", self.config.name_prefix, peer_index);
588 let base = self.config.private_subnet_base.octets();
589 let subnet = Ipv4Network::new(
590 Ipv4Addr::new(base[0], base[1].wrapping_add(peer_index as u8), 0, 0),
591 24,
592 )
593 .map_err(|e| Error::Other(anyhow::anyhow!("Invalid subnet: {}", e)))?;
594
595 let options = CreateNetworkOptions {
596 name: network_name.clone(),
597 driver: "bridge".to_string(),
598 internal: true, ipam: Ipam {
600 config: Some(vec![IpamConfig {
601 subnet: Some(subnet.to_string()),
602 ..Default::default()
603 }]),
604 ..Default::default()
605 },
606 ..Default::default()
607 };
608
609 let response =
610 self.docker.create_network(options).await.map_err(|e| {
611 Error::Other(anyhow::anyhow!("Failed to create NAT network: {}", e))
612 })?;
613
614 let network_id = response.id;
615 self.networks.push(network_id.clone());
616
617 let router_name = format!("{}-router-{}", self.config.name_prefix, peer_index);
619 let public_network_id = self
620 .public_network_id
621 .as_ref()
622 .ok_or_else(|| Error::Other(anyhow::anyhow!("Public network not created yet")))?;
623
624 let router_public_ip = Ipv4Addr::new(
629 self.config.public_subnet.ip().octets()[0],
630 self.config.public_subnet.ip().octets()[1],
631 peer_index as u8, 100, );
634 let router_private_ip =
636 Ipv4Addr::new(base[0], base[1].wrapping_add(peer_index as u8), 0, 254);
637
638 let public_octets = self.config.public_subnet.ip().octets();
642 let public_pattern = format!("172\\.{}\\.", public_octets[1]);
643 let private_pattern = format!(" {}\\.", base[0]);
644 let peer_private_ip = Ipv4Addr::new(base[0], base[1].wrapping_add(peer_index as u8), 0, 2);
646
647 let dnat_rules = if std::env::var("FREENET_TEST_FULL_CONE_NAT").is_ok() {
663 format!(
666 "iptables -t nat -A PREROUTING -i $PUBLIC_IF -p udp --dport 31337 -j DNAT --to-destination {}:31337 && \
667 echo 'Full Cone NAT: DNAT rule added for port 31337 -> {}:31337' && ",
668 peer_private_ip, peer_private_ip
669 )
670 } else if std::env::var("FREENET_TEST_SYMMETRIC_NAT").is_ok() {
671 format!(
674 "iptables -t nat -A POSTROUTING -o $PUBLIC_IF -p udp -j MASQUERADE --random && \
675 echo 'Symmetric NAT: Random port mapping enabled (hole punching will fail)' && "
676 )
677 } else {
678 format!(
694 "echo 'Port-Restricted Cone NAT: EIM + port-restricted filtering' && \
695 iptables -t nat -A PREROUTING -i $PUBLIC_IF -p udp --dport 31337 -j DNAT --to-destination {}:31337 && \
696 iptables -t nat -A POSTROUTING -o $PUBLIC_IF -p udp --sport 31337 -j SNAT --to-source $PUBLIC_IP:31337 && ",
697 peer_private_ip
698 )
699 };
700
701 let router_config = Config {
702 image: Some("alpine:latest".to_string()),
703 hostname: Some(router_name.clone()),
704 cmd: Some(vec![
705 "sh".to_string(),
706 "-c".to_string(),
707 format!(
712 "apk add --no-cache iptables iproute2 > /dev/null 2>&1 && \
713 PUBLIC_IF=$(ip -o addr show | grep '{}' | awk '{{print $2}}') && \
714 PRIVATE_IF=$(ip -o addr show | grep '{}' | awk '{{print $2}}') && \
715 PUBLIC_IP=$(ip -o addr show dev $PUBLIC_IF | awk '/inet / {{split($4,a,\"/\"); print a[1]}}') && \
716 echo \"Public interface: $PUBLIC_IF ($PUBLIC_IP), Private interface: $PRIVATE_IF\" && \
717 {}iptables -t nat -A POSTROUTING -o $PUBLIC_IF -j MASQUERADE && \
718 iptables -A FORWARD -i $PRIVATE_IF -o $PUBLIC_IF -j ACCEPT && \
719 iptables -A FORWARD -i $PUBLIC_IF -o $PRIVATE_IF -j ACCEPT && \
720 echo 'NAT router ready' && \
721 tail -f /dev/null",
722 public_pattern, private_pattern, dnat_rules
723 ),
724 ]),
725 host_config: Some(HostConfig {
726 cap_add: Some(vec!["NET_ADMIN".to_string()]),
727 sysctls: Some(HashMap::from([
728 ("net.ipv4.ip_forward".to_string(), "1".to_string()),
729 ])),
730 ..Default::default()
731 }),
732 ..Default::default()
733 };
734
735 let router_id = self
736 .docker
737 .create_container(
738 Some(CreateContainerOptions {
739 name: router_name.clone(),
740 ..Default::default()
741 }),
742 router_config,
743 )
744 .await
745 .map_err(|e| Error::Other(anyhow::anyhow!("Failed to create NAT router: {}", e)))?
746 .id;
747
748 self.containers.push(router_id.clone());
749
750 let _ = self
752 .docker
753 .disconnect_network(
754 "bridge",
755 bollard::network::DisconnectNetworkOptions {
756 container: router_id.clone(),
757 force: true,
758 },
759 )
760 .await;
761
762 self.docker
764 .connect_network(
765 public_network_id,
766 bollard::network::ConnectNetworkOptions {
767 container: router_id.clone(),
768 endpoint_config: bollard::secret::EndpointSettings {
769 ipam_config: Some(bollard::secret::EndpointIpamConfig {
770 ipv4_address: Some(router_public_ip.to_string()),
771 ..Default::default()
772 }),
773 ..Default::default()
774 },
775 },
776 )
777 .await
778 .map_err(|e| {
779 Error::Other(anyhow::anyhow!(
780 "Failed to connect router to public network: {}",
781 e
782 ))
783 })?;
784
785 self.docker
787 .connect_network(
788 &network_id,
789 bollard::network::ConnectNetworkOptions {
790 container: router_id.clone(),
791 endpoint_config: bollard::secret::EndpointSettings {
792 ipam_config: Some(bollard::secret::EndpointIpamConfig {
793 ipv4_address: Some(router_private_ip.to_string()),
794 ..Default::default()
795 }),
796 ..Default::default()
797 },
798 },
799 )
800 .await
801 .map_err(|e| {
802 Error::Other(anyhow::anyhow!(
803 "Failed to connect router to private network: {}",
804 e
805 ))
806 })?;
807
808 self.docker
810 .start_container(&router_id, None::<StartContainerOptions<String>>)
811 .await
812 .map_err(|e| Error::Other(anyhow::anyhow!("Failed to start NAT router: {}", e)))?;
813
814 tokio::time::sleep(Duration::from_secs(2)).await;
816
817 tracing::info!(
818 "Created NAT network {} with router {} (public: {}, private: {})",
819 network_name,
820 router_name,
821 router_public_ip,
822 router_private_ip
823 );
824
825 Ok((network_id, router_id, router_public_ip))
826 }
827
828 pub async fn ensure_base_image(&self) -> Result<String> {
830 let image_name = "freenet-test-peer:latest";
831
832 if self.docker.inspect_image(image_name).await.is_ok() {
834 tracing::debug!("Base image {} already exists", image_name);
835 return Ok(image_name.to_string());
836 }
837
838 tracing::info!("Building base image {}...", image_name);
839
840 let dockerfile = r#"
842FROM ubuntu:24.04
843RUN apt-get update && \
844 apt-get install -y --no-install-recommends \
845 libssl3 \
846 ca-certificates \
847 iproute2 \
848 && rm -rf /var/lib/apt/lists/*
849RUN mkdir -p /data /config
850WORKDIR /app
851"#;
852
853 let mut tar_builder = tar::Builder::new(Vec::new());
855 let mut header = tar::Header::new_gnu();
856 header.set_path("Dockerfile")?;
857 header.set_size(dockerfile.len() as u64);
858 header.set_mode(0o644);
859 header.set_cksum();
860 tar_builder.append(&header, dockerfile.as_bytes())?;
861 let tar_data = tar_builder.into_inner()?;
862
863 let options = BuildImageOptions {
865 dockerfile: "Dockerfile",
866 t: image_name,
867 rm: true,
868 ..Default::default()
869 };
870
871 let mut build_stream = self
872 .docker
873 .build_image(options, None, Some(tar_data.into()));
874
875 while let Some(result) = build_stream.next().await {
876 match result {
877 Ok(info) => {
878 if let Some(stream) = info.stream {
879 tracing::debug!("Build: {}", stream.trim());
880 }
881 if let Some(error) = info.error {
882 return Err(Error::Other(anyhow::anyhow!(
883 "Image build error: {}",
884 error
885 )));
886 }
887 }
888 Err(e) => {
889 return Err(Error::Other(anyhow::anyhow!("Image build failed: {}", e)));
890 }
891 }
892 }
893
894 tracing::info!("Built base image {}", image_name);
895 Ok(image_name.to_string())
896 }
897
898 pub async fn copy_binary_to_container(
900 &self,
901 container_id: &str,
902 binary_path: &Path,
903 ) -> Result<()> {
904 let binary_data = std::fs::read(binary_path)?;
906
907 let mut tar_builder = tar::Builder::new(Vec::new());
909 let mut header = tar::Header::new_gnu();
910 header.set_path("freenet")?;
911 header.set_size(binary_data.len() as u64);
912 header.set_mode(0o755);
913 header.set_cksum();
914 tar_builder.append(&header, binary_data.as_slice())?;
915 let tar_data = tar_builder.into_inner()?;
916
917 self.docker
919 .upload_to_container(
920 container_id,
921 Some(UploadToContainerOptions {
922 path: "/app",
923 ..Default::default()
924 }),
925 tar_data.into(),
926 )
927 .await
928 .map_err(|e| Error::Other(anyhow::anyhow!("Failed to copy binary: {}", e)))?;
929
930 Ok(())
931 }
932
/// Creates and starts a gateway container attached directly to the public network.
///
/// The gateway gets the fixed address `P.Q.0.(10 + index)` inside the public subnet, has
/// the node binary and key files copied in before start, and publishes its WebSocket API
/// port to the host. The resulting info is also stored in `peer_containers` under `index`.
///
/// Returns the container description and a `DockerProcess` handle whose log cache is
/// `<run_root>/gw<index>/peer.log`.
///
/// # Errors
/// Fails if the public network does not exist yet or if any step (image, container
/// creation, network attach, file copy, start, emulation setup, port lookup) fails.
pub async fn create_gateway(
    &mut self,
    index: usize,
    binary_path: &Path,
    keypair_path: &Path,
    public_key_path: &Path,
    ws_port: u16,
    network_port: u16,
    run_root: &Path,
) -> Result<(DockerPeerInfo, DockerProcess)> {
    let container_name = format!("{}-gw-{}", self.config.name_prefix, index);
    let image = self.ensure_base_image().await?;

    let public_network_id = self
        .public_network_id
        .as_ref()
        .ok_or_else(|| Error::Other(anyhow::anyhow!("Public network not created yet")))?;

    // Gateways live at .0.10, .0.11, ... of the public subnet.
    let gateway_ip = Ipv4Addr::new(
        self.config.public_subnet.ip().octets()[0],
        self.config.public_subnet.ip().octets()[1],
        0,
        10 + index as u8,
    );

    let config = Config {
        image: Some(image),
        hostname: Some(container_name.clone()),
        exposed_ports: Some(HashMap::from([(
            format!("{}/tcp", ws_port),
            HashMap::new(),
        )])),
        host_config: Some(HostConfig {
            port_bindings: Some(HashMap::from([(
                format!("{}/tcp", ws_port),
                Some(vec![PortBinding {
                    host_ip: Some("0.0.0.0".to_string()),
                    // No fixed host port is requested; the port actually bound is looked
                    // up after start via `get_container_host_port`.
                    host_port: None,
                }]),
            )])),
            cap_add: Some(vec!["NET_ADMIN".to_string()]),
            ..Default::default()
        }),
        env: Some(vec![
            // Forward the caller's RUST_LOG, defaulting to "info".
            format!("RUST_LOG={}", std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string())),
            "RUST_BACKTRACE=1".to_string(),
        ]),
        cmd: Some(vec![
            "/app/freenet".to_string(),
            "network".to_string(),
            "--data-dir".to_string(),
            "/data".to_string(),
            "--config-dir".to_string(),
            "/config".to_string(),
            "--ws-api-address".to_string(),
            "0.0.0.0".to_string(),
            "--ws-api-port".to_string(),
            ws_port.to_string(),
            "--network-address".to_string(),
            "0.0.0.0".to_string(),
            "--network-port".to_string(),
            network_port.to_string(),
            "--public-network-address".to_string(),
            gateway_ip.to_string(),
            "--public-network-port".to_string(),
            network_port.to_string(),
            "--is-gateway".to_string(),
            "--skip-load-from-network".to_string(),
            "--transport-keypair".to_string(),
            "/config/keypair.pem".to_string(),
        ]),
        ..Default::default()
    };

    let container_id = self
        .docker
        .create_container(
            Some(CreateContainerOptions {
                name: container_name.clone(),
                ..Default::default()
            }),
            config,
        )
        .await
        .map_err(|e| {
            Error::Other(anyhow::anyhow!("Failed to create gateway container: {}", e))
        })?
        .id;

    // Track the container immediately so it is known to the backend even if a later step
    // fails.
    self.containers.push(container_id.clone());

    // Attach to the public network with the fixed gateway address.
    self.docker
        .connect_network(
            public_network_id,
            bollard::network::ConnectNetworkOptions {
                container: container_id.clone(),
                endpoint_config: bollard::secret::EndpointSettings {
                    ipam_config: Some(bollard::secret::EndpointIpamConfig {
                        ipv4_address: Some(gateway_ip.to_string()),
                        ..Default::default()
                    }),
                    ..Default::default()
                },
            },
        )
        .await
        .map_err(|e| {
            Error::Other(anyhow::anyhow!(
                "Failed to connect gateway to network: {}",
                e
            ))
        })?;

    // Files are copied before start because the container command runs /app/freenet and
    // reads /config/keypair.pem.
    self.copy_binary_to_container(&container_id, binary_path)
        .await?;
    self.copy_file_to_container(&container_id, keypair_path, "/config/keypair.pem")
        .await?;
    self.copy_file_to_container(&container_id, public_key_path, "/config/public_key.pem")
        .await?;

    self.docker
        .start_container(&container_id, None::<StartContainerOptions<String>>)
        .await
        .map_err(|e| Error::Other(anyhow::anyhow!("Failed to start gateway: {}", e)))?;

    // Applied after start; a no-op when no emulation is configured.
    self.apply_network_emulation(&container_id, &container_name)
        .await?;

    let host_ws_port = self
        .get_container_host_port(&container_id, ws_port)
        .await?;

    let info = DockerPeerInfo {
        container_id: container_id.clone(),
        container_name: container_name.clone(),
        // A gateway is not behind NAT, so both addresses are the same.
        private_ip: gateway_ip,
        public_ip: gateway_ip,
        host_ws_port,
        network_port,
        is_gateway: true,
        nat_router_id: None,
    };

    self.peer_containers.insert(index, info.clone());

    let local_log_cache = run_root.join(format!("gw{}", index)).join("peer.log");

    tracing::info!(
        "Created gateway {} at {} (ws: localhost:{})",
        container_name,
        gateway_ip,
        host_ws_port
    );

    Ok((
        info,
        DockerProcess {
            docker: self.docker.clone(),
            container_id,
            container_name,
            local_log_cache,
        },
    ))
}
1105
/// Creates and starts a peer container behind its own NAT router.
///
/// A dedicated NAT network and router are created first via `create_nat_network`. The
/// peer is then attached to that private network at `A.(B + index).0.2`, receives the
/// node binary, key files and `gateways.toml`, is started, and finally gets a route to
/// the public subnet through the router (`A.(B + index).0.254`). The resulting info is
/// also stored in `peer_containers` under `index`.
///
/// Returns the container description and a `DockerProcess` handle whose log cache is
/// `<run_root>/peer<index>/peer.log`.
///
/// # Errors
/// Fails if any step (image, NAT network, container creation, network attach, file copy,
/// start, emulation setup, port lookup, route exec) reports an error.
pub async fn create_peer(
    &mut self,
    index: usize,
    binary_path: &Path,
    keypair_path: &Path,
    public_key_path: &Path,
    gateways_toml_path: &Path,
    gateway_public_key_path: Option<&Path>,
    ws_port: u16,
    network_port: u16,
    run_root: &Path,
) -> Result<(DockerPeerInfo, DockerProcess)> {
    let container_name = format!("{}-peer-{}", self.config.name_prefix, index);
    let image = self.ensure_base_image().await?;

    let (nat_network_id, router_id, router_public_ip) = self.create_nat_network(index).await?;

    let base = self.config.private_subnet_base.octets();
    // Same subnet computation as in `create_nat_network`; the peer takes host address .2.
    let private_ip = Ipv4Addr::new(base[0], base[1].wrapping_add(index as u8), 0, 2);

    let config = Config {
        image: Some(image),
        hostname: Some(container_name.clone()),
        exposed_ports: Some(HashMap::from([(
            format!("{}/tcp", ws_port),
            HashMap::new(),
        )])),
        host_config: Some(HostConfig {
            port_bindings: Some(HashMap::from([(
                format!("{}/tcp", ws_port),
                Some(vec![PortBinding {
                    host_ip: Some("0.0.0.0".to_string()),
                    // No fixed host port is requested; the port actually bound is looked
                    // up after start via `get_container_host_port`.
                    host_port: None,
                }]),
            )])),
            cap_add: Some(vec!["NET_ADMIN".to_string()]),
            ..Default::default()
        }),
        env: Some(vec![
            // Forward the caller's RUST_LOG, defaulting to "info".
            format!("RUST_LOG={}", std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string())),
            "RUST_BACKTRACE=1".to_string(),
        ]),
        cmd: Some(vec![
            "/app/freenet".to_string(),
            "network".to_string(),
            "--data-dir".to_string(),
            "/data".to_string(),
            "--config-dir".to_string(),
            "/config".to_string(),
            "--ws-api-address".to_string(),
            "0.0.0.0".to_string(),
            "--ws-api-port".to_string(),
            ws_port.to_string(),
            "--network-address".to_string(),
            "0.0.0.0".to_string(),
            "--network-port".to_string(),
            network_port.to_string(),
            "--skip-load-from-network".to_string(),
            "--transport-keypair".to_string(),
            "/config/keypair.pem".to_string(),
        ]),
        ..Default::default()
    };

    let container_id = self
        .docker
        .create_container(
            Some(CreateContainerOptions {
                name: container_name.clone(),
                ..Default::default()
            }),
            config,
        )
        .await
        .map_err(|e| Error::Other(anyhow::anyhow!("Failed to create peer container: {}", e)))?
        .id;

    // Track the container immediately so it is known to the backend even if a later step
    // fails.
    self.containers.push(container_id.clone());

    // Attach the peer to its private NAT network with a fixed address.
    self.docker
        .connect_network(
            &nat_network_id,
            bollard::network::ConnectNetworkOptions {
                container: container_id.clone(),
                endpoint_config: bollard::secret::EndpointSettings {
                    ipam_config: Some(bollard::secret::EndpointIpamConfig {
                        ipv4_address: Some(private_ip.to_string()),
                        ..Default::default()
                    }),
                    gateway: Some(
                        Ipv4Addr::new(base[0], base[1].wrapping_add(index as u8), 0, 1)
                            .to_string(),
                    ),
                    ..Default::default()
                },
            },
        )
        .await
        .map_err(|e| {
            Error::Other(anyhow::anyhow!(
                "Failed to connect peer to NAT network: {}",
                e
            ))
        })?;

    // Files are copied before start because the container command runs /app/freenet and
    // reads /config/keypair.pem.
    self.copy_binary_to_container(&container_id, binary_path)
        .await?;
    self.copy_file_to_container(&container_id, keypair_path, "/config/keypair.pem")
        .await?;
    self.copy_file_to_container(&container_id, public_key_path, "/config/public_key.pem")
        .await?;
    self.copy_file_to_container(&container_id, gateways_toml_path, "/config/gateways.toml")
        .await?;

    // The gateway's public key is optional.
    if let Some(gw_pubkey_path) = gateway_public_key_path {
        self.copy_file_to_container(&container_id, gw_pubkey_path, "/config/gw_public_key.pem")
            .await?;
    }

    self.docker
        .start_container(&container_id, None::<StartContainerOptions<String>>)
        .await
        .map_err(|e| Error::Other(anyhow::anyhow!("Failed to start peer: {}", e)))?;

    // Applied after start; a no-op when no emulation is configured.
    self.apply_network_emulation(&container_id, &container_name)
        .await?;

    let host_ws_port = self
        .get_container_host_port(&container_id, ws_port)
        .await?;

    // Send traffic for the public subnet through the NAT router (host address .254 on the
    // private network). The command's output is not inspected.
    let router_gateway = Ipv4Addr::new(base[0], base[1].wrapping_add(index as u8), 0, 254);
    let public_subnet = self.config.public_subnet;
    self.exec_in_container(
        &container_id,
        &[
            "sh",
            "-c",
            &format!("ip route add {} via {}", public_subnet, router_gateway),
        ],
    )
    .await?;

    let info = DockerPeerInfo {
        container_id: container_id.clone(),
        container_name: container_name.clone(),
        private_ip,
        // From the public network's point of view the peer appears as its router.
        public_ip: router_public_ip,
        host_ws_port,
        network_port,
        is_gateway: false,
        nat_router_id: Some(router_id),
    };

    self.peer_containers.insert(index, info.clone());

    let local_log_cache = run_root.join(format!("peer{}", index)).join("peer.log");

    tracing::info!(
        "Created peer {} at {} behind NAT {} (ws: localhost:{})",
        container_name,
        private_ip,
        router_public_ip,
        host_ws_port
    );

    Ok((
        info,
        DockerProcess {
            docker: self.docker.clone(),
            container_id,
            container_name,
            local_log_cache,
        },
    ))
}
1296
/// Public entry point for copying a single local file into a container.
///
/// Forwards all arguments unchanged to the private `copy_file_to_container`;
/// `container_path` is the full destination path including the file name.
pub async fn copy_file_to_container_pub(
    &self,
    container_id: &str,
    local_path: &Path,
    container_path: &str,
) -> Result<()> {
    self.copy_file_to_container(container_id, local_path, container_path)
        .await
}
1307
1308 async fn copy_file_to_container(
1310 &self,
1311 container_id: &str,
1312 local_path: &Path,
1313 container_path: &str,
1314 ) -> Result<()> {
1315 let file_data = std::fs::read(local_path)?;
1316 let file_name = Path::new(container_path)
1317 .file_name()
1318 .ok_or_else(|| Error::Other(anyhow::anyhow!("Invalid container path")))?
1319 .to_str()
1320 .ok_or_else(|| Error::Other(anyhow::anyhow!("Invalid file name")))?;
1321
1322 let dir_path = Path::new(container_path)
1323 .parent()
1324 .ok_or_else(|| Error::Other(anyhow::anyhow!("Invalid container path")))?
1325 .to_str()
1326 .ok_or_else(|| Error::Other(anyhow::anyhow!("Invalid directory path")))?;
1327
1328 let mut tar_builder = tar::Builder::new(Vec::new());
1330 let mut header = tar::Header::new_gnu();
1331 header.set_path(file_name)?;
1332 header.set_size(file_data.len() as u64);
1333 header.set_mode(0o644);
1334 header.set_cksum();
1335 tar_builder.append(&header, file_data.as_slice())?;
1336 let tar_data = tar_builder.into_inner()?;
1337
1338 self.docker
1339 .upload_to_container(
1340 container_id,
1341 Some(UploadToContainerOptions {
1342 path: dir_path,
1343 ..Default::default()
1344 }),
1345 tar_data.into(),
1346 )
1347 .await
1348 .map_err(|e| Error::Other(anyhow::anyhow!("Failed to copy file: {}", e)))?;
1349
1350 Ok(())
1351 }
1352
1353 async fn exec_in_container(&self, container_id: &str, cmd: &[&str]) -> Result<String> {
1355 let exec = self
1356 .docker
1357 .create_exec(
1358 container_id,
1359 CreateExecOptions {
1360 cmd: Some(cmd.iter().map(|s| s.to_string()).collect()),
1361 attach_stdout: Some(true),
1362 attach_stderr: Some(true),
1363 ..Default::default()
1364 },
1365 )
1366 .await
1367 .map_err(|e| Error::Other(anyhow::anyhow!("Failed to create exec: {}", e)))?;
1368
1369 let output = self
1370 .docker
1371 .start_exec(&exec.id, None)
1372 .await
1373 .map_err(|e| Error::Other(anyhow::anyhow!("Failed to start exec: {}", e)))?;
1374
1375 let mut result = String::new();
1376 if let StartExecResults::Attached { mut output, .. } = output {
1377 while let Some(Ok(msg)) = output.next().await {
1378 match msg {
1379 LogOutput::StdOut { message } | LogOutput::StdErr { message } => {
1380 result.push_str(&String::from_utf8_lossy(&message));
1381 }
1382 _ => {}
1383 }
1384 }
1385 }
1386
1387 Ok(result)
1388 }
1389
1390 async fn apply_network_emulation(&self, container_id: &str, container_name: &str) -> Result<()> {
1395 let Some(ref emulation) = self.config.network_emulation else {
1396 return Ok(());
1397 };
1398
1399 if emulation.delay_ms == 0 && emulation.loss_percent == 0.0 {
1401 return Ok(());
1402 }
1403
1404 let mut tc_args = vec!["tc", "qdisc", "add", "dev", "eth0", "root", "netem"];
1407
1408 let delay_str;
1409 let jitter_str;
1410 let loss_str;
1411 let correlation_str;
1412
1413 if emulation.delay_ms > 0 {
1415 delay_str = format!("{}ms", emulation.delay_ms);
1416 tc_args.push("delay");
1417 tc_args.push(&delay_str);
1418
1419 if emulation.jitter_ms > 0 {
1420 jitter_str = format!("{}ms", emulation.jitter_ms);
1421 tc_args.push(&jitter_str);
1422 }
1423 }
1424
1425 if emulation.loss_percent > 0.0 {
1427 loss_str = format!("{:.2}%", emulation.loss_percent);
1428 tc_args.push("loss");
1429 tc_args.push(&loss_str);
1430
1431 if emulation.loss_correlation > 0.0 {
1432 correlation_str = format!("{:.0}%", emulation.loss_correlation);
1433 tc_args.push(&correlation_str);
1434 }
1435 }
1436
1437 tracing::info!(
1438 "Applying network emulation to {}: delay={}ms±{}ms, loss={:.2}%",
1439 container_name,
1440 emulation.delay_ms,
1441 emulation.jitter_ms,
1442 emulation.loss_percent
1443 );
1444
1445 let output = self.exec_in_container(container_id, &tc_args).await?;
1446
1447 if !output.is_empty() && output.contains("Error") {
1448 tracing::warn!(
1449 "Network emulation may have failed for {}: {}",
1450 container_name,
1451 output.trim()
1452 );
1453 } else {
1454 tracing::debug!(
1455 "Network emulation applied to {}: {:?}",
1456 container_name,
1457 tc_args
1458 );
1459 }
1460
1461 Ok(())
1462 }
1463
1464 pub async fn cleanup(&mut self) -> Result<()> {
1466 tracing::info!("Cleaning up Docker NAT resources...");
1467
1468 for container_id in self.containers.drain(..) {
1470 let _ = self
1471 .docker
1472 .stop_container(&container_id, Some(StopContainerOptions { t: 2 }))
1473 .await;
1474 let _ = self
1475 .docker
1476 .remove_container(
1477 &container_id,
1478 Some(RemoveContainerOptions {
1479 force: true,
1480 ..Default::default()
1481 }),
1482 )
1483 .await;
1484 }
1485
1486 for network_id in self.networks.drain(..) {
1488 let _ = self.docker.remove_network(&network_id).await;
1489 }
1490
1491 self.peer_containers.clear();
1492 self.public_network_id = None;
1493
1494 Ok(())
1495 }
1496
1497 pub fn get_peer_info(&self, index: usize) -> Option<&DockerPeerInfo> {
1499 self.peer_containers.get(&index)
1500 }
1501
1502 pub async fn dump_iptables_counters(&self) -> Result<std::collections::HashMap<usize, String>> {
1508 let mut results = std::collections::HashMap::new();
1509
1510 for (&peer_index, peer_info) in &self.peer_containers {
1511 if let Some(router_id) = &peer_info.nat_router_id {
1512 let mut output = String::new();
1513
1514 output.push_str("=== NAT table ===\n");
1516 match self.exec_in_container(router_id, &["iptables", "-t", "nat", "-nvL"]).await {
1517 Ok(s) => output.push_str(&s),
1518 Err(e) => output.push_str(&format!("Error: {}\n", e)),
1519 }
1520
1521 output.push_str("\n=== FORWARD chain ===\n");
1523 match self.exec_in_container(router_id, &["iptables", "-nvL", "FORWARD"]).await {
1524 Ok(s) => output.push_str(&s),
1525 Err(e) => output.push_str(&format!("Error: {}\n", e)),
1526 }
1527
1528 results.insert(peer_index, output);
1529 }
1530 }
1531
1532 Ok(results)
1533 }
1534
1535 pub async fn dump_conntrack_table(&self) -> Result<std::collections::HashMap<usize, String>> {
1540 let mut results = std::collections::HashMap::new();
1541
1542 for (&peer_index, peer_info) in &self.peer_containers {
1543 if let Some(router_id) = &peer_info.nat_router_id {
1544 let _ = self.exec_in_container(
1546 router_id,
1547 &["apk", "add", "--no-cache", "conntrack-tools"]
1548 ).await;
1549
1550 match self.exec_in_container(router_id, &["conntrack", "-L", "-p", "udp"]).await {
1552 Ok(s) if s.trim().is_empty() => {
1553 results.insert(peer_index, "(no UDP conntrack entries)".to_string());
1554 }
1555 Ok(s) => {
1556 results.insert(peer_index, s);
1557 }
1558 Err(e) => {
1559 results.insert(peer_index, format!("Error: {}", e));
1560 }
1561 }
1562 }
1563 }
1564
1565 Ok(results)
1566 }
1567
1568 pub async fn dump_peer_routes(&self) -> Result<std::collections::HashMap<usize, String>> {
1572 let mut results = std::collections::HashMap::new();
1573
1574 for (&peer_index, peer_info) in &self.peer_containers {
1575 if peer_info.nat_router_id.is_some() {
1576 match self.exec_in_container(&peer_info.container_id, &["ip", "route"]).await {
1578 Ok(s) => { results.insert(peer_index, s); }
1579 Err(e) => { results.insert(peer_index, format!("Error: {}", e)); }
1580 }
1581 }
1582 }
1583
1584 Ok(results)
1585 }
1586
1587 async fn get_container_host_port(&self, container_id: &str, container_port: u16) -> Result<u16> {
1594 let info = self
1595 .docker
1596 .inspect_container(container_id, None)
1597 .await
1598 .map_err(|e| {
1599 Error::Other(anyhow::anyhow!(
1600 "Failed to inspect container for port allocation: {}",
1601 e
1602 ))
1603 })?;
1604
1605 let port_key = format!("{}/tcp", container_port);
1606
1607 let host_port = info
1608 .network_settings
1609 .and_then(|ns| ns.ports)
1610 .and_then(|ports| ports.get(&port_key).cloned())
1611 .flatten()
1612 .and_then(|bindings| bindings.first().cloned())
1613 .and_then(|binding| binding.host_port)
1614 .and_then(|port_str| port_str.parse::<u16>().ok())
1615 .ok_or_else(|| {
1616 Error::Other(anyhow::anyhow!(
1617 "Failed to get allocated host port for container {} port {}",
1618 container_id,
1619 container_port
1620 ))
1621 })?;
1622
1623 Ok(host_port)
1624 }
1625}
1626
1627impl Drop for DockerNatBackend {
1628 fn drop(&mut self) {
1629 if self.config.cleanup_on_drop {
1630 tracing::info!("Cleaning up Docker NAT backend resources...");
1631
1632 let docker = self.docker.clone();
1634 let containers = std::mem::take(&mut self.containers);
1635 let networks = std::mem::take(&mut self.networks);
1636
1637 let cleanup = async {
1641 let container_futures = containers.into_iter().map(|container_id| {
1643 let docker = docker.clone();
1644 async move {
1645 if let Err(e) = docker
1646 .stop_container(&container_id, Some(StopContainerOptions { t: 2 }))
1647 .await
1648 {
1649 tracing::debug!("Failed to stop container {}: {}", container_id, e);
1650 }
1651 if let Err(e) = docker
1652 .remove_container(
1653 &container_id,
1654 Some(RemoveContainerOptions {
1655 force: true,
1656 ..Default::default()
1657 }),
1658 )
1659 .await
1660 {
1661 tracing::debug!("Failed to remove container {}: {}", container_id, e);
1662 }
1663 }
1664 });
1665
1666 futures::future::join_all(container_futures).await;
1668
1669 for network_id in networks {
1671 if let Err(e) = docker.remove_network(&network_id).await {
1672 tracing::debug!("Failed to remove network {}: {}", network_id, e);
1673 }
1674 }
1675
1676 tracing::info!("Docker NAT backend cleanup complete");
1677 };
1678
1679 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1682 tokio::task::block_in_place(|| {
1683 handle.block_on(cleanup);
1684 });
1685 } else if let Ok(rt) = tokio::runtime::Runtime::new() {
1686 rt.block_on(cleanup);
1687 } else {
1688 tracing::error!("Failed to create runtime for cleanup");
1689 }
1690 }
1691 }
1692}