Skip to main content

reddb_server/runtime/
impl_timeseries.rs

1//! Time-series DDL execution
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use super::*;
7
/// Name of the internal collection that holds the `timeseries_config` rows
/// written by `save_timeseries_metadata` and deleted by
/// `remove_timeseries_metadata` (rows are keyed by their `series` field).
const TIMESERIES_META_COLLECTION: &str = "red_timeseries_meta";
9
10impl RedDBRuntime {
11    pub fn execute_create_timeseries(
12        &self,
13        raw_query: &str,
14        query: &CreateTimeSeriesQuery,
15    ) -> RedDBResult<RuntimeQueryResult> {
16        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
17        for spec in &query.downsample_policies {
18            crate::storage::timeseries::retention::DownsamplePolicy::parse(spec).ok_or_else(
19                || RedDBError::Query(format!("invalid downsample policy '{}'", spec)),
20            )?;
21        }
22
23        let store = self.inner.db.store();
24        let exists = store.get_collection(&query.name).is_some();
25        if exists {
26            if query.if_not_exists {
27                return Ok(RuntimeQueryResult::ok_message(
28                    raw_query.to_string(),
29                    &format!("timeseries '{}' already exists", query.name),
30                    "create",
31                ));
32            }
33            return Err(RedDBError::Query(format!(
34                "timeseries '{}' already exists",
35                query.name
36            )));
37        }
38        store
39            .create_collection(&query.name)
40            .map_err(|e| RedDBError::Internal(e.to_string()))?;
41        if let Some(ttl_ms) = query.retention_ms {
42            self.inner
43                .db
44                .set_collection_default_ttl_ms(&query.name, ttl_ms);
45        }
46        // CREATE HYPERTABLE declares the collection as a Table so
47        // INSERT goes through the row path (which now includes
48        // automatic chunk routing). Plain CREATE TIMESERIES keeps
49        // the native TimeSeries contract with its metric/value/tags
50        // column convention.
51        let contract = if query.hypertable.is_some() {
52            hypertable_collection_contract(query)
53        } else {
54            timeseries_collection_contract(query)
55        };
56        self.inner
57            .db
58            .save_collection_contract(contract)
59            .map_err(|err| RedDBError::Internal(err.to_string()))?;
60        // Issue #747 — record per-collection tenant ownership when an
61        // active tenant context exists, so typed surfaces like
62        // `red.timeseries` can scope rows to the creating tenant just
63        // like `red.tables` does for `CREATE TABLE`.
64        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
65            store.set_config_tree(
66                &format!("red.collection_tenants.{}", query.name),
67                &crate::serde_json::Value::String(tenant_id),
68            );
69        }
70        save_timeseries_metadata(store.as_ref(), query)?;
71
72        // `CREATE HYPERTABLE` additionally registers a HypertableSpec
73        // so chunk routing + retention sweeps can address this table.
74        // Plain `CREATE TIMESERIES` leaves `hypertable` = None and the
75        // runtime behaves as before.
76        if let Some(ht) = &query.hypertable {
77            let mut spec = crate::storage::timeseries::HypertableSpec::new(
78                query.name.clone(),
79                ht.time_column.clone(),
80                ht.chunk_interval_ns,
81            );
82            if let Some(ttl) = ht.default_ttl_ns {
83                spec = spec.with_ttl_ns(ttl);
84            }
85            self.inner.db.hypertables().register(spec);
86        }
87
88        self.invalidate_result_cache();
89        self.inner
90            .db
91            .persist_metadata()
92            .map_err(|e| RedDBError::Internal(e.to_string()))?;
93        // Issue #120 — surface timeseries / hypertable in the
94        // schema-vocabulary. The hypertable variant carries the
95        // declared time column.
96        let columns: Vec<String> = query
97            .hypertable
98            .as_ref()
99            .map(|ht| vec![ht.time_column.clone()])
100            .unwrap_or_else(|| vec!["metric".to_string(), "value".to_string()]);
101        self.schema_vocabulary_apply(
102            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
103                collection: query.name.clone(),
104                columns,
105                type_tags: Vec::new(),
106                description: None,
107            },
108        );
109
110        let noun = if query.hypertable.is_some() {
111            "hypertable"
112        } else {
113            "timeseries"
114        };
115        let mut msg = format!("{noun} '{}' created", query.name);
116        if let Some(ret) = query.retention_ms {
117            msg.push_str(&format!(" (retention={}ms)", ret));
118        }
119        if let Some(cs) = query.chunk_size {
120            msg.push_str(&format!(" (chunk_size={})", cs));
121        }
122        if !query.downsample_policies.is_empty() {
123            msg.push_str(&format!(
124                " (downsample_policies={})",
125                query.downsample_policies.len()
126            ));
127        }
128        Ok(RuntimeQueryResult::ok_message(
129            raw_query.to_string(),
130            &msg,
131            "create",
132        ))
133    }
134
135    pub fn execute_drop_timeseries(
136        &self,
137        raw_query: &str,
138        query: &DropTimeSeriesQuery,
139    ) -> RedDBResult<RuntimeQueryResult> {
140        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
141        let store = self.inner.db.store();
142        if super::impl_ddl::is_system_schema_name(&query.name) {
143            return Err(RedDBError::Query("system schema is read-only".to_string()));
144        }
145        if store.get_collection(&query.name).is_none() {
146            if query.if_exists {
147                return Ok(RuntimeQueryResult::ok_message(
148                    raw_query.to_string(),
149                    &format!("timeseries '{}' does not exist", query.name),
150                    "drop",
151                ));
152            }
153            return Err(RedDBError::NotFound(format!(
154                "timeseries '{}' not found",
155                query.name
156            )));
157        }
158        let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
159            &query.name,
160            &self.inner.db.catalog_model_snapshot(),
161        )?;
162        if actual != crate::catalog::CollectionModel::TimeSeries
163            && actual != crate::catalog::CollectionModel::Table
164        {
165            crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
166                crate::catalog::CollectionModel::TimeSeries,
167                actual,
168            )?;
169        }
170        // Remove from the hypertable registry before dropping the
171        // underlying collection — the registry lookup is cheap and
172        // staying consistent is the point of having a separate call.
173        let _ = self.inner.db.hypertables().unregister(&query.name);
174        store
175            .drop_collection(&query.name)
176            .map_err(|e| RedDBError::Internal(e.to_string()))?;
177        self.inner.db.clear_collection_default_ttl_ms(&query.name);
178        self.inner
179            .db
180            .remove_collection_contract(&query.name)
181            .map_err(|err| RedDBError::Internal(err.to_string()))?;
182        remove_timeseries_metadata(store.as_ref(), &query.name);
183        self.invalidate_result_cache();
184        self.inner
185            .db
186            .persist_metadata()
187            .map_err(|e| RedDBError::Internal(e.to_string()))?;
188        // Issue #120 — invalidate the schema-vocabulary entry for the
189        // dropped timeseries / hypertable.
190        self.schema_vocabulary_apply(
191            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
192                collection: query.name.clone(),
193            },
194        );
195        Ok(RuntimeQueryResult::ok_message(
196            raw_query.to_string(),
197            &format!("timeseries '{}' dropped", query.name),
198            "drop",
199        ))
200    }
201
202    /// Seal every still-open chunk of a hypertable, routing each seal
203    /// through [`seal_chunk_with_config`](crate::storage::timeseries::chunk::seal_chunk_with_config)
204    /// — the production caller PRD #850 lacked (#911). For a collection
205    /// whose contract carries `analytical_storage.columnar = true`, the
206    /// chunk's rows are materialised from the entity store into a
207    /// `TimeSeriesChunk`, sealed columnar, and the resulting RDCC
208    /// `ColumnBlock` recorded in `ChunkMeta.columnar_page` (bytes stashed
209    /// for read-back). Without the flag the chunk falls to the row seal
210    /// and `columnar_page` stays `None` — no behaviour change. Returns the
211    /// number of chunks sealed columnar.
212    pub fn seal_hypertable_chunks(&self, collection: &str) -> RedDBResult<usize> {
213        let analytical = self
214            .inner
215            .db
216            .collection_contract(collection)
217            .and_then(|c| c.analytical_storage.clone());
218        let registry = self.inner.db.hypertables();
219        let Some(spec) = registry.get(collection) else {
220            return Ok(0);
221        };
222        let time_col = spec.time_column.clone();
223        let store = self.inner.db.store();
224        let Some(manager) = store.get_collection(collection) else {
225            return Ok(0);
226        };
227
228        let mut sealed_columnar = 0usize;
229        for meta in registry.show_chunks(collection) {
230            if meta.sealed {
231                continue;
232            }
233            let start = meta.id.start_ns;
234            let end = meta.end_ns_exclusive;
235
236            // Materialise (ts, value) for rows whose time-column value
237            // lands in this chunk's `[start, end)` window — the same
238            // entity/row reader the read-bridge serves row chunks from.
239            let points = materialize_row_points(&manager, &time_col, start, end);
240
241            let mut chunk = crate::storage::timeseries::TimeSeriesChunk::with_max_points(
242                collection.to_string(),
243                HashMap::new(),
244                points.len().max(1),
245            );
246            for (ts, value) in &points {
247                chunk.append(*ts, *value);
248            }
249
250            let routed = crate::storage::timeseries::chunk::seal_chunk_with_config(
251                &mut chunk,
252                analytical.as_ref(),
253                start,
254                0,
255            )
256            .map_err(|err| RedDBError::Internal(format!("columnar seal failed: {err:?}")))?;
257
258            match routed {
259                crate::storage::timeseries::chunk::SealedChunkStorage::Columnar(bytes) => {
260                    let page = crate::storage::engine::PageLocation::new(0, 0, bytes.len() as u32);
261                    registry.seal_chunk_columnar(&meta.id, page, bytes);
262                    sealed_columnar += 1;
263                }
264                crate::storage::timeseries::chunk::SealedChunkStorage::Row => {
265                    registry.seal_chunk(&meta.id);
266                }
267            }
268        }
269        Ok(sealed_columnar)
270    }
271
272    /// Count this hypertable's chunks that were sealed columnar — i.e.
273    /// whose `ChunkMeta.columnar_page` is set (#911). Lets a caller assert
274    /// the columnar arm fired without exposing the registry's chunk type.
275    pub fn columnar_chunk_count(&self, collection: &str) -> usize {
276        self.inner
277            .db
278            .hypertables()
279            .show_chunks(collection)
280            .iter()
281            .filter(|meta| meta.columnar_page.is_some())
282            .count()
283    }
284
285    /// Read back a columnar-sealed chunk's points over `[start_ns, end_ns]`
286    /// (inclusive) via the #856 column-block range scan, decoding the RDCC
287    /// `ColumnBlock` recorded by [`seal_hypertable_chunks`](Self::seal_hypertable_chunks).
288    /// `None` when the chunk was not sealed columnar (or its bytes are not
289    /// RAM-resident). Points come back as `(timestamp_ns, value)`.
290    pub fn columnar_chunk_points(
291        &self,
292        collection: &str,
293        chunk_start_ns: u64,
294        start_ns: u64,
295        end_ns: u64,
296    ) -> Option<Vec<(u64, f64)>> {
297        let id = crate::storage::timeseries::ChunkId {
298            hypertable: collection.to_string(),
299            start_ns: chunk_start_ns,
300        };
301        let bytes = self.inner.db.hypertables().columnar_block(&id)?;
302        let scan =
303            crate::storage::timeseries::chunk::query_column_block_range(&bytes, start_ns, end_ns)
304                .ok()?;
305        Some(
306            scan.points
307                .iter()
308                .map(|p| (p.timestamp_ns, p.value))
309                .collect(),
310        )
311    }
312
313    /// Read-bridge (#861): read every point of `collection` in the
314    /// inclusive range `[start_ns, end_ns]`, dispatching **per chunk** on
315    /// its storage format so row-stored and columnar (`RDCC`) chunks
316    /// coexist after `COLUMNAR` is enabled — with no mass rewrite of the
317    /// pre-existing row data.
318    ///
319    /// Each chunk's [`ChunkMeta::format`](crate::storage::timeseries::ChunkMeta::format)
320    /// is the format-version gate:
321    /// - [`ChunkFormat::ColumnarV1`] → decode the chunk's RDCC `ColumnBlock`
322    ///   through the granule-pruned column-block range scan, after
323    ///   confirming the block's embedded `format_version` is one this build
324    ///   understands ([`peek_column_block_version`]).
325    /// - [`ChunkFormat::Row`] → materialise the chunk's rows from the
326    ///   entity/row store, the same reader the seal sources from.
327    ///
328    /// Points come back merged and timestamp-ordered, so a caller sees one
329    /// logical series regardless of how each chunk is physically stored.
330    /// Chunk windows are disjoint, so a columnar chunk is read only through
331    /// its RDCC block and never double-counted via the row path.
332    pub fn read_bridge_points(
333        &self,
334        collection: &str,
335        start_ns: u64,
336        end_ns: u64,
337    ) -> RedDBResult<Vec<(u64, f64)>> {
338        use crate::storage::timeseries::ChunkFormat;
339        use crate::storage::unified::column_block::{
340            peek_column_block_version, COLUMN_BLOCK_VERSION_V1,
341        };
342
343        let registry = self.inner.db.hypertables();
344        let Some(spec) = registry.get(collection) else {
345            return Ok(Vec::new());
346        };
347        let time_col = spec.time_column.clone();
348        let store = self.inner.db.store();
349
350        let mut out: Vec<(u64, f64)> = Vec::new();
351        for meta in registry.show_chunks(collection) {
352            // Skip chunks whose observed window cannot intersect the query.
353            // An empty chunk has min_ts_ns == u64::MAX, so it is skipped.
354            if meta.max_ts_ns < start_ns || meta.min_ts_ns > end_ns {
355                continue;
356            }
357            match meta.format() {
358                ChunkFormat::ColumnarV1 => {
359                    // RDCC reader. Bytes may be absent post-restart (pending
360                    // the durable page-write bridge); nothing to read then.
361                    let Some(bytes) = registry.columnar_block(&meta.id) else {
362                        continue;
363                    };
364                    // Format-version gate: reject a block this build cannot
365                    // read rather than mis-decode it.
366                    match peek_column_block_version(&bytes) {
367                        Some(COLUMN_BLOCK_VERSION_V1) => {}
368                        Some(v) => {
369                            return Err(RedDBError::Internal(format!(
370                                "chunk {} @ {} carries unsupported columnar format version {v}",
371                                meta.id.hypertable, meta.id.start_ns
372                            )));
373                        }
374                        None => {
375                            return Err(RedDBError::Internal(format!(
376                                "chunk {} @ {} is flagged columnar but its block is not RDCC",
377                                meta.id.hypertable, meta.id.start_ns
378                            )));
379                        }
380                    }
381                    let scan = crate::storage::timeseries::chunk::query_column_block_range(
382                        &bytes, start_ns, end_ns,
383                    )
384                    .map_err(|err| {
385                        RedDBError::Internal(format!("columnar read-bridge decode failed: {err:?}"))
386                    })?;
387                    out.extend(scan.points.iter().map(|p| (p.timestamp_ns, p.value)));
388                }
389                ChunkFormat::Row => {
390                    // Row reader: materialise the chunk window, then filter
391                    // to the inclusive query range (mirrors the columnar
392                    // scan's `[start_ns, end_ns]` contract).
393                    let Some(manager) = store.get_collection(collection) else {
394                        continue;
395                    };
396                    let chunk_start = meta.id.start_ns;
397                    let chunk_end = meta.end_ns_exclusive;
398                    out.extend(
399                        materialize_row_points(&manager, &time_col, chunk_start, chunk_end)
400                            .into_iter()
401                            .filter(|(ts, _)| *ts >= start_ns && *ts <= end_ns),
402                    );
403                }
404            }
405        }
406        out.sort_by_key(|(ts, _)| *ts);
407        Ok(out)
408    }
409}
410
411/// Materialise `(timestamp_ns, value)` rows from the entity/row store for
412/// the half-open chunk window `[start, end)`, timestamp-ordered. This is
413/// the shared row reader: the columnar seal sources its chunk from it, and
414/// the read-bridge serves row-stored chunks through it (#861). `time_col`
415/// names the time axis; the value column follows the `value` convention.
416fn materialize_row_points(
417    manager: &crate::storage::unified::SegmentManager,
418    time_col: &str,
419    start: u64,
420    end: u64,
421) -> Vec<(u64, f64)> {
422    let mut points: Vec<(u64, f64)> = manager
423        .query_all(|entity| {
424            entity
425                .data
426                .as_row()
427                .and_then(|row| row.get_field(time_col))
428                .and_then(field_as_u64)
429                .is_some_and(|ts| ts >= start && ts < end)
430        })
431        .iter()
432        .filter_map(|entity| {
433            let row = entity.data.as_row()?;
434            let ts = row.get_field(time_col).and_then(field_as_u64)?;
435            let value = row.get_field("value").and_then(field_as_f64).unwrap_or(0.0);
436            Some((ts, value))
437        })
438        .collect();
439    points.sort_by_key(|(ts, _)| *ts);
440    points
441}
442
443/// Read a row field as a non-negative `u64` timestamp, accepting the
444/// integer shapes the INSERT path stores for a time column (#911).
445fn field_as_u64(value: &Value) -> Option<u64> {
446    match value {
447        Value::Integer(n) | Value::BigInt(n) | Value::Timestamp(n) if *n >= 0 => Some(*n as u64),
448        Value::UnsignedInteger(n) => Some(*n),
449        _ => None,
450    }
451}
452
453/// Read a row field as `f64` for the columnar value column (#911).
454fn field_as_f64(value: &Value) -> Option<f64> {
455    match value {
456        Value::Float(f) => Some(*f),
457        Value::Integer(n) | Value::BigInt(n) => Some(*n as f64),
458        Value::UnsignedInteger(n) => Some(*n as f64),
459        _ => None,
460    }
461}
462
463fn save_timeseries_metadata(
464    store: &crate::storage::unified::UnifiedStore,
465    query: &CreateTimeSeriesQuery,
466) -> RedDBResult<()> {
467    remove_timeseries_metadata(store, &query.name);
468    let _ = store.get_or_create_collection(TIMESERIES_META_COLLECTION);
469
470    let mut fields = HashMap::new();
471    fields.insert(
472        "kind".to_string(),
473        Value::text("timeseries_config".to_string()),
474    );
475    fields.insert("series".to_string(), Value::text(query.name.clone()));
476    fields.insert(
477        "retention_ms".to_string(),
478        query
479            .retention_ms
480            .map(Value::UnsignedInteger)
481            .unwrap_or(Value::Null),
482    );
483    fields.insert(
484        "chunk_size".to_string(),
485        query
486            .chunk_size
487            .map(|value| Value::UnsignedInteger(value as u64))
488            .unwrap_or(Value::Null),
489    );
490    fields.insert(
491        "downsample_policies".to_string(),
492        Value::Array(
493            query
494                .downsample_policies
495                .iter()
496                .cloned()
497                .map(Value::text)
498                .collect(),
499        ),
500    );
501
502    store
503        .insert_auto(
504            TIMESERIES_META_COLLECTION,
505            UnifiedEntity::new(
506                EntityId::new(0),
507                EntityKind::TableRow {
508                    table: Arc::from(TIMESERIES_META_COLLECTION),
509                    row_id: 0,
510                },
511                EntityData::Row(crate::storage::RowData {
512                    columns: Vec::new(),
513                    named: Some(fields),
514                    schema: None,
515                }),
516            ),
517        )
518        .map_err(|err| RedDBError::Internal(err.to_string()))?;
519
520    Ok(())
521}
522
523fn remove_timeseries_metadata(store: &crate::storage::unified::UnifiedStore, series: &str) {
524    let Some(manager) = store.get_collection(TIMESERIES_META_COLLECTION) else {
525        return;
526    };
527    let rows = manager.query_all(|entity| {
528        entity.data.as_row().is_some_and(|row| {
529            row.get_field("series").is_some_and(
530                |value| matches!(value, Value::Text(candidate) if &**candidate == series),
531            )
532        })
533    });
534    for row in rows {
535        let _ = store.delete(TIMESERIES_META_COLLECTION, row.id);
536    }
537}
538
539/// Build the contract's [`AnalyticalStorageConfig`] when the DDL carried
540/// `COLUMNAR` (#911). `time_key` is the column carrying the time axis —
541/// the hypertable's declared time column, or the timeseries `timestamp`
542/// convention. `None` when columnar was not requested, so non-columnar
543/// collections keep the row engine default.
544fn analytical_storage_for(
545    columnar: bool,
546    time_key: &str,
547) -> Option<crate::catalog::AnalyticalStorageConfig> {
548    columnar.then(|| crate::catalog::AnalyticalStorageConfig {
549        columnar: true,
550        time_key: time_key.to_string(),
551        order_by_key: None,
552    })
553}
554
555fn hypertable_collection_contract(
556    query: &CreateTimeSeriesQuery,
557) -> crate::physical::CollectionContract {
558    let now = current_unix_ms();
559    let time_key = query
560        .hypertable
561        .as_ref()
562        .map(|ht| ht.time_column.as_str())
563        .unwrap_or("timestamp");
564    crate::physical::CollectionContract {
565        name: query.name.clone(),
566        // Table model — rows go through the normal INSERT path,
567        // which now calls HypertableRegistry::route after each row
568        // lands. Hypertable-specific behaviour (chunk bounds, TTL
569        // sweeps) lives on the registry, not the contract.
570        declared_model: crate::catalog::CollectionModel::Table,
571        schema_mode: crate::catalog::SchemaMode::SemiStructured,
572        origin: crate::physical::ContractOrigin::Explicit,
573        version: 1,
574        created_at_unix_ms: now,
575        updated_at_unix_ms: now,
576        default_ttl_ms: query.retention_ms,
577        vector_dimension: None,
578        vector_metric: None,
579        context_index_fields: Vec::new(),
580        declared_columns: Vec::new(),
581        table_def: None,
582        timestamps_enabled: false,
583        context_index_enabled: false,
584        metrics_raw_retention_ms: None,
585        metrics_rollup_policies: Vec::new(),
586        metrics_tenant_identity: None,
587        metrics_namespace: None,
588        // Hypertable data is conceptually immutable once the chunk
589        // seals. Reject UPDATE / DELETE at parse time and give the
590        // operator a clear message instead of silent coalescing.
591        append_only: true,
592        subscriptions: Vec::new(),
593        analytics_config: Vec::new(),
594        session_key: None,
595        session_gap_ms: None,
596        retention_duration_ms: None,
597        analytical_storage: analytical_storage_for(query.columnar, time_key),
598    }
599}
600
601fn timeseries_collection_contract(
602    query: &CreateTimeSeriesQuery,
603) -> crate::physical::CollectionContract {
604    let now = current_unix_ms();
605    crate::physical::CollectionContract {
606        name: query.name.clone(),
607        declared_model: crate::catalog::CollectionModel::TimeSeries,
608        schema_mode: crate::catalog::SchemaMode::SemiStructured,
609        origin: crate::physical::ContractOrigin::Explicit,
610        version: 1,
611        created_at_unix_ms: now,
612        updated_at_unix_ms: now,
613        default_ttl_ms: query.retention_ms,
614        vector_dimension: None,
615        vector_metric: None,
616        context_index_fields: Vec::new(),
617        declared_columns: Vec::new(),
618        table_def: None,
619        timestamps_enabled: false,
620        context_index_enabled: false,
621        metrics_raw_retention_ms: None,
622        metrics_rollup_policies: Vec::new(),
623        metrics_tenant_identity: None,
624        metrics_namespace: None,
625        // Time-series collections are append-only by nature — the
626        // storage model forbids in-place UPDATE already, so the flag
627        // makes the catalog honest rather than changing semantics.
628        append_only: true,
629        subscriptions: Vec::new(),
630        analytics_config: Vec::new(),
631        // `WITH SESSION_KEY <col> SESSION_GAP <duration>` from the
632        // CREATE TIMESERIES DDL becomes the default partition/gap
633        // pairing for the SESSIONIZE operator (slice 2+). Stored on
634        // the contract so a restart preserves the values without an
635        // extra metadata side-table.
636        session_key: query.session_key.clone(),
637        session_gap_ms: query.session_gap_ms,
638        retention_duration_ms: None,
639        // Plain timeseries store points under the `timestamp` axis
640        // convention (the `value` column carries the measurement).
641        analytical_storage: analytical_storage_for(query.columnar, "timestamp"),
642    }
643}
644
/// Milliseconds elapsed since the Unix epoch according to the system
/// clock. Yields `0` if the clock reports a time before the epoch.
fn current_unix_ms() -> u128 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}