subxt 0.50.0

Interact with Substrate based chains on the Polkadot Network
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
// Copyright 2019-2026 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

use super::ChainHeadRpcMethods;
use super::follow_stream::FollowStream;
use crate::config::{Config, Hash, HashFor, RpcConfigFor};
use crate::error::BackendError;
use futures::stream::{FuturesUnordered, Stream, StreamExt};
use subxt_rpcs::methods::chain_head::{
    BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock,
};

use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

/// The type of stream item.
pub use super::follow_stream::FollowStreamMsg;

/// A `Stream` which builds on `FollowStream`, and handles pinning. It replaces any block hash seen in
/// the follow events with a `BlockRef` which, when all clones are dropped, will lead to an "unpin" call
/// for that block hash being queued. It will also automatically unpin any blocks that exceed a given max
/// age, to try and prevent the underlying stream from ending (and _all_ blocks from being unpinned as a
/// result). Put simply, it tries to keep every block pinned as long as possible until the block is no longer
/// used anywhere.
#[derive(Debug)]
pub struct FollowStreamUnpin<H: Hash> {
    // The underlying stream of events.
    inner: FollowStream<H>,
    // A method to call to unpin a block, given a block hash and a subscription ID.
    unpin_method: UnpinMethodHolder<H>,
    // Futures for sending unpin events that we'll poll to completion as
    // part of polling the stream as a whole.
    unpin_futs: FuturesUnordered<UnpinFut>,
    // Each time a new finalized block is seen, we give it an age of `next_rel_block_age`,
    // and then increment this ready for the next finalized block. So, the first finalized
    // block will have an age of 0, the next 1, 2, 3 and so on. We can then use `max_block_life`
    // to say "unpin all blocks with an age < (next_rel_block_age-1) - max_block_life".
    next_rel_block_age: usize,
    // The latest ID of the FollowStream subscription, which we can use
    // to unpin blocks.
    subscription_id: Option<Arc<str>>,
    // The longest period a block can be pinned for.
    max_block_life: usize,
    // The currently seen and pinned blocks.
    pinned: HashMap<H, PinnedDetails<H>>,
    // Shared state about blocks we've flagged to unpin from elsewhere
    unpin_flags: UnpinFlags<H>,
}

// Just a wrapper to make implementing debug on the whole thing easier.
struct UnpinMethodHolder<H>(UnpinMethod<H>);
impl<H> std::fmt::Debug for UnpinMethodHolder<H> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "UnpinMethodHolder(Box<dyn FnMut(Hash, Arc<str>) -> UnpinFut>)"
        )
    }
}

/// The type of the unpin method that we need to provide.
pub type UnpinMethod<H> = Box<dyn FnMut(H, Arc<str>) -> UnpinFut + Send>;

/// The future returned from [`UnpinMethod`].
pub type UnpinFut = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

impl<H: Hash> std::marker::Unpin for FollowStreamUnpin<H> {}

impl<H: Hash> Stream for FollowStreamUnpin<H> {
    type Item = Result<FollowStreamMsg<BlockRef<H>>, BackendError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.as_mut();

        loop {
            // Poll any queued unpin tasks.
            let unpin_futs_are_pending = match this.unpin_futs.poll_next_unpin(cx) {
                Poll::Ready(Some(())) => continue,
                Poll::Ready(None) => false,
                Poll::Pending => true,
            };

            // Poll the inner stream for the next event.
            let Poll::Ready(ev) = this.inner.poll_next_unpin(cx) else {
                return Poll::Pending;
            };

            let Some(ev) = ev else {
                // if the stream is done, but `unpin_futs` are still pending, then
                // return pending here so that they are still driven to completion.
                // Else, return `Ready(None)` to signal nothing left to do.
                return match unpin_futs_are_pending {
                    true => Poll::Pending,
                    false => Poll::Ready(None),
                };
            };

            // Error? just return it and do nothing further.
            let ev = match ev {
                Ok(ev) => ev,
                Err(e) => {
                    return Poll::Ready(Some(Err(e)));
                }
            };

            // React to any actual FollowEvent we get back.
            let ev = match ev {
                FollowStreamMsg::Ready(subscription_id) => {
                    // update the subscription ID we'll use to unpin things.
                    this.subscription_id = Some(subscription_id.clone().into());

                    FollowStreamMsg::Ready(subscription_id)
                }
                FollowStreamMsg::Event(FollowEvent::Initialized(details)) => {
                    let mut finalized_block_hashes =
                        Vec::with_capacity(details.finalized_block_hashes.len());

                    // Pin each of the finalized blocks. None of them will show up again (except as a
                    // parent block), and so they can all be unpinned immediately at any time. Increment
                    // the block age for each one, so that older finalized blocks are pruned first.
                    for finalized_block in &details.finalized_block_hashes {
                        let rel_block_age = this.next_rel_block_age;
                        let block_ref =
                            this.pin_unpinnable_block_at(rel_block_age, *finalized_block);

                        finalized_block_hashes.push(block_ref);
                        this.next_rel_block_age += 1;
                    }

                    FollowStreamMsg::Event(FollowEvent::Initialized(Initialized {
                        finalized_block_hashes,
                        finalized_block_runtime: details.finalized_block_runtime,
                    }))
                }
                FollowStreamMsg::Event(FollowEvent::NewBlock(details)) => {
                    // One bigger than our parent, and if no parent seen (maybe it was
                    // unpinned already), then one bigger than the last finalized block num
                    // as a best guess.
                    let parent_rel_block_age = this
                        .pinned
                        .get(&details.parent_block_hash)
                        .map(|p| p.rel_block_age)
                        .unwrap_or(this.next_rel_block_age.saturating_sub(1));

                    let block_ref = this.pin_block_at(parent_rel_block_age + 1, details.block_hash);
                    let parent_block_ref =
                        this.pin_block_at(parent_rel_block_age, details.parent_block_hash);

                    FollowStreamMsg::Event(FollowEvent::NewBlock(NewBlock {
                        block_hash: block_ref,
                        parent_block_hash: parent_block_ref,
                        new_runtime: details.new_runtime,
                    }))
                }
                FollowStreamMsg::Event(FollowEvent::BestBlockChanged(details)) => {
                    // We expect this block to already exist, so it'll keep its existing block_num,
                    // but worst case it'll just get the current finalized block_num + 1.
                    let rel_block_age = this.next_rel_block_age;
                    let block_ref = this.pin_block_at(rel_block_age, details.best_block_hash);

                    FollowStreamMsg::Event(FollowEvent::BestBlockChanged(BestBlockChanged {
                        best_block_hash: block_ref,
                    }))
                }
                FollowStreamMsg::Event(FollowEvent::Finalized(details)) => {
                    let finalized_block_refs: Vec<_> = details
                        .finalized_block_hashes
                        .into_iter()
                        .enumerate()
                        .map(|(idx, hash)| {
                            // These blocks _should_ exist already and so will have a known block num,
                            // but if they don't, we just increment the num from the last finalized block
                            // we saw, which should be accurate.
                            //
                            // `pin_unpinnable_block_at` indicates that the block will not show up in future events
                            // (They will show up as a parent block, but we don't care about that right now).
                            let rel_block_age = this.next_rel_block_age + idx;
                            this.pin_unpinnable_block_at(rel_block_age, hash)
                        })
                        .collect();

                    // Our relative block height is increased by however many finalized
                    // blocks we've seen.
                    this.next_rel_block_age += finalized_block_refs.len();

                    let pruned_block_refs: Vec<_> = details
                        .pruned_block_hashes
                        .into_iter()
                        .map(|hash| {
                            // We should know about these, too, and if not we set their age to last_finalized + 1.
                            //
                            // `pin_unpinnable_block_at` indicates that the block will not show up in future events.
                            let rel_block_age = this.next_rel_block_age;
                            this.pin_unpinnable_block_at(rel_block_age, hash)
                        })
                        .collect();

                    // At this point, we also check to see which blocks we should submit unpin events
                    // for. We will unpin:
                    // - Any block that's older than the max age.
                    // - Any block that has no references left (ie has been dropped) that _also_ has
                    //   showed up in the pruned list in a finalized event (so it will never be in another event).
                    this.unpin_blocks(cx.waker());

                    FollowStreamMsg::Event(FollowEvent::Finalized(Finalized {
                        finalized_block_hashes: finalized_block_refs,
                        pruned_block_hashes: pruned_block_refs,
                    }))
                }
                FollowStreamMsg::Event(FollowEvent::Stop) => {
                    // clear out "old" things that are no longer applicable since
                    // the subscription has ended (a new one will be created under the hood, at
                    // which point we'll get given a new subscription ID.
                    this.subscription_id = None;
                    this.pinned.clear();
                    this.unpin_futs.clear();
                    this.unpin_flags.lock().unwrap().clear();
                    this.next_rel_block_age = 0;

                    FollowStreamMsg::Event(FollowEvent::Stop)
                }
                // These events aren't interesting; we just forward them on:
                FollowStreamMsg::Event(FollowEvent::OperationBodyDone(details)) => {
                    FollowStreamMsg::Event(FollowEvent::OperationBodyDone(details))
                }
                FollowStreamMsg::Event(FollowEvent::OperationCallDone(details)) => {
                    FollowStreamMsg::Event(FollowEvent::OperationCallDone(details))
                }
                FollowStreamMsg::Event(FollowEvent::OperationStorageItems(details)) => {
                    FollowStreamMsg::Event(FollowEvent::OperationStorageItems(details))
                }
                FollowStreamMsg::Event(FollowEvent::OperationWaitingForContinue(details)) => {
                    FollowStreamMsg::Event(FollowEvent::OperationWaitingForContinue(details))
                }
                FollowStreamMsg::Event(FollowEvent::OperationStorageDone(details)) => {
                    FollowStreamMsg::Event(FollowEvent::OperationStorageDone(details))
                }
                FollowStreamMsg::Event(FollowEvent::OperationInaccessible(details)) => {
                    FollowStreamMsg::Event(FollowEvent::OperationInaccessible(details))
                }
                FollowStreamMsg::Event(FollowEvent::OperationError(details)) => {
                    FollowStreamMsg::Event(FollowEvent::OperationError(details))
                }
            };

            // Return our event.
            return Poll::Ready(Some(Ok(ev)));
        }
    }
}

impl<H: Hash> FollowStreamUnpin<H> {
    /// Create a new [`FollowStreamUnpin`].
    pub fn new(
        follow_stream: FollowStream<H>,
        unpin_method: UnpinMethod<H>,
        max_block_life: usize,
    ) -> Self {
        Self {
            inner: follow_stream,
            unpin_method: UnpinMethodHolder(unpin_method),
            max_block_life,
            pinned: Default::default(),
            subscription_id: None,
            next_rel_block_age: 0,
            unpin_flags: Default::default(),
            unpin_futs: Default::default(),
        }
    }

    /// Create a new [`FollowStreamUnpin`] given the RPC methods.
    pub fn from_methods<T: Config>(
        follow_stream: FollowStream<HashFor<T>>,
        methods: ChainHeadRpcMethods<RpcConfigFor<T>>,
        max_block_life: usize,
    ) -> FollowStreamUnpin<HashFor<T>> {
        let unpin_method = Box::new(move |hash: HashFor<T>, sub_id: Arc<str>| {
            let methods = methods.clone();
            let fut: UnpinFut = Box::pin(async move {
                // We ignore any errors trying to unpin at the moment.
                let _ = methods.chainhead_v1_unpin(&sub_id, hash).await;
            });
            fut
        });

        FollowStreamUnpin::new(follow_stream, unpin_method, max_block_life)
    }

    /// Is the block hash currently pinned.
    pub fn is_pinned(&self, hash: &H) -> bool {
        self.pinned.contains_key(hash)
    }

    /// Pin a block, or return the reference to an already-pinned block. If the block has been registered to
    /// be unpinned, we'll clear those flags, so that it won't be unpinned. If the unpin request has already
    /// been sent though, then the block will be unpinned.
    fn pin_block_at(&mut self, rel_block_age: usize, hash: H) -> BlockRef<H> {
        self.pin_block_at_setting_unpinnable_flag(rel_block_age, hash, false)
    }

    /// Pin a block, or return the reference to an already-pinned block.
    ///
    /// This is the same as [`Self::pin_block_at`], except that it also marks the block as being unpinnable now,
    /// which should be done for any block that will no longer be seen in future events.
    fn pin_unpinnable_block_at(&mut self, rel_block_age: usize, hash: H) -> BlockRef<H> {
        self.pin_block_at_setting_unpinnable_flag(rel_block_age, hash, true)
    }

    fn pin_block_at_setting_unpinnable_flag(
        &mut self,
        rel_block_age: usize,
        hash: H,
        can_be_unpinned: bool,
    ) -> BlockRef<H> {
        let entry = self
            .pinned
            .entry(hash)
            // If there's already an entry, then clear any unpin_flags and update the
            // can_be_unpinned status (this can become true but cannot become false again
            // once true).
            .and_modify(|entry| {
                entry.can_be_unpinned = entry.can_be_unpinned || can_be_unpinned;
                self.unpin_flags.lock().unwrap().remove(&hash);
            })
            // If there's not an entry already, make one and return it.
            .or_insert_with(|| PinnedDetails {
                rel_block_age,
                block_ref: BlockRef {
                    inner: Arc::new(BlockRefInner {
                        hash,
                        unpin_flags: self.unpin_flags.clone(),
                    }),
                },
                can_be_unpinned,
            });

        entry.block_ref.clone()
    }

    /// Unpin any blocks that are either too old, or have the unpin flag set and are old enough.
    fn unpin_blocks(&mut self, waker: &Waker) {
        let mut unpin_flags = self.unpin_flags.lock().unwrap();

        // This gets the age of the last finalized block.
        let rel_block_age = self.next_rel_block_age.saturating_sub(1);

        // If we asked to unpin and there was no subscription_id, then there's nothing we can do,
        // and nothing will need unpinning now anyway.
        let Some(sub_id) = &self.subscription_id else {
            return;
        };

        let mut blocks_to_unpin = vec![];
        for (hash, details) in &self.pinned {
            if rel_block_age.saturating_sub(details.rel_block_age) >= self.max_block_life
                || (unpin_flags.contains(hash) && details.can_be_unpinned)
            {
                // The block is too old, or it's been flagged to be unpinned and won't be in a future
                // backend event, so we can unpin it for real now.
                blocks_to_unpin.push(*hash);
                // Clear it from our unpin flags if present so that we don't try to unpin it again.
                unpin_flags.remove(hash);
            }
        }

        // Release our lock on unpin_flags ASAP.
        drop(unpin_flags);

        // No need to call the waker etc if nothing to do:
        if blocks_to_unpin.is_empty() {
            return;
        }

        for hash in blocks_to_unpin {
            self.pinned.remove(&hash);
            let fut = (self.unpin_method.0)(hash, sub_id.clone());
            self.unpin_futs.push(fut);
        }

        // Any new futures pushed above need polling to start. We could
        // just wait for the next stream event, but let's wake the task to
        // have it polled sooner, just in case it's slow to receive things.
        waker.wake_by_ref();
    }
}

// The set of block hashes that can be unpinned when ready.
// BlockRefs write to this when they are dropped.
type UnpinFlags<H> = Arc<Mutex<HashSet<H>>>;

#[derive(Debug)]
struct PinnedDetails<H: Hash> {
    /// Relatively speaking, how old is the block? When we start following
    /// blocks, the first finalized block gets an age of 0, the second an age
    /// of 1 and so on.
    rel_block_age: usize,
    /// A block ref we can hand out to keep blocks pinned.
    /// Because we store one here until it's unpinned, the live count
    /// will only drop to 1 when no external refs are left.
    block_ref: BlockRef<H>,
    /// Has this block showed up in the list of pruned blocks, or has it
    /// been finalized? In this case, it can now been pinned as it won't
    /// show up again in future events (except as a "parent block" of some
    /// new block, which we're currently ignoring).
    can_be_unpinned: bool,
}

/// All blocks reported will be wrapped in this.
#[derive(Debug, Clone)]
pub struct BlockRef<H: Hash> {
    inner: Arc<BlockRefInner<H>>,
}

#[derive(Debug)]
struct BlockRefInner<H> {
    hash: H,
    unpin_flags: UnpinFlags<H>,
}

impl<H: Hash> BlockRef<H> {
    /// For testing purposes only, create a BlockRef from a hash
    /// that isn't pinned.
    #[cfg(test)]
    pub fn new(hash: H) -> Self {
        BlockRef {
            inner: Arc::new(BlockRefInner {
                hash,
                unpin_flags: Default::default(),
            }),
        }
    }

    /// Return the hash for this block.
    pub fn hash(&self) -> H {
        self.inner.hash
    }
}

impl<H: Hash> PartialEq for BlockRef<H> {
    fn eq(&self, other: &Self) -> bool {
        self.inner.hash == other.inner.hash
    }
}

impl<H: Hash> PartialEq<H> for BlockRef<H> {
    fn eq(&self, other: &H) -> bool {
        &self.inner.hash == other
    }
}

impl<H: Hash> Drop for BlockRef<H> {
    fn drop(&mut self) {
        // PinnedDetails keeps one ref, so if this is the second ref, it's the
        // only "external" one left and we should ask to unpin it now. if it's
        // the only ref remaining, it means that it's already been unpinned, so
        // nothing to do here anyway.
        if Arc::strong_count(&self.inner) == 2 {
            if let Ok(mut unpin_flags) = self.inner.unpin_flags.lock() {
                unpin_flags.insert(self.inner.hash);
            }
        }
    }
}

#[cfg(test)]
pub(super) mod test_utils {
    use super::super::follow_stream::{FollowStream, test_utils::test_stream_getter};
    use super::*;
    use crate::config::substrate::H256;

    pub type UnpinRx<H> = std::sync::mpsc::Receiver<(H, Arc<str>)>;

    /// Get a [`FollowStreamUnpin`] from an iterator over events.
    pub fn test_unpin_stream_getter<H, F, I>(
        events: F,
        max_life: usize,
    ) -> (FollowStreamUnpin<H>, UnpinRx<H>)
    where
        H: Hash + 'static,
        F: Fn() -> I + Send + 'static,
        I: IntoIterator<Item = Result<FollowEvent<H>, BackendError>>,
    {
        // Unpin requests will come here so that we can look out for them.
        let (unpin_tx, unpin_rx) = std::sync::mpsc::channel();

        let follow_stream = FollowStream::new(test_stream_getter(events));
        let unpin_method: UnpinMethod<H> = Box::new(move |hash, sub_id| {
            unpin_tx.send((hash, sub_id)).unwrap();
            Box::pin(std::future::ready(()))
        });

        let follow_unpin = FollowStreamUnpin::new(follow_stream, unpin_method, max_life);
        (follow_unpin, unpin_rx)
    }

    /// Assert that the unpinned blocks sent from the `UnpinRx` channel match the items given.
    pub fn assert_from_unpin_rx<H: Hash + 'static>(
        unpin_rx: &UnpinRx<H>,
        items: impl IntoIterator<Item = H>,
    ) {
        let expected_hashes = HashSet::<H>::from_iter(items);
        for i in 0..expected_hashes.len() {
            let Ok((hash, _)) = unpin_rx.try_recv() else {
                panic!("Another unpin event is expected, but failed to pull item {i} from channel");
            };
            assert!(
                expected_hashes.contains(&hash),
                "Hash {hash:?} was unpinned, but is not expected to have been"
            );
        }
    }

    /// An initialized event containing a BlockRef (useful for comparisons)
    pub fn ev_initialized_ref(n: u64) -> FollowEvent<BlockRef<H256>> {
        FollowEvent::Initialized(Initialized {
            finalized_block_hashes: vec![BlockRef::new(H256::from_low_u64_le(n))],
            finalized_block_runtime: None,
        })
    }

    /// A new block event containing a BlockRef (useful for comparisons)
    pub fn ev_new_block_ref(parent: u64, n: u64) -> FollowEvent<BlockRef<H256>> {
        FollowEvent::NewBlock(NewBlock {
            parent_block_hash: BlockRef::new(H256::from_low_u64_le(parent)),
            block_hash: BlockRef::new(H256::from_low_u64_le(n)),
            new_runtime: None,
        })
    }

    /// A best block event containing a BlockRef (useful for comparisons)
    pub fn ev_best_block_ref(n: u64) -> FollowEvent<BlockRef<H256>> {
        FollowEvent::BestBlockChanged(BestBlockChanged {
            best_block_hash: BlockRef::new(H256::from_low_u64_le(n)),
        })
    }

    /// A finalized event containing a BlockRef (useful for comparisons)
    pub fn ev_finalized_ref(ns: impl IntoIterator<Item = u64>) -> FollowEvent<BlockRef<H256>> {
        FollowEvent::Finalized(Finalized {
            finalized_block_hashes: ns
                .into_iter()
                .map(|h| BlockRef::new(H256::from_low_u64_le(h)))
                .collect(),
            pruned_block_hashes: vec![],
        })
    }
}

#[cfg(test)]
mod test {
    use super::super::follow_stream::test_utils::{
        ev_best_block, ev_finalized, ev_initialized, ev_new_block,
    };
    use super::test_utils::{assert_from_unpin_rx, ev_new_block_ref, test_unpin_stream_getter};
    use super::*;
    use crate::config::substrate::H256;

    #[tokio::test]
    async fn hands_back_blocks() {
        let (follow_unpin, _) = test_unpin_stream_getter(
            || {
                [
                    Ok(ev_new_block(0, 1)),
                    Ok(ev_new_block(1, 2)),
                    Ok(ev_new_block(2, 3)),
                    Err(BackendError::other("ended")),
                ]
            },
            10,
        );

        let out: Vec<_> = follow_unpin.filter_map(async |e| e.ok()).collect().await;

        assert_eq!(
            out,
            vec![
                FollowStreamMsg::Ready("sub_id_0".into()),
                FollowStreamMsg::Event(ev_new_block_ref(0, 1)),
                FollowStreamMsg::Event(ev_new_block_ref(1, 2)),
                FollowStreamMsg::Event(ev_new_block_ref(2, 3)),
            ]
        );
    }

    #[tokio::test]
    async fn unpins_initialized_block() {
        let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter(
            || {
                [
                    Ok(ev_initialized(0)),
                    Ok(ev_finalized([1], [])),
                    Err(BackendError::other("ended")),
                ]
            },
            3,
        );

        let _r = follow_unpin.next().await.unwrap().unwrap();

        // Drop the initialized block:
        let i0 = follow_unpin.next().await.unwrap().unwrap();
        drop(i0);

        // Let a finalization event occur.
        let _f1 = follow_unpin.next().await.unwrap().unwrap();

        // Now, initialized block should be unpinned.
        assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(0)]);
        assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(0)));
    }

    #[tokio::test]
    async fn unpins_old_blocks() {
        let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter(
            || {
                [
                    Ok(ev_initialized(0)),
                    Ok(ev_finalized([1], [])),
                    Ok(ev_finalized([2], [])),
                    Ok(ev_finalized([3], [])),
                    Ok(ev_finalized([4], [])),
                    Ok(ev_finalized([5], [])),
                    Err(BackendError::other("ended")),
                ]
            },
            3,
        );

        let _r = follow_unpin.next().await.unwrap().unwrap();
        let _i0 = follow_unpin.next().await.unwrap().unwrap();
        unpin_rx.try_recv().expect_err("nothing unpinned yet");
        let _f1 = follow_unpin.next().await.unwrap().unwrap();
        unpin_rx.try_recv().expect_err("nothing unpinned yet");
        let _f2 = follow_unpin.next().await.unwrap().unwrap();
        unpin_rx.try_recv().expect_err("nothing unpinned yet");
        let _f3 = follow_unpin.next().await.unwrap().unwrap();

        // Max age is 3, so after block 3 finalized, block 0 becomes too old and is unpinned.
        assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(0)]);

        let _f4 = follow_unpin.next().await.unwrap().unwrap();

        // Block 1 is now too old and is unpinned.
        assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(1)]);

        let _f5 = follow_unpin.next().await.unwrap().unwrap();

        // Block 2 is now too old and is unpinned.
        assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(2)]);
    }

    #[tokio::test]
    async fn dropped_new_blocks_should_not_get_unpinned_until_finalization() {
        let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter(
            || {
                [
                    Ok(ev_initialized(0)),
                    Ok(ev_new_block(0, 1)),
                    Ok(ev_new_block(1, 2)),
                    Ok(ev_finalized([1], [])),
                    Ok(ev_finalized([2], [])),
                    Err(BackendError::other("ended")),
                ]
            },
            10,
        );

        let _r = follow_unpin.next().await.unwrap().unwrap();
        let _i0 = follow_unpin.next().await.unwrap().unwrap();

        let n1 = follow_unpin.next().await.unwrap().unwrap();
        drop(n1);
        let n2 = follow_unpin.next().await.unwrap().unwrap();
        drop(n2);

        // New blocks dropped but still pinned:
        assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(1)));
        assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2)));

        let f1 = follow_unpin.next().await.unwrap().unwrap();
        drop(f1);

        // After block 1 finalized, both blocks are still pinned because:
        // - block 1 was handed back in the finalized event, so will be unpinned next time.
        // - block 2 wasn't mentioned in the finalized event, so should not have been unpinned yet.
        assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(1)));
        assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2)));

        let f2 = follow_unpin.next().await.unwrap().unwrap();
        drop(f2);

        // After block 2 finalized, block 1 can be unpinned finally, but block 2 needs to wait one more event.
        assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(1)));
        assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2)));
        assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(1)]);
    }

    #[tokio::test]
    async fn dropped_new_blocks_should_not_get_unpinned_until_pruned() {
        let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter(
            || {
                [
                    Ok(ev_initialized(0)),
                    Ok(ev_new_block(0, 1)),
                    Ok(ev_new_block(1, 2)),
                    Ok(ev_new_block(1, 3)),
                    Ok(ev_finalized([1], [])),
                    Ok(ev_finalized([2], [3])),
                    Ok(ev_finalized([4], [])),
                    Err(BackendError::other("ended")),
                ]
            },
            10,
        );

        let _r = follow_unpin.next().await.unwrap().unwrap();
        let _i0 = follow_unpin.next().await.unwrap().unwrap();

        let n1 = follow_unpin.next().await.unwrap().unwrap();
        drop(n1);
        let n2 = follow_unpin.next().await.unwrap().unwrap();
        drop(n2);
        let n3 = follow_unpin.next().await.unwrap().unwrap();
        drop(n3);

        let f1 = follow_unpin.next().await.unwrap().unwrap();
        drop(f1);

        // After block 1 is finalized, everything is still pinned because the finalization event
        // itself returns 1, and 2/3 aren't finalized or pruned yet.
        assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(1)));
        assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2)));
        assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(3)));

        let f2 = follow_unpin.next().await.unwrap().unwrap();
        drop(f2);

        // After the next finalization event, block 1 can finally be unpinned since it was Finalized
        // last event _and_ is no longer handed back anywhere. 2 and 3 should still be pinned.
        assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(1)));
        assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2)));
        assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(3)));
        assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(1)]);

        let f4 = follow_unpin.next().await.unwrap().unwrap();
        drop(f4);

        // After some other finalized event, we are now allowed to ditch the previously pruned and
        // finalized blocks 2 and 3.
        assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(2)));
        assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(3)));
        assert_from_unpin_rx(
            &unpin_rx,
            [H256::from_low_u64_le(2), H256::from_low_u64_le(3)],
        );
    }

    #[tokio::test]
    async fn never_unpin_new_block_before_finalized() {
        // Ensure that if we drop a new block; the pinning is still active until the block is finalized.
        let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter(
            || {
                [
                    Ok(ev_initialized(0)),
                    Ok(ev_new_block(0, 1)),
                    Ok(ev_new_block(1, 2)),
                    Ok(ev_best_block(1)),
                    Ok(ev_finalized([1], [])),
                    Ok(ev_finalized([2], [])),
                    Err(BackendError::other("ended")),
                ]
            },
            10,
        );

        let _r = follow_unpin.next().await.unwrap().unwrap();

        // drop initialised block 0 and new block 1 and new block 2.
        let i0 = follow_unpin.next().await.unwrap().unwrap();
        drop(i0);
        let n1 = follow_unpin.next().await.unwrap().unwrap();
        drop(n1);
        let n2 = follow_unpin.next().await.unwrap().unwrap();
        drop(n2);
        let b1 = follow_unpin.next().await.unwrap().unwrap();
        drop(b1);

        // Nothing unpinned yet!
        unpin_rx.try_recv().expect_err("nothing unpinned yet");

        let f1 = follow_unpin.next().await.unwrap().unwrap();
        drop(f1);

        // After finalization, block 1 is now ready to be unpinned (it won't be seen again),
        // but isn't actually unpinned yet (because it was just handed back in f1). Block 0
        // however has now been unpinned.
        assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(0)));
        assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(0)]);
        unpin_rx.try_recv().expect_err("nothing unpinned yet");

        let f2 = follow_unpin.next().await.unwrap().unwrap();
        drop(f2);

        // After f2, we can get rid of block 1 now, which was finalized last time.
        assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(1)));
        assert_from_unpin_rx(&unpin_rx, [H256::from_low_u64_le(1)]);
        unpin_rx.try_recv().expect_err("nothing unpinned yet");
    }
}