vortex_session/session.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! The [`VortexSession`] container.
5//!
6//! A [`VortexSession`] is a type-map of [`VortexSessionVar`]s keyed by [`TypeId`]. It is backed by
7//! an [`ArcSwap`], giving lock-free reads and copy-on-write writes:
8//!
9//! * **Reads** ([`SessionExt::get`], [`SessionExt::get_opt`]) load the current snapshot
10//! without taking any lock and hand back a [`SessionGuard`] that derefs to the variable. Because a
11//! read never takes a lock, it can never deadlock, and because it never holds a lock across the
12//! returned reference there is no reader/writer contention.
13//!
//! * **Writes** ([`VortexSession::with_some`], [`SessionExt::get`] on a missing default, and a
//!   [`SessionMut`] obtained from [`SessionExt::get_mut`] when it is dropped) are copy-on-write:
//!   the map is cloned, the change applied to the private copy, and the new map atomically
//!   published via [`ArcSwap::rcu`]. The closure passed to `rcu` only clones the map and inserts
//!   an already-constructed value, so no user code (in particular, no `Default::default`
//!   implementation) ever runs while a lock is held. This is the key difference from the previous
//!   `DashMap`-backed session, where `entry().or_insert_with(f)` ran `f` while holding the shard's
//!   write lock and could deadlock if `f` re-entered the session.
21//!
22//! A modified session is produced by mutating it **in place**: [`VortexSession::with_some`] — and
23//! the configuration `with_*` helpers built on it — apply their change copy-on-write to the shared
24//! backing cell. Clones of a session share that cell, so a variable registered through one clone
25//! (or one `with_*` call) is visible to all of them. This is what late plugin/encoding registration
26//! relies on.
27//!
28//! To build a session from scratch, start from [`VortexSession::empty`] and chain the `with_*`
29//! helpers. Each [`empty`](VortexSession::empty) creates its own backing cell, so a session built
30//! this way is independent of any other.
31
32use std::any::TypeId;
33use std::any::type_name;
34use std::fmt::Debug;
35use std::fmt::Formatter;
36use std::hash::BuildHasherDefault;
37use std::marker::PhantomData;
38use std::ops::Deref;
39use std::ops::DerefMut;
40use std::sync::Arc;
41
42use arc_swap::ArcSwap;
43use arc_swap::Guard;
44use vortex_error::VortexExpect;
45use vortex_error::vortex_panic;
46use vortex_utils::aliases::hash_map::HashMap;
47
48use crate::IdHasher;
49use crate::SessionExt;
50use crate::SessionVar;
51use crate::UnknownPluginPolicy;
52
53/// A [`SessionVar`] that can be stored in a [`VortexSession`].
54///
55/// This trait is implemented automatically for every [`SessionVar`] that is also [`Clone`], so
56/// types opt in by implementing [`Clone`] rather than implementing this trait directly. The trait
57/// itself stays object-safe (so it can be stored as `Arc<dyn VortexSessionVar>`); the [`Clone`]
58/// bound lives on the blanket impl rather than the trait.
59///
60/// Requiring [`Clone`] lets the configuration `with_*` helpers read a variable, modify a copy, and
61/// re-insert the result.
62pub trait VortexSessionVar: SessionVar {}
63
64impl<V: SessionVar + Clone> VortexSessionVar for V {}
65
66/// The immutable type-map backing a published [`VortexSession`] snapshot.
67type SessionVars = HashMap<TypeId, Arc<dyn VortexSessionVar>, BuildHasherDefault<IdHasher>>;
68
69/// A reference to a session variable of type `V`, returned by [`SessionExt::get`] and
70/// [`SessionExt::get_opt`].
71///
72/// It borrows the session's current snapshot through an [`arc_swap::Guard`], so reads never take a
73/// lock or a full [`Arc`] clone. The guard is tied to the session borrow it was read from, so it is
74/// meant to be used on the stack rather than stored in a long-lived data structure (holding it pins
75/// an internal arc-swap slot, which can contend with concurrent writers). `SessionGuard` derefs to
76/// `V`, so it can be used wherever a `&V` is expected.
77pub struct SessionGuard<'a, V> {
78 snapshot: Guard<Arc<SessionVars>>,
79 _session: PhantomData<&'a VortexSession>,
80 _marker: PhantomData<fn() -> V>,
81}
82
83impl<V: VortexSessionVar> Deref for SessionGuard<'_, V> {
84 type Target = V;
85
86 fn deref(&self) -> &V {
87 // The constructor of `SessionGuard` guarantees the variable is present in `snapshot`.
88 self.snapshot
89 .get(&TypeId::of::<V>())
90 .vortex_expect("SessionGuard invariant: variable present in snapshot")
91 .as_any()
92 .downcast_ref::<V>()
93 .vortex_expect("Type mismatch - this is a bug")
94 }
95}
96
97impl<V: VortexSessionVar> Debug for SessionGuard<'_, V> {
98 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
99 Debug::fmt(&**self, f)
100 }
101}
102
103/// A copy-on-write mutable handle to a session variable of type `V`, returned by
104/// [`SessionExt::get_mut`].
105///
106/// It holds a private clone of the variable and exposes it through [`DerefMut`]. When the handle is
107/// dropped, the (possibly mutated) value is re-inserted into the
108/// session, replacing the previous one. Because the session stores each variable behind a shared
109/// [`Arc`] observed by every clone, handing out a plain `&mut` to the stored value would be unsound;
110/// this guard provides mutable access by cloning on read and publishing on drop instead.
111pub struct SessionMut<'a, V: VortexSessionVar> {
112 session: &'a VortexSession,
113 // `Some` for the whole lifetime of the guard; taken in `drop` to move it back into the session.
114 value: Option<V>,
115}
116
117impl<V: VortexSessionVar> Deref for SessionMut<'_, V> {
118 type Target = V;
119
120 fn deref(&self) -> &V {
121 self.value
122 .as_ref()
123 .vortex_expect("SessionMut invariant: value present until drop")
124 }
125}
126
127impl<V: VortexSessionVar> DerefMut for SessionMut<'_, V> {
128 fn deref_mut(&mut self) -> &mut V {
129 self.value
130 .as_mut()
131 .vortex_expect("SessionMut invariant: value present until drop")
132 }
133}
134
135impl<V: VortexSessionVar> Drop for SessionMut<'_, V> {
136 fn drop(&mut self) {
137 if let Some(value) = self.value.take() {
138 self.session.register(value);
139 }
140 }
141}
142
143impl<V: VortexSessionVar> Debug for SessionMut<'_, V> {
144 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
145 Debug::fmt(&**self, f)
146 }
147}
148
149/// A Vortex session encapsulates the set of extensible arrays, layouts, compute functions,
150/// dtypes, etc. that are available for use in a given context.
151///
152/// It is also the entry-point passed to dynamic libraries to initialize Vortex plugins.
153///
154/// Cloning a session is cheap and shares the backing store: a variable registered through one
155/// clone (via [`VortexSession::with_some`] or one of the `with_*` helpers) is observed by all
156/// clones. To build an *independent* session, start from [`VortexSession::empty`].
157#[derive(Clone)]
158pub struct VortexSession(Arc<ArcSwap<SessionVars>>);
159
160impl Debug for VortexSession {
161 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
162 f.debug_tuple("VortexSession")
163 .field(&self.0.load().as_ref())
164 .finish()
165 }
166}
167
168impl VortexSession {
169 /// Create a new [`VortexSession`] with no session state.
170 pub fn empty() -> Self {
171 Self(Arc::new(ArcSwap::from_pointee(SessionVars::default())))
172 }
173
174 /// Inserts `V::default()` if no variable of type `V` is present yet, copy-on-write.
175 ///
176 /// The default is constructed *outside* the [`ArcSwap::rcu`] closure, so `V::default()` never
177 /// runs under a lock and is never run more than once — it may therefore freely re-enter the
178 /// session (read or even register other variables) without risk of deadlock. The closure only
179 /// clones the map and inserts the already-built value, so if a concurrent writer published a
180 /// value (or the closure is retried under contention) the first-published value is kept and the
181 /// prebuilt default is simply dropped.
182 fn insert_default<V: VortexSessionVar + Default>(&self) {
183 let default: Arc<dyn VortexSessionVar> = Arc::new(V::default());
184 self.0.rcu(|current| {
185 let mut next = SessionVars::clone(current);
186 next.entry(TypeId::of::<V>())
187 .or_insert_with(|| Arc::clone(&default));
188 next
189 });
190 }
191
192 /// Inserts a session variable of type `V`, replacing any existing variable of that type.
193 ///
194 /// This is the internal copy-on-write insert primitive behind [`with_some`](Self::with_some) and
195 /// [`get_mut`](SessionExt::get_mut); it is not public, so a variable can only enter the type-map
196 /// through those (or through a default inserted by [`get`](SessionExt::get)). The mutation is
197 /// applied in place to the shared backing store, so it is visible through every clone.
198 fn register<V: VortexSessionVar>(&self, var: V) {
199 let var: Arc<dyn VortexSessionVar> = Arc::new(var);
200 self.0.rcu(|current| {
201 let mut next = SessionVars::clone(current);
202 next.insert(TypeId::of::<V>(), Arc::clone(&var));
203 next
204 });
205 }
206
207 /// Inserts a new session variable of type `V` with its default value, mutating this session in
208 /// place and returning it for chaining.
209 ///
210 /// The change is applied copy-on-write to the shared backing store, so it is observed through
211 /// every clone of this session.
212 ///
213 /// # Panics
214 ///
215 /// If a variable of that type already exists.
216 pub fn with<V: VortexSessionVar + Default>(self) -> Self {
217 self.with_some(V::default())
218 }
219
220 /// Inserts a new session variable of type `V`, mutating this session in place and returning it
221 /// for chaining.
222 ///
223 /// The change is applied copy-on-write to the shared backing store, so it is observed through
224 /// every clone of this session.
225 ///
226 /// # Panics
227 ///
228 /// If a variable of that type already exists.
229 pub fn with_some<V: VortexSessionVar>(self, var: V) -> Self {
230 if self.get_opt::<V>().is_some() {
231 vortex_panic!(
232 "Session variable of type {} already exists",
233 type_name::<V>()
234 );
235 }
236 self.register(var);
237 self
238 }
239
240 /// Returns whether unknown plugins should deserialize as foreign placeholders.
241 pub fn allows_unknown(&self) -> bool {
242 self.get_opt::<UnknownPluginPolicy>()
243 .is_some_and(|p| p.allow_unknown)
244 }
245
246 /// Allow deserializing unknown plugin IDs as non-executable foreign placeholders.
247 ///
248 /// Mutates this session in place and returns it for chaining.
249 pub fn allow_unknown(self) -> Self {
250 self.get_mut::<UnknownPluginPolicy>().allow_unknown = true;
251 self
252 }
253}
254
255impl SessionExt for VortexSession {
256 fn session(&self) -> VortexSession {
257 self.clone()
258 }
259
260 fn get<V: VortexSessionVar + Default>(&self) -> SessionGuard<'_, V> {
261 if self.get_opt::<V>().is_none() {
262 self.insert_default::<V>();
263 }
264 self.get_opt::<V>()
265 .vortex_expect("variable was just inserted")
266 }
267
268 fn get_opt<V: VortexSessionVar>(&self) -> Option<SessionGuard<'_, V>> {
269 let snapshot = self.0.load();
270 snapshot
271 .contains_key(&TypeId::of::<V>())
272 .then(|| SessionGuard {
273 snapshot,
274 _session: PhantomData,
275 _marker: PhantomData,
276 })
277 }
278
279 fn get_mut<V: VortexSessionVar + Default + Clone>(&self) -> SessionMut<'_, V> {
280 let value = (*self.get::<V>()).clone();
281 SessionMut {
282 session: self,
283 value: Some(value),
284 }
285 }
286}
287
#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::cell::RefCell;

    use super::VortexSession;
    use crate::SessionExt;
    use crate::SessionVar;

    /// A value-carrying variable used by most tests.
    #[derive(Clone, Debug, Default, PartialEq, Eq)]
    struct Counter {
        count: u32,
    }

    impl SessionVar for Counter {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn as_any_mut(&mut self) -> &mut dyn Any {
            self
        }
    }

    /// A second, stateless variable type, used where a test needs a type distinct from `Counter`.
    #[derive(Clone, Debug, Default)]
    struct Other;

    impl SessionVar for Other {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn as_any_mut(&mut self) -> &mut dyn Any {
            self
        }
    }

    thread_local! {
        /// Lets `Reentrant::default` reach back into the session it is being inserted into.
        static REENTRANT_SESSION: RefCell<Option<VortexSession>> =
            const { RefCell::new(None) };
    }

    /// A variable whose `Default` impl calls back into the session stored in
    /// `REENTRANT_SESSION`.
    #[derive(Clone, Debug)]
    struct Reentrant {
        inner: u32,
    }

    impl Default for Reentrant {
        fn default() -> Self {
            // While this default is being built, read a *different* defaulted variable from the
            // same session, which triggers a nested default insertion. That is only sound because
            // defaults are constructed outside the `rcu` closure (no lock held); running under
            // the closure would deadlock or recurse forever.
            REENTRANT_SESSION.with(|slot| {
                let shared = slot.borrow();
                if let Some(session) = shared.as_ref() {
                    drop(session.get::<Counter>());
                }
            });
            Self { inner: 7 }
        }
    }

    impl SessionVar for Reentrant {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn as_any_mut(&mut self) -> &mut dyn Any {
            self
        }
    }

    #[test]
    fn with_some_round_trip() {
        let session = VortexSession::empty().with_some(Counter { count: 1 });

        let stored = session.get::<Counter>();
        assert_eq!(*stored, Counter { count: 1 });
        assert!(session.get_opt::<Other>().is_none());
    }

    #[test]
    fn get_inserts_default() {
        let session = VortexSession::empty();
        assert!(session.get_opt::<Counter>().is_none());

        let count = session.get::<Counter>().count;
        assert_eq!(count, 0);
        // `get` published the default, so a plain read now finds it.
        assert!(session.get_opt::<Counter>().is_some());
    }

    #[test]
    fn register_is_visible_through_clones() {
        let original = VortexSession::empty();
        let clone = original.clone();

        original.register(Counter { count: 7 });

        // Both handles point at the same backing store.
        assert_eq!(clone.get::<Counter>().count, 7);
    }

    #[test]
    fn with_some_mutates_shared_store() {
        let original = VortexSession::empty();
        let clone = original.clone();

        let configured = original.with_some(Counter { count: 5 });
        assert_eq!(configured.get::<Counter>().count, 5);

        // `with_some` writes to the shared backing store in place, so the clone sees it as well.
        assert_eq!(clone.get::<Counter>().count, 5);
    }

    #[test]
    fn allow_unknown_mutates_shared_store() {
        let original = VortexSession::empty();
        let clone = original.clone();
        assert!(!clone.allows_unknown());

        drop(original.allow_unknown());

        // The flag lives in the shared backing store, so it is visible through the clone.
        assert!(clone.allows_unknown());
    }

    #[test]
    fn empty_sessions_are_independent() {
        // Two sessions built from separate `empty()` calls have separate backing cells and
        // therefore share no state.
        let first = VortexSession::empty().with_some(Counter { count: 1 });
        let second = VortexSession::empty().with_some(Counter { count: 2 });

        first.register(Counter { count: 9 });

        assert_eq!(first.get::<Counter>().count, 9);
        assert_eq!(second.get::<Counter>().count, 2);
    }

    #[test]
    #[should_panic(expected = "already exists")]
    fn with_some_duplicate_panics() {
        let session = VortexSession::empty().with::<Counter>();
        session.with_some(Counter { count: 1 });
    }

    #[test]
    fn allow_unknown_flag_is_opt_in() {
        let session = VortexSession::empty();
        assert!(!session.allows_unknown());

        let enabled = session.allow_unknown();
        assert!(enabled.allows_unknown());
    }

    #[test]
    fn get_opt_does_not_insert_a_default() {
        let session = VortexSession::empty();

        // `get_opt` is a pure read: repeating it must keep returning `None`, which would not hold
        // if the first call had published a default the way `get` does.
        for _ in 0..2 {
            assert!(session.get_opt::<Counter>().is_none());
        }
    }

    #[test]
    fn inserting_a_default_while_holding_a_guard_succeeds() {
        let session = VortexSession::empty().with_some(Counter { count: 5 });

        // Keep a read guard for one variable alive...
        let held = session.get::<Counter>();

        // ...and read a *missing* variable, which publishes its default copy-on-write via `rcu`
        // while `held` still exists. A `SessionGuard` is a lock-free snapshot rather than a lock,
        // so the write goes through without waiting on the outstanding guard.
        let inserted = session.get::<Other>();

        // The held guard still reads from its own snapshot, and the new guard derefs to the
        // freshly inserted default.
        assert_eq!(held.count, 5);
        let _: &Other = &inserted;

        // An independent read observes the inserted default as well.
        assert!(session.get_opt::<Other>().is_some());
    }

    #[test]
    fn a_held_guard_keeps_observing_its_own_snapshot_after_a_write() {
        let session = VortexSession::empty().with_some(Counter { count: 1 });

        let before_write = session.get::<Counter>();
        session.register(Counter { count: 2 });

        // The guard taken before the write pins the old snapshot and keeps seeing the old value;
        // a read taken after the write sees the newly published one.
        assert_eq!(before_write.count, 1);
        let after_write = session.get::<Counter>();
        assert_eq!(after_write.count, 2);
    }

    #[test]
    fn default_insertion_may_reenter_the_session_without_deadlocking() {
        let session = VortexSession::empty();
        REENTRANT_SESSION.with(|slot| *slot.borrow_mut() = Some(session.clone()));

        // Reading `Reentrant` inserts its default, and building that default re-enters the same
        // session through `get::<Counter>` (a second default insertion). Both inserts complete
        // because each default is constructed outside the `rcu` closure.
        let inner = session.get::<Reentrant>().inner;
        assert_eq!(inner, 7);
        assert!(session.get_opt::<Reentrant>().is_some());
        assert!(session.get_opt::<Counter>().is_some());

        REENTRANT_SESSION.with(|slot| *slot.borrow_mut() = None);
    }

    #[test]
    fn get_mut_publishes_on_drop() {
        let session = VortexSession::empty();
        session.register(Counter { count: 1 });

        {
            let mut counter = session.get_mut::<Counter>();
            counter.count = 42;
        } // The handle is dropped here, which publishes the change.

        assert_eq!(session.get::<Counter>().count, 42);
    }

    #[test]
    fn get_mut_inserts_default_then_mutates() {
        let session = VortexSession::empty();
        assert!(session.get_opt::<Counter>().is_none());

        {
            let mut counter = session.get_mut::<Counter>();
            counter.count += 5;
        }

        assert_eq!(session.get::<Counter>().count, 5);
    }

    #[test]
    fn get_mut_mutation_is_visible_through_clones() {
        let original = VortexSession::empty().with_some(Counter { count: 1 });
        let clone = original.clone();

        {
            let mut counter = original.get_mut::<Counter>();
            counter.count = 9;
        }

        // Dropping the handle published the value to the shared backing store.
        assert_eq!(clone.get::<Counter>().count, 9);
    }
}