use serde::{Deserialize, Serialize};
/// Payload that `RangeAllocator::allocate_chunk` serializes to MessagePack and
/// hands to the Raft proposer when one is installed.
///
/// The struct is part of the encoded payload, so field names and order should
/// be treated as a wire contract.
#[derive(
Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
)]
pub struct RangeAllocationRequest {
/// Tenant that owns the sequence.
pub tenant_id: u64,
/// Name of the sequence a chunk is being requested for.
pub sequence_name: String,
/// Number of sequence values in the requested chunk.
pub chunk_size: i64,
/// Caller-supplied epoch, copied through unchanged.
/// NOTE(review): presumably a fencing/leadership epoch — confirm with callers.
pub epoch: u64,
}
/// Payload that `RangeAllocator::propose_gap_free_advance` serializes to
/// MessagePack and hands to the Raft proposer.
///
/// The struct is part of the encoded payload, so field names and order should
/// be treated as a wire contract.
#[derive(
Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
)]
pub struct GapFreeAdvanceRequest {
/// Tenant that owns the sequence.
pub tenant_id: u64,
/// Name of the sequence being advanced.
pub sequence_name: String,
/// Value supplied by the caller of `propose_gap_free_advance`.
/// NOTE(review): presumably the sequence value that was just reserved — confirm.
pub reserved_value: i64,
/// Caller-supplied epoch, copied through unchanged.
pub epoch: u64,
}
/// Result of `RangeAllocator::allocate_chunk`: the bounds of the allocated
/// chunk of sequence values.
#[derive(
Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
)]
pub struct RangeAllocationResponse {
/// First value of the chunk.
pub range_start: i64,
/// Last value of the chunk, i.e. `range_start + (chunk_size - 1) * increment`.
/// For a negative increment this is smaller than `range_start`.
pub range_end: i64,
/// Epoch passed to `allocate_chunk`, echoed back unchanged.
pub epoch: u64,
}
/// Hands out chunks of sequence values, proposing the allocation through Raft
/// when a proposer is installed on the shared state.
pub struct RangeAllocator {
/// Number of sequence values covered by each chunk returned from
/// `allocate_chunk`.
pub default_chunk_size: i64,
}
impl RangeAllocator {
pub fn new(default_chunk_size: i64) -> Self {
Self { default_chunk_size }
}
pub fn allocate_chunk(
&self,
state: &crate::control::state::SharedState,
tenant_id: u64,
sequence_name: &str,
increment: i64,
epoch: u64,
) -> Result<RangeAllocationResponse, crate::Error> {
let chunk_size = self.default_chunk_size;
if let Some(proposer) = state.raft_proposer.get() {
let request = RangeAllocationRequest {
tenant_id,
sequence_name: sequence_name.to_string(),
chunk_size,
epoch,
};
let payload =
zerompk::to_msgpack_vec(&request).map_err(|e| crate::Error::Serialization {
format: "msgpack".into(),
detail: format!("range allocation request: {e}"),
})?;
let (_group_id, _log_index) =
proposer(0, payload).map_err(|e| crate::Error::Dispatch {
detail: format!("sequence range allocation raft propose: {e}"),
})?;
let current = state
.sequence_registry
.get_def(tenant_id, sequence_name)
.map(|d| d.start_value)
.unwrap_or(1);
let range_start = current;
let range_end = if increment > 0 {
current + chunk_size * increment - increment
} else {
current + chunk_size * increment + increment.abs()
};
return Ok(RangeAllocationResponse {
range_start,
range_end,
epoch,
});
}
let handle_exists = state.sequence_registry.exists(tenant_id, sequence_name);
if !handle_exists {
return Err(crate::Error::BadRequest {
detail: format!("sequence \"{sequence_name}\" does not exist"),
});
}
let current_val = state
.sequence_registry
.currval(tenant_id, sequence_name)
.unwrap_or(0);
let range_start = current_val + increment;
let range_end = range_start + (chunk_size - 1) * increment;
Ok(RangeAllocationResponse {
range_start,
range_end,
epoch,
})
}
pub fn propose_gap_free_advance(
&self,
state: &crate::control::state::SharedState,
tenant_id: u64,
sequence_name: &str,
reserved_value: i64,
epoch: u64,
) -> Result<(), crate::Error> {
let Some(proposer) = state.raft_proposer.get() else {
return Ok(());
};
let request = GapFreeAdvanceRequest {
tenant_id,
sequence_name: sequence_name.to_string(),
reserved_value,
epoch,
};
let payload =
zerompk::to_msgpack_vec(&request).map_err(|e| crate::Error::Serialization {
format: "msgpack".into(),
detail: format!("gap-free advance request: {e}"),
})?;
proposer(0, payload).map_err(|e| crate::Error::Dispatch {
detail: format!("gap-free advance raft propose: {e}"),
})?;
Ok(())
}
}
impl Default for RangeAllocator {
    /// Builds an allocator that hands out 10 000 values per chunk.
    fn default() -> Self {
        Self {
            default_chunk_size: 10_000,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every field of `RangeAllocationRequest` survives a MessagePack round trip.
    /// `tenant_id` and `epoch` use different values so a field swap is detected.
    #[test]
    fn request_roundtrip() {
        let req = RangeAllocationRequest {
            tenant_id: 7,
            sequence_name: "order_seq".into(),
            chunk_size: 10_000,
            epoch: 3,
        };
        let bytes = zerompk::to_msgpack_vec(&req).unwrap();
        let decoded: RangeAllocationRequest = zerompk::from_msgpack(&bytes).unwrap();
        assert_eq!(decoded.tenant_id, 7);
        assert_eq!(decoded.sequence_name, "order_seq");
        assert_eq!(decoded.chunk_size, 10_000);
        assert_eq!(decoded.epoch, 3);
    }

    /// Every field of `GapFreeAdvanceRequest` survives a MessagePack round trip.
    #[test]
    fn gap_free_advance_roundtrip() {
        let req = GapFreeAdvanceRequest {
            tenant_id: 7,
            sequence_name: "invoice_seq".into(),
            reserved_value: -42,
            epoch: 3,
        };
        let bytes = zerompk::to_msgpack_vec(&req).unwrap();
        let decoded: GapFreeAdvanceRequest = zerompk::from_msgpack(&bytes).unwrap();
        assert_eq!(decoded.tenant_id, 7);
        assert_eq!(decoded.sequence_name, "invoice_seq");
        assert_eq!(decoded.reserved_value, -42);
        assert_eq!(decoded.epoch, 3);
    }

    /// Every field of `RangeAllocationResponse` survives a MessagePack round trip.
    #[test]
    fn response_roundtrip() {
        let resp = RangeAllocationResponse {
            range_start: 1,
            range_end: 10_000,
            epoch: 5,
        };
        let bytes = zerompk::to_msgpack_vec(&resp).unwrap();
        let decoded: RangeAllocationResponse = zerompk::from_msgpack(&bytes).unwrap();
        assert_eq!(decoded.range_start, 1);
        assert_eq!(decoded.range_end, 10_000);
        assert_eq!(decoded.epoch, 5);
    }

    /// `new` stores the chunk size it is given and `default` uses 10 000.
    #[test]
    fn constructor_and_default_chunk_size() {
        assert_eq!(RangeAllocator::new(25).default_chunk_size, 25);
        assert_eq!(RangeAllocator::default().default_chunk_size, 10_000);
    }
}