use crate::tasks::{IOAction, IOCallback, IOEvent, IOEventMerged};
use crossfire::BlockingTxTrait;
use embed_collections::SegList;
use io_buffer::Buffer;
use nix::errno::Errno;
use std::io;
use std::os::fd::RawFd;
/// Bookkeeping for the batch of contiguous events currently being accumulated.
struct MergedInfo<C: IOCallback> {
/// The first event pushed into the batch; its `offset` is the batch start offset and it
/// is the event handed back when the batch is taken.
first_event: Box<IOEvent<C>>,
/// Offset just past the batch: first event's offset plus the sizes of all pushed events.
/// A new event is only considered contiguous when its `offset` equals this value.
tail_offset: i64,
/// Sum of the sizes of all events pushed into the batch so far.
total_size: usize,
}
/// Accumulates contiguous I/O events so they can be flushed as a single merged event.
pub struct MergeBuffer<C: IOCallback> {
/// Size threshold for a batch: reaching it makes `push_event` report the batch as full.
pub merge_size_limit: usize,
/// State of the pending batch; `None` while the buffer is empty.
merged_info: Option<MergedInfo<C>>,
/// Per-event sub-task records. Stays empty while only one event is pending and is
/// populated (starting with the first event's record) once a second event is pushed.
merged_events: SegList<IOEventMerged<C>>,
}
impl<C: IOCallback> MergeBuffer<C> {
    /// Creates an empty buffer whose batches are capped at `merge_size_limit` bytes.
    #[inline(always)]
    pub fn new(merge_size_limit: usize) -> Self {
        Self { merge_size_limit, merged_info: None, merged_events: SegList::new() }
    }

    /// Reports whether `event` can be appended to the pending batch.
    ///
    /// An empty buffer accepts any event. Otherwise the event is accepted only when
    /// both conditions hold:
    /// * the batch size after adding the event stays within `merge_size_limit`, and
    /// * the event starts exactly at the batch's tail offset (it is contiguous).
    ///
    /// These are the same invariants `push_event` asserts, so a caller that flushes
    /// whenever this returns `false` never violates them.
    #[inline(always)]
    pub fn may_add_event(&mut self, event: &IOEvent<C>) -> bool {
        match self.merged_info {
            None => true,
            Some(ref info) => {
                let size = event.get_size() as usize;
                // The limit applies to the accumulated batch, not to the incoming event
                // alone; `checked_add` treats an arithmetic overflow as "does not fit".
                let fits = info
                    .total_size
                    .checked_add(size)
                    .map_or(false, |total| total <= self.merge_size_limit);
                fits && info.tail_offset == event.offset
            }
        }
    }

    /// Appends `event` to the pending batch, starting a new batch if the buffer is empty.
    ///
    /// The caller is expected to have checked `may_add_event` first; contiguity and the
    /// size limit are only verified with debug assertions here.
    ///
    /// Returns `true` when the batch has reached `merge_size_limit` and should be flushed.
    #[inline(always)]
    pub fn push_event(&mut self, event: IOEvent<C>) -> bool {
        if let Some(ref mut info) = self.merged_info {
            let size = event.get_size();
            debug_assert_eq!(info.tail_offset, event.offset, "push_event: event not contiguous");
            debug_assert!(
                info.total_size + size as usize <= self.merge_size_limit,
                "push_event: exceeds merge_size_limit"
            );
            // The sub-task list is built lazily: the first event's record is only
            // extracted once a second event actually joins the batch.
            if self.merged_events.is_empty() {
                let first_merged = info.first_event.extract_merged();
                self.merged_events.push(first_merged);
            }
            info.total_size += size as usize;
            info.tail_offset += size as i64;
            self.merged_events.push(event.into_merged());
            info.total_size >= self.merge_size_limit
        } else {
            let size = event.get_size() as usize;
            let offset = event.offset;
            self.merged_info = Some(MergedInfo {
                first_event: Box::new(event),
                tail_offset: offset + size as i64,
                total_size: size,
            });
            size >= self.merge_size_limit
        }
    }

    /// Number of events currently pending in the buffer (0 when empty).
    #[inline(always)]
    pub fn len(&self) -> usize {
        if !self.merged_events.is_empty() {
            self.merged_events.len()
        } else {
            // A single pending event lives only in `merged_info`.
            self.merged_info.as_ref().map(|_| 1).unwrap_or(0)
        }
    }

    /// Removes the pending batch and turns it into one event.
    ///
    /// * Empty buffer: returns `None`.
    /// * Single pending event: returns that event untouched.
    /// * Several events: allocates one aligned buffer of the batch's total size, copies
    ///   every sub-task's data into it back-to-back when `action` is `IOAction::Write`,
    ///   and attaches the buffer and the sub-task list to the first event.
    ///
    /// If the buffer allocation fails, every sub-task callback is invoked with
    /// `Errno::ENOMEM` (using the batch start offset) and `None` is returned.
    #[inline(always)]
    fn take(&mut self, action: IOAction) -> Option<Box<IOEvent<C>>> {
        let info = self.merged_info.take()?;
        if self.merged_events.is_empty() {
            return Some(info.first_event);
        }
        let sub_tasks = std::mem::replace(&mut self.merged_events, SegList::new());
        debug_assert!(sub_tasks.len() > 1);
        let size = info.total_size;
        let offset = info.first_event.offset;
        // NOTE(review): `size as i32` truncates if the batch exceeds i32::MAX bytes;
        // presumably `merge_size_limit` is always configured far below that — confirm.
        match Buffer::aligned(size as i32) {
            Ok(mut buffer) => {
                if action == IOAction::Write {
                    let mut write_offset = 0;
                    for merged in sub_tasks.iter() {
                        buffer.copy_from(write_offset, merged.buf.as_ref());
                        write_offset += merged.buf.len();
                    }
                }
                let mut master = info.first_event;
                master.set_merged_tasks(buffer, sub_tasks);
                Some(master)
            }
            Err(_) => {
                for merged in sub_tasks.drain() {
                    if let Some(cb) = merged.cb {
                        cb.call(offset, Err(Errno::ENOMEM));
                    }
                }
                None
            }
        }
    }

    /// Takes the pending batch (see `take`) and stamps it with `fd`.
    ///
    /// Returns `None` when there is nothing to submit, i.e. the buffer was empty or
    /// building the merged event failed.
    #[inline]
    pub fn flush(&mut self, fd: RawFd, action: IOAction) -> Option<IOEvent<C>> {
        let mut master = self.take(action)?;
        master.set_fd(fd);
        Some(*master)
    }
}
/// Feeds events for one file descriptor and one action through a `MergeBuffer` and
/// sends the resulting (possibly merged) events to a channel.
pub struct MergeSubmitter<C: IOCallback, S: BlockingTxTrait<Box<IOEvent<C>>>> {
/// File descriptor stamped onto every flushed event.
fd: RawFd,
/// Buffer that accumulates contiguous events until they are flushed.
buffer: MergeBuffer<C>,
/// Channel sender that receives each flushed event.
sender: S,
/// The I/O action shared by all events passed to this submitter.
action: IOAction,
}
impl<C: IOCallback, S: BlockingTxTrait<Box<IOEvent<C>>>> MergeSubmitter<C, S> {
    /// Builds a submitter for `fd` that merges `action` events up to `merge_size_limit`
    /// bytes and forwards them through `sender`.
    ///
    /// `merge_size_limit` must be greater than zero (checked with `log_assert!`).
    pub fn new(fd: RawFd, sender: S, merge_size_limit: usize, action: IOAction) -> Self {
        log_assert!(merge_size_limit > 0);
        let buffer = MergeBuffer::<C>::new(merge_size_limit);
        Self { fd, buffer, sender, action }
    }

    /// Queues `event`, flushing the pending batch first when the event cannot join it.
    ///
    /// The pending batch is flushed up front if the event alone reaches the merge limit
    /// or the buffer refuses it. If that flush fails, the event is completed with
    /// `ESHUTDOWN` through its callback and the error is returned. Otherwise the event
    /// is pushed, and the buffer is flushed again if it reports being full.
    pub fn add_event(&mut self, mut event: IOEvent<C>) -> Result<(), io::Error> {
        log_debug_assert_eq!(self.fd, event.fd);
        log_debug_assert_eq!(event.action, self.action);

        let reaches_limit = event.get_size() >= self.buffer.merge_size_limit as u64;
        let needs_pre_flush = reaches_limit || !self.buffer.may_add_event(&event);
        if needs_pre_flush {
            match self._flush() {
                Ok(()) => {}
                Err(err) => {
                    // The event never entered the buffer, so it is completed here.
                    event.set_error(Errno::ESHUTDOWN as i32);
                    event.callback_unchecked(false);
                    return Err(err);
                }
            }
        }

        let batch_full = self.buffer.push_event(event);
        if batch_full {
            return self._flush();
        }
        Ok(())
    }

    /// Sends whatever is currently pending in the buffer, if anything.
    pub fn flush(&mut self) -> Result<(), io::Error> {
        self._flush()
    }

    /// Drains the buffer into a single event and sends it to the channel.
    ///
    /// Succeeds without sending when the buffer yields no event; a failed send is
    /// reported as an `io::Error` of kind `Other`.
    #[inline(always)]
    fn _flush(&mut self) -> Result<(), io::Error> {
        match self.buffer.flush(self.fd, self.action) {
            None => Ok(()),
            Some(event) => {
                trace!("mio: submit event from flush {:?}", event);
                let sent = self.sender.send(Box::new(event));
                sent.map_err(|_| io::Error::new(io::ErrorKind::Other, "Queue closed"))?;
                Ok(())
            }
        }
    }
}