hyperi_rustlib/spool/
queue.rs1use crate::spool::{Result, SpoolConfig, SpoolError};
12use std::path::Path;
13use yaque::{Receiver, Sender};
14
15pub struct Spool {
23 sender: Sender,
24 receiver: Receiver,
25 config: SpoolConfig,
26 len: usize,
27}
28
29impl Spool {
30 pub async fn open(config: SpoolConfig) -> Result<Self> {
39 let (sender, receiver) = yaque::channel(&config.path).map_err(|e| SpoolError::Open {
40 path: config.path.display().to_string(),
41 message: e.to_string(),
42 })?;
43
44 let len = count_existing_items(&config.path).unwrap_or(0);
48
49 Ok(Self {
50 sender,
51 receiver,
52 config,
53 len,
54 })
55 }
56
57 pub async fn create(path: impl AsRef<Path>) -> Result<Self> {
63 Self::open(SpoolConfig::new(path.as_ref())).await
64 }
65
66 pub async fn create_compressed(path: impl AsRef<Path>) -> Result<Self> {
72 Self::open(SpoolConfig::with_compression(path.as_ref())).await
73 }
74
75 pub async fn push(&mut self, data: &[u8]) -> Result<()> {
86 if let Some(max) = self.config.max_items
88 && self.len >= max
89 {
90 return Err(SpoolError::MaxItemsReached { max });
91 }
92
93 if let Some(max_bytes) = self.config.max_size_bytes
95 && self.file_size()? >= max_bytes
96 {
97 return Err(SpoolError::MaxSizeReached { max_bytes });
98 }
99
100 let to_write = if self.config.compress {
101 self.compress(data)?
102 } else {
103 data.to_vec()
104 };
105
106 self.sender
107 .send(to_write)
108 .await
109 .map_err(|e| SpoolError::Queue(e.to_string()))?;
110
111 self.len += 1;
112 #[cfg(feature = "metrics")]
113 ::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
114 Ok(())
115 }
116
117 pub async fn peek(&mut self) -> Result<Option<Vec<u8>>> {
126 match self.receiver.try_recv() {
127 Ok(guard) => {
128 let raw_data = guard.to_vec();
130 let data = if self.config.compress {
131 zstd::decode_all(raw_data.as_slice())
132 .map_err(|e| SpoolError::Decompression(e.to_string()))?
133 } else {
134 raw_data
135 };
136 drop(guard);
138 Ok(Some(data))
139 }
140 Err(yaque::TryRecvError::Io(e)) => Err(SpoolError::Io(e)),
141 Err(yaque::TryRecvError::QueueEmpty) => Ok(None),
142 }
143 }
144
145 pub async fn pop(&mut self) -> Result<()> {
151 match self.receiver.try_recv() {
152 Ok(guard) => {
153 guard
154 .commit()
155 .map_err(|e| SpoolError::Queue(e.to_string()))?;
156 self.len = self.len.saturating_sub(1);
157 Ok(())
158 }
159 Err(yaque::TryRecvError::Io(e)) => Err(SpoolError::Io(e)),
160 Err(yaque::TryRecvError::QueueEmpty) => Ok(()), }
162 }
163
164 pub async fn pop_front(&mut self) -> Result<Option<Vec<u8>>> {
172 match self.receiver.try_recv() {
173 Ok(guard) => {
174 let raw_data = guard.to_vec();
176 let data = if self.config.compress {
177 zstd::decode_all(raw_data.as_slice())
178 .map_err(|e| SpoolError::Decompression(e.to_string()))?
179 } else {
180 raw_data
181 };
182 guard
183 .commit()
184 .map_err(|e| SpoolError::Queue(e.to_string()))?;
185 self.len = self.len.saturating_sub(1);
186 #[cfg(feature = "metrics")]
187 ::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
188 Ok(Some(data))
189 }
190 Err(yaque::TryRecvError::Io(e)) => Err(SpoolError::Io(e)),
191 Err(yaque::TryRecvError::QueueEmpty) => Ok(None),
192 }
193 }
194
195 pub async fn recv(&mut self) -> Result<Vec<u8>> {
203 let guard = self
204 .receiver
205 .recv()
206 .await
207 .map_err(|e| SpoolError::Queue(e.to_string()))?;
208
209 let raw_data = guard.to_vec();
211 let data = if self.config.compress {
212 zstd::decode_all(raw_data.as_slice())
213 .map_err(|e| SpoolError::Decompression(e.to_string()))?
214 } else {
215 raw_data
216 };
217
218 guard
219 .commit()
220 .map_err(|e| SpoolError::Queue(e.to_string()))?;
221 self.len = self.len.saturating_sub(1);
222 #[cfg(feature = "metrics")]
223 ::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
224 Ok(data)
225 }
226
227 #[must_use]
232 pub fn len(&self) -> usize {
233 self.len
234 }
235
236 #[must_use]
238 pub fn is_empty(&self) -> bool {
239 self.len == 0
240 }
241
242 pub fn clear(&mut self) -> Result<()> {
250 loop {
253 match self.receiver.try_recv() {
254 Ok(guard) => {
255 guard
256 .commit()
257 .map_err(|e| SpoolError::Queue(e.to_string()))?;
258 }
259 Err(yaque::TryRecvError::QueueEmpty) => break,
260 Err(yaque::TryRecvError::Io(e)) => return Err(SpoolError::Io(e)),
261 }
262 }
263 self.len = 0;
264 #[cfg(feature = "metrics")]
265 ::metrics::gauge!("dfe_spool_queue_depth").set(0.0);
266 Ok(())
267 }
268
269 #[must_use]
271 pub fn config(&self) -> &SpoolConfig {
272 &self.config
273 }
274
275 pub fn file_size(&self) -> Result<u64> {
281 let mut total = 0u64;
282 if self.config.path.is_dir() {
283 for entry in std::fs::read_dir(&self.config.path)? {
284 let entry = entry?;
285 if entry.file_type()?.is_file() {
286 total += entry.metadata()?.len();
287 }
288 }
289 }
290 Ok(total)
291 }
292
293 fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
295 zstd::encode_all(data, self.config.compression_level)
296 .map_err(|e| SpoolError::Compression(e.to_string()))
297 }
298}
299
300fn count_existing_items(path: &std::path::Path) -> std::io::Result<usize> {
306 if !path.is_dir() {
307 return Ok(0);
308 }
309
310 let recv_metadata_path = path.join("recv-metadata");
312 let (recv_segment, recv_position) = if recv_metadata_path.exists() {
313 let data = std::fs::read(&recv_metadata_path)?;
314 if data.len() >= 16 {
315 let segment = u64::from_be_bytes(data[0..8].try_into().unwrap_or([0; 8]));
316 let position = u64::from_be_bytes(data[8..16].try_into().unwrap_or([0; 8]));
317 (segment, position)
318 } else {
319 (0, 0)
320 }
321 } else {
322 (0, 0)
323 };
324
325 let mut segments: Vec<u64> = Vec::new();
327 for entry in std::fs::read_dir(path)? {
328 let entry = entry?;
329 let file_path = entry.path();
330 if file_path.extension().and_then(|e| e.to_str()) == Some("q")
331 && let Some(stem) = file_path.file_stem().and_then(|s| s.to_str())
332 && let Ok(seg_num) = stem.parse::<u64>()
333 && seg_num >= recv_segment
334 {
335 segments.push(seg_num);
336 }
337 }
338 segments.sort_unstable();
339
340 let mut count = 0usize;
341 let header_eof: [u8; 4] = [255, 255, 255, 255];
343
344 for &seg_num in &segments {
345 let seg_path = path.join(format!("{seg_num}.q"));
346 let file_data = std::fs::read(&seg_path)?;
347
348 #[allow(clippy::cast_possible_truncation)]
350 let start = if seg_num == recv_segment {
351 recv_position as usize
352 } else {
353 0
354 };
355
356 let mut pos = start;
357 while pos + 4 <= file_data.len() {
358 let header_bytes: [u8; 4] = file_data[pos..pos + 4].try_into().unwrap_or([0; 4]);
359
360 if header_bytes == header_eof {
362 break; }
364
365 let encoded = u32::from_be_bytes(header_bytes);
367 let payload_len = (encoded & 0x03_FF_FF_FF) as usize;
368
369 pos += 4 + payload_len;
370 if pos <= file_data.len() {
371 count += 1;
372 }
373 }
374 }
375
376 Ok(count)
377}
378
379impl std::fmt::Debug for Spool {
380 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
381 f.debug_struct("Spool")
382 .field("path", &self.config.path)
383 .field("len", &self.len)
384 .field("compress", &self.config.compress)
385 .finish_non_exhaustive()
386 }
387}
388
389#[cfg(test)]
390mod tests {
391 use super::*;
392 use tempfile::tempdir;
393
394 #[tokio::test]
395 async fn test_create_and_push_pop() {
396 let dir = tempdir().unwrap();
397 let path = dir.path().join("test-queue");
398
399 let mut spool = Spool::create(&path).await.unwrap();
400 assert!(spool.is_empty());
401
402 spool.push(b"hello").await.unwrap();
403 spool.push(b"world").await.unwrap();
404
405 assert_eq!(spool.len(), 2);
406 assert!(!spool.is_empty());
407
408 assert_eq!(spool.pop_front().await.unwrap(), Some(b"hello".to_vec()));
409 assert_eq!(spool.pop_front().await.unwrap(), Some(b"world".to_vec()));
410
411 assert!(spool.is_empty());
412 }
413
414 #[tokio::test]
415 async fn test_pop_front_empty() {
416 let dir = tempdir().unwrap();
417 let path = dir.path().join("test-queue");
418
419 let mut spool = Spool::create(&path).await.unwrap();
420 assert_eq!(spool.pop_front().await.unwrap(), None);
421 }
422
423 #[tokio::test]
424 async fn test_compression() {
425 let dir = tempdir().unwrap();
426 let path = dir.path().join("test-queue");
427
428 let mut spool = Spool::create_compressed(&path).await.unwrap();
429
430 let data = b"hello world ".repeat(100);
431 spool.push(&data).await.unwrap();
432
433 let retrieved = spool.pop_front().await.unwrap().unwrap();
435 assert_eq!(retrieved, data);
436 }
437
438 #[tokio::test]
439 async fn test_max_items_limit() {
440 let dir = tempdir().unwrap();
441 let path = dir.path().join("test-queue");
442
443 let config = SpoolConfig::new(&path).max_items(2);
444 let mut spool = Spool::open(config).await.unwrap();
445
446 spool.push(b"one").await.unwrap();
447 spool.push(b"two").await.unwrap();
448
449 let result = spool.push(b"three").await;
450 assert!(matches!(
451 result,
452 Err(SpoolError::MaxItemsReached { max: 2 })
453 ));
454 }
455
456 #[tokio::test]
457 async fn test_clear() {
458 let dir = tempdir().unwrap();
459 let path = dir.path().join("test-queue");
460
461 let mut spool = Spool::create(&path).await.unwrap();
462 spool.push(b"one").await.unwrap();
463 spool.push(b"two").await.unwrap();
464
465 assert_eq!(spool.len(), 2);
466 spool.clear().unwrap();
467 assert!(spool.is_empty());
468 }
469
470 #[tokio::test]
471 async fn test_len_survives_reopen() {
472 let dir = tempdir().unwrap();
473 let path = dir.path().join("test-reopen-queue");
474
475 {
477 let mut spool = Spool::create(&path).await.unwrap();
478 spool.push(b"one").await.unwrap();
479 spool.push(b"two").await.unwrap();
480 spool.push(b"three").await.unwrap();
481 assert_eq!(spool.len(), 3);
482 }
483
484 {
486 let spool = Spool::create(&path).await.unwrap();
487 assert_eq!(spool.len(), 3);
488 }
489 }
490
491 #[tokio::test]
492 async fn test_len_survives_partial_consume_and_reopen() {
493 let dir = tempdir().unwrap();
494 let path = dir.path().join("test-partial-queue");
495
496 {
498 let mut spool = Spool::create(&path).await.unwrap();
499 for i in 0..5 {
500 spool.push(format!("item-{i}").as_bytes()).await.unwrap();
501 }
502 assert_eq!(spool.len(), 5);
503 spool.pop_front().await.unwrap(); spool.pop_front().await.unwrap(); assert_eq!(spool.len(), 3);
506 }
507
508 {
510 let spool = Spool::create(&path).await.unwrap();
511 assert_eq!(spool.len(), 3);
512 }
513 }
514
515 #[tokio::test]
516 async fn test_debug_format() {
517 let dir = tempdir().unwrap();
518 let path = dir.path().join("test-queue");
519
520 let spool = Spool::create(&path).await.unwrap();
521 let debug = format!("{spool:?}");
522 assert!(debug.contains("Spool"));
523 assert!(debug.contains("test-queue"));
524 }
525}