Skip to main content

lean_ctx/server/
bounded_lock.rs

1use std::sync::Arc;
2use std::time::Duration;
3use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
4
5const BASE_READ_TIMEOUT: Duration = Duration::from_secs(10);
6const BASE_WRITE_TIMEOUT: Duration = Duration::from_secs(10);
7
8/// Acquire a read lock with an adaptive timeout based on I/O health.
9/// Returns `None` on timeout (caller must provide graceful fallback).
10/// Records a freeze event for self-healing if the timeout is hit.
11///
12/// Callers are expected to already be in a blocking context (e.g. via
13/// `block_in_place` in the dispatch layer). This function uses
14/// `Handle::block_on` directly to avoid nested `block_in_place` calls
15/// that would consume additional blocking-pool threads.
16pub fn read<T: Send + Sync + 'static>(
17    lock: &Arc<RwLock<T>>,
18    context: &str,
19) -> Option<OwnedRwLockReadGuard<T>> {
20    let timeout = crate::core::io_health::adaptive_timeout(BASE_READ_TIMEOUT);
21    let lock_clone = lock.clone();
22    let rt = tokio::runtime::Handle::current();
23    let result = rt.block_on(tokio::time::timeout(timeout, lock_clone.read_owned()));
24    if let Ok(guard) = result {
25        Some(guard)
26    } else {
27        crate::core::io_health::record_freeze();
28        tracing::warn!(
29            "bounded_lock: read timeout ({}ms) for {context}; degrading gracefully",
30            timeout.as_millis()
31        );
32        None
33    }
34}
35
36/// Acquire a write lock with an adaptive timeout based on I/O health.
37/// Returns `None` on timeout (caller must provide graceful fallback).
38/// Records a freeze event for self-healing if the timeout is hit.
39///
40/// See `read()` for design rationale.
41pub fn write<T: Send + Sync + 'static>(
42    lock: &Arc<RwLock<T>>,
43    context: &str,
44) -> Option<OwnedRwLockWriteGuard<T>> {
45    let timeout = crate::core::io_health::adaptive_timeout(BASE_WRITE_TIMEOUT);
46    let lock_clone = lock.clone();
47    let rt = tokio::runtime::Handle::current();
48    let result = rt.block_on(tokio::time::timeout(timeout, lock_clone.write_owned()));
49    if let Ok(guard) = result {
50        Some(guard)
51    } else {
52        crate::core::io_health::record_freeze();
53        tracing::warn!(
54            "bounded_lock: write timeout ({}ms) for {context}; degrading gracefully",
55            timeout.as_millis()
56        );
57        None
58    }
59}