nectar-postage-usage 0.3.0

Self-hosted postage batch utilization snapshots for Ethereum Swarm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
//! A high-level async facade over the snapshot persistence machinery.
//!
//! The low-level path ([`Snapshot`], [`revalidate`](Snapshot::revalidate),
//! [`Validated::plan_persist`], [`seal_plan`], and the chunk-address helpers) is
//! a cross-machine ceremony of roughly a dozen calls: derive the root address,
//! fetch and parse the root, fetch every leaf, reassemble, issue, re-read the
//! live floor, revalidate, plan, choose a strictly increasing timestamp, seal,
//! and upload each sealed chunk. [`BatchStamper`] collapses that into
//! `open` / `stamp` / `flush`, leaving the consumer to supply only the network
//! through the [`SnapshotSource`] and [`SnapshotSink`] traits.
//!
//! Power users who need partial persists, custom timestamp policies, or direct
//! access to the [`PersistPlan`] should keep reaching for the low-level
//! [`Snapshot`] / [`revalidate`](Snapshot::revalidate) / [`seal_plan`] path; the
//! facade is purely additive and never hides it.
//!
//! # Floor safety
//!
//! The whole point of the published-sequence floor (nectar issue #70) is that a
//! persist can never regress the version published at a snapshot's own chunk
//! addresses. That guarantee survives only if a failed network read is never
//! mistaken for "the chunk is absent". [`SnapshotSource::fetch`] therefore
//! distinguishes `Ok(None)` (the network definitively agrees the chunk does not
//! exist) from `Err` (the read could not be completed). [`BatchStamper::open`]
//! and [`BatchStamper::flush`] both abort on `Err`: a transport failure never
//! becomes a fresh sequence-0 start nor a [`PublishedSequence::NONE`] floor over
//! a batch that already has a published root.

use alloc::vec::Vec;

use web_time::{SystemTime, UNIX_EPOCH};

use alloy_primitives::Address;
use alloy_signer::SignerSync;
use bytes::Bytes;
use nectar_postage::{Batch, BatchId, StampIndex};
use nectar_primitives::SwarmAddress;
use thiserror::Error;

use crate::codec::RootInfo;
use crate::seal::{SealError, SealedChunk, seal_plan};
use crate::snapshot::{PublishedSequence, Snapshot};
use crate::{UsageError, usage_chunk_address};

/// Reads a chunk's payload from the network by its single-owner-chunk address.
///
/// The returned [`Bytes`] are the chunk's data payload, the snapshot payload
/// [`RootInfo::parse`] consumes, not the full single-owner-chunk wire form.
///
/// `Ok(None)` means the chunk is *definitively* absent: the network agrees it
/// does not exist. `Err` means the read could not be completed and the caller
/// must *not* treat the chunk as absent. This distinction is load-bearing:
/// treating a transport failure as absence would read the published-sequence
/// floor as [`PublishedSequence::NONE`] and reopen the downgrade that nectar
/// issue #70 closes.
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait SnapshotSource {
    /// The error a failed read reports. A value of this type means the read did
    /// not complete; it never means the chunk is absent.
    type Error: core::error::Error + Send + Sync + 'static;

    /// Fetches the data payload of the chunk at `address`, or `Ok(None)` if the
    /// network confirms no such chunk exists.
    ///
    /// The returned future intentionally carries no `Send` bound: a native
    /// transport whose future is `Send` propagates that through
    /// [`BatchStamper::open`] and [`BatchStamper::flush`] automatically, while a
    /// browser transport with a `!Send` future can still implement this trait.
    fn fetch(
        &self,
        address: &SwarmAddress,
    ) -> impl core::future::Future<Output = Result<Option<Bytes>, Self::Error>>;
}

/// Publishes a sealed snapshot chunk (the single-owner chunk and its stamp) to
/// the network.
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait SnapshotSink {
    /// The error a failed publish reports.
    type Error: core::error::Error + Send + Sync + 'static;

    /// Uploads a sealed snapshot chunk.
    ///
    /// As with [`SnapshotSource::fetch`], the returned future carries no `Send`
    /// bound so a `!Send` browser transport can implement it; native `Send`-ness
    /// propagates through the facade on its own.
    fn push(
        &self,
        sealed: &SealedChunk,
    ) -> impl core::future::Future<Output = Result<(), Self::Error>>;
}

/// Errors produced by the [`BatchStamper`] facade.
///
/// The variants unify the lower-level error taxonomies ([`UsageError`],
/// [`SealError`]) with the consumer-supplied source and sink errors, so a caller
/// matches one enum across the whole `open` / `stamp` / `flush` cycle. The
/// [`Source`](Self::Source) and [`Sink`](Self::Sink) variants carry the
/// transport failures that abort `open` and `flush` rather than degrading into a
/// fresh or [`PublishedSequence::NONE`] persist.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum ClientError<SrcErr, SnkErr>
where
    SrcErr: core::error::Error + Send + Sync + 'static,
    SnkErr: core::error::Error + Send + Sync + 'static,
{
    /// A usage-table operation failed (issuance, revalidation against the floor,
    /// or planning the persist).
    #[error(transparent)]
    Usage(#[from] UsageError),
    /// Sealing the persist plan failed.
    #[error(transparent)]
    Seal(#[from] SealError),
    /// The [`SnapshotSource`] read could not be completed. This is *not* an absence:
    /// `open` and `flush` abort here rather than starting fresh or persisting
    /// against a [`PublishedSequence::NONE`] floor.
    #[error("chunk source read failed")]
    Source(#[source] SrcErr),
    /// The [`SnapshotSink`] publish failed.
    #[error("chunk sink publish failed")]
    Sink(#[source] SnkErr),
    /// A published root committed to a leaf the source reported as absent. A
    /// missing leaf for a published root is corruption, never a fresh batch.
    #[error("published root commits to leaf {index} but the source reports it absent")]
    MissingLeaf {
        /// The snapshot chunk index of the absent leaf (1 is the first leaf).
        index: u16,
    },
}

/// A high-level, async facade that collapses the cross-machine roam into
/// `open` / `stamp` / `flush`.
///
/// [`open`](Self::open) recovers a snapshot already published for this
/// `batch` + owner, or starts a fresh one when the network confirms none exists.
/// [`stamp`](Self::stamp) issues a content stamp locally, with no network round
/// trip. [`flush`](Self::flush) re-reads the live published floor, revalidates,
/// plans, seals with a strictly increasing timestamp, and uploads every sealed
/// chunk through the sink.
///
/// The owner is fixed at `open` from the signer's address, and every snapshot
/// chunk address is derived from it and the batch id, so a second machine
/// holding only the same key and batch id recovers the same state.
#[derive(Debug)]
pub struct BatchStamper<Sg, Src, Snk> {
    signer: Sg,
    owner: Address,
    batch_id: BatchId,
    source: Src,
    sink: Snk,
    snapshot: Snapshot,
    /// Whether a persist has been emitted in this session. A clean snapshot that
    /// has already persisted once this session makes [`flush`](Self::flush) a
    /// no-op; a clean but never-persisted snapshot still flushes once so a fresh
    /// batch publishes its sequence-1 root.
    persisted_this_session: bool,
}

impl<Sg, Src, Snk> BatchStamper<Sg, Src, Snk>
where
    Sg: SignerSync + alloy_signer::Signer,
    Src: SnapshotSource,
    Snk: SnapshotSink,
{
    /// Opens a stamper for `batch`, recovering published state or starting fresh.
    ///
    /// The owner is taken from `signer.address()`. The root chunk address is
    /// derived from the batch id, owner, and index 0, then read through `source`:
    ///
    /// - `Ok(Some(bytes))`: a published root exists. The root is parsed and every
    ///   leaf it commits to is fetched (a leaf the source reports absent is
    ///   corruption, surfaced as [`ClientError::MissingLeaf`], never a fresh
    ///   batch) and the snapshot is reassembled, preserving the recovered
    ///   sequence and slots.
    /// - `Ok(None)`: the network confirms no root exists, so a fresh snapshot is
    ///   built from the batch (picking fill-watermark or ring mutability from
    ///   `batch.immutable()`).
    /// - `Err`: the read could not be completed. This aborts; it never starts
    ///   fresh, so a transport failure cannot downgrade a batch that already has
    ///   a published root.
    pub async fn open(
        signer: Sg,
        batch: &Batch,
        source: Src,
        sink: Snk,
    ) -> Result<Self, ClientError<Src::Error, Snk::Error>> {
        let owner = signer.address();
        let batch_id = batch.id();
        let root_addr = usage_chunk_address(&batch_id, &owner, 0);

        let snapshot = match source
            .fetch(&root_addr)
            .await
            .map_err(ClientError::Source)?
        {
            Some(root_bytes) => {
                // A published root exists: recover its sequence and slots. Every
                // committed leaf must be present; a missing leaf is corruption,
                // not a reason to start over.
                let root = RootInfo::parse(&root_bytes)?;
                let mut leaves: Vec<Bytes> = Vec::with_capacity(root.leaf_count() as usize);
                for leaf in 0..root.leaf_count() {
                    let index = leaf + 1;
                    let leaf_addr = usage_chunk_address(&batch_id, &owner, index);
                    match source
                        .fetch(&leaf_addr)
                        .await
                        .map_err(ClientError::Source)?
                    {
                        Some(bytes) => leaves.push(bytes),
                        None => return Err(ClientError::MissingLeaf { index }),
                    }
                }
                root.assemble(&leaves)?
            }
            // The network confirms no published root: a genuinely fresh batch.
            None => Snapshot::from_batch(batch)?,
        };

        Ok(Self {
            signer,
            owner,
            batch_id,
            source,
            sink,
            snapshot,
            persisted_this_session: false,
        })
    }

    /// Issues a content stamp for `content`, advancing the matching bucket
    /// counter and returning the assigned stamp index.
    ///
    /// This is local issuance: it touches no network. The reserved snapshot slots
    /// are skipped by construction, so a stamp never lands on the snapshot's own
    /// chunks. Persist the resulting state with [`flush`](Self::flush).
    pub fn stamp(
        &mut self,
        content: &SwarmAddress,
    ) -> Result<StampIndex, ClientError<Src::Error, Snk::Error>> {
        Ok(self.snapshot.issuer(self.owner).record_address(content)?)
    }

    /// Persists the snapshot: re-reads the live floor, revalidates, plans, seals,
    /// and uploads every sealed chunk.
    ///
    /// A no-op (returns `Ok`) when the snapshot is clean and a persist has already
    /// happened this session. Otherwise the live root chunk is re-read through the
    /// source to derive the published floor:
    ///
    /// - `Ok(Some(bytes))`: the floor is the published root's sequence.
    /// - `Ok(None)`: the network confirms no published root, so the floor is
    ///   [`PublishedSequence::NONE`].
    /// - `Err`: the read could not be completed. This aborts; it never persists
    ///   against a floor it could not read.
    ///
    /// The seal timestamp is the wall clock, nudged past the previous seal so the
    /// in-process monotonicity guard in [`seal_plan`] never trips and the reserve
    /// overwrites each metadata chunk in place. A persist whose next sequence does
    /// not strictly exceed the live floor surfaces as
    /// [`UsageError::StaleSequence`].
    pub async fn flush(&mut self) -> Result<(), ClientError<Src::Error, Snk::Error>> {
        if !self.snapshot.is_dirty() && self.persisted_this_session {
            return Ok(());
        }

        let root_addr = usage_chunk_address(&self.batch_id, &self.owner, 0);
        let floor = match self
            .source
            .fetch(&root_addr)
            .await
            .map_err(ClientError::Source)?
        {
            Some(root_bytes) => PublishedSequence::from(&RootInfo::parse(&root_bytes)?),
            // The network confirms no published root: the floor is NONE.
            None => PublishedSequence::NONE,
        };

        let plan = self.snapshot.revalidate(floor)?.plan_persist(&self.owner)?;

        // The seal timestamp must strictly increase across flushes so the reserve
        // overwrites each metadata chunk in place. Take the wall clock, but lift
        // it past the previous seal so a coarse or non-advancing clock never trips
        // the in-process guard in `seal_plan`.
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        let timestamp = self
            .snapshot
            .last_seal_timestamp()
            .map_or(now, |previous| now.max(previous + 1));

        let sealed = seal_plan(&mut self.snapshot, &plan, timestamp, &self.signer)?;
        for chunk in &sealed {
            self.sink.push(chunk).await.map_err(ClientError::Sink)?;
        }

        self.persisted_this_session = true;
        Ok(())
    }

    /// Returns the wrapped snapshot.
    pub const fn snapshot(&self) -> &Snapshot {
        &self.snapshot
    }

    /// Returns the batch owner, fixed at [`open`](Self::open) from the signer.
    pub const fn owner(&self) -> Address {
        self.owner
    }

    /// Returns the batch id this stamper persists into.
    pub const fn batch_id(&self) -> BatchId {
        self.batch_id
    }

    /// Returns whether the snapshot has unpersisted issuance since the last
    /// [`flush`](Self::flush).
    pub const fn is_dirty(&self) -> bool {
        self.snapshot.is_dirty()
    }
}

#[cfg(test)]
mod tests {
    use alloc::collections::BTreeMap;
    use std::sync::Mutex;

    use alloy_primitives::B256;
    use alloy_signer_local::PrivateKeySigner;

    use super::*;
    use crate::{Mutability, UsageTable};

    /// A shared in-memory network backing both a [`SnapshotSource`] and a
    /// [`SnapshotSink`], keyed by single-owner-chunk address.
    #[derive(Debug, Default, Clone)]
    struct MemNet {
        chunks: std::sync::Arc<Mutex<BTreeMap<SwarmAddress, Bytes>>>,
    }

    #[derive(Debug, Error)]
    #[error("mem net error")]
    struct MemError;

    impl SnapshotSource for MemNet {
        type Error = MemError;
        async fn fetch(&self, address: &SwarmAddress) -> Result<Option<Bytes>, Self::Error> {
            Ok(self.chunks.lock().unwrap().get(address).cloned())
        }
    }

    impl SnapshotSink for MemNet {
        type Error = MemError;
        async fn push(&self, sealed: &SealedChunk) -> Result<(), Self::Error> {
            use nectar_primitives::Chunk;
            let address = *sealed.chunk.address();
            let payload = Bytes::copy_from_slice(sealed.chunk.data().as_ref());
            self.chunks.lock().unwrap().insert(address, payload);
            Ok(())
        }
    }

    /// A source whose every read fails, to prove a transport failure never
    /// degrades into a fresh start or a NONE floor.
    #[derive(Debug, Default, Clone)]
    struct FailingSource;

    impl SnapshotSource for FailingSource {
        type Error = MemError;
        async fn fetch(&self, _address: &SwarmAddress) -> Result<Option<Bytes>, Self::Error> {
            Err(MemError)
        }
    }

    impl SnapshotSink for FailingSource {
        type Error = MemError;
        async fn push(&self, _sealed: &SealedChunk) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    /// A transport whose futures capture an [`Rc`](std::rc::Rc) and are therefore
    /// `!Send`, standing in for a browser transport (`fetch`, websocket) that
    /// cannot produce `Send` futures.
    struct LocalNet(std::rc::Rc<()>);

    impl SnapshotSource for LocalNet {
        type Error = MemError;
        fn fetch(
            &self,
            _address: &SwarmAddress,
        ) -> impl core::future::Future<Output = Result<Option<Bytes>, Self::Error>> {
            let hold = self.0.clone();
            async move {
                let _hold = &hold;
                Ok(None)
            }
        }
    }

    impl SnapshotSink for LocalNet {
        type Error = MemError;
        fn push(
            &self,
            _sealed: &SealedChunk,
        ) -> impl core::future::Future<Output = Result<(), Self::Error>> {
            let hold = self.0.clone();
            async move {
                let _hold = &hold;
                Ok(())
            }
        }
    }

    /// Compile-time proof that a `!Send` transport satisfies [`SnapshotSource`] and
    /// [`SnapshotSink`]. This only type-checks while neither trait bounds its future
    /// with `Send`, which is exactly what lets a single-threaded browser
    /// transport implement the facade. Re-adding a `+ Send` bound breaks here.
    #[test]
    fn non_send_transport_satisfies_the_traits() {
        fn assert_source<S: SnapshotSource>(_: &S) {}
        fn assert_sink<K: SnapshotSink>(_: &K) {}
        let local = LocalNet(std::rc::Rc::new(()));
        assert_source(&local);
        assert_sink(&local);
    }

    fn test_batch(signer: &PrivateKeySigner, immutable: bool) -> Batch {
        Batch::new(
            B256::repeat_byte(0x42),
            0,
            0,
            signer.address(),
            20,
            16,
            immutable,
        )
    }

    #[tokio::test]
    async fn open_miss_starts_fresh_and_flush_publishes() {
        let signer = PrivateKeySigner::random();
        let batch = test_batch(&signer, true);
        let net = MemNet::default();

        let mut stamper = BatchStamper::open(signer, &batch, net.clone(), net.clone())
            .await
            .unwrap();
        assert_eq!(stamper.snapshot().sequence(), 0);

        let content = SwarmAddress::from(B256::repeat_byte(0x99));
        stamper.stamp(&content).unwrap();
        assert!(stamper.is_dirty());

        stamper.flush().await.unwrap();
        assert_eq!(stamper.snapshot().sequence(), 1);
        assert!(!stamper.is_dirty());

        // A clean, already-persisted snapshot flushes as a no-op.
        stamper.flush().await.unwrap();
        assert_eq!(stamper.snapshot().sequence(), 1);
    }

    #[tokio::test]
    async fn open_recovers_published_batch() {
        let signer = PrivateKeySigner::random();
        let owner = signer.address();
        let batch = test_batch(&signer, true);
        let net = MemNet::default();

        // Machine A: fresh, stamp, flush.
        {
            let mut a = BatchStamper::open(signer.clone(), &batch, net.clone(), net.clone())
                .await
                .unwrap();
            a.stamp(&SwarmAddress::from(B256::repeat_byte(0x99)))
                .unwrap();
            a.flush().await.unwrap();
        }

        // Machine B: same key and batch id, recovers the published state.
        let b = BatchStamper::open(signer, &batch, net.clone(), net.clone())
            .await
            .unwrap();
        assert_eq!(
            b.snapshot().sequence(),
            1,
            "recovered the published sequence"
        );
        assert_eq!(b.owner(), owner);
        assert!(!b.snapshot().allocated_slots().is_empty());
    }

    #[tokio::test]
    async fn open_aborts_on_source_error() {
        let signer = PrivateKeySigner::random();
        let batch = test_batch(&signer, true);

        let result = BatchStamper::open(signer, &batch, FailingSource, FailingSource).await;
        assert!(
            matches!(result, Err(ClientError::Source(_))),
            "a failed read must abort open, not start fresh",
        );
    }

    #[tokio::test]
    async fn flush_aborts_on_floor_read_error() {
        let signer = PrivateKeySigner::random();
        let owner = signer.address();
        let batch = test_batch(&signer, true);

        // Open against a working empty network so the snapshot starts fresh, then
        // swap in a source whose floor read fails on flush.
        let net = MemNet::default();
        let mut stamper = BatchStamper {
            signer: signer.clone(),
            owner,
            batch_id: batch.id(),
            source: FailingSource,
            sink: net.clone(),
            snapshot: Snapshot::from_batch(&batch).unwrap(),
            persisted_this_session: false,
        };
        stamper
            .stamp(&SwarmAddress::from(B256::repeat_byte(0x99)))
            .unwrap();

        let result = stamper.flush().await;
        assert!(
            matches!(result, Err(ClientError::Source(_))),
            "a failed floor read must abort flush, not persist against NONE",
        );
        // Nothing was published.
        assert!(net.chunks.lock().unwrap().is_empty());
    }

    #[tokio::test]
    async fn stamp_then_flush_advances_sequence_and_reuses_slots() {
        let signer = PrivateKeySigner::random();
        let net = MemNet::default();
        let batch = test_batch(&signer, true);

        let mut stamper = BatchStamper::open(signer, &batch, net.clone(), net.clone())
            .await
            .unwrap();
        stamper
            .stamp(&SwarmAddress::from(B256::repeat_byte(0x99)))
            .unwrap();
        stamper.flush().await.unwrap();
        let slots_after_first = stamper.snapshot().allocated_slots().to_vec();
        assert_eq!(stamper.snapshot().sequence(), 1);

        stamper
            .stamp(&SwarmAddress::from(B256::repeat_byte(0xab)))
            .unwrap();
        stamper.flush().await.unwrap();
        assert_eq!(stamper.snapshot().sequence(), 2, "sequence advanced");
        assert_eq!(
            stamper.snapshot().allocated_slots(),
            slots_after_first.as_slice(),
            "the snapshot's own slots were reused, not re-allocated",
        );
    }

    #[tokio::test]
    async fn flush_rejects_stale_sequence() {
        let signer = PrivateKeySigner::random();
        let owner = signer.address();
        let batch = test_batch(&signer, true);
        let net = MemNet::default();

        // Publish sequence 1 and 2 from machine A so the live floor sits at 2.
        {
            let mut a = BatchStamper::open(signer.clone(), &batch, net.clone(), net.clone())
                .await
                .unwrap();
            a.stamp(&SwarmAddress::from(B256::repeat_byte(0x01)))
                .unwrap();
            a.flush().await.unwrap();
            a.stamp(&SwarmAddress::from(B256::repeat_byte(0x02)))
                .unwrap();
            a.flush().await.unwrap();
            assert_eq!(a.snapshot().sequence(), 2);
        }

        // A stale machine B sitting at sequence 1: open it, but rewind its
        // snapshot to a sequence-1 state, then issue and flush. The live floor (2)
        // rejects the next sequence (2).
        let table = UsageTable::new(batch.id(), 20, 16, Mutability::Immutable).unwrap();
        let mut stale = Snapshot::new(table);
        stale
            .revalidate(PublishedSequence::NONE)
            .unwrap()
            .plan_persist(&owner)
            .unwrap();
        assert_eq!(stale.sequence(), 1);

        let mut b = BatchStamper {
            signer,
            owner,
            batch_id: batch.id(),
            source: net.clone(),
            sink: net.clone(),
            snapshot: stale,
            persisted_this_session: false,
        };
        b.stamp(&SwarmAddress::from(B256::repeat_byte(0x03)))
            .unwrap();
        let result = b.flush().await;
        assert!(
            matches!(
                result,
                Err(ClientError::Usage(UsageError::StaleSequence {
                    next: 2,
                    floor: 2
                })),
            ),
            "a persist whose next sequence does not exceed the live floor is rejected",
        );
    }
}