use std::sync::Arc;
use atuin_client::{
database::Sqlite as HistoryDatabase, encryption, record::sqlite_store::SqliteStore,
settings::Settings,
};
use eyre::{Context, Result};
use tokio::sync::{RwLock, broadcast};
use crate::events::DaemonEvent;
pub struct DaemonState {
event_tx: broadcast::Sender<DaemonEvent>,
settings: RwLock<Settings>,
encryption_key: [u8; 32],
history_db: HistoryDatabase,
store: SqliteStore,
}
#[derive(Clone)]
pub struct DaemonHandle {
state: Arc<DaemonState>,
}
impl DaemonHandle {
pub fn emit(&self, event: DaemonEvent) {
if let Err(e) = self.state.event_tx.send(event) {
tracing::warn!("failed to emit event (no receivers?): {e}");
}
}
pub fn subscribe(&self) -> broadcast::Receiver<DaemonEvent> {
self.state.event_tx.subscribe()
}
pub fn shutdown(&self) {
self.emit(DaemonEvent::ShutdownRequested);
}
pub async fn settings(&self) -> tokio::sync::RwLockReadGuard<'_, Settings> {
self.state.settings.read().await
}
pub async fn reload_settings(&self) -> Result<()> {
let new_settings = Settings::new()?;
self.apply_settings(new_settings).await;
Ok(())
}
pub async fn apply_settings(&self, settings: Settings) {
*self.state.settings.write().await = settings;
self.emit(DaemonEvent::SettingsReloaded);
tracing::info!("settings applied");
}
pub fn encryption_key(&self) -> &[u8; 32] {
&self.state.encryption_key
}
pub fn history_db(&self) -> &HistoryDatabase {
&self.state.history_db
}
pub fn store(&self) -> &SqliteStore {
&self.state.store
}
}
impl std::fmt::Debug for DaemonHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DaemonHandle").finish_non_exhaustive()
}
}
#[tonic::async_trait]
pub trait Component: Send + Sync {
fn name(&self) -> &'static str;
async fn start(&mut self, handle: DaemonHandle) -> Result<()>;
async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()>;
async fn stop(&mut self) -> Result<()>;
}
pub struct Daemon {
components: Vec<Box<dyn Component>>,
handle: DaemonHandle,
}
impl Daemon {
pub fn builder(settings: Settings) -> DaemonBuilder {
DaemonBuilder::new(settings)
}
pub fn handle(&self) -> DaemonHandle {
self.handle.clone()
}
pub async fn start_components(&mut self) -> Result<()> {
for component in &mut self.components {
tracing::info!(component = component.name(), "starting component");
component
.start(self.handle.clone())
.await
.with_context(|| format!("failed to start component: {}", component.name()))?;
}
Ok(())
}
pub async fn run_event_loop(&mut self) -> Result<()> {
let mut event_rx = self.handle.subscribe();
loop {
match event_rx.recv().await {
Ok(DaemonEvent::ShutdownRequested) => {
tracing::info!("shutdown requested, stopping daemon");
break;
}
Ok(event) => {
tracing::debug!(?event, "processing event");
self.dispatch_event(&event).await;
}
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(
skipped = n,
"event receiver lagged, some events were dropped"
);
}
Err(broadcast::error::RecvError::Closed) => {
tracing::info!("event bus closed, stopping daemon");
break;
}
}
}
Ok(())
}
pub async fn stop_components(&mut self) {
for component in &mut self.components {
tracing::info!(component = component.name(), "stopping component");
if let Err(e) = component.stop().await {
tracing::error!(
component = component.name(),
error = ?e,
"error stopping component"
);
}
}
tracing::info!("all components stopped");
}
pub async fn run(mut self) -> Result<()> {
self.start_components().await?;
self.run_event_loop().await?;
self.stop_components().await;
tracing::info!("daemon stopped");
Ok(())
}
async fn dispatch_event(&mut self, event: &DaemonEvent) {
for component in &mut self.components {
if let Err(e) = component.handle_event(event).await {
tracing::error!(
component = component.name(),
error = ?e,
"error handling event"
);
}
}
}
}
pub struct DaemonBuilder {
settings: Settings,
store: Option<SqliteStore>,
history_db: Option<HistoryDatabase>,
components: Vec<Box<dyn Component>>,
}
impl DaemonBuilder {
pub fn new(settings: Settings) -> Self {
Self {
settings,
store: None,
history_db: None,
components: Vec::new(),
}
}
pub fn store(mut self, store: SqliteStore) -> Self {
self.store = Some(store);
self
}
pub fn history_db(mut self, db: HistoryDatabase) -> Self {
self.history_db = Some(db);
self
}
pub fn component(mut self, component: impl Component + 'static) -> Self {
self.components.push(Box::new(component));
self
}
pub async fn build(self) -> Result<Daemon> {
let store = self.store.ok_or_else(|| eyre::eyre!("store is required"))?;
let history_db = self
.history_db
.ok_or_else(|| eyre::eyre!("history_db is required"))?;
let encryption_key: [u8; 32] = encryption::load_key(&self.settings)
.context("could not load encryption key")?
.into();
let (event_tx, _) = broadcast::channel(64);
let state = Arc::new(DaemonState {
event_tx,
settings: RwLock::new(self.settings),
encryption_key,
history_db,
store,
});
let handle = DaemonHandle { state };
Ok(Daemon {
components: self.components,
handle,
})
}
}