apalis_core/backend/impls/json/
backend.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures_channel::mpsc::SendError;
7use futures_core::{Stream, stream::BoxStream};
8use futures_util::{StreamExt, stream};
9use serde::{Serialize, de::DeserializeOwned};
10use serde_json::Value;
11
12use crate::{
13    backend::{
14        Backend, ConfigExt, TaskStream,
15        codec::json::JsonCodec,
16        impls::json::{
17            JsonStorage,
18            meta::JsonMapMetadata,
19            util::{FindFirstWith, JsonAck},
20        },
21        queue::Queue,
22    },
23    task::{Task, status::Status, task_id::RandomId},
24    worker::{context::WorkerContext, ext::ack::AcknowledgeLayer},
25};
26
27impl<Args> Backend for JsonStorage<Args>
28where
29    Args: 'static + Send + Serialize + DeserializeOwned + Unpin,
30{
31    type Args = Args;
32    type IdType = RandomId;
33    type Error = SendError;
34    type Context = JsonMapMetadata;
35    type Stream = TaskStream<Task<Args, JsonMapMetadata>, SendError>;
36    type Layer = AcknowledgeLayer<JsonAck<Args>>;
37    type Beat = BoxStream<'static, Result<(), Self::Error>>;
38
39    type Codec = JsonCodec<Value>;
40
41    type Compact = Value;
42
43    fn heartbeat(&self, _: &WorkerContext) -> Self::Beat {
44        stream::once(async { Ok(()) }).boxed()
45    }
46    fn middleware(&self) -> Self::Layer {
47        AcknowledgeLayer::new(JsonAck {
48            inner: self.clone(),
49        })
50    }
51    fn poll(self, _worker: &WorkerContext) -> Self::Stream {
52        let stream = self.map(|r| Ok(Some(r))).boxed();
53        stream
54    }
55}
56
57impl<Args> ConfigExt for JsonStorage<Args>
58where
59    Args: 'static + Send + Serialize + DeserializeOwned + Unpin,
60{
61    fn get_queue(&self) -> Queue {
62        Queue::from(std::any::type_name::<Args>())
63    }
64}
65impl<Args: DeserializeOwned + Unpin> Stream for JsonStorage<Args> {
66    type Item = Task<Args, JsonMapMetadata>;
67
68    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
69        let map = self.tasks.try_write().unwrap();
70        if let Some((key, task)) = map.find_first_with(|s, _| {
71            s.queue == std::any::type_name::<Args>() && s.status == Status::Pending
72        }) {
73            use crate::task::builder::TaskBuilder;
74            let key = key.clone();
75            let args = Args::deserialize(&task.args).unwrap();
76            let task = TaskBuilder::new(args)
77                .with_task_id(key.task_id.clone())
78                .with_ctx(task.ctx.clone())
79                .build();
80            drop(map);
81            let this = self.get_mut();
82            this.update_status(&key, Status::Running)
83                .expect("Failed to update status");
84            this.persist_to_disk().expect("Failed to persist to disk");
85            Poll::Ready(Some(task))
86        } else {
87            Poll::Pending
88        }
89    }
90}