apalis_core/backend/impls/json/
sink.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures_channel::mpsc::SendError;
7use futures_sink::Sink;
8use serde::{Serialize, de::DeserializeOwned};
9use serde_json::Value;
10
11use crate::{
12    backend::impls::json::{
13        JsonStorage,
14        meta::JsonMapMetadata,
15        util::{TaskKey, TaskWithMeta},
16    },
17    task::{
18        Task,
19        task_id::{RandomId, TaskId},
20    },
21};
22
23impl<Args: Unpin + Serialize + DeserializeOwned> Sink<Task<Value, JsonMapMetadata, RandomId>>
24    for JsonStorage<Args>
25{
26    type Error = SendError;
27
28    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
29        Poll::Ready(Ok(()))
30    }
31
32    fn start_send(
33        self: Pin<&mut Self>,
34        item: Task<Value, JsonMapMetadata, RandomId>,
35    ) -> Result<(), Self::Error> {
36        let this = Pin::get_mut(self);
37
38        this.buffer.push(item);
39        Ok(())
40    }
41
42    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
43        let this = Pin::get_mut(self);
44        let tasks: Vec<_> = this.buffer.drain(..).collect();
45        for task in tasks {
46            use crate::task::task_id::RandomId;
47
48            let task_id = task
49                .parts
50                .task_id
51                .clone()
52                .unwrap_or(TaskId::new(RandomId::default()));
53
54            let key = TaskKey {
55                task_id,
56                queue: std::any::type_name::<Args>().to_owned(),
57                status: crate::task::status::Status::Pending,
58            };
59            this.insert(
60                &key,
61                TaskWithMeta {
62                    args: task.args,
63                    ctx: task.parts.ctx,
64                    result: None,
65                },
66            )
67            .unwrap();
68        }
69        Poll::Ready(Ok(()))
70    }
71
72    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
73        Sink::<Task<Value, JsonMapMetadata, RandomId>>::poll_flush(self, cx)
74    }
75}