use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use image::DynamicImage;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task::JoinHandle;
use tokio_stream::Stream;
use crate::audio::AudioFormat;
use crate::configuration::ExtractOptions;
use crate::error::UnbundleError;
use crate::metadata::MediaMetadata;
use crate::unbundle::MediaFile;
use crate::video::FrameRange;
/// Channel capacity that `create_frame_stream` falls back to when the caller
/// passes `None` for `channel_capacity`.
const DEFAULT_CHANNEL_CAPACITY: usize = 8;
/// Asynchronous stream of decoded video frames.
///
/// Each item is either a `(frame_number, image)` pair or an
/// [`UnbundleError`] forwarded from the blocking task that feeds the channel.
pub struct FrameStream {
    /// Receiving half of the bounded channel the decoding task sends into.
    receiver: Receiver<Result<(u64, DynamicImage), UnbundleError>>,
    /// Join handle of the blocking decoding task. It is stored alongside the
    /// receiver but never read in this file, hence the lint allowance.
    #[allow(dead_code)]
    handle: JoinHandle<()>,
}
impl Stream for FrameStream {
    type Item = Result<(u64, DynamicImage), UnbundleError>;

    /// Polls the underlying channel receiver and returns its answer unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Both fields are `Unpin`, so the pin can be unwrapped to reach the receiver.
        let this = self.get_mut();
        this.receiver.poll_recv(cx)
    }
}
/// Starts decoding frames from `source` on Tokio's blocking pool and returns
/// a [`FrameStream`] that yields them as they are produced.
///
/// * `source` – media location handed to `MediaFile::open_source`.
/// * `range` – frames to decode.
/// * `config` – extraction options passed through to the decoder.
/// * `channel_capacity` – number of buffered items; `None` selects
///   `DEFAULT_CHANNEL_CAPACITY`, and `0` is raised to `1`.
///
/// If decoding fails, the error is sent as one last stream item (best effort).
/// Because this uses `tokio::task::spawn_blocking`, it needs to be called
/// from within a Tokio runtime.
pub(crate) fn create_frame_stream(
    source: String,
    range: FrameRange,
    config: ExtractOptions,
    channel_capacity: Option<usize>,
) -> FrameStream {
    let requested = match channel_capacity {
        Some(explicit) => explicit,
        None => DEFAULT_CHANNEL_CAPACITY,
    };
    // A bounded Tokio channel must have room for at least one item.
    let slots = if requested == 0 { 1 } else { requested };
    let (sender, receiver) = tokio::sync::mpsc::channel(slots);

    let handle = tokio::task::spawn_blocking(move || {
        if let Err(failure) = decode_frames_blocking(&source, range, &config, &sender) {
            // Ignore a failed send: it means the receiving side is already gone.
            let _ = sender.blocking_send(Err(failure));
        }
    });

    FrameStream { receiver, handle }
}
/// Opens `source` and pushes every decoded frame in `range` into `sender`.
///
/// Runs synchronously and uses `blocking_send`, so it is meant for a blocking
/// thread rather than an async task.
///
/// # Errors
///
/// Returns any error from opening the source or from the frame iteration.
/// A send that fails because the channel is closed is reported to the
/// iteration as [`UnbundleError::Cancelled`].
fn decode_frames_blocking(
    source: &str,
    range: FrameRange,
    config: &ExtractOptions,
    sender: &Sender<Result<(u64, DynamicImage), UnbundleError>>,
) -> Result<(), UnbundleError> {
    let mut media = MediaFile::open_source(source)?;

    // Per-frame callback: hand the frame to the consumer, or signal
    // cancellation once nobody is listening any more.
    let forward = |frame_number: u64, image: DynamicImage| -> Result<(), UnbundleError> {
        match sender.blocking_send(Ok((frame_number, image))) {
            Ok(()) => Ok(()),
            Err(_) => Err(UnbundleError::Cancelled),
        }
    };

    media
        .video()
        .for_each_frame_with_options(range, config, forward)
}
/// Future that resolves to the bytes produced by a background audio
/// extraction, or to the [`UnbundleError`] it failed with.
pub struct AudioFuture {
    /// Join handle of the blocking task performing the extraction.
    handle: JoinHandle<Result<Vec<u8>, UnbundleError>>,
}
impl Future for AudioFuture {
    type Output = Result<Vec<u8>, UnbundleError>;

    /// Polls the extraction task's join handle.
    ///
    /// The task's own result is passed through unchanged. A join error
    /// (the task did not finish normally) becomes [`UnbundleError::Cancelled`].
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        match Pin::new(&mut this.handle).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(outcome)) => Poll::Ready(outcome),
            Poll::Ready(Err(_)) => Poll::Ready(Err(UnbundleError::Cancelled)),
        }
    }
}
/// Starts an audio extraction on Tokio's blocking pool and returns an
/// [`AudioFuture`] for its result.
///
/// * `source` – media location handed to `MediaFile::open_source`.
/// * `format` – output audio format.
/// * `track_index` – specific audio track to use; `None` uses `audio()`.
/// * `range` – optional `(start, end)` pair; `None` extracts without a range.
/// * `config` – extraction options passed through to the extractor.
///
/// Errors from opening the source, selecting the track, or extracting are
/// delivered through the returned future.
pub(crate) fn create_audio_future(
    source: String,
    format: AudioFormat,
    track_index: Option<usize>,
    range: Option<(Duration, Duration)>,
    config: ExtractOptions,
) -> AudioFuture {
    let handle = tokio::task::spawn_blocking(move || {
        let mut media = MediaFile::open_source(&source)?;

        let mut extractor = match track_index {
            Some(index) => media.audio_track(index)?,
            None => media.audio(),
        };

        if let Some((start, end)) = range {
            extractor.extract_range_with_options(start, end, format, &config)
        } else {
            extractor.extract_with_options(format, &config)
        }
    });

    AudioFuture { handle }
}
/// Future that resolves to the [`MediaMetadata`] read by a background task,
/// or to the [`UnbundleError`] it failed with.
pub struct MetadataFuture {
    /// Join handle of the blocking task that opens the media source.
    pub(crate) handle: JoinHandle<Result<MediaMetadata, UnbundleError>>,
}
impl Future for MetadataFuture {
    type Output = Result<MediaMetadata, UnbundleError>;

    /// Polls the metadata task's join handle.
    ///
    /// The task's own result is passed through unchanged. A join error
    /// (the task did not finish normally) becomes [`UnbundleError::Cancelled`].
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let joined = Pin::new(&mut self.handle).poll(cx);
        joined.map(|outcome| match outcome {
            Ok(task_result) => task_result,
            Err(_join_error) => Err(UnbundleError::Cancelled),
        })
    }
}
pub(crate) fn create_metadata_future(source: String) -> MetadataFuture {
let handle = tokio::task::spawn_blocking(move || {
let unbundler = MediaFile::open_source(&source)?;
Ok(unbundler.metadata.clone())
});
MetadataFuture { handle }
}