tycho_block_util/archive/
reader.rs1use tl_proto::TlRead;
2use tycho_types::models::BlockId;
3
4use super::ArchiveEntryType;
5use crate::archive::proto::{ARCHIVE_ENTRY_HEADER_LEN, ARCHIVE_PREFIX, ArchiveEntryHeader};
6
7pub struct ArchiveReader<'a> {
9 data: &'a [u8],
10}
11
12impl<'a> ArchiveReader<'a> {
13 pub fn new(mut data: &'a [u8]) -> Result<Self, ArchiveReaderError> {
15 read_archive_prefix(&mut data)?;
16 Ok(Self { data })
17 }
18}
19
20impl<'a> Iterator for ArchiveReader<'a> {
21 type Item = Result<ArchiveEntry<'a>, ArchiveReaderError>;
22
23 #[inline]
24 fn next(&mut self) -> Option<Self::Item> {
25 read_next_entry(&mut self.data)
26 }
27}
28
29fn read_next_entry<'a>(
30 data: &mut &'a [u8],
31) -> Option<Result<ArchiveEntry<'a>, ArchiveReaderError>> {
32 if data.len() < 8 {
33 return None;
34 }
35
36 Some('item: {
37 let Ok(header) = ArchiveEntryHeader::read_from(data) else {
39 break 'item Err(ArchiveReaderError::InvalidArchiveEntryHeader);
40 };
41 let data_len = header.data_len as usize;
42
43 let Some((head, tail)) = data.split_at_checked(data_len) else {
45 break 'item Err(ArchiveReaderError::UnexpectedEntryEof);
46 };
47
48 *data = tail;
49
50 Ok(ArchiveEntry {
52 block_id: header.block_id,
53 ty: header.ty,
54 data: head,
55 })
56 })
57}
58
59pub struct ArchiveEntry<'a> {
61 pub block_id: BlockId,
62 pub ty: ArchiveEntryType,
63 pub data: &'a [u8],
64}
65
66#[derive(Default)]
68pub enum ArchiveVerifier {
69 #[default]
70 Start,
71 EntryHeader {
72 buffer: [u8; ARCHIVE_ENTRY_HEADER_LEN],
73 filled: usize,
74 },
75 EntryData {
76 data_len: usize,
77 },
78}
79
80impl ArchiveVerifier {
81 pub fn write_verify(&mut self, mut part: &[u8]) -> Result<(), ArchiveReaderError> {
83 loop {
84 let part_len = part.len();
85 if part_len == 0 {
86 return Ok(());
87 }
88
89 match self {
90 Self::Start if part_len >= 4 => {
91 read_archive_prefix(&mut part)?;
92 *self = Self::EntryHeader {
93 buffer: [0; ARCHIVE_ENTRY_HEADER_LEN],
94 filled: 0,
95 }
96 }
97 Self::Start => return Err(ArchiveReaderError::TooSmallInitialBatch),
98 Self::EntryHeader { buffer, filled } => {
99 let remaining = std::cmp::min(part_len, ARCHIVE_ENTRY_HEADER_LEN - *filled);
100
101 unsafe {
106 std::ptr::copy_nonoverlapping(
107 part.as_ptr(),
108 buffer.as_mut_ptr().add(*filled),
109 remaining,
110 );
111 };
112
113 part = part.split_at(remaining).1;
114 *filled += remaining;
115
116 if *filled == ARCHIVE_ENTRY_HEADER_LEN {
117 let Ok(header) = ArchiveEntryHeader::read_from(&mut buffer.as_slice())
118 else {
119 return Err(ArchiveReaderError::InvalidArchiveEntryHeader);
120 };
121 *self = Self::EntryData {
122 data_len: header.data_len as usize,
123 };
124 }
125 }
126 Self::EntryData { data_len } => {
127 let remaining = std::cmp::min(part_len, *data_len);
128 *data_len -= remaining;
129 part = part.split_at(remaining).1;
130
131 if *data_len == 0 {
132 *self = Self::EntryHeader {
133 buffer: [0; ARCHIVE_ENTRY_HEADER_LEN],
134 filled: 0,
135 }
136 }
137 }
138 }
139 }
140 }
141
142 pub fn final_check(&self) -> Result<(), ArchiveReaderError> {
144 if matches!(self, Self::EntryHeader { filled: 0, .. }) {
145 Ok(())
146 } else {
147 Err(ArchiveReaderError::UnexpectedArchiveEof)
148 }
149 }
150}
151
152fn read_archive_prefix(buf: &mut &[u8]) -> Result<(), ArchiveReaderError> {
153 match buf.split_first_chunk() {
154 Some((header, tail)) if header == &ARCHIVE_PREFIX => {
155 *buf = tail;
156 Ok(())
157 }
158 _ => Err(ArchiveReaderError::InvalidArchiveHeader),
159 }
160}
161
162#[derive(Debug, thiserror::Error)]
163pub enum ArchiveReaderError {
164 #[error("invalid archive header")]
165 InvalidArchiveHeader,
166 #[error("unexpected archive eof")]
167 UnexpectedArchiveEof,
168 #[error("invalid archive entry header")]
169 InvalidArchiveEntryHeader,
170 #[error("invalid archive entry name")]
171 InvalidArchiveEntryName,
172 #[error("unexpected entry eof")]
173 UnexpectedEntryEof,
174 #[error("too small initial batch")]
175 TooSmallInitialBatch,
176 #[error(transparent)]
177 Other(#[from] anyhow::Error),
178}