1use bytes::Bytes;
2use uintn::UintN;
3
4pub type SubscriberCallback = Box<dyn Fn(&[(UintN, Bytes)]) -> bool + Send + Sync>;
6
/// Identifier of a queue, backed by a path-like string.
///
/// Equality, ordering and hashing are all derived from that string.
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct QueueId {
    /// The path string that identifies the queue.
    path: String,
}

impl std::fmt::Debug for QueueId {
    /// Prints the bare path (no struct name, no quotes), exactly like
    /// [`std::fmt::Display`], so log output stays compact.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(self, f)
    }
}

impl std::fmt::Display for QueueId {
    /// Prints the path, honouring the caller's width, fill, alignment and
    /// precision flags (e.g. `{:>20}`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `write!(f, "{}", ..)` would start a fresh format spec and silently
        // drop the caller's flags; `pad` applies them and is a plain
        // `write_str` when none are set.
        f.pad(&self.path)
    }
}
25
26impl QueueId {
27 pub fn as_str(&self) -> &str {
29 &self.path
30 }
31
32 pub fn to_key_derivation_base(&self) -> &str {
34 &self.path
35 }
36
37 pub fn to_cloud_queue_path(&self, base_prefix: &str) -> String {
40 format!("{}{}/", base_prefix, self.path.trim_start_matches('/'))
41 }
42
43 fn to_fs_path(&self, prefix: &std::path::Path) -> std::path::PathBuf {
44 let relative = self.path.trim_start_matches('/');
45 prefix.join(relative)
46 }
47
48 pub fn to_store_dir(&self, base: &std::path::Path) -> std::path::PathBuf {
51 self.to_fs_path(base).join("store")
52 }
53
54 pub fn to_store_path(&self, base: &std::path::Path, file_id: &UintN) -> std::path::PathBuf {
57 let queue_path = self.to_store_dir(base);
58 file_id.to_file_path(&queue_path.to_string_lossy(), "store")
59 }
60
61 pub fn to_wal_dir(&self, base: &std::path::Path) -> std::path::PathBuf {
64 self.to_fs_path(base).join("wal")
65 }
66
67 pub fn to_wal_path(&self, base: &std::path::Path, file_id: &UintN) -> std::path::PathBuf {
70 let queue_path = self.to_wal_dir(base);
71 file_id.to_file_path(&queue_path.to_string_lossy(), "wal")
72 }
73
74 pub fn to_cloud_key(&self, base_prefix: &str, file_id: &UintN) -> String {
78 let queue_path = self.path.trim_start_matches('/');
79 let file_path_buf = file_id.to_file_path("", "store");
80 let file_path = file_path_buf.to_str().unwrap_or("").trim_start_matches('/');
81
82 if base_prefix.is_empty() {
83 format!("{}/{}", queue_path, file_path)
84 } else {
85 format!("{}{}/{}", base_prefix, queue_path, file_path)
86 }
87 }
88}
89
90#[derive(Debug, Clone)]
92pub struct QueueIdResolver {
93 instance_id: String,
94}
95
96impl QueueIdResolver {
97 pub fn new(instance_id: impl Into<String>) -> Self {
98 Self {
99 instance_id: instance_id.into(),
100 }
101 }
102
103 pub fn instance_id(&self) -> &str {
104 &self.instance_id
105 }
106
107 pub fn resolve(&self, path: &str) -> QueueId {
110 let absolute_path = if path.starts_with('/') {
111 path.to_string()
112 } else {
113 format!("/{}/{}", self.instance_id, path)
114 };
115
116 QueueId {
117 path: absolute_path,
118 }
119 }
120}
121
/// Tag recording which source a piece of data came from; carried by
/// `ReadEntry::source`.
///
/// NOTE(review): the per-variant meanings below are read off the variant
/// names only — confirm against the code that produces them.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DataSource {
    /// No source.
    None,
    /// Cloud storage.
    Cloud,
    /// On-disk store file.
    DiskStore,
    /// On-disk WAL file.
    DiskWal,
    /// In-memory data.
    Memory,
}
130
/// Compression algorithm tag with a fixed `u64` representation.
///
/// Each variant has an explicit discriminant, and the type converts to and
/// from `u64` via [`From`] / [`TryFrom`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u64)]
pub enum CompressionType {
    /// No compression (`0`).
    None = 0,
    /// Gzip (`1`).
    Gzip = 1,
    /// Xz (`2`).
    Xz = 2,
    /// Zstd (`3`).
    Zstd = 3,
}

impl TryFrom<u64> for CompressionType {
    type Error = String;

    /// Decodes a numeric tag.
    ///
    /// # Errors
    ///
    /// Returns a message naming the offending value when `value` is not one
    /// of the known discriminants (`0..=3`).
    fn try_from(value: u64) -> Result<Self, Self::Error> {
        let kind = match value {
            0 => CompressionType::None,
            1 => CompressionType::Gzip,
            2 => CompressionType::Xz,
            3 => CompressionType::Zstd,
            other => return Err(format!("Unsupported compression type: {}", other)),
        };
        Ok(kind)
    }
}

impl From<CompressionType> for u64 {
    /// Encodes the variant as its numeric discriminant.
    fn from(v: CompressionType) -> Self {
        // Lossless: the enum is `#[repr(u64)]`.
        v as Self
    }
}
159
/// Encryption scheme tag with a fixed `u64` representation.
///
/// Each variant has an explicit discriminant, and the type converts to and
/// from `u64` via [`From`] / [`TryFrom`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u64)]
pub enum EncryptionType {
    /// No encryption (`0`).
    None = 0,
    /// AES (`1`).
    Aes = 1,
}

impl TryFrom<u64> for EncryptionType {
    type Error = String;

    /// Decodes a numeric tag.
    ///
    /// # Errors
    ///
    /// Returns a message naming the offending value when `value` is neither
    /// `0` nor `1`.
    fn try_from(value: u64) -> Result<Self, Self::Error> {
        let kind = match value {
            0 => EncryptionType::None,
            1 => EncryptionType::Aes,
            other => return Err(format!("Unsupported encryption type: {}", other)),
        };
        Ok(kind)
    }
}

impl From<EncryptionType> for u64 {
    /// Encodes the variant as its numeric discriminant.
    fn from(v: EncryptionType) -> Self {
        // Lossless: the enum is `#[repr(u64)]`.
        v as Self
    }
}
184
185#[derive(Debug, Clone, PartialEq, Eq)]
186pub enum ReadPosition {
187 Absolute(UintN),
188 ShiftFromTail(UintN),
189}
190
191impl ReadPosition {
192 pub fn absolute(offset: UintN) -> Self {
193 Self::Absolute(offset)
194 }
195
196 pub fn shift_from_tail(offset: UintN) -> Self {
197 Self::ShiftFromTail(offset)
198 }
199
200 pub fn offset(&self) -> &UintN {
201 match self {
202 Self::Absolute(offset) => offset,
203 Self::ShiftFromTail(offset) => offset,
204 }
205 }
206
207 pub fn is_absolute(&self) -> bool {
208 matches!(self, Self::Absolute(_))
209 }
210
211 pub fn is_tail_relative(&self) -> bool {
212 matches!(self, Self::ShiftFromTail(_))
213 }
214}
215
216#[derive(Debug, Clone)]
217pub struct ReadEntry {
218 pub id: UintN,
219 pub data: Bytes,
220 pub source: DataSource,
221}
222
223impl ReadEntry {
224 pub fn new(id: UintN, data: Bytes, source: DataSource) -> Self {
225 Self { id, data, source }
226 }
227}
228
229#[cfg(test)]
230mod queue_id_test;