Skip to main content

shape_runtime/type_schema/
current.rs

1//! Task-local "current" `TypeSchemaRegistry` handle.
2//!
3//! This module exposes an ambient registry that is pushed by `Runtime`'s
4//! execution entry points and consumed by free functions that previously
5//! reached for the process-global `STDLIB_SCHEMA_REGISTRY` / `NEXT_SCHEMA_ID`
6//! statics. The scoping is three-layered to cover both async and synchronous
7//! entry points, with a process-wide default as the final fallback:
8//!
9//! 1. **Task-local** (`CURRENT_SCHEMA_REGISTRY`) — survives task migration
10//!    across tokio worker threads, so any descendant `.await` in a Shape
11//!    execution future inherits the registry automatically.
12//! 2. **Thread-local** (`SYNC_CURRENT_SCHEMA_REGISTRY`) — fallback for
13//!    synchronous entry points (CLI, tests, REPL one-shots) that are not
14//!    running under a tokio task. A [`SyncRegistryScope`] RAII guard
15//!    pushes/pops the value so nested Runtimes on one thread compose
16//!    correctly.
17//! 3. **Process-wide default** (`DEFAULT_SCHEMA_REGISTRY`) — a single
18//!    stdlib-populated registry shared by scopeless callers. Preserves the
19//!    pre-B1.7 semantic that every caller sees *some* registry: ad-hoc
20//!    tooling, static initialisers, and unit tests that don't install a
21//!    scope all share this handle instead of being forced to panic or
22//!    observe `None`.
23//!
24//! Mirrors the B5 `shape_value::shape_graph_current::DEFAULT_SHAPE_TABLE`
25//! pattern that retired the legacy `GLOBAL_SHAPE_TABLE` static.
26//!
27//! Lookup order: task-local → thread-local → process default. Both
28//! [`current_registry`] and [`try_current_registry`] always return a
29//! usable handle; callers never need to branch on `None` or on a panic
30//! any more. Scoped callers (installed by `Runtime`) still get per-VM
31//! isolation, which was the original B1 goal.
32
33use super::TypeSchemaRegistry;
34use std::cell::RefCell;
35use std::future::Future;
36use std::sync::{Arc, LazyLock};
37
38tokio::task_local! {
39    /// Task-local current registry. Set by the `with_async_scope` helper
40    /// around any async execution entry. Inherited by all descendant
41    /// `.await`s of that future.
42    static CURRENT_SCHEMA_REGISTRY: Arc<TypeSchemaRegistry>;
43}
44
45thread_local! {
46    /// Synchronous fallback. Managed exclusively by [`SyncRegistryScope`]
47    /// for push/pop semantics — callers should not touch this directly.
48    static SYNC_CURRENT_SCHEMA_REGISTRY: RefCell<Option<Arc<TypeSchemaRegistry>>> =
49        const { RefCell::new(None) };
50}
51
52/// Process-wide default registry used when neither a task-local nor a
53/// thread-local scope is active.
54///
55/// Callers that poke `lookup_schema_for_fields` /
56/// `register_predeclared_any_schema` directly from a stdlib helper or a
57/// unit test — without a `Runtime::enter_schema_scope` — historically
58/// relied on the pre-B1.7 `STDLIB_SCHEMA_REGISTRY` /
59/// `FALLBACK_PREDECLARED_REGISTRY` statics always being available. This
60/// fallback preserves that semantic: scopeless callers share one
61/// isolated-per-process registry instead of panicking or getting `None`.
62/// Scoped callers (Runtime-installed) still get per-VM isolation.
63///
64/// The registry is seeded with the canonical stdlib types
65/// (Row / Option / Result / builtin fixed-layout) via
66/// [`TypeSchemaRegistry::new_with_stdlib`] so predeclared-schema
67/// resolution can match against the same stdlib surface that scoped
68/// registries expose.
69static DEFAULT_SCHEMA_REGISTRY: LazyLock<Arc<TypeSchemaRegistry>> =
70    LazyLock::new(|| Arc::new(TypeSchemaRegistry::new_with_stdlib()));
71
72/// Return the process-wide default schema-registry handle.
73///
74/// Exposed so downstream crates and tests that want to explicitly mirror
75/// a schema into the default registry (for example, snapshot-decoding
76/// tooling that runs outside any Runtime scope) can do so without
77/// constructing a fresh registry and losing shared predeclared caches.
78pub fn default_registry() -> Arc<TypeSchemaRegistry> {
79    DEFAULT_SCHEMA_REGISTRY.clone()
80}
81
82/// RAII guard that installs a registry on the thread-local slot for the
83/// lifetime of the guard.
84///
85/// The previous value (if any) is captured on construction and restored on
86/// drop, so nested scopes compose correctly. Used by synchronous Runtime
87/// entry points (CLI, unit tests, REPL one-shot) that are not running under
88/// a tokio task.
89///
90/// ```ignore
91/// let _scope = SyncRegistryScope::enter(runtime.schema_registry_arc());
92/// // ... invoke any code that calls current_registry() ...
93/// // scope restores previous value on drop
94/// ```
95#[must_use = "the scope only lives as long as the guard is held"]
96pub struct SyncRegistryScope {
97    prev: Option<Arc<TypeSchemaRegistry>>,
98}
99
100impl SyncRegistryScope {
101    /// Install `registry` as the current thread-local registry, saving the
102    /// previous value for restoration on drop.
103    pub fn enter(registry: Arc<TypeSchemaRegistry>) -> Self {
104        let prev = SYNC_CURRENT_SCHEMA_REGISTRY
105            .with(|cell| cell.borrow_mut().replace(registry));
106        Self { prev }
107    }
108}
109
110impl Drop for SyncRegistryScope {
111    fn drop(&mut self) {
112        SYNC_CURRENT_SCHEMA_REGISTRY.with(|cell| {
113            *cell.borrow_mut() = self.prev.take();
114        });
115    }
116}
117
118/// Return a handle to the current ambient `TypeSchemaRegistry`.
119///
120/// Lookup order: task-local → thread-local → process-wide default. This
121/// function never panics and always returns a usable registry. Scoped
122/// callers get their own per-Runtime registry; scopeless callers share
123/// the process-wide default seeded with canonical stdlib types.
124pub fn current_registry() -> Arc<TypeSchemaRegistry> {
125    if let Ok(r) = CURRENT_SCHEMA_REGISTRY.try_with(|r| r.clone()) {
126        return r;
127    }
128    if let Some(r) = SYNC_CURRENT_SCHEMA_REGISTRY.with(|cell| cell.borrow().clone()) {
129        return r;
130    }
131    DEFAULT_SCHEMA_REGISTRY.clone()
132}
133
134/// Alias for [`current_registry`] returning `Option` for historical API
135/// compatibility.
136///
137/// Returns `Some(registry)` unconditionally — the process-wide default is
138/// always available. Retained so pre-B1.7 call sites that matched on
139/// `Option` compile without churn; prefer [`current_registry`] in new code.
140pub fn try_current_registry() -> Option<Arc<TypeSchemaRegistry>> {
141    Some(current_registry())
142}
143
144/// Run `fut` with `registry` installed as the task-local current registry.
145///
146/// The installed registry is inherited by all descendant `.await` points of
147/// `fut` and survives tokio task migration across worker threads.
148pub async fn with_async_scope<R>(
149    registry: Arc<TypeSchemaRegistry>,
150    fut: impl Future<Output = R>,
151) -> R {
152    CURRENT_SCHEMA_REGISTRY.scope(registry, fut).await
153}
154
155/// Test-only helper: construct a default scope with a fresh
156/// stdlib-populated registry. Held by the returned guard; drop it to
157/// restore the previous value.
158///
159/// Prefer this in unit tests that indirectly touch `current_registry()` so
160/// they don't all need to thread a registry manually.
161#[cfg(test)]
162pub(crate) fn test_runtime_scope() -> SyncRegistryScope {
163    let registry = Arc::new(TypeSchemaRegistry::new_with_stdlib());
164    SyncRegistryScope::enter(registry)
165}
166
167#[cfg(test)]
168mod tests {
169    use super::*;
170    use crate::type_schema::FieldType;
171
172    #[test]
173    fn sync_scope_push_pop_restores_previous() {
174        let r1 = Arc::new(TypeSchemaRegistry::new_with_stdlib());
175        let r2 = Arc::new(TypeSchemaRegistry::new_with_stdlib());
176
177        // Baseline is the process-wide default handle (B1.7: no more None).
178        let baseline = current_registry();
179        assert!(Arc::ptr_eq(&baseline, &DEFAULT_SCHEMA_REGISTRY));
180
181        let outer = SyncRegistryScope::enter(r1.clone());
182        assert!(Arc::ptr_eq(&current_registry(), &r1));
183
184        {
185            let inner = SyncRegistryScope::enter(r2.clone());
186            assert!(Arc::ptr_eq(&current_registry(), &r2));
187            drop(inner);
188        }
189
190        // r1 restored after inner drop
191        assert!(Arc::ptr_eq(&current_registry(), &r1));
192        drop(outer);
193
194        // Default fallback visible again after outer drop.
195        assert!(Arc::ptr_eq(&current_registry(), &baseline));
196    }
197
198    #[test]
199    fn current_registry_falls_back_to_process_default_without_scope() {
200        // On a fresh thread with no installed scope, the process-wide
201        // default handle is returned so scopeless stdlib / unit-test
202        // callers retain pre-B1.7 predeclared-schema semantics.
203        let first = current_registry();
204        let second = current_registry();
205        assert!(Arc::ptr_eq(&first, &second));
206        // The default is a populated stdlib registry.
207        assert!(first.has_type("Row"));
208        assert!(first.has_type("Option"));
209        assert!(first.has_type("Result"));
210    }
211
212    #[test]
213    fn test_runtime_scope_installs_stdlib_registry() {
214        let _guard = test_runtime_scope();
215        let reg = current_registry();
216        assert!(reg.has_type("Row"));
217        assert!(reg.has_type("Option"));
218        assert!(reg.has_type("Result"));
219    }
220
221    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
222    async fn async_scope_survives_task_migration() {
223        let registry = Arc::new(TypeSchemaRegistry::new_with_stdlib());
224        let expected_id = registry.clone();
225
226        with_async_scope(registry, async move {
227            // Force the task to yield so the scheduler may migrate us.
228            tokio::task::yield_now().await;
229
230            let observed = current_registry();
231            assert!(Arc::ptr_eq(&observed, &expected_id));
232
233            // And nested scopes compose normally.
234            let mut inner = TypeSchemaRegistry::new();
235            inner.register_type("Inner", vec![("n".to_string(), FieldType::F64)]);
236            let inner = Arc::new(inner);
237
238            with_async_scope(inner.clone(), async {
239                tokio::task::yield_now().await;
240                assert!(Arc::ptr_eq(&current_registry(), &inner));
241            })
242            .await;
243
244            // Outer scope restored.
245            assert!(Arc::ptr_eq(&current_registry(), &expected_id));
246        })
247        .await;
248    }
249
250    #[test]
251    fn task_local_takes_precedence_over_thread_local() {
252        // Build a single-thread runtime so we stay on this test's thread
253        // and can observe the thread-local being visible from inside.
254        let rt = tokio::runtime::Builder::new_current_thread()
255            .build()
256            .expect("current-thread runtime");
257
258        let sync_reg = Arc::new(TypeSchemaRegistry::new_with_stdlib());
259        let async_reg = Arc::new(TypeSchemaRegistry::new_with_stdlib());
260
261        let _guard = SyncRegistryScope::enter(sync_reg.clone());
262        // Without an async scope, the sync one wins.
263        assert!(Arc::ptr_eq(&current_registry(), &sync_reg));
264
265        rt.block_on(async {
266            // Task-local overrides thread-local while it's active.
267            with_async_scope(async_reg.clone(), async {
268                assert!(Arc::ptr_eq(&current_registry(), &async_reg));
269            })
270            .await;
271        });
272
273        // After the async scope ends, the thread-local is visible again.
274        assert!(Arc::ptr_eq(&current_registry(), &sync_reg));
275    }
276}