#[cfg(feature = "avro")]
pub mod avro;
#[cfg(feature = "cloudevents")]
pub mod cloudevents;
pub mod json;
#[cfg(feature = "protobuf")]
pub mod protobuf;
#[cfg(feature = "schemreg")]
pub mod schema_registry;
#[cfg(feature = "avro")]
pub use avro::AvroEncoder;
#[cfg(feature = "cloudevents")]
pub use cloudevents::CloudEventsEncoder;
pub use json::{JsonCodec, JsonEncoder, JsonPrettyEncoder};
#[cfg(feature = "protobuf")]
pub use protobuf::ProtobufEncoder;
#[cfg(feature = "schemreg")]
pub use schema_registry::{
decode_wire_format, encode_wire_format, CachedSchemaRegistry, CompatibilityLevel,
ConfluentAvroCodec, ConfluentAvroDecoder, ConfluentAvroEncoder, ConfluentJsonSchemaCodec,
ConfluentJsonSchemaDecoder, ConfluentJsonSchemaEncoder, ConfluentSchemaRegistry, EncodeTarget,
SchemaId, SchemaRegistryAuth, SchemaRegistryClient, SchemaRegistryConfig, SchemaType,
SubjectNameStrategy, EVENT_JSON_SCHEMA, KEY_AVRO_SCHEMA, KEY_JSON_SCHEMA,
};
use crate::core::{Event, Result};
#[derive(Debug, Clone)]
pub struct EncodedOutput {
pub bytes: Vec<u8>,
pub content_type: &'static str,
}
impl EncodedOutput {
pub fn new(bytes: Vec<u8>, content_type: &'static str) -> Self {
Self {
bytes,
content_type,
}
}
}
pub trait EventEncoder: Send + Sync {
fn encode(&self, event: &Event) -> Result<EncodedOutput>;
fn content_type(&self) -> &'static str;
fn encode_key(&self, event: &Event) -> Option<Vec<u8>> {
let value = event.primary_key_values()?;
serde_json::to_vec(&value).ok()
}
}
#[derive(Debug, Clone)]
pub struct CodecOutput {
pub key: Option<Vec<u8>>,
pub value: Vec<u8>,
pub content_type: &'static str,
}
impl CodecOutput {
pub fn new(key: Option<Vec<u8>>, value: Vec<u8>, content_type: &'static str) -> Self {
Self {
key,
value,
content_type,
}
}
}
pub trait Codec: Send + Sync {
fn encode(&self, event: &Event) -> Result<CodecOutput>;
fn content_type(&self) -> &'static str;
fn boxed(self) -> BoxedCodec
where
Self: Sized + 'static,
{
BoxedCodec::new(self)
}
}
#[derive(Debug, Clone, Default)]
pub struct EncoderCodec<E> {
encoder: E,
}
impl<E: EventEncoder> EncoderCodec<E> {
pub fn new(encoder: E) -> Self {
Self { encoder }
}
pub fn encoder(&self) -> &E {
&self.encoder
}
pub fn into_encoder(self) -> E {
self.encoder
}
}
impl<E: EventEncoder> Codec for EncoderCodec<E> {
fn encode(&self, event: &Event) -> Result<CodecOutput> {
let key = self.encoder.encode_key(event);
let value_output = self.encoder.encode(event)?;
Ok(CodecOutput::new(
key,
value_output.bytes,
value_output.content_type,
))
}
fn content_type(&self) -> &'static str {
self.encoder.content_type()
}
}
pub struct BoxedCodec(Box<dyn Codec>);
impl BoxedCodec {
pub fn new<C: Codec + 'static>(codec: C) -> Self {
Self(Box::new(codec))
}
}
impl Codec for BoxedCodec {
fn encode(&self, event: &Event) -> Result<CodecOutput> {
self.0.encode(event)
}
fn content_type(&self) -> &'static str {
self.0.content_type()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::codec::json::JsonEncoder;
use crate::core::{Event, Operation, SourceMetadata, EVENT_ENVELOPE_VERSION};
fn sample_event() -> Event {
Event {
before: None,
after: Some(serde_json::json!({"id": 1})),
op: Operation::Insert,
source: SourceMetadata {
source_name: "test".into(),
offset: "0".into(),
timestamp: 1,
},
ts: 1,
schema: None,
table: "t".into(),
primary_key: None,
snapshot: None,
transaction: None,
envelope_version: EVENT_ENVELOPE_VERSION,
before_is_key_only: false,
}
}
#[test]
fn json_encoder_content_type_matches_output() {
let enc = JsonEncoder;
let out = enc.encode(&sample_event()).unwrap();
assert_eq!(out.content_type, enc.content_type());
}
#[test]
fn encoded_output_fields_accessible() {
let out = EncodedOutput::new(b"hello".to_vec(), "text/plain");
assert_eq!(out.content_type, "text/plain");
assert_eq!(out.bytes, b"hello");
}
#[test]
fn encoder_codec_no_primary_key_gives_no_key() {
let codec = EncoderCodec::new(JsonEncoder);
let event = sample_event(); let out = codec.encode(&event).unwrap();
assert!(out.key.is_none());
assert!(!out.value.is_empty());
assert_eq!(out.content_type, "application/json");
}
#[test]
fn encoder_codec_with_primary_key_encodes_key() {
let codec = EncoderCodec::new(JsonEncoder);
let mut event = sample_event();
event.primary_key = Some(vec!["id".into()]);
event.after = Some(serde_json::json!({"id": 7, "name": "bob"}));
let out = codec.encode(&event).unwrap();
let key = out.key.expect("key should be present");
let parsed: serde_json::Value = serde_json::from_slice(&key).unwrap();
assert_eq!(parsed["id"], 7);
}
#[test]
fn encoder_codec_content_type_matches_encoder() {
let codec = EncoderCodec::new(JsonEncoder);
let event = sample_event();
let out = codec.encode(&event).unwrap();
assert_eq!(out.content_type, codec.content_type());
}
#[test]
fn codec_output_constructor() {
let o = CodecOutput::new(Some(b"k".to_vec()), b"v".to_vec(), "text/plain");
assert_eq!(o.key.unwrap(), b"k");
assert_eq!(o.value, b"v");
assert_eq!(o.content_type, "text/plain");
}
#[test]
fn json_codec_default_works() {
use crate::codec::json::JsonCodec;
let codec = JsonCodec::default();
let mut event = sample_event();
event.primary_key = Some(vec!["id".into()]);
event.after = Some(serde_json::json!({"id": 1}));
let out = codec.encode(&event).unwrap();
assert!(out.key.is_some());
assert_eq!(out.content_type, "application/json");
}
#[test]
fn boxed_codec_erases_type_and_encodes() {
use crate::codec::json::JsonCodec;
let codec: BoxedCodec = JsonCodec::default().boxed();
let mut event = sample_event();
event.primary_key = Some(vec!["id".into()]);
event.after = Some(serde_json::json!({"id": 42}));
let out = codec.encode(&event).unwrap();
assert_eq!(out.content_type, "application/json");
assert!(out.key.is_some());
}
#[test]
fn boxed_codec_new_works() {
use crate::codec::json::JsonCodec;
let codec = BoxedCodec::new(JsonCodec::default());
let event = sample_event();
let out = codec.encode(&event).unwrap();
assert!(!out.value.is_empty());
}
}