1use crate::db::network::NetworkReservationStore;
2use crate::db::zone::ZoneStore;
3use crate::db::KrataDatabase;
4use crate::network::assignment::NetworkAssignment;
5use anyhow::{anyhow, Result};
6use config::DaemonConfig;
7use console::{DaemonConsole, DaemonConsoleHandle};
8use control::DaemonControlService;
9use devices::DaemonDeviceManager;
10use event::{DaemonEventContext, DaemonEventGenerator};
11use idm::{DaemonIdm, DaemonIdmHandle};
12use ipnetwork::{Ipv4Network, Ipv6Network};
13use krata::{dial::ControlDialAddress, v1::control::control_service_server::ControlServiceServer};
14use krataoci::{packer::service::OciPackerService, registry::OciPlatform};
15use kratart::Runtime;
16use log::{debug, info};
17use reconcile::zone::ZoneReconciler;
18use std::path::Path;
19use std::time::Duration;
20use std::{net::SocketAddr, path::PathBuf, str::FromStr, sync::Arc};
21use tokio::{
22 fs,
23 net::UnixListener,
24 sync::mpsc::{channel, Sender},
25 task::JoinHandle,
26};
27use tokio_stream::wrappers::UnixListenerStream;
28use tonic::transport::{Identity, Server, ServerTlsConfig};
29use uuid::Uuid;
30use zlt::ZoneLookupTable;
31
32pub mod command;
33pub mod config;
34pub mod console;
35pub mod control;
36pub mod db;
37pub mod devices;
38pub mod event;
39pub mod idm;
40pub mod metrics;
41pub mod network;
42pub mod oci;
43pub mod reconcile;
44pub mod zlt;
45
46pub struct Daemon {
47 store: String,
48 _config: Arc<DaemonConfig>,
49 zlt: ZoneLookupTable,
50 devices: DaemonDeviceManager,
51 zones: ZoneStore,
52 network: NetworkAssignment,
53 events: DaemonEventContext,
54 zone_reconciler_task: JoinHandle<()>,
55 zone_reconciler_notify: Sender<Uuid>,
56 generator_task: JoinHandle<()>,
57 idm: DaemonIdmHandle,
58 console: DaemonConsoleHandle,
59 packer: OciPackerService,
60 runtime: Runtime,
61}
62
/// Capacity of the bounded channel used to send zone UUIDs to the zone
/// reconciler.
const ZONE_RECONCILER_QUEUE_LEN: usize = 1_000;
64
65impl Daemon {
66 pub async fn new(store: String) -> Result<Self> {
67 let store_dir = PathBuf::from(store.clone());
68 debug!("loading configuration");
69 let mut config_path = store_dir.clone();
70 config_path.push("config.toml");
71
72 let config = DaemonConfig::load(&config_path).await?;
73 let config = Arc::new(config);
74 debug!("initializing device manager");
75 let devices = DaemonDeviceManager::new(config.clone());
76
77 debug!("validating image cache directory");
78 let mut image_cache_dir = store_dir.clone();
79 image_cache_dir.push("cache");
80 image_cache_dir.push("image");
81 fs::create_dir_all(&image_cache_dir).await?;
82
83 debug!("loading zone0 uuid");
84 let mut host_uuid_path = store_dir.clone();
85 host_uuid_path.push("host.uuid");
86 let host_uuid = if host_uuid_path.is_file() {
87 let content = fs::read_to_string(&host_uuid_path).await?;
88 Uuid::from_str(content.trim()).ok()
89 } else {
90 None
91 };
92
93 let host_uuid = if let Some(host_uuid) = host_uuid {
94 host_uuid
95 } else {
96 let generated = Uuid::new_v4();
97 let mut string = generated.to_string();
98 string.push('\n');
99 fs::write(&host_uuid_path, string).await?;
100 generated
101 };
102
103 debug!("validating zone asset directories");
104 let initrd_path = detect_zone_path(&store, "initrd")?;
105 let kernel_path = detect_zone_path(&store, "kernel")?;
106 let addons_path = detect_zone_path(&store, "addons.squashfs")?;
107
108 debug!("initializing caches and hydrating zone state");
109 let seed = config.oci.seed.clone().map(PathBuf::from);
110 let packer = OciPackerService::new(seed, &image_cache_dir, OciPlatform::current()).await?;
111 debug!("initializing core runtime");
112 let runtime = Runtime::new().await?;
113 let zlt = ZoneLookupTable::new(0, host_uuid);
114 let db_path = format!("{}/krata.db", store);
115 let database = KrataDatabase::open(Path::new(&db_path))?;
116 let zones = ZoneStore::open(database.clone())?;
117 let (zone_reconciler_notify, zone_reconciler_receiver) =
118 channel::<Uuid>(ZONE_RECONCILER_QUEUE_LEN);
119 debug!("starting IDM service");
120 let idm = DaemonIdm::new(zlt.clone()).await?;
121 let idm = idm.launch().await?;
122 debug!("initializing console interfaces");
123 let console = DaemonConsole::new(zlt.clone()).await?;
124 let console = console.launch().await?;
125 let (events, generator) =
126 DaemonEventGenerator::new(zones.clone(), zone_reconciler_notify.clone(), idm.clone())
127 .await?;
128 let runtime_for_reconciler = runtime.dupe().await?;
129 let ipv4_network = Ipv4Network::from_str(&config.network.ipv4.subnet)?;
130 let ipv6_network = Ipv6Network::from_str(&config.network.ipv6.subnet)?;
131 let network_reservation_store = NetworkReservationStore::open(database)?;
132 let network = NetworkAssignment::new(
133 host_uuid,
134 ipv4_network,
135 ipv6_network,
136 network_reservation_store,
137 )
138 .await?;
139 debug!("initializing zone reconciler");
140 let zone_reconciler = ZoneReconciler::new(
141 devices.clone(),
142 zlt.clone(),
143 zones.clone(),
144 events.clone(),
145 runtime_for_reconciler,
146 packer.clone(),
147 zone_reconciler_notify.clone(),
148 kernel_path,
149 initrd_path,
150 addons_path,
151 network.clone(),
152 config.clone(),
153 )?;
154
155 let zone_reconciler_task = zone_reconciler.launch(zone_reconciler_receiver).await?;
156 let generator_task = generator.launch().await?;
157
158 let power = runtime.power_management_context().await?;
161 power.set_smt_policy(true).await?;
162 power
163 .set_scheduler_policy("performance".to_string())
164 .await?;
165 info!("power management initialized");
166
167 info!("krata daemon initialized");
168 Ok(Self {
169 store,
170 _config: config,
171 zlt,
172 devices,
173 zones,
174 network,
175 events,
176 zone_reconciler_task,
177 zone_reconciler_notify,
178 generator_task,
179 idm,
180 console,
181 packer,
182 runtime,
183 })
184 }
185
186 pub async fn listen(&mut self, addr: ControlDialAddress) -> Result<()> {
187 debug!("starting control service");
188 let control_service = DaemonControlService::new(
189 self.zlt.clone(),
190 self.devices.clone(),
191 self.events.clone(),
192 self.console.clone(),
193 self.idm.clone(),
194 self.zones.clone(),
195 self.network.clone(),
196 self.zone_reconciler_notify.clone(),
197 self.packer.clone(),
198 self.runtime.clone(),
199 );
200
201 let mut server = Server::builder();
202
203 if let ControlDialAddress::Tls {
204 host: _,
205 port: _,
206 insecure,
207 } = &addr
208 {
209 let mut tls_config = ServerTlsConfig::new();
210 if !insecure {
211 let certificate_path = format!("{}/tls/daemon.pem", self.store);
212 let key_path = format!("{}/tls/daemon.key", self.store);
213 tls_config = tls_config.identity(Identity::from_pem(certificate_path, key_path));
214 }
215 server = server.tls_config(tls_config)?;
216 }
217
218 server = server.http2_keepalive_interval(Some(Duration::from_secs(10)));
219
220 let server = server.add_service(ControlServiceServer::new(control_service));
221 info!("listening on address {}", addr);
222 match addr {
223 ControlDialAddress::UnixSocket { path } => {
224 let path = PathBuf::from(path);
225 if path.exists() {
226 fs::remove_file(&path).await?;
227 }
228 let listener = UnixListener::bind(path)?;
229 let stream = UnixListenerStream::new(listener);
230 server.serve_with_incoming(stream).await?;
231 }
232
233 ControlDialAddress::Tcp { host, port } => {
234 let address = format!("{}:{}", host, port);
235 server.serve(SocketAddr::from_str(&address)?).await?;
236 }
237
238 ControlDialAddress::Tls {
239 host,
240 port,
241 insecure: _,
242 } => {
243 let address = format!("{}:{}", host, port);
244 server.serve(SocketAddr::from_str(&address)?).await?;
245 }
246 }
247 Ok(())
248 }
249}
250
251impl Drop for Daemon {
252 fn drop(&mut self) {
253 self.zone_reconciler_task.abort();
254 self.generator_task.abort();
255 }
256}
257
258fn detect_zone_path(store: &str, name: &str) -> Result<PathBuf> {
259 let mut path = PathBuf::from(format!("{}/zone/{}", store, name));
260 if path.is_file() {
261 return Ok(path);
262 }
263
264 path = PathBuf::from(format!("/usr/share/krata/zone/{}", name));
265 if path.is_file() {
266 return Ok(path);
267 }
268 Err(anyhow!("unable to find required zone file: {}", name))
269}