// offline_intelligence/api/memory_api.rs
use axum::{
4 extract::{Path, State},
5 http::StatusCode,
6 response::IntoResponse,
7 Json,
8};
9use serde_json::json;
10use std::sync::Arc;
11use tracing::{info, warn, error};
12use serde::{Deserialize, Serialize};
13
14use crate::shared_state::SharedState;
15use crate::metrics;
16
/// JSON API error: an HTTP status paired with a human-readable message.
///
/// Rendered as `{"error": <message>, "code": <status>}` by the
/// `IntoResponse` impl in this file.
#[derive(Debug)]
pub struct ApiError {
    /// HTTP status code returned to the client.
    pub status: StatusCode,
    /// Human-readable description included in the JSON body.
    pub message: String,
}
25
26impl IntoResponse for ApiError {
27 fn into_response(self) -> axum::response::Response {
28 (
29 self.status,
30 Json(json!({
31 "error": self.message,
32 "code": self.status.as_u16(),
33 })),
34 )
35 .into_response()
36 }
37}
38
39fn validate_session_id(session_id: &str) -> Result<(), ApiError> {
43 if session_id.is_empty() {
44 return Err(ApiError {
45 status: StatusCode::BAD_REQUEST,
46 message: "Session ID cannot be empty".to_string(),
47 });
48 }
49
50 if session_id.len() > 256 {
51 return Err(ApiError {
52 status: StatusCode::BAD_REQUEST,
53 message: "Session ID too long (max 256 chars)".to_string(),
54 });
55 }
56
57 if !session_id
58 .chars()
59 .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
60 {
61 return Err(ApiError {
62 status: StatusCode::BAD_REQUEST,
63 message: "Session ID contains invalid characters".to_string(),
64 });
65 }
66
67 Ok(())
68}
69
70fn validate_messages(messages: &[crate::memory::Message]) -> Result<(), ApiError> {
72 if messages.is_empty() {
73 return Err(ApiError {
74 status: StatusCode::BAD_REQUEST,
75 message: "At least one message is required".to_string(),
76 });
77 }
78
79 if messages.len() > 1000 {
80 return Err(ApiError {
81 status: StatusCode::BAD_REQUEST,
82 message: "Too many messages (max 1000)".to_string(),
83 });
84 }
85
86 for (idx, msg) in messages.iter().enumerate() {
87 if msg.role.is_empty() || msg.content.is_empty() {
88 return Err(ApiError {
89 status: StatusCode::BAD_REQUEST,
90 message: format!("Message {} has empty role or content", idx + 1),
91 });
92 }
93
94 if msg.content.len() > 65_536 {
95 return Err(ApiError {
96 status: StatusCode::BAD_REQUEST,
97 message: format!(
98 "Message {} content exceeds 64KB limit",
99 idx + 1
100 ),
101 });
102 }
103
104 if msg.content.contains('\0') {
105 return Err(ApiError {
106 status: StatusCode::BAD_REQUEST,
107 message: format!(
108 "Message {} contains illegal null bytes",
109 idx + 1
110 ),
111 });
112 }
113 }
114
115 Ok(())
116}
117
/// Per-session statistics payload returned by `memory_stats`.
#[derive(Debug, Serialize)]
pub struct SessionStats {
    /// Sum of message counts across all three memory tiers.
    pub total_messages: usize,
    /// Messages in tier 1.
    // NOTE(review): populated from tier1_count in memory_stats —
    // confirm that tier 1 is indeed the "optimized" set.
    pub optimized_messages: usize,
    /// Ratio of (tier2 + tier3) counts to tier1 count; 0.0 when tier1
    /// is empty.
    pub compression_ratio: f32,
    /// Last access timestamp; currently always `None`.
    pub last_accessed: Option<String>,
    /// Estimated memory footprint (1 KiB-per-message heuristic).
    pub memory_size_bytes: Option<usize>,
}
128
/// Result summary payload returned by `memory_cleanup`.
#[derive(Debug, Serialize)]
pub struct CleanupStats {
    /// Sessions plus cache entries removed by the cleanup pass.
    pub messages_removed: usize,
    /// Populated from the number of sessions cleaned.
    // NOTE(review): field name says "final count" but the handler
    // fills it with sessions_cleaned — confirm intent.
    pub final_count: usize,
    /// Estimated bytes freed (1 KiB-per-cache-entry heuristic).
    pub memory_freed_bytes: Option<usize>,
}
135
136pub async fn memory_optimize(
140 State(shared_state): State<Arc<SharedState>>,
141 Json(payload): Json<MemoryOptimizeRequest>,
142) -> Result<impl IntoResponse, ApiError> {
143 validate_session_id(&payload.session_id)?;
145 validate_messages(&payload.messages)?;
146
147 if let Some(ref query) = payload.user_query {
148 if query.len() > 8_192 {
149 return Err(ApiError {
150 status: StatusCode::BAD_REQUEST,
151 message: "User query too long (max 8KB)".to_string(),
152 });
153 }
154 }
155
156 let mut orchestrator_guard = shared_state.context_orchestrator.write().await;
158
159 if let Some(orchestrator) = &mut *orchestrator_guard {
160 match orchestrator
161 .process_conversation(
162 &payload.session_id,
163 &payload.messages,
164 payload.user_query.as_deref(),
165 )
166 .await
167 {
168 Ok(optimized) => {
169 metrics::inc_request("memory_optimize", "ok");
170
171 let original_len: usize = payload.messages.len();
172 let optimized_len: usize = optimized.len();
173
174 let response = json!({
175 "optimized_messages": optimized,
176 "original_count": original_len,
177 "optimized_count": optimized_len,
178 "compression_ratio": if original_len > 0 {
179 (original_len as f32 - optimized_len as f32) / original_len as f32
180 } else {
181 0.0
182 }
183 });
184
185 Ok((StatusCode::OK, Json(response)))
186 }
187 Err(e) => {
188 metrics::inc_request("memory_optimize", "error");
189 warn!(
190 "Optimization failed for session {}: {}",
191 payload.session_id,
192 e
193 );
194 Err(ApiError {
195 status: StatusCode::INTERNAL_SERVER_ERROR,
196 message: format!("Optimization failed: {}", e),
197 })
198 }
199 }
200 } else {
201 metrics::inc_request("memory_optimize", "disabled");
202 Err(ApiError {
203 status: StatusCode::SERVICE_UNAVAILABLE,
204 message: "Memory system not available".to_string(),
205 })
206 }
207}
208
209pub async fn memory_stats(
211 State(shared_state): State<Arc<SharedState>>,
212 Path(session_id): Path<String>,
213) -> Result<impl IntoResponse, ApiError> {
214 validate_session_id(&session_id)?;
215
216 let orchestrator_guard = shared_state.context_orchestrator.read().await;
217
218 if let Some(orchestrator) = &*orchestrator_guard {
219 match orchestrator.get_session_stats(&session_id).await {
220 Ok(session_stats) => {
221 let stats = SessionStats {
222 total_messages: session_stats.tier_stats.tier1_count +
223 session_stats.tier_stats.tier2_count +
224 session_stats.tier_stats.tier3_count,
225 optimized_messages: session_stats.tier_stats.tier1_count,
226 compression_ratio: if session_stats.tier_stats.tier1_count > 0 {
227 (session_stats.tier_stats.tier2_count as f32 + session_stats.tier_stats.tier3_count as f32)
228 / session_stats.tier_stats.tier1_count as f32
229 } else {
230 0.0
231 },
232 last_accessed: None, memory_size_bytes: Some((session_stats.tier_stats.tier1_count +
234 session_stats.tier_stats.tier2_count +
235 session_stats.tier_stats.tier3_count) * 1024), };
237
238 metrics::inc_request("memory_stats", "ok");
239 Ok((StatusCode::OK, Json(stats)))
240 }
241 Err(e) => {
242 metrics::inc_request("memory_stats", "error");
243 warn!("Failed to get session stats for {}: {}", session_id, e);
244 Err(ApiError {
245 status: StatusCode::INTERNAL_SERVER_ERROR,
246 message: format!("Failed to retrieve session statistics: {}", e),
247 })
248 }
249 }
250 } else {
251 metrics::inc_request("memory_stats", "disabled");
252 Err(ApiError {
253 status: StatusCode::SERVICE_UNAVAILABLE,
254 message: "Memory system not available".to_string(),
255 })
256 }
257}
258
259pub async fn memory_cleanup(
261 State(shared_state): State<Arc<SharedState>>,
262 Json(payload): Json<MemoryCleanupRequest>,
263) -> Result<impl IntoResponse, ApiError> {
264 if !(3_600..=31_536_000).contains(&payload.older_than_seconds) {
266 return Err(ApiError {
267 status: StatusCode::BAD_REQUEST,
268 message: "Cleanup threshold must be between 1 hour and 1 year".to_string(),
269 });
270 }
271
272 let mut orchestrator_guard = shared_state.context_orchestrator.write().await;
273
274 if let Some(orchestrator) = &mut *orchestrator_guard {
275 match orchestrator.cleanup(payload.older_than_seconds).await {
276 Ok(cleanup_stats) => {
277 let stats = CleanupStats {
278 messages_removed: cleanup_stats.sessions_cleaned + cleanup_stats.cache_entries_cleaned,
279 final_count: cleanup_stats.sessions_cleaned, memory_freed_bytes: Some(cleanup_stats.cache_entries_cleaned * 1024), };
282
283 info!("Memory cleanup completed: {:?}", stats);
284 metrics::inc_request("memory_cleanup", "ok");
285 Ok((StatusCode::OK, Json(stats)))
286 }
287 Err(e) => {
288 metrics::inc_request("memory_cleanup", "error");
289 error!("Memory cleanup failed: {}", e);
290 Err(ApiError {
291 status: StatusCode::INTERNAL_SERVER_ERROR,
292 message: format!("Memory cleanup failed: {}", e),
293 })
294 }
295 }
296 } else {
297 metrics::inc_request("memory_cleanup", "disabled");
298 Err(ApiError {
299 status: StatusCode::SERVICE_UNAVAILABLE,
300 message: "Memory system not available".to_string(),
301 })
302 }
303}
304
/// Request body for `memory_optimize`.
#[derive(Debug, Deserialize)]
pub struct MemoryOptimizeRequest {
    /// Target session; validated by `validate_session_id`.
    pub session_id: String,
    /// Conversation history to optimize; validated by `validate_messages`.
    pub messages: Vec<crate::memory::Message>,
    /// Optional query (max 8 KB) passed through to the orchestrator.
    pub user_query: Option<String>,
}
313
/// Request body for `memory_cleanup`.
#[derive(Debug, Deserialize)]
pub struct MemoryCleanupRequest {
    /// Age threshold in seconds; must be within [3_600, 31_536_000]
    /// (1 hour to 1 year).
    pub older_than_seconds: u64,
}