apalis_core/backend/impls/json/
sink.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures_channel::mpsc::SendError;
7use futures_sink::Sink;
8use serde::Serialize;
9use serde_json::Value;
10
11use crate::{
12 backend::impls::json::{
13 JsonStorage,
14 meta::JsonMapMetadata,
15 util::{TaskKey, TaskWithMeta},
16 },
17 task::{Task, task_id::TaskId},
18};
19
20impl<Args: Unpin + Serialize> Sink<Task<Value, JsonMapMetadata>> for JsonStorage<Args> {
21 type Error = SendError;
22
23 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
24 Poll::Ready(Ok(()))
25 }
26
27 fn start_send(
28 self: Pin<&mut Self>,
29 item: Task<Value, JsonMapMetadata>,
30 ) -> Result<(), Self::Error> {
31 let this = Pin::get_mut(self);
32
33 this.buffer.push(item);
34 Ok(())
35 }
36
37 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
38 let this = Pin::get_mut(self);
39 let tasks: Vec<_> = this.buffer.drain(..).collect();
40 for task in tasks {
41 use crate::task::task_id::RandomId;
42
43 let task_id = task
44 .parts
45 .task_id
46 .clone()
47 .unwrap_or(TaskId::new(RandomId::default()));
48
49 let key = TaskKey {
50 task_id,
51 queue: std::any::type_name::<Args>().to_owned(),
52 status: crate::task::status::Status::Pending,
53 };
54 this.insert(
55 &key,
56 TaskWithMeta {
57 args: task.args,
58 ctx: task.parts.ctx,
59 result: None,
60 },
61 )
62 .unwrap();
63 }
64 Poll::Ready(Ok(()))
65 }
66
67 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
68 Sink::<Task<Value, JsonMapMetadata>>::poll_flush(self, cx)
69 }
70}