use rtrb::{Consumer, Producer, RingBuffer};
use crate::{
Client, Control, Frames, ProcessHandler, ProcessScope, TransportPosition, TransportState,
};
/// The processor-side ends of the two ring-buffer queues that connect a
/// processor to its [`Controller`].
///
/// A value of this type is passed by mutable reference to every callback of
/// [`ControlledProcessorTrait`], which lets the processor read commands and
/// emit notifications.
pub struct ProcessorChannels<Command, Notification> {
/// Producer half of the notification queue (processor -> controller).
notifications: Producer<Notification>,
/// Consumer half of the command queue (controller -> processor).
commands: Consumer<Command>,
}
impl<Command, Notification> ProcessorChannels<Command, Notification> {
    /// Takes the next pending command off the command queue.
    ///
    /// Returns `None` when popping fails, i.e. when no command is currently
    /// queued.
    pub fn recv_command(&mut self) -> Option<Command> {
        match self.commands.pop() {
            Ok(command) => Some(command),
            Err(_) => None,
        }
    }

    /// Returns an iterator that pops commands one at a time until the command
    /// queue reports that nothing is left.
    ///
    /// The iterator is lazy: a command is only removed from the queue when
    /// the iterator is advanced.
    pub fn drain_commands(&mut self) -> impl Iterator<Item = Command> + '_ {
        std::iter::from_fn(move || self.recv_command())
    }

    /// Attempts to enqueue `notification` for the controller.
    ///
    /// # Errors
    ///
    /// If the notification queue is full, the notification is handed back
    /// unchanged inside `Err` so the caller can decide what to do with it.
    pub fn try_notify(&mut self, notification: Notification) -> Result<(), Notification> {
        match self.notifications.push(notification) {
            Ok(()) => Ok(()),
            Err(rtrb::PushError::Full(rejected)) => Err(rejected),
        }
    }
}
/// The controller-side ends of the two ring-buffer queues that connect a
/// controller to its processor.
///
/// It mirrors [`ProcessorChannels`]: commands are pushed here and popped by
/// the processor, while notifications are pushed by the processor and popped
/// here.
pub struct Controller<Command, Notification> {
/// Consumer half of the notification queue (processor -> controller).
notifications: Consumer<Notification>,
/// Producer half of the command queue (controller -> processor).
commands: Producer<Command>,
}
impl<Command, Notification> Controller<Command, Notification> {
    /// Attempts to enqueue `command` for the processor.
    ///
    /// # Errors
    ///
    /// If the command queue is full, the command is handed back unchanged
    /// inside `Err` so the caller can retry or drop it.
    pub fn send_command(&mut self, command: Command) -> Result<(), Command> {
        match self.commands.push(command) {
            Ok(()) => Ok(()),
            Err(rtrb::PushError::Full(rejected)) => Err(rejected),
        }
    }

    /// Takes the next pending notification off the notification queue.
    ///
    /// Returns `None` when popping fails, i.e. when no notification is
    /// currently queued.
    pub fn recv_notification(&mut self) -> Option<Notification> {
        match self.notifications.pop() {
            Ok(notification) => Some(notification),
            Err(_) => None,
        }
    }

    /// Returns an iterator that pops notifications one at a time until the
    /// notification queue reports that nothing is left.
    ///
    /// The iterator is lazy: a notification is only removed from the queue
    /// when the iterator is advanced.
    pub fn drain_notifications(&mut self) -> impl Iterator<Item = Notification> + '_ {
        std::iter::from_fn(move || self.recv_notification())
    }
}
/// A processor that exchanges messages with a [`Controller`] through a pair
/// of ring-buffer queues.
///
/// Implementors supply the callbacks below; [`ControlledProcessorTrait::instance`]
/// then wraps the implementor in a [`ControlledProcessorInstance`], whose
/// [`ProcessHandler`] implementation forwards each callback to this trait and
/// passes along the processor's [`ProcessorChannels`].
pub trait ControlledProcessorTrait: Send + Sized {
    /// Message type sent from the controller to the processor.
    type Command: Send;

    /// Message type sent from the processor to the controller.
    type Notification: Send;

    /// Value re-exported as `ProcessHandler::SLOW_SYNC` by
    /// [`ControlledProcessorInstance`]. Defaults to `false`.
    // NOTE(review): the exact meaning is defined by `ProcessHandler`; see its docs.
    const SLOW_SYNC: bool = false;

    /// Sync callback, invoked via `ProcessHandler::sync` with the current
    /// transport state and position plus the processor's channels.
    ///
    /// The default implementation ignores every argument and returns `true`.
    fn sync(
        &mut self,
        _client: &Client,
        _state: TransportState,
        _pos: &TransportPosition,
        _channels: &mut ProcessorChannels<Self::Command, Self::Notification>,
    ) -> bool {
        true
    }

    /// Buffer-size callback, invoked via `ProcessHandler::buffer_size` with
    /// the reported `size` plus the processor's channels.
    fn buffer_size(
        &mut self,
        client: &Client,
        size: Frames,
        channels: &mut ProcessorChannels<Self::Command, Self::Notification>,
    ) -> Control;

    /// Process callback, invoked via `ProcessHandler::process` with the
    /// current `scope` plus the processor's channels.
    fn process(
        &mut self,
        client: &Client,
        scope: &ProcessScope,
        channels: &mut ProcessorChannels<Self::Command, Self::Notification>,
    ) -> Control;

    /// Consumes the processor and wires it up to a fresh [`Controller`].
    ///
    /// Two ring buffers are created: one for notifications, sized by
    /// `notification_channel_size`, and one for commands, sized by
    /// `command_channel_size`. The returned instance owns the processor
    /// together with the notification producer and the command consumer; the
    /// returned controller owns the opposite ends.
    #[must_use = "the processor instance must be used with Client::activate"]
    fn instance(
        self,
        notification_channel_size: usize,
        command_channel_size: usize,
    ) -> (
        ControlledProcessorInstance<Self>,
        Controller<Self::Command, Self::Notification>,
    ) {
        // `RingBuffer::new` yields the (producer, consumer) pair of one queue.
        let (notification_tx, notification_rx) =
            RingBuffer::<Self::Notification>::new(notification_channel_size);
        let (command_tx, command_rx) = RingBuffer::<Self::Command>::new(command_channel_size);

        (
            ControlledProcessorInstance {
                inner: self,
                channels: ProcessorChannels {
                    notifications: notification_tx,
                    commands: command_rx,
                },
            },
            Controller {
                notifications: notification_rx,
                commands: command_tx,
            },
        )
    }
}
/// A [`ControlledProcessorTrait`] implementor bundled with the processor-side
/// channel ends, as produced by [`ControlledProcessorTrait::instance`].
///
/// Its [`ProcessHandler`] implementation forwards every callback to `inner`
/// and passes `channels` along.
pub struct ControlledProcessorInstance<T: ControlledProcessorTrait> {
/// The wrapped processor that receives the forwarded callbacks.
pub inner: T,
/// Processor-side queue ends handed to `inner` on every callback.
channels: ProcessorChannels<T::Command, T::Notification>,
}
/// Adapts a [`ControlledProcessorTrait`] implementor to [`ProcessHandler`] by
/// delegating each callback to the wrapped processor and supplying the
/// instance's channels as the extra argument.
impl<T: ControlledProcessorTrait> ProcessHandler for ControlledProcessorInstance<T> {
    /// Mirrors the wrapped processor's `SLOW_SYNC` setting.
    const SLOW_SYNC: bool = T::SLOW_SYNC;

    /// Delegates to the wrapped processor's `process`.
    fn process(&mut self, client: &Client, scope: &ProcessScope) -> Control {
        let Self { inner, channels } = self;
        inner.process(client, scope, channels)
    }

    /// Delegates to the wrapped processor's `buffer_size`.
    fn buffer_size(&mut self, client: &Client, size: Frames) -> Control {
        let Self { inner, channels } = self;
        inner.buffer_size(client, size, channels)
    }

    /// Delegates to the wrapped processor's `sync`.
    fn sync(&mut self, client: &Client, state: TransportState, pos: &TransportPosition) -> bool {
        let Self { inner, channels } = self;
        inner.sync(client, state, pos, channels)
    }
}