use serde::Serialize;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use std::time::{Duration, Instant};
const BUSY_TIMEOUT_MS: u64 = 5000;
pub(crate) static SQLITE_BUSY_COUNT: AtomicU64 = AtomicU64::new(0);
thread_local! {
static BUSY_SESSION_START: std::cell::RefCell<Option<Instant>> = std::cell::RefCell::new(None);
}
#[derive(Debug, Default)]
pub struct StoreMetricsInner {
pub store_wait_us: AtomicU64,
pub store_write_us: AtomicU64,
pub last_txn_batch_size: AtomicU64,
pub max_txn_batch_size: AtomicU64,
}
impl StoreMetricsInner {
pub fn add_wait_us(&self, us: u64) {
self.store_wait_us.fetch_add(us, Ordering::Relaxed);
}
pub fn add_write_us(&self, us: u64) {
self.store_write_us.fetch_add(us, Ordering::Relaxed);
}
pub fn record_batch_size(&self, n: usize) {
let n = n as u64;
self.last_txn_batch_size.store(n, Ordering::Relaxed);
self.max_txn_batch_size.fetch_max(n, Ordering::Relaxed);
}
pub fn snapshot(&self, reset: bool) -> StoreMetricsSnapshot {
let sqlite_busy_count = SQLITE_BUSY_COUNT.load(Ordering::Relaxed);
let store_wait_us = self.store_wait_us.load(Ordering::Relaxed);
let store_write_us = self.store_write_us.load(Ordering::Relaxed);
let _last_txn_batch_size = self.last_txn_batch_size.load(Ordering::Relaxed);
let max_txn_batch_size = self.max_txn_batch_size.load(Ordering::Relaxed);
if reset {
SQLITE_BUSY_COUNT.store(0, Ordering::Relaxed);
self.store_wait_us.store(0, Ordering::Relaxed);
self.store_write_us.store(0, Ordering::Relaxed);
self.last_txn_batch_size.store(0, Ordering::Relaxed);
self.max_txn_batch_size.store(0, Ordering::Relaxed);
}
StoreMetricsSnapshot {
sqlite_busy_count,
store_wait_ms: store_wait_us / 1000,
store_write_ms: store_write_us / 1000,
store_wait_us: Some(store_wait_us),
store_write_us: Some(store_write_us),
txn_batch_size: if max_txn_batch_size > 0 {
Some(max_txn_batch_size)
} else {
None
},
effective_pragmas: None,
wal_checkpoint: None,
busy_handler: Some("counting_timeout".to_string()),
busy_timeout_configured_ms: Some(BUSY_TIMEOUT_MS),
store_wait_pct: None,
store_write_pct: None,
}
}
}
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct EffectivePragmas {
pub journal_mode: String,
pub synchronous: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub synchronous_human: Option<String>,
pub busy_timeout: i64,
pub wal_autocheckpoint: i64,
}
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct WalCheckpointResult {
pub blocked: i32,
pub log_frames: i32,
pub checkpointed_frames: i32,
}
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct StoreMetricsSnapshot {
pub sqlite_busy_count: u64,
pub store_wait_ms: u64,
pub store_write_ms: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub store_wait_us: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub store_write_us: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub txn_batch_size: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub effective_pragmas: Option<EffectivePragmas>,
#[serde(skip_serializing_if = "Option::is_none")]
pub wal_checkpoint: Option<WalCheckpointResult>,
#[serde(skip_serializing_if = "Option::is_none")]
pub busy_handler: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub busy_timeout_configured_ms: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub store_wait_pct: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub store_write_pct: Option<f64>,
}
pub fn reset_busy_count() {
SQLITE_BUSY_COUNT.store(0, Ordering::Relaxed);
}
pub fn busy_handler(retries: i32) -> bool {
SQLITE_BUSY_COUNT.fetch_add(1, Ordering::Relaxed);
BUSY_SESSION_START.with(|cell| {
let mut start = cell.borrow_mut();
if retries == 0 {
*start = Some(Instant::now());
}
let elapsed_ms = start
.as_ref()
.map(|s| s.elapsed().as_millis() as u64)
.unwrap_or(0);
if elapsed_ms >= BUSY_TIMEOUT_MS {
*start = None;
return false;
}
let delay_ms = (1u64 << retries.min(10)).min(50);
thread::sleep(Duration::from_millis(delay_ms));
true
})
}