atuin_daemon/daemon.rs
1//! Core daemon infrastructure.
2//!
3//! This module provides the foundational types for building the atuin daemon:
4//!
5//! - [`DaemonState`]: Shared state owned by the daemon
6//! - [`DaemonHandle`]: A lightweight, cloneable handle for accessing daemon state
7//! - [`Component`]: A trait for implementing daemon components
8//! - [`Daemon`]: The main daemon orchestrator
9//! - [`DaemonBuilder`]: Builder for constructing and configuring the daemon
10
11use std::sync::Arc;
12
13use atuin_client::{
14 database::Sqlite as HistoryDatabase, encryption, record::sqlite_store::SqliteStore,
15 settings::Settings,
16};
17use eyre::{Context, Result};
18use tokio::sync::{RwLock, broadcast};
19
20use crate::events::DaemonEvent;
21
22// ============================================================================
23// DaemonState
24// ============================================================================
25
26/// Shared state owned by the daemon.
27///
28/// This contains all the resources that components and services need access to.
29/// The state is wrapped in an `Arc` and accessed via [`DaemonHandle`].
30pub struct DaemonState {
31 // Event bus
32 event_tx: broadcast::Sender<DaemonEvent>,
33
34 // Configuration (mutable - can be reloaded)
35 settings: RwLock<Settings>,
36
37 // Encryption key (immutable - derived at startup)
38 encryption_key: [u8; 32],
39
40 // Database handles
41 history_db: HistoryDatabase,
42 store: SqliteStore,
43}
44
45// ============================================================================
46// DaemonHandle
47// ============================================================================
48
49/// A lightweight handle to the daemon's shared state.
50///
51/// This is the primary way for components, gRPC services, and spawned tasks to
52/// interact with the daemon. It provides access to:
53///
54/// - Event emission and subscription
55/// - Configuration (settings, encryption key)
56/// - Database handles
57///
58/// The handle is cheaply cloneable (wraps an `Arc`) and can be freely passed
59/// around to any code that needs daemon access.
60///
61/// # Example
62///
63/// ```ignore
64/// // Emit an event
65/// handle.emit(DaemonEvent::HistoryPruned);
66///
67/// // Access settings
68/// let settings = handle.settings().await;
69/// let sync_freq = settings.daemon.sync_frequency;
70///
71/// // Access database
72/// let history = handle.history_db().load(id).await?;
73/// ```
74#[derive(Clone)]
75pub struct DaemonHandle {
76 state: Arc<DaemonState>,
77}
78
79impl DaemonHandle {
80 // ---- Events ----
81
82 /// Emit an event to the daemon's event bus.
83 ///
84 /// This is fire-and-forget - if no receivers are listening (which shouldn't
85 /// happen in normal operation), the event is dropped silently.
86 pub fn emit(&self, event: DaemonEvent) {
87 if let Err(e) = self.state.event_tx.send(event) {
88 tracing::warn!("failed to emit event (no receivers?): {e}");
89 }
90 }
91
92 /// Subscribe to the event bus.
93 ///
94 /// Returns a receiver that will receive all events emitted after this call.
95 /// Useful for components that need to listen for events outside of the
96 /// normal `handle_event` callback flow.
97 pub fn subscribe(&self) -> broadcast::Receiver<DaemonEvent> {
98 self.state.event_tx.subscribe()
99 }
100
101 /// Request graceful shutdown of the daemon.
102 pub fn shutdown(&self) {
103 self.emit(DaemonEvent::ShutdownRequested);
104 }
105
106 // ---- Configuration ----
107
108 /// Get the current settings.
109 ///
110 /// This acquires a read lock on the settings. For most use cases, clone
111 /// the settings if you need to hold onto them.
112 pub async fn settings(&self) -> tokio::sync::RwLockReadGuard<'_, Settings> {
113 self.state.settings.read().await
114 }
115
116 /// Reload settings from disk and emit a SettingsReloaded event.
117 ///
118 /// Components listening for `SettingsReloaded` can then re-read settings
119 /// via `handle.settings()` to pick up the changes.
120 pub async fn reload_settings(&self) -> Result<()> {
121 let new_settings = Settings::new()?;
122 self.apply_settings(new_settings).await;
123 Ok(())
124 }
125
126 /// Apply already-loaded settings and emit a SettingsReloaded event.
127 ///
128 /// Use this when settings have already been loaded (e.g., from a file watcher)
129 /// to avoid parsing the config file twice.
130 pub async fn apply_settings(&self, settings: Settings) {
131 *self.state.settings.write().await = settings;
132 self.emit(DaemonEvent::SettingsReloaded);
133 tracing::info!("settings applied");
134 }
135
136 /// Get the encryption key.
137 pub fn encryption_key(&self) -> &[u8; 32] {
138 &self.state.encryption_key
139 }
140
141 // ---- Database ----
142
143 /// Get a reference to the history database.
144 pub fn history_db(&self) -> &HistoryDatabase {
145 &self.state.history_db
146 }
147
148 /// Get a reference to the record store.
149 pub fn store(&self) -> &SqliteStore {
150 &self.state.store
151 }
152}
153
154impl std::fmt::Debug for DaemonHandle {
155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 f.debug_struct("DaemonHandle").finish_non_exhaustive()
157 }
158}
159
160// ============================================================================
161// Component Trait
162// ============================================================================
163
164/// A daemon component that handles a specific domain.
165///
166/// Components are the building blocks of the daemon. Each component:
167///
168/// - Has a unique name for logging and debugging
169/// - Can optionally expose gRPC services
170/// - Receives a [`DaemonHandle`] on startup for accessing daemon resources
171/// - Handles events from the event bus
172/// - Performs cleanup on shutdown
173///
174/// # Lifecycle
175///
176/// 1. **Construction**: Component is created (usually via `new()`)
177/// 2. **Start**: `start()` is called with a [`DaemonHandle`]
178/// 3. **Running**: `handle_event()` is called for each event on the bus
179/// 4. **Shutdown**: `stop()` is called for cleanup
180///
181/// # Example
182///
183/// ```ignore
184/// pub struct MyComponent {
185/// handle: Option<DaemonHandle>,
186/// }
187///
188/// #[async_trait]
189/// impl Component for MyComponent {
190/// fn name(&self) -> &'static str { "my-component" }
191///
192/// async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
193/// self.handle = Some(handle);
194/// Ok(())
195/// }
196///
197/// async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
198/// match event {
199/// DaemonEvent::SomeEvent => {
200/// // Handle the event
201/// if let Some(handle) = &self.handle {
202/// handle.emit(DaemonEvent::ResponseEvent);
203/// }
204/// }
205/// _ => {}
206/// }
207/// Ok(())
208/// }
209///
210/// async fn stop(&mut self) -> Result<()> {
211/// Ok(())
212/// }
213/// }
214/// ```
215#[tonic::async_trait]
216pub trait Component: Send + Sync {
217 /// Human-readable name for logging and debugging.
218 fn name(&self) -> &'static str;
219
220 /// Called once at startup.
221 ///
222 /// Store the handle if you need to emit events or access daemon resources
223 /// later. The handle is cheaply cloneable, so feel free to clone it for
224 /// spawned tasks.
225 async fn start(&mut self, handle: DaemonHandle) -> Result<()>;
226
227 /// Handle an incoming event.
228 ///
229 /// Called for every event on the bus. To emit new events in response,
230 /// use the handle stored during `start()`. Events emitted here will be
231 /// processed in subsequent event loop iterations.
232 async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()>;
233
234 /// Called on graceful shutdown.
235 ///
236 /// Use this to clean up resources, abort spawned tasks, etc.
237 async fn stop(&mut self) -> Result<()>;
238}
239
240// ============================================================================
241// Daemon
242// ============================================================================
243
244/// The main daemon orchestrator.
245///
246/// The daemon manages components, runs the event loop, and coordinates startup
247/// and shutdown. It is constructed via [`DaemonBuilder`].
248///
249/// # Event Loop
250///
251/// The daemon runs a simple event loop:
252///
253/// 1. Wait for an event on the bus
254/// 2. Dispatch the event to all components (in registration order)
255/// 3. Components may emit new events in response
256/// 4. Repeat until `ShutdownRequested` is received
257///
258/// Events emitted during handling are queued and processed in subsequent
259/// iterations, ensuring the loop eventually drains.
260pub struct Daemon {
261 components: Vec<Box<dyn Component>>,
262 handle: DaemonHandle,
263}
264
265impl Daemon {
266 /// Create a new daemon builder.
267 pub fn builder(settings: Settings) -> DaemonBuilder {
268 DaemonBuilder::new(settings)
269 }
270
271 /// Get a clone of the daemon handle.
272 ///
273 /// The handle can be used to emit events, access settings, etc.
274 pub fn handle(&self) -> DaemonHandle {
275 self.handle.clone()
276 }
277
278 /// Start all components.
279 ///
280 /// This must be called before `run_event_loop()`. It initializes all
281 /// registered components with the daemon handle.
282 pub async fn start_components(&mut self) -> Result<()> {
283 for component in &mut self.components {
284 tracing::info!(component = component.name(), "starting component");
285 component
286 .start(self.handle.clone())
287 .await
288 .with_context(|| format!("failed to start component: {}", component.name()))?;
289 }
290 Ok(())
291 }
292
293 /// Run the daemon event loop.
294 ///
295 /// This processes events until a ShutdownRequested event is received.
296 /// Components must be started first via `start_components()`.
297 pub async fn run_event_loop(&mut self) -> Result<()> {
298 let mut event_rx = self.handle.subscribe();
299 loop {
300 match event_rx.recv().await {
301 Ok(DaemonEvent::ShutdownRequested) => {
302 tracing::info!("shutdown requested, stopping daemon");
303 break;
304 }
305 Ok(event) => {
306 tracing::debug!(?event, "processing event");
307 self.dispatch_event(&event).await;
308 }
309 Err(broadcast::error::RecvError::Lagged(n)) => {
310 tracing::warn!(
311 skipped = n,
312 "event receiver lagged, some events were dropped"
313 );
314 }
315 Err(broadcast::error::RecvError::Closed) => {
316 tracing::info!("event bus closed, stopping daemon");
317 break;
318 }
319 }
320 }
321 Ok(())
322 }
323
324 /// Stop all components.
325 ///
326 /// This performs graceful shutdown of all components.
327 pub async fn stop_components(&mut self) {
328 for component in &mut self.components {
329 tracing::info!(component = component.name(), "stopping component");
330 if let Err(e) = component.stop().await {
331 tracing::error!(
332 component = component.name(),
333 error = ?e,
334 "error stopping component"
335 );
336 }
337 }
338 tracing::info!("all components stopped");
339 }
340
341 /// Run the daemon.
342 ///
343 /// This is a convenience method that starts components, runs the event loop,
344 /// and handles shutdown. It does not return until the daemon is shut down.
345 pub async fn run(mut self) -> Result<()> {
346 self.start_components().await?;
347 self.run_event_loop().await?;
348 self.stop_components().await;
349 tracing::info!("daemon stopped");
350 Ok(())
351 }
352
353 async fn dispatch_event(&mut self, event: &DaemonEvent) {
354 for component in &mut self.components {
355 if let Err(e) = component.handle_event(event).await {
356 tracing::error!(
357 component = component.name(),
358 error = ?e,
359 "error handling event"
360 );
361 }
362 }
363 }
364}
365
366// ============================================================================
367// DaemonBuilder
368// ============================================================================
369
370/// Builder for constructing a [`Daemon`].
371///
372/// # Example
373///
374/// ```ignore
375/// let daemon = Daemon::builder(settings)
376/// .store(store)
377/// .history_db(history_db)
378/// .component(HistoryComponent::new())
379/// .component(SearchComponent::new())
380/// .component(SyncComponent::new())
381/// .build()
382/// .await?;
383///
384/// daemon.run().await?;
385/// ```
386pub struct DaemonBuilder {
387 settings: Settings,
388 store: Option<SqliteStore>,
389 history_db: Option<HistoryDatabase>,
390 components: Vec<Box<dyn Component>>,
391}
392
393impl DaemonBuilder {
394 /// Create a new daemon builder with the given settings.
395 pub fn new(settings: Settings) -> Self {
396 Self {
397 settings,
398 store: None,
399 history_db: None,
400 components: Vec::new(),
401 }
402 }
403
404 /// Set the record store.
405 pub fn store(mut self, store: SqliteStore) -> Self {
406 self.store = Some(store);
407 self
408 }
409
410 /// Set the history database.
411 pub fn history_db(mut self, db: HistoryDatabase) -> Self {
412 self.history_db = Some(db);
413 self
414 }
415
416 /// Register a component.
417 ///
418 /// Components are started in registration order and stopped in reverse order.
419 pub fn component(mut self, component: impl Component + 'static) -> Self {
420 self.components.push(Box::new(component));
421 self
422 }
423
424 /// Build the daemon.
425 ///
426 /// This loads the encryption key and creates the daemon state.
427 pub async fn build(self) -> Result<Daemon> {
428 let store = self.store.ok_or_else(|| eyre::eyre!("store is required"))?;
429 let history_db = self
430 .history_db
431 .ok_or_else(|| eyre::eyre!("history_db is required"))?;
432
433 // Load encryption key
434 let encryption_key: [u8; 32] = encryption::load_key(&self.settings)
435 .context("could not load encryption key")?
436 .into();
437
438 // Create the event bus
439 let (event_tx, _) = broadcast::channel(64);
440
441 // Create the shared state
442 let state = Arc::new(DaemonState {
443 event_tx,
444 settings: RwLock::new(self.settings),
445 encryption_key,
446 history_db,
447 store,
448 });
449
450 // Create the handle (just a reference to the state)
451 let handle = DaemonHandle { state };
452
453 Ok(Daemon {
454 components: self.components,
455 handle,
456 })
457 }
458}