apalis_core/backend/impls/json/
shared.rs

1/// Sharable JSON based backend.
2///
3/// The [`SharedJsonStore`] allows multiple task types to be stored
4/// and processed concurrently using a single JSON-based in-memory backend.
5/// It is useful for testing, prototyping,
6/// or sharing state between workers in a single process.
7///
8/// # Example
9///
10/// ```rust
11/// # use apalis_core::backend::shared::MakeShared;
12/// # use apalis_core::task::Task;
13/// # use apalis_core::worker::context::WorkerContext;
14/// # use apalis_core::worker::builder::WorkerBuilder;
15/// # use apalis_core::backend::json::SharedJsonStore;
16/// # use apalis_core::error::BoxDynError;
17/// # use std::time::Duration;
18/// # use apalis_core::backend::TaskSink;
19///
20/// #[tokio::main]
21/// async fn main() {
22///     let mut store = SharedJsonStore::new();
23///     let mut int_store = store.make_shared().unwrap();
24///     int_store.push(42).await.unwrap();
25///
26///     async fn task(
27///         task: u32,
28///         ctx: WorkerContext,
29///     ) -> Result<(), BoxDynError> {
30///         tokio::time::sleep(Duration::from_millis(2)).await;
31///         ctx.stop()?;
32///         Ok(())
33///     }
34///
35///     let int_worker = WorkerBuilder::new("int-worker")
36///         .backend(int_store)
37///         .build(task)
38///         .run();
39///
40///     int_worker.await.unwrap();
41/// }
42/// ```
43///
44/// See the tests for more advanced usage with multiple types and event listeners.
45use std::{
46    pin::Pin,
47    sync::Arc,
48    task::{Context, Poll},
49};
50
51use futures_channel::mpsc::SendError;
52use futures_core::{Stream, stream::BoxStream};
53use futures_sink::Sink;
54use futures_util::{SinkExt, StreamExt};
55use serde::{Serialize, de::DeserializeOwned};
56use serde_json::Value;
57
58use crate::{
59    backend::impls::{
60        json::{
61            JsonStorage,
62            meta::JsonMapMetadata,
63            util::{FindFirstWith, TaskKey, TaskWithMeta},
64        },
65        memory::{MemorySink, MemoryStorage},
66    },
67    task::{
68        Task,
69        status::Status,
70        task_id::{RandomId, TaskId},
71    },
72};
73
/// Typed view over the shared [`JsonStorage`]: a stream that yields only
/// tasks whose queue name equals `std::any::type_name::<T>()`.
#[derive(Debug)]
struct SharedJsonStream<T, Ctx> {
    // Handle to the shared, type-erased store; clones share the same
    // underlying task map, so inserts via one handle are visible here.
    inner: JsonStorage<Value>,
    // Zero-sized marker pinning the argument/context types this stream
    // produces; no `T` or `Ctx` value is actually stored.
    req_type: std::marker::PhantomData<(T, Ctx)>,
}
79
80impl<Args: DeserializeOwned + Unpin> Stream for SharedJsonStream<Args, JsonMapMetadata> {
81    type Item = Task<Args, JsonMapMetadata, RandomId>;
82    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
83        use crate::task::builder::TaskBuilder;
84        let map = self.inner.tasks.try_read().expect("Failed to read tasks");
85        if let Some((key, _)) = map.find_first_with(|k, _| {
86            k.queue == std::any::type_name::<Args>() && k.status == Status::Pending
87        }) {
88            let task = map.get(key).unwrap();
89            let args = match Args::deserialize(&task.args) {
90                Ok(value) => value,
91                Err(_) => return Poll::Pending,
92            };
93            let task = TaskBuilder::new(args)
94                .with_task_id(key.task_id.clone())
95                .with_ctx(task.ctx.clone())
96                .build();
97            let key = key.clone();
98            drop(map);
99            let this = &mut self.get_mut().inner;
100            this.update_status(&key, Status::Running)
101                .expect("Failed to update status");
102            this.persist_to_disk().expect("Failed to persist to disk");
103            Poll::Ready(Some(task))
104        } else {
105            Poll::Pending
106        }
107    }
108}
109/// Sharable JSON based backend.
110///
111/// # Features
112///
113/// - Concurrent processing of multiple task types
114/// - In-memory storage with optional disk persistence
115/// - Metadata support for tasks
116#[derive(Debug)]
117pub struct SharedJsonStore {
118    inner: JsonStorage<serde_json::Value>,
119}
120
121impl Default for SharedJsonStore {
122    fn default() -> Self {
123        Self::new()
124    }
125}
126
127impl SharedJsonStore {
128    /// Create a new instance of the shared JSON store.
129    #[must_use]
130    pub fn new() -> Self {
131        Self {
132            inner: JsonStorage::new_temp().unwrap(),
133        }
134    }
135}
136
137impl<Args: Send + Serialize + DeserializeOwned + Unpin + 'static>
138    crate::backend::shared::MakeShared<Args> for SharedJsonStore
139{
140    type Backend = MemoryStorage<Args, JsonMapMetadata>;
141
142    type Config = ();
143
144    type MakeError = String;
145
146    fn make_shared_with_config(
147        &mut self,
148        _: Self::Config,
149    ) -> Result<Self::Backend, Self::MakeError> {
150        let (sender, receiver) = self.inner.create_channel::<Args>();
151        Ok(MemoryStorage {
152            sender: MemorySink {
153                inner: Arc::new(futures_util::lock::Mutex::new(sender)),
154            },
155            receiver,
156        })
157    }
158}
159
/// Boxed, type-erased sink half of a shared channel: accepts typed tasks
/// and is produced by [`JsonStorage::create_channel`].
type BoxSink<Args> = Box<
    dyn Sink<Task<Args, JsonMapMetadata, RandomId>, Error = SendError>
        + Send
        + Sync
        + Unpin
        + 'static,
>;
167
168impl JsonStorage<Value> {
169    fn create_channel<Args: 'static + DeserializeOwned + Serialize + Send + Unpin>(
170        &self,
171    ) -> (
172        BoxSink<Args>,
173        BoxStream<'static, Task<Args, JsonMapMetadata, RandomId>>,
174    ) {
175        // Create a channel for communication
176        let sender = self.clone();
177
178        // Create a wrapped sender that will insert into the in-memory store
179        let wrapped_sender = {
180            let store = self.clone();
181
182            sender.with_flat_map(move |task: Task<Args, JsonMapMetadata, RandomId>| {
183                use crate::task::task_id::RandomId;
184                let task_id = task
185                    .parts
186                    .task_id
187                    .clone()
188                    .unwrap_or(TaskId::new(RandomId::default()));
189                let task = task.map(|args| serde_json::to_value(args).unwrap());
190                store
191                    .insert(
192                        &TaskKey {
193                            task_id,
194                            queue: std::any::type_name::<Args>().to_owned(),
195                            status: Status::Pending,
196                        },
197                        TaskWithMeta {
198                            args: task.args.clone(),
199                            ctx: task.parts.ctx.clone(),
200                            result: None,
201                        },
202                    )
203                    .unwrap();
204                futures_util::stream::iter(vec![Ok(task)])
205            })
206        };
207
208        // Create a stream that filters by type T
209        let filtered_stream = {
210            let inner = self.clone();
211            SharedJsonStream {
212                inner,
213                req_type: std::marker::PhantomData,
214            }
215        };
216
217        // Combine the sender and receiver
218        let sender = Box::new(wrapped_sender)
219            as Box<
220                dyn Sink<Task<Args, JsonMapMetadata, RandomId>, Error = SendError>
221                    + Send
222                    + Sync
223                    + Unpin,
224            >;
225        let receiver = filtered_stream.boxed();
226
227        (sender, receiver)
228    }
229}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::error::BoxDynError;

    use crate::worker::context::WorkerContext;
    use crate::{
        backend::{TaskSink, shared::MakeShared},
        worker::{builder::WorkerBuilder, ext::event_listener::EventListenerExt},
    };

    use super::*;

    // Number of tasks pushed to each queue in the test below.
    const ITEMS: u32 = 10;

    /// End-to-end test: a single `SharedJsonStore` backs two workers with
    /// different argument types (`String` and `u32`) running concurrently.
    /// Each handler stops its own worker once the last pushed item is seen.
    #[tokio::test]
    async fn basic_shared() {
        let mut store = SharedJsonStore::new();
        // Two typed views over the same shared store.
        let mut string_store = store.make_shared().unwrap();
        let mut int_store = store.make_shared().unwrap();
        // Interleave pushes so both queues are populated before either
        // worker starts consuming.
        for i in 0..ITEMS {
            string_store.push(format!("ITEM: {i}")).await.unwrap();
            int_store.push(i).await.unwrap();
        }

        // Integer handler: on the final item, stop the worker and surface an
        // error (`Err("...")?` converts the &str into a BoxDynError).
        async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_millis(2)).await;
            if task == ITEMS - 1 {
                ctx.stop()?;
                return Err("Worker stopped!")?;
            }
            Ok(())
        }

        let string_worker = WorkerBuilder::new("rango-tango-string")
            .backend(string_store)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {ev:?}", ctx.name());
            })
            // Inline handler: stop once the final item (ending in "9") shows up.
            .build(|req: String, ctx: WorkerContext| async move {
                tokio::time::sleep(Duration::from_millis(2)).await;
                println!("{req}");
                if req.ends_with(&(ITEMS - 1).to_string()) {
                    ctx.stop().unwrap();
                }
            })
            .run();

        let int_worker = WorkerBuilder::new("rango-tango-int")
            .backend(int_store)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {ev:?}", ctx.name());
            })
            .build(task)
            .run();

        // Run both workers to completion. The int worker deliberately ends
        // with an error, so the joined result is intentionally ignored.
        let _ = futures_util::future::join(int_worker, string_worker).await;
    }
}
289}