Skip to main content

tempest_engine/catalog/
mod.rs

1use std::{collections::{BTreeMap, HashMap}, marker::PhantomData, ops::Deref, path::PathBuf};
2
3use derive_more::{Display, Error, From};
4use serde::{Deserialize, Serialize};
5use tempest_core::{
6    journal::{Journal, JournalError, JournalHandle, Replayable},
7    tempest_str::TempestStr,
8};
9use tempest_io::Io;
10use tempest_rt::JoinHandle;
11
12use crate::{
13    catalog::schema::{
14        DatabaseId, DatabaseSchema, EnumSchema, EnumVariantDef, FieldDef, FieldId, FlatField,
15        StructSchema, TableId, TableSchema, TypeExpr, TypeId, TypeSchema, VariantId,
16    },
17    config::CatalogConfig,
18    row::resolved::ResolvedTable,
19};
20
21/// Resolves each entry in `ref_args` against the enclosing `generic_args`,
22/// producing concrete TypeExprs that can be stored on a FlatField for later decoding.
23fn resolve_type_args(ref_args: &[TypeExpr], generic_args: &[TypeExpr]) -> Vec<TypeExpr> {
24    ref_args
25        .iter()
26        .map(|arg| match arg {
27            TypeExpr::GenericParam(i) => generic_args
28                .get(*i as usize)
29                .cloned()
30                .unwrap_or_else(|| arg.clone()),
31            other => other.clone(),
32        })
33        .collect()
34}
35
36#[instrument(skip_all, level = "trace")]
37pub(crate) fn flatten_schema(
38    fields: &BTreeMap<FieldId, FieldDef>,
39    generic_args: &[TypeExpr],
40    catalog: &CatalogState,
41    prefix: &str,
42) -> Result<Vec<FlatField>, CatalogError> {
43    let mut result = Vec::new();
44    for (_, def) in fields {
45        let field_name: TempestStr<'static> = if prefix.is_empty() {
46            def.name.clone()
47        } else {
48            TempestStr::from_owned(format!("{}{}", prefix, def.name))
49                .expect("dotted field name does not contain null bytes")
50        };
51        match &def.ty {
52            TypeExpr::Primitive(ty) => {
53                trace!(name = %field_name, ?ty, "flat primitive field");
54                result.push(FlatField { name: field_name, ty: *ty, type_args: vec![] });
55            }
56            TypeExpr::Ref(type_id, ref_args) => {
57                let type_schema = catalog
58                    .get_type(*type_id)
59                    .ok_or(CatalogError::TypeNotFound(*type_id))?;
60                match type_schema {
61                    TypeSchema::Struct(struct_schema) => {
62                        let sub_prefix = format!("{}{}.", prefix, def.name);
63                        trace!(name = %def.name, "recursing into Ref field");
64                        let sub = flatten_schema(&struct_schema.fields, ref_args, catalog, &sub_prefix)?;
65                        result.extend(sub);
66                    }
67                    TypeSchema::Enum(_) => {
68                        trace!(name = %field_name, "enum leaf field");
69                        let resolved_args = resolve_type_args(ref_args, generic_args);
70                        result.push(FlatField {
71                            name: field_name,
72                            ty: crate::types::TempestType::Enum(**type_id),
73                            type_args: resolved_args,
74                        });
75                    }
76                }
77            }
78            TypeExpr::GenericParam(i) => match generic_args.get(*i as usize) {
79                Some(TypeExpr::Primitive(ty)) => {
80                    trace!(name = %field_name, ?ty, "flat generic-param primitive field");
81                    result.push(FlatField { name: field_name, ty: *ty, type_args: vec![] });
82                }
83                Some(TypeExpr::Ref(type_id, ref_args)) => {
84                    let type_schema = catalog
85                        .get_type(*type_id)
86                        .ok_or(CatalogError::TypeNotFound(*type_id))?;
87                    match type_schema {
88                        TypeSchema::Struct(struct_schema) => {
89                            let sub_prefix = format!("{}{}.", prefix, def.name);
90                            trace!(name = %def.name, "recursing into generic-param Ref field");
91                            let sub = flatten_schema(&struct_schema.fields, ref_args, catalog, &sub_prefix)?;
92                            result.extend(sub);
93                        }
94                        TypeSchema::Enum(_) => {
95                            trace!(name = %field_name, "generic-param enum leaf field");
96                            let resolved_args = resolve_type_args(ref_args, generic_args);
97                            result.push(FlatField {
98                                name: field_name,
99                                ty: crate::types::TempestType::Enum(**type_id),
100                                type_args: resolved_args,
101                            });
102                        }
103                    }
104                }
105                Some(TypeExpr::GenericParam(_)) => unreachable!("type args must be concrete"),
106                None => unreachable!("generic param index out of range - catalog is corrupt"),
107            },
108        }
109    }
110    Ok(result)
111}
112
113/// Resolves a root-to-leaf `FieldId` path to a flat field index.
114/// Traverses the type hierarchy following each FieldId, builds the dot-separated
115/// flat field name, then looks it up in `flat_fields`.
116pub(crate) fn pk_path_to_flat_idx(
117    path: &[FieldId],
118    fields: &std::collections::BTreeMap<FieldId, FieldDef>,
119    generic_args: &[TypeExpr],
120    catalog: &CatalogState,
121    flat_fields: &[FlatField],
122) -> Option<usize> {
123    let mut name = String::new();
124    let mut current_fields = fields;
125    let mut current_args: std::borrow::Cow<[TypeExpr]> = std::borrow::Cow::Borrowed(generic_args);
126
127    for (i, fid) in path.iter().enumerate() {
128        let def = current_fields.get(fid)?;
129        if name.is_empty() {
130            name.push_str(&def.name);
131        } else {
132            name.push('.');
133            name.push_str(&def.name);
134        }
135        if i < path.len() - 1 {
136            let resolved = match &def.ty {
137                TypeExpr::GenericParam(idx) => current_args.get(*idx as usize)?.clone(),
138                other => other.clone(),
139            };
140            match resolved {
141                TypeExpr::Ref(type_id, ref_args) => {
142                    let type_schema = catalog.get_type(type_id)?;
143                    let struct_schema = type_schema.as_struct()?;
144                    current_fields = &struct_schema.fields;
145                    current_args = std::borrow::Cow::Owned(ref_args);
146                }
147                _ => return None,
148            }
149        }
150    }
151    flat_fields.iter().position(|ff| ff.name.as_ref() == name.as_str())
152}
153
154pub mod schema;
155
156#[cfg(test)]
157mod tests;
158
159/// The set of mutations that can be applied to the catalog in format version 1.
160///
161/// Each variant represents a single atomic, semantic change to the catalog state.
162/// Implementation details like ID allocation are never recorded as edits.
163/// IDs are derived from the edits themselves during replay.
164#[derive(Debug, Clone, Serialize, Deserialize)]
165pub enum CatalogEditV1 {
166    /// Registers a new database with its assigned [`DatabaseId`].
167    CreateDatabase((DatabaseId, DatabaseSchema)),
168    /// Registers a new table with its assigned [`TableId`].
169    CreateTable((TableId, TableSchema)),
170    /// Registers a new struct type with its assigned [`TypeId`].
171    CreateType((TypeId, StructSchema)),
172    /// Registers a new enum type with its assigned [`TypeId`].
173    CreateEnum((TypeId, EnumSchema)),
174    /// A point-in-time snapshot of the full catalog state, written on journal
175    /// rotation. Contains only live entries - dropped tables and databases are
176    /// omitted, collapsing any create/drop history into the current state.
177    Snapshot(Vec<CatalogEdit>),
178}
179
180/// A versioned, append-only log of every mutation to the [`Catalog`].
181///
182/// Wrapping edits in a version enum allows the on-disk format to evolve
183/// without breaking existing journals - old `V1` edits remain valid even
184/// as new variants are added.
185#[repr(u16)]
186#[derive(derive_more::Debug, Clone, Serialize, Deserialize)]
187pub enum CatalogEdit {
188    #[debug("{:?}", _0)]
189    V1(CatalogEditV1) = 1,
190}
191
192#[derive(Debug, Display, Error, From)]
193pub enum CatalogError {
194    #[display("journal error: {}", _0)]
195    JournalError(JournalError),
196
197    #[from(skip)]
198    #[display("database with ID {} was not found", _0)]
199    DatabaseNotFound(#[error(not(source))] DatabaseId),
200    #[from(skip)]
201    #[display("database with name '{}' already exists", _0)]
202    DatabaseAlreadyExists(#[error(not(source))] TempestStr<'static>),
203
204    #[from(skip)]
205    #[display("table with ID {} was not found", _0)]
206    TableNotFound(#[error(not(source))] TableId),
207    #[from(skip)]
208    #[display("table with name '{}' already exists inside of this scope", _0)]
209    TableAlreadyExists(#[error(not(source))] TempestStr<'static>),
210
211    #[from(skip)]
212    #[display("type with ID {} was not found", _0)]
213    TypeNotFound(#[error(not(source))] TypeId),
214    #[from(skip)]
215    #[display("type with name '{}' already exists inside of this scope", _0)]
216    TypeAlreadyExists(#[error(not(source))] TempestStr<'static>),
217}
218
219#[derive(Debug, Clone)]
220pub struct CatalogState {
221    /// Monotonically increasing generator for the table IDs.
222    /// Incremented automatically inside of [`Self::apply()].
223    next_table_id: TableId,
224    /// Contains the definitions of all tables, accessible through their unique, stable ID.
225    pub tables: HashMap<TableId, TableSchema>,
226
227    /// Monotonically increasing generator for the database IDs.
228    /// Incremented automatically inside of [`Self::apply()].
229    next_database_id: DatabaseId,
230    /// Contains the definitions of all databases, accessible through their unique, stable ID.
231    pub databases: HashMap<DatabaseId, DatabaseSchema>,
232
233    /// Monotonically increasing generator for the type IDs.
234    /// Incremented automatically inside of [`Self::apply()].
235    next_type_id: TypeId,
236    /// Contains the definitions of all user-created types, accessible through their unique, stable ID.
237    pub types: HashMap<TypeId, TypeSchema>,
238
239    /// Global built-in types (e.g. `Option[T]`). Populated at startup, never journaled.
240    /// TypeIds use reserved values descending from `u32::MAX`.
241    pub global_types: HashMap<TypeId, TypeSchema>,
242}
243
244impl Default for CatalogState {
245    fn default() -> Self {
246        let mut global_types = HashMap::new();
247        let option_schema = TypeSchema::Enum(EnumSchema {
248            database_id: None,
249            name: "Option".into(),
250            generic_params: vec!["T".into()],
251            variants: {
252                let mut v = BTreeMap::new();
253                v.insert(VariantId(0), EnumVariantDef { name: "None".into(), fields: vec![] });
254                v.insert(VariantId(1), EnumVariantDef { name: "Some".into(), fields: vec![TypeExpr::GenericParam(0)] });
255                v
256            },
257        });
258        global_types.insert(TypeId(u32::MAX), option_schema);
259
260        Self {
261            next_table_id: TableId::default(),
262            tables: HashMap::new(),
263            next_database_id: DatabaseId::default(),
264            databases: HashMap::new(),
265            next_type_id: TypeId::default(),
266            types: HashMap::new(),
267            global_types,
268        }
269    }
270}
271
272impl CatalogState {
273    fn create_database_edit(
274        &self,
275        schema: DatabaseSchema,
276    ) -> Result<(DatabaseId, CatalogEdit), CatalogError> {
277        if self.databases.values().any(|db| db.name == schema.name) {
278            return Err(CatalogError::DatabaseAlreadyExists(schema.name));
279        }
280
281        let id = self.next_database_id;
282        trace!(?id, "assigned id to create database edit");
283
284        Ok((
285            id,
286            CatalogEdit::V1(CatalogEditV1::CreateDatabase((id, schema))),
287        ))
288    }
289
290    fn create_table_edit(
291        &self,
292        schema: TableSchema,
293    ) -> Result<(TableId, CatalogEdit), CatalogError> {
294        let db = self
295            .databases
296            .get(&schema.database_id)
297            .ok_or(CatalogError::DatabaseNotFound(schema.database_id))?;
298
299        let id = self.next_table_id;
300        trace!(?id, "assigned id to create table edit");
301
302        if db.tables.iter().any(|id| {
303            self.tables[id].database_id == schema.database_id && self.tables[id].name == schema.name
304        }) {
305            return Err(CatalogError::TableAlreadyExists(schema.name));
306        }
307
308        Ok((
309            id,
310            CatalogEdit::V1(CatalogEditV1::CreateTable((id, schema))),
311        ))
312    }
313
314    fn create_type_edit(
315        &self,
316        schema: StructSchema,
317    ) -> Result<(TypeId, CatalogEdit), CatalogError> {
318        if self
319            .types
320            .values()
321            .any(|t| t.database_id() == schema.database_id && t.name() == &schema.name)
322        {
323            return Err(CatalogError::TypeAlreadyExists(schema.name));
324        }
325
326        let id = self.next_type_id;
327        trace!(?id, "assigned id to create type edit");
328
329        Ok((id, CatalogEdit::V1(CatalogEditV1::CreateType((id, schema)))))
330    }
331
332    fn create_enum_edit(
333        &self,
334        schema: EnumSchema,
335    ) -> Result<(TypeId, CatalogEdit), CatalogError> {
336        if self
337            .types
338            .values()
339            .any(|t| t.database_id() == schema.database_id && t.name() == &schema.name)
340        {
341            return Err(CatalogError::TypeAlreadyExists(schema.name));
342        }
343
344        let id = self.next_type_id;
345        trace!(?id, "assigned id to create enum edit");
346
347        Ok((id, CatalogEdit::V1(CatalogEditV1::CreateEnum((id, schema)))))
348    }
349
350    pub(crate) fn get_database_by_name(
351        &self,
352        name: &TempestStr,
353    ) -> Option<(DatabaseId, &DatabaseSchema)> {
354        for (&id, schema) in &self.databases {
355            if schema.name == *name {
356                return Some((id, schema));
357            }
358        }
359        None
360    }
361
362    pub(crate) fn get_table_by_name(
363        &self,
364        database_id: DatabaseId,
365        name: &TempestStr,
366    ) -> Option<(TableId, &TableSchema)> {
367        for (&id, schema) in &self.tables {
368            if schema.database_id == database_id && schema.name == *name {
369                return Some((id, schema));
370            }
371        }
372        None
373    }
374
375    pub(crate) fn get_type_by_name(
376        &self,
377        database_id: DatabaseId,
378        name: &TempestStr,
379    ) -> Option<(TypeId, &TypeSchema)> {
380        for (&id, schema) in &self.types {
381            if schema.database_id() == Some(database_id) && schema.name() == name {
382                return Some((id, schema));
383            }
384        }
385        None
386    }
387
388    /// Looks up a type by ID, checking user types then global types.
389    pub fn get_type(&self, id: TypeId) -> Option<&TypeSchema> {
390        self.types.get(&id).or_else(|| self.global_types.get(&id))
391    }
392
393    /// Finds a global built-in type by name.
394    pub(crate) fn get_global_type_by_name(&self, name: &TempestStr) -> Option<(TypeId, &TypeSchema)> {
395        self.global_types.iter().find(|(_, s)| s.name() == name).map(|(&id, s)| (id, s))
396    }
397
398    /// Returns the dot-separated flat field name for a primary key path.
399    /// e.g. `[FieldId(1), FieldId(0)]` → `"address.city"`.
400    pub fn pk_path_name(
401        &self,
402        path: &[FieldId],
403        table_schema: &TableSchema,
404    ) -> String {
405        let struct_schema = self.get_type(table_schema.type_id)
406            .expect("type not found in catalog")
407            .as_struct()
408            .expect("table type must be a struct");
409        let mut name = String::new();
410        let mut current_fields = &struct_schema.fields;
411        let mut current_args: std::borrow::Cow<[TypeExpr]> =
412            std::borrow::Cow::Borrowed(&table_schema.generic_args);
413
414        for (i, fid) in path.iter().enumerate() {
415            let def = &current_fields[fid];
416            if name.is_empty() {
417                name.push_str(&def.name);
418            } else {
419                name.push('.');
420                name.push_str(&def.name);
421            }
422            if i < path.len() - 1 {
423                let resolved = match &def.ty {
424                    TypeExpr::GenericParam(idx) => current_args[*idx as usize].clone(),
425                    other => other.clone(),
426                };
427                if let TypeExpr::Ref(type_id, ref_args) = resolved {
428                    if let Some(ts) = self.get_type(type_id) {
429                        if let Some(s) = ts.as_struct() {
430                            current_fields = &s.fields;
431                            current_args = std::borrow::Cow::Owned(ref_args);
432                        }
433                    }
434                }
435            }
436        }
437        name
438    }
439
440    pub fn tables_in_database(
441        &self,
442        database: &str,
443    ) -> impl Iterator<Item = (TableId, &TableSchema)> {
444        self.databases
445            .iter()
446            .filter(|(_, db)| Some(&db.name) == TempestStr::from_borrowed(database).ok().as_ref())
447            .map(|(_, db)| db.tables.iter().map(|t| (*t, &self.tables[t])))
448            .flatten()
449    }
450
451    pub fn types_in_database(
452        &self,
453        database: &str,
454    ) -> impl Iterator<Item = (TypeId, &TypeSchema)> {
455        self.databases
456            .iter()
457            .filter(|(_, db)| Some(&db.name) == TempestStr::from_borrowed(database).ok().as_ref())
458            .map(|(_, db)| db.types.iter().map(|t| (*t, &self.types[t])))
459            .flatten()
460    }
461
462    /// # Panics
463    ///
464    /// Panics if there is no table that matches `table_id`.
465    pub(crate) fn resolved_table(&self, table_id: TableId) -> ResolvedTable<'_> {
466        let table_schema = self
467            .tables
468            .get(&table_id)
469            .expect("table not found in catalog");
470        let struct_schema = self
471            .get_type(table_schema.type_id)
472            .expect("type not found in catalog")
473            .as_struct()
474            .expect("table type must be a struct");
475        let flat_fields = flatten_schema(
476            &struct_schema.fields,
477            &table_schema.generic_args,
478            self,
479            "",
480        )
481        .expect("flat schema build failed - catalog is inconsistent");
482        let primary_key = table_schema.primary_key.iter()
483            .map(|path| pk_path_to_flat_idx(path, &struct_schema.fields, &table_schema.generic_args, self, &flat_fields)
484                .expect("pk path not found in flat fields - catalog is inconsistent"))
485            .collect();
486        ResolvedTable {
487            id: table_id,
488            fields: &struct_schema.fields,
489            generic_args: &table_schema.generic_args,
490            primary_key,
491            flat_fields,
492        }
493    }
494}
495
496impl Replayable for CatalogState {
497    type Edit = CatalogEdit;
498
499    #[instrument(skip_all, level = "debug")]
500    fn apply(&mut self, edit: Self::Edit) {
501        match edit {
502            CatalogEdit::V1(v1) => match v1 {
503                CatalogEditV1::CreateDatabase((id, schema)) => {
504                    debug!(?id, ?schema, "applying create database edit");
505                    assert!(!self.databases.contains_key(&id));
506                    self.next_database_id = DatabaseId(*id + 1).max(self.next_database_id);
507                    self.databases.insert(id, schema);
508                }
509                CatalogEditV1::CreateTable((id, schema)) => {
510                    debug!(?id, ?schema, "applying create table edit");
511                    assert!(!self.tables.contains_key(&id));
512                    self.next_table_id = TableId(*id + 1).max(self.next_table_id);
513                    // add the id to the database's table set
514                    self.databases
515                        .get_mut(&schema.database_id)
516                        .expect("database must exist when applying CreateTable")
517                        .tables
518                        .insert(id);
519                    self.tables.insert(id, schema);
520                }
521                CatalogEditV1::CreateType((id, schema)) => {
522                    debug!(?id, ?schema, "applying create type edit");
523                    assert!(!self.types.contains_key(&id));
524                    self.next_type_id = TypeId(*id + 1).max(self.next_type_id);
525                    if let Some(db_id) = schema.database_id {
526                        self.databases
527                            .get_mut(&db_id)
528                            .expect("database must exist when applying CreateType")
529                            .types
530                            .insert(id);
531                    }
532                    self.types.insert(id, TypeSchema::Struct(schema));
533                }
534                CatalogEditV1::CreateEnum((id, schema)) => {
535                    debug!(?id, ?schema, "applying create enum edit");
536                    assert!(!self.types.contains_key(&id));
537                    self.next_type_id = TypeId(*id + 1).max(self.next_type_id);
538                    if let Some(db_id) = schema.database_id {
539                        self.databases
540                            .get_mut(&db_id)
541                            .expect("database must exist when applying CreateEnum")
542                            .types
543                            .insert(id);
544                    }
545                    self.types.insert(id, TypeSchema::Enum(schema));
546                }
547                CatalogEditV1::Snapshot(edits) => {
548                    debug!(count = edits.len(), "applying snapshot edits");
549                    for e in edits {
550                        self.apply(e);
551                    }
552                }
553            },
554        }
555    }
556
557    fn snapshot(&self) -> Self::Edit {
558        let mut edits = Vec::new();
559
560        edits.extend(self.databases.iter().map(|(id, schema)| {
561            CatalogEdit::V1(CatalogEditV1::CreateDatabase((id.clone(), schema.clone())))
562        }));
563
564        edits.extend(self.types.iter().map(|(id, schema)| match schema {
565            TypeSchema::Struct(s) => CatalogEdit::V1(CatalogEditV1::CreateType((*id, s.clone()))),
566            TypeSchema::Enum(e) => CatalogEdit::V1(CatalogEditV1::CreateEnum((*id, e.clone()))),
567        }));
568
569        edits.extend(self.tables.iter().map(|(id, schema)| {
570            CatalogEdit::V1(CatalogEditV1::CreateTable((id.clone(), schema.clone())))
571        }));
572
573        CatalogEdit::V1(CatalogEditV1::Snapshot(edits))
574    }
575
576    fn filename_prefix() -> &'static str {
577        "catalog"
578    }
579
580    fn initial() -> Self {
581        CatalogState::default()
582    }
583}
584
585/// # Catalog
586///
587/// The catalog is the authoritative registry of all databases and tables in a
588/// Tempest instance. It maps stable numeric [`DatabaseId`]s and [`TableId`]s to
589/// their definitions, and persists every mutation to a [`Journal`] for recovery
590/// across restarts.
591///
592/// All mutations are validated before being written - nothing reaches the journal
593/// that would not survive replay. Reads are served directly from the in-memory
594/// [`CatalogState`] via [`Deref`].
595pub(crate) struct Catalog<I: Io> {
596    data: CatalogState,
597    journal: JournalHandle<CatalogState>,
598    journal_handle: JoinHandle<()>,
599    _marker: PhantomData<I>,
600}
601
602impl<I: Io> Catalog<I> {
603    /// Opens the catalog at `tempest_root/catalog`, replaying any existing
604    /// journal to restore state. Creates the directory if it does not exist.
605    #[instrument(skip_all, level = "info")]
606    pub(crate) async fn open(dir: PathBuf, config: CatalogConfig) -> Result<Self, CatalogError> {
607        info!("opening catalog at {:?}", dir);
608        let (journal, journal_handle) =
609            Journal::<CatalogState, I>::new(dir, config.journal.clone()).await?;
610        let data = journal.data().clone();
611
612        info!("finished opening catalog");
613
614        Ok(Self {
615            data,
616            journal,
617            journal_handle,
618            _marker: PhantomData,
619        })
620    }
621
622    /// Registers a new database, returning its assigned [`DatabaseId`].
623    ///
624    /// # Errors
625    ///
626    /// - [`CatalogError::DatabaseAlreadyExists`]: A database with the same name already exists.
627    #[instrument(skip_all, level = "info")]
628    pub(crate) async fn create_database(
629        &mut self,
630        schema: DatabaseSchema,
631    ) -> Result<DatabaseId, CatalogError> {
632        let (id, edit) = self.create_database_edit(schema)?;
633        debug!("perstisting database schema to journal");
634        self.journal.append(edit.clone()).await?;
635        self.data.apply(edit);
636        Ok(id)
637    }
638
639    /// Registers a new table within an existing database, returning its assigned [`TableId`].
640    ///
641    /// # Errors
642    ///
643    /// - [`CatalogError::DatabaseNotFound`]: The table's `database_id` does not correspond to
644    ///   a known database.
645    /// - [`CatalogError::TableAlreadyExists`]: A table with the same name already exists within
646    ///   that database.
647    #[instrument(skip_all, level = "info")]
648    pub(crate) async fn create_table(
649        &mut self,
650        schema: TableSchema,
651    ) -> Result<TableId, CatalogError> {
652        let (id, edit) = self.create_table_edit(schema)?;
653        debug!("perstisting table schema to journal");
654        self.journal.append(edit.clone()).await?;
655        self.data.apply(edit);
656        Ok(id)
657    }
658
659    #[instrument(skip_all, level = "info")]
660    pub(crate) async fn create_type(
661        &mut self,
662        schema: StructSchema,
663    ) -> Result<TypeId, CatalogError> {
664        let (id, edit) = self.create_type_edit(schema)?;
665        debug!("persisting type schema to journal");
666        self.journal.append(edit.clone()).await?;
667        self.data.apply(edit);
668        Ok(id)
669    }
670
671    #[instrument(skip_all, level = "info")]
672    pub(crate) async fn create_enum(
673        &mut self,
674        schema: EnumSchema,
675    ) -> Result<TypeId, CatalogError> {
676        let (id, edit) = self.create_enum_edit(schema)?;
677        debug!("persisting enum schema to journal");
678        self.journal.append(edit.clone()).await?;
679        self.data.apply(edit);
680        Ok(id)
681    }
682
683    pub(crate) async fn shutdown(self) -> Result<(), CatalogError> {
684        drop(self.journal);
685        let _ = self.journal_handle.await;
686        Ok(())
687    }
688}
689
690// Allow for accessing the current state, like the databases, tables, etc., directly through the
691// `Catalog`, just as if it contained them itself. Makes the external use cleaner.
692impl<I: Io> Deref for Catalog<I> {
693    type Target = CatalogState;
694
695    fn deref(&self) -> &Self::Target {
696        &self.data
697    }
698}
699
/// Shared fixtures for tests elsewhere in the crate.
#[cfg(test)]
pub(crate) mod testing {
    use std::collections::BTreeMap;

    use crate::{
        catalog::schema::{FieldDef, FieldId, TypeExpr},
        types::TempestType,
    };

    use super::*;

    /// Builds a small catalog state without touching a journal: one database
    /// `main`, one struct type `User { id: Int64, name: String }` and one
    /// table `users` of that type whose primary key is the `id` field.
    pub(crate) fn create_catalog_state_for_testing() -> CatalogState {
        let mut state = CatalogState::initial();

        // Database `main`.
        let (db_id, create_db) = state
            .create_database_edit(DatabaseSchema::new("main".into()))
            .unwrap();
        state.apply(create_db);

        // Struct type `User`.
        let user_fields = BTreeMap::from([
            (
                FieldId(0),
                FieldDef {
                    name: "id".into(),
                    ty: TypeExpr::Primitive(TempestType::Int64),
                },
            ),
            (
                FieldId(1),
                FieldDef {
                    name: "name".into(),
                    ty: TypeExpr::Primitive(TempestType::String),
                },
            ),
        ]);
        let user_schema = StructSchema {
            database_id: Some(db_id),
            name: "User".into(),
            generic_params: Vec::new(),
            fields: user_fields,
        };
        let (type_id, create_user) = state.create_type_edit(user_schema).unwrap();
        state.apply(create_user);

        // Table `users`, keyed by the `id` field.
        let users_schema = TableSchema {
            database_id: db_id,
            name: "users".into(),
            type_id,
            generic_args: Vec::new(),
            primary_key: vec![vec![FieldId(0)]],
        };
        let (_, create_users) = state.create_table_edit(users_schema).unwrap();
        state.apply(create_users);

        state
    }
}