use super::*;
use std::time::Instant;
use std::sync::Arc;
use core::sync::atomic::{ AtomicU64, AtomicUsize, Ordering };
use std::hash::{ Hash, Hasher };
use std::collections::hash_map::DefaultHasher;
use std::path::Path;
use bytes::{ Bytes, BytesMut };
use futures_util::Stream;
use tokio::sync::Semaphore;
/// Upload-processing pipeline combining file-format validation, a cache of
/// processed results, a concurrency limit, atomic metrics and optional
/// thumbnail generation.
#[ derive( Debug ) ]
pub struct MediaProcessingPipeline
{
/// Configuration the pipeline was built from (size thresholds, streaming chunk
/// size, concurrency limit, optional thumbnail settings).
config : MediaProcessingConfig,
/// Cache of processed results, keyed by a hash of the input bytes and MIME type.
cache : Arc< MediaCache >,
/// Bounds how many uploads are processed at once; created in `new` from
/// `config.max_concurrent_operations`.
operation_semaphore : Arc< Semaphore >,
/// Counters shared behind an `Arc` and updated through atomic operations.
metrics : Arc< MediaProcessingMetrics >,
/// Thumbnail generator; present only when `config.thumbnail_config` is set.
thumbnail_generator : Option< ThumbnailGenerator >,
}
/// Atomic counters collected by [`MediaProcessingPipeline`].
///
/// Every field is an atomic so the counters can be updated through a shared
/// reference from concurrent uploads.
#[ derive( Debug ) ]
pub struct MediaProcessingMetrics
{
/// Number of uploads that completed the uncached processing path.
pub files_processed : AtomicU64,
/// Total input size, in bytes, of the uploads counted in `files_processed`.
pub bytes_processed : AtomicU64,
/// Accumulated wall-clock processing time, in microseconds.
pub total_processing_time_us : AtomicU64,
/// Count of failed operations.
pub failed_operations : AtomicU64,
/// Count of retries performed.
/// NOTE(review): nothing in this file increments this counter.
pub retries_performed : AtomicU64,
/// Largest single upload size, in bytes, seen by the pipeline. This is a
/// per-upload maximum, not a measure of total concurrent memory use.
pub memory_high_watermark_bytes : AtomicUsize,
}
impl Default for MediaProcessingMetrics
{
  /// Produces a metrics set in which every counter starts at zero.
  #[ inline ]
  fn default() -> Self
  {
    let zeroed = || AtomicU64::new( 0 );
    Self
    {
      files_processed : zeroed(),
      bytes_processed : zeroed(),
      total_processing_time_us : zeroed(),
      failed_operations : zeroed(),
      retries_performed : zeroed(),
      memory_high_watermark_bytes : AtomicUsize::new( 0 ),
    }
  }
}
impl MediaProcessingPipeline
{
#[ inline ]
#[ must_use ]
pub fn new( config : MediaProcessingConfig ) -> Self
{
let cache = Arc::new( MediaCache::new( config.clone() ) );
let operation_semaphore = Arc::new( Semaphore::new( config.max_concurrent_operations ) );
let thumbnail_generator = config.thumbnail_config.clone().map( | cfg | ThumbnailGenerator::new( cfg ) );
Self {
config,
cache,
operation_semaphore,
metrics : Arc::new( MediaProcessingMetrics::default() ),
thumbnail_generator,
}
}
#[ inline ]
pub async fn process_upload_bytes(
&self,
file_data : Bytes,
mime_type : String,
display_name : Option< String >
) -> Result< ProcessedMediaResult, crate::error::Error >
{
let start_time = Instant::now();
let _permit = self.operation_semaphore.acquire().await.unwrap();
let current_memory = file_data.len();
self.update_memory_watermark( current_memory );
let cache_key = self.generate_cache_key( &file_data, &mime_type );
if let Some( ( cached_data, metadata ) ) = self.cache.get( &cache_key )
{
return Ok( ProcessedMediaResult {
processed_data : cached_data.clone(),
metadata : ProcessedMediaMetadata {
original_size : metadata.original_size,
processed_size : cached_data.len(),
mime_type : metadata.mime_type,
is_compressed : metadata.is_compressed,
compression_ratio : metadata.compression_ratio,
processing_time_ms : 0, thumbnail_data : None, },
cache_hit : true,
} );
}
let processed_result = if file_data.len() > self.config.max_memory_file_size
{
self.process_large_file( file_data, mime_type.clone(), display_name ).await?
} else {
self.process_in_memory( file_data, mime_type.clone(), display_name ).await?
};
self.cache.put( cache_key, processed_result.processed_data.clone(), mime_type.clone() )?;
let processing_time = start_time.elapsed();
self.metrics.files_processed.fetch_add( 1, Ordering::Relaxed );
self.metrics.bytes_processed.fetch_add( current_memory as u64, Ordering::Relaxed );
self.metrics.total_processing_time_us.fetch_add( processing_time.as_micros() as u64, Ordering::Relaxed );
Ok( processed_result )
}
async fn process_in_memory(
&self,
file_data : Bytes,
mime_type : String,
_display_name : Option< String >
) -> Result< ProcessedMediaResult, crate::error::Error >
{
let start_time = Instant::now();
let original_size = file_data.len();
self.validate_file_format( &file_data, &mime_type )?;
let thumbnail_data = if let Some( ref generator ) = self.thumbnail_generator
{
if self.is_image_type( &mime_type )
{
generator.generate_thumbnail( &file_data, &mime_type ).await.ok()
} else {
None
}
} else {
None
};
let processed_data = file_data;
let processed_size = processed_data.len();
Ok( ProcessedMediaResult {
processed_data,
metadata : ProcessedMediaMetadata {
original_size,
processed_size,
mime_type,
is_compressed : false,
compression_ratio : 1.0,
processing_time_ms : start_time.elapsed().as_millis() as u64,
thumbnail_data,
},
cache_hit : false,
} )
}
async fn process_large_file(
&self,
file_data : Bytes,
mime_type : String,
_display_name : Option< String >
) -> Result< ProcessedMediaResult, crate::error::Error >
{
let start_time = Instant::now();
let original_size = file_data.len();
let chunk_size = self.config.streaming_chunk_size;
let mut processed_chunks = Vec::new();
for chunk in file_data.chunks( chunk_size )
{
processed_chunks.push( Bytes::copy_from_slice( chunk ) );
}
let mut combined = BytesMut::new();
for chunk in processed_chunks
{
combined.extend_from_slice( &chunk );
}
let processed_data = combined.freeze();
Ok( ProcessedMediaResult {
processed_data : processed_data.clone(),
metadata : ProcessedMediaMetadata {
original_size,
processed_size : processed_data.len(),
mime_type,
is_compressed : false,
compression_ratio : 1.0,
processing_time_ms : start_time.elapsed().as_millis() as u64,
thumbnail_data : None, },
cache_hit : false,
} )
}
fn validate_file_format( &self, file_data : &Bytes, declared_mime_type : &str ) -> Result< (), crate::error::Error >
{
if file_data.is_empty()
{
return Err( crate::error::Error::ApiError( "Empty file data".to_string() ) );
}
let signatures = [
( "image/jpeg", &[ 0xFF, 0xD8, 0xFF, 0xE0 ] ),
( "image/png", &[ 0x89, 0x50, 0x4E, 0x47 ] ),
( "image/gif", &[ 0x47, 0x49, 0x46, 0x38 ] ),
( "application/pdf", &[ 0x25, 0x50, 0x44, 0x46 ] ),
];
for ( mime_type, signature ) in &signatures
{
if declared_mime_type == *mime_type
{
if file_data.len() >= signature.len() && &file_data[ ..signature.len() ] == *signature
{
return Ok( () );
}
return Err( crate::error::Error::ApiError(
format!( "File signature doesn't match declared MIME type : {}", declared_mime_type )
) );
}
}
if file_data.len() > 100 * 1024 * 1024 {
return Err( crate::error::Error::ApiError( "File too large".to_string() ) );
}
Ok( () )
}
fn is_image_type( &self, mime_type : &str ) -> bool
{
mime_type.starts_with( "image/" )
}
fn generate_cache_key( &self, file_data : &Bytes, mime_type : &str ) -> String
{
let mut hasher = DefaultHasher::new();
file_data.hash( &mut hasher );
mime_type.hash( &mut hasher );
format!( "media_{:x}", hasher.finish() )
}
fn update_memory_watermark( &self, current_usage : usize )
{
let current_watermark = self.metrics.memory_high_watermark_bytes.load( Ordering::Relaxed );
if current_usage > current_watermark
{
self.metrics.memory_high_watermark_bytes.store( current_usage, Ordering::Relaxed );
}
}
pub fn get_metrics( &self ) -> MediaProcessingMetricsReport
{
let files_processed = self.metrics.files_processed.load( Ordering::Relaxed );
let total_time_us = self.metrics.total_processing_time_us.load( Ordering::Relaxed );
let avg_processing_time_ms = if files_processed > 0
{
( total_time_us / files_processed ) / 1000 } else {
0
};
MediaProcessingMetricsReport {
files_processed,
bytes_processed : self.metrics.bytes_processed.load( Ordering::Relaxed ),
avg_processing_time_ms,
failed_operations : self.metrics.failed_operations.load( Ordering::Relaxed ),
retries_performed : self.metrics.retries_performed.load( Ordering::Relaxed ),
memory_high_watermark_bytes : self.metrics.memory_high_watermark_bytes.load( Ordering::Relaxed ),
cache_stats : self.cache.get_stats(),
}
}
pub async fn clear_cache( &self )
{
self.cache.clear();
}
pub fn get_cache_stats( &self ) -> MediaCacheStatsReport
{
self.cache.get_stats()
}
pub async fn process_upload( &self, file_path : &Path ) -> Result< ProcessedMediaResult, crate::error::Error >
{
let file_data = std::fs::read( file_path )
.map_err( | e | crate::error::Error::ApiError( format!( "Failed to read file : {}", e ) ) )?;
let mime_type = match file_path.extension().and_then( | ext | ext.to_str() )
{
Some("jpg" | "jpeg") => "image/jpeg".to_string(),
Some( "png" ) => "image/png".to_string(),
Some( "gif" ) => "image/gif".to_string(),
Some( "pdf" ) => "application/pdf".to_string(),
_ => "application/octet-stream".to_string(),
};
let display_name = file_path.file_name()
.and_then( | name | name.to_str() )
.map( | s | s.to_string() );
self.process_upload_bytes( Bytes::from( file_data ), mime_type, display_name ).await
}
pub async fn process_download( &self, _file_id : &str, _destination : &Path ) -> Result< ProcessedMediaResult, crate::error::Error >
{
Err( crate::error::Error::ApiError( "Download functionality not implemented yet".to_string() ) )
}
pub async fn process_stream< S >(
&self,
mut _stream : S,
_metadata : ProcessedMediaMetadata
) -> Result< ProcessedMediaResult, crate::error::Error >
where
S: Stream< Item = Result< Bytes, crate::error::Error > > + Send + Unpin,
{
Err( crate::error::Error::ApiError( "Stream processing functionality not implemented yet".to_string() ) )
}
}