yaque/queue/
mod.rs

1//! Queue implementation and utility functions.
2
3mod iter;
4mod receiver;
5mod sender;
6
7pub use iter::QueueIter;
8pub use receiver::{Receiver, ReceiverBuilder, RecvGuard};
9pub use sender::{Sender, SenderBuilder};
10
11#[cfg(feature = "recovery")]
12pub(crate) use receiver::recv_lock_filename;
13#[cfg(feature = "recovery")]
14pub(crate) use sender::send_lock_filename;
15
16use std::fs::*;
17use std::io::{self};
18use std::path::{Path, PathBuf};
19
20use receiver::{acquire_recv_lock, try_acquire_recv_lock};
21use sender::{acquire_send_lock, try_acquire_send_lock};
22
/// Builds the path of the file backing the given segment of a queue.
///
/// The file is called `<segment>.q` and is placed directly inside `base`.
fn segment_filename<P: AsRef<Path>>(base: P, segment: u64) -> PathBuf {
    let mut path = base.as_ref().to_path_buf();
    path.push(format!("{segment}.q"));
    path
}
27
/// The value of a header EOF: four bytes, each set to `0xFF`.
const HEADER_EOF: [u8; 4] = [u8::MAX; 4];
30
31/// Convenience function for opening the queue for both sending and receiving.
32pub fn channel<P: AsRef<Path>>(base: P) -> io::Result<(Sender, Receiver)> {
33    Ok((Sender::open(base.as_ref())?, Receiver::open(base.as_ref())?))
34}
35
36/// Tries to deletes a queue at the given path. This function will fail if the
37/// queue is in use either for sending or receiving.
38pub fn try_clear<P: AsRef<Path>>(base: P) -> io::Result<()> {
39    let mut send_lock = try_acquire_send_lock(base.as_ref())?;
40    let mut recv_lock = try_acquire_recv_lock(base.as_ref())?;
41
42    // Sets the the locks to ignore when their files magically disappear.
43    send_lock.ignore();
44    recv_lock.ignore();
45
46    remove_dir_all(base.as_ref())?;
47
48    Ok(())
49}
50
51/// Deletes a queue at the given path. This function will await the queue to
52/// become available for both sending and receiving.
53pub async fn clear<P: AsRef<Path>>(base: P) -> io::Result<()> {
54    let mut send_lock = acquire_send_lock(base.as_ref()).await?;
55    let mut recv_lock = acquire_recv_lock(base.as_ref()).await?;
56
57    // Sets the the locks to ignore when their files magically disappear.
58    send_lock.ignore();
59    recv_lock.ignore();
60
61    remove_dir_all(base.as_ref())?;
62
63    Ok(())
64}
65
/// Global initialization for tests: sets up logging (when one of the logging
/// features is enabled) and recreates an empty `data` directory.
#[cfg(test)]
#[ctor::ctor]
fn init_log() {
    // Logger setup. A failed `init` is deliberately ignored.
    #[cfg(feature = "log-trace")]
    let _ = simple_logger::SimpleLogger::new()
        .with_level(log::LevelFilter::Trace)
        .init();

    #[cfg(feature = "log-debug")]
    let _ = simple_logger::SimpleLogger::new()
        .with_level(log::LevelFilter::Debug)
        .init();

    // Wipe whatever a previous test run left behind; a failure here (e.g. the
    // directory not existing) is ignored.
    let _ = std::fs::remove_dir_all("data");

    // Start from a fresh directory structure:
    std::fs::create_dir_all("data").unwrap();
}
89
90#[cfg(test)]
91mod tests {
92    use super::*;
93
94    // use futures::StreamExt;
95    use futures_timer::Delay;
96    use rand::{Rng, SeedableRng};
97    use rand_xorshift::XorShiftRng;
98    use std::sync::Arc;
99    use std::time::Duration;
100
101    use crate::error::{TryRecvError, TrySendError};
102
103    use self::sender::get_queue_size;
104
105    fn data_lots_of_data() -> impl Iterator<Item = Vec<u8>> {
106        let mut rng = XorShiftRng::from_rng(rand::thread_rng()).expect("can init");
107        (0..).map(move |_| {
108            (0..rng.gen::<usize>() % 128)
109                .map(|_| rng.gen())
110                .collect::<Vec<_>>()
111        })
112    }
113
114    #[test]
115    fn create_and_clear() {
116        let _ = Sender::open("data/create-and-clear").unwrap();
117        try_clear("data/create-and-clear").unwrap();
118    }
119
120    #[test]
121    #[should_panic]
122    fn create_and_clear_fails() {
123        let sender = Sender::open("data/create-and-clear-fails").unwrap();
124        try_clear("data/create-and-clear-fails").unwrap();
125        drop(sender);
126    }
127
128    #[test]
129    fn create_and_clear_async() {
130        let _ = Sender::open("data/create-and-clear-async").unwrap();
131
132        futures::executor::block_on(async { clear("data/create-and-clear-async").await.unwrap() });
133    }
134
135    #[test]
136    fn test_enqueue() {
137        let mut sender = Sender::open("data/enqueue").unwrap();
138        for data in data_lots_of_data().take(100_000) {
139            sender.try_send(&data).unwrap();
140        }
141    }
142
143    /// Test enqueuing everything and then dequeueing everything, with no persistence.
144    #[test]
145    fn test_enqueue_then_dequeue() {
146        // Enqueue:
147        let dataset = data_lots_of_data().take(100_000).collect::<Vec<_>>();
148        let mut sender = Sender::open("data/enqueue-then-dequeue").unwrap();
149        for data in &dataset {
150            sender.try_send(data).unwrap();
151        }
152
153        log::trace!("enqueued");
154
155        // Dequeue:
156        futures::executor::block_on(async {
157            let mut receiver = Receiver::open("data/enqueue-then-dequeue").unwrap();
158            let dataset_iter = dataset.iter();
159            let mut i = 0u64;
160
161            for should_be in dataset_iter {
162                let data = receiver.recv().await.unwrap();
163                assert_eq!(&*data, should_be, "at sample {}", i);
164                i += 1;
165                data.commit().unwrap();
166            }
167        });
168    }
169
170    /// Test enqueuing and dequeueing, round robin, with no persistence.
171    #[test]
172    fn test_enqueue_and_dequeue() {
173        // Enqueue:
174        let dataset = data_lots_of_data().take(100_000).collect::<Vec<_>>();
175        let mut sender = Sender::open("data/enqueue-and-dequeue").unwrap();
176
177        futures::executor::block_on(async {
178            let mut receiver = Receiver::open("data/enqueue-and-dequeue").unwrap();
179            let mut i = 0;
180
181            for data in &dataset {
182                sender.try_send(data).unwrap();
183                let received = receiver.recv().await.unwrap();
184                assert_eq!(&*received, data, "at sample {}", i);
185
186                i += 1;
187                received.commit().unwrap();
188            }
189        });
190    }
191
192    /// Test enqueuing and dequeueing in parallel.
193    #[test]
194    fn test_enqueue_dequeue_parallel() {
195        // Generate data:
196        let dataset = data_lots_of_data().take(100_000).collect::<Vec<_>>();
197        let arc_sender = Arc::new(dataset);
198        let arc_receiver = arc_sender.clone();
199
200        // Enqueue (let's test async send!):
201        let enqueue = std::thread::spawn(move || {
202            futures::executor::block_on(async {
203                let mut sender = Sender::open("data/enqueue-dequeue-parallel").unwrap();
204                for data in &*arc_sender {
205                    sender.send(data).await.unwrap();
206                }
207            });
208        });
209
210        // Dequeue:
211        let dequeue = std::thread::spawn(move || {
212            futures::executor::block_on(async {
213                let mut receiver = Receiver::open("data/enqueue-dequeue-parallel").unwrap();
214                let dataset_iter = arc_receiver.iter();
215                let mut i = 0u64;
216
217                for should_be in dataset_iter {
218                    let data = receiver.recv().await.unwrap();
219                    assert_eq!(&*data, should_be, "at sample {}", i);
220                    i += 1;
221                    data.commit().unwrap();
222                }
223            });
224        });
225
226        enqueue.join().expect("enqueue thread panicked");
227        dequeue.join().expect("dequeue thread panicked");
228    }
229
230    /// Test enqueuing and dequeueing in parallel, using batches.
231    #[test]
232    fn test_enqueue_dequeue_parallel_with_batches() {
233        // Generate data:
234        let mut dataset = vec![];
235        let mut batch = vec![];
236
237        for data in data_lots_of_data().take(100_000) {
238            batch.push(data);
239
240            if batch.len() >= 256 {
241                dataset.push(batch);
242                batch = vec![];
243            }
244        }
245
246        let arc_sender = Arc::new(dataset);
247        let arc_receiver = arc_sender.clone();
248
249        // Enqueue:
250        let enqueue = std::thread::spawn(move || {
251            let mut sender = Sender::open("data/enqueue-dequeue-parallel-with-batches").unwrap();
252            for batch in &*arc_sender {
253                sender.try_send_batch(batch).unwrap();
254            }
255        });
256
257        // Dequeue:
258        let dequeue = std::thread::spawn(move || {
259            futures::executor::block_on(async {
260                let mut receiver =
261                    Receiver::open("data/enqueue-dequeue-parallel-with-batches").unwrap();
262                let dataset_iter = arc_receiver.iter();
263                let mut i = 0u64;
264
265                for should_be in dataset_iter {
266                    let batch = receiver.recv_batch(256).await.unwrap();
267                    assert_eq!(&*batch, should_be, "at sample {}", i);
268                    i += 1;
269                    batch.commit().unwrap();
270                }
271            });
272        });
273
274        enqueue.join().expect("enqueue thread panicked");
275        dequeue.join().expect("dequeue thread panicked");
276    }
277
278    #[test]
279    fn test_dequeue_is_atomic() {
280        let mut sender = Sender::open("data/dequeue-is-atomic").unwrap();
281        let dataset = data_lots_of_data().take(100_000).collect::<Vec<_>>();
282
283        futures::executor::block_on(async move {
284            let mut receiver = Receiver::open("data/dequeue-is-atomic").unwrap();
285            let dataset_iter = dataset.iter();
286            let mut i = 0u64;
287
288            for data in dataset_iter {
289                sender.try_send(data).unwrap();
290                // Try not to poll the future to the end.
291                // TODO maybe you need something a bit more convincing than
292                // `async {}`...
293                let incomplete =
294                    futures::future::select(Box::pin(receiver.recv()), Box::pin(async {})).await;
295                drop(incomplete); // need to force this explicitly.
296
297                //
298                let received = receiver.recv().await.unwrap();
299                assert_eq!(&*received, data, "at sample {}", i);
300                i += 1;
301                received.commit().unwrap();
302            }
303        });
304    }
305
306    #[test]
307    fn test_rollback() {
308        futures::executor::block_on(async move {
309            let (mut sender, mut receiver) = channel("data/rollback").unwrap();
310            sender.try_send(b"123").unwrap();
311            sender.try_send(b"456").unwrap();
312
313            assert_eq!(&*receiver.recv().await.unwrap(), b"123");
314            assert_eq!(&*receiver.recv().await.unwrap(), b"123");
315
316            receiver.recv().await.unwrap().commit().unwrap();
317
318            assert_eq!(&*receiver.recv().await.unwrap(), b"456");
319            assert_eq!(&*receiver.recv().await.unwrap(), b"456");
320        });
321    }
322
323    #[test]
324    fn test_recv_timeout_nothing() {
325        futures::executor::block_on(async move {
326            let (_, mut receiver) = channel("data/recv-timeout-nothing").unwrap();
327
328            assert!(receiver
329                .recv_timeout(Delay::new(Duration::from_secs(1)))
330                .await
331                .unwrap()
332                .is_none(),);
333        });
334    }
335
336    #[test]
337    fn test_recv_timeout_immediate() {
338        futures::executor::block_on(async move {
339            let (mut sender, mut receiver) = channel("data/recv-timeout-immediate").unwrap();
340
341            sender.try_send(b"123").unwrap();
342            // sender.send(b"456").unwrap();
343
344            assert_eq!(
345                &*receiver
346                    .recv_timeout(Delay::new(Duration::from_secs(1)))
347                    .await
348                    .unwrap()
349                    .unwrap(),
350                b"123"
351            );
352        });
353    }
354
355    #[test]
356    fn test_recv_timeout_delayed() {
357        futures::executor::block_on(async move {
358            let (mut sender, mut receiver) = channel("data/recv-timeout-delayed").unwrap();
359
360            std::thread::spawn(move || {
361                futures::executor::block_on(async move {
362                    Delay::new(Duration::from_secs(1)).await;
363                    sender.try_send(b"123").unwrap();
364                });
365            });
366
367            assert_eq!(
368                &*receiver
369                    .recv_timeout(Delay::new(Duration::from_secs(2)))
370                    .await
371                    .unwrap()
372                    .unwrap(),
373                b"123"
374            );
375        });
376    }
377
378    #[test]
379    fn test_recv_batch_timeout_nothing() {
380        futures::executor::block_on(async move {
381            let (_, mut receiver) = channel("data/recv-batch-timeout-nothing").unwrap();
382
383            assert!(receiver
384                .recv_batch_timeout(2, Delay::new(Duration::from_secs(1)))
385                .await
386                .unwrap()
387                .is_empty(),);
388        });
389    }
390
391    #[test]
392    fn test_recv_batch_timeout_immediate() {
393        futures::executor::block_on(async move {
394            let (mut sender, mut receiver) = channel("data/recv-batch-timeout-immediate").unwrap();
395
396            sender.try_send(b"123").unwrap();
397            sender.try_send(b"456").unwrap();
398
399            assert_eq!(
400                &*receiver
401                    .recv_batch_timeout(2, Delay::new(Duration::from_secs(1)))
402                    .await
403                    .unwrap(),
404                &[b"123", b"456"],
405            );
406        });
407    }
408
409    #[test]
410    fn test_recv_batch_timeout_delayed_1() {
411        futures::executor::block_on(async move {
412            let (mut sender, mut receiver) = channel("data/recv-batch-timeout-delayed-1").unwrap();
413
414            std::thread::spawn(move || {
415                futures::executor::block_on(async move {
416                    for i in 0..5usize {
417                        Delay::new(Duration::from_secs_f64(0.5)).await;
418                        sender.try_send(i.to_string().as_bytes()).unwrap();
419                    }
420                });
421            });
422
423            assert_eq!(
424                &*receiver
425                    .recv_batch_timeout(3, Delay::new(Duration::from_secs(2)))
426                    .await
427                    .unwrap(),
428                &[b"0", b"1", b"2"]
429            );
430        });
431    }
432
433    #[test]
434    fn test_recv_batch_timeout_delayed_2() {
435        futures::executor::block_on(async move {
436            let (mut sender, mut receiver) = channel("data/recv-batch-timeout-delayed-2").unwrap();
437
438            std::thread::spawn(move || {
439                futures::executor::block_on(async move {
440                    for i in 0..5usize {
441                        Delay::new(Duration::from_secs_f64(0.6)).await;
442                        sender.try_send(i.to_string().as_bytes()).unwrap();
443                    }
444                });
445            });
446
447            assert_eq!(
448                &*receiver
449                    .recv_batch_timeout(5, Delay::new(Duration::from_secs(2)))
450                    .await
451                    .unwrap(),
452                &[b"0", b"1", b"2"]
453            );
454        });
455    }
456
457    #[test]
458    fn test_max_queue_size() {
459        let mut sender = SenderBuilder::new()
460            .max_queue_size(Some(2048))
461            .segment_size(512)
462            .open("data/max-queue-size")
463            .unwrap();
464        let mut data = data_lots_of_data();
465
466        loop {
467            let item = data.next().unwrap();
468            match sender.try_send(&item) {
469                Ok(_) => {}
470                Err(TrySendError::Io(err)) => Err(err).unwrap(),
471                Err(TrySendError::QueueFull { .. }) => break,
472            }
473        }
474
475        let size = get_queue_size("data/max-queue-size").unwrap().in_bytes;
476        assert!(
477            size >= 2048,
478            "size was {}; should be at least {}",
479            size,
480            2048
481        );
482    }
483
484    #[test]
485    fn test_max_queue_size_with_drain() {
486        let mut sender = SenderBuilder::new()
487            .max_queue_size(Some(2048))
488            .segment_size(512)
489            .open("data/max-queue-size-with-drain")
490            .unwrap();
491        let mut receiver = Receiver::open("data/max-queue-size-with-drain").unwrap();
492        let mut data = data_lots_of_data();
493
494        loop {
495            let item = data.next().unwrap();
496            match sender.try_send(&item) {
497                Ok(_) => {}
498                Err(TrySendError::Io(err)) => Err(err).unwrap(),
499                Err(TrySendError::QueueFull { .. }) => break,
500            }
501        }
502
503        let size = get_queue_size("data/max-queue-size-with-drain")
504            .unwrap()
505            .in_bytes;
506        assert!(
507            size >= 2048,
508            "size was {}; should be at least {}",
509            size,
510            2048
511        );
512
513        // Drain queue:
514        loop {
515            match receiver.try_recv() {
516                Ok(thing) => thing.commit().unwrap(),
517                Err(TryRecvError::QueueEmpty) => break,
518                Err(TryRecvError::Io(err)) => Err(err).unwrap(),
519            }
520        }
521
522        for _ in 0..8 {
523            sender.try_send(data.next().unwrap()).unwrap();
524        }
525    }
526
527    /// Test enqueuing and dequeueing in parallel.
528    #[test]
529    fn test_enqueue_dequeue_parallel_with_max_queue_size() {
530        fn test(queue_size: u64) {
531            // Generate data:
532            let dataset = data_lots_of_data().take(100_000).collect::<Vec<_>>();
533            let arc_sender = Arc::new(dataset);
534            let arc_receiver = arc_sender.clone();
535
536            // Enqueue (let's test async send!):
537            let enqueue = std::thread::spawn(move || {
538                futures::executor::block_on(async {
539                    let mut sender = SenderBuilder::new()
540                        .max_queue_size(Some(queue_size))
541                        .open("data/enqueue-dequeue-parallel-with-max-queue-size")
542                        .unwrap();
543                    for data in &*arc_sender {
544                        sender.send(data).await.unwrap();
545                    }
546                });
547            });
548
549            // Dequeue:
550            let dequeue = std::thread::spawn(move || {
551                futures::executor::block_on(async {
552                    let mut receiver =
553                        Receiver::open("data/enqueue-dequeue-parallel-with-max-queue-size")
554                            .unwrap();
555                    let dataset_iter = arc_receiver.iter();
556                    let mut i = 0u64;
557
558                    for should_be in dataset_iter {
559                        let data = receiver.recv().await.unwrap();
560                        assert_eq!(&*data, should_be, "at sample {}", i);
561                        i += 1;
562                        data.commit().unwrap();
563                    }
564                });
565            });
566
567            enqueue.join().expect("enqueue thread panicked");
568            dequeue.join().expect("dequeue thread panicked");
569
570            try_clear("data/enqueue-dequeue-parallel-with-max-queue-size").unwrap();
571        }
572
573        test(2 * 1024 * 1024); // smaller than segment
574        test(4 * 1024 * 1024); // equal to segment
575        test(8 * 1024 * 1024); // bigger than segment
576    }
577
578    #[test]
579    #[should_panic]
580    fn test_small_queue_size() {
581        SenderBuilder::new().max_queue_size(Some(0));
582    }
583
584    #[test]
585    #[should_panic]
586    fn test_small_segment_size() {
587        SenderBuilder::new().segment_size(0);
588    }
589
590    // test small segment size + big batch transaction: commit and rollback.
591    #[test]
592    fn test_trans_segment_transactions() {
593        let data = data_lots_of_data().take(100).collect::<Vec<_>>();
594
595        // Populate a queue:
596        let mut sender = SenderBuilder::new()
597            .segment_size(512)
598            .open("data/trans-segment-transactions")
599            .unwrap();
600
601        sender.try_send_batch(&data).unwrap();
602
603        futures::executor::block_on(async move {
604            let mut receiver = Receiver::open("data/trans-segment-transactions").unwrap();
605
606            // Do some rollbacks:
607            for _ in 0..7 {
608                let batch = receiver.recv_batch(50).await.unwrap();
609
610                for (batch_item, item) in batch.iter().zip(&data) {
611                    assert_eq!(batch_item, item);
612                }
613
614                batch.rollback().unwrap();
615            }
616
617            // Now commit:
618            let batch = receiver.recv_batch(50).await.unwrap();
619
620            for (batch_item, item) in batch.iter().zip(&data) {
621                assert_eq!(batch_item, item);
622            }
623
624            batch.commit().unwrap();
625
626            // And now do some more rollbacks:
627            for _ in 0..7 {
628                let batch = receiver.recv_batch(50).await.unwrap();
629
630                for (batch_item, item) in batch.iter().zip(&data[50..]) {
631                    assert_eq!(batch_item, item);
632                }
633
634                batch.rollback().unwrap();
635            }
636        });
637    }
638
639    // test simple try_recv uses.
640    #[test]
641    fn test_try_recv() {
642        let data = data_lots_of_data().take(100).collect::<Vec<_>>();
643
644        // Populate a queue:
645        let mut sender = SenderBuilder::new()
646            .segment_size(512)
647            .open("data/try-recv")
648            .unwrap();
649
650        sender.try_send_batch(&data[..25]).unwrap();
651
652        let mut receiver = Receiver::open("data/try-recv").unwrap();
653
654        let mut count = 0;
655        loop {
656            match receiver.try_recv() {
657                Ok(item) => {
658                    assert_eq!(&*item, &data[count]);
659                    item.commit().unwrap();
660                    count += 1;
661                }
662                Err(TryRecvError::Io(err)) => Err(err).unwrap(),
663                Err(TryRecvError::QueueEmpty) => break,
664            }
665        }
666
667        assert_eq!(count, 25);
668    }
669
670    // test simple try_recv_batch uses.
671    #[test]
672    fn test_try_recv_batch() {
673        let data = data_lots_of_data().take(100).collect::<Vec<_>>();
674
675        // Populate a queue:
676        let mut sender = SenderBuilder::new()
677            .segment_size(512)
678            .open("data/try-recv-batch")
679            .unwrap();
680
681        let mut receiver = Receiver::open("data/try-recv-batch").unwrap();
682
683        // only works for multiples of 25, up to 25
684        for recv_size in [1, 5, 25] {
685            sender.try_send_batch(&data[..25]).unwrap();
686
687            let mut count = 0;
688            loop {
689                match receiver.try_recv_batch(recv_size) {
690                    Ok(items) => {
691                        for item in items.iter() {
692                            assert_eq!(&*item, &data[count]);
693                            count += 1;
694                        }
695                        items.commit().unwrap();
696                    }
697                    Err(TryRecvError::Io(err)) => Err(err).unwrap(),
698                    Err(TryRecvError::QueueEmpty) => break,
699                }
700            }
701
702            assert_eq!(count, 25);
703        }
704    }
705
706    // test simple try_recv_batch_up_to uses.
707    #[test]
708    fn test_try_recv_batch_up_to() {
709        let data = data_lots_of_data().take(100).collect::<Vec<_>>();
710
711        // Populate a queue:
712        let mut sender = SenderBuilder::new()
713            .segment_size(512)
714            .open("data/try-recv-batch-up-to")
715            .unwrap();
716
717        let mut receiver = Receiver::open("data/try-recv-batch-up-to").unwrap();
718
719        for send_size in [1, 10, 25, 100] {
720            for recv_size in [1, 7, 25, 30, 100] {
721                println!("send_size: {send_size}, recv_size: {recv_size}");
722
723                sender.try_send_batch(&data[..send_size]).unwrap();
724
725                let mut count = 0;
726                loop {
727                    match receiver.try_recv_batch_up_to(recv_size) {
728                        Ok(items) => {
729                            for item in items.iter() {
730                                assert_eq!(&*item, &data[count]);
731                                count += 1;
732                            }
733                            items.commit().unwrap();
734                        }
735                        Err(TryRecvError::Io(err)) => Err(err).unwrap(),
736                        Err(TryRecvError::QueueEmpty) => break,
737                    }
738                }
739
740                assert_eq!(count, send_size);
741            }
742        }
743    }
744
745    #[test]
746    fn test_try_recv_batch_up_to_zero() {
747        // Populate a queue:
748        let mut sender = SenderBuilder::new()
749            .segment_size(512)
750            .open("data/try-recv-batch-up-to-zero")
751            .unwrap();
752
753        let mut receiver = Receiver::open("data/try-recv-batch-up-to-zero").unwrap();
754        sender.try_send_batch([[1]]).unwrap();
755
756        let no_items = receiver
757            .try_recv_batch_up_to(0)
758            .unwrap()
759            .try_into_inner()
760            .unwrap();
761        assert!(no_items.is_empty());
762    }
763
    // NOTE(review): the whole body of this test is commented out, so it
    // currently passes without checking anything. The disabled code below
    // sends seven items and then asks `recv_batch_timeout` for ten; it looks
    // like it predates the current API (it calls `is_none()` on the result,
    // whereas the other batch tests in this module use `is_empty()`), so it
    // probably needs updating before it can be re-enabled. TODO confirm.
    #[test]
    fn test_receive_with_timeout_and_end_transaction() {
        // let data = data_lots_of_data().take(100).collect::<Vec<_>>();

        // let (mut sender, mut receiver) = channel("data/receive_with_timeout_and_end_transaction").unwrap();

        // futures::executor::block_on(async move {
        //     // Put 7 items:
        //     sender.try_send("these").unwrap();
        //     sender.try_send("are").unwrap();
        //     sender.try_send("seven").unwrap();
        //     sender.try_send("items").unwrap();
        //     sender.try_send("in").unwrap();
        //     sender.try_send("the").unwrap();
        //     sender.try_send("queue").unwrap();

        //     // Ask for 10:
        //     let guard = receiver.recv_batch_timeout(10, Delay::new(Duration::from_millis(10))).await.unwrap();
        //     assert!(guard.is_none());
        // });
    }
785
786    #[test]
787    fn test_iterate() {
788        let data = data_lots_of_data().take(10_000).collect::<Vec<_>>();
789
790        // Populate a queue:
791        let mut sender = SenderBuilder::new()
792            .segment_size(512)
793            .open("data/iterate")
794            .unwrap();
795
796        sender.try_send_batch(&data).unwrap();
797
798        let iterated = QueueIter::open("data/iterate")
799            .unwrap()
800            .enumerate()
801            .map(|(i, item)| {
802                println!("{}", i);
803                item.unwrap()
804            })
805            .collect::<Vec<_>>();
806
807        assert_eq!(data, iterated);
808    }
809
810    #[test]
811    fn test_try_recv_empty_msg() {
812        // Populate a queue:
813        let mut sender = SenderBuilder::new()
814            .segment_size(512)
815            .open("data/try-recv-empty-msg")
816            .unwrap();
817
818        sender.try_send(&[]).unwrap();
819
820        let mut receiver = Receiver::open("data/try-recv-empty-msg").unwrap();
821
822        let item = receiver.try_recv().unwrap();
823        assert_eq!(&*item, &[]);
824        item.commit().unwrap();
825    }
826}