apalis_core/backend/impls/json/
backend.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures_channel::mpsc::SendError;
7use futures_core::{Stream, stream::BoxStream};
8use futures_util::{StreamExt, stream};
9use serde::{Serialize, de::DeserializeOwned};
10use serde_json::Value;
11
12use crate::{
13 backend::{
14 Backend, ConfigExt, TaskStream,
15 codec::json::JsonCodec,
16 impls::json::{
17 JsonStorage,
18 meta::JsonMapMetadata,
19 util::{FindFirstWith, JsonAck},
20 },
21 queue::Queue,
22 },
23 task::{Task, status::Status, task_id::RandomId},
24 worker::{context::WorkerContext, ext::ack::AcknowledgeLayer},
25};
26
27impl<Args> Backend for JsonStorage<Args>
28where
29 Args: 'static + Send + Serialize + DeserializeOwned + Unpin,
30{
31 type Args = Args;
32 type IdType = RandomId;
33 type Error = SendError;
34 type Context = JsonMapMetadata;
35 type Stream = TaskStream<Task<Args, JsonMapMetadata>, SendError>;
36 type Layer = AcknowledgeLayer<JsonAck<Args>>;
37 type Beat = BoxStream<'static, Result<(), Self::Error>>;
38
39 type Codec = JsonCodec<Value>;
40
41 type Compact = Value;
42
43 fn heartbeat(&self, _: &WorkerContext) -> Self::Beat {
44 stream::once(async { Ok(()) }).boxed()
45 }
46 fn middleware(&self) -> Self::Layer {
47 AcknowledgeLayer::new(JsonAck {
48 inner: self.clone(),
49 })
50 }
51 fn poll(self, _worker: &WorkerContext) -> Self::Stream {
52 let stream = self.map(|r| Ok(Some(r))).boxed();
53 stream
54 }
55}
56
57impl<Args> ConfigExt for JsonStorage<Args>
58where
59 Args: 'static + Send + Serialize + DeserializeOwned + Unpin,
60{
61 fn get_queue(&self) -> Queue {
62 Queue::from(std::any::type_name::<Args>())
63 }
64}
65impl<Args: DeserializeOwned + Unpin> Stream for JsonStorage<Args> {
66 type Item = Task<Args, JsonMapMetadata>;
67
68 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
69 let map = self.tasks.try_write().unwrap();
70 if let Some((key, task)) = map.find_first_with(|s, _| {
71 s.queue == std::any::type_name::<Args>() && s.status == Status::Pending
72 }) {
73 use crate::task::builder::TaskBuilder;
74 let key = key.clone();
75 let args = Args::deserialize(&task.args).unwrap();
76 let task = TaskBuilder::new(args)
77 .with_task_id(key.task_id.clone())
78 .with_ctx(task.ctx.clone())
79 .build();
80 drop(map);
81 let this = self.get_mut();
82 this.update_status(&key, Status::Running)
83 .expect("Failed to update status");
84 this.persist_to_disk().expect("Failed to persist to disk");
85 Poll::Ready(Some(task))
86 } else {
87 Poll::Pending
88 }
89 }
90}