offline_intelligence/api/
memory_api.rs1use axum::{
4 extract::{Path, State},
5 http::StatusCode,
6 response::IntoResponse,
7 Json,
8};
9use serde_json::json;
10use std::sync::Arc;
11use tracing::{info, warn, error};
12use serde::{Deserialize, Serialize};
13
14use crate::shared_state::SharedState;
15use crate::metrics;
16
17#[derive(Debug)]
21pub struct ApiError {
22 pub status: StatusCode,
23 pub message: String,
24}
25
26impl IntoResponse for ApiError {
27 fn into_response(self) -> axum::response::Response {
28 (
29 self.status,
30 Json(json!({
31 "error": self.message,
32 "code": self.status.as_u16(),
33 })),
34 )
35 .into_response()
36 }
37}
38
39fn validate_session_id(session_id: &str) -> Result<(), ApiError> {
43 if session_id.is_empty() {
44 return Err(ApiError {
45 status: StatusCode::BAD_REQUEST,
46 message: "Session ID cannot be empty".to_string(),
47 });
48 }
49
50 if session_id.len() > 256 {
51 return Err(ApiError {
52 status: StatusCode::BAD_REQUEST,
53 message: "Session ID too long (max 256 chars)".to_string(),
54 });
55 }
56
57 if !session_id
58 .chars()
59 .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
60 {
61 return Err(ApiError {
62 status: StatusCode::BAD_REQUEST,
63 message: "Session ID contains invalid characters".to_string(),
64 });
65 }
66
67 Ok(())
68}
69
70fn validate_messages(messages: &[crate::memory::Message]) -> Result<(), ApiError> {
72 if messages.is_empty() {
73 return Err(ApiError {
74 status: StatusCode::BAD_REQUEST,
75 message: "At least one message is required".to_string(),
76 });
77 }
78
79 if messages.len() > 1000 {
80 return Err(ApiError {
81 status: StatusCode::BAD_REQUEST,
82 message: "Too many messages (max 1000)".to_string(),
83 });
84 }
85
86 for (idx, msg) in messages.iter().enumerate() {
87 if msg.role.is_empty() || msg.content.is_empty() {
88 return Err(ApiError {
89 status: StatusCode::BAD_REQUEST,
90 message: format!("Message {} has empty role or content", idx + 1),
91 });
92 }
93
94 if msg.content.len() > 65_536 {
95 return Err(ApiError {
96 status: StatusCode::BAD_REQUEST,
97 message: format!(
98 "Message {} content exceeds 64KB limit",
99 idx + 1
100 ),
101 });
102 }
103
104 if msg.content.contains('\0') {
105 return Err(ApiError {
106 status: StatusCode::BAD_REQUEST,
107 message: format!(
108 "Message {} contains illegal null bytes",
109 idx + 1
110 ),
111 });
112 }
113 }
114
115 Ok(())
116}
117
118#[derive(Debug, Serialize)]
121pub struct SessionStats {
122 pub total_messages: usize,
123 pub optimized_messages: usize,
124 pub compression_ratio: f32,
125 pub last_accessed: Option<String>,
126 pub memory_size_bytes: Option<usize>,
127}
128
129#[derive(Debug, Serialize)]
130pub struct CleanupStats {
131 pub messages_removed: usize,
132 pub final_count: usize,
133 pub memory_freed_bytes: Option<usize>,
134}
135
136pub async fn memory_optimize(
140 State(shared_state): State<Arc<SharedState>>,
141 Json(payload): Json<MemoryOptimizeRequest>,
142) -> Result<impl IntoResponse, ApiError> {
143 validate_session_id(&payload.session_id)?;
145 validate_messages(&payload.messages)?;
146
147 if let Some(ref query) = payload.user_query {
148 if query.len() > 8_192 {
149 return Err(ApiError {
150 status: StatusCode::BAD_REQUEST,
151 message: "User query too long (max 8KB)".to_string(),
152 });
153 }
154 }
155
156 let orchestrator_guard = shared_state.context_orchestrator.read().await;
158
159 if let Some(orchestrator) = &*orchestrator_guard {
160 match orchestrator
161 .process_conversation(
162 &payload.session_id,
163 &payload.messages,
164 payload.user_query.as_deref(),
165 )
166 .await
167 {
168 Ok(optimized) => {
169 metrics::inc_request("memory_optimize", "ok");
170
171 let original_len: usize = payload.messages.len();
172 let optimized_len: usize = optimized.len();
173
174 let response = json!({
175 "optimized_messages": optimized,
176 "original_count": original_len,
177 "optimized_count": optimized_len,
178 "compression_ratio": if original_len > 0 {
179 (original_len as f32 - optimized_len as f32) / original_len as f32
180 } else {
181 0.0
182 }
183 });
184
185 Ok((StatusCode::OK, Json(response)))
186 }
187 Err(e) => {
188 metrics::inc_request("memory_optimize", "error");
189 warn!(
190 "Optimization failed for session {}: {}",
191 payload.session_id,
192 e
193 );
194 Err(ApiError {
195 status: StatusCode::INTERNAL_SERVER_ERROR,
196 message: format!("Optimization failed: {}", e),
197 })
198 }
199 }
200 } else {
201 metrics::inc_request("memory_optimize", "disabled");
202 Err(ApiError {
203 status: StatusCode::SERVICE_UNAVAILABLE,
204 message: "Memory system not available".to_string(),
205 })
206 }
207}
208
209pub async fn memory_stats(
211 State(shared_state): State<Arc<SharedState>>,
212 Path(session_id): Path<String>,
213) -> Result<impl IntoResponse, ApiError> {
214 validate_session_id(&session_id)?;
215
216 let orchestrator_guard = shared_state.context_orchestrator.read().await;
217
218 if let Some(orchestrator) = &*orchestrator_guard {
219 match orchestrator.get_session_stats(&session_id).await {
220 Ok(session_stats) => {
221 let stats = SessionStats {
222 total_messages: session_stats.tier_stats.tier1_count +
223 session_stats.tier_stats.tier3_count,
224 optimized_messages: session_stats.tier_stats.tier1_count,
225 compression_ratio: if session_stats.tier_stats.tier1_count > 0 {
226 session_stats.tier_stats.tier3_count as f32
227 / session_stats.tier_stats.tier1_count as f32
228 } else {
229 0.0
230 },
231 last_accessed: None,
232 memory_size_bytes: Some((session_stats.tier_stats.tier1_count +
233 session_stats.tier_stats.tier3_count) * 1024),
234 };
235
236 metrics::inc_request("memory_stats", "ok");
237 Ok((StatusCode::OK, Json(stats)))
238 }
239 Err(e) => {
240 metrics::inc_request("memory_stats", "error");
241 warn!("Failed to get session stats for {}: {}", session_id, e);
242 Err(ApiError {
243 status: StatusCode::INTERNAL_SERVER_ERROR,
244 message: format!("Failed to retrieve session statistics: {}", e),
245 })
246 }
247 }
248 } else {
249 metrics::inc_request("memory_stats", "disabled");
250 Err(ApiError {
251 status: StatusCode::SERVICE_UNAVAILABLE,
252 message: "Memory system not available".to_string(),
253 })
254 }
255}
256
257pub async fn memory_cleanup(
259 State(shared_state): State<Arc<SharedState>>,
260 Json(payload): Json<MemoryCleanupRequest>,
261) -> Result<impl IntoResponse, ApiError> {
262 if !(3_600..=31_536_000).contains(&payload.older_than_seconds) {
264 return Err(ApiError {
265 status: StatusCode::BAD_REQUEST,
266 message: "Cleanup threshold must be between 1 hour and 1 year".to_string(),
267 });
268 }
269
270 let mut orchestrator_guard = shared_state.context_orchestrator.write().await;
271
272 if let Some(orchestrator) = &mut *orchestrator_guard {
273 match orchestrator.cleanup(payload.older_than_seconds).await {
274 Ok(cleanup_stats) => {
275 let stats = CleanupStats {
276 messages_removed: cleanup_stats.sessions_cleaned + cleanup_stats.cache_entries_cleaned,
277 final_count: cleanup_stats.sessions_cleaned, memory_freed_bytes: Some(cleanup_stats.cache_entries_cleaned * 1024), };
280
281 info!("Memory cleanup completed: {:?}", stats);
282 metrics::inc_request("memory_cleanup", "ok");
283 Ok((StatusCode::OK, Json(stats)))
284 }
285 Err(e) => {
286 metrics::inc_request("memory_cleanup", "error");
287 error!("Memory cleanup failed: {}", e);
288 Err(ApiError {
289 status: StatusCode::INTERNAL_SERVER_ERROR,
290 message: format!("Memory cleanup failed: {}", e),
291 })
292 }
293 }
294 } else {
295 metrics::inc_request("memory_cleanup", "disabled");
296 Err(ApiError {
297 status: StatusCode::SERVICE_UNAVAILABLE,
298 message: "Memory system not available".to_string(),
299 })
300 }
301}
302
303#[derive(Debug, Deserialize)]
306pub struct MemoryOptimizeRequest {
307 pub session_id: String,
308 pub messages: Vec<crate::memory::Message>,
309 pub user_query: Option<String>,
310}
311
312#[derive(Debug, Deserialize)]
313pub struct MemoryCleanupRequest {
314 pub older_than_seconds: u64,
315}