use std::{cell::RefCell, rc::Rc, time::Duration};
use tokio::sync::{mpsc, watch};
use wasm_bindgen::prelude::*;
use crate::{EncodedFrame, Error, Timestamp};
use super::{Dimensions, VideoDecoderConfig, VideoFrame};
use derive_more::Display;
/// Rate-control strategy that can be requested in a [`VideoEncoderConfig`].
///
/// The `Display` output of each variant is the lowercase string given in its
/// `#[display(...)]` attribute.
// NOTE(review): presumably mirrors the WebCodecs bitrate-mode strings — confirm.
#[derive(Clone, Copy, Debug, Display)]
pub enum EncoderBitrateMode {
    /// Displayed as `constant`.
    #[display("constant")]
    Constant,

    /// Displayed as `variable`.
    #[display("variable")]
    Variable,

    /// Displayed as `quantizer`.
    #[display("quantizer")]
    Quantizer,
}
/// Settings describing how a video encoder should be configured.
///
/// Only `codec` and `resolution` are mandatory; every `Option` field left as
/// `None` is simply not set on the browser-side configuration.
#[derive(Clone, Debug, Default)]
pub struct VideoEncoderConfig {
    /// Codec string handed to the browser encoder configuration.
    pub codec: String,

    /// Encoded frame size; `is_valid` rejects a zero width or height.
    pub resolution: Dimensions,

    /// Optional display size; `is_valid` rejects a zero width or height.
    pub display: Option<Dimensions>,

    /// `Some(true)` prefers hardware encoding, `Some(false)` prefers software.
    pub hardware_acceleration: Option<bool>,

    /// `Some(true)` selects the realtime latency mode, `Some(false)` the quality mode.
    pub latency_optimized: Option<bool>,

    /// Target bitrate forwarded to the browser configuration.
    // NOTE(review): presumably bits per second — confirm.
    pub bit_rate: Option<f64>,

    /// Frame rate forwarded to the browser configuration.
    pub frame_rate: Option<f64>,

    /// `Some(true)` keeps the alpha channel, `Some(false)` discards it.
    pub alpha_preserved: Option<bool>,

    /// Scalability mode string forwarded to the browser configuration.
    pub scalability_mode: Option<String>,

    /// Requested rate-control mode. Currently not forwarded to the browser configuration.
    pub bitrate_mode: Option<EncoderBitrateMode>,

    /// Consulted by `VideoEncoder::encode` to decide when to request a key frame
    /// if the caller did not specify one explicitly.
    pub max_gop_duration: Option<Duration>,
}
impl VideoEncoderConfig {
    /// Creates a configuration for `codec` at `resolution`, leaving every
    /// optional setting unset (`None`).
    pub fn new<T: Into<String>>(codec: T, resolution: Dimensions) -> Self {
        Self {
            codec: codec.into(),
            resolution,
            ..Default::default()
        }
    }

    /// Asks the browser whether this configuration can be encoded.
    ///
    /// Returns the boolean `supported` property of the value the browser
    /// resolves with. A result whose `supported` property is not a boolean is
    /// reported as unsupported rather than causing a panic.
    ///
    /// # Errors
    ///
    /// Returns an error if the support query rejects or the `supported`
    /// property cannot be read from the result.
    pub async fn is_supported(&self) -> Result<bool, Error> {
        let query = web_sys::VideoEncoder::is_config_supported(&self.into());
        let support = wasm_bindgen_futures::JsFuture::from(query).await?;
        let supported = js_sys::Reflect::get(&support, &JsValue::from_str("supported"))?;
        Ok(supported.as_bool().unwrap_or(false))
    }

    /// Checks the configuration for values that can never be valid.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidDimensions`] if the resolution, or the display
    /// size when present, has a zero width or height.
    pub fn is_valid(&self) -> Result<(), Error> {
        let has_zero_side = |dims: &Dimensions| dims.width == 0 || dims.height == 0;

        if has_zero_side(&self.resolution) {
            return Err(Error::InvalidDimensions);
        }
        if let Some(display) = &self.display {
            if has_zero_side(display) {
                return Err(Error::InvalidDimensions);
            }
        }
        Ok(())
    }

    /// Consumes the configuration and creates an encoder together with the
    /// handle that receives its output.
    ///
    /// The returned [`VideoEncoder`] accepts frames; the returned
    /// [`VideoEncoded`] yields the encoded frames, the decoder configuration
    /// and any error reported by the encoder.
    ///
    /// # Errors
    ///
    /// Returns an error if the encoder cannot be created or configured.
    pub fn init(self) -> Result<(VideoEncoder, VideoEncoded), Error> {
        let (frames_tx, frames_rx) = mpsc::unbounded_channel();
        // The watch starts as `Ok(())`; the encoder replaces it with an `Err` on failure.
        let (closed_tx, closed_rx) = watch::channel(Ok(()));
        let (config_tx, config_rx) = watch::channel(None);

        let encoder = VideoEncoder::new(self, config_tx, frames_tx, closed_tx)?;
        let encoded = VideoEncoded::new(config_rx, frames_rx, closed_rx);
        Ok((encoder, encoded))
    }
}
impl From<&VideoEncoderConfig> for web_sys::VideoEncoderConfig {
    /// Builds the browser-side encoder configuration from `this`.
    ///
    /// Only fields that are `Some` are set on the result; everything else is
    /// left at whatever the browser treats as its default.
    fn from(this: &VideoEncoderConfig) -> Self {
        // Note the argument order of the constructor call: height comes before width.
        let size = &this.resolution;
        let config = web_sys::VideoEncoderConfig::new(&this.codec, size.height, size.width);

        if let Some(display) = &this.display {
            config.set_display_height(display.height);
            config.set_display_width(display.width);
        }

        if let Some(prefer_hardware) = this.hardware_acceleration {
            let preference = if prefer_hardware {
                web_sys::HardwareAcceleration::PreferHardware
            } else {
                web_sys::HardwareAcceleration::PreferSoftware
            };
            config.set_hardware_acceleration(preference);
        }

        if let Some(realtime) = this.latency_optimized {
            let mode = if realtime {
                web_sys::LatencyMode::Realtime
            } else {
                web_sys::LatencyMode::Quality
            };
            config.set_latency_mode(mode);
        }

        if let Some(bitrate) = this.bit_rate {
            config.set_bitrate(bitrate);
        }

        if let Some(framerate) = this.frame_rate {
            config.set_framerate(framerate);
        }

        if let Some(keep_alpha) = this.alpha_preserved {
            let alpha = if keep_alpha {
                web_sys::AlphaOption::Keep
            } else {
                web_sys::AlphaOption::Discard
            };
            config.set_alpha(alpha);
        }

        if let Some(mode) = this.scalability_mode.as_deref() {
            config.set_scalability_mode(mode);
        }

        // NOTE(review): `bitrate_mode` is intentionally not forwarded here; the
        // field is accepted on the config but has no effect on the result yet.

        config
    }
}
/// Per-frame options accepted by `VideoEncoder::encode`.
#[derive(Default, Debug)]
pub struct VideoEncodeOptions {
    /// When `Some`, the value is passed to the encoder as the key-frame flag
    /// for this frame; when `None`, the encoder wrapper decides on its own.
    pub key_frame: Option<bool>,
}
/// Input half of an encoder: accepts frames and submits them to the
/// underlying browser encoder.
pub struct VideoEncoder {
    /// The browser encoder object being driven.
    inner: web_sys::VideoEncoder,

    /// The configuration the encoder was created with.
    config: VideoEncoderConfig,

    /// Timestamp of the most recent key frame, shared with the output callback.
    last_keyframe: Rc<RefCell<Option<Timestamp>>>,

    // The callbacks below are never read from Rust; they are stored so they
    // stay alive for as long as the encoder that was handed references to them.
    /// Error callback registered with the browser encoder.
    #[allow(dead_code)]
    on_error: Closure<dyn FnMut(JsValue)>,

    /// Output callback registered with the browser encoder.
    #[allow(dead_code)]
    on_frame: Closure<dyn FnMut(JsValue, JsValue)>,
}
impl VideoEncoder {
fn new(
config: VideoEncoderConfig,
on_config: watch::Sender<Option<VideoDecoderConfig>>,
on_frame: mpsc::UnboundedSender<EncodedFrame>,
on_error: watch::Sender<Result<(), Error>>,
) -> Result<Self, Error> {
let last_keyframe = Rc::new(RefCell::new(None));
let last_keyframe2 = last_keyframe.clone();
let on_error2 = on_error.clone();
let on_error = Closure::wrap(Box::new(move |e: JsValue| {
on_error.send_replace(Err(Error::from(e))).ok();
}) as Box<dyn FnMut(_)>);
let on_frame = Closure::wrap(Box::new(move |frame: JsValue, meta: JsValue| {
let frame: web_sys::EncodedVideoChunk = frame.unchecked_into();
let frame = EncodedFrame::from(frame);
if let Ok(metadata) = meta.dyn_into::<js_sys::Object>() {
if let Ok(config) = js_sys::Reflect::get(&metadata, &"decoderConfig".into()) {
if !config.is_falsy() {
let config: web_sys::VideoDecoderConfig = config.unchecked_into();
let config = VideoDecoderConfig::from(config);
on_config.send_replace(Some(config));
}
}
}
if frame.keyframe {
let mut last_keyframe = last_keyframe2.borrow_mut();
if frame.timestamp > last_keyframe.unwrap_or_default() {
*last_keyframe = Some(frame.timestamp);
}
}
if on_frame.send(frame).is_err() {
on_error2.send_replace(Err(Error::Dropped)).ok();
}
}) as Box<dyn FnMut(_, _)>);
let init = web_sys::VideoEncoderInit::new(on_error.as_ref().unchecked_ref(), on_frame.as_ref().unchecked_ref());
let inner: web_sys::VideoEncoder = web_sys::VideoEncoder::new(&init).unwrap();
inner.configure(&(&config).into())?;
Ok(Self {
config,
inner,
last_keyframe,
on_error,
on_frame,
})
}
pub fn encode(&mut self, frame: &VideoFrame, options: VideoEncodeOptions) -> Result<(), Error> {
let o = web_sys::VideoEncoderEncodeOptions::new();
if let Some(key_frame) = options.key_frame {
o.set_key_frame(key_frame);
} else if let Some(max_gop_duration) = self.config.max_gop_duration {
let timestamp = frame.timestamp();
let mut last_keyframe = self.last_keyframe.borrow_mut();
let duration = timestamp - last_keyframe.unwrap_or_default();
if duration >= max_gop_duration {
o.set_key_frame(true);
}
*last_keyframe = Some(timestamp);
}
self.inner.encode_with_options(frame.inner(), &o)?;
frame.inner().close(); Ok(())
}
pub fn queue_size(&self) -> u32 {
self.inner.encode_queue_size()
}
pub fn config(&self) -> &VideoEncoderConfig {
&self.config
}
pub async fn flush(&mut self) -> Result<(), Error> {
wasm_bindgen_futures::JsFuture::from(self.inner.flush()).await?;
Ok(())
}
}
impl Drop for VideoEncoder {
    /// Closes the underlying encoder when the wrapper goes away.
    fn drop(&mut self) {
        // Best effort: whatever `close` returns is deliberately discarded.
        drop(self.inner.close());
    }
}
/// Output half of an encoder: yields encoded frames, the decoder
/// configuration, and any error reported by the encoder.
pub struct VideoEncoded {
    /// Latest decoder configuration, `None` until one has been published.
    config: watch::Receiver<Option<VideoDecoderConfig>>,

    /// Encoded frames in the order they were sent.
    frames: mpsc::UnboundedReceiver<EncodedFrame>,

    /// Encoder status; starts as whatever the sender was created with and is
    /// replaced with an `Err` when something goes wrong.
    closed: watch::Receiver<Result<(), Error>>,
}
impl VideoEncoded {
    /// Bundles the three receivers into the output handle.
    fn new(
        config: watch::Receiver<Option<VideoDecoderConfig>>,
        frames: mpsc::UnboundedReceiver<EncodedFrame>,
        closed: watch::Receiver<Result<(), Error>>,
    ) -> Self {
        Self {
            config,
            frames,
            closed,
        }
    }

    /// Waits for the next encoded frame.
    ///
    /// Returns `Ok(Some(frame))` for each frame and `Ok(None)` once the frame
    /// channel reports that it is closed. Pending frames are checked before
    /// the status channel, so frames are preferred when both are ready.
    ///
    /// # Errors
    ///
    /// Returns the stored error when the status channel changes.
    ///
    /// # Panics
    ///
    /// Panics if the status channel changes to a value that is not an error.
    pub async fn frame(&mut self) -> Result<Option<EncodedFrame>, Error> {
        tokio::select! {
            biased;
            next = self.frames.recv() => Ok(next),
            Ok(()) = self.closed.changed() => {
                let status = self.closed.borrow().clone();
                Err(status.err().unwrap())
            }
        }
    }

    /// Waits until a decoder configuration has been published and returns a
    /// copy of it.
    ///
    /// Returns `None` if waiting fails, i.e. the channel closes before any
    /// configuration is available.
    pub async fn config(&self) -> Option<VideoDecoderConfig> {
        // Wait on a private clone so `&self` is sufficient.
        let mut receiver = self.config.clone();
        let published = receiver.wait_for(|current| current.is_some()).await.ok()?;
        published.clone()
    }
}