Skip to main content

armdb/
typed_map.rs

1use std::collections::HashMap;
2use std::hash::Hash;
3use std::mem::size_of;
4
5use crate::Key;
6
7use crate::codec::Codec;
8use crate::compaction::{CompactionIndex, compact_shard};
9use crate::config::Config;
10use crate::disk_loc::DiskLoc;
11use crate::engine::Engine;
12use crate::error::{DbError, DbResult};
13use crate::hook::{NoHook, TypedWriteHook};
14use crate::recovery::recover_typed_map;
15use crate::shard::{GLOBAL_GSN, ShardInner};
16use crate::skiplist::node::TypedData;
17use crate::sync::{self, Mutex, MutexGuard};
18use crate::typed_tree::TypedRef;
19
20// ---------------------------------------------------------------------------
21// TypedMapEntry
22// ---------------------------------------------------------------------------
23
24pub(crate) struct TypedMapEntry<T> {
25    pub(crate) ptr: *mut TypedData<T>,
26}
27
28// SAFETY: TypedData<T> is Send+Sync when T is, and we manage the pointer lifetime via seize.
29unsafe impl<T: Send> Send for TypedMapEntry<T> {}
30unsafe impl<T: Sync> Sync for TypedMapEntry<T> {}
31
32// ---------------------------------------------------------------------------
33// TypedMap
34// ---------------------------------------------------------------------------
35
36/// A map with fixed-size keys and typed values `T`. Values are encoded via
37/// a [`Codec`] for disk persistence but stored as `T` in memory — reads never
38/// touch disk and return [`TypedRef<T>`](TypedRef) (guard-protected reference).
39///
40/// Each `TypedMap` owns its storage engine — one map = one database directory.
41///
42/// # Clone-free reads
43///
44/// Returns [`TypedRef<T>`](TypedRef) — a guard-protected reference. The `seize` guard
45/// prevents reclamation of the old data while the reference exists.
46/// `put()`, `delete()`, and `update()` return the old value **without requiring `T: Clone`**.
47///
48/// The only method that requires `T: Clone` is [`compact()`](Self::compact).
49///
50/// # Write hooks
51///
52/// Uses [`TypedWriteHook<K, T>`] — the hook receives `&T` directly (not encoded bytes).
53/// `on_write` fires on `put`/`insert`/`delete`/`cas`/`update`.
54/// Does **not** fire inside `atomic()`. Old value is always provided (it lives in
55/// memory) — `NEEDS_OLD_VALUE` is ignored.
56///
57/// `on_init` fires once per live entry during `migrate()` or `replay_init()`
58/// (enable via `NEEDS_INIT = true`).
59///
60/// # No ordered iteration
61///
62/// HashMap-based, O(1) lookup. Use [`TypedTree`](crate::TypedTree) if you need
63/// prefix/range scans.
64///
65/// # Usage
66///
67/// ```ignore
68/// let map = TypedMap::<[u8; 16], MyValue, RapiraCodec>::open(
69///     "data/sessions",
70///     Config::default(),
71///     RapiraCodec,
72/// )?;
73/// map.put(&key, value)?;
74/// if let Some(r) = map.get(&key) {
75///     println!("{:?}", &*r);  // TypedRef<MyValue> derefs to &MyValue
76/// }
77/// map.close()?;
78/// ```
79pub struct TypedMap<
80    K: Key + Send + Sync + Hash + Eq,
81    T: Send + Sync,
82    C: Codec<T>,
83    H: TypedWriteHook<K, T> = NoHook,
84> {
85    indexes: Vec<Mutex<HashMap<K, TypedMapEntry<T>>>>,
86    collector: seize::Collector,
87    engine: Engine,
88    codec: C,
89    compaction_threshold: f64,
90    shard_prefix_bits: usize,
91    hook: H,
92}
93
94impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T> + Sync> TypedMap<K, T, C> {
95    /// Open or create a `TypedMap` at the given path.
96    /// Recovers the index from existing data files on disk.
97    pub fn open(path: impl AsRef<std::path::Path>, config: Config, codec: C) -> DbResult<Self> {
98        Self::open_inner(path, config, codec, NoHook)
99    }
100}
101
102impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T> + Sync, H: TypedWriteHook<K, T>>
103    TypedMap<K, T, C, H>
104{
105    /// Open or create a `TypedMap` with a write hook for secondary index maintenance.
106    pub fn open_hooked(
107        path: impl AsRef<std::path::Path>,
108        config: Config,
109        codec: C,
110        hook: H,
111    ) -> DbResult<Self> {
112        Self::open_inner(path, config, codec, hook)
113    }
114
115    fn open_inner(
116        path: impl AsRef<std::path::Path>,
117        config: Config,
118        codec: C,
119        hook: H,
120    ) -> DbResult<Self> {
121        let compaction_threshold = config.compaction_threshold;
122        let shard_prefix_bits = config.shard_prefix_bits;
123        let engine = Engine::open(path, config)?;
124
125        let shard_count = engine.shards().len();
126        let mut indexes = Vec::with_capacity(shard_count);
127        for _ in 0..shard_count {
128            indexes.push(Mutex::new(HashMap::new()));
129        }
130
131        let map = Self {
132            indexes,
133            collector: seize::Collector::new(),
134            engine,
135            codec,
136            compaction_threshold,
137            shard_prefix_bits,
138            hook,
139        };
140
141        // Recover index from disk
142        let shard_dirs = map.engine.shard_dirs();
143        let shard_dir_refs = Engine::shard_dir_refs(&shard_dirs);
144        let shard_ids = map.engine.shard_ids();
145
146        let hints = map.engine.hints();
147        let max_gsn = recover_typed_map::<K, T, C>(
148            &shard_dir_refs,
149            &shard_ids,
150            map.indexes(),
151            &map.codec,
152            hints,
153            #[cfg(feature = "encryption")]
154            map.engine.cipher(),
155        )?;
156
157        GLOBAL_GSN.fetch_max(max_gsn + 1, std::sync::atomic::Ordering::Relaxed);
158        if hints {
159            for shard in map.engine.shards().iter() {
160                shard.set_key_len(size_of::<K>());
161            }
162        }
163        tracing::info!(
164            key_size = size_of::<K>(),
165            entries = map.len(),
166            "typed_map recovered"
167        );
168
169        Ok(map)
170    }
171
172    /// Graceful shutdown: write hint files (if enabled), flush write buffers + fsync.
173    pub fn close(self) -> DbResult<()> {
174        if self.engine.hints() {
175            self.sync_hints()?;
176        }
177        self.engine.flush()
178    }
179
180    /// Flush all shard write buffers to disk (without fsync).
181    pub fn flush_buffers(&self) -> DbResult<()> {
182        self.engine.flush_buffers()
183    }
184
185    /// Get the database configuration.
186    pub fn config(&self) -> &Config {
187        self.engine.config()
188    }
189}
190
191impl<
192    K: Key + Send + Sync + Hash + Eq,
193    T: Clone + Send + Sync,
194    C: Codec<T> + Sync,
195    H: TypedWriteHook<K, T>,
196> CompactionIndex<K> for TypedMap<K, T, C, H>
197{
198    fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool {
199        let mut index = sync::lock(&self.indexes[self.shard_for(key)]);
200        if let Some(entry) = index.get_mut(key) {
201            let data = unsafe { &*entry.ptr };
202            if data.disk == old_loc {
203                let new_data = Box::into_raw(Box::new(TypedData {
204                    disk: new_loc,
205                    value: data.value.clone(),
206                }));
207                let old_ptr = entry.ptr;
208                entry.ptr = new_data;
209                unsafe {
210                    self.collector
211                        .retire(old_ptr, seize::reclaim::boxed::<TypedData<T>>);
212                }
213                return true;
214            }
215        }
216        false
217    }
218
219    fn contains_key(&self, key: &K) -> bool {
220        self.contains(key)
221    }
222}
223
224impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T> + Sync, H: TypedWriteHook<K, T>>
225    TypedMap<K, T, C, H>
226{
227    /// Trigger a compaction pass across all shards.
228    pub fn compact(&self) -> DbResult<usize>
229    where
230        T: Clone,
231    {
232        let mut total_compacted = 0;
233        for shard in self.engine.shards().iter() {
234            total_compacted += compact_shard(shard, self, self.compaction_threshold)?;
235        }
236        Ok(total_compacted)
237    }
238
239    // -- Reads ----------------------------------------------------------------
240
241    /// Get a guard-protected reference to a value by key. O(1) lookup.
242    /// Brief shard lock to read pointer, then lock-free access via seize guard.
243    pub fn get(&self, key: &K) -> Option<TypedRef<'_, T>> {
244        metrics::counter!("armdb.ops", "op" => "get", "tree" => "typed_map").increment(1);
245        let guard = self.collector.enter();
246        let data_ptr = {
247            let index = sync::lock(&self.indexes[self.shard_for(key)]);
248            let entry = index.get(key)?;
249            entry.ptr as *const TypedData<T>
250        };
251        Some(TypedRef::new(guard, data_ptr))
252    }
253
254    /// Get a guard-protected reference to a value by key, returning `Err(KeyNotFound)` if absent.
255    pub fn get_or_err(&self, key: &K) -> DbResult<TypedRef<'_, T>> {
256        self.get(key).ok_or(DbError::KeyNotFound)
257    }
258
259    /// Check if a key exists.
260    pub fn contains(&self, key: &K) -> bool {
261        let index = sync::lock(&self.indexes[self.shard_for(key)]);
262        index.contains_key(key)
263    }
264
265    // -- Writes ---------------------------------------------------------------
266
267    /// Insert or update a key-value pair. Returns a `TypedRef` to the old value
268    /// if the key existed (valid while the guard lives).
269    pub fn put(&self, key: &K, value: T) -> DbResult<Option<TypedRef<'_, T>>> {
270        metrics::counter!("armdb.ops", "op" => "put", "tree" => "typed_map").increment(1);
271        let shard_id = self.shard_for(key);
272        let mut inner = self.engine.shards()[shard_id].lock();
273        let mut index = sync::lock(&self.indexes[shard_id]);
274        let guard = self.collector.enter();
275        self.put_locked(shard_id, &mut inner, &mut index, guard, key, value)
276    }
277
278    /// Insert a key-value pair only if the key does not exist.
279    /// Returns `Err(KeyExists)` if the key is already present.
280    pub fn insert(&self, key: &K, value: T) -> DbResult<()> {
281        metrics::counter!("armdb.ops", "op" => "insert", "tree" => "typed_map").increment(1);
282        let shard_id = self.shard_for(key);
283        let mut inner = self.engine.shards()[shard_id].lock();
284        let mut index = sync::lock(&self.indexes[shard_id]);
285        let guard = self.collector.enter();
286        self.insert_locked(shard_id, &mut inner, &mut index, &guard, key, value)
287    }
288
289    /// Delete a key. Returns a `TypedRef` to the old value if the key existed.
290    pub fn delete(&self, key: &K) -> DbResult<Option<TypedRef<'_, T>>> {
291        metrics::counter!("armdb.ops", "op" => "delete", "tree" => "typed_map").increment(1);
292        let shard_id = self.shard_for(key);
293        let mut inner = self.engine.shards()[shard_id].lock();
294        let mut index = sync::lock(&self.indexes[shard_id]);
295        let guard = self.collector.enter();
296        self.delete_locked(shard_id, &mut inner, &mut index, guard, key)
297    }
298
299    /// Compare-and-swap: if current value == expected, replace with new_value.
300    /// Returns `Ok(())` on success, `Err(CasMismatch)` if current != expected,
301    /// `Err(KeyNotFound)` if key doesn't exist.
302    pub fn cas(&self, key: &K, expected: &T, new_value: T) -> DbResult<()>
303    where
304        T: PartialEq,
305    {
306        metrics::counter!("armdb.ops", "op" => "cas", "tree" => "typed_map").increment(1);
307        let shard_id = self.shard_for(key);
308        let mut inner = self.engine.shards()[shard_id].lock();
309        let mut index = sync::lock(&self.indexes[shard_id]);
310
311        let entry = index.get(key).ok_or(DbError::KeyNotFound)?;
312        let current_data = unsafe { &*entry.ptr };
313        if current_data.value != *expected {
314            return Err(DbError::CasMismatch);
315        }
316
317        let mut buf = Vec::new();
318        self.codec.encode_to(&new_value, &mut buf);
319        let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key.as_bytes(), &buf, false)?;
320
321        let new_data = Box::new(TypedData {
322            disk: disk_loc,
323            value: new_value,
324        });
325
326        self.hook
327            .on_write(key, Some(&current_data.value), Some(&new_data.value));
328
329        let new_data_ptr = Box::into_raw(new_data);
330        let old_ptr = entry.ptr;
331        // Update entry in-place
332        index.get_mut(key).expect("key exists").ptr = new_data_ptr;
333
334        let old_disk = unsafe { (*old_ptr).disk };
335        inner.add_dead_bytes(
336            old_disk.file_id as u32,
337            crate::entry::entry_size(size_of::<K>(), old_disk.len),
338        );
339        unsafe {
340            self.collector
341                .retire(old_ptr, seize::reclaim::boxed::<TypedData<T>>);
342        }
343
344        Ok(())
345    }
346
347    /// Atomically read-modify-write. Returns `Some(TypedRef)` to the **new** value
348    /// if key existed, `None` otherwise.
349    /// The closure must not be heavy (shard lock is held).
350    pub fn update(&self, key: &K, f: impl FnOnce(&T) -> T) -> DbResult<Option<TypedRef<'_, T>>> {
351        self.update_inner(key, f, false)
352    }
353
354    /// Like [`update()`](Self::update), but returns `Some(TypedRef)` to the **old** value.
355    pub fn fetch_update(
356        &self,
357        key: &K,
358        f: impl FnOnce(&T) -> T,
359    ) -> DbResult<Option<TypedRef<'_, T>>> {
360        self.update_inner(key, f, true)
361    }
362
363    fn update_inner(
364        &self,
365        key: &K,
366        f: impl FnOnce(&T) -> T,
367        return_old: bool,
368    ) -> DbResult<Option<TypedRef<'_, T>>> {
369        metrics::counter!("armdb.ops", "op" => "update", "tree" => "typed_map").increment(1);
370        let shard_id = self.shard_for(key);
371        let mut inner = self.engine.shards()[shard_id].lock();
372        let mut index = sync::lock(&self.indexes[shard_id]);
373
374        let entry = match index.get(key) {
375            Some(e) => e,
376            None => return Ok(None),
377        };
378
379        let old_data = unsafe { &*entry.ptr };
380        let new_value = f(&old_data.value);
381
382        let mut buf = Vec::new();
383        self.codec.encode_to(&new_value, &mut buf);
384        let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key.as_bytes(), &buf, false)?;
385
386        let new_data = Box::new(TypedData {
387            disk: disk_loc,
388            value: new_value,
389        });
390
391        self.hook
392            .on_write(key, Some(&old_data.value), Some(&new_data.value));
393
394        let new_data_ptr = Box::into_raw(new_data);
395        let old_ptr = entry.ptr;
396        index.get_mut(key).expect("key exists").ptr = new_data_ptr;
397
398        let old_disk = unsafe { (*old_ptr).disk };
399        inner.add_dead_bytes(
400            old_disk.file_id as u32,
401            crate::entry::entry_size(size_of::<K>(), old_disk.len),
402        );
403        unsafe {
404            self.collector
405                .retire(old_ptr, seize::reclaim::boxed::<TypedData<T>>);
406        }
407
408        let data = if return_old {
409            old_ptr as *const TypedData<T>
410        } else {
411            new_data_ptr as *const TypedData<T>
412        };
413        Ok(Some(TypedRef::new(self.collector.enter(), data)))
414    }
415
416    // -- Atomic ---------------------------------------------------------------
417
418    /// Atomically execute multiple operations on a single shard.
419    /// All keys must route to the same shard as `shard_key`.
420    /// The closure must be short — shard lock is held for its duration.
421    pub fn atomic<R>(
422        &self,
423        shard_key: &K,
424        f: impl FnOnce(&mut TypedMapShard<'_, K, T, C, H>) -> DbResult<R>,
425    ) -> DbResult<R> {
426        let shard_id = self.shard_for(shard_key);
427        let inner = self.engine.shards()[shard_id].lock();
428        let index = sync::lock(&self.indexes[shard_id]);
429        let guard = self.collector.enter();
430        let mut shard = TypedMapShard {
431            map: self,
432            inner,
433            index,
434            shard_id,
435            guard,
436        };
437        f(&mut shard)
438    }
439
440    // -- Info -----------------------------------------------------------------
441
442    pub fn len(&self) -> usize {
443        self.indexes.iter().map(|m| sync::lock(m).len()).sum()
444    }
445
446    pub fn is_empty(&self) -> bool {
447        self.indexes.iter().all(|m| sync::lock(m).is_empty())
448    }
449
450    /// Write hint files for all active shard files. Call during graceful shutdown.
451    pub fn sync_hints(&self) -> DbResult<()> {
452        for shard in self.engine.shards().iter() {
453            shard.write_active_hint(size_of::<K>())?;
454        }
455        Ok(())
456    }
457
458    pub fn shard_for(&self, key: &K) -> usize {
459        if self.shard_prefix_bits == 0 || self.shard_prefix_bits >= size_of::<K>() * 8 {
460            let hash = xxhash_rust::xxh3::xxh3_64(key.as_bytes());
461            return (hash as usize) % self.engine.shards().len();
462        }
463
464        let full_bytes = self.shard_prefix_bits / 8;
465        let extra_bits = self.shard_prefix_bits % 8;
466
467        let hash = if extra_bits == 0 {
468            xxhash_rust::xxh3::xxh3_64(&key.as_bytes()[..full_bytes])
469        } else {
470            let mut buf = K::zeroed();
471            buf.as_bytes_mut()[..full_bytes].copy_from_slice(&key.as_bytes()[..full_bytes]);
472            let mask = !((1u8 << (8 - extra_bits)) - 1);
473            buf.as_bytes_mut()[full_bytes] = key.as_bytes()[full_bytes] & mask;
474            xxhash_rust::xxh3::xxh3_64(&buf.as_bytes()[..full_bytes + 1])
475        };
476
477        (hash as usize) % self.engine.shards().len()
478    }
479
480    // -- Migrate --------------------------------------------------------------
481
482    /// Iterate all entries and optionally mutate them. Call once at startup.
483    ///
484    /// The callback receives each (key, &T) and returns `MigrateAction`:
485    /// - `Keep` — no change (fires `on_init` if `NEEDS_INIT`)
486    /// - `Update(new_value)` — replace value (hook-free write, fires `on_init`)
487    /// - `Delete` — remove entry (hook-free tombstone)
488    ///
489    /// Returns the number of mutated entries.
490    pub fn migrate(&self, f: impl Fn(&K, &T) -> crate::MigrateAction<T>) -> DbResult<usize> {
491        use crate::MigrateAction;
492
493        let mut count = 0;
494        for i in 0..self.engine.shards().len() {
495            let keys: Vec<K> = {
496                let index = sync::lock(&self.indexes[i]);
497                index.keys().copied().collect()
498            };
499            for key in keys {
500                let value_ref = match self.get(&key) {
501                    Some(v) => v,
502                    None => continue,
503                };
504                let action = f(&key, &*value_ref);
505                drop(value_ref);
506
507                match action {
508                    MigrateAction::Keep => {
509                        if H::NEEDS_INIT
510                            && let Some(v) = self.get(&key)
511                        {
512                            self.hook.on_init(&key, &*v);
513                        }
514                    }
515                    MigrateAction::Update(value) => {
516                        if H::NEEDS_INIT {
517                            self.hook.on_init(&key, &value);
518                        }
519                        let shard_id = self.shard_for(&key);
520                        let mut inner = self.engine.shards()[shard_id].lock();
521                        let mut index = sync::lock(&self.indexes[shard_id]);
522                        let guard = self.collector.enter();
523                        self.put_locked_inner::<false>(
524                            shard_id, &mut inner, &mut index, guard, &key, value,
525                        )?;
526                        count += 1;
527                    }
528                    MigrateAction::Delete => {
529                        let shard_id = self.shard_for(&key);
530                        let mut inner = self.engine.shards()[shard_id].lock();
531                        let mut index = sync::lock(&self.indexes[shard_id]);
532                        let guard = self.collector.enter();
533                        self.delete_locked_inner::<false>(
534                            shard_id, &mut inner, &mut index, guard, &key,
535                        )?;
536                        count += 1;
537                    }
538                }
539            }
540        }
541
542        tracing::info!(mutations = count, "typed_map migration complete");
543        Ok(count)
544    }
545
546    /// Replay `on_init` for every live entry. Used when no migration runs.
547    pub(crate) fn replay_init(&self) {
548        if !H::NEEDS_INIT {
549            return;
550        }
551        for shard in &self.indexes {
552            let index = sync::lock(shard);
553            for (key, entry) in index.iter() {
554                let data = unsafe { &*entry.ptr };
555                self.hook.on_init(key, &data.value);
556            }
557        }
558    }
559
560    // -- Private --------------------------------------------------------------
561
562    pub(crate) fn indexes(&self) -> &[Mutex<HashMap<K, TypedMapEntry<T>>>] {
563        &self.indexes
564    }
565
566    fn put_locked<'g>(
567        &self,
568        shard_id: usize,
569        inner: &mut ShardInner,
570        index: &mut HashMap<K, TypedMapEntry<T>>,
571        guard: seize::LocalGuard<'g>,
572        key: &K,
573        value: T,
574    ) -> DbResult<Option<TypedRef<'g, T>>> {
575        self.put_locked_inner::<true>(shard_id, inner, index, guard, key, value)
576    }
577
578    fn put_locked_inner<'g, const HOOKS: bool>(
579        &self,
580        shard_id: usize,
581        inner: &mut ShardInner,
582        index: &mut HashMap<K, TypedMapEntry<T>>,
583        guard: seize::LocalGuard<'g>,
584        key: &K,
585        value: T,
586    ) -> DbResult<Option<TypedRef<'g, T>>> {
587        let mut buf = Vec::new();
588        self.codec.encode_to(&value, &mut buf);
589        let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key.as_bytes(), &buf, false)?;
590
591        let new_data_ptr = Box::into_raw(Box::new(TypedData {
592            disk: disk_loc,
593            value,
594        }));
595
596        if let Some(old_entry) = index.insert(*key, TypedMapEntry { ptr: new_data_ptr }) {
597            let old_data = old_entry.ptr as *const TypedData<T>;
598
599            if HOOKS {
600                self.hook.on_write(
601                    key,
602                    Some(unsafe { &(*old_data).value }),
603                    Some(unsafe { &(*new_data_ptr).value }),
604                );
605            }
606
607            let old_disk = unsafe { (*old_entry.ptr).disk };
608            inner.add_dead_bytes(
609                old_disk.file_id as u32,
610                crate::entry::entry_size(size_of::<K>(), old_disk.len),
611            );
612            unsafe {
613                self.collector
614                    .retire(old_entry.ptr, seize::reclaim::boxed::<TypedData<T>>);
615            }
616
617            Ok(Some(TypedRef::new(guard, old_data)))
618        } else {
619            if HOOKS {
620                self.hook
621                    .on_write(key, None, Some(unsafe { &(*new_data_ptr).value }));
622            }
623            Ok(None)
624        }
625    }
626
627    fn insert_locked(
628        &self,
629        shard_id: usize,
630        inner: &mut ShardInner,
631        index: &mut HashMap<K, TypedMapEntry<T>>,
632        _guard: &seize::LocalGuard<'_>,
633        key: &K,
634        value: T,
635    ) -> DbResult<()> {
636        if index.contains_key(key) {
637            return Err(DbError::KeyExists);
638        }
639
640        let mut buf = Vec::new();
641        self.codec.encode_to(&value, &mut buf);
642        let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key.as_bytes(), &buf, false)?;
643
644        let new_data_ptr = Box::into_raw(Box::new(TypedData {
645            disk: disk_loc,
646            value,
647        }));
648
649        index.insert(*key, TypedMapEntry { ptr: new_data_ptr });
650        self.hook
651            .on_write(key, None, Some(unsafe { &(*new_data_ptr).value }));
652        Ok(())
653    }
654
655    fn delete_locked<'g>(
656        &self,
657        shard_id: usize,
658        inner: &mut ShardInner,
659        index: &mut HashMap<K, TypedMapEntry<T>>,
660        guard: seize::LocalGuard<'g>,
661        key: &K,
662    ) -> DbResult<Option<TypedRef<'g, T>>> {
663        self.delete_locked_inner::<true>(shard_id, inner, index, guard, key)
664    }
665
666    fn delete_locked_inner<'g, const HOOKS: bool>(
667        &self,
668        shard_id: usize,
669        inner: &mut ShardInner,
670        index: &mut HashMap<K, TypedMapEntry<T>>,
671        guard: seize::LocalGuard<'g>,
672        key: &K,
673    ) -> DbResult<Option<TypedRef<'g, T>>> {
674        let entry = match index.remove(key) {
675            Some(e) => e,
676            None => return Ok(None),
677        };
678
679        let old_data = entry.ptr as *const TypedData<T>;
680
681        if HOOKS {
682            self.hook
683                .on_write(key, Some(unsafe { &(*old_data).value }), None);
684        }
685
686        inner.append_entry(shard_id as u8, key.as_bytes(), &[], true)?;
687
688        let old_disk = unsafe { (*entry.ptr).disk };
689        inner.add_dead_bytes(
690            old_disk.file_id as u32,
691            crate::entry::entry_size(size_of::<K>(), old_disk.len),
692        );
693        unsafe {
694            self.collector
695                .retire(entry.ptr, seize::reclaim::boxed::<TypedData<T>>);
696        }
697
698        Ok(Some(TypedRef::new(guard, old_data)))
699    }
700}
701
702impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T>, H: TypedWriteHook<K, T>> Drop
703    for TypedMap<K, T, C, H>
704{
705    fn drop(&mut self) {
706        for index_mutex in &self.indexes {
707            let map = sync::lock(index_mutex);
708            for (_, entry) in map.iter() {
709                unsafe {
710                    drop(Box::from_raw(entry.ptr));
711                }
712            }
713        }
714    }
715}
716
717// ---------------------------------------------------------------------------
718// TypedMapShard
719// ---------------------------------------------------------------------------
720
721/// Handle for atomic multi-key operations within a single shard.
722/// Obtained via [`TypedMap::atomic`]. The shard + index locks are held for the
723/// lifetime of this struct — keep the closure short.
724pub struct TypedMapShard<
725    'a,
726    K: Key + Send + Sync + Hash + Eq,
727    T: Send + Sync,
728    C: Codec<T>,
729    H: TypedWriteHook<K, T> = NoHook,
730> {
731    map: &'a TypedMap<K, T, C, H>,
732    inner: MutexGuard<'a, ShardInner>,
733    index: MutexGuard<'a, HashMap<K, TypedMapEntry<T>>>,
734    shard_id: usize,
735    guard: seize::LocalGuard<'a>,
736}
737
738impl<K: Key + Send + Sync + Hash + Eq, T: Send + Sync, C: Codec<T> + Sync, H: TypedWriteHook<K, T>>
739    TypedMapShard<'_, K, T, C, H>
740{
741    pub fn put(&mut self, key: &K, value: T) -> DbResult<Option<TypedRef<'_, T>>> {
742        self.check_shard(key)?;
743        let guard = self.map.collector.enter();
744        self.map.put_locked(
745            self.shard_id,
746            &mut self.inner,
747            &mut self.index,
748            guard,
749            key,
750            value,
751        )
752    }
753
754    pub fn insert(&mut self, key: &K, value: T) -> DbResult<()> {
755        self.check_shard(key)?;
756        self.map.insert_locked(
757            self.shard_id,
758            &mut self.inner,
759            &mut self.index,
760            &self.guard,
761            key,
762            value,
763        )
764    }
765
766    pub fn delete(&mut self, key: &K) -> DbResult<Option<TypedRef<'_, T>>> {
767        self.check_shard(key)?;
768        let guard = self.map.collector.enter();
769        self.map
770            .delete_locked(self.shard_id, &mut self.inner, &mut self.index, guard, key)
771    }
772
773    pub fn get(&self, key: &K) -> Option<&T> {
774        let entry = self.index.get(key)?;
775        Some(unsafe { &(*entry.ptr).value })
776    }
777
778    pub fn get_or_err(&self, key: &K) -> DbResult<&T> {
779        self.get(key).ok_or(DbError::KeyNotFound)
780    }
781
782    pub fn contains(&self, key: &K) -> bool {
783        self.index.contains_key(key)
784    }
785
786    fn check_shard(&self, key: &K) -> DbResult<()> {
787        if self.map.shard_for(key) != self.shard_id {
788            return Err(DbError::ShardMismatch);
789        }
790        Ok(())
791    }
792}
793
/// Exposes a `TypedMap` keyed by `T::SelfId` through the armour `Collection` trait.
/// Each method forwards to the inherent `TypedMap` method of the same name.
#[cfg(feature = "armour")]
impl<T, C, H> crate::armour::collection::Collection for TypedMap<T::SelfId, T, C, H>
where
    T: crate::CollectionMeta + Clone + Send + Sync,
    C: crate::Codec<T> + Sync,
    H: crate::hook::TypedWriteHook<T::SelfId, T>,
    T::SelfId: crate::Key + Send + Sync + std::hash::Hash + Eq,
{
    /// The collection name declared by `T`.
    fn name(&self) -> &str {
        T::NAME
    }

    /// Number of entries in the map.
    fn len(&self) -> usize {
        // Path syntax picks the inherent method, not this trait method.
        TypedMap::len(self)
    }

    /// Runs a compaction pass over all shards.
    fn compact(&self) -> crate::DbResult<usize> {
        // Path syntax picks the inherent method, not this trait method.
        TypedMap::compact(self)
    }
}