Skip to main content

reddb_server/application/
ports_impls_schema.rs

1use super::*;
2use crate::application::schema::{CreateTablePartitionKind, CreateTablePartitionSpec};
3use crate::storage::query::ast::{PartitionKind, PartitionSpec};
4use crate::storage::query::{
5    CreateColumnDef, CreateTableQuery, CreateTimeSeriesQuery, DropTableQuery, DropTimeSeriesQuery,
6};
7use crate::storage::schema::SqlTypeName;
8
9fn api_query(label: &str, name: &str) -> String {
10    format!("api.{label}({name})")
11}
12
13fn to_create_column_def(
14    column: crate::application::schema::CreateTableColumnInput,
15) -> CreateColumnDef {
16    let sql_type = SqlTypeName::parse_declared(&column.data_type);
17    CreateColumnDef {
18        name: column.name,
19        data_type: column.data_type,
20        sql_type,
21        not_null: column.not_null,
22        default: column.default,
23        compress: column.compress,
24        unique: column.unique,
25        primary_key: column.primary_key,
26        enum_variants: column.enum_variants,
27        array_element: column.array_element,
28        decimal_precision: column.decimal_precision,
29    }
30}
31
32fn to_partition_kind(kind: CreateTablePartitionKind) -> PartitionKind {
33    match kind {
34        CreateTablePartitionKind::Range => PartitionKind::Range,
35        CreateTablePartitionKind::List => PartitionKind::List,
36        CreateTablePartitionKind::Hash => PartitionKind::Hash,
37    }
38}
39
40fn to_partition_spec(spec: CreateTablePartitionSpec) -> PartitionSpec {
41    PartitionSpec {
42        kind: to_partition_kind(spec.kind),
43        column: spec.column,
44    }
45}
46
47impl RuntimeSchemaPort for RedDBRuntime {
48    fn create_table(&self, input: CreateTableInput) -> RedDBResult<RuntimeQueryResult> {
49        let CreateTableInput {
50            name,
51            columns,
52            if_not_exists,
53            default_ttl_ms,
54            context_index_fields,
55            timestamps,
56            partition_by,
57            tenant_by,
58            append_only,
59        } = input;
60        let raw_query = api_query("create_table", &name);
61        let query = CreateTableQuery {
62            collection_model: crate::catalog::CollectionModel::Table,
63            name,
64            columns: columns.into_iter().map(to_create_column_def).collect(),
65            if_not_exists,
66            default_ttl_ms,
67            metrics_rollup_policies: Vec::new(),
68            context_index_fields: context_index_fields.clone(),
69            context_index_enabled: !context_index_fields.is_empty(),
70            timestamps,
71            partition_by: partition_by.map(to_partition_spec),
72            tenant_by,
73            append_only,
74            subscriptions: Vec::new(),
75            vault_own_master_key: false,
76        };
77        RedDBRuntime::execute_create_table(self, &raw_query, &query)
78    }
79
80    fn drop_table(&self, input: DropTableInput) -> RedDBResult<RuntimeQueryResult> {
81        let raw_query = api_query("drop_table", &input.name);
82        let query = DropTableQuery {
83            name: input.name,
84            if_exists: input.if_exists,
85        };
86        RedDBRuntime::execute_drop_table(self, &raw_query, &query)
87    }
88
89    fn create_timeseries(&self, input: CreateTimeSeriesInput) -> RedDBResult<RuntimeQueryResult> {
90        let CreateTimeSeriesInput {
91            name,
92            retention_ms,
93            chunk_size,
94            downsample_policies,
95            if_not_exists,
96        } = input;
97        let raw_query = api_query("create_timeseries", &name);
98        let query = CreateTimeSeriesQuery {
99            name,
100            retention_ms,
101            chunk_size,
102            downsample_policies,
103            if_not_exists,
104            hypertable: None,
105        };
106        RedDBRuntime::execute_create_timeseries(self, &raw_query, &query)
107    }
108
109    fn drop_timeseries(&self, input: DropTimeSeriesInput) -> RedDBResult<RuntimeQueryResult> {
110        let raw_query = api_query("drop_timeseries", &input.name);
111        let query = DropTimeSeriesQuery {
112            name: input.name,
113            if_exists: input.if_exists,
114        };
115        RedDBRuntime::execute_drop_timeseries(self, &raw_query, &query)
116    }
117}