apalis_core/backend/impls/json/
sink.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures_channel::mpsc::SendError;
7use futures_sink::Sink;
8use serde::Serialize;
9
10use crate::{
11    backend::impls::json::{
12        meta::JsonMapMetadata,
13        util::{TaskKey, TaskWithMeta},
14        JsonStorage,
15    },
16    task::{task_id::TaskId, Task},
17};
18
19impl<Args: Unpin + Serialize> Sink<Task<Args, JsonMapMetadata>> for JsonStorage<Args> {
20    type Error = SendError;
21
22    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
23        Poll::Ready(Ok(()))
24    }
25
26    fn start_send(
27        self: Pin<&mut Self>,
28        item: Task<Args, JsonMapMetadata>,
29    ) -> Result<(), Self::Error> {
30        let this = Pin::get_mut(self);
31
32        this.buffer.push(item);
33        Ok(())
34    }
35
36    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
37        let this = Pin::get_mut(self);
38        let tasks: Vec<_> = this.buffer.drain(..).collect();
39        for task in tasks {
40            use crate::task::task_id::RandomId;
41
42            let task_id = task
43                .parts
44                .task_id
45                .clone()
46                .unwrap_or(TaskId::new(RandomId::default()));
47
48            let key = TaskKey {
49                task_id,
50                namespace: std::any::type_name::<Args>().to_owned(),
51                status: crate::task::status::Status::Pending,
52            };
53            this.insert(
54                &key,
55                TaskWithMeta {
56                    args: serde_json::to_value(task.args).unwrap(),
57                    ctx: task.parts.ctx,
58                    result: None,
59                },
60            )
61            .unwrap();
62        }
63        Poll::Ready(Ok(()))
64    }
65
66    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
67        Sink::<Task<Args, JsonMapMetadata>>::poll_flush(self, cx)
68    }
69}