1use std::collections::HashMap;
2use std::hash::Hash;
3use std::mem::size_of;
4
5use crate::Key;
6
7use crate::codec::Codec;
8use crate::compaction::{CompactionIndex, compact_shard};
9use crate::config::Config;
10use crate::disk_loc::DiskLoc;
11use crate::engine::Engine;
12use crate::error::{DbError, DbResult};
13use crate::hook::{NoHook, TypedWriteHook};
14use crate::recovery::recover_typed_map;
15use crate::shard::{GLOBAL_GSN, ShardInner};
16use crate::skiplist::node::TypedData;
17use crate::sync::{self, Mutex, MutexGuard};
18use crate::typed_tree::TypedRef;
19
/// In-memory index entry for one key: a raw pointer to the heap-allocated
/// value plus its on-disk location.
///
/// The pointer is produced by `Box::into_raw` and is released either through
/// the seize collector (when an entry is replaced or deleted) or in the map's
/// `Drop` impl.
pub(crate) struct TypedMapEntry<T> {
    // Owned allocation holding the decoded value and its disk location.
    pub(crate) ptr: *mut TypedData<T>,
}
27
// SAFETY: TypedMapEntry is just a raw pointer to a heap-allocated
// TypedData<T>; transferring it across threads is sound when T is Send.
unsafe impl<T: Send> Send for TypedMapEntry<T> {}
// SAFETY: shared access to the pointee goes through the per-shard index
// mutexes and seize guards elsewhere in this file, so sharing &TypedMapEntry
// is sound when T is Sync.
unsafe impl<T: Sync> Sync for TypedMapEntry<T> {}
31
/// A sharded, persistent, typed key/value map.
///
/// Values are encoded via `C: Codec<T>` and appended to per-shard log files
/// owned by [`Engine`]; an in-memory `HashMap` per shard points at the
/// current value for each key. An optional write hook `H` observes every
/// mutation.
pub struct TypedMap<
    K: Key + Send + Sync + Hash + Eq,
    T: Send + Sync,
    C: Codec<T>,
    H: TypedWriteHook<K, T> = NoHook,
> {
    // One mutex-protected index per engine shard, positionally matched.
    indexes: Vec<Mutex<HashMap<K, TypedMapEntry<T>>>>,
    // Deferred-reclamation collector protecting readers of retired entries.
    collector: seize::Collector,
    engine: Engine,
    codec: C,
    // Threshold handed to compact_shard (presumably a dead-bytes ratio —
    // see crate::compaction for the exact semantics).
    compaction_threshold: f64,
    // Number of leading key bits used for shard routing; 0 hashes the
    // whole key (see shard_for).
    shard_prefix_bits: usize,
    hook: H,
}
93
impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T> + Sync> TypedMap<K, T, C> {
    /// Opens (or creates) a map at `path` with no write hook; recovery of
    /// any existing on-disk data happens inside `open_inner`.
    pub fn open(path: impl AsRef<std::path::Path>, config: Config, codec: C) -> DbResult<Self> {
        Self::open_inner(path, config, codec, NoHook)
    }
}
101
impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T> + Sync, H: TypedWriteHook<K, T>>
    TypedMap<K, T, C, H>
{
    /// Opens (or creates) a map at `path` with a custom write hook.
    pub fn open_hooked(
        path: impl AsRef<std::path::Path>,
        config: Config,
        codec: C,
        hook: H,
    ) -> DbResult<Self> {
        Self::open_inner(path, config, codec, hook)
    }

    /// Shared open path: builds the engine, allocates one index per shard,
    /// then replays the on-disk logs to rebuild the in-memory indexes.
    fn open_inner(
        path: impl AsRef<std::path::Path>,
        config: Config,
        codec: C,
        hook: H,
    ) -> DbResult<Self> {
        let compaction_threshold = config.compaction_threshold;
        let shard_prefix_bits = config.shard_prefix_bits;
        let engine = Engine::open(path, config)?;

        // One index per engine shard, positionally matched.
        let shard_count = engine.shards().len();
        let mut indexes = Vec::with_capacity(shard_count);
        for _ in 0..shard_count {
            indexes.push(Mutex::new(HashMap::new()));
        }

        let map = Self {
            indexes,
            collector: seize::Collector::new(),
            engine,
            codec,
            compaction_threshold,
            shard_prefix_bits,
            hook,
        };

        let shard_dirs = map.engine.shard_dirs();
        let shard_dir_refs = Engine::shard_dir_refs(&shard_dirs);
        let shard_ids = map.engine.shard_ids();

        // Replay the shard logs (optionally using hint files) to repopulate
        // the indexes; returns the highest sequence number encountered.
        let hints = map.engine.hints();
        let max_gsn = recover_typed_map::<K, T, C>(
            &shard_dir_refs,
            &shard_ids,
            map.indexes(),
            &map.codec,
            hints,
            #[cfg(feature = "encryption")]
            map.engine.cipher(),
        )?;

        // Future writes must receive sequence numbers above anything that
        // was recovered.
        GLOBAL_GSN.fetch_max(max_gsn + 1, std::sync::atomic::Ordering::Relaxed);
        if hints {
            // Hint files are keyed to this map's fixed key width.
            for shard in map.engine.shards().iter() {
                shard.set_key_len(size_of::<K>());
            }
        }
        tracing::info!(
            key_size = size_of::<K>(),
            entries = map.len(),
            "typed_map recovered"
        );

        Ok(map)
    }

    /// Flushes everything to disk (writing hint files first when enabled)
    /// and consumes the map.
    pub fn close(self) -> DbResult<()> {
        if self.engine.hints() {
            self.sync_hints()?;
        }
        self.engine.flush()
    }

    /// Flushes the engine's in-memory write buffers without closing the map.
    pub fn flush_buffers(&self) -> DbResult<()> {
        self.engine.flush_buffers()
    }

    /// The configuration this map was opened with.
    pub fn config(&self) -> &Config {
        self.engine.config()
    }
}
190
impl<
    K: Key + Send + Sync + Hash + Eq,
    T: Clone + Send + Sync,
    C: Codec<T> + Sync,
    H: TypedWriteHook<K, T>,
> CompactionIndex<K> for TypedMap<K, T, C, H>
{
    /// Compare-and-swap on the disk location: if `key` still points at
    /// `old_loc`, repoint it at `new_loc` (cloning the in-memory value into
    /// a fresh allocation) and retire the old one. Returns false when the
    /// entry was concurrently replaced or removed, so the compactor knows
    /// its relocated copy is stale.
    fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool {
        let mut index = sync::lock(&self.indexes[self.shard_for(key)]);
        if let Some(entry) = index.get_mut(key) {
            // SAFETY: the pointer is valid while the shard index lock is held.
            let data = unsafe { &*entry.ptr };
            if data.disk == old_loc {
                let new_data = Box::into_raw(Box::new(TypedData {
                    disk: new_loc,
                    value: data.value.clone(),
                }));
                let old_ptr = entry.ptr;
                entry.ptr = new_data;
                // SAFETY: old_ptr is unlinked from the index; concurrent
                // readers are protected by their seize guards.
                unsafe {
                    self.collector
                        .retire(old_ptr, seize::reclaim::boxed::<TypedData<T>>);
                }
                return true;
            }
        }
        false
    }

    fn contains_key(&self, key: &K) -> bool {
        self.contains(key)
    }
}
223
224impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T> + Sync, H: TypedWriteHook<K, T>>
225 TypedMap<K, T, C, H>
226{
227 pub fn compact(&self) -> DbResult<usize>
229 where
230 T: Clone,
231 {
232 let mut total_compacted = 0;
233 for shard in self.engine.shards().iter() {
234 total_compacted += compact_shard(shard, self, self.compaction_threshold)?;
235 }
236 Ok(total_compacted)
237 }
238
    /// Looks up `key`, returning a guarded reference to its current value.
    ///
    /// The seize guard is entered *before* the pointer is read, so the value
    /// cannot be reclaimed between releasing the index lock and the caller
    /// dropping the returned `TypedRef`.
    pub fn get(&self, key: &K) -> Option<TypedRef<'_, T>> {
        metrics::counter!("armdb.ops", "op" => "get", "tree" => "typed_map").increment(1);
        let guard = self.collector.enter();
        let data_ptr = {
            let index = sync::lock(&self.indexes[self.shard_for(key)]);
            let entry = index.get(key)?;
            entry.ptr as *const TypedData<T>
        };
        Some(TypedRef::new(guard, data_ptr))
    }
253
254 pub fn get_or_err(&self, key: &K) -> DbResult<TypedRef<'_, T>> {
256 self.get(key).ok_or(DbError::KeyNotFound)
257 }
258
259 pub fn contains(&self, key: &K) -> bool {
261 let index = sync::lock(&self.indexes[self.shard_for(key)]);
262 index.contains_key(key)
263 }
264
    /// Inserts or replaces the value for `key`, returning the previous value
    /// (if any) as a guarded reference.
    ///
    /// Lock order: shard write lock first, then the shard index lock — all
    /// mutating paths in this file follow the same order. The guard is
    /// entered before put_locked so the returned old value stays reachable.
    pub fn put(&self, key: &K, value: T) -> DbResult<Option<TypedRef<'_, T>>> {
        metrics::counter!("armdb.ops", "op" => "put", "tree" => "typed_map").increment(1);
        let shard_id = self.shard_for(key);
        let mut inner = self.engine.shards()[shard_id].lock();
        let mut index = sync::lock(&self.indexes[shard_id]);
        let guard = self.collector.enter();
        self.put_locked(shard_id, &mut inner, &mut index, guard, key, value)
    }
277
    /// Inserts `key`, failing with `KeyExists` if it is already present.
    ///
    /// Same lock order as `put`: shard write lock, then shard index lock.
    pub fn insert(&self, key: &K, value: T) -> DbResult<()> {
        metrics::counter!("armdb.ops", "op" => "insert", "tree" => "typed_map").increment(1);
        let shard_id = self.shard_for(key);
        let mut inner = self.engine.shards()[shard_id].lock();
        let mut index = sync::lock(&self.indexes[shard_id]);
        let guard = self.collector.enter();
        self.insert_locked(shard_id, &mut inner, &mut index, &guard, key, value)
    }
288
    /// Removes `key`, returning the previous value (if any) as a guarded
    /// reference.
    ///
    /// Same lock order as `put`; the guard entered here keeps the deleted
    /// value alive for the returned `TypedRef`.
    pub fn delete(&self, key: &K) -> DbResult<Option<TypedRef<'_, T>>> {
        metrics::counter!("armdb.ops", "op" => "delete", "tree" => "typed_map").increment(1);
        let shard_id = self.shard_for(key);
        let mut inner = self.engine.shards()[shard_id].lock();
        let mut index = sync::lock(&self.indexes[shard_id]);
        let guard = self.collector.enter();
        self.delete_locked(shard_id, &mut inner, &mut index, guard, key)
    }
298
299 pub fn cas(&self, key: &K, expected: &T, new_value: T) -> DbResult<()>
303 where
304 T: PartialEq,
305 {
306 metrics::counter!("armdb.ops", "op" => "cas", "tree" => "typed_map").increment(1);
307 let shard_id = self.shard_for(key);
308 let mut inner = self.engine.shards()[shard_id].lock();
309 let mut index = sync::lock(&self.indexes[shard_id]);
310
311 let entry = index.get(key).ok_or(DbError::KeyNotFound)?;
312 let current_data = unsafe { &*entry.ptr };
313 if current_data.value != *expected {
314 return Err(DbError::CasMismatch);
315 }
316
317 let mut buf = Vec::new();
318 self.codec.encode_to(&new_value, &mut buf);
319 let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key.as_bytes(), &buf, false)?;
320
321 let new_data = Box::new(TypedData {
322 disk: disk_loc,
323 value: new_value,
324 });
325
326 self.hook
327 .on_write(key, Some(¤t_data.value), Some(&new_data.value));
328
329 let new_data_ptr = Box::into_raw(new_data);
330 let old_ptr = entry.ptr;
331 index.get_mut(key).expect("key exists").ptr = new_data_ptr;
333
334 let old_disk = unsafe { (*old_ptr).disk };
335 inner.add_dead_bytes(
336 old_disk.file_id as u32,
337 crate::entry::entry_size(size_of::<K>(), old_disk.len),
338 );
339 unsafe {
340 self.collector
341 .retire(old_ptr, seize::reclaim::boxed::<TypedData<T>>);
342 }
343
344 Ok(())
345 }
346
    /// Rewrites the value for `key` with `f` under the shard locks and
    /// returns a reference to the *new* value (`None` if the key is absent).
    pub fn update(&self, key: &K, f: impl FnOnce(&T) -> T) -> DbResult<Option<TypedRef<'_, T>>> {
        self.update_inner(key, f, false)
    }
353
    /// Like [`TypedMap::update`], but returns a reference to the *previous*
    /// value instead of the new one.
    pub fn fetch_update(
        &self,
        key: &K,
        f: impl FnOnce(&T) -> T,
    ) -> DbResult<Option<TypedRef<'_, T>>> {
        self.update_inner(key, f, true)
    }
362
363 fn update_inner(
364 &self,
365 key: &K,
366 f: impl FnOnce(&T) -> T,
367 return_old: bool,
368 ) -> DbResult<Option<TypedRef<'_, T>>> {
369 metrics::counter!("armdb.ops", "op" => "update", "tree" => "typed_map").increment(1);
370 let shard_id = self.shard_for(key);
371 let mut inner = self.engine.shards()[shard_id].lock();
372 let mut index = sync::lock(&self.indexes[shard_id]);
373
374 let entry = match index.get(key) {
375 Some(e) => e,
376 None => return Ok(None),
377 };
378
379 let old_data = unsafe { &*entry.ptr };
380 let new_value = f(&old_data.value);
381
382 let mut buf = Vec::new();
383 self.codec.encode_to(&new_value, &mut buf);
384 let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key.as_bytes(), &buf, false)?;
385
386 let new_data = Box::new(TypedData {
387 disk: disk_loc,
388 value: new_value,
389 });
390
391 self.hook
392 .on_write(key, Some(&old_data.value), Some(&new_data.value));
393
394 let new_data_ptr = Box::into_raw(new_data);
395 let old_ptr = entry.ptr;
396 index.get_mut(key).expect("key exists").ptr = new_data_ptr;
397
398 let old_disk = unsafe { (*old_ptr).disk };
399 inner.add_dead_bytes(
400 old_disk.file_id as u32,
401 crate::entry::entry_size(size_of::<K>(), old_disk.len),
402 );
403 unsafe {
404 self.collector
405 .retire(old_ptr, seize::reclaim::boxed::<TypedData<T>>);
406 }
407
408 let data = if return_old {
409 old_ptr as *const TypedData<T>
410 } else {
411 new_data_ptr as *const TypedData<T>
412 };
413 Ok(Some(TypedRef::new(self.collector.enter(), data)))
414 }
415
    /// Runs `f` with both the shard write lock and the shard index lock held
    /// for the shard that `shard_key` routes to, so the closure's operations
    /// execute as a unit with respect to other writers on that shard.
    ///
    /// Every key touched through the handle must route to the same shard
    /// (enforced by `TypedMapShard::check_shard`).
    pub fn atomic<R>(
        &self,
        shard_key: &K,
        f: impl FnOnce(&mut TypedMapShard<'_, K, T, C, H>) -> DbResult<R>,
    ) -> DbResult<R> {
        let shard_id = self.shard_for(shard_key);
        let inner = self.engine.shards()[shard_id].lock();
        let index = sync::lock(&self.indexes[shard_id]);
        let guard = self.collector.enter();
        let mut shard = TypedMapShard {
            map: self,
            inner,
            index,
            shard_id,
            guard,
        };
        f(&mut shard)
    }
439
440 pub fn len(&self) -> usize {
443 self.indexes.iter().map(|m| sync::lock(m).len()).sum()
444 }
445
446 pub fn is_empty(&self) -> bool {
447 self.indexes.iter().all(|m| sync::lock(m).is_empty())
448 }
449
450 pub fn sync_hints(&self) -> DbResult<()> {
452 for shard in self.engine.shards().iter() {
453 shard.write_active_hint(size_of::<K>())?;
454 }
455 Ok(())
456 }
457
    /// Routes `key` to a shard index.
    ///
    /// With `shard_prefix_bits == 0` (or a prefix at least as wide as the
    /// key) the whole key is hashed. Otherwise only the first
    /// `shard_prefix_bits` bits are hashed, so keys sharing that prefix land
    /// on the same shard.
    pub fn shard_for(&self, key: &K) -> usize {
        if self.shard_prefix_bits == 0 || self.shard_prefix_bits >= size_of::<K>() * 8 {
            let hash = xxhash_rust::xxh3::xxh3_64(key.as_bytes());
            return (hash as usize) % self.engine.shards().len();
        }

        let full_bytes = self.shard_prefix_bits / 8;
        let extra_bits = self.shard_prefix_bits % 8;

        let hash = if extra_bits == 0 {
            // Prefix falls on a byte boundary: hash the whole-byte prefix.
            xxhash_rust::xxh3::xxh3_64(&key.as_bytes()[..full_bytes])
        } else {
            // Copy the whole-byte prefix into a zeroed key buffer, then mask
            // the final partial byte so only its top `extra_bits` bits
            // contribute to the hash.
            let mut buf = K::zeroed();
            buf.as_bytes_mut()[..full_bytes].copy_from_slice(&key.as_bytes()[..full_bytes]);
            let mask = !((1u8 << (8 - extra_bits)) - 1);
            buf.as_bytes_mut()[full_bytes] = key.as_bytes()[full_bytes] & mask;
            xxhash_rust::xxh3::xxh3_64(&buf.as_bytes()[..full_bytes + 1])
        };

        (hash as usize) % self.engine.shards().len()
    }
479
    /// Applies `f` to every live entry and performs the returned action:
    /// keep, rewrite with a new value, or delete. Returns the number of
    /// entries mutated.
    ///
    /// Keys are snapshotted per shard before iterating, so entries inserted
    /// concurrently may be missed and entries deleted concurrently are
    /// skipped; each individual action re-takes the normal shard locks.
    pub fn migrate(&self, f: impl Fn(&K, &T) -> crate::MigrateAction<T>) -> DbResult<usize> {
        use crate::MigrateAction;

        let mut count = 0;
        for i in 0..self.engine.shards().len() {
            // Snapshot the key set so we don't hold the index lock while
            // calling user code.
            let keys: Vec<K> = {
                let index = sync::lock(&self.indexes[i]);
                index.keys().copied().collect()
            };
            for key in keys {
                let value_ref = match self.get(&key) {
                    Some(v) => v,
                    None => continue,
                };
                let action = f(&key, &*value_ref);
                // Release the guarded ref before taking shard locks below.
                drop(value_ref);

                match action {
                    MigrateAction::Keep => {
                        // Hooks that rebuild derived state still need to see
                        // kept entries.
                        if H::NEEDS_INIT
                            && let Some(v) = self.get(&key)
                        {
                            self.hook.on_init(&key, &*v);
                        }
                    }
                    MigrateAction::Update(value) => {
                        if H::NEEDS_INIT {
                            self.hook.on_init(&key, &value);
                        }
                        let shard_id = self.shard_for(&key);
                        let mut inner = self.engine.shards()[shard_id].lock();
                        let mut index = sync::lock(&self.indexes[shard_id]);
                        let guard = self.collector.enter();
                        // HOOKS = false: the on_init above stands in for the
                        // normal on_write callback.
                        self.put_locked_inner::<false>(
                            shard_id, &mut inner, &mut index, guard, &key, value,
                        )?;
                        count += 1;
                    }
                    MigrateAction::Delete => {
                        let shard_id = self.shard_for(&key);
                        let mut inner = self.engine.shards()[shard_id].lock();
                        let mut index = sync::lock(&self.indexes[shard_id]);
                        let guard = self.collector.enter();
                        self.delete_locked_inner::<false>(
                            shard_id, &mut inner, &mut index, guard, &key,
                        )?;
                        count += 1;
                    }
                }
            }
        }

        tracing::info!(mutations = count, "typed_map migration complete");
        Ok(count)
    }
545
546 pub(crate) fn replay_init(&self) {
548 if !H::NEEDS_INIT {
549 return;
550 }
551 for shard in &self.indexes {
552 let index = sync::lock(shard);
553 for (key, entry) in index.iter() {
554 let data = unsafe { &*entry.ptr };
555 self.hook.on_init(key, &data.value);
556 }
557 }
558 }
559
560 pub(crate) fn indexes(&self) -> &[Mutex<HashMap<K, TypedMapEntry<T>>>] {
563 &self.indexes
564 }
565
    /// Writes `value` for `key` with hooks enabled; the caller already holds
    /// both the shard write lock and the shard index lock, and has entered
    /// `guard`.
    fn put_locked<'g>(
        &self,
        shard_id: usize,
        inner: &mut ShardInner,
        index: &mut HashMap<K, TypedMapEntry<T>>,
        guard: seize::LocalGuard<'g>,
        key: &K,
        value: T,
    ) -> DbResult<Option<TypedRef<'g, T>>> {
        self.put_locked_inner::<true>(shard_id, inner, index, guard, key, value)
    }
577
    /// Core put path: appends the encoded value to the shard log, swaps the
    /// index pointer, accounts the replaced record as dead bytes, and
    /// retires the old allocation.
    ///
    /// `HOOKS = false` suppresses hook callbacks for internal rewrites
    /// (migration). `guard` must already be entered so the returned
    /// reference to the retired old value stays reachable.
    fn put_locked_inner<'g, const HOOKS: bool>(
        &self,
        shard_id: usize,
        inner: &mut ShardInner,
        index: &mut HashMap<K, TypedMapEntry<T>>,
        guard: seize::LocalGuard<'g>,
        key: &K,
        value: T,
    ) -> DbResult<Option<TypedRef<'g, T>>> {
        let mut buf = Vec::new();
        self.codec.encode_to(&value, &mut buf);
        let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key.as_bytes(), &buf, false)?;

        let new_data_ptr = Box::into_raw(Box::new(TypedData {
            disk: disk_loc,
            value,
        }));

        if let Some(old_entry) = index.insert(*key, TypedMapEntry { ptr: new_data_ptr }) {
            let old_data = old_entry.ptr as *const TypedData<T>;

            if HOOKS {
                self.hook.on_write(
                    key,
                    Some(unsafe { &(*old_data).value }),
                    Some(unsafe { &(*new_data_ptr).value }),
                );
            }

            // The replaced log record becomes dead bytes for compaction.
            let old_disk = unsafe { (*old_entry.ptr).disk };
            inner.add_dead_bytes(
                old_disk.file_id as u32,
                crate::entry::entry_size(size_of::<K>(), old_disk.len),
            );
            // SAFETY: the old pointer was displaced from the index by the
            // insert above; readers (including our returned ref) are
            // protected by active guards.
            unsafe {
                self.collector
                    .retire(old_entry.ptr, seize::reclaim::boxed::<TypedData<T>>);
            }

            Ok(Some(TypedRef::new(guard, old_data)))
        } else {
            if HOOKS {
                self.hook
                    .on_write(key, None, Some(unsafe { &(*new_data_ptr).value }));
            }
            Ok(None)
        }
    }
626
    /// Insert-only path: fails with `KeyExists` instead of overwriting;
    /// otherwise appends the encoded value and installs a fresh index entry.
    /// Nothing is retired here, so the guard is unused.
    fn insert_locked(
        &self,
        shard_id: usize,
        inner: &mut ShardInner,
        index: &mut HashMap<K, TypedMapEntry<T>>,
        _guard: &seize::LocalGuard<'_>,
        key: &K,
        value: T,
    ) -> DbResult<()> {
        if index.contains_key(key) {
            return Err(DbError::KeyExists);
        }

        let mut buf = Vec::new();
        self.codec.encode_to(&value, &mut buf);
        let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key.as_bytes(), &buf, false)?;

        let new_data_ptr = Box::into_raw(Box::new(TypedData {
            disk: disk_loc,
            value,
        }));

        index.insert(*key, TypedMapEntry { ptr: new_data_ptr });
        // SAFETY (deref): new_data_ptr was just created from a live Box.
        self.hook
            .on_write(key, None, Some(unsafe { &(*new_data_ptr).value }));
        Ok(())
    }
654
    /// Deletes `key` with hooks enabled; the caller holds both shard locks
    /// and has entered `guard`.
    fn delete_locked<'g>(
        &self,
        shard_id: usize,
        inner: &mut ShardInner,
        index: &mut HashMap<K, TypedMapEntry<T>>,
        guard: seize::LocalGuard<'g>,
        key: &K,
    ) -> DbResult<Option<TypedRef<'g, T>>> {
        self.delete_locked_inner::<true>(shard_id, inner, index, guard, key)
    }
665
    /// Core delete path: removes the index entry, appends a tombstone
    /// record, accounts the old record as dead bytes, and retires the
    /// allocation. The already-entered `guard` keeps the returned reference
    /// to the deleted value alive past the retire.
    fn delete_locked_inner<'g, const HOOKS: bool>(
        &self,
        shard_id: usize,
        inner: &mut ShardInner,
        index: &mut HashMap<K, TypedMapEntry<T>>,
        guard: seize::LocalGuard<'g>,
        key: &K,
    ) -> DbResult<Option<TypedRef<'g, T>>> {
        let entry = match index.remove(key) {
            Some(e) => e,
            None => return Ok(None),
        };

        let old_data = entry.ptr as *const TypedData<T>;

        if HOOKS {
            // SAFETY: the pointer is still valid; we only just unlinked it.
            self.hook
                .on_write(key, Some(unsafe { &(*old_data).value }), None);
        }

        // Tombstone: empty payload with the delete flag set.
        inner.append_entry(shard_id as u8, key.as_bytes(), &[], true)?;

        let old_disk = unsafe { (*entry.ptr).disk };
        inner.add_dead_bytes(
            old_disk.file_id as u32,
            crate::entry::entry_size(size_of::<K>(), old_disk.len),
        );
        // SAFETY: the pointer was removed from the index; readers (including
        // our returned ref) are protected by active guards.
        unsafe {
            self.collector
                .retire(entry.ptr, seize::reclaim::boxed::<TypedData<T>>);
        }

        Ok(Some(TypedRef::new(guard, old_data)))
    }
700}
701
702impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T>, H: TypedWriteHook<K, T>> Drop
703 for TypedMap<K, T, C, H>
704{
705 fn drop(&mut self) {
706 for index_mutex in &self.indexes {
707 let map = sync::lock(index_mutex);
708 for (_, entry) in map.iter() {
709 unsafe {
710 drop(Box::from_raw(entry.ptr));
711 }
712 }
713 }
714 }
715}
716
/// Exclusive handle to a single shard, created by [`TypedMap::atomic`].
///
/// Holds the shard write lock, the shard index lock, and a seize guard for
/// the duration of the closure, so operations through it are isolated from
/// other writers on the same shard.
pub struct TypedMapShard<
    'a,
    K: Key + Send + Sync + Hash + Eq,
    T: Send + Sync,
    C: Codec<T>,
    H: TypedWriteHook<K, T> = NoHook,
> {
    map: &'a TypedMap<K, T, C, H>,
    inner: MutexGuard<'a, ShardInner>,
    index: MutexGuard<'a, HashMap<K, TypedMapEntry<T>>>,
    shard_id: usize,
    guard: seize::LocalGuard<'a>,
}
737
738impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T> + Sync, H: TypedWriteHook<K, T>>
739 TypedMapShard<'_, K, T, C, H>
740{
    /// Inserts or replaces `key` within this shard; errors with
    /// `ShardMismatch` if the key routes to a different shard.
    pub fn put(&mut self, key: &K, value: T) -> DbResult<Option<TypedRef<'_, T>>> {
        self.check_shard(key)?;
        // Fresh guard scoped to the returned reference's lifetime.
        let guard = self.map.collector.enter();
        self.map.put_locked(
            self.shard_id,
            &mut self.inner,
            &mut self.index,
            guard,
            key,
            value,
        )
    }
753
    /// Inserts `key` into this shard, erroring if it already exists or
    /// routes to a different shard.
    pub fn insert(&mut self, key: &K, value: T) -> DbResult<()> {
        self.check_shard(key)?;
        self.map.insert_locked(
            self.shard_id,
            &mut self.inner,
            &mut self.index,
            &self.guard,
            key,
            value,
        )
    }
765
    /// Removes `key` from this shard, returning the previous value (if any).
    pub fn delete(&mut self, key: &K) -> DbResult<Option<TypedRef<'_, T>>> {
        self.check_shard(key)?;
        // Fresh guard scoped to the returned reference's lifetime.
        let guard = self.map.collector.enter();
        self.map
            .delete_locked(self.shard_id, &mut self.inner, &mut self.index, guard, key)
    }
772
    /// Borrows the current value for `key`; the borrow is tied to this
    /// handle, which holds the shard index lock.
    pub fn get(&self, key: &K) -> Option<&T> {
        let entry = self.index.get(key)?;
        // SAFETY: the pointer stays valid while the index lock is held.
        Some(unsafe { &(*entry.ptr).value })
    }
777
778 pub fn get_or_err(&self, key: &K) -> DbResult<&T> {
779 self.get(key).ok_or(DbError::KeyNotFound)
780 }
781
    /// Returns true when `key` exists in this shard's index.
    pub fn contains(&self, key: &K) -> bool {
        self.index.contains_key(key)
    }
785
786 fn check_shard(&self, key: &K) -> DbResult<()> {
787 if self.map.shard_for(key) != self.shard_id {
788 return Err(DbError::ShardMismatch);
789 }
790 Ok(())
791 }
792}
793
// Adapts TypedMap to the generic `Collection` trait (armour feature only),
// for values that carry their own name/id metadata.
#[cfg(feature = "armour")]
impl<T, C, H> crate::armour::collection::Collection for TypedMap<T::SelfId, T, C, H>
where
    T: crate::CollectionMeta + Clone + Send + Sync,
    C: crate::Codec<T> + Sync,
    H: crate::hook::TypedWriteHook<T::SelfId, T>,
    T::SelfId: crate::Key + Send + Sync + std::hash::Hash + Eq,
{
    // The collection name comes from the value type's metadata.
    fn name(&self) -> &str {
        T::NAME
    }
    fn len(&self) -> usize {
        self.len()
    }
    fn compact(&self) -> crate::DbResult<usize> {
        self.compact()
    }
}