Skip to main content

alien_bindings/providers/queue/
local.rs

1use crate::error::{ErrorData, Result};
2use crate::traits::{
3    Binding, MessagePayload, Queue, QueueMessage, MAX_BATCH_SIZE, MAX_MESSAGE_BYTES,
4};
5use alien_core::bindings::LocalQueueBinding;
6use alien_error::{AlienError, Context, IntoAlienError};
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::path::PathBuf;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13
14const LEASE_DURATION_SECS: i64 = 30;
15
16/// Local disk-persisted queue implementation using sled embedded database.
17///
18/// This provides a persistent, thread-safe, disk-based message queue that implements
19/// all Queue trait features including send, receive with visibility timeout, and ack.
20/// Messages survive process restarts.
21#[derive(Debug)]
22pub struct LocalQueue {
23    db: Arc<Mutex<sled::Db>>,
24}
25
26/// Stored message format that avoids serde issues with `MessagePayload`'s internal tagging.
27/// We store the payload as a raw JSON value and a discriminator tag.
28#[derive(Debug, Clone, Serialize, Deserialize)]
29struct StoredMessage {
30    /// "json" or "text"
31    payload_type: String,
32    /// The raw payload content (JSON value for json type, string for text type)
33    payload_data: serde_json::Value,
34    enqueued_at: DateTime<Utc>,
35}
36
37impl StoredMessage {
38    fn from_payload(payload: MessagePayload) -> Self {
39        let (payload_type, payload_data) = match payload {
40            MessagePayload::Json(v) => ("json".to_string(), v),
41            MessagePayload::Text(s) => ("text".to_string(), serde_json::Value::String(s)),
42        };
43        Self {
44            payload_type,
45            payload_data,
46            enqueued_at: Utc::now(),
47        }
48    }
49
50    fn into_payload(self) -> MessagePayload {
51        match self.payload_type.as_str() {
52            "json" => MessagePayload::Json(self.payload_data),
53            _ => match self.payload_data {
54                serde_json::Value::String(s) => MessagePayload::Text(s),
55                other => MessagePayload::Text(other.to_string()),
56            },
57        }
58    }
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
62struct InFlightMessage {
63    /// The sequence key in the messages tree (big-endian u64 bytes)
64    seq_bytes: Vec<u8>,
65    message: StoredMessage,
66    leased_until: DateTime<Utc>,
67}
68
69impl LocalQueue {
70    /// Create a new local queue store with the given data directory.
71    pub async fn new(data_dir: PathBuf) -> Result<Self> {
72        tracing::debug!(data_dir = %data_dir.display(), "Opening LocalQueue database");
73
74        if let Some(parent) = data_dir.parent() {
75            tokio::fs::create_dir_all(parent)
76                .await
77                .into_alien_error()
78                .context(ErrorData::LocalFilesystemError {
79                    path: parent.to_string_lossy().to_string(),
80                    operation: "create_dir_all".to_string(),
81                })?;
82        }
83
84        let db =
85            sled::open(&data_dir)
86                .into_alien_error()
87                .context(ErrorData::BindingSetupFailed {
88                    binding_type: "local queue".to_string(),
89                    reason: format!("Failed to open sled database at: {:?}", data_dir),
90                })?;
91
92        tracing::debug!(data_dir = %data_dir.display(), "LocalQueue database opened successfully");
93
94        Ok(Self {
95            db: Arc::new(Mutex::new(db)),
96        })
97    }
98
99    /// Create a LocalQueue from a LocalQueueBinding.
100    pub async fn from_binding(binding: LocalQueueBinding) -> Result<Self> {
101        let queue_path = binding
102            .queue_path
103            .into_value("queue", "queue_path")
104            .context(ErrorData::BindingConfigInvalid {
105                binding_name: "queue".to_string(),
106                reason: "Failed to resolve queue_path from binding".to_string(),
107            })?;
108
109        Self::new(PathBuf::from(queue_path)).await
110    }
111
112    /// Reclaim expired in-flight messages back to the messages tree.
113    fn reclaim_expired_leases(db: &sled::Db) -> Result<()> {
114        let in_flight_tree = db.open_tree("in_flight").into_alien_error().context(
115            ErrorData::QueueOperationFailed {
116                operation: "open in_flight tree".to_string(),
117                reason: "Failed to open in_flight tree".to_string(),
118            },
119        )?;
120
121        let messages_tree = db.open_tree("messages").into_alien_error().context(
122            ErrorData::QueueOperationFailed {
123                operation: "open messages tree".to_string(),
124                reason: "Failed to open messages tree".to_string(),
125            },
126        )?;
127
128        let now = Utc::now();
129        let mut expired_handles = Vec::new();
130
131        for result in in_flight_tree.iter() {
132            let (handle_bytes, value_bytes) =
133                result
134                    .into_alien_error()
135                    .context(ErrorData::QueueOperationFailed {
136                        operation: "scan in_flight".to_string(),
137                        reason: "Failed to iterate in-flight messages".to_string(),
138                    })?;
139
140            if let Ok(in_flight) = serde_json::from_slice::<InFlightMessage>(&value_bytes) {
141                if now >= in_flight.leased_until {
142                    // Re-enqueue the message with its original sequence key
143                    let stored_bytes = serde_json::to_vec(&in_flight.message)
144                        .into_alien_error()
145                        .context(ErrorData::QueueOperationFailed {
146                            operation: "serialize reclaimed message".to_string(),
147                            reason: "Failed to serialize message".to_string(),
148                        })?;
149
150                    messages_tree
151                        .insert(&in_flight.seq_bytes, stored_bytes)
152                        .into_alien_error()
153                        .context(ErrorData::QueueOperationFailed {
154                            operation: "re-enqueue expired message".to_string(),
155                            reason: "Failed to re-enqueue expired message".to_string(),
156                        })?;
157
158                    expired_handles.push(handle_bytes);
159                }
160            }
161        }
162
163        for handle in expired_handles {
164            let _ = in_flight_tree.remove(&handle);
165        }
166
167        Ok(())
168    }
169
170    fn serialize_message(message: &StoredMessage) -> Result<Vec<u8>> {
171        serde_json::to_vec(message)
172            .into_alien_error()
173            .context(ErrorData::QueueOperationFailed {
174                operation: "serialize message".to_string(),
175                reason: "Failed to serialize message to JSON".to_string(),
176            })
177    }
178
179    fn message_size(payload: &MessagePayload) -> Result<usize> {
180        match payload {
181            MessagePayload::Json(v) => serde_json::to_string(v)
182                .map(|s| s.len())
183                .into_alien_error()
184                .context(ErrorData::QueueOperationFailed {
185                    operation: "measure message size".to_string(),
186                    reason: "Failed to serialize JSON payload".to_string(),
187                }),
188            MessagePayload::Text(s) => Ok(s.len()),
189        }
190    }
191}
192
193impl Binding for LocalQueue {}
194
195#[async_trait]
196impl Queue for LocalQueue {
197    async fn send(&self, _queue: &str, message: MessagePayload) -> Result<()> {
198        let size = Self::message_size(&message)?;
199        if size > MAX_MESSAGE_BYTES {
200            return Err(AlienError::new(ErrorData::BindingSetupFailed {
201                binding_type: "queue.local".to_string(),
202                reason: format!(
203                    "Message size {} bytes exceeds limit of {} bytes",
204                    size, MAX_MESSAGE_BYTES
205                ),
206            }));
207        }
208
209        let stored = StoredMessage::from_payload(message);
210        let serialized = Self::serialize_message(&stored)?;
211
212        let db = self.db.lock().await;
213        let messages_tree = db.open_tree("messages").into_alien_error().context(
214            ErrorData::QueueOperationFailed {
215                operation: "open messages tree".to_string(),
216                reason: "Failed to open messages tree".to_string(),
217            },
218        )?;
219
220        // Use generate_id for monotonically increasing sequence numbers
221        let seq = db
222            .generate_id()
223            .into_alien_error()
224            .context(ErrorData::QueueOperationFailed {
225                operation: "generate sequence".to_string(),
226                reason: "Failed to generate message sequence number".to_string(),
227            })?;
228        let seq_key = seq.to_be_bytes();
229
230        messages_tree
231            .insert(seq_key, serialized)
232            .into_alien_error()
233            .context(ErrorData::QueueOperationFailed {
234                operation: "send".to_string(),
235                reason: "Failed to insert message".to_string(),
236            })?;
237
238        messages_tree
239            .flush_async()
240            .await
241            .into_alien_error()
242            .context(ErrorData::QueueOperationFailed {
243                operation: "flush".to_string(),
244                reason: "Failed to flush message to disk".to_string(),
245            })?;
246
247        Ok(())
248    }
249
250    async fn receive(&self, _queue: &str, max_messages: usize) -> Result<Vec<QueueMessage>> {
251        if max_messages == 0 || max_messages > MAX_BATCH_SIZE {
252            return Err(AlienError::new(ErrorData::BindingSetupFailed {
253                binding_type: "queue.local".to_string(),
254                reason: format!(
255                    "Batch size {} is invalid. Must be between 1 and {}",
256                    max_messages, MAX_BATCH_SIZE
257                ),
258            }));
259        }
260
261        let db = self.db.lock().await;
262
263        // Reclaim expired leases first
264        Self::reclaim_expired_leases(&db)?;
265
266        let messages_tree = db.open_tree("messages").into_alien_error().context(
267            ErrorData::QueueOperationFailed {
268                operation: "open messages tree".to_string(),
269                reason: "Failed to open messages tree".to_string(),
270            },
271        )?;
272
273        let in_flight_tree = db.open_tree("in_flight").into_alien_error().context(
274            ErrorData::QueueOperationFailed {
275                operation: "open in_flight tree".to_string(),
276                reason: "Failed to open in_flight tree".to_string(),
277            },
278        )?;
279
280        let now = Utc::now();
281        let leased_until = now + chrono::Duration::seconds(LEASE_DURATION_SECS);
282        let mut result = Vec::new();
283
284        // Pop messages from the front (lowest sequence number)
285        for item in messages_tree.iter() {
286            if result.len() >= max_messages {
287                break;
288            }
289
290            let (seq_key, value_bytes) =
291                item.into_alien_error()
292                    .context(ErrorData::QueueOperationFailed {
293                        operation: "receive".to_string(),
294                        reason: "Failed to iterate messages".to_string(),
295                    })?;
296
297            let stored: StoredMessage = match serde_json::from_slice(&value_bytes) {
298                Ok(m) => m,
299                Err(_) => continue, // Skip corrupted messages
300            };
301
302            // Generate a receipt handle
303            let receipt_handle = uuid::Uuid::new_v4().to_string();
304
305            // Move to in-flight
306            let in_flight = InFlightMessage {
307                seq_bytes: seq_key.to_vec(),
308                message: stored.clone(),
309                leased_until,
310            };
311            let in_flight_bytes = serde_json::to_vec(&in_flight).into_alien_error().context(
312                ErrorData::QueueOperationFailed {
313                    operation: "serialize in_flight".to_string(),
314                    reason: "Failed to serialize in-flight message".to_string(),
315                },
316            )?;
317
318            in_flight_tree
319                .insert(receipt_handle.as_bytes(), in_flight_bytes)
320                .into_alien_error()
321                .context(ErrorData::QueueOperationFailed {
322                    operation: "move to in_flight".to_string(),
323                    reason: "Failed to move message to in-flight".to_string(),
324                })?;
325
326            // Remove from messages
327            messages_tree.remove(&seq_key).into_alien_error().context(
328                ErrorData::QueueOperationFailed {
329                    operation: "remove from messages".to_string(),
330                    reason: "Failed to remove message from queue".to_string(),
331                },
332            )?;
333
334            result.push(QueueMessage {
335                payload: stored.into_payload(),
336                receipt_handle,
337            });
338        }
339
340        // Flush both trees
341        messages_tree
342            .flush_async()
343            .await
344            .into_alien_error()
345            .context(ErrorData::QueueOperationFailed {
346                operation: "flush".to_string(),
347                reason: "Failed to flush messages tree".to_string(),
348            })?;
349        in_flight_tree
350            .flush_async()
351            .await
352            .into_alien_error()
353            .context(ErrorData::QueueOperationFailed {
354                operation: "flush".to_string(),
355                reason: "Failed to flush in_flight tree".to_string(),
356            })?;
357
358        Ok(result)
359    }
360
361    async fn ack(&self, _queue: &str, receipt_handle: &str) -> Result<()> {
362        let db = self.db.lock().await;
363        let in_flight_tree = db.open_tree("in_flight").into_alien_error().context(
364            ErrorData::QueueOperationFailed {
365                operation: "open in_flight tree".to_string(),
366                reason: "Failed to open in_flight tree".to_string(),
367            },
368        )?;
369
370        // Remove the message (idempotent - missing key is OK)
371        in_flight_tree
372            .remove(receipt_handle.as_bytes())
373            .into_alien_error()
374            .context(ErrorData::QueueOperationFailed {
375                operation: "ack".to_string(),
376                reason: "Failed to acknowledge message".to_string(),
377            })?;
378
379        in_flight_tree
380            .flush_async()
381            .await
382            .into_alien_error()
383            .context(ErrorData::QueueOperationFailed {
384                operation: "flush".to_string(),
385                reason: "Failed to flush acknowledgment".to_string(),
386            })?;
387
388        Ok(())
389    }
390}
391
392#[cfg(test)]
393mod tests {
394    use super::*;
395    use tempfile::TempDir;
396
397    fn payload_text(msg: &QueueMessage) -> String {
398        match &msg.payload {
399            MessagePayload::Text(s) => s.clone(),
400            MessagePayload::Json(v) => v.to_string(),
401        }
402    }
403
404    async fn create_test_queue() -> (LocalQueue, TempDir) {
405        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
406        let queue = LocalQueue::new(temp_dir.path().join("queue.db"))
407            .await
408            .expect("Failed to create LocalQueue");
409        (queue, temp_dir)
410    }
411
412    #[tokio::test]
413    async fn test_send_and_receive() {
414        let (queue, _temp_dir) = create_test_queue().await;
415
416        queue
417            .send("q", MessagePayload::Text("hello".to_string()))
418            .await
419            .unwrap();
420        queue
421            .send("q", MessagePayload::Text("world".to_string()))
422            .await
423            .unwrap();
424
425        let msgs = queue.receive("q", 10).await.unwrap();
426        assert_eq!(msgs.len(), 2);
427        assert_eq!(payload_text(&msgs[0]), "hello");
428        assert_eq!(payload_text(&msgs[1]), "world");
429    }
430
431    #[tokio::test]
432    async fn test_receive_empty_queue() {
433        let (queue, _temp_dir) = create_test_queue().await;
434
435        let msgs = queue.receive("q", 10).await.unwrap();
436        assert!(msgs.is_empty());
437    }
438
439    #[tokio::test]
440    async fn test_ack_removes_message() {
441        let (queue, _temp_dir) = create_test_queue().await;
442
443        queue
444            .send("q", MessagePayload::Text("msg".to_string()))
445            .await
446            .unwrap();
447
448        let msgs = queue.receive("q", 1).await.unwrap();
449        assert_eq!(msgs.len(), 1);
450
451        // Ack the message
452        queue.ack("q", &msgs[0].receipt_handle).await.unwrap();
453
454        // No messages should be available (acked, not expired)
455        let msgs = queue.receive("q", 10).await.unwrap();
456        assert!(msgs.is_empty());
457    }
458
459    #[tokio::test]
460    async fn test_ack_idempotent() {
461        let (queue, _temp_dir) = create_test_queue().await;
462
463        // Acking a non-existent receipt handle should succeed
464        queue.ack("q", "non-existent-handle").await.unwrap();
465    }
466
467    #[tokio::test]
468    async fn test_receive_respects_max_messages() {
469        let (queue, _temp_dir) = create_test_queue().await;
470
471        for i in 0..5 {
472            queue
473                .send("q", MessagePayload::Text(format!("msg-{}", i)))
474                .await
475                .unwrap();
476        }
477
478        let msgs = queue.receive("q", 2).await.unwrap();
479        assert_eq!(msgs.len(), 2);
480        assert_eq!(payload_text(&msgs[0]), "msg-0");
481        assert_eq!(payload_text(&msgs[1]), "msg-1");
482    }
483
484    #[tokio::test]
485    async fn test_json_payload() {
486        let (queue, _temp_dir) = create_test_queue().await;
487
488        let payload = serde_json::json!({"key": "value", "num": 42});
489        queue
490            .send("q", MessagePayload::Json(payload.clone()))
491            .await
492            .unwrap();
493
494        let msgs = queue.receive("q", 1).await.unwrap();
495        assert_eq!(msgs.len(), 1);
496        match &msgs[0].payload {
497            MessagePayload::Json(v) => assert_eq!(v, &payload),
498            _ => panic!("Expected JSON payload"),
499        }
500    }
501
502    #[tokio::test]
503    async fn test_message_size_validation() {
504        let (queue, _temp_dir) = create_test_queue().await;
505
506        let large = "x".repeat(MAX_MESSAGE_BYTES + 1);
507        let result = queue.send("q", MessagePayload::Text(large)).await;
508        assert!(result.is_err());
509    }
510
511    #[tokio::test]
512    async fn test_batch_size_validation() {
513        let (queue, _temp_dir) = create_test_queue().await;
514
515        assert!(queue.receive("q", 0).await.is_err());
516        assert!(queue.receive("q", MAX_BATCH_SIZE + 1).await.is_err());
517    }
518
519    #[tokio::test]
520    async fn test_persistence_across_reopens() {
521        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
522        let db_path = temp_dir.path().join("queue.db");
523
524        // Send a message and drop the queue
525        {
526            let queue = LocalQueue::new(db_path.clone()).await.unwrap();
527            queue
528                .send("q", MessagePayload::Text("persistent".to_string()))
529                .await
530                .unwrap();
531        }
532
533        // Reopen and verify message persists
534        {
535            let queue = LocalQueue::new(db_path).await.unwrap();
536            let msgs = queue.receive("q", 1).await.unwrap();
537            assert_eq!(msgs.len(), 1);
538            assert_eq!(payload_text(&msgs[0]), "persistent");
539        }
540    }
541
542    #[tokio::test]
543    async fn test_fifo_ordering() {
544        let (queue, _temp_dir) = create_test_queue().await;
545
546        for i in 0..10 {
547            queue
548                .send("q", MessagePayload::Text(format!("{}", i)))
549                .await
550                .unwrap();
551        }
552
553        let msgs = queue.receive("q", 10).await.unwrap();
554        for (i, msg) in msgs.iter().enumerate() {
555            assert_eq!(payload_text(msg), format!("{}", i));
556        }
557    }
558}