apalis_core/backend/codec.rs
1//! Utilities for encoding and decoding task arguments and results
2//!
3//! # Overview
4//!
5//! The `Codec` trait allows for converting values
6//! between a type `T` and a more compact or transport-friendly representation.
7//! This is particularly useful for serializing/deserializing, compressing/expanding,
8//! or otherwise encoding/decoding values in a custom format.
9//!
//! The module includes the [`IdentityCodec`] implementation of the `Codec` trait and the
//! [`RawDataBackend`] wrapper, as well as a JSON codec when the `json` feature is enabled.
12
13use crate::{
14 backend::{Backend, BackendExt},
15 worker::context::WorkerContext,
16};
/// Converts values of type `T` to and from the compact, transport-friendly
/// representation used by a `Backend` (for example JSON or raw bytes).
///
/// Typical uses are serializing/deserializing, compressing/expanding, or any
/// other custom encoding scheme.
///
/// By default, a backend doesn't care about the specific type implementing
/// [`Codec`], only about its [`Codec::Compact`] type. A backend that accepts
/// bytes can therefore be paired with familiar crates such as bincode and rkyv.
///
/// # Type Parameters
/// - `T`: The type of value being encoded/decoded.
pub trait Codec<T> {
    /// The compact (encoded) form of `T`.
    ///
    /// This may be a primitive, a byte buffer, or any other representation
    /// that is cheaper to store or transmit.
    type Compact;

    /// The error produced when encoding or decoding fails.
    type Error;

    /// Turns a borrowed `T` into its compact representation.
    ///
    /// # Errors
    /// Returns [`Self::Error`] if the value cannot be encoded.
    fn encode(val: &T) -> Result<Self::Compact, Self::Error>;

    /// Rebuilds a `T` from a borrowed compact representation.
    ///
    /// # Errors
    /// Returns [`Self::Error`] if the compact representation cannot
    /// be decoded into a valid `T`.
    fn decode(val: &Self::Compact) -> Result<T, Self::Error>;
}
53
54/// A codec that performs no transformation, returning the input value as-is.
55#[derive(Debug, Clone, Default)]
56pub struct IdentityCodec;
57
58impl<T> Codec<T> for IdentityCodec
59where
60 T: Clone,
61{
62 type Compact = T;
63 type Error = std::convert::Infallible;
64
65 fn encode(val: &T) -> Result<Self::Compact, Self::Error> {
66 Ok(val.clone())
67 }
68
69 fn decode(val: &Self::Compact) -> Result<T, Self::Error> {
70 Ok(val.clone())
71 }
72}
73
/// Backend wrapper that skips decoding and works directly with the compact
/// representation.
///
/// This is useful for backends that natively handle compact types and do not
/// know the task types at compile time, such as backends working with raw
/// bytes or JSON values (for example workflows that manipulate dynamic data).
#[derive(Debug, Clone)]
pub struct RawDataBackend<B> {
    /// The wrapped backend.
    inner: B,
}

impl<B> RawDataBackend<B> {
    /// Builds a `RawDataBackend` that takes ownership of `backend`.
    pub fn new(backend: B) -> RawDataBackend<B> {
        let inner = backend;
        RawDataBackend { inner }
    }
}
89
90impl<B> Backend for RawDataBackend<B>
91where
92 B: BackendExt,
93{
94 type Args = B::Compact;
95 type IdType = B::IdType;
96 type Context = B::Context;
97 type Error = B::Error;
98 type Stream = B::CompactStream;
99 type Beat = B::Beat;
100 type Layer = B::Layer;
101
102 fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat {
103 self.inner.heartbeat(worker)
104 }
105
106 fn middleware(&self) -> Self::Layer {
107 self.inner.middleware()
108 }
109
110 fn poll(self, worker: &WorkerContext) -> Self::Stream {
111 self.inner.poll_compact(worker)
112 }
113}
114
/// Encoding for tasks using json
#[cfg(feature = "json")]
pub mod json {
    use std::marker::PhantomData;

    use serde::{Serialize, de::DeserializeOwned};
    use serde_json::Value;

    use super::Codec;

    /// JSON codec, parameterised by the compact output type.
    ///
    /// `Codec` is implemented for `JsonCodec<Vec<u8>>`, `JsonCodec<String>`
    /// and `JsonCodec<Value>`; `Output` only selects which of those
    /// representations is produced and consumed.
    #[derive(Debug, Clone, Default)]
    pub struct JsonCodec<Output> {
        _o: PhantomData<Output>,
    }

    /// JSON encoded as UTF-8 bytes.
    impl<T> Codec<T> for JsonCodec<Vec<u8>>
    where
        T: Serialize + DeserializeOwned,
    {
        type Error = serde_json::Error;
        type Compact = Vec<u8>;

        /// Serializes `input` with `serde_json::to_vec`.
        fn encode(input: &T) -> Result<Self::Compact, Self::Error> {
            serde_json::to_vec(input)
        }

        /// Deserializes a `T` with `serde_json::from_slice`.
        fn decode(compact: &Vec<u8>) -> Result<T, Self::Error> {
            serde_json::from_slice(compact.as_slice())
        }
    }

    /// JSON encoded as a `String`.
    impl<T> Codec<T> for JsonCodec<String>
    where
        T: Serialize + DeserializeOwned,
    {
        type Error = serde_json::Error;
        type Compact = String;

        /// Serializes `input` with `serde_json::to_string`.
        fn encode(input: &T) -> Result<Self::Compact, Self::Error> {
            serde_json::to_string(input)
        }

        /// Deserializes a `T` with `serde_json::from_str`.
        fn decode(compact: &String) -> Result<T, Self::Error> {
            serde_json::from_str(compact.as_str())
        }
    }

    /// JSON held as a `serde_json::Value` tree.
    impl<T> Codec<T> for JsonCodec<Value>
    where
        T: Serialize + DeserializeOwned,
    {
        type Error = serde_json::Error;
        type Compact = Value;

        /// Converts `input` with `serde_json::to_value`.
        fn encode(input: &T) -> Result<Self::Compact, Self::Error> {
            serde_json::to_value(input)
        }

        /// Deserializes a `T` straight from the borrowed `Value`, so the
        /// tree does not have to be cloned first.
        fn decode(compact: &Value) -> Result<T, Self::Error> {
            T::deserialize(compact)
        }
    }
}
165}