hyperi_rustlib/spool/
queue.rs1use crate::spool::{Result, SpoolConfig, SpoolError};
12use std::path::Path;
13use yaque::{Receiver, Sender};
14
/// Disk-backed queue of byte payloads built on a `yaque` sender/receiver pair.
///
/// The struct keeps its own item counter next to the channel halves so that
/// `len`/`is_empty` and the `max_items` limit do not need to touch the disk.
pub struct Spool {
    /// Sending half of the yaque channel; `push` writes through it.
    sender: Sender,
    /// Receiving half of the yaque channel; `peek`, `pop`, `pop_front`, `recv`
    /// and `clear` read through it.
    receiver: Receiver,
    /// Settings this spool was opened with (path, limits, compression).
    config: SpoolConfig,
    /// In-memory count of queued items. Seeded in `open` by scanning the
    /// queue directory, then adjusted whenever an item is sent or committed.
    len: usize,
}
25
26impl Spool {
27 pub async fn open(config: SpoolConfig) -> Result<Self> {
33 let (sender, receiver) = yaque::channel(&config.path).map_err(|e| SpoolError::Open {
34 path: config.path.display().to_string(),
35 message: e.to_string(),
36 })?;
37
38 let len = count_existing_items(&config.path).unwrap_or(0);
41
42 Ok(Self {
43 sender,
44 receiver,
45 config,
46 len,
47 })
48 }
49
50 pub async fn create(path: impl AsRef<Path>) -> Result<Self> {
56 Self::open(SpoolConfig::new(path.as_ref())).await
57 }
58
59 pub async fn create_compressed(path: impl AsRef<Path>) -> Result<Self> {
65 Self::open(SpoolConfig::with_compression(path.as_ref())).await
66 }
67
68 pub async fn push(&mut self, data: &[u8]) -> Result<()> {
74 if let Some(max) = self.config.max_items
75 && self.len >= max
76 {
77 return Err(SpoolError::MaxItemsReached { max });
78 }
79
80 if let Some(max_bytes) = self.config.max_size_bytes
82 && self.file_size()? >= max_bytes
83 {
84 return Err(SpoolError::MaxSizeReached { max_bytes });
85 }
86
87 let to_write = if self.config.compress {
88 self.compress(data)?
89 } else {
90 data.to_vec()
91 };
92
93 self.sender
94 .send(to_write)
95 .await
96 .map_err(|e| SpoolError::Queue(e.to_string()))?;
97
98 self.len += 1;
99 #[cfg(feature = "metrics")]
100 ::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
101 Ok(())
102 }
103
104 pub async fn peek(&mut self) -> Result<Option<Vec<u8>>> {
113 match self.receiver.try_recv() {
114 Ok(guard) => {
115 let raw_data = guard.to_vec();
116 let data = if self.config.compress {
117 zstd::decode_all(raw_data.as_slice())
118 .map_err(|e| SpoolError::Decompression(e.to_string()))?
119 } else {
120 raw_data
121 };
122 drop(guard);
124 Ok(Some(data))
125 }
126 Err(yaque::TryRecvError::Io(e)) => Err(SpoolError::Io(e)),
127 Err(yaque::TryRecvError::QueueEmpty) => Ok(None),
128 }
129 }
130
131 pub async fn pop(&mut self) -> Result<()> {
137 match self.receiver.try_recv() {
138 Ok(guard) => {
139 guard
140 .commit()
141 .map_err(|e| SpoolError::Queue(e.to_string()))?;
142 self.len = self.len.saturating_sub(1);
143 Ok(())
144 }
145 Err(yaque::TryRecvError::Io(e)) => Err(SpoolError::Io(e)),
146 Err(yaque::TryRecvError::QueueEmpty) => Ok(()), }
148 }
149
150 pub async fn pop_front(&mut self) -> Result<Option<Vec<u8>>> {
156 match self.receiver.try_recv() {
157 Ok(guard) => {
158 let raw_data = guard.to_vec();
159 let data = if self.config.compress {
160 zstd::decode_all(raw_data.as_slice())
161 .map_err(|e| SpoolError::Decompression(e.to_string()))?
162 } else {
163 raw_data
164 };
165 guard
166 .commit()
167 .map_err(|e| SpoolError::Queue(e.to_string()))?;
168 self.len = self.len.saturating_sub(1);
169 #[cfg(feature = "metrics")]
170 ::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
171 Ok(Some(data))
172 }
173 Err(yaque::TryRecvError::Io(e)) => Err(SpoolError::Io(e)),
174 Err(yaque::TryRecvError::QueueEmpty) => Ok(None),
175 }
176 }
177
178 pub async fn recv(&mut self) -> Result<Vec<u8>> {
184 let guard = self
185 .receiver
186 .recv()
187 .await
188 .map_err(|e| SpoolError::Queue(e.to_string()))?;
189
190 let raw_data = guard.to_vec();
191 let data = if self.config.compress {
192 zstd::decode_all(raw_data.as_slice())
193 .map_err(|e| SpoolError::Decompression(e.to_string()))?
194 } else {
195 raw_data
196 };
197
198 guard
199 .commit()
200 .map_err(|e| SpoolError::Queue(e.to_string()))?;
201 self.len = self.len.saturating_sub(1);
202 #[cfg(feature = "metrics")]
203 ::metrics::gauge!("dfe_spool_queue_depth").set(self.len as f64);
204 Ok(data)
205 }
206
207 #[must_use]
209 pub fn len(&self) -> usize {
210 self.len
211 }
212
213 #[must_use]
215 pub fn is_empty(&self) -> bool {
216 self.len == 0
217 }
218
219 pub fn clear(&mut self) -> Result<()> {
225 loop {
227 match self.receiver.try_recv() {
228 Ok(guard) => {
229 guard
230 .commit()
231 .map_err(|e| SpoolError::Queue(e.to_string()))?;
232 }
233 Err(yaque::TryRecvError::QueueEmpty) => break,
234 Err(yaque::TryRecvError::Io(e)) => return Err(SpoolError::Io(e)),
235 }
236 }
237 self.len = 0;
238 #[cfg(feature = "metrics")]
239 ::metrics::gauge!("dfe_spool_queue_depth").set(0.0);
240 Ok(())
241 }
242
243 #[must_use]
245 pub fn config(&self) -> &SpoolConfig {
246 &self.config
247 }
248
249 pub fn file_size(&self) -> Result<u64> {
255 let mut total = 0u64;
256 if self.config.path.is_dir() {
257 for entry in std::fs::read_dir(&self.config.path)? {
258 let entry = entry?;
259 if entry.file_type()?.is_file() {
260 total += entry.metadata()?.len();
261 }
262 }
263 }
264 Ok(total)
265 }
266
267 fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
269 zstd::encode_all(data, self.config.compression_level)
270 .map_err(|e| SpoolError::Compression(e.to_string()))
271 }
272}
273
/// Counts the items that have been written to the queue directory at `path`
/// but not yet consumed, by reading the queue files directly.
///
/// The scan works as follows:
///
/// 1. If `recv-metadata` exists and holds at least 16 bytes, its first two
///    big-endian `u64` values are taken as the receiver's current segment and
///    byte position; otherwise both default to zero.
/// 2. Every `<number>.q` file whose number is not below the receiver's segment
///    is treated as a segment.
/// 3. Each segment is walked record by record: a 4-byte big-endian header
///    whose low 26 bits give the payload length, followed by the payload. A
///    header of four `0xFF` bytes ends the segment. A record whose payload
///    runs past the end of the file is not counted.
///
/// Returns `Ok(0)` when `path` is not a directory.
///
/// A receiver position that lies beyond the end of its segment (for example
/// because the metadata file is corrupt) contributes no items instead of
/// overflowing the offset arithmetic.
// NOTE(review): the file names and record layout above are the ones this code
// assumes yaque uses on disk — re-verify them when upgrading yaque.
///
/// # Errors
///
/// Propagates I/O errors from reading the metadata file, listing the
/// directory, or reading a segment file.
fn count_existing_items(path: &std::path::Path) -> std::io::Result<usize> {
    /// Size in bytes of the header that precedes every payload.
    const HEADER_LEN: usize = 4;
    /// Header value that marks the end of a segment.
    const HEADER_EOF: [u8; HEADER_LEN] = [0xFF; HEADER_LEN];
    /// The low 26 bits of a header carry the payload length.
    const PAYLOAD_LEN_MASK: u32 = 0x03_FF_FF_FF;

    /// Interprets `bytes` as a big-endian unsigned integer.
    fn be_u64(bytes: &[u8]) -> u64 {
        bytes
            .iter()
            .fold(0u64, |acc, &byte| (acc << 8) | u64::from(byte))
    }

    if !path.is_dir() {
        return Ok(0);
    }

    // Where the receiver left off: (segment number, byte offset in segment).
    let recv_metadata_path = path.join("recv-metadata");
    let (recv_segment, recv_position) = if recv_metadata_path.exists() {
        let data = std::fs::read(&recv_metadata_path)?;
        match data.get(..16) {
            Some(cursor) => {
                let (segment, position) = cursor.split_at(8);
                (be_u64(segment), be_u64(position))
            }
            // A short metadata file is treated like a missing one.
            None => (0, 0),
        }
    } else {
        (0, 0)
    };

    // Collect the numbers of all segments that can still hold unread items.
    let mut segments: Vec<u64> = Vec::new();
    for entry in std::fs::read_dir(path)? {
        let file_path = entry?.path();
        if file_path.extension().and_then(|ext| ext.to_str()) != Some("q") {
            continue;
        }
        let seg_num = file_path
            .file_stem()
            .and_then(|stem| stem.to_str())
            .and_then(|stem| stem.parse::<u64>().ok());
        match seg_num {
            Some(number) if number >= recv_segment => segments.push(number),
            _ => {}
        }
    }
    segments.sort_unstable();

    let mut count = 0usize;
    for &seg_num in &segments {
        let file_data = std::fs::read(path.join(format!("{seg_num}.q")))?;

        // Only the receiver's own segment is partially consumed. An offset
        // that does not fit in `usize` cannot address any byte of the file,
        // so it is clamped and rejected by the checked arithmetic below.
        let mut pos = if seg_num == recv_segment {
            usize::try_from(recv_position).unwrap_or(usize::MAX)
        } else {
            0
        };

        loop {
            let Some(header_end) = pos.checked_add(HEADER_LEN) else {
                break;
            };
            // `get` yields `None` when fewer than four bytes remain or when
            // the start offset already lies past the end of the file.
            let Some(&[b0, b1, b2, b3]) = file_data.get(pos..header_end) else {
                break;
            };
            let header = [b0, b1, b2, b3];
            if header == HEADER_EOF {
                break;
            }

            let payload_len = (u32::from_be_bytes(header) & PAYLOAD_LEN_MASK) as usize;
            let Some(next) = header_end.checked_add(payload_len) else {
                break;
            };
            if next > file_data.len() {
                // Truncated record: the payload is not fully on disk.
                break;
            }

            count += 1;
            pos = next;
        }
    }

    Ok(count)
}
352
353impl std::fmt::Debug for Spool {
354 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
355 f.debug_struct("Spool")
356 .field("path", &self.config.path)
357 .field("len", &self.len)
358 .field("compress", &self.config.compress)
359 .finish_non_exhaustive()
360 }
361}
362
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Items come back out in the order they were pushed, and the counter
    /// follows along.
    #[tokio::test]
    async fn test_create_and_push_pop() {
        let tmp = tempdir().unwrap();
        let queue_dir = tmp.path().join("test-queue");

        let mut queue = Spool::create(&queue_dir).await.unwrap();
        assert!(queue.is_empty());

        for payload in [&b"hello"[..], &b"world"[..]] {
            queue.push(payload).await.unwrap();
        }

        assert_eq!(queue.len(), 2);
        assert!(!queue.is_empty());

        for expected in [&b"hello"[..], &b"world"[..]] {
            let popped = queue.pop_front().await.unwrap();
            assert_eq!(popped, Some(expected.to_vec()));
        }

        assert!(queue.is_empty());
    }

    /// Popping from a fresh queue yields `None` rather than an error.
    #[tokio::test]
    async fn test_pop_front_empty() {
        let tmp = tempdir().unwrap();
        let queue_dir = tmp.path().join("test-queue");

        let mut queue = Spool::create(&queue_dir).await.unwrap();
        let popped = queue.pop_front().await.unwrap();
        assert_eq!(popped, None);
    }

    /// A payload pushed into a compressed spool round-trips unchanged.
    #[tokio::test]
    async fn test_compression() {
        let tmp = tempdir().unwrap();
        let queue_dir = tmp.path().join("test-queue");

        let mut queue = Spool::create_compressed(&queue_dir).await.unwrap();

        let payload = b"hello world ".repeat(100);
        queue.push(&payload).await.unwrap();

        let round_tripped = queue.pop_front().await.unwrap().unwrap();
        assert_eq!(round_tripped, payload);
    }

    /// Pushing beyond `max_items` is rejected with `MaxItemsReached`.
    #[tokio::test]
    async fn test_max_items_limit() {
        let tmp = tempdir().unwrap();
        let queue_dir = tmp.path().join("test-queue");

        let limited = SpoolConfig::new(&queue_dir).max_items(2);
        let mut queue = Spool::open(limited).await.unwrap();

        for payload in [&b"one"[..], &b"two"[..]] {
            queue.push(payload).await.unwrap();
        }

        let overflow = queue.push(b"three").await;
        assert!(matches!(
            overflow,
            Err(SpoolError::MaxItemsReached { max: 2 })
        ));
    }

    /// `clear` leaves the spool reporting itself as empty.
    #[tokio::test]
    async fn test_clear() {
        let tmp = tempdir().unwrap();
        let queue_dir = tmp.path().join("test-queue");

        let mut queue = Spool::create(&queue_dir).await.unwrap();
        for payload in [&b"one"[..], &b"two"[..]] {
            queue.push(payload).await.unwrap();
        }
        assert_eq!(queue.len(), 2);

        queue.clear().unwrap();
        assert!(queue.is_empty());
    }

    /// The item count is rebuilt from disk when the queue is reopened.
    #[tokio::test]
    async fn test_len_survives_reopen() {
        let tmp = tempdir().unwrap();
        let queue_dir = tmp.path().join("test-reopen-queue");

        let mut writer = Spool::create(&queue_dir).await.unwrap();
        for payload in [&b"one"[..], &b"two"[..], &b"three"[..]] {
            writer.push(payload).await.unwrap();
        }
        assert_eq!(writer.len(), 3);
        // Close the first instance before the same path is opened again.
        drop(writer);

        let reopened = Spool::create(&queue_dir).await.unwrap();
        assert_eq!(reopened.len(), 3);
    }

    /// Items consumed before a restart are not counted again afterwards.
    #[tokio::test]
    async fn test_len_survives_partial_consume_and_reopen() {
        let tmp = tempdir().unwrap();
        let queue_dir = tmp.path().join("test-partial-queue");

        let mut writer = Spool::create(&queue_dir).await.unwrap();
        for i in 0..5 {
            let payload = format!("item-{i}");
            writer.push(payload.as_bytes()).await.unwrap();
        }
        assert_eq!(writer.len(), 5);

        for _ in 0..2 {
            writer.pop_front().await.unwrap();
        }
        assert_eq!(writer.len(), 3);
        // Close the first instance before the same path is opened again.
        drop(writer);

        let reopened = Spool::create(&queue_dir).await.unwrap();
        assert_eq!(reopened.len(), 3);
    }

    /// The `Debug` output names the type and includes the queue path.
    #[tokio::test]
    async fn test_debug_format() {
        let tmp = tempdir().unwrap();
        let queue_dir = tmp.path().join("test-queue");

        let queue = Spool::create(&queue_dir).await.unwrap();
        let rendered = format!("{queue:?}");
        for needle in ["Spool", "test-queue"] {
            assert!(rendered.contains(needle));
        }
    }
}