Skip to main content

vortex_session/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4pub mod registry;
5
6use std::any::Any;
7use std::any::TypeId;
8use std::any::type_name;
9use std::fmt::Debug;
10use std::hash::BuildHasherDefault;
11use std::hash::Hasher;
12use std::ops::Deref;
13use std::ops::DerefMut;
14use std::sync::Arc;
15
16use dashmap::DashMap;
17use dashmap::Entry;
18use vortex_error::VortexExpect;
19use vortex_error::vortex_panic;
20
/// A Vortex session encapsulates the set of extensible arrays, layouts, compute functions, dtypes,
/// etc. that are available for use in a given context.
///
/// It is also the entry-point passed to dynamic libraries to initialize Vortex plugins.
///
/// The session is a handle around an [`Arc`]-shared map of session variables: cloning a session
/// clones the `Arc`, so every clone observes (and can modify) the same set of variables.
#[derive(Clone, Debug)]
pub struct VortexSession(Arc<SessionVars>);
27
28impl VortexSession {
29    /// Create a new [`VortexSession`] with no session state.
30    ///
31    /// It is recommended to use the `default()` method instead provided by the main `vortex` crate.
32    pub fn empty() -> Self {
33        Self(Default::default())
34    }
35
36    /// Inserts a new session variable of type `V` with its default value.
37    ///
38    /// # Panics
39    ///
40    /// If a variable of that type already exists.
41    pub fn with<V: SessionVar + Default>(self) -> Self {
42        self.with_some(V::default())
43    }
44
45    /// Inserts a new session variable of type `V`.
46    ///
47    /// # Panics
48    ///
49    /// If a variable of that type already exists.
50    pub fn with_some<V: SessionVar>(self, var: V) -> Self {
51        match self.0.entry(TypeId::of::<V>()) {
52            Entry::Occupied(_) => {
53                vortex_panic!(
54                    "Session variable of type {} already exists",
55                    type_name::<V>()
56                );
57            }
58            Entry::Vacant(e) => {
59                e.insert(Box::new(var));
60            }
61        }
62        self
63    }
64
65    /// Allow deserializing unknown plugin IDs as non-executable foreign placeholders.
66    pub fn allow_unknown(self) -> Self {
67        let mut policy = <Self as SessionExt>::get_mut::<UnknownPluginPolicy>(&self);
68        policy.allow_unknown = true;
69        drop(policy);
70        self
71    }
72
73    /// Returns whether unknown plugins should deserialize as foreign placeholders.
74    pub fn allows_unknown(&self) -> bool {
75        <Self as SessionExt>::get_opt::<UnknownPluginPolicy>(self)
76            .map(|p| p.allow_unknown)
77            .unwrap_or(false)
78    }
79}
80
/// Session variable recording whether unknown plugins may be deserialized as foreign
/// placeholders.
///
/// The derived [`Default`] leaves `allow_unknown` as `false`, i.e. unknown plugins are not
/// allowed unless explicitly opted in.
#[derive(Debug, Clone, Copy, Default)]
struct UnknownPluginPolicy {
    /// Set to `true` by [`VortexSession::allow_unknown`]; read by
    /// [`VortexSession::allows_unknown`].
    allow_unknown: bool,
}
85
// Lets `UnknownPluginPolicy` be stored in the session's type map.
impl SessionVar for UnknownPluginPolicy {
    // Upcast to `dyn Any` so the session can downcast back to `UnknownPluginPolicy`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    // Mutable variant of the upcast above.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
95
/// Trait for accessing and modifying the state of a Vortex session.
///
/// The trait requires the `private::Sealed` supertrait; in this file the only implementor is
/// [`VortexSession`].
pub trait SessionExt: Sized + private::Sealed {
    /// Returns the [`VortexSession`].
    fn session(&self) -> VortexSession;

    /// Returns the session variable of type `V`, or inserts a default one if it does not exist.
    fn get<V: SessionVar + Default>(&self) -> Ref<'_, V>;

    /// Returns the session variable of type `V` if it exists.
    ///
    /// Unlike [`SessionExt::get`], this never inserts a value.
    fn get_opt<V: SessionVar>(&self) -> Option<Ref<'_, V>>;

    /// Returns the session variable of type `V`, or inserts a default one if it does not exist.
    ///
    /// Note that the returned value internally holds a lock on the variable.
    fn get_mut<V: SessionVar + Default>(&self) -> RefMut<'_, V>;

    /// Returns the session variable of type `V`, if it exists.
    ///
    /// Unlike [`SessionExt::get_mut`], this never inserts a value.
    ///
    /// Note that the returned value internally holds a lock on the variable.
    fn get_mut_opt<V: SessionVar>(&self) -> Option<RefMut<'_, V>>;
}
117
// Private module holding the marker supertrait of `SessionExt`. Because the module is not
// `pub`, code outside this crate cannot name `Sealed` and therefore cannot implement
// `SessionExt` for its own types.
mod private {
    /// Marker trait required by `SessionExt`.
    pub trait Sealed {}
    // `VortexSession` is the only type opted in here.
    impl Sealed for super::VortexSession {}
}
122
123impl SessionExt for VortexSession {
124    fn session(&self) -> VortexSession {
125        self.clone()
126    }
127
128    /// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
129    fn get<V: SessionVar + Default>(&self) -> Ref<'_, V> {
130        // NOTE(ngates): we don't use `entry().or_insert_with_key()` here because the DashMap
131        //  would immediately acquire an exclusive write lock.
132        if let Some(v) = self.0.get(&TypeId::of::<V>()) {
133            return Ref(v.map(|v| {
134                (**v)
135                    .as_any()
136                    .downcast_ref::<V>()
137                    .vortex_expect("Type mismatch - this is a bug")
138            }));
139        }
140
141        // If we get here, the value was not present, so we insert the default with a write lock.
142        Ref(self
143            .0
144            .entry(TypeId::of::<V>())
145            .or_insert_with(|| Box::new(V::default()))
146            .downgrade()
147            .map(|v| {
148                (**v)
149                    .as_any()
150                    .downcast_ref::<V>()
151                    .vortex_expect("Type mismatch - this is a bug")
152            }))
153    }
154
155    fn get_opt<V: SessionVar>(&self) -> Option<Ref<'_, V>> {
156        self.0.get(&TypeId::of::<V>()).map(|v| {
157            Ref(v.map(|v| {
158                (**v)
159                    .as_any()
160                    .downcast_ref::<V>()
161                    .vortex_expect("Type mismatch - this is a bug")
162            }))
163        })
164    }
165
166    /// Returns the scope variable of type `V`, or inserts a default one if it does not exist.
167    ///
168    /// Note that the returned value internally holds a lock on the variable.
169    fn get_mut<V: SessionVar + Default>(&self) -> RefMut<'_, V> {
170        RefMut(
171            self.0
172                .entry(TypeId::of::<V>())
173                .or_insert_with(|| Box::new(V::default()))
174                .map(|v| {
175                    (**v)
176                        .as_any_mut()
177                        .downcast_mut::<V>()
178                        .vortex_expect("Type mismatch - this is a bug")
179                }),
180        )
181    }
182
183    fn get_mut_opt<V: SessionVar>(&self) -> Option<RefMut<'_, V>> {
184        self.0.get_mut(&TypeId::of::<V>()).map(|v| {
185            RefMut(v.map(|v| {
186                (**v)
187                    .as_any_mut()
188                    .downcast_mut::<V>()
189                    .vortex_expect("Type mismatch - this is a bug")
190            }))
191        })
192    }
193}
194
/// A TypeMap based on `https://docs.rs/http/1.2.0/src/http/extensions.rs.html#41-266`.
///
/// Entries are keyed by the [`TypeId`] of the stored variable's concrete type, so the map holds
/// at most one variable per type. Keys are hashed with [`IdHasher`] rather than the map's
/// default hasher.
type SessionVars = DashMap<TypeId, Box<dyn SessionVar>, BuildHasherDefault<IdHasher>>;
197
/// Pass-through hasher for `TypeId` keys.
///
/// A `TypeId` is already a compiler-generated hash, so re-hashing it would be wasted work.
/// This hasher simply remembers the `u64` handed to [`Hasher::write_u64`] and reports that
/// value unchanged from [`Hasher::finish`], without any bit fiddling.
#[derive(Default)]
struct IdHasher {
    /// The most recently written `u64`; `0` until something has been written.
    id: u64,
}

impl Hasher for IdHasher {
    /// Byte-wise hashing is not supported: `TypeId` is expected to hash itself through
    /// `write_u64` only, so reaching this method panics.
    fn write(&mut self, _: &[u8]) {
        unreachable!("TypeId calls write_u64");
    }

    /// Stores `value` as the hash, replacing whatever was stored before.
    #[inline]
    fn write_u64(&mut self, value: u64) {
        self.id = value;
    }

    /// Returns the stored `u64` verbatim.
    #[inline]
    fn finish(&self) -> u64 {
        self.id
    }
}
219
/// This trait defines variables that can be stored against a Vortex session.
///
/// Users should implement this trait for anything that they want to store on a `VortexSession`.
pub trait SessionVar: Any + Send + Sync + Debug + 'static {
    /// Upcasts the variable to `&dyn Any`.
    ///
    /// The session downcasts the result back to the concrete variable type and panics on a
    /// mismatch, so implementations are expected to return `self`.
    fn as_any(&self) -> &dyn Any;

    /// Upcasts the variable to `&mut dyn Any`.
    ///
    /// The session downcasts the result back to the concrete variable type and panics on a
    /// mismatch, so implementations are expected to return `self`.
    fn as_any_mut(&mut self) -> &mut dyn Any;
}
227
228// NOTE(ngates): we don't want to expose that the internals of a session is a DashMap, so we have
229// our own wrapped Ref type.
230pub struct Ref<'a, T>(dashmap::mapref::one::MappedRef<'a, TypeId, Box<dyn SessionVar>, T>);
231impl<'a, T> Deref for Ref<'a, T> {
232    type Target = T;
233
234    fn deref(&self) -> &Self::Target {
235        &self.0
236    }
237}
238impl<'a, T> Ref<'a, T> {
239    /// Map this reference to a different target.
240    pub fn map<F, U>(self, f: F) -> Ref<'a, U>
241    where
242        F: FnOnce(&T) -> &U,
243    {
244        Ref(self.0.map(f))
245    }
246}
247
248pub struct RefMut<'a, T>(dashmap::mapref::one::MappedRefMut<'a, TypeId, Box<dyn SessionVar>, T>);
249impl<'a, T> Deref for RefMut<'a, T> {
250    type Target = T;
251
252    fn deref(&self) -> &Self::Target {
253        &self.0
254    }
255}
256impl<'a, T> DerefMut for RefMut<'a, T> {
257    fn deref_mut(&mut self) -> &mut Self::Target {
258        self.0.deref_mut()
259    }
260}
261impl<'a, T> RefMut<'a, T> {
262    /// Map this mutable reference to a different target.
263    pub fn map<F, U>(self, f: F) -> RefMut<'a, U>
264    where
265        F: FnOnce(&mut T) -> &mut U,
266    {
267        RefMut(self.0.map(f))
268    }
269}
270
#[cfg(test)]
mod tests {
    use super::VortexSession;

    /// A fresh session must reject unknown plugins until `allow_unknown` is called on it.
    #[test]
    fn allow_unknown_flag_is_opt_in() {
        let fresh = VortexSession::empty();
        assert!(!fresh.allows_unknown());

        let opted_in = fresh.allow_unknown();
        assert!(opted_in.allows_unknown());
    }
}