1use std::fs;
4
5use duckdb::params;
6
7use super::atomic;
8use super::{sanitize_filename, Store};
9use crate::config::StorageMode;
10use crate::query::{CompareOp, Query, QueryComponent};
11use crate::schema::InvocationRecord;
12use crate::Result;
13
14#[derive(Debug)]
16pub struct InvocationSummary {
17 pub id: String,
18 pub cmd: String,
19 pub exit_code: i32,
20 pub timestamp: String,
21 pub duration_ms: Option<i64>,
22}
23
24impl Store {
25 pub fn write_invocation(&self, record: &InvocationRecord) -> Result<()> {
31 match self.config.storage_mode {
32 StorageMode::Parquet => self.write_invocation_parquet(record),
33 StorageMode::DuckDB => self.write_invocation_duckdb(record),
34 }
35 }
36
37 fn write_invocation_parquet(&self, record: &InvocationRecord) -> Result<()> {
39 let conn = self.connection()?;
40 let date = record.date();
41
42 let partition_dir = self.config.invocations_dir(&date);
44 fs::create_dir_all(&partition_dir)?;
45
46 let executable = record.executable.as_deref().unwrap_or("unknown");
48 let filename = format!(
49 "{}--{}--{}.parquet",
50 sanitize_filename(&record.session_id),
51 sanitize_filename(executable),
52 record.id
53 );
54 let file_path = partition_dir.join(&filename);
55
56 conn.execute_batch(
58 r#"
59 CREATE OR REPLACE TEMP TABLE temp_invocation (
60 id UUID,
61 session_id VARCHAR,
62 timestamp TIMESTAMP,
63 duration_ms BIGINT,
64 cwd VARCHAR,
65 cmd VARCHAR,
66 executable VARCHAR,
67 exit_code INTEGER,
68 format_hint VARCHAR,
69 client_id VARCHAR,
70 hostname VARCHAR,
71 username VARCHAR,
72 tag VARCHAR,
73 date DATE
74 );
75 "#,
76 )?;
77
78 conn.execute(
79 r#"
80 INSERT INTO temp_invocation VALUES (
81 ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
82 )
83 "#,
84 params![
85 record.id.to_string(),
86 record.session_id,
87 record.timestamp.to_rfc3339(),
88 record.duration_ms,
89 record.cwd,
90 record.cmd,
91 record.executable,
92 record.exit_code,
93 record.format_hint,
94 record.client_id,
95 record.hostname,
96 record.username,
97 record.tag,
98 date.to_string(),
99 ],
100 )?;
101
102 let temp_path = atomic::temp_path(&file_path);
104 conn.execute(
105 &format!(
106 "COPY temp_invocation TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
107 temp_path.display()
108 ),
109 [],
110 )?;
111 conn.execute("DROP TABLE temp_invocation", [])?;
112
113 atomic::rename_into_place(&temp_path, &file_path)?;
115
116 Ok(())
117 }
118
119 fn write_invocation_duckdb(&self, record: &InvocationRecord) -> Result<()> {
121 let conn = self.connection()?;
122 let date = record.date();
123
124 conn.execute(
125 r#"
126 INSERT INTO local.invocations VALUES (
127 ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
128 )
129 "#,
130 params![
131 record.id.to_string(),
132 record.session_id,
133 record.timestamp.to_rfc3339(),
134 record.duration_ms,
135 record.cwd,
136 record.cmd,
137 record.executable,
138 record.exit_code,
139 record.format_hint,
140 record.client_id,
141 record.hostname,
142 record.username,
143 record.tag,
144 date.to_string(),
145 ],
146 )?;
147
148 Ok(())
149 }
150
151 pub fn recent_invocations(&self, limit: usize) -> Result<Vec<InvocationSummary>> {
153 let conn = self.connection()?;
154
155 let sql = format!(
156 r#"
157 SELECT id::VARCHAR, cmd, exit_code, timestamp::VARCHAR, duration_ms
158 FROM recent_invocations
159 LIMIT {}
160 "#,
161 limit
162 );
163
164 let mut stmt = match conn.prepare(&sql) {
165 Ok(stmt) => stmt,
166 Err(e) => {
167 if e.to_string().contains("No files found") {
168 return Ok(Vec::new());
169 }
170 return Err(e.into());
171 }
172 };
173
174 let rows = stmt.query_map([], |row| {
175 Ok(InvocationSummary {
176 id: row.get(0)?,
177 cmd: row.get(1)?,
178 exit_code: row.get(2)?,
179 timestamp: row.get(3)?,
180 duration_ms: row.get(4)?,
181 })
182 });
183
184 match rows {
185 Ok(rows) => {
186 let mut results = Vec::new();
187 for row in rows {
188 results.push(row?);
189 }
190 Ok(results)
191 }
192 Err(e) => {
193 if e.to_string().contains("No files found") {
194 Ok(Vec::new())
195 } else {
196 Err(e.into())
197 }
198 }
199 }
200 }
201
202 pub fn last_invocation(&self) -> Result<Option<InvocationSummary>> {
204 let invocations = self.recent_invocations(1)?;
205 Ok(invocations.into_iter().next())
206 }
207
208 pub fn query_invocations_with_limit(
219 &self,
220 query: &Query,
221 default_limit: usize,
222 ) -> Result<Vec<InvocationSummary>> {
223 let conn = self.connection()?;
224
225 let mut where_clauses: Vec<String> = Vec::new();
227
228 for component in &query.filters {
229 match component {
230 QueryComponent::CommandRegex(pattern) => {
231 let escaped = pattern.replace('\'', "''");
233 where_clauses.push(format!("regexp_matches(cmd, '{}')", escaped));
234 }
235 QueryComponent::FieldFilter(filter) => {
236 let column = match filter.field.as_str() {
238 "exit" | "exit_code" => "exit_code",
239 "duration" | "duration_ms" => "duration_ms",
240 "cmd" | "command" => "cmd",
241 "cwd" => "cwd",
242 other => other, };
244
245 let escaped_value = filter.value.replace('\'', "''");
246
247 let clause = match filter.op {
248 CompareOp::Eq => format!("{} = '{}'", column, escaped_value),
249 CompareOp::NotEq => format!("{} <> '{}'", column, escaped_value),
250 CompareOp::Gt => format!("{} > '{}'", column, escaped_value),
251 CompareOp::Lt => format!("{} < '{}'", column, escaped_value),
252 CompareOp::Gte => format!("{} >= '{}'", column, escaped_value),
253 CompareOp::Lte => format!("{} <= '{}'", column, escaped_value),
254 CompareOp::Regex => {
255 format!("regexp_matches({}::VARCHAR, '{}')", column, escaped_value)
256 }
257 };
258 where_clauses.push(clause);
259 }
260 QueryComponent::Tag(_) => {
261 }
263 }
264 }
265
266 let where_sql = if where_clauses.is_empty() {
268 String::new()
269 } else {
270 format!("WHERE {}", where_clauses.join(" AND "))
271 };
272
273 let limit = query.range.map(|r| r.start).unwrap_or(default_limit);
274
275 let sql = format!(
276 r#"
277 SELECT id::VARCHAR, cmd, exit_code, timestamp::VARCHAR, duration_ms
278 FROM recent_invocations
279 {}
280 LIMIT {}
281 "#,
282 where_sql, limit
283 );
284
285 let mut stmt = match conn.prepare(&sql) {
286 Ok(stmt) => stmt,
287 Err(e) => {
288 if e.to_string().contains("No files found") {
289 return Ok(Vec::new());
290 }
291 return Err(e.into());
292 }
293 };
294
295 let rows = stmt.query_map([], |row| {
296 Ok(InvocationSummary {
297 id: row.get(0)?,
298 cmd: row.get(1)?,
299 exit_code: row.get(2)?,
300 timestamp: row.get(3)?,
301 duration_ms: row.get(4)?,
302 })
303 });
304
305 match rows {
306 Ok(rows) => {
307 let mut results = Vec::new();
308 for row in rows {
309 results.push(row?);
310 }
311 Ok(results)
312 }
313 Err(e) => {
314 if e.to_string().contains("No files found") {
315 Ok(Vec::new())
316 } else {
317 Err(e.into())
318 }
319 }
320 }
321 }
322
323 pub fn query_invocations(&self, query: &Query) -> Result<Vec<InvocationSummary>> {
325 self.query_invocations_with_limit(query, 20)
326 }
327
328 pub fn invocation_count(&self) -> Result<i64> {
330 let conn = self.connection()?;
331
332 let result: std::result::Result<i64, _> =
333 conn.query_row("SELECT COUNT(*) FROM invocations", [], |row| row.get(0));
334
335 match result {
336 Ok(count) => Ok(count),
337 Err(e) => {
338 if e.to_string().contains("No files found") {
339 Ok(0)
340 } else {
341 Err(e.into())
342 }
343 }
344 }
345 }
346
347 pub fn find_by_tag(&self, tag: &str) -> Result<Option<String>> {
350 let conn = self.connection()?;
351
352 let tag = tag.trim_start_matches(':');
354
355 let result: std::result::Result<String, _> = conn.query_row(
356 "SELECT id::VARCHAR FROM invocations WHERE tag = ?",
357 params![tag],
358 |row| row.get(0),
359 );
360
361 match result {
362 Ok(id) => Ok(Some(id)),
363 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
364 Err(e) => Err(e.into()),
365 }
366 }
367
368 pub fn set_tag(&self, invocation_id: &str, tag: Option<&str>) -> Result<()> {
370 let conn = self.connection()?;
371
372 conn.execute(
373 "UPDATE local.invocations SET tag = ? WHERE id = ?",
374 params![tag, invocation_id],
375 )?;
376
377 Ok(())
378 }
379}
380
381#[cfg(test)]
382mod tests {
383 use super::*;
384 use crate::init::initialize;
385 use crate::Config;
386 use tempfile::TempDir;
387
388 fn setup_store() -> (TempDir, Store) {
389 let tmp = TempDir::new().unwrap();
390 let config = Config::with_root(tmp.path());
391 initialize(&config).unwrap();
392 let store = Store::open(config).unwrap();
393 (tmp, store)
394 }
395
396 #[test]
397 fn test_write_and_count_invocation() {
398 let (_tmp, store) = setup_store();
399
400 let record = InvocationRecord::new(
401 "test-session",
402 "make test",
403 "/home/user/project",
404 0,
405 "test@client",
406 );
407
408 store.write_invocation(&record).unwrap();
409
410 let count = store.invocation_count().unwrap();
411 assert_eq!(count, 1);
412 }
413
414 #[test]
415 fn test_write_and_query_invocation() {
416 let (_tmp, store) = setup_store();
417
418 let record = InvocationRecord::new(
419 "test-session",
420 "cargo build",
421 "/home/user/project",
422 0,
423 "test@client",
424 )
425 .with_duration(1500);
426
427 store.write_invocation(&record).unwrap();
428
429 let result = store
431 .query("SELECT cmd, exit_code, duration_ms FROM invocations")
432 .unwrap();
433
434 assert_eq!(result.columns, vec!["cmd", "exit_code", "duration_ms"]);
435 assert_eq!(result.rows.len(), 1);
436 assert_eq!(result.rows[0][0], "cargo build");
437 assert_eq!(result.rows[0][1], "0");
438 assert_eq!(result.rows[0][2], "1500");
439 }
440
441 #[test]
442 fn test_recent_invocations_empty() {
443 let (_tmp, store) = setup_store();
444
445 let recent = store.recent_invocations(10).unwrap();
446 assert!(recent.is_empty());
447 }
448
449 #[test]
450 fn test_recent_invocations() {
451 let (_tmp, store) = setup_store();
452
453 for i in 0..3 {
455 let record = InvocationRecord::new(
456 "test-session",
457 format!("command-{}", i),
458 "/home/user",
459 i,
460 "test@client",
461 );
462 store.write_invocation(&record).unwrap();
463 }
464
465 let recent = store.recent_invocations(10).unwrap();
466 assert_eq!(recent.len(), 3);
467 }
468
469 #[test]
470 fn test_atomic_parquet_no_temp_files() {
471 let (_tmp, store) = setup_store();
472
473 let record = InvocationRecord::new(
474 "test-session",
475 "test",
476 "/home/user",
477 0,
478 "test@client",
479 );
480 store.write_invocation(&record).unwrap();
481
482 let date = record.date();
484 let inv_dir = store.config().invocations_dir(&date);
485 let temps: Vec<_> = std::fs::read_dir(&inv_dir)
486 .unwrap()
487 .filter_map(|e| e.ok())
488 .filter(|e| e.file_name().to_str().unwrap_or("").starts_with(".tmp."))
489 .collect();
490 assert!(
491 temps.is_empty(),
492 "No temp files should remain in {:?}",
493 inv_dir
494 );
495 }
496
497 fn setup_store_duckdb() -> (TempDir, Store) {
500 let tmp = TempDir::new().unwrap();
501 let config = Config::with_duckdb_mode(tmp.path());
502 initialize(&config).unwrap();
503 let store = Store::open(config).unwrap();
504 (tmp, store)
505 }
506
507 #[test]
508 fn test_duckdb_mode_write_and_count_invocation() {
509 let (_tmp, store) = setup_store_duckdb();
510
511 let record = InvocationRecord::new(
512 "test-session",
513 "make test",
514 "/home/user/project",
515 0,
516 "test@client",
517 );
518
519 store.write_invocation(&record).unwrap();
520
521 let count = store.invocation_count().unwrap();
522 assert_eq!(count, 1);
523 }
524
525 #[test]
526 fn test_duckdb_mode_write_and_query_invocation() {
527 let (_tmp, store) = setup_store_duckdb();
528
529 let record = InvocationRecord::new(
530 "test-session",
531 "cargo build",
532 "/home/user/project",
533 0,
534 "test@client",
535 )
536 .with_duration(1500);
537
538 store.write_invocation(&record).unwrap();
539
540 let result = store
542 .query("SELECT cmd, exit_code, duration_ms FROM invocations")
543 .unwrap();
544
545 assert_eq!(result.columns, vec!["cmd", "exit_code", "duration_ms"]);
546 assert_eq!(result.rows.len(), 1);
547 assert_eq!(result.rows[0][0], "cargo build");
548 assert_eq!(result.rows[0][1], "0");
549 assert_eq!(result.rows[0][2], "1500");
550 }
551
552 #[test]
553 fn test_duckdb_mode_recent_invocations() {
554 let (_tmp, store) = setup_store_duckdb();
555
556 for i in 0..3 {
558 let record = InvocationRecord::new(
559 "test-session",
560 format!("command-{}", i),
561 "/home/user",
562 i,
563 "test@client",
564 );
565 store.write_invocation(&record).unwrap();
566 }
567
568 let recent = store.recent_invocations(10).unwrap();
569 assert_eq!(recent.len(), 3);
570 }
571
572 #[test]
573 fn test_duckdb_mode_no_parquet_files() {
574 let (tmp, store) = setup_store_duckdb();
575
576 let record = InvocationRecord::new(
577 "test-session",
578 "test",
579 "/home/user",
580 0,
581 "test@client",
582 );
583 store.write_invocation(&record).unwrap();
584
585 let invocations_dir = tmp.path().join("db/data/recent/invocations");
587 if invocations_dir.exists() {
588 let parquet_files: Vec<_> = std::fs::read_dir(&invocations_dir)
589 .unwrap()
590 .filter_map(|e| e.ok())
591 .filter(|e| e.file_name().to_str().unwrap_or("").ends_with(".parquet"))
592 .collect();
593 assert!(
594 parquet_files.is_empty(),
595 "DuckDB mode should not create parquet files"
596 );
597 }
598 }
599}