Skip to main content

hyperi_rustlib/spool/
queue.rs

1// Project:   hyperi-rustlib
2// File:      src/spool/queue.rs
3// Purpose:   Disk-backed async FIFO queue implementation using yaque
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Disk-backed async FIFO queue implementation.
10
11use crate::spool::{Result, SpoolConfig, SpoolError};
12use std::path::Path;
13use yaque::{Receiver, Sender};
14
/// Disk-backed async FIFO queue with optional compression.
///
/// Crash-safe writes, survives restarts. Built on
/// [yaque](https://crates.io/crates/yaque) (transactional persistent queue).
///
/// The item count is tracked in memory alongside the on-disk queue; it is
/// seeded when the spool is opened and adjusted on every push and removal.
pub struct Spool {
    /// Write half of the yaque channel; used by `push`.
    sender: Sender,
    /// Read half of the yaque channel; used by every peek/pop/recv/clear path.
    receiver: Receiver,
    /// Settings this spool was opened with (path, compression, size caps).
    config: SpoolConfig,
    /// Tracked number of queued items; approximate, see `len()`.
    len: usize,
}
25
26impl Spool {
27    /// Open the queue at the configured path, creating it if absent.
28    ///
29    /// # Errors
30    ///
31    /// Returns an error if the queue cannot be opened or created.
32    pub async fn open(config: SpoolConfig) -> Result<Self> {
33        let (sender, receiver) = yaque::channel(&config.path).map_err(|e| SpoolError::Open {
34            path: config.path.display().to_string(),
35            message: e.to_string(),
36        })?;
37
38        // yaque exposes no count API -- parse segment files to count items
39        // between the receiver position and the end.
40        let len = count_existing_items(&config.path).unwrap_or(0);
41
42        Ok(Self {
43            sender,
44            receiver,
45            config,
46            len,
47        })
48    }
49
50    /// Create a new spool at the given path with default settings.
51    ///
52    /// # Errors
53    ///
54    /// Returns an error if the queue cannot be created.
55    pub async fn create(path: impl AsRef<Path>) -> Result<Self> {
56        Self::open(SpoolConfig::new(path.as_ref())).await
57    }
58
59    /// Create a new spool with compression enabled.
60    ///
61    /// # Errors
62    ///
63    /// Returns an error if the queue cannot be created.
64    pub async fn create_compressed(path: impl AsRef<Path>) -> Result<Self> {
65        Self::open(SpoolConfig::with_compression(path.as_ref())).await
66    }
67
68    /// Push data onto the queue, compressing first if enabled.
69    ///
70    /// # Errors
71    ///
72    /// Errors on size/item-count cap, compression failure, or I/O.
73    pub async fn push(&mut self, data: &[u8]) -> Result<()> {
74        if let Some(max) = self.config.max_items
75            && self.len >= max
76        {
77            return Err(SpoolError::MaxItemsReached { max });
78        }
79
80        // Approximate -- checked before write.
81        if let Some(max_bytes) = self.config.max_size_bytes
82            && self.file_size()? >= max_bytes
83        {
84            return Err(SpoolError::MaxSizeReached { max_bytes });
85        }
86
87        let to_write = if self.config.compress {
88            self.compress(data)?
89        } else {
90            data.to_vec()
91        };
92
93        self.sender
94            .send(to_write)
95            .await
96            .map_err(|e| SpoolError::Queue(e.to_string()))?;
97
98        self.len += 1;
99        #[cfg(feature = "metrics")]
100        ::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
101        Ok(())
102    }
103
104    /// Peek at the first item without removing it.
105    ///
106    /// yaque has no direct peek -- `try_recv` then let the guard roll back
107    /// on drop to leave the item in the queue.
108    ///
109    /// # Errors
110    ///
111    /// Returns an error if decompression fails or an I/O error occurs.
112    pub async fn peek(&mut self) -> Result<Option<Vec<u8>>> {
113        match self.receiver.try_recv() {
114            Ok(guard) => {
115                let raw_data = guard.to_vec();
116                let data = if self.config.compress {
117                    zstd::decode_all(raw_data.as_slice())
118                        .map_err(|e| SpoolError::Decompression(e.to_string()))?
119                } else {
120                    raw_data
121                };
122                // No commit -- guard rollback on drop keeps the item.
123                drop(guard);
124                Ok(Some(data))
125            }
126            Err(yaque::TryRecvError::Io(e)) => Err(SpoolError::Io(e)),
127            Err(yaque::TryRecvError::QueueEmpty) => Ok(None),
128        }
129    }
130
131    /// Remove the first item from the queue.
132    ///
133    /// # Errors
134    ///
135    /// Returns an error if an I/O error occurs.
136    pub async fn pop(&mut self) -> Result<()> {
137        match self.receiver.try_recv() {
138            Ok(guard) => {
139                guard
140                    .commit()
141                    .map_err(|e| SpoolError::Queue(e.to_string()))?;
142                self.len = self.len.saturating_sub(1);
143                Ok(())
144            }
145            Err(yaque::TryRecvError::Io(e)) => Err(SpoolError::Io(e)),
146            Err(yaque::TryRecvError::QueueEmpty) => Ok(()), // Nothing to pop
147        }
148    }
149
150    /// Pop and return the first item, atomically receiving and removing it.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if decompression fails or an I/O error occurs.
155    pub async fn pop_front(&mut self) -> Result<Option<Vec<u8>>> {
156        match self.receiver.try_recv() {
157            Ok(guard) => {
158                let raw_data = guard.to_vec();
159                let data = if self.config.compress {
160                    zstd::decode_all(raw_data.as_slice())
161                        .map_err(|e| SpoolError::Decompression(e.to_string()))?
162                } else {
163                    raw_data
164                };
165                guard
166                    .commit()
167                    .map_err(|e| SpoolError::Queue(e.to_string()))?;
168                self.len = self.len.saturating_sub(1);
169                #[cfg(feature = "metrics")]
170                ::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
171                Ok(Some(data))
172            }
173            Err(yaque::TryRecvError::Io(e)) => Err(SpoolError::Io(e)),
174            Err(yaque::TryRecvError::QueueEmpty) => Ok(None),
175        }
176    }
177
178    /// Receive an item, awaiting if the queue is empty. Preferred consumer API.
179    ///
180    /// # Errors
181    ///
182    /// Returns an error if decompression fails or an I/O error occurs.
183    pub async fn recv(&mut self) -> Result<Vec<u8>> {
184        let guard = self
185            .receiver
186            .recv()
187            .await
188            .map_err(|e| SpoolError::Queue(e.to_string()))?;
189
190        let raw_data = guard.to_vec();
191        let data = if self.config.compress {
192            zstd::decode_all(raw_data.as_slice())
193                .map_err(|e| SpoolError::Decompression(e.to_string()))?
194        } else {
195            raw_data
196        };
197
198        guard
199            .commit()
200            .map_err(|e| SpoolError::Queue(e.to_string()))?;
201        self.len = self.len.saturating_sub(1);
202        #[cfg(feature = "metrics")]
203        ::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
204        Ok(data)
205    }
206
207    /// Approximate item count, tracked internally.
208    #[must_use]
209    pub fn len(&self) -> usize {
210        self.len
211    }
212
213    /// Check if the queue is empty.
214    #[must_use]
215    pub fn is_empty(&self) -> bool {
216        self.len == 0
217    }
218
219    /// Drain all items from the queue.
220    ///
221    /// # Errors
222    ///
223    /// Returns an error if an I/O error occurs.
224    pub fn clear(&mut self) -> Result<()> {
225        // yaque has no built-in clear -- drain by committing every item.
226        loop {
227            match self.receiver.try_recv() {
228                Ok(guard) => {
229                    guard
230                        .commit()
231                        .map_err(|e| SpoolError::Queue(e.to_string()))?;
232                }
233                Err(yaque::TryRecvError::QueueEmpty) => break,
234                Err(yaque::TryRecvError::Io(e)) => return Err(SpoolError::Io(e)),
235            }
236        }
237        self.len = 0;
238        #[cfg(feature = "metrics")]
239        ::metrics::gauge!("dfe_spool_queue_depth").set(0.0);
240        Ok(())
241    }
242
243    /// Get the configuration for this spool.
244    #[must_use]
245    pub fn config(&self) -> &SpoolConfig {
246        &self.config
247    }
248
249    /// Get the approximate directory size in bytes.
250    ///
251    /// # Errors
252    ///
253    /// Returns an error if the directory cannot be read.
254    pub fn file_size(&self) -> Result<u64> {
255        let mut total = 0u64;
256        if self.config.path.is_dir() {
257            for entry in std::fs::read_dir(&self.config.path)? {
258                let entry = entry?;
259                if entry.file_type()?.is_file() {
260                    total += entry.metadata()?.len();
261                }
262            }
263        }
264        Ok(total)
265    }
266
267    /// Compress data using zstd.
268    fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
269        zstd::encode_all(data, self.config.compression_level)
270            .map_err(|e| SpoolError::Compression(e.to_string()))
271    }
272}
273
/// Count items in a yaque queue dir by walking segment files.
///
/// yaque stores messages as `[4-byte Hamming header][payload]` in `<n>.q`
/// segments; receiver position lives in `recv-metadata`. Count from the
/// receiver position to the end of the highest segment.
///
/// Malformed on-disk state never panics: an out-of-range receiver position
/// or a truncated trailing item simply contributes nothing to the count.
/// Returns 0 if `path` is not a directory.
///
/// # Errors
///
/// Returns any I/O error raised while listing the directory, reading
/// `recv-metadata` (a missing file is not an error), or reading a segment.
fn count_existing_items(path: &std::path::Path) -> std::io::Result<usize> {
    if !path.is_dir() {
        return Ok(0);
    }

    let (recv_segment, recv_position) = read_receiver_state(path)?;

    // Collect every `<n>.q` segment at or after the receiver's segment,
    // remembering the path we actually found on disk.
    let mut segments: Vec<(u64, std::path::PathBuf)> = Vec::new();
    for entry in std::fs::read_dir(path)? {
        let file_path = entry?.path();
        if file_path.extension().and_then(|e| e.to_str()) != Some("q") {
            continue;
        }
        let seg_num = file_path
            .file_stem()
            .and_then(|s| s.to_str())
            .and_then(|s| s.parse::<u64>().ok());
        match seg_num {
            Some(n) if n >= recv_segment => segments.push((n, file_path)),
            _ => {}
        }
    }
    segments.sort_unstable_by_key(|(n, _)| *n);

    let mut count = 0usize;
    for (seg_num, seg_path) in &segments {
        let file_data = std::fs::read(seg_path)?;

        // Only the receiver's own segment is partially consumed. A position
        // that does not fit in `usize` cannot lie inside the file, so it is
        // mapped to an offset the scanner treats as "past the end".
        let start = if *seg_num == recv_segment {
            usize::try_from(recv_position).unwrap_or(usize::MAX)
        } else {
            0
        };

        count += count_items_in_segment(&file_data, start);
    }

    Ok(count)
}

/// Read the receiver's `(segment, position)` from `recv-metadata`
/// (two big-endian u64), defaulting to `(0, 0)` when the file is missing or
/// shorter than 16 bytes.
fn read_receiver_state(path: &std::path::Path) -> std::io::Result<(u64, u64)> {
    let data = match std::fs::read(path.join("recv-metadata")) {
        Ok(bytes) => bytes,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok((0, 0)),
        Err(e) => return Err(e),
    };

    let be_u64 = |bytes: &[u8]| {
        let mut buf = [0u8; 8];
        buf.copy_from_slice(bytes);
        u64::from_be_bytes(buf)
    };

    Ok(match data.get(..16) {
        Some(head) => {
            let (segment, position) = head.split_at(8);
            (be_u64(segment), be_u64(position))
        }
        None => (0, 0),
    })
}

/// Count complete `[header][payload]` items in one segment's bytes, starting
/// at byte offset `start` and stopping at the EOF marker, the end of the
/// data, or the first item that would run past the end.
fn count_items_in_segment(data: &[u8], start: usize) -> usize {
    const HEADER_LEN: usize = 4;
    // Header EOF marker in yaque
    const HEADER_EOF: [u8; 4] = [255, 255, 255, 255];
    // Length lives in the lower 26 bits of the Hamming-encoded header.
    const LEN_MASK: u32 = 0x03_FF_FF_FF;

    let mut count = 0usize;
    let mut pos = start;
    loop {
        // Checked arithmetic: `start` may come from corrupt metadata.
        let header_end = match pos.checked_add(HEADER_LEN) {
            Some(end) if end <= data.len() => end,
            _ => break,
        };
        let header = &data[pos..header_end];
        if header == &HEADER_EOF[..] {
            break; // End of segment
        }

        let encoded = u32::from_be_bytes([header[0], header[1], header[2], header[3]]);
        let payload_len = (encoded & LEN_MASK) as usize;

        match header_end.checked_add(payload_len) {
            Some(next) if next <= data.len() => {
                count += 1;
                pos = next;
            }
            // Truncated trailing item: not counted, nothing can follow it.
            _ => break,
        }
    }
    count
}
352
353impl std::fmt::Debug for Spool {
354    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
355        f.debug_struct("Spool")
356            .field("path", &self.config.path)
357            .field("len", &self.len)
358            .field("compress", &self.config.compress)
359            .finish_non_exhaustive()
360    }
361}
362
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Fresh temp directory plus a queue path named `name` inside it.
    ///
    /// The returned `TempDir` must stay bound for as long as the path is in
    /// use; dropping it removes the directory.
    fn scratch(name: &str) -> (tempfile::TempDir, std::path::PathBuf) {
        let root = tempdir().unwrap();
        let queue_path = root.path().join(name);
        (root, queue_path)
    }

    /// Items come back in FIFO order and the length tracks pushes and pops.
    #[tokio::test]
    async fn test_create_and_push_pop() {
        let (_root, queue_path) = scratch("test-queue");

        let mut queue = Spool::create(&queue_path).await.unwrap();
        assert!(queue.is_empty());

        queue.push(b"hello").await.unwrap();
        queue.push(b"world").await.unwrap();

        assert_eq!(queue.len(), 2);
        assert!(!queue.is_empty());

        let first = queue.pop_front().await.unwrap();
        assert_eq!(first, Some(b"hello".to_vec()));
        let second = queue.pop_front().await.unwrap();
        assert_eq!(second, Some(b"world".to_vec()));

        assert!(queue.is_empty());
    }

    /// Popping from an empty queue yields `None` rather than an error.
    #[tokio::test]
    async fn test_pop_front_empty() {
        let (_root, queue_path) = scratch("test-queue");

        let mut queue = Spool::create(&queue_path).await.unwrap();
        let popped = queue.pop_front().await.unwrap();
        assert_eq!(popped, None);
    }

    /// A compressed spool round-trips data unchanged.
    #[tokio::test]
    async fn test_compression() {
        let (_root, queue_path) = scratch("test-queue");

        let mut queue = Spool::create_compressed(&queue_path).await.unwrap();

        let payload = b"hello world ".repeat(100);
        queue.push(&payload).await.unwrap();

        // Verify decompression works - data comes back correctly
        let round_tripped = queue.pop_front().await.unwrap().unwrap();
        assert_eq!(round_tripped, payload);
    }

    /// The item cap rejects the first push beyond the limit.
    #[tokio::test]
    async fn test_max_items_limit() {
        let (_root, queue_path) = scratch("test-queue");

        let capped = SpoolConfig::new(&queue_path).max_items(2);
        let mut queue = Spool::open(capped).await.unwrap();

        queue.push(b"one").await.unwrap();
        queue.push(b"two").await.unwrap();

        let overflow = queue.push(b"three").await;
        assert!(matches!(
            overflow,
            Err(SpoolError::MaxItemsReached { max: 2 })
        ));
    }

    /// `clear` leaves the queue empty.
    #[tokio::test]
    async fn test_clear() {
        let (_root, queue_path) = scratch("test-queue");

        let mut queue = Spool::create(&queue_path).await.unwrap();
        queue.push(b"one").await.unwrap();
        queue.push(b"two").await.unwrap();

        assert_eq!(queue.len(), 2);
        queue.clear().unwrap();
        assert!(queue.is_empty());
    }

    /// The tracked length is rebuilt from disk when a spool is reopened.
    #[tokio::test]
    async fn test_len_survives_reopen() {
        let (_root, queue_path) = scratch("test-reopen-queue");

        // Open, push items, then drop
        {
            let mut writer = Spool::create(&queue_path).await.unwrap();
            writer.push(b"one").await.unwrap();
            writer.push(b"two").await.unwrap();
            writer.push(b"three").await.unwrap();
            assert_eq!(writer.len(), 3);
        }

        // Reopen -- len should reflect existing items
        {
            let reopened = Spool::create(&queue_path).await.unwrap();
            assert_eq!(reopened.len(), 3);
        }
    }

    /// Consumed items are not counted again after a reopen.
    #[tokio::test]
    async fn test_len_survives_partial_consume_and_reopen() {
        let (_root, queue_path) = scratch("test-partial-queue");

        // Open, push 5, consume 2
        {
            let mut writer = Spool::create(&queue_path).await.unwrap();
            for i in 0..5 {
                let payload = format!("item-{i}");
                writer.push(payload.as_bytes()).await.unwrap();
            }
            assert_eq!(writer.len(), 5);
            writer.pop_front().await.unwrap(); // consume 1
            writer.pop_front().await.unwrap(); // consume 2
            assert_eq!(writer.len(), 3);
        }

        // Reopen -- should show 3 remaining
        {
            let reopened = Spool::create(&queue_path).await.unwrap();
            assert_eq!(reopened.len(), 3);
        }
    }

    /// The debug output names the type and includes the queue path.
    #[tokio::test]
    async fn test_debug_format() {
        let (_root, queue_path) = scratch("test-queue");

        let queue = Spool::create(&queue_path).await.unwrap();
        let rendered = format!("{queue:?}");
        assert!(rendered.contains("Spool"));
        assert!(rendered.contains("test-queue"));
    }
}