hypercore/
core.rs

1//! Hypercore's main abstraction. Exposes an append-only, secure log structure.
2use ed25519_dalek::Signature;
3use futures::future::Either;
4use std::convert::TryFrom;
5use std::fmt::Debug;
6use tracing::instrument;
7
8#[cfg(feature = "cache")]
9use crate::common::cache::CacheOptions;
10use crate::{
11    bitfield::Bitfield,
12    common::{BitfieldUpdate, HypercoreError, NodeByteRange, Proof, StoreInfo, ValuelessProof},
13    crypto::{generate_signing_key, PartialKeypair},
14    data::BlockStore,
15    oplog::{Header, Oplog, MAX_OPLOG_ENTRIES_BYTE_SIZE},
16    storage::Storage,
17    tree::{MerkleTree, MerkleTreeChangeset},
18    RequestBlock, RequestSeek, RequestUpgrade,
19};
20
21#[derive(Debug)]
22pub(crate) struct HypercoreOptions {
23    pub(crate) key_pair: Option<PartialKeypair>,
24    pub(crate) open: bool,
25    #[cfg(feature = "cache")]
26    pub(crate) node_cache_options: Option<CacheOptions>,
27}
28
29impl HypercoreOptions {
30    pub(crate) fn new() -> Self {
31        Self {
32            key_pair: None,
33            open: false,
34            #[cfg(feature = "cache")]
35            node_cache_options: None,
36        }
37    }
38}
39
40/// Hypercore is an append-only log structure.
41#[derive(Debug)]
42pub struct Hypercore {
43    pub(crate) key_pair: PartialKeypair,
44    pub(crate) storage: Storage,
45    pub(crate) oplog: Oplog,
46    pub(crate) tree: MerkleTree,
47    pub(crate) block_store: BlockStore,
48    pub(crate) bitfield: Bitfield,
49    skip_flush_count: u8, // autoFlush in Javascript
50    header: Header,
51    #[cfg(feature = "replication")]
52    events: crate::replication::events::Events,
53}
54
/// Result of an append; mirrors the value the Javascript implementation returns.
#[derive(Debug, PartialEq)]
pub struct AppendOutcome {
    /// Number of entries in the hypercore once the append is done
    pub length: u64,
    /// Total number of bytes in the hypercore once the append is done
    pub byte_length: u64,
}
63
/// Basic information about a hypercore.
#[derive(Debug, PartialEq)]
pub struct Info {
    /// Number of entries in the hypercore
    pub length: u64,
    /// Total number of bytes in the hypercore
    pub byte_length: u64,
    /// Number of entries, counted from index 0, that have data without
    /// any gap in between
    pub contiguous_length: u64,
    /// Index of the fork; 0 when the hypercore has not been forked.
    pub fork: u64,
    /// `true` when the hypercore can be written to, `false` when it is read-only
    pub writeable: bool,
}
79
80impl Hypercore {
81    /// Creates/opens new hypercore using given storage and options
82    pub(crate) async fn new(
83        mut storage: Storage,
84        mut options: HypercoreOptions,
85    ) -> Result<Hypercore, HypercoreError> {
86        let key_pair: Option<PartialKeypair> = if options.open {
87            if options.key_pair.is_some() {
88                return Err(HypercoreError::BadArgument {
89                    context: "Key pair can not be used when building an openable hypercore"
90                        .to_string(),
91                });
92            }
93            None
94        } else {
95            Some(options.key_pair.take().unwrap_or_else(|| {
96                let signing_key = generate_signing_key();
97                PartialKeypair {
98                    public: signing_key.verifying_key(),
99                    secret: Some(signing_key),
100                }
101            }))
102        };
103
104        // Open/create oplog
105        let mut oplog_open_outcome = match Oplog::open(&key_pair, None)? {
106            Either::Right(value) => value,
107            Either::Left(instruction) => {
108                let info = storage.read_info(instruction).await?;
109                match Oplog::open(&key_pair, Some(info))? {
110                    Either::Right(value) => value,
111                    Either::Left(_) => {
112                        return Err(HypercoreError::InvalidOperation {
113                            context: "Could not open oplog".to_string(),
114                        });
115                    }
116                }
117            }
118        };
119        storage
120            .flush_infos(&oplog_open_outcome.infos_to_flush)
121            .await?;
122
123        // Open/create tree
124        let mut tree = match MerkleTree::open(
125            &oplog_open_outcome.header.tree,
126            None,
127            #[cfg(feature = "cache")]
128            &options.node_cache_options,
129        )? {
130            Either::Right(value) => value,
131            Either::Left(instructions) => {
132                let infos = storage.read_infos(&instructions).await?;
133                match MerkleTree::open(
134                    &oplog_open_outcome.header.tree,
135                    Some(&infos),
136                    #[cfg(feature = "cache")]
137                    &options.node_cache_options,
138                )? {
139                    Either::Right(value) => value,
140                    Either::Left(_) => {
141                        return Err(HypercoreError::InvalidOperation {
142                            context: "Could not open tree".to_string(),
143                        });
144                    }
145                }
146            }
147        };
148
149        // Create block store instance
150        let block_store = BlockStore::default();
151
152        // Open bitfield
153        let mut bitfield = match Bitfield::open(None) {
154            Either::Right(value) => value,
155            Either::Left(instruction) => {
156                let info = storage.read_info(instruction).await?;
157                match Bitfield::open(Some(info)) {
158                    Either::Right(value) => value,
159                    Either::Left(instruction) => {
160                        let info = storage.read_info(instruction).await?;
161                        match Bitfield::open(Some(info)) {
162                            Either::Right(value) => value,
163                            Either::Left(_) => {
164                                return Err(HypercoreError::InvalidOperation {
165                                    context: "Could not open bitfield".to_string(),
166                                });
167                            }
168                        }
169                    }
170                }
171            }
172        };
173
174        // Process entries stored only to the oplog and not yet flushed into bitfield or tree
175        if let Some(entries) = oplog_open_outcome.entries {
176            for entry in entries.iter() {
177                for node in &entry.tree_nodes {
178                    tree.add_node(node.clone());
179                }
180
181                if let Some(bitfield_update) = &entry.bitfield {
182                    bitfield.update(bitfield_update);
183                    update_contiguous_length(
184                        &mut oplog_open_outcome.header,
185                        &bitfield,
186                        bitfield_update,
187                    );
188                }
189                if let Some(tree_upgrade) = &entry.tree_upgrade {
190                    // TODO: Generalize Either response stack
191                    let mut changeset =
192                        match tree.truncate(tree_upgrade.length, tree_upgrade.fork, None)? {
193                            Either::Right(value) => value,
194                            Either::Left(instructions) => {
195                                let infos = storage.read_infos(&instructions).await?;
196                                match tree.truncate(
197                                    tree_upgrade.length,
198                                    tree_upgrade.fork,
199                                    Some(&infos),
200                                )? {
201                                    Either::Right(value) => value,
202                                    Either::Left(_) => {
203                                        return Err(HypercoreError::InvalidOperation {
204                                            context: format!(
205                                                "Could not truncate tree to length {}",
206                                                tree_upgrade.length
207                                            ),
208                                        });
209                                    }
210                                }
211                            }
212                        };
213                    changeset.ancestors = tree_upgrade.ancestors;
214                    changeset.hash = Some(changeset.hash());
215                    changeset.signature =
216                        Some(Signature::try_from(&*tree_upgrade.signature).map_err(|_| {
217                            HypercoreError::InvalidSignature {
218                                context: "Could not parse changeset signature".to_string(),
219                            }
220                        })?);
221
222                    // Update the header with this changeset to make in-memory value match that
223                    // of the stored value.
224                    oplog_open_outcome.oplog.update_header_with_changeset(
225                        &changeset,
226                        None,
227                        &mut oplog_open_outcome.header,
228                    )?;
229
230                    // TODO: Skip reorg hints for now, seems to only have to do with replication
231                    // addReorgHint(header.hints.reorgs, tree, batch)
232
233                    // Commit changeset to in-memory tree
234                    tree.commit(changeset)?;
235                }
236            }
237        }
238
239        let oplog = oplog_open_outcome.oplog;
240        let header = oplog_open_outcome.header;
241        let key_pair = header.key_pair.clone();
242
243        Ok(Hypercore {
244            key_pair,
245            storage,
246            oplog,
247            tree,
248            block_store,
249            bitfield,
250            header,
251            skip_flush_count: 0,
252            #[cfg(feature = "replication")]
253            events: crate::replication::events::Events::new(),
254        })
255    }
256
257    /// Gets basic info about the Hypercore
258    pub fn info(&self) -> Info {
259        Info {
260            length: self.tree.length,
261            byte_length: self.tree.byte_length,
262            contiguous_length: self.header.hints.contiguous_length,
263            fork: self.tree.fork,
264            writeable: self.key_pair.secret.is_some(),
265        }
266    }
267
268    /// Appends a data slice to the hypercore.
269    #[instrument(err, skip_all, fields(data_len = data.len()))]
270    pub async fn append(&mut self, data: &[u8]) -> Result<AppendOutcome, HypercoreError> {
271        self.append_batch(&[data]).await
272    }
273
274    /// Appends a given batch of data slices to the hypercore.
275    #[instrument(err, skip_all, fields(batch_len = batch.as_ref().len()))]
276    pub async fn append_batch<A: AsRef<[u8]>, B: AsRef<[A]>>(
277        &mut self,
278        batch: B,
279    ) -> Result<AppendOutcome, HypercoreError> {
280        let secret_key = match &self.key_pair.secret {
281            Some(key) => key,
282            None => return Err(HypercoreError::NotWritable),
283        };
284
285        if !batch.as_ref().is_empty() {
286            // Create a changeset for the tree
287            let mut changeset = self.tree.changeset();
288            let mut batch_length: usize = 0;
289            for data in batch.as_ref().iter() {
290                batch_length += changeset.append(data.as_ref());
291            }
292            changeset.hash_and_sign(secret_key);
293
294            // Write the received data to the block store
295            let info =
296                self.block_store
297                    .append_batch(batch.as_ref(), batch_length, self.tree.byte_length);
298            self.storage.flush_info(info).await?;
299
300            // Append the changeset to the Oplog
301            let bitfield_update = BitfieldUpdate {
302                drop: false,
303                start: changeset.ancestors,
304                length: changeset.batch_length,
305            };
306            let outcome = self.oplog.append_changeset(
307                &changeset,
308                Some(bitfield_update.clone()),
309                false,
310                &self.header,
311            )?;
312            self.storage.flush_infos(&outcome.infos_to_flush).await?;
313            self.header = outcome.header;
314
315            // Write to bitfield
316            self.bitfield.update(&bitfield_update);
317
318            // Contiguous length is known only now
319            update_contiguous_length(&mut self.header, &self.bitfield, &bitfield_update);
320
321            // Commit changeset to in-memory tree
322            self.tree.commit(changeset)?;
323
324            // Now ready to flush
325            if self.should_flush_bitfield_and_tree_and_oplog() {
326                self.flush_bitfield_and_tree_and_oplog(false).await?;
327            }
328
329            #[cfg(feature = "replication")]
330            {
331                let _ = self.events.send(crate::replication::events::DataUpgrade {});
332                let _ = self
333                    .events
334                    .send(crate::replication::events::Have::from(&bitfield_update));
335            }
336        }
337
338        // Return the new value
339        Ok(AppendOutcome {
340            length: self.tree.length,
341            byte_length: self.tree.byte_length,
342        })
343    }
344
345    #[cfg(feature = "replication")]
346    /// Subscribe to core events relevant to replication
347    pub fn event_subscribe(&self) -> async_broadcast::Receiver<crate::replication::events::Event> {
348        self.events.channel.new_receiver()
349    }
350
351    /// Check if core has the block at the given `index` locally
352    #[instrument(ret, skip(self))]
353    pub fn has(&self, index: u64) -> bool {
354        self.bitfield.get(index)
355    }
356
357    /// Read value at given index, if any.
358    #[instrument(err, skip(self))]
359    pub async fn get(&mut self, index: u64) -> Result<Option<Vec<u8>>, HypercoreError> {
360        if !self.bitfield.get(index) {
361            #[cfg(feature = "replication")]
362            // if not in this core, emit Event::Get(index)
363            {
364                self.events.send_on_get(index);
365            }
366            return Ok(None);
367        }
368
369        let byte_range = self.byte_range(index, None).await?;
370
371        // TODO: Generalize Either response stack
372        let data = match self.block_store.read(&byte_range, None) {
373            Either::Right(value) => value,
374            Either::Left(instruction) => {
375                let info = self.storage.read_info(instruction).await?;
376                match self.block_store.read(&byte_range, Some(info)) {
377                    Either::Right(value) => value,
378                    Either::Left(_) => {
379                        return Err(HypercoreError::InvalidOperation {
380                            context: "Could not read block storage range".to_string(),
381                        });
382                    }
383                }
384            }
385        };
386
387        Ok(Some(data.to_vec()))
388    }
389
390    /// Clear data for entries between start and end (exclusive) indexes.
391    #[instrument(err, skip(self))]
392    pub async fn clear(&mut self, start: u64, end: u64) -> Result<(), HypercoreError> {
393        if start >= end {
394            // NB: This is what javascript does, so we mimic that here
395            return Ok(());
396        }
397        // Write to oplog
398        let infos_to_flush = self.oplog.clear(start, end)?;
399        self.storage.flush_infos(&infos_to_flush).await?;
400
401        // Set bitfield
402        self.bitfield.set_range(start, end - start, false);
403
404        // Set contiguous length
405        if start < self.header.hints.contiguous_length {
406            self.header.hints.contiguous_length = start;
407        }
408
409        // Find the biggest hole that can be punched into the data
410        let start = if let Some(index) = self.bitfield.last_index_of(true, start) {
411            index + 1
412        } else {
413            0
414        };
415        let end = if let Some(index) = self.bitfield.index_of(true, end) {
416            index
417        } else {
418            self.tree.length
419        };
420
421        // Find byte offset for first value
422        let mut infos: Vec<StoreInfo> = Vec::new();
423        let clear_offset = match self.tree.byte_offset(start, None)? {
424            Either::Right(value) => value,
425            Either::Left(instructions) => {
426                let new_infos = self.storage.read_infos_to_vec(&instructions).await?;
427                infos.extend(new_infos);
428                match self.tree.byte_offset(start, Some(&infos))? {
429                    Either::Right(value) => value,
430                    Either::Left(_) => {
431                        return Err(HypercoreError::InvalidOperation {
432                            context: format!("Could not read offset for index {start} from tree"),
433                        });
434                    }
435                }
436            }
437        };
438
439        // Find byte range for last value
440        let last_byte_range = self.byte_range(end - 1, Some(&infos)).await?;
441
442        let clear_length = (last_byte_range.index + last_byte_range.length) - clear_offset;
443
444        // Clear blocks
445        let info_to_flush = self.block_store.clear(clear_offset, clear_length);
446        self.storage.flush_info(info_to_flush).await?;
447
448        // Now ready to flush
449        if self.should_flush_bitfield_and_tree_and_oplog() {
450            self.flush_bitfield_and_tree_and_oplog(false).await?;
451        }
452
453        Ok(())
454    }
455
456    /// Access the key pair.
457    pub fn key_pair(&self) -> &PartialKeypair {
458        &self.key_pair
459    }
460
461    /// Create a proof for given request
462    #[instrument(err, skip_all)]
463    pub async fn create_proof(
464        &mut self,
465        block: Option<RequestBlock>,
466        hash: Option<RequestBlock>,
467        seek: Option<RequestSeek>,
468        upgrade: Option<RequestUpgrade>,
469    ) -> Result<Option<Proof>, HypercoreError> {
470        let valueless_proof = self
471            .create_valueless_proof(block, hash, seek, upgrade)
472            .await?;
473        let value: Option<Vec<u8>> = if let Some(block) = valueless_proof.block.as_ref() {
474            let value = self.get(block.index).await?;
475            if value.is_none() {
476                // The data value requested in the proof can not be read, we return None here
477                // and let the party requesting figure out what to do.
478                return Ok(None);
479            }
480            value
481        } else {
482            None
483        };
484        Ok(Some(valueless_proof.into_proof(value)))
485    }
486
487    /// Verify and apply proof received from peer, returns true if changed, false if not
488    /// possible to apply.
489    #[instrument(skip_all)]
490    pub async fn verify_and_apply_proof(&mut self, proof: &Proof) -> Result<bool, HypercoreError> {
491        if proof.fork != self.tree.fork {
492            return Ok(false);
493        }
494        let changeset = self.verify_proof(proof).await?;
495        if !self.tree.commitable(&changeset) {
496            return Ok(false);
497        }
498
499        // In javascript there's _verifyExclusive and _verifyShared based on changeset.upgraded, but
500        // here we do only one. _verifyShared groups together many subsequent changesets into a single
501        // oplog push, and then flushes in the end only for the whole group.
502        let bitfield_update: Option<BitfieldUpdate> = if let Some(block) = &proof.block.as_ref() {
503            let byte_offset =
504                match self
505                    .tree
506                    .byte_offset_in_changeset(block.index, &changeset, None)?
507                {
508                    Either::Right(value) => value,
509                    Either::Left(instructions) => {
510                        let infos = self.storage.read_infos_to_vec(&instructions).await?;
511                        match self.tree.byte_offset_in_changeset(
512                            block.index,
513                            &changeset,
514                            Some(&infos),
515                        )? {
516                            Either::Right(value) => value,
517                            Either::Left(_) => {
518                                return Err(HypercoreError::InvalidOperation {
519                                    context: format!(
520                                        "Could not read offset for index {} from tree",
521                                        block.index
522                                    ),
523                                });
524                            }
525                        }
526                    }
527                };
528
529            // Write the value to the block store
530            let info_to_flush = self.block_store.put(&block.value, byte_offset);
531            self.storage.flush_info(info_to_flush).await?;
532
533            // Return a bitfield update for the given value
534            Some(BitfieldUpdate {
535                drop: false,
536                start: block.index,
537                length: 1,
538            })
539        } else {
540            // Only from DataBlock can there be changes to the bitfield
541            None
542        };
543
544        // Append the changeset to the Oplog
545        let outcome = self.oplog.append_changeset(
546            &changeset,
547            bitfield_update.clone(),
548            false,
549            &self.header,
550        )?;
551        self.storage.flush_infos(&outcome.infos_to_flush).await?;
552        self.header = outcome.header;
553
554        if let Some(bitfield_update) = &bitfield_update {
555            // Write to bitfield
556            self.bitfield.update(bitfield_update);
557
558            // Contiguous length is known only now
559            update_contiguous_length(&mut self.header, &self.bitfield, bitfield_update);
560        }
561
562        // Commit changeset to in-memory tree
563        self.tree.commit(changeset)?;
564
565        // Now ready to flush
566        if self.should_flush_bitfield_and_tree_and_oplog() {
567            self.flush_bitfield_and_tree_and_oplog(false).await?;
568        }
569
570        #[cfg(feature = "replication")]
571        {
572            if proof.upgrade.is_some() {
573                // Notify replicator if we receieved an upgrade
574                let _ = self.events.send(crate::replication::events::DataUpgrade {});
575            }
576
577            // Notify replicator if we receieved a bitfield update
578            if let Some(ref bitfield) = bitfield_update {
579                let _ = self
580                    .events
581                    .send(crate::replication::events::Have::from(bitfield));
582            }
583        }
584        Ok(true)
585    }
586
587    /// Used to fill the nodes field of a `RequestBlock` during
588    /// synchronization.
589    #[instrument(err, skip(self))]
590    pub async fn missing_nodes(&mut self, index: u64) -> Result<u64, HypercoreError> {
591        self.missing_nodes_from_merkle_tree_index(index * 2).await
592    }
593
594    /// Get missing nodes using a merkle tree index. Advanced variant of missing_nodex
595    /// that allow for special cases of searching directly from the merkle tree.
596    #[instrument(err, skip(self))]
597    pub async fn missing_nodes_from_merkle_tree_index(
598        &mut self,
599        merkle_tree_index: u64,
600    ) -> Result<u64, HypercoreError> {
601        match self.tree.missing_nodes(merkle_tree_index, None)? {
602            Either::Right(value) => Ok(value),
603            Either::Left(instructions) => {
604                let mut instructions = instructions;
605                let mut infos: Vec<StoreInfo> = vec![];
606                loop {
607                    infos.extend(self.storage.read_infos_to_vec(&instructions).await?);
608                    match self.tree.missing_nodes(merkle_tree_index, Some(&infos))? {
609                        Either::Right(value) => {
610                            return Ok(value);
611                        }
612                        Either::Left(new_instructions) => {
613                            instructions = new_instructions;
614                        }
615                    }
616                }
617            }
618        }
619    }
620
621    /// Makes the hypercore read-only by deleting the secret key. Returns true if the
622    /// hypercore was changed, false if the hypercore was already read-only. This is useful
623    /// in scenarios where a hypercore should be made immutable after initial values have
624    /// been stored.
625    #[instrument(err, skip_all)]
626    pub async fn make_read_only(&mut self) -> Result<bool, HypercoreError> {
627        if self.key_pair.secret.is_some() {
628            self.key_pair.secret = None;
629            self.header.key_pair.secret = None;
630            // Need to flush clearing traces to make sure both oplog slots are cleared
631            self.flush_bitfield_and_tree_and_oplog(true).await?;
632            Ok(true)
633        } else {
634            Ok(false)
635        }
636    }
637
638    async fn byte_range(
639        &mut self,
640        index: u64,
641        initial_infos: Option<&[StoreInfo]>,
642    ) -> Result<NodeByteRange, HypercoreError> {
643        match self.tree.byte_range(index, initial_infos)? {
644            Either::Right(value) => Ok(value),
645            Either::Left(instructions) => {
646                let mut instructions = instructions;
647                let mut infos: Vec<StoreInfo> = vec![];
648                loop {
649                    infos.extend(self.storage.read_infos_to_vec(&instructions).await?);
650                    match self.tree.byte_range(index, Some(&infos))? {
651                        Either::Right(value) => {
652                            return Ok(value);
653                        }
654                        Either::Left(new_instructions) => {
655                            instructions = new_instructions;
656                        }
657                    }
658                }
659            }
660        }
661    }
662
663    async fn create_valueless_proof(
664        &mut self,
665        block: Option<RequestBlock>,
666        hash: Option<RequestBlock>,
667        seek: Option<RequestSeek>,
668        upgrade: Option<RequestUpgrade>,
669    ) -> Result<ValuelessProof, HypercoreError> {
670        match self.tree.create_valueless_proof(
671            block.as_ref(),
672            hash.as_ref(),
673            seek.as_ref(),
674            upgrade.as_ref(),
675            None,
676        )? {
677            Either::Right(value) => Ok(value),
678            Either::Left(instructions) => {
679                let mut instructions = instructions;
680                let mut infos: Vec<StoreInfo> = vec![];
681                loop {
682                    infos.extend(self.storage.read_infos_to_vec(&instructions).await?);
683                    match self.tree.create_valueless_proof(
684                        block.as_ref(),
685                        hash.as_ref(),
686                        seek.as_ref(),
687                        upgrade.as_ref(),
688                        Some(&infos),
689                    )? {
690                        Either::Right(value) => {
691                            return Ok(value);
692                        }
693                        Either::Left(new_instructions) => {
694                            instructions = new_instructions;
695                        }
696                    }
697                }
698            }
699        }
700    }
701
702    /// Verify a proof received from a peer. Returns a changeset that should be
703    /// applied.
704    async fn verify_proof(&mut self, proof: &Proof) -> Result<MerkleTreeChangeset, HypercoreError> {
705        match self.tree.verify_proof(proof, &self.key_pair.public, None)? {
706            Either::Right(value) => Ok(value),
707            Either::Left(instructions) => {
708                let infos = self.storage.read_infos_to_vec(&instructions).await?;
709                match self
710                    .tree
711                    .verify_proof(proof, &self.key_pair.public, Some(&infos))?
712                {
713                    Either::Right(value) => Ok(value),
714                    Either::Left(_) => Err(HypercoreError::InvalidOperation {
715                        context: "Could not verify proof from tree".to_string(),
716                    }),
717                }
718            }
719        }
720    }
721
722    fn should_flush_bitfield_and_tree_and_oplog(&mut self) -> bool {
723        if self.skip_flush_count == 0
724            || self.oplog.entries_byte_length >= MAX_OPLOG_ENTRIES_BYTE_SIZE
725        {
726            self.skip_flush_count = 3;
727            true
728        } else {
729            self.skip_flush_count -= 1;
730            false
731        }
732    }
733
734    async fn flush_bitfield_and_tree_and_oplog(
735        &mut self,
736        clear_traces: bool,
737    ) -> Result<(), HypercoreError> {
738        let infos = self.bitfield.flush();
739        self.storage.flush_infos(&infos).await?;
740        let infos = self.tree.flush();
741        self.storage.flush_infos(&infos).await?;
742        let infos = self.oplog.flush(&self.header, clear_traces)?;
743        self.storage.flush_infos(&infos).await?;
744        Ok(())
745    }
746}
747
748fn update_contiguous_length(
749    header: &mut Header,
750    bitfield: &Bitfield,
751    bitfield_update: &BitfieldUpdate,
752) {
753    let end = bitfield_update.start + bitfield_update.length;
754    let mut c = header.hints.contiguous_length;
755    if bitfield_update.drop {
756        if c <= end && c > bitfield_update.start {
757            c = bitfield_update.start;
758        }
759    } else if c <= end && c >= bitfield_update.start {
760        c = end;
761        while bitfield.get(c) {
762            c += 1;
763        }
764    }
765
766    if c != header.hints.contiguous_length {
767        header.hints.contiguous_length = c;
768    }
769}
770
771#[cfg(test)]
772pub(crate) mod tests {
773    use super::*;
774
775    #[async_std::test]
776    async fn core_create_proof_block_only() -> Result<(), HypercoreError> {
777        let mut hypercore = create_hypercore_with_data(10).await?;
778
779        let proof = hypercore
780            .create_proof(Some(RequestBlock { index: 4, nodes: 2 }), None, None, None)
781            .await?
782            .unwrap();
783        let block = proof.block.unwrap();
784        assert_eq!(proof.upgrade, None);
785        assert_eq!(proof.seek, None);
786        assert_eq!(block.index, 4);
787        assert_eq!(block.nodes.len(), 2);
788        assert_eq!(block.nodes[0].index, 10);
789        assert_eq!(block.nodes[1].index, 13);
790        Ok(())
791    }
792
793    #[async_std::test]
794    async fn core_create_proof_block_and_upgrade() -> Result<(), HypercoreError> {
795        let mut hypercore = create_hypercore_with_data(10).await?;
796        let proof = hypercore
797            .create_proof(
798                Some(RequestBlock { index: 4, nodes: 0 }),
799                None,
800                None,
801                Some(RequestUpgrade {
802                    start: 0,
803                    length: 10,
804                }),
805            )
806            .await?
807            .unwrap();
808        let block = proof.block.unwrap();
809        let upgrade = proof.upgrade.unwrap();
810        assert_eq!(proof.seek, None);
811        assert_eq!(block.index, 4);
812        assert_eq!(block.nodes.len(), 3);
813        assert_eq!(block.nodes[0].index, 10);
814        assert_eq!(block.nodes[1].index, 13);
815        assert_eq!(block.nodes[2].index, 3);
816        assert_eq!(upgrade.start, 0);
817        assert_eq!(upgrade.length, 10);
818        assert_eq!(upgrade.nodes.len(), 1);
819        assert_eq!(upgrade.nodes[0].index, 17);
820        assert_eq!(upgrade.additional_nodes.len(), 0);
821        Ok(())
822    }
823
824    #[async_std::test]
825    async fn core_create_proof_block_and_upgrade_and_additional() -> Result<(), HypercoreError> {
826        let mut hypercore = create_hypercore_with_data(10).await?;
827        let proof = hypercore
828            .create_proof(
829                Some(RequestBlock { index: 4, nodes: 0 }),
830                None,
831                None,
832                Some(RequestUpgrade {
833                    start: 0,
834                    length: 8,
835                }),
836            )
837            .await?
838            .unwrap();
839        let block = proof.block.unwrap();
840        let upgrade = proof.upgrade.unwrap();
841        assert_eq!(proof.seek, None);
842        assert_eq!(block.index, 4);
843        assert_eq!(block.nodes.len(), 3);
844        assert_eq!(block.nodes[0].index, 10);
845        assert_eq!(block.nodes[1].index, 13);
846        assert_eq!(block.nodes[2].index, 3);
847        assert_eq!(upgrade.start, 0);
848        assert_eq!(upgrade.length, 8);
849        assert_eq!(upgrade.nodes.len(), 0);
850        assert_eq!(upgrade.additional_nodes.len(), 1);
851        assert_eq!(upgrade.additional_nodes[0].index, 17);
852        Ok(())
853    }
854
855    #[async_std::test]
856    async fn core_create_proof_block_and_upgrade_from_existing_state() -> Result<(), HypercoreError>
857    {
858        let mut hypercore = create_hypercore_with_data(10).await?;
859        let proof = hypercore
860            .create_proof(
861                Some(RequestBlock { index: 1, nodes: 0 }),
862                None,
863                None,
864                Some(RequestUpgrade {
865                    start: 1,
866                    length: 9,
867                }),
868            )
869            .await?
870            .unwrap();
871        let block = proof.block.unwrap();
872        let upgrade = proof.upgrade.unwrap();
873        assert_eq!(proof.seek, None);
874        assert_eq!(block.index, 1);
875        assert_eq!(block.nodes.len(), 0);
876        assert_eq!(upgrade.start, 1);
877        assert_eq!(upgrade.length, 9);
878        assert_eq!(upgrade.nodes.len(), 3);
879        assert_eq!(upgrade.nodes[0].index, 5);
880        assert_eq!(upgrade.nodes[1].index, 11);
881        assert_eq!(upgrade.nodes[2].index, 17);
882        assert_eq!(upgrade.additional_nodes.len(), 0);
883        Ok(())
884    }
885
886    #[async_std::test]
887    async fn core_create_proof_block_and_upgrade_from_existing_state_with_additional(
888    ) -> Result<(), HypercoreError> {
889        let mut hypercore = create_hypercore_with_data(10).await?;
890        let proof = hypercore
891            .create_proof(
892                Some(RequestBlock { index: 1, nodes: 0 }),
893                None,
894                None,
895                Some(RequestUpgrade {
896                    start: 1,
897                    length: 5,
898                }),
899            )
900            .await?
901            .unwrap();
902        let block = proof.block.unwrap();
903        let upgrade = proof.upgrade.unwrap();
904        assert_eq!(proof.seek, None);
905        assert_eq!(block.index, 1);
906        assert_eq!(block.nodes.len(), 0);
907        assert_eq!(upgrade.start, 1);
908        assert_eq!(upgrade.length, 5);
909        assert_eq!(upgrade.nodes.len(), 2);
910        assert_eq!(upgrade.nodes[0].index, 5);
911        assert_eq!(upgrade.nodes[1].index, 9);
912        assert_eq!(upgrade.additional_nodes.len(), 2);
913        assert_eq!(upgrade.additional_nodes[0].index, 13);
914        assert_eq!(upgrade.additional_nodes[1].index, 17);
915        Ok(())
916    }
917
918    #[async_std::test]
919    async fn core_create_proof_block_and_seek_1_no_upgrade() -> Result<(), HypercoreError> {
920        let mut hypercore = create_hypercore_with_data(10).await?;
921        let proof = hypercore
922            .create_proof(
923                Some(RequestBlock { index: 4, nodes: 2 }),
924                None,
925                Some(RequestSeek { bytes: 8 }),
926                None,
927            )
928            .await?
929            .unwrap();
930        let block = proof.block.unwrap();
931        assert_eq!(proof.seek, None); // seek included in block
932        assert_eq!(proof.upgrade, None);
933        assert_eq!(block.index, 4);
934        assert_eq!(block.nodes.len(), 2);
935        assert_eq!(block.nodes[0].index, 10);
936        assert_eq!(block.nodes[1].index, 13);
937        Ok(())
938    }
939
940    #[async_std::test]
941    async fn core_create_proof_block_and_seek_2_no_upgrade() -> Result<(), HypercoreError> {
942        let mut hypercore = create_hypercore_with_data(10).await?;
943        let proof = hypercore
944            .create_proof(
945                Some(RequestBlock { index: 4, nodes: 2 }),
946                None,
947                Some(RequestSeek { bytes: 10 }),
948                None,
949            )
950            .await?
951            .unwrap();
952        let block = proof.block.unwrap();
953        assert_eq!(proof.seek, None); // seek included in block
954        assert_eq!(proof.upgrade, None);
955        assert_eq!(block.index, 4);
956        assert_eq!(block.nodes.len(), 2);
957        assert_eq!(block.nodes[0].index, 10);
958        assert_eq!(block.nodes[1].index, 13);
959        Ok(())
960    }
961
962    #[async_std::test]
963    async fn core_create_proof_block_and_seek_3_no_upgrade() -> Result<(), HypercoreError> {
964        let mut hypercore = create_hypercore_with_data(10).await?;
965        let proof = hypercore
966            .create_proof(
967                Some(RequestBlock { index: 4, nodes: 2 }),
968                None,
969                Some(RequestSeek { bytes: 13 }),
970                None,
971            )
972            .await?
973            .unwrap();
974        let block = proof.block.unwrap();
975        let seek = proof.seek.unwrap();
976        assert_eq!(proof.upgrade, None);
977        assert_eq!(block.index, 4);
978        assert_eq!(block.nodes.len(), 1);
979        assert_eq!(block.nodes[0].index, 10);
980        assert_eq!(seek.nodes.len(), 2);
981        assert_eq!(seek.nodes[0].index, 12);
982        assert_eq!(seek.nodes[1].index, 14);
983        Ok(())
984    }
985
986    #[async_std::test]
987    async fn core_create_proof_block_and_seek_to_tree_no_upgrade() -> Result<(), HypercoreError> {
988        let mut hypercore = create_hypercore_with_data(16).await?;
989        let proof = hypercore
990            .create_proof(
991                Some(RequestBlock { index: 0, nodes: 4 }),
992                None,
993                Some(RequestSeek { bytes: 26 }),
994                None,
995            )
996            .await?
997            .unwrap();
998        let block = proof.block.unwrap();
999        let seek = proof.seek.unwrap();
1000        assert_eq!(proof.upgrade, None);
1001        assert_eq!(block.nodes.len(), 3);
1002        assert_eq!(block.nodes[0].index, 2);
1003        assert_eq!(block.nodes[1].index, 5);
1004        assert_eq!(block.nodes[2].index, 11);
1005        assert_eq!(seek.nodes.len(), 2);
1006        assert_eq!(seek.nodes[0].index, 19);
1007        assert_eq!(seek.nodes[1].index, 27);
1008        Ok(())
1009    }
1010
1011    #[async_std::test]
1012    async fn core_create_proof_block_and_seek_with_upgrade() -> Result<(), HypercoreError> {
1013        let mut hypercore = create_hypercore_with_data(10).await?;
1014        let proof = hypercore
1015            .create_proof(
1016                Some(RequestBlock { index: 4, nodes: 2 }),
1017                None,
1018                Some(RequestSeek { bytes: 13 }),
1019                Some(RequestUpgrade {
1020                    start: 8,
1021                    length: 2,
1022                }),
1023            )
1024            .await?
1025            .unwrap();
1026        let block = proof.block.unwrap();
1027        let seek = proof.seek.unwrap();
1028        let upgrade = proof.upgrade.unwrap();
1029        assert_eq!(block.index, 4);
1030        assert_eq!(block.nodes.len(), 1);
1031        assert_eq!(block.nodes[0].index, 10);
1032        assert_eq!(seek.nodes.len(), 2);
1033        assert_eq!(seek.nodes[0].index, 12);
1034        assert_eq!(seek.nodes[1].index, 14);
1035        assert_eq!(upgrade.nodes.len(), 1);
1036        assert_eq!(upgrade.nodes[0].index, 17);
1037        assert_eq!(upgrade.additional_nodes.len(), 0);
1038        Ok(())
1039    }
1040
1041    #[async_std::test]
1042    async fn core_create_proof_seek_with_upgrade() -> Result<(), HypercoreError> {
1043        let mut hypercore = create_hypercore_with_data(10).await?;
1044        let proof = hypercore
1045            .create_proof(
1046                None,
1047                None,
1048                Some(RequestSeek { bytes: 13 }),
1049                Some(RequestUpgrade {
1050                    start: 0,
1051                    length: 10,
1052                }),
1053            )
1054            .await?
1055            .unwrap();
1056        let seek = proof.seek.unwrap();
1057        let upgrade = proof.upgrade.unwrap();
1058        assert_eq!(proof.block, None);
1059        assert_eq!(seek.nodes.len(), 4);
1060        assert_eq!(seek.nodes[0].index, 12);
1061        assert_eq!(seek.nodes[1].index, 14);
1062        assert_eq!(seek.nodes[2].index, 9);
1063        assert_eq!(seek.nodes[3].index, 3);
1064        assert_eq!(upgrade.nodes.len(), 1);
1065        assert_eq!(upgrade.nodes[0].index, 17);
1066        assert_eq!(upgrade.additional_nodes.len(), 0);
1067        Ok(())
1068    }
1069
1070    #[async_std::test]
1071    async fn core_verify_proof_invalid_signature() -> Result<(), HypercoreError> {
1072        let mut hypercore = create_hypercore_with_data(10).await?;
1073        // Invalid clone hypercore with a different public key
1074        let mut hypercore_clone = create_hypercore_with_data(0).await?;
1075        let proof = hypercore
1076            .create_proof(
1077                None,
1078                Some(RequestBlock { index: 6, nodes: 0 }),
1079                None,
1080                Some(RequestUpgrade {
1081                    start: 0,
1082                    length: 10,
1083                }),
1084            )
1085            .await?
1086            .unwrap();
1087        assert!(hypercore_clone
1088            .verify_and_apply_proof(&proof)
1089            .await
1090            .is_err());
1091        Ok(())
1092    }
1093
1094    #[async_std::test]
1095    async fn core_verify_and_apply_proof() -> Result<(), HypercoreError> {
1096        let mut main = create_hypercore_with_data(10).await?;
1097        let mut clone = create_hypercore_with_data_and_key_pair(
1098            0,
1099            PartialKeypair {
1100                public: main.key_pair.public,
1101                secret: None,
1102            },
1103        )
1104        .await?;
1105        let index = 6;
1106        let nodes = clone.missing_nodes(index).await?;
1107        let proof = main
1108            .create_proof(
1109                None,
1110                Some(RequestBlock { index, nodes }),
1111                None,
1112                Some(RequestUpgrade {
1113                    start: 0,
1114                    length: 10,
1115                }),
1116            )
1117            .await?
1118            .unwrap();
1119        assert!(clone.verify_and_apply_proof(&proof).await?);
1120        let main_info = main.info();
1121        let clone_info = clone.info();
1122        assert_eq!(main_info.byte_length, clone_info.byte_length);
1123        assert_eq!(main_info.length, clone_info.length);
1124        assert!(main.get(6).await?.is_some());
1125        assert!(clone.get(6).await?.is_none());
1126
1127        // Fetch data for index 6 and verify it is found
1128        let index = 6;
1129        let nodes = clone.missing_nodes(index).await?;
1130        let proof = main
1131            .create_proof(Some(RequestBlock { index, nodes }), None, None, None)
1132            .await?
1133            .unwrap();
1134        assert!(clone.verify_and_apply_proof(&proof).await?);
1135        Ok(())
1136    }
1137
1138    pub(crate) async fn create_hypercore_with_data(
1139        length: u64,
1140    ) -> Result<Hypercore, HypercoreError> {
1141        let signing_key = generate_signing_key();
1142        create_hypercore_with_data_and_key_pair(
1143            length,
1144            PartialKeypair {
1145                public: signing_key.verifying_key(),
1146                secret: Some(signing_key),
1147            },
1148        )
1149        .await
1150    }
1151
1152    pub(crate) async fn create_hypercore_with_data_and_key_pair(
1153        length: u64,
1154        key_pair: PartialKeypair,
1155    ) -> Result<Hypercore, HypercoreError> {
1156        let storage = Storage::new_memory().await?;
1157        let mut hypercore = Hypercore::new(
1158            storage,
1159            HypercoreOptions {
1160                key_pair: Some(key_pair),
1161                open: false,
1162                #[cfg(feature = "cache")]
1163                node_cache_options: None,
1164            },
1165        )
1166        .await?;
1167        for i in 0..length {
1168            hypercore.append(format!("#{}", i).as_bytes()).await?;
1169        }
1170        Ok(hypercore)
1171    }
1172}