reddb_server/application/
ports_impls_schema.rs1use super::*;
2use crate::application::schema::{CreateTablePartitionKind, CreateTablePartitionSpec};
3use crate::storage::query::ast::{PartitionKind, PartitionSpec};
4use crate::storage::query::{
5 CreateColumnDef, CreateTableQuery, CreateTimeSeriesQuery, DropTableQuery, DropTimeSeriesQuery,
6};
7use crate::storage::schema::SqlTypeName;
8
/// Builds the synthetic query text `api.<label>(<name>)`.
///
/// The schema port methods in this file pass the result as the `raw_query`
/// argument of the runtime's `execute_*` calls.
fn api_query(label: &str, name: &str) -> String {
    ["api.", label, "(", name, ")"].concat()
}
12
13fn to_create_column_def(
14 column: crate::application::schema::CreateTableColumnInput,
15) -> CreateColumnDef {
16 let sql_type = SqlTypeName::parse_declared(&column.data_type);
17 CreateColumnDef {
18 name: column.name,
19 data_type: column.data_type,
20 sql_type,
21 not_null: column.not_null,
22 default: column.default,
23 compress: column.compress,
24 unique: column.unique,
25 primary_key: column.primary_key,
26 enum_variants: column.enum_variants,
27 array_element: column.array_element,
28 decimal_precision: column.decimal_precision,
29 }
30}
31
32fn to_partition_kind(kind: CreateTablePartitionKind) -> PartitionKind {
33 match kind {
34 CreateTablePartitionKind::Range => PartitionKind::Range,
35 CreateTablePartitionKind::List => PartitionKind::List,
36 CreateTablePartitionKind::Hash => PartitionKind::Hash,
37 }
38}
39
40fn to_partition_spec(spec: CreateTablePartitionSpec) -> PartitionSpec {
41 PartitionSpec {
42 kind: to_partition_kind(spec.kind),
43 column: spec.column,
44 }
45}
46
47impl RuntimeSchemaPort for RedDBRuntime {
48 fn create_table(&self, input: CreateTableInput) -> RedDBResult<RuntimeQueryResult> {
49 let CreateTableInput {
50 name,
51 columns,
52 if_not_exists,
53 default_ttl_ms,
54 context_index_fields,
55 timestamps,
56 partition_by,
57 tenant_by,
58 append_only,
59 } = input;
60 let raw_query = api_query("create_table", &name);
61 let query = CreateTableQuery {
62 collection_model: crate::catalog::CollectionModel::Table,
63 name,
64 columns: columns.into_iter().map(to_create_column_def).collect(),
65 if_not_exists,
66 default_ttl_ms,
67 metrics_rollup_policies: Vec::new(),
68 context_index_fields: context_index_fields.clone(),
69 context_index_enabled: !context_index_fields.is_empty(),
70 timestamps,
71 partition_by: partition_by.map(to_partition_spec),
72 tenant_by,
73 append_only,
74 subscriptions: Vec::new(),
75 analytics_config: Vec::new(),
76 vault_own_master_key: false,
77 };
78 RedDBRuntime::execute_create_table(self, &raw_query, &query)
79 }
80
81 fn drop_table(&self, input: DropTableInput) -> RedDBResult<RuntimeQueryResult> {
82 let raw_query = api_query("drop_table", &input.name);
83 let query = DropTableQuery {
84 name: input.name,
85 if_exists: input.if_exists,
86 };
87 RedDBRuntime::execute_drop_table(self, &raw_query, &query)
88 }
89
90 fn create_timeseries(&self, input: CreateTimeSeriesInput) -> RedDBResult<RuntimeQueryResult> {
91 let CreateTimeSeriesInput {
92 name,
93 retention_ms,
94 chunk_size,
95 downsample_policies,
96 if_not_exists,
97 } = input;
98 let raw_query = api_query("create_timeseries", &name);
99 let query = CreateTimeSeriesQuery {
100 name,
101 retention_ms,
102 chunk_size,
103 downsample_policies,
104 if_not_exists,
105 hypertable: None,
106 session_key: None,
107 session_gap_ms: None,
108 };
109 RedDBRuntime::execute_create_timeseries(self, &raw_query, &query)
110 }
111
112 fn drop_timeseries(&self, input: DropTimeSeriesInput) -> RedDBResult<RuntimeQueryResult> {
113 let raw_query = api_query("drop_timeseries", &input.name);
114 let query = DropTimeSeriesQuery {
115 name: input.name,
116 if_exists: input.if_exists,
117 };
118 RedDBRuntime::execute_drop_timeseries(self, &raw_query, &query)
119 }
120}