use std::sync::OnceLock;
/// Identifier of a resource manager, represented as a single byte.
///
/// Each variant's discriminant is the byte returned by [`RmgrId::to_u8`] and
/// accepted by [`RmgrId::from_u8`].
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum RmgrId {
    /// Byte value 10, named `"heap"`.
    Heap = 10,
    /// Byte value 11, named `"btree"`.
    Btree = 11,
    /// Byte value 12, named `"vector"`.
    Vector = 12,
    /// Byte value 13, named `"graph"`.
    Graph = 13,
    /// Byte value 14, named `"timeseries"`.
    Timeseries = 14,
    /// Byte value 15, named `"queue"`.
    Queue = 15,
    /// Byte value 16, named `"document"`.
    Document = 16,
    /// Byte value 17, named `"kv"`.
    Kv = 17,
    /// Byte value 18, named `"probabilistic"`.
    Probabilistic = 18,
    /// Byte value 50, named `"transaction"`.
    Transaction = 50,
    /// Byte value 51, named `"checkpoint"`.
    Checkpoint = 51,
    /// Byte value 52, named `"replication"`.
    Replication = 52,
    /// Byte value 255, named `"reserved"`.
    ///
    /// Elsewhere in this file it is also the id attached to errors that are
    /// not tied to one specific manager.
    Reserved = 255,
}

impl RmgrId {
    /// Decodes a raw id byte.
    ///
    /// Returns `None` when `b` is not the discriminant of any variant.
    pub fn from_u8(b: u8) -> Option<Self> {
        let decoded = match b {
            10 => RmgrId::Heap,
            11 => RmgrId::Btree,
            12 => RmgrId::Vector,
            13 => RmgrId::Graph,
            14 => RmgrId::Timeseries,
            15 => RmgrId::Queue,
            16 => RmgrId::Document,
            17 => RmgrId::Kv,
            18 => RmgrId::Probabilistic,
            50 => RmgrId::Transaction,
            51 => RmgrId::Checkpoint,
            52 => RmgrId::Replication,
            255 => RmgrId::Reserved,
            // Every other byte is unassigned.
            _ => return None,
        };
        Some(decoded)
    }

    /// Returns the byte value (the enum discriminant) of this id.
    pub fn to_u8(self) -> u8 {
        self as u8
    }

    /// Returns the fixed lowercase name of this id.
    pub fn name(self) -> &'static str {
        let label = match self {
            RmgrId::Heap => "heap",
            RmgrId::Btree => "btree",
            RmgrId::Vector => "vector",
            RmgrId::Graph => "graph",
            RmgrId::Timeseries => "timeseries",
            RmgrId::Queue => "queue",
            RmgrId::Document => "document",
            RmgrId::Kv => "kv",
            RmgrId::Probabilistic => "probabilistic",
            RmgrId::Transaction => "transaction",
            RmgrId::Checkpoint => "checkpoint",
            RmgrId::Replication => "replication",
            RmgrId::Reserved => "reserved",
        };
        label
    }
}
/// Errors reported when looking up or invoking a resource manager.
#[derive(Debug)]
pub enum RmgrError {
    /// No resource manager is available for the carried id.
    ///
    /// `RmgrRegistry::dispatch_redo` also returns this with
    /// `RmgrId::Reserved` when a record's id byte is not a known `RmgrId`.
    Unregistered(RmgrId),
    /// A failure attributed to `rmgr`, described by a free-form `message`.
    ///
    /// The registry install/lookup functions in this file report their own
    /// failures through this variant with `RmgrId::Reserved`.
    SubsystemFailure { rmgr: RmgrId, message: String },
}

impl std::fmt::Display for RmgrError {
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RmgrError::Unregistered(id) => {
                let name = id.name();
                write!(f, "no resource manager for {name}")
            }
            RmgrError::SubsystemFailure { rmgr, message } => {
                let name = rmgr.name();
                write!(f, "{name} rmgr failed: {message}")
            }
        }
    }
}

impl std::error::Error for RmgrError {}
/// Interface a resource manager implements so it can be stored in, and
/// invoked through, an `RmgrRegistry`.
///
/// Implementors must be `Send + Sync`.
pub trait ResourceManager: Send + Sync {
    /// Redoes `record`.
    ///
    /// When invoked through `RmgrRegistry::dispatch_redo`, `record` is the
    /// complete slice handed to the dispatcher, including its leading id byte.
    ///
    /// # Errors
    ///
    /// Returns an [`RmgrError`] chosen by the implementation.
    fn redo(&self, record: &[u8]) -> Result<(), RmgrError>;

    /// Undoes `record`.
    ///
    /// The default implementation ignores the record and returns `Ok(())`.
    fn undo(&self, _record: &[u8]) -> Result<(), RmgrError> {
        Ok(())
    }

    /// Produces a textual description of `record`.
    ///
    /// The default implementation reports only the record's length in bytes.
    fn desc(&self, record: &[u8]) -> String {
        let len = record.len();
        format!("({len} bytes opaque)")
    }

    /// Returns the manager's static name.
    fn name(&self) -> &'static str;
}
/// Global slot holding the installed registry: `install` stores into it via
/// `OnceLock::set` and `registry` reads it via `OnceLock::get`.
static REGISTRY: OnceLock<RmgrRegistry> = OnceLock::new();
/// Lookup table of resource managers, indexed by the byte value of their
/// `RmgrId`.
pub struct RmgrRegistry {
    // Slot `i` holds the manager registered for id byte `i`, or `None`.
    table: Vec<Option<Box<dyn ResourceManager>>>,
}

impl RmgrRegistry {
    /// Creates a registry whose table starts with `capacity` empty slots.
    ///
    /// The table still grows on demand in [`RmgrRegistry::register`], so
    /// `capacity` does not limit which ids can be registered.
    pub fn with_capacity(capacity: usize) -> Self {
        let mut table = Vec::new();
        table.resize_with(capacity, || None);
        Self { table }
    }

    /// Stores `rmgr` in the slot for `id` and returns the updated registry.
    ///
    /// The table is extended with empty slots when `id` lies beyond its
    /// current length. A manager already present in the slot is replaced.
    pub fn register(mut self, id: RmgrId, rmgr: Box<dyn ResourceManager>) -> Self {
        let slot = usize::from(id.to_u8());
        let needed_len = slot + 1;
        if self.table.len() < needed_len {
            self.table.resize_with(needed_len, || None);
        }
        self.table[slot] = Some(rmgr);
        self
    }

    /// Returns the manager registered for `id`, or `None` when the slot is
    /// empty or lies beyond the end of the table.
    pub fn get(&self, id: RmgrId) -> Option<&dyn ResourceManager> {
        let slot = self.table.get(usize::from(id.to_u8()))?;
        slot.as_deref()
    }

    /// Routes `record` to the `redo` method of the manager selected by the
    /// record's first byte.
    ///
    /// The manager receives the whole `record`, id byte included.
    ///
    /// # Errors
    ///
    /// * `RmgrError::Unregistered(RmgrId::Reserved)` when the first byte is
    ///   not a known `RmgrId`. An empty record is treated as id byte `0`,
    ///   which is not a known id, so it takes this path as well.
    /// * `RmgrError::Unregistered(id)` when the id is known but no manager is
    ///   registered for it.
    /// * Any error returned by the selected manager's `redo`.
    pub fn dispatch_redo(&self, record: &[u8]) -> Result<(), RmgrError> {
        // An empty record falls back to id byte 0.
        let tag = if let [first, ..] = record { *first } else { 0 };
        let Some(id) = RmgrId::from_u8(tag) else {
            return Err(RmgrError::Unregistered(RmgrId::Reserved));
        };
        let Some(rmgr) = self.get(id) else {
            return Err(RmgrError::Unregistered(id));
        };
        rmgr.redo(record)
    }
}
/// Installs `registry` as the global registry.
///
/// # Errors
///
/// Returns `RmgrError::SubsystemFailure` (with `RmgrId::Reserved`) when a
/// registry has already been installed; the rejected `registry` is dropped.
pub fn install(registry: RmgrRegistry) -> Result<(), RmgrError> {
    match REGISTRY.set(registry) {
        Ok(()) => Ok(()),
        Err(_rejected) => Err(RmgrError::SubsystemFailure {
            rmgr: RmgrId::Reserved,
            message: "rmgr registry already installed".to_string(),
        }),
    }
}
/// Returns the globally installed registry, or `None` when `REGISTRY` has not
/// been set yet.
pub fn registry() -> Option<&'static RmgrRegistry> {
REGISTRY.get()
}
/// Dispatches `record` through the globally installed registry.
///
/// # Errors
///
/// Returns `RmgrError::SubsystemFailure` (with `RmgrId::Reserved`) when no
/// registry has been installed; otherwise returns whatever
/// `RmgrRegistry::dispatch_redo` returns for `record`.
pub fn dispatch_redo(record: &[u8]) -> Result<(), RmgrError> {
    let Some(installed) = registry() else {
        return Err(RmgrError::SubsystemFailure {
            rmgr: RmgrId::Reserved,
            message: "rmgr registry not installed".to_string(),
        });
    };
    installed.dispatch_redo(record)
}