mod ring_buffer;
use std::{
io::{self, Read, Write},
marker::PhantomData,
os::fd::AsFd,
};
use crate::exec::event::{EventHandle, EventRegistry, PollEvent, Process};
use self::ring_buffer::RingBuffer;
pub(super) struct Pipe<L, R> {
left: L,
right: R,
buffer_lr: Buffer<L, R>,
buffer_rl: Buffer<R, L>,
background: bool,
}
impl<L: Read + Write + AsFd, R: Read + Write + AsFd> Pipe<L, R> {
pub fn new<T: Process>(
left: L,
right: R,
registry: &mut EventRegistry<T>,
f_left: fn(PollEvent) -> T::Event,
f_right: fn(PollEvent) -> T::Event,
) -> Self {
Self {
buffer_lr: Buffer::new(
registry.register_event(&left, PollEvent::Readable, f_left),
registry.register_event(&right, PollEvent::Writable, f_right),
registry,
),
buffer_rl: Buffer::new(
registry.register_event(&right, PollEvent::Readable, f_right),
registry.register_event(&left, PollEvent::Writable, f_left),
registry,
),
left,
right,
background: false,
}
}
pub(super) fn left(&self) -> &L {
&self.left
}
pub(super) fn left_mut(&mut self) -> &mut L {
&mut self.left
}
pub(super) fn right(&self) -> &R {
&self.right
}
pub(super) fn ignore_events<T: Process>(&mut self, registry: &mut EventRegistry<T>) {
self.buffer_lr.read_handle.ignore(registry);
self.buffer_lr.write_handle.ignore(registry);
self.buffer_rl.read_handle.ignore(registry);
self.buffer_rl.write_handle.ignore(registry);
}
pub(super) fn disable_input<T: Process>(&mut self, registry: &mut EventRegistry<T>) {
self.buffer_lr.read_handle.ignore(registry);
self.background = true;
}
pub(super) fn enable_input<T: Process>(&mut self, registry: &mut EventRegistry<T>) {
self.buffer_lr.read_handle.resume(registry);
self.background = false;
}
pub(super) fn resume_events<T: Process>(&mut self, registry: &mut EventRegistry<T>) {
if !self.background {
self.buffer_lr.read_handle.resume(registry);
}
self.buffer_lr.write_handle.resume(registry);
self.buffer_rl.read_handle.resume(registry);
self.buffer_rl.write_handle.resume(registry);
}
pub(super) fn on_left_event<T: Process>(
&mut self,
poll_event: PollEvent,
registry: &mut EventRegistry<T>,
) -> io::Result<()> {
match poll_event {
PollEvent::Readable => self.buffer_lr.read(&mut self.left, registry),
PollEvent::Writable => {
if self.buffer_rl.write(&mut self.left, registry)? {
self.buffer_rl.read_handle.resume(registry);
}
Ok(())
}
}
}
pub(super) fn on_right_event<T: Process>(
&mut self,
poll_event: PollEvent,
registry: &mut EventRegistry<T>,
) -> io::Result<()> {
match poll_event {
PollEvent::Readable => self.buffer_rl.read(&mut self.right, registry),
PollEvent::Writable => {
if self.buffer_lr.write(&mut self.right, registry)? && !self.background {
self.buffer_lr.read_handle.resume(registry);
}
Ok(())
}
}
}
pub(super) fn flush_left(&mut self) -> io::Result<()> {
let buffer = &mut self.buffer_rl;
let source = &mut self.right;
let sink = &mut self.left;
buffer.internal.remove(sink)?;
if buffer.write_handle.is_active() {
let mut buf = [0u8; RingBuffer::LEN];
loop {
match source.read(&mut buf) {
Ok(read_bytes) => sink.write_all(&buf[..read_bytes])?,
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(e) => return Err(e),
}
}
}
sink.flush()
}
}
struct Buffer<R, W> {
internal: RingBuffer,
read_handle: EventHandle,
write_handle: EventHandle,
marker: PhantomData<(R, W)>,
}
impl<R: Read, W: Write> Buffer<R, W> {
fn new<T: Process>(
read_handle: EventHandle,
mut write_handle: EventHandle,
registry: &mut EventRegistry<T>,
) -> Self {
write_handle.ignore(registry);
Self {
internal: RingBuffer::new(),
read_handle,
write_handle,
marker: PhantomData,
}
}
fn read<T: Process>(
&mut self,
read: &mut R,
registry: &mut EventRegistry<T>,
) -> io::Result<()> {
if self.internal.is_full() {
self.read_handle.ignore(registry);
return Ok(());
}
let inserted_len = self.internal.insert(read)?;
if inserted_len > 0 {
self.write_handle.resume(registry);
}
Ok(())
}
fn write<T: Process>(
&mut self,
write: &mut W,
registry: &mut EventRegistry<T>,
) -> io::Result<bool> {
if self.internal.is_empty() {
self.write_handle.ignore(registry);
return Ok(false);
}
let removed_len = self.internal.remove(write)?;
Ok(removed_len > 0)
}
}