Skip to main content

tycho_collator/storage/
mod.rs

1use std::fs::File;
2
3use anyhow::Result;
4use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr, RouterPartitions};
5use tycho_core::storage::QueueStateReader;
6use tycho_storage::StorageContext;
7use tycho_storage::kv::StoredValue;
8use tycho_types::models::{BlockId, IntAddr, Message, MsgInfo, OutMsgQueueUpdates, ShardIdent};
9use tycho_util::FastHashMap;
10use tycho_util::fs::MappedFile;
11
12use self::db::InternalQueueDB;
13use self::models::{
14    CommitPointerKey, CommitPointerValue, DiffInfo, DiffInfoKey, DiffTailKey,
15    ShardsInternalMessagesKey, StatKey,
16};
17use self::snapshot::InternalQueueSnapshot;
18use self::transaction::InternalQueueTransaction;
19
20pub mod db;
21pub mod iterator;
22pub mod models;
23pub mod snapshot;
24pub mod tables;
25pub mod transaction;
26
// Constants

/// Key under which `InternalQueueStorage::import_from_file` stores the id of the
/// imported block in the `internal_message_var` column family (written only
/// when that block is a masterchain block).
pub const INT_QUEUE_LAST_COMMITTED_MC_BLOCK_ID_KEY: &[u8] = b"last_committed_mc_block_id";

/// Name passed to `StorageContext::open_preconfigured` when opening the
/// internal queue database.
const INT_QUEUE_SUBDIR: &str = "int_queue";
30
31#[derive(Clone)]
32pub struct InternalQueueStorage {
33    #[allow(unused)]
34    context: StorageContext,
35    db: InternalQueueDB,
36}
37
38impl InternalQueueStorage {
39    pub fn open(context: StorageContext) -> Result<Self> {
40        let db = context.open_preconfigured(INT_QUEUE_SUBDIR)?;
41
42        // TODO: Add migrations here if needed. However, it might require making this method `async`.
43
44        Ok(Self { context, db })
45    }
46
47    pub fn db(&self) -> &InternalQueueDB {
48        &self.db
49    }
50
51    pub fn begin_transaction(&self) -> InternalQueueTransaction {
52        InternalQueueTransaction {
53            db: self.db.clone(),
54            batch: Default::default(),
55            buffer: Vec::new(),
56        }
57    }
58
59    pub fn make_snapshot(&self) -> InternalQueueSnapshot {
60        InternalQueueSnapshot {
61            db: self.db.clone(),
62            snapshot: self.db.owned_snapshot(),
63        }
64    }
65
66    /// Initializes the internal queue storage from a file.
67    ///
68    /// # Arguments
69    /// * `top_update` - The top-level diff and tail len.
70    /// * `file` - The file containing queue state data.
71    /// * `block_id` - The key block identifier.
72    ///
73    /// # Returns
74    /// * `Ok(())` if the import was successful.
75    /// * `Err(anyhow::Error)` if an error occurs during import.
76    pub async fn import_from_file(
77        &self,
78        top_update: &OutMsgQueueUpdates,
79        file: File,
80        block_id: BlockId,
81    ) -> Result<()> {
82        tracing::info!("Importing internal queue from file for block {block_id}");
83        use tycho_types::boc::ser::BocHeader;
84
85        let top_update = top_update.clone();
86        let this = self.clone();
87
88        let span = tracing::Span::current();
89        tokio::task::spawn_blocking(move || {
90            let _span = span.enter();
91
92            let get_partition = |partitions: &RouterPartitions, router_addr: &RouterAddr| {
93                for (p, addresses) in partitions {
94                    if addresses.contains(router_addr) {
95                        return Some(*p);
96                    }
97                }
98                None
99            };
100
101            let mapped = MappedFile::from_existing_file(file)?;
102
103            let mut reader = QueueStateReader::begin_from_mapped(mapped.as_slice(), &top_update)?;
104
105            let messages_cf = this.db.shard_internal_messages.cf();
106            let stats_cf = this.db.internal_message_stats.cf();
107            let var_cf = this.db.internal_message_var.cf();
108            let diffs_tail_cf = this.db.internal_message_diffs_tail.cf();
109            let diff_infos_cf = this.db.internal_message_diff_info.cf();
110            let commit_pointers_cf = this.db.internal_message_commit_pointer.cf();
111
112            let mut first_diff_read = false;
113
114            let mut batch = weedb::rocksdb::WriteBatch::default();
115
116            let mut buffer = Vec::new();
117            let mut statistics: FastHashMap<QueuePartitionIdx, FastHashMap<RouterAddr, u64>> =
118                FastHashMap::default();
119            while let Some(mut part) = reader.read_next_queue_diff()? {
120                let mut shards_messages_count = FastHashMap::default();
121
122                while let Some(cell) = part.read_next_message()? {
123                    let msg_hash = cell.repr_hash();
124                    let msg = cell.parse::<Message<'_>>()?;
125                    let MsgInfo::Int(int_msg_info) = &msg.info else {
126                        anyhow::bail!("non-internal message in the queue in msg {msg_hash}");
127                    };
128
129                    let IntAddr::Std(dest) = &int_msg_info.dst else {
130                        anyhow::bail!("non-std destination address in msg {msg_hash}");
131                    };
132
133                    let IntAddr::Std(src) = &int_msg_info.src else {
134                        anyhow::bail!("non-std destination address in msg {msg_hash}");
135                    };
136
137                    let src_addr = RouterAddr {
138                        workchain: src.workchain,
139                        account: src.address,
140                    };
141
142                    let dest_addr = RouterAddr {
143                        workchain: dest.workchain,
144                        account: dest.address,
145                    };
146
147                    // TODO after split/merge implementation we should use detailed counter for 256 shards
148                    let dest_shard = ShardIdent::new_full(dest_addr.workchain as i32);
149
150                    shards_messages_count
151                        .entry(dest_shard)
152                        .and_modify(|count| *count += 1)
153                        .or_insert(1);
154
155                    let queue_diff = part.queue_diff();
156                    let partition = get_partition(&queue_diff.router_partitions_dst, &dest_addr)
157                        .or_else(|| get_partition(&queue_diff.router_partitions_src, &src_addr))
158                        .unwrap_or_default();
159
160                    let key = ShardsInternalMessagesKey {
161                        partition,
162                        shard_ident: block_id.shard,
163                        internal_message_key: QueueKey {
164                            lt: int_msg_info.created_lt,
165                            hash: *msg_hash,
166                        },
167                    };
168
169                    buffer.clear();
170                    buffer.push(dest.workchain as u8);
171                    buffer.extend_from_slice(&dest.prefix().to_le_bytes());
172                    BocHeader::<ahash::RandomState>::with_root(cell.as_ref()).encode(&mut buffer);
173                    batch.put_cf(&messages_cf, key.to_vec(), &buffer);
174
175                    let partition_stats = statistics.entry(partition).or_default();
176                    *partition_stats.entry(dest_addr).or_insert(0) += 1;
177                }
178
179                let queue_diff = part.queue_diff();
180
181                if !first_diff_read {
182                    // set commit pointer
183                    let commit_pointer_key = CommitPointerKey {
184                        shard_ident: queue_diff.shard_ident,
185                    };
186
187                    let commit_pointer_value = CommitPointerValue {
188                        queue_key: queue_diff.max_message,
189                        seqno: queue_diff.seqno,
190                    };
191
192                    batch.put_cf(
193                        &commit_pointers_cf,
194                        commit_pointer_key.to_vec(),
195                        commit_pointer_value.to_vec(),
196                    );
197                }
198
199                first_diff_read = true;
200
201                // insert diff tail
202                let diff_tail_key = DiffTailKey {
203                    shard_ident: queue_diff.shard_ident,
204                    max_message: queue_diff.max_message,
205                };
206
207                batch.put_cf(
208                    &diffs_tail_cf,
209                    diff_tail_key.to_vec(),
210                    queue_diff.seqno.to_le_bytes(),
211                );
212
213                // insert diff info
214                let diff_info_key = DiffInfoKey {
215                    shard_ident: queue_diff.shard_ident,
216                    seqno: queue_diff.seqno,
217                };
218
219                let diff_info = DiffInfo {
220                    min_message: queue_diff.min_message,
221                    max_message: queue_diff.max_message,
222                    shards_messages_count,
223                    hash: queue_diff.hash,
224                    processed_to: queue_diff.processed_to.clone(),
225                    router_partitions_src: queue_diff.router_partitions_src.clone(),
226                    router_partitions_dst: queue_diff.router_partitions_dst.clone(),
227                    seqno: queue_diff.seqno,
228                };
229
230                batch.put_cf(
231                    &diff_infos_cf,
232                    diff_info_key.to_vec(),
233                    tl_proto::serialize(diff_info),
234                );
235
236                for (partition, statistics) in statistics.drain() {
237                    for (dest, count) in statistics.iter() {
238                        let key = StatKey {
239                            shard_ident: queue_diff.shard_ident,
240                            partition,
241                            max_message: queue_diff.max_message,
242                            dest: *dest,
243                        };
244
245                        batch.put_cf(&stats_cf, key.to_vec(), count.to_le_bytes());
246                    }
247                }
248            }
249
250            // insert last applied diff
251            if block_id.is_masterchain() {
252                batch.put_cf(
253                    &var_cf,
254                    INT_QUEUE_LAST_COMMITTED_MC_BLOCK_ID_KEY,
255                    block_id.to_vec(),
256                );
257            }
258
259            reader.finish()?;
260
261            this.db.rocksdb().write(batch)?;
262            Ok(())
263        })
264        .await?
265    }
266}