xmpegts/
ts.rs

1use {
2    super::{
3        define,
4        define::{epat_pid, epes_stream_id, ts},
5        errors::{MpegTsError, MpegTsErrorValue},
6        pat, pes,
7        pes::PesMuxer,
8        pmt, utils,
9    },
10    bytes::{BufMut, BytesMut},
11    bytesio::{bytes_reader::BytesReader, bytes_writer::BytesWriter},
12};
13
14pub struct TsMuxer {
15    pub bytes_writer: BytesWriter,
16    pat_continuity_counter: u8,
17    pmt_continuity_counter: u8,
18    h264_h265_with_aud: bool,
19    pid: u16,
20    pat_period: i64,
21    pcr_period: i64,
22    pcr_clock: i64,
23    pat: pat::Pat,
24    cur_pmt_index: usize,
25    cur_stream_index: usize,
26
27    packet_number: usize,
28}
29
30impl Default for TsMuxer {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35impl TsMuxer {
36    pub fn new() -> Self {
37        Self {
38            bytes_writer: BytesWriter::new(),
39            pat_continuity_counter: 0,
40            pmt_continuity_counter: 0,
41            h264_h265_with_aud: false,
42            pid: 0x0100,
43            pat_period: 0,
44            pcr_period: 80 * 90,
45            pcr_clock: 0,
46            pat: pat::Pat::new(),
47            cur_pmt_index: 0,
48            cur_stream_index: 0,
49            packet_number: 0,
50        }
51    }
52
53    pub fn reset(&mut self) {
54        self.pat_period = 0;
55        self.pcr_period = 80 * 90;
56        self.pcr_clock = 0;
57
58        self.packet_number = 0;
59    }
60
61    pub fn get_data(&mut self) -> BytesMut {
62        self.bytes_writer.extract_current_bytes()
63    }
64
65    pub fn write(
66        &mut self,
67        pid: u16,
68        pts: i64,
69        dts: i64,
70        flags: u16,
71        payload: BytesMut,
72    ) -> Result<(), MpegTsError> {
73        self.h264_h265_with_aud = (flags & define::MPEG_FLAG_H264_H265_WITH_AUD) > 0;
74
75        //print!("pes payload length {}\n", payload.len());
76        //self.packet_number += payload.len();
77        //print!("pes payload sum length {}\n", self.payload_sum);
78
79        self.find_stream(pid)?;
80
81        let cur_pmt = self.pat.pmt.get_mut(self.cur_pmt_index).unwrap();
82        let cur_stream = cur_pmt.streams.get_mut(self.cur_stream_index).unwrap();
83
84        if 0x1FFF == cur_pmt.pcr_pid
85            || (define::epes_stream_id::PES_SID_VIDEO
86                == (cur_stream.stream_id & define::epes_stream_id::PES_SID_VIDEO)
87                && (cur_pmt.pcr_pid != cur_stream.pid))
88        {
89            cur_pmt.pcr_pid = cur_stream.pid;
90            self.pat_period = 0;
91        }
92
93        if cur_pmt.pcr_pid == cur_stream.pid {
94            self.pcr_clock += 1;
95        }
96
97        cur_stream.pts = pts;
98        cur_stream.dts = dts;
99
100        if (flags & define::MPEG_FLAG_IDR_FRAME) > 0 {
101            cur_stream.data_alignment_indicator = 1; // idr frame
102        } else {
103            cur_stream.data_alignment_indicator = 0; // idr frame
104        }
105
106        if 0 == self.pat_period || (self.pat_period + define::PAT_PERIOD) <= dts {
107            self.pat_period = dts;
108            let pat_data = pat::PatMuxer::new().write(self.pat.clone())?;
109
110            self.write_ts_header_for_pat_pmt(
111                epat_pid::PAT_TID_PAS,
112                pat_data,
113                self.pat_continuity_counter,
114            )?;
115            self.pat_continuity_counter = (self.pat_continuity_counter + 1) % 16;
116            self.packet_number += 1;
117
118            for pmt_data in &mut self.pat.pmt.clone() {
119                let payload_data = pmt::PmtMuxer::new().write(pmt_data)?;
120                self.write_ts_header_for_pat_pmt(
121                    pmt_data.pid,
122                    payload_data,
123                    self.pmt_continuity_counter,
124                )?;
125                self.pmt_continuity_counter = (self.pmt_continuity_counter + 1) % 16;
126                self.packet_number += 1;
127            }
128        }
129
130        self.write_pes(payload)?;
131
132        Ok(())
133    }
134
135    pub fn write_ts_header_for_pat_pmt(
136        &mut self,
137        pid: u16,
138        payload: BytesMut,
139        continuity_counter: u8,
140    ) -> Result<(), MpegTsError> {
141        /*sync byte*/
142        self.bytes_writer.write_u8(0x47)?; //0
143                                           /*PID 13 bits*/
144        self.bytes_writer
145            .write_u8(0x40 | ((pid >> 8) as u8 & 0x1F))?; //1
146
147        self.bytes_writer.write_u8(pid as u8)?; //2
148
149        self.bytes_writer.write_u8(0x10 | continuity_counter)?;
150
151        /*adaption field control*/
152        self.bytes_writer.write_u8(0x00)?; //4
153
154        /*payload data*/
155        self.bytes_writer.write(&payload)?;
156
157        let left_size = ts::TS_PACKET_SIZE - payload.len() as u8 - 5;
158        for _ in 0..left_size {
159            self.bytes_writer.write_u8(0xFF)?;
160        }
161        Ok(())
162    }
163    //2.4.3.6 PES packet P35
164    pub fn write_pes(&mut self, payload: BytesMut) -> Result<(), MpegTsError> {
165        let mut is_start: bool = true;
166        let mut payload_reader = BytesReader::new(payload);
167
168        while !payload_reader.is_empty() {
169            //write pes header
170            let mut pes_muxer = PesMuxer::new();
171            if is_start {
172                let cur_pmt = self.pat.pmt.get_mut(self.cur_pmt_index).unwrap();
173                let stream_data = cur_pmt.streams.get_mut(self.cur_stream_index).unwrap();
174                pes_muxer.write_pes_header(
175                    payload_reader.len(),
176                    stream_data,
177                    self.h264_h265_with_aud,
178                )?;
179            }
180
181            let pes_header_length: usize = pes_muxer.len();
182            let mut payload_length = payload_reader.len();
183
184            //write ts header
185            let mut ts_header = BytesWriter::new();
186            payload_length = self.write_ts_header_for_pes(
187                &mut ts_header,
188                pes_header_length,
189                payload_length,
190                is_start,
191            )?;
192            self.packet_number += 1;
193
194            is_start = false;
195
196            let data = payload_reader.read_bytes(payload_length)?;
197
198            self.bytes_writer.append(&mut ts_header);
199            self.bytes_writer.append(&mut pes_muxer.bytes_writer);
200            self.bytes_writer.write(&data[..])?;
201        }
202        Ok(())
203    }
204    pub fn write_ts_header_for_pes(
205        &mut self,
206
207        ts_header: &mut BytesWriter,
208        pes_header_length: usize,
209        payload_data_length: usize,
210        is_start: bool,
211    ) -> Result<usize, MpegTsError> {
212        let cur_pmt = self.pat.pmt.get_mut(self.cur_pmt_index).unwrap();
213        let stream_data = cur_pmt.streams.get_mut(self.cur_stream_index).unwrap();
214
215        let pcr_pid = cur_pmt.pcr_pid;
216
217        /****************************************************************/
218        /*        ts header 4 bytes without adaptation filed            */
219        /*****************************************************************
220         0                   1                   2                   3
221         0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
222        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
223        |   sync byte   | | | |          PID            |   |   |       |
224        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
225        */
226
227        /*sync byte*/
228        ts_header.write_u8(0x47)?; //0
229
230        /*PID 13 bits*/
231        ts_header.write_u8((stream_data.pid >> 8) as u8 & 0x1F)?; //1
232        ts_header.write_u8((stream_data.pid & 0xFF) as u8)?; //2
233
234        /*continuity counter 4 bits*/
235        ts_header.write_u8(0x10 | (stream_data.continuity_counter & 0x0F))?; //3
236        stream_data.continuity_counter = (stream_data.continuity_counter + 1) % 16;
237
238        if is_start {
239            /*payload unit start indicator*/
240            ts_header.or_u8_at(1, define::TS_PAYLOAD_UNIT_START_INDICATOR)?;
241
242            if (stream_data.pid == pcr_pid)
243                || ((stream_data.data_alignment_indicator > 0)
244                    && define::PTS_NO_VALUE != stream_data.pts)
245            {
246                /*adaption field control*/
247                ts_header.or_u8_at(3, 0x20)?;
248
249                /*adaption filed length -- set value to 1 for flags*/
250                ts_header.write_u8(0x01)?; //4
251
252                /*will be used for adaptation field flags if have*/
253                ts_header.write_u8(0x00)?; //5
254
255                if stream_data.pid == pcr_pid {
256                    /*adaption field flags*/
257                    ts_header.or_u8_at(5, define::AF_FLAG_PCR)?;
258
259                    let pcr = if define::PTS_NO_VALUE == stream_data.dts {
260                        stream_data.pts
261                    } else {
262                        stream_data.dts
263                    };
264                    let mut pcr_result: BytesWriter = BytesWriter::new();
265                    utils::pcr_write(&mut pcr_result, pcr * 300)?;
266                    ts_header.write(&pcr_result.extract_current_bytes()[..])?;
267                    /*adaption filed length -- add 6 for pcr length*/
268                    ts_header.add_u8_at(4, 6)?;
269                }
270
271                if (stream_data.data_alignment_indicator > 0)
272                    && define::PTS_NO_VALUE != stream_data.pts
273                {
274                    /*adaption field flags*/
275                    ts_header.or_u8_at(5, define::AF_FLAG_RANDOM_ACCESS_INDICATOR)?;
276                }
277            }
278        }
279
280        /*
281        +-------------------------------------------------------------------------+
282        |        ts header                              | PES data                |
283        +-------------------------------------------------------------------------+
284        | 4bytes fixed header | adaption field(stuffing)| pes header | pes payload|
285        +-------------------------------------------------------------------------+
286        */
287        // If payload data cannot fill up the 188 bytes packet,
288        // then stuffling bytes need to be filled in the adaptation field,
289
290        let ts_header_length = ts_header.len();
291        let mut stuffing_length = define::TS_PACKET_SIZE as i32
292            - (ts_header_length + pes_header_length + payload_data_length) as i32;
293
294        if stuffing_length > 0 {
295            if (ts_header.get(3).unwrap() & 0x20) > 0 {
296                /*adaption filed length -- add 6 for pcr length*/
297                ts_header.add_u8_at(4, stuffing_length as u8)?;
298            } else {
299                /*adaption field control*/
300                ts_header.or_u8_at(3, 0x20)?;
301                /*AF length,because it occupys one byte,so here sub one.*/
302                stuffing_length -= 1;
303                /*adaption filed length*/
304                ts_header.write_u8(stuffing_length as u8)?;
305                /*add flag*/
306                if stuffing_length >= 1 {
307                    /*adaptation field flags flag occupies one byte, sub one.*/
308                    stuffing_length -= 1;
309                    ts_header.write_u8(0x00)?;
310                }
311            }
312            for _ in 0..stuffing_length {
313                ts_header.write_u8(0xFF)?;
314            }
315        } else {
316            return Ok(define::TS_PACKET_SIZE - ts_header_length - pes_header_length);
317        }
318
319        Ok(payload_data_length)
320    }
321
322    pub fn find_stream(&mut self, pid: u16) -> Result<(), MpegTsError> {
323        // let mut pmt_index: usize = 0;
324        let mut stream_index: usize = 0;
325
326        for (pmt_index, pmt) in self.pat.pmt.iter_mut().enumerate() {
327            for stream in pmt.streams.iter_mut() {
328                if stream.pid == pid {
329                    self.cur_pmt_index = pmt_index;
330                    self.cur_stream_index = stream_index;
331
332                    return Ok(());
333                }
334                stream_index += 1;
335            }
336        }
337
338        // for pmt in self.pat.pmt.iter_mut() {
339        //     for stream in pmt.streams.iter_mut() {
340        //         if stream.pid == pid {
341        //             self.cur_pmt_index = pmt_index;
342        //             self.cur_stream_index = stream_index;
343
344        //             return Ok(());
345        //         }
346        //         stream_index += 1;
347        //     }
348        //     pmt_index += 1;
349        // }
350
351        Err(MpegTsError {
352            value: MpegTsErrorValue::StreamNotFound,
353        })
354    }
355
356    pub fn add_stream(&mut self, codecid: u8, extra_data: BytesMut) -> Result<u16, MpegTsError> {
357        if self.pat.pmt.is_empty() {
358            self.add_program(1, BytesMut::new())?;
359        }
360
361        self.pmt_add_stream(0, codecid, extra_data)
362    }
363
364    pub fn pmt_add_stream(
365        &mut self,
366        pmt_index: usize,
367        codecid: u8,
368        extra_data: BytesMut,
369    ) -> Result<u16, MpegTsError> {
370        let pmt = &mut self.pat.pmt[pmt_index];
371
372        if pmt.streams.len() == 4 {
373            return Err(MpegTsError {
374                value: MpegTsErrorValue::StreamCountExeceed,
375            });
376        }
377
378        let mut cur_stream = pes::Pes::new(); //&mut pmt.streams[pmt.stream_count];
379
380        cur_stream.codec_id = codecid;
381        cur_stream.pid = self.pid;
382        self.pid += 1;
383
384        if utils::is_steam_type_video(codecid) {
385            cur_stream.stream_id = epes_stream_id::PES_SID_VIDEO;
386        } else if utils::is_steam_type_audio(codecid) {
387            cur_stream.stream_id = epes_stream_id::PES_SID_AUDIO;
388        } else {
389            cur_stream.stream_id = epes_stream_id::PES_SID_PRIVATE_1;
390        }
391
392        if !extra_data.is_empty() {
393            cur_stream.esinfo.put(extra_data);
394        }
395
396        pmt.streams.push(cur_stream);
397        pmt.version_number = (pmt.version_number + 1) % 32;
398
399        self.reset();
400
401        Ok(self.pid - 1)
402    }
403
404    pub fn add_program(&mut self, program_number: u16, info: BytesMut) -> Result<(), MpegTsError> {
405        for cur_pmt in self.pat.pmt.iter() {
406            if cur_pmt.program_number == program_number {
407                return Err(MpegTsError {
408                    value: MpegTsErrorValue::ProgramNumberExists,
409                });
410            }
411        }
412
413        if self.pat.pmt.len() == 4 {
414            return Err(MpegTsError {
415                value: MpegTsErrorValue::PmtCountExeceed,
416            });
417        }
418        let mut cur_pmt = pmt::Pmt::new(); //&mut self.pat.pmt[self.pat.pmt_count];
419
420        cur_pmt.pid = self.pid;
421        self.pid += 1;
422        cur_pmt.program_number = program_number;
423        cur_pmt.version_number = 0x00;
424        cur_pmt.continuity_counter = 0;
425        cur_pmt.pcr_pid = 0x1FFF;
426
427        if !info.is_empty() {
428            cur_pmt.program_info.put(&info[..]);
429        }
430
431        self.pat.pmt.push(cur_pmt);
432
433        Ok(())
434    }
435}