1use std::collections::HashMap;
4use std::sync::Arc;
5
6use super::*;
7
8const TIMESERIES_META_COLLECTION: &str = "red_timeseries_meta";
9
10impl RedDBRuntime {
11 pub fn execute_create_timeseries(
12 &self,
13 raw_query: &str,
14 query: &CreateTimeSeriesQuery,
15 ) -> RedDBResult<RuntimeQueryResult> {
16 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
17 for spec in &query.downsample_policies {
18 crate::storage::timeseries::retention::DownsamplePolicy::parse(spec).ok_or_else(
19 || RedDBError::Query(format!("invalid downsample policy '{}'", spec)),
20 )?;
21 }
22
23 let store = self.inner.db.store();
24 let exists = store.get_collection(&query.name).is_some();
25 if exists {
26 if query.if_not_exists {
27 return Ok(RuntimeQueryResult::ok_message(
28 raw_query.to_string(),
29 &format!("timeseries '{}' already exists", query.name),
30 "create",
31 ));
32 }
33 return Err(RedDBError::Query(format!(
34 "timeseries '{}' already exists",
35 query.name
36 )));
37 }
38 store
39 .create_collection(&query.name)
40 .map_err(|e| RedDBError::Internal(e.to_string()))?;
41 if let Some(ttl_ms) = query.retention_ms {
42 self.inner
43 .db
44 .set_collection_default_ttl_ms(&query.name, ttl_ms);
45 }
46 let contract = if query.hypertable.is_some() {
52 hypertable_collection_contract(query)
53 } else {
54 timeseries_collection_contract(query)
55 };
56 self.inner
57 .db
58 .save_collection_contract(contract)
59 .map_err(|err| RedDBError::Internal(err.to_string()))?;
60 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
65 store.set_config_tree(
66 &format!("red.collection_tenants.{}", query.name),
67 &crate::serde_json::Value::String(tenant_id),
68 );
69 }
70 save_timeseries_metadata(store.as_ref(), query)?;
71
72 if let Some(ht) = &query.hypertable {
77 let mut spec = crate::storage::timeseries::HypertableSpec::new(
78 query.name.clone(),
79 ht.time_column.clone(),
80 ht.chunk_interval_ns,
81 );
82 if let Some(ttl) = ht.default_ttl_ns {
83 spec = spec.with_ttl_ns(ttl);
84 }
85 self.inner.db.hypertables().register(spec);
86 }
87
88 self.invalidate_result_cache();
89 self.inner
90 .db
91 .persist_metadata()
92 .map_err(|e| RedDBError::Internal(e.to_string()))?;
93 let columns: Vec<String> = query
97 .hypertable
98 .as_ref()
99 .map(|ht| vec![ht.time_column.clone()])
100 .unwrap_or_else(|| vec!["metric".to_string(), "value".to_string()]);
101 self.schema_vocabulary_apply(
102 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
103 collection: query.name.clone(),
104 columns,
105 type_tags: Vec::new(),
106 description: None,
107 },
108 );
109
110 let noun = if query.hypertable.is_some() {
111 "hypertable"
112 } else {
113 "timeseries"
114 };
115 let mut msg = format!("{noun} '{}' created", query.name);
116 if let Some(ret) = query.retention_ms {
117 msg.push_str(&format!(" (retention={}ms)", ret));
118 }
119 if let Some(cs) = query.chunk_size {
120 msg.push_str(&format!(" (chunk_size={})", cs));
121 }
122 if !query.downsample_policies.is_empty() {
123 msg.push_str(&format!(
124 " (downsample_policies={})",
125 query.downsample_policies.len()
126 ));
127 }
128 Ok(RuntimeQueryResult::ok_message(
129 raw_query.to_string(),
130 &msg,
131 "create",
132 ))
133 }
134
135 pub fn execute_drop_timeseries(
136 &self,
137 raw_query: &str,
138 query: &DropTimeSeriesQuery,
139 ) -> RedDBResult<RuntimeQueryResult> {
140 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
141 let store = self.inner.db.store();
142 if super::impl_ddl::is_system_schema_name(&query.name) {
143 return Err(RedDBError::Query("system schema is read-only".to_string()));
144 }
145 if store.get_collection(&query.name).is_none() {
146 if query.if_exists {
147 return Ok(RuntimeQueryResult::ok_message(
148 raw_query.to_string(),
149 &format!("timeseries '{}' does not exist", query.name),
150 "drop",
151 ));
152 }
153 return Err(RedDBError::NotFound(format!(
154 "timeseries '{}' not found",
155 query.name
156 )));
157 }
158 let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
159 &query.name,
160 &self.inner.db.catalog_model_snapshot(),
161 )?;
162 if actual != crate::catalog::CollectionModel::TimeSeries
163 && actual != crate::catalog::CollectionModel::Table
164 {
165 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
166 crate::catalog::CollectionModel::TimeSeries,
167 actual,
168 )?;
169 }
170 let _ = self.inner.db.hypertables().unregister(&query.name);
174 store
175 .drop_collection(&query.name)
176 .map_err(|e| RedDBError::Internal(e.to_string()))?;
177 self.inner.db.clear_collection_default_ttl_ms(&query.name);
178 self.inner
179 .db
180 .remove_collection_contract(&query.name)
181 .map_err(|err| RedDBError::Internal(err.to_string()))?;
182 remove_timeseries_metadata(store.as_ref(), &query.name);
183 self.invalidate_result_cache();
184 self.inner
185 .db
186 .persist_metadata()
187 .map_err(|e| RedDBError::Internal(e.to_string()))?;
188 self.schema_vocabulary_apply(
191 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
192 collection: query.name.clone(),
193 },
194 );
195 Ok(RuntimeQueryResult::ok_message(
196 raw_query.to_string(),
197 &format!("timeseries '{}' dropped", query.name),
198 "drop",
199 ))
200 }
201
202 pub fn seal_hypertable_chunks(&self, collection: &str) -> RedDBResult<usize> {
213 let analytical = self
214 .inner
215 .db
216 .collection_contract(collection)
217 .and_then(|c| c.analytical_storage.clone());
218 let registry = self.inner.db.hypertables();
219 let Some(spec) = registry.get(collection) else {
220 return Ok(0);
221 };
222 let time_col = spec.time_column.clone();
223 let store = self.inner.db.store();
224 let Some(manager) = store.get_collection(collection) else {
225 return Ok(0);
226 };
227
228 let mut sealed_columnar = 0usize;
229 for meta in registry.show_chunks(collection) {
230 if meta.sealed {
231 continue;
232 }
233 let start = meta.id.start_ns;
234 let end = meta.end_ns_exclusive;
235
236 let points = materialize_row_points(&manager, &time_col, start, end);
240
241 let mut chunk = crate::storage::timeseries::TimeSeriesChunk::with_max_points(
242 collection.to_string(),
243 HashMap::new(),
244 points.len().max(1),
245 );
246 for (ts, value) in &points {
247 chunk.append(*ts, *value);
248 }
249
250 let routed = crate::storage::timeseries::chunk::seal_chunk_with_config(
251 &mut chunk,
252 analytical.as_ref(),
253 start,
254 0,
255 )
256 .map_err(|err| RedDBError::Internal(format!("columnar seal failed: {err:?}")))?;
257
258 match routed {
259 crate::storage::timeseries::chunk::SealedChunkStorage::Columnar(bytes) => {
260 let page = crate::storage::engine::PageLocation::new(0, 0, bytes.len() as u32);
261 registry.seal_chunk_columnar(&meta.id, page, bytes);
262 sealed_columnar += 1;
263 }
264 crate::storage::timeseries::chunk::SealedChunkStorage::Row => {
265 registry.seal_chunk(&meta.id);
266 }
267 }
268 }
269 Ok(sealed_columnar)
270 }
271
272 pub fn columnar_chunk_count(&self, collection: &str) -> usize {
276 self.inner
277 .db
278 .hypertables()
279 .show_chunks(collection)
280 .iter()
281 .filter(|meta| meta.columnar_page.is_some())
282 .count()
283 }
284
285 pub fn columnar_chunk_points(
291 &self,
292 collection: &str,
293 chunk_start_ns: u64,
294 start_ns: u64,
295 end_ns: u64,
296 ) -> Option<Vec<(u64, f64)>> {
297 let id = crate::storage::timeseries::ChunkId {
298 hypertable: collection.to_string(),
299 start_ns: chunk_start_ns,
300 };
301 let bytes = self.inner.db.hypertables().columnar_block(&id)?;
302 let scan =
303 crate::storage::timeseries::chunk::query_column_block_range(&bytes, start_ns, end_ns)
304 .ok()?;
305 Some(
306 scan.points
307 .iter()
308 .map(|p| (p.timestamp_ns, p.value))
309 .collect(),
310 )
311 }
312
313 pub fn read_bridge_points(
333 &self,
334 collection: &str,
335 start_ns: u64,
336 end_ns: u64,
337 ) -> RedDBResult<Vec<(u64, f64)>> {
338 use crate::storage::timeseries::ChunkFormat;
339 use crate::storage::unified::column_block::{
340 peek_column_block_version, COLUMN_BLOCK_VERSION_V1,
341 };
342
343 let registry = self.inner.db.hypertables();
344 let Some(spec) = registry.get(collection) else {
345 return Ok(Vec::new());
346 };
347 let time_col = spec.time_column.clone();
348 let store = self.inner.db.store();
349
350 let mut out: Vec<(u64, f64)> = Vec::new();
351 for meta in registry.show_chunks(collection) {
352 if meta.max_ts_ns < start_ns || meta.min_ts_ns > end_ns {
355 continue;
356 }
357 match meta.format() {
358 ChunkFormat::ColumnarV1 => {
359 let Some(bytes) = registry.columnar_block(&meta.id) else {
362 continue;
363 };
364 match peek_column_block_version(&bytes) {
367 Some(COLUMN_BLOCK_VERSION_V1) => {}
368 Some(v) => {
369 return Err(RedDBError::Internal(format!(
370 "chunk {} @ {} carries unsupported columnar format version {v}",
371 meta.id.hypertable, meta.id.start_ns
372 )));
373 }
374 None => {
375 return Err(RedDBError::Internal(format!(
376 "chunk {} @ {} is flagged columnar but its block is not RDCC",
377 meta.id.hypertable, meta.id.start_ns
378 )));
379 }
380 }
381 let scan = crate::storage::timeseries::chunk::query_column_block_range(
382 &bytes, start_ns, end_ns,
383 )
384 .map_err(|err| {
385 RedDBError::Internal(format!("columnar read-bridge decode failed: {err:?}"))
386 })?;
387 out.extend(scan.points.iter().map(|p| (p.timestamp_ns, p.value)));
388 }
389 ChunkFormat::Row => {
390 let Some(manager) = store.get_collection(collection) else {
394 continue;
395 };
396 let chunk_start = meta.id.start_ns;
397 let chunk_end = meta.end_ns_exclusive;
398 out.extend(
399 materialize_row_points(&manager, &time_col, chunk_start, chunk_end)
400 .into_iter()
401 .filter(|(ts, _)| *ts >= start_ns && *ts <= end_ns),
402 );
403 }
404 }
405 }
406 out.sort_by_key(|(ts, _)| *ts);
407 Ok(out)
408 }
409}
410
411fn materialize_row_points(
417 manager: &crate::storage::unified::SegmentManager,
418 time_col: &str,
419 start: u64,
420 end: u64,
421) -> Vec<(u64, f64)> {
422 let mut points: Vec<(u64, f64)> = manager
423 .query_all(|entity| {
424 entity
425 .data
426 .as_row()
427 .and_then(|row| row.get_field(time_col))
428 .and_then(field_as_u64)
429 .is_some_and(|ts| ts >= start && ts < end)
430 })
431 .iter()
432 .filter_map(|entity| {
433 let row = entity.data.as_row()?;
434 let ts = row.get_field(time_col).and_then(field_as_u64)?;
435 let value = row.get_field("value").and_then(field_as_f64).unwrap_or(0.0);
436 Some((ts, value))
437 })
438 .collect();
439 points.sort_by_key(|(ts, _)| *ts);
440 points
441}
442
443fn field_as_u64(value: &Value) -> Option<u64> {
446 match value {
447 Value::Integer(n) | Value::BigInt(n) | Value::Timestamp(n) if *n >= 0 => Some(*n as u64),
448 Value::UnsignedInteger(n) => Some(*n),
449 _ => None,
450 }
451}
452
453fn field_as_f64(value: &Value) -> Option<f64> {
455 match value {
456 Value::Float(f) => Some(*f),
457 Value::Integer(n) | Value::BigInt(n) => Some(*n as f64),
458 Value::UnsignedInteger(n) => Some(*n as f64),
459 _ => None,
460 }
461}
462
463fn save_timeseries_metadata(
464 store: &crate::storage::unified::UnifiedStore,
465 query: &CreateTimeSeriesQuery,
466) -> RedDBResult<()> {
467 remove_timeseries_metadata(store, &query.name);
468 let _ = store.get_or_create_collection(TIMESERIES_META_COLLECTION);
469
470 let mut fields = HashMap::new();
471 fields.insert(
472 "kind".to_string(),
473 Value::text("timeseries_config".to_string()),
474 );
475 fields.insert("series".to_string(), Value::text(query.name.clone()));
476 fields.insert(
477 "retention_ms".to_string(),
478 query
479 .retention_ms
480 .map(Value::UnsignedInteger)
481 .unwrap_or(Value::Null),
482 );
483 fields.insert(
484 "chunk_size".to_string(),
485 query
486 .chunk_size
487 .map(|value| Value::UnsignedInteger(value as u64))
488 .unwrap_or(Value::Null),
489 );
490 fields.insert(
491 "downsample_policies".to_string(),
492 Value::Array(
493 query
494 .downsample_policies
495 .iter()
496 .cloned()
497 .map(Value::text)
498 .collect(),
499 ),
500 );
501
502 store
503 .insert_auto(
504 TIMESERIES_META_COLLECTION,
505 UnifiedEntity::new(
506 EntityId::new(0),
507 EntityKind::TableRow {
508 table: Arc::from(TIMESERIES_META_COLLECTION),
509 row_id: 0,
510 },
511 EntityData::Row(crate::storage::RowData {
512 columns: Vec::new(),
513 named: Some(fields),
514 schema: None,
515 }),
516 ),
517 )
518 .map_err(|err| RedDBError::Internal(err.to_string()))?;
519
520 Ok(())
521}
522
523fn remove_timeseries_metadata(store: &crate::storage::unified::UnifiedStore, series: &str) {
524 let Some(manager) = store.get_collection(TIMESERIES_META_COLLECTION) else {
525 return;
526 };
527 let rows = manager.query_all(|entity| {
528 entity.data.as_row().is_some_and(|row| {
529 row.get_field("series").is_some_and(
530 |value| matches!(value, Value::Text(candidate) if &**candidate == series),
531 )
532 })
533 });
534 for row in rows {
535 let _ = store.delete(TIMESERIES_META_COLLECTION, row.id);
536 }
537}
538
539fn analytical_storage_for(
545 columnar: bool,
546 time_key: &str,
547) -> Option<crate::catalog::AnalyticalStorageConfig> {
548 columnar.then(|| crate::catalog::AnalyticalStorageConfig {
549 columnar: true,
550 time_key: time_key.to_string(),
551 order_by_key: None,
552 })
553}
554
555fn hypertable_collection_contract(
556 query: &CreateTimeSeriesQuery,
557) -> crate::physical::CollectionContract {
558 let now = current_unix_ms();
559 let time_key = query
560 .hypertable
561 .as_ref()
562 .map(|ht| ht.time_column.as_str())
563 .unwrap_or("timestamp");
564 crate::physical::CollectionContract {
565 name: query.name.clone(),
566 declared_model: crate::catalog::CollectionModel::Table,
571 schema_mode: crate::catalog::SchemaMode::SemiStructured,
572 origin: crate::physical::ContractOrigin::Explicit,
573 version: 1,
574 created_at_unix_ms: now,
575 updated_at_unix_ms: now,
576 default_ttl_ms: query.retention_ms,
577 vector_dimension: None,
578 vector_metric: None,
579 context_index_fields: Vec::new(),
580 declared_columns: Vec::new(),
581 table_def: None,
582 timestamps_enabled: false,
583 context_index_enabled: false,
584 metrics_raw_retention_ms: None,
585 metrics_rollup_policies: Vec::new(),
586 metrics_tenant_identity: None,
587 metrics_namespace: None,
588 append_only: true,
592 subscriptions: Vec::new(),
593 analytics_config: Vec::new(),
594 session_key: None,
595 session_gap_ms: None,
596 retention_duration_ms: None,
597 analytical_storage: analytical_storage_for(query.columnar, time_key),
598 }
599}
600
601fn timeseries_collection_contract(
602 query: &CreateTimeSeriesQuery,
603) -> crate::physical::CollectionContract {
604 let now = current_unix_ms();
605 crate::physical::CollectionContract {
606 name: query.name.clone(),
607 declared_model: crate::catalog::CollectionModel::TimeSeries,
608 schema_mode: crate::catalog::SchemaMode::SemiStructured,
609 origin: crate::physical::ContractOrigin::Explicit,
610 version: 1,
611 created_at_unix_ms: now,
612 updated_at_unix_ms: now,
613 default_ttl_ms: query.retention_ms,
614 vector_dimension: None,
615 vector_metric: None,
616 context_index_fields: Vec::new(),
617 declared_columns: Vec::new(),
618 table_def: None,
619 timestamps_enabled: false,
620 context_index_enabled: false,
621 metrics_raw_retention_ms: None,
622 metrics_rollup_policies: Vec::new(),
623 metrics_tenant_identity: None,
624 metrics_namespace: None,
625 append_only: true,
629 subscriptions: Vec::new(),
630 analytics_config: Vec::new(),
631 session_key: query.session_key.clone(),
637 session_gap_ms: query.session_gap_ms,
638 retention_duration_ms: None,
639 analytical_storage: analytical_storage_for(query.columnar, "timestamp"),
642 }
643}
644
645fn current_unix_ms() -> u128 {
646 std::time::SystemTime::now()
647 .duration_since(std::time::UNIX_EPOCH)
648 .unwrap_or_default()
649 .as_millis()
650}