use gst::prelude::*;
use gstreamer as gst;
use crate::video_sink::memory_video_sink;
use crate::{PlayingState, VideoInfo};
use gstreamer::Bus;
use std::sync::mpsc::{self, Receiver, Sender};
use crate::frame_handler::FrameHandler;
use crate::playbin_query::{audio_tracks, subtitle_tracks, video_duration};
use std::thread;
pub(crate) enum InternalMessage {
VideoStreamAction(VideoStreamAction),
RequestPositionUpdate,
}
#[derive(Debug, Clone, Copy)]
pub enum VideoStreamAction {
SetCurrentSubtitleTrack(Option<usize>),
SetCurrentAudioTrack(Option<usize>),
SetVolume(f32),
SetPlay,
SetPause,
SeekToSeconds(f64),
Close,
}
impl VideoStreamAction {
}
#[derive(Debug, Clone)]
pub enum VideoStreamEvent {
VideoLoaded(VideoInfo),
NewFrame,
Error(String),
CurrentAudioTrackChanged(usize),
CurrentSubtitleTrackChanged(usize),
VolumeChanged(f32),
PlayingStateChanged(PlayingState),
PositionChanged(f64),
Closed,
}
#[derive(Default, Debug, Clone)]
pub struct FrameData {
pub data: Vec<u8>,
pub size: [usize; 2],
}
fn handle_action(
video_action: VideoStreamAction,
playbin_pipeline: &gst::Element,
sender: &Sender<VideoStreamEvent>,
) -> bool {
match video_action {
VideoStreamAction::SetCurrentSubtitleTrack(track_id) => {
if let Some(subtitle_track_id) = track_id {
playbin_pipeline.set_property("current-text", subtitle_track_id as i32);
sender
.send(VideoStreamEvent::CurrentSubtitleTrackChanged(
subtitle_track_id,
))
.unwrap();
} else {
todo!("disable subtitles")
}
}
VideoStreamAction::SetCurrentAudioTrack(audio_track_id) => {
if let Some(audio_track_id) = audio_track_id {
playbin_pipeline.set_property("current-audio", audio_track_id as i32);
sender
.send(VideoStreamEvent::CurrentAudioTrackChanged(audio_track_id))
.unwrap();
} else {
todo!("disable audio")
}
}
VideoStreamAction::SetVolume(volume) => {
sender
.send(VideoStreamEvent::VolumeChanged(volume))
.unwrap();
playbin_pipeline.set_property("volume", volume as f64);
}
VideoStreamAction::SetPlay => {
playbin_pipeline.set_state(gst::State::Playing).unwrap();
sender
.send(VideoStreamEvent::PlayingStateChanged(PlayingState::Playing))
.unwrap();
}
VideoStreamAction::SetPause => {
playbin_pipeline.set_state(gst::State::Paused).unwrap();
sender
.send(VideoStreamEvent::PlayingStateChanged(PlayingState::Paused))
.unwrap();
}
VideoStreamAction::SeekToSeconds(seconds) => {
let position_ns = (seconds * 1_000_000_000.0) as u64;
sender
.send(VideoStreamEvent::PositionChanged(seconds))
.unwrap();
playbin_pipeline
.seek(
1.0,
gst::SeekFlags::FLUSH | gst::SeekFlags::ACCURATE,
gst::SeekType::Set,
gst::ClockTime::from_nseconds(position_ns),
gst::SeekType::None,
gst::ClockTime::NONE,
)
.unwrap_or_else(|_| println!("Seek failed"));
}
VideoStreamAction::Close => {
playbin_pipeline.set_state(gst::State::Null).unwrap();
sender.send(VideoStreamEvent::Closed).unwrap();
return true;
}
}
false
}
fn handle_message(
message: InternalMessage,
playbin_pipeline: &gst::Element,
sender: &Sender<VideoStreamEvent>,
) -> bool {
match message {
InternalMessage::VideoStreamAction(video_action) => {
handle_action(video_action, playbin_pipeline, sender)
}
InternalMessage::RequestPositionUpdate => {
let mut position_query = gst::query::Position::new(gst::Format::Time);
if playbin_pipeline.query(&mut position_query) {
let position_nanoseconds = match position_query.result() {
gstreamer::GenericFormattedValue::Time(Some(position)) => position.nseconds(),
_ => 0,
};
sender
.send(VideoStreamEvent::PositionChanged(
position_nanoseconds as f64 / 1_000_000_000.0,
))
.unwrap();
false
} else {
println!("Position query failed");
false
}
}
}
}
fn wait_for_video_to_load(playbin_message_bus: &Bus) {
while let Some(msg) = playbin_message_bus.timed_pop(gst::ClockTime::NONE) {
if let gst::MessageView::AsyncDone(_) = msg.view() {
break;
}
}
}
pub fn open_video(
uri: impl Into<String>,
frame_data_handler: impl FrameHandler + 'static,
) -> (Sender<VideoStreamAction>, Receiver<VideoStreamEvent>) {
let (actions_sender, actions_receiver) = mpsc::channel();
let (event_sender, event_receiver) = mpsc::channel();
let uri = uri.into();
thread::spawn(move || {
open_video_internal(&uri, actions_receiver, event_sender, frame_data_handler);
});
(actions_sender, event_receiver)
}
fn open_video_internal(
uri: &str,
receiver: Receiver<VideoStreamAction>,
sender: Sender<VideoStreamEvent>,
frame_data_handler: impl FrameHandler + 'static,
) {
let (internal_sender, internal_receiver) = mpsc::channel::<InternalMessage>();
gst::init().expect("to initialize gstreamer without errors");
let memory_video_sink =
memory_video_sink(internal_sender.clone(), sender.clone(), frame_data_handler);
let playbin_pipeline = gst::ElementFactory::make("playbin")
.property("uri", uri)
.build()
.unwrap();
playbin_pipeline.set_property("video-sink", memory_video_sink);
let playbin_message_bus = playbin_pipeline.bus().unwrap();
playbin_pipeline
.set_state(gst::State::Playing)
.expect("Unable to set the pipeline to the `Playing` state");
wait_for_video_to_load(&playbin_message_bus);
let bus_thread_handle = thread::spawn(move || {
for msg in playbin_message_bus.iter_timed(gst::ClockTime::NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => break,
MessageView::Error(err) => {
println!(
"Error from {:?}: {} ({:?})",
err.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
break;
}
MessageView::StateChanged(_state_changed) =>
{
}
_ => (),
}
}
});
let action_receiver_thread_handle = thread::spawn(move || {
while let Some(message) = receiver.iter().next() {
let needs_to_be_closed = matches!(&message, VideoStreamAction::Close);
internal_sender
.send(InternalMessage::VideoStreamAction(message))
.unwrap();
if needs_to_be_closed {
break;
}
}
});
let video_state = VideoInfo {
title: "Test title.mkv".to_string(),
current_subtitle_track: Some(0),
current_audio_track: Some(0),
volume: 1.0,
subtitle_tracks: subtitle_tracks(&playbin_pipeline),
audio_tracks: audio_tracks(&playbin_pipeline),
playing_state: PlayingState::Playing,
duration: video_duration(&playbin_pipeline),
current_position: 0.0,
};
sender
.send(VideoStreamEvent::VideoLoaded(video_state))
.expect("to notify the video has loaded");
while let Some(message) = internal_receiver.iter().next() {
let needs_to_close_stream = handle_message(message, &playbin_pipeline, &sender);
if needs_to_close_stream {
break;
}
}
playbin_pipeline
.set_state(gst::State::Null)
.expect("Unable to set the pipeline to the `Null` state");
action_receiver_thread_handle.join().unwrap();
bus_thread_handle.join().unwrap();
println!("All video rendering threads closed");
}