libfreemkv/mux/
network.rs1use super::meta;
10use crate::disc::DiscTitle;
11use std::io::{self, BufReader, BufWriter, Write};
12use std::net::{TcpListener, TcpStream};
13
14const NET_BUF_SIZE: usize = 256 * 1024;
16
17enum Mode {
18 Write {
19 writer: BufWriter<TcpStream>,
20 header_written: bool,
21 },
22 Read {
23 reader: BufReader<TcpStream>,
24 },
25}
26
27pub struct NetworkStream {
29 disc_title: DiscTitle,
30 mode: Mode,
31}
32
33impl NetworkStream {
34 pub fn connect(addr: &str) -> io::Result<Self> {
37 let stream = TcpStream::connect(addr)?;
38 Ok(Self {
39 disc_title: DiscTitle::empty(),
40 mode: Mode::Write {
41 writer: BufWriter::with_capacity(NET_BUF_SIZE, stream),
42 header_written: false,
43 },
44 })
45 }
46
47 pub fn meta(mut self, dt: &DiscTitle) -> Self {
49 self.disc_title = dt.clone();
50 self
51 }
52
53 pub fn listen(addr: &str) -> io::Result<Self> {
56 let listener = TcpListener::bind(addr)?;
57 let (stream, _peer) = listener.accept()?;
58 stream.set_nodelay(true)?;
59 let mut reader = BufReader::with_capacity(NET_BUF_SIZE, stream);
60
61 let disc_title = meta::read_header(&mut reader)?
63 .ok_or_else(|| -> io::Error { crate::error::Error::NoMetadata.into() })?
64 .to_title();
65
66 Ok(Self {
67 disc_title,
68 mode: Mode::Read { reader },
69 })
70 }
71}
72
73impl crate::pes::Stream for NetworkStream {
74 fn read(&mut self) -> io::Result<Option<crate::pes::PesFrame>> {
75 match &mut self.mode {
76 Mode::Read { reader } => crate::pes::PesFrame::deserialize(reader),
77 _ => Err(crate::error::Error::StreamWriteOnly.into()),
78 }
79 }
80 fn write(&mut self, frame: &crate::pes::PesFrame) -> io::Result<()> {
81 match &mut self.mode {
82 Mode::Write {
83 writer,
84 header_written,
85 ..
86 } => {
87 if !*header_written {
88 if !self.disc_title.streams.is_empty() {
89 let m = meta::M2tsMeta::from_title(&self.disc_title);
90 meta::write_header(&mut *writer, &m)?;
91 }
92 *header_written = true;
93 }
94 frame.serialize(writer)
95 }
96 _ => Err(crate::error::Error::StreamReadOnly.into()),
97 }
98 }
99 fn finish(&mut self) -> io::Result<()> {
100 if let Mode::Write { writer, .. } = &mut self.mode {
101 writer.flush()?;
102 writer.get_ref().shutdown(std::net::Shutdown::Write)?;
103 }
104 Ok(())
105 }
106 fn info(&self) -> &DiscTitle {
107 &self.disc_title
108 }
109}
110
111#[cfg(test)]
114mod tests {
115 use super::*;
116 use crate::disc::{
117 AudioChannels, AudioStream, Codec, ColorSpace, ContentFormat, FrameRate, HdrFormat,
118 Resolution, SampleRate, Stream, VideoStream,
119 };
120 use std::net::TcpListener;
121
122 fn sample_title() -> DiscTitle {
123 DiscTitle {
124 playlist: "NetworkTest".into(),
125 playlist_id: 1,
126 duration_secs: 3600.0,
127 size_bytes: 0,
128 clips: Vec::new(),
129 streams: vec![
130 Stream::Video(VideoStream {
131 pid: 0x1011,
132 codec: Codec::Hevc,
133 resolution: Resolution::R2160p,
134 frame_rate: FrameRate::F23_976,
135 hdr: HdrFormat::Hdr10,
136 color_space: ColorSpace::Bt2020,
137 secondary: false,
138 label: "Main".into(),
139 }),
140 Stream::Audio(AudioStream {
141 pid: 0x1100,
142 codec: Codec::TrueHd,
143 channels: AudioChannels::Surround71,
144 language: "eng".into(),
145 sample_rate: SampleRate::S48,
146 secondary: false,
147 purpose: crate::disc::LabelPurpose::Normal,
148 label: "English".into(),
149 }),
150 ],
151 chapters: Vec::new(),
152 extents: Vec::new(),
153 content_format: ContentFormat::BdTs,
154 codec_privates: Vec::new(),
155 }
156 }
157
158 #[test]
159 #[ignore] fn network_pes_roundtrip() {
161 use crate::pes;
162
163 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
164 let port = listener.local_addr().unwrap().port();
165 drop(listener);
166
167 let addr = format!("127.0.0.1:{}", port);
168 let addr_clone = addr.clone();
169
170 let handle = std::thread::spawn(move || {
171 let mut ns = NetworkStream::listen(&addr_clone).unwrap();
172 let info = pes::Stream::info(&ns).clone();
173 let mut frames = Vec::new();
174 while let Ok(Some(f)) = pes::Stream::read(&mut ns) {
175 frames.push(f);
176 }
177 (info, frames)
178 });
179
180 std::thread::sleep(std::time::Duration::from_millis(50));
181
182 let dt = sample_title();
183 let mut writer = NetworkStream::connect(&addr).unwrap().meta(&dt);
184 let frame = pes::PesFrame {
185 track: 0,
186 pts: 90000,
187 keyframe: true,
188 data: vec![0x47; 192],
189 };
190 pes::Stream::write(&mut writer, &frame).unwrap();
191 pes::Stream::finish(&mut writer).unwrap();
192
193 let (info, frames) = handle.join().unwrap();
194 assert_eq!(info.playlist, "NetworkTest");
195 assert_eq!(info.streams.len(), 2);
196 assert_eq!(frames.len(), 1);
197 assert_eq!(frames[0].track, 0);
198 assert_eq!(frames[0].pts, 90000);
199 }
200
201 #[test]
202 fn network_empty_addr_errors() {
203 let result = NetworkStream::connect("");
204 assert!(result.is_err());
205 }
206
207 #[test]
208 fn network_no_port_errors() {
209 let result = NetworkStream::connect("127.0.0.1");
210 assert!(result.is_err());
211 }
212}