// magic_bird/store/outputs.rs
1//! Output storage operations.
2
3use std::fs;
4
5use duckdb::params;
6
7use super::atomic;
8use super::{sanitize_filename, Store};
9use crate::config::StorageMode;
10use crate::schema::OutputRecord;
11use crate::{Config, Error, Result};
12
/// Info about stored output.
///
/// One row projected from the `outputs` table, describing where and how a
/// single output stream of an invocation is stored.
#[derive(Debug)]
pub struct OutputInfo {
    /// Either `"inline"` (content embedded in `storage_ref` as a data: URL)
    /// or `"blob"` (content in a file referenced by a file:// URL).
    pub storage_type: String,
    /// data: URL for inline content, or a file:// URL whose path is
    /// relative to the store's data directory for blob content.
    pub storage_ref: String,
    /// Stream name, e.g. "stdout" or "stderr".
    pub stream: String,
    /// Content size in bytes.
    pub byte_length: i64,
    /// BLAKE3 hash (hex) of the content.
    pub content_hash: String,
}
22
23impl Store {
24    /// Store output content, routing to inline or blob based on size.
25    ///
26    /// This is the high-level method for storing invocation output. It:
27    /// 1. Computes the BLAKE3 hash
28    /// 2. Routes small content to inline (data: URL) or large to blob (file: URL)
29    /// 3. Handles deduplication for blobs
30    /// 4. Writes the output record to Parquet
31    pub fn store_output(
32        &self,
33        invocation_id: uuid::Uuid,
34        stream: &str,
35        content: &[u8],
36        date: chrono::NaiveDate,
37        cmd_hint: Option<&str>,
38    ) -> Result<()> {
39        use base64::Engine;
40
41        // Compute hash
42        let hash = blake3::hash(content);
43        let hash_hex = hash.to_hex().to_string();
44
45        // Route by size
46        let (storage_type, storage_ref) = if content.len() < self.config.inline_threshold {
47            // Inline: use data: URL
48            let b64 = base64::engine::general_purpose::STANDARD.encode(content);
49            let data_url = format!("data:application/octet-stream;base64,{}", b64);
50            ("inline".to_string(), data_url)
51        } else {
52            // Blob: check for dedup, write file if needed
53            let conn = self.connection()?;
54
55            // Check if blob already exists (dedup check)
56            let existing: std::result::Result<String, _> = conn.query_row(
57                "SELECT storage_path FROM blob_registry WHERE content_hash = ?",
58                params![&hash_hex],
59                |row| row.get(0),
60            );
61
62            let storage_path = match existing {
63                Ok(path) => {
64                    // DEDUP HIT - increment ref count
65                    conn.execute(
66                        "UPDATE blob_registry SET ref_count = ref_count + 1, last_accessed = CURRENT_TIMESTAMP WHERE content_hash = ?",
67                        params![&hash_hex],
68                    )?;
69                    path
70                }
71                Err(_) => {
72                    // DEDUP MISS in registry - write new blob atomically
73                    let cmd_hint = cmd_hint.unwrap_or("output");
74                    let blob_path = self.config.blob_path(&hash_hex, cmd_hint);
75
76                    // Ensure subdirectory exists
77                    if let Some(parent) = blob_path.parent() {
78                        fs::create_dir_all(parent)?;
79                    }
80
81                    // Compute relative path for storage_ref
82                    let rel_path = blob_path
83                        .strip_prefix(self.config.data_dir())
84                        .map(|p| p.to_string_lossy().to_string())
85                        .unwrap_or_else(|_| blob_path.to_string_lossy().to_string());
86
87                    // Atomic write: temp file + rename (handles concurrent writes)
88                    let wrote_new = atomic::write_file(&blob_path, content)?;
89
90                    if wrote_new {
91                        // We wrote the file - register in blob_registry
92                        conn.execute(
93                            "INSERT INTO blob_registry (content_hash, byte_length, storage_path) VALUES (?, ?, ?)",
94                            params![&hash_hex, content.len() as i64, &rel_path],
95                        )?;
96                    } else {
97                        // Another process wrote this blob concurrently - increment ref_count
98                        conn.execute(
99                            "UPDATE blob_registry SET ref_count = ref_count + 1, last_accessed = CURRENT_TIMESTAMP WHERE content_hash = ?",
100                            params![&hash_hex],
101                        )?;
102                    }
103
104                    rel_path
105                }
106            };
107
108            ("blob".to_string(), format!("file://{}", storage_path))
109        };
110
111        // Create and write the output record
112        let record = OutputRecord {
113            id: uuid::Uuid::now_v7(),
114            invocation_id,
115            stream: stream.to_string(),
116            content_hash: hash_hex,
117            byte_length: content.len(),
118            storage_type,
119            storage_ref,
120            content_type: None,
121            date,
122        };
123
124        self.write_output(&record)
125    }
126
127    /// Write an output record to the store (low-level).
128    ///
129    /// Behavior depends on storage mode:
130    /// - Parquet: Creates a new Parquet file in the appropriate date partition
131    /// - DuckDB: Inserts directly into the local.outputs
132    pub fn write_output(&self, record: &OutputRecord) -> Result<()> {
133        match self.config.storage_mode {
134            StorageMode::Parquet => self.write_output_parquet(record),
135            StorageMode::DuckDB => self.write_output_duckdb(record),
136        }
137    }
138
139    /// Write output to a Parquet file (multi-writer safe).
140    fn write_output_parquet(&self, record: &OutputRecord) -> Result<()> {
141        let conn = self.connection_with_options(false)?;
142
143        // Ensure the partition directory exists
144        let partition_dir = self.config.outputs_dir(&record.date);
145        fs::create_dir_all(&partition_dir)?;
146
147        // Generate filename: {invocation_id}--{stream}--{id}.parquet
148        let filename = format!(
149            "{}--{}--{}.parquet",
150            record.invocation_id,
151            sanitize_filename(&record.stream),
152            record.id
153        );
154        let file_path = partition_dir.join(&filename);
155
156        // Write via DuckDB
157        conn.execute_batch(
158            r#"
159            CREATE OR REPLACE TEMP TABLE temp_output (
160                id UUID,
161                invocation_id UUID,
162                stream VARCHAR,
163                content_hash VARCHAR,
164                byte_length BIGINT,
165                storage_type VARCHAR,
166                storage_ref VARCHAR,
167                content_type VARCHAR,
168                date DATE
169            );
170            "#,
171        )?;
172
173        conn.execute(
174            r#"
175            INSERT INTO temp_output VALUES (
176                ?, ?, ?, ?, ?, ?, ?, ?, ?
177            )
178            "#,
179            params![
180                record.id.to_string(),
181                record.invocation_id.to_string(),
182                record.stream,
183                record.content_hash,
184                record.byte_length as i64,
185                record.storage_type,
186                record.storage_ref,
187                record.content_type,
188                record.date.to_string(),
189            ],
190        )?;
191
192        // Atomic write: COPY to temp file, then rename
193        let temp_path = atomic::temp_path(&file_path);
194        conn.execute(
195            &format!(
196                "COPY temp_output TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
197                temp_path.display()
198            ),
199            [],
200        )?;
201        conn.execute("DROP TABLE temp_output", [])?;
202
203        // Rename temp to final (atomic on POSIX)
204        atomic::rename_into_place(&temp_path, &file_path)?;
205
206        Ok(())
207    }
208
209    /// Write output directly to DuckDB table.
210    fn write_output_duckdb(&self, record: &OutputRecord) -> Result<()> {
211        let conn = self.connection()?;
212
213        conn.execute(
214            r#"
215            INSERT INTO local.outputs VALUES (
216                ?, ?, ?, ?, ?, ?, ?, ?, ?
217            )
218            "#,
219            params![
220                record.id.to_string(),
221                record.invocation_id.to_string(),
222                record.stream,
223                record.content_hash,
224                record.byte_length as i64,
225                record.storage_type,
226                record.storage_ref,
227                record.content_type,
228                record.date.to_string(),
229            ],
230        )?;
231
232        Ok(())
233    }
234
235    /// Get outputs for an invocation by ID, optionally filtered by stream.
236    pub fn get_outputs(
237        &self,
238        invocation_id: &str,
239        stream_filter: Option<&str>,
240    ) -> Result<Vec<OutputInfo>> {
241        let conn = self.connection()?;
242
243        let sql = match stream_filter {
244            Some(stream) => format!(
245                r#"
246                SELECT storage_type, storage_ref, stream, byte_length, content_hash
247                FROM outputs
248                WHERE invocation_id = '{}' AND stream = '{}'
249                ORDER BY stream
250                "#,
251                invocation_id, stream
252            ),
253            None => format!(
254                r#"
255                SELECT storage_type, storage_ref, stream, byte_length, content_hash
256                FROM outputs
257                WHERE invocation_id = '{}'
258                ORDER BY stream
259                "#,
260                invocation_id
261            ),
262        };
263
264        let mut stmt = match conn.prepare(&sql) {
265            Ok(stmt) => stmt,
266            Err(e) => {
267                if e.to_string().contains("No files found") {
268                    return Ok(Vec::new());
269                }
270                return Err(e.into());
271            }
272        };
273
274        let rows = stmt.query_map([], |row| {
275            Ok(OutputInfo {
276                storage_type: row.get(0)?,
277                storage_ref: row.get(1)?,
278                stream: row.get(2)?,
279                byte_length: row.get(3)?,
280                content_hash: row.get(4)?,
281            })
282        });
283
284        match rows {
285            Ok(rows) => {
286                let mut results = Vec::new();
287                for row in rows {
288                    results.push(row?);
289                }
290                Ok(results)
291            }
292            Err(e) => {
293                if e.to_string().contains("No files found") {
294                    Ok(Vec::new())
295                } else {
296                    Err(e.into())
297                }
298            }
299        }
300    }
301
302    /// Get output for an invocation by ID (first match, for backwards compat).
303    pub fn get_output(&self, invocation_id: &str) -> Result<Option<OutputInfo>> {
304        let outputs = self.get_outputs(invocation_id, None)?;
305        Ok(outputs.into_iter().next())
306    }
307
308    /// Read content from storage using DuckDB's read_blob (handles both data: and file:// URLs).
309    pub fn read_output_content(&self, output: &OutputInfo) -> Result<Vec<u8>> {
310        let conn = self.connection()?;
311
312        // Resolve the storage_ref to an absolute path for file:// URLs
313        let resolved_ref = if output.storage_ref.starts_with("file://") {
314            let rel_path = output.storage_ref.strip_prefix("file://").unwrap();
315            let abs_path = self.config.data_dir().join(rel_path);
316            format!("file://{}", abs_path.display())
317        } else {
318            output.storage_ref.clone()
319        };
320
321        let content: Vec<u8> = conn
322            .query_row(
323                "SELECT content FROM read_blob(?)",
324                params![&resolved_ref],
325                |row| row.get(0),
326            )
327            .map_err(|e| Error::Storage(format!("Failed to read blob: {}", e)))?;
328
329        Ok(content)
330    }
331}
332
333impl OutputInfo {
334    /// Read the content from storage (inline or blob).
335    /// Prefer Store::read_output_content() which uses DuckDB for unified access.
336    #[deprecated(note = "Use Store::read_output_content() instead for DuckDB-based reads")]
337    pub fn read_content(&self, config: &Config) -> Result<Vec<u8>> {
338        use base64::Engine;
339
340        match self.storage_type.as_str() {
341            "inline" => {
342                // Parse data: URL and decode base64
343                if let Some(b64_part) = self.storage_ref.split(',').nth(1) {
344                    base64::engine::general_purpose::STANDARD
345                        .decode(b64_part)
346                        .map_err(|e| Error::Storage(format!("Failed to decode base64: {}", e)))
347                } else {
348                    Err(Error::Storage("Invalid data: URL format".to_string()))
349                }
350            }
351            "blob" => {
352                // Read raw file
353                let rel_path = self
354                    .storage_ref
355                    .strip_prefix("file://")
356                    .ok_or_else(|| Error::Storage("Invalid file:// URL".to_string()))?;
357
358                let full_path = config.data_dir().join(rel_path);
359                fs::read(&full_path).map_err(|e| {
360                    Error::Storage(format!("Failed to read blob {}: {}", full_path.display(), e))
361                })
362            }
363            other => Err(Error::Storage(format!("Unknown storage type: {}", other))),
364        }
365    }
366}
367
#[cfg(test)]
mod tests {
    use super::*;
    use crate::init::initialize;
    use crate::schema::InvocationRecord;
    use crate::Config;
    use duckdb::params;
    use tempfile::TempDir;

    /// Build a fully initialized Store rooted in a fresh temp dir.
    /// The TempDir is returned so it outlives the Store (drop = cleanup).
    fn setup_store() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let config = Config::with_root(tmp.path());
        initialize(&config).unwrap();
        let store = Store::open(config).unwrap();
        (tmp, store)
    }

    /// Round-trip: write one inline output record, read it back via get_outputs.
    #[test]
    fn test_write_and_get_output() {
        let (_tmp, store) = setup_store();

        // Create an invocation first
        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        let inv_id = inv.id;
        let date = inv.date();
        store.write_invocation(&inv).unwrap();

        // Write stdout output
        let content = b"hello world\n";
        let output = OutputRecord::new_inline(inv_id, "stdout", content, date);
        store.write_output(&output).unwrap();

        // Retrieve it
        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].stream, "stdout");
        // "hello world\n" is 12 bytes
        assert_eq!(outputs[0].byte_length, 12);
        assert_eq!(outputs[0].storage_type, "inline");
    }

    /// stdout and stderr are stored as separate records and the stream
    /// filter returns only the requested one.
    #[test]
    fn test_write_separate_streams() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new(
            "test-session",
            "compile",
            "/home/user",
            1,
            "test@client",
        );
        let inv_id = inv.id;
        let date = inv.date();
        store.write_invocation(&inv).unwrap();

        // Write stdout
        let stdout_content = b"Building...\nDone.\n";
        let stdout_output = OutputRecord::new_inline(inv_id, "stdout", stdout_content, date);
        store.write_output(&stdout_output).unwrap();

        // Write stderr
        let stderr_content = b"warning: unused variable\n";
        let stderr_output = OutputRecord::new_inline(inv_id, "stderr", stderr_content, date);
        store.write_output(&stderr_output).unwrap();

        // Get all outputs
        let all_outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(all_outputs.len(), 2);

        // Get only stdout
        let stdout_only = store
            .get_outputs(&inv_id.to_string(), Some("stdout"))
            .unwrap();
        assert_eq!(stdout_only.len(), 1);
        assert_eq!(stdout_only[0].stream, "stdout");
        assert_eq!(stdout_only[0].byte_length, 18);

        // Get only stderr
        let stderr_only = store
            .get_outputs(&inv_id.to_string(), Some("stderr"))
            .unwrap();
        assert_eq!(stderr_only.len(), 1);
        assert_eq!(stderr_only[0].stream, "stderr");
        assert_eq!(stderr_only[0].byte_length, 25);
    }

    /// Querying an id with no outputs yields an empty Vec, not an error
    /// (exercises the "No files found" fallback for empty partitions).
    #[test]
    fn test_get_outputs_nonexistent() {
        let (_tmp, store) = setup_store();

        let outputs = store.get_outputs("nonexistent-id", None).unwrap();
        assert!(outputs.is_empty());
    }

    /// The content hash computed at record creation survives the write/read
    /// round trip unchanged.
    #[test]
    fn test_output_content_hash() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new(
            "test-session",
            "test",
            "/home/user",
            0,
            "test@client",
        );
        let inv_id = inv.id;
        let date = inv.date();
        store.write_invocation(&inv).unwrap();

        let content = b"test content";
        let output = OutputRecord::new_inline(inv_id, "stdout", content, date);
        let expected_hash = output.content_hash.clone();
        store.write_output(&output).unwrap();

        // Verify hash is stored and retrievable
        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs[0].content_hash, expected_hash);
        assert!(!outputs[0].content_hash.is_empty());
    }

    /// Inline records can be decoded back to the original bytes without
    /// touching the store at all.
    #[test]
    fn test_output_decode_inline() {
        let content = b"hello world";
        let inv_id = uuid::Uuid::now_v7();
        let date = chrono::Utc::now().date_naive();

        let output = OutputRecord::new_inline(inv_id, "stdout", content, date);

        // Verify we can decode the content back
        let decoded = output.decode_content().expect("should decode");
        assert_eq!(decoded, content);
    }

    /// store_output routes small payloads to inline (data: URL) storage.
    #[test]
    fn test_store_output_inline_small_content() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new(
            "test-session",
            "echo hello",
            "/home/user",
            0,
            "test@client",
        );
        let inv_id = inv.id;
        let date = inv.date();
        store.write_invocation(&inv).unwrap();

        // Small content should be stored inline (under 4KB threshold)
        let content = b"hello world\n";
        store
            .store_output(inv_id, "stdout", content, date, Some("echo"))
            .unwrap();

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].storage_type, "inline");
        assert!(outputs[0].storage_ref.starts_with("data:"));

        // Verify content can be read back via DuckDB
        let read_back = store.read_output_content(&outputs[0]).unwrap();
        assert_eq!(read_back, content);
    }

    /// store_output routes large payloads to blob (file:// URL) storage.
    #[test]
    fn test_store_output_blob_large_content() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new(
            "test-session",
            "cat bigfile",
            "/home/user",
            0,
            "test@client",
        );
        let inv_id = inv.id;
        let date = inv.date();
        store.write_invocation(&inv).unwrap();

        // Large content should be stored as blob (over 4KB threshold)
        let content: Vec<u8> = (0..5000).map(|i| (i % 256) as u8).collect();
        store
            .store_output(inv_id, "stdout", &content, date, Some("cat"))
            .unwrap();

        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].storage_type, "blob");
        assert!(outputs[0].storage_ref.starts_with("file://"));

        // Verify content can be read back via DuckDB
        let read_back = store.read_output_content(&outputs[0]).unwrap();
        assert_eq!(read_back, content);
    }

    /// Storing the same large content twice writes one blob: both records
    /// share hash and storage_ref, and the registry ref_count reaches 2.
    #[test]
    fn test_store_output_blob_deduplication() {
        let (_tmp, store) = setup_store();

        // Create two invocations
        let inv1 = InvocationRecord::new(
            "test-session",
            "cat file",
            "/home/user",
            0,
            "test@client",
        );
        let inv1_id = inv1.id;
        let date1 = inv1.date();
        store.write_invocation(&inv1).unwrap();

        let inv2 = InvocationRecord::new(
            "test-session",
            "cat file",
            "/home/user",
            0,
            "test@client",
        );
        let inv2_id = inv2.id;
        let date2 = inv2.date();
        store.write_invocation(&inv2).unwrap();

        // Same large content for both
        let content: Vec<u8> = (0..5000).map(|i| (i % 256) as u8).collect();

        store
            .store_output(inv1_id, "stdout", &content, date1, Some("cat"))
            .unwrap();
        store
            .store_output(inv2_id, "stdout", &content, date2, Some("cat"))
            .unwrap();

        // Both should point to same blob (same content_hash)
        let outputs1 = store.get_outputs(&inv1_id.to_string(), None).unwrap();
        let outputs2 = store.get_outputs(&inv2_id.to_string(), None).unwrap();

        assert_eq!(outputs1[0].content_hash, outputs2[0].content_hash);
        assert_eq!(outputs1[0].storage_ref, outputs2[0].storage_ref);

        // Verify ref_count is 2
        let conn = store.connection().unwrap();
        let ref_count: i32 = conn
            .query_row(
                "SELECT ref_count FROM blob_registry WHERE content_hash = ?",
                params![&outputs1[0].content_hash],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(ref_count, 2);

        // Both should read back correctly via DuckDB
        assert_eq!(store.read_output_content(&outputs1[0]).unwrap(), content);
        assert_eq!(store.read_output_content(&outputs2[0]).unwrap(), content);
    }

    /// The blob file referenced by storage_ref actually exists on disk
    /// under the data dir, with the expected .bin extension.
    #[test]
    fn test_store_output_blob_file_created() {
        let (_tmp, store) = setup_store();

        let inv = InvocationRecord::new(
            "test-session",
            "generate",
            "/home/user",
            0,
            "test@client",
        );
        let inv_id = inv.id;
        let date = inv.date();
        store.write_invocation(&inv).unwrap();

        let content: Vec<u8> = (0..5000).map(|i| (i % 256) as u8).collect();
        store
            .store_output(inv_id, "stdout", &content, date, Some("generate"))
            .unwrap();

        // Verify blob file was created
        let outputs = store.get_outputs(&inv_id.to_string(), None).unwrap();
        let rel_path = outputs[0].storage_ref.strip_prefix("file://").unwrap();
        let full_path = store.config().data_dir().join(rel_path);
        assert!(
            full_path.exists(),
            "Blob file should exist at {:?}",
            full_path
        );
        assert!(full_path.to_string_lossy().ends_with(".bin"));
    }
}