1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use anyhow::Result;
5use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr, RouterPartitions};
6use tycho_storage::StorageContext;
7use tycho_types::cell::HashBytes;
8use tycho_types::models::{BlockId, BlockIdShort, ShardIdent};
9use tycho_util::metrics::HistogramGuard;
10use tycho_util::{FastHashMap, FastHashSet};
11
12use crate::internal_queue::state::state_iterator::{StateIterator, StateIteratorImpl};
13use crate::internal_queue::types::diff::{DiffZone, QueueDiffWithMessages};
14use crate::internal_queue::types::message::InternalMessageValue;
15use crate::internal_queue::types::ranges::QueueShardRange;
16use crate::internal_queue::types::router::PartitionRouter;
17use crate::internal_queue::types::stats::{
18 AccountStatistics, DiffStatistics, SeparatedStatisticsByPartitions,
19};
20use crate::storage::InternalQueueStorage;
21use crate::storage::models::{
22 CommitPointerValue, DiffInfo, DiffInfoKey, DiffTailKey, ShardsInternalMessagesKey, StatKey,
23};
24use crate::storage::snapshot::InternalQueueSnapshot;
25use crate::storage::transaction::InternalQueueTransaction;
26use crate::types::ProcessedTo;
27
28pub trait QueueStateFactory<V: InternalMessageValue> {
31 type QueueState: QueueState<V>;
32
33 fn create(&self) -> Result<Self::QueueState>;
34}
35
36impl<F, R, V> QueueStateFactory<V> for F
37where
38 F: Fn() -> Result<R>,
39 R: QueueState<V>,
40 V: InternalMessageValue,
41{
42 type QueueState = R;
43
44 fn create(&self) -> Result<Self::QueueState> {
45 self()
46 }
47}
48
49pub struct QueueStateImplFactory {
50 pub storage: InternalQueueStorage,
51}
52
53impl QueueStateImplFactory {
54 pub fn new(ctx: StorageContext) -> Result<Self> {
55 let storage = InternalQueueStorage::open(ctx)?;
56 Ok(Self { storage })
57 }
58}
59
60impl<V: InternalMessageValue> QueueStateFactory<V> for QueueStateImplFactory {
61 type QueueState = QueueStateStdImpl;
62
63 fn create(&self) -> Result<Self::QueueState> {
64 Ok(QueueStateStdImpl {
65 storage: self.storage.clone(),
66 })
67 }
68}
69
70pub trait QueueState<V: InternalMessageValue>: Send + Sync {
73 fn snapshot(&self) -> InternalQueueSnapshot;
75
76 fn iterator(
78 &self,
79 snapshot: &InternalQueueSnapshot,
80 receiver: ShardIdent,
81 partition: QueuePartitionIdx,
82 ranges: &[QueueShardRange],
83 ) -> Result<Box<dyn StateIterator<V>>>;
84
85 fn delete(&self, partition: QueuePartitionIdx, ranges: &[QueueShardRange]) -> Result<()>;
87
88 fn commit(
91 &self,
92 commit_pointers: FastHashMap<ShardIdent, (QueueKey, u32)>,
93 mc_block_id: &BlockId,
94 ) -> Result<()>;
95
96 fn load_diff_statistics(
98 &self,
99 partition: QueuePartitionIdx,
100 range: &QueueShardRange,
101 result: &mut AccountStatistics,
102 ) -> Result<()>;
103
104 fn load_separated_diff_statistics(
106 &self,
107 partitions: &FastHashSet<QueuePartitionIdx>,
108 range: &QueueShardRange,
109 ) -> Result<SeparatedStatisticsByPartitions>;
110
111 fn get_last_committed_mc_block_id(&self) -> Result<Option<BlockId>>;
114
115 fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32;
117
118 fn get_diff_info(
120 &self,
121 shard_ident: &ShardIdent,
122 seqno: u32,
123 zone: DiffZone,
124 ) -> Result<Option<DiffInfo>>;
125
126 fn get_last_applied_seqno(&self, shard_ident: &ShardIdent) -> Result<Option<u32>>;
128
129 fn get_commit_pointers(&self) -> Result<FastHashMap<ShardIdent, CommitPointerValue>>;
131
132 fn prepare_diff(
134 &self,
135 block_id_short: &BlockIdShort,
136 statistics: &DiffStatistics,
137 hash: HashBytes,
138 diff: QueueDiffWithMessages<V>,
139 ) -> Result<InternalQueueTransaction>;
140
141 fn write_diff(
143 &self,
144 block_id_short: &BlockIdShort,
145 statistics: &DiffStatistics,
146 hash: HashBytes,
147 diff: QueueDiffWithMessages<V>,
148 ) -> Result<()> {
149 self.prepare_diff(block_id_short, statistics, hash, diff)?
150 .write()
151 }
152
153 fn clear_uncommitted(
154 &self,
155 partitions: &FastHashSet<QueuePartitionIdx>,
156 top_shards: &[ShardIdent],
157 ) -> Result<()>;
158}
159
160pub struct QueueStateStdImpl {
163 storage: InternalQueueStorage,
164}
165
166impl<V: InternalMessageValue> QueueState<V> for QueueStateStdImpl {
167 fn snapshot(&self) -> InternalQueueSnapshot {
168 let _histogram = HistogramGuard::begin("tycho_internal_queue_snapshot_time");
169 self.storage.make_snapshot()
170 }
171
172 fn iterator(
173 &self,
174 snapshot: &InternalQueueSnapshot,
175 receiver: ShardIdent,
176 partition: QueuePartitionIdx,
177 ranges: &[QueueShardRange],
178 ) -> Result<Box<dyn StateIterator<V>>> {
179 let mut shards_iters = Vec::new();
180
181 for range in ranges {
182 let from = ShardsInternalMessagesKey::new(partition, range.shard_ident, range.from);
183 let to = ShardsInternalMessagesKey::new(partition, range.shard_ident, range.to);
184 shards_iters.push((snapshot.iter_messages(from, to), range.shard_ident));
185 }
186
187 let iterator = StateIteratorImpl::new(shards_iters, receiver)?;
188 Ok(Box::new(iterator))
189 }
190
191 fn delete(&self, partition: QueuePartitionIdx, ranges: &[QueueShardRange]) -> Result<()> {
192 let mut queue_ranges = vec![];
193 for range in ranges {
194 queue_ranges.push(crate::storage::models::QueueRange {
195 partition,
196 shard_ident: range.shard_ident,
197 from: range.from,
198 to: range.to,
199 });
200 }
201
202 let tx = self.storage.begin_transaction();
203 tx.delete(&queue_ranges)?;
204 tx.write()
205 }
206
207 fn commit(
208 &self,
209 commit_pointers: FastHashMap<ShardIdent, (QueueKey, u32)>,
210 mc_block_id: &BlockId,
211 ) -> Result<()> {
212 let mut tx = self.storage.begin_transaction();
213 tx.commit_messages(commit_pointers)?;
214 tx.set_last_committed_mc_block_id(mc_block_id)?;
215 tx.write()?;
216 let db = self.storage.db();
217 db.rocksdb()
218 .flush_cf(&db.internal_message_commit_pointer.cf())?;
219 db.rocksdb().flush_cf(&db.internal_message_var.cf())?;
220 Ok(())
221 }
222
223 fn load_diff_statistics(
224 &self,
225 partition: QueuePartitionIdx,
226 range: &QueueShardRange,
227 result: &mut AccountStatistics,
228 ) -> Result<()> {
229 let _histogram = HistogramGuard::begin("tycho_internal_queue_statistics_load_time");
230 let snapshot = self.storage.make_snapshot();
231 snapshot.collect_stats_in_range(
232 &range.shard_ident,
233 partition,
234 &range.from,
235 &range.to,
236 result,
237 )
238 }
239
240 fn load_separated_diff_statistics(
241 &self,
242 partitions: &FastHashSet<QueuePartitionIdx>,
243 range: &QueueShardRange,
244 ) -> Result<SeparatedStatisticsByPartitions> {
245 let _histogram =
246 HistogramGuard::begin("tycho_internal_queue_separated_statistics_load_time");
247 let snapshot = self.storage.make_snapshot();
248
249 let result = snapshot.collect_separated_stats_in_range_for_partitions(
250 &range.shard_ident,
251 partitions,
252 &range.from,
253 &range.to,
254 )?;
255
256 Ok(result)
257 }
258
259 fn get_last_committed_mc_block_id(&self) -> Result<Option<BlockId>> {
260 let snapshot = self.storage.make_snapshot();
261 snapshot.get_last_committed_mc_block_id()
262 }
263
264 fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32 {
265 let snapshot = self.storage.make_snapshot();
266 snapshot.calc_diffs_tail(&DiffTailKey {
267 shard_ident: *shard_ident,
268 max_message: *from,
269 })
270 }
271
272 fn get_diff_info(
273 &self,
274 shard_ident: &ShardIdent,
275 seqno: u32,
276 zone: DiffZone,
277 ) -> Result<Option<DiffInfo>> {
278 let snapshot = self.storage.make_snapshot();
279
280 let diff_info_bytes = snapshot.get_diff_info(&DiffInfoKey {
281 shard_ident: *shard_ident,
282 seqno,
283 })?;
284
285 let diff_info_bytes = match diff_info_bytes {
286 Some(bytes) => bytes,
287 None => return Ok(None),
288 };
289
290 let diff_info: DiffInfo = tl_proto::deserialize(&diff_info_bytes)?;
291
292 match zone {
293 DiffZone::Both => {}
294 DiffZone::Committed => {
295 let commit_pointers = snapshot.read_commit_pointers()?;
296 if let Some(commit_pointer) = commit_pointers.get(shard_ident) {
297 if commit_pointer.queue_key < diff_info.max_message {
299 return Ok(None);
300 }
301 } else {
302 return Ok(None);
303 }
304 }
305 DiffZone::Uncommitted => {
306 let commit_pointers = snapshot.read_commit_pointers()?;
307 if let Some(commit_pointer) = commit_pointers.get(shard_ident) {
308 if commit_pointer.queue_key >= diff_info.max_message {
310 return Ok(None);
311 }
312 }
313 }
314 }
315
316 Ok(Some(diff_info))
317 }
318
319 fn get_last_applied_seqno(&self, shard_ident: &ShardIdent) -> Result<Option<u32>> {
320 let snapshot = self.storage.make_snapshot();
321 snapshot.get_last_applied_diff_seqno(shard_ident)
322 }
323
324 fn get_commit_pointers(&self) -> Result<FastHashMap<ShardIdent, CommitPointerValue>> {
325 self.storage.make_snapshot().read_commit_pointers()
326 }
327
328 fn prepare_diff(
329 &self,
330 block_id_short: &BlockIdShort,
331 statistics: &DiffStatistics,
332 hash: HashBytes,
333 diff: QueueDiffWithMessages<V>,
334 ) -> Result<InternalQueueTransaction> {
335 let mut tx = self.storage.begin_transaction();
336
337 Self::add_messages(
338 &mut tx,
339 block_id_short.shard,
340 &diff.partition_router,
341 &diff.messages,
342 )?;
343 Self::add_statistics(&mut tx, statistics)?;
344 Self::add_diff_tail(&mut tx, block_id_short, statistics.max_message());
345
346 let src_router_partition = diff.partition_router.to_router_partitions_src();
347 let dst_router_partition = diff.partition_router.to_router_partitions_dst();
348
349 Self::add_diff_info(
350 &mut tx,
351 block_id_short,
352 statistics,
353 hash,
354 diff.processed_to,
355 src_router_partition,
356 dst_router_partition,
357 );
358
359 let labels = [("workchain", block_id_short.shard.workchain().to_string())];
360
361 let (batch_len, batch_size) = tx.size();
362
363 metrics::gauge!("tycho_internal_queue_write_diff_batch_len", &labels).set(batch_len as f64);
364 metrics::gauge!("tycho_internal_queue_write_diff_batch_size", &labels)
365 .set(batch_size as f64);
366 metrics::gauge!("tycho_internal_queue_write_diff_messages_count", &labels)
367 .set(diff.messages.len() as f64);
368
369 Ok(tx)
370 }
371
372 fn clear_uncommitted(
373 &self,
374 partitions: &FastHashSet<QueuePartitionIdx>,
375 top_shards: &[ShardIdent],
376 ) -> Result<()> {
377 let snapshot = self.storage.make_snapshot();
378 let pointers = snapshot.read_commit_pointers()?;
379 let tx = self.storage.begin_transaction();
380 tx.clear_uncommitted(partitions, &pointers, top_shards)?;
381 tx.write()
382 }
383}
384
385impl QueueStateStdImpl {
386 fn add_messages<V: InternalMessageValue>(
388 internal_queue_tx: &mut InternalQueueTransaction,
389 source: ShardIdent,
390 partition_router: &PartitionRouter,
391 messages: &BTreeMap<QueueKey, Arc<V>>,
392 ) -> Result<()> {
393 let _histogram = HistogramGuard::begin("tycho_internal_queue_apply_diff_add_messages_time");
394 let mut buffer = Vec::new();
395 for (internal_message_key, message) in messages {
396 let destination = message.destination();
397 let partition = partition_router.get_partition(Some(message.source()), destination);
398
399 buffer.clear();
400 message.serialize(&mut buffer);
401
402 internal_queue_tx.insert_message(
403 &ShardsInternalMessagesKey::new(partition, source, *internal_message_key),
404 destination,
405 &buffer,
406 );
407 }
408
409 Ok(())
410 }
411
412 fn add_statistics(
414 internal_queue_tx: &mut InternalQueueTransaction,
415 diff_statistics: &DiffStatistics,
416 ) -> Result<()> {
417 let _histogram =
418 HistogramGuard::begin("tycho_internal_queue_apply_diff_add_statistics_time");
419 let shard_ident = diff_statistics.shard_ident();
420 let max_message = diff_statistics.max_message();
421
422 for (partition, values) in diff_statistics.iter() {
423 for (addr, count) in values {
424 let Some(dest) = RouterAddr::from_int_addr(addr) else {
425 anyhow::bail!("cannot add VarAddr to router statistics");
426 };
427
428 let key = StatKey {
429 shard_ident: *shard_ident,
430 partition: *partition,
431 max_message: *max_message,
432 dest,
433 };
434
435 internal_queue_tx.insert_statistics(&key, *count);
436 }
437 metrics::counter!(
438 "tycho_internal_queue_apply_diff_add_statistics_accounts_count",
439 "partition" => partition.to_string(),
440 )
441 .increment(values.len() as u64);
442 }
443
444 Ok(())
445 }
446
447 fn add_diff_tail(
448 internal_queue_tx: &mut InternalQueueTransaction,
449 block_id_short: &BlockIdShort,
450 max_message: &QueueKey,
451 ) {
452 internal_queue_tx.insert_diff_tail(
453 &DiffTailKey {
454 shard_ident: block_id_short.shard,
455 max_message: *max_message,
456 },
457 block_id_short.seqno.to_le_bytes().as_slice(),
458 );
459 }
460
461 fn add_diff_info(
462 internal_queue_tx: &mut InternalQueueTransaction,
463 block_id_short: &BlockIdShort,
464 diff_statistics: &DiffStatistics,
465 hash: HashBytes,
466 processed_to: ProcessedTo,
467 router_partitions_src: RouterPartitions,
468 router_partitions_dst: RouterPartitions,
469 ) {
470 let shard_messages_count = diff_statistics.shards_messages_count();
471
472 let key = DiffInfoKey {
473 shard_ident: block_id_short.shard,
474 seqno: block_id_short.seqno,
475 };
476
477 let diff_info = DiffInfo {
478 min_message: *diff_statistics.min_message(),
479 max_message: *diff_statistics.max_message(),
480 shards_messages_count: shard_messages_count.clone(),
481 hash,
482 processed_to,
483 router_partitions_src,
484 router_partitions_dst,
485 seqno: block_id_short.seqno,
486 };
487
488 let serialized_diff_info = tl_proto::serialize(diff_info);
489
490 internal_queue_tx.insert_diff_info(&key, &serialized_diff_info);
491 }
492}