Skip to main content

celerix_core/
queue.rs

1use std::future::Future;
2use std::pin::Pin;
3use worker::MessageBatch;
4
5use crate::job::{Job, JobExt};
6
7/// A single message from a Cloudflare Queue.
8pub type QueueMessage = worker::Message<String>;
9
10/// Trait for queue message handlers.
11///
12/// Implemented automatically for async functions with the signature:
13/// `async fn(MessageBatch<String>) -> worker::Result<()>`
14pub trait QueueHandler: Send + Sync + 'static {
15    fn handle(
16        &self,
17        batch: MessageBatch<String>,
18    ) -> Pin<Box<dyn Future<Output = worker::Result<()>> + Send + 'static>>;
19}
20
21/// Blanket implementation for async functions.
22impl<F, Fut> QueueHandler for F
23where
24    F: Fn(MessageBatch<String>) -> Fut + Send + Sync + 'static,
25    Fut: Future<Output = worker::Result<()>> + Send + 'static,
26{
27    fn handle(
28        &self,
29        batch: MessageBatch<String>,
30    ) -> Pin<Box<dyn Future<Output = worker::Result<()>> + Send + 'static>> {
31        Box::pin((self)(batch))
32    }
33}
34
35// ── Queue wrapper ───────────────────────────────────────────────────
36
37/// A celerix Queue handle.
38///
39/// Wraps `worker::Queue` and provides a clean `dispatch()` method for jobs.
40/// Injected automatically as an axum `Extension<Queue>` by `#[celerix::main]`.
41#[derive(Clone)]
42pub struct Queue {
43    env: worker::Env,
44}
45
46impl Queue {
47    /// Create a new Queue handle from a worker Env.
48    pub fn new(env: worker::Env) -> Self {
49        Self { env }
50    }
51
52    /// Dispatch a job to its queue.
53    ///
54    /// The queue binding is resolved automatically from the job's
55    /// `queue_binding()` (prefixed with the project name).
56    ///
57    /// Returns a `Send` future so no `#[celerix::send]` is needed on handlers.
58    pub fn dispatch<'a, T: Job + serde::Serialize + 'a>(
59        &'a self,
60        job: T,
61    ) -> crate::send::SendFuture<impl std::future::Future<Output = crate::Result<()>> + 'a> {
62        crate::send::SendFuture::new(async move { job.dispatch(&self.env).await })
63    }
64
65    /// Get a raw `worker::Queue` handle by binding name.
66    ///
67    /// The binding name is auto-prefixed with the project name.
68    pub fn raw(&self, binding: &str) -> crate::Result<worker::Queue> {
69        let resolved = crate::resolve_binding(binding);
70        self.env.queue(&resolved).map_err(crate::Error)
71    }
72
73    /// Get the underlying worker Env.
74    pub fn env(&self) -> &worker::Env {
75        &self.env
76    }
77}