Skip to main content

atuin_daemon/
daemon.rs

1//! Core daemon infrastructure.
2//!
3//! This module provides the foundational types for building the atuin daemon:
4//!
5//! - [`DaemonState`]: Shared state owned by the daemon
6//! - [`DaemonHandle`]: A lightweight, cloneable handle for accessing daemon state
7//! - [`Component`]: A trait for implementing daemon components
8//! - [`Daemon`]: The main daemon orchestrator
9//! - [`DaemonBuilder`]: Builder for constructing and configuring the daemon
10
11use std::sync::Arc;
12
13use atuin_client::{
14    database::Sqlite as HistoryDatabase, encryption, record::sqlite_store::SqliteStore,
15    settings::Settings,
16};
17use eyre::{Context, Result};
18use tokio::sync::{RwLock, broadcast};
19
20use crate::events::DaemonEvent;
21
22// ============================================================================
23// DaemonState
24// ============================================================================
25
26/// Shared state owned by the daemon.
27///
28/// This contains all the resources that components and services need access to.
29/// The state is wrapped in an `Arc` and accessed via [`DaemonHandle`].
30pub struct DaemonState {
31    // Event bus
32    event_tx: broadcast::Sender<DaemonEvent>,
33
34    // Configuration (mutable - can be reloaded)
35    settings: RwLock<Settings>,
36
37    // Encryption key (immutable - derived at startup)
38    encryption_key: [u8; 32],
39
40    // Database handles
41    history_db: HistoryDatabase,
42    store: SqliteStore,
43}
44
45// ============================================================================
46// DaemonHandle
47// ============================================================================
48
49/// A lightweight handle to the daemon's shared state.
50///
51/// This is the primary way for components, gRPC services, and spawned tasks to
52/// interact with the daemon. It provides access to:
53///
54/// - Event emission and subscription
55/// - Configuration (settings, encryption key)
56/// - Database handles
57///
58/// The handle is cheaply cloneable (wraps an `Arc`) and can be freely passed
59/// around to any code that needs daemon access.
60///
61/// # Example
62///
63/// ```ignore
64/// // Emit an event
65/// handle.emit(DaemonEvent::HistoryPruned);
66///
67/// // Access settings
68/// let settings = handle.settings().await;
69/// let sync_freq = settings.daemon.sync_frequency;
70///
71/// // Access database
72/// let history = handle.history_db().load(id).await?;
73/// ```
74#[derive(Clone)]
75pub struct DaemonHandle {
76    state: Arc<DaemonState>,
77}
78
79impl DaemonHandle {
80    // ---- Events ----
81
82    /// Emit an event to the daemon's event bus.
83    ///
84    /// This is fire-and-forget - if no receivers are listening (which shouldn't
85    /// happen in normal operation), the event is dropped silently.
86    pub fn emit(&self, event: DaemonEvent) {
87        if let Err(e) = self.state.event_tx.send(event) {
88            tracing::warn!("failed to emit event (no receivers?): {e}");
89        }
90    }
91
92    /// Subscribe to the event bus.
93    ///
94    /// Returns a receiver that will receive all events emitted after this call.
95    /// Useful for components that need to listen for events outside of the
96    /// normal `handle_event` callback flow.
97    pub fn subscribe(&self) -> broadcast::Receiver<DaemonEvent> {
98        self.state.event_tx.subscribe()
99    }
100
101    /// Request graceful shutdown of the daemon.
102    pub fn shutdown(&self) {
103        self.emit(DaemonEvent::ShutdownRequested);
104    }
105
106    // ---- Configuration ----
107
108    /// Get the current settings.
109    ///
110    /// This acquires a read lock on the settings. For most use cases, clone
111    /// the settings if you need to hold onto them.
112    pub async fn settings(&self) -> tokio::sync::RwLockReadGuard<'_, Settings> {
113        self.state.settings.read().await
114    }
115
116    /// Reload settings from disk and emit a SettingsReloaded event.
117    ///
118    /// Components listening for `SettingsReloaded` can then re-read settings
119    /// via `handle.settings()` to pick up the changes.
120    pub async fn reload_settings(&self) -> Result<()> {
121        let new_settings = Settings::new()?;
122        self.apply_settings(new_settings).await;
123        Ok(())
124    }
125
126    /// Apply already-loaded settings and emit a SettingsReloaded event.
127    ///
128    /// Use this when settings have already been loaded (e.g., from a file watcher)
129    /// to avoid parsing the config file twice.
130    pub async fn apply_settings(&self, settings: Settings) {
131        *self.state.settings.write().await = settings;
132        self.emit(DaemonEvent::SettingsReloaded);
133        tracing::info!("settings applied");
134    }
135
136    /// Get the encryption key.
137    pub fn encryption_key(&self) -> &[u8; 32] {
138        &self.state.encryption_key
139    }
140
141    // ---- Database ----
142
143    /// Get a reference to the history database.
144    pub fn history_db(&self) -> &HistoryDatabase {
145        &self.state.history_db
146    }
147
148    /// Get a reference to the record store.
149    pub fn store(&self) -> &SqliteStore {
150        &self.state.store
151    }
152}
153
154impl std::fmt::Debug for DaemonHandle {
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        f.debug_struct("DaemonHandle").finish_non_exhaustive()
157    }
158}
159
160// ============================================================================
161// Component Trait
162// ============================================================================
163
164/// A daemon component that handles a specific domain.
165///
166/// Components are the building blocks of the daemon. Each component:
167///
168/// - Has a unique name for logging and debugging
169/// - Can optionally expose gRPC services
170/// - Receives a [`DaemonHandle`] on startup for accessing daemon resources
171/// - Handles events from the event bus
172/// - Performs cleanup on shutdown
173///
174/// # Lifecycle
175///
176/// 1. **Construction**: Component is created (usually via `new()`)
177/// 2. **Start**: `start()` is called with a [`DaemonHandle`]
178/// 3. **Running**: `handle_event()` is called for each event on the bus
179/// 4. **Shutdown**: `stop()` is called for cleanup
180///
181/// # Example
182///
183/// ```ignore
184/// pub struct MyComponent {
185///     handle: Option<DaemonHandle>,
186/// }
187///
188/// #[async_trait]
189/// impl Component for MyComponent {
190///     fn name(&self) -> &'static str { "my-component" }
191///
192///     async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
193///         self.handle = Some(handle);
194///         Ok(())
195///     }
196///
197///     async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
198///         match event {
199///             DaemonEvent::SomeEvent => {
200///                 // Handle the event
201///                 if let Some(handle) = &self.handle {
202///                     handle.emit(DaemonEvent::ResponseEvent);
203///                 }
204///             }
205///             _ => {}
206///         }
207///         Ok(())
208///     }
209///
210///     async fn stop(&mut self) -> Result<()> {
211///         Ok(())
212///     }
213/// }
214/// ```
215#[tonic::async_trait]
216pub trait Component: Send + Sync {
217    /// Human-readable name for logging and debugging.
218    fn name(&self) -> &'static str;
219
220    /// Called once at startup.
221    ///
222    /// Store the handle if you need to emit events or access daemon resources
223    /// later. The handle is cheaply cloneable, so feel free to clone it for
224    /// spawned tasks.
225    async fn start(&mut self, handle: DaemonHandle) -> Result<()>;
226
227    /// Handle an incoming event.
228    ///
229    /// Called for every event on the bus. To emit new events in response,
230    /// use the handle stored during `start()`. Events emitted here will be
231    /// processed in subsequent event loop iterations.
232    async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()>;
233
234    /// Called on graceful shutdown.
235    ///
236    /// Use this to clean up resources, abort spawned tasks, etc.
237    async fn stop(&mut self) -> Result<()>;
238}
239
240// ============================================================================
241// Daemon
242// ============================================================================
243
244/// The main daemon orchestrator.
245///
246/// The daemon manages components, runs the event loop, and coordinates startup
247/// and shutdown. It is constructed via [`DaemonBuilder`].
248///
249/// # Event Loop
250///
251/// The daemon runs a simple event loop:
252///
253/// 1. Wait for an event on the bus
254/// 2. Dispatch the event to all components (in registration order)
255/// 3. Components may emit new events in response
256/// 4. Repeat until `ShutdownRequested` is received
257///
258/// Events emitted during handling are queued and processed in subsequent
259/// iterations, ensuring the loop eventually drains.
260pub struct Daemon {
261    components: Vec<Box<dyn Component>>,
262    handle: DaemonHandle,
263}
264
265impl Daemon {
266    /// Create a new daemon builder.
267    pub fn builder(settings: Settings) -> DaemonBuilder {
268        DaemonBuilder::new(settings)
269    }
270
271    /// Get a clone of the daemon handle.
272    ///
273    /// The handle can be used to emit events, access settings, etc.
274    pub fn handle(&self) -> DaemonHandle {
275        self.handle.clone()
276    }
277
278    /// Start all components.
279    ///
280    /// This must be called before `run_event_loop()`. It initializes all
281    /// registered components with the daemon handle.
282    pub async fn start_components(&mut self) -> Result<()> {
283        for component in &mut self.components {
284            tracing::info!(component = component.name(), "starting component");
285            component
286                .start(self.handle.clone())
287                .await
288                .with_context(|| format!("failed to start component: {}", component.name()))?;
289        }
290        Ok(())
291    }
292
293    /// Run the daemon event loop.
294    ///
295    /// This processes events until a ShutdownRequested event is received.
296    /// Components must be started first via `start_components()`.
297    pub async fn run_event_loop(&mut self) -> Result<()> {
298        let mut event_rx = self.handle.subscribe();
299        loop {
300            match event_rx.recv().await {
301                Ok(DaemonEvent::ShutdownRequested) => {
302                    tracing::info!("shutdown requested, stopping daemon");
303                    break;
304                }
305                Ok(event) => {
306                    tracing::debug!(?event, "processing event");
307                    self.dispatch_event(&event).await;
308                }
309                Err(broadcast::error::RecvError::Lagged(n)) => {
310                    tracing::warn!(
311                        skipped = n,
312                        "event receiver lagged, some events were dropped"
313                    );
314                }
315                Err(broadcast::error::RecvError::Closed) => {
316                    tracing::info!("event bus closed, stopping daemon");
317                    break;
318                }
319            }
320        }
321        Ok(())
322    }
323
324    /// Stop all components.
325    ///
326    /// This performs graceful shutdown of all components.
327    pub async fn stop_components(&mut self) {
328        for component in &mut self.components {
329            tracing::info!(component = component.name(), "stopping component");
330            if let Err(e) = component.stop().await {
331                tracing::error!(
332                    component = component.name(),
333                    error = ?e,
334                    "error stopping component"
335                );
336            }
337        }
338        tracing::info!("all components stopped");
339    }
340
341    /// Run the daemon.
342    ///
343    /// This is a convenience method that starts components, runs the event loop,
344    /// and handles shutdown. It does not return until the daemon is shut down.
345    pub async fn run(mut self) -> Result<()> {
346        self.start_components().await?;
347        self.run_event_loop().await?;
348        self.stop_components().await;
349        tracing::info!("daemon stopped");
350        Ok(())
351    }
352
353    async fn dispatch_event(&mut self, event: &DaemonEvent) {
354        for component in &mut self.components {
355            if let Err(e) = component.handle_event(event).await {
356                tracing::error!(
357                    component = component.name(),
358                    error = ?e,
359                    "error handling event"
360                );
361            }
362        }
363    }
364}
365
366// ============================================================================
367// DaemonBuilder
368// ============================================================================
369
370/// Builder for constructing a [`Daemon`].
371///
372/// # Example
373///
374/// ```ignore
375/// let daemon = Daemon::builder(settings)
376///     .store(store)
377///     .history_db(history_db)
378///     .component(HistoryComponent::new())
379///     .component(SearchComponent::new())
380///     .component(SyncComponent::new())
381///     .build()
382///     .await?;
383///
384/// daemon.run().await?;
385/// ```
386pub struct DaemonBuilder {
387    settings: Settings,
388    store: Option<SqliteStore>,
389    history_db: Option<HistoryDatabase>,
390    components: Vec<Box<dyn Component>>,
391}
392
393impl DaemonBuilder {
394    /// Create a new daemon builder with the given settings.
395    pub fn new(settings: Settings) -> Self {
396        Self {
397            settings,
398            store: None,
399            history_db: None,
400            components: Vec::new(),
401        }
402    }
403
404    /// Set the record store.
405    pub fn store(mut self, store: SqliteStore) -> Self {
406        self.store = Some(store);
407        self
408    }
409
410    /// Set the history database.
411    pub fn history_db(mut self, db: HistoryDatabase) -> Self {
412        self.history_db = Some(db);
413        self
414    }
415
416    /// Register a component.
417    ///
418    /// Components are started in registration order and stopped in reverse order.
419    pub fn component(mut self, component: impl Component + 'static) -> Self {
420        self.components.push(Box::new(component));
421        self
422    }
423
424    /// Build the daemon.
425    ///
426    /// This loads the encryption key and creates the daemon state.
427    pub async fn build(self) -> Result<Daemon> {
428        let store = self.store.ok_or_else(|| eyre::eyre!("store is required"))?;
429        let history_db = self
430            .history_db
431            .ok_or_else(|| eyre::eyre!("history_db is required"))?;
432
433        // Load encryption key
434        let encryption_key: [u8; 32] = encryption::load_key(&self.settings)
435            .context("could not load encryption key")?
436            .into();
437
438        // Create the event bus
439        let (event_tx, _) = broadcast::channel(64);
440
441        // Create the shared state
442        let state = Arc::new(DaemonState {
443            event_tx,
444            settings: RwLock::new(self.settings),
445            encryption_key,
446            history_db,
447            store,
448        });
449
450        // Create the handle (just a reference to the state)
451        let handle = DaemonHandle { state };
452
453        Ok(Daemon {
454            components: self.components,
455            handle,
456        })
457    }
458}