use std::collections::HashMap;
use fsqlite_types::sync_primitives::Mutex;
use fsqlite_types::{RowId, RowIdMode, SchemaEpoch, TableId};
use tracing::{debug, error, info, warn};
use crate::coordinator_ipc::{RowidReservePayload, RowidReserveResponse};
/// Default number of rowids granted per coordinator round trip.
pub const DEFAULT_RANGE_SIZE: u32 = 64;
/// SQLite result code `SQLITE_FULL` (13): rowid space exhausted.
pub const SQLITE_FULL: u32 = 13;
/// SQLite result code `SQLITE_SCHEMA` (17): schema epoch changed under the caller.
pub const SQLITE_SCHEMA: u32 = 17;
/// Identifies one table's allocator state. Allocation is scoped to a
/// (schema epoch, table) pair, so a schema change naturally isolates
/// old allocator state under the stale epoch.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct AllocatorKey {
    /// Schema epoch under which `table_id` was resolved.
    pub schema_epoch: SchemaEpoch,
    /// Table being allocated for.
    pub table_id: TableId,
}
/// Per-table allocation cursor; lives inside `ConcurrentRowIdAllocator`'s
/// `tables` mutex, so it needs no synchronization of its own.
#[derive(Debug, Clone)]
struct TableAllocatorState {
    // Next rowid to hand out. May wrap negative after RowId::MAX is
    // consumed; reserve_range treats a cursor < 1 as exhaustion.
    next_rowid: i64,
    // Normal vs AUTOINCREMENT allocation semantics for this table.
    mode: RowIdMode,
    // Monotone max of all rowids ever reserved (AUTOINCREMENT only;
    // stays 0 for Normal tables).
    autoincrement_high_water: i64,
}
impl TableAllocatorState {
    /// Seed per-table state from the durable tip.
    ///
    /// The next rowid is one past the highest value known to be used:
    /// for `Normal` tables that is the max committed rowid; for
    /// `AutoIncrement` tables the `sqlite_sequence` value is folded in
    /// as well, so rowids are never reissued after deletes. The result
    /// is clamped to at least 1.
    fn new(max_committed_rowid: Option<RowId>, sqlite_sequence_seq: i64, mode: RowIdMode) -> Self {
        let max_committed = match max_committed_rowid {
            Some(rowid) => rowid.get(),
            None => 0,
        };
        // Highest rowid value that must never be handed out again.
        let floor = match mode {
            RowIdMode::Normal => max_committed,
            RowIdMode::AutoIncrement => max_committed.max(sqlite_sequence_seq),
        };
        let next = floor.saturating_add(1).max(1);
        info!(
            max_committed_rowid = max_committed,
            sqlite_sequence_seq,
            next_rowid = next,
            source = "durable_tip",
            "allocator init from durable tip"
        );
        let autoincrement_high_water = match mode {
            RowIdMode::AutoIncrement => sqlite_sequence_seq,
            RowIdMode::Normal => 0,
        };
        Self {
            next_rowid: next,
            mode,
            autoincrement_high_water,
        }
    }
}
/// Errors produced by rowid reservation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RowIdAllocError {
    /// No rowids remain below `RowId::MAX` (maps to `SQLITE_FULL`).
    Exhausted,
    /// Caller's schema epoch does not match the allocator's current
    /// epoch (maps to `SQLITE_SCHEMA`).
    SchemaMismatch {
        requested: SchemaEpoch,
        current: SchemaEpoch,
    },
    /// No `init_table` call has seeded state for this key.
    NotInitialized(AllocatorKey),
}
impl std::fmt::Display for RowIdAllocError {
    /// Human-readable message; the SQLite code the variant maps to is
    /// included in parentheses.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Exhausted => write!(f, "rowid space exhausted (SQLITE_FULL)"),
            Self::SchemaMismatch { requested, current } => write!(
                f,
                "schema epoch mismatch: requested {}, current {} (SQLITE_SCHEMA)",
                requested.get(),
                current.get()
            ),
            Self::NotInitialized(key) => write!(
                f,
                "table allocator not initialized: epoch={}, table={}",
                key.schema_epoch.get(),
                key.table_id.get()
            ),
        }
    }
}

// Marker impl: no source/backtrace beyond `Display`, but lets the error
// flow through `Box<dyn Error>` and `?` conversions.
impl std::error::Error for RowIdAllocError {}
/// A contiguous run of rowids granted exclusively to one caller.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RangeReservation {
    /// First rowid in the reserved range.
    pub start_rowid: RowId,
    /// Number of consecutive rowids reserved (may be 0).
    pub count: u32,
}
impl RangeReservation {
    /// Last rowid covered by this reservation, i.e. `start + count - 1`.
    ///
    /// NOTE(review): for an empty reservation (`count == 0`) this
    /// evaluates to `start - 1`; callers should not ask for the end of
    /// an empty range.
    #[must_use]
    pub fn end_rowid_inclusive(&self) -> RowId {
        let span = i64::from(self.count);
        RowId::new(self.start_rowid.get() + span - 1)
    }
}
/// Connection-local cache over a `RangeReservation`: rowids are handed
/// out without contacting the coordinator until the range is spent.
#[derive(Debug, Clone)]
pub struct LocalRowIdCache {
    // Next rowid to hand out locally.
    next: i64,
    // How many rowids remain in the cached range.
    remaining: u32,
    // The (epoch, table) the cached range was reserved for.
    key: AllocatorKey,
}
impl LocalRowIdCache {
    /// Wrap a coordinator-granted reservation for local handout.
    #[must_use]
    pub fn new(reservation: RangeReservation, key: AllocatorKey) -> Self {
        Self {
            next: reservation.start_rowid.get(),
            remaining: reservation.count,
            key,
        }
    }

    /// Hand out the next rowid from the cached range, or `None` once the
    /// reservation is used up (the caller must then reserve a new range).
    pub fn allocate(&mut self) -> Option<RowId> {
        if self.remaining == 0 {
            return None;
        }
        let rowid = RowId::new(self.next);
        self.next = self.next.wrapping_add(1);
        self.remaining -= 1;
        Some(rowid)
    }

    /// Number of rowids still available in the cached range.
    #[must_use]
    pub const fn remaining(&self) -> u32 {
        self.remaining
    }

    /// The (schema epoch, table) this cache was reserved for.
    #[must_use]
    pub const fn key(&self) -> AllocatorKey {
        self.key
    }
}
/// Process-wide rowid allocator shared by concurrent writers.
///
/// Both fields sit behind mutexes; `tables` holds one allocation cursor
/// per (schema epoch, table) pair, seeded via `init_table`.
#[derive(Debug)]
pub struct ConcurrentRowIdAllocator {
    current_epoch: Mutex<SchemaEpoch>,
    tables: Mutex<HashMap<AllocatorKey, TableAllocatorState>>,
}
impl ConcurrentRowIdAllocator {
    /// Create an allocator pinned to `current_epoch` with no per-table
    /// state; tables must be seeded with `init_table` before any
    /// reservation can succeed.
    #[must_use]
    pub fn new(current_epoch: SchemaEpoch) -> Self {
        Self {
            current_epoch: Mutex::new(current_epoch),
            tables: Mutex::new(HashMap::new()),
        }
    }

    /// Seed (or overwrite) allocator state for `key` from the durable
    /// tip: the max committed rowid plus, for AUTOINCREMENT tables, the
    /// `sqlite_sequence` value.
    pub fn init_table(
        &self,
        key: AllocatorKey,
        max_committed_rowid: Option<RowId>,
        sqlite_sequence_seq: i64,
        mode: RowIdMode,
    ) {
        let state = TableAllocatorState::new(max_committed_rowid, sqlite_sequence_seq, mode);
        self.tables.lock().insert(key, state);
    }

    /// Update the epoch that `handle_rowid_reserve` validates against.
    pub fn set_current_epoch(&self, epoch: SchemaEpoch) {
        *self.current_epoch.lock() = epoch;
    }

    /// Current schema epoch.
    #[must_use]
    pub fn current_epoch(&self) -> SchemaEpoch {
        *self.current_epoch.lock()
    }

    /// Reserve `count` consecutive rowids for `key`.
    ///
    /// # Errors
    /// - `NotInitialized` if `init_table` was never called for `key`.
    /// - `Exhausted` if fewer than `count` rowids remain below `RowId::MAX`.
    pub fn reserve_range(
        &self,
        key: AllocatorKey,
        count: u32,
    ) -> Result<RangeReservation, RowIdAllocError> {
        if count == 0 {
            // Empty reservations short-circuit before any state lookup,
            // so they never fail; the start value is just a placeholder.
            return Ok(RangeReservation {
                start_rowid: RowId::new(1),
                count: 0,
            });
        }
        let mut tables = self.tables.lock();
        let state = tables
            .get_mut(&key)
            .ok_or(RowIdAllocError::NotInitialized(key))?;
        let start = state.next_rowid;
        if start < 1 {
            // A previous reservation wrapped the cursor past RowId::MAX
            // (see the wrapping_add below): the rowid space is spent.
            error!(
                attempted_next = start,
                max_rowid = RowId::MAX.get(),
                "MAX_ROWID saturation: SQLITE_FULL"
            );
            return Err(RowIdAllocError::Exhausted);
        }
        let count_i64 = i64::from(count);
        // start >= 1 here, so this subtraction cannot overflow.
        let remaining_capacity = RowId::MAX.get() - start + 1;
        if count_i64 > remaining_capacity {
            error!(
                attempted_next = start,
                count,
                remaining_capacity,
                max_rowid = RowId::MAX.get(),
                "MAX_ROWID saturation: SQLITE_FULL"
            );
            return Err(RowIdAllocError::Exhausted);
        }
        // wrapping_add: when the granted range ends exactly at RowId::MAX
        // the cursor may overflow i64; the negative result is caught by
        // the `start < 1` guard on the next call instead of panicking.
        state.next_rowid = start.wrapping_add(count_i64);
        if state.mode == RowIdMode::AutoIncrement {
            // Track the monotone max ever *reserved* (not just committed)
            // so the AUTOINCREMENT sequence never goes backwards, even
            // across aborted transactions.
            let last_in_range = start + (count_i64 - 1);
            state.autoincrement_high_water = state.autoincrement_high_water.max(last_in_range);
        }
        let next_after = state.next_rowid;
        drop(tables); // release the lock before tracing
        debug!(
            schema_epoch = key.schema_epoch.get(),
            table_id = key.table_id.get(),
            start_rowid = start,
            count,
            next_rowid_after = next_after,
            "range reservation"
        );
        Ok(RangeReservation {
            start_rowid: RowId::new(start),
            count,
        })
    }

    /// Reserve a single rowid (a count-1 range).
    pub fn allocate_one(&self, key: AllocatorKey) -> Result<RowId, RowIdAllocError> {
        self.reserve_range(key, 1).map(|r| r.start_rowid)
    }

    /// Record an explicitly supplied rowid, advancing the cursor past it
    /// so later automatic allocations cannot collide with it. The cursor
    /// never moves backwards (bumping below it is a no-op for `next_rowid`).
    pub fn bump_explicit(
        &self,
        key: AllocatorKey,
        explicit_rowid: RowId,
    ) -> Result<(), RowIdAllocError> {
        let mut tables = self.tables.lock();
        let state = tables
            .get_mut(&key)
            .ok_or(RowIdAllocError::NotInitialized(key))?;
        let r = explicit_rowid.get();
        let before = state.next_rowid;
        if state.next_rowid < 1 {
            // Cursor already wrapped past RowId::MAX: space is spent.
            return Err(RowIdAllocError::Exhausted);
        }
        if r >= state.next_rowid {
            // wrapping_add: bumping to RowId::MAX leaves a wrapped
            // (negative) cursor, which later calls report as Exhausted.
            state.next_rowid = r.wrapping_add(1);
        }
        if state.mode == RowIdMode::AutoIncrement {
            state.autoincrement_high_water = state.autoincrement_high_water.max(r);
        }
        let next_after = state.next_rowid;
        drop(tables); // release the lock before tracing
        info!(
            explicit_rowid = r,
            allocator_next_before = before,
            allocator_next_after = next_after,
            "bump-on-explicit-rowid"
        );
        Ok(())
    }

    /// Monotone AUTOINCREMENT high-water mark for `key`, if initialized.
    /// Stays 0 for `Normal`-mode tables.
    #[must_use]
    pub fn autoincrement_high_water(&self, key: &AllocatorKey) -> Option<i64> {
        self.tables
            .lock()
            .get(key)
            .map(|s| s.autoincrement_high_water)
    }

    /// Serve a ROWID_RESERVE IPC request: validate the schema epoch, then
    /// reserve the requested range, translating errors to SQLite codes.
    pub fn handle_rowid_reserve(&self, payload: &RowidReservePayload) -> RowidReserveResponse {
        let current_epoch = self.current_epoch();
        let requested_epoch = SchemaEpoch::new(payload.schema_epoch);
        if requested_epoch != current_epoch {
            warn!(
                requested_epoch = requested_epoch.get(),
                current_epoch = current_epoch.get(),
                "schema epoch mismatch on ROWID_RESERVE"
            );
            return RowidReserveResponse::Err {
                code: SQLITE_SCHEMA,
            };
        }
        let key = AllocatorKey {
            schema_epoch: requested_epoch,
            table_id: TableId::new(payload.table_id),
        };
        match self.reserve_range(key, payload.count) {
            Ok(reservation) => {
                // start >= 1 here (reserve_range rejects wrapped cursors),
                // so the sign-losing cast cannot produce a bogus value.
                #[allow(clippy::cast_sign_loss)]
                let start = reservation.start_rowid.get() as u64;
                RowidReserveResponse::Ok {
                    start_rowid: start,
                    count: reservation.count,
                }
            }
            // NOTE(review): NotInitialized is collapsed into SQLITE_FULL
            // on the wire — confirm clients don't need to distinguish it.
            Err(RowIdAllocError::Exhausted | RowIdAllocError::NotInitialized(_)) => {
                RowidReserveResponse::Err { code: SQLITE_FULL }
            }
            Err(RowIdAllocError::SchemaMismatch { .. }) => RowidReserveResponse::Err {
                code: SQLITE_SCHEMA,
            },
        }
    }

    /// Whether `init_table` has been called for `key`.
    #[must_use]
    pub fn is_initialized(&self, key: &AllocatorKey) -> bool {
        self.tables.lock().contains_key(key)
    }

    /// Raw cursor value for `key` (diagnostic/test accessor).
    #[must_use]
    pub fn next_rowid(&self, key: &AllocatorKey) -> Option<i64> {
        self.tables.lock().get(key).map(|s| s.next_rowid)
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use super::*;

    // --- small constructors for test keys ---
    fn epoch(n: u64) -> SchemaEpoch {
        SchemaEpoch::new(n)
    }
    fn table(n: u32) -> TableId {
        TableId::new(n)
    }
    fn key(e: u64, t: u32) -> AllocatorKey {
        AllocatorKey {
            schema_epoch: epoch(e),
            table_id: table(t),
        }
    }

    // Allocation continues just past the durable tip and stays monotone.
    #[test]
    fn test_rowid_allocator_basic() {
        let alloc = ConcurrentRowIdAllocator::new(epoch(1));
        let k = key(1, 42);
        alloc.init_table(k, Some(RowId::new(5)), 0, RowIdMode::Normal);
        let mut ids = Vec::new();
        for _ in 0..10 {
            ids.push(alloc.allocate_one(k).unwrap());
        }
        assert_eq!(ids[0].get(), 6);
        for window in ids.windows(2) {
            assert!(window[1].get() > window[0].get(), "must be monotone");
        }
        assert_eq!(ids[9].get(), 15);
    }

    // Two threads hammering the same table must never see a duplicate.
    #[test]
    fn test_rowid_allocator_concurrent_uniqueness() {
        use std::sync::Arc;
        let alloc = Arc::new(ConcurrentRowIdAllocator::new(epoch(1)));
        let k = key(1, 100);
        alloc.init_table(k, None, 0, RowIdMode::Normal);
        let mut handles = Vec::new();
        for _ in 0..2 {
            let alloc = Arc::clone(&alloc);
            handles.push(std::thread::spawn(move || {
                let mut ids = Vec::with_capacity(100);
                for _ in 0..100 {
                    ids.push(alloc.allocate_one(k).unwrap());
                }
                ids
            }));
        }
        let mut all_ids = HashSet::new();
        for h in handles {
            for id in h.join().unwrap() {
                assert!(all_ids.insert(id.get()), "duplicate rowid {}", id.get());
            }
        }
        assert_eq!(all_ids.len(), 200);
    }

    // Rowids from aborted transactions are burned, not recycled.
    #[test]
    fn test_rowid_allocator_gap_on_abort() {
        let alloc = ConcurrentRowIdAllocator::new(epoch(1));
        let k = key(1, 1);
        alloc.init_table(k, None, 0, RowIdMode::Normal);
        let first_batch: Vec<_> = (0..5).map(|_| alloc.allocate_one(k).unwrap()).collect();
        assert_eq!(first_batch[0].get(), 1);
        assert_eq!(first_batch[4].get(), 5);
        let next = alloc.allocate_one(k).unwrap();
        assert_eq!(next.get(), 6, "must not reuse aborted rowids");
    }

    // An explicit rowid pushes the cursor past it.
    #[test]
    fn test_rowid_bump_on_explicit() {
        let alloc = ConcurrentRowIdAllocator::new(epoch(1));
        let k = key(1, 1);
        alloc.init_table(k, None, 0, RowIdMode::Normal);
        let _ = alloc.allocate_one(k).unwrap();
        let _ = alloc.allocate_one(k).unwrap();
        alloc.bump_explicit(k, RowId::new(1000)).unwrap();
        let next = alloc.allocate_one(k).unwrap();
        assert!(next.get() >= 1001, "got {}", next.get());
    }

    // AUTOINCREMENT init takes max(max committed, sqlite_sequence) + 1.
    #[test]
    fn test_rowid_autoincrement_init() {
        let alloc = ConcurrentRowIdAllocator::new(epoch(1));
        let k = key(1, 1);
        alloc.init_table(k, Some(RowId::new(400)), 500, RowIdMode::AutoIncrement);
        let r = alloc.allocate_one(k).unwrap();
        assert_eq!(r.get(), 501);
    }

    // High-water mark tracks reserved rowids.
    #[test]
    fn test_rowid_autoincrement_persist() {
        let alloc = ConcurrentRowIdAllocator::new(epoch(1));
        let k = key(1, 1);
        alloc.init_table(k, None, 0, RowIdMode::AutoIncrement);
        for _ in 0..5 {
            let _ = alloc.allocate_one(k).unwrap();
        }
        let hw = alloc.autoincrement_high_water(&k).unwrap();
        assert!(hw >= 5, "high_water={hw}, expected >= 5");
    }

    // High-water mark is the monotone max across multiple reservations.
    #[test]
    fn test_rowid_autoincrement_monotone_max_merge() {
        let alloc = ConcurrentRowIdAllocator::new(epoch(1));
        let k = key(1, 1);
        alloc.init_table(k, None, 0, RowIdMode::AutoIncrement);
        let range_a = alloc.reserve_range(k, 10).unwrap();
        assert_eq!(range_a.start_rowid.get(), 1);
        let range_b = alloc.reserve_range(k, 20).unwrap();
        assert_eq!(range_b.start_rowid.get(), 11);
        let hw = alloc.autoincrement_high_water(&k).unwrap();
        assert_eq!(hw, 30, "monotone max must reflect both txns");
    }

    // Range reservation + LocalRowIdCache drain, then a follow-up range.
    #[test]
    fn test_rowid_range_reservation() {
        let alloc = ConcurrentRowIdAllocator::new(epoch(1));
        let k = key(1, 1);
        alloc.init_table(k, Some(RowId::new(100)), 0, RowIdMode::Normal);
        let range = alloc.reserve_range(k, 64).unwrap();
        assert_eq!(range.start_rowid.get(), 101);
        assert_eq!(range.count, 64);
        assert_eq!(range.end_rowid_inclusive().get(), 164);
        let mut cache = LocalRowIdCache::new(range, k);
        assert_eq!(cache.remaining(), 64);
        let first = cache.allocate().unwrap();
        assert_eq!(first.get(), 101);
        assert_eq!(cache.remaining(), 63);
        for _ in 0..63 {
            assert!(cache.allocate().is_some());
        }
        assert_eq!(cache.remaining(), 0);
        assert!(cache.allocate().is_none(), "cache must be exhausted");
        let range2 = alloc.reserve_range(k, 32).unwrap();
        assert_eq!(range2.start_rowid.get(), 165);
    }

    // Exhaustion at RowId::MAX: exact-fit ranges succeed, overshoot fails.
    #[test]
    fn test_rowid_max_saturation() {
        let alloc = ConcurrentRowIdAllocator::new(epoch(1));
        let k = key(1, 1);
        alloc.init_table(k, Some(RowId::new(i64::MAX - 5)), 0, RowIdMode::Normal);
        for i in 0..5 {
            let r = alloc.allocate_one(k);
            assert!(r.is_ok(), "allocation {i} should succeed");
        }
        let r = alloc.allocate_one(k);
        assert_eq!(r, Err(RowIdAllocError::Exhausted));
        alloc.init_table(k, Some(RowId::new(i64::MAX - 10)), 0, RowIdMode::Normal);
        let r = alloc.reserve_range(k, 20);
        assert_eq!(r, Err(RowIdAllocError::Exhausted));
        let r = alloc.reserve_range(k, 10);
        assert!(r.is_ok());
    }

    // IPC path: stale epoch -> SQLITE_SCHEMA; matching epoch -> Ok.
    #[test]
    fn test_rowid_schema_epoch_mismatch() {
        let alloc = ConcurrentRowIdAllocator::new(epoch(5));
        let k = key(5, 1);
        alloc.init_table(k, None, 0, RowIdMode::Normal);
        let payload = RowidReservePayload {
            txn: crate::coordinator_ipc::WireTxnToken {
                txn_id: 1,
                txn_epoch: 1,
            },
            schema_epoch: 3,
            table_id: 1,
            count: 10,
        };
        let resp = alloc.handle_rowid_reserve(&payload);
        assert_eq!(
            resp,
            RowidReserveResponse::Err {
                code: SQLITE_SCHEMA
            }
        );
        let payload_ok = RowidReservePayload {
            txn: crate::coordinator_ipc::WireTxnToken {
                txn_id: 1,
                txn_epoch: 1,
            },
            schema_epoch: 5,
            table_id: 1,
            count: 10,
        };
        let resp_ok = alloc.handle_rowid_reserve(&payload_ok);
        assert!(matches!(resp_ok, RowidReserveResponse::Ok { .. }));
    }

    // Re-seeding a fresh allocator from the durable tip never duplicates.
    #[test]
    fn test_rowid_coordinator_restart_init() {
        let k = key(1, 1);
        let alloc1 = ConcurrentRowIdAllocator::new(epoch(1));
        alloc1.init_table(k, Some(RowId::new(100)), 0, RowIdMode::Normal);
        for _ in 0..50 {
            let _ = alloc1.allocate_one(k).unwrap();
        }
        let alloc2 = ConcurrentRowIdAllocator::new(epoch(1));
        alloc2.init_table(k, Some(RowId::new(150)), 0, RowIdMode::Normal);
        let r = alloc2.allocate_one(k).unwrap();
        assert_eq!(
            r.get(),
            151,
            "re-init from durable tip must be non-duplicate"
        );
        let ka = key(1, 2);
        alloc2.init_table(ka, Some(RowId::new(80)), 200, RowIdMode::AutoIncrement);
        let ra = alloc2.allocate_one(ka).unwrap();
        assert_eq!(
            ra.get(),
            201,
            "AUTOINCREMENT re-init: max(80, 200) + 1 = 201"
        );
    }

    // Happy-path IPC reservation returns the expected wire values.
    #[test]
    fn test_handle_rowid_reserve_ok() {
        let alloc = ConcurrentRowIdAllocator::new(epoch(1));
        let k = key(1, 42);
        alloc.init_table(k, Some(RowId::new(10)), 0, RowIdMode::Normal);
        let payload = RowidReservePayload {
            txn: crate::coordinator_ipc::WireTxnToken {
                txn_id: 99,
                txn_epoch: 1,
            },
            schema_epoch: 1,
            table_id: 42,
            count: 32,
        };
        let resp = alloc.handle_rowid_reserve(&payload);
        match resp {
            RowidReserveResponse::Ok { start_rowid, count } => {
                assert_eq!(start_rowid, 11);
                assert_eq!(count, 32);
            }
            RowidReserveResponse::Err { code } => panic!("unexpected error: {code}"),
        }
    }

    // count == 0 succeeds without consuming any rowids.
    #[test]
    fn test_empty_range_reservation() {
        let alloc = ConcurrentRowIdAllocator::new(epoch(1));
        let k = key(1, 1);
        alloc.init_table(k, None, 0, RowIdMode::Normal);
        let r = alloc.reserve_range(k, 0).unwrap();
        assert_eq!(r.count, 0);
        let next = alloc.allocate_one(k).unwrap();
        assert_eq!(next.get(), 1);
    }

    // Bumping below the cursor must not move it backwards.
    #[test]
    fn test_bump_explicit_no_retreat() {
        let alloc = ConcurrentRowIdAllocator::new(epoch(1));
        let k = key(1, 1);
        alloc.init_table(k, Some(RowId::new(1000)), 0, RowIdMode::Normal);
        alloc.bump_explicit(k, RowId::new(500)).unwrap();
        let next = alloc.allocate_one(k).unwrap();
        assert_eq!(next.get(), 1001, "bump below current must not retreat");
    }

    // Operations on an unseeded key fail with NotInitialized.
    #[test]
    fn test_not_initialized_error() {
        let alloc = ConcurrentRowIdAllocator::new(epoch(1));
        let k = key(1, 999);
        let r = alloc.allocate_one(k);
        assert_eq!(r, Err(RowIdAllocError::NotInitialized(k)));
        let r = alloc.bump_explicit(k, RowId::new(5));
        assert_eq!(r, Err(RowIdAllocError::NotInitialized(k)));
    }

    // Bumping to RowId::MAX wraps the cursor, making later allocs Exhausted.
    #[test]
    fn test_bump_explicit_at_max_rowid() {
        let alloc = ConcurrentRowIdAllocator::new(epoch(1));
        let k = key(1, 1);
        alloc.init_table(k, None, 0, RowIdMode::Normal);
        alloc.bump_explicit(k, RowId::MAX).unwrap();
        let r = alloc.allocate_one(k);
        assert_eq!(r, Err(RowIdAllocError::Exhausted));
    }
}