// exocore-chain 0.1.24
// Storage of Exocore (Distributed applications framework)
use std::{
    collections::btree_map::BTreeMap,
    io::{Read, Write},
    ops::Range,
    path::{Path, PathBuf},
    sync::Arc,
};

use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use exocore_core::simple_store::{json_disk_store::JsonDiskStore, SimpleStore};
use exocore_protos::generated::data_chain_capnp::block_header;
use extindex::{Builder, Reader, Serializable};
use itertools::Itertools;
use serde::{Deserialize, Serialize};

use super::{DirectoryChainStoreConfig, DirectoryError};
use crate::{
    block::{Block, BlockOffset},
    chain::Error,
    operation::OperationId,
};

/// Operation ID to Block offset index. This is used to retrieve the block
/// offset in which a given operation ID has been stored.
///
/// This index has an in-memory buffer, and is flushed to disk into `extindex`
/// immutable index files.
///
/// The in-memory portion of it may be lost if it hadn't been flushed. The
/// chain directory makes sure that the chain is properly indexed when it's
/// initializing, using the `next_expected_offset` value.
///
/// The index maintains the list of persisted indices in a "Metadata" file.
pub struct OperationIndex {
    /// Store configuration; notably the in-memory flush threshold
    /// (`operation_index_max_memory_items`).
    config: DirectoryChainStoreConfig,
    /// Directory in which index files and the metadata file are stored.
    directory: PathBuf,

    /// JSON store for the metadata file listing persisted index files.
    metadata_store: JsonDiskStore<Metadata>,

    /// Block offset (inclusive) from which the in-memory index starts.
    memory_offset_from: BlockOffset,
    /// In-memory operation id -> block offset mappings not yet flushed to disk.
    memory_index: BTreeMap<OperationId, BlockOffset>,

    /// Offset that the next indexed block is expected to have.
    next_expected_offset: BlockOffset,

    /// Immutable on-disk indices, in increasing order of offset range.
    stored_indices: Vec<StoredIndex>,
}

impl OperationIndex {
    /// Creates a new operation index that will be stored in given directory.
    ///
    /// The (empty) metadata file is written right away since `open` expects it
    /// to exist.
    pub fn create(
        config: DirectoryChainStoreConfig,
        directory_path: &Path,
    ) -> Result<OperationIndex, Error> {
        let metadata_path = Metadata::file_path(directory_path);
        let metadata_store = JsonDiskStore::<Metadata>::new(&metadata_path).map_err(|err| {
            Error::new_io(
                err,
                format!(
                    "Error creating operation index metadata file {:?}",
                    metadata_path
                ),
            )
        })?;

        let operation_index = OperationIndex {
            config,
            directory: directory_path.to_path_buf(),

            metadata_store,

            memory_offset_from: 0,
            memory_index: BTreeMap::new(),

            next_expected_offset: 0,

            stored_indices: vec![],
        };

        // we write even if it's empty because `open` expects it to exist
        operation_index.write_metadata()?;

        Ok(operation_index)
    }

    /// Open an existing operation index stored in given directory.
    ///
    /// Reads the metadata file, opens a reader on every persisted index file
    /// it lists, and resumes indexing from the end of the last persisted
    /// index (any in-memory data that wasn't flushed is lost; see
    /// `next_expected_block_offset`).
    pub fn open(
        config: DirectoryChainStoreConfig,
        directory_path: &Path,
    ) -> Result<OperationIndex, Error> {
        let metadata_path = Metadata::file_path(directory_path);
        let metadata_store = JsonDiskStore::<Metadata>::new(&metadata_path).map_err(|err| {
            Error::new_io(
                err,
                format!(
                    "Error creating operation index metadata file {:?}",
                    metadata_path
                ),
            )
        })?;

        let metadata = metadata_store
            .read()
            .map_err(|err| {
                Error::new_io(
                    err,
                    format!(
                        "Error reading operation index metadata file {:?}",
                        metadata_path
                    ),
                )
            })?
            .ok_or_else(|| {
                Error::UnexpectedState(anyhow!("Operation index metadata file didn't exist"))
            })?;

        let mut stored_indices = Vec::new();
        for index_file_metadata in metadata.files.iter() {
            let index_file_path = directory_path.join(&index_file_metadata.file_name);
            let index_reader = Reader::open(index_file_path)
                .map_err(|err| DirectoryError::OperationIndexRead(Arc::new(err)))?;

            stored_indices.push(StoredIndex {
                range: index_file_metadata.offset_from..index_file_metadata.offset_to,
                index_reader,
            });
        }

        // the next expected offset is the upper bound (excluded) of the last segment we
        // indexed
        let next_expected_offset = stored_indices.last().map_or(0, |index| index.range.end);

        // we have nothing in memory, so memory index is from next expected offset
        let memory_offset_from = next_expected_offset;
        let memory_index = BTreeMap::new();

        Ok(OperationIndex {
            config,
            directory: directory_path.to_path_buf(),

            metadata_store,

            memory_offset_from,
            memory_index,
            next_expected_offset,
            stored_indices,
        })
    }

    /// Returns the offset that we expect the next block to have. This can be
    /// used to know which operations are missing and need to be re-indexed.
    pub fn next_expected_block_offset(&self) -> BlockOffset {
        self.next_expected_offset
    }

    /// Indexes an iterator of blocks. There is no guarantee that they will be
    /// actually stored to disk if they can still fit in the in-memory
    /// index.
    ///
    /// Blocks before `memory_offset_from` are skipped since they are already
    /// covered by persisted indices.
    pub fn index_blocks<I: Iterator<Item = Result<B, Error>>, B: Block>(
        &mut self,
        iterator: I,
    ) -> Result<(), Error> {
        for block in iterator {
            let block = block?;
            if block.offset() >= self.memory_offset_from {
                self.index_block(&block)?;
            }
        }

        Ok(())
    }

    /// Indexes a block. There is no guarantee that it will be actually stored
    /// if it can still fit in the in-memory index.
    ///
    /// Returns an `Error::Integrity` if the block's offset isn't the one we
    /// expect next, since blocks must be indexed contiguously and in order.
    pub fn index_block<B: Block>(&mut self, block: &B) -> Result<(), Error> {
        if self.next_expected_offset != block.offset() {
            return Err(Error::Integrity(anyhow!(
                "Tried to index operations from a block with unexpected offset: block={} != expected={}",
                block.offset(),
                self.next_expected_offset
            )));
        }

        let block_header_reader: block_header::Reader = block
            .header()
            .get_reader()
            .map_err(|err| Error::Block(err.into()))?;

        // we add the operation that lead to the block proposal
        let block_propose_op_id = block_header_reader.get_proposed_operation_id();
        self.put_operation_block(block_propose_op_id, block.offset());

        // we add all operations that are in the block
        for operation in block.operations_iter()? {
            let operation_reader = operation.get_reader()?;
            self.put_operation_block(operation_reader.get_operation_id(), block.offset());
        }

        self.next_expected_offset = block.next_offset();

        self.maybe_flush_to_disk()?;

        Ok(())
    }

    /// Retrieves the block offset in which a given operation was stored.
    ///
    /// The in-memory index is checked first, then each persisted index is
    /// searched in turn. Returns `Ok(None)` if the operation isn't indexed.
    pub fn get_operation_block(
        &self,
        operation_id: OperationId,
    ) -> Result<Option<BlockOffset>, Error> {
        if let Some(block_offset) = self.memory_index.get(&operation_id) {
            return Ok(Some(*block_offset));
        }

        let needle = StoredIndexKey { operation_id };
        for index in self.stored_indices.iter() {
            let opt_entry = index
                .index_reader
                .find(&needle)
                .map_err(|err| DirectoryError::OperationIndexRead(Arc::new(err)))?;

            if let Some(entry) = opt_entry {
                return Ok(Some(entry.value().offset));
            }
        }

        Ok(None)
    }

    /// Truncates the index from the given offset. Because of the nature of the
    /// immutable underlying indices, we cannot delete from the exact
    /// offset.
    ///
    /// Therefore, we expect `index_blocks` to be called right after to index
    /// any missing blocks that we over-truncated. The
    /// `next_expected_block_offset` method can be used to know from which
    /// offset we need to re-index from.
    pub fn truncate_from_offset(&mut self, from_offset: BlockOffset) -> Result<(), Error> {
        if from_offset >= self.memory_offset_from {
            // truncation point is within the in-memory portion; dropping it is enough
            self.memory_index.clear();
            self.next_expected_offset = self.memory_offset_from;
        } else {
            let mut previous_indices = Vec::new();
            std::mem::swap(&mut self.stored_indices, &mut previous_indices);

            // keep only the indices strictly before the truncation offset and
            // delete the others' files from disk
            for index in previous_indices {
                if index.range.end >= from_offset {
                    let index_path = StoredIndex::file_path(&self.directory, &index.range);
                    let _ = std::fs::remove_file(index_path);
                } else {
                    self.stored_indices.push(index);
                }
            }

            // next expected offset is the upper bound of the last index we kept
            self.next_expected_offset = self
                .stored_indices
                .last()
                .map_or(0, |index| index.range.end);
            self.memory_offset_from = self.next_expected_offset;
        }

        self.write_metadata()?;

        Ok(())
    }

    /// Inserts a single operation in the in-memory index
    fn put_operation_block(&mut self, operation_id: OperationId, block_offset: BlockOffset) {
        self.memory_index.insert(operation_id, block_offset);
    }

    /// Checks the size of the in-memory index and flush it to disk if it
    /// exceeds configured maximum.
    fn maybe_flush_to_disk(&mut self) -> Result<(), Error> {
        if self.memory_index.len() > self.config.operation_index_max_memory_items {
            debug!(
                "Storing in-memory index of operations to disk ({} items)",
                self.memory_index.len()
            );

            let from_offset = self.memory_offset_from;
            let to_offset = self.next_expected_offset;
            let range = from_offset..to_offset;
            let index_file = StoredIndex::file_path(&self.directory, &range);

            // build the index from in-memory index, which is already sorted because it's in
            // a tree
            let ops_count = self.memory_index.len() as u64;
            let ops_iter = self.memory_index.iter().map(|(operation_id, offset)| {
                let key = StoredIndexKey {
                    operation_id: *operation_id,
                };
                let value = StoredIndexValue { offset: *offset };

                extindex::Entry::new(key, value)
            });
            let index_builder =
                Builder::<StoredIndexKey, StoredIndexValue>::new(index_file.clone());
            index_builder
                .build_from_sorted(ops_iter, ops_count)
                .map_err(|err| DirectoryError::OperationIndexBuild(Arc::new(err)))?;

            // open the index we just created
            let index_reader = Reader::open(index_file)
                .map_err(|err| DirectoryError::OperationIndexRead(Arc::new(err)))?;
            let stored_index = StoredIndex {
                range,
                index_reader,
            };
            self.stored_indices.push(stored_index);

            self.write_metadata()?;

            // memory index now starts at next expected offset
            self.memory_offset_from = self.next_expected_offset;
            self.memory_index.clear();
        }

        Ok(())
    }

    /// Writes metadata to disk, listing every persisted index file with its
    /// covered offset range.
    fn write_metadata(&self) -> Result<(), Error> {
        let files = self
            .stored_indices
            .iter()
            .map(|index| {
                let file_name = StoredIndex::file_name(&index.range);
                MetadataIndexFile {
                    offset_from: index.range.start,
                    offset_to: index.range.end,
                    file_name,
                }
            })
            .collect_vec();
        let metadata = Metadata { files };

        self.metadata_store
            .write(&metadata)
            .map_err(|err| Error::new_io(err, "Error storing into operation index metadata file"))
    }
}

/// Represents an immutable on-disk index for a given range of offsets.
struct StoredIndex {
    /// Range of block offsets covered by this index (start inclusive, end
    /// exclusive).
    range: Range<BlockOffset>,
    /// Reader over the on-disk `extindex` file.
    index_reader: Reader<StoredIndexKey, StoredIndexValue>,
}

impl StoredIndex {
    /// Returns the full path of the index file covering the given offset
    /// range, within the given directory.
    fn file_path(directory: &Path, range: &Range<BlockOffset>) -> PathBuf {
        let mut path = directory.to_path_buf();
        path.push(Self::file_name(range));
        path
    }

    /// Returns the file name of the index file covering the given offset
    /// range. Only the starting offset is encoded in the name.
    fn file_name(range: &Range<BlockOffset>) -> String {
        format!("opsidx_{}.bin", range.start)
    }
}

/// Metadata stored on disk to describe segments of the block that are indexed.
#[derive(Serialize, Deserialize)]
struct Metadata {
    /// One entry per persisted index file.
    files: Vec<MetadataIndexFile>,
}

impl Metadata {
    /// Returns the path of the metadata file within the given index directory.
    fn file_path(directory: &Path) -> PathBuf {
        let mut path = directory.to_path_buf();
        path.push("ops_idx.json");
        path
    }
}

/// Describes a single persisted index file in the metadata file.
#[derive(Serialize, Deserialize)]
struct MetadataIndexFile {
    /// First block offset covered by the index file (inclusive).
    offset_from: BlockOffset,
    /// Upper bound of block offsets covered by the index file (exclusive).
    offset_to: BlockOffset,
    /// Name of the index file, relative to the index directory.
    file_name: String,
}

/// Wraps the key stored in the on-disk index.
/// This is needed for encoding / decoding.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct StoredIndexKey {
    /// Operation id used as the lookup key in the on-disk index.
    operation_id: OperationId,
}

impl Serializable for StoredIndexKey {
    /// Keys have a fixed size: a single little-endian `u64` operation id.
    fn size(&self) -> Option<usize> {
        Some(std::mem::size_of::<u64>())
    }

    /// Encodes the operation id as 8 little-endian bytes.
    fn serialize<W: Write>(&self, write: &mut W) -> Result<(), std::io::Error> {
        write.write_all(&self.operation_id.to_le_bytes())
    }

    /// Decodes the operation id from 8 little-endian bytes.
    fn deserialize<R: Read>(data: &mut R, _size: usize) -> Result<StoredIndexKey, std::io::Error> {
        let mut buf = [0u8; 8];
        data.read_exact(&mut buf)?;
        Ok(StoredIndexKey {
            operation_id: u64::from_le_bytes(buf),
        })
    }
}

/// Wraps the value stored in the on-disk index.
/// This is needed for encoding / decoding.
struct StoredIndexValue {
    /// Offset of the block in which the operation was stored.
    offset: BlockOffset,
}

impl Serializable for StoredIndexValue {
    /// Values have a fixed size: a single little-endian `u64` block offset.
    fn size(&self) -> Option<usize> {
        Some(std::mem::size_of::<u64>())
    }

    /// Encodes the block offset as 8 little-endian bytes.
    fn serialize<W: Write>(&self, write: &mut W) -> Result<(), std::io::Error> {
        write.write_all(&self.offset.to_le_bytes())
    }

    /// Decodes the block offset from 8 little-endian bytes.
    fn deserialize<R: Read>(
        data: &mut R,
        _size: usize,
    ) -> Result<StoredIndexValue, std::io::Error> {
        let mut buf = [0u8; 8];
        data.read_exact(&mut buf)?;
        Ok(StoredIndexValue {
            offset: u64::from_le_bytes(buf),
        })
    }
}

#[cfg(test)]
mod tests {
    use exocore_core::cell::{FullCell, LocalNode};

    use super::*;
    use crate::chain::directory::tests::create_block;

    /// Indexing 1000 blocks with a 100-item in-memory limit flushes multiple
    /// immutable indices to disk; every indexed operation must be retrievable.
    #[test]
    fn create_from_iterator() -> anyhow::Result<()> {
        let local_node = LocalNode::generate();
        let cell = FullCell::generate(local_node)?;
        let dir = tempfile::tempdir()?;
        let config = DirectoryChainStoreConfig {
            operation_index_max_memory_items: 100,
            ..DirectoryChainStoreConfig::default()
        };

        let mut index = OperationIndex::create(config, dir.path())?;
        let generated_ops = generate_index_blocks(&cell, &mut index, 0, 1000)?;

        // 19 because there is 2 ops per block (block itself + op inside)
        assert_eq!(19, index.stored_indices.len());

        // make sure we can find all stored operations
        for (op, offset) in &generated_ops {
            assert_eq!(Some(*offset), index.get_operation_block(*op)?);
        }

        // try to find a missing operation
        assert_eq!(None, index.get_operation_block(435_874_985)?);

        Ok(())
    }

    /// Reopening an index only recovers what was flushed to disk; the
    /// in-memory portion is lost and indexing can resume from
    /// `next_expected_block_offset`.
    #[test]
    fn open_existing() -> anyhow::Result<()> {
        let local_node = LocalNode::generate();
        let cell = FullCell::generate(local_node)?;
        let dir = tempfile::tempdir()?;
        let config = DirectoryChainStoreConfig {
            operation_index_max_memory_items: 100,
            ..DirectoryChainStoreConfig::default()
        };

        let (memory_offset_from, generated_ops) = {
            let mut index = OperationIndex::create(config, dir.path())?;
            let generated_ops = generate_index_blocks(&cell, &mut index, 0, 1000)?;
            (index.memory_offset_from, generated_ops)
        };

        let mut index = OperationIndex::open(config, dir.path())?;

        // all data that was previously stored in memory is lost
        assert_eq!(memory_offset_from, index.memory_offset_from);
        assert_eq!(memory_offset_from, index.next_expected_block_offset());

        assert_eq!(19, index.stored_indices.len());

        // make sure we can find all stored operations
        for (op, offset) in &generated_ops {
            if *offset < memory_offset_from {
                assert_eq!(Some(*offset), index.get_operation_block(*op)?);
            }
        }

        // we append some more operations, we expect all of them to be there
        let new_ops = generate_index_blocks(&cell, &mut index, memory_offset_from, 200)?;
        for (op, offset) in &new_ops {
            assert_eq!(Some(*offset), index.get_operation_block(*op)?);
        }
        assert_eq!(22, index.stored_indices.len());

        Ok(())
    }

    /// Truncating from the in-memory portion only drops the memory index and
    /// leaves all on-disk index files untouched.
    #[test]
    fn truncate_from_offset_memory() -> anyhow::Result<()> {
        let local_node = LocalNode::generate();
        let cell = FullCell::generate(local_node)?;
        let dir = tempfile::tempdir()?;
        let config = DirectoryChainStoreConfig {
            operation_index_max_memory_items: 100,
            ..DirectoryChainStoreConfig::default()
        };

        let mut index = OperationIndex::create(config, dir.path())?;
        generate_index_blocks(&cell, &mut index, 0, 1000)?;

        let files_count_before = index.stored_indices.len();
        index.truncate_from_offset(index.memory_offset_from)?;
        assert_eq!(index.memory_offset_from, index.next_expected_offset);
        assert_eq!(index.stored_indices.len(), files_count_before);

        Ok(())
    }

    /// Truncating from within the on-disk portion deletes index files at or
    /// after the truncation offset (possibly over-truncating), and the result
    /// persists across a reopen.
    #[test]
    fn truncate_from_offset_disk() -> anyhow::Result<()> {
        let local_node = LocalNode::generate();
        let cell = FullCell::generate(local_node)?;
        let dir = tempfile::tempdir()?;
        let config = DirectoryChainStoreConfig {
            operation_index_max_memory_items: 100,
            ..DirectoryChainStoreConfig::default()
        };

        let next_expected_offset = {
            let mut index = OperationIndex::create(config, dir.path())?;
            let generated_ops = generate_index_blocks(&cell, &mut index, 0, 1000)?;

            let operation_ids = generated_ops.keys().collect_vec();
            let middle_block_offset = generated_ops[operation_ids[operation_ids.len() / 2]];

            let files_count_before = index.stored_indices.len();
            index.truncate_from_offset(middle_block_offset)?;

            assert!(index.next_expected_offset <= middle_block_offset);
            assert_eq!(index.memory_offset_from, index.next_expected_offset);
            assert!(index.stored_indices.len() <= files_count_before / 2);

            index.next_expected_offset
        };

        {
            let index = OperationIndex::open(config, dir.path())?;
            assert_eq!(next_expected_offset, index.next_expected_offset);
        }

        Ok(())
    }

    /// Generates and indexes `count` consecutive blocks starting at
    /// `from_offset`, returning a map of every generated operation id to the
    /// offset of the block containing it.
    fn generate_index_blocks(
        full_cell: &FullCell,
        index: &mut OperationIndex,
        from_offset: BlockOffset,
        count: usize,
    ) -> Result<BTreeMap<OperationId, BlockOffset>, Error> {
        let mut generated_ops = BTreeMap::new();

        let mut next_offset = from_offset;
        let blocks_iter = (0..count).map(|_i| {
            // create_block will use offset as proposed operation id and will create 1 op
            // inside
            let block = create_block(full_cell, next_offset);
            generated_ops.insert(next_offset, next_offset);
            generated_ops.insert(next_offset + 1, next_offset);

            next_offset = block.next_offset();
            Ok(block)
        });

        index.index_blocks(blocks_iter)?;

        Ok(generated_ops)
    }
}