tycho_collator/storage/
mod.rs1use std::fs::File;
2
3use anyhow::Result;
4use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr, RouterPartitions};
5use tycho_core::storage::QueueStateReader;
6use tycho_storage::StorageContext;
7use tycho_storage::kv::StoredValue;
8use tycho_types::models::{BlockId, IntAddr, Message, MsgInfo, OutMsgQueueUpdates, ShardIdent};
9use tycho_util::FastHashMap;
10use tycho_util::fs::MappedFile;
11
12use self::db::InternalQueueDB;
13use self::models::{
14 CommitPointerKey, CommitPointerValue, DiffInfo, DiffInfoKey, DiffTailKey,
15 ShardsInternalMessagesKey, StatKey,
16};
17use self::snapshot::InternalQueueSnapshot;
18use self::transaction::InternalQueueTransaction;
19
20pub mod db;
21pub mod iterator;
22pub mod models;
23pub mod snapshot;
24pub mod tables;
25pub mod transaction;
26
27pub const INT_QUEUE_LAST_COMMITTED_MC_BLOCK_ID_KEY: &[u8] = b"last_committed_mc_block_id";
29const INT_QUEUE_SUBDIR: &str = "int_queue";
30
31#[derive(Clone)]
32pub struct InternalQueueStorage {
33 #[allow(unused)]
34 context: StorageContext,
35 db: InternalQueueDB,
36}
37
38impl InternalQueueStorage {
39 pub fn open(context: StorageContext) -> Result<Self> {
40 let db = context.open_preconfigured(INT_QUEUE_SUBDIR)?;
41
42 Ok(Self { context, db })
45 }
46
47 pub fn db(&self) -> &InternalQueueDB {
48 &self.db
49 }
50
51 pub fn begin_transaction(&self) -> InternalQueueTransaction {
52 InternalQueueTransaction {
53 db: self.db.clone(),
54 batch: Default::default(),
55 buffer: Vec::new(),
56 }
57 }
58
59 pub fn make_snapshot(&self) -> InternalQueueSnapshot {
60 InternalQueueSnapshot {
61 db: self.db.clone(),
62 snapshot: self.db.owned_snapshot(),
63 }
64 }
65
66 pub async fn import_from_file(
77 &self,
78 top_update: &OutMsgQueueUpdates,
79 file: File,
80 block_id: BlockId,
81 ) -> Result<()> {
82 tracing::info!("Importing internal queue from file for block {block_id}");
83 use tycho_types::boc::ser::BocHeader;
84
85 let top_update = top_update.clone();
86 let this = self.clone();
87
88 let span = tracing::Span::current();
89 tokio::task::spawn_blocking(move || {
90 let _span = span.enter();
91
92 let get_partition = |partitions: &RouterPartitions, router_addr: &RouterAddr| {
93 for (p, addresses) in partitions {
94 if addresses.contains(router_addr) {
95 return Some(*p);
96 }
97 }
98 None
99 };
100
101 let mapped = MappedFile::from_existing_file(file)?;
102
103 let mut reader = QueueStateReader::begin_from_mapped(mapped.as_slice(), &top_update)?;
104
105 let messages_cf = this.db.shard_internal_messages.cf();
106 let stats_cf = this.db.internal_message_stats.cf();
107 let var_cf = this.db.internal_message_var.cf();
108 let diffs_tail_cf = this.db.internal_message_diffs_tail.cf();
109 let diff_infos_cf = this.db.internal_message_diff_info.cf();
110 let commit_pointers_cf = this.db.internal_message_commit_pointer.cf();
111
112 let mut first_diff_read = false;
113
114 let mut batch = weedb::rocksdb::WriteBatch::default();
115
116 let mut buffer = Vec::new();
117 let mut statistics: FastHashMap<QueuePartitionIdx, FastHashMap<RouterAddr, u64>> =
118 FastHashMap::default();
119 while let Some(mut part) = reader.read_next_queue_diff()? {
120 let mut shards_messages_count = FastHashMap::default();
121
122 while let Some(cell) = part.read_next_message()? {
123 let msg_hash = cell.repr_hash();
124 let msg = cell.parse::<Message<'_>>()?;
125 let MsgInfo::Int(int_msg_info) = &msg.info else {
126 anyhow::bail!("non-internal message in the queue in msg {msg_hash}");
127 };
128
129 let IntAddr::Std(dest) = &int_msg_info.dst else {
130 anyhow::bail!("non-std destination address in msg {msg_hash}");
131 };
132
133 let IntAddr::Std(src) = &int_msg_info.src else {
134 anyhow::bail!("non-std destination address in msg {msg_hash}");
135 };
136
137 let src_addr = RouterAddr {
138 workchain: src.workchain,
139 account: src.address,
140 };
141
142 let dest_addr = RouterAddr {
143 workchain: dest.workchain,
144 account: dest.address,
145 };
146
147 let dest_shard = ShardIdent::new_full(dest_addr.workchain as i32);
149
150 shards_messages_count
151 .entry(dest_shard)
152 .and_modify(|count| *count += 1)
153 .or_insert(1);
154
155 let queue_diff = part.queue_diff();
156 let partition = get_partition(&queue_diff.router_partitions_dst, &dest_addr)
157 .or_else(|| get_partition(&queue_diff.router_partitions_src, &src_addr))
158 .unwrap_or_default();
159
160 let key = ShardsInternalMessagesKey {
161 partition,
162 shard_ident: block_id.shard,
163 internal_message_key: QueueKey {
164 lt: int_msg_info.created_lt,
165 hash: *msg_hash,
166 },
167 };
168
169 buffer.clear();
170 buffer.push(dest.workchain as u8);
171 buffer.extend_from_slice(&dest.prefix().to_le_bytes());
172 BocHeader::<ahash::RandomState>::with_root(cell.as_ref()).encode(&mut buffer);
173 batch.put_cf(&messages_cf, key.to_vec(), &buffer);
174
175 let partition_stats = statistics.entry(partition).or_default();
176 *partition_stats.entry(dest_addr).or_insert(0) += 1;
177 }
178
179 let queue_diff = part.queue_diff();
180
181 if !first_diff_read {
182 let commit_pointer_key = CommitPointerKey {
184 shard_ident: queue_diff.shard_ident,
185 };
186
187 let commit_pointer_value = CommitPointerValue {
188 queue_key: queue_diff.max_message,
189 seqno: queue_diff.seqno,
190 };
191
192 batch.put_cf(
193 &commit_pointers_cf,
194 commit_pointer_key.to_vec(),
195 commit_pointer_value.to_vec(),
196 );
197 }
198
199 first_diff_read = true;
200
201 let diff_tail_key = DiffTailKey {
203 shard_ident: queue_diff.shard_ident,
204 max_message: queue_diff.max_message,
205 };
206
207 batch.put_cf(
208 &diffs_tail_cf,
209 diff_tail_key.to_vec(),
210 queue_diff.seqno.to_le_bytes(),
211 );
212
213 let diff_info_key = DiffInfoKey {
215 shard_ident: queue_diff.shard_ident,
216 seqno: queue_diff.seqno,
217 };
218
219 let diff_info = DiffInfo {
220 min_message: queue_diff.min_message,
221 max_message: queue_diff.max_message,
222 shards_messages_count,
223 hash: queue_diff.hash,
224 processed_to: queue_diff.processed_to.clone(),
225 router_partitions_src: queue_diff.router_partitions_src.clone(),
226 router_partitions_dst: queue_diff.router_partitions_dst.clone(),
227 seqno: queue_diff.seqno,
228 };
229
230 batch.put_cf(
231 &diff_infos_cf,
232 diff_info_key.to_vec(),
233 tl_proto::serialize(diff_info),
234 );
235
236 for (partition, statistics) in statistics.drain() {
237 for (dest, count) in statistics.iter() {
238 let key = StatKey {
239 shard_ident: queue_diff.shard_ident,
240 partition,
241 max_message: queue_diff.max_message,
242 dest: *dest,
243 };
244
245 batch.put_cf(&stats_cf, key.to_vec(), count.to_le_bytes());
246 }
247 }
248 }
249
250 if block_id.is_masterchain() {
252 batch.put_cf(
253 &var_cf,
254 INT_QUEUE_LAST_COMMITTED_MC_BLOCK_ID_KEY,
255 block_id.to_vec(),
256 );
257 }
258
259 reader.finish()?;
260
261 this.db.rocksdb().write(batch)?;
262 Ok(())
263 })
264 .await?
265 }
266}