use std::collections::HashMap;
use std::sync::RwLock;
use crate::control::security::catalog::sequence_types::{SequenceState, StoredSequence};
use crate::control::security::catalog::types::SystemCatalog;
use super::format::{self, FormatContext, ResetScope};
use super::gap_free::GapFreeManager;
use super::types::{SequenceError, SequenceHandle};
pub struct SequenceRegistry {
sequences: RwLock<HashMap<String, SequenceHandle>>,
gap_free: GapFreeManager,
}
impl SequenceRegistry {
pub fn new() -> Self {
Self {
sequences: RwLock::new(HashMap::new()),
gap_free: GapFreeManager::new(),
}
}
pub fn gap_free_manager(&self) -> &GapFreeManager {
&self.gap_free
}
pub fn load_from_catalog(&self, catalog: &SystemCatalog) {
let all_defs = match catalog.load_all_sequences() {
Ok(defs) => defs,
Err(e) => {
tracing::warn!(error = %e, "failed to load sequences from catalog");
return;
}
};
let mut map = self.sequences.write().unwrap_or_else(|p| p.into_inner());
for def in all_defs {
let key = registry_key(def.tenant_id, &def.name);
let state = catalog
.get_sequence_state(def.tenant_id, &def.name)
.ok()
.flatten();
map.insert(key, SequenceHandle::new(def, state));
}
}
pub fn create(&self, def: StoredSequence) -> Result<(), SequenceError> {
let key = registry_key(def.tenant_id, &def.name);
let mut map = self.sequences.write().unwrap_or_else(|p| p.into_inner());
if map.contains_key(&key) {
return Err(SequenceError::AlreadyExists {
name: def.name.clone(),
});
}
map.insert(key, SequenceHandle::new(def, None));
Ok(())
}
pub fn remove(&self, tenant_id: u64, name: &str) -> Result<(), SequenceError> {
let key = registry_key(tenant_id, name);
let mut map = self.sequences.write().unwrap_or_else(|p| p.into_inner());
if map.remove(&key).is_none() {
return Err(SequenceError::NotFound {
name: name.to_string(),
});
}
Ok(())
}
pub fn nextval(&self, tenant_id: u64, name: &str) -> Result<i64, SequenceError> {
let key = registry_key(tenant_id, name);
let map = self.sequences.read().unwrap_or_else(|p| p.into_inner());
let handle = map.get(&key).ok_or_else(|| SequenceError::NotFound {
name: name.to_string(),
})?;
self.check_period_reset(handle);
handle.nextval()
}
pub fn nextval_formatted(
&self,
tenant_id: u64,
name: &str,
tenant_code: &str,
session_vars: &std::collections::HashMap<String, String>,
) -> Result<SequenceValue, SequenceError> {
let key = registry_key(tenant_id, name);
let map = self.sequences.read().unwrap_or_else(|p| p.into_inner());
let handle = map.get(&key).ok_or_else(|| SequenceError::NotFound {
name: name.to_string(),
})?;
self.check_period_reset(handle);
let raw = handle.nextval()?;
match &handle.def.format_template {
Some(tokens) => {
let ctx = FormatContext::now(raw, tenant_code, session_vars);
let formatted = format::format_sequence_value(tokens, &ctx);
Ok(SequenceValue::Formatted(formatted))
}
None => Ok(SequenceValue::Int(raw)),
}
}
pub fn next_preview(
&self,
tenant_id: u64,
name: &str,
tenant_code: &str,
session_vars: &std::collections::HashMap<String, String>,
) -> Result<SequenceValue, SequenceError> {
let key = registry_key(tenant_id, name);
let map = self.sequences.read().unwrap_or_else(|p| p.into_inner());
let handle = map.get(&key).ok_or_else(|| SequenceError::NotFound {
name: name.to_string(),
})?;
let next_raw = handle.current_value() + handle.def.increment;
match &handle.def.format_template {
Some(tokens) => {
let ctx = FormatContext::now(next_raw, tenant_code, session_vars);
let formatted = format::format_sequence_value(tokens, &ctx);
Ok(SequenceValue::Formatted(formatted))
}
None => Ok(SequenceValue::Int(next_raw)),
}
}
fn check_period_reset(&self, handle: &SequenceHandle) {
if handle.def.reset_scope == ResetScope::Never {
return;
}
let dt = nodedb_types::NdbDateTime::now();
let c = dt.components();
let new_pk =
format::compute_period_key(&handle.def.reset_scope, c.year as u16, c.month, c.day);
handle.check_period_reset(&new_pk);
}
pub fn nextval_batch(
&self,
tenant_id: u64,
name: &str,
n: usize,
) -> Result<Vec<i64>, SequenceError> {
let key = registry_key(tenant_id, name);
let map = self.sequences.read().unwrap_or_else(|p| p.into_inner());
let handle = map.get(&key).ok_or_else(|| SequenceError::NotFound {
name: name.to_string(),
})?;
handle.nextval_batch(n)
}
pub fn currval(&self, tenant_id: u64, name: &str) -> Result<i64, SequenceError> {
let key = registry_key(tenant_id, name);
let map = self.sequences.read().unwrap_or_else(|p| p.into_inner());
let handle = map.get(&key).ok_or_else(|| SequenceError::NotFound {
name: name.to_string(),
})?;
handle.currval()
}
pub fn setval(&self, tenant_id: u64, name: &str, value: i64) -> Result<i64, SequenceError> {
let key = registry_key(tenant_id, name);
let map = self.sequences.read().unwrap_or_else(|p| p.into_inner());
let handle = map.get(&key).ok_or_else(|| SequenceError::NotFound {
name: name.to_string(),
})?;
handle.setval(value)
}
pub fn restart(
&self,
tenant_id: u64,
name: &str,
restart_value: i64,
) -> Result<(), SequenceError> {
let key = registry_key(tenant_id, name);
let map = self.sequences.read().unwrap_or_else(|p| p.into_inner());
let handle = map.get(&key).ok_or_else(|| SequenceError::NotFound {
name: name.to_string(),
})?;
handle.setval(restart_value)?;
Ok(())
}
pub fn list(&self, tenant_id: u64) -> Vec<(String, i64, bool)> {
let prefix = format!("{tenant_id}:");
let map = self.sequences.read().unwrap_or_else(|p| p.into_inner());
map.iter()
.filter(|(k, _)| k.starts_with(&prefix))
.map(|(_, handle)| {
(
handle.def.name.clone(),
handle.current_value(),
handle.is_called(),
)
})
.collect()
}
pub fn persist_all(&self, catalog: &SystemCatalog) {
let map = self.sequences.read().unwrap_or_else(|p| p.into_inner());
for (_, handle) in map.iter() {
let state = SequenceState {
tenant_id: handle.def.tenant_id,
name: handle.def.name.clone(),
current_value: handle.current_value(),
is_called: handle.is_called(),
epoch: handle.def.epoch,
period_key: handle.period_key(),
};
if let Err(e) = catalog.put_sequence_state(&state) {
tracing::warn!(
sequence = %handle.def.name,
error = %e,
"failed to persist sequence state"
);
}
}
}
pub fn sequences_read(
&self,
) -> std::sync::RwLockReadGuard<'_, HashMap<String, SequenceHandle>> {
self.sequences.read().unwrap_or_else(|p| p.into_inner())
}
pub fn exists(&self, tenant_id: u64, name: &str) -> bool {
let key = registry_key(tenant_id, name);
let map = self.sequences.read().unwrap_or_else(|p| p.into_inner());
map.contains_key(&key)
}
pub fn get_def(&self, tenant_id: u64, name: &str) -> Option<StoredSequence> {
let key = registry_key(tenant_id, name);
let map = self.sequences.read().unwrap_or_else(|p| p.into_inner());
map.get(&key).map(|h| h.def.clone())
}
pub fn restart_sequences_for_collection(&self, tenant_id: u64, collection: &str) {
let prefix = format!("{tenant_id}:{collection}_");
let suffix = "_seq";
let map = self.sequences.read().unwrap_or_else(|p| p.into_inner());
for (key, handle) in map.iter() {
if key.starts_with(&prefix) && handle.def.name.ends_with(suffix) {
let start = handle.def.start_value;
if let Err(e) = handle.setval(start) {
tracing::warn!(
sequence = %handle.def.name,
error = %e,
"failed to restart sequence during TRUNCATE RESTART IDENTITY"
);
}
}
}
}
}
impl Default for SequenceRegistry {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub enum SequenceValue {
Int(i64),
Formatted(String),
}
fn registry_key(tenant_id: u64, name: &str) -> String {
format!("{tenant_id}:{name}")
}