apalis_core/backend/impls/json/
shared.rs1use std::{
46 pin::Pin,
47 sync::Arc,
48 task::{Context, Poll},
49};
50
51use futures_channel::mpsc::SendError;
52use futures_core::{Stream, stream::BoxStream};
53use futures_sink::Sink;
54use futures_util::{SinkExt, StreamExt};
55use serde::{Serialize, de::DeserializeOwned};
56use serde_json::Value;
57
58use crate::{
59 backend::impls::{
60 json::{
61 JsonStorage,
62 meta::JsonMapMetadata,
63 util::{FindFirstWith, TaskKey, TaskWithMeta},
64 },
65 memory::{MemorySink, MemoryStorage},
66 },
67 task::{
68 Task,
69 status::Status,
70 task_id::{RandomId, TaskId},
71 },
72};
73
/// Typed view over a shared [`JsonStorage`] that is polled as a stream of tasks.
///
/// `T` is the task argument type and `Ctx` the task context type; neither is
/// stored, they only select which `Stream` implementation applies.
#[derive(Debug)]
struct SharedJsonStream<T, Ctx> {
    /// Handle to the JSON storage that tasks are read from.
    inner: JsonStorage<Value>,
    /// Marker tying this stream to its argument and context types.
    req_type: std::marker::PhantomData<(T, Ctx)>,
}
79
80impl<Args: DeserializeOwned + Unpin> Stream for SharedJsonStream<Args, JsonMapMetadata> {
81 type Item = Task<Args, JsonMapMetadata, RandomId>;
82 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
83 use crate::task::builder::TaskBuilder;
84 let map = self.inner.tasks.try_read().expect("Failed to read tasks");
85 if let Some((key, _)) = map.find_first_with(|k, _| {
86 k.queue == std::any::type_name::<Args>() && k.status == Status::Pending
87 }) {
88 let task = map.get(key).unwrap();
89 let args = match Args::deserialize(&task.args) {
90 Ok(value) => value,
91 Err(_) => return Poll::Pending,
92 };
93 let task = TaskBuilder::new(args)
94 .with_task_id(key.task_id.clone())
95 .with_ctx(task.ctx.clone())
96 .build();
97 let key = key.clone();
98 drop(map);
99 let this = &mut self.get_mut().inner;
100 this.update_status(&key, Status::Running)
101 .expect("Failed to update status");
102 this.persist_to_disk().expect("Failed to persist to disk");
103 Poll::Ready(Some(task))
104 } else {
105 Poll::Pending
106 }
107 }
108}
/// Store wrapping a single [`JsonStorage`] from which typed backends are created.
///
/// Its `MakeShared` implementation builds one `MemoryStorage` backend per call
/// from a channel created on the wrapped storage.
#[derive(Debug)]
pub struct SharedJsonStore {
    /// The JSON storage that every created backend is derived from.
    inner: JsonStorage<serde_json::Value>,
}
120
121impl Default for SharedJsonStore {
122 fn default() -> Self {
123 Self::new()
124 }
125}
126
127impl SharedJsonStore {
128 #[must_use]
130 pub fn new() -> Self {
131 Self {
132 inner: JsonStorage::new_temp().unwrap(),
133 }
134 }
135}
136
137impl<Args: Send + Serialize + DeserializeOwned + Unpin + 'static>
138 crate::backend::shared::MakeShared<Args> for SharedJsonStore
139{
140 type Backend = MemoryStorage<Args, JsonMapMetadata>;
141
142 type Config = ();
143
144 type MakeError = String;
145
146 fn make_shared_with_config(
147 &mut self,
148 _: Self::Config,
149 ) -> Result<Self::Backend, Self::MakeError> {
150 let (sender, receiver) = self.inner.create_channel::<Args>();
151 Ok(MemoryStorage {
152 sender: MemorySink {
153 inner: Arc::new(futures_util::lock::Mutex::new(sender)),
154 },
155 receiver,
156 })
157 }
158}
159
/// Boxed, type-erased sink that accepts `Task<Args, JsonMapMetadata, RandomId>`
/// values and reports failures as [`SendError`].
type BoxSink<Args> = Box<
    dyn Sink<Task<Args, JsonMapMetadata, RandomId>, Error = SendError>
        + Send
        + Sync
        + Unpin
        + 'static,
>;
167
168impl JsonStorage<Value> {
169 fn create_channel<Args: 'static + DeserializeOwned + Serialize + Send + Unpin>(
170 &self,
171 ) -> (
172 BoxSink<Args>,
173 BoxStream<'static, Task<Args, JsonMapMetadata, RandomId>>,
174 ) {
175 let sender = self.clone();
177
178 let wrapped_sender = {
180 let store = self.clone();
181
182 sender.with_flat_map(move |task: Task<Args, JsonMapMetadata, RandomId>| {
183 use crate::task::task_id::RandomId;
184 let task_id = task
185 .parts
186 .task_id
187 .clone()
188 .unwrap_or(TaskId::new(RandomId::default()));
189 let task = task.map(|args| serde_json::to_value(args).unwrap());
190 store
191 .insert(
192 &TaskKey {
193 task_id,
194 queue: std::any::type_name::<Args>().to_owned(),
195 status: Status::Pending,
196 },
197 TaskWithMeta {
198 args: task.args.clone(),
199 ctx: task.parts.ctx.clone(),
200 result: None,
201 },
202 )
203 .unwrap();
204 futures_util::stream::iter(vec![Ok(task)])
205 })
206 };
207
208 let filtered_stream = {
210 let inner = self.clone();
211 SharedJsonStream {
212 inner,
213 req_type: std::marker::PhantomData,
214 }
215 };
216
217 let sender = Box::new(wrapped_sender)
219 as Box<
220 dyn Sink<Task<Args, JsonMapMetadata, RandomId>, Error = SendError>
221 + Send
222 + Sync
223 + Unpin,
224 >;
225 let receiver = filtered_stream.boxed();
226
227 (sender, receiver)
228 }
229}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::error::BoxDynError;

    use crate::worker::context::WorkerContext;
    use crate::{
        backend::{TaskSink, shared::MakeShared},
        worker::{builder::WorkerBuilder, ext::event_listener::EventListenerExt},
    };

    use super::*;

    /// Number of items pushed to each backend.
    const ITEMS: u32 = 10;

    /// Pushes `ITEMS` strings and `ITEMS` integers through two backends made from
    /// one `SharedJsonStore`, then runs one worker per backend; each worker asks
    /// to stop once it handles its last item.
    #[tokio::test]
    async fn basic_shared() {
        /// Integer handler: stops the worker and reports an error on the last item.
        async fn handle_int(item: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_millis(2)).await;
            if item != ITEMS - 1 {
                return Ok(());
            }
            ctx.stop()?;
            Err("Worker stopped!".into())
        }

        let mut store = SharedJsonStore::new();
        let mut string_backend = store.make_shared().unwrap();
        let mut int_backend = store.make_shared().unwrap();
        for i in 0..ITEMS {
            string_backend.push(format!("ITEM: {i}")).await.unwrap();
            int_backend.push(i).await.unwrap();
        }

        let string_run = WorkerBuilder::new("rango-tango-string")
            .backend(string_backend)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {ev:?}", ctx.name());
            })
            .build(|req: String, ctx: WorkerContext| async move {
                tokio::time::sleep(Duration::from_millis(2)).await;
                println!("{req}");
                let last_suffix = (ITEMS - 1).to_string();
                if req.ends_with(&last_suffix) {
                    ctx.stop().unwrap();
                }
            })
            .run();

        let int_run = WorkerBuilder::new("rango-tango-int")
            .backend(int_backend)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {ev:?}", ctx.name());
            })
            .build(handle_int)
            .run();

        // Both worker outcomes are intentionally ignored.
        let _ = futures_util::future::join(int_run, string_run).await;
    }
}