1use std::{pin::Pin, sync::Arc};
6
7use atuin_client::{
8 database::Database,
9 history::{History, HistoryId, store::HistoryStore},
10 settings::Settings,
11};
12use dashmap::DashMap;
13use eyre::Result;
14use time::OffsetDateTime;
15use tokio_stream::Stream;
16use tonic::{Request, Response, Status};
17use tracing::{Level, instrument};
18
19use crate::{
20 daemon::{Component, DaemonHandle},
21 events::DaemonEvent,
22 history::{
23 EndHistoryReply, EndHistoryRequest, HistoryEntry, HistoryEventKind, ShutdownReply,
24 ShutdownRequest, StartHistoryReply, StartHistoryRequest, StatusReply, StatusRequest,
25 TailHistoryReply, TailHistoryRequest,
26 history_server::{History as HistorySvc, HistoryServer},
27 },
28};
29
/// Version number of the daemon protocol, sent back to clients in the
/// `protocol` field of the start, end and status replies.
const DAEMON_PROTOCOL_VERSION: u32 = 1;
31
/// Daemon component responsible for history entries.
///
/// All state is kept in a reference-counted [`HistoryComponentInner`], which is
/// also handed to the gRPC service created by `grpc_service`, so the component
/// and the service operate on the same data.
pub struct HistoryComponent {
    /// State shared with the gRPC service.
    inner: Arc<HistoryComponentInner>,
}
42
/// State shared between [`HistoryComponent`] and [`HistoryGrpcService`].
struct HistoryComponentInner {
    /// Entries that have been started but not yet ended, keyed by their id.
    running: DashMap<HistoryId, History>,

    /// Daemon handle; `None` until the component's `start` stores it.
    handle: tokio::sync::RwLock<Option<DaemonHandle>>,

    /// History store; `None` until the component's `start` builds it.
    history_store: tokio::sync::RwLock<Option<HistoryStore>>,
}
53
54impl HistoryComponent {
55 pub fn new() -> Self {
57 Self {
58 inner: Arc::new(HistoryComponentInner {
59 running: DashMap::new(),
60 handle: tokio::sync::RwLock::new(None),
61 history_store: tokio::sync::RwLock::new(None),
62 }),
63 }
64 }
65
66 pub fn grpc_service(&self) -> HistoryServer<HistoryGrpcService> {
70 HistoryServer::new(HistoryGrpcService {
71 inner: self.inner.clone(),
72 })
73 }
74}
75
76impl Default for HistoryComponent {
77 fn default() -> Self {
78 Self::new()
79 }
80}
81
82#[tonic::async_trait]
83impl Component for HistoryComponent {
84 fn name(&self) -> &'static str {
85 "history"
86 }
87
88 async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
89 let host_id = Settings::host_id().await?;
91 let history_store =
92 HistoryStore::new(handle.store().clone(), host_id, *handle.encryption_key());
93
94 *self.inner.history_store.write().await = Some(history_store);
95 *self.inner.handle.write().await = Some(handle);
96
97 tracing::info!("history component started");
98 Ok(())
99 }
100
101 async fn handle_event(&mut self, _event: &DaemonEvent) -> Result<()> {
102 Ok(())
104 }
105
106 async fn stop(&mut self) -> Result<()> {
107 tracing::info!("history component stopped");
108 Ok(())
109 }
110}
111
/// gRPC-facing half of the history component.
///
/// Instances are created by `HistoryComponent::grpc_service` and hold the same
/// shared state as the component that created them.
pub struct HistoryGrpcService {
    /// State shared with the owning `HistoryComponent`.
    inner: Arc<HistoryComponentInner>,
}
118
119fn history_to_tail_reply(kind: HistoryEventKind, history: History) -> TailHistoryReply {
120 TailHistoryReply {
121 kind: kind as i32,
122 history: Some(HistoryEntry {
123 timestamp: history.timestamp.unix_timestamp_nanos() as u64,
124 id: history.id.0,
125 command: history.command,
126 cwd: history.cwd,
127 session: history.session,
128 hostname: history.hostname,
129 author: history.author,
130 intent: history.intent.unwrap_or_default(),
131 exit: history.exit,
132 duration: history.duration,
133 }),
134 }
135}
136
137#[tonic::async_trait]
138impl HistorySvc for HistoryGrpcService {
139 type TailHistoryStream = Pin<Box<dyn Stream<Item = Result<TailHistoryReply, Status>> + Send>>;
140
141 #[instrument(skip_all, level = Level::INFO)]
142 async fn start_history(
143 &self,
144 request: Request<StartHistoryRequest>,
145 ) -> Result<Response<StartHistoryReply>, Status> {
146 let req = request.into_inner();
147
148 let timestamp =
149 OffsetDateTime::from_unix_timestamp_nanos(req.timestamp as i128).map_err(|_| {
150 Status::invalid_argument(
151 "failed to parse timestamp as unix time (expected nanos since epoch)",
152 )
153 })?;
154
155 let h: History = History::daemon()
156 .timestamp(timestamp)
157 .command(req.command)
158 .cwd(req.cwd)
159 .session(req.session)
160 .hostname(req.hostname)
161 .author(req.author)
162 .intent(req.intent)
163 .build()
164 .into();
165
166 if let Some(handle) = self.inner.handle.read().await.as_ref() {
168 handle.emit(DaemonEvent::HistoryStarted(h.clone()));
169 }
170
171 let id = h.id.clone();
172 tracing::info!(id = id.to_string(), "start history");
173 self.inner.running.insert(id.clone(), h);
174
175 let reply = StartHistoryReply {
176 id: id.to_string(),
177 version: env!("CARGO_PKG_VERSION").to_string(),
178 protocol: DAEMON_PROTOCOL_VERSION,
179 };
180
181 Ok(Response::new(reply))
182 }
183
184 #[instrument(skip_all, level = Level::INFO)]
185 async fn end_history(
186 &self,
187 request: Request<EndHistoryRequest>,
188 ) -> Result<Response<EndHistoryReply>, Status> {
189 let req = request.into_inner();
190 let id = HistoryId(req.id);
191
192 if let Some((_, mut history)) = self.inner.running.remove(&id) {
193 history.exit = req.exit;
194 history.duration = match req.duration {
195 0 => i64::try_from(
196 (OffsetDateTime::now_utc() - history.timestamp).whole_nanoseconds(),
197 )
198 .expect("failed to convert calculated duration to i64"),
199 value => i64::try_from(value).expect("failed to get i64 duration"),
200 };
201
202 let handle_guard = self.inner.handle.read().await;
204 let handle = handle_guard
205 .as_ref()
206 .ok_or_else(|| Status::internal("component not initialized"))?;
207
208 let store_guard = self.inner.history_store.read().await;
209 let history_store = store_guard
210 .as_ref()
211 .ok_or_else(|| Status::internal("component not initialized"))?;
212
213 handle
215 .history_db()
216 .save(&history)
217 .await
218 .map_err(|e| Status::internal(format!("failed to write to db: {e:?}")))?;
219
220 tracing::info!(
221 id = id.0.to_string(),
222 duration = history.duration,
223 "end history"
224 );
225
226 let (record_id, idx) = history_store
228 .push(history.clone())
229 .await
230 .map_err(|e| Status::internal(format!("failed to push record to store: {e:?}")))?;
231
232 handle.emit(DaemonEvent::HistoryEnded(history));
234
235 let reply = EndHistoryReply {
236 id: record_id.0.to_string(),
237 idx,
238 version: env!("CARGO_PKG_VERSION").to_string(),
239 protocol: DAEMON_PROTOCOL_VERSION,
240 };
241
242 return Ok(Response::new(reply));
243 }
244
245 Err(Status::not_found(format!(
246 "could not find history with id: {id}"
247 )))
248 }
249
250 #[instrument(skip_all, level = Level::INFO)]
251 async fn tail_history(
252 &self,
253 _request: Request<TailHistoryRequest>,
254 ) -> Result<Response<Self::TailHistoryStream>, Status> {
255 let handle_guard = self.inner.handle.read().await;
256 let handle = handle_guard
257 .as_ref()
258 .cloned()
259 .ok_or_else(|| Status::internal("component not initialized"))?;
260
261 let mut rx = handle.subscribe();
262 let (tx, out_rx) = tokio::sync::mpsc::channel::<Result<TailHistoryReply, Status>>(128);
263
264 tokio::spawn(async move {
265 loop {
266 let event = match rx.recv().await {
267 Ok(event) => event,
268 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
269 let _ = tx
270 .send(Err(Status::resource_exhausted(format!(
271 "tail stream lagged behind and dropped {skipped} events"
272 ))))
273 .await;
274 break;
275 }
276 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
277 };
278
279 let reply = match event {
280 DaemonEvent::HistoryStarted(history) => {
281 Some(history_to_tail_reply(HistoryEventKind::Started, history))
282 }
283 DaemonEvent::HistoryEnded(history) => {
284 Some(history_to_tail_reply(HistoryEventKind::Ended, history))
285 }
286 _ => None,
287 };
288
289 if let Some(reply) = reply
290 && tx.send(Ok(reply)).await.is_err()
291 {
292 break;
293 }
294 }
295 });
296
297 let stream = tokio_stream::wrappers::ReceiverStream::new(out_rx);
298 Ok(Response::new(Box::pin(stream)))
299 }
300
301 #[instrument(skip_all, level = Level::INFO)]
302 async fn status(
303 &self,
304 _request: Request<StatusRequest>,
305 ) -> Result<Response<StatusReply>, Status> {
306 let reply = StatusReply {
307 healthy: true,
308 version: env!("CARGO_PKG_VERSION").to_string(),
309 pid: std::process::id(),
310 protocol: DAEMON_PROTOCOL_VERSION,
311 };
312
313 Ok(Response::new(reply))
314 }
315
316 #[instrument(skip_all, level = Level::INFO)]
317 async fn shutdown(
318 &self,
319 _request: Request<ShutdownRequest>,
320 ) -> Result<Response<ShutdownReply>, Status> {
321 if let Some(handle) = self.inner.handle.read().await.as_ref() {
323 handle.shutdown();
324 }
325 Ok(Response::new(ShutdownReply { accepted: true }))
326 }
327}