use crate::error::Result;
use noxu_db::{Database, DatabaseEntry, OperationStatus};
use std::sync::atomic::{AtomicU64, Ordering};
/// A persistent, monotonically increasing `u64` sequence stored in a [`Database`].
///
/// Values are handed out from an in-memory block. The database record under
/// `seq:<name>` holds the exclusive upper bound of the block that has been
/// reserved so far, so a fresh handle always resumes at a value that was never
/// handed out. Values left unused in a block when a handle is dropped are
/// skipped, so the sequence may contain gaps.
///
/// NOTE(review): construction reads and then rewrites the record without any
/// visible atomicity between the two calls; opening two live handles for the
/// same name at the same time looks unsafe — confirm against `noxu_db`.
pub struct Sequence<'db> {
    /// Database that stores the persisted block limit.
    db: &'db Database,
    /// Record key: the bytes of `seq:<name>`.
    key: Vec<u8>,
    /// Next value to hand out; never moves past `cached_limit`.
    current: AtomicU64,
    /// Exclusive upper bound of the reserved block. Only raised after the same
    /// value has been written to the database.
    cached_limit: AtomicU64,
    /// Number of values reserved per database write, as requested by the caller.
    cache_size: u64,
    /// Serializes block reservations so database writes and limit updates
    /// happen in the same order.
    refill: std::sync::Mutex<()>,
}

impl<'db> Sequence<'db> {
    /// Opens (or creates) the sequence `name` with a block size of 100.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the database read or write.
    pub fn new(db: &'db Database, name: &str) -> Result<Self> {
        Self::with_cache_size(db, name, 100)
    }

    /// Opens (or creates) the sequence `name`, reserving `cache_size` values
    /// per database write.
    ///
    /// The starting value is the big-endian `u64` found in the first eight
    /// bytes of the stored record. It is `1` when the record is missing or
    /// shorter than eight bytes. The first block is reserved immediately.
    ///
    /// A `cache_size` of `0` is accepted; blocks of one value are then
    /// reserved on demand by [`Sequence::next`].
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the database read or write.
    pub fn with_cache_size(
        db: &'db Database,
        name: &str,
        cache_size: u64,
    ) -> Result<Self> {
        let key = format!("seq:{}", name).into_bytes();
        let key_entry = DatabaseEntry::from_bytes(&key);
        let mut data_entry = DatabaseEntry::new();

        let initial_value = match db.get(None, &key_entry, &mut data_entry)? {
            OperationStatus::Success => {
                let bytes = data_entry.data();
                if bytes.len() >= 8 {
                    let mut raw = [0u8; 8];
                    raw.copy_from_slice(&bytes[..8]);
                    u64::from_be_bytes(raw)
                } else {
                    1
                }
            }
            _ => 1,
        };

        // Saturate instead of overflowing: a stored value close to `u64::MAX`
        // must not wrap the limit back to a small number.
        let limit = initial_value.saturating_add(cache_size);
        let limit_entry = DatabaseEntry::from_bytes(&limit.to_be_bytes());
        db.put(None, &key_entry, &limit_entry)?;

        Ok(Self {
            db,
            key,
            current: AtomicU64::new(initial_value),
            cached_limit: AtomicU64::new(limit),
            cache_size,
            refill: std::sync::Mutex::new(()),
        })
    }

    /// Returns the next value of the sequence.
    ///
    /// A value is only returned once the limit covering it has been written to
    /// the database, so a handle opened later never repeats it.
    ///
    /// # Errors
    ///
    /// Propagates the database error if reserving a new block fails. In that
    /// case no value is consumed and the in-memory state is unchanged, so the
    /// call can simply be retried.
    ///
    /// # Panics
    ///
    /// Panics when the `u64` value space is exhausted.
    pub fn next(&self) -> Result<u64> {
        loop {
            // Acquire pairs with the Release store in `reserve_block`: seeing
            // a limit implies the matching database write has completed.
            let limit = self.cached_limit.load(Ordering::Acquire);

            // Claim a value only while it is below the reserved limit, so
            // `current` never runs ahead of what has been persisted.
            let claimed = self.current.fetch_update(
                Ordering::AcqRel,
                Ordering::Acquire,
                |cur| if cur < limit { Some(cur + 1) } else { None },
            );

            match claimed {
                Ok(val) => return Ok(val),
                Err(_) => self.reserve_block(limit)?,
            }
        }
    }

    /// Returns the most recently handed-out value, or one less than the
    /// starting value (saturating at `0`) if nothing has been handed out yet.
    pub fn current(&self) -> u64 {
        self.current.load(Ordering::Acquire).saturating_sub(1)
    }

    /// Returns the block size this sequence was created with.
    pub fn cache_size(&self) -> u64 {
        self.cache_size
    }

    /// Returns the database key under which the block limit is stored.
    pub fn key(&self) -> &[u8] {
        &self.key
    }

    /// Reserves the next block: persists the new limit, then publishes it.
    ///
    /// `observed_limit` is the limit the caller found exhausted. If another
    /// thread has already raised it, this is a no-op.
    fn reserve_block(&self, observed_limit: u64) -> Result<()> {
        // The mutex guards no data, so a poisoned lock carries no broken
        // invariant and can be reused as is.
        let _guard = self
            .refill
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        let limit = self.cached_limit.load(Ordering::Acquire);
        if limit != observed_limit {
            return Ok(());
        }

        // Always make progress, even for a configured block size of zero.
        let new_limit = limit.saturating_add(self.cache_size.max(1));
        assert!(new_limit > limit, "sequence exhausted: no u64 values left");

        // Persist first. If the write fails, the in-memory limit stays put and
        // no value from the new block has been handed out.
        let key_entry = DatabaseEntry::from_bytes(&self.key);
        let limit_entry = DatabaseEntry::from_bytes(&new_limit.to_be_bytes());
        self.db.put(None, &key_entry, &limit_entry)?;

        self.cached_limit.store(new_limit, Ordering::Release);
        Ok(())
    }
}
/// A purely in-memory `u64` counter that can be shared between threads.
///
/// Nothing is persisted: every new instance starts from its initial value.
#[derive(Debug)]
pub struct MemorySequence {
    /// The value the next call to [`MemorySequence::next`] will return.
    current: AtomicU64,
}

impl MemorySequence {
    /// Creates a counter whose first value is `1`.
    pub fn new() -> Self {
        Self::starting_at(1)
    }

    /// Creates a counter whose first value is `start`.
    pub fn starting_at(start: u64) -> Self {
        let current = AtomicU64::new(start);
        Self { current }
    }

    /// Returns the current value and advances the counter by one.
    ///
    /// The increment is atomic and wraps around on overflow.
    pub fn next(&self) -> u64 {
        self.current.fetch_add(1, Ordering::Relaxed)
    }

    /// Returns the value the next call to [`MemorySequence::next`] would
    /// return, without advancing the counter.
    pub fn peek(&self) -> u64 {
        self.current.load(Ordering::Relaxed)
    }
}

impl Default for MemorySequence {
    /// Equivalent to [`MemorySequence::new`].
    fn default() -> Self {
        Self::starting_at(1)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use noxu_db::{DatabaseConfig, Environment, EnvironmentConfig};
    use tempfile::TempDir;

    /// Opens a non-transactional environment in a fresh temporary directory
    /// and creates the `seq_db` database inside it.
    ///
    /// The `TempDir` and `Environment` are returned so they outlive the
    /// database handle for the duration of each test.
    fn setup_db() -> (TempDir, Environment, Database) {
        let temp_dir = TempDir::new().unwrap();
        let env_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
            .with_allow_create(true)
            .with_transactional(false);
        let env = Environment::open(env_config).unwrap();
        let db_config = DatabaseConfig::new().with_allow_create(true);
        let db = env.open_database(None, "seq_db", &db_config).unwrap();
        (temp_dir, env, db)
    }

    #[test]
    fn test_sequence_starts_at_one() {
        let (_td, _env, db) = setup_db();
        let seq = Sequence::new(&db, "test").unwrap();
        assert_eq!(seq.next().unwrap(), 1);
    }

    #[test]
    fn test_sequence_increments() {
        let (_td, _env, db) = setup_db();
        let seq = Sequence::new(&db, "test").unwrap();
        assert_eq!(seq.next().unwrap(), 1);
        assert_eq!(seq.next().unwrap(), 2);
        assert_eq!(seq.next().unwrap(), 3);
    }

    #[test]
    fn test_sequence_current() {
        let (_td, _env, db) = setup_db();
        let seq = Sequence::new(&db, "test").unwrap();
        // Nothing handed out yet: one less than the starting value of 1.
        assert_eq!(seq.current(), 0);
        seq.next().unwrap();
        seq.next().unwrap();
        // Exactly the last value handed out.
        assert_eq!(seq.current(), 2);
    }

    #[test]
    fn test_sequence_cache_size() {
        let (_td, _env, db) = setup_db();
        let seq = Sequence::with_cache_size(&db, "test", 10).unwrap();
        assert_eq!(seq.cache_size(), 10);
        for expected in 1..=10 {
            assert_eq!(seq.next().unwrap(), expected);
        }
    }

    #[test]
    fn test_sequence_exceeds_cache() {
        let (_td, _env, db) = setup_db();
        let seq = Sequence::with_cache_size(&db, "test", 5).unwrap();
        // Crosses two block boundaries without gaps or repeats.
        for expected in 1..=12 {
            assert_eq!(seq.next().unwrap(), expected);
        }
    }

    #[test]
    fn test_sequence_zero_cache_size_increments() {
        let (_td, _env, db) = setup_db();
        let seq = Sequence::with_cache_size(&db, "test", 0).unwrap();
        assert_eq!(seq.cache_size(), 0);
        for expected in 1..=3 {
            assert_eq!(seq.next().unwrap(), expected);
        }
    }

    #[test]
    fn test_sequence_resumes_past_reserved_block() {
        let (_td, _env, db) = setup_db();
        {
            let seq = Sequence::with_cache_size(&db, "test", 5).unwrap();
            for expected in 1..=3 {
                assert_eq!(seq.next().unwrap(), expected);
            }
        }
        // A new handle starts at the persisted limit (1 + 5), so values
        // handed out by the previous handle are never repeated.
        let reopened = Sequence::with_cache_size(&db, "test", 5).unwrap();
        assert_eq!(reopened.next().unwrap(), 6);
        assert_eq!(reopened.next().unwrap(), 7);
    }

    #[test]
    fn test_sequence_key() {
        let (_td, _env, db) = setup_db();
        let seq = Sequence::new(&db, "my_seq").unwrap();
        assert_eq!(seq.key(), b"seq:my_seq");
    }

    #[test]
    fn test_multiple_sequences_independent() {
        let (_td, _env, db) = setup_db();
        let seq1 = Sequence::new(&db, "seq1").unwrap();
        let seq2 = Sequence::new(&db, "seq2").unwrap();
        assert_eq!(seq1.next().unwrap(), 1);
        assert_eq!(seq2.next().unwrap(), 1);
        assert_eq!(seq1.next().unwrap(), 2);
        assert_eq!(seq2.next().unwrap(), 2);
    }

    #[test]
    fn test_memory_sequence_starts_at_one() {
        let seq = MemorySequence::new();
        assert_eq!(seq.next(), 1);
    }

    #[test]
    fn test_memory_sequence_increments() {
        let seq = MemorySequence::new();
        assert_eq!(seq.next(), 1);
        assert_eq!(seq.next(), 2);
        assert_eq!(seq.next(), 3);
    }

    #[test]
    fn test_memory_sequence_starting_at() {
        let seq = MemorySequence::starting_at(100);
        assert_eq!(seq.next(), 100);
        assert_eq!(seq.next(), 101);
    }

    #[test]
    fn test_memory_sequence_peek() {
        let seq = MemorySequence::new();
        assert_eq!(seq.peek(), 1);
        seq.next();
        assert_eq!(seq.peek(), 2);
    }

    #[test]
    fn test_memory_sequence_default() {
        let seq = MemorySequence::default();
        assert_eq!(seq.next(), 1);
    }
}