rtmp/messages/
parser.rs

1use {
2    super::{
3        define::{msg_type_id, RtmpMessageData},
4        errors::MessageError,
5    },
6    crate::{
7        chunk::ChunkInfo,
8        protocol_control_messages::reader::ProtocolControlMessageReader,
9        user_control_messages::reader::EventMessagesReader,
10        // utils,
11    },
12    bytesio::bytes_reader::BytesReader,
13    xflv::amf0::{amf0_markers, amf0_reader::Amf0Reader},
14};
15
16pub struct MessageParser {
17    chunk_info: ChunkInfo,
18}
19
20impl MessageParser {
21    pub fn new(chunk_info: ChunkInfo) -> Self {
22        Self { chunk_info }
23    }
24    pub fn parse(self) -> Result<Option<RtmpMessageData>, MessageError> {
25        let mut reader = BytesReader::new(self.chunk_info.payload);
26
27        match self.chunk_info.message_header.msg_type_id {
28            msg_type_id::COMMAND_AMF0 | msg_type_id::COMMAND_AMF3 => {
29                if self.chunk_info.message_header.msg_type_id == msg_type_id::COMMAND_AMF3 {
30                    reader.read_u8()?;
31                }
32                let mut amf_reader = Amf0Reader::new(reader);
33
34                let command_name = amf_reader.read_with_type(amf0_markers::STRING)?;
35                let transaction_id = amf_reader.read_with_type(amf0_markers::NUMBER)?;
36
37                // match command_name.clone() {
38                //     Amf0ValueType::UTF8String(val) => {
39                //         log::info!("command_name:{}", val);
40                //     }
41                //     _ => {}
42                // }
43
44                //The third value can be an object or NULL object
45                let command_obj_raw = amf_reader.read_with_type(amf0_markers::OBJECT);
46                let command_obj = match command_obj_raw {
47                    Ok(val) => val,
48                    Err(_) => amf_reader.read_with_type(amf0_markers::NULL)?,
49                };
50
51                let others = amf_reader.read_all()?;
52
53                return Ok(Some(RtmpMessageData::Amf0Command {
54                    command_name,
55                    transaction_id,
56                    command_object: command_obj,
57                    others,
58                }));
59            }
60
61            msg_type_id::AUDIO => {
62                log::trace!(
63                    "receive audio msg , msg length is{}\n",
64                    self.chunk_info.message_header.msg_length
65                );
66
67                return Ok(Some(RtmpMessageData::AudioData {
68                    data: reader.extract_remaining_bytes(),
69                }));
70            }
71            msg_type_id::VIDEO => {
72                log::trace!(
73                    "receive video msg , msg length is{}\n",
74                    self.chunk_info.message_header.msg_length
75                );
76                return Ok(Some(RtmpMessageData::VideoData {
77                    data: reader.extract_remaining_bytes(),
78                }));
79            }
80            msg_type_id::USER_CONTROL_EVENT => {
81                log::trace!(
82                    "receive user control event msg , msg length is{}\n",
83                    self.chunk_info.message_header.msg_length
84                );
85                let data = EventMessagesReader::new(reader).parse_event()?;
86                return Ok(Some(data));
87            }
88            msg_type_id::SET_CHUNK_SIZE => {
89                let chunk_size = ProtocolControlMessageReader::new(reader).read_set_chunk_size()?;
90                return Ok(Some(RtmpMessageData::SetChunkSize { chunk_size }));
91            }
92            msg_type_id::ABORT => {
93                let chunk_stream_id =
94                    ProtocolControlMessageReader::new(reader).read_abort_message()?;
95                return Ok(Some(RtmpMessageData::AbortMessage { chunk_stream_id }));
96            }
97            msg_type_id::ACKNOWLEDGEMENT => {
98                let sequence_number =
99                    ProtocolControlMessageReader::new(reader).read_acknowledgement()?;
100                return Ok(Some(RtmpMessageData::Acknowledgement { sequence_number }));
101            }
102            msg_type_id::WIN_ACKNOWLEDGEMENT_SIZE => {
103                let size =
104                    ProtocolControlMessageReader::new(reader).read_window_acknowledgement_size()?;
105                return Ok(Some(RtmpMessageData::WindowAcknowledgementSize { size }));
106            }
107            msg_type_id::SET_PEER_BANDWIDTH => {
108                let properties =
109                    ProtocolControlMessageReader::new(reader).read_set_peer_bandwidth()?;
110                return Ok(Some(RtmpMessageData::SetPeerBandwidth { properties }));
111            }
112            msg_type_id::DATA_AMF0 | msg_type_id::DATA_AMF3 => {
113                //let values = Amf0Reader::new(reader).read_all()?;
114                return Ok(Some(RtmpMessageData::AmfData {
115                    raw_data: reader.extract_remaining_bytes(),
116                }));
117            }
118
119            msg_type_id::SHARED_OBJ_AMF3 | msg_type_id::SHARED_OBJ_AMF0 => {}
120
121            msg_type_id::AGGREGATE => {}
122
123            _ => {}
124        }
125        log::warn!(
126            "the msg_type_id is not processed: {}",
127            self.chunk_info.message_header.msg_type_id
128        );
129        Ok(None)
130    }
131}
132
133#[cfg(test)]
134mod tests {
135
136    use super::MessageParser;
137    use crate::chunk::unpacketizer::ChunkUnpacketizer;
138    use crate::chunk::unpacketizer::UnpackResult;
139
140    #[test]
141    fn test_message_parse() {
142        let mut unpacker = ChunkUnpacketizer::new();
143
144        let data: [u8; 205] = [
145            2, 0, 0, 0, 0, 0, 4, 1, 0, 0, 0, 0, 0, 0, 16, 0, //set chunk size
146            //connect
147            3, //|format+csid|
148            0, 0, 0, //timestamp
149            0, 0, 177, //msg_length
150            20,  //msg_type_id 0x14
151            0, 0, 0, 0, //msg_stream_id
152            2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, //body
153            3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101,
154            2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115,
155            104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112,
156            97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119,
157            102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108,
158            104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99,
159            85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104,
160            111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9,
161        ];
162
163        unpacker.extend_data(&data[..]);
164
165        loop {
166            let result = unpacker.read_chunk();
167
168            let rv = match result {
169                Ok(val) => val,
170                Err(_) => {
171                    print!("end-----------");
172                    return;
173                }
174            };
175
176            if let UnpackResult::ChunkInfo(chunk_info) = rv {
177                let _ = chunk_info.message_header.msg_streamd_id;
178                let _ = chunk_info.message_header.timestamp;
179
180                let message_parser = MessageParser::new(chunk_info);
181                let _ = message_parser.parse();
182            }
183        }
184    }
185}