1use anyhow::{Result, ensure};
2use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr};
3use tycho_storage::kv::StoredValue;
4use tycho_types::models::{BlockId, IntAddr, ShardIdent};
5use tycho_util::{FastHashMap, FastHashSet};
6use weedb::rocksdb::WriteBatch;
7use weedb::{BoundedCfHandle, ColumnFamily, OwnedSnapshot, Table, rocksdb};
8
9use super::INT_QUEUE_LAST_COMMITTED_MC_BLOCK_ID_KEY;
10use super::db::InternalQueueDB;
11use super::models::{
12 CommitPointerKey, CommitPointerValue, DiffInfoKey, DiffTailKey, QueueRange,
13 ShardsInternalMessagesKey, StatKey,
14};
15use crate::tracing_targets;
16
/// A set of pending writes against the internal queue database.
///
/// The `insert_*`, `commit_messages` and `set_last_committed_mc_block_id`
/// methods only stage operations in `batch`; they are applied to RocksDB
/// when `write` is called.
pub struct InternalQueueTransaction {
    /// Database handle that provides the column families and the RocksDB instance.
    pub db: InternalQueueDB,
    /// Write batch accumulating the staged operations.
    pub batch: WriteBatch,
    /// Scratch buffer reused by `insert_message` to assemble the stored value.
    pub buffer: Vec<u8>,
}
22
23impl InternalQueueTransaction {
24 pub fn size(&self) -> (usize, usize) {
25 (self.batch.len(), self.batch.size_in_bytes())
26 }
27
28 pub fn write(self) -> Result<()> {
29 self.db
30 .rocksdb()
31 .write_opt(self.batch, self.db.shard_internal_messages.write_config())
32 .map_err(Into::into)
33 }
34
35 pub fn insert_statistics(&mut self, key: &StatKey, count: u64) {
36 let cf = self.db.internal_message_stats.cf();
37 self.batch.put_cf(&cf, key.to_vec(), count.to_le_bytes());
38 }
39
40 pub fn insert_diff_tail(&mut self, key: &DiffTailKey, value: &[u8]) {
41 let cf = self.db.internal_message_diffs_tail.cf();
42 self.batch.put_cf(&cf, key.to_vec(), value);
43 }
44
45 pub fn insert_diff_info(&mut self, key: &DiffInfoKey, value: &[u8]) {
46 let cf = self.db.internal_message_diff_info.cf();
47 self.batch.put_cf(&cf, key.to_vec(), value);
48 }
49
50 pub fn insert_message(
51 &mut self,
52 key: &ShardsInternalMessagesKey,
53 dest: &IntAddr,
54 value: &[u8],
55 ) {
56 let cf = self.db.shard_internal_messages.cf();
57
58 self.buffer.clear();
59 self.buffer.reserve(1 + 8 + value.len());
60
61 self.buffer.push(dest.workchain() as i8 as u8);
62 self.buffer.extend_from_slice(&dest.prefix().to_le_bytes());
63 self.buffer.extend_from_slice(value);
64
65 self.batch.put_cf(&cf, key.to_vec(), self.buffer.as_slice());
66 }
67
68 pub fn commit_messages(
71 &mut self,
72 commit_pointers: FastHashMap<ShardIdent, (QueueKey, u32)>,
73 ) -> Result<()> {
74 let commit_pointers_cf = self.db.internal_message_commit_pointer.cf();
75
76 for (shard_ident, (queue_key, seqno)) in commit_pointers {
77 let key = CommitPointerKey { shard_ident }.to_vec();
78
79 let new_val = CommitPointerValue { queue_key, seqno };
80
81 self.batch
82 .put_cf(&commit_pointers_cf, key, new_val.to_vec());
83 }
84
85 Ok(())
86 }
87
88 pub fn clear_uncommitted(
95 &self,
96 partitions: &FastHashSet<QueuePartitionIdx>,
97 commit_pointers: &FastHashMap<ShardIdent, CommitPointerValue>,
98 top_shards: &[ShardIdent],
99 ) -> Result<()> {
100 let mut ranges = Vec::new();
101
102 for (&shard_ident, pointer_val) in commit_pointers {
103 let from = pointer_val.queue_key.next_value();
105 let to = QueueKey::MAX;
107
108 for &partition in partitions {
109 ranges.push(QueueRange {
110 shard_ident,
111 partition,
112 from,
113 to,
114 });
115 }
116 }
117
118 if ranges.is_empty() {
121 for &shard_ident in top_shards {
122 for &partition in partitions {
123 ranges.push(QueueRange {
124 shard_ident,
125 partition,
126 from: QueueKey::MIN,
127 to: QueueKey::MAX,
128 });
129 }
130 }
131 }
132
133 tracing::debug!(target: tracing_targets::MQ,
134 ?commit_pointers,
135 ?ranges,
136 "clear_uncommitted_state",
137 );
138
139 self.delete(&ranges)
140 }
141
    /// Deletes messages, statistics, diff tails and diff infos covered by `ranges`,
    /// then compacts the affected key ranges.
    ///
    /// All deletions are collected into a fresh local batch (not the
    /// transaction's own `batch`) and written in a single `db.write` call.
    /// Range bounds are treated as inclusive: every range deletion is followed
    /// by a point deletion of the end key.
    ///
    /// # Errors
    /// Returns an error if scanning the diff tails fails or if the batch
    /// cannot be written.
    pub fn delete(&self, ranges: &[QueueRange]) -> Result<()> {
        let mut batch = WriteBatch::default();
        // Snapshot handed to the diff-tail scan below.
        let snapshot = self.db.owned_snapshot();

        // Arena that keeps copies of the key bounds alive until compaction.
        let bump = bumpalo::Bump::new();

        let mut msgs_to_compact = Vec::new();
        let mut stats_to_compact = Vec::new();
        let mut diffs_tail_to_compact = Vec::new();
        let mut diff_info_to_compact = Vec::new();

        let messages_cf = &self.db.shard_internal_messages.cf();
        let stats_cf = &self.db.internal_message_stats.cf();
        let diffs_tail_cf = &self.db.internal_message_diffs_tail.cf();
        let diff_info_cf = &self.db.internal_message_diff_info.cf();

        for range in ranges {
            // Messages: keyed by (partition, shard, queue key).
            let start_msg_key =
                ShardsInternalMessagesKey::new(range.partition, range.shard_ident, range.from);

            let end_msg_key =
                ShardsInternalMessagesKey::new(range.partition, range.shard_ident, range.to);

            delete_range(
                &mut batch,
                messages_cf,
                &start_msg_key.to_vec(),
                &end_msg_key.to_vec(),
                &bump,
                &mut msgs_to_compact,
            );

            // Statistics: same range, spanning every destination address.
            let start_stat_key = StatKey {
                shard_ident: range.shard_ident,
                partition: range.partition,
                max_message: range.from,
                dest: RouterAddr::MIN,
            };

            let end_stat_key = StatKey {
                shard_ident: range.shard_ident,
                partition: range.partition,
                max_message: range.to,
                dest: RouterAddr::MAX,
            };

            delete_range(
                &mut batch,
                stats_cf,
                &start_stat_key.to_vec(),
                &end_stat_key.to_vec(),
                &bump,
                &mut stats_to_compact,
            );

            // Diff tails: the key carries no partition, only shard and queue key.
            let start_diff_tail_key = DiffTailKey {
                shard_ident: range.shard_ident,
                max_message: range.from,
            };

            let end_diff_tail_key = DiffTailKey {
                shard_ident: range.shard_ident,
                max_message: range.to,
            };

            let from_diff_tail_bytes = start_diff_tail_key.to_vec();
            let to_diff_tail_bytes = end_diff_tail_key.to_vec();

            // Queues the diff-tail deletion and reports the seqno span of the
            // removed tails, which determines the diff infos to drop.
            let (min_seqno, max_seqno) = delete_diff_tails_and_collect_seqno(
                &mut batch,
                self.db.rocksdb().as_ref(),
                &self.db.internal_message_diffs_tail,
                &from_diff_tail_bytes,
                &to_diff_tail_bytes,
                &snapshot,
            )?;

            diffs_tail_to_compact.push((
                bump.alloc_slice_copy(&from_diff_tail_bytes),
                bump.alloc_slice_copy(&to_diff_tail_bytes),
            ));

            // `u32::MAX` / `0` are the initial values the scan returns when it
            // saw no diff tails, so there are no diff infos to delete.
            // NOTE(review): this also skips the case where every scanned seqno
            // is 0 — confirm that seqno 0 never appears in diff tails.
            if min_seqno != u32::MAX && max_seqno != 0 {
                let from_diff_info = DiffInfoKey {
                    shard_ident: range.shard_ident,
                    seqno: min_seqno,
                }
                .to_vec();
                let to_diff_info = DiffInfoKey {
                    shard_ident: range.shard_ident,
                    seqno: max_seqno,
                }
                .to_vec();

                // Range delete followed by a point delete of the end key so
                // that `max_seqno` itself is removed as well.
                batch.delete_range_cf(diff_info_cf, &from_diff_info, &to_diff_info);
                batch.delete_cf(diff_info_cf, &to_diff_info);

                diff_info_to_compact.push((
                    bump.alloc_slice_copy(&from_diff_info),
                    bump.alloc_slice_copy(&to_diff_info),
                ));
            }
        }

        // Apply all deletions at once, then compact the touched ranges.
        let db = self.db.rocksdb().as_ref();
        db.write(batch)?;

        for (start_key, end_key) in msgs_to_compact {
            db.compact_range_cf(messages_cf, Some(start_key), Some(end_key));
        }
        for (start_key, end_key) in stats_to_compact {
            db.compact_range_cf(stats_cf, Some(start_key), Some(end_key));
        }
        for (start_key, end_key) in diffs_tail_to_compact {
            db.compact_range_cf(diffs_tail_cf, Some(start_key), Some(end_key));
        }
        for (start_key, end_key) in diff_info_to_compact {
            db.compact_range_cf(diff_info_cf, Some(start_key), Some(end_key));
        }

        Ok(())
    }
269
270 pub fn set_last_committed_mc_block_id(&mut self, mc_block_id: &BlockId) -> Result<()> {
273 let cf = self.db.internal_message_var.cf();
274
275 self.batch.put_cf(
276 &cf,
277 INT_QUEUE_LAST_COMMITTED_MC_BLOCK_ID_KEY,
278 mc_block_id.to_vec(),
279 );
280
281 Ok(())
282 }
283}
284
285pub fn delete_diff_tails_and_collect_seqno<T: ColumnFamily>(
286 batch: &mut WriteBatch,
287 db: &rocksdb::DB,
288 diffs_tail_table: &Table<T>,
289 from_key: &[u8],
290 to_key: &[u8],
291 snapshot: &OwnedSnapshot,
292) -> Result<(u32, u32)> {
293 let mut read_opts = diffs_tail_table.new_read_config();
294 read_opts.set_iterate_lower_bound(from_key);
295 read_opts.set_snapshot(snapshot);
296
297 let mut iter = db.raw_iterator_cf_opt(&diffs_tail_table.cf(), read_opts);
299
300 iter.seek(from_key);
302
303 let mut min_seqno = u32::MAX;
304 let mut max_seqno = 0;
305
306 while iter.valid() {
308 let raw_key = match iter.key() {
310 Some(k) => k,
311 None => break, };
313
314 if raw_key > to_key {
316 break;
317 }
318
319 let raw_value = match iter.value() {
321 Some(v) => v,
322 None => break,
323 };
324
325 ensure!(
327 raw_value.len() >= 4,
328 "Invalid diff tail value length: {} < 4",
329 raw_value.len()
330 );
331 let block_seqno = u32::from_le_bytes(raw_value[..4].try_into()?);
332
333 if block_seqno < min_seqno {
335 min_seqno = block_seqno;
336 }
337 if block_seqno > max_seqno {
338 max_seqno = block_seqno;
339 }
340
341 iter.next();
343 }
344
345 batch.delete_range_cf(&diffs_tail_table.cf(), from_key, to_key);
346 batch.delete_cf(&diffs_tail_table.cf(), to_key);
347
348 iter.status()?;
350
351 Ok((min_seqno, max_seqno))
352}
353
354fn delete_range<'a>(
355 batch: &mut WriteBatch,
356 cf: &'_ BoundedCfHandle<'_>,
357 start_key: &'_ [u8],
358 end_key: &'_ [u8],
359 bump: &'a bumpalo::Bump,
360 to_compact: &mut Vec<(&'a [u8], &'a [u8])>,
361) {
362 batch.delete_range_cf(cf, start_key, end_key);
363 batch.delete_cf(cf, end_key);
364 to_compact.push((
365 bump.alloc_slice_copy(start_key),
366 bump.alloc_slice_copy(end_key),
367 ));
368}