Skip to main content

vortex_session/
session.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! The [`VortexSession`] container.
5//!
6//! A [`VortexSession`] is a type-map of [`VortexSessionVar`]s keyed by [`TypeId`]. It is backed by
7//! an [`ArcSwap`], giving lock-free reads and copy-on-write writes:
8//!
9//! * **Reads** ([`SessionExt::get`], [`SessionExt::get_opt`]) load the current snapshot
10//!   without taking any lock and hand back a [`SessionGuard`] that derefs to the variable. Because a
11//!   read never takes a lock, it can never deadlock, and because it never holds a lock across the
12//!   returned reference there is no reader/writer contention.
13//!
//! * **Writes** ([`VortexSession::with_some`], [`SessionExt::get`] on a missing default, and
//!   dropping the handle returned by [`SessionExt::get_mut`]) are copy-on-write: the map is
//!   cloned, the change applied to the private copy, and the new map atomically published via
//!   [`ArcSwap::rcu`]. The closure passed to `rcu` only clones the map and inserts an
//!   already-constructed value, so no user code (in particular, no `Default::default`
//!   implementation) ever runs while a lock is held. This is the key difference from the previous
//!   `DashMap`-backed session, where `entry().or_insert_with(f)` ran `f` while holding the shard's
//!   write lock and could deadlock if `f` re-entered the session.
21//!
22//! A modified session is produced by mutating it **in place**: [`VortexSession::with_some`] — and
23//! the configuration `with_*` helpers built on it — apply their change copy-on-write to the shared
24//! backing cell. Clones of a session share that cell, so a variable registered through one clone
25//! (or one `with_*` call) is visible to all of them. This is what late plugin/encoding registration
26//! relies on.
27//!
28//! To build a session from scratch, start from [`VortexSession::empty`] and chain the `with_*`
29//! helpers. Each [`empty`](VortexSession::empty) creates its own backing cell, so a session built
30//! this way is independent of any other.
31
32use std::any::TypeId;
33use std::any::type_name;
34use std::fmt::Debug;
35use std::fmt::Formatter;
36use std::hash::BuildHasherDefault;
37use std::marker::PhantomData;
38use std::ops::Deref;
39use std::ops::DerefMut;
40use std::sync::Arc;
41
42use arc_swap::ArcSwap;
43use arc_swap::Guard;
44use vortex_error::VortexExpect;
45use vortex_error::vortex_panic;
46use vortex_utils::aliases::hash_map::HashMap;
47
48use crate::IdHasher;
49use crate::SessionExt;
50use crate::SessionVar;
51use crate::UnknownPluginPolicy;
52
53/// A [`SessionVar`] that can be stored in a [`VortexSession`].
54///
55/// This trait is implemented automatically for every [`SessionVar`] that is also [`Clone`], so
56/// types opt in by implementing [`Clone`] rather than implementing this trait directly. The trait
57/// itself stays object-safe (so it can be stored as `Arc<dyn VortexSessionVar>`); the [`Clone`]
58/// bound lives on the blanket impl rather than the trait.
59///
60/// Requiring [`Clone`] lets the configuration `with_*` helpers read a variable, modify a copy, and
61/// re-insert the result.
62pub trait VortexSessionVar: SessionVar {}
63
64impl<V: SessionVar + Clone> VortexSessionVar for V {}
65
66/// The immutable type-map backing a published [`VortexSession`] snapshot.
67type SessionVars = HashMap<TypeId, Arc<dyn VortexSessionVar>, BuildHasherDefault<IdHasher>>;
68
69/// A reference to a session variable of type `V`, returned by [`SessionExt::get`] and
70/// [`SessionExt::get_opt`].
71///
72/// It borrows the session's current snapshot through an [`arc_swap::Guard`], so reads never take a
73/// lock or a full [`Arc`] clone. The guard is tied to the session borrow it was read from, so it is
74/// meant to be used on the stack rather than stored in a long-lived data structure (holding it pins
75/// an internal arc-swap slot, which can contend with concurrent writers). `SessionGuard` derefs to
76/// `V`, so it can be used wherever a `&V` is expected.
77pub struct SessionGuard<'a, V> {
78    snapshot: Guard<Arc<SessionVars>>,
79    _session: PhantomData<&'a VortexSession>,
80    _marker: PhantomData<fn() -> V>,
81}
82
83impl<V: VortexSessionVar> Deref for SessionGuard<'_, V> {
84    type Target = V;
85
86    fn deref(&self) -> &V {
87        // The constructor of `SessionGuard` guarantees the variable is present in `snapshot`.
88        self.snapshot
89            .get(&TypeId::of::<V>())
90            .vortex_expect("SessionGuard invariant: variable present in snapshot")
91            .as_any()
92            .downcast_ref::<V>()
93            .vortex_expect("Type mismatch - this is a bug")
94    }
95}
96
97impl<V: VortexSessionVar> Debug for SessionGuard<'_, V> {
98    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
99        Debug::fmt(&**self, f)
100    }
101}
102
103/// A copy-on-write mutable handle to a session variable of type `V`, returned by
104/// [`SessionExt::get_mut`].
105///
106/// It holds a private clone of the variable and exposes it through [`DerefMut`]. When the handle is
107/// dropped, the (possibly mutated) value is re-inserted into the
108/// session, replacing the previous one. Because the session stores each variable behind a shared
109/// [`Arc`] observed by every clone, handing out a plain `&mut` to the stored value would be unsound;
110/// this guard provides mutable access by cloning on read and publishing on drop instead.
111pub struct SessionMut<'a, V: VortexSessionVar> {
112    session: &'a VortexSession,
113    // `Some` for the whole lifetime of the guard; taken in `drop` to move it back into the session.
114    value: Option<V>,
115}
116
117impl<V: VortexSessionVar> Deref for SessionMut<'_, V> {
118    type Target = V;
119
120    fn deref(&self) -> &V {
121        self.value
122            .as_ref()
123            .vortex_expect("SessionMut invariant: value present until drop")
124    }
125}
126
127impl<V: VortexSessionVar> DerefMut for SessionMut<'_, V> {
128    fn deref_mut(&mut self) -> &mut V {
129        self.value
130            .as_mut()
131            .vortex_expect("SessionMut invariant: value present until drop")
132    }
133}
134
135impl<V: VortexSessionVar> Drop for SessionMut<'_, V> {
136    fn drop(&mut self) {
137        if let Some(value) = self.value.take() {
138            self.session.register(value);
139        }
140    }
141}
142
143impl<V: VortexSessionVar> Debug for SessionMut<'_, V> {
144    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
145        Debug::fmt(&**self, f)
146    }
147}
148
149/// A Vortex session encapsulates the set of extensible arrays, layouts, compute functions,
150/// dtypes, etc. that are available for use in a given context.
151///
152/// It is also the entry-point passed to dynamic libraries to initialize Vortex plugins.
153///
154/// Cloning a session is cheap and shares the backing store: a variable registered through one
155/// clone (via [`VortexSession::with_some`] or one of the `with_*` helpers) is observed by all
156/// clones. To build an *independent* session, start from [`VortexSession::empty`].
157#[derive(Clone)]
158pub struct VortexSession(Arc<ArcSwap<SessionVars>>);
159
160impl Debug for VortexSession {
161    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
162        f.debug_tuple("VortexSession")
163            .field(&self.0.load().as_ref())
164            .finish()
165    }
166}
167
168impl VortexSession {
169    /// Create a new [`VortexSession`] with no session state.
170    pub fn empty() -> Self {
171        Self(Arc::new(ArcSwap::from_pointee(SessionVars::default())))
172    }
173
174    /// Inserts `V::default()` if no variable of type `V` is present yet, copy-on-write.
175    ///
176    /// The default is constructed *outside* the [`ArcSwap::rcu`] closure, so `V::default()` never
177    /// runs under a lock and is never run more than once — it may therefore freely re-enter the
178    /// session (read or even register other variables) without risk of deadlock. The closure only
179    /// clones the map and inserts the already-built value, so if a concurrent writer published a
180    /// value (or the closure is retried under contention) the first-published value is kept and the
181    /// prebuilt default is simply dropped.
182    fn insert_default<V: VortexSessionVar + Default>(&self) {
183        let default: Arc<dyn VortexSessionVar> = Arc::new(V::default());
184        self.0.rcu(|current| {
185            let mut next = SessionVars::clone(current);
186            next.entry(TypeId::of::<V>())
187                .or_insert_with(|| Arc::clone(&default));
188            next
189        });
190    }
191
192    /// Inserts a session variable of type `V`, replacing any existing variable of that type.
193    ///
194    /// This is the internal copy-on-write insert primitive behind [`with_some`](Self::with_some) and
195    /// [`get_mut`](SessionExt::get_mut); it is not public, so a variable can only enter the type-map
196    /// through those (or through a default inserted by [`get`](SessionExt::get)). The mutation is
197    /// applied in place to the shared backing store, so it is visible through every clone.
198    fn register<V: VortexSessionVar>(&self, var: V) {
199        let var: Arc<dyn VortexSessionVar> = Arc::new(var);
200        self.0.rcu(|current| {
201            let mut next = SessionVars::clone(current);
202            next.insert(TypeId::of::<V>(), Arc::clone(&var));
203            next
204        });
205    }
206
207    /// Inserts a new session variable of type `V` with its default value, mutating this session in
208    /// place and returning it for chaining.
209    ///
210    /// The change is applied copy-on-write to the shared backing store, so it is observed through
211    /// every clone of this session.
212    ///
213    /// # Panics
214    ///
215    /// If a variable of that type already exists.
216    pub fn with<V: VortexSessionVar + Default>(self) -> Self {
217        self.with_some(V::default())
218    }
219
220    /// Inserts a new session variable of type `V`, mutating this session in place and returning it
221    /// for chaining.
222    ///
223    /// The change is applied copy-on-write to the shared backing store, so it is observed through
224    /// every clone of this session.
225    ///
226    /// # Panics
227    ///
228    /// If a variable of that type already exists.
229    pub fn with_some<V: VortexSessionVar>(self, var: V) -> Self {
230        if self.get_opt::<V>().is_some() {
231            vortex_panic!(
232                "Session variable of type {} already exists",
233                type_name::<V>()
234            );
235        }
236        self.register(var);
237        self
238    }
239
240    /// Returns whether unknown plugins should deserialize as foreign placeholders.
241    pub fn allows_unknown(&self) -> bool {
242        self.get_opt::<UnknownPluginPolicy>()
243            .is_some_and(|p| p.allow_unknown)
244    }
245
246    /// Allow deserializing unknown plugin IDs as non-executable foreign placeholders.
247    ///
248    /// Mutates this session in place and returns it for chaining.
249    pub fn allow_unknown(self) -> Self {
250        self.get_mut::<UnknownPluginPolicy>().allow_unknown = true;
251        self
252    }
253}
254
255impl SessionExt for VortexSession {
256    fn session(&self) -> VortexSession {
257        self.clone()
258    }
259
260    fn get<V: VortexSessionVar + Default>(&self) -> SessionGuard<'_, V> {
261        if self.get_opt::<V>().is_none() {
262            self.insert_default::<V>();
263        }
264        self.get_opt::<V>()
265            .vortex_expect("variable was just inserted")
266    }
267
268    fn get_opt<V: VortexSessionVar>(&self) -> Option<SessionGuard<'_, V>> {
269        let snapshot = self.0.load();
270        snapshot
271            .contains_key(&TypeId::of::<V>())
272            .then(|| SessionGuard {
273                snapshot,
274                _session: PhantomData,
275                _marker: PhantomData,
276            })
277    }
278
279    fn get_mut<V: VortexSessionVar + Default + Clone>(&self) -> SessionMut<'_, V> {
280        let value = (*self.get::<V>()).clone();
281        SessionMut {
282            session: self,
283            value: Some(value),
284        }
285    }
286}
287
#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::cell::RefCell;

    use super::VortexSession;
    use crate::SessionExt;
    use crate::SessionVar;

    /// Implements [`SessionVar`] for a test type by returning `self` from both `Any` accessors.
    macro_rules! impl_session_var {
        ($ty:ty) => {
            impl SessionVar for $ty {
                fn as_any(&self) -> &dyn Any {
                    self
                }

                fn as_any_mut(&mut self) -> &mut dyn Any {
                    self
                }
            }
        };
    }

    /// A variable with observable state.
    #[derive(Clone, Debug, Default, PartialEq, Eq)]
    struct Counter {
        count: u32,
    }

    impl_session_var!(Counter);

    /// A second, stateless variable type, used to check that types do not interfere.
    #[derive(Clone, Debug, Default)]
    struct Other;

    impl_session_var!(Other);

    thread_local! {
        /// The session `Reentrant::default` reaches back into while it is being constructed.
        static REENTRANT_SESSION: RefCell<Option<VortexSession>> =
            const { RefCell::new(None) };
    }

    /// A variable whose `Default` impl re-enters the session it is being inserted into.
    #[derive(Clone, Debug)]
    struct Reentrant {
        inner: u32,
    }

    impl Default for Reentrant {
        fn default() -> Self {
            // While this default is being built, read `Counter` from the *same* session, which
            // triggers a nested default insertion. That is only sound because defaults are built
            // outside the `rcu` closure (no lock held); built inside it, this would deadlock or
            // recurse forever.
            REENTRANT_SESSION.with(|slot| {
                let slot = slot.borrow();
                if let Some(session) = slot.as_ref() {
                    drop(session.get::<Counter>());
                }
            });
            Self { inner: 7 }
        }
    }

    impl_session_var!(Reentrant);

    #[test]
    fn with_some_round_trip() {
        let expected = Counter { count: 1 };
        let session = VortexSession::empty().with_some(expected.clone());

        assert_eq!(*session.get::<Counter>(), expected);
        assert!(session.get_opt::<Other>().is_none());
    }

    #[test]
    fn get_inserts_default() {
        let session = VortexSession::empty();
        assert!(session.get_opt::<Counter>().is_none());

        let count = session.get::<Counter>().count;
        assert_eq!(count, 0);

        // `get` published the default, so a plain read now finds it.
        assert!(session.get_opt::<Counter>().is_some());
    }

    #[test]
    fn register_is_visible_through_clones() {
        let original = VortexSession::empty();
        let alias = original.clone();

        original.register(Counter { count: 7 });

        // Both handles point at the same backing store.
        assert_eq!(alias.get::<Counter>().count, 7);
    }

    #[test]
    fn with_some_mutates_shared_store() {
        let original = VortexSession::empty();
        let alias = original.clone();

        let configured = original.with_some(Counter { count: 5 });
        assert_eq!(configured.get::<Counter>().count, 5);

        // `with_some` writes to the shared backing store in place, so the earlier clone sees the
        // variable as well.
        assert_eq!(alias.get::<Counter>().count, 5);
    }

    #[test]
    fn allow_unknown_mutates_shared_store() {
        let original = VortexSession::empty();
        let alias = original.clone();
        assert!(!alias.allows_unknown());

        let _configured = original.allow_unknown();

        // The flag lives in the shared backing store.
        assert!(alias.allows_unknown());
    }

    #[test]
    fn empty_sessions_are_independent() {
        // Two sessions built from separate `empty()` calls have separate backing cells.
        let first = VortexSession::empty().with_some(Counter { count: 1 });
        let second = VortexSession::empty().with_some(Counter { count: 2 });

        first.register(Counter { count: 9 });

        assert_eq!(first.get::<Counter>().count, 9);
        assert_eq!(second.get::<Counter>().count, 2);
    }

    #[test]
    #[should_panic(expected = "already exists")]
    fn with_some_duplicate_panics() {
        let session = VortexSession::empty().with::<Counter>();
        session.with_some(Counter { count: 1 });
    }

    #[test]
    fn allow_unknown_flag_is_opt_in() {
        let fresh = VortexSession::empty();
        assert!(!fresh.allows_unknown());

        let opted_in = fresh.allow_unknown();
        assert!(opted_in.allows_unknown());
    }

    #[test]
    fn get_opt_does_not_insert_a_default() {
        let session = VortexSession::empty();

        // `get_opt` is a pure read: asking twice must keep returning `None`, which would not be
        // the case if the first call had published a default the way `get` does.
        for _ in 0..2 {
            assert!(session.get_opt::<Counter>().is_none());
        }
    }

    #[test]
    fn inserting_a_default_while_holding_a_guard_succeeds() {
        let session = VortexSession::empty().with_some(Counter { count: 5 });

        // Keep a read guard for one variable alive...
        let counter_guard = session.get::<Counter>();

        // ...and read a variable that is *missing*, which publishes its default via `rcu` while
        // `counter_guard` is still held. A `SessionGuard` is a lock-free snapshot rather than a
        // lock, so the write goes through instead of waiting on the outstanding guard.
        let other_guard = session.get::<Other>();

        // The older guard still reads from the snapshot it was created from, and the newer guard
        // resolves to the default that was just inserted.
        assert_eq!(counter_guard.count, 5);
        let _: &Other = &other_guard;

        // An independent read afterwards finds the inserted default too.
        assert!(session.get_opt::<Other>().is_some());
    }

    #[test]
    fn a_held_guard_keeps_observing_its_own_snapshot_after_a_write() {
        let session = VortexSession::empty().with_some(Counter { count: 1 });

        let before_write = session.get::<Counter>();
        session.register(Counter { count: 2 });

        // The guard taken before the write still shows the old value; a fresh read shows the
        // newly published one.
        assert_eq!(before_write.count, 1);
        assert_eq!(session.get::<Counter>().count, 2);
    }

    #[test]
    fn default_insertion_may_reenter_the_session_without_deadlocking() {
        let session = VortexSession::empty();
        REENTRANT_SESSION.with(|slot| {
            *slot.borrow_mut() = Some(session.clone());
        });

        // Reading `Reentrant` inserts its default, and building that default reads `Counter` from
        // the same session (a second default insertion). Each default is built outside the `rcu`
        // closure, so both inserts finish instead of deadlocking.
        assert_eq!(session.get::<Reentrant>().inner, 7);
        assert!(session.get_opt::<Reentrant>().is_some());
        assert!(session.get_opt::<Counter>().is_some());

        REENTRANT_SESSION.with(|slot| {
            *slot.borrow_mut() = None;
        });
    }

    #[test]
    fn get_mut_publishes_on_drop() {
        let session = VortexSession::empty();
        session.register(Counter { count: 1 });

        // The handle is a temporary, so it is dropped (and published) at the end of the statement.
        session.get_mut::<Counter>().count = 42;

        assert_eq!(session.get::<Counter>().count, 42);
    }

    #[test]
    fn get_mut_inserts_default_then_mutates() {
        let session = VortexSession::empty();
        assert!(session.get_opt::<Counter>().is_none());

        // Starts from the default (`count == 0`) because nothing was stored beforehand.
        session.get_mut::<Counter>().count += 5;

        assert_eq!(session.get::<Counter>().count, 5);
    }

    #[test]
    fn get_mut_mutation_is_visible_through_clones() {
        let original = VortexSession::empty().with_some(Counter { count: 1 });
        let alias = original.clone();

        original.get_mut::<Counter>().count = 9;

        // The mutated value was published to the backing store both handles share.
        assert_eq!(alias.get::<Counter>().count, 9);
    }
}