1pub mod anthropic;
2pub mod compress;
3pub mod cost;
4pub mod forward;
5pub mod google;
6pub mod history_prune;
7pub mod introspect;
8pub mod metrics;
9pub mod openai;
10pub mod openai_responses;
11pub mod tool_kind;
12
13use std::net::SocketAddr;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::Arc;
16
17use axum::{
18 body::Body,
19 extract::State,
20 http::{Request, StatusCode},
21 response::{IntoResponse, Response},
22 routing::{any, get},
23 Router,
24};
25
/// Shared state handed to every route handler through `Router::with_state`.
///
/// `Clone` is required by axum's `State` extractor; the counters and introspection data sit
/// behind `Arc`, so all clones observe the same values.
#[derive(Clone)]
pub struct ProxyState {
    /// HTTP client built at startup with the connect/read timeouts from the environment;
    /// presumably used by the provider handlers for upstream calls.
    pub client: reqwest::Client,
    /// Local port the proxy was started on (echoed by `/status`).
    pub port: u16,
    /// Request and compression counters reported by `/status`.
    pub stats: Arc<ProxyStats>,
    /// Introspection data (analyzed-request count, system-prompt token total, last breakdown)
    /// reported by `/status`.
    pub introspect: Arc<introspect::IntrospectState>,
    /// Anthropic upstream base URL, resolved from the config at startup.
    pub anthropic_upstream: String,
    /// OpenAI upstream base URL, resolved from the config at startup.
    pub openai_upstream: String,
    /// Gemini upstream base URL, resolved from the config at startup.
    pub gemini_upstream: String,
}
36
/// Running counters for the proxy, shared between requests through `Arc`.
///
/// Every counter is updated and read with `Ordering::Relaxed`: they are independent tallies
/// for reporting, so a reader may see them momentarily out of step with one another.
/// `Default` starts every counter at zero.
#[derive(Default)]
pub struct ProxyStats {
    /// Requests seen by the proxy.
    pub requests_total: AtomicU64,
    /// Requests whose body went through compression.
    pub requests_compressed: AtomicU64,
    /// Estimated tokens removed, assuming roughly 4 bytes per token.
    pub tokens_saved: AtomicU64,
    /// Sum of body sizes before compression, in bytes.
    pub bytes_original: AtomicU64,
    /// Sum of body sizes after compression, in bytes.
    pub bytes_compressed: AtomicU64,
}

impl ProxyStats {
    /// Counts one incoming request.
    pub fn record_request(&self) {
        self.requests_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Records one compressed request that shrank from `original` to `compressed` bytes.
    ///
    /// The token estimate only ever grows: a body that got larger contributes zero tokens.
    pub fn record_compression(&self, original: usize, compressed: usize) {
        let before = original as u64;
        let after = compressed as u64;
        let estimated_tokens = before.saturating_sub(after) / 4;

        self.requests_compressed.fetch_add(1, Ordering::Relaxed);
        self.bytes_original.fetch_add(before, Ordering::Relaxed);
        self.bytes_compressed.fetch_add(after, Ordering::Relaxed);
        self.tokens_saved.fetch_add(estimated_tokens, Ordering::Relaxed);
    }

    /// Percentage of bytes removed overall (`0.0` before any compression was recorded).
    ///
    /// Negative when compressed output has been larger than the input in aggregate.
    pub fn compression_ratio(&self) -> f64 {
        match self.bytes_original.load(Ordering::Relaxed) {
            0 => 0.0,
            before => {
                let after = self.bytes_compressed.load(Ordering::Relaxed) as f64;
                (1.0 - after / before as f64) * 100.0
            }
        }
    }
}
81
/// Reads a strictly positive whole-second value from the environment variable `name`.
///
/// Surrounding whitespace is ignored. An unset, non-UTF-8, unparsable, or zero value yields
/// `default`.
fn positive_secs_from_env(name: &str, default: u64) -> u64 {
    std::env::var(name)
        .ok()
        .and_then(|v| v.trim().parse::<u64>().ok())
        .filter(|s| *s > 0)
        .unwrap_or(default)
}

/// Connect timeout, in seconds, for the proxy's upstream HTTP client.
///
/// Overridable via `LEAN_CTX_PROXY_CONNECT_TIMEOUT_SECS`; defaults to 15.
fn connect_timeout_secs() -> u64 {
    positive_secs_from_env("LEAN_CTX_PROXY_CONNECT_TIMEOUT_SECS", 15)
}

/// Per-read (idle) timeout, in seconds, for the proxy's upstream HTTP client.
///
/// Overridable via `LEAN_CTX_PROXY_READ_TIMEOUT_SECS`; defaults to 300.
fn read_idle_timeout_secs() -> u64 {
    positive_secs_from_env("LEAN_CTX_PROXY_READ_TIMEOUT_SECS", 300)
}
102
103pub async fn start_proxy(port: u16) -> anyhow::Result<()> {
104 let token = crate::core::session_token::resolve_proxy_token("LEAN_CTX_PROXY_TOKEN");
105 start_proxy_with_token(port, Some(token)).await
106}
107
108pub async fn start_proxy_with_token(port: u16, auth_token: Option<String>) -> anyhow::Result<()> {
109 use crate::core::config::{Config, ProxyProvider};
110
111 let client = reqwest::Client::builder()
116 .connect_timeout(std::time::Duration::from_secs(connect_timeout_secs()))
117 .read_timeout(std::time::Duration::from_secs(read_idle_timeout_secs()))
118 .build()?;
119
120 let cfg = Config::load();
121 let anthropic_upstream = cfg.proxy.resolve_upstream(ProxyProvider::Anthropic);
122 let openai_upstream = cfg.proxy.resolve_upstream(ProxyProvider::OpenAi);
123 let gemini_upstream = cfg.proxy.resolve_upstream(ProxyProvider::Gemini);
124
125 let state = ProxyState {
126 client,
127 port,
128 stats: Arc::new(ProxyStats::default()),
129 introspect: Arc::new(introspect::IntrospectState::default()),
130 anthropic_upstream: anthropic_upstream.clone(),
131 openai_upstream: openai_upstream.clone(),
132 gemini_upstream: gemini_upstream.clone(),
133 };
134
135 let mut app = Router::new()
136 .route("/health", get(health))
137 .route("/status", get(status_handler))
138 .route("/v1/messages", any(anthropic::handler))
139 .route("/v1/messages/{*rest}", any(anthropic::handler))
140 .route("/v1/chat/completions", any(openai::handler))
141 .route("/v1/responses", any(openai_responses::handler))
142 .route("/v1/responses/{*rest}", any(openai_responses::handler))
143 .route("/messages", any(anthropic::handler))
149 .route("/messages/{*rest}", any(anthropic::handler))
150 .route("/chat/completions", any(openai::handler))
151 .route("/responses", any(openai_responses::handler))
152 .route("/responses/{*rest}", any(openai_responses::handler))
153 .route("/v1/references/{id}", get(v1_resolve_reference))
154 .fallback(fallback_router)
155 .layer(axum::middleware::from_fn(host_guard))
156 .with_state(state);
157
158 if let Some(ref token) = auth_token {
159 let expected = token.clone();
160 app = app.layer(axum::middleware::from_fn(move |req, next| {
161 let expected = expected.clone();
162 proxy_auth_guard(req, next, expected)
163 }));
164 }
165
166 app = app.layer(axum::middleware::from_fn(normalize_provider_path));
170
171 let addr = SocketAddr::from(([127, 0, 0, 1], port));
172 if auth_token.is_some() {
173 println!("lean-ctx proxy listening on http://{addr} (token auth enabled)");
174 } else {
175 println!("lean-ctx proxy listening on http://{addr} (no auth — set LEAN_CTX_PROXY_TOKEN to enable)");
176 }
177 println!(" Anthropic: POST /v1/messages → {anthropic_upstream}");
178 println!(" OpenAI: POST /v1/chat/completions → {openai_upstream}");
179 println!(" OpenAI: POST /v1/responses → {openai_upstream}");
180 println!(" Gemini: POST /v1beta/models/... → {gemini_upstream}");
181
182 let listener = tokio::net::TcpListener::bind(addr).await?;
183 axum::serve(listener, app)
184 .with_graceful_shutdown(shutdown_signal())
185 .await?;
186
187 println!("lean-ctx proxy shut down cleanly.");
188 Ok(())
189}
190
191async fn shutdown_signal() {
192 let ctrl_c = tokio::signal::ctrl_c();
193
194 #[cfg(unix)]
195 {
196 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
199 Ok(mut sigterm) => {
200 tokio::select! {
201 _ = ctrl_c => {},
202 _ = sigterm.recv() => {},
203 }
204 }
205 Err(e) => {
206 tracing::warn!("lean-ctx proxy: SIGTERM handler unavailable ({e}); Ctrl-C only");
207 ctrl_c.await.ok();
208 }
209 }
210 }
211
212 #[cfg(not(unix))]
213 {
214 ctrl_c.await.ok();
215 }
216
217 println!("lean-ctx proxy: received shutdown signal, draining…");
218}
219
220async fn health() -> impl IntoResponse {
221 let body = serde_json::json!({
222 "status": "ok",
223 "pid": std::process::id(),
224 });
225 (StatusCode::OK, axum::Json(body))
226}
227
228async fn v1_resolve_reference(
229 axum::extract::Path(id): axum::extract::Path<String>,
230) -> impl IntoResponse {
231 match crate::server::reference_store::resolve(&id) {
232 Some(content) => (StatusCode::OK, content),
233 None => (
234 StatusCode::NOT_FOUND,
235 "Reference expired or not found".to_string(),
236 ),
237 }
238}
239
240async fn status_handler(State(state): State<ProxyState>) -> impl IntoResponse {
241 use std::sync::atomic::Ordering::Relaxed;
242 let s = &state.stats;
243 let i = &state.introspect;
244
245 let last_breakdown = i
246 .last_breakdown
247 .lock()
248 .ok()
249 .and_then(|guard| guard.as_ref().map(|b| serde_json::to_value(b).ok()))
250 .flatten();
251
252 let body = serde_json::json!({
253 "status": "running",
254 "port": state.port,
255 "requests_total": s.requests_total.load(Relaxed),
256 "requests_compressed": s.requests_compressed.load(Relaxed),
257 "tokens_saved": s.tokens_saved.load(Relaxed),
258 "tokens_saved_estimated": true,
259 "bytes_original": s.bytes_original.load(Relaxed),
260 "bytes_compressed": s.bytes_compressed.load(Relaxed),
261 "compression_ratio_pct": format!("{:.1}", s.compression_ratio()),
262 "per_model": cost::snapshot(),
263 "note": "Savings are request-side (tokens removed before forwarding); they do not subtract any re-reads the agent performs. Token figures are estimates; USD uses the shared model price table.",
264 "introspect": {
265 "total_requests_analyzed": i.total_requests.load(Relaxed),
266 "total_system_prompt_tokens": i.total_system_prompt_tokens.load(Relaxed),
267 "last_breakdown": last_breakdown,
268 }
269 });
270 (StatusCode::OK, axum::Json(body))
271}
272
273async fn proxy_auth_guard(
274 req: axum::extract::Request,
275 next: axum::middleware::Next,
276 expected_token: String,
277) -> Result<Response, Response> {
278 let path = req.uri().path();
279 if path == "/health" {
280 return Ok(next.run(req).await);
281 }
282
283 if let Some(auth) = req
285 .headers()
286 .get("authorization")
287 .and_then(|v| v.to_str().ok())
288 {
289 if let Some(token) = auth.strip_prefix("Bearer ") {
290 if constant_time_eq(token.as_bytes(), expected_token.as_bytes()) {
291 return Ok(next.run(req).await);
292 }
293 }
294 }
295
296 if has_provider_api_key(&req) && is_provider_route(path) {
301 return Ok(next.run(req).await);
302 }
303
304 let cfg = crate::core::config::Config::load();
305 let hint = match cfg.proxy_enabled {
306 Some(true) => "lean-ctx proxy requires authentication. Use a Bearer token (LEAN_CTX_PROXY_TOKEN) or configure your AI tool's API key.",
307 Some(false) => "lean-ctx proxy is disabled but still running. Run: lean-ctx proxy cleanup",
308 None => "lean-ctx proxy is not configured. Your AI tool's ANTHROPIC_BASE_URL may be pointing here by mistake. Fix: lean-ctx proxy cleanup OR lean-ctx proxy enable",
309 };
310
311 let body = serde_json::json!({
312 "type": "error",
313 "error": {
314 "type": "authentication_error",
315 "message": format!("401 Unauthorized — {hint}")
316 }
317 });
318
319 Err((StatusCode::UNAUTHORIZED, axum::Json(body)).into_response())
320}
321
322fn has_provider_api_key(req: &axum::extract::Request) -> bool {
323 let headers = req.headers();
324 for key in ["x-api-key", "x-goog-api-key", "api-key"] {
327 if headers
328 .get(key)
329 .and_then(|v| v.to_str().ok())
330 .is_some_and(|v| !v.trim().is_empty())
331 {
332 return true;
333 }
334 }
335 if let Some(auth) = headers.get("authorization").and_then(|v| v.to_str().ok()) {
344 let auth = auth.trim();
345 let credential = auth
346 .strip_prefix("Bearer ")
347 .or_else(|| auth.strip_prefix("bearer "))
348 .unwrap_or(auth)
349 .trim();
350 return !credential.is_empty() && !credential.eq_ignore_ascii_case("bearer");
352 }
353 false
354}
355
/// Returns `true` for paths handled by the provider-forwarding endpoints.
///
/// The auth guard admits such paths when the request carries a provider credential, relying
/// on the upstream to validate it. Locally served endpoints must therefore never match:
/// `/v1/references/...` is answered from the local reference store, so a bogus
/// "provider key" there would bypass the proxy token entirely.
///
/// Bare (non-`/v1`) prefixes match only on a path-segment boundary, so `/responsesx` is not a
/// provider route while `/responses` and `/responses/...` are.
fn is_provider_route(path: &str) -> bool {
    /// `path` equals `prefix` or continues below it with a `/`.
    fn at_or_under(path: &str, prefix: &str) -> bool {
        path.strip_prefix(prefix)
            .is_some_and(|rest| rest.is_empty() || rest.starts_with('/'))
    }

    if at_or_under(path, "/v1/references") {
        return false;
    }

    path.starts_with("/v1/")
        || path.starts_with("/v1beta/")
        || at_or_under(path, "/chat/completions")
        || at_or_under(path, "/responses")
        || at_or_under(path, "/messages")
}
363
/// Maps a bare provider path (`/responses`, `/chat/completions`, `/messages`, or anything
/// beneath them) to its `/v1`-prefixed form, keeping the remainder of the path intact.
///
/// Returns `None` for paths that are already canonical, unrelated, or only share a textual
/// prefix without a segment boundary (e.g. `/responsesx`).
fn canonical_provider_path(path: &str) -> Option<String> {
    const BARE_TO_CANONICAL: &[(&str, &str)] = &[
        ("/responses", "/v1/responses"),
        ("/chat/completions", "/v1/chat/completions"),
        ("/messages", "/v1/messages"),
    ];

    BARE_TO_CANONICAL.iter().find_map(|&(bare, canonical)| {
        let tail = path.strip_prefix(bare)?;
        if tail.is_empty() || tail.starts_with('/') {
            Some(format!("{canonical}{tail}"))
        } else {
            None
        }
    })
}
390
391fn normalized_provider_uri(uri: &axum::http::Uri) -> Option<axum::http::Uri> {
395 let canonical = canonical_provider_path(uri.path())?;
396 let new_path_and_query = match uri.query() {
397 Some(q) => format!("{canonical}?{q}"),
398 None => canonical,
399 };
400 new_path_and_query.parse::<axum::http::Uri>().ok()
401}
402
403async fn normalize_provider_path(
407 mut req: axum::extract::Request,
408 next: axum::middleware::Next,
409) -> Response {
410 if let Some(uri) = normalized_provider_uri(req.uri()) {
411 *req.uri_mut() = uri;
412 }
413 next.run(req).await
414}
415
416fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
417 use subtle::ConstantTimeEq;
418 if a.len() != b.len() {
419 return false;
420 }
421 bool::from(a.ct_eq(b))
422}
423
424async fn host_guard(
425 req: axum::extract::Request,
426 next: axum::middleware::Next,
427) -> Result<Response, StatusCode> {
428 if let Some(host) = req.headers().get("host").and_then(|v| v.to_str().ok()) {
429 let h = host.split(':').next().unwrap_or(host);
430 if matches!(h, "127.0.0.1" | "localhost" | "[::1]") {
431 return Ok(next.run(req).await);
432 }
433 }
434 Err(StatusCode::FORBIDDEN)
435}
436
437async fn fallback_router(State(state): State<ProxyState>, req: Request<Body>) -> Response {
438 let path = req.uri().path().to_string();
439
440 if path.starts_with("/v1beta/models/") || path.starts_with("/v1/models/") {
441 match google::handler(State(state), req).await {
442 Ok(resp) => resp,
443 Err(status) => Response::builder()
444 .status(status)
445 .body(Body::from("proxy error"))
446 .expect("BUG: building error response with valid status should never fail"),
447 }
448 } else {
449 let method = req.method().to_string();
450 eprintln!("lean-ctx proxy: unmatched {method} {path}");
451 Response::builder()
452 .status(StatusCode::NOT_FOUND)
453 .body(Body::from(format!(
454 "lean-ctx proxy: no handler for {method} {path}"
455 )))
456 .expect("BUG: building 404 response should never fail")
457 }
458}
459
/// Unit tests for the auth helpers (`is_provider_route`, `has_provider_api_key`) and the
/// bare-path normalization (`canonical_provider_path`, `normalized_provider_uri`).
#[cfg(test)]
mod auth_tests {
    use super::*;
    use axum::http::Uri;

    /// Builds a body-less request to `path` carrying the given `(name, value)` headers.
    fn build_request(headers: &[(&str, &str)], path: &str) -> axum::extract::Request {
        headers
            .iter()
            .fold(
                axum::http::Request::builder().uri(path),
                |builder, &(name, value)| builder.header(name, value),
            )
            .body(axum::body::Body::empty())
            .unwrap()
    }

    // ---- is_provider_route ----

    #[test]
    fn is_provider_route_v1() {
        for path in ["/v1/chat/completions", "/v1/messages", "/v1/completions"] {
            assert!(is_provider_route(path), "expected provider route: {path}");
        }
    }

    #[test]
    fn is_provider_route_anthropic_subpaths() {
        for path in [
            "/v1/messages/count_tokens",
            "/v1/messages/batches",
            "/v1/messages/batches/batch_123",
        ] {
            assert!(is_provider_route(path), "expected provider route: {path}");
        }
    }

    #[test]
    fn is_provider_route_v1beta() {
        assert!(is_provider_route("/v1beta/models"));
    }

    #[test]
    fn is_provider_route_chat() {
        assert!(is_provider_route("/chat/completions"));
    }

    #[test]
    fn is_provider_route_rejects_non_provider() {
        for path in ["/health", "/api/v2/test", "/"] {
            assert!(!is_provider_route(path), "unexpected provider route: {path}");
        }
    }

    #[test]
    fn is_provider_route_bare_responses_and_messages() {
        for path in ["/responses", "/responses/resp_123/input_items", "/messages"] {
            assert!(is_provider_route(path), "expected provider route: {path}");
        }
    }

    // ---- has_provider_api_key ----

    #[test]
    fn has_provider_api_key_x_api_key() {
        let req = build_request(&[("x-api-key", "sk-ant-abc123")], "/v1/messages");
        assert!(has_provider_api_key(&req));
    }

    #[test]
    fn has_provider_api_key_x_goog() {
        let req = build_request(&[("x-goog-api-key", "AIzaSyAbc")], "/v1beta/models");
        assert!(has_provider_api_key(&req));
    }

    #[test]
    fn has_provider_api_key_azure() {
        let req = build_request(&[("api-key", "deadbeef")], "/v1/completions");
        assert!(has_provider_api_key(&req));
    }

    #[test]
    fn has_provider_api_key_bearer_sk() {
        let req = build_request(
            &[("authorization", "Bearer sk-proj-abc123")],
            "/v1/chat/completions",
        );
        assert!(has_provider_api_key(&req));
    }

    #[test]
    fn has_provider_api_key_empty_rejected() {
        let req = build_request(&[("x-api-key", " ")], "/v1/messages");
        assert!(!has_provider_api_key(&req));
    }

    #[test]
    fn has_provider_api_key_no_headers() {
        let req = build_request(&[], "/v1/messages");
        assert!(!has_provider_api_key(&req));
    }

    #[test]
    fn has_provider_api_key_accepts_non_sk_bearer() {
        let keys = [
            "Bearer or-v1-9f8e7d6c",
            "Bearer gsk_live_1234",
            "Bearer abc.def.ghi",
            "Bearer 0123456789",
        ];
        for key in keys {
            let req = build_request(&[("authorization", key)], "/v1/responses");
            assert!(
                has_provider_api_key(&req),
                "non-sk Bearer must count as a provider credential: {key}"
            );
        }
    }

    #[test]
    fn has_provider_api_key_empty_bearer_rejected() {
        for bad in ["Bearer ", "", "Bearer", "bearer", " "] {
            let req = build_request(&[("authorization", bad)], "/responses");
            assert!(
                !has_provider_api_key(&req),
                "blank/scheme-only Authorization must not authenticate: {bad:?}"
            );
        }
    }

    // ---- canonical_provider_path / normalized_provider_uri ----

    #[test]
    fn canonical_provider_path_rewrites_bare_endpoints() {
        let cases = [
            ("/responses", "/v1/responses"),
            ("/chat/completions", "/v1/chat/completions"),
            ("/messages", "/v1/messages"),
        ];
        for (bare, canonical) in cases {
            assert_eq!(canonical_provider_path(bare).as_deref(), Some(canonical));
        }
    }

    #[test]
    fn canonical_provider_path_preserves_subpaths() {
        let cases = [
            ("/responses/resp_abc/cancel", "/v1/responses/resp_abc/cancel"),
            ("/messages/batches/batch_1", "/v1/messages/batches/batch_1"),
        ];
        for (bare, canonical) in cases {
            assert_eq!(canonical_provider_path(bare).as_deref(), Some(canonical));
        }
    }

    #[test]
    fn canonical_provider_path_ignores_already_canonical_and_unknown() {
        for path in [
            "/v1/responses",
            "/v1/chat/completions",
            "/health",
            "/responsesx",
            "/",
        ] {
            assert_eq!(canonical_provider_path(path), None, "path: {path}");
        }
    }

    #[test]
    fn normalized_provider_uri_rewrites_path_and_preserves_query() {
        let original: Uri = "/responses?stream=true".parse().unwrap();
        let rewritten =
            normalized_provider_uri(&original).expect("bare /responses must rewrite");
        assert_eq!(rewritten.path(), "/v1/responses");
        assert_eq!(rewritten.query(), Some("stream=true"));
        assert_eq!(
            rewritten
                .path_and_query()
                .map(axum::http::uri::PathAndQuery::as_str),
            Some("/v1/responses?stream=true")
        );
    }

    #[test]
    fn normalized_provider_uri_noop_for_canonical() {
        let original: Uri = "/v1/responses".parse().unwrap();
        assert!(normalized_provider_uri(&original).is_none());
    }
}