reddb_server/application/
ports_impls_schema.rs1use super::*;
2use crate::application::schema::{CreateTablePartitionKind, CreateTablePartitionSpec};
3use crate::storage::query::ast::{PartitionKind, PartitionSpec};
4use crate::storage::query::{
5 CreateColumnDef, CreateTableQuery, CreateTimeSeriesQuery, DropTableQuery, DropTimeSeriesQuery,
6};
7use crate::storage::schema::SqlTypeName;
8
9fn api_query(label: &str, name: &str) -> String {
10 format!("api.{label}({name})")
11}
12
13fn to_create_column_def(
14 column: crate::application::schema::CreateTableColumnInput,
15) -> CreateColumnDef {
16 let sql_type = SqlTypeName::parse_declared(&column.data_type);
17 CreateColumnDef {
18 name: column.name,
19 data_type: column.data_type,
20 sql_type,
21 not_null: column.not_null,
22 default: column.default,
23 compress: column.compress,
24 unique: column.unique,
25 primary_key: column.primary_key,
26 enum_variants: column.enum_variants,
27 array_element: column.array_element,
28 decimal_precision: column.decimal_precision,
29 }
30}
31
32fn to_partition_kind(kind: CreateTablePartitionKind) -> PartitionKind {
33 match kind {
34 CreateTablePartitionKind::Range => PartitionKind::Range,
35 CreateTablePartitionKind::List => PartitionKind::List,
36 CreateTablePartitionKind::Hash => PartitionKind::Hash,
37 }
38}
39
40fn to_partition_spec(spec: CreateTablePartitionSpec) -> PartitionSpec {
41 PartitionSpec {
42 kind: to_partition_kind(spec.kind),
43 column: spec.column,
44 }
45}
46
47impl RuntimeSchemaPort for RedDBRuntime {
48 fn create_table(&self, input: CreateTableInput) -> RedDBResult<RuntimeQueryResult> {
49 let CreateTableInput {
50 name,
51 columns,
52 if_not_exists,
53 default_ttl_ms,
54 context_index_fields,
55 timestamps,
56 partition_by,
57 tenant_by,
58 append_only,
59 } = input;
60 let raw_query = api_query("create_table", &name);
61 let query = CreateTableQuery {
62 collection_model: crate::catalog::CollectionModel::Table,
63 name,
64 columns: columns.into_iter().map(to_create_column_def).collect(),
65 if_not_exists,
66 default_ttl_ms,
67 metrics_rollup_policies: Vec::new(),
68 context_index_fields: context_index_fields.clone(),
69 context_index_enabled: !context_index_fields.is_empty(),
70 timestamps,
71 partition_by: partition_by.map(to_partition_spec),
72 tenant_by,
73 append_only,
74 subscriptions: Vec::new(),
75 vault_own_master_key: false,
76 };
77 RedDBRuntime::execute_create_table(self, &raw_query, &query)
78 }
79
80 fn drop_table(&self, input: DropTableInput) -> RedDBResult<RuntimeQueryResult> {
81 let raw_query = api_query("drop_table", &input.name);
82 let query = DropTableQuery {
83 name: input.name,
84 if_exists: input.if_exists,
85 };
86 RedDBRuntime::execute_drop_table(self, &raw_query, &query)
87 }
88
89 fn create_timeseries(&self, input: CreateTimeSeriesInput) -> RedDBResult<RuntimeQueryResult> {
90 let CreateTimeSeriesInput {
91 name,
92 retention_ms,
93 chunk_size,
94 downsample_policies,
95 if_not_exists,
96 } = input;
97 let raw_query = api_query("create_timeseries", &name);
98 let query = CreateTimeSeriesQuery {
99 name,
100 retention_ms,
101 chunk_size,
102 downsample_policies,
103 if_not_exists,
104 hypertable: None,
105 };
106 RedDBRuntime::execute_create_timeseries(self, &raw_query, &query)
107 }
108
109 fn drop_timeseries(&self, input: DropTimeSeriesInput) -> RedDBResult<RuntimeQueryResult> {
110 let raw_query = api_query("drop_timeseries", &input.name);
111 let query = DropTimeSeriesQuery {
112 name: input.name,
113 if_exists: input.if_exists,
114 };
115 RedDBRuntime::execute_drop_timeseries(self, &raw_query, &query)
116 }
117}