1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use std::{net::SocketAddr, path::PathBuf, str::FromStr};

use anyhow::Result;
use console::{DaemonConsole, DaemonConsoleHandle};
use control::RuntimeControlService;
use db::GuestStore;
use event::{DaemonEventContext, DaemonEventGenerator};
use idm::{DaemonIdm, DaemonIdmHandle};
use krata::{dial::ControlDialAddress, v1::control::control_service_server::ControlServiceServer};
use kratart::Runtime;
use log::info;
use reconcile::guest::GuestReconciler;
use tokio::{
    net::UnixListener,
    sync::mpsc::{channel, Sender},
    task::JoinHandle,
};
use tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::{Identity, Server, ServerTlsConfig};
use uuid::Uuid;

pub mod console;
pub mod control;
pub mod db;
pub mod event;
pub mod idm;
pub mod reconcile;

pub struct Daemon {
    store: String,
    guests: GuestStore,
    events: DaemonEventContext,
    guest_reconciler_task: JoinHandle<()>,
    guest_reconciler_notify: Sender<Uuid>,
    generator_task: JoinHandle<()>,
    _idm: DaemonIdmHandle,
    console: DaemonConsoleHandle,
}

const GUEST_RECONCILER_QUEUE_LEN: usize = 1000;

impl Daemon {
    pub async fn new(store: String, runtime: Runtime) -> Result<Self> {
        let guests_db_path = format!("{}/guests.db", store);
        let guests = GuestStore::open(&PathBuf::from(guests_db_path))?;
        let (guest_reconciler_notify, guest_reconciler_receiver) =
            channel::<Uuid>(GUEST_RECONCILER_QUEUE_LEN);
        let idm = DaemonIdm::new().await?;
        let idm = idm.launch().await?;
        let console = DaemonConsole::new().await?;
        let console = console.launch().await?;
        let (events, generator) =
            DaemonEventGenerator::new(guests.clone(), guest_reconciler_notify.clone(), idm.clone())
                .await?;
        let runtime_for_reconciler = runtime.dupe().await?;
        let guest_reconciler = GuestReconciler::new(
            guests.clone(),
            events.clone(),
            runtime_for_reconciler,
            guest_reconciler_notify.clone(),
        )?;

        let guest_reconciler_task = guest_reconciler.launch(guest_reconciler_receiver).await?;
        let generator_task = generator.launch().await?;
        Ok(Self {
            store,
            guests,
            events,
            guest_reconciler_task,
            guest_reconciler_notify,
            generator_task,
            _idm: idm,
            console,
        })
    }

    pub async fn listen(&mut self, addr: ControlDialAddress) -> Result<()> {
        let control_service = RuntimeControlService::new(
            self.events.clone(),
            self.console.clone(),
            self.guests.clone(),
            self.guest_reconciler_notify.clone(),
        );

        let mut server = Server::builder();

        if let ControlDialAddress::Tls {
            host: _,
            port: _,
            insecure,
        } = &addr
        {
            let mut tls_config = ServerTlsConfig::new();
            if !insecure {
                let certificate_path = format!("{}/tls/daemon.pem", self.store);
                let key_path = format!("{}/tls/daemon.key", self.store);
                tls_config = tls_config.identity(Identity::from_pem(certificate_path, key_path));
            }
            server = server.tls_config(tls_config)?;
        }

        let server = server.add_service(ControlServiceServer::new(control_service));
        info!("listening on address {}", addr);
        match addr {
            ControlDialAddress::UnixSocket { path } => {
                let path = PathBuf::from(path);
                if path.exists() {
                    tokio::fs::remove_file(&path).await?;
                }
                let listener = UnixListener::bind(path)?;
                let stream = UnixListenerStream::new(listener);
                server.serve_with_incoming(stream).await?;
            }

            ControlDialAddress::Tcp { host, port } => {
                let address = format!("{}:{}", host, port);
                server.serve(SocketAddr::from_str(&address)?).await?;
            }

            ControlDialAddress::Tls {
                host,
                port,
                insecure: _,
            } => {
                let address = format!("{}:{}", host, port);
                server.serve(SocketAddr::from_str(&address)?).await?;
            }
        }
        Ok(())
    }
}

impl Drop for Daemon {
    fn drop(&mut self) {
        self.guest_reconciler_task.abort();
        self.generator_task.abort();
    }
}