apalis_core/backend/impls/json/
backend.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures_channel::mpsc::SendError;
7use futures_core::{stream::BoxStream, Stream};
8use futures_util::{stream, StreamExt};
9use serde::de::DeserializeOwned;
10use serde_json::Value;
11
12use crate::{
13 backend::{
14 codec::json::JsonCodec,
15 impls::json::{
16 meta::JsonMapMetadata,
17 util::{FindFirstWith, JsonAck},
18 JsonStorage,
19 },
20 Backend, TaskStream,
21 },
22 task::{status::Status, task_id::RandomId, Task},
23 worker::{context::WorkerContext, ext::ack::AcknowledgeLayer},
24};
25
26impl<Args: 'static + Send + DeserializeOwned + Unpin> Backend<Args> for JsonStorage<Args> {
27 type IdType = RandomId;
28 type Error = SendError;
29 type Context = JsonMapMetadata;
30 type Stream = TaskStream<Task<Args, JsonMapMetadata>, SendError>;
31 type Layer = AcknowledgeLayer<JsonAck<Args>>;
32 type Beat = BoxStream<'static, Result<(), Self::Error>>;
33
34 type Codec = JsonCodec<Value>;
35
36 fn heartbeat(&self, _: &WorkerContext) -> Self::Beat {
37 stream::once(async { Ok(()) }).boxed()
38 }
39 fn middleware(&self) -> Self::Layer {
40 AcknowledgeLayer::new(JsonAck {
41 inner: self.clone(),
42 })
43 }
44 fn poll(self, _worker: &WorkerContext) -> Self::Stream {
45 let stream = self.map(|r| Ok(Some(r))).boxed();
46 stream
47 }
48}
49impl<Args: DeserializeOwned + Unpin> Stream for JsonStorage<Args> {
50 type Item = Task<Args, JsonMapMetadata>;
51
52 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
53 let map = self.tasks.try_write().unwrap();
54 if let Some((key, task)) = map.find_first_with(|s, _| {
55 s.namespace == std::any::type_name::<Args>() && s.status == Status::Pending
56 }) {
57 use crate::task::builder::TaskBuilder;
58 let key = key.clone();
59 let args = Args::deserialize(&task.args).unwrap();
60 let task = TaskBuilder::new(args)
61 .with_task_id(key.task_id.clone())
62 .with_ctx(task.ctx.clone())
63 .build();
64 drop(map);
65 let this = self.get_mut();
66 this.update_status(&key, Status::Running)
67 .expect("Failed to update status");
68 this.persist_to_disk().expect("Failed to persist to disk");
69 Poll::Ready(Some(task))
70 } else {
71 Poll::Pending
72 }
73 }
74}