Skip to main content

nika_media/
processor.rs

1//! MediaProcessor: ContentBlock -> decode -> detect -> hash -> store -> MediaRef
2//!
3//! Fully async — CasStore::store() uses io::atomic internally.
4//! Rejects base64 input exceeding MAX_BASE64_INPUT_BYTES before decoding.
5//! Rejects empty decoded data.
6//! Enforces per-run media budget via MediaBudget.
7
8use std::sync::Arc;
9
10use nika_mcp::ContentBlock;
11
12use super::detect::detect_mime;
13use super::error::MediaError;
14use super::store::{CasStore, StoreResult};
15use super::types::{MediaBudget, MediaRef};
16
/// Upper bound on the length of a base64 payload accepted for decoding.
///
/// 100 MiB of base64 text decodes to roughly 75 MiB of binary data; larger
/// inputs are rejected before any decoding work is done.
const MAX_BASE64_INPUT_BYTES: usize = 100 << 20;
19
20/// Processes MCP content blocks into stored media files.
21pub struct MediaProcessor {
22    store: CasStore,
23    budget: Arc<MediaBudget>,
24}
25
26impl MediaProcessor {
27    /// Create a new processor with the given CAS store and default budget.
28    #[allow(dead_code)] // Used in tests
29    pub fn new(store: CasStore) -> Self {
30        Self {
31            store,
32            budget: Arc::new(MediaBudget::new()),
33        }
34    }
35
36    /// Create a new processor with custom owned budget.
37    #[allow(dead_code)] // Used in tests
38    pub fn with_budget(store: CasStore, budget: MediaBudget) -> Self {
39        Self {
40            store,
41            budget: Arc::new(budget),
42        }
43    }
44
45    /// Create a new processor with a shared per-run budget.
46    pub fn with_shared_budget(store: CasStore, budget: Arc<MediaBudget>) -> Self {
47        Self { store, budget }
48    }
49
50    /// Process a single content block (async).
51    ///
52    /// Returns `Ok(None)` for text blocks (no media to process).
53    /// Returns `Ok(Some((MediaRef, StoreResult)))` for media blocks.
54    pub async fn process(
55        &self,
56        block: &ContentBlock,
57        task_id: &str,
58    ) -> Result<Option<(MediaRef, StoreResult)>, MediaError> {
59        match block {
60            ContentBlock::Text { .. } => Ok(None),
61
62            ContentBlock::Image { data, mime_type } => {
63                self.process_base64(data, Some(mime_type.as_str()), task_id)
64                    .await
65            }
66
67            ContentBlock::Audio { data, mime_type } => {
68                self.process_base64(data, Some(mime_type.as_str()), task_id)
69                    .await
70            }
71
72            ContentBlock::Resource(rc) => {
73                if let Some(blob) = &rc.blob {
74                    self.process_base64(blob, rc.mime_type.as_deref(), task_id)
75                        .await
76                } else {
77                    Ok(None)
78                }
79            }
80
81            ContentBlock::ResourceLink { uri, .. } => {
82                tracing::debug!(uri = %uri, "ResourceLink skipped (no inline data)");
83                Ok(None)
84            }
85        }
86    }
87
88    /// Process all content blocks, collecting results per block (async).
89    ///
90    /// Skips text blocks silently. The `usize` in Err is the block index.
91    pub async fn process_all(
92        &self,
93        blocks: &[ContentBlock],
94        task_id: &str,
95    ) -> Vec<Result<(MediaRef, StoreResult), (usize, MediaError)>> {
96        let mut results = Vec::new();
97        for (i, block) in blocks.iter().enumerate() {
98            if block.is_text() {
99                continue;
100            }
101            match self.process(block, task_id).await {
102                Ok(Some(pair)) => results.push(Ok(pair)),
103                Ok(None) => {}
104                Err(e) => {
105                    tracing::error!(
106                        task_id = %task_id,
107                        block_index = i,
108                        error = %e,
109                        "Failed to process media block"
110                    );
111                    results.push(Err((i, e)));
112                }
113            }
114        }
115        results
116    }
117
118    /// Decode base64 data, detect MIME, and store in CAS (async).
119    async fn process_base64(
120        &self,
121        base64_data: &str,
122        server_mime: Option<&str>,
123        task_id: &str,
124    ) -> Result<Option<(MediaRef, StoreResult)>, MediaError> {
125        tracing::debug!(
126            task_id = %task_id,
127            base64_len = base64_data.len(),
128            server_mime = ?server_mime,
129            "media: processing base64 block"
130        );
131
132        // Guard: reject oversized base64 BEFORE decode
133        if base64_data.len() > MAX_BASE64_INPUT_BYTES {
134            return Err(MediaError::Base64InputTooLarge {
135                size: base64_data.len(),
136                max: MAX_BASE64_INPUT_BYTES,
137            });
138        }
139
140        // Guard: reject empty data — malformed Image/Audio block
141        if base64_data.is_empty() {
142            return Err(MediaError::EmptyMediaContent {
143                task_id: task_id.to_string(),
144            });
145        }
146
147        // Decode base64 — strip whitespace first since many MCP servers
148        // return PEM-style base64 with \n at 76-char boundaries (OpenAI, etc.)
149        use base64::Engine;
150        let clean_b64: String = base64_data
151            .chars()
152            .filter(|c| !c.is_ascii_whitespace())
153            .collect();
154        let decoded = base64::engine::general_purpose::STANDARD
155            .decode(&clean_b64)
156            .map_err(|e| MediaError::Base64DecodeFailed {
157                source_desc: format!("task {task_id}"),
158                reason: e.to_string(),
159            })?;
160
161        // Guard: reject empty decoded data
162        if decoded.is_empty() {
163            return Err(MediaError::EmptyMediaContent {
164                task_id: task_id.to_string(),
165            });
166        }
167
168        tracing::trace!(
169            task_id = %task_id,
170            decoded_bytes = decoded.len(),
171            budget_used = self.budget.current_bytes(),
172            "media: base64 decoded successfully"
173        );
174
175        // Check media budget before proceeding
176        let decoded_size = decoded.len() as u64;
177        self.budget.check_and_add(decoded_size, task_id)?;
178
179        // Detect MIME type
180        let detected = match detect_mime(&decoded, server_mime) {
181            Ok(d) => d,
182            Err(e) => {
183                self.budget.rollback(decoded_size);
184                return Err(e);
185            }
186        };
187
188        // Store in CAS (hash-only filenames)
189        let store_result = match self.store.store(&decoded).await {
190            Ok(r) => r,
191            Err(e) => {
192                self.budget.rollback(decoded_size);
193                return Err(e);
194            }
195        };
196
197        tracing::debug!(
198            task_id = %task_id,
199            hash = %store_result.hash,
200            mime = %detected.mime_type,
201            size = store_result.size,
202            dedup = store_result.deduplicated,
203            verified = store_result.verified,
204            pipeline_ms = store_result.pipeline_ms,
205            "media: stored in CAS"
206        );
207
208        // Auto-enrich metadata for images
209        let mut metadata = serde_json::Map::new();
210        if detected.mime_type.starts_with("image/") {
211            // Dimensions (fast, header-only via imagesize)
212            if let Ok(size) = imagesize::blob_size(&decoded) {
213                metadata.insert("width".into(), serde_json::json!(size.width));
214                metadata.insert("height".into(), serde_json::json!(size.height));
215            }
216
217            // ThumbHash (skip images > 10MB to avoid excessive CPU)
218            if decoded.len() < 10_000_000 {
219                if let Some(hash) = compute_thumbhash_for_enrichment(&decoded) {
220                    metadata.insert("thumbhash".into(), serde_json::json!(hash));
221                }
222            }
223        }
224
225        let media_ref = MediaRef {
226            hash: store_result.hash.clone(),
227            mime_type: detected.mime_type,
228            size_bytes: store_result.size,
229            path: store_result.path.clone(),
230            extension: detected.extension,
231            created_by: task_id.to_string(),
232            metadata,
233        };
234
235        Ok(Some((media_ref, store_result)))
236    }
237}
238
/// Compute a base64-encoded ThumbHash for auto-enrichment.
///
/// Best-effort: returns `None` whenever the image format cannot be guessed or
/// the image cannot be decoded within the configured limits.
///
/// Pixel decoding needs the `image` crate, which is only available with the
/// `media-thumbnail` feature. Without that feature this always returns `None`;
/// there is no fallback hash.
fn compute_thumbhash_for_enrichment(data: &[u8]) -> Option<String> {
    #[cfg(feature = "media-thumbnail")]
    {
        use base64::Engine;
        use image::ImageReader;
        use std::io::Cursor;

        // SECURITY: Use image::Limits — never load_from_memory() directly.
        // Enrichment is best-effort, so errors silently return None.
        let mut reader = ImageReader::new(Cursor::new(data))
            .with_guessed_format()
            .ok()?;
        let mut limits = image::Limits::default();
        limits.max_alloc = Some(256 * 1024 * 1024); // 256 MB
        limits.max_image_width = Some(16384);
        limits.max_image_height = Some(16384);
        reader.limits(limits);
        let img = reader.decode().ok()?;

        // ThumbHash's encoder expects small input (at most 100x100); `resize`
        // fits the image inside that box while keeping its aspect ratio.
        let small = img.resize(100, 100, image::imageops::FilterType::Triangle);
        let rgba = small.to_rgba8();
        let (w, h) = rgba.dimensions();
        let hash = thumbhash::rgba_to_thumb_hash(w as usize, h as usize, rgba.as_raw());
        Some(base64::engine::general_purpose::STANDARD.encode(&hash))
    }

    #[cfg(not(feature = "media-thumbnail"))]
    {
        // No pixel decoder available: a real ThumbHash cannot be computed.
        let _ = data;
        None
    }
}
276
#[cfg(test)]
mod tests {
    use super::*;
    use base64::Engine;
    use nika_mcp::ContentBlock;

    fn make_processor_with_dir() -> (MediaProcessor, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = CasStore::new(dir.path());
        (MediaProcessor::new(store), dir)
    }

    /// Encode PNG header as base64 for tests
    fn png_base64() -> String {
        let png_data: Vec<u8> = vec![
            0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A, // PNG signature
            0x00, 0x00, 0x00, 0x0D, // IHDR chunk length
            0x49, 0x48, 0x44, 0x52, // IHDR
            0x00, 0x00, 0x00, 0x01, // width=1
            0x00, 0x00, 0x00, 0x01, // height=1
            0x08, 0x02, 0x00, 0x00, 0x00, // bit depth, color type, etc
        ];
        base64::engine::general_purpose::STANDARD.encode(&png_data)
    }

    #[tokio::test]
    async fn process_text_block_returns_none() {
        let (processor, _dir) = make_processor_with_dir();
        let block = ContentBlock::text("hello");
        let result = processor.process(&block, "t1").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn process_image_block_returns_media_ref() {
        let (processor, _dir) = make_processor_with_dir();
        let block = ContentBlock::image(png_base64(), "image/png");
        let result = processor.process(&block, "t1").await.unwrap();
        assert!(result.is_some());
        let (media_ref, _store_result) = result.unwrap();
        assert!(media_ref.hash.starts_with("blake3:"));
        assert_eq!(media_ref.mime_type, "image/png");
        assert_eq!(media_ref.extension, "png");
        assert!(media_ref.size_bytes > 0);
        assert_eq!(media_ref.created_by, "t1");
    }

    #[tokio::test]
    async fn process_invalid_base64_returns_error() {
        let (processor, _dir) = make_processor_with_dir();
        let block = ContentBlock::image("not!valid!base64!!!", "image/png");
        let result = processor.process(&block, "t1").await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().code(), "NIKA-256");
    }

    #[tokio::test]
    async fn process_empty_base64_returns_error() {
        let (processor, _dir) = make_processor_with_dir();
        let block = ContentBlock::image("", "image/png");
        let result = processor.process(&block, "t1").await;
        assert!(
            result.is_err(),
            "Empty base64 image should error (NIKA-258)"
        );
        assert_eq!(result.unwrap_err().code(), "NIKA-258");
    }

    #[tokio::test]
    async fn process_whitespace_only_base64_returns_empty_error() {
        // Non-empty input that strips down to nothing decodes to zero bytes,
        // which must be reported as empty content rather than stored.
        let (processor, _dir) = make_processor_with_dir();
        let block = ContentBlock::image("  \n\t \r\n", "image/png");
        let result = processor.process(&block, "t1").await;
        assert!(result.is_err(), "whitespace-only base64 should error");
        assert_eq!(result.unwrap_err().code(), "NIKA-258");
    }

    #[tokio::test]
    async fn process_line_wrapped_base64_matches_unwrapped() {
        // PEM-style wrapping (as emitted by several MCP servers) must decode
        // to exactly the same bytes as the unwrapped payload.
        let plain = png_base64();
        let wrapped = plain
            .as_bytes()
            .chunks(8)
            .map(|chunk| std::str::from_utf8(chunk).unwrap())
            .collect::<Vec<_>>()
            .join("\r\n");
        let wrapped = format!("  {wrapped}\n");

        let (processor, _dir) = make_processor_with_dir();
        let (wrapped_ref, _) = processor
            .process(&ContentBlock::image(wrapped, "image/png"), "t1")
            .await
            .unwrap()
            .unwrap();
        let (plain_ref, _) = processor
            .process(&ContentBlock::image(plain, "image/png"), "t2")
            .await
            .unwrap()
            .unwrap();

        assert_eq!(wrapped_ref.hash, plain_ref.hash);
        assert_eq!(wrapped_ref.size_bytes, plain_ref.size_bytes);
    }

    #[tokio::test]
    async fn process_oversized_base64_returns_error() {
        let (processor, _dir) = make_processor_with_dir();
        // Create a string > 100MB
        let big = "A".repeat(MAX_BASE64_INPUT_BYTES + 1);
        let block = ContentBlock::image(big, "image/png");
        let result = processor.process(&block, "t1").await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().code(), "NIKA-257");
    }

    #[tokio::test]
    async fn process_all_mixed_blocks() {
        let (processor, _dir) = make_processor_with_dir();
        let blocks = vec![
            ContentBlock::text("some text"),
            ContentBlock::image(png_base64(), "image/png"),
            ContentBlock::text("more text"),
        ];
        let results = processor.process_all(&blocks, "t1").await;
        // Should only have 1 result (the image), text blocks are skipped
        assert_eq!(results.len(), 1);
        assert!(results[0].is_ok());
    }

    #[tokio::test]
    async fn resource_link_skipped() {
        let (processor, _dir) = make_processor_with_dir();
        let block = ContentBlock::resource_link("file:///test", None, None);
        let result = processor.process(&block, "t1").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn budget_enforcement() {
        let dir = tempfile::tempdir().unwrap();
        let store = CasStore::new(dir.path());
        let budget = MediaBudget::with_max_per_run(50); // Tiny budget
        let processor = MediaProcessor::with_budget(store, budget);

        let block = ContentBlock::image(png_base64(), "image/png");
        // First process should succeed (PNG header is ~25 bytes)
        let r1 = processor.process(&block, "t1").await;
        assert!(r1.is_ok());

        // Second process should fail (budget exceeded)
        let r2 = processor.process(&block, "t2").await;
        assert!(r2.is_err());
        assert_eq!(r2.unwrap_err().code(), "NIKA-259");
    }

    // ═══════════════════════════════════════════════════════════════
    // GAP 1: Budget rollback on MIME detection failure
    // When MIME detection fails AFTER budget was charged, the budget
    // must be rolled back so future tasks are not unfairly penalized.
    // ═══════════════════════════════════════════════════════════════

    #[tokio::test]
    async fn budget_rollback_on_mime_detection_failure() {
        let dir = tempfile::tempdir().unwrap();
        let store = CasStore::new(dir.path());
        let budget = Arc::new(MediaBudget::with_max_per_run(1000));
        let processor = MediaProcessor::with_shared_budget(store, Arc::clone(&budget));

        // Encode random bytes that infer cannot detect as any MIME type.
        // Use application/octet-stream as server hint, which is explicitly
        // rejected by detect_mime, causing MimeDetectionFailed.
        let unknown_data = vec![0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07];
        let b64 = base64::engine::general_purpose::STANDARD.encode(&unknown_data);

        // Verify budget starts at 0
        assert_eq!(budget.current_bytes(), 0, "budget should start at 0");

        // Process should fail on MIME detection (NIKA-251)
        let block = ContentBlock::image(b64, "application/octet-stream");
        let result = processor.process(&block, "t_mime_fail").await;
        assert!(
            result.is_err(),
            "unrecognizable bytes with octet-stream should fail"
        );
        assert_eq!(result.unwrap_err().code(), "NIKA-251");

        // CRITICAL: budget must be rolled back to 0
        assert_eq!(
            budget.current_bytes(),
            0,
            "budget must be rolled back after MIME detection failure, got {}",
            budget.current_bytes()
        );

        // Prove the budget is usable: a valid image should still succeed
        let block2 = ContentBlock::image(png_base64(), "image/png");
        let result2 = processor.process(&block2, "t_after_rollback").await;
        assert!(
            result2.is_ok(),
            "valid image should succeed after rollback: {:?}",
            result2.err()
        );
    }

    // ═══════════════════════════════════════════════════════════════
    // GAP 2: Budget rollback on CAS store failure
    // When CAS store fails AFTER budget was charged (e.g. I/O error),
    // the budget must be rolled back.
    // Unix-only: the failure is forced through directory permission bits.
    // ═══════════════════════════════════════════════════════════════

    #[cfg(unix)]
    #[tokio::test]
    async fn budget_rollback_on_cas_store_failure() {
        use std::os::unix::fs::PermissionsExt;

        /// Restores write permission on drop so `TempDir` can always clean
        /// up, even when an assertion below panics.
        struct RestorePermissions(std::path::PathBuf);

        impl Drop for RestorePermissions {
            fn drop(&mut self) {
                let _ = std::fs::set_permissions(
                    &self.0,
                    std::fs::Permissions::from_mode(0o755),
                );
            }
        }

        let dir = tempfile::tempdir().unwrap();
        let readonly_path = dir.path().join("readonly_store");
        std::fs::create_dir_all(&readonly_path).unwrap();
        std::fs::set_permissions(&readonly_path, std::fs::Permissions::from_mode(0o444))
            .unwrap();
        // Declared after `dir`, so it is dropped (and permissions restored)
        // before the temp dir is removed.
        let _restore = RestorePermissions(readonly_path.clone());

        // Privileged users (e.g. root in CI containers) bypass permission
        // bits, so the store would not fail; skip instead of reporting a
        // false failure.
        if std::fs::write(readonly_path.join(".probe"), b"x").is_ok() {
            eprintln!("skipping: directory permissions are not enforced for this user");
            return;
        }

        let store = CasStore::new(&readonly_path);
        let budget = Arc::new(MediaBudget::with_max_per_run(10000));
        let processor = MediaProcessor::with_shared_budget(store, Arc::clone(&budget));

        assert_eq!(budget.current_bytes(), 0, "budget should start at 0");

        let block = ContentBlock::image(png_base64(), "image/png");
        let result = processor.process(&block, "t_cas_fail").await;

        // Should fail with I/O error (NIKA-255)
        assert!(result.is_err(), "CAS store to read-only dir should fail");
        let err = result.unwrap_err();
        assert_eq!(
            err.code(),
            "NIKA-255",
            "expected MediaStoreIo (NIKA-255), got {}",
            err.code()
        );

        // CRITICAL: budget must be rolled back to 0
        assert_eq!(
            budget.current_bytes(),
            0,
            "budget must be rolled back after CAS store failure, got {}",
            budget.current_bytes()
        );
    }

    // ═══════════════════════════════════════════
    // AUTO-ENRICHMENT TESTS
    // ═══════════════════════════════════════════

    #[tokio::test]
    async fn enrichment_png_has_dimensions() {
        let (processor, _dir) = make_processor_with_dir();
        let block = ContentBlock::image(png_base64(), "image/png");
        let result = processor.process(&block, "t1").await.unwrap();
        let (media_ref, _) = result.unwrap();

        // PNG should have width/height in metadata
        assert!(
            media_ref.metadata.contains_key("width"),
            "metadata should contain 'width', got: {:?}",
            media_ref.metadata
        );
        assert!(
            media_ref.metadata.contains_key("height"),
            "metadata should contain 'height', got: {:?}",
            media_ref.metadata
        );
    }

    #[cfg(feature = "media-thumbnail")]
    #[tokio::test]
    async fn enrichment_png_has_thumbhash() {
        // Need a FULL decodable PNG for thumbhash (png_base64 is just header)
        let full_png = {
            use image::{ImageBuffer, Rgb};
            let img = ImageBuffer::from_pixel(4, 4, Rgb([255u8, 0, 0]));
            let mut buf = Vec::new();
            let enc = image::codecs::png::PngEncoder::new(&mut buf);
            image::ImageEncoder::write_image(
                enc,
                img.as_raw(),
                4,
                4,
                image::ExtendedColorType::Rgb8,
            )
            .unwrap();
            base64::engine::general_purpose::STANDARD.encode(&buf)
        };

        let (processor, _dir) = make_processor_with_dir();
        let block = ContentBlock::image(full_png, "image/png");
        let result = processor.process(&block, "t1").await.unwrap();
        let (media_ref, _) = result.unwrap();

        // Should have thumbhash
        assert!(
            media_ref.metadata.contains_key("thumbhash"),
            "metadata should contain 'thumbhash', got: {:?}",
            media_ref.metadata
        );
        let th = media_ref
            .metadata
            .get("thumbhash")
            .unwrap()
            .as_str()
            .unwrap();
        // Verify it's valid base64
        assert!(
            base64::engine::general_purpose::STANDARD.decode(th).is_ok(),
            "thumbhash should be valid base64: {th}"
        );
    }

    #[tokio::test]
    async fn enrichment_non_image_no_dimensions() {
        // WAV RIFF header (minimal)
        let wav_data: Vec<u8> = vec![
            0x52, 0x49, 0x46, 0x46, // "RIFF"
            0x24, 0x00, 0x00, 0x00, // chunk size
            0x57, 0x41, 0x56, 0x45, // "WAVE"
            0x66, 0x6D, 0x74, 0x20, // "fmt "
            0x10, 0x00, 0x00, 0x00, // subchunk size
            0x01, 0x00, // PCM
            0x01, 0x00, // mono
            0x44, 0xAC, 0x00, 0x00, // 44100 Hz
            0x88, 0x58, 0x01, 0x00, // byte rate
            0x02, 0x00, // block align
            0x10, 0x00, // bits per sample
            0x64, 0x61, 0x74, 0x61, // "data"
            0x00, 0x00, 0x00, 0x00, // data size
        ];
        let wav_b64 = base64::engine::general_purpose::STANDARD.encode(&wav_data);
        let (processor, _dir) = make_processor_with_dir();
        let block = ContentBlock::audio(wav_b64, "audio/wav");
        let result = processor.process(&block, "t1").await.unwrap();
        let (media_ref, _) = result.unwrap();

        // Audio should NOT have dimensions or thumbhash
        assert!(
            !media_ref.metadata.contains_key("width"),
            "audio should not have width in metadata"
        );
        assert!(
            !media_ref.metadata.contains_key("thumbhash"),
            "audio should not have thumbhash in metadata"
        );
    }

    #[tokio::test]
    async fn enrichment_metadata_serializes_cleanly() {
        let (processor, _dir) = make_processor_with_dir();
        let block = ContentBlock::image(png_base64(), "image/png");
        let result = processor.process(&block, "t1").await.unwrap();
        let (media_ref, _) = result.unwrap();

        // Roundtrip through JSON
        let json = serde_json::to_string(&media_ref).unwrap();
        let back: MediaRef = serde_json::from_str(&json).unwrap();
        assert_eq!(media_ref, back);

        // The PNG header always yields dimensions, so metadata must be present
        // in the serialized form.
        assert!(
            media_ref.metadata.contains_key("width"),
            "PNG enrichment should populate width, got: {:?}",
            media_ref.metadata
        );
        assert!(json.contains("metadata"));
        assert!(json.contains("width"));
    }

    #[test]
    fn enrichment_empty_metadata_not_serialized() {
        // Test that empty metadata is skipped in serialization
        let mr = MediaRef {
            hash: "blake3:test".to_string(),
            mime_type: "text/plain".to_string(),
            size_bytes: 10,
            path: std::path::PathBuf::from("/tmp/test"),
            extension: "txt".to_string(),
            created_by: "test".to_string(),
            metadata: serde_json::Map::new(),
        };
        let json = serde_json::to_string(&mr).unwrap();
        assert!(
            !json.contains("metadata"),
            "empty metadata should not appear in JSON: {json}"
        );
    }
}