1use std::sync::Arc;
2
3use alopex_core::columnar::kvs_bridge::ColumnarKvsBridge;
4use alopex_core::columnar::segment_v2::SegmentMetaV2;
5use alopex_core::storage::format::bincode_config;
6use alopex_core::types::TxnMode;
7use alopex_core::{KVStore, KVTransaction};
8use alopex_sql::ast::ddl::VectorMetric;
9use alopex_sql::catalog::persistent::{PersistedTableMeta, TABLES_PREFIX};
10use alopex_sql::catalog::{
11 Catalog, CatalogError, CatalogOverlay, PersistentCatalog, TableMetadata,
12};
13use alopex_sql::planner::types::ResolvedType;
14use axum::extract::{Extension, Query};
15use axum::response::Response;
16use bincode::Options;
17use serde::{Deserialize, Serialize};
18
19use crate::error::{Result, ServerError};
20use crate::http::{error_response, json_response, RequestContext};
21use crate::server::ServerState;
22
/// Per-section entry cap used when the request omits `limit`.
const DEFAULT_RESOURCE_LIMIT: usize = 50;

/// Cap on column names reported per columnar segment when the request omits
/// `columnar_column_limit`.
const DEFAULT_COLUMNAR_COLUMN_LIMIT: usize = 20;

/// Key prefixes regarded as internal bookkeeping.
///
/// Any key that starts with one of these is left out of the raw KV listing.
const SYSTEM_PREFIXES: [&str; 6] = [
    "__catalog__/",
    "hnsw:",
    "__alopex_",
    "__alopex:",
    "vector:",
    "columnar:",
];
33
34#[derive(Debug, Deserialize)]
35pub struct AdminResourcesQuery {
36 pub limit: Option<usize>,
37 pub include_columnar_columns: Option<bool>,
38 pub columnar_column_limit: Option<usize>,
39 pub kv_prefix: Option<String>,
40}
41
42#[derive(Debug, Serialize)]
43pub struct AdminResourcesResponse {
44 pub sql_tables: Vec<SqlTableResource>,
45 pub columnar_segments: Vec<ColumnarSegmentResource>,
46 pub kv_keys: Vec<String>,
47 pub truncated: TruncatedSections,
48}
49
50#[derive(Debug, Serialize)]
51pub struct TruncatedSections {
52 pub sql_tables: bool,
53 pub columnar_segments: bool,
54 pub kv_keys: bool,
55}
56
57#[derive(Debug, Serialize)]
58pub struct SqlTableResource {
59 pub name: String,
60 pub columns: Vec<SqlColumnResource>,
61}
62
63#[derive(Debug, Serialize)]
64pub struct SqlColumnResource {
65 pub name: String,
66 pub data_type: String,
67}
68
69#[derive(Debug, Serialize)]
70pub struct ColumnarSegmentResource {
71 pub id: String,
72 pub columns: Option<Vec<String>>,
73}
74
75pub async fn list(
76 Extension(state): Extension<Arc<ServerState>>,
77 Extension(ctx): Extension<RequestContext>,
78 Query(query): Query<AdminResourcesQuery>,
79) -> Response {
80 match list_impl(state.clone(), query) {
81 Ok(resp) => json_response(resp, state.config.max_response_size, &ctx),
82 Err(err) => error_response(err, &ctx),
83 }
84}
85
86fn list_impl(
87 state: Arc<ServerState>,
88 query: AdminResourcesQuery,
89) -> Result<AdminResourcesResponse> {
90 let limit = query.limit.unwrap_or(DEFAULT_RESOURCE_LIMIT);
91 let columnar_column_limit = query
92 .columnar_column_limit
93 .unwrap_or(DEFAULT_COLUMNAR_COLUMN_LIMIT);
94 let include_columnar_columns = query.include_columnar_columns.unwrap_or(false);
95
96 let (sql_tables, sql_truncated) =
97 list_sql_resources(state.store.clone(), state.catalog.clone(), limit)?;
98 let (columnar_segments, columnar_truncated) = list_columnar_resources(
99 state.store.clone(),
100 limit,
101 include_columnar_columns,
102 columnar_column_limit,
103 )?;
104 let (kv_keys, kv_truncated) = list_kv_resources(state.store.clone(), limit, query.kv_prefix)?;
105
106 Ok(AdminResourcesResponse {
107 sql_tables,
108 columnar_segments,
109 kv_keys,
110 truncated: TruncatedSections {
111 sql_tables: sql_truncated,
112 columnar_segments: columnar_truncated,
113 kv_keys: kv_truncated,
114 },
115 })
116}
117
118fn list_sql_resources(
119 store: Arc<alopex_core::kv::any::AnyKV>,
120 catalog: Arc<std::sync::RwLock<dyn Catalog + Send + Sync>>,
121 limit: usize,
122) -> Result<(Vec<SqlTableResource>, bool)> {
123 let mut tables = list_sql_resources_from_store(store.clone())?;
124 if tables.is_empty() {
125 tables = match PersistentCatalog::load(store.clone()) {
126 Ok(catalog) => {
127 let overlay = CatalogOverlay::new();
128 let mut tables = catalog.list_tables_in_txn("default", "default", &overlay);
129 if tables.is_empty() {
130 tables = catalog.list_tables_in_txn("main", "default", &overlay);
131 }
132 tables
133 }
134 Err(CatalogError::Kv(alopex_core::Error::NotFound)) => Vec::new(),
135 Err(err) => return Err(ServerError::Catalog(err)),
136 };
137 }
138 if tables.is_empty() {
139 let guard = catalog
140 .read()
141 .map_err(|_| ServerError::Internal("catalog lock poisoned".into()))?;
142 tables = guard.list_tables();
143 }
144
145 tables.sort_by(|a, b| a.name.cmp(&b.name));
146 let truncated = tables.len() > limit;
147 if tables.len() > limit {
148 tables.truncate(limit);
149 }
150
151 let resources = tables
152 .into_iter()
153 .map(|table| SqlTableResource {
154 name: table.name,
155 columns: table
156 .columns
157 .into_iter()
158 .map(|column| SqlColumnResource {
159 name: column.name,
160 data_type: resolved_type_to_string(&column.data_type),
161 })
162 .collect(),
163 })
164 .collect();
165
166 Ok((resources, truncated))
167}
168
169fn list_sql_resources_from_store(
170 store: Arc<alopex_core::kv::any::AnyKV>,
171) -> Result<Vec<TableMetadata>> {
172 let mut txn = store.begin(TxnMode::ReadOnly)?;
173 let mut tables = Vec::new();
174 for (_key, value) in txn.scan_prefix(TABLES_PREFIX)? {
175 let persisted: PersistedTableMeta = bincode_config()
176 .deserialize(&value)
177 .map_err(|err| ServerError::BadRequest(format!("catalog entry invalid: {err}")))?;
178 tables.push(TableMetadata::from(persisted));
179 }
180 txn.commit_self()?;
181 Ok(tables)
182}
183
184fn list_columnar_resources(
185 store: Arc<alopex_core::kv::any::AnyKV>,
186 limit: usize,
187 include_columns: bool,
188 columnar_column_limit: usize,
189) -> Result<(Vec<ColumnarSegmentResource>, bool)> {
190 let bridge = ColumnarKvsBridge::new(store);
191 let mut segments = bridge.list_segments().map_err(map_columnar_error)?;
192 segments.sort_by(|a, b| a.0.cmp(&b.0).then_with(|| a.1.cmp(&b.1)));
193 let truncated = segments.len() > limit;
194 if segments.len() > limit {
195 segments.truncate(limit);
196 }
197
198 let mut resources = Vec::with_capacity(segments.len());
199 for (table_id, segment_id) in segments {
200 let id = format!("{}:{}", table_id, segment_id);
201 let columns = if include_columns {
202 Some(list_columnar_columns(
203 &bridge,
204 table_id,
205 segment_id,
206 columnar_column_limit,
207 )?)
208 } else {
209 None
210 };
211 resources.push(ColumnarSegmentResource { id, columns });
212 }
213
214 Ok((resources, truncated))
215}
216
217fn list_columnar_columns(
218 bridge: &ColumnarKvsBridge,
219 table_id: u32,
220 segment_id: u64,
221 limit: usize,
222) -> Result<Vec<String>> {
223 let stats = bridge
224 .read_statistics(table_id, segment_id)
225 .map_err(map_columnar_error)?;
226 let meta: SegmentMetaV2 = bincode_config()
227 .deserialize(&stats)
228 .map_err(|err| ServerError::BadRequest(format!("columnar segment invalid: {err}")))?;
229 let mut names: Vec<String> = meta
230 .schema
231 .columns
232 .into_iter()
233 .map(|column| column.name)
234 .collect();
235 if names.len() > limit {
236 names.truncate(limit);
237 }
238 Ok(names)
239}
240
241fn list_kv_resources(
242 store: Arc<alopex_core::kv::any::AnyKV>,
243 limit: usize,
244 kv_prefix: Option<String>,
245) -> Result<(Vec<String>, bool)> {
246 let prefix = kv_prefix.unwrap_or_default();
247 let mut txn = store.begin(TxnMode::ReadOnly)?;
248 let mut keys = Vec::new();
249 let mut truncated = false;
250 for (key, _) in txn.scan_prefix(prefix.as_bytes())? {
251 if is_system_key(&key) {
252 continue;
253 }
254 let Ok(key_str) = std::str::from_utf8(&key) else {
255 continue;
256 };
257 if key_str.is_empty() || key_str.chars().any(|ch| ch.is_control()) {
258 continue;
259 }
260 if keys.len() < limit {
261 keys.push(key_str.to_string());
262 } else {
263 truncated = true;
264 break;
265 }
266 }
267 txn.commit_self()?;
268 Ok((keys, truncated))
269}
270
271fn is_system_key(key: &[u8]) -> bool {
272 SYSTEM_PREFIXES
273 .iter()
274 .any(|prefix| key.starts_with(prefix.as_bytes()))
275}
276
277fn resolved_type_to_string(resolved_type: &ResolvedType) -> String {
278 match resolved_type {
279 ResolvedType::Integer => "INTEGER".to_string(),
280 ResolvedType::BigInt => "BIGINT".to_string(),
281 ResolvedType::Float => "FLOAT".to_string(),
282 ResolvedType::Double => "DOUBLE".to_string(),
283 ResolvedType::Text => "TEXT".to_string(),
284 ResolvedType::Blob => "BLOB".to_string(),
285 ResolvedType::Boolean => "BOOLEAN".to_string(),
286 ResolvedType::Timestamp => "TIMESTAMP".to_string(),
287 ResolvedType::Vector { dimension, metric } => {
288 let metric = match metric {
289 VectorMetric::Cosine => "COSINE",
290 VectorMetric::L2 => "L2",
291 VectorMetric::Inner => "INNER",
292 };
293 format!("VECTOR({dimension}, {metric})")
294 }
295 ResolvedType::Null => "NULL".to_string(),
296 }
297}
298
299fn map_columnar_error(err: alopex_core::columnar::ColumnarError) -> ServerError {
300 match err {
301 alopex_core::columnar::ColumnarError::NotFound => {
302 ServerError::NotFound("columnar segment not found".into())
303 }
304 alopex_core::columnar::ColumnarError::InvalidFormat(message) => {
305 ServerError::BadRequest(format!("columnar segment invalid: {message}"))
306 }
307 alopex_core::columnar::ColumnarError::MemoryLimitExceeded { limit, requested } => {
308 ServerError::PayloadTooLarge(format!(
309 "memory limit {limit} exceeded by {requested} bytes"
310 ))
311 }
312 other => ServerError::Core(other.into()),
313 }
314}