mp4_stream/
lib.rs

//! A fast and easy-to-use fMP4 streaming implementation.
//!
//! mp4-stream is an efficient and scalable implementation of fragmented MP4
//! video streaming. It uses channels to separate video capture and encoding from
//! MP4 muxing, making it possible to stream live video over multiple
//! connections. It can also handle live configuration updates, which restart the
//! individual streams without restarting the video capture worker.
//!
//! # Example
//!
//! ```rust,ignore
//! use mp4_stream::{config::Config, VideoStream, stream_media_segments};
//! use std::{fs, thread, io::Write};
//!
//! // Create a configuration
//! let config = Config::default();
//! let config_clone = config.clone();
//! // Create a channel to send requests for video data on
//! let (tx, rx) = flume::unbounded();
//! // Start another thread to capture video and send it on the channel
//! thread::spawn(move || {
//!     stream_media_segments(rx, config_clone, None).unwrap();
//! });
//!
//! let mut file = fs::File::create("video.mp4")?;
//! // Create a stream from the channel
//! let stream = VideoStream::new(&config, tx)?;
//! // Take the first 10 segments and write them to a file
//! for segment in stream.take(10) {
//!     file.write_all(&segment?)?;
//! }
//! # Ok::<(), Box<dyn std::error::Error>>(())
//! ```
//!
//! # Cargo Features
//!
//! - `tokio`: Provides a [`Stream`](futures_core::Stream) implementation for the [`VideoStream`](crate::VideoStream)
//!   type using Tokio's runtime.
//! - `quickcheck`: Provides implementations of [`Arbitrary`](quickcheck::Arbitrary) for types in the
//!   [`config`] module.
//! - `serde`: Provides implementations of [`Serialize`](serde::Serialize) and [`Deserialize`](serde::Deserialize)
//!   for types in the [`config`] and [`capabilities`] modules.
//! - `tracing`: Enables logging and instrumentation with the [`tracing`] crate.

#![warn(clippy::unwrap_used)]
#![warn(clippy::expect_used)]
#![warn(clippy::todo)]
#![warn(clippy::unimplemented)]
#![warn(clippy::dbg_macro)]
#![warn(missing_docs)]
#![warn(clippy::missing_panics_doc)]
#![warn(clippy::missing_errors_doc)]

pub mod capabilities;
pub mod config;

use config::{Config, Format, Rotation};

use bmff::*;
use chrono::{Duration, Utc};
use fixed::types::{I16F16, I8F8, U16F16};
use flume::r#async::RecvStream;
use futures_lite::stream::{self, Stream, StreamExt};
use quick_error::quick_error;
use rscam::Camera;
use std::{
    collections::HashMap,
    io::{self, prelude::*},
    sync::Arc,
};

quick_error! {
    /// The error type for `mp4-stream`.
    #[derive(Debug)]
    #[non_exhaustive]
    pub enum Error {
        /// I/O error. This wraps an [`std::io::Error`].
        Io(err: std::io::Error) {
            source(err)
            display("{}", err)
            from()
        }
        /// Software encoding error. This wraps an [`x264::Error`], which carries
        /// no additional information.
        Encoding(err: x264::Error) {
            display("Encoding error: {:?}", err)
            from()
        }
        /// Camera or video capture error. This wraps an [`rscam::Error`].
        Camera(err: rscam::Error) {
            source(err)
            display("{}", err)
            from()
        }
        /// Another unspecified error.
        Other(err: String) {
            display("{}", err)
            from()
        }
    }
}

/// A `Result` type alias for `mp4-stream`'s [`Error`] type.
pub type Result<T> = std::result::Result<T, Error>;

fn matrix(rotation: Rotation) -> [[fixed::types::I16F16; 3]; 3] {
    match rotation {
        Rotation::R0 => MATRIX_0,
        Rotation::R90 => MATRIX_90,
        Rotation::R180 => MATRIX_180,
        Rotation::R270 => MATRIX_270,
    }
}

#[derive(Debug, Clone)]
struct InitSegment {
    ftyp: FileTypeBox,
    moov: MovieBox,
}

impl InitSegment {
    fn size(&self) -> u64 {
        self.ftyp.size() + self.moov.size()
    }
}

impl WriteTo for InitSegment {
    fn write_to(&self, mut w: impl Write) -> io::Result<()> {
        write_to(&self.ftyp, &mut w)?;
        write_to(&self.moov, &mut w)?;
        Ok(())
    }
}

impl InitSegment {
    fn new(config: &Config) -> Self {
        let sps = vec![
            0x67, 0x64, 0x00, 0x1f, 0xac, 0xd9, 0x80, 0x50, 0x05, 0xbb, 0x01, 0x6a, 0x02, 0x02,
            0x02, 0x80, 0x00, 0x00, 0x03, 0x00, 0x80, 0x00, 0x00, 0x1e, 0x07, 0x8c, 0x18, 0xcd,
        ]; // TODO
        let pps = vec![0x68, 0xe9, 0x7b, 0x2c, 0x8b]; // TODO
        let timescale = config.interval.1;
        let (width, height) = config.resolution;

        let ftyp = FileTypeBox {
            major_brand: *b"isom",
            minor_version: 0,
            compatible_brands: vec![*b"isom", *b"iso6", *b"iso2", *b"avc1", *b"mp41"],
        };

        let time = Utc::now();
        let duration = Some(Duration::zero());

        let moov = MovieBox {
            mvhd: MovieHeaderBox {
                creation_time: time,
                modification_time: time,
                timescale,
                duration,
                rate: I16F16::from_num(1),
                volume: I8F8::from_num(1),
                matrix: matrix(config.rotation),
                next_track_id: 0,
            },
            trak: vec![TrackBox {
                tkhd: TrackHeaderBox {
                    flags: TrackHeaderFlags::TRACK_ENABLED
                        | TrackHeaderFlags::TRACK_IN_MOVIE
                        | TrackHeaderFlags::TRACK_IN_PREVIEW,
                    creation_time: time,
                    modification_time: time,
                    track_id: 1,
                    timescale,
                    duration,
                    layer: 0,
                    alternate_group: 0,
                    volume: I8F8::from_num(1),
                    matrix: matrix(config.rotation),
                    width: U16F16::from_num(width),
                    height: U16F16::from_num(height),
                },
                tref: None,
                edts: None,
                mdia: MediaBox {
                    mdhd: MediaHeaderBox {
                        creation_time: time,
                        modification_time: time,
                        timescale,
                        duration,
                        language: *b"und",
                    },
                    hdlr: HandlerBox {
                        handler_type: HandlerType::Video,
                        name: "foo".to_string(), // TODO
                    },
                    minf: MediaInformationBox {
                        media_header: MediaHeader::Video(VideoMediaHeaderBox {
                            graphics_mode: GraphicsMode::Copy,
                            opcolor: [0, 0, 0],
                        }),
                        dinf: DataInformationBox {
                            dref: DataReferenceBox {
                                data_entries: vec![DataEntry::Url(DataEntryUrlBox {
                                    flags: DataEntryFlags::SELF_CONTAINED,
                                    location: String::new(),
                                })],
                            },
                        },
                        stbl: SampleTableBox {
                            stsd: SampleDescriptionBox {
                                entries: vec![Box::new(AvcSampleEntry {
                                    data_reference_index: 1,
                                    width: width as u16,
                                    height: height as u16,
                                    horiz_resolution: U16F16::from_num(72),
                                    vert_resolution: U16F16::from_num(72),
                                    frame_count: 1,
                                    depth: 0x0018,
                                    avcc: AvcConfigurationBox {
                                        configuration: AvcDecoderConfigurationRecord {
                                            profile_idc: 0x64, // high
                                            constraint_set_flag: 0x00,
                                            level_idc: 0x1f, // 31 = level 3.1 (0x2a would be level 4.2)
                                            sequence_parameter_set: sps,
                                            picture_parameter_set: pps,
                                        },
                                    },
                                })],
                            },
                            stts: TimeToSampleBox { samples: vec![] },
                            stsc: SampleToChunkBox { entries: vec![] },
                            stsz: SampleSizeBox {
                                sample_size: SampleSize::Different(vec![]),
                            },
                            stco: ChunkOffsetBox {
                                chunk_offsets: vec![],
                            },
                        },
                    },
                },
            }],
            mvex: Some(MovieExtendsBox {
                mehd: None,
                trex: vec![TrackExtendsBox {
                    track_id: 1,
                    default_sample_description_index: 1,
                    default_sample_duration: 0,
                    default_sample_size: 0,
                    default_sample_flags: DefaultSampleFlags::empty(),
                }],
            }),
        };

        Self { ftyp, moov }
    }
}

/// An opaque type representing an fMP4 media segment.
///
/// It is passed between the streaming thread and [`VideoStream`]s.
#[derive(Debug, Clone)]
pub struct MediaSegment {
    moof: MovieFragmentBox,
    mdat: MediaDataBox,
}

impl MediaSegment {
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(level = "trace", skip(sample_sizes, data))
    )]
    fn new(config: &Config, sequence_number: u32, sample_sizes: Vec<u32>, data: Vec<u8>) -> Self {
        let timescale = config.interval.1;
        let mut moof = MovieFragmentBox {
            mfhd: MovieFragmentHeaderBox { sequence_number },
            traf: vec![TrackFragmentBox {
                tfhd: TrackFragmentHeaderBox {
                    track_id: 1,
                    base_data_offset: Some(0),
                    sample_description_index: None,
                    default_sample_duration: Some(
                        timescale * config.interval.0 / config.interval.1,
                    ),
                    default_sample_size: None,
                    default_sample_flags: {
                        #[allow(clippy::unwrap_used)] // infallible
                        Some(DefaultSampleFlags::from_bits(0x0101_0000).unwrap())
                    }, // not I-frame
                },
                trun: vec![TrackFragmentRunBox {
                    data_offset: Some(0),
                    first_sample_flags: Some(0x0200_0000), // I-frame
                    sample_durations: None,
                    sample_sizes: Some(sample_sizes),
                    sample_flags: None,
                    sample_composition_time_offsets: None,
                }],
            }],
        };

        // The run's data offset points just past the moof box plus the 8-byte mdat box header.
        moof.traf[0].trun[0].data_offset = Some(moof.size() as i32 + 8);

        Self {
            moof,
            mdat: MediaDataBox {
                headers: None,
                data: Arc::new(data),
            },
        }
    }

    fn size(&self) -> u64 {
        self.moof.size() + self.mdat.size()
    }

    fn base_data_offset(&mut self) -> &mut Option<u64> {
        &mut self.moof.traf[0].tfhd.base_data_offset
    }

    fn sequence_number(&mut self) -> &mut u32 {
        &mut self.moof.mfhd.sequence_number
    }

    fn add_headers(&mut self, headers: Vec<u8>) {
        // MediaSegments constructed with `new` should always have sample_sizes
        #[allow(clippy::unwrap_used)]
        {
            self.moof.traf[0].trun[0].sample_sizes.as_mut().unwrap()[0] += headers.len() as u32;
        }
        self.mdat.headers = Some(headers);
    }
}

impl WriteTo for MediaSegment {
    fn write_to(&self, mut w: impl Write) -> io::Result<()> {
        write_to(&self.moof, &mut w)?;
        write_to(&self.mdat, &mut w)?;
        Ok(())
    }
}

/// Creates a new video stream.
///
/// # Errors
///
/// This function may return an [`Error::Other`] if all receivers on
/// `stream_sub_tx` have been dropped.
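///
/// # Example
///
/// A minimal sketch (not compiled as a doctest) of consuming the stream, assuming
/// `stream_sub_tx` is a `flume::Sender<StreamSubscriber>` whose receiver was passed
/// to [`stream_media_segments`] running on another thread:
///
/// ```rust,ignore
/// use futures_lite::StreamExt;
/// use mp4_stream::{config::Config, stream};
///
/// let config = Config::default();
/// let mut segments = stream(&config, stream_sub_tx).await?;
/// // The first item is the init segment; every following item is a media segment.
/// while let Some(segment) = segments.next().await {
///     let bytes: Vec<u8> = segment?;
///     // Write `bytes` to a file, socket, HTTP response body, ...
/// }
/// ```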
#[allow(clippy::missing_panics_doc)]
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(level = "debug", skip(stream_sub_tx))
)]
pub async fn stream(
    config: &Config,
    stream_sub_tx: flume::Sender<StreamSubscriber>,
) -> Result<impl Stream<Item = io::Result<Vec<u8>>>> {
    struct StreamState {
        init_segment: Option<InitSegment>,
        size: u64,
        sequence_number: u32,
        segment_stream: RecvStream<'static, MediaSegment>,
        headers: Option<Vec<u8>>,
    }

    let (tx, rx) = flume::unbounded();
    stream_sub_tx
        .send_async(tx)
        .await
        .map_err(|_| "Failed to communicate with streaming task".to_string())?;
    // if the send succeeds, the other side will respond immediately
    #[allow(clippy::unwrap_used)]
    let (headers, segment_rx) = rx.recv_async().await.unwrap();

    let init_segment = InitSegment::new(config);
    let state = StreamState {
        size: init_segment.size(),
        init_segment: Some(init_segment),
        sequence_number: 1,
        segment_stream: segment_rx.into_stream(),
        headers: Some(headers),
    };

    Ok(stream::try_unfold(state, |mut state| async move {
        if let Some(init_segment) = state.init_segment.take() {
            let mut buf = Vec::with_capacity(init_segment.size() as usize);
            init_segment.write_to(&mut buf)?;
            return Ok(Some((buf, state)));
        }

        let Some(mut segment) = state.segment_stream.next().await else {
            #[cfg(feature = "tracing")]
            tracing::trace!("VideoStream ended");
            return Ok(None);
        };

        if let Some(headers) = state.headers.take() {
            segment.add_headers(headers);
        }
        *segment.base_data_offset() = Some(state.size);
        *segment.sequence_number() = state.sequence_number;
        state.sequence_number += 1;
        let size = segment.size();
        state.size += size;

        let mut buf = Vec::with_capacity(size as usize);
        segment.write_to(&mut buf)?;

        #[cfg(feature = "tracing")]
        tracing::trace!(
            "VideoStream sent media segment with sequence number {}",
            state.sequence_number - 1
        );

        Ok(Some((buf, state)))
    }))
}

struct FrameIter {
    camera: Camera,
}

impl FrameIter {
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace"))]
    fn new(config: &Config) -> Result<Self> {
        let mut camera = Camera::new(
            config
                .device
                .as_os_str()
                .to_str()
                .ok_or_else(|| "failed to convert device path to string".to_string())?,
        )?;

        let controls: HashMap<String, u32> = camera
            .controls()
            .filter_map(|x| x.ok())
            .map(|ctl| (ctl.name, ctl.id))
            .collect();

        for (name, val) in &config.v4l2_controls {
            if let Some(id) = controls.get(name) {
                camera.set_control(*id, val).unwrap_or(()); // ignore failure
            } else {
                #[cfg(feature = "tracing")]
                tracing::warn!("Couldn't find control {}", name);
            }
        }

        camera.start(&rscam::Config {
            interval: config.interval,
            resolution: config.resolution,
            format: &<[u8; 4]>::from(config.format),
            ..Default::default()
        })?;

        Ok(Self { camera })
    }
}

impl Iterator for FrameIter {
    type Item = std::io::Result<rscam::Frame>;

    fn next(&mut self) -> Option<Self::Item> {
        Some(self.camera.capture())
    }
}

enum SegmentIter {
    Software {
        config: Config,
        encoder: x264::Encoder,
        timestamp: i64,
        timescale: u32,
        frames: FrameIter,
    },
    Hardware {
        config: Config,
        frames: FrameIter,
    },
}

impl SegmentIter {
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(level = "trace", skip(frames))
    )]
    fn new(config: Config, frames: FrameIter) -> x264::Result<Self> {
        Ok(match config.format {
            Format::H264 => Self::Hardware { frames, config },
            format => Self::Software {
                timescale: config.interval.1,
                encoder: {
                    let timescale = config.interval.1;
                    let bitrate = 896_000;
                    let colorspace = match format {
                        Format::H264 => unreachable!(),
                        Format::BGR3 => x264::Colorspace::BGR,
                        Format::RGB3 => x264::Colorspace::RGB,
                        Format::YUYV => x264::Colorspace::YUYV,
                        Format::YV12 => x264::Colorspace::YV12,
                    };
                    let encoding = x264::Encoding::from(colorspace);

                    x264::Setup::preset(x264::Preset::Superfast, x264::Tune::None, false, true)
                        .fps(config.interval.0, config.interval.1)
                        .timebase(1, timescale)
                        .bitrate(bitrate)
                        .high()
                        .annexb(false)
                        .max_keyframe_interval(60)
                        .scenecut_threshold(0)
                        .build(
                            encoding,
                            config.resolution.0 as i32,
                            config.resolution.1 as i32,
                        )?
                },
                timestamp: 0,
                config,
                frames,
            },
        })
    }

    fn get_headers(&mut self) -> x264::Result<Vec<u8>> {
        Ok(match self {
            Self::Software { encoder, .. } => encoder.headers()?.entirety().to_vec(),
            Self::Hardware { .. } => Vec::new(),
        })
    }
}

impl Iterator for SegmentIter {
    type Item = Result<MediaSegment>;

    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::Software {
                config,
                encoder,
                timestamp,
                timescale,
                frames,
            } => {
                let mut sample_sizes = vec![];
                let mut buf = vec![];

                for _ in 0..60 {
                    let frame = match frames.next() {
                        Some(Ok(f)) => f,
                        Some(Err(e)) => {
                            #[cfg(feature = "tracing")]
                            tracing::warn!("Capturing frame failed with error {:?}", e);
                            return Some(Err(e.into()));
                        }
                        None => unreachable!(),
                    };

                    // NOTE: the frame is handed to the encoder as a single packed
                    // YUYV plane (2 bytes per pixel), regardless of `config.format`.
                    let image = x264::Image::new(
                        x264::Colorspace::YUYV,
                        config.resolution.0 as i32,
                        config.resolution.1 as i32,
                        &[x264::Plane {
                            stride: config.resolution.0 as i32 * 2,
                            data: &frame,
                        }],
                    );

                    let (data, _) = match encoder.encode(*timestamp, image) {
                        Ok(x) => x,
                        Err(e) => {
                            #[cfg(feature = "tracing")]
                            tracing::warn!("Encoding frame failed with error {:?}", e);
                            return Some(Err(e.into()));
                        }
                    };

                    sample_sizes.push(data.entirety().len() as u32);
                    buf.extend_from_slice(data.entirety());
                    *timestamp +=
                        *timescale as i64 * config.interval.0 as i64 / config.interval.1 as i64;
                }

                Some(Ok(MediaSegment::new(config, 0, sample_sizes, buf)))
            }
            Self::Hardware { frames, config } => {
                let mut sample_sizes = Vec::new();
                let mut buf = Vec::new();
                for _ in 0..60 {
                    let frame = match frames.next() {
                        Some(Ok(f)) => f,
                        Some(Err(e)) => {
                            #[cfg(feature = "tracing")]
                            tracing::warn!("Capturing frame failed with error {:?}", e);
                            return Some(Err(e.into()));
                        }
                        None => unreachable!(),
                    };
                    sample_sizes.push(frame.len() as u32);
                    buf.extend_from_slice(&frame);
                }
                Some(Ok(MediaSegment::new(config, 0, sample_sizes, buf)))
            }
        }
    }
}

/// A channel receiver for [`MediaSegment`]s.
///
/// The sending half is dropped (disconnecting the channel) when the configuration
/// changes and the stream is restarted.
pub type MediaSegReceiver = flume::Receiver<MediaSegment>;

/// A channel for adding a subscriber to the stream.
///
/// The main capture and encoding thread will receive these and respond with a
/// tuple of the H264 headers for the stream and a [`MediaSegReceiver`].
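///
/// A rough sketch (not compiled) of performing the subscription handshake by hand;
/// the [`stream`] function does this internally. `sub_tx` is assumed to be the
/// sender whose receiver was passed to [`stream_media_segments`]:
///
/// ```rust,ignore
/// let (tx, rx) = flume::unbounded();
/// // Register the new subscriber with the capture thread...
/// sub_tx.send(tx)?;
/// // ...and get back the H264 headers plus a channel of media segments.
/// let (headers, segment_rx) = rx.recv()?;
/// ```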
pub type StreamSubscriber = flume::Sender<(Vec<u8>, MediaSegReceiver)>;

/// Start capturing video.
///
/// The optional `config_rx` parameter can be used to send configuration updates. When a
/// new configuration is received, the function drops all subscribed channels
/// (disconnecting any active streams) and restarts capture with the new config.
///
/// This function may block indefinitely, and should be called in its own thread
/// or with Tokio's [`spawn_blocking`](tokio::task::spawn_blocking) function or similar.
///
/// # Errors
///
/// This function may return an [`Error::Camera`] if interacting with the provided camera
/// device fails, an [`Error::Other`] if the device path is invalid UTF-8, or an
/// [`Error::Encoding`] if constructing an encoder fails.
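///
/// # Example
///
/// A sketch (not compiled) of starting the worker with live config updates enabled;
/// it assumes `Config`'s fields are public and uses the `resolution` field referenced
/// elsewhere in this crate:
///
/// ```rust,ignore
/// use mp4_stream::{config::Config, stream_media_segments};
/// use std::thread;
///
/// let (config_tx, config_rx) = flume::bounded(1);
/// let (sub_tx, sub_rx) = flume::unbounded();
/// thread::spawn(move || {
///     stream_media_segments(sub_rx, Config::default(), Some(config_rx)).unwrap();
/// });
///
/// // Later: push a new configuration; the worker restarts capture and
/// // disconnects any existing subscribers.
/// let mut new_config = Config::default();
/// new_config.resolution = (1280, 720);
/// config_tx.send(new_config).unwrap();
/// ```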
#[allow(clippy::missing_panics_doc)]
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(level = "debug", skip(rx, config_rx))
)]
pub fn stream_media_segments(
    rx: flume::Receiver<StreamSubscriber>,
    mut config: Config,
    config_rx: Option<flume::Receiver<Config>>,
) -> Result<std::convert::Infallible> {
    'main: loop {
        #[cfg(feature = "tracing")]
        tracing::trace!("Starting stream with config {:?}", config);
        let mut senders: Vec<flume::Sender<MediaSegment>> = Vec::new();

        let frames = FrameIter::new(&config)?;
        let mut segments = SegmentIter::new(config.clone(), frames)?;
        let headers = segments.get_headers()?;

        loop {
            if let Some(Ok(new_config)) = config_rx.as_ref().map(flume::Receiver::try_recv) {
                config = new_config;
                senders.clear();
                #[cfg(feature = "tracing")]
                tracing::trace!("Config updated to {:?}, restarting stream", config);
                continue 'main;
            }
            if let Ok(sender) = rx.try_recv() {
                let (tx, rx) = flume::unbounded();
                senders.push(tx);
                sender.send((headers.clone(), rx)).unwrap_or(());
            }

            #[cfg(feature = "tracing")]
            let time = std::time::Instant::now();
            #[allow(clippy::unwrap_used)] // the iterator never returns `None`
            let Ok(media_segment) = segments.next().unwrap() else {
                break;
            };
            senders.retain(|sender| sender.send(media_segment.clone()).is_ok());
            #[cfg(feature = "tracing")]
            tracing::trace!("Sent media segment, took {:?} to capture", time.elapsed());
        }
    }
}