Skip to main content

reddb_server/runtime/
impl_timeseries.rs

1//! Time-series DDL execution
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use super::*;
7
8const TIMESERIES_META_COLLECTION: &str = "red_timeseries_meta";
9
10impl RedDBRuntime {
11    pub fn execute_create_timeseries(
12        &self,
13        raw_query: &str,
14        query: &CreateTimeSeriesQuery,
15    ) -> RedDBResult<RuntimeQueryResult> {
16        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
17        for spec in &query.downsample_policies {
18            crate::storage::timeseries::retention::DownsamplePolicy::parse(spec).ok_or_else(
19                || RedDBError::Query(format!("invalid downsample policy '{}'", spec)),
20            )?;
21        }
22
23        let store = self.inner.db.store();
24        let exists = store.get_collection(&query.name).is_some();
25        if exists {
26            if query.if_not_exists {
27                return Ok(RuntimeQueryResult::ok_message(
28                    raw_query.to_string(),
29                    &format!("timeseries '{}' already exists", query.name),
30                    "create",
31                ));
32            }
33            return Err(RedDBError::Query(format!(
34                "timeseries '{}' already exists",
35                query.name
36            )));
37        }
38        store
39            .create_collection(&query.name)
40            .map_err(|e| RedDBError::Internal(e.to_string()))?;
41        if let Some(ttl_ms) = query.retention_ms {
42            self.inner
43                .db
44                .set_collection_default_ttl_ms(&query.name, ttl_ms);
45        }
46        // CREATE HYPERTABLE declares the collection as a Table so
47        // INSERT goes through the row path (which now includes
48        // automatic chunk routing). Plain CREATE TIMESERIES keeps
49        // the native TimeSeries contract with its metric/value/tags
50        // column convention.
51        let contract = if query.hypertable.is_some() {
52            hypertable_collection_contract(query)
53        } else {
54            timeseries_collection_contract(query)
55        };
56        self.inner
57            .db
58            .save_collection_contract(contract)
59            .map_err(|err| RedDBError::Internal(err.to_string()))?;
60        save_timeseries_metadata(store.as_ref(), query)?;
61
62        // `CREATE HYPERTABLE` additionally registers a HypertableSpec
63        // so chunk routing + retention sweeps can address this table.
64        // Plain `CREATE TIMESERIES` leaves `hypertable` = None and the
65        // runtime behaves as before.
66        if let Some(ht) = &query.hypertable {
67            let mut spec = crate::storage::timeseries::HypertableSpec::new(
68                query.name.clone(),
69                ht.time_column.clone(),
70                ht.chunk_interval_ns,
71            );
72            if let Some(ttl) = ht.default_ttl_ns {
73                spec = spec.with_ttl_ns(ttl);
74            }
75            self.inner.db.hypertables().register(spec);
76        }
77
78        self.invalidate_result_cache();
79        self.inner
80            .db
81            .persist_metadata()
82            .map_err(|e| RedDBError::Internal(e.to_string()))?;
83        // Issue #120 — surface timeseries / hypertable in the
84        // schema-vocabulary. The hypertable variant carries the
85        // declared time column.
86        let columns: Vec<String> = query
87            .hypertable
88            .as_ref()
89            .map(|ht| vec![ht.time_column.clone()])
90            .unwrap_or_else(|| vec!["metric".to_string(), "value".to_string()]);
91        self.schema_vocabulary_apply(
92            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
93                collection: query.name.clone(),
94                columns,
95                type_tags: Vec::new(),
96                description: None,
97            },
98        );
99
100        let noun = if query.hypertable.is_some() {
101            "hypertable"
102        } else {
103            "timeseries"
104        };
105        let mut msg = format!("{noun} '{}' created", query.name);
106        if let Some(ret) = query.retention_ms {
107            msg.push_str(&format!(" (retention={}ms)", ret));
108        }
109        if let Some(cs) = query.chunk_size {
110            msg.push_str(&format!(" (chunk_size={})", cs));
111        }
112        if !query.downsample_policies.is_empty() {
113            msg.push_str(&format!(
114                " (downsample_policies={})",
115                query.downsample_policies.len()
116            ));
117        }
118        Ok(RuntimeQueryResult::ok_message(
119            raw_query.to_string(),
120            &msg,
121            "create",
122        ))
123    }
124
125    pub fn execute_drop_timeseries(
126        &self,
127        raw_query: &str,
128        query: &DropTimeSeriesQuery,
129    ) -> RedDBResult<RuntimeQueryResult> {
130        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
131        let store = self.inner.db.store();
132        if super::impl_ddl::is_system_schema_name(&query.name) {
133            return Err(RedDBError::Query("system schema is read-only".to_string()));
134        }
135        if store.get_collection(&query.name).is_none() {
136            if query.if_exists {
137                return Ok(RuntimeQueryResult::ok_message(
138                    raw_query.to_string(),
139                    &format!("timeseries '{}' does not exist", query.name),
140                    "drop",
141                ));
142            }
143            return Err(RedDBError::NotFound(format!(
144                "timeseries '{}' not found",
145                query.name
146            )));
147        }
148        let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
149            &query.name,
150            &self.inner.db.catalog_model_snapshot(),
151        )?;
152        if actual != crate::catalog::CollectionModel::TimeSeries
153            && actual != crate::catalog::CollectionModel::Table
154        {
155            crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
156                crate::catalog::CollectionModel::TimeSeries,
157                actual,
158            )?;
159        }
160        // Remove from the hypertable registry before dropping the
161        // underlying collection — the registry lookup is cheap and
162        // staying consistent is the point of having a separate call.
163        let _ = self.inner.db.hypertables().unregister(&query.name);
164        store
165            .drop_collection(&query.name)
166            .map_err(|e| RedDBError::Internal(e.to_string()))?;
167        self.inner.db.clear_collection_default_ttl_ms(&query.name);
168        self.inner
169            .db
170            .remove_collection_contract(&query.name)
171            .map_err(|err| RedDBError::Internal(err.to_string()))?;
172        remove_timeseries_metadata(store.as_ref(), &query.name);
173        self.invalidate_result_cache();
174        self.inner
175            .db
176            .persist_metadata()
177            .map_err(|e| RedDBError::Internal(e.to_string()))?;
178        // Issue #120 — invalidate the schema-vocabulary entry for the
179        // dropped timeseries / hypertable.
180        self.schema_vocabulary_apply(
181            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
182                collection: query.name.clone(),
183            },
184        );
185        Ok(RuntimeQueryResult::ok_message(
186            raw_query.to_string(),
187            &format!("timeseries '{}' dropped", query.name),
188            "drop",
189        ))
190    }
191}
192
193fn save_timeseries_metadata(
194    store: &crate::storage::unified::UnifiedStore,
195    query: &CreateTimeSeriesQuery,
196) -> RedDBResult<()> {
197    remove_timeseries_metadata(store, &query.name);
198    let _ = store.get_or_create_collection(TIMESERIES_META_COLLECTION);
199
200    let mut fields = HashMap::new();
201    fields.insert(
202        "kind".to_string(),
203        Value::text("timeseries_config".to_string()),
204    );
205    fields.insert("series".to_string(), Value::text(query.name.clone()));
206    fields.insert(
207        "retention_ms".to_string(),
208        query
209            .retention_ms
210            .map(Value::UnsignedInteger)
211            .unwrap_or(Value::Null),
212    );
213    fields.insert(
214        "chunk_size".to_string(),
215        query
216            .chunk_size
217            .map(|value| Value::UnsignedInteger(value as u64))
218            .unwrap_or(Value::Null),
219    );
220    fields.insert(
221        "downsample_policies".to_string(),
222        Value::Array(
223            query
224                .downsample_policies
225                .iter()
226                .cloned()
227                .map(Value::text)
228                .collect(),
229        ),
230    );
231
232    store
233        .insert_auto(
234            TIMESERIES_META_COLLECTION,
235            UnifiedEntity::new(
236                EntityId::new(0),
237                EntityKind::TableRow {
238                    table: Arc::from(TIMESERIES_META_COLLECTION),
239                    row_id: 0,
240                },
241                EntityData::Row(crate::storage::RowData {
242                    columns: Vec::new(),
243                    named: Some(fields),
244                    schema: None,
245                }),
246            ),
247        )
248        .map_err(|err| RedDBError::Internal(err.to_string()))?;
249
250    Ok(())
251}
252
253fn remove_timeseries_metadata(store: &crate::storage::unified::UnifiedStore, series: &str) {
254    let Some(manager) = store.get_collection(TIMESERIES_META_COLLECTION) else {
255        return;
256    };
257    let rows = manager.query_all(|entity| {
258        entity.data.as_row().is_some_and(|row| {
259            row.get_field("series").is_some_and(
260                |value| matches!(value, Value::Text(candidate) if &**candidate == series),
261            )
262        })
263    });
264    for row in rows {
265        let _ = store.delete(TIMESERIES_META_COLLECTION, row.id);
266    }
267}
268
269fn hypertable_collection_contract(
270    query: &CreateTimeSeriesQuery,
271) -> crate::physical::CollectionContract {
272    let now = current_unix_ms();
273    crate::physical::CollectionContract {
274        name: query.name.clone(),
275        // Table model — rows go through the normal INSERT path,
276        // which now calls HypertableRegistry::route after each row
277        // lands. Hypertable-specific behaviour (chunk bounds, TTL
278        // sweeps) lives on the registry, not the contract.
279        declared_model: crate::catalog::CollectionModel::Table,
280        schema_mode: crate::catalog::SchemaMode::SemiStructured,
281        origin: crate::physical::ContractOrigin::Explicit,
282        version: 1,
283        created_at_unix_ms: now,
284        updated_at_unix_ms: now,
285        default_ttl_ms: query.retention_ms,
286        vector_dimension: None,
287        vector_metric: None,
288        context_index_fields: Vec::new(),
289        declared_columns: Vec::new(),
290        table_def: None,
291        timestamps_enabled: false,
292        context_index_enabled: false,
293        // Hypertable data is conceptually immutable once the chunk
294        // seals. Reject UPDATE / DELETE at parse time and give the
295        // operator a clear message instead of silent coalescing.
296        append_only: true,
297        subscriptions: Vec::new(),
298    }
299}
300
301fn timeseries_collection_contract(
302    query: &CreateTimeSeriesQuery,
303) -> crate::physical::CollectionContract {
304    let now = current_unix_ms();
305    crate::physical::CollectionContract {
306        name: query.name.clone(),
307        declared_model: crate::catalog::CollectionModel::TimeSeries,
308        schema_mode: crate::catalog::SchemaMode::SemiStructured,
309        origin: crate::physical::ContractOrigin::Explicit,
310        version: 1,
311        created_at_unix_ms: now,
312        updated_at_unix_ms: now,
313        default_ttl_ms: query.retention_ms,
314        vector_dimension: None,
315        vector_metric: None,
316        context_index_fields: Vec::new(),
317        declared_columns: Vec::new(),
318        table_def: None,
319        timestamps_enabled: false,
320        context_index_enabled: false,
321        // Time-series collections are append-only by nature — the
322        // storage model forbids in-place UPDATE already, so the flag
323        // makes the catalog honest rather than changing semantics.
324        append_only: true,
325        subscriptions: Vec::new(),
326    }
327}
328
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn current_unix_ms() -> u128 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_millis(),
        // A pre-epoch clock maps to the zero duration.
        Err(_) => 0,
    }
}