apalis_core/backend/impls/json/
sink.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures_channel::mpsc::SendError;
7use futures_sink::Sink;
8use serde::Serialize;
9use serde_json::Value;
10
11use crate::{
12    backend::impls::json::{
13        JsonStorage,
14        meta::JsonMapMetadata,
15        util::{TaskKey, TaskWithMeta},
16    },
17    task::{Task, task_id::TaskId},
18};
19
20impl<Args: Unpin + Serialize> Sink<Task<Value, JsonMapMetadata>> for JsonStorage<Args> {
21    type Error = SendError;
22
23    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
24        Poll::Ready(Ok(()))
25    }
26
27    fn start_send(
28        self: Pin<&mut Self>,
29        item: Task<Value, JsonMapMetadata>,
30    ) -> Result<(), Self::Error> {
31        let this = Pin::get_mut(self);
32
33        this.buffer.push(item);
34        Ok(())
35    }
36
37    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
38        let this = Pin::get_mut(self);
39        let tasks: Vec<_> = this.buffer.drain(..).collect();
40        for task in tasks {
41            use crate::task::task_id::RandomId;
42
43            let task_id = task
44                .parts
45                .task_id
46                .clone()
47                .unwrap_or(TaskId::new(RandomId::default()));
48
49            let key = TaskKey {
50                task_id,
51                queue: std::any::type_name::<Args>().to_owned(),
52                status: crate::task::status::Status::Pending,
53            };
54            this.insert(
55                &key,
56                TaskWithMeta {
57                    args: task.args,
58                    ctx: task.parts.ctx,
59                    result: None,
60                },
61            )
62            .unwrap();
63        }
64        Poll::Ready(Ok(()))
65    }
66
67    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
68        Sink::<Task<Value, JsonMapMetadata>>::poll_flush(self, cx)
69    }
70}