apalis_core/backend/impls/json/
backend.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures_channel::mpsc::SendError;
7use futures_core::{Stream, stream::BoxStream};
8use futures_util::{StreamExt, TryStreamExt, stream};
9use serde::{Serialize, de::DeserializeOwned};
10use serde_json::Value;
11
12use crate::{
13 backend::{
14 Backend, BackendExt, ConfigExt, TaskStream,
15 codec::json::JsonCodec,
16 impls::json::{
17 JsonStorage,
18 meta::JsonMapMetadata,
19 util::{FindFirstWith, JsonAck},
20 },
21 queue::Queue,
22 },
23 task::{Task, status::Status, task_id::RandomId},
24 worker::{context::WorkerContext, ext::ack::AcknowledgeLayer},
25};
26
27impl<Args> Backend for JsonStorage<Args>
28where
29 Args: 'static + Send + Serialize + DeserializeOwned + Unpin,
30{
31 type Args = Args;
32 type IdType = RandomId;
33 type Error = SendError;
34 type Context = JsonMapMetadata;
35 type Stream = TaskStream<Task<Args, JsonMapMetadata>, SendError>;
36 type Layer = AcknowledgeLayer<JsonAck<Args>>;
37 type Beat = BoxStream<'static, Result<(), Self::Error>>;
38
39 fn heartbeat(&self, _: &WorkerContext) -> Self::Beat {
40 stream::once(async { Ok(()) }).boxed()
41 }
42 fn middleware(&self) -> Self::Layer {
43 AcknowledgeLayer::new(JsonAck {
44 inner: self.clone(),
45 })
46 }
47 fn poll(self, _worker: &WorkerContext) -> Self::Stream {
48 (self.map(|r| Ok(Some(r))).boxed()) as _
49 }
50}
51
52impl<Args: 'static + Send + Serialize + DeserializeOwned + Unpin> BackendExt for JsonStorage<Args> {
53 type Codec = JsonCodec<Value>;
54 type Compact = Value;
55
56 type CompactStream = TaskStream<Task<Self::Compact, JsonMapMetadata>, SendError>;
57
58 fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream {
59 self.poll(worker)
60 .map_ok(|c| {
61 c.map(|t| t.map(|args| serde_json::to_value(args).expect("to be encodable")))
62 })
63 .boxed()
64 }
65}
66
67impl<Args> ConfigExt for JsonStorage<Args>
68where
69 Args: 'static + Send + Serialize + DeserializeOwned + Unpin,
70{
71 fn get_queue(&self) -> Queue {
72 Queue::from(std::any::type_name::<Args>())
73 }
74}
75impl<Args: DeserializeOwned + Unpin> Stream for JsonStorage<Args> {
76 type Item = Task<Args, JsonMapMetadata>;
77
78 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79 let map = self.tasks.try_write().unwrap();
80 if let Some((key, task)) = map.find_first_with(|s, _| {
81 s.queue == std::any::type_name::<Args>() && s.status == Status::Pending
82 }) {
83 use crate::task::builder::TaskBuilder;
84 let key = key.clone();
85 let args = Args::deserialize(&task.args).unwrap();
86 let task = TaskBuilder::new(args)
87 .with_task_id(key.task_id.clone())
88 .with_ctx(task.ctx.clone())
89 .build();
90 drop(map);
91 let this = self.get_mut();
92 this.update_status(&key, Status::Running)
93 .expect("Failed to update status");
94 this.persist_to_disk().expect("Failed to persist to disk");
95 Poll::Ready(Some(task))
96 } else {
97 Poll::Pending
98 }
99 }
100}