kode_bridge/
parser_cache.rs

1use parking_lot::Mutex;
2use std::collections::VecDeque;
3use std::sync::Arc;
4use tracing::debug;
5
6/// Type alias for complex HTTP parsing result
7pub type HttpParseResult = Result<(String, String, Vec<(String, String)>), httparse::Error>;
8
9/// Thread-safe HTTP parser cache to avoid repeated allocations
10pub struct HttpParserCache {
11    parsers: Arc<Mutex<VecDeque<ReusableParser>>>,
12    max_cache_size: usize,
13}
14
/// Field-less HTTP/1.x parser handle.
///
/// Each `parse_*` call builds its own header buffer locally. This avoids
/// lifetime issues with `httparse`'s borrowed header storage. The handle itself
/// exists so it can be pooled through [`HttpParserCache`].
#[derive(Default)]
pub struct ReusableParser {}
21
22impl ReusableParser {
23    /// Create a new reusable parser
24    pub fn new() -> Self {
25        Self::default()
26    }
27
28    /// Parse HTTP response
29    pub fn parse_response(
30        &mut self,
31        buffer: &[u8],
32    ) -> Result<(u16, Vec<(String, String)>), httparse::Error> {
33        // Use a local header buffer - still more efficient than recreating parser objects
34        let mut headers = [httparse::EMPTY_HEADER; 128];
35        let mut response = httparse::Response::new(&mut headers);
36
37        match response.parse(buffer)? {
38            httparse::Status::Complete(_) => {
39                let status_code = response.code.ok_or(httparse::Error::Status)?;
40
41                // Extract headers
42                let mut parsed_headers = Vec::with_capacity(response.headers.len());
43                for header in response.headers.iter() {
44                    parsed_headers.push((
45                        header.name.to_string(),
46                        String::from_utf8_lossy(header.value).to_string(),
47                    ));
48                }
49
50                Ok((status_code, parsed_headers))
51            }
52            httparse::Status::Partial => {
53                // 这不应该发生,因为我们应该已经读取了完整的头部
54                Err(httparse::Error::Status)
55            }
56        }
57    }
58
59    /// Parse HTTP request
60    pub fn parse_request(&mut self, buffer: &[u8]) -> HttpParseResult {
61        // Use a local header buffer
62        let mut headers = [httparse::EMPTY_HEADER; 128];
63        let mut request = httparse::Request::new(&mut headers);
64
65        match request.parse(buffer)? {
66            httparse::Status::Complete(_) => {
67                let method = request.method.ok_or(httparse::Error::Status)?;
68                let path = request.path.ok_or(httparse::Error::Status)?;
69
70                // Extract headers
71                let mut parsed_headers = Vec::with_capacity(request.headers.len());
72                for header in request.headers.iter() {
73                    parsed_headers.push((
74                        header.name.to_string(),
75                        String::from_utf8_lossy(header.value).to_string(),
76                    ));
77                }
78
79                Ok((method.to_string(), path.to_string(), parsed_headers))
80            }
81            httparse::Status::Partial => Err(httparse::Error::Status), // 需要更多数据,而不是头部太多
82        }
83    }
84
85    /// Reset parser state for reuse (no-op in simplified version)
86    pub fn reset(&mut self) {
87        // No state to reset in simplified version
88    }
89}
90
91impl HttpParserCache {
92    /// Create a new parser cache
93    pub fn new(max_cache_size: usize) -> Self {
94        Self {
95            parsers: Arc::new(Mutex::new(VecDeque::with_capacity(max_cache_size))),
96            max_cache_size,
97        }
98    }
99
100    /// Get a parser from cache or create new one
101    pub fn get(&self) -> CachedParser {
102        let mut parser = {
103            let mut parsers = self.parsers.lock();
104            parsers.pop_front().unwrap_or_else(|| {
105                debug!("Creating new HTTP parser");
106                ReusableParser::new()
107            })
108        };
109
110        parser.reset();
111
112        CachedParser {
113            parser: Some(parser),
114            cache: Arc::downgrade(&self.parsers),
115            max_cache_size: self.max_cache_size,
116        }
117    }
118
119    /// Get current cache size
120    pub fn size(&self) -> usize {
121        self.parsers.lock().len()
122    }
123
124    /// Pre-warm the cache
125    pub fn warm_up(&self, count: usize) {
126        let mut parsers = self.parsers.lock();
127        let current_size = parsers.len();
128        let to_create =
129            (count.saturating_sub(current_size)).min(self.max_cache_size - current_size);
130
131        for _ in 0..to_create {
132            parsers.push_back(ReusableParser::new());
133        }
134
135        debug!("Parser cache warmed up with {} parsers", to_create);
136    }
137}
138
139impl Default for HttpParserCache {
140    fn default() -> Self {
141        Self::new(8) // Default to 8 cached parsers
142    }
143}
144
145/// RAII wrapper that returns parser to cache on drop
146pub struct CachedParser {
147    parser: Option<ReusableParser>,
148    cache: std::sync::Weak<Mutex<VecDeque<ReusableParser>>>,
149    max_cache_size: usize,
150}
151
152impl CachedParser {
153    /// Parse HTTP response
154    pub fn parse_response(
155        &mut self,
156        buffer: &[u8],
157    ) -> Result<(u16, Vec<(String, String)>), httparse::Error> {
158        self.parser
159            .as_mut()
160            .expect("Parser not available")
161            .parse_response(buffer)
162    }
163
164    /// Parse HTTP request  
165    pub fn parse_request(&mut self, buffer: &[u8]) -> HttpParseResult {
166        self.parser
167            .as_mut()
168            .expect("Parser not available")
169            .parse_request(buffer)
170    }
171}
172
173impl Drop for CachedParser {
174    fn drop(&mut self) {
175        if let (Some(parser), Some(cache)) = (self.parser.take(), self.cache.upgrade()) {
176            let mut parsers = cache.lock();
177            if parsers.len() < self.max_cache_size {
178                parsers.push_back(parser);
179                debug!("Parser returned to cache");
180            }
181        }
182    }
183}
184
185/// Response caching system for frequently accessed data
186use std::collections::HashMap;
187use std::hash::{Hash, Hasher};
188use std::time::{Duration, Instant};
189
/// Cache key for responses.
///
/// Two keys are equal when their method, path and header fingerprint match.
/// The fingerprint is a 64-bit hash of the relevant headers. Distinct header
/// sets can therefore, with very low probability, collide and compare equal.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct CacheKey {
    method: String,
    path: String,
    headers_hash: u64,
}

impl CacheKey {
    /// Build a key from a method, a path and a header list.
    ///
    /// Header names are compared case-insensitively, as HTTP field names are.
    /// Header values and header order are significant. The following headers
    /// are left out of the fingerprint:
    ///
    /// * any header whose name starts with `cache-`
    /// * `date`
    /// * `last-modified`
    /// * `etag`
    pub fn new(method: &str, path: &str, headers: &[(String, String)]) -> Self {
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        let mut relevant = 0usize;

        for (name, value) in headers {
            // Normalise the name once. It is used for both filtering and
            // hashing, so `Content-Type` and `content-type` give the same key.
            let name = name.to_lowercase();
            if Self::is_ignored_header(&name) {
                continue;
            }
            name.hash(&mut hasher);
            value.hash(&mut hasher);
            relevant += 1;
        }
        // Mix in the count so the fingerprint is length-aware, as hashing a
        // slice would be.
        relevant.hash(&mut hasher);

        Self {
            method: method.to_string(),
            path: path.to_string(),
            headers_hash: hasher.finish(),
        }
    }

    /// Whether an already-lowercased header name is excluded from the
    /// fingerprint.
    fn is_ignored_header(name: &str) -> bool {
        name.starts_with("cache-") || matches!(name, "date" | "last-modified" | "etag")
    }
}
241
242/// Cached response entry
243#[derive(Clone)]
244pub struct CachedResponse {
245    pub status_code: u16,
246    pub headers: Vec<(String, String)>,
247    pub body: bytes::Bytes,
248    pub cached_at: Instant,
249    pub expires_at: Option<Instant>,
250}
251
252/// Simple LRU response cache
253pub struct ResponseCache {
254    cache: Arc<Mutex<HashMap<CacheKey, CachedResponse>>>,
255    max_entries: usize,
256}
257
258impl ResponseCache {
259    pub fn new(max_entries: usize, _default_ttl: Duration) -> Self {
260        Self {
261            cache: Arc::new(Mutex::new(HashMap::with_capacity(max_entries))),
262            max_entries,
263        }
264    }
265
266    /// Get cached response if available and not expired
267    pub fn get(&self, key: &CacheKey) -> Option<CachedResponse> {
268        let mut cache = self.cache.lock();
269
270        if let Some(entry) = cache.get(key) {
271            // Check if expired
272            if let Some(expires_at) = entry.expires_at {
273                if Instant::now() > expires_at {
274                    cache.remove(key);
275                    return None;
276                }
277            }
278
279            Some(entry.clone())
280        } else {
281            None
282        }
283    }
284
285    /// Store response in cache
286    pub fn put(&self, key: CacheKey, response: CachedResponse) {
287        let mut cache = self.cache.lock();
288
289        // Simple eviction: remove oldest entries if over limit
290        if cache.len() >= self.max_entries {
291            // Find oldest entry to remove
292            let oldest_key = cache
293                .iter()
294                .min_by_key(|(_, v)| v.cached_at)
295                .map(|(k, _)| k.clone());
296
297            if let Some(key_to_remove) = oldest_key {
298                cache.remove(&key_to_remove);
299            }
300        }
301
302        cache.insert(key, response);
303    }
304
305    /// Clear expired entries
306    pub fn cleanup_expired(&self) {
307        let mut cache = self.cache.lock();
308        let now = Instant::now();
309
310        cache.retain(|_, entry| match entry.expires_at {
311            None => true,
312            Some(expires_at) => now <= expires_at,
313        });
314    }
315
316    /// Get cache statistics
317    pub fn stats(&self) -> CacheStats {
318        let cache = self.cache.lock();
319        CacheStats {
320            entries: cache.len(),
321            max_entries: self.max_entries,
322        }
323    }
324
325    /// Clear all cached entries
326    pub fn clear(&self) {
327        let mut cache = self.cache.lock();
328        cache.clear();
329    }
330}
331
/// Point-in-time snapshot produced by [`ResponseCache::stats`].
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Entries present when the snapshot was taken, including expired entries
    /// not yet removed.
    pub entries: usize,
    /// Configured capacity of the cache.
    pub max_entries: usize,
}
337
338// Global instances
339use std::sync::OnceLock;
340
341static GLOBAL_PARSER_CACHE: OnceLock<HttpParserCache> = OnceLock::new();
342static GLOBAL_RESPONSE_CACHE: OnceLock<ResponseCache> = OnceLock::new();
343
344/// Get global parser cache
345pub fn global_parser_cache() -> &'static HttpParserCache {
346    GLOBAL_PARSER_CACHE.get_or_init(|| {
347        let cache = HttpParserCache::new(8);
348        cache.warm_up(4);
349        cache
350    })
351}
352
353/// Get global response cache
354pub fn global_response_cache() -> &'static ResponseCache {
355    GLOBAL_RESPONSE_CACHE.get_or_init(|| {
356        ResponseCache::new(100, Duration::from_secs(300)) // 100 entries, 5 min TTL
357    })
358}
359
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a 200 response with fixed headers and body and the given timestamps.
    fn sample_response(cached_at: Instant, expires_at: Option<Instant>) -> CachedResponse {
        CachedResponse {
            status_code: 200,
            headers: vec![("Content-Type".to_string(), "application/json".to_string())],
            body: bytes::Bytes::from("test"),
            cached_at,
            expires_at,
        }
    }

    #[test]
    fn test_parser_cache() {
        let cache = HttpParserCache::new(2);

        {
            let mut parser1 = cache.get();
            let http_response = b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello";
            let result = parser1.parse_response(http_response).unwrap();
            assert_eq!(result.0, 200);
            assert_eq!(result.1.len(), 1);
            assert_eq!(result.1[0].0, "Content-Length");
        }

        // The parser should have been returned to the cache.
        assert_eq!(cache.size(), 1);

        {
            let mut parser2 = cache.get();
            let http_request = b"GET /api HTTP/1.1\r\nHost: localhost\r\n\r\n";
            let result = parser2.parse_request(http_request).unwrap();
            assert_eq!(result.0, "GET");
            assert_eq!(result.1, "/api");
            assert_eq!(result.2.len(), 1);
        }
    }

    #[test]
    fn test_parser_cache_respects_capacity() {
        let cache = HttpParserCache::new(1);
        {
            let _a = cache.get();
            let _b = cache.get();
        }
        // Only one of the two returned parsers fits back into the pool.
        assert_eq!(cache.size(), 1);
    }

    #[test]
    fn test_warm_up_is_capped() {
        let cache = HttpParserCache::new(2);
        cache.warm_up(10);
        assert_eq!(cache.size(), 2);
        cache.warm_up(1);
        assert_eq!(cache.size(), 2);
    }

    #[test]
    fn test_partial_head_is_rejected() {
        let cache = HttpParserCache::new(1);
        let mut parser = cache.get();
        // The headers are not terminated by a blank line.
        let result = parser.parse_response(b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n");
        assert!(matches!(result, Err(httparse::Error::Status)));
    }

    #[test]
    fn test_response_cache() {
        let cache = ResponseCache::new(2, Duration::from_secs(1));
        let key = CacheKey::new("GET", "/api", &[]);
        let now = Instant::now();

        // A short TTL keeps the test fast while still exercising expiry.
        cache.put(
            key.clone(),
            sample_response(now, Some(now + Duration::from_millis(250))),
        );

        let cached = cache.get(&key).unwrap();
        assert_eq!(cached.status_code, 200);
        assert_eq!(cached.body, bytes::Bytes::from("test"));

        // `Instant` is monotonic, so after sleeping past the deadline the
        // entry must be expired. `get` also removes it.
        std::thread::sleep(Duration::from_millis(300));
        assert!(cache.get(&key).is_none());
        assert_eq!(cache.stats().entries, 0);
    }

    #[test]
    fn test_response_cache_evicts_oldest() {
        let cache = ResponseCache::new(2, Duration::from_secs(1));
        let base = Instant::now();
        let keys: Vec<CacheKey> = ["/a", "/b", "/c"]
            .iter()
            .map(|path| CacheKey::new("GET", path, &[]))
            .collect();

        for (i, key) in keys.iter().enumerate() {
            let cached_at = base + Duration::from_millis(i as u64);
            cache.put(key.clone(), sample_response(cached_at, None));
        }

        assert_eq!(cache.stats().entries, 2);
        assert!(cache.get(&keys[0]).is_none());
        assert!(cache.get(&keys[1]).is_some());
        assert!(cache.get(&keys[2]).is_some());
    }

    #[test]
    fn test_cleanup_expired_and_clear() {
        let cache = ResponseCache::new(4, Duration::from_secs(1));
        let now = Instant::now();
        let stale = CacheKey::new("GET", "/stale", &[]);
        let fresh = CacheKey::new("GET", "/fresh", &[]);
        let forever = CacheKey::new("GET", "/forever", &[]);

        cache.put(
            stale.clone(),
            sample_response(now, Some(now + Duration::from_millis(10))),
        );
        cache.put(
            fresh.clone(),
            sample_response(now, Some(now + Duration::from_secs(60))),
        );
        cache.put(forever.clone(), sample_response(now, None));

        std::thread::sleep(Duration::from_millis(20));
        cache.cleanup_expired();

        assert_eq!(cache.stats().entries, 2);
        assert!(cache.get(&stale).is_none());
        assert!(cache.get(&fresh).is_some());
        assert!(cache.get(&forever).is_some());

        cache.clear();
        assert_eq!(cache.stats().entries, 0);
    }

    #[test]
    fn test_cache_key_equality() {
        let headers1 = vec![
            ("Content-Type".to_string(), "application/json".to_string()),
            ("Authorization".to_string(), "Bearer token".to_string()),
        ];

        let headers2 = vec![
            ("Content-Type".to_string(), "application/json".to_string()),
            ("Authorization".to_string(), "Bearer token".to_string()),
            (
                "Date".to_string(),
                "Wed, 21 Oct 2015 07:28:00 GMT".to_string(),
            ),
        ];

        let key1 = CacheKey::new("GET", "/api", &headers1);
        let key2 = CacheKey::new("GET", "/api", &headers2);

        // The keys are equal because the Date header is ignored.
        assert_eq!(key1, key2);
    }
}