Skip to main content

rustrails_storage/
analyzer.rs

1//! Pluggable blob analysis.
2
3use std::{collections::BTreeMap, sync::Arc};
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use rustrails_support::runtime;
8use serde_json::Value;
9use thiserror::Error;
10
11use crate::Blob;
12
13/// Errors returned by analyzers.
14#[derive(Debug, Error)]
15pub enum AnalyzerError {
16    /// No analyzer accepted the blob.
17    #[error("no analyzer accepted blob")]
18    Unsupported,
19    /// The blob payload was malformed for the selected analyzer.
20    #[error("malformed payload: {0}")]
21    Malformed(String),
22}
23
24/// Structured analyzer output.
25#[derive(Debug, Clone, PartialEq)]
26pub struct Analysis {
27    /// Metadata extracted from the blob payload.
28    pub metadata: BTreeMap<String, Value>,
29}
30
31impl Analysis {
32    /// Creates an empty analysis.
33    #[must_use]
34    pub fn empty() -> Self {
35        Self {
36            metadata: BTreeMap::new(),
37        }
38    }
39}
40
41/// Analyzer trait for inspecting blob bytes.
42#[async_trait]
43pub trait BlobAnalyzer: Send + Sync {
44    /// Returns the analyzer name.
45    fn name(&self) -> &str;
46
47    /// Returns whether the analyzer supports this blob.
48    fn accepts(&self, blob: &Blob) -> bool;
49
50    /// Extracts metadata from the blob bytes.
51    ///
52    /// # Errors
53    ///
54    /// Returns an error when the payload is malformed for the analyzer.
55    async fn analyze(&self, blob: &Blob, data: &Bytes) -> Result<Analysis, AnalyzerError>;
56}
57
58/// Dispatches to the first analyzer that accepts a blob.
59#[derive(Default, Clone)]
60pub struct AnalyzerRegistry {
61    analyzers: Vec<Arc<dyn BlobAnalyzer>>,
62}
63
64impl std::fmt::Debug for AnalyzerRegistry {
65    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        formatter
67            .debug_struct("AnalyzerRegistry")
68            .field("analyzers", &self.analyzers.len())
69            .finish()
70    }
71}
72
73impl AnalyzerRegistry {
74    /// Creates an empty registry.
75    #[must_use]
76    pub fn new() -> Self {
77        Self::default()
78    }
79
80    /// Creates a registry with the default analyzers.
81    #[must_use]
82    pub fn with_defaults() -> Self {
83        Self {
84            analyzers: vec![
85                Arc::new(ImageAnalyzer),
86                Arc::new(TextAnalyzer),
87                Arc::new(MediaAnalyzer),
88            ],
89        }
90    }
91
92    /// Registers another analyzer.
93    #[must_use]
94    pub fn register(mut self, analyzer: Arc<dyn BlobAnalyzer>) -> Self {
95        self.analyzers.push(analyzer);
96        self
97    }
98
99    /// Analyzes a blob with the first matching analyzer.
100    ///
101    /// # Errors
102    ///
103    /// Returns an error when no analyzer accepts the blob or when the analyzer fails.
104    pub async fn analyze(&self, blob: &Blob, data: &Bytes) -> Result<Analysis, AnalyzerError> {
105        for analyzer in &self.analyzers {
106            if analyzer.accepts(blob) {
107                return analyzer.analyze(blob, data).await;
108            }
109        }
110        Err(AnalyzerError::Unsupported)
111    }
112
113    /// Analyzes a blob with the first matching analyzer using the thread-local runtime.
114    ///
115    /// # Errors
116    ///
117    /// Returns an error when no analyzer accepts the blob or when the analyzer fails.
118    pub fn analyze_sync(&self, blob: &Blob, data: &Bytes) -> Result<Analysis, AnalyzerError> {
119        runtime::block_on(self.analyze(blob, data))
120    }
121}
122
/// Reads width, height and format from PNG, GIF and JPEG headers.
#[derive(Debug, Clone, Copy)]
pub struct ImageAnalyzer;

/// Counts lines and characters (`char`s) in UTF-8 text blobs.
#[derive(Debug, Clone, Copy)]
pub struct TextAnalyzer;

/// Extracts a `duration=<seconds>` marker from audio and video blobs.
#[derive(Debug, Clone, Copy)]
pub struct MediaAnalyzer;
134
135#[async_trait]
136impl BlobAnalyzer for ImageAnalyzer {
137    fn name(&self) -> &str {
138        "image"
139    }
140
141    fn accepts(&self, blob: &Blob) -> bool {
142        blob.is_image()
143    }
144
145    async fn analyze(&self, _blob: &Blob, data: &Bytes) -> Result<Analysis, AnalyzerError> {
146        let (width, height, format) = detect_dimensions(data)?;
147        let mut metadata = BTreeMap::new();
148        metadata.insert("width".to_owned(), Value::from(width));
149        metadata.insert("height".to_owned(), Value::from(height));
150        metadata.insert("format".to_owned(), Value::from(format));
151        Ok(Analysis { metadata })
152    }
153}
154
155#[async_trait]
156impl BlobAnalyzer for TextAnalyzer {
157    fn name(&self) -> &str {
158        "text"
159    }
160
161    fn accepts(&self, blob: &Blob) -> bool {
162        blob.is_text()
163    }
164
165    async fn analyze(&self, _blob: &Blob, data: &Bytes) -> Result<Analysis, AnalyzerError> {
166        let content = std::str::from_utf8(data)
167            .map_err(|error| AnalyzerError::Malformed(error.to_string()))?;
168        let mut metadata = BTreeMap::new();
169        metadata.insert(
170            "line_count".to_owned(),
171            Value::from(content.lines().count()),
172        );
173        metadata.insert(
174            "character_count".to_owned(),
175            Value::from(content.chars().count()),
176        );
177        Ok(Analysis { metadata })
178    }
179}
180
181#[async_trait]
182impl BlobAnalyzer for MediaAnalyzer {
183    fn name(&self) -> &str {
184        "media"
185    }
186
187    fn accepts(&self, blob: &Blob) -> bool {
188        blob.is_audio() || blob.is_video()
189    }
190
191    async fn analyze(&self, _blob: &Blob, data: &Bytes) -> Result<Analysis, AnalyzerError> {
192        let content = String::from_utf8_lossy(data);
193        let marker = content
194            .split_whitespace()
195            .find(|segment| segment.starts_with("duration="))
196            .ok_or_else(|| AnalyzerError::Malformed("missing duration marker".to_owned()))?;
197        let duration = marker
198            .trim_start_matches("duration=")
199            .parse::<f64>()
200            .map_err(|error| AnalyzerError::Malformed(error.to_string()))?;
201        let mut metadata = BTreeMap::new();
202        metadata.insert("duration_seconds".to_owned(), Value::from(duration));
203        Ok(Analysis { metadata })
204    }
205}
206
207fn detect_dimensions(data: &Bytes) -> Result<(u32, u32, &'static str), AnalyzerError> {
208    if data.starts_with(b"\x89PNG\r\n\x1a\n") && data.len() >= 24 {
209        let width = u32::from_be_bytes(
210            data[16..20]
211                .try_into()
212                .map_err(|_| AnalyzerError::Malformed("png width".to_owned()))?,
213        );
214        let height = u32::from_be_bytes(
215            data[20..24]
216                .try_into()
217                .map_err(|_| AnalyzerError::Malformed("png height".to_owned()))?,
218        );
219        return Ok((width, height, "png"));
220    }
221
222    if data.starts_with(b"GIF87a") || data.starts_with(b"GIF89a") {
223        if data.len() < 10 {
224            return Err(AnalyzerError::Malformed("gif header too short".to_owned()));
225        }
226        let width = u16::from_le_bytes([data[6], data[7]]);
227        let height = u16::from_le_bytes([data[8], data[9]]);
228        return Ok((u32::from(width), u32::from(height), "gif"));
229    }
230
231    if data.starts_with(b"\xff\xd8") {
232        let mut index = 2;
233        while index + 9 < data.len() {
234            if data[index] != 0xFF {
235                index += 1;
236                continue;
237            }
238            let marker = data[index + 1];
239            index += 2;
240            if marker == 0xD9 || marker == 0xDA {
241                break;
242            }
243            if index + 2 > data.len() {
244                break;
245            }
246            let length = u16::from_be_bytes([data[index], data[index + 1]]) as usize;
247            if (0xC0..=0xC3).contains(&marker) && index + 7 < data.len() {
248                let height = u16::from_be_bytes([data[index + 3], data[index + 4]]);
249                let width = u16::from_be_bytes([data[index + 5], data[index + 6]]);
250                return Ok((u32::from(width), u32::from(height), "jpeg"));
251            }
252            index += length;
253        }
254        return Err(AnalyzerError::Malformed(
255            "jpeg dimensions not found".to_owned(),
256        ));
257    }
258
259    Err(AnalyzerError::Malformed(
260        "unsupported image payload".to_owned(),
261    ))
262}
263
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use super::*;
    use crate::{Blob, test_support::run_sync_test};

    /// Builds an in-memory blob whose stored bytes are just the filename; the analyzers
    /// under test receive their payload through a separate argument.
    fn blob(filename: &str, content_type: Option<&str>) -> Blob {
        let stored = Bytes::copy_from_slice(filename.as_bytes());
        Blob::create(stored, filename, content_type, Default::default(), "memory")
            .expect("blob should build")
    }

    /// Minimal PNG: signature followed by an IHDR chunk with a zeroed CRC.
    fn png(width: u32, height: u32) -> Bytes {
        let mut bytes = b"\x89PNG\r\n\x1a\n".to_vec();
        bytes.extend_from_slice(&13u32.to_be_bytes()); // IHDR data length
        bytes.extend_from_slice(b"IHDR");
        bytes.extend_from_slice(&width.to_be_bytes());
        bytes.extend_from_slice(&height.to_be_bytes());
        // bit depth, colour type, compression, filter, interlace
        bytes.extend_from_slice(&[8, 2, 0, 0, 0]);
        bytes.extend_from_slice(&[0; 4]); // placeholder CRC
        Bytes::from(bytes)
    }

    /// Minimal GIF89a header: signature plus logical screen descriptor.
    fn gif(width: u16, height: u16) -> Bytes {
        let mut bytes = Vec::with_capacity(13);
        bytes.extend_from_slice(b"GIF89a");
        bytes.extend_from_slice(&width.to_le_bytes());
        bytes.extend_from_slice(&height.to_le_bytes());
        bytes.extend_from_slice(&[0; 3]);
        Bytes::from(bytes)
    }

    /// Minimal JPEG: SOI, a JFIF APP0 segment, a baseline SOF0 frame header and EOI.
    fn jpeg(width: u16, height: u16) -> Bytes {
        let [height_hi, height_lo] = height.to_be_bytes();
        let [width_hi, width_lo] = width.to_be_bytes();

        let mut bytes = vec![0xFF, 0xD8]; // SOI
        // APP0 segment, length 16
        bytes.extend_from_slice(&[0xFF, 0xE0, 0x00, 0x10]);
        bytes.extend_from_slice(b"JFIF\0");
        bytes.extend_from_slice(&[1, 1, 0, 0, 1, 0, 1, 0, 0]);
        // SOF0 segment, length 17: precision, height, width, three components
        bytes.extend_from_slice(&[0xFF, 0xC0, 0x00, 0x11, 0x08]);
        bytes.extend_from_slice(&[height_hi, height_lo, width_hi, width_lo]);
        bytes.extend_from_slice(&[0x03, 0x01, 0x11, 0x00, 0x02, 0x11, 0x00, 0x03, 0x11, 0x00]);
        bytes.extend_from_slice(&[0xFF, 0xD9]); // EOI
        Bytes::from(bytes)
    }

    #[tokio::test]
    async fn test_image_analyzer_reads_png_dimensions() {
        let source = blob("image.png", Some("image/png"));
        let outcome = ImageAnalyzer.analyze(&source, &png(100, 50)).await;
        let analysis = outcome.expect("analysis should succeed");
        assert_eq!(analysis.metadata.get("width"), Some(&Value::from(100)));
        assert_eq!(analysis.metadata.get("height"), Some(&Value::from(50)));
    }

    #[tokio::test]
    async fn test_image_analyzer_reads_gif_dimensions() {
        let source = blob("image.gif", Some("image/gif"));
        let outcome = ImageAnalyzer.analyze(&source, &gif(20, 10)).await;
        let analysis = outcome.expect("analysis should succeed");
        assert_eq!(analysis.metadata.get("width"), Some(&Value::from(20)));
        assert_eq!(analysis.metadata.get("height"), Some(&Value::from(10)));
    }

    #[tokio::test]
    async fn test_image_analyzer_reads_jpeg_dimensions() {
        let source = blob("image.jpg", Some("image/jpeg"));
        let outcome = ImageAnalyzer.analyze(&source, &jpeg(640, 480)).await;
        let analysis = outcome.expect("analysis should succeed");
        assert_eq!(analysis.metadata.get("width"), Some(&Value::from(640)));
        assert_eq!(analysis.metadata.get("height"), Some(&Value::from(480)));
    }

    #[tokio::test]
    async fn test_text_analyzer_counts_lines_and_characters() {
        let source = blob("notes.txt", Some("text/plain"));
        let payload = Bytes::from_static(b"a\nbc\n");
        let analysis = TextAnalyzer
            .analyze(&source, &payload)
            .await
            .expect("analysis should succeed");
        assert_eq!(analysis.metadata.get("line_count"), Some(&Value::from(2)));
        assert_eq!(
            analysis.metadata.get("character_count"),
            Some(&Value::from(5))
        );
    }

    #[tokio::test]
    async fn test_media_analyzer_reads_duration_marker() {
        let source = blob("clip.mp4", Some("video/mp4"));
        let payload = Bytes::from_static(b"duration=1.5 codec=h264");
        let analysis = MediaAnalyzer
            .analyze(&source, &payload)
            .await
            .expect("analysis should succeed");
        assert_eq!(
            analysis.metadata.get("duration_seconds"),
            Some(&Value::from(1.5))
        );
    }

    #[tokio::test]
    async fn test_registry_dispatches_to_matching_analyzer() {
        let registry = AnalyzerRegistry::with_defaults();
        let source = blob("image.png", Some("image/png"));
        let analysis = registry
            .analyze(&source, &png(1, 2))
            .await
            .expect("analysis should succeed");
        assert_eq!(analysis.metadata.get("format"), Some(&Value::from("png")));
    }

    #[test]
    fn test_registry_analyze_sync_dispatches_to_matching_analyzer() {
        run_sync_test(|| {
            let registry = AnalyzerRegistry::with_defaults();
            let source = blob("image.png", Some("image/png"));
            let analysis = registry
                .analyze_sync(&source, &png(1, 2))
                .expect("analysis should succeed");
            assert_eq!(analysis.metadata.get("format"), Some(&Value::from("png")));
        });
    }

    #[tokio::test]
    async fn test_registry_returns_unsupported_when_nothing_matches() {
        let registry = AnalyzerRegistry::with_defaults();
        let source = blob("report.pdf", Some("application/pdf"));
        let payload = Bytes::from_static(b"%PDF-1.4");
        let error = registry
            .analyze(&source, &payload)
            .await
            .expect_err("analysis should fail");
        assert!(matches!(error, AnalyzerError::Unsupported));
    }
}