apalis_core/backend/impls/json/shared.rs

use std::{
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use futures_channel::mpsc::SendError;
use futures_core::{Stream, stream::BoxStream};
use futures_sink::Sink;
use futures_util::{SinkExt, StreamExt};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::Value;

use crate::{
    backend::impls::{
        json::{
            JsonStorage,
            meta::JsonMapMetadata,
            util::{FindFirstWith, TaskKey, TaskWithMeta},
        },
        memory::{MemorySink, MemoryStorage},
    },
    task::{Task, status::Status, task_id::TaskId},
};

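/// Stream of tasks of type `Args` read from a shared [`JsonStorage`].
///
/// Each poll picks the first pending task in the `Args` queue, marks it as
/// running, persists the change to disk and yields the task.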
#[derive(Debug)]
struct SharedJsonStream<T, Ctx> {
    inner: JsonStorage<Value>,
    req_type: std::marker::PhantomData<(T, Ctx)>,
}

impl<Args: DeserializeOwned + Unpin> Stream for SharedJsonStream<Args, JsonMapMetadata> {
    type Item = Task<Args, JsonMapMetadata>;
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        use crate::task::builder::TaskBuilder;
        let map = self.inner.tasks.try_read().expect("Failed to read tasks");
        // Look for the first pending task that belongs to the `Args` queue.
        if let Some((key, _)) = map.find_first_with(|k, _| {
            k.queue == std::any::type_name::<Args>() && k.status == Status::Pending
        }) {
            let task = map.get(key).unwrap();
            // Deserialize the stored JSON value back into the typed arguments.
            let args = match Args::deserialize(&task.args) {
                Ok(value) => value,
                Err(_) => return Poll::Pending,
            };
            let task = TaskBuilder::new(args)
                .with_task_id(key.task_id.clone())
                .with_ctx(task.ctx.clone())
                .build();
            let key = key.clone();
            // Release the read guard before mutating the storage.
            drop(map);
            let this = &mut self.get_mut().inner;
            this.update_status(&key, Status::Running)
                .expect("Failed to update status");
            this.persist_to_disk().expect("Failed to persist to disk");
            Poll::Ready(Some(task))
        } else {
            Poll::Pending
        }
    }
}
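
/// A store backed by a single shared [`JsonStorage`] file, from which
/// multiple strongly typed backends can be created.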
#[derive(Debug)]
pub struct SharedJsonStore {
    inner: JsonStorage<serde_json::Value>,
}

impl Default for SharedJsonStore {
    fn default() -> Self {
        Self::new()
    }
}

impl SharedJsonStore {
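    /// Creates a store backed by a temporary JSON file.
    ///
    /// # Panics
    ///
    /// Panics if the temporary storage cannot be created.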
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: JsonStorage::new_temp().unwrap(),
        }
    }
}

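// Each call to `make_shared_with_config` hands out a typed `MemoryStorage`
// front end whose sink and stream both operate on the same underlying
// `JsonStorage`, so multiple task types can share one JSON file.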
impl<Args: Send + Serialize + DeserializeOwned + Unpin + 'static>
    crate::backend::shared::MakeShared<Args> for SharedJsonStore
{
    type Backend = MemoryStorage<Args, JsonMapMetadata>;

    type Config = ();

    type MakeError = String;

    fn make_shared_with_config(
        &mut self,
        _: Self::Config,
    ) -> Result<Self::Backend, Self::MakeError> {
        let (sender, receiver) = self.inner.create_channel::<Args>();
        Ok(MemoryStorage {
            sender: MemorySink {
                inner: Arc::new(futures_util::lock::Mutex::new(sender)),
            },
            receiver,
        })
    }
}

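/// Type-erased sink for pushing `Args` tasks into the shared JSON store.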
type BoxSink<Args> =
    Box<dyn Sink<Task<Args, JsonMapMetadata>, Error = SendError> + Send + Sync + Unpin + 'static>;

impl JsonStorage<Value> {
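    /// Builds a typed sink/stream pair on top of this shared JSON storage.
    ///
    /// The sink serializes incoming `Args` tasks to JSON and records them as
    /// pending; the stream yields pending tasks of the same type, marking
    /// them as running.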
    fn create_channel<Args: 'static + DeserializeOwned + Serialize + Send + Unpin>(
        &self,
    ) -> (
        BoxSink<Args>,
        BoxStream<'static, Task<Args, JsonMapMetadata>>,
    ) {
        let sender = self.clone();

        let wrapped_sender = {
            let store = self.clone();

            // Serialize each incoming task to JSON and record it as pending
            // before forwarding it to the underlying sink.
            sender.with_flat_map(move |task: Task<Args, JsonMapMetadata>| {
                use crate::task::task_id::RandomId;
                let task_id = task
                    .parts
                    .task_id
                    .clone()
                    .unwrap_or(TaskId::new(RandomId::default()));
                let task = task.map(|args| serde_json::to_value(args).unwrap());
                store
                    .insert(
                        &TaskKey {
                            task_id,
                            queue: std::any::type_name::<Args>().to_owned(),
                            status: Status::Pending,
                        },
                        TaskWithMeta {
                            args: task.args.clone(),
                            ctx: task.parts.ctx.clone(),
                            result: None,
                        },
                    )
                    .unwrap();
                futures_util::stream::iter(vec![Ok(task)])
            })
        };

        // Only tasks whose queue matches `Args` are yielded by this stream.
        let filtered_stream = {
            let inner = self.clone();
            SharedJsonStream {
                inner,
                req_type: std::marker::PhantomData,
            }
        };

        let sender = Box::new(wrapped_sender)
            as Box<dyn Sink<Task<Args, JsonMapMetadata>, Error = SendError> + Send + Sync + Unpin>;
        let receiver = filtered_stream.boxed();

        (sender, receiver)
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::error::BoxDynError;

    use crate::worker::context::WorkerContext;
    use crate::{
        backend::{TaskSink, shared::MakeShared},
        worker::{builder::WorkerBuilder, ext::event_listener::EventListenerExt},
    };

    use super::*;

    const ITEMS: u32 = 10;

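    // Pushes `ITEMS` tasks of two different types into one shared store and
    // runs a worker per type; each worker stops itself after the last item.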
    #[tokio::test]
    async fn basic_shared() {
        let mut store = SharedJsonStore::new();
        let mut string_store = store.make_shared().unwrap();
        let mut int_store = store.make_shared().unwrap();
        for i in 0..ITEMS {
            string_store.push(format!("ITEM: {i}")).await.unwrap();
            int_store.push(i).await.unwrap();
        }

        async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_millis(2)).await;
            if task == ITEMS - 1 {
                ctx.stop()?;
                return Err("Worker stopped!")?;
            }
            Ok(())
        }

        let string_worker = WorkerBuilder::new("rango-tango-string")
            .backend(string_store)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {ev:?}", ctx.name());
            })
            .build(|req: String, ctx: WorkerContext| async move {
                tokio::time::sleep(Duration::from_millis(2)).await;
                println!("{req}");
                if req.ends_with(&(ITEMS - 1).to_string()) {
                    ctx.stop().unwrap();
                }
            })
            .run();

        let int_worker = WorkerBuilder::new("rango-tango-int")
            .backend(int_store)
            .on_event(|ctx, ev| {
                println!("CTX {:?}, On Event = {ev:?}", ctx.name());
            })
            .build(task)
            .run();

        let _ = futures_util::future::join(int_worker, string_worker).await;
    }
}