shape_runtime/type_schema/current.rs
//! Task-local "current" `TypeSchemaRegistry` handle.
//!
//! This module exposes an ambient registry that is pushed by `Runtime`'s
//! execution entry points and consumed by free functions that previously
//! reached for the process-global `STDLIB_SCHEMA_REGISTRY` / `NEXT_SCHEMA_ID`
//! statics. The scoping is three-layered to cover both async and synchronous
//! entry points, with a process-wide default as the final fallback:
//!
//! 1. **Task-local** (`CURRENT_SCHEMA_REGISTRY`) — survives task migration
//!    across tokio worker threads, so any descendant `.await` in a Shape
//!    execution future inherits the registry automatically.
//! 2. **Thread-local** (`SYNC_CURRENT_SCHEMA_REGISTRY`) — fallback for
//!    synchronous entry points (CLI, tests, REPL one-shots) that are not
//!    running under a tokio task. A [`SyncRegistryScope`] RAII guard
//!    pushes/pops the value so nested Runtimes on one thread compose
//!    correctly.
//! 3. **Process-wide default** (`DEFAULT_SCHEMA_REGISTRY`) — a single
//!    stdlib-populated registry shared by scopeless callers. Preserves the
//!    pre-B1.7 semantics that every caller sees *some* registry: ad-hoc
//!    tooling, static initialisers, and unit tests that don't install a
//!    scope all share this handle instead of being forced to panic or
//!    observe `None`.
//!
//! Mirrors the B5 `shape_value::shape_graph_current::DEFAULT_SHAPE_TABLE`
//! pattern that retired the legacy `GLOBAL_SHAPE_TABLE` static.
//!
//! Lookup order: task-local → thread-local → process default. Both
//! [`current_registry`] and [`try_current_registry`] always return a
//! usable handle; callers never need to branch on `None` or on a panic
//! any more. Scoped callers (installed by `Runtime`) still get per-VM
//! isolation, which was the original B1 goal.
32
33use super::TypeSchemaRegistry;
34use std::cell::RefCell;
35use std::future::Future;
36use std::sync::{Arc, LazyLock};
37
38tokio::task_local! {
39 /// Task-local current registry. Set by the `with_async_scope` helper
40 /// around any async execution entry. Inherited by all descendant
41 /// `.await`s of that future.
42 static CURRENT_SCHEMA_REGISTRY: Arc<TypeSchemaRegistry>;
43}
44
45thread_local! {
46 /// Synchronous fallback. Managed exclusively by [`SyncRegistryScope`]
47 /// for push/pop semantics — callers should not touch this directly.
48 static SYNC_CURRENT_SCHEMA_REGISTRY: RefCell<Option<Arc<TypeSchemaRegistry>>> =
49 const { RefCell::new(None) };
50}
51
52/// Process-wide default registry used when neither a task-local nor a
53/// thread-local scope is active.
54///
55/// Callers that poke `lookup_schema_for_fields` /
56/// `register_predeclared_any_schema` directly from a stdlib helper or a
57/// unit test — without a `Runtime::enter_schema_scope` — historically
58/// relied on the pre-B1.7 `STDLIB_SCHEMA_REGISTRY` /
59/// `FALLBACK_PREDECLARED_REGISTRY` statics always being available. This
60/// fallback preserves that semantic: scopeless callers share one
61/// isolated-per-process registry instead of panicking or getting `None`.
62/// Scoped callers (Runtime-installed) still get per-VM isolation.
63///
64/// The registry is seeded with the canonical stdlib types
65/// (Row / Option / Result / builtin fixed-layout) via
66/// [`TypeSchemaRegistry::new_with_stdlib`] so predeclared-schema
67/// resolution can match against the same stdlib surface that scoped
68/// registries expose.
69static DEFAULT_SCHEMA_REGISTRY: LazyLock<Arc<TypeSchemaRegistry>> =
70 LazyLock::new(|| Arc::new(TypeSchemaRegistry::new_with_stdlib()));
71
72/// Return the process-wide default schema-registry handle.
73///
74/// Exposed so downstream crates and tests that want to explicitly mirror
75/// a schema into the default registry (for example, snapshot-decoding
76/// tooling that runs outside any Runtime scope) can do so without
77/// constructing a fresh registry and losing shared predeclared caches.
78pub fn default_registry() -> Arc<TypeSchemaRegistry> {
79 DEFAULT_SCHEMA_REGISTRY.clone()
80}
81
82/// RAII guard that installs a registry on the thread-local slot for the
83/// lifetime of the guard.
84///
85/// The previous value (if any) is captured on construction and restored on
86/// drop, so nested scopes compose correctly. Used by synchronous Runtime
87/// entry points (CLI, unit tests, REPL one-shot) that are not running under
88/// a tokio task.
89///
90/// ```ignore
91/// let _scope = SyncRegistryScope::enter(runtime.schema_registry_arc());
92/// // ... invoke any code that calls current_registry() ...
93/// // scope restores previous value on drop
94/// ```
95#[must_use = "the scope only lives as long as the guard is held"]
96pub struct SyncRegistryScope {
97 prev: Option<Arc<TypeSchemaRegistry>>,
98}
99
100impl SyncRegistryScope {
101 /// Install `registry` as the current thread-local registry, saving the
102 /// previous value for restoration on drop.
103 pub fn enter(registry: Arc<TypeSchemaRegistry>) -> Self {
104 let prev = SYNC_CURRENT_SCHEMA_REGISTRY
105 .with(|cell| cell.borrow_mut().replace(registry));
106 Self { prev }
107 }
108}
109
110impl Drop for SyncRegistryScope {
111 fn drop(&mut self) {
112 SYNC_CURRENT_SCHEMA_REGISTRY.with(|cell| {
113 *cell.borrow_mut() = self.prev.take();
114 });
115 }
116}
117
118/// Return a handle to the current ambient `TypeSchemaRegistry`.
119///
120/// Lookup order: task-local → thread-local → process-wide default. This
121/// function never panics and always returns a usable registry. Scoped
122/// callers get their own per-Runtime registry; scopeless callers share
123/// the process-wide default seeded with canonical stdlib types.
124pub fn current_registry() -> Arc<TypeSchemaRegistry> {
125 if let Ok(r) = CURRENT_SCHEMA_REGISTRY.try_with(|r| r.clone()) {
126 return r;
127 }
128 if let Some(r) = SYNC_CURRENT_SCHEMA_REGISTRY.with(|cell| cell.borrow().clone()) {
129 return r;
130 }
131 DEFAULT_SCHEMA_REGISTRY.clone()
132}
133
134/// Alias for [`current_registry`] returning `Option` for historical API
135/// compatibility.
136///
137/// Returns `Some(registry)` unconditionally — the process-wide default is
138/// always available. Retained so pre-B1.7 call sites that matched on
139/// `Option` compile without churn; prefer [`current_registry`] in new code.
140pub fn try_current_registry() -> Option<Arc<TypeSchemaRegistry>> {
141 Some(current_registry())
142}
143
144/// Run `fut` with `registry` installed as the task-local current registry.
145///
146/// The installed registry is inherited by all descendant `.await` points of
147/// `fut` and survives tokio task migration across worker threads.
148pub async fn with_async_scope<R>(
149 registry: Arc<TypeSchemaRegistry>,
150 fut: impl Future<Output = R>,
151) -> R {
152 CURRENT_SCHEMA_REGISTRY.scope(registry, fut).await
153}
154
/// Test-only convenience: enter a thread-local scope backed by a brand-new
/// registry from [`TypeSchemaRegistry::new_with_stdlib`].
///
/// The registry lives as long as the returned guard; dropping the guard
/// reinstates whatever the thread-local slot held before. Handy for unit
/// tests that reach `current_registry()` indirectly and would otherwise
/// have to thread a registry through by hand.
#[cfg(test)]
pub(crate) fn test_runtime_scope() -> SyncRegistryScope {
    SyncRegistryScope::enter(Arc::new(TypeSchemaRegistry::new_with_stdlib()))
}
166
#[cfg(test)]
mod tests {
    //! Unit tests for the three lookup layers (task-local, thread-local,
    //! process default) and for the push/pop behaviour of the scope guards.

    use super::*;
    use crate::type_schema::FieldType;

    /// Nested sync scopes unwind in LIFO order and end at the default.
    #[test]
    fn sync_scope_push_pop_restores_previous() {
        let r1 = Arc::new(TypeSchemaRegistry::new_with_stdlib());
        let r2 = Arc::new(TypeSchemaRegistry::new_with_stdlib());

        // Baseline is the process-wide default handle (B1.7: no more None).
        let baseline = current_registry();
        assert!(Arc::ptr_eq(&baseline, &DEFAULT_SCHEMA_REGISTRY));

        let outer = SyncRegistryScope::enter(r1.clone());
        assert!(Arc::ptr_eq(&current_registry(), &r1));

        {
            let inner = SyncRegistryScope::enter(r2.clone());
            assert!(Arc::ptr_eq(&current_registry(), &r2));
            drop(inner);
        }

        // r1 restored after inner drop.
        assert!(Arc::ptr_eq(&current_registry(), &r1));
        drop(outer);

        // Default fallback visible again after outer drop.
        assert!(Arc::ptr_eq(&current_registry(), &baseline));
    }

    /// Without any installed scope the shared, stdlib-seeded default is used.
    #[test]
    fn current_registry_falls_back_to_process_default_without_scope() {
        // With no scope installed on this thread, the process-wide default
        // handle is returned so scopeless stdlib / unit-test callers retain
        // pre-B1.7 predeclared-schema semantics.
        let first = current_registry();
        let second = current_registry();
        assert!(Arc::ptr_eq(&first, &second));
        // It really is the shared default, not merely a stable handle.
        assert!(Arc::ptr_eq(&first, &default_registry()));
        // The default is a populated stdlib registry.
        assert!(first.has_type("Row"));
        assert!(first.has_type("Option"));
        assert!(first.has_type("Result"));
    }

    /// The test helper installs a registry that knows the stdlib types.
    #[test]
    fn test_runtime_scope_installs_stdlib_registry() {
        let _guard = test_runtime_scope();
        let reg = current_registry();
        assert!(reg.has_type("Row"));
        assert!(reg.has_type("Option"));
        assert!(reg.has_type("Result"));
    }

    /// The task-local binding follows the task across yields and nests.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn async_scope_survives_task_migration() {
        let registry = Arc::new(TypeSchemaRegistry::new_with_stdlib());
        let expected_id = registry.clone();

        with_async_scope(registry, async move {
            // Force the task to yield so the scheduler may migrate us.
            tokio::task::yield_now().await;

            let observed = current_registry();
            assert!(Arc::ptr_eq(&observed, &expected_id));

            // And nested scopes compose normally.
            let mut inner = TypeSchemaRegistry::new();
            inner.register_type("Inner", vec![("n".to_string(), FieldType::F64)]);
            let inner = Arc::new(inner);

            with_async_scope(inner.clone(), async {
                tokio::task::yield_now().await;
                assert!(Arc::ptr_eq(&current_registry(), &inner));
            })
            .await;

            // Outer scope restored.
            assert!(Arc::ptr_eq(&current_registry(), &expected_id));
        })
        .await;
    }

    /// A bound task-local shadows the thread-local, which reappears afterwards.
    #[test]
    fn task_local_takes_precedence_over_thread_local() {
        // Build a single-thread runtime so we stay on this test's thread
        // and can observe the thread-local being visible from inside.
        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .expect("current-thread runtime");

        let sync_reg = Arc::new(TypeSchemaRegistry::new_with_stdlib());
        let async_reg = Arc::new(TypeSchemaRegistry::new_with_stdlib());

        let _guard = SyncRegistryScope::enter(sync_reg.clone());
        // Without an async scope, the sync one wins.
        assert!(Arc::ptr_eq(&current_registry(), &sync_reg));

        rt.block_on(async {
            // Task-local overrides thread-local while it's active.
            with_async_scope(async_reg.clone(), async {
                assert!(Arc::ptr_eq(&current_registry(), &async_reg));
            })
            .await;
        });

        // After the async scope ends, the thread-local is visible again.
        assert!(Arc::ptr_eq(&current_registry(), &sync_reg));
    }
}