use crate::core::config::ExtractionConfig;
use crate::types::ExtractionResult;
use crate::utils::pool::{ByteBufferPool, StringBufferPool, create_byte_buffer_pool, create_string_buffer_pool};
use crate::utils::pool_sizing::PoolSizeHint;
use crate::{KreuzbergError, Result};
use parking_lot::Mutex;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
/// Sizing parameters for the buffer pools owned by a `BatchProcessor`.
#[derive(Debug, Clone)]
pub struct BatchProcessorConfig {
    /// Number of reusable `String` buffers kept in the string pool.
    pub string_pool_size: usize,
    /// Capacity, in bytes, of each pooled string buffer.
    pub string_buffer_capacity: usize,
    /// Number of reusable byte buffers kept in the byte pool.
    pub byte_pool_size: usize,
    /// Capacity, in bytes, of each pooled byte buffer.
    pub byte_buffer_capacity: usize,
    /// Optional concurrency cap. NOTE(review): not read anywhere in this
    /// file — presumably consumed by the extraction pipeline; confirm.
    pub max_concurrent: Option<usize>,
}
impl Default for BatchProcessorConfig {
fn default() -> Self {
BatchProcessorConfig {
string_pool_size: 10,
string_buffer_capacity: 8192,
byte_pool_size: 10,
byte_buffer_capacity: 65536,
max_concurrent: None,
}
}
}
/// Batch extraction helper that lazily creates and shares buffer pools.
pub struct BatchProcessor {
    /// Lazily-initialized shared string-buffer pool; `None` until first use.
    string_pool: Mutex<Option<Arc<StringBufferPool>>>,
    /// Lazily-initialized shared byte-buffer pool; `None` until first use.
    byte_pool: Mutex<Option<Arc<ByteBufferPool>>>,
    /// Sizing parameters applied when the pools are created.
    config: BatchProcessorConfig,
    /// Set to `true` once the string pool has been created.
    string_pool_initialized: AtomicBool,
    /// Set to `true` once the byte pool has been created.
    byte_pool_initialized: AtomicBool,
}
impl BatchProcessor {
    /// Creates a processor using [`BatchProcessorConfig::default`].
    pub fn new() -> Self {
        Self::with_config(BatchProcessorConfig::default())
    }

    /// Creates a processor with an explicit configuration.
    ///
    /// Both pools start uninitialized and are only allocated on first
    /// access, so construction is cheap.
    pub fn with_config(config: BatchProcessorConfig) -> Self {
        BatchProcessor {
            string_pool: Mutex::new(None),
            byte_pool: Mutex::new(None),
            config,
            string_pool_initialized: AtomicBool::new(false),
            byte_pool_initialized: AtomicBool::new(false),
        }
    }

    /// Builds a processor from a [`PoolSizeHint`], mapping the hint's
    /// buffer counts and capacities onto a [`BatchProcessorConfig`] with
    /// no concurrency cap.
    pub fn with_pool_hint(hint: &PoolSizeHint) -> Self {
        Self::with_config(BatchProcessorConfig {
            string_pool_size: hint.string_buffer_count,
            string_buffer_capacity: hint.string_buffer_capacity,
            byte_pool_size: hint.byte_buffer_count,
            byte_buffer_capacity: hint.byte_buffer_capacity,
            max_concurrent: None,
        })
    }

    /// Returns the shared string-buffer pool, creating it on first call.
    ///
    /// Simplified from a double-checked `AtomicBool` fast path: both the
    /// "fast" and slow paths previously acquired the same mutex, so the
    /// flag check bought nothing while duplicating the initialization
    /// invariant (and forced an extra lock + `unwrap`). A single lock with
    /// `Option::get_or_insert_with` is equivalent and simpler. The flag is
    /// still kept in sync because the struct retains the field.
    pub fn string_pool(&self) -> Arc<StringBufferPool> {
        let mut slot = self.string_pool.lock();
        let pool = slot.get_or_insert_with(|| {
            Arc::new(create_string_buffer_pool(
                self.config.string_pool_size,
                self.config.string_buffer_capacity,
            ))
        });
        self.string_pool_initialized.store(true, Ordering::Release);
        Arc::clone(pool)
    }

    /// Returns the shared byte-buffer pool, creating it on first call.
    ///
    /// See [`Self::string_pool`] for why lazy-init is a single lock rather
    /// than a double-checked flag.
    pub fn byte_pool(&self) -> Arc<ByteBufferPool> {
        let mut slot = self.byte_pool.lock();
        let pool = slot.get_or_insert_with(|| {
            Arc::new(create_byte_buffer_pool(
                self.config.byte_pool_size,
                self.config.byte_buffer_capacity,
            ))
        });
        self.byte_pool_initialized.store(true, Ordering::Release);
        Arc::clone(pool)
    }

    /// Returns the configuration this processor was built with.
    pub fn config(&self) -> &BatchProcessorConfig {
        &self.config
    }

    /// Extracts every file in `paths` via [`batch_extract_file`], with no
    /// per-file configuration overrides.
    ///
    /// NOTE: delegates straight to the extractor; the internal buffer
    /// pools are not consumed here.
    ///
    /// # Errors
    /// Propagates any error returned by the underlying batch extraction.
    #[cfg(feature = "tokio-runtime")]
    pub async fn process_image_files(
        &self,
        paths: Vec<impl AsRef<Path>>,
        extraction_config: &ExtractionConfig,
    ) -> Result<Vec<ExtractionResult>> {
        use crate::core::extractor::batch_extract_file;
        let items: Vec<(
            std::path::PathBuf,
            Option<crate::core::config::extraction::FileExtractionConfig>,
        )> = paths.into_iter().map(|p| (p.as_ref().to_path_buf(), None)).collect();
        batch_extract_file(items, extraction_config).await
    }

    /// Extracts each `(bytes, mime-type)` pair via [`batch_extract_bytes`],
    /// with no per-item configuration overrides. Inputs are copied into
    /// owned buffers before being handed to the async extractor.
    ///
    /// # Errors
    /// Propagates any error returned by the underlying batch extraction.
    #[cfg(feature = "tokio-runtime")]
    pub async fn process_bytes(
        &self,
        contents: Vec<(&[u8], &str)>,
        extraction_config: &ExtractionConfig,
    ) -> Result<Vec<ExtractionResult>> {
        use crate::core::extractor::batch_extract_bytes;
        let items: Vec<(
            Vec<u8>,
            String,
            Option<crate::core::config::extraction::FileExtractionConfig>,
        )> = contents
            .into_iter()
            .map(|(bytes, mime)| (bytes.to_vec(), mime.to_string(), None))
            .collect();
        batch_extract_bytes(items, extraction_config).await
    }

    /// Size reported by the string pool, or 0 if it was never initialized.
    pub fn string_pool_size(&self) -> usize {
        self.string_pool.lock().as_ref().map(|p| p.size()).unwrap_or(0)
    }

    /// Size reported by the byte pool, or 0 if it was never initialized.
    pub fn byte_pool_size(&self) -> usize {
        self.byte_pool.lock().as_ref().map(|p| p.size()).unwrap_or(0)
    }

    /// Clears both pools (those that have been initialized), dropping
    /// their cached buffers.
    ///
    /// # Errors
    /// Returns [`KreuzbergError::Other`] if either pool's `clear` fails;
    /// a string-pool failure short-circuits before the byte pool is cleared.
    pub fn clear_pools(&self) -> Result<()> {
        if let Some(pool) = self.string_pool.lock().as_ref() {
            pool.clear()
                .map_err(|e| KreuzbergError::Other(format!("string pool error: {}", e)))?;
        }
        if let Some(pool) = self.byte_pool.lock().as_ref() {
            pool.clear()
                .map_err(|e| KreuzbergError::Other(format!("byte pool error: {}", e)))?;
        }
        Ok(())
    }
}
impl Default for BatchProcessor {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_batch_processor_creation() {
        let bp = BatchProcessor::new();
        // Pools are lazy: nothing is allocated until first use.
        assert_eq!(bp.string_pool_size(), 0);
        assert_eq!(bp.byte_pool_size(), 0);
    }

    #[test]
    fn test_batch_processor_with_config() {
        let cfg = BatchProcessorConfig {
            string_pool_size: 5,
            string_buffer_capacity: 1024,
            byte_pool_size: 3,
            byte_buffer_capacity: 4096,
            max_concurrent: None,
        };
        let bp = BatchProcessor::with_config(cfg);
        assert_eq!(bp.config().string_pool_size, 5);
        assert_eq!(bp.config().byte_pool_size, 3);
    }

    #[test]
    fn test_batch_processor_string_pool_usage() {
        let bp = BatchProcessor::new();
        let pool = bp.string_pool();
        // Write into a pooled buffer, then return it by dropping the guard.
        {
            let mut buf = pool.acquire().unwrap();
            buf.push_str("test");
        }
        // A re-acquired buffer must come back empty.
        {
            let buf = pool.acquire().unwrap();
            assert_eq!(buf.len(), 0);
        }
    }

    #[test]
    fn test_batch_processor_byte_pool_usage() {
        let bp = BatchProcessor::new();
        let pool = bp.byte_pool();
        // Write into a pooled buffer, then return it by dropping the guard.
        {
            let mut bytes = pool.acquire().unwrap();
            bytes.extend_from_slice(b"test");
        }
        // A re-acquired buffer must come back empty.
        {
            let bytes = pool.acquire().unwrap();
            assert_eq!(bytes.len(), 0);
        }
    }

    #[test]
    fn test_batch_processor_clear_pools() {
        let bp = BatchProcessor::new();
        // Acquire one buffer from each pool, then release both so the
        // pools hold returned buffers.
        let held_string = bp.string_pool().acquire().unwrap();
        let held_bytes = bp.byte_pool().acquire().unwrap();
        drop(held_string);
        drop(held_bytes);
        assert!(bp.string_pool_size() > 0);
        assert!(bp.byte_pool_size() > 0);
        // Clearing must empty both pools.
        bp.clear_pools().unwrap();
        assert_eq!(bp.string_pool_size(), 0);
        assert_eq!(bp.byte_pool_size(), 0);
    }
}