1use bytes::{Bytes, BytesMut};
43use std::alloc::{alloc, dealloc, Layout};
44use std::ops::Deref;
45use std::ptr::NonNull;
46use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
47use std::sync::Arc;
48
/// Alignment (and rounding granularity) for buffer allocations; 64 bytes is
/// the typical CPU cache-line size, which avoids false sharing between buffers.
const CACHE_LINE_SIZE: usize = 64;

/// Default capacity, in bytes, of pooled buffers (256 KiB).
const DEFAULT_BUFFER_SIZE: usize = 256 * 1024;

/// A cache-line-aligned, append-only byte buffer with an atomic write cursor,
/// allowing lock-free region reservation from multiple threads.
#[derive(Debug)]
pub struct ZeroCopyBuffer {
    /// Start of the heap allocation made in `with_id`.
    data: NonNull<u8>,
    /// Usable size in bytes (rounded up to a multiple of `CACHE_LINE_SIZE`).
    capacity: usize,
    /// High-water mark: bytes claimed so far via `reserve`/`try_allocate`.
    write_pos: AtomicUsize,
    /// Pool-assigned identifier (0 when created outside a pool).
    id: u64,
    /// Allocation layout; kept so `Drop` can call `dealloc` with it.
    layout: Layout,
}
72
// SAFETY: the allocation is uniquely owned by this struct, the write cursor is
// an atomic, and mutable access to raw memory is gated behind the `unsafe`
// contract of `get_mut_slice` — so moving/sharing across threads is sound as
// long as that contract is upheld by callers.
unsafe impl Send for ZeroCopyBuffer {}
unsafe impl Sync for ZeroCopyBuffer {}
76
77impl ZeroCopyBuffer {
78 pub fn new(capacity: usize) -> Self {
80 Self::with_id(capacity, 0)
81 }
82
83 pub fn with_id(capacity: usize, id: u64) -> Self {
85 let aligned_capacity = (capacity + CACHE_LINE_SIZE - 1) & !(CACHE_LINE_SIZE - 1);
87 let layout =
88 Layout::from_size_align(aligned_capacity, CACHE_LINE_SIZE).expect("Invalid layout");
89
90 let data = unsafe {
92 let ptr = alloc(layout);
93 if ptr.is_null() {
94 std::alloc::handle_alloc_error(layout);
95 }
96 NonNull::new_unchecked(ptr)
97 };
98
99 Self {
100 data,
101 capacity: aligned_capacity,
102 write_pos: AtomicUsize::new(0),
103 id,
104 layout,
105 }
106 }
107
108 pub fn reserve(self: &Arc<Self>, len: usize) -> Option<BufferSlice> {
111 loop {
112 let current = self.write_pos.load(Ordering::Acquire);
113 let new_pos = current + len;
114
115 if new_pos > self.capacity {
116 return None;
117 }
118
119 if self
120 .write_pos
121 .compare_exchange_weak(current, new_pos, Ordering::AcqRel, Ordering::Relaxed)
122 .is_ok()
123 {
124 return Some(BufferSlice::new(Arc::clone(self), current, len));
125 }
126 std::hint::spin_loop();
128 }
129 }
130
131 #[allow(clippy::mut_from_ref)]
137 pub unsafe fn get_mut_slice(&self, offset: usize, len: usize) -> &mut [u8] {
138 assert!(
139 offset + len <= self.capacity,
140 "get_mut_slice out of bounds: offset={} len={} capacity={}",
141 offset,
142 len,
143 self.capacity
144 );
145 std::slice::from_raw_parts_mut(self.data.as_ptr().add(offset), len)
146 }
147
148 pub fn get_slice(&self, offset: usize, len: usize) -> &[u8] {
150 let write_pos = self.write_pos.load(Ordering::Acquire);
151 assert!(
152 offset + len <= write_pos,
153 "get_slice out of bounds: offset={} len={} write_pos={}",
154 offset,
155 len,
156 write_pos
157 );
158 unsafe { std::slice::from_raw_parts(self.data.as_ptr().add(offset), len) }
159 }
160
161 pub fn len(&self) -> usize {
163 self.write_pos.load(Ordering::Acquire)
164 }
165
166 pub fn is_empty(&self) -> bool {
168 self.len() == 0
169 }
170
171 pub fn remaining(&self) -> usize {
173 self.capacity - self.len()
174 }
175
176 pub fn capacity(&self) -> usize {
178 self.capacity
179 }
180
181 pub fn id(&self) -> u64 {
183 self.id
184 }
185
186 pub fn reset(&self) -> bool {
191 self.write_pos.store(0, Ordering::Release);
192 true
193 }
194
195 #[deprecated(note = "Use Arc::clone instead")]
197 pub fn add_ref(&self) {}
198
199 pub fn try_allocate(&self, len: usize) -> Option<usize> {
203 loop {
204 let current = self.write_pos.load(Ordering::Acquire);
205 let new_pos = current + len;
206
207 if new_pos > self.capacity {
208 return None;
209 }
210
211 if self
212 .write_pos
213 .compare_exchange_weak(current, new_pos, Ordering::AcqRel, Ordering::Relaxed)
214 .is_ok()
215 {
216 return Some(current);
217 }
218 std::hint::spin_loop();
219 }
220 }
221
222 #[deprecated(note = "Use Arc::strong_count instead")]
224 pub fn release(&self) -> bool {
225 false
226 }
227
228 #[deprecated(note = "Use Arc::strong_count instead")]
230 pub fn ref_count(&self) -> u32 {
231 0
232 }
233
234 pub fn freeze(&self) -> Bytes {
236 let len = self.len();
237 if len == 0 {
238 return Bytes::new();
239 }
240 Bytes::copy_from_slice(self.get_slice(0, len))
243 }
244}
245
impl Drop for ZeroCopyBuffer {
    fn drop(&mut self) {
        // SAFETY: `data` was allocated in `with_id` with exactly `self.layout`
        // and is freed only here, exactly once.
        unsafe {
            dealloc(self.data.as_ptr(), self.layout);
        }
    }
}
254
/// A handle to a reserved region `[offset, offset + len)` of a shared
/// `ZeroCopyBuffer`; holding it keeps the buffer alive. Cloning is cheap
/// (Arc bump), and clones alias the same region.
#[derive(Debug, Clone)]
pub struct BufferSlice {
    buffer: Arc<ZeroCopyBuffer>,
    offset: usize,
    len: usize,
}
263
264impl BufferSlice {
265 pub fn new(buffer: Arc<ZeroCopyBuffer>, offset: usize, len: usize) -> Self {
267 Self {
268 buffer,
269 offset,
270 len,
271 }
272 }
273
274 pub fn as_bytes(&self) -> &[u8] {
276 self.buffer.get_slice(self.offset, self.len)
277 }
278
279 pub unsafe fn as_mut_bytes(&mut self) -> &mut [u8] {
283 self.buffer.get_mut_slice(self.offset, self.len)
284 }
285
286 pub fn write(&mut self, data: &[u8]) -> usize {
288 let write_len = data.len().min(self.len);
289 unsafe {
290 let dest = self.as_mut_bytes();
291 dest[..write_len].copy_from_slice(&data[..write_len]);
292 }
293 write_len
294 }
295
296 pub fn len(&self) -> usize {
298 self.len
299 }
300
301 pub fn is_empty(&self) -> bool {
303 self.len == 0
304 }
305
306 pub fn offset(&self) -> usize {
308 self.offset
309 }
310
311 pub fn to_bytes(&self) -> Bytes {
313 Bytes::copy_from_slice(self.as_bytes())
314 }
315}
316
317impl Deref for BufferSlice {
318 type Target = [u8];
319
320 fn deref(&self) -> &Self::Target {
321 self.as_bytes()
322 }
323}
324
impl AsRef<[u8]> for BufferSlice {
    /// Borrows the slice's bytes.
    fn as_ref(&self) -> &[u8] {
        self.as_bytes()
    }
}
330
/// A recycling pool of `ZeroCopyBuffer`s backed by a bounded MPMC channel.
pub struct ZeroCopyBufferPool {
    /// Send side of the free list (used by `release`).
    free_buffers: crossbeam_channel::Sender<Arc<ZeroCopyBuffer>>,
    /// Receive side of the free list (used by `acquire`).
    buffer_receiver: crossbeam_channel::Receiver<Arc<ZeroCopyBuffer>>,
    /// Capacity requested for each buffer created by this pool.
    buffer_size: usize,
    /// Monotonic id handed to newly created buffers.
    next_id: AtomicU64,
    /// Total buffers ever allocated by this pool (stats only).
    total_created: AtomicU64,
    /// Buffers currently handed out (stats only; relies on balanced
    /// acquire/release calls).
    in_use: AtomicU64,
}
346
347impl ZeroCopyBufferPool {
348 pub fn new(buffer_size: usize, initial_count: usize) -> Self {
350 let (tx, rx) = crossbeam_channel::bounded(initial_count * 2);
351
352 let pool = Self {
353 free_buffers: tx,
354 buffer_receiver: rx,
355 buffer_size,
356 next_id: AtomicU64::new(0),
357 total_created: AtomicU64::new(0),
358 in_use: AtomicU64::new(0),
359 };
360
361 for _ in 0..initial_count {
363 let id = pool.next_id.fetch_add(1, Ordering::Relaxed);
364 let buffer = Arc::new(ZeroCopyBuffer::with_id(buffer_size, id));
365 pool.total_created.fetch_add(1, Ordering::Relaxed);
366 let _ = pool.free_buffers.try_send(buffer);
367 }
368
369 pool
370 }
371
372 pub fn acquire(&self) -> Arc<ZeroCopyBuffer> {
374 match self.buffer_receiver.try_recv() {
375 Ok(buffer) => {
376 if Arc::strong_count(&buffer) == 1 {
378 buffer.reset();
379 }
380 self.in_use.fetch_add(1, Ordering::Relaxed);
381 buffer
382 }
383 Err(_) => {
384 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
386 let buffer = Arc::new(ZeroCopyBuffer::with_id(self.buffer_size, id));
387 self.total_created.fetch_add(1, Ordering::Relaxed);
388 self.in_use.fetch_add(1, Ordering::Relaxed);
389 buffer
390 }
391 }
392 }
393
394 pub fn release(&self, buffer: Arc<ZeroCopyBuffer>) {
396 self.in_use.fetch_sub(1, Ordering::Relaxed);
397
398 if Arc::strong_count(&buffer) == 1 {
400 buffer.reset();
401 let _ = self.free_buffers.try_send(buffer);
402 }
403 }
404
405 pub fn stats(&self) -> BufferPoolStats {
407 BufferPoolStats {
408 buffer_size: self.buffer_size,
409 total_created: self.total_created.load(Ordering::Relaxed),
410 in_use: self.in_use.load(Ordering::Relaxed),
411 available: self.buffer_receiver.len() as u64,
412 }
413 }
414}
415
/// Point-in-time counters reported by `ZeroCopyBufferPool::stats`.
#[derive(Debug, Clone)]
pub struct BufferPoolStats {
    /// Capacity requested for each pooled buffer, in bytes.
    pub buffer_size: usize,
    /// Buffers ever allocated by the pool.
    pub total_created: u64,
    /// Buffers currently handed out (acquires minus releases).
    pub in_use: u64,
    /// Buffers sitting in the free list.
    pub available: u64,
}
423
/// A produced message whose payloads reference shared storage where possible.
#[derive(Debug)]
pub struct ZeroCopyMessage {
    /// Interned topic name (shared across messages to the same topic).
    pub topic: Arc<str>,
    /// Target partition index.
    pub partition: u32,
    /// Optional message key.
    pub key: Option<BufferRef>,
    /// Message payload.
    pub value: BufferRef,
    /// Header name/value pairs (empty as created by the producer helpers).
    pub headers: Vec<(Arc<str>, BufferRef)>,
    /// Creation time in milliseconds since the Unix epoch.
    pub timestamp: i64,
}
441
/// Storage-flexible byte payload: tiny payloads are copied inline, larger
/// ones are either owned `Bytes` or a view into a shared `ZeroCopyBuffer`.
#[derive(Debug, Clone)]
pub enum BufferRef {
    /// Payload stored inline (at most 64 bytes, `SmallVec`'s capacity).
    Inline(SmallVec),
    /// Heap-allocated, reference-counted payload.
    External(Bytes),
    /// Zero-copy view of `[offset, offset + len)` in a shared buffer;
    /// keeps the whole buffer alive.
    Slice {
        buffer: Arc<ZeroCopyBuffer>,
        offset: usize,
        len: usize,
    },
}
456
457impl BufferRef {
458 pub fn from_bytes(data: &[u8]) -> Self {
460 if data.len() <= 64 {
461 BufferRef::Inline(SmallVec::from_slice(data))
462 } else {
463 BufferRef::External(Bytes::copy_from_slice(data))
464 }
465 }
466
467 pub fn from_external(data: Bytes) -> Self {
469 if data.len() <= 64 {
470 BufferRef::Inline(SmallVec::from_slice(&data))
471 } else {
472 BufferRef::External(data)
473 }
474 }
475
476 pub fn from_slice(buffer: Arc<ZeroCopyBuffer>, offset: usize, len: usize) -> Self {
478 if len <= 64 {
479 let data = buffer.get_slice(offset, len);
480 BufferRef::Inline(SmallVec::from_slice(data))
481 } else {
482 BufferRef::Slice {
483 buffer,
484 offset,
485 len,
486 }
487 }
488 }
489
490 pub fn as_bytes(&self) -> &[u8] {
492 match self {
493 BufferRef::Inline(sv) => sv.as_slice(),
494 BufferRef::External(b) => b,
495 BufferRef::Slice {
496 buffer,
497 offset,
498 len,
499 } => buffer.get_slice(*offset, *len),
500 }
501 }
502
503 pub fn len(&self) -> usize {
505 match self {
506 BufferRef::Inline(sv) => sv.len(),
507 BufferRef::External(b) => b.len(),
508 BufferRef::Slice { len, .. } => *len,
509 }
510 }
511
512 pub fn is_empty(&self) -> bool {
514 self.len() == 0
515 }
516
517 pub fn to_bytes(&self) -> Bytes {
519 match self {
520 BufferRef::Inline(sv) => Bytes::copy_from_slice(sv.as_slice()),
521 BufferRef::External(b) => b.clone(),
522 BufferRef::Slice {
523 buffer,
524 offset,
525 len,
526 } => Bytes::copy_from_slice(buffer.get_slice(*offset, *len)),
527 }
528 }
529}
530
impl AsRef<[u8]> for BufferRef {
    /// Borrows the payload bytes.
    fn as_ref(&self) -> &[u8] {
        self.as_bytes()
    }
}
536
/// A tiny fixed-capacity byte vector (at most 64 bytes) stored entirely
/// inline, used to keep small payloads off the heap.
#[derive(Debug, Clone)]
pub struct SmallVec {
    data: [u8; 64],
    len: u8,
}

impl SmallVec {
    /// An empty vector.
    pub fn new() -> Self {
        Self {
            data: [0u8; 64],
            len: 0,
        }
    }

    /// Copies at most 64 bytes from `slice`; longer input is silently
    /// truncated to the inline capacity.
    pub fn from_slice(slice: &[u8]) -> Self {
        let mut out = Self::new();
        let take = slice.len().min(out.data.len());
        out.data[..take].copy_from_slice(&slice[..take]);
        out.len = take as u8;
        out
    }

    /// The stored bytes.
    pub fn as_slice(&self) -> &[u8] {
        &self.data[..usize::from(self.len)]
    }

    /// Number of stored bytes.
    pub fn len(&self) -> usize {
        usize::from(self.len)
    }

    /// True when no bytes are stored.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

impl Default for SmallVec {
    /// Same as `SmallVec::new()`.
    fn default() -> Self {
        Self::new()
    }
}
578
/// Builds messages and hands out shared-buffer regions, backed by a buffer
/// pool and a cache of interned topic names.
pub struct ZeroCopyProducer {
    /// Source of shared buffers for `allocate`.
    buffer_pool: Arc<ZeroCopyBufferPool>,
    /// Buffer currently being filled by `allocate`, if any.
    current_buffer: parking_lot::Mutex<Option<Arc<ZeroCopyBuffer>>>,
    /// topic name -> interned Arc<str>, so repeated sends share one allocation.
    topic_cache: dashmap::DashMap<String, Arc<str>>,
    /// Message/byte counters.
    stats: ProducerStats,
}
590
591impl ZeroCopyProducer {
592 pub fn new(buffer_pool: Arc<ZeroCopyBufferPool>) -> Self {
594 Self {
595 buffer_pool,
596 current_buffer: parking_lot::Mutex::new(None),
597 topic_cache: dashmap::DashMap::new(),
598 stats: ProducerStats::new(),
599 }
600 }
601
602 pub fn with_defaults() -> Self {
604 let pool = Arc::new(ZeroCopyBufferPool::new(DEFAULT_BUFFER_SIZE, 16));
605 Self::new(pool)
606 }
607
608 fn intern_topic(&self, topic: &str) -> Arc<str> {
610 if let Some(interned) = self.topic_cache.get(topic) {
611 return interned.clone();
612 }
613
614 let interned: Arc<str> = Arc::from(topic);
615 self.topic_cache.insert(topic.to_string(), interned.clone());
616 interned
617 }
618
619 pub fn create_message(
621 &self,
622 topic: &str,
623 partition: u32,
624 key: Option<&[u8]>,
625 value: &[u8],
626 ) -> ZeroCopyMessage {
627 self.stats.messages_created.fetch_add(1, Ordering::Relaxed);
628 self.stats
629 .bytes_written
630 .fetch_add(value.len() as u64, Ordering::Relaxed);
631
632 let topic = self.intern_topic(topic);
633 let timestamp = std::time::SystemTime::now()
634 .duration_since(std::time::UNIX_EPOCH)
635 .unwrap_or_default()
636 .as_millis() as i64;
637
638 ZeroCopyMessage {
639 topic,
640 partition,
641 key: key.map(BufferRef::from_bytes),
642 value: BufferRef::from_bytes(value),
643 headers: Vec::new(),
644 timestamp,
645 }
646 }
647
648 pub fn create_message_from_bytes(
650 &self,
651 topic: &str,
652 partition: u32,
653 key: Option<Bytes>,
654 value: Bytes,
655 ) -> ZeroCopyMessage {
656 self.stats.messages_created.fetch_add(1, Ordering::Relaxed);
657 self.stats
658 .bytes_written
659 .fetch_add(value.len() as u64, Ordering::Relaxed);
660
661 let topic = self.intern_topic(topic);
662 let timestamp = std::time::SystemTime::now()
663 .duration_since(std::time::UNIX_EPOCH)
664 .unwrap_or_default()
665 .as_millis() as i64;
666
667 ZeroCopyMessage {
668 topic,
669 partition,
670 key: key.map(BufferRef::from_external),
671 value: BufferRef::from_external(value),
672 headers: Vec::new(),
673 timestamp,
674 }
675 }
676
677 pub fn allocate(&self, size: usize) -> Option<(Arc<ZeroCopyBuffer>, usize)> {
683 let mut guard = self.current_buffer.lock();
684
685 if let Some(ref buffer) = *guard {
687 if let Some(offset) = buffer.try_allocate(size) {
688 return Some((buffer.clone(), offset));
689 }
690 }
691
692 let buffer = self.buffer_pool.acquire();
694 if let Some(offset) = buffer.try_allocate(size) {
695 *guard = Some(buffer.clone());
696 return Some((buffer, offset));
697 }
698
699 None
700 }
701
702 pub fn stats(&self) -> ProducerStatsSnapshot {
704 ProducerStatsSnapshot {
705 messages_created: self.stats.messages_created.load(Ordering::Relaxed),
706 bytes_written: self.stats.bytes_written.load(Ordering::Relaxed),
707 buffer_pool: self.buffer_pool.stats(),
708 }
709 }
710}
711
/// Internal producer counters, updated with relaxed atomics.
struct ProducerStats {
    messages_created: AtomicU64,
    bytes_written: AtomicU64,
}

impl ProducerStats {
    /// All counters start at zero.
    fn new() -> Self {
        Self {
            messages_created: 0.into(),
            bytes_written: 0.into(),
        }
    }
}
725
/// Point-in-time counters reported by `ZeroCopyProducer::stats`.
#[derive(Debug, Clone)]
pub struct ProducerStatsSnapshot {
    /// Messages built via the `create_message*` methods.
    pub messages_created: u64,
    /// Total value bytes across those messages.
    pub bytes_written: u64,
    /// Stats of the backing buffer pool at snapshot time.
    pub buffer_pool: BufferPoolStats,
}
732
/// Parses length-prefixed wire messages into `ConsumedMessage`s.
pub struct ZeroCopyConsumer {
    /// Staging buffer reused by `buffer_data`.
    read_buffer: parking_lot::Mutex<BytesMut>,
    /// Message/byte counters.
    stats: ConsumerStats,
}
740
741impl ZeroCopyConsumer {
742 pub fn new() -> Self {
744 Self {
745 read_buffer: parking_lot::Mutex::new(BytesMut::with_capacity(DEFAULT_BUFFER_SIZE)),
746 stats: ConsumerStats::new(),
747 }
748 }
749
750 pub fn parse_messages(&self, data: Bytes) -> Vec<ConsumedMessage> {
752 let mut messages = Vec::new();
753 let mut offset = 0;
754
755 while offset < data.len() {
756 if offset + 20 > data.len() {
758 break;
759 }
760
761 let msg_len = u32::from_be_bytes([
763 data[offset],
764 data[offset + 1],
765 data[offset + 2],
766 data[offset + 3],
767 ]) as usize;
768
769 if offset + 4 + msg_len > data.len() {
770 break;
771 }
772
773 let msg_data = data.slice(offset + 4..offset + 4 + msg_len);
775
776 if let Some(msg) = self.parse_single_message(msg_data) {
777 messages.push(msg);
778 self.stats.messages_consumed.fetch_add(1, Ordering::Relaxed);
779 }
780
781 offset += 4 + msg_len;
782 }
783
784 self.stats
785 .bytes_read
786 .fetch_add(offset as u64, Ordering::Relaxed);
787 messages
788 }
789
790 fn parse_single_message(&self, data: Bytes) -> Option<ConsumedMessage> {
792 if data.len() < 16 {
793 return None;
794 }
795
796 let msg_offset = u64::from_be_bytes([
797 data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
798 ]);
799
800 let timestamp = i64::from_be_bytes([
801 data[8], data[9], data[10], data[11], data[12], data[13], data[14], data[15],
802 ]);
803
804 let value = data.slice(16..);
806
807 Some(ConsumedMessage {
808 offset: msg_offset,
809 timestamp,
810 key: None,
811 value,
812 })
813 }
814
815 pub fn stats(&self) -> ConsumerStatsSnapshot {
817 ConsumerStatsSnapshot {
818 messages_consumed: self.stats.messages_consumed.load(Ordering::Relaxed),
819 bytes_read: self.stats.bytes_read.load(Ordering::Relaxed),
820 }
821 }
822
823 pub fn buffer_data(&self, data: &[u8]) -> Bytes {
825 let mut buffer = self.read_buffer.lock();
826 buffer.clear();
827 buffer.extend_from_slice(data);
828 buffer.clone().freeze()
829 }
830}
831
impl Default for ZeroCopyConsumer {
    /// Same as `ZeroCopyConsumer::new()`.
    fn default() -> Self {
        Self::new()
    }
}
837
/// Internal consumer counters, updated with relaxed atomics.
struct ConsumerStats {
    messages_consumed: AtomicU64,
    bytes_read: AtomicU64,
}

impl ConsumerStats {
    /// All counters start at zero.
    fn new() -> Self {
        Self {
            messages_consumed: 0.into(),
            bytes_read: 0.into(),
        }
    }
}
851
/// Point-in-time counters reported by `ZeroCopyConsumer::stats`.
#[derive(Debug, Clone)]
pub struct ConsumerStatsSnapshot {
    /// Messages successfully parsed.
    pub messages_consumed: u64,
    /// Wire bytes consumed (complete messages only).
    pub bytes_read: u64,
}
857
/// A message decoded from the wire by `ZeroCopyConsumer`.
#[derive(Debug, Clone)]
pub struct ConsumedMessage {
    /// Log offset read from the message header.
    pub offset: u64,
    /// Timestamp read from the header (milliseconds since the Unix epoch,
    /// presumably, matching the producer — TODO confirm against the writer).
    pub timestamp: i64,
    /// Optional key (currently never populated by the parser).
    pub key: Option<Bytes>,
    /// Payload; shares storage with the input `Bytes` (no copy).
    pub value: Bytes,
}
870
871impl ConsumedMessage {
872 pub fn value_str(&self) -> Option<&str> {
874 std::str::from_utf8(&self.value).ok()
875 }
876
877 pub fn key_str(&self) -> Option<&str> {
879 self.key.as_ref().and_then(|k| std::str::from_utf8(k).ok())
880 }
881}
882
#[cfg(test)]
mod tests {
    use super::*;

    // Reserving advances the write cursor and sizes the returned slice.
    #[test]
    fn test_zero_copy_buffer_basic() {
        let buffer = Arc::new(ZeroCopyBuffer::new(1024));
        assert_eq!(buffer.len(), 0);
        assert!(buffer.remaining() >= 1024);

        let slice = buffer.reserve(100).unwrap();
        assert_eq!(slice.len(), 100);
        assert_eq!(buffer.len(), 100);
    }

    // Bytes written through a slice read back identically.
    #[test]
    fn test_zero_copy_buffer_write() {
        let buffer = Arc::new(ZeroCopyBuffer::new(1024));

        let mut slice = buffer.reserve(11).unwrap();
        slice.write(b"Hello World");

        assert_eq!(slice.as_bytes(), b"Hello World");
    }

    // Pool counters track pre-allocation, acquire, and release.
    #[test]
    fn test_buffer_pool() {
        let pool = ZeroCopyBufferPool::new(1024, 4);
        let stats = pool.stats();
        assert_eq!(stats.total_created, 4);
        assert_eq!(stats.available, 4);

        let b1 = pool.acquire();
        let b2 = pool.acquire();

        let stats = pool.stats();
        assert_eq!(stats.in_use, 2);

        pool.release(b1);
        pool.release(b2);

        let stats = pool.stats();
        assert_eq!(stats.in_use, 0);
    }

    // Payloads at or under the inline threshold use the Inline variant.
    #[test]
    fn test_buffer_ref_inline() {
        let small_data = b"small";
        let buf_ref = BufferRef::from_bytes(small_data);

        match buf_ref {
            BufferRef::Inline(_) => {}
            _ => panic!("Expected inline storage for small data"),
        }

        assert_eq!(buf_ref.as_bytes(), small_data);
    }

    // Payloads above the threshold fall back to heap-backed External.
    #[test]
    fn test_buffer_ref_external() {
        let large_data = vec![0u8; 100];
        let buf_ref = BufferRef::from_bytes(&large_data);

        match buf_ref {
            BufferRef::External(_) => {}
            _ => panic!("Expected external storage for large data"),
        }

        assert_eq!(buf_ref.len(), 100);
    }

    // create_message fills every field and bumps the message counter.
    #[test]
    fn test_zero_copy_producer() {
        let producer = ZeroCopyProducer::with_defaults();

        let msg = producer.create_message("test-topic", 0, Some(b"key1"), b"value1");

        assert_eq!(&*msg.topic, "test-topic");
        assert_eq!(msg.partition, 0);
        assert_eq!(msg.key.unwrap().as_bytes(), b"key1");
        assert_eq!(msg.value.as_bytes(), b"value1");

        let stats = producer.stats();
        assert_eq!(stats.messages_created, 1);
    }

    // A hand-built wire frame (4-byte len + 8-byte offset + 8-byte
    // timestamp + value) round-trips through parse_messages.
    #[test]
    fn test_zero_copy_consumer() {
        let consumer = ZeroCopyConsumer::new();

        let mut data = BytesMut::new();

        // msg_len = 21: 16-byte header + 5-byte value "hello".
        data.extend_from_slice(&21u32.to_be_bytes());
        data.extend_from_slice(&42u64.to_be_bytes());
        data.extend_from_slice(&1234567890i64.to_be_bytes());
        data.extend_from_slice(b"hello");

        let messages = consumer.parse_messages(data.freeze());

        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].offset, 42);
        assert_eq!(messages[0].timestamp, 1234567890);
        assert_eq!(&messages[0].value[..], b"hello");
    }

    // from_slice stores the exact bytes and length.
    #[test]
    fn test_small_vec() {
        let sv = SmallVec::from_slice(b"test data");
        assert_eq!(sv.as_slice(), b"test data");
        assert_eq!(sv.len(), 9);
    }

    // The same topic string yields pointer-identical Arcs; different
    // topics yield distinct ones.
    #[test]
    fn test_topic_interning() {
        let producer = ZeroCopyProducer::with_defaults();

        let msg1 = producer.create_message("topic-a", 0, None, b"v1");
        let msg2 = producer.create_message("topic-a", 0, None, b"v2");
        let msg3 = producer.create_message("topic-b", 0, None, b"v3");

        assert!(Arc::ptr_eq(&msg1.topic, &msg2.topic));
        assert!(!Arc::ptr_eq(&msg1.topic, &msg3.topic));
    }
}
1016}