use std::{io, fs, cmp};
use std::collections::VecDeque;
use crate::{fs as rapidtar_fs, tape};
#[derive(Clone, Debug)]
pub struct DataZone<P> {
pub ident: Option<P>,
pub length: u64,
pub committed_length: u64,
pub uncommitted_length: u64,
}
impl<P> DataZone<P> {
pub fn new(ident: P) -> DataZone<P> {
DataZone{
ident: Some(ident),
length: 0,
committed_length: 0,
uncommitted_length: 0
}
}
pub fn for_resumption(ident: P, committed: u64) -> DataZone<P> {
DataZone{
ident: Some(ident),
length: committed,
committed_length: committed,
uncommitted_length: 0
}
}
pub fn slack_zone() -> DataZone<P> {
DataZone{
ident: None,
length: 0,
committed_length: 0,
uncommitted_length: 0
}
}
pub fn write_through(&mut self, length: u64) {
self.length += length;
self.committed_length += length;
}
pub fn write_buffered(&mut self, length: u64) {
self.length += length;
self.uncommitted_length += length;
}
pub fn write_committed(&mut self, length: u64) -> Option<u64> {
if self.uncommitted_length >= length {
self.uncommitted_length -= length;
self.committed_length += length;
return None;
}
let overhang = length - self.uncommitted_length;
self.uncommitted_length = 0;
self.committed_length += length - overhang;
Some(overhang)
}
}
impl<P> DataZone<P> where P: Clone + PartialEq {
pub fn merge_zone(&self, other: &Self) -> Option<Self> {
if self.ident != other.ident {
return None;
}
let merged_length = cmp::max(self.length, other.length);
let merged_commit = cmp::min(self.committed_length, other.committed_length);
let consistent_uncommit = merged_length - merged_commit;
Some(DataZone{
ident: self.ident.clone(),
length: merged_length,
committed_length: merged_commit,
uncommitted_length: consistent_uncommit
})
}
}
pub struct DataZoneStream<P> {
cur_zone: Option<DataZone<P>>,
pending_zones: VecDeque<DataZone<P>>
}
impl<P> DataZoneStream<P> where P: Clone + PartialEq {
pub fn new() -> DataZoneStream<P> {
DataZoneStream{
cur_zone: None,
pending_zones: VecDeque::new()
}
}
pub fn write_buffered(&mut self, length: u64) {
if let Some(ref mut zone) = self.cur_zone {
zone.write_buffered(length);
}
}
pub fn write_through(&mut self, length: u64) {
if let Some(ref mut zone) = self.cur_zone {
zone.write_through(length);
}
}
pub fn write_committed(&mut self, length: u64) -> Option<u64> {
let mut commit_remain = length as u64;
while let Some(zone) = self.pending_zones.front_mut() {
commit_remain = zone.write_committed(commit_remain).unwrap_or(0);
if commit_remain == 0 {
return None;
}
self.pending_zones.pop_front();
}
if commit_remain > 0 {
if let Some(ref mut curzone) = self.cur_zone {
return curzone.write_committed(commit_remain);
}
return Some(commit_remain);
} else {
return None;
}
}
pub fn begin_data_zone(&mut self, ident: P) {
self.end_data_zone();
self.cur_zone = Some(DataZone::new(ident.clone()));
}
pub fn resume_data_zone(&mut self, ident: P, committed: u64) {
self.end_data_zone();
self.cur_zone = Some(DataZone::for_resumption(ident.clone(), committed));
}
pub fn end_data_zone(&mut self) {
if let Some(ref zone) = self.cur_zone {
if let Some(_) = zone.ident {
self.pending_zones.push_back(zone.clone());
} else if zone.length > 0 {
self.pending_zones.push_back(zone.clone());
}
}
self.cur_zone = Some(DataZone::slack_zone());
}
pub fn uncommitted_writes(&self, chain: Option<Vec<DataZone<P>>>) -> Vec<DataZone<P>> {
return match chain {
Some(mut zonelist) => {
let first_ident = match self.pending_zones.front() {
Some(datazone) => Some(datazone.ident.clone()),
None => match &self.cur_zone {
Some(curzone) => Some(curzone.ident.clone()),
None => None
}
};
if let Some(first_ident) = first_ident {
let mut i = 0;
let mut start_match = None;
for zone in zonelist.iter() {
if zone.ident == first_ident {
start_match = Some(i);
break;
}
i += 1;
}
if let Some(start_match) = start_match {
let mut inner_iter = zonelist.iter_mut();
for _ in 0..start_match {
inner_iter.next();
}
let my_iter = self.pending_zones.iter();
let mut merge_count = 0;
for (inner, mine) in inner_iter.zip(my_iter) {
if let Some(new_inner) = inner.merge_zone(mine) {
*inner = new_inner;
merge_count += 1;
}
break;
}
if self.pending_zones.len() < merge_count {
let mut my_iter = self.pending_zones.iter();
for _ in 0..merge_count {
my_iter.next();
}
for unmergeable in my_iter {
zonelist.push(unmergeable.clone());
}
if let Some(cur_zone) = &self.cur_zone {
zonelist.push(cur_zone.clone());
}
} else {
if let Some(cur_zone) = &self.cur_zone {
if let Some(inner) = zonelist.get_mut(start_match + merge_count) {
if let Some(new_inner) = inner.merge_zone(&cur_zone) {
*inner = new_inner;
} else {
zonelist.push(cur_zone.clone());
}
} else {
zonelist.push(cur_zone.clone());
}
}
}
} else {
let (left_cz, right_cz) = self.pending_zones.as_slices();
if left_cz.len() > 0 {
zonelist.extend_from_slice(left_cz);
}
if right_cz.len() > 0 {
zonelist.extend_from_slice(right_cz);
}
if let Some(cur_zone) = &self.cur_zone {
zonelist.push(cur_zone.clone());
}
}
}
if let Some(ref maybe_slack) = zonelist.get(zonelist.len() - 1) {
if let None = maybe_slack.ident {
if maybe_slack.length == 0 {
zonelist.pop();
}
}
}
zonelist
},
None => {
let mut zonelist = Vec::new();
let (left_cz, right_cz) = self.pending_zones.as_slices();
if left_cz.len() > 0 {
zonelist.extend_from_slice(left_cz);
}
if right_cz.len() > 0 {
zonelist.extend_from_slice(right_cz);
}
if let Some(cur_zone) = &self.cur_zone {
zonelist.push(cur_zone.clone());
}
zonelist
}
}
}
}
pub trait RecoverableWrite<P> : io::Write {
fn begin_data_zone(&mut self, _ident: P) {
}
fn resume_data_zone(&mut self, _ident: P, _committed: u64) {
}
fn end_data_zone(&mut self) {
}
fn uncommitted_writes(&self) -> Vec<DataZone<P>> {
Vec::new()
}
}
impl <T, P> RecoverableWrite<P> for io::Cursor<T> where io::Cursor<T> : io::Write {
}
impl <P> RecoverableWrite<P> for fs::File {
}
pub struct UnbufferedWriter<W: io::Write> {
inner: W
}
impl<W: io::Write> UnbufferedWriter<W> {
pub fn wrap(inner: W) -> UnbufferedWriter<W> {
UnbufferedWriter {
inner: inner
}
}
pub fn as_inner_writer<'a>(&'a self) -> &'a W {
&self.inner
}
}
impl <W: io::Write> io::Write for UnbufferedWriter<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.inner.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}
impl <W: io::Write, P> RecoverableWrite<P> for UnbufferedWriter<W> {
}
pub struct LimitingWriter<W: io::Write> {
remain: u64,
inner: W,
}
impl<W: io::Write> LimitingWriter<W> {
pub fn wrap(inner: W, limit: u64) -> LimitingWriter<W> {
LimitingWriter {
inner: inner,
remain: limit
}
}
pub fn as_inner_writer(&self) -> &W {
&self.inner
}
}
impl <W: io::Write> io::Write for LimitingWriter<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if buf.len() as u64 > self.remain {
self.inner.flush()?;
return Ok(0)
}
self.remain -= buf.len() as u64;
self.inner.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}
impl<W, I> RecoverableWrite<I> for LimitingWriter<W> where W: RecoverableWrite<I> {
fn begin_data_zone(&mut self, ident: I) {
self.inner.begin_data_zone(ident);
}
fn resume_data_zone(&mut self, ident: I, committed: u64) {
self.inner.resume_data_zone(ident, committed);
}
fn end_data_zone(&mut self) {
self.inner.end_data_zone();
}
fn uncommitted_writes(&self) -> Vec<DataZone<I>> {
self.inner.uncommitted_writes()
}
}
impl<W, I> rapidtar_fs::ArchivalSink<I> for LimitingWriter<W> where W: rapidtar_fs::ArchivalSink<I> + Send {
fn downcast_seek(&mut self) -> Option<&mut dyn io::Seek> {
self.inner.downcast_seek()
}
fn downcast_tapedevice(&mut self) -> Option<&mut dyn tape::TapeDevice> {
self.inner.downcast_tapedevice()
}
}
#[cfg(test)]
mod tests {
use super::{DataZone, DataZoneStream};
#[test]
fn datazone_buffer() {
let mut dz = DataZone::new(0);
dz.write_buffered(1024);
let commit_result = dz.write_committed(768);
assert_eq!(dz.length, 1024);
assert_eq!(dz.committed_length, 768);
assert_eq!(dz.uncommitted_length, 256);
assert_eq!(commit_result, None);
}
#[test]
fn datazone_overhang() {
let mut dz = DataZone::new(0);
dz.write_buffered(1024);
let commit_result = dz.write_committed(1536);
assert_eq!(dz.length, 1024);
assert_eq!(dz.committed_length, 1024);
assert_eq!(dz.uncommitted_length, 0);
assert_eq!(commit_result, Some(512));
}
#[test]
fn datazone_overhang_exact() {
let mut dz = DataZone::new(0);
dz.write_buffered(1536);
let commit_result = dz.write_committed(1536);
assert_eq!(dz.length, 1536);
assert_eq!(dz.committed_length, 1536);
assert_eq!(dz.uncommitted_length, 0);
assert_eq!(commit_result, None);
}
#[test]
fn datazone_stream() {
let mut dzs = DataZoneStream::new();
dzs.begin_data_zone(0);
dzs.write_buffered(512);
dzs.begin_data_zone(1);
dzs.write_buffered(1024);
dzs.begin_data_zone(2);
dzs.write_buffered(768);
let commit_result = dzs.write_committed(1024);
let uncommitted_zones = dzs.uncommitted_writes(None);
assert_eq!(commit_result, None);
assert_eq!(uncommitted_zones.len(), 2);
assert_eq!(uncommitted_zones[0].ident, Some(1));
assert_eq!(uncommitted_zones[0].length, 1024);
assert_eq!(uncommitted_zones[0].committed_length, 512);
assert_eq!(uncommitted_zones[0].uncommitted_length, 512);
assert_eq!(uncommitted_zones[1].ident, Some(2));
assert_eq!(uncommitted_zones[1].length, 768);
assert_eq!(uncommitted_zones[1].committed_length, 0);
assert_eq!(uncommitted_zones[1].uncommitted_length, 768);
}
#[test]
fn datazone_stream_2x() {
let mut dzs = DataZoneStream::new();
dzs.begin_data_zone(0);
dzs.write_buffered(512);
dzs.begin_data_zone(1);
dzs.write_buffered(1024);
dzs.begin_data_zone(2);
dzs.write_buffered(768);
let commit_result = dzs.write_committed(2048);
let uncommitted_zones = dzs.uncommitted_writes(None);
assert_eq!(commit_result, None);
assert_eq!(uncommitted_zones.len(), 1);
assert_eq!(uncommitted_zones[0].ident, Some(2));
assert_eq!(uncommitted_zones[0].length, 768);
assert_eq!(uncommitted_zones[0].committed_length, 512);
assert_eq!(uncommitted_zones[0].uncommitted_length, 256);
}
#[test]
fn datazone_stream_overhang() {
let mut dzs = DataZoneStream::new();
dzs.begin_data_zone(0);
dzs.write_buffered(512);
dzs.begin_data_zone(1);
dzs.write_buffered(1024);
dzs.begin_data_zone(2);
dzs.write_buffered(768);
let commit_result = dzs.write_committed(4096);
let uncommitted_zones = dzs.uncommitted_writes(None);
assert_eq!(commit_result, Some(1792));
assert_eq!(uncommitted_zones.len(), 1);
assert_eq!(uncommitted_zones[0].ident, Some(2));
assert_eq!(uncommitted_zones[0].length, 768);
assert_eq!(uncommitted_zones[0].committed_length, 768);
assert_eq!(uncommitted_zones[0].uncommitted_length, 0);
}
#[test]
fn datazone_stream_merge() {
let mut dzs_behind = DataZoneStream::new();
dzs_behind.begin_data_zone(0);
dzs_behind.write_buffered(512);
dzs_behind.begin_data_zone(1);
dzs_behind.write_buffered(1024);
dzs_behind.begin_data_zone(2);
dzs_behind.write_buffered(768);
let commit_result_behind = dzs_behind.write_committed(1024);
let mut dzs = DataZoneStream::new();
dzs.begin_data_zone(0);
dzs.write_buffered(512);
dzs.begin_data_zone(1);
dzs.write_buffered(1024);
dzs.begin_data_zone(2);
dzs.write_buffered(2048);
let commit_result = dzs.write_committed(4096);
let uncommitted_zones_behind = dzs_behind.uncommitted_writes(None);
assert_eq!(commit_result_behind, None);
assert_eq!(uncommitted_zones_behind.len(), 2);
assert_eq!(uncommitted_zones_behind[0].ident, Some(1));
assert_eq!(uncommitted_zones_behind[0].length, 1024);
assert_eq!(uncommitted_zones_behind[0].committed_length, 512);
assert_eq!(uncommitted_zones_behind[0].uncommitted_length, 512);
assert_eq!(uncommitted_zones_behind[1].ident, Some(2));
assert_eq!(uncommitted_zones_behind[1].length, 768);
assert_eq!(uncommitted_zones_behind[1].committed_length, 0);
assert_eq!(uncommitted_zones_behind[1].uncommitted_length, 768);
let uncommitted_zones = dzs.uncommitted_writes(Some(uncommitted_zones_behind));
assert_eq!(commit_result, Some(512));
assert_eq!(uncommitted_zones.len(), 2);
assert_eq!(uncommitted_zones[0].ident, Some(1));
assert_eq!(uncommitted_zones[0].length, 1024);
assert_eq!(uncommitted_zones[0].committed_length, 512);
assert_eq!(uncommitted_zones[0].uncommitted_length, 512);
assert_eq!(uncommitted_zones[1].ident, Some(2));
assert_eq!(uncommitted_zones[1].length, 2048);
assert_eq!(uncommitted_zones[1].committed_length, 0);
assert_eq!(uncommitted_zones[1].uncommitted_length, 2048);
}
#[test]
fn datazone_stream_overslack() {
let mut dzs = DataZoneStream::new();
dzs.begin_data_zone(0);
dzs.write_buffered(512);
dzs.end_data_zone();
dzs.write_buffered(512);
dzs.begin_data_zone(1);
dzs.write_buffered(1024);
dzs.begin_data_zone(2);
dzs.write_buffered(768);
let commit_result = dzs.write_committed(4096);
let uncommitted_zones = dzs.uncommitted_writes(None);
assert_eq!(commit_result, Some(1280));
assert_eq!(uncommitted_zones.len(), 1);
assert_eq!(uncommitted_zones[0].ident, Some(2));
assert_eq!(uncommitted_zones[0].length, 768);
assert_eq!(uncommitted_zones[0].committed_length, 768);
assert_eq!(uncommitted_zones[0].uncommitted_length, 0);
}
#[test]
fn datazone_stream_mergeslack() {
let mut dzs_behind = DataZoneStream::new();
dzs_behind.begin_data_zone(0);
dzs_behind.write_buffered(512);
dzs_behind.begin_data_zone(1);
dzs_behind.write_buffered(1024);
dzs_behind.end_data_zone();
dzs_behind.write_buffered(512);
dzs_behind.begin_data_zone(2);
dzs_behind.write_buffered(768);
let commit_result_behind = dzs_behind.write_committed(1024);
let mut dzs = DataZoneStream::new();
dzs.begin_data_zone(0);
dzs.write_buffered(512);
dzs.begin_data_zone(1);
dzs.write_buffered(1024);
dzs.end_data_zone();
dzs.write_buffered(512);
dzs.begin_data_zone(2);
dzs.write_buffered(1536);
let commit_result = dzs.write_committed(4096);
let uncommitted_zones_behind = dzs_behind.uncommitted_writes(None);
assert_eq!(commit_result_behind, None);
assert_eq!(uncommitted_zones_behind.len(), 3);
assert_eq!(uncommitted_zones_behind[0].ident, Some(1));
assert_eq!(uncommitted_zones_behind[0].length, 1024);
assert_eq!(uncommitted_zones_behind[0].committed_length, 512);
assert_eq!(uncommitted_zones_behind[0].uncommitted_length, 512);
assert_eq!(uncommitted_zones_behind[1].ident, None);
assert_eq!(uncommitted_zones_behind[1].length, 512);
assert_eq!(uncommitted_zones_behind[1].committed_length, 0);
assert_eq!(uncommitted_zones_behind[1].uncommitted_length, 512);
assert_eq!(uncommitted_zones_behind[2].ident, Some(2));
assert_eq!(uncommitted_zones_behind[2].length, 768);
assert_eq!(uncommitted_zones_behind[2].committed_length, 0);
assert_eq!(uncommitted_zones_behind[2].uncommitted_length, 768);
let uncommitted_zones = dzs.uncommitted_writes(Some(uncommitted_zones_behind));
assert_eq!(commit_result, Some(512));
assert_eq!(uncommitted_zones.len(), 3);
assert_eq!(uncommitted_zones[0].ident, Some(1));
assert_eq!(uncommitted_zones[0].length, 1024);
assert_eq!(uncommitted_zones[0].committed_length, 512);
assert_eq!(uncommitted_zones[0].uncommitted_length, 512);
assert_eq!(uncommitted_zones[1].ident, None);
assert_eq!(uncommitted_zones[1].length, 512);
assert_eq!(uncommitted_zones[1].committed_length, 0);
assert_eq!(uncommitted_zones[1].uncommitted_length, 512);
assert_eq!(uncommitted_zones[2].ident, Some(2));
assert_eq!(uncommitted_zones[2].length, 1536);
assert_eq!(uncommitted_zones[2].committed_length, 0);
assert_eq!(uncommitted_zones[2].uncommitted_length, 1536);
}
}