1use {
2 super::{
3 define,
4 define::{epat_pid, epes_stream_id, ts},
5 errors::{MpegTsError, MpegTsErrorValue},
6 pat, pes,
7 pes::PesMuxer,
8 pmt, utils,
9 },
10 bytes::{BufMut, BytesMut},
11 bytesio::{bytes_reader::BytesReader, bytes_writer::BytesWriter},
12};
13
14pub struct TsMuxer {
15 pub bytes_writer: BytesWriter,
16 pat_continuity_counter: u8,
17 pmt_continuity_counter: u8,
18 h264_h265_with_aud: bool,
19 pid: u16,
20 pat_period: i64,
21 pcr_period: i64,
22 pcr_clock: i64,
23 pat: pat::Pat,
24 cur_pmt_index: usize,
25 cur_stream_index: usize,
26
27 packet_number: usize,
28}
29
30impl Default for TsMuxer {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35impl TsMuxer {
36 pub fn new() -> Self {
37 Self {
38 bytes_writer: BytesWriter::new(),
39 pat_continuity_counter: 0,
40 pmt_continuity_counter: 0,
41 h264_h265_with_aud: false,
42 pid: 0x0100,
43 pat_period: 0,
44 pcr_period: 80 * 90,
45 pcr_clock: 0,
46 pat: pat::Pat::new(),
47 cur_pmt_index: 0,
48 cur_stream_index: 0,
49 packet_number: 0,
50 }
51 }
52
53 pub fn reset(&mut self) {
54 self.pat_period = 0;
55 self.pcr_period = 80 * 90;
56 self.pcr_clock = 0;
57
58 self.packet_number = 0;
59 }
60
61 pub fn get_data(&mut self) -> BytesMut {
62 self.bytes_writer.extract_current_bytes()
63 }
64
65 pub fn write(
66 &mut self,
67 pid: u16,
68 pts: i64,
69 dts: i64,
70 flags: u16,
71 payload: BytesMut,
72 ) -> Result<(), MpegTsError> {
73 self.h264_h265_with_aud = (flags & define::MPEG_FLAG_H264_H265_WITH_AUD) > 0;
74
75 self.find_stream(pid)?;
80
81 let cur_pmt = self.pat.pmt.get_mut(self.cur_pmt_index).unwrap();
82 let cur_stream = cur_pmt.streams.get_mut(self.cur_stream_index).unwrap();
83
84 if 0x1FFF == cur_pmt.pcr_pid
85 || (define::epes_stream_id::PES_SID_VIDEO
86 == (cur_stream.stream_id & define::epes_stream_id::PES_SID_VIDEO)
87 && (cur_pmt.pcr_pid != cur_stream.pid))
88 {
89 cur_pmt.pcr_pid = cur_stream.pid;
90 self.pat_period = 0;
91 }
92
93 if cur_pmt.pcr_pid == cur_stream.pid {
94 self.pcr_clock += 1;
95 }
96
97 cur_stream.pts = pts;
98 cur_stream.dts = dts;
99
100 if (flags & define::MPEG_FLAG_IDR_FRAME) > 0 {
101 cur_stream.data_alignment_indicator = 1; } else {
103 cur_stream.data_alignment_indicator = 0; }
105
106 if 0 == self.pat_period || (self.pat_period + define::PAT_PERIOD) <= dts {
107 self.pat_period = dts;
108 let pat_data = pat::PatMuxer::new().write(self.pat.clone())?;
109
110 self.write_ts_header_for_pat_pmt(
111 epat_pid::PAT_TID_PAS,
112 pat_data,
113 self.pat_continuity_counter,
114 )?;
115 self.pat_continuity_counter = (self.pat_continuity_counter + 1) % 16;
116 self.packet_number += 1;
117
118 for pmt_data in &mut self.pat.pmt.clone() {
119 let payload_data = pmt::PmtMuxer::new().write(pmt_data)?;
120 self.write_ts_header_for_pat_pmt(
121 pmt_data.pid,
122 payload_data,
123 self.pmt_continuity_counter,
124 )?;
125 self.pmt_continuity_counter = (self.pmt_continuity_counter + 1) % 16;
126 self.packet_number += 1;
127 }
128 }
129
130 self.write_pes(payload)?;
131
132 Ok(())
133 }
134
135 pub fn write_ts_header_for_pat_pmt(
136 &mut self,
137 pid: u16,
138 payload: BytesMut,
139 continuity_counter: u8,
140 ) -> Result<(), MpegTsError> {
141 self.bytes_writer.write_u8(0x47)?; self.bytes_writer
145 .write_u8(0x40 | ((pid >> 8) as u8 & 0x1F))?; self.bytes_writer.write_u8(pid as u8)?; self.bytes_writer.write_u8(0x10 | continuity_counter)?;
150
151 self.bytes_writer.write_u8(0x00)?; self.bytes_writer.write(&payload)?;
156
157 let left_size = ts::TS_PACKET_SIZE - payload.len() as u8 - 5;
158 for _ in 0..left_size {
159 self.bytes_writer.write_u8(0xFF)?;
160 }
161 Ok(())
162 }
163 pub fn write_pes(&mut self, payload: BytesMut) -> Result<(), MpegTsError> {
165 let mut is_start: bool = true;
166 let mut payload_reader = BytesReader::new(payload);
167
168 while !payload_reader.is_empty() {
169 let mut pes_muxer = PesMuxer::new();
171 if is_start {
172 let cur_pmt = self.pat.pmt.get_mut(self.cur_pmt_index).unwrap();
173 let stream_data = cur_pmt.streams.get_mut(self.cur_stream_index).unwrap();
174 pes_muxer.write_pes_header(
175 payload_reader.len(),
176 stream_data,
177 self.h264_h265_with_aud,
178 )?;
179 }
180
181 let pes_header_length: usize = pes_muxer.len();
182 let mut payload_length = payload_reader.len();
183
184 let mut ts_header = BytesWriter::new();
186 payload_length = self.write_ts_header_for_pes(
187 &mut ts_header,
188 pes_header_length,
189 payload_length,
190 is_start,
191 )?;
192 self.packet_number += 1;
193
194 is_start = false;
195
196 let data = payload_reader.read_bytes(payload_length)?;
197
198 self.bytes_writer.append(&mut ts_header);
199 self.bytes_writer.append(&mut pes_muxer.bytes_writer);
200 self.bytes_writer.write(&data[..])?;
201 }
202 Ok(())
203 }
204 pub fn write_ts_header_for_pes(
205 &mut self,
206
207 ts_header: &mut BytesWriter,
208 pes_header_length: usize,
209 payload_data_length: usize,
210 is_start: bool,
211 ) -> Result<usize, MpegTsError> {
212 let cur_pmt = self.pat.pmt.get_mut(self.cur_pmt_index).unwrap();
213 let stream_data = cur_pmt.streams.get_mut(self.cur_stream_index).unwrap();
214
215 let pcr_pid = cur_pmt.pcr_pid;
216
217 ts_header.write_u8(0x47)?; ts_header.write_u8((stream_data.pid >> 8) as u8 & 0x1F)?; ts_header.write_u8((stream_data.pid & 0xFF) as u8)?; ts_header.write_u8(0x10 | (stream_data.continuity_counter & 0x0F))?; stream_data.continuity_counter = (stream_data.continuity_counter + 1) % 16;
237
238 if is_start {
239 ts_header.or_u8_at(1, define::TS_PAYLOAD_UNIT_START_INDICATOR)?;
241
242 if (stream_data.pid == pcr_pid)
243 || ((stream_data.data_alignment_indicator > 0)
244 && define::PTS_NO_VALUE != stream_data.pts)
245 {
246 ts_header.or_u8_at(3, 0x20)?;
248
249 ts_header.write_u8(0x01)?; ts_header.write_u8(0x00)?; if stream_data.pid == pcr_pid {
256 ts_header.or_u8_at(5, define::AF_FLAG_PCR)?;
258
259 let pcr = if define::PTS_NO_VALUE == stream_data.dts {
260 stream_data.pts
261 } else {
262 stream_data.dts
263 };
264 let mut pcr_result: BytesWriter = BytesWriter::new();
265 utils::pcr_write(&mut pcr_result, pcr * 300)?;
266 ts_header.write(&pcr_result.extract_current_bytes()[..])?;
267 ts_header.add_u8_at(4, 6)?;
269 }
270
271 if (stream_data.data_alignment_indicator > 0)
272 && define::PTS_NO_VALUE != stream_data.pts
273 {
274 ts_header.or_u8_at(5, define::AF_FLAG_RANDOM_ACCESS_INDICATOR)?;
276 }
277 }
278 }
279
280 let ts_header_length = ts_header.len();
291 let mut stuffing_length = define::TS_PACKET_SIZE as i32
292 - (ts_header_length + pes_header_length + payload_data_length) as i32;
293
294 if stuffing_length > 0 {
295 if (ts_header.get(3).unwrap() & 0x20) > 0 {
296 ts_header.add_u8_at(4, stuffing_length as u8)?;
298 } else {
299 ts_header.or_u8_at(3, 0x20)?;
301 stuffing_length -= 1;
303 ts_header.write_u8(stuffing_length as u8)?;
305 if stuffing_length >= 1 {
307 stuffing_length -= 1;
309 ts_header.write_u8(0x00)?;
310 }
311 }
312 for _ in 0..stuffing_length {
313 ts_header.write_u8(0xFF)?;
314 }
315 } else {
316 return Ok(define::TS_PACKET_SIZE - ts_header_length - pes_header_length);
317 }
318
319 Ok(payload_data_length)
320 }
321
322 pub fn find_stream(&mut self, pid: u16) -> Result<(), MpegTsError> {
323 let mut stream_index: usize = 0;
325
326 for (pmt_index, pmt) in self.pat.pmt.iter_mut().enumerate() {
327 for stream in pmt.streams.iter_mut() {
328 if stream.pid == pid {
329 self.cur_pmt_index = pmt_index;
330 self.cur_stream_index = stream_index;
331
332 return Ok(());
333 }
334 stream_index += 1;
335 }
336 }
337
338 Err(MpegTsError {
352 value: MpegTsErrorValue::StreamNotFound,
353 })
354 }
355
356 pub fn add_stream(&mut self, codecid: u8, extra_data: BytesMut) -> Result<u16, MpegTsError> {
357 if self.pat.pmt.is_empty() {
358 self.add_program(1, BytesMut::new())?;
359 }
360
361 self.pmt_add_stream(0, codecid, extra_data)
362 }
363
364 pub fn pmt_add_stream(
365 &mut self,
366 pmt_index: usize,
367 codecid: u8,
368 extra_data: BytesMut,
369 ) -> Result<u16, MpegTsError> {
370 let pmt = &mut self.pat.pmt[pmt_index];
371
372 if pmt.streams.len() == 4 {
373 return Err(MpegTsError {
374 value: MpegTsErrorValue::StreamCountExeceed,
375 });
376 }
377
378 let mut cur_stream = pes::Pes::new(); cur_stream.codec_id = codecid;
381 cur_stream.pid = self.pid;
382 self.pid += 1;
383
384 if utils::is_steam_type_video(codecid) {
385 cur_stream.stream_id = epes_stream_id::PES_SID_VIDEO;
386 } else if utils::is_steam_type_audio(codecid) {
387 cur_stream.stream_id = epes_stream_id::PES_SID_AUDIO;
388 } else {
389 cur_stream.stream_id = epes_stream_id::PES_SID_PRIVATE_1;
390 }
391
392 if !extra_data.is_empty() {
393 cur_stream.esinfo.put(extra_data);
394 }
395
396 pmt.streams.push(cur_stream);
397 pmt.version_number = (pmt.version_number + 1) % 32;
398
399 self.reset();
400
401 Ok(self.pid - 1)
402 }
403
404 pub fn add_program(&mut self, program_number: u16, info: BytesMut) -> Result<(), MpegTsError> {
405 for cur_pmt in self.pat.pmt.iter() {
406 if cur_pmt.program_number == program_number {
407 return Err(MpegTsError {
408 value: MpegTsErrorValue::ProgramNumberExists,
409 });
410 }
411 }
412
413 if self.pat.pmt.len() == 4 {
414 return Err(MpegTsError {
415 value: MpegTsErrorValue::PmtCountExeceed,
416 });
417 }
418 let mut cur_pmt = pmt::Pmt::new(); cur_pmt.pid = self.pid;
421 self.pid += 1;
422 cur_pmt.program_number = program_number;
423 cur_pmt.version_number = 0x00;
424 cur_pmt.continuity_counter = 0;
425 cur_pmt.pcr_pid = 0x1FFF;
426
427 if !info.is_empty() {
428 cur_pmt.program_info.put(&info[..]);
429 }
430
431 self.pat.pmt.push(cur_pmt);
432
433 Ok(())
434 }
435}