1use std::fs;
7
8use duckdb::params;
9use uuid::Uuid;
10
11use super::atomic;
12use super::Store;
13use crate::config::StorageMode;
14use crate::schema::OutcomeRecord;
15use crate::Result;
16
17impl Store {
18 pub fn write_outcome(&self, record: &OutcomeRecord) -> Result<()> {
27 match self.config.storage_mode {
28 StorageMode::Parquet => self.write_outcome_parquet(record),
29 StorageMode::DuckDB => self.write_outcome_duckdb(record),
30 }
31 }
32
33 fn write_outcome_parquet(&self, record: &OutcomeRecord) -> Result<()> {
35 let conn = self.connection_with_options(false)?;
36 let date = record.date;
37
38 let partition_dir = self.config.outcomes_dir(&date);
40 fs::create_dir_all(&partition_dir)?;
41
42 let filename = format!("{}.parquet", record.attempt_id);
44 let file_path = partition_dir.join(&filename);
45
46 let metadata_map = if record.metadata.is_empty() {
48 "map([],[]::JSON[])".to_string()
49 } else {
50 let entries: Vec<String> = record.metadata.iter()
51 .map(|(k, v)| {
52 let key = k.replace('\'', "''");
53 let value = v.to_string().replace('\'', "''");
54 format!("struct_pack(k := '{}', v := '{}'::JSON)", key, value)
55 })
56 .collect();
57 format!("map_from_entries([{}])", entries.join(", "))
58 };
59
60 conn.execute_batch(
62 r#"
63 CREATE OR REPLACE TEMP TABLE temp_outcome (
64 attempt_id UUID,
65 completed_at TIMESTAMP,
66 exit_code INTEGER,
67 duration_ms BIGINT,
68 signal INTEGER,
69 timeout BOOLEAN,
70 metadata MAP(VARCHAR, JSON),
71 date DATE
72 );
73 "#,
74 )?;
75
76 conn.execute(
78 &format!(
79 r#"
80 INSERT INTO temp_outcome VALUES (
81 ?, ?, ?, ?, ?, ?, {}, ?
82 )
83 "#,
84 metadata_map
85 ),
86 params![
87 record.attempt_id.to_string(),
88 record.completed_at.to_rfc3339(),
89 record.exit_code,
90 record.duration_ms,
91 record.signal,
92 record.timeout,
93 date.to_string(),
94 ],
95 )?;
96
97 let temp_path = atomic::temp_path(&file_path);
99 conn.execute(
100 &format!(
101 "COPY temp_outcome TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
102 temp_path.display()
103 ),
104 [],
105 )?;
106 conn.execute("DROP TABLE temp_outcome", [])?;
107
108 atomic::rename_into_place(&temp_path, &file_path)?;
110
111 Ok(())
112 }
113
114 fn write_outcome_duckdb(&self, record: &OutcomeRecord) -> Result<()> {
116 let conn = self.connection()?;
117 let date = record.date;
118
119 let metadata_map = if record.metadata.is_empty() {
121 "map([],[]::JSON[])".to_string()
122 } else {
123 let entries: Vec<String> = record.metadata.iter()
124 .map(|(k, v)| {
125 let key = k.replace('\'', "''");
126 let value = v.to_string().replace('\'', "''");
127 format!("struct_pack(k := '{}', v := '{}'::JSON)", key, value)
128 })
129 .collect();
130 format!("map_from_entries([{}])", entries.join(", "))
131 };
132
133 conn.execute(
134 &format!(
135 r#"
136 INSERT INTO local.outcomes VALUES (
137 ?, ?, ?, ?, ?, ?, {}, ?
138 )
139 "#,
140 metadata_map
141 ),
142 params![
143 record.attempt_id.to_string(),
144 record.completed_at.to_rfc3339(),
145 record.exit_code,
146 record.duration_ms,
147 record.signal,
148 record.timeout,
149 date.to_string(),
150 ],
151 )?;
152
153 Ok(())
154 }
155
156 pub fn complete_invocation(
160 &self,
161 attempt_id: Uuid,
162 exit_code: i32,
163 duration_ms: Option<i64>,
164 date: chrono::NaiveDate,
165 ) -> Result<()> {
166 let outcome = OutcomeRecord::completed(attempt_id, exit_code, duration_ms, date);
167 self.write_outcome(&outcome)
168 }
169
170 pub fn orphan_invocation(&self, attempt_id: Uuid, date: chrono::NaiveDate) -> Result<()> {
172 let outcome = OutcomeRecord::orphaned(attempt_id, date);
173 self.write_outcome(&outcome)
174 }
175
176 pub fn kill_invocation(
178 &self,
179 attempt_id: Uuid,
180 signal: i32,
181 duration_ms: Option<i64>,
182 date: chrono::NaiveDate,
183 ) -> Result<()> {
184 let outcome = OutcomeRecord::killed(attempt_id, signal, duration_ms, date);
185 self.write_outcome(&outcome)
186 }
187
188 pub fn timeout_invocation(
190 &self,
191 attempt_id: Uuid,
192 duration_ms: i64,
193 date: chrono::NaiveDate,
194 ) -> Result<()> {
195 let outcome = OutcomeRecord::timed_out(attempt_id, duration_ms, date);
196 self.write_outcome(&outcome)
197 }
198
199 pub fn outcome_count(&self) -> Result<i64> {
201 let conn = self.connection()?;
202
203 let result: std::result::Result<i64, _> =
204 conn.query_row("SELECT COUNT(*) FROM outcomes", [], |row| row.get(0));
205
206 match result {
207 Ok(count) => Ok(count),
208 Err(e) => {
209 if e.to_string().contains("No files found") || e.to_string().contains("does not exist") {
210 Ok(0)
211 } else {
212 Err(e.into())
213 }
214 }
215 }
216 }
217
218 pub fn recover_orphans(&self) -> Result<super::pending::RecoveryStats> {
226 use super::pending::{is_runner_alive, RecoveryStats};
227 use chrono::NaiveDate;
228
229 let conn = self.connection()?;
230 let mut stats = RecoveryStats::default();
231
232 let mut stmt = conn.prepare(
235 r#"
236 SELECT a.id, a.timestamp::DATE as date, a.machine_id as runner_id
237 FROM attempts a
238 LEFT JOIN outcomes o ON a.id = o.attempt_id
239 WHERE o.attempt_id IS NULL
240 "#,
241 )?;
242
243 let pending: Vec<(String, String, Option<String>)> = stmt
244 .query_map([], |row| {
245 Ok((
246 row.get::<_, String>(0)?,
247 row.get::<_, String>(1)?,
248 row.get::<_, Option<String>>(2)?,
249 ))
250 })?
251 .filter_map(|r| r.ok())
252 .collect();
253
254 stats.pending_checked = pending.len();
255
256 for (id_str, date_str, runner_id) in pending {
257 let alive = runner_id
259 .as_ref()
260 .map(|r| is_runner_alive(r))
261 .unwrap_or(false);
262
263 if alive {
264 stats.still_running += 1;
265 continue;
266 }
267
268 let attempt_id = match uuid::Uuid::parse_str(&id_str) {
270 Ok(id) => id,
271 Err(_) => {
272 stats.errors += 1;
273 continue;
274 }
275 };
276
277 let date = match NaiveDate::parse_from_str(&date_str, "%Y-%m-%d") {
278 Ok(d) => d,
279 Err(_) => {
280 stats.errors += 1;
281 continue;
282 }
283 };
284
285 match self.orphan_invocation(attempt_id, date) {
286 Ok(()) => stats.orphaned += 1,
287 Err(_) => stats.errors += 1,
288 }
289 }
290
291 Ok(stats)
292 }
293}
294
295#[cfg(test)]
296mod tests {
297 }