krata-daemon 0.0.21

Daemon for the krata isolation engine
Documentation
use std::pin::Pin;
use std::str::FromStr;

use anyhow::{anyhow, Result};
use async_stream::try_stream;
use tokio::select;
use tokio::sync::mpsc::channel;
use tokio_stream::{Stream, StreamExt};
use tonic::{Status, Streaming};
use uuid::Uuid;

use krata::v1::control::{ZoneConsoleReply, ZoneConsoleRequest};

use crate::console::DaemonConsoleHandle;
use crate::control::ApiError;

enum ConsoleDataSelect {
    Read(Option<Vec<u8>>),
    Write(Option<Result<ZoneConsoleRequest, Status>>),
}

pub struct AttachZoneConsoleRpc {
    console: DaemonConsoleHandle,
}

impl AttachZoneConsoleRpc {
    pub fn new(console: DaemonConsoleHandle) -> Self {
        Self { console }
    }

    pub async fn process(
        self,
        mut input: Streaming<ZoneConsoleRequest>,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<ZoneConsoleReply, Status>> + Send + 'static>>>
    {
        let Some(request) = input.next().await else {
            return Err(anyhow!("expected to have at least one request"));
        };
        let request = request?;
        let uuid = Uuid::from_str(&request.zone_id)?;
        let (sender, mut receiver) = channel(100);
        let console = self
            .console
            .attach(uuid, sender)
            .await
            .map_err(|error| anyhow!("failed to attach to console: {}", error))?;

        let output = try_stream! {
            if request.replay_history {
                yield ZoneConsoleReply { data: console.initial.clone(), };
            }
            loop {
                let what = select! {
                    x = receiver.recv() => ConsoleDataSelect::Read(x),
                    x = input.next() => ConsoleDataSelect::Write(x),
                };

                match what {
                    ConsoleDataSelect::Read(Some(data)) => {
                        yield ZoneConsoleReply { data, };
                    },

                    ConsoleDataSelect::Read(None) => {
                        break;
                    }

                    ConsoleDataSelect::Write(Some(request)) => {
                        let request = request?;
                        if !request.data.is_empty() {
                            console.send(request.data).await.map_err(|error| ApiError {
                                message: error.to_string(),
                            })?;
                        }
                    },

                    ConsoleDataSelect::Write(None) => {
                        break;
                    }
                }
            }
        };
        Ok(Box::pin(output))
    }
}