kratanet/
lib.rs

1use std::{collections::HashMap, str::FromStr, time::Duration};
2
3use anyhow::{anyhow, Result};
4use autonet::{AutoNetworkChangeset, AutoNetworkWatcher, NetworkMetadata};
5use futures::{future::join_all, TryFutureExt};
6use hbridge::HostBridge;
7use krata::{
8    client::ControlClientProvider,
9    dial::ControlDialAddress,
10    v1::{
11        common::Zone,
12        control::{control_service_client::ControlServiceClient, GetHostStatusRequest},
13    },
14};
15use log::warn;
16use smoltcp::wire::{EthernetAddress, Ipv4Cidr, Ipv6Cidr};
17use tokio::{task::JoinHandle, time::sleep};
18use tonic::{transport::Channel, Request};
19use uuid::Uuid;
20use vbridge::VirtualBridge;
21
22use crate::backend::NetworkBackend;
23
24pub mod autonet;
25pub mod backend;
26pub mod chandev;
27pub mod hbridge;
28pub mod icmp;
29pub mod nat;
30pub mod pkt;
31pub mod proxynat;
32pub mod raw_socket;
33pub mod vbridge;
34
35const HOST_BRIDGE_MTU: usize = 1500;
36pub const EXTRA_MTU: usize = 20;
37
38pub struct NetworkService {
39    pub control: ControlServiceClient<Channel>,
40    pub zones: HashMap<Uuid, Zone>,
41    pub backends: HashMap<Uuid, JoinHandle<()>>,
42    pub bridge: VirtualBridge,
43    pub hbridge: HostBridge,
44}
45
46impl NetworkService {
47    pub async fn new(control_address: ControlDialAddress) -> Result<NetworkService> {
48        let mut control = ControlClientProvider::dial(control_address).await?;
49        let host_status = control
50            .get_host_status(Request::new(GetHostStatusRequest {}))
51            .await?
52            .into_inner();
53        let host_ipv4 = Ipv4Cidr::from_str(&host_status.host_ipv4)
54            .map_err(|_| anyhow!("failed to parse host ipv4 cidr"))?;
55        let host_ipv6 = Ipv6Cidr::from_str(&host_status.host_ipv6)
56            .map_err(|_| anyhow!("failed to parse host ipv6 cidr"))?;
57        let host_mac = EthernetAddress::from_str(&host_status.host_mac)
58            .map_err(|_| anyhow!("failed to parse host mac address"))?;
59        let bridge = VirtualBridge::new()?;
60        let hbridge = HostBridge::new(
61            HOST_BRIDGE_MTU + EXTRA_MTU,
62            "krata0".to_string(),
63            &bridge,
64            host_ipv4,
65            host_ipv6,
66            host_mac,
67        )
68        .await?;
69        Ok(NetworkService {
70            control,
71            zones: HashMap::new(),
72            backends: HashMap::new(),
73            bridge,
74            hbridge,
75        })
76    }
77}
78
79impl NetworkService {
80    pub async fn watch(&mut self) -> Result<()> {
81        let mut watcher = AutoNetworkWatcher::new(self.control.clone()).await?;
82        let mut receiver = watcher.events.subscribe();
83        loop {
84            let changeset = watcher.read_changes().await?;
85            self.process_network_changeset(&mut watcher, changeset)
86                .await?;
87            watcher.wait(&mut receiver).await?;
88        }
89    }
90
91    async fn process_network_changeset(
92        &mut self,
93        collector: &mut AutoNetworkWatcher,
94        changeset: AutoNetworkChangeset,
95    ) -> Result<()> {
96        for removal in &changeset.removed {
97            if let Some(handle) = self.backends.remove(&removal.uuid) {
98                handle.abort();
99            }
100        }
101
102        let futures = changeset
103            .added
104            .iter()
105            .map(|metadata| {
106                self.add_network_backend(metadata)
107                    .map_err(|x| (metadata.clone(), x))
108            })
109            .collect::<Vec<_>>();
110
111        sleep(Duration::from_secs(1)).await;
112        let mut failed: Vec<Uuid> = Vec::new();
113        let mut launched: Vec<(Uuid, JoinHandle<()>)> = Vec::new();
114        let results = join_all(futures).await;
115        for result in results {
116            match result {
117                Ok(launch) => {
118                    launched.push(launch);
119                }
120
121                Err((metadata, error)) => {
122                    warn!(
123                        "failed to launch network backend for krata zone {}: {}",
124                        metadata.uuid, error
125                    );
126                    failed.push(metadata.uuid);
127                }
128            };
129        }
130
131        for (uuid, handle) in launched {
132            self.backends.insert(uuid, handle);
133        }
134
135        for uuid in failed {
136            collector.mark_unknown(uuid)?;
137        }
138
139        Ok(())
140    }
141
142    async fn add_network_backend(
143        &self,
144        metadata: &NetworkMetadata,
145    ) -> Result<(Uuid, JoinHandle<()>)> {
146        let mut network = NetworkBackend::new(metadata.clone(), self.bridge.clone())?;
147        network.init().await?;
148        Ok((metadata.uuid, network.launch().await?))
149    }
150}