1use std::{io, path::Path, sync::Arc};
2
3use anyhow::{Context, Result};
4use aranya_crypto::{
5 dangerous::spideroak_crypto::{import::Import, keys::SecretKey},
6 default::DefaultEngine,
7 keystore::{fs_keystore::Store, KeyStore},
8 Engine, Rng,
9};
10use aranya_keygen::{PublicKeyBundle, PublicKeys};
11use aranya_runtime::{
12 storage::linear::{libc::FileManager, LinearStorageProvider},
13 ClientState, GraphId,
14};
15use aranya_util::{ready, Addr};
16use buggy::{bug, Bug, BugExt};
17use ciborium as cbor;
18use serde::{de::DeserializeOwned, Serialize};
19use tokio::{fs, sync::mpsc, task::JoinSet};
20use tracing::{error, info, info_span, Instrument as _};
21
22#[cfg(feature = "afc")]
23use crate::afc::Afc;
24use crate::{
25 api::{self, ApiKey, DaemonApiServer, DaemonApiServerArgs, QSData},
26 aranya,
27 config::{Config, Toggle},
28 keystore::{AranyaStore, LocalStore},
29 policy,
30 sync::{
31 quic::{PskStore, QuicState, SyncParams},
32 SyncHandle, SyncManager,
33 },
34 util::{load_team_psk_pairs, SeedDir},
35 vm_policy::{PolicyEngine, POLICY_SOURCE},
36};
37
// Short aliases for the concrete types the daemon is instantiated with.

/// Crypto engine: [`DefaultEngine`].
pub(crate) type CE = DefaultEngine;
/// Cipher suite associated with [`DefaultEngine`].
pub(crate) type CS = <DefaultEngine as Engine>::CS;
/// Key store: the `fs_keystore` [`Store`].
pub(crate) type KS = Store;
/// Policy engine, parameterized by the crypto engine and key store above.
pub(crate) type PS = PolicyEngine<CE, KS>;
/// Storage provider: [`LinearStorageProvider`] backed by [`FileManager`].
pub(crate) type SP = LinearStorageProvider<FileManager>;
/// Effect type emitted by the policy.
pub(crate) type EF = policy::Effect;

/// Aranya client specialized to the daemon's policy engine and storage provider.
pub(crate) type Client = aranya::Client<PS, SP>;
/// QUIC sync server specialized to the daemon's policy engine and storage provider.
pub(crate) type SyncServer = crate::sync::quic::Server<PS, SP>;
54
/// Handle to the tasks that make up a running daemon.
///
/// Returned by [`Daemon::spawn`]; use [`DaemonHandle::join`] to wait on the
/// tasks.
// NOTE(review): tokio documents that dropping a `JoinSet` aborts its tasks, so
// dropping this handle presumably stops the daemon, which is likely why the
// type is marked as having a significant drop. Confirm against tokio's docs.
#[clippy::has_significant_drop]
#[derive(Debug)]
pub struct DaemonHandle {
    /// The tasks spawned by [`Daemon::spawn`].
    set: JoinSet<()>,
}
63
64impl DaemonHandle {
65 pub async fn join(mut self) -> Result<(), Bug> {
69 match self.set.join_next().await.assume("set not empty")? {
70 Ok(()) => {}
71 Err(err) if err.is_panic() => std::panic::resume_unwind(err.into_panic()),
72 Err(err) => {
73 error!(error = %err, "tasks unexpectedly cancelled");
74 bug!("tasks cancelled");
75 }
76 }
77 self.set.abort_all();
78 Ok(())
79 }
80}
81
/// The daemon: the components built by [`Daemon::load`] and run by
/// [`Daemon::spawn`].
#[derive(Debug)]
pub struct Daemon {
    /// QUIC sync server, run as the "sync-server" task.
    sync_server: SyncServer,
    /// Sync manager, run as the "syncer" task.
    manager: SyncManager<QuicState, PS, SP, EF>,
    /// API server, run as the "api-server" task.
    api: DaemonApiServer,
    /// The "daemon" tracing span created in [`Daemon::load`].
    span: tracing::Span,
}
90
91impl Daemon {
    /// Loads a daemon from `cfg` without starting any of its tasks.
    ///
    /// Sets up the environment, opens both key stores, obtains the crypto
    /// engine and the device's public keys, writes a freshly generated API
    /// public key to `cfg.api_pk_path()`, and constructs the Aranya client,
    /// QUIC sync server, sync manager, and API server.
    ///
    /// # Errors
    ///
    /// Fails if QUIC sync is not enabled in `cfg`, if (with the `afc`
    /// feature) AFC is not enabled in `cfg`, or if any setup step fails.
    pub async fn load(cfg: Config) -> Result<Self> {
        // `name` is `None` when the configured name is empty.
        let name = (!cfg.name.is_empty()).then_some(cfg.name.as_str());
        let span = info_span!("daemon", name);
        let span_id = span.id();

        // `span` itself is moved into the returned `Daemon`; the work below
        // runs inside a "load" span whose parent is the daemon span.
        async move {
            // QUIC sync is the only sync transport handled here.
            let Toggle::Enabled(qs_config) = &cfg.sync.quic else {
                anyhow::bail!("Supply a valid QUIC sync config")
            };
            // Without an explicit client address, use the server's host with
            // port 0.
            let qs_client_addr = match qs_config.client_addr {
                None => Addr::new(qs_config.addr.host(), 0)?,
                Some(v) => v,
            };

            Self::setup_env(&cfg).await?;
            let mut aranya_store = Self::load_aranya_keystore(&cfg).await?;
            let eng = Self::load_crypto_engine(&cfg).await?;
            let pks = Self::load_or_gen_public_keys(&cfg, &eng, &mut aranya_store).await?;

            let mut local_store = Self::load_local_keystore(&cfg).await?;

            // A new API key is generated on every load; only its public half
            // is written to disk.
            let api_sk = ApiKey::generate(&eng);
            aranya_util::write_file(cfg.api_pk_path(), &api_sk.public()?.encode()?)
                .await
                .context("unable to write API public key")?;
            info!(path = %cfg.api_pk_path().display(), "wrote API public key");

            // Seed the PSK store with the team PSK pairs loaded via the seed
            // directory and the local key store.
            let seed_id_dir = SeedDir::new(cfg.seed_id_path().to_path_buf()).await?;
            let initial_keys = load_team_psk_pairs(&eng, &mut local_store, &seed_id_dir).await?;
            let psk_store = Arc::new(PskStore::new(initial_keys));

            let (client, sync_server, manager, syncer, recv_effects) = Self::setup_aranya(
                &cfg,
                eng.clone(),
                aranya_store
                    .try_clone()
                    .context("unable to clone keystore")?,
                &pks,
                SyncParams {
                    psk_store: Arc::clone(&psk_store),
                    server_addr: qs_config.addr,
                },
                qs_client_addr,
            )
            .await?;

            // With the `afc` feature compiled in, AFC must also be enabled in
            // the config.
            #[cfg(feature = "afc")]
            let afc = {
                let Toggle::Enabled(afc_cfg) = &cfg.afc else {
                    anyhow::bail!(
                        "AFC is currently required, set `afc.enable = true` in daemon config."
                    )
                };
                Afc::new(
                    client.clone(),
                    eng.clone(),
                    pks.ident_pk.id()?,
                    aranya_store
                        .try_clone()
                        .context("unable to clone keystore")?,
                    afc_cfg.clone(),
                )?
            };

            let data = QSData { psk_store };

            // The API server takes ownership of the engine and both key stores.
            let crypto = api::Crypto {
                engine: eng,
                local_store,
                aranya_store,
            };

            let api = DaemonApiServer::new(DaemonApiServerArgs {
                client,
                local_addr: sync_server.local_addr(),
                uds_path: cfg.uds_api_sock(),
                sk: api_sk,
                pk: pks,
                syncer,
                recv_effects,
                #[cfg(feature = "afc")]
                afc,
                crypto,
                seed_id_dir,
                quic: Some(data),
            })?;
            Ok(Self {
                sync_server,
                manager,
                api,
                span,
            })
        }
        .instrument(info_span!(parent: span_id, "load"))
        .await
    }
193
194 pub async fn spawn(self) -> Result<DaemonHandle, ready::WaitError> {
196 let _guard = self.span.enter();
197 let mut set = JoinSet::new();
198 let waiter = ready::Waiter::new(3);
199 set.spawn(
200 self.sync_server
201 .serve(waiter.notifier())
202 .instrument(info_span!("sync-server")),
203 );
204 set.spawn({
205 self.manager
206 .run(waiter.notifier())
207 .instrument(info_span!("syncer"))
208 });
209 set.spawn(
210 self.api
211 .serve(waiter.notifier())
212 .instrument(info_span!("api-server")),
213 );
214 waiter.wait().await?;
215 Ok(DaemonHandle { set })
216 }
217
218 async fn setup_env(cfg: &Config) -> Result<()> {
220 for dir in &[
222 &cfg.runtime_dir,
223 &cfg.state_dir,
224 &cfg.cache_dir,
225 &cfg.logs_dir,
226 &cfg.config_dir,
227 ] {
228 if !dir.try_exists()? {
229 return Err(anyhow::anyhow!(
230 "directory does not exist: {}",
231 dir.display()
232 ));
233 }
234 }
235
236 for (name, path) in [
238 ("keystore", cfg.keystore_path()),
239 ("storage", cfg.storage_path()),
240 ] {
241 aranya_util::create_dir_all(&path)
242 .await
243 .with_context(|| format!("unable to create '{name}' directory"))?;
244 }
245 info!("created directories");
246
247 let uds_api_sock = cfg.uds_api_sock();
250 if let Err(err) = fs::remove_file(&uds_api_sock).await {
251 if err.kind() != io::ErrorKind::NotFound {
252 return Err(err).context(format!("unable to remove api socket {uds_api_sock:?}"));
253 }
254 }
255
256 info!("set up environment");
257 Ok(())
258 }
259
260 async fn setup_aranya(
262 cfg: &Config,
263 eng: CE,
264 store: AranyaStore<KS>,
265 pk: &PublicKeys<CS>,
266 SyncParams {
267 psk_store,
268 server_addr,
269 }: SyncParams,
270 client_addr: Addr,
271 ) -> Result<(
272 Client,
273 SyncServer,
274 SyncManager<QuicState, PS, SP, EF>,
275 SyncHandle,
276 mpsc::Receiver<(GraphId, Vec<EF>)>,
277 )> {
278 let device_id = pk.ident_pk.id()?;
279
280 let client = Client::new(ClientState::new(
281 PS::new(POLICY_SOURCE, eng, store, device_id)?,
282 SP::new(
283 FileManager::new(cfg.storage_path()).context("unable to create `FileManager`")?,
284 ),
285 ));
286
287 let (send_effects, recv_effects) = mpsc::channel(256);
289
290 let (server, peers, conns, syncer_recv) =
292 SyncServer::new(client.clone(), &server_addr, Arc::clone(&psk_store))
293 .await
294 .context("unable to initialize QUIC sync server")?;
295
296 let syncer = SyncManager::new(
298 client.clone(),
299 send_effects,
300 psk_store,
301 (server.local_addr().into(), client_addr),
302 syncer_recv,
303 conns,
304 )?;
305
306 Ok((client, server, syncer, peers, recv_effects))
307 }
308
309 async fn load_crypto_engine(cfg: &Config) -> Result<CE> {
311 let key = load_or_gen_key(cfg.key_wrap_key_path()).await?;
312 Ok(CE::new(&key, Rng))
313 }
314
315 async fn load_aranya_keystore(cfg: &Config) -> Result<AranyaStore<KS>> {
319 let dir = cfg.aranya_keystore_path();
320 aranya_util::create_dir_all(&dir).await?;
321 KS::open(&dir)
322 .context("unable to open Aranya keystore")
323 .map(AranyaStore::new)
324 }
325
326 async fn load_local_keystore(cfg: &Config) -> Result<LocalStore<KS>> {
331 let dir = cfg.local_keystore_path();
332 aranya_util::create_dir_all(&dir).await?;
333 KS::open(&dir)
334 .context("unable to open local keystore")
335 .map(LocalStore::new)
336 }
337
338 async fn load_or_gen_public_keys<CE, KS>(
340 cfg: &Config,
341 eng: &CE,
342 store: &mut AranyaStore<KS>,
343 ) -> Result<PublicKeys<CE::CS>>
344 where
345 CE: Engine,
346 KS: KeyStore,
347 {
348 let path = cfg.public_key_bundle_path();
349 let bundle = match try_read_cbor(&path).await? {
350 Some(bundle) => bundle,
351 None => {
352 let bundle = PublicKeyBundle::generate(eng, store)
353 .context("unable to generate key bundle")?;
354 info!("generated key bundle");
355 write_cbor(&path, &bundle)
356 .await
357 .context("unable to write `PublicKeyBundle` to disk")?;
358 bundle
359 }
360 };
361 bundle.public_keys(eng, store)
362 }
363}
364
365async fn try_read_cbor<T: DeserializeOwned>(path: impl AsRef<Path>) -> Result<Option<T>> {
367 match fs::read(path.as_ref()).await {
368 Ok(buf) => Ok(cbor::from_reader(&buf[..])?),
369 Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
370 Err(err) => Err(err.into()),
371 }
372}
373
374async fn write_cbor(path: impl AsRef<Path>, data: impl Serialize) -> Result<()> {
376 let mut buf = Vec::new();
377 cbor::into_writer(&data, &mut buf)?;
378 Ok(aranya_util::write_file(path, &buf).await?)
379}
380
381async fn load_or_gen_key<K: SecretKey>(path: impl AsRef<Path>) -> Result<K> {
383 async fn load_or_gen_key_inner<K: SecretKey>(path: &Path) -> Result<K> {
384 match fs::read(&path).await {
385 Ok(buf) => {
386 tracing::info!("loading key");
387 let key =
388 Import::import(buf.as_slice()).context("unable to import key from file")?;
389 Ok(key)
390 }
391 Err(err) if err.kind() == io::ErrorKind::NotFound => {
392 tracing::info!("generating key");
393 let key = K::random(Rng);
394 let bytes = key
395 .try_export_secret()
396 .context("unable to export new key")?;
397 aranya_util::write_file(&path, bytes.as_bytes())
398 .await
399 .context("unable to write key")?;
400 Ok(key)
401 }
402 Err(err) => Err(err).context("unable to read key"),
403 }
404 }
405 let path = path.as_ref();
406 load_or_gen_key_inner(path)
407 .instrument(info_span!("load_or_gen_key", ?path))
408 .await
409 .with_context(|| format!("load_or_gen_key({path:?})"))
410}
411
#[cfg(test)]
mod tests {
    #![allow(
        clippy::arithmetic_side_effects,
        clippy::expect_used,
        clippy::panic,
        clippy::indexing_slicing
    )]

    use std::time::Duration;

    use aranya_util::Addr;
    use tempfile::tempdir;
    use test_log::test;
    use tokio::time;

    use super::*;
    use crate::config::{QuicSyncConfig, SyncConfig, Toggle};

    /// Smoke test: a daemon loaded from a minimal config starts up and is
    /// still running one second later.
    #[test(tokio::test)]
    async fn test_daemon_run() {
        let tmp = tempdir().expect("should be able to create temp dir");
        let root = tmp.path().join("work");

        // Clear any shared memory left over under the same name.
        #[cfg(feature = "afc")]
        let shm_path = {
            let path = "/test_daemon_run\0"
                .try_into()
                .expect("should be able to parse AFC shared memory path");
            let _ = aranya_fast_channels::shm::unlink(&path);
            path
        };

        let any_addr = Addr::new("localhost", 0).expect("should be able to create new Addr");
        let quic = QuicSyncConfig {
            addr: any_addr,
            client_addr: None,
        };
        let cfg = Config {
            name: "test-daemon-run".into(),
            runtime_dir: root.join("run"),
            state_dir: root.join("state"),
            cache_dir: root.join("cache"),
            logs_dir: root.join("logs"),
            config_dir: root.join("config"),
            sync: SyncConfig {
                quic: Toggle::Enabled(quic),
            },
            #[cfg(feature = "afc")]
            afc: Toggle::Enabled(crate::config::AfcConfig {
                shm_path,
                max_chans: 100,
            }),
        };

        // The daemon expects these directories to exist already.
        let required_dirs = [
            &cfg.runtime_dir,
            &cfg.state_dir,
            &cfg.cache_dir,
            &cfg.logs_dir,
            &cfg.config_dir,
        ];
        for path in required_dirs {
            aranya_util::create_dir_all(path)
                .await
                .expect("should be able to create directory");
        }

        let daemon = Daemon::load(cfg)
            .await
            .expect("should be able to load `Daemon`");
        let handle = daemon.spawn().await.expect("startup");

        // `join` only returns once a task exits, so a healthy daemon makes
        // the timeout elapse.
        let outcome = time::timeout(Duration::from_secs(1), handle.join()).await;
        outcome.expect_err("`Timeout` should return Elapsed");
    }
}