apalis_core/backend/impls/json/
backend.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures_channel::mpsc::SendError;
7use futures_core::{stream::BoxStream, Stream};
8use futures_util::{stream, StreamExt};
9use serde::de::DeserializeOwned;
10use serde_json::Value;
11
12use crate::{
13    backend::{
14        codec::json::JsonCodec,
15        impls::json::{
16            meta::JsonMapMetadata,
17            util::{FindFirstWith, JsonAck},
18            JsonStorage,
19        },
20        Backend, TaskStream,
21    },
22    task::{status::Status, task_id::RandomId, Task},
23    worker::{context::WorkerContext, ext::ack::AcknowledgeLayer},
24};
25
26impl<Args: 'static + Send + DeserializeOwned + Unpin> Backend<Args> for JsonStorage<Args> {
27    type IdType = RandomId;
28    type Error = SendError;
29    type Context = JsonMapMetadata;
30    type Stream = TaskStream<Task<Args, JsonMapMetadata>, SendError>;
31    type Layer = AcknowledgeLayer<JsonAck<Args>>;
32    type Beat = BoxStream<'static, Result<(), Self::Error>>;
33
34    type Codec = JsonCodec<Value>;
35
36    fn heartbeat(&self, _: &WorkerContext) -> Self::Beat {
37        stream::once(async { Ok(()) }).boxed()
38    }
39    fn middleware(&self) -> Self::Layer {
40        AcknowledgeLayer::new(JsonAck {
41            inner: self.clone(),
42        })
43    }
44    fn poll(self, _worker: &WorkerContext) -> Self::Stream {
45        let stream = self.map(|r| Ok(Some(r))).boxed();
46        stream
47    }
48}
49impl<Args: DeserializeOwned + Unpin> Stream for JsonStorage<Args> {
50    type Item = Task<Args, JsonMapMetadata>;
51
52    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
53        let map = self.tasks.try_write().unwrap();
54        if let Some((key, task)) = map.find_first_with(|s, _| {
55            s.namespace == std::any::type_name::<Args>() && s.status == Status::Pending
56        }) {
57            use crate::task::builder::TaskBuilder;
58            let key = key.clone();
59            let args = Args::deserialize(&task.args).unwrap();
60            let task = TaskBuilder::new(args)
61                .with_task_id(key.task_id.clone())
62                .with_ctx(task.ctx.clone())
63                .build();
64            drop(map);
65            let this = self.get_mut();
66            this.update_status(&key, Status::Running)
67                .expect("Failed to update status");
68            this.persist_to_disk().expect("Failed to persist to disk");
69            Poll::Ready(Some(task))
70        } else {
71            Poll::Pending
72        }
73    }
74}