use std::{marker::PhantomData, sync::Arc, time::Duration};
use futures_lite;
use parking_lot::Mutex;
use smol::Timer;
pub async fn with_timeout<T, F>(duration: Duration, future: F) -> Result<T, TimeoutError>
where
F: std::future::Future<Output = T>,
{
match futures_lite::future::or(
async {
Timer::after(duration).await;
Err::<T, TimeoutError>(TimeoutError::Timeout)
},
async { Ok(future.await) },
)
.await
{
Ok(result) => Ok(result),
Err(e) => Err(e),
}
}
#[derive(Debug, Clone)]
pub enum TimeoutError {
Timeout,
}
pub async fn retry_async<F, Fut, T, E>(operation: F) -> Result<T, E>
where
F: Fn() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
E: std::fmt::Debug,
{
retry_async_with_config(3, Duration::from_millis(100), operation).await
}
pub async fn retry_async_with_config<F, Fut, T, E>(
mut attempts: usize,
delay: Duration,
operation: F,
) -> Result<T, E>
where
F: Fn() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
E: std::fmt::Debug,
{
loop {
match operation().await {
Ok(result) => return Ok(result),
Err(e) => {
attempts -= 1;
if attempts == 0 {
return Err(e);
}
Timer::after(delay).await;
}
}
}
}
pub struct AsyncCircuitBreaker {
failures: Mutex<u32>,
threshold: u32,
timeout: Duration,
last_failure: Mutex<Option<std::time::Instant>>,
}
impl AsyncCircuitBreaker {
#[must_use]
pub fn new() -> Self {
Self::with_config(5, Duration::from_secs(60))
}
#[must_use]
pub fn with_config(threshold: u32, timeout: Duration) -> Self {
Self {
failures: Mutex::new(0),
threshold,
timeout,
last_failure: Mutex::new(None),
}
}
#[must_use]
pub fn builder() -> AsyncCircuitBreakerBuilder {
AsyncCircuitBreakerBuilder::new()
}
pub async fn execute<F, Fut, T, E>(&self, operation: F) -> Result<T, CircuitBreakerError<E>>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
{
if let Some(last_failure) = *self.last_failure.lock()
&& last_failure.elapsed() < self.timeout
{
return Err(CircuitBreakerError::CircuitOpen);
}
match operation().await {
Ok(result) => {
*self.failures.lock() = 0;
Ok(result)
}
Err(e) => {
let failures = {
let mut f = self.failures.lock();
*f += 1;
*f
};
if failures >= self.threshold {
*self.last_failure.lock() = Some(std::time::Instant::now());
}
Err(CircuitBreakerError::OperationError(e))
}
}
}
}
impl Default for AsyncCircuitBreaker {
fn default() -> Self {
Self::new()
}
}
pub struct AsyncCircuitBreakerBuilder {
threshold: u32,
timeout: Duration,
}
impl AsyncCircuitBreakerBuilder {
#[must_use]
pub fn new() -> Self {
Self {
threshold: 5,
timeout: Duration::from_secs(60),
}
}
#[must_use]
pub fn threshold(mut self, threshold: u32) -> Self {
self.threshold = threshold;
self
}
#[must_use]
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
#[must_use]
pub fn build(self) -> AsyncCircuitBreaker {
AsyncCircuitBreaker::with_config(self.threshold, self.timeout)
}
}
impl Default for AsyncCircuitBreakerBuilder {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub enum CircuitBreakerError<E> {
CircuitOpen,
OperationError(E),
}
pub async fn parallel_process_async<T, U, F>(data: Vec<T>, processor: F) -> Vec<U>
where
T: Send + 'static,
U: Send + 'static,
F: Fn(T) -> U + Send + Sync + 'static,
{
let processor = Arc::new(processor);
let mut tasks = Vec::new();
for item in data {
let processor = processor.clone();
let task = smol::spawn(async move { smol::unblock(move || processor(item)).await });
tasks.push(task);
}
let mut results = Vec::new();
for task in tasks {
results.push(task.await);
}
results
}
pub struct AsyncResourcePool<T> {
resources: Mutex<Vec<T>>,
factory: Box<dyn Fn() -> T + Send + Sync>,
max_size: usize,
}
impl<T> AsyncResourcePool<T> {
pub fn new<F>(factory: F) -> Self
where
F: Fn() -> T + Send + Sync + 'static,
{
Self::with_config(factory, 10)
}
pub fn with_config<F>(factory: F, max_size: usize) -> Self
where
F: Fn() -> T + Send + Sync + 'static,
{
Self {
resources: Mutex::new(Vec::new()),
factory: Box::new(factory),
max_size,
}
}
pub fn builder<F>(factory: F) -> AsyncResourcePoolBuilder<T, F>
where
F: Fn() -> T + Send + Sync + 'static,
{
AsyncResourcePoolBuilder::new(factory)
}
pub fn acquire(&self) -> ResourceGuard<'_, T> {
let resource = {
let mut resources = self.resources.lock();
resources.pop().unwrap_or_else(|| (self.factory)())
};
ResourceGuard {
resource: Some(resource),
pool: self,
}
}
fn release(&self, resource: T) {
let mut resources = self.resources.lock();
if resources.len() < self.max_size {
resources.push(resource);
}
}
}
pub struct AsyncResourcePoolBuilder<T, F> {
factory: F,
max_size: usize,
_phantom: PhantomData<T>,
}
impl<T, F> AsyncResourcePoolBuilder<T, F>
where
F: Fn() -> T + Send + Sync + 'static,
{
pub fn new(factory: F) -> Self {
Self {
factory,
max_size: 10,
_phantom: PhantomData,
}
}
#[must_use]
pub fn max_size(mut self, max_size: usize) -> Self {
self.max_size = max_size;
self
}
pub fn build(self) -> AsyncResourcePool<T> {
AsyncResourcePool::with_config(self.factory, self.max_size)
}
}
pub struct ResourceGuard<'a, T> {
resource: Option<T>,
pool: &'a AsyncResourcePool<T>,
}
impl<T> std::ops::Deref for ResourceGuard<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.resource
.as_ref()
.expect("resource must exist in guard")
}
}
impl<T> std::ops::DerefMut for ResourceGuard<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.resource
.as_mut()
.expect("resource must exist in guard")
}
}
impl<T> Drop for ResourceGuard<'_, T> {
fn drop(&mut self) {
if let Some(resource) = self.resource.take() {
self.pool.release(resource);
}
}
}
pub struct AsyncStreamProcessor<T, F> {
processor: F,
buffer: Mutex<Vec<T>>,
buffer_size: usize,
}
impl<T, F, Fut> AsyncStreamProcessor<T, F>
where
F: Fn(Vec<T>) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = ()> + Send + 'static,
T: Send + 'static,
{
pub fn new(processor: F) -> Self {
Self::with_config(processor, 100)
}
pub fn with_config(processor: F, buffer_size: usize) -> Self {
Self {
processor,
buffer: Mutex::new(Vec::new()),
buffer_size,
}
}
pub fn builder(processor: F) -> AsyncStreamProcessorBuilder<T, F> {
AsyncStreamProcessorBuilder::new(processor)
}
pub async fn push(&self, item: T) {
let should_process = {
let mut buffer = self.buffer.lock();
buffer.push(item);
buffer.len() >= self.buffer_size
};
if should_process {
self.process_batch().await;
}
}
pub async fn flush(&self) {
self.process_batch().await;
}
async fn process_batch(&self) {
let batch = {
let mut buffer = self.buffer.lock();
std::mem::take(&mut *buffer)
};
if !batch.is_empty() {
(self.processor)(batch).await;
}
}
}
pub struct AsyncStreamProcessorBuilder<T, F> {
processor: F,
buffer_size: usize,
_phantom: PhantomData<T>,
}
impl<T, F> AsyncStreamProcessorBuilder<T, F> {
pub fn new(processor: F) -> Self {
Self {
processor,
buffer_size: 100,
_phantom: PhantomData,
}
}
#[must_use]
pub fn buffer_size(mut self, size: usize) -> Self {
self.buffer_size = size;
self
}
pub fn build<Fut>(self) -> AsyncStreamProcessor<T, F>
where
F: Fn(Vec<T>) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = ()> + Send + 'static,
T: Send + 'static,
{
AsyncStreamProcessor::with_config(self.processor, self.buffer_size)
}
}
#[derive(Debug)]
pub struct AsyncPerformanceMonitor {
operations: Mutex<Vec<(String, Duration)>>,
}
impl AsyncPerformanceMonitor {
#[must_use]
pub fn new() -> Self {
Self {
operations: Mutex::new(Vec::new()),
}
}
pub async fn time_operation<F, Fut, T>(&self, name: &str, operation: F) -> T
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = T>,
{
let start = std::time::Instant::now();
let result = operation().await;
let duration = start.elapsed();
self.operations.lock().push((name.to_string(), duration));
result
}
pub fn stats(&self) -> Vec<(String, Duration)> {
self.operations.lock().clone()
}
pub fn clear(&self) -> &Self {
self.operations.lock().clear();
self
}
pub fn avg_duration(&self, name: &str) -> Option<Duration> {
let operations = self.operations.lock();
let matching: Vec<_> = operations
.iter()
.filter(|(op_name, _)| op_name == name)
.map(|(_, duration)| *duration)
.collect();
if matching.is_empty() {
None
} else {
let total: Duration = matching.iter().sum();
let len = u32::try_from(matching.len()).unwrap_or(1);
Some(total / len)
}
}
pub fn operation_count(&self) -> usize {
self.operations.lock().len()
}
pub fn operations_for(&self, name: &str) -> Vec<Duration> {
self.operations
.lock()
.iter()
.filter(|(op_name, _)| op_name == name)
.map(|(_, duration)| *duration)
.collect()
}
}
impl Default for AsyncPerformanceMonitor {
fn default() -> Self {
Self::new()
}
}
pub async fn traced_async_operation<F, Fut, T>(_name: &str, f: F) -> T
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = T>,
{
f().await
}