// lean_ctx/proxy/mod.rs
pub mod anthropic;
pub mod compress;
pub mod forward;
pub mod google;
pub mod openai;

use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use anyhow::Context as _;
use axum::{
    body::Body,
    extract::State,
    http::{Request, StatusCode},
    response::{IntoResponse, Response},
    routing::{any, get},
    Router,
};

20#[derive(Clone)]
21pub struct ProxyState {
22    pub client: reqwest::Client,
23    pub port: u16,
24    pub stats: Arc<ProxyStats>,
25}
26
/// Counters describing proxy traffic and compression savings.
///
/// Every counter is an independent `AtomicU64` updated with `Ordering::Relaxed`.
/// They are plain tallies, not synchronisation points, so a concurrent reader
/// may see one counter slightly ahead of another. `Default` starts all at zero.
#[derive(Debug, Default)]
pub struct ProxyStats {
    /// Number of calls to [`ProxyStats::record_request`].
    pub requests_total: AtomicU64,
    /// Number of calls to [`ProxyStats::record_compression`].
    pub requests_compressed: AtomicU64,
    /// Estimated tokens saved, derived from the bytes removed by compression.
    pub tokens_saved: AtomicU64,
    /// Sum of the `original` sizes passed to [`ProxyStats::record_compression`].
    pub bytes_original: AtomicU64,
    /// Sum of the `compressed` sizes passed to [`ProxyStats::record_compression`].
    pub bytes_compressed: AtomicU64,
}

impl ProxyStats {
    /// Rough bytes-per-token heuristic used to convert saved bytes into saved tokens.
    const BYTES_PER_TOKEN: usize = 4;

    /// Counts one request handled by the proxy.
    pub fn record_request(&self) {
        self.requests_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Records one compressed request whose payload went from `original` to
    /// `compressed` bytes.
    ///
    /// Saved tokens are estimated as `(original - compressed) / 4` (integer
    /// division), clamped to zero when compression did not shrink the payload.
    pub fn record_compression(&self, original: usize, compressed: usize) {
        self.requests_compressed.fetch_add(1, Ordering::Relaxed);
        // `usize -> u64` is lossless on every target with pointers <= 64 bits.
        self.bytes_original.fetch_add(original as u64, Ordering::Relaxed);
        self.bytes_compressed.fetch_add(compressed as u64, Ordering::Relaxed);
        let saved_tokens = original.saturating_sub(compressed) / Self::BYTES_PER_TOKEN;
        self.tokens_saved.fetch_add(saved_tokens as u64, Ordering::Relaxed);
    }

    /// Percentage of bytes removed by compression over all recorded requests.
    ///
    /// Returns `0.0` while no original bytes have been recorded. The result is
    /// negative if, in total, the compressed payloads were larger than the originals.
    #[must_use]
    pub fn compression_ratio(&self) -> f64 {
        let original = self.bytes_original.load(Ordering::Relaxed);
        if original == 0 {
            return 0.0;
        }
        let compressed = self.bytes_compressed.load(Ordering::Relaxed);
        (1.0 - compressed as f64 / original as f64) * 100.0
    }
}

72pub async fn start_proxy(port: u16) -> anyhow::Result<()> {
73    let client = reqwest::Client::builder()
74        .timeout(std::time::Duration::from_secs(120))
75        .build()?;
76
77    let state = ProxyState {
78        client,
79        port,
80        stats: Arc::new(ProxyStats::default()),
81    };
82
83    let app = Router::new()
84        .route("/health", get(health))
85        .route("/status", get(status_handler))
86        .route("/v1/messages", any(anthropic::handler))
87        .route("/v1/chat/completions", any(openai::handler))
88        .fallback(fallback_router)
89        .with_state(state);
90
91    let addr = SocketAddr::from(([127, 0, 0, 1], port));
92    println!("lean-ctx proxy listening on http://{addr}");
93    println!("  Anthropic: POST /v1/messages");
94    println!("  OpenAI:    POST /v1/chat/completions");
95    println!("  Gemini:    POST /v1beta/models/...");
96
97    let listener = tokio::net::TcpListener::bind(addr).await?;
98    axum::serve(listener, app).await?;
99
100    Ok(())
101}
102
103async fn health() -> impl IntoResponse {
104    (StatusCode::OK, "ok")
105}
106
107async fn status_handler(State(state): State<ProxyState>) -> impl IntoResponse {
108    use std::sync::atomic::Ordering::Relaxed;
109    let s = &state.stats;
110    let body = serde_json::json!({
111        "status": "running",
112        "port": state.port,
113        "requests_total": s.requests_total.load(Relaxed),
114        "requests_compressed": s.requests_compressed.load(Relaxed),
115        "tokens_saved": s.tokens_saved.load(Relaxed),
116        "bytes_original": s.bytes_original.load(Relaxed),
117        "bytes_compressed": s.bytes_compressed.load(Relaxed),
118        "compression_ratio_pct": format!("{:.1}", s.compression_ratio()),
119    });
120    (StatusCode::OK, axum::Json(body))
121}
122
123async fn fallback_router(State(state): State<ProxyState>, req: Request<Body>) -> Response {
124    let path = req.uri().path().to_string();
125
126    if path.starts_with("/v1beta/models/") || path.starts_with("/v1/models/") {
127        match google::handler(State(state), req).await {
128            Ok(resp) => resp,
129            Err(status) => Response::builder()
130                .status(status)
131                .body(Body::from("proxy error"))
132                .unwrap(),
133        }
134    } else {
135        let method = req.method().to_string();
136        eprintln!("lean-ctx proxy: unmatched {method} {path}");
137        Response::builder()
138            .status(StatusCode::NOT_FOUND)
139            .body(Body::from(format!(
140                "lean-ctx proxy: no handler for {method} {path}"
141            )))
142            .unwrap()
143    }
144}