use ahash::AHashMap;
use bytes::Bytes;
use crossbeam::queue::ArrayQueue;
use parking_lot::RwLock;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
#[cfg(all(not(feature = "quic"), not(target_os = "macos")))]
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
#[derive(Debug, Clone)]
pub struct HighPerfConfig {
pub cache_shards: usize,
pub max_entries_per_shard: usize,
pub buffer_pool_size: usize,
pub simd_json_enabled: bool,
pub connections_per_backend: usize,
pub batch_window_us: u64,
pub max_batch_size: usize,
pub cpu_affinity: bool,
pub preallocate_buffers: bool,
pub response_buffer_hint: usize,
}
impl Default for HighPerfConfig {
fn default() -> Self {
Self {
cache_shards: 64,
max_entries_per_shard: 10_000,
buffer_pool_size: 1024,
simd_json_enabled: true,
connections_per_backend: 100,
batch_window_us: 100,
max_batch_size: 100,
cpu_affinity: false,
preallocate_buffers: true,
response_buffer_hint: 4096,
}
}
}
impl HighPerfConfig {
pub fn ultra_fast() -> Self {
Self {
cache_shards: 128,
max_entries_per_shard: 100_000,
buffer_pool_size: 4096,
simd_json_enabled: true,
connections_per_backend: 200,
batch_window_us: 50,
max_batch_size: 200,
cpu_affinity: true,
preallocate_buffers: true,
response_buffer_hint: 8192,
}
}
pub fn balanced() -> Self {
Self {
cache_shards: 64,
max_entries_per_shard: 20_000,
buffer_pool_size: 2048,
simd_json_enabled: true,
connections_per_backend: 100,
batch_window_us: 100,
max_batch_size: 50,
cpu_affinity: false,
preallocate_buffers: true,
response_buffer_hint: 4096,
}
}
pub fn low_latency() -> Self {
Self {
cache_shards: 32,
max_entries_per_shard: 10_000,
buffer_pool_size: 512,
simd_json_enabled: true,
connections_per_backend: 50,
batch_window_us: 0, max_batch_size: 1,
cpu_affinity: true,
preallocate_buffers: false,
response_buffer_hint: 2048,
}
}
}
pub struct FastJsonParser {
buffer_pool: ObjectPool<Vec<u8>>,
}
impl FastJsonParser {
pub fn new(pool_size: usize) -> Self {
Self {
buffer_pool: ObjectPool::new(pool_size, || Vec::with_capacity(4096)),
}
}
pub fn parse_bytes(&self, input: &[u8]) -> Result<serde_json::Value, FastJsonError> {
if input.is_empty() {
return Err(FastJsonError::EmptyInput);
}
let mut buffer = self
.buffer_pool
.get()
.unwrap_or_else(|| Vec::with_capacity(input.len()));
buffer.clear();
buffer.extend_from_slice(input);
let owned_value = simd_json::to_owned_value(&mut buffer)
.map_err(|e| FastJsonError::ParseError(e.to_string()))?;
let result = convert_simd_owned_to_serde(owned_value);
buffer.clear();
self.buffer_pool.put(buffer);
Ok(result)
}
pub fn parse_str(&self, input: &str) -> Result<serde_json::Value, FastJsonError> {
self.parse_bytes(input.as_bytes())
}
pub fn serialize<T: serde::Serialize>(&self, value: &T) -> Result<Bytes, FastJsonError> {
let mut buffer = self
.buffer_pool
.get()
.unwrap_or_else(|| Vec::with_capacity(4096));
buffer.clear();
serde_json::to_writer(&mut buffer, value)
.map_err(|e| FastJsonError::SerializeError(e.to_string()))?;
let bytes = Bytes::copy_from_slice(&buffer);
buffer.clear();
self.buffer_pool.put(buffer);
Ok(bytes)
}
}
impl Default for FastJsonParser {
fn default() -> Self {
Self::new(256)
}
}
fn convert_simd_owned_to_serde(value: simd_json::OwnedValue) -> serde_json::Value {
match value {
simd_json::OwnedValue::Static(s) => match s {
simd_json::StaticNode::Null => serde_json::Value::Null,
simd_json::StaticNode::Bool(b) => serde_json::Value::Bool(b),
simd_json::StaticNode::I64(n) => serde_json::Value::Number(n.into()),
simd_json::StaticNode::U64(n) => serde_json::Value::Number(n.into()),
simd_json::StaticNode::F64(f) => serde_json::Number::from_f64(f)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null),
},
simd_json::OwnedValue::String(s) => serde_json::Value::String(s.to_string()),
simd_json::OwnedValue::Array(arr) => {
serde_json::Value::Array(arr.into_iter().map(convert_simd_owned_to_serde).collect())
}
simd_json::OwnedValue::Object(obj) => {
let map: serde_json::Map<String, serde_json::Value> = obj
.into_iter()
.map(|(k, v)| (k.to_string(), convert_simd_owned_to_serde(v)))
.collect();
serde_json::Value::Object(map)
}
}
}
#[allow(dead_code)]
fn convert_simd_to_serde(value: &simd_json::BorrowedValue) -> serde_json::Value {
match value {
simd_json::BorrowedValue::Static(s) => match s {
simd_json::StaticNode::Null => serde_json::Value::Null,
simd_json::StaticNode::Bool(b) => serde_json::Value::Bool(*b),
simd_json::StaticNode::I64(n) => serde_json::Value::Number((*n).into()),
simd_json::StaticNode::U64(n) => serde_json::Value::Number((*n).into()),
simd_json::StaticNode::F64(f) => serde_json::Number::from_f64(*f)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null),
},
simd_json::BorrowedValue::String(s) => serde_json::Value::String(s.to_string()),
simd_json::BorrowedValue::Array(arr) => {
serde_json::Value::Array(arr.iter().map(convert_simd_to_serde).collect())
}
simd_json::BorrowedValue::Object(obj) => {
let map: serde_json::Map<String, serde_json::Value> = obj
.iter()
.map(|(k, v)| (k.to_string(), convert_simd_to_serde(v)))
.collect();
serde_json::Value::Object(map)
}
}
}
#[derive(Debug, Clone)]
pub enum FastJsonError {
EmptyInput,
ParseError(String),
SerializeError(String),
}
impl std::fmt::Display for FastJsonError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::EmptyInput => write!(f, "Empty input"),
Self::ParseError(e) => write!(f, "JSON parse error: {}", e),
Self::SerializeError(e) => write!(f, "JSON serialize error: {}", e),
}
}
}
impl std::error::Error for FastJsonError {}
pub struct ShardedCache<V: Clone + Send + Sync> {
shards: Vec<CacheShard<V>>,
shard_mask: usize,
stats: CacheStats,
}
struct CacheShard<V> {
data: RwLock<AHashMap<u64, CacheEntry<V>>>,
max_entries: usize,
}
struct CacheEntry<V> {
value: V,
created_at: Instant,
ttl: Duration,
access_count: AtomicU64,
}
impl<V> CacheEntry<V> {
fn is_expired(&self) -> bool {
self.created_at.elapsed() > self.ttl
}
}
#[derive(Debug, Default)]
pub struct CacheStats {
hits: AtomicU64,
misses: AtomicU64,
evictions: AtomicU64,
insertions: AtomicU64,
}
impl CacheStats {
pub fn hit_rate(&self) -> f64 {
let hits = self.hits.load(Ordering::Relaxed) as f64;
let misses = self.misses.load(Ordering::Relaxed) as f64;
let total = hits + misses;
if total == 0.0 {
0.0
} else {
hits / total
}
}
pub fn hits(&self) -> u64 {
self.hits.load(Ordering::Relaxed)
}
pub fn misses(&self) -> u64 {
self.misses.load(Ordering::Relaxed)
}
pub fn evictions(&self) -> u64 {
self.evictions.load(Ordering::Relaxed)
}
pub fn total_requests(&self) -> u64 {
self.hits() + self.misses()
}
}
impl<V: Clone + Send + Sync> ShardedCache<V> {
pub fn new(num_shards: usize, max_entries_per_shard: usize) -> Self {
let num_shards = num_shards.next_power_of_two();
let shard_mask = num_shards - 1;
let shards = (0..num_shards)
.map(|_| CacheShard {
data: RwLock::new(AHashMap::with_capacity(max_entries_per_shard)),
max_entries: max_entries_per_shard,
})
.collect();
Self {
shards,
shard_mask,
stats: CacheStats::default(),
}
}
#[inline(always)]
fn shard_index(&self, key_hash: u64) -> usize {
(key_hash as usize) & self.shard_mask
}
#[inline(always)]
fn hash_key(key: &str) -> u64 {
let mut hasher = ahash::AHasher::default();
key.hash(&mut hasher);
hasher.finish()
}
pub fn get(&self, key: &str) -> Option<V> {
let hash = Self::hash_key(key);
let shard_idx = self.shard_index(hash);
let shard = &self.shards[shard_idx];
let data = shard.data.read();
if let Some(entry) = data.get(&hash) {
if !entry.is_expired() {
entry.access_count.fetch_add(1, Ordering::Relaxed);
self.stats.hits.fetch_add(1, Ordering::Relaxed);
return Some(entry.value.clone());
}
}
self.stats.misses.fetch_add(1, Ordering::Relaxed);
None
}
pub fn insert(&self, key: &str, value: V, ttl: Duration) {
let hash = Self::hash_key(key);
let shard_idx = self.shard_index(hash);
let shard = &self.shards[shard_idx];
let mut data = shard.data.write();
if data.len() >= shard.max_entries {
self.evict_from_shard(&mut data);
}
data.insert(
hash,
CacheEntry {
value,
created_at: Instant::now(),
ttl,
access_count: AtomicU64::new(1),
},
);
self.stats.insertions.fetch_add(1, Ordering::Relaxed);
}
fn evict_from_shard(&self, data: &mut AHashMap<u64, CacheEntry<V>>) {
let mut to_remove: Vec<u64> = data
.iter()
.filter(|(_, entry)| entry.is_expired())
.map(|(k, _)| *k)
.collect();
if to_remove.len() < data.len() / 4 {
let mut entries: Vec<_> = data
.iter()
.map(|(k, v)| (*k, v.access_count.load(Ordering::Relaxed)))
.collect();
entries.sort_by_key(|(_, count)| *count);
let evict_count = entries.len() / 4;
to_remove.extend(entries.iter().take(evict_count).map(|(k, _)| *k));
}
for key in to_remove {
data.remove(&key);
self.stats.evictions.fetch_add(1, Ordering::Relaxed);
}
}
pub fn remove(&self, key: &str) -> bool {
let hash = Self::hash_key(key);
let shard_idx = self.shard_index(hash);
let shard = &self.shards[shard_idx];
let mut data = shard.data.write();
data.remove(&hash).is_some()
}
pub fn clear(&self) {
for shard in &self.shards {
let mut data = shard.data.write();
data.clear();
}
}
pub fn stats(&self) -> &CacheStats {
&self.stats
}
pub fn len(&self) -> usize {
self.shards.iter().map(|s| s.data.read().len()).sum()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
unsafe impl<V: Clone + Send + Sync> Send for ShardedCache<V> {}
unsafe impl<V: Clone + Send + Sync> Sync for ShardedCache<V> {}
pub struct ObjectPool<T: Send> {
pool: ArrayQueue<T>,
factory: Box<dyn Fn() -> T + Send + Sync>,
_max_size: usize,
}
impl<T: Send> ObjectPool<T> {
pub fn new<F>(max_size: usize, factory: F) -> Self
where
F: Fn() -> T + Send + Sync + 'static,
{
let pool = ArrayQueue::new(max_size);
let prepopulate = max_size / 4;
for _ in 0..prepopulate {
let _ = pool.push(factory());
}
Self {
pool,
factory: Box::new(factory),
_max_size: max_size,
}
}
pub fn get(&self) -> Option<T> {
self.pool.pop().or_else(|| Some((self.factory)()))
}
pub fn put(&self, item: T) {
let _ = self.pool.push(item);
}
pub fn size(&self) -> usize {
self.pool.len()
}
pub fn is_empty(&self) -> bool {
self.pool.is_empty()
}
}
pub struct BatchProcessor<Req: Clone + Send, Resp: Clone + Send> {
pending: RwLock<Vec<BatchItem<Req, Resp>>>,
config: BatchConfig,
last_flush: RwLock<Instant>,
}
struct BatchItem<Req, Resp> {
request: Req,
response_tx: tokio::sync::oneshot::Sender<Resp>,
}
#[derive(Debug, Clone)]
pub struct BatchConfig {
pub window_us: u64,
pub max_size: usize,
pub min_immediate: usize,
}
impl Default for BatchConfig {
fn default() -> Self {
Self {
window_us: 100,
max_size: 100,
min_immediate: 10,
}
}
}
impl<Req: Clone + Send + 'static, Resp: Clone + Send + 'static> BatchProcessor<Req, Resp> {
pub fn new(config: BatchConfig) -> Self {
Self {
pending: RwLock::new(Vec::with_capacity(config.max_size)),
config,
last_flush: RwLock::new(Instant::now()),
}
}
pub async fn submit(&self, request: Req) -> tokio::sync::oneshot::Receiver<Resp> {
let (tx, rx) = tokio::sync::oneshot::channel();
{
let mut pending = self.pending.write();
pending.push(BatchItem {
request,
response_tx: tx,
});
}
rx
}
pub fn should_flush(&self) -> bool {
let pending = self.pending.read();
let last_flush = self.last_flush.read();
pending.len() >= self.config.max_size
|| (pending.len() >= self.config.min_immediate
&& last_flush.elapsed() > Duration::from_micros(self.config.window_us))
}
pub fn flush(&self) -> Vec<(Req, tokio::sync::oneshot::Sender<Resp>)> {
let mut pending = self.pending.write();
let mut last_flush = self.last_flush.write();
*last_flush = Instant::now();
pending
.drain(..)
.map(|item| (item.request, item.response_tx))
.collect()
}
}
#[derive(Debug, Clone)]
pub struct OptimizedConnectionConfig {
pub connections: usize,
pub keep_alive_interval: Duration,
pub keep_alive_timeout: Duration,
pub adaptive_window: bool,
pub initial_connection_window: u32,
pub initial_stream_window: u32,
pub tcp_nodelay: bool,
}
impl Default for OptimizedConnectionConfig {
fn default() -> Self {
Self {
connections: 100,
keep_alive_interval: Duration::from_secs(10),
keep_alive_timeout: Duration::from_secs(3),
adaptive_window: true,
initial_connection_window: 65535 * 16, initial_stream_window: 65535 * 16, tcp_nodelay: true,
}
}
}
impl OptimizedConnectionConfig {
pub fn high_throughput() -> Self {
Self {
connections: 200,
keep_alive_interval: Duration::from_secs(30),
keep_alive_timeout: Duration::from_secs(10),
adaptive_window: true,
initial_connection_window: 65535 * 32, initial_stream_window: 65535 * 32, tcp_nodelay: true,
}
}
pub fn configure_endpoint(
&self,
endpoint: tonic::transport::Endpoint,
) -> tonic::transport::Endpoint {
endpoint
.keep_alive_timeout(self.keep_alive_timeout)
.keep_alive_while_idle(true)
.http2_keep_alive_interval(self.keep_alive_interval)
.http2_adaptive_window(self.adaptive_window)
.initial_connection_window_size(self.initial_connection_window)
.initial_stream_window_size(self.initial_stream_window)
.tcp_nodelay(self.tcp_nodelay)
}
}
#[derive(Debug, Default)]
pub struct PerfMetrics {
pub requests: AtomicU64,
pub total_time_ns: AtomicU64,
pub max_latency_ns: AtomicU64,
pub cache_hits: AtomicU64,
pub cache_misses: AtomicU64,
pub batches: AtomicU64,
pub batched_requests: AtomicU64,
}
impl PerfMetrics {
pub fn record(&self, latency_ns: u64, cached: bool) {
self.requests.fetch_add(1, Ordering::Relaxed);
self.total_time_ns.fetch_add(latency_ns, Ordering::Relaxed);
let mut current = self.max_latency_ns.load(Ordering::Relaxed);
while latency_ns > current {
match self.max_latency_ns.compare_exchange_weak(
current,
latency_ns,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(c) => current = c,
}
}
if cached {
self.cache_hits.fetch_add(1, Ordering::Relaxed);
} else {
self.cache_misses.fetch_add(1, Ordering::Relaxed);
}
}
pub fn record_batch(&self, batch_size: u64) {
self.batches.fetch_add(1, Ordering::Relaxed);
self.batched_requests
.fetch_add(batch_size, Ordering::Relaxed);
}
pub fn rps(&self, duration: Duration) -> f64 {
let requests = self.requests.load(Ordering::Relaxed) as f64;
requests / duration.as_secs_f64()
}
pub fn avg_latency_us(&self) -> f64 {
let requests = self.requests.load(Ordering::Relaxed) as f64;
let total_ns = self.total_time_ns.load(Ordering::Relaxed) as f64;
if requests > 0.0 {
(total_ns / requests) / 1000.0
} else {
0.0
}
}
pub fn cache_hit_rate(&self) -> f64 {
let hits = self.cache_hits.load(Ordering::Relaxed) as f64;
let misses = self.cache_misses.load(Ordering::Relaxed) as f64;
let total = hits + misses;
if total > 0.0 {
hits / total
} else {
0.0
}
}
pub fn reset(&self) {
self.requests.store(0, Ordering::Relaxed);
self.total_time_ns.store(0, Ordering::Relaxed);
self.max_latency_ns.store(0, Ordering::Relaxed);
self.cache_hits.store(0, Ordering::Relaxed);
self.cache_misses.store(0, Ordering::Relaxed);
self.batches.store(0, Ordering::Relaxed);
self.batched_requests.store(0, Ordering::Relaxed);
}
}
pub fn pin_to_core(core_id: usize) -> Result<(), String> {
#[cfg(target_os = "linux")]
{
let result = unsafe {
let mut cpuset: libc::cpu_set_t = std::mem::zeroed();
libc::CPU_SET(core_id, &mut cpuset);
libc::sched_setaffinity(0, std::mem::size_of::<libc::cpu_set_t>(), &cpuset)
};
if result != 0 {
return Err(format!("Failed to set CPU affinity: {}", result));
}
}
#[cfg(target_os = "macos")]
{
extern "C" {
fn mach_thread_self() -> u32;
fn thread_policy_set(
thread: u32,
flavor: u32,
policy_info: *const i32,
count: u32,
) -> i32;
}
const THREAD_AFFINITY_POLICY: u32 = 4;
const THREAD_AFFINITY_POLICY_COUNT: u32 = 1;
let affinity_tag: i32 = (core_id + 1) as i32;
let result = unsafe {
let thread = mach_thread_self();
thread_policy_set(
thread,
THREAD_AFFINITY_POLICY,
&affinity_tag as *const i32,
THREAD_AFFINITY_POLICY_COUNT,
)
};
if result != 0 {
return Err(format!(
"Failed to set macOS thread affinity tag for core {}: kern_return {}",
core_id, result
));
}
tracing::debug!(
"macOS thread affinity tag set to {} (core_id={})",
affinity_tag,
core_id
);
}
#[cfg(target_os = "windows")]
{
use windows_sys::Win32::System::Threading::{GetCurrentThread, SetThreadAffinityMask};
let mask = 1usize << core_id;
let thread = unsafe { GetCurrentThread() };
let result = unsafe { SetThreadAffinityMask(thread, mask) };
if result == 0 {
return Err(format!(
"Failed to set Windows thread affinity for core {}: SetThreadAffinityMask failed",
core_id
));
}
tracing::debug!(
"Windows thread affinity set to mask {:b} (core_id={})",
mask,
core_id
);
}
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
{
let _ = core_id;
tracing::debug!("CPU pinning not available on this platform");
}
Ok(())
}
pub fn recommended_workers() -> usize {
std::thread::available_parallelism()
.map(|p| p.get())
.unwrap_or(4)
}
pub struct ResponseTemplates {
pub empty_data: Bytes,
pub null_data: Bytes,
pub errors: AHashMap<String, Bytes>,
}
impl ResponseTemplates {
pub fn new() -> Self {
let empty_data = Bytes::from(r#"{"data":{}}"#);
let null_data = Bytes::from(r#"{"data":null}"#);
let mut errors = AHashMap::new();
errors.insert(
"UNAUTHORIZED".to_string(),
Bytes::from(
r#"{"errors":[{"message":"Unauthorized","extensions":{"code":"UNAUTHORIZED"}}]}"#,
),
);
errors.insert(
"NOT_FOUND".to_string(),
Bytes::from(
r#"{"errors":[{"message":"Not found","extensions":{"code":"NOT_FOUND"}}]}"#,
),
);
errors.insert(
"RATE_LIMITED".to_string(),
Bytes::from(r#"{"errors":[{"message":"Rate limit exceeded","extensions":{"code":"RATE_LIMITED"}}]}"#),
);
Self {
empty_data,
null_data,
errors,
}
}
}
impl Default for ResponseTemplates {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct TestPayload {
id: u64,
name: String,
values: Vec<i32>,
}
#[test]
fn test_fast_json_parser_roundtrip() {
let parser = FastJsonParser::new(10);
let payload = TestPayload {
id: 12345,
name: "test_parser".to_string(),
values: vec![1, 2, 3, 4, 5],
};
let bytes = parser.serialize(&payload).unwrap();
let json_val = parser.parse_bytes(&bytes).unwrap();
assert_eq!(json_val["id"], 12345);
assert_eq!(json_val["name"], "test_parser");
assert!(json_val["values"].is_array());
let values: Vec<i32> = serde_json::from_value(json_val["values"].clone()).unwrap();
assert_eq!(values, vec![1, 2, 3, 4, 5]);
}
#[test]
fn test_fast_json_parser_errors() {
let parser = FastJsonParser::new(10);
match parser.parse_bytes(&[]) {
Err(FastJsonError::EmptyInput) => {}
_ => panic!("Expected EmptyInput error"),
}
match parser.parse_str(r#"{"incomplete": "#) {
Err(FastJsonError::ParseError(_)) => {}
_ => panic!("Expected ParseError"),
}
}
#[test]
fn test_sharded_cache_basics() {
let cache: ShardedCache<String> = ShardedCache::new(16, 1000);
cache.insert("key1", "value1".to_string(), Duration::from_secs(60));
cache.insert("key2", "value2".to_string(), Duration::from_secs(60));
assert_eq!(cache.get("key1"), Some("value1".to_string()));
assert_eq!(cache.get("key2"), Some("value2".to_string()));
assert_eq!(cache.get("key3"), None);
assert!(cache.len() >= 2);
assert!(cache.remove("key1"));
assert_eq!(cache.get("key1"), None);
assert!(!cache.remove("key1"));
cache.clear();
assert!(cache.is_empty());
}
#[test]
fn test_sharded_cache_expiration() {
let cache: ShardedCache<String> = ShardedCache::new(4, 100);
cache.insert("short", "lived".to_string(), Duration::from_millis(10));
assert_eq!(cache.get("short"), Some("lived".to_string()));
std::thread::sleep(Duration::from_millis(20));
assert_eq!(cache.get("short"), None);
}
#[test]
fn test_sharded_cache_eviction_logic() {
let cache: ShardedCache<i32> = ShardedCache::new(1, 10);
for i in 0..10 {
cache.insert(&format!("key{}", i), i, Duration::from_secs(60));
}
for i in 0..5 {
let k = format!("key{}", i);
cache.get(&k);
cache.get(&k);
}
cache.insert("key10", 10, Duration::from_secs(60));
assert_eq!(cache.stats().evictions(), 2); }
#[test]
fn test_object_pool_lifecycle() {
let pool: ObjectPool<Vec<u8>> = ObjectPool::new(10, || Vec::with_capacity(1024));
let item1 = pool.get().unwrap();
let item2 = pool.get().unwrap();
assert_eq!(item1.capacity(), 1024);
assert_eq!(item2.capacity(), 1024);
pool.put(item1);
pool.put(item2);
}
#[test]
fn test_object_pool_exhaustion() {
let pool: ObjectPool<i32> = ObjectPool::new(2, || 42);
let i1 = pool.get().unwrap(); let i2 = pool.get().unwrap(); let i3 = pool.get().unwrap();
pool.put(i1);
pool.put(i2);
pool.put(i3);
assert!(pool.size() <= 2);
}
#[tokio::test]
async fn test_batch_processor() {
let config = BatchConfig {
window_us: 100000,
max_size: 5,
min_immediate: 2,
};
let processor = BatchProcessor::<i32, i32>::new(config);
let mut handlers = Vec::new();
for i in 0..5 {
handlers.push(processor.submit(i).await);
}
assert!(processor.should_flush());
let batch = processor.flush();
assert_eq!(batch.len(), 5);
for (req, tx) in batch {
let _ = tx.send(req * 2);
}
for (i, h) in handlers.into_iter().enumerate() {
let res = h.await.unwrap();
assert_eq!(res, (i as i32) * 2);
}
}
#[tokio::test]
async fn test_batch_processor_time_flush() {
let config = BatchConfig {
window_us: 1, max_size: 10,
min_immediate: 1,
};
let processor = BatchProcessor::<i32, i32>::new(config);
let _h = processor.submit(42).await;
tokio::time::sleep(Duration::from_millis(1)).await;
assert!(processor.should_flush());
let batch = processor.flush();
assert_eq!(batch.len(), 1);
}
#[test]
fn test_perf_metrics_recording() {
let metrics = PerfMetrics::default();
metrics.record(1000, true); metrics.record(2000, false); metrics.record(3000, true);
assert_eq!(metrics.requests.load(Ordering::Relaxed), 3);
assert_eq!(metrics.total_time_ns.load(Ordering::Relaxed), 6000);
assert_eq!(metrics.max_latency_ns.load(Ordering::Relaxed), 3000);
assert!(metrics.avg_latency_us() > 0.0);
assert!(metrics.cache_hit_rate() > 0.6);
metrics.reset();
assert_eq!(metrics.requests.load(Ordering::Relaxed), 0);
}
#[test]
fn test_high_perf_config_variants() {
let ultra = HighPerfConfig::ultra_fast();
assert_eq!(ultra.cache_shards, 128);
assert!(ultra.cpu_affinity);
let balanced = HighPerfConfig::balanced();
assert_eq!(balanced.cache_shards, 64);
assert!(!balanced.cpu_affinity);
let low_lat = HighPerfConfig::low_latency();
assert_eq!(low_lat.batch_window_us, 0); }
#[test]
fn test_response_templates() {
let templates = ResponseTemplates::new();
assert!(!templates.empty_data.is_empty());
assert!(!templates.null_data.is_empty());
assert!(templates.errors.contains_key("UNAUTHORIZED"));
assert!(templates.errors.contains_key("NOT_FOUND"));
}
}