flute 1.11.0

File Delivery over Unidirectional Transport (FLUTE)
Documentation
use std::sync::Arc;

use super::filedesc;
use super::objectdesc::ObjectDataSource;
use crate::common::{partition, pkt};
use crate::error::FluteError;
use crate::tools::error::Result;

#[derive(Debug)]
pub struct BlockEncoder {
    file: Arc<filedesc::FileDesc>,
    curr_content_offset: u64,
    curr_sbn: u32,
    a_large: u64,
    a_small: u64,
    nb_a_large: u64,
    nb_blocks: u64,
    blocks: Vec<Box<Block>>,
    block_multiplex_windows: usize,
    block_multiplex_index: usize,
    read_end: bool,
    source_size_transferred: usize,
    nb_pkt_sent: usize,
    stopped: bool,
    closabled_object: bool,
}

use super::block::Block;

impl BlockEncoder {
    pub fn new(
        file: Arc<filedesc::FileDesc>,
        block_multiplex_windows: usize,
        closabled_object: bool,
    ) -> Result<BlockEncoder> {
        match &file.object.source {
            ObjectDataSource::Buffer(_) => {}
            ObjectDataSource::Stream(stream) => {
                stream.lock().unwrap().seek(std::io::SeekFrom::Start(0))?;
            }
        }

        let mut block = BlockEncoder {
            file,
            curr_content_offset: 0,
            curr_sbn: 0,
            a_large: 0,
            a_small: 0,
            nb_a_large: 0,
            nb_blocks: 0,
            blocks: Vec::new(),
            block_multiplex_windows,
            block_multiplex_index: 0,
            read_end: false,
            source_size_transferred: 0,
            nb_pkt_sent: 0,
            stopped: false,
            closabled_object,
        };
        block.block_partitioning();
        Ok(block)
    }

    pub fn read(&mut self, force_close_object: bool) -> Option<pkt::Pkt> {
        if self.stopped {
            return None;
        }

        if force_close_object {
            self.stopped = true;
        }

        loop {
            self.read_window();

            if self.blocks.is_empty() {
                if self.nb_pkt_sent == 0 {
                    log::debug!("Empty file ? Send a pkt containing close object flag");
                    self.nb_pkt_sent += 1;

                    debug_assert!(self.file.object.transfer_length == 0);
                    return Some(pkt::Pkt {
                        payload: Vec::new(),
                        transfer_length: self.file.object.transfer_length,
                        esi: 0,
                        sbn: 0,
                        toi: self.file.toi,
                        fdt_id: self.file.fdt_id,
                        cenc: self.file.object.config.cenc,
                        inband_cenc: self.file.object.config.inband_cenc,
                        close_object: true,
                        source_block_length: 0,
                        sender_current_time: self.file.sender_current_time,
                    });
                }

                return None;
            }

            if self.block_multiplex_index >= self.blocks.len() {
                self.block_multiplex_index = 0;
            }

            let block = &mut self.blocks[self.block_multiplex_index];
            let symbol = block.read();
            if symbol.is_none() {
                self.blocks.remove(self.block_multiplex_index);
                continue;
            }

            let (symbol, is_last_symbol) = symbol.as_ref().unwrap();

            self.block_multiplex_index += 1;
            if symbol.is_source_symbol {
                self.source_size_transferred += symbol.symbols.len();
            }

            self.nb_pkt_sent += 1;

            let is_last_packet = (self.source_size_transferred
                >= self.file.object.transfer_length as usize)
                && *is_last_symbol;

            return Some(pkt::Pkt {
                payload: symbol.symbols.to_vec(),
                transfer_length: self.file.object.transfer_length,
                esi: symbol.esi,
                sbn: symbol.sbn,
                toi: self.file.toi,
                fdt_id: self.file.fdt_id,
                cenc: self.file.object.config.cenc,
                inband_cenc: self.file.object.config.inband_cenc,
                close_object: force_close_object || (self.closabled_object && is_last_packet),
                source_block_length: block.nb_source_symbols as u32,
                sender_current_time: self.file.sender_current_time,
            });
        }
    }

    fn block_partitioning(&mut self) {
        let oti = &self.file.oti;
        (self.a_large, self.a_small, self.nb_a_large, self.nb_blocks) =
            partition::block_partitioning(
                oti.maximum_source_block_length as u64,
                self.file.object.transfer_length,
                oti.encoding_symbol_length as u64,
            );
    }

    fn read_block(&mut self) -> Result<()> {
        debug_assert!(!self.read_end);
        let source = &self.file.object.source;
        match source {
            ObjectDataSource::Buffer(_) => self.read_block_buffer(),
            ObjectDataSource::Stream(_) => self.read_block_stream(),
        }
    }

    fn read_block_buffer(&mut self) -> Result<()> {
        log::debug!("Read block nb {}", self.curr_sbn);

        let content = match &self.file.object.source {
            ObjectDataSource::Buffer(buffer) => Ok(buffer),
            _ => Err(FluteError::new("Not a data source buffer")),
        }?;

        let oti = &self.file.oti;
        let block_length = match self.curr_sbn as u64 {
            value if value < self.nb_a_large => self.a_large,
            _ => self.a_small,
        };

        let offset_start = self.curr_content_offset as usize;
        let mut offset_end =
            offset_start + (block_length * oti.encoding_symbol_length as u64) as usize;
        if offset_end > content.len() {
            offset_end = content.len();
        }

        let buffer = &content.as_slice()[offset_start..offset_end];
        let block = Block::new_from_buffer(self.curr_sbn, buffer, block_length, oti)?;
        self.blocks.push(block);
        self.curr_sbn += 1;
        self.read_end = offset_end == content.len();
        self.curr_content_offset = offset_end as u64;
        log::debug!(
            "offset={}/{} end={}",
            self.curr_content_offset,
            content.len(),
            self.read_end
        );
        Ok(())
    }

    fn read_block_stream(&mut self) -> Result<()> {
        log::info!("Read block nb {}", self.curr_sbn);

        let mut stream = match &self.file.object.source {
            ObjectDataSource::Stream(stream) => Ok(stream.lock().unwrap()),
            _ => Err(FluteError::new("Not a data source stream")),
        }?;

        let oti = &self.file.oti;
        let block_length = match self.curr_sbn as u64 {
            value if value < self.nb_a_large => self.a_large,
            _ => self.a_small,
        };
        let mut buffer: Vec<u8> =
            vec![0; block_length as usize * oti.encoding_symbol_length as usize];
        let result = match stream.read(&mut buffer) {
            Ok(s) => s,
            Err(e) => {
                log::error!("Fail to read file {:?}", e.to_string());
                self.read_end = true;
                return Ok(());
            }
        };

        if result == 0 {
            self.read_end = true;
            return Ok(());
        }

        buffer.truncate(result);

        let block = Block::new_from_buffer(self.curr_sbn, &buffer, block_length, oti)?;
        self.blocks.push(block);
        self.curr_sbn += 1;
        self.curr_content_offset += buffer.len() as u64;
        Ok(())
    }

    fn read_window(&mut self) {
        while !self.read_end && (self.blocks.len() < self.block_multiplex_windows) {
            match self.read_block() {
                Ok(_) => {}
                Err(_) => self.read_end = true, // TODO handle error
            };
        }
    }
}