// alopex-server 0.5.0
//
// Server component for Alopex DB (see the crate documentation).
use std::sync::Arc;

use alopex_core::columnar::kvs_bridge::ColumnarKvsBridge;
use alopex_core::columnar::segment_v2::SegmentMetaV2;
use alopex_core::storage::format::bincode_config;
use alopex_core::types::TxnMode;
use alopex_core::{KVStore, KVTransaction};
use alopex_sql::ast::ddl::VectorMetric;
use alopex_sql::catalog::persistent::{PersistedTableMeta, TABLES_PREFIX};
use alopex_sql::catalog::{
    Catalog, CatalogError, CatalogOverlay, PersistentCatalog, TableMetadata,
};
use alopex_sql::planner::types::ResolvedType;
use axum::extract::{Extension, Query};
use axum::response::Response;
use bincode::Options;
use serde::{Deserialize, Serialize};

use crate::error::{Result, ServerError};
use crate::http::{error_response, json_response, RequestContext};
use crate::server::ServerState;

/// Default per-section entry cap used when the request omits `limit`.
const DEFAULT_RESOURCE_LIMIT: usize = 50;
/// Default cap on column names reported per columnar segment when the request omits
/// `columnar_column_limit`.
const DEFAULT_COLUMNAR_COLUMN_LIMIT: usize = 20;
/// Key prefixes reserved for internal data; KV keys starting with any of these are
/// hidden from the KV section of the listing (see `is_system_key`).
const SYSTEM_PREFIXES: [&str; 6] = [
    "__catalog__/",
    "hnsw:",
    "__alopex_",
    "__alopex:",
    "vector:",
    "columnar:",
];

/// Query-string parameters accepted by the admin resources listing (`list`).
///
/// Every field is optional; `list_impl` substitutes the defaults noted below.
#[derive(Debug, Deserialize)]
pub struct AdminResourcesQuery {
    /// Maximum entries per section (SQL tables, columnar segments, KV keys).
    /// Defaults to `DEFAULT_RESOURCE_LIMIT`.
    pub limit: Option<usize>,
    /// Whether to read each listed columnar segment's column names. Defaults to `false`.
    pub include_columnar_columns: Option<bool>,
    /// Maximum column names reported per columnar segment.
    /// Defaults to `DEFAULT_COLUMNAR_COLUMN_LIMIT`.
    pub columnar_column_limit: Option<usize>,
    /// Only KV keys starting with this prefix are scanned. Defaults to the empty prefix
    /// (all keys); keys matching `SYSTEM_PREFIXES` are filtered out either way.
    pub kv_prefix: Option<String>,
}

/// JSON body returned by the admin resources listing.
#[derive(Debug, Serialize)]
pub struct AdminResourcesResponse {
    /// SQL tables sorted by name, at most `limit` entries.
    pub sql_tables: Vec<SqlTableResource>,
    /// Columnar segments ordered by table id then segment id, at most `limit` entries.
    pub columnar_segments: Vec<ColumnarSegmentResource>,
    /// Non-system KV keys that are non-empty, valid UTF-8 and free of control
    /// characters, at most `limit` entries.
    pub kv_keys: Vec<String>,
    /// Which of the sections above were cut off at `limit`.
    pub truncated: TruncatedSections,
}

/// Per-section flags reporting whether more entries existed than were returned.
#[derive(Debug, Serialize)]
pub struct TruncatedSections {
    /// `true` when more SQL tables were found than `limit`.
    pub sql_tables: bool,
    /// `true` when more columnar segments were found than `limit`.
    pub columnar_segments: bool,
    /// `true` when at least one more eligible KV key existed beyond `limit`.
    pub kv_keys: bool,
}

/// A SQL table as reported by the resource listing.
#[derive(Debug, Serialize)]
pub struct SqlTableResource {
    /// Table name.
    pub name: String,
    /// The table's columns, in the order stored in its metadata.
    pub columns: Vec<SqlColumnResource>,
}

/// A single SQL column as reported by the resource listing.
#[derive(Debug, Serialize)]
pub struct SqlColumnResource {
    /// Column name.
    pub name: String,
    /// Column type rendered by `resolved_type_to_string`, e.g. `INTEGER` or
    /// `VECTOR(128, COSINE)`.
    pub data_type: String,
}

/// A columnar segment as reported by the resource listing.
#[derive(Debug, Serialize)]
pub struct ColumnarSegmentResource {
    /// Segment identifier formatted as `"{table_id}:{segment_id}"`.
    pub id: String,
    /// Column names from the segment's stored schema, capped at
    /// `columnar_column_limit`; `None` unless `include_columnar_columns` was requested.
    pub columns: Option<Vec<String>>,
}

/// Axum handler for the admin resources listing.
///
/// Delegates to `list_impl` and renders the outcome with `json_response` (passing
/// `state.config.max_response_size`) or with `error_response`, both given the request
/// context.
///
/// NOTE(review): `list_impl` performs synchronous KV scans directly on the async
/// executor; consider moving it to a blocking task if these scans can be slow.
pub async fn list(
    Extension(state): Extension<Arc<ServerState>>,
    Extension(ctx): Extension<RequestContext>,
    Query(query): Query<AdminResourcesQuery>,
) -> Response {
    let outcome = list_impl(Arc::clone(&state), query);
    match outcome {
        Err(err) => error_response(err, &ctx),
        Ok(body) => json_response(body, state.config.max_response_size, &ctx),
    }
}

fn list_impl(
    state: Arc<ServerState>,
    query: AdminResourcesQuery,
) -> Result<AdminResourcesResponse> {
    let limit = query.limit.unwrap_or(DEFAULT_RESOURCE_LIMIT);
    let columnar_column_limit = query
        .columnar_column_limit
        .unwrap_or(DEFAULT_COLUMNAR_COLUMN_LIMIT);
    let include_columnar_columns = query.include_columnar_columns.unwrap_or(false);

    let (sql_tables, sql_truncated) =
        list_sql_resources(state.store.clone(), state.catalog.clone(), limit)?;
    let (columnar_segments, columnar_truncated) = list_columnar_resources(
        state.store.clone(),
        limit,
        include_columnar_columns,
        columnar_column_limit,
    )?;
    let (kv_keys, kv_truncated) = list_kv_resources(state.store.clone(), limit, query.kv_prefix)?;

    Ok(AdminResourcesResponse {
        sql_tables,
        columnar_segments,
        kv_keys,
        truncated: TruncatedSections {
            sql_tables: sql_truncated,
            columnar_segments: columnar_truncated,
            kv_keys: kv_truncated,
        },
    })
}

/// Lists SQL tables with their columns, sorted by table name and capped at `limit`.
///
/// Table metadata comes from the first source that yields any tables:
/// 1. catalog entries persisted under `TABLES_PREFIX` in `store`;
/// 2. a `PersistentCatalog` loaded from `store`, queried with
///    `list_tables_in_txn("default", "default", ..)` and then
///    `list_tables_in_txn("main", "default", ..)`;
/// 3. the in-memory `catalog`.
///
/// Returns the (possibly truncated) resources and whether truncation happened.
///
/// # Errors
///
/// Propagates errors from the persisted-entry scan, catalog load errors other than
/// `NotFound` (as `ServerError::Catalog`), and `ServerError::Internal` if the
/// in-memory catalog lock is poisoned.
fn list_sql_resources(
    store: Arc<alopex_core::kv::any::AnyKV>,
    catalog: Arc<std::sync::RwLock<dyn Catalog + Send + Sync>>,
    limit: usize,
) -> Result<(Vec<SqlTableResource>, bool)> {
    let mut tables = list_sql_resources_from_store(Arc::clone(&store))?;
    if tables.is_empty() {
        // Last use of `store`, so it can be moved instead of cloned.
        tables = match PersistentCatalog::load(store) {
            Ok(persistent) => {
                let overlay = CatalogOverlay::new();
                let found = persistent.list_tables_in_txn("default", "default", &overlay);
                if found.is_empty() {
                    persistent.list_tables_in_txn("main", "default", &overlay)
                } else {
                    found
                }
            }
            // No persisted catalog: fall through to the in-memory catalog below.
            Err(CatalogError::Kv(alopex_core::Error::NotFound)) => Vec::new(),
            Err(err) => return Err(ServerError::Catalog(err)),
        };
    }
    if tables.is_empty() {
        let guard = catalog
            .read()
            .map_err(|_| ServerError::Internal("catalog lock poisoned".into()))?;
        tables = guard.list_tables();
    }

    tables.sort_by(|a, b| a.name.cmp(&b.name));
    let truncated = tables.len() > limit;
    // `Vec::truncate` is already a no-op when the vector is not longer than `limit`.
    tables.truncate(limit);

    let resources = tables
        .into_iter()
        .map(|table| SqlTableResource {
            name: table.name,
            columns: table
                .columns
                .into_iter()
                .map(|column| SqlColumnResource {
                    name: column.name,
                    data_type: resolved_type_to_string(&column.data_type),
                })
                .collect(),
        })
        .collect();

    Ok((resources, truncated))
}

/// Reads every persisted table definition stored under `TABLES_PREFIX`.
///
/// Each value is decoded with `bincode_config()` as a `PersistedTableMeta` and
/// converted into `TableMetadata`, in scan order, inside a read-only transaction.
///
/// # Errors
///
/// Store errors are propagated. The first entry that fails to decode aborts the scan
/// with `ServerError::BadRequest("catalog entry invalid: ...")`.
fn list_sql_resources_from_store(
    store: Arc<alopex_core::kv::any::AnyKV>,
) -> Result<Vec<TableMetadata>> {
    let mut txn = store.begin(TxnMode::ReadOnly)?;
    let decoded = txn
        .scan_prefix(TABLES_PREFIX)?
        .into_iter()
        .map(|(_key, raw)| -> Result<TableMetadata> {
            let persisted: PersistedTableMeta = bincode_config()
                .deserialize(&raw)
                .map_err(|err| ServerError::BadRequest(format!("catalog entry invalid: {err}")))?;
            Ok(TableMetadata::from(persisted))
        })
        .collect::<Result<Vec<_>>>()?;
    txn.commit_self()?;
    Ok(decoded)
}

/// Lists columnar segments as `"table_id:segment_id"` ids, ordered by table id and
/// then segment id, capped at `limit`.
///
/// When `include_columns` is set, each listed segment's column names (at most
/// `columnar_column_limit`) are read via `list_columnar_columns`.
///
/// Returns the resources and whether the segment list was truncated.
///
/// # Errors
///
/// Columnar errors are mapped through `map_columnar_error`. Failing to read or decode
/// any single segment's statistics fails the whole listing.
fn list_columnar_resources(
    store: Arc<alopex_core::kv::any::AnyKV>,
    limit: usize,
    include_columns: bool,
    columnar_column_limit: usize,
) -> Result<(Vec<ColumnarSegmentResource>, bool)> {
    let bridge = ColumnarKvsBridge::new(store);
    let mut segments = bridge.list_segments().map_err(map_columnar_error)?;
    // `(table_id, segment_id)` tuples already order lexicographically; equal tuples
    // are indistinguishable, so an unstable sort gives the same result.
    segments.sort_unstable();
    let truncated = segments.len() > limit;
    segments.truncate(limit);

    let mut resources = Vec::with_capacity(segments.len());
    for (table_id, segment_id) in segments {
        let columns = if include_columns {
            Some(list_columnar_columns(&bridge, table_id, segment_id, columnar_column_limit)?)
        } else {
            None
        };
        resources.push(ColumnarSegmentResource {
            id: format!("{table_id}:{segment_id}"),
            columns,
        });
    }

    Ok((resources, truncated))
}

/// Returns up to `limit` column names for one columnar segment, in schema order.
///
/// The names come from the segment's statistics blob, decoded with `bincode_config()`
/// as a `SegmentMetaV2`.
///
/// # Errors
///
/// Statistics read errors are mapped through `map_columnar_error`. A blob that fails to
/// decode becomes `ServerError::BadRequest("columnar segment invalid: ...")`.
///
/// NOTE(review): a corrupt stored blob is reported as a client error (400); confirm
/// this status is intended.
fn list_columnar_columns(
    bridge: &ColumnarKvsBridge,
    table_id: u32,
    segment_id: u64,
    limit: usize,
) -> Result<Vec<String>> {
    let stats = bridge
        .read_statistics(table_id, segment_id)
        .map_err(map_columnar_error)?;
    let meta: SegmentMetaV2 = bincode_config()
        .deserialize(&stats)
        .map_err(|err| ServerError::BadRequest(format!("columnar segment invalid: {err}")))?;
    // Stop after `limit` names instead of materialising every name and truncating.
    Ok(meta
        .schema
        .columns
        .into_iter()
        .take(limit)
        .map(|column| column.name)
        .collect())
}

/// Lists user-visible KV keys starting with `kv_prefix` (empty prefix when `None`).
///
/// Keys matching `SYSTEM_PREFIXES`, keys that are not valid UTF-8, empty keys and keys
/// containing control characters are skipped. At most `limit` keys are returned, in
/// scan order. The boolean is `true` when at least one more eligible key existed.
///
/// # Errors
///
/// Propagates errors from beginning, scanning or committing the read-only transaction.
fn list_kv_resources(
    store: Arc<alopex_core::kv::any::AnyKV>,
    limit: usize,
    kv_prefix: Option<String>,
) -> Result<(Vec<String>, bool)> {
    let prefix = kv_prefix.unwrap_or_default();
    let mut txn = store.begin(TxnMode::ReadOnly)?;

    // Pull at most one eligible key past `limit`; its presence signals truncation.
    let mut keys: Vec<String> = txn
        .scan_prefix(prefix.as_bytes())?
        .into_iter()
        .filter(|(key, _)| !is_system_key(key))
        .filter_map(|(key, _)| {
            let text = std::str::from_utf8(&key).ok()?;
            let printable = !text.is_empty() && !text.chars().any(char::is_control);
            printable.then(|| text.to_string())
        })
        .take(limit.saturating_add(1))
        .collect();
    txn.commit_self()?;

    let truncated = keys.len() > limit;
    keys.truncate(limit);
    Ok((keys, truncated))
}

/// Reports whether `key` begins with one of the reserved `SYSTEM_PREFIXES`.
fn is_system_key(key: &[u8]) -> bool {
    for reserved in &SYSTEM_PREFIXES {
        if key.starts_with(reserved.as_bytes()) {
            return true;
        }
    }
    false
}

/// Renders a resolved SQL column type as its SQL keyword (e.g. `"BIGINT"`), or as
/// `"VECTOR(<dimension>, <METRIC>)"` for vector columns.
fn resolved_type_to_string(resolved_type: &ResolvedType) -> String {
    let keyword = match resolved_type {
        ResolvedType::Vector { dimension, metric } => {
            let metric_name = match metric {
                VectorMetric::Cosine => "COSINE",
                VectorMetric::L2 => "L2",
                VectorMetric::Inner => "INNER",
            };
            return format!("VECTOR({dimension}, {metric_name})");
        }
        ResolvedType::Integer => "INTEGER",
        ResolvedType::BigInt => "BIGINT",
        ResolvedType::Float => "FLOAT",
        ResolvedType::Double => "DOUBLE",
        ResolvedType::Text => "TEXT",
        ResolvedType::Blob => "BLOB",
        ResolvedType::Boolean => "BOOLEAN",
        ResolvedType::Timestamp => "TIMESTAMP",
        ResolvedType::Null => "NULL",
    };
    keyword.to_owned()
}

/// Translates a columnar-engine error into a `ServerError`.
///
/// - `NotFound` becomes `ServerError::NotFound`.
/// - `InvalidFormat` becomes `ServerError::BadRequest`.
/// - `MemoryLimitExceeded` becomes `ServerError::PayloadTooLarge`.
/// - Every other variant is converted with `.into()` and wrapped in `ServerError::Core`.
///
/// NOTE(review): the memory-limit message says "exceeded by {requested} bytes". If
/// `requested` is the total requested size rather than the overage, the wording is
/// misleading — confirm against `ColumnarError`.
fn map_columnar_error(err: alopex_core::columnar::ColumnarError) -> ServerError {
    use alopex_core::columnar::ColumnarError;

    match err {
        ColumnarError::NotFound => ServerError::NotFound("columnar segment not found".into()),
        ColumnarError::InvalidFormat(message) => {
            ServerError::BadRequest(format!("columnar segment invalid: {message}"))
        }
        ColumnarError::MemoryLimitExceeded { limit, requested } => {
            let detail = format!("memory limit {limit} exceeded by {requested} bytes");
            ServerError::PayloadTooLarge(detail)
        }
        other => ServerError::Core(other.into()),
    }
}