abyssiniandb/filedb/inner/
htx.rs

1use super::super::{FileBufSizeParam, FileDbParams, HashBucketsParam};
2use super::piece::PieceMgr;
3use super::semtype::*;
4use super::vfile::VarFile;
5use rabuf::{SmallRead, SmallWrite};
6use std::cell::RefCell;
7use std::convert::TryInto;
8use std::fs::OpenOptions;
9use std::io::{Read, Result, Write};
10use std::path::Path;
11use std::rc::Rc;
12
/// 8-byte signature used both for the file magic and the type signature.
type HeaderSignature = [u8; 8];

/// Buffer chunk size for the htx file (128 KiB).
const CHUNK_SIZE: u32 = 32 * 4 * 1024;
/// Fixed size of the htx file header, in bytes.
const HTX_HEADER_SZ: u64 = 128;
/// signature1: identifies an abyssiniandb htx file (`b"abysdbH\0"`).
const HTX_HEADER_SIGNATURE: HeaderSignature = [b'a', b'b', b'y', b's', b'd', b'b', b'H', 0u8];
/// Default number of hash buckets when `HashBucketsParam::Default` is used.
const DEFAULT_HT_SIZE: u64 = 16 * 1024 * 1024;
19
/// The hash-table index file (`.htx`) together with its cached metadata.
///
/// `buckets_size` is cached here so bucket-index computations do not have
/// to re-read the file header on every key lookup.
#[derive(Debug)]
pub struct VarFileHtxCache {
    pub file: VarFile,
    // number of hash buckets; filled in from the header (or init params) on open.
    buckets_size: u64,
    // lookup hit/miss counters, compiled in only with `htx_print_hits`.
    #[cfg(feature = "htx_print_hits")]
    hits: u64,
    #[cfg(feature = "htx_print_hits")]
    miss: u64,
}
29
impl VarFileHtxCache {
    /// Wraps `file` with zeroed metadata; `buckets_size` is assigned later
    /// by `HtxFile::open_with_params` (from the header or the open params).
    fn new(file: VarFile) -> Self {
        Self {
            file,
            buckets_size: 0,
            #[cfg(feature = "htx_print_hits")]
            hits: 0,
            #[cfg(feature = "htx_print_hits")]
            miss: 0,
        }
    }
}
42
/// Shared handle to the hash-table index file.
///
/// Cloning is cheap: all clones share the same `VarFileHtxCache` through
/// `Rc<RefCell<_>>` (single-threaded interior mutability).
#[derive(Debug, Clone)]
pub struct HtxFile(pub Rc<RefCell<VarFileHtxCache>>);
45
// The htx file has no free-piece lists: buckets are fixed 8-byte slots,
// so the piece manager is created with empty tables.
const HTX_SIZE_FREE_OFFSET: [u64; 0] = [];
const HTX_SIZE_ARY: [u32; 0] = [];
48
/// Returns the number of buckets needed to hold `cap` items,
/// taking the maximum load factor (8/9 ≈ 88.8%) into account.
///
/// The result is always a power of two, with a minimum of 8.
///
/// # Panics
/// Panics if `cap` is zero, or if the load-factor-adjusted capacity
/// overflows `u64`.
fn capacity_to_buckets_size(cap: u64) -> u64 {
    if cap == 0 {
        panic!("capacity should NOT be zero.");
    }
    //
    // for small hash buckets. minimum buckets size is 8.
    if cap < 8 {
        return 8;
    }
    //
    // otherwise require 1/9 buckets to be empty (88.8% load).
    // NOTE: the old code computed `cap + cap / 8` unchecked, which could
    // overflow for very large `cap`; use checked arithmetic instead.
    let adjusted_cap = cap
        .checked_add(cap / 8)
        .expect("capacity is too large: adjusted buckets size overflows u64");
    //
    // Any rounding errors from the division above are cleaned up by
    // next_power_of_two. (For adjusted_cap > 2^63 this would still panic in
    // debug builds; such capacities are far beyond practical file sizes.)
    adjusted_cap.next_power_of_two()
}
69
impl HtxFile {
    /// Opens (or creates) the `<ks_name>.htx` hash-table file under `path`.
    ///
    /// A zero-length (new) file gets an initial header plus a zeroed bucket
    /// table — and, with the `htx_bitmap` feature, an occupancy bitmap —
    /// written out; an existing file has its header validated and its
    /// buckets size cached.
    pub fn open_with_params<P: AsRef<Path>>(
        path: P,
        ks_name: &str,
        sig2: HeaderSignature,
        params: &FileDbParams,
    ) -> Result<Self> {
        // no free-piece management is needed: buckets are fixed-size slots.
        let piece_mgr = PieceMgr::new(&HTX_SIZE_FREE_OFFSET, &HTX_SIZE_ARY);
        let mut pb = path.as_ref().to_path_buf();
        pb.push(format!("{ks_name}.htx"));
        let std_file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(pb)?;
        // choose the VarFile buffering strategy from the open params.
        let mut file = match params.htx_buf_size {
            FileBufSizeParam::Size(val) => {
                let idx_buf_chunk_size = CHUNK_SIZE;
                let idx_buf_num_chunks = val / idx_buf_chunk_size;
                VarFile::with_capacity(
                    piece_mgr,
                    "htx",
                    std_file,
                    idx_buf_chunk_size,
                    idx_buf_num_chunks.try_into().unwrap(),
                )?
            }
            FileBufSizeParam::PerMille(val) => {
                VarFile::with_per_mille(piece_mgr, "htx", std_file, CHUNK_SIZE, val)?
            }
            FileBufSizeParam::Auto => VarFile::new(piece_mgr, "htx", std_file)?,
        };
        let file_length: NodePieceOffset = file.seek_to_end()?;
        //
        let mut file_nc = VarFileHtxCache::new(file);
        //
        if file_length.is_zero() {
            // brand-new file: pick the buckets size (always a power of two).
            let buckets_size = match params.buckets_size {
                HashBucketsParam::BucketsSize(x) => x.next_power_of_two(),
                HashBucketsParam::Capacity(x) => capacity_to_buckets_size(x),
                HashBucketsParam::Default => DEFAULT_HT_SIZE,
            };
            //
            write_htxf_init_header(&mut file_nc.file, sig2, buckets_size)?;
            // total length: header + 8 bytes per bucket, plus 1 bit per
            // bucket for the occupancy bitmap when enabled.
            #[cfg(feature = "htx_bitmap")]
            let off = NodePieceOffset::new(HTX_HEADER_SZ + buckets_size * 8 + buckets_size / 8);
            #[cfg(not(feature = "htx_bitmap"))]
            let off = NodePieceOffset::new(HTX_HEADER_SZ + buckets_size * 8);
            //
            file_nc.file.set_file_length(off)?;
            // write a zero u64 at the tail — presumably to force the file
            // out to its full length on disk; TODO confirm against VarFile.
            let off = NodePieceOffset::new(off.as_value() - 8);
            file_nc.file.seek_from_start(off)?;
            file_nc.file.write_u64_le(0)?;
            //
            file_nc.buckets_size = buckets_size;
        } else {
            // existing file: validate signatures, then cache the buckets size.
            check_htxf_header(&mut file_nc.file, sig2)?;
            file_nc.buckets_size = file_nc.file.read_hash_buckets_size()?;
        }
        Ok(Self(Rc::new(RefCell::new(file_nc))))
    }
    /// Pre-reads the file into the underlying buffer cache.
    #[inline]
    pub fn read_fill_buffer(&self) -> Result<()> {
        let mut locked = RefCell::borrow_mut(&self.0);
        locked.file.read_fill_buffer()
    }
    /// Flushes buffered writes to the OS.
    #[inline]
    pub fn flush(&self) -> Result<()> {
        let mut locked = RefCell::borrow_mut(&self.0);
        locked.file.flush()
    }
    /// Syncs data and metadata to disk.
    #[inline]
    pub fn sync_all(&self) -> Result<()> {
        let mut locked = RefCell::borrow_mut(&self.0);
        locked.file.sync_all()
    }
    /// Syncs file data (not necessarily metadata) to disk.
    #[inline]
    pub fn sync_data(&self) -> Result<()> {
        let mut locked = RefCell::borrow_mut(&self.0);
        locked.file.sync_data()
    }
    /// Returns buffer cache statistics (only with `rabuf_stats`).
    #[cfg(feature = "rabuf_stats")]
    #[inline]
    pub fn buf_stats(&self) -> Vec<(String, i64)> {
        let locked = RefCell::borrow(&self.0);
        locked.file.buf_stats()
    }
    //
    /// Reads the hash-buckets size from the file header.
    #[inline]
    pub fn read_hash_buckets_size(&self) -> Result<u64> {
        let mut locked = RefCell::borrow_mut(&self.0);
        locked.file.read_hash_buckets_size()
    }
    /// Reads the key-piece offset stored in the bucket that `hash` maps to.
    #[inline]
    pub fn read_key_piece_offset(&self, hash: HashValue) -> Result<KeyPieceOffset> {
        let mut locked = RefCell::borrow_mut(&self.0);
        let buckets_size = locked.buckets_size;
        // buckets_size is a power of two, so `%` selects the low bits.
        let idx = hash.as_value() % buckets_size;
        locked.file.read_key_piece_offset(idx)
    }
    /// Stores `offset` into the bucket that `hash` maps to.
    #[inline]
    pub fn write_key_piece_offset(&self, hash: HashValue, offset: KeyPieceOffset) -> Result<()> {
        let mut locked = RefCell::borrow_mut(&self.0);
        let buckets_size = locked.buckets_size;
        let idx = hash.as_value() % buckets_size;
        locked
            .file
            .write_key_piece_offset(buckets_size, idx, offset)
    }
    /// Increments the lookup-hit counter (only with `htx_print_hits`).
    #[cfg(feature = "htx_print_hits")]
    #[inline]
    pub fn set_hits(&mut self) {
        let mut locked = RefCell::borrow_mut(&self.0);
        locked.hits += 1;
    }
    /// Increments the lookup-miss counter (only with `htx_print_hits`).
    #[cfg(feature = "htx_print_hits")]
    #[inline]
    pub fn set_miss(&mut self) {
        let mut locked = RefCell::borrow_mut(&self.0);
        locked.miss += 1;
    }
    /// Reads the item count from the file header.
    #[inline]
    pub fn read_item_count(&self) -> Result<u64> {
        let mut locked = RefCell::borrow_mut(&self.0);
        locked.file.read_item_count()
    }
    /// Increments the item count stored in the file header.
    #[inline]
    pub fn write_item_count_up(&mut self) -> Result<()> {
        let mut locked = RefCell::borrow_mut(&self.0);
        let val = locked.file.read_item_count()?;
        locked.file.write_item_count(val + 1)
    }
    /// Decrements the item count stored in the file header,
    /// saturating at zero (a decrement at zero is a silent no-op).
    #[inline]
    pub fn write_item_count_down(&mut self) -> Result<()> {
        let mut locked = RefCell::borrow_mut(&self.0);
        let val = locked.file.read_item_count()?;
        if val > 0 {
            locked.file.write_item_count(val - 1)
        } else {
            Ok(())
        }
    }
}
215
#[cfg(feature = "htx_print_hits")]
impl Drop for HtxFile {
    /// Prints the lookup hit ratio when the handle is dropped.
    /// NOTE: clones share the counters, so this fires once per clone.
    fn drop(&mut self) {
        let (hits, miss) = {
            let locked = RefCell::borrow_mut(&self.0);
            (locked.hits, locked.miss)
        };
        let total = hits + miss;
        // guard against 0/0 — the old code printed "NaN%" when no lookups
        // were ever recorded.
        if total > 0 {
            let ratio = hits as f64 / total as f64;
            eprintln!("htx hits: {}/{} [{:.2}%]", hits, total, 100.0 * ratio);
        }
    }
}
228
// for debug
impl HtxFile {
    /// Returns `(hash buckets size, item count)` as stored in the header.
    pub fn _ht_size_and_count(&self) -> Result<(u64, u64)> {
        let mut locked = RefCell::borrow_mut(&self.0);
        let ht_size = locked.file.read_hash_buckets_size()?;
        let count = locked.file.read_item_count()?;
        Ok((ht_size, count))
    }
    /// Counts the occupied (non-zero) buckets and returns
    /// `(occupied count, fill rate in per-mille)`.
    ///
    /// Scans every bucket, so this costs O(buckets_size) file reads —
    /// debug/diagnostic use only.
    pub fn htx_filling_rate_per_mill(&self) -> Result<(u64, u32)> {
        let mut locked = RefCell::borrow_mut(&self.0);
        let buckets_size = locked.buckets_size;
        let mut count = 0;
        for idx in 0..buckets_size {
            let offset = locked.file.read_key_piece_offset(idx)?;
            if !offset.is_zero() {
                count += 1;
            }
        }
        Ok((count, (count * 1000 / buckets_size) as u32))
    }
}
250
251/**
write initial header to file.
253
254## header map
255
256The htx header size is 128 bytes.
257
258```text
259+--------+-------+-------------+---------------------------+
260| offset | bytes | name        | comment                   |
261+--------+-------+-------------+---------------------------+
262| 0      | 8     | signature1  | b"abysdbH\0"              |
263| 8      | 8     | signature2  | 8 bytes type signature    |
264| 16     | 8     | ht size     | hash table size           |
265| 24     | 8     | count       | count of items            |
266| 32     | 96    | reserve1    |                           |
267+--------+-------+-------------+---------------------------+
268```
269
270- signature1: always fixed 8 bytes
271- signature2: 8 bytes type signature
272
273*/
// byte offset of the "ht size" field within the 128-byte htx header.
const HTX_HT_SIZE_OFFSET: u64 = 16;
// byte offset of the "count" (item count) field within the htx header.
const HTX_ITEM_COUNT_OFFSET: u64 = 24;
276
/// Writes a fresh 128-byte htx header at the start of the file:
/// signature1 (8), signature2 (8), buckets size (8), then the item count
/// and reserve area zero-filled (24 + 104 = 128 bytes total).
fn write_htxf_init_header(
    file: &mut VarFile,
    signature2: HeaderSignature,
    buckets_size: u64,
) -> Result<()> {
    file.seek_from_start(NodePieceOffset::new(0))?;
    // signature1
    file.write_all(&HTX_HEADER_SIGNATURE)?;
    // signature2
    file.write_all(&signature2)?;
    // buckets size
    file.write_u64_le(buckets_size)?;
    // count .. reserve1
    file.write_all(&[0u8; 104])?;
    //
    Ok(())
}
294
295fn check_htxf_header(file: &mut VarFile, signature2: HeaderSignature) -> Result<()> {
296    file.seek_from_start(NodePieceOffset::new(0))?;
297    // signature1
298    let mut sig1 = [0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8];
299    file.read_exact(&mut sig1)?;
300    assert!(sig1 == HTX_HEADER_SIGNATURE, "invalid header signature1");
301    // signature2
302    let mut sig2 = [0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8];
303    file.read_exact(&mut sig2)?;
304    assert!(
305        sig2 == signature2,
306        "invalid header signature2, type signature: {sig2:?}"
307    );
308    // top node offset
309    let _top_node_offset = file.read_u64_le()?;
310    assert!(_top_node_offset != 0, "invalid root offset");
311    //
312    Ok(())
313}
314
315impl VarFile {
316    fn read_hash_buckets_size(&mut self) -> Result<u64> {
317        self.seek_from_start(NodePieceOffset::new(HTX_HT_SIZE_OFFSET))?;
318        self.read_u64_le()
319    }
320    fn _read_hash_buckets_size(&mut self, val: u64) -> Result<()> {
321        self.seek_from_start(NodePieceOffset::new(HTX_HT_SIZE_OFFSET))?;
322        self.write_u64_le(val)
323    }
324    fn read_item_count(&mut self) -> Result<u64> {
325        self.seek_from_start(NodePieceOffset::new(HTX_ITEM_COUNT_OFFSET))?;
326        self.read_u64_le()
327    }
328    fn write_item_count(&mut self, val: u64) -> Result<()> {
329        self.seek_from_start(NodePieceOffset::new(HTX_ITEM_COUNT_OFFSET))?;
330        self.write_u64_le(val)
331    }
332    pub fn read_key_piece_offset(&mut self, idx: u64) -> Result<KeyPieceOffset> {
333        self.seek_from_start(NodePieceOffset::new(HTX_HEADER_SZ + 8 * idx))?;
334        self.read_u64_le().map(KeyPieceOffset::new)
335    }
336    fn write_key_piece_offset(
337        &mut self,
338        bucket_size: u64,
339        idx: u64,
340        offset: KeyPieceOffset,
341    ) -> Result<()> {
342        /*<CHECK>
343        let count = self.read_item_count()?;
344        if offset.is_zero() {
345            if count > 0 {
346                self.write_item_count(count - 1)?;
347            }
348        } else {
349            self.write_item_count(count + 1)?;
350        }
351        */
352        // write flag into bitmap
353        #[cfg(feature = "htx_bitmap")]
354        {
355            let bitmap_idx = idx / 8;
356            let bitmap_bit_idx = idx % 8;
357            //
358            let bimap_start = HTX_HEADER_SZ + bucket_size * 8;
359            self.seek_from_start(NodePieceOffset::new(bimap_start + bitmap_idx))?;
360            let mut byte = self.read_u8()?;
361            if offset.is_zero() {
362                byte &= !(1 << bitmap_bit_idx);
363            } else {
364                byte |= 1 << bitmap_bit_idx;
365            }
366            //
367            self.seek_from_start(NodePieceOffset::new(bimap_start + bitmap_idx))?;
368            self.write_u8(byte)?;
369        }
370        // store into bucket
371        self.seek_from_start(NodePieceOffset::new(HTX_HEADER_SZ + 8 * idx))?;
372        self.write_u64_le(offset.into())?;
373        //
374        Ok(())
375    }
376    pub fn next_key_piece_offset(
377        &mut self,
378        buckets_size: u64,
379        idx: u64,
380    ) -> Result<(u64, KeyPieceOffset)> {
381        // write flag into bitmap
382        #[cfg(feature = "htx_bitmap")]
383        let idx = {
384            let bitmap_idx = idx / 8;
385            let bitmap_bit_idx = idx % 8;
386            //
387            if bitmap_bit_idx == 0 {
388                let bimap_start = HTX_HEADER_SZ + buckets_size * 8;
389                self.seek_from_start(NodePieceOffset::new(bimap_start + bitmap_idx))?;
390                let mut idx = idx;
391                //
392                let mut byte_8 = 0;
393                while byte_8 == 0 && idx < buckets_size - 8 {
394                    byte_8 = self.read_u64_le()?;
395                    idx += 8 * 8;
396                }
397                if idx >= 8 * 8 {
398                    self.seek_back_size(NodePieceSize::new(std::mem::size_of_val(&byte_8) as u32))?;
399                    idx -= 8 * 8;
400                }
401                //
402                let mut byte = 0;
403                while byte == 0 && idx < buckets_size {
404                    byte = self.read_u8()?;
405                    idx += 8;
406                }
407                idx - 8
408            } else {
409                idx
410            }
411        };
412        //
413        self.seek_from_start(NodePieceOffset::new(HTX_HEADER_SZ + 8 * idx))?;
414        let mut idx = idx;
415        let mut off = 0;
416        while off == 0 && idx < buckets_size {
417            off = self.read_u64_le()?;
418            idx += 1;
419        }
420        Ok((idx, KeyPieceOffset::new(off)))
421    }
422}
423
424/*
425```text
hash buckets table:
427+--------+-------+-------------+-----------------------------------+
428| offset | bytes | name        | comment                           |
429+--------+-------+-------------+-----------------------------------+
430| 0      | 8     | key offset  | offset of key piece               |
431| 8      | 8     | key offset  | offset of key piece               |
432| --     | --    | --          | --                                |
433| --     | 8     | key offset  | offset of key piece               |
434+--------+-------+-------------+-----------------------------------+
435
hash buckets bitmap:
437+--------+-------+-------------+-----------------------------------+
438| offset | bytes | name        | comment                           |
439+--------+-------+-------------+-----------------------------------+
440| 0      | 8     | bitmap1     | bitmap of buckets index           |
441| --     | --    | --          | --                                |
442| --     | 8     | bitmapN     | bitmap of buckets index           |
443+--------+-------+-------------+-----------------------------------+
444```
445*/