1use bytes::Bytes;
4use serde::{Deserialize, Serialize};
5
6#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
8#[serde(rename_all = "UPPERCASE")]
9pub enum SchemaType {
10 #[default]
12 #[serde(alias = "avro", alias = "AVRO")]
13 Avro,
14
15 #[serde(alias = "json", alias = "JSON")]
17 Json,
18
19 #[serde(alias = "protobuf", alias = "PROTOBUF")]
21 Protobuf,
22}
23
24impl SchemaType {
25 pub fn as_str(&self) -> &'static str {
26 match self {
27 SchemaType::Avro => "AVRO",
28 SchemaType::Json => "JSON",
29 SchemaType::Protobuf => "PROTOBUF",
30 }
31 }
32}
33
34impl std::fmt::Display for SchemaType {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 write!(f, "{}", self.as_str())
37 }
38}
39
40impl std::str::FromStr for SchemaType {
41 type Err = String;
42
43 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
44 match s.to_uppercase().as_str() {
45 "AVRO" => Ok(SchemaType::Avro),
46 "JSON" | "JSONSCHEMA" | "JSON_SCHEMA" => Ok(SchemaType::Json),
47 "PROTOBUF" | "PROTO" => Ok(SchemaType::Protobuf),
48 _ => Err(format!("Unknown schema type: {}", s)),
49 }
50 }
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
55pub struct MessageData {
56 pub offset: u64,
58 #[serde(default)]
60 pub partition: u32,
61 #[serde(with = "crate::serde_utils::option_bytes_serde")]
63 pub key: Option<Bytes>,
64 #[serde(with = "crate::serde_utils::bytes_serde")]
66 pub value: Bytes,
67 pub timestamp: i64,
69 #[serde(default)]
71 pub headers: Vec<(String, Vec<u8>)>,
72}
73
74impl MessageData {
75 pub fn new(offset: u64, value: impl Into<Bytes>, timestamp: i64) -> Self {
77 Self {
78 offset,
79 partition: 0,
80 key: None,
81 value: value.into(),
82 timestamp,
83 headers: Vec::new(),
84 }
85 }
86
87 pub fn with_key(mut self, key: impl Into<Bytes>) -> Self {
89 self.key = Some(key.into());
90 self
91 }
92
93 pub fn with_partition(mut self, partition: u32) -> Self {
95 self.partition = partition;
96 self
97 }
98
99 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<Vec<u8>>) -> Self {
101 self.headers.push((key.into(), value.into()));
102 self
103 }
104
105 pub fn value_bytes(&self) -> &[u8] {
107 &self.value
108 }
109
110 pub fn key_bytes(&self) -> Option<&[u8]> {
112 self.key.as_ref().map(|k| k.as_ref())
113 }
114
115 pub fn size(&self) -> usize {
117 let key_size = self.key.as_ref().map(|k| k.len()).unwrap_or(0);
118 let header_size: usize = self.headers.iter().map(|(k, v)| k.len() + v.len()).sum();
119 key_size + self.value.len() + header_size
120 }
121}
122
123#[cfg(test)]
124mod tests {
125 use super::*;
126
127 #[test]
128 fn test_message_data() {
129 let msg = MessageData::new(42, b"hello".to_vec(), 1234567890).with_key(b"key1".to_vec());
130
131 assert_eq!(msg.offset, 42);
132 assert_eq!(msg.value_bytes(), b"hello");
133 assert_eq!(msg.key_bytes(), Some(b"key1".as_slice()));
134 assert_eq!(msg.size(), 4 + 5); }
136
137 #[test]
138 fn test_message_data_no_key() {
139 let msg = MessageData::new(0, b"data".to_vec(), 0);
140
141 assert!(msg.key.is_none());
142 assert_eq!(msg.key_bytes(), None);
143 assert_eq!(msg.size(), 4);
144 }
145}