// nntp_proxy/cache/article.rs
1//! Article caching implementation using LRU cache with TTL
2//!
3//! This module provides article caching with per-backend availability tracking.
4//!
5//! # NNTP Response Semantics (CRITICAL)
6//!
7//! **430 "No Such Article" is AUTHORITATIVE** - NNTP servers NEVER give false negatives.
8//! If a server returns 430, the article is definitively not present on that server.
9//! This is a fundamental property of NNTP: servers only return 430 when they are
10//! certain they don't have the article.
11//!
12//! **2xx success responses are UNRELIABLE** - servers CAN give false positives.
13//! A server might return 220/222/223 but then provide:
14//! - Corrupt or incomplete article data
15//! - Truncated responses due to connection issues
16//! - Stale data that gets cleaned up moments later
17//!
18//! **Therefore: 430 (missing) ALWAYS takes precedence over "has" state.**
19//! - When merging availability info, 430s accumulate and never get cleared
20//! - A previous "has" can be overwritten by a later 430
21//! - A previous 430 should NOT be overwritten by a later "has"
22//! - Cache entries with 430 persist until TTL expiry
23//!
24//! **BACKEND LIMIT**: Maximum 8 backends supported due to u8 bitset optimization.
25//! This limit is enforced at config validation time.
26
27use crate::protocol::StatusCode;
28use crate::router::BackendCount;
29use crate::types::{BackendId, MessageId};
30use moka::future::Cache;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicU64, Ordering};
33use std::time::Duration;
34
35use super::ttl;
36
/// Maximum number of backends supported by ArticleAvailability bitset
///
/// Both the `checked` and `missing` bitsets are `u8` (one bit per backend),
/// which caps the pool at 8 backends. Config validation enforces this limit.
pub const MAX_BACKENDS: usize = 8;
39
/// Status of a backend for a specific article
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendStatus {
    /// Backend hasn't been checked yet
    Unknown,
    /// Backend was checked and returned 430 (doesn't have article)
    Missing,
    /// Backend was checked and has the article
    HasArticle,
}

/// Track which backends have (or don't have) a specific article
///
/// Uses two u8 bitsets to track availability across up to 8 backends:
/// - `checked`: Which backends we've queried (attempted to fetch from)
/// - `missing`: Which backends returned 430 (don't have this article)
///
/// # Example with 2 backends
/// - Initial state: `checked=00`, `missing=00` (haven't tried any backends yet)
/// - After backend 0 returns 430: `checked=01`, `missing=01` (backend 0 doesn't have it)
/// - After backend 1 returns 220: `checked=11`, `missing=01` (backend 1 has it)
/// - If both return 430: `checked=11`, `missing=11` (all backends exhausted)
///
/// # Usage Pattern
/// This type serves TWO critical purposes:
///
/// 1. **Cache persistence** - Track availability across requests (long-lived)
///    - Store in cache with article data
///    - Avoid querying backends known to be missing
///    - Updated after every successful/failed fetch
///
/// 2. **430 retry loop** - Track which backends tried during single request (transient)
///    - Create fresh instance for each ARTICLE request
///    - Mark backends as missing when they return 430
///    - Stop when all backends exhausted or one succeeds
///
/// # Thread Safety
/// Wrapped in `Arc<Mutex<>>` when stored in cache entries for concurrent updates.
/// The precheck pattern ensures serial updates to avoid races:
/// 1. Query all backends concurrently
/// 2. Wait for all to complete
/// 3. Update cache serially with all results
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ArticleAvailability {
    /// Bitset of backends we've checked (tried to fetch from)
    checked: u8,
    /// Bitset of backends that DON'T have this article (returned 430)
    missing: u8, // u8 supports up to 8 backends (plenty for NNTP)
}
90
91impl ArticleAvailability {
92 /// Create empty availability - assume all backends have article until proven otherwise
93 #[inline]
94 pub const fn new() -> Self {
95 Self {
96 checked: 0,
97 missing: 0,
98 }
99 }
100
101 /// Record that a backend returned 430 (doesn't have the article)
102 ///
103 /// # NNTP Semantics
104 /// **430 is AUTHORITATIVE** - this is a definitive "no". Once marked missing,
105 /// the backend should not be retried for this article until cache TTL expires.
106 /// See module-level docs for full explanation.
107 ///
108 /// # Panics (debug builds only)
109 /// Panics if backend_id >= 8. Config validation enforces max 8 backends.
110 #[inline]
111 pub fn record_missing(&mut self, backend_id: BackendId) -> &mut Self {
112 let idx = backend_id.as_index();
113 debug_assert!(
114 idx < MAX_BACKENDS,
115 "Backend index {} exceeds MAX_BACKENDS ({})",
116 idx,
117 MAX_BACKENDS
118 );
119 self.checked |= 1u8 << idx; // Mark as checked
120 self.missing |= 1u8 << idx; // Mark as missing
121 self
122 }
123
124 /// Record that a backend returned a success response (2xx) for this article
125 ///
126 /// # NNTP Semantics Warning
127 /// **2xx responses are UNRELIABLE** - servers can give false positives.
128 /// This clears the missing bit for fresh `ArticleAvailability` instances,
129 /// but when merged via `merge_from()`, existing 430s take precedence.
130 /// See module-level docs for full explanation.
131 ///
132 /// # Panics (debug builds only)
133 /// Panics if backend_id >= 8. Config validation enforces max 8 backends.
134 #[inline]
135 pub fn record_has(&mut self, backend_id: BackendId) -> &mut Self {
136 let idx = backend_id.as_index();
137 debug_assert!(
138 idx < MAX_BACKENDS,
139 "Backend index {} exceeds MAX_BACKENDS ({})",
140 idx,
141 MAX_BACKENDS
142 );
143 self.checked |= 1u8 << idx; // Mark as checked
144 self.missing &= !(1u8 << idx); // Clear missing bit (has the article)
145 self
146 }
147
148 /// Merge another availability's state into this one
149 ///
150 /// Used to sync local availability tracking back to cache.
151 /// Takes the union of checked backends and missing backends.
152 ///
153 /// # NNTP Semantics (CRITICAL)
154 ///
155 /// **430 responses are AUTHORITATIVE** - NNTP servers NEVER give false negatives.
156 /// If a server says "no such article" (430), the article is definitively not there.
157 ///
158 /// **2xx responses are UNRELIABLE** - servers CAN give false positives.
159 /// A server might claim to have an article but return corrupt data, or the
160 /// article might be incomplete/unavailable due to propagation delays.
161 ///
162 /// Therefore: `missing` state ALWAYS wins over `has` state.
163 /// We trust 430s absolutely but treat successes with skepticism.
164 #[inline]
165 pub fn merge_from(&mut self, other: &Self) {
166 // Simple union: trust all 430s from both sources
167 // 430 is authoritative, so missing bits should accumulate
168 self.checked |= other.checked;
169 self.missing |= other.missing;
170 }
171
172 /// Check if a backend is known to be missing (returned 430)
173 ///
174 /// # Panics (debug builds only)
175 /// Panics if backend_id >= 8. Config validation enforces max 8 backends.
176 #[inline]
177 pub fn is_missing(&self, backend_id: BackendId) -> bool {
178 let idx = backend_id.as_index();
179 debug_assert!(
180 idx < MAX_BACKENDS,
181 "Backend index {} exceeds MAX_BACKENDS ({})",
182 idx,
183 MAX_BACKENDS
184 );
185 self.missing & (1u8 << idx) != 0
186 }
187
188 /// Check if we should attempt to fetch from this backend
189 ///
190 /// Returns `true` if backend might have the article (not yet marked missing).
191 ///
192 /// # Panics (debug builds only)
193 /// Panics if backend_id >= 8. Config validation enforces max 8 backends.
194 #[inline]
195 pub fn should_try(&self, backend_id: BackendId) -> bool {
196 !self.is_missing(backend_id)
197 }
198
199 /// Get the raw missing bitset for debugging
200 #[inline]
201 pub const fn missing_bits(&self) -> u8 {
202 self.missing
203 }
204
205 /// Get the raw checked bitset for debugging
206 #[inline]
207 pub const fn checked_bits(&self) -> u8 {
208 self.checked
209 }
210
211 /// Check if all backends in the pool have been tried and returned 430
212 ///
213 /// Check if all backends have been tried and returned 430
214 ///
215 /// # Panics (debug builds only)
216 /// Panics if backend_count > 8. Config validation enforces max 8 backends.
217 #[inline]
218 pub fn all_exhausted(&self, backend_count: BackendCount) -> bool {
219 let count = backend_count.get();
220 debug_assert!(
221 count <= MAX_BACKENDS,
222 "Backend count {} exceeds MAX_BACKENDS ({})",
223 count,
224 MAX_BACKENDS
225 );
226 match count {
227 0 => true,
228 8 => self.missing == 0xFF,
229 n => self.missing & ((1u8 << n) - 1) == (1u8 << n) - 1,
230 }
231 }
232
233 /// Get an iterator over backends that should still be tried
234 ///
235 /// Returns backend IDs that haven't been marked missing yet.
236 pub fn available_backends(
237 &self,
238 backend_count: BackendCount,
239 ) -> impl Iterator<Item = BackendId> + '_ {
240 (0..backend_count.get().min(MAX_BACKENDS))
241 .map(BackendId::from_index)
242 .filter(move |&backend_id| self.should_try(backend_id))
243 }
244
245 /// Get the underlying bitset value (for debugging)
246 #[inline]
247 pub const fn as_u8(&self) -> u8 {
248 self.missing
249 }
250
251 /// Reconstruct from raw bitset values (used for deserialization)
252 ///
253 /// # Safety
254 /// The caller must ensure the bits represent valid backend states.
255 /// This is primarily used when deserializing from disk cache.
256 #[inline]
257 pub const fn from_bits(checked: u8, missing: u8) -> Self {
258 Self { checked, missing }
259 }
260
261 /// Check if we have any backend availability information
262 ///
263 /// Returns true if at least one backend has been checked.
264 /// If this returns false, we haven't tried any backends yet and shouldn't
265 /// serve from cache (should try backends first).
266 #[inline]
267 pub const fn has_availability_info(&self) -> bool {
268 self.checked != 0
269 }
270
271 /// Check if any backend is known to HAVE the article
272 ///
273 /// Returns true if at least one backend was checked and did NOT return 430.
274 /// This is the inverse check from `all_exhausted` - at least one success.
275 #[inline]
276 pub const fn any_backend_has_article(&self) -> bool {
277 // A backend "has" the article if it's checked but not missing
278 // checked & !missing gives us the backends that have it
279 (self.checked & !self.missing) != 0
280 }
281
282 /// Query backend availability status
283 ///
284 /// # Panics (debug builds only)
285 /// Panics if backend_id >= 8. Config validation enforces max 8 backends.
286 #[inline]
287 pub fn status(&self, backend_id: BackendId) -> BackendStatus {
288 let idx = backend_id.as_index();
289 debug_assert!(
290 idx < MAX_BACKENDS,
291 "Backend index {} exceeds MAX_BACKENDS ({})",
292 idx,
293 MAX_BACKENDS
294 );
295 let mask = 1u8 << idx;
296 if self.checked & mask == 0 {
297 BackendStatus::Unknown
298 } else if self.missing & mask != 0 {
299 BackendStatus::Missing
300 } else {
301 BackendStatus::HasArticle
302 }
303 }
304}
305
306impl Default for ArticleAvailability {
307 fn default() -> Self {
308 Self::new()
309 }
310}
311
/// Cache entry for an article
///
/// Stores complete NNTP response buffer plus backend availability tracking.
/// The buffer is validated once on insert, then can be served directly without re-parsing.
///
/// Cloning is cheap: the buffer is behind an `Arc`, and the remaining fields
/// are small `Copy` values.
#[derive(Clone, Debug)]
pub struct ArticleEntry {
    /// Backend availability bitset (2 bytes)
    ///
    /// No mutex needed: moka clones entries on get(), and updates go through
    /// cache.insert() which replaces the whole entry atomically.
    backend_availability: ArticleAvailability,

    /// Complete response buffer (Arc for cheap cloning)
    /// Format: `220 <msgid>\r\n<headers>\r\n\r\n<body>\r\n.\r\n`
    /// Status code is always at bytes [0..3]
    buffer: Arc<Vec<u8>>,

    /// Tier of the backend that provided this article
    /// Used for tier-aware TTL: higher tier = longer TTL
    tier: u8,

    /// Unix timestamp when this entry was inserted (milliseconds since epoch)
    /// Populated via `ttl::now_millis()` and used with `tier` for TTL expiration
    inserted_at: u64,
}
337
338impl ArticleEntry {
339 /// Create from response buffer
340 ///
341 /// The buffer should be a complete NNTP response (status line + data + terminator).
342 /// No validation is performed here - caller must ensure buffer is valid.
343 ///
344 /// Backend availability starts with assumption all backends have the article.
345 /// Tier defaults to 0, timestamp is set to now.
346 pub fn new(buffer: Vec<u8>) -> Self {
347 Self {
348 backend_availability: ArticleAvailability::new(),
349 buffer: Arc::new(buffer),
350 tier: 0,
351 inserted_at: ttl::now_millis(),
352 }
353 }
354
355 /// Create from response buffer with a specific tier
356 ///
357 /// Used when caching articles from backends with known tier.
358 pub fn with_tier(buffer: Vec<u8>, tier: u8) -> Self {
359 Self {
360 backend_availability: ArticleAvailability::new(),
361 buffer: Arc::new(buffer),
362 tier,
363 inserted_at: ttl::now_millis(),
364 }
365 }
366
367 /// Create from pre-wrapped Arc buffer
368 ///
369 /// Use this when the buffer is already wrapped in Arc to avoid double wrapping.
370 pub fn from_arc(buffer: Arc<Vec<u8>>) -> Self {
371 Self {
372 backend_availability: ArticleAvailability::new(),
373 buffer,
374 tier: 0,
375 inserted_at: ttl::now_millis(),
376 }
377 }
378
379 /// Check if this entry has expired based on tier-aware TTL
380 ///
381 /// See [`super::ttl`] for the TTL formula.
382 #[inline]
383 pub fn is_expired(&self, base_ttl_millis: u64) -> bool {
384 ttl::is_expired(self.inserted_at, base_ttl_millis, self.tier)
385 }
386
387 /// Get the tier of the backend that provided this article
388 #[inline]
389 pub fn tier(&self) -> u8 {
390 self.tier
391 }
392
393 /// Set the tier (used when updating entry)
394 #[inline]
395 pub fn set_tier(&mut self, tier: u8) {
396 self.tier = tier;
397 }
398
399 /// Get raw buffer for serving to client
400 #[inline]
401 pub fn buffer(&self) -> &Arc<Vec<u8>> {
402 &self.buffer
403 }
404
405 /// Get status code from the response buffer
406 ///
407 /// Parses the first 3 bytes as the status code.
408 /// Returns None if buffer is too short or invalid.
409 #[inline]
410 pub fn status_code(&self) -> Option<StatusCode> {
411 StatusCode::parse(&self.buffer)
412 }
413
414 /// Check if we should try fetching from this backend
415 ///
416 /// Returns false if backend is known to not have this article (returned 430 before).
417 #[inline]
418 pub fn should_try_backend(&self, backend_id: BackendId) -> bool {
419 self.backend_availability.should_try(backend_id)
420 }
421
422 /// Record that a backend returned 430 (doesn't have this article)
423 pub fn record_backend_missing(&mut self, backend_id: BackendId) {
424 self.backend_availability.record_missing(backend_id);
425 }
426
427 /// Record that a backend successfully provided this article
428 pub fn record_backend_has(&mut self, backend_id: BackendId) {
429 self.backend_availability.record_has(backend_id);
430 }
431
432 /// Set backend availability (used for hydrating from hybrid cache)
433 pub fn set_availability(&mut self, availability: ArticleAvailability) {
434 self.backend_availability = availability;
435 }
436
437 /// Check if all backends have been tried and none have the article
438 pub fn all_backends_exhausted(&self, total_backends: BackendCount) -> bool {
439 self.backend_availability.all_exhausted(total_backends)
440 }
441
442 /// Get backends that might have this article
443 pub fn available_backends(&self, total_backends: BackendCount) -> Vec<BackendId> {
444 self.backend_availability
445 .available_backends(total_backends)
446 .collect()
447 }
448
449 /// Get response buffer (backward compatibility)
450 ///
451 /// This provides the same interface as the old CachedArticle.response field.
452 #[inline]
453 pub fn response(&self) -> &Arc<Vec<u8>> {
454 &self.buffer
455 }
456
457 /// Check if this cache entry has useful availability information
458 ///
459 /// Returns true if at least one backend has been tried (marked as missing or has article).
460 /// If false, we haven't tried any backends yet and should run precheck instead of
461 /// serving from cache.
462 #[inline]
463 pub fn has_availability_info(&self) -> bool {
464 self.backend_availability.has_availability_info()
465 }
466
467 /// Check if this cache entry contains a complete article (220) or body (222)
468 ///
469 /// Returns true if:
470 /// 1. Status code is 220 (ARTICLE) or 222 (BODY)
471 /// 2. Buffer contains actual content (not just a stub like "220\r\n")
472 ///
473 /// A complete response ends with ".\r\n" and is significantly longer
474 /// than a stub. Stubs are typically 5-6 bytes (e.g., "220\r\n" or "223\r\n").
475 ///
476 /// This is used when `cache_articles=true` to determine if we can serve
477 /// directly from cache or need to fetch additional data.
478 #[inline]
479 pub fn is_complete_article(&self) -> bool {
480 // Must be a 220 (ARTICLE) or 222 (BODY) response
481 let Some(code) = self.status_code() else {
482 return false;
483 };
484 if code.as_u16() != 220 && code.as_u16() != 222 {
485 return false;
486 }
487
488 // Must have actual content, not just a stub
489 // A stub is typically "220\r\n" (5 bytes) or "222 0 <test@example.com>\r\n" (25-30 bytes)
490 // A real article/body has content + terminator
491 // Minimum valid: "220 0 <x@y>\r\nX: Y\r\n\r\nB\r\n.\r\n" = 30 bytes
492 const MIN_ARTICLE_SIZE: usize = 30;
493 self.buffer.len() >= MIN_ARTICLE_SIZE && self.buffer.ends_with(b".\r\n")
494 }
495
496 /// Get the appropriate response for a command, if this cache entry can serve it
497 ///
498 /// Returns `Some(response_bytes)` if cache can satisfy the command:
499 /// - ARTICLE (220 cached) → returns full cached response
500 /// - BODY (222 cached or 220 cached) → returns cached response
501 /// - HEAD (221 cached or 220 cached) → returns cached response
502 /// - STAT → synthesizes "223 0 <msg-id>\r\n" (we know article exists)
503 ///
504 /// Returns `None` if cached response can't serve this command type or if
505 /// the cached buffer fails validation.
506 pub fn response_for_command(&self, cmd_verb: &str, message_id: &str) -> Option<Vec<u8>> {
507 let code = self.status_code()?.as_u16();
508
509 match (code, cmd_verb) {
510 // STAT just needs existence confirmation - synthesize response
511 (220..=222, "STAT") => Some(format!("223 0 {}\r\n", message_id).into_bytes()),
512 // Direct match - return cached buffer if valid
513 (220, "ARTICLE") | (222, "BODY") | (221, "HEAD") => {
514 if self.is_valid_response() {
515 Some(self.buffer.to_vec())
516 } else {
517 tracing::warn!(
518 code = code,
519 len = self.buffer.len(),
520 "Cached buffer failed validation, discarding"
521 );
522 None
523 }
524 }
525 // ARTICLE (220) contains everything, can serve BODY or HEAD requests
526 (220, "BODY" | "HEAD") => {
527 if self.is_valid_response() {
528 Some(self.buffer.to_vec())
529 } else {
530 tracing::warn!(
531 code = code,
532 len = self.buffer.len(),
533 "Cached buffer failed validation, discarding"
534 );
535 None
536 }
537 }
538 _ => None,
539 }
540 }
541
542 /// Check if buffer contains a valid NNTP multiline response
543 ///
544 /// A valid response must:
545 /// 1. Start with 3 ASCII digits (status code)
546 /// 2. Have CRLF somewhere (line terminator)
547 /// 3. End with .\r\n for multiline responses (220/221/222)
548 #[inline]
549 fn is_valid_response(&self) -> bool {
550 // Must have at least "NNN \r\n.\r\n" = 9 bytes
551 if self.buffer.len() < 9 {
552 return false;
553 }
554
555 // First 3 bytes must be ASCII digits
556 if !self.buffer[0].is_ascii_digit()
557 || !self.buffer[1].is_ascii_digit()
558 || !self.buffer[2].is_ascii_digit()
559 {
560 return false;
561 }
562
563 // Must end with .\r\n for multiline responses
564 if !self.buffer.ends_with(b".\r\n") {
565 return false;
566 }
567
568 // Must have CRLF in first line (status line)
569 memchr::memmem::find(&self.buffer[..self.buffer.len().min(256)], b"\r\n").is_some()
570 }
571
572 /// Check if this entry can serve a given command type
573 ///
574 /// Simpler version of `response_for_command` for boolean checks.
575 #[inline]
576 pub fn matches_command_type_verb(&self, cmd_verb: &str) -> bool {
577 let Some(code) = self.status_code() else {
578 return false;
579 };
580
581 match code.as_u16() {
582 220 => matches!(cmd_verb, "ARTICLE" | "BODY" | "HEAD" | "STAT"),
583 222 => matches!(cmd_verb, "BODY" | "STAT"),
584 221 => matches!(cmd_verb, "HEAD" | "STAT"),
585 _ => false,
586 }
587 }
588
589 /// Initialize availability tracker from this cached entry
590 ///
591 /// Creates a fresh ArticleAvailability with backends marked missing based on
592 /// cached knowledge (backends that previously returned 430).
593 pub fn to_availability(&self, total_backends: BackendCount) -> ArticleAvailability {
594 let mut availability = ArticleAvailability::new();
595
596 // Mark backends we know don't have this article
597 for backend_id in (0..total_backends.get()).map(BackendId::from_index) {
598 if !self.should_try_backend(backend_id) {
599 availability.record_missing(backend_id);
600 }
601 }
602
603 availability
604 }
605}
606
/// Article cache using LRU eviction with TTL
///
/// Uses `Arc<str>` (message ID content without brackets) as key for zero-allocation lookups.
/// `Arc<str>` implements `Borrow<str>`, allowing `cache.get(&str)` without allocation.
///
/// Supports tier-aware TTL: entries from higher tier backends get longer TTLs.
/// Formula: `effective_ttl = base_ttl * (2 ^ tier)`
///
/// Cloning is cheap: the cache and counters are behind `Arc`s shared by all clones.
#[derive(Clone, Debug)]
pub struct ArticleCache {
    // Underlying moka cache keyed by message-id content (brackets stripped)
    cache: Arc<Cache<Arc<str>, ArticleEntry>>,
    // Hit counter, shared across clones
    hits: Arc<AtomicU64>,
    // Miss counter, shared across clones
    misses: Arc<AtomicU64>,
    // Configured maximum capacity (weighted bytes)
    capacity: u64,
    // Whether full article bodies are cached (false = availability stubs only)
    cache_articles: bool,
    /// Base TTL in milliseconds (used for tier-aware expiration via `effective_ttl`)
    ttl_millis: u64,
}
624
625impl ArticleCache {
626 /// Create a new article cache
627 ///
628 /// # Arguments
629 /// * `max_capacity` - Maximum cache size in bytes (uses weighted entries)
630 /// * `ttl` - Time-to-live for cached articles
631 /// * `cache_articles` - Whether to cache full article bodies (true) or just availability (false)
632 pub fn new(max_capacity: u64, ttl: Duration, cache_articles: bool) -> Self {
633 // Build cache with byte-based capacity using weigher
634 // max_capacity is total bytes allowed
635 //
636 // IMPORTANT: Moka's time_to_live must be set to the MAXIMUM tier-aware TTL
637 // so that high-tier entries aren't evicted prematurely by Moka.
638 // Our is_expired() method handles per-entry expiration based on tier.
639 //
640 // Moka has a hard limit of ~1000 years for TTL. For very high tier multipliers
641 // (e.g., tier 63 = 2^63), we cap at 100 years which is more than enough.
642 // We handle per-entry expiration ourselves in get() based on actual tier.
643 const MAX_MOKA_TTL: Duration = Duration::from_secs(100 * 365 * 24 * 3600); // 100 years
644
645 // For zero TTL, use Duration::ZERO so entries expire immediately in Moka too
646 let max_tier_ttl = if ttl.is_zero() {
647 Duration::ZERO
648 } else {
649 // Non-zero TTL: multiply by max tier multiplier (2^63) and cap at MAX_MOKA_TTL
650 // Since 2^63 > u32::MAX, the multiplier will overflow u32, so just use MAX_MOKA_TTL
651 MAX_MOKA_TTL
652 };
653 let cache = Cache::builder()
654 .max_capacity(max_capacity)
655 .time_to_live(max_tier_ttl)
656 .weigher(move |key: &Arc<str>, entry: &ArticleEntry| -> u32 {
657 // Calculate actual memory footprint for accurate capacity tracking.
658 //
659 // Memory layout per cache entry:
660 //
661 // Key: Arc<str>
662 // - Arc control block: 16 bytes (strong_count + weak_count)
663 // - String data: key.len() bytes
664 // - Allocator overhead: ~16 bytes (malloc metadata, alignment)
665 //
666 // Value: ArticleEntry
667 // - Struct inline: 16 bytes (ArticleAvailability: 2 + padding: 6 + Arc ptr: 8)
668 // - Arc<Vec<u8>> control block: 16 bytes
669 // - Vec<u8> struct: 24 bytes (ptr + len + capacity)
670 // - Vec data: buffer.len() bytes
671 // - Allocator overhead: ~32 bytes (two allocations: Arc+Vec, Vec data)
672 //
673 // Moka internal per-entry overhead is MUCH larger than the data itself:
674 // - Key stored twice: Bucket.key AND ValueEntry.info.key_hash.key
675 // - EntryInfo<K> struct: ~72 bytes (atomics, timestamps, counters)
676 // - LRU deque nodes and frequency sketch entries
677 // - Timer wheel entries for TTL tracking
678 // - crossbeam-epoch deferred garbage (can retain 2x entries)
679 // - HashMap segments with open addressing (~2x load factor)
680 //
681 // Empirical testing shows ~10x actual RSS vs weighted_size().
682 // Our observed ratio: 362MB RSS / 36MB weighted = 10x
683 //
684 const ARC_STR_OVERHEAD: usize = 16 + 16; // Arc control block + allocator
685 const ENTRY_STRUCT: usize = 16; // ArticleEntry inline size
686 const ARC_VEC_OVERHEAD: usize = 16 + 24 + 32; // Arc + Vec struct + allocator
687 // Moka internal structures - empirically measured to address memory reporting gap.
688 // See moka issue #473: https://github.com/moka-rs/moka/issues/473
689 // Observed ratio: 362MB RSS / 36MB weighted_size() = 10x
690 const MOKA_OVERHEAD: usize = 2000;
691
692 let key_size = ARC_STR_OVERHEAD + key.len();
693 let buffer_size = ARC_VEC_OVERHEAD + entry.buffer().len();
694 let base_size = key_size + buffer_size + ENTRY_STRUCT + MOKA_OVERHEAD;
695
696 // Stubs (availability-only entries) have higher relative overhead
697 // due to allocator fragmentation on small allocations.
698 // Complete articles are dominated by content size, so no multiplier needed.
699 let weighted_size = if entry.is_complete_article() {
700 base_size
701 } else {
702 // Small allocations have ~50% more overhead from allocator fragmentation.
703 // Use a 1.5x multiplier, rounding up, to avoid underestimating small entries.
704 (base_size * 3).div_ceil(2)
705 };
706
707 weighted_size.try_into().unwrap_or(u32::MAX)
708 })
709 .build();
710
711 Self {
712 cache: Arc::new(cache),
713 hits: Arc::new(AtomicU64::new(0)),
714 misses: Arc::new(AtomicU64::new(0)),
715 capacity: max_capacity,
716 cache_articles,
717 ttl_millis: ttl.as_millis() as u64,
718 }
719 }
720
721 /// Get an article from the cache
722 ///
723 /// Accepts any lifetime MessageId and uses the string content (without brackets) as key.
724 ///
725 /// **Zero-allocation**: `without_brackets()` returns `&str`, which moka accepts directly
726 /// for `Arc<str>` keys via the `Borrow<str>` trait. This avoids allocating a new `Arc<str>`
727 /// for every cache lookup. See `test_arc_str_borrow_lookup` test for verification.
728 ///
729 /// **Tier-aware TTL**: Even if moka hasn't expired the entry yet, we check if the entry
730 /// is expired based on tier-aware TTL. Higher tier entries get longer TTLs.
731 pub async fn get<'a>(&self, message_id: &MessageId<'a>) -> Option<ArticleEntry> {
732 // moka::Cache<Arc<str>, V> supports get(&str) via Borrow<str> trait
733 // This is zero-allocation: no Arc<str> is created for the lookup
734 let result = self.cache.get(message_id.without_brackets()).await;
735
736 match result {
737 Some(entry) if !entry.is_expired(self.ttl_millis) => {
738 self.hits.fetch_add(1, Ordering::Relaxed);
739 Some(entry)
740 }
741 Some(_) => {
742 // Entry exists but expired by tier-aware TTL - invalidate and treat as cache miss
743 // Invalidating immediately frees capacity rather than waiting for LRU eviction,
744 // preventing repeated cache misses on the same stale key
745 self.cache.invalidate(message_id.without_brackets()).await;
746 self.misses.fetch_add(1, Ordering::Relaxed);
747 None
748 }
749 None => {
750 self.misses.fetch_add(1, Ordering::Relaxed);
751 None
752 }
753 }
754 }
755
756 /// Upsert cache entry (insert or update) - ATOMIC OPERATION
757 ///
758 /// Uses moka's `entry().and_upsert_with()` for atomic get-modify-store.
759 /// This eliminates the race condition of separate get() + insert() calls
760 /// and provides key-level locking for concurrent operations.
761 ///
762 /// If entry exists: updates the entry and marks backend as having the article
763 /// If entry doesn't exist: inserts new entry
764 ///
765 /// When `cache_articles=false`, extracts status code from buffer and stores minimal stub.
766 /// When `cache_articles=true`, stores full buffer.
767 ///
768 /// The tier is stored with the entry for tier-aware TTL calculation.
769 ///
770 /// CRITICAL: Always re-insert to refresh TTL, and mark backend as having the article.
771 pub async fn upsert<'a>(
772 &self,
773 message_id: MessageId<'a>,
774 buffer: Vec<u8>,
775 backend_id: BackendId,
776 tier: u8,
777 ) {
778 let key: Arc<str> = message_id.without_brackets().into();
779 let cache_articles = self.cache_articles;
780
781 // Prepare the new buffer outside the closure for stub extraction
782 let new_buffer = if cache_articles {
783 buffer
784 } else {
785 self.create_minimal_stub(&buffer)
786 };
787
788 // Wrap in Arc so we can efficiently share/move into closure without clone.
789 // Arc::try_unwrap will give us the Vec back if we're the only owner.
790 let new_buffer = Arc::new(new_buffer);
791
792 // Use atomic upsert - this provides key-level locking and eliminates
793 // the race condition between get() and insert() calls
794 self.cache
795 .entry(key)
796 .and_upsert_with(|maybe_entry| {
797 let new_entry = if let Some(existing) = maybe_entry {
798 let mut entry = existing.into_value();
799
800 // Decide whether to update buffer based on completeness
801 let existing_complete = entry.is_complete_article();
802 let new_complete = new_buffer.len() >= 30 && new_buffer.ends_with(b".\r\n");
803
804 let should_replace = match (existing_complete, new_complete) {
805 (false, true) => true, // Stub → Complete: Always replace
806 (true, false) => false, // Complete → Stub: Never replace
807 (true, true) => new_buffer.len() > entry.buffer.len(), // Both complete: larger wins
808 (false, false) => new_buffer.len() > entry.buffer.len(), // Both stubs: larger wins
809 };
810
811 if should_replace {
812 // Already wrapped in Arc, just clone the Arc (cheap)
813 entry.buffer = Arc::clone(&new_buffer);
814 // Update tier when replacing buffer
815 entry.tier = tier;
816 }
817
818 // Refresh TTL on every successful upsert, independent of buffer replacement
819 entry.inserted_at = ttl::now_millis();
820
821 // Mark backend as having the article
822 entry.record_backend_has(backend_id);
823 entry
824 } else {
825 // New entry with tier
826 let mut entry = ArticleEntry {
827 backend_availability: ArticleAvailability::new(),
828 buffer: Arc::clone(&new_buffer),
829 tier,
830 inserted_at: ttl::now_millis(),
831 };
832 entry.record_backend_has(backend_id);
833 entry
834 };
835
836 std::future::ready(new_entry)
837 })
838 .await;
839 }
840
841 /// Create minimal stub from response buffer
842 ///
843 /// Extracts the status code from the first line and creates a minimal stub.
844 /// Falls back to "200\r\n" if parsing fails.
845 fn create_minimal_stub(&self, buffer: &[u8]) -> Vec<u8> {
846 // Find first line (status code line)
847 if let Some(end) = buffer.iter().position(|&b| b == b'\n') {
848 // Extract status code (first 3 digits)
849 if end >= 3 {
850 let code = &buffer[..3];
851 // Verify it's actually digits
852 if code.iter().all(|&b| b.is_ascii_digit()) {
853 return format!("{}\r\n", String::from_utf8_lossy(code)).into_bytes();
854 }
855 }
856 }
857 // Fallback if we can't parse
858 b"200\r\n".to_vec()
859 }
860
    /// Record that a backend returned 430 for this article - ATOMIC OPERATION
    ///
    /// Uses moka's `entry().and_upsert_with()` for atomic get-modify-store.
    /// This eliminates the race condition of separate get() + insert() calls
    /// and provides key-level locking for concurrent operations.
    ///
    /// If the article is already cached, updates the availability bitset.
    /// If not cached, creates a new cache entry with a 430 stub.
    /// This prevents repeated queries to backends that don't have the article.
    ///
    /// Note: We don't store the actual backend 430 response because:
    /// 1. We always send a standardized 430 to clients, never the backend's response
    /// 2. The only info we need is the availability bitset (which backends returned 430)
    pub async fn record_backend_missing<'a>(
        &self,
        message_id: MessageId<'a>,
        backend_id: BackendId,
    ) {
        // Cache keys are normalized message-ids without surrounding angle brackets.
        let key: Arc<str> = message_id.without_brackets().into();
        let misses = &self.misses;

        // Use atomic upsert - this provides key-level locking
        let entry = self
            .cache
            .entry(key)
            .and_upsert_with(|maybe_entry| {
                let new_entry = if let Some(existing) = maybe_entry {
                    // Entry already cached: just set this backend's missing bit.
                    let mut entry = existing.into_value();
                    entry.record_backend_missing(backend_id);
                    entry
                } else {
                    // First 430 for this article - create cache entry with minimal stub
                    let mut entry = ArticleEntry::new(b"430\r\n".to_vec());
                    entry.record_backend_missing(backend_id);
                    entry
                };

                // and_upsert_with expects a future; the value is already computed.
                std::future::ready(new_entry)
            })
            .await;

        // Track misses for new entries
        // NOTE(review): assumes is_fresh() is true only for the entry created in
        // this call (not for an update to an existing entry) - confirm against
        // ArticleEntry::is_fresh, otherwise the miss counter may over-count.
        if entry.is_fresh() {
            misses.fetch_add(1, Ordering::Relaxed);
        }
    }
907
    /// Sync availability state from local tracker to cache - ATOMIC OPERATION
    ///
    /// Uses moka's `entry().and_compute_with()` for atomic get-modify-store.
    /// This eliminates the race condition of separate get() + insert() calls
    /// and provides key-level locking for concurrent operations.
    ///
    /// This is called ONCE at the end of a retry loop to persist all the
    /// backends that returned 430 during this request. Much more efficient
    /// than calling record_backend_missing for each backend individually.
    ///
    /// IMPORTANT: Only creates a 430 stub entry if ALL checked backends returned 430.
    /// If any backend successfully provided the article, we skip creating an entry
    /// (the actual article will be cached via upsert, which may race with this call).
    pub async fn sync_availability<'a>(
        &self,
        message_id: MessageId<'a>,
        availability: &ArticleAvailability,
    ) {
        use moka::ops::compute::Op;

        // Only sync if we actually tried some backends
        if availability.checked_bits() == 0 {
            return;
        }

        let key: Arc<str> = message_id.without_brackets().into();
        let availability = *availability; // Copy for the closure
        // (ArticleAvailability is a small Copy bitset - two u8s - so this is free)
        let misses = &self.misses;

        // Use atomic compute - allows us to conditionally insert/update
        let result = self
            .cache
            .entry(key)
            .and_compute_with(|maybe_entry| {
                let op = if let Some(existing) = maybe_entry {
                    // Merge availability into existing entry
                    // (merge_from applies the "430 wins" semantics documented above)
                    let mut entry = existing.into_value();
                    entry.backend_availability.merge_from(&availability);
                    Op::Put(entry)
                } else {
                    // No existing entry - only create a 430 stub if ALL backends returned 430
                    if availability.any_backend_has_article() {
                        // A backend successfully provided the article.
                        // Don't create a 430 stub - let upsert() handle it with the real article data.
                        Op::Nop
                    } else {
                        // All checked backends returned 430 - create stub to track this
                        let mut entry = ArticleEntry::new(b"430\r\n".to_vec());
                        entry.backend_availability = availability;
                        Op::Put(entry)
                    }
                };

                std::future::ready(op)
            })
            .await;

        // Track misses for new entries
        // (only the newly created 430 stub above yields Inserted; a merge into an
        // existing entry reports a replacement and is not counted as a miss)
        if matches!(result, moka::ops::compute::CompResult::Inserted(_)) {
            misses.fetch_add(1, Ordering::Relaxed);
        }
    }
970
971 /// Get cache statistics
972 pub async fn stats(&self) -> CacheStats {
973 CacheStats {
974 entry_count: self.cache.entry_count(),
975 weighted_size: self.cache.weighted_size(),
976 }
977 }
978
979 /// Insert an article entry directly (for testing)
980 ///
981 /// This is a low-level method that bypasses the usual upsert logic.
982 /// Only use this in tests where you need precise control over cache state.
983 #[cfg(test)]
984 pub async fn insert<'a>(&self, message_id: MessageId<'a>, entry: ArticleEntry) {
985 let key: Arc<str> = message_id.without_brackets().into();
986 self.cache.insert(key, entry).await;
987 }
988
    /// Get maximum cache capacity
    ///
    /// Returns the capacity value this cache was constructed with
    /// (the tests size it as a byte budget for the weigher).
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.capacity
    }
994
    /// Get current number of cached entries (synchronous)
    ///
    /// May lag behind recent inserts/evictions until moka's pending
    /// maintenance runs; call `sync()` first for deterministic reads in tests.
    #[inline]
    pub fn entry_count(&self) -> u64 {
        self.cache.entry_count()
    }
1000
    /// Get current weighted size in bytes (synchronous)
    ///
    /// Like `entry_count`, this reflects moka's last maintenance pass;
    /// call `sync()` first for deterministic reads in tests.
    #[inline]
    pub fn weighted_size(&self) -> u64 {
        self.cache.weighted_size()
    }
1006
1007 /// Get cache hit rate as percentage (0.0 to 100.0)
1008 #[inline]
1009 pub fn hit_rate(&self) -> f64 {
1010 let hits = self.hits.load(Ordering::Relaxed);
1011 let misses = self.misses.load(Ordering::Relaxed);
1012 let total = hits + misses;
1013
1014 if total == 0 {
1015 0.0
1016 } else {
1017 (hits as f64 / total as f64) * 100.0
1018 }
1019 }
1020
    /// Run pending background tasks (for testing)
    ///
    /// Moka performs maintenance tasks (eviction, expiration) asynchronously.
    /// This method ensures all pending tasks complete, useful for deterministic testing.
    /// Call this before asserting on `entry_count()` / `weighted_size()`.
    pub async fn sync(&self) {
        self.cache.run_pending_tasks().await;
    }
1028}
1029
/// Cache statistics
///
/// Plain-data snapshot returned by [`ArticleCache::stats`]. Both fields are
/// `u64` counters, so the struct is cheap to copy and compare; derives are
/// added accordingly (backward-compatible: `Debug` and `Clone` are retained).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct CacheStats {
    /// Number of entries currently tracked by the cache.
    pub entry_count: u64,
    /// Total weighted size of all entries, in bytes.
    pub weighted_size: u64,
}
1036
1037#[cfg(test)]
1038mod tests {
1039 use super::*;
1040 use crate::types::MessageId;
1041 use std::time::Duration;
1042
1043 fn create_test_article(msgid: &str) -> ArticleEntry {
1044 let buffer = format!("220 0 {}\r\nSubject: Test\r\n\r\nBody\r\n.\r\n", msgid).into_bytes();
1045 ArticleEntry::new(buffer)
1046 }
1047
1048 #[test]
1049 fn test_backend_availability_basic() {
1050 let mut avail = ArticleAvailability::new();
1051 let b0 = BackendId::from_index(0);
1052 let b1 = BackendId::from_index(1);
1053
1054 // Default: assume all backends have it
1055 assert!(avail.should_try(b0));
1056 assert!(avail.should_try(b1));
1057
1058 // Record b0 as missing (returned 430)
1059 avail.record_missing(b0);
1060 assert!(!avail.should_try(b0)); // Should not try again
1061 assert!(avail.should_try(b1)); // Still should try
1062
1063 // Record b1 as missing too
1064 avail.record_missing(b1);
1065 assert!(!avail.should_try(b1));
1066 }
1067
    /// `any_backend_has_article` reflects only explicit "has" records;
    /// unchecked or missing backends never count as having the article.
    #[test]
    fn test_any_backend_has_article() {
        let mut avail = ArticleAvailability::new();
        let b0 = BackendId::from_index(0);
        let b1 = BackendId::from_index(1);

        // Empty availability - no backend has it (nothing checked)
        assert!(!avail.any_backend_has_article());

        // Record b0 as missing - still none have it
        avail.record_missing(b0);
        assert!(!avail.any_backend_has_article());

        // Record b1 as having it - now one has it
        avail.record_has(b1);
        assert!(avail.any_backend_has_article());

        // Record b1 as missing (overwrite) - none have it again
        avail.record_missing(b1);
        assert!(!avail.any_backend_has_article());
    }
1089
    /// Within a single tracker a direct `record_has` overwrites an earlier
    /// `record_missing` for the same backend. Contrast with `merge_from`,
    /// where a previously recorded 430 wins.
    #[test]
    fn test_record_has_clears_missing_bit() {
        let mut avail = ArticleAvailability::new();
        let b0 = BackendId::from_index(0);

        // First mark as missing
        avail.record_missing(b0);
        assert!(avail.is_missing(b0));
        assert!(!avail.any_backend_has_article());

        // Now mark as has - should clear missing
        avail.record_has(b0);
        assert!(!avail.is_missing(b0));
        assert!(avail.any_backend_has_article());
    }
1105
    /// Merging: an incoming 430 overwrites a previously recorded "has".
    #[test]
    fn test_merge_from_430_overrides_has() {
        // NNTP SEMANTICS:
        // - 430 "No Such Article" is AUTHORITATIVE - servers NEVER give false negatives
        // - 2xx success is UNRELIABLE - servers CAN give false positives
        // Therefore: 430 always wins over previous "has" state
        let mut cache_entry = ArticleAvailability::new();
        let b0 = BackendId::from_index(0);
        let b1 = BackendId::from_index(1);

        // Previous request got success from backend 0 (might be false positive)
        cache_entry.record_has(b0);
        assert!(!cache_entry.is_missing(b0));
        assert!(cache_entry.any_backend_has_article());

        // Later request gets 430 from backend 0 (AUTHORITATIVE)
        let mut new_result = ArticleAvailability::new();
        new_result.record_missing(b0); // 430 is definitive
        new_result.record_missing(b1); // Backend 1 also returned 430

        // Merge new results into cache entry
        cache_entry.merge_from(&new_result);

        // CRITICAL: 430 is authoritative, so it MUST override the previous "has"
        // The earlier success might have been a false positive (corrupt data, etc.)
        assert!(
            cache_entry.is_missing(b0),
            "430 must override previous has - 430 is authoritative"
        );
        assert!(!cache_entry.any_backend_has_article());

        // Backend 1 should also be marked as missing
        assert!(
            cache_entry.is_missing(b1),
            "Backend 1 should be marked missing"
        );
    }
1143
1144 #[test]
1145 fn test_merge_from_can_add_new_missing() {
1146 let mut avail1 = ArticleAvailability::new();
1147 let mut avail2 = ArticleAvailability::new();
1148
1149 // avail1 knows backend 0 is missing
1150 avail1.record_missing(BackendId::from_index(0));
1151
1152 // avail2 knows backend 1 is missing
1153 avail2.record_missing(BackendId::from_index(1));
1154
1155 // Merge avail2 into avail1
1156 avail1.merge_from(&avail2);
1157
1158 // avail1 should now know both are missing
1159 assert!(avail1.is_missing(BackendId::from_index(0)));
1160 assert!(avail1.is_missing(BackendId::from_index(1)));
1161 }
1162
    /// Merging: an incoming "has" does NOT clear a previously recorded 430.
    #[test]
    fn test_merge_from_has_does_not_clear_missing() {
        // NNTP SEMANTICS:
        // - 430 is AUTHORITATIVE - if server says "no", article is NOT there
        // - 2xx is UNRELIABLE - server might lie (corrupt data, propagation issues)
        // Therefore: "has" from new fetch should NOT clear existing 430
        let mut cache_state = ArticleAvailability::new();
        let b0 = BackendId::from_index(0);
        let b1 = BackendId::from_index(1);

        // Cache says both backends returned 430 (authoritative)
        cache_state.record_missing(b0);
        cache_state.record_missing(b1);
        assert!(cache_state.is_missing(b0));
        assert!(cache_state.is_missing(b1));
        assert!(!cache_state.any_backend_has_article());

        // New request claims success from backend 0 (but 2xx is unreliable!)
        let mut new_fetch = ArticleAvailability::new();
        new_fetch.record_has(b0);

        // Merge new fetch into cache
        cache_state.merge_from(&new_fetch);

        // Backend 0 should STILL be missing - we trust the 430 over the 2xx
        // because 2xx can be false positive but 430 is never false negative
        assert!(
            cache_state.is_missing(b0),
            "2xx should NOT override 430 - 430 is authoritative"
        );
        assert!(!cache_state.any_backend_has_article());

        // Backend 1 is still missing
        assert!(
            cache_state.is_missing(b1),
            "Backend 1 should still be missing"
        );
    }
1201
    /// `all_exhausted` is relative to the configured backend count: it is
    /// true only when every backend in that range has recorded a 430.
    #[test]
    fn test_backend_availability_all_exhausted() {
        use crate::router::BackendCount;
        let mut avail = ArticleAvailability::new();

        // None missing yet
        assert!(!avail.all_exhausted(BackendCount::new(2)));
        assert!(!avail.all_exhausted(BackendCount::new(3)));

        // Record backends 0 and 1 as missing
        avail.record_missing(BackendId::from_index(0));
        avail.record_missing(BackendId::from_index(1));

        // All 2 backends exhausted
        assert!(avail.all_exhausted(BackendCount::new(2)));

        // But not all 3 backends (backend 2 still untried)
        assert!(!avail.all_exhausted(BackendCount::new(3)));
    }
1221
    /// A freshly created entry exposes its status code and buffer, and its
    /// availability bitset starts empty (every backend still eligible).
    #[test]
    fn test_article_entry_basic() {
        let buffer = b"220 0 <test@example.com>\r\nBody\r\n.\r\n".to_vec();
        let entry = ArticleEntry::new(buffer.clone());

        assert_eq!(entry.status_code(), Some(StatusCode::new(220)));
        assert_eq!(entry.buffer().as_ref(), &buffer);

        // Default: should try all backends
        assert!(entry.should_try_backend(BackendId::from_index(0)));
        assert!(entry.should_try_backend(BackendId::from_index(1)));
    }
1234
    /// `is_complete_article` requires BOTH a 220 status code AND the
    /// dot-CRLF terminator; stubs and non-ARTICLE responses never qualify.
    #[test]
    fn test_is_complete_article() {
        // Stubs should NOT be complete articles
        let stub_430 = ArticleEntry::new(b"430\r\n".to_vec());
        assert!(!stub_430.is_complete_article());

        let stub_220 = ArticleEntry::new(b"220\r\n".to_vec());
        assert!(!stub_220.is_complete_article());

        let stub_223 = ArticleEntry::new(b"223\r\n".to_vec());
        assert!(!stub_223.is_complete_article());

        // Full article SHOULD be complete
        let full = ArticleEntry::new(
            b"220 0 <test@example.com>\r\nSubject: Test\r\n\r\nBody\r\n.\r\n".to_vec(),
        );
        assert!(full.is_complete_article());

        // Wrong status code (not 220) should NOT be complete article
        let head_response =
            ArticleEntry::new(b"221 0 <test@example.com>\r\nSubject: Test\r\n.\r\n".to_vec());
        assert!(!head_response.is_complete_article());

        // Missing terminator should NOT be complete
        let no_terminator = ArticleEntry::new(
            b"220 0 <test@example.com>\r\nSubject: Test\r\n\r\nBody\r\n".to_vec(),
        );
        assert!(!no_terminator.is_complete_article());
    }
1264
    /// Recording a 430 on a cached entry rules out only that backend.
    #[test]
    fn test_article_entry_record_backend_missing() {
        let backend0 = BackendId::from_index(0);
        let backend1 = BackendId::from_index(1);
        let mut entry = create_test_article("<test@example.com>");

        // Initially should try both
        assert!(entry.should_try_backend(backend0));
        assert!(entry.should_try_backend(backend1));

        // Record backend1 as missing (430 response)
        entry.record_backend_missing(backend1);

        // Should still try backend0, but not backend1
        assert!(entry.should_try_backend(backend0));
        assert!(!entry.should_try_backend(backend1));
    }
1282
    /// An entry reports exhaustion once every configured backend has a
    /// recorded 430 (delegates to ArticleAvailability::all_exhausted).
    #[test]
    fn test_article_entry_all_backends_exhausted() {
        use crate::router::BackendCount;
        let backend0 = BackendId::from_index(0);
        let backend1 = BackendId::from_index(1);
        let mut entry = create_test_article("<test@example.com>");

        // Not all exhausted yet
        assert!(!entry.all_backends_exhausted(BackendCount::new(2)));

        // Record both as missing
        entry.record_backend_missing(backend0);
        entry.record_backend_missing(backend1);

        // Now all 2 backends are exhausted
        assert!(entry.all_backends_exhausted(BackendCount::new(2)));
    }
1300
    /// The cache keys are `Arc<str>`; lookups must work via `Borrow<str>`
    /// with an independently constructed MessageId.
    #[tokio::test]
    async fn test_arc_str_borrow_lookup() {
        // Create cache with Arc<str> keys
        let cache = ArticleCache::new(100, Duration::from_secs(300), true);

        // Create a MessageId and insert an article
        let msgid = MessageId::from_borrowed("<test123@example.com>").unwrap();
        let article = create_test_article("<test123@example.com>");

        cache.insert(msgid.clone(), article.clone()).await;

        // Verify we can retrieve using a different MessageId instance (borrowed)
        // This demonstrates that Arc<str> supports Borrow<str> lookups via &str
        let msgid2 = MessageId::from_borrowed("<test123@example.com>").unwrap();
        let retrieved = cache.get(&msgid2).await;

        assert!(
            retrieved.is_some(),
            "Arc<str> cache should support Borrow<str> lookups"
        );
        assert_eq!(
            retrieved.unwrap().buffer().as_ref(),
            article.buffer().as_ref(),
            "Retrieved article should match inserted article"
        );
    }
1327
1328 #[tokio::test]
1329 async fn test_cache_hit_miss() {
1330 let cache = ArticleCache::new(100, Duration::from_secs(300), true);
1331
1332 let msgid = MessageId::from_borrowed("<nonexistent@example.com>").unwrap();
1333 let result = cache.get(&msgid).await;
1334
1335 assert!(
1336 result.is_none(),
1337 "Cache lookup for non-existent key should return None"
1338 );
1339 }
1340
    /// Round-trip: an inserted entry comes back with identical bytes.
    #[tokio::test]
    async fn test_cache_insert_and_retrieve() {
        let cache = ArticleCache::new(10, Duration::from_secs(300), true);

        let msgid = MessageId::from_borrowed("<article@example.com>").unwrap();
        let article = create_test_article("<article@example.com>");

        cache.insert(msgid.clone(), article.clone()).await;

        let retrieved = cache.get(&msgid).await.unwrap();
        assert_eq!(retrieved.buffer().as_ref(), article.buffer().as_ref());
    }
1353
    /// Upserting into an empty cache creates the entry; a recorded "has"
    /// never disqualifies a backend from `should_try_backend` (only 430s do).
    #[tokio::test]
    async fn test_cache_upsert_new_entry() {
        let cache = ArticleCache::new(100, Duration::from_secs(300), true);

        let msgid = MessageId::from_borrowed("<test@example.com>").unwrap();
        let buffer = b"220 0 <test@example.com>\r\nBody\r\n.\r\n".to_vec();

        cache
            .upsert(msgid.clone(), buffer.clone(), BackendId::from_index(0), 0)
            .await;

        let retrieved = cache.get(&msgid).await.unwrap();
        assert_eq!(retrieved.buffer().as_ref(), &buffer);
        // Default: should try all backends
        assert!(retrieved.should_try_backend(BackendId::from_index(0)));
        assert!(retrieved.should_try_backend(BackendId::from_index(1)));
    }
1371
    /// A second upsert for the same key merges into the existing entry
    /// without disturbing backend eligibility.
    #[tokio::test]
    async fn test_cache_upsert_existing_entry() {
        let cache = ArticleCache::new(100, Duration::from_secs(300), true);

        let msgid = MessageId::from_borrowed("<test@example.com>").unwrap();
        let buffer = b"220 0 <test@example.com>\r\nBody\r\n.\r\n".to_vec();

        // Insert with backend 0
        cache
            .upsert(msgid.clone(), buffer.clone(), BackendId::from_index(0), 0)
            .await;

        // Upsert again from backend 1 - the identical buffer is not replaced,
        // but the entry's TTL is refreshed and backend 1 is recorded as
        // having the article (see upsert's and_upsert_with branch)
        cache
            .upsert(msgid.clone(), buffer.clone(), BackendId::from_index(1), 0)
            .await;

        let retrieved = cache.get(&msgid).await.unwrap();
        // Default: should try all backends
        assert!(retrieved.should_try_backend(BackendId::from_index(0)));
        assert!(retrieved.should_try_backend(BackendId::from_index(1)));
    }
1394
    /// Recording a 430 against an already-cached article updates only that
    /// backend's bit in the existing entry.
    #[tokio::test]
    async fn test_record_backend_missing() {
        let cache = ArticleCache::new(100, Duration::from_secs(300), true);

        let msgid = MessageId::from_borrowed("<test@example.com>").unwrap();
        let article = create_test_article("<test@example.com>");

        cache.insert(msgid.clone(), article).await;

        // Record backend 1 as missing
        cache
            .record_backend_missing(msgid.clone(), BackendId::from_index(1))
            .await;

        let retrieved = cache.get(&msgid).await.unwrap();
        // Backend 0 should still be tried, backend 1 should not
        assert!(retrieved.should_try_backend(BackendId::from_index(0)));
        assert!(!retrieved.should_try_backend(BackendId::from_index(1)));
    }
1414
    /// CRITICAL BUG FIX TEST: record_backend_missing must create cache entries
    /// for articles that don't exist anywhere (all backends return 430).
    ///
    /// Bug: Previously, if an article wasn't cached, record_backend_missing
    /// would silently do nothing. This caused repeated queries to all backends
    /// for missing articles, resulting in:
    /// - Massive bandwidth waste
    /// - SABnzbd reporting "gigabytes of missing articles"
    /// - 4xx/5xx error counts not increasing (metrics bug)
    ///
    /// This test locks in the fix: a 430 must create a stub entry immediately.
    #[tokio::test]
    async fn test_record_backend_missing_creates_new_entry() {
        let cache = ArticleCache::new(100, Duration::from_secs(300), true);

        let msgid = MessageId::from_borrowed("<missing@example.com>").unwrap();

        // Verify article is NOT in cache
        assert!(cache.get(&msgid).await.is_none());

        // Record backend 0 returned 430
        cache
            .record_backend_missing(msgid.clone(), BackendId::from_index(0))
            .await;

        // CRITICAL: Cache entry MUST now exist
        let entry = cache
            .get(&msgid)
            .await
            .expect("Cache entry must exist after record_backend_missing");

        // Verify backend 0 is marked as missing
        assert!(
            !entry.should_try_backend(BackendId::from_index(0)),
            "Backend 0 should be marked missing"
        );

        // Verify backend 1 is still available (not tried yet)
        assert!(
            entry.should_try_backend(BackendId::from_index(1)),
            "Backend 1 should still be available"
        );

        // Verify the cached response is a 430 stub
        assert_eq!(
            entry.buffer().as_ref(),
            b"430\r\n",
            "Cached buffer should be a 430 stub"
        );

        // Record backend 1 also returned 430
        cache
            .record_backend_missing(msgid.clone(), BackendId::from_index(1))
            .await;

        let entry = cache.get(&msgid).await.unwrap();

        // Now both backends should be marked missing
        assert!(!entry.should_try_backend(BackendId::from_index(0)));
        assert!(!entry.should_try_backend(BackendId::from_index(1)));

        // Verify all backends exhausted
        use crate::router::BackendCount;
        assert!(
            entry.all_backends_exhausted(BackendCount::new(2)),
            "All backends should be exhausted"
        );
    }
1481
    /// `stats()` reflects inserted entries once pending maintenance has run.
    #[tokio::test]
    async fn test_cache_stats() {
        let cache = ArticleCache::new(1024 * 1024, Duration::from_secs(300), true); // 1MB

        // Initial stats
        let stats = cache.stats().await;
        assert_eq!(stats.entry_count, 0);

        // Insert one article
        let msgid = MessageId::from_borrowed("<test@example.com>").unwrap();
        let article = create_test_article("<test@example.com>");
        cache.insert(msgid, article).await;

        // Wait for background tasks
        cache.sync().await;

        // Check stats again
        let stats = cache.stats().await;
        assert_eq!(stats.entry_count, 1);
    }
1502
    /// An entry disappears after its TTL elapses (50ms TTL, 100ms wait).
    #[tokio::test]
    async fn test_cache_ttl_expiration() {
        let cache = ArticleCache::new(1024 * 1024, Duration::from_millis(50), true); // 1MB

        let msgid = MessageId::from_borrowed("<expire@example.com>").unwrap();
        let article = create_test_article("<expire@example.com>");

        cache.insert(msgid.clone(), article).await;

        // Should be cached immediately
        assert!(cache.get(&msgid).await.is_some());

        // Wait for TTL expiration + sync
        tokio::time::sleep(Duration::from_millis(100)).await;
        cache.sync().await;

        // Should be expired
        assert!(cache.get(&msgid).await.is_none());
    }
1522
    /// With cache_articles=false upsert stores only a status-line stub;
    /// with cache_articles=true it stores the full response buffer.
    #[tokio::test]
    async fn test_insert_respects_cache_articles_flag() {
        // Test with cache_articles=false - should store minimal stub
        let cache_stub = ArticleCache::new(1024 * 1024, Duration::from_secs(300), false);

        let msgid = MessageId::from_borrowed("<test@example.com>").unwrap();
        let buffer = b"220 0 <test@example.com>\r\nSubject: Test\r\n\r\nBody\r\n.\r\n".to_vec();
        let full_size = buffer.len();

        cache_stub
            .upsert(msgid.clone(), buffer, BackendId::from_index(0), 0)
            .await;
        cache_stub.sync().await;

        let retrieved = cache_stub.get(&msgid).await.unwrap();
        let stub_size = retrieved.buffer().len();

        // Stub should be much smaller than full article (just "220\r\n")
        assert!(
            stub_size < 10,
            "Stub size {} should be < 10 bytes",
            stub_size
        );
        assert!(
            stub_size < full_size,
            "Stub {} should be smaller than full {}",
            stub_size,
            full_size
        );

        // Test with cache_articles=true - should store full article
        let cache_full = ArticleCache::new(1024 * 1024, Duration::from_secs(300), true);

        let msgid2 = MessageId::from_borrowed("<test2@example.com>").unwrap();
        let buffer2 = b"220 0 <test2@example.com>\r\nSubject: Test2\r\n\r\nBody2\r\n.\r\n".to_vec();
        let original_size = buffer2.len();

        cache_full
            .upsert(msgid2.clone(), buffer2, BackendId::from_index(0), 0)
            .await;
        cache_full.sync().await;

        let retrieved2 = cache_full.get(&msgid2).await.unwrap();

        // Should store full article
        assert_eq!(retrieved2.buffer().len(), original_size);
    }
1570
    /// Eviction keeps the cache within its weighted capacity.
    #[tokio::test]
    async fn test_cache_capacity_limit() {
        let cache = ArticleCache::new(500, Duration::from_secs(300), true); // 500 bytes total

        // Insert 3 articles (exceeds capacity)
        for i in 1..=3 {
            let msgid_str = format!("<article{}@example.com>", i);
            let msgid = MessageId::new(msgid_str).unwrap();
            let article = create_test_article(msgid.as_ref());
            cache.insert(msgid, article).await;
            cache.sync().await; // Force eviction
        }

        // Wait for eviction to complete
        tokio::time::sleep(Duration::from_millis(10)).await;
        cache.sync().await;

        let stats = cache.stats().await;
        // NOTE(review): this assertion is vacuous - only 3 entries were ever
        // inserted, so entry_count can never exceed 3. To actually verify
        // eviction, assert entry_count < 3 (or check weighted_size <= capacity)
        // once the weigher's per-entry overhead is pinned down.
        assert!(
            stats.entry_count <= 3,
            "Cache should have at most 3 entries with 500 byte capacity"
        );
    }
1594
1595 #[tokio::test]
1596 async fn test_article_entry_clone() {
1597 let article = create_test_article("<test@example.com>");
1598
1599 let cloned = article.clone();
1600 assert_eq!(article.buffer().as_ref(), cloned.buffer().as_ref());
1601 assert!(Arc::ptr_eq(article.buffer(), cloned.buffer()));
1602 }
1603
    /// Cloned ArticleCache handles share the same underlying moka cache:
    /// an insert through one clone is visible through the other.
    #[tokio::test]
    async fn test_cache_clone() {
        let cache1 = ArticleCache::new(1024 * 1024, Duration::from_secs(300), true); // 1MB
        let cache2 = cache1.clone();

        let msgid = MessageId::from_borrowed("<test@example.com>").unwrap();
        let article = create_test_article("<test@example.com>");

        cache1.insert(msgid.clone(), article).await;
        cache1.sync().await;

        // Should be accessible from cloned cache
        assert!(cache2.get(&msgid).await.is_some());
    }
1618
    /// Large cached articles are weighed at their actual byte size
    /// (no safety multiplier), so a 10MB cache fits ~13 x 750KB articles.
    #[tokio::test]
    async fn test_weigher_large_articles() {
        // Test that large article bodies use ACTUAL SIZE (no multiplier)
        // when cache_articles=true and buffer >10KB
        let cache = ArticleCache::new(10 * 1024 * 1024, Duration::from_secs(300), true); // 10MB capacity

        // Create a 750KB article (typical size)
        let body = vec![b'X'; 750_000];
        let response = format!(
            "222 0 <test@example.com>\r\n{}\r\n.\r\n",
            std::str::from_utf8(&body).unwrap()
        );
        let article = ArticleEntry::new(response.as_bytes().to_vec());

        let msgid = MessageId::from_borrowed("<test@example.com>").unwrap();
        cache.insert(msgid.clone(), article).await;
        cache.sync().await;

        // With actual size (no multiplier): 750KB per entry
        // 10MB capacity should fit ~13 articles
        // With old 1.8x multiplier: 750KB * 1.8 ≈ 1.35MB per entry, fits ~7 articles
        // With old 2.5x multiplier: 750KB * 2.5 ≈ 1.875MB per entry, fits ~5 articles

        // Insert 12 more articles (13 total)
        for i in 2..=13 {
            let msgid_str = format!("<article{}@example.com>", i);
            let msgid = MessageId::new(msgid_str).unwrap();
            let response = format!(
                "222 0 {}\r\n{}\r\n.\r\n",
                msgid.as_str(),
                std::str::from_utf8(&body).unwrap()
            );
            let article = ArticleEntry::new(response.as_bytes().to_vec());
            cache.insert(msgid, article).await;
            cache.sync().await;
        }

        tokio::time::sleep(Duration::from_millis(50)).await;
        cache.sync().await;

        let stats = cache.stats().await;
        // With actual size (no multiplier), should fit 11-13 large articles
        assert!(
            stats.entry_count >= 11,
            "Cache should fit at least 11 large articles with actual size (no multiplier) (got {})",
            stats.entry_count
        );
    }
1667
    /// Small stub entries are weighed with a fixed moka-overhead allowance,
    /// so a 1MB cache still fits on the order of a hundred-plus stubs.
    #[tokio::test]
    async fn test_weigher_small_stubs() {
        // Test that small stubs account for moka internal overhead correctly
        // With MOKA_OVERHEAD = 2000 bytes (based on empirical 10x memory ratio from moka issue #473)
        let cache = ArticleCache::new(1_000_000, Duration::from_secs(300), false); // 1MB capacity

        // Create small stub (26 bytes: "223 0 <test@example.com>\r\n")
        let stub = b"223 0 <test@example.com>\r\n".to_vec();
        let article = ArticleEntry::new(stub);

        let msgid = MessageId::from_borrowed("<test@example.com>").unwrap();
        cache.insert(msgid, article).await;
        cache.sync().await;

        // With MOKA_OVERHEAD = 2000: stub + Arc + availability + overhead
        // ~26 + 68 + 40 + 2000 = ~2134 bytes per small stub
        // With 2.5x small stub multiplier: ~5400 bytes per stub
        // 1MB capacity should fit ~185 stubs

        // Insert many small stubs
        for i in 2..=200 {
            let msgid_str = format!("<stub{}@example.com>", i);
            let msgid = MessageId::new(msgid_str).unwrap();
            let stub = format!("223 0 {}\r\n", msgid.as_str());
            let article = ArticleEntry::new(stub.as_bytes().to_vec());
            cache.insert(msgid, article).await;
        }

        cache.sync().await;
        tokio::time::sleep(Duration::from_millis(50)).await;
        cache.sync().await;

        let stats = cache.stats().await;
        // Should be able to fit ~150-185 small stubs in 1MB
        assert!(
            stats.entry_count >= 100,
            "Cache should fit many small stubs (got {})",
            stats.entry_count
        );
    }
1708
1709 #[tokio::test]
1710 async fn test_cache_with_owned_message_id() {
1711 let cache = ArticleCache::new(1024 * 1024, Duration::from_secs(300), true); // 1MB
1712
1713 // Use owned MessageId
1714 let msgid = MessageId::new("<owned@example.com>".to_string()).unwrap();
1715 let article = create_test_article("<owned@example.com>");
1716
1717 cache.insert(msgid.clone(), article).await;
1718
1719 // Retrieve with borrowed MessageId
1720 let borrowed_msgid = MessageId::from_borrowed("<owned@example.com>").unwrap();
1721 assert!(cache.get(&borrowed_msgid).await.is_some());
1722 }
1723}