use std::sync::Arc;
use futures::stream;
use pgwire::api::results::{DataRowEncoder, QueryResponse, Response};
use pgwire::error::PgWireResult;
use crate::control::security::catalog::StoredCollection;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use super::super::types::{int8_field, sqlstate_error, text_field};
use super::timeseries_helpers::{format_bytes, parse_with_clause};
/// Handles `CREATE TIMESERIES <name> [WITH (key = 'value', ...)]`.
///
/// Registers a timeseries collection in the caller's tenant with a fixed
/// `timestamp TIMESTAMP` / `value FLOAT` schema and the options returned by
/// `parse_with_clause`. It then inserts a new partition registry for the collection
/// under the key `"<tenant_id>:<name>"`. When the stored options contain
/// `partition_by`, that interval is applied to the new registry, the same way
/// `ALTER TIMESERIES` applies it. The name is lowercased before use.
///
/// The duplicate-name check and the catalog write are skipped when no catalog is
/// configured.
///
/// # Errors
/// - `42601` when fewer than three tokens are supplied.
/// - `42P07` when a collection with this name already exists.
/// - `22023` when `partition_by` is present but cannot be parsed.
/// - `XX000` when the catalog lookup or the catalog write fails.
pub fn create_timeseries(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    parts: &[&str],
) -> PgWireResult<Vec<Response>> {
    if parts.len() < 3 {
        return Err(sqlstate_error(
            "42601",
            "syntax: CREATE TIMESERIES <name> [WITH (key = 'value', ...)]",
        ));
    }
    let name = parts[2].to_lowercase();
    let tenant_id = identity.tenant_id;

    if let Some(catalog) = state.credentials.catalog() {
        // A failed lookup must not be read as "absent": doing so would let the
        // `put_collection` below overwrite an existing collection.
        let existing = catalog
            .get_collection(tenant_id.as_u32(), &name)
            .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
        if existing.is_some() {
            return Err(sqlstate_error(
                "42P07",
                &format!("collection '{name}' already exists"),
            ));
        }
    }

    let config_json = parse_with_clause(parts);
    // Seconds since the Unix epoch; a clock before 1970 degrades to 0 instead of failing.
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    let fields = vec![
        ("timestamp".into(), "TIMESTAMP".into()),
        ("value".into(), "FLOAT".into()),
    ];
    let coll = StoredCollection {
        tenant_id: tenant_id.as_u32(),
        name: name.clone(),
        owner: identity.username.clone(),
        created_at: now,
        fields,
        field_defs: Vec::new(),
        event_defs: Vec::new(),
        collection_type: nodedb_types::CollectionType::timeseries("timestamp", "1h"),
        timeseries_config: config_json,
        is_active: true,
    };

    // Validate `partition_by` before anything is persisted, so an unusable value is
    // rejected up front instead of being stored and then silently ignored.
    let partition_interval = match coll.get_timeseries_config() {
        Some(config) => match config.get("partition_by").and_then(|v| v.as_str()) {
            Some(raw) => Some(
                nodedb_types::timeseries::PartitionInterval::parse(raw).map_err(|_| {
                    sqlstate_error("22023", &format!("invalid partition_by value '{raw}'"))
                })?,
            ),
            None => None,
        },
        None => None,
    };

    if let Some(catalog) = state.credentials.catalog() {
        catalog
            .put_collection(&coll)
            .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
    }

    if let Some(registries) = state.timeseries_registries() {
        let config = nodedb_types::timeseries::TieredPartitionConfig::origin_defaults();
        let mut registry =
            crate::engine::timeseries::partition_registry::PartitionRegistry::new(config);
        // Honour a user-supplied interval right away, rather than only after a later ALTER.
        if let Some(interval) = partition_interval {
            registry.set_partition_interval(interval);
        }
        let key = format!("{}:{}", tenant_id.as_u32(), name);
        let mut regs =
            crate::control::lock_utils::lock_or_recover(registries.lock(), "ts_registries");
        regs.insert(key, registry);
    }

    tracing::info!(
        collection = name,
        tenant = tenant_id.as_u32(),
        "timeseries collection created"
    );
    Ok(vec![Response::Execution(pgwire::api::results::Tag::new(
        "CREATE TIMESERIES",
    ))])
}
/// Handles `SHOW PARTITIONS FOR <collection>`.
///
/// Returns one row per queryable partition (`entry.meta.is_queryable()`) in the
/// collection's in-memory registry. The columns are:
/// - `partition` — the directory name.
/// - `min_ts`, `max_ts`.
/// - `rows`.
/// - `size` — rendered by `format_bytes`.
/// - `state` — the `Debug` rendering of the partition state.
///
/// The result set is empty when no registry is loaded for the collection. The
/// collection-type check is skipped when no catalog is configured.
///
/// # Errors
/// - `42601` when fewer than four tokens are supplied.
/// - `42809` when the collection exists but is not a timeseries collection.
/// - `42P01` when the collection does not exist.
/// - `XX000` when the catalog lookup or row encoding fails.
pub fn show_partitions(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    parts: &[&str],
) -> PgWireResult<Vec<Response>> {
    if parts.len() < 4 {
        return Err(sqlstate_error(
            "42601",
            "syntax: SHOW PARTITIONS FOR <collection>",
        ));
    }
    let name = parts[3].to_lowercase();
    let tenant_id = identity.tenant_id;
    if let Some(catalog) = state.credentials.catalog() {
        match catalog.get_collection(tenant_id.as_u32(), &name) {
            Ok(Some(coll)) if coll.collection_type.is_timeseries() => {}
            Ok(Some(_)) => {
                return Err(sqlstate_error(
                    "42809",
                    &format!("'{name}' is not a timeseries collection"),
                ));
            }
            Ok(None) => {
                return Err(sqlstate_error(
                    "42P01",
                    &format!("collection '{name}' does not exist"),
                ));
            }
            // A failed lookup is an internal error, not evidence that the collection is absent.
            Err(e) => return Err(sqlstate_error("XX000", &e.to_string())),
        }
    }

    let schema = Arc::new(vec![
        text_field("partition"),
        int8_field("min_ts"),
        int8_field("max_ts"),
        int8_field("rows"),
        text_field("size"),
        text_field("state"),
    ]);
    let mut rows = Vec::new();
    let mut encoder = DataRowEncoder::new(schema.clone());
    if let Some(registries) = state.timeseries_registries() {
        let regs = crate::control::lock_utils::lock_or_recover(registries.lock(), "ts_registries");
        let key = format!("{}:{}", tenant_id.as_u32(), name);
        if let Some(registry) = regs.get(&key) {
            for (_, entry) in registry.iter() {
                if !entry.meta.is_queryable() {
                    continue;
                }
                // Saturate rather than wrap to a negative count if the value exceeds INT8.
                let row_count = i64::try_from(entry.meta.row_count).unwrap_or(i64::MAX);
                encoder
                    .encode_field(&entry.dir_name)
                    .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
                encoder
                    .encode_field(&entry.meta.min_ts)
                    .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
                encoder
                    .encode_field(&entry.meta.max_ts)
                    .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
                encoder
                    .encode_field(&row_count)
                    .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
                encoder
                    .encode_field(&format_bytes(entry.meta.size_bytes))
                    .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
                encoder
                    .encode_field(&format!("{:?}", entry.meta.state))
                    .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
                // `take_row` hands back the encoded row so the encoder can be reused.
                rows.push(Ok(encoder.take_row()));
            }
        }
    }
    Ok(vec![Response::Query(QueryResponse::new(
        schema,
        stream::iter(rows),
    ))])
}
/// Handles `ALTER TIMESERIES <name> SET (key = 'value', ...)`.
///
/// Loads the collection from the catalog. When `parse_with_clause` yields a config,
/// it replaces the collection's `timeseries_config`. The collection is then written
/// back to the catalog. When the resulting config contains `partition_by`, the
/// interval is applied to the collection's in-memory partition registry, if one is
/// loaded.
///
/// The catalog is written before the registry is touched, so a failed write never
/// leaves the live registry running with settings that were not persisted.
///
/// # Errors
/// - `42601` on a malformed statement (fewer than five tokens, or no `SET` keyword).
/// - `XX000` when no catalog is configured, or a catalog read or write fails.
/// - `42P01` when the collection does not exist.
/// - `42809` when the collection is not a timeseries collection.
/// - `22023` when `partition_by` is present but cannot be parsed.
pub fn alter_timeseries(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    parts: &[&str],
) -> PgWireResult<Vec<Response>> {
    if parts.len() < 5 || !parts[3].eq_ignore_ascii_case("SET") {
        return Err(sqlstate_error(
            "42601",
            "syntax: ALTER TIMESERIES <name> SET (key = 'value', ...)",
        ));
    }
    let name = parts[2].to_lowercase();
    let tenant_id = identity.tenant_id;

    let Some(catalog) = state.credentials.catalog() else {
        return Err(sqlstate_error("XX000", "catalog unavailable"));
    };

    let mut coll = catalog
        .get_collection(tenant_id.as_u32(), &name)
        .map_err(|e| sqlstate_error("XX000", &e.to_string()))?
        .ok_or_else(|| sqlstate_error("42P01", &format!("collection '{name}' does not exist")))?;
    if !coll.collection_type.is_timeseries() {
        return Err(sqlstate_error(
            "42809",
            &format!("'{name}' is not a timeseries collection"),
        ));
    }

    if let Some(cfg) = parse_with_clause(parts) {
        coll.timeseries_config = Some(cfg);
    }

    // Reject an unparseable interval instead of persisting it and silently ignoring it.
    let partition_interval = match coll.get_timeseries_config() {
        Some(config) => match config.get("partition_by").and_then(|v| v.as_str()) {
            Some(raw) => Some(
                nodedb_types::timeseries::PartitionInterval::parse(raw).map_err(|_| {
                    sqlstate_error("22023", &format!("invalid partition_by value '{raw}'"))
                })?,
            ),
            None => None,
        },
        None => None,
    };

    // Persist first, so the in-memory registry never runs ahead of the catalog.
    catalog
        .put_collection(&coll)
        .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;

    if let Some(interval) = partition_interval
        && let Some(registries) = state.timeseries_registries()
    {
        let key = format!("{}:{}", tenant_id.as_u32(), name);
        let mut regs =
            crate::control::lock_utils::lock_or_recover(registries.lock(), "ts_registries");
        if let Some(registry) = regs.get_mut(&key) {
            registry.set_partition_interval(interval);
        }
    }

    tracing::info!(collection = name, "timeseries config updated");
    Ok(vec![Response::Execution(pgwire::api::results::Tag::new(
        "ALTER TIMESERIES",
    ))])
}
pub fn rewrite_partitions(
state: &SharedState,
identity: &AuthenticatedIdentity,
parts: &[&str],
) -> PgWireResult<Vec<Response>> {
if parts.len() < 4 {
return Err(sqlstate_error(
"42601",
"syntax: REWRITE PARTITIONS FOR <collection>",
));
}
let name = parts[3].to_lowercase();
let tenant_id = identity.tenant_id;
if let Some(catalog) = state.credentials.catalog() {
match catalog.get_collection(tenant_id.as_u32(), &name) {
Ok(Some(coll)) if coll.collection_type.is_timeseries() => {}
Ok(Some(_)) => {
return Err(sqlstate_error(
"42809",
&format!("'{name}' is not a timeseries collection"),
));
}
_ => {
return Err(sqlstate_error(
"42P01",
&format!("collection '{name}' does not exist"),
));
}
}
}
let partitions_to_rewrite: Vec<String> = if let Some(registries) = state.timeseries_registries()
{
let key = format!("{}:{}", tenant_id.as_u32(), name);
let regs = crate::control::lock_utils::lock_or_recover(registries.lock(), "ts_registries");
if let Some(registry) = regs.get(&key) {
registry
.iter()
.filter(|(_, e)| {
e.meta.state == nodedb_types::timeseries::PartitionState::Sealed
|| e.meta.state == nodedb_types::timeseries::PartitionState::Merged
})
.map(|(_, e)| e.dir_name.clone())
.collect()
} else {
Vec::new()
}
} else {
Vec::new()
};
let sealed_count = partitions_to_rewrite.len();
tracing::info!(
collection = name,
sealed_partitions = sealed_count,
"REWRITE PARTITIONS scheduled (async, non-blocking)"
);
if sealed_count > 0 {
let wal_dir = state.wal.wal_dir();
let ts_base = wal_dir
.parent()
.unwrap_or(wal_dir)
.join("timeseries")
.to_path_buf();
let collection_name = name.clone();
tokio::task::spawn_blocking(move || {
let mut rewritten = 0usize;
for dir_name in &partitions_to_rewrite {
let partition_dir = ts_base.join(dir_name);
if !partition_dir.exists() {
continue;
}
match crate::engine::timeseries::merge::merge_partitions(
&ts_base,
std::slice::from_ref(&partition_dir),
&format!("{dir_name}.rewrite"),
) {
Ok(result) => {
let rewrite_dir = ts_base.join(format!("{dir_name}.rewrite"));
let backup_dir = ts_base.join(format!("{dir_name}.old"));
if std::fs::rename(&partition_dir, &backup_dir).is_ok()
&& std::fs::rename(&rewrite_dir, &partition_dir).is_ok()
{
let _ = std::fs::remove_dir_all(&backup_dir);
let meta_path = partition_dir.join("partition.meta");
let _ = std::fs::write(
&meta_path,
serde_json::to_vec_pretty(&result.meta).unwrap_or_default(),
);
rewritten += 1;
}
}
Err(e) => {
tracing::warn!(
partition = dir_name,
error = %e,
"rewrite failed for partition"
);
}
}
}
tracing::info!(
collection = collection_name,
rewritten,
total = sealed_count,
"REWRITE PARTITIONS completed"
);
});
}
Ok(vec![Response::Execution(pgwire::api::results::Tag::new(
"REWRITE PARTITIONS",
))])
}