use std::ops::Range;
use crate::{BlockId, KvbmSequenceHashProvider, SequenceHash};
use dynamo_tokens::TokenBlock;
use super::super::store::BlockStore;
use crate::sequence::BlockSequenceError;
/// Tracks externally supplied block IDs together with the sequence hashes
/// they have been paired with.
///
/// The accessors on this type expose three collections held by the inner
/// store: *assigned*, *staged* and *unassigned* entries. Absolute positions
/// reported by this type are laid out as `offset`, then the assigned entries,
/// then the staged entries, then the unassigned entries.
pub struct ExternalBlockAssignments {
// Backing store; unassigned entries carry `()` while staged and assigned
// entries carry a `SequenceHash`.
store: BlockStore<(), SequenceHash, SequenceHash>,
// Absolute position of the first assigned entry; added to store-relative
// indices whenever an absolute position is computed.
offset: usize,
}
/// Debug output reports only the per-collection counts and the offset rather
/// than dumping every stored entry.
impl std::fmt::Debug for ExternalBlockAssignments {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let store = &self.store;
        let mut summary = f.debug_struct("ExternalBlockAssignments");
        summary.field("assigned_count", &store.assigned_count());
        summary.field("staged_count", &store.staged_count());
        summary.field("unassigned_count", &store.unassigned_count());
        summary.field("offset", &self.offset);
        summary.finish()
    }
}
impl ExternalBlockAssignments {
    /// Creates an empty set of assignments whose first absolute position is
    /// `offset`.
    pub fn new(offset: usize) -> Self {
        Self {
            store: BlockStore::new(),
            offset,
        }
    }

    /// Returns the absolute position of the first assigned entry.
    pub fn offset(&self) -> usize {
        self.offset
    }

    /// Returns `true` if the underlying store already contains `block_id`.
    pub fn contains(&self, block_id: &BlockId) -> bool {
        self.store.contains(block_id)
    }

    /// Returns the assigned entry at store-relative `index` (i.e. *without*
    /// the offset applied), or `None` if there is no such entry.
    pub fn get_assigned(&self, index: usize) -> Option<(BlockId, SequenceHash)> {
        self.store
            .get_assigned(index)
            .map(|(&id, &hash)| (id, hash))
    }

    /// Number of assigned entries.
    pub fn assigned_count(&self) -> usize {
        self.store.assigned_count()
    }

    /// Number of staged entries.
    pub fn staged_count(&self) -> usize {
        self.store.staged_count()
    }

    /// Number of unassigned entries.
    pub fn unassigned_count(&self) -> usize {
        self.store.unassigned_count()
    }

    /// Returns the staged entry at `index` within the staged collection, or
    /// `None` if there is no such entry.
    pub fn get_staged(&self, index: usize) -> Option<(BlockId, SequenceHash)> {
        self.store.get_staged(index).map(|(&id, &hash)| (id, hash))
    }

    /// Iterates over the assigned `(BlockId, SequenceHash)` pairs by value.
    pub fn assigned_iter(&self) -> impl Iterator<Item = (BlockId, SequenceHash)> + '_ {
        self.store.assigned_iter().map(|(&id, &hash)| (id, hash))
    }

    /// Iterates over the staged `(BlockId, SequenceHash)` pairs by value.
    pub fn staged_iter(&self) -> impl Iterator<Item = (BlockId, SequenceHash)> + '_ {
        self.store.staged_iter().map(|(&id, &hash)| (id, hash))
    }

    /// Iterates over the IDs of the unassigned entries.
    pub fn unassigned_iter(&self) -> impl Iterator<Item = BlockId> + '_ {
        self.store.unassigned_iter().map(|(&id, _)| id)
    }

    /// Clears the underlying store. The offset is left unchanged.
    pub fn clear(&mut self) {
        self.store.clear();
    }

    /// Returns the staged entries as produced by `BlockStore::take_staged`.
    ///
    /// NOTE(review): presumably this also removes them from the store —
    /// confirm against `BlockStore`.
    pub fn take_staged(&mut self) -> Vec<(BlockId, SequenceHash)> {
        self.store.take_staged()
    }

    /// Absolute position immediately after the last staged entry, i.e.
    /// `offset + assigned_count + staged_count`.
    pub fn next_position(&self) -> usize {
        self.offset + self.store.assigned_count() + self.store.staged_count()
    }

    /// Absolute positions covered by the assigned entries.
    pub fn assigned_positions(&self) -> Range<usize> {
        self.offset..self.offset + self.store.assigned_count()
    }

    /// Absolute positions covered by the staged entries; starts where
    /// [`Self::assigned_positions`] ends.
    pub fn staged_positions(&self) -> Range<usize> {
        let start = self.offset + self.store.assigned_count();
        start..start + self.store.staged_count()
    }

    /// Looks up the *assigned* entry at absolute position `abs_pos`.
    ///
    /// Returns `None` when `abs_pos` is below the offset or does not map to
    /// an assigned entry. Staged and unassigned entries are not consulted.
    pub fn get_at_position(&self, abs_pos: usize) -> Option<(BlockId, SequenceHash)> {
        let relative = abs_pos.checked_sub(self.offset)?;
        self.get_assigned(relative)
    }

    /// Absolute positions covered by the unassigned entries; starts at
    /// [`Self::next_position`].
    pub fn pending_positions(&self) -> Range<usize> {
        let start = self.next_position();
        start..start + self.store.unassigned_count()
    }

    /// Looks up the unassigned block ID at absolute position `abs_pos`.
    ///
    /// Returns `None` when `abs_pos` is before [`Self::next_position`] or does
    /// not map to an unassigned entry.
    pub fn get_pending_at_position(&self, abs_pos: usize) -> Option<BlockId> {
        let start = self.next_position();
        let relative = abs_pos.checked_sub(start)?;
        self.store.get_unassigned(relative).map(|(&id, _)| id)
    }

    /// Registers every not-yet-known ID from `block_ids` as unassigned,
    /// preserving input order. IDs the store already contains are skipped.
    ///
    /// The whole input is validated before anything is inserted, so the store
    /// is left untouched when an error is returned.
    ///
    /// # Errors
    ///
    /// * [`BlockSequenceError::OrderingViolation`] if an already-known ID
    ///   appears after a new ID in the input.
    /// * [`BlockSequenceError::DuplicateBlockId`] if a new ID appears more
    ///   than once in the input.
    pub fn extend_block_ids(
        &mut self,
        block_ids: impl IntoIterator<Item = BlockId>,
    ) -> Result<(), BlockSequenceError> {
        // An `IndexSet` both detects duplicates and remembers insertion order,
        // so no parallel `Vec` is needed.
        let mut new_ids: indexmap::IndexSet<BlockId> = indexmap::IndexSet::new();
        // Input index and ID of the first new entry, kept for error reporting.
        let mut first_new: Option<(usize, BlockId)> = None;

        for (i, id) in block_ids.into_iter().enumerate() {
            if self.contains(&id) {
                if let Some((first_new_index, new_id)) = first_new {
                    return Err(BlockSequenceError::OrderingViolation {
                        known_id: id,
                        new_id,
                        known_index: i,
                        first_new_index,
                    });
                }
            } else {
                if !new_ids.insert(id) {
                    return Err(BlockSequenceError::DuplicateBlockId { block_id: id });
                }
                if first_new.is_none() {
                    first_new = Some((i, id));
                }
            }
        }

        for id in new_ids {
            self.store.insert_unassigned(id, ());
        }
        Ok(())
    }

    /// Inserts `items` directly into the assigned collection and returns how
    /// many were inserted.
    ///
    /// # Errors
    ///
    /// Returns [`BlockSequenceError::DuplicateBlockId`] when the store's
    /// `validate_no_duplicates` check rejects one of the IDs; nothing is
    /// inserted in that case.
    pub fn extend_assigned(
        &mut self,
        items: impl IntoIterator<Item = (BlockId, SequenceHash)>,
    ) -> Result<usize, BlockSequenceError> {
        let items: Vec<(BlockId, SequenceHash)> = items.into_iter().collect();
        if let Err(block_id) = self
            .store
            .validate_no_duplicates(items.iter().map(|(id, _)| *id), items.len())
        {
            return Err(BlockSequenceError::DuplicateBlockId { block_id });
        }
        let count = items.len();
        for (id, hash) in items {
            self.store.insert_assigned(id, hash);
        }
        Ok(count)
    }

    /// Moves unassigned entries into the staged collection, pairing each with
    /// the sequence hash of the block at the same absolute position in
    /// `sequence_blocks`.
    ///
    /// `sequence_blocks` is indexed by absolute position. At most
    /// `unassigned_count` entries are staged, and only as many as there are
    /// blocks at or after [`Self::next_position`]. All candidate blocks are
    /// validated before any entry is moved, so the store is unchanged on
    /// error.
    ///
    /// Returns the range of indices within the staged collection that this
    /// call added.
    ///
    /// # Errors
    ///
    /// Returns [`BlockSequenceError::PositionMismatch`] if a block's hash
    /// reports a position different from its index in `sequence_blocks`.
    pub fn stage_pending(
        &mut self,
        sequence_blocks: &[TokenBlock],
    ) -> Result<Range<usize>, BlockSequenceError> {
        let staged_start_idx = self.store.staged_count();
        let start_pos = self.next_position();
        let available_blocks = sequence_blocks.len().saturating_sub(start_pos);
        let to_stage = available_blocks.min(self.store.unassigned_count());

        // Validation pass: compute each hash once and keep it for staging.
        let mut hashes = Vec::with_capacity(to_stage);
        let candidates = sequence_blocks.iter().skip(start_pos).take(to_stage);
        for (i, block) in candidates.enumerate() {
            let seq_pos = start_pos + i;
            let hash = block.kvbm_sequence_hash();
            let actual_pos = hash.position();
            if actual_pos != seq_pos as u64 {
                let block_id = self
                    .store
                    .get_unassigned(i)
                    .map(|(&id, _)| id)
                    .expect("to_stage never exceeds unassigned_count");
                return Err(BlockSequenceError::PositionMismatch {
                    expected: seq_pos,
                    actual: actual_pos,
                    block_id,
                });
            }
            hashes.push(hash);
        }

        // Mutation pass: everything validated, so move entries front-to-back.
        for hash in hashes {
            let (block_id, _) = self
                .store
                .shift_unassigned()
                .expect("to_stage never exceeds unassigned_count");
            self.store.insert_staged(block_id, hash);
        }

        let staged_end_idx = self.store.staged_count();
        Ok(staged_start_idx..staged_end_idx)
    }

    /// Moves every staged entry into the assigned collection, in the order
    /// `shift_staged` yields them.
    ///
    /// Returns the range of indices within the assigned collection that this
    /// call added.
    pub fn commit_staged(&mut self) -> Range<usize> {
        let start_idx = self.store.assigned_count();
        while let Some((block_id, hash)) = self.store.shift_staged() {
            self.store.insert_assigned(block_id, hash);
        }
        let end_idx = self.store.assigned_count();
        start_idx..end_idx
    }

    /// Stages pending entries against `sequence_blocks` and immediately
    /// commits them.
    ///
    /// Because the commit drains the whole staged collection, entries staged
    /// by earlier calls are committed too. Returns the assigned-index range
    /// added by the commit.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::stage_pending`]; nothing is committed
    /// in that case.
    pub fn assign_pending(
        &mut self,
        sequence_blocks: &[TokenBlock],
    ) -> Result<Range<usize>, BlockSequenceError> {
        self.stage_pending(sequence_blocks)?;
        Ok(self.commit_staged())
    }
}
/// Pairs up the assigned block IDs of `a` and `b` at every absolute position
/// that both have assigned.
///
/// Each element is `(position, id_from_a, id_from_b)`, in ascending position
/// order. The result is empty when the two assigned ranges do not overlap.
pub fn zip_assigned(
    a: &ExternalBlockAssignments,
    b: &ExternalBlockAssignments,
) -> Vec<(usize, BlockId, BlockId)> {
    let (lhs, rhs) = (a.assigned_positions(), b.assigned_positions());
    let overlap = lhs.start.max(rhs.start)..lhs.end.min(rhs.end);
    overlap
        .map(|pos| {
            let (a_id, _) = a.get_at_position(pos).unwrap();
            let (b_id, _) = b.get_at_position(pos).unwrap();
            (pos, a_id, b_id)
        })
        .collect()
}
/// Pairs each assigned block ID of `src` with the pending (unassigned) block
/// ID of `dst` at the same absolute position.
///
/// Each element is `(position, src_id, dst_id)`, in ascending position order.
/// The result is empty when `src`'s assigned range and `dst`'s pending range
/// do not overlap.
pub fn zip_assigned_pending(
    src: &ExternalBlockAssignments,
    dst: &ExternalBlockAssignments,
) -> Vec<(usize, BlockId, BlockId)> {
    let (assigned, pending) = (src.assigned_positions(), dst.pending_positions());
    let overlap = assigned.start.max(pending.start)..assigned.end.min(pending.end);
    overlap
        .map(|pos| {
            let (src_id, _) = src.get_at_position(pos).unwrap();
            let dst_id = dst.get_pending_at_position(pos).unwrap();
            (pos, src_id, dst_id)
        })
        .collect()
}