Skip to main content

crabllm_proxy/ext/
cache.rs

1use crate::PREFIX_CACHE;
2use axum::{Router, http::StatusCode, routing::delete};
3use crabllm_core::{BoxFuture, RequestContext, Storage, storage_key};
4use sha2::{Digest, Sha256};
5use std::{
6    sync::Arc,
7    time::{SystemTime, UNIX_EPOCH},
8};
9
10pub struct Cache {
11    storage: Arc<dyn Storage>,
12    ttl_seconds: u64,
13}
14
15impl Cache {
16    pub fn new(config: &serde_json::Value, storage: Arc<dyn Storage>) -> Result<Self, String> {
17        let ttl_seconds = config
18            .get("ttl_seconds")
19            .and_then(|v| v.as_i64())
20            .unwrap_or(300) as u64;
21
22        Ok(Self {
23            storage,
24            ttl_seconds,
25        })
26    }
27
28    fn cache_key(raw_request: &[u8]) -> Vec<u8> {
29        let mut hasher = Sha256::new();
30        hasher.update(raw_request);
31        storage_key(&PREFIX_CACHE, &hasher.finalize())
32    }
33
34    fn now_secs() -> u64 {
35        SystemTime::now()
36            .duration_since(UNIX_EPOCH)
37            .unwrap_or_default()
38            .as_secs()
39    }
40
41    pub fn admin_routes(&self) -> Router {
42        let storage = self.storage.clone();
43        let prefix = PREFIX_CACHE;
44        Router::new().route(
45            "/v1/cache",
46            delete(move || {
47                let storage = storage.clone();
48                async move {
49                    let pairs = storage.list(&prefix).await.unwrap_or_default();
50                    for (key, _) in pairs {
51                        let _ = storage.delete(&key).await;
52                    }
53                    StatusCode::NO_CONTENT
54                }
55            }),
56        )
57    }
58}
59
60impl crabllm_core::Extension for Cache {
61    fn name(&self) -> &str {
62        "cache"
63    }
64
65    fn prefix(&self) -> crabllm_core::Prefix {
66        PREFIX_CACHE
67    }
68
69    fn on_cache_lookup(&self, raw_request: &[u8]) -> BoxFuture<'_, Option<Vec<u8>>> {
70        let key = Self::cache_key(raw_request);
71        let ttl = self.ttl_seconds;
72
73        Box::pin(async move {
74            let data = self.storage.get(&key).await.ok()??;
75            if data.len() < 8 {
76                return None;
77            }
78
79            let timestamp = u64::from_be_bytes(data[..8].try_into().ok()?);
80            if Self::now_secs().saturating_sub(timestamp) > ttl {
81                let _ = self.storage.delete(&key).await;
82                return None;
83            }
84
85            Some(data[8..].to_vec())
86        })
87    }
88
89    fn on_response(
90        &self,
91        ctx: &RequestContext,
92        raw_request: &[u8],
93        raw_response: &[u8],
94    ) -> BoxFuture<'_, ()> {
95        if ctx.is_stream {
96            return Box::pin(async {});
97        }
98
99        let key = Self::cache_key(raw_request);
100        let mut value = Vec::with_capacity(8 + raw_response.len());
101        value.extend_from_slice(&Self::now_secs().to_be_bytes());
102        value.extend_from_slice(raw_response);
103
104        Box::pin(async move {
105            let _ = self.storage.set(&key, value).await;
106        })
107    }
108}