Skip to main content

tycho_collator/storage/
transaction.rs

1use anyhow::{Result, ensure};
2use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr};
3use tycho_storage::kv::StoredValue;
4use tycho_types::models::{BlockId, IntAddr, ShardIdent};
5use tycho_util::{FastHashMap, FastHashSet};
6use weedb::rocksdb::WriteBatch;
7use weedb::{BoundedCfHandle, ColumnFamily, OwnedSnapshot, Table, rocksdb};
8
9use super::INT_QUEUE_LAST_COMMITTED_MC_BLOCK_ID_KEY;
10use super::db::InternalQueueDB;
11use super::models::{
12    CommitPointerKey, CommitPointerValue, DiffInfoKey, DiffTailKey, QueueRange,
13    ShardsInternalMessagesKey, StatKey,
14};
15use crate::tracing_targets;
16
/// A batch of pending writes to the internal queue storage.
///
/// The `insert_*`, `commit_messages` and `set_last_committed_mc_block_id`
/// methods only append operations to `batch`; nothing reaches the database
/// until `write` is called.
pub struct InternalQueueTransaction {
    /// Database handle used to resolve column families and to execute writes.
    pub db: InternalQueueDB,
    /// Operations accumulated so far; consumed by `write`.
    pub batch: WriteBatch,
    /// Scratch buffer reused by `insert_message` to assemble the stored value.
    pub buffer: Vec<u8>,
}
22
23impl InternalQueueTransaction {
24    pub fn size(&self) -> (usize, usize) {
25        (self.batch.len(), self.batch.size_in_bytes())
26    }
27
28    pub fn write(self) -> Result<()> {
29        self.db
30            .rocksdb()
31            .write_opt(self.batch, self.db.shard_internal_messages.write_config())
32            .map_err(Into::into)
33    }
34
35    pub fn insert_statistics(&mut self, key: &StatKey, count: u64) {
36        let cf = self.db.internal_message_stats.cf();
37        self.batch.put_cf(&cf, key.to_vec(), count.to_le_bytes());
38    }
39
40    pub fn insert_diff_tail(&mut self, key: &DiffTailKey, value: &[u8]) {
41        let cf = self.db.internal_message_diffs_tail.cf();
42        self.batch.put_cf(&cf, key.to_vec(), value);
43    }
44
45    pub fn insert_diff_info(&mut self, key: &DiffInfoKey, value: &[u8]) {
46        let cf = self.db.internal_message_diff_info.cf();
47        self.batch.put_cf(&cf, key.to_vec(), value);
48    }
49
50    pub fn insert_message(
51        &mut self,
52        key: &ShardsInternalMessagesKey,
53        dest: &IntAddr,
54        value: &[u8],
55    ) {
56        let cf = self.db.shard_internal_messages.cf();
57
58        self.buffer.clear();
59        self.buffer.reserve(1 + 8 + value.len());
60
61        self.buffer.push(dest.workchain() as i8 as u8);
62        self.buffer.extend_from_slice(&dest.prefix().to_le_bytes());
63        self.buffer.extend_from_slice(value);
64
65        self.batch.put_cf(&cf, key.to_vec(), self.buffer.as_slice());
66    }
67
68    /// Updates commit pointers in message queue.
69    /// ATTENTION! Overrides old value without checks. Should validate the new value in the calling code.
70    pub fn commit_messages(
71        &mut self,
72        commit_pointers: FastHashMap<ShardIdent, (QueueKey, u32)>,
73    ) -> Result<()> {
74        let commit_pointers_cf = self.db.internal_message_commit_pointer.cf();
75
76        for (shard_ident, (queue_key, seqno)) in commit_pointers {
77            let key = CommitPointerKey { shard_ident }.to_vec();
78
79            let new_val = CommitPointerValue { queue_key, seqno };
80
81            self.batch
82                .put_cf(&commit_pointers_cf, key, new_val.to_vec());
83        }
84
85        Ok(())
86    }
87
88    /// Removes all keys that are strictly above the committed pointers in each partition.
89    /// (Anything above `pointer + 1` is considered "uncommitted" and will be deleted.)
90    ///
91    /// - `commit_pointers`: a map of (`ShardIdent` -> last committed `QueueKey`)
92    /// - `partitions`: a list of partitions (e.g. 0..255) to clear
93    /// - `top_shards`: a list of all shards for backoff when no commit pointers
94    pub fn clear_uncommitted(
95        &self,
96        partitions: &FastHashSet<QueuePartitionIdx>,
97        commit_pointers: &FastHashMap<ShardIdent, CommitPointerValue>,
98        top_shards: &[ShardIdent],
99    ) -> Result<()> {
100        let mut ranges = Vec::new();
101
102        for (&shard_ident, pointer_val) in commit_pointers {
103            // delete from next value of the pointer
104            let from = pointer_val.queue_key.next_value();
105            // to the maximum value
106            let to = QueueKey::MAX;
107
108            for &partition in partitions {
109                ranges.push(QueueRange {
110                    shard_ident,
111                    partition,
112                    from,
113                    to,
114                });
115            }
116        }
117
118        // backoff: if no commit pointers (no any committed diff)
119        //          create full ranges for delete for each shard
120        if ranges.is_empty() {
121            for &shard_ident in top_shards {
122                for &partition in partitions {
123                    ranges.push(QueueRange {
124                        shard_ident,
125                        partition,
126                        from: QueueKey::MIN,
127                        to: QueueKey::MAX,
128                    });
129                }
130            }
131        }
132
133        tracing::debug!(target: tracing_targets::MQ,
134            ?commit_pointers,
135            ?ranges,
136            "clear_uncommitted_state",
137        );
138
139        self.delete(&ranges)
140    }
141
142    pub fn delete(&self, ranges: &[QueueRange]) -> Result<()> {
143        let mut batch = WriteBatch::default();
144        let snapshot = self.db.owned_snapshot();
145
146        let bump = bumpalo::Bump::new();
147
148        let mut msgs_to_compact = Vec::new();
149        let mut stats_to_compact = Vec::new();
150        let mut diffs_tail_to_compact = Vec::new();
151        let mut diff_info_to_compact = Vec::new();
152
153        let messages_cf = &self.db.shard_internal_messages.cf();
154        let stats_cf = &self.db.internal_message_stats.cf();
155        let diffs_tail_cf = &self.db.internal_message_diffs_tail.cf();
156        let diff_info_cf = &self.db.internal_message_diff_info.cf();
157
158        for range in ranges {
159            // Delete messages in one range
160            let start_msg_key =
161                ShardsInternalMessagesKey::new(range.partition, range.shard_ident, range.from);
162
163            let end_msg_key =
164                ShardsInternalMessagesKey::new(range.partition, range.shard_ident, range.to);
165
166            delete_range(
167                &mut batch,
168                messages_cf,
169                &start_msg_key.to_vec(),
170                &end_msg_key.to_vec(),
171                &bump,
172                &mut msgs_to_compact,
173            );
174
175            // Delete stats in one range
176            let start_stat_key = StatKey {
177                shard_ident: range.shard_ident,
178                partition: range.partition,
179                max_message: range.from,
180                dest: RouterAddr::MIN,
181            };
182
183            let end_stat_key = StatKey {
184                shard_ident: range.shard_ident,
185                partition: range.partition,
186                max_message: range.to,
187                dest: RouterAddr::MAX,
188            };
189
190            delete_range(
191                &mut batch,
192                stats_cf,
193                &start_stat_key.to_vec(),
194                &end_stat_key.to_vec(),
195                &bump,
196                &mut stats_to_compact,
197            );
198
199            // delete tail and info
200            let start_diff_tail_key = DiffTailKey {
201                shard_ident: range.shard_ident,
202                max_message: range.from,
203            };
204
205            let end_diff_tail_key = DiffTailKey {
206                shard_ident: range.shard_ident,
207                max_message: range.to,
208            };
209
210            let from_diff_tail_bytes = start_diff_tail_key.to_vec();
211            let to_diff_tail_bytes = end_diff_tail_key.to_vec();
212
213            let (min_seqno, max_seqno) = delete_diff_tails_and_collect_seqno(
214                &mut batch,
215                self.db.rocksdb().as_ref(),
216                &self.db.internal_message_diffs_tail,
217                &from_diff_tail_bytes,
218                &to_diff_tail_bytes,
219                &snapshot,
220            )?;
221
222            diffs_tail_to_compact.push((
223                bump.alloc_slice_copy(&from_diff_tail_bytes),
224                bump.alloc_slice_copy(&to_diff_tail_bytes),
225            ));
226
227            // if we found some valid seqnos, also delete the [min_seqno .. max_seqno] from diff_info
228            if min_seqno != u32::MAX && max_seqno != 0 {
229                let from_diff_info = DiffInfoKey {
230                    shard_ident: range.shard_ident,
231                    seqno: min_seqno,
232                }
233                .to_vec();
234                let to_diff_info = DiffInfoKey {
235                    shard_ident: range.shard_ident,
236                    seqno: max_seqno,
237                }
238                .to_vec();
239
240                // Range-delete for diff_info
241                batch.delete_range_cf(diff_info_cf, &from_diff_info, &to_diff_info);
242                batch.delete_cf(diff_info_cf, &to_diff_info);
243
244                diff_info_to_compact.push((
245                    bump.alloc_slice_copy(&from_diff_info),
246                    bump.alloc_slice_copy(&to_diff_info),
247                ));
248            }
249        }
250
251        let db = self.db.rocksdb().as_ref();
252        db.write(batch)?;
253
254        for (start_key, end_key) in msgs_to_compact {
255            db.compact_range_cf(messages_cf, Some(start_key), Some(end_key));
256        }
257        for (start_key, end_key) in stats_to_compact {
258            db.compact_range_cf(stats_cf, Some(start_key), Some(end_key));
259        }
260        for (start_key, end_key) in diffs_tail_to_compact {
261            db.compact_range_cf(diffs_tail_cf, Some(start_key), Some(end_key));
262        }
263        for (start_key, end_key) in diff_info_to_compact {
264            db.compact_range_cf(diff_info_cf, Some(start_key), Some(end_key));
265        }
266
267        Ok(())
268    }
269
270    /// Stores mc block id on which the queue was committed.
271    /// ATTENTION! Overrides old value without checks. Should validate the new value in the calling code.
272    pub fn set_last_committed_mc_block_id(&mut self, mc_block_id: &BlockId) -> Result<()> {
273        let cf = self.db.internal_message_var.cf();
274
275        self.batch.put_cf(
276            &cf,
277            INT_QUEUE_LAST_COMMITTED_MC_BLOCK_ID_KEY,
278            mc_block_id.to_vec(),
279        );
280
281        Ok(())
282    }
283}
284
285pub fn delete_diff_tails_and_collect_seqno<T: ColumnFamily>(
286    batch: &mut WriteBatch,
287    db: &rocksdb::DB,
288    diffs_tail_table: &Table<T>,
289    from_key: &[u8],
290    to_key: &[u8],
291    snapshot: &OwnedSnapshot,
292) -> Result<(u32, u32)> {
293    let mut read_opts = diffs_tail_table.new_read_config();
294    read_opts.set_iterate_lower_bound(from_key);
295    read_opts.set_snapshot(snapshot);
296
297    // Create a raw iterator over the diffs_tail_table
298    let mut iter = db.raw_iterator_cf_opt(&diffs_tail_table.cf(), read_opts);
299
300    // Seek to the lower boundary
301    iter.seek(from_key);
302
303    let mut min_seqno = u32::MAX;
304    let mut max_seqno = 0;
305
306    // Iterate as long as the iterator is valid
307    while iter.valid() {
308        // Extract the current key
309        let raw_key = match iter.key() {
310            Some(k) => k,
311            None => break, // if no key is available, stop
312        };
313
314        // Stop if we've gone past the upper boundary
315        if raw_key > to_key {
316            break;
317        }
318
319        // Extract the current value
320        let raw_value = match iter.value() {
321            Some(v) => v,
322            None => break,
323        };
324
325        // Decode the seqno (first 4 bytes)
326        ensure!(
327            raw_value.len() >= 4,
328            "Invalid diff tail value length: {} < 4",
329            raw_value.len()
330        );
331        let block_seqno = u32::from_le_bytes(raw_value[..4].try_into()?);
332
333        // Update min/max sequence numbers
334        if block_seqno < min_seqno {
335            min_seqno = block_seqno;
336        }
337        if block_seqno > max_seqno {
338            max_seqno = block_seqno;
339        }
340
341        // Move to the next item
342        iter.next();
343    }
344
345    batch.delete_range_cf(&diffs_tail_table.cf(), from_key, to_key);
346    batch.delete_cf(&diffs_tail_table.cf(), to_key);
347
348    // Check the iterator status for any internal errors
349    iter.status()?;
350
351    Ok((min_seqno, max_seqno))
352}
353
354fn delete_range<'a>(
355    batch: &mut WriteBatch,
356    cf: &'_ BoundedCfHandle<'_>,
357    start_key: &'_ [u8],
358    end_key: &'_ [u8],
359    bump: &'a bumpalo::Bump,
360    to_compact: &mut Vec<(&'a [u8], &'a [u8])>,
361) {
362    batch.delete_range_cf(cf, start_key, end_key);
363    batch.delete_cf(cf, end_key);
364    to_compact.push((
365        bump.alloc_slice_copy(start_key),
366        bump.alloc_slice_copy(end_key),
367    ));
368}