krata-daemon 0.0.21

Daemon for the krata isolation engine
Documentation
use std::{
    collections::{hash_map::Entry, HashMap},
    path::PathBuf,
    sync::Arc,
    time::Duration,
};

use self::create::ZoneCreator;
use crate::config::DaemonConfig;
use crate::db::network::NetworkReservation;
use crate::network::assignment::NetworkAssignment;
use crate::{
    db::zone::ZoneStore,
    devices::DaemonDeviceManager,
    event::{DaemonEvent, DaemonEventContext},
    zlt::ZoneLookupTable,
};
use anyhow::Result;
use krata::v1::{
    common::{Zone, ZoneErrorStatus, ZoneExitStatus, ZoneNetworkStatus, ZoneState, ZoneStatus},
    control::ZoneChangedEvent,
};
use krataoci::packer::service::OciPackerService;
use kratart::Runtime;
use log::{error, info, trace, warn};
use tokio::{
    select,
    sync::{
        mpsc::{channel, Receiver, Sender},
        RwLock,
    },
    task::JoinHandle,
    time::sleep,
};
use uuid::Uuid;

mod create;

const PARALLEL_LIMIT: u32 = 5;

#[derive(Debug)]
enum ZoneReconcilerResult {
    Unchanged,
    Changed { rerun: bool },
}

struct ZoneReconcilerEntry {
    sender: Sender<()>,
}

#[derive(Clone)]
pub struct ZoneReconciler {
    devices: DaemonDeviceManager,
    zlt: ZoneLookupTable,
    zones: ZoneStore,
    events: DaemonEventContext,
    runtime: Runtime,
    packer: OciPackerService,
    kernel_path: PathBuf,
    initrd_path: PathBuf,
    addons_path: PathBuf,
    tasks: Arc<RwLock<HashMap<Uuid, ZoneReconcilerEntry>>>,
    zone_reconciler_notify: Sender<Uuid>,
    zone_reconcile_lock: Arc<RwLock<()>>,
    ip_assignment: NetworkAssignment,
    config: Arc<DaemonConfig>,
}

impl ZoneReconciler {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        devices: DaemonDeviceManager,
        zlt: ZoneLookupTable,
        zones: ZoneStore,
        events: DaemonEventContext,
        runtime: Runtime,
        packer: OciPackerService,
        zone_reconciler_notify: Sender<Uuid>,
        kernel_path: PathBuf,
        initrd_path: PathBuf,
        modules_path: PathBuf,
        ip_assignment: NetworkAssignment,
        config: Arc<DaemonConfig>,
    ) -> Result<Self> {
        Ok(Self {
            devices,
            zlt,
            zones,
            events,
            runtime,
            packer,
            kernel_path,
            initrd_path,
            addons_path: modules_path,
            tasks: Arc::new(RwLock::new(HashMap::new())),
            zone_reconciler_notify,
            zone_reconcile_lock: Arc::new(RwLock::with_max_readers((), PARALLEL_LIMIT)),
            ip_assignment,
            config,
        })
    }

    pub async fn launch(self, mut notify: Receiver<Uuid>) -> Result<JoinHandle<()>> {
        Ok(tokio::task::spawn(async move {
            if let Err(error) = self.reconcile_runtime(true).await {
                error!("runtime reconciler failed: {}", error);
            }

            loop {
                select! {
                    x = notify.recv() => match x {
                        None => {
                            break;
                        },

                        Some(uuid) => {
                            if let Err(error) = self.launch_task_if_needed(uuid).await {
                                error!("failed to start zone reconciler task {}: {}", uuid, error);
                            }

                            let map = self.tasks.read().await;
                            if let Some(entry) = map.get(&uuid) {
                                if let Err(error) = entry.sender.send(()).await {
                                    error!("failed to notify zone reconciler task {}: {}", uuid, error);
                                }
                            }
                        }
                    },

                    _ = sleep(Duration::from_secs(15)) => {
                        if let Err(error) = self.reconcile_runtime(false).await {
                            error!("runtime reconciler failed: {}", error);
                        }
                    }
                }
            }
        }))
    }

    pub async fn reconcile_runtime(&self, initial: bool) -> Result<()> {
        let _permit = self.zone_reconcile_lock.write().await;
        trace!("reconciling runtime");
        let runtime_zones = self.runtime.list().await?;
        let stored_zones = self.zones.list().await?;

        let non_existent_zones = runtime_zones
            .iter()
            .filter(|x| !stored_zones.iter().any(|g| *g.0 == x.uuid))
            .collect::<Vec<_>>();

        for zone in non_existent_zones {
            warn!("destroying unknown runtime zone {}", zone.uuid);
            if let Err(error) = self.runtime.destroy(zone.uuid).await {
                error!(
                    "failed to destroy unknown runtime zone {}: {}",
                    zone.uuid, error
                );
            }
            self.zones.remove(zone.uuid).await?;
        }

        let mut device_claims = HashMap::new();

        for (uuid, mut stored_zone) in stored_zones {
            let previous_zone = stored_zone.clone();
            let runtime_zone = runtime_zones.iter().find(|x| x.uuid == uuid);
            match runtime_zone {
                None => {
                    let mut status = stored_zone.status.as_mut().cloned().unwrap_or_default();
                    if status.state() == ZoneState::Created {
                        status.state = ZoneState::Creating.into();
                    }
                    stored_zone.status = Some(status);
                }

                Some(runtime) => {
                    self.zlt.associate(uuid, runtime.domid).await;
                    let mut status = stored_zone.status.as_mut().cloned().unwrap_or_default();
                    if let Some(code) = runtime.state.exit_code {
                        status.state = ZoneState::Exited.into();
                        status.exit_status = Some(ZoneExitStatus { code });
                    } else {
                        status.state = ZoneState::Created.into();
                    }

                    for device in &stored_zone
                        .spec
                        .as_ref()
                        .cloned()
                        .unwrap_or_default()
                        .devices
                    {
                        device_claims.insert(device.name.clone(), uuid);
                    }

                    if let Some(reservation) = self.ip_assignment.retrieve(uuid).await? {
                        status.network_status =
                            Some(network_reservation_to_network_status(&reservation));
                    }
                    stored_zone.status = Some(status);
                }
            }

            let changed = stored_zone != previous_zone;

            if changed || initial {
                self.zones.update(uuid, stored_zone).await?;
                let _ = self.zone_reconciler_notify.try_send(uuid);
            }
        }

        self.devices.update_claims(device_claims).await?;

        Ok(())
    }

    pub async fn reconcile(&self, uuid: Uuid) -> Result<bool> {
        let _runtime_reconcile_permit = self.zone_reconcile_lock.read().await;
        let Some(mut zone) = self.zones.read(uuid).await? else {
            warn!(
                "notified of reconcile for zone {} but it didn't exist",
                uuid
            );
            return Ok(false);
        };

        info!("reconciling zone {}", uuid);

        self.events
            .send(DaemonEvent::ZoneChanged(ZoneChangedEvent {
                zone: Some(zone.clone()),
            }))?;

        let start_state = zone.status.as_ref().map(|x| x.state()).unwrap_or_default();
        let result = match start_state {
            ZoneState::Creating => self.create(uuid, &mut zone).await,
            ZoneState::Exited => self.exited(&mut zone).await,
            ZoneState::Destroying => self.destroy(uuid, &mut zone).await,
            _ => Ok(ZoneReconcilerResult::Unchanged),
        };

        let result = match result {
            Ok(result) => result,
            Err(error) => {
                zone.status = Some(zone.status.as_mut().cloned().unwrap_or_default());
                zone.status.as_mut().unwrap().state = ZoneState::Failed.into();
                zone.status.as_mut().unwrap().error_status = Some(ZoneErrorStatus {
                    message: error.to_string(),
                });
                warn!("failed to start zone {}: {}", zone.id, error);
                ZoneReconcilerResult::Changed { rerun: false }
            }
        };

        info!("reconciled zone {}", uuid);

        let state = zone.status.as_ref().map(|x| x.state()).unwrap_or_default();
        let destroyed = state == ZoneState::Destroyed;

        let rerun = if let ZoneReconcilerResult::Changed { rerun } = result {
            let event = DaemonEvent::ZoneChanged(ZoneChangedEvent {
                zone: Some(zone.clone()),
            });

            if destroyed {
                self.zones.remove(uuid).await?;
                let mut map = self.tasks.write().await;
                map.remove(&uuid);
            } else {
                self.zones.update(uuid, zone.clone()).await?;
            }

            self.events.send(event)?;
            rerun
        } else {
            false
        };

        Ok(rerun)
    }

    async fn create(&self, uuid: Uuid, zone: &mut Zone) -> Result<ZoneReconcilerResult> {
        let starter = ZoneCreator {
            devices: &self.devices,
            kernel_path: &self.kernel_path,
            initrd_path: &self.initrd_path,
            addons_path: &self.addons_path,
            packer: &self.packer,
            network_assignment: &self.ip_assignment,
            zlt: &self.zlt,
            runtime: &self.runtime,
            config: &self.config,
        };
        starter.create(uuid, zone).await
    }

    async fn exited(&self, zone: &mut Zone) -> Result<ZoneReconcilerResult> {
        if let Some(ref mut status) = zone.status {
            status.set_state(ZoneState::Destroying);
            Ok(ZoneReconcilerResult::Changed { rerun: true })
        } else {
            Ok(ZoneReconcilerResult::Unchanged)
        }
    }

    async fn destroy(&self, uuid: Uuid, zone: &mut Zone) -> Result<ZoneReconcilerResult> {
        if let Err(error) = self.runtime.destroy(uuid).await {
            trace!("failed to destroy runtime zone {}: {}", uuid, error);
        }

        let domid = zone.status.as_ref().map(|x| x.domid);

        if let Some(domid) = domid {
            self.zlt.remove(uuid, domid).await;
        }

        info!("destroyed zone {}", uuid);
        self.ip_assignment.recall(uuid).await?;
        zone.status = Some(ZoneStatus {
            state: ZoneState::Destroyed.into(),
            network_status: None,
            exit_status: None,
            error_status: None,
            resource_status: None,
            host: self.zlt.host_uuid().to_string(),
            domid: domid.unwrap_or(u32::MAX),
        });
        self.devices.release_all(uuid).await?;
        Ok(ZoneReconcilerResult::Changed { rerun: false })
    }

    async fn launch_task_if_needed(&self, uuid: Uuid) -> Result<()> {
        let mut map = self.tasks.write().await;
        match map.entry(uuid) {
            Entry::Occupied(_) => {}
            Entry::Vacant(entry) => {
                entry.insert(self.launch_task(uuid).await?);
            }
        }
        Ok(())
    }

    async fn launch_task(&self, uuid: Uuid) -> Result<ZoneReconcilerEntry> {
        let this = self.clone();
        let (sender, mut receiver) = channel(10);
        tokio::task::spawn(async move {
            'notify_loop: loop {
                if receiver.recv().await.is_none() {
                    break 'notify_loop;
                }

                'rerun_loop: loop {
                    let rerun = match this.reconcile(uuid).await {
                        Ok(rerun) => rerun,
                        Err(error) => {
                            error!("failed to reconcile zone {}: {}", uuid, error);
                            false
                        }
                    };

                    if rerun {
                        continue 'rerun_loop;
                    }
                    break 'rerun_loop;
                }
            }
        });
        Ok(ZoneReconcilerEntry { sender })
    }
}

pub fn network_reservation_to_network_status(ip: &NetworkReservation) -> ZoneNetworkStatus {
    ZoneNetworkStatus {
        zone_ipv4: format!("{}/{}", ip.ipv4, ip.ipv4_prefix),
        zone_ipv6: format!("{}/{}", ip.ipv6, ip.ipv6_prefix),
        zone_mac: ip.mac.to_string().to_lowercase().replace('-', ":"),
        gateway_ipv4: format!("{}/{}", ip.gateway_ipv4, ip.ipv4_prefix),
        gateway_ipv6: format!("{}/{}", ip.gateway_ipv6, ip.ipv6_prefix),
        gateway_mac: ip.gateway_mac.to_string().to_lowercase().replace('-', ":"),
    }
}