1use crate::{
2 config::Config,
3 error::{CreateIndexError, DropIndexError, IndexChangeError, IndexError, IndexOpsError, PERes},
4 id::{IndexId, PersyId, RecRef},
5 index::{
6 bytevec::ByteVec,
7 keeper::IndexSegmentKeeper,
8 keeper_tx::{ExternalRefs, IndexSegmentKeeperTx},
9 serialization::IndexSerialization,
10 },
11 persy::PersyImpl,
12 snapshots::SnapshotRef,
13 transaction::tx_impl::TransactionImpl,
14 util::io::{ArcSliceRead, InfallibleRead, InfallibleReadFormat, InfallibleWrite, InfallibleWriteFormat},
15};
16use std::{
17 collections::{hash_map::Entry, HashMap},
18 fmt::Display,
19 str,
20 sync::{Arc, Mutex},
21};
22
/// Default lower page bound for an index.
/// NOTE(review): presumably the minimum number of entries per index page — confirm against the callers
/// that pass it as `min` to `Indexes::create_index`.
pub static INDEX_PAGE_DEFAULT_MIN: usize = 32;
/// Default upper page bound for an index.
/// Together with [`INDEX_PAGE_DEFAULT_MIN`] it satisfies the `min <= max / 2` relation that
/// `Indexes::create_index` checks with a `debug_assert!`.
pub static INDEX_PAGE_DEFAULT_MAX: usize = 128;
25
/// Identifier of every type that can be used as a key or a value of an index.
///
/// Each variant corresponds to one numeric tag understood by `From<u8> for IndexTypeId`.
/// `Debug`, `PartialEq` and `Eq` are derived so the identifier can be logged and compared
/// directly instead of forcing callers to `match` on it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum IndexTypeId {
    /// `u8` (tag 1).
    U8,
    /// `u16` (tag 2).
    U16,
    /// `u32` (tag 3).
    U32,
    /// `u64` (tag 4).
    U64,
    /// `u128` (tag 14).
    U128,
    /// `i8` (tag 5).
    I8,
    /// `i16` (tag 6).
    I16,
    /// `i32` (tag 7).
    I32,
    /// `i64` (tag 8).
    I64,
    /// `i128` (tag 15).
    I128,
    /// `f32` (tag 9).
    F32W,
    /// `f64` (tag 10).
    F64W,
    /// String (tag 12).
    String,
    /// `PersyId` (tag 13).
    PersyId,
    /// `ByteVec` (tag 16).
    ByteVec,
}
45
46impl From<u8> for IndexTypeId {
47 fn from(val: u8) -> IndexTypeId {
48 match val {
49 1 => IndexTypeId::U8,
50 2 => IndexTypeId::U16,
51 3 => IndexTypeId::U32,
52 4 => IndexTypeId::U64,
53 14 => IndexTypeId::U128,
54 5 => IndexTypeId::I8,
55 6 => IndexTypeId::I16,
56 7 => IndexTypeId::I32,
57 8 => IndexTypeId::I64,
58 15 => IndexTypeId::I128,
59 9 => IndexTypeId::F32W,
60 10 => IndexTypeId::F64W,
61 12 => IndexTypeId::String,
62 13 => IndexTypeId::PersyId,
63 16 => IndexTypeId::ByteVec,
64 _ => panic!("type node defined for {}", val),
65 }
66 }
67}
68
/// Marker trait (no methods of its own) for types that are `Clone` and implement `IndexTypeWrap`.
///
/// In this file it is implemented for the primitive integer and float types, `PersyId`
/// (through `impl_index_type!`) and `String`.
pub trait IndexType: IndexTypeWrap + Clone {}
70
/// Internal contract for index key/value types, compiled when the
/// `index_container_static` feature is disabled.
#[cfg(not(feature = "index_container_static"))]
pub trait IndexTypeInternal: Display + IndexOrd + Clone + IndexSerialization + IndexTypeUnwrap + 'static {
    /// Numeric tag of the type; `IndexConfig::check` compares it with the stored
    /// `key_type` / `value_type`.
    fn get_id() -> u8;
    /// The [`IndexTypeId`] variant that corresponds to this type.
    fn get_type_id() -> IndexTypeId;
    /// Reports whether this value is over the size limit; the default implementation
    /// always returns `false`.
    fn over_size_limit(&self) -> bool {
        false
    }
}
80
/// Internal contract for index key/value types, compiled when the
/// `index_container_static` feature is enabled; compared with the other variant it
/// additionally requires `crate::index::entries_container::Extractor`.
#[cfg(feature = "index_container_static")]
pub trait IndexTypeInternal:
    Display + IndexOrd + Clone + crate::index::entries_container::Extractor + IndexSerialization + IndexTypeUnwrap + 'static
{
    /// Numeric tag of the type; `IndexConfig::check` compares it with the stored
    /// `key_type` / `value_type`.
    fn get_id() -> u8;
    /// The [`IndexTypeId`] variant that corresponds to this type.
    fn get_type_id() -> IndexTypeId;
    /// Reports whether this value is over the size limit; the default implementation
    /// always returns `false`.
    fn over_size_limit(&self) -> bool {
        false
    }
}
91
/// Returns `true` when `len` is strictly greater than the 512 KiB limit
/// (`512 * 1024` = 524288); a length exactly at the limit is accepted.
pub(crate) fn check_over_size_limit(len: usize) -> bool {
    const SIZE_LIMIT: usize = 512 * 1024;
    len > SIZE_LIMIT
}
95
/// Conversion from a wrapper type back to the value it wraps.
pub trait IndexTypeUnwrap {
    /// The type produced by [`IndexTypeUnwrap::unwrap`].
    type Wrapped;
    /// Consumes the wrapper and returns the wrapped value.
    fn unwrap(self) -> Self::Wrapped;
}
100
/// Conversion from a value to its wrapper type.
pub trait IndexTypeWrap: Sized {
    /// Wrapper type: it must implement `IndexTypeInternal` and unwrap back to `Self`.
    type Wrapper: IndexTypeInternal + IndexTypeUnwrap<Wrapped = Self>;
    /// Consumes the value and returns its wrapper.
    fn wrap(self) -> Self::Wrapper
    where
        Self: Sized;
}
107
// Implements `IndexTypeWrap` and `IndexTypeUnwrap` for types that are their own wrapper:
// both conversions simply return `self`.
// The list of types must end with a trailing comma.
macro_rules! impl_wrapper_trait_self {
    ($($own:ty),+,) => {
        $(
            impl IndexTypeWrap for $own {
                type Wrapper = Self;
                fn wrap(self) -> Self {
                    self
                }
            }

            impl IndexTypeUnwrap for $own {
                type Wrapped = Self;
                fn unwrap(self) -> Self {
                    self
                }
            }
        )+
    }
}
// Types that act as their own wrapper (the trailing comma is required by the macro pattern).
impl_wrapper_trait_self!(u8, u16, u32, u64, u128, i8, i16, i32, i64, i128, f32, f64, PersyId, ByteVec,);
128
// Implements `IndexTypeInternal` (numeric tag + `IndexTypeId` variant) and the
// `IndexType` marker for one type.
// Arguments: the type, its `u8` tag, and the matching `IndexTypeId` variant name.
macro_rules! impl_index_type {
    ($key:ty, $tag:expr, $variant:ident) => {
        impl IndexType for $key {}

        impl IndexTypeInternal for $key {
            fn get_id() -> u8 {
                $tag
            }

            fn get_type_id() -> IndexTypeId {
                IndexTypeId::$variant
            }
        }
    };
}
142
// Register the primitive types and `PersyId` with their numeric tags; the tags are the
// same ones decoded by `From<u8> for IndexTypeId`.
impl_index_type!(u8, 1, U8);
impl_index_type!(u16, 2, U16);
impl_index_type!(u32, 3, U32);
impl_index_type!(u64, 4, U64);
impl_index_type!(u128, 14, U128);
impl_index_type!(i8, 5, I8);
impl_index_type!(i16, 6, I16);
impl_index_type!(i32, 7, I32);
impl_index_type!(i64, 8, I64);
impl_index_type!(i128, 15, I128);
impl_index_type!(f32, 9, F32W);
impl_index_type!(f64, 10, F64W);
impl_index_type!(PersyId, 13, PersyId);

// `String` only receives the marker trait here; the `IndexTypeWrap` impl required by
// `IndexType` is not in this part of the file and must be provided elsewhere.
impl IndexType for String {}
158
/// Ordering used for index types.
///
/// Unlike `PartialOrd`, `cmp` always returns an `Ordering`; the `f32`/`f64` impls in this
/// file give NaN a defined position instead of failing.
pub trait IndexOrd {
    /// Compares `self` with `other` and returns their relative order.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering;
}
162
// Implements `IndexOrd` for types that already implement `Ord` by delegating to it.
// `Ord::cmp` is named explicitly because a plain `self.cmp(other)` would be ambiguous
// between `Ord` and `IndexOrd`.
macro_rules! impl_index_ord {
    ($($ordered:ty),+) => {
        $(
            impl IndexOrd for $ordered {
                fn cmp(&self, other: &Self) -> std::cmp::Ordering {
                    <$ordered as std::cmp::Ord>::cmp(self, other)
                }
            }
        )+
    };
}
174
// Types whose index ordering is simply their `Ord` ordering.
impl_index_ord!(u8, u16, u32, u64, u128, i8, i16, i32, i64, i128, PersyId, ByteVec);
176
177impl IndexOrd for f32 {
178 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
179 if self.is_nan() {
180 if other.is_nan() {
181 std::cmp::Ordering::Equal
182 } else {
183 std::cmp::Ordering::Less
184 }
185 } else if other.is_nan() {
186 std::cmp::Ordering::Greater
187 } else {
188 std::cmp::PartialOrd::partial_cmp(self, other).unwrap()
189 }
190 }
191}
192
193impl IndexOrd for f64 {
194 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
195 if self.is_nan() {
196 if other.is_nan() {
197 std::cmp::Ordering::Equal
198 } else {
199 std::cmp::Ordering::Less
200 }
201 } else if other.is_nan() {
202 std::cmp::Ordering::Greater
203 } else {
204 std::cmp::PartialOrd::partial_cmp(self, other).unwrap()
205 }
206 }
207}
208
/// Prefix prepended to an index name to build the name of its metadata segment
/// (see `format_segment_name_meta`).
pub const INDEX_META_PREFIX: &str = "+_M";
/// Prefix prepended to an index name to build the name of its data segment
/// (see `format_segment_name_data`).
pub const INDEX_DATA_PREFIX: &str = "+_D";
211
212pub fn is_index_name_meta(name: &str) -> bool {
213 name.starts_with(INDEX_META_PREFIX)
214}
215
216pub fn is_index_name_data(name: &str) -> bool {
217 name.starts_with(INDEX_DATA_PREFIX)
218}
219
220pub fn change_segment_meta_name_to_index_name(segment_name: &mut String) {
221 segment_name.drain(..INDEX_META_PREFIX.len());
222}
223pub fn index_name_from_meta_segment(segment_name: &str) -> String {
224 let mut name = segment_name.to_string();
225 name.drain(..INDEX_META_PREFIX.len());
226 name
227}
228pub fn format_segment_name_meta(index_name: &str) -> String {
229 format!("{}{}", INDEX_META_PREFIX, index_name)
230}
231
232pub fn format_segment_name_data(index_name: &str) -> String {
233 format!("{}{}", INDEX_DATA_PREFIX, index_name)
234}
235
/// How an index treats the values associated with a key.
///
/// The mode is stored in `IndexConfig` as a single byte (1, 2 or 3 — see `ValueMode::to_u8`
/// and `From<u8> for ValueMode`).
/// NOTE(review): the per-variant descriptions below are inferred from the variant names;
/// confirm them against the index implementation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ValueMode {
    /// Stored as `1`. Presumably a key may be bound to a single value only.
    Exclusive,
    /// Stored as `2`. Presumably a key may hold several values.
    Cluster,
    /// Stored as `3`. Presumably a new value replaces the existing one.
    Replace,
}
247
248impl From<u8> for ValueMode {
249 fn from(value: u8) -> Self {
250 match value {
251 1 => ValueMode::Exclusive,
252 2 => ValueMode::Cluster,
253 3 => ValueMode::Replace,
254 _ => unreachable!("is impossible to get a value mode from values not 1,2,3"),
255 }
256 }
257}
258
259impl ValueMode {
260 fn to_u8(&self) -> u8 {
261 match self {
262 ValueMode::Exclusive => 1,
263 ValueMode::Cluster => 2,
264 ValueMode::Replace => 3,
265 }
266 }
267}
268
/// Cache used by `Indexes::get_index` to avoid re-scanning the metadata segment.
#[derive(Default)]
struct ConfigIdCache {
    /// For each index, the record reference of its configuration record inside the
    /// index metadata segment.
    cache: HashMap<IndexId, RecRef>,
    /// Number of `Indexes::get_index` calls since the last cleanup; once it goes over
    /// 1000 it is reset and entries whose metadata segment no longer exists are removed.
    hit_count: u32,
}
274
/// Entry point for index metadata operations (create, drop, lookup of configurations).
///
/// Most operations are associated functions that take the `PersyImpl` explicitly; the
/// only state held here is the configuration-record cache.
pub struct Indexes {
    /// Cache of configuration record ids, guarded by a mutex.
    config_ids: Mutex<ConfigIdCache>,
}
278
/// Configuration of a single index as stored in its metadata segment.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct IndexConfig {
    /// Index name (without any segment prefix).
    pub name: String,
    /// Reference to the root record of the index, `None` when no root is set
    /// (serialized as page 0 / position 0).
    root: Option<RecRef>,
    /// Numeric tag of the key type, compared with `IndexTypeInternal::get_id` in `check`.
    pub key_type: u8,
    /// Numeric tag of the value type, compared with `IndexTypeInternal::get_id` in `check`.
    pub value_type: u8,
    /// Lower page bound given at index creation; serialized as a `u32`.
    page_min: usize,
    /// Upper page bound given at index creation; serialized as a `u32`.
    page_max: usize,
    /// How values bound to a key are handled.
    pub value_mode: ValueMode,
}
289
290impl IndexConfig {
291 fn serialize(&self, w: &mut dyn InfallibleWrite) {
292 w.write_u8(0);
293 self.serialize_v0(w)
294 }
295 fn serialize_v0(&self, w: &mut dyn InfallibleWrite) {
296 if let Some(ref root) = self.root {
297 w.write_u64(root.page);
298 w.write_u32(root.pos);
299 } else {
300 w.write_u64(0);
301 w.write_u32(0);
302 }
303 w.write_u8(self.key_type);
304 w.write_u8(self.value_type);
305 w.write_u32(self.page_min as u32);
306 w.write_u32(self.page_max as u32);
307 w.write_u8(self.value_mode.to_u8());
308 w.write_u16(self.name.len() as u16);
309 w.write_all(self.name.as_bytes());
310 }
311 fn deserialize(r: &mut dyn InfallibleRead) -> PERes<IndexConfig> {
312 let version = r.read_u8();
313 match version {
314 0u8 => IndexConfig::deserialize_v0(r),
315 _ => panic!("unsupported disk format"),
316 }
317 }
318 fn deserialize_v0(r: &mut dyn InfallibleRead) -> PERes<IndexConfig> {
319 let index_root_page = r.read_u64();
320 let index_root_pos = r.read_u32();
321 let key_type = r.read_u8();
322 let value_type = r.read_u8();
323 let page_min = r.read_u32() as usize;
324 let page_max = r.read_u32() as usize;
325 let value_mode = ValueMode::from(r.read_u8());
326
327 let name_size = r.read_u16() as usize;
328 let mut slice: Vec<u8> = vec![0; name_size];
329 r.read_exact(&mut slice);
330 let name: String = str::from_utf8(&slice[0..name_size])?.into();
331 let root = if index_root_page != 0 && index_root_pos != 0 {
332 Some(RecRef::new(index_root_page, index_root_pos))
333 } else {
334 None
335 };
336 Ok(IndexConfig {
337 name,
338 root,
339 key_type,
340 value_type,
341 page_min,
342 page_max,
343 value_mode,
344 })
345 }
346
347 pub fn check<K: IndexTypeInternal, V: IndexTypeInternal>(&self) -> Result<(), IndexOpsError> {
348 if self.key_type != K::get_id() {
349 Err(IndexOpsError::IndexTypeMismatch("key type".into()))
350 } else if self.value_type != V::get_id() {
351 Err(IndexOpsError::IndexTypeMismatch("value type".into()))
352 } else {
353 Ok(())
354 }
355 }
356 pub fn get_root(&self) -> Option<RecRef> {
357 self.root
358 }
359}
360
361impl Indexes {
362 pub fn new(_config: &Arc<Config>) -> Indexes {
363 Indexes {
364 config_ids: Default::default(),
365 }
366 }
367
368 pub fn create_index<K, V>(
369 p: &PersyImpl,
370 tx: &mut TransactionImpl,
371 name: &str,
372 min: usize,
373 max: usize,
374 value_mode: ValueMode,
375 ) -> Result<(), CreateIndexError>
376 where
377 K: IndexTypeInternal,
378 V: IndexTypeInternal,
379 {
380 debug_assert!(min <= max / 2);
381 let segment_name_meta = format_segment_name_meta(name);
382 p.create_segment(tx, &segment_name_meta)?;
383 let segment_name_data = format_segment_name_data(name);
384 p.create_segment(tx, &segment_name_data)?;
385 let cfg = IndexConfig {
386 name: name.to_string(),
387 root: None,
388 key_type: K::get_id(),
389 value_type: V::get_id(),
390 page_min: min,
391 page_max: max,
392 value_mode,
393 };
394 let mut scfg = Vec::new();
395 cfg.serialize(&mut scfg);
396 p.insert_record(tx, &segment_name_meta, &scfg)?;
397 Ok(())
398 }
399
400 pub fn drop_index(p: &PersyImpl, tx: &mut TransactionImpl, name: &str) -> Result<(), DropIndexError> {
401 let segment_name_meta = format_segment_name_meta(name);
402 p.drop_segment(tx, &segment_name_meta)?;
403 let segment_name_data = format_segment_name_data(name);
404 p.drop_segment(tx, &segment_name_data)?;
405 Ok(())
406 }
407
408 pub fn update_index_root(
409 p: &PersyImpl,
410 tx: &mut TransactionImpl,
411 index_id: &IndexId,
412 root: Option<RecRef>,
413 ) -> Result<(), IndexChangeError> {
414 let mut scan = p.scan_tx(tx, index_id.get_meta_id())?;
415 let metadata = scan.next(p, tx);
416 drop(scan);
417
418 let (id, mut config) = if let Some((rid, content, _)) = metadata {
419 (rid, IndexConfig::deserialize(&mut ArcSliceRead::new_vec(content))?)
420 } else {
421 return Err(IndexChangeError::IndexNotFound);
422 };
423
424 if config.root != root {
425 config.root = root;
426 let mut scfg = Vec::new();
427 config.serialize(&mut scfg);
428 p.update(tx, index_id.get_meta_id(), &id.0, &scfg)?;
429 }
430 Ok(())
431 }
432
433 pub fn get_index_tx(
434 p: &PersyImpl,
435 tx: &TransactionImpl,
436 index_id: &IndexId,
437 ) -> Result<(IndexConfig, u16), IndexError> {
438 let mut scan = p.scan_tx(tx, index_id.get_meta_id())?;
439 let metadata = scan.next(p, tx);
440 drop(scan);
441 if let Some((_, content, version)) = metadata {
442 Ok((IndexConfig::deserialize(&mut ArcSliceRead::new_vec(content))?, version))
443 } else {
444 Err(IndexError::IndexNotFound)
445 }
446 }
447
448 pub fn get_config_id(p: &PersyImpl, tx: &mut TransactionImpl, index_id: &IndexId) -> Result<PersyId, IndexError> {
449 let mut scan = p.scan_tx(tx, index_id.get_meta_id())?;
450 let metadata = scan.next(p, tx);
451 drop(scan);
452 if let Some((id, _, _)) = metadata {
453 Ok(id)
454 } else {
455 Err(IndexError::IndexNotFound)
456 }
457 }
458
459 pub fn get_index(p: &PersyImpl, snapshot_ref: &SnapshotRef, index_id: &IndexId) -> Result<IndexConfig, IndexError> {
460 let mut indexes = p.indexes().config_ids.lock().unwrap();
461 let segment_meta = index_id.get_meta_id();
462 indexes.hit_count += 1;
463 if indexes.hit_count > 1000 {
464 indexes.hit_count = 0;
465 indexes.cache.retain(|i, _| p.exists_segment_by_id(&i.get_meta_id()));
466 }
467 match indexes.cache.entry(index_id.clone()) {
468 Entry::Occupied(o) => {
469 let id = o.get();
470 let info_read = p
471 .read_snap_fn(segment_meta, id, snapshot_ref, |mut c| IndexConfig::deserialize(&mut c))
472 .map_err(IndexError::from)?;
473 if let Some(info) = info_read {
474 Ok(info.map_err(IndexError::from)?)
475 } else {
476 o.remove();
477 Err(IndexError::IndexNotFound)
478 }
479 }
480 Entry::Vacant(v) => {
481 let (id, index) = p
482 .scan_snapshot_index(segment_meta, snapshot_ref)?
483 .next(p)
484 .map(|(id, content)| {
485 Ok((
486 id,
487 IndexConfig::deserialize(&mut ArcSliceRead::new_vec(content)).map_err(IndexError::from)?,
488 ))
489 })
490 .unwrap_or(Err(IndexError::IndexNotFound))?;
491 v.insert(id.0);
492 Ok(index)
493 }
494 }
495 }
496
497 pub fn get_index_keeper<'a, K: IndexTypeInternal, V: IndexTypeInternal>(
498 p: &'a PersyImpl,
499 snapshot: &SnapshotRef,
500 index_id: &IndexId,
501 ) -> Result<IndexSegmentKeeper<'a>, IndexOpsError> {
502 let config = Indexes::get_index(p, snapshot, index_id)?;
503 config.check::<K, V>()?;
504 Ok(IndexSegmentKeeper::new(
505 &config.name,
506 index_id,
507 config.root,
508 p,
509 snapshot,
510 config.value_mode,
511 ))
512 }
513
514 pub fn get_index_keeper_tx_read<'a, K: IndexTypeInternal, V: IndexTypeInternal>(
515 p: &'a PersyImpl,
516 snapshot: &SnapshotRef,
517 tx: &'a mut TransactionImpl,
518 index_id: &IndexId,
519 ) -> Result<IndexSegmentKeeper<'a>, IndexOpsError> {
520 let (config, _) = Indexes::get_index_tx(p, tx, index_id)?;
521 config.check::<K, V>()?;
522 Ok(IndexSegmentKeeper::new(
523 &config.name,
524 index_id,
525 config.root,
526 p,
527 snapshot,
528 config.value_mode,
529 ))
530 }
531
532 pub fn get_index_keeper_tx<'a, K: IndexTypeInternal, V: IndexTypeInternal>(
533 store: ExternalRefs<'a>,
534 index_id: &IndexId,
535 ) -> Result<IndexSegmentKeeperTx<'a, K, V>, IndexOpsError> {
536 let (config, version) = Indexes::get_index_tx(store.persy, store.tx, index_id)?;
537 config.check::<K, V>()?;
538 Ok(IndexSegmentKeeperTx::new(
539 &config.name,
540 index_id,
541 config.root,
542 version,
543 store,
544 config.value_mode,
545 config.page_min,
546 config.page_max,
547 ))
548 }
549
550 pub fn check_index<K: IndexTypeInternal, V: IndexTypeInternal>(
551 p: &PersyImpl,
552 tx: &mut TransactionImpl,
553 index_id: &IndexId,
554 ) -> Result<(), IndexOpsError> {
555 let (config, _version) = Indexes::get_index_tx(p, tx, index_id)?;
556 config.check::<K, V>()
557 }
558}
559
560#[cfg(test)]
561mod tests;