Skip to main content

khata_rs/
queue.rs

1//! khata-rs Queue implementation.
2
3use std::collections::HashMap;
4use std::fs;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use parking_lot::RwLock;
10
11use crate::cycle::{FastDaily, RollCycle};
12use crate::error::{Error, Result};
13use crate::reader::Reader;
14use crate::store::{Store, StoreConfig};
15use crate::writer::Writer;
16
/// Extension (without the leading dot) carried by every cycle data file,
/// following the Chronicle Queue `.cq4` naming scheme.
pub const FILE_EXTENSION: &str = "cq4";
19
20/// Builder for creating a [`Queue`].
21///
22/// # Example
23///
24/// ```no_run
25/// use khata_rs::{QueueBuilder, cycle::FastHourly};
26///
27/// let queue = QueueBuilder::new("/tmp/my-queue")
28///     .roll_cycle(FastHourly)
29///     .block_size(32 * 1024 * 1024)
30///     .build()?;
31/// # Ok::<(), khata_rs::Error>(())
32/// ```
33pub struct QueueBuilder {
34    path: PathBuf,
35    roll_cycle: Arc<dyn RollCycle>,
36    block_size: usize,
37    index_spacing: u32,
38    index_count: u32,
39    epoch: u64,
40    read_only: bool,
41    enable_checksums: bool,
42}
43
44impl QueueBuilder {
45    /// Creates a new builder for a queue at the given path.
46    pub fn new(path: impl Into<PathBuf>) -> Self {
47        Self {
48            path: path.into(),
49            roll_cycle: Arc::new(FastDaily),
50            block_size: 64 * 1024 * 1024,
51            index_spacing: 256,
52            index_count: 4096,
53            epoch: 0,
54            read_only: false,
55            enable_checksums: false,
56        }
57    }
58
59    /// Sets the roll cycle.
60    pub fn roll_cycle<R: RollCycle + 'static>(mut self, cycle: R) -> Self {
61        self.roll_cycle = Arc::new(cycle);
62        self
63    }
64
65    /// Sets the block size for memory-mapped files (default: 64 MB).
66    pub fn block_size(mut self, size: usize) -> Self {
67        self.block_size = size;
68        self
69    }
70
71    /// Sets the index spacing (default: 256)
72    /// Has to be a power of two for bitwise masks
73    pub fn index_spacing(mut self, spacing: u32) -> Self {
74        self.index_spacing = spacing.next_power_of_two();
75        self
76    }
77
78    /// Sets the index count per array (default: 4096).
79    /// Has to be a power of two for bitwise masks.
80    pub fn index_count(mut self, count: u32) -> Self {
81        self.index_count = count.next_power_of_two();
82        self
83    }
84
85    /// Sets the epoch offset in milliseconds (default: 0).
86    pub fn epoch(mut self, epoch: u64) -> Self {
87        self.epoch = epoch;
88        self
89    }
90
91    /// Opens in read-only mode.
92    pub fn read_only(mut self, read_only: bool) -> Self {
93        self.read_only = read_only;
94        self
95    }
96
97    /// Enables or disables CRC-16 checksums for data integrity (default: true).
98    ///
99    /// When enabled, each message is written with a CRC-16 checksum that can
100    /// be verified on read. Disabling checksums reduces latency but removes
101    /// data integrity verification.
102    pub fn checksums(mut self, enable: bool) -> Self {
103        self.enable_checksums = enable;
104        self
105    }
106
107    /// Builds the queue.
108    pub fn build(self) -> Result<Queue> {
109        Queue::open(self)
110    }
111}
112
113/// A persisted, low-latency message queue.
114///
115/// Uses memory-mapped files organized into cycle files based on time.
116///
117/// # Thread Safety
118///
119/// - One [`Writer`](crate::Writer) per thread
120/// - Multiple [`Reader`]s can read concurrently
121/// - The queue itself is `Send + Sync`
122///
123/// # Example
124///
125/// ```no_run
126/// use khata_rs::Queue;
127///
128/// let queue = Queue::new("/tmp/my-queue").build()?;
129///
130/// let mut writer = queue.writer()?;
131/// writer.write(b"Hello, World!")?;
132/// # Ok::<(), khata_rs::Error>(())
133/// ```
134pub struct Queue {
135    path: PathBuf,
136    roll_cycle: Arc<dyn RollCycle>,
137    block_size: usize,
138    index_spacing: u32,
139    index_count: u32,
140    epoch: u64,
141    read_only: bool,
142    enable_checksums: bool,
143    store_cache: RwLock<HashMap<i32, Arc<Store>>>,
144    directory_listing: RwLock<Vec<i32>>,
145}
146
147impl Queue {
148    /// Creates a new builder for a queue at the given path.
149    #[must_use]
150    pub fn new(path: impl Into<PathBuf>) -> QueueBuilder {
151        QueueBuilder::new(path)
152    }
153
154    /// Opens or creates a queue from builder config.
155    fn open(builder: QueueBuilder) -> Result<Self> {
156        if !builder.path.exists() {
157            if builder.read_only {
158                return Err(Error::StoreFileMissing(builder.path));
159            }
160            fs::create_dir_all(&builder.path)
161                .map_err(|_| Error::DirectoryCreationFailed(builder.path.clone()))?;
162        }
163
164        let queue = Self {
165            path: builder.path,
166            roll_cycle: builder.roll_cycle,
167            block_size: builder.block_size,
168            index_spacing: builder.index_spacing,
169            index_count: builder.index_count,
170            epoch: builder.epoch,
171            read_only: builder.read_only,
172            enable_checksums: builder.enable_checksums,
173            store_cache: RwLock::new(HashMap::new()),
174            directory_listing: RwLock::new(Vec::new()),
175        };
176
177        queue.refresh_directory_listing()?;
178        Ok(queue)
179    }
180
181    /// Creates a writer for appending messages.
182    ///
183    /// # Errors
184    ///
185    /// Returns `Error::ReadOnly` if opened in read-only mode.
186    pub fn writer(&self) -> Result<Writer<'_>> {
187        if self.read_only {
188            return Err(Error::ReadOnly);
189        }
190        Writer::new(self)
191    }
192
193    /// Creates a reader for consuming messages.
194    pub fn reader(&self) -> Result<Reader<'_>> {
195        Reader::new(self)
196    }
197
198    /// Returns the queue directory path.
199    #[must_use]
200    pub fn path(&self) -> &Path {
201        &self.path
202    }
203
204    /// Returns the roll cycle configuration.
205    #[must_use]
206    pub fn roll_cycle(&self) -> &dyn RollCycle {
207        self.roll_cycle.as_ref()
208    }
209
210    /// Returns the block size.
211    #[must_use]
212    pub fn block_size(&self) -> usize {
213        self.block_size
214    }
215
216    /// Returns the epoch offset.
217    #[must_use]
218    pub fn epoch(&self) -> u64 {
219        self.epoch
220    }
221
222    /// Returns whether the queue is read-only.
223    #[must_use]
224    pub fn is_read_only(&self) -> bool {
225        self.read_only
226    }
227
228    /// Returns whether CRC-16 checksums are enabled.
229    #[must_use]
230    pub fn checksums_enabled(&self) -> bool {
231        self.enable_checksums
232    }
233
234    /// Returns the index spacing.
235    #[must_use]
236    pub fn index_spacing(&self) -> u32 {
237        self.index_spacing
238    }
239
240    /// Returns the index count per array.
241    #[must_use]
242    pub fn index_count(&self) -> u32 {
243        self.index_count
244    }
245
246    /// Returns the first (oldest) cycle, if any.
247    #[must_use]
248    pub fn first_cycle(&self) -> Option<i32> {
249        self.directory_listing.read().first().copied()
250    }
251
252    /// Returns the last (newest) cycle, if any.
253    #[must_use]
254    pub fn last_cycle(&self) -> Option<i32> {
255        self.directory_listing.read().last().copied()
256    }
257
258    /// Returns the current cycle based on current time.
259    #[must_use]
260    pub fn current_cycle(&self) -> i32 {
261        let now = SystemTime::now()
262            .duration_since(UNIX_EPOCH)
263            .map(|d| d.as_millis() as u64)
264            .unwrap_or(0);
265        self.roll_cycle.current_cycle(now, self.epoch)
266    }
267
268    /// Acquires a store for the given cycle.
269    pub(crate) fn acquire_store(&self, cycle: i32) -> Result<Arc<Store>> {
270        {
271            let cache = self.store_cache.read();
272            if let Some(store) = cache.get(&cycle) {
273                return Ok(Arc::clone(store));
274            }
275        }
276
277        let store = Arc::new(self.create_store(cycle)?);
278
279        {
280            let mut cache = self.store_cache.write();
281            cache.insert(cycle, Arc::clone(&store));
282        }
283
284        {
285            let mut listing = self.directory_listing.write();
286            if !listing.contains(&cycle) {
287                listing.push(cycle);
288                listing.sort_unstable();
289            }
290        }
291
292        Ok(store)
293    }
294
295    /// Creates a new store for the given cycle.
296    fn create_store(&self, cycle: i32) -> Result<Store> {
297        let filename = format!(
298            "{}.{}",
299            self.roll_cycle.format_cycle(cycle, self.epoch),
300            FILE_EXTENSION
301        );
302        let config = StoreConfig {
303            block_size: self.block_size,
304            enable_checksums: self.enable_checksums,
305            ..StoreConfig::default()
306        };
307        Store::open(self.path.join(filename), cycle, config, self.read_only)
308    }
309
310    /// Generates the filename for a given cycle.
311    #[must_use]
312    pub fn filename_for_cycle(&self, cycle: i32) -> String {
313        format!(
314            "{}.{}",
315            self.roll_cycle.format_cycle(cycle, self.epoch),
316            FILE_EXTENSION
317        )
318    }
319
320    /// Refreshes the directory listing.
321    fn refresh_directory_listing(&self) -> Result<()> {
322        let mut cycles = Vec::new();
323
324        if let Ok(entries) = fs::read_dir(&self.path) {
325            for entry in entries.flatten() {
326                let path = entry.path();
327                if path.extension().is_some_and(|ext| ext == FILE_EXTENSION) {
328                    if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
329                        if let Some(cycle) = self.roll_cycle.parse_cycle(stem, self.epoch) {
330                            cycles.push(cycle);
331                        }
332                    }
333                }
334            }
335        }
336
337        cycles.sort_unstable();
338
339        *self.directory_listing.write() = cycles;
340        Ok(())
341    }
342}
343
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_builder_defaults() {
        let builder = QueueBuilder::new("/tmp/test");
        assert_eq!(builder.block_size, 64 * 1024 * 1024);
        assert_eq!(builder.index_spacing, 256);
        assert_eq!(builder.index_count, 4096);
        assert_eq!(builder.epoch, 0);
        assert!(!builder.read_only);
        assert!(!builder.enable_checksums);
    }

    #[test]
    fn test_builder_rounds_index_settings_to_powers_of_two() {
        let builder = QueueBuilder::new("/tmp/test")
            .index_spacing(100)
            .index_count(3000);
        assert_eq!(builder.index_spacing, 128);
        assert_eq!(builder.index_count, 4096);

        // Powers of two are kept as-is; zero rounds up to one.
        let builder = QueueBuilder::new("/tmp/test")
            .index_spacing(64)
            .index_count(0);
        assert_eq!(builder.index_spacing, 64);
        assert_eq!(builder.index_count, 1);
    }

    #[test]
    fn test_checksums_enabled() {
        let temp_dir = TempDir::new().unwrap();
        let queue = Queue::new(temp_dir.path()).checksums(true).build().unwrap();

        assert!(queue.checksums_enabled());
    }

    #[test]
    fn test_creation() {
        let temp_dir = TempDir::new().unwrap();
        let queue = Queue::new(temp_dir.path()).build().unwrap();

        assert!(queue.path().exists());
        assert!(!queue.is_read_only());
    }

    #[test]
    fn test_creates_missing_nested_directory() {
        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().join("nested").join("queue");
        let queue = Queue::new(path.clone()).build().unwrap();

        assert!(queue.path().is_dir());
        assert_eq!(queue.first_cycle(), None);
        assert_eq!(queue.last_cycle(), None);
    }

    #[test]
    fn test_read_only_missing_directory_fails() {
        let temp_dir = TempDir::new().unwrap();
        let missing = temp_dir.path().join("does-not-exist");
        let result = Queue::new(missing.clone()).read_only(true).build();

        assert!(matches!(result, Err(Error::StoreFileMissing(_))));
        // Read-only mode must never create the directory.
        assert!(!missing.exists());
    }

    #[test]
    fn test_read_only_rejects_writer() {
        let temp_dir = TempDir::new().unwrap();
        let queue = Queue::new(temp_dir.path()).read_only(true).build().unwrap();

        assert!(queue.is_read_only());
        assert!(matches!(queue.writer(), Err(Error::ReadOnly)));
    }

    #[test]
    fn test_acquire_store_is_cached_and_listed() {
        let temp_dir = TempDir::new().unwrap();
        let queue = Queue::new(temp_dir.path()).build().unwrap();
        let cycle = queue.current_cycle();

        let first = queue.acquire_store(cycle).unwrap();
        let second = queue.acquire_store(cycle).unwrap();
        assert!(Arc::ptr_eq(&first, &second), "same cycle must share one store");
        assert_eq!(queue.first_cycle(), Some(cycle));
        assert_eq!(queue.last_cycle(), Some(cycle));

        // A fresh queue over the same directory rediscovers the cycle file.
        drop(first);
        drop(second);
        drop(queue);
        let reopened = Queue::new(temp_dir.path()).build().unwrap();
        assert_eq!(reopened.first_cycle(), Some(cycle));
        assert_eq!(reopened.last_cycle(), Some(cycle));
    }

    #[test]
    fn test_filename_for_cycle() {
        let temp_dir = TempDir::new().unwrap();
        let queue = Queue::new(temp_dir.path()).build().unwrap();

        assert_eq!(queue.filename_for_cycle(0), "19700101.cq4");
    }

    /// One writer and two readers on the same thread. This exercises reader
    /// independence, not concurrent access.
    #[test]
    fn test_single_producer_multiple_consumer() {
        let temp_dir = TempDir::new().unwrap();
        let queue = Queue::new(temp_dir.path()).build().unwrap();

        let mut writer = queue.writer().unwrap();

        // Write multiple messages
        let messages: Vec<Vec<u8>> = (0..10)
            .map(|i| format!("message-{}", i).into_bytes())
            .collect();

        for msg in &messages {
            writer.write(msg).unwrap();
        }

        // Create two independent readers
        let mut r1 = queue.reader().unwrap();
        r1.rewind().unwrap();

        let mut r2 = queue.reader().unwrap();
        r2.rewind().unwrap();

        // Both readers should independently read all messages
        for expected in &messages {
            let read1 = r1.read().unwrap();
            assert!(read1.is_some(), "r1 should have data");
            assert_eq!(read1.unwrap().as_ref(), expected.as_slice());

            let read2 = r2.read().unwrap();
            assert!(read2.is_some(), "r2 should have data");
            assert_eq!(read2.unwrap().as_ref(), expected.as_slice());
        }

        // Both readers should now be at the end
        assert!(r1.read().unwrap().is_none());
        assert!(r2.read().unwrap().is_none());
    }
}