// hashtree-cli 0.2.58
//
// Hashtree daemon and CLI - content-addressed storage with P2P sync
// Documentation
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// Error message returned by `acquire_blob_read` when waiting for a read
/// permit exceeds the queue timeout.
pub(super) const BLOB_READ_BUSY: &str = "blob read queue is full";
/// Read limit used when `MAX_CONCURRENT_BLOB_READS_ENV` is unset, not a valid
/// `usize`, or zero.
const DEFAULT_MAX_CONCURRENT_BLOB_READS: usize = 16;
/// Environment variable that overrides the number of concurrent blob reads.
const MAX_CONCURRENT_BLOB_READS_ENV: &str = "HTREE_MAX_CONCURRENT_BLOB_READS";
/// Fallback for `blob_read_timeout()`, in milliseconds, used when
/// `BLOB_READ_TIMEOUT_MS_ENV` is unset, not a valid `u64`, or zero.
const DEFAULT_BLOB_READ_TIMEOUT_MS: u64 = 5_000;
/// Environment variable that overrides the blob read timeout (milliseconds).
const BLOB_READ_TIMEOUT_MS_ENV: &str = "HTREE_BLOB_READ_TIMEOUT_MS";
/// Fallback for how long `acquire_blob_read` waits for a permit, in
/// milliseconds, used when `BLOB_READ_QUEUE_TIMEOUT_MS_ENV` is unset, not a
/// valid `u64`, or zero.
const DEFAULT_BLOB_READ_QUEUE_TIMEOUT_MS: u64 = 2_000;
/// Environment variable that overrides the read-queue wait timeout
/// (milliseconds).
const BLOB_READ_QUEUE_TIMEOUT_MS_ENV: &str = "HTREE_BLOB_READ_QUEUE_TIMEOUT_MS";
/// Write limit used when `MAX_CONCURRENT_BLOB_WRITES_ENV` is unset, not a
/// valid `usize`, or zero.
const DEFAULT_MAX_CONCURRENT_BLOB_WRITES: usize = 4;
/// Environment variable that overrides the number of concurrent blob writes.
const MAX_CONCURRENT_BLOB_WRITES_ENV: &str = "HTREE_MAX_CONCURRENT_BLOB_WRITES";

/// Point-in-time view of the blob read/write limiters and the read timeouts,
/// as produced by `blob_io_queue_snapshot`.
///
/// The permit counts are sampled without any locking across fields, so the
/// values describe a moment that may already have passed.
#[derive(Debug, Clone, Copy)]
pub(super) struct BlobIoQueueSnapshot {
    /// Configured maximum number of concurrent blob reads.
    pub read_limit: usize,
    /// Read permits currently free, clamped to `read_limit`.
    pub read_available: usize,
    /// `read_limit` minus `read_available` (saturating at zero).
    pub read_in_use: usize,
    /// Configured maximum number of concurrent blob writes.
    pub write_limit: usize,
    /// Write permits currently free, clamped to `write_limit`.
    pub write_available: usize,
    /// `write_limit` minus `write_available` (saturating at zero).
    pub write_in_use: usize,
    /// How long a read waits for a permit before giving up, in milliseconds.
    pub read_queue_timeout_ms: u64,
    /// Value of `blob_read_timeout()` in milliseconds.
    pub read_task_timeout_ms: u64,
}

/// Returns the process-wide semaphore that caps concurrent blob reads.
///
/// The semaphore is built on first use with `max_concurrent_blob_reads()`
/// permits; every later call returns the same instance.
fn blob_read_limiter() -> &'static Arc<Semaphore> {
    static READ_SEMAPHORE: OnceLock<Arc<Semaphore>> = OnceLock::new();
    READ_SEMAPHORE.get_or_init(|| {
        let permits = max_concurrent_blob_reads();
        Arc::new(Semaphore::new(permits))
    })
}

/// Returns the process-wide semaphore that caps concurrent blob writes.
///
/// The semaphore is built on first use with `max_concurrent_blob_writes()`
/// permits; every later call returns the same instance.
fn blob_write_limiter() -> &'static Arc<Semaphore> {
    static WRITE_SEMAPHORE: OnceLock<Arc<Semaphore>> = OnceLock::new();
    WRITE_SEMAPHORE.get_or_init(|| {
        let permits = max_concurrent_blob_writes();
        Arc::new(Semaphore::new(permits))
    })
}

/// Reads the concurrent blob read limit from the environment.
///
/// Falls back to `DEFAULT_MAX_CONCURRENT_BLOB_READS` when the variable is
/// unset, is not valid Unicode, does not parse as a `usize`, or is zero.
fn max_concurrent_blob_reads() -> usize {
    match std::env::var(MAX_CONCURRENT_BLOB_READS_ENV) {
        Ok(raw) => match raw.parse::<usize>() {
            // Zero permits would block every read forever, so reject it.
            Ok(limit) if limit > 0 => limit,
            _ => DEFAULT_MAX_CONCURRENT_BLOB_READS,
        },
        Err(_) => DEFAULT_MAX_CONCURRENT_BLOB_READS,
    }
}

/// Reads the concurrent blob write limit from the environment.
///
/// Falls back to `DEFAULT_MAX_CONCURRENT_BLOB_WRITES` when the variable is
/// unset, is not valid Unicode, does not parse as a `usize`, or is zero.
fn max_concurrent_blob_writes() -> usize {
    let Ok(raw) = std::env::var(MAX_CONCURRENT_BLOB_WRITES_ENV) else {
        return DEFAULT_MAX_CONCURRENT_BLOB_WRITES;
    };
    let Ok(limit) = raw.parse::<usize>() else {
        return DEFAULT_MAX_CONCURRENT_BLOB_WRITES;
    };
    // Zero permits would block every write forever, so treat it as unset.
    if limit == 0 {
        DEFAULT_MAX_CONCURRENT_BLOB_WRITES
    } else {
        limit
    }
}

pub(super) async fn acquire_blob_read() -> Result<OwnedSemaphorePermit, &'static str> {
    match tokio::time::timeout(
        blob_read_queue_timeout(),
        blob_read_limiter().clone().acquire_owned(),
    )
    .await
    {
        Ok(Ok(permit)) => Ok(permit),
        Ok(Err(_)) => Err("blob read queue is closed"),
        Err(_) => Err(BLOB_READ_BUSY),
    }
}

pub(super) async fn acquire_blob_write() -> Result<OwnedSemaphorePermit, &'static str> {
    blob_write_limiter()
        .clone()
        .acquire_owned()
        .await
        .map_err(|_| "blob write queue is closed")
}

/// Returns the blob read timeout configured through the environment.
///
/// The variable is interpreted as a number of milliseconds. When it is
/// unset, does not parse as a `u64`, or is zero,
/// `DEFAULT_BLOB_READ_TIMEOUT_MS` is used instead.
pub(super) fn blob_read_timeout() -> Duration {
    let configured_ms = match std::env::var(BLOB_READ_TIMEOUT_MS_ENV) {
        Ok(raw) => raw.parse::<u64>().ok().filter(|ms| *ms != 0),
        Err(_) => None,
    };
    Duration::from_millis(configured_ms.unwrap_or(DEFAULT_BLOB_READ_TIMEOUT_MS))
}

/// Returns how long a read may wait for a permit, as configured through the
/// environment.
///
/// The variable is interpreted as a number of milliseconds. When it is
/// unset, does not parse as a `u64`, or is zero,
/// `DEFAULT_BLOB_READ_QUEUE_TIMEOUT_MS` is used instead.
fn blob_read_queue_timeout() -> Duration {
    let parsed = std::env::var(BLOB_READ_QUEUE_TIMEOUT_MS_ENV).map(|raw| raw.parse::<u64>());
    let wait_ms = match parsed {
        Ok(Ok(ms)) if ms > 0 => ms,
        _ => DEFAULT_BLOB_READ_QUEUE_TIMEOUT_MS,
    };
    Duration::from_millis(wait_ms)
}

/// Captures the current state of the blob read/write limiters together with
/// the configured read timeouts.
///
/// Limits are re-read from the environment on every call, while the
/// semaphores keep the size they were created with; available counts are
/// therefore clamped to the freshly read limit so `*_in_use` never underflows.
pub(super) fn blob_io_queue_snapshot() -> BlobIoQueueSnapshot {
    let read_limit = max_concurrent_blob_reads();
    let read_available = std::cmp::min(blob_read_limiter().available_permits(), read_limit);
    let read_in_use = read_limit.saturating_sub(read_available);

    let write_limit = max_concurrent_blob_writes();
    let write_available = std::cmp::min(blob_write_limiter().available_permits(), write_limit);
    let write_in_use = write_limit.saturating_sub(write_available);

    let read_queue_timeout_ms = duration_millis_u64(blob_read_queue_timeout());
    let read_task_timeout_ms = duration_millis_u64(blob_read_timeout());

    BlobIoQueueSnapshot {
        read_limit,
        read_available,
        read_in_use,
        write_limit,
        write_available,
        write_in_use,
        read_queue_timeout_ms,
        read_task_timeout_ms,
    }
}

/// Converts `duration` to whole milliseconds, saturating at `u64::MAX` when
/// the value does not fit in a `u64`.
fn duration_millis_u64(duration: Duration) -> u64 {
    let total_ms: u128 = duration.as_millis();
    u64::try_from(total_ms).unwrap_or(u64::MAX)
}