Skip to main content

nika_media/
types.rs

1//! Media pipeline types: MediaRef, MediaType, MediaBudget
2
3use std::path::PathBuf;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use serde::{Deserialize, Serialize};
7
/// Reference to a media file stored in the CAS.
///
/// Serializes to JSON for use in `{{with.task_id.media[0].hash}}` templates.
/// Hash stores the algorithm-prefixed hash: `"blake3:af1349..."`.
/// CAS filenames are hash-only (no extension). Extension is stored only here.
///
/// No serde rename is applied, so each field name below is also the key
/// used in the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct MediaRef {
    /// Algorithm-prefixed hash (e.g., "blake3:af1349b9...")
    pub hash: String,

    /// Detected MIME type (e.g., "image/png", "audio/wav")
    pub mime_type: String,

    /// File size in bytes (decoded, not base64)
    pub size_bytes: u64,

    /// Absolute path to the stored file
    pub path: PathBuf,

    /// File extension without dot (e.g., "png", "wav")
    pub extension: String,

    /// Task ID that produced this media
    pub created_by: String,

    /// Auto-enriched metadata (dimensions, thumbhash, etc.)
    ///
    /// Template access: `{{with.task.media[0].metadata.width}}`
    ///
    /// Serde handling: a missing `metadata` key deserializes to an empty map
    /// (`default`), and an empty map is left out of the serialized output
    /// entirely (`skip_serializing_if`).
    #[serde(default, skip_serializing_if = "serde_json::Map::is_empty")]
    pub metadata: serde_json::Map<String, serde_json::Value>,
}
39
/// Broad media type classification.
///
/// Serialized in `snake_case`, i.e. as `"image"`, `"audio"`, `"document"`
/// and `"unknown"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[allow(dead_code)] // Used in tests
pub enum MediaType {
    /// MIME types starting with `image/`.
    Image,
    /// MIME types starting with `audio/`.
    Audio,
    /// PDF, OpenXML (`application/vnd.openxmlformats...`) and legacy Word
    /// (`application/msword`) MIME types.
    Document,
    /// Any MIME type that `from_mime` does not recognise.
    Unknown,
}
50
51impl MediaType {
52    /// Classify a MIME type string into a MediaType.
53    #[allow(dead_code)] // Used in tests
54    pub fn from_mime(mime: &str) -> Self {
55        if mime.starts_with("image/") {
56            Self::Image
57        } else if mime.starts_with("audio/") {
58            Self::Audio
59        } else if mime.starts_with("application/pdf")
60            || mime.starts_with("application/vnd.openxmlformats")
61            || mime.starts_with("application/msword")
62        {
63            Self::Document
64        } else {
65            Self::Unknown
66        }
67    }
68}
69
/// Media budget enforcement for memory safety.
///
/// Tracks cumulative bytes processed per run to prevent unbounded media
/// accumulation (e.g., from `for_each` with many media-producing iterations).
/// Uses `AtomicU64` for lock-free concurrent access from parallel tasks.
pub struct MediaBudget {
    /// Bytes charged so far; raised by `check_and_add`, lowered by `rollback`.
    run_bytes: AtomicU64,
    /// Upper bound for `run_bytes`; a charge that would push the total above
    /// this value is rejected.
    max_per_run: u64,
}
79
80impl MediaBudget {
81    /// Default per-run budget: 500MB.
82    pub const DEFAULT_MAX_PER_RUN: u64 = 500 * 1024 * 1024;
83
84    /// Create a new budget with default limits.
85    pub fn new() -> Self {
86        Self {
87            run_bytes: AtomicU64::new(0),
88            max_per_run: Self::DEFAULT_MAX_PER_RUN,
89        }
90    }
91
92    /// Create a new budget with custom per-run limit.
93    pub fn with_max_per_run(max_per_run: u64) -> Self {
94        Self {
95            run_bytes: AtomicU64::new(0),
96            max_per_run,
97        }
98    }
99
100    /// Check budget and add bytes atomically. Returns error if budget exceeded.
101    ///
102    /// Uses `fetch_update` (CAS loop) so the increment and comparison
103    /// are one atomic unit — prevents concurrent overruns.
104    pub fn check_and_add(&self, size: u64, _task_id: &str) -> Result<(), super::error::MediaError> {
105        let result = self
106            .run_bytes
107            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
108                let new_total = current + size;
109                if new_total > self.max_per_run {
110                    None // reject: over budget
111                } else {
112                    Some(new_total) // accept: update counter
113                }
114            });
115        match result {
116            Ok(_) => Ok(()),
117            Err(current) => Err(super::error::MediaError::RunBudgetExceeded {
118                current: current + size,
119                max: self.max_per_run,
120            }),
121        }
122    }
123
124    /// Roll back a budget allocation (e.g., when CAS store fails after budget was charged).
125    ///
126    /// Uses `fetch_update` with `saturating_sub` to prevent underflow.
127    /// Without this, a rollback larger than the current value would wrap to `u64::MAX`,
128    /// effectively disabling budget enforcement for the rest of the run.
129    pub fn rollback(&self, size: u64) {
130        let _ = self
131            .run_bytes
132            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
133                Some(current.saturating_sub(size))
134            });
135    }
136
137    /// Get current accumulated bytes.
138    pub fn current_bytes(&self) -> u64 {
139        self.run_bytes.load(Ordering::Acquire)
140    }
141}
142
143impl Default for MediaBudget {
144    fn default() -> Self {
145        Self::new()
146    }
147}
148
/// Unit tests for `MediaBudget` accounting, `MediaRef` serde round-tripping
/// and `MediaType::from_mime` classification.
#[cfg(test)]
mod tests {
    use super::*;

    /// A charge below the limit succeeds and is reflected in the counter.
    #[test]
    fn media_budget_under_limit() {
        let budget = MediaBudget::with_max_per_run(1024);
        assert!(budget.check_and_add(512, "t1").is_ok());
        assert_eq!(budget.current_bytes(), 512);
    }

    /// A charge that would exceed the limit is rejected with error code
    /// `NIKA-259` and leaves the counter unchanged.
    #[test]
    fn media_budget_exceeds_limit() {
        let budget = MediaBudget::with_max_per_run(1024);
        budget.check_and_add(512, "t1").unwrap();
        // 512 + 1024 = 1536 > 1024, so this charge must be refused.
        let err = budget.check_and_add(1024, "t2").unwrap_err();
        assert_eq!(err.code(), "NIKA-259");
        // Budget should NOT have increased
        assert_eq!(budget.current_bytes(), 512);
    }

    /// Successive charges accumulate, and the limit applies to the running total.
    #[test]
    fn media_budget_accumulated_tracking() {
        let budget = MediaBudget::with_max_per_run(1000);
        budget.check_and_add(300, "t1").unwrap();
        budget.check_and_add(300, "t2").unwrap();
        budget.check_and_add(300, "t3").unwrap();
        assert_eq!(budget.current_bytes(), 900);
        // This should fail (900 + 200 = 1100 > 1000)
        assert!(budget.check_and_add(200, "t4").is_err());
    }

    /// Serializing a `MediaRef` to JSON and back yields an equal value, and
    /// the prefixed hash appears verbatim in the JSON text.
    #[test]
    fn media_ref_serde_roundtrip() {
        let mr = MediaRef {
            hash: "blake3:af1349b9".to_string(),
            mime_type: "image/png".to_string(),
            size_bytes: 4096,
            path: PathBuf::from("/tmp/store/af/1349b9"),
            extension: "png".to_string(),
            created_by: "gen_img".to_string(),
            metadata: serde_json::Map::new(),
        };
        let json = serde_json::to_string(&mr).unwrap();
        let back: MediaRef = serde_json::from_str(&json).unwrap();
        assert_eq!(mr, back);
        assert!(json.contains("blake3:af1349b9"));
    }

    /// Representative MIME types map to the expected `MediaType` variants.
    #[test]
    fn media_type_from_mime() {
        assert_eq!(MediaType::from_mime("image/png"), MediaType::Image);
        assert_eq!(MediaType::from_mime("image/jpeg"), MediaType::Image);
        assert_eq!(MediaType::from_mime("audio/wav"), MediaType::Audio);
        assert_eq!(MediaType::from_mime("audio/mpeg"), MediaType::Audio);
        assert_eq!(MediaType::from_mime("application/pdf"), MediaType::Document);
        assert_eq!(MediaType::from_mime("text/plain"), MediaType::Unknown);
    }

    // ═══════════════════════════════════════════════════════════════
    // GAP 1/2 support: MediaBudget::rollback unit tests
    // Verify that rollback decrements the counter and frees capacity
    // for later charges.
    // ═══════════════════════════════════════════════════════════════

    /// Rolling back the full charged amount returns the counter to zero and
    /// makes the capacity available again.
    #[test]
    fn media_budget_rollback_restores_capacity() {
        let budget = MediaBudget::with_max_per_run(1000);

        // Charge 600 bytes
        budget.check_and_add(600, "t1").unwrap();
        assert_eq!(budget.current_bytes(), 600);

        // Roll back 600 bytes (simulates failure after budget charge)
        budget.rollback(600);
        assert_eq!(
            budget.current_bytes(),
            0,
            "rollback should restore budget to 0"
        );

        // After rollback, the full budget should be available again
        budget.check_and_add(900, "t2").unwrap();
        assert_eq!(budget.current_bytes(), 900);
    }

    /// Rolling back one of several charges leaves the others counted.
    #[test]
    fn media_budget_partial_rollback() {
        let budget = MediaBudget::with_max_per_run(1000);

        budget.check_and_add(300, "t1").unwrap();
        budget.check_and_add(400, "t2").unwrap();
        assert_eq!(budget.current_bytes(), 700);

        // Roll back only t2's allocation
        budget.rollback(400);
        assert_eq!(
            budget.current_bytes(),
            300,
            "partial rollback should leave t1's allocation intact"
        );

        // Now we have 700 bytes of capacity remaining (1000 - 300)
        budget.check_and_add(700, "t3").unwrap();
        assert_eq!(budget.current_bytes(), 1000);
    }

    /// A budget filled to capacity rejects further charges until a rollback
    /// frees the space, after which it can be filled again.
    #[test]
    fn media_budget_rollback_then_reuse() {
        let budget = MediaBudget::with_max_per_run(100);

        // Fill to capacity
        budget.check_and_add(100, "t1").unwrap();
        assert!(
            budget.check_and_add(1, "t2").is_err(),
            "should be at capacity"
        );

        // Roll back and verify capacity is restored
        budget.rollback(100);
        assert_eq!(budget.current_bytes(), 0);

        // Now the full budget is available
        budget.check_and_add(100, "t3").unwrap();
        assert_eq!(budget.current_bytes(), 100);
    }
}