1use std::collections::VecDeque;
8use std::time::Duration;
9
10use muldiv::MulDiv;
11
12use log::trace;
13
14use crate::{Cea608, DTVCCPacket, Framerate};
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
18pub enum WriterError {
19 #[error("Writing the data would overflow by {0} bytes")]
21 WouldOverflow(usize),
22 #[error("The resource is not writable")]
24 ReadOnly,
25 #[error("Empty service was attempted to be written")]
27 EmptyService,
28}
29
30#[derive(Debug, Default)]
32pub struct CCDataWriter {
33 output_cea608_padding: bool,
35 output_padding: bool,
36 packets: VecDeque<DTVCCPacket>,
38 pending_packet_data: Vec<u8>,
40 cea608_1: VecDeque<(u8, u8)>,
41 cea608_2: VecDeque<(u8, u8)>,
42 last_cea608_was_field1: bool,
43}
44
45impl CCDataWriter {
46 pub fn set_output_cea608_padding(&mut self, output_cea608_padding: bool) {
48 self.output_cea608_padding = output_cea608_padding;
49 }
50
51 pub fn output_cea608_padding(&self) -> bool {
53 self.output_cea608_padding
54 }
55
56 pub fn set_output_padding(&mut self, output_padding: bool) {
58 self.output_padding = output_padding;
59 }
60
61 pub fn output_padding(&self) -> bool {
63 self.output_padding
64 }
65
66 pub fn push_packet(&mut self, packet: DTVCCPacket) {
68 self.packets.push_front(packet)
69 }
70
71 pub fn push_cea608(&mut self, cea608: Cea608) {
73 match cea608 {
74 Cea608::Field1(byte0, byte1) => {
75 if byte0 != 0x80 || byte1 != 0x80 {
76 self.cea608_1.push_front((byte0, byte1))
77 }
78 }
79 Cea608::Field2(byte0, byte1) => {
80 if byte0 != 0x80 || byte1 != 0x80 {
81 self.cea608_2.push_front((byte0, byte1))
82 }
83 }
84 }
85 }
86
87 pub fn flush(&mut self) {
89 self.packets.clear();
90 self.pending_packet_data.clear();
91 self.cea608_1.clear();
92 self.cea608_2.clear();
93 }
94
95 pub fn buffered_cea608_field1_duration(&self) -> Duration {
97 Duration::from_micros(
99 (self.cea608_1.len() as u64)
100 .mul_div_ceil(1001 * 1_000_000, 60000)
101 .unwrap(),
102 )
103 }
104
105 pub fn buffered_cea608_field2_duration(&self) -> Duration {
107 Duration::from_micros(
109 (self.cea608_2.len() as u64)
110 .mul_div_ceil(1001 * 1_000_000, 60000)
111 .unwrap(),
112 )
113 }
114
115 fn buffered_packet_bytes(&self) -> usize {
116 self.pending_packet_data.len()
117 + self
118 .packets
119 .iter()
120 .map(|packet| packet.len())
121 .sum::<usize>()
122 }
123
124 pub fn buffered_packet_duration(&self) -> Duration {
126 Duration::from_micros(
128 ((self.buffered_packet_bytes() + 1) as u64 / 2)
129 .mul_div_ceil(2 * 1001 * 1_000_000, 9_600_000 / 8)
130 .unwrap(),
131 )
132 }
133
134 pub fn write<W: std::io::Write>(
137 &mut self,
138 framerate: Framerate,
139 w: &mut W,
140 ) -> Result<(), std::io::Error> {
141 let mut cea608_pair_rem = if self.output_cea608_padding {
142 framerate.cea608_pairs_per_frame()
143 } else {
144 framerate
145 .cea608_pairs_per_frame()
146 .min(self.cea608_1.len().max(self.cea608_2.len() * 2))
147 };
148
149 let mut cc_count_rem = if self.output_padding {
150 framerate.max_cc_count()
151 } else {
152 framerate.max_cc_count().min(
153 cea608_pair_rem
154 + self.pending_packet_data.len() / 3
155 + self.packets.iter().map(|p| p.cc_count()).sum::<usize>(),
156 )
157 };
158 trace!("writing with cc_count: {cc_count_rem} and {cea608_pair_rem} cea608 pairs");
159
160 let reserved = 0x80;
161 let process_cc_flag = 0x40;
162 w.write_all(&[
163 reserved | process_cc_flag | (cc_count_rem & 0x1f) as u8,
164 0xFF,
165 ])?;
166 while cc_count_rem > 0 {
167 if cea608_pair_rem > 0 {
168 if !self.last_cea608_was_field1 {
169 trace!("attempting to write a cea608 byte pair from field 1");
170 if let Some((byte0, byte1)) = self.cea608_1.pop_back() {
171 w.write_all(&[0xFC, byte0, byte1])?;
172 cc_count_rem -= 1;
173 } else if !self.cea608_2.is_empty() {
174 w.write_all(&[0xFC, 0x80, 0x80])?;
176 cc_count_rem -= 1;
177 } else if self.output_cea608_padding {
178 w.write_all(&[0xF8, 0x80, 0x80])?;
179 cc_count_rem -= 1;
180 }
181 self.last_cea608_was_field1 = true;
182 } else {
183 trace!("attempting to write a cea608 byte pair from field 2");
184 if let Some((byte0, byte1)) = self.cea608_2.pop_back() {
185 w.write_all(&[0xFD, byte0, byte1])?;
186 cc_count_rem -= 1;
187 } else if self.output_cea608_padding {
188 w.write_all(&[0xF9, 0x80, 0x80])?;
189 cc_count_rem -= 1;
190 }
191 self.last_cea608_was_field1 = false;
192 }
193 cea608_pair_rem -= 1;
194 } else {
195 let mut current_packet_data = &mut self.pending_packet_data;
196 let mut packet_offset = 0;
197 while packet_offset >= current_packet_data.len() {
198 if let Some(packet) = self.packets.pop_back() {
199 trace!("starting packet {packet:?}");
200 packet.write_as_cc_data(&mut current_packet_data)?;
201 } else {
202 trace!("no packet to write");
203 break;
204 }
205 }
206
207 trace!("cea708 pending data length {}", current_packet_data.len(),);
208
209 while packet_offset < current_packet_data.len() && cc_count_rem > 0 {
210 assert!(current_packet_data.len() >= packet_offset + 3);
211 w.write_all(¤t_packet_data[packet_offset..packet_offset + 3])?;
212 packet_offset += 3;
213 cc_count_rem -= 1;
214 }
215
216 self.pending_packet_data = current_packet_data[packet_offset..].to_vec();
217
218 if self.packets.is_empty() && self.pending_packet_data.is_empty() {
219 if self.output_padding {
221 trace!("writing {cc_count_rem} padding bytes");
222 while cc_count_rem > 0 {
223 w.write_all(&[0xFA, 0x00, 0x00])?;
224 cc_count_rem -= 1;
225 }
226 }
227 break;
228 }
229 }
230 }
231 Ok(())
232 }
233}