Skip to main content

vortex_session/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4pub mod registry;
5
6use std::any::Any;
7use std::any::TypeId;
8use std::any::type_name;
9use std::fmt::Debug;
10use std::hash::BuildHasherDefault;
11use std::hash::Hasher;
12use std::ops::Deref;
13use std::ops::DerefMut;
14use std::sync::Arc;
15
16use dashmap::DashMap;
17use dashmap::Entry;
18use vortex_error::VortexExpect;
19use vortex_error::vortex_panic;
20
/// A Vortex session encapsulates the set of extensible arrays, layouts, compute functions, dtypes,
/// etc. that are available for use in a given context.
///
/// It is also the entry-point passed to dynamic libraries to initialize Vortex plugins.
///
/// The session is a thin handle around an [`Arc`] of the underlying variable map, so cloning it
/// is cheap and every clone observes (and mutates) the same set of session variables.
#[derive(Clone, Debug)]
pub struct VortexSession(Arc<SessionVars>);
27
28impl VortexSession {
29    /// Create a new [`VortexSession`] with no session state.
30    ///
31    /// It is recommended to use the `default()` method instead provided by the main `vortex` crate.
32    pub fn empty() -> Self {
33        Self(Default::default())
34    }
35
36    /// Inserts a new session variable of type `V` with its default value.
37    ///
38    /// # Panics
39    ///
40    /// If a variable of that type already exists.
41    pub fn with<V: SessionVar + Default>(self) -> Self {
42        self.with_some(V::default())
43    }
44
45    /// Inserts a new session variable of type `V`.
46    ///
47    /// # Panics
48    ///
49    /// If a variable of that type already exists.
50    pub fn with_some<V: SessionVar>(self, var: V) -> Self {
51        match self.0.entry(TypeId::of::<V>()) {
52            Entry::Occupied(_) => {
53                vortex_panic!(
54                    "Session variable of type {} already exists",
55                    type_name::<V>()
56                );
57            }
58            Entry::Vacant(e) => {
59                e.insert(Box::new(var));
60            }
61        }
62        self
63    }
64
65    /// Allow deserializing unknown plugin IDs as non-executable foreign placeholders.
66    pub fn allow_unknown(self) -> Self {
67        let mut policy = <Self as SessionExt>::get_mut::<UnknownPluginPolicy>(&self);
68        policy.allow_unknown = true;
69        drop(policy);
70        self
71    }
72
73    /// Returns whether unknown plugins should deserialize as foreign placeholders.
74    pub fn allows_unknown(&self) -> bool {
75        <Self as SessionExt>::get::<UnknownPluginPolicy>(self).allow_unknown
76    }
77}
78
/// Session variable recording whether unknown plugin IDs may be deserialized as foreign
/// placeholders.
///
/// It is private to this module and only accessed through [`VortexSession::allow_unknown`] and
/// [`VortexSession::allows_unknown`].
#[derive(Debug, Clone, Copy, Default)]
struct UnknownPluginPolicy {
    /// `true` once the session has opted in; the derived default is `false`.
    allow_unknown: bool,
}
83
/// Trait for accessing and modifying the state of a Vortex session.
///
/// The trait is sealed (via `private::Sealed`), so it cannot be implemented outside this crate.
pub trait SessionExt: Sized + private::Sealed {
    /// Returns the [`VortexSession`].
    fn session(&self) -> VortexSession;

    /// Returns the session variable of type `V`, or inserts a default one if it does not exist.
    ///
    /// Note that the returned value is a guard into the session's variable map.
    fn get<V: SessionVar + Default>(&self) -> Ref<'_, V>;

    /// Returns the session variable of type `V` if it exists.
    ///
    /// Unlike [`SessionExt::get`], this never inserts a value.
    fn get_opt<V: SessionVar>(&self) -> Option<Ref<'_, V>>;

    /// Returns the session variable of type `V`, or inserts a default one if it does not exist.
    ///
    /// Note that the returned value internally holds a lock on the variable.
    fn get_mut<V: SessionVar + Default>(&self) -> RefMut<'_, V>;

    /// Returns the session variable of type `V`, if it exists.
    ///
    /// Unlike [`SessionExt::get_mut`], this never inserts a value.
    ///
    /// Note that the returned value internally holds a lock on the variable.
    fn get_mut_opt<V: SessionVar>(&self) -> Option<RefMut<'_, V>>;
}
105
106mod private {
107    pub trait Sealed {}
108    impl Sealed for super::VortexSession {}
109}
110
111impl SessionExt for VortexSession {
112    fn session(&self) -> VortexSession {
113        self.clone()
114    }
115
116    /// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
117    fn get<V: SessionVar + Default>(&self) -> Ref<'_, V> {
118        // NOTE(ngates): we don't use `entry().or_insert_with_key()` here because the DashMap
119        //  would immediately acquire an exclusive write lock.
120        if let Some(v) = self.0.get(&TypeId::of::<V>()) {
121            return Ref(v.map(|v| {
122                (**v)
123                    .as_any()
124                    .downcast_ref::<V>()
125                    .vortex_expect("Type mismatch - this is a bug")
126            }));
127        }
128
129        // If we get here, the value was not present, so we insert the default with a write lock.
130        Ref(self
131            .0
132            .entry(TypeId::of::<V>())
133            .or_insert_with(|| Box::new(V::default()))
134            .downgrade()
135            .map(|v| {
136                (**v)
137                    .as_any()
138                    .downcast_ref::<V>()
139                    .vortex_expect("Type mismatch - this is a bug")
140            }))
141    }
142
143    fn get_opt<V: SessionVar>(&self) -> Option<Ref<'_, V>> {
144        self.0.get(&TypeId::of::<V>()).map(|v| {
145            Ref(v.map(|v| {
146                (**v)
147                    .as_any()
148                    .downcast_ref::<V>()
149                    .vortex_expect("Type mismatch - this is a bug")
150            }))
151        })
152    }
153
154    /// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
155    ///
156    /// Note that the returned value internally holds a lock on the variable.
157    fn get_mut<V: SessionVar + Default>(&self) -> RefMut<'_, V> {
158        RefMut(
159            self.0
160                .entry(TypeId::of::<V>())
161                .or_insert_with(|| Box::new(V::default()))
162                .map(|v| {
163                    (**v)
164                        .as_any_mut()
165                        .downcast_mut::<V>()
166                        .vortex_expect("Type mismatch - this is a bug")
167                }),
168        )
169    }
170
171    fn get_mut_opt<V: SessionVar>(&self) -> Option<RefMut<'_, V>> {
172        self.0.get_mut(&TypeId::of::<V>()).map(|v| {
173            RefMut(v.map(|v| {
174                (**v)
175                    .as_any_mut()
176                    .downcast_mut::<V>()
177                    .vortex_expect("Type mismatch - this is a bug")
178            }))
179        })
180    }
181}
182
/// A TypeMap based on `https://docs.rs/http/1.2.0/src/http/extensions.rs.html#41-266`.
///
/// Each session variable is boxed and keyed by the [`TypeId`] of its concrete type, so a session
/// holds at most one value per type. Keys are hashed with the pass-through [`IdHasher`].
type SessionVars = DashMap<TypeId, Box<dyn SessionVar>, BuildHasherDefault<IdHasher>>;
185
/// With TypeIds as keys, there's no need to hash them. They are already hashes
/// themselves, coming from the compiler. The IdHasher just holds the u64 of
/// the TypeId, and then returns it, instead of doing any bit fiddling.
///
/// Which `write_*` method a `Hash` implementation calls is not part of its documented
/// contract, so the generic byte-oriented [`Hasher::write`] is supported as a slower fallback
/// instead of panicking.
#[derive(Default)]
struct IdHasher(u64);

impl Hasher for IdHasher {
    /// Returns the value recorded by the preceding write(s).
    #[inline]
    fn finish(&self) -> u64 {
        self.0
    }

    /// Fallback for keys that are not fed through [`Hasher::write_u64`].
    ///
    /// Folds the bytes into the state deterministically. This path is not expected to be hit
    /// for `TypeId` keys, but taking it must never bring the whole session down.
    fn write(&mut self, bytes: &[u8]) {
        self.0 = bytes
            .iter()
            .fold(self.0, |acc, &byte| acc.rotate_left(8) ^ u64::from(byte));
    }

    /// Fast path: the supplied value is used as the hash as-is.
    #[inline]
    fn write_u64(&mut self, id: u64) {
        self.0 = id;
    }
}
207
/// This trait defines variables that can be stored against a Vortex session.
///
/// It is implemented automatically for every `Send + Sync + Debug + 'static` type, so it never
/// needs to be implemented by hand.
pub trait SessionVar: Any + Send + Sync + Debug + 'static {
    /// Views the variable as a `&dyn Any` so that it can be downcast to its concrete type.
    fn as_any(&self) -> &dyn Any;

    /// Views the variable as a `&mut dyn Any` so that it can be downcast to its concrete type.
    fn as_any_mut(&mut self) -> &mut dyn Any;
}

impl<T> SessionVar for T
where
    T: Send + Sync + Debug + 'static,
{
    fn as_any(&self) -> &dyn Any {
        self as &dyn Any
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self as &mut dyn Any
    }
}
223
224// NOTE(ngates): we don't want to expose that the internals of a session is a DashMap, so we have
225// our own wrapped Ref type.
226pub struct Ref<'a, T>(dashmap::mapref::one::MappedRef<'a, TypeId, Box<dyn SessionVar>, T>);
227impl<'a, T> Deref for Ref<'a, T> {
228    type Target = T;
229
230    fn deref(&self) -> &Self::Target {
231        &self.0
232    }
233}
234impl<'a, T> Ref<'a, T> {
235    /// Map this reference to a different target.
236    pub fn map<F, U>(self, f: F) -> Ref<'a, U>
237    where
238        F: FnOnce(&T) -> &U,
239    {
240        Ref(self.0.map(f))
241    }
242}
243
244pub struct RefMut<'a, T>(dashmap::mapref::one::MappedRefMut<'a, TypeId, Box<dyn SessionVar>, T>);
245impl<'a, T> Deref for RefMut<'a, T> {
246    type Target = T;
247
248    fn deref(&self) -> &Self::Target {
249        &self.0
250    }
251}
252impl<'a, T> DerefMut for RefMut<'a, T> {
253    fn deref_mut(&mut self) -> &mut Self::Target {
254        self.0.deref_mut()
255    }
256}
257impl<'a, T> RefMut<'a, T> {
258    /// Map this mutable reference to a different target.
259    pub fn map<F, U>(self, f: F) -> RefMut<'a, U>
260    where
261        F: FnOnce(&mut T) -> &mut U,
262    {
263        RefMut(self.0.map(f))
264    }
265}
266
#[cfg(test)]
mod tests {
    use super::VortexSession;

    /// Unknown plugins are rejected until the session explicitly opts in.
    #[test]
    fn allow_unknown_flag_is_opt_in() {
        // A fresh session has not opted in.
        let fresh = VortexSession::empty();
        assert!(!fresh.allows_unknown());

        // Opting in flips the flag on the returned handle.
        let opted_in = fresh.allow_unknown();
        assert!(opted_in.allows_unknown());
    }
}
279}