Skip to main content

magic_bird/store/
outcomes.rs

1//! Outcome storage operations (v5 schema).
2//!
3//! An outcome is created when an invocation completes. It links back to
4//! the attempt that started the invocation.
5
6use std::fs;
7
8use duckdb::params;
9use uuid::Uuid;
10
11use super::atomic;
12use super::Store;
13use crate::config::StorageMode;
14use crate::schema::OutcomeRecord;
15use crate::Result;
16
17impl Store {
18    /// Write an outcome record to the store (v5 schema).
19    ///
20    /// Call this when an invocation completes (success, failure, or crash).
21    /// The attempt should have been written at invocation start using `write_attempt()`.
22    ///
23    /// Behavior depends on storage mode:
24    /// - Parquet: Creates a new Parquet file in the appropriate date partition
25    /// - DuckDB: Inserts directly into the local.outcomes table
26    pub fn write_outcome(&self, record: &OutcomeRecord) -> Result<()> {
27        match self.config.storage_mode {
28            StorageMode::Parquet => self.write_outcome_parquet(record),
29            StorageMode::DuckDB => self.write_outcome_duckdb(record),
30        }
31    }
32
33    /// Write outcome to a Parquet file (multi-writer safe).
34    fn write_outcome_parquet(&self, record: &OutcomeRecord) -> Result<()> {
35        let conn = self.connection_with_options(false)?;
36        let date = record.date;
37
38        // Ensure the partition directory exists
39        let partition_dir = self.config.outcomes_dir(&date);
40        fs::create_dir_all(&partition_dir)?;
41
42        // Generate filename: {attempt_id}.parquet
43        let filename = format!("{}.parquet", record.attempt_id);
44        let file_path = partition_dir.join(&filename);
45
46        // Convert metadata HashMap to DuckDB MAP format
47        let metadata_map = if record.metadata.is_empty() {
48            "map([],[]::JSON[])".to_string()
49        } else {
50            let entries: Vec<String> = record.metadata.iter()
51                .map(|(k, v)| {
52                    let key = k.replace('\'', "''");
53                    let value = v.to_string().replace('\'', "''");
54                    format!("struct_pack(k := '{}', v := '{}'::JSON)", key, value)
55                })
56                .collect();
57            format!("map_from_entries([{}])", entries.join(", "))
58        };
59
60        // Write via DuckDB using COPY
61        conn.execute_batch(
62            r#"
63            CREATE OR REPLACE TEMP TABLE temp_outcome (
64                attempt_id UUID,
65                completed_at TIMESTAMP,
66                exit_code INTEGER,
67                duration_ms BIGINT,
68                signal INTEGER,
69                timeout BOOLEAN,
70                metadata MAP(VARCHAR, JSON),
71                date DATE
72            );
73            "#,
74        )?;
75
76        // Insert with dynamic SQL for the MAP
77        conn.execute(
78            &format!(
79                r#"
80                INSERT INTO temp_outcome VALUES (
81                    ?, ?, ?, ?, ?, ?, {}, ?
82                )
83                "#,
84                metadata_map
85            ),
86            params![
87                record.attempt_id.to_string(),
88                record.completed_at.to_rfc3339(),
89                record.exit_code,
90                record.duration_ms,
91                record.signal,
92                record.timeout,
93                date.to_string(),
94            ],
95        )?;
96
97        // Atomic write: COPY to temp file, then rename
98        let temp_path = atomic::temp_path(&file_path);
99        conn.execute(
100            &format!(
101                "COPY temp_outcome TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
102                temp_path.display()
103            ),
104            [],
105        )?;
106        conn.execute("DROP TABLE temp_outcome", [])?;
107
108        // Rename temp to final (atomic on POSIX)
109        atomic::rename_into_place(&temp_path, &file_path)?;
110
111        Ok(())
112    }
113
114    /// Write outcome directly to DuckDB table.
115    fn write_outcome_duckdb(&self, record: &OutcomeRecord) -> Result<()> {
116        let conn = self.connection()?;
117        let date = record.date;
118
119        // Convert metadata HashMap to DuckDB MAP format
120        let metadata_map = if record.metadata.is_empty() {
121            "map([],[]::JSON[])".to_string()
122        } else {
123            let entries: Vec<String> = record.metadata.iter()
124                .map(|(k, v)| {
125                    let key = k.replace('\'', "''");
126                    let value = v.to_string().replace('\'', "''");
127                    format!("struct_pack(k := '{}', v := '{}'::JSON)", key, value)
128                })
129                .collect();
130            format!("map_from_entries([{}])", entries.join(", "))
131        };
132
133        conn.execute(
134            &format!(
135                r#"
136                INSERT INTO local.outcomes VALUES (
137                    ?, ?, ?, ?, ?, ?, {}, ?
138                )
139                "#,
140                metadata_map
141            ),
142            params![
143                record.attempt_id.to_string(),
144                record.completed_at.to_rfc3339(),
145                record.exit_code,
146                record.duration_ms,
147                record.signal,
148                record.timeout,
149                date.to_string(),
150            ],
151        )?;
152
153        Ok(())
154    }
155
156    /// Complete an invocation by writing an outcome (v5 schema).
157    ///
158    /// This is the v5 equivalent of `complete_pending_invocation()`.
159    pub fn complete_invocation(
160        &self,
161        attempt_id: Uuid,
162        exit_code: i32,
163        duration_ms: Option<i64>,
164        date: chrono::NaiveDate,
165    ) -> Result<()> {
166        let outcome = OutcomeRecord::completed(attempt_id, exit_code, duration_ms, date);
167        self.write_outcome(&outcome)
168    }
169
170    /// Mark an invocation as orphaned (crashed without cleanup).
171    pub fn orphan_invocation(&self, attempt_id: Uuid, date: chrono::NaiveDate) -> Result<()> {
172        let outcome = OutcomeRecord::orphaned(attempt_id, date);
173        self.write_outcome(&outcome)
174    }
175
176    /// Mark an invocation as killed by signal.
177    pub fn kill_invocation(
178        &self,
179        attempt_id: Uuid,
180        signal: i32,
181        duration_ms: Option<i64>,
182        date: chrono::NaiveDate,
183    ) -> Result<()> {
184        let outcome = OutcomeRecord::killed(attempt_id, signal, duration_ms, date);
185        self.write_outcome(&outcome)
186    }
187
188    /// Mark an invocation as timed out.
189    pub fn timeout_invocation(
190        &self,
191        attempt_id: Uuid,
192        duration_ms: i64,
193        date: chrono::NaiveDate,
194    ) -> Result<()> {
195        let outcome = OutcomeRecord::timed_out(attempt_id, duration_ms, date);
196        self.write_outcome(&outcome)
197    }
198
199    /// Get the count of outcomes in the store.
200    pub fn outcome_count(&self) -> Result<i64> {
201        let conn = self.connection()?;
202
203        let result: std::result::Result<i64, _> =
204            conn.query_row("SELECT COUNT(*) FROM outcomes", [], |row| row.get(0));
205
206        match result {
207            Ok(count) => Ok(count),
208            Err(e) => {
209                if e.to_string().contains("No files found") || e.to_string().contains("does not exist") {
210                    Ok(0)
211                } else {
212                    Err(e.into())
213                }
214            }
215        }
216    }
217
218    /// Recover orphaned invocations (v5 schema).
219    ///
220    /// Finds attempts without outcomes where the runner is no longer alive,
221    /// and marks them as orphaned. Returns statistics about the operation.
222    ///
223    /// This is safe to run periodically (e.g., during compaction) and is
224    /// idempotent - it won't create duplicate outcomes.
225    pub fn recover_orphans(&self) -> Result<super::pending::RecoveryStats> {
226        use super::pending::{is_runner_alive, RecoveryStats};
227        use chrono::NaiveDate;
228
229        let conn = self.connection()?;
230        let mut stats = RecoveryStats::default();
231
232        // Find pending attempts (attempts without matching outcomes)
233        // machine_id stores the runner_id in v5 schema
234        let mut stmt = conn.prepare(
235            r#"
236            SELECT a.id, a.timestamp::DATE as date, a.machine_id as runner_id
237            FROM attempts a
238            LEFT JOIN outcomes o ON a.id = o.attempt_id
239            WHERE o.attempt_id IS NULL
240            "#,
241        )?;
242
243        let pending: Vec<(String, String, Option<String>)> = stmt
244            .query_map([], |row| {
245                Ok((
246                    row.get::<_, String>(0)?,
247                    row.get::<_, String>(1)?,
248                    row.get::<_, Option<String>>(2)?,
249                ))
250            })?
251            .filter_map(|r| r.ok())
252            .collect();
253
254        stats.pending_checked = pending.len();
255
256        for (id_str, date_str, runner_id) in pending {
257            // Check if runner is still alive
258            let alive = runner_id
259                .as_ref()
260                .map(|r| is_runner_alive(r))
261                .unwrap_or(false);
262
263            if alive {
264                stats.still_running += 1;
265                continue;
266            }
267
268            // Runner is dead - mark as orphaned
269            let attempt_id = match uuid::Uuid::parse_str(&id_str) {
270                Ok(id) => id,
271                Err(_) => {
272                    stats.errors += 1;
273                    continue;
274                }
275            };
276
277            let date = match NaiveDate::parse_from_str(&date_str, "%Y-%m-%d") {
278                Ok(d) => d,
279                Err(_) => {
280                    stats.errors += 1;
281                    continue;
282                }
283            };
284
285            match self.orphan_invocation(attempt_id, date) {
286                Ok(()) => stats.orphaned += 1,
287                Err(_) => stats.errors += 1,
288            }
289        }
290
291        Ok(stats)
292    }
293}
294
295#[cfg(test)]
296mod tests {
297    // TODO: add tests for this module once init.rs has been updated for the v5 schema.
298}