// rsbx 2.0.0
//
// Enhanced implementation of SeqBox in Rust
// Documentation
#![allow(dead_code)]
use reed_solomon_erasure::ReedSolomon;
use smallvec::SmallVec;
use sbx_block;
use sbx_block::Block;
use sbx_specs::{Version,
                ver_to_block_size,
                SBX_LARGEST_BLOCK_SIZE,
                SBX_FIRST_DATA_SEQ_NUM};

use std::sync::Arc;

use std::fmt;

use json_printer::{JSONPrinter,
                   BracketType};

use super::RSCodecState;

/// Collects the blocks of one block set, slot by slot, and repairs the
/// missing ones with Reed-Solomon reconstruction once every slot is marked.
pub struct RSRepairer {
    /// Slot that the next `get_block_buffer`/`mark_present`/`mark_missing`
    /// call applies to; reset to 0 after each repair.
    index: usize,
    /// Codec built from the data and parity shard counts given to `new`.
    rs_codec: ReedSolomon,
    /// `(data_shards, parity_shards, burst)` exactly as given to `new`.
    data_par_burst: (usize, usize, usize),
    /// Version taken from the reference block.
    version: Version,
    /// One buffer per slot, each sized to the block size of `version`.
    buf: SmallVec<[SmallVec<[u8; SBX_LARGEST_BLOCK_SIZE]>; 32]>,
    /// Per-slot flag: `true` if the slot was marked present.
    buf_present: SmallVec<[bool; 32]>,
    /// Clone of the reference block; its sequence number is rewritten and the
    /// block is synced into each repaired buffer.
    ref_block: Block,
    /// `true` once any slot has been marked, `false` again after a repair.
    active: bool,
    /// Printer handed to every `RSRepairStats` produced by this repairer.
    json_printer: Arc<JSONPrinter>,
}

/// Result of one repair attempt over a block set.
///
/// The `Display` implementation renders it either as plain text or as JSON,
/// depending on the attached printer.
pub struct RSRepairStats<'a> {
    /// Version used to look up the block size when reporting byte positions.
    pub version: Version,
    /// `(data_shards, parity_shards, burst)` of the repaired container.
    pub data_par_burst: (usize, usize, usize),
    /// Whether the reconstruction succeeded.
    pub successful: bool,
    /// Sequence number of the first block in the block set.
    pub start_seq_num: u32,
    /// Per-slot presence flags of the block set; `false` means missing.
    pub present: &'a SmallVec<[bool; 32]>,
    /// Number of slots that were missing.
    pub missing_count: usize,
    /// Number of slots that were present.
    pub present_count: usize,
    /// Printer that decides between JSON and plain-text output.
    json_printer: Arc<JSONPrinter>,
}

impl<'a> fmt::Display for RSRepairStats<'a> {
    /// Renders the repair report.
    ///
    /// Nothing is written when no block was missing. Otherwise the report
    /// lists every missing block of the set with its sequence number and byte
    /// position, as a JSON object when the printer has JSON enabled and as
    /// plain text when it does not.
    fn fmt(&self, f : &mut fmt::Formatter) -> fmt::Result {
        let block_size      = ver_to_block_size(self.version) as u64;
        let json_printer    = &self.json_printer;
        let end_seq_num_inc = self.start_seq_num + self.present.len() as u32 - 1;
        let json_mode       = json_printer.json_enabled();

        // A block set without missing blocks produces no output in either mode.
        if self.missing_count == 0 {
            return Ok(());
        }

        // Maps a slot in the block set to (sequence number, byte position).
        let locate = |slot : usize| {
            let seq_num     = self.start_seq_num + slot as u32;
            let write_index =
                sbx_block::calc_data_block_write_index(seq_num,
                                                       None,
                                                       Some(self.data_par_burst));
            (seq_num, write_index * block_size)
        };

        // Slots whose presence flag is false, in ascending order.
        let missing_slots =
            self.present.iter()
            .enumerate()
            .filter(|&(_, &present)| !present)
            .map(|(slot, _)| slot);

        if json_mode {
            json_printer.write_open_bracket(f, None, BracketType::Curly)?;

            if self.successful {
                write_maybe_json!(f, json_printer, "success : true")?;
            } else {
                write_maybe_json!(f, json_printer, "success : false")?;
            }

            write_maybe_json!(f, json_printer, "block set start : {}",
                              self.start_seq_num)?;
            write_maybe_json!(f, json_printer, "block set end inclusive : {}",
                              end_seq_num_inc)?;

            json_printer.write_open_bracket(f, Some("blocks"), BracketType::Square)?;

            for slot in missing_slots {
                json_printer.write_open_bracket(f, None, BracketType::Curly)?;

                let (seq_num, block_pos) = locate(slot);

                write_maybe_json!(f, json_printer, "seq num : {}", seq_num)?;
                write_maybe_json!(f, json_printer, "pos : {}", block_pos)?;

                json_printer.write_close_bracket(f)?;
            }

            // Close the "blocks" array, then the enclosing object.
            json_printer.write_close_bracket(f)?;
            json_printer.write_close_bracket(f)?;

            Ok(())
        } else {
            let (verdict, label) =
                if self.successful {
                    ("Repair successful for ", "repaired block no. : ")
                } else {
                    ("Repair failed     for ", "failed   block no. : ")
                };

            f.write_str(verdict)?;
            write!(f, "block set [{}..={}], ",
                   self.start_seq_num,
                   end_seq_num_inc)?;
            f.write_str(label)?;

            for (nth, slot) in missing_slots.enumerate() {
                // Entries are separated by a newline; none before the first.
                if nth > 0 {
                    writeln!(f)?;
                }

                let (seq_num, block_pos) = locate(slot);

                write!(f, "{} at byte {} (0x{:X})",
                       seq_num,
                       block_pos,
                       block_pos)?;
            }

            Ok(())
        }
    }
}

/// Sets the `active` field of the given value to `true`.
macro_rules! mark_active {
    ($repairer:ident) => {{
        $repairer.active = true;
    }};
}

/// Sets the `active` field of the given value to `false`.
macro_rules! mark_inactive {
    ($repairer:ident) => {{
        $repairer.active = false;
    }};
}

/// Advances the `index` field of the given value by one.
macro_rules! incre_index {
    ($repairer:ident) => {{
        $repairer.index += 1;
    }};
}

/// Rewinds the `index` field of the given value to 0.
macro_rules! reset_index {
    ($repairer:ident) => {{
        $repairer.index = 0;
    }};
}

/// Evaluates to `true` exactly when the `index` field of the given value
/// equals the total shard count reported by its `rs_codec` field.
macro_rules! codec_ready {
    ($repairer:ident) => {{
        $repairer.rs_codec.total_shard_count() == $repairer.index
    }};
}

impl RSRepairer {
    /// Creates a repairer for block sets of `data_shards + parity_shards`
    /// blocks.
    ///
    /// The version, and with it the block size of every slot buffer, is taken
    /// from `ref_block`. `ref_block` is cloned and later used as the template
    /// for the header of each repaired block.
    ///
    /// # Panics
    ///
    /// Panics if the Reed-Solomon codec cannot be constructed from the given
    /// shard counts.
    pub fn new(json_printer  : &Arc<JSONPrinter>,
               ref_block     : &Block,
               data_shards   : usize,
               parity_shards : usize,
               burst         : usize) -> RSRepairer {
        let version     = ref_block.get_version();
        let block_size  = ver_to_block_size(version);
        let total_slots = data_shards + parity_shards;

        let buf : SmallVec<[SmallVec<[u8; SBX_LARGEST_BLOCK_SIZE]>; 32]> =
            smallvec![smallvec![0; block_size]; total_slots];
        let buf_present : SmallVec<[bool; 32]> =
            smallvec![false; total_slots];

        let rs_codec =
            ReedSolomon::new(data_shards, parity_shards)
            .expect("data/parity shard counts must form a valid Reed-Solomon configuration");

        RSRepairer {
            index          : 0,
            rs_codec,
            data_par_burst : (data_shards, parity_shards, burst),
            version,
            buf,
            buf_present,
            ref_block      : ref_block.clone(),
            active         : false,
            json_printer   : Arc::clone(json_printer),
        }
    }

    /// Returns the buffer of the current slot so the caller can fill it with
    /// a block before marking the slot.
    ///
    /// Guarded by `assert_not_ready!` (defined outside this file; presumably
    /// panics once every slot has been marked).
    pub fn get_block_buffer(&mut self) -> &mut [u8] {
        assert_not_ready!(self);

        sbx_block::slice_buf_mut(self.version, &mut self.buf[self.index])
    }

    /// Returns `true` if at least one slot has been marked since the last
    /// repair (or since construction).
    pub fn active(&self) -> bool {
        self.active
    }

    /// Number of slots that still have to be marked before a repair can run.
    pub fn unfilled_slot_count(&self) -> usize {
        self.total_slot_count() - self.index
    }

    /// Total number of slots in a block set (data shards plus parity shards).
    pub fn total_slot_count(&self) -> usize {
        self.rs_codec.total_shard_count()
    }

    /// Marks the current slot as holding a usable block and moves on to the
    /// next slot.
    ///
    /// Returns `RSCodecState::Ready` once every slot of the set is marked.
    pub fn mark_present(&mut self) -> RSCodecState {
        self.mark_slot(true)
    }

    /// Marks the current slot as missing and moves on to the next slot.
    ///
    /// Returns `RSCodecState::Ready` once every slot of the set is marked.
    pub fn mark_missing(&mut self) -> RSCodecState {
        self.mark_slot(false)
    }

    /// Records the presence flag of the current slot, advances the slot
    /// cursor, and reports whether the set is now complete.
    fn mark_slot(&mut self, present : bool) -> RSCodecState {
        assert_not_ready!(self);

        self.buf_present[self.index] = present;

        incre_index!(self);

        mark_active!(self);

        if codec_ready!(self) {
            RSCodecState::Ready
        } else {
            RSCodecState::NotReady
        }
    }

    /// Number of slots currently flagged as missing.
    fn missing_count(&self) -> usize {
        self.rs_codec.total_shard_count() - self.present_count()
    }

    /// Number of slots currently flagged as present.
    fn present_count(&self) -> usize {
        self.buf_present.iter().filter(|&&present| present).count()
    }

    /// Attempts to reconstruct the missing blocks of the block set that
    /// contains `seq_num`, then resets the repairer for the next set.
    ///
    /// On success, each reconstructed block also gets its header rewritten
    /// from the reference block with the correct sequence number. The
    /// returned list then holds `(write position, block bytes)` for every
    /// repaired block. On failure the list is empty. The returned stats
    /// describe the set in both cases.
    ///
    /// # Panics
    ///
    /// Panics if `seq_num` is below `SBX_FIRST_DATA_SEQ_NUM`, or if syncing
    /// the reference block into a repaired buffer fails. Also guarded by
    /// `assert_ready!` (defined outside this file; presumably panics unless
    /// every slot has been marked).
    pub fn repair_with_block_sync(&mut self,
                                  seq_num : u32)
                                  ->
        (RSRepairStats,
         SmallVec<[(u64, &[u8]); 32]>)
    {
        assert_ready!(self);

        assert!(seq_num >= SBX_FIRST_DATA_SEQ_NUM);

        let mut repaired_blocks =
            SmallVec::with_capacity(self.rs_codec.parity_shard_count());

        // Reconstruct the data portion of every slot. The mutable shard views
        // are scoped so that `self.buf` can be borrowed again afterwards.
        let successful = {
            let mut shards : SmallVec<[&mut [u8]; 32]> =
                SmallVec::with_capacity(self.rs_codec.total_shard_count());
            for slot_buf in self.buf.iter_mut() {
                shards.push(sbx_block::slice_data_buf_mut(self.version, slot_buf));
            }

            self.rs_codec.reconstruct(&mut shards, &self.buf_present).is_ok()
        };

        let block_set_size = self.rs_codec.total_shard_count() as u32;

        // Round the zero-based data index down to the start of its block set,
        // then convert back to a sequence number. The offset added here must
        // be the same constant that was subtracted above.
        let data_index                  = seq_num - SBX_FIRST_DATA_SEQ_NUM;
        let first_data_index_in_cur_set = (data_index / block_set_size) * block_set_size;
        let first_seq_num_in_cur_set    =
            first_data_index_in_cur_set + SBX_FIRST_DATA_SEQ_NUM;

        if successful {
            // Pass 1 (mutable): write a header into every reconstructed block.
            for (slot, _) in self.buf_present.iter()
                .enumerate()
                .filter(|&(_, &present)| !present)
            {
                self.ref_block.set_seq_num(first_seq_num_in_cur_set + slot as u32);
                self.ref_block.sync_to_buffer(None,
                                              &mut self.buf[slot]).unwrap();
            }

            // Pass 2 (shared): collect the repaired blocks. Kept separate from
            // pass 1 because these borrows of `self.buf` are returned to the
            // caller and must not overlap with the mutable borrows above.
            for (slot, _) in self.buf_present.iter()
                .enumerate()
                .filter(|&(_, &present)| !present)
            {
                let cur_seq_num = first_seq_num_in_cur_set + slot as u32;
                let pos = sbx_block::calc_data_block_write_pos(self.version,
                                                               cur_seq_num,
                                                               None,
                                                               Some(self.data_par_burst));
                repaired_blocks.push((pos, sbx_block::slice_buf(self.version,
                                                                &self.buf[slot])));
            }
        }

        mark_inactive!(self);

        reset_index!(self);

        (RSRepairStats { version        : self.version,
                         data_par_burst : self.data_par_burst,
                         successful,
                         json_printer   : Arc::clone(&self.json_printer),
                         start_seq_num  : first_seq_num_in_cur_set,
                         present        : &self.buf_present,
                         missing_count  : self.missing_count(),
                         present_count  : self.present_count(), },
         repaired_blocks)
    }
}