Skip to main content

vortex_session/
registry.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Many session types use a registry of objects that can be looked up by name to construct
5//! contexts. This module provides a generic registry type for that purpose.
6
7use std::cmp::Ordering;
8use std::fmt;
9use std::fmt::Debug;
10use std::fmt::Display;
11use std::fmt::Formatter;
12use std::hash::Hash;
13use std::ops::Deref;
14use std::sync::Arc;
15use std::sync::LazyLock;
16use std::sync::OnceLock;
17
18use lasso::Spur;
19use lasso::ThreadedRodeo;
20use parking_lot::RwLock;
21use vortex_error::VortexExpect;
22use vortex_utils::aliases::dash_map::DashMap;
23
/// Global string interner for [`Id`] values.
///
/// Constructed lazily on first use. The `Id` constructors and [`Id::as_str`] all go
/// through this single instance.
static INTERNER: LazyLock<ThreadedRodeo> = LazyLock::new(ThreadedRodeo::new);
26
/// A lightweight, copyable identifier backed by a global string interner.
///
/// Used for array encoding IDs, scalar function IDs, layout IDs, and similar
/// globally-unique string identifiers throughout Vortex. Equality and hashing
/// are O(1) symbol comparisons.
///
/// The wrapped value is the interner key; the string itself is looked up on demand.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct Id(Spur);
34
35impl Id {
36    /// Intern a string and return its `Id`.
37    pub fn new(s: &str) -> Self {
38        Self(INTERNER.get_or_intern(s))
39    }
40
41    /// Intern a string and return its `Id`.
42    pub fn new_static(s: &'static str) -> Self {
43        Self(INTERNER.get_or_intern_static(s))
44    }
45
46    /// Returns the interned string.
47    pub fn as_str(&self) -> &str {
48        let s = INTERNER.resolve(&self.0);
49        // SAFETY: INTERNER is 'static and its arena is append-only, so resolved string
50        // pointers are stable for the lifetime of the program.
51        unsafe { &*(s as *const str) }
52    }
53}
54
55impl From<&str> for Id {
56    fn from(s: &str) -> Self {
57        Self::new(s)
58    }
59}
60
61impl Display for Id {
62    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
63        f.write_str(self.as_str())
64    }
65}
66
67impl Debug for Id {
68    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
69        write!(f, "Id(\"{}\")", self.as_str())
70    }
71}
72
73impl PartialOrd for Id {
74    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
75        Some(self.cmp(other))
76    }
77}
78
79impl Ord for Id {
80    fn cmp(&self, other: &Self) -> Ordering {
81        self.as_str().cmp(other.as_str())
82    }
83}
84
85impl AsRef<str> for Id {
86    fn as_ref(&self) -> &str {
87        self.as_str()
88    }
89}
90
91impl PartialEq<&Id> for Id {
92    fn eq(&self, other: &&Id) -> bool {
93        self == *other
94    }
95}
96
97impl PartialEq<Id> for &Id {
98    fn eq(&self, other: &Id) -> bool {
99        *self == other
100    }
101}
102
/// A lazily-initialized, cached [`Id`] for use as a `static`.
///
/// Avoids repeated interner write-lock acquisition by storing the interned [`Id`]
/// on first access and returning the cached copy on all subsequent calls.
///
/// Dereferencing a `CachedId` yields the cached [`Id`].
///
/// # Example
///
/// ```
/// use vortex_session::registry::{CachedId, Id};
///
/// static MY_ID: CachedId = CachedId::new("my.encoding");
///
/// fn get_id() -> Id {
///     *MY_ID
/// }
/// ```
pub struct CachedId {
    // The string to intern on first access.
    s: &'static str,
    // Holds the interned `Id` once it has been computed.
    cached: OnceLock<Id>,
}
123
124impl CachedId {
125    /// Create a new `CachedId` that will intern `s` on first access.
126    pub const fn new(s: &'static str) -> Self {
127        Self {
128            s,
129            cached: OnceLock::new(),
130        }
131    }
132}
133
134impl Deref for CachedId {
135    type Target = Id;
136
137    fn deref(&self) -> &Id {
138        self.cached.get_or_init(|| Id::new(self.s))
139    }
140}
141
/// A registry of items that are keyed by a string identifier.
///
/// The map lives behind an `Arc`, so cloning a `Registry` yields another handle to the
/// same underlying map rather than a copy of its entries.
#[derive(Clone, Debug)]
pub struct Registry<T>(Arc<DashMap<Id, T>>);
145
146impl<T> Default for Registry<T> {
147    fn default() -> Self {
148        Self(Default::default())
149    }
150}
151
152impl<T: Clone> Registry<T> {
153    pub fn empty() -> Self {
154        Self(Default::default())
155    }
156
157    /// List the IDs in the registry.
158    pub fn ids(&self) -> impl Iterator<Item = Id> + '_ {
159        self.0.iter().map(|i| *i.key())
160    }
161
162    /// List the items in the registry.
163    pub fn items(&self) -> impl Iterator<Item = T> + '_ {
164        self.0.iter().map(|i| i.value().clone())
165    }
166
167    /// Return the items with the given IDs.
168    pub fn find_many<'a>(
169        &self,
170        ids: impl IntoIterator<Item = &'a Id>,
171    ) -> impl Iterator<Item = Option<impl Deref<Target = T>>> {
172        ids.into_iter().map(|id| self.0.get(id))
173    }
174
175    /// Find the item with the given ID.
176    pub fn find(&self, id: &Id) -> Option<T> {
177        self.0.get(id).as_deref().cloned()
178    }
179
180    /// Register a new item, replacing any existing item with the same ID.
181    pub fn register(&self, id: impl Into<Id>, item: impl Into<T>) {
182        self.0.insert(id.into(), item.into());
183    }
184
185    /// Register a new item, replacing any existing item with the same ID, and return self for
186    pub fn with(self, id: impl Into<Id>, item: impl Into<T>) -> Self {
187        self.register(id, item.into());
188        self
189    }
190}
191
/// A [`ReadContext`] holds a set of interned IDs for use during deserialization, mapping
/// u16 indices to IDs.
#[derive(Clone, Debug)]
pub struct ReadContext {
    // IDs addressed by position: the ID at slice position `i` is the one resolved for index `i`.
    ids: Arc<[Id]>,
}
198
199impl ReadContext {
200    /// Create a context with the given initial IDs.
201    pub fn new(ids: impl Into<Arc<[Id]>>) -> Self {
202        Self { ids: ids.into() }
203    }
204
205    /// Resolve an interned ID by its index.
206    pub fn resolve(&self, idx: u16) -> Option<Id> {
207        self.ids.get(idx as usize).cloned()
208    }
209
210    pub fn ids(&self) -> &[Id] {
211        &self.ids
212    }
213}
214
/// A [`Context`] holds a set of interned IDs for use during serialization/deserialization, mapping
/// IDs to u16 indices.
///
/// The ID list lives behind an `Arc`, so clones of a `Context` share the same list.
///
/// ## Upcoming Changes
///
/// 1. This object holds an Arc of RwLock internally because we need concurrent access from the
///    layout writer code path. We should update SegmentSink to take an Array rather than
///    ByteBuffer such that serializing arrays is done sequentially.
/// 2. The name is terrible. `Interner<T>` is better, but I want to minimize breakage for now.
#[derive(Clone, Debug)]
pub struct Context<T> {
    // TODO(ngates): it's a long story, but if we make SegmentSink and SegmentSource take an
    //  enum of Segment { Array, DType, Buffer } then we don't actually need a mutable context
    //  in the LayoutWriter, therefore we don't need a RwLock here and everyone is happier.
    // Interned IDs; an ID's position in the vector is the index handed out for it.
    ids: Arc<RwLock<Vec<Id>>>,
    // Optional registry used to filter the permissible interned items.
    registry: Option<Registry<T>>,
}
233
234impl<T> Default for Context<T> {
235    fn default() -> Self {
236        Self {
237            ids: Arc::new(RwLock::new(Vec::new())),
238            registry: None,
239        }
240    }
241}
242
243impl<T: Clone> Context<T> {
244    /// Create a context with the given initial IDs.
245    pub fn new(ids: Vec<Id>) -> Self {
246        Self {
247            ids: Arc::new(RwLock::new(ids)),
248            registry: None,
249        }
250    }
251
252    /// Create an empty context.
253    pub fn empty() -> Self {
254        Self::default()
255    }
256
257    /// Configure a registry to restrict the permissible set of interned items.
258    pub fn with_registry(mut self, registry: Registry<T>) -> Self {
259        self.registry = Some(registry);
260        self
261    }
262
263    /// Intern an ID, returning its index.
264    pub fn intern(&self, id: &Id) -> Option<u16> {
265        if let Some(registry) = &self.registry
266            && registry.find(id).is_none()
267        {
268            // ID not in registry, cannot intern.
269            return None;
270        }
271
272        let mut ids = self.ids.write();
273        if let Some(idx) = ids.iter().position(|e| e == id) {
274            return Some(u16::try_from(idx).vortex_expect("Cannot have more than u16::MAX items"));
275        }
276
277        let idx = ids.len();
278        assert!(
279            idx < u16::MAX as usize,
280            "Cannot have more than u16::MAX items"
281        );
282        ids.push(*id);
283        Some(u16::try_from(idx).vortex_expect("checked already"))
284    }
285
286    /// Get the list of interned IDs.
287    pub fn to_ids(&self) -> Vec<Id> {
288        self.ids.read().clone()
289    }
290}