1use std::{collections::{BTreeMap, HashMap}, marker::PhantomData, ops::Deref, path::PathBuf};
2
3use derive_more::{Display, Error, From};
4use serde::{Deserialize, Serialize};
5use tempest_core::{
6 journal::{Journal, JournalError, JournalHandle, Replayable},
7 tempest_str::TempestStr,
8};
9use tempest_io::Io;
10use tempest_rt::JoinHandle;
11
12use crate::{
13 catalog::schema::{
14 DatabaseId, DatabaseSchema, EnumSchema, EnumVariantDef, FieldDef, FieldId, FlatField,
15 StructSchema, TableId, TableSchema, TypeExpr, TypeId, TypeSchema, VariantId,
16 },
17 config::CatalogConfig,
18 row::resolved::ResolvedTable,
19};
20
21fn resolve_type_args(ref_args: &[TypeExpr], generic_args: &[TypeExpr]) -> Vec<TypeExpr> {
24 ref_args
25 .iter()
26 .map(|arg| match arg {
27 TypeExpr::GenericParam(i) => generic_args
28 .get(*i as usize)
29 .cloned()
30 .unwrap_or_else(|| arg.clone()),
31 other => other.clone(),
32 })
33 .collect()
34}
35
36#[instrument(skip_all, level = "trace")]
37pub(crate) fn flatten_schema(
38 fields: &BTreeMap<FieldId, FieldDef>,
39 generic_args: &[TypeExpr],
40 catalog: &CatalogState,
41 prefix: &str,
42) -> Result<Vec<FlatField>, CatalogError> {
43 let mut result = Vec::new();
44 for (_, def) in fields {
45 let field_name: TempestStr<'static> = if prefix.is_empty() {
46 def.name.clone()
47 } else {
48 TempestStr::from_owned(format!("{}{}", prefix, def.name))
49 .expect("dotted field name does not contain null bytes")
50 };
51 match &def.ty {
52 TypeExpr::Primitive(ty) => {
53 trace!(name = %field_name, ?ty, "flat primitive field");
54 result.push(FlatField { name: field_name, ty: *ty, type_args: vec![] });
55 }
56 TypeExpr::Ref(type_id, ref_args) => {
57 let type_schema = catalog
58 .get_type(*type_id)
59 .ok_or(CatalogError::TypeNotFound(*type_id))?;
60 match type_schema {
61 TypeSchema::Struct(struct_schema) => {
62 let sub_prefix = format!("{}{}.", prefix, def.name);
63 trace!(name = %def.name, "recursing into Ref field");
64 let sub = flatten_schema(&struct_schema.fields, ref_args, catalog, &sub_prefix)?;
65 result.extend(sub);
66 }
67 TypeSchema::Enum(_) => {
68 trace!(name = %field_name, "enum leaf field");
69 let resolved_args = resolve_type_args(ref_args, generic_args);
70 result.push(FlatField {
71 name: field_name,
72 ty: crate::types::TempestType::Enum(**type_id),
73 type_args: resolved_args,
74 });
75 }
76 }
77 }
78 TypeExpr::GenericParam(i) => match generic_args.get(*i as usize) {
79 Some(TypeExpr::Primitive(ty)) => {
80 trace!(name = %field_name, ?ty, "flat generic-param primitive field");
81 result.push(FlatField { name: field_name, ty: *ty, type_args: vec![] });
82 }
83 Some(TypeExpr::Ref(type_id, ref_args)) => {
84 let type_schema = catalog
85 .get_type(*type_id)
86 .ok_or(CatalogError::TypeNotFound(*type_id))?;
87 match type_schema {
88 TypeSchema::Struct(struct_schema) => {
89 let sub_prefix = format!("{}{}.", prefix, def.name);
90 trace!(name = %def.name, "recursing into generic-param Ref field");
91 let sub = flatten_schema(&struct_schema.fields, ref_args, catalog, &sub_prefix)?;
92 result.extend(sub);
93 }
94 TypeSchema::Enum(_) => {
95 trace!(name = %field_name, "generic-param enum leaf field");
96 let resolved_args = resolve_type_args(ref_args, generic_args);
97 result.push(FlatField {
98 name: field_name,
99 ty: crate::types::TempestType::Enum(**type_id),
100 type_args: resolved_args,
101 });
102 }
103 }
104 }
105 Some(TypeExpr::GenericParam(_)) => unreachable!("type args must be concrete"),
106 None => unreachable!("generic param index out of range - catalog is corrupt"),
107 },
108 }
109 }
110 Ok(result)
111}
112
113pub(crate) fn pk_path_to_flat_idx(
117 path: &[FieldId],
118 fields: &std::collections::BTreeMap<FieldId, FieldDef>,
119 generic_args: &[TypeExpr],
120 catalog: &CatalogState,
121 flat_fields: &[FlatField],
122) -> Option<usize> {
123 let mut name = String::new();
124 let mut current_fields = fields;
125 let mut current_args: std::borrow::Cow<[TypeExpr]> = std::borrow::Cow::Borrowed(generic_args);
126
127 for (i, fid) in path.iter().enumerate() {
128 let def = current_fields.get(fid)?;
129 if name.is_empty() {
130 name.push_str(&def.name);
131 } else {
132 name.push('.');
133 name.push_str(&def.name);
134 }
135 if i < path.len() - 1 {
136 let resolved = match &def.ty {
137 TypeExpr::GenericParam(idx) => current_args.get(*idx as usize)?.clone(),
138 other => other.clone(),
139 };
140 match resolved {
141 TypeExpr::Ref(type_id, ref_args) => {
142 let type_schema = catalog.get_type(type_id)?;
143 let struct_schema = type_schema.as_struct()?;
144 current_fields = &struct_schema.fields;
145 current_args = std::borrow::Cow::Owned(ref_args);
146 }
147 _ => return None,
148 }
149 }
150 }
151 flat_fields.iter().position(|ff| ff.name.as_ref() == name.as_str())
152}
153
// Schema definitions stored by the catalog (databases, tables, struct and enum types).
pub mod schema;

// Catalog unit tests; compiled only for test builds.
#[cfg(test)]
mod tests;
158
/// Version 1 of the catalog journal edit.
///
/// Each creation variant carries the id chosen by the catalog together with the
/// schema being registered.
/// NOTE(review): variant order and payload shapes are likely part of the
/// persisted journal encoding (serde derive). Confirm before reordering or
/// changing them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CatalogEditV1 {
    /// Registers a new database under the given id.
    CreateDatabase((DatabaseId, DatabaseSchema)),
    /// Registers a new table. Its database must already exist when the edit is applied.
    CreateTable((TableId, TableSchema)),
    /// Registers a new struct type, optionally scoped to a database.
    CreateType((TypeId, StructSchema)),
    /// Registers a new enum type, optionally scoped to a database.
    CreateEnum((TypeId, EnumSchema)),
    /// Whole-catalog snapshot: a list of edits that are applied in order.
    Snapshot(Vec<CatalogEdit>),
}
179
/// Versioned envelope for catalog journal edits.
///
/// `#[repr(u16)]` together with the explicit `= 1` fixes the in-memory
/// discriminant of `V1`.
/// NOTE(review): serde's derived impls encode variants by index or name, not by
/// this discriminant. Confirm which one the journal format relies on.
#[repr(u16)]
#[derive(derive_more::Debug, Clone, Serialize, Deserialize)]
pub enum CatalogEdit {
    /// Version 1 edit. `Debug` prints only the wrapped edit.
    #[debug("{:?}", _0)]
    V1(CatalogEditV1) = 1,
}
191
/// Errors returned by catalog operations.
#[derive(Debug, Display, Error, From)]
pub enum CatalogError {
    /// Opening or appending to the catalog journal failed.
    #[display("journal error: {}", _0)]
    JournalError(JournalError),

    /// No database with this id is registered.
    #[from(skip)]
    #[display("database with ID {} was not found", _0)]
    DatabaseNotFound(#[error(not(source))] DatabaseId),
    /// A database with this name is already registered.
    #[from(skip)]
    #[display("database with name '{}' already exists", _0)]
    DatabaseAlreadyExists(#[error(not(source))] TempestStr<'static>),

    /// No table with this id is registered.
    #[from(skip)]
    #[display("table with ID {} was not found", _0)]
    TableNotFound(#[error(not(source))] TableId),
    /// The target database already contains a table with this name.
    #[from(skip)]
    #[display("table with name '{}' already exists inside of this scope", _0)]
    TableAlreadyExists(#[error(not(source))] TempestStr<'static>),

    /// No type (user-defined or global) with this id is registered.
    #[from(skip)]
    #[display("type with ID {} was not found", _0)]
    TypeNotFound(#[error(not(source))] TypeId),
    /// A type with this name already exists in the same database scope.
    #[from(skip)]
    #[display("type with name '{}' already exists inside of this scope", _0)]
    TypeAlreadyExists(#[error(not(source))] TempestStr<'static>),
}
218
/// In-memory catalog contents: databases, tables and user-defined types, plus
/// the counters used to allocate ids for new entries.
///
/// This is the state replayed from, and snapshotted into, the catalog journal
/// (see its `Replayable` impl).
#[derive(Debug, Clone)]
pub struct CatalogState {
    // Id handed to the next table created via `create_table_edit`.
    next_table_id: TableId,
    /// All tables, keyed by id.
    pub tables: HashMap<TableId, TableSchema>,

    // Id handed to the next database created via `create_database_edit`.
    next_database_id: DatabaseId,
    /// All databases, keyed by id.
    pub databases: HashMap<DatabaseId, DatabaseSchema>,

    // Id handed to the next struct or enum type that is created.
    next_type_id: TypeId,
    /// User-created struct and enum types, keyed by id.
    pub types: HashMap<TypeId, TypeSchema>,

    /// Built-in types (currently `Option`) installed by `Default`.
    /// `get_type` consults them after `types`, and `snapshot` does not include them.
    pub global_types: HashMap<TypeId, TypeSchema>,
}
243
244impl Default for CatalogState {
245 fn default() -> Self {
246 let mut global_types = HashMap::new();
247 let option_schema = TypeSchema::Enum(EnumSchema {
248 database_id: None,
249 name: "Option".into(),
250 generic_params: vec!["T".into()],
251 variants: {
252 let mut v = BTreeMap::new();
253 v.insert(VariantId(0), EnumVariantDef { name: "None".into(), fields: vec![] });
254 v.insert(VariantId(1), EnumVariantDef { name: "Some".into(), fields: vec![TypeExpr::GenericParam(0)] });
255 v
256 },
257 });
258 global_types.insert(TypeId(u32::MAX), option_schema);
259
260 Self {
261 next_table_id: TableId::default(),
262 tables: HashMap::new(),
263 next_database_id: DatabaseId::default(),
264 databases: HashMap::new(),
265 next_type_id: TypeId::default(),
266 types: HashMap::new(),
267 global_types,
268 }
269 }
270}
271
272impl CatalogState {
273 fn create_database_edit(
274 &self,
275 schema: DatabaseSchema,
276 ) -> Result<(DatabaseId, CatalogEdit), CatalogError> {
277 if self.databases.values().any(|db| db.name == schema.name) {
278 return Err(CatalogError::DatabaseAlreadyExists(schema.name));
279 }
280
281 let id = self.next_database_id;
282 trace!(?id, "assigned id to create database edit");
283
284 Ok((
285 id,
286 CatalogEdit::V1(CatalogEditV1::CreateDatabase((id, schema))),
287 ))
288 }
289
290 fn create_table_edit(
291 &self,
292 schema: TableSchema,
293 ) -> Result<(TableId, CatalogEdit), CatalogError> {
294 let db = self
295 .databases
296 .get(&schema.database_id)
297 .ok_or(CatalogError::DatabaseNotFound(schema.database_id))?;
298
299 let id = self.next_table_id;
300 trace!(?id, "assigned id to create table edit");
301
302 if db.tables.iter().any(|id| {
303 self.tables[id].database_id == schema.database_id && self.tables[id].name == schema.name
304 }) {
305 return Err(CatalogError::TableAlreadyExists(schema.name));
306 }
307
308 Ok((
309 id,
310 CatalogEdit::V1(CatalogEditV1::CreateTable((id, schema))),
311 ))
312 }
313
314 fn create_type_edit(
315 &self,
316 schema: StructSchema,
317 ) -> Result<(TypeId, CatalogEdit), CatalogError> {
318 if self
319 .types
320 .values()
321 .any(|t| t.database_id() == schema.database_id && t.name() == &schema.name)
322 {
323 return Err(CatalogError::TypeAlreadyExists(schema.name));
324 }
325
326 let id = self.next_type_id;
327 trace!(?id, "assigned id to create type edit");
328
329 Ok((id, CatalogEdit::V1(CatalogEditV1::CreateType((id, schema)))))
330 }
331
332 fn create_enum_edit(
333 &self,
334 schema: EnumSchema,
335 ) -> Result<(TypeId, CatalogEdit), CatalogError> {
336 if self
337 .types
338 .values()
339 .any(|t| t.database_id() == schema.database_id && t.name() == &schema.name)
340 {
341 return Err(CatalogError::TypeAlreadyExists(schema.name));
342 }
343
344 let id = self.next_type_id;
345 trace!(?id, "assigned id to create enum edit");
346
347 Ok((id, CatalogEdit::V1(CatalogEditV1::CreateEnum((id, schema)))))
348 }
349
350 pub(crate) fn get_database_by_name(
351 &self,
352 name: &TempestStr,
353 ) -> Option<(DatabaseId, &DatabaseSchema)> {
354 for (&id, schema) in &self.databases {
355 if schema.name == *name {
356 return Some((id, schema));
357 }
358 }
359 None
360 }
361
362 pub(crate) fn get_table_by_name(
363 &self,
364 database_id: DatabaseId,
365 name: &TempestStr,
366 ) -> Option<(TableId, &TableSchema)> {
367 for (&id, schema) in &self.tables {
368 if schema.database_id == database_id && schema.name == *name {
369 return Some((id, schema));
370 }
371 }
372 None
373 }
374
375 pub(crate) fn get_type_by_name(
376 &self,
377 database_id: DatabaseId,
378 name: &TempestStr,
379 ) -> Option<(TypeId, &TypeSchema)> {
380 for (&id, schema) in &self.types {
381 if schema.database_id() == Some(database_id) && schema.name() == name {
382 return Some((id, schema));
383 }
384 }
385 None
386 }
387
388 pub fn get_type(&self, id: TypeId) -> Option<&TypeSchema> {
390 self.types.get(&id).or_else(|| self.global_types.get(&id))
391 }
392
393 pub(crate) fn get_global_type_by_name(&self, name: &TempestStr) -> Option<(TypeId, &TypeSchema)> {
395 self.global_types.iter().find(|(_, s)| s.name() == name).map(|(&id, s)| (id, s))
396 }
397
398 pub fn pk_path_name(
401 &self,
402 path: &[FieldId],
403 table_schema: &TableSchema,
404 ) -> String {
405 let struct_schema = self.get_type(table_schema.type_id)
406 .expect("type not found in catalog")
407 .as_struct()
408 .expect("table type must be a struct");
409 let mut name = String::new();
410 let mut current_fields = &struct_schema.fields;
411 let mut current_args: std::borrow::Cow<[TypeExpr]> =
412 std::borrow::Cow::Borrowed(&table_schema.generic_args);
413
414 for (i, fid) in path.iter().enumerate() {
415 let def = ¤t_fields[fid];
416 if name.is_empty() {
417 name.push_str(&def.name);
418 } else {
419 name.push('.');
420 name.push_str(&def.name);
421 }
422 if i < path.len() - 1 {
423 let resolved = match &def.ty {
424 TypeExpr::GenericParam(idx) => current_args[*idx as usize].clone(),
425 other => other.clone(),
426 };
427 if let TypeExpr::Ref(type_id, ref_args) = resolved {
428 if let Some(ts) = self.get_type(type_id) {
429 if let Some(s) = ts.as_struct() {
430 current_fields = &s.fields;
431 current_args = std::borrow::Cow::Owned(ref_args);
432 }
433 }
434 }
435 }
436 }
437 name
438 }
439
440 pub fn tables_in_database(
441 &self,
442 database: &str,
443 ) -> impl Iterator<Item = (TableId, &TableSchema)> {
444 self.databases
445 .iter()
446 .filter(|(_, db)| Some(&db.name) == TempestStr::from_borrowed(database).ok().as_ref())
447 .map(|(_, db)| db.tables.iter().map(|t| (*t, &self.tables[t])))
448 .flatten()
449 }
450
451 pub fn types_in_database(
452 &self,
453 database: &str,
454 ) -> impl Iterator<Item = (TypeId, &TypeSchema)> {
455 self.databases
456 .iter()
457 .filter(|(_, db)| Some(&db.name) == TempestStr::from_borrowed(database).ok().as_ref())
458 .map(|(_, db)| db.types.iter().map(|t| (*t, &self.types[t])))
459 .flatten()
460 }
461
462 pub(crate) fn resolved_table(&self, table_id: TableId) -> ResolvedTable<'_> {
466 let table_schema = self
467 .tables
468 .get(&table_id)
469 .expect("table not found in catalog");
470 let struct_schema = self
471 .get_type(table_schema.type_id)
472 .expect("type not found in catalog")
473 .as_struct()
474 .expect("table type must be a struct");
475 let flat_fields = flatten_schema(
476 &struct_schema.fields,
477 &table_schema.generic_args,
478 self,
479 "",
480 )
481 .expect("flat schema build failed - catalog is inconsistent");
482 let primary_key = table_schema.primary_key.iter()
483 .map(|path| pk_path_to_flat_idx(path, &struct_schema.fields, &table_schema.generic_args, self, &flat_fields)
484 .expect("pk path not found in flat fields - catalog is inconsistent"))
485 .collect();
486 ResolvedTable {
487 id: table_id,
488 fields: &struct_schema.fields,
489 generic_args: &table_schema.generic_args,
490 primary_key,
491 flat_fields,
492 }
493 }
494}
495
496impl Replayable for CatalogState {
497 type Edit = CatalogEdit;
498
499 #[instrument(skip_all, level = "debug")]
500 fn apply(&mut self, edit: Self::Edit) {
501 match edit {
502 CatalogEdit::V1(v1) => match v1 {
503 CatalogEditV1::CreateDatabase((id, schema)) => {
504 debug!(?id, ?schema, "applying create database edit");
505 assert!(!self.databases.contains_key(&id));
506 self.next_database_id = DatabaseId(*id + 1).max(self.next_database_id);
507 self.databases.insert(id, schema);
508 }
509 CatalogEditV1::CreateTable((id, schema)) => {
510 debug!(?id, ?schema, "applying create table edit");
511 assert!(!self.tables.contains_key(&id));
512 self.next_table_id = TableId(*id + 1).max(self.next_table_id);
513 self.databases
515 .get_mut(&schema.database_id)
516 .expect("database must exist when applying CreateTable")
517 .tables
518 .insert(id);
519 self.tables.insert(id, schema);
520 }
521 CatalogEditV1::CreateType((id, schema)) => {
522 debug!(?id, ?schema, "applying create type edit");
523 assert!(!self.types.contains_key(&id));
524 self.next_type_id = TypeId(*id + 1).max(self.next_type_id);
525 if let Some(db_id) = schema.database_id {
526 self.databases
527 .get_mut(&db_id)
528 .expect("database must exist when applying CreateType")
529 .types
530 .insert(id);
531 }
532 self.types.insert(id, TypeSchema::Struct(schema));
533 }
534 CatalogEditV1::CreateEnum((id, schema)) => {
535 debug!(?id, ?schema, "applying create enum edit");
536 assert!(!self.types.contains_key(&id));
537 self.next_type_id = TypeId(*id + 1).max(self.next_type_id);
538 if let Some(db_id) = schema.database_id {
539 self.databases
540 .get_mut(&db_id)
541 .expect("database must exist when applying CreateEnum")
542 .types
543 .insert(id);
544 }
545 self.types.insert(id, TypeSchema::Enum(schema));
546 }
547 CatalogEditV1::Snapshot(edits) => {
548 debug!(count = edits.len(), "applying snapshot edits");
549 for e in edits {
550 self.apply(e);
551 }
552 }
553 },
554 }
555 }
556
557 fn snapshot(&self) -> Self::Edit {
558 let mut edits = Vec::new();
559
560 edits.extend(self.databases.iter().map(|(id, schema)| {
561 CatalogEdit::V1(CatalogEditV1::CreateDatabase((id.clone(), schema.clone())))
562 }));
563
564 edits.extend(self.types.iter().map(|(id, schema)| match schema {
565 TypeSchema::Struct(s) => CatalogEdit::V1(CatalogEditV1::CreateType((*id, s.clone()))),
566 TypeSchema::Enum(e) => CatalogEdit::V1(CatalogEditV1::CreateEnum((*id, e.clone()))),
567 }));
568
569 edits.extend(self.tables.iter().map(|(id, schema)| {
570 CatalogEdit::V1(CatalogEditV1::CreateTable((id.clone(), schema.clone())))
571 }));
572
573 CatalogEdit::V1(CatalogEditV1::Snapshot(edits))
574 }
575
576 fn filename_prefix() -> &'static str {
577 "catalog"
578 }
579
580 fn initial() -> Self {
581 CatalogState::default()
582 }
583}
584
/// Journal-backed catalog: an in-memory [`CatalogState`] whose mutations are
/// appended to a journal before they are applied.
///
/// Read access to the state goes through `Deref<Target = CatalogState>`.
pub(crate) struct Catalog<I: Io> {
    // In-memory state, seeded from the journal in `open` and updated after each append.
    data: CatalogState,
    // Handle used to append edits to the journal.
    journal: JournalHandle<CatalogState>,
    // Join handle returned by `Journal::new` (presumably the journal's
    // background task); awaited in `shutdown`.
    journal_handle: JoinHandle<()>,
    // Records the I/O backend type parameter without storing a value of it.
    _marker: PhantomData<I>,
}
601
602impl<I: Io> Catalog<I> {
603 #[instrument(skip_all, level = "info")]
606 pub(crate) async fn open(dir: PathBuf, config: CatalogConfig) -> Result<Self, CatalogError> {
607 info!("opening catalog at {:?}", dir);
608 let (journal, journal_handle) =
609 Journal::<CatalogState, I>::new(dir, config.journal.clone()).await?;
610 let data = journal.data().clone();
611
612 info!("finished opening catalog");
613
614 Ok(Self {
615 data,
616 journal,
617 journal_handle,
618 _marker: PhantomData,
619 })
620 }
621
622 #[instrument(skip_all, level = "info")]
628 pub(crate) async fn create_database(
629 &mut self,
630 schema: DatabaseSchema,
631 ) -> Result<DatabaseId, CatalogError> {
632 let (id, edit) = self.create_database_edit(schema)?;
633 debug!("perstisting database schema to journal");
634 self.journal.append(edit.clone()).await?;
635 self.data.apply(edit);
636 Ok(id)
637 }
638
639 #[instrument(skip_all, level = "info")]
648 pub(crate) async fn create_table(
649 &mut self,
650 schema: TableSchema,
651 ) -> Result<TableId, CatalogError> {
652 let (id, edit) = self.create_table_edit(schema)?;
653 debug!("perstisting table schema to journal");
654 self.journal.append(edit.clone()).await?;
655 self.data.apply(edit);
656 Ok(id)
657 }
658
659 #[instrument(skip_all, level = "info")]
660 pub(crate) async fn create_type(
661 &mut self,
662 schema: StructSchema,
663 ) -> Result<TypeId, CatalogError> {
664 let (id, edit) = self.create_type_edit(schema)?;
665 debug!("persisting type schema to journal");
666 self.journal.append(edit.clone()).await?;
667 self.data.apply(edit);
668 Ok(id)
669 }
670
671 #[instrument(skip_all, level = "info")]
672 pub(crate) async fn create_enum(
673 &mut self,
674 schema: EnumSchema,
675 ) -> Result<TypeId, CatalogError> {
676 let (id, edit) = self.create_enum_edit(schema)?;
677 debug!("persisting enum schema to journal");
678 self.journal.append(edit.clone()).await?;
679 self.data.apply(edit);
680 Ok(id)
681 }
682
683 pub(crate) async fn shutdown(self) -> Result<(), CatalogError> {
684 drop(self.journal);
685 let _ = self.journal_handle.await;
686 Ok(())
687 }
688}
689
690impl<I: Io> Deref for Catalog<I> {
693 type Target = CatalogState;
694
695 fn deref(&self) -> &Self::Target {
696 &self.data
697 }
698}
699
#[cfg(test)]
pub(crate) mod testing {
    use std::collections::BTreeMap;

    use crate::{
        catalog::schema::{FieldDef, FieldId, TypeExpr},
        types::TempestType,
    };

    use super::*;

    /// Builds a catalog state containing database `main`, struct type `User`
    /// (`id: Int64`, `name: String`) and table `users` keyed on `id`.
    ///
    /// Edits are produced with the `create_*_edit` helpers and applied
    /// directly, so no journal is involved.
    pub(crate) fn create_catalog_state_for_testing() -> CatalogState {
        let mut state = CatalogState::initial();

        let (db_id, create_db) = state
            .create_database_edit(DatabaseSchema::new("main".into()))
            .unwrap();
        state.apply(create_db);

        let user_fields = BTreeMap::from([
            (
                FieldId(0),
                FieldDef { name: "id".into(), ty: TypeExpr::Primitive(TempestType::Int64) },
            ),
            (
                FieldId(1),
                FieldDef { name: "name".into(), ty: TypeExpr::Primitive(TempestType::String) },
            ),
        ]);
        let user_type = StructSchema {
            database_id: Some(db_id),
            name: "User".into(),
            generic_params: Vec::new(),
            fields: user_fields,
        };
        let (user_type_id, create_type) = state.create_type_edit(user_type).unwrap();
        state.apply(create_type);

        let users_table = TableSchema {
            database_id: db_id,
            name: "users".into(),
            type_id: user_type_id,
            generic_args: Vec::new(),
            primary_key: vec![vec![FieldId(0)]],
        };
        let (_, create_table) = state.create_table_edit(users_table).unwrap();
        state.apply(create_table);

        state
    }
}