net-mesh 0.27.0

High-performance, schema-agnostic, backend-agnostic event bus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
//! Payload storage segments for RedEX.
//!
//! v1 has one segment type: `HeapSegment`, a grow-only `Vec<u8>` payload
//! store returning `(offset, len)` for each append. Range reads return
//! `Bytes` slices over the underlying buffer.
//!
//! Reclamation (for retention) is deferred: truncation of the head of
//! the buffer happens on the retention sweep by rewriting the segment
//! plus adjusting a `base_offset` that callers subtract from stored
//! entry offsets. v1 keeps that machinery inside `RedexFile`; this
//! module just provides the primitive append+read surface.

use bytes::{Bytes, BytesMut};

use super::error::RedexError;

/// Maximum heap segment size before `append` fails with
/// `PayloadTooLarge`. 32-bit offsets imply 4 GB hard max; we stay 1 GB
/// below to leave room for concurrent appends during a retention sweep.
pub(super) const MAX_SEGMENT_BYTES: usize = 3 * 1024 * 1024 * 1024; // 3 GB

/// Backing store for a [`HeapSegment`], in one of two states.
///
/// The active `Mut` state is the append hot path: `extend_from_slice`
/// straight into a `BytesMut`, no per-append round-trip. A `read`
/// transitions to `Frozen` (one `freeze`) so it can hand out zero-copy
/// `Bytes` slices; the next `append` transitions back to `Mut` via
/// `try_into_mut` (O(1) when no reader holds an outstanding slice; a
/// single copy when one does). An append-only burst therefore never
/// freezes — see the type docs for why this matters.
#[derive(Debug)]
enum Buf {
    /// Appendable. No outstanding zero-copy reader slice can reference
    /// this allocation (reads transition to `Frozen` first).
    Mut(BytesMut),
    /// Immutable snapshot. `read` slices this with a refcount bump.
    Frozen(Bytes),
}

impl Buf {
    #[inline]
    fn len(&self) -> usize {
        match self {
            Buf::Mut(m) => m.len(),
            Buf::Frozen(b) => b.len(),
        }
    }
}

/// In-memory payload segment.
///
/// Append-only from the caller's perspective. The retention sweep may
/// rewrite the buffer and advance `base_offset` to drop evicted heads;
/// all live offsets stored in `RedexEntry` records are absolute over
/// the logical seq-space and translated through `base_offset` on read.
///
/// `read` returns zero-copy `Bytes::slice` snapshots — refcount bumps
/// only (perf #51). Pre-#51 `read` did a full `Bytes::copy_from_slice`
/// on every call, ~400 MB/s of wasted memcpy for a 4 KB watcher at
/// 100K ev/s.
///
/// The buffer is held in two states ([`Buf`]) so the zero-copy read
/// win doesn't tax the write hot path. The first cut of #51 held the
/// buffer as a single `Bytes` and round-tripped
/// `try_into_mut` + `freeze` on **every** append — a constant
/// per-append cost (atomic refcount churn + `Shared` init) that
/// regressed `cortex_ingest/tasks_create` ~1.5× (125 ns → 190 ns).
/// Now appends extend an active `BytesMut` directly and only `freeze`
/// lazily when a `read` actually needs a snapshot, so an append-only
/// burst pays zero freezes while reads stay zero-copy. Outstanding
/// reader slices keep their portion of the old allocation alive via
/// refcount and stay valid across the next append's transition.
#[derive(Debug)]
pub struct HeapSegment {
    buf: Buf,
    /// The absolute offset of the first byte currently in the buffer.
    /// Starts at 0 and increases as eviction compacts the head.
    base_offset: u64,
}

impl HeapSegment {
    /// Create an empty segment.
    pub fn new() -> Self {
        Self {
            buf: Buf::Mut(BytesMut::new()),
            base_offset: 0,
        }
    }

    /// Create an empty segment with `capacity` bytes reserved.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            buf: Buf::Mut(BytesMut::with_capacity(capacity)),
            base_offset: 0,
        }
    }

    /// Build a segment from pre-existing payload bytes (e.g. replayed
    /// from disk). The bytes become the live region starting at
    /// absolute offset 0. Kept appendable (`Mut`) so the first append
    /// after recovery is a plain extend.
    #[cfg(feature = "redex-disk")]
    pub(super) fn from_existing(buf: Vec<u8>) -> Self {
        Self {
            buf: Buf::Mut(BytesMut::from(&buf[..])),
            base_offset: 0,
        }
    }

    /// Ensure the buffer is in the `Mut` state and return it for
    /// appending.
    ///
    /// No-op when already `Mut` (the append-burst common case — pure
    /// `extend_from_slice`, no round-trip). When `Frozen`, reclaims the
    /// allocation via [`Bytes::try_into_mut`] (O(1) if no reader holds
    /// an outstanding slice) or, when a reader still does, copies the
    /// live region once into a fresh `BytesMut` — that reader's slice
    /// keeps the old allocation alive via its own refcount.
    fn ensure_mut(&mut self) -> &mut BytesMut {
        if let Buf::Frozen(_) = self.buf {
            let Buf::Frozen(bytes) = std::mem::replace(&mut self.buf, Buf::Mut(BytesMut::new()))
            else {
                unreachable!("just matched Frozen")
            };
            let m = match bytes.try_into_mut() {
                Ok(m) => m,
                Err(bytes) => {
                    let mut m = BytesMut::with_capacity(bytes.len());
                    m.extend_from_slice(&bytes);
                    m
                }
            };
            self.buf = Buf::Mut(m);
        }
        match &mut self.buf {
            Buf::Mut(m) => m,
            Buf::Frozen(_) => unreachable!("converted to Mut above"),
        }
    }

    /// Ensure the buffer is in the `Frozen` state and return the
    /// snapshot for zero-copy slicing. Freezes once on the
    /// `Mut`→`Frozen` transition; subsequent reads with no intervening
    /// append are pure refcount bumps.
    fn ensure_frozen(&mut self) -> &Bytes {
        if let Buf::Mut(_) = self.buf {
            let Buf::Mut(m) = std::mem::replace(&mut self.buf, Buf::Frozen(Bytes::new())) else {
                unreachable!("just matched Mut")
            };
            self.buf = Buf::Frozen(m.freeze());
        }
        match &self.buf {
            Buf::Frozen(b) => b,
            Buf::Mut(_) => unreachable!("converted to Frozen above"),
        }
    }

    /// Append `payload` and return the absolute offset it was written
    /// at (offset in the logical seq-space, not in the backing buffer).
    pub fn append(&mut self, payload: &[u8]) -> Result<u64, RedexError> {
        let live = self.buf.len();
        if live.saturating_add(payload.len()) > MAX_SEGMENT_BYTES {
            return Err(RedexError::PayloadTooLarge {
                size: payload.len(),
                max: MAX_SEGMENT_BYTES.saturating_sub(live),
            });
        }
        let offset = self.base_offset + live as u64;
        self.ensure_mut().extend_from_slice(payload);
        Ok(offset)
    }

    /// Append every payload in `payloads` in order. Returns the
    /// absolute offset the FIRST payload was written at; subsequent
    /// payloads land at successive offsets.
    ///
    /// Performs one bounds check against the total size and one
    /// `reserve` before extending — equivalent to N `append` calls
    /// but with a single capacity check and a single allocation
    /// when the buffer needs to grow.
    pub fn append_many(&mut self, payloads: &[Bytes]) -> Result<u64, RedexError> {
        let total: usize = payloads.iter().map(|p| p.len()).sum();
        let live = self.buf.len();
        if live.saturating_add(total) > MAX_SEGMENT_BYTES {
            return Err(RedexError::PayloadTooLarge {
                size: total,
                max: MAX_SEGMENT_BYTES.saturating_sub(live),
            });
        }
        let first = self.base_offset + live as u64;
        let m = self.ensure_mut();
        m.reserve(total);
        for p in payloads {
            m.extend_from_slice(p);
        }
        Ok(first)
    }

    /// Read `len` bytes starting at absolute `offset`. Returns `None`
    /// if the slice is not fully contained in the live region
    /// (evicted or past end).
    ///
    /// Zero-copy: returns a [`Bytes`] slice that shares the
    /// underlying allocation with the segment (refcount bump only).
    /// Takes `&mut self` because the first read after an append
    /// freezes the active buffer once; callers already hold the
    /// `RedexFile` state lock exclusively, so this adds no contention.
    pub fn read(&mut self, offset: u64, len: u32) -> Option<Bytes> {
        let len = len as usize;
        if offset < self.base_offset {
            return None;
        }
        let rel = (offset - self.base_offset) as usize;
        let end = rel.checked_add(len)?;
        if end > self.buf.len() {
            return None;
        }
        Some(self.ensure_frozen().slice(rel..end))
    }

    /// Number of live bytes currently in the segment.
    pub fn live_bytes(&self) -> usize {
        self.buf.len()
    }

    /// Absolute offset of the first live byte. Anything below this has
    /// been evicted.
    pub fn base_offset(&self) -> u64 {
        self.base_offset
    }

    /// Test-only: forcibly set the absolute base offset without
    /// touching the buffer. Used to simulate a long-lifetime file
    /// where eviction has pushed `base_offset` near `u32::MAX`,
    /// triggering the overflow path in `file.rs::offset_to_u32` and
    /// the pre-validation in `append_batch` / `append_batch_ordered`.
    #[cfg(test)]
    pub(super) fn force_base_offset(&mut self, base: u64) {
        self.base_offset = base;
    }

    /// Reset `base_offset` to zero without touching the buffer.
    ///
    /// Used by `RedexFile::sweep_retention` after a successful
    /// `disk.compact_to`: the on-disk dat is now rewritten to start
    /// at byte 0, so the in-memory segment must follow the same
    /// renormalization or subsequent appends will compute absolute
    /// offsets that index past the end of the new on-disk dat (BUG
    /// #92). Caller is responsible for renormalizing any external
    /// offsets (e.g. `RedexEntry::payload_offset` values stored in
    /// the index) by the prior `base_offset` value before calling
    /// this — otherwise reads through `read_at` will misalign.
    #[cfg(feature = "redex-disk")]
    pub(super) fn rebase_to_zero(&mut self) {
        self.base_offset = 0;
    }

    /// Test-only: mutate the underlying byte buffer in place.
    /// Used by checksum-on-read regression tests to simulate
    /// on-disk corruption without going through a real I/O path.
    ///
    /// Takes a closure rather than returning `&mut [u8]` because the
    /// mutable view borrows the active `BytesMut` for the closure's
    /// scope; a returned `&mut [u8]` would outlive a subsequent
    /// state transition.
    #[cfg(test)]
    pub(super) fn with_bytes_for_test_mut<F>(&mut self, f: F)
    where
        F: FnOnce(&mut [u8]),
    {
        let m = self.ensure_mut();
        f(&mut m[..]);
    }

    /// Test-only: data pointer of the current live buffer, for the
    /// zero-copy read pins. `BytesMut::freeze` (the `Mut`→`Frozen`
    /// transition) converts in place without moving the allocation,
    /// so a pointer captured while `Mut` equals the `Frozen` slice
    /// pointer a subsequent `read` returns — letting the pins prove
    /// `read` is zero-copy across the freeze.
    #[cfg(test)]
    pub(super) fn buf_data_ptr(&self) -> *const u8 {
        match &self.buf {
            Buf::Mut(m) => m.as_ptr(),
            Buf::Frozen(b) => b.as_ptr(),
        }
    }

    /// Evict the prefix of the segment strictly below `new_base` in
    /// the absolute offset space.
    ///
    /// Returns the number of bytes evicted. The retained tail is
    /// copied into a fresh appendable `BytesMut` so subsequent appends
    /// stay on the zero-round-trip fast path; any existing reader
    /// slices into the evicted prefix stay valid via their own
    /// refcounts to the prior allocation.
    pub fn evict_prefix_to(&mut self, new_base: u64) -> u64 {
        if new_base <= self.base_offset {
            return 0;
        }
        let live = self.buf.len();
        let delta = (new_base - self.base_offset) as usize;
        let delta = delta.min(live);
        // Copy the retained tail into a fresh `BytesMut` and keep the
        // segment appendable. Storing the tail as `Mut` (rather than a
        // zero-copy `slice` of the old allocation) means the next
        // append is a plain extend instead of a `try_into_mut` that
        // would have to copy off the still-referenced prior buffer.
        let mut m = BytesMut::with_capacity(live - delta);
        match &self.buf {
            Buf::Mut(b) => m.extend_from_slice(&b[delta..]),
            Buf::Frozen(b) => m.extend_from_slice(&b[delta..]),
        }
        self.buf = Buf::Mut(m);
        self.base_offset += delta as u64;
        delta as u64
    }
}

impl Default for HeapSegment {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_append_and_read() {
        let mut seg = HeapSegment::new();
        let o1 = seg.append(b"hello").unwrap();
        let o2 = seg.append(b"world").unwrap();
        assert_eq!(o1, 0);
        assert_eq!(o2, 5);

        assert_eq!(seg.read(o1, 5).unwrap().as_ref(), b"hello");
        assert_eq!(seg.read(o2, 5).unwrap().as_ref(), b"world");
    }

    #[test]
    fn test_read_out_of_range_returns_none() {
        let mut seg = HeapSegment::new();
        seg.append(b"abc").unwrap();
        assert!(seg.read(10, 3).is_none()); // offset past end
        assert!(seg.read(0, 10).is_none()); // len overruns
    }

    #[test]
    fn test_evict_prefix() {
        let mut seg = HeapSegment::new();
        seg.append(b"aaaa").unwrap();
        let o2 = seg.append(b"bbbb").unwrap();
        assert_eq!(o2, 4);

        let evicted = seg.evict_prefix_to(4);
        assert_eq!(evicted, 4);
        assert_eq!(seg.base_offset(), 4);
        assert_eq!(seg.live_bytes(), 4);

        // Old offset 0 is now below base
        assert!(seg.read(0, 4).is_none());
        // New read from o2 still works
        assert_eq!(seg.read(o2, 4).unwrap().as_ref(), b"bbbb");
    }

    #[test]
    fn test_evict_below_base_is_noop() {
        let mut seg = HeapSegment::new();
        seg.append(b"abc").unwrap();
        seg.evict_prefix_to(10);
        assert_eq!(seg.base_offset(), 3);
        // Further eviction below current base does nothing.
        assert_eq!(seg.evict_prefix_to(1), 0);
    }

    #[test]
    fn test_evict_beyond_live_clamps() {
        let mut seg = HeapSegment::new();
        seg.append(b"xyz").unwrap();
        // Eviction beyond the tail should clamp to tail without panic.
        let evicted = seg.evict_prefix_to(100);
        assert_eq!(evicted, 3);
        assert_eq!(seg.base_offset(), 3);
        assert_eq!(seg.live_bytes(), 0);
    }

    #[test]
    fn test_append_many_basic() {
        let mut seg = HeapSegment::new();
        // Pre-fill so the returned offset isn't 0 — pins that
        // `append_many` honors the existing buffer length.
        let pre = seg.append(b"prefix").unwrap();
        assert_eq!(pre, 0);

        let payloads: Vec<Bytes> = vec![
            Bytes::from_static(b"alpha"),
            Bytes::from_static(b"beta"),
            Bytes::from_static(b"gamma"),
        ];
        let first = seg.append_many(&payloads).unwrap();
        assert_eq!(first, 6, "first payload offset == prefix len");

        // Each payload must be readable at first + sum(prev lens).
        assert_eq!(seg.read(6, 5).unwrap().as_ref(), b"alpha");
        assert_eq!(seg.read(11, 4).unwrap().as_ref(), b"beta");
        assert_eq!(seg.read(15, 5).unwrap().as_ref(), b"gamma");
        assert_eq!(seg.live_bytes(), 6 + 5 + 4 + 5);
    }

    #[test]
    fn test_append_many_capacity_exceeded() {
        let mut seg = HeapSegment::new();
        // One huge payload (1 GiB) so the first append succeeds, then
        // a batch whose total pushes us past `MAX_SEGMENT_BYTES` (3
        // GiB). This is the multi-payload bounds-check path that a
        // per-payload loop would not catch until partway through.
        let big = vec![0u8; 1024 * 1024 * 1024];
        seg.append(&big).unwrap();
        seg.append(&big).unwrap();
        seg.append(&big).unwrap();
        // Now at MAX exactly. A two-payload batch totaling 2 bytes
        // must still be rejected.
        let payloads: Vec<Bytes> = vec![Bytes::from_static(b"x"), Bytes::from_static(b"y")];
        assert!(matches!(
            seg.append_many(&payloads),
            Err(RedexError::PayloadTooLarge { .. })
        ));
        // And the buffer state stayed clean — no partial extension.
        assert_eq!(seg.live_bytes(), MAX_SEGMENT_BYTES);
    }

    /// Pin perf #51: `read` returns a zero-copy `Bytes` slice
    /// over the underlying buffer, not a copy. We observe this
    /// by computing the buffer's allocation identity (the raw
    /// pointer) and asserting `read` returns a slice into the
    /// same allocation.
    ///
    /// `Bytes::as_ptr()` points into the buffer's data; for a
    /// non-empty Bytes that's the address of byte 0. For a
    /// slice produced by `Bytes::slice(r)` that's the address
    /// of byte `r.start` IN THE SAME allocation — i.e., we can
    /// compute `slice.as_ptr() - r.start` and it must equal the
    /// original buffer's `as_ptr()`.
    ///
    /// A regression that re-introduces `Bytes::copy_from_slice`
    /// allocates a fresh buffer; the pointer arithmetic above
    /// would yield an unrelated address.
    #[test]
    fn read_returns_zero_copy_slice_of_underlying_buffer() {
        let mut seg = HeapSegment::new();
        let payload = b"the quick brown fox jumps over the lazy dog";
        seg.append(payload).unwrap();
        let buf_ptr = seg.buf_data_ptr();

        let slice = seg.read(0, payload.len() as u32).unwrap();
        // Full-range read returns a Bytes whose data pointer
        // matches the buffer's data pointer exactly.
        assert_eq!(
            slice.as_ptr(),
            buf_ptr,
            "read(0, len) must return a zero-copy slice of the segment buffer",
        );
        assert_eq!(slice.as_ref(), payload);

        // Sub-range read: data pointer is offset by the start of
        // the range within the SAME allocation. Pre-fix
        // `Bytes::copy_from_slice` would put `sub` in a fresh
        // allocation completely unrelated to `buf_ptr`.
        let sub = seg.read(4, 5).unwrap();
        // Compute the address delta via `usize::wrapping_sub`,
        // not `<*const u8>::offset_from`. `offset_from` is
        // documented UB when the two pointers are NOT from the
        // same allocation — which is exactly the regression case
        // this test is trying to detect (a re-introduced
        // `Bytes::copy_from_slice` would place `sub` in a fresh
        // allocation unrelated to `buf_ptr`). The integer-cast
        // form is well-defined for any pointer values: in the
        // zero-copy case it yields exactly 4; in the regression
        // case it yields some large unrelated number that fails
        // the equality assertion cleanly without invoking UB.
        let sub_offset = (sub.as_ptr() as usize).wrapping_sub(buf_ptr as usize);
        assert_eq!(
            sub_offset, 4,
            "sub-range read must be a slice into the original buffer at offset 4; \
             got offset {sub_offset} (a regression here means read deep-copies)",
        );
        assert_eq!(sub.as_ref(), b"quick");
    }

    /// Companion to the zero-copy pin above: prove the
    /// `wrapping_sub` comparison correctly detects the
    /// regression-case shape (a `Bytes` from a fresh allocation
    /// unrelated to the segment buffer). Pre-fix this test
    /// would have used `<*const u8>::offset_from`, which is
    /// documented UB across allocations — the very case the
    /// regression test is meant to detect. The integer-cast
    /// form (`(p as usize).wrapping_sub(q as usize)`) is
    /// well-defined for any pointer pair.
    ///
    /// We construct what a deep-copy regression WOULD return:
    /// a fresh `Bytes::copy_from_slice` of the bytes at offset
    /// 4, holding the same content as the sub-slice but in a
    /// new allocation. The wrapping_sub against the segment's
    /// `buf_ptr` must yield some non-4 value (with overwhelming
    /// probability — distinct heap allocations don't land at a
    /// fixed offset of each other).
    #[test]
    fn read_zero_copy_pin_detects_deep_copy_via_wrapping_sub() {
        let mut seg = HeapSegment::new();
        let payload = b"the quick brown fox jumps over the lazy dog";
        seg.append(payload).unwrap();
        let buf_ptr = seg.buf_data_ptr();

        // Simulate the regression: a fresh allocation carrying
        // the same five bytes the zero-copy `read(4, 5)` would
        // produce. `Bytes::copy_from_slice` allocates a brand
        // new buffer; its data pointer is unrelated to
        // `buf_ptr`.
        let fake_deep_copy = bytes::Bytes::copy_from_slice(&payload[4..9]);
        assert_eq!(fake_deep_copy.as_ref(), b"quick");
        let fake_offset = (fake_deep_copy.as_ptr() as usize).wrapping_sub(buf_ptr as usize);
        // The two addresses live in different allocations. The
        // wrapping_sub is some arbitrary non-4 value — we can't
        // predict it, but we can assert it isn't 4 (the rare
        // collision case where the allocator happens to lay the
        // fresh buffer exactly 4 bytes past the segment buffer
        // is vanishingly improbable on any real allocator, and
        // even a deliberate adversary couldn't arrange it
        // through public API).
        assert_ne!(
            fake_offset, 4,
            "wrapping_sub form must distinguish a fresh allocation \
             from a same-allocation sub-slice — if these collide \
             the zero-copy pin would no longer detect a deep-copy \
             regression",
        );

        // And confirm the wrapping_sub form is non-UB by running
        // it across the two unrelated pointers without exploding.
        let _well_defined: usize = fake_offset;
    }

    #[test]
    fn test_append_many_empty_returns_current_end() {
        let mut seg = HeapSegment::new();
        seg.append(b"xyz").unwrap();
        // Empty batch is a no-op that returns the current end offset.
        let off = seg.append_many(&[]).unwrap();
        assert_eq!(off, 3);
        assert_eq!(seg.live_bytes(), 3);
    }
}