// tycho_block_util/archive/reader.rs

1use tl_proto::TlRead;
2use tycho_types::models::BlockId;
3
4use super::ArchiveEntryType;
5use crate::archive::proto::{ARCHIVE_ENTRY_HEADER_LEN, ARCHIVE_PREFIX, ArchiveEntryHeader};
6
/// Borrowing reader over an in-memory archive package.
///
/// Keeps only the bytes that have not been consumed yet. The `Iterator`
/// implementation yields one [`ArchiveEntry`] per call and advances past it.
pub struct ArchiveReader<'a> {
    /// Unread remainder of the archive; the archive prefix has already been
    /// stripped by `new`.
    data: &'a [u8],
}
11
12impl<'a> ArchiveReader<'a> {
13    /// Starts reading archive package
14    pub fn new(mut data: &'a [u8]) -> Result<Self, ArchiveReaderError> {
15        read_archive_prefix(&mut data)?;
16        Ok(Self { data })
17    }
18}
19
20impl<'a> Iterator for ArchiveReader<'a> {
21    type Item = Result<ArchiveEntry<'a>, ArchiveReaderError>;
22
23    #[inline]
24    fn next(&mut self) -> Option<Self::Item> {
25        read_next_entry(&mut self.data)
26    }
27}
28
29fn read_next_entry<'a>(
30    data: &mut &'a [u8],
31) -> Option<Result<ArchiveEntry<'a>, ArchiveReaderError>> {
32    if data.len() < 8 {
33        return None;
34    }
35
36    Some('item: {
37        // Read archive entry header
38        let Ok(header) = ArchiveEntryHeader::read_from(data) else {
39            break 'item Err(ArchiveReaderError::InvalidArchiveEntryHeader);
40        };
41        let data_len = header.data_len as usize;
42
43        // Read data
44        let Some((head, tail)) = data.split_at_checked(data_len) else {
45            break 'item Err(ArchiveReaderError::UnexpectedEntryEof);
46        };
47
48        *data = tail;
49
50        // Done
51        Ok(ArchiveEntry {
52            block_id: header.block_id,
53            ty: header.ty,
54            data: head,
55        })
56    })
57}
58
59/// Parsed archive entry
60pub struct ArchiveEntry<'a> {
61    pub block_id: BlockId,
62    pub ty: ArchiveEntryType,
63    pub data: &'a [u8],
64}
65
66/// Archive data stream verifier.
67#[derive(Default)]
68pub enum ArchiveVerifier {
69    #[default]
70    Start,
71    EntryHeader {
72        buffer: [u8; ARCHIVE_ENTRY_HEADER_LEN],
73        filled: usize,
74    },
75    EntryData {
76        data_len: usize,
77    },
78}
79
80impl ArchiveVerifier {
81    /// Verifies next archive chunk.
82    pub fn write_verify(&mut self, mut part: &[u8]) -> Result<(), ArchiveReaderError> {
83        loop {
84            let part_len = part.len();
85            if part_len == 0 {
86                return Ok(());
87            }
88
89            match self {
90                Self::Start if part_len >= 4 => {
91                    read_archive_prefix(&mut part)?;
92                    *self = Self::EntryHeader {
93                        buffer: [0; ARCHIVE_ENTRY_HEADER_LEN],
94                        filled: 0,
95                    }
96                }
97                Self::Start => return Err(ArchiveReaderError::TooSmallInitialBatch),
98                Self::EntryHeader { buffer, filled } => {
99                    let remaining = std::cmp::min(part_len, ARCHIVE_ENTRY_HEADER_LEN - *filled);
100
101                    // SAFETY:
102                    // - `offset < part.len()`
103                    // - `filled < buffer.len()`
104                    // - `offset + remaining < part.len() && `
105                    unsafe {
106                        std::ptr::copy_nonoverlapping(
107                            part.as_ptr(),
108                            buffer.as_mut_ptr().add(*filled),
109                            remaining,
110                        );
111                    };
112
113                    part = part.split_at(remaining).1;
114                    *filled += remaining;
115
116                    if *filled == ARCHIVE_ENTRY_HEADER_LEN {
117                        let Ok(header) = ArchiveEntryHeader::read_from(&mut buffer.as_slice())
118                        else {
119                            return Err(ArchiveReaderError::InvalidArchiveEntryHeader);
120                        };
121                        *self = Self::EntryData {
122                            data_len: header.data_len as usize,
123                        };
124                    }
125                }
126                Self::EntryData { data_len } => {
127                    let remaining = std::cmp::min(part_len, *data_len);
128                    *data_len -= remaining;
129                    part = part.split_at(remaining).1;
130
131                    if *data_len == 0 {
132                        *self = Self::EntryHeader {
133                            buffer: [0; ARCHIVE_ENTRY_HEADER_LEN],
134                            filled: 0,
135                        }
136                    }
137                }
138            }
139        }
140    }
141
142    /// Ensures that the verifier is in the correct state.
143    pub fn final_check(&self) -> Result<(), ArchiveReaderError> {
144        if matches!(self, Self::EntryHeader { filled: 0, .. }) {
145            Ok(())
146        } else {
147            Err(ArchiveReaderError::UnexpectedArchiveEof)
148        }
149    }
150}
151
152fn read_archive_prefix(buf: &mut &[u8]) -> Result<(), ArchiveReaderError> {
153    match buf.split_first_chunk() {
154        Some((header, tail)) if header == &ARCHIVE_PREFIX => {
155            *buf = tail;
156            Ok(())
157        }
158        _ => Err(ArchiveReaderError::InvalidArchiveHeader),
159    }
160}
161
162#[derive(Debug, thiserror::Error)]
163pub enum ArchiveReaderError {
164    #[error("invalid archive header")]
165    InvalidArchiveHeader,
166    #[error("unexpected archive eof")]
167    UnexpectedArchiveEof,
168    #[error("invalid archive entry header")]
169    InvalidArchiveEntryHeader,
170    #[error("invalid archive entry name")]
171    InvalidArchiveEntryName,
172    #[error("unexpected entry eof")]
173    UnexpectedEntryEof,
174    #[error("too small initial batch")]
175    TooSmallInitialBatch,
176    #[error(transparent)]
177    Other(#[from] anyhow::Error),
178}