Skip to main content

kode_bridge/
parser_cache.rs

1use parking_lot::Mutex;
2use std::collections::VecDeque;
3use std::sync::Arc;
4use tracing::debug;
5
6/// Type alias for complex HTTP parsing result
7pub type HttpParseResult = Result<(String, String, Vec<(String, String)>), httparse::Error>;
8
9/// Thread-safe HTTP parser cache to avoid repeated allocations
10pub struct HttpParserCache {
11    parsers: Arc<Mutex<VecDeque<ReusableParser>>>,
12    max_cache_size: usize,
13}
14
/// Stateless HTTP message-head parser handed out by [`HttpParserCache`].
///
/// The type has no fields: the `httparse` header buffer is created inside each
/// parse call, so there is no per-parser state to reset between uses.
#[derive(Debug, Default, Clone)]
pub struct ReusableParser {
    // Intentionally empty. The header buffer is created on each use so that this
    // struct is not tied to the lifetime of the bytes being parsed.
}
21
22impl ReusableParser {
23    /// Create a new reusable parser
24    pub fn new() -> Self {
25        Self::default()
26    }
27
28    /// Parse HTTP response
29    pub fn parse_response(&mut self, buffer: &[u8]) -> Result<(u16, Vec<(String, String)>), httparse::Error> {
30        // Use a local header buffer - still more efficient than recreating parser objects
31        let mut headers = vec![httparse::EMPTY_HEADER; 128];
32        let mut response = httparse::Response::new(headers.as_mut_slice());
33
34        match response.parse(buffer)? {
35            httparse::Status::Complete(_) => {
36                let status_code = response.code.ok_or(httparse::Error::Status)?;
37
38                // Extract headers
39                let mut parsed_headers = Vec::with_capacity(response.headers.len());
40                for header in response.headers.iter() {
41                    parsed_headers.push((
42                        header.name.to_string(),
43                        String::from_utf8_lossy(header.value).to_string(),
44                    ));
45                }
46
47                Ok((status_code, parsed_headers))
48            }
49            httparse::Status::Partial => {
50                // 这不应该发生,因为我们应该已经读取了完整的头部
51                Err(httparse::Error::Status)
52            }
53        }
54    }
55
56    /// Parse HTTP request
57    pub fn parse_request(&mut self, buffer: &[u8]) -> HttpParseResult {
58        // Use a local header buffer
59        let mut headers = vec![httparse::EMPTY_HEADER; 128];
60        let mut request = httparse::Request::new(headers.as_mut_slice());
61
62        match request.parse(buffer)? {
63            httparse::Status::Complete(_) => {
64                let method = request.method.ok_or(httparse::Error::Status)?;
65                let path = request.path.ok_or(httparse::Error::Status)?;
66
67                // Extract headers
68                let mut parsed_headers = Vec::with_capacity(request.headers.len());
69                for header in request.headers.iter() {
70                    parsed_headers.push((
71                        header.name.to_string(),
72                        String::from_utf8_lossy(header.value).to_string(),
73                    ));
74                }
75
76                Ok((method.to_string(), path.to_string(), parsed_headers))
77            }
78            httparse::Status::Partial => Err(httparse::Error::Status), // 需要更多数据,而不是头部太多
79        }
80    }
81
82    /// Reset parser state for reuse (no-op in simplified version)
83    pub const fn reset(&mut self) {
84        // No state to reset in simplified version
85    }
86}
87
88impl HttpParserCache {
89    /// Create a new parser cache
90    pub fn new(max_cache_size: usize) -> Self {
91        Self {
92            parsers: Arc::new(Mutex::new(VecDeque::with_capacity(max_cache_size))),
93            max_cache_size,
94        }
95    }
96
97    /// Get a parser from cache or create new one
98    pub fn get(&self) -> CachedParser {
99        let mut parser = {
100            let mut parsers = self.parsers.lock();
101            parsers.pop_front().unwrap_or_else(|| {
102                debug!("Creating new HTTP parser");
103                ReusableParser::new()
104            })
105        };
106
107        parser.reset();
108
109        CachedParser {
110            parser: Some(parser),
111            cache: Arc::downgrade(&self.parsers),
112            max_cache_size: self.max_cache_size,
113        }
114    }
115
116    /// Get current cache size
117    pub fn size(&self) -> usize {
118        self.parsers.lock().len()
119    }
120
121    /// Pre-warm the cache
122    pub fn warm_up(&self, count: usize) {
123        let to_create = {
124            let mut parsers = self.parsers.lock();
125            let current_size = parsers.len();
126            let to_create = (count.saturating_sub(current_size)).min(self.max_cache_size - current_size);
127
128            for _ in 0..to_create {
129                parsers.push_back(ReusableParser::new());
130            }
131
132            to_create
133        };
134
135        debug!("Parser cache warmed up with {} parsers", to_create);
136    }
137}
138
139impl Default for HttpParserCache {
140    fn default() -> Self {
141        Self::new(8) // Default to 8 cached parsers
142    }
143}
144
145/// RAII wrapper that returns parser to cache on drop
146pub struct CachedParser {
147    parser: Option<ReusableParser>,
148    cache: std::sync::Weak<Mutex<VecDeque<ReusableParser>>>,
149    max_cache_size: usize,
150}
151
152impl CachedParser {
153    /// Parse HTTP response
154    pub fn parse_response(&mut self, buffer: &[u8]) -> Result<(u16, Vec<(String, String)>), httparse::Error> {
155        let parser = self.parser.as_mut().ok_or(httparse::Error::Status)?;
156        parser.parse_response(buffer)
157    }
158
159    /// Parse HTTP request  
160    pub fn parse_request(&mut self, buffer: &[u8]) -> HttpParseResult {
161        let parser = self.parser.as_mut().ok_or(httparse::Error::Status)?;
162        parser.parse_request(buffer)
163    }
164}
165
166impl Drop for CachedParser {
167    fn drop(&mut self) {
168        if let (Some(parser), Some(cache)) = (self.parser.take(), self.cache.upgrade()) {
169            let returned = {
170                let mut parsers = cache.lock();
171                if parsers.len() < self.max_cache_size {
172                    parsers.push_back(parser);
173                    true
174                } else {
175                    false
176                }
177            };
178
179            if returned {
180                debug!("Parser returned to cache");
181            }
182        }
183    }
184}
185
186/// Response caching system for frequently accessed data
187use std::collections::HashMap;
188use std::hash::{Hash, Hasher};
189use std::time::{Duration, Instant};
190
/// Cache key for responses: request method, path, and a hash of the
/// cache-relevant request headers.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct CacheKey {
    method: String,
    path: String,
    headers_hash: u64,
}

impl CacheKey {
    /// Build a key from a request's method, path, and headers.
    ///
    /// Headers named `date`, `last-modified`, or `etag`, or whose name starts
    /// with `cache-` (all compared case-insensitively), are ignored. The rest
    /// are hashed in a normalized form so that equivalent header sets give
    /// equal keys:
    /// - header names are lowercased (HTTP field names are case-insensitive);
    /// - fields are ordered by name with a *stable* sort, so repeated fields
    ///   with the same name keep their relative order (which is significant).
    ///
    /// Method and path are compared exactly. Because the header set is reduced
    /// to a 64-bit hash, distinct header sets can collide, with very low probability.
    pub fn new(method: &str, path: &str, headers: &[(String, String)]) -> Self {
        let mut relevant: Vec<(String, &str)> = headers
            .iter()
            .map(|(name, value)| (name.to_ascii_lowercase(), value.as_str()))
            .filter(|(name, _)| !Self::is_ignored_header(name))
            .collect();
        // Stable sort: only fields with different names change relative order.
        relevant.sort_by(|a, b| a.0.cmp(&b.0));

        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        for (name, value) in &relevant {
            name.hash(&mut hasher);
            value.hash(&mut hasher);
        }

        Self {
            method: method.to_string(),
            path: path.to_string(),
            headers_hash: hasher.finish(),
        }
    }

    /// Whether a header (name already lowercased) is excluded from the key.
    fn is_ignored_header(lower_name: &str) -> bool {
        lower_name.starts_with("cache-") || matches!(lower_name, "date" | "last-modified" | "etag")
    }
}
238
239/// Cached response entry
240#[derive(Clone)]
241pub struct CachedResponse {
242    pub status_code: u16,
243    pub headers: Vec<(String, String)>,
244    pub body: bytes::Bytes,
245    pub cached_at: Instant,
246    pub expires_at: Option<Instant>,
247}
248
249/// Simple LRU response cache
250pub struct ResponseCache {
251    cache: Arc<Mutex<HashMap<CacheKey, CachedResponse>>>,
252    max_entries: usize,
253}
254
255impl ResponseCache {
256    pub fn new(max_entries: usize, _default_ttl: Duration) -> Self {
257        Self {
258            cache: Arc::new(Mutex::new(HashMap::with_capacity(max_entries))),
259            max_entries,
260        }
261    }
262
263    /// Get cached response if available and not expired
264    pub fn get(&self, key: &CacheKey) -> Option<CachedResponse> {
265        let mut cache = self.cache.lock();
266
267        if let Some(entry) = cache.get(key) {
268            // Check if expired
269            if let Some(expires_at) = entry.expires_at {
270                if Instant::now() > expires_at {
271                    cache.remove(key);
272                    drop(cache);
273                    return None;
274                }
275            }
276
277            Some(entry.clone())
278        } else {
279            None
280        }
281    }
282
283    /// Store response in cache
284    pub fn put(&self, key: CacheKey, response: CachedResponse) {
285        let mut cache = self.cache.lock();
286
287        // Simple eviction: remove oldest entries if over limit
288        if cache.len() >= self.max_entries {
289            // Find oldest entry to remove
290            let oldest_key = cache
291                .iter()
292                .min_by_key(|(_, v)| v.cached_at)
293                .map(|(k, _)| k.clone());
294
295            if let Some(key_to_remove) = oldest_key {
296                cache.remove(&key_to_remove);
297            }
298        }
299
300        cache.insert(key, response);
301    }
302
303    /// Clear expired entries
304    pub fn cleanup_expired(&self) {
305        let mut cache = self.cache.lock();
306        let now = Instant::now();
307
308        cache.retain(|_, entry| match entry.expires_at {
309            None => true,
310            Some(expires_at) => now <= expires_at,
311        });
312    }
313
314    /// Get cache statistics
315    pub fn stats(&self) -> CacheStats {
316        let cache = self.cache.lock();
317        CacheStats {
318            entries: cache.len(),
319            max_entries: self.max_entries,
320        }
321    }
322
323    /// Clear all cached entries
324    pub fn clear(&self) {
325        let mut cache = self.cache.lock();
326        cache.clear();
327    }
328}
329
/// Point-in-time snapshot of a [`ResponseCache`]'s occupancy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CacheStats {
    /// Number of entries currently stored, including expired entries that
    /// have not been removed yet.
    pub entries: usize,
    /// Configured capacity of the cache.
    pub max_entries: usize,
}
335
336// Global instances
337use std::sync::OnceLock;
338
339static GLOBAL_PARSER_CACHE: OnceLock<HttpParserCache> = OnceLock::new();
340static GLOBAL_RESPONSE_CACHE: OnceLock<ResponseCache> = OnceLock::new();
341
342/// Get global parser cache
343pub fn global_parser_cache() -> &'static HttpParserCache {
344    GLOBAL_PARSER_CACHE.get_or_init(|| {
345        let cache = HttpParserCache::new(8);
346        cache.warm_up(4);
347        cache
348    })
349}
350
351/// Get global response cache
352pub fn global_response_cache() -> &'static ResponseCache {
353    GLOBAL_RESPONSE_CACHE.get_or_init(|| {
354        ResponseCache::new(100, Duration::from_secs(300)) // 100 entries, 5 min TTL
355    })
356}
357
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parser_cache() {
        let cache = HttpParserCache::new(2);

        {
            let mut parser1 = cache.get();
            let http_response = b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello";
            let result = parser1.parse_response(http_response).unwrap();
            assert_eq!(result.0, 200);
            assert_eq!(result.1.len(), 1);
            assert_eq!(result.1[0].0, "Content-Length");
        }

        // The parser is returned to the cache when the guard drops.
        assert_eq!(cache.size(), 1);

        {
            let mut parser2 = cache.get();
            let http_request = b"GET /api HTTP/1.1\r\nHost: localhost\r\n\r\n";
            let result = parser2.parse_request(http_request).unwrap();
            assert_eq!(result.0, "GET");
            assert_eq!(result.1, "/api");
            assert_eq!(result.2.len(), 1);
        }
    }

    #[test]
    fn test_response_cache() {
        let cache = ResponseCache::new(2, Duration::from_secs(1));

        let key = CacheKey::new("GET", "/api", &[]);
        let response = CachedResponse {
            status_code: 200,
            headers: vec![("Content-Type".to_string(), "application/json".to_string())],
            body: bytes::Bytes::from("test"),
            cached_at: Instant::now(),
            // Generous deadline so a slow test machine cannot expire the entry early.
            expires_at: Some(Instant::now() + Duration::from_secs(60)),
        };

        cache.put(key.clone(), response);

        let cached = cache.get(&key).unwrap();
        assert_eq!(cached.status_code, 200);
        assert_eq!(cached.body, bytes::Bytes::from("test"));
    }

    #[test]
    fn test_response_cache_expiration() {
        let cache = ResponseCache::new(2, Duration::from_secs(1));
        let key = CacheKey::new("GET", "/stale", &[]);

        // Use a deadline that is already in the past instead of sleeping for a
        // full TTL: capture an instant, then let the monotonic clock move past it.
        let deadline = Instant::now();
        std::thread::sleep(Duration::from_millis(5));

        cache.put(
            key.clone(),
            CachedResponse {
                status_code: 200,
                headers: Vec::new(),
                body: bytes::Bytes::from("stale"),
                cached_at: deadline,
                expires_at: Some(deadline),
            },
        );

        assert!(cache.get(&key).is_none());
        // The expired entry is removed by the lookup.
        assert_eq!(cache.stats().entries, 0);
    }

    #[test]
    fn test_cache_key_equality() {
        let headers1 = vec![
            ("Content-Type".to_string(), "application/json".to_string()),
            ("Authorization".to_string(), "Bearer token".to_string()),
        ];

        let headers2 = vec![
            ("Content-Type".to_string(), "application/json".to_string()),
            ("Authorization".to_string(), "Bearer token".to_string()),
            ("Date".to_string(), "Wed, 21 Oct 2015 07:28:00 GMT".to_string()),
        ];

        let key1 = CacheKey::new("GET", "/api", &headers1);
        let key2 = CacheKey::new("GET", "/api", &headers2);

        // Keys are equal because the Date header is ignored.
        assert_eq!(key1, key2);
    }
}