apalis_core/backend/impls/json/
sink.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures_channel::mpsc::SendError;
7use futures_sink::Sink;
8use serde::Serialize;
9
10use crate::{
11 backend::impls::json::{
12 meta::JsonMapMetadata,
13 util::{TaskKey, TaskWithMeta},
14 JsonStorage,
15 },
16 task::{task_id::TaskId, Task},
17};
18
19impl<Args: Unpin + Serialize> Sink<Task<Args, JsonMapMetadata>> for JsonStorage<Args> {
20 type Error = SendError;
21
22 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
23 Poll::Ready(Ok(()))
24 }
25
26 fn start_send(
27 self: Pin<&mut Self>,
28 item: Task<Args, JsonMapMetadata>,
29 ) -> Result<(), Self::Error> {
30 let this = Pin::get_mut(self);
31
32 this.buffer.push(item);
33 Ok(())
34 }
35
36 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
37 let this = Pin::get_mut(self);
38 let tasks: Vec<_> = this.buffer.drain(..).collect();
39 for task in tasks {
40 use crate::task::task_id::RandomId;
41
42 let task_id = task
43 .parts
44 .task_id
45 .clone()
46 .unwrap_or(TaskId::new(RandomId::default()));
47
48 let key = TaskKey {
49 task_id,
50 namespace: std::any::type_name::<Args>().to_owned(),
51 status: crate::task::status::Status::Pending,
52 };
53 this.insert(
54 &key,
55 TaskWithMeta {
56 args: serde_json::to_value(task.args).unwrap(),
57 ctx: task.parts.ctx,
58 result: None,
59 },
60 )
61 .unwrap();
62 }
63 Poll::Ready(Ok(()))
64 }
65
66 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
67 Sink::<Task<Args, JsonMapMetadata>>::poll_flush(self, cx)
68 }
69}