apalis_file_storage/
shared.rs

//! Shareable JSON-based backend.
//!
//! The [`SharedJsonStore`] allows multiple task types to be stored
//! and processed concurrently using a single JSON-based in-memory backend.
//! It is useful for testing, prototyping,
//! or sharing state between workers in a single process.
//!
//! # Example
//!
//! ```rust,no_run
//! # use apalis_core::backend::shared::MakeShared;
//! # use apalis_core::task::Task;
//! # use apalis_core::worker::context::WorkerContext;
//! # use apalis_core::worker::builder::WorkerBuilder;
//! # use apalis_file_storage::SharedJsonStore;
//! # use apalis_core::error::BoxDynError;
//! # use std::time::Duration;
//! # use apalis_core::backend::TaskSink;
//!
//! #[tokio::main]
//! async fn main() {
//!     let mut store = SharedJsonStore::new();
//!     let mut int_store = store.make_shared().unwrap();
//!     int_store.push(42).await.unwrap();
//!
//!     async fn task(
//!         task: u32,
//!         ctx: WorkerContext,
//!     ) -> Result<(), BoxDynError> {
//!         tokio::time::sleep(Duration::from_millis(2)).await;
//!         ctx.stop()?;
//!         Ok(())
//!     }
//!
//!     let int_worker = WorkerBuilder::new("int-worker")
//!         .backend(int_store)
//!         .build(task)
//!         .run();
//!
//!     int_worker.await.unwrap();
//! }
//! ```
//!
//! See the tests for more advanced usage with multiple types and event listeners.
use std::{
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use futures_channel::mpsc::SendError;
use futures_core::{Stream, stream::BoxStream};
use futures_sink::Sink;
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use serde_json::Value;

use apalis_core::{
    backend::memory::{MemorySink, MemoryStorage},
    task::{
        Task,
        status::Status,
        task_id::{RandomId, TaskId},
    },
};

use crate::{
    JsonMapMetadata, JsonStorage,
    util::{FindFirstWith, TaskKey, TaskWithMeta},
};

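/// A type-filtered stream over the shared JSON map.
///
/// Yields tasks whose queue key matches `std::any::type_name::<T>()`, claiming each
/// pending task by flipping its status to `Status::Running` before handing it out.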
#[derive(Debug)]
struct SharedJsonStream<T, Ctx> {
    inner: JsonStorage<Value>,
    req_type: std::marker::PhantomData<(T, Ctx)>,
}

impl<Args: DeserializeOwned + Unpin> Stream for SharedJsonStream<Args, JsonMapMetadata> {
    type Item = Task<Args, JsonMapMetadata, RandomId>;
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        use apalis_core::task::builder::TaskBuilder;
        let map = self.inner.tasks.try_read().expect("Failed to read tasks");
        // Find the first pending task queued under this argument type.
        if let Some((key, _)) = map.find_first_with(|k, _| {
            k.queue == std::any::type_name::<Args>() && k.status == Status::Pending
        }) {
            let task = map.get(key).unwrap();
            let Ok(args) = Args::deserialize(&task.args) else {
                return Poll::Pending;
            };
            let task = TaskBuilder::new(args)
                .with_task_id(key.task_id.clone())
                .with_ctx(task.ctx.clone())
                .build();
            let key = key.clone();
            // Release the read lock before claiming the task.
            drop(map);
            // Mark the task as running and persist the change before yielding it.
            let this = &mut self.get_mut().inner;
            this.update_status(&key, Status::Running)
                .expect("Failed to update status");
            this.persist_to_disk().expect("Failed to persist to disk");
            Poll::Ready(Some(task))
        } else {
            // No pending task of this type yet.
            Poll::Pending
        }
    }
}

/// Shareable JSON-based backend.
///
/// # Features
///
/// - Concurrent processing of multiple task types
/// - In-memory storage with optional disk persistence
/// - Metadata support for tasks
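///
/// # Example
///
/// A minimal sketch of sharing one store across two task types,
/// mirroring the module-level example above:
///
/// ```rust,no_run
/// # use apalis_core::backend::{TaskSink, shared::MakeShared};
/// # use apalis_file_storage::SharedJsonStore;
/// # async fn example() {
/// let mut store = SharedJsonStore::new();
/// let mut string_store = store.make_shared().unwrap();
/// let mut int_store = store.make_shared().unwrap();
/// string_store.push("hello".to_string()).await.unwrap();
/// int_store.push(42_u32).await.unwrap();
/// # }
/// ```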
#[derive(Debug)]
pub struct SharedJsonStore {
    inner: JsonStorage<serde_json::Value>,
}

impl Default for SharedJsonStore {
    fn default() -> Self {
        Self::new()
    }
}

impl SharedJsonStore {
    /// Create a new instance of the shared JSON store.
    ///
    /// # Panics
    ///
    /// Panics if the temporary backing storage cannot be created.
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: JsonStorage::new_temp().unwrap(),
        }
    }
}

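// Hand out a typed `MemoryStorage` view over the shared store: pushing through the
// returned backend writes into the shared JSON map, and the worker consumes the
// type-filtered stream produced by `create_channel` below.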
impl<Args: Send + Serialize + for<'de> Deserialize<'de> + Unpin + 'static>
    apalis_core::backend::shared::MakeShared<Args> for SharedJsonStore
{
    type Backend = MemoryStorage<Args, JsonMapMetadata>;

    type Config = ();

    type MakeError = String;

    fn make_shared_with_config(
        &mut self,
        _: Self::Config,
    ) -> Result<Self::Backend, Self::MakeError> {
        let (sender, receiver) = self.inner.create_channel::<Args>();
        let sender = MemorySink::new(Arc::new(futures_util::lock::Mutex::new(sender)));
        Ok(MemoryStorage::new_with(sender, receiver))
    }
}

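/// Boxed, object-safe sink half of a typed channel, as handed to `MemorySink`
/// by `make_shared_with_config`.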
type BoxSink<Args> = Box<
    dyn Sink<Task<Args, JsonMapMetadata, RandomId>, Error = SendError>
        + Send
        + Sync
        + Unpin
        + 'static,
>;

impl JsonStorage<Value> {
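    /// Build a typed sink/stream pair over this shared JSON map.
    ///
    /// The sink serializes each task's arguments to a `Value` and inserts it under a
    /// key derived from `std::any::type_name::<Args>()`; the stream is a
    /// `SharedJsonStream` that only yields pending tasks queued under that same type name.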
    fn create_channel<Args: 'static + for<'de> Deserialize<'de> + Serialize + Send + Unpin>(
        &self,
    ) -> (
        BoxSink<Args>,
        BoxStream<'static, Task<Args, JsonMapMetadata, RandomId>>,
    ) {
        // The storage itself acts as the underlying sink.
        let sender = self.clone();

        // Adapt the typed sink: serialize the arguments, record the task in the
        // shared map under this type's queue, then forward the JSON task onwards.
        let wrapped_sender = {
            let store = self.clone();

            sender.with_flat_map(move |task: Task<Args, JsonMapMetadata, RandomId>| {
                let task_id = task
                    .parts
                    .task_id
                    .clone()
                    .unwrap_or(TaskId::new(RandomId::default()));
                let task = task.map(|args| serde_json::to_value(args).unwrap());
                store
                    .insert(
                        &TaskKey {
                            task_id,
                            queue: std::any::type_name::<Args>().to_owned(),
                            status: Status::Pending,
                        },
                        TaskWithMeta {
                            args: task.args.clone(),
                            ctx: task.parts.ctx.clone(),
                            result: None,
                        },
                    )
                    .unwrap();
                futures_util::stream::iter(vec![Ok(task)])
            })
        };

        // Create a stream that yields only tasks of the argument type `Args`.
        let filtered_stream = {
            let inner = self.clone();
            SharedJsonStream {
                inner,
                req_type: std::marker::PhantomData,
            }
        };

        // Box both halves so callers get object-safe sink/stream handles.
        let sender = Box::new(wrapped_sender)
            as Box<
                dyn Sink<Task<Args, JsonMapMetadata, RandomId>, Error = SendError>
                    + Send
                    + Sync
                    + Unpin,
            >;
        let receiver = filtered_stream.boxed();

        (sender, receiver)
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use apalis_core::error::BoxDynError;

    use apalis_core::worker::context::WorkerContext;
    use apalis_core::{
        backend::{TaskSink, shared::MakeShared},
        worker::{builder::WorkerBuilder, ext::event_listener::EventListenerExt},
    };

    use super::*;

    const ITEMS: u32 = 10;

    #[tokio::test]
    async fn basic_shared() {
        let mut store = SharedJsonStore::new();
        let mut string_store = store.make_shared().unwrap();
        let mut int_store = store.make_shared().unwrap();
        for i in 0..ITEMS {
            string_store.push(format!("ITEM: {i}")).await.unwrap();
            int_store.push(i).await.unwrap();
        }

        async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_millis(2)).await;
            if task == ITEMS - 1 {
                ctx.stop()?;
                return Err("Worker stopped!")?;
            }
            Ok(())
        }

        let string_worker = WorkerBuilder::new("rango-tango-string")
            .backend(string_store)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {ev:?}", ctx.name());
            })
            .build(|req: String, ctx: WorkerContext| async move {
                tokio::time::sleep(Duration::from_millis(2)).await;
                println!("{req}");
                if req.ends_with(&(ITEMS - 1).to_string()) {
                    ctx.stop().unwrap();
                }
            })
            .run();

        let int_worker = WorkerBuilder::new("rango-tango-int")
            .backend(int_store)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {ev:?}", ctx.name());
            })
            .build(task)
            .run();

        let _ = futures_util::future::join(int_worker, string_worker).await;
    }
}