1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
use dataplane::batch::{Batch, BATCH_FILE_HEADER_SIZE, BATCH_HEADER_SIZE};
use dataplane::Offset;
use std::io::{Error as IoError, ErrorKind, Cursor};
use tracing::{warn, debug};
use std::os::unix::io::RawFd;
use nix::sys::uio::pread;
use fluvio_future::file_slice::AsyncFileSlice;

/// A batch as read from a file by [`FileBatchIterator`].
///
/// Only encodes the information necessary to decode batches efficiently:
/// the batch header is decoded, the record payload is kept as raw bytes.
pub struct FileBatch {
    /// Batch metadata decoded from the fixed-size file header.
    pub(crate) batch: Batch,
    /// Raw bytes that follow the file header on disk
    /// (`batch_len - BATCH_HEADER_SIZE` bytes); kept undecoded.
    pub(crate) records: Vec<u8>,
}

impl FileBatch {
    /// Returns the `base_offset` stored in the decoded batch.
    pub(crate) fn base_offset(&self) -> Offset {
        let Self { batch, .. } = self;
        batch.base_offset
    }

    /// Returns the `last_offset_delta` stored in the decoded batch header.
    pub(crate) fn offset_delta(&self) -> i32 {
        let header = &self.batch.header;
        header.last_offset_delta
    }
}

/// Iterator that returns batches from a file.
///
/// Each call to `next` reads one batch with positional reads (`pread`) on
/// `fd`, starting at `offset`, and yields it as a [`FileBatch`]. Iteration
/// stops once `offset` reaches `end`.
pub struct FileBatchIterator {
    /// Raw descriptor of the file the batches are read from.
    fd: RawFd,
    /// File position at which the next batch header is read.
    offset: i64,
    /// File position (exclusive) at which iteration stops.
    end: i64,
}

impl FileBatchIterator {
    /// Creates an iterator over the `len` bytes of `fd` that start at file
    /// position `offset`.
    #[allow(unused)]
    pub fn new(fd: RawFd, offset: i64, len: i64) -> Self {
        let end = offset + len;
        Self { fd, offset, end }
    }

    /// Creates an iterator over the region described by `slice`: it starts at
    /// the slice position and covers the slice length, reading through the
    /// slice's raw file descriptor.
    pub fn from_raw_slice(slice: AsyncFileSlice) -> Self {
        use std::os::unix::io::AsRawFd;

        let start = slice.position() as i64;
        let length = slice.len() as i64;
        Self::new(slice.as_raw_fd(), start, length)
    }
}

impl Iterator for FileBatchIterator {
    type Item = Result<FileBatch, IoError>;

    /// Yields the next batch in the file region, or `None` once the current
    /// offset has reached the end of the region.
    ///
    /// If reading or decoding a batch fails, the error is yielded once and the
    /// iterator is then exhausted: after a failure the current offset no
    /// longer points at a batch boundary, so continuing would either repeat
    /// the same error forever or decode record bytes as a header.
    fn next(&mut self) -> Option<Self::Item> {
        if self.offset >= self.end {
            return None;
        }

        let result = self.read_next_batch();
        if result.is_err() {
            // Make every following call return `None`.
            self.offset = self.end;
        }
        Some(result)
    }
}

impl FileBatchIterator {
    /// Reads and decodes the single batch that starts at `self.offset`.
    ///
    /// On success `self.offset` has been advanced past the file header and
    /// the record bytes of that batch.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::Other` if `pread` fails or the header cannot be decoded.
    /// * `ErrorKind::UnexpectedEof` if the file holds fewer bytes than the
    ///   header or the records require.
    /// * `ErrorKind::InvalidData` if the decoded batch length is smaller than
    ///   the batch header size.
    ///
    /// On error `self.offset` is left wherever the failure happened; the
    /// caller must not keep reading from it.
    fn read_next_batch(&mut self) -> Result<FileBatch, IoError> {
        let mut header = vec![0u8; BATCH_FILE_HEADER_SIZE];
        let bytes_read = pread(self.fd, &mut header, self.offset)
            .map_err(|err| IoError::new(ErrorKind::Other, format!("pread error {}", err)))?;

        if bytes_read < header.len() {
            warn!(bytes_read, header_len = header.len());
            return Err(IoError::new(
                ErrorKind::UnexpectedEof,
                format!(
                    "not eough for batch header {} out of {}",
                    bytes_read,
                    header.len()
                ),
            ));
        }

        let mut batch = Batch::default();
        batch
            .decode_from_file_buf(&mut Cursor::new(header), 0)
            .map_err(|err| {
                IoError::new(
                    ErrorKind::Other,
                    format!("decodinge batch header error {}", err),
                )
            })?;

        // Compare in i64 so a negative or undersized length coming from a
        // corrupt header is rejected instead of underflowing into a panic or
        // an enormous allocation.
        let batch_len = batch.batch_len as i64;
        let header_size = BATCH_HEADER_SIZE as i64;
        if batch_len < header_size {
            warn!(batch_len, header_size, "invalid batch length");
            return Err(IoError::new(
                ErrorKind::InvalidData,
                format!(
                    "invalid batch length {}, expected at least {}",
                    batch_len, header_size
                ),
            ));
        }
        let remainder = (batch_len - header_size) as usize;

        debug!(
            file_offset = self.offset,
            base_offset = batch.base_offset,
            "fbatch header"
        );

        let mut records = vec![0u8; remainder];

        // The records start right after the file header.
        self.offset += BATCH_FILE_HEADER_SIZE as i64;

        let bytes_read = pread(self.fd, &mut records, self.offset)
            .map_err(|err| IoError::new(ErrorKind::Other, format!("pread error {}", err)))?;

        if bytes_read < records.len() {
            warn!(bytes_read, record_len = records.len());
            return Err(IoError::new(
                ErrorKind::UnexpectedEof,
                format!(
                    "not enough for batch records {} out of {}",
                    bytes_read,
                    records.len()
                ),
            ));
        }

        self.offset += bytes_read as i64;

        debug!(file_offset = self.offset, "fbatch end");

        Ok(FileBatch { batch, records })
    }
}

#[cfg(test)]
mod test {
    use std::env::temp_dir;
    use std::fs::File;
    use std::io::Write;
    use std::os::unix::io::AsRawFd;

    use fluvio_storage::config::DEFAULT_MAX_BATCH_SIZE;

    use super::*;

    /// Smoke test for `pread`: writes a short file, reopens it read-only and
    /// checks that a positional read starting at byte 1 returns some data.
    #[test]
    fn test_file() {
        let path = temp_dir().join("pread.txt");

        // Scope the writer so the file is closed before it is reopened.
        {
            let mut writer = File::create(&path).expect("create");
            writer.write_all(b"Hello, world!").expect("write");
            writer.sync_all().expect("flush");
        }

        let reader = File::open(path).expect("open");
        let mut buf = vec![0; DEFAULT_MAX_BATCH_SIZE as usize];
        let bytes_read = pread(reader.as_raw_fd(), &mut buf, 1).expect("");
        println!("bytes read: {}", bytes_read);
        assert!(bytes_read > 2);
    }
}