pub mod block;
pub mod block_index;
pub mod file_offsets;
mod forward_reader;
pub mod id;
pub mod inner;
pub mod meta;
pub mod multi_writer;
pub mod range;
pub mod reader;
pub mod scanner;
pub mod trailer;
pub mod value_block;
pub mod value_block_consumer;
pub mod writer;

use crate::{
    block_cache::BlockCache,
    bloom::{BloomFilter, CompositeHash},
    descriptor_table::FileDescriptorTable,
    time::unix_timestamp,
    tree::inner::TreeId,
    value::{InternalValue, SeqNo, UserKey},
};
use block_index::BlockIndexImpl;
use forward_reader::ForwardReader;
use id::GlobalSegmentId;
use inner::Inner;
use meta::SegmentId;
use range::Range;
use scanner::Scanner;
use std::{ops::Bound, path::Path, sync::Arc};

#[allow(clippy::module_name_repetitions)]
pub type SegmentInner = Inner;

#[doc(alias("sstable", "sst", "sorted string table"))]
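/// An immutable disk segment (a.k.a. SSTable): a sorted, durable run of
/// key-value pairs created by flushing a memtable or by compaction.
///
/// Cloning is cheap: a [`Segment`] is just a shared handle to its inner data.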
#[derive(Clone)]
pub struct Segment(Arc<Inner>);

impl From<Inner> for Segment {
    fn from(value: Inner) -> Self {
        Self(Arc::new(value))
    }
}

impl std::ops::Deref for Segment {
    type Target = Inner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl std::fmt::Debug for Segment {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Segment:{}({})", self.id(), self.metadata.key_range)
    }
}

impl Segment {
    #[must_use]
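    /// Ratio of stored items to unique keys: a value well above 1.0 means the
    /// segment holds many versions and/or tombstones per key.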
    pub fn version_factor(&self) -> f32 {
        self.metadata.item_count as f32 / self.metadata.key_count as f32
    }

    #[must_use]
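    /// Returns the approximate segment age in nanoseconds (assuming
    /// `created_at` is stored in microseconds, hence the `* 1_000`).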
    pub fn age(&self) -> u128 {
        let now = unix_timestamp().as_nanos();
        let created_at = self.metadata.created_at * 1_000;
        now.saturating_sub(created_at)
    }

    #[must_use]
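    /// Returns the global ID (tree ID + segment ID) of this segment.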
    pub fn global_id(&self) -> GlobalSegmentId {
        (self.tree_id, self.id()).into()
    }

    #[must_use]
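    /// Returns the (tree-local) segment ID.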
    pub fn id(&self) -> SegmentId {
        self.metadata.id
    }

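    /// Re-reads every data block from disk, recomputes its checksum and
    /// compares it against the stored one.
    ///
    /// Returns the number of broken (unreadable or corrupted) blocks.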
    pub(crate) fn verify(&self) -> crate::Result<usize> {
        use block::checksum::Checksum;
        use block_index::IndexBlock;
        use value_block::ValueBlock;

        let mut data_block_count = 0;
        let mut broken_count = 0;

        let guard = self
            .descriptor_table
            .access(&self.global_id())?
            .expect("should have gotten file");

        let mut file = guard.file.lock().expect("lock is poisoned");

        match &*self.block_index {
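            // NOTE: A full block index keeps one handle per data block,
            // so the data blocks can be visited directly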
            BlockIndexImpl::Full(block_index) => {
                for handle in block_index.iter() {
                    let value_block = match ValueBlock::from_file(&mut *file, handle.offset) {
                        Ok(v) => v,
                        Err(e) => {
                            log::error!(
                                "data block {handle:?} could not be loaded, it is probably corrupted: {e:?}"
                            );
                            broken_count += 1;
                            data_block_count += 1;
                            continue;
                        }
                    };

                    let (_, data) = ValueBlock::to_bytes_compressed(
                        &value_block.items,
                        value_block.header.previous_block_offset,
                        value_block.header.compression,
                    )?;
                    let actual_checksum = Checksum::from_bytes(&data);

                    if value_block.header.checksum != actual_checksum {
                        log::error!("{handle:?} is corrupted, invalid checksum value");
                        broken_count += 1;
                    }

                    data_block_count += 1;

                    if data_block_count % 1_000 == 0 {
                        log::debug!("Checked {data_block_count} data blocks");
                    }
                }
            }
            BlockIndexImpl::TwoLevel(block_index) => {
                #[allow(clippy::explicit_iter_loop)]
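                // NOTE: With a two-level index, first load each index block
                // through the top-level index, then check the data blocks it
                // points to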
                for handle in block_index.top_level_index.iter() {
                    let block = match IndexBlock::from_file(&mut *file, handle.offset) {
                        Ok(v) => v,
                        Err(e) => {
                            log::error!(
                                "index block {handle:?} could not be loaded, it is probably corrupted: {e:?}"
                            );
                            broken_count += 1;
                            continue;
                        }
                    };

                    for handle in &*block.items {
                        let value_block = match ValueBlock::from_file(&mut *file, handle.offset) {
                            Ok(v) => v,
                            Err(e) => {
                                log::error!(
                                    "data block {handle:?} could not be loaded, it is probably corrupted: {e:?}"
                                );
                                broken_count += 1;
                                data_block_count += 1;
                                continue;
                            }
                        };

                        let (_, data) = ValueBlock::to_bytes_compressed(
                            &value_block.items,
                            value_block.header.previous_block_offset,
                            value_block.header.compression,
                        )?;
                        let actual_checksum = Checksum::from_bytes(&data);

                        if value_block.header.checksum != actual_checksum {
                            log::error!("{handle:?} is corrupted, invalid checksum value");
                            broken_count += 1;
                        }

                        data_block_count += 1;

                        if data_block_count % 1_000 == 0 {
                            log::debug!("Checked {data_block_count} data blocks");
                        }
                    }
                }
            }
        }

        if data_block_count != self.metadata.data_block_count {
            log::error!(
                "Not all data blocks were visited during verification of disk segment {:?}",
                self.id(),
            );
            broken_count += 1;
        }

        Ok(broken_count)
    }

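    /// Loads the bloom filter from the segment file at the given offset.
    ///
    /// A zero offset means the segment was written without a bloom filter.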
    pub(crate) fn load_bloom<P: AsRef<Path>>(
        path: P,
        ptr: value_block::BlockOffset,
    ) -> crate::Result<Option<BloomFilter>> {
        Ok(if *ptr > 0 {
            use crate::coding::Decode;
            use std::{
                fs::File,
                io::{Seek, SeekFrom},
            };

            let mut reader = File::open(path)?;
            reader.seek(SeekFrom::Start(*ptr))?;
            Some(BloomFilter::decode_from(&mut reader)?)
        } else {
            None
        })
    }

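    /// Tries to recover a segment from an existing segment file.
    ///
    /// A minimal usage sketch (not compiled; the path, tree ID and the two
    /// `Arc` handles are placeholders, not part of this function's contract):
    ///
    /// ```ignore
    /// let segment = Segment::recover(
    ///     "path/to/tree/segments/0", // hypothetical segment file
    ///     0,                         // tree ID the segment belongs to
    ///     block_cache.clone(),       // some Arc<BlockCache>
    ///     descriptor_table.clone(),  // some Arc<FileDescriptorTable>
    ///     false,                     // use a two-level block index
    /// )?;
    ///
    /// assert_eq!(0, segment.verify()?, "segment should not be corrupted");
    /// ```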
    pub(crate) fn recover<P: AsRef<Path>>(
        file_path: P,
        tree_id: TreeId,
        block_cache: Arc<BlockCache>,
        descriptor_table: Arc<FileDescriptorTable>,
        use_full_block_index: bool,
    ) -> crate::Result<Self> {
        use block_index::{full_index::FullBlockIndex, two_level_index::TwoLevelBlockIndex};
        use trailer::SegmentFileTrailer;

        let file_path = file_path.as_ref();

        log::debug!("Recovering segment from file {file_path:?}");
        let trailer = SegmentFileTrailer::from_file(file_path)?;

        assert_eq!(
            0, *trailer.offsets.range_tombstones_ptr,
            "Range tombstones not supported"
        );

        log::debug!(
            "Creating block index, with tli_ptr={}",
            trailer.offsets.tli_ptr
        );

        let block_index = if use_full_block_index {
            let block_index =
                FullBlockIndex::from_file(file_path, &trailer.metadata, &trailer.offsets)?;

            BlockIndexImpl::Full(block_index)
        } else {
            let block_index = TwoLevelBlockIndex::from_file(
                file_path,
                &trailer.metadata,
                trailer.offsets.tli_ptr,
                (tree_id, trailer.metadata.id).into(),
                descriptor_table.clone(),
                block_cache.clone(),
            )?;
            BlockIndexImpl::TwoLevel(block_index)
        };

        let bloom_ptr = trailer.offsets.bloom_ptr;

        Ok(Self(Arc::new(Inner {
            tree_id,

            descriptor_table,
            metadata: trailer.metadata,
            offsets: trailer.offsets,

            block_index: Arc::new(block_index),
            block_cache,

            bloom_filter: Self::load_bloom(file_path, bloom_ptr)?,
        })))
    }

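    /// Returns the length of the bloom filter, or 0 if the segment has none.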
    #[must_use]
    pub fn bloom_filter_size(&self) -> usize {
        self.bloom_filter
            .as_ref()
            .map(super::bloom::BloomFilter::len)
            .unwrap_or_default()
    }

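    /// Point-reads a key, cheaply ruling it out first via the segment's
    /// seqno watermark, its key range and (if present) its bloom filter,
    /// before falling back to an actual disk read.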
    pub fn get<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
        hash: CompositeHash,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(seqno) = seqno {
            // NOTE: If the segment's lowest seqno is not below the snapshot
            // seqno, none of its items can be visible to the snapshot
            if self.metadata.seqnos.0 >= seqno {
                return Ok(None);
            }
        }

        if !self.metadata.key_range.contains_key(&key) {
            return Ok(None);
        }

        if let Some(bf) = &self.bloom_filter {
            if !bf.contains_hash(hash) {
                return Ok(None);
            }
        }

        self.point_read(key, seqno)
    }

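    /// Performs the actual point read: locates the lowest data block that may
    /// contain the key, then (for snapshot reads) scans forward, skipping
    /// versions that are not visible to the given seqno.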
    fn point_read<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        use block_index::BlockIndex;
        use value_block::{CachePolicy, ValueBlock};
        use value_block_consumer::ValueBlockConsumer;

        let key = key.as_ref();

        let Some(first_block_handle) = self
            .block_index
            .get_lowest_block_containing_key(key, CachePolicy::Write)?
        else {
            return Ok(None);
        };

        let Some(block) = ValueBlock::load_by_block_handle(
            &self.descriptor_table,
            &self.block_cache,
            self.global_id(),
            first_block_handle,
            CachePolicy::Write,
        )?
        else {
            return Ok(None);
        };

        if seqno.is_none() {
            // NOTE: Fast path: without a snapshot seqno, the latest version
            // in the first candidate block is the result
            return Ok(block.get_latest(key).cloned());
        }

        let mut reader = ForwardReader::new(
            self.offsets.index_block_ptr,
            &self.descriptor_table,
            self.global_id(),
            &self.block_cache,
            first_block_handle,
        );
        reader.lo_block_size = block.header.data_length.into();
        reader.lo_block_items = Some(ValueBlockConsumer::with_bounds(block, Some(key), None));
        reader.lo_initialized = true;

        let mut reader = reader.filter(|x| {
            // NOTE: Only keep versions visible to the snapshot, i.e. those
            // with a seqno strictly below the snapshot seqno; errors are
            // passed through so `transpose` below can surface them
            match x {
                Ok(entry) => {
                    if let Some(seqno) = seqno {
                        entry.key.seqno < seqno
                    } else {
                        true
                    }
                }
                Err(_) => true,
            }
        });

        let Some(entry) = reader.next().transpose()? else {
            return Ok(None);
        };

        if &*entry.key.user_key > key {
            // NOTE: The scan walked past the requested key, so the key does
            // not exist in this segment
            return Ok(None);
        }

        Ok(Some(entry))
    }

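    /// Returns `true` if the given key falls within this segment's key range.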
    pub fn is_key_in_key_range<K: AsRef<[u8]>>(&self, key: K) -> bool {
        self.metadata.key_range.contains_key(key)
    }

    #[must_use]
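    /// Creates an iterator over the whole segment.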
    #[allow(clippy::iter_without_into_iter)]
    #[doc(hidden)]
    pub fn iter(&self) -> Range {
        self.range((std::ops::Bound::Unbounded, std::ops::Bound::Unbounded))
    }

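    /// Sequentially reads all data blocks straight from the segment file,
    /// without going through the block index or block cache.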
    #[doc(hidden)]
    pub fn scan<P: AsRef<Path>>(&self, base_folder: P) -> crate::Result<Scanner> {
        let segment_file_path = base_folder.as_ref().join(self.metadata.id.to_string());
        let block_count = self
            .metadata
            .data_block_count
            .try_into()
            .expect("data block count should fit into usize");
        Scanner::new(segment_file_path, block_count)
    }

    #[must_use]
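    /// Creates a ranged iterator over a bounded key range.
    ///
    /// A hedged sketch (not compiled) of a bounded scan; it assumes `UserKey`
    /// can be built from a byte slice:
    ///
    /// ```ignore
    /// use std::ops::Bound;
    ///
    /// let iter = segment.range((
    ///     Bound::Included(UserKey::from(&b"a"[..])),
    ///     Bound::Excluded(UserKey::from(&b"f"[..])),
    /// ));
    /// ```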
    pub(crate) fn range(&self, range: (Bound<UserKey>, Bound<UserKey>)) -> Range {
        Range::new(
            self.offsets.index_block_ptr,
            self.descriptor_table.clone(),
            self.global_id(),
            self.block_cache.clone(),
            self.block_index.clone(),
            range,
        )
    }

    #[must_use]
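    /// Returns the highest sequence number in the segment.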
    pub fn get_highest_seqno(&self) -> SeqNo {
        self.metadata.seqnos.1
    }

    #[must_use]
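    /// Returns the number of tombstone markers in the segment.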
    #[doc(hidden)]
    pub fn tombstone_count(&self) -> u64 {
        self.metadata.tombstone_count
    }

    #[must_use]
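    /// Returns the ratio of tombstone markers to unique keys.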
    #[doc(hidden)]
    pub fn tombstone_ratio(&self) -> f32 {
        self.metadata.tombstone_count as f32 / self.metadata.key_count as f32
    }

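    /// Returns `true` if the segment's key range overlaps the given bounds.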
    pub(crate) fn check_key_range_overlap(
        &self,
        bounds: &(Bound<UserKey>, Bound<UserKey>),
    ) -> bool {
        self.metadata.key_range.overlaps_with_bounds(bounds)
    }
}