apalis_core/backend/impls/json/
sink.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures_channel::mpsc::SendError;
7use futures_sink::Sink;
8use serde::{Serialize, de::DeserializeOwned};
9use serde_json::Value;
10
11use crate::{
12 backend::impls::json::{
13 JsonStorage,
14 meta::JsonMapMetadata,
15 util::{TaskKey, TaskWithMeta},
16 },
17 task::{
18 Task,
19 task_id::{RandomId, TaskId},
20 },
21};
22
23impl<Args: Unpin + Serialize + DeserializeOwned> Sink<Task<Value, JsonMapMetadata, RandomId>>
24 for JsonStorage<Args>
25{
26 type Error = SendError;
27
28 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
29 Poll::Ready(Ok(()))
30 }
31
32 fn start_send(
33 self: Pin<&mut Self>,
34 item: Task<Value, JsonMapMetadata, RandomId>,
35 ) -> Result<(), Self::Error> {
36 let this = Pin::get_mut(self);
37
38 this.buffer.push(item);
39 Ok(())
40 }
41
42 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
43 let this = Pin::get_mut(self);
44 let tasks: Vec<_> = this.buffer.drain(..).collect();
45 for task in tasks {
46 use crate::task::task_id::RandomId;
47
48 let task_id = task
49 .parts
50 .task_id
51 .clone()
52 .unwrap_or(TaskId::new(RandomId::default()));
53
54 let key = TaskKey {
55 task_id,
56 queue: std::any::type_name::<Args>().to_owned(),
57 status: crate::task::status::Status::Pending,
58 };
59 this.insert(
60 &key,
61 TaskWithMeta {
62 args: task.args,
63 ctx: task.parts.ctx,
64 result: None,
65 },
66 )
67 .unwrap();
68 }
69 Poll::Ready(Ok(()))
70 }
71
72 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
73 Sink::<Task<Value, JsonMapMetadata, RandomId>>::poll_flush(self, cx)
74 }
75}