pulsar/
message.rs

1//! low level structures used to send and process raw messages
2use crate::connection::RequestKey;
3use crate::error::ConnectionError;
4use bytes::{Buf, BufMut, BytesMut};
5use nom::{
6    bytes::streaming::take,
7    combinator::{map_res, verify},
8    number::streaming::{be_u16, be_u32},
9    IResult,
10};
11use prost::{self, Message as ImplProtobuf};
12use std::convert::TryFrom;
13use std::io::Cursor;
14
15const CRC_CASTAGNOLI: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISCSI);
16
17pub use self::proto::BaseCommand;
18pub use self::proto::MessageMetadata as Metadata;
19
20use self::proto::*;
21
22/// Pulsar binary message
23///
24/// this structure holds any command sent to pulsar, like looking up a topic or
25/// subscribing on a topic
26#[derive(Debug, Clone)]
27pub struct Message {
28    /// Basic pulsar command, as defined in Pulsar's protobuf file
29    pub command: BaseCommand,
30    /// payload for topic messages
31    pub payload: Option<Payload>,
32}
33
34impl Message {
35    /// returns the message's RequestKey if present
36    pub fn request_key(&self) -> Option<RequestKey> {
37        match &self.command {
38            BaseCommand {
39                subscribe: Some(CommandSubscribe { request_id, .. }),
40                ..
41            }
42            | BaseCommand {
43                partition_metadata: Some(CommandPartitionedTopicMetadata { request_id, .. }),
44                ..
45            }
46            | BaseCommand {
47                partition_metadata_response:
48                    Some(CommandPartitionedTopicMetadataResponse { request_id, .. }),
49                ..
50            }
51            | BaseCommand {
52                lookup_topic: Some(CommandLookupTopic { request_id, .. }),
53                ..
54            }
55            | BaseCommand {
56                lookup_topic_response: Some(CommandLookupTopicResponse { request_id, .. }),
57                ..
58            }
59            | BaseCommand {
60                producer: Some(CommandProducer { request_id, .. }),
61                ..
62            }
63            | BaseCommand {
64                producer_success: Some(CommandProducerSuccess { request_id, .. }),
65                ..
66            }
67            | BaseCommand {
68                unsubscribe: Some(CommandUnsubscribe { request_id, .. }),
69                ..
70            }
71            | BaseCommand {
72                seek: Some(CommandSeek { request_id, .. }),
73                ..
74            }
75            | BaseCommand {
76                close_producer: Some(CommandCloseProducer { request_id, .. }),
77                ..
78            }
79            | BaseCommand {
80                success: Some(CommandSuccess { request_id, .. }),
81                ..
82            }
83            | BaseCommand {
84                error: Some(CommandError { request_id, .. }),
85                ..
86            }
87            | BaseCommand {
88                consumer_stats: Some(CommandConsumerStats { request_id, .. }),
89                ..
90            }
91            | BaseCommand {
92                consumer_stats_response: Some(CommandConsumerStatsResponse { request_id, .. }),
93                ..
94            }
95            | BaseCommand {
96                get_last_message_id: Some(CommandGetLastMessageId { request_id, .. }),
97                ..
98            }
99            | BaseCommand {
100                get_last_message_id_response:
101                    Some(CommandGetLastMessageIdResponse { request_id, .. }),
102                ..
103            }
104            | BaseCommand {
105                get_topics_of_namespace: Some(CommandGetTopicsOfNamespace { request_id, .. }),
106                ..
107            }
108            | BaseCommand {
109                get_topics_of_namespace_response:
110                    Some(CommandGetTopicsOfNamespaceResponse { request_id, .. }),
111                ..
112            }
113            | BaseCommand {
114                get_schema: Some(CommandGetSchema { request_id, .. }),
115                ..
116            }
117            | BaseCommand {
118                get_schema_response: Some(CommandGetSchemaResponse { request_id, .. }),
119                ..
120            } => Some(RequestKey::RequestId(*request_id)),
121            BaseCommand {
122                send:
123                    Some(CommandSend {
124                        producer_id,
125                        sequence_id,
126                        ..
127                    }),
128                ..
129            }
130            | BaseCommand {
131                send_error:
132                    Some(CommandSendError {
133                        producer_id,
134                        sequence_id,
135                        ..
136                    }),
137                ..
138            }
139            | BaseCommand {
140                send_receipt:
141                    Some(CommandSendReceipt {
142                        producer_id,
143                        sequence_id,
144                        ..
145                    }),
146                ..
147            } => Some(RequestKey::ProducerSend {
148                producer_id: *producer_id,
149                sequence_id: *sequence_id,
150            }),
151            BaseCommand {
152                active_consumer_change: Some(CommandActiveConsumerChange { consumer_id, .. }),
153                ..
154            }
155            | BaseCommand {
156                message: Some(CommandMessage { consumer_id, .. }),
157                ..
158            }
159            | BaseCommand {
160                flow: Some(CommandFlow { consumer_id, .. }),
161                ..
162            }
163            | BaseCommand {
164                redeliver_unacknowledged_messages:
165                    Some(CommandRedeliverUnacknowledgedMessages { consumer_id, .. }),
166                ..
167            }
168            | BaseCommand {
169                reached_end_of_topic: Some(CommandReachedEndOfTopic { consumer_id }),
170                ..
171            }
172            | BaseCommand {
173                ack: Some(CommandAck { consumer_id, .. }),
174                ..
175            } => Some(RequestKey::Consumer {
176                consumer_id: *consumer_id,
177            }),
178            BaseCommand {
179                close_consumer:
180                    Some(CommandCloseConsumer {
181                        consumer_id,
182                        request_id,
183                    }),
184                ..
185            } => Some(RequestKey::CloseConsumer {
186                consumer_id: *consumer_id,
187                request_id: *request_id,
188            }),
189            BaseCommand {
190                connect: Some(_), ..
191            }
192            | BaseCommand {
193                connected: Some(_), ..
194            }
195            | BaseCommand { ping: Some(_), .. }
196            | BaseCommand { pong: Some(_), .. } => None,
197            _ => {
198                match base_command::Type::try_from(self.command.r#type) {
199                    Ok(type_) => {
200                        warn!(
201                            "Unexpected payload for command of type {:?}. This is likely a bug!",
202                            type_
203                        );
204                    }
205                    Err(()) => {
206                        warn!(
207                            "Received BaseCommand of unexpected type: {}",
208                            self.command.r#type
209                        );
210                    }
211                }
212                None
213            }
214        }
215    }
216}
217
/// tokio and async-std codec for Pulsar messages
///
/// Every frame starts with a command section,
/// `[total_size: u32][command_size: u32][command]`, where `total_size` counts
/// all the bytes that follow it. Messages with a payload append a payload
/// section, `[magic: u16][crc32c: u32][metadata_size: u32][metadata][data]`,
/// whose checksum covers everything located after the checksum field.
/// Integers are big endian.
pub struct Codec;

// Framing shared by the tokio and async-std trait implementations below
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
impl Codec {
    /// magic number written in front of the checksum of a payload section
    const PAYLOAD_MAGIC_NUMBER: u16 = 0x0e01;
    /// bytes taken by the `total_size` field, which does not count itself
    const TOTAL_SIZE_FIELD: usize = 4;
    /// bytes taken by the `total_size` and `command_size` fields
    const COMMAND_HEADER_SIZE: usize = 8;
    /// bytes taken by the magic number, checksum and `metadata_size` fields
    const PAYLOAD_HEADER_SIZE: usize = 10;

    /// converts a length to the `u32` used on the wire, failing instead of
    /// silently truncating values that do not fit
    fn wire_size(size: usize) -> Result<u32, ConnectionError> {
        u32::try_from(size).map_err(|_| {
            ConnectionError::Encoding(format!(
                "size of {} bytes does not fit in a 32 bit length field",
                size
            ))
        })
    }

    /// serializes `item` as one frame and appends it to `dst`
    ///
    /// The frame is assembled in a scratch buffer first, so `dst` is left
    /// untouched when an error is returned.
    ///
    /// # Errors
    /// `ConnectionError::Encoding` when a length does not fit in its `u32`
    /// field or when protobuf encoding fails.
    fn encode_frame(item: &Message, dst: &mut BytesMut) -> Result<(), ConnectionError> {
        let command_size = item.command.encoded_len();
        let (metadata_size, payload_size, header_size) = match &item.payload {
            Some(payload) => (
                payload.metadata.encoded_len(),
                payload.data.len(),
                Self::COMMAND_HEADER_SIZE + Self::PAYLOAD_HEADER_SIZE,
            ),
            None => (0, 0, Self::COMMAND_HEADER_SIZE),
        };
        let frame_size = command_size + metadata_size + payload_size + header_size;
        // Total size does not include the size of the 'totalSize' field
        let total_size = Self::wire_size(frame_size - Self::TOTAL_SIZE_FIELD)?;
        let mut buf = Vec::with_capacity(frame_size);

        // Simple command frame
        buf.put_u32(total_size);
        buf.put_u32(Self::wire_size(command_size)?);
        item.command.encode(&mut buf)?;

        // Payload command frame
        if let Some(payload) = &item.payload {
            buf.put_u16(Self::PAYLOAD_MAGIC_NUMBER);

            // The checksum (CRC32c) covers the bytes that follow it. A
            // placeholder is written now and overwritten once those bytes are
            // in place, which avoids assembling them in a separate buffer.
            let crc_offset = buf.len();
            buf.put_u32(0);

            let metadata_offset = buf.len();
            buf.put_u32(Self::wire_size(metadata_size)?);
            payload.metadata.encode(&mut buf)?;
            buf.extend_from_slice(&payload.data);

            let crc = CRC_CASTAGNOLI.checksum(&buf[metadata_offset..]);
            buf[crc_offset..metadata_offset].copy_from_slice(&crc.to_be_bytes());
        }

        dst.reserve(buf.len());
        dst.put_slice(&buf);
        trace!("Encoder sending {} bytes", buf.len());
        Ok(())
    }

    /// tries to read one frame from the front of `src`
    ///
    /// Returns `Ok(None)` while `src` does not hold a complete frame yet. On
    /// success the frame is removed from `src`; on error `src` is left as is.
    ///
    /// # Errors
    /// `ConnectionError::Decoding` when the frame is malformed or its protobuf
    /// content cannot be decoded.
    fn decode_frame(src: &mut BytesMut) -> Result<Option<Message>, ConnectionError> {
        trace!("Decoder received {} bytes", src.len());
        if src.len() < Self::TOTAL_SIZE_FIELD {
            return Ok(None);
        }

        // peek at the size prefix without consuming it
        let remaining_size = Cursor::new(&src[..]).get_u32() as usize;
        // `messageSize` refers only to _remaining_ message size, so we add 4 to get total frame size
        let message_size = remaining_size
            .checked_add(Self::TOTAL_SIZE_FIELD)
            .ok_or_else(|| {
                ConnectionError::Decoding(format!(
                    "Frame of {} bytes is too large to be addressed",
                    remaining_size
                ))
            })?;
        if src.len() < message_size {
            return Ok(None);
        }

        let msg = {
            let (rest, command_section) =
                command_frame(&src[..message_size]).map_err(|err| {
                    ConnectionError::Decoding(format!(
                        "Error decoding command frame: {:?}",
                        err
                    ))
                })?;
            let command = BaseCommand::decode(command_section.command)?;

            // whatever follows the command is the payload section
            let payload = if rest.is_empty() {
                None
            } else {
                let (data, payload_section) = payload_frame(rest).map_err(|err| {
                    ConnectionError::Decoding(format!(
                        "Error decoding payload frame: {:?}",
                        err
                    ))
                })?;

                // TODO: Check crc32 of payload data

                let metadata = Metadata::decode(payload_section.metadata)?;
                Some(Payload {
                    metadata,
                    data: data.to_vec(),
                })
            };

            Message { command, payload }
        };

        //TODO advance as we read, rather than this weird post thing
        src.advance(message_size);
        Ok(Some(msg))
    }
}

#[cfg(feature = "tokio-runtime")]
impl tokio_util::codec::Encoder<Message> for Codec {
    type Error = ConnectionError;

    /// appends `item` to `dst` as one frame
    fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), ConnectionError> {
        Self::encode_frame(&item, dst)
    }
}

#[cfg(feature = "tokio-runtime")]
impl tokio_util::codec::Decoder for Codec {
    type Item = Message;
    type Error = ConnectionError;

    /// reads the next frame of `src`, or `None` if it is not complete yet
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Message>, ConnectionError> {
        Self::decode_frame(src)
    }
}

#[cfg(feature = "async-std-runtime")]
impl asynchronous_codec::Encoder for Codec {
    type Item = Message;
    type Error = ConnectionError;

    /// appends `item` to `dst` as one frame
    fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), ConnectionError> {
        Self::encode_frame(&item, dst)
    }
}

#[cfg(feature = "async-std-runtime")]
impl asynchronous_codec::Decoder for Codec {
    type Item = Message;
    type Error = ConnectionError;

    /// reads the next frame of `src`, or `None` if it is not complete yet
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Message>, ConnectionError> {
        Self::decode_frame(src)
    }
}
427
428/// message payload
429#[derive(Debug, Clone)]
430pub struct Payload {
431    /// message metadata added by Pulsar
432    pub metadata: Metadata,
433    /// raw message data
434    pub data: Vec<u8>,
435}
436
/// borrowed view over the command section of a frame, produced by
/// `command_frame`
struct CommandFrame<'a> {
    // the two sizes are parsed to walk the frame but not read afterwards in
    // this file, hence the `dead_code` allowances
    #[allow(dead_code)]
    total_size: u32,
    #[allow(dead_code)]
    command_size: u32,
    /// the `command_size` bytes holding the encoded command
    command: &'a [u8],
}
444
445fn command_frame(i: &[u8]) -> IResult<&[u8], CommandFrame> {
446    let (i, total_size) = be_u32(i)?;
447    let (i, command_size) = be_u32(i)?;
448    let (i, command) = take(command_size)(i)?;
449
450    Ok((
451        i,
452        CommandFrame {
453            total_size,
454            command_size,
455            command,
456        },
457    ))
458}
459
/// borrowed view over the header of a payload section, produced by
/// `payload_frame`
struct PayloadFrame<'a> {
    // magic number, checksum and metadata size are parsed to walk the frame
    // but not read afterwards in this file, hence the `dead_code` allowances
    #[allow(dead_code)]
    magic_number: u16,
    #[allow(dead_code)]
    checksum: u32,
    #[allow(dead_code)]
    metadata_size: u32,
    /// the `metadata_size` bytes holding the encoded metadata
    metadata: &'a [u8],
}
469
470fn payload_frame(i: &[u8]) -> IResult<&[u8], PayloadFrame> {
471    let (i, magic_number) = be_u16(i)?;
472    let (i, checksum) = be_u32(i)?;
473    let (i, metadata_size) = be_u32(i)?;
474    let (i, metadata) = take(metadata_size)(i)?;
475
476    Ok((
477        i,
478        PayloadFrame {
479            magic_number,
480            checksum,
481            metadata_size,
482            metadata,
483        },
484    ))
485}
486
487pub(crate) struct BatchedMessage {
488    pub metadata: proto::SingleMessageMetadata,
489    pub payload: Vec<u8>,
490}
491
492fn batched_message(i: &[u8]) -> IResult<&[u8], BatchedMessage> {
493    let (i, metadata_size) = be_u32(i)?;
494    let (i, metadata) = verify(
495        map_res(take(metadata_size), proto::SingleMessageMetadata::decode),
496        // payload_size is defined as i32 in protobuf
497        |metadata| metadata.payload_size >= 0,
498    )(i)?;
499
500    let (i, payload) = take(metadata.payload_size as u32)(i)?;
501
502    Ok((
503        i,
504        BatchedMessage {
505            metadata,
506            payload: payload.to_vec(),
507        },
508    ))
509}
510
511pub(crate) fn parse_batched_message(
512    count: u32,
513    payload: &[u8],
514) -> Result<Vec<BatchedMessage>, ConnectionError> {
515    let (_, result) =
516        nom::multi::count(batched_message, count as usize)(payload).map_err(|err| {
517            ConnectionError::Decoding(format!("Error decoding batched messages: {:?}", err))
518        })?;
519    Ok(result)
520}
521
522impl BatchedMessage {
523    pub(crate) fn serialize(&self, w: &mut Vec<u8>) {
524        w.put_u32(self.metadata.encoded_len() as u32);
525        let _ = self.metadata.encode(w);
526        w.put_slice(&self.payload);
527    }
528}
529
530#[rustfmt::skip]
531pub mod proto {
532    include!(concat!(env!("OUT_DIR"), "/pulsar.proto.rs"));
533
534    //trait implementations used in Consumer::unacked_messages
535    impl std::cmp::Eq for MessageIdData {}
536
537    impl std::hash::Hash for MessageIdData {
538         fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
539             self.ledger_id.hash(state);
540             self.entry_id.hash(state);
541             self.partition.hash(state);
542             self.batch_index.hash(state);
543             self.ack_set.hash(state);
544             self.batch_size.hash(state);
545         }
546    }
547}
548
549impl TryFrom<i32> for proto::base_command::Type {
550    type Error = ();
551
552    fn try_from(value: i32) -> Result<Self, ()> {
553        match value {
554            2 => Ok(proto::base_command::Type::Connect),
555            3 => Ok(proto::base_command::Type::Connected),
556            4 => Ok(proto::base_command::Type::Subscribe),
557            5 => Ok(proto::base_command::Type::Producer),
558            6 => Ok(proto::base_command::Type::Send),
559            7 => Ok(proto::base_command::Type::SendReceipt),
560            8 => Ok(proto::base_command::Type::SendError),
561            9 => Ok(proto::base_command::Type::Message),
562            10 => Ok(proto::base_command::Type::Ack),
563            11 => Ok(proto::base_command::Type::Flow),
564            12 => Ok(proto::base_command::Type::Unsubscribe),
565            13 => Ok(proto::base_command::Type::Success),
566            14 => Ok(proto::base_command::Type::Error),
567            15 => Ok(proto::base_command::Type::CloseProducer),
568            16 => Ok(proto::base_command::Type::CloseConsumer),
569            17 => Ok(proto::base_command::Type::ProducerSuccess),
570            18 => Ok(proto::base_command::Type::Ping),
571            19 => Ok(proto::base_command::Type::Pong),
572            20 => Ok(proto::base_command::Type::RedeliverUnacknowledgedMessages),
573            21 => Ok(proto::base_command::Type::PartitionedMetadata),
574            22 => Ok(proto::base_command::Type::PartitionedMetadataResponse),
575            23 => Ok(proto::base_command::Type::Lookup),
576            24 => Ok(proto::base_command::Type::LookupResponse),
577            25 => Ok(proto::base_command::Type::ConsumerStats),
578            26 => Ok(proto::base_command::Type::ConsumerStatsResponse),
579            27 => Ok(proto::base_command::Type::ReachedEndOfTopic),
580            28 => Ok(proto::base_command::Type::Seek),
581            29 => Ok(proto::base_command::Type::GetLastMessageId),
582            30 => Ok(proto::base_command::Type::GetLastMessageIdResponse),
583            31 => Ok(proto::base_command::Type::ActiveConsumerChange),
584            32 => Ok(proto::base_command::Type::GetTopicsOfNamespace),
585            33 => Ok(proto::base_command::Type::GetTopicsOfNamespaceResponse),
586            34 => Ok(proto::base_command::Type::GetSchema),
587            35 => Ok(proto::base_command::Type::GetSchemaResponse),
588            _ => Err(()),
589        }
590    }
591}
592
593impl From<prost::EncodeError> for ConnectionError {
594    fn from(e: prost::EncodeError) -> Self {
595        ConnectionError::Encoding(e.to_string())
596    }
597}
598
599impl From<prost::DecodeError> for ConnectionError {
600    fn from(e: prost::DecodeError) -> Self {
601        ConnectionError::Decoding(e.to_string())
602    }
603}
604
#[cfg(test)]
mod tests {
    // The codec tests go through the tokio traits, which `Codec` only
    // implements with the `tokio-runtime` feature: they are gated on it so
    // that the rest of the module can still be tested without that feature.
    #[cfg(feature = "tokio-runtime")]
    use crate::message::Codec;
    #[cfg(feature = "tokio-runtime")]
    use bytes::BytesMut;
    use std::convert::TryFrom;
    #[cfg(feature = "tokio-runtime")]
    use tokio_util::codec::{Decoder, Encoder};

    /// a command without payload is decoded, then encoded back to the same bytes
    #[cfg(feature = "tokio-runtime")]
    #[test]
    fn parse_simple_command() {
        let input: &[u8] = &[
            0x00, 0x00, 0x00, 0x22, 0x00, 0x00, 0x00, 0x1E, 0x08, 0x02, 0x12, 0x1A, 0x0A, 0x10,
            0x32, 0x2E, 0x30, 0x2E, 0x31, 0x2D, 0x69, 0x6E, 0x63, 0x75, 0x62, 0x61, 0x74, 0x69,
            0x6E, 0x67, 0x20, 0x0C, 0x2A, 0x04, 0x6E, 0x6F, 0x6E, 0x65,
        ];

        let message = Codec.decode(&mut input.into()).unwrap().unwrap();

        {
            let connect = message.command.connect.as_ref().unwrap();
            assert_eq!(connect.client_version, "2.0.1-incubating");
            assert_eq!(connect.auth_method_name.as_ref().unwrap(), "none");
            assert_eq!(connect.protocol_version.as_ref().unwrap(), &12);
        }

        let mut output = BytesMut::with_capacity(38);
        Codec.encode(message, &mut output).unwrap();
        assert_eq!(&output, input);
    }

    /// a command with payload is decoded, then encoded back to the same bytes,
    /// checksum included
    #[cfg(feature = "tokio-runtime")]
    #[test]
    fn parse_payload_command() {
        let input: &[u8] = &[
            0x00, 0x00, 0x00, 0x3D, 0x00, 0x00, 0x00, 0x08, 0x08, 0x06, 0x32, 0x04, 0x08, 0x00,
            0x10, 0x08, 0x0E, 0x01, 0x42, 0x83, 0x54, 0xB5, 0x00, 0x00, 0x00, 0x19, 0x0A, 0x0E,
            0x73, 0x74, 0x61, 0x6E, 0x64, 0x61, 0x6C, 0x6F, 0x6E, 0x65, 0x2D, 0x30, 0x2D, 0x33,
            0x10, 0x08, 0x18, 0xBE, 0xC0, 0xFC, 0x84, 0xD2, 0x2C, 0x68, 0x65, 0x6C, 0x6C, 0x6F,
            0x2D, 0x70, 0x75, 0x6C, 0x73, 0x61, 0x72, 0x2D, 0x38,
        ];

        let message = Codec.decode(&mut input.into()).unwrap().unwrap();
        {
            let send = message.command.send.as_ref().unwrap();
            assert_eq!(send.producer_id, 0);
            assert_eq!(send.sequence_id, 8);
        }

        {
            let payload = message.payload.as_ref().unwrap();
            assert_eq!(payload.metadata.producer_name, "standalone-0-3");
            assert_eq!(payload.metadata.sequence_id, 8);
            assert_eq!(payload.metadata.publish_time, 1533850624062);
        }

        let mut output = BytesMut::with_capacity(65);
        Codec.encode(message, &mut output).unwrap();
        assert_eq!(&output, input);
    }

    /// nothing is produced nor consumed while a frame is incomplete
    #[cfg(feature = "tokio-runtime")]
    #[test]
    fn decode_incomplete_frame() {
        // not even a full size prefix
        let mut too_short = BytesMut::from(&[0x00, 0x00][..]);
        assert!(Codec.decode(&mut too_short).unwrap().is_none());
        assert_eq!(too_short.len(), 2);

        // the prefix announces 0x22 more bytes but only 5 of them are there
        let mut truncated =
            BytesMut::from(&[0x00, 0x00, 0x00, 0x22, 0x00, 0x00, 0x00, 0x1E, 0x08][..]);
        assert!(Codec.decode(&mut truncated).unwrap().is_none());
        assert_eq!(truncated.len(), 9);
    }

    /// every accepted value maps to the variant carrying that same value
    #[test]
    fn base_command_type_parsing() {
        use super::proto::base_command::Type;
        let mut successes = 0;
        for i in 0..40 {
            if let Ok(type_) = Type::try_from(i) {
                successes += 1;
                assert_eq!(type_ as i32, i);
            }
        }
        assert_eq!(successes, 34);
    }
}