Skip to main content

hyperi_rustlib/spool/
queue.rs

1// Project:   hyperi-rustlib
2// File:      src/spool/queue.rs
3// Purpose:   Disk-backed async FIFO queue implementation using yaque
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Disk-backed async FIFO queue implementation.
10
11use crate::spool::{Result, SpoolConfig, SpoolError};
12use std::path::Path;
13use yaque::{Receiver, Sender};
14
15/// A disk-backed async FIFO queue with optional compression.
16///
17/// Provides persistent storage for binary data with crash-safe writes.
18/// Items are stored in FIFO order and survive application restarts.
19///
20/// Built on [yaque](https://crates.io/crates/yaque), a fast, async,
21/// persistent queue with transactional semantics.
22pub struct Spool {
23    sender: Sender,
24    receiver: Receiver,
25    config: SpoolConfig,
26    len: usize,
27}
28
29impl Spool {
30    /// Open or create a spool at the configured path.
31    ///
32    /// If the directory exists, opens the existing queue.
33    /// If the directory doesn't exist, creates a new queue.
34    ///
35    /// # Errors
36    ///
37    /// Returns an error if the queue cannot be opened or created.
38    pub async fn open(config: SpoolConfig) -> Result<Self> {
39        let (sender, receiver) = yaque::channel(&config.path).map_err(|e| SpoolError::Open {
40            path: config.path.display().to_string(),
41            message: e.to_string(),
42        })?;
43
44        // Count existing items by walking the queue directory.
45        // yaque doesn't expose a count API, so we parse segment files to
46        // count messages between the receiver position and sender position.
47        let len = count_existing_items(&config.path).unwrap_or(0);
48
49        Ok(Self {
50            sender,
51            receiver,
52            config,
53            len,
54        })
55    }
56
57    /// Create a new spool at the given path with default settings.
58    ///
59    /// # Errors
60    ///
61    /// Returns an error if the queue cannot be created.
62    pub async fn create(path: impl AsRef<Path>) -> Result<Self> {
63        Self::open(SpoolConfig::new(path.as_ref())).await
64    }
65
66    /// Create a new spool with compression enabled.
67    ///
68    /// # Errors
69    ///
70    /// Returns an error if the queue cannot be created.
71    pub async fn create_compressed(path: impl AsRef<Path>) -> Result<Self> {
72        Self::open(SpoolConfig::with_compression(path.as_ref())).await
73    }
74
75    /// Push data onto the queue.
76    ///
77    /// If compression is enabled, the data is compressed before storage.
78    ///
79    /// # Errors
80    ///
81    /// Returns an error if:
82    /// - The queue has reached its maximum item count
83    /// - Compression fails
84    /// - I/O error occurs
85    pub async fn push(&mut self, data: &[u8]) -> Result<()> {
86        // Check item limit
87        if let Some(max) = self.config.max_items
88            && self.len >= max
89        {
90            return Err(SpoolError::MaxItemsReached { max });
91        }
92
93        // Check size limit (approximate - check before write)
94        if let Some(max_bytes) = self.config.max_size_bytes
95            && self.file_size()? >= max_bytes
96        {
97            return Err(SpoolError::MaxSizeReached { max_bytes });
98        }
99
100        let to_write = if self.config.compress {
101            self.compress(data)?
102        } else {
103            data.to_vec()
104        };
105
106        self.sender
107            .send(to_write)
108            .await
109            .map_err(|e| SpoolError::Queue(e.to_string()))?;
110
111        self.len += 1;
112        #[cfg(feature = "metrics")]
113        ::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
114        Ok(())
115    }
116
117    /// Peek at the first item in the queue without removing it.
118    ///
119    /// Note: In yaque, there's no direct peek. This uses try_recv and
120    /// lets the guard rollback on drop.
121    ///
122    /// # Errors
123    ///
124    /// Returns an error if decompression fails or an I/O error occurs.
125    pub async fn peek(&mut self) -> Result<Option<Vec<u8>>> {
126        match self.receiver.try_recv() {
127            Ok(guard) => {
128                // Copy data before any operations
129                let raw_data = guard.to_vec();
130                let data = if self.config.compress {
131                    zstd::decode_all(raw_data.as_slice())
132                        .map_err(|e| SpoolError::Decompression(e.to_string()))?
133                } else {
134                    raw_data
135                };
136                // Don't commit - guard drops and rolls back, keeping item in queue
137                drop(guard);
138                Ok(Some(data))
139            }
140            Err(yaque::TryRecvError::Io(e)) => Err(SpoolError::Io(e)),
141            Err(yaque::TryRecvError::QueueEmpty) => Ok(None),
142        }
143    }
144
145    /// Remove the first item from the queue.
146    ///
147    /// # Errors
148    ///
149    /// Returns an error if an I/O error occurs.
150    pub async fn pop(&mut self) -> Result<()> {
151        match self.receiver.try_recv() {
152            Ok(guard) => {
153                guard
154                    .commit()
155                    .map_err(|e| SpoolError::Queue(e.to_string()))?;
156                self.len = self.len.saturating_sub(1);
157                Ok(())
158            }
159            Err(yaque::TryRecvError::Io(e)) => Err(SpoolError::Io(e)),
160            Err(yaque::TryRecvError::QueueEmpty) => Ok(()), // Nothing to pop
161        }
162    }
163
164    /// Pop and return the first item from the queue.
165    ///
166    /// This atomically receives and removes the item.
167    ///
168    /// # Errors
169    ///
170    /// Returns an error if decompression fails or an I/O error occurs.
171    pub async fn pop_front(&mut self) -> Result<Option<Vec<u8>>> {
172        match self.receiver.try_recv() {
173            Ok(guard) => {
174                // Copy data before any operations
175                let raw_data = guard.to_vec();
176                let data = if self.config.compress {
177                    zstd::decode_all(raw_data.as_slice())
178                        .map_err(|e| SpoolError::Decompression(e.to_string()))?
179                } else {
180                    raw_data
181                };
182                guard
183                    .commit()
184                    .map_err(|e| SpoolError::Queue(e.to_string()))?;
185                self.len = self.len.saturating_sub(1);
186                #[cfg(feature = "metrics")]
187                ::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
188                Ok(Some(data))
189            }
190            Err(yaque::TryRecvError::Io(e)) => Err(SpoolError::Io(e)),
191            Err(yaque::TryRecvError::QueueEmpty) => Ok(None),
192        }
193    }
194
195    /// Receive an item asynchronously, waiting if the queue is empty.
196    ///
197    /// This is the preferred async method for consuming items.
198    ///
199    /// # Errors
200    ///
201    /// Returns an error if decompression fails or an I/O error occurs.
202    pub async fn recv(&mut self) -> Result<Vec<u8>> {
203        let guard = self
204            .receiver
205            .recv()
206            .await
207            .map_err(|e| SpoolError::Queue(e.to_string()))?;
208
209        // Copy data before any operations
210        let raw_data = guard.to_vec();
211        let data = if self.config.compress {
212            zstd::decode_all(raw_data.as_slice())
213                .map_err(|e| SpoolError::Decompression(e.to_string()))?
214        } else {
215            raw_data
216        };
217
218        guard
219            .commit()
220            .map_err(|e| SpoolError::Queue(e.to_string()))?;
221        self.len = self.len.saturating_sub(1);
222        #[cfg(feature = "metrics")]
223        ::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
224        Ok(data)
225    }
226
227    /// Get the approximate number of items in the queue.
228    ///
229    /// Note: This is tracked internally and may not be accurate
230    /// if the queue was opened with existing data.
231    #[must_use]
232    pub fn len(&self) -> usize {
233        self.len
234    }
235
236    /// Check if the queue is empty.
237    #[must_use]
238    pub fn is_empty(&self) -> bool {
239        self.len == 0
240    }
241
242    /// Clear all items from the queue.
243    ///
244    /// This removes all files in the queue directory and recreates it.
245    ///
246    /// # Errors
247    ///
248    /// Returns an error if an I/O error occurs.
249    pub fn clear(&mut self) -> Result<()> {
250        // yaque doesn't have a built-in clear, so we manually clear by
251        // removing all items
252        loop {
253            match self.receiver.try_recv() {
254                Ok(guard) => {
255                    guard
256                        .commit()
257                        .map_err(|e| SpoolError::Queue(e.to_string()))?;
258                }
259                Err(yaque::TryRecvError::QueueEmpty) => break,
260                Err(yaque::TryRecvError::Io(e)) => return Err(SpoolError::Io(e)),
261            }
262        }
263        self.len = 0;
264        #[cfg(feature = "metrics")]
265        ::metrics::gauge!("dfe_spool_queue_depth").set(0.0);
266        Ok(())
267    }
268
269    /// Get the configuration for this spool.
270    #[must_use]
271    pub fn config(&self) -> &SpoolConfig {
272        &self.config
273    }
274
275    /// Get the approximate directory size in bytes.
276    ///
277    /// # Errors
278    ///
279    /// Returns an error if the directory cannot be read.
280    pub fn file_size(&self) -> Result<u64> {
281        let mut total = 0u64;
282        if self.config.path.is_dir() {
283            for entry in std::fs::read_dir(&self.config.path)? {
284                let entry = entry?;
285                if entry.file_type()?.is_file() {
286                    total += entry.metadata()?.len();
287                }
288            }
289        }
290        Ok(total)
291    }
292
293    /// Compress data using zstd.
294    fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
295        zstd::encode_all(data, self.config.compression_level)
296            .map_err(|e| SpoolError::Compression(e.to_string()))
297    }
298}
299
/// Count the unread items in a yaque queue directory by walking its segment files.
///
/// yaque stores messages as `[4-byte Hamming header][payload]` in segment
/// files named `<n>.q`, and persists the receiver position in `recv-metadata`
/// as two big-endian `u64` values (segment, byte position). Items are counted
/// from the receiver position to the end of the highest segment.
///
/// The scan tolerates damaged state instead of panicking:
/// - a missing or short `recv-metadata` is treated as position `(0, 0)`;
/// - a receiver position beyond the end of its segment contributes no items;
/// - a record whose payload is cut short is not counted.
///
/// Returns `Ok(0)` when `path` is not a directory.
///
/// # Errors
///
/// Returns the underlying I/O error if the directory, the metadata file, or a
/// segment file cannot be read.
fn count_existing_items(path: &std::path::Path) -> std::io::Result<usize> {
    if !path.is_dir() {
        return Ok(0);
    }

    // Receiver state: two big-endian u64 values (segment, position).
    let recv_metadata_path = path.join("recv-metadata");
    let (recv_segment, recv_position) = if recv_metadata_path.exists() {
        let raw = std::fs::read(&recv_metadata_path)?;
        let field = |range: std::ops::Range<usize>| -> Option<u64> {
            let bytes = <[u8; 8]>::try_from(raw.get(range)?).ok()?;
            Some(u64::from_be_bytes(bytes))
        };
        match (field(0..8), field(8..16)) {
            (Some(segment), Some(position)) => (segment, position),
            _ => (0, 0),
        }
    } else {
        (0, 0)
    };

    let mut total = 0usize;
    for entry in std::fs::read_dir(path)? {
        let segment_path = entry?.path();
        if segment_path.extension().and_then(|ext| ext.to_str()) != Some("q") {
            continue;
        }

        // Segments the receiver has already moved past hold no unread items.
        let seg_num = match segment_path
            .file_stem()
            .and_then(|stem| stem.to_str())
            .and_then(|stem| stem.parse::<u64>().ok())
        {
            Some(number) if number >= recv_segment => number,
            _ => continue,
        };

        // Only the receiver's own segment starts part-way through. A position
        // that does not fit in `usize` is past the end of any file, which the
        // scanner treats as "nothing left to read".
        let start = if seg_num == recv_segment {
            usize::try_from(recv_position).unwrap_or(usize::MAX)
        } else {
            0
        };

        let data = std::fs::read(&segment_path)?;
        total += count_segment_records(&data, start);
    }

    Ok(total)
}

/// Count the complete records in one segment's bytes, starting at byte offset `start`.
fn count_segment_records(data: &[u8], start: usize) -> usize {
    /// Size in bytes of the header that precedes every payload.
    const HEADER_LEN: usize = 4;
    /// Header value that marks the end of a segment.
    const HEADER_EOF: [u8; HEADER_LEN] = [0xFF; HEADER_LEN];
    /// The payload length occupies the low 26 bits of the header.
    const LEN_MASK: u32 = 0x03FF_FFFF;

    let mut count = 0usize;
    let mut pos = start;

    // `checked_add` + `get` end the loop when fewer than HEADER_LEN bytes
    // remain, and also when `pos` is out of range or close to `usize::MAX`.
    while let Some(header) = pos
        .checked_add(HEADER_LEN)
        .and_then(|end| data.get(pos..end))
    {
        if header == &HEADER_EOF[..] {
            break;
        }

        let encoded = u32::from_be_bytes([header[0], header[1], header[2], header[3]]);
        // At most 26 bits, so the value always fits in `usize`.
        let payload_len = (encoded & LEN_MASK) as usize;

        pos += HEADER_LEN + payload_len;
        if pos > data.len() {
            // The payload runs past the end of the file: a truncated record.
            break;
        }
        count += 1;
    }

    count
}
378
379impl std::fmt::Debug for Spool {
380    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
381        f.debug_struct("Spool")
382            .field("path", &self.config.path)
383            .field("len", &self.len)
384            .field("compress", &self.config.compress)
385            .finish_non_exhaustive()
386    }
387}
388
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use tempfile::{TempDir, tempdir};

    /// Create a fresh temporary directory and a queue path inside it.
    ///
    /// The returned `TempDir` must stay alive for as long as the path is in
    /// use; dropping it removes the directory.
    fn scratch(queue_name: &str) -> (TempDir, PathBuf) {
        let root = tempdir().unwrap();
        let queue_path = root.path().join(queue_name);
        (root, queue_path)
    }

    /// Items come back in insertion order and the length tracks each step.
    #[tokio::test]
    async fn test_create_and_push_pop() {
        let (_root, path) = scratch("test-queue");

        let mut spool = Spool::create(&path).await.unwrap();
        assert!(spool.is_empty());

        spool.push(b"hello").await.unwrap();
        spool.push(b"world").await.unwrap();

        assert_eq!(spool.len(), 2);
        assert!(!spool.is_empty());

        let first = spool.pop_front().await.unwrap();
        assert_eq!(first, Some(b"hello".to_vec()));
        let second = spool.pop_front().await.unwrap();
        assert_eq!(second, Some(b"world".to_vec()));

        assert!(spool.is_empty());
    }

    /// Popping from an empty queue yields `None` rather than an error.
    #[tokio::test]
    async fn test_pop_front_empty() {
        let (_root, path) = scratch("test-queue");

        let mut spool = Spool::create(&path).await.unwrap();
        let popped = spool.pop_front().await.unwrap();
        assert_eq!(popped, None);
    }

    /// A compressed spool hands back exactly the bytes that were pushed.
    #[tokio::test]
    async fn test_compression() {
        let (_root, path) = scratch("test-queue");

        let mut spool = Spool::create_compressed(&path).await.unwrap();

        let data = b"hello world ".repeat(100);
        spool.push(&data).await.unwrap();

        // Round trip: the payload is decompressed on the way out.
        let retrieved = spool.pop_front().await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    /// Pushing beyond `max_items` is rejected with `MaxItemsReached`.
    #[tokio::test]
    async fn test_max_items_limit() {
        let (_root, path) = scratch("test-queue");

        let config = SpoolConfig::new(&path).max_items(2);
        let mut spool = Spool::open(config).await.unwrap();

        spool.push(b"one").await.unwrap();
        spool.push(b"two").await.unwrap();

        let result = spool.push(b"three").await;
        assert!(matches!(
            result,
            Err(SpoolError::MaxItemsReached { max: 2 })
        ));
    }

    /// `clear` leaves the queue empty.
    #[tokio::test]
    async fn test_clear() {
        let (_root, path) = scratch("test-queue");

        let mut spool = Spool::create(&path).await.unwrap();
        spool.push(b"one").await.unwrap();
        spool.push(b"two").await.unwrap();

        assert_eq!(spool.len(), 2);
        spool.clear().unwrap();
        assert!(spool.is_empty());
    }

    /// The length is rebuilt from disk when a queue is reopened.
    #[tokio::test]
    async fn test_len_survives_reopen() {
        let (_root, path) = scratch("test-reopen-queue");

        // First session: push three items, then drop the spool.
        {
            let mut spool = Spool::create(&path).await.unwrap();
            spool.push(b"one").await.unwrap();
            spool.push(b"two").await.unwrap();
            spool.push(b"three").await.unwrap();
            assert_eq!(spool.len(), 3);
        }

        // Second session: the existing items are counted on open.
        {
            let spool = Spool::create(&path).await.unwrap();
            assert_eq!(spool.len(), 3);
        }
    }

    /// Items consumed before a reopen are not counted again.
    #[tokio::test]
    async fn test_len_survives_partial_consume_and_reopen() {
        let (_root, path) = scratch("test-partial-queue");

        // First session: push five, consume two.
        {
            let mut spool = Spool::create(&path).await.unwrap();
            for i in 0..5 {
                let item = format!("item-{i}");
                spool.push(item.as_bytes()).await.unwrap();
            }
            assert_eq!(spool.len(), 5);
            spool.pop_front().await.unwrap(); // consume 1
            spool.pop_front().await.unwrap(); // consume 2
            assert_eq!(spool.len(), 3);
        }

        // Second session: three items remain.
        {
            let spool = Spool::create(&path).await.unwrap();
            assert_eq!(spool.len(), 3);
        }
    }

    /// The debug output names the type and includes the queue path.
    #[tokio::test]
    async fn test_debug_format() {
        let (_root, path) = scratch("test-queue");

        let spool = Spool::create(&path).await.unwrap();
        let debug = format!("{spool:?}");
        for needle in ["Spool", "test-queue"] {
            assert!(debug.contains(needle));
        }
    }
}