1use anyhow::Result;
2use tracing::instrument;
3use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};
4use tycho_types::cell::HashBytes;
5use tycho_types::models::{BlockId, BlockIdShort, ShardIdent};
6use tycho_util::metrics::HistogramGuard;
7use tycho_util::{FastHashMap, FastHashSet};
8
9use crate::internal_queue::iterator::{QueueIterator, QueueIteratorImpl};
10use crate::internal_queue::queue::{PendingQueueDiff, Queue, QueueImpl};
11use crate::internal_queue::state::states_iterators_manager::StatesIteratorsManager;
12use crate::internal_queue::state::storage::QueueStateStdImpl;
13use crate::internal_queue::types::diff::{DiffZone, QueueDiffWithMessages};
14use crate::internal_queue::types::message::InternalMessageValue;
15use crate::internal_queue::types::ranges::{QueueShardBoundedRange, QueueShardRange};
16use crate::internal_queue::types::router::PartitionRouter;
17use crate::internal_queue::types::stats::{
18 DiffStatistics, QueueStatistics, SeparatedStatisticsByPartitions,
19};
20use crate::storage::models::DiffInfo;
21use crate::storage::snapshot::AccountStatistics;
22use crate::tracing_targets;
23use crate::types::{DebugDisplayOpt, DebugIter, TopBlockIdUpdated};
24
25pub struct MessageQueueAdapterStdImpl<V: InternalMessageValue> {
26 queue: QueueImpl<QueueStateStdImpl, V>,
27}
28
29pub trait MessageQueueAdapter<V>: Send + Sync
30where
31 V: InternalMessageValue + Send + Sync,
32{
33 fn create_iterator(
35 &self,
36 for_shard_id: ShardIdent,
37 partition: QueuePartitionIdx,
38 ranges: Vec<QueueShardBoundedRange>,
39 ) -> Result<Box<dyn QueueIterator<V>>>;
40
41 fn get_statistics(
44 &self,
45 partitions: &FastHashSet<QueuePartitionIdx>,
46 ranges: &[QueueShardBoundedRange],
47 ) -> Result<QueueStatistics>;
48
49 fn prepare_diff(
52 &self,
53 diff: QueueDiffWithMessages<V>,
54 block_id_short: BlockIdShort,
55 diff_hash: &HashBytes,
56 statistics: DiffStatistics,
57 check_sequence: Option<DiffZone>,
58 ) -> Result<Option<PendingQueueDiff>>;
59
60 fn apply_diff(
62 &self,
63 diff: QueueDiffWithMessages<V>,
64 block_id_short: BlockIdShort,
65 diff_hash: &HashBytes,
66 statistics: DiffStatistics,
67 check_sequence: Option<DiffZone>,
68 ) -> Result<()>;
69
70 fn commit_diff(
72 &self,
73 mc_top_blocks: Vec<TopBlockIdUpdated>,
74 partitions: &FastHashSet<QueuePartitionIdx>,
75 ) -> Result<()>;
76
77 fn clear_uncommitted_state(&self, top_shards: &[ShardIdent]) -> Result<()>;
78
79 fn get_diff_info(
81 &self,
82 shard_ident: &ShardIdent,
83 seqno: u32,
84 zone: DiffZone,
85 ) -> Result<Option<DiffInfo>>;
86
87 fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> Result<bool>;
89
90 fn get_last_committed_mc_block_id(&self) -> Result<Option<BlockId>>;
93
94 fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32;
97
98 fn load_separated_diff_statistics(
102 &self,
103 partitions: &FastHashSet<QueuePartitionIdx>,
104 range: &QueueShardBoundedRange,
105 ) -> Result<SeparatedStatisticsByPartitions>;
106
107 fn get_router_and_statistics(
109 &self,
110 block_id_short: &BlockIdShort,
111 diff_info: DiffInfo,
112 partition: QueuePartitionIdx,
113 ) -> Result<(PartitionRouter, DiffStatistics)>;
114}
115
116impl<V: InternalMessageValue> MessageQueueAdapterStdImpl<V> {
117 pub fn new(queue: QueueImpl<QueueStateStdImpl, V>) -> Self {
118 Self { queue }
119 }
120}
121
122impl<V: InternalMessageValue> MessageQueueAdapter<V> for MessageQueueAdapterStdImpl<V> {
123 #[instrument(skip_all, fields(%for_shard_id, partition, ?ranges))]
124 fn create_iterator(
125 &self,
126 for_shard_id: ShardIdent,
127 partition: QueuePartitionIdx,
128 ranges: Vec<QueueShardBoundedRange>,
129 ) -> Result<Box<dyn QueueIterator<V>>> {
130 let histogram = HistogramGuard::begin("tycho_internal_queue_create_iterator_time");
131
132 let ranges = ranges.into_iter().map(Into::into).collect::<Vec<_>>();
133
134 metrics::counter!("tycho_collator_queue_adapter_iterators_count").increment(1);
135
136 let state_iterator = self.queue.iterator(partition, &ranges, for_shard_id)?;
137 let states_iterators_manager = StatesIteratorsManager::new(state_iterator);
138 let iterator = QueueIteratorImpl::new(states_iterators_manager)?;
139
140 let elapsed = histogram.finish();
141 tracing::debug!(target: tracing_targets::MQ_ADAPTER,
142 elapsed = %humantime::format_duration(elapsed),
143 "create_iterator completed"
144 );
145
146 Ok(Box::new(iterator))
147 }
148
149 #[instrument(skip_all, fields(?partitions, ?ranges))]
150 fn get_statistics(
151 &self,
152 partitions: &FastHashSet<QueuePartitionIdx>,
153 ranges: &[QueueShardBoundedRange],
154 ) -> Result<QueueStatistics> {
155 let start_time = std::time::Instant::now();
156
157 let mut result = AccountStatistics::default();
158
159 let ranges: Vec<QueueShardRange> = ranges.iter().cloned().map(Into::into).collect();
160
161 for range in &ranges {
162 for partition in partitions {
163 self.queue
164 .load_diff_statistics(*partition, range, &mut result)?;
165 }
166 }
167
168 let stats = QueueStatistics::with_statistics(result);
169
170 let elapsed = start_time.elapsed();
171 tracing::debug!(target: tracing_targets::MQ_ADAPTER,
172 elapsed = %humantime::format_duration(elapsed),
173 "get_statistics completed"
174 );
175
176 Ok(stats)
177 }
178
179 #[instrument(skip_all, fields(%block_id_short, %diff_hash, ?check_sequence))]
180 fn prepare_diff(
181 &self,
182 diff: QueueDiffWithMessages<V>,
183 block_id_short: BlockIdShort,
184 diff_hash: &HashBytes,
185 statistics: DiffStatistics,
186 check_sequence: Option<DiffZone>,
187 ) -> Result<Option<PendingQueueDiff>> {
188 let start_time = std::time::Instant::now();
189
190 tracing::debug!(target: tracing_targets::MQ_ADAPTER,
191 "prepare_diff started"
192 );
193
194 let new_messages_len = diff.messages.len();
195 let min_message = diff.min_message().copied();
196 let max_message = diff.max_message().copied();
197 let processed_to = diff.processed_to.clone();
198
199 let tx =
200 self.queue
201 .prepare_diff(diff, block_id_short, diff_hash, statistics, check_sequence)?;
202
203 let elapsed = start_time.elapsed();
204 tracing::info!(target: tracing_targets::MQ_ADAPTER,
205 new_messages_len,
206 ?min_message, ?max_message,
207 ?processed_to,
208 elapsed = %humantime::format_duration(elapsed),
209 "prepare_diff completed"
210 );
211
212 Ok(tx)
213 }
214
215 #[instrument(skip_all, fields(%block_id_short, %diff_hash, ?check_sequence))]
216 fn apply_diff(
217 &self,
218 diff: QueueDiffWithMessages<V>,
219 block_id_short: BlockIdShort,
220 diff_hash: &HashBytes,
221 statistics: DiffStatistics,
222 check_sequence: Option<DiffZone>,
223 ) -> Result<()> {
224 let start_time = std::time::Instant::now();
225
226 tracing::debug!(target: tracing_targets::MQ_ADAPTER,
227 "apply_diff started"
228 );
229
230 let new_messages_len = diff.messages.len();
231 let min_message = diff.min_message().copied();
232 let max_message = diff.max_message().copied();
233 let processed_to = diff.processed_to.clone();
234
235 self.queue
236 .apply_diff(diff, block_id_short, diff_hash, statistics, check_sequence)?;
237
238 let elapsed = start_time.elapsed();
239 tracing::info!(target: tracing_targets::MQ_ADAPTER,
240 new_messages_len,
241 ?min_message, ?max_message,
242 ?processed_to,
243 elapsed = %humantime::format_duration(elapsed),
244 "apply_diff completed"
245 );
246
247 Ok(())
248 }
249
250 #[instrument(skip_all, fields(?partitions))]
251 fn commit_diff(
252 &self,
253 mc_top_blocks: Vec<TopBlockIdUpdated>,
254 partitions: &FastHashSet<QueuePartitionIdx>,
256 ) -> Result<()> {
257 let start_time = std::time::Instant::now();
258
259 self.queue.commit_diff(&mc_top_blocks, partitions)?;
260
261 let elapsed = start_time.elapsed();
262 tracing::info!(target: tracing_targets::MQ_ADAPTER,
263 mc_top_blocks = ?mc_top_blocks,
264 elapsed = %humantime::format_duration(elapsed),
265 "commit_diff completed"
266 );
267
268 Ok(())
269 }
270
271 #[instrument(skip_all)]
272 fn clear_uncommitted_state(&self, top_shards: &[ShardIdent]) -> Result<()> {
273 let start_time = std::time::Instant::now();
274
275 tracing::debug!(
276 target: tracing_targets::MQ_ADAPTER,
277 "clear_uncommitted_state started"
278 );
279
280 let partitions = FastHashSet::from_iter([QueuePartitionIdx(0), QueuePartitionIdx(1)]);
282
283 self.queue
284 .clear_uncommitted_state(&partitions, top_shards)?;
285
286 let elapsed = start_time.elapsed();
287 tracing::info!(target: tracing_targets::MQ_ADAPTER,
288 ?partitions,
289 elapsed = %humantime::format_duration(elapsed),
290 "clear_uncommitted_state completed"
291 );
292
293 Ok(())
294 }
295
296 #[instrument(skip_all, fields(%shard_ident, seqno, ?zone))]
297 fn get_diff_info(
298 &self,
299 shard_ident: &ShardIdent,
300 seqno: u32,
301 zone: DiffZone,
302 ) -> Result<Option<DiffInfo>> {
303 let start_time = std::time::Instant::now();
304 let diff = self.queue.get_diff_info(shard_ident, seqno, zone)?;
305 let elapsed = start_time.elapsed();
306 tracing::debug!(target: tracing_targets::MQ_ADAPTER,
307 elapsed = %humantime::format_duration(elapsed),
308 "get_diff_info completed"
309 );
310 Ok(diff)
311 }
312
313 #[instrument(skip_all, fields(%block_id_short))]
314 fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> Result<bool> {
315 let start_time = std::time::Instant::now();
316 let exists = self.queue.is_diff_exists(block_id_short)?;
317 let elapsed = start_time.elapsed();
318 tracing::debug!(target: tracing_targets::MQ_ADAPTER,
319 elapsed = %humantime::format_duration(elapsed),
320 exists,
321 "is_diff_exists completed"
322 );
323 Ok(exists)
324 }
325
326 #[instrument(skip_all)]
327 fn get_last_committed_mc_block_id(&self) -> Result<Option<BlockId>> {
328 let start_time = std::time::Instant::now();
329 let block_id = self.queue.get_last_committed_mc_block_id()?;
330 let elapsed = start_time.elapsed();
331 tracing::info!(target: tracing_targets::MQ_ADAPTER,
332 elapsed = %humantime::format_duration(elapsed),
333 block_id = ?DebugDisplayOpt(block_id),
334 "get_last_committed_mc_block_id completed"
335 );
336 Ok(block_id)
337 }
338
339 #[instrument(skip_all, fields(%shard_ident, %from))]
340 fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32 {
341 let start_time = std::time::Instant::now();
342 let from = from.next_value();
343 let tail_len = self.queue.get_diffs_tail_len(shard_ident, &from);
344 let elapsed = start_time.elapsed();
345 tracing::info!(target: tracing_targets::MQ_ADAPTER,
346 elapsed = %humantime::format_duration(elapsed),
347 tail_len,
348 "get_diffs_tail_len completed"
349 );
350 tail_len
351 }
352
353 #[instrument(skip_all, fields(?partitions, ?range))]
354 fn load_separated_diff_statistics(
355 &self,
356 partitions: &FastHashSet<QueuePartitionIdx>,
357 range: &QueueShardBoundedRange,
358 ) -> Result<SeparatedStatisticsByPartitions> {
359 let start_time = std::time::Instant::now();
360
361 tracing::debug!(target: tracing_targets::MQ_ADAPTER,
362 "load_separated_diff_statistics started"
363 );
364
365 let res = self
366 .queue
367 .load_separated_diff_statistics(partitions, &range.clone().into())?;
368
369 let elapsed = start_time.elapsed();
370 tracing::info!(target: tracing_targets::MQ_ADAPTER,
371 stats_len = ?DebugIter(res.iter().map(|(par_id, map)| (*par_id, map.len()))),
372 elapsed = %humantime::format_duration(elapsed),
373 "load_separated_diff_statistics completed"
374 );
375
376 Ok(res)
377 }
378
379 #[instrument(skip_all, fields(%block_id_short, partition))]
380 fn get_router_and_statistics(
381 &self,
382 block_id_short: &BlockIdShort,
383 diff_info: DiffInfo,
384 partition: QueuePartitionIdx,
385 ) -> Result<(PartitionRouter, DiffStatistics)> {
386 let start_time = std::time::Instant::now();
387
388 let partition_router = PartitionRouter::with_partitions(
389 &diff_info.router_partitions_src,
390 &diff_info.router_partitions_dst,
391 );
392
393 let statistics_range = QueueShardRange {
394 shard_ident: block_id_short.shard,
395 from: diff_info.min_message,
396 to: diff_info.max_message.next_value(),
397 };
398
399 let mut statistics = AccountStatistics::default();
400
401 self.queue
402 .load_diff_statistics(partition, &statistics_range, &mut statistics)?;
403
404 let mut diff_statistics = FastHashMap::default();
405 diff_statistics.insert(partition, statistics);
406
407 let diff_statistics = DiffStatistics::new(
408 block_id_short.shard,
409 diff_info.min_message,
410 diff_info.max_message,
411 diff_statistics,
412 );
413
414 let elapsed = start_time.elapsed();
415
416 tracing::debug!(target: tracing_targets::MQ_ADAPTER,
417 elapsed = %humantime::format_duration(elapsed),
418 "get_router_and_statistics completed"
419 );
420
421 Ok((partition_router, diff_statistics))
422 }
423}