1use crate::error::{ErrorData, Result};
2use crate::traits::{
3 Binding, MessagePayload, Queue, QueueMessage, MAX_BATCH_SIZE, MAX_MESSAGE_BYTES,
4};
5use alien_core::bindings::LocalQueueBinding;
6use alien_error::{AlienError, Context, IntoAlienError};
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::path::PathBuf;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13
/// Lease length, in seconds, applied to every message handed out by `receive`.
///
/// Once this much time has passed without an `ack`, `reclaim_expired_leases`
/// moves the message back into the `messages` tree.
const LEASE_DURATION_SECS: i64 = 30;
15
16#[derive(Debug)]
22pub struct LocalQueue {
23 db: Arc<Mutex<sled::Db>>,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
29struct StoredMessage {
30 payload_type: String,
32 payload_data: serde_json::Value,
34 enqueued_at: DateTime<Utc>,
35}
36
37impl StoredMessage {
38 fn from_payload(payload: MessagePayload) -> Self {
39 let (payload_type, payload_data) = match payload {
40 MessagePayload::Json(v) => ("json".to_string(), v),
41 MessagePayload::Text(s) => ("text".to_string(), serde_json::Value::String(s)),
42 };
43 Self {
44 payload_type,
45 payload_data,
46 enqueued_at: Utc::now(),
47 }
48 }
49
50 fn into_payload(self) -> MessagePayload {
51 match self.payload_type.as_str() {
52 "json" => MessagePayload::Json(self.payload_data),
53 _ => match self.payload_data {
54 serde_json::Value::String(s) => MessagePayload::Text(s),
55 other => MessagePayload::Text(other.to_string()),
56 },
57 }
58 }
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
62struct InFlightMessage {
63 seq_bytes: Vec<u8>,
65 message: StoredMessage,
66 leased_until: DateTime<Utc>,
67}
68
69impl LocalQueue {
70 pub async fn new(data_dir: PathBuf) -> Result<Self> {
72 tracing::debug!(data_dir = %data_dir.display(), "Opening LocalQueue database");
73
74 if let Some(parent) = data_dir.parent() {
75 tokio::fs::create_dir_all(parent)
76 .await
77 .into_alien_error()
78 .context(ErrorData::LocalFilesystemError {
79 path: parent.to_string_lossy().to_string(),
80 operation: "create_dir_all".to_string(),
81 })?;
82 }
83
84 let db =
85 sled::open(&data_dir)
86 .into_alien_error()
87 .context(ErrorData::BindingSetupFailed {
88 binding_type: "local queue".to_string(),
89 reason: format!("Failed to open sled database at: {:?}", data_dir),
90 })?;
91
92 tracing::debug!(data_dir = %data_dir.display(), "LocalQueue database opened successfully");
93
94 Ok(Self {
95 db: Arc::new(Mutex::new(db)),
96 })
97 }
98
99 pub async fn from_binding(binding: LocalQueueBinding) -> Result<Self> {
101 let queue_path = binding
102 .queue_path
103 .into_value("queue", "queue_path")
104 .context(ErrorData::BindingConfigInvalid {
105 binding_name: "queue".to_string(),
106 reason: "Failed to resolve queue_path from binding".to_string(),
107 })?;
108
109 Self::new(PathBuf::from(queue_path)).await
110 }
111
112 fn reclaim_expired_leases(db: &sled::Db) -> Result<()> {
114 let in_flight_tree = db.open_tree("in_flight").into_alien_error().context(
115 ErrorData::QueueOperationFailed {
116 operation: "open in_flight tree".to_string(),
117 reason: "Failed to open in_flight tree".to_string(),
118 },
119 )?;
120
121 let messages_tree = db.open_tree("messages").into_alien_error().context(
122 ErrorData::QueueOperationFailed {
123 operation: "open messages tree".to_string(),
124 reason: "Failed to open messages tree".to_string(),
125 },
126 )?;
127
128 let now = Utc::now();
129 let mut expired_handles = Vec::new();
130
131 for result in in_flight_tree.iter() {
132 let (handle_bytes, value_bytes) =
133 result
134 .into_alien_error()
135 .context(ErrorData::QueueOperationFailed {
136 operation: "scan in_flight".to_string(),
137 reason: "Failed to iterate in-flight messages".to_string(),
138 })?;
139
140 if let Ok(in_flight) = serde_json::from_slice::<InFlightMessage>(&value_bytes) {
141 if now >= in_flight.leased_until {
142 let stored_bytes = serde_json::to_vec(&in_flight.message)
144 .into_alien_error()
145 .context(ErrorData::QueueOperationFailed {
146 operation: "serialize reclaimed message".to_string(),
147 reason: "Failed to serialize message".to_string(),
148 })?;
149
150 messages_tree
151 .insert(&in_flight.seq_bytes, stored_bytes)
152 .into_alien_error()
153 .context(ErrorData::QueueOperationFailed {
154 operation: "re-enqueue expired message".to_string(),
155 reason: "Failed to re-enqueue expired message".to_string(),
156 })?;
157
158 expired_handles.push(handle_bytes);
159 }
160 }
161 }
162
163 for handle in expired_handles {
164 let _ = in_flight_tree.remove(&handle);
165 }
166
167 Ok(())
168 }
169
170 fn serialize_message(message: &StoredMessage) -> Result<Vec<u8>> {
171 serde_json::to_vec(message)
172 .into_alien_error()
173 .context(ErrorData::QueueOperationFailed {
174 operation: "serialize message".to_string(),
175 reason: "Failed to serialize message to JSON".to_string(),
176 })
177 }
178
179 fn message_size(payload: &MessagePayload) -> Result<usize> {
180 match payload {
181 MessagePayload::Json(v) => serde_json::to_string(v)
182 .map(|s| s.len())
183 .into_alien_error()
184 .context(ErrorData::QueueOperationFailed {
185 operation: "measure message size".to_string(),
186 reason: "Failed to serialize JSON payload".to_string(),
187 }),
188 MessagePayload::Text(s) => Ok(s.len()),
189 }
190 }
191}
192
193impl Binding for LocalQueue {}
194
195#[async_trait]
196impl Queue for LocalQueue {
197 async fn send(&self, _queue: &str, message: MessagePayload) -> Result<()> {
198 let size = Self::message_size(&message)?;
199 if size > MAX_MESSAGE_BYTES {
200 return Err(AlienError::new(ErrorData::BindingSetupFailed {
201 binding_type: "queue.local".to_string(),
202 reason: format!(
203 "Message size {} bytes exceeds limit of {} bytes",
204 size, MAX_MESSAGE_BYTES
205 ),
206 }));
207 }
208
209 let stored = StoredMessage::from_payload(message);
210 let serialized = Self::serialize_message(&stored)?;
211
212 let db = self.db.lock().await;
213 let messages_tree = db.open_tree("messages").into_alien_error().context(
214 ErrorData::QueueOperationFailed {
215 operation: "open messages tree".to_string(),
216 reason: "Failed to open messages tree".to_string(),
217 },
218 )?;
219
220 let seq = db
222 .generate_id()
223 .into_alien_error()
224 .context(ErrorData::QueueOperationFailed {
225 operation: "generate sequence".to_string(),
226 reason: "Failed to generate message sequence number".to_string(),
227 })?;
228 let seq_key = seq.to_be_bytes();
229
230 messages_tree
231 .insert(seq_key, serialized)
232 .into_alien_error()
233 .context(ErrorData::QueueOperationFailed {
234 operation: "send".to_string(),
235 reason: "Failed to insert message".to_string(),
236 })?;
237
238 messages_tree
239 .flush_async()
240 .await
241 .into_alien_error()
242 .context(ErrorData::QueueOperationFailed {
243 operation: "flush".to_string(),
244 reason: "Failed to flush message to disk".to_string(),
245 })?;
246
247 Ok(())
248 }
249
250 async fn receive(&self, _queue: &str, max_messages: usize) -> Result<Vec<QueueMessage>> {
251 if max_messages == 0 || max_messages > MAX_BATCH_SIZE {
252 return Err(AlienError::new(ErrorData::BindingSetupFailed {
253 binding_type: "queue.local".to_string(),
254 reason: format!(
255 "Batch size {} is invalid. Must be between 1 and {}",
256 max_messages, MAX_BATCH_SIZE
257 ),
258 }));
259 }
260
261 let db = self.db.lock().await;
262
263 Self::reclaim_expired_leases(&db)?;
265
266 let messages_tree = db.open_tree("messages").into_alien_error().context(
267 ErrorData::QueueOperationFailed {
268 operation: "open messages tree".to_string(),
269 reason: "Failed to open messages tree".to_string(),
270 },
271 )?;
272
273 let in_flight_tree = db.open_tree("in_flight").into_alien_error().context(
274 ErrorData::QueueOperationFailed {
275 operation: "open in_flight tree".to_string(),
276 reason: "Failed to open in_flight tree".to_string(),
277 },
278 )?;
279
280 let now = Utc::now();
281 let leased_until = now + chrono::Duration::seconds(LEASE_DURATION_SECS);
282 let mut result = Vec::new();
283
284 for item in messages_tree.iter() {
286 if result.len() >= max_messages {
287 break;
288 }
289
290 let (seq_key, value_bytes) =
291 item.into_alien_error()
292 .context(ErrorData::QueueOperationFailed {
293 operation: "receive".to_string(),
294 reason: "Failed to iterate messages".to_string(),
295 })?;
296
297 let stored: StoredMessage = match serde_json::from_slice(&value_bytes) {
298 Ok(m) => m,
299 Err(_) => continue, };
301
302 let receipt_handle = uuid::Uuid::new_v4().to_string();
304
305 let in_flight = InFlightMessage {
307 seq_bytes: seq_key.to_vec(),
308 message: stored.clone(),
309 leased_until,
310 };
311 let in_flight_bytes = serde_json::to_vec(&in_flight).into_alien_error().context(
312 ErrorData::QueueOperationFailed {
313 operation: "serialize in_flight".to_string(),
314 reason: "Failed to serialize in-flight message".to_string(),
315 },
316 )?;
317
318 in_flight_tree
319 .insert(receipt_handle.as_bytes(), in_flight_bytes)
320 .into_alien_error()
321 .context(ErrorData::QueueOperationFailed {
322 operation: "move to in_flight".to_string(),
323 reason: "Failed to move message to in-flight".to_string(),
324 })?;
325
326 messages_tree.remove(&seq_key).into_alien_error().context(
328 ErrorData::QueueOperationFailed {
329 operation: "remove from messages".to_string(),
330 reason: "Failed to remove message from queue".to_string(),
331 },
332 )?;
333
334 result.push(QueueMessage {
335 payload: stored.into_payload(),
336 receipt_handle,
337 });
338 }
339
340 messages_tree
342 .flush_async()
343 .await
344 .into_alien_error()
345 .context(ErrorData::QueueOperationFailed {
346 operation: "flush".to_string(),
347 reason: "Failed to flush messages tree".to_string(),
348 })?;
349 in_flight_tree
350 .flush_async()
351 .await
352 .into_alien_error()
353 .context(ErrorData::QueueOperationFailed {
354 operation: "flush".to_string(),
355 reason: "Failed to flush in_flight tree".to_string(),
356 })?;
357
358 Ok(result)
359 }
360
361 async fn ack(&self, _queue: &str, receipt_handle: &str) -> Result<()> {
362 let db = self.db.lock().await;
363 let in_flight_tree = db.open_tree("in_flight").into_alien_error().context(
364 ErrorData::QueueOperationFailed {
365 operation: "open in_flight tree".to_string(),
366 reason: "Failed to open in_flight tree".to_string(),
367 },
368 )?;
369
370 in_flight_tree
372 .remove(receipt_handle.as_bytes())
373 .into_alien_error()
374 .context(ErrorData::QueueOperationFailed {
375 operation: "ack".to_string(),
376 reason: "Failed to acknowledge message".to_string(),
377 })?;
378
379 in_flight_tree
380 .flush_async()
381 .await
382 .into_alien_error()
383 .context(ErrorData::QueueOperationFailed {
384 operation: "flush".to_string(),
385 reason: "Failed to flush acknowledgment".to_string(),
386 })?;
387
388 Ok(())
389 }
390}
391
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Renders a message payload as a string for assertions.
    fn payload_text(msg: &QueueMessage) -> String {
        match &msg.payload {
            MessagePayload::Text(s) => s.clone(),
            MessagePayload::Json(v) => v.to_string(),
        }
    }

    /// Creates a queue in a fresh temp directory. The `TempDir` is returned
    /// so the directory outlives the queue for the duration of the test.
    async fn create_test_queue() -> (LocalQueue, TempDir) {
        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
        let queue = LocalQueue::new(temp_dir.path().join("queue.db"))
            .await
            .expect("Failed to create LocalQueue");
        (queue, temp_dir)
    }

    #[tokio::test]
    async fn test_send_and_receive() {
        let (queue, _temp_dir) = create_test_queue().await;

        queue
            .send("q", MessagePayload::Text("hello".to_string()))
            .await
            .unwrap();
        queue
            .send("q", MessagePayload::Text("world".to_string()))
            .await
            .unwrap();

        let msgs = queue.receive("q", 10).await.unwrap();
        assert_eq!(msgs.len(), 2);
        assert_eq!(payload_text(&msgs[0]), "hello");
        assert_eq!(payload_text(&msgs[1]), "world");
    }

    #[tokio::test]
    async fn test_receive_empty_queue() {
        let (queue, _temp_dir) = create_test_queue().await;

        let msgs = queue.receive("q", 10).await.unwrap();
        assert!(msgs.is_empty());
    }

    #[tokio::test]
    async fn test_ack_removes_message() {
        let (queue, _temp_dir) = create_test_queue().await;

        queue
            .send("q", MessagePayload::Text("msg".to_string()))
            .await
            .unwrap();

        let msgs = queue.receive("q", 1).await.unwrap();
        assert_eq!(msgs.len(), 1);

        queue.ack("q", &msgs[0].receipt_handle).await.unwrap();

        let msgs = queue.receive("q", 10).await.unwrap();
        assert!(msgs.is_empty());
    }

    #[tokio::test]
    async fn test_ack_idempotent() {
        let (queue, _temp_dir) = create_test_queue().await;

        // Acknowledging an unknown handle must succeed without error.
        queue.ack("q", "non-existent-handle").await.unwrap();
    }

    #[tokio::test]
    async fn test_leased_message_is_not_redelivered() {
        let (queue, _temp_dir) = create_test_queue().await;

        queue
            .send("q", MessagePayload::Text("once".to_string()))
            .await
            .unwrap();

        let first = queue.receive("q", 1).await.unwrap();
        assert_eq!(first.len(), 1);

        // The lease has not expired yet, so the unacknowledged message must
        // stay hidden from a second receive.
        let second = queue.receive("q", 1).await.unwrap();
        assert!(second.is_empty());
    }

    #[tokio::test]
    async fn test_receive_respects_max_messages() {
        let (queue, _temp_dir) = create_test_queue().await;

        for i in 0..5 {
            queue
                .send("q", MessagePayload::Text(format!("msg-{}", i)))
                .await
                .unwrap();
        }

        let msgs = queue.receive("q", 2).await.unwrap();
        assert_eq!(msgs.len(), 2);
        assert_eq!(payload_text(&msgs[0]), "msg-0");
        assert_eq!(payload_text(&msgs[1]), "msg-1");
    }

    #[tokio::test]
    async fn test_json_payload() {
        let (queue, _temp_dir) = create_test_queue().await;

        let payload = serde_json::json!({"key": "value", "num": 42});
        queue
            .send("q", MessagePayload::Json(payload.clone()))
            .await
            .unwrap();

        let msgs = queue.receive("q", 1).await.unwrap();
        assert_eq!(msgs.len(), 1);
        match &msgs[0].payload {
            MessagePayload::Json(v) => assert_eq!(v, &payload),
            _ => panic!("Expected JSON payload"),
        }
    }

    #[tokio::test]
    async fn test_message_size_validation() {
        let (queue, _temp_dir) = create_test_queue().await;

        let large = "x".repeat(MAX_MESSAGE_BYTES + 1);
        let result = queue.send("q", MessagePayload::Text(large)).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_batch_size_validation() {
        let (queue, _temp_dir) = create_test_queue().await;

        assert!(queue.receive("q", 0).await.is_err());
        assert!(queue.receive("q", MAX_BATCH_SIZE + 1).await.is_err());
    }

    #[tokio::test]
    async fn test_persistence_across_reopens() {
        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
        let db_path = temp_dir.path().join("queue.db");

        {
            // First handle: write one message, then drop the queue.
            let queue = LocalQueue::new(db_path.clone()).await.unwrap();
            queue
                .send("q", MessagePayload::Text("persistent".to_string()))
                .await
                .unwrap();
        }

        {
            // Second handle on the same path must see the message.
            let queue = LocalQueue::new(db_path).await.unwrap();
            let msgs = queue.receive("q", 1).await.unwrap();
            assert_eq!(msgs.len(), 1);
            assert_eq!(payload_text(&msgs[0]), "persistent");
        }
    }

    #[tokio::test]
    async fn test_fifo_ordering() {
        let (queue, _temp_dir) = create_test_queue().await;

        for i in 0..10 {
            queue
                .send("q", MessagePayload::Text(format!("{}", i)))
                .await
                .unwrap();
        }

        let msgs = queue.receive("q", 10).await.unwrap();
        // Without this check the loop below would pass on an empty batch.
        assert_eq!(msgs.len(), 10);
        for (i, msg) in msgs.iter().enumerate() {
            assert_eq!(payload_text(msg), format!("{}", i));
        }
    }
}