1use std::collections::HashMap;
4use std::sync::Arc;
5
6use super::*;
7
8const TIMESERIES_META_COLLECTION: &str = "red_timeseries_meta";
9
10impl RedDBRuntime {
11 pub fn execute_create_timeseries(
12 &self,
13 raw_query: &str,
14 query: &CreateTimeSeriesQuery,
15 ) -> RedDBResult<RuntimeQueryResult> {
16 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
17 for spec in &query.downsample_policies {
18 crate::storage::timeseries::retention::DownsamplePolicy::parse(spec).ok_or_else(
19 || RedDBError::Query(format!("invalid downsample policy '{}'", spec)),
20 )?;
21 }
22
23 let store = self.inner.db.store();
24 let exists = store.get_collection(&query.name).is_some();
25 if exists {
26 if query.if_not_exists {
27 return Ok(RuntimeQueryResult::ok_message(
28 raw_query.to_string(),
29 &format!("timeseries '{}' already exists", query.name),
30 "create",
31 ));
32 }
33 return Err(RedDBError::Query(format!(
34 "timeseries '{}' already exists",
35 query.name
36 )));
37 }
38 store
39 .create_collection(&query.name)
40 .map_err(|e| RedDBError::Internal(e.to_string()))?;
41 if let Some(ttl_ms) = query.retention_ms {
42 self.inner
43 .db
44 .set_collection_default_ttl_ms(&query.name, ttl_ms);
45 }
46 let contract = if query.hypertable.is_some() {
52 hypertable_collection_contract(query)
53 } else {
54 timeseries_collection_contract(query)
55 };
56 self.inner
57 .db
58 .save_collection_contract(contract)
59 .map_err(|err| RedDBError::Internal(err.to_string()))?;
60 save_timeseries_metadata(store.as_ref(), query)?;
61
62 if let Some(ht) = &query.hypertable {
67 let mut spec = crate::storage::timeseries::HypertableSpec::new(
68 query.name.clone(),
69 ht.time_column.clone(),
70 ht.chunk_interval_ns,
71 );
72 if let Some(ttl) = ht.default_ttl_ns {
73 spec = spec.with_ttl_ns(ttl);
74 }
75 self.inner.db.hypertables().register(spec);
76 }
77
78 self.invalidate_result_cache();
79 self.inner
80 .db
81 .persist_metadata()
82 .map_err(|e| RedDBError::Internal(e.to_string()))?;
83 let columns: Vec<String> = query
87 .hypertable
88 .as_ref()
89 .map(|ht| vec![ht.time_column.clone()])
90 .unwrap_or_else(|| vec!["metric".to_string(), "value".to_string()]);
91 self.schema_vocabulary_apply(
92 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
93 collection: query.name.clone(),
94 columns,
95 type_tags: Vec::new(),
96 description: None,
97 },
98 );
99
100 let noun = if query.hypertable.is_some() {
101 "hypertable"
102 } else {
103 "timeseries"
104 };
105 let mut msg = format!("{noun} '{}' created", query.name);
106 if let Some(ret) = query.retention_ms {
107 msg.push_str(&format!(" (retention={}ms)", ret));
108 }
109 if let Some(cs) = query.chunk_size {
110 msg.push_str(&format!(" (chunk_size={})", cs));
111 }
112 if !query.downsample_policies.is_empty() {
113 msg.push_str(&format!(
114 " (downsample_policies={})",
115 query.downsample_policies.len()
116 ));
117 }
118 Ok(RuntimeQueryResult::ok_message(
119 raw_query.to_string(),
120 &msg,
121 "create",
122 ))
123 }
124
125 pub fn execute_drop_timeseries(
126 &self,
127 raw_query: &str,
128 query: &DropTimeSeriesQuery,
129 ) -> RedDBResult<RuntimeQueryResult> {
130 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
131 let store = self.inner.db.store();
132 if super::impl_ddl::is_system_schema_name(&query.name) {
133 return Err(RedDBError::Query("system schema is read-only".to_string()));
134 }
135 if store.get_collection(&query.name).is_none() {
136 if query.if_exists {
137 return Ok(RuntimeQueryResult::ok_message(
138 raw_query.to_string(),
139 &format!("timeseries '{}' does not exist", query.name),
140 "drop",
141 ));
142 }
143 return Err(RedDBError::NotFound(format!(
144 "timeseries '{}' not found",
145 query.name
146 )));
147 }
148 let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
149 &query.name,
150 &self.inner.db.catalog_model_snapshot(),
151 )?;
152 if actual != crate::catalog::CollectionModel::TimeSeries
153 && actual != crate::catalog::CollectionModel::Table
154 {
155 crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
156 crate::catalog::CollectionModel::TimeSeries,
157 actual,
158 )?;
159 }
160 let _ = self.inner.db.hypertables().unregister(&query.name);
164 store
165 .drop_collection(&query.name)
166 .map_err(|e| RedDBError::Internal(e.to_string()))?;
167 self.inner.db.clear_collection_default_ttl_ms(&query.name);
168 self.inner
169 .db
170 .remove_collection_contract(&query.name)
171 .map_err(|err| RedDBError::Internal(err.to_string()))?;
172 remove_timeseries_metadata(store.as_ref(), &query.name);
173 self.invalidate_result_cache();
174 self.inner
175 .db
176 .persist_metadata()
177 .map_err(|e| RedDBError::Internal(e.to_string()))?;
178 self.schema_vocabulary_apply(
181 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
182 collection: query.name.clone(),
183 },
184 );
185 Ok(RuntimeQueryResult::ok_message(
186 raw_query.to_string(),
187 &format!("timeseries '{}' dropped", query.name),
188 "drop",
189 ))
190 }
191}
192
193fn save_timeseries_metadata(
194 store: &crate::storage::unified::UnifiedStore,
195 query: &CreateTimeSeriesQuery,
196) -> RedDBResult<()> {
197 remove_timeseries_metadata(store, &query.name);
198 let _ = store.get_or_create_collection(TIMESERIES_META_COLLECTION);
199
200 let mut fields = HashMap::new();
201 fields.insert(
202 "kind".to_string(),
203 Value::text("timeseries_config".to_string()),
204 );
205 fields.insert("series".to_string(), Value::text(query.name.clone()));
206 fields.insert(
207 "retention_ms".to_string(),
208 query
209 .retention_ms
210 .map(Value::UnsignedInteger)
211 .unwrap_or(Value::Null),
212 );
213 fields.insert(
214 "chunk_size".to_string(),
215 query
216 .chunk_size
217 .map(|value| Value::UnsignedInteger(value as u64))
218 .unwrap_or(Value::Null),
219 );
220 fields.insert(
221 "downsample_policies".to_string(),
222 Value::Array(
223 query
224 .downsample_policies
225 .iter()
226 .cloned()
227 .map(Value::text)
228 .collect(),
229 ),
230 );
231
232 store
233 .insert_auto(
234 TIMESERIES_META_COLLECTION,
235 UnifiedEntity::new(
236 EntityId::new(0),
237 EntityKind::TableRow {
238 table: Arc::from(TIMESERIES_META_COLLECTION),
239 row_id: 0,
240 },
241 EntityData::Row(crate::storage::RowData {
242 columns: Vec::new(),
243 named: Some(fields),
244 schema: None,
245 }),
246 ),
247 )
248 .map_err(|err| RedDBError::Internal(err.to_string()))?;
249
250 Ok(())
251}
252
253fn remove_timeseries_metadata(store: &crate::storage::unified::UnifiedStore, series: &str) {
254 let Some(manager) = store.get_collection(TIMESERIES_META_COLLECTION) else {
255 return;
256 };
257 let rows = manager.query_all(|entity| {
258 entity.data.as_row().is_some_and(|row| {
259 row.get_field("series").is_some_and(
260 |value| matches!(value, Value::Text(candidate) if &**candidate == series),
261 )
262 })
263 });
264 for row in rows {
265 let _ = store.delete(TIMESERIES_META_COLLECTION, row.id);
266 }
267}
268
269fn hypertable_collection_contract(
270 query: &CreateTimeSeriesQuery,
271) -> crate::physical::CollectionContract {
272 let now = current_unix_ms();
273 crate::physical::CollectionContract {
274 name: query.name.clone(),
275 declared_model: crate::catalog::CollectionModel::Table,
280 schema_mode: crate::catalog::SchemaMode::SemiStructured,
281 origin: crate::physical::ContractOrigin::Explicit,
282 version: 1,
283 created_at_unix_ms: now,
284 updated_at_unix_ms: now,
285 default_ttl_ms: query.retention_ms,
286 vector_dimension: None,
287 vector_metric: None,
288 context_index_fields: Vec::new(),
289 declared_columns: Vec::new(),
290 table_def: None,
291 timestamps_enabled: false,
292 context_index_enabled: false,
293 append_only: true,
297 subscriptions: Vec::new(),
298 }
299}
300
301fn timeseries_collection_contract(
302 query: &CreateTimeSeriesQuery,
303) -> crate::physical::CollectionContract {
304 let now = current_unix_ms();
305 crate::physical::CollectionContract {
306 name: query.name.clone(),
307 declared_model: crate::catalog::CollectionModel::TimeSeries,
308 schema_mode: crate::catalog::SchemaMode::SemiStructured,
309 origin: crate::physical::ContractOrigin::Explicit,
310 version: 1,
311 created_at_unix_ms: now,
312 updated_at_unix_ms: now,
313 default_ttl_ms: query.retention_ms,
314 vector_dimension: None,
315 vector_metric: None,
316 context_index_fields: Vec::new(),
317 declared_columns: Vec::new(),
318 table_def: None,
319 timestamps_enabled: false,
320 context_index_enabled: false,
321 append_only: true,
325 subscriptions: Vec::new(),
326 }
327}
328
329fn current_unix_ms() -> u128 {
330 std::time::SystemTime::now()
331 .duration_since(std::time::UNIX_EPOCH)
332 .unwrap_or_default()
333 .as_millis()
334}