apalis_file_storage/
shared.rs1use std::{
46 pin::Pin,
47 sync::Arc,
48 task::{Context, Poll},
49};
50
51use futures_channel::mpsc::SendError;
52use futures_core::{Stream, stream::BoxStream};
53use futures_sink::Sink;
54use futures_util::{SinkExt, StreamExt};
55use serde::{Deserialize, Serialize, de::DeserializeOwned};
56use serde_json::Value;
57
58use apalis_core::{
59 backend::memory::{MemorySink, MemoryStorage},
60 task::{
61 Task,
62 status::Status,
63 task_id::{RandomId, TaskId},
64 },
65};
66
67use crate::{
68 JsonMapMetadata, JsonStorage,
69 util::{FindFirstWith, TaskKey, TaskWithMeta},
70};
71
72#[derive(Debug)]
73struct SharedJsonStream<T, Ctx> {
74 inner: JsonStorage<Value>,
75 req_type: std::marker::PhantomData<(T, Ctx)>,
76}
77
78impl<Args: DeserializeOwned + Unpin> Stream for SharedJsonStream<Args, JsonMapMetadata> {
79 type Item = Task<Args, JsonMapMetadata, RandomId>;
80 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
81 use apalis_core::task::builder::TaskBuilder;
82 let map = self.inner.tasks.try_read().expect("Failed to read tasks");
83 if let Some((key, _)) = map.find_first_with(|k, _| {
84 k.queue == std::any::type_name::<Args>() && k.status == Status::Pending
85 }) {
86 let task = map.get(key).unwrap();
87 let Ok(args) = Args::deserialize(&task.args) else {
88 return Poll::Pending;
89 };
90 let task = TaskBuilder::new(args)
91 .with_task_id(key.task_id.clone())
92 .with_ctx(task.ctx.clone())
93 .build();
94 let key = key.clone();
95 drop(map);
96 let this = &mut self.get_mut().inner;
97 this.update_status(&key, Status::Running)
98 .expect("Failed to update status");
99 this.persist_to_disk().expect("Failed to persist to disk");
100 Poll::Ready(Some(task))
101 } else {
102 Poll::Pending
103 }
104 }
105}
106#[derive(Debug)]
114pub struct SharedJsonStore {
115 inner: JsonStorage<serde_json::Value>,
116}
117
118impl Default for SharedJsonStore {
119 fn default() -> Self {
120 Self::new()
121 }
122}
123
124impl SharedJsonStore {
125 #[must_use]
127 pub fn new() -> Self {
128 Self {
129 inner: JsonStorage::new_temp().unwrap(),
130 }
131 }
132}
133
134impl<Args: Send + Serialize + for<'de> Deserialize<'de> + Unpin + 'static>
135 apalis_core::backend::shared::MakeShared<Args> for SharedJsonStore
136{
137 type Backend = MemoryStorage<Args, JsonMapMetadata>;
138
139 type Config = ();
140
141 type MakeError = String;
142
143 fn make_shared_with_config(
144 &mut self,
145 _: Self::Config,
146 ) -> Result<Self::Backend, Self::MakeError> {
147 let (sender, receiver) = self.inner.create_channel::<Args>();
148 let sender = MemorySink::new(Arc::new(futures_util::lock::Mutex::new(sender)));
149 Ok(MemoryStorage::new_with(sender, receiver))
150 }
151}
152
153type BoxSink<Args> = Box<
154 dyn Sink<Task<Args, JsonMapMetadata, RandomId>, Error = SendError>
155 + Send
156 + Sync
157 + Unpin
158 + 'static,
159>;
160
161impl JsonStorage<Value> {
162 fn create_channel<Args: 'static + for<'de> Deserialize<'de> + Serialize + Send + Unpin>(
163 &self,
164 ) -> (
165 BoxSink<Args>,
166 BoxStream<'static, Task<Args, JsonMapMetadata, RandomId>>,
167 ) {
168 let sender = self.clone();
170
171 let wrapped_sender = {
173 let store = self.clone();
174
175 sender.with_flat_map(move |task: Task<Args, JsonMapMetadata, RandomId>| {
176 use apalis_core::task::task_id::RandomId;
177 let task_id = task
178 .parts
179 .task_id
180 .clone()
181 .unwrap_or(TaskId::new(RandomId::default()));
182 let task = task.map(|args| serde_json::to_value(args).unwrap());
183 store
184 .insert(
185 &TaskKey {
186 task_id,
187 queue: std::any::type_name::<Args>().to_owned(),
188 status: Status::Pending,
189 },
190 TaskWithMeta {
191 args: task.args.clone(),
192 ctx: task.parts.ctx.clone(),
193 result: None,
194 },
195 )
196 .unwrap();
197 futures_util::stream::iter(vec![Ok(task)])
198 })
199 };
200
201 let filtered_stream = {
203 let inner = self.clone();
204 SharedJsonStream {
205 inner,
206 req_type: std::marker::PhantomData,
207 }
208 };
209
210 let sender = Box::new(wrapped_sender)
212 as Box<
213 dyn Sink<Task<Args, JsonMapMetadata, RandomId>, Error = SendError>
214 + Send
215 + Sync
216 + Unpin,
217 >;
218 let receiver = filtered_stream.boxed();
219
220 (sender, receiver)
221 }
222}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use apalis_core::error::BoxDynError;

    use apalis_core::worker::context::WorkerContext;
    use apalis_core::{
        backend::{TaskSink, shared::MakeShared},
        worker::{builder::WorkerBuilder, ext::event_listener::EventListenerExt},
    };

    use super::*;

    /// Number of tasks pushed to each queue.
    const ITEMS: u32 = 10;

    /// Pushes `ITEMS` strings and `ITEMS` integers through two backends made
    /// from the same store, then runs one worker per type until each has seen
    /// its last item and stopped itself.
    #[tokio::test]
    async fn basic_shared() {
        /// Integer handler: stops the worker and reports an error on the
        /// final item, succeeds on every other one.
        async fn handle_int(value: u32, worker: WorkerContext) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_millis(2)).await;
            if value != ITEMS - 1 {
                return Ok(());
            }
            worker.stop()?;
            Err("Worker stopped!".into())
        }

        let mut shared = SharedJsonStore::new();
        let mut strings = shared.make_shared().unwrap();
        let mut ints = shared.make_shared().unwrap();

        // Interleave the pushes so both queues fill up side by side.
        for n in 0..ITEMS {
            strings.push(format!("ITEM: {n}")).await.unwrap();
            ints.push(n).await.unwrap();
        }

        let run_strings = WorkerBuilder::new("rango-tango-string")
            .backend(strings)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {ev:?}", ctx.name());
            })
            .build(|req: String, ctx: WorkerContext| async move {
                tokio::time::sleep(Duration::from_millis(2)).await;
                println!("{req}");
                // The last pushed string ends with the highest index.
                let last_suffix = (ITEMS - 1).to_string();
                if req.ends_with(&last_suffix) {
                    ctx.stop().unwrap();
                }
            })
            .run();

        let run_ints = WorkerBuilder::new("rango-tango-int")
            .backend(ints)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {ev:?}", ctx.name());
            })
            .build(handle_int)
            .run();

        // The outcome of either worker is intentionally discarded.
        let _ = futures_util::future::join(run_ints, run_strings).await;
    }
}