1use crate::allocator::IpAllocator;
7use crate::allocator::{NodeSliceAllocator, NodeSliceAllocatorSnapshot};
8use crate::config::PeerInfo;
9use crate::dns::{peer_hostname, DnsConfig, DnsHandle, DnsServer, DEFAULT_DNS_PORT};
10use crate::error::{OverlayError, Result};
11#[cfg(feature = "nat")]
12use crate::nat::{Candidate, ConnectionType, NatTraversal, RelayServer};
13use crate::transport::OverlayTransport;
14use ipnet::IpNet;
15use serde::{Deserialize, Serialize};
16use std::net::{IpAddr, SocketAddr};
17use std::path::{Path, PathBuf};
18use std::time::Duration;
19use tracing::{debug, info, warn};
20
21#[cfg(target_os = "macos")]
26pub const DEFAULT_INTERFACE_NAME: &str = "utun";
27#[cfg(not(target_os = "macos"))]
28pub const DEFAULT_INTERFACE_NAME: &str = "zl-overlay0";
29
30pub use zlayer_core::DEFAULT_WG_PORT;
32
33pub const DEFAULT_OVERLAY_CIDR: &str = "10.200.0.0/16";
35
36pub const DEFAULT_OVERLAY_CIDR_V6: &str = "fd00:200::/48";
41
42pub const DEFAULT_KEEPALIVE_SECS: u16 = 25;
44
45pub const DEFAULT_SLICE_PREFIX: u8 = 28;
49
50mod option_ipnet_str {
54 use ipnet::IpNet;
55 use serde::{Deserialize, Deserializer, Serialize, Serializer};
56
57 #[allow(clippy::ref_option)]
58 pub fn serialize<S>(value: &Option<IpNet>, serializer: S) -> Result<S::Ok, S::Error>
59 where
60 S: Serializer,
61 {
62 value.map(|v| v.to_string()).serialize(serializer)
63 }
64
65 pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<IpNet>, D::Error>
66 where
67 D: Deserializer<'de>,
68 {
69 let opt = Option::<String>::deserialize(deserializer)?;
70 match opt {
71 None => Ok(None),
72 Some(s) => s
73 .parse::<IpNet>()
74 .map(Some)
75 .map_err(serde::de::Error::custom),
76 }
77 }
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct BootstrapConfig {
86 pub cidr: String,
88
89 pub node_ip: IpAddr,
91
92 pub interface: String,
94
95 pub port: u16,
97
98 pub private_key: String,
100
101 pub public_key: String,
103
104 pub is_leader: bool,
106
107 pub created_at: u64,
109
110 #[serde(default, with = "option_ipnet_str")]
114 pub slice_cidr: Option<IpNet>,
115}
116
117impl BootstrapConfig {
118 #[must_use]
125 pub fn allowed_ip(&self) -> String {
126 if let Some(slice) = self.slice_cidr {
127 return slice.to_string();
128 }
129 let prefix = match self.node_ip {
130 IpAddr::V4(_) => 32,
131 IpAddr::V6(_) => 128,
132 };
133 format!("{}/{}", self.node_ip, prefix)
134 }
135}
136
137#[derive(Debug, Clone, Serialize, Deserialize)]
139pub struct PeerConfig {
140 pub node_id: String,
142
143 pub public_key: String,
145
146 pub endpoint: String,
148
149 pub overlay_ip: IpAddr,
151
152 #[serde(default)]
154 pub keepalive: Option<u16>,
155
156 #[serde(default)]
160 pub hostname: Option<String>,
161
162 #[serde(default)]
164 #[cfg(feature = "nat")]
165 pub candidates: Vec<Candidate>,
166
167 #[serde(default)]
169 #[cfg(feature = "nat")]
170 pub connection_type: ConnectionType,
171
172 #[serde(default, with = "option_ipnet_str")]
175 pub slice_cidr: Option<IpNet>,
176}
177
178impl PeerConfig {
179 #[must_use]
181 pub fn new(node_id: String, public_key: String, endpoint: String, overlay_ip: IpAddr) -> Self {
182 Self {
183 node_id,
184 public_key,
185 endpoint,
186 overlay_ip,
187 keepalive: Some(DEFAULT_KEEPALIVE_SECS),
188 hostname: None,
189 #[cfg(feature = "nat")]
190 candidates: Vec::new(),
191 #[cfg(feature = "nat")]
192 connection_type: ConnectionType::default(),
193 slice_cidr: None,
194 }
195 }
196
197 #[must_use]
199 pub fn with_hostname(mut self, hostname: impl Into<String>) -> Self {
200 self.hostname = Some(hostname.into());
201 self
202 }
203
204 #[must_use]
210 pub fn with_slice_cidr(mut self, cidr: IpNet) -> Self {
211 self.slice_cidr = Some(cidr);
212 self
213 }
214
215 pub fn to_peer_info(&self) -> std::result::Result<PeerInfo, Box<dyn std::error::Error>> {
225 let endpoint: SocketAddr = self.endpoint.parse()?;
226 let keepalive =
227 Duration::from_secs(u64::from(self.keepalive.unwrap_or(DEFAULT_KEEPALIVE_SECS)));
228
229 let allowed_ips = if let Some(slice) = self.slice_cidr {
230 slice.to_string()
231 } else {
232 let prefix = match self.overlay_ip {
233 IpAddr::V4(_) => 32,
234 IpAddr::V6(_) => 128,
235 };
236 format!("{}/{}", self.overlay_ip, prefix)
237 };
238
239 Ok(PeerInfo::new(
240 self.public_key.clone(),
241 endpoint,
242 &allowed_ips,
243 keepalive,
244 ))
245 }
246}
247
248#[derive(Debug, Clone, Serialize, Deserialize)]
250pub struct BootstrapState {
251 pub config: BootstrapConfig,
253
254 pub peers: Vec<PeerConfig>,
256
257 #[serde(skip_serializing_if = "Option::is_none")]
259 pub allocator_state: Option<crate::allocator::IpAllocatorState>,
260
261 #[serde(default, skip_serializing_if = "Option::is_none")]
263 pub slice_allocator_state: Option<NodeSliceAllocatorSnapshot>,
264}
265
266pub struct OverlayBootstrap {
271 config: BootstrapConfig,
273
274 peers: Vec<PeerConfig>,
276
277 data_dir: PathBuf,
279
280 allocator: Option<IpAllocator>,
282
283 slice_allocator: Option<NodeSliceAllocator>,
288
289 dns_config: Option<DnsConfig>,
291
292 dns_handle: Option<DnsHandle>,
294
295 transport: Option<OverlayTransport>,
300
301 #[cfg(feature = "nat")]
303 nat_traversal: Option<NatTraversal>,
304
305 #[cfg(feature = "nat")]
307 relay_server: Option<RelayServer>,
308
309 #[cfg(feature = "nat")]
313 nat_config: Option<crate::nat::NatConfig>,
314}
315
316impl OverlayBootstrap {
317 pub async fn init_leader(cidr: &str, port: u16, data_dir: &Path) -> Result<Self> {
340 let config_path = data_dir.join("overlay_bootstrap.json");
342 if config_path.exists() {
343 return Err(OverlayError::AlreadyInitialized(
344 config_path.display().to_string(),
345 ));
346 }
347
348 tokio::fs::create_dir_all(data_dir).await?;
350
351 info!("Generating overlay keypair for leader");
353 let (private_key, public_key) = OverlayTransport::generate_keys()
354 .await
355 .map_err(|e| OverlayError::TransportCommand(e.to_string()))?;
356
357 let mut allocator = IpAllocator::new(cidr)?;
361 let _legacy_first = allocator.allocate_first()?;
362
363 let cluster_cidr: IpNet = cidr
367 .parse()
368 .map_err(|e: ipnet::AddrParseError| OverlayError::InvalidCidr(e.to_string()))?;
369 let mut slice_allocator = NodeSliceAllocator::new(cluster_cidr, DEFAULT_SLICE_PREFIX)?;
370 let leader_slice = slice_allocator.assign("leader")?;
371 let node_ip = leader_slice.hosts().next().unwrap_or_else(|| {
372 match leader_slice.network() {
376 IpAddr::V4(v4) => {
377 let bits = u32::from(v4).saturating_add(1);
378 IpAddr::V4(std::net::Ipv4Addr::from(bits))
379 }
380 IpAddr::V6(v6) => {
381 let bits = u128::from(v6).saturating_add(1);
382 IpAddr::V6(std::net::Ipv6Addr::from(bits))
383 }
384 }
385 });
386
387 info!(
388 node_ip = %node_ip,
389 cidr = cidr,
390 slice = %leader_slice,
391 "Allocated leader IP from slice"
392 );
393
394 let config = BootstrapConfig {
396 cidr: cidr.to_string(),
397 node_ip,
398 interface: DEFAULT_INTERFACE_NAME.to_string(),
399 port,
400 private_key,
401 public_key,
402 is_leader: true,
403 created_at: current_timestamp(),
404 slice_cidr: Some(leader_slice),
405 };
406
407 let bootstrap = Self {
408 config,
409 peers: Vec::new(),
410 data_dir: data_dir.to_path_buf(),
411 allocator: Some(allocator),
412 slice_allocator: Some(slice_allocator),
413 dns_config: None,
414 dns_handle: None,
415 transport: None,
416 #[cfg(feature = "nat")]
417 nat_traversal: None,
418 #[cfg(feature = "nat")]
419 relay_server: None,
420 #[cfg(feature = "nat")]
421 nat_config: None,
422 };
423
424 bootstrap.save().await?;
426
427 Ok(bootstrap)
428 }
429
430 #[allow(clippy::too_many_arguments)]
451 pub async fn join(
452 leader_cidr: &str,
453 leader_endpoint: &str,
454 leader_public_key: &str,
455 leader_overlay_ip: IpAddr,
456 allocated_ip: IpAddr,
457 port: u16,
458 slice_cidr: Option<IpNet>,
459 data_dir: &Path,
460 ) -> Result<Self> {
461 let config_path = data_dir.join("overlay_bootstrap.json");
463 if config_path.exists() {
464 return Err(OverlayError::AlreadyInitialized(
465 config_path.display().to_string(),
466 ));
467 }
468
469 tokio::fs::create_dir_all(data_dir).await?;
471
472 info!("Generating overlay keypair for joining node");
474 let (private_key, public_key) = OverlayTransport::generate_keys()
475 .await
476 .map_err(|e| OverlayError::TransportCommand(e.to_string()))?;
477
478 let config = BootstrapConfig {
480 cidr: leader_cidr.to_string(),
481 node_ip: allocated_ip,
482 interface: DEFAULT_INTERFACE_NAME.to_string(),
483 port,
484 private_key,
485 public_key,
486 is_leader: false,
487 created_at: current_timestamp(),
488 slice_cidr,
489 };
490
491 let leader_peer = PeerConfig {
493 node_id: "leader".to_string(),
494 public_key: leader_public_key.to_string(),
495 endpoint: leader_endpoint.to_string(),
496 overlay_ip: leader_overlay_ip,
497 keepalive: Some(DEFAULT_KEEPALIVE_SECS),
498 hostname: None, #[cfg(feature = "nat")]
500 candidates: Vec::new(),
501 #[cfg(feature = "nat")]
502 connection_type: ConnectionType::default(),
503 slice_cidr: None,
504 };
505
506 info!(
507 leader_endpoint = leader_endpoint,
508 overlay_ip = %allocated_ip,
509 "Configured leader as peer"
510 );
511
512 let bootstrap = Self {
513 config,
514 peers: vec![leader_peer],
515 data_dir: data_dir.to_path_buf(),
516 allocator: None, slice_allocator: None,
518 dns_config: None,
519 dns_handle: None,
520 transport: None,
521 #[cfg(feature = "nat")]
522 nat_traversal: None,
523 #[cfg(feature = "nat")]
524 relay_server: None,
525 #[cfg(feature = "nat")]
526 nat_config: None,
527 };
528
529 bootstrap.save().await?;
531
532 Ok(bootstrap)
533 }
534
535 pub async fn load(data_dir: &Path) -> Result<Self> {
541 let config_path = data_dir.join("overlay_bootstrap.json");
542
543 if !config_path.exists() {
544 return Err(OverlayError::NotInitialized);
545 }
546
547 let contents = tokio::fs::read_to_string(&config_path).await?;
548 let state: BootstrapState = serde_json::from_str(&contents)?;
549
550 let allocator = if let Some(alloc_state) = state.allocator_state {
551 Some(IpAllocator::from_state(alloc_state)?)
552 } else {
553 None
554 };
555
556 let slice_allocator = if let Some(snapshot) = state.slice_allocator_state {
557 Some(NodeSliceAllocator::restore(snapshot)?)
558 } else {
559 None
560 };
561
562 Ok(Self {
563 config: state.config,
564 peers: state.peers,
565 data_dir: data_dir.to_path_buf(),
566 allocator,
567 slice_allocator,
568 dns_config: None, dns_handle: None,
570 transport: None,
571 #[cfg(feature = "nat")]
572 nat_traversal: None,
573 #[cfg(feature = "nat")]
574 relay_server: None,
575 #[cfg(feature = "nat")]
576 nat_config: None,
577 })
578 }
579
580 pub async fn save(&self) -> Result<()> {
586 let config_path = self.data_dir.join("overlay_bootstrap.json");
587
588 let state = BootstrapState {
589 config: self.config.clone(),
590 peers: self.peers.clone(),
591 allocator_state: self
592 .allocator
593 .as_ref()
594 .map(super::allocator::IpAllocator::to_state),
595 slice_allocator_state: self
596 .slice_allocator
597 .as_ref()
598 .map(NodeSliceAllocator::snapshot),
599 };
600
601 let contents = serde_json::to_string_pretty(&state)?;
602 tokio::fs::write(&config_path, contents).await?;
603
604 debug!(path = %config_path.display(), "Saved bootstrap state");
605 Ok(())
606 }
607
608 pub fn with_dns(mut self, zone: &str, port: u16) -> Result<Self> {
632 self.dns_config = Some(DnsConfig {
633 zone: zone.to_string(),
634 port,
635 bind_addr: self.config.node_ip,
636 upstreams: None,
640 });
641 Ok(self)
642 }
643
644 pub fn with_dns_default(self, zone: &str) -> Result<Self> {
650 self.with_dns(zone, DEFAULT_DNS_PORT)
651 }
652
653 #[cfg(feature = "nat")]
658 #[must_use]
659 pub fn with_nat_config(mut self, nat: crate::nat::NatConfig) -> Self {
660 self.nat_config = Some(nat);
661 self
662 }
663
664 #[must_use]
668 pub fn dns_handle(&self) -> Option<&DnsHandle> {
669 self.dns_handle.as_ref()
670 }
671
672 #[must_use]
674 pub fn dns_enabled(&self) -> bool {
675 self.dns_config.is_some()
676 }
677
678 pub async fn start(&mut self) -> Result<()> {
687 info!(
688 interface = %self.config.interface,
689 overlay_ip = %self.config.node_ip,
690 port = self.config.port,
691 dns_enabled = self.dns_config.is_some(),
692 "Starting overlay network"
693 );
694
695 let overlay_config = crate::config::OverlayConfig {
697 local_endpoint: SocketAddr::new(
698 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
699 self.config.port,
700 ),
701 private_key: self.config.private_key.clone(),
702 public_key: self.config.public_key.clone(),
703 overlay_cidr: self.config.allowed_ip(),
704 cluster_cidr: Some(self.config.cidr.clone()),
705 peer_discovery_interval: Duration::from_secs(30),
706 #[cfg(feature = "nat")]
707 nat: self.nat_config.clone().unwrap_or_default(),
708 ..crate::config::OverlayConfig::default()
709 };
710
711 #[cfg(feature = "nat")]
712 let nat_config = overlay_config.nat.clone();
713
714 let mut transport = OverlayTransport::new(overlay_config, self.config.interface.clone());
716
717 transport
719 .create_interface()
720 .await
721 .map_err(|e| OverlayError::TransportCommand(e.to_string()))?;
722
723 let actual_name = transport.interface_name().to_string();
726 if actual_name != self.config.interface {
727 info!(
728 requested = %self.config.interface,
729 actual = %actual_name,
730 "Interface name resolved by kernel"
731 );
732 self.config.interface = actual_name;
733 }
734
735 let peer_infos: Vec<PeerInfo> = self
737 .peers
738 .iter()
739 .filter_map(|p| match p.to_peer_info() {
740 Ok(info) => Some(info),
741 Err(e) => {
742 warn!(peer = %p.node_id, error = %e, "Failed to parse peer info");
743 None
744 }
745 })
746 .collect();
747
748 transport
750 .configure(&peer_infos)
751 .await
752 .map_err(|e| OverlayError::TransportCommand(e.to_string()))?;
753
754 self.transport = Some(transport);
757
758 #[cfg(feature = "nat")]
760 self.start_nat_traversal(nat_config).await;
761
762 self.start_dns().await?;
764
765 info!("Overlay network started successfully");
766 Ok(())
767 }
768
769 async fn start_dns(&mut self) -> Result<()> {
771 let Some(dns_config) = &self.dns_config else {
772 return Ok(());
773 };
774
775 info!(
776 zone = %dns_config.zone,
777 port = dns_config.port,
778 "Starting DNS server for overlay"
779 );
780
781 let dns_server =
782 DnsServer::from_config(dns_config).map_err(|e| OverlayError::Dns(e.to_string()))?;
783
784 let self_hostname = peer_hostname(self.config.node_ip);
786 dns_server
787 .add_record(&self_hostname, self.config.node_ip)
788 .await
789 .map_err(|e| OverlayError::Dns(e.to_string()))?;
790
791 if self.config.is_leader {
793 dns_server
794 .add_record("leader", self.config.node_ip)
795 .await
796 .map_err(|e| OverlayError::Dns(e.to_string()))?;
797 debug!(ip = %self.config.node_ip, "Registered leader.{}", dns_config.zone);
798 }
799
800 for peer in &self.peers {
802 let hostname = peer_hostname(peer.overlay_ip);
804 dns_server
805 .add_record(&hostname, peer.overlay_ip)
806 .await
807 .map_err(|e| OverlayError::Dns(e.to_string()))?;
808
809 if let Some(custom) = &peer.hostname {
811 dns_server
812 .add_record(custom, peer.overlay_ip)
813 .await
814 .map_err(|e| OverlayError::Dns(e.to_string()))?;
815 debug!(
816 hostname = custom,
817 ip = %peer.overlay_ip,
818 "Registered custom hostname"
819 );
820 }
821 }
822
823 let handle = dns_server
825 .start()
826 .await
827 .map_err(|e| OverlayError::Dns(e.to_string()))?;
828 self.dns_handle = Some(handle);
829
830 info!("DNS server started successfully");
831 Ok(())
832 }
833
834 #[cfg(feature = "nat")]
836 async fn start_nat_traversal(&mut self, nat_config: crate::nat::NatConfig) {
837 if !nat_config.enabled {
838 return;
839 }
840
841 if let Some(ref relay_config) = nat_config.relay_server {
843 let relay = RelayServer::new(relay_config, &self.config.private_key);
844 match relay.start().await {
845 Ok(()) => {
846 info!("Built-in relay server started");
847 self.relay_server = Some(relay);
848 }
849 Err(e) => {
850 warn!(error = %e, "Failed to start relay server");
851 }
852 }
853 }
854
855 let mut nat = NatTraversal::new(nat_config, self.config.port);
856 match nat.gather_candidates().await {
857 Ok(candidates) => {
858 info!(count = candidates.len(), "Gathered NAT candidates");
859 if let Some(ref transport) = self.transport {
860 for peer in &mut self.peers {
861 if !peer.candidates.is_empty() {
862 match nat
863 .connect_to_peer(transport, &peer.public_key, &peer.candidates)
864 .await
865 {
866 Ok(ct) => {
867 peer.connection_type = ct;
868 info!(
869 peer = %peer.node_id,
870 connection = %ct,
871 "NAT traversal succeeded"
872 );
873 }
874 Err(e) => warn!(
875 peer = %peer.node_id,
876 error = %e,
877 "NAT traversal failed"
878 ),
879 }
880 }
881 }
882 }
883 self.nat_traversal = Some(nat);
884 }
885 Err(e) => warn!(error = %e, "NAT candidate gathering failed"),
886 }
887 }
888
889 #[allow(clippy::unused_async)]
895 pub async fn stop(&mut self) -> Result<()> {
896 info!(interface = %self.config.interface, "Stopping overlay network");
897
898 if let Some(mut transport) = self.transport.take() {
899 transport.shutdown();
900 }
901
902 Ok(())
903 }
904
905 pub fn reconcile_existing_peers(&mut self) -> Result<()> {
915 let Some(ref mut allocator) = self.slice_allocator else {
916 return Ok(());
917 };
918 let mut assigned: Vec<(String, String)> = Vec::new();
920 for peer in &self.peers {
921 if let Some(slice) = peer.slice_cidr {
922 assigned.push((peer.node_id.clone(), slice.to_string()));
923 }
924 }
925 if assigned.is_empty() {
926 return Ok(());
927 }
928 let snapshot = NodeSliceAllocatorSnapshot {
929 cluster_cidr: allocator.cluster_cidr().to_string(),
930 slice_prefix: allocator.slice_prefix(),
931 assigned,
932 };
933 *allocator = NodeSliceAllocator::restore(snapshot)?;
934 Ok(())
935 }
936
937 pub async fn add_peer(&mut self, mut peer: PeerConfig) -> Result<IpAddr> {
945 let overlay_ip = if let Some(ref mut allocator) = self.allocator {
947 let ip = allocator.allocate().ok_or(OverlayError::NoAvailableIps)?;
948 peer.overlay_ip = ip;
949 ip
950 } else {
951 peer.overlay_ip
952 };
953
954 if let Some(ref mut slice_allocator) = self.slice_allocator {
958 let slice = slice_allocator.assign(&peer.node_id)?;
959 peer.slice_cidr = Some(slice);
960 }
964
965 if let Ok(peer_info) = peer.to_peer_info() {
967 let transport_ref: Option<&OverlayTransport> = self.transport.as_ref();
970
971 let result = if let Some(t) = transport_ref {
972 t.add_peer(&peer_info).await
973 } else {
974 let overlay_config = crate::config::OverlayConfig {
975 local_endpoint: SocketAddr::new(
976 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
977 self.config.port,
978 ),
979 private_key: self.config.private_key.clone(),
980 public_key: self.config.public_key.clone(),
981 overlay_cidr: self.config.allowed_ip(),
982 cluster_cidr: Some(self.config.cidr.clone()),
983 peer_discovery_interval: Duration::from_secs(30),
984 #[cfg(feature = "nat")]
985 nat: crate::nat::NatConfig::default(),
986 ..crate::config::OverlayConfig::default()
987 };
988 let tmp = OverlayTransport::new(overlay_config, self.config.interface.clone());
989 tmp.add_peer(&peer_info).await
990 };
991
992 match result {
993 Ok(()) => debug!(peer = %peer.node_id, "Added peer to overlay"),
994 Err(e) => {
995 warn!(peer = %peer.node_id, error = %e, "Failed to add peer to overlay (interface may not be up)");
996 }
997 }
998 }
999
1000 if let Some(ref dns_handle) = self.dns_handle {
1002 let hostname = peer_hostname(overlay_ip);
1004 dns_handle
1005 .add_record(&hostname, overlay_ip)
1006 .await
1007 .map_err(|e| OverlayError::Dns(e.to_string()))?;
1008 debug!(hostname = %hostname, ip = %overlay_ip, "Registered peer in DNS");
1009
1010 if let Some(ref custom) = peer.hostname {
1012 dns_handle
1013 .add_record(custom, overlay_ip)
1014 .await
1015 .map_err(|e| OverlayError::Dns(e.to_string()))?;
1016 debug!(hostname = %custom, ip = %overlay_ip, "Registered custom hostname in DNS");
1017 }
1018 }
1019
1020 #[cfg(feature = "nat")]
1022 {
1023 if let (Some(ref nat), Some(ref transport)) = (&self.nat_traversal, &self.transport) {
1024 if !peer.candidates.is_empty() {
1025 match nat
1026 .connect_to_peer(transport, &peer.public_key, &peer.candidates)
1027 .await
1028 {
1029 Ok(ct) => {
1030 peer.connection_type = ct;
1031 info!(
1032 peer = %peer.node_id,
1033 connection = %ct,
1034 "NAT traversal for new peer"
1035 );
1036 }
1037 Err(e) => warn!(
1038 peer = %peer.node_id,
1039 error = %e,
1040 "NAT failed for new peer"
1041 ),
1042 }
1043 }
1044 }
1045 }
1046
1047 self.peers.push(peer);
1049
1050 self.save().await?;
1052
1053 info!(peer_ip = %overlay_ip, "Added peer to overlay");
1054 Ok(overlay_ip)
1055 }
1056
1057 pub async fn remove_peer(&mut self, public_key: &str) -> Result<()> {
1063 let peer_idx = self
1065 .peers
1066 .iter()
1067 .position(|p| p.public_key == public_key)
1068 .ok_or_else(|| OverlayError::PeerNotFound(public_key.to_string()))?;
1069
1070 let peer = &self.peers[peer_idx];
1071
1072 let peer_overlay_ip = peer.overlay_ip;
1074 let peer_custom_hostname = peer.hostname.clone();
1075
1076 if let Some(ref mut allocator) = self.allocator {
1078 allocator.release(peer_overlay_ip);
1079 }
1080
1081 if let Some(ref dns_handle) = self.dns_handle {
1083 let hostname = peer_hostname(peer_overlay_ip);
1085 dns_handle
1086 .remove_record(&hostname)
1087 .await
1088 .map_err(|e| OverlayError::Dns(e.to_string()))?;
1089 debug!(hostname = %hostname, "Removed peer from DNS");
1090
1091 if let Some(ref custom) = peer_custom_hostname {
1093 dns_handle
1094 .remove_record(custom)
1095 .await
1096 .map_err(|e| OverlayError::Dns(e.to_string()))?;
1097 debug!(hostname = %custom, "Removed custom hostname from DNS");
1098 }
1099 }
1100
1101 let transport_ref: Option<&OverlayTransport> = self.transport.as_ref();
1103
1104 let result = if let Some(t) = transport_ref {
1105 t.remove_peer(public_key).await
1106 } else {
1107 let overlay_config = crate::config::OverlayConfig {
1108 local_endpoint: SocketAddr::new(
1109 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
1110 self.config.port,
1111 ),
1112 private_key: self.config.private_key.clone(),
1113 public_key: self.config.public_key.clone(),
1114 overlay_cidr: self.config.allowed_ip(),
1115 cluster_cidr: Some(self.config.cidr.clone()),
1116 peer_discovery_interval: Duration::from_secs(30),
1117 #[cfg(feature = "nat")]
1118 nat: crate::nat::NatConfig::default(),
1119 ..crate::config::OverlayConfig::default()
1120 };
1121 let tmp = OverlayTransport::new(overlay_config, self.config.interface.clone());
1122 tmp.remove_peer(public_key).await
1123 };
1124
1125 match result {
1126 Ok(()) => debug!(public_key = public_key, "Removed peer from overlay"),
1127 Err(e) => {
1128 warn!(public_key = public_key, error = %e, "Failed to remove peer from overlay");
1129 }
1130 }
1131
1132 self.peers.remove(peer_idx);
1134
1135 self.save().await?;
1137
1138 info!(public_key = public_key, "Removed peer from overlay");
1139 Ok(())
1140 }
1141
1142 #[must_use]
1144 pub fn public_key(&self) -> &str {
1145 &self.config.public_key
1146 }
1147
1148 #[must_use]
1150 pub fn node_ip(&self) -> IpAddr {
1151 self.config.node_ip
1152 }
1153
1154 #[must_use]
1156 pub fn cidr(&self) -> &str {
1157 &self.config.cidr
1158 }
1159
1160 #[must_use]
1162 pub fn interface(&self) -> &str {
1163 &self.config.interface
1164 }
1165
1166 #[must_use]
1168 pub fn port(&self) -> u16 {
1169 self.config.port
1170 }
1171
1172 #[must_use]
1174 pub fn is_leader(&self) -> bool {
1175 self.config.is_leader
1176 }
1177
1178 #[must_use]
1180 pub fn peers(&self) -> &[PeerConfig] {
1181 &self.peers
1182 }
1183
1184 #[must_use]
1186 pub fn config(&self) -> &BootstrapConfig {
1187 &self.config
1188 }
1189
1190 pub fn allocate_peer_ip(&mut self) -> Result<IpAddr> {
1198 let allocator = self
1199 .allocator
1200 .as_mut()
1201 .ok_or(OverlayError::Config("Not a leader node".to_string()))?;
1202
1203 allocator.allocate().ok_or(OverlayError::NoAvailableIps)
1204 }
1205
1206 #[must_use]
1208 #[allow(clippy::cast_possible_truncation)]
1209 pub fn allocation_stats(&self) -> Option<(u32, u32)> {
1210 self.allocator
1211 .as_ref()
1212 .map(|a| (a.allocated_count() as u32, a.total_hosts()))
1213 }
1214
1215 #[cfg(feature = "nat")]
1225 pub async fn nat_maintenance_tick(&mut self) -> Result<()> {
1226 let (Some(nat), Some(transport)) = (&mut self.nat_traversal, &self.transport) else {
1227 return Ok(());
1228 };
1229
1230 if nat.refresh().await? {
1231 info!("Reflexive address changed");
1232 }
1233
1234 for peer in &mut self.peers {
1235 if peer.connection_type == ConnectionType::Relayed && !peer.candidates.is_empty() {
1236 if let Ok(Some(upgraded)) = nat
1237 .attempt_upgrade(transport, &peer.public_key, &peer.candidates)
1238 .await
1239 {
1240 peer.connection_type = upgraded;
1241 info!(
1242 peer = %peer.node_id,
1243 connection = %upgraded,
1244 "Upgraded relayed connection"
1245 );
1246 }
1247 }
1248 }
1249
1250 Ok(())
1251 }
1252
1253 #[cfg(feature = "nat")]
1258 #[must_use]
1259 pub fn nat_candidates(&self) -> Vec<Candidate> {
1260 self.nat_traversal
1261 .as_ref()
1262 .map(|n| n.local_candidates().to_vec())
1263 .unwrap_or_default()
1264 }
1265}
1266
1267fn current_timestamp() -> u64 {
1269 std::time::SystemTime::now()
1270 .duration_since(std::time::UNIX_EPOCH)
1271 .unwrap_or_default()
1272 .as_secs()
1273}
1274
1275#[cfg(test)]
1276mod tests {
1277 use super::*;
1278 use std::net::Ipv4Addr;
1279
1280 #[test]
1281 fn test_bootstrap_config_allowed_ip_v4() {
1282 let config = BootstrapConfig {
1283 cidr: "10.200.0.0/16".to_string(),
1284 node_ip: IpAddr::V4(Ipv4Addr::new(10, 200, 0, 1)),
1285 interface: DEFAULT_INTERFACE_NAME.to_string(),
1286 port: DEFAULT_WG_PORT,
1287 private_key: "test_private".to_string(),
1288 public_key: "test_public".to_string(),
1289 is_leader: true,
1290 created_at: 0,
1291 slice_cidr: None,
1292 };
1293
1294 assert_eq!(config.allowed_ip(), "10.200.0.1/32");
1295 }
1296
1297 #[test]
1298 fn test_bootstrap_config_allowed_ip_v6() {
1299 let config = BootstrapConfig {
1300 cidr: "fd00:200::/48".to_string(),
1301 node_ip: "fd00:200::1".parse::<IpAddr>().unwrap(),
1302 interface: DEFAULT_INTERFACE_NAME.to_string(),
1303 port: DEFAULT_WG_PORT,
1304 private_key: "test_private".to_string(),
1305 public_key: "test_public".to_string(),
1306 is_leader: true,
1307 created_at: 0,
1308 slice_cidr: None,
1309 };
1310
1311 assert_eq!(config.allowed_ip(), "fd00:200::1/128");
1312 }
1313
1314 #[test]
1315 fn test_peer_config_new_v4() {
1316 let peer = PeerConfig::new(
1317 "node-1".to_string(),
1318 "pubkey123".to_string(),
1319 "192.168.1.100:51820".to_string(),
1320 IpAddr::V4(Ipv4Addr::new(10, 200, 0, 5)),
1321 );
1322
1323 assert_eq!(peer.node_id, "node-1");
1324 assert_eq!(peer.keepalive, Some(DEFAULT_KEEPALIVE_SECS));
1325 assert_eq!(peer.hostname, None);
1326 }
1327
1328 #[test]
1329 fn test_peer_config_new_v6() {
1330 let peer = PeerConfig::new(
1331 "node-1".to_string(),
1332 "pubkey123".to_string(),
1333 "[::1]:51820".to_string(),
1334 "fd00:200::5".parse::<IpAddr>().unwrap(),
1335 );
1336
1337 assert_eq!(peer.node_id, "node-1");
1338 assert_eq!(peer.keepalive, Some(DEFAULT_KEEPALIVE_SECS));
1339 assert_eq!(peer.hostname, None);
1340 }
1341
1342 #[test]
1343 fn test_peer_config_with_hostname() {
1344 let peer = PeerConfig::new(
1345 "node-1".to_string(),
1346 "pubkey123".to_string(),
1347 "192.168.1.100:51820".to_string(),
1348 IpAddr::V4(Ipv4Addr::new(10, 200, 0, 5)),
1349 )
1350 .with_hostname("web-server");
1351
1352 assert_eq!(peer.hostname, Some("web-server".to_string()));
1353 }
1354
1355 #[test]
1356 fn test_peer_config_to_peer_info_v4() {
1357 let peer = PeerConfig::new(
1358 "node-1".to_string(),
1359 "pubkey123".to_string(),
1360 "192.168.1.100:51820".to_string(),
1361 IpAddr::V4(Ipv4Addr::new(10, 200, 0, 5)),
1362 );
1363
1364 let peer_info = peer.to_peer_info().unwrap();
1365 assert_eq!(peer_info.public_key, "pubkey123");
1366 assert_eq!(peer_info.allowed_ips, "10.200.0.5/32");
1367 }
1368
1369 #[test]
1370 fn test_peer_config_to_peer_info_v6() {
1371 let peer = PeerConfig::new(
1372 "node-1".to_string(),
1373 "pubkey123".to_string(),
1374 "[::1]:51820".to_string(),
1375 "fd00:200::5".parse::<IpAddr>().unwrap(),
1376 );
1377
1378 let peer_info = peer.to_peer_info().unwrap();
1379 assert_eq!(peer_info.public_key, "pubkey123");
1380 assert_eq!(peer_info.allowed_ips, "fd00:200::5/128");
1381 }
1382
1383 #[test]
1384 fn test_bootstrap_state_serialization_v4() {
1385 let config = BootstrapConfig {
1386 cidr: "10.200.0.0/16".to_string(),
1387 node_ip: IpAddr::V4(Ipv4Addr::new(10, 200, 0, 1)),
1388 interface: DEFAULT_INTERFACE_NAME.to_string(),
1389 port: DEFAULT_WG_PORT,
1390 private_key: "private".to_string(),
1391 public_key: "public".to_string(),
1392 is_leader: true,
1393 created_at: 1_234_567_890,
1394 slice_cidr: None,
1395 };
1396
1397 let state = BootstrapState {
1398 config,
1399 peers: vec![],
1400 allocator_state: None,
1401 slice_allocator_state: None,
1402 };
1403
1404 let json = serde_json::to_string_pretty(&state).unwrap();
1405 let deserialized: BootstrapState = serde_json::from_str(&json).unwrap();
1406
1407 assert_eq!(deserialized.config.cidr, "10.200.0.0/16");
1408 assert_eq!(deserialized.config.node_ip.to_string(), "10.200.0.1");
1409 }
1410
1411 #[test]
1412 fn test_bootstrap_state_serialization_v6() {
1413 let config = BootstrapConfig {
1414 cidr: "fd00:200::/48".to_string(),
1415 node_ip: "fd00:200::1".parse::<IpAddr>().unwrap(),
1416 interface: DEFAULT_INTERFACE_NAME.to_string(),
1417 port: DEFAULT_WG_PORT,
1418 private_key: "private".to_string(),
1419 public_key: "public".to_string(),
1420 is_leader: true,
1421 created_at: 1_234_567_890,
1422 slice_cidr: None,
1423 };
1424
1425 let state = BootstrapState {
1426 config,
1427 peers: vec![],
1428 allocator_state: None,
1429 slice_allocator_state: None,
1430 };
1431
1432 let json = serde_json::to_string_pretty(&state).unwrap();
1433 let deserialized: BootstrapState = serde_json::from_str(&json).unwrap();
1434
1435 assert_eq!(deserialized.config.cidr, "fd00:200::/48");
1436 assert_eq!(deserialized.config.node_ip.to_string(), "fd00:200::1");
1437 }
1438
1439 #[test]
1440 fn test_default_overlay_cidr_v6_constant() {
1441 let net: ipnet::IpNet = DEFAULT_OVERLAY_CIDR_V6.parse().unwrap();
1443 assert!(matches!(net, ipnet::IpNet::V6(_)));
1444 assert_eq!(net.prefix_len(), 48);
1445 }
1446
1447 #[test]
1448 fn test_to_peer_info_uses_slice_when_set() {
1449 let peer = PeerConfig::new(
1450 "node-42".to_string(),
1451 "pubkey-xyz".to_string(),
1452 "192.168.1.100:51820".to_string(),
1453 IpAddr::V4(Ipv4Addr::new(10, 200, 42, 1)),
1454 )
1455 .with_slice_cidr("10.200.42.0/28".parse().unwrap());
1456
1457 let peer_info = peer.to_peer_info().unwrap();
1458 assert_eq!(peer_info.allowed_ips, "10.200.42.0/28");
1459 }
1460
1461 #[test]
1462 fn test_to_peer_info_falls_back_to_node_ip_when_no_slice() {
1463 let peer = PeerConfig::new(
1464 "node-5".to_string(),
1465 "pubkey-abc".to_string(),
1466 "192.168.1.100:51820".to_string(),
1467 "10.200.0.5".parse().unwrap(),
1468 );
1469
1470 let peer_info = peer.to_peer_info().unwrap();
1471 assert_eq!(peer_info.allowed_ips, "10.200.0.5/32");
1472 }
1473
1474 #[test]
1475 fn test_bootstrap_config_allowed_ip_prefers_slice() {
1476 let config = BootstrapConfig {
1477 cidr: "10.200.0.0/16".to_string(),
1478 node_ip: IpAddr::V4(Ipv4Addr::new(10, 200, 7, 1)),
1479 interface: DEFAULT_INTERFACE_NAME.to_string(),
1480 port: DEFAULT_WG_PORT,
1481 private_key: "private".to_string(),
1482 public_key: "public".to_string(),
1483 is_leader: false,
1484 created_at: 0,
1485 slice_cidr: Some("10.200.7.0/28".parse().unwrap()),
1486 };
1487
1488 assert_eq!(config.allowed_ip(), "10.200.7.0/28");
1489 }
1490
1491 #[test]
1492 fn test_bootstrap_config_allowed_ip_falls_back_to_node_ip() {
1493 let config = BootstrapConfig {
1494 cidr: "10.200.0.0/16".to_string(),
1495 node_ip: IpAddr::V4(Ipv4Addr::new(10, 200, 0, 9)),
1496 interface: DEFAULT_INTERFACE_NAME.to_string(),
1497 port: DEFAULT_WG_PORT,
1498 private_key: "private".to_string(),
1499 public_key: "public".to_string(),
1500 is_leader: false,
1501 created_at: 0,
1502 slice_cidr: None,
1503 };
1504
1505 assert_eq!(config.allowed_ip(), "10.200.0.9/32");
1506 }
1507}