Skip to main content

tycho_core/block_strider/
block_saver.rs

1use anyhow::{Context, Result};
2use futures_util::future::BoxFuture;
3use tycho_block_util::archive::ArchiveData;
4use tycho_block_util::block::BlockStuff;
5use tycho_types::models::BlockId;
6
7use crate::block_strider::{BlockSubscriber, BlockSubscriberContext};
8use crate::storage::{BlockConnection, BlockHandle, CoreStorage, NewBlockMeta};
9
10#[repr(transparent)]
11#[derive(Clone)]
12pub struct BlockSaver {
13    storage: CoreStorage,
14}
15
16impl BlockSaver {
17    pub fn new(storage: CoreStorage) -> Self {
18        Self { storage }
19    }
20
21    pub async fn save_block(&self, cx: &BlockSubscriberContext) -> Result<BlockHandle> {
22        // Construct prev ids
23        let (prev_id, prev_id_alt) = cx
24            .block
25            .construct_prev_id()
26            .context("failed to construct prev id")?;
27
28        // Store block data and get handle
29        let handle = self
30            .create_or_load_block_handle(&cx.mc_block_id, &cx.block, &cx.archive_data)
31            .await?;
32
33        // Store block connections
34        let block_handles = self.storage.block_handle_storage();
35        let connections = self.storage.block_connection_storage();
36
37        let block_id = cx.block.id();
38
39        let prev_handle = block_handles.load_handle(&prev_id);
40
41        match prev_id_alt {
42            None => {
43                if let Some(handle) = prev_handle {
44                    let direction = if block_id.shard != prev_id.shard
45                        && prev_id.shard.split().unwrap().1 == block_id.shard
46                    {
47                        // Special case for the right child after split
48                        BlockConnection::Next2
49                    } else {
50                        BlockConnection::Next1
51                    };
52                    connections.store_connection(&handle, direction, block_id);
53                }
54                connections.store_connection(&handle, BlockConnection::Prev1, &prev_id);
55            }
56            Some(ref prev_id_alt) => {
57                if let Some(handle) = prev_handle {
58                    connections.store_connection(&handle, BlockConnection::Next1, block_id);
59                }
60                if let Some(handle) = block_handles.load_handle(prev_id_alt) {
61                    connections.store_connection(&handle, BlockConnection::Next1, block_id);
62                }
63                connections.store_connection(&handle, BlockConnection::Prev1, &prev_id);
64                connections.store_connection(&handle, BlockConnection::Prev2, prev_id_alt);
65            }
66        }
67
68        // Save block to archive if needed
69        if self.storage.config().store_archives {
70            let storage = self.storage.clone();
71            let handle = handle.clone();
72            let mc_is_key_block = cx.mc_is_key_block;
73            cx.delayed.spawn(move || async move {
74                tracing::debug!(block_id = %handle.id(), "saving block into archive");
75                storage
76                    .block_storage()
77                    .move_into_archive(&handle, mc_is_key_block)
78                    .await
79            })?;
80        }
81
82        Ok(handle)
83    }
84
85    async fn create_or_load_block_handle(
86        &self,
87        mc_block_id: &BlockId,
88        block: &BlockStuff,
89        archive_data: &ArchiveData,
90    ) -> Result<BlockHandle> {
91        let block_storage = self.storage.block_storage();
92
93        let info = block.load_info()?;
94        let res = block_storage
95            .store_block_data(block, archive_data, NewBlockMeta {
96                is_key_block: info.key_block,
97                gen_utime: info.gen_utime,
98                ref_by_mc_seqno: mc_block_id.seqno,
99            })
100            .await?;
101
102        Ok(res.handle)
103    }
104}
105
106impl BlockSubscriber for BlockSaver {
107    type Prepared = BlockHandle;
108
109    type PrepareBlockFut<'a> = BoxFuture<'a, Result<Self::Prepared>>;
110    type HandleBlockFut<'a> = futures_util::future::Ready<Result<()>>;
111
112    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
113        Box::pin(self.save_block(cx))
114    }
115
116    fn handle_block<'a>(
117        &'a self,
118        _: &'a BlockSubscriberContext,
119        _: Self::Prepared,
120    ) -> Self::HandleBlockFut<'a> {
121        futures_util::future::ready(Ok(()))
122    }
123}