Skip to main content

hyperi_rustlib/spool/
queue.rs

// Project:   hyperi-rustlib
// File:      src/spool/queue.rs
// Purpose:   Disk-backed async FIFO queue implementation using yaque
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! Disk-backed async FIFO queue implementation.

11use crate::spool::{Result, SpoolConfig, SpoolError};
12use std::path::Path;
13use yaque::{Receiver, Sender};
14
15/// Disk-backed async FIFO queue with optional compression.
16///
17/// Crash-safe writes, survives restarts. Built on
18/// [yaque](https://crates.io/crates/yaque) (transactional persistent queue).
19pub struct Spool {
20    sender: Sender,
21    receiver: Receiver,
22    config: SpoolConfig,
23    len: usize,
24}
25
26impl Spool {
27    /// Open the queue at the configured path, creating it if absent.
28    ///
29    /// # Errors
30    ///
31    /// Returns an error if the queue cannot be opened or created.
32    pub async fn open(config: SpoolConfig) -> Result<Self> {
33        let (sender, receiver) = yaque::channel(&config.path).map_err(|e| SpoolError::Open {
34            path: config.path.display().to_string(),
35            message: e.to_string(),
36        })?;
37
38        // yaque exposes no count API -- parse segment files to count items
39        // between the receiver position and the end.
40        let len = count_existing_items(&config.path).unwrap_or(0);
41
42        Ok(Self {
43            sender,
44            receiver,
45            config,
46            len,
47        })
48    }
49
50    /// Create a new spool at the given path with default settings.
51    ///
52    /// # Errors
53    ///
54    /// Returns an error if the queue cannot be created.
55    pub async fn create(path: impl AsRef<Path>) -> Result<Self> {
56        Self::open(SpoolConfig::new(path.as_ref())).await
57    }
58
59    /// Create a new spool with compression enabled.
60    ///
61    /// # Errors
62    ///
63    /// Returns an error if the queue cannot be created.
64    pub async fn create_compressed(path: impl AsRef<Path>) -> Result<Self> {
65        Self::open(SpoolConfig::with_compression(path.as_ref())).await
66    }
67
68    /// Push data onto the queue, compressing first if enabled.
69    ///
70    /// # Errors
71    ///
72    /// Errors on size/item-count cap, compression failure, or I/O.
73    pub async fn push(&mut self, data: &[u8]) -> Result<()> {
74        if let Some(max) = self.config.max_items
75            && self.len >= max
76        {
77            return Err(SpoolError::MaxItemsReached { max });
78        }
79
80        // Approximate -- checked before write.
81        if let Some(max_bytes) = self.config.max_size_bytes
82            && self.file_size()? >= max_bytes
83        {
84            return Err(SpoolError::MaxSizeReached { max_bytes });
85        }
86
87        let to_write = if self.config.compress {
88            self.compress(data)?
89        } else {
90            data.to_vec()
91        };
92
93        self.sender
94            .send(to_write)
95            .await
96            .map_err(|e| SpoolError::Queue(e.to_string()))?;
97
98        self.len += 1;
99        #[cfg(feature = "metrics")]
100        {
101            ::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
102            // New default (metrics audit): spill RATE. drain < enqueue => backlog.
103            ::metrics::counter!("dfe_spool_enqueue_total").increment(1);
104        }
105        Ok(())
106    }
107
108    /// Peek at the first item without removing it.
109    ///
110    /// yaque has no direct peek -- `try_recv` then let the guard roll back
111    /// on drop to leave the item in the queue.
112    ///
113    /// # Errors
114    ///
115    /// Returns an error if decompression fails or an I/O error occurs.
116    pub async fn peek(&mut self) -> Result<Option<Vec<u8>>> {
117        match self.receiver.try_recv() {
118            Ok(guard) => {
119                let raw_data = guard.to_vec();
120                let data = if self.config.compress {
121                    zstd::decode_all(raw_data.as_slice())
122                        .map_err(|e| SpoolError::Decompression(e.to_string()))?
123                } else {
124                    raw_data
125                };
126                // No commit -- guard rollback on drop keeps the item.
127                drop(guard);
128                Ok(Some(data))
129            }
130            Err(yaque::TryRecvError::Io(e)) => Err(SpoolError::Io(e)),
131            Err(yaque::TryRecvError::QueueEmpty) => Ok(None),
132        }
133    }
134
135    /// Remove the first item from the queue.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if an I/O error occurs.
140    pub async fn pop(&mut self) -> Result<()> {
141        match self.receiver.try_recv() {
142            Ok(guard) => {
143                guard
144                    .commit()
145                    .map_err(|e| SpoolError::Queue(e.to_string()))?;
146                self.len = self.len.saturating_sub(1);
147                Ok(())
148            }
149            Err(yaque::TryRecvError::Io(e)) => Err(SpoolError::Io(e)),
150            Err(yaque::TryRecvError::QueueEmpty) => Ok(()), // Nothing to pop
151        }
152    }
153
154    /// Pop and return the first item, atomically receiving and removing it.
155    ///
156    /// # Errors
157    ///
158    /// Returns an error if decompression fails or an I/O error occurs.
159    pub async fn pop_front(&mut self) -> Result<Option<Vec<u8>>> {
160        match self.receiver.try_recv() {
161            Ok(guard) => {
162                let raw_data = guard.to_vec();
163                let data = if self.config.compress {
164                    zstd::decode_all(raw_data.as_slice())
165                        .map_err(|e| SpoolError::Decompression(e.to_string()))?
166                } else {
167                    raw_data
168                };
169                guard
170                    .commit()
171                    .map_err(|e| SpoolError::Queue(e.to_string()))?;
172                self.len = self.len.saturating_sub(1);
173                #[cfg(feature = "metrics")]
174                {
175                    ::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
176                    // New default (metrics audit): drain RATE.
177                    ::metrics::counter!("dfe_spool_dequeue_total").increment(1);
178                }
179                Ok(Some(data))
180            }
181            Err(yaque::TryRecvError::Io(e)) => Err(SpoolError::Io(e)),
182            Err(yaque::TryRecvError::QueueEmpty) => Ok(None),
183        }
184    }
185
186    /// Receive an item, awaiting if the queue is empty. Preferred consumer API.
187    ///
188    /// # Errors
189    ///
190    /// Returns an error if decompression fails or an I/O error occurs.
191    pub async fn recv(&mut self) -> Result<Vec<u8>> {
192        let guard = self
193            .receiver
194            .recv()
195            .await
196            .map_err(|e| SpoolError::Queue(e.to_string()))?;
197
198        let raw_data = guard.to_vec();
199        let data = if self.config.compress {
200            zstd::decode_all(raw_data.as_slice())
201                .map_err(|e| SpoolError::Decompression(e.to_string()))?
202        } else {
203            raw_data
204        };
205
206        guard
207            .commit()
208            .map_err(|e| SpoolError::Queue(e.to_string()))?;
209        self.len = self.len.saturating_sub(1);
210        #[cfg(feature = "metrics")]
211        {
212            ::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
213            // New default (metrics audit): drain RATE.
214            ::metrics::counter!("dfe_spool_dequeue_total").increment(1);
215        }
216        Ok(data)
217    }
218
219    /// Approximate item count, tracked internally.
220    #[must_use]
221    pub fn len(&self) -> usize {
222        self.len
223    }
224
225    /// Check if the queue is empty.
226    #[must_use]
227    pub fn is_empty(&self) -> bool {
228        self.len == 0
229    }
230
231    /// Drain all items from the queue.
232    ///
233    /// # Errors
234    ///
235    /// Returns an error if an I/O error occurs.
236    pub fn clear(&mut self) -> Result<()> {
237        // yaque has no built-in clear -- drain by committing every item.
238        loop {
239            match self.receiver.try_recv() {
240                Ok(guard) => {
241                    guard
242                        .commit()
243                        .map_err(|e| SpoolError::Queue(e.to_string()))?;
244                }
245                Err(yaque::TryRecvError::QueueEmpty) => break,
246                Err(yaque::TryRecvError::Io(e)) => return Err(SpoolError::Io(e)),
247            }
248        }
249        self.len = 0;
250        #[cfg(feature = "metrics")]
251        ::metrics::gauge!("dfe_spool_queue_depth").set(0.0);
252        Ok(())
253    }
254
255    /// Get the configuration for this spool.
256    #[must_use]
257    pub fn config(&self) -> &SpoolConfig {
258        &self.config
259    }
260
261    /// Get the approximate directory size in bytes.
262    ///
263    /// # Errors
264    ///
265    /// Returns an error if the directory cannot be read.
266    pub fn file_size(&self) -> Result<u64> {
267        let mut total = 0u64;
268        if self.config.path.is_dir() {
269            for entry in std::fs::read_dir(&self.config.path)? {
270                let entry = entry?;
271                if entry.file_type()?.is_file() {
272                    total += entry.metadata()?.len();
273                }
274            }
275        }
276        Ok(total)
277    }
278
279    /// Compress data using zstd.
280    fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
281        zstd::encode_all(data, self.config.compression_level)
282            .map_err(|e| SpoolError::Compression(e.to_string()))
283    }
284}
285
/// Count the unread items in a yaque queue directory.
///
/// yaque appends each message to numbered segment files (`<n>.q`) as a
/// 4-byte Hamming-encoded header followed by the payload. It persists the
/// receiver cursor in `recv-metadata` as two big-endian `u64`s (segment
/// number, byte offset). This walks the message headers from that cursor to
/// the end of the newest segment.
///
/// - A `path` that is not a directory yields `Ok(0)`.
/// - A missing `recv-metadata`, or one shorter than 16 bytes, means cursor `(0, 0)`.
/// - Segments numbered below the cursor segment are skipped.
/// - Within a segment, scanning stops at the all-ones end-of-segment header.
/// - A trailing item whose payload runs past the end of the file is not counted.
///
/// # Errors
///
/// Propagates I/O errors from listing the directory or reading its files.
fn count_existing_items(path: &std::path::Path) -> std::io::Result<usize> {
    /// Header word yaque writes after the last message of a segment.
    const END_OF_SEGMENT: [u8; 4] = [0xFF; 4];
    /// The payload length lives in the low 26 bits of the header word.
    const LENGTH_BITS: u32 = 0x03_FF_FF_FF;
    /// Size of a message header in bytes.
    const HEADER_LEN: usize = 4;

    /// Decode an 8-byte big-endian slice; any other length decodes as zero.
    fn be_u64(bytes: &[u8]) -> u64 {
        <[u8; 8]>::try_from(bytes).map_or(0, u64::from_be_bytes)
    }

    if !path.is_dir() {
        return Ok(0);
    }

    let metadata_path = path.join("recv-metadata");
    let (cursor_segment, cursor_offset) = if metadata_path.exists() {
        let raw = std::fs::read(&metadata_path)?;
        match (raw.get(0..8), raw.get(8..16)) {
            (Some(segment), Some(offset)) => (be_u64(segment), be_u64(offset)),
            _ => (0, 0),
        }
    } else {
        (0, 0)
    };

    // Numeric ids of every `<n>.q` segment at or after the cursor segment.
    let mut segment_ids = Vec::new();
    for dir_entry in std::fs::read_dir(path)? {
        let file_path = dir_entry?.path();
        if file_path.extension().and_then(|ext| ext.to_str()) != Some("q") {
            continue;
        }
        let id = file_path
            .file_stem()
            .and_then(|stem| stem.to_str())
            .and_then(|stem| stem.parse::<u64>().ok());
        if let Some(id) = id.filter(|&id| id >= cursor_segment) {
            segment_ids.push(id);
        }
    }
    segment_ids.sort_unstable();

    let mut items = 0usize;
    for id in segment_ids {
        let contents = std::fs::read(path.join(format!("{id}.q")))?;

        // Only the cursor segment is partially consumed; later segments are
        // scanned from byte 0. The offset fits in usize on 64-bit targets.
        #[allow(clippy::cast_possible_truncation)]
        let mut offset = if id == cursor_segment {
            cursor_offset as usize
        } else {
            0
        };

        while let Some(chunk) = contents.get(offset..offset + HEADER_LEN) {
            let header = <[u8; 4]>::try_from(chunk).unwrap_or([0; 4]);
            if header == END_OF_SEGMENT {
                break;
            }
            let payload_len = (u32::from_be_bytes(header) & LENGTH_BITS) as usize;
            offset += HEADER_LEN + payload_len;
            // Count the item only if its whole payload is on disk.
            items += usize::from(offset <= contents.len());
        }
    }

    Ok(items)
}
364
365impl std::fmt::Debug for Spool {
366    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
367        f.debug_struct("Spool")
368            .field("path", &self.config.path)
369            .field("len", &self.len)
370            .field("compress", &self.config.compress)
371            .finish_non_exhaustive()
372    }
373}
374
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_create_and_push_pop() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test-queue");

        let mut spool = Spool::create(&path).await.unwrap();
        assert!(spool.is_empty());

        spool.push(b"hello").await.unwrap();
        spool.push(b"world").await.unwrap();

        assert_eq!(spool.len(), 2);
        assert!(!spool.is_empty());

        assert_eq!(spool.pop_front().await.unwrap(), Some(b"hello".to_vec()));
        assert_eq!(spool.pop_front().await.unwrap(), Some(b"world".to_vec()));

        assert!(spool.is_empty());
    }

    #[tokio::test]
    async fn test_pop_front_empty() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test-queue");

        let mut spool = Spool::create(&path).await.unwrap();
        assert_eq!(spool.pop_front().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_compression() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test-queue");

        let mut spool = Spool::create_compressed(&path).await.unwrap();

        let data = b"hello world ".repeat(100);
        spool.push(&data).await.unwrap();

        // Verify decompression works - data comes back correctly
        let retrieved = spool.pop_front().await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_max_items_limit() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test-queue");

        let config = SpoolConfig::new(&path).max_items(2);
        let mut spool = Spool::open(config).await.unwrap();

        spool.push(b"one").await.unwrap();
        spool.push(b"two").await.unwrap();

        let result = spool.push(b"three").await;
        assert!(matches!(
            result,
            Err(SpoolError::MaxItemsReached { max: 2 })
        ));
    }

    #[tokio::test]
    async fn test_clear() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test-queue");

        let mut spool = Spool::create(&path).await.unwrap();
        spool.push(b"one").await.unwrap();
        spool.push(b"two").await.unwrap();

        assert_eq!(spool.len(), 2);
        spool.clear().unwrap();
        assert!(spool.is_empty());
    }

    #[tokio::test]
    async fn test_peek_does_not_consume() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test-queue");

        let mut spool = Spool::create(&path).await.unwrap();
        assert_eq!(spool.peek().await.unwrap(), None);

        spool.push(b"first").await.unwrap();
        spool.push(b"second").await.unwrap();

        // Repeated peeks see the same head and leave the count unchanged.
        assert_eq!(spool.peek().await.unwrap(), Some(b"first".to_vec()));
        assert_eq!(spool.peek().await.unwrap(), Some(b"first".to_vec()));
        assert_eq!(spool.len(), 2);

        assert_eq!(spool.pop_front().await.unwrap(), Some(b"first".to_vec()));
    }

    #[tokio::test]
    async fn test_pop_discards_head() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test-queue");

        let mut spool = Spool::create(&path).await.unwrap();
        spool.push(b"a").await.unwrap();
        spool.push(b"b").await.unwrap();

        spool.pop().await.unwrap();
        assert_eq!(spool.len(), 1);
        assert_eq!(spool.pop_front().await.unwrap(), Some(b"b".to_vec()));

        // Popping an empty queue is a no-op, not an error.
        spool.pop().await.unwrap();
        assert!(spool.is_empty());
    }

    #[tokio::test]
    async fn test_recv_compressed_roundtrip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test-queue");

        let mut spool = Spool::create_compressed(&path).await.unwrap();
        let data = b"spool payload ".repeat(64);
        spool.push(&data).await.unwrap();

        assert_eq!(spool.recv().await.unwrap(), data);
        assert!(spool.is_empty());
    }

    #[tokio::test]
    async fn test_len_survives_reopen() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test-reopen-queue");

        // Open, push items, then drop
        {
            let mut spool = Spool::create(&path).await.unwrap();
            spool.push(b"one").await.unwrap();
            spool.push(b"two").await.unwrap();
            spool.push(b"three").await.unwrap();
            assert_eq!(spool.len(), 3);
        }

        // Reopen -- len should reflect existing items
        {
            let spool = Spool::create(&path).await.unwrap();
            assert_eq!(spool.len(), 3);
        }
    }

    #[tokio::test]
    async fn test_len_survives_partial_consume_and_reopen() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test-partial-queue");

        // Open, push 5, consume 2
        {
            let mut spool = Spool::create(&path).await.unwrap();
            for i in 0..5 {
                spool.push(format!("item-{i}").as_bytes()).await.unwrap();
            }
            assert_eq!(spool.len(), 5);
            spool.pop_front().await.unwrap(); // consume 1
            spool.pop_front().await.unwrap(); // consume 2
            assert_eq!(spool.len(), 3);
        }

        // Reopen -- should show 3 remaining
        {
            let spool = Spool::create(&path).await.unwrap();
            assert_eq!(spool.len(), 3);
        }
    }

    #[tokio::test]
    async fn test_debug_format() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test-queue");

        let spool = Spool::create(&path).await.unwrap();
        let debug = format!("{spool:?}");
        assert!(debug.contains("Spool"));
        assert!(debug.contains("test-queue"));
    }
}