1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
extern crate rdkafka_sys as rdkafka;
use self::rdkafka::types::*;
use std::slice;
use std::str;
#[derive(Debug)]
pub struct Message {
ptr: *mut RDKafkaMessage,
}
unsafe impl Send for Message {}
impl<'a> Message {
pub fn new(ptr: *mut RDKafkaMessage) -> Message {
Message { ptr: ptr }
}
pub fn ptr(&self) -> *mut RDKafkaMessage {
self.ptr
}
pub fn key_len(&self) -> usize {
unsafe { (*self.ptr).key_len }
}
pub fn payload_len(&self) -> usize {
unsafe { (*self.ptr).len }
}
pub fn key(&'a self) -> Option<&'a [u8]> {
unsafe {
if (*self.ptr).key.is_null() {
None
} else {
Some(slice::from_raw_parts::<u8>((*self.ptr).key as *const u8, (*self.ptr).key_len))
}
}
}
pub fn payload(&'a self) -> Option<&'a [u8]> {
unsafe {
if (*self.ptr).payload.is_null() {
None
} else {
Some(slice::from_raw_parts::<u8>((*self.ptr).payload as *const u8, (*self.ptr).len))
}
}
}
pub fn payload_view<P: ?Sized + FromBytes>(&'a self) -> Option<Result<&'a P, P::Error>> {
self.payload().map(P::from_bytes)
}
pub fn key_view<K: ?Sized + FromBytes>(&'a self) -> Option<Result<&'a K, K::Error>> {
self.key().map(K::from_bytes)
}
pub fn partition(&self) -> i32 {
unsafe { (*self.ptr).partition }
}
pub fn offset(&self) -> i64 {
unsafe { (*self.ptr).offset }
}
}
impl Drop for Message {
fn drop(&mut self) {
trace!("Destroying message {:?}", self);
unsafe { rdkafka::rd_kafka_message_destroy(self.ptr) };
}
}
pub trait FromBytes {
type Error;
fn from_bytes(&[u8]) -> Result<&Self, Self::Error>;
}
impl FromBytes for [u8] {
type Error = ();
fn from_bytes(bytes: &[u8]) -> Result<&Self, Self::Error> {
Ok(bytes)
}
}
impl FromBytes for str {
type Error = str::Utf8Error;
fn from_bytes(bytes: &[u8]) -> Result<&Self, Self::Error> {
str::from_utf8(bytes)
}
}
pub trait ToBytes {
fn to_bytes(&self) -> &[u8];
}
impl<'a> ToBytes for &'a [u8] {
fn to_bytes(&self) -> &[u8] {
self
}
}
impl ToBytes for Vec<u8> {
fn to_bytes(&self) -> &[u8] {
self.as_slice()
}
}
impl<'a> ToBytes for &'a str {
fn to_bytes(&self) -> &[u8] {
self.as_bytes()
}
}
impl ToBytes for String {
fn to_bytes(&self) -> &[u8] {
self.as_bytes()
}
}
impl ToBytes for () {
fn to_bytes(&self) -> &[u8] {
&[]
}
}