1use bytes::Bytes;
4use serde::{Deserialize, Serialize};
5
/// Schema format tag with three variants: Avro, JSON, and Protobuf.
///
/// Because of `rename_all = "UPPERCASE"`, the variants serialize as `"AVRO"`,
/// `"JSON"`, and `"PROTOBUF"`. The per-variant aliases additionally accept the
/// all-lowercase spellings when deserializing. [`Default`] yields
/// [`SchemaType::Avro`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(rename_all = "UPPERCASE")]
pub enum SchemaType {
    /// Avro schema; this is the default variant.
    #[default]
    #[serde(alias = "avro", alias = "AVRO")]
    Avro,

    /// JSON schema.
    #[serde(alias = "json", alias = "JSON")]
    Json,

    /// Protobuf schema.
    #[serde(alias = "protobuf", alias = "PROTOBUF")]
    Protobuf,
}
23
24impl SchemaType {
25 pub fn as_str(&self) -> &'static str {
26 match self {
27 SchemaType::Avro => "AVRO",
28 SchemaType::Json => "JSON",
29 SchemaType::Protobuf => "PROTOBUF",
30 }
31 }
32}
33
34impl std::fmt::Display for SchemaType {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 write!(f, "{}", self.as_str())
37 }
38}
39
40impl std::str::FromStr for SchemaType {
41 type Err = String;
42
43 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
44 match s.to_uppercase().as_str() {
45 "AVRO" => Ok(SchemaType::Avro),
46 "JSON" | "JSONSCHEMA" | "JSON_SCHEMA" => Ok(SchemaType::Json),
47 "PROTOBUF" | "PROTO" => Ok(SchemaType::Protobuf),
48 _ => Err(format!("Unknown schema type: {}", s)),
49 }
50 }
51}
52
/// A single message: an offset, an optional key, a value payload, and a
/// timestamp.
///
/// The `key` and `value` fields are (de)serialized through the helper modules
/// `crate::serde_utils::option_bytes_serde` and `crate::serde_utils::bytes_serde`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MessageData {
    /// Offset of the message.
    // NOTE(review): presumably the position within a log/partition — confirm.
    pub offset: u64,
    /// Optional key bytes; `None` when the message carries no key.
    #[serde(with = "crate::serde_utils::option_bytes_serde")]
    pub key: Option<Bytes>,
    /// Value (payload) bytes.
    #[serde(with = "crate::serde_utils::bytes_serde")]
    pub value: Bytes,
    /// Timestamp of the message.
    // NOTE(review): unit and epoch are not visible here — confirm with callers.
    pub timestamp: i64,
}
67
68impl MessageData {
69 pub fn new(offset: u64, value: impl Into<Bytes>, timestamp: i64) -> Self {
71 Self {
72 offset,
73 key: None,
74 value: value.into(),
75 timestamp,
76 }
77 }
78
79 pub fn with_key(mut self, key: impl Into<Bytes>) -> Self {
81 self.key = Some(key.into());
82 self
83 }
84
85 pub fn value_bytes(&self) -> &[u8] {
87 &self.value
88 }
89
90 pub fn key_bytes(&self) -> Option<&[u8]> {
92 self.key.as_ref().map(|k| k.as_ref())
93 }
94
95 pub fn size(&self) -> usize {
97 let key_size = self.key.as_ref().map(|k| k.len()).unwrap_or(0);
98 key_size + self.value.len()
99 }
100}
101
#[cfg(test)]
mod tests {
    use super::*;

    /// A keyed message exposes its offset, value, and key, and `size` counts
    /// both key and value bytes.
    #[test]
    fn test_message_data() {
        let unkeyed = MessageData::new(42, b"hello".to_vec(), 1234567890);
        let msg = unkeyed.with_key(b"key1".to_vec());

        assert_eq!(msg.offset, 42);
        assert_eq!(msg.value_bytes(), b"hello");
        assert_eq!(msg.key_bytes(), Some(b"key1".as_slice()));
        // 4 key bytes + 5 value bytes
        assert_eq!(msg.size(), 4 + 5);
    }

    /// A message built without a key reports no key, and `size` counts the
    /// value only.
    #[test]
    fn test_message_data_no_key() {
        let msg = MessageData::new(0, b"data".to_vec(), 0);

        assert!(msg.key.is_none());
        assert_eq!(msg.key_bytes(), None);
        assert_eq!(msg.size(), 4);
    }
}