grovedb_epoch_based_storage_flags/
lib.rs

1//! Flags
2
3pub mod error;
4mod split_removal_bytes;
5mod update_element_flags;
6
7use crate::{
8    error::StorageFlagsError,
9    StorageFlags::{MultiEpoch, MultiEpochOwned, SingleEpoch, SingleEpochOwned},
10};
11
/// Number of bytes an owner id contributes to the serialized size of owned
/// flags (an `OwnerId` is a `[u8; 32]`).
const DEFAULT_HASH_SIZE_U32: u32 = 32;

/// Optional meta-data to be stored per element, held as raw bytes
pub type ElementFlags = Vec<u8>;
16
17use std::{borrow::Cow, cmp::Ordering, collections::BTreeMap, fmt};
18
19use grovedb_costs::storage_cost::removal::{
20    StorageRemovalPerEpochByIdentifier, StorageRemovedBytes,
21    StorageRemovedBytes::{NoStorageRemoval, SectionedStorageRemoval},
22};
23use integer_encoding::VarInt;
24use intmap::IntMap;
25
/// Index identifying an epoch
type EpochIndex = u16;

/// The base epoch carried by every `StorageFlags` variant
type BaseEpoch = EpochIndex;

/// Number of bytes recorded against a non-base epoch
type BytesAddedInEpoch = u32;

/// 32-byte owner identifier
type OwnerId = [u8; 32];

/// The size of single epoch flags: 1 type byte followed by a 2-byte base
/// epoch
pub const SINGLE_EPOCH_FLAGS_SIZE: u32 = 3;

/// The minimum size of the non-base flags
// NOTE(review): presumably 2 bytes of epoch index plus at least 1 varint
// byte for the byte count -- confirm against the serialization format.
pub const MINIMUM_NON_BASE_FLAGS_SIZE: u32 = 3;
39
/// Storage flags
///
/// Every variant carries a base epoch. The "multi" variants additionally
/// carry a map from epoch index to the bytes recorded for that epoch, and
/// the "owned" variants carry a 32-byte owner id. The "byte N" notes below
/// are the values returned by `type_byte` and written first by `serialize`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StorageFlags {
    /// Single epoch
    /// represented as byte 0
    ///
    /// Fields: base epoch.
    SingleEpoch(BaseEpoch),

    /// Multi epoch
    /// represented as byte 1
    ///
    /// Fields: base epoch, bytes recorded per non-base epoch.
    MultiEpoch(BaseEpoch, BTreeMap<EpochIndex, BytesAddedInEpoch>),

    /// Single epoch owned
    /// represented as byte 2
    ///
    /// Fields: base epoch, owner id.
    SingleEpochOwned(BaseEpoch, OwnerId),

    /// Multi epoch owned
    /// represented as byte 3
    ///
    /// Fields: base epoch, bytes recorded per non-base epoch, owner id.
    MultiEpochOwned(BaseEpoch, BTreeMap<EpochIndex, BytesAddedInEpoch>, OwnerId),
}
59
60impl fmt::Display for StorageFlags {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        match self {
63            StorageFlags::SingleEpoch(base_epoch) => {
64                write!(f, "SingleEpoch(BaseEpoch: {})", base_epoch)
65            }
66            StorageFlags::MultiEpoch(base_epoch, epochs) => {
67                write!(f, "MultiEpoch(BaseEpoch: {}, Epochs: ", base_epoch)?;
68                for (index, bytes) in epochs {
69                    write!(f, "[EpochIndex: {}, BytesAdded: {}] ", index, bytes)?;
70                }
71                write!(f, ")")
72            }
73            StorageFlags::SingleEpochOwned(base_epoch, owner_id) => {
74                write!(
75                    f,
76                    "SingleEpochOwned(BaseEpoch: {}, OwnerId: {})",
77                    base_epoch,
78                    hex::encode(owner_id)
79                )
80            }
81            StorageFlags::MultiEpochOwned(base_epoch, epochs, owner_id) => {
82                write!(f, "MultiEpochOwned(BaseEpoch: {}, Epochs: ", base_epoch)?;
83                for (index, bytes) in epochs {
84                    write!(f, "[EpochIndex: {}, BytesAdded: {}] ", index, bytes)?;
85                }
86                write!(f, ", OwnerId: {})", hex::encode(owner_id))
87            }
88        }
89    }
90}
91
/// MergingOwnersStrategy decides which owner to keep during a merge
///
/// The strategy is only consulted when both sides of the merge carry an
/// owner id and the two ids differ.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum MergingOwnersStrategy {
    #[default]
    /// Raise an issue that owners of nodes are different (the merge fails
    /// with an error). This is the default strategy.
    RaiseIssue,
    /// Use the original owner id (the one on the flags being merged into)
    UseOurs,
    /// Use the new owner id (the one on the flags being merged in)
    UseTheirs,
}
103
104impl StorageFlags {
105    /// Create new single epoch storage flags
106    pub fn new_single_epoch(epoch: BaseEpoch, maybe_owner_id: Option<OwnerId>) -> Self {
107        match maybe_owner_id {
108            None => SingleEpoch(epoch),
109            Some(owner_id) => SingleEpochOwned(epoch, owner_id),
110        }
111    }
112
113    /// Sets the owner id if we have owned storage flags
114    pub fn set_owner_id(&mut self, owner_id: OwnerId) {
115        match self {
116            SingleEpochOwned(_, previous_owner_id) | MultiEpochOwned(_, _, previous_owner_id) => {
117                *previous_owner_id = owner_id;
118            }
119            _ => {}
120        }
121    }
122
123    fn combine_owner_id<'a>(
124        &'a self,
125        rhs: &'a Self,
126        merging_owners_strategy: MergingOwnersStrategy,
127    ) -> Result<Option<&'a OwnerId>, StorageFlagsError> {
128        if let Some(our_owner_id) = self.owner_id() {
129            if let Some(other_owner_id) = rhs.owner_id() {
130                if our_owner_id != other_owner_id {
131                    match merging_owners_strategy {
132                        MergingOwnersStrategy::RaiseIssue => {
133                            Err(StorageFlagsError::MergingStorageFlagsFromDifferentOwners(
134                                "can not merge from different owners".to_string(),
135                            ))
136                        }
137                        MergingOwnersStrategy::UseOurs => Ok(Some(our_owner_id)),
138                        MergingOwnersStrategy::UseTheirs => Ok(Some(other_owner_id)),
139                    }
140                } else {
141                    Ok(Some(our_owner_id))
142                }
143            } else {
144                Ok(Some(our_owner_id))
145            }
146        } else if let Some(other_owner_id) = rhs.owner_id() {
147            Ok(Some(other_owner_id))
148        } else {
149            Ok(None)
150        }
151    }
152
153    fn combine_non_base_epoch_bytes(
154        &self,
155        rhs: &Self,
156    ) -> Option<BTreeMap<EpochIndex, BytesAddedInEpoch>> {
157        if let Some(our_epoch_index_map) = self.epoch_index_map() {
158            if let Some(other_epoch_index_map) = rhs.epoch_index_map() {
159                let mut combined_index_map = our_epoch_index_map.clone();
160                other_epoch_index_map
161                    .iter()
162                    .for_each(|(epoch_index, bytes_added)| {
163                        // Simply insert the value from rhs, overwriting any existing value
164                        combined_index_map.insert(*epoch_index, *bytes_added);
165                    });
166                // println!(
167                //     "         >combine_non_base_epoch_bytes: self:{:?} & rhs:{:?} -> {:?}",
168                //     our_epoch_index_map, other_epoch_index_map, combined_index_map
169                // );
170                Some(combined_index_map)
171            } else {
172                Some(our_epoch_index_map.clone())
173            }
174        } else {
175            rhs.epoch_index_map().cloned()
176        }
177    }
178
179    fn combine_same_base_epoch(
180        &self,
181        rhs: Self,
182        merging_owners_strategy: MergingOwnersStrategy,
183    ) -> Result<Self, StorageFlagsError> {
184        let base_epoch = *self.base_epoch();
185        let owner_id = self.combine_owner_id(&rhs, merging_owners_strategy)?;
186        let other_epoch_bytes = self.combine_non_base_epoch_bytes(&rhs);
187
188        match (owner_id, other_epoch_bytes) {
189            (None, None) => Ok(SingleEpoch(base_epoch)),
190            (Some(owner_id), None) => Ok(SingleEpochOwned(base_epoch, *owner_id)),
191            (None, Some(other_epoch_bytes)) => Ok(MultiEpoch(base_epoch, other_epoch_bytes)),
192            (Some(owner_id), Some(other_epoch_bytes)) => {
193                Ok(MultiEpochOwned(base_epoch, other_epoch_bytes, *owner_id))
194            }
195        }
196    }
197
198    fn combine_with_higher_base_epoch(
199        &self,
200        rhs: Self,
201        added_bytes: u32,
202        merging_owners_strategy: MergingOwnersStrategy,
203    ) -> Result<Self, StorageFlagsError> {
204        let base_epoch = *self.base_epoch();
205        let epoch_with_adding_bytes = rhs.base_epoch();
206        let owner_id = self.combine_owner_id(&rhs, merging_owners_strategy)?;
207        let mut other_epoch_bytes = self.combine_non_base_epoch_bytes(&rhs).unwrap_or_default();
208        let original_value = other_epoch_bytes.remove(epoch_with_adding_bytes);
209        match original_value {
210            None => other_epoch_bytes.insert(*epoch_with_adding_bytes, added_bytes),
211            Some(original_bytes) => {
212                other_epoch_bytes.insert(*epoch_with_adding_bytes, original_bytes + added_bytes)
213            }
214        };
215        // println!(
216        //     "         >combine_with_higher_base_epoch added_bytes:{} self:{:?} &
217        // rhs:{:?} -> {:?}",     added_bytes,
218        //     self.epoch_index_map(),
219        //     rhs.epoch_index_map(),
220        //     other_epoch_bytes
221        // );
222
223        match owner_id {
224            None => Ok(MultiEpoch(base_epoch, other_epoch_bytes)),
225            Some(owner_id) => Ok(MultiEpochOwned(base_epoch, other_epoch_bytes, *owner_id)),
226        }
227    }
228
229    fn combine_with_higher_base_epoch_remove_bytes(
230        self,
231        rhs: Self,
232        removed_bytes: &StorageRemovedBytes,
233        merging_owners_strategy: MergingOwnersStrategy,
234    ) -> Result<Self, StorageFlagsError> {
235        if matches!(&self, &SingleEpoch(_) | &SingleEpochOwned(..)) {
236            return Ok(self);
237        }
238        let base_epoch = *self.base_epoch();
239        let owner_id = self.combine_owner_id(&rhs, merging_owners_strategy)?;
240        let mut other_epoch_bytes = self.combine_non_base_epoch_bytes(&rhs).unwrap_or_default();
241        if let SectionedStorageRemoval(sectioned_bytes_by_identifier) = removed_bytes {
242            if sectioned_bytes_by_identifier.len() > 1 {
243                return Err(StorageFlagsError::MergingStorageFlagsFromDifferentOwners(
244                    "can not remove bytes when there is no epoch".to_string(),
245                ));
246            }
247            // we must use our owner id, because we would be removing bytes from it
248            let identifier = self.owner_id().copied().unwrap_or_default();
249            let sectioned_bytes = sectioned_bytes_by_identifier.get(&identifier).ok_or(
250                StorageFlagsError::MergingStorageFlagsFromDifferentOwners(
251                    "can not remove bytes when there is no epoch".to_string(),
252                ),
253            )?;
254            let mut keys_to_remove = Vec::new(); // To store the keys that need to be removed
255
256            sectioned_bytes
257                .iter()
258                .try_for_each(|(epoch, removed_bytes)| {
259                    if epoch == base_epoch {
260                        return Ok::<(), StorageFlagsError>(());
261                    }
262                    let bytes_added_in_epoch = other_epoch_bytes.get_mut(&epoch).ok_or(
263                        StorageFlagsError::RemovingAtEpochWithNoAssociatedStorage(format!(
264                            "can not remove bytes when there is no epoch number [{}]",
265                            epoch
266                        )),
267                    )?;
268
269                    let desired_bytes_in_epoch = bytes_added_in_epoch
270                        .checked_sub(*removed_bytes)
271                        .ok_or(StorageFlagsError::StorageFlagsOverflow(
272                        "can't remove more bytes than exist at that epoch".to_string(),
273                    ))?;
274
275                    if desired_bytes_in_epoch <= MINIMUM_NON_BASE_FLAGS_SIZE {
276                        // Collect the key to remove later
277                        keys_to_remove.push(epoch);
278                    } else {
279                        *bytes_added_in_epoch = desired_bytes_in_epoch;
280                    }
281
282                    Ok::<(), StorageFlagsError>(())
283                })?;
284
285            // Now remove the keys after the iteration
286            for key in keys_to_remove {
287                other_epoch_bytes.remove(&key);
288            }
289        }
290        // println!(
291        //     "         >combine_with_higher_base_epoch_remove_bytes: self:{:?} &
292        // rhs:{:?} -> {:?}",     self.epoch_index_map(),
293        //     rhs.epoch_index_map(),
294        //     other_epoch_bytes
295        // );
296
297        if other_epoch_bytes.is_empty() {
298            match owner_id {
299                None => Ok(SingleEpoch(base_epoch)),
300                Some(owner_id) => Ok(SingleEpochOwned(base_epoch, *owner_id)),
301            }
302        } else {
303            match owner_id {
304                None => Ok(MultiEpoch(base_epoch, other_epoch_bytes)),
305                Some(owner_id) => Ok(MultiEpochOwned(base_epoch, other_epoch_bytes, *owner_id)),
306            }
307        }
308    }
309
310    /// Optional combine added bytes
311    pub fn optional_combine_added_bytes(
312        ours: Option<Self>,
313        theirs: Self,
314        added_bytes: u32,
315        merging_owners_strategy: MergingOwnersStrategy,
316    ) -> Result<Self, StorageFlagsError> {
317        match ours {
318            None => Ok(theirs),
319            Some(ours) => {
320                Ok(ours.combine_added_bytes(theirs, added_bytes, merging_owners_strategy)?)
321            }
322        }
323    }
324
325    /// Optional combine removed bytes
326    pub fn optional_combine_removed_bytes(
327        ours: Option<Self>,
328        theirs: Self,
329        removed_bytes: &StorageRemovedBytes,
330        merging_owners_strategy: MergingOwnersStrategy,
331    ) -> Result<Self, StorageFlagsError> {
332        match ours {
333            None => Ok(theirs),
334            Some(ours) => {
335                Ok(ours.combine_removed_bytes(theirs, removed_bytes, merging_owners_strategy)?)
336            }
337        }
338    }
339
340    /// Combine added bytes
341    pub fn combine_added_bytes(
342        self,
343        rhs: Self,
344        added_bytes: u32,
345        merging_owners_strategy: MergingOwnersStrategy,
346    ) -> Result<Self, StorageFlagsError> {
347        match self.base_epoch().cmp(rhs.base_epoch()) {
348            Ordering::Equal => self.combine_same_base_epoch(rhs, merging_owners_strategy),
349            Ordering::Less => {
350                self.combine_with_higher_base_epoch(rhs, added_bytes, merging_owners_strategy)
351            }
352            Ordering::Greater => Err(
353                StorageFlagsError::MergingStorageFlagsWithDifferentBaseEpoch(
354                    "can not merge with new item in older base epoch".to_string(),
355                ),
356            ),
357        }
358    }
359
360    /// Combine removed bytes
361    pub fn combine_removed_bytes(
362        self,
363        rhs: Self,
364        removed_bytes: &StorageRemovedBytes,
365        merging_owners_strategy: MergingOwnersStrategy,
366    ) -> Result<Self, StorageFlagsError> {
367        match self.base_epoch().cmp(rhs.base_epoch()) {
368            Ordering::Equal => self.combine_same_base_epoch(rhs, merging_owners_strategy),
369            Ordering::Less => self.combine_with_higher_base_epoch_remove_bytes(
370                rhs,
371                removed_bytes,
372                merging_owners_strategy,
373            ),
374            Ordering::Greater => Err(
375                StorageFlagsError::MergingStorageFlagsWithDifferentBaseEpoch(
376                    "can not merge with new item in older base epoch".to_string(),
377                ),
378            ),
379        }
380    }
381
382    /// Returns base epoch
383    pub fn base_epoch(&self) -> &BaseEpoch {
384        match self {
385            SingleEpoch(base_epoch)
386            | MultiEpoch(base_epoch, _)
387            | SingleEpochOwned(base_epoch, _)
388            | MultiEpochOwned(base_epoch, ..) => base_epoch,
389        }
390    }
391
392    /// Returns owner id
393    pub fn owner_id(&self) -> Option<&OwnerId> {
394        match self {
395            SingleEpochOwned(_, owner_id) | MultiEpochOwned(_, _, owner_id) => Some(owner_id),
396            _ => None,
397        }
398    }
399
400    /// Returns epoch index map
401    pub fn epoch_index_map(&self) -> Option<&BTreeMap<EpochIndex, BytesAddedInEpoch>> {
402        match self {
403            MultiEpoch(_, epoch_int_map) | MultiEpochOwned(_, epoch_int_map, _) => {
404                Some(epoch_int_map)
405            }
406            _ => None,
407        }
408    }
409
    /// Returns optional default storage flags
    ///
    /// The default is the absence of flags, so this always returns `None`.
    pub fn optional_default() -> Option<Self> {
        None
    }
414
    /// Returns default optional storage flag as ref
    ///
    /// Always `None`; typed as a `'static` reference for callers that work
    /// with `Option<&StorageFlags>`.
    pub fn optional_default_as_ref() -> Option<&'static Self> {
        None
    }
419
    /// Returns default optional storage flag as cow
    ///
    /// Always `None`; typed as a `'static` `Cow` for callers that work with
    /// `Option<Cow<StorageFlags>>`.
    pub fn optional_default_as_cow() -> Option<Cow<'static, Self>> {
        None
    }
424
425    /// Returns type byte
426    pub fn type_byte(&self) -> u8 {
427        match self {
428            SingleEpoch(_) => 0,
429            MultiEpoch(..) => 1,
430            SingleEpochOwned(..) => 2,
431            MultiEpochOwned(..) => 3,
432        }
433    }
434
435    fn append_to_vec_base_epoch(&self, buffer: &mut Vec<u8>) {
436        match self {
437            SingleEpoch(base_epoch)
438            | MultiEpoch(base_epoch, ..)
439            | SingleEpochOwned(base_epoch, ..)
440            | MultiEpochOwned(base_epoch, ..) => buffer.extend(base_epoch.to_be_bytes()),
441        }
442    }
443
444    fn maybe_append_to_vec_epoch_map(&self, buffer: &mut Vec<u8>) {
445        match self {
446            MultiEpoch(_, epoch_map) | MultiEpochOwned(_, epoch_map, _) => {
447                if epoch_map.is_empty() {
448                    panic!("this should not be empty");
449                }
450                epoch_map.iter().for_each(|(epoch_index, bytes_added)| {
451                    buffer.extend(epoch_index.to_be_bytes());
452                    buffer.extend(bytes_added.encode_var_vec());
453                })
454            }
455            _ => {}
456        }
457    }
458
459    fn maybe_epoch_map_size(&self) -> u32 {
460        let mut size = 0;
461        match self {
462            MultiEpoch(_, epoch_map) | MultiEpochOwned(_, epoch_map, _) => {
463                epoch_map.iter().for_each(|(_epoch_index, bytes_added)| {
464                    size += 2;
465                    size += bytes_added.encode_var_vec().len() as u32;
466                })
467            }
468            _ => {}
469        }
470        size
471    }
472
473    fn maybe_append_to_vec_owner_id(&self, buffer: &mut Vec<u8>) {
474        match self {
475            SingleEpochOwned(_, owner_id) | MultiEpochOwned(_, _, owner_id) => {
476                buffer.extend(owner_id);
477            }
478            _ => {}
479        }
480    }
481
482    fn maybe_owner_id_size(&self) -> u32 {
483        match self {
484            SingleEpochOwned(..) | MultiEpochOwned(..) => DEFAULT_HASH_SIZE_U32,
485            _ => 0,
486        }
487    }
488
489    /// ApproximateSize
490    pub fn approximate_size(
491        has_owner_id: bool,
492        approximate_changes_and_bytes_count: Option<(u16, u8)>,
493    ) -> u32 {
494        let mut size = 3; // 1 for type byte, 2 for epoch number
495        if has_owner_id {
496            size += DEFAULT_HASH_SIZE_U32;
497        }
498        if let Some((approximate_change_count, bytes_changed_required_size)) =
499            approximate_changes_and_bytes_count
500        {
501            size += (approximate_change_count as u32) * (2 + bytes_changed_required_size as u32)
502        }
503        size
504    }
505
506    /// Serialize storage flags
507    pub fn serialize(&self) -> Vec<u8> {
508        let mut buffer = vec![self.type_byte()];
509        self.maybe_append_to_vec_owner_id(&mut buffer);
510        self.append_to_vec_base_epoch(&mut buffer);
511        self.maybe_append_to_vec_epoch_map(&mut buffer);
512        buffer
513    }
514
515    /// Serialize storage flags
516    pub fn serialized_size(&self) -> u32 {
517        let mut buffer_len = 3; // for type byte and base epoch
518        buffer_len += self.maybe_owner_id_size();
519        buffer_len += self.maybe_epoch_map_size();
520        buffer_len
521    }
522
523    /// Deserialize single epoch storage flags from bytes
524    pub fn deserialize_single_epoch(data: &[u8]) -> Result<Self, StorageFlagsError> {
525        if data.len() != 3 {
526            Err(StorageFlagsError::StorageFlagsWrongSize(
527                "single epoch must be 3 bytes total".to_string(),
528            ))
529        } else {
530            let epoch = u16::from_be_bytes(data[1..3].try_into().map_err(|_| {
531                StorageFlagsError::StorageFlagsWrongSize(
532                    "single epoch must be 3 bytes total".to_string(),
533                )
534            })?);
535            Ok(SingleEpoch(epoch))
536        }
537    }
538
539    /// Deserialize multi epoch storage flags from bytes
540    pub fn deserialize_multi_epoch(data: &[u8]) -> Result<Self, StorageFlagsError> {
541        let len = data.len();
542        if len < 6 {
543            Err(StorageFlagsError::StorageFlagsWrongSize(
544                "multi epoch must be at least 6 bytes total".to_string(),
545            ))
546        } else {
547            let base_epoch = u16::from_be_bytes(data[1..3].try_into().map_err(|_| {
548                StorageFlagsError::StorageFlagsWrongSize(
549                    "multi epoch must have enough bytes for the base epoch".to_string(),
550                )
551            })?);
552            let mut offset = 3;
553            let mut bytes_per_epoch: BTreeMap<u16, u32> = BTreeMap::default();
554            while offset + 2 < len {
555                // 2 for epoch size
556                let epoch_index =
557                    u16::from_be_bytes(data[offset..offset + 2].try_into().map_err(|_| {
558                        StorageFlagsError::StorageFlagsWrongSize(
559                            "multi epoch must have enough bytes epoch indexes".to_string(),
560                        )
561                    })?);
562                offset += 2;
563                let (bytes_at_epoch, bytes_used) = u32::decode_var(&data[offset..]).ok_or(
564                    StorageFlagsError::StorageFlagsWrongSize(
565                        "multi epoch must have enough bytes for the amount of bytes used"
566                            .to_string(),
567                    ),
568                )?;
569                offset += bytes_used;
570                bytes_per_epoch.insert(epoch_index, bytes_at_epoch);
571            }
572            Ok(MultiEpoch(base_epoch, bytes_per_epoch))
573        }
574    }
575
576    /// Deserialize single epoch owned storage flags from bytes
577    pub fn deserialize_single_epoch_owned(data: &[u8]) -> Result<Self, StorageFlagsError> {
578        if data.len() != 35 {
579            Err(StorageFlagsError::StorageFlagsWrongSize(
580                "single epoch owned must be 35 bytes total".to_string(),
581            ))
582        } else {
583            let owner_id: OwnerId = data[1..33].try_into().map_err(|_| {
584                StorageFlagsError::StorageFlagsWrongSize(
585                    "single epoch owned must be 35 bytes total for owner id".to_string(),
586                )
587            })?;
588            let epoch = u16::from_be_bytes(data[33..35].try_into().map_err(|_| {
589                StorageFlagsError::StorageFlagsWrongSize(
590                    "single epoch owned must be 35 bytes total for epoch".to_string(),
591                )
592            })?);
593            Ok(SingleEpochOwned(epoch, owner_id))
594        }
595    }
596
597    /// Deserialize multi epoch owned storage flags from bytes
598    pub fn deserialize_multi_epoch_owned(data: &[u8]) -> Result<Self, StorageFlagsError> {
599        let len = data.len();
600        if len < 38 {
601            Err(StorageFlagsError::StorageFlagsWrongSize(
602                "multi epoch owned must be at least 38 bytes total".to_string(),
603            ))
604        } else {
605            let owner_id: OwnerId = data[1..33].try_into().map_err(|_| {
606                StorageFlagsError::StorageFlagsWrongSize(
607                    "multi epoch owned must be 38 bytes total for owner id".to_string(),
608                )
609            })?;
610            let base_epoch = u16::from_be_bytes(data[33..35].try_into().map_err(|_| {
611                StorageFlagsError::StorageFlagsWrongSize(
612                    "multi epoch must have enough bytes for the base epoch".to_string(),
613                )
614            })?);
615            let mut offset = 35;
616            let mut bytes_per_epoch: BTreeMap<u16, u32> = BTreeMap::default();
617            while offset + 2 < len {
618                // 2 for epoch size
619                let epoch_index =
620                    u16::from_be_bytes(data[offset..offset + 2].try_into().map_err(|_| {
621                        StorageFlagsError::StorageFlagsWrongSize(
622                            "multi epoch must have enough bytes epoch indexes".to_string(),
623                        )
624                    })?);
625                offset += 2;
626                let (bytes_at_epoch, bytes_used) = u32::decode_var(&data[offset..]).ok_or(
627                    StorageFlagsError::StorageFlagsWrongSize(
628                        "multi epoch must have enough bytes for the amount of bytes used"
629                            .to_string(),
630                    ),
631                )?;
632                offset += bytes_used;
633                bytes_per_epoch.insert(epoch_index, bytes_at_epoch);
634            }
635            Ok(MultiEpochOwned(base_epoch, bytes_per_epoch, owner_id))
636        }
637    }
638
639    /// Deserialize storage flags from bytes
640    pub fn deserialize(data: &[u8]) -> Result<Option<Self>, StorageFlagsError> {
641        let first_byte = data.first();
642        match first_byte {
643            None => Ok(None),
644            Some(first_byte) => match *first_byte {
645                0 => Ok(Some(Self::deserialize_single_epoch(data)?)),
646                1 => Ok(Some(Self::deserialize_multi_epoch(data)?)),
647                2 => Ok(Some(Self::deserialize_single_epoch_owned(data)?)),
648                3 => Ok(Some(Self::deserialize_multi_epoch_owned(data)?)),
649                _ => Err(StorageFlagsError::DeserializeUnknownStorageFlagsType(
650                    "unknown storage flags serialization".to_string(),
651                )),
652            },
653        }
654    }
655
    /// Creates storage flags from a slice.
    ///
    /// Thin alias for `deserialize`: an empty slice yields `Ok(None)`.
    pub fn from_slice(data: &[u8]) -> Result<Option<Self>, StorageFlagsError> {
        Self::deserialize(data)
    }
660
    /// Creates storage flags from element flags.
    ///
    /// Deserializes the raw element flag bytes via `from_slice`.
    pub fn from_element_flags_ref(data: &ElementFlags) -> Result<Option<Self>, StorageFlagsError> {
        Self::from_slice(data.as_slice())
    }
665
666    /// Create Storage flags from optional element flags ref
667    pub fn map_some_element_flags_ref(
668        data: &Option<ElementFlags>,
669    ) -> Result<Option<Self>, StorageFlagsError> {
670        match data {
671            None => Ok(None),
672            Some(data) => Self::from_slice(data.as_slice()),
673        }
674    }
675
676    /// Create Storage flags from optional element flags ref
677    pub fn map_cow_some_element_flags_ref(
678        data: &Option<ElementFlags>,
679    ) -> Result<Option<Cow<Self>>, StorageFlagsError> {
680        match data {
681            None => Ok(None),
682            Some(data) => Self::from_slice(data.as_slice()).map(|option| option.map(Cow::Owned)),
683        }
684    }
685
686    /// Map to owned optional element flags
687    pub fn map_owned_to_element_flags(maybe_storage_flags: Option<Self>) -> ElementFlags {
688        maybe_storage_flags
689            .map(|storage_flags| storage_flags.serialize())
690            .unwrap_or_default()
691    }
692
693    /// Map to optional element flags
694    pub fn map_to_some_element_flags(maybe_storage_flags: Option<&Self>) -> Option<ElementFlags> {
695        maybe_storage_flags.map(|storage_flags| storage_flags.serialize())
696    }
697
698    /// Map to optional element flags
699    pub fn map_cow_to_some_element_flags(
700        maybe_storage_flags: Option<Cow<Self>>,
701    ) -> Option<ElementFlags> {
702        maybe_storage_flags.map(|storage_flags| storage_flags.serialize())
703    }
704
705    /// Map to optional element flags
706    pub fn map_borrowed_cow_to_some_element_flags(
707        maybe_storage_flags: &Option<Cow<Self>>,
708    ) -> Option<ElementFlags> {
709        maybe_storage_flags
710            .as_ref()
711            .map(|storage_flags| storage_flags.serialize())
712    }
713
    /// Creates optional element flags
    ///
    /// Always `Some` of the serialized flags.
    pub fn to_some_element_flags(&self) -> Option<ElementFlags> {
        Some(self.serialize())
    }
718
    /// Creates element flags.
    ///
    /// Same bytes as `serialize`.
    pub fn to_element_flags(&self) -> ElementFlags {
        self.serialize()
    }
723
724    /// split_storage_removed_bytes removes bytes as LIFO
725    pub fn split_storage_removed_bytes(
726        &self,
727        removed_key_bytes: u32,
728        removed_value_bytes: u32,
729    ) -> (StorageRemovedBytes, StorageRemovedBytes) {
730        fn single_storage_removal(
731            removed_bytes: u32,
732            base_epoch: &BaseEpoch,
733            owner_id: Option<&OwnerId>,
734        ) -> StorageRemovedBytes {
735            if removed_bytes == 0 {
736                return NoStorageRemoval;
737            }
738            let bytes_left = removed_bytes;
739            let mut sectioned_storage_removal: IntMap<u16, u32> = IntMap::default();
740            if bytes_left > 0 {
741                // We need to take some from the base epoch
742                sectioned_storage_removal.insert(*base_epoch, removed_bytes);
743            }
744            let mut sectioned_storage_removal_by_identifier: StorageRemovalPerEpochByIdentifier =
745                BTreeMap::new();
746            if let Some(owner_id) = owner_id {
747                sectioned_storage_removal_by_identifier
748                    .insert(*owner_id, sectioned_storage_removal);
749            } else {
750                let default = [0u8; 32];
751                sectioned_storage_removal_by_identifier.insert(default, sectioned_storage_removal);
752            }
753            SectionedStorageRemoval(sectioned_storage_removal_by_identifier)
754        }
755
756        fn sectioned_storage_removal(
757            removed_bytes: u32,
758            base_epoch: &BaseEpoch,
759            other_epoch_bytes: &BTreeMap<EpochIndex, BytesAddedInEpoch>,
760            owner_id: Option<&OwnerId>,
761        ) -> StorageRemovedBytes {
762            if removed_bytes == 0 {
763                return NoStorageRemoval;
764            }
765            let mut bytes_left = removed_bytes;
766            let mut rev_iter = other_epoch_bytes.iter().rev();
767            let mut sectioned_storage_removal: IntMap<u16, u32> = IntMap::default();
768
769            while bytes_left > 0 {
770                if let Some((epoch_index, bytes_in_epoch)) = rev_iter.next() {
771                    if *bytes_in_epoch <= bytes_left + MINIMUM_NON_BASE_FLAGS_SIZE {
772                        sectioned_storage_removal
773                            .insert(*epoch_index, *bytes_in_epoch - MINIMUM_NON_BASE_FLAGS_SIZE);
774                        bytes_left -= *bytes_in_epoch - MINIMUM_NON_BASE_FLAGS_SIZE;
775                    } else {
776                        // Correctly take only the required bytes_left from this epoch
777                        sectioned_storage_removal.insert(*epoch_index, bytes_left);
778                        bytes_left = 0; // All required bytes have been removed, stop processing
779                        break; // Exit the loop as there's no need to process
780                               // further epochs
781                    }
782                } else {
783                    break;
784                }
785            }
786
787            if bytes_left > 0 {
788                // If there are still bytes left, take them from the base epoch
789                sectioned_storage_removal.insert(*base_epoch, bytes_left);
790            }
791
792            let mut sectioned_storage_removal_by_identifier: StorageRemovalPerEpochByIdentifier =
793                BTreeMap::new();
794
795            if let Some(owner_id) = owner_id {
796                sectioned_storage_removal_by_identifier
797                    .insert(*owner_id, sectioned_storage_removal);
798            } else {
799                let default = [0u8; 32];
800                sectioned_storage_removal_by_identifier.insert(default, sectioned_storage_removal);
801            }
802
803            SectionedStorageRemoval(sectioned_storage_removal_by_identifier)
804        }
805
806        // If key bytes are being removed, it implies a delete; thus, we should remove
807        // all relevant storage bytes
808        let key_storage_removal = if removed_key_bytes > 0 {
809            match self {
810                // For any variant, always take the key's removed bytes from the base epoch
811                SingleEpoch(base_epoch) | MultiEpoch(base_epoch, _) => {
812                    single_storage_removal(removed_key_bytes, base_epoch, None)
813                }
814                SingleEpochOwned(base_epoch, owner_id)
815                | MultiEpochOwned(base_epoch, _, owner_id) => {
816                    single_storage_removal(removed_key_bytes, base_epoch, Some(owner_id))
817                }
818            }
819        } else {
820            StorageRemovedBytes::default()
821        };
822
823        // For normal logic, we only need to process the value-related bytes.
824        let value_storage_removal = match self {
825            SingleEpoch(base_epoch) => {
826                single_storage_removal(removed_value_bytes, base_epoch, None)
827            }
828            SingleEpochOwned(base_epoch, owner_id) => {
829                single_storage_removal(removed_value_bytes, base_epoch, Some(owner_id))
830            }
831            MultiEpoch(base_epoch, other_epoch_bytes) => {
832                sectioned_storage_removal(removed_value_bytes, base_epoch, other_epoch_bytes, None)
833            }
834            MultiEpochOwned(base_epoch, other_epoch_bytes, owner_id) => sectioned_storage_removal(
835                removed_value_bytes,
836                base_epoch,
837                other_epoch_bytes,
838                Some(owner_id),
839            ),
840        };
841
        // Return both removals. The key removal is only non-empty when key bytes
        // were removed (a delete); a plain update does not modify the key.
844        (key_storage_removal, value_storage_removal)
845    }
846
847    /// Wrap Storage Flags into optional owned cow
848    pub fn into_optional_cow<'a>(self) -> Option<Cow<'a, Self>> {
849        Some(Cow::Owned(self))
850    }
851}
852
853#[cfg(test)]
854mod storage_flags_tests {
855    use std::collections::BTreeMap;
856
857    use grovedb_costs::storage_cost::removal::{
858        StorageRemovalPerEpochByIdentifier, StorageRemovedBytes,
859    };
860    use intmap::IntMap;
861
862    use crate::{
863        BaseEpoch, BytesAddedInEpoch, MergingOwnersStrategy, OwnerId, StorageFlags,
864        MINIMUM_NON_BASE_FLAGS_SIZE,
865    };
866    #[test]
867    fn test_storage_flags_combine() {
868        {
869            // Same SingleEpoch - AdditionBytes
870            let common_base_index: BaseEpoch = 1;
871            let left_flag = StorageFlags::new_single_epoch(common_base_index, None);
872            let right_flag = StorageFlags::new_single_epoch(common_base_index, None);
873
874            let added_bytes: BytesAddedInEpoch = 10;
875            let combined_flag = left_flag.clone().combine_added_bytes(
876                right_flag.clone(),
877                added_bytes,
878                MergingOwnersStrategy::UseOurs,
879            );
880            // println!(
881            //     "{:?} & {:?} added_bytes:{} --> {:?}\n",
882            //     left_flag, right_flag, added_bytes, combined_flag
883            // );
884        }
885        // {
886        // Same SingleEpoch - RemovedBytes
887        // let common_base_index: BaseEpoch = 1;
888        // let left_flag = StorageFlags::new_single_epoch(common_base_index, None);
889        // let right_flag = StorageFlags::new_single_epoch(common_base_index, None);
890        //
891        // let removed_bytes = StorageRemovedBytes::BasicStorageRemoval(10);
892        // let combined_flag =
893        // left_flag.clone().combine_removed_bytes(right_flag.clone(), &removed_bytes,
894        // MergingOwnersStrategy::UseOurs); println!("{:?} & {:?}
895        // removed_bytes:{:?} --> {:?}\n", left_flag, right_flag, removed_bytes,
896        // combined_flag); }
897        {
898            // Different-Higher SingleEpoch - AdditionBytes
899            let left_base_index: BaseEpoch = 1;
900            let right_base_index: BaseEpoch = 2;
901            let left_flag = StorageFlags::new_single_epoch(left_base_index, None);
902            let right_flag = StorageFlags::new_single_epoch(right_base_index, None);
903
904            let added_bytes: BytesAddedInEpoch = 10;
905            let combined_flag = left_flag.clone().combine_added_bytes(
906                right_flag.clone(),
907                added_bytes,
908                MergingOwnersStrategy::UseOurs,
909            );
910            // println!(
911            //     "{:?} & {:?} added_bytes:{} --> {:?}\n",
912            //     left_flag, right_flag, added_bytes, combined_flag
913            // );
914        }
915        {
916            // Different-Lesser SingleEpoch - AdditionBytes
917            let left_base_index: BaseEpoch = 2;
918            let right_base_index: BaseEpoch = 1;
919            let left_flag = StorageFlags::new_single_epoch(left_base_index, None);
920            let right_flag = StorageFlags::new_single_epoch(right_base_index, None);
921
922            let added_bytes: BytesAddedInEpoch = 10;
923            let combined_flag = left_flag.clone().combine_added_bytes(
924                right_flag.clone(),
925                added_bytes,
926                MergingOwnersStrategy::UseOurs,
927            );
928            // println!(
929            //     "{:?} & {:?} added_bytes:{} --> {:?}\n",
930            //     left_flag, right_flag, added_bytes, combined_flag
931            // );
932        }
933        {
934            // SingleEpoch-MultiEpoch same BaseEpoch - AdditionBytes
935            let common_base_index: BaseEpoch = 1;
936            let left_flag = StorageFlags::new_single_epoch(common_base_index, None);
937            let right_flag = StorageFlags::MultiEpoch(
938                common_base_index,
939                [(common_base_index + 1, 5)].iter().cloned().collect(),
940            );
941
942            let added_bytes: BytesAddedInEpoch = 10;
943            let combined_flag = left_flag.clone().combine_added_bytes(
944                right_flag.clone(),
945                added_bytes,
946                MergingOwnersStrategy::UseOurs,
947            );
948            // println!(
949            //     "{:?} & {:?} added_bytes:{} --> {:?}\n",
950            //     left_flag, right_flag, added_bytes, combined_flag
951            // );
952        }
953        {
954            // SingleEpoch-MultiEpoch higher BaseEpoch - AdditionBytes
955            let left_base_index: BaseEpoch = 1;
956            let right_base_index: BaseEpoch = 2;
957            let left_flag = StorageFlags::new_single_epoch(left_base_index, None);
958            let right_flag = StorageFlags::MultiEpoch(
959                right_base_index,
960                [(right_base_index + 1, 5)].iter().cloned().collect(),
961            );
962
963            let added_bytes: BytesAddedInEpoch = 10;
964            let combined_flag = left_flag.clone().combine_added_bytes(
965                right_flag.clone(),
966                added_bytes,
967                MergingOwnersStrategy::UseOurs,
968            );
969            // println!(
970            //     "{:?} & {:?} added_bytes:{} --> {:?}\n",
971            //     left_flag, right_flag, added_bytes, combined_flag
972            // );
973        }
974        // {
975        // SingleEpoch-MultiEpoch same BaseEpoch - RemovedBytes (positive difference)
976        // let common_base_index: BaseEpoch = 1;
977        // let left_flag = StorageFlags::new_single_epoch(common_base_index, None);
978        // let right_flag = StorageFlags::MultiEpoch(common_base_index,
979        // [(common_base_index + 1, 10)].iter().cloned().collect());
980        //
981        // let removed_bytes = StorageRemovedBytes::BasicStorageRemoval(3);
982        // let combined_flag =
983        // left_flag.clone().combine_removed_bytes(right_flag.clone(), &removed_bytes,
984        // MergingOwnersStrategy::UseOurs); println!("{:?} & {:?}
985        // removed_bytes:{:?} --> {:?}\n", left_flag, right_flag, &removed_bytes,
986        // combined_flag); }
987        // {
988        // SingleEpoch-MultiEpoch same BaseEpoch - RemovedBytes (negative difference)
989        // let common_base_index: BaseEpoch = 1;
990        // let left_flag = StorageFlags::new_single_epoch(common_base_index, None);
991        // let right_flag = StorageFlags::MultiEpoch(common_base_index,
992        // [(common_base_index + 1, 10)].iter().cloned().collect());
993        //
994        // let removed_bytes = StorageRemovedBytes::BasicStorageRemoval(13);
995        // let combined_flag =
996        // left_flag.clone().combine_removed_bytes(right_flag.clone(), &removed_bytes,
997        // MergingOwnersStrategy::UseOurs); println!("{:?} & {:?}
998        // removed_bytes:{:?} --> {:?}\n", left_flag, right_flag, &removed_bytes,
999        // combined_flag); }
1000        // {
1001        // SingleEpoch-MultiEpoch higher BaseEpoch - RemovedBytes (positive difference)
1002        // let left_base_index: BaseEpoch = 1;
1003        // let right_base_index: BaseEpoch = 2;
1004        // let left_flag = StorageFlags::new_single_epoch(left_base_index, None);
1005        // let right_flag = StorageFlags::MultiEpoch(right_base_index,
1006        // [(right_base_index + 1, 10)].iter().cloned().collect());
1007        //
1008        // let removed_bytes = StorageRemovedBytes::BasicStorageRemoval(3);
1009        // let combined_flag =
1010        // left_flag.clone().combine_removed_bytes(right_flag.clone(), &removed_bytes,
1011        // MergingOwnersStrategy::UseOurs); println!("{:?} & {:?}
1012        // removed_bytes:{:?} --> {:?}\n", left_flag, right_flag, &removed_bytes,
1013        // combined_flag); }
1014        // {
1015        // SingleEpoch-MultiEpoch higher BaseEpoch - RemovedBytes (negative difference)
1016        // let left_base_index: BaseEpoch = 1;
1017        // let right_base_index: BaseEpoch = 2;
1018        // let left_flag = StorageFlags::new_single_epoch(left_base_index, None);
1019        // let right_flag = StorageFlags::MultiEpoch(right_base_index,
1020        // [(right_base_index + 1, 5)].iter().cloned().collect());
1021        //
1022        // let removed_bytes = StorageRemovedBytes::BasicStorageRemoval(7);
1023        // let combined_flag =
1024        // left_flag.clone().combine_removed_bytes(right_flag.clone(), &removed_bytes,
1025        // MergingOwnersStrategy::UseOurs); println!("{:?} & {:?}
1026        // removed_bytes:{:?} --> {:?}\n", left_flag, right_flag, &removed_bytes,
1027        // combined_flag); }
1028        {
1029            // MultiEpochs same BaseEpoch - AdditionBytes #1
1030            let common_base_index: BaseEpoch = 1;
1031            let left_flag = StorageFlags::MultiEpoch(
1032                common_base_index,
1033                [(common_base_index + 1, 7)].iter().cloned().collect(),
1034            );
1035            let right_flag = StorageFlags::MultiEpoch(
1036                common_base_index,
1037                [(common_base_index + 1, 5)].iter().cloned().collect(),
1038            );
1039
1040            let added_bytes: BytesAddedInEpoch = 10;
1041            let combined_flag = left_flag.clone().combine_added_bytes(
1042                right_flag.clone(),
1043                added_bytes,
1044                MergingOwnersStrategy::UseOurs,
1045            );
1046            println!(
1047                "{:?} & {:?} added_bytes:{} --> {:?}\n",
1048                left_flag, right_flag, added_bytes, combined_flag
1049            );
1050        }
1051        {
1052            // MultiEpochs same BaseEpoch - AdditionBytes #2
1053            let common_base_index: BaseEpoch = 1;
1054            let left_flag = StorageFlags::MultiEpoch(
1055                common_base_index,
1056                [(common_base_index + 1, 7)].iter().cloned().collect(),
1057            );
1058            let right_flag = StorageFlags::MultiEpoch(
1059                common_base_index,
1060                [(common_base_index + 2, 5)].iter().cloned().collect(),
1061            );
1062
1063            let added_bytes: BytesAddedInEpoch = 10;
1064            let combined_flag = left_flag.clone().combine_added_bytes(
1065                right_flag.clone(),
1066                added_bytes,
1067                MergingOwnersStrategy::UseOurs,
1068            );
1069            println!(
1070                "{:?} & {:?} added_bytes:{} --> {:?}\n",
1071                left_flag, right_flag, added_bytes, combined_flag
1072            );
1073        }
1074        {
1075            // MultiEpochs same BaseEpoch - AdditionBytes #3
1076            let common_base_index: BaseEpoch = 1;
1077            let left_flag = StorageFlags::MultiEpoch(
1078                common_base_index,
1079                [(common_base_index + 1, 7)].iter().cloned().collect(),
1080            );
1081            let right_flag = StorageFlags::MultiEpoch(
1082                common_base_index,
1083                [(common_base_index + 1, 3), (common_base_index + 2, 5)]
1084                    .iter()
1085                    .cloned()
1086                    .collect(),
1087            );
1088
1089            let added_bytes: BytesAddedInEpoch = 10;
1090            let combined_flag = left_flag.clone().combine_added_bytes(
1091                right_flag.clone(),
1092                added_bytes,
1093                MergingOwnersStrategy::UseOurs,
1094            );
1095            println!(
1096                "{:?} & {:?} added_bytes:{} --> {:?}\n",
1097                left_flag, right_flag, added_bytes, combined_flag
1098            );
1099        }
1100        {
1101            // MultiEpochs higher BaseEpoch - AdditionBytes #1
1102            let common_base_index: BaseEpoch = 1;
1103            let left_flag = StorageFlags::MultiEpoch(
1104                common_base_index,
1105                [(common_base_index + 1, 7)].iter().cloned().collect(),
1106            );
1107            let right_flag = StorageFlags::MultiEpoch(
1108                common_base_index + 1,
1109                [(common_base_index + 1, 5)].iter().cloned().collect(),
1110            );
1111
1112            let added_bytes: BytesAddedInEpoch = 10;
1113            let combined_flag = left_flag.clone().combine_added_bytes(
1114                right_flag.clone(),
1115                added_bytes,
1116                MergingOwnersStrategy::UseOurs,
1117            );
1118            println!(
1119                "{:?} & {:?} added_bytes:{} --> {:?}\n",
1120                left_flag, right_flag, added_bytes, combined_flag
1121            );
1122        }
1123        {
1124            // MultiEpochs higher BaseEpoch - AdditionBytes #2
1125            let common_base_index: BaseEpoch = 1;
1126            let left_flag = StorageFlags::MultiEpoch(
1127                common_base_index,
1128                [(common_base_index + 1, 7)].iter().cloned().collect(),
1129            );
1130            let right_flag = StorageFlags::MultiEpoch(
1131                common_base_index + 1,
1132                [(common_base_index + 2, 5)].iter().cloned().collect(),
1133            );
1134
1135            let added_bytes: BytesAddedInEpoch = 10;
1136            let combined_flag = left_flag.clone().combine_added_bytes(
1137                right_flag.clone(),
1138                added_bytes,
1139                MergingOwnersStrategy::UseOurs,
1140            );
1141            println!(
1142                "{:?} & {:?} added_bytes:{} --> {:?}\n",
1143                left_flag, right_flag, added_bytes, combined_flag
1144            );
1145        }
1146        {
1147            // MultiEpochs higher BaseEpoch - AdditionBytes #3
1148            let common_base_index: BaseEpoch = 1;
1149            let left_flag = StorageFlags::MultiEpoch(
1150                common_base_index,
1151                [(common_base_index + 1, 7)].iter().cloned().collect(),
1152            );
1153            let right_flag = StorageFlags::MultiEpoch(
1154                common_base_index + 1,
1155                [(common_base_index + 2, 3), (common_base_index + 3, 5)]
1156                    .iter()
1157                    .cloned()
1158                    .collect(),
1159            );
1160
1161            let added_bytes: BytesAddedInEpoch = 10;
1162            let combined_flag = left_flag.clone().combine_added_bytes(
1163                right_flag.clone(),
1164                added_bytes,
1165                MergingOwnersStrategy::UseOurs,
1166            );
1167            println!(
1168                "{:?} & {:?} added_bytes:{} --> {:?}\n",
1169                left_flag, right_flag, added_bytes, combined_flag
1170            );
1171        }
1172    }
1173
1174    fn create_epoch_map(epoch: u16, bytes: u32) -> BTreeMap<u16, u32> {
1175        let mut map = BTreeMap::new();
1176        map.insert(epoch, bytes);
1177        map
1178    }
1179
1180    fn default_owner_id() -> OwnerId {
1181        [0u8; 32]
1182    }
1183
1184    fn single_epoch_removed_bytes_map(
1185        owner_id: [u8; 32],
1186        epoch_index: u16,
1187        bytes_removed: u32,
1188    ) -> StorageRemovalPerEpochByIdentifier {
1189        let mut removed_bytes = StorageRemovalPerEpochByIdentifier::default();
1190        let mut removed_bytes_for_identity = IntMap::new();
1191        removed_bytes_for_identity.insert(epoch_index, bytes_removed);
1192        removed_bytes.insert(owner_id, removed_bytes_for_identity);
1193        removed_bytes
1194    }
1195
1196    fn multi_epoch_removed_bytes_map(
1197        owner_id: [u8; 32],
1198        removed_bytes_per_epoch: IntMap<u16, u32>,
1199    ) -> StorageRemovalPerEpochByIdentifier {
1200        let mut removed_bytes = StorageRemovalPerEpochByIdentifier::default();
1201        removed_bytes.insert(owner_id, removed_bytes_per_epoch);
1202        removed_bytes
1203    }
1204
1205    #[test]
1206    fn test_combine_that_would_remove_the_epoch_completely() {
1207        let owner_id = [2; 32];
1208        let left_base_index: BaseEpoch = 0;
1209        let right_base_index: BaseEpoch = 1;
1210        let other_epochs = create_epoch_map(1, 5);
1211        let left_flag = StorageFlags::MultiEpochOwned(left_base_index, other_epochs, owner_id);
1212        let right_flag = StorageFlags::new_single_epoch(right_base_index, Some(owner_id));
1213
1214        let removed_bytes = single_epoch_removed_bytes_map(owner_id, 1, 3);
1215        let combined_flag = left_flag
1216            .clone()
1217            .combine_removed_bytes(
1218                right_flag.clone(),
1219                &StorageRemovedBytes::SectionedStorageRemoval(removed_bytes.clone()),
1220                MergingOwnersStrategy::UseOurs,
1221            )
1222            .expect("expected to combine flags");
1223
1224        assert_eq!(
1225            combined_flag,
1226            StorageFlags::SingleEpochOwned(left_base_index, owner_id)
1227        );
1228        println!(
1229            "{:?} & {:?} removed:{:?} --> {:?}\n",
1230            left_flag, right_flag, removed_bytes, combined_flag
1231        );
1232    }
1233
1234    #[test]
1235    fn test_combine_that_would_remove_the_epoch_completely_with_many_entries() {
1236        let owner_id = [2; 32];
1237        let left_base_index: BaseEpoch = 0;
1238        let right_base_index: BaseEpoch = 1;
1239        let mut other_epochs = BTreeMap::new();
1240        let mut removed_bytes = IntMap::new();
1241        for i in 1..200 {
1242            other_epochs.insert(i, MINIMUM_NON_BASE_FLAGS_SIZE + 1);
1243            removed_bytes.insert(i, 1); // anything between 1 and
1244                                        // MINIMUM_NON_BASE_FLAGS_SIZE +
1245                                        // 1 would be the same
1246        }
1247
1248        let left_flag = StorageFlags::MultiEpochOwned(left_base_index, other_epochs, owner_id);
1249        let right_flag = StorageFlags::new_single_epoch(right_base_index, Some(owner_id));
1250
1251        let removed_bytes = multi_epoch_removed_bytes_map(owner_id, removed_bytes);
1252        let combined_flag = left_flag
1253            .clone()
1254            .combine_removed_bytes(
1255                right_flag.clone(),
1256                &StorageRemovedBytes::SectionedStorageRemoval(removed_bytes.clone()),
1257                MergingOwnersStrategy::UseOurs,
1258            )
1259            .expect("expected to combine flags");
1260
1261        assert_eq!(
1262            combined_flag,
1263            StorageFlags::SingleEpochOwned(left_base_index, owner_id)
1264        );
1265        println!(
1266            "{:?} & {:?} removed:{:?} --> {:?}\n",
1267            left_flag, right_flag, removed_bytes, combined_flag
1268        );
1269    }
1270
1271    /// Tests the case when using SingleEpoch flags, ensuring that the correct
1272    /// storage removal is calculated.
1273    #[test]
1274    fn test_single_epoch_removal() {
1275        let flags = StorageFlags::SingleEpoch(5);
1276        let (key_removal, value_removal) = flags.split_storage_removed_bytes(100, 200);
1277
1278        assert_eq!(
1279            key_removal,
1280            StorageRemovedBytes::SectionedStorageRemoval({
1281                let mut map = BTreeMap::new();
1282                map.insert(default_owner_id(), IntMap::from_iter([(5u16, 100)]));
1283                map
1284            })
1285        );
1286        assert_eq!(
1287            value_removal,
1288            StorageRemovedBytes::SectionedStorageRemoval({
1289                let mut map = BTreeMap::new();
1290                map.insert(default_owner_id(), IntMap::from_iter([(5u16, 200)]));
1291                map
1292            })
1293        );
1294    }
1295
1296    /// Tests SingleEpochOwned flags where the removal is done under an OwnerId
1297    #[test]
1298    fn test_single_epoch_owned_removal() {
1299        let owner_id = [1u8; 32];
1300        let flags = StorageFlags::SingleEpochOwned(5, owner_id);
1301        let (key_removal, value_removal) = flags.split_storage_removed_bytes(50, 150);
1302
1303        assert_eq!(
1304            key_removal,
1305            StorageRemovedBytes::SectionedStorageRemoval({
1306                let mut map = BTreeMap::new();
1307                map.insert(owner_id, IntMap::from_iter([(5u16, 50)]));
1308                map
1309            })
1310        );
1311        assert_eq!(
1312            value_removal,
1313            StorageRemovedBytes::SectionedStorageRemoval({
1314                let mut map = BTreeMap::new();
1315                map.insert(owner_id, IntMap::from_iter([(5u16, 150)]));
1316                map
1317            })
1318        );
1319    }
1320
1321    /// Tests the case where multiple epochs are used and the total removal
1322    /// doesn’t exceed the extra epoch bytes
1323    #[test]
1324    fn test_multi_epoch_removal_no_remaining_base() {
1325        let mut other_epochs = create_epoch_map(6, 100);
1326        other_epochs.insert(7, 200);
1327
1328        let flags = StorageFlags::MultiEpoch(5, other_epochs);
1329        let (_key_removal, value_removal) = flags.split_storage_removed_bytes(0, 250);
1330
1331        assert_eq!(
1332            value_removal,
1333            StorageRemovedBytes::SectionedStorageRemoval({
1334                let mut map = BTreeMap::new();
1335                map.insert(
1336                    default_owner_id(),
1337                    IntMap::from_iter([(7u16, 197), (6u16, 53)]),
1338                );
1339                map
1340            })
1341        );
1342    }
1343
1344    /// Similar to the previous test, but this time the base epoch is also used
1345    /// due to insufficient bytes in the extra epochs
1346    #[test]
1347    fn test_multi_epoch_removal_with_remaining_base() {
1348        let mut other_epochs = create_epoch_map(6, 100);
1349        other_epochs.insert(7, 50);
1350
1351        let flags = StorageFlags::MultiEpoch(5, other_epochs);
1352        let (key_removal, value_removal) = flags.split_storage_removed_bytes(250, 250);
1353
1354        assert_eq!(
1355            key_removal,
1356            StorageRemovedBytes::SectionedStorageRemoval({
1357                let mut map = BTreeMap::new();
1358                map.insert(default_owner_id(), IntMap::from_iter([(5u16, 250)]));
1359                map
1360            })
1361        );
1362        assert_eq!(
1363            value_removal,
1364            StorageRemovedBytes::SectionedStorageRemoval({
1365                let mut map = BTreeMap::new();
1366                map.insert(
1367                    default_owner_id(),
1368                    IntMap::from_iter([(7u16, 47), (6u16, 97), (5u16, 106)]),
1369                );
1370                map
1371            })
1372        );
1373    }
1374
1375    /// Same as last test but for owned flags with OwnerId
1376    #[test]
1377    fn test_multi_epoch_owned_removal_with_remaining_base() {
1378        let owner_id = [2u8; 32];
1379        let mut other_epochs = create_epoch_map(6, 100);
1380        other_epochs.insert(7, 50);
1381
1382        let flags = StorageFlags::MultiEpochOwned(5, other_epochs, owner_id);
1383        let (key_removal, value_removal) = flags.split_storage_removed_bytes(250, 250);
1384
1385        assert_eq!(
1386            key_removal,
1387            StorageRemovedBytes::SectionedStorageRemoval({
1388                let mut map = BTreeMap::new();
1389                map.insert(owner_id, IntMap::from_iter([(5u16, 250)]));
1390                map
1391            })
1392        );
1393        assert_eq!(
1394            value_removal,
1395            StorageRemovedBytes::SectionedStorageRemoval({
1396                let mut map = BTreeMap::new();
1397                map.insert(
1398                    owner_id,
1399                    IntMap::from_iter([(7u16, 47), (6u16, 97), (5u16, 106)]),
1400                );
1401                map
1402            })
1403        );
1404    }
1405
1406    /// Tests the function when zero bytes are to be removed, expecting an empty
1407    /// removal result
1408    #[test]
1409    fn test_single_epoch_removal_zero_bytes() {
1410        let flags = StorageFlags::SingleEpoch(5);
1411        let (key_removal, value_removal) = flags.split_storage_removed_bytes(0, 0);
1412
1413        assert_eq!(key_removal, StorageRemovedBytes::NoStorageRemoval);
1414        assert_eq!(value_removal, StorageRemovedBytes::NoStorageRemoval);
1415    }
1416
1417    /// Tests the removal of only part of the bytes using SingleEpochOwned
1418    #[test]
1419    fn test_single_epoch_owned_removal_partial_bytes() {
1420        let owner_id = [3u8; 32];
1421        let flags = StorageFlags::SingleEpochOwned(5, owner_id);
1422        let (key_removal, value_removal) = flags.split_storage_removed_bytes(100, 50);
1423
1424        assert_eq!(
1425            key_removal,
1426            StorageRemovedBytes::SectionedStorageRemoval({
1427                let mut map = BTreeMap::new();
1428                map.insert(owner_id, IntMap::from_iter([(5u16, 100)]));
1429                map
1430            })
1431        );
1432        assert_eq!(
1433            value_removal,
1434            StorageRemovedBytes::SectionedStorageRemoval({
1435                let mut map = BTreeMap::new();
1436                map.insert(owner_id, IntMap::from_iter([(5u16, 50)]));
1437                map
1438            })
1439        );
1440    }
1441
1442    /// Ensures that the function correctly handles when there are more bytes to
1443    /// be removed than are available in the epoch map, requiring the base epoch
1444    /// to be used
1445    #[test]
1446    fn test_multi_epoch_removal_excess_bytes() {
1447        let mut other_epochs = create_epoch_map(6, 100);
1448        other_epochs.insert(7, 200);
1449
1450        let flags = StorageFlags::MultiEpoch(5, other_epochs);
1451        let (key_removal, value_removal) = flags.split_storage_removed_bytes(400, 300);
1452
1453        assert_eq!(
1454            key_removal,
1455            StorageRemovedBytes::SectionedStorageRemoval({
1456                let mut map = BTreeMap::new();
1457                map.insert(default_owner_id(), IntMap::from_iter([(5u16, 400)]));
1458                map
1459            })
1460        );
1461        assert_eq!(
1462            value_removal,
1463            StorageRemovedBytes::SectionedStorageRemoval({
1464                let mut map = BTreeMap::new();
1465                map.insert(
1466                    default_owner_id(),
1467                    IntMap::from_iter([(7u16, 197), (6u16, 97), (5u16, 6)]),
1468                );
1469                map
1470            })
1471        );
1472    }
1473
1474    /// Similar to the previous test, but for owned flags with OwnerId
1475    #[test]
1476    fn test_multi_epoch_owned_removal_excess_bytes() {
1477        let owner_id = [4u8; 32];
1478        let mut other_epochs = create_epoch_map(6, 100);
1479        other_epochs.insert(7, 200);
1480
1481        let flags = StorageFlags::MultiEpochOwned(5, other_epochs, owner_id);
1482        let (key_removal, value_removal) = flags.split_storage_removed_bytes(450, 350);
1483
1484        assert_eq!(
1485            key_removal,
1486            StorageRemovedBytes::SectionedStorageRemoval({
1487                let mut map = BTreeMap::new();
1488                map.insert(owner_id, IntMap::from_iter([(5u16, 450)]));
1489                map
1490            })
1491        );
1492        assert_eq!(
1493            value_removal,
1494            StorageRemovedBytes::SectionedStorageRemoval({
1495                let mut map = BTreeMap::new();
1496                map.insert(
1497                    owner_id,
1498                    IntMap::from_iter([(7u16, 197), (6u16, 97), (5u16, 56)]),
1499                );
1500                map
1501            })
1502        );
1503    }
1504
1505    #[test]
1506    /// This test verifies the `split_storage_removed_bytes` function when all
1507    /// required bytes are taken from non-base epochs during the removal
1508    /// process.
1509    ///
1510    /// The scenario:
1511    /// - The test initializes a `StorageFlags::MultiEpochOwned` with a
1512    ///   `BaseEpoch` of 5.
1513    /// - Two additional epochs, 6 and 7, are provided with 300 and 400 bytes
1514    ///   respectively.
1515    /// - The function is then called to remove 700 bytes from the value, while
1516    ///   no bytes are removed from the key.
1517    ///
1518    /// The expected behavior:
1519    /// - For key removal: No bytes should be removed since the key removal
1520    ///   request is zero.
1521    /// - For value removal: It should consume all 400 bytes from epoch 7 (LIFO
1522    ///   order) and the remaining 300 bytes from epoch 6.
1523    fn test_multi_epoch_owned_removal_all_bytes_taken_from_non_base_epoch() {
1524        // Define the owner ID as a 32-byte array filled with 5s.
1525        let owner_id = [5u8; 32];
1526
1527        // Create a map for additional epochs with 300 bytes in epoch 6.
1528        let mut other_epochs = create_epoch_map(6, 300);
1529
1530        // Insert 400 bytes for epoch 7 into the map.
1531        other_epochs.insert(7, 400);
1532
1533        // Initialize the `StorageFlags::MultiEpochOwned` with base epoch 5, additional
1534        // epochs, and the owner ID.
1535        let flags = StorageFlags::MultiEpochOwned(5, other_epochs, owner_id);
1536
1537        // Call the function to split the storage removal bytes, expecting to remove 700
1538        // bytes from the value.
1539        let (key_removal, value_removal) = flags.split_storage_removed_bytes(0, 700);
1540
1541        // Verify that no bytes are removed from the key.
1542        assert_eq!(key_removal, StorageRemovedBytes::NoStorageRemoval);
1543
1544        // Verify that 700 bytes are removed from the value, consuming 400 bytes from
1545        // epoch 7 and 300 bytes from epoch 6.
1546        assert_eq!(
1547            value_removal,
1548            StorageRemovedBytes::SectionedStorageRemoval({
1549                let mut map = BTreeMap::new();
1550                map.insert(
1551                    owner_id,
1552                    IntMap::from_iter([(5u16, 6), (6u16, 297), (7u16, 397)]),
1553                );
1554                map
1555            })
1556        );
1557    }
1558
1559    #[test]
1560    fn test_multi_epoch_removal_remaining_base_epoch() {
1561        let mut other_epochs = create_epoch_map(6, 300);
1562        other_epochs.insert(7, 100);
1563
1564        let flags = StorageFlags::MultiEpoch(5, other_epochs);
1565        let (key_removal, value_removal) = flags.split_storage_removed_bytes(400, 500);
1566
1567        assert_eq!(
1568            key_removal,
1569            StorageRemovedBytes::SectionedStorageRemoval({
1570                let mut map = BTreeMap::new();
1571                map.insert(default_owner_id(), IntMap::from_iter([(5u16, 400)]));
1572                map
1573            })
1574        );
1575        assert_eq!(
1576            value_removal,
1577            StorageRemovedBytes::SectionedStorageRemoval({
1578                let mut map = BTreeMap::new();
1579                map.insert(
1580                    default_owner_id(),
1581                    IntMap::from_iter([(7u16, 97), (6u16, 297), (5u16, 106)]),
1582                );
1583                map
1584            })
1585        );
1586    }
1587}