use std::sync::{Arc, OnceLock};
use std::time::Duration;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
pub(super) const BLOB_READ_BUSY: &str = "blob read queue is full";
const DEFAULT_MAX_CONCURRENT_BLOB_READS: usize = 16;
const MAX_CONCURRENT_BLOB_READS_ENV: &str = "HTREE_MAX_CONCURRENT_BLOB_READS";
const DEFAULT_BLOB_READ_TIMEOUT_MS: u64 = 5_000;
const BLOB_READ_TIMEOUT_MS_ENV: &str = "HTREE_BLOB_READ_TIMEOUT_MS";
const DEFAULT_BLOB_READ_QUEUE_TIMEOUT_MS: u64 = 2_000;
const BLOB_READ_QUEUE_TIMEOUT_MS_ENV: &str = "HTREE_BLOB_READ_QUEUE_TIMEOUT_MS";
const DEFAULT_MAX_CONCURRENT_BLOB_WRITES: usize = 4;
const MAX_CONCURRENT_BLOB_WRITES_ENV: &str = "HTREE_MAX_CONCURRENT_BLOB_WRITES";
#[derive(Debug, Clone, Copy)]
pub(super) struct BlobIoQueueSnapshot {
pub read_limit: usize,
pub read_available: usize,
pub read_in_use: usize,
pub write_limit: usize,
pub write_available: usize,
pub write_in_use: usize,
pub read_queue_timeout_ms: u64,
pub read_task_timeout_ms: u64,
}
fn blob_read_limiter() -> &'static Arc<Semaphore> {
static LIMITER: OnceLock<Arc<Semaphore>> = OnceLock::new();
LIMITER.get_or_init(|| Arc::new(Semaphore::new(max_concurrent_blob_reads())))
}
fn blob_write_limiter() -> &'static Arc<Semaphore> {
static LIMITER: OnceLock<Arc<Semaphore>> = OnceLock::new();
LIMITER.get_or_init(|| Arc::new(Semaphore::new(max_concurrent_blob_writes())))
}
fn max_concurrent_blob_reads() -> usize {
std::env::var(MAX_CONCURRENT_BLOB_READS_ENV)
.ok()
.and_then(|value| value.parse::<usize>().ok())
.filter(|value| *value > 0)
.unwrap_or(DEFAULT_MAX_CONCURRENT_BLOB_READS)
}
fn max_concurrent_blob_writes() -> usize {
std::env::var(MAX_CONCURRENT_BLOB_WRITES_ENV)
.ok()
.and_then(|value| value.parse::<usize>().ok())
.filter(|value| *value > 0)
.unwrap_or(DEFAULT_MAX_CONCURRENT_BLOB_WRITES)
}
pub(super) async fn acquire_blob_read() -> Result<OwnedSemaphorePermit, &'static str> {
match tokio::time::timeout(
blob_read_queue_timeout(),
blob_read_limiter().clone().acquire_owned(),
)
.await
{
Ok(Ok(permit)) => Ok(permit),
Ok(Err(_)) => Err("blob read queue is closed"),
Err(_) => Err(BLOB_READ_BUSY),
}
}
pub(super) async fn acquire_blob_write() -> Result<OwnedSemaphorePermit, &'static str> {
blob_write_limiter()
.clone()
.acquire_owned()
.await
.map_err(|_| "blob write queue is closed")
}
pub(super) fn blob_read_timeout() -> Duration {
let millis = std::env::var(BLOB_READ_TIMEOUT_MS_ENV)
.ok()
.and_then(|value| value.parse::<u64>().ok())
.filter(|value| *value > 0)
.unwrap_or(DEFAULT_BLOB_READ_TIMEOUT_MS);
Duration::from_millis(millis)
}
fn blob_read_queue_timeout() -> Duration {
let millis = std::env::var(BLOB_READ_QUEUE_TIMEOUT_MS_ENV)
.ok()
.and_then(|value| value.parse::<u64>().ok())
.filter(|value| *value > 0)
.unwrap_or(DEFAULT_BLOB_READ_QUEUE_TIMEOUT_MS);
Duration::from_millis(millis)
}
pub(super) fn blob_io_queue_snapshot() -> BlobIoQueueSnapshot {
let read_limit = max_concurrent_blob_reads();
let read_available = blob_read_limiter().available_permits().min(read_limit);
let write_limit = max_concurrent_blob_writes();
let write_available = blob_write_limiter().available_permits().min(write_limit);
BlobIoQueueSnapshot {
read_limit,
read_available,
read_in_use: read_limit.saturating_sub(read_available),
write_limit,
write_available,
write_in_use: write_limit.saturating_sub(write_available),
read_queue_timeout_ms: duration_millis_u64(blob_read_queue_timeout()),
read_task_timeout_ms: duration_millis_u64(blob_read_timeout()),
}
}
fn duration_millis_u64(duration: Duration) -> u64 {
duration.as_millis().min(u128::from(u64::MAX)) as u64
}