reflex_server/gateway/
mod.rs

//! HTTP gateway layer (Axum) for cache lookup and storage.
//!
//! This module is primarily used by the `reflex` server binary.
6
7#![allow(missing_docs)]
8
9pub mod adapter;
10pub mod error;
11pub mod handler;
12pub mod payload;
13pub mod state;
14pub mod streaming;
15
16#[cfg(test)]
17mod handler_tests;
18
19use axum::{
20    Json, Router,
21    http::{HeaderMap, StatusCode, header::HeaderValue},
22    response::{IntoResponse, Response},
23    routing::{get, post},
24};
25use tower_http::trace::TraceLayer;
26
27pub use handler::chat_completions_handler;
28pub use state::HandlerState;
29
30use reflex::cache::{
31    BqSearchBackend, REFLEX_STATUS_ERROR, REFLEX_STATUS_HEADER, REFLEX_STATUS_HEALTHY,
32    REFLEX_STATUS_READY, StorageLoader,
33};
34use reflex::storage::StorageWriter;
35
36pub fn create_router_with_state<B, S>(state: HandlerState<B, S>) -> Router
37where
38    B: BqSearchBackend + Clone + Send + Sync + 'static,
39    S: StorageLoader + StorageWriter + Clone + Send + Sync + 'static,
40{
41    Router::new()
42        .route("/healthz", get(health_handler))
43        .route("/ready", get(ready_handler))
44        .route("/v1/chat/completions", post(chat_completions_handler))
45        .layer(TraceLayer::new_for_http())
46        .with_state(state)
47}
48
49#[derive(serde::Serialize)]
50pub struct HealthResponse {
51    pub status: &'static str,
52}
53
54#[derive(serde::Serialize)]
55pub struct ReadyResponse {
56    pub status: &'static str,
57    pub components: ComponentStatus,
58}
59
60#[derive(serde::Serialize)]
61pub struct ComponentStatus {
62    pub http: &'static str,
63    pub storage: &'static str,
64    pub vectordb: &'static str,
65    pub embedding: &'static str,
66    pub embedder_mode: &'static str,
67}
68
69#[tracing::instrument]
70pub async fn health_handler() -> Response {
71    let mut headers = HeaderMap::new();
72    headers.insert(
73        REFLEX_STATUS_HEADER,
74        HeaderValue::from_static(REFLEX_STATUS_HEALTHY),
75    );
76
77    (
78        StatusCode::OK,
79        headers,
80        Json(HealthResponse { status: "ok" }),
81    )
82        .into_response()
83}
84
85use axum::extract::State;
86
87#[tracing::instrument(skip(state))]
88pub async fn ready_handler<B, S>(State(state): State<HandlerState<B, S>>) -> Response
89where
90    B: BqSearchBackend + Clone + Send + Sync + 'static,
91    S: StorageLoader + Clone + Send + Sync + 'static,
92{
93    let storage_status = if state.storage_path.exists() && state.storage_path.is_dir() {
94        REFLEX_STATUS_READY
95    } else {
96        REFLEX_STATUS_ERROR
97    };
98
99    let vectordb_status = if state.tiered_cache.is_ready().await {
100        REFLEX_STATUS_READY
101    } else {
102        "pending"
103    };
104
105    let embedding_status = REFLEX_STATUS_READY;
106
107    let is_stub = state.tiered_cache.l2().is_embedder_stub();
108    let embedder_mode = if is_stub { "stub" } else { "real" };
109
110    let components = ComponentStatus {
111        http: REFLEX_STATUS_READY,
112        storage: storage_status,
113        vectordb: vectordb_status,
114        embedding: embedding_status,
115        embedder_mode,
116    };
117
118    let is_ready = components.storage == REFLEX_STATUS_READY
119        && components.vectordb == REFLEX_STATUS_READY
120        && components.embedding == REFLEX_STATUS_READY;
121
122    let status_code = if is_ready {
123        StatusCode::OK
124    } else {
125        StatusCode::SERVICE_UNAVAILABLE
126    };
127    let status_msg = if is_ready { "ok" } else { "pending" };
128
129    let mut headers = HeaderMap::new();
130    headers.insert(
131        REFLEX_STATUS_HEADER,
132        HeaderValue::from_str(status_msg).unwrap_or(HeaderValue::from_static("error")),
133    );
134
135    (
136        status_code,
137        headers,
138        Json(ReadyResponse {
139            status: status_msg,
140            components,
141        }),
142    )
143        .into_response()
144}