Skip to main content

magic_bird/store/
attempts.rs

//! Attempt storage operations (v5 schema).
//!
//! An attempt is created when an invocation starts. The outcome is recorded
//! separately when the invocation completes.
5
6use std::fs;
7
8use duckdb::params;
9
10use super::atomic;
11use super::{sanitize_filename, Store};
12use crate::config::StorageMode;
13use crate::schema::AttemptRecord;
14use crate::Result;
15
16impl Store {
17    /// Write an attempt record to the store (v5 schema).
18    ///
19    /// Call this at invocation start. The outcome should be written when
20    /// the invocation completes using `write_outcome()`.
21    ///
22    /// Behavior depends on storage mode:
23    /// - Parquet: Creates a new Parquet file in the appropriate date partition
24    /// - DuckDB: Inserts directly into the local.attempts table
25    pub fn write_attempt(&self, record: &AttemptRecord) -> Result<()> {
26        match self.config.storage_mode {
27            StorageMode::Parquet => self.write_attempt_parquet(record),
28            StorageMode::DuckDB => self.write_attempt_duckdb(record),
29        }
30    }
31
32    /// Write attempt to a Parquet file (multi-writer safe).
33    fn write_attempt_parquet(&self, record: &AttemptRecord) -> Result<()> {
34        let conn = self.connection_with_options(false)?;
35        let date = record.date();
36
37        // Ensure the partition directory exists
38        let partition_dir = self.config.attempts_dir(&date);
39        fs::create_dir_all(&partition_dir)?;
40
41        // Generate filename: {session}--{executable}--{id}.parquet
42        let executable = record.executable.as_deref().unwrap_or("unknown");
43        let filename = format!(
44            "{}--{}--{}.parquet",
45            sanitize_filename(&record.session_id),
46            sanitize_filename(executable),
47            record.id
48        );
49        let file_path = partition_dir.join(&filename);
50
51        // Convert metadata HashMap to DuckDB MAP format
52        // Format: map_from_entries([struct_pack(k := 'key1', v := 'value1'), ...])
53        let metadata_map = if record.metadata.is_empty() {
54            "map([],[]::JSON[])".to_string()
55        } else {
56            let entries: Vec<String> = record.metadata.iter()
57                .map(|(k, v)| {
58                    let key = k.replace('\'', "''");
59                    let value = v.to_string().replace('\'', "''");
60                    format!("struct_pack(k := '{}', v := '{}'::JSON)", key, value)
61                })
62                .collect();
63            format!("map_from_entries([{}])", entries.join(", "))
64        };
65
66        // Write via DuckDB using COPY
67        conn.execute_batch(
68            r#"
69            CREATE OR REPLACE TEMP TABLE temp_attempt (
70                id UUID,
71                timestamp TIMESTAMP,
72                cmd VARCHAR,
73                cwd VARCHAR,
74                session_id VARCHAR,
75                tag VARCHAR,
76                source_client VARCHAR,
77                machine_id VARCHAR,
78                hostname VARCHAR,
79                executable VARCHAR,
80                format_hint VARCHAR,
81                metadata MAP(VARCHAR, JSON),
82                date DATE
83            );
84            "#,
85        )?;
86
87        // Insert with dynamic SQL for the MAP
88        conn.execute(
89            &format!(
90                r#"
91                INSERT INTO temp_attempt VALUES (
92                    ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, {}, ?
93                )
94                "#,
95                metadata_map
96            ),
97            params![
98                record.id.to_string(),
99                record.timestamp.to_rfc3339(),
100                record.cmd,
101                record.cwd,
102                record.session_id,
103                record.tag,
104                record.source_client,
105                record.machine_id,
106                record.hostname,
107                record.executable,
108                record.format_hint,
109                date.to_string(),
110            ],
111        )?;
112
113        // Atomic write: COPY to temp file, then rename
114        let temp_path = atomic::temp_path(&file_path);
115        conn.execute(
116            &format!(
117                "COPY temp_attempt TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
118                temp_path.display()
119            ),
120            [],
121        )?;
122        conn.execute("DROP TABLE temp_attempt", [])?;
123
124        // Rename temp to final (atomic on POSIX)
125        atomic::rename_into_place(&temp_path, &file_path)?;
126
127        Ok(())
128    }
129
130    /// Write attempt directly to DuckDB table.
131    fn write_attempt_duckdb(&self, record: &AttemptRecord) -> Result<()> {
132        let conn = self.connection()?;
133        let date = record.date();
134
135        // Convert metadata HashMap to DuckDB MAP format
136        let metadata_map = if record.metadata.is_empty() {
137            "map([],[]::JSON[])".to_string()
138        } else {
139            let entries: Vec<String> = record.metadata.iter()
140                .map(|(k, v)| {
141                    let key = k.replace('\'', "''");
142                    let value = v.to_string().replace('\'', "''");
143                    format!("struct_pack(k := '{}', v := '{}'::JSON)", key, value)
144                })
145                .collect();
146            format!("map_from_entries([{}])", entries.join(", "))
147        };
148
149        conn.execute(
150            &format!(
151                r#"
152                INSERT INTO local.attempts VALUES (
153                    ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, {}, ?
154                )
155                "#,
156                metadata_map
157            ),
158            params![
159                record.id.to_string(),
160                record.timestamp.to_rfc3339(),
161                record.cmd,
162                record.cwd,
163                record.session_id,
164                record.tag,
165                record.source_client,
166                record.machine_id,
167                record.hostname,
168                record.executable,
169                record.format_hint,
170                date.to_string(),
171            ],
172        )?;
173
174        Ok(())
175    }
176
177    /// Start an invocation by writing an attempt (v5 schema).
178    ///
179    /// This is the v5 equivalent of `start_pending_invocation()`.
180    /// Returns the attempt record for later use with `complete_invocation()`.
181    pub fn start_invocation(&self, record: &AttemptRecord) -> Result<AttemptRecord> {
182        self.write_attempt(record)?;
183        Ok(record.clone())
184    }
185
186    /// Get the count of attempts in the store.
187    pub fn attempt_count(&self) -> Result<i64> {
188        let conn = self.connection()?;
189
190        let result: std::result::Result<i64, _> =
191            conn.query_row("SELECT COUNT(*) FROM attempts", [], |row| row.get(0));
192
193        match result {
194            Ok(count) => Ok(count),
195            Err(e) => {
196                if e.to_string().contains("No files found") || e.to_string().contains("does not exist") {
197                    Ok(0)
198                } else {
199                    Err(e.into())
200                }
201            }
202        }
203    }
204
205    /// Get pending attempts (attempts without outcomes).
206    ///
207    /// In v5 schema, this replaces the pending file mechanism:
208    /// `SELECT * FROM attempts WHERE id NOT IN (SELECT attempt_id FROM outcomes)`
209    pub fn get_pending_attempts(&self) -> Result<Vec<AttemptRecord>> {
210        let conn = self.connection()?;
211
212        let sql = r#"
213            SELECT
214                a.id::VARCHAR, a.timestamp::VARCHAR, a.cmd, a.cwd, a.session_id,
215                a.tag, a.source_client, a.machine_id, a.hostname, a.executable,
216                a.format_hint, a.metadata::VARCHAR, a.date::VARCHAR
217            FROM attempts a
218            WHERE a.id NOT IN (SELECT attempt_id FROM outcomes)
219            ORDER BY a.timestamp DESC
220        "#;
221
222        let mut stmt = match conn.prepare(sql) {
223            Ok(stmt) => stmt,
224            Err(e) => {
225                // Tables might not exist in a fresh v4 database
226                if e.to_string().contains("does not exist") {
227                    return Ok(Vec::new());
228                }
229                return Err(e.into());
230            }
231        };
232
233        let rows = stmt.query_map([], |row| {
234            let id_str: String = row.get(0)?;
235            let ts_str: String = row.get(1)?;
236            let date_str: String = row.get(12)?;
237            let metadata_str: Option<String> = row.get(11)?;
238
239            let metadata: std::collections::HashMap<String, serde_json::Value> = metadata_str
240                .and_then(|s| serde_json::from_str(&s).ok())
241                .unwrap_or_default();
242
243            Ok(AttemptRecord {
244                id: uuid::Uuid::parse_str(&id_str).unwrap_or_else(|_| uuid::Uuid::nil()),
245                timestamp: chrono::DateTime::parse_from_rfc3339(&ts_str)
246                    .map(|dt| dt.with_timezone(&chrono::Utc))
247                    .unwrap_or_else(|_| chrono::Utc::now()),
248                cmd: row.get(2)?,
249                cwd: row.get(3)?,
250                session_id: row.get(4)?,
251                tag: row.get(5)?,
252                source_client: row.get(6)?,
253                machine_id: row.get(7)?,
254                hostname: row.get(8)?,
255                executable: row.get(9)?,
256                format_hint: row.get(10)?,
257                metadata,
258                date: chrono::NaiveDate::parse_from_str(&date_str, "%Y-%m-%d")
259                    .unwrap_or_else(|_| chrono::Utc::now().date_naive()),
260            })
261        });
262
263        match rows {
264            Ok(rows) => {
265                let mut results = Vec::new();
266                for row in rows {
267                    results.push(row?);
268                }
269                Ok(results)
270            }
271            Err(e) => {
272                if e.to_string().contains("does not exist") {
273                    Ok(Vec::new())
274                } else {
275                    Err(e.into())
276                }
277            }
278        }
279    }
280}
281
#[cfg(test)]
mod tests {
    use super::*;
    use crate::init::initialize;
    use crate::Config;
    use tempfile::TempDir;

    // Note: These tests will fail until we update init.rs to create the v5 schema.
    // For now, we can verify the code compiles correctly.

    /// Initialize the on-disk layout for `config`, then open a store on it.
    ///
    /// Panics on any failure, which is acceptable for test fixtures.
    fn _open_initialized(config: Config) -> Store {
        initialize(&config).unwrap();
        Store::open(config).unwrap()
    }

    /// Build a store rooted in a fresh temp directory using `Config::with_root`.
    ///
    /// The `TempDir` is returned alongside the store so the directory outlives
    /// the test body.
    fn _setup_store_v5() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let store = _open_initialized(Config::with_root(tmp.path()));
        (tmp, store)
    }

    /// Same as `_setup_store_v5`, but configured via `Config::with_duckdb_mode`.
    fn _setup_store_v5_duckdb() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let store = _open_initialized(Config::with_duckdb_mode(tmp.path()));
        (tmp, store)
    }

    // Tests will be uncommented after init.rs is updated for v5 schema
    /*
    #[test]
    fn test_write_attempt_parquet() {
        let (_tmp, store) = _setup_store_v5();

        let attempt = AttemptRecord::new(
            "test-session",
            "make test",
            "/home/user/project",
            "test@client",
        );

        store.write_attempt(&attempt).unwrap();

        let count = store.attempt_count().unwrap();
        assert_eq!(count, 1);
    }

    #[test]
    fn test_write_attempt_duckdb() {
        let (_tmp, store) = _setup_store_v5_duckdb();

        let attempt = AttemptRecord::new(
            "test-session",
            "make test",
            "/home/user/project",
            "test@client",
        );

        store.write_attempt(&attempt).unwrap();

        let count = store.attempt_count().unwrap();
        assert_eq!(count, 1);
    }
    */
}