1pub mod block;
6pub mod block_index;
7pub mod file_offsets;
8mod forward_reader;
9pub mod id;
10pub mod inner;
11pub mod meta;
12pub mod multi_writer;
13pub mod range;
14pub mod reader;
15pub mod scanner;
16pub mod trailer;
17pub mod value_block;
18pub mod value_block_consumer;
19pub mod writer;
20
21use crate::{
22 bloom::{BloomFilter, CompositeHash},
23 cache::Cache,
24 descriptor_table::FileDescriptorTable,
25 time::unix_timestamp,
26 tree::inner::TreeId,
27 value::{InternalValue, SeqNo, UserKey},
28};
29use block_index::BlockIndexImpl;
30use forward_reader::ForwardReader;
31use id::GlobalSegmentId;
32use inner::Inner;
33use meta::SegmentId;
34use range::Range;
35use scanner::Scanner;
36use std::{
37 ops::Bound,
38 path::Path,
39 sync::{atomic::AtomicBool, Arc},
40};
41
/// Convenience alias so callers can name the segment's inner state
/// without referring to the `inner` module directly.
#[allow(clippy::module_name_repetitions)]
pub type SegmentInner = Inner;
44
/// A disk segment (a.k.a. sorted string table, SSTable).
///
/// Thin newtype around `Arc<Inner>`, so cloning a `Segment` is cheap
/// (reference count bump only).
#[doc(alias("sstable", "sst", "sorted string table"))]
#[derive(Clone)]
pub struct Segment(Arc<Inner>);
56
57impl From<Inner> for Segment {
58 fn from(value: Inner) -> Self {
59 Self(Arc::new(value))
60 }
61}
62
impl std::ops::Deref for Segment {
    type Target = Inner;

    // Lets `Segment` transparently expose `Inner`'s fields and methods
    // (e.g. `self.metadata`, `self.block_index`) — smart-pointer-style use
    // of `Deref`, which is appropriate for an `Arc` newtype.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
70
impl std::fmt::Debug for Segment {
    // Compact one-line representation for logs: `Segment:<id>(<key range>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Segment:{}({})", self.id(), self.metadata.key_range)
    }
}
76
77impl Segment {
    /// Ratio of stored items to distinct keys, i.e. the average number
    /// of versions per key; 1.0 means every key has exactly one version.
    ///
    /// NOTE(review): yields NaN (0/0) or infinity when `key_count` is 0 —
    /// presumably segments are never empty; confirm.
    #[must_use]
    pub fn version_factor(&self) -> f32 {
        self.metadata.item_count as f32 / self.metadata.key_count as f32
    }
85
    /// Returns the approximate age of the segment in nanoseconds.
    ///
    /// `created_at` is scaled by 1_000 before comparing against a
    /// nanosecond timestamp, so it is presumably stored in microseconds —
    /// TODO confirm against the segment writer.
    #[must_use]
    pub fn age(&self) -> u128 {
        let now = unix_timestamp().as_nanos();
        let created_at = self.metadata.created_at * 1_000;
        // Saturate instead of panicking on underflow in case of clock skew
        // (creation timestamp appearing to be in the future).
        now.saturating_sub(created_at)
    }
93
    /// Returns the segment ID qualified with the owning tree's ID,
    /// uniquely identifying this segment across all trees.
    #[must_use]
    pub fn global_id(&self) -> GlobalSegmentId {
        (self.tree_id, self.id()).into()
    }
99
    /// Returns the segment's ID, which is unique within its tree.
    #[must_use]
    pub fn id(&self) -> SegmentId {
        self.metadata.id
    }
108
    /// Verifies the integrity of every data block in the segment.
    ///
    /// Walks the block index (full or two-level), loads each referenced
    /// data block from the segment file, re-serializes it and compares
    /// the resulting checksum against the one stored in the block header.
    ///
    /// Returns the number of broken blocks found (0 = segment looks intact).
    /// A mismatch between the number of visited blocks and the block count
    /// recorded in the metadata also counts as one error.
    ///
    /// # Errors
    ///
    /// Returns an error if the descriptor table or block re-serialization
    /// fails; per-block load failures are logged and counted instead.
    pub(crate) fn verify(&self) -> crate::Result<usize> {
        use block::checksum::Checksum;
        use block_index::IndexBlock;
        use value_block::ValueBlock;

        let mut data_block_count = 0;
        let mut broken_count = 0;

        // Acquire the segment's file handle from the shared descriptor table.
        let guard = self
            .descriptor_table
            .access(&self.global_id())?
            .expect("should have gotten file");

        let mut file = guard.file.lock().expect("lock is poisoned");

        match &*self.block_index {
            // Full index: all data block handles are directly iterable.
            BlockIndexImpl::Full(block_index) => {
                for handle in block_index.iter() {
                    let value_block = match ValueBlock::from_file(&mut *file, handle.offset) {
                        Ok(v) => v,
                        Err(e) => {
                            log::error!(
                                "data block {handle:?} could not be loaded, it is probably corrupted: {e:?}"
                            );
                            broken_count += 1;
                            data_block_count += 1;
                            continue;
                        }
                    };

                    // Re-serialize the block's contents and compare the
                    // checksum with the one stored in the header.
                    let (_, data) = ValueBlock::to_bytes_compressed(
                        &value_block.items,
                        value_block.header.previous_block_offset,
                        value_block.header.compression,
                    )?;
                    let actual_checksum = Checksum::from_bytes(&data);

                    if value_block.header.checksum != actual_checksum {
                        log::error!("{handle:?} is corrupted, invalid checksum value");
                        broken_count += 1;
                    }

                    data_block_count += 1;

                    // Periodic progress logging for large segments.
                    if data_block_count % 1_000 == 0 {
                        log::debug!("Checked {data_block_count} data blocks");
                    }
                }
            }
            // Two-level index: walk the top-level index blocks, then the
            // data block handles contained in each index block.
            BlockIndexImpl::TwoLevel(block_index) => {
                #[allow(clippy::explicit_iter_loop)]
                for handle in block_index.top_level_index.iter() {
                    let block = match IndexBlock::from_file(&mut *file, handle.offset) {
                        Ok(v) => v,
                        Err(e) => {
                            log::error!(
                                "index block {handle:?} could not be loaded, it is probably corrupted: {e:?}"
                            );
                            broken_count += 1;
                            continue;
                        }
                    };

                    for handle in &*block.items {
                        let value_block = match ValueBlock::from_file(&mut *file, handle.offset) {
                            Ok(v) => v,
                            Err(e) => {
                                log::error!(
                                    "data block {handle:?} could not be loaded, it is probably corrupted: {e:?}"
                                );
                                broken_count += 1;
                                data_block_count += 1;
                                continue;
                            }
                        };

                        // Same checksum verification as the full-index path.
                        let (_, data) = ValueBlock::to_bytes_compressed(
                            &value_block.items,
                            value_block.header.previous_block_offset,
                            value_block.header.compression,
                        )?;
                        let actual_checksum = Checksum::from_bytes(&data);

                        if value_block.header.checksum != actual_checksum {
                            log::error!("{handle:?} is corrupted, invalid checksum value");
                            broken_count += 1;
                        }

                        data_block_count += 1;

                        if data_block_count % 1_000 == 0 {
                            log::debug!("Checked {data_block_count} data blocks");
                        }
                    }
                }
            }
        }

        // Sanity check: the index walk must have touched exactly as many
        // data blocks as the metadata claims exist.
        if data_block_count != self.metadata.data_block_count {
            log::error!(
                "Not all data blocks were visited during verification of disk segment {:?}",
                self.id(),
            );
            broken_count += 1;
        }

        Ok(broken_count)
    }
219
220 pub(crate) fn load_bloom(
221 path: &Path,
222 ptr: block::offset::BlockOffset,
223 ) -> crate::Result<Option<BloomFilter>> {
224 Ok(if *ptr > 0 {
225 use crate::coding::Decode;
226 use std::{
227 fs::File,
228 io::{Seek, SeekFrom},
229 };
230
231 let mut reader = File::open(path)?;
232 reader.seek(SeekFrom::Start(*ptr))?;
233 Some(BloomFilter::decode_from(&mut reader)?)
234 } else {
235 None
236 })
237 }
238
    /// Recovers a segment from an existing segment file on disk.
    ///
    /// Reads the file trailer to obtain metadata and section offsets,
    /// rebuilds the block index (full or two-level, depending on
    /// `use_full_block_index`) and loads the bloom filter if one was
    /// written.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be read or its trailer/index
    /// sections are malformed.
    ///
    /// # Panics
    ///
    /// Panics if the trailer contains a non-zero range tombstone pointer,
    /// which this version does not support.
    pub(crate) fn recover(
        file_path: &Path,
        tree_id: TreeId,
        cache: Arc<Cache>,
        descriptor_table: Arc<FileDescriptorTable>,
        use_full_block_index: bool,
    ) -> crate::Result<Self> {
        use block_index::{full_index::FullBlockIndex, two_level_index::TwoLevelBlockIndex};
        use trailer::SegmentFileTrailer;

        log::debug!("Recovering segment from file {file_path:?}");
        let trailer = SegmentFileTrailer::from_file(file_path)?;

        // Range tombstones are not supported; refuse to load a file that
        // claims to contain them rather than silently misreading it.
        assert_eq!(
            0, *trailer.offsets.range_tombstones_ptr,
            "Range tombstones not supported"
        );

        log::debug!(
            "Creating block index, with tli_ptr={}",
            trailer.offsets.tli_ptr
        );

        // The full index keeps every block handle in memory; the two-level
        // index only keeps the top level and loads the rest through the
        // descriptor table + cache.
        let block_index = if use_full_block_index {
            let block_index =
                FullBlockIndex::from_file(file_path, &trailer.metadata, &trailer.offsets)?;

            BlockIndexImpl::Full(block_index)
        } else {
            let block_index = TwoLevelBlockIndex::from_file(
                file_path,
                &trailer.metadata,
                trailer.offsets.tli_ptr,
                (tree_id, trailer.metadata.id).into(),
                descriptor_table.clone(),
                cache.clone(),
            )?;
            BlockIndexImpl::TwoLevel(block_index)
        };

        let bloom_ptr = trailer.offsets.bloom_ptr;

        Ok(Self(Arc::new(Inner {
            path: file_path.into(),

            tree_id,

            descriptor_table,
            metadata: trailer.metadata,
            offsets: trailer.offsets,

            block_index: Arc::new(block_index),
            cache,

            // A zero bloom pointer yields `None` (no filter on disk).
            bloom_filter: Self::load_bloom(file_path, bloom_ptr)?,

            is_deleted: AtomicBool::default(),
        })))
    }
299
300 pub(crate) fn mark_as_deleted(&self) {
301 self.0
302 .is_deleted
303 .store(true, std::sync::atomic::Ordering::Release);
304 }
305
306 #[must_use]
307 pub fn bloom_filter_size(&self) -> usize {
309 self.bloom_filter
310 .as_ref()
311 .map(super::bloom::BloomFilter::len)
312 .unwrap_or_default()
313 }
314
    /// Point-reads a key from the segment.
    ///
    /// `seqno` is an optional snapshot sequence number: only versions with
    /// a seqno strictly below it are visible. `hash` is the key's
    /// pre-computed composite hash for the bloom filter check.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying point read fails (I/O etc.).
    pub fn get(
        &self,
        key: &[u8],
        seqno: Option<SeqNo>,
        hash: CompositeHash,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(seqno) = seqno {
            // If the segment's lowest stored seqno is not below the
            // snapshot, nothing in it can be visible to this read.
            if self.metadata.seqnos.0 >= seqno {
                return Ok(None);
            }
        }

        // The bloom filter can definitively rule out the key without
        // touching any data blocks.
        if let Some(bf) = &self.bloom_filter {
            if !bf.contains_hash(hash) {
                return Ok(None);
            }
        }

        self.point_read(key, seqno)
    }
335
    /// Performs the actual point read after the bloom/seqno pre-checks.
    ///
    /// Locates the lowest data block that may contain `key`. Without a
    /// snapshot seqno the newest version in that block is returned
    /// directly; with one, a forward reader scans from that block onward
    /// (versions of a key may spill into subsequent blocks) for the first
    /// version whose seqno is strictly below the snapshot.
    fn point_read(&self, key: &[u8], seqno: Option<SeqNo>) -> crate::Result<Option<InternalValue>> {
        use block_index::BlockIndex;
        use value_block::{CachePolicy, ValueBlock};
        use value_block_consumer::ValueBlockConsumer;

        let Some(first_block_handle) = self
            .block_index
            .get_lowest_block_containing_key(key, CachePolicy::Write)?
        else {
            // Key is outside all indexed blocks.
            return Ok(None);
        };

        let Some(block) = ValueBlock::load_by_block_handle(
            &self.descriptor_table,
            &self.cache,
            self.global_id(),
            first_block_handle,
            CachePolicy::Write,
        )?
        else {
            return Ok(None);
        };

        if seqno.is_none() {
            // No snapshot: the latest version in this block is the answer.
            return Ok(block.get_latest(key).cloned());
        }

        // Snapshot read: set up a forward reader whose "low" side is
        // pre-seeded with the already-loaded block, positioned at `key`.
        let mut reader = ForwardReader::new(
            self.offsets.index_block_ptr,
            &self.descriptor_table,
            self.global_id(),
            &self.cache,
            first_block_handle,
        );
        reader.lo_block_size = block.header.data_length.into();
        reader.lo_block_items = Some(ValueBlockConsumer::with_bounds(block, Some(key), None));
        reader.lo_initialized = true;

        // Skip versions that are too new for the snapshot; errors are kept
        // so they can be surfaced by `transpose` below.
        let mut reader = reader.filter(|x| {
            match x {
                Ok(entry) => {
                    if let Some(seqno) = seqno {
                        entry.key.seqno < seqno
                    } else {
                        true
                    }
                }
                Err(_) => true,
            }
        });

        let Some(entry) = reader.next().transpose()? else {
            return Ok(None);
        };

        // The reader may have walked past `key` entirely without finding a
        // visible version of it.
        if &*entry.key.user_key > key {
            return Ok(None);
        }

        Ok(Some(entry))
    }
421
    /// Returns `true` if `key` falls within the segment's key range.
    ///
    /// This does not mean the key is actually stored — only that it is
    /// between the segment's first and last key.
    #[must_use]
    pub fn is_key_in_key_range(&self, key: &[u8]) -> bool {
        self.metadata.key_range.contains_key(key)
    }
426
427 #[must_use]
433 #[allow(clippy::iter_without_into_iter)]
434 #[doc(hidden)]
435 pub fn iter(&self) -> Range {
436 self.range((std::ops::Bound::Unbounded, std::ops::Bound::Unbounded))
437 }
438
439 #[doc(hidden)]
440 pub fn scan<P: AsRef<Path>>(&self, base_folder: P) -> crate::Result<Scanner> {
441 let segment_file_path = base_folder.as_ref().join(self.metadata.id.to_string());
442 let block_count = self.metadata.data_block_count.try_into().expect("oops");
443 Scanner::new(&segment_file_path, block_count)
444 }
445
    /// Creates a ranged iterator over the segment, bounded by `range`.
    #[must_use]
    pub(crate) fn range(&self, range: (Bound<UserKey>, Bound<UserKey>)) -> Range {
        Range::new(
            self.offsets.index_block_ptr,
            self.descriptor_table.clone(),
            self.global_id(),
            self.cache.clone(),
            self.block_index.clone(),
            range,
        )
    }
462
    /// Returns the highest sequence number stored in the segment.
    #[must_use]
    pub fn get_highest_seqno(&self) -> SeqNo {
        self.metadata.seqnos.1
    }
468
    /// Returns the number of tombstone markers in the segment.
    #[must_use]
    #[doc(hidden)]
    pub fn tombstone_count(&self) -> u64 {
        self.metadata.tombstone_count
    }
475
    /// Returns the ratio of tombstone markers to distinct keys.
    ///
    /// NOTE(review): yields NaN (0/0) when `key_count` is 0 — presumably
    /// segments are never empty; confirm.
    #[must_use]
    #[doc(hidden)]
    pub fn tombstone_ratio(&self) -> f32 {
        self.metadata.tombstone_count as f32 / self.metadata.key_count as f32
    }
482
    /// Returns `true` if the segment's key range overlaps the given bounds.
    pub(crate) fn check_key_range_overlap(
        &self,
        bounds: &(Bound<UserKey>, Bound<UserKey>),
    ) -> bool {
        self.metadata.key_range.overlaps_with_bounds(bounds)
    }
490}