disk_backed_queue/
lib.rs

1//! # disk-backed-queue
2//!
3//! A robust, crash-resistant queue implementation that persists all data to disk using SQLite.
4//! Provides an mpsc-like channel API while ensuring messages survive application restarts and
5//! system failures.
6//!
7//! ## Features
8//!
9//! - **Zero Message Loss**: All messages are persisted to SQLite before acknowledgment
10//! - **Crash Recovery**: Messages survive application restarts and system crashes
11//! - **MPSC-like API**: Familiar channel-based interface compatible with async Rust
12//! - **Dead Letter Queue**: Corrupted messages automatically moved to separate database
13//! - **Batch Operations**: High-performance bulk send/receive (460x faster than single operations)
14//! - **Backpressure Support**: Optional queue size limits prevent unbounded growth
15//!
16//! ## Quick Start
17//!
18//! ```no_run
19//! use disk_backed_queue::disk_backed_channel;
20//! use serde::{Deserialize, Serialize};
21//!
22//! #[derive(Serialize, Deserialize, Debug)]
23//! struct MyMessage {
24//!     id: u64,
25//!     content: String,
26//! }
27//!
28//! #[tokio::main]
29//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
30//!     // Create a disk-backed channel
31//!     let (tx, mut rx) = disk_backed_channel::<MyMessage, _>(
32//!         "my_queue.db",
33//!         "messages".to_string(),
34//!         None  // No size limit
35//!     ).await?;
36//!
37//!     // Send messages
38//!     tx.send(MyMessage {
39//!         id: 1,
40//!         content: "Hello, world!".to_string(),
41//!     }).await?;
42//!
43//!     // Receive messages
44//!     if let Some(msg) = rx.recv().await? {
45//!         println!("Received: {:?}", msg);
46//!     }
47//!
48//!     Ok(())
49//! }
50//! ```
51//!
52//! ## Use Cases
53//!
54//! Perfect for scenarios where message loss is unacceptable:
55//!
56//! - Message queuing during database outages
57//! - Event sourcing and audit logs
58//! - Job queues that survive restarts
59//! - Data pipelines requiring guaranteed delivery
60//!
61//! ## Performance
62//!
63//! Batch operations are approximately **50x faster** than single operations due to
64//! reduced fsync overhead. For high-throughput scenarios, use [`DiskBackedSender::send_batch`]
65//! and [`DiskBackedReceiver::recv_batch`].
66//!
67//! Run `cargo run --example throughput_demo --release` to see performance on your system.
68
69use rusqlite::OptionalExtension;
70use serde::{Deserialize, Serialize};
71use std::marker::PhantomData;
72use std::path::Path;
73use std::sync::{Arc, Mutex};
74use std::time::Duration;
75use thiserror::Error;
76use tracing::{debug, error, instrument, warn};
77
78#[derive(Error, Debug)]
79pub enum DiskQueueError {
80    #[error("Database error: {0}")]
81    Database(#[from] rusqlite::Error),
82    #[error("Serialization error: {0}")]
83    Serialization(#[from] bincode::error::EncodeError),
84    #[error("Deserialization error: {0}")]
85    Deserialization(#[from] bincode::error::DecodeError),
86    #[error("Invalid table name: {0}")]
87    InvalidTableName(String),
88    #[error("Internal task error: {0}")]
89    TaskJoin(String),
90    #[error("Queue is closed")]
91    QueueClosed,
92    #[error("Unexpected row count: {0}")]
93    UnexpectedRowCount(String),
94    #[error("Queue is full (max size: {0})")]
95    QueueFull(usize),
96}
97
98pub type Result<T> = std::result::Result<T, DiskQueueError>;
99
100/// Durability configuration for SQLite synchronous mode
101///
102/// This controls the trade-off between durability and performance.
103#[derive(Debug, Clone, Copy, Default)]
104pub enum DurabilityLevel {
105    /// OFF (0) - No fsync, fastest but data loss on power failure
106    /// **WARNING**: Only use for temporary/cache data where loss is acceptable
107    Off,
108
109    /// NORMAL (1) - Fsync only at critical moments (WAL checkpoints)
110    /// Safe in WAL mode for most crashes, but power loss during checkpoint can corrupt
111    /// Good balance of performance and safety for WAL mode
112    Normal,
113
114    /// FULL (2) - Fsync after every commit (DEFAULT)
115    /// Maximum durability - survives power loss at any moment
116    /// Recommended for critical data where message loss is unacceptable
117    #[default]
118    Full,
119
120    /// EXTRA (3) - Even more paranoid than FULL
121    /// Additional fsyncs for maximum durability
122    /// Rarely needed, significant performance impact
123    Extra,
124}
125
126impl DurabilityLevel {
127    fn as_str(&self) -> &'static str {
128        match self {
129            DurabilityLevel::Off => "OFF",
130            DurabilityLevel::Normal => "NORMAL",
131            DurabilityLevel::Full => "FULL",
132            DurabilityLevel::Extra => "EXTRA",
133        }
134    }
135}
136
137/// Cached SQL query strings for the queue operations.
138///
139/// Pre-formatted SQL queries are cached to avoid repeated string formatting
140/// on every operation. The table name is validated and interpolated once at
141/// queue creation time, ensuring SQL injection safety.
142#[derive(Debug)]
143struct CachedQueries {
144    insert_sql: String,
145    select_sql: String,
146    delete_sql: String,
147    count_sql: String,
148    clear_sql: String,
149}
150
151/// A disk-backed queue that persists all messages to SQLite.
152///
153/// This is the core queue implementation. For most use cases, prefer using
154/// [`disk_backed_channel`] which provides a more ergonomic sender/receiver API.
155///
156/// All operations are async and use `spawn_blocking` internally to avoid blocking
157/// the async executor.
158///
159/// # Type Parameters
160///
161/// * `T` - The message type. Must implement `Serialize + Deserialize + Send + Sync + 'static`.
162///
163/// # Example
164///
165/// ```no_run
166/// use disk_backed_queue::DiskBackedQueue;
167/// use serde::{Deserialize, Serialize};
168///
169/// #[derive(Serialize, Deserialize)]
170/// struct Message {
171///     data: String,
172/// }
173///
174/// #[tokio::main]
175/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
176///     let queue = DiskBackedQueue::new(
177///         "queue.db",
178///         "messages".to_string(),
179///         None,
180///     ).await?;
181///
182///     queue.send(Message { data: "hello".to_string() }).await?;
183///     let msg = queue.recv().await?;
184///
185///     Ok(())
186/// }
187/// ```
188pub struct DiskBackedQueue<T> {
189    db: Arc<Mutex<rusqlite::Connection>>,
190    dlq_db: Arc<Mutex<rusqlite::Connection>>,
191    queries: CachedQueries,
192    dlq_insert_sql: String,
193    table_name: String,
194    max_size: Option<usize>,
195    _phantom: PhantomData<T>,
196}
197
198impl<T> std::fmt::Debug for DiskBackedQueue<T> {
199    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200        f.debug_struct("DiskBackedQueue")
201            .field("table_name", &self.table_name)
202            .field("max_size", &self.max_size)
203            .field("queries", &self.queries)
204            .finish_non_exhaustive()
205    }
206}
207
208impl<T> DiskBackedQueue<T>
209where
210    T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
211{
212    /// Create a new disk-backed queue with default durability (FULL)
213    ///
214    /// This is the recommended constructor for most use cases.
215    #[instrument(skip_all, fields(db_path = %db_path.as_ref().display(), table_name = %table_name))]
216    pub async fn new<P: AsRef<Path>>(
217        db_path: P,
218        table_name: String,
219        max_size: Option<usize>,
220    ) -> Result<Self> {
221        Self::with_durability(db_path, table_name, max_size, DurabilityLevel::default()).await
222    }
223
224    /// Create a new disk-backed queue with custom durability level
225    ///
226    /// # Durability Levels
227    ///
228    /// - `DurabilityLevel::Full` (default) - Maximum safety, fsync after every commit
229    /// - `DurabilityLevel::Normal` - Good balance in WAL mode, fsync at checkpoints
230    /// - `DurabilityLevel::Off` - Fastest, but data loss on power failure (not recommended)
231    /// - `DurabilityLevel::Extra` - Paranoid mode, additional fsyncs (rarely needed)
232    ///
233    /// # Example
234    ///
235    /// ```ignore
236    /// // For critical data (default)
237    /// let queue = DiskBackedQueue::with_durability(
238    ///     "critical.db",
239    ///     "messages".to_string(),
240    ///     None,
241    ///     DurabilityLevel::Full,
242    /// ).await?;
243    ///
244    /// // For high-performance temporary data
245    /// let queue = DiskBackedQueue::with_durability(
246    ///     "cache.db",
247    ///     "temp".to_string(),
248    ///     None,
249    ///     DurabilityLevel::Normal,
250    /// ).await?;
251    /// ```
252    #[instrument(skip_all, fields(db_path = %db_path.as_ref().display(), table_name = %table_name, durability = ?durability))]
253    pub async fn with_durability<P: AsRef<Path>>(
254        db_path: P,
255        table_name: String,
256        max_size: Option<usize>,
257        durability: DurabilityLevel,
258    ) -> Result<Self> {
259        // Validate table name to prevent SQL injection
260        if table_name.is_empty() || table_name.len() > 128 {
261            return Err(DiskQueueError::InvalidTableName(table_name));
262        }
263        if !table_name.chars().all(|c| c.is_alphanumeric() || c == '_') {
264            return Err(DiskQueueError::InvalidTableName(table_name));
265        }
266
267        let conn = rusqlite::Connection::open(&db_path).map_err(|e| {
268            error!(
269                "Failed to open SQLite database at {}: {}",
270                db_path.as_ref().display(),
271                e
272            );
273            e
274        })?;
275
276        // Enable WAL mode for better concurrency and crash safety
277        conn.pragma_update(None, "journal_mode", "WAL")
278            .map_err(|e| {
279                error!("Failed to enable WAL mode: {}", e);
280                e
281            })?;
282
283        // Set synchronous mode for durability guarantees
284        conn.pragma_update(None, "synchronous", durability.as_str())
285            .map_err(|e| {
286                error!(
287                    "Failed to set synchronous mode to {}: {}",
288                    durability.as_str(),
289                    e
290                );
291                e
292            })?;
293
294        debug!(
295            "Configured SQLite with WAL mode and synchronous={}",
296            durability.as_str()
297        );
298
299        // Enable foreign key constraints
300        conn.pragma_update(None, "foreign_keys", true)
301            .map_err(|e| {
302                error!("Failed to enable foreign keys: {}", e);
303                e
304            })?;
305
306        // Set busy timeout to handle concurrent access
307        conn.busy_timeout(Duration::from_secs(5)).map_err(|e| {
308            error!("Failed to set busy timeout: {}", e);
309            e
310        })?;
311
312        // Create table if it doesn't exist
313        let create_table_sql = format!(
314            "CREATE TABLE IF NOT EXISTS {table_name} (
315                id INTEGER PRIMARY KEY AUTOINCREMENT,
316                data BLOB NOT NULL,
317                created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
318            )"
319        );
320        conn.execute(&create_table_sql, []).map_err(|e| {
321            error!("Failed to create table {}: {}", table_name, e);
322            e
323        })?;
324
325        // Create index on created_at for efficient ordering
326        let create_index_sql = format!(
327            "CREATE INDEX IF NOT EXISTS idx_{table_name}_created_at ON {table_name} (created_at, id)"
328        );
329        conn.execute(&create_index_sql, []).map_err(|e| {
330            error!("Failed to create index for table {}: {}", table_name, e);
331            e
332        })?;
333
334        debug!(
335            "Successfully initialized disk-backed queue table: {}",
336            table_name
337        );
338
339        // Open dead letter queue database
340        let dlq_path = db_path.as_ref().with_extension("dlq.db");
341        let dlq_conn = rusqlite::Connection::open(&dlq_path).map_err(|e| {
342            error!(
343                "Failed to open DLQ database at {}: {}",
344                dlq_path.display(),
345                e
346            );
347            e
348        })?;
349
350        // Configure DLQ database with same durability settings
351        dlq_conn
352            .pragma_update(None, "journal_mode", "WAL")
353            .map_err(|e| {
354                error!("Failed to enable WAL mode for DLQ: {}", e);
355                e
356            })?;
357
358        dlq_conn
359            .pragma_update(None, "synchronous", durability.as_str())
360            .map_err(|e| {
361                error!(
362                    "Failed to set synchronous mode for DLQ to {}: {}",
363                    durability.as_str(),
364                    e
365                );
366                e
367            })?;
368
369        dlq_conn.busy_timeout(Duration::from_secs(5)).map_err(|e| {
370            error!("Failed to set busy timeout for DLQ: {}", e);
371            e
372        })?;
373
374        // Create DLQ table with error information
375        let dlq_table_sql = format!(
376            "CREATE TABLE IF NOT EXISTS {table_name}_dlq (
377                id INTEGER PRIMARY KEY AUTOINCREMENT,
378                original_id INTEGER,
379                data BLOB NOT NULL,
380                error_message TEXT NOT NULL,
381                created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
382                moved_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
383            )"
384        );
385        dlq_conn.execute(&dlq_table_sql, []).map_err(|e| {
386            error!("Failed to create DLQ table {}_dlq: {}", table_name, e);
387            e
388        })?;
389
390        debug!(
391            "Successfully initialized dead letter queue for table: {}",
392            table_name
393        );
394
395        let queries = CachedQueries {
396            insert_sql: format!("INSERT INTO {table_name} (data) VALUES (?)"),
397            select_sql: format!(
398                "SELECT id, data FROM {table_name} ORDER BY created_at ASC, id ASC LIMIT 1"
399            ),
400            delete_sql: format!("DELETE FROM {table_name} WHERE id = ?"),
401            count_sql: format!("SELECT COUNT(*) FROM {table_name}"),
402            clear_sql: format!("DELETE FROM {table_name}"),
403        };
404
405        let dlq_insert_sql = format!(
406            "INSERT INTO {table_name}_dlq (original_id, data, error_message) VALUES (?, ?, ?)"
407        );
408
409        Ok(Self {
410            db: Arc::new(Mutex::new(conn)),
411            dlq_db: Arc::new(Mutex::new(dlq_conn)),
412            queries,
413            dlq_insert_sql,
414            table_name,
415            max_size,
416            _phantom: PhantomData,
417        })
418    }
419
420    /// Send a single message to the queue.
421    ///
422    /// The message is serialized using bincode and persisted to SQLite before this
423    /// method returns. If the queue has a `max_size` configured and the queue is full,
424    /// this method will block with exponential backoff until space becomes available.
425    ///
426    /// # Performance
427    ///
428    /// Single sends are limited by fsync overhead. For high throughput, use
429    /// [`send_batch`](Self::send_batch) instead, which is approximately 50x faster.
430    ///
431    /// # Example
432    ///
433    /// ```no_run
434    /// # use disk_backed_queue::DiskBackedQueue;
435    /// # use serde::{Deserialize, Serialize};
436    /// # #[derive(Serialize, Deserialize)]
437    /// # struct Message { data: String }
438    /// # #[tokio::main]
439    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
440    /// # let queue = DiskBackedQueue::new("queue.db", "messages".to_string(), None).await?;
441    /// queue.send(Message { data: "hello".to_string() }).await?;
442    /// # Ok(())
443    /// # }
444    /// ```
445    #[instrument(skip_all, fields(table_name = %self.table_name))]
446    pub async fn send(&self, item: T) -> Result<()> {
447        let config = bincode::config::standard();
448        let serialized = bincode::serde::encode_to_vec(&item, config).map_err(|e| {
449            error!("Failed to serialize item for queue: {}", e);
450            DiskQueueError::Serialization(e)
451        })?;
452
453        let db = self.db.clone();
454        let insert_sql = self.queries.insert_sql.clone();
455        let count_sql = self.queries.count_sql.clone();
456        let max_size = self.max_size;
457        let table_name = self.table_name.clone();
458
459        tokio::task::spawn_blocking(move || {
460            // Implement backpressure if max_size is configured
461            if let Some(max) = max_size {
462                let mut backoff = Duration::from_millis(10);
463                loop {
464                    let conn = db.lock().map_err(|e| {
465                        error!("Failed to acquire database lock: {}", e);
466                        DiskQueueError::TaskJoin(format!("Mutex poisoned: {}", e))
467                    })?;
468                    let count: i64 = conn
469                        .query_row(&count_sql, [], |row| row.get(0))
470                        .map_err(DiskQueueError::Database)?;
471
472                    if (count as usize) < max {
473                        break;
474                    }
475
476                    // Queue is full, release lock and wait
477                    drop(conn);
478                    warn!(
479                        table_name = %table_name,
480                        current_size = count,
481                        max_size = max,
482                        "Queue is full, waiting for space..."
483                    );
484                    std::thread::sleep(backoff);
485                    backoff = std::cmp::min(backoff * 2, Duration::from_secs(1));
486                }
487            }
488
489            let conn = db.lock().map_err(|e| {
490                error!("Failed to acquire database lock: {}", e);
491                DiskQueueError::TaskJoin(format!("Mutex poisoned: {}", e))
492            })?;
493            let rows_affected = conn.execute(&insert_sql, [&serialized]).map_err(|e| {
494                error!(
495                    "Failed to insert item into queue table {}: {}",
496                    table_name, e
497                );
498                DiskQueueError::Database(e)
499            })?;
500
501            if rows_affected != 1 {
502                error!("Expected to insert 1 row, but inserted {}", rows_affected);
503                return Err(DiskQueueError::UnexpectedRowCount(format!(
504                    "Insert affected {rows_affected} rows instead of 1"
505                )));
506            }
507
508            debug!("Successfully enqueued item to disk queue");
509            Ok(())
510        })
511        .await
512        .map_err(|e| DiskQueueError::TaskJoin(e.to_string()))?
513    }
514
515    /// Send multiple messages in a single transaction for much better throughput.
516    ///
517    /// All messages in the batch are inserted atomically - either all succeed or all fail.
518    /// This is significantly faster than individual sends (approximately 50x speedup) because
519    /// it uses a single SQLite transaction and fsync.
520    ///
521    /// # Performance
522    ///
523    /// Batch operations amortize the fsync overhead across all messages in the batch.
524    ///
525    /// # Example
526    ///
527    /// ```no_run
528    /// # use disk_backed_queue::DiskBackedQueue;
529    /// # use serde::{Deserialize, Serialize};
530    /// # #[derive(Serialize, Deserialize)]
531    /// # struct Message { data: String }
532    /// # #[tokio::main]
533    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
534    /// # let queue = DiskBackedQueue::new("queue.db", "messages".to_string(), None).await?;
535    /// let messages: Vec<Message> = (0..100)
536    ///     .map(|i| Message { data: format!("msg_{}", i) })
537    ///     .collect();
538    ///
539    /// queue.send_batch(messages).await?;
540    /// # Ok(())
541    /// # }
542    /// ```
543    #[instrument(skip_all, fields(table_name = %self.table_name, batch_size = items.len()))]
544    pub async fn send_batch(&self, items: Vec<T>) -> Result<()> {
545        if items.is_empty() {
546            return Ok(());
547        }
548
549        let config = bincode::config::standard();
550        let mut serialized_items = Vec::with_capacity(items.len());
551
552        // Serialize all items first (outside spawn_blocking for error handling)
553        for item in items {
554            let serialized = bincode::serde::encode_to_vec(&item, config).map_err(|e| {
555                error!("Failed to serialize item for batch queue: {}", e);
556                DiskQueueError::Serialization(e)
557            })?;
558            serialized_items.push(serialized);
559        }
560
561        let db = self.db.clone();
562        let insert_sql = self.queries.insert_sql.clone();
563        let count_sql = self.queries.count_sql.clone();
564        let max_size = self.max_size;
565        let table_name = self.table_name.clone();
566        let batch_size = serialized_items.len();
567
568        tokio::task::spawn_blocking(move || {
569            // Implement backpressure if max_size is configured
570            if let Some(max) = max_size {
571                let mut backoff = Duration::from_millis(10);
572                loop {
573                    let conn = db.lock().map_err(|e| {
574                        error!("Failed to acquire database lock: {}", e);
575                        DiskQueueError::TaskJoin(format!("Mutex poisoned: {}", e))
576                    })?;
577                    let count: i64 = conn
578                        .query_row(&count_sql, [], |row| row.get(0))
579                        .map_err(DiskQueueError::Database)?;
580
581                    // Check if we have space for the entire batch
582                    if (count as usize) + batch_size <= max {
583                        break;
584                    }
585
586                    // Queue is full, release lock and wait
587                    drop(conn);
588                    warn!(
589                        table_name = %table_name,
590                        current_size = count,
591                        max_size = max,
592                        batch_size = batch_size,
593                        "Queue is full, waiting for space for batch..."
594                    );
595                    std::thread::sleep(backoff);
596                    backoff = std::cmp::min(backoff * 2, Duration::from_secs(1));
597                }
598            }
599
600            let mut conn = db.lock().map_err(|e| {
601                error!("Failed to acquire database lock: {}", e);
602                DiskQueueError::TaskJoin(format!("Mutex poisoned: {}", e))
603            })?;
604
605            // Use a transaction for atomic batch insert
606            let tx = conn.transaction().map_err(|e| {
607                error!("Failed to start transaction for batch insert: {}", e);
608                DiskQueueError::Database(e)
609            })?;
610
611            // Insert all items in the transaction
612            for serialized in &serialized_items {
613                tx.execute(&insert_sql, [serialized]).map_err(|e| {
614                    error!(
615                        "Failed to insert item into queue table {} during batch: {}",
616                        table_name, e
617                    );
618                    DiskQueueError::Database(e)
619                })?;
620            }
621
622            // Commit the transaction
623            tx.commit().map_err(|e| {
624                error!(
625                    "Failed to commit batch transaction for table {}: {}",
626                    table_name, e
627                );
628                DiskQueueError::Database(e)
629            })?;
630
631            debug!(
632                "Successfully enqueued {} items to disk queue in batch",
633                batch_size
634            );
635            Ok(())
636        })
637        .await
638        .map_err(|e| DiskQueueError::TaskJoin(e.to_string()))?
639    }
640
641    /// Receive a single message from the queue.
642    ///
643    /// Returns the oldest message in FIFO order and removes it from the queue atomically.
644    /// If the queue is empty, returns `Ok(None)` immediately (non-blocking).
645    ///
646    /// # Dead Letter Queue
647    ///
648    /// If a message fails to deserialize (e.g., due to schema changes or corruption),
649    /// it is automatically moved to the dead letter queue database (`.dlq.db` file)
650    /// and a `Deserialization` error is returned. The queue is not blocked - subsequent
651    /// calls will process the next message.
652    ///
653    /// # Example
654    ///
655    /// ```no_run
656    /// # use disk_backed_queue::DiskBackedQueue;
657    /// # use serde::{Deserialize, Serialize};
658    /// # #[derive(Serialize, Deserialize)]
659    /// # struct Message { data: String }
660    /// # #[tokio::main]
661    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
662    /// # let queue = DiskBackedQueue::new("queue.db", "messages".to_string(), None).await?;
663    /// match queue.recv().await? {
664    ///     Some(msg) => println!("Received: {:?}", msg),
665    ///     None => println!("Queue is empty"),
666    /// }
667    /// # Ok(())
668    /// # }
669    /// ```
670    #[instrument(skip_all, fields(table_name = %self.table_name))]
671    pub async fn recv(&self) -> Result<Option<T>> {
672        let db = self.db.clone();
673        let dlq_db = self.dlq_db.clone();
674        let select_sql = self.queries.select_sql.clone();
675        let delete_sql = self.queries.delete_sql.clone();
676        let dlq_insert_sql = self.dlq_insert_sql.clone();
677        let table_name = self.table_name.clone();
678
679        tokio::task::spawn_blocking(move || {
680            let mut conn = db.lock().map_err(|e| {
681                error!("Failed to acquire database lock: {}", e);
682                DiskQueueError::TaskJoin(format!("Mutex poisoned: {}", e))
683            })?;
684
685            // Start a transaction for atomic read-delete
686            let tx = conn.transaction().map_err(|e| {
687                error!(
688                    "Failed to start transaction for table {}: {}",
689                    table_name, e
690                );
691                DiskQueueError::Database(e)
692            })?;
693
694            // Read the oldest item
695            let result: Option<(i64, Vec<u8>)> = tx
696                .query_row(&select_sql, [], |row| Ok((row.get(0)?, row.get(1)?)))
697                .optional()
698                .map_err(|e| {
699                    error!(
700                        "Failed to execute SELECT query on table {}: {}",
701                        table_name, e
702                    );
703                    DiskQueueError::Database(e)
704                })?;
705
706            if let Some((id, data)) = result {
707                // Try to deserialize BEFORE deleting
708                let config = bincode::config::standard();
709                let item: T = match bincode::serde::decode_from_slice(&data, config) {
710                    Ok((item, _len)) => item,
711                    Err(e) => {
712                        // Deserialization failed - move to dead letter queue
713                        error!(
714                            "Failed to deserialize item from queue (id {}): {}. Moving to DLQ.",
715                            id, e
716                        );
717
718                        // Insert into DLQ
719                        match dlq_db.lock() {
720                            Ok(dlq_conn) => {
721                                if let Err(dlq_error) = dlq_conn.execute(
722                                    &dlq_insert_sql,
723                                    rusqlite::params![id, &data, e.to_string()],
724                                ) {
725                                    error!(
726                                        "CRITICAL: Failed to insert corrupt item {} into DLQ for table {}: {}. Item will be dropped.",
727                                        id, table_name, dlq_error
728                                    );
729                                }
730                            }
731                            Err(poison_err) => {
732                                error!(
733                                    "CRITICAL: DLQ mutex poisoned while handling corrupt item {}: {}. Item will be dropped.",
734                                    id, poison_err
735                                );
736                            }
737                        }
738
739                        // Delete from main queue
740                        tx.execute(&delete_sql, [&id]).map_err(|e| {
741                            error!(
742                                "Failed to delete item {} from table {}: {}",
743                                id, table_name, e
744                            );
745                            DiskQueueError::Database(e)
746                        })?;
747
748                        // Commit the transaction
749                        tx.commit().map_err(|e| {
750                            error!(
751                                "Failed to commit transaction after DLQ move for table {}: {}",
752                                table_name, e
753                            );
754                            DiskQueueError::Database(e)
755                        })?;
756
757                        // Return deserialization error to signal corruption
758                        return Err(DiskQueueError::Deserialization(e));
759                    }
760                };
761
762                // Delete the item from the queue
763                let rows_deleted = tx.execute(&delete_sql, [&id]).map_err(|e| {
764                    error!(
765                        "Failed to delete item {} from table {}: {}",
766                        id, table_name, e
767                    );
768                    DiskQueueError::Database(e)
769                })?;
770
771                if rows_deleted != 1 {
772                    error!(
773                        "Expected to delete 1 row, but deleted {} rows for id {}",
774                        rows_deleted, id
775                    );
776                    return Err(DiskQueueError::UnexpectedRowCount(format!(
777                        "Delete affected {rows_deleted} rows instead of 1 for id {id}"
778                    )));
779                }
780
781                // Commit the transaction - if this fails, message stays in queue
782                tx.commit().map_err(|e| {
783                    error!(
784                        "Failed to commit transaction for table {}: {}",
785                        table_name, e
786                    );
787                    DiskQueueError::Database(e)
788                })?;
789
790                debug!("Successfully dequeued item from disk queue");
791                Ok(Some(item))
792            } else {
793                // No items in queue
794                Ok(None)
795            }
796        })
797        .await
798        .map_err(|e| DiskQueueError::TaskJoin(e.to_string()))?
799    }
800
801    /// Receive multiple messages in a single transaction for much better throughput.
802    ///
803    /// Returns up to `limit` messages from the queue in FIFO order. May return fewer if
804    /// there aren't enough items. Returns an empty `Vec` if the queue is empty.
805    ///
806    /// All messages in the batch are removed atomically - either all succeed or all fail.
807    ///
808    /// # Dead Letter Queue
809    ///
810    /// If any message fails to deserialize, it is moved to the DLQ and skipped.
811    /// The batch will contain only successfully deserialized messages. The transaction
812    /// still commits successfully.
813    ///
814    /// # Performance
815    ///
816    /// Batch operations are approximately 50x faster than individual receives due to
817    /// reduced transaction overhead.
818    ///
819    /// # Example
820    ///
821    /// ```no_run
822    /// # use disk_backed_queue::DiskBackedQueue;
823    /// # use serde::{Deserialize, Serialize};
824    /// # #[derive(Serialize, Deserialize)]
825    /// # struct Message { data: String }
826    /// # #[tokio::main]
827    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
828    /// # let queue = DiskBackedQueue::new("queue.db", "messages".to_string(), None).await?;
829    /// // Receive up to 100 messages at once
830    /// let messages = queue.recv_batch(100).await?;
831    /// println!("Received {} messages", messages.len());
832    /// # Ok(())
833    /// # }
834    /// ```
835    #[instrument(skip_all, fields(table_name = %self.table_name, limit = limit))]
836    pub async fn recv_batch(&self, limit: usize) -> Result<Vec<T>> {
837        if limit == 0 {
838            return Ok(Vec::new());
839        }
840
841        let db = self.db.clone();
842        let dlq_db = self.dlq_db.clone();
843        let table_name = self.table_name.clone();
844        let dlq_insert_sql = self.dlq_insert_sql.clone();
845
846        tokio::task::spawn_blocking(move || {
847            let mut conn = db.lock().map_err(|e| {
848                error!("Failed to acquire database lock: {}", e);
849                DiskQueueError::TaskJoin(format!("Mutex poisoned: {}", e))
850            })?;
851
852            // Start a transaction for atomic batch read-delete
853            let tx = conn.transaction().map_err(|e| {
854                error!("Failed to start transaction for table {}: {}", table_name, e);
855                DiskQueueError::Database(e)
856            })?;
857
858            // Read up to `limit` oldest items
859            // Safety: `table_name` is validated at construction time to contain only
860            // alphanumeric characters and underscores, preventing SQL injection.
861            // `limit` is a usize, so it can only contain numeric values.
862            let select_batch_sql = format!(
863                "SELECT id, data FROM {} ORDER BY created_at ASC, id ASC LIMIT {}",
864                table_name, limit
865            );
866
867            let mut stmt = tx.prepare(&select_batch_sql).map_err(|e| {
868                error!("Failed to prepare SELECT statement for table {}: {}", table_name, e);
869                DiskQueueError::Database(e)
870            })?;
871
872            let rows = stmt
873                .query_map([], |row| Ok((row.get::<_, i64>(0)?, row.get::<_, Vec<u8>>(1)?)))
874                .map_err(|e| {
875                    error!("Failed to execute SELECT query on table {}: {}", table_name, e);
876                    DiskQueueError::Database(e)
877                })?;
878
879            let mut items = Vec::new();
880            let mut ids_to_delete = Vec::new();
881            let config = bincode::config::standard();
882
883            for row_result in rows {
884                let (id, data) = row_result.map_err(|e| {
885                    error!("Failed to read row from table {}: {}", table_name, e);
886                    DiskQueueError::Database(e)
887                })?;
888
889                // Try to deserialize
890                match bincode::serde::decode_from_slice(&data, config) {
891                    Ok((item, _len)) => {
892                        items.push(item);
893                        ids_to_delete.push(id);
894                    }
895                    Err(e) => {
896                        // Deserialization failed - move to dead letter queue
897                        error!(
898                            "Failed to deserialize item from queue (id {}): {}. Moving to DLQ.",
899                            id, e
900                        );
901
902                        // Insert into DLQ (outside main transaction)
903                        match dlq_db.lock() {
904                            Ok(dlq_conn) => {
905                                if let Err(dlq_error) = dlq_conn.execute(
906                                    &dlq_insert_sql,
907                                    rusqlite::params![id, &data, e.to_string()],
908                                ) {
909                                    error!(
910                                        "CRITICAL: Failed to insert corrupt item {} into DLQ for table {}: {}. Item will be dropped.",
911                                        id, table_name, dlq_error
912                                    );
913                                }
914                            }
915                            Err(poison_err) => {
916                                error!(
917                                    "CRITICAL: DLQ mutex poisoned while handling corrupt item {}: {}. Item will be dropped.",
918                                    id, poison_err
919                                );
920                            }
921                        }
922
923                        // Still need to delete from main queue to avoid poison pill loop
924                        ids_to_delete.push(id);
925                    }
926                }
927            }
928
929            drop(stmt); // Release the statement before executing deletes
930
931            // Delete all processed items
932            if !ids_to_delete.is_empty() {
933                // SQLite limit for host parameters is typically 999 or higher, but 900 is safe
934                const BATCH_DELETE_LIMIT: usize = 900;
935
936                for chunk in ids_to_delete.chunks(BATCH_DELETE_LIMIT) {
937                    // Safety: `table_name` is validated at construction time to contain only
938                    // alphanumeric characters and underscores, preventing SQL injection.
939                    // The placeholders (?) are properly parameterized below.
940                    let delete_batch_sql = format!(
941                        "DELETE FROM {} WHERE id IN ({})",
942                        table_name,
943                        chunk
944                            .iter()
945                            .map(|_| "?")
946                            .collect::<Vec<_>>()
947                            .join(",")
948                    );
949
950                    let params: Vec<&dyn rusqlite::ToSql> = chunk
951                        .iter()
952                        .map(|id| id as &dyn rusqlite::ToSql)
953                        .collect();
954
955                    let rows_deleted = tx
956                        .execute(&delete_batch_sql, params.as_slice())
957                        .map_err(|e| {
958                            error!("Failed to delete items from table {}: {}", table_name, e);
959                            DiskQueueError::Database(e)
960                        })?;
961
962                    if rows_deleted != chunk.len() {
963                        error!(
964                            "Expected to delete {} rows, but deleted {} rows",
965                            chunk.len(),
966                            rows_deleted
967                        );
968                        return Err(DiskQueueError::UnexpectedRowCount(format!(
969                            "Delete affected {rows_deleted} rows instead of {}",
970                            chunk.len()
971                        )));
972                    }
973                }
974            }
975
976            // Commit the transaction
977            tx.commit().map_err(|e| {
978                error!("Failed to commit batch transaction for table {}: {}", table_name, e);
979                DiskQueueError::Database(e)
980            })?;
981
982            debug!("Successfully dequeued {} items from disk queue in batch", items.len());
983            Ok(items)
984        })
985        .await
986        .map_err(|e| DiskQueueError::TaskJoin(e.to_string()))?
987    }
988
989    /// Returns the number of messages currently in the queue.
990    ///
991    /// # Example
992    ///
993    /// ```no_run
994    /// # use disk_backed_queue::DiskBackedQueue;
995    /// # use serde::{Deserialize, Serialize};
996    /// # #[derive(Serialize, Deserialize)]
997    /// # struct Message { data: String }
998    /// # #[tokio::main]
999    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1000    /// # let queue = DiskBackedQueue::new("queue.db", "messages".to_string(), None).await?;
1001    /// let count = queue.len().await?;
1002    /// println!("Queue has {} messages", count);
1003    /// # Ok(())
1004    /// # }
1005    /// ```
1006    #[instrument(skip_all, fields(table_name = %self.table_name))]
1007    pub async fn len(&self) -> Result<usize> {
1008        let db = self.db.clone();
1009        let count_sql = self.queries.count_sql.clone();
1010
1011        tokio::task::spawn_blocking(move || {
1012            let conn = db.lock().map_err(|e| {
1013                error!("Failed to acquire database lock: {}", e);
1014                DiskQueueError::TaskJoin(format!("Mutex poisoned: {}", e))
1015            })?;
1016            let count: i64 = conn
1017                .query_row(&count_sql, [], |row| row.get(0))
1018                .map_err(DiskQueueError::Database)?;
1019            Ok(count as usize)
1020        })
1021        .await
1022        .map_err(|e| DiskQueueError::TaskJoin(e.to_string()))?
1023    }
1024
1025    /// Returns `true` if the queue contains no messages.
1026    ///
1027    /// # Example
1028    ///
1029    /// ```no_run
1030    /// # use disk_backed_queue::DiskBackedQueue;
1031    /// # use serde::{Deserialize, Serialize};
1032    /// # #[derive(Serialize, Deserialize)]
1033    /// # struct Message { data: String }
1034    /// # #[tokio::main]
1035    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1036    /// # let queue = DiskBackedQueue::new("queue.db", "messages".to_string(), None).await?;
1037    /// if queue.is_empty().await? {
1038    ///     println!("No messages to process");
1039    /// }
1040    /// # Ok(())
1041    /// # }
1042    /// ```
1043    #[instrument(skip_all, fields(table_name = %self.table_name))]
1044    pub async fn is_empty(&self) -> Result<bool> {
1045        Ok(self.len().await? == 0)
1046    }
1047
1048    /// Removes all messages from the queue.
1049    ///
1050    /// This operation is atomic - either all messages are deleted or none are.
1051    ///
1052    /// # Example
1053    ///
1054    /// ```no_run
1055    /// # use disk_backed_queue::DiskBackedQueue;
1056    /// # use serde::{Deserialize, Serialize};
1057    /// # #[derive(Serialize, Deserialize)]
1058    /// # struct Message { data: String }
1059    /// # #[tokio::main]
1060    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1061    /// # let queue = DiskBackedQueue::new("queue.db", "messages".to_string(), None).await?;
1062    /// queue.clear().await?;
1063    /// assert!(queue.is_empty().await?);
1064    /// # Ok(())
1065    /// # }
1066    /// ```
1067    #[instrument(skip_all, fields(table_name = %self.table_name))]
1068    pub async fn clear(&self) -> Result<()> {
1069        let db = self.db.clone();
1070        let clear_sql = self.queries.clear_sql.clone();
1071
1072        tokio::task::spawn_blocking(move || {
1073            let conn = db.lock().map_err(|e| {
1074                error!("Failed to acquire database lock: {}", e);
1075                DiskQueueError::TaskJoin(format!("Mutex poisoned: {}", e))
1076            })?;
1077            conn.execute(&clear_sql, [])
1078                .map_err(DiskQueueError::Database)?;
1079            debug!("Cleared disk queue");
1080            Ok(())
1081        })
1082        .await
1083        .map_err(|e| DiskQueueError::TaskJoin(e.to_string()))?
1084    }
1085}
1086
1087/// The sending half of a disk-backed channel.
1088///
1089/// Messages sent through this sender are persisted to SQLite. The sender can be
1090/// cloned and shared across multiple tasks or threads safely.
1091///
1092/// Created via [`disk_backed_channel`] or [`disk_backed_channel_with_durability`].
1093///
1094/// # Example
1095///
1096/// ```no_run
1097/// # use disk_backed_queue::disk_backed_channel;
1098/// # use serde::{Deserialize, Serialize};
1099/// # #[derive(Serialize, Deserialize)]
1100/// # struct Message { data: String }
1101/// # #[tokio::main]
1102/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1103/// let (tx, mut rx) = disk_backed_channel::<Message, _>(
1104///     "queue.db",
1105///     "messages".to_string(),
1106///     None,
1107/// ).await?;
1108///
1109/// // Sender can be cloned and moved to other tasks
1110/// let tx2 = tx.clone();
1111/// tokio::spawn(async move {
1112///     tx2.send(Message { data: "from another task".to_string() }).await
1113/// });
1114/// # Ok(())
1115/// # }
1116/// ```
1117#[derive(Debug)]
1118pub struct DiskBackedSender<T> {
1119    queue: std::sync::Arc<DiskBackedQueue<T>>,
1120}
1121
1122impl<T> DiskBackedSender<T>
1123where
1124    T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
1125{
1126    /// Send a single message to the queue.
1127    ///
1128    /// See [`DiskBackedQueue::send`] for details.
1129    ///
1130    /// # Example
1131    ///
1132    /// ```no_run
1133    /// # use disk_backed_queue::disk_backed_channel;
1134    /// # use serde::{Deserialize, Serialize};
1135    /// # #[derive(Serialize, Deserialize)]
1136    /// # struct Message { data: String }
1137    /// # #[tokio::main]
1138    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1139    /// # let (tx, mut rx) = disk_backed_channel::<Message, _>("queue.db", "messages".to_string(), None).await?;
1140    /// tx.send(Message { data: "hello".to_string() }).await?;
1141    /// # Ok(())
1142    /// # }
1143    /// ```
1144    pub async fn send(&self, item: T) -> Result<()> {
1145        self.queue.send(item).await
1146    }
1147
1148    /// Send multiple messages in a single transaction for much better throughput.
1149    ///
1150    /// See [`DiskBackedQueue::send_batch`] for details.
1151    ///
1152    /// # Example
1153    ///
1154    /// ```no_run
1155    /// # use disk_backed_queue::disk_backed_channel;
1156    /// # use serde::{Deserialize, Serialize};
1157    /// # #[derive(Serialize, Deserialize)]
1158    /// # struct Message { data: String }
1159    /// # #[tokio::main]
1160    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1161    /// # let (tx, mut rx) = disk_backed_channel::<Message, _>("queue.db", "messages".to_string(), None).await?;
1162    /// let messages = vec![
1163    ///     Message { data: "msg1".to_string() },
1164    ///     Message { data: "msg2".to_string() },
1165    /// ];
1166    /// tx.send_batch(messages).await?;
1167    /// # Ok(())
1168    /// # }
1169    /// ```
1170    pub async fn send_batch(&self, items: Vec<T>) -> Result<()> {
1171        self.queue.send_batch(items).await
1172    }
1173
1174    /// Blocking send for use in synchronous contexts.
1175    ///
1176    /// This method blocks the current thread until the message is persisted.
1177    /// If called from within a Tokio runtime, it uses `block_in_place` to avoid
1178    /// blocking the async executor. If called outside a runtime, it creates a
1179    /// temporary runtime.
1180    ///
1181    /// # Performance
1182    ///
1183    /// This should be used sparingly as it blocks a thread. Prefer async `send`
1184    /// when possible.
1185    ///
1186    /// # Example
1187    ///
1188    /// ```no_run
1189    /// # use disk_backed_queue::disk_backed_channel;
1190    /// # use serde::{Deserialize, Serialize};
1191    /// # #[derive(Serialize, Deserialize)]
1192    /// # struct Message { data: String }
1193    /// # #[tokio::main]
1194    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1195    /// # let (tx, mut rx) = disk_backed_channel::<Message, _>("queue.db", "messages".to_string(), None).await?;
1196    /// // In a synchronous context
1197    /// std::thread::spawn(move || {
1198    ///     tx.blocking_send(Message { data: "from sync".to_string() })
1199    /// });
1200    /// # Ok(())
1201    /// # }
1202    /// ```
1203    pub fn blocking_send(&self, item: T) -> Result<()> {
1204        // Use block_in_place if we're in a runtime context, otherwise create a new runtime
1205        if tokio::runtime::Handle::try_current().is_ok() {
1206            tokio::task::block_in_place(|| {
1207                tokio::runtime::Handle::current().block_on(self.send(item))
1208            })
1209        } else {
1210            // Not in a runtime, create a new one
1211            let runtime = tokio::runtime::Runtime::new().map_err(|e| {
1212                error!("Failed to create Tokio runtime: {}", e);
1213                DiskQueueError::TaskJoin(format!("Runtime creation failed: {}", e))
1214            })?;
1215            runtime.block_on(self.send(item))
1216        }
1217    }
1218}
1219
1220impl<T> Clone for DiskBackedSender<T> {
1221    /// Clone the sender.
1222    ///
1223    /// This is a cheap operation that only clones an `Arc` pointer. All cloned
1224    /// senders share the same underlying queue and can be safely used from
1225    /// multiple tasks or threads.
1226    fn clone(&self) -> Self {
1227        Self {
1228            queue: self.queue.clone(),
1229        }
1230    }
1231}
1232
1233/// The receiving half of a disk-backed channel.
1234///
1235/// Messages are received from SQLite in FIFO order. Unlike the sender, the receiver
1236/// cannot be cloned (similar to `tokio::sync::mpsc::Receiver`).
1237///
1238/// Created via [`disk_backed_channel`] or [`disk_backed_channel_with_durability`].
1239///
1240/// # Example
1241///
1242/// ```no_run
1243/// # use disk_backed_queue::disk_backed_channel;
1244/// # use serde::{Deserialize, Serialize};
1245/// # #[derive(Serialize, Deserialize, Debug)]
1246/// # struct Message { data: String }
1247/// # #[tokio::main]
1248/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1249/// let (tx, mut rx) = disk_backed_channel::<Message, _>(
1250///     "queue.db",
1251///     "messages".to_string(),
1252///     None,
1253/// ).await?;
1254///
1255/// // Receive messages
1256/// while let Some(msg) = rx.recv().await? {
1257///     println!("Received: {:?}", msg);
1258/// }
1259/// # Ok(())
1260/// # }
1261/// ```
1262#[derive(Debug)]
1263pub struct DiskBackedReceiver<T> {
1264    queue: std::sync::Arc<DiskBackedQueue<T>>,
1265}
1266
1267impl<T> DiskBackedReceiver<T>
1268where
1269    T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
1270{
1271    /// Receive a single message from the queue.
1272    ///
1273    /// Returns `Ok(None)` if the queue is empty (non-blocking).
1274    /// See [`DiskBackedQueue::recv`] for details about dead letter queue handling.
1275    ///
1276    /// # Example
1277    ///
1278    /// ```no_run
1279    /// # use disk_backed_queue::disk_backed_channel;
1280    /// # use serde::{Deserialize, Serialize};
1281    /// # #[derive(Serialize, Deserialize)]
1282    /// # struct Message { data: String }
1283    /// # #[tokio::main]
1284    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1285    /// # let (tx, mut rx) = disk_backed_channel::<Message, _>("queue.db", "messages".to_string(), None).await?;
1286    /// match rx.recv().await? {
1287    ///     Some(msg) => println!("Got message"),
1288    ///     None => println!("Queue is empty"),
1289    /// }
1290    /// # Ok(())
1291    /// # }
1292    /// ```
1293    pub async fn recv(&mut self) -> Result<Option<T>> {
1294        self.queue.recv().await
1295    }
1296
1297    /// Receive multiple messages in a single transaction for much better throughput.
1298    ///
1299    /// Returns up to `limit` messages from the queue. May return fewer if there aren't
1300    /// enough items. Returns an empty `Vec` if the queue is empty.
1301    ///
1302    /// See [`DiskBackedQueue::recv_batch`] for details.
1303    ///
1304    /// # Example
1305    ///
1306    /// ```no_run
1307    /// # use disk_backed_queue::disk_backed_channel;
1308    /// # use serde::{Deserialize, Serialize};
1309    /// # #[derive(Serialize, Deserialize)]
1310    /// # struct Message { data: String }
1311    /// # #[tokio::main]
1312    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1313    /// # let (tx, mut rx) = disk_backed_channel::<Message, _>("queue.db", "messages".to_string(), None).await?;
1314    /// // Receive up to 100 messages at once
1315    /// let messages = rx.recv_batch(100).await?;
1316    /// for msg in messages {
1317    ///     // Process message
1318    /// }
1319    /// # Ok(())
1320    /// # }
1321    /// ```
1322    pub async fn recv_batch(&mut self, limit: usize) -> Result<Vec<T>> {
1323        self.queue.recv_batch(limit).await
1324    }
1325
1326    /// Returns the number of messages currently in the queue.
1327    ///
1328    /// # Example
1329    ///
1330    /// ```no_run
1331    /// # use disk_backed_queue::disk_backed_channel;
1332    /// # use serde::{Deserialize, Serialize};
1333    /// # #[derive(Serialize, Deserialize)]
1334    /// # struct Message { data: String }
1335    /// # #[tokio::main]
1336    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1337    /// # let (tx, mut rx) = disk_backed_channel::<Message, _>("queue.db", "messages".to_string(), None).await?;
1338    /// let count = rx.len().await?;
1339    /// println!("Queue has {} messages", count);
1340    /// # Ok(())
1341    /// # }
1342    /// ```
1343    pub async fn len(&self) -> Result<usize> {
1344        self.queue.len().await
1345    }
1346
1347    /// Returns `true` if the queue contains no messages.
1348    ///
1349    /// # Example
1350    ///
1351    /// ```no_run
1352    /// # use disk_backed_queue::disk_backed_channel;
1353    /// # use serde::{Deserialize, Serialize};
1354    /// # #[derive(Serialize, Deserialize)]
1355    /// # struct Message { data: String }
1356    /// # #[tokio::main]
1357    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1358    /// # let (tx, mut rx) = disk_backed_channel::<Message, _>("queue.db", "messages".to_string(), None).await?;
1359    /// if rx.is_empty().await? {
1360    ///     println!("No messages to process");
1361    /// }
1362    /// # Ok(())
1363    /// # }
1364    /// ```
1365    pub async fn is_empty(&self) -> Result<bool> {
1366        self.queue.is_empty().await
1367    }
1368}
1369
1370/// Create a disk-backed channel with default durability (FULL).
1371///
1372/// This is the recommended way to create a disk-backed queue for most use cases.
1373/// It provides maximum data safety by fsyncing after every commit, ensuring messages
1374/// survive power loss.
1375///
1376/// # Arguments
1377///
1378/// * `db_path` - Path to the SQLite database file. Will be created if it doesn't exist.
1379/// * `table_name` - Name of the table to store messages. Must contain only alphanumeric
1380///   characters and underscores, and be between 1-128 characters.
1381/// * `max_size` - Optional maximum queue size. When set, senders will block when the
1382///   queue is full. Use `None` for unbounded queues.
1383///
1384/// # Returns
1385///
1386/// A tuple of `(sender, receiver)` where the sender can be cloned and shared across
1387/// tasks, while the receiver is single-threaded.
1388///
1389/// # Example
1390///
1391/// ```no_run
1392/// use disk_backed_queue::disk_backed_channel;
1393/// use serde::{Deserialize, Serialize};
1394///
1395/// #[derive(Serialize, Deserialize, Debug)]
1396/// struct Message {
1397///     id: u64,
1398///     content: String,
1399/// }
1400///
1401/// #[tokio::main]
1402/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
1403///     let (tx, mut rx) = disk_backed_channel::<Message, _>(
1404///         "my_queue.db",
1405///         "messages".to_string(),
1406///         Some(10_000),  // Max 10,000 messages
1407///     ).await?;
1408///
1409///     // Send from multiple tasks
1410///     let tx2 = tx.clone();
1411///     tokio::spawn(async move {
1412///         tx2.send(Message { id: 1, content: "hello".to_string() }).await
1413///     });
1414///
1415///     // Receive from single task
1416///     while let Some(msg) = rx.recv().await? {
1417///         println!("Received: {:?}", msg);
1418///     }
1419///
1420///     Ok(())
1421/// }
1422/// ```
1423pub async fn disk_backed_channel<T, P: AsRef<Path>>(
1424    db_path: P,
1425    table_name: String,
1426    max_size: Option<usize>,
1427) -> Result<(DiskBackedSender<T>, DiskBackedReceiver<T>)>
1428where
1429    T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
1430{
1431    disk_backed_channel_with_durability(db_path, table_name, max_size, DurabilityLevel::default())
1432        .await
1433}
1434
1435/// Create a disk-backed channel with custom durability level.
1436///
1437/// This allows you to trade off between performance and data safety by choosing
1438/// different SQLite synchronous modes.
1439///
1440/// # Durability Levels
1441///
1442/// * [`DurabilityLevel::Full`] (default) - Maximum safety, survives power loss
1443/// * [`DurabilityLevel::Normal`] - Balanced performance, safe in most crashes
1444/// * [`DurabilityLevel::Off`] - Fastest, but data loss on power failure
1445/// * [`DurabilityLevel::Extra`] - Paranoid mode, additional fsyncs
1446///
1447/// # Example
1448///
1449/// ```no_run
1450/// use disk_backed_queue::{disk_backed_channel_with_durability, DurabilityLevel};
1451/// use serde::{Deserialize, Serialize};
1452///
1453/// #[derive(Serialize, Deserialize)]
1454/// struct Message { data: String }
1455///
1456/// #[tokio::main]
1457/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
1458///     // For critical data (default)
1459///     let (tx, rx) = disk_backed_channel_with_durability::<Message, _>(
1460///         "critical.db",
1461///         "messages".to_string(),
1462///         None,
1463///         DurabilityLevel::Full,
1464///     ).await?;
1465///
1466///     // For high-performance temporary cache
1467///     let (tx_fast, rx_fast) = disk_backed_channel_with_durability::<Message, _>(
1468///         "cache.db",
1469///         "temp".to_string(),
1470///         None,
1471///         DurabilityLevel::Normal,
1472///     ).await?;
1473///
1474///     Ok(())
1475/// }
1476/// ```
1477pub async fn disk_backed_channel_with_durability<T, P: AsRef<Path>>(
1478    db_path: P,
1479    table_name: String,
1480    max_size: Option<usize>,
1481    durability: DurabilityLevel,
1482) -> Result<(DiskBackedSender<T>, DiskBackedReceiver<T>)>
1483where
1484    T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
1485{
1486    let queue = DiskBackedQueue::with_durability(db_path, table_name, max_size, durability).await?;
1487    let queue_arc = std::sync::Arc::new(queue);
1488
1489    let sender = DiskBackedSender {
1490        queue: queue_arc.clone(),
1491    };
1492
1493    let receiver = DiskBackedReceiver { queue: queue_arc };
1494
1495    Ok((sender, receiver))
1496}
1497
1498#[cfg(test)]
1499mod tests {
1500    use super::*;
1501    use serde_derive::{Deserialize, Serialize};
1502    use tempfile::NamedTempFile;
1503
1504    #[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
1505    struct TestMessage {
1506        id: u64,
1507        content: String,
1508    }
1509
1510    #[tokio::test]
1511    async fn test_disk_backed_queue_basic() {
1512        let temp_file = NamedTempFile::new().unwrap();
1513        let queue = DiskBackedQueue::new(temp_file.path(), "test_queue".to_string(), None)
1514            .await
1515            .unwrap();
1516
1517        // Test empty queue
1518        assert!(queue.is_empty().await.unwrap());
1519        assert_eq!(queue.len().await.unwrap(), 0);
1520        assert!(queue.recv().await.unwrap().is_none());
1521
1522        // Send some messages
1523        let msg1 = TestMessage {
1524            id: 1,
1525            content: "Hello".to_string(),
1526        };
1527        let msg2 = TestMessage {
1528            id: 2,
1529            content: "World".to_string(),
1530        };
1531
1532        queue.send(msg1.clone()).await.unwrap();
1533        queue.send(msg2.clone()).await.unwrap();
1534
1535        // Test queue state
1536        assert!(!queue.is_empty().await.unwrap());
1537        assert_eq!(queue.len().await.unwrap(), 2);
1538
1539        // Receive messages in FIFO order
1540        let received1 = queue.recv().await.unwrap().unwrap();
1541        assert_eq!(received1, msg1);
1542        assert_eq!(queue.len().await.unwrap(), 1);
1543
1544        let received2 = queue.recv().await.unwrap().unwrap();
1545        assert_eq!(received2, msg2);
1546        assert_eq!(queue.len().await.unwrap(), 0);
1547
1548        // Queue should be empty now
1549        assert!(queue.is_empty().await.unwrap());
1550        assert!(queue.recv().await.unwrap().is_none());
1551    }
1552
1553    #[tokio::test]
1554    async fn test_disk_backed_channel() {
1555        let temp_file = NamedTempFile::new().unwrap();
1556        let (sender, mut receiver) = disk_backed_channel::<TestMessage, _>(
1557            temp_file.path(),
1558            "test_channel".to_string(),
1559            None,
1560        )
1561        .await
1562        .unwrap();
1563
1564        let msg = TestMessage {
1565            id: 42,
1566            content: "Channel test".to_string(),
1567        };
1568
1569        sender.send(msg.clone()).await.unwrap();
1570        let received = receiver.recv().await.unwrap().unwrap();
1571        assert_eq!(received, msg);
1572    }
1573
1574    #[tokio::test]
1575    async fn test_persistence() {
1576        let temp_file = NamedTempFile::new().unwrap();
1577        let temp_path = temp_file.path().to_path_buf();
1578
1579        let msg = TestMessage {
1580            id: 99,
1581            content: "Persistent message".to_string(),
1582        };
1583
1584        // Create queue, send message, and drop it
1585        {
1586            let queue = DiskBackedQueue::new(&temp_path, "persistence_test".to_string(), None)
1587                .await
1588                .unwrap();
1589            queue.send(msg.clone()).await.unwrap();
1590        }
1591
1592        // Create new queue instance and verify message is still there
1593        {
1594            let queue: DiskBackedQueue<TestMessage> =
1595                DiskBackedQueue::new(&temp_path, "persistence_test".to_string(), None)
1596                    .await
1597                    .unwrap();
1598            assert_eq!(queue.len().await.unwrap(), 1);
1599            let received = queue.recv().await.unwrap().unwrap();
1600            assert_eq!(received, msg);
1601        }
1602    }
1603
1604    #[tokio::test]
1605    async fn test_error_handling_database_corruption() {
1606        let temp_file = NamedTempFile::new().unwrap();
1607        let queue = DiskBackedQueue::new(temp_file.path(), "test_queue".to_string(), None)
1608            .await
1609            .unwrap();
1610
1611        // Send a valid message
1612        let msg = TestMessage {
1613            id: 1,
1614            content: "Test".to_string(),
1615        };
1616        queue.send(msg).await.unwrap();
1617
1618        // Manually corrupt the database by writing invalid binary data
1619        {
1620            let db = queue.db.lock().unwrap();
1621            let garbage_data: Vec<u8> = vec![0xFF, 0xFE, 0xFD, 0xFC]; // Invalid bincode
1622            db.execute(
1623                "UPDATE test_queue SET data = ? WHERE id = 1",
1624                [&garbage_data],
1625            )
1626            .unwrap();
1627        }
1628
1629        // Try to receive - should return deserialization error but message moved to DLQ
1630        let result = queue.recv().await;
1631        assert!(result.is_err(), "Expected error, got: {:?}", result);
1632        assert!(
1633            matches!(result, Err(DiskQueueError::Deserialization(_))),
1634            "Expected Deserialization error, got: {:?}",
1635            result
1636        );
1637    }
1638
1639    #[tokio::test]
1640    async fn test_concurrent_access() {
1641        let temp_file = NamedTempFile::new().unwrap();
1642        let queue = std::sync::Arc::new(
1643            DiskBackedQueue::new(temp_file.path(), "concurrent_test".to_string(), None)
1644                .await
1645                .unwrap(),
1646        );
1647
1648        // Spawn multiple senders
1649        let mut send_handles = vec![];
1650        for i in 0..10 {
1651            let queue_clone = queue.clone();
1652            let handle = tokio::spawn(async move {
1653                for j in 0..10 {
1654                    let msg = TestMessage {
1655                        id: i * 10 + j,
1656                        content: format!("Message from sender {i}, iteration {j}"),
1657                    };
1658                    queue_clone.send(msg).await.unwrap();
1659                }
1660            });
1661            send_handles.push(handle);
1662        }
1663
1664        // Spawn multiple receivers
1665        let mut recv_handles = vec![];
1666        let received_messages = std::sync::Arc::new(tokio::sync::Mutex::new(Vec::new()));
1667
1668        for _ in 0..5 {
1669            let queue_clone = queue.clone();
1670            let messages_clone = received_messages.clone();
1671            let handle = tokio::spawn(async move {
1672                loop {
1673                    match queue_clone.recv().await {
1674                        Ok(Some(msg)) => {
1675                            messages_clone.lock().await.push(msg);
1676                        }
1677                        Ok(None) => {
1678                            // No more messages, small delay before checking again
1679                            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1680                        }
1681                        Err(_) => break,
1682                    }
1683
1684                    // Stop when we've received all messages
1685                    if messages_clone.lock().await.len() >= 100 {
1686                        break;
1687                    }
1688                }
1689            });
1690            recv_handles.push(handle);
1691        }
1692
1693        // Wait for all senders to complete
1694        for handle in send_handles {
1695            handle.await.unwrap();
1696        }
1697
1698        // Wait for all messages to be received with timeout
1699        let start = tokio::time::Instant::now();
1700        let timeout = tokio::time::Duration::from_secs(10);
1701
1702        loop {
1703            let count = received_messages.lock().await.len();
1704            if count >= 100 {
1705                break;
1706            }
1707
1708            if start.elapsed() > timeout {
1709                panic!(
1710                    "Timeout: Only received {} messages after {:?}",
1711                    count, timeout
1712                );
1713            }
1714
1715            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1716        }
1717
1718        // Check that all messages were received
1719        let messages = received_messages.lock().await;
1720        assert_eq!(messages.len(), 100);
1721
1722        // Clean up receivers
1723        for handle in recv_handles {
1724            handle.abort();
1725        }
1726    }
1727
1728    #[tokio::test]
1729    async fn test_empty_queue_operations() {
1730        let temp_file = NamedTempFile::new().unwrap();
1731        let queue: DiskBackedQueue<TestMessage> =
1732            DiskBackedQueue::new(temp_file.path(), "empty_test".to_string(), None)
1733                .await
1734                .unwrap();
1735
1736        // Multiple recv calls on empty queue should all return None
1737        for _ in 0..5 {
1738            assert!(queue.recv().await.unwrap().is_none());
1739        }
1740
1741        assert!(queue.is_empty().await.unwrap());
1742        assert_eq!(queue.len().await.unwrap(), 0);
1743    }
1744
1745    #[tokio::test]
1746    async fn test_large_messages() {
1747        let temp_file = NamedTempFile::new().unwrap();
1748        let queue = DiskBackedQueue::new(temp_file.path(), "large_test".to_string(), None)
1749            .await
1750            .unwrap();
1751
1752        // Create a large message (1MB of content)
1753        let large_content = "x".repeat(1024 * 1024);
1754        let large_msg = TestMessage {
1755            id: 42,
1756            content: large_content.clone(),
1757        };
1758
1759        // Send and receive large message
1760        queue.send(large_msg.clone()).await.unwrap();
1761        let received = queue.recv().await.unwrap().unwrap();
1762
1763        assert_eq!(received.id, 42);
1764        assert_eq!(received.content.len(), 1024 * 1024);
1765        assert_eq!(received.content, large_content);
1766    }
1767
1768    #[tokio::test]
1769    async fn test_fifo_ordering() {
1770        let temp_file = NamedTempFile::new().unwrap();
1771        let queue = DiskBackedQueue::new(temp_file.path(), "fifo_test".to_string(), None)
1772            .await
1773            .unwrap();
1774
1775        // Send messages in order
1776        let messages: Vec<TestMessage> = (0..100)
1777            .map(|i| TestMessage {
1778                id: i,
1779                content: format!("Message {i}"),
1780            })
1781            .collect();
1782
1783        for msg in &messages {
1784            queue.send(msg.clone()).await.unwrap();
1785        }
1786
1787        // Receive messages and verify FIFO order
1788        for expected_msg in &messages {
1789            let received = queue.recv().await.unwrap().unwrap();
1790            assert_eq!(received, *expected_msg);
1791        }
1792
1793        // Queue should be empty now
1794        assert!(queue.recv().await.unwrap().is_none());
1795    }
1796
1797    #[tokio::test]
1798    async fn test_invalid_table_name() {
1799        let temp_file = NamedTempFile::new().unwrap();
1800
1801        // Try to create queue with invalid table name
1802        let _result = DiskBackedQueue::<TestMessage>::new(
1803            temp_file.path(),
1804            "invalid-table-name-with-dashes".to_string(),
1805            None,
1806        )
1807        .await;
1808
1809        // This should still work because SQLite is quite permissive
1810        // but let's test a truly invalid name
1811        let result2 = DiskBackedQueue::<TestMessage>::new(
1812            temp_file.path(),
1813            "".to_string(), // Empty table name should cause issues
1814            None,
1815        )
1816        .await;
1817
1818        // Empty table name should cause an error
1819        assert!(result2.is_err());
1820    }
1821
1822    #[tokio::test(flavor = "multi_thread")]
1823    async fn test_blocking_send() {
1824        let temp_file = NamedTempFile::new().unwrap();
1825        let (sender, mut receiver) = disk_backed_channel::<TestMessage, _>(
1826            temp_file.path(),
1827            "blocking_test".to_string(),
1828            None,
1829        )
1830        .await
1831        .unwrap();
1832
1833        let msg = TestMessage {
1834            id: 123,
1835            content: "Blocking test".to_string(),
1836        };
1837
1838        // Test blocking send
1839        sender.blocking_send(msg.clone()).unwrap();
1840
1841        // Receive and verify
1842        let received = receiver.recv().await.unwrap().unwrap();
1843        assert_eq!(received, msg);
1844    }
1845
1846    #[tokio::test]
1847    async fn test_database_file_permissions() {
1848        let temp_file = NamedTempFile::new().unwrap();
1849        let temp_path = temp_file.path().to_path_buf();
1850
1851        // Create queue first
1852        let _queue = DiskBackedQueue::<TestMessage>::new(&temp_path, "perm_test".to_string(), None)
1853            .await
1854            .unwrap();
1855
1856        // Check that database file was created
1857        assert!(temp_path.exists());
1858
1859        // Basic metadata check (permissions test is platform-specific)
1860        let metadata = std::fs::metadata(&temp_path).unwrap();
1861        assert!(metadata.is_file());
1862        assert!(metadata.len() > 0); // Should have some content (SQLite header)
1863    }
1864
1865    #[tokio::test]
1866    async fn test_dlq_file_created() {
1867        let temp_file = NamedTempFile::new().unwrap();
1868        let temp_path = temp_file.path();
1869
1870        disk_backed_channel::<TestMessage, _>(temp_path, "test".to_string(), None)
1871            .await
1872            .unwrap();
1873
1874        // Check DLQ file exists
1875        let dlq_path = temp_path.with_extension("dlq.db");
1876        assert!(dlq_path.exists());
1877    }
1878
1879    #[tokio::test]
1880    async fn test_transaction_rollback_on_corruption() {
1881        let temp_file = NamedTempFile::new().unwrap();
1882        let queue = DiskBackedQueue::new(temp_file.path(), "test_queue".to_string(), None)
1883            .await
1884            .unwrap();
1885
1886        // Send a valid message
1887        let msg = TestMessage {
1888            id: 1,
1889            content: "Valid".to_string(),
1890        };
1891        queue.send(msg).await.unwrap();
1892
1893        // Manually corrupt the database by writing invalid binary data
1894        {
1895            let db = queue.db.lock().unwrap();
1896            let garbage_data: Vec<u8> = vec![0xFF, 0xFE, 0xFD, 0xFC]; // Invalid bincode
1897            db.execute(
1898                "UPDATE test_queue SET data = ? WHERE id = 1",
1899                [&garbage_data],
1900            )
1901            .unwrap();
1902        }
1903
1904        // Try to receive - should return deserialization error
1905        let result = queue.recv().await;
1906        assert!(result.is_err());
1907        assert!(matches!(result, Err(DiskQueueError::Deserialization(_))));
1908
1909        // Verify message was moved to DLQ
1910        let dlq_path = temp_file.path().with_extension("dlq.db");
1911        let dlq_conn = rusqlite::Connection::open(&dlq_path).unwrap();
1912        let count: i64 = dlq_conn
1913            .query_row("SELECT COUNT(*) FROM test_queue_dlq", [], |row| row.get(0))
1914            .unwrap();
1915        assert_eq!(count, 1);
1916
1917        // Verify main queue is empty
1918        assert_eq!(queue.len().await.unwrap(), 0);
1919    }
1920
1921    #[tokio::test]
1922    async fn test_max_size_blocks() {
1923        let temp_file = NamedTempFile::new().unwrap();
1924        let (tx, mut rx) = disk_backed_channel::<TestMessage, _>(
1925            temp_file.path(),
1926            "test".to_string(),
1927            Some(2), // Max 2 messages
1928        )
1929        .await
1930        .unwrap();
1931
1932        let msg1 = TestMessage {
1933            id: 1,
1934            content: "Message 1".to_string(),
1935        };
1936        let msg2 = TestMessage {
1937            id: 2,
1938            content: "Message 2".to_string(),
1939        };
1940        let msg3 = TestMessage {
1941            id: 3,
1942            content: "Message 3".to_string(),
1943        };
1944
1945        // Send 2 messages (should succeed immediately)
1946        tx.send(msg1.clone()).await.unwrap();
1947        tx.send(msg2.clone()).await.unwrap();
1948
1949        // Send 3rd message in background (should block until space available)
1950        let tx_clone = tx.clone();
1951        let handle = tokio::spawn(async move { tx_clone.send(msg3).await });
1952
1953        // Give it time to attempt send (should be blocked)
1954        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
1955        assert!(!handle.is_finished());
1956
1957        // Receive one message to make space
1958        let received = rx.recv().await.unwrap().unwrap();
1959        assert_eq!(received, msg1);
1960
1961        // Now 3rd send should complete
1962        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
1963        assert!(handle.is_finished());
1964        handle.await.unwrap().unwrap();
1965
1966        // Verify queue has 2 messages
1967        assert_eq!(rx.len().await.unwrap(), 2);
1968    }
1969
1970    #[tokio::test]
1971    async fn test_send_batch() {
1972        let temp_file = NamedTempFile::new().unwrap();
1973        let queue = DiskBackedQueue::new(temp_file.path(), "batch_test".to_string(), None)
1974            .await
1975            .unwrap();
1976
1977        // Create a batch of messages
1978        let messages: Vec<TestMessage> = (0..100)
1979            .map(|i| TestMessage {
1980                id: i,
1981                content: format!("Batch message {}", i),
1982            })
1983            .collect();
1984
1985        // Send batch
1986        queue.send_batch(messages.clone()).await.unwrap();
1987
1988        // Verify count
1989        assert_eq!(queue.len().await.unwrap(), 100);
1990
1991        // Receive and verify all messages
1992        for i in 0..100 {
1993            let received = queue.recv().await.unwrap().unwrap();
1994            assert_eq!(received, messages[i]);
1995        }
1996
1997        assert!(queue.is_empty().await.unwrap());
1998    }
1999
2000    #[tokio::test]
2001    async fn test_recv_batch() {
2002        let temp_file = NamedTempFile::new().unwrap();
2003        let queue = DiskBackedQueue::new(temp_file.path(), "batch_recv_test".to_string(), None)
2004            .await
2005            .unwrap();
2006
2007        // Send 100 individual messages
2008        for i in 0..100 {
2009            let msg = TestMessage {
2010                id: i,
2011                content: format!("Message {}", i),
2012            };
2013            queue.send(msg).await.unwrap();
2014        }
2015
2016        // Receive in batches of 25
2017        let batch1 = queue.recv_batch(25).await.unwrap();
2018        assert_eq!(batch1.len(), 25);
2019        assert_eq!(batch1[0].id, 0);
2020        assert_eq!(batch1[24].id, 24);
2021
2022        let batch2 = queue.recv_batch(25).await.unwrap();
2023        assert_eq!(batch2.len(), 25);
2024        assert_eq!(batch2[0].id, 25);
2025        assert_eq!(batch2[24].id, 49);
2026
2027        // Receive remaining 50 items (but only request 100)
2028        let batch3 = queue.recv_batch(100).await.unwrap();
2029        assert_eq!(batch3.len(), 50); // Should only get 50
2030        assert_eq!(batch3[0].id, 50);
2031        assert_eq!(batch3[49].id, 99);
2032
2033        // Queue should be empty
2034        assert!(queue.is_empty().await.unwrap());
2035        let empty_batch = queue.recv_batch(10).await.unwrap();
2036        assert!(empty_batch.is_empty());
2037    }
2038
2039    #[tokio::test]
2040    async fn test_batch_with_channel_api() {
2041        let temp_file = NamedTempFile::new().unwrap();
2042        let (tx, mut rx) = disk_backed_channel::<TestMessage, _>(
2043            temp_file.path(),
2044            "batch_channel_test".to_string(),
2045            None,
2046        )
2047        .await
2048        .unwrap();
2049
2050        // Send batch via sender
2051        let messages: Vec<TestMessage> = (0..50)
2052            .map(|i| TestMessage {
2053                id: i,
2054                content: format!("Batch {}", i),
2055            })
2056            .collect();
2057
2058        tx.send_batch(messages.clone()).await.unwrap();
2059
2060        // Receive batch via receiver
2061        let received = rx.recv_batch(50).await.unwrap();
2062        assert_eq!(received.len(), 50);
2063        assert_eq!(received, messages);
2064    }
2065
2066    #[tokio::test]
2067    async fn test_batch_performance_comparison() {
2068        let temp_file1 = NamedTempFile::new().unwrap();
2069        let temp_file2 = NamedTempFile::new().unwrap();
2070        let queue_single = DiskBackedQueue::new(temp_file1.path(), "single".to_string(), None)
2071            .await
2072            .unwrap();
2073        let queue_batch = DiskBackedQueue::new(temp_file2.path(), "batch".to_string(), None)
2074            .await
2075            .unwrap();
2076
2077        let message_count = 1000;
2078        let messages: Vec<TestMessage> = (0..message_count)
2079            .map(|i| TestMessage {
2080                id: i,
2081                content: format!("Perf test {}", i),
2082            })
2083            .collect();
2084
2085        // Single send
2086        let start = std::time::Instant::now();
2087        for msg in messages.clone() {
2088            queue_single.send(msg).await.unwrap();
2089        }
2090        let single_duration = start.elapsed();
2091
2092        // Batch send
2093        let start = std::time::Instant::now();
2094        queue_batch.send_batch(messages.clone()).await.unwrap();
2095        let batch_duration = start.elapsed();
2096
2097        println!(
2098            "Single send: {:?}, Batch send: {:?}, Speedup: {:.2}x",
2099            single_duration,
2100            batch_duration,
2101            single_duration.as_secs_f64() / batch_duration.as_secs_f64()
2102        );
2103
2104        // Batch should be significantly faster (at least 5x)
2105        assert!(batch_duration < single_duration / 5);
2106    }
2107
2108    #[tokio::test]
2109    async fn test_batch_with_corrupted_data() {
2110        let temp_file = NamedTempFile::new().unwrap();
2111        let queue = DiskBackedQueue::new(temp_file.path(), "batch_corrupt_test".to_string(), None)
2112            .await
2113            .unwrap();
2114
2115        // Send 10 valid messages
2116        for i in 0..10 {
2117            let msg = TestMessage {
2118                id: i,
2119                content: format!("Valid {}", i),
2120            };
2121            queue.send(msg).await.unwrap();
2122        }
2123
2124        // Corrupt message 5
2125        {
2126            let db = queue.db.lock().unwrap();
2127            let garbage_data: Vec<u8> = vec![0xFF, 0xFE, 0xFD, 0xFC];
2128            db.execute(
2129                "UPDATE batch_corrupt_test SET data = ? WHERE id = 6",
2130                [&garbage_data],
2131            )
2132            .unwrap();
2133        }
2134
2135        // Batch receive should skip corrupted message and move it to DLQ
2136        let batch = queue.recv_batch(10).await.unwrap();
2137        assert_eq!(batch.len(), 9); // Should get 9 valid messages (10 - 1 corrupted)
2138
2139        // Verify DLQ has 1 entry
2140        let dlq_path = temp_file.path().with_extension("dlq.db");
2141        let dlq_conn = rusqlite::Connection::open(&dlq_path).unwrap();
2142        let dlq_count: i64 = dlq_conn
2143            .query_row("SELECT COUNT(*) FROM batch_corrupt_test_dlq", [], |row| {
2144                row.get(0)
2145            })
2146            .unwrap();
2147        assert_eq!(dlq_count, 1);
2148    }
2149}