use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use crate::backend::BackendKind;
use crate::device::DacInfo;
use crate::error::{Error, Result};
use crate::reconnect::{reconnect_backend_with_retry, ReconnectPolicy};
use crate::stream::{ControlMsg, RunExit, StreamControl};
use super::content_source::{ContentSourceKind, FifoContentSource, FrameContentSource};
use super::output_model::{
self, process_control_messages, LoopCtx, OutputModelAdapter, StepOutcome,
};
use super::session::FrameSessionMetrics;
use super::OutputResetReason;
pub(crate) enum SourceOwned {
Fifo(Box<dyn FifoContentSource>),
Frame(Box<dyn FrameContentSource>),
}
impl SourceOwned {
fn is_frame(&self) -> bool {
matches!(self, SourceOwned::Frame(_))
}
fn as_kind(&mut self) -> ContentSourceKind<'_> {
match self {
SourceOwned::Fifo(s) => ContentSourceKind::Fifo(s.as_mut()),
SourceOwned::Frame(s) => ContentSourceKind::Frame(s.as_mut()),
}
}
fn on_reconnect(&mut self, info: &DacInfo) {
match self {
SourceOwned::Fifo(s) => s.on_reconnect(info),
SourceOwned::Frame(s) => s.on_reconnect(info),
}
}
fn set_frame_capacity_if_supported(&mut self, cap: Option<usize>) {
if let SourceOwned::Frame(s) = self {
s.set_frame_capacity(cap);
}
}
fn is_ended(&self) -> bool {
match self {
SourceOwned::Fifo(s) => s.is_ended(),
SourceOwned::Frame(_) => false,
}
}
fn submit_frame(&mut self, frame: super::Frame) {
match self {
SourceOwned::Fifo(s) => s.submit_frame(frame),
SourceOwned::Frame(s) => s.submit_frame(frame),
}
}
fn arm_startup_blank(&mut self, pps: u32) {
match self {
SourceOwned::Fifo(s) => s.arm_startup_blank(pps),
SourceOwned::Frame(s) => s.arm_startup_blank(pps),
}
}
fn on_disarm(&mut self) {
match self {
SourceOwned::Fifo(s) => s.on_disarm(),
SourceOwned::Frame(s) => s.on_disarm(),
}
}
fn reset_output_filter(&mut self, reason: OutputResetReason) {
match self {
SourceOwned::Fifo(s) => s.reset_output_filter(reason),
SourceOwned::Frame(s) => s.reset_output_filter(reason),
}
}
fn resize_color_delay_micros(&mut self, micros: u64, pps: u32) {
match self {
SourceOwned::Fifo(s) => s.resize_color_delay_micros(micros, pps),
SourceOwned::Frame(s) => s.resize_color_delay_micros(micros, pps),
}
}
fn take_stop_error(&mut self) -> Option<Error> {
match self {
SourceOwned::Fifo(s) => s.take_stop_error(),
SourceOwned::Frame(_) => None,
}
}
}
pub(crate) type PendingFrame = Arc<Mutex<Option<super::Frame>>>;
pub(crate) type ReconnectValidator =
Box<dyn Fn(&DacInfo, &BackendKind, u32) -> std::result::Result<(), RunExit> + Send>;
pub(crate) type ErrorSink = Box<dyn FnMut(Error) + Send>;
pub(crate) struct DriverInputs {
pub backend: BackendKind,
pub source: SourceOwned,
pub control: StreamControl,
pub control_rx: Receiver<ControlMsg>,
pub metrics: FrameSessionMetrics,
pub reconnect_policy: Option<ReconnectPolicy>,
pub validator: ReconnectValidator,
pub error_sink: ErrorSink,
pub target_buffer: Duration,
pub drain_timeout: Duration,
pub pending_frame: Option<PendingFrame>,
}
pub(crate) fn run(mut inputs: DriverInputs) -> Result<RunExit> {
let expected_frame_swap = inputs.source.is_frame();
let mut adapter = output_model::for_backend(&inputs.backend);
let mut shutter_open = false;
let mut last_armed = false;
let mut error_sink = inputs.error_sink;
loop {
inputs.metrics.mark_loop_activity();
if inputs.control.is_stop_requested() {
return Ok(RunExit::Stopped);
}
let pps = inputs.control.pps();
let color_delay_micros = inputs.control.color_delay().as_micros() as u64;
inputs
.source
.resize_color_delay_micros(color_delay_micros, pps);
if let Some(slot) = inputs.pending_frame.as_ref() {
if let Some(frame) = slot.lock().unwrap().take() {
inputs.source.submit_frame(frame);
}
}
if !inputs.backend.is_connected() {
match reconnect(
&mut inputs.backend,
inputs.reconnect_policy.as_ref(),
&inputs.validator,
&mut inputs.source,
expected_frame_swap,
&inputs.control,
&mut shutter_open,
&mut last_armed,
&inputs.metrics,
&mut *adapter,
) {
Ok(()) => continue,
Err(exit) => return Ok(exit),
}
}
if process_control_messages(&inputs.control_rx, &mut shutter_open, &mut inputs.backend) {
return Ok(RunExit::Stopped);
}
let is_armed = inputs.control.is_armed();
handle_shutter_transition(
is_armed,
&mut last_armed,
&mut shutter_open,
&mut inputs.backend,
&mut inputs.source,
pps,
);
let outcome = {
let source = inputs.source.as_kind();
let mut ctx = LoopCtx {
backend: &mut inputs.backend,
source,
control: &inputs.control,
control_rx: &inputs.control_rx,
metrics: &inputs.metrics,
shutter_open: &mut shutter_open,
error_sink: &mut *error_sink,
target_buffer: inputs.target_buffer,
pps,
is_armed,
};
adapter.step(&mut ctx)
};
match outcome {
StepOutcome::Continue => {}
StepOutcome::Stopped => return Ok(RunExit::Stopped),
StepOutcome::Disconnected => {
match reconnect(
&mut inputs.backend,
inputs.reconnect_policy.as_ref(),
&inputs.validator,
&mut inputs.source,
expected_frame_swap,
&inputs.control,
&mut shutter_open,
&mut last_armed,
&inputs.metrics,
&mut *adapter,
) {
Ok(()) => continue,
Err(exit) => return Ok(exit),
}
}
}
if inputs.source.is_ended() {
if let Some(err) = inputs.source.take_stop_error() {
return Err(err);
}
let mut ctx = LoopCtx {
backend: &mut inputs.backend,
source: inputs.source.as_kind(),
control: &inputs.control,
control_rx: &inputs.control_rx,
metrics: &inputs.metrics,
shutter_open: &mut shutter_open,
error_sink: &mut *error_sink,
target_buffer: inputs.target_buffer,
pps,
is_armed,
};
adapter.drain_and_blank(&mut ctx, inputs.drain_timeout);
return Ok(RunExit::ProducerEnded);
}
}
}
#[allow(clippy::too_many_arguments)]
fn reconnect(
backend: &mut BackendKind,
policy: Option<&ReconnectPolicy>,
validator: &ReconnectValidator,
source: &mut SourceOwned,
expected_frame_swap: bool,
control: &StreamControl,
shutter_open: &mut bool,
last_armed: &mut bool,
metrics: &FrameSessionMetrics,
adapter: &mut dyn OutputModelAdapter,
) -> std::result::Result<(), RunExit> {
let Some(policy) = policy else {
return Err(RunExit::Disconnected);
};
metrics.set_connected(false);
let (info, new_backend) = reconnect_backend_with_retry(
policy,
|| control.is_stop_requested(),
|info, new_backend| {
if new_backend.is_frame_swap() != expected_frame_swap {
log::error!(
"'{}' reconnected device has incompatible backend type",
policy.target.device_id
);
return Err(RunExit::Disconnected);
}
validator(info, new_backend, control.pps())
},
|| metrics.mark_loop_activity(),
)?;
*backend = new_backend;
*shutter_open = false;
*last_armed = false;
metrics.set_connected(true);
source.on_reconnect(&info);
if expected_frame_swap {
source.set_frame_capacity_if_supported(backend.frame_capacity());
}
adapter.on_reconnect(&info, backend);
if let Some(cb) = policy.on_reconnect.lock().unwrap().as_mut() {
cb(&info);
}
Ok(())
}
fn handle_shutter_transition(
is_armed: bool,
last_armed: &mut bool,
shutter_open: &mut bool,
backend: &mut BackendKind,
source: &mut SourceOwned,
pps: u32,
) {
if !*last_armed && is_armed {
source.arm_startup_blank(pps);
source.reset_output_filter(OutputResetReason::Arm);
if !*shutter_open {
let _ = backend.set_shutter(true);
*shutter_open = true;
}
} else if *last_armed && !is_armed {
source.on_disarm();
source.reset_output_filter(OutputResetReason::Disarm);
if *shutter_open {
let _ = backend.set_shutter(false);
*shutter_open = false;
}
}
*last_armed = is_armed;
}