use std::{io, path::Path, sync::Arc};
use anyhow::{Context, Result};
use aranya_crypto::{
dangerous::spideroak_crypto::{import::Import, keys::SecretKey},
default::DefaultEngine,
keystore::{fs_keystore::Store, KeyStore},
Engine, Rng,
};
use aranya_keygen::{PublicKeyBundle, PublicKeys};
use aranya_runtime::{
storage::linear::{libc::FileManager, LinearStorageProvider},
ClientState, GraphId,
};
use aranya_util::{ready, Addr};
use buggy::{bug, Bug, BugExt};
use ciborium as cbor;
use serde::{de::DeserializeOwned, Serialize};
use tokio::{fs, sync::mpsc, task::JoinSet};
use tracing::{error, info, info_span, Instrument as _};
#[cfg(feature = "afc")]
use crate::afc::Afc;
use crate::{
api::{self, ApiKey, DaemonApiServer, DaemonApiServerArgs, QSData},
aranya,
config::{Config, Toggle},
keystore::{AranyaStore, LocalStore},
policy,
sync::{
quic::{ConnectionPool, PskStore, QuicConnector, QuicListener},
SyncClient, SyncHandle, SyncManager,
},
util::{load_team_psk_pairs, SeedDir},
vm_policy::{PolicyEngine, POLICY_SOURCE},
};
pub(crate) type CE = DefaultEngine;
pub(crate) type CS = <DefaultEngine as Engine>::CS;
pub(crate) type KS = Store;
pub(crate) type PS = PolicyEngine<CE, KS>;
pub(crate) type SP = LinearStorageProvider<FileManager>;
pub(crate) type EF = policy::Effect;
pub(crate) type Client = aranya::Client<PS, SP>;
pub(crate) type SyncServer = crate::sync::SyncServer<QuicListener, PS, SP>;
#[clippy::has_significant_drop]
#[derive(Debug)]
pub struct DaemonHandle {
set: JoinSet<()>,
}
impl DaemonHandle {
pub async fn join(mut self) -> Result<(), Bug> {
match self.set.join_next().await.assume("set not empty")? {
Ok(()) => {}
Err(err) if err.is_panic() => std::panic::resume_unwind(err.into_panic()),
Err(err) => {
error!(error = %err, "tasks unexpectedly cancelled");
bug!("tasks cancelled");
}
}
self.set.abort_all();
Ok(())
}
}
#[derive(Debug)]
pub struct Daemon {
sync_server: SyncServer,
manager: SyncManager<QuicConnector, PS, SP, EF>,
api: DaemonApiServer,
span: tracing::Span,
}
impl Daemon {
pub async fn load(cfg: Config) -> Result<Self> {
let name = (!cfg.name.is_empty()).then_some(cfg.name.as_str());
let span = info_span!("daemon", name);
let span_id = span.id();
async move {
let Toggle::Enabled(qs_config) = &cfg.sync.quic else {
anyhow::bail!("Supply a valid QUIC sync config")
};
let qs_client_addr = match qs_config.client_addr {
None => Addr::new(qs_config.addr.host(), 0)?,
Some(v) => v,
};
Self::setup_env(&cfg).await?;
let mut aranya_store = Self::load_aranya_keystore(&cfg).await?;
let eng = Self::load_crypto_engine(&cfg).await?;
let pks = Self::load_or_gen_public_keys(&cfg, &eng, &mut aranya_store).await?;
let mut local_store = Self::load_local_keystore(&cfg).await?;
let api_sk = ApiKey::generate(&eng);
aranya_util::write_file(cfg.api_pk_path(), &api_sk.public()?.encode()?)
.await
.context("unable to write API public key")?;
info!(path = %cfg.api_pk_path().display(), "wrote API public key");
let seed_id_dir = SeedDir::new(cfg.seed_id_path().to_path_buf()).await?;
let initial_keys = load_team_psk_pairs(&eng, &mut local_store, &seed_id_dir).await?;
let psk_store = Arc::new(PskStore::new(initial_keys));
let (client, sync_server, manager, syncer, recv_effects) = Self::setup_aranya(
&cfg,
eng.clone(),
aranya_store
.try_clone()
.context("unable to clone keystore")?,
&pks,
Arc::clone(&psk_store),
qs_config.addr,
qs_client_addr,
)
.await?;
#[cfg(feature = "afc")]
let afc = {
let Toggle::Enabled(afc_cfg) = &cfg.afc else {
anyhow::bail!(
"AFC is currently required, set `afc.enable = true` in daemon config."
)
};
Afc::new(
client.clone(),
eng.clone(),
pks.ident_pk.id()?,
aranya_store
.try_clone()
.context("unable to clone keystore")?,
afc_cfg.clone(),
)?
};
let data = QSData { psk_store };
let crypto = api::Crypto {
engine: eng,
local_store,
aranya_store,
};
let api = DaemonApiServer::new(DaemonApiServerArgs {
client,
local_addr: sync_server.local_addr(),
uds_path: cfg.uds_api_sock(),
sk: api_sk,
pk: pks,
syncer,
recv_effects,
#[cfg(feature = "afc")]
afc,
crypto,
seed_id_dir,
quic: Some(data),
})?;
Ok(Self {
sync_server,
manager,
api,
span,
})
}
.instrument(info_span!(parent: span_id, "load"))
.await
}
pub async fn spawn(self) -> Result<DaemonHandle, ready::WaitError> {
let _guard = self.span.enter();
let mut set = JoinSet::new();
let waiter = ready::Waiter::new(3);
set.spawn(
self.sync_server
.serve(waiter.notifier())
.instrument(info_span!("sync-server")),
);
set.spawn({
self.manager
.run(waiter.notifier())
.instrument(info_span!("syncer"))
});
set.spawn(
self.api
.serve(waiter.notifier())
.instrument(info_span!("api-server")),
);
waiter.wait().await?;
Ok(DaemonHandle { set })
}
async fn setup_env(cfg: &Config) -> Result<()> {
for dir in &[
&cfg.runtime_dir,
&cfg.state_dir,
&cfg.cache_dir,
&cfg.logs_dir,
&cfg.config_dir,
] {
if !dir.try_exists()? {
return Err(anyhow::anyhow!(
"directory does not exist: {}",
dir.display()
));
}
}
for (name, path) in [
("keystore", cfg.keystore_path()),
("storage", cfg.storage_path()),
] {
aranya_util::create_dir_all(&path)
.await
.with_context(|| format!("unable to create '{name}' directory"))?;
}
info!("created directories");
let uds_api_sock = cfg.uds_api_sock();
if let Err(err) = fs::remove_file(&uds_api_sock).await {
if err.kind() != io::ErrorKind::NotFound {
return Err(err).context(format!("unable to remove api socket {uds_api_sock:?}"));
}
}
info!("set up environment");
Ok(())
}
async fn setup_aranya(
cfg: &Config,
eng: CE,
store: AranyaStore<KS>,
pk: &PublicKeys<CS>,
psk_store: Arc<PskStore>,
server_addr: Addr,
client_addr: Addr,
) -> Result<(
Client,
SyncServer,
SyncManager<QuicConnector, PS, SP, EF>,
SyncHandle,
mpsc::Receiver<(GraphId, Vec<EF>)>,
)> {
let device_id = pk.ident_pk.id()?;
let client = Client::new(ClientState::new(
PS::new(POLICY_SOURCE, eng, store, device_id)?,
SP::new(
FileManager::new(cfg.storage_path()).context("unable to create `FileManager`")?,
),
));
let (send_effects, recv_effects) = mpsc::channel(256);
let (handle, recv) = SyncHandle::channel(128);
let (connector_pool, listener_pool) = ConnectionPool::new(32).split();
let listener = QuicListener::new(server_addr, Arc::clone(&psk_store), listener_pool)
.await
.context("unable to initialize QUIC sync listener")?;
let server = SyncServer::new(listener, client.clone(), handle.clone());
let connector =
QuicConnector::new(client_addr, server.local_addr(), connector_pool, psk_store)?;
let sync_client = SyncClient::new(client.clone(), connector, send_effects);
let manager = SyncManager::new(sync_client, recv)?;
Ok((client, server, manager, handle, recv_effects))
}
async fn load_crypto_engine(cfg: &Config) -> Result<CE> {
let key = load_or_gen_key(cfg.key_wrap_key_path()).await?;
Ok(CE::new(&key, Rng))
}
async fn load_aranya_keystore(cfg: &Config) -> Result<AranyaStore<KS>> {
let dir = cfg.aranya_keystore_path();
aranya_util::create_dir_all(&dir).await?;
KS::open(&dir)
.context("unable to open Aranya keystore")
.map(AranyaStore::new)
}
async fn load_local_keystore(cfg: &Config) -> Result<LocalStore<KS>> {
let dir = cfg.local_keystore_path();
aranya_util::create_dir_all(&dir).await?;
KS::open(&dir)
.context("unable to open local keystore")
.map(LocalStore::new)
}
async fn load_or_gen_public_keys<CE, KS>(
cfg: &Config,
eng: &CE,
store: &mut AranyaStore<KS>,
) -> Result<PublicKeys<CE::CS>>
where
CE: Engine,
KS: KeyStore,
{
let path = cfg.public_key_bundle_path();
let bundle = match try_read_cbor(&path).await? {
Some(bundle) => bundle,
None => {
let bundle = PublicKeyBundle::generate(eng, store)
.context("unable to generate key bundle")?;
info!("generated key bundle");
write_cbor(&path, &bundle)
.await
.context("unable to write `PublicKeyBundle` to disk")?;
bundle
}
};
bundle.public_keys(eng, store)
}
}
async fn try_read_cbor<T: DeserializeOwned>(path: impl AsRef<Path>) -> Result<Option<T>> {
match fs::read(path.as_ref()).await {
Ok(buf) => Ok(cbor::from_reader(&buf[..])?),
Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
Err(err) => Err(err.into()),
}
}
async fn write_cbor(path: impl AsRef<Path>, data: impl Serialize) -> Result<()> {
let mut buf = Vec::new();
cbor::into_writer(&data, &mut buf)?;
Ok(aranya_util::write_file(path, &buf).await?)
}
async fn load_or_gen_key<K: SecretKey>(path: impl AsRef<Path>) -> Result<K> {
async fn load_or_gen_key_inner<K: SecretKey>(path: &Path) -> Result<K> {
match fs::read(&path).await {
Ok(buf) => {
tracing::info!("loading key");
let key =
Import::import(buf.as_slice()).context("unable to import key from file")?;
Ok(key)
}
Err(err) if err.kind() == io::ErrorKind::NotFound => {
tracing::info!("generating key");
let key = K::random(Rng);
let bytes = key
.try_export_secret()
.context("unable to export new key")?;
aranya_util::write_file(&path, bytes.as_bytes())
.await
.context("unable to write key")?;
Ok(key)
}
Err(err) => Err(err).context("unable to read key"),
}
}
let path = path.as_ref();
load_or_gen_key_inner(path)
.instrument(info_span!("load_or_gen_key", ?path))
.await
.with_context(|| format!("load_or_gen_key({path:?})"))
}
#[cfg(test)]
mod tests {
#![allow(
clippy::arithmetic_side_effects,
clippy::expect_used,
clippy::panic,
clippy::indexing_slicing
)]
use std::time::Duration;
use aranya_util::Addr;
use tempfile::tempdir;
use test_log::test;
use tokio::time;
use super::*;
use crate::config::{QuicSyncConfig, SyncConfig, Toggle};
#[test(tokio::test)]
async fn test_daemon_run() {
let dir = tempdir().expect("should be able to create temp dir");
let work_dir = dir.path().join("work");
#[cfg(feature = "afc")]
let shm_path = {
let path = "/test_daemon_run\0"
.try_into()
.expect("should be able to parse AFC shared memory path");
let _ = aranya_fast_channels::shm::unlink(&path);
path
};
let any = Addr::new("localhost", 0).expect("should be able to create new Addr");
let cfg = Config {
name: "test-daemon-run".into(),
runtime_dir: work_dir.join("run"),
state_dir: work_dir.join("state"),
cache_dir: work_dir.join("cache"),
logs_dir: work_dir.join("logs"),
config_dir: work_dir.join("config"),
sync: SyncConfig {
quic: Toggle::Enabled(QuicSyncConfig {
addr: any,
client_addr: None,
}),
},
#[cfg(feature = "afc")]
afc: Toggle::Enabled(crate::config::AfcConfig {
shm_path,
max_chans: 100,
}),
};
for dir in [
&cfg.runtime_dir,
&cfg.state_dir,
&cfg.cache_dir,
&cfg.logs_dir,
&cfg.config_dir,
] {
aranya_util::create_dir_all(dir)
.await
.expect("should be able to create directory");
}
let daemon = Daemon::load(cfg)
.await
.expect("should be able to load `Daemon`");
time::timeout(
Duration::from_secs(1),
daemon.spawn().await.expect("startup").join(),
)
.await
.expect_err("`Timeout` should return Elapsed");
}
}