apalis_core/backend/impls/json/
shared.rs

1/// Sharable JSON based backend.
2///
3/// The [`SharedJsonStore`] allows multiple task types to be stored
4/// and processed concurrently using a single JSON-based in-memory backend.
5/// It is useful for testing, prototyping,
6/// or sharing state between workers in a single process.
7///
8/// # Example
9///
10/// ```rust
11/// # use apalis_core::backend::shared::MakeShared;
12/// # use apalis_core::task::Task;
13/// # use apalis_core::worker::context::WorkerContext;
14/// # use apalis_core::worker::builder::WorkerBuilder;
15/// # use apalis_core::backend::json::SharedJsonStore;
16/// # use apalis_core::error::BoxDynError;
17/// # use std::time::Duration;
18/// # use apalis_core::backend::TaskSink;
19///
20/// #[tokio::main]
21/// async fn main() {
22///     let mut store = SharedJsonStore::new();
23///     let mut int_store = store.make_shared().unwrap();
24///     int_store.push(42).await.unwrap();
25///
26///     async fn task(
27///         task: u32,
28///         ctx: WorkerContext,
29///     ) -> Result<(), BoxDynError> {
30///         tokio::time::sleep(Duration::from_millis(2)).await;
31///         ctx.stop()?;
32///         Ok(())
33///     }
34///
35///     let int_worker = WorkerBuilder::new("int-worker")
36///         .backend(int_store)
37///         .build(task)
38///         .run();
39///
40///     int_worker.await.unwrap();
41/// }
42/// ```
43///
44/// See the tests for more advanced usage with multiple types and event listeners.
45use std::{
46    pin::Pin,
47    sync::Arc,
48    task::{Context, Poll},
49};
50
51use futures_channel::mpsc::SendError;
52use futures_core::{Stream, stream::BoxStream};
53use futures_sink::Sink;
54use futures_util::{SinkExt, StreamExt};
55use serde::{Serialize, de::DeserializeOwned};
56use serde_json::Value;
57
58use crate::{
59    backend::impls::{
60        json::{
61            JsonStorage,
62            meta::JsonMapMetadata,
63            util::{FindFirstWith, TaskKey, TaskWithMeta},
64        },
65        memory::{MemorySink, MemoryStorage},
66    },
67    task::{Task, status::Status, task_id::TaskId},
68};
69
70#[derive(Debug)]
71struct SharedJsonStream<T, Ctx> {
72    inner: JsonStorage<Value>,
73    req_type: std::marker::PhantomData<(T, Ctx)>,
74}
75
76impl<Args: DeserializeOwned + Unpin> Stream for SharedJsonStream<Args, JsonMapMetadata> {
77    type Item = Task<Args, JsonMapMetadata>;
78    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79        use crate::task::builder::TaskBuilder;
80        let map = self.inner.tasks.try_read().expect("Failed to read tasks");
81        if let Some((key, _)) = map.find_first_with(|k, _| {
82            k.queue == std::any::type_name::<Args>() && k.status == Status::Pending
83        }) {
84            let task = map.get(key).unwrap();
85            let args = match Args::deserialize(&task.args) {
86                Ok(value) => value,
87                Err(_) => return Poll::Pending,
88            };
89            let task = TaskBuilder::new(args)
90                .with_task_id(key.task_id.clone())
91                .with_ctx(task.ctx.clone())
92                .build();
93            let key = key.clone();
94            drop(map);
95            let this = &mut self.get_mut().inner;
96            this.update_status(&key, Status::Running)
97                .expect("Failed to update status");
98            this.persist_to_disk().expect("Failed to persist to disk");
99            Poll::Ready(Some(task))
100        } else {
101            Poll::Pending
102        }
103    }
104}
105/// Sharable JSON based backend.
106///
107/// # Features
108///
109/// - Concurrent processing of multiple task types
110/// - In-memory storage with optional disk persistence
111/// - Metadata support for tasks
112#[derive(Debug)]
113pub struct SharedJsonStore {
114    inner: JsonStorage<serde_json::Value>,
115}
116
117impl Default for SharedJsonStore {
118    fn default() -> Self {
119        Self::new()
120    }
121}
122
123impl SharedJsonStore {
124    /// Create a new instance of the shared JSON store.
125    #[must_use]
126    pub fn new() -> Self {
127        Self {
128            inner: JsonStorage::new_temp().unwrap(),
129        }
130    }
131}
132
133impl<Args: Send + Serialize + DeserializeOwned + Unpin + 'static>
134    crate::backend::shared::MakeShared<Args> for SharedJsonStore
135{
136    type Backend = MemoryStorage<Args, JsonMapMetadata>;
137
138    type Config = ();
139
140    type MakeError = String;
141
142    fn make_shared_with_config(
143        &mut self,
144        _: Self::Config,
145    ) -> Result<Self::Backend, Self::MakeError> {
146        let (sender, receiver) = self.inner.create_channel::<Args>();
147        Ok(MemoryStorage {
148            sender: MemorySink {
149                inner: Arc::new(futures_util::lock::Mutex::new(sender)),
150            },
151            receiver,
152        })
153    }
154}
155
156type BoxSink<Args> =
157    Box<dyn Sink<Task<Args, JsonMapMetadata>, Error = SendError> + Send + Sync + Unpin + 'static>;
158
159impl JsonStorage<Value> {
160    fn create_channel<Args: 'static + DeserializeOwned + Serialize + Send + Unpin>(
161        &self,
162    ) -> (
163        BoxSink<Args>,
164        BoxStream<'static, Task<Args, JsonMapMetadata>>,
165    ) {
166        // Create a channel for communication
167        let sender = self.clone();
168
169        // Create a wrapped sender that will insert into the in-memory store
170        let wrapped_sender = {
171            let store = self.clone();
172
173            sender.with_flat_map(move |task: Task<Args, JsonMapMetadata>| {
174                use crate::task::task_id::RandomId;
175                let task_id = task
176                    .parts
177                    .task_id
178                    .clone()
179                    .unwrap_or(TaskId::new(RandomId::default()));
180                let task = task.map(|args| serde_json::to_value(args).unwrap());
181                store
182                    .insert(
183                        &TaskKey {
184                            task_id,
185                            queue: std::any::type_name::<Args>().to_owned(),
186                            status: Status::Pending,
187                        },
188                        TaskWithMeta {
189                            args: task.args.clone(),
190                            ctx: task.parts.ctx.clone(),
191                            result: None,
192                        },
193                    )
194                    .unwrap();
195                futures_util::stream::iter(vec![Ok(task)])
196            })
197        };
198
199        // Create a stream that filters by type T
200        let filtered_stream = {
201            let inner = self.clone();
202            SharedJsonStream {
203                inner,
204                req_type: std::marker::PhantomData,
205            }
206        };
207
208        // Combine the sender and receiver
209        let sender = Box::new(wrapped_sender)
210            as Box<dyn Sink<Task<Args, JsonMapMetadata>, Error = SendError> + Send + Sync + Unpin>;
211        let receiver = filtered_stream.boxed();
212
213        (sender, receiver)
214    }
215}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::{
        backend::{TaskSink, shared::MakeShared},
        error::BoxDynError,
        worker::{
            builder::WorkerBuilder, context::WorkerContext, ext::event_listener::EventListenerExt,
        },
    };

    /// Number of tasks pushed to each typed backend.
    const ITEMS: u32 = 10;

    /// Two typed backends made from one store each feed their own worker;
    /// both workers stop themselves when they see the last item.
    #[tokio::test]
    async fn basic_shared() {
        /// Handler for the `u32` backend: stops the worker and reports an
        /// error once the last item is reached.
        async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_millis(2)).await;
            if task != ITEMS - 1 {
                return Ok(());
            }
            ctx.stop()?;
            Err("Worker stopped!".into())
        }

        let mut store = SharedJsonStore::new();
        let mut string_store = store.make_shared().unwrap();
        let mut int_store = store.make_shared().unwrap();

        // Fill both backends before any worker starts.
        for i in 0..ITEMS {
            string_store.push(format!("ITEM: {i}")).await.unwrap();
            int_store.push(i).await.unwrap();
        }

        let string_worker = WorkerBuilder::new("rango-tango-string")
            .backend(string_store)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {ev:?}", ctx.name());
            })
            .build(|req: String, ctx: WorkerContext| async move {
                tokio::time::sleep(Duration::from_millis(2)).await;
                println!("{req}");
                let last_suffix = (ITEMS - 1).to_string();
                if req.ends_with(&last_suffix) {
                    ctx.stop().unwrap();
                }
            })
            .run();

        let int_worker = WorkerBuilder::new("rango-tango-int")
            .backend(int_store)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {ev:?}", ctx.name());
            })
            .build(task)
            .run();

        // The outcomes of both workers are deliberately discarded.
        let _ = futures_util::future::join(int_worker, string_worker).await;
    }
}