use std::{collections::BTreeMap, sync::Arc};
use async_trait::async_trait;
use bytes::Bytes;
use rustrails_support::runtime;
use serde_json::Value;
use thiserror::Error;
use crate::Blob;
#[derive(Debug, Error)]
pub enum AnalyzerError {
#[error("no analyzer accepted blob")]
Unsupported,
#[error("malformed payload: {0}")]
Malformed(String),
}
/// Metadata extracted from a blob by a [`BlobAnalyzer`], keyed by field name.
///
/// A `BTreeMap` keeps keys sorted, so iteration order is deterministic.
/// `Default` yields the same empty value as [`Analysis::empty`], which lets
/// callers use `unwrap_or_default`, `std::mem::take`, and struct-update syntax.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Analysis {
    /// Field name to JSON value, e.g. `"width"` -> `100`.
    pub metadata: BTreeMap<String, Value>,
}

impl Analysis {
    /// Returns an analysis with no metadata; equivalent to `Analysis::default()`.
    #[must_use]
    pub fn empty() -> Self {
        Self::default()
    }
}
/// A pluggable extractor of metadata from a blob payload.
///
/// The `Send + Sync` supertraits allow implementations to be shared as
/// `Arc<dyn BlobAnalyzer>` trait objects.
#[async_trait]
pub trait BlobAnalyzer: Send + Sync {
    /// Short identifier for this analyzer, e.g. `"image"`.
    fn name(&self) -> &str;

    /// Returns `true` when this analyzer is able to handle `blob`.
    fn accepts(&self, blob: &Blob) -> bool;

    /// Inspects `data` (presumably the blob's contents — confirm with callers)
    /// and returns the extracted metadata.
    ///
    /// # Errors
    ///
    /// The built-in implementations return [`AnalyzerError::Malformed`] when
    /// `data` cannot be parsed.
    async fn analyze(&self, blob: &Blob, data: &Bytes) -> Result<Analysis, AnalyzerError>;
}
/// An ordered collection of [`BlobAnalyzer`]s; the first one that accepts a
/// blob is the one that analyzes it.
#[derive(Default, Clone)]
pub struct AnalyzerRegistry {
    analyzers: Vec<Arc<dyn BlobAnalyzer>>,
}

impl std::fmt::Debug for AnalyzerRegistry {
    /// `dyn BlobAnalyzer` does not implement `Debug`, so only the number of
    /// registered analyzers is printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let registered = self.analyzers.len();
        let mut view = f.debug_struct("AnalyzerRegistry");
        view.field("analyzers", &registered);
        view.finish()
    }
}
impl AnalyzerRegistry {
    /// Creates a registry with no analyzers.
    ///
    /// Every `analyze` call returns [`AnalyzerError::Unsupported`] until
    /// analyzers are registered.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a registry holding the built-in analyzers.
    ///
    /// The analyzers are consulted in this order: [`ImageAnalyzer`],
    /// [`TextAnalyzer`], [`MediaAnalyzer`].
    #[must_use]
    pub fn with_defaults() -> Self {
        Self::new()
            .register(Arc::new(ImageAnalyzer))
            .register(Arc::new(TextAnalyzer))
            .register(Arc::new(MediaAnalyzer))
    }

    /// Appends `analyzer` after any already registered and returns the registry.
    ///
    /// Because dispatch is first-match, an analyzer added here only runs for
    /// blobs that no earlier analyzer accepts.
    #[must_use]
    pub fn register(mut self, analyzer: Arc<dyn BlobAnalyzer>) -> Self {
        self.analyzers.push(analyzer);
        self
    }

    /// Runs the first registered analyzer whose `accepts` returns `true`.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyzerError::Unsupported`] when no analyzer accepts `blob`.
    /// Otherwise, it returns whatever the selected analyzer returns.
    pub async fn analyze(&self, blob: &Blob, data: &Bytes) -> Result<Analysis, AnalyzerError> {
        let selected = self
            .analyzers
            .iter()
            .find(|candidate| candidate.accepts(blob));
        match selected {
            Some(analyzer) => analyzer.analyze(blob, data).await,
            None => Err(AnalyzerError::Unsupported),
        }
    }

    /// Blocking counterpart of [`AnalyzerRegistry::analyze`], driven to
    /// completion by `runtime::block_on`.
    ///
    /// NOTE(review): whether this may be called from inside an async runtime
    /// depends on `runtime::block_on` — confirm before using it from async code.
    ///
    /// # Errors
    ///
    /// Returns the same errors as [`AnalyzerRegistry::analyze`].
    pub fn analyze_sync(&self, blob: &Blob, data: &Bytes) -> Result<Analysis, AnalyzerError> {
        let pending = self.analyze(blob, data);
        runtime::block_on(pending)
    }
}
/// Extracts pixel dimensions and the container format from image payloads.
#[derive(Clone, Copy, Debug)]
pub struct ImageAnalyzer;

/// Counts lines and characters in UTF-8 text payloads.
#[derive(Clone, Copy, Debug)]
pub struct TextAnalyzer;

/// Reads a `duration=<seconds>` marker from audio and video payloads.
#[derive(Clone, Copy, Debug)]
pub struct MediaAnalyzer;
#[async_trait]
impl BlobAnalyzer for ImageAnalyzer {
fn name(&self) -> &str {
"image"
}
fn accepts(&self, blob: &Blob) -> bool {
blob.is_image()
}
async fn analyze(&self, _blob: &Blob, data: &Bytes) -> Result<Analysis, AnalyzerError> {
let (width, height, format) = detect_dimensions(data)?;
let mut metadata = BTreeMap::new();
metadata.insert("width".to_owned(), Value::from(width));
metadata.insert("height".to_owned(), Value::from(height));
metadata.insert("format".to_owned(), Value::from(format));
Ok(Analysis { metadata })
}
}
#[async_trait]
impl BlobAnalyzer for TextAnalyzer {
fn name(&self) -> &str {
"text"
}
fn accepts(&self, blob: &Blob) -> bool {
blob.is_text()
}
async fn analyze(&self, _blob: &Blob, data: &Bytes) -> Result<Analysis, AnalyzerError> {
let content = std::str::from_utf8(data)
.map_err(|error| AnalyzerError::Malformed(error.to_string()))?;
let mut metadata = BTreeMap::new();
metadata.insert(
"line_count".to_owned(),
Value::from(content.lines().count()),
);
metadata.insert(
"character_count".to_owned(),
Value::from(content.chars().count()),
);
Ok(Analysis { metadata })
}
}
#[async_trait]
impl BlobAnalyzer for MediaAnalyzer {
    fn name(&self) -> &str {
        "media"
    }

    /// Accepts blobs for which `Blob::is_audio` or `Blob::is_video` returns `true`.
    fn accepts(&self, blob: &Blob) -> bool {
        blob.is_audio() || blob.is_video()
    }

    /// Records `duration_seconds` from the first whitespace-separated
    /// `duration=<seconds>` token in the payload.
    ///
    /// NOTE(review): the whole payload is lossily decoded as UTF-8 before
    /// scanning. For large binary media files this may be costly.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyzerError::Malformed`] in any of these cases:
    /// - no `duration=` token is present;
    /// - the value does not parse as `f64`;
    /// - the value is NaN, infinite, or negative.
    async fn analyze(&self, _blob: &Blob, data: &Bytes) -> Result<Analysis, AnalyzerError> {
        let content = String::from_utf8_lossy(data);
        // `strip_prefix` removes the key exactly once, unlike
        // `trim_start_matches`, which would also accept "duration=duration=5".
        let raw = content
            .split_whitespace()
            .find_map(|segment| segment.strip_prefix("duration="))
            .ok_or_else(|| AnalyzerError::Malformed("missing duration marker".to_owned()))?;
        let duration = raw
            .parse::<f64>()
            .map_err(|error| AnalyzerError::Malformed(error.to_string()))?;
        // `f64` parsing accepts "NaN"/"inf", which `Value::from` would store as
        // JSON null; a negative duration is meaningless. Reject both.
        if !duration.is_finite() || duration < 0.0 {
            return Err(AnalyzerError::Malformed(format!("invalid duration: {raw}")));
        }
        let mut metadata = BTreeMap::new();
        metadata.insert("duration_seconds".to_owned(), Value::from(duration));
        Ok(Analysis { metadata })
    }
}
/// Sniffs `(width, height, format)` from the header of a PNG, GIF or JPEG payload.
///
/// `format` is one of `"png"`, `"gif"` or `"jpeg"`. Only header bytes are
/// read; the image data itself is never decoded or validated.
///
/// # Errors
///
/// Returns [`AnalyzerError::Malformed`] in any of these cases:
/// - the payload matches none of the supported signatures;
/// - a recognised header is truncated;
/// - the image dimensions cannot be located.
fn detect_dimensions(data: &[u8]) -> Result<(u32, u32, &'static str), AnalyzerError> {
    if data.starts_with(b"\x89PNG\r\n\x1a\n") {
        return png_dimensions(data);
    }
    if data.starts_with(b"GIF87a") || data.starts_with(b"GIF89a") {
        return gif_dimensions(data);
    }
    if data.starts_with(b"\xff\xd8") {
        return jpeg_dimensions(data);
    }
    Err(AnalyzerError::Malformed(
        "unsupported image payload".to_owned(),
    ))
}

/// Reads the big-endian width and height stored in the IHDR chunk at bytes 16..24.
fn png_dimensions(data: &[u8]) -> Result<(u32, u32, &'static str), AnalyzerError> {
    // The 8-byte signature, 4-byte chunk length and 4-byte chunk type come first.
    let fields = data
        .get(16..24)
        .ok_or_else(|| AnalyzerError::Malformed("png header too short".to_owned()))?;
    let width = u32::from_be_bytes([fields[0], fields[1], fields[2], fields[3]]);
    let height = u32::from_be_bytes([fields[4], fields[5], fields[6], fields[7]]);
    Ok((width, height, "png"))
}

/// Reads the little-endian logical-screen width and height at bytes 6..10.
fn gif_dimensions(data: &[u8]) -> Result<(u32, u32, &'static str), AnalyzerError> {
    let fields = data
        .get(6..10)
        .ok_or_else(|| AnalyzerError::Malformed("gif header too short".to_owned()))?;
    let width = u16::from_le_bytes([fields[0], fields[1]]);
    let height = u16::from_le_bytes([fields[2], fields[3]]);
    Ok((u32::from(width), u32::from(height), "gif"))
}

/// Walks JPEG marker segments until the first start-of-frame (SOFn) header.
fn jpeg_dimensions(data: &[u8]) -> Result<(u32, u32, &'static str), AnalyzerError> {
    let not_found = || AnalyzerError::Malformed("jpeg dimensions not found".to_owned());
    // Skip the 2-byte SOI marker the caller already matched.
    let mut index = 2;
    while index + 1 < data.len() {
        if data[index] != 0xFF {
            // Not positioned on a marker: resynchronise one byte at a time.
            index += 1;
            continue;
        }
        let marker = data[index + 1];
        if marker == 0xFF {
            // Fill byte: a marker may be preceded by any number of extra 0xFF bytes.
            index += 1;
            continue;
        }
        index += 2;
        match marker {
            // EOI / SOS: the end of the image or entropy-coded data follows,
            // so no frame header will appear.
            0xD9 | 0xDA => break,
            // Stuffed zero, TEM, RSTn and SOI have no length field.
            0x00 | 0x01 | 0xD0..=0xD8 => continue,
            _ => {}
        }
        let length = match data.get(index..index + 2) {
            Some(bytes) => usize::from(u16::from_be_bytes([bytes[0], bytes[1]])),
            None => break,
        };
        // The length counts its own two bytes, so anything smaller is corrupt
        // and would desynchronise the walk.
        if length < 2 {
            return Err(AnalyzerError::Malformed(
                "invalid jpeg segment length".to_owned(),
            ));
        }
        // SOF0..SOF15, excluding DHT (0xC4), JPG (0xC8) and DAC (0xCC), which
        // share the 0xC0..=0xCF range.
        if matches!(marker, 0xC0..=0xC3 | 0xC5..=0xC7 | 0xC9..=0xCB | 0xCD..=0xCF) {
            // Frame header: length (2), sample precision (1), height (2), width (2).
            let frame = data.get(index..index + 7).ok_or_else(not_found)?;
            let height = u16::from_be_bytes([frame[3], frame[4]]);
            let width = u16::from_be_bytes([frame[5], frame[6]]);
            return Ok((u32::from(width), u32::from(height), "jpeg"));
        }
        index += length;
    }
    Err(not_found())
}
// Unit tests for the individual analyzers and for registry dispatch.
#[cfg(test)]
mod tests {
use bytes::Bytes;
use super::*;
use crate::{Blob, test_support::run_sync_test};
// Builds a `Blob` for the tests. The filename's bytes are passed as the first
// argument to `Blob::create` (presumably the blob body — confirm). The
// analyzers under test receive their payload separately through `data`.
fn blob(filename: &str, content_type: Option<&str>) -> Blob {
Blob::create(
Bytes::from(filename.as_bytes().to_vec()),
filename,
content_type,
Default::default(),
"memory",
)
.expect("blob should build")
}
// Minimal PNG header, in order:
// - the 8-byte signature;
// - the IHDR chunk length (13) and type (b"IHDR");
// - big-endian width and height at bytes 16..24;
// - the remaining five IHDR bytes;
// - a zeroed 4-byte CRC placeholder.
// `detect_dimensions` only reads bytes 16..24, so the CRC is never checked.
fn png(width: u32, height: u32) -> Bytes {
let mut data = vec![137, 80, 78, 71, 13, 10, 26, 10];
data.extend_from_slice(&[0, 0, 0, 13, 73, 72, 68, 82]);
data.extend_from_slice(&width.to_be_bytes());
data.extend_from_slice(&height.to_be_bytes());
data.extend_from_slice(&[8, 2, 0, 0, 0]);
data.extend_from_slice(&[0, 0, 0, 0]);
Bytes::from(data)
}
// Minimal GIF header: the "GIF89a" signature, little-endian width and height
// (bytes 6..10), then three zeroed header bytes.
fn gif(width: u16, height: u16) -> Bytes {
let mut data = b"GIF89a".to_vec();
data.extend_from_slice(&width.to_le_bytes());
data.extend_from_slice(&height.to_le_bytes());
data.extend_from_slice(&[0, 0, 0]);
Bytes::from(data)
}
// Minimal JPEG, in order:
// - SOI (FF D8);
// - an FF E0 segment of length 16 carrying "JFIF";
// - an FF C0 frame header (length 0x11) with big-endian height then width,
//   followed by component bytes;
// - EOI (FF D9).
// The FF E0 segment exercises the length-based skip in `detect_dimensions`.
fn jpeg(width: u16, height: u16) -> Bytes {
Bytes::from(vec![
0xFF,
0xD8, 0xFF,
0xE0,
0x00,
0x10,
b'J',
b'F',
b'I',
b'F',
0,
1,
1,
0,
0,
1,
0,
1,
0,
0,
0xFF,
0xC0,
0x00,
0x11,
0x08,
(height >> 8) as u8,
height as u8,
(width >> 8) as u8,
width as u8,
0x03,
0x01,
0x11,
0x00,
0x02,
0x11,
0x00,
0x03,
0x11,
0x00,
0xFF,
0xD9,
])
}
#[tokio::test]
async fn test_image_analyzer_reads_png_dimensions() {
let analysis = ImageAnalyzer
.analyze(&blob("image.png", Some("image/png")), &png(100, 50))
.await
.expect("analysis should succeed");
assert_eq!(analysis.metadata.get("width"), Some(&Value::from(100)));
assert_eq!(analysis.metadata.get("height"), Some(&Value::from(50)));
}
#[tokio::test]
async fn test_image_analyzer_reads_gif_dimensions() {
let analysis = ImageAnalyzer
.analyze(&blob("image.gif", Some("image/gif")), &gif(20, 10))
.await
.expect("analysis should succeed");
assert_eq!(analysis.metadata.get("width"), Some(&Value::from(20)));
assert_eq!(analysis.metadata.get("height"), Some(&Value::from(10)));
}
// Dimensions 640x480 need both bytes of each u16, so this checks byte order.
#[tokio::test]
async fn test_image_analyzer_reads_jpeg_dimensions() {
let analysis = ImageAnalyzer
.analyze(&blob("image.jpg", Some("image/jpeg")), &jpeg(640, 480))
.await
.expect("analysis should succeed");
assert_eq!(analysis.metadata.get("width"), Some(&Value::from(640)));
assert_eq!(analysis.metadata.get("height"), Some(&Value::from(480)));
}
// "a\nbc\n" has two lines (the trailing newline does not start a third) and
// five characters, because the two newlines are counted.
#[tokio::test]
async fn test_text_analyzer_counts_lines_and_characters() {
let analysis = TextAnalyzer
.analyze(
&blob("notes.txt", Some("text/plain")),
&Bytes::from_static(b"a\nbc\n"),
)
.await
.expect("analysis should succeed");
assert_eq!(analysis.metadata.get("line_count"), Some(&Value::from(2)));
assert_eq!(
analysis.metadata.get("character_count"),
Some(&Value::from(5))
);
}
// The duration comes from the whitespace-separated "duration=" token; the
// other tokens are ignored.
#[tokio::test]
async fn test_media_analyzer_reads_duration_marker() {
let analysis = MediaAnalyzer
.analyze(
&blob("clip.mp4", Some("video/mp4")),
&Bytes::from_static(b"duration=1.5 codec=h264"),
)
.await
.expect("analysis should succeed");
assert_eq!(
analysis.metadata.get("duration_seconds"),
Some(&Value::from(1.5))
);
}
// `with_defaults` lists ImageAnalyzer first, so a PNG blob is handled by it
// and reports format "png".
#[tokio::test]
async fn test_registry_dispatches_to_matching_analyzer() {
let registry = AnalyzerRegistry::with_defaults();
let analysis = registry
.analyze(&blob("image.png", Some("image/png")), &png(1, 2))
.await
.expect("analysis should succeed");
assert_eq!(analysis.metadata.get("format"), Some(&Value::from("png")));
}
// Plain #[test]: `run_sync_test` presumably sets up whatever environment
// `analyze_sync` needs in order to block — confirm in `test_support`.
#[test]
fn test_registry_analyze_sync_dispatches_to_matching_analyzer() {
run_sync_test(|| {
let registry = AnalyzerRegistry::with_defaults();
let analysis = registry
.analyze_sync(&blob("image.png", Some("image/png")), &png(1, 2))
.expect("analysis should succeed");
assert_eq!(analysis.metadata.get("format"), Some(&Value::from("png")));
});
}
// No default analyzer is expected to accept application/pdf (presumably
// is_image/is_text/is_audio/is_video are all false for it), so the registry
// falls through to `Unsupported`.
#[tokio::test]
async fn test_registry_returns_unsupported_when_nothing_matches() {
let registry = AnalyzerRegistry::with_defaults();
let error = registry
.analyze(
&blob("report.pdf", Some("application/pdf")),
&Bytes::from_static(b"%PDF-1.4"),
)
.await
.expect_err("analysis should fail");
assert!(matches!(error, AnalyzerError::Unsupported));
}
}