hyperi_rustlib/spool/
queue.rs1use crate::spool::{Result, SpoolConfig, SpoolError};
12use std::path::Path;
13use yaque::{Receiver, Sender};
14
15pub struct Spool {
20 sender: Sender,
21 receiver: Receiver,
22 config: SpoolConfig,
23 len: usize,
24}
25
26impl Spool {
27 pub async fn open(config: SpoolConfig) -> Result<Self> {
33 let (sender, receiver) = yaque::channel(&config.path).map_err(|e| SpoolError::Open {
34 path: config.path.display().to_string(),
35 message: e.to_string(),
36 })?;
37
38 let len = count_existing_items(&config.path).unwrap_or(0);
41
42 Ok(Self {
43 sender,
44 receiver,
45 config,
46 len,
47 })
48 }
49
50 pub async fn create(path: impl AsRef<Path>) -> Result<Self> {
56 Self::open(SpoolConfig::new(path.as_ref())).await
57 }
58
59 pub async fn create_compressed(path: impl AsRef<Path>) -> Result<Self> {
65 Self::open(SpoolConfig::with_compression(path.as_ref())).await
66 }
67
68 pub async fn push(&mut self, data: &[u8]) -> Result<()> {
74 if let Some(max) = self.config.max_items
75 && self.len >= max
76 {
77 return Err(SpoolError::MaxItemsReached { max });
78 }
79
80 if let Some(max_bytes) = self.config.max_size_bytes
82 && self.file_size()? >= max_bytes
83 {
84 return Err(SpoolError::MaxSizeReached { max_bytes });
85 }
86
87 let to_write = if self.config.compress {
88 self.compress(data)?
89 } else {
90 data.to_vec()
91 };
92
93 self.sender
94 .send(to_write)
95 .await
96 .map_err(|e| SpoolError::Queue(e.to_string()))?;
97
98 self.len += 1;
99 #[cfg(feature = "metrics")]
100 {
101 ::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
102 ::metrics::counter!("dfe_spool_enqueue_total").increment(1);
104 }
105 Ok(())
106 }
107
108 pub async fn peek(&mut self) -> Result<Option<Vec<u8>>> {
117 match self.receiver.try_recv() {
118 Ok(guard) => {
119 let raw_data = guard.to_vec();
120 let data = if self.config.compress {
121 zstd::decode_all(raw_data.as_slice())
122 .map_err(|e| SpoolError::Decompression(e.to_string()))?
123 } else {
124 raw_data
125 };
126 drop(guard);
128 Ok(Some(data))
129 }
130 Err(yaque::TryRecvError::Io(e)) => Err(SpoolError::Io(e)),
131 Err(yaque::TryRecvError::QueueEmpty) => Ok(None),
132 }
133 }
134
135 pub async fn pop(&mut self) -> Result<()> {
141 match self.receiver.try_recv() {
142 Ok(guard) => {
143 guard
144 .commit()
145 .map_err(|e| SpoolError::Queue(e.to_string()))?;
146 self.len = self.len.saturating_sub(1);
147 Ok(())
148 }
149 Err(yaque::TryRecvError::Io(e)) => Err(SpoolError::Io(e)),
150 Err(yaque::TryRecvError::QueueEmpty) => Ok(()), }
152 }
153
154 pub async fn pop_front(&mut self) -> Result<Option<Vec<u8>>> {
160 match self.receiver.try_recv() {
161 Ok(guard) => {
162 let raw_data = guard.to_vec();
163 let data = if self.config.compress {
164 zstd::decode_all(raw_data.as_slice())
165 .map_err(|e| SpoolError::Decompression(e.to_string()))?
166 } else {
167 raw_data
168 };
169 guard
170 .commit()
171 .map_err(|e| SpoolError::Queue(e.to_string()))?;
172 self.len = self.len.saturating_sub(1);
173 #[cfg(feature = "metrics")]
174 {
175 ::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
176 ::metrics::counter!("dfe_spool_dequeue_total").increment(1);
178 }
179 Ok(Some(data))
180 }
181 Err(yaque::TryRecvError::Io(e)) => Err(SpoolError::Io(e)),
182 Err(yaque::TryRecvError::QueueEmpty) => Ok(None),
183 }
184 }
185
186 pub async fn recv(&mut self) -> Result<Vec<u8>> {
192 let guard = self
193 .receiver
194 .recv()
195 .await
196 .map_err(|e| SpoolError::Queue(e.to_string()))?;
197
198 let raw_data = guard.to_vec();
199 let data = if self.config.compress {
200 zstd::decode_all(raw_data.as_slice())
201 .map_err(|e| SpoolError::Decompression(e.to_string()))?
202 } else {
203 raw_data
204 };
205
206 guard
207 .commit()
208 .map_err(|e| SpoolError::Queue(e.to_string()))?;
209 self.len = self.len.saturating_sub(1);
210 #[cfg(feature = "metrics")]
211 {
212 ::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
213 ::metrics::counter!("dfe_spool_dequeue_total").increment(1);
215 }
216 Ok(data)
217 }
218
219 #[must_use]
221 pub fn len(&self) -> usize {
222 self.len
223 }
224
225 #[must_use]
227 pub fn is_empty(&self) -> bool {
228 self.len == 0
229 }
230
231 pub fn clear(&mut self) -> Result<()> {
237 loop {
239 match self.receiver.try_recv() {
240 Ok(guard) => {
241 guard
242 .commit()
243 .map_err(|e| SpoolError::Queue(e.to_string()))?;
244 }
245 Err(yaque::TryRecvError::QueueEmpty) => break,
246 Err(yaque::TryRecvError::Io(e)) => return Err(SpoolError::Io(e)),
247 }
248 }
249 self.len = 0;
250 #[cfg(feature = "metrics")]
251 ::metrics::gauge!("dfe_spool_queue_depth").set(0.0);
252 Ok(())
253 }
254
255 #[must_use]
257 pub fn config(&self) -> &SpoolConfig {
258 &self.config
259 }
260
261 pub fn file_size(&self) -> Result<u64> {
267 let mut total = 0u64;
268 if self.config.path.is_dir() {
269 for entry in std::fs::read_dir(&self.config.path)? {
270 let entry = entry?;
271 if entry.file_type()?.is_file() {
272 total += entry.metadata()?.len();
273 }
274 }
275 }
276 Ok(total)
277 }
278
279 fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
281 zstd::encode_all(data, self.config.compression_level)
282 .map_err(|e| SpoolError::Compression(e.to_string()))
283 }
284}
285
286fn count_existing_items(path: &std::path::Path) -> std::io::Result<usize> {
292 if !path.is_dir() {
293 return Ok(0);
294 }
295
296 let recv_metadata_path = path.join("recv-metadata");
298 let (recv_segment, recv_position) = if recv_metadata_path.exists() {
299 let data = std::fs::read(&recv_metadata_path)?;
300 if data.len() >= 16 {
301 let segment = u64::from_be_bytes(data[0..8].try_into().unwrap_or([0; 8]));
302 let position = u64::from_be_bytes(data[8..16].try_into().unwrap_or([0; 8]));
303 (segment, position)
304 } else {
305 (0, 0)
306 }
307 } else {
308 (0, 0)
309 };
310
311 let mut segments: Vec<u64> = Vec::new();
313 for entry in std::fs::read_dir(path)? {
314 let entry = entry?;
315 let file_path = entry.path();
316 if file_path.extension().and_then(|e| e.to_str()) == Some("q")
317 && let Some(stem) = file_path.file_stem().and_then(|s| s.to_str())
318 && let Ok(seg_num) = stem.parse::<u64>()
319 && seg_num >= recv_segment
320 {
321 segments.push(seg_num);
322 }
323 }
324 segments.sort_unstable();
325
326 let mut count = 0usize;
327 let header_eof: [u8; 4] = [255, 255, 255, 255];
329
330 for &seg_num in &segments {
331 let seg_path = path.join(format!("{seg_num}.q"));
332 let file_data = std::fs::read(&seg_path)?;
333
334 #[allow(clippy::cast_possible_truncation)]
336 let start = if seg_num == recv_segment {
337 recv_position as usize
338 } else {
339 0
340 };
341
342 let mut pos = start;
343 while pos + 4 <= file_data.len() {
344 let header_bytes: [u8; 4] = file_data[pos..pos + 4].try_into().unwrap_or([0; 4]);
345
346 if header_bytes == header_eof {
348 break; }
350
351 let encoded = u32::from_be_bytes(header_bytes);
353 let payload_len = (encoded & 0x03_FF_FF_FF) as usize;
354
355 pos += 4 + payload_len;
356 if pos <= file_data.len() {
357 count += 1;
358 }
359 }
360 }
361
362 Ok(count)
363}
364
365impl std::fmt::Debug for Spool {
366 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
367 f.debug_struct("Spool")
368 .field("path", &self.config.path)
369 .field("len", &self.len)
370 .field("compress", &self.config.compress)
371 .finish_non_exhaustive()
372 }
373}
374
375#[cfg(test)]
376mod tests {
377 use super::*;
378 use tempfile::tempdir;
379
380 #[tokio::test]
381 async fn test_create_and_push_pop() {
382 let dir = tempdir().unwrap();
383 let path = dir.path().join("test-queue");
384
385 let mut spool = Spool::create(&path).await.unwrap();
386 assert!(spool.is_empty());
387
388 spool.push(b"hello").await.unwrap();
389 spool.push(b"world").await.unwrap();
390
391 assert_eq!(spool.len(), 2);
392 assert!(!spool.is_empty());
393
394 assert_eq!(spool.pop_front().await.unwrap(), Some(b"hello".to_vec()));
395 assert_eq!(spool.pop_front().await.unwrap(), Some(b"world".to_vec()));
396
397 assert!(spool.is_empty());
398 }
399
400 #[tokio::test]
401 async fn test_pop_front_empty() {
402 let dir = tempdir().unwrap();
403 let path = dir.path().join("test-queue");
404
405 let mut spool = Spool::create(&path).await.unwrap();
406 assert_eq!(spool.pop_front().await.unwrap(), None);
407 }
408
409 #[tokio::test]
410 async fn test_compression() {
411 let dir = tempdir().unwrap();
412 let path = dir.path().join("test-queue");
413
414 let mut spool = Spool::create_compressed(&path).await.unwrap();
415
416 let data = b"hello world ".repeat(100);
417 spool.push(&data).await.unwrap();
418
419 let retrieved = spool.pop_front().await.unwrap().unwrap();
421 assert_eq!(retrieved, data);
422 }
423
424 #[tokio::test]
425 async fn test_max_items_limit() {
426 let dir = tempdir().unwrap();
427 let path = dir.path().join("test-queue");
428
429 let config = SpoolConfig::new(&path).max_items(2);
430 let mut spool = Spool::open(config).await.unwrap();
431
432 spool.push(b"one").await.unwrap();
433 spool.push(b"two").await.unwrap();
434
435 let result = spool.push(b"three").await;
436 assert!(matches!(
437 result,
438 Err(SpoolError::MaxItemsReached { max: 2 })
439 ));
440 }
441
442 #[tokio::test]
443 async fn test_clear() {
444 let dir = tempdir().unwrap();
445 let path = dir.path().join("test-queue");
446
447 let mut spool = Spool::create(&path).await.unwrap();
448 spool.push(b"one").await.unwrap();
449 spool.push(b"two").await.unwrap();
450
451 assert_eq!(spool.len(), 2);
452 spool.clear().unwrap();
453 assert!(spool.is_empty());
454 }
455
456 #[tokio::test]
457 async fn test_len_survives_reopen() {
458 let dir = tempdir().unwrap();
459 let path = dir.path().join("test-reopen-queue");
460
461 {
463 let mut spool = Spool::create(&path).await.unwrap();
464 spool.push(b"one").await.unwrap();
465 spool.push(b"two").await.unwrap();
466 spool.push(b"three").await.unwrap();
467 assert_eq!(spool.len(), 3);
468 }
469
470 {
472 let spool = Spool::create(&path).await.unwrap();
473 assert_eq!(spool.len(), 3);
474 }
475 }
476
477 #[tokio::test]
478 async fn test_len_survives_partial_consume_and_reopen() {
479 let dir = tempdir().unwrap();
480 let path = dir.path().join("test-partial-queue");
481
482 {
484 let mut spool = Spool::create(&path).await.unwrap();
485 for i in 0..5 {
486 spool.push(format!("item-{i}").as_bytes()).await.unwrap();
487 }
488 assert_eq!(spool.len(), 5);
489 spool.pop_front().await.unwrap(); spool.pop_front().await.unwrap(); assert_eq!(spool.len(), 3);
492 }
493
494 {
496 let spool = Spool::create(&path).await.unwrap();
497 assert_eq!(spool.len(), 3);
498 }
499 }
500
501 #[tokio::test]
502 async fn test_debug_format() {
503 let dir = tempdir().unwrap();
504 let path = dir.path().join("test-queue");
505
506 let spool = Spool::create(&path).await.unwrap();
507 let debug = format!("{spool:?}");
508 assert!(debug.contains("Spool"));
509 assert!(debug.contains("test-queue"));
510 }
511}