apalis_core/backend/impls/json/
sink.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures_channel::mpsc::SendError;
7use futures_sink::Sink;
8use serde::{Serialize, de::DeserializeOwned};
9use serde_json::Value;
10
11use crate::{
12 backend::impls::json::{
13 JsonStorage,
14 meta::JsonMapMetadata,
15 util::{TaskKey, TaskWithMeta},
16 },
17 task::{Task, task_id::TaskId},
18};
19
20impl<Args: Unpin + Serialize + DeserializeOwned> Sink<Task<Value, JsonMapMetadata>>
21 for JsonStorage<Args>
22{
23 type Error = SendError;
24
25 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
26 Poll::Ready(Ok(()))
27 }
28
29 fn start_send(
30 self: Pin<&mut Self>,
31 item: Task<Value, JsonMapMetadata>,
32 ) -> Result<(), Self::Error> {
33 let this = Pin::get_mut(self);
34
35 this.buffer.push(item);
36 Ok(())
37 }
38
39 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
40 let this = Pin::get_mut(self);
41 let tasks: Vec<_> = this.buffer.drain(..).collect();
42 for task in tasks {
43 use crate::task::task_id::RandomId;
44
45 let task_id = task
46 .parts
47 .task_id
48 .clone()
49 .unwrap_or(TaskId::new(RandomId::default()));
50
51 let key = TaskKey {
52 task_id,
53 queue: std::any::type_name::<Args>().to_owned(),
54 status: crate::task::status::Status::Pending,
55 };
56 this.insert(
57 &key,
58 TaskWithMeta {
59 args: task.args,
60 ctx: task.parts.ctx,
61 result: None,
62 },
63 )
64 .unwrap();
65 }
66 Poll::Ready(Ok(()))
67 }
68
69 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
70 Sink::<Task<Value, JsonMapMetadata>>::poll_flush(self, cx)
71 }
72}