1use std::sync::Arc;
9
10use nika_mcp::ContentBlock;
11
12use super::detect::detect_mime;
13use super::error::MediaError;
14use super::store::{CasStore, StoreResult};
15use super::types::{MediaBudget, MediaRef};
16
/// Longest base64 string (in bytes of encoded text) accepted for decoding:
/// 100 MiB, i.e. `100 * 2^20`. Longer inputs are rejected before any decoding.
const MAX_BASE64_INPUT_BYTES: usize = 100 << 20;
19
/// Converts content blocks that carry inline base64 payloads into stored
/// media: the payload is decoded, charged against a byte budget, MIME-sniffed
/// and written to the content store.
pub struct MediaProcessor {
    /// Store that receives the decoded payload bytes.
    store: CasStore,
    /// Byte budget charged for every decoded payload. Held in an `Arc` so a
    /// caller can hand in a budget it shares (see `with_shared_budget`).
    budget: Arc<MediaBudget>,
}
25
26impl MediaProcessor {
27 #[allow(dead_code)] pub fn new(store: CasStore) -> Self {
30 Self {
31 store,
32 budget: Arc::new(MediaBudget::new()),
33 }
34 }
35
36 #[allow(dead_code)] pub fn with_budget(store: CasStore, budget: MediaBudget) -> Self {
39 Self {
40 store,
41 budget: Arc::new(budget),
42 }
43 }
44
45 pub fn with_shared_budget(store: CasStore, budget: Arc<MediaBudget>) -> Self {
47 Self { store, budget }
48 }
49
50 pub async fn process(
55 &self,
56 block: &ContentBlock,
57 task_id: &str,
58 ) -> Result<Option<(MediaRef, StoreResult)>, MediaError> {
59 match block {
60 ContentBlock::Text { .. } => Ok(None),
61
62 ContentBlock::Image { data, mime_type } => {
63 self.process_base64(data, Some(mime_type.as_str()), task_id)
64 .await
65 }
66
67 ContentBlock::Audio { data, mime_type } => {
68 self.process_base64(data, Some(mime_type.as_str()), task_id)
69 .await
70 }
71
72 ContentBlock::Resource(rc) => {
73 if let Some(blob) = &rc.blob {
74 self.process_base64(blob, rc.mime_type.as_deref(), task_id)
75 .await
76 } else {
77 Ok(None)
78 }
79 }
80
81 ContentBlock::ResourceLink { uri, .. } => {
82 tracing::debug!(uri = %uri, "ResourceLink skipped (no inline data)");
83 Ok(None)
84 }
85 }
86 }
87
88 pub async fn process_all(
92 &self,
93 blocks: &[ContentBlock],
94 task_id: &str,
95 ) -> Vec<Result<(MediaRef, StoreResult), (usize, MediaError)>> {
96 let mut results = Vec::new();
97 for (i, block) in blocks.iter().enumerate() {
98 if block.is_text() {
99 continue;
100 }
101 match self.process(block, task_id).await {
102 Ok(Some(pair)) => results.push(Ok(pair)),
103 Ok(None) => {}
104 Err(e) => {
105 tracing::error!(
106 task_id = %task_id,
107 block_index = i,
108 error = %e,
109 "Failed to process media block"
110 );
111 results.push(Err((i, e)));
112 }
113 }
114 }
115 results
116 }
117
118 async fn process_base64(
120 &self,
121 base64_data: &str,
122 server_mime: Option<&str>,
123 task_id: &str,
124 ) -> Result<Option<(MediaRef, StoreResult)>, MediaError> {
125 tracing::debug!(
126 task_id = %task_id,
127 base64_len = base64_data.len(),
128 server_mime = ?server_mime,
129 "media: processing base64 block"
130 );
131
132 if base64_data.len() > MAX_BASE64_INPUT_BYTES {
134 return Err(MediaError::Base64InputTooLarge {
135 size: base64_data.len(),
136 max: MAX_BASE64_INPUT_BYTES,
137 });
138 }
139
140 if base64_data.is_empty() {
142 return Err(MediaError::EmptyMediaContent {
143 task_id: task_id.to_string(),
144 });
145 }
146
147 use base64::Engine;
150 let clean_b64: String = base64_data
151 .chars()
152 .filter(|c| !c.is_ascii_whitespace())
153 .collect();
154 let decoded = base64::engine::general_purpose::STANDARD
155 .decode(&clean_b64)
156 .map_err(|e| MediaError::Base64DecodeFailed {
157 source_desc: format!("task {task_id}"),
158 reason: e.to_string(),
159 })?;
160
161 if decoded.is_empty() {
163 return Err(MediaError::EmptyMediaContent {
164 task_id: task_id.to_string(),
165 });
166 }
167
168 tracing::trace!(
169 task_id = %task_id,
170 decoded_bytes = decoded.len(),
171 budget_used = self.budget.current_bytes(),
172 "media: base64 decoded successfully"
173 );
174
175 let decoded_size = decoded.len() as u64;
177 self.budget.check_and_add(decoded_size, task_id)?;
178
179 let detected = match detect_mime(&decoded, server_mime) {
181 Ok(d) => d,
182 Err(e) => {
183 self.budget.rollback(decoded_size);
184 return Err(e);
185 }
186 };
187
188 let store_result = match self.store.store(&decoded).await {
190 Ok(r) => r,
191 Err(e) => {
192 self.budget.rollback(decoded_size);
193 return Err(e);
194 }
195 };
196
197 tracing::debug!(
198 task_id = %task_id,
199 hash = %store_result.hash,
200 mime = %detected.mime_type,
201 size = store_result.size,
202 dedup = store_result.deduplicated,
203 verified = store_result.verified,
204 pipeline_ms = store_result.pipeline_ms,
205 "media: stored in CAS"
206 );
207
208 let mut metadata = serde_json::Map::new();
210 if detected.mime_type.starts_with("image/") {
211 if let Ok(size) = imagesize::blob_size(&decoded) {
213 metadata.insert("width".into(), serde_json::json!(size.width));
214 metadata.insert("height".into(), serde_json::json!(size.height));
215 }
216
217 if decoded.len() < 10_000_000 {
219 if let Some(hash) = compute_thumbhash_for_enrichment(&decoded) {
220 metadata.insert("thumbhash".into(), serde_json::json!(hash));
221 }
222 }
223 }
224
225 let media_ref = MediaRef {
226 hash: store_result.hash.clone(),
227 mime_type: detected.mime_type,
228 size_bytes: store_result.size,
229 path: store_result.path.clone(),
230 extension: detected.extension,
231 created_by: task_id.to_string(),
232 metadata,
233 };
234
235 Ok(Some((media_ref, store_result)))
236 }
237}
238
/// Computes a base64-encoded thumbhash for the encoded image in `data`.
///
/// Returns `None` when the `media-thumbnail` feature is disabled, or when the
/// image format cannot be guessed or the image cannot be decoded within the
/// configured limits.
fn compute_thumbhash_for_enrichment(data: &[u8]) -> Option<String> {
    #[cfg(feature = "media-thumbnail")]
    {
        use base64::Engine;
        use image::ImageReader;
        use std::io::Cursor;

        // Bound decoder allocations and dimensions before decoding.
        let mut decode_limits = image::Limits::default();
        decode_limits.max_alloc = Some(256 * 1024 * 1024);
        decode_limits.max_image_width = Some(16384);
        decode_limits.max_image_height = Some(16384);

        let mut reader = ImageReader::new(Cursor::new(data))
            .with_guessed_format()
            .ok()?;
        reader.limits(decode_limits);
        let full_image = reader.decode().ok()?;

        // Shrink to fit a 100x100 box before hashing.
        let thumb = full_image
            .resize(100, 100, image::imageops::FilterType::Triangle)
            .to_rgba8();
        let (width, height) = thumb.dimensions();
        let raw_hash =
            thumbhash::rgba_to_thumb_hash(width as usize, height as usize, thumb.as_raw());
        Some(base64::engine::general_purpose::STANDARD.encode(&raw_hash))
    }

    #[cfg(not(feature = "media-thumbnail"))]
    {
        // Without the feature there is nothing to compute; consume the
        // argument so it does not trigger an unused-variable warning.
        let _ = data;
        None
    }
}
276
#[cfg(test)]
mod tests {
    use super::*;
    use base64::Engine;
    use nika_mcp::ContentBlock;

    /// Builds a processor over a fresh temporary store. The `TempDir` is
    /// returned so the directory outlives the processor for the whole test.
    fn make_processor_with_dir() -> (MediaProcessor, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = CasStore::new(dir.path());
        (MediaProcessor::new(store), dir)
    }

    /// Base64 of a 29-byte PNG prefix: signature plus a 1x1 IHDR chunk
    /// (without CRC). The tests below rely on it being detected as PNG and
    /// on its dimensions being readable.
    fn png_base64() -> String {
        let png_data: Vec<u8> = vec![
            // PNG signature
            0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A,
            // IHDR chunk length (13) and type
            0x00, 0x00, 0x00, 0x0D, 0x49, 0x48, 0x44, 0x52,
            // width = 1, height = 1
            0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01,
            // bit depth 8, colour type 2, compression, filter, interlace
            0x08, 0x02, 0x00, 0x00, 0x00,
        ];
        base64::engine::general_purpose::STANDARD.encode(&png_data)
    }

    /// Text blocks carry no media.
    #[tokio::test]
    async fn process_text_block_returns_none() {
        let (processor, _dir) = make_processor_with_dir();
        let block = ContentBlock::text("hello");
        let result = processor.process(&block, "t1").await.unwrap();
        assert!(result.is_none());
    }

    /// A valid image block yields a fully populated `MediaRef`.
    #[tokio::test]
    async fn process_image_block_returns_media_ref() {
        let (processor, _dir) = make_processor_with_dir();
        let block = ContentBlock::image(png_base64(), "image/png");
        let result = processor.process(&block, "t1").await.unwrap();
        assert!(result.is_some());
        let (media_ref, _store_result) = result.unwrap();
        assert!(media_ref.hash.starts_with("blake3:"));
        assert_eq!(media_ref.mime_type, "image/png");
        assert_eq!(media_ref.extension, "png");
        assert!(media_ref.size_bytes > 0);
        assert_eq!(media_ref.created_by, "t1");
    }

    /// Malformed base64 is reported as a decode failure.
    #[tokio::test]
    async fn process_invalid_base64_returns_error() {
        let (processor, _dir) = make_processor_with_dir();
        let block = ContentBlock::image("not!valid!base64!!!", "image/png");
        let result = processor.process(&block, "t1").await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().code(), "NIKA-256");
    }

    /// Empty payloads are rejected rather than stored.
    #[tokio::test]
    async fn process_empty_base64_returns_error() {
        let (processor, _dir) = make_processor_with_dir();
        let block = ContentBlock::image("", "image/png");
        let result = processor.process(&block, "t1").await;
        assert!(
            result.is_err(),
            "Empty base64 image should error (NIKA-258)"
        );
        assert_eq!(result.unwrap_err().code(), "NIKA-258");
    }

    /// Input one byte over the limit is rejected before decoding.
    #[tokio::test]
    async fn process_oversized_base64_returns_error() {
        let (processor, _dir) = make_processor_with_dir();
        let big = "A".repeat(MAX_BASE64_INPUT_BYTES + 1);
        let block = ContentBlock::image(big, "image/png");
        let result = processor.process(&block, "t1").await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().code(), "NIKA-257");
    }

    /// `process_all` skips text blocks and reports only media results.
    #[tokio::test]
    async fn process_all_mixed_blocks() {
        let (processor, _dir) = make_processor_with_dir();
        let blocks = vec![
            ContentBlock::text("some text"),
            ContentBlock::image(png_base64(), "image/png"),
            ContentBlock::text("more text"),
        ];
        let results = processor.process_all(&blocks, "t1").await;
        assert_eq!(results.len(), 1);
        assert!(results[0].is_ok());
    }

    /// Resource links have no inline data and are skipped.
    #[tokio::test]
    async fn resource_link_skipped() {
        let (processor, _dir) = make_processor_with_dir();
        let block = ContentBlock::resource_link("file:///test", None, None);
        let result = processor.process(&block, "t1").await.unwrap();
        assert!(result.is_none());
    }

    /// With a 50-byte budget the 29-byte PNG fits once but not twice.
    #[tokio::test]
    async fn budget_enforcement() {
        let dir = tempfile::tempdir().unwrap();
        let store = CasStore::new(dir.path());
        let budget = MediaBudget::with_max_per_run(50);
        let processor = MediaProcessor::with_budget(store, budget);

        let block = ContentBlock::image(png_base64(), "image/png");
        let r1 = processor.process(&block, "t1").await;
        assert!(r1.is_ok());

        let r2 = processor.process(&block, "t2").await;
        assert!(r2.is_err());
        assert_eq!(r2.unwrap_err().code(), "NIKA-259");
    }

    /// A MIME detection failure must give the reserved bytes back.
    #[tokio::test]
    async fn budget_rollback_on_mime_detection_failure() {
        let dir = tempfile::tempdir().unwrap();
        let store = CasStore::new(dir.path());
        let budget = Arc::new(MediaBudget::with_max_per_run(1000));
        let processor = MediaProcessor::with_shared_budget(store, Arc::clone(&budget));

        // Bytes with no recognisable signature, paired with a generic MIME hint.
        let unknown_data: Vec<u8> = vec![0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07];
        let b64 = base64::engine::general_purpose::STANDARD.encode(&unknown_data);

        assert_eq!(budget.current_bytes(), 0, "budget should start at 0");

        let block = ContentBlock::image(b64, "application/octet-stream");
        let result = processor.process(&block, "t_mime_fail").await;
        assert!(
            result.is_err(),
            "unrecognizable bytes with octet-stream should fail"
        );
        assert_eq!(result.unwrap_err().code(), "NIKA-251");

        assert_eq!(
            budget.current_bytes(),
            0,
            "budget must be rolled back after MIME detection failure, got {}",
            budget.current_bytes()
        );

        // The budget must still be usable afterwards.
        let block2 = ContentBlock::image(png_base64(), "image/png");
        let result2 = processor.process(&block2, "t_after_rollback").await;
        assert!(
            result2.is_ok(),
            "valid image should succeed after rollback: {:?}",
            result2.err()
        );
    }

    /// A store failure must give the reserved bytes back.
    ///
    /// The failure is provoked with a read-only store directory, which only
    /// works with unix permissions; the whole test is therefore unix-only.
    #[cfg(unix)]
    #[tokio::test]
    async fn budget_rollback_on_cas_store_failure() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().unwrap();
        let readonly_path = dir.path().join("readonly_store");
        std::fs::create_dir_all(&readonly_path).unwrap();
        std::fs::set_permissions(&readonly_path, std::fs::Permissions::from_mode(0o444))
            .unwrap();

        // Privileged users (e.g. root in a CI container) can write despite
        // the mode bits, so the failure cannot be provoked: skip.
        if std::fs::File::create(readonly_path.join(".write_probe")).is_ok() {
            std::fs::set_permissions(&readonly_path, std::fs::Permissions::from_mode(0o755))
                .unwrap();
            eprintln!("skipping: directory permissions are not enforced for this user");
            return;
        }

        let store = CasStore::new(&readonly_path);
        let budget = Arc::new(MediaBudget::with_max_per_run(10000));
        let processor = MediaProcessor::with_shared_budget(store, Arc::clone(&budget));

        assert_eq!(budget.current_bytes(), 0, "budget should start at 0");

        let block = ContentBlock::image(png_base64(), "image/png");
        let result = processor.process(&block, "t_cas_fail").await;
        let budget_after = budget.current_bytes();

        // Restore permissions before asserting so the temporary directory
        // can be cleaned up even when an assertion below fails.
        std::fs::set_permissions(&readonly_path, std::fs::Permissions::from_mode(0o755))
            .unwrap();

        assert!(result.is_err(), "CAS store to read-only dir should fail");
        let err = result.unwrap_err();
        assert_eq!(
            err.code(),
            "NIKA-255",
            "expected MediaStoreIo (NIKA-255), got {}",
            err.code()
        );

        assert_eq!(
            budget_after, 0,
            "budget must be rolled back after CAS store failure, got {}",
            budget_after
        );
    }

    /// Image payloads are enriched with their pixel dimensions.
    #[tokio::test]
    async fn enrichment_png_has_dimensions() {
        let (processor, _dir) = make_processor_with_dir();
        let block = ContentBlock::image(png_base64(), "image/png");
        let result = processor.process(&block, "t1").await.unwrap();
        let (media_ref, _) = result.unwrap();

        assert!(
            media_ref.metadata.contains_key("width"),
            "metadata should contain 'width', got: {:?}",
            media_ref.metadata
        );
        assert!(
            media_ref.metadata.contains_key("height"),
            "metadata should contain 'height', got: {:?}",
            media_ref.metadata
        );
    }

    /// With thumbnails enabled, a decodable image gets a base64 thumbhash.
    #[cfg(feature = "media-thumbnail")]
    #[tokio::test]
    async fn enrichment_png_has_thumbhash() {
        // The header-only PNG from `png_base64` cannot be decoded, so encode
        // a complete 4x4 red image instead.
        let full_png = {
            use image::{ImageBuffer, Rgb};
            let img = ImageBuffer::from_pixel(4, 4, Rgb([255u8, 0, 0]));
            let mut buf = Vec::new();
            let enc = image::codecs::png::PngEncoder::new(&mut buf);
            image::ImageEncoder::write_image(
                enc,
                img.as_raw(),
                4,
                4,
                image::ExtendedColorType::Rgb8,
            )
            .unwrap();
            base64::engine::general_purpose::STANDARD.encode(&buf)
        };

        let (processor, _dir) = make_processor_with_dir();
        let block = ContentBlock::image(full_png, "image/png");
        let result = processor.process(&block, "t1").await.unwrap();
        let (media_ref, _) = result.unwrap();

        assert!(
            media_ref.metadata.contains_key("thumbhash"),
            "metadata should contain 'thumbhash', got: {:?}",
            media_ref.metadata
        );
        let th = media_ref
            .metadata
            .get("thumbhash")
            .unwrap()
            .as_str()
            .unwrap();
        assert!(
            base64::engine::general_purpose::STANDARD.decode(th).is_ok(),
            "thumbhash should be valid base64: {th}"
        );
    }

    /// Non-image payloads receive no image metadata.
    #[tokio::test]
    async fn enrichment_non_image_no_dimensions() {
        // 44-byte WAV header: RIFF/WAVE, PCM mono 44.1 kHz 16-bit, empty data chunk.
        let wav_data: Vec<u8> = vec![
            0x52, 0x49, 0x46, 0x46, 0x24, 0x00, 0x00, 0x00, // "RIFF", size 36
            0x57, 0x41, 0x56, 0x45, // "WAVE"
            0x66, 0x6D, 0x74, 0x20, 0x10, 0x00, 0x00, 0x00, // "fmt ", length 16
            0x01, 0x00, 0x01, 0x00, // PCM, 1 channel
            0x44, 0xAC, 0x00, 0x00, // 44100 Hz
            0x88, 0x58, 0x01, 0x00, // byte rate
            0x02, 0x00, 0x10, 0x00, // block align, 16 bits per sample
            0x64, 0x61, 0x74, 0x61, 0x00, 0x00, 0x00, 0x00, // "data", size 0
        ];
        let wav_b64 = base64::engine::general_purpose::STANDARD.encode(&wav_data);
        let (processor, _dir) = make_processor_with_dir();
        let block = ContentBlock::audio(wav_b64, "audio/wav");
        let result = processor.process(&block, "t1").await.unwrap();
        let (media_ref, _) = result.unwrap();

        assert!(
            !media_ref.metadata.contains_key("width"),
            "audio should not have width in metadata"
        );
        assert!(
            !media_ref.metadata.contains_key("thumbhash"),
            "audio should not have thumbhash in metadata"
        );
    }

    /// An enriched `MediaRef` survives a JSON round trip unchanged.
    #[tokio::test]
    async fn enrichment_metadata_serializes_cleanly() {
        let (processor, _dir) = make_processor_with_dir();
        let block = ContentBlock::image(png_base64(), "image/png");
        let result = processor.process(&block, "t1").await.unwrap();
        let (media_ref, _) = result.unwrap();

        let json = serde_json::to_string(&media_ref).unwrap();
        let back: MediaRef = serde_json::from_str(&json).unwrap();
        assert_eq!(media_ref, back);

        if media_ref.metadata.contains_key("width") {
            assert!(json.contains("metadata"));
            assert!(json.contains("width"));
        }
    }

    /// An empty metadata map is omitted from the serialized form.
    #[tokio::test]
    async fn enrichment_empty_metadata_not_serialized() {
        let mr = MediaRef {
            hash: "blake3:test".to_string(),
            mime_type: "text/plain".to_string(),
            size_bytes: 10,
            path: std::path::PathBuf::from("/tmp/test"),
            extension: "txt".to_string(),
            created_by: "test".to_string(),
            metadata: serde_json::Map::new(),
        };
        let json = serde_json::to_string(&mr).unwrap();
        assert!(
            !json.contains("metadata"),
            "empty metadata should not appear in JSON: {json}"
        );
    }
}