apalis_core/backend/impls/json/
sink.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures_channel::mpsc::SendError;
7use futures_sink::Sink;
8use serde::{Serialize, de::DeserializeOwned};
9use serde_json::Value;
10
11use crate::{
12    backend::impls::json::{
13        JsonStorage,
14        meta::JsonMapMetadata,
15        util::{TaskKey, TaskWithMeta},
16    },
17    task::{Task, task_id::TaskId},
18};
19
20impl<Args: Unpin + Serialize + DeserializeOwned> Sink<Task<Value, JsonMapMetadata>>
21    for JsonStorage<Args>
22{
23    type Error = SendError;
24
25    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
26        Poll::Ready(Ok(()))
27    }
28
29    fn start_send(
30        self: Pin<&mut Self>,
31        item: Task<Value, JsonMapMetadata>,
32    ) -> Result<(), Self::Error> {
33        let this = Pin::get_mut(self);
34
35        this.buffer.push(item);
36        Ok(())
37    }
38
39    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
40        let this = Pin::get_mut(self);
41        let tasks: Vec<_> = this.buffer.drain(..).collect();
42        for task in tasks {
43            use crate::task::task_id::RandomId;
44
45            let task_id = task
46                .parts
47                .task_id
48                .clone()
49                .unwrap_or(TaskId::new(RandomId::default()));
50
51            let key = TaskKey {
52                task_id,
53                queue: std::any::type_name::<Args>().to_owned(),
54                status: crate::task::status::Status::Pending,
55            };
56            this.insert(
57                &key,
58                TaskWithMeta {
59                    args: task.args,
60                    ctx: task.parts.ctx,
61                    result: None,
62                },
63            )
64            .unwrap();
65        }
66        Poll::Ready(Ok(()))
67    }
68
69    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
70        Sink::<Task<Value, JsonMapMetadata>>::poll_flush(self, cx)
71    }
72}