Skip to main content

atuin_daemon/components/
history.rs

1//! History component.
2//!
3//! Handles command history lifecycle (start/end) and provides the History gRPC service.
4
5use std::{pin::Pin, sync::Arc};
6
7use atuin_client::{
8    database::Database,
9    history::{History, HistoryId, store::HistoryStore},
10    settings::Settings,
11};
12use dashmap::DashMap;
13use eyre::Result;
14use time::OffsetDateTime;
15use tokio_stream::Stream;
16use tonic::{Request, Response, Status};
17use tracing::{Level, instrument};
18
19use crate::{
20    daemon::{Component, DaemonHandle},
21    events::DaemonEvent,
22    history::{
23        EndHistoryReply, EndHistoryRequest, HistoryEntry, HistoryEventKind, ShutdownReply,
24        ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest,
25        TailHistoryReply, TailHistoryRequest,
26        history_server::{History as HistorySvc, HistoryServer},
27    },
28};
29
30const DAEMON_PROTOCOL_VERSION: u32 = 1;
31
32/// History component - manages command history lifecycle.
33///
34/// This component:
35/// - Tracks currently running commands (stored in memory)
36/// - Saves completed commands to the database and record store
37/// - Emits history events for other components (e.g., search indexing)
38/// - Provides the History gRPC service
39pub struct HistoryComponent {
40    inner: Arc<HistoryComponentInner>,
41}
42
43struct HistoryComponentInner {
44    /// Commands currently running (not yet completed).
45    running: DashMap<HistoryId, History>,
46
47    /// Handle to the daemon (set during start).
48    handle: tokio::sync::RwLock<Option<DaemonHandle>>,
49
50    /// History store for pushing records (set during start).
51    history_store: tokio::sync::RwLock<Option<HistoryStore>>,
52}
53
54impl HistoryComponent {
55    /// Create a new history component.
56    pub fn new() -> Self {
57        Self {
58            inner: Arc::new(HistoryComponentInner {
59                running: DashMap::new(),
60                handle: tokio::sync::RwLock::new(None),
61                history_store: tokio::sync::RwLock::new(None),
62            }),
63        }
64    }
65
66    /// Get the gRPC service for this component.
67    ///
68    /// This returns a tonic service that can be added to a gRPC server.
69    pub fn grpc_service(&self) -> HistoryServer<HistoryGrpcService> {
70        HistoryServer::new(HistoryGrpcService {
71            inner: self.inner.clone(),
72        })
73    }
74}
75
76impl Default for HistoryComponent {
77    fn default() -> Self {
78        Self::new()
79    }
80}
81
82#[tonic::async_trait]
83impl Component for HistoryComponent {
84    fn name(&self) -> &'static str {
85        "history"
86    }
87
88    async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
89        // Create the history store
90        let host_id = Settings::host_id().await?;
91        let history_store =
92            HistoryStore::new(handle.store().clone(), host_id, *handle.encryption_key());
93
94        *self.inner.history_store.write().await = Some(history_store);
95        *self.inner.handle.write().await = Some(handle);
96
97        tracing::info!("history component started");
98        Ok(())
99    }
100
101    async fn handle_event(&mut self, _event: &DaemonEvent) -> Result<()> {
102        // History component produces events but doesn't need to react to them
103        Ok(())
104    }
105
106    async fn stop(&mut self) -> Result<()> {
107        tracing::info!("history component stopped");
108        Ok(())
109    }
110}
111
112/// The gRPC service implementation.
113///
114/// This is a thin wrapper that delegates to the component's shared state.
115pub struct HistoryGrpcService {
116    inner: Arc<HistoryComponentInner>,
117}
118
119fn history_to_tail_reply(kind: HistoryEventKind, history: History) -> TailHistoryReply {
120    TailHistoryReply {
121        kind: kind as i32,
122        history: Some(HistoryEntry {
123            timestamp: history.timestamp.unix_timestamp_nanos() as u64,
124            id: history.id.0,
125            command: history.command,
126            cwd: history.cwd,
127            session: history.session,
128            hostname: history.hostname,
129            author: history.author,
130            intent: history.intent.unwrap_or_default(),
131            exit: history.exit,
132            duration: history.duration,
133        }),
134    }
135}
136
137#[tonic::async_trait]
138impl HistorySvc for HistoryGrpcService {
139    type TailHistoryStream = Pin<Box<dyn Stream<Item = Result<TailHistoryReply, Status>> + Send>>;
140
141    #[instrument(skip_all, level = Level::INFO)]
142    async fn start_history(
143        &self,
144        request: Request<StartHistoryRequest>,
145    ) -> Result<Response<StartHistoryReply>, Status> {
146        let req = request.into_inner();
147
148        let timestamp =
149            OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| {
150                Status::invalid_argument(
151                    "failed to parse timestamp as unix time (expected nanos since epoch)",
152                )
153            })?;
154
155        let h: History = History::daemon()
156            .timestamp(timestamp)
157            .command(req.command)
158            .cwd(req.cwd)
159            .session(req.session)
160            .hostname(req.hostname)
161            .author(req.author)
162            .intent(req.intent)
163            .build()
164            .into();
165
166        // Emit the event
167        if let Some(handle) = self.inner.handle.read().await.as_ref() {
168            handle.emit(DaemonEvent::HistoryStarted(h.clone()));
169        }
170
171        let id = h.id.clone();
172        tracing::info!(id = id.to_string(), "start history");
173        self.inner.running.insert(id.clone(), h);
174
175        let reply = StartHistoryReply {
176            id: id.to_string(),
177            version: env!("CARGO_PKG_VERSION").to_string(),
178            protocol: DAEMON_PROTOCOL_VERSION,
179        };
180
181        Ok(Response::new(reply))
182    }
183
184    #[instrument(skip_all, level = Level::INFO)]
185    async fn end_history(
186        &self,
187        request: Request<EndHistoryRequest>,
188    ) -> Result<Response<EndHistoryReply>, Status> {
189        let req = request.into_inner();
190        let id = HistoryId(req.id);
191
192        if let Some((_, mut history)) = self.inner.running.remove(&id) {
193            history.exit = req.exit;
194            history.duration = match req.duration {
195                0 => i64::try_from(
196                    (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(),
197                )
198                .expect("failed to convert calculated duration to i64"),
199                value => i64::try_from(value).expect("failed to get i64 duration"),
200            };
201
202            // Get the handle and store to save the history
203            let handle_guard = self.inner.handle.read().await;
204            let handle = handle_guard
205                .as_ref()
206                .ok_or_else(|| Status::internal("component not initialized"))?;
207
208            let store_guard = self.inner.history_store.read().await;
209            let history_store = store_guard
210                .as_ref()
211                .ok_or_else(|| Status::internal("component not initialized"))?;
212
213            // Save to database
214            handle
215                .history_db()
216                .save(&history)
217                .await
218                .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?;
219
220            tracing::info!(
221                id = id.0.to_string(),
222                duration = history.duration,
223                "end history"
224            );
225
226            // Push to record store
227            let (record_id, idx) = history_store
228                .push(history.clone())
229                .await
230                .map_err(|e| Status::internal(format!("failed to push record to store: {e:?}")))?;
231
232            // Emit the event
233            handle.emit(DaemonEvent::HistoryEnded(history));
234
235            let reply = EndHistoryReply {
236                id: record_id.0.to_string(),
237                idx,
238                version: env!("CARGO_PKG_VERSION").to_string(),
239                protocol: DAEMON_PROTOCOL_VERSION,
240            };
241
242            return Ok(Response::new(reply));
243        }
244
245        Err(Status::not_found(format!(
246            "could not find history with id: {id}"
247        )))
248    }
249
250    #[instrument(skip_all, level = Level::INFO)]
251    async fn tail_history(
252        &self,
253        _request: Request<TailHistoryRequest>,
254    ) -> Result<Response<Self::TailHistoryStream>, Status> {
255        let handle_guard = self.inner.handle.read().await;
256        let handle = handle_guard
257            .as_ref()
258            .cloned()
259            .ok_or_else(|| Status::internal("component not initialized"))?;
260
261        let mut rx = handle.subscribe();
262        let (tx, out_rx) = tokio::sync::mpsc::channel::<Result<TailHistoryReply, Status>>(128);
263
264        tokio::spawn(async move {
265            loop {
266                let event = match rx.recv().await {
267                    Ok(event) => event,
268                    Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
269                        let _ = tx
270                            .send(Err(Status::resource_exhausted(format!(
271                                "tail stream lagged behind and dropped {skipped} events"
272                            ))))
273                            .await;
274                        break;
275                    }
276                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
277                };
278
279                let reply = match event {
280                    DaemonEvent::HistoryStarted(history) => {
281                        Some(history_to_tail_reply(HistoryEventKind::Started, history))
282                    }
283                    DaemonEvent::HistoryEnded(history) => {
284                        Some(history_to_tail_reply(HistoryEventKind::Ended, history))
285                    }
286                    _ => None,
287                };
288
289                if let Some(reply) = reply
290                    && tx.send(Ok(reply)).await.is_err()
291                {
292                    break;
293                }
294            }
295        });
296
297        let stream = tokio_stream::wrappers::ReceiverStream::new(out_rx);
298        Ok(Response::new(Box::pin(stream)))
299    }
300
301    #[instrument(skip_all, level = Level::INFO)]
302    async fn status(
303        &self,
304        _request: Request<StatusRequest>,
305    ) -> Result<Response<StatusReply>, Status> {
306        let reply = StatusReply {
307            healthy: true,
308            version: env!("CARGO_PKG_VERSION").to_string(),
309            pid: std::process::id(),
310            protocol: DAEMON_PROTOCOL_VERSION,
311        };
312
313        Ok(Response::new(reply))
314    }
315
316    #[instrument(skip_all, level = Level::INFO)]
317    async fn shutdown(
318        &self,
319        _request: Request<ShutdownRequest>,
320    ) -> Result<Response<ShutdownReply>, Status> {
321        // Use the daemon handle to request shutdown
322        if let Some(handle) = self.inner.handle.read().await.as_ref() {
323            handle.shutdown();
324        }
325        Ok(Response::new(ShutdownReply { accepted: true }))
326    }
327}