1use std::fs;
4
5use duckdb::params;
6
7use super::atomic;
8use super::{sanitize_filename, Store};
9use crate::config::StorageMode;
10use crate::schema::OutputRecord;
11use crate::{Config, Error, Result};
12
/// Description of one stored output stream, as returned by `Store::get_outputs`.
#[derive(Debug)]
pub struct OutputInfo {
    /// Storage kind; `Store::store_output` writes either `"inline"` or `"blob"`.
    pub storage_type: String,
    /// Where the content lives: a `data:` URL for inline content, or a
    /// `file://` reference for blob content.
    pub storage_ref: String,
    /// Name of the stream the content belongs to (the tests use `"stdout"` and `"stderr"`).
    pub stream: String,
    /// Size of the content in bytes.
    pub byte_length: i64,
    /// Hex-encoded hash of the content (`Store::store_output` computes it with BLAKE3).
    pub content_hash: String,
}
22
23impl Store {
24 pub fn store_output(
32 &self,
33 invocation_id: uuid::Uuid,
34 stream: &str,
35 content: &[u8],
36 date: chrono::NaiveDate,
37 cmd_hint: Option<&str>,
38 ) -> Result<()> {
39 use base64::Engine;
40
41 let hash = blake3::hash(content);
43 let hash_hex = hash.to_hex().to_string();
44
45 let (storage_type, storage_ref) = if content.len() < self.config.inline_threshold {
47 let b64 = base64::engine::general_purpose::STANDARD.encode(content);
49 let data_url = format!("data:application/octet-stream;base64,{}", b64);
50 ("inline".to_string(), data_url)
51 } else {
52 let conn = self.connection()?;
54
55 let existing: std::result::Result<String, _> = conn.query_row(
57 "SELECT storage_path FROM blob_registry WHERE content_hash = ?",
58 params![&hash_hex],
59 |row| row.get(0),
60 );
61
62 let storage_path = match existing {
63 Ok(path) => {
64 conn.execute(
66 "UPDATE blob_registry SET ref_count = ref_count + 1, last_accessed = CURRENT_TIMESTAMP WHERE content_hash = ?",
67 params![&hash_hex],
68 )?;
69 path
70 }
71 Err(_) => {
72 let cmd_hint = cmd_hint.unwrap_or("output");
74 let blob_path = self.config.blob_path(&hash_hex, cmd_hint);
75
76 if let Some(parent) = blob_path.parent() {
78 fs::create_dir_all(parent)?;
79 }
80
81 let rel_path = blob_path
83 .strip_prefix(&self.config.data_dir())
84 .map(|p| p.to_string_lossy().to_string())
85 .unwrap_or_else(|_| blob_path.to_string_lossy().to_string());
86
87 let wrote_new = atomic::write_file(&blob_path, content)?;
89
90 if wrote_new {
91 conn.execute(
93 "INSERT INTO blob_registry (content_hash, byte_length, storage_path) VALUES (?, ?, ?)",
94 params![&hash_hex, content.len() as i64, &rel_path],
95 )?;
96 } else {
97 conn.execute(
99 "UPDATE blob_registry SET ref_count = ref_count + 1, last_accessed = CURRENT_TIMESTAMP WHERE content_hash = ?",
100 params![&hash_hex],
101 )?;
102 }
103
104 rel_path
105 }
106 };
107
108 ("blob".to_string(), format!("file://{}", storage_path))
109 };
110
111 let record = OutputRecord {
113 id: uuid::Uuid::now_v7(),
114 invocation_id,
115 stream: stream.to_string(),
116 content_hash: hash_hex,
117 byte_length: content.len(),
118 storage_type,
119 storage_ref,
120 content_type: None,
121 date,
122 };
123
124 self.write_output(&record)
125 }
126
127 pub fn write_output(&self, record: &OutputRecord) -> Result<()> {
133 match self.config.storage_mode {
134 StorageMode::Parquet => self.write_output_parquet(record),
135 StorageMode::DuckDB => self.write_output_duckdb(record),
136 }
137 }
138
139 fn write_output_parquet(&self, record: &OutputRecord) -> Result<()> {
141 let conn = self.connection()?;
142
143 let partition_dir = self.config.outputs_dir(&record.date);
145 fs::create_dir_all(&partition_dir)?;
146
147 let filename = format!(
149 "{}--{}--{}.parquet",
150 record.invocation_id,
151 sanitize_filename(&record.stream),
152 record.id
153 );
154 let file_path = partition_dir.join(&filename);
155
156 conn.execute_batch(
158 r#"
159 CREATE OR REPLACE TEMP TABLE temp_output (
160 id UUID,
161 invocation_id UUID,
162 stream VARCHAR,
163 content_hash VARCHAR,
164 byte_length BIGINT,
165 storage_type VARCHAR,
166 storage_ref VARCHAR,
167 content_type VARCHAR,
168 date DATE
169 );
170 "#,
171 )?;
172
173 conn.execute(
174 r#"
175 INSERT INTO temp_output VALUES (
176 ?, ?, ?, ?, ?, ?, ?, ?, ?
177 )
178 "#,
179 params![
180 record.id.to_string(),
181 record.invocation_id.to_string(),
182 record.stream,
183 record.content_hash,
184 record.byte_length as i64,
185 record.storage_type,
186 record.storage_ref,
187 record.content_type,
188 record.date.to_string(),
189 ],
190 )?;
191
192 let temp_path = atomic::temp_path(&file_path);
194 conn.execute(
195 &format!(
196 "COPY temp_output TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
197 temp_path.display()
198 ),
199 [],
200 )?;
201 conn.execute("DROP TABLE temp_output", [])?;
202
203 atomic::rename_into_place(&temp_path, &file_path)?;
205
206 Ok(())
207 }
208
209 fn write_output_duckdb(&self, record: &OutputRecord) -> Result<()> {
211 let conn = self.connection()?;
212
213 conn.execute(
214 r#"
215 INSERT INTO local.outputs VALUES (
216 ?, ?, ?, ?, ?, ?, ?, ?, ?
217 )
218 "#,
219 params![
220 record.id.to_string(),
221 record.invocation_id.to_string(),
222 record.stream,
223 record.content_hash,
224 record.byte_length as i64,
225 record.storage_type,
226 record.storage_ref,
227 record.content_type,
228 record.date.to_string(),
229 ],
230 )?;
231
232 Ok(())
233 }
234
235 pub fn get_outputs(
237 &self,
238 invocation_id: &str,
239 stream_filter: Option<&str>,
240 ) -> Result<Vec<OutputInfo>> {
241 let conn = self.connection()?;
242
243 let sql = match stream_filter {
244 Some(stream) => format!(
245 r#"
246 SELECT storage_type, storage_ref, stream, byte_length, content_hash
247 FROM outputs
248 WHERE invocation_id = '{}' AND stream = '{}'
249 ORDER BY stream
250 "#,
251 invocation_id, stream
252 ),
253 None => format!(
254 r#"
255 SELECT storage_type, storage_ref, stream, byte_length, content_hash
256 FROM outputs
257 WHERE invocation_id = '{}'
258 ORDER BY stream
259 "#,
260 invocation_id
261 ),
262 };
263
264 let mut stmt = match conn.prepare(&sql) {
265 Ok(stmt) => stmt,
266 Err(e) => {
267 if e.to_string().contains("No files found") {
268 return Ok(Vec::new());
269 }
270 return Err(e.into());
271 }
272 };
273
274 let rows = stmt.query_map([], |row| {
275 Ok(OutputInfo {
276 storage_type: row.get(0)?,
277 storage_ref: row.get(1)?,
278 stream: row.get(2)?,
279 byte_length: row.get(3)?,
280 content_hash: row.get(4)?,
281 })
282 });
283
284 match rows {
285 Ok(rows) => {
286 let mut results = Vec::new();
287 for row in rows {
288 results.push(row?);
289 }
290 Ok(results)
291 }
292 Err(e) => {
293 if e.to_string().contains("No files found") {
294 Ok(Vec::new())
295 } else {
296 Err(e.into())
297 }
298 }
299 }
300 }
301
302 pub fn get_output(&self, invocation_id: &str) -> Result<Option<OutputInfo>> {
304 let outputs = self.get_outputs(invocation_id, None)?;
305 Ok(outputs.into_iter().next())
306 }
307
308 pub fn read_output_content(&self, output: &OutputInfo) -> Result<Vec<u8>> {
310 let conn = self.connection()?;
311
312 let resolved_ref = if output.storage_ref.starts_with("file://") {
314 let rel_path = output.storage_ref.strip_prefix("file://").unwrap();
315 let abs_path = self.config.data_dir().join(rel_path);
316 format!("file://{}", abs_path.display())
317 } else {
318 output.storage_ref.clone()
319 };
320
321 let content: Vec<u8> = conn
322 .query_row(
323 "SELECT content FROM read_blob(?)",
324 params![&resolved_ref],
325 |row| row.get(0),
326 )
327 .map_err(|e| Error::Storage(format!("Failed to read blob: {}", e)))?;
328
329 Ok(content)
330 }
331}
332
333impl OutputInfo {
334 #[deprecated(note = "Use Store::read_output_content() instead for DuckDB-based reads")]
337 pub fn read_content(&self, config: &Config) -> Result<Vec<u8>> {
338 use base64::Engine;
339
340 match self.storage_type.as_str() {
341 "inline" => {
342 if let Some(b64_part) = self.storage_ref.split(',').nth(1) {
344 base64::engine::general_purpose::STANDARD
345 .decode(b64_part)
346 .map_err(|e| Error::Storage(format!("Failed to decode base64: {}", e)))
347 } else {
348 Err(Error::Storage("Invalid data: URL format".to_string()))
349 }
350 }
351 "blob" => {
352 let rel_path = self
354 .storage_ref
355 .strip_prefix("file://")
356 .ok_or_else(|| Error::Storage("Invalid file:// URL".to_string()))?;
357
358 let full_path = config.data_dir().join(rel_path);
359 fs::read(&full_path).map_err(|e| {
360 Error::Storage(format!("Failed to read blob {}: {}", full_path.display(), e))
361 })
362 }
363 other => Err(Error::Storage(format!("Unknown storage type: {}", other))),
364 }
365 }
366}
367
368#[cfg(test)]
369mod tests {
370 use super::*;
371 use crate::init::initialize;
372 use crate::schema::InvocationRecord;
373 use crate::Config;
374 use duckdb::params;
375 use tempfile::TempDir;
376
377 fn setup_store() -> (TempDir, Store) {
378 let tmp = TempDir::new().unwrap();
379 let config = Config::with_root(tmp.path());
380 initialize(&config).unwrap();
381 let store = Store::open(config).unwrap();
382 (tmp, store)
383 }
384
/// An inline record written with `write_output` comes back from `get_outputs`.
#[test]
fn test_write_and_get_output() {
    let (_tmp, store) = setup_store();

    let invocation =
        InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
    let id = invocation.id;
    let day = invocation.date();
    store.write_invocation(&invocation).unwrap();

    let payload = b"hello world\n";
    let record = OutputRecord::new_inline(id, "stdout", payload, day);
    store.write_output(&record).unwrap();

    let found = store.get_outputs(&id.to_string(), None).unwrap();
    assert_eq!(found.len(), 1);

    let only = &found[0];
    assert_eq!(only.stream, "stdout");
    assert_eq!(only.byte_length, 12);
    assert_eq!(only.storage_type, "inline");
}
413
/// stdout and stderr are stored as separate records and can be filtered by stream.
#[test]
fn test_write_separate_streams() {
    let (_tmp, store) = setup_store();

    let invocation =
        InvocationRecord::new("test-session", "compile", "/home/user", 1, "test@client");
    let id = invocation.id;
    let day = invocation.date();
    store.write_invocation(&invocation).unwrap();

    let out_bytes = b"Building...\nDone.\n";
    let out_record = OutputRecord::new_inline(id, "stdout", out_bytes, day);
    store.write_output(&out_record).unwrap();

    let err_bytes = b"warning: unused variable\n";
    let err_record = OutputRecord::new_inline(id, "stderr", err_bytes, day);
    store.write_output(&err_record).unwrap();

    let id_text = id.to_string();

    // Without a filter both streams are returned.
    let everything = store.get_outputs(&id_text, None).unwrap();
    assert_eq!(everything.len(), 2);

    let just_stdout = store.get_outputs(&id_text, Some("stdout")).unwrap();
    assert_eq!(just_stdout.len(), 1);
    assert_eq!(just_stdout[0].stream, "stdout");
    assert_eq!(just_stdout[0].byte_length, 18);

    let just_stderr = store.get_outputs(&id_text, Some("stderr")).unwrap();
    assert_eq!(just_stderr.len(), 1);
    assert_eq!(just_stderr[0].stream, "stderr");
    assert_eq!(just_stderr[0].byte_length, 25);
}
459
/// Looking up an id that was never written yields an empty list, not an error.
#[test]
fn test_get_outputs_nonexistent() {
    let (_tmp, store) = setup_store();

    let found = store.get_outputs("nonexistent-id", None).unwrap();
    assert!(found.is_empty());
}
467
/// The content hash computed by `OutputRecord::new_inline` is stored and returned unchanged.
#[test]
fn test_output_content_hash() {
    let (_tmp, store) = setup_store();

    let invocation =
        InvocationRecord::new("test-session", "test", "/home/user", 0, "test@client");
    let id = invocation.id;
    let day = invocation.date();
    store.write_invocation(&invocation).unwrap();

    let payload = b"test content";
    let record = OutputRecord::new_inline(id, "stdout", payload, day);
    let hash_before_write = record.content_hash.clone();
    store.write_output(&record).unwrap();

    let found = store.get_outputs(&id.to_string(), None).unwrap();
    assert_eq!(found[0].content_hash, hash_before_write);
    assert!(!found[0].content_hash.is_empty());
}
493
/// An inline record decodes back to the bytes it was built from.
#[test]
fn test_output_decode_inline() {
    let payload = b"hello world";
    let id = uuid::Uuid::now_v7();
    let today = chrono::Utc::now().date_naive();

    let record = OutputRecord::new_inline(id, "stdout", payload, today);

    let round_tripped = record.decode_content().expect("should decode");
    assert_eq!(round_tripped, payload);
}
506
/// Small content goes through `store_output` as an inline `data:` URL and reads back intact.
#[test]
fn test_store_output_inline_small_content() {
    let (_tmp, store) = setup_store();

    let invocation =
        InvocationRecord::new("test-session", "echo hello", "/home/user", 0, "test@client");
    let id = invocation.id;
    let day = invocation.date();
    store.write_invocation(&invocation).unwrap();

    let payload = b"hello world\n";
    store
        .store_output(id, "stdout", payload, day, Some("echo"))
        .unwrap();

    let found = store.get_outputs(&id.to_string(), None).unwrap();
    assert_eq!(found.len(), 1);

    let only = &found[0];
    assert_eq!(only.storage_type, "inline");
    assert!(only.storage_ref.starts_with("data:"));

    let round_tripped = store.read_output_content(only).unwrap();
    assert_eq!(round_tripped, payload);
}
537
/// 5000 bytes of content are stored as a `file://` blob and read back intact.
#[test]
fn test_store_output_blob_large_content() {
    let (_tmp, store) = setup_store();

    let invocation =
        InvocationRecord::new("test-session", "cat bigfile", "/home/user", 0, "test@client");
    let id = invocation.id;
    let day = invocation.date();
    store.write_invocation(&invocation).unwrap();

    let payload: Vec<u8> = (0..5000).map(|i| (i % 256) as u8).collect();
    store
        .store_output(id, "stdout", &payload, day, Some("cat"))
        .unwrap();

    let found = store.get_outputs(&id.to_string(), None).unwrap();
    assert_eq!(found.len(), 1);

    let only = &found[0];
    assert_eq!(only.storage_type, "blob");
    assert!(only.storage_ref.starts_with("file://"));

    let round_tripped = store.read_output_content(only).unwrap();
    assert_eq!(round_tripped, payload);
}
568
/// Storing identical large content for two invocations shares one blob and
/// leaves `ref_count` at 2 in `blob_registry`.
#[test]
fn test_store_output_blob_deduplication() {
    let (_tmp, store) = setup_store();

    let first =
        InvocationRecord::new("test-session", "cat file", "/home/user", 0, "test@client");
    let first_id = first.id;
    let first_day = first.date();
    store.write_invocation(&first).unwrap();

    let second =
        InvocationRecord::new("test-session", "cat file", "/home/user", 0, "test@client");
    let second_id = second.id;
    let second_day = second.date();
    store.write_invocation(&second).unwrap();

    let payload: Vec<u8> = (0..5000).map(|i| (i % 256) as u8).collect();

    store
        .store_output(first_id, "stdout", &payload, first_day, Some("cat"))
        .unwrap();
    store
        .store_output(second_id, "stdout", &payload, second_day, Some("cat"))
        .unwrap();

    let from_first = store.get_outputs(&first_id.to_string(), None).unwrap();
    let from_second = store.get_outputs(&second_id.to_string(), None).unwrap();

    // Same content means same hash and the same blob reference.
    assert_eq!(from_first[0].content_hash, from_second[0].content_hash);
    assert_eq!(from_first[0].storage_ref, from_second[0].storage_ref);

    let conn = store.connection().unwrap();
    let references: i32 = conn
        .query_row(
            "SELECT ref_count FROM blob_registry WHERE content_hash = ?",
            params![&from_first[0].content_hash],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(references, 2);

    assert_eq!(store.read_output_content(&from_first[0]).unwrap(), payload);
    assert_eq!(store.read_output_content(&from_second[0]).unwrap(), payload);
}
628
/// A blob write leaves a `.bin` file on disk under the data directory.
#[test]
fn test_store_output_blob_file_created() {
    let (_tmp, store) = setup_store();

    let invocation =
        InvocationRecord::new("test-session", "generate", "/home/user", 0, "test@client");
    let id = invocation.id;
    let day = invocation.date();
    store.write_invocation(&invocation).unwrap();

    let payload: Vec<u8> = (0..5000).map(|i| (i % 256) as u8).collect();
    store
        .store_output(id, "stdout", &payload, day, Some("generate"))
        .unwrap();

    let found = store.get_outputs(&id.to_string(), None).unwrap();

    // Resolve the stored reference against the data directory.
    let relative = found[0].storage_ref.strip_prefix("file://").unwrap();
    let on_disk = store.config().data_dir().join(relative);
    assert!(
        on_disk.exists(),
        "Blob file should exist at {:?}",
        on_disk
    );
    assert!(on_disk.to_string_lossy().ends_with(".bin"));
}
660}