apalis_core/backend/impls/json/
shared.rs1use std::{
46 pin::Pin,
47 sync::Arc,
48 task::{Context, Poll},
49};
50
51use futures_channel::mpsc::SendError;
52use futures_core::Stream;
53use futures_sink::Sink;
54use futures_util::{SinkExt, StreamExt};
55use serde::{Serialize, de::DeserializeOwned};
56use serde_json::Value;
57
58use crate::{
59 backend::impls::{
60 json::{
61 JsonStorage,
62 meta::JsonMapMetadata,
63 util::{FindFirstWith, TaskKey, TaskWithMeta},
64 },
65 memory::{MemorySink, MemoryStorage},
66 },
67 task::{Task, status::Status, task_id::TaskId},
68};
69
70#[derive(Debug)]
71struct SharedJsonStream<T, Ctx> {
72 inner: JsonStorage<Value>,
73 req_type: std::marker::PhantomData<(T, Ctx)>,
74}
75
76impl<Args: DeserializeOwned + Unpin> Stream for SharedJsonStream<Args, JsonMapMetadata> {
77 type Item = Task<Args, JsonMapMetadata>;
78 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79 use crate::task::builder::TaskBuilder;
80 let map = self.inner.tasks.try_read().expect("Failed to read tasks");
81 if let Some((key, _)) = map.find_first_with(|k, _| {
82 &k.queue == std::any::type_name::<Args>() && k.status == Status::Pending
83 }) {
84 let task = map.get(&key).unwrap();
85 let args = match Args::deserialize(&task.args) {
86 Ok(value) => value,
87 Err(_) => return Poll::Pending,
88 };
89 let task = TaskBuilder::new(args)
90 .with_task_id(key.task_id.clone())
91 .with_ctx(task.ctx.clone())
92 .with_queue(&key.queue)
93 .build();
94 let key = key.clone();
95 drop(map);
96 let this = &mut self.get_mut().inner;
97 this.update_status(&key, Status::Running)
98 .expect("Failed to update status");
99 this.persist_to_disk().expect("Failed to persist to disk");
100 Poll::Ready(Some(task))
101 } else {
102 Poll::Pending
103 }
104 }
105}
106#[derive(Debug)]
114pub struct SharedJsonStore {
115 inner: JsonStorage<serde_json::Value>,
116}
117
118impl SharedJsonStore {
119 pub fn new() -> Self {
121 Self {
122 inner: JsonStorage::new_temp().unwrap(),
123 }
124 }
125}
126
127impl<Args: Send + Serialize + DeserializeOwned + Unpin + 'static>
128 crate::backend::shared::MakeShared<Args> for SharedJsonStore
129{
130 type Backend = MemoryStorage<Args, JsonMapMetadata>;
131
132 type Config = ();
133
134 type MakeError = String;
135
136 fn make_shared_with_config(
137 &mut self,
138 _: Self::Config,
139 ) -> Result<Self::Backend, Self::MakeError> {
140 let (sender, receiver) = self.inner.create_channel::<Args>();
141 Ok(MemoryStorage {
142 sender: MemorySink {
143 inner: Arc::new(futures_util::lock::Mutex::new(sender)),
144 },
145 receiver,
146 })
147 }
148}
149
150impl JsonStorage<Value> {
151 fn create_channel<Args: 'static + DeserializeOwned + Serialize + Send + Unpin>(
152 &self,
153 ) -> (
154 Box<
155 dyn Sink<Task<Args, JsonMapMetadata>, Error = SendError>
156 + Send
157 + Sync
158 + Unpin
159 + 'static,
160 >,
161 Pin<Box<dyn Stream<Item = Task<Args, JsonMapMetadata>> + Send>>,
162 ) {
163 let sender = self.clone();
165
166 let wrapped_sender = {
168 let mut store = self.clone();
169
170 sender.with_flat_map(move |task: Task<Args, JsonMapMetadata>| {
171 use crate::task::task_id::RandomId;
172 let task_id = task
173 .parts
174 .task_id
175 .clone()
176 .unwrap_or(TaskId::new(RandomId::default()));
177 let task = task.map(|args| serde_json::to_value(args).unwrap());
178 store
179 .insert(
180 &TaskKey {
181 task_id,
182 queue: std::any::type_name::<Args>().to_owned(),
183 status: Status::Pending,
184 },
185 TaskWithMeta {
186 args: task.args.clone(),
187 ctx: task.parts.ctx.clone(),
188 result: None,
189 },
190 )
191 .unwrap();
192 futures_util::stream::iter(vec![Ok(task)])
193 })
194 };
195
196 let filtered_stream = {
198 let inner = self.clone();
199 SharedJsonStream {
200 inner,
201 req_type: std::marker::PhantomData,
202 }
203 };
204
205 let sender = Box::new(wrapped_sender)
207 as Box<dyn Sink<Task<Args, JsonMapMetadata>, Error = SendError> + Send + Sync + Unpin>;
208 let receiver = filtered_stream.boxed();
209
210 (sender, receiver)
211 }
212}
213#[cfg(test)]
214mod tests {
215 use std::time::Duration;
216
217 use crate::error::BoxDynError;
218
219 use crate::worker::context::WorkerContext;
220 use crate::{
221 backend::{TaskSink, shared::MakeShared},
222 worker::{builder::WorkerBuilder, ext::event_listener::EventListenerExt},
223 };
224
225 use super::*;
226
227 const ITEMS: u32 = 10;
228
229 #[tokio::test]
230 async fn basic_shared() {
231 let mut store = SharedJsonStore::new();
232 let mut string_store = store.make_shared().unwrap();
233 let mut int_store = store.make_shared().unwrap();
234 for i in 0..ITEMS {
235 string_store.push(format!("ITEM: {i}")).await.unwrap();
236 int_store.push(i).await.unwrap();
237 }
238
239 async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
240 tokio::time::sleep(Duration::from_millis(2)).await;
241 if task == ITEMS - 1 {
242 ctx.stop()?;
243 return Err("Worker stopped!")?;
244 }
245 Ok(())
246 }
247
248 let string_worker = WorkerBuilder::new("rango-tango-string")
249 .backend(string_store)
250 .on_event(|ctx, ev| {
251 println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
252 })
253 .build(|req: String, ctx: WorkerContext| async move {
254 tokio::time::sleep(Duration::from_millis(2)).await;
255 println!("{req}");
256 if req.ends_with(&(ITEMS - 1).to_string()) {
257 ctx.stop().unwrap();
258 }
259 })
260 .run();
261
262 let int_worker = WorkerBuilder::new("rango-tango-int")
263 .backend(int_store)
264 .on_event(|ctx, ev| {
265 println!("CTX {:?}, On Event = {:?}", ctx.name(), ev);
266 })
267 .build(task)
268 .run();
269
270 let _ = futures_util::future::join(int_worker, string_worker).await;
271 }
272}