Skip to main content

vortex_session/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4pub mod registry;
5
6use std::any::Any;
7use std::any::TypeId;
8use std::any::type_name;
9use std::fmt::Debug;
10use std::hash::BuildHasherDefault;
11use std::hash::Hasher;
12use std::ops::Deref;
13use std::ops::DerefMut;
14use std::sync::Arc;
15
16use dashmap::DashMap;
17use dashmap::Entry;
18use dashmap::mapref::one::MappedRef;
19use dashmap::mapref::one::MappedRefMut;
20use vortex_error::VortexExpect;
21use vortex_error::vortex_panic;
22
23/// A Vortex session encapsulates the set of extensible arrays, layouts, compute functions, dtypes,
24/// etc. that are available for use in a given context.
25///
26/// It is also the entry-point passed to dynamic libraries to initialize Vortex plugins.
27#[derive(Clone, Debug)]
28pub struct VortexSession(Arc<SessionVars>);
29
30impl VortexSession {
31    /// Create a new [`VortexSession`] with no session state.
32    ///
33    /// It is recommended to use the `default()` method instead provided by the main `vortex` crate.
34    pub fn empty() -> Self {
35        Self(Default::default())
36    }
37
38    /// Inserts a new session variable of type `V` with its default value.
39    ///
40    /// # Panics
41    ///
42    /// If a variable of that type already exists.
43    pub fn with<V: SessionVar + Default>(self) -> Self {
44        self.with_some(V::default())
45    }
46
47    /// Inserts a new session variable of type `V`.
48    ///
49    /// # Panics
50    ///
51    /// If a variable of that type already exists.
52    pub fn with_some<V: SessionVar>(self, var: V) -> Self {
53        match self.0.entry(TypeId::of::<V>()) {
54            Entry::Occupied(_) => {
55                vortex_panic!(
56                    "Session variable of type {} already exists",
57                    type_name::<V>()
58                );
59            }
60            Entry::Vacant(e) => {
61                e.insert(Box::new(var));
62            }
63        }
64        self
65    }
66
67    /// Allow deserializing unknown plugin IDs as non-executable foreign placeholders.
68    pub fn allow_unknown(self) -> Self {
69        let mut policy = <Self as SessionExt>::get_mut::<UnknownPluginPolicy>(&self);
70        policy.allow_unknown = true;
71        drop(policy);
72        self
73    }
74
75    /// Returns whether unknown plugins should deserialize as foreign placeholders.
76    pub fn allows_unknown(&self) -> bool {
77        <Self as SessionExt>::get_opt::<UnknownPluginPolicy>(self)
78            .map(|p| p.allow_unknown)
79            .unwrap_or(false)
80    }
81}
82
/// Session variable recording how unknown plugin IDs are treated during deserialization.
///
/// The derived [`Default`] leaves `allow_unknown` set to `false`.
#[derive(Clone, Copy, Debug, Default)]
struct UnknownPluginPolicy {
    /// When `true`, unknown plugin IDs deserialize as non-executable foreign placeholders.
    allow_unknown: bool,
}
87
88impl SessionVar for UnknownPluginPolicy {
89    fn as_any(&self) -> &dyn Any {
90        self
91    }
92
93    fn as_any_mut(&mut self) -> &mut dyn Any {
94        self
95    }
96}
97
98/// Trait for accessing and modifying the state of a Vortex session.
99pub trait SessionExt: Sized + private::Sealed {
100    /// Returns the [`VortexSession`].
101    fn session(&self) -> VortexSession;
102
103    /// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
104    fn get<V: SessionVar + Default>(&self) -> Ref<'_, V>;
105
106    /// Returns the scope variable of type `V` if it exists.
107    fn get_opt<V: SessionVar>(&self) -> Option<Ref<'_, V>>;
108
109    /// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
110    ///
111    /// Note that the returned value internally holds a lock on the variable.
112    fn get_mut<V: SessionVar + Default>(&self) -> RefMut<'_, V>;
113
114    /// Returns the scope variable of type `V`, if it exists.
115    ///
116    /// Note that the returned value internally holds a lock on the variable.
117    fn get_mut_opt<V: SessionVar>(&self) -> Option<RefMut<'_, V>>;
118}
119
120mod private {
121    pub trait Sealed {}
122    impl Sealed for super::VortexSession {}
123}
124
125impl SessionExt for VortexSession {
126    fn session(&self) -> VortexSession {
127        self.clone()
128    }
129
130    /// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
131    fn get<V: SessionVar + Default>(&self) -> Ref<'_, V> {
132        // NOTE(ngates): we don't use `entry().or_insert_with_key()` here because the DashMap
133        //  would immediately acquire an exclusive write lock.
134        if let Some(v) = self.0.get(&TypeId::of::<V>()) {
135            return Ref(v.map(|v| {
136                (**v)
137                    .as_any()
138                    .downcast_ref::<V>()
139                    .vortex_expect("Type mismatch - this is a bug")
140            }));
141        }
142
143        // If we get here, the value was not present, so we insert the default with a write lock.
144        Ref(self
145            .0
146            .entry(TypeId::of::<V>())
147            .or_insert_with(|| Box::new(V::default()))
148            .downgrade()
149            .map(|v| {
150                (**v)
151                    .as_any()
152                    .downcast_ref::<V>()
153                    .vortex_expect("Type mismatch - this is a bug")
154            }))
155    }
156
157    fn get_opt<V: SessionVar>(&self) -> Option<Ref<'_, V>> {
158        self.0.get(&TypeId::of::<V>()).map(|v| {
159            Ref(v.map(|v| {
160                (**v)
161                    .as_any()
162                    .downcast_ref::<V>()
163                    .vortex_expect("Type mismatch - this is a bug")
164            }))
165        })
166    }
167
168    /// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
169    ///
170    /// Note that the returned value internally holds a lock on the variable.
171    fn get_mut<V: SessionVar + Default>(&self) -> RefMut<'_, V> {
172        RefMut(
173            self.0
174                .entry(TypeId::of::<V>())
175                .or_insert_with(|| Box::new(V::default()))
176                .map(|v| {
177                    (**v)
178                        .as_any_mut()
179                        .downcast_mut::<V>()
180                        .vortex_expect("Type mismatch - this is a bug")
181                }),
182        )
183    }
184
185    fn get_mut_opt<V: SessionVar>(&self) -> Option<RefMut<'_, V>> {
186        self.0.get_mut(&TypeId::of::<V>()).map(|v| {
187            RefMut(v.map(|v| {
188                (**v)
189                    .as_any_mut()
190                    .downcast_mut::<V>()
191                    .vortex_expect("Type mismatch - this is a bug")
192            }))
193        })
194    }
195}
196
197/// A TypeMap based on `https://docs.rs/http/1.2.0/src/http/extensions.rs.html#41-266`.
198type SessionVars = DashMap<TypeId, Box<dyn SessionVar>, BuildHasherDefault<IdHasher>>;
199
/// A pass-through [`Hasher`] for maps keyed by `TypeId`.
///
/// With TypeIds as keys, there's no need to hash them. They are already hashes
/// themselves, coming from the compiler. The IdHasher just holds the u64 handed
/// to [`Hasher::write_u64`], and then returns it, instead of doing any bit fiddling.
#[derive(Default)]
struct IdHasher(u64);

impl Hasher for IdHasher {
    /// Returns the value most recently stored by `write_u64` (or folded together by `write`).
    #[inline]
    fn finish(&self) -> u64 {
        self.0
    }

    /// Fallback for keys that feed raw bytes instead of a single `u64`.
    ///
    /// `TypeId` is expected to hash itself through `write_u64`, but that is an implementation
    /// detail of the standard library rather than a documented guarantee. Instead of panicking
    /// on every map access should that ever change, the bytes are folded into the state with
    /// an xor-and-multiply step (using the 64-bit FNV prime), which keeps the hasher
    /// deterministic and usable.
    #[cold]
    fn write(&mut self, bytes: &[u8]) {
        const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
        self.0 = bytes.iter().fold(self.0, |state, &byte| {
            (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        });
    }

    /// Stores `id` verbatim; this is the path taken when a `TypeId` hashes itself as a `u64`.
    #[inline]
    fn write_u64(&mut self, id: u64) {
        self.0 = id;
    }
}
221
/// A value that can be stored against a Vortex session.
///
/// Implement this trait for any type you want to store on a `VortexSession`. The two accessor
/// methods are normally implemented by simply returning `self`.
pub trait SessionVar: Any + Debug + Send + Sync + 'static {
    /// Returns this value as `&dyn Any`, so that it can be downcast to its concrete type.
    fn as_any(&self) -> &dyn Any;

    /// Returns this value as `&mut dyn Any`, so that it can be downcast to its concrete type.
    fn as_any_mut(&mut self) -> &mut dyn Any;
}
229
230// NOTE(ngates): we don't want to expose that the internals of a session is a DashMap, so we have
231// our own wrapped Ref type.
232pub struct Ref<'a, T>(MappedRef<'a, TypeId, Box<dyn SessionVar>, T>);
233impl<'a, T> Deref for Ref<'a, T> {
234    type Target = T;
235
236    fn deref(&self) -> &Self::Target {
237        &self.0
238    }
239}
240impl<'a, T> Ref<'a, T> {
241    /// Map this reference to a different target.
242    pub fn map<F, U>(self, f: F) -> Ref<'a, U>
243    where
244        F: FnOnce(&T) -> &U,
245    {
246        Ref(self.0.map(f))
247    }
248}
249
250pub struct RefMut<'a, T>(MappedRefMut<'a, TypeId, Box<dyn SessionVar>, T>);
251impl<'a, T> Deref for RefMut<'a, T> {
252    type Target = T;
253
254    fn deref(&self) -> &Self::Target {
255        &self.0
256    }
257}
258impl<'a, T> DerefMut for RefMut<'a, T> {
259    fn deref_mut(&mut self) -> &mut Self::Target {
260        self.0.deref_mut()
261    }
262}
263impl<'a, T> RefMut<'a, T> {
264    /// Map this mutable reference to a different target.
265    pub fn map<F, U>(self, f: F) -> RefMut<'a, U>
266    where
267        F: FnOnce(&mut T) -> &mut U,
268    {
269        RefMut(self.0.map(f))
270    }
271}
272
#[cfg(test)]
mod tests {
    use super::VortexSession;

    /// Unknown plugins are rejected until `allow_unknown` is called explicitly.
    #[test]
    fn allow_unknown_flag_is_opt_in() {
        // A fresh session has no policy registered, which reads as "not allowed".
        let strict = VortexSession::empty();
        assert!(!strict.allows_unknown());

        // Opting in flips the flag on the returned session.
        let permissive = strict.allow_unknown();
        assert!(permissive.allows_unknown());
    }
}