nautilus_event_store/capture/encoder.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Encoders that convert captured bus messages into canonical payload bytes.
17//!
18//! Each registered encoder is responsible for two outputs:
19//!
20//! - The canonical payload bytes, written verbatim into the entry's `payload` field. The
21//! reader pairs the bytes with [`crate::PayloadType`] to dispatch the matching decoder.
22//! - The sidecar [`IndexKey`]s the writer commits in the same backend transaction so
23//! forensics scans by `client_order_id` or `venue_order_id` resolve to a committed
24//! `seq` rather than missing entries the reader can observe before the indices catch
25//! up.
26//!
//! The trait is type-erased so the registry can look up encoders by [`std::any::TypeId`];
//! concrete encoders are typed via the [`TypedEncoder`] adapter and avoid downcasting at
//! the call site.
30
31use std::{any::Any, marker::PhantomData};
32
33use bytes::Bytes;
34
35use crate::{backend::IndexKey, entry::PayloadType};
36
37/// Errors returned by an [`Encode`] implementation.
38#[derive(Debug, thiserror::Error)]
39pub enum EncodeError {
40 /// The encoder received a value of an unexpected type.
41 ///
42 /// Surfaces only when the registry is wired incorrectly. The adapter rejects callers
43 /// before invoking the encoder, so this variant exists to keep [`TypedEncoder`]'s
44 /// downcast safe under future refactors.
45 #[error("encoder type mismatch: expected {expected}")]
46 TypeMismatch {
47 /// The Rust type name the encoder was registered for.
48 expected: &'static str,
49 },
50 /// The encoder failed to serialize the message.
51 #[error("encode failure: {0}")]
52 Serialize(String),
53}
54
55/// The canonical payload plus sidecar indices an encoder produces for one captured message.
56///
57/// The encoder does not stamp `seq`, `ts_publish`, or `entry_hash`: those are writer-side
58/// fields. It also does not stamp `headers`; the bus capture adapter carries headers from
59/// the dispatch boundary so encoders stay focused on payload identity.
60///
61/// `payload_type` is `None` for the typical bare-type encoder (e.g. `SubmitOrder`): the
62/// registry's registered tag is used. Envelope encoders that dispatch on a wrapper enum
63/// (e.g. `TradingCommand`, `OrderEventAny`) set it to the inner-variant's canonical tag
64/// so forensics scans see entries identical to the bare-type capture path.
65#[derive(Clone, Debug, PartialEq, Eq)]
66pub struct EncodedPayload {
67 /// The canonical encoded bytes the writer commits as the entry payload.
68 pub payload: Bytes,
69 /// Sidecar index keys produced for this entry. May be empty.
70 pub index_keys: Vec<IndexKey>,
71 /// Optional override for the registry's registered payload type tag. Set by envelope
72 /// encoders to stamp the inner-variant tag on captured entries.
73 pub payload_type: Option<PayloadType>,
74}
75
76impl EncodedPayload {
77 /// Creates a new [`EncodedPayload`] that inherits the registry's registered tag.
78 #[must_use]
79 pub const fn new(payload: Bytes, index_keys: Vec<IndexKey>) -> Self {
80 Self {
81 payload,
82 index_keys,
83 payload_type: None,
84 }
85 }
86
87 /// Creates a new [`EncodedPayload`] with no sidecar indices.
88 #[must_use]
89 pub const fn without_indices(payload: Bytes) -> Self {
90 Self {
91 payload,
92 index_keys: Vec::new(),
93 payload_type: None,
94 }
95 }
96
97 /// Creates a new [`EncodedPayload`] that stamps `payload_type` on the captured entry,
98 /// overriding the registry's registered tag.
99 #[must_use]
100 pub const fn with_payload_type(
101 payload_type: PayloadType,
102 payload: Bytes,
103 index_keys: Vec<IndexKey>,
104 ) -> Self {
105 Self {
106 payload,
107 index_keys,
108 payload_type: Some(payload_type),
109 }
110 }
111}
112
113/// A type-erased encoder used by the registry.
114///
115/// Implementors take the captured message as `&dyn Any` so the registry can dispatch by
116/// [`std::any::TypeId`] without naming the concrete type at the call site. Most callers
117/// build encoders with [`TypedEncoder`] rather than implementing this trait directly.
118pub trait Encode: Send + Sync {
119 /// Encodes the supplied message into canonical payload bytes plus sidecar indices.
120 ///
121 /// # Errors
122 ///
123 /// Returns [`EncodeError::TypeMismatch`] when `message` does not match the encoder's
124 /// expected type, and [`EncodeError::Serialize`] for any encoder-internal failure.
125 fn encode(&self, message: &dyn Any) -> Result<EncodedPayload, EncodeError>;
126}
127
/// Adapter that exposes a typed `Fn(&T) -> Result<EncodedPayload, EncodeError>` closure
/// through the type-erased [`Encode`] trait.
///
/// Constructed by [`crate::capture::EncoderRegistry::register`]; callers rarely instantiate
/// it directly. Its [`Encode`] implementation performs the downcast from `&dyn Any`, which
/// is the only `Any`-handling site in the capture path.
pub struct TypedEncoder<T: 'static, F> {
    /// The wrapped encoding closure.
    func: F,
    /// Records the message type `T` without storing a value of it; the `fn(&T)` marker
    /// type is `Send + Sync` regardless of `T`.
    _phantom: PhantomData<fn(&T)>,
}
137
138impl<T: 'static, F> TypedEncoder<T, F>
139where
140 F: Fn(&T) -> Result<EncodedPayload, EncodeError> + Send + Sync,
141{
142 /// Wraps `func` as a typed encoder for `T`.
143 #[must_use]
144 pub const fn new(func: F) -> Self {
145 Self {
146 func,
147 _phantom: PhantomData,
148 }
149 }
150}
151
152impl<T: 'static, F> std::fmt::Debug for TypedEncoder<T, F> {
153 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154 f.debug_struct(stringify!(TypedEncoder))
155 .field("type", &std::any::type_name::<T>())
156 .finish_non_exhaustive()
157 }
158}
159
160impl<T: 'static, F> Encode for TypedEncoder<T, F>
161where
162 F: Fn(&T) -> Result<EncodedPayload, EncodeError> + Send + Sync,
163{
164 fn encode(&self, message: &dyn Any) -> Result<EncodedPayload, EncodeError> {
165 let typed = message
166 .downcast_ref::<T>()
167 .ok_or(EncodeError::TypeMismatch {
168 expected: std::any::type_name::<T>(),
169 })?;
170 (self.func)(typed)
171 }
172}
173
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use rstest::rstest;

    use super::*;
    use crate::backend::IndexKind;

    /// Message type the sample encoder is registered for.
    #[derive(Debug)]
    struct Sample(u8);

    /// Unrelated type used to provoke a type mismatch.
    #[derive(Debug)]
    struct Other;

    /// Builds an encoder that emits the sample's byte as the payload together with one
    /// `ClientOrderId` index key of the form `CLI-<byte>`.
    fn sample_encoder()
    -> TypedEncoder<Sample, impl Fn(&Sample) -> Result<EncodedPayload, EncodeError> + Send + Sync>
    {
        TypedEncoder::<Sample, _>::new(|s: &Sample| {
            Ok(EncodedPayload::new(
                Bytes::copy_from_slice(&[s.0]),
                vec![IndexKey::new(
                    IndexKind::ClientOrderId,
                    format!("CLI-{}", s.0),
                )],
            ))
        })
    }

    #[rstest]
    fn typed_encoder_encodes_matching_value() {
        let encoder = sample_encoder();
        let encoded = encoder.encode(&Sample(7)).expect("encode");

        assert_eq!(encoded.payload.as_ref(), &[7]);
        assert_eq!(encoded.index_keys.len(), 1);
        assert_eq!(encoded.index_keys[0].kind, IndexKind::ClientOrderId);
        assert_eq!(encoded.index_keys[0].key, "CLI-7");
        // `EncodedPayload::new` must not set a payload type override.
        assert!(encoded.payload_type.is_none());
    }

    #[rstest]
    fn typed_encoder_rejects_other_type() {
        let encoder = sample_encoder();
        let err = encoder.encode(&Other).expect_err("type mismatch");

        match err {
            EncodeError::TypeMismatch { expected } => {
                assert!(expected.ends_with("Sample"), "expected was: {expected}");
            }
            EncodeError::Serialize(_) => panic!("expected TypeMismatch, was Serialize"),
        }
    }

    #[rstest]
    fn typed_encoder_propagates_closure_error() {
        // An error returned by the wrapped closure must reach the caller unchanged.
        let encoder = TypedEncoder::<Sample, _>::new(|_: &Sample| {
            Err(EncodeError::Serialize(String::from("boom")))
        });
        let err = encoder.encode(&Sample(1)).expect_err("serialize failure");

        assert!(matches!(&err, EncodeError::Serialize(msg) if msg == "boom"));
        assert_eq!(err.to_string(), "encode failure: boom");
    }

    #[rstest]
    fn encoded_payload_without_indices_has_empty_indices() {
        let payload = EncodedPayload::without_indices(Bytes::from_static(b"abc"));

        assert_eq!(payload.payload.as_ref(), b"abc");
        assert!(payload.index_keys.is_empty());
        // No override: the registry's registered tag applies.
        assert!(payload.payload_type.is_none());
    }
}