reddb-io-server 1.1.0

RedDB server-side engine: storage, runtime, replication, MCP, AI, and the gRPC/HTTP/RedWire/PG-wire dispatchers. Re-exported by the umbrella `reddb` crate.
Documentation
//! Time-series DDL execution

use std::collections::HashMap;
use std::sync::Arc;

use super::*;

const TIMESERIES_META_COLLECTION: &str = "red_timeseries_meta";

impl RedDBRuntime {
    pub fn execute_create_timeseries(
        &self,
        raw_query: &str,
        query: &CreateTimeSeriesQuery,
    ) -> RedDBResult<RuntimeQueryResult> {
        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
        for spec in &query.downsample_policies {
            crate::storage::timeseries::retention::DownsamplePolicy::parse(spec).ok_or_else(
                || RedDBError::Query(format!("invalid downsample policy '{}'", spec)),
            )?;
        }

        let store = self.inner.db.store();
        let exists = store.get_collection(&query.name).is_some();
        if exists {
            if query.if_not_exists {
                return Ok(RuntimeQueryResult::ok_message(
                    raw_query.to_string(),
                    &format!("timeseries '{}' already exists", query.name),
                    "create",
                ));
            }
            return Err(RedDBError::Query(format!(
                "timeseries '{}' already exists",
                query.name
            )));
        }
        store
            .create_collection(&query.name)
            .map_err(|e| RedDBError::Internal(e.to_string()))?;
        if let Some(ttl_ms) = query.retention_ms {
            self.inner
                .db
                .set_collection_default_ttl_ms(&query.name, ttl_ms);
        }
        // CREATE HYPERTABLE declares the collection as a Table so
        // INSERT goes through the row path (which now includes
        // automatic chunk routing). Plain CREATE TIMESERIES keeps
        // the native TimeSeries contract with its metric/value/tags
        // column convention.
        let contract = if query.hypertable.is_some() {
            hypertable_collection_contract(query)
        } else {
            timeseries_collection_contract(query)
        };
        self.inner
            .db
            .save_collection_contract(contract)
            .map_err(|err| RedDBError::Internal(err.to_string()))?;
        save_timeseries_metadata(store.as_ref(), query)?;

        // `CREATE HYPERTABLE` additionally registers a HypertableSpec
        // so chunk routing + retention sweeps can address this table.
        // Plain `CREATE TIMESERIES` leaves `hypertable` = None and the
        // runtime behaves as before.
        if let Some(ht) = &query.hypertable {
            let mut spec = crate::storage::timeseries::HypertableSpec::new(
                query.name.clone(),
                ht.time_column.clone(),
                ht.chunk_interval_ns,
            );
            if let Some(ttl) = ht.default_ttl_ns {
                spec = spec.with_ttl_ns(ttl);
            }
            self.inner.db.hypertables().register(spec);
        }

        self.invalidate_result_cache();
        self.inner
            .db
            .persist_metadata()
            .map_err(|e| RedDBError::Internal(e.to_string()))?;
        // Issue #120 — surface timeseries / hypertable in the
        // schema-vocabulary. The hypertable variant carries the
        // declared time column.
        let columns: Vec<String> = query
            .hypertable
            .as_ref()
            .map(|ht| vec![ht.time_column.clone()])
            .unwrap_or_else(|| vec!["metric".to_string(), "value".to_string()]);
        self.schema_vocabulary_apply(
            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
                collection: query.name.clone(),
                columns,
                type_tags: Vec::new(),
                description: None,
            },
        );

        let noun = if query.hypertable.is_some() {
            "hypertable"
        } else {
            "timeseries"
        };
        let mut msg = format!("{noun} '{}' created", query.name);
        if let Some(ret) = query.retention_ms {
            msg.push_str(&format!(" (retention={}ms)", ret));
        }
        if let Some(cs) = query.chunk_size {
            msg.push_str(&format!(" (chunk_size={})", cs));
        }
        if !query.downsample_policies.is_empty() {
            msg.push_str(&format!(
                " (downsample_policies={})",
                query.downsample_policies.len()
            ));
        }
        Ok(RuntimeQueryResult::ok_message(
            raw_query.to_string(),
            &msg,
            "create",
        ))
    }

    pub fn execute_drop_timeseries(
        &self,
        raw_query: &str,
        query: &DropTimeSeriesQuery,
    ) -> RedDBResult<RuntimeQueryResult> {
        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
        let store = self.inner.db.store();
        if super::impl_ddl::is_system_schema_name(&query.name) {
            return Err(RedDBError::Query("system schema is read-only".to_string()));
        }
        if store.get_collection(&query.name).is_none() {
            if query.if_exists {
                return Ok(RuntimeQueryResult::ok_message(
                    raw_query.to_string(),
                    &format!("timeseries '{}' does not exist", query.name),
                    "drop",
                ));
            }
            return Err(RedDBError::NotFound(format!(
                "timeseries '{}' not found",
                query.name
            )));
        }
        let actual = crate::runtime::ddl::polymorphic_resolver::resolve(
            &query.name,
            &self.inner.db.catalog_model_snapshot(),
        )?;
        if actual != crate::catalog::CollectionModel::TimeSeries
            && actual != crate::catalog::CollectionModel::Table
        {
            crate::runtime::ddl::polymorphic_resolver::ensure_model_match(
                crate::catalog::CollectionModel::TimeSeries,
                actual,
            )?;
        }
        // Remove from the hypertable registry before dropping the
        // underlying collection — the registry lookup is cheap and
        // staying consistent is the point of having a separate call.
        let _ = self.inner.db.hypertables().unregister(&query.name);
        store
            .drop_collection(&query.name)
            .map_err(|e| RedDBError::Internal(e.to_string()))?;
        self.inner.db.clear_collection_default_ttl_ms(&query.name);
        self.inner
            .db
            .remove_collection_contract(&query.name)
            .map_err(|err| RedDBError::Internal(err.to_string()))?;
        remove_timeseries_metadata(store.as_ref(), &query.name);
        self.invalidate_result_cache();
        self.inner
            .db
            .persist_metadata()
            .map_err(|e| RedDBError::Internal(e.to_string()))?;
        // Issue #120 — invalidate the schema-vocabulary entry for the
        // dropped timeseries / hypertable.
        self.schema_vocabulary_apply(
            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
                collection: query.name.clone(),
            },
        );
        Ok(RuntimeQueryResult::ok_message(
            raw_query.to_string(),
            &format!("timeseries '{}' dropped", query.name),
            "drop",
        ))
    }
}

fn save_timeseries_metadata(
    store: &crate::storage::unified::UnifiedStore,
    query: &CreateTimeSeriesQuery,
) -> RedDBResult<()> {
    remove_timeseries_metadata(store, &query.name);
    let _ = store.get_or_create_collection(TIMESERIES_META_COLLECTION);

    let mut fields = HashMap::new();
    fields.insert(
        "kind".to_string(),
        Value::text("timeseries_config".to_string()),
    );
    fields.insert("series".to_string(), Value::text(query.name.clone()));
    fields.insert(
        "retention_ms".to_string(),
        query
            .retention_ms
            .map(Value::UnsignedInteger)
            .unwrap_or(Value::Null),
    );
    fields.insert(
        "chunk_size".to_string(),
        query
            .chunk_size
            .map(|value| Value::UnsignedInteger(value as u64))
            .unwrap_or(Value::Null),
    );
    fields.insert(
        "downsample_policies".to_string(),
        Value::Array(
            query
                .downsample_policies
                .iter()
                .cloned()
                .map(Value::text)
                .collect(),
        ),
    );

    store
        .insert_auto(
            TIMESERIES_META_COLLECTION,
            UnifiedEntity::new(
                EntityId::new(0),
                EntityKind::TableRow {
                    table: Arc::from(TIMESERIES_META_COLLECTION),
                    row_id: 0,
                },
                EntityData::Row(crate::storage::RowData {
                    columns: Vec::new(),
                    named: Some(fields),
                    schema: None,
                }),
            ),
        )
        .map_err(|err| RedDBError::Internal(err.to_string()))?;

    Ok(())
}

fn remove_timeseries_metadata(store: &crate::storage::unified::UnifiedStore, series: &str) {
    let Some(manager) = store.get_collection(TIMESERIES_META_COLLECTION) else {
        return;
    };
    let rows = manager.query_all(|entity| {
        entity.data.as_row().is_some_and(|row| {
            row.get_field("series").is_some_and(
                |value| matches!(value, Value::Text(candidate) if &**candidate == series),
            )
        })
    });
    for row in rows {
        let _ = store.delete(TIMESERIES_META_COLLECTION, row.id);
    }
}

fn hypertable_collection_contract(
    query: &CreateTimeSeriesQuery,
) -> crate::physical::CollectionContract {
    let now = current_unix_ms();
    crate::physical::CollectionContract {
        name: query.name.clone(),
        // Table model — rows go through the normal INSERT path,
        // which now calls HypertableRegistry::route after each row
        // lands. Hypertable-specific behaviour (chunk bounds, TTL
        // sweeps) lives on the registry, not the contract.
        declared_model: crate::catalog::CollectionModel::Table,
        schema_mode: crate::catalog::SchemaMode::SemiStructured,
        origin: crate::physical::ContractOrigin::Explicit,
        version: 1,
        created_at_unix_ms: now,
        updated_at_unix_ms: now,
        default_ttl_ms: query.retention_ms,
        vector_dimension: None,
        vector_metric: None,
        context_index_fields: Vec::new(),
        declared_columns: Vec::new(),
        table_def: None,
        timestamps_enabled: false,
        context_index_enabled: false,
        // Hypertable data is conceptually immutable once the chunk
        // seals. Reject UPDATE / DELETE at parse time and give the
        // operator a clear message instead of silent coalescing.
        append_only: true,
        subscriptions: Vec::new(),
    }
}

fn timeseries_collection_contract(
    query: &CreateTimeSeriesQuery,
) -> crate::physical::CollectionContract {
    let now = current_unix_ms();
    crate::physical::CollectionContract {
        name: query.name.clone(),
        declared_model: crate::catalog::CollectionModel::TimeSeries,
        schema_mode: crate::catalog::SchemaMode::SemiStructured,
        origin: crate::physical::ContractOrigin::Explicit,
        version: 1,
        created_at_unix_ms: now,
        updated_at_unix_ms: now,
        default_ttl_ms: query.retention_ms,
        vector_dimension: None,
        vector_metric: None,
        context_index_fields: Vec::new(),
        declared_columns: Vec::new(),
        table_def: None,
        timestamps_enabled: false,
        context_index_enabled: false,
        // Time-series collections are append-only by nature — the
        // storage model forbids in-place UPDATE already, so the flag
        // makes the catalog honest rather than changing semantics.
        append_only: true,
        subscriptions: Vec::new(),
    }
}

fn current_unix_ms() -> u128 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis()
}