reflex_server/gateway/
mod.rs1#![allow(missing_docs)]
8
9pub mod adapter;
10pub mod error;
11pub mod handler;
12pub mod payload;
13pub mod state;
14pub mod streaming;
15
16#[cfg(test)]
17mod handler_tests;
18
19use axum::{
20 Json, Router,
21 http::{HeaderMap, StatusCode, header::HeaderValue},
22 response::{IntoResponse, Response},
23 routing::{get, post},
24};
25use tower_http::trace::TraceLayer;
26
27pub use handler::chat_completions_handler;
28pub use state::HandlerState;
29
30use reflex::cache::{
31 BqSearchBackend, REFLEX_STATUS_ERROR, REFLEX_STATUS_HEADER, REFLEX_STATUS_HEALTHY,
32 REFLEX_STATUS_READY, StorageLoader,
33};
34use reflex::storage::StorageWriter;
35
36pub fn create_router_with_state<B, S>(state: HandlerState<B, S>) -> Router
37where
38 B: BqSearchBackend + Clone + Send + Sync + 'static,
39 S: StorageLoader + StorageWriter + Clone + Send + Sync + 'static,
40{
41 Router::new()
42 .route("/healthz", get(health_handler))
43 .route("/ready", get(ready_handler))
44 .route("/v1/chat/completions", post(chat_completions_handler))
45 .layer(TraceLayer::new_for_http())
46 .with_state(state)
47}
48
49#[derive(serde::Serialize)]
50pub struct HealthResponse {
51 pub status: &'static str,
52}
53
54#[derive(serde::Serialize)]
55pub struct ReadyResponse {
56 pub status: &'static str,
57 pub components: ComponentStatus,
58}
59
60#[derive(serde::Serialize)]
61pub struct ComponentStatus {
62 pub http: &'static str,
63 pub storage: &'static str,
64 pub vectordb: &'static str,
65 pub embedding: &'static str,
66 pub embedder_mode: &'static str,
67}
68
69#[tracing::instrument]
70pub async fn health_handler() -> Response {
71 let mut headers = HeaderMap::new();
72 headers.insert(
73 REFLEX_STATUS_HEADER,
74 HeaderValue::from_static(REFLEX_STATUS_HEALTHY),
75 );
76
77 (
78 StatusCode::OK,
79 headers,
80 Json(HealthResponse { status: "ok" }),
81 )
82 .into_response()
83}
84
85use axum::extract::State;
86
87#[tracing::instrument(skip(state))]
88pub async fn ready_handler<B, S>(State(state): State<HandlerState<B, S>>) -> Response
89where
90 B: BqSearchBackend + Clone + Send + Sync + 'static,
91 S: StorageLoader + Clone + Send + Sync + 'static,
92{
93 let storage_status = if state.storage_path.exists() && state.storage_path.is_dir() {
94 REFLEX_STATUS_READY
95 } else {
96 REFLEX_STATUS_ERROR
97 };
98
99 let vectordb_status = if state.tiered_cache.is_ready().await {
100 REFLEX_STATUS_READY
101 } else {
102 "pending"
103 };
104
105 let embedding_status = REFLEX_STATUS_READY;
106
107 let is_stub = state.tiered_cache.l2().is_embedder_stub();
108 let embedder_mode = if is_stub { "stub" } else { "real" };
109
110 let components = ComponentStatus {
111 http: REFLEX_STATUS_READY,
112 storage: storage_status,
113 vectordb: vectordb_status,
114 embedding: embedding_status,
115 embedder_mode,
116 };
117
118 let is_ready = components.storage == REFLEX_STATUS_READY
119 && components.vectordb == REFLEX_STATUS_READY
120 && components.embedding == REFLEX_STATUS_READY;
121
122 let status_code = if is_ready {
123 StatusCode::OK
124 } else {
125 StatusCode::SERVICE_UNAVAILABLE
126 };
127 let status_msg = if is_ready { "ok" } else { "pending" };
128
129 let mut headers = HeaderMap::new();
130 headers.insert(
131 REFLEX_STATUS_HEADER,
132 HeaderValue::from_str(status_msg).unwrap_or(HeaderValue::from_static("error")),
133 );
134
135 (
136 status_code,
137 headers,
138 Json(ReadyResponse {
139 status: status_msg,
140 components,
141 }),
142 )
143 .into_response()
144}