// apalis_core/backend/codec.rs

//! Utilities for encoding and decoding task arguments and results
//!
//! # Overview
//!
//! The `Codec` trait allows for converting values
//! between a type `T` and a more compact or transport-friendly representation.
//! This is particularly useful for serializing/deserializing, compressing/expanding,
//! or otherwise encoding/decoding values in a custom format.
//!
//! The module includes `IdentityCodec`, a pass-through implementation of the `Codec` trait,
//! the `RawDataBackend` wrapper for working with compact values directly,
//! and JSON codecs when the `json` feature is enabled.

13use crate::{
14    backend::{Backend, BackendExt},
15    worker::context::WorkerContext,
16};
/// A trait for converting values between a type `T` and a more compact or
/// transport-friendly representation for a `Backend`. Examples include JSON
/// and bytes.
///
/// This is useful when you need to serialize/deserialize, compress/expand,
/// or otherwise encode/decode values in a custom format.
///
/// By default, a backend doesn't care about the specific type implementing [`Codec`]
/// but rather the [`Codec::Compact`] type. This means if it can accept bytes, you
/// can use familiar crates such as bincode and rkyv.
///
/// Both operations are associated functions without a `self` receiver, so a
/// codec is selected purely by its type.
///
/// # Type Parameters
/// - `T`: The type of value being encoded/decoded.
pub trait Codec<T> {
    /// The error type returned if encoding or decoding fails.
    type Error;

    /// The compact or encoded representation of `T`.
    ///
    /// This could be a primitive type, a byte buffer, or any other
    /// representation that is more efficient to store or transmit.
    type Compact;

    /// Encode a value of type `T` into its compact representation.
    ///
    /// The value is only borrowed; the caller keeps ownership of `val`.
    ///
    /// # Errors
    /// Returns [`Self::Error`] if the value cannot be encoded.
    fn encode(val: &T) -> Result<Self::Compact, Self::Error>;

    /// Decode a compact representation back into a value of type `T`.
    ///
    /// The compact value is only borrowed and can be decoded again afterwards.
    ///
    /// # Errors
    /// Returns [`Self::Error`] if the compact representation cannot
    /// be decoded into a valid `T`.
    fn decode(val: &Self::Compact) -> Result<T, Self::Error>;
}
53
54/// A codec that performs no transformation, returning the input value as-is.
55#[derive(Debug, Clone, Default)]
56pub struct IdentityCodec;
57
58impl<T> Codec<T> for IdentityCodec
59where
60    T: Clone,
61{
62    type Compact = T;
63    type Error = std::convert::Infallible;
64
65    fn encode(val: &T) -> Result<Self::Compact, Self::Error> {
66        Ok(val.clone())
67    }
68
69    fn decode(val: &Self::Compact) -> Result<T, Self::Error> {
70        Ok(val.clone())
71    }
72}
73
/// Wrapper that skips decoding and works directly with compact representation.
///
/// This is useful for backends that natively handle compact types and do not know the types at compile time.
/// Examples include backends that work with raw bytes or JSON values like workflows that manipulate dynamic data.
#[derive(Debug, Clone)]
pub struct RawDataBackend<B> {
    /// The wrapped backend.
    inner: B,
}

impl<B> RawDataBackend<B> {
    /// Create a new `RawDataBackend` wrapping the given backend.
    ///
    /// The backend is taken by value and stored unchanged; no trait bound on
    /// `B` is required to construct the wrapper.
    pub fn new(backend: B) -> Self {
        Self { inner: backend }
    }
}
89
90impl<B> Backend for RawDataBackend<B>
91where
92    B: BackendExt,
93{
94    type Args = B::Compact;
95    type IdType = B::IdType;
96    type Context = B::Context;
97    type Error = B::Error;
98    type Stream = B::CompactStream;
99    type Beat = B::Beat;
100    type Layer = B::Layer;
101
102    fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat {
103        self.inner.heartbeat(worker)
104    }
105
106    fn middleware(&self) -> Self::Layer {
107        self.inner.middleware()
108    }
109
110    fn poll(self, worker: &WorkerContext) -> Self::Stream {
111        self.inner.poll_compact(worker)
112    }
113}
114
/// Encoding for tasks using json
#[cfg(feature = "json")]
pub mod json {
    use std::marker::PhantomData;

    use serde::{Serialize, de::DeserializeOwned};
    use serde_json::Value;

    use super::Codec;

    /// Json encoding and decoding
    ///
    /// The `Output` parameter selects the compact representation. `Codec` is
    /// implemented for `JsonCodec<Vec<u8>>`, `JsonCodec<String>` and
    /// `JsonCodec<serde_json::Value>`.
    #[derive(Debug, Clone, Default)]
    pub struct JsonCodec<Output> {
        // Zero-sized marker: `Output` is only used to pick the impl.
        _o: PhantomData<Output>,
    }

    /// JSON encoded as UTF-8 bytes.
    impl<T: Serialize + DeserializeOwned> Codec<T> for JsonCodec<Vec<u8>> {
        type Compact = Vec<u8>;
        type Error = serde_json::Error;

        /// Serializes `input` with `serde_json::to_vec`.
        fn encode(input: &T) -> Result<Vec<u8>, Self::Error> {
            serde_json::to_vec(input)
        }

        /// Deserializes `T` from the bytes with `serde_json::from_slice`.
        fn decode(compact: &Vec<u8>) -> Result<T, Self::Error> {
            serde_json::from_slice(compact)
        }
    }

    /// JSON encoded as a `String`.
    impl<T: Serialize + DeserializeOwned> Codec<T> for JsonCodec<String> {
        type Compact = String;
        type Error = serde_json::Error;

        /// Serializes `input` with `serde_json::to_string`.
        fn encode(input: &T) -> Result<String, Self::Error> {
            serde_json::to_string(input)
        }

        /// Deserializes `T` from the text with `serde_json::from_str`.
        fn decode(compact: &String) -> Result<T, Self::Error> {
            serde_json::from_str(compact)
        }
    }

    /// JSON held as a `serde_json::Value` tree.
    impl<T: Serialize + DeserializeOwned> Codec<T> for JsonCodec<Value> {
        type Compact = Value;
        type Error = serde_json::Error;

        /// Converts `input` into a `Value` with `serde_json::to_value`.
        fn encode(input: &T) -> Result<Value, Self::Error> {
            serde_json::to_value(input)
        }

        /// Deserializes `T` using the borrowed `Value` as the deserializer.
        fn decode(compact: &Value) -> Result<T, Self::Error> {
            T::deserialize(compact)
        }
    }
}