disk_backed_queue/lib.rs
//! # disk-backed-queue
//!
//! A robust, crash-resistant queue implementation that persists all data to disk using SQLite.
//! Provides an mpsc-like channel API while ensuring messages survive application restarts and
//! system failures.
//!
//! ## Features
//!
//! - **Zero Message Loss**: All messages are persisted to SQLite before acknowledgment
//! - **Crash Recovery**: Messages survive application restarts and system crashes
//! - **MPSC-like API**: Familiar channel-based interface compatible with async Rust
//! - **Dead Letter Queue**: Corrupted messages automatically moved to separate database
//! - **Batch Operations**: High-performance bulk send/receive (approximately 50x faster than single operations)
//! - **Backpressure Support**: Optional queue size limits prevent unbounded growth
//!
//! ## Quick Start
//!
//! ```no_run
//! use disk_backed_queue::disk_backed_channel;
//! use serde::{Deserialize, Serialize};
//!
//! #[derive(Serialize, Deserialize, Debug)]
//! struct MyMessage {
//!     id: u64,
//!     content: String,
//! }
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     // Create a disk-backed channel
//!     let (tx, mut rx) = disk_backed_channel::<MyMessage, _>(
//!         "my_queue.db",
//!         "messages".to_string(),
//!         None // No size limit
//!     ).await?;
//!
//!     // Send messages
//!     tx.send(MyMessage {
//!         id: 1,
//!         content: "Hello, world!".to_string(),
//!     }).await?;
//!
//!     // Receive messages
//!     if let Some(msg) = rx.recv().await? {
//!         println!("Received: {:?}", msg);
//!     }
//!
//!     Ok(())
//! }
//! ```
//!
//! ## Use Cases
//!
//! Perfect for scenarios where message loss is unacceptable:
//!
//! - Message queuing during database outages
//! - Event sourcing and audit logs
//! - Job queues that survive restarts
//! - Data pipelines requiring guaranteed delivery
//!
//! ## Performance
//!
//! Batch operations are approximately **50x faster** than single operations due to
//! reduced fsync overhead. For high-throughput scenarios, use [`DiskBackedSender::send_batch`]
//! and [`DiskBackedReceiver::recv_batch`].
//!
//! Run `cargo run --example throughput_demo --release` to see performance on your system.
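//!
//! A minimal batch-oriented sketch (reuses the `MyMessage` type from the Quick
//! Start; the file name and batch sizes here are illustrative, not library defaults):
//!
//! ```no_run
//! # use disk_backed_queue::disk_backed_channel;
//! # use serde::{Deserialize, Serialize};
//! # #[derive(Serialize, Deserialize, Debug)]
//! # struct MyMessage { id: u64, content: String }
//! # #[tokio::main]
//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let (tx, mut rx) = disk_backed_channel::<MyMessage, _>(
//!     "batch_queue.db",
//!     "messages".to_string(),
//!     None,
//! ).await?;
//!
//! // One transaction (and one fsync) covers the whole batch.
//! let batch: Vec<MyMessage> = (0..1_000)
//!     .map(|i| MyMessage { id: i, content: format!("msg {i}") })
//!     .collect();
//! tx.send_batch(batch).await?;
//!
//! // Drain in chunks rather than one row at a time.
//! let received = rx.recv_batch(256).await?;
//! println!("got {} messages", received.len());
//! # Ok(())
//! # }
//! ```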

use rusqlite::OptionalExtension;
use serde::{Deserialize, Serialize};
use std::marker::PhantomData;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use thiserror::Error;
use tracing::{debug, error, instrument, warn};

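/// Errors returned by queue operations.
///
/// A hedged sketch of consumer-side error handling (the match arms shown are
/// illustrative; `Deserialization` is the variant [`DiskBackedQueue::recv`]
/// returns after moving a corrupt message to the dead letter queue):
///
/// ```no_run
/// # use disk_backed_queue::{disk_backed_channel, DiskQueueError};
/// # use serde::{Deserialize, Serialize};
/// # #[derive(Serialize, Deserialize, Debug)]
/// # struct Message { data: String }
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let (_tx, mut rx) = disk_backed_channel::<Message, _>("queue.db", "messages".to_string(), None).await?;
/// match rx.recv().await {
///     Ok(Some(msg)) => println!("got {:?}", msg),
///     Ok(None) => println!("queue empty"),
///     // Corrupt message was already moved to the DLQ; safe to keep receiving.
///     Err(DiskQueueError::Deserialization(e)) => eprintln!("corrupt message: {e}"),
///     Err(e) => return Err(e.into()),
/// }
/// # Ok(())
/// # }
/// ```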
#[derive(Error, Debug)]
pub enum DiskQueueError {
    #[error("Database error: {0}")]
    Database(#[from] rusqlite::Error),
    #[error("Serialization error: {0}")]
    Serialization(#[from] bincode::error::EncodeError),
    #[error("Deserialization error: {0}")]
    Deserialization(#[from] bincode::error::DecodeError),
    #[error("Invalid table name: {0}")]
    InvalidTableName(String),
    #[error("Internal task error: {0}")]
    TaskJoin(String),
    #[error("Queue is closed")]
    QueueClosed,
    #[error("Unexpected row count: {0}")]
    UnexpectedRowCount(String),
    #[error("Queue is full (max size: {0})")]
    QueueFull(usize),
}

pub type Result<T> = std::result::Result<T, DiskQueueError>;

/// Durability configuration for SQLite synchronous mode
///
/// This controls the trade-off between durability and performance.
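///
/// A brief usage sketch (file and table names are illustrative):
///
/// ```ignore
/// use disk_backed_queue::{DiskBackedQueue, DurabilityLevel};
///
/// // Trade some durability for throughput on non-critical data.
/// let queue: DiskBackedQueue<String> = DiskBackedQueue::with_durability(
///     "metrics.db",
///     "samples".to_string(),
///     None,
///     DurabilityLevel::Normal,
/// ).await?;
/// ```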
#[derive(Debug, Clone, Copy, Default)]
pub enum DurabilityLevel {
    /// OFF (0) - No fsync, fastest but data loss on power failure
    /// **WARNING**: Only use for temporary/cache data where loss is acceptable
    Off,

    /// NORMAL (1) - Fsync only at critical moments (WAL checkpoints)
    /// Safe in WAL mode for most crashes, but power loss during checkpoint can corrupt
    /// Good balance of performance and safety for WAL mode
    Normal,

    /// FULL (2) - Fsync after every commit (DEFAULT)
    /// Maximum durability - survives power loss at any moment
    /// Recommended for critical data where message loss is unacceptable
    #[default]
    Full,

    /// EXTRA (3) - Even more paranoid than FULL
    /// Additional fsyncs for maximum durability
    /// Rarely needed, significant performance impact
    Extra,
}

impl DurabilityLevel {
    fn as_str(&self) -> &'static str {
        match self {
            DurabilityLevel::Off => "OFF",
            DurabilityLevel::Normal => "NORMAL",
            DurabilityLevel::Full => "FULL",
            DurabilityLevel::Extra => "EXTRA",
        }
    }
}

/// Cached SQL query strings for the queue operations.
///
/// Pre-formatted SQL queries are cached to avoid repeated string formatting
/// on every operation. The table name is validated and interpolated once at
/// queue creation time, ensuring SQL injection safety.
#[derive(Debug)]
struct CachedQueries {
    insert_sql: String,
    select_sql: String,
    delete_sql: String,
    count_sql: String,
    clear_sql: String,
}

/// A disk-backed queue that persists all messages to SQLite.
///
/// This is the core queue implementation. For most use cases, prefer using
/// [`disk_backed_channel`] which provides a more ergonomic sender/receiver API.
///
/// All operations are async and use `spawn_blocking` internally to avoid blocking
/// the async executor.
///
/// # Type Parameters
///
/// * `T` - The message type. Must implement `Serialize + Deserialize + Send + Sync + 'static`.
///
/// # Example
///
/// ```no_run
/// use disk_backed_queue::DiskBackedQueue;
/// use serde::{Deserialize, Serialize};
///
/// #[derive(Serialize, Deserialize)]
/// struct Message {
///     data: String,
/// }
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let queue = DiskBackedQueue::new(
///         "queue.db",
///         "messages".to_string(),
///         None,
///     ).await?;
///
///     queue.send(Message { data: "hello".to_string() }).await?;
///     let msg = queue.recv().await?;
///
///     Ok(())
/// }
/// ```
pub struct DiskBackedQueue<T> {
    db: Arc<Mutex<rusqlite::Connection>>,
    dlq_db: Arc<Mutex<rusqlite::Connection>>,
    queries: CachedQueries,
    dlq_insert_sql: String,
    table_name: String,
    max_size: Option<usize>,
    _phantom: PhantomData<T>,
}

impl<T> std::fmt::Debug for DiskBackedQueue<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DiskBackedQueue")
            .field("table_name", &self.table_name)
            .field("max_size", &self.max_size)
            .field("queries", &self.queries)
            .finish_non_exhaustive()
    }
}

impl<T> DiskBackedQueue<T>
where
    T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
{
    /// Create a new disk-backed queue with default durability (FULL)
    ///
    /// This is the recommended constructor for most use cases.
    #[instrument(skip_all, fields(db_path = %db_path.as_ref().display(), table_name = %table_name))]
    pub async fn new<P: AsRef<Path>>(
        db_path: P,
        table_name: String,
        max_size: Option<usize>,
    ) -> Result<Self> {
        Self::with_durability(db_path, table_name, max_size, DurabilityLevel::default()).await
    }
223
224 /// Create a new disk-backed queue with custom durability level
225 ///
226 /// # Durability Levels
227 ///
228 /// - `DurabilityLevel::Full` (default) - Maximum safety, fsync after every commit
229 /// - `DurabilityLevel::Normal` - Good balance in WAL mode, fsync at checkpoints
230 /// - `DurabilityLevel::Off` - Fastest, but data loss on power failure (not recommended)
231 /// - `DurabilityLevel::Extra` - Paranoid mode, additional fsyncs (rarely needed)
232 ///
233 /// # Example
234 ///
235 /// ```ignore
236 /// // For critical data (default)
237 /// let queue = DiskBackedQueue::with_durability(
238 /// "critical.db",
239 /// "messages".to_string(),
240 /// None,
241 /// DurabilityLevel::Full,
242 /// ).await?;
243 ///
244 /// // For high-performance temporary data
245 /// let queue = DiskBackedQueue::with_durability(
246 /// "cache.db",
247 /// "temp".to_string(),
248 /// None,
249 /// DurabilityLevel::Normal,
250 /// ).await?;
251 /// ```
    #[instrument(skip_all, fields(db_path = %db_path.as_ref().display(), table_name = %table_name, durability = ?durability))]
    pub async fn with_durability<P: AsRef<Path>>(
        db_path: P,
        table_name: String,
        max_size: Option<usize>,
        durability: DurabilityLevel,
    ) -> Result<Self> {
        // Validate table name to prevent SQL injection
        if table_name.is_empty() || table_name.len() > 128 {
            return Err(DiskQueueError::InvalidTableName(table_name));
        }
        if !table_name.chars().all(|c| c.is_alphanumeric() || c == '_') {
            return Err(DiskQueueError::InvalidTableName(table_name));
        }

        let conn = rusqlite::Connection::open(&db_path).map_err(|e| {
            error!(
                "Failed to open SQLite database at {}: {}",
                db_path.as_ref().display(),
                e
            );
            e
        })?;

        // Enable WAL mode for better concurrency and crash safety
        conn.pragma_update(None, "journal_mode", "WAL")
            .map_err(|e| {
                error!("Failed to enable WAL mode: {}", e);
                e
            })?;

        // Set synchronous mode for durability guarantees
        conn.pragma_update(None, "synchronous", durability.as_str())
            .map_err(|e| {
                error!(
                    "Failed to set synchronous mode to {}: {}",
                    durability.as_str(),
                    e
                );
                e
            })?;

        debug!(
            "Configured SQLite with WAL mode and synchronous={}",
            durability.as_str()
        );

        // Enable foreign key constraints
        conn.pragma_update(None, "foreign_keys", true)
            .map_err(|e| {
                error!("Failed to enable foreign keys: {}", e);
                e
            })?;

        // Set busy timeout to handle concurrent access
        conn.busy_timeout(Duration::from_secs(5)).map_err(|e| {
            error!("Failed to set busy timeout: {}", e);
            e
        })?;

        // Create table if it doesn't exist
        let create_table_sql = format!(
            "CREATE TABLE IF NOT EXISTS {table_name} (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                data BLOB NOT NULL,
                created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
            )"
        );
        conn.execute(&create_table_sql, []).map_err(|e| {
            error!("Failed to create table {}: {}", table_name, e);
            e
        })?;

        // Create index on created_at for efficient ordering
        let create_index_sql = format!(
            "CREATE INDEX IF NOT EXISTS idx_{table_name}_created_at ON {table_name} (created_at, id)"
        );
        conn.execute(&create_index_sql, []).map_err(|e| {
            error!("Failed to create index for table {}: {}", table_name, e);
            e
        })?;

        debug!(
            "Successfully initialized disk-backed queue table: {}",
            table_name
        );

        // Open dead letter queue database
        let dlq_path = db_path.as_ref().with_extension("dlq.db");
        let dlq_conn = rusqlite::Connection::open(&dlq_path).map_err(|e| {
            error!(
                "Failed to open DLQ database at {}: {}",
                dlq_path.display(),
                e
            );
            e
        })?;

        // Configure DLQ database with same durability settings
        dlq_conn
            .pragma_update(None, "journal_mode", "WAL")
            .map_err(|e| {
                error!("Failed to enable WAL mode for DLQ: {}", e);
                e
            })?;

        dlq_conn
            .pragma_update(None, "synchronous", durability.as_str())
            .map_err(|e| {
                error!(
                    "Failed to set synchronous mode for DLQ to {}: {}",
                    durability.as_str(),
                    e
                );
                e
            })?;

        dlq_conn.busy_timeout(Duration::from_secs(5)).map_err(|e| {
            error!("Failed to set busy timeout for DLQ: {}", e);
            e
        })?;

        // Create DLQ table with error information
        let dlq_table_sql = format!(
            "CREATE TABLE IF NOT EXISTS {table_name}_dlq (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                original_id INTEGER,
                data BLOB NOT NULL,
                error_message TEXT NOT NULL,
                created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
                moved_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
            )"
        );
        dlq_conn.execute(&dlq_table_sql, []).map_err(|e| {
            error!("Failed to create DLQ table {}_dlq: {}", table_name, e);
            e
        })?;

        debug!(
            "Successfully initialized dead letter queue for table: {}",
            table_name
        );

        let queries = CachedQueries {
            insert_sql: format!("INSERT INTO {table_name} (data) VALUES (?)"),
            select_sql: format!(
                "SELECT id, data FROM {table_name} ORDER BY created_at ASC, id ASC LIMIT 1"
            ),
            delete_sql: format!("DELETE FROM {table_name} WHERE id = ?"),
            count_sql: format!("SELECT COUNT(*) FROM {table_name}"),
            clear_sql: format!("DELETE FROM {table_name}"),
        };

        let dlq_insert_sql = format!(
            "INSERT INTO {table_name}_dlq (original_id, data, error_message) VALUES (?, ?, ?)"
        );

        Ok(Self {
            db: Arc::new(Mutex::new(conn)),
            dlq_db: Arc::new(Mutex::new(dlq_conn)),
            queries,
            dlq_insert_sql,
            table_name,
            max_size,
            _phantom: PhantomData,
        })
    }

    /// Send a single message to the queue.
    ///
    /// The message is serialized using bincode and persisted to SQLite before this
    /// method returns. If the queue has a `max_size` configured and the queue is full,
    /// this method will block with exponential backoff until space becomes available.
    ///
    /// # Performance
    ///
    /// Single sends are limited by fsync overhead. For high throughput, use
    /// [`send_batch`](Self::send_batch) instead, which is approximately 50x faster.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use disk_backed_queue::DiskBackedQueue;
    /// # use serde::{Deserialize, Serialize};
    /// # #[derive(Serialize, Deserialize)]
    /// # struct Message { data: String }
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let queue = DiskBackedQueue::new("queue.db", "messages".to_string(), None).await?;
    /// queue.send(Message { data: "hello".to_string() }).await?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip_all, fields(table_name = %self.table_name))]
    pub async fn send(&self, item: T) -> Result<()> {
        let config = bincode::config::standard();
        let serialized = bincode::serde::encode_to_vec(&item, config).map_err(|e| {
            error!("Failed to serialize item for queue: {}", e);
            DiskQueueError::Serialization(e)
        })?;

        let db = self.db.clone();
        let insert_sql = self.queries.insert_sql.clone();
        let count_sql = self.queries.count_sql.clone();
        let max_size = self.max_size;
        let table_name = self.table_name.clone();

        tokio::task::spawn_blocking(move || {
            // Implement backpressure if max_size is configured.
            // Note: the lock is released between this capacity check and the
            // insert below, so `max_size` is a soft bound under concurrent senders.
            if let Some(max) = max_size {
                let mut backoff = Duration::from_millis(10);
                loop {
                    let conn = db.lock().map_err(|e| {
                        error!("Failed to acquire database lock: {}", e);
                        DiskQueueError::TaskJoin(format!("Mutex poisoned: {}", e))
                    })?;
                    let count: i64 = conn
                        .query_row(&count_sql, [], |row| row.get(0))
                        .map_err(DiskQueueError::Database)?;

                    if (count as usize) < max {
                        break;
                    }

                    // Queue is full, release lock and wait
                    drop(conn);
                    warn!(
                        table_name = %table_name,
                        current_size = count,
                        max_size = max,
                        "Queue is full, waiting for space..."
                    );
                    std::thread::sleep(backoff);
                    backoff = std::cmp::min(backoff * 2, Duration::from_secs(1));
                }
            }

            let conn = db.lock().map_err(|e| {
                error!("Failed to acquire database lock: {}", e);
                DiskQueueError::TaskJoin(format!("Mutex poisoned: {}", e))
            })?;
            let rows_affected = conn.execute(&insert_sql, [&serialized]).map_err(|e| {
                error!(
                    "Failed to insert item into queue table {}: {}",
                    table_name, e
                );
                DiskQueueError::Database(e)
            })?;

            if rows_affected != 1 {
                error!("Expected to insert 1 row, but inserted {}", rows_affected);
                return Err(DiskQueueError::UnexpectedRowCount(format!(
                    "Insert affected {rows_affected} rows instead of 1"
                )));
            }

            debug!("Successfully enqueued item to disk queue");
            Ok(())
        })
        .await
        .map_err(|e| DiskQueueError::TaskJoin(e.to_string()))?
    }

    /// Send multiple messages in a single transaction for much better throughput.
    ///
    /// All messages in the batch are inserted atomically - either all succeed or all fail.
    /// This is significantly faster than individual sends (approximately 50x speedup) because
    /// it uses a single SQLite transaction and fsync.
    ///
    /// # Performance
    ///
    /// Batch operations amortize the fsync overhead across all messages in the batch.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use disk_backed_queue::DiskBackedQueue;
    /// # use serde::{Deserialize, Serialize};
    /// # #[derive(Serialize, Deserialize)]
    /// # struct Message { data: String }
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let queue = DiskBackedQueue::new("queue.db", "messages".to_string(), None).await?;
    /// let messages: Vec<Message> = (0..100)
    ///     .map(|i| Message { data: format!("msg_{}", i) })
    ///     .collect();
    ///
    /// queue.send_batch(messages).await?;
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip_all, fields(table_name = %self.table_name, batch_size = items.len()))]
    pub async fn send_batch(&self, items: Vec<T>) -> Result<()> {
        if items.is_empty() {
            return Ok(());
        }

        let config = bincode::config::standard();
        let mut serialized_items = Vec::with_capacity(items.len());

        // Serialize all items first (outside spawn_blocking for error handling)
        for item in items {
            let serialized = bincode::serde::encode_to_vec(&item, config).map_err(|e| {
                error!("Failed to serialize item for batch queue: {}", e);
                DiskQueueError::Serialization(e)
            })?;
            serialized_items.push(serialized);
        }

        let db = self.db.clone();
        let insert_sql = self.queries.insert_sql.clone();
        let count_sql = self.queries.count_sql.clone();
        let max_size = self.max_size;
        let table_name = self.table_name.clone();
        let batch_size = serialized_items.len();

        tokio::task::spawn_blocking(move || {
            // Implement backpressure if max_size is configured
            if let Some(max) = max_size {
                let mut backoff = Duration::from_millis(10);
                loop {
                    let conn = db.lock().map_err(|e| {
                        error!("Failed to acquire database lock: {}", e);
                        DiskQueueError::TaskJoin(format!("Mutex poisoned: {}", e))
                    })?;
                    let count: i64 = conn
                        .query_row(&count_sql, [], |row| row.get(0))
                        .map_err(DiskQueueError::Database)?;

                    // Check if we have space for the entire batch
                    if (count as usize) + batch_size <= max {
                        break;
                    }

                    // Queue is full, release lock and wait
                    drop(conn);
                    warn!(
                        table_name = %table_name,
                        current_size = count,
                        max_size = max,
                        batch_size = batch_size,
                        "Queue is full, waiting for space for batch..."
                    );
                    std::thread::sleep(backoff);
                    backoff = std::cmp::min(backoff * 2, Duration::from_secs(1));
                }
            }

            let mut conn = db.lock().map_err(|e| {
                error!("Failed to acquire database lock: {}", e);
                DiskQueueError::TaskJoin(format!("Mutex poisoned: {}", e))
            })?;

            // Use a transaction for atomic batch insert
            let tx = conn.transaction().map_err(|e| {
                error!("Failed to start transaction for batch insert: {}", e);
                DiskQueueError::Database(e)
            })?;

            // Insert all items in the transaction
            for serialized in &serialized_items {
                tx.execute(&insert_sql, [serialized]).map_err(|e| {
                    error!(
                        "Failed to insert item into queue table {} during batch: {}",
                        table_name, e
                    );
                    DiskQueueError::Database(e)
                })?;
            }

            // Commit the transaction
            tx.commit().map_err(|e| {
                error!(
                    "Failed to commit batch transaction for table {}: {}",
                    table_name, e
                );
                DiskQueueError::Database(e)
            })?;

            debug!(
                "Successfully enqueued {} items to disk queue in batch",
                batch_size
            );
            Ok(())
        })
        .await
        .map_err(|e| DiskQueueError::TaskJoin(e.to_string()))?
    }

    /// Receive a single message from the queue.
    ///
    /// Returns the oldest message in FIFO order and removes it from the queue atomically.
    /// If the queue is empty, returns `Ok(None)` immediately (non-blocking).
    ///
    /// # Dead Letter Queue
    ///
    /// If a message fails to deserialize (e.g., due to schema changes or corruption),
    /// it is automatically moved to the dead letter queue database (`.dlq.db` file)
    /// and a `Deserialization` error is returned. The queue is not blocked - subsequent
    /// calls will process the next message.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use disk_backed_queue::DiskBackedQueue;
    /// # use serde::{Deserialize, Serialize};
    /// # #[derive(Serialize, Deserialize, Debug)]
    /// # struct Message { data: String }
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let queue = DiskBackedQueue::<Message>::new("queue.db", "messages".to_string(), None).await?;
    /// match queue.recv().await? {
    ///     Some(msg) => println!("Received: {:?}", msg),
    ///     None => println!("Queue is empty"),
    /// }
    /// # Ok(())
    /// # }
    /// ```
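    ///
    /// # Inspecting the dead letter queue
    ///
    /// The DLQ lives in a sibling SQLite file and can be examined with plain
    /// `rusqlite`. A hedged sketch (assumes the main database is `queue.db` with
    /// table `messages`, so the DLQ file is `queue.dlq.db` and its table is
    /// `messages_dlq`):
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let dlq = rusqlite::Connection::open("queue.dlq.db")?;
    /// let mut stmt = dlq.prepare(
    ///     "SELECT original_id, error_message FROM messages_dlq ORDER BY moved_at",
    /// )?;
    /// let rows = stmt.query_map([], |row| {
    ///     Ok((row.get::<_, Option<i64>>(0)?, row.get::<_, String>(1)?))
    /// })?;
    /// for row in rows {
    ///     let (id, err) = row?;
    ///     println!("dlq entry {:?}: {}", id, err);
    /// }
    /// # Ok(())
    /// # }
    /// ```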
    #[instrument(skip_all, fields(table_name = %self.table_name))]
    pub async fn recv(&self) -> Result<Option<T>> {
        let db = self.db.clone();
        let dlq_db = self.dlq_db.clone();
        let select_sql = self.queries.select_sql.clone();
        let delete_sql = self.queries.delete_sql.clone();
        let dlq_insert_sql = self.dlq_insert_sql.clone();
        let table_name = self.table_name.clone();

        tokio::task::spawn_blocking(move || {
            let mut conn = db.lock().map_err(|e| {
                error!("Failed to acquire database lock: {}", e);
                DiskQueueError::TaskJoin(format!("Mutex poisoned: {}", e))
            })?;

            // Start a transaction for atomic read-delete
            let tx = conn.transaction().map_err(|e| {
                error!(
                    "Failed to start transaction for table {}: {}",
                    table_name, e
                );
                DiskQueueError::Database(e)
            })?;

            // Read the oldest item
            let result: Option<(i64, Vec<u8>)> = tx
                .query_row(&select_sql, [], |row| Ok((row.get(0)?, row.get(1)?)))
                .optional()
                .map_err(|e| {
                    error!(
                        "Failed to execute SELECT query on table {}: {}",
                        table_name, e
                    );
                    DiskQueueError::Database(e)
                })?;

            if let Some((id, data)) = result {
                // Try to deserialize BEFORE deleting
                let config = bincode::config::standard();
                let item: T = match bincode::serde::decode_from_slice(&data, config) {
                    Ok((item, _len)) => item,
                    Err(e) => {
                        // Deserialization failed - move to dead letter queue
                        error!(
                            "Failed to deserialize item from queue (id {}): {}. Moving to DLQ.",
                            id, e
                        );

                        // Insert into DLQ
                        match dlq_db.lock() {
                            Ok(dlq_conn) => {
                                if let Err(dlq_error) = dlq_conn.execute(
                                    &dlq_insert_sql,
                                    rusqlite::params![id, &data, e.to_string()],
                                ) {
                                    error!(
                                        "CRITICAL: Failed to insert corrupt item {} into DLQ for table {}: {}. Item will be dropped.",
                                        id, table_name, dlq_error
                                    );
                                }
                            }
                            Err(poison_err) => {
                                error!(
                                    "CRITICAL: DLQ mutex poisoned while handling corrupt item {}: {}. Item will be dropped.",
                                    id, poison_err
                                );
                            }
                        }

                        // Delete from main queue
                        tx.execute(&delete_sql, [&id]).map_err(|e| {
                            error!(
                                "Failed to delete item {} from table {}: {}",
                                id, table_name, e
                            );
                            DiskQueueError::Database(e)
                        })?;

                        // Commit the transaction
                        tx.commit().map_err(|e| {
                            error!(
                                "Failed to commit transaction after DLQ move for table {}: {}",
                                table_name, e
                            );
                            DiskQueueError::Database(e)
                        })?;

                        // Return deserialization error to signal corruption
                        return Err(DiskQueueError::Deserialization(e));
                    }
                };

                // Delete the item from the queue
                let rows_deleted = tx.execute(&delete_sql, [&id]).map_err(|e| {
                    error!(
                        "Failed to delete item {} from table {}: {}",
                        id, table_name, e
                    );
                    DiskQueueError::Database(e)
                })?;

                if rows_deleted != 1 {
                    error!(
                        "Expected to delete 1 row, but deleted {} rows for id {}",
                        rows_deleted, id
                    );
                    return Err(DiskQueueError::UnexpectedRowCount(format!(
                        "Delete affected {rows_deleted} rows instead of 1 for id {id}"
                    )));
                }

                // Commit the transaction - if this fails, message stays in queue
                tx.commit().map_err(|e| {
                    error!(
                        "Failed to commit transaction for table {}: {}",
                        table_name, e
                    );
                    DiskQueueError::Database(e)
                })?;

                debug!("Successfully dequeued item from disk queue");
                Ok(Some(item))
            } else {
                // No items in queue
                Ok(None)
            }
        })
        .await
        .map_err(|e| DiskQueueError::TaskJoin(e.to_string()))?
    }

    /// Receive multiple messages in a single transaction for much better throughput.
    ///
    /// Returns up to `limit` messages from the queue in FIFO order. May return fewer if
    /// there aren't enough items. Returns an empty `Vec` if the queue is empty.
    ///
    /// All messages in the batch are removed atomically - either all succeed or all fail.
    ///
    /// # Dead Letter Queue
    ///
    /// If any message fails to deserialize, it is moved to the DLQ and skipped.
    /// The batch will contain only successfully deserialized messages. The transaction
    /// still commits successfully.
    ///
    /// # Performance
    ///
    /// Batch operations are approximately 50x faster than individual receives due to
    /// reduced transaction overhead.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use disk_backed_queue::DiskBackedQueue;
    /// # use serde::{Deserialize, Serialize};
    /// # #[derive(Serialize, Deserialize)]
    /// # struct Message { data: String }
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let queue = DiskBackedQueue::<Message>::new("queue.db", "messages".to_string(), None).await?;
    /// // Receive up to 100 messages at once
    /// let messages = queue.recv_batch(100).await?;
    /// println!("Received {} messages", messages.len());
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip_all, fields(table_name = %self.table_name, limit = limit))]
    pub async fn recv_batch(&self, limit: usize) -> Result<Vec<T>> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        let db = self.db.clone();
        let dlq_db = self.dlq_db.clone();
        let table_name = self.table_name.clone();
        let dlq_insert_sql = self.dlq_insert_sql.clone();

        tokio::task::spawn_blocking(move || {
            let mut conn = db.lock().map_err(|e| {
                error!("Failed to acquire database lock: {}", e);
                DiskQueueError::TaskJoin(format!("Mutex poisoned: {}", e))
            })?;

            // Start a transaction for atomic batch read-delete
            let tx = conn.transaction().map_err(|e| {
                error!("Failed to start transaction for table {}: {}", table_name, e);
                DiskQueueError::Database(e)
            })?;

            // Read up to `limit` oldest items
            // Safety: `table_name` is validated at construction time to contain only
            // alphanumeric characters and underscores, preventing SQL injection.
            // `limit` is a usize, so it can only contain numeric values.
            let select_batch_sql = format!(
                "SELECT id, data FROM {} ORDER BY created_at ASC, id ASC LIMIT {}",
                table_name, limit
            );

            let mut stmt = tx.prepare(&select_batch_sql).map_err(|e| {
                error!("Failed to prepare SELECT statement for table {}: {}", table_name, e);
                DiskQueueError::Database(e)
            })?;

            let rows = stmt
                .query_map([], |row| Ok((row.get::<_, i64>(0)?, row.get::<_, Vec<u8>>(1)?)))
                .map_err(|e| {
                    error!("Failed to execute SELECT query on table {}: {}", table_name, e);
                    DiskQueueError::Database(e)
                })?;

            let mut items = Vec::new();
            let mut ids_to_delete = Vec::new();
            let config = bincode::config::standard();

            for row_result in rows {
                let (id, data) = row_result.map_err(|e| {
                    error!("Failed to read row from table {}: {}", table_name, e);
                    DiskQueueError::Database(e)
                })?;

                // Try to deserialize
                match bincode::serde::decode_from_slice(&data, config) {
                    Ok((item, _len)) => {
                        items.push(item);
                        ids_to_delete.push(id);
                    }
                    Err(e) => {
                        // Deserialization failed - move to dead letter queue
                        error!(
                            "Failed to deserialize item from queue (id {}): {}. Moving to DLQ.",
                            id, e
                        );

                        // Insert into DLQ (outside main transaction)
                        match dlq_db.lock() {
                            Ok(dlq_conn) => {
                                if let Err(dlq_error) = dlq_conn.execute(
                                    &dlq_insert_sql,
                                    rusqlite::params![id, &data, e.to_string()],
                                ) {
                                    error!(
                                        "CRITICAL: Failed to insert corrupt item {} into DLQ for table {}: {}. Item will be dropped.",
                                        id, table_name, dlq_error
                                    );
                                }
                            }
                            Err(poison_err) => {
                                error!(
                                    "CRITICAL: DLQ mutex poisoned while handling corrupt item {}: {}. Item will be dropped.",
                                    id, poison_err
                                );
                            }
                        }

                        // Still need to delete from main queue to avoid poison pill loop
                        ids_to_delete.push(id);
                    }
                }
            }

            drop(stmt); // Release the statement before executing deletes

            // Delete all processed items
            if !ids_to_delete.is_empty() {
                // SQLite limit for host parameters is typically 999 or higher, but 900 is safe
                const BATCH_DELETE_LIMIT: usize = 900;

                for chunk in ids_to_delete.chunks(BATCH_DELETE_LIMIT) {
                    // Safety: `table_name` is validated at construction time to contain only
                    // alphanumeric characters and underscores, preventing SQL injection.
                    // The placeholders (?) are properly parameterized below.
                    let delete_batch_sql = format!(
                        "DELETE FROM {} WHERE id IN ({})",
                        table_name,
                        chunk
                            .iter()
                            .map(|_| "?")
                            .collect::<Vec<_>>()
                            .join(",")
                    );

                    let params: Vec<&dyn rusqlite::ToSql> = chunk
                        .iter()
                        .map(|id| id as &dyn rusqlite::ToSql)
                        .collect();

                    let rows_deleted = tx
                        .execute(&delete_batch_sql, params.as_slice())
                        .map_err(|e| {
                            error!("Failed to delete items from table {}: {}", table_name, e);
                            DiskQueueError::Database(e)
                        })?;

                    if rows_deleted != chunk.len() {
                        error!(
                            "Expected to delete {} rows, but deleted {} rows",
                            chunk.len(),
                            rows_deleted
                        );
                        return Err(DiskQueueError::UnexpectedRowCount(format!(
                            "Delete affected {rows_deleted} rows instead of {}",
                            chunk.len()
                        )));
                    }
                }
            }

            // Commit the transaction
            tx.commit().map_err(|e| {
                error!("Failed to commit batch transaction for table {}: {}", table_name, e);
                DiskQueueError::Database(e)
            })?;

            debug!("Successfully dequeued {} items from disk queue in batch", items.len());
            Ok(items)
        })
        .await
        .map_err(|e| DiskQueueError::TaskJoin(e.to_string()))?
    }

    /// Returns the number of messages currently in the queue.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use disk_backed_queue::DiskBackedQueue;
    /// # use serde::{Deserialize, Serialize};
    /// # #[derive(Serialize, Deserialize)]
    /// # struct Message { data: String }
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let queue = DiskBackedQueue::<Message>::new("queue.db", "messages".to_string(), None).await?;
    /// let count = queue.len().await?;
    /// println!("Queue has {} messages", count);
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip_all, fields(table_name = %self.table_name))]
    pub async fn len(&self) -> Result<usize> {
        let db = self.db.clone();
        let count_sql = self.queries.count_sql.clone();

        tokio::task::spawn_blocking(move || {
            let conn = db.lock().map_err(|e| {
                error!("Failed to acquire database lock: {}", e);
                DiskQueueError::TaskJoin(format!("Mutex poisoned: {}", e))
            })?;
            let count: i64 = conn
                .query_row(&count_sql, [], |row| row.get(0))
                .map_err(DiskQueueError::Database)?;
            Ok(count as usize)
        })
        .await
        .map_err(|e| DiskQueueError::TaskJoin(e.to_string()))?
    }

    /// Returns `true` if the queue contains no messages.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use disk_backed_queue::DiskBackedQueue;
    /// # use serde::{Deserialize, Serialize};
    /// # #[derive(Serialize, Deserialize)]
    /// # struct Message { data: String }
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let queue = DiskBackedQueue::<Message>::new("queue.db", "messages".to_string(), None).await?;
    /// if queue.is_empty().await? {
    ///     println!("No messages to process");
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip_all, fields(table_name = %self.table_name))]
    pub async fn is_empty(&self) -> Result<bool> {
        Ok(self.len().await? == 0)
    }

    /// Removes all messages from the queue.
    ///
    /// This operation is atomic - either all messages are deleted or none are.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use disk_backed_queue::DiskBackedQueue;
    /// # use serde::{Deserialize, Serialize};
    /// # #[derive(Serialize, Deserialize)]
    /// # struct Message { data: String }
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let queue = DiskBackedQueue::<Message>::new("queue.db", "messages".to_string(), None).await?;
    /// queue.clear().await?;
    /// assert!(queue.is_empty().await?);
    /// # Ok(())
    /// # }
    /// ```
    #[instrument(skip_all, fields(table_name = %self.table_name))]
    pub async fn clear(&self) -> Result<()> {
        let db = self.db.clone();
        let clear_sql = self.queries.clear_sql.clone();

        tokio::task::spawn_blocking(move || {
            let conn = db.lock().map_err(|e| {
                error!("Failed to acquire database lock: {}", e);
                DiskQueueError::TaskJoin(format!("Mutex poisoned: {}", e))
            })?;
            conn.execute(&clear_sql, [])
                .map_err(DiskQueueError::Database)?;
            debug!("Cleared disk queue");
            Ok(())
        })
        .await
        .map_err(|e| DiskQueueError::TaskJoin(e.to_string()))?
    }
}

/// The sending half of a disk-backed channel.
///
/// Messages sent through this sender are persisted to SQLite. The sender can be
/// cloned and shared across multiple tasks or threads safely.
///
/// Created via [`disk_backed_channel`] or [`disk_backed_channel_with_durability`].
///
/// # Example
///
/// ```no_run
/// # use disk_backed_queue::disk_backed_channel;
/// # use serde::{Deserialize, Serialize};
/// # #[derive(Serialize, Deserialize)]
/// # struct Message { data: String }
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let (tx, mut rx) = disk_backed_channel::<Message, _>(
///     "queue.db",
///     "messages".to_string(),
///     None,
/// ).await?;
///
/// // Sender can be cloned and moved to other tasks
/// let tx2 = tx.clone();
/// tokio::spawn(async move {
///     tx2.send(Message { data: "from another task".to_string() }).await
/// });
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct DiskBackedSender<T> {
    queue: std::sync::Arc<DiskBackedQueue<T>>,
}

impl<T> DiskBackedSender<T>
where
    T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
{
    /// Send a single message to the queue.
    ///
    /// See [`DiskBackedQueue::send`] for details.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use disk_backed_queue::disk_backed_channel;
    /// # use serde::{Deserialize, Serialize};
    /// # #[derive(Serialize, Deserialize)]
    /// # struct Message { data: String }
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let (tx, mut rx) = disk_backed_channel::<Message, _>("queue.db", "messages".to_string(), None).await?;
    /// tx.send(Message { data: "hello".to_string() }).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn send(&self, item: T) -> Result<()> {
        self.queue.send(item).await
    }

    /// Send multiple messages in a single transaction for much better throughput.
    ///
    /// See [`DiskBackedQueue::send_batch`] for details.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use disk_backed_queue::disk_backed_channel;
    /// # use serde::{Deserialize, Serialize};
    /// # #[derive(Serialize, Deserialize)]
    /// # struct Message { data: String }
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let (tx, mut rx) = disk_backed_channel::<Message, _>("queue.db", "messages".to_string(), None).await?;
    /// let messages = vec![
    ///     Message { data: "msg1".to_string() },
    ///     Message { data: "msg2".to_string() },
    /// ];
    /// tx.send_batch(messages).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn send_batch(&self, items: Vec<T>) -> Result<()> {
        self.queue.send_batch(items).await
    }

    /// Blocking send for use in synchronous contexts.
    ///
    /// This method blocks the current thread until the message is persisted.
    /// If called from within a Tokio runtime, it uses `block_in_place` to avoid
    /// blocking the async executor. If called outside a runtime, it creates a
    /// temporary runtime.
    ///
    /// Note that `tokio::task::block_in_place` panics on a current-thread runtime,
    /// so calling this from inside a runtime requires the multi-threaded flavor.
    ///
    /// # Performance
    ///
    /// This should be used sparingly as it blocks a thread. Prefer async `send`
    /// when possible.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use disk_backed_queue::disk_backed_channel;
    /// # use serde::{Deserialize, Serialize};
    /// # #[derive(Serialize, Deserialize)]
    /// # struct Message { data: String }
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let (tx, mut rx) = disk_backed_channel::<Message, _>("queue.db", "messages".to_string(), None).await?;
    /// // In a synchronous context
    /// std::thread::spawn(move || {
    ///     tx.blocking_send(Message { data: "from sync".to_string() })
    /// });
    /// # Ok(())
    /// # }
    /// ```
    pub fn blocking_send(&self, item: T) -> Result<()> {
        // Use block_in_place if we're in a runtime context, otherwise create a new runtime
        if tokio::runtime::Handle::try_current().is_ok() {
            tokio::task::block_in_place(|| {
                tokio::runtime::Handle::current().block_on(self.send(item))
            })
        } else {
            // Not in a runtime, create a new one
            let runtime = tokio::runtime::Runtime::new().map_err(|e| {
                error!("Failed to create Tokio runtime: {}", e);
                DiskQueueError::TaskJoin(format!("Runtime creation failed: {}", e))
            })?;
            runtime.block_on(self.send(item))
        }
    }
}

impl<T> Clone for DiskBackedSender<T> {
    /// Clone the sender.
    ///
    /// This is a cheap operation that only clones an `Arc` pointer. All cloned
    /// senders share the same underlying queue and can be safely used from
    /// multiple tasks or threads.
    fn clone(&self) -> Self {
        Self {
            queue: self.queue.clone(),
        }
    }
}

/// The receiving half of a disk-backed channel.
///
/// Messages are received from SQLite in FIFO order. Unlike the sender, the receiver
/// cannot be cloned (similar to `tokio::sync::mpsc::Receiver`).
///
/// Created via [`disk_backed_channel`] or [`disk_backed_channel_with_durability`].
///
/// # Example
///
/// ```no_run
/// # use disk_backed_queue::disk_backed_channel;
/// # use serde::{Deserialize, Serialize};
/// # #[derive(Serialize, Deserialize, Debug)]
/// # struct Message { data: String }
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let (tx, mut rx) = disk_backed_channel::<Message, _>(
///     "queue.db",
///     "messages".to_string(),
///     None,
/// ).await?;
///
/// // Receive messages
/// while let Some(msg) = rx.recv().await? {
///     println!("Received: {:?}", msg);
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct DiskBackedReceiver<T> {
    queue: std::sync::Arc<DiskBackedQueue<T>>,
}

impl<T> DiskBackedReceiver<T>
where
    T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
{
    /// Receive a single message from the queue.
    ///
    /// Returns `Ok(None)` if the queue is empty (non-blocking).
    /// See [`DiskBackedQueue::recv`] for details about dead letter queue handling.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use disk_backed_queue::disk_backed_channel;
    /// # use serde::{Deserialize, Serialize};
    /// # #[derive(Serialize, Deserialize)]
    /// # struct Message { data: String }
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let (tx, mut rx) = disk_backed_channel::<Message, _>("queue.db", "messages".to_string(), None).await?;
    /// match rx.recv().await? {
    ///     Some(msg) => println!("Got message"),
    ///     None => println!("Queue is empty"),
    /// }
    /// # Ok(())
    /// # }
    /// ```
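    ///
    /// Since an empty queue returns `Ok(None)` immediately, a long-running
    /// consumer typically polls with a short sleep, as the crate's tests do. A
    /// minimal sketch (the 10ms interval is an arbitrary choice, not a library
    /// default):
    ///
    /// ```no_run
    /// # use disk_backed_queue::disk_backed_channel;
    /// # use serde::{Deserialize, Serialize};
    /// # #[derive(Serialize, Deserialize, Debug)]
    /// # struct Message { data: String }
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let (_tx, mut rx) = disk_backed_channel::<Message, _>("queue.db", "messages".to_string(), None).await?;
    /// loop {
    ///     match rx.recv().await? {
    ///         Some(msg) => println!("Received: {:?}", msg),
    ///         // Nothing queued yet; back off briefly before polling again.
    ///         None => tokio::time::sleep(std::time::Duration::from_millis(10)).await,
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```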
    pub async fn recv(&mut self) -> Result<Option<T>> {
        self.queue.recv().await
    }

    /// Receive multiple messages in a single transaction for much better throughput.
    ///
    /// Returns up to `limit` messages from the queue. May return fewer if there aren't
    /// enough items. Returns an empty `Vec` if the queue is empty.
    ///
    /// See [`DiskBackedQueue::recv_batch`] for details.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use disk_backed_queue::disk_backed_channel;
    /// # use serde::{Deserialize, Serialize};
    /// # #[derive(Serialize, Deserialize)]
    /// # struct Message { data: String }
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let (tx, mut rx) = disk_backed_channel::<Message, _>("queue.db", "messages".to_string(), None).await?;
    /// // Receive up to 100 messages at once
    /// let messages = rx.recv_batch(100).await?;
    /// for msg in messages {
    ///     // Process message
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn recv_batch(&mut self, limit: usize) -> Result<Vec<T>> {
        self.queue.recv_batch(limit).await
    }

    /// Returns the number of messages currently in the queue.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use disk_backed_queue::disk_backed_channel;
    /// # use serde::{Deserialize, Serialize};
    /// # #[derive(Serialize, Deserialize)]
    /// # struct Message { data: String }
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let (tx, mut rx) = disk_backed_channel::<Message, _>("queue.db", "messages".to_string(), None).await?;
    /// let count = rx.len().await?;
    /// println!("Queue has {} messages", count);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn len(&self) -> Result<usize> {
        self.queue.len().await
    }

    /// Returns `true` if the queue contains no messages.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use disk_backed_queue::disk_backed_channel;
    /// # use serde::{Deserialize, Serialize};
    /// # #[derive(Serialize, Deserialize)]
    /// # struct Message { data: String }
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let (tx, mut rx) = disk_backed_channel::<Message, _>("queue.db", "messages".to_string(), None).await?;
    /// if rx.is_empty().await? {
    ///     println!("No messages to process");
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn is_empty(&self) -> Result<bool> {
        self.queue.is_empty().await
    }
}

/// Create a disk-backed channel with default durability (FULL).
///
/// This is the recommended way to create a disk-backed queue for most use cases.
/// It provides maximum data safety by fsyncing after every commit, ensuring messages
/// survive power loss.
///
/// # Arguments
///
/// * `db_path` - Path to the SQLite database file. Will be created if it doesn't exist.
/// * `table_name` - Name of the table to store messages. Must contain only alphanumeric
///   characters and underscores, and be between 1-128 characters.
/// * `max_size` - Optional maximum queue size. When set, senders will block when the
///   queue is full. Use `None` for unbounded queues.
///
/// # Returns
///
/// A tuple of `(sender, receiver)` where the sender can be cloned and shared across
/// tasks, while the receiver is single-threaded.
///
/// # Example
///
/// ```no_run
/// use disk_backed_queue::disk_backed_channel;
/// use serde::{Deserialize, Serialize};
///
/// #[derive(Serialize, Deserialize, Debug)]
/// struct Message {
///     id: u64,
///     content: String,
/// }
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let (tx, mut rx) = disk_backed_channel::<Message, _>(
///         "my_queue.db",
///         "messages".to_string(),
///         Some(10_000), // Max 10,000 messages
///     ).await?;
///
///     // Send from multiple tasks
///     let tx2 = tx.clone();
///     tokio::spawn(async move {
///         tx2.send(Message { id: 1, content: "hello".to_string() }).await
///     });
///
///     // Receive from single task
///     while let Some(msg) = rx.recv().await? {
///         println!("Received: {:?}", msg);
///     }
///
///     Ok(())
/// }
/// ```
pub async fn disk_backed_channel<T, P: AsRef<Path>>(
    db_path: P,
    table_name: String,
    max_size: Option<usize>,
) -> Result<(DiskBackedSender<T>, DiskBackedReceiver<T>)>
where
    T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
{
    disk_backed_channel_with_durability(db_path, table_name, max_size, DurabilityLevel::default())
        .await
}

/// Create a disk-backed channel with custom durability level.
///
/// This allows you to trade off between performance and data safety by choosing
/// different SQLite synchronous modes.
///
/// # Durability Levels
///
/// * [`DurabilityLevel::Full`] (default) - Maximum safety, survives power loss
/// * [`DurabilityLevel::Normal`] - Balanced performance, safe in most crashes
/// * [`DurabilityLevel::Off`] - Fastest, but data loss on power failure
/// * [`DurabilityLevel::Extra`] - Paranoid mode, additional fsyncs
///
/// # Example
///
/// ```no_run
/// use disk_backed_queue::{disk_backed_channel_with_durability, DurabilityLevel};
/// use serde::{Deserialize, Serialize};
///
/// #[derive(Serialize, Deserialize)]
/// struct Message { data: String }
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     // For critical data (default)
///     let (tx, rx) = disk_backed_channel_with_durability::<Message, _>(
///         "critical.db",
///         "messages".to_string(),
///         None,
///         DurabilityLevel::Full,
///     ).await?;
///
///     // For high-performance temporary cache
///     let (tx_fast, rx_fast) = disk_backed_channel_with_durability::<Message, _>(
///         "cache.db",
///         "temp".to_string(),
///         None,
///         DurabilityLevel::Normal,
///     ).await?;
///
///     Ok(())
/// }
/// ```
pub async fn disk_backed_channel_with_durability<T, P: AsRef<Path>>(
    db_path: P,
    table_name: String,
    max_size: Option<usize>,
    durability: DurabilityLevel,
) -> Result<(DiskBackedSender<T>, DiskBackedReceiver<T>)>
where
    T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
{
    let queue = DiskBackedQueue::with_durability(db_path, table_name, max_size, durability).await?;
    let queue_arc = std::sync::Arc::new(queue);

    let sender = DiskBackedSender {
        queue: queue_arc.clone(),
    };

    let receiver = DiskBackedReceiver { queue: queue_arc };

    Ok((sender, receiver))
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_derive::{Deserialize, Serialize};
    use tempfile::NamedTempFile;

    #[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
    struct TestMessage {
        id: u64,
        content: String,
    }

    #[tokio::test]
    async fn test_disk_backed_queue_basic() {
        let temp_file = NamedTempFile::new().unwrap();
        let queue = DiskBackedQueue::new(temp_file.path(), "test_queue".to_string(), None)
            .await
            .unwrap();

        // Test empty queue
        assert!(queue.is_empty().await.unwrap());
        assert_eq!(queue.len().await.unwrap(), 0);
        assert!(queue.recv().await.unwrap().is_none());

        // Send some messages
        let msg1 = TestMessage {
            id: 1,
            content: "Hello".to_string(),
        };
        let msg2 = TestMessage {
            id: 2,
            content: "World".to_string(),
        };

        queue.send(msg1.clone()).await.unwrap();
        queue.send(msg2.clone()).await.unwrap();

        // Test queue state
        assert!(!queue.is_empty().await.unwrap());
        assert_eq!(queue.len().await.unwrap(), 2);

        // Receive messages in FIFO order
        let received1 = queue.recv().await.unwrap().unwrap();
        assert_eq!(received1, msg1);
        assert_eq!(queue.len().await.unwrap(), 1);

        let received2 = queue.recv().await.unwrap().unwrap();
        assert_eq!(received2, msg2);
        assert_eq!(queue.len().await.unwrap(), 0);

        // Queue should be empty now
        assert!(queue.is_empty().await.unwrap());
        assert!(queue.recv().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_disk_backed_channel() {
        let temp_file = NamedTempFile::new().unwrap();
        let (sender, mut receiver) = disk_backed_channel::<TestMessage, _>(
            temp_file.path(),
            "test_channel".to_string(),
            None,
        )
        .await
        .unwrap();

        let msg = TestMessage {
            id: 42,
            content: "Channel test".to_string(),
        };

        sender.send(msg.clone()).await.unwrap();
        let received = receiver.recv().await.unwrap().unwrap();
        assert_eq!(received, msg);
    }

    #[tokio::test]
    async fn test_persistence() {
        let temp_file = NamedTempFile::new().unwrap();
        let temp_path = temp_file.path().to_path_buf();

        let msg = TestMessage {
            id: 99,
            content: "Persistent message".to_string(),
        };

        // Create queue, send message, and drop it
        {
            let queue = DiskBackedQueue::new(&temp_path, "persistence_test".to_string(), None)
                .await
                .unwrap();
            queue.send(msg.clone()).await.unwrap();
        }

        // Create new queue instance and verify message is still there
        {
            let queue: DiskBackedQueue<TestMessage> =
                DiskBackedQueue::new(&temp_path, "persistence_test".to_string(), None)
                    .await
                    .unwrap();
            assert_eq!(queue.len().await.unwrap(), 1);
            let received = queue.recv().await.unwrap().unwrap();
            assert_eq!(received, msg);
        }
    }

    #[tokio::test]
    async fn test_error_handling_database_corruption() {
        let temp_file = NamedTempFile::new().unwrap();
        let queue = DiskBackedQueue::new(temp_file.path(), "test_queue".to_string(), None)
            .await
            .unwrap();

        // Send a valid message
        let msg = TestMessage {
            id: 1,
            content: "Test".to_string(),
        };
        queue.send(msg).await.unwrap();

        // Manually corrupt the database by writing invalid binary data
        {
            let db = queue.db.lock().unwrap();
            let garbage_data: Vec<u8> = vec![0xFF, 0xFE, 0xFD, 0xFC]; // Invalid bincode
            db.execute(
                "UPDATE test_queue SET data = ? WHERE id = 1",
                [&garbage_data],
            )
            .unwrap();
        }

        // Try to receive - should return deserialization error but message moved to DLQ
        let result = queue.recv().await;
        assert!(result.is_err(), "Expected error, got: {:?}", result);
        assert!(
            matches!(result, Err(DiskQueueError::Deserialization(_))),
            "Expected Deserialization error, got: {:?}",
            result
        );
    }

    #[tokio::test]
    async fn test_concurrent_access() {
        let temp_file = NamedTempFile::new().unwrap();
        let queue = std::sync::Arc::new(
            DiskBackedQueue::new(temp_file.path(), "concurrent_test".to_string(), None)
                .await
                .unwrap(),
        );

        // Spawn multiple senders
        let mut send_handles = vec![];
        for i in 0..10 {
            let queue_clone = queue.clone();
            let handle = tokio::spawn(async move {
                for j in 0..10 {
                    let msg = TestMessage {
                        id: i * 10 + j,
                        content: format!("Message from sender {i}, iteration {j}"),
                    };
                    queue_clone.send(msg).await.unwrap();
                }
            });
            send_handles.push(handle);
        }

        // Spawn multiple receivers
        let mut recv_handles = vec![];
        let received_messages = std::sync::Arc::new(tokio::sync::Mutex::new(Vec::new()));

        for _ in 0..5 {
            let queue_clone = queue.clone();
            let messages_clone = received_messages.clone();
            let handle = tokio::spawn(async move {
                loop {
                    match queue_clone.recv().await {
                        Ok(Some(msg)) => {
                            messages_clone.lock().await.push(msg);
                        }
                        Ok(None) => {
                            // No more messages, small delay before checking again
                            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                        }
                        Err(_) => break,
                    }

                    // Stop when we've received all messages
                    if messages_clone.lock().await.len() >= 100 {
                        break;
                    }
                }
            });
            recv_handles.push(handle);
        }

        // Wait for all senders to complete
        for handle in send_handles {
            handle.await.unwrap();
        }

        // Wait for all messages to be received with timeout
        let start = tokio::time::Instant::now();
        let timeout = tokio::time::Duration::from_secs(10);

        loop {
            let count = received_messages.lock().await.len();
            if count >= 100 {
                break;
            }

            if start.elapsed() > timeout {
                panic!(
                    "Timeout: Only received {} messages after {:?}",
                    count, timeout
                );
            }

            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        }

        // Check that all messages were received
        let messages = received_messages.lock().await;
        assert_eq!(messages.len(), 100);

        // Clean up receivers
        for handle in recv_handles {
            handle.abort();
        }
    }

    #[tokio::test]
    async fn test_empty_queue_operations() {
        let temp_file = NamedTempFile::new().unwrap();
        let queue: DiskBackedQueue<TestMessage> =
            DiskBackedQueue::new(temp_file.path(), "empty_test".to_string(), None)
                .await
                .unwrap();

        // Multiple recv calls on empty queue should all return None
        for _ in 0..5 {
            assert!(queue.recv().await.unwrap().is_none());
        }

        assert!(queue.is_empty().await.unwrap());
        assert_eq!(queue.len().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_large_messages() {
        let temp_file = NamedTempFile::new().unwrap();
        let queue = DiskBackedQueue::new(temp_file.path(), "large_test".to_string(), None)
            .await
            .unwrap();

        // Create a large message (1MB of content)
        let large_content = "x".repeat(1024 * 1024);
        let large_msg = TestMessage {
            id: 42,
            content: large_content.clone(),
        };

        // Send and receive large message
        queue.send(large_msg.clone()).await.unwrap();
        let received = queue.recv().await.unwrap().unwrap();

        assert_eq!(received.id, 42);
        assert_eq!(received.content.len(), 1024 * 1024);
        assert_eq!(received.content, large_content);
    }

    #[tokio::test]
    async fn test_fifo_ordering() {
        let temp_file = NamedTempFile::new().unwrap();
        let queue = DiskBackedQueue::new(temp_file.path(), "fifo_test".to_string(), None)
            .await
            .unwrap();

        // Send messages in order
        let messages: Vec<TestMessage> = (0..100)
            .map(|i| TestMessage {
                id: i,
                content: format!("Message {i}"),
            })
            .collect();

        for msg in &messages {
            queue.send(msg.clone()).await.unwrap();
        }

        // Receive messages and verify FIFO order
        for expected_msg in &messages {
            let received = queue.recv().await.unwrap().unwrap();
            assert_eq!(received, *expected_msg);
        }

        // Queue should be empty now
        assert!(queue.recv().await.unwrap().is_none());
    }
1796
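    // A minimal additional sketch (not from the original suite): `len` should
    // track sends and receives one-for-one. It uses only the `send`, `recv`,
    // and `len` APIs already exercised by the surrounding tests.
    #[tokio::test]
    async fn test_len_tracks_send_and_recv() {
        let temp_file = NamedTempFile::new().unwrap();
        let queue = DiskBackedQueue::new(temp_file.path(), "len_test".to_string(), None)
            .await
            .unwrap();

        for i in 0..3 {
            let msg = TestMessage {
                id: i,
                content: format!("Len {i}"),
            };
            queue.send(msg).await.unwrap();
        }
        assert_eq!(queue.len().await.unwrap(), 3);

        // Draining one message should reduce the count by exactly one
        queue.recv().await.unwrap().unwrap();
        assert_eq!(queue.len().await.unwrap(), 2);
    }
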
    #[tokio::test]
    async fn test_invalid_table_name() {
        let temp_file = NamedTempFile::new().unwrap();

        // A table name with dashes may or may not be rejected; SQLite itself
        // is quite permissive about identifiers, so we don't assert on the
        // outcome here.
        let _result = DiskBackedQueue::<TestMessage>::new(
            temp_file.path(),
            "invalid-table-name-with-dashes".to_string(),
            None,
        )
        .await;

        // An empty table name, however, is unambiguously invalid and must
        // produce an error.
        let result2 = DiskBackedQueue::<TestMessage>::new(
            temp_file.path(),
            "".to_string(),
            None,
        )
        .await;

        assert!(result2.is_err());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_blocking_send() {
        let temp_file = NamedTempFile::new().unwrap();
        let (sender, mut receiver) = disk_backed_channel::<TestMessage, _>(
            temp_file.path(),
            "blocking_test".to_string(),
            None,
        )
        .await
        .unwrap();

        let msg = TestMessage {
            id: 123,
            content: "Blocking test".to_string(),
        };

        // Test the blocking send path
        sender.blocking_send(msg.clone()).unwrap();

        // Receive and verify
        let received = receiver.recv().await.unwrap().unwrap();
        assert_eq!(received, msg);
    }

    #[tokio::test]
    async fn test_database_file_permissions() {
        let temp_file = NamedTempFile::new().unwrap();
        let temp_path = temp_file.path().to_path_buf();

        // Create the queue first
        let _queue = DiskBackedQueue::<TestMessage>::new(&temp_path, "perm_test".to_string(), None)
            .await
            .unwrap();

        // Check that the database file was created
        assert!(temp_path.exists());

        // Basic metadata check (a full permissions test would be platform-specific)
        let metadata = std::fs::metadata(&temp_path).unwrap();
        assert!(metadata.is_file());
        assert!(metadata.len() > 0); // Should contain at least the SQLite header
    }

    #[tokio::test]
    async fn test_dlq_file_created() {
        let temp_file = NamedTempFile::new().unwrap();
        let temp_path = temp_file.path();

        disk_backed_channel::<TestMessage, _>(temp_path, "test".to_string(), None)
            .await
            .unwrap();

        // The DLQ database lives alongside the main database
        let dlq_path = temp_path.with_extension("dlq.db");
        assert!(dlq_path.exists());
    }

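    // The corruption tests in this suite rely on the DLQ layout shown here:
    // the DLQ database sits next to the main database with a `.dlq.db`
    // extension (see test_dlq_file_created), and rows that fail to decode
    // land in a `<table>_dlq` table inside it.
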
    #[tokio::test]
    async fn test_transaction_rollback_on_corruption() {
        let temp_file = NamedTempFile::new().unwrap();
        let queue = DiskBackedQueue::new(temp_file.path(), "test_queue".to_string(), None)
            .await
            .unwrap();

        // Send a valid message
        let msg = TestMessage {
            id: 1,
            content: "Valid".to_string(),
        };
        queue.send(msg).await.unwrap();

        // Manually corrupt the stored row by overwriting it with invalid bincode
        {
            let db = queue.db.lock().unwrap();
            let garbage_data: Vec<u8> = vec![0xFF, 0xFE, 0xFD, 0xFC];
            db.execute(
                "UPDATE test_queue SET data = ? WHERE id = 1",
                [&garbage_data],
            )
            .unwrap();
        }

        // recv should surface a deserialization error
        let result = queue.recv().await;
        assert!(result.is_err());
        assert!(matches!(result, Err(DiskQueueError::Deserialization(_))));

        // Verify the corrupted message was moved to the DLQ
        let dlq_path = temp_file.path().with_extension("dlq.db");
        let dlq_conn = rusqlite::Connection::open(&dlq_path).unwrap();
        let count: i64 = dlq_conn
            .query_row("SELECT COUNT(*) FROM test_queue_dlq", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 1);

        // And the main queue should be empty
        assert_eq!(queue.len().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_max_size_blocks() {
        let temp_file = NamedTempFile::new().unwrap();
        let (tx, mut rx) = disk_backed_channel::<TestMessage, _>(
            temp_file.path(),
            "test".to_string(),
            Some(2), // Max 2 messages
        )
        .await
        .unwrap();

        let msg1 = TestMessage {
            id: 1,
            content: "Message 1".to_string(),
        };
        let msg2 = TestMessage {
            id: 2,
            content: "Message 2".to_string(),
        };
        let msg3 = TestMessage {
            id: 3,
            content: "Message 3".to_string(),
        };

        // Send 2 messages (should succeed immediately)
        tx.send(msg1.clone()).await.unwrap();
        tx.send(msg2.clone()).await.unwrap();

        // Send a 3rd message in the background (should block until space is available)
        let tx_clone = tx.clone();
        let handle = tokio::spawn(async move { tx_clone.send(msg3).await });

        // Give it time to attempt the send (it should still be blocked)
        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
        assert!(!handle.is_finished());

        // Receive one message to make space
        let received = rx.recv().await.unwrap().unwrap();
        assert_eq!(received, msg1);

        // Now the 3rd send should complete
        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
        assert!(handle.is_finished());
        handle.await.unwrap().unwrap();

        // Verify the queue holds 2 messages again
        assert_eq!(rx.len().await.unwrap(), 2);
    }

    #[tokio::test]
    async fn test_send_batch() {
        let temp_file = NamedTempFile::new().unwrap();
        let queue = DiskBackedQueue::new(temp_file.path(), "batch_test".to_string(), None)
            .await
            .unwrap();

        // Create a batch of messages
        let messages: Vec<TestMessage> = (0..100)
            .map(|i| TestMessage {
                id: i,
                content: format!("Batch message {i}"),
            })
            .collect();

        // Send the batch
        queue.send_batch(messages.clone()).await.unwrap();

        // Verify the count
        assert_eq!(queue.len().await.unwrap(), 100);

        // Receive and verify all messages in order
        for expected in &messages {
            let received = queue.recv().await.unwrap().unwrap();
            assert_eq!(received, *expected);
        }

        assert!(queue.is_empty().await.unwrap());
    }

    #[tokio::test]
    async fn test_recv_batch() {
        let temp_file = NamedTempFile::new().unwrap();
        let queue = DiskBackedQueue::new(temp_file.path(), "batch_recv_test".to_string(), None)
            .await
            .unwrap();

        // Send 100 individual messages
        for i in 0..100 {
            let msg = TestMessage {
                id: i,
                content: format!("Message {i}"),
            };
            queue.send(msg).await.unwrap();
        }

        // Receive in batches of 25
        let batch1 = queue.recv_batch(25).await.unwrap();
        assert_eq!(batch1.len(), 25);
        assert_eq!(batch1[0].id, 0);
        assert_eq!(batch1[24].id, 24);

        let batch2 = queue.recv_batch(25).await.unwrap();
        assert_eq!(batch2.len(), 25);
        assert_eq!(batch2[0].id, 25);
        assert_eq!(batch2[24].id, 49);

        // Request 100, but only the remaining 50 should come back
        let batch3 = queue.recv_batch(100).await.unwrap();
        assert_eq!(batch3.len(), 50);
        assert_eq!(batch3[0].id, 50);
        assert_eq!(batch3[49].id, 99);

        // The queue should now be empty
        assert!(queue.is_empty().await.unwrap());
        let empty_batch = queue.recv_batch(10).await.unwrap();
        assert!(empty_batch.is_empty());
    }

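    // A minimal additional sketch (not from the original suite): interleaving
    // `send` and `send_batch` should still drain in overall FIFO order, since
    // both paths append to the same table. This assumes nothing beyond the
    // FIFO and batch ordering already asserted by the surrounding tests.
    #[tokio::test]
    async fn test_mixed_single_and_batch_fifo() {
        let temp_file = NamedTempFile::new().unwrap();
        let queue = DiskBackedQueue::new(temp_file.path(), "mixed_test".to_string(), None)
            .await
            .unwrap();

        queue
            .send(TestMessage {
                id: 0,
                content: "single".to_string(),
            })
            .await
            .unwrap();
        queue
            .send_batch(vec![
                TestMessage {
                    id: 1,
                    content: "batch".to_string(),
                },
                TestMessage {
                    id: 2,
                    content: "batch".to_string(),
                },
            ])
            .await
            .unwrap();

        // Drain everything and check the combined ordering
        let drained = queue.recv_batch(3).await.unwrap();
        let ids: Vec<_> = drained.iter().map(|m| m.id).collect();
        assert_eq!(ids, vec![0, 1, 2]);
    }
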
    #[tokio::test]
    async fn test_batch_with_channel_api() {
        let temp_file = NamedTempFile::new().unwrap();
        let (tx, mut rx) = disk_backed_channel::<TestMessage, _>(
            temp_file.path(),
            "batch_channel_test".to_string(),
            None,
        )
        .await
        .unwrap();

        // Send a batch via the sender
        let messages: Vec<TestMessage> = (0..50)
            .map(|i| TestMessage {
                id: i,
                content: format!("Batch {i}"),
            })
            .collect();

        tx.send_batch(messages.clone()).await.unwrap();

        // Receive the batch via the receiver
        let received = rx.recv_batch(50).await.unwrap();
        assert_eq!(received.len(), 50);
        assert_eq!(received, messages);
    }

    #[tokio::test]
    async fn test_batch_performance_comparison() {
        let temp_file1 = NamedTempFile::new().unwrap();
        let temp_file2 = NamedTempFile::new().unwrap();
        let queue_single = DiskBackedQueue::new(temp_file1.path(), "single".to_string(), None)
            .await
            .unwrap();
        let queue_batch = DiskBackedQueue::new(temp_file2.path(), "batch".to_string(), None)
            .await
            .unwrap();

        let message_count = 1000;
        let messages: Vec<TestMessage> = (0..message_count)
            .map(|i| TestMessage {
                id: i,
                content: format!("Perf test {i}"),
            })
            .collect();

        // Send one message at a time
        let start = std::time::Instant::now();
        for msg in messages.clone() {
            queue_single.send(msg).await.unwrap();
        }
        let single_duration = start.elapsed();

        // Send the same messages as a single batch
        let start = std::time::Instant::now();
        queue_batch.send_batch(messages.clone()).await.unwrap();
        let batch_duration = start.elapsed();

        println!(
            "Single send: {:?}, Batch send: {:?}, Speedup: {:.2}x",
            single_duration,
            batch_duration,
            single_duration.as_secs_f64() / batch_duration.as_secs_f64()
        );

        // Batch should be significantly faster (at least 5x). Note this is a
        // timing-based assertion, so it can be sensitive to heavily loaded or
        // throttled machines.
        assert!(batch_duration < single_duration / 5);
    }

    #[tokio::test]
    async fn test_batch_with_corrupted_data() {
        let temp_file = NamedTempFile::new().unwrap();
        let queue = DiskBackedQueue::new(temp_file.path(), "batch_corrupt_test".to_string(), None)
            .await
            .unwrap();

        // Send 10 valid messages
        for i in 0..10 {
            let msg = TestMessage {
                id: i,
                content: format!("Valid {i}"),
            };
            queue.send(msg).await.unwrap();
        }

        // Corrupt the sixth row (TestMessage id 5; SQLite ids here start at 1)
        {
            let db = queue.db.lock().unwrap();
            let garbage_data: Vec<u8> = vec![0xFF, 0xFE, 0xFD, 0xFC];
            db.execute(
                "UPDATE batch_corrupt_test SET data = ? WHERE id = 6",
                [&garbage_data],
            )
            .unwrap();
        }

        // Batch receive should skip the corrupted message and move it to the DLQ
        let batch = queue.recv_batch(10).await.unwrap();
        assert_eq!(batch.len(), 9); // 10 sent minus 1 corrupted

        // Verify the DLQ has exactly 1 entry
        let dlq_path = temp_file.path().with_extension("dlq.db");
        let dlq_conn = rusqlite::Connection::open(&dlq_path).unwrap();
        let dlq_count: i64 = dlq_conn
            .query_row("SELECT COUNT(*) FROM batch_corrupt_test_dlq", [], |row| {
                row.get(0)
            })
            .unwrap();
        assert_eq!(dlq_count, 1);
    }
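
    // A minimal additional sketch (not from the original suite): messages
    // should survive dropping the queue and reopening the same database file
    // and table. This assumes `DiskBackedQueue::new` reattaches to existing
    // data rather than truncating it, which is the crate's core durability
    // premise.
    #[tokio::test]
    async fn test_messages_survive_reopen() {
        let temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.path().to_path_buf();

        {
            let queue = DiskBackedQueue::new(&path, "reopen_test".to_string(), None)
                .await
                .unwrap();
            queue
                .send(TestMessage {
                    id: 1,
                    content: "persisted".to_string(),
                })
                .await
                .unwrap();
            // The queue handle is dropped here; the data stays on disk
        }

        let reopened =
            DiskBackedQueue::<TestMessage>::new(&path, "reopen_test".to_string(), None)
                .await
                .unwrap();
        let received = reopened.recv().await.unwrap().unwrap();
        assert_eq!(received.id, 1);
        assert_eq!(received.content, "persisted");
    }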
}