use bytes::Bytes;
use uintn::UintN;
/// Boxed, `Send + Sync` callback that receives a slice of `(UintN, Bytes)` pairs and
/// returns a `bool`.
///
/// NOTE(review): the meaning of the returned `bool` is not visible in this file —
/// confirm against the code that invokes the callback.
pub type SubscriberCallback = Box<dyn Fn(&[(UintN, Bytes)]) -> bool + Send + Sync>;
/// Identifier of a queue, stored as a single path string.
///
/// Equality, hashing and ordering are derived, so they follow the underlying
/// `path` string.
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct QueueId {
    /// The queue's path as text.
    path: String,
}

/// `Debug` prints the bare path (no struct wrapper, no quotes), exactly like `Display`.
impl std::fmt::Debug for QueueId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(self, f)
    }
}

/// `Display` emits the path verbatim; width, fill and precision flags are not applied.
impl std::fmt::Display for QueueId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `write_str` bypasses padding/truncation, so the text is always the full path.
        f.write_str(self.path.as_str())
    }
}
impl QueueId {
    /// Borrows the queue path as a string slice.
    pub fn as_str(&self) -> &str {
        self.path.as_str()
    }

    /// Returns the string used as the base for key derivation.
    ///
    /// This is the same text that [`QueueId::as_str`] returns.
    pub fn to_key_derivation_base(&self) -> &str {
        self.as_str()
    }

    /// Builds the cloud "directory" path for this queue.
    ///
    /// The result is `base_prefix`, followed by the queue path with every leading `/`
    /// removed, followed by a trailing `/`. Nothing is inserted between `base_prefix`
    /// and the queue path, so the prefix must carry its own delimiter if one is wanted.
    pub fn to_cloud_queue_path(&self, base_prefix: &str) -> String {
        let queue = self.path.trim_start_matches('/');
        [base_prefix, queue, "/"].concat()
    }

    /// Maps the queue path onto the filesystem underneath `prefix`.
    ///
    /// Leading `/` characters are stripped first so that `Path::join` appends the
    /// queue path to `prefix` rather than treating it as an absolute path.
    fn to_fs_path(&self, prefix: &std::path::Path) -> std::path::PathBuf {
        prefix.join(self.path.trim_start_matches('/'))
    }

    /// Directory `<base>/<queue path>/store`.
    pub fn to_store_dir(&self, base: &std::path::Path) -> std::path::PathBuf {
        let mut dir = self.to_fs_path(base);
        dir.push("store");
        dir
    }

    /// Path of the store file for `file_id`, rooted in [`QueueId::to_store_dir`].
    ///
    /// The directory is handed to `UintN::to_file_path` as a lossily converted string
    /// together with the `"store"` argument; the final layout is decided by that helper.
    pub fn to_store_path(&self, base: &std::path::Path, file_id: &UintN) -> std::path::PathBuf {
        let store_dir = self.to_store_dir(base);
        file_id.to_file_path(&store_dir.to_string_lossy(), "store")
    }

    /// Directory `<base>/<queue path>/wal`.
    pub fn to_wal_dir(&self, base: &std::path::Path) -> std::path::PathBuf {
        let mut dir = self.to_fs_path(base);
        dir.push("wal");
        dir
    }

    /// Path of the WAL file for `file_id`, rooted in [`QueueId::to_wal_dir`].
    ///
    /// The directory is handed to `UintN::to_file_path` as a lossily converted string
    /// together with the `"wal"` argument; the final layout is decided by that helper.
    pub fn to_wal_path(&self, base: &std::path::Path, file_id: &UintN) -> std::path::PathBuf {
        let wal_dir = self.to_wal_dir(base);
        file_id.to_file_path(&wal_dir.to_string_lossy(), "wal")
    }

    /// Builds the cloud object key for `file_id` inside this queue.
    ///
    /// The key is `base_prefix`, the queue path without leading `/`, a `/`, and the
    /// file path produced by `UintN::to_file_path("", "store")` without leading `/`.
    /// If that file path is not valid UTF-8 it is replaced by the empty string.
    pub fn to_cloud_key(&self, base_prefix: &str, file_id: &UintN) -> String {
        let queue = self.path.trim_start_matches('/');
        let relative_file = file_id.to_file_path("", "store");
        let file = relative_file
            .to_str()
            .map_or("", |text| text.trim_start_matches('/'));
        // An empty `base_prefix` contributes nothing to the output, so a single
        // format call covers both the prefixed and the un-prefixed case.
        format!("{}{}/{}", base_prefix, queue, file)
    }
}
/// Builds `QueueId` values from path strings, using a stored instance id to
/// qualify paths that do not start with `/`.
#[derive(Debug, Clone)]
pub struct QueueIdResolver {
/// Instance id used as the first path segment when resolving relative paths.
instance_id: String,
}
impl QueueIdResolver {
pub fn new(instance_id: impl Into<String>) -> Self {
Self {
instance_id: instance_id.into(),
}
}
pub fn instance_id(&self) -> &str {
&self.instance_id
}
pub fn resolve(&self, path: &str) -> QueueId {
let absolute_path = if path.starts_with('/') {
path.to_string()
} else {
format!("/{}/{}", self.instance_id, path)
};
QueueId {
path: absolute_path,
}
}
}
/// Tag naming a data source; `ReadEntry` carries one in its `source` field.
///
/// NOTE(review): the variant names suggest "no source", cloud storage, the on-disk
/// store, the on-disk write-ahead log and memory respectively — confirm against the
/// code that produces these values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataSource {
None,
Cloud,
DiskStore,
DiskWal,
Memory,
}
/// Compression algorithm identifier with a fixed `u64` code per variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum CompressionType {
    /// Code `0`.
    None = 0,
    /// Code `1`.
    Gzip = 1,
    /// Code `2`.
    Xz = 2,
    /// Code `3`.
    Zstd = 3,
}

/// Decodes a numeric code into a [`CompressionType`].
///
/// Unknown codes yield `Err("Unsupported compression type: <code>")`.
impl TryFrom<u64> for CompressionType {
    type Error = String;

    fn try_from(raw: u64) -> Result<Self, Self::Error> {
        // Look the code up among the variants so the enum discriminants stay the
        // single place where the numeric values are written down.
        let known = [Self::None, Self::Gzip, Self::Xz, Self::Zstd];
        known
            .iter()
            .copied()
            .find(|kind| u64::from(*kind) == raw)
            .ok_or_else(|| format!("Unsupported compression type: {}", raw))
    }
}

/// Encodes a [`CompressionType`] as its numeric code.
impl From<CompressionType> for u64 {
    fn from(kind: CompressionType) -> Self {
        // Lossless: the enum is `#[repr(u64)]`.
        kind as u64
    }
}
/// Encryption scheme identifier with a fixed `u64` code per variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum EncryptionType {
    /// Code `0`.
    None = 0,
    /// Code `1`.
    Aes = 1,
}

/// Decodes a numeric code into an [`EncryptionType`].
///
/// Unknown codes yield `Err("Unsupported encryption type: <code>")`.
impl TryFrom<u64> for EncryptionType {
    type Error = String;

    fn try_from(raw: u64) -> Result<Self, Self::Error> {
        // Look the code up among the variants so the enum discriminants stay the
        // single place where the numeric values are written down.
        let known = [Self::None, Self::Aes];
        known
            .iter()
            .copied()
            .find(|kind| u64::from(*kind) == raw)
            .ok_or_else(|| format!("Unsupported encryption type: {}", raw))
    }
}

/// Encodes an [`EncryptionType`] as its numeric code.
impl From<EncryptionType> for u64 {
    fn from(kind: EncryptionType) -> Self {
        // Lossless: the enum is `#[repr(u64)]`.
        kind as u64
    }
}
/// A read position expressed in one of two ways, each carrying a `UintN` offset.
///
/// NOTE(review): presumably `Absolute` is an offset counted from the start and
/// `ShiftFromTail` a distance counted back from the tail — confirm against the reader
/// that interprets these values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReadPosition {
Absolute(UintN),
ShiftFromTail(UintN),
}
impl ReadPosition {
pub fn absolute(offset: UintN) -> Self {
Self::Absolute(offset)
}
pub fn shift_from_tail(offset: UintN) -> Self {
Self::ShiftFromTail(offset)
}
pub fn offset(&self) -> &UintN {
match self {
Self::Absolute(offset) => offset,
Self::ShiftFromTail(offset) => offset,
}
}
pub fn is_absolute(&self) -> bool {
matches!(self, Self::Absolute(_))
}
pub fn is_tail_relative(&self) -> bool {
matches!(self, Self::ShiftFromTail(_))
}
}
/// An entry bundling an identifier, its payload bytes and a `DataSource` tag.
#[derive(Debug, Clone)]
pub struct ReadEntry {
/// Identifier of the entry.
pub id: UintN,
/// Payload bytes of the entry.
pub data: Bytes,
/// Source tag recorded for this entry.
pub source: DataSource,
}
impl ReadEntry {
pub fn new(id: UintN, data: Bytes, source: DataSource) -> Self {
Self { id, data, source }
}
}
// Unit tests are kept in the separate `queue_id_test` module, which is only
// compiled for test builds.
#[cfg(test)]
mod queue_id_test;