// nntp_proxy/cache/article.rs
1//! Article caching implementation using LRU cache with TTL
2//!
3//! This module provides article caching with per-backend availability tracking.
4//!
5//! # NNTP Response Semantics (CRITICAL)
6//!
7//! **430 "No Such Article" is AUTHORITATIVE** - NNTP servers NEVER give false negatives.
8//! If a server returns 430, the article is definitively not present on that server.
9//! This is a fundamental property of NNTP: servers only return 430 when they are
10//! certain they don't have the article.
11//!
12//! **2xx success responses are UNRELIABLE** - servers CAN give false positives.
13//! A server might return 220/222/223 but then provide:
14//! - Corrupt or incomplete article data
15//! - Truncated responses due to connection issues
16//! - Stale data that gets cleaned up moments later
17//!
18//! **Therefore: 430 (missing) ALWAYS takes precedence over "has" state.**
19//! - When merging availability info, 430s accumulate and never get cleared
20//! - A previous "has" can be overwritten by a later 430
21//! - A previous 430 should NOT be overwritten by a later "has"
22//! - Cache entries with 430 persist until TTL expiry
23//!
24//! **BACKEND LIMIT**: Maximum 8 backends supported due to u8 bitset optimization.
25//! This limit is enforced at config validation time.
26
27use crate::protocol::StatusCode;
28use crate::router::BackendCount;
29use crate::types::{BackendId, MessageId};
30use moka::future::Cache;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicU64, Ordering};
33use std::time::Duration;
34
35use super::ttl;
36
/// Maximum number of backends supported by ArticleAvailability bitset
///
/// The `checked`/`missing` bitsets are `u8`, so at most 8 backends can be
/// tracked per article. This limit is enforced at config validation time
/// (see module-level docs).
pub const MAX_BACKENDS: usize = 8;
39
/// Status of a backend for a specific article
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendStatus {
    /// Backend hasn't been checked yet
    Unknown,
    /// Backend was checked and returned 430 (doesn't have article)
    Missing,
    /// Backend was checked and has the article
    HasArticle,
}

/// Track which backends have (or don't have) a specific article
///
/// Uses two u8 bitsets to track availability across up to 8 backends:
/// - `checked`: Which backends we've queried (attempted to fetch from)
/// - `missing`: Which backends returned 430 (don't have this article)
///
/// # Example with 2 backends
/// - Initial state: `checked=00`, `missing=00` (haven't tried any backends yet)
/// - After backend 0 returns 430: `checked=01`, `missing=01` (backend 0 doesn't have it)
/// - After backend 1 returns 220: `checked=11`, `missing=01` (backend 1 has it)
/// - If both return 430: `checked=11`, `missing=11` (all backends exhausted)
///
/// # Usage Pattern
/// This type serves TWO critical purposes:
///
/// 1. **Cache persistence** - Track availability across requests (long-lived)
///    - Store in cache with article data
///    - Avoid querying backends known to be missing
///    - Updated after every successful/failed fetch
///
/// 2. **430 retry loop** - Track which backends tried during single request (transient)
///    - Create fresh instance for each ARTICLE request
///    - Mark backends as missing when they return 430
///    - Stop when all backends exhausted or one succeeds
///
/// # Thread Safety
/// Wrapped in `Arc<Mutex<>>` when stored in cache entries for concurrent updates.
/// The precheck pattern ensures serial updates to avoid races:
/// 1. Query all backends concurrently
/// 2. Wait for all to complete
/// 3. Update cache serially with all results
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ArticleAvailability {
    /// Bitset of backends we've checked (tried to fetch from)
    checked: u8,
    /// Bitset of backends that DON'T have this article (returned 430)
    missing: u8, // u8 supports up to 8 backends (plenty for NNTP)
}
90
91impl ArticleAvailability {
92    /// Create empty availability - assume all backends have article until proven otherwise
93    #[inline]
94    pub const fn new() -> Self {
95        Self {
96            checked: 0,
97            missing: 0,
98        }
99    }
100
101    /// Record that a backend returned 430 (doesn't have the article)
102    ///
103    /// # NNTP Semantics
104    /// **430 is AUTHORITATIVE** - this is a definitive "no". Once marked missing,
105    /// the backend should not be retried for this article until cache TTL expires.
106    /// See module-level docs for full explanation.
107    ///
108    /// # Panics (debug builds only)
109    /// Panics if backend_id >= 8. Config validation enforces max 8 backends.
110    #[inline]
111    pub fn record_missing(&mut self, backend_id: BackendId) -> &mut Self {
112        let idx = backend_id.as_index();
113        debug_assert!(
114            idx < MAX_BACKENDS,
115            "Backend index {} exceeds MAX_BACKENDS ({})",
116            idx,
117            MAX_BACKENDS
118        );
119        self.checked |= 1u8 << idx; // Mark as checked
120        self.missing |= 1u8 << idx; // Mark as missing
121        self
122    }
123
124    /// Record that a backend returned a success response (2xx) for this article
125    ///
126    /// # NNTP Semantics Warning
127    /// **2xx responses are UNRELIABLE** - servers can give false positives.
128    /// This clears the missing bit for fresh `ArticleAvailability` instances,
129    /// but when merged via `merge_from()`, existing 430s take precedence.
130    /// See module-level docs for full explanation.
131    ///
132    /// # Panics (debug builds only)
133    /// Panics if backend_id >= 8. Config validation enforces max 8 backends.
134    #[inline]
135    pub fn record_has(&mut self, backend_id: BackendId) -> &mut Self {
136        let idx = backend_id.as_index();
137        debug_assert!(
138            idx < MAX_BACKENDS,
139            "Backend index {} exceeds MAX_BACKENDS ({})",
140            idx,
141            MAX_BACKENDS
142        );
143        self.checked |= 1u8 << idx; // Mark as checked
144        self.missing &= !(1u8 << idx); // Clear missing bit (has the article)
145        self
146    }
147
148    /// Merge another availability's state into this one
149    ///
150    /// Used to sync local availability tracking back to cache.
151    /// Takes the union of checked backends and missing backends.
152    ///
153    /// # NNTP Semantics (CRITICAL)
154    ///
155    /// **430 responses are AUTHORITATIVE** - NNTP servers NEVER give false negatives.
156    /// If a server says "no such article" (430), the article is definitively not there.
157    ///
158    /// **2xx responses are UNRELIABLE** - servers CAN give false positives.
159    /// A server might claim to have an article but return corrupt data, or the
160    /// article might be incomplete/unavailable due to propagation delays.
161    ///
162    /// Therefore: `missing` state ALWAYS wins over `has` state.
163    /// We trust 430s absolutely but treat successes with skepticism.
164    #[inline]
165    pub fn merge_from(&mut self, other: &Self) {
166        // Simple union: trust all 430s from both sources
167        // 430 is authoritative, so missing bits should accumulate
168        self.checked |= other.checked;
169        self.missing |= other.missing;
170    }
171
172    /// Check if a backend is known to be missing (returned 430)
173    ///
174    /// # Panics (debug builds only)
175    /// Panics if backend_id >= 8. Config validation enforces max 8 backends.
176    #[inline]
177    pub fn is_missing(&self, backend_id: BackendId) -> bool {
178        let idx = backend_id.as_index();
179        debug_assert!(
180            idx < MAX_BACKENDS,
181            "Backend index {} exceeds MAX_BACKENDS ({})",
182            idx,
183            MAX_BACKENDS
184        );
185        self.missing & (1u8 << idx) != 0
186    }
187
188    /// Check if we should attempt to fetch from this backend
189    ///
190    /// Returns `true` if backend might have the article (not yet marked missing).
191    ///
192    /// # Panics (debug builds only)
193    /// Panics if backend_id >= 8. Config validation enforces max 8 backends.
194    #[inline]
195    pub fn should_try(&self, backend_id: BackendId) -> bool {
196        !self.is_missing(backend_id)
197    }
198
199    /// Get the raw missing bitset for debugging
200    #[inline]
201    pub const fn missing_bits(&self) -> u8 {
202        self.missing
203    }
204
205    /// Get the raw checked bitset for debugging
206    #[inline]
207    pub const fn checked_bits(&self) -> u8 {
208        self.checked
209    }
210
211    /// Check if all backends in the pool have been tried and returned 430
212    ///
213    /// Check if all backends have been tried and returned 430
214    ///
215    /// # Panics (debug builds only)
216    /// Panics if backend_count > 8. Config validation enforces max 8 backends.
217    #[inline]
218    pub fn all_exhausted(&self, backend_count: BackendCount) -> bool {
219        let count = backend_count.get();
220        debug_assert!(
221            count <= MAX_BACKENDS,
222            "Backend count {} exceeds MAX_BACKENDS ({})",
223            count,
224            MAX_BACKENDS
225        );
226        match count {
227            0 => true,
228            8 => self.missing == 0xFF,
229            n => self.missing & ((1u8 << n) - 1) == (1u8 << n) - 1,
230        }
231    }
232
233    /// Get an iterator over backends that should still be tried
234    ///
235    /// Returns backend IDs that haven't been marked missing yet.
236    pub fn available_backends(
237        &self,
238        backend_count: BackendCount,
239    ) -> impl Iterator<Item = BackendId> + '_ {
240        (0..backend_count.get().min(MAX_BACKENDS))
241            .map(BackendId::from_index)
242            .filter(move |&backend_id| self.should_try(backend_id))
243    }
244
245    /// Get the underlying bitset value (for debugging)
246    #[inline]
247    pub const fn as_u8(&self) -> u8 {
248        self.missing
249    }
250
251    /// Reconstruct from raw bitset values (used for deserialization)
252    ///
253    /// # Safety
254    /// The caller must ensure the bits represent valid backend states.
255    /// This is primarily used when deserializing from disk cache.
256    #[inline]
257    pub const fn from_bits(checked: u8, missing: u8) -> Self {
258        Self { checked, missing }
259    }
260
261    /// Check if we have any backend availability information
262    ///
263    /// Returns true if at least one backend has been checked.
264    /// If this returns false, we haven't tried any backends yet and shouldn't
265    /// serve from cache (should try backends first).
266    #[inline]
267    pub const fn has_availability_info(&self) -> bool {
268        self.checked != 0
269    }
270
271    /// Check if any backend is known to HAVE the article
272    ///
273    /// Returns true if at least one backend was checked and did NOT return 430.
274    /// This is the inverse check from `all_exhausted` - at least one success.
275    #[inline]
276    pub const fn any_backend_has_article(&self) -> bool {
277        // A backend "has" the article if it's checked but not missing
278        // checked & !missing gives us the backends that have it
279        (self.checked & !self.missing) != 0
280    }
281
282    /// Query backend availability status
283    ///
284    /// # Panics (debug builds only)
285    /// Panics if backend_id >= 8. Config validation enforces max 8 backends.
286    #[inline]
287    pub fn status(&self, backend_id: BackendId) -> BackendStatus {
288        let idx = backend_id.as_index();
289        debug_assert!(
290            idx < MAX_BACKENDS,
291            "Backend index {} exceeds MAX_BACKENDS ({})",
292            idx,
293            MAX_BACKENDS
294        );
295        let mask = 1u8 << idx;
296        if self.checked & mask == 0 {
297            BackendStatus::Unknown
298        } else if self.missing & mask != 0 {
299            BackendStatus::Missing
300        } else {
301            BackendStatus::HasArticle
302        }
303    }
304}
305
306impl Default for ArticleAvailability {
307    fn default() -> Self {
308        Self::new()
309    }
310}
311
/// Cache entry for an article
///
/// Stores complete NNTP response buffer plus backend availability tracking.
/// The buffer is validated once on insert, then can be served directly without re-parsing.
#[derive(Clone, Debug)]
pub struct ArticleEntry {
    /// Backend availability bitset (2 bytes)
    ///
    /// No mutex needed: moka clones entries on get(), and updates go through
    /// cache.insert() which replaces the whole entry atomically.
    backend_availability: ArticleAvailability,

    /// Complete response buffer (Arc for cheap cloning)
    /// Format: `220 <msgid>\r\n<headers>\r\n\r\n<body>\r\n.\r\n`
    /// Status code is always at bytes [0..3]
    buffer: Arc<Vec<u8>>,

    /// Tier of the backend that provided this article
    /// Used for tier-aware TTL: higher tier = longer TTL
    /// Defaults to 0 in `new()`/`from_arc()`; set via `with_tier()`/`set_tier()`
    tier: u8,

    /// Unix timestamp when this entry was inserted (milliseconds since epoch)
    /// Populated via `ttl::now_millis()` and used with `tier` for TTL expiration
    inserted_at: u64,
}
337
338impl ArticleEntry {
339    /// Create from response buffer
340    ///
341    /// The buffer should be a complete NNTP response (status line + data + terminator).
342    /// No validation is performed here - caller must ensure buffer is valid.
343    ///
344    /// Backend availability starts with assumption all backends have the article.
345    /// Tier defaults to 0, timestamp is set to now.
346    pub fn new(buffer: Vec<u8>) -> Self {
347        Self {
348            backend_availability: ArticleAvailability::new(),
349            buffer: Arc::new(buffer),
350            tier: 0,
351            inserted_at: ttl::now_millis(),
352        }
353    }
354
355    /// Create from response buffer with a specific tier
356    ///
357    /// Used when caching articles from backends with known tier.
358    pub fn with_tier(buffer: Vec<u8>, tier: u8) -> Self {
359        Self {
360            backend_availability: ArticleAvailability::new(),
361            buffer: Arc::new(buffer),
362            tier,
363            inserted_at: ttl::now_millis(),
364        }
365    }
366
367    /// Create from pre-wrapped Arc buffer
368    ///
369    /// Use this when the buffer is already wrapped in Arc to avoid double wrapping.
370    pub fn from_arc(buffer: Arc<Vec<u8>>) -> Self {
371        Self {
372            backend_availability: ArticleAvailability::new(),
373            buffer,
374            tier: 0,
375            inserted_at: ttl::now_millis(),
376        }
377    }
378
379    /// Check if this entry has expired based on tier-aware TTL
380    ///
381    /// See [`super::ttl`] for the TTL formula.
382    #[inline]
383    pub fn is_expired(&self, base_ttl_millis: u64) -> bool {
384        ttl::is_expired(self.inserted_at, base_ttl_millis, self.tier)
385    }
386
387    /// Get the tier of the backend that provided this article
388    #[inline]
389    pub fn tier(&self) -> u8 {
390        self.tier
391    }
392
393    /// Set the tier (used when updating entry)
394    #[inline]
395    pub fn set_tier(&mut self, tier: u8) {
396        self.tier = tier;
397    }
398
399    /// Get raw buffer for serving to client
400    #[inline]
401    pub fn buffer(&self) -> &Arc<Vec<u8>> {
402        &self.buffer
403    }
404
405    /// Get status code from the response buffer
406    ///
407    /// Parses the first 3 bytes as the status code.
408    /// Returns None if buffer is too short or invalid.
409    #[inline]
410    pub fn status_code(&self) -> Option<StatusCode> {
411        StatusCode::parse(&self.buffer)
412    }
413
414    /// Check if we should try fetching from this backend
415    ///
416    /// Returns false if backend is known to not have this article (returned 430 before).
417    #[inline]
418    pub fn should_try_backend(&self, backend_id: BackendId) -> bool {
419        self.backend_availability.should_try(backend_id)
420    }
421
422    /// Record that a backend returned 430 (doesn't have this article)
423    pub fn record_backend_missing(&mut self, backend_id: BackendId) {
424        self.backend_availability.record_missing(backend_id);
425    }
426
427    /// Record that a backend successfully provided this article
428    pub fn record_backend_has(&mut self, backend_id: BackendId) {
429        self.backend_availability.record_has(backend_id);
430    }
431
432    /// Set backend availability (used for hydrating from hybrid cache)
433    pub fn set_availability(&mut self, availability: ArticleAvailability) {
434        self.backend_availability = availability;
435    }
436
437    /// Check if all backends have been tried and none have the article
438    pub fn all_backends_exhausted(&self, total_backends: BackendCount) -> bool {
439        self.backend_availability.all_exhausted(total_backends)
440    }
441
442    /// Get backends that might have this article
443    pub fn available_backends(&self, total_backends: BackendCount) -> Vec<BackendId> {
444        self.backend_availability
445            .available_backends(total_backends)
446            .collect()
447    }
448
449    /// Get response buffer (backward compatibility)
450    ///
451    /// This provides the same interface as the old CachedArticle.response field.
452    #[inline]
453    pub fn response(&self) -> &Arc<Vec<u8>> {
454        &self.buffer
455    }
456
457    /// Check if this cache entry has useful availability information
458    ///
459    /// Returns true if at least one backend has been tried (marked as missing or has article).
460    /// If false, we haven't tried any backends yet and should run precheck instead of
461    /// serving from cache.
462    #[inline]
463    pub fn has_availability_info(&self) -> bool {
464        self.backend_availability.has_availability_info()
465    }
466
467    /// Check if this cache entry contains a complete article (220) or body (222)
468    ///
469    /// Returns true if:
470    /// 1. Status code is 220 (ARTICLE) or 222 (BODY)
471    /// 2. Buffer contains actual content (not just a stub like "220\r\n")
472    ///
473    /// A complete response ends with ".\r\n" and is significantly longer
474    /// than a stub. Stubs are typically 5-6 bytes (e.g., "220\r\n" or "223\r\n").
475    ///
476    /// This is used when `cache_articles=true` to determine if we can serve
477    /// directly from cache or need to fetch additional data.
478    #[inline]
479    pub fn is_complete_article(&self) -> bool {
480        // Must be a 220 (ARTICLE) or 222 (BODY) response
481        let Some(code) = self.status_code() else {
482            return false;
483        };
484        if code.as_u16() != 220 && code.as_u16() != 222 {
485            return false;
486        }
487
488        // Must have actual content, not just a stub
489        // A stub is typically "220\r\n" (5 bytes) or "222 0 <test@example.com>\r\n" (25-30 bytes)
490        // A real article/body has content + terminator
491        // Minimum valid: "220 0 <x@y>\r\nX: Y\r\n\r\nB\r\n.\r\n" = 30 bytes
492        const MIN_ARTICLE_SIZE: usize = 30;
493        self.buffer.len() >= MIN_ARTICLE_SIZE && self.buffer.ends_with(b".\r\n")
494    }
495
496    /// Get the appropriate response for a command, if this cache entry can serve it
497    ///
498    /// Returns `Some(response_bytes)` if cache can satisfy the command:
499    /// - ARTICLE (220 cached) → returns full cached response
500    /// - BODY (222 cached or 220 cached) → returns cached response  
501    /// - HEAD (221 cached or 220 cached) → returns cached response
502    /// - STAT → synthesizes "223 0 <msg-id>\r\n" (we know article exists)
503    ///
504    /// Returns `None` if cached response can't serve this command type or if
505    /// the cached buffer fails validation.
506    pub fn response_for_command(&self, cmd_verb: &str, message_id: &str) -> Option<Vec<u8>> {
507        let code = self.status_code()?.as_u16();
508
509        match (code, cmd_verb) {
510            // STAT just needs existence confirmation - synthesize response
511            (220..=222, "STAT") => Some(format!("223 0 {}\r\n", message_id).into_bytes()),
512            // Direct match - return cached buffer if valid
513            (220, "ARTICLE") | (222, "BODY") | (221, "HEAD") => {
514                if self.is_valid_response() {
515                    Some(self.buffer.to_vec())
516                } else {
517                    tracing::warn!(
518                        code = code,
519                        len = self.buffer.len(),
520                        "Cached buffer failed validation, discarding"
521                    );
522                    None
523                }
524            }
525            // ARTICLE (220) contains everything, can serve BODY or HEAD requests
526            (220, "BODY" | "HEAD") => {
527                if self.is_valid_response() {
528                    Some(self.buffer.to_vec())
529                } else {
530                    tracing::warn!(
531                        code = code,
532                        len = self.buffer.len(),
533                        "Cached buffer failed validation, discarding"
534                    );
535                    None
536                }
537            }
538            _ => None,
539        }
540    }
541
542    /// Check if buffer contains a valid NNTP multiline response
543    ///
544    /// A valid response must:
545    /// 1. Start with 3 ASCII digits (status code)
546    /// 2. Have CRLF somewhere (line terminator)
547    /// 3. End with .\r\n for multiline responses (220/221/222)
548    #[inline]
549    fn is_valid_response(&self) -> bool {
550        // Must have at least "NNN \r\n.\r\n" = 9 bytes
551        if self.buffer.len() < 9 {
552            return false;
553        }
554
555        // First 3 bytes must be ASCII digits
556        if !self.buffer[0].is_ascii_digit()
557            || !self.buffer[1].is_ascii_digit()
558            || !self.buffer[2].is_ascii_digit()
559        {
560            return false;
561        }
562
563        // Must end with .\r\n for multiline responses
564        if !self.buffer.ends_with(b".\r\n") {
565            return false;
566        }
567
568        // Must have CRLF in first line (status line)
569        memchr::memmem::find(&self.buffer[..self.buffer.len().min(256)], b"\r\n").is_some()
570    }
571
572    /// Check if this entry can serve a given command type
573    ///
574    /// Simpler version of `response_for_command` for boolean checks.
575    #[inline]
576    pub fn matches_command_type_verb(&self, cmd_verb: &str) -> bool {
577        let Some(code) = self.status_code() else {
578            return false;
579        };
580
581        match code.as_u16() {
582            220 => matches!(cmd_verb, "ARTICLE" | "BODY" | "HEAD" | "STAT"),
583            222 => matches!(cmd_verb, "BODY" | "STAT"),
584            221 => matches!(cmd_verb, "HEAD" | "STAT"),
585            _ => false,
586        }
587    }
588
589    /// Initialize availability tracker from this cached entry
590    ///
591    /// Creates a fresh ArticleAvailability with backends marked missing based on
592    /// cached knowledge (backends that previously returned 430).
593    pub fn to_availability(&self, total_backends: BackendCount) -> ArticleAvailability {
594        let mut availability = ArticleAvailability::new();
595
596        // Mark backends we know don't have this article
597        for backend_id in (0..total_backends.get()).map(BackendId::from_index) {
598            if !self.should_try_backend(backend_id) {
599                availability.record_missing(backend_id);
600            }
601        }
602
603        availability
604    }
605}
606
/// Article cache using LRU eviction with TTL
///
/// Uses `Arc<str>` (message ID content without brackets) as key for zero-allocation lookups.
/// `Arc<str>` implements `Borrow<str>`, allowing `cache.get(&str)` without allocation.
///
/// Supports tier-aware TTL: entries from higher tier backends get longer TTLs.
/// Formula: `effective_ttl = base_ttl * (2 ^ tier)`
#[derive(Clone, Debug)]
pub struct ArticleCache {
    cache: Arc<Cache<Arc<str>, ArticleEntry>>,
    /// Hit counter, incremented on non-expired cache hits in `get()`
    hits: Arc<AtomicU64>,
    /// Miss counter, incremented on absent or tier-expired entries in `get()`
    misses: Arc<AtomicU64>,
    /// Configured maximum capacity in bytes (weighted entries)
    capacity: u64,
    /// When false, only minimal availability stubs are stored, not full bodies
    cache_articles: bool,
    /// Base TTL in milliseconds (used for tier-aware expiration via `effective_ttl`)
    ttl_millis: u64,
}
624
625impl ArticleCache {
626    /// Create a new article cache
627    ///
628    /// # Arguments
629    /// * `max_capacity` - Maximum cache size in bytes (uses weighted entries)
630    /// * `ttl` - Time-to-live for cached articles
631    /// * `cache_articles` - Whether to cache full article bodies (true) or just availability (false)
632    pub fn new(max_capacity: u64, ttl: Duration, cache_articles: bool) -> Self {
633        // Build cache with byte-based capacity using weigher
634        // max_capacity is total bytes allowed
635        //
636        // IMPORTANT: Moka's time_to_live must be set to the MAXIMUM tier-aware TTL
637        // so that high-tier entries aren't evicted prematurely by Moka.
638        // Our is_expired() method handles per-entry expiration based on tier.
639        //
640        // Moka has a hard limit of ~1000 years for TTL. For very high tier multipliers
641        // (e.g., tier 63 = 2^63), we cap at 100 years which is more than enough.
642        // We handle per-entry expiration ourselves in get() based on actual tier.
643        const MAX_MOKA_TTL: Duration = Duration::from_secs(100 * 365 * 24 * 3600); // 100 years
644
645        // For zero TTL, use Duration::ZERO so entries expire immediately in Moka too
646        let max_tier_ttl = if ttl.is_zero() {
647            Duration::ZERO
648        } else {
649            // Non-zero TTL: multiply by max tier multiplier (2^63) and cap at MAX_MOKA_TTL
650            // Since 2^63 > u32::MAX, the multiplier will overflow u32, so just use MAX_MOKA_TTL
651            MAX_MOKA_TTL
652        };
653        let cache = Cache::builder()
654            .max_capacity(max_capacity)
655            .time_to_live(max_tier_ttl)
656            .weigher(move |key: &Arc<str>, entry: &ArticleEntry| -> u32 {
657                // Calculate actual memory footprint for accurate capacity tracking.
658                //
659                // Memory layout per cache entry:
660                //
661                // Key: Arc<str>
662                //   - Arc control block: 16 bytes (strong_count + weak_count)
663                //   - String data: key.len() bytes
664                //   - Allocator overhead: ~16 bytes (malloc metadata, alignment)
665                //
666                // Value: ArticleEntry
667                //   - Struct inline: 16 bytes (ArticleAvailability: 2 + padding: 6 + Arc ptr: 8)
668                //   - Arc<Vec<u8>> control block: 16 bytes
669                //   - Vec<u8> struct: 24 bytes (ptr + len + capacity)
670                //   - Vec data: buffer.len() bytes
671                //   - Allocator overhead: ~32 bytes (two allocations: Arc+Vec, Vec data)
672                //
673                // Moka internal per-entry overhead is MUCH larger than the data itself:
674                //   - Key stored twice: Bucket.key AND ValueEntry.info.key_hash.key
675                //   - EntryInfo<K> struct: ~72 bytes (atomics, timestamps, counters)
676                //   - LRU deque nodes and frequency sketch entries
677                //   - Timer wheel entries for TTL tracking
678                //   - crossbeam-epoch deferred garbage (can retain 2x entries)
679                //   - HashMap segments with open addressing (~2x load factor)
680                //
681                // Empirical testing shows ~10x actual RSS vs weighted_size().
682                // Our observed ratio: 362MB RSS / 36MB weighted = 10x
683                //
684                const ARC_STR_OVERHEAD: usize = 16 + 16; // Arc control block + allocator
685                const ENTRY_STRUCT: usize = 16; // ArticleEntry inline size
686                const ARC_VEC_OVERHEAD: usize = 16 + 24 + 32; // Arc + Vec struct + allocator
687                // Moka internal structures - empirically measured to address memory reporting gap.
688                // See moka issue #473: https://github.com/moka-rs/moka/issues/473
689                // Observed ratio: 362MB RSS / 36MB weighted_size() = 10x
690                const MOKA_OVERHEAD: usize = 2000;
691
692                let key_size = ARC_STR_OVERHEAD + key.len();
693                let buffer_size = ARC_VEC_OVERHEAD + entry.buffer().len();
694                let base_size = key_size + buffer_size + ENTRY_STRUCT + MOKA_OVERHEAD;
695
696                // Stubs (availability-only entries) have higher relative overhead
697                // due to allocator fragmentation on small allocations.
698                // Complete articles are dominated by content size, so no multiplier needed.
699                let weighted_size = if entry.is_complete_article() {
700                    base_size
701                } else {
702                    // Small allocations have ~50% more overhead from allocator fragmentation.
703                    // Use a 1.5x multiplier, rounding up, to avoid underestimating small entries.
704                    (base_size * 3).div_ceil(2)
705                };
706
707                weighted_size.try_into().unwrap_or(u32::MAX)
708            })
709            .build();
710
711        Self {
712            cache: Arc::new(cache),
713            hits: Arc::new(AtomicU64::new(0)),
714            misses: Arc::new(AtomicU64::new(0)),
715            capacity: max_capacity,
716            cache_articles,
717            ttl_millis: ttl.as_millis() as u64,
718        }
719    }
720
721    /// Get an article from the cache
722    ///
723    /// Accepts any lifetime MessageId and uses the string content (without brackets) as key.
724    ///
725    /// **Zero-allocation**: `without_brackets()` returns `&str`, which moka accepts directly
726    /// for `Arc<str>` keys via the `Borrow<str>` trait. This avoids allocating a new `Arc<str>`
727    /// for every cache lookup. See `test_arc_str_borrow_lookup` test for verification.
728    ///
729    /// **Tier-aware TTL**: Even if moka hasn't expired the entry yet, we check if the entry
730    /// is expired based on tier-aware TTL. Higher tier entries get longer TTLs.
731    pub async fn get<'a>(&self, message_id: &MessageId<'a>) -> Option<ArticleEntry> {
732        // moka::Cache<Arc<str>, V> supports get(&str) via Borrow<str> trait
733        // This is zero-allocation: no Arc<str> is created for the lookup
734        let result = self.cache.get(message_id.without_brackets()).await;
735
736        match result {
737            Some(entry) if !entry.is_expired(self.ttl_millis) => {
738                self.hits.fetch_add(1, Ordering::Relaxed);
739                Some(entry)
740            }
741            Some(_) => {
742                // Entry exists but expired by tier-aware TTL - invalidate and treat as cache miss
743                // Invalidating immediately frees capacity rather than waiting for LRU eviction,
744                // preventing repeated cache misses on the same stale key
745                self.cache.invalidate(message_id.without_brackets()).await;
746                self.misses.fetch_add(1, Ordering::Relaxed);
747                None
748            }
749            None => {
750                self.misses.fetch_add(1, Ordering::Relaxed);
751                None
752            }
753        }
754    }
755
    /// Upsert cache entry (insert or update) - ATOMIC OPERATION
    ///
    /// Uses moka's `entry().and_upsert_with()` for atomic get-modify-store.
    /// This eliminates the race condition of separate get() + insert() calls
    /// and provides key-level locking for concurrent operations.
    ///
    /// If entry exists: updates the entry and marks backend as having the article
    /// If entry doesn't exist: inserts new entry
    ///
    /// When `cache_articles=false`, extracts status code from buffer and stores minimal stub.
    /// When `cache_articles=true`, stores full buffer.
    ///
    /// The tier is stored with the entry for tier-aware TTL calculation.
    ///
    /// CRITICAL: Always re-insert to refresh TTL, and mark backend as having the article.
    pub async fn upsert<'a>(
        &self,
        message_id: MessageId<'a>,
        buffer: Vec<u8>,
        backend_id: BackendId,
        tier: u8,
    ) {
        // Owned Arc<str> key is required by the entry API (unlike get(), which
        // can use a borrowed &str).
        let key: Arc<str> = message_id.without_brackets().into();
        let cache_articles = self.cache_articles;

        // Prepare the new buffer outside the closure: either the full response,
        // or a minimal status-line stub when article bodies are not cached
        let new_buffer = if cache_articles {
            buffer
        } else {
            self.create_minimal_stub(&buffer)
        };

        // Wrap in Arc so the closure can share the buffer via a cheap Arc::clone
        // instead of deep-copying the Vec. The Arc itself is what gets stored in
        // the entry's `buffer` field.
        let new_buffer = Arc::new(new_buffer);

        // Use atomic upsert - this provides key-level locking and eliminates
        // the race condition between get() and insert() calls
        self.cache
            .entry(key)
            .and_upsert_with(|maybe_entry| {
                let new_entry = if let Some(existing) = maybe_entry {
                    let mut entry = existing.into_value();

                    // Decide whether to update buffer based on completeness
                    let existing_complete = entry.is_complete_article();
                    // NOTE(review): unlike is_complete_article(), this check does
                    // not inspect the status code - presumably so non-220 responses
                    // (e.g. HEAD) can also count as "complete"; confirm before
                    // tightening.
                    let new_complete = new_buffer.len() >= 30 && new_buffer.ends_with(b".\r\n");

                    let should_replace = match (existing_complete, new_complete) {
                        (false, true) => true,  // Stub → Complete: Always replace
                        (true, false) => false, // Complete → Stub: Never replace
                        (true, true) => new_buffer.len() > entry.buffer.len(), // Both complete: larger wins
                        (false, false) => new_buffer.len() > entry.buffer.len(), // Both stubs: larger wins
                    };

                    if should_replace {
                        // Already wrapped in Arc, just clone the Arc (cheap)
                        entry.buffer = Arc::clone(&new_buffer);
                        // Update tier when replacing buffer
                        entry.tier = tier;
                    }

                    // Refresh TTL on every successful upsert, independent of buffer replacement
                    entry.inserted_at = ttl::now_millis();

                    // Mark backend as having the article
                    entry.record_backend_has(backend_id);
                    entry
                } else {
                    // New entry with tier
                    let mut entry = ArticleEntry {
                        backend_availability: ArticleAvailability::new(),
                        buffer: Arc::clone(&new_buffer),
                        tier,
                        inserted_at: ttl::now_millis(),
                    };
                    entry.record_backend_has(backend_id);
                    entry
                };

                std::future::ready(new_entry)
            })
            .await;
    }
840
841    /// Create minimal stub from response buffer
842    ///
843    /// Extracts the status code from the first line and creates a minimal stub.
844    /// Falls back to "200\r\n" if parsing fails.
845    fn create_minimal_stub(&self, buffer: &[u8]) -> Vec<u8> {
846        // Find first line (status code line)
847        if let Some(end) = buffer.iter().position(|&b| b == b'\n') {
848            // Extract status code (first 3 digits)
849            if end >= 3 {
850                let code = &buffer[..3];
851                // Verify it's actually digits
852                if code.iter().all(|&b| b.is_ascii_digit()) {
853                    return format!("{}\r\n", String::from_utf8_lossy(code)).into_bytes();
854                }
855            }
856        }
857        // Fallback if we can't parse
858        b"200\r\n".to_vec()
859    }
860
861    /// Record that a backend returned 430 for this article - ATOMIC OPERATION
862    ///
863    /// Uses moka's `entry().and_upsert_with()` for atomic get-modify-store.
864    /// This eliminates the race condition of separate get() + insert() calls
865    /// and provides key-level locking for concurrent operations.
866    ///
867    /// If the article is already cached, updates the availability bitset.
868    /// If not cached, creates a new cache entry with a 430 stub.
869    /// This prevents repeated queries to backends that don't have the article.
870    ///
871    /// Note: We don't store the actual backend 430 response because:
872    /// 1. We always send a standardized 430 to clients, never the backend's response
873    /// 2. The only info we need is the availability bitset (which backends returned 430)
874    pub async fn record_backend_missing<'a>(
875        &self,
876        message_id: MessageId<'a>,
877        backend_id: BackendId,
878    ) {
879        let key: Arc<str> = message_id.without_brackets().into();
880        let misses = &self.misses;
881
882        // Use atomic upsert - this provides key-level locking
883        let entry = self
884            .cache
885            .entry(key)
886            .and_upsert_with(|maybe_entry| {
887                let new_entry = if let Some(existing) = maybe_entry {
888                    let mut entry = existing.into_value();
889                    entry.record_backend_missing(backend_id);
890                    entry
891                } else {
892                    // First 430 for this article - create cache entry with minimal stub
893                    let mut entry = ArticleEntry::new(b"430\r\n".to_vec());
894                    entry.record_backend_missing(backend_id);
895                    entry
896                };
897
898                std::future::ready(new_entry)
899            })
900            .await;
901
902        // Track misses for new entries
903        if entry.is_fresh() {
904            misses.fetch_add(1, Ordering::Relaxed);
905        }
906    }
907
908    /// Sync availability state from local tracker to cache - ATOMIC OPERATION
909    ///
910    /// Uses moka's `entry().and_compute_with()` for atomic get-modify-store.
911    /// This eliminates the race condition of separate get() + insert() calls
912    /// and provides key-level locking for concurrent operations.
913    ///
914    /// This is called ONCE at the end of a retry loop to persist all the
915    /// backends that returned 430 during this request. Much more efficient
916    /// than calling record_backend_missing for each backend individually.
917    ///
918    /// IMPORTANT: Only creates a 430 stub entry if ALL checked backends returned 430.
919    /// If any backend successfully provided the article, we skip creating an entry
920    /// (the actual article will be cached via upsert, which may race with this call).
921    pub async fn sync_availability<'a>(
922        &self,
923        message_id: MessageId<'a>,
924        availability: &ArticleAvailability,
925    ) {
926        use moka::ops::compute::Op;
927
928        // Only sync if we actually tried some backends
929        if availability.checked_bits() == 0 {
930            return;
931        }
932
933        let key: Arc<str> = message_id.without_brackets().into();
934        let availability = *availability; // Copy for the closure
935        let misses = &self.misses;
936
937        // Use atomic compute - allows us to conditionally insert/update
938        let result = self
939            .cache
940            .entry(key)
941            .and_compute_with(|maybe_entry| {
942                let op = if let Some(existing) = maybe_entry {
943                    // Merge availability into existing entry
944                    let mut entry = existing.into_value();
945                    entry.backend_availability.merge_from(&availability);
946                    Op::Put(entry)
947                } else {
948                    // No existing entry - only create a 430 stub if ALL backends returned 430
949                    if availability.any_backend_has_article() {
950                        // A backend successfully provided the article.
951                        // Don't create a 430 stub - let upsert() handle it with the real article data.
952                        Op::Nop
953                    } else {
954                        // All checked backends returned 430 - create stub to track this
955                        let mut entry = ArticleEntry::new(b"430\r\n".to_vec());
956                        entry.backend_availability = availability;
957                        Op::Put(entry)
958                    }
959                };
960
961                std::future::ready(op)
962            })
963            .await;
964
965        // Track misses for new entries
966        if matches!(result, moka::ops::compute::CompResult::Inserted(_)) {
967            misses.fetch_add(1, Ordering::Relaxed);
968        }
969    }
970
971    /// Get cache statistics
972    pub async fn stats(&self) -> CacheStats {
973        CacheStats {
974            entry_count: self.cache.entry_count(),
975            weighted_size: self.cache.weighted_size(),
976        }
977    }
978
979    /// Insert an article entry directly (for testing)
980    ///
981    /// This is a low-level method that bypasses the usual upsert logic.
982    /// Only use this in tests where you need precise control over cache state.
983    #[cfg(test)]
984    pub async fn insert<'a>(&self, message_id: MessageId<'a>, entry: ArticleEntry) {
985        let key: Arc<str> = message_id.without_brackets().into();
986        self.cache.insert(key, entry).await;
987    }
988
    /// Get maximum cache capacity
    ///
    /// Returns the `max_capacity` value stored at construction time.
    /// NOTE(review): tests treat this as a byte budget — confirm against the
    /// constructor's weigher configuration.
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.capacity
    }
994
    /// Get current number of cached entries (synchronous)
    ///
    /// Delegates to moka's `entry_count()`; the value may lag until pending
    /// maintenance tasks run (the tests call `sync()` before asserting on it).
    #[inline]
    pub fn entry_count(&self) -> u64 {
        self.cache.entry_count()
    }
1000
    /// Get current weighted size in bytes (synchronous)
    ///
    /// Delegates to moka's `weighted_size()`; like `entry_count`, the value
    /// may lag until pending maintenance tasks run.
    #[inline]
    pub fn weighted_size(&self) -> u64 {
        self.cache.weighted_size()
    }
1006
1007    /// Get cache hit rate as percentage (0.0 to 100.0)
1008    #[inline]
1009    pub fn hit_rate(&self) -> f64 {
1010        let hits = self.hits.load(Ordering::Relaxed);
1011        let misses = self.misses.load(Ordering::Relaxed);
1012        let total = hits + misses;
1013
1014        if total == 0 {
1015            0.0
1016        } else {
1017            (hits as f64 / total as f64) * 100.0
1018        }
1019    }
1020
    /// Run pending background tasks (for testing)
    ///
    /// Moka performs maintenance tasks (eviction, expiration) asynchronously.
    /// Awaiting `run_pending_tasks()` drains them so tests observe a settled,
    /// deterministic cache state before asserting on counts or contents.
    pub async fn sync(&self) {
        self.cache.run_pending_tasks().await;
    }
1028}
1029
/// Cache statistics
///
/// Point-in-time snapshot produced by `stats()`; both values come straight
/// from moka and may lag until pending maintenance tasks have run.
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Number of entries currently held by the cache
    pub entry_count: u64,
    /// Total weighted size of all entries (per the cache's weigher;
    /// NOTE(review): presumably bytes — confirm against the constructor)
    pub weighted_size: u64,
}
1036
1037#[cfg(test)]
1038mod tests {
1039    use super::*;
1040    use crate::types::MessageId;
1041    use std::time::Duration;
1042
1043    fn create_test_article(msgid: &str) -> ArticleEntry {
1044        let buffer = format!("220 0 {}\r\nSubject: Test\r\n\r\nBody\r\n.\r\n", msgid).into_bytes();
1045        ArticleEntry::new(buffer)
1046    }
1047
1048    #[test]
1049    fn test_backend_availability_basic() {
1050        let mut avail = ArticleAvailability::new();
1051        let b0 = BackendId::from_index(0);
1052        let b1 = BackendId::from_index(1);
1053
1054        // Default: assume all backends have it
1055        assert!(avail.should_try(b0));
1056        assert!(avail.should_try(b1));
1057
1058        // Record b0 as missing (returned 430)
1059        avail.record_missing(b0);
1060        assert!(!avail.should_try(b0)); // Should not try again
1061        assert!(avail.should_try(b1)); // Still should try
1062
1063        // Record b1 as missing too
1064        avail.record_missing(b1);
1065        assert!(!avail.should_try(b1));
1066    }
1067
1068    #[test]
1069    fn test_any_backend_has_article() {
1070        let mut avail = ArticleAvailability::new();
1071        let b0 = BackendId::from_index(0);
1072        let b1 = BackendId::from_index(1);
1073
1074        // Empty availability - no backend has it (nothing checked)
1075        assert!(!avail.any_backend_has_article());
1076
1077        // Record b0 as missing - still none have it
1078        avail.record_missing(b0);
1079        assert!(!avail.any_backend_has_article());
1080
1081        // Record b1 as having it - now one has it
1082        avail.record_has(b1);
1083        assert!(avail.any_backend_has_article());
1084
1085        // Record b1 as missing (overwrite) - none have it again
1086        avail.record_missing(b1);
1087        assert!(!avail.any_backend_has_article());
1088    }
1089
1090    #[test]
1091    fn test_record_has_clears_missing_bit() {
1092        let mut avail = ArticleAvailability::new();
1093        let b0 = BackendId::from_index(0);
1094
1095        // First mark as missing
1096        avail.record_missing(b0);
1097        assert!(avail.is_missing(b0));
1098        assert!(!avail.any_backend_has_article());
1099
1100        // Now mark as has - should clear missing
1101        avail.record_has(b0);
1102        assert!(!avail.is_missing(b0));
1103        assert!(avail.any_backend_has_article());
1104    }
1105
1106    #[test]
1107    fn test_merge_from_430_overrides_has() {
1108        // NNTP SEMANTICS:
1109        // - 430 "No Such Article" is AUTHORITATIVE - servers NEVER give false negatives
1110        // - 2xx success is UNRELIABLE - servers CAN give false positives
1111        // Therefore: 430 always wins over previous "has" state
1112        let mut cache_entry = ArticleAvailability::new();
1113        let b0 = BackendId::from_index(0);
1114        let b1 = BackendId::from_index(1);
1115
1116        // Previous request got success from backend 0 (might be false positive)
1117        cache_entry.record_has(b0);
1118        assert!(!cache_entry.is_missing(b0));
1119        assert!(cache_entry.any_backend_has_article());
1120
1121        // Later request gets 430 from backend 0 (AUTHORITATIVE)
1122        let mut new_result = ArticleAvailability::new();
1123        new_result.record_missing(b0); // 430 is definitive
1124        new_result.record_missing(b1); // Backend 1 also returned 430
1125
1126        // Merge new results into cache entry
1127        cache_entry.merge_from(&new_result);
1128
1129        // CRITICAL: 430 is authoritative, so it MUST override the previous "has"
1130        // The earlier success might have been a false positive (corrupt data, etc.)
1131        assert!(
1132            cache_entry.is_missing(b0),
1133            "430 must override previous has - 430 is authoritative"
1134        );
1135        assert!(!cache_entry.any_backend_has_article());
1136
1137        // Backend 1 should also be marked as missing
1138        assert!(
1139            cache_entry.is_missing(b1),
1140            "Backend 1 should be marked missing"
1141        );
1142    }
1143
1144    #[test]
1145    fn test_merge_from_can_add_new_missing() {
1146        let mut avail1 = ArticleAvailability::new();
1147        let mut avail2 = ArticleAvailability::new();
1148
1149        // avail1 knows backend 0 is missing
1150        avail1.record_missing(BackendId::from_index(0));
1151
1152        // avail2 knows backend 1 is missing
1153        avail2.record_missing(BackendId::from_index(1));
1154
1155        // Merge avail2 into avail1
1156        avail1.merge_from(&avail2);
1157
1158        // avail1 should now know both are missing
1159        assert!(avail1.is_missing(BackendId::from_index(0)));
1160        assert!(avail1.is_missing(BackendId::from_index(1)));
1161    }
1162
1163    #[test]
1164    fn test_merge_from_has_does_not_clear_missing() {
1165        // NNTP SEMANTICS:
1166        // - 430 is AUTHORITATIVE - if server says "no", article is NOT there
1167        // - 2xx is UNRELIABLE - server might lie (corrupt data, propagation issues)
1168        // Therefore: "has" from new fetch should NOT clear existing 430
1169        let mut cache_state = ArticleAvailability::new();
1170        let b0 = BackendId::from_index(0);
1171        let b1 = BackendId::from_index(1);
1172
1173        // Cache says both backends returned 430 (authoritative)
1174        cache_state.record_missing(b0);
1175        cache_state.record_missing(b1);
1176        assert!(cache_state.is_missing(b0));
1177        assert!(cache_state.is_missing(b1));
1178        assert!(!cache_state.any_backend_has_article());
1179
1180        // New request claims success from backend 0 (but 2xx is unreliable!)
1181        let mut new_fetch = ArticleAvailability::new();
1182        new_fetch.record_has(b0);
1183
1184        // Merge new fetch into cache
1185        cache_state.merge_from(&new_fetch);
1186
1187        // Backend 0 should STILL be missing - we trust the 430 over the 2xx
1188        // because 2xx can be false positive but 430 is never false negative
1189        assert!(
1190            cache_state.is_missing(b0),
1191            "2xx should NOT override 430 - 430 is authoritative"
1192        );
1193        assert!(!cache_state.any_backend_has_article());
1194
1195        // Backend 1 is still missing
1196        assert!(
1197            cache_state.is_missing(b1),
1198            "Backend 1 should still be missing"
1199        );
1200    }
1201
1202    #[test]
1203    fn test_backend_availability_all_exhausted() {
1204        use crate::router::BackendCount;
1205        let mut avail = ArticleAvailability::new();
1206
1207        // None missing yet
1208        assert!(!avail.all_exhausted(BackendCount::new(2)));
1209        assert!(!avail.all_exhausted(BackendCount::new(3)));
1210
1211        // Record backends 0 and 1 as missing
1212        avail.record_missing(BackendId::from_index(0));
1213        avail.record_missing(BackendId::from_index(1));
1214
1215        // All 2 backends exhausted
1216        assert!(avail.all_exhausted(BackendCount::new(2)));
1217
1218        // But not all 3 backends (backend 2 still untried)
1219        assert!(!avail.all_exhausted(BackendCount::new(3)));
1220    }
1221
1222    #[test]
1223    fn test_article_entry_basic() {
1224        let buffer = b"220 0 <test@example.com>\r\nBody\r\n.\r\n".to_vec();
1225        let entry = ArticleEntry::new(buffer.clone());
1226
1227        assert_eq!(entry.status_code(), Some(StatusCode::new(220)));
1228        assert_eq!(entry.buffer().as_ref(), &buffer);
1229
1230        // Default: should try all backends
1231        assert!(entry.should_try_backend(BackendId::from_index(0)));
1232        assert!(entry.should_try_backend(BackendId::from_index(1)));
1233    }
1234
1235    #[test]
1236    fn test_is_complete_article() {
1237        // Stubs should NOT be complete articles
1238        let stub_430 = ArticleEntry::new(b"430\r\n".to_vec());
1239        assert!(!stub_430.is_complete_article());
1240
1241        let stub_220 = ArticleEntry::new(b"220\r\n".to_vec());
1242        assert!(!stub_220.is_complete_article());
1243
1244        let stub_223 = ArticleEntry::new(b"223\r\n".to_vec());
1245        assert!(!stub_223.is_complete_article());
1246
1247        // Full article SHOULD be complete
1248        let full = ArticleEntry::new(
1249            b"220 0 <test@example.com>\r\nSubject: Test\r\n\r\nBody\r\n.\r\n".to_vec(),
1250        );
1251        assert!(full.is_complete_article());
1252
1253        // Wrong status code (not 220) should NOT be complete article
1254        let head_response =
1255            ArticleEntry::new(b"221 0 <test@example.com>\r\nSubject: Test\r\n.\r\n".to_vec());
1256        assert!(!head_response.is_complete_article());
1257
1258        // Missing terminator should NOT be complete
1259        let no_terminator = ArticleEntry::new(
1260            b"220 0 <test@example.com>\r\nSubject: Test\r\n\r\nBody\r\n".to_vec(),
1261        );
1262        assert!(!no_terminator.is_complete_article());
1263    }
1264
1265    #[test]
1266    fn test_article_entry_record_backend_missing() {
1267        let backend0 = BackendId::from_index(0);
1268        let backend1 = BackendId::from_index(1);
1269        let mut entry = create_test_article("<test@example.com>");
1270
1271        // Initially should try both
1272        assert!(entry.should_try_backend(backend0));
1273        assert!(entry.should_try_backend(backend1));
1274
1275        // Record backend1 as missing (430 response)
1276        entry.record_backend_missing(backend1);
1277
1278        // Should still try backend0, but not backend1
1279        assert!(entry.should_try_backend(backend0));
1280        assert!(!entry.should_try_backend(backend1));
1281    }
1282
1283    #[test]
1284    fn test_article_entry_all_backends_exhausted() {
1285        use crate::router::BackendCount;
1286        let backend0 = BackendId::from_index(0);
1287        let backend1 = BackendId::from_index(1);
1288        let mut entry = create_test_article("<test@example.com>");
1289
1290        // Not all exhausted yet
1291        assert!(!entry.all_backends_exhausted(BackendCount::new(2)));
1292
1293        // Record both as missing
1294        entry.record_backend_missing(backend0);
1295        entry.record_backend_missing(backend1);
1296
1297        // Now all 2 backends are exhausted
1298        assert!(entry.all_backends_exhausted(BackendCount::new(2)));
1299    }
1300
1301    #[tokio::test]
1302    async fn test_arc_str_borrow_lookup() {
1303        // Create cache with Arc<str> keys
1304        let cache = ArticleCache::new(100, Duration::from_secs(300), true);
1305
1306        // Create a MessageId and insert an article
1307        let msgid = MessageId::from_borrowed("<test123@example.com>").unwrap();
1308        let article = create_test_article("<test123@example.com>");
1309
1310        cache.insert(msgid.clone(), article.clone()).await;
1311
1312        // Verify we can retrieve using a different MessageId instance (borrowed)
1313        // This demonstrates that Arc<str> supports Borrow<str> lookups via &str
1314        let msgid2 = MessageId::from_borrowed("<test123@example.com>").unwrap();
1315        let retrieved = cache.get(&msgid2).await;
1316
1317        assert!(
1318            retrieved.is_some(),
1319            "Arc<str> cache should support Borrow<str> lookups"
1320        );
1321        assert_eq!(
1322            retrieved.unwrap().buffer().as_ref(),
1323            article.buffer().as_ref(),
1324            "Retrieved article should match inserted article"
1325        );
1326    }
1327
1328    #[tokio::test]
1329    async fn test_cache_hit_miss() {
1330        let cache = ArticleCache::new(100, Duration::from_secs(300), true);
1331
1332        let msgid = MessageId::from_borrowed("<nonexistent@example.com>").unwrap();
1333        let result = cache.get(&msgid).await;
1334
1335        assert!(
1336            result.is_none(),
1337            "Cache lookup for non-existent key should return None"
1338        );
1339    }
1340
1341    #[tokio::test]
1342    async fn test_cache_insert_and_retrieve() {
1343        let cache = ArticleCache::new(10, Duration::from_secs(300), true);
1344
1345        let msgid = MessageId::from_borrowed("<article@example.com>").unwrap();
1346        let article = create_test_article("<article@example.com>");
1347
1348        cache.insert(msgid.clone(), article.clone()).await;
1349
1350        let retrieved = cache.get(&msgid).await.unwrap();
1351        assert_eq!(retrieved.buffer().as_ref(), article.buffer().as_ref());
1352    }
1353
1354    #[tokio::test]
1355    async fn test_cache_upsert_new_entry() {
1356        let cache = ArticleCache::new(100, Duration::from_secs(300), true);
1357
1358        let msgid = MessageId::from_borrowed("<test@example.com>").unwrap();
1359        let buffer = b"220 0 <test@example.com>\r\nBody\r\n.\r\n".to_vec();
1360
1361        cache
1362            .upsert(msgid.clone(), buffer.clone(), BackendId::from_index(0), 0)
1363            .await;
1364
1365        let retrieved = cache.get(&msgid).await.unwrap();
1366        assert_eq!(retrieved.buffer().as_ref(), &buffer);
1367        // Default: should try all backends
1368        assert!(retrieved.should_try_backend(BackendId::from_index(0)));
1369        assert!(retrieved.should_try_backend(BackendId::from_index(1)));
1370    }
1371
    #[tokio::test]
    async fn test_cache_upsert_existing_entry() {
        let cache = ArticleCache::new(100, Duration::from_secs(300), true);

        let msgid = MessageId::from_borrowed("<test@example.com>").unwrap();
        let buffer = b"220 0 <test@example.com>\r\nBody\r\n.\r\n".to_vec();

        // Insert with backend 0
        cache
            .upsert(msgid.clone(), buffer.clone(), BackendId::from_index(0), 0)
            .await;

        // Upsert again from backend 1: the identical buffer is NOT replaced
        // (same size), but the entry's TTL is refreshed and backend 1 is
        // recorded as having the article (see upsert()).
        cache
            .upsert(msgid.clone(), buffer.clone(), BackendId::from_index(1), 0)
            .await;

        let retrieved = cache.get(&msgid).await.unwrap();
        // No 430 was ever recorded, so both backends remain candidates
        assert!(retrieved.should_try_backend(BackendId::from_index(0)));
        assert!(retrieved.should_try_backend(BackendId::from_index(1)));
    }
1394
1395    #[tokio::test]
1396    async fn test_record_backend_missing() {
1397        let cache = ArticleCache::new(100, Duration::from_secs(300), true);
1398
1399        let msgid = MessageId::from_borrowed("<test@example.com>").unwrap();
1400        let article = create_test_article("<test@example.com>");
1401
1402        cache.insert(msgid.clone(), article).await;
1403
1404        // Record backend 1 as missing
1405        cache
1406            .record_backend_missing(msgid.clone(), BackendId::from_index(1))
1407            .await;
1408
1409        let retrieved = cache.get(&msgid).await.unwrap();
1410        // Backend 0 should still be tried, backend 1 should not
1411        assert!(retrieved.should_try_backend(BackendId::from_index(0)));
1412        assert!(!retrieved.should_try_backend(BackendId::from_index(1)));
1413    }
1414
1415    /// CRITICAL BUG FIX TEST: record_backend_missing must create cache entries
1416    /// for articles that don't exist anywhere (all backends return 430).
1417    ///
1418    /// Bug: Previously, if an article wasn't cached, record_backend_missing
1419    /// would silently do nothing. This caused repeated queries to all backends
1420    /// for missing articles, resulting in:
1421    /// - Massive bandwidth waste
1422    /// - SABnzbd reporting "gigabytes of missing articles"
1423    /// - 4xx/5xx error counts not increasing (metrics bug)
1424    #[tokio::test]
1425    async fn test_record_backend_missing_creates_new_entry() {
1426        let cache = ArticleCache::new(100, Duration::from_secs(300), true);
1427
1428        let msgid = MessageId::from_borrowed("<missing@example.com>").unwrap();
1429
1430        // Verify article is NOT in cache
1431        assert!(cache.get(&msgid).await.is_none());
1432
1433        // Record backend 0 returned 430
1434        cache
1435            .record_backend_missing(msgid.clone(), BackendId::from_index(0))
1436            .await;
1437
1438        // CRITICAL: Cache entry MUST now exist
1439        let entry = cache
1440            .get(&msgid)
1441            .await
1442            .expect("Cache entry must exist after record_backend_missing");
1443
1444        // Verify backend 0 is marked as missing
1445        assert!(
1446            !entry.should_try_backend(BackendId::from_index(0)),
1447            "Backend 0 should be marked missing"
1448        );
1449
1450        // Verify backend 1 is still available (not tried yet)
1451        assert!(
1452            entry.should_try_backend(BackendId::from_index(1)),
1453            "Backend 1 should still be available"
1454        );
1455
1456        // Verify the cached response is a 430 stub
1457        assert_eq!(
1458            entry.buffer().as_ref(),
1459            b"430\r\n",
1460            "Cached buffer should be a 430 stub"
1461        );
1462
1463        // Record backend 1 also returned 430
1464        cache
1465            .record_backend_missing(msgid.clone(), BackendId::from_index(1))
1466            .await;
1467
1468        let entry = cache.get(&msgid).await.unwrap();
1469
1470        // Now both backends should be marked missing
1471        assert!(!entry.should_try_backend(BackendId::from_index(0)));
1472        assert!(!entry.should_try_backend(BackendId::from_index(1)));
1473
1474        // Verify all backends exhausted
1475        use crate::router::BackendCount;
1476        assert!(
1477            entry.all_backends_exhausted(BackendCount::new(2)),
1478            "All backends should be exhausted"
1479        );
1480    }
1481
1482    #[tokio::test]
1483    async fn test_cache_stats() {
1484        let cache = ArticleCache::new(1024 * 1024, Duration::from_secs(300), true); // 1MB
1485
1486        // Initial stats
1487        let stats = cache.stats().await;
1488        assert_eq!(stats.entry_count, 0);
1489
1490        // Insert one article
1491        let msgid = MessageId::from_borrowed("<test@example.com>").unwrap();
1492        let article = create_test_article("<test@example.com>");
1493        cache.insert(msgid, article).await;
1494
1495        // Wait for background tasks
1496        cache.sync().await;
1497
1498        // Check stats again
1499        let stats = cache.stats().await;
1500        assert_eq!(stats.entry_count, 1);
1501    }
1502
1503    #[tokio::test]
1504    async fn test_cache_ttl_expiration() {
1505        let cache = ArticleCache::new(1024 * 1024, Duration::from_millis(50), true); // 1MB
1506
1507        let msgid = MessageId::from_borrowed("<expire@example.com>").unwrap();
1508        let article = create_test_article("<expire@example.com>");
1509
1510        cache.insert(msgid.clone(), article).await;
1511
1512        // Should be cached immediately
1513        assert!(cache.get(&msgid).await.is_some());
1514
1515        // Wait for TTL expiration + sync
1516        tokio::time::sleep(Duration::from_millis(100)).await;
1517        cache.sync().await;
1518
1519        // Should be expired
1520        assert!(cache.get(&msgid).await.is_none());
1521    }
1522
1523    #[tokio::test]
1524    async fn test_insert_respects_cache_articles_flag() {
1525        // Test with cache_articles=false - should store minimal stub
1526        let cache_stub = ArticleCache::new(1024 * 1024, Duration::from_secs(300), false);
1527
1528        let msgid = MessageId::from_borrowed("<test@example.com>").unwrap();
1529        let buffer = b"220 0 <test@example.com>\r\nSubject: Test\r\n\r\nBody\r\n.\r\n".to_vec();
1530        let full_size = buffer.len();
1531
1532        cache_stub
1533            .upsert(msgid.clone(), buffer, BackendId::from_index(0), 0)
1534            .await;
1535        cache_stub.sync().await;
1536
1537        let retrieved = cache_stub.get(&msgid).await.unwrap();
1538        let stub_size = retrieved.buffer().len();
1539
1540        // Stub should be much smaller than full article (just "220\r\n")
1541        assert!(
1542            stub_size < 10,
1543            "Stub size {} should be < 10 bytes",
1544            stub_size
1545        );
1546        assert!(
1547            stub_size < full_size,
1548            "Stub {} should be smaller than full {}",
1549            stub_size,
1550            full_size
1551        );
1552
1553        // Test with cache_articles=true - should store full article
1554        let cache_full = ArticleCache::new(1024 * 1024, Duration::from_secs(300), true);
1555
1556        let msgid2 = MessageId::from_borrowed("<test2@example.com>").unwrap();
1557        let buffer2 = b"220 0 <test2@example.com>\r\nSubject: Test2\r\n\r\nBody2\r\n.\r\n".to_vec();
1558        let original_size = buffer2.len();
1559
1560        cache_full
1561            .upsert(msgid2.clone(), buffer2, BackendId::from_index(0), 0)
1562            .await;
1563        cache_full.sync().await;
1564
1565        let retrieved2 = cache_full.get(&msgid2).await.unwrap();
1566
1567        // Should store full article
1568        assert_eq!(retrieved2.buffer().len(), original_size);
1569    }
1570
1571    #[tokio::test]
1572    async fn test_cache_capacity_limit() {
1573        let cache = ArticleCache::new(500, Duration::from_secs(300), true); // 500 bytes total
1574
1575        // Insert 3 articles (exceeds capacity)
1576        for i in 1..=3 {
1577            let msgid_str = format!("<article{}@example.com>", i);
1578            let msgid = MessageId::new(msgid_str).unwrap();
1579            let article = create_test_article(msgid.as_ref());
1580            cache.insert(msgid, article).await;
1581            cache.sync().await; // Force eviction
1582        }
1583
1584        // Wait for eviction to complete
1585        tokio::time::sleep(Duration::from_millis(10)).await;
1586        cache.sync().await;
1587
1588        let stats = cache.stats().await;
1589        assert!(
1590            stats.entry_count <= 3,
1591            "Cache should have at most 3 entries with 500 byte capacity"
1592        );
1593    }
1594
1595    #[tokio::test]
1596    async fn test_article_entry_clone() {
1597        let article = create_test_article("<test@example.com>");
1598
1599        let cloned = article.clone();
1600        assert_eq!(article.buffer().as_ref(), cloned.buffer().as_ref());
1601        assert!(Arc::ptr_eq(article.buffer(), cloned.buffer()));
1602    }
1603
1604    #[tokio::test]
1605    async fn test_cache_clone() {
1606        let cache1 = ArticleCache::new(1024 * 1024, Duration::from_secs(300), true); // 1MB
1607        let cache2 = cache1.clone();
1608
1609        let msgid = MessageId::from_borrowed("<test@example.com>").unwrap();
1610        let article = create_test_article("<test@example.com>");
1611
1612        cache1.insert(msgid.clone(), article).await;
1613        cache1.sync().await;
1614
1615        // Should be accessible from cloned cache
1616        assert!(cache2.get(&msgid).await.is_some());
1617    }
1618
1619    #[tokio::test]
1620    async fn test_weigher_large_articles() {
1621        // Test that large article bodies use ACTUAL SIZE (no multiplier)
1622        // when cache_articles=true and buffer >10KB
1623        let cache = ArticleCache::new(10 * 1024 * 1024, Duration::from_secs(300), true); // 10MB capacity
1624
1625        // Create a 750KB article (typical size)
1626        let body = vec![b'X'; 750_000];
1627        let response = format!(
1628            "222 0 <test@example.com>\r\n{}\r\n.\r\n",
1629            std::str::from_utf8(&body).unwrap()
1630        );
1631        let article = ArticleEntry::new(response.as_bytes().to_vec());
1632
1633        let msgid = MessageId::from_borrowed("<test@example.com>").unwrap();
1634        cache.insert(msgid.clone(), article).await;
1635        cache.sync().await;
1636
1637        // With actual size (no multiplier): 750KB per entry
1638        // 10MB capacity should fit ~13 articles
1639        // With old 1.8x multiplier: 750KB * 1.8 ≈ 1.35MB per entry, fits ~7 articles
1640        // With old 2.5x multiplier: 750KB * 2.5 ≈ 1.875MB per entry, fits ~5 articles
1641
1642        // Insert 12 more articles (13 total)
1643        for i in 2..=13 {
1644            let msgid_str = format!("<article{}@example.com>", i);
1645            let msgid = MessageId::new(msgid_str).unwrap();
1646            let response = format!(
1647                "222 0 {}\r\n{}\r\n.\r\n",
1648                msgid.as_str(),
1649                std::str::from_utf8(&body).unwrap()
1650            );
1651            let article = ArticleEntry::new(response.as_bytes().to_vec());
1652            cache.insert(msgid, article).await;
1653            cache.sync().await;
1654        }
1655
1656        tokio::time::sleep(Duration::from_millis(50)).await;
1657        cache.sync().await;
1658
1659        let stats = cache.stats().await;
1660        // With actual size (no multiplier), should fit 11-13 large articles
1661        assert!(
1662            stats.entry_count >= 11,
1663            "Cache should fit at least 11 large articles with actual size (no multiplier) (got {})",
1664            stats.entry_count
1665        );
1666    }
1667
1668    #[tokio::test]
1669    async fn test_weigher_small_stubs() {
1670        // Test that small stubs account for moka internal overhead correctly
1671        // With MOKA_OVERHEAD = 2000 bytes (based on empirical 10x memory ratio from moka issue #473)
1672        let cache = ArticleCache::new(1_000_000, Duration::from_secs(300), false); // 1MB capacity
1673
1674        // Create small stub (53 bytes)
1675        let stub = b"223 0 <test@example.com>\r\n".to_vec();
1676        let article = ArticleEntry::new(stub);
1677
1678        let msgid = MessageId::from_borrowed("<test@example.com>").unwrap();
1679        cache.insert(msgid, article).await;
1680        cache.sync().await;
1681
1682        // With MOKA_OVERHEAD = 2000: stub + Arc + availability + overhead
1683        // ~53 + 68 + 40 + 2000 = ~2161 bytes per small stub
1684        // With 2.5x small stub multiplier: ~5400 bytes per stub
1685        // 1MB capacity should fit ~185 stubs
1686
1687        // Insert many small stubs
1688        for i in 2..=200 {
1689            let msgid_str = format!("<stub{}@example.com>", i);
1690            let msgid = MessageId::new(msgid_str).unwrap();
1691            let stub = format!("223 0 {}\r\n", msgid.as_str());
1692            let article = ArticleEntry::new(stub.as_bytes().to_vec());
1693            cache.insert(msgid, article).await;
1694        }
1695
1696        cache.sync().await;
1697        tokio::time::sleep(Duration::from_millis(50)).await;
1698        cache.sync().await;
1699
1700        let stats = cache.stats().await;
1701        // Should be able to fit ~150-185 small stubs in 1MB
1702        assert!(
1703            stats.entry_count >= 100,
1704            "Cache should fit many small stubs (got {})",
1705            stats.entry_count
1706        );
1707    }
1708
1709    #[tokio::test]
1710    async fn test_cache_with_owned_message_id() {
1711        let cache = ArticleCache::new(1024 * 1024, Duration::from_secs(300), true); // 1MB
1712
1713        // Use owned MessageId
1714        let msgid = MessageId::new("<owned@example.com>".to_string()).unwrap();
1715        let article = create_test_article("<owned@example.com>");
1716
1717        cache.insert(msgid.clone(), article).await;
1718
1719        // Retrieve with borrowed MessageId
1720        let borrowed_msgid = MessageId::from_borrowed("<owned@example.com>").unwrap();
1721        assert!(cache.get(&borrowed_msgid).await.is_some());
1722    }
1723}