1use crate::connection::RequestKey;
3use crate::error::ConnectionError;
4use bytes::{Buf, BufMut, BytesMut};
5use nom::{
6 bytes::streaming::take,
7 combinator::{map_res, verify},
8 number::streaming::{be_u16, be_u32},
9 IResult,
10};
11use prost::{self, Message as ImplProtobuf};
12use std::convert::TryFrom;
13use std::io::Cursor;
14
15const CRC_CASTAGNOLI: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISCSI);
16
17pub use self::proto::BaseCommand;
18pub use self::proto::MessageMetadata as Metadata;
19
20use self::proto::*;
21
/// A single protocol message: a protobuf `BaseCommand` plus an optional payload.
///
/// This is the item type produced and consumed by the `Codec` implementations in this file.
#[derive(Debug, Clone)]
pub struct Message {
    /// The protobuf command carried in the command section of the frame.
    pub command: BaseCommand,
    /// Metadata and raw bytes that follow the command, when the frame has a payload section.
    pub payload: Option<Payload>,
}
33
34impl Message {
35 pub fn request_key(&self) -> Option<RequestKey> {
37 match &self.command {
38 BaseCommand {
39 subscribe: Some(CommandSubscribe { request_id, .. }),
40 ..
41 }
42 | BaseCommand {
43 partition_metadata: Some(CommandPartitionedTopicMetadata { request_id, .. }),
44 ..
45 }
46 | BaseCommand {
47 partition_metadata_response:
48 Some(CommandPartitionedTopicMetadataResponse { request_id, .. }),
49 ..
50 }
51 | BaseCommand {
52 lookup_topic: Some(CommandLookupTopic { request_id, .. }),
53 ..
54 }
55 | BaseCommand {
56 lookup_topic_response: Some(CommandLookupTopicResponse { request_id, .. }),
57 ..
58 }
59 | BaseCommand {
60 producer: Some(CommandProducer { request_id, .. }),
61 ..
62 }
63 | BaseCommand {
64 producer_success: Some(CommandProducerSuccess { request_id, .. }),
65 ..
66 }
67 | BaseCommand {
68 unsubscribe: Some(CommandUnsubscribe { request_id, .. }),
69 ..
70 }
71 | BaseCommand {
72 seek: Some(CommandSeek { request_id, .. }),
73 ..
74 }
75 | BaseCommand {
76 close_producer: Some(CommandCloseProducer { request_id, .. }),
77 ..
78 }
79 | BaseCommand {
80 success: Some(CommandSuccess { request_id, .. }),
81 ..
82 }
83 | BaseCommand {
84 error: Some(CommandError { request_id, .. }),
85 ..
86 }
87 | BaseCommand {
88 consumer_stats: Some(CommandConsumerStats { request_id, .. }),
89 ..
90 }
91 | BaseCommand {
92 consumer_stats_response: Some(CommandConsumerStatsResponse { request_id, .. }),
93 ..
94 }
95 | BaseCommand {
96 get_last_message_id: Some(CommandGetLastMessageId { request_id, .. }),
97 ..
98 }
99 | BaseCommand {
100 get_last_message_id_response:
101 Some(CommandGetLastMessageIdResponse { request_id, .. }),
102 ..
103 }
104 | BaseCommand {
105 get_topics_of_namespace: Some(CommandGetTopicsOfNamespace { request_id, .. }),
106 ..
107 }
108 | BaseCommand {
109 get_topics_of_namespace_response:
110 Some(CommandGetTopicsOfNamespaceResponse { request_id, .. }),
111 ..
112 }
113 | BaseCommand {
114 get_schema: Some(CommandGetSchema { request_id, .. }),
115 ..
116 }
117 | BaseCommand {
118 get_schema_response: Some(CommandGetSchemaResponse { request_id, .. }),
119 ..
120 } => Some(RequestKey::RequestId(*request_id)),
121 BaseCommand {
122 send:
123 Some(CommandSend {
124 producer_id,
125 sequence_id,
126 ..
127 }),
128 ..
129 }
130 | BaseCommand {
131 send_error:
132 Some(CommandSendError {
133 producer_id,
134 sequence_id,
135 ..
136 }),
137 ..
138 }
139 | BaseCommand {
140 send_receipt:
141 Some(CommandSendReceipt {
142 producer_id,
143 sequence_id,
144 ..
145 }),
146 ..
147 } => Some(RequestKey::ProducerSend {
148 producer_id: *producer_id,
149 sequence_id: *sequence_id,
150 }),
151 BaseCommand {
152 active_consumer_change: Some(CommandActiveConsumerChange { consumer_id, .. }),
153 ..
154 }
155 | BaseCommand {
156 message: Some(CommandMessage { consumer_id, .. }),
157 ..
158 }
159 | BaseCommand {
160 flow: Some(CommandFlow { consumer_id, .. }),
161 ..
162 }
163 | BaseCommand {
164 redeliver_unacknowledged_messages:
165 Some(CommandRedeliverUnacknowledgedMessages { consumer_id, .. }),
166 ..
167 }
168 | BaseCommand {
169 reached_end_of_topic: Some(CommandReachedEndOfTopic { consumer_id }),
170 ..
171 }
172 | BaseCommand {
173 ack: Some(CommandAck { consumer_id, .. }),
174 ..
175 } => Some(RequestKey::Consumer {
176 consumer_id: *consumer_id,
177 }),
178 BaseCommand {
179 close_consumer:
180 Some(CommandCloseConsumer {
181 consumer_id,
182 request_id,
183 }),
184 ..
185 } => Some(RequestKey::CloseConsumer {
186 consumer_id: *consumer_id,
187 request_id: *request_id,
188 }),
189 BaseCommand {
190 connect: Some(_), ..
191 }
192 | BaseCommand {
193 connected: Some(_), ..
194 }
195 | BaseCommand { ping: Some(_), .. }
196 | BaseCommand { pong: Some(_), .. } => None,
197 _ => {
198 match base_command::Type::try_from(self.command.r#type) {
199 Ok(type_) => {
200 warn!(
201 "Unexpected payload for command of type {:?}. This is likely a bug!",
202 type_
203 );
204 }
205 Err(()) => {
206 warn!(
207 "Received BaseCommand of unexpected type: {}",
208 self.command.r#type
209 );
210 }
211 }
212 None
213 }
214 }
215 }
216}
217
/// Stateless codec translating between raw bytes and [`Message`] values.
///
/// `Encoder`/`Decoder` implementations are provided in this file behind the
/// `tokio-runtime` and `async-std-runtime` features.
pub struct Codec;
220
#[cfg(feature = "tokio-runtime")]
impl tokio_util::codec::Encoder<Message> for Codec {
    type Error = ConnectionError;

    /// Serializes `item` into one length-prefixed frame and appends it to `dst`.
    ///
    /// Frame layout (all integers big-endian):
    /// `[total_size: u32][command_size: u32][command]`, followed, when a payload is present,
    /// by `[0x0e01: u16][crc32c: u32][metadata_size: u32][metadata][data]`.
    /// `total_size` counts every byte after the `total_size` field itself, and the checksum
    /// covers everything that follows the checksum field.
    ///
    /// # Errors
    ///
    /// Returns `ConnectionError::Encoding` when the frame is too large for its length to be
    /// stored in the 32-bit size prefix, or when protobuf encoding fails.
    fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), ConnectionError> {
        let command_size = item.command.encoded_len();
        let (metadata_size, payload_size) = match &item.payload {
            Some(p) => (p.metadata.encoded_len(), p.data.len()),
            None => (0, 0),
        };
        // Fixed-width fields counted by `total_size`: the command-size field (4 bytes) and,
        // with a payload, magic (2) + checksum (4) + metadata-size (4).
        let fixed_size = if item.payload.is_some() { 14 } else { 4 };
        let total_size = command_size + metadata_size + payload_size + fixed_size;

        // A plain `as u32` would silently truncate and emit a corrupt length prefix.
        let total_len = u32::try_from(total_size).map_err(|_| {
            ConnectionError::Encoding(format!(
                "Message of {} bytes does not fit in a frame with a 32-bit size prefix",
                total_size
            ))
        })?;
        // Neither value can exceed `total_size`, so these casts are lossless.
        let command_len = command_size as u32;
        let metadata_len = metadata_size as u32;

        let mut buf = Vec::with_capacity(total_size + 4);

        buf.put_u32(total_len);
        buf.put_u32(command_len);
        item.command.encode(&mut buf)?;

        if let Some(payload) = &item.payload {
            buf.put_u16(0x0e01);

            // Reserve the checksum slot; it is filled in once the checksummed bytes exist.
            let crc_offset = buf.len();
            buf.put_u32(0);

            let metadata_offset = buf.len();
            buf.put_u32(metadata_len);
            payload.metadata.encode(&mut buf)?;
            buf.put(&payload.data[..]);

            let crc = CRC_CASTAGNOLI.checksum(&buf[metadata_offset..]);
            buf[crc_offset..metadata_offset].copy_from_slice(&crc.to_be_bytes());
        }

        dst.reserve(buf.len());
        dst.put_slice(&buf);
        trace!("Encoder sending {} bytes", buf.len());
        Ok(())
    }
}
268
#[cfg(feature = "tokio-runtime")]
impl tokio_util::codec::Decoder for Codec {
    type Item = Message;
    type Error = ConnectionError;

    /// Attempts to decode one complete frame from the front of `src`.
    ///
    /// Returns `Ok(None)` until the 4-byte size prefix and the whole frame it announces are
    /// buffered. On success the frame's bytes are consumed from `src`. On a decoding error
    /// `src` is left untouched and `ConnectionError::Decoding` is returned.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Message>, ConnectionError> {
        trace!("Decoder received {} bytes", src.len());

        // The size prefix itself has not fully arrived yet.
        if src.len() < 4 {
            return Ok(None);
        }

        // The prefix counts the bytes after itself, so add its own 4 bytes.
        let frame_len = Cursor::new(&src[..4]).get_u32() as usize + 4;
        if src.len() < frame_len {
            return Ok(None);
        }

        let (rest, frame) = command_frame(&src[..frame_len]).map_err(|err| {
            ConnectionError::Decoding(format!("Error decoding command frame: {:?}", err))
        })?;
        let command = BaseCommand::decode(frame.command)?;

        // Any bytes left after the command form the payload section.
        let payload = if rest.is_empty() {
            None
        } else {
            let (data, frame) = payload_frame(rest).map_err(|err| {
                ConnectionError::Decoding(format!("Error decoding payload frame: {:?}", err))
            })?;
            let metadata = Metadata::decode(frame.metadata)?;
            Some(Payload {
                metadata,
                data: data.to_vec(),
            })
        };

        src.advance(frame_len);
        Ok(Some(Message { command, payload }))
    }
}
323
#[cfg(feature = "async-std-runtime")]
impl asynchronous_codec::Encoder for Codec {
    type Item = Message;
    type Error = ConnectionError;

    /// Serializes `item` into one length-prefixed frame and appends it to `dst`.
    ///
    /// Frame layout (all integers big-endian):
    /// `[total_size: u32][command_size: u32][command]`, followed, when a payload is present,
    /// by `[0x0e01: u16][crc32c: u32][metadata_size: u32][metadata][data]`.
    /// `total_size` counts every byte after the `total_size` field itself, and the checksum
    /// covers everything that follows the checksum field.
    ///
    /// # Errors
    ///
    /// Returns `ConnectionError::Encoding` when the frame is too large for its length to be
    /// stored in the 32-bit size prefix, or when protobuf encoding fails.
    fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), ConnectionError> {
        let command_size = item.command.encoded_len();
        let (metadata_size, payload_size) = match &item.payload {
            Some(p) => (p.metadata.encoded_len(), p.data.len()),
            None => (0, 0),
        };
        // Fixed-width fields counted by `total_size`: the command-size field (4 bytes) and,
        // with a payload, magic (2) + checksum (4) + metadata-size (4).
        let fixed_size = if item.payload.is_some() { 14 } else { 4 };
        let total_size = command_size + metadata_size + payload_size + fixed_size;

        // A plain `as u32` would silently truncate and emit a corrupt length prefix.
        let total_len = u32::try_from(total_size).map_err(|_| {
            ConnectionError::Encoding(format!(
                "Message of {} bytes does not fit in a frame with a 32-bit size prefix",
                total_size
            ))
        })?;
        // Neither value can exceed `total_size`, so these casts are lossless.
        let command_len = command_size as u32;
        let metadata_len = metadata_size as u32;

        let mut buf = Vec::with_capacity(total_size + 4);

        buf.put_u32(total_len);
        buf.put_u32(command_len);
        item.command.encode(&mut buf)?;

        if let Some(payload) = &item.payload {
            buf.put_u16(0x0e01);

            // Reserve the checksum slot; it is filled in once the checksummed bytes exist.
            let crc_offset = buf.len();
            buf.put_u32(0);

            let metadata_offset = buf.len();
            buf.put_u32(metadata_len);
            payload.metadata.encode(&mut buf)?;
            buf.put(&payload.data[..]);

            let crc = CRC_CASTAGNOLI.checksum(&buf[metadata_offset..]);
            buf[crc_offset..metadata_offset].copy_from_slice(&crc.to_be_bytes());
        }

        dst.reserve(buf.len());
        dst.put_slice(&buf);
        trace!("Encoder sending {} bytes", buf.len());
        Ok(())
    }
}
372
#[cfg(feature = "async-std-runtime")]
impl asynchronous_codec::Decoder for Codec {
    type Item = Message;
    type Error = ConnectionError;

    /// Attempts to decode one complete frame from the front of `src`.
    ///
    /// Returns `Ok(None)` until the 4-byte size prefix and the whole frame it announces are
    /// buffered. On success the frame's bytes are consumed from `src`. On a decoding error
    /// `src` is left untouched and `ConnectionError::Decoding` is returned.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Message>, ConnectionError> {
        trace!("Decoder received {} bytes", src.len());

        // The size prefix itself has not fully arrived yet.
        if src.len() < 4 {
            return Ok(None);
        }

        // The prefix counts the bytes after itself, so add its own 4 bytes.
        let frame_len = Cursor::new(&src[..4]).get_u32() as usize + 4;
        if src.len() < frame_len {
            return Ok(None);
        }

        let (rest, frame) = command_frame(&src[..frame_len]).map_err(|err| {
            ConnectionError::Decoding(format!("Error decoding command frame: {:?}", err))
        })?;
        let command = BaseCommand::decode(frame.command)?;

        // Any bytes left after the command form the payload section.
        let payload = if rest.is_empty() {
            None
        } else {
            let (data, frame) = payload_frame(rest).map_err(|err| {
                ConnectionError::Decoding(format!("Error decoding payload frame: {:?}", err))
            })?;
            let metadata = Metadata::decode(frame.metadata)?;
            Some(Payload {
                metadata,
                data: data.to_vec(),
            })
        };

        src.advance(frame_len);
        Ok(Some(Message { command, payload }))
    }
}
427
/// The payload section of a [`Message`]: protobuf metadata followed by raw bytes.
#[derive(Debug, Clone)]
pub struct Payload {
    /// Decoded `MessageMetadata` describing the payload.
    pub metadata: Metadata,
    /// The bytes that follow the metadata in the frame, stored as-is.
    pub data: Vec<u8>,
}
436
/// Borrowed view of the command section at the start of a frame, as parsed by `command_frame`.
struct CommandFrame<'a> {
    /// Value of the leading big-endian `u32`; parsed but not read afterwards.
    #[allow(dead_code)]
    total_size: u32,
    /// Length in bytes of `command`; parsed but not read afterwards.
    #[allow(dead_code)]
    command_size: u32,
    /// The still-encoded protobuf command, `command_size` bytes long.
    command: &'a [u8],
}
444
445fn command_frame(i: &[u8]) -> IResult<&[u8], CommandFrame> {
446 let (i, total_size) = be_u32(i)?;
447 let (i, command_size) = be_u32(i)?;
448 let (i, command) = take(command_size)(i)?;
449
450 Ok((
451 i,
452 CommandFrame {
453 total_size,
454 command_size,
455 command,
456 },
457 ))
458}
459
/// Borrowed view of the header preceding a message payload, as parsed by `payload_frame`.
///
/// NOTE(review): within this file the magic number and checksum are parsed but never
/// validated — confirm whether verification is expected to happen elsewhere.
struct PayloadFrame<'a> {
    /// Leading big-endian `u16` of the payload section; parsed but not read afterwards.
    #[allow(dead_code)]
    magic_number: u16,
    /// Big-endian `u32` following the magic number; parsed but not read afterwards.
    #[allow(dead_code)]
    checksum: u32,
    /// Length in bytes of `metadata`; parsed but not read afterwards.
    #[allow(dead_code)]
    metadata_size: u32,
    /// The still-encoded protobuf metadata, `metadata_size` bytes long.
    metadata: &'a [u8],
}
469
470fn payload_frame(i: &[u8]) -> IResult<&[u8], PayloadFrame> {
471 let (i, magic_number) = be_u16(i)?;
472 let (i, checksum) = be_u32(i)?;
473 let (i, metadata_size) = be_u32(i)?;
474 let (i, metadata) = take(metadata_size)(i)?;
475
476 Ok((
477 i,
478 PayloadFrame {
479 magic_number,
480 checksum,
481 metadata_size,
482 metadata,
483 },
484 ))
485}
486
/// One entry of a batched payload: per-message metadata plus that message's bytes.
pub(crate) struct BatchedMessage {
    /// Decoded `SingleMessageMetadata` for this entry.
    pub metadata: proto::SingleMessageMetadata,
    /// The entry's bytes; when parsed, exactly `metadata.payload_size` bytes long.
    pub payload: Vec<u8>,
}
491
492fn batched_message(i: &[u8]) -> IResult<&[u8], BatchedMessage> {
493 let (i, metadata_size) = be_u32(i)?;
494 let (i, metadata) = verify(
495 map_res(take(metadata_size), proto::SingleMessageMetadata::decode),
496 |metadata| metadata.payload_size >= 0,
498 )(i)?;
499
500 let (i, payload) = take(metadata.payload_size as u32)(i)?;
501
502 Ok((
503 i,
504 BatchedMessage {
505 metadata,
506 payload: payload.to_vec(),
507 },
508 ))
509}
510
511pub(crate) fn parse_batched_message(
512 count: u32,
513 payload: &[u8],
514) -> Result<Vec<BatchedMessage>, ConnectionError> {
515 let (_, result) =
516 nom::multi::count(batched_message, count as usize)(payload).map_err(|err| {
517 ConnectionError::Decoding(format!("Error decoding batched messages: {:?}", err))
518 })?;
519 Ok(result)
520}
521
522impl BatchedMessage {
523 pub(crate) fn serialize(&self, w: &mut Vec<u8>) {
524 w.put_u32(self.metadata.encoded_len() as u32);
525 let _ = self.metadata.encode(w);
526 w.put_slice(&self.payload);
527 }
528}
529
530#[rustfmt::skip]
531pub mod proto {
532 include!(concat!(env!("OUT_DIR"), "/pulsar.proto.rs"));
533
534 impl std::cmp::Eq for MessageIdData {}
536
537 impl std::hash::Hash for MessageIdData {
538 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
539 self.ledger_id.hash(state);
540 self.entry_id.hash(state);
541 self.partition.hash(state);
542 self.batch_index.hash(state);
543 self.ack_set.hash(state);
544 self.batch_size.hash(state);
545 }
546 }
547}
548
549impl TryFrom<i32> for proto::base_command::Type {
550 type Error = ();
551
552 fn try_from(value: i32) -> Result<Self, ()> {
553 match value {
554 2 => Ok(proto::base_command::Type::Connect),
555 3 => Ok(proto::base_command::Type::Connected),
556 4 => Ok(proto::base_command::Type::Subscribe),
557 5 => Ok(proto::base_command::Type::Producer),
558 6 => Ok(proto::base_command::Type::Send),
559 7 => Ok(proto::base_command::Type::SendReceipt),
560 8 => Ok(proto::base_command::Type::SendError),
561 9 => Ok(proto::base_command::Type::Message),
562 10 => Ok(proto::base_command::Type::Ack),
563 11 => Ok(proto::base_command::Type::Flow),
564 12 => Ok(proto::base_command::Type::Unsubscribe),
565 13 => Ok(proto::base_command::Type::Success),
566 14 => Ok(proto::base_command::Type::Error),
567 15 => Ok(proto::base_command::Type::CloseProducer),
568 16 => Ok(proto::base_command::Type::CloseConsumer),
569 17 => Ok(proto::base_command::Type::ProducerSuccess),
570 18 => Ok(proto::base_command::Type::Ping),
571 19 => Ok(proto::base_command::Type::Pong),
572 20 => Ok(proto::base_command::Type::RedeliverUnacknowledgedMessages),
573 21 => Ok(proto::base_command::Type::PartitionedMetadata),
574 22 => Ok(proto::base_command::Type::PartitionedMetadataResponse),
575 23 => Ok(proto::base_command::Type::Lookup),
576 24 => Ok(proto::base_command::Type::LookupResponse),
577 25 => Ok(proto::base_command::Type::ConsumerStats),
578 26 => Ok(proto::base_command::Type::ConsumerStatsResponse),
579 27 => Ok(proto::base_command::Type::ReachedEndOfTopic),
580 28 => Ok(proto::base_command::Type::Seek),
581 29 => Ok(proto::base_command::Type::GetLastMessageId),
582 30 => Ok(proto::base_command::Type::GetLastMessageIdResponse),
583 31 => Ok(proto::base_command::Type::ActiveConsumerChange),
584 32 => Ok(proto::base_command::Type::GetTopicsOfNamespace),
585 33 => Ok(proto::base_command::Type::GetTopicsOfNamespaceResponse),
586 34 => Ok(proto::base_command::Type::GetSchema),
587 35 => Ok(proto::base_command::Type::GetSchemaResponse),
588 _ => Err(()),
589 }
590 }
591}
592
593impl From<prost::EncodeError> for ConnectionError {
594 fn from(e: prost::EncodeError) -> Self {
595 ConnectionError::Encoding(e.to_string())
596 }
597}
598
599impl From<prost::DecodeError> for ConnectionError {
600 fn from(e: prost::DecodeError) -> Self {
601 ConnectionError::Decoding(e.to_string())
602 }
603}
604
// Round-trip tests for the codec and a sanity check of the command-type mapping.
//
// NOTE(review): the codec tests import `tokio_util::codec` unconditionally, while the
// matching `Codec` impls are gated on the `tokio-runtime` feature — confirm the tests are
// only built with that feature enabled.
#[cfg(test)]
mod tests {
    use crate::message::Codec;
    use bytes::BytesMut;
    use std::convert::TryFrom;
    use tokio_util::codec::{Decoder, Encoder};

    // Decodes a payload-less frame carrying a connect command, checks its fields, then
    // re-encodes it and expects the original bytes back.
    #[test]
    fn parse_simple_command() {
        let input: &[u8] = &[
            0x00, 0x00, 0x00, 0x22, 0x00, 0x00, 0x00, 0x1E, 0x08, 0x02, 0x12, 0x1A, 0x0A, 0x10,
            0x32, 0x2E, 0x30, 0x2E, 0x31, 0x2D, 0x69, 0x6E, 0x63, 0x75, 0x62, 0x61, 0x74, 0x69,
            0x6E, 0x67, 0x20, 0x0C, 0x2A, 0x04, 0x6E, 0x6F, 0x6E, 0x65,
        ];

        let message = Codec.decode(&mut input.into()).unwrap().unwrap();

        {
            let connect = message.command.connect.as_ref().unwrap();
            assert_eq!(connect.client_version, "2.0.1-incubating");
            assert_eq!(connect.auth_method_name.as_ref().unwrap(), "none");
            assert_eq!(connect.protocol_version.as_ref().unwrap(), &12);
        }

        let mut output = BytesMut::with_capacity(38);
        Codec.encode(message, &mut output).unwrap();
        assert_eq!(&output, input);
    }

    // Decodes a frame carrying a send command with a payload, checks command and metadata
    // fields, then re-encodes it and expects the original bytes (checksum included) back.
    #[test]
    fn parse_payload_command() {
        let input: &[u8] = &[
            0x00, 0x00, 0x00, 0x3D, 0x00, 0x00, 0x00, 0x08, 0x08, 0x06, 0x32, 0x04, 0x08, 0x00,
            0x10, 0x08, 0x0E, 0x01, 0x42, 0x83, 0x54, 0xB5, 0x00, 0x00, 0x00, 0x19, 0x0A, 0x0E,
            0x73, 0x74, 0x61, 0x6E, 0x64, 0x61, 0x6C, 0x6F, 0x6E, 0x65, 0x2D, 0x30, 0x2D, 0x33,
            0x10, 0x08, 0x18, 0xBE, 0xC0, 0xFC, 0x84, 0xD2, 0x2C, 0x68, 0x65, 0x6C, 0x6C, 0x6F,
            0x2D, 0x70, 0x75, 0x6C, 0x73, 0x61, 0x72, 0x2D, 0x38,
        ];

        let message = Codec.decode(&mut input.into()).unwrap().unwrap();
        {
            let send = message.command.send.as_ref().unwrap();
            assert_eq!(send.producer_id, 0);
            assert_eq!(send.sequence_id, 8);
        }

        {
            let payload = message.payload.as_ref().unwrap();
            assert_eq!(payload.metadata.producer_name, "standalone-0-3");
            assert_eq!(payload.metadata.sequence_id, 8);
            assert_eq!(payload.metadata.publish_time, 1533850624062);
        }

        let mut output = BytesMut::with_capacity(65);
        Codec.encode(message, &mut output).unwrap();
        assert_eq!(&output, input);
    }

    // Every value accepted by `Type::try_from` must convert back to the same number, and
    // exactly 34 values in `0..40` are accepted.
    #[test]
    fn base_command_type_parsing() {
        use super::proto::base_command::Type;
        let mut successes = 0;
        for i in 0..40 {
            if let Ok(type_) = Type::try_from(i) {
                successes += 1;
                assert_eq!(type_ as i32, i);
            }
        }
        assert_eq!(successes, 34);
    }
}