Skip to main content

reddb_server/application/
ports_impls_schema.rs

1use super::*;
2use crate::application::schema::{CreateTablePartitionKind, CreateTablePartitionSpec};
3use crate::storage::query::ast::{PartitionKind, PartitionSpec};
4use crate::storage::query::{
5    CreateColumnDef, CreateTableQuery, CreateTimeSeriesQuery, DropTableQuery, DropTimeSeriesQuery,
6};
7use crate::storage::schema::SqlTypeName;
8
9fn api_query(label: &str, name: &str) -> String {
10    format!("api.{label}({name})")
11}
12
13fn to_create_column_def(
14    column: crate::application::schema::CreateTableColumnInput,
15) -> CreateColumnDef {
16    let sql_type = SqlTypeName::parse_declared(&column.data_type);
17    CreateColumnDef {
18        name: column.name,
19        data_type: column.data_type,
20        sql_type,
21        not_null: column.not_null,
22        default: column.default,
23        compress: column.compress,
24        unique: column.unique,
25        primary_key: column.primary_key,
26        enum_variants: column.enum_variants,
27        array_element: column.array_element,
28        decimal_precision: column.decimal_precision,
29    }
30}
31
32fn to_partition_kind(kind: CreateTablePartitionKind) -> PartitionKind {
33    match kind {
34        CreateTablePartitionKind::Range => PartitionKind::Range,
35        CreateTablePartitionKind::List => PartitionKind::List,
36        CreateTablePartitionKind::Hash => PartitionKind::Hash,
37    }
38}
39
40fn to_partition_spec(spec: CreateTablePartitionSpec) -> PartitionSpec {
41    PartitionSpec {
42        kind: to_partition_kind(spec.kind),
43        column: spec.column,
44    }
45}
46
47impl RuntimeSchemaPort for RedDBRuntime {
48    fn create_table(&self, input: CreateTableInput) -> RedDBResult<RuntimeQueryResult> {
49        let CreateTableInput {
50            name,
51            columns,
52            if_not_exists,
53            default_ttl_ms,
54            context_index_fields,
55            timestamps,
56            partition_by,
57            tenant_by,
58            append_only,
59        } = input;
60        let raw_query = api_query("create_table", &name);
61        let query = CreateTableQuery {
62            collection_model: crate::catalog::CollectionModel::Table,
63            name,
64            columns: columns.into_iter().map(to_create_column_def).collect(),
65            if_not_exists,
66            default_ttl_ms,
67            metrics_rollup_policies: Vec::new(),
68            context_index_fields: context_index_fields.clone(),
69            context_index_enabled: !context_index_fields.is_empty(),
70            timestamps,
71            partition_by: partition_by.map(to_partition_spec),
72            tenant_by,
73            append_only,
74            subscriptions: Vec::new(),
75            analytics_config: Vec::new(),
76            vault_own_master_key: false,
77            ai_policy: None,
78        };
79        RedDBRuntime::execute_create_table(self, &raw_query, &query)
80    }
81
82    fn drop_table(&self, input: DropTableInput) -> RedDBResult<RuntimeQueryResult> {
83        let raw_query = api_query("drop_table", &input.name);
84        let query = DropTableQuery {
85            name: input.name,
86            if_exists: input.if_exists,
87        };
88        RedDBRuntime::execute_drop_table(self, &raw_query, &query)
89    }
90
91    fn create_timeseries(&self, input: CreateTimeSeriesInput) -> RedDBResult<RuntimeQueryResult> {
92        let CreateTimeSeriesInput {
93            name,
94            retention_ms,
95            chunk_size,
96            downsample_policies,
97            if_not_exists,
98        } = input;
99        let raw_query = api_query("create_timeseries", &name);
100        let query = CreateTimeSeriesQuery {
101            name,
102            retention_ms,
103            chunk_size,
104            downsample_policies,
105            if_not_exists,
106            hypertable: None,
107            session_key: None,
108            session_gap_ms: None,
109            // Programmatic create_timeseries has no DDL surface for the
110            // columnar flag; default to the row engine (#911).
111            columnar: false,
112        };
113        RedDBRuntime::execute_create_timeseries(self, &raw_query, &query)
114    }
115
116    fn drop_timeseries(&self, input: DropTimeSeriesInput) -> RedDBResult<RuntimeQueryResult> {
117        let raw_query = api_query("drop_timeseries", &input.name);
118        let query = DropTimeSeriesQuery {
119            name: input.name,
120            if_exists: input.if_exists,
121        };
122        RedDBRuntime::execute_drop_timeseries(self, &raw_query, &query)
123    }
124}