1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
//! A concurrent, copy-on-write map backed by an [`ArcSwap`].
use std::borrow::Borrow;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::hash::Hash;
use std::sync::Arc;
use arc_swap::ArcSwap;
use vortex_utils::aliases::hash_map::HashMap;
/// A concurrent [`HashMap`] backed by an [`ArcSwap`], offering lock-free reads
/// and copy-on-write writes.
///
/// Reads load the current snapshot without blocking writers. Writes clone the
/// whole map, apply their change, and atomically publish the new version, so a
/// reader always observes a consistent snapshot and writers never block readers.
///
/// This is the shared building block behind the session-scoped registries (the
/// optimizer-kernel and aggregate-function registries). Because every write
/// clones the entire map, it is intended for maps that are written rarely
/// (typically only while a session is being configured) and read often.
pub(crate) struct ArcSwapMap<K, V> {
/// The currently published snapshot. Writers never mutate it in place; they
/// publish a modified copy that replaces it.
inner: ArcSwap<HashMap<K, V>>,
}
impl<K, V> Default for ArcSwapMap<K, V> {
fn default() -> Self {
Self {
inner: ArcSwap::from_pointee(HashMap::default()),
}
}
}
impl<K: Debug, V: Debug> Debug for ArcSwapMap<K, V> {
    /// Format the snapshot current at call time as the tuple struct
    /// `ArcSwapMap(<map>)`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        self.read(|snapshot| {
            let mut tuple = f.debug_tuple("ArcSwapMap");
            tuple.field(snapshot);
            tuple.finish()
        })
    }
}
impl<K, V> ArcSwapMap<K, V> {
    /// Load the current snapshot, hand it to `f`, and return whatever `f`
    /// returns.
    ///
    /// The snapshot is loaded once, so every lookup made inside `f` sees the
    /// same version of the map. Use this instead of repeated point lookups
    /// when one logical read needs to consult several keys.
    pub(crate) fn read<R>(&self, f: impl FnOnce(&HashMap<K, V>) -> R) -> R {
        let snapshot = self.inner.load();
        f(&snapshot)
    }

    /// Publish a new map obtained by running `f` on a private copy of the
    /// current one.
    ///
    /// The update goes through [`ArcSwap::rcu`], which may invoke the closure
    /// again if another writer published in the meantime. For that reason `f`
    /// is a plain `Fn`: it must be repeatable and must not move out of its
    /// captures.
    fn modify(&self, f: impl Fn(&mut HashMap<K, V>))
    where
        K: Clone,
        V: Clone,
    {
        self.inner.rcu(|current| {
            // Work on a full copy so readers holding `current` are unaffected.
            let mut updated: HashMap<K, V> = (**current).clone();
            f(&mut updated);
            updated
        });
    }
}
impl<K: Eq + Hash, V: Clone> ArcSwapMap<K, V> {
    /// Look up `key` in the current snapshot and return an owned clone of its
    /// value, or `None` when the key is not present.
    pub(crate) fn get<Q>(&self, key: &Q) -> Option<V>
    where
        K: Borrow<Q>,
        Q: Eq + Hash + ?Sized,
    {
        let snapshot = self.inner.load();
        snapshot.get(key).map(V::clone)
    }

    /// Store `value` under `key`, overwriting whatever was there before.
    ///
    /// The key and value are cloned on each attempt because the update
    /// closure may run more than once.
    pub(crate) fn insert(&self, key: K, value: V)
    where
        K: Clone,
    {
        self.modify(|map| {
            let (k, v) = (key.clone(), value.clone());
            map.insert(k, v);
        });
    }
}
impl<K: Eq + Hash + Clone, T: Clone> ArcSwapMap<K, Arc<[T]>> {
    /// Add `values` to the end of the list stored under `key`; if the key is
    /// absent, a new list holding just `values` is created.
    ///
    /// The stored `Arc<[T]>` is never mutated. A fresh slice containing the old
    /// elements followed by `values` is built and inserted in its place, so
    /// readers that already hold the previous slice are unaffected.
    pub(crate) fn extend(&self, key: K, values: &[T]) {
        self.modify(|map| {
            let merged: Arc<[T]> = if let Some(existing) = map.get(&key) {
                existing.iter().chain(values).cloned().collect()
            } else {
                Arc::from(values)
            };
            map.insert(key.clone(), merged);
        });
    }

    /// Add one `value` to the end of the list stored under `key`, creating the
    /// list if the key is absent.
    pub(crate) fn push(&self, key: K, value: T) {
        let single = [value];
        self.extend(key, &single);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A missing key yields `None`; a second insert under the same key
    /// overwrites the first.
    #[test]
    fn get_and_insert() {
        let registry: ArcSwapMap<u32, i32> = ArcSwapMap::default();
        assert_eq!(registry.get(&1), None);

        registry.insert(1, 10);
        registry.insert(1, 20);
        assert_eq!(registry.get(&1), Some(20));
    }

    /// `extend` appends to the list of the given key only.
    #[test]
    fn extend_appends_per_key() {
        let lists: ArcSwapMap<u32, Arc<[i32]>> = ArcSwapMap::default();
        lists.extend(1, &[1, 2]);
        lists.extend(1, &[3]);
        lists.extend(2, &[4]);

        let first = lists.get(&1);
        let second = lists.get(&2);
        assert_eq!(first.as_deref(), Some(&[1, 2, 3][..]));
        assert_eq!(second.as_deref(), Some(&[4][..]));
    }

    /// `push` appends one element at a time, preserving insertion order.
    #[test]
    fn push_appends_single_values() {
        let lists: ArcSwapMap<u32, Arc<[i32]>> = ArcSwapMap::default();
        lists.push(1, 1);
        lists.push(1, 2);

        let stored = lists.get(&1);
        assert_eq!(stored.as_deref(), Some(&[1, 2][..]));
    }

    /// `read` exposes the whole map to the closure in one call.
    #[test]
    fn read_observes_a_single_snapshot() {
        let registry: ArcSwapMap<u32, i32> = ArcSwapMap::default();
        registry.insert(1, 1);
        registry.insert(2, 2);

        let total: i32 = registry.read(|snapshot| snapshot.values().copied().sum());
        assert_eq!(total, 3);
    }
}