tycho_core/block_strider/
block_saver.rs1use anyhow::{Context, Result};
2use futures_util::future::BoxFuture;
3use tycho_block_util::archive::ArchiveData;
4use tycho_block_util::block::BlockStuff;
5use tycho_types::models::BlockId;
6
7use crate::block_strider::{BlockSubscriber, BlockSubscriberContext};
8use crate::storage::{BlockConnection, BlockHandle, CoreStorage, NewBlockMeta};
9
10#[repr(transparent)]
11#[derive(Clone)]
12pub struct BlockSaver {
13 storage: CoreStorage,
14}
15
16impl BlockSaver {
17 pub fn new(storage: CoreStorage) -> Self {
18 Self { storage }
19 }
20
21 pub async fn save_block(&self, cx: &BlockSubscriberContext) -> Result<BlockHandle> {
22 let (prev_id, prev_id_alt) = cx
24 .block
25 .construct_prev_id()
26 .context("failed to construct prev id")?;
27
28 let handle = self
30 .create_or_load_block_handle(&cx.mc_block_id, &cx.block, &cx.archive_data)
31 .await?;
32
33 let block_handles = self.storage.block_handle_storage();
35 let connections = self.storage.block_connection_storage();
36
37 let block_id = cx.block.id();
38
39 let prev_handle = block_handles.load_handle(&prev_id);
40
41 match prev_id_alt {
42 None => {
43 if let Some(handle) = prev_handle {
44 let direction = if block_id.shard != prev_id.shard
45 && prev_id.shard.split().unwrap().1 == block_id.shard
46 {
47 BlockConnection::Next2
49 } else {
50 BlockConnection::Next1
51 };
52 connections.store_connection(&handle, direction, block_id);
53 }
54 connections.store_connection(&handle, BlockConnection::Prev1, &prev_id);
55 }
56 Some(ref prev_id_alt) => {
57 if let Some(handle) = prev_handle {
58 connections.store_connection(&handle, BlockConnection::Next1, block_id);
59 }
60 if let Some(handle) = block_handles.load_handle(prev_id_alt) {
61 connections.store_connection(&handle, BlockConnection::Next1, block_id);
62 }
63 connections.store_connection(&handle, BlockConnection::Prev1, &prev_id);
64 connections.store_connection(&handle, BlockConnection::Prev2, prev_id_alt);
65 }
66 }
67
68 if self.storage.config().store_archives {
70 let storage = self.storage.clone();
71 let handle = handle.clone();
72 let mc_is_key_block = cx.mc_is_key_block;
73 cx.delayed.spawn(move || async move {
74 tracing::debug!(block_id = %handle.id(), "saving block into archive");
75 storage
76 .block_storage()
77 .move_into_archive(&handle, mc_is_key_block)
78 .await
79 })?;
80 }
81
82 Ok(handle)
83 }
84
85 async fn create_or_load_block_handle(
86 &self,
87 mc_block_id: &BlockId,
88 block: &BlockStuff,
89 archive_data: &ArchiveData,
90 ) -> Result<BlockHandle> {
91 let block_storage = self.storage.block_storage();
92
93 let info = block.load_info()?;
94 let res = block_storage
95 .store_block_data(block, archive_data, NewBlockMeta {
96 is_key_block: info.key_block,
97 gen_utime: info.gen_utime,
98 ref_by_mc_seqno: mc_block_id.seqno,
99 })
100 .await?;
101
102 Ok(res.handle)
103 }
104}
105
106impl BlockSubscriber for BlockSaver {
107 type Prepared = BlockHandle;
108
109 type PrepareBlockFut<'a> = BoxFuture<'a, Result<Self::Prepared>>;
110 type HandleBlockFut<'a> = futures_util::future::Ready<Result<()>>;
111
112 fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
113 Box::pin(self.save_block(cx))
114 }
115
116 fn handle_block<'a>(
117 &'a self,
118 _: &'a BlockSubscriberContext,
119 _: Self::Prepared,
120 ) -> Self::HandleBlockFut<'a> {
121 futures_util::future::ready(Ok(()))
122 }
123}