pub mod stateless;
use std::collections::VecDeque;
use std::os::fd::AsFd;
use std::os::fd::BorrowedFd;
use std::sync::Arc;
use nix::errno::Errno;
use nix::sys::eventfd::EventFd;
pub use crate::BlockingMode;
use crate::video_frame::VideoFrame;
use crate::DecodedFormat;
use crate::Resolution;
/// Interface to a pool of frames that all share one coded resolution.
///
/// NOTE(review): only the method signatures are visible here; the per-method notes below are
/// inferred from names and types and should be confirmed against the implementors.
pub trait FramePool {
/// Implementor-chosen descriptor type; `add_frames` consumes a `Vec` of these.
type Descriptor;
/// Returns the coded resolution currently associated with the pool.
fn coded_resolution(&self) -> Resolution;
/// Replaces the pool's coded resolution with `resolution`.
///
/// NOTE(review): what happens to frames already in the pool is not visible here - confirm.
fn set_coded_resolution(&mut self, resolution: Resolution);
/// Adds frames to the pool, one per entry of `descriptors` (presumably; confirm).
///
/// Returns an `anyhow::Error` on failure; the failure conditions are implementor-defined.
fn add_frames(&mut self, descriptors: Vec<Self::Descriptor>) -> Result<(), anyhow::Error>;
/// Returns the number of frames that are currently free.
fn num_free_frames(&self) -> usize;
/// Returns the number of frames managed by the pool.
fn num_managed_frames(&self) -> usize;
/// Empties the pool.
///
/// NOTE(review): presumably removes every managed frame - confirm against implementors.
fn clear(&mut self);
}
/// Plain-data description of a stream: its decoded format, two resolutions and a frame count.
///
/// NOTE(review): field meanings below are inferred from their names and types - confirm against
/// the code that fills this structure in.
#[derive(Clone, Debug)]
pub struct StreamInfo {
/// Format of the decoded frames.
pub format: DecodedFormat,
/// Coded resolution of the stream.
pub coded_resolution: Resolution,
/// Display resolution of the stream.
pub display_resolution: Resolution,
/// Minimum number of frames; presumably what the decoder needs to make progress - confirm.
pub min_num_frames: usize,
}
/// Events parameterized by a [`DecodedHandle`] type `H`.
pub enum DecoderEvent<H: DecodedHandle> {
/// Carries a handle of type `H`.
///
/// NOTE(review): presumably the decoded frame being handed to the client - confirm.
FrameReady(H),
/// Carries no payload.
///
/// NOTE(review): presumably signals that the stream format changed - confirm with the producer.
FormatChanged,
}
/// Handle giving access to a video frame and its associated metadata.
///
/// NOTE(review): only signatures are visible here; semantics noted below are inferred from names
/// and should be confirmed against the implementors.
pub trait DecodedHandle {
/// Concrete frame type exposed by this handle; must implement [`VideoFrame`].
type Frame: VideoFrame;
/// Returns a shared (`Arc`) pointer to the frame behind this handle.
fn video_frame(&self) -> Arc<Self::Frame>;
/// Returns the timestamp attached to this handle.
///
/// NOTE(review): unit and origin of the timestamp are not visible here.
fn timestamp(&self) -> u64;
/// Returns the coded resolution reported by this handle.
fn coded_resolution(&self) -> Resolution;
/// Returns the display resolution reported by this handle.
fn display_resolution(&self) -> Resolution;
/// Reports whether the handle is ready.
///
/// NOTE(review): presumably "decoding has completed" - confirm.
fn is_ready(&self) -> bool;
/// Synchronizes on the handle, returning an error on failure.
///
/// NOTE(review): presumably waits until the frame is ready - confirm.
fn sync(&self) -> anyhow::Result<()>;
}
/// Forwarding implementation: a boxed handle (including a boxed trait object, hence `?Sized`)
/// is itself a [`DecodedHandle`] that delegates every call to the value it owns.
impl<H: DecodedHandle + ?Sized> DecodedHandle for Box<H> {
    type Frame = H::Frame;

    /// Delegates to the boxed handle's `video_frame`.
    fn video_frame(&self) -> Arc<Self::Frame> {
        (**self).video_frame()
    }

    /// Delegates to the boxed handle's `timestamp`.
    fn timestamp(&self) -> u64 {
        (**self).timestamp()
    }

    /// Delegates to the boxed handle's `coded_resolution`.
    fn coded_resolution(&self) -> Resolution {
        (**self).coded_resolution()
    }

    /// Delegates to the boxed handle's `display_resolution`.
    fn display_resolution(&self) -> Resolution {
        (**self).display_resolution()
    }

    /// Delegates to the boxed handle's `is_ready`.
    fn is_ready(&self) -> bool {
        (**self).is_ready()
    }

    /// Delegates to the boxed handle's `sync`.
    fn sync(&self) -> anyhow::Result<()> {
        (**self).sync()
    }
}
/// Boxed, dynamically-dispatched [`DecodedHandle`] whose frame type is `F`.
pub type DynDecodedHandle<F> = Box<dyn DecodedHandle<Frame = F>>;
/// FIFO of items of type `T` paired with an eventfd that can be polled.
///
/// The `impl` blocks in this file write to the eventfd whenever items are queued and read it back
/// when the last queued item is removed.
struct ReadyFramesQueue<T> {
/// Queued items; new items go to the back and are taken from the front.
queue: VecDeque<T>,
/// Eventfd written to when items are queued and read when the queue becomes empty.
poll_fd: EventFd,
}
impl<T> ReadyFramesQueue<T> {
    /// Creates an empty queue together with the eventfd used to signal its state.
    ///
    /// # Errors
    ///
    /// Returns the [`Errno`] reported by `EventFd::new` if the eventfd cannot be created.
    fn new() -> Result<Self, Errno> {
        let poll_fd = EventFd::new()?;
        Ok(Self { queue: Default::default(), poll_fd })
    }

    /// Appends `handle` to the back of the queue and writes `1` to the eventfd.
    ///
    /// A failed eventfd write is logged rather than propagated; `handle` stays queued either way.
    fn push(&mut self, handle: T) {
        self.queue.push_back(handle);
        if let Err(e) = self.poll_fd.write(1) {
            log::error!("failed to write ready frames queue poll FD: {:#}", e);
        }
    }

    /// Borrows the eventfd so that callers can register it with a poller.
    ///
    /// The returned descriptor borrows from `self`; the lifetime is spelled out as `'_` so the
    /// borrow is visible in the signature.
    pub fn poll_fd(&self) -> BorrowedFd<'_> {
        self.poll_fd.as_fd()
    }
}
impl<T> Extend<T> for ReadyFramesQueue<T> {
    /// Appends every item yielded by `iter` to the back of the queue, then writes the number of
    /// appended items to the eventfd in a single call.
    ///
    /// When `iter` yields nothing the eventfd is left alone: adding zero would not change its
    /// counter, so the system call is skipped. A failed eventfd write is logged rather than
    /// propagated; the items stay queued either way.
    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
        let len_before = self.queue.len();
        self.queue.extend(iter);

        // The iterator's `size_hint` may be inexact, so measure what was actually appended.
        let added = self.queue.len() - len_before;
        if added == 0 {
            return;
        }

        // `usize` is never wider than 64 bits on targets Rust currently supports, so this cast
        // cannot truncate.
        if let Err(e) = self.poll_fd.write(added as u64) {
            log::error!("failed to write ready frames queue poll FD: {:#}", e);
        }
    }
}
impl<T> Iterator for ReadyFramesQueue<T> {
type Item = T;
fn next(&mut self) -> Option<T> {
let next = self.queue.pop_front();
if next.is_some() && self.queue.is_empty() {
if let Err(e) = self.poll_fd.read() {
log::error!("failed to read ready frames queue poll FD: {:#}", e);
}
}
next
}
}
#[cfg(test)]
mod tests {
    use nix::sys::epoll::Epoll;
    use nix::sys::epoll::EpollCreateFlags;
    use nix::sys::epoll::EpollEvent;
    use nix::sys::epoll::EpollFlags;
    use nix::sys::epoll::EpollTimeout;

    use super::ReadyFramesQueue;

    /// Polls `epoll` once with a zero timeout and returns the number of ready FDs together with
    /// the single-slot event buffer that was handed to `wait`.
    fn poll_once(epoll: &Epoll) -> (usize, [EpollEvent; 1]) {
        let mut events = [EpollEvent::empty()];
        let nb_fds = epoll.wait(&mut events, EpollTimeout::ZERO).unwrap();
        (nb_fds, events)
    }

    /// Checks that the queue's poll FD is readable exactly while the queue holds items.
    #[test]
    fn test_ready_frame_queue_poll() {
        // Event buffer contents expected when the FD is reported / not reported.
        let readable = [EpollEvent::new(EpollFlags::EPOLLIN, 1)];
        let untouched = [EpollEvent::empty()];

        let mut queue = ReadyFramesQueue::<()>::new().unwrap();
        let epoll = Epoll::new(EpollCreateFlags::empty()).unwrap();
        epoll.add(queue.poll_fd(), EpollEvent::new(EpollFlags::EPOLLIN, 1)).unwrap();

        // Freshly created queue: nothing is reported.
        let (nb_fds, _) = poll_once(&epoll);
        assert_eq!(nb_fds, 0);

        // One pushed item makes the FD readable.
        queue.push(());
        let (nb_fds, events) = poll_once(&epoll);
        assert_eq!(nb_fds, 1);
        assert_eq!(events, readable);

        // Taking that item back out clears the signal.
        queue.next().unwrap();
        let (nb_fds, events) = poll_once(&epoll);
        assert_eq!(nb_fds, 0);
        assert_eq!(events, untouched);

        // With three items queued at once, the FD is readable before each removal.
        queue.extend(std::iter::repeat(()).take(3));
        for _ in 0..3 {
            let (nb_fds, events) = poll_once(&epoll);
            assert_eq!(nb_fds, 1);
            assert_eq!(events, readable);
            queue.next().unwrap();
        }

        // All three items removed: the signal is gone again.
        let (nb_fds, events) = poll_once(&epoll);
        assert_eq!(nb_fds, 0);
        assert_eq!(events, untouched);
    }
}