persy/index/
config.rs

1use crate::{
2    config::Config,
3    error::{CreateIndexError, DropIndexError, IndexChangeError, IndexError, IndexOpsError, PERes},
4    id::{IndexId, PersyId, RecRef},
5    index::{
6        bytevec::ByteVec,
7        keeper::IndexSegmentKeeper,
8        keeper_tx::{ExternalRefs, IndexSegmentKeeperTx},
9        serialization::IndexSerialization,
10    },
11    persy::PersyImpl,
12    snapshots::SnapshotRef,
13    transaction::tx_impl::TransactionImpl,
14    util::io::{ArcSliceRead, InfallibleRead, InfallibleReadFormat, InfallibleWrite, InfallibleWriteFormat},
15};
16use std::{
17    collections::{hash_map::Entry, HashMap},
18    fmt::Display,
19    str,
20    sync::{Arc, Mutex},
21};
22
/// Default lower bound for the size of an index page.
///
/// NOTE(review): presumably the default for the `min` argument of
/// `Indexes::create_index` (stored as `IndexConfig::page_min`) — confirm at the call site.
pub static INDEX_PAGE_DEFAULT_MIN: usize = 32;
/// Default upper bound for the size of an index page.
///
/// NOTE(review): presumably the default for the `max` argument of
/// `Indexes::create_index` (stored as `IndexConfig::page_max`) — confirm at the call site.
pub static INDEX_PAGE_DEFAULT_MAX: usize = 128;
25
/// Enum of all the possible Key or Value types for indexes
#[derive(Clone)]
pub enum IndexTypeId {
    U8,
    U16,
    U32,
    U64,
    U128,
    I8,
    I16,
    I32,
    I64,
    I128,
    F32W,
    F64W,
    String,
    PersyId,
    ByteVec,
}

/// Maps a numeric type id (the `u8` kept in `IndexConfig::key_type` /
/// `IndexConfig::value_type`) to the matching [`IndexTypeId`].
///
/// # Panics
///
/// Panics when `val` is not one of the assigned ids (1-10 and 12-16).
impl From<u8> for IndexTypeId {
    fn from(val: u8) -> IndexTypeId {
        match val {
            1 => IndexTypeId::U8,
            2 => IndexTypeId::U16,
            3 => IndexTypeId::U32,
            4 => IndexTypeId::U64,
            5 => IndexTypeId::I8,
            6 => IndexTypeId::I16,
            7 => IndexTypeId::I32,
            8 => IndexTypeId::I64,
            9 => IndexTypeId::F32W,
            10 => IndexTypeId::F64W,
            // 11 is not assigned to any type.
            12 => IndexTypeId::String,
            13 => IndexTypeId::PersyId,
            // The 128 bit integers received ids after the original set, hence the gap.
            14 => IndexTypeId::U128,
            15 => IndexTypeId::I128,
            16 => IndexTypeId::ByteVec,
            _ => panic!("type not defined for {}", val),
        }
    }
}
68
/// Marker trait for the types accepted by the index API.
///
/// It declares no methods of its own: it only requires [`IndexTypeWrap`] (conversion to the
/// wrapper type that implements [`IndexTypeInternal`]) and [`Clone`].
pub trait IndexType: IndexTypeWrap + Clone {}
70
/// Trait implemented by all supported types in the index
///
/// This is the variant compiled when the `index_container_static` feature is disabled.
#[cfg(not(feature = "index_container_static"))]
pub trait IndexTypeInternal: Display + IndexOrd + Clone + IndexSerialization + IndexTypeUnwrap + 'static {
    /// Numeric id of the type, compared with the stored `key_type`/`value_type`
    /// by `IndexConfig::check`.
    fn get_id() -> u8;
    /// The [`IndexTypeId`] variant that describes this type.
    fn get_type_id() -> IndexTypeId;
    /// Tells whether this value is too big to be stored; the default implementation
    /// always returns `false`.
    fn over_size_limit(&self) -> bool {
        false
    }
}
80
/// Trait implemented by all supported types in the index
///
/// This is the variant compiled when the `index_container_static` feature is enabled; compared
/// to the other variant it additionally requires `crate::index::entries_container::Extractor`.
#[cfg(feature = "index_container_static")]
pub trait IndexTypeInternal:
    Display + IndexOrd + Clone + crate::index::entries_container::Extractor + IndexSerialization + IndexTypeUnwrap + 'static
{
    /// Numeric id of the type, compared with the stored `key_type`/`value_type`
    /// by `IndexConfig::check`.
    fn get_id() -> u8;
    /// The [`IndexTypeId`] variant that describes this type.
    fn get_type_id() -> IndexTypeId;
    /// Tells whether this value is too big to be stored; the default implementation
    /// always returns `false`.
    fn over_size_limit(&self) -> bool {
        false
    }
}
91
/// Returns `true` when `len` is strictly greater than 512 KiB (`512 * 1024`).
pub(crate) fn check_over_size_limit(len: usize) -> bool {
    const SIZE_LIMIT: usize = 512 * 1024;
    SIZE_LIMIT < len
}
95
/// Conversion from a wrapper type back to the value it wraps.
///
/// It is the counterpart of [`IndexTypeWrap`], whose `Wrapper` must implement this trait
/// with `Wrapped` equal to the original type.
pub trait IndexTypeUnwrap {
    /// The type obtained by unwrapping.
    type Wrapped;
    /// Consumes the wrapper and returns the wrapped value.
    fn unwrap(self) -> Self::Wrapped;
}
100
/// Conversion from a type to the wrapper that implements [`IndexTypeInternal`].
pub trait IndexTypeWrap: Sized {
    /// Wrapper type; unwrapping it must give back `Self`.
    type Wrapper: IndexTypeInternal + IndexTypeUnwrap<Wrapped = Self>;
    /// Consumes the value and returns its wrapper.
    fn wrap(self) -> Self::Wrapper
    where
        Self: Sized;
}
107
// Implements `IndexTypeUnwrap` and `IndexTypeWrap` for types that are their own wrapper:
// both `wrap` and `unwrap` return the value unchanged.
// The type list must be comma separated and end with a trailing comma.
macro_rules! impl_wrapper_trait_self {
    ($($t:ty),+,) => {
        $(
        impl IndexTypeUnwrap for $t {
            type Wrapped = $t;
            fn unwrap(self) -> $t {
                self
            }
        }
        impl IndexTypeWrap for $t {
            type Wrapper = $t;
            fn wrap(self) -> $t {
                self
            }
        }

        )+
    }
}
// `String` is not in this list, so its wrapper is not `String` itself
// (presumably defined in another module — confirm).
impl_wrapper_trait_self!(u8, u16, u32, u64, u128, i8, i16, i32, i64, i128, f32, f64, PersyId, ByteVec,);
128
// Implements `IndexTypeInternal` and the `IndexType` marker for `$t`:
// `$v` is the numeric id returned by `get_id`, `$v1` the `IndexTypeId` variant
// returned by `get_type_id`.
macro_rules! impl_index_type {
    ($t:ty, $v:expr,$v1:ident) => {
        impl IndexTypeInternal for $t {
            fn get_id() -> u8 {
                $v
            }
            fn get_type_id() -> IndexTypeId {
                IndexTypeId::$v1
            }
        }
        impl IndexType for $t {}
    };
}

// The ids used here match the ones decoded by `From<u8> for IndexTypeId`.
// Ids 12 (`String`) and 16 (`ByteVec`) are not registered through this macro;
// presumably their `IndexTypeInternal` implementations live in another module — confirm.
impl_index_type!(u8, 1, U8);
impl_index_type!(u16, 2, U16);
impl_index_type!(u32, 3, U32);
impl_index_type!(u64, 4, U64);
impl_index_type!(u128, 14, U128);
impl_index_type!(i8, 5, I8);
impl_index_type!(i16, 6, I16);
impl_index_type!(i32, 7, I32);
impl_index_type!(i64, 8, I64);
impl_index_type!(i128, 15, I128);
impl_index_type!(f32, 9, F32W);
impl_index_type!(f64, 10, F64W);
impl_index_type!(PersyId, 13, PersyId);

// `String` only receives the marker trait here; it is not its own wrapper.
impl IndexType for String {}
158
/// Ordering used by the index to compare values of the same type.
///
/// It is a separate trait from [`std::cmp::Ord`] so that it can also be implemented for
/// types without a standard total order, such as `f32` and `f64`.
pub trait IndexOrd {
    /// Compares `self` with `other` and returns their relative order.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering;
}
162
// Implements `IndexOrd` for types that already implement `std::cmp::Ord`
// by delegating to `Ord::cmp`.
macro_rules! impl_index_ord {
    ($($t:ty),+) => {
        $(
        impl IndexOrd for $t {
            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
                std::cmp::Ord::cmp(self, other)
            }
        }
        )+
    };
}

// `f32` and `f64` are not `Ord`, so they get hand written implementations instead.
impl_index_ord!(u8, u16, u32, u64, u128, i8, i16, i32, i64, i128, PersyId, ByteVec);
176
177impl IndexOrd for f32 {
178    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
179        if self.is_nan() {
180            if other.is_nan() {
181                std::cmp::Ordering::Equal
182            } else {
183                std::cmp::Ordering::Less
184            }
185        } else if other.is_nan() {
186            std::cmp::Ordering::Greater
187        } else {
188            std::cmp::PartialOrd::partial_cmp(self, other).unwrap()
189        }
190    }
191}
192
193impl IndexOrd for f64 {
194    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
195        if self.is_nan() {
196            if other.is_nan() {
197                std::cmp::Ordering::Equal
198            } else {
199                std::cmp::Ordering::Less
200            }
201        } else if other.is_nan() {
202            std::cmp::Ordering::Greater
203        } else {
204            std::cmp::PartialOrd::partial_cmp(self, other).unwrap()
205        }
206    }
207}
208
/// Prefix of the name of the segment that stores the configuration of an index.
pub const INDEX_META_PREFIX: &str = "+_M";
/// Prefix of the name of the segment that stores the data of an index.
pub const INDEX_DATA_PREFIX: &str = "+_D";

/// Returns `true` when `name` starts with [`INDEX_META_PREFIX`].
pub fn is_index_name_meta(name: &str) -> bool {
    name.starts_with(INDEX_META_PREFIX)
}

/// Returns `true` when `name` starts with [`INDEX_DATA_PREFIX`].
pub fn is_index_name_data(name: &str) -> bool {
    name.starts_with(INDEX_DATA_PREFIX)
}

/// Turns the name of an index meta segment into the index name, in place.
///
/// The [`INDEX_META_PREFIX`] is removed from the front of `segment_name`; a name that
/// does not start with the prefix is left untouched, so short or non ASCII names can
/// never trigger an out of range / char boundary panic.
pub fn change_segment_meta_name_to_index_name(segment_name: &mut String) {
    if segment_name.starts_with(INDEX_META_PREFIX) {
        segment_name.drain(..INDEX_META_PREFIX.len());
    }
}

/// Returns the index name for the given meta segment name.
///
/// The [`INDEX_META_PREFIX`] is stripped from the front of `segment_name`; a name that
/// does not start with the prefix is returned unchanged.
pub fn index_name_from_meta_segment(segment_name: &str) -> String {
    segment_name
        .strip_prefix(INDEX_META_PREFIX)
        .unwrap_or(segment_name)
        .to_string()
}

/// Builds the name of the meta segment of the index called `index_name`.
pub fn format_segment_name_meta(index_name: &str) -> String {
    format!("{}{}", INDEX_META_PREFIX, index_name)
}

/// Builds the name of the data segment of the index called `index_name`.
pub fn format_segment_name_data(index_name: &str) -> String {
    format!("{}{}", INDEX_DATA_PREFIX, index_name)
}
235
/// Defines how the index behaves when a key value pair already exists
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ValueMode {
    /// An error is returned if a key value pair already exists
    Exclusive,
    /// The value is added to the list of values of the key, duplicate values are collapsed
    /// to a single entry
    Cluster,
    /// The existing value is replaced with the new one if a key value pair already exists
    Replace,
}

/// Decodes the numeric representation produced by `ValueMode::to_u8`.
///
/// # Panics
///
/// Panics for any value other than 1, 2 or 3.
impl From<u8> for ValueMode {
    fn from(value: u8) -> Self {
        match value {
            1 => Self::Exclusive,
            2 => Self::Cluster,
            3 => Self::Replace,
            _ => unreachable!("is impossible to get a value mode from values not 1,2,3"),
        }
    }
}

impl ValueMode {
    /// Numeric representation of the mode, the inverse of `From<u8>`.
    fn to_u8(&self) -> u8 {
        match *self {
            Self::Exclusive => 1,
            Self::Cluster => 2,
            Self::Replace => 3,
        }
    }
}
268
/// Cache used by `Indexes::get_index` to find the configuration record of an index
/// without scanning its meta segment.
#[derive(Default)]
struct ConfigIdCache {
    /// Index id -> reference of the record that holds the index configuration.
    cache: HashMap<IndexId, RecRef>,
    /// Number of lookups since the last clean-up; once it exceeds 1000 it is reset and the
    /// entries whose meta segment no longer exists are removed.
    hit_count: u32,
}
274
/// Entry point of the index configuration operations (create, drop, lookup).
///
/// Most operations are associated functions that receive the `PersyImpl` explicitly; the only
/// state kept here is the cache of the configuration record ids.
pub struct Indexes {
    /// Cache of configuration record ids, guarded by a mutex.
    config_ids: Mutex<ConfigIdCache>,
}
278
/// Configuration of an index, stored as a record in the index meta segment.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct IndexConfig {
    /// Name of the index.
    pub name: String,
    /// Reference to the root of the index, `None` when no root is set.
    root: Option<RecRef>,
    /// Numeric id of the key type, see `IndexTypeInternal::get_id`.
    pub key_type: u8,
    /// Numeric id of the value type, see `IndexTypeInternal::get_id`.
    pub value_type: u8,
    /// Lower page size bound received at creation (serialized as `u32`).
    page_min: usize,
    /// Upper page size bound received at creation (serialized as `u32`).
    page_max: usize,
    /// Behavior applied when a key value pair already exists.
    pub value_mode: ValueMode,
}
289
290impl IndexConfig {
291    fn serialize(&self, w: &mut dyn InfallibleWrite) {
292        w.write_u8(0);
293        self.serialize_v0(w)
294    }
295    fn serialize_v0(&self, w: &mut dyn InfallibleWrite) {
296        if let Some(ref root) = self.root {
297            w.write_u64(root.page);
298            w.write_u32(root.pos);
299        } else {
300            w.write_u64(0);
301            w.write_u32(0);
302        }
303        w.write_u8(self.key_type);
304        w.write_u8(self.value_type);
305        w.write_u32(self.page_min as u32);
306        w.write_u32(self.page_max as u32);
307        w.write_u8(self.value_mode.to_u8());
308        w.write_u16(self.name.len() as u16);
309        w.write_all(self.name.as_bytes());
310    }
311    fn deserialize(r: &mut dyn InfallibleRead) -> PERes<IndexConfig> {
312        let version = r.read_u8();
313        match version {
314            0u8 => IndexConfig::deserialize_v0(r),
315            _ => panic!("unsupported disk format"),
316        }
317    }
318    fn deserialize_v0(r: &mut dyn InfallibleRead) -> PERes<IndexConfig> {
319        let index_root_page = r.read_u64();
320        let index_root_pos = r.read_u32();
321        let key_type = r.read_u8();
322        let value_type = r.read_u8();
323        let page_min = r.read_u32() as usize;
324        let page_max = r.read_u32() as usize;
325        let value_mode = ValueMode::from(r.read_u8());
326
327        let name_size = r.read_u16() as usize;
328        let mut slice: Vec<u8> = vec![0; name_size];
329        r.read_exact(&mut slice);
330        let name: String = str::from_utf8(&slice[0..name_size])?.into();
331        let root = if index_root_page != 0 && index_root_pos != 0 {
332            Some(RecRef::new(index_root_page, index_root_pos))
333        } else {
334            None
335        };
336        Ok(IndexConfig {
337            name,
338            root,
339            key_type,
340            value_type,
341            page_min,
342            page_max,
343            value_mode,
344        })
345    }
346
347    pub fn check<K: IndexTypeInternal, V: IndexTypeInternal>(&self) -> Result<(), IndexOpsError> {
348        if self.key_type != K::get_id() {
349            Err(IndexOpsError::IndexTypeMismatch("key type".into()))
350        } else if self.value_type != V::get_id() {
351            Err(IndexOpsError::IndexTypeMismatch("value type".into()))
352        } else {
353            Ok(())
354        }
355    }
    /// Returns the reference to the root of the index, `None` when no root is set.
    pub fn get_root(&self) -> Option<RecRef> {
        self.root
    }
359}
360
361impl Indexes {
362    pub fn new(_config: &Arc<Config>) -> Indexes {
363        Indexes {
364            config_ids: Default::default(),
365        }
366    }
367
368    pub fn create_index<K, V>(
369        p: &PersyImpl,
370        tx: &mut TransactionImpl,
371        name: &str,
372        min: usize,
373        max: usize,
374        value_mode: ValueMode,
375    ) -> Result<(), CreateIndexError>
376    where
377        K: IndexTypeInternal,
378        V: IndexTypeInternal,
379    {
380        debug_assert!(min <= max / 2);
381        let segment_name_meta = format_segment_name_meta(name);
382        p.create_segment(tx, &segment_name_meta)?;
383        let segment_name_data = format_segment_name_data(name);
384        p.create_segment(tx, &segment_name_data)?;
385        let cfg = IndexConfig {
386            name: name.to_string(),
387            root: None,
388            key_type: K::get_id(),
389            value_type: V::get_id(),
390            page_min: min,
391            page_max: max,
392            value_mode,
393        };
394        let mut scfg = Vec::new();
395        cfg.serialize(&mut scfg);
396        p.insert_record(tx, &segment_name_meta, &scfg)?;
397        Ok(())
398    }
399
400    pub fn drop_index(p: &PersyImpl, tx: &mut TransactionImpl, name: &str) -> Result<(), DropIndexError> {
401        let segment_name_meta = format_segment_name_meta(name);
402        p.drop_segment(tx, &segment_name_meta)?;
403        let segment_name_data = format_segment_name_data(name);
404        p.drop_segment(tx, &segment_name_data)?;
405        Ok(())
406    }
407
408    pub fn update_index_root(
409        p: &PersyImpl,
410        tx: &mut TransactionImpl,
411        index_id: &IndexId,
412        root: Option<RecRef>,
413    ) -> Result<(), IndexChangeError> {
414        let mut scan = p.scan_tx(tx, index_id.get_meta_id())?;
415        let metadata = scan.next(p, tx);
416        drop(scan);
417
418        let (id, mut config) = if let Some((rid, content, _)) = metadata {
419            (rid, IndexConfig::deserialize(&mut ArcSliceRead::new_vec(content))?)
420        } else {
421            return Err(IndexChangeError::IndexNotFound);
422        };
423
424        if config.root != root {
425            config.root = root;
426            let mut scfg = Vec::new();
427            config.serialize(&mut scfg);
428            p.update(tx, index_id.get_meta_id(), &id.0, &scfg)?;
429        }
430        Ok(())
431    }
432
433    pub fn get_index_tx(
434        p: &PersyImpl,
435        tx: &TransactionImpl,
436        index_id: &IndexId,
437    ) -> Result<(IndexConfig, u16), IndexError> {
438        let mut scan = p.scan_tx(tx, index_id.get_meta_id())?;
439        let metadata = scan.next(p, tx);
440        drop(scan);
441        if let Some((_, content, version)) = metadata {
442            Ok((IndexConfig::deserialize(&mut ArcSliceRead::new_vec(content))?, version))
443        } else {
444            Err(IndexError::IndexNotFound)
445        }
446    }
447
448    pub fn get_config_id(p: &PersyImpl, tx: &mut TransactionImpl, index_id: &IndexId) -> Result<PersyId, IndexError> {
449        let mut scan = p.scan_tx(tx, index_id.get_meta_id())?;
450        let metadata = scan.next(p, tx);
451        drop(scan);
452        if let Some((id, _, _)) = metadata {
453            Ok(id)
454        } else {
455            Err(IndexError::IndexNotFound)
456        }
457    }
458
    /// Reads the configuration of the index as seen by the snapshot `snapshot_ref`.
    ///
    /// The id of the configuration record is cached per index: a cached id is read directly,
    /// otherwise the meta segment is scanned and the id of its first record is cached.
    /// The cache mutex is held for the whole call.
    ///
    /// # Errors
    ///
    /// Returns `IndexError::IndexNotFound` when no configuration record is found, and
    /// propagates read, scan and decode failures.
    ///
    /// # Panics
    ///
    /// Panics if the cache mutex is poisoned.
    pub fn get_index(p: &PersyImpl, snapshot_ref: &SnapshotRef, index_id: &IndexId) -> Result<IndexConfig, IndexError> {
        let mut indexes = p.indexes().config_ids.lock().unwrap();
        let segment_meta = index_id.get_meta_id();
        indexes.hit_count += 1;
        // Every time the counter exceeds 1000 lookups, drop the cached entries whose
        // meta segment does not exist anymore.
        if indexes.hit_count > 1000 {
            indexes.hit_count = 0;
            indexes.cache.retain(|i, _| p.exists_segment_by_id(&i.get_meta_id()));
        }
        match indexes.cache.entry(index_id.clone()) {
            Entry::Occupied(o) => {
                // Cached: read the configuration record directly by its id.
                let id = o.get();
                let info_read = p
                    .read_snap_fn(segment_meta, id, snapshot_ref, |mut c| IndexConfig::deserialize(&mut c))
                    .map_err(IndexError::from)?;
                if let Some(info) = info_read {
                    Ok(info.map_err(IndexError::from)?)
                } else {
                    // The cached record cannot be read in this snapshot: forget the entry.
                    o.remove();
                    Err(IndexError::IndexNotFound)
                }
            }
            Entry::Vacant(v) => {
                // Not cached: the configuration is the first record of the meta segment.
                let (id, index) = p
                    .scan_snapshot_index(segment_meta, snapshot_ref)?
                    .next(p)
                    .map(|(id, content)| {
                        Ok((
                            id,
                            IndexConfig::deserialize(&mut ArcSliceRead::new_vec(content)).map_err(IndexError::from)?,
                        ))
                    })
                    .unwrap_or(Err(IndexError::IndexNotFound))?;
                v.insert(id.0);
                Ok(index)
            }
        }
    }
496
497    pub fn get_index_keeper<'a, K: IndexTypeInternal, V: IndexTypeInternal>(
498        p: &'a PersyImpl,
499        snapshot: &SnapshotRef,
500        index_id: &IndexId,
501    ) -> Result<IndexSegmentKeeper<'a>, IndexOpsError> {
502        let config = Indexes::get_index(p, snapshot, index_id)?;
503        config.check::<K, V>()?;
504        Ok(IndexSegmentKeeper::new(
505            &config.name,
506            index_id,
507            config.root,
508            p,
509            snapshot,
510            config.value_mode,
511        ))
512    }
513
514    pub fn get_index_keeper_tx_read<'a, K: IndexTypeInternal, V: IndexTypeInternal>(
515        p: &'a PersyImpl,
516        snapshot: &SnapshotRef,
517        tx: &'a mut TransactionImpl,
518        index_id: &IndexId,
519    ) -> Result<IndexSegmentKeeper<'a>, IndexOpsError> {
520        let (config, _) = Indexes::get_index_tx(p, tx, index_id)?;
521        config.check::<K, V>()?;
522        Ok(IndexSegmentKeeper::new(
523            &config.name,
524            index_id,
525            config.root,
526            p,
527            snapshot,
528            config.value_mode,
529        ))
530    }
531
532    pub fn get_index_keeper_tx<'a, K: IndexTypeInternal, V: IndexTypeInternal>(
533        store: ExternalRefs<'a>,
534        index_id: &IndexId,
535    ) -> Result<IndexSegmentKeeperTx<'a, K, V>, IndexOpsError> {
536        let (config, version) = Indexes::get_index_tx(store.persy, store.tx, index_id)?;
537        config.check::<K, V>()?;
538        Ok(IndexSegmentKeeperTx::new(
539            &config.name,
540            index_id,
541            config.root,
542            version,
543            store,
544            config.value_mode,
545            config.page_min,
546            config.page_max,
547        ))
548    }
549
550    pub fn check_index<K: IndexTypeInternal, V: IndexTypeInternal>(
551        p: &PersyImpl,
552        tx: &mut TransactionImpl,
553        index_id: &IndexId,
554    ) -> Result<(), IndexOpsError> {
555        let (config, _version) = Indexes::get_index_tx(p, tx, index_id)?;
556        config.check::<K, V>()
557    }
558}
559
560#[cfg(test)]
561mod tests;