1use std::collections::HashMap;
4use std::sync::Arc;
5
6use super::*;
7
/// Name of the collection that stores one configuration row per time series
/// (rows are written by `save_timeseries_metadata` and deleted by
/// `remove_timeseries_metadata`).
const TIMESERIES_META_COLLECTION: &str = "red_timeseries_meta";
9
10impl RedDBRuntime {
11 pub fn execute_create_timeseries(
12 &self,
13 raw_query: &str,
14 query: &CreateTimeSeriesQuery,
15 ) -> RedDBResult<RuntimeQueryResult> {
16 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
17 for spec in &query.downsample_policies {
18 crate::storage::timeseries::retention::DownsamplePolicy::parse(spec).ok_or_else(
19 || RedDBError::Query(format!("invalid downsample policy '{}'", spec)),
20 )?;
21 }
22
23 let store = self.inner.db.store();
24 let exists = store.get_collection(&query.name).is_some();
25 if exists {
26 if query.if_not_exists {
27 return Ok(RuntimeQueryResult::ok_message(
28 raw_query.to_string(),
29 &format!("timeseries '{}' already exists", query.name),
30 "create",
31 ));
32 }
33 return Err(RedDBError::Query(format!(
34 "timeseries '{}' already exists",
35 query.name
36 )));
37 }
38 store
39 .create_collection(&query.name)
40 .map_err(|e| RedDBError::Internal(e.to_string()))?;
41 if let Some(ttl_ms) = query.retention_ms {
42 self.inner
43 .db
44 .set_collection_default_ttl_ms(&query.name, ttl_ms);
45 }
46 let contract = if query.hypertable.is_some() {
52 hypertable_collection_contract(query)
53 } else {
54 timeseries_collection_contract(query)
55 };
56 self.inner
57 .db
58 .save_collection_contract(contract)
59 .map_err(|err| RedDBError::Internal(err.to_string()))?;
60 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
65 store.set_config_tree(
66 &format!("red.collection_tenants.{}", query.name),
67 &crate::serde_json::Value::String(tenant_id),
68 );
69 }
70 save_timeseries_metadata(store.as_ref(), query)?;
71
72 if let Some(ht) = &query.hypertable {
77 let mut spec = crate::storage::timeseries::HypertableSpec::new(
78 query.name.clone(),
79 ht.time_column.clone(),
80 ht.chunk_interval_ns,
81 );
82 if let Some(ttl) = ht.default_ttl_ns {
83 spec = spec.with_ttl_ns(ttl);
84 }
85 self.inner.db.hypertables().register(spec);
86 }
87
88 self.invalidate_result_cache();
89 self.inner
90 .db
91 .persist_metadata()
92 .map_err(|e| RedDBError::Internal(e.to_string()))?;
93 let columns: Vec<String> = query
97 .hypertable
98 .as_ref()
99 .map(|ht| vec![ht.time_column.clone()])
100 .unwrap_or_else(|| vec!["metric".to_string(), "value".to_string()]);
101 self.schema_vocabulary_apply(
102 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
103 collection: query.name.clone(),
104 columns,
105 type_tags: Vec::new(),
106 description: None,
107 },
108 );
109
110 let noun = if query.hypertable.is_some() {
111 "hypertable"
112 } else {
113 "timeseries"
114 };
115 let mut msg = format!("{noun} '{}' created", query.name);
116 if let Some(ret) = query.retention_ms {
117 msg.push_str(&format!(" (retention={}ms)", ret));
118 }
119 if let Some(cs) = query.chunk_size {
120 msg.push_str(&format!(" (chunk_size={})", cs));
121 }
122 if !query.downsample_policies.is_empty() {
123 msg.push_str(&format!(
124 " (downsample_policies={})",
125 query.downsample_policies.len()
126 ));
127 }
128 Ok(RuntimeQueryResult::ok_message(
129 raw_query.to_string(),
130 &msg,
131 "create",
132 ))
133 }
134
135 pub fn execute_drop_timeseries(
136 &self,
137 raw_query: &str,
138 query: &DropTimeSeriesQuery,
139 ) -> RedDBResult<RuntimeQueryResult> {
140 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
141 let store = self.inner.db.store();
142 if super::impl_ddl::is_system_schema_name(&query.name) {
143 return Err(RedDBError::Query("system schema is read-only".to_string()));
144 }
145 if store.get_collection(&query.name).is_none() {
146 if query.if_exists {
147 return Ok(RuntimeQueryResult::ok_message(
148 raw_query.to_string(),
149 &format!("timeseries '{}' does not exist", query.name),
150 "drop",
151 ));
152 }
153 return Err(RedDBError::NotFound(format!(
154 "timeseries '{}' not found",
155 query.name
156 )));
157 }
158 let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
159 &query.name,
160 &self.inner.db.catalog_model_snapshot(),
161 )?;
162 if actual != crate::catalog::CollectionModel::TimeSeries
163 && actual != crate::catalog::CollectionModel::Table
164 {
165 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
166 crate::catalog::CollectionModel::TimeSeries,
167 actual,
168 )?;
169 }
170 let _ = self.inner.db.hypertables().unregister(&query.name);
174 store
175 .drop_collection(&query.name)
176 .map_err(|e| RedDBError::Internal(e.to_string()))?;
177 self.inner.db.clear_collection_default_ttl_ms(&query.name);
178 self.inner
179 .db
180 .remove_collection_contract(&query.name)
181 .map_err(|err| RedDBError::Internal(err.to_string()))?;
182 remove_timeseries_metadata(store.as_ref(), &query.name);
183 self.invalidate_result_cache();
184 self.inner
185 .db
186 .persist_metadata()
187 .map_err(|e| RedDBError::Internal(e.to_string()))?;
188 self.schema_vocabulary_apply(
191 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
192 collection: query.name.clone(),
193 },
194 );
195 Ok(RuntimeQueryResult::ok_message(
196 raw_query.to_string(),
197 &format!("timeseries '{}' dropped", query.name),
198 "drop",
199 ))
200 }
201}
202
203fn save_timeseries_metadata(
204 store: &crate::storage::unified::UnifiedStore,
205 query: &CreateTimeSeriesQuery,
206) -> RedDBResult<()> {
207 remove_timeseries_metadata(store, &query.name);
208 let _ = store.get_or_create_collection(TIMESERIES_META_COLLECTION);
209
210 let mut fields = HashMap::new();
211 fields.insert(
212 "kind".to_string(),
213 Value::text("timeseries_config".to_string()),
214 );
215 fields.insert("series".to_string(), Value::text(query.name.clone()));
216 fields.insert(
217 "retention_ms".to_string(),
218 query
219 .retention_ms
220 .map(Value::UnsignedInteger)
221 .unwrap_or(Value::Null),
222 );
223 fields.insert(
224 "chunk_size".to_string(),
225 query
226 .chunk_size
227 .map(|value| Value::UnsignedInteger(value as u64))
228 .unwrap_or(Value::Null),
229 );
230 fields.insert(
231 "downsample_policies".to_string(),
232 Value::Array(
233 query
234 .downsample_policies
235 .iter()
236 .cloned()
237 .map(Value::text)
238 .collect(),
239 ),
240 );
241
242 store
243 .insert_auto(
244 TIMESERIES_META_COLLECTION,
245 UnifiedEntity::new(
246 EntityId::new(0),
247 EntityKind::TableRow {
248 table: Arc::from(TIMESERIES_META_COLLECTION),
249 row_id: 0,
250 },
251 EntityData::Row(crate::storage::RowData {
252 columns: Vec::new(),
253 named: Some(fields),
254 schema: None,
255 }),
256 ),
257 )
258 .map_err(|err| RedDBError::Internal(err.to_string()))?;
259
260 Ok(())
261}
262
263fn remove_timeseries_metadata(store: &crate::storage::unified::UnifiedStore, series: &str) {
264 let Some(manager) = store.get_collection(TIMESERIES_META_COLLECTION) else {
265 return;
266 };
267 let rows = manager.query_all(|entity| {
268 entity.data.as_row().is_some_and(|row| {
269 row.get_field("series").is_some_and(
270 |value| matches!(value, Value::Text(candidate) if &**candidate == series),
271 )
272 })
273 });
274 for row in rows {
275 let _ = store.delete(TIMESERIES_META_COLLECTION, row.id);
276 }
277}
278
279fn hypertable_collection_contract(
280 query: &CreateTimeSeriesQuery,
281) -> crate::physical::CollectionContract {
282 let now = current_unix_ms();
283 crate::physical::CollectionContract {
284 name: query.name.clone(),
285 declared_model: crate::catalog::CollectionModel::Table,
290 schema_mode: crate::catalog::SchemaMode::SemiStructured,
291 origin: crate::physical::ContractOrigin::Explicit,
292 version: 1,
293 created_at_unix_ms: now,
294 updated_at_unix_ms: now,
295 default_ttl_ms: query.retention_ms,
296 vector_dimension: None,
297 vector_metric: None,
298 context_index_fields: Vec::new(),
299 declared_columns: Vec::new(),
300 table_def: None,
301 timestamps_enabled: false,
302 context_index_enabled: false,
303 metrics_raw_retention_ms: None,
304 metrics_rollup_policies: Vec::new(),
305 metrics_tenant_identity: None,
306 metrics_namespace: None,
307 append_only: true,
311 subscriptions: Vec::new(),
312 analytics_config: Vec::new(),
313 session_key: None,
314 session_gap_ms: None,
315 retention_duration_ms: None,
316 }
317}
318
319fn timeseries_collection_contract(
320 query: &CreateTimeSeriesQuery,
321) -> crate::physical::CollectionContract {
322 let now = current_unix_ms();
323 crate::physical::CollectionContract {
324 name: query.name.clone(),
325 declared_model: crate::catalog::CollectionModel::TimeSeries,
326 schema_mode: crate::catalog::SchemaMode::SemiStructured,
327 origin: crate::physical::ContractOrigin::Explicit,
328 version: 1,
329 created_at_unix_ms: now,
330 updated_at_unix_ms: now,
331 default_ttl_ms: query.retention_ms,
332 vector_dimension: None,
333 vector_metric: None,
334 context_index_fields: Vec::new(),
335 declared_columns: Vec::new(),
336 table_def: None,
337 timestamps_enabled: false,
338 context_index_enabled: false,
339 metrics_raw_retention_ms: None,
340 metrics_rollup_policies: Vec::new(),
341 metrics_tenant_identity: None,
342 metrics_namespace: None,
343 append_only: true,
347 subscriptions: Vec::new(),
348 analytics_config: Vec::new(),
349 session_key: query.session_key.clone(),
355 session_gap_ms: query.session_gap_ms,
356 retention_duration_ms: None,
357 }
358}
359
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn current_unix_ms() -> u128 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    since_epoch.map_or(0, |elapsed| elapsed.as_millis())
}