flowly_kafka/
message.rs

1use bytes::Bytes;
2use chrono::{DateTime, Utc};
3
/// Common read-only view over a message with an optional key, an optional
/// value and an optional timestamp.
pub trait KafkaMessage {
    /// Key type; it must be viewable as a raw byte slice.
    type Key: AsRef<[u8]>;

    /// Type of the value (payload) carried by the message.
    type Value;

    /// Returns the message key by value, or `None` if the message has no key.
    fn key(&self) -> Option<Self::Key>;

    /// Borrows the message value, or returns `None` if there is none.
    fn value(&self) -> Option<&Self::Value>;

    /// Consumes the message and hands back the owned value, if any.
    fn into_value(self) -> Option<Self::Value>;

    /// Returns the message timestamp, if one is set.
    ///
    /// NOTE(review): judging by the name this is milliseconds since the Unix
    /// epoch in UTC; implementors should confirm.
    fn ts_ms_utc(&self) -> Option<i64>;
}
13
14#[derive(Debug, Clone, PartialEq, Default)]
15pub struct Message<M> {
16    pub key: Option<Bytes>,
17    pub ts_ms_utc: Option<i64>,
18    pub payload: Option<M>,
19    pub partition: i32,
20}
21
22impl<M> Message<M> {
23    pub fn timestamp(&self) -> Option<DateTime<Utc>> {
24        self.ts_ms_utc.and_then(DateTime::from_timestamp_millis)
25    }
26}
27
28impl<M> KafkaMessage for Message<M> {
29    type Key = Bytes;
30    type Value = M;
31
32    #[inline]
33    fn key(&self) -> Option<Self::Key> {
34        self.key.clone()
35    }
36
37    #[inline]
38    fn value(&self) -> Option<&Self::Value> {
39        self.payload.as_ref()
40    }
41
42    #[inline]
43    fn ts_ms_utc(&self) -> Option<i64> {
44        self.ts_ms_utc
45    }
46
47    #[inline]
48    fn into_value(self) -> Option<Self::Value> {
49        self.payload
50    }
51}