kratad/
lib.rs

1use crate::db::network::NetworkReservationStore;
2use crate::db::zone::ZoneStore;
3use crate::db::KrataDatabase;
4use crate::network::assignment::NetworkAssignment;
5use anyhow::{anyhow, Result};
6use config::DaemonConfig;
7use console::{DaemonConsole, DaemonConsoleHandle};
8use control::DaemonControlService;
9use devices::DaemonDeviceManager;
10use event::{DaemonEventContext, DaemonEventGenerator};
11use idm::{DaemonIdm, DaemonIdmHandle};
12use ipnetwork::{Ipv4Network, Ipv6Network};
13use krata::{dial::ControlDialAddress, v1::control::control_service_server::ControlServiceServer};
14use krataoci::{packer::service::OciPackerService, registry::OciPlatform};
15use kratart::Runtime;
16use log::{debug, info};
17use reconcile::zone::ZoneReconciler;
18use std::path::Path;
19use std::time::Duration;
20use std::{net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc};
21use tokio::{
22    fs,
23    net::UnixListener,
24    sync::mpsc::{channel, Sender},
25    task::JoinHandle,
26};
27use tokio_stream::wrappers::UnixListenerStream;
28use tonic::transport::{Identity, Server, ServerTlsConfig};
29use uuid::Uuid;
30use zlt::ZoneLookupTable;
31
32pub mod command;
33pub mod config;
34pub mod console;
35pub mod control;
36pub mod db;
37pub mod devices;
38pub mod event;
39pub mod idm;
40pub mod metrics;
41pub mod network;
42pub mod oci;
43pub mod reconcile;
44pub mod zlt;
45
46pub struct Daemon {
47    store: String,
48    _config: Arc<DaemonConfig>,
49    zlt: ZoneLookupTable,
50    devices: DaemonDeviceManager,
51    zones: ZoneStore,
52    network: NetworkAssignment,
53    events: DaemonEventContext,
54    zone_reconciler_task: JoinHandle<()>,
55    zone_reconciler_notify: Sender<Uuid>,
56    generator_task: JoinHandle<()>,
57    idm: DaemonIdmHandle,
58    console: DaemonConsoleHandle,
59    packer: OciPackerService,
60    runtime: Runtime,
61}
62
/// Capacity of the bounded channel that carries zone UUIDs to the zone reconciler.
const ZONE_RECONCILER_QUEUE_LEN: usize = 1_000;
64
65impl Daemon {
66    pub async fn new(store: String) -> Result<Self> {
67        let store_dir = PathBuf::from(store.clone());
68        debug!("loading configuration");
69        let mut config_path = store_dir.clone();
70        config_path.push("config.toml");
71
72        let config = DaemonConfig::load(&config_path).await?;
73        let config = Arc::new(config);
74        debug!("initializing device manager");
75        let devices = DaemonDeviceManager::new(config.clone());
76
77        debug!("validating image cache directory");
78        let mut image_cache_dir = store_dir.clone();
79        image_cache_dir.push("cache");
80        image_cache_dir.push("image");
81        fs::create_dir_all(&image_cache_dir).await?;
82
83        debug!("loading zone0 uuid");
84        let mut host_uuid_path = store_dir.clone();
85        host_uuid_path.push("host.uuid");
86        let host_uuid = if host_uuid_path.is_file() {
87            let content = fs::read_to_string(&host_uuid_path).await?;
88            Uuid::from_str(content.trim()).ok()
89        } else {
90            None
91        };
92
93        let host_uuid = if let Some(host_uuid) = host_uuid {
94            host_uuid
95        } else {
96            let generated = Uuid::new_v4();
97            let mut string = generated.to_string();
98            string.push('\n');
99            fs::write(&host_uuid_path, string).await?;
100            generated
101        };
102
103        debug!("validating zone asset directories");
104        let initrd_path = detect_zone_path(&store, "initrd")?;
105        let kernel_path = detect_zone_path(&store, "kernel")?;
106        let addons_path = detect_zone_path(&store, "addons.squashfs")?;
107
108        debug!("initializing caches and hydrating zone state");
109        let seed = config.oci.seed.clone().map(PathBuf::from);
110        let packer = OciPackerService::new(seed, &image_cache_dir, OciPlatform::current()).await?;
111        debug!("initializing core runtime");
112        let runtime = Runtime::new().await?;
113        let zlt = ZoneLookupTable::new(0, host_uuid);
114        let db_path = format!("{}/krata.db", store);
115        let database = KrataDatabase::open(Path::new(&db_path))?;
116        let zones = ZoneStore::open(database.clone())?;
117        let (zone_reconciler_notify, zone_reconciler_receiver) =
118            channel::<Uuid>(ZONE_RECONCILER_QUEUE_LEN);
119        debug!("starting IDM service");
120        let idm = DaemonIdm::new(zlt.clone()).await?;
121        let idm = idm.launch().await?;
122        debug!("initializing console interfaces");
123        let console = DaemonConsole::new(zlt.clone()).await?;
124        let console = console.launch().await?;
125        let (events, generator) =
126            DaemonEventGenerator::new(zones.clone(), zone_reconciler_notify.clone(), idm.clone())
127                .await?;
128        let runtime_for_reconciler = runtime.dupe().await?;
129        let ipv4_network = Ipv4Network::from_str(&config.network.ipv4.subnet)?;
130        let ipv6_network = Ipv6Network::from_str(&config.network.ipv6.subnet)?;
131        let network_reservation_store = NetworkReservationStore::open(database)?;
132        let network = NetworkAssignment::new(
133            host_uuid,
134            ipv4_network,
135            ipv6_network,
136            network_reservation_store,
137        )
138        .await?;
139        debug!("initializing zone reconciler");
140        let zone_reconciler = ZoneReconciler::new(
141            devices.clone(),
142            zlt.clone(),
143            zones.clone(),
144            events.clone(),
145            runtime_for_reconciler,
146            packer.clone(),
147            zone_reconciler_notify.clone(),
148            kernel_path,
149            initrd_path,
150            addons_path,
151            network.clone(),
152            config.clone(),
153        )?;
154
155        let zone_reconciler_task = zone_reconciler.launch(zone_reconciler_receiver).await?;
156        let generator_task = generator.launch().await?;
157
158        // TODO: Create a way of abstracting early init tasks in kratad.
159        // TODO: Make initial power management policy configurable.
160        let power = runtime.power_management_context().await?;
161        power.set_smt_policy(true).await?;
162        power
163            .set_scheduler_policy("performance".to_string())
164            .await?;
165        info!("power management initialized");
166
167        info!("krata daemon initialized");
168        Ok(Self {
169            store,
170            _config: config,
171            zlt,
172            devices,
173            zones,
174            network,
175            events,
176            zone_reconciler_task,
177            zone_reconciler_notify,
178            generator_task,
179            idm,
180            console,
181            packer,
182            runtime,
183        })
184    }
185
186    pub async fn listen(&mut self, addr: ControlDialAddress) -> Result<()> {
187        debug!("starting control service");
188        let control_service = DaemonControlService::new(
189            self.zlt.clone(),
190            self.devices.clone(),
191            self.events.clone(),
192            self.console.clone(),
193            self.idm.clone(),
194            self.zones.clone(),
195            self.network.clone(),
196            self.zone_reconciler_notify.clone(),
197            self.packer.clone(),
198            self.runtime.clone(),
199        );
200
201        let mut server = Server::builder();
202
203        if let ControlDialAddress::Tls {
204            host: _,
205            port: _,
206            insecure,
207        } = &addr
208        {
209            let mut tls_config = ServerTlsConfig::new();
210            if !insecure {
211                let certificate_path = format!("{}/tls/daemon.pem", self.store);
212                let key_path = format!("{}/tls/daemon.key", self.store);
213                tls_config = tls_config.identity(Identity::from_pem(certificate_path, key_path));
214            }
215            server = server.tls_config(tls_config)?;
216        }
217
218        server = server.http2_keepalive_interval(Some(Duration::from_secs(10)));
219
220        let server = server.add_service(ControlServiceServer::new(control_service));
221        info!("listening on address {}", addr);
222        match addr {
223            ControlDialAddress::UnixSocket { path } => {
224                let path = PathBuf::from(path);
225                if path.exists() {
226                    fs::remove_file(&path).await?;
227                }
228                let listener = UnixListener::bind(path)?;
229                let stream = UnixListenerStream::new(listener);
230                server.serve_with_incoming(stream).await?;
231            }
232
233            ControlDialAddress::Tcp { host, port } => {
234                let address = format!("{}:{}", host, port);
235                server.serve(SocketAddr::from_str(&address)?).await?;
236            }
237
238            ControlDialAddress::Tls {
239                host,
240                port,
241                insecure: _,
242            } => {
243                let address = format!("{}:{}", host, port);
244                server.serve(SocketAddr::from_str(&address)?).await?;
245            }
246        }
247        Ok(())
248    }
249}
250
251impl Drop for Daemon {
252    fn drop(&mut self) {
253        self.zone_reconciler_task.abort();
254        self.generator_task.abort();
255    }
256}
257
258fn detect_zone_path(store: &str, name: &str) -> Result<PathBuf> {
259    let mut path = PathBuf::from(format!("{}/zone/{}", store, name));
260    if path.is_file() {
261        return Ok(path);
262    }
263
264    path = PathBuf::from(format!("/usr/share/krata/zone/{}", name));
265    if path.is_file() {
266        return Ok(path);
267    }
268    Err(anyhow!("unable to find required zone file: {}", name))
269}