sagittarius 0.2.0

A fast, self-hosted DNS sinkhole in a single Rust binary
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
//! Raw-bytes DNS response cache with per-entry TTL expiry.
//!
//! Implements the raw-bytes cache described in SPEC §8: each cached entry stores
//! the upstream response exactly as received (`bytes::Bytes`) together with the
//! byte offsets of every real (non-OPT) RR TTL field, recorded by
//! [`codec::ttl::TtlScan`].
//!
//! On a cache hit, [`DnsCache::get`] synthesises a fresh response by copying the
//! cached bytes and patching two things in-place — no parse, no re-serialisation:
//!
//! 1. **Transaction ID** (bytes 0–1): overwritten with the client's query ID.
//! 2. **TTL fields**: the TTL stored at each offset recorded by [`TtlScan`] is
//!    decremented by the time elapsed since the entry was stored, saturating at 0.
//!
//! # Design notes
//!
//! - The cache is a **mechanism** only.  It does not classify responses as
//!   positive or negative, and it does not decide whether a response should be
//!   cached.  All policy (which TTL to use, whether to store a `NXDOMAIN`, etc.)
//!   lives in the caller (E6.3).
//! - [`DnsCache`] wraps a [`moka::future::Cache`] which is internally concurrent
//!   and cheaply cloneable.  Shared via [`std::sync::Arc`] in E4.4.
//! - Expiry is driven by a real (`quanta`) clock, not Tokio's virtual clock, so
//!   the one expiry test in this module performs a real ~1.1 s sleep (via
//!   `tokio::time::sleep` under a plain `#[tokio::test]`).

use std::{
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    },
    time::{Duration, Instant},
};

use bytes::{BufMut, Bytes, BytesMut};
use moka::future::Cache;

use crate::codec::message::Question;

// ── CachedResponse ────────────────────────────────────────────────────────────

/// A single entry stored in the [`DnsCache`].
///
/// Bundles the raw upstream response with the metadata needed to serve it
/// again later: where its TTL fields live, when it was stored, and how long it
/// may stay cached.
///
/// Cheap to clone: `bytes` and `ttl_offsets` are reference-counted, and the
/// remaining fields are plain `Copy` scalars.
#[derive(Clone, Debug)]
struct CachedResponse {
    /// The full upstream response, as received on the wire.
    bytes: Bytes,

    /// Absolute byte offsets of each real (non-OPT) TTL field in `bytes`.
    ///
    /// Stored as an `Arc<[usize]>` so cloning the entry is always O(1)
    /// regardless of how many RRs are present.
    ttl_offsets: Arc<[usize]>,

    /// Monotonic [`Instant`] captured when this entry was stored, used to
    /// compute the elapsed time when serving the entry.
    stored_at: Instant,

    /// The clamped per-entry lifetime.  Read by the [`moka::Expiry`] impl.
    expiry: Duration,
}

// ── Expiry impl ───────────────────────────────────────────────────────────────

/// Per-entry expiry policy for [`moka`].
///
/// A stateless marker type: every entry's lifetime is the pre-computed,
/// clamped `expiry` carried by the [`CachedResponse`] itself.
///
/// - `expire_after_create` applies it when a key is first inserted.
/// - `expire_after_update` applies it again when a live key is replaced, so
///   the replacement response lives for *its own* TTL.  Without this override
///   `moka` would keep the previous entry's remaining lifetime, and a
///   short-TTL replacement could outlive its TTL and be served with zeroed
///   TTL fields.
/// - `expire_after_read` is not overridden, leaving `moka`'s default in place
///   (reads do not change an entry's remaining lifetime).
#[derive(Debug, Clone)]
struct DnsCacheExpiry;

impl moka::Expiry<Question, CachedResponse> for DnsCacheExpiry {
    /// Lifetime of a freshly inserted entry: its own clamped `expiry`.
    fn expire_after_create(
        &self,
        _key: &Question,
        value: &CachedResponse,
        _current_time: Instant,
    ) -> Option<Duration> {
        Some(value.expiry)
    }

    /// Lifetime of an entry that replaces an existing one.
    ///
    /// `value` is the *new* entry; its `expiry` restarts the lifetime instead
    /// of inheriting `_duration_until_expiry` from the entry being replaced.
    fn expire_after_update(
        &self,
        _key: &Question,
        value: &CachedResponse,
        _updated_at: Instant,
        _duration_until_expiry: Option<Duration>,
    ) -> Option<Duration> {
        Some(value.expiry)
    }
}

impl CachedResponse {
    /// Produce a patched copy of this cached response ready to send to a client.
    ///
    /// Two in-place mutations are applied to a byte-for-byte copy of `bytes`:
    ///
    /// 1. **Transaction ID** — if the buffer is at least 2 bytes, the first two
    ///    bytes are overwritten with `client_txn_id` in big-endian order.
    /// 2. **TTL decrement** — for each recorded offset whose 4-byte field lies
    ///    entirely inside the buffer, the big-endian `u32` at that offset is
    ///    decremented by `elapsed_secs` (saturating at 0).
    ///
    /// The patching is fully bounds-guarded and never panics on bad input:
    /// offsets that are out of range — including ones so large that
    /// `offset + 4` would overflow `usize` — are silently skipped.
    fn patched_for(&self, client_txn_id: u16, elapsed_secs: u32) -> Bytes {
        let mut buf = BytesMut::with_capacity(self.bytes.len());
        buf.put_slice(&self.bytes);

        {
            // Work on the plain byte slice so every access is a checked `get_mut`.
            let data: &mut [u8] = &mut buf;

            // 1. Patch transaction ID (bytes 0–1, big-endian).
            if let Some(id) = data.get_mut(..2) {
                id.copy_from_slice(&client_txn_id.to_be_bytes());
            }

            // 2. Decrement each TTL field in-place.
            for &offset in self.ttl_offsets.iter() {
                // `checked_add` rejects offsets near `usize::MAX` instead of
                // overflowing; `get_mut` rejects fields that run past the end.
                if let Some(end) = offset.checked_add(4) {
                    if let Some(field) = data.get_mut(offset..end) {
                        let old = u32::from_be_bytes([field[0], field[1], field[2], field[3]]);
                        let new_ttl = old.saturating_sub(elapsed_secs);
                        field.copy_from_slice(&new_ttl.to_be_bytes());
                    }
                }
            }
        }

        buf.freeze()
    }
}

// ── DnsCache ──────────────────────────────────────────────────────────────────

/// A TTL-expiring raw-bytes cache for DNS responses.
///
/// Wraps a [`moka::future::Cache`] keyed by [`Question`]
/// (`(qname, qtype, qclass)`).  Each entry carries the full upstream response
/// bytes, the recorded TTL-field offsets, and a clamped per-entry lifetime.
///
/// On a hit [`DnsCache::get`] patches the transaction ID and decrements all TTL
/// fields in-place without any DNS parse or re-serialisation (SPEC §8).
///
/// # Sharing
///
/// `moka::future::Cache` is cheaply cloneable and internally concurrent.
/// `DnsCache` itself is `Clone + Send + Sync` and should be shared via
/// [`std::sync::Arc`] (or cloned directly) across tasks (E4.4).  Clones share
/// both the underlying entries and the live TTL bounds.
#[derive(Clone, Debug)]
pub struct DnsCache {
    /// The underlying store; owns capacity-based eviction and per-entry expiry.
    inner: Cache<Question, CachedResponse>,
    /// Live TTL clamp bounds, packed as `(min_ttl << 32) | max_ttl`.
    ///
    /// Read on every insert and updatable at runtime via
    /// [`set_ttl_bounds`](Self::set_ttl_bounds), so a min/max-TTL settings change
    /// takes effect without rebuilding the cache. `Arc` so clones of the
    /// (cheaply-cloneable) cache share one set of bounds, like moka's own state;
    /// packed into a single atomic so a concurrent reader never sees a torn
    /// `min`/`max` pair (which `u32::clamp` would panic on).
    ttl_bounds: Arc<AtomicU64>,
}

impl DnsCache {
    /// Build a new [`DnsCache`] with the given capacity and TTL bounds.
    ///
    /// - `capacity` — maximum number of entries (moka handles eviction).
    /// - `min_ttl` — entries with a shorter TTL are clamped up to this value
    ///   (seconds).
    /// - `max_ttl` — entries with a longer TTL are clamped down to this value
    ///   (seconds).
    ///
    /// The TTL bounds are live: a later [`set_ttl_bounds`](Self::set_ttl_bounds)
    /// changes the clamp applied to subsequent inserts.
    ///
    /// # Panics
    ///
    /// Panics in debug builds if `min_ttl > max_ttl`.  In release builds the
    /// inverted pair is normalised instead (the effective minimum is lowered to
    /// `max_ttl`), so a misconfiguration can never make a later insert panic.
    #[must_use]
    pub fn new(capacity: u64, min_ttl: u32, max_ttl: u32) -> Self {
        debug_assert!(min_ttl <= max_ttl, "min_ttl must not exceed max_ttl");

        let inner = Cache::builder()
            .max_capacity(capacity)
            .expire_after(DnsCacheExpiry)
            .build();

        Self {
            inner,
            ttl_bounds: Arc::new(AtomicU64::new(Self::normalized_bounds(min_ttl, max_ttl))),
        }
    }

    /// Update the live TTL clamp bounds (E8 settings change).
    ///
    /// Takes effect on subsequent inserts; entries already cached keep the
    /// expiry they were stored with. Lets a min/max-TTL edit apply immediately
    /// rather than waiting for a restart (the moka capacity, by contrast, is
    /// fixed at build time and still needs one).
    ///
    /// # Panics
    ///
    /// Panics in debug builds if `min_ttl > max_ttl`.  In release builds the
    /// inverted pair is normalised exactly as in [`new`](Self::new).
    pub fn set_ttl_bounds(&self, min_ttl: u32, max_ttl: u32) {
        debug_assert!(min_ttl <= max_ttl, "min_ttl must not exceed max_ttl");
        // Relaxed is sufficient: both bounds live in one atomic word, so a
        // reader always observes a consistent pair and no other memory is
        // published through this store.
        self.ttl_bounds
            .store(Self::normalized_bounds(min_ttl, max_ttl), Ordering::Relaxed);
    }

    /// Insert a response into the cache.
    ///
    /// The **caller supplies `expiry_ttl_secs`** — the TTL that should govern
    /// how long this entry lives.  For positive responses this is typically the
    /// minimum real-RR TTL; for negative responses (`NXDOMAIN`/`NODATA`) the
    /// caller passes the SOA-derived negative TTL.  The cache clamps the
    /// supplied value to `[min_ttl, max_ttl]` and stores it as the per-entry
    /// expiry.
    ///
    /// `ttl_offsets` is the `Vec<usize>` from [`codec::ttl::TtlScan`]; it is
    /// converted to an `Arc<[usize]>` once at insert time so that future clones
    /// of the entry are O(1).
    pub async fn insert(
        &self,
        key: Question,
        bytes: Bytes,
        ttl_offsets: Vec<usize>,
        expiry_ttl_secs: u32,
    ) {
        let clamped = self.clamp_ttl(expiry_ttl_secs);
        let entry = CachedResponse {
            bytes,
            ttl_offsets: Arc::from(ttl_offsets),
            stored_at: Instant::now(),
            expiry: Duration::from_secs(u64::from(clamped)),
        };
        self.inner.insert(key, entry).await;
    }

    /// Serve a cached response for `key`, patching it for the given client.
    ///
    /// On a hit, returns `Some(Bytes)` containing the cached response with:
    /// - The transaction ID (bytes 0–1) replaced with `client_txn_id`.
    /// - Each recorded TTL field decremented by the seconds elapsed since the
    ///   entry was stored (saturating at 0).
    ///
    /// Returns `None` if the key is not in the cache (cache miss).
    pub async fn get(&self, key: &Question, client_txn_id: u16) -> Option<Bytes> {
        let entry = self.inner.get(key).await?;

        // Whole seconds since the entry was stored.  An age that does not fit
        // in `u32` pins to `u32::MAX`, which zeroes every TTL.
        let age = entry.stored_at.elapsed().as_secs();
        let elapsed_secs = u32::try_from(age).unwrap_or(u32::MAX);

        Some(entry.patched_for(client_txn_id, elapsed_secs))
    }

    /// Run any pending maintenance tasks (e.g. expire stale entries).
    ///
    /// Useful in tests to synchronously flush expiry without relying on moka's
    /// background maintenance thread.
    pub async fn run_pending_tasks(&self) {
        self.inner.run_pending_tasks().await;
    }

    /// Approximate number of entries currently cached (E15.7 dashboard).
    ///
    /// `moka` maintains this lazily, so the count can lag recent inserts/evictions
    /// until pending maintenance runs — fine for an at-a-glance figure.
    pub fn entry_count(&self) -> u64 {
        self.inner.entry_count()
    }

    /// Invalidate every cached entry.
    ///
    /// Called when a configuration change can alter *which authority answers a
    /// name* — notably a conditional-forward zone edit — so previously-cached
    /// answers obtained via the default upstream pool do not keep being served
    /// in place of the newly-routed zone target until they expire. Cheap and
    /// rare (an admin edit), and the cache repopulates on the next miss.
    ///
    /// `moka`'s `invalidate_all` is lazy: entries stop being served immediately,
    /// but `entry_count` may lag until pending maintenance runs.
    pub fn clear(&self) {
        self.inner.invalidate_all();
    }

    /// Clamp a caller-supplied TTL (seconds) into the cache's current `[min,
    /// max]` bounds, read live from [`ttl_bounds`](Self::ttl_bounds).
    ///
    /// Never panics: every value written to `ttl_bounds` goes through
    /// [`normalized_bounds`](Self::normalized_bounds), so `min <= max` always
    /// holds for the pair read here.
    #[inline]
    fn clamp_ttl(&self, secs: u32) -> u32 {
        let (min_ttl, max_ttl) = unpack_bounds(self.ttl_bounds.load(Ordering::Relaxed));
        secs.clamp(min_ttl, max_ttl)
    }

    /// Pack `(min_ttl, max_ttl)` for storage, repairing an inverted pair.
    ///
    /// `u32::clamp` panics when `min > max`, and the `debug_assert!`s guarding
    /// the public entry points vanish in release builds.  Lowering the minimum
    /// to `max_ttl` keeps the stored pair valid; bounds that are already
    /// ordered pass through unchanged.
    #[inline]
    fn normalized_bounds(min_ttl: u32, max_ttl: u32) -> u64 {
        pack_bounds(min_ttl.min(max_ttl), max_ttl)
    }
}

/// Combine the two TTL bounds into one `u64` so they can live in a single
/// atomic: `min_ttl` occupies the upper 32 bits, `max_ttl` the lower 32.
#[inline]
fn pack_bounds(min_ttl: u32, max_ttl: u32) -> u64 {
    let upper_half = u64::from(min_ttl) << 32;
    let lower_half = u64::from(max_ttl);
    upper_half | lower_half
}

/// Split a packed bounds word back into `(min_ttl, max_ttl)`.
///
/// `min_ttl` is read from the upper 32 bits and `max_ttl` from the lower 32.
#[inline]
fn unpack_bounds(packed: u64) -> (u32, u32) {
    // After the shift only the upper half remains, so the cast loses nothing.
    let min_ttl = (packed >> 32) as u32;
    // Mask off the upper half first so the narrowing cast is visibly lossless.
    let max_ttl = (packed & u64::from(u32::MAX)) as u32;
    (min_ttl, max_ttl)
}

// ── Tests ─────────────────────────────────────────────────────────────────────

// Unit tests for the cache: the TTL clamp, the byte-patching logic in
// `CachedResponse::patched_for`, and the async insert/get/expiry behaviour of
// `DnsCache`.  All messages are built in-memory; the async tests sleep for at
// most ~1.1 s of real time.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::{
        header::{Header, Rcode},
        message::{Qclass, Qtype, Question},
        name::Name,
        reader::Reader,
        ttl::TtlScan,
        writer::Writer,
    };

    // ── Test helpers ──────────────────────────────────────────────────────────

    /// Build a minimal DNS A-record response:
    /// - Header: QR=1, QDCOUNT=1, ANCOUNT=1
    /// - Question: `name` / A / IN
    /// - Answer: `name` / A / IN / `ttl` / 4 bytes of 127.0.0.1
    ///
    /// `id` becomes the transaction ID in the header.  Panics if `name` does
    /// not parse as a [`Name`].
    fn build_a_response(id: u16, name: &str, ttl: u32) -> Bytes {
        let mut w = Writer::with_capacity(128);
        Header::new(id)
            .with_qr(true)
            .with_rcode(Rcode::NoError)
            .with_qdcount(1)
            .with_ancount(1)
            .write(&mut w);

        let n: Name = name.parse().expect("valid name");
        // Question section.
        n.write(&mut w);
        w.write_u16(1); // QTYPE A
        w.write_u16(1); // QCLASS IN

        // Answer section: owner + TYPE + CLASS + TTL + RDLENGTH + RDATA.
        n.write(&mut w);
        w.write_u16(1); // TYPE A
        w.write_u16(1); // CLASS IN
        w.write_u32(ttl);
        w.write_u16(4); // RDLENGTH
        w.write_slice(&[127, 0, 0, 1]); // RDATA

        w.finish()
    }

    /// Parse the 16-bit transaction ID from a DNS message (bytes 0–1, big-endian).
    ///
    /// Panics if `bytes` is shorter than 2 bytes.
    fn read_txn_id(bytes: &[u8]) -> u16 {
        u16::from_be_bytes([bytes[0], bytes[1]])
    }

    /// Read the big-endian u32 at an absolute byte offset.
    ///
    /// Panics if the 4-byte field does not lie inside `bytes`.
    fn read_u32_at(bytes: &[u8], offset: usize) -> u32 {
        u32::from_be_bytes([
            bytes[offset],
            bytes[offset + 1],
            bytes[offset + 2],
            bytes[offset + 3],
        ])
    }

    /// Build a [`Question`] for `name` / A / IN.
    fn make_question(name: &str) -> Question {
        Question {
            name: name.parse().expect("valid name"),
            qtype: Qtype::A,
            qclass: Qclass::In,
        }
    }

    /// Build a bare [`CachedResponse`] over `bytes` + `ttl_offsets` for testing
    /// the patch logic (the `stored_at` / `expiry` fields are irrelevant here).
    fn cached(bytes: Bytes, ttl_offsets: Vec<usize>) -> CachedResponse {
        CachedResponse {
            bytes,
            ttl_offsets: Arc::from(ttl_offsets),
            stored_at: Instant::now(),
            expiry: Duration::from_secs(0),
        }
    }

    // ── DnsCache::clamp_ttl ─────────────────────────────────────────────────────

    /// Values below `min_ttl` (including 0) are raised to `min_ttl`.
    #[test]
    fn clamp_ttl_below_min_returns_min() {
        let cache = DnsCache::new(100, 5, 3600);
        assert_eq!(cache.clamp_ttl(0), 5);
        assert_eq!(cache.clamp_ttl(4), 5);
    }

    /// Values above `max_ttl` (including `u32::MAX`) are lowered to `max_ttl`.
    #[test]
    fn clamp_ttl_above_max_returns_max() {
        let cache = DnsCache::new(100, 5, 3600);
        assert_eq!(cache.clamp_ttl(7200), 3600);
        assert_eq!(cache.clamp_ttl(u32::MAX), 3600);
    }

    /// `set_ttl_bounds` changes the clamp on an existing cache immediately.
    #[test]
    fn set_ttl_bounds_is_live() {
        // Bounds start at [5, 3600]; a value of 4 clamps up to 5, 7200 down to 3600.
        let cache = DnsCache::new(100, 5, 3600);
        assert_eq!(cache.clamp_ttl(4), 5);
        assert_eq!(cache.clamp_ttl(7200), 3600);

        // Widen the max and raise the min live — the new bounds clamp at once,
        // both narrowing (min up) and widening (max up) directions.
        cache.set_ttl_bounds(60, 86400);
        assert_eq!(cache.clamp_ttl(4), 60, "raised min applies live");
        assert_eq!(cache.clamp_ttl(7200), 7200, "widened max applies live");
        assert_eq!(cache.clamp_ttl(100_000), 86400);
    }

    /// Values inside the bounds — including both endpoints — pass through.
    #[test]
    fn clamp_ttl_in_range_unchanged() {
        let cache = DnsCache::new(100, 5, 3600);
        assert_eq!(cache.clamp_ttl(300), 300);
        assert_eq!(cache.clamp_ttl(5), 5);
        assert_eq!(cache.clamp_ttl(3600), 3600);
    }

    // ── CachedResponse::patched_for ─────────────────────────────────────────────

    /// Build a small but structurally complete A-record response, run TtlScan
    /// to get real offsets, then patch and verify by reading the patched bytes
    /// back at those offsets.
    #[test]
    fn patch_response_txn_id_and_ttl_decrement() {
        let original_ttl: u32 = 300;
        let original_id: u16 = 0x1234;
        let msg = build_a_response(original_id, "example.com", original_ttl);

        let bytes_ref: Bytes = msg.clone();
        let scan = TtlScan::scan(&bytes_ref).expect("scan must succeed");
        assert_eq!(scan.ttl_offsets.len(), 1, "expected exactly one TTL offset");

        let elapsed: u32 = 30;
        let client_id: u16 = 0xBEEF;

        let patched = cached(msg.clone(), scan.ttl_offsets.clone()).patched_for(client_id, elapsed);

        // Transaction ID must equal client_id.
        assert_eq!(
            read_txn_id(&patched),
            client_id,
            "patched txn-id must match client_id"
        );

        // TTL must be decremented by elapsed.
        let expected_ttl = original_ttl.saturating_sub(elapsed);
        let actual_ttl = read_u32_at(&patched, scan.ttl_offsets[0]);
        assert_eq!(
            actual_ttl, expected_ttl,
            "TTL must be decremented by elapsed"
        );
    }

    /// Verify the transaction ID is also patchable independently.
    #[test]
    fn patch_response_txn_id_only() {
        let msg = build_a_response(0xAAAA, "a.example.com", 60);
        let scan = TtlScan::scan(&msg.clone()).expect("scan");

        // elapsed = 0 → TTLs unchanged; only txn-id changes.
        let patched = cached(msg.clone(), scan.ttl_offsets.clone()).patched_for(0xBBBB, 0);

        assert_eq!(read_txn_id(&patched), 0xBBBB);
        // TTL should be unchanged (elapsed = 0).
        assert_eq!(read_u32_at(&patched, scan.ttl_offsets[0]), 60);
    }

    /// When elapsed exceeds the TTL, the result must saturate at 0 (no wrap).
    #[test]
    fn patch_response_ttl_saturates_at_zero() {
        let ttl: u32 = 10;
        let msg = build_a_response(0x0001, "sat.example.com", ttl);
        let scan = TtlScan::scan(&msg.clone()).expect("scan");

        let patched = cached(msg.clone(), scan.ttl_offsets.clone()).patched_for(0x0001, ttl + 50);

        // Must saturate at 0, not wrap around.
        assert_eq!(
            read_u32_at(&patched, scan.ttl_offsets[0]),
            0,
            "TTL must saturate at 0"
        );
    }

    /// TTL already at 0 — must stay 0 after decrement.
    #[test]
    fn patch_response_zero_ttl_stays_zero() {
        let msg = build_a_response(0x0001, "zero.example.com", 0);
        let scan = TtlScan::scan(&msg.clone()).expect("scan");

        let patched = cached(msg.clone(), scan.ttl_offsets.clone()).patched_for(0x0001, 100);
        assert_eq!(read_u32_at(&patched, scan.ttl_offsets[0]), 0);
    }

    /// Out-of-bounds offsets must be silently skipped (no panic).
    ///
    /// Covers a field that straddles the end of the buffer (`len - 1`), one
    /// that starts exactly at the end (`len`), and one far past it.
    #[test]
    fn patch_response_out_of_bounds_offset_skipped() {
        let msg = build_a_response(0x0001, "oob.example.com", 100);
        let bad_offsets: Vec<usize> = vec![msg.len() - 1, msg.len(), msg.len() + 100];

        // Must not panic.
        let _ = cached(msg.clone(), bad_offsets).patched_for(0x0001, 10);
    }

    /// Buffer shorter than 2 bytes — no txn-id patch, no panic.
    #[test]
    fn patch_response_short_buffer_no_panic() {
        let buf = &[0xAAu8]; // 1 byte — too short to hold a txn-id
        let patched = cached(Bytes::copy_from_slice(buf), vec![]).patched_for(0xBEEF, 0);
        // Must not panic and must return the unchanged single byte.
        assert_eq!(&patched[..], &[0xAAu8]);
    }

    /// Empty buffer — no panic and empty output.
    #[test]
    fn patch_response_empty_buffer_no_panic() {
        let patched = cached(Bytes::new(), vec![]).patched_for(0x1234, 10);
        assert_eq!(patched.len(), 0);
    }

    // ── DnsCache: insert + get ────────────────────────────────────────────────

    /// An inserted response is served on the next `get`, with the transaction
    /// ID rewritten to the requesting client's ID.
    #[tokio::test]
    async fn cache_hit_returns_patched_bytes() {
        let cache = DnsCache::new(100, 1, 3600);
        let question = make_question("hit.example.com");
        let ttl: u32 = 300;
        let msg = build_a_response(0xAAAA, "hit.example.com", ttl);
        let bytes_ref: Bytes = msg.clone();
        let scan = TtlScan::scan(&bytes_ref).expect("scan");

        cache
            .insert(question.clone(), msg, scan.ttl_offsets.clone(), ttl)
            .await;

        let client_id: u16 = 0xBEEF;
        let result = cache.get(&question, client_id).await;
        assert!(result.is_some(), "expected a cache hit");

        let patched = result.unwrap();
        assert_eq!(
            read_txn_id(&patched),
            client_id,
            "transaction ID must be patched to client_id"
        );
    }

    /// `clear()` invalidates every entry, so a previously-cached answer misses
    /// afterwards. Guards the conditional-forward zone-edit flush.
    #[tokio::test]
    async fn clear_invalidates_all_entries() {
        let cache = DnsCache::new(100, 1, 3600);
        let question = make_question("flush.example.com");
        let msg = build_a_response(0xAAAA, "flush.example.com", 300);
        let scan = TtlScan::scan(&msg.clone()).expect("scan");
        cache
            .insert(question.clone(), msg, scan.ttl_offsets, 300)
            .await;
        assert!(cache.get(&question, 0x1).await.is_some(), "entry cached");

        cache.clear();
        cache.run_pending_tasks().await;

        assert!(
            cache.get(&question, 0x1).await.is_none(),
            "clear() must invalidate the entry"
        );
    }

    /// A key that was never inserted yields `None`.
    #[tokio::test]
    async fn cache_miss_returns_none() {
        let cache = DnsCache::new(100, 1, 3600);
        let question = make_question("miss.example.com");
        let result = cache.get(&question, 0x1234).await;
        assert!(result.is_none(), "expected a cache miss");
    }

    /// Two distinct questions must not collide in the cache.
    #[tokio::test]
    async fn different_questions_do_not_collide() {
        let cache = DnsCache::new(100, 1, 3600);

        let q1 = make_question("a.example.com");
        let q2 = make_question("b.example.com");

        let msg1 = build_a_response(0x0001, "a.example.com", 100);
        let msg2 = build_a_response(0x0002, "b.example.com", 200);

        let bytes1: Bytes = msg1.clone();
        let scan1 = TtlScan::scan(&bytes1).unwrap();
        let bytes2: Bytes = msg2.clone();
        let scan2 = TtlScan::scan(&bytes2).unwrap();

        cache.insert(q1.clone(), msg1, scan1.ttl_offsets, 100).await;
        cache.insert(q2.clone(), msg2, scan2.ttl_offsets, 200).await;

        assert!(cache.get(&q1, 0xAAAA).await.is_some(), "q1 should hit");
        assert!(cache.get(&q2, 0xBBBB).await.is_some(), "q2 should hit");

        // A completely absent key must miss.
        let q3 = make_question("c.example.com");
        assert!(cache.get(&q3, 0xCCCC).await.is_none(), "q3 should miss");
    }

    // ── DnsCache: expiry (real sleep, one test) ───────────────────────────────

    /// Insert with expiry_ttl_secs = 1 (min_ttl = 1); get immediately → hit;
    /// wait 1.1 s; get again → miss.
    ///
    /// This test uses a real ~1.1 s sleep because moka uses a real (quanta)
    /// clock, not Tokio's virtual clock.  It is the only test in this module
    /// that sleeps for longer than the entry lifetime under test.
    #[tokio::test]
    async fn cache_entry_expires_after_ttl() {
        let cache = DnsCache::new(100, 1, 3600);
        let question = make_question("expire.example.com");
        let msg = build_a_response(0x0001, "expire.example.com", 1);
        let bytes_ref: Bytes = msg.clone();
        let scan = TtlScan::scan(&bytes_ref).expect("scan");

        // Insert with 1-second TTL.
        cache
            .insert(question.clone(), msg, scan.ttl_offsets, 1)
            .await;

        // Immediate get → should hit.
        assert!(
            cache.get(&question, 0x0001).await.is_some(),
            "entry should be present immediately after insert"
        );

        // Wait for the entry to expire.
        tokio::time::sleep(Duration::from_millis(1100)).await;
        cache.run_pending_tasks().await;

        // After expiry → should miss.
        assert!(
            cache.get(&question, 0x0001).await.is_none(),
            "entry should have expired after 1.1 s"
        );
    }

    // ── DnsCache: TTL clamping at insert ──────────────────────────────────────

    /// Verify the TTL clamp by observing that an entry stored with a very small
    /// supplied TTL gets bumped to min_ttl (entry survives a short wait that
    /// would have expired the raw supplied TTL).
    ///
    /// Specifically: min_ttl = 5, supply expiry_ttl_secs = 0.  The clamped TTL
    /// is 5 s.  After 100 ms the entry must still be present.
    #[tokio::test]
    async fn insert_clamps_ttl_to_min() {
        let cache = DnsCache::new(100, 5, 3600);
        let question = make_question("clamp.example.com");
        let msg = build_a_response(0x0001, "clamp.example.com", 1);
        let bytes_ref: Bytes = msg.clone();
        let scan = TtlScan::scan(&bytes_ref).expect("scan");

        // Supply ttl=0 (below min_ttl=5) — should be clamped to 5 s.
        cache
            .insert(question.clone(), msg, scan.ttl_offsets, 0)
            .await;

        // 100 ms later the entry must still be present (it would be gone if 0 s
        // were honoured verbatim).
        tokio::time::sleep(Duration::from_millis(100)).await;
        cache.run_pending_tasks().await;

        assert!(
            cache.get(&question, 0x0001).await.is_some(),
            "entry clamped to min_ttl=5 should still be present after 100 ms"
        );
    }

    // ── Re-parse patched bytes ────────────────────────────────────────────────

    /// Build a response, scan TTLs, patch it, then re-parse the patched bytes
    /// with Header::read and TtlScan to confirm structural validity.
    #[test]
    fn patched_bytes_are_re_parseable() {
        let original_ttl: u32 = 120;
        let msg = build_a_response(0xFFFF, "reparse.example.com", original_ttl);
        let bytes_ref: Bytes = msg.clone();
        let scan = TtlScan::scan(&bytes_ref).expect("scan");

        let elapsed: u32 = 20;
        let client_id: u16 = 0x4242;

        let patched_bytes =
            cached(msg.clone(), scan.ttl_offsets.clone()).patched_for(client_id, elapsed);

        // The patched message must still be parseable by TtlScan.
        let patched_scan = TtlScan::scan(&patched_bytes).expect("patched message must scan");
        assert_eq!(patched_scan.ttl_offsets.len(), 1);

        // The header ID must match client_id.
        let mut reader = Reader::new(patched_bytes.clone());
        let header = Header::read(&mut reader).expect("header must parse");
        assert_eq!(
            header.id, client_id,
            "patched header.id must match client_id"
        );

        // The TTL at the recorded offset must equal original − elapsed.
        let expected = original_ttl.saturating_sub(elapsed);
        assert_eq!(
            read_u32_at(&patched_bytes, scan.ttl_offsets[0]),
            expected,
            "TTL at offset must be decremented"
        );
    }
}