use crate::action::{Action, Props, StatefulAction, DEFAULT, INFINITE, VISUAL};
use crate::comm::{QWriter, Signal};
use crate::resource::{
Color, IoManager, ResourceAddr, ResourceManager, ResourceValue, StreamMode, Trigger, Volume,
};
use crate::server::{AsyncSignal, Config, State, SyncSignal};
use crate::util::spin_sleeper;
use eframe::egui;
use eframe::egui::{CentralPanel, Color32, CursorIcon, Frame, TextureId, Vec2};
use eyre::{eyre, Context, Result};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;
#[derive(Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct Stream {
src: PathBuf,
#[serde(default)]
width: Option<u16>,
#[serde(default)]
volume: Volume,
#[serde(default)]
looping: bool,
#[serde(default)]
trigger: Trigger,
#[serde(default)]
background: Color,
}
stateful_arc!(Stream {
frame: Arc<Mutex<Option<(TextureId, Vec2)>>>,
framerate: f64,
width: Option<u16>,
looping: bool,
link_start: Sender<()>,
link_stop: Option<Receiver<()>>,
join_handle: Option<JoinHandle<Result<()>>>,
background: Color32,
});
impl Action for Stream {
#[inline(always)]
fn resources(&self, _config: &Config) -> Vec<ResourceAddr> {
if let Trigger::Ext(trig) = &self.trigger {
vec![
ResourceAddr::Stream(self.src.clone()),
ResourceAddr::Ref(trig.clone()),
]
} else {
vec![ResourceAddr::Stream(self.src.clone())]
}
}
#[inline]
fn init(self) -> Result<Box<dyn Action>> {
if let Volume::Value(vol) = self.volume {
if !(0.0..=1.0).contains(&vol) {
return Err(eyre!(
"Stream volume should be a float number between 0.0 and 1.0"
));
}
}
Ok(Box::new(self))
}
fn stateful(
&self,
_io: &IoManager,
res: &ResourceManager,
config: &Config,
_sync_writer: &QWriter<SyncSignal>,
_async_writer: &QWriter<AsyncSignal>,
) -> Result<Box<dyn StatefulAction>> {
let src = ResourceAddr::Stream(self.src.clone());
let stream = if let ResourceValue::Stream(stream) = res.fetch(&src)? {
stream
} else {
return Err(eyre!("Resource value and address types don't match."));
};
let use_trigger = config.use_trigger().value();
let media_mode = match (&self.trigger, use_trigger) {
(Trigger::Ext(trig), true) => {
let trig = ResourceAddr::Ref(trig.clone());
let trig = if let ResourceValue::Ref(trig) = res.fetch(&trig)? {
trig
} else {
return Err(eyre!("Resource value and address types don't match."));
};
StreamMode::WithExtTrigger(trig)
}
(Trigger::Int, false) => StreamMode::SansIntTrigger,
_ => StreamMode::Normal,
};
let frame = Arc::new(Mutex::new(None));
let volume = self.volume.or(&config.volume()).value();
let mut stream = stream.cloned(frame.clone(), media_mode, volume)?;
if !stream.has_video() && self.width.is_some() {
return Err(eyre!(
"Video-less stream `?` should not be supplied a width"
));
}
let framerate = stream.framerate();
let sleeper = spin_sleeper();
let period = if stream.has_video() {
Duration::from_secs_f64(0.5 / framerate)
} else {
Duration::from_millis(5)
};
let done = Arc::new(Mutex::new(Ok(stream.eos())));
let (tx_start, rx_start) = mpsc::channel();
let (tx_stop, rx_stop) = mpsc::channel();
let looping = self.looping;
let done_clone = done.clone();
let join_handle = thread::spawn(move || -> Result<()> {
if rx_start.recv().is_err() {
return Ok(());
}
stream.start()?;
loop {
if let Err(TryRecvError::Disconnected) = rx_start.try_recv() {
stream.pause()?;
break;
}
sleeper.sleep(period);
let mut done = done_clone.lock().unwrap();
match (stream.eos(), stream.process_bus(looping)) {
(true, _) => *done = Ok(true),
(false, Ok(true)) => *done = Ok(true),
(false, Err(e)) => *done = Err(e),
_ => {}
}
if let Ok(true) = *done {
break;
}
}
let _ = tx_stop.send(());
Ok(())
});
Ok(Box::new(StatefulStream {
done,
frame,
framerate,
width: self.width,
looping,
link_start: tx_start,
link_stop: Some(rx_stop),
join_handle: Some(join_handle),
background: self.background.into(),
}))
}
}
impl StatefulAction for StatefulStream {
impl_stateful!();
#[inline(always)]
fn props(&self) -> Props {
match (self.framerate, self.looping) {
(f, false) if f > 0.0 => VISUAL,
(f, true) if f > 0.0 => INFINITE | VISUAL,
(_, false) => DEFAULT,
(_, true) => INFINITE,
}
.into()
}
fn start(
&mut self,
sync_writer: &mut QWriter<SyncSignal>,
_async_writer: &mut QWriter<AsyncSignal>,
_state: &State,
) -> Result<Signal> {
self.link_start
.send(())
.wrap_err("Failed to send start signal to concurrent stream thread.")?;
let rx_stop = self
.link_stop
.take()
.ok_or_else(|| eyre!("Link to streaming thread could not be acquired for action"))?;
let done = self.done.clone();
let join_handle = self
.join_handle
.take()
.ok_or_else(|| eyre!("JoinHandle for action has died prematurely"))?;
if let Ok(true) = *self.done.lock().unwrap() {
sync_writer.push(SyncSignal::UpdateGraph);
return Ok(Signal::none());
}
{
let mut sync_writer = sync_writer.clone();
thread::spawn(move || {
let _ = rx_stop.recv();
*done.lock().unwrap() = match join_handle.join() {
Ok(Ok(_)) => Ok(true),
Ok(Err(e)) => Err(e).wrap_err("Stream decoder thread failed with error."),
Err(_) => Err(eyre!("Failed to graciously close stream decoder thread.")),
};
sync_writer.push(SyncSignal::UpdateGraph);
});
}
sync_writer.push(SyncSignal::Repaint);
Ok(Signal::none())
}
fn show(
&mut self,
ui: &mut egui::Ui,
_sync_writer: &mut QWriter<SyncSignal>,
_async_writer: &mut QWriter<AsyncSignal>,
_state: &State,
) -> Result<()> {
let (texture, size) = self
.frame
.lock()
.unwrap()
.unwrap_or_else(|| (TextureId::default(), Vec2::splat(1.0)));
ui.output().cursor_icon = CursorIcon::None;
CentralPanel::default()
.frame(Frame::default().fill(self.background))
.show_inside(ui, |ui| {
ui.centered_and_justified(|ui| {
if let Some(width) = self.width {
let scale = width as f32 / size.x;
ui.image(texture, size * scale);
} else {
ui.image(texture, size);
}
});
});
if self.framerate > 0.0 {
ui.ctx().request_repaint();
}
Ok(())
}
fn debug(&self) -> Vec<(&str, String)> {
<dyn StatefulAction>::debug(self)
.into_iter()
.chain([
("framerate", format!("{:?}", self.framerate)),
("looping", format!("{:?}", self.looping)),
])
.collect()
}
}