apalis_core/backend/impls/json/
backend.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures_channel::mpsc::SendError;
7use futures_core::{Stream, stream::BoxStream};
8use futures_util::{StreamExt, TryStreamExt, stream};
9use serde::{Serialize, de::DeserializeOwned};
10use serde_json::Value;
11
12use crate::{
13    backend::{
14        Backend, BackendExt, ConfigExt, TaskStream,
15        codec::json::JsonCodec,
16        impls::json::{
17            JsonStorage,
18            meta::JsonMapMetadata,
19            util::{FindFirstWith, JsonAck},
20        },
21        queue::Queue,
22    },
23    task::{Task, status::Status, task_id::RandomId},
24    worker::{context::WorkerContext, ext::ack::AcknowledgeLayer},
25};
26
27impl<Args> Backend for JsonStorage<Args>
28where
29    Args: 'static + Send + Serialize + DeserializeOwned + Unpin,
30{
31    type Args = Args;
32    type IdType = RandomId;
33    type Error = SendError;
34    type Context = JsonMapMetadata;
35    type Stream = TaskStream<Task<Args, JsonMapMetadata>, SendError>;
36    type Layer = AcknowledgeLayer<JsonAck<Args>>;
37    type Beat = BoxStream<'static, Result<(), Self::Error>>;
38
39    fn heartbeat(&self, _: &WorkerContext) -> Self::Beat {
40        stream::once(async { Ok(()) }).boxed()
41    }
42    fn middleware(&self) -> Self::Layer {
43        AcknowledgeLayer::new(JsonAck {
44            inner: self.clone(),
45        })
46    }
47    fn poll(self, _worker: &WorkerContext) -> Self::Stream {
48        (self.map(|r| Ok(Some(r))).boxed()) as _
49    }
50}
51
52impl<Args: 'static + Send + Serialize + DeserializeOwned + Unpin> BackendExt for JsonStorage<Args> {
53    type Codec = JsonCodec<Value>;
54    type Compact = Value;
55
56    type CompactStream = TaskStream<Task<Self::Compact, JsonMapMetadata>, SendError>;
57
58    fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream {
59        self.poll(worker)
60            .map_ok(|c| {
61                c.map(|t| t.map(|args| serde_json::to_value(args).expect("to be encodable")))
62            })
63            .boxed()
64    }
65}
66
67impl<Args> ConfigExt for JsonStorage<Args>
68where
69    Args: 'static + Send + Serialize + DeserializeOwned + Unpin,
70{
71    fn get_queue(&self) -> Queue {
72        Queue::from(std::any::type_name::<Args>())
73    }
74}
75impl<Args: DeserializeOwned + Unpin> Stream for JsonStorage<Args> {
76    type Item = Task<Args, JsonMapMetadata>;
77
78    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79        let map = self.tasks.try_write().unwrap();
80        if let Some((key, task)) = map.find_first_with(|s, _| {
81            s.queue == std::any::type_name::<Args>() && s.status == Status::Pending
82        }) {
83            use crate::task::builder::TaskBuilder;
84            let key = key.clone();
85            let args = Args::deserialize(&task.args).unwrap();
86            let task = TaskBuilder::new(args)
87                .with_task_id(key.task_id.clone())
88                .with_ctx(task.ctx.clone())
89                .build();
90            drop(map);
91            let this = self.get_mut();
92            this.update_status(&key, Status::Running)
93                .expect("Failed to update status");
94            this.persist_to_disk().expect("Failed to persist to disk");
95            Poll::Ready(Some(task))
96        } else {
97            Poll::Pending
98        }
99    }
100}