use crate::error::Result;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
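/// Tuning knobs for batch processing, caching, and profiling.
///
/// `memory_pool_size` and `memory_limit` are in bytes; `cache_size` is a
/// maximum number of entries. `num_workers == 0` means "use all available
/// cores", resolved via `num_cpus::get()` in [`BatchProcessor::new`].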
#[derive(Debug, Clone)]
pub struct PerformanceConfig {
pub parallel_processing: bool,
pub chunk_size: usize,
pub memory_pooling: bool,
pub cache_size: usize,
pub simd_enabled: bool,
pub gpu_enabled: bool,
pub memory_pool_size: usize,
pub num_workers: usize,
pub profiling_enabled: bool,
pub adaptive_chunking: bool,
pub memory_limit: usize,
}
impl Default for PerformanceConfig {
fn default() -> Self {
Self {
parallel_processing: true,
chunk_size: 100,
memory_pooling: true,
cache_size: 100,
simd_enabled: true,
            gpu_enabled: false,
            memory_pool_size: 128 * 1024 * 1024,
            num_workers: 0,
            profiling_enabled: true,
            adaptive_chunking: true,
            memory_limit: 1024 * 1024 * 1024,
        }
}
}
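/// A thread-safe, size-bounded cache of model content keyed by URN.
///
/// Hit and miss counts are tracked with relaxed atomics and surfaced through
/// [`ModelCache::stats`]. When the cache is full, `put` evicts an arbitrary
/// entry (whatever `HashMap`'s iterator yields first), not the least recently
/// used one.
///
/// # Examples
///
/// A minimal sketch (marked `ignore` since the crate path is assumed):
///
/// ```ignore
/// use std::sync::Arc;
///
/// let cache = ModelCache::new(2);
/// cache.put("urn:model:1".into(), Arc::new("content".to_string()));
/// assert!(cache.get("urn:model:1").is_some()); // counted as a hit
/// assert!(cache.get("urn:model:2").is_none()); // counted as a miss
/// ```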
pub struct ModelCache {
cache: Arc<RwLock<HashMap<String, Arc<String>>>>,
max_size: usize,
hits: Arc<AtomicU64>,
misses: Arc<AtomicU64>,
}
impl ModelCache {
pub fn new(max_size: usize) -> Self {
Self {
cache: Arc::new(RwLock::new(HashMap::new())),
max_size,
hits: Arc::new(AtomicU64::new(0)),
misses: Arc::new(AtomicU64::new(0)),
}
}
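    /// Looks up `urn`, recording a hit or a miss.
    ///
    /// If the lock is poisoned this returns `None` early, *without* counting
    /// a miss, so the hit rate can slightly overstate reality in that case.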
pub fn get(&self, urn: &str) -> Option<Arc<String>> {
let result = self.cache.read().ok()?.get(urn).cloned();
if result.is_some() {
self.hits.fetch_add(1, Ordering::Relaxed);
} else {
self.misses.fetch_add(1, Ordering::Relaxed);
}
result
}
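    /// Inserts `content` under `urn`, first evicting one arbitrary entry if
    /// the cache is at capacity. A poisoned lock turns this into a no-op.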
pub fn put(&self, urn: String, content: Arc<String>) {
if let Ok(mut cache) = self.cache.write() {
if cache.len() >= self.max_size {
if let Some(key) = cache.keys().next().cloned() {
cache.remove(&key);
}
}
cache.insert(urn, content);
}
}
pub fn clear(&self) {
if let Ok(mut cache) = self.cache.write() {
cache.clear();
}
}
pub fn stats(&self) -> CacheStats {
if let Ok(cache) = self.cache.read() {
CacheStats {
size: cache.len(),
max_size: self.max_size,
hit_rate: self.calculate_hit_rate(),
}
} else {
CacheStats {
size: 0,
max_size: self.max_size,
hit_rate: 0.0,
}
}
}
fn calculate_hit_rate(&self) -> f64 {
let hits = self.hits.load(Ordering::Relaxed);
let misses = self.misses.load(Ordering::Relaxed);
let total = hits + misses;
if total == 0 {
0.0
} else {
(hits as f64) / (total as f64)
}
}
}
#[derive(Debug, Clone)]
pub struct CacheStats {
pub size: usize,
pub max_size: usize,
pub hit_rate: f64,
}
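/// Dispatches batches of model strings to a processing closure, sequentially
/// for small batches and via the rayon thread pool for large ones.
///
/// # Examples
///
/// A minimal sketch (marked `ignore`: assumes this crate's `Result` alias and
/// an async caller):
///
/// ```ignore
/// let processor = BatchProcessor::new(PerformanceConfig::default());
/// let lengths = processor
///     .process_batch(vec!["model1".to_string()], |m| Ok(m.len()))
///     .await?;
/// assert_eq!(lengths, vec![6]);
/// ```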
pub struct BatchProcessor {
config: PerformanceConfig,
cache: ModelCache,
num_workers: usize,
}
impl BatchProcessor {
pub fn new(config: PerformanceConfig) -> Self {
let cache = ModelCache::new(config.cache_size);
let num_workers = if config.num_workers == 0 {
num_cpus::get()
} else {
config.num_workers
};
Self {
config,
cache,
num_workers,
}
}
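    /// Runs `processor` over every model, collecting results and failing on
    /// error.
    ///
    /// Batches smaller than `chunk_size`, or any batch when parallel
    /// processing is disabled, run sequentially on the current thread; larger
    /// batches fan out across rayon. Note that the parallel path blocks the
    /// calling task, so callers inside an async runtime may prefer to wrap
    /// this in a blocking-task helper such as tokio's `spawn_blocking`.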
pub async fn process_batch<F, T>(&self, models: Vec<String>, processor: F) -> Result<Vec<T>>
where
F: Fn(&str) -> Result<T> + Send + Sync,
T: Send,
{
if !self.config.parallel_processing || models.len() < self.config.chunk_size {
models.iter().map(|m| processor(m)).collect()
} else {
self.process_parallel(&models, processor)
}
}
    fn process_parallel<F, T>(&self, models: &[String], processor: F) -> Result<Vec<T>>
    where
        F: Fn(&str) -> Result<T> + Send + Sync,
        T: Send,
    {
        use rayon::prelude::*;
        // `F: Sync` lets every rayon worker borrow the closure directly, so
        // no `Arc` wrapper is needed; an `Err` from any item fails the whole
        // collect.
        models.par_iter().map(|model| processor(model)).collect()
    }
pub fn cache(&self) -> &ModelCache {
&self.cache
}
pub fn num_workers(&self) -> usize {
self.num_workers
}
}
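/// String helpers for model content. Some names describe intended
/// optimizations rather than the current implementations; see the individual
/// function docs.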
pub mod string_utils {
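    /// Runs `processor` on `content`, emitting a debug trace for inputs over
    /// 1 MB so unusually large payloads show up in logs.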
pub fn process_large_content<F, T>(content: &str, processor: F) -> T
where
F: FnOnce(&str) -> T,
{
if content.len() > 1_000_000 {
tracing::debug!("Processing large content: {} bytes", content.len());
}
processor(content)
}
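    /// Substring search. Currently delegates to `str::contains`; the name
    /// reserves room for an explicit SIMD implementation.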
pub fn simd_contains(haystack: &str, needle: &str) -> bool {
haystack.contains(needle)
}
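    /// Splits `content` on `delimiter` into owned strings. Despite the name,
    /// this currently runs sequentially on a single thread.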
pub fn parallel_split(content: &str, delimiter: char) -> Vec<String> {
content.split(delimiter).map(|s| s.to_string()).collect()
}
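    /// Counts newline bytes with `bytecount`, which can use SIMD when the
    /// crate's SIMD features are enabled. A final line without a trailing
    /// `\n` is not counted.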
pub fn count_lines_efficient(content: &str) -> usize {
bytecount::count(content.as_bytes(), b'\n')
}
}
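/// Thin wrappers around `scirs2_core::profiling` that pair its `Timer` and
/// `MemoryTracker` with local wall-clock measurement and `tracing` output.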
pub mod profiling {
use scirs2_core::profiling::{MemoryTracker, Profiler, Timer};
use std::time::Instant;
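    /// Times `f` with both the scirs2 `Timer` and a local `Instant`, returning
    /// the closure's result alongside the measured wall-clock duration.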
pub fn profile<F, T>(name: &str, f: F) -> (T, std::time::Duration)
where
F: FnOnce() -> T,
{
let timer = Timer::start(name);
let start = Instant::now();
let result = f();
let duration = start.elapsed();
timer.stop();
tracing::debug!("Performance: {} took {:?}", name, duration);
(result, duration)
}
pub async fn profile_async<F, T>(name: &str, f: F) -> (T, std::time::Duration)
where
F: std::future::Future<Output = T>,
{
let timer = Timer::start(name);
let start = Instant::now();
let result = f.await;
let duration = start.elapsed();
timer.stop();
tracing::debug!("Performance (async): {} took {:?}", name, duration);
(result, duration)
}
pub fn profile_memory<F, T>(name: &str, f: F) -> T
where
F: FnOnce() -> T,
{
let tracker = MemoryTracker::start(name);
let result = f();
tracker.stop();
result
}
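    /// Locks and returns the global profiler guard.
    ///
    /// # Panics
    ///
    /// Panics if the global profiler mutex is poisoned.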
pub fn get_global_profiler() -> std::sync::MutexGuard<'static, Profiler> {
Profiler::global()
.lock()
.expect("lock should not be poisoned")
}
pub fn start_profiling() {
get_global_profiler().start();
}
pub fn stop_profiling() {
get_global_profiler().stop();
}
pub fn print_profiling_report() {
get_global_profiler().print_report();
}
pub fn get_profiling_report() -> String {
format!("{:?}", get_global_profiler())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_model_cache() {
let cache = ModelCache::new(2);
cache.put("urn:1".to_string(), Arc::new("content1".to_string()));
cache.put("urn:2".to_string(), Arc::new("content2".to_string()));
assert!(cache.get("urn:1").is_some());
assert!(cache.get("urn:2").is_some());
cache.put("urn:3".to_string(), Arc::new("content3".to_string()));
let stats = cache.stats();
assert_eq!(stats.size, 2);
assert_eq!(stats.max_size, 2);
assert!(stats.hit_rate > 0.0 && stats.hit_rate <= 1.0);
}
#[test]
fn test_cache_hit_rate() {
let cache = ModelCache::new(10);
cache.put("urn:1".to_string(), Arc::new("content1".to_string()));
cache.put("urn:2".to_string(), Arc::new("content2".to_string()));
assert!(cache.get("urn:1").is_some());
assert!(cache.get("urn:2").is_some());
assert!(cache.get("urn:3").is_none());
let stats = cache.stats();
        assert!((stats.hit_rate - 2.0 / 3.0).abs() < 1e-9); // 2 hits, 1 miss
}
#[tokio::test]
async fn test_batch_processor() {
let config = PerformanceConfig {
parallel_processing: true,
chunk_size: 2,
..Default::default()
};
let processor = BatchProcessor::new(config);
let models = vec![
"model1".to_string(),
"model2".to_string(),
"model3".to_string(),
];
let results = processor
.process_batch(models, |m| Ok(m.len()))
.await
.expect("operation should succeed");
assert_eq!(results.len(), 3);
        assert_eq!(results[0], 6); // "model1".len()
    }
#[tokio::test]
async fn test_batch_processor_with_profiling() {
let config = PerformanceConfig {
parallel_processing: true,
profiling_enabled: true,
chunk_size: 2,
..Default::default()
};
let processor = BatchProcessor::new(config);
let models = vec!["a".to_string(), "b".to_string()];
let results = processor
.process_batch(models, |m| Ok(m.len()))
.await
.expect("operation should succeed");
assert_eq!(results.len(), 2);
assert_eq!(processor.num_workers(), num_cpus::get());
}
#[test]
fn test_performance_config_defaults() {
let config = PerformanceConfig::default();
assert!(config.parallel_processing);
assert!(config.memory_pooling);
assert!(config.simd_enabled);
assert!(config.profiling_enabled);
assert!(config.adaptive_chunking);
assert_eq!(config.chunk_size, 100);
assert_eq!(config.cache_size, 100);
}
#[test]
fn test_string_utils() {
use string_utils::*;
let content = "line1\nline2\nline3";
assert_eq!(count_lines_efficient(content), 2);
assert!(simd_contains("hello world", "world"));
assert!(!simd_contains("hello world", "rust"));
let parts = parallel_split("a,b,c,d", ',');
assert_eq!(parts.len(), 4);
assert_eq!(parts[0], "a");
}
}