#![warn(clippy::unwrap_used)]
#![warn(clippy::expect_used)]
#![warn(clippy::todo)]
#![warn(clippy::unimplemented)]
#![warn(clippy::dbg_macro)]
#![warn(missing_docs)]
#![warn(clippy::missing_panics_doc)]
#![warn(clippy::missing_errors_doc)]

pub mod capabilities;
pub mod config;

use config::{Config, Format, Rotation};

use bmff::*;
use chrono::{Duration, Utc};
use fixed::types::{I16F16, I8F8, U16F16};
use flume::r#async::RecvStream;
use futures_lite::stream::{self, Stream, StreamExt};
use quick_error::quick_error;
use rscam::Camera;
use std::{
    collections::HashMap,
    io::{self, prelude::*},
    sync::Arc,
};

quick_error! {
    #[derive(Debug)]
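    /// An error that can occur while capturing, encoding, or streaming video.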
    #[non_exhaustive]
    pub enum Error {
        Io(err: std::io::Error) {
            source(err)
            display("{}", err)
            from()
        }
        Encoding(err: x264::Error) {
            display("Encoding error: {:?}", err)
            from()
        }
        Camera(err: rscam::Error) {
            source(err)
            display("{}", err)
            from()
        }
        Other(err: String) {
            display("{}", err)
            from()
        }
    }
}

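/// A convenience `Result` type whose error is [`Error`].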
pub type Result<T> = std::result::Result<T, Error>;

fn matrix(rotation: Rotation) -> [[fixed::types::I16F16; 3]; 3] {
    match rotation {
        Rotation::R0 => MATRIX_0,
        Rotation::R90 => MATRIX_90,
        Rotation::R180 => MATRIX_180,
        Rotation::R270 => MATRIX_270,
    }
}

#[derive(Debug, Clone)]
struct InitSegment {
    ftyp: FileTypeBox,
    moov: MovieBox,
}

impl InitSegment {
    fn size(&self) -> u64 {
        self.ftyp.size() + self.moov.size()
    }
}

impl WriteTo for InitSegment {
    fn write_to(&self, mut w: impl Write) -> io::Result<()> {
        write_to(&self.ftyp, &mut w)?;
        write_to(&self.moov, &mut w)?;
        Ok(())
    }
}

impl InitSegment {
    fn new(config: &Config) -> Self {
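        // Hard-coded H.264 sequence and picture parameter sets for the `avcC`
        // box; the byte patterns correspond to a High-profile, level 3.1
        // stream, matching the `profile_idc`/`level_idc` values below.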
        let sps = vec![
            0x67, 0x64, 0x00, 0x1f, 0xac, 0xd9, 0x80, 0x50, 0x05, 0xbb, 0x01, 0x6a, 0x02, 0x02,
            0x02, 0x80, 0x00, 0x00, 0x03, 0x00, 0x80, 0x00, 0x00, 0x1e, 0x07, 0x8c, 0x18, 0xcd,
        ];
        let pps = vec![0x68, 0xe9, 0x7b, 0x2c, 0x8b];
        let timescale = config.interval.1;
        let (width, height) = config.resolution;

        let ftyp = FileTypeBox {
            major_brand: *b"isom",
            minor_version: 0,
            compatible_brands: vec![*b"isom", *b"iso6", *b"iso2", *b"avc1", *b"mp41"],
        };

        let time = Utc::now();
        let duration = Some(Duration::zero());

        let moov = MovieBox {
            mvhd: MovieHeaderBox {
                creation_time: time,
                modification_time: time,
                timescale,
                duration,
                rate: I16F16::from_num(1),
                volume: I8F8::from_num(1),
                matrix: matrix(config.rotation),
                next_track_id: 0,
            },
            trak: vec![TrackBox {
                tkhd: TrackHeaderBox {
                    flags: TrackHeaderFlags::TRACK_ENABLED
                        | TrackHeaderFlags::TRACK_IN_MOVIE
                        | TrackHeaderFlags::TRACK_IN_PREVIEW,
                    creation_time: time,
                    modification_time: time,
                    track_id: 1,
                    timescale,
                    duration,
                    layer: 0,
                    alternate_group: 0,
                    volume: I8F8::from_num(1),
                    matrix: matrix(config.rotation),
                    width: U16F16::from_num(width),
                    height: U16F16::from_num(height),
                },
                tref: None,
                edts: None,
                mdia: MediaBox {
                    mdhd: MediaHeaderBox {
                        creation_time: time,
                        modification_time: time,
                        timescale,
                        duration,
                        language: *b"und",
                    },
                    hdlr: HandlerBox {
                        handler_type: HandlerType::Video,
                        name: "foo".to_string(),
                    },
                    minf: MediaInformationBox {
                        media_header: MediaHeader::Video(VideoMediaHeaderBox {
                            graphics_mode: GraphicsMode::Copy,
                            opcolor: [0, 0, 0],
                        }),
                        dinf: DataInformationBox {
                            dref: DataReferenceBox {
                                data_entries: vec![DataEntry::Url(DataEntryUrlBox {
                                    flags: DataEntryFlags::SELF_CONTAINED,
                                    location: String::new(),
                                })],
                            },
                        },
                        stbl: SampleTableBox {
                            stsd: SampleDescriptionBox {
                                entries: vec![Box::new(AvcSampleEntry {
                                    data_reference_index: 1,
                                    width: width as u16,
                                    height: height as u16,
                                    horiz_resolution: U16F16::from_num(72),
                                    vert_resolution: U16F16::from_num(72),
                                    frame_count: 1,
                                    depth: 0x0018,
                                    avcc: AvcConfigurationBox {
                                        configuration: AvcDecoderConfigurationRecord {
                                            profile_idc: 0x64,
                                            constraint_set_flag: 0x00,
                                            level_idc: 0x1f,
                                            sequence_parameter_set: sps,
                                            picture_parameter_set: pps,
                                        },
                                    },
                                })],
                            },
                            stts: TimeToSampleBox { samples: vec![] },
                            stsc: SampleToChunkBox { entries: vec![] },
                            stsz: SampleSizeBox {
                                sample_size: SampleSize::Different(vec![]),
                            },
                            stco: ChunkOffsetBox {
                                chunk_offsets: vec![],
                            },
                        },
                    },
                },
            }],
            mvex: Some(MovieExtendsBox {
                mehd: None,
                trex: vec![TrackExtendsBox {
                    track_id: 1,
                    default_sample_description_index: 1,
                    default_sample_duration: 0,
                    default_sample_size: 0,
                    default_sample_flags: DefaultSampleFlags::empty(),
                }],
            }),
        };

        Self { ftyp, moov }
    }
}

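/// A single fragmented-MP4 media segment: a `moof` box describing one run of
/// encoded video samples plus the `mdat` box holding their data.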
#[derive(Debug, Clone)]
pub struct MediaSegment {
    moof: MovieFragmentBox,
    mdat: MediaDataBox,
}

impl MediaSegment {
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(level = "trace", skip(sample_sizes, data))
    )]
    fn new(config: &Config, sequence_number: u32, sample_sizes: Vec<u32>, data: Vec<u8>) -> Self {
        let timescale = config.interval.1;
        let mut moof = MovieFragmentBox {
            mfhd: MovieFragmentHeaderBox { sequence_number },
            traf: vec![TrackFragmentBox {
                tfhd: TrackFragmentHeaderBox {
                    track_id: 1,
                    base_data_offset: Some(0),
                    sample_description_index: None,
                    default_sample_duration: Some(
                        timescale * config.interval.0 / config.interval.1,
                    ),
                    default_sample_size: None,
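                    // Per the ISO BMFF sample-flags layout, 0x0101_0000 marks each
                    // sample as a non-sync sample that depends on other samples
                    // (an ordinary difference frame).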
                    default_sample_flags: {
                        #[allow(clippy::unwrap_used)]
                        Some(DefaultSampleFlags::from_bits(0x0101_0000).unwrap())
                    },
                },
                trun: vec![TrackFragmentRunBox {
                    data_offset: Some(0),
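                    // 0x0200_0000 overrides the flags of the fragment's first
                    // sample, marking it as a sync sample (keyframe).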
                    first_sample_flags: Some(0x0200_0000),
                    sample_durations: None,
                    sample_sizes: Some(sample_sizes),
                    sample_flags: None,
                    sample_composition_time_offsets: None,
                }],
            }],
        };

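        // The sample data starts immediately after the `moof` box and the
        // 8-byte `mdat` box header, so patch the run's data offset accordingly.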
        moof.traf[0].trun[0].data_offset = Some(moof.size() as i32 + 8);

        Self {
            moof,
            mdat: MediaDataBox {
                headers: None,
                data: Arc::new(data),
            },
        }
    }

    fn size(&self) -> u64 {
        self.moof.size() + self.mdat.size()
    }

    fn base_data_offset(&mut self) -> &mut Option<u64> {
        &mut self.moof.traf[0].tfhd.base_data_offset
    }

    fn sequence_number(&mut self) -> &mut u32 {
        &mut self.moof.mfhd.sequence_number
    }

    fn add_headers(&mut self, headers: Vec<u8>) {
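        // The encoder's stream headers (SPS/PPS NAL units) are prepended to this
        // segment's first sample, so its recorded size grows to match.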
        #[allow(clippy::unwrap_used)]
        {
            self.moof.traf[0].trun[0].sample_sizes.as_mut().unwrap()[0] += headers.len() as u32;
        }
        self.mdat.headers = Some(headers);
    }
}

impl WriteTo for MediaSegment {
    fn write_to(&self, mut w: impl Write) -> io::Result<()> {
        write_to(&self.moof, &mut w)?;
        write_to(&self.mdat, &mut w)?;
        Ok(())
    }
}

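/// Returns a stream of fMP4 data suitable for an HTTP response body or similar.
///
/// The first item is an initialization segment built from `config`; subsequent
/// items are media segments received from the capture task reachable through
/// `stream_sub_tx` (see [`stream_media_segments`]).
///
/// # Errors
///
/// Returns an error if it cannot communicate with the task producing media
/// segments over `stream_sub_tx`.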
#[allow(clippy::missing_panics_doc)]
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(level = "debug", skip(stream_sub_tx))
)]
pub async fn stream(
    config: &Config,
    stream_sub_tx: flume::Sender<StreamSubscriber>,
) -> Result<impl Stream<Item = io::Result<Vec<u8>>>> {
    struct StreamState {
        init_segment: Option<InitSegment>,
        size: u64,
        sequence_number: u32,
        segment_stream: RecvStream<'static, MediaSegment>,
        headers: Option<Vec<u8>>,
    }

    let (tx, rx) = flume::unbounded();
    stream_sub_tx
        .send_async(tx)
        .await
        .map_err(|_| "Failed to communicate with streaming task".to_string())?;
    #[allow(clippy::unwrap_used)]
    let (headers, segment_rx) = rx.recv_async().await.unwrap();

    let init_segment = InitSegment::new(config);
    let state = StreamState {
        size: init_segment.size(),
        init_segment: Some(init_segment),
        sequence_number: 1,
        segment_stream: segment_rx.into_stream(),
        headers: Some(headers),
    };

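    // Yield the init segment first; afterwards, patch each incoming media
    // segment's base data offset and sequence number so that segments from the
    // shared capture task form a consistent stream for this client.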
    Ok(stream::try_unfold(state, |mut state| async move {
        if let Some(init_segment) = state.init_segment.take() {
            let mut buf = Vec::with_capacity(init_segment.size() as usize);
            init_segment.write_to(&mut buf)?;
            return Ok(Some((buf, state)));
        }

        let Some(mut segment) = state.segment_stream.next().await else {
            #[cfg(feature = "tracing")]
            tracing::trace!("VideoStream ended");
            return Ok(None);
        };

        if let Some(headers) = state.headers.take() {
            segment.add_headers(headers);
        }
        *segment.base_data_offset() = Some(state.size);
        *segment.sequence_number() = state.sequence_number;
        state.sequence_number += 1;
        let size = segment.size();
        state.size += size;

        let mut buf = Vec::with_capacity(size as usize);
        segment.write_to(&mut buf)?;

        #[cfg(feature = "tracing")]
        tracing::trace!(
            "VideoStream sent media segment with sequence number {}",
            state.sequence_number - 1
        );

        Ok(Some((buf, state)))
    }))
}

struct FrameIter {
    camera: Camera,
}

impl FrameIter {
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace"))]
    fn new(config: &Config) -> Result<Self> {
        let mut camera = Camera::new(
            config
                .device
                .as_os_str()
                .to_str()
                .ok_or_else(|| "failed to convert device path to string".to_string())?,
        )?;

        let controls: HashMap<String, u32> = camera
            .controls()
            .filter_map(|x| x.ok())
            .map(|ctl| (ctl.name, ctl.id))
            .collect();

        for (name, val) in &config.v4l2_controls {
            if let Some(id) = controls.get(name) {
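                // Failing to set a control is not fatal, so the result is ignored.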
                camera.set_control(*id, val).unwrap_or(());
            } else {
                #[cfg(feature = "tracing")]
                tracing::warn!("Couldn't find control {}", name);
            }
        }

        camera.start(&rscam::Config {
            interval: config.interval,
            resolution: config.resolution,
            format: &<[u8; 4]>::from(config.format),
            ..Default::default()
        })?;

        Ok(Self { camera })
    }
}

impl Iterator for FrameIter {
    type Item = std::io::Result<rscam::Frame>;

    fn next(&mut self) -> Option<Self::Item> {
        Some(self.camera.capture())
    }
}

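// Produces one fMP4 media segment per iteration, either by encoding raw frames
// in software with x264 or by passing through frames that the camera already
// delivers as H.264.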
enum SegmentIter {
    Software {
        config: Config,
        encoder: x264::Encoder,
        timestamp: i64,
        timescale: u32,
        frames: FrameIter,
    },
    Hardware {
        config: Config,
        frames: FrameIter,
    },
}

impl SegmentIter {
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(level = "trace", skip(frames))
    )]
    fn new(config: Config, frames: FrameIter) -> x264::Result<Self> {
        Ok(match config.format {
            Format::H264 => Self::Hardware { frames, config },
            format => Self::Software {
                timescale: config.interval.1,
                encoder: {
                    let timescale = config.interval.1;
                    let bitrate = 896_000;
                    let colorspace = match format {
                        Format::H264 => unreachable!(),
                        Format::BGR3 => x264::Colorspace::BGR,
                        Format::RGB3 => x264::Colorspace::RGB,
                        Format::YUYV => x264::Colorspace::YUYV,
                        Format::YV12 => x264::Colorspace::YV12,
                    };
                    let encoding = x264::Encoding::from(colorspace);

                    x264::Setup::preset(x264::Preset::Superfast, x264::Tune::None, false, true)
                        .fps(config.interval.0, config.interval.1)
                        .timebase(1, timescale)
                        .bitrate(bitrate)
                        .high()
                        .annexb(false)
                        .max_keyframe_interval(60)
                        .scenecut_threshold(0)
                        .build(
                            encoding,
                            config.resolution.0 as i32,
                            config.resolution.1 as i32,
                        )?
                },
                timestamp: 0,
                config,
                frames,
            },
        })
    }

    fn get_headers(&mut self) -> x264::Result<Vec<u8>> {
        Ok(match self {
            Self::Software { encoder, .. } => encoder.headers()?.entirety().to_vec(),
            Self::Hardware { .. } => Vec::new(),
        })
    }
}

impl Iterator for SegmentIter {
    type Item = Result<MediaSegment>;

    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::Software {
                config,
                encoder,
                timestamp,
                timescale,
                frames,
            } => {
                let mut sample_sizes = vec![];
                let mut buf = vec![];

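                // Each segment holds 60 frames; with the keyframe interval set to
                // 60 and scene-cut detection disabled, every segment should start
                // on a keyframe.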
                for _ in 0..60 {
                    let frame = match frames.next() {
                        Some(Ok(f)) => f,
                        Some(Err(e)) => {
                            #[cfg(feature = "tracing")]
                            tracing::warn!("Capturing frame failed with error {:?}", e);
                            return Some(Err(e.into()));
                        }
                        None => unreachable!(),
                    };

                    let image = x264::Image::new(
                        x264::Colorspace::YUYV,
                        config.resolution.0 as i32,
                        config.resolution.1 as i32,
                        &[x264::Plane {
                            stride: config.resolution.0 as i32 * 2,
                            data: &frame,
                        }],
                    );

                    let (data, _) = match encoder.encode(*timestamp, image) {
                        Ok(x) => x,
                        Err(e) => {
                            #[cfg(feature = "tracing")]
                            tracing::warn!("Encoding frame failed with error {:?}", e);
                            return Some(Err(e.into()));
                        }
                    };

                    sample_sizes.push(data.entirety().len() as u32);
                    buf.extend_from_slice(data.entirety());
                    *timestamp +=
                        *timescale as i64 * config.interval.0 as i64 / config.interval.1 as i64;
                }

                Some(Ok(MediaSegment::new(config, 0, sample_sizes, buf)))
            }
            Self::Hardware { frames, config } => {
                let mut sample_sizes = Vec::new();
                let mut buf = Vec::new();
                for _ in 0..60 {
                    let frame = match frames.next() {
                        Some(Ok(f)) => f,
                        Some(Err(e)) => {
                            #[cfg(feature = "tracing")]
                            tracing::warn!("Capturing frame failed with error {:?}", e);
                            return Some(Err(e.into()));
                        }
                        None => unreachable!(),
                    };
                    sample_sizes.push(frame.len() as u32);
                    buf.extend_from_slice(&frame);
                }
                Some(Ok(MediaSegment::new(config, 0, sample_sizes, buf)))
            }
        }
    }
}

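/// The receiving end of a channel of [`MediaSegment`]s produced by the capture
/// task.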
pub type MediaSegReceiver = flume::Receiver<MediaSegment>;

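/// A channel over which a client is handed the encoder's stream headers
/// together with a [`MediaSegReceiver`] for its own copy of the segments.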
pub type StreamSubscriber = flume::Sender<(Vec<u8>, MediaSegReceiver)>;

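/// Captures frames from the camera, encodes them, and fans the resulting media
/// segments out to every subscriber received over `rx`.
///
/// This function blocks and is intended to run on its own thread. If
/// `config_rx` is given, receiving a new [`Config`] tears the capture pipeline
/// down and restarts it with the new settings.
///
/// # Example
///
/// A minimal sketch of wiring it up to [`stream`]; how `config` is built is
/// assumed here and depends on the `config` module:
///
/// ```ignore
/// let (sub_tx, sub_rx) = flume::unbounded();
///
/// // Run the blocking capture/encode loop on a dedicated thread.
/// let cfg = config.clone();
/// std::thread::spawn(move || stream_media_segments(sub_rx, cfg, None));
///
/// // Each client then gets its own fMP4 byte stream.
/// let body = stream(&config, sub_tx.clone()).await?;
/// ```
///
/// # Errors
///
/// Returns an error if the camera cannot be opened or started, or if the x264
/// encoder cannot be created or queried for its stream headers.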
#[allow(clippy::missing_panics_doc)]
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(level = "debug", skip(rx, config_rx))
)]
pub fn stream_media_segments(
    rx: flume::Receiver<StreamSubscriber>,
    mut config: Config,
    config_rx: Option<flume::Receiver<Config>>,
) -> Result<std::convert::Infallible> {
    'main: loop {
        #[cfg(feature = "tracing")]
        tracing::trace!("Starting stream with config {:?}", config);
        let mut senders: Vec<flume::Sender<MediaSegment>> = Vec::new();

        let frames = FrameIter::new(&config)?;
        let mut segments = SegmentIter::new(config.clone(), frames)?;
        let headers = segments.get_headers()?;

        loop {
            if let Some(Ok(new_config)) = config_rx.as_ref().map(flume::Receiver::try_recv) {
                config = new_config;
                senders.clear();
                #[cfg(feature = "tracing")]
                tracing::trace!("Config updated to {:?}, restarting stream", config);
                continue 'main;
            }
            if let Ok(sender) = rx.try_recv() {
                let (tx, rx) = flume::unbounded();
                senders.push(tx);
                sender.send((headers.clone(), rx)).unwrap_or(());
            }

            #[cfg(feature = "tracing")]
            let time = std::time::Instant::now();
            #[allow(clippy::unwrap_used)]
            let Ok(media_segment) = segments.next().unwrap() else {
                break;
            };
            senders.retain(|sender| sender.send(media_segment.clone()).is_ok());
            #[cfg(feature = "tracing")]
            tracing::trace!("Sent media segment, took {:?} to capture", time.elapsed());
        }
    }
}