Skip to main content

libfreemkv/mux/
network.rs

1//! NetworkStream — PES frames over TCP with embedded metadata.
2//!
3//! **Security:** Data is transmitted over plain TCP with no encryption.
4//! Use only on trusted networks (LAN).
5//!
6//! Write side (sender): connects to a listener, sends FMKV header + PES frames.
7//! Read side (receiver): listens for a connection, reads FMKV header + PES frames.
8
9use super::meta;
10use crate::disc::DiscTitle;
11use std::io::{self, BufReader, BufWriter, Write};
12use std::net::{TcpListener, TcpStream};
13
/// Capacity, in bytes, of the buffered reader/writer placed around each TCP
/// socket (256 KiB).
const NET_BUF_SIZE: usize = 256 << 10;
16
/// Direction a [`NetworkStream`] was opened in.
///
/// Each variant owns the buffered socket half that its direction needs, so a
/// stream is either a receiver or a sender.
enum Mode {
    /// Receiving side: frames are pulled from a buffered TCP reader.
    Read {
        /// Buffered reader over the accepted connection.
        reader: BufReader<TcpStream>,
    },
    /// Sending side: frames are pushed through a buffered TCP writer.
    Write {
        /// Buffered writer over the outgoing connection.
        writer: BufWriter<TcpStream>,
        /// Set once the first `write` has dealt with the metadata header.
        header_written: bool,
    },
}
26
27/// TCP network stream for distributed rip/remux.
28pub struct NetworkStream {
29    disc_title: DiscTitle,
30    mode: Mode,
31}
32
33impl NetworkStream {
34    /// Connect to a remote listener for writing.
35    /// Sends FMKV metadata header on first write.
36    pub fn connect(addr: &str) -> io::Result<Self> {
37        let stream = TcpStream::connect(addr)?;
38        Ok(Self {
39            disc_title: DiscTitle::empty(),
40            mode: Mode::Write {
41                writer: BufWriter::with_capacity(NET_BUF_SIZE, stream),
42                header_written: false,
43            },
44        })
45    }
46
47    /// Set stream metadata (for write side). Returns self for chaining.
48    pub fn meta(mut self, dt: &DiscTitle) -> Self {
49        self.disc_title = dt.clone();
50        self
51    }
52
53    /// Listen for an incoming connection and read from it.
54    /// Extracts FMKV metadata header from the sender.
55    pub fn listen(addr: &str) -> io::Result<Self> {
56        let listener = TcpListener::bind(addr)?;
57        let (stream, _peer) = listener.accept()?;
58        stream.set_nodelay(true)?;
59        let mut reader = BufReader::with_capacity(NET_BUF_SIZE, stream);
60
61        // Read FMKV metadata header
62        let disc_title = meta::read_header(&mut reader)?
63            .ok_or_else(|| -> io::Error { crate::error::Error::NoMetadata.into() })?
64            .to_title();
65
66        Ok(Self {
67            disc_title,
68            mode: Mode::Read { reader },
69        })
70    }
71}
72
73impl crate::pes::Stream for NetworkStream {
74    fn read(&mut self) -> io::Result<Option<crate::pes::PesFrame>> {
75        match &mut self.mode {
76            Mode::Read { reader } => crate::pes::PesFrame::deserialize(reader),
77            _ => Err(crate::error::Error::StreamWriteOnly.into()),
78        }
79    }
80    fn write(&mut self, frame: &crate::pes::PesFrame) -> io::Result<()> {
81        match &mut self.mode {
82            Mode::Write {
83                writer,
84                header_written,
85                ..
86            } => {
87                if !*header_written {
88                    if !self.disc_title.streams.is_empty() {
89                        let m = meta::M2tsMeta::from_title(&self.disc_title);
90                        meta::write_header(&mut *writer, &m)?;
91                    }
92                    *header_written = true;
93                }
94                frame.serialize(writer)
95            }
96            _ => Err(crate::error::Error::StreamReadOnly.into()),
97        }
98    }
99    fn finish(&mut self) -> io::Result<()> {
100        if let Mode::Write { writer, .. } = &mut self.mode {
101            writer.flush()?;
102            writer.get_ref().shutdown(std::net::Shutdown::Write)?;
103        }
104        Ok(())
105    }
106    fn info(&self) -> &DiscTitle {
107        &self.disc_title
108    }
109}
110
111// NetworkStream is PES-only — no IOStream/Read/Write byte interface.
112
#[cfg(test)]
mod tests {
    use super::*;
    use crate::disc::{
        AudioChannels, AudioStream, Codec, ColorSpace, ContentFormat, FrameRate, HdrFormat,
        Resolution, SampleRate, Stream, VideoStream,
    };
    use std::net::TcpListener;
    use std::time::{Duration, Instant};

    /// Title with one video and one audio stream, used as header metadata.
    fn sample_title() -> DiscTitle {
        DiscTitle {
            playlist: "NetworkTest".into(),
            playlist_id: 1,
            duration_secs: 3600.0,
            size_bytes: 0,
            clips: Vec::new(),
            streams: vec![
                Stream::Video(VideoStream {
                    pid: 0x1011,
                    codec: Codec::Hevc,
                    resolution: Resolution::R2160p,
                    frame_rate: FrameRate::F23_976,
                    hdr: HdrFormat::Hdr10,
                    color_space: ColorSpace::Bt2020,
                    secondary: false,
                    label: "Main".into(),
                }),
                Stream::Audio(AudioStream {
                    pid: 0x1100,
                    codec: Codec::TrueHd,
                    channels: AudioChannels::Surround71,
                    language: "eng".into(),
                    sample_rate: SampleRate::S48,
                    secondary: false,
                    purpose: crate::disc::LabelPurpose::Normal,
                    label: "English".into(),
                }),
            ],
            chapters: Vec::new(),
            extents: Vec::new(),
            content_format: ContentFormat::BdTs,
            codec_privates: Vec::new(),
        }
    }

    /// Dial `addr` until it accepts or `timeout` elapses.
    ///
    /// The listener thread binds asynchronously, so a single connect attempt
    /// after a fixed sleep can be refused on a slow machine; polling with a
    /// deadline removes that timing assumption.
    fn connect_with_retry(addr: &str, timeout: Duration) -> NetworkStream {
        let deadline = Instant::now() + timeout;
        loop {
            match NetworkStream::connect(addr) {
                Ok(ns) => return ns,
                Err(e) if Instant::now() >= deadline => {
                    panic!("could not connect to {}: {}", addr, e)
                }
                Err(_) => std::thread::sleep(Duration::from_millis(10)),
            }
        }
    }

    #[test]
    #[ignore] // Requires TCP; may be flaky in CI environments
    fn network_pes_roundtrip() {
        use crate::pes;

        // Reserve a free port, then release it so `listen` can bind it.
        // Another process could grab the port in between; `listen` only takes
        // an address, so that window cannot be closed from here.
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        drop(listener);

        let addr = format!("127.0.0.1:{}", port);
        let addr_clone = addr.clone();

        let handle = std::thread::spawn(move || {
            let mut ns = NetworkStream::listen(&addr_clone).unwrap();
            let info = pes::Stream::info(&ns).clone();
            let mut frames = Vec::new();
            while let Ok(Some(f)) = pes::Stream::read(&mut ns) {
                frames.push(f);
            }
            (info, frames)
        });

        let dt = sample_title();
        let mut writer = connect_with_retry(&addr, Duration::from_secs(5)).meta(&dt);
        let frame = pes::PesFrame {
            track: 0,
            pts: 90000,
            keyframe: true,
            data: vec![0x47; 192],
        };
        pes::Stream::write(&mut writer, &frame).unwrap();
        pes::Stream::finish(&mut writer).unwrap();

        let (info, frames) = handle.join().unwrap();
        assert_eq!(info.playlist, "NetworkTest");
        assert_eq!(info.streams.len(), 2);
        assert_eq!(frames.len(), 1);
        assert_eq!(frames[0].track, 0);
        assert_eq!(frames[0].pts, 90000);
    }

    #[test]
    fn network_empty_addr_errors() {
        let result = NetworkStream::connect("");
        assert!(result.is_err());
    }

    #[test]
    fn network_no_port_errors() {
        let result = NetworkStream::connect("127.0.0.1");
        assert!(result.is_err());
    }
}