shape_runtime/type_schema/
mod.rs1use shape_value::ValueWord;
23use std::collections::{HashMap, HashSet};
24use std::sync::RwLock;
25use std::sync::atomic::{AtomicU32, Ordering};
26
27pub mod builtin_schemas;
29pub mod enum_support;
30pub mod field_types;
31pub mod intersection;
32pub mod physical_binding;
33pub mod registry;
34pub mod schema;
35
36pub use builtin_schemas::BuiltinSchemaIds;
38pub use enum_support::{EnumInfo, EnumVariantInfo};
39pub use field_types::{FieldAnnotation, FieldDef, FieldType};
40pub use physical_binding::PhysicalSchemaBinding;
41pub use registry::{TypeSchemaBuilder, TypeSchemaRegistry};
42pub use schema::{TypeBinding, TypeBindingError, TypeSchema};
43
/// Errors produced while building or resolving type schemas.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum SchemaError {
    /// Two schemas declare the same field name with conflicting types.
    #[error("Field collision on '{field_name}': type '{type1}' vs '{type2}'")]
    FieldCollision {
        field_name: String,
        type1: String,
        type2: String,
    },
    /// No schema is registered under the requested name.
    #[error("Schema not found: {0}")]
    NotFound(String),
}
58
/// Unique identifier for a registered type schema.
pub type SchemaId = u32;

/// Monotonic source of fresh schema ids; starts at 1 so 0 can serve as a
/// sentinel.
static NEXT_SCHEMA_ID: AtomicU32 = AtomicU32::new(1);

/// Returns a fresh, never-before-issued schema id.
pub(crate) fn next_schema_id() -> SchemaId {
    NEXT_SCHEMA_ID.fetch_add(1, Ordering::SeqCst)
}

/// Guarantees that ids issued by `next_schema_id` from now on are strictly
/// greater than `max_existing_id` (e.g. after loading persisted schemas).
///
/// The counter only ever moves forward; calls with an already-satisfied bound
/// are no-ops.
pub fn ensure_next_schema_id_above(max_existing_id: SchemaId) {
    // `fetch_max` replaces the previous hand-rolled compare-exchange loop:
    // it atomically raises the counter to `required_next` without ever
    // lowering it, which is exactly the monotonic bump we need.
    let required_next = max_existing_id.saturating_add(1);
    NEXT_SCHEMA_ID.fetch_max(required_next, Ordering::SeqCst);
}
91
/// Registry of stdlib/builtin type schemas, built once on first access.
static STDLIB_SCHEMA_REGISTRY: std::sync::LazyLock<TypeSchemaRegistry> =
    std::sync::LazyLock::new(|| {
        let (registry, _ids) = TypeSchemaRegistry::with_stdlib_types_and_builtin_ids();
        registry
    });

/// Maps a canonical field-name key (see `schema_key_from_fields`) to the id
/// of the predeclared schema registered for that exact field list.
static PREDECLARED_SCHEMA_CACHE: std::sync::LazyLock<RwLock<HashMap<String, SchemaId>>> =
    std::sync::LazyLock::new(|| RwLock::new(HashMap::new()));

/// Schemas registered at runtime for field sets with no stdlib schema,
/// keyed by schema id.
static PREDECLARED_SCHEMA_REGISTRY: std::sync::LazyLock<RwLock<HashMap<SchemaId, TypeSchema>>> =
    std::sync::LazyLock::new(|| RwLock::new(HashMap::new()));
111
/// Builds the canonical cache key for an ordered field list.
///
/// Field names are joined with the ASCII Unit Separator (U+001F), which
/// cannot occur in identifiers, so distinct field lists map to distinct keys.
fn schema_key_from_fields(fields: &[&str]) -> String {
    let mut key = String::new();
    for (idx, field) in fields.iter().enumerate() {
        if idx > 0 {
            key.push('\u{1f}');
        }
        key.push_str(field);
    }
    key
}
115
116pub fn register_predeclared_any_schema(fields: &[String]) -> SchemaId {
121 let field_refs: Vec<&str> = fields.iter().map(|s| s.as_str()).collect();
122 let key = schema_key_from_fields(&field_refs);
123
124 if let Ok(cache) = PREDECLARED_SCHEMA_CACHE.read() {
125 if let Some(id) = cache.get(&key) {
126 return *id;
127 }
128 }
129
130 let typed_fields: Vec<(String, FieldType)> = fields
131 .iter()
132 .map(|name| (name.clone(), FieldType::Any))
133 .collect();
134
135 let schema = TypeSchema::new(format!("__predecl_{}", fields.join("_")), typed_fields);
136 let id = schema.id;
137
138 if let Ok(mut reg) = PREDECLARED_SCHEMA_REGISTRY.write() {
139 reg.insert(id, schema);
140 }
141 if let Ok(mut cache) = PREDECLARED_SCHEMA_CACHE.write() {
142 cache.insert(key, id);
143 }
144
145 id
146}
147
148fn lookup_predeclared_schema_by_id(id: SchemaId) -> Option<TypeSchema> {
149 PREDECLARED_SCHEMA_REGISTRY
150 .read()
151 .ok()
152 .and_then(|reg| reg.get(&id).cloned())
153}
154
155fn lookup_predeclared_schema_id(fields: &[&str]) -> Option<SchemaId> {
156 let key = schema_key_from_fields(fields);
157
158 if let Ok(cache) = PREDECLARED_SCHEMA_CACHE.read() {
159 if let Some(id) = cache.get(&key) {
160 return Some(*id);
161 }
162 }
163
164 STDLIB_SCHEMA_REGISTRY
165 .type_names()
166 .filter_map(|name| STDLIB_SCHEMA_REGISTRY.get(name))
167 .find(|schema| {
168 if schema.fields.len() != fields.len() {
169 return false;
170 }
171 schema
172 .fields
173 .iter()
174 .map(|f| f.name.as_str())
175 .eq(fields.iter().copied())
176 })
177 .map(|schema| schema.id)
178}
179
180fn lookup_schema_by_id(id: SchemaId) -> Option<TypeSchema> {
181 STDLIB_SCHEMA_REGISTRY
182 .get_by_id(id)
183 .cloned()
184 .or_else(|| lookup_predeclared_schema_by_id(id))
185}
186
/// Public entry point for schema-by-id resolution (stdlib registry first,
/// then runtime-registered predeclared schemas).
pub fn lookup_schema_by_id_public(id: SchemaId) -> Option<TypeSchema> {
    lookup_schema_by_id(id)
}
193
194fn schema_matches_field_set(schema: &TypeSchema, fields: &[&str]) -> bool {
195 if schema.fields.len() != fields.len() {
196 return false;
197 }
198 let wanted: HashSet<&str> = fields.iter().copied().collect();
199 schema
200 .fields
201 .iter()
202 .all(|field| wanted.contains(field.name.as_str()))
203}
204
/// Resolves a schema whose field set matches `fields`, registering a new
/// all-`Any` predeclared schema as a last resort.
///
/// Resolution order (the sequencing is load-bearing):
/// 1. Exact, order-sensitive match via the predeclared cache / stdlib scan.
/// 2. Order-insensitive match against stdlib schemas.
/// 3. Order-insensitive match against runtime-registered predeclared schemas.
/// 4. Register a fresh predeclared schema with every field typed `Any`.
///
/// NOTE(review): because step 4 always registers a schema, this returns
/// `None` only if a registry lock is poisoned — callers' "Runtime schema
/// synthesis is disabled" panics look effectively unreachable; confirm intent.
fn lookup_schema_for_fields(fields: &[&str]) -> Option<TypeSchema> {
    if let Some(id) = lookup_predeclared_schema_id(fields) {
        return lookup_schema_by_id(id);
    }

    // Step 2: stdlib schema with the same field names in any order.
    if let Some(schema) = STDLIB_SCHEMA_REGISTRY
        .type_names()
        .filter_map(|name| STDLIB_SCHEMA_REGISTRY.get(name))
        .find(|schema| schema_matches_field_set(schema, fields))
    {
        return Some(schema.clone());
    }

    // Step 3: previously synthesized predeclared schema, any field order.
    if let Some(schema) = PREDECLARED_SCHEMA_REGISTRY.read().ok().and_then(|reg| {
        reg.values()
            .find(|schema| schema_matches_field_set(schema, fields))
            .cloned()
    }) {
        return Some(schema);
    }

    // Step 4: synthesize an all-`Any` schema for this field set.
    let owned: Vec<String> = fields.iter().map(|s| s.to_string()).collect();
    let id = register_predeclared_any_schema(&owned);
    lookup_predeclared_schema_by_id(id)
}
238
239pub fn typed_object_from_pairs(fields: &[(&str, ValueWord)]) -> ValueWord {
258 let field_names: Vec<&str> = fields.iter().map(|(name, _)| *name).collect();
259 let schema = lookup_schema_for_fields(&field_names).unwrap_or_else(|| {
260 panic!(
261 "Missing predeclared schema for fields [{}]. Runtime schema synthesis is disabled.",
262 field_names.join(", ")
263 )
264 });
265 let value_by_name: HashMap<&str, &ValueWord> =
266 fields.iter().map(|(name, value)| (*name, value)).collect();
267
268 let mut slots = Vec::with_capacity(schema.fields.len());
270 let mut heap_mask: u64 = 0;
271 for (i, field_def) in schema.fields.iter().enumerate() {
272 let value = value_by_name
273 .get(field_def.name.as_str())
274 .unwrap_or_else(|| {
275 panic!(
276 "Missing field '{}' while materializing typed object",
277 field_def.name
278 )
279 });
280 let (slot, is_heap) = nb_to_slot(value);
281 slots.push(slot);
282 if is_heap {
283 heap_mask |= 1u64 << i;
284 }
285 }
286
287 ValueWord::from_heap_value(shape_value::heap_value::HeapValue::TypedObject {
288 schema_id: schema.id as u64,
289 slots: slots.into_boxed_slice(),
290 heap_mask,
291 })
292}
293
294pub fn typed_object_from_nb_pairs(
301 fields: &[(&str, shape_value::ValueWord)],
302) -> shape_value::ValueWord {
303 let field_names: Vec<&str> = fields.iter().map(|(name, _)| *name).collect();
304 let schema = lookup_schema_for_fields(&field_names).unwrap_or_else(|| {
305 panic!(
306 "Missing predeclared schema for fields [{}]. Runtime schema synthesis is disabled.",
307 field_names.join(", ")
308 )
309 });
310 let value_by_name: HashMap<&str, &shape_value::ValueWord> =
311 fields.iter().map(|(name, value)| (*name, value)).collect();
312
313 let mut slots = Vec::with_capacity(schema.fields.len());
315 let mut heap_mask: u64 = 0;
316 for (i, field_def) in schema.fields.iter().enumerate() {
317 let nb = value_by_name
318 .get(field_def.name.as_str())
319 .unwrap_or_else(|| {
320 panic!(
321 "Missing field '{}' while materializing typed object",
322 field_def.name
323 )
324 });
325 let (slot, is_heap) = nb_to_slot(nb);
326 slots.push(slot);
327 if is_heap {
328 heap_mask |= 1u64 << i;
329 }
330 }
331
332 shape_value::ValueWord::from_heap_value(shape_value::heap_value::HeapValue::TypedObject {
333 schema_id: schema.id as u64,
334 slots: slots.into_boxed_slice(),
335 heap_mask,
336 })
337}
338
#[cfg(test)]
mod tests {
    use super::*;
    use shape_value::ValueWord;

    /// Fields are supplied out of schema order; decoding back to a map must
    /// still associate each name with the value it was paired with.
    #[test]
    fn typed_object_from_nb_pairs_is_order_insensitive_for_builtin_schema() {
        let obj = typed_object_from_nb_pairs(&[
            (
                "function",
                ValueWord::from_string(std::sync::Arc::new("f".to_string())),
            ),
            (
                "file",
                ValueWord::from_string(std::sync::Arc::new("m".to_string())),
            ),
            ("line", ValueWord::from_i64(42)),
            ("ip", ValueWord::from_i64(7)),
        ]);

        let map = typed_object_to_hashmap_nb(&obj).expect("typed object should decode");
        assert_eq!(map.get("function").and_then(|v| v.as_str()), Some("f"));
        assert_eq!(map.get("file").and_then(|v| v.as_str()), Some("m"));
        assert_eq!(map.get("line").and_then(|v| v.as_i64()), Some(42));
        assert_eq!(map.get("ip").and_then(|v| v.as_i64()), Some(7));
    }
}
366
/// Decodes a `TypedObject` value into a field-name → value map.
///
/// Thin alias for [`typed_object_to_hashmap_nb`] kept for callers using the
/// shorter name; `ValueWord` is the same `shape_value::ValueWord` type.
pub fn typed_object_to_hashmap(value: &ValueWord) -> Option<HashMap<String, ValueWord>> {
    typed_object_to_hashmap_nb(value)
}
377
378pub fn typed_object_to_hashmap_nb(
385 value: &shape_value::ValueWord,
386) -> Option<HashMap<String, shape_value::ValueWord>> {
387 let (schema_id, slots, heap_mask) = value.as_typed_object()?;
388 let sid = schema_id as SchemaId;
389 let schema = STDLIB_SCHEMA_REGISTRY
390 .get_by_id(sid)
391 .cloned()
392 .or_else(|| lookup_predeclared_schema_by_id(sid))?;
393 let mut map = HashMap::with_capacity(schema.fields.len());
394 for (i, field_def) in schema.fields.iter().enumerate() {
395 if i < slots.len() {
396 let val = if heap_mask & (1u64 << i) != 0 {
397 slots[i].as_heap_nb()
398 } else {
399 unsafe { shape_value::ValueWord::clone_from_bits(slots[i].raw()) }
406 };
407 map.insert(field_def.name.clone(), val);
408 }
409 }
410 Some(map)
411}
412
413pub(crate) fn nb_to_slot(nb: &shape_value::ValueWord) -> (shape_value::slot::ValueSlot, bool) {
423 use shape_value::NanTag;
424 use shape_value::slot::ValueSlot;
425
426 match nb.tag() {
427 NanTag::Heap => {
428 let hv = nb.as_heap_ref().unwrap().clone();
429 (ValueSlot::from_heap(hv), true)
430 }
431 _ => {
432 (ValueSlot::from_raw(nb.raw_bits()), false)
434 }
435 }
436}