Skip to main content

normfs_types/
lib.rs

1use bytes::Bytes;
2use uintn::UintN;
3
4/// Callback type for subscription notifications
5pub type SubscriberCallback = Box<dyn Fn(&[(UintN, Bytes)]) -> bool + Send + Sync>;
6
7/// Queue identifier with support for absolute and relative paths
8/// The path field always contains the absolute path (with instance_id embedded for relative paths)
9#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
10pub struct QueueId {
11    path: String,
12}
13
14impl std::fmt::Debug for QueueId {
15    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16        write!(f, "{}", self.path)
17    }
18}
19
20impl std::fmt::Display for QueueId {
21    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22        write!(f, "{}", self.path)
23    }
24}
25
26impl QueueId {
27    /// Get the absolute queue path as a string slice
28    pub fn as_str(&self) -> &str {
29        &self.path
30    }
31
32    /// Get the queue path as a base for cryptographic key derivation
33    pub fn to_key_derivation_base(&self) -> &str {
34        &self.path
35    }
36
37    /// Get the queue path for cloud storage with base prefix
38    /// Example: "prefix/instance_id/queue_name/"
39    pub fn to_cloud_queue_path(&self, base_prefix: &str) -> String {
40        format!("{}{}/", base_prefix, self.path.trim_start_matches('/'))
41    }
42
43    fn to_fs_path(&self, prefix: &std::path::Path) -> std::path::PathBuf {
44        let relative = self.path.trim_start_matches('/');
45        prefix.join(relative)
46    }
47
48    /// Construct the full filesystem path to the store directory
49    /// Example: /base/instance_id/queue_name/store
50    pub fn to_store_dir(&self, base: &std::path::Path) -> std::path::PathBuf {
51        self.to_fs_path(base).join("store")
52    }
53
54    /// Construct the full filesystem path to a store file
55    /// Example: /base/instance_id/queue_name/store/abc/def/123.store
56    pub fn to_store_path(&self, base: &std::path::Path, file_id: &UintN) -> std::path::PathBuf {
57        let queue_path = self.to_store_dir(base);
58        file_id.to_file_path(&queue_path.to_string_lossy(), "store")
59    }
60
61    /// Construct the full filesystem path to the WAL directory
62    /// Example: /base/instance_id/queue_name/wal
63    pub fn to_wal_dir(&self, base: &std::path::Path) -> std::path::PathBuf {
64        self.to_fs_path(base).join("wal")
65    }
66
67    /// Construct the full filesystem path to a WAL file
68    /// Example: /base/instance_id/queue_name/wal/123.wal
69    pub fn to_wal_path(&self, base: &std::path::Path, file_id: &UintN) -> std::path::PathBuf {
70        let queue_path = self.to_wal_dir(base);
71        file_id.to_file_path(&queue_path.to_string_lossy(), "wal")
72    }
73
74    /// Construct the cloud storage key for a store file (always uses forward slashes)
75    /// Example: prefix/instance_id/queue_name/abc/def/123.store
76    /// Note: Only store files are stored in cloud storage, not WAL files
77    pub fn to_cloud_key(&self, base_prefix: &str, file_id: &UintN) -> String {
78        let queue_path = self.path.trim_start_matches('/');
79        let file_path_buf = file_id.to_file_path("", "store");
80        let file_path = file_path_buf.to_str().unwrap_or("").trim_start_matches('/');
81
82        if base_prefix.is_empty() {
83            format!("{}/{}", queue_path, file_path)
84        } else {
85            format!("{}{}/{}", base_prefix, queue_path, file_path)
86        }
87    }
88}
89
90/// Resolver for QueueId operations that require instance_id
91#[derive(Debug, Clone)]
92pub struct QueueIdResolver {
93    instance_id: String,
94}
95
96impl QueueIdResolver {
97    pub fn new(instance_id: impl Into<String>) -> Self {
98        Self {
99            instance_id: instance_id.into(),
100        }
101    }
102
103    pub fn instance_id(&self) -> &str {
104        &self.instance_id
105    }
106
107    /// Resolve a queue path to a QueueId with absolute path
108    /// Relative paths are prefixed with /instance_id/, absolute paths (starting with /) are used as-is
109    pub fn resolve(&self, path: &str) -> QueueId {
110        let absolute_path = if path.starts_with('/') {
111            path.to_string()
112        } else {
113            format!("/{}/{}", self.instance_id, path)
114        };
115
116        QueueId {
117            path: absolute_path,
118        }
119    }
120}
121
122#[derive(Debug, Clone, Copy, PartialEq, Eq)]
123pub enum DataSource {
124    None,
125    Cloud,
126    DiskStore,
127    DiskWal,
128    Memory,
129}
130
131#[derive(Debug, Clone, Copy, PartialEq, Eq)]
132#[repr(u64)]
133pub enum CompressionType {
134    None = 0,
135    Gzip = 1,
136    Xz = 2,
137    Zstd = 3,
138}
139
140impl TryFrom<u64> for CompressionType {
141    type Error = String;
142
143    fn try_from(value: u64) -> Result<Self, Self::Error> {
144        match value {
145            0 => Ok(CompressionType::None),
146            1 => Ok(CompressionType::Gzip),
147            2 => Ok(CompressionType::Xz),
148            3 => Ok(CompressionType::Zstd),
149            _ => Err(format!("Unsupported compression type: {}", value)),
150        }
151    }
152}
153
154impl From<CompressionType> for u64 {
155    fn from(v: CompressionType) -> Self {
156        v as u64
157    }
158}
159
160#[derive(Debug, Clone, Copy, PartialEq, Eq)]
161#[repr(u64)]
162pub enum EncryptionType {
163    None = 0,
164    Aes = 1,
165}
166
167impl TryFrom<u64> for EncryptionType {
168    type Error = String;
169
170    fn try_from(value: u64) -> Result<Self, Self::Error> {
171        match value {
172            0 => Ok(EncryptionType::None),
173            1 => Ok(EncryptionType::Aes),
174            _ => Err(format!("Unsupported encryption type: {}", value)),
175        }
176    }
177}
178
179impl From<EncryptionType> for u64 {
180    fn from(v: EncryptionType) -> Self {
181        v as u64
182    }
183}
184
185#[derive(Debug, Clone, PartialEq, Eq)]
186pub enum ReadPosition {
187    Absolute(UintN),
188    ShiftFromTail(UintN),
189}
190
191impl ReadPosition {
192    pub fn absolute(offset: UintN) -> Self {
193        Self::Absolute(offset)
194    }
195
196    pub fn shift_from_tail(offset: UintN) -> Self {
197        Self::ShiftFromTail(offset)
198    }
199
200    pub fn offset(&self) -> &UintN {
201        match self {
202            Self::Absolute(offset) => offset,
203            Self::ShiftFromTail(offset) => offset,
204        }
205    }
206
207    pub fn is_absolute(&self) -> bool {
208        matches!(self, Self::Absolute(_))
209    }
210
211    pub fn is_tail_relative(&self) -> bool {
212        matches!(self, Self::ShiftFromTail(_))
213    }
214}
215
216#[derive(Debug, Clone)]
217pub struct ReadEntry {
218    pub id: UintN,
219    pub data: Bytes,
220    pub source: DataSource,
221}
222
223impl ReadEntry {
224    pub fn new(id: UintN, data: Bytes, source: DataSource) -> Self {
225        Self { id, data, source }
226    }
227}
228
229#[cfg(test)]
230mod queue_id_test;