Skip to main content

aranya_daemon/
daemon.rs

1use std::{io, path::Path, sync::Arc};
2
3use anyhow::{Context, Result};
4use aranya_crypto::{
5    dangerous::spideroak_crypto::{import::Import, keys::SecretKey},
6    default::DefaultEngine,
7    keystore::{fs_keystore::Store, KeyStore},
8    Engine, Rng,
9};
10use aranya_keygen::{PublicKeyBundle, PublicKeys};
11use aranya_runtime::{
12    storage::linear::{libc::FileManager, LinearStorageProvider},
13    ClientState, GraphId,
14};
15use aranya_util::{ready, Addr};
16use buggy::{bug, Bug, BugExt};
17use ciborium as cbor;
18use serde::{de::DeserializeOwned, Serialize};
19use tokio::{fs, sync::mpsc, task::JoinSet};
20use tracing::{error, info, info_span, Instrument as _};
21
22#[cfg(feature = "afc")]
23use crate::afc::Afc;
24use crate::{
25    api::{self, ApiKey, DaemonApiServer, DaemonApiServerArgs, QSData},
26    aranya,
27    config::{Config, Toggle},
28    keystore::{AranyaStore, LocalStore},
29    policy,
30    sync::{
31        quic::{PskStore, QuicState, SyncParams},
32        SyncHandle, SyncManager,
33    },
34    util::{load_team_psk_pairs, SeedDir},
35    vm_policy::{PolicyEngine, POLICY_SOURCE},
36};
37
38// Use short names so that we can more easily add generics.
39/// CE = Crypto Engine
40pub(crate) type CE = DefaultEngine;
41/// CS = Crypto Suite
42pub(crate) type CS = <DefaultEngine as Engine>::CS;
43/// KS = Key Store
44pub(crate) type KS = Store;
45/// PS = Policy Store
46pub(crate) type PS = PolicyEngine<CE, KS>;
47/// SP = Storage Provider
48pub(crate) type SP = LinearStorageProvider<FileManager>;
49/// EF = Policy Effect
50pub(crate) type EF = policy::Effect;
51
52pub(crate) type Client = aranya::Client<PS, SP>;
53pub(crate) type SyncServer = crate::sync::quic::Server<PS, SP>;
54
55/// Handle for the spawned daemon.
56///
57/// Dropping this will abort the daemon's tasks.
58#[clippy::has_significant_drop]
59#[derive(Debug)]
60pub struct DaemonHandle {
61    set: JoinSet<()>,
62}
63
64impl DaemonHandle {
65    /// Wait for the daemon to finish.
66    ///
67    /// Panics if any of the daemon's tasks panic.
68    pub async fn join(mut self) -> Result<(), Bug> {
69        match self.set.join_next().await.assume("set not empty")? {
70            Ok(()) => {}
71            Err(err) if err.is_panic() => std::panic::resume_unwind(err.into_panic()),
72            Err(err) => {
73                error!(error = %err, "tasks unexpectedly cancelled");
74                bug!("tasks cancelled");
75            }
76        }
77        self.set.abort_all();
78        Ok(())
79    }
80}
81
82/// The daemon itself.
83#[derive(Debug)]
84pub struct Daemon {
85    sync_server: SyncServer,
86    manager: SyncManager<QuicState, PS, SP, EF>,
87    api: DaemonApiServer,
88    span: tracing::Span,
89}
90
91impl Daemon {
92    /// Loads a `Daemon` using its config.
93    pub async fn load(cfg: Config) -> Result<Self> {
94        let name = (!cfg.name.is_empty()).then_some(cfg.name.as_str());
95        let span = info_span!("daemon", name);
96        let span_id = span.id();
97
98        async move {
99            // TODO: Fix this when other syncer types are supported
100            let Toggle::Enabled(qs_config) = &cfg.sync.quic else {
101                anyhow::bail!("Supply a valid QUIC sync config")
102            };
103            let qs_client_addr = match qs_config.client_addr {
104                None => Addr::new(qs_config.addr.host(), 0)?,
105                Some(v) => v,
106            };
107
108            Self::setup_env(&cfg).await?;
109            let mut aranya_store = Self::load_aranya_keystore(&cfg).await?;
110            let eng = Self::load_crypto_engine(&cfg).await?;
111            let pks = Self::load_or_gen_public_keys(&cfg, &eng, &mut aranya_store).await?;
112
113            let mut local_store = Self::load_local_keystore(&cfg).await?;
114
115            // Generate a fresh API key at startup.
116            let api_sk = ApiKey::generate(&eng);
117            aranya_util::write_file(cfg.api_pk_path(), &api_sk.public()?.encode()?)
118                .await
119                .context("unable to write API public key")?;
120            info!(path = %cfg.api_pk_path().display(), "wrote API public key");
121
122            // Initialize the PSK store used by the syncer and sync server
123            let seed_id_dir = SeedDir::new(cfg.seed_id_path().to_path_buf()).await?;
124            let initial_keys = load_team_psk_pairs(&eng, &mut local_store, &seed_id_dir).await?;
125            let psk_store = Arc::new(PskStore::new(initial_keys));
126
127            // Initialize Aranya client, sync client,and sync server.
128            let (client, sync_server, manager, syncer, recv_effects) = Self::setup_aranya(
129                &cfg,
130                eng.clone(),
131                aranya_store
132                    .try_clone()
133                    .context("unable to clone keystore")?,
134                &pks,
135                SyncParams {
136                    psk_store: Arc::clone(&psk_store),
137                    server_addr: qs_config.addr,
138                },
139                qs_client_addr,
140            )
141            .await?;
142
143            #[cfg(feature = "afc")]
144            let afc = {
145                let Toggle::Enabled(afc_cfg) = &cfg.afc else {
146                    anyhow::bail!(
147                        "AFC is currently required, set `afc.enable = true` in daemon config."
148                    )
149                };
150                Afc::new(
151                    client.clone(),
152                    eng.clone(),
153                    pks.ident_pk.id()?,
154                    aranya_store
155                        .try_clone()
156                        .context("unable to clone keystore")?,
157                    afc_cfg.clone(),
158                )?
159            };
160
161            let data = QSData { psk_store };
162
163            let crypto = api::Crypto {
164                engine: eng,
165                local_store,
166                aranya_store,
167            };
168
169            let api = DaemonApiServer::new(DaemonApiServerArgs {
170                client,
171                local_addr: sync_server.local_addr(),
172                uds_path: cfg.uds_api_sock(),
173                sk: api_sk,
174                pk: pks,
175                syncer,
176                recv_effects,
177                #[cfg(feature = "afc")]
178                afc,
179                crypto,
180                seed_id_dir,
181                quic: Some(data),
182            })?;
183            Ok(Self {
184                sync_server,
185                manager,
186                api,
187                span,
188            })
189        }
190        .instrument(info_span!(parent: span_id, "load"))
191        .await
192    }
193
194    /// The daemon's entrypoint.
195    pub async fn spawn(self) -> Result<DaemonHandle, ready::WaitError> {
196        let _guard = self.span.enter();
197        let mut set = JoinSet::new();
198        let waiter = ready::Waiter::new(3);
199        set.spawn(
200            self.sync_server
201                .serve(waiter.notifier())
202                .instrument(info_span!("sync-server")),
203        );
204        set.spawn({
205            self.manager
206                .run(waiter.notifier())
207                .instrument(info_span!("syncer"))
208        });
209        set.spawn(
210            self.api
211                .serve(waiter.notifier())
212                .instrument(info_span!("api-server")),
213        );
214        waiter.wait().await?;
215        Ok(DaemonHandle { set })
216    }
217
218    /// Initializes the environment (creates directories, etc.).
219    async fn setup_env(cfg: &Config) -> Result<()> {
220        // These directories need to already exist.
221        for dir in &[
222            &cfg.runtime_dir,
223            &cfg.state_dir,
224            &cfg.cache_dir,
225            &cfg.logs_dir,
226            &cfg.config_dir,
227        ] {
228            if !dir.try_exists()? {
229                return Err(anyhow::anyhow!(
230                    "directory does not exist: {}",
231                    dir.display()
232                ));
233            }
234        }
235
236        // These directories aren't created for us.
237        for (name, path) in [
238            ("keystore", cfg.keystore_path()),
239            ("storage", cfg.storage_path()),
240        ] {
241            aranya_util::create_dir_all(&path)
242                .await
243                .with_context(|| format!("unable to create '{name}' directory"))?;
244        }
245        info!("created directories");
246
247        // Remove unix socket so we can re-bind after e.g. the process is killed.
248        // (We could remove it at exit but can't guarantee that will happen.)
249        let uds_api_sock = cfg.uds_api_sock();
250        if let Err(err) = fs::remove_file(&uds_api_sock).await {
251            if err.kind() != io::ErrorKind::NotFound {
252                return Err(err).context(format!("unable to remove api socket {uds_api_sock:?}"));
253            }
254        }
255
256        info!("set up environment");
257        Ok(())
258    }
259
260    /// Creates the Aranya client, sync client, and sync server.
261    async fn setup_aranya(
262        cfg: &Config,
263        eng: CE,
264        store: AranyaStore<KS>,
265        pk: &PublicKeys<CS>,
266        SyncParams {
267            psk_store,
268            server_addr,
269        }: SyncParams,
270        client_addr: Addr,
271    ) -> Result<(
272        Client,
273        SyncServer,
274        SyncManager<QuicState, PS, SP, EF>,
275        SyncHandle,
276        mpsc::Receiver<(GraphId, Vec<EF>)>,
277    )> {
278        let device_id = pk.ident_pk.id()?;
279
280        let client = Client::new(ClientState::new(
281            PS::new(POLICY_SOURCE, eng, store, device_id)?,
282            SP::new(
283                FileManager::new(cfg.storage_path()).context("unable to create `FileManager`")?,
284            ),
285        ));
286
287        // Sync in the background at some specified interval.
288        let (send_effects, recv_effects) = mpsc::channel(256);
289
290        // Create the sync server
291        let (server, peers, conns, syncer_recv) =
292            SyncServer::new(client.clone(), &server_addr, Arc::clone(&psk_store))
293                .await
294                .context("unable to initialize QUIC sync server")?;
295
296        // Initialize the syncer
297        let syncer = SyncManager::new(
298            client.clone(),
299            send_effects,
300            psk_store,
301            (server.local_addr().into(), client_addr),
302            syncer_recv,
303            conns,
304        )?;
305
306        Ok((client, server, syncer, peers, recv_effects))
307    }
308
309    /// Loads the crypto engine.
310    async fn load_crypto_engine(cfg: &Config) -> Result<CE> {
311        let key = load_or_gen_key(cfg.key_wrap_key_path()).await?;
312        Ok(CE::new(&key, Rng))
313    }
314
315    /// Loads the Aranya keystore.
316    ///
317    /// The Aranaya keystore contains Aranya's key material.
318    async fn load_aranya_keystore(cfg: &Config) -> Result<AranyaStore<KS>> {
319        let dir = cfg.aranya_keystore_path();
320        aranya_util::create_dir_all(&dir).await?;
321        KS::open(&dir)
322            .context("unable to open Aranya keystore")
323            .map(AranyaStore::new)
324    }
325
326    /// Loads the local keystore.
327    ///
328    /// The local keystore contains key material for the daemon.
329    /// E.g., its API key.
330    async fn load_local_keystore(cfg: &Config) -> Result<LocalStore<KS>> {
331        let dir = cfg.local_keystore_path();
332        aranya_util::create_dir_all(&dir).await?;
333        KS::open(&dir)
334            .context("unable to open local keystore")
335            .map(LocalStore::new)
336    }
337
338    /// Loads the daemon's [`PublicKeys`].
339    async fn load_or_gen_public_keys<CE, KS>(
340        cfg: &Config,
341        eng: &CE,
342        store: &mut AranyaStore<KS>,
343    ) -> Result<PublicKeys<CE::CS>>
344    where
345        CE: Engine,
346        KS: KeyStore,
347    {
348        let path = cfg.public_key_bundle_path();
349        let bundle = match try_read_cbor(&path).await? {
350            Some(bundle) => bundle,
351            None => {
352                let bundle = PublicKeyBundle::generate(eng, store)
353                    .context("unable to generate key bundle")?;
354                info!("generated key bundle");
355                write_cbor(&path, &bundle)
356                    .await
357                    .context("unable to write `PublicKeyBundle` to disk")?;
358                bundle
359            }
360        };
361        bundle.public_keys(eng, store)
362    }
363}
364
365/// Tries to read CBOR from `path`.
366async fn try_read_cbor<T: DeserializeOwned>(path: impl AsRef<Path>) -> Result<Option<T>> {
367    match fs::read(path.as_ref()).await {
368        Ok(buf) => Ok(cbor::from_reader(&buf[..])?),
369        Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
370        Err(err) => Err(err.into()),
371    }
372}
373
374/// Writes `data` as CBOR to `path`.
375async fn write_cbor(path: impl AsRef<Path>, data: impl Serialize) -> Result<()> {
376    let mut buf = Vec::new();
377    cbor::into_writer(&data, &mut buf)?;
378    Ok(aranya_util::write_file(path, &buf).await?)
379}
380
381/// Loads a key from a file or generates and writes a new one.
382async fn load_or_gen_key<K: SecretKey>(path: impl AsRef<Path>) -> Result<K> {
383    async fn load_or_gen_key_inner<K: SecretKey>(path: &Path) -> Result<K> {
384        match fs::read(&path).await {
385            Ok(buf) => {
386                tracing::info!("loading key");
387                let key =
388                    Import::import(buf.as_slice()).context("unable to import key from file")?;
389                Ok(key)
390            }
391            Err(err) if err.kind() == io::ErrorKind::NotFound => {
392                tracing::info!("generating key");
393                let key = K::random(Rng);
394                let bytes = key
395                    .try_export_secret()
396                    .context("unable to export new key")?;
397                aranya_util::write_file(&path, bytes.as_bytes())
398                    .await
399                    .context("unable to write key")?;
400                Ok(key)
401            }
402            Err(err) => Err(err).context("unable to read key"),
403        }
404    }
405    let path = path.as_ref();
406    load_or_gen_key_inner(path)
407        .instrument(info_span!("load_or_gen_key", ?path))
408        .await
409        .with_context(|| format!("load_or_gen_key({path:?})"))
410}
411
412#[cfg(test)]
413mod tests {
414    #![allow(
415        clippy::arithmetic_side_effects,
416        clippy::expect_used,
417        clippy::panic,
418        clippy::indexing_slicing
419    )]
420
421    use std::time::Duration;
422
423    use aranya_util::Addr;
424    use tempfile::tempdir;
425    use test_log::test;
426    use tokio::time;
427
428    use super::*;
429    use crate::config::{QuicSyncConfig, SyncConfig, Toggle};
430
431    /// Tests running the daemon.
432    #[test(tokio::test)]
433    async fn test_daemon_run() {
434        let dir = tempdir().expect("should be able to create temp dir");
435        let work_dir = dir.path().join("work");
436
437        #[cfg(feature = "afc")]
438        let shm_path = {
439            let path = "/test_daemon_run\0"
440                .try_into()
441                .expect("should be able to parse AFC shared memory path");
442            let _ = aranya_fast_channels::shm::unlink(&path);
443            path
444        };
445
446        let any = Addr::new("localhost", 0).expect("should be able to create new Addr");
447        let cfg = Config {
448            name: "test-daemon-run".into(),
449            runtime_dir: work_dir.join("run"),
450            state_dir: work_dir.join("state"),
451            cache_dir: work_dir.join("cache"),
452            logs_dir: work_dir.join("logs"),
453            config_dir: work_dir.join("config"),
454            sync: SyncConfig {
455                quic: Toggle::Enabled(QuicSyncConfig {
456                    addr: any,
457                    client_addr: None,
458                }),
459            },
460            #[cfg(feature = "afc")]
461            afc: Toggle::Enabled(crate::config::AfcConfig {
462                shm_path,
463                max_chans: 100,
464            }),
465        };
466        for dir in [
467            &cfg.runtime_dir,
468            &cfg.state_dir,
469            &cfg.cache_dir,
470            &cfg.logs_dir,
471            &cfg.config_dir,
472        ] {
473            aranya_util::create_dir_all(dir)
474                .await
475                .expect("should be able to create directory");
476        }
477
478        let daemon = Daemon::load(cfg)
479            .await
480            .expect("should be able to load `Daemon`");
481
482        time::timeout(
483            Duration::from_secs(1),
484            daemon.spawn().await.expect("startup").join(),
485        )
486        .await
487        .expect_err("`Timeout` should return Elapsed");
488    }
489}