1use std::sync::{Arc, Mutex};
23
24use chrono::{DateTime, Utc};
25use zero_commands::{
26 ReplayEvent, ReplayKind, SessionError as CmdSessionError, SessionSource, SessionSummary,
27};
28use zero_session::{EventKind as SessionKind, SessionError, SessionRow, Store, StoredEvent};
29
30use crate::app::log::{EntryKind, LogEntry};
31
/// Identity of the session that is currently being written to.
///
/// A single instance lives behind an `Arc<Mutex<_>>` that is shared by a
/// `SessionSink` and every `SessionAdapter` created from it, so a fork
/// performed through the adapter is immediately visible to the sink.
#[derive(Debug, Default, Clone)]
struct ActiveSession {
    /// Store row id of the active session; this is the id handed to
    /// `Store::append` and `Store::end_session`. `None` means "no session".
    row_id: Option<i64>,
    /// ULID-style string identifier of the active session.
    ulid: Option<String>,
}
41
/// Persists log entries of the active session into a `Store`.
///
/// Both fields are `Arc`s, so `Clone` is cheap and every clone refers to the
/// same store and the same active-session state.
#[derive(Clone)]
pub struct SessionSink {
    /// Backing session store.
    store: Arc<Store>,
    /// Row id / ULID of the session currently being recorded; shared with the
    /// adapters handed out by `adapter()`.
    active: Arc<Mutex<ActiveSession>>,
}
49
50impl std::fmt::Debug for SessionSink {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 let active = self.active.lock().unwrap();
53 f.debug_struct("SessionSink")
54 .field("row_id", &active.row_id)
55 .field("ulid", &active.ulid)
56 .finish_non_exhaustive()
57 }
58}
59
60impl SessionSink {
61 #[must_use]
62 pub fn new(store: Arc<Store>, session_id: i64, ulid: String) -> Self {
63 Self {
64 store,
65 active: Arc::new(Mutex::new(ActiveSession {
66 row_id: Some(session_id),
67 ulid: Some(ulid),
68 })),
69 }
70 }
71
72 #[must_use]
75 pub fn adapter(&self) -> SessionAdapter {
76 SessionAdapter {
77 store: Arc::clone(&self.store),
78 active: Arc::clone(&self.active),
79 }
80 }
81
82 pub fn record(&self, entry: &LogEntry) {
85 let Some(session_id) = self.active.lock().unwrap().row_id else {
86 return;
87 };
88 let kind = to_session_kind(entry.kind);
89 if let Err(e) = self.store.append(session_id, kind, &entry.text) {
90 tracing::warn!(err = %e, "session append failed");
91 }
92 }
93
94 pub fn end(&self) {
99 if let Some(session_id) = self.active.lock().unwrap().row_id
100 && let Err(e) = self.store.end_session(session_id)
101 {
102 tracing::warn!(err = %e, "session end failed");
103 }
104 }
105
106 #[must_use]
114 pub fn store(&self) -> &Store {
115 &self.store
116 }
117
118 #[must_use]
124 pub fn session_id(&self) -> Option<i64> {
125 self.active.lock().unwrap().row_id
126 }
127
128 #[must_use]
131 pub fn ulid(&self) -> Option<String> {
132 self.active.lock().unwrap().ulid.clone()
133 }
134}
135
/// Handle exposing stored sessions through the `SessionSource` trait.
///
/// Built by `SessionSink::adapter`; it holds clones of the sink's `Arc`s, so
/// both types observe (and can change) the same active session.
#[derive(Clone)]
pub struct SessionAdapter {
    /// Backing session store.
    store: Arc<Store>,
    /// Row id / ULID of the session currently being recorded; shared with the
    /// originating sink.
    active: Arc<Mutex<ActiveSession>>,
}
145
146impl std::fmt::Debug for SessionAdapter {
147 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 f.debug_struct("SessionAdapter")
149 .field("active_ulid", &self.active.lock().unwrap().ulid)
150 .finish_non_exhaustive()
151 }
152}
153
154impl SessionAdapter {
155 fn resolve_needle(&self, needle: &str) -> Result<Option<SessionRow>, SessionError> {
164 if let Some(row) = self.store.get_session_by_ulid(needle)? {
165 return Ok(Some(row));
166 }
167 if needle.len() >= 6 {
168 let rows = self.store.list_sessions(1000)?;
169 if let Some(hit) = rows.into_iter().find(|r| r.ulid.starts_with(needle)) {
170 return Ok(Some(hit));
171 }
172 }
173 let key = label_key(needle);
174 if let Some(ulid) = self.store.get_milestone(&key)?
175 && let Some(row) = self.store.get_session_by_ulid(&ulid)?
176 {
177 return Ok(Some(row));
178 }
179 Ok(None)
180 }
181}
182
183impl SessionSource for SessionAdapter {
184 fn current_ulid(&self) -> Option<String> {
185 self.active.lock().unwrap().ulid.clone()
186 }
187
188 fn list(&self, limit: u32) -> Result<Vec<SessionSummary>, CmdSessionError> {
189 let rows = self.store.list_sessions(limit).map_err(io_err)?;
190 let mut out = Vec::with_capacity(rows.len());
191 for row in rows {
192 let n_events = self.store.count_events(row.id).map_err(io_err)?;
196 out.push(row_to_summary(row, n_events));
197 }
198 Ok(out)
199 }
200
201 fn find(&self, needle: &str) -> Result<SessionSummary, CmdSessionError> {
202 let row = self
203 .resolve_needle(needle)
204 .map_err(io_err)?
205 .ok_or(CmdSessionError::NotFound)?;
206 let n_events = self.store.count_events(row.id).map_err(io_err)?;
207 Ok(row_to_summary(row, n_events))
208 }
209
210 fn list_events(&self, ulid: &str, limit: u32) -> Result<Vec<ReplayEvent>, CmdSessionError> {
211 let row = self
212 .store
213 .get_session_by_ulid(ulid)
214 .map_err(io_err)?
215 .ok_or(CmdSessionError::NotFound)?;
216 let events = self.store.list_events(row.id, limit).map_err(io_err)?;
217 Ok(events.into_iter().map(stored_to_replay).collect())
218 }
219
220 fn save_label(&self, ulid: &str, label: &str) -> Result<(), CmdSessionError> {
221 let trimmed = label.trim();
224 if trimmed.is_empty() {
225 return Err(CmdSessionError::Io("empty label".into()));
226 }
227 self.store
228 .set_milestone(&label_key(trimmed), ulid)
229 .map_err(io_err)
230 }
231
232 fn fork_from_current(&self) -> Result<Option<String>, CmdSessionError> {
233 let parent = self.active.lock().unwrap().ulid.clone();
234 let Some(parent_ulid) = parent else {
235 return Ok(None);
236 };
237 let new_ulid = new_ulid();
245 let new_row_id = self
246 .store
247 .start_session(
248 &new_ulid,
249 None,
250 env!("CARGO_PKG_VERSION"),
251 Some(&parent_ulid),
252 )
253 .map_err(io_err)?;
254 let mut g = self.active.lock().unwrap();
255 g.row_id = Some(new_row_id);
256 g.ulid = Some(new_ulid.clone());
257 Ok(Some(new_ulid))
258 }
259}
260
261#[must_use]
265pub fn to_entry_kind(k: SessionKind) -> EntryKind {
266 match k {
267 SessionKind::Prompt => EntryKind::Prompt,
268 SessionKind::System | SessionKind::ModeChange => EntryKind::System,
269 SessionKind::Command => EntryKind::Command,
270 SessionKind::Warn => EntryKind::Warn,
271 SessionKind::Alert => EntryKind::Alert,
272 }
273}
274
275fn to_session_kind(k: EntryKind) -> SessionKind {
276 match k {
277 EntryKind::Prompt => SessionKind::Prompt,
278 EntryKind::System => SessionKind::System,
279 EntryKind::Command => SessionKind::Command,
280 EntryKind::Warn => SessionKind::Warn,
281 EntryKind::Alert => SessionKind::Alert,
282 }
283}
284
285#[must_use]
288pub fn replay(events: &[StoredEvent]) -> Vec<LogEntry> {
289 events
290 .iter()
291 .map(|e| LogEntry::new(to_entry_kind(e.kind), &e.text).at(e.at))
292 .collect()
293}
294
295#[must_use]
297pub fn summarize(row: &SessionRow, n_events: usize) -> String {
298 let ts = row.started_at.format("%Y-%m-%d %H:%M UTC");
299 let status = if row.ended_at.is_some() {
300 "ended"
301 } else {
302 "interrupted"
303 };
304 format!("resuming: {ts} · {status} · {n_events} prior event(s)")
305}
306
307fn row_to_summary(row: SessionRow, n_events: i64) -> SessionSummary {
308 SessionSummary {
309 ulid: row.ulid,
310 started_at_ms: row.started_at.timestamp_millis(),
311 ended_at_ms: row.ended_at.map(|dt| dt.timestamp_millis()),
312 engine_base_url: row.engine_base_url,
313 cli_version: row.cli_version,
314 parent_ulid: row.parent_ulid,
315 n_events,
316 }
317}
318
319fn stored_to_replay(e: StoredEvent) -> ReplayEvent {
320 ReplayEvent {
321 kind: stored_kind_to_replay(e.kind),
322 at_ms: e.at.timestamp_millis(),
323 text: e.text,
324 }
325}
326
327fn stored_kind_to_replay(k: SessionKind) -> ReplayKind {
328 match k {
329 SessionKind::Prompt => ReplayKind::Prompt,
330 SessionKind::System | SessionKind::ModeChange => ReplayKind::System,
331 SessionKind::Command => ReplayKind::Command,
332 SessionKind::Warn => ReplayKind::Warn,
333 SessionKind::Alert => ReplayKind::Alert,
334 }
335}
336
337#[allow(clippy::needless_pass_by_value)]
342fn io_err(e: SessionError) -> CmdSessionError {
343 CmdSessionError::Io(e.to_string())
344}
345
/// Builds the milestone key under which a session label is stored:
/// the fixed prefix `session.label.` followed by `label` verbatim.
fn label_key(label: &str) -> String {
    const PREFIX: &str = "session.label.";
    let mut key = String::with_capacity(PREFIX.len() + label.len());
    key.push_str(PREFIX);
    key.push_str(label);
    key
}
351
352fn new_ulid() -> String {
357 use std::time::{SystemTime, UNIX_EPOCH};
358 let ms = SystemTime::now()
359 .duration_since(UNIX_EPOCH)
360 .map(|d| d.as_millis())
361 .unwrap_or(0);
362 let rand = fastrand_hex(6);
363 format!("{ms:013x}{rand}")
364}
365
/// Returns `n` lowercase hex characters from a small time-seeded generator.
///
/// The state is seeded with the current wall-clock nanoseconds (or a fixed
/// fallback constant when the clock is unusable or the value does not fit in
/// a `u64`), advanced once per character with a 64-bit linear congruential
/// step, and the top four bits of the state become the digit. This is not
/// cryptographically secure.
fn fastrand_hex(n: usize) -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    const FALLBACK_SEED: u64 = 0x9E37_79B9_7F4A_7C15;
    const MULTIPLIER: u64 = 6_364_136_223_846_793_005;
    const INCREMENT: u64 = 1_442_695_040_888_963_407;

    let mut state: u64 = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_nanos()).unwrap_or(FALLBACK_SEED),
        Err(_) => FALLBACK_SEED,
    };

    let mut digits = String::with_capacity(n);
    for _ in 0..n {
        state = state.wrapping_mul(MULTIPLIER).wrapping_add(INCREMENT);
        // Only the top nibble is used as output.
        let nibble = u32::try_from((state >> 60) & 0xF).unwrap_or(0);
        digits.push(char::from_digit(nibble, 16).unwrap_or('0'));
    }
    digits
}
382
// No-op that is never called (hence the `dead_code` allow).
// NOTE(review): within the visible code this is the only item that names
// `DateTime`/`Utc`, so it presumably exists to keep the `chrono` import
// referenced — confirm before removing.
#[allow(dead_code)]
fn _nudge(_: DateTime<Utc>) {}
388
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens an in-memory store, starts one session under `ulid`, and returns
    /// the store together with a sink bound to that session and its adapter.
    fn fixture(ulid: &str) -> (Arc<Store>, SessionSink, SessionAdapter) {
        let store = Arc::new(Store::open_in_memory().unwrap());
        let row_id = store.start_session(ulid, None, "0.3.0", None).unwrap();
        let sink = SessionSink::new(Arc::clone(&store), row_id, ulid.into());
        let adapter = sink.adapter();
        (store, sink, adapter)
    }

    #[test]
    fn adapter_current_ulid_tracks_active_session() {
        let (_store, _sink, adapter) = fixture("01HX");

        assert_eq!(adapter.current_ulid().as_deref(), Some("01HX"));
    }

    #[test]
    fn adapter_fork_swaps_active_ulid_and_links_parent() {
        let (store, sink, adapter) = fixture("01HPARENT");

        let forked = adapter.fork_from_current().unwrap();
        let child_ulid = forked.expect("fork produced ulid");

        // The adapter reports the child as the active session ...
        assert_eq!(adapter.current_ulid(), Some(child_ulid.clone()));
        // ... and so does the sink, because both share one state cell.
        assert_eq!(
            sink.active.lock().unwrap().ulid.as_deref(),
            Some(child_ulid.as_str()),
            "sink must see the fork under it",
        );

        // The stored child row points back at its parent.
        let child = store.get_session_by_ulid(&child_ulid).unwrap().unwrap();
        assert_eq!(child.parent_ulid.as_deref(), Some("01HPARENT"));
    }

    #[test]
    fn adapter_save_label_then_find_by_label() {
        let (_store, _sink, adapter) = fixture("01HLBL");

        adapter.save_label("01HLBL", "pre-cpi").unwrap();

        let hit = adapter.find("pre-cpi").unwrap();
        assert_eq!(hit.ulid, "01HLBL");
    }

    #[test]
    fn adapter_find_missing_returns_not_found() {
        let (_store, _sink, adapter) = fixture("01HX");

        let err = adapter.find("nope").unwrap_err();
        assert!(matches!(err, CmdSessionError::NotFound));
    }

    #[test]
    fn adapter_save_rejects_empty_label() {
        let (_store, _sink, adapter) = fixture("01HE");

        let err = adapter.save_label("01HE", " ").unwrap_err();
        assert!(matches!(err, CmdSessionError::Io(_)));
    }
}