use alloc::collections::BTreeSet;
use alloc::vec::Vec;
use alloy_primitives::{Address, B256};
use bytes::Bytes;
use nectar_postage::{Batch, BatchId, StampIndex, calculate_bucket};
use nectar_primitives::SwarmAddress;
use crate::codec::{self, Encoded, RootInfo};
use crate::table::{TableView, UsageTable};
use crate::{Result, UsageError, usage_chunk_address, usage_chunk_id};
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct PublishedSequence(u64);
impl PublishedSequence {
pub const NONE: Self = Self(0);
pub const fn new(published: u64) -> Self {
Self(published)
}
pub const fn get(self) -> u64 {
self.0
}
}
impl From<&RootInfo> for PublishedSequence {
fn from(r: &RootInfo) -> Self {
Self(r.sequence())
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Snapshot {
table: UsageTable,
sequence: u64,
slots: Vec<u32>,
issued_at_persist: Option<u64>,
#[cfg(feature = "seal")]
last_seal_timestamp: Option<u64>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PlannedChunk {
pub index: u16,
pub id: B256,
pub address: SwarmAddress,
pub stamp_index: StampIndex,
pub payload: Bytes,
pub newly_allocated: bool,
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[must_use = "dropping the parts discards the recovered sequence and slots; rebuild with Snapshot::from_parts"]
pub struct SnapshotParts {
table: UsageTable,
sequence: u64,
slots: Vec<u32>,
}
impl SnapshotParts {
pub const fn table(&self) -> TableView<'_> {
TableView::new(&self.table)
}
pub const fn sequence(&self) -> u64 {
self.sequence
}
pub fn allocated_slots(&self) -> &[u32] {
&self.slots
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PersistPlan {
pub batch_id: BatchId,
pub sequence: u64,
pub chunks: Vec<PlannedChunk>,
#[cfg(feature = "seal")]
pub previous_timestamp: Option<u64>,
}
impl Snapshot {
pub const fn new(table: UsageTable) -> Self {
Self {
table,
sequence: 0,
slots: Vec::new(),
issued_at_persist: None,
#[cfg(feature = "seal")]
last_seal_timestamp: None,
}
}
pub fn from_batch(batch: &Batch) -> Result<Self> {
Ok(Self::new(UsageTable::from_batch(batch)?))
}
pub(crate) fn validate_parts(table: &UsageTable, slots: &[u32]) -> Result<()> {
let capacity = table.bucket_capacity();
if slots.len() > u16::MAX as usize {
return Err(UsageError::Malformed("too many allocated chunks"));
}
if let Some(&slot) = slots.iter().find(|&&slot| slot >= capacity) {
return Err(UsageError::InvalidSlot { slot, capacity });
}
Ok(())
}
pub(crate) fn recovered_parts(
table: UsageTable,
sequence: u64,
slots: Vec<u32>,
) -> Result<SnapshotParts> {
Self::validate_parts(&table, &slots)?;
Ok(SnapshotParts {
table,
sequence,
slots,
})
}
pub fn from_parts(parts: SnapshotParts) -> Result<Self> {
let SnapshotParts {
table,
sequence,
slots,
} = parts;
Self::validate_parts(&table, &slots)?;
let issued_at_persist = Some(table.total_issued());
Ok(Self {
table,
sequence,
slots,
issued_at_persist,
#[cfg(feature = "seal")]
last_seal_timestamp: None,
})
}
pub const fn table(&self) -> TableView<'_> {
TableView::new(&self.table)
}
pub(crate) const fn table_ref(&self) -> &UsageTable {
&self.table
}
pub fn dilute(&mut self, new_depth: u8) -> Result<()> {
self.table.dilute(new_depth)
}
pub fn merge_max(&mut self, other: &UsageTable) -> Result<()> {
self.table.merge_max(other)
}
#[must_use = "the parts carry the recovered sequence and slots; dropping them discards that state"]
pub fn into_parts(self) -> SnapshotParts {
SnapshotParts {
table: self.table,
sequence: self.sequence,
slots: self.slots,
}
}
pub const fn sequence(&self) -> u64 {
self.sequence
}
#[cfg(feature = "seal")]
pub const fn last_seal_timestamp(&self) -> Option<u64> {
self.last_seal_timestamp
}
#[cfg(feature = "seal")]
pub(crate) const fn record_seal_timestamp(&mut self, timestamp: u64) {
self.last_seal_timestamp = Some(timestamp);
}
pub fn allocated_slots(&self) -> &[u32] {
&self.slots
}
pub const fn is_dirty(&self) -> bool {
match self.issued_at_persist {
Some(baseline) => self.table.total_issued() != baseline,
None => self.table.total_issued() != 0,
}
}
pub const fn stamps_since_persist(&self) -> Option<u64> {
if self.table.is_mutable() {
return None;
}
let baseline = match self.issued_at_persist {
Some(baseline) => baseline,
None => 0,
};
Some(self.table.total_issued().saturating_sub(baseline))
}
pub fn reserved_stamp_indices(&self, owner: &Address) -> Vec<StampIndex> {
let batch_id = self.table.batch_id();
let bucket_depth = self.table.bucket_depth();
self.slots
.iter()
.enumerate()
.map(|(index, &slot)| {
let address = usage_chunk_address(&batch_id, owner, index as u16);
StampIndex::new(calculate_bucket(&address, bucket_depth), slot)
})
.collect()
}
pub fn is_reserved(&self, owner: &Address, index: StampIndex) -> bool {
self.reserved_stamp_indices(owner).contains(&index)
}
pub(crate) fn reserved_slots(&self, owner: &Address) -> BTreeSet<(u32, u32)> {
self.reserved_stamp_indices(owner)
.into_iter()
.map(|index| (index.bucket(), index.index()))
.collect()
}
pub(crate) fn record_bucket(
&mut self,
bucket: u32,
reserved: &BTreeSet<(u32, u32)>,
) -> Result<u32> {
self.table
.counters_mut()
.record(bucket, |slot| reserved.contains(&(bucket, slot)))
.map_err(crate::table::map_counter_error)
}
pub fn issuer(&mut self, owner: Address) -> Issuer<'_> {
let reserved = self.reserved_slots(&owner);
Issuer {
snapshot: self,
owner,
reserved,
}
}
#[cfg(feature = "issuer")]
pub(crate) fn record_address(
&mut self,
owner: Address,
address: &SwarmAddress,
) -> Result<StampIndex> {
self.issuer(owner).record_address(address)
}
#[cfg(feature = "issuer")]
pub const fn into_issuer(self, owner: Address) -> crate::SnapshotIssuer {
crate::SnapshotIssuer::new(self, owner)
}
pub(crate) fn encode(&self) -> Result<Encoded> {
codec::encode(&self.table, self.sequence, &self.slots)
}
pub fn revalidate(&mut self, floor: PublishedSequence) -> Result<Validated<'_>> {
let next = self
.sequence
.checked_add(1)
.ok_or(UsageError::Malformed("persist sequence would overflow"))?;
if next <= floor.get() {
return Err(UsageError::StaleSequence {
next,
floor: floor.get(),
});
}
Ok(Validated {
snapshot: self,
floor: floor.get(),
})
}
}
#[derive(Debug)]
pub struct Validated<'s> {
snapshot: &'s mut Snapshot,
floor: u64,
}
impl Validated<'_> {
#[must_use = "the persist plan is the chunks to publish; dropping it discards the planned persist"]
pub fn plan_persist(&mut self, owner: &Address) -> Result<PersistPlan> {
let sequence = self
.snapshot
.sequence
.checked_add(1)
.ok_or(UsageError::Malformed("persist sequence would overflow"))?;
if sequence <= self.floor {
return Err(UsageError::StaleSequence {
next: sequence,
floor: self.floor,
});
}
let batch_id = self.snapshot.table.batch_id();
let bucket_depth = self.snapshot.table.bucket_depth();
let previously_allocated = self.snapshot.slots.len();
let previous_sequence = self.snapshot.sequence;
let steady_state_encoded = if self.snapshot.slots.is_empty() {
None
} else {
self.snapshot.sequence = sequence;
let encoded = self.snapshot.encode()?;
if self.snapshot.slots.len() > encoded.leaves.len() {
Some(encoded)
} else {
self.snapshot.sequence = previous_sequence;
None
}
};
let encoded = if let Some(encoded) = steady_state_encoded {
encoded
} else {
let mut work = self.snapshot.clone();
work.sequence = sequence;
let allocate = |work: &mut Snapshot| -> Result<()> {
let index = work.slots.len() as u16;
let address = usage_chunk_address(&batch_id, owner, index);
let bucket = calculate_bucket(&address, bucket_depth);
let reserved = work.reserved_slots(owner);
let slot = work.record_bucket(bucket, &reserved)?;
work.slots.push(slot);
Ok(())
};
if work.slots.is_empty() {
allocate(&mut work)?;
}
let encoded = loop {
let encoded = work.encode()?;
if work.slots.len() > encoded.leaves.len() {
break encoded;
}
allocate(&mut work)?;
};
*self.snapshot = work;
encoded
};
self.snapshot.issued_at_persist = Some(self.snapshot.table.total_issued());
let slots = &self.snapshot.slots;
let mut chunks = Vec::with_capacity(1 + encoded.leaves.len());
let payloads = core::iter::once(&encoded.root).chain(encoded.leaves.iter());
for (index, payload) in payloads.enumerate() {
let index = index as u16;
let id = usage_chunk_id(&batch_id, index);
let address = usage_chunk_address(&batch_id, owner, index);
let bucket = calculate_bucket(&address, bucket_depth);
chunks.push(PlannedChunk {
index,
id,
address,
stamp_index: StampIndex::new(bucket, slots[index as usize]),
payload: payload.clone(),
newly_allocated: (index as usize) >= previously_allocated,
});
}
Ok(PersistPlan {
batch_id,
sequence,
chunks,
#[cfg(feature = "seal")]
previous_timestamp: self.snapshot.last_seal_timestamp,
})
}
pub const fn floor(&self) -> u64 {
self.floor
}
}
#[derive(Debug)]
pub struct Issuer<'s> {
snapshot: &'s mut Snapshot,
owner: Address,
reserved: BTreeSet<(u32, u32)>,
}
impl Issuer<'_> {
pub fn record_address(&mut self, address: &SwarmAddress) -> Result<StampIndex> {
Ok(self.record_address_reporting_wrap(address)?.0)
}
pub fn record_address_reporting_wrap(
&mut self,
address: &SwarmAddress,
) -> Result<(StampIndex, bool)> {
let bucket = calculate_bucket(address, self.snapshot.table.bucket_depth());
let wrapped = self.will_wrap(bucket)?;
let index = self.snapshot.record_bucket(bucket, &self.reserved)?;
Ok((StampIndex::new(bucket, index), wrapped))
}
pub fn will_wrap(&self, bucket: u32) -> Result<bool> {
if !self.snapshot.table_ref().is_mutable() {
return Ok(false);
}
self.snapshot
.table_ref()
.has_capacity(bucket)
.map(|free| !free)
}
pub const fn owner(&self) -> Address {
self.owner
}
pub fn count(&self, bucket: u32) -> Result<u32> {
self.snapshot.table_ref().count(bucket)
}
pub fn max_count(&self) -> u32 {
self.snapshot.table_ref().max_count()
}
pub const fn is_mutable(&self) -> bool {
self.snapshot.table_ref().is_mutable()
}
pub fn has_capacity(&self, bucket: u32) -> Result<bool> {
self.snapshot.table_ref().has_capacity(bucket)
}
pub const fn stamps_issued(&self) -> Option<u64> {
if self.snapshot.table_ref().is_mutable() {
None
} else {
Some(self.snapshot.table_ref().total_issued())
}
}
pub const fn checksum(&self) -> u64 {
self.snapshot.table_ref().total_issued()
}
}