abyssiniandb/filedb/inner/
htx.rs1use super::super::{FileBufSizeParam, FileDbParams, HashBucketsParam};
2use super::piece::PieceMgr;
3use super::semtype::*;
4use super::vfile::VarFile;
5use rabuf::{SmallRead, SmallWrite};
6use std::cell::RefCell;
7use std::convert::TryInto;
8use std::fs::OpenOptions;
9use std::io::{Read, Result, Write};
10use std::path::Path;
11use std::rc::Rc;
12
/// Raw 8-byte signature stored in the file header.
type HeaderSignature = [u8; 8];

/// Buffer chunk size in bytes (32 * 4 KiB = 128 KiB) passed to the `VarFile` constructors.
const CHUNK_SIZE: u32 = 32 * 4 * 1024;
/// Size in bytes of the fixed header; bucket slot `idx` lives at `HTX_HEADER_SZ + 8 * idx`.
const HTX_HEADER_SZ: u64 = 128;
/// First header signature, written at offset 0 and verified when an existing file is opened.
const HTX_HEADER_SIGNATURE: HeaderSignature = [b'a', b'b', b'y', b's', b'd', b'b', b'H', 0u8];
/// Number of hash buckets used when `HashBucketsParam::Default` is requested.
const DEFAULT_HT_SIZE: u64 = 16 * 1024 * 1024;
19
/// State shared behind an `HtxFile` handle: the underlying file plus the cached bucket count.
#[derive(Debug)]
pub struct VarFileHtxCache {
    /// Underlying file holding the header and the bucket table.
    pub file: VarFile,
    /// Number of hash buckets; set by `HtxFile::open_with_params` and used as the modulus
    /// when a hash value is mapped to a bucket index.
    buckets_size: u64,
    /// Counter incremented by `HtxFile::set_hits`.
    #[cfg(feature = "htx_print_hits")]
    hits: u64,
    /// Counter incremented by `HtxFile::set_miss`.
    #[cfg(feature = "htx_print_hits")]
    miss: u64,
}
29
30impl VarFileHtxCache {
31 fn new(file: VarFile) -> Self {
32 Self {
33 file,
34 buckets_size: 0,
35 #[cfg(feature = "htx_print_hits")]
36 hits: 0,
37 #[cfg(feature = "htx_print_hits")]
38 miss: 0,
39 }
40 }
41}
42
/// Clonable handle to the hash-table index file; clones share one `VarFileHtxCache`
/// through `Rc<RefCell<_>>`.
#[derive(Debug, Clone)]
pub struct HtxFile(pub Rc<RefCell<VarFileHtxCache>>);
45
/// Empty table handed to `PieceMgr::new` as its first argument when the file is opened.
const HTX_SIZE_FREE_OFFSET: [u64; 0] = [];
/// Empty table handed to `PieceMgr::new` as its second argument when the file is opened.
const HTX_SIZE_ARY: [u32; 0] = [];
48
/// Converts a requested item capacity into a hash-bucket count.
///
/// Capacities below 8 yield 8 buckets. Any other capacity is enlarged by one eighth
/// of itself and then rounded up to the next power of two.
///
/// # Panics
///
/// Panics if `cap` is zero.
fn capacity_to_buckets_size(cap: u64) -> u64 {
    match cap {
        0 => panic!("capacity should NOT be zero."),
        1..=7 => 8,
        // Add 12.5 % headroom before rounding up to a power of two.
        _ => (cap + cap / 8).next_power_of_two(),
    }
}
69
70impl HtxFile {
71 pub fn open_with_params<P: AsRef<Path>>(
72 path: P,
73 ks_name: &str,
74 sig2: HeaderSignature,
75 params: &FileDbParams,
76 ) -> Result<Self> {
77 let piece_mgr = PieceMgr::new(&HTX_SIZE_FREE_OFFSET, &HTX_SIZE_ARY);
78 let mut pb = path.as_ref().to_path_buf();
79 pb.push(format!("{ks_name}.htx"));
80 let std_file = OpenOptions::new()
81 .read(true)
82 .write(true)
83 .create(true)
84 .truncate(false)
85 .open(pb)?;
86 let mut file = match params.htx_buf_size {
87 FileBufSizeParam::Size(val) => {
88 let idx_buf_chunk_size = CHUNK_SIZE;
89 let idx_buf_num_chunks = val / idx_buf_chunk_size;
90 VarFile::with_capacity(
91 piece_mgr,
92 "htx",
93 std_file,
94 idx_buf_chunk_size,
95 idx_buf_num_chunks.try_into().unwrap(),
96 )?
97 }
98 FileBufSizeParam::PerMille(val) => {
99 VarFile::with_per_mille(piece_mgr, "htx", std_file, CHUNK_SIZE, val)?
100 }
101 FileBufSizeParam::Auto => VarFile::new(piece_mgr, "htx", std_file)?,
102 };
103 let file_length: NodePieceOffset = file.seek_to_end()?;
104 let mut file_nc = VarFileHtxCache::new(file);
106 if file_length.is_zero() {
108 let buckets_size = match params.buckets_size {
110 HashBucketsParam::BucketsSize(x) => x.next_power_of_two(),
111 HashBucketsParam::Capacity(x) => capacity_to_buckets_size(x),
112 HashBucketsParam::Default => DEFAULT_HT_SIZE,
113 };
114 write_htxf_init_header(&mut file_nc.file, sig2, buckets_size)?;
116 #[cfg(feature = "htx_bitmap")]
117 let off = NodePieceOffset::new(HTX_HEADER_SZ + buckets_size * 8 + buckets_size / 8);
118 #[cfg(not(feature = "htx_bitmap"))]
119 let off = NodePieceOffset::new(HTX_HEADER_SZ + buckets_size * 8);
120 file_nc.file.set_file_length(off)?;
122 let off = NodePieceOffset::new(off.as_value() - 8);
123 file_nc.file.seek_from_start(off)?;
124 file_nc.file.write_u64_le(0)?;
125 file_nc.buckets_size = buckets_size;
127 } else {
128 check_htxf_header(&mut file_nc.file, sig2)?;
129 file_nc.buckets_size = file_nc.file.read_hash_buckets_size()?;
130 }
131 Ok(Self(Rc::new(RefCell::new(file_nc))))
132 }
133 #[inline]
134 pub fn read_fill_buffer(&self) -> Result<()> {
135 let mut locked = RefCell::borrow_mut(&self.0);
136 locked.file.read_fill_buffer()
137 }
138 #[inline]
139 pub fn flush(&self) -> Result<()> {
140 let mut locked = RefCell::borrow_mut(&self.0);
141 locked.file.flush()
142 }
143 #[inline]
144 pub fn sync_all(&self) -> Result<()> {
145 let mut locked = RefCell::borrow_mut(&self.0);
146 locked.file.sync_all()
147 }
148 #[inline]
149 pub fn sync_data(&self) -> Result<()> {
150 let mut locked = RefCell::borrow_mut(&self.0);
151 locked.file.sync_data()
152 }
153 #[cfg(feature = "rabuf_stats")]
154 #[inline]
155 pub fn buf_stats(&self) -> Vec<(String, i64)> {
156 let locked = RefCell::borrow(&self.0);
157 locked.file.buf_stats()
158 }
159 #[inline]
161 pub fn read_hash_buckets_size(&self) -> Result<u64> {
162 let mut locked = RefCell::borrow_mut(&self.0);
163 locked.file.read_hash_buckets_size()
164 }
165 #[inline]
166 pub fn read_key_piece_offset(&self, hash: HashValue) -> Result<KeyPieceOffset> {
167 let mut locked = RefCell::borrow_mut(&self.0);
168 let buckets_size = locked.buckets_size;
169 let idx = hash.as_value() % buckets_size;
170 locked.file.read_key_piece_offset(idx)
171 }
172 #[inline]
173 pub fn write_key_piece_offset(&self, hash: HashValue, offset: KeyPieceOffset) -> Result<()> {
174 let mut locked = RefCell::borrow_mut(&self.0);
175 let buckets_size = locked.buckets_size;
176 let idx = hash.as_value() % buckets_size;
177 locked
178 .file
179 .write_key_piece_offset(buckets_size, idx, offset)
180 }
181 #[cfg(feature = "htx_print_hits")]
182 #[inline]
183 pub fn set_hits(&mut self) {
184 let mut locked = RefCell::borrow_mut(&self.0);
185 locked.hits += 1;
186 }
187 #[cfg(feature = "htx_print_hits")]
188 #[inline]
189 pub fn set_miss(&mut self) {
190 let mut locked = RefCell::borrow_mut(&self.0);
191 locked.miss += 1;
192 }
193 #[inline]
194 pub fn read_item_count(&self) -> Result<u64> {
195 let mut locked = RefCell::borrow_mut(&self.0);
196 locked.file.read_item_count()
197 }
198 #[inline]
199 pub fn write_item_count_up(&mut self) -> Result<()> {
200 let mut locked = RefCell::borrow_mut(&self.0);
201 let val = locked.file.read_item_count()?;
202 locked.file.write_item_count(val + 1)
203 }
204 #[inline]
205 pub fn write_item_count_down(&mut self) -> Result<()> {
206 let mut locked = RefCell::borrow_mut(&self.0);
207 let val = locked.file.read_item_count()?;
208 if val > 0 {
209 locked.file.write_item_count(val - 1)
210 } else {
211 Ok(())
212 }
213 }
214}
215
#[cfg(feature = "htx_print_hits")]
impl Drop for HtxFile {
    /// Prints the accumulated hit statistics to stderr when this handle is dropped.
    fn drop(&mut self) {
        let (hits, miss) = {
            // A shared borrow is enough to read the counters.
            let inner = RefCell::borrow(&self.0);
            (inner.hits, inner.miss)
        };
        let total = hits + miss;
        // With no recorded lookups `0 / 0` would print `NaN%`; report 0 % instead.
        let ratio = if total == 0 {
            0.0
        } else {
            hits as f64 / total as f64
        };
        eprintln!("htx hits: {}/{} [{:.2}%]", hits, total, 100.0 * ratio);
    }
}
228
229impl HtxFile {
231 pub fn _ht_size_and_count(&self) -> Result<(u64, u64)> {
232 let mut locked = RefCell::borrow_mut(&self.0);
233 let ht_size = locked.file.read_hash_buckets_size()?;
234 let count = locked.file.read_item_count()?;
235 Ok((ht_size, count))
236 }
237 pub fn htx_filling_rate_per_mill(&self) -> Result<(u64, u32)> {
238 let mut locked = RefCell::borrow_mut(&self.0);
239 let buckets_size = locked.buckets_size;
240 let mut count = 0;
241 for idx in 0..buckets_size {
242 let offset = locked.file.read_key_piece_offset(idx)?;
243 if !offset.is_zero() {
244 count += 1;
245 }
246 }
247 Ok((count, (count * 1000 / buckets_size) as u32))
248 }
249}
250
/// Byte offset of the hash-bucket count in the header (it follows the two 8-byte signatures).
const HTX_HT_SIZE_OFFSET: u64 = 16;
/// Byte offset of the stored item count in the header.
const HTX_ITEM_COUNT_OFFSET: u64 = 24;
276
277fn write_htxf_init_header(
278 file: &mut VarFile,
279 signature2: HeaderSignature,
280 buckets_size: u64,
281) -> Result<()> {
282 file.seek_from_start(NodePieceOffset::new(0))?;
283 file.write_all(&HTX_HEADER_SIGNATURE)?;
285 file.write_all(&signature2)?;
287 file.write_u64_le(buckets_size)?;
289 file.write_all(&[0u8; 104])?;
291 Ok(())
293}
294
295fn check_htxf_header(file: &mut VarFile, signature2: HeaderSignature) -> Result<()> {
296 file.seek_from_start(NodePieceOffset::new(0))?;
297 let mut sig1 = [0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8];
299 file.read_exact(&mut sig1)?;
300 assert!(sig1 == HTX_HEADER_SIGNATURE, "invalid header signature1");
301 let mut sig2 = [0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8];
303 file.read_exact(&mut sig2)?;
304 assert!(
305 sig2 == signature2,
306 "invalid header signature2, type signature: {sig2:?}"
307 );
308 let _top_node_offset = file.read_u64_le()?;
310 assert!(_top_node_offset != 0, "invalid root offset");
311 Ok(())
313}
314
315impl VarFile {
316 fn read_hash_buckets_size(&mut self) -> Result<u64> {
317 self.seek_from_start(NodePieceOffset::new(HTX_HT_SIZE_OFFSET))?;
318 self.read_u64_le()
319 }
320 fn _read_hash_buckets_size(&mut self, val: u64) -> Result<()> {
321 self.seek_from_start(NodePieceOffset::new(HTX_HT_SIZE_OFFSET))?;
322 self.write_u64_le(val)
323 }
324 fn read_item_count(&mut self) -> Result<u64> {
325 self.seek_from_start(NodePieceOffset::new(HTX_ITEM_COUNT_OFFSET))?;
326 self.read_u64_le()
327 }
328 fn write_item_count(&mut self, val: u64) -> Result<()> {
329 self.seek_from_start(NodePieceOffset::new(HTX_ITEM_COUNT_OFFSET))?;
330 self.write_u64_le(val)
331 }
332 pub fn read_key_piece_offset(&mut self, idx: u64) -> Result<KeyPieceOffset> {
333 self.seek_from_start(NodePieceOffset::new(HTX_HEADER_SZ + 8 * idx))?;
334 self.read_u64_le().map(KeyPieceOffset::new)
335 }
336 fn write_key_piece_offset(
337 &mut self,
338 bucket_size: u64,
339 idx: u64,
340 offset: KeyPieceOffset,
341 ) -> Result<()> {
342 #[cfg(feature = "htx_bitmap")]
354 {
355 let bitmap_idx = idx / 8;
356 let bitmap_bit_idx = idx % 8;
357 let bimap_start = HTX_HEADER_SZ + bucket_size * 8;
359 self.seek_from_start(NodePieceOffset::new(bimap_start + bitmap_idx))?;
360 let mut byte = self.read_u8()?;
361 if offset.is_zero() {
362 byte &= !(1 << bitmap_bit_idx);
363 } else {
364 byte |= 1 << bitmap_bit_idx;
365 }
366 self.seek_from_start(NodePieceOffset::new(bimap_start + bitmap_idx))?;
368 self.write_u8(byte)?;
369 }
370 self.seek_from_start(NodePieceOffset::new(HTX_HEADER_SZ + 8 * idx))?;
372 self.write_u64_le(offset.into())?;
373 Ok(())
375 }
376 pub fn next_key_piece_offset(
377 &mut self,
378 buckets_size: u64,
379 idx: u64,
380 ) -> Result<(u64, KeyPieceOffset)> {
381 #[cfg(feature = "htx_bitmap")]
383 let idx = {
384 let bitmap_idx = idx / 8;
385 let bitmap_bit_idx = idx % 8;
386 if bitmap_bit_idx == 0 {
388 let bimap_start = HTX_HEADER_SZ + buckets_size * 8;
389 self.seek_from_start(NodePieceOffset::new(bimap_start + bitmap_idx))?;
390 let mut idx = idx;
391 let mut byte_8 = 0;
393 while byte_8 == 0 && idx < buckets_size - 8 {
394 byte_8 = self.read_u64_le()?;
395 idx += 8 * 8;
396 }
397 if idx >= 8 * 8 {
398 self.seek_back_size(NodePieceSize::new(std::mem::size_of_val(&byte_8) as u32))?;
399 idx -= 8 * 8;
400 }
401 let mut byte = 0;
403 while byte == 0 && idx < buckets_size {
404 byte = self.read_u8()?;
405 idx += 8;
406 }
407 idx - 8
408 } else {
409 idx
410 }
411 };
412 self.seek_from_start(NodePieceOffset::new(HTX_HEADER_SZ + 8 * idx))?;
414 let mut idx = idx;
415 let mut off = 0;
416 while off == 0 && idx < buckets_size {
417 off = self.read_u64_le()?;
418 idx += 1;
419 }
420 Ok((idx, KeyPieceOffset::new(off)))
421 }
422}
423
424