//! In-memory pool of Bitcoin blocks: tracks competing forks and orphaned blocks,
//! and derives chain events when the canonical fork changes.
use crate::{
    indexer::{ChainSegment, ChainSegmentIncompatibility},
    utils::Context,
};
use chainhook_types::{
    BitcoinBlockData, BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData,
    BitcoinChainUpdatedWithReorgData, BlockIdentifier,
};
use hiro_system_kit::slog;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};

/// In-memory bookkeeping for the Bitcoin blocks received so far: the raw block
/// data, the competing chain segments (forks) built out of them, and the blocks
/// that could not be attached to any fork yet.
pub struct BitcoinBlockPool {
    /// Data of every block currently kept in memory, keyed by block identifier.
    block_store: HashMap<BlockIdentifier, BitcoinBlockData>,
    /// Known chain segments, keyed by fork id.
    forks: BTreeMap<usize, ChainSegment>,
    /// Key in `forks` of the segment currently considered canonical.
    canonical_fork_id: usize,
    /// Identifiers of stored blocks that no fork has accepted so far.
    orphans: BTreeSet<BlockIdentifier>,
}

impl BitcoinBlockPool {
    /// Creates a pool with no stored blocks, no orphans, and a single empty
    /// fork registered under id `0`, which is also the initial canonical fork.
    pub fn new() -> BitcoinBlockPool {
        let mut pool = BitcoinBlockPool {
            canonical_fork_id: 0,
            orphans: BTreeSet::new(),
            block_store: HashMap::new(),
            forks: BTreeMap::new(),
        };
        // Seed the fork map with the empty segment the canonical id points at.
        pool.forks
            .insert(pool.canonical_fork_id, ChainSegment::new());
        pool
    }

    /// Ingests a Bitcoin block and reports how the canonical chain changed, if it did.
    ///
    /// The block is stored, then offered to each fork in ascending fork-id order
    /// until one accepts it (accepting may spawn a new fork). Blocks that were
    /// previously orphaned are then replayed on top of the fork that was updated.
    /// Finally the canonical fork is re-elected and compared with the fork that
    /// was canonical before the call.
    ///
    /// Returns `Ok(None)` when the block was already stored, when the previous
    /// canonical fork cannot be found, when no fork accepts the block (it is then
    /// tracked as an orphan), when the canonical fork is unchanged, or when no
    /// chain event can be derived. Otherwise returns `Ok(Some(event))`, after
    /// confirmed blocks have been collected into the event and pruned from the
    /// pool. This implementation never returns `Err`.
    pub fn process_block(
        &mut self,
        block: BitcoinBlockData,
        ctx: &Context,
    ) -> Result<Option<BitcoinChainEvent>, String> {
        ctx.try_log(|logger| {
            slog::info!(
                logger,
                "Start processing Bitcoin {}",
                block.block_identifier
            )
        });

        // Keep block data in memory
        let existing_entry = self
            .block_store
            .insert(block.block_identifier.clone(), block.clone());
        if existing_entry.is_some() {
            ctx.try_log(|logger| {
                slog::warn!(
                    logger,
                    "Bitcoin {} has already been processed",
                    block.block_identifier
                )
            });
            return Ok(None);
        }

        for (i, fork) in self.forks.iter() {
            ctx.try_log(|logger| slog::info!(logger, "Active fork {}: {}", i, fork));
        }

        // Snapshot the canonical fork before mutating anything, so that the
        // chain event can be computed as a diff against it.
        let previous_canonical_fork_id = self.canonical_fork_id;
        let previous_canonical_fork = match self.forks.get(&previous_canonical_fork_id) {
            Some(fork) => fork.clone(),
            None => {
                ctx.try_log(|logger| {
                    slog::error!(logger, "unable to retrieve previous bitcoin fork")
                });
                return Ok(None);
            }
        };

        // Offer the block to each fork; a block can only be added to one segment.
        let mut updated_fork_id = None;
        let mut spawned_fork = None;
        for (fork_id, fork) in self.forks.iter_mut() {
            let (block_appended, new_fork) = fork.try_append_block(&block, ctx);
            if block_appended {
                updated_fork_id = Some(*fork_id);
                spawned_fork = new_fork;
                break;
            }
        }
        // When appending spawned a new fork, register it under a fresh id and
        // treat it as the fork that was updated. The id must not be derived from
        // the number of forks: once forks have been pruned the keys are no longer
        // contiguous and `len()` could collide with (and overwrite) a live fork.
        if let Some(new_fork) = spawned_fork {
            let fork_id = self.next_fork_id();
            self.forks.insert(fork_id, new_fork);
            updated_fork_id = Some(fork_id);
        }

        let fork_lookup = match updated_fork_id {
            Some(fork_id) => self.forks.get_mut(&fork_id),
            None => None,
        };
        let fork_updated = match fork_lookup {
            Some(fork) => {
                ctx.try_log(|logger| {
                    slog::debug!(
                        logger,
                        "Bitcoin {} successfully appended to {}",
                        block.block_identifier,
                        fork
                    )
                });
                fork
            }
            None => {
                ctx.try_log(|logger| {
                    slog::debug!(
                        logger,
                        "Unable to process Bitcoin {} - inboxed for later",
                        block.block_identifier
                    )
                });
                self.orphans.insert(block.block_identifier.clone());
                return Ok(None);
            }
        };

        // Process former orphans
        let orphans = self.orphans.clone();
        let mut applied = HashSet::new();
        let mut forks_created = vec![];

        // As long as we are successful appending blocks that were previously unprocessable,
        // keep looping on this backlog: each success may unblock another orphan.
        let mut at_least_one_orphan_appended = true;
        while at_least_one_orphan_appended {
            at_least_one_orphan_appended = false;
            for orphan_block_identifier in orphans.iter() {
                if applied.contains(orphan_block_identifier) {
                    continue;
                }
                let orphan_block = match self.block_store.get(orphan_block_identifier) {
                    Some(block) => block.clone(),
                    None => continue,
                };

                let (orphan_appended, new_fork) =
                    fork_updated.try_append_block(&orphan_block, ctx);
                if orphan_appended {
                    applied.insert(orphan_block_identifier);
                    if let Some(new_fork) = new_fork {
                        forks_created.push(new_fork);
                    }
                    at_least_one_orphan_appended = true;
                }
            }
        }

        // Update orphans
        for orphan in applied.into_iter() {
            ctx.try_log(|logger| slog::info!(logger, "Dequeuing orphan {}", orphan));
            self.orphans.remove(orphan);
        }

        // Register the forks spawned while replaying orphans. Dropping them would
        // lose the dequeued orphan: it would belong to no fork and never be pruned.
        for new_fork in forks_created {
            let fork_id = self.next_fork_id();
            self.forks.insert(fork_id, new_fork);
        }

        // Select canonical fork: the greatest `get_length()` wins and, because of
        // `>=` combined with ascending iteration, ties go to the highest fork id.
        let mut canonical_fork_id = 0;
        let mut highest_height = 0;
        for (fork_id, fork) in self.forks.iter() {
            ctx.try_log(|logger| slog::info!(logger, "Active fork: {} - {}", fork_id, fork));
            if fork.get_length() >= highest_height {
                highest_height = fork.get_length();
                canonical_fork_id = *fork_id;
            }
        }
        ctx.try_log(|logger| {
            slog::info!(
                logger,
                "Active fork selected as canonical: {}",
                canonical_fork_id
            )
        });

        self.canonical_fork_id = canonical_fork_id;
        // Generate chain event from the previous and current canonical forks.
        // The id was just read from `self.forks`, which holds at least the fork
        // updated above, so the lookup cannot fail.
        let canonical_fork = self.forks.get(&canonical_fork_id).unwrap().clone();
        if canonical_fork == previous_canonical_fork {
            ctx.try_log(|logger| slog::info!(logger, "Canonical fork unchanged"));
            return Ok(None);
        }

        let res = self.generate_block_chain_event(&canonical_fork, &previous_canonical_fork, ctx);
        let mut chain_event = match res {
            Ok(chain_event) => chain_event,
            Err(ChainSegmentIncompatibility::ParentBlockUnknown) => {
                // The two forks cannot be related yet: keep the previous canonical fork.
                self.canonical_fork_id = previous_canonical_fork_id;
                return Ok(None);
            }
            Err(_) => return Ok(None),
        };

        self.collect_and_prune_confirmed_blocks(&mut chain_event, ctx);

        Ok(Some(chain_event))
    }

    /// Returns a fork id that no live fork uses: one past the highest id in use,
    /// or `0` when there is no fork at all.
    fn next_fork_id(&self) -> usize {
        self.forks
            .keys()
            .next_back()
            .map_or(0, |last_id| *last_id + 1)
    }

    /// Moves the blocks buried deep enough below the event's tip into the event's
    /// `confirmed_blocks`, and drops the data that is no longer needed.
    ///
    /// The tip is the last block of `new_blocks` (or of `blocks_to_apply` for a
    /// reorg). Nothing happens when the event carries no such block, or when
    /// fewer than 7 blocks can be walked back from the tip through the block
    /// store. Otherwise every fork and the orphan set are pruned against the
    /// cut-off block, forks left without blocks are removed, and the walked
    /// blocks past the first 6 are taken out of the store and handed to the event.
    pub fn collect_and_prune_confirmed_blocks(
        &mut self,
        chain_event: &mut BitcoinChainEvent,
        ctx: &Context,
    ) {
        let (tip, confirmed_blocks) = match chain_event {
            BitcoinChainEvent::ChainUpdatedWithBlocks(event) => match event.new_blocks.last() {
                Some(last_block) => (
                    last_block.block_identifier.clone(),
                    &mut event.confirmed_blocks,
                ),
                None => return,
            },
            BitcoinChainEvent::ChainUpdatedWithReorg(event) => {
                match event.blocks_to_apply.last() {
                    Some(last_block) => (
                        last_block.block_identifier.clone(),
                        &mut event.confirmed_blocks,
                    ),
                    None => return,
                }
            }
        };

        // Walk from the tip through parent links for as long as the block store
        // knows the block. The result is ordered tip first, then each successive
        // parent: [tip] [parent] [grand-parent] ...
        let mut canonical_segment = vec![];
        let mut cursor = &tip;
        while let Some(ancestor) = self.block_store.get(cursor) {
            canonical_segment.push(ancestor.block_identifier.clone());
            cursor = &ancestor.parent_block_identifier;
        }
        if canonical_segment.len() < 7 {
            return;
        }

        // The first 6 entries (tip included) stay in the pool; everything walked
        // after them is considered confirmed. The 6th entry is the cut-off.
        let (retained, confirmed_ids) = canonical_segment.split_at(6);
        let cut_off = &retained[5];

        // Prune forks using the cut-off block, remembering the forks left empty.
        let mut blocks_to_prune = vec![];
        let mut forks_to_prune = vec![];
        for (fork_id, fork) in self.forks.iter_mut() {
            blocks_to_prune.extend(fork.prune_confirmed_blocks(cut_off));
            if fork.block_ids.is_empty() {
                forks_to_prune.push(*fork_id);
            }
        }

        // Orphans whose index is below the cut-off's index are dropped as well.
        let stale_orphans = self
            .orphans
            .iter()
            .filter(|orphan| orphan.index < cut_off.index)
            .cloned()
            .collect::<Vec<_>>();
        for orphan in stale_orphans {
            self.orphans.remove(&orphan);
            blocks_to_prune.push(orphan);
        }

        // Hand the confirmed blocks over to the event.
        for confirmed_block in confirmed_ids {
            match self.block_store.remove(confirmed_block) {
                Some(block) => confirmed_blocks.push(block),
                None => {
                    ctx.try_log(|logger| {
                        slog::error!(logger, "unable to retrieve data for {}", confirmed_block)
                    });
                    // NOTE(review): returning here skips the cleanup below and
                    // leaves `confirmed_blocks` unreversed.
                    return;
                }
            }
        }

        // Prune data
        for block_to_prune in blocks_to_prune {
            self.block_store.remove(&block_to_prune);
        }
        for fork_id in forks_to_prune {
            self.forks.remove(&fork_id);
        }
        // Blocks were pushed nearest-to-tip first; reverse the whole list so the
        // block walked last comes first.
        confirmed_blocks.reverse();
    }

    pub fn generate_block_chain_event(
        &mut self,
        canonical_segment: &ChainSegment,
        other_segment: &ChainSegment,
        ctx: &Context,
    ) -> Result<BitcoinChainEvent, ChainSegmentIncompatibility> {
        if other_segment.is_empty() {
            let mut new_blocks = vec![];
            for i in 0..canonical_segment.block_ids.len() {
                let block_identifier =
                    &canonical_segment.block_ids[canonical_segment.block_ids.len() - 1 - i];
                let block = match self.block_store.get(block_identifier) {
                    Some(block) => block.clone(),
                    None => {
                        ctx.try_log(|logger| {
                            slog::error!(
                                logger,
                                "unable to retrive Bitcoin {} from block store",
                                block_identifier
                            )
                        });
                        return Err(ChainSegmentIncompatibility::Unknown);
                    }
                };
                new_blocks.push(block)
            }
            return Ok(BitcoinChainEvent::ChainUpdatedWithBlocks(
                BitcoinChainUpdatedWithBlocksData {
                    new_blocks,
                    confirmed_blocks: vec![],
                },
            ));
        }
        if let Ok(divergence) = canonical_segment.try_identify_divergence(other_segment, false, ctx)
        {
            if divergence.block_ids_to_rollback.is_empty() {
                let mut new_blocks = vec![];
                for i in 0..divergence.block_ids_to_apply.len() {
                    let block_identifier = &divergence.block_ids_to_apply[i];
                    let block = match self.block_store.get(block_identifier) {
                        Some(block) => block.clone(),
                        None => panic!("unable to retrive block from block store"),
                    };
                    new_blocks.push(block)
                }
                return Ok(BitcoinChainEvent::ChainUpdatedWithBlocks(
                    BitcoinChainUpdatedWithBlocksData {
                        new_blocks,
                        confirmed_blocks: vec![],
                    },
                ));
            } else {
                return Ok(BitcoinChainEvent::ChainUpdatedWithReorg(
                    BitcoinChainUpdatedWithReorgData {
                        blocks_to_rollback: divergence
                            .block_ids_to_rollback
                            .iter()
                            .map(|block_id| {
                                let block = match self.block_store.get(block_id) {
                                    Some(block) => block.clone(),
                                    None => panic!("unable to retrive block from block store"),
                                };
                                block
                            })
                            .collect::<Vec<_>>(),
                        blocks_to_apply: divergence
                            .block_ids_to_apply
                            .iter()
                            .map(|block_id| {
                                let block = match self.block_store.get(block_id) {
                                    Some(block) => block.clone(),
                                    None => panic!("unable to retrive block from block store"),
                                };
                                block
                            })
                            .collect::<Vec<_>>(),
                        confirmed_blocks: vec![],
                    },
                ));
            }
        }
        ctx.try_log(|logger| {
            slog::debug!(
                logger,
                "Unable to infer chain event out of {} and {}",
                canonical_segment,
                other_segment
            )
        });
        Err(ChainSegmentIncompatibility::ParentBlockUnknown)
    }
}