// buffett-core 0.1.1
//
// Core library for Bitconch:buffett
// Documentation
use crate::packet::{SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE};
use buffett_interface::pubkey::Pubkey;
use std::cmp;
use std::mem;
use std::result;
use crate::window::WindowSlot;

/// Number of consecutive window slots (data blobs) that form one erasure block.
pub const NUM_DATA: usize = 16; 
/// Number of coding blobs produced for each erasure block.
pub const NUM_CODING: usize = 4; 
/// Total number of blobs (data plus coding) in one erasure set.
pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING; 

/// Byte alignment that `generate_coding` rounds the per-block data size up to
/// before encoding.
pub const JERASURE_ALIGN: usize = 4; 

/// Rounds `$x` up to the next multiple of `$align`.
///
/// The bit-mask trick only yields a correct result when `$align` is a power
/// of two; a value that is already aligned is returned unchanged.
macro_rules! align {
    ($x:expr, $align:expr) => {
        // Add `align - 1`, then clear the low bits to snap down to a multiple.
        ($x + ($align - 1)) & !($align - 1)
    };
}

/// Errors reported by the erasure encode/decode/recover routines in this module.
#[derive(Debug, PartialEq, Eq)]
pub enum ErasureError {
    /// Returned by `recover` when more blobs are missing from a block than
    /// there are coding blobs (`NUM_CODING`).
    NotEnoughBlocksToDecode,
    /// Returned by `decode_blocks` when the native decode call reports a
    /// negative status.
    DecodeError,
    /// Returned by `generate_coding` when a freshly created coding blob cannot
    /// be flagged as coding.
    EncodeError,
    /// Returned when the blocks handed to the encoder/decoder do not all have
    /// the same length.
    InvalidBlockSize,
}

/// Result alias whose error type is always `ErasureError`.
pub type Result<T> = result::Result<T, ErasureError>;



// Foreign functions used for the actual coding math.
// NOTE(review): the names match the Jerasure / gf-complete C libraries; the
// parameter meanings below are taken from how this file calls them — confirm
// against the linked library's headers.
extern "C" {
    /// Encodes `k` data blocks into `m` coding blocks of `size` bytes each,
    /// using the `m * k` coding `matrix` and word size `w`.
    fn jerasure_matrix_encode(
        k: i32,
        m: i32,
        w: i32,
        matrix: *const i32,
        data_ptrs: *const *const u8,
        coding_ptrs: *const *mut u8,
        size: i32,
    );
    /// Reconstructs the blocks listed in `erasures` in place.
    /// This file passes an erasure list terminated by `-1` and treats a
    /// negative return value as failure.
    fn jerasure_matrix_decode(
        k: i32,
        m: i32,
        w: i32,
        matrix: *const i32,
        row_k_ones: i32,
        erasures: *const i32,
        data_ptrs: *const *mut u8,
        coding_ptrs: *const *mut u8,
        size: i32,
    ) -> i32;
    /// Division in the Galois field of word size `w` (presumably GF(2^w) — confirm).
    fn galois_single_divide(a: i32, b: i32, w: i32) -> i32;
}

/// Builds the `m` x `k` coding matrix (row-major) used by the encoder and decoder.
///
/// Entry `(row, col)` is `1 / (row ^ (m + col))`, with the division carried
/// out by `galois_single_divide` for word size `w`.
fn get_matrix(m: i32, k: i32, w: i32) -> Vec<i32> {
    let mut matrix = vec![0; (m * k) as usize];
    for row in 0..m {
        for col in 0..k {
            let divisor = row ^ (m + col);
            let cell = (row * k + col) as usize;
            // SAFETY: `galois_single_divide` takes and returns plain integers;
            // no pointers cross the FFI boundary here.
            matrix[cell] = unsafe { galois_single_divide(1, divisor, w) };
        }
    }
    matrix
}

/// Word size `w` passed to `get_matrix` and to the native encode/decode calls.
pub const ERASURE_W: i32 = 32;


pub fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Result<()> {
    if data.is_empty() {
        return Ok(());
    }
    let k = data.len() as i32;
    let m = coding.len() as i32;
    let block_len = data[0].len() as i32;
    let matrix: Vec<i32> = get_matrix(m, k, ERASURE_W);
    let mut data_arg = Vec::with_capacity(data.len());
    for block in data {
        if block_len != block.len() as i32 {
            error!(
                "data block size incorrect {} expected {}",
                block.len(),
                block_len
            );
            return Err(ErasureError::InvalidBlockSize);
        }
        data_arg.push(block.as_ptr());
    }
    let mut coding_arg = Vec::with_capacity(coding.len());
    for mut block in coding {
        if block_len != block.len() as i32 {
            error!(
                "coding block size incorrect {} expected {}",
                block.len(),
                block_len
            );
            return Err(ErasureError::InvalidBlockSize);
        }
        coding_arg.push(block.as_mut_ptr());
    }

    unsafe {
        jerasure_matrix_encode(
            k,
            m,
            ERASURE_W,
            matrix.as_ptr(),
            data_arg.as_ptr(),
            coding_arg.as_ptr(),
            block_len,
        );
    }
    Ok(())
}


/// Reconstructs erased blocks in place.
///
/// * `data` - the data blocks; erased entries are overwritten with recovered bytes.
/// * `coding` - the coding blocks; erased entries are overwritten as well.
/// * `erasures` - indices of the erased blocks (data blocks first, coding
///   blocks offset by `data.len()`), terminated by `-1`.
///
/// Every block must have the same length as `data[0]`.
///
/// # Errors
///
/// * `ErasureError::InvalidBlockSize` if any block length differs from `data[0]`.
/// * `ErasureError::DecodeError` if `erasures` has no `-1` terminator or the
///   native decoder returns a negative status.
///
/// Returns `Ok(())` immediately when `data` is empty.
pub fn decode_blocks(
    data: &mut [&mut [u8]],
    coding: &mut [&mut [u8]],
    erasures: &[i32],
) -> Result<()> {
    if data.is_empty() {
        return Ok(());
    }
    // The native decoder walks `erasures` until it sees -1; without a
    // terminator it would read past the end of the slice.
    if !erasures.contains(&-1) {
        return Err(ErasureError::DecodeError);
    }
    let block_len = data[0].len();
    let matrix: Vec<i32> = get_matrix(coding.len() as i32, data.len() as i32, ERASURE_W);

    let mut coding_arg: Vec<*mut u8> = Vec::with_capacity(coding.len());
    for x in coding.iter_mut() {
        if x.len() != block_len {
            return Err(ErasureError::InvalidBlockSize);
        }
        coding_arg.push(x.as_mut_ptr());
    }

    let mut data_arg: Vec<*mut u8> = Vec::with_capacity(data.len());
    for x in data.iter_mut() {
        if x.len() != block_len {
            return Err(ErasureError::InvalidBlockSize);
        }
        data_arg.push(x.as_mut_ptr());
    }

    // SAFETY: all block pointers refer to live slices of exactly `block_len`
    // bytes, the pointer vectors match the `k`/`m` counts passed, and
    // `erasures` was checked above to contain a -1 terminator.
    let ret = unsafe {
        jerasure_matrix_decode(
            data.len() as i32,
            coding.len() as i32,
            ERASURE_W,
            matrix.as_ptr(),
            0,
            erasures.as_ptr(),
            data_arg.as_ptr(),
            coding_arg.as_ptr(),
            block_len as i32,
        )
    };
    trace!("jerasure_matrix_decode ret: {}", ret);
    // Diagnostic dump of the first recovered data block. Guarded so that a
    // leading terminator, a coding-block index, or a block shorter than eight
    // bytes cannot panic inside logging code.
    if let Some(&first) = erasures.first() {
        if first >= 0 {
            if let Some(block) = data.get(first as usize) {
                for x in block.iter().take(8) {
                    trace!("{} ", x);
                }
            }
        }
    }
    trace!("");
    if ret < 0 {
        return Err(ErasureError::DecodeError);
    }
    Ok(())
}


/// Generates coding blobs for every complete erasure block in `window` that is
/// covered by the `num_blobs` slots starting at `receive_index`.
///
/// * `id` - only used in log output.
/// * `window` - the slot window; coding blobs are written into the `coding`
///   field of the last `NUM_CODING` slots of each block.
/// * `receive_index` - blob index where the newly received run starts.
/// * `num_blobs` - number of slots in the run.
/// * `transmit_index_coding` - lowered to the block's first coding index when
///   it is currently larger.
///
/// Returns `Ok(())` early, without generating anything further, as soon as a
/// block contains an empty data slot.
///
/// # Errors
///
/// `ErasureError::EncodeError` if a coding blob cannot be flagged as coding;
/// any error from `generate_coding_blocks` is propagated.
///
/// # Panics
///
/// Panics if `window` is empty (modulo by zero), if a target coding slot is
/// already occupied, if a blob lock is poisoned, or if reading/writing a blob's
/// index or id fails.
pub fn generate_coding(
    id: &Pubkey,
    window: &mut [WindowSlot],
    receive_index: u64,
    num_blobs: usize,
    transmit_index_coding: &mut u64,
) -> Result<()> {
    // Blob index of the first coding blob in the block containing `receive_index`.
    // NOTE(review): computed once here and not advanced as `block_start` moves
    // to later blocks in the loop below — confirm this is intended.
    let coding_index_start =
        receive_index - (receive_index % NUM_DATA as u64) + (NUM_DATA - NUM_CODING) as u64;

    // Window position of `receive_index`, rounded down to a block boundary.
    let start_idx = receive_index as usize % window.len();
    let mut block_start = start_idx - (start_idx % NUM_DATA);

    loop {
        let block_end = block_start + NUM_DATA;
        // Stop once the next block would extend past the received run.
        if block_end > (start_idx + num_blobs) {
            break;
        }
        info!(
            "generate_coding {} start: {} end: {} start_idx: {} num_blobs: {}",
            id, block_start, block_end, start_idx, num_blobs
        );

        let mut max_data_size = 0;

        // Pass 1: find the largest data blob in the block; bail out if any
        // data slot is empty.
        for i in block_start..block_end {
            let n = i % window.len();
            trace!("{} window[{}] = {:?}", id, n, window[n].data);

            if let Some(b) = &window[n].data {
                max_data_size = cmp::max(b.read().unwrap().meta.size, max_data_size);
            } else {
                trace!("{} data block is null @ {}", id, n);
                return Ok(());
            }
        }

        // Round the common block size up to the required alignment.
        max_data_size = align!(max_data_size, JERASURE_ALIGN);

        trace!("{} max_data_size: {}", id, max_data_size);

        // Pass 2: zero-pad every data blob up to the common size and remember it.
        let mut data_blobs = Vec::with_capacity(NUM_DATA);
        for i in block_start..block_end {
            let n = i % window.len();

            if let Some(b) = &window[n].data {
                // Write lock is released at the end of this iteration.
                let mut b_wl = b.write().unwrap();
                for i in b_wl.meta.size..max_data_size {
                    b_wl.data[i] = 0;
                }
                data_blobs.push(b);
            }
        }

        // Never raise the caller's coding transmit index, only lower it.
        *transmit_index_coding = cmp::min(*transmit_index_coding, coding_index_start);

        // Create the coding blobs in the last NUM_CODING slots of the block.
        let mut coding_blobs = Vec::with_capacity(NUM_CODING);
        let coding_start = block_end - NUM_CODING;
        for i in coding_start..block_end {
            let n = i % window.len();
            assert!(window[n].coding.is_none());

            window[n].coding = Some(SharedBlob::default());

            let coding = window[n].coding.clone().unwrap();
            let mut coding_wl = coding.write().unwrap();
            // Clear the region the encoder will write into.
            for i in 0..max_data_size {
                coding_wl.data[i] = 0;
            }
            // Copy index and id from the data blob sharing this slot.
            if let Some(data) = &window[n].data {
                let data_rl = data.read().unwrap();

                let index = data_rl.get_index().unwrap();
                // Shadows the `id` parameter for the rest of this `if` body,
                // so the trace below prints the blob's id twice.
                let id = data_rl.get_id().unwrap();

                trace!(
                    "{} copying index {} id {:?} from data to coding",
                    id,
                    index,
                    id
                );
                coding_wl.set_index(index).unwrap();
                coding_wl.set_id(id).unwrap();
            }
            coding_wl.set_size(max_data_size);
            if coding_wl.set_coding().is_err() {
                return Err(ErasureError::EncodeError);
            }

            coding_blobs.push(coding.clone());
        }

        // Hold read locks on all data blobs and write locks on all coding
        // blobs for the duration of the encode call.
        let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect();

        let data_ptrs: Vec<_> = data_locks
            .iter()
            .enumerate()
            .map(|(i, l)| {
                trace!("{} i: {} data: {}", id, i, l.data[0]);
                &l.data[..max_data_size]
            }).collect();

        let mut coding_locks: Vec<_> = coding_blobs.iter().map(|b| b.write().unwrap()).collect();

        let mut coding_ptrs: Vec<_> = coding_locks
            .iter_mut()
            .enumerate()
            .map(|(i, l)| {
                trace!("{} i: {} coding: {}", id, i, l.data[0],);
                &mut l.data_mut()[..max_data_size]
            }).collect();

        generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
        debug!(
            "{} start_idx: {} data: {}:{} coding: {}:{}",
            id, start_idx, block_start, block_end, coding_start, block_end
        );
        // Advance to the next block.
        block_start = block_end;
    }
    Ok(())
}


fn is_missing(id: &Pubkey, idx: u64, window_slot: &mut Option<SharedBlob>, c_or_d: &str) -> bool {
    if let Some(blob) = window_slot.take() {
        let blob_idx = blob.read().unwrap().get_index().unwrap();
        if blob_idx == idx {
            trace!("recover {}: idx: {} good {}", id, idx, c_or_d);
            
            mem::replace(window_slot, Some(blob));
            false
        } else {
            trace!(
                "recover {}: idx: {} old {} {}, recycling",
                id,
                idx,
                c_or_d,
                blob_idx,
            );
            true
        }
    } else {
        trace!("recover {}: idx: {} None {}", id, idx, c_or_d);
        
        true
    }
}


/// Counts the missing data and coding blobs of one erasure block.
///
/// The block occupies window positions `block_start..block_start + NUM_DATA`
/// (wrapped modulo `window.len()`), and its first blob has index
/// `block_start_idx`. Coding blobs are only looked for in the last
/// `NUM_CODING` positions. Stale blobs found along the way are dropped from
/// the window by `is_missing`.
///
/// Returns `(data_missing, coding_missing)`.
fn find_missing(
    id: &Pubkey,
    block_start_idx: u64,
    block_start: usize,
    window: &mut [WindowSlot],
) -> (usize, usize) {
    let block_end = block_start + NUM_DATA;
    let coding_start = block_end - NUM_CODING;
    let mut data_missing = 0;
    let mut coding_missing = 0;

    for (offset, pos) in (block_start..block_end).enumerate() {
        let expected_idx = block_start_idx + offset as u64;
        let slot = pos % window.len();

        if is_missing(id, expected_idx, &mut window[slot].data, "data") {
            data_missing += 1;
        }

        // Only the tail of the block carries coding blobs.
        if pos < coding_start {
            continue;
        }
        if is_missing(id, expected_idx, &mut window[slot].coding, "coding") {
            coding_missing += 1;
        }
    }

    (data_missing, coding_missing)
}


/// Attempts to reconstruct the missing blobs of the erasure block that
/// contains window position `start` / blob index `start_idx`.
///
/// * `id` - only used in log output.
/// * `window` - the slot window; missing data/coding slots are filled with
///   fresh blobs that receive the recovered bytes.
/// * `start_idx` - blob index inside the block to recover.
/// * `start` - window position inside the block to recover.
///
/// Returns `Ok(())` without doing any work when no data blob is missing.
///
/// # Errors
///
/// `ErasureError::NotEnoughBlocksToDecode` when more than `NUM_CODING` blobs
/// (data plus coding) are missing; any error from `decode_blocks` is propagated.
///
/// # Panics
///
/// Panics if `window` is empty, if a blob lock is poisoned, if blob header
/// accessors fail, or if a recovered blob reports a size larger than
/// `BLOB_DATA_SIZE` (final `assert!`).
pub fn recover(id: &Pubkey, window: &mut [WindowSlot], start_idx: u64, start: usize) -> Result<()> {
    // Round both the window position and the blob index down to the block start.
    let block_start = start - (start % NUM_DATA);
    let block_start_idx = start_idx - (start_idx % NUM_DATA as u64);

    debug!("start: {} block_start: {}", start, block_start);

    let coding_start = block_start + NUM_DATA - NUM_CODING;
    let block_end = block_start + NUM_DATA;
    trace!(
        "recover {}: block_start_idx: {} block_start: {} coding_start: {} block_end: {}",
        id,
        block_start_idx,
        block_start,
        coding_start,
        block_end
    );

    // Also clears any stale blobs (wrong index) out of the block's slots.
    let (data_missing, coding_missing) = find_missing(id, block_start_idx, block_start, window);

    // Nothing to recover if all data blobs are present.
    if data_missing == 0 {
        
        return Ok(());
    }

    // Cannot recover more blobs than there are coding blobs.
    if (data_missing + coding_missing) > NUM_CODING {
        trace!(
            "recover {}: start: {} skipping recovery data: {} coding: {}",
            id,
            block_start,
            data_missing,
            coding_missing
        );
        
        return Err(ErasureError::NotEnoughBlocksToDecode);
    }

    trace!(
        "recover {}: recovering: data: {} coding: {}",
        id,
        data_missing,
        coding_missing
    );
    // `blobs` holds the NUM_DATA data blobs followed by the NUM_CODING coding blobs.
    let mut blobs: Vec<SharedBlob> = Vec::with_capacity(NUM_DATA + NUM_CODING);
    let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING);
    let mut erasures: Vec<i32> = Vec::with_capacity(NUM_CODING);
    // `meta` is copied from the first present data blob, `size` from the first
    // present coding blob.
    let mut meta = None;
    let mut size = None;

    // Collect data blobs; a missing one gets a fresh blob placed in the window
    // and its block-relative position recorded as an erasure.
    for i in block_start..block_end {
        let j = i % window.len();

        if let Some(b) = window[j].data.clone() {
            if meta.is_none() {
                meta = Some(b.read().unwrap().meta.clone());
                trace!("recover {} meta at {} {:?}", id, j, meta);
            }
            blobs.push(b);
        } else {
            let n = SharedBlob::default();
            window[j].data = Some(n.clone());
            
            blobs.push(n);
            erasures.push((i - block_start) as i32);
        }
    }
    // Collect coding blobs; erasure positions for coding are offset by NUM_DATA.
    for i in coding_start..block_end {
        let j = i % window.len();
        if let Some(b) = window[j].coding.clone() {
            if size.is_none() {
                // NOTE(review): assumes the coding blob's meta.size is at least
                // BLOB_HEADER_SIZE; otherwise this subtraction underflows.
                size = Some(b.read().unwrap().meta.size - BLOB_HEADER_SIZE);
                trace!(
                    "{} recover size {} from {}",
                    id,
                    size.unwrap(),
                    i as u64 + block_start_idx
                );
            }
            blobs.push(b);
        } else {
            let n = SharedBlob::default();
            window[j].coding = Some(n.clone());
            
            blobs.push(n);
            erasures.push(((i - coding_start) + NUM_DATA) as i32);
        }
    }

    // At least one coding blob is present here: data_missing >= 1 and the
    // total missing is at most NUM_CODING, so coding_missing < NUM_CODING.
    let size = size.unwrap();
    // Zero-pad every data blob (including the fresh placeholders) up to `size`.
    for i in block_start..block_end {
        let j = i % window.len();

        if let Some(b) = &window[j].data {
            let mut b_wl = b.write().unwrap();
            for i in b_wl.meta.size..size {
                b_wl.data[i] = 0;
            }
        }
    }

    // The decoder expects the erasure list to end with -1.
    erasures.push(-1);
    trace!("erasures[]: {} {:?} data_size: {}", id, erasures, size,);
    // Take write locks on all blobs for the decode and the fix-ups below.
    for b in &blobs {
        locks.push(b.write().unwrap());
    }

    {
        // Split the locked blobs into data and coding byte slices.
        let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING);
        let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA);
        for (i, l) in locks.iter_mut().enumerate() {
            if i < NUM_DATA {
                trace!("{} pushing data: {}", id, i);
                data_ptrs.push(&mut l.data[..size]);
            } else {
                trace!("{} pushing coding: {}", id, i);
                coding_ptrs.push(&mut l.data_mut()[..size]);
            }
        }
        trace!(
            "{} coding_ptrs.len: {} data_ptrs.len {}",
            id,
            coding_ptrs.len(),
            data_ptrs.len()
        );
        decode_blocks(
            data_ptrs.as_mut_slice(),
            coding_ptrs.as_mut_slice(),
            &erasures,
        )?;
    }

    let mut corrupt = false;
    // Fix up metadata of every recovered blob (skip the trailing -1).
    for i in &erasures[..erasures.len() - 1] {
        let n = *i as usize;
        let mut idx = n as u64 + block_start_idx;

        let mut data_size;
        if n < NUM_DATA {
            // Recovered data blob: its size comes from the recovered header.
            data_size = locks[n].get_data_size().unwrap() as usize;
            data_size -= BLOB_HEADER_SIZE;
            if data_size > BLOB_DATA_SIZE {
                error!("{} corrupt data blob[{}] data_size: {}", id, idx, data_size);
                corrupt = true;
            }
        } else {
            // Recovered coding blob: reuse the block size and map its position
            // back onto the index of the data blob sharing its slot.
            data_size = size;
            idx -= NUM_CODING as u64;
            locks[n].set_index(idx).unwrap();

            // NOTE(review): `size` already had BLOB_HEADER_SIZE subtracted
            // above, so the header is subtracted a second time here — confirm.
            if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE {
                error!(
                    "{} corrupt coding blob[{}] data_size: {}",
                    id, idx, data_size
                );
                corrupt = true;
            }
        }

        locks[n].meta = meta.clone().unwrap();
        locks[n].set_size(data_size);
        trace!(
            "{} erasures[{}] ({}) size: {} data[0]: {}",
            id,
            *i,
            idx,
            data_size,
            locks[n].data()[0]
        );
    }
    // Any corrupt recovered blob aborts with a panic rather than an error.
    assert!(!corrupt, " {} ", id);

    Ok(())
}