1use std::fs;
7
8use duckdb::params;
9
10use super::atomic;
11use super::{sanitize_filename, Store};
12use crate::config::StorageMode;
13use crate::schema::AttemptRecord;
14use crate::Result;
15
16impl Store {
17 pub fn write_attempt(&self, record: &AttemptRecord) -> Result<()> {
26 match self.config.storage_mode {
27 StorageMode::Parquet => self.write_attempt_parquet(record),
28 StorageMode::DuckDB => self.write_attempt_duckdb(record),
29 }
30 }
31
32 fn write_attempt_parquet(&self, record: &AttemptRecord) -> Result<()> {
34 let conn = self.connection_with_options(false)?;
35 let date = record.date();
36
37 let partition_dir = self.config.attempts_dir(&date);
39 fs::create_dir_all(&partition_dir)?;
40
41 let executable = record.executable.as_deref().unwrap_or("unknown");
43 let filename = format!(
44 "{}--{}--{}.parquet",
45 sanitize_filename(&record.session_id),
46 sanitize_filename(executable),
47 record.id
48 );
49 let file_path = partition_dir.join(&filename);
50
51 let metadata_map = if record.metadata.is_empty() {
54 "map([],[]::JSON[])".to_string()
55 } else {
56 let entries: Vec<String> = record.metadata.iter()
57 .map(|(k, v)| {
58 let key = k.replace('\'', "''");
59 let value = v.to_string().replace('\'', "''");
60 format!("struct_pack(k := '{}', v := '{}'::JSON)", key, value)
61 })
62 .collect();
63 format!("map_from_entries([{}])", entries.join(", "))
64 };
65
66 conn.execute_batch(
68 r#"
69 CREATE OR REPLACE TEMP TABLE temp_attempt (
70 id UUID,
71 timestamp TIMESTAMP,
72 cmd VARCHAR,
73 cwd VARCHAR,
74 session_id VARCHAR,
75 tag VARCHAR,
76 source_client VARCHAR,
77 machine_id VARCHAR,
78 hostname VARCHAR,
79 executable VARCHAR,
80 format_hint VARCHAR,
81 metadata MAP(VARCHAR, JSON),
82 date DATE
83 );
84 "#,
85 )?;
86
87 conn.execute(
89 &format!(
90 r#"
91 INSERT INTO temp_attempt VALUES (
92 ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, {}, ?
93 )
94 "#,
95 metadata_map
96 ),
97 params![
98 record.id.to_string(),
99 record.timestamp.to_rfc3339(),
100 record.cmd,
101 record.cwd,
102 record.session_id,
103 record.tag,
104 record.source_client,
105 record.machine_id,
106 record.hostname,
107 record.executable,
108 record.format_hint,
109 date.to_string(),
110 ],
111 )?;
112
113 let temp_path = atomic::temp_path(&file_path);
115 conn.execute(
116 &format!(
117 "COPY temp_attempt TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
118 temp_path.display()
119 ),
120 [],
121 )?;
122 conn.execute("DROP TABLE temp_attempt", [])?;
123
124 atomic::rename_into_place(&temp_path, &file_path)?;
126
127 Ok(())
128 }
129
130 fn write_attempt_duckdb(&self, record: &AttemptRecord) -> Result<()> {
132 let conn = self.connection()?;
133 let date = record.date();
134
135 let metadata_map = if record.metadata.is_empty() {
137 "map([],[]::JSON[])".to_string()
138 } else {
139 let entries: Vec<String> = record.metadata.iter()
140 .map(|(k, v)| {
141 let key = k.replace('\'', "''");
142 let value = v.to_string().replace('\'', "''");
143 format!("struct_pack(k := '{}', v := '{}'::JSON)", key, value)
144 })
145 .collect();
146 format!("map_from_entries([{}])", entries.join(", "))
147 };
148
149 conn.execute(
150 &format!(
151 r#"
152 INSERT INTO local.attempts VALUES (
153 ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, {}, ?
154 )
155 "#,
156 metadata_map
157 ),
158 params![
159 record.id.to_string(),
160 record.timestamp.to_rfc3339(),
161 record.cmd,
162 record.cwd,
163 record.session_id,
164 record.tag,
165 record.source_client,
166 record.machine_id,
167 record.hostname,
168 record.executable,
169 record.format_hint,
170 date.to_string(),
171 ],
172 )?;
173
174 Ok(())
175 }
176
177 pub fn start_invocation(&self, record: &AttemptRecord) -> Result<AttemptRecord> {
182 self.write_attempt(record)?;
183 Ok(record.clone())
184 }
185
186 pub fn attempt_count(&self) -> Result<i64> {
188 let conn = self.connection()?;
189
190 let result: std::result::Result<i64, _> =
191 conn.query_row("SELECT COUNT(*) FROM attempts", [], |row| row.get(0));
192
193 match result {
194 Ok(count) => Ok(count),
195 Err(e) => {
196 if e.to_string().contains("No files found") || e.to_string().contains("does not exist") {
197 Ok(0)
198 } else {
199 Err(e.into())
200 }
201 }
202 }
203 }
204
205 pub fn get_pending_attempts(&self) -> Result<Vec<AttemptRecord>> {
210 let conn = self.connection()?;
211
212 let sql = r#"
213 SELECT
214 a.id::VARCHAR, a.timestamp::VARCHAR, a.cmd, a.cwd, a.session_id,
215 a.tag, a.source_client, a.machine_id, a.hostname, a.executable,
216 a.format_hint, a.metadata::VARCHAR, a.date::VARCHAR
217 FROM attempts a
218 WHERE a.id NOT IN (SELECT attempt_id FROM outcomes)
219 ORDER BY a.timestamp DESC
220 "#;
221
222 let mut stmt = match conn.prepare(sql) {
223 Ok(stmt) => stmt,
224 Err(e) => {
225 if e.to_string().contains("does not exist") {
227 return Ok(Vec::new());
228 }
229 return Err(e.into());
230 }
231 };
232
233 let rows = stmt.query_map([], |row| {
234 let id_str: String = row.get(0)?;
235 let ts_str: String = row.get(1)?;
236 let date_str: String = row.get(12)?;
237 let metadata_str: Option<String> = row.get(11)?;
238
239 let metadata: std::collections::HashMap<String, serde_json::Value> = metadata_str
240 .and_then(|s| serde_json::from_str(&s).ok())
241 .unwrap_or_default();
242
243 Ok(AttemptRecord {
244 id: uuid::Uuid::parse_str(&id_str).unwrap_or_else(|_| uuid::Uuid::nil()),
245 timestamp: chrono::DateTime::parse_from_rfc3339(&ts_str)
246 .map(|dt| dt.with_timezone(&chrono::Utc))
247 .unwrap_or_else(|_| chrono::Utc::now()),
248 cmd: row.get(2)?,
249 cwd: row.get(3)?,
250 session_id: row.get(4)?,
251 tag: row.get(5)?,
252 source_client: row.get(6)?,
253 machine_id: row.get(7)?,
254 hostname: row.get(8)?,
255 executable: row.get(9)?,
256 format_hint: row.get(10)?,
257 metadata,
258 date: chrono::NaiveDate::parse_from_str(&date_str, "%Y-%m-%d")
259 .unwrap_or_else(|_| chrono::Utc::now().date_naive()),
260 })
261 });
262
263 match rows {
264 Ok(rows) => {
265 let mut results = Vec::new();
266 for row in rows {
267 results.push(row?);
268 }
269 Ok(results)
270 }
271 Err(e) => {
272 if e.to_string().contains("does not exist") {
273 Ok(Vec::new())
274 } else {
275 Err(e.into())
276 }
277 }
278 }
279 }
280}
281
#[cfg(test)]
mod tests {
    use super::*;
    use crate::init::initialize;
    use crate::Config;
    use tempfile::TempDir;

    /// Creates a fresh temporary root, builds a [`Config`] for it with
    /// `make_config`, initializes that config, and opens a [`Store`] on it.
    ///
    /// The returned [`TempDir`] must outlive the store; dropping it removes
    /// the directory.
    fn _open_store_with(
        make_config: impl FnOnce(&std::path::Path) -> Config,
    ) -> (TempDir, Store) {
        let root_dir = TempDir::new().unwrap();
        let cfg = make_config(root_dir.path());
        initialize(&cfg).unwrap();
        let opened = Store::open(cfg).unwrap();
        (root_dir, opened)
    }

    /// Opens an initialized store using the default config for a temp root.
    fn _setup_store_v5() -> (TempDir, Store) {
        _open_store_with(|root| Config::with_root(root))
    }

    /// Opens an initialized store configured for DuckDB storage mode.
    fn _setup_store_v5_duckdb() -> (TempDir, Store) {
        _open_store_with(|root| Config::with_duckdb_mode(root))
    }
}