#![allow(dead_code)]
use axum::{
routing::{get, patch, post},
Router,
};
use std::sync::Arc;
use tempfile::TempDir;
use velesdb_core::Database;
use velesdb_server::{
add_edge, add_edges_batch, aggregate,
auth::{auth_middleware, AuthState},
batch_search, bulk_delete_points, collection_diagnostics, collection_sanity,
compact_collection, create_collection, delete_collection, delete_point, enable_streaming,
explain, get_collection, get_collection_config, get_edges, get_node_degree, get_point,
health_check, hybrid_search, list_collections, multi_query_search, multi_query_search_ids,
query, readiness_check, rebuild_index, reorder_for_locality, scroll_points, search, search_ids,
set_point_ttl, stream_insert, stream_upsert_points, text_search, traverse_graph, upsert_points,
upsert_points_raw, vacuum_collection, AppState, OnboardingMetrics,
};
fn base_routes() -> Router<Arc<AppState>> {
Router::new()
.route("/health", get(health_check))
.route("/ready", get(readiness_check))
.route(
"/collections",
get(list_collections).post(create_collection),
)
.route(
"/collections/{name}",
get(get_collection).delete(delete_collection),
)
.route("/collections/{name}/config", get(get_collection_config))
.route(
"/collections/{name}/diagnostics",
get(collection_diagnostics),
)
.route("/collections/{name}/index/rebuild", post(rebuild_index))
.route("/collections/{name}/sanity", get(collection_sanity))
.route("/collections/{name}/points", post(upsert_points))
.route("/collections/{name}/points/raw", post(upsert_points_raw))
.route(
"/collections/{name}/points/stream",
post(stream_upsert_points),
)
.route("/collections/{name}/stream/enable", post(enable_streaming))
.route("/collections/{name}/stream/insert", post(stream_insert))
.route(
"/collections/{name}/points/{id}",
get(get_point).delete(delete_point),
)
.route("/collections/{name}/points/scroll", post(scroll_points))
.route("/collections/{name}/points/{id}/ttl", patch(set_point_ttl))
.route("/collections/{name}/search", post(search))
.route("/collections/{name}/search/batch", post(batch_search))
.route("/collections/{name}/search/multi", post(multi_query_search))
.route(
"/collections/{name}/search/multi/ids",
post(multi_query_search_ids),
)
.route("/collections/{name}/search/text", post(text_search))
.route("/collections/{name}/search/hybrid", post(hybrid_search))
.route("/collections/{name}/search/ids", post(search_ids))
.route("/query", post(query))
.route("/aggregate", post(aggregate))
.route("/query/explain", post(explain))
.merge(graph_and_maintenance_routes())
}
fn graph_and_maintenance_routes() -> Router<Arc<AppState>> {
Router::new()
.route(
"/collections/{name}/graph/edges",
get(get_edges).post(add_edge),
)
.route(
"/collections/{name}/graph/edges/batch",
post(add_edges_batch),
)
.route("/collections/{name}/graph/traverse", post(traverse_graph))
.route(
"/collections/{name}/graph/nodes/{node_id}/degree",
get(get_node_degree),
)
.route(
"/collections/{name}/points/delete",
post(bulk_delete_points),
)
.route("/collections/{name}/vacuum", post(vacuum_collection))
.route("/collections/{name}/compact", post(compact_collection))
.route(
"/collections/{name}/locality/reorder",
post(reorder_for_locality),
)
}
fn app_state_from_db(db: Database) -> Arc<AppState> {
Arc::new(AppState {
db,
onboarding_metrics: OnboardingMetrics::default(),
query_limits: parking_lot::RwLock::new(velesdb_core::guardrails::QueryLimits::default()),
ready: std::sync::atomic::AtomicBool::new(true),
operational_metrics: velesdb_core::metrics::OperationalMetrics::new_arc(),
traversal_metrics: std::sync::Arc::new(velesdb_core::metrics::TraversalMetrics::new()),
query_duration_histogram: std::sync::Arc::new(
velesdb_core::metrics::DurationHistogram::new(),
),
})
}
fn create_app_state(temp_dir: &TempDir) -> Arc<AppState> {
app_state_from_db(Database::open(temp_dir.path()).expect("Failed to open database"))
}
pub fn create_test_app(temp_dir: &TempDir) -> Router {
base_routes().with_state(create_app_state(temp_dir))
}
pub fn create_test_app_with_observer(
temp_dir: &TempDir,
observer: Arc<dyn velesdb_core::DatabaseObserver>,
) -> Router {
let db =
Database::open_with_observer(temp_dir.path(), observer).expect("Failed to open database");
base_routes().with_state(app_state_from_db(db))
}
pub fn create_test_app_with_state(temp_dir: &TempDir) -> (Router, Arc<AppState>) {
let state = create_app_state(temp_dir);
let router = base_routes().with_state(Arc::clone(&state));
(router, state)
}
pub fn create_test_app_with_auth(temp_dir: &TempDir, api_keys: Vec<String>) -> Router {
let state = create_app_state(temp_dir);
let auth_state = AuthState::new(api_keys);
base_routes()
.with_state(state)
.layer(axum::middleware::from_fn_with_state(
auth_state,
auth_middleware,
))
}
async fn deprecation_header(
request: axum::extract::Request,
next: axum::middleware::Next,
) -> axum::response::Response {
let mut response = next.run(request).await;
let headers = response.headers_mut();
headers.insert(
"deprecation",
"true".parse().expect("test: static header value"),
);
headers.insert(
"x-api-deprecated",
"Use /v1/ prefix"
.parse()
.expect("test: static header value"),
);
response
}
pub fn create_versioned_test_app(temp_dir: &TempDir) -> Router {
let state = create_app_state(temp_dir);
let routes = base_routes();
let versioned = Router::new().nest("/v1", routes.clone());
let legacy = routes.layer(axum::middleware::from_fn(deprecation_header));
versioned.merge(legacy).with_state(state)
}
pub async fn create_graph_collection(app: &Router, name: &str) {
use axum::body::Body;
use axum::http::Request;
use tower::ServiceExt;
let response = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/collections")
.header("Content-Type", "application/json")
.body(Body::from(
serde_json::json!({
"name": name,
"collection_type": "graph"
})
.to_string(),
))
.expect("test: build create graph collection request"),
)
.await
.expect("test: create graph collection request failed");
assert_eq!(
response.status(),
axum::http::StatusCode::CREATED,
"test: failed to create graph collection '{name}'"
);
}