1use crate::types::{Frame, FileHeader, HandleMetadata};
2use anyhow::Result;
3use std::fs::{File, OpenOptions};
4use std::io::{Read, Seek, SeekFrom, Write};
5use std::path::{Path, PathBuf};
6
/// Appends frames to a single on-disk segment file.
///
/// The writer remembers the byte offset at which the next frame will start so
/// that every write can report where its frame was placed.
#[derive(Debug)]
pub struct SegmentWriter {
    /// Handle to the segment file that frames are written to.
    file: File,
    /// Location of the segment file on disk.
    // Nothing in the visible code reads this field after construction; it is
    // retained for diagnostics (it shows up in the `Debug` output).
    #[allow(dead_code)]
    path: PathBuf,
    /// Byte offset within the segment at which the next frame will begin.
    current_offset: u64,
    /// Numeric identifier of this segment (also part of the file name).
    seg_id: u32,
}
15
16impl SegmentWriter {
17 pub fn new(store_path: &Path, seg_id: u32) -> Result<Self> {
18 let path = store_path.join(format!("seg-{:06}.sift", seg_id));
19 let file = OpenOptions::new()
20 .create(true)
21 .append(true)
22 .open(&path)?;
23
24 let current_offset = file.metadata()?.len();
26
27 Ok(Self {
28 file,
29 path,
30 current_offset,
31 seg_id,
32 })
33 }
34
35 pub fn write_frame(&mut self, frame: &Frame) -> Result<HandleMetadata> {
36 let start_offset = self.current_offset;
37
38 let frame_content_len = frame.frame_size() - 4;
40
41 self.file.write_all(&(frame_content_len as u32).to_le_bytes())?;
43
44 let header_bytes = frame.header.to_bytes();
46 let header_crc = crc32fast::hash(&header_bytes);
47 self.file.write_all(&header_crc.to_le_bytes())?;
48
49 self.file.write_all(&header_bytes)?;
51
52 let content_crc = crc32fast::hash(&frame.content);
54 self.file.write_all(&content_crc.to_le_bytes())?;
55
56 self.file.write_all(&frame.content)?;
58
59 let line_tbl_offset = (self.current_offset + 4 + 4 + 64 + 4 + frame.content.len() as u64) as u32;
61 self.file.write_all(&frame.line_table)?;
62
63 let mut frame_data = Vec::new();
65 frame_data.extend_from_slice(&header_crc.to_le_bytes());
66 frame_data.extend_from_slice(&header_bytes);
67 frame_data.extend_from_slice(&content_crc.to_le_bytes());
68 frame_data.extend_from_slice(&frame.content);
69 frame_data.extend_from_slice(&frame.line_table);
70 let frame_crc = crc32fast::hash(&frame_data);
71 self.file.write_all(&frame_crc.to_le_bytes())?;
72
73 let total_written = frame.frame_size();
75 let padded_size = frame.padded_frame_size();
76 let padding_needed = padded_size - total_written;
77 if padding_needed > 0 {
78 let padding = vec![0u8; padding_needed];
79 self.file.write_all(&padding)?;
80 }
81
82 self.file.flush()?;
83
84 let metadata = HandleMetadata {
85 seg_id: self.seg_id,
86 offset: start_offset,
87 frame_len: padded_size as u32,
88 line_tbl_offset,
89 };
90
91 self.current_offset += padded_size as u64;
92
93 Ok(metadata)
94 }
95
96 pub fn current_offset(&self) -> u64 {
97 self.current_offset
98 }
99
100 pub fn seg_id(&self) -> u32 {
101 self.seg_id
102 }
103}
104
/// Reads frames back from a single on-disk segment file.
#[derive(Debug)]
pub struct SegmentReader {
    /// Handle to the segment file that frames are read from.
    file: File,
    /// Numeric identifier of the segment that was opened.
    // Nothing in the visible code reads this field after construction; it is
    // retained for diagnostics (it shows up in the `Debug` output).
    #[allow(dead_code)]
    seg_id: u32,
}
111
112impl SegmentReader {
113 pub fn new(store_path: &Path, seg_id: u32) -> Result<Self> {
114 let path = store_path.join(format!("seg-{:06}.sift", seg_id));
115 let file = File::open(path)?;
116
117 Ok(Self { file, seg_id })
118 }
119
120 pub fn read_frame(&mut self, metadata: &HandleMetadata) -> Result<Frame> {
121 self.file.seek(SeekFrom::Start(metadata.offset))?;
123
124 let mut buf = [0u8; 4];
126 self.file.read_exact(&mut buf)?;
127 let _frame_len = u32::from_le_bytes(buf);
128
129 self.file.read_exact(&mut buf)?;
131 let expected_header_crc = u32::from_le_bytes(buf);
132
133 let mut header_buf = [0u8; 64];
135 self.file.read_exact(&mut header_buf)?;
136 let actual_header_crc = crc32fast::hash(&header_buf);
137
138 if actual_header_crc != expected_header_crc {
139 anyhow::bail!("Header CRC mismatch");
140 }
141
142 let header = FileHeader::from_bytes(&header_buf)?;
143
144 self.file.read_exact(&mut buf)?;
146 let expected_content_crc = u32::from_le_bytes(buf);
147
148 let mut content = vec![0u8; header.content_len as usize];
150 self.file.read_exact(&mut content)?;
151 let actual_content_crc = crc32fast::hash(&content);
152
153 if actual_content_crc != expected_content_crc {
154 anyhow::bail!("Content CRC mismatch");
155 }
156
157 let mut line_table = vec![0u8; header.line_tbl_len as usize];
159 self.file.read_exact(&mut line_table)?;
160
161 self.file.read_exact(&mut buf)?;
163 let expected_frame_crc = u32::from_le_bytes(buf);
164
165 let mut frame_data = Vec::new();
166 frame_data.extend_from_slice(&expected_header_crc.to_le_bytes());
167 frame_data.extend_from_slice(&header_buf);
168 frame_data.extend_from_slice(&expected_content_crc.to_le_bytes());
169 frame_data.extend_from_slice(&content);
170 frame_data.extend_from_slice(&line_table);
171 let actual_frame_crc = crc32fast::hash(&frame_data);
172
173 if actual_frame_crc != expected_frame_crc {
174 anyhow::bail!("Frame CRC mismatch");
175 }
176
177 Ok(Frame {
178 header,
179 content,
180 line_table,
181 })
182 }
183}
184
185pub fn generate_line_table(content: &[u8]) -> Vec<u8> {
187 let mut positions = Vec::new();
188
189 for (i, &byte) in content.iter().enumerate() {
191 if byte == b'\n' {
192 positions.push(i as u32);
193 }
194 }
195
196 let mut line_table = Vec::new();
198 let mut last_pos = 0u32;
199
200 for pos in positions {
201 let delta = pos - last_pos;
202 write_varint(&mut line_table, delta as u64);
203 last_pos = pos;
204 }
205
206 line_table
207}
208
209pub fn decode_line_table(line_table: &[u8]) -> Result<Vec<u32>> {
211 let mut positions = Vec::new();
212 let mut cursor = 0;
213 let mut current_pos = 0u32;
214
215 while cursor < line_table.len() {
216 let (delta, bytes_read) = read_varint(&line_table[cursor..])?;
217 current_pos += delta as u32;
218 positions.push(current_pos);
219 cursor += bytes_read;
220 }
221
222 Ok(positions)
223}
224
/// Maps a byte offset to its 1-based line number.
///
/// `newline_positions` must hold the byte offsets of the newline characters
/// in ascending order. The result is one more than the number of newlines
/// that lie strictly before `byte_offset`; a newline byte itself therefore
/// belongs to the line it terminates.
///
/// Offsets larger than `u32::MAX` cannot be matched by any stored position,
/// so they resolve to the line after the last newline rather than being
/// truncated to 32 bits.
pub fn byte_to_line(byte_offset: usize, newline_positions: &[u32]) -> u32 {
    let newlines_before = if byte_offset as u64 > u64::from(u32::MAX) {
        // Every recorded newline is necessarily before such an offset.
        newline_positions.len()
    } else {
        // Exact hit and insertion point both equal the count of newlines
        // strictly before the offset.
        match newline_positions.binary_search(&(byte_offset as u32)) {
            Ok(index) | Err(index) => index,
        }
    };

    (newlines_before + 1) as u32
}
233
/// Appends `value` to `buf` as a little-endian base-128 varint.
///
/// Each output byte carries seven bits of the value, least-significant group
/// first; the high bit is set on every byte except the last.
fn write_varint(buf: &mut Vec<u8>, mut value: u64) {
    loop {
        let low_bits = (value & 0x7F) as u8;
        value >>= 7;

        if value == 0 {
            // Final group: high bit clear marks the end of the varint.
            buf.push(low_bits);
            return;
        }

        buf.push(low_bits | 0x80);
    }
}
242
243fn read_varint(data: &[u8]) -> Result<(u64, usize)> {
244 let mut result = 0u64;
245 let mut shift = 0;
246 let mut bytes_read = 0;
247
248 for &byte in data {
249 bytes_read += 1;
250 result |= ((byte & 0x7F) as u64) << shift;
251
252 if (byte & 0x80) == 0 {
253 return Ok((result, bytes_read));
254 }
255
256 shift += 7;
257 if shift >= 64 {
258 anyhow::bail!("Varint too large");
259 }
260 }
261
262 anyhow::bail!("Incomplete varint");
263}