Skip to main content

alopex_server/http/
admin_resources.rs

1use std::sync::Arc;
2
3use alopex_core::columnar::kvs_bridge::ColumnarKvsBridge;
4use alopex_core::columnar::segment_v2::SegmentMetaV2;
5use alopex_core::storage::format::bincode_config;
6use alopex_core::types::TxnMode;
7use alopex_core::{KVStore, KVTransaction};
8use alopex_sql::ast::ddl::VectorMetric;
9use alopex_sql::catalog::persistent::{PersistedTableMeta, TABLES_PREFIX};
10use alopex_sql::catalog::{
11    Catalog, CatalogError, CatalogOverlay, PersistentCatalog, TableMetadata,
12};
13use alopex_sql::planner::types::ResolvedType;
14use axum::extract::{Extension, Query};
15use axum::response::Response;
16use bincode::Options;
17use serde::{Deserialize, Serialize};
18
19use crate::error::{Result, ServerError};
20use crate::http::{error_response, json_response, RequestContext};
21use crate::server::ServerState;
22
23const DEFAULT_RESOURCE_LIMIT: usize = 50;
24const DEFAULT_COLUMNAR_COLUMN_LIMIT: usize = 20;
25const SYSTEM_PREFIXES: [&str; 6] = [
26    "__catalog__/",
27    "hnsw:",
28    "__alopex_",
29    "__alopex:",
30    "vector:",
31    "columnar:",
32];
33
34#[derive(Debug, Deserialize)]
35pub struct AdminResourcesQuery {
36    pub limit: Option<usize>,
37    pub include_columnar_columns: Option<bool>,
38    pub columnar_column_limit: Option<usize>,
39    pub kv_prefix: Option<String>,
40}
41
42#[derive(Debug, Serialize)]
43pub struct AdminResourcesResponse {
44    pub sql_tables: Vec<SqlTableResource>,
45    pub columnar_segments: Vec<ColumnarSegmentResource>,
46    pub kv_keys: Vec<String>,
47    pub truncated: TruncatedSections,
48}
49
50#[derive(Debug, Serialize)]
51pub struct TruncatedSections {
52    pub sql_tables: bool,
53    pub columnar_segments: bool,
54    pub kv_keys: bool,
55}
56
57#[derive(Debug, Serialize)]
58pub struct SqlTableResource {
59    pub name: String,
60    pub columns: Vec<SqlColumnResource>,
61}
62
63#[derive(Debug, Serialize)]
64pub struct SqlColumnResource {
65    pub name: String,
66    pub data_type: String,
67}
68
69#[derive(Debug, Serialize)]
70pub struct ColumnarSegmentResource {
71    pub id: String,
72    pub columns: Option<Vec<String>>,
73}
74
75pub async fn list(
76    Extension(state): Extension<Arc<ServerState>>,
77    Extension(ctx): Extension<RequestContext>,
78    Query(query): Query<AdminResourcesQuery>,
79) -> Response {
80    match list_impl(state.clone(), query) {
81        Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
82        Err(err) => error_response(err, &ctx),
83    }
84}
85
86fn list_impl(
87    state: Arc<ServerState>,
88    query: AdminResourcesQuery,
89) -> Result<AdminResourcesResponse> {
90    let limit = query.limit.unwrap_or(DEFAULT_RESOURCE_LIMIT);
91    let columnar_column_limit = query
92        .columnar_column_limit
93        .unwrap_or(DEFAULT_COLUMNAR_COLUMN_LIMIT);
94    let include_columnar_columns = query.include_columnar_columns.unwrap_or(false);
95
96    let (sql_tables, sql_truncated) =
97        list_sql_resources(state.store.clone(), state.catalog.clone(), limit)?;
98    let (columnar_segments, columnar_truncated) = list_columnar_resources(
99        state.store.clone(),
100        limit,
101        include_columnar_columns,
102        columnar_column_limit,
103    )?;
104    let (kv_keys, kv_truncated) = list_kv_resources(state.store.clone(), limit, query.kv_prefix)?;
105
106    Ok(AdminResourcesResponse {
107        sql_tables,
108        columnar_segments,
109        kv_keys,
110        truncated: TruncatedSections {
111            sql_tables: sql_truncated,
112            columnar_segments: columnar_truncated,
113            kv_keys: kv_truncated,
114        },
115    })
116}
117
118fn list_sql_resources(
119    store: Arc<alopex_core::kv::any::AnyKV>,
120    catalog: Arc<std::sync::RwLock<dyn Catalog + Send + Sync>>,
121    limit: usize,
122) -> Result<(Vec<SqlTableResource>, bool)> {
123    let mut tables = list_sql_resources_from_store(store.clone())?;
124    if tables.is_empty() {
125        tables = match PersistentCatalog::load(store.clone()) {
126            Ok(catalog) => {
127                let overlay = CatalogOverlay::new();
128                let mut tables = catalog.list_tables_in_txn("default", "default", &overlay);
129                if tables.is_empty() {
130                    tables = catalog.list_tables_in_txn("main", "default", &overlay);
131                }
132                tables
133            }
134            Err(CatalogError::Kv(alopex_core::Error::NotFound)) => Vec::new(),
135            Err(err) => return Err(ServerError::Catalog(err)),
136        };
137    }
138    if tables.is_empty() {
139        let guard = catalog
140            .read()
141            .map_err(|_| ServerError::Internal("catalog lock poisoned".into()))?;
142        tables = guard.list_tables();
143    }
144
145    tables.sort_by(|a, b| a.name.cmp(&b.name));
146    let truncated = tables.len() > limit;
147    if tables.len() > limit {
148        tables.truncate(limit);
149    }
150
151    let resources = tables
152        .into_iter()
153        .map(|table| SqlTableResource {
154            name: table.name,
155            columns: table
156                .columns
157                .into_iter()
158                .map(|column| SqlColumnResource {
159                    name: column.name,
160                    data_type: resolved_type_to_string(&column.data_type),
161                })
162                .collect(),
163        })
164        .collect();
165
166    Ok((resources, truncated))
167}
168
169fn list_sql_resources_from_store(
170    store: Arc<alopex_core::kv::any::AnyKV>,
171) -> Result<Vec<TableMetadata>> {
172    let mut txn = store.begin(TxnMode::ReadOnly)?;
173    let mut tables = Vec::new();
174    for (_key, value) in txn.scan_prefix(TABLES_PREFIX)? {
175        let persisted: PersistedTableMeta = bincode_config()
176            .deserialize(&value)
177            .map_err(|err| ServerError::BadRequest(format!("catalog entry invalid: {err}")))?;
178        tables.push(TableMetadata::from(persisted));
179    }
180    txn.commit_self()?;
181    Ok(tables)
182}
183
184fn list_columnar_resources(
185    store: Arc<alopex_core::kv::any::AnyKV>,
186    limit: usize,
187    include_columns: bool,
188    columnar_column_limit: usize,
189) -> Result<(Vec<ColumnarSegmentResource>, bool)> {
190    let bridge = ColumnarKvsBridge::new(store);
191    let mut segments = bridge.list_segments().map_err(map_columnar_error)?;
192    segments.sort_by(|a, b| a.0.cmp(&b.0).then_with(|| a.1.cmp(&b.1)));
193    let truncated = segments.len() > limit;
194    if segments.len() > limit {
195        segments.truncate(limit);
196    }
197
198    let mut resources = Vec::with_capacity(segments.len());
199    for (table_id, segment_id) in segments {
200        let id = format!("{}:{}", table_id, segment_id);
201        let columns = if include_columns {
202            Some(list_columnar_columns(
203                &bridge,
204                table_id,
205                segment_id,
206                columnar_column_limit,
207            )?)
208        } else {
209            None
210        };
211        resources.push(ColumnarSegmentResource { id, columns });
212    }
213
214    Ok((resources, truncated))
215}
216
217fn list_columnar_columns(
218    bridge: &ColumnarKvsBridge,
219    table_id: u32,
220    segment_id: u64,
221    limit: usize,
222) -> Result<Vec<String>> {
223    let stats = bridge
224        .read_statistics(table_id, segment_id)
225        .map_err(map_columnar_error)?;
226    let meta: SegmentMetaV2 = bincode_config()
227        .deserialize(&stats)
228        .map_err(|err| ServerError::BadRequest(format!("columnar segment invalid: {err}")))?;
229    let mut names: Vec<String> = meta
230        .schema
231        .columns
232        .into_iter()
233        .map(|column| column.name)
234        .collect();
235    if names.len() > limit {
236        names.truncate(limit);
237    }
238    Ok(names)
239}
240
241fn list_kv_resources(
242    store: Arc<alopex_core::kv::any::AnyKV>,
243    limit: usize,
244    kv_prefix: Option<String>,
245) -> Result<(Vec<String>, bool)> {
246    let prefix = kv_prefix.unwrap_or_default();
247    let mut txn = store.begin(TxnMode::ReadOnly)?;
248    let mut keys = Vec::new();
249    let mut truncated = false;
250    for (key, _) in txn.scan_prefix(prefix.as_bytes())? {
251        if is_system_key(&key) {
252            continue;
253        }
254        let Ok(key_str) = std::str::from_utf8(&key) else {
255            continue;
256        };
257        if key_str.is_empty() || key_str.chars().any(|ch| ch.is_control()) {
258            continue;
259        }
260        if keys.len() < limit {
261            keys.push(key_str.to_string());
262        } else {
263            truncated = true;
264            break;
265        }
266    }
267    txn.commit_self()?;
268    Ok((keys, truncated))
269}
270
271fn is_system_key(key: &[u8]) -> bool {
272    SYSTEM_PREFIXES
273        .iter()
274        .any(|prefix| key.starts_with(prefix.as_bytes()))
275}
276
277fn resolved_type_to_string(resolved_type: &ResolvedType) -> String {
278    match resolved_type {
279        ResolvedType::Integer => "INTEGER".to_string(),
280        ResolvedType::BigInt => "BIGINT".to_string(),
281        ResolvedType::Float => "FLOAT".to_string(),
282        ResolvedType::Double => "DOUBLE".to_string(),
283        ResolvedType::Text => "TEXT".to_string(),
284        ResolvedType::Blob => "BLOB".to_string(),
285        ResolvedType::Boolean => "BOOLEAN".to_string(),
286        ResolvedType::Timestamp => "TIMESTAMP".to_string(),
287        ResolvedType::Vector { dimension, metric } => {
288            let metric = match metric {
289                VectorMetric::Cosine => "COSINE",
290                VectorMetric::L2 => "L2",
291                VectorMetric::Inner => "INNER",
292            };
293            format!("VECTOR({dimension}, {metric})")
294        }
295        ResolvedType::Null => "NULL".to_string(),
296    }
297}
298
299fn map_columnar_error(err: alopex_core::columnar::ColumnarError) -> ServerError {
300    match err {
301        alopex_core::columnar::ColumnarError::NotFound => {
302            ServerError::NotFound("columnar segment not found".into())
303        }
304        alopex_core::columnar::ColumnarError::InvalidFormat(message) => {
305            ServerError::BadRequest(format!("columnar segment invalid: {message}"))
306        }
307        alopex_core::columnar::ColumnarError::MemoryLimitExceeded { limit, requested } => {
308            ServerError::PayloadTooLarge(format!(
309                "memory limit {limit} exceeded by {requested} bytes"
310            ))
311        }
312        other => ServerError::Core(other.into()),
313    }
314}