1use {
2 super::{
3 define::{msg_type_id, RtmpMessageData},
4 errors::MessageError,
5 },
6 crate::{
7 chunk::ChunkInfo,
8 protocol_control_messages::reader::ProtocolControlMessageReader,
9 user_control_messages::reader::EventMessagesReader,
10 },
12 bytesio::bytes_reader::BytesReader,
13 xflv::amf0::{amf0_markers, amf0_reader::Amf0Reader},
14};
15
/// Decodes the payload of one [`ChunkInfo`] into an [`RtmpMessageData`] value.
///
/// The parser owns the chunk it was built from; `parse` consumes the parser
/// (and with it the chunk payload).
pub struct MessageParser {
    /// Chunk whose message header selects the decoder and whose payload is decoded.
    chunk_info: ChunkInfo,
}
19
20impl MessageParser {
21 pub fn new(chunk_info: ChunkInfo) -> Self {
22 Self { chunk_info }
23 }
24 pub fn parse(self) -> Result<Option<RtmpMessageData>, MessageError> {
25 let mut reader = BytesReader::new(self.chunk_info.payload);
26
27 match self.chunk_info.message_header.msg_type_id {
28 msg_type_id::COMMAND_AMF0 | msg_type_id::COMMAND_AMF3 => {
29 if self.chunk_info.message_header.msg_type_id == msg_type_id::COMMAND_AMF3 {
30 reader.read_u8()?;
31 }
32 let mut amf_reader = Amf0Reader::new(reader);
33
34 let command_name = amf_reader.read_with_type(amf0_markers::STRING)?;
35 let transaction_id = amf_reader.read_with_type(amf0_markers::NUMBER)?;
36
37 let command_obj_raw = amf_reader.read_with_type(amf0_markers::OBJECT);
46 let command_obj = match command_obj_raw {
47 Ok(val) => val,
48 Err(_) => amf_reader.read_with_type(amf0_markers::NULL)?,
49 };
50
51 let others = amf_reader.read_all()?;
52
53 return Ok(Some(RtmpMessageData::Amf0Command {
54 command_name,
55 transaction_id,
56 command_object: command_obj,
57 others,
58 }));
59 }
60
61 msg_type_id::AUDIO => {
62 log::trace!(
63 "receive audio msg , msg length is{}\n",
64 self.chunk_info.message_header.msg_length
65 );
66
67 return Ok(Some(RtmpMessageData::AudioData {
68 data: reader.extract_remaining_bytes(),
69 }));
70 }
71 msg_type_id::VIDEO => {
72 log::trace!(
73 "receive video msg , msg length is{}\n",
74 self.chunk_info.message_header.msg_length
75 );
76 return Ok(Some(RtmpMessageData::VideoData {
77 data: reader.extract_remaining_bytes(),
78 }));
79 }
80 msg_type_id::USER_CONTROL_EVENT => {
81 log::trace!(
82 "receive user control event msg , msg length is{}\n",
83 self.chunk_info.message_header.msg_length
84 );
85 let data = EventMessagesReader::new(reader).parse_event()?;
86 return Ok(Some(data));
87 }
88 msg_type_id::SET_CHUNK_SIZE => {
89 let chunk_size = ProtocolControlMessageReader::new(reader).read_set_chunk_size()?;
90 return Ok(Some(RtmpMessageData::SetChunkSize { chunk_size }));
91 }
92 msg_type_id::ABORT => {
93 let chunk_stream_id =
94 ProtocolControlMessageReader::new(reader).read_abort_message()?;
95 return Ok(Some(RtmpMessageData::AbortMessage { chunk_stream_id }));
96 }
97 msg_type_id::ACKNOWLEDGEMENT => {
98 let sequence_number =
99 ProtocolControlMessageReader::new(reader).read_acknowledgement()?;
100 return Ok(Some(RtmpMessageData::Acknowledgement { sequence_number }));
101 }
102 msg_type_id::WIN_ACKNOWLEDGEMENT_SIZE => {
103 let size =
104 ProtocolControlMessageReader::new(reader).read_window_acknowledgement_size()?;
105 return Ok(Some(RtmpMessageData::WindowAcknowledgementSize { size }));
106 }
107 msg_type_id::SET_PEER_BANDWIDTH => {
108 let properties =
109 ProtocolControlMessageReader::new(reader).read_set_peer_bandwidth()?;
110 return Ok(Some(RtmpMessageData::SetPeerBandwidth { properties }));
111 }
112 msg_type_id::DATA_AMF0 | msg_type_id::DATA_AMF3 => {
113 return Ok(Some(RtmpMessageData::AmfData {
115 raw_data: reader.extract_remaining_bytes(),
116 }));
117 }
118
119 msg_type_id::SHARED_OBJ_AMF3 | msg_type_id::SHARED_OBJ_AMF0 => {}
120
121 msg_type_id::AGGREGATE => {}
122
123 _ => {}
124 }
125 log::warn!(
126 "the msg_type_id is not processed: {}",
127 self.chunk_info.message_header.msg_type_id
128 );
129 Ok(None)
130 }
131}
132
#[cfg(test)]
mod tests {
    use super::MessageParser;
    use crate::chunk::unpacketizer::{ChunkUnpacketizer, UnpackResult};

    /// Smoke test: feeds a fixed byte stream through the chunk unpacketizer and
    /// hands every complete chunk to `MessageParser::parse`.
    ///
    /// No assertion is made on the parse results; the test only exercises the
    /// code path until `read_chunk` reports an error.
    #[test]
    fn test_message_parse() {
        // Fixed input bytes; the payload contains, among others, the ASCII
        // strings "connect", "app", "harlan" and "rtmp://localhost:1935/harlan".
        let data: [u8; 205] = [
            2, 0, 0, 0, 0, 0, 4, 1, 0, 0, 0, 0, 0, 0, 16, 0,
            3, 0, 0, 0, 0, 0, 177, 20, 0, 0, 0, 0,
            2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0,
            3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101,
            2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115,
            104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112,
            97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119,
            102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108,
            104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99,
            85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104,
            111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9,
        ];

        let mut unpacker = ChunkUnpacketizer::new();
        unpacker.extend_data(&data[..]);

        // Keep pulling chunks until the unpacketizer returns an error.
        while let Ok(unpacked) = unpacker.read_chunk() {
            if let UnpackResult::ChunkInfo(chunk_info) = unpacked {
                let _ = chunk_info.message_header.msg_streamd_id;
                let _ = chunk_info.message_header.timestamp;

                // The parse result is intentionally discarded.
                let _ = MessageParser::new(chunk_info).parse();
            }
        }

        print!("end-----------");
    }
}