Skip to main content

nautilus_event_store/capture/
encoder.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
//! Encoders that convert captured bus messages into canonical payload bytes.
//!
//! Each registered encoder is responsible for two outputs:
//!
//! - The canonical payload bytes, written verbatim into the entry's `payload` field. The
//!   reader pairs the bytes with [`crate::PayloadType`] to dispatch the matching decoder.
//! - The sidecar [`IndexKey`]s, which the writer commits in the same backend transaction
//!   so that forensics scans by `client_order_id` or `venue_order_id` resolve to a
//!   committed `seq` instead of missing entries that the reader can already observe
//!   before the indices catch up.
//!
//! The trait is type-erased so the registry can look up by [`std::any::TypeId`]; concrete
//! encoders are typed via the [`TypedEncoder`] adapter and avoid downcasting at the call
//! site.
30
31use std::{any::Any, marker::PhantomData};
32
33use bytes::Bytes;
34
35use crate::{backend::IndexKey, entry::PayloadType};
36
37/// Errors returned by an [`Encode`] implementation.
38#[derive(Debug, thiserror::Error)]
39pub enum EncodeError {
40    /// The encoder received a value of an unexpected type.
41    ///
42    /// Surfaces only when the registry is wired incorrectly. The adapter rejects callers
43    /// before invoking the encoder, so this variant exists to keep [`TypedEncoder`]'s
44    /// downcast safe under future refactors.
45    #[error("encoder type mismatch: expected {expected}")]
46    TypeMismatch {
47        /// The Rust type name the encoder was registered for.
48        expected: &'static str,
49    },
50    /// The encoder failed to serialize the message.
51    #[error("encode failure: {0}")]
52    Serialize(String),
53}
54
55/// The canonical payload plus sidecar indices an encoder produces for one captured message.
56///
57/// The encoder does not stamp `seq`, `ts_publish`, or `entry_hash`: those are writer-side
58/// fields. It also does not stamp `headers`; the bus capture adapter carries headers from
59/// the dispatch boundary so encoders stay focused on payload identity.
60///
61/// `payload_type` is `None` for the typical bare-type encoder (e.g. `SubmitOrder`): the
62/// registry's registered tag is used. Envelope encoders that dispatch on a wrapper enum
63/// (e.g. `TradingCommand`, `OrderEventAny`) set it to the inner-variant's canonical tag
64/// so forensics scans see entries identical to the bare-type capture path.
65#[derive(Clone, Debug, PartialEq, Eq)]
66pub struct EncodedPayload {
67    /// The canonical encoded bytes the writer commits as the entry payload.
68    pub payload: Bytes,
69    /// Sidecar index keys produced for this entry. May be empty.
70    pub index_keys: Vec<IndexKey>,
71    /// Optional override for the registry's registered payload type tag. Set by envelope
72    /// encoders to stamp the inner-variant tag on captured entries.
73    pub payload_type: Option<PayloadType>,
74}
75
76impl EncodedPayload {
77    /// Creates a new [`EncodedPayload`] that inherits the registry's registered tag.
78    #[must_use]
79    pub const fn new(payload: Bytes, index_keys: Vec<IndexKey>) -> Self {
80        Self {
81            payload,
82            index_keys,
83            payload_type: None,
84        }
85    }
86
87    /// Creates a new [`EncodedPayload`] with no sidecar indices.
88    #[must_use]
89    pub const fn without_indices(payload: Bytes) -> Self {
90        Self {
91            payload,
92            index_keys: Vec::new(),
93            payload_type: None,
94        }
95    }
96
97    /// Creates a new [`EncodedPayload`] that stamps `payload_type` on the captured entry,
98    /// overriding the registry's registered tag.
99    #[must_use]
100    pub const fn with_payload_type(
101        payload_type: PayloadType,
102        payload: Bytes,
103        index_keys: Vec<IndexKey>,
104    ) -> Self {
105        Self {
106            payload,
107            index_keys,
108            payload_type: Some(payload_type),
109        }
110    }
111}
112
113/// A type-erased encoder used by the registry.
114///
115/// Implementors take the captured message as `&dyn Any` so the registry can dispatch by
116/// [`std::any::TypeId`] without naming the concrete type at the call site. Most callers
117/// build encoders with [`TypedEncoder`] rather than implementing this trait directly.
118pub trait Encode: Send + Sync {
119    /// Encodes the supplied message into canonical payload bytes plus sidecar indices.
120    ///
121    /// # Errors
122    ///
123    /// Returns [`EncodeError::TypeMismatch`] when `message` does not match the encoder's
124    /// expected type, and [`EncodeError::Serialize`] for any encoder-internal failure.
125    fn encode(&self, message: &dyn Any) -> Result<EncodedPayload, EncodeError>;
126}
127
/// Adapter that exposes a typed `Fn(&T) -> Result<EncodedPayload, EncodeError>` through
/// the type-erased [`Encode`] trait.
///
/// Normally created by [`crate::capture::EncoderRegistry::register`] rather than by
/// hand. Its downcast is the only place in the capture path that handles `Any`.
pub struct TypedEncoder<T: 'static, F> {
    /// The wrapped typed encoding function.
    func: F,
    /// Ties the adapter to `T` without storing a `T`. A `fn(&T)` marker is `Send` and
    /// `Sync` whatever `T` is, so only `F` decides whether the adapter is thread-safe.
    _phantom: PhantomData<fn(&T)>,
}
137
138impl<T: 'static, F> TypedEncoder<T, F>
139where
140    F: Fn(&T) -> Result<EncodedPayload, EncodeError> + Send + Sync,
141{
142    /// Wraps `func` as a typed encoder for `T`.
143    #[must_use]
144    pub const fn new(func: F) -> Self {
145        Self {
146            func,
147            _phantom: PhantomData,
148        }
149    }
150}
151
152impl<T: 'static, F> std::fmt::Debug for TypedEncoder<T, F> {
153    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154        f.debug_struct(stringify!(TypedEncoder))
155            .field("type", &std::any::type_name::<T>())
156            .finish_non_exhaustive()
157    }
158}
159
160impl<T: 'static, F> Encode for TypedEncoder<T, F>
161where
162    F: Fn(&T) -> Result<EncodedPayload, EncodeError> + Send + Sync,
163{
164    fn encode(&self, message: &dyn Any) -> Result<EncodedPayload, EncodeError> {
165        let typed = message
166            .downcast_ref::<T>()
167            .ok_or(EncodeError::TypeMismatch {
168                expected: std::any::type_name::<T>(),
169            })?;
170        (self.func)(typed)
171    }
172}
173
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use rstest::rstest;

    use super::*;
    use crate::backend::IndexKind;

    /// Message type the sample encoder is built for.
    #[derive(Debug)]
    struct Sample(u8);

    /// Unrelated message type used to provoke a type mismatch.
    #[derive(Debug)]
    struct Other;

    /// Encodes a [`Sample`] as its single byte, with one `ClientOrderId` index key of
    /// the form `CLI-<byte>`.
    fn encode_sample(sample: &Sample) -> Result<EncodedPayload, EncodeError> {
        let payload = Bytes::copy_from_slice(&[sample.0]);
        let index_key = IndexKey::new(IndexKind::ClientOrderId, format!("CLI-{}", sample.0));

        Ok(EncodedPayload::new(payload, vec![index_key]))
    }

    /// Builds the typed encoder shared by the tests below.
    fn sample_encoder()
    -> TypedEncoder<Sample, impl Fn(&Sample) -> Result<EncodedPayload, EncodeError> + Send + Sync>
    {
        TypedEncoder::<Sample, _>::new(encode_sample)
    }

    #[rstest]
    fn typed_encoder_encodes_matching_value() {
        let result = sample_encoder().encode(&Sample(7));
        let encoded = result.expect("encode");

        assert_eq!(encoded.payload.as_ref(), &[7]);
        assert_eq!(encoded.index_keys.len(), 1);

        let index_key = &encoded.index_keys[0];
        assert_eq!(index_key.kind, IndexKind::ClientOrderId);
        assert_eq!(index_key.key, "CLI-7");
    }

    #[rstest]
    fn typed_encoder_rejects_other_type() {
        let result = sample_encoder().encode(&Other);
        let err = result.expect_err("type mismatch");

        // Exhaustive on purpose: a new variant must be classified here explicitly.
        match err {
            EncodeError::Serialize(_) => panic!("expected TypeMismatch, was Serialize"),
            EncodeError::TypeMismatch { expected } => {
                assert!(expected.ends_with("Sample"), "expected was: {expected}");
            }
        }
    }

    #[rstest]
    fn encoded_payload_without_indices_has_empty_indices() {
        let encoded = EncodedPayload::without_indices(Bytes::from_static(b"abc"));

        assert_eq!(encoded.payload.as_ref(), b"abc");
        assert!(encoded.index_keys.is_empty());
    }
}