Skip to main content

reddb_server/runtime/
impl_timeseries.rs

1//! Time-series DDL execution
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use super::*;
7
8const TIMESERIES_META_COLLECTION: &str = "red_timeseries_meta";
9
10impl RedDBRuntime {
11    pub fn execute_create_timeseries(
12        &self,
13        raw_query: &str,
14        query: &CreateTimeSeriesQuery,
15    ) -> RedDBResult<RuntimeQueryResult> {
16        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
17        for spec in &query.downsample_policies {
18            crate::storage::timeseries::retention::DownsamplePolicy::parse(spec).ok_or_else(
19                || RedDBError::Query(format!("invalid downsample policy '{}'", spec)),
20            )?;
21        }
22
23        let store = self.inner.db.store();
24        let exists = store.get_collection(&query.name).is_some();
25        if exists {
26            if query.if_not_exists {
27                return Ok(RuntimeQueryResult::ok_message(
28                    raw_query.to_string(),
29                    &format!("timeseries '{}' already exists", query.name),
30                    "create",
31                ));
32            }
33            return Err(RedDBError::Query(format!(
34                "timeseries '{}' already exists",
35                query.name
36            )));
37        }
38        store
39            .create_collection(&query.name)
40            .map_err(|e| RedDBError::Internal(e.to_string()))?;
41        if let Some(ttl_ms) = query.retention_ms {
42            self.inner
43                .db
44                .set_collection_default_ttl_ms(&query.name, ttl_ms);
45        }
46        // CREATE HYPERTABLE declares the collection as a Table so
47        // INSERT goes through the row path (which now includes
48        // automatic chunk routing). Plain CREATE TIMESERIES keeps
49        // the native TimeSeries contract with its metric/value/tags
50        // column convention.
51        let contract = if query.hypertable.is_some() {
52            hypertable_collection_contract(query)
53        } else {
54            timeseries_collection_contract(query)
55        };
56        self.inner
57            .db
58            .save_collection_contract(contract)
59            .map_err(|err| RedDBError::Internal(err.to_string()))?;
60        // Issue #747 — record per-collection tenant ownership when an
61        // active tenant context exists, so typed surfaces like
62        // `red.timeseries` can scope rows to the creating tenant just
63        // like `red.tables` does for `CREATE TABLE`.
64        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
65            store.set_config_tree(
66                &format!("red.collection_tenants.{}", query.name),
67                &crate::serde_json::Value::String(tenant_id),
68            );
69        }
70        save_timeseries_metadata(store.as_ref(), query)?;
71
72        // `CREATE HYPERTABLE` additionally registers a HypertableSpec
73        // so chunk routing + retention sweeps can address this table.
74        // Plain `CREATE TIMESERIES` leaves `hypertable` = None and the
75        // runtime behaves as before.
76        if let Some(ht) = &query.hypertable {
77            let mut spec = crate::storage::timeseries::HypertableSpec::new(
78                query.name.clone(),
79                ht.time_column.clone(),
80                ht.chunk_interval_ns,
81            );
82            if let Some(ttl) = ht.default_ttl_ns {
83                spec = spec.with_ttl_ns(ttl);
84            }
85            self.inner.db.hypertables().register(spec);
86        }
87
88        self.invalidate_result_cache();
89        self.inner
90            .db
91            .persist_metadata()
92            .map_err(|e| RedDBError::Internal(e.to_string()))?;
93        // Issue #120 — surface timeseries / hypertable in the
94        // schema-vocabulary. The hypertable variant carries the
95        // declared time column.
96        let columns: Vec<String> = query
97            .hypertable
98            .as_ref()
99            .map(|ht| vec![ht.time_column.clone()])
100            .unwrap_or_else(|| vec!["metric".to_string(), "value".to_string()]);
101        self.schema_vocabulary_apply(
102            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
103                collection: query.name.clone(),
104                columns,
105                type_tags: Vec::new(),
106                description: None,
107            },
108        );
109
110        let noun = if query.hypertable.is_some() {
111            "hypertable"
112        } else {
113            "timeseries"
114        };
115        let mut msg = format!("{noun} '{}' created", query.name);
116        if let Some(ret) = query.retention_ms {
117            msg.push_str(&format!(" (retention={}ms)", ret));
118        }
119        if let Some(cs) = query.chunk_size {
120            msg.push_str(&format!(" (chunk_size={})", cs));
121        }
122        if !query.downsample_policies.is_empty() {
123            msg.push_str(&format!(
124                " (downsample_policies={})",
125                query.downsample_policies.len()
126            ));
127        }
128        Ok(RuntimeQueryResult::ok_message(
129            raw_query.to_string(),
130            &msg,
131            "create",
132        ))
133    }
134
135    pub fn execute_drop_timeseries(
136        &self,
137        raw_query: &str,
138        query: &DropTimeSeriesQuery,
139    ) -> RedDBResult<RuntimeQueryResult> {
140        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
141        let store = self.inner.db.store();
142        if super::impl_ddl::is_system_schema_name(&query.name) {
143            return Err(RedDBError::Query("system schema is read-only".to_string()));
144        }
145        if store.get_collection(&query.name).is_none() {
146            if query.if_exists {
147                return Ok(RuntimeQueryResult::ok_message(
148                    raw_query.to_string(),
149                    &format!("timeseries '{}' does not exist", query.name),
150                    "drop",
151                ));
152            }
153            return Err(RedDBError::NotFound(format!(
154                "timeseries '{}' not found",
155                query.name
156            )));
157        }
158        let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
159            &query.name,
160            &self.inner.db.catalog_model_snapshot(),
161        )?;
162        if actual != crate::catalog::CollectionModel::TimeSeries
163            && actual != crate::catalog::CollectionModel::Table
164        {
165            crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
166                crate::catalog::CollectionModel::TimeSeries,
167                actual,
168            )?;
169        }
170        // Remove from the hypertable registry before dropping the
171        // underlying collection — the registry lookup is cheap and
172        // staying consistent is the point of having a separate call.
173        let _ = self.inner.db.hypertables().unregister(&query.name);
174        store
175            .drop_collection(&query.name)
176            .map_err(|e| RedDBError::Internal(e.to_string()))?;
177        self.inner.db.clear_collection_default_ttl_ms(&query.name);
178        self.inner
179            .db
180            .remove_collection_contract(&query.name)
181            .map_err(|err| RedDBError::Internal(err.to_string()))?;
182        remove_timeseries_metadata(store.as_ref(), &query.name);
183        self.invalidate_result_cache();
184        self.inner
185            .db
186            .persist_metadata()
187            .map_err(|e| RedDBError::Internal(e.to_string()))?;
188        // Issue #120 — invalidate the schema-vocabulary entry for the
189        // dropped timeseries / hypertable.
190        self.schema_vocabulary_apply(
191            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
192                collection: query.name.clone(),
193            },
194        );
195        Ok(RuntimeQueryResult::ok_message(
196            raw_query.to_string(),
197            &format!("timeseries '{}' dropped", query.name),
198            "drop",
199        ))
200    }
201}
202
203fn save_timeseries_metadata(
204    store: &crate::storage::unified::UnifiedStore,
205    query: &CreateTimeSeriesQuery,
206) -> RedDBResult<()> {
207    remove_timeseries_metadata(store, &query.name);
208    let _ = store.get_or_create_collection(TIMESERIES_META_COLLECTION);
209
210    let mut fields = HashMap::new();
211    fields.insert(
212        "kind".to_string(),
213        Value::text("timeseries_config".to_string()),
214    );
215    fields.insert("series".to_string(), Value::text(query.name.clone()));
216    fields.insert(
217        "retention_ms".to_string(),
218        query
219            .retention_ms
220            .map(Value::UnsignedInteger)
221            .unwrap_or(Value::Null),
222    );
223    fields.insert(
224        "chunk_size".to_string(),
225        query
226            .chunk_size
227            .map(|value| Value::UnsignedInteger(value as u64))
228            .unwrap_or(Value::Null),
229    );
230    fields.insert(
231        "downsample_policies".to_string(),
232        Value::Array(
233            query
234                .downsample_policies
235                .iter()
236                .cloned()
237                .map(Value::text)
238                .collect(),
239        ),
240    );
241
242    store
243        .insert_auto(
244            TIMESERIES_META_COLLECTION,
245            UnifiedEntity::new(
246                EntityId::new(0),
247                EntityKind::TableRow {
248                    table: Arc::from(TIMESERIES_META_COLLECTION),
249                    row_id: 0,
250                },
251                EntityData::Row(crate::storage::RowData {
252                    columns: Vec::new(),
253                    named: Some(fields),
254                    schema: None,
255                }),
256            ),
257        )
258        .map_err(|err| RedDBError::Internal(err.to_string()))?;
259
260    Ok(())
261}
262
263fn remove_timeseries_metadata(store: &crate::storage::unified::UnifiedStore, series: &str) {
264    let Some(manager) = store.get_collection(TIMESERIES_META_COLLECTION) else {
265        return;
266    };
267    let rows = manager.query_all(|entity| {
268        entity.data.as_row().is_some_and(|row| {
269            row.get_field("series").is_some_and(
270                |value| matches!(value, Value::Text(candidate) if &**candidate == series),
271            )
272        })
273    });
274    for row in rows {
275        let _ = store.delete(TIMESERIES_META_COLLECTION, row.id);
276    }
277}
278
279fn hypertable_collection_contract(
280    query: &CreateTimeSeriesQuery,
281) -> crate::physical::CollectionContract {
282    let now = current_unix_ms();
283    crate::physical::CollectionContract {
284        name: query.name.clone(),
285        // Table model — rows go through the normal INSERT path,
286        // which now calls HypertableRegistry::route after each row
287        // lands. Hypertable-specific behaviour (chunk bounds, TTL
288        // sweeps) lives on the registry, not the contract.
289        declared_model: crate::catalog::CollectionModel::Table,
290        schema_mode: crate::catalog::SchemaMode::SemiStructured,
291        origin: crate::physical::ContractOrigin::Explicit,
292        version: 1,
293        created_at_unix_ms: now,
294        updated_at_unix_ms: now,
295        default_ttl_ms: query.retention_ms,
296        vector_dimension: None,
297        vector_metric: None,
298        context_index_fields: Vec::new(),
299        declared_columns: Vec::new(),
300        table_def: None,
301        timestamps_enabled: false,
302        context_index_enabled: false,
303        metrics_raw_retention_ms: None,
304        metrics_rollup_policies: Vec::new(),
305        metrics_tenant_identity: None,
306        metrics_namespace: None,
307        // Hypertable data is conceptually immutable once the chunk
308        // seals. Reject UPDATE / DELETE at parse time and give the
309        // operator a clear message instead of silent coalescing.
310        append_only: true,
311        subscriptions: Vec::new(),
312        analytics_config: Vec::new(),
313        session_key: None,
314        session_gap_ms: None,
315        retention_duration_ms: None,
316    }
317}
318
319fn timeseries_collection_contract(
320    query: &CreateTimeSeriesQuery,
321) -> crate::physical::CollectionContract {
322    let now = current_unix_ms();
323    crate::physical::CollectionContract {
324        name: query.name.clone(),
325        declared_model: crate::catalog::CollectionModel::TimeSeries,
326        schema_mode: crate::catalog::SchemaMode::SemiStructured,
327        origin: crate::physical::ContractOrigin::Explicit,
328        version: 1,
329        created_at_unix_ms: now,
330        updated_at_unix_ms: now,
331        default_ttl_ms: query.retention_ms,
332        vector_dimension: None,
333        vector_metric: None,
334        context_index_fields: Vec::new(),
335        declared_columns: Vec::new(),
336        table_def: None,
337        timestamps_enabled: false,
338        context_index_enabled: false,
339        metrics_raw_retention_ms: None,
340        metrics_rollup_policies: Vec::new(),
341        metrics_tenant_identity: None,
342        metrics_namespace: None,
343        // Time-series collections are append-only by nature — the
344        // storage model forbids in-place UPDATE already, so the flag
345        // makes the catalog honest rather than changing semantics.
346        append_only: true,
347        subscriptions: Vec::new(),
348        analytics_config: Vec::new(),
349        // `WITH SESSION_KEY <col> SESSION_GAP <duration>` from the
350        // CREATE TIMESERIES DDL becomes the default partition/gap
351        // pairing for the SESSIONIZE operator (slice 2+). Stored on
352        // the contract so a restart preserves the values without an
353        // extra metadata side-table.
354        session_key: query.session_key.clone(),
355        session_gap_ms: query.session_gap_ms,
356        retention_duration_ms: None,
357    }
358}
359
360fn current_unix_ms() -> u128 {
361    std::time::SystemTime::now()
362        .duration_since(std::time::UNIX_EPOCH)
363        .unwrap_or_default()
364        .as_millis()
365}