1mod iter;
4mod receiver;
5mod sender;
6
7pub use iter::QueueIter;
8pub use receiver::{Receiver, ReceiverBuilder, RecvGuard};
9pub use sender::{Sender, SenderBuilder};
10
11#[cfg(feature = "recovery")]
12pub(crate) use receiver::recv_lock_filename;
13#[cfg(feature = "recovery")]
14pub(crate) use sender::send_lock_filename;
15
16use std::fs::*;
17use std::io::{self};
18use std::path::{Path, PathBuf};
19
20use receiver::{acquire_recv_lock, try_acquire_recv_lock};
21use sender::{acquire_send_lock, try_acquire_send_lock};
22
/// Builds the on-disk path of the `segment`-th queue segment file: `<base>/<segment>.q`.
fn segment_filename<P: AsRef<Path>>(base: P, segment: u64) -> PathBuf {
    let file_name = format!("{segment}.q");
    base.as_ref().join(file_name)
}
27
28const HEADER_EOF: [u8; 4] = [255, 255, 255, 255];
30
31pub fn channel<P: AsRef<Path>>(base: P) -> io::Result<(Sender, Receiver)> {
33 Ok((Sender::open(base.as_ref())?, Receiver::open(base.as_ref())?))
34}
35
36pub fn try_clear<P: AsRef<Path>>(base: P) -> io::Result<()> {
39 let mut send_lock = try_acquire_send_lock(base.as_ref())?;
40 let mut recv_lock = try_acquire_recv_lock(base.as_ref())?;
41
42 send_lock.ignore();
44 recv_lock.ignore();
45
46 remove_dir_all(base.as_ref())?;
47
48 Ok(())
49}
50
51pub async fn clear<P: AsRef<Path>>(base: P) -> io::Result<()> {
54 let mut send_lock = acquire_send_lock(base.as_ref()).await?;
55 let mut recv_lock = acquire_recv_lock(base.as_ref()).await?;
56
57 send_lock.ignore();
59 recv_lock.ignore();
60
61 remove_dir_all(base.as_ref())?;
62
63 Ok(())
64}
65
/// Test-only process initializer (runs before the harness via `ctor`):
/// optionally installs a logger and resets the `data/` scratch directory.
#[cfg(test)]
#[ctor::ctor]
fn init_log() {
    #[cfg(feature = "log-trace")]
    simple_logger::SimpleLogger::new()
        .with_level(log::LevelFilter::Trace)
        .init()
        .ok();

    #[cfg(feature = "log-debug")]
    simple_logger::SimpleLogger::new()
        .with_level(log::LevelFilter::Debug)
        .init()
        .ok();

    // Start every test run from an empty scratch directory; the removal may
    // fail on the first run (directory absent), which is fine.
    let _ = remove_dir_all("data");
    create_dir_all("data").unwrap();
}
89
#[cfg(test)]
mod tests {
    use super::*;

    use futures_timer::Delay;
    use rand::{Rng, SeedableRng};
    use rand_xorshift::XorShiftRng;
    use std::sync::Arc;
    use std::time::Duration;

    use crate::error::{TryRecvError, TrySendError};

    use self::sender::get_queue_size;

    /// Infinite stream of random payloads, each 0..=127 bytes long.
    fn data_lots_of_data() -> impl Iterator<Item = Vec<u8>> {
        let mut rng = XorShiftRng::from_rng(rand::thread_rng()).expect("can init");
        (0..).map(move |_| {
            (0..rng.gen::<usize>() % 128)
                .map(|_| rng.gen())
                .collect::<Vec<_>>()
        })
    }

    #[test]
    fn create_and_clear() {
        // `let _` drops the sender immediately, releasing its lock so that
        // `try_clear` can acquire both locks and succeed.
        let _ = Sender::open("data/create-and-clear").unwrap();
        try_clear("data/create-and-clear").unwrap();
    }

    #[test]
    #[should_panic]
    fn create_and_clear_fails() {
        // The sender is still alive and holds the send lock, so `try_clear`
        // returns an error and the `unwrap` panics (expected).
        let sender = Sender::open("data/create-and-clear-fails").unwrap();
        try_clear("data/create-and-clear-fails").unwrap();
        drop(sender);
    }

    #[test]
    fn create_and_clear_async() {
        let _ = Sender::open("data/create-and-clear-async").unwrap();

        futures::executor::block_on(async { clear("data/create-and-clear-async").await.unwrap() });
    }

    #[test]
    fn test_enqueue() {
        let mut sender = Sender::open("data/enqueue").unwrap();
        for data in data_lots_of_data().take(100_000) {
            sender.try_send(&data).unwrap();
        }
    }

    #[test]
    fn test_enqueue_then_dequeue() {
        // Send everything first, then receive and compare element-by-element.
        let dataset = data_lots_of_data().take(100_000).collect::<Vec<_>>();
        let mut sender = Sender::open("data/enqueue-then-dequeue").unwrap();
        for data in &dataset {
            sender.try_send(data).unwrap();
        }

        log::trace!("enqueued");

        futures::executor::block_on(async {
            let mut receiver = Receiver::open("data/enqueue-then-dequeue").unwrap();
            let dataset_iter = dataset.iter();
            let mut i = 0u64;

            for should_be in dataset_iter {
                let data = receiver.recv().await.unwrap();
                assert_eq!(&*data, should_be, "at sample {}", i);
                i += 1;
                data.commit().unwrap();
            }
        });
    }

    #[test]
    fn test_enqueue_and_dequeue() {
        // Interleave: send one element, immediately receive and verify it.
        let dataset = data_lots_of_data().take(100_000).collect::<Vec<_>>();
        let mut sender = Sender::open("data/enqueue-and-dequeue").unwrap();

        futures::executor::block_on(async {
            let mut receiver = Receiver::open("data/enqueue-and-dequeue").unwrap();
            let mut i = 0;

            for data in &dataset {
                sender.try_send(data).unwrap();
                let received = receiver.recv().await.unwrap();
                assert_eq!(&*received, data, "at sample {}", i);

                i += 1;
                received.commit().unwrap();
            }
        });
    }

    #[test]
    fn test_enqueue_dequeue_parallel() {
        // Producer and consumer run on separate threads over the same queue.
        let dataset = data_lots_of_data().take(100_000).collect::<Vec<_>>();
        let arc_sender = Arc::new(dataset);
        let arc_receiver = arc_sender.clone();

        let enqueue = std::thread::spawn(move || {
            futures::executor::block_on(async {
                let mut sender = Sender::open("data/enqueue-dequeue-parallel").unwrap();
                for data in &*arc_sender {
                    sender.send(data).await.unwrap();
                }
            });
        });

        let dequeue = std::thread::spawn(move || {
            futures::executor::block_on(async {
                let mut receiver = Receiver::open("data/enqueue-dequeue-parallel").unwrap();
                let dataset_iter = arc_receiver.iter();
                let mut i = 0u64;

                for should_be in dataset_iter {
                    let data = receiver.recv().await.unwrap();
                    assert_eq!(&*data, should_be, "at sample {}", i);
                    i += 1;
                    data.commit().unwrap();
                }
            });
        });

        enqueue.join().expect("enqueue thread panicked");
        dequeue.join().expect("dequeue thread panicked");
    }

    #[test]
    fn test_enqueue_dequeue_parallel_with_batches() {
        // Group the data into batches of exactly 256 elements; any final
        // partial batch is intentionally discarded so send/recv sizes match.
        let mut dataset = vec![];
        let mut batch = vec![];

        for data in data_lots_of_data().take(100_000) {
            batch.push(data);

            if batch.len() >= 256 {
                dataset.push(batch);
                batch = vec![];
            }
        }

        let arc_sender = Arc::new(dataset);
        let arc_receiver = arc_sender.clone();

        let enqueue = std::thread::spawn(move || {
            let mut sender = Sender::open("data/enqueue-dequeue-parallel-with-batches").unwrap();
            for batch in &*arc_sender {
                sender.try_send_batch(batch).unwrap();
            }
        });

        let dequeue = std::thread::spawn(move || {
            futures::executor::block_on(async {
                let mut receiver =
                    Receiver::open("data/enqueue-dequeue-parallel-with-batches").unwrap();
                let dataset_iter = arc_receiver.iter();
                let mut i = 0u64;

                for should_be in dataset_iter {
                    let batch = receiver.recv_batch(256).await.unwrap();
                    assert_eq!(&*batch, should_be, "at sample {}", i);
                    i += 1;
                    batch.commit().unwrap();
                }
            });
        });

        enqueue.join().expect("enqueue thread panicked");
        dequeue.join().expect("dequeue thread panicked");
    }

    #[test]
    fn test_dequeue_is_atomic() {
        let mut sender = Sender::open("data/dequeue-is-atomic").unwrap();
        let dataset = data_lots_of_data().take(100_000).collect::<Vec<_>>();

        futures::executor::block_on(async move {
            let mut receiver = Receiver::open("data/dequeue-is-atomic").unwrap();
            let dataset_iter = dataset.iter();
            let mut i = 0u64;

            for data in dataset_iter {
                sender.try_send(data).unwrap();
                // Race `recv` against an immediately-ready future so the recv
                // future may be cancelled mid-flight...
                let incomplete =
                    futures::future::select(Box::pin(receiver.recv()), Box::pin(async {})).await;
                // ...and dropping the cancelled future must NOT consume the
                // element: the next recv must still yield it.
                drop(incomplete);
                let received = receiver.recv().await.unwrap();
                assert_eq!(&*received, data, "at sample {}", i);
                i += 1;
                received.commit().unwrap();
            }
        });
    }

    #[test]
    fn test_rollback() {
        // A received element that is dropped without `commit` is observed
        // again on the next `recv`.
        futures::executor::block_on(async move {
            let (mut sender, mut receiver) = channel("data/rollback").unwrap();
            sender.try_send(b"123").unwrap();
            sender.try_send(b"456").unwrap();

            assert_eq!(&*receiver.recv().await.unwrap(), b"123");
            assert_eq!(&*receiver.recv().await.unwrap(), b"123");

            receiver.recv().await.unwrap().commit().unwrap();

            assert_eq!(&*receiver.recv().await.unwrap(), b"456");
            assert_eq!(&*receiver.recv().await.unwrap(), b"456");
        });
    }

    #[test]
    fn test_recv_timeout_nothing() {
        futures::executor::block_on(async move {
            let (_, mut receiver) = channel("data/recv-timeout-nothing").unwrap();

            assert!(receiver
                .recv_timeout(Delay::new(Duration::from_secs(1)))
                .await
                .unwrap()
                .is_none());
        });
    }

    #[test]
    fn test_recv_timeout_immediate() {
        futures::executor::block_on(async move {
            let (mut sender, mut receiver) = channel("data/recv-timeout-immediate").unwrap();

            sender.try_send(b"123").unwrap();
            assert_eq!(
                &*receiver
                    .recv_timeout(Delay::new(Duration::from_secs(1)))
                    .await
                    .unwrap()
                    .unwrap(),
                b"123"
            );
        });
    }

    #[test]
    fn test_recv_timeout_delayed() {
        // The element arrives after ~1s; the 2s timeout must still catch it.
        futures::executor::block_on(async move {
            let (mut sender, mut receiver) = channel("data/recv-timeout-delayed").unwrap();

            std::thread::spawn(move || {
                futures::executor::block_on(async move {
                    Delay::new(Duration::from_secs(1)).await;
                    sender.try_send(b"123").unwrap();
                });
            });

            assert_eq!(
                &*receiver
                    .recv_timeout(Delay::new(Duration::from_secs(2)))
                    .await
                    .unwrap()
                    .unwrap(),
                b"123"
            );
        });
    }

    #[test]
    fn test_recv_batch_timeout_nothing() {
        futures::executor::block_on(async move {
            let (_, mut receiver) = channel("data/recv-batch-timeout-nothing").unwrap();

            assert!(receiver
                .recv_batch_timeout(2, Delay::new(Duration::from_secs(1)))
                .await
                .unwrap()
                .is_empty());
        });
    }

    #[test]
    fn test_recv_batch_timeout_immediate() {
        futures::executor::block_on(async move {
            let (mut sender, mut receiver) = channel("data/recv-batch-timeout-immediate").unwrap();

            sender.try_send(b"123").unwrap();
            sender.try_send(b"456").unwrap();

            assert_eq!(
                &*receiver
                    .recv_batch_timeout(2, Delay::new(Duration::from_secs(1)))
                    .await
                    .unwrap(),
                &[b"123", b"456"],
            );
        });
    }

    #[test]
    fn test_recv_batch_timeout_delayed_1() {
        // Batch fills up (3 elements) before the 2s timeout: elements arrive
        // every 0.5s, so three are in by ~1.5s.
        futures::executor::block_on(async move {
            let (mut sender, mut receiver) = channel("data/recv-batch-timeout-delayed-1").unwrap();

            std::thread::spawn(move || {
                futures::executor::block_on(async move {
                    for i in 0..5usize {
                        Delay::new(Duration::from_secs_f64(0.5)).await;
                        sender.try_send(i.to_string().as_bytes()).unwrap();
                    }
                });
            });

            assert_eq!(
                &*receiver
                    .recv_batch_timeout(3, Delay::new(Duration::from_secs(2)))
                    .await
                    .unwrap(),
                &[b"0", b"1", b"2"]
            );
        });
    }

    #[test]
    fn test_recv_batch_timeout_delayed_2() {
        // Timeout fires before the batch fills: at one element per 0.6s, only
        // three of the requested five arrive within the 2s window.
        futures::executor::block_on(async move {
            let (mut sender, mut receiver) = channel("data/recv-batch-timeout-delayed-2").unwrap();

            std::thread::spawn(move || {
                futures::executor::block_on(async move {
                    for i in 0..5usize {
                        Delay::new(Duration::from_secs_f64(0.6)).await;
                        sender.try_send(i.to_string().as_bytes()).unwrap();
                    }
                });
            });

            assert_eq!(
                &*receiver
                    .recv_batch_timeout(5, Delay::new(Duration::from_secs(2)))
                    .await
                    .unwrap(),
                &[b"0", b"1", b"2"]
            );
        });
    }

    #[test]
    fn test_max_queue_size() {
        let mut sender = SenderBuilder::new()
            .max_queue_size(Some(2048))
            .segment_size(512)
            .open("data/max-queue-size")
            .unwrap();
        let mut data = data_lots_of_data();

        // Fill the queue until the size limit kicks in.
        loop {
            let item = data.next().unwrap();
            match sender.try_send(&item) {
                Ok(_) => {}
                Err(TrySendError::Io(err)) => Err(err).unwrap(),
                Err(TrySendError::QueueFull { .. }) => break,
            }
        }

        let size = get_queue_size("data/max-queue-size").unwrap().in_bytes;
        assert!(
            size >= 2048,
            "size was {}; should be at least {}",
            size,
            2048
        );
    }

    #[test]
    fn test_max_queue_size_with_drain() {
        let mut sender = SenderBuilder::new()
            .max_queue_size(Some(2048))
            .segment_size(512)
            .open("data/max-queue-size-with-drain")
            .unwrap();
        let mut receiver = Receiver::open("data/max-queue-size-with-drain").unwrap();
        let mut data = data_lots_of_data();

        // Fill the queue until the size limit kicks in.
        loop {
            let item = data.next().unwrap();
            match sender.try_send(&item) {
                Ok(_) => {}
                Err(TrySendError::Io(err)) => Err(err).unwrap(),
                Err(TrySendError::QueueFull { .. }) => break,
            }
        }

        let size = get_queue_size("data/max-queue-size-with-drain")
            .unwrap()
            .in_bytes;
        assert!(
            size >= 2048,
            "size was {}; should be at least {}",
            size,
            2048
        );

        // Drain everything; sending must then succeed again.
        loop {
            match receiver.try_recv() {
                Ok(thing) => thing.commit().unwrap(),
                Err(TryRecvError::QueueEmpty) => break,
                Err(TryRecvError::Io(err)) => Err(err).unwrap(),
            }
        }

        for _ in 0..8 {
            sender.try_send(data.next().unwrap()).unwrap();
        }
    }

    #[test]
    fn test_enqueue_dequeue_parallel_with_max_queue_size() {
        fn test(queue_size: u64) {
            let dataset = data_lots_of_data().take(100_000).collect::<Vec<_>>();
            let arc_sender = Arc::new(dataset);
            let arc_receiver = arc_sender.clone();

            let enqueue = std::thread::spawn(move || {
                futures::executor::block_on(async {
                    let mut sender = SenderBuilder::new()
                        .max_queue_size(Some(queue_size))
                        .open("data/enqueue-dequeue-parallel-with-max-queue-size")
                        .unwrap();
                    for data in &*arc_sender {
                        sender.send(data).await.unwrap();
                    }
                });
            });

            let dequeue = std::thread::spawn(move || {
                futures::executor::block_on(async {
                    let mut receiver =
                        Receiver::open("data/enqueue-dequeue-parallel-with-max-queue-size")
                            .unwrap();
                    let dataset_iter = arc_receiver.iter();
                    let mut i = 0u64;

                    for should_be in dataset_iter {
                        let data = receiver.recv().await.unwrap();
                        assert_eq!(&*data, should_be, "at sample {}", i);
                        i += 1;
                        data.commit().unwrap();
                    }
                });
            });

            enqueue.join().expect("enqueue thread panicked");
            dequeue.join().expect("dequeue thread panicked");

            try_clear("data/enqueue-dequeue-parallel-with-max-queue-size").unwrap();
        }

        test(2 * 1024 * 1024);
        test(4 * 1024 * 1024);
        test(8 * 1024 * 1024);
    }

    #[test]
    #[should_panic]
    fn test_small_queue_size() {
        SenderBuilder::new().max_queue_size(Some(0));
    }

    #[test]
    #[should_panic]
    fn test_small_segment_size() {
        SenderBuilder::new().segment_size(0);
    }

    #[test]
    fn test_trans_segment_transactions() {
        // With a tiny (512-byte) segment size, a 50-element batch spans
        // multiple segments; rollback and commit must still behave.
        let data = data_lots_of_data().take(100).collect::<Vec<_>>();

        let mut sender = SenderBuilder::new()
            .segment_size(512)
            .open("data/trans-segment-transactions")
            .unwrap();

        sender.try_send_batch(&data).unwrap();

        futures::executor::block_on(async move {
            let mut receiver = Receiver::open("data/trans-segment-transactions").unwrap();

            // Repeated rollbacks keep yielding the same first 50 elements.
            for _ in 0..7 {
                let batch = receiver.recv_batch(50).await.unwrap();

                for (batch_item, item) in batch.iter().zip(&data) {
                    assert_eq!(batch_item, item);
                }

                batch.rollback().unwrap();
            }

            let batch = receiver.recv_batch(50).await.unwrap();

            for (batch_item, item) in batch.iter().zip(&data) {
                assert_eq!(batch_item, item);
            }

            batch.commit().unwrap();

            // After the commit, rollbacks yield the second 50 elements.
            for _ in 0..7 {
                let batch = receiver.recv_batch(50).await.unwrap();

                for (batch_item, item) in batch.iter().zip(&data[50..]) {
                    assert_eq!(batch_item, item);
                }

                batch.rollback().unwrap();
            }
        });
    }

    #[test]
    fn test_try_recv() {
        let data = data_lots_of_data().take(100).collect::<Vec<_>>();

        let mut sender = SenderBuilder::new()
            .segment_size(512)
            .open("data/try-recv")
            .unwrap();

        sender.try_send_batch(&data[..25]).unwrap();

        let mut receiver = Receiver::open("data/try-recv").unwrap();

        let mut count = 0;
        loop {
            match receiver.try_recv() {
                Ok(item) => {
                    assert_eq!(&*item, &data[count]);
                    item.commit().unwrap();
                    count += 1;
                }
                Err(TryRecvError::Io(err)) => Err(err).unwrap(),
                Err(TryRecvError::QueueEmpty) => break,
            }
        }

        assert_eq!(count, 25);
    }

    #[test]
    fn test_try_recv_batch() {
        let data = data_lots_of_data().take(100).collect::<Vec<_>>();

        let mut sender = SenderBuilder::new()
            .segment_size(512)
            .open("data/try-recv-batch")
            .unwrap();

        let mut receiver = Receiver::open("data/try-recv-batch").unwrap();

        // Batch sizes chosen to divide 25 evenly, so `count` always lands on 25.
        for recv_size in [1, 5, 25] {
            sender.try_send_batch(&data[..25]).unwrap();

            let mut count = 0;
            loop {
                match receiver.try_recv_batch(recv_size) {
                    Ok(items) => {
                        for item in items.iter() {
                            assert_eq!(&*item, &data[count]);
                            count += 1;
                        }
                        items.commit().unwrap();
                    }
                    Err(TryRecvError::Io(err)) => Err(err).unwrap(),
                    Err(TryRecvError::QueueEmpty) => break,
                }
            }

            assert_eq!(count, 25);
        }
    }

    #[test]
    fn test_try_recv_batch_up_to() {
        // Unlike `try_recv_batch`, `try_recv_batch_up_to` may return partial
        // batches, so arbitrary send/recv size combinations must drain fully.
        let data = data_lots_of_data().take(100).collect::<Vec<_>>();

        let mut sender = SenderBuilder::new()
            .segment_size(512)
            .open("data/try-recv-batch-up-to")
            .unwrap();

        let mut receiver = Receiver::open("data/try-recv-batch-up-to").unwrap();

        for send_size in [1, 10, 25, 100] {
            for recv_size in [1, 7, 25, 30, 100] {
                println!("send_size: {send_size}, recv_size: {recv_size}");

                sender.try_send_batch(&data[..send_size]).unwrap();

                let mut count = 0;
                loop {
                    match receiver.try_recv_batch_up_to(recv_size) {
                        Ok(items) => {
                            for item in items.iter() {
                                assert_eq!(&*item, &data[count]);
                                count += 1;
                            }
                            items.commit().unwrap();
                        }
                        Err(TryRecvError::Io(err)) => Err(err).unwrap(),
                        Err(TryRecvError::QueueEmpty) => break,
                    }
                }

                assert_eq!(count, send_size);
            }
        }
    }

    #[test]
    fn test_try_recv_batch_up_to_zero() {
        // Requesting a batch of up to zero elements yields an empty batch even
        // when data is available.
        let mut sender = SenderBuilder::new()
            .segment_size(512)
            .open("data/try-recv-batch-up-to-zero")
            .unwrap();

        let mut receiver = Receiver::open("data/try-recv-batch-up-to-zero").unwrap();
        sender.try_send_batch([[1]]).unwrap();

        let no_items = receiver
            .try_recv_batch_up_to(0)
            .unwrap()
            .try_into_inner()
            .unwrap();
        assert!(no_items.is_empty());
    }

    #[test]
    fn test_receive_with_timeout_and_end_transaction() {
        // TODO(review): this test body is empty and therefore passes
        // vacuously; restore the intended scenario or remove the test.
    }

    #[test]
    fn test_iterate() {
        let data = data_lots_of_data().take(10_000).collect::<Vec<_>>();

        let mut sender = SenderBuilder::new()
            .segment_size(512)
            .open("data/iterate")
            .unwrap();

        sender.try_send_batch(&data).unwrap();

        let iterated = QueueIter::open("data/iterate")
            .unwrap()
            .enumerate()
            .map(|(i, item)| {
                println!("{}", i);
                item.unwrap()
            })
            .collect::<Vec<_>>();

        assert_eq!(data, iterated);
    }

    #[test]
    fn test_try_recv_empty_msg() {
        // A zero-length payload must round-trip intact.
        let mut sender = SenderBuilder::new()
            .segment_size(512)
            .open("data/try-recv-empty-msg")
            .unwrap();

        sender.try_send(&[]).unwrap();

        let mut receiver = Receiver::open("data/try-recv-empty-msg").unwrap();

        let item = receiver.try_recv().unwrap();
        assert_eq!(&*item, &[]);
        item.commit().unwrap();
    }
}