1use crate::error::Result;
2
3use super::{StateConn, StateStore, pg_sql};
4
5#[derive(Debug, serde::Serialize)]
7#[allow(dead_code)]
8pub struct FileRecord {
9 pub run_id: String,
10 pub export_name: String,
11 pub file_name: String,
12 pub row_count: i64,
13 pub bytes: i64,
14 pub format: String,
15 pub compression: Option<String>,
16 pub created_at: String,
17}
18
19impl StateStore {
29 #[allow(clippy::too_many_arguments)]
30 pub fn record_file(
31 &self,
32 run_id: &str,
33 export_name: &str,
34 file_name: &str,
35 row_count: i64,
36 bytes: i64,
37 format: &str,
38 compression: Option<&str>,
39 ) -> Result<()> {
40 let now = chrono::Utc::now().to_rfc3339();
41 let sql = "INSERT INTO file_log (run_id, export_name, file_name, row_count, bytes, format, compression, created_at)
42 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)";
43 match &self.conn {
44 StateConn::Sqlite(c) => {
45 c.execute(
46 sql,
47 rusqlite::params![
48 run_id,
49 export_name,
50 file_name,
51 row_count,
52 bytes,
53 format,
54 compression,
55 now
56 ],
57 )?;
58 }
59 StateConn::Postgres(client) => {
60 let mut c = client.borrow_mut();
61 c.execute(
62 &pg_sql(sql),
63 &[
64 &run_id,
65 &export_name,
66 &file_name,
67 &row_count,
68 &bytes,
69 &format,
70 &compression,
71 &now,
72 ],
73 )?;
74 }
75 }
76 Ok(())
77 }
78
79 pub fn get_files(&self, export_name: Option<&str>, limit: usize) -> Result<Vec<FileRecord>> {
80 let cols =
81 "run_id, export_name, file_name, row_count, bytes, format, compression, created_at";
82 let limit_i64 = limit as i64;
83 match &self.conn {
84 StateConn::Sqlite(c) => {
85 let (sql, params): (&str, Vec<Box<dyn rusqlite::types::ToSql>>) = if let Some(
86 name,
87 ) = export_name
88 {
89 (
90 "SELECT run_id, export_name, file_name, row_count, bytes, format, compression, created_at \
91 FROM file_log WHERE export_name = ?1 ORDER BY id DESC LIMIT ?2",
92 vec![Box::new(name.to_string()), Box::new(limit_i64)],
93 )
94 } else {
95 (
96 "SELECT run_id, export_name, file_name, row_count, bytes, format, compression, created_at \
97 FROM file_log ORDER BY id DESC LIMIT ?1",
98 vec![Box::new(limit_i64)],
99 )
100 };
101 let mut stmt = c.prepare(sql)?;
102 let params_refs: Vec<&dyn rusqlite::types::ToSql> =
103 params.iter().map(|p| p.as_ref()).collect();
104 let rows = stmt.query_map(params_refs.as_slice(), |row| {
105 Ok(FileRecord {
106 run_id: row.get(0)?,
107 export_name: row.get(1)?,
108 file_name: row.get(2)?,
109 row_count: row.get(3)?,
110 bytes: row.get(4)?,
111 format: row.get(5)?,
112 compression: row.get(6)?,
113 created_at: row.get(7)?,
114 })
115 })?;
116 rows.collect::<std::result::Result<Vec<_>, _>>()
117 .map_err(Into::into)
118 }
119 StateConn::Postgres(client) => {
120 let mut c = client.borrow_mut();
123 let rows = if let Some(name) = export_name {
124 c.query(
125 &format!("SELECT {} FROM file_log WHERE export_name = $1 ORDER BY id DESC LIMIT $2", cols),
126 &[&name, &limit_i64],
127 )?
128 } else {
129 c.query(
130 &format!("SELECT {} FROM file_log ORDER BY id DESC LIMIT $1", cols),
131 &[&limit_i64],
132 )?
133 };
134 Ok(rows
135 .iter()
136 .map(|row| FileRecord {
137 run_id: row.get(0),
138 export_name: row.get(1),
139 file_name: row.get(2),
140 row_count: row.get(3),
141 bytes: row.get(4),
142 format: row.get(5),
143 compression: row.get(6),
144 created_at: row.get(7),
145 })
146 .collect())
147 }
148 }
149 }
150}
151
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh in-memory store so each test starts from an empty log.
    fn store() -> StateStore {
        StateStore::open_in_memory().expect("in-memory store")
    }

    /// Records three files across two exports, then checks filtering by export
    /// name, newest-first ordering, and the unfiltered listing.
    #[test]
    fn record_and_query_files() {
        let state = store();

        // (run_id, export_name, file_name, row_count, bytes, format, compression)
        let fixtures: [(&str, &str, &str, i64, i64, &str, Option<&str>); 3] = [
            (
                "run_001",
                "orders",
                "orders_20260329.parquet",
                50000,
                4096,
                "parquet",
                Some("zstd"),
            ),
            (
                "run_001",
                "orders",
                "orders_20260329_chunk1.parquet",
                25000,
                2048,
                "parquet",
                Some("zstd"),
            ),
            ("run_002", "users", "users_20260329.csv", 1000, 500, "csv", None),
        ];
        for &(run_id, export_name, file_name, row_count, bytes, format, compression) in
            fixtures.iter()
        {
            state
                .record_file(
                    run_id,
                    export_name,
                    file_name,
                    row_count,
                    bytes,
                    format,
                    compression,
                )
                .unwrap();
        }

        let orders = state.get_files(Some("orders"), 10).unwrap();
        assert_eq!(orders.len(), 2);
        // The most recently inserted "orders" row must come first.
        let newest = &orders[0];
        assert_eq!(newest.run_id, "run_001");
        assert_eq!(newest.row_count, 25000);

        let everything = state.get_files(None, 10).unwrap();
        assert_eq!(everything.len(), 3);
    }

    /// Inserts ten rows for one export and checks that `limit` caps the result.
    #[test]
    fn files_limit_works() {
        let state = store();
        for n in 0..10 {
            let run_id = format!("r{}", n);
            let file_name = format!("f{}.parquet", n);
            state
                .record_file(&run_id, "t", &file_name, n, n * 100, "parquet", None)
                .unwrap();
        }

        let capped = state.get_files(Some("t"), 3).unwrap();
        assert_eq!(capped.len(), 3);
    }
}