// tycho_collator/queue_adapter.rs

1use anyhow::Result;
2use tracing::instrument;
3use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};
4use tycho_types::cell::HashBytes;
5use tycho_types::models::{BlockId, BlockIdShort, ShardIdent};
6use tycho_util::metrics::HistogramGuard;
7use tycho_util::{FastHashMap, FastHashSet};
8
9use crate::internal_queue::iterator::{QueueIterator, QueueIteratorImpl};
10use crate::internal_queue::queue::{PendingQueueDiff, Queue, QueueImpl};
11use crate::internal_queue::state::states_iterators_manager::StatesIteratorsManager;
12use crate::internal_queue::state::storage::QueueStateStdImpl;
13use crate::internal_queue::types::diff::{DiffZone, QueueDiffWithMessages};
14use crate::internal_queue::types::message::InternalMessageValue;
15use crate::internal_queue::types::ranges::{QueueShardBoundedRange, QueueShardRange};
16use crate::internal_queue::types::router::PartitionRouter;
17use crate::internal_queue::types::stats::{
18    DiffStatistics, QueueStatistics, SeparatedStatisticsByPartitions,
19};
20use crate::storage::models::DiffInfo;
21use crate::storage::snapshot::AccountStatistics;
22use crate::tracing_targets;
23use crate::types::{DebugDisplayOpt, DebugIter, TopBlockIdUpdated};
24
25pub struct MessageQueueAdapterStdImpl<V: InternalMessageValue> {
26    queue: QueueImpl<QueueStateStdImpl, V>,
27}
28
29pub trait MessageQueueAdapter<V>: Send + Sync
30where
31    V: InternalMessageValue + Send + Sync,
32{
33    /// Create iterator for specified shard and return it
34    fn create_iterator(
35        &self,
36        for_shard_id: ShardIdent,
37        partition: QueuePartitionIdx,
38        ranges: Vec<QueueShardBoundedRange>,
39    ) -> Result<Box<dyn QueueIterator<V>>>;
40
41    /// Returns statistics for the specified ranges by partition
42    /// and source shards (equal to iterator ranges)
43    fn get_statistics(
44        &self,
45        partitions: &FastHashSet<QueuePartitionIdx>,
46        ranges: &[QueueShardBoundedRange],
47    ) -> Result<QueueStatistics>;
48
49    /// Prepare diff for applying to queue. Returns transaction that should be committed later.
50    /// Returns None if diff is already applied (duplicate).
51    fn prepare_diff(
52        &self,
53        diff: QueueDiffWithMessages<V>,
54        block_id_short: BlockIdShort,
55        diff_hash: &HashBytes,
56        statistics: DiffStatistics,
57        check_sequence: Option<DiffZone>,
58    ) -> Result<Option<PendingQueueDiff>>;
59
60    /// Apply diff by storing it to the queue uncommitted zone (waiting for the operation to complete)
61    fn apply_diff(
62        &self,
63        diff: QueueDiffWithMessages<V>,
64        block_id_short: BlockIdShort,
65        diff_hash: &HashBytes,
66        statistics: DiffStatistics,
67        check_sequence: Option<DiffZone>,
68    ) -> Result<()>;
69
70    /// Commit previously applied diffs, updating commit pointers (waiting for the operation to complete)
71    fn commit_diff(
72        &self,
73        mc_top_blocks: Vec<TopBlockIdUpdated>,
74        partitions: &FastHashSet<QueuePartitionIdx>,
75    ) -> Result<()>;
76
77    fn clear_uncommitted_state(&self, top_shards: &[ShardIdent]) -> Result<()>;
78
79    /// Get diff for the given block from committed and/or uncommitted zone
80    fn get_diff_info(
81        &self,
82        shard_ident: &ShardIdent,
83        seqno: u32,
84        zone: DiffZone,
85    ) -> Result<Option<DiffInfo>>;
86
87    /// Check if diff exists in state
88    fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> Result<bool>;
89
90    /// Get mc block id on which the queue was committed.
91    /// Returns None if queue was not committed
92    fn get_last_committed_mc_block_id(&self) -> Result<Option<BlockId>>;
93
94    /// Get diffs tail len.
95    /// `from` - start key for the tail. Diff with `max_message` == `from` will be excluded from the tail
96    fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32;
97
98    /// Load separated diff statistics for the specified partitions and range.
99    /// `range.from` = diff with `max_message == range.from` will be excluded in statistics;
100    /// `range.to` = diff with `max_message == range.to` will be included in statistics
101    fn load_separated_diff_statistics(
102        &self,
103        partitions: &FastHashSet<QueuePartitionIdx>,
104        range: &QueueShardBoundedRange,
105    ) -> Result<SeparatedStatisticsByPartitions>;
106
107    /// Get partition router and statistics for the specified block
108    fn get_router_and_statistics(
109        &self,
110        block_id_short: &BlockIdShort,
111        diff_info: DiffInfo,
112        partition: QueuePartitionIdx,
113    ) -> Result<(PartitionRouter, DiffStatistics)>;
114}
115
116impl<V: InternalMessageValue> MessageQueueAdapterStdImpl<V> {
117    pub fn new(queue: QueueImpl<QueueStateStdImpl, V>) -> Self {
118        Self { queue }
119    }
120}
121
122impl<V: InternalMessageValue> MessageQueueAdapter<V> for MessageQueueAdapterStdImpl<V> {
123    #[instrument(skip_all, fields(%for_shard_id, partition, ?ranges))]
124    fn create_iterator(
125        &self,
126        for_shard_id: ShardIdent,
127        partition: QueuePartitionIdx,
128        ranges: Vec<QueueShardBoundedRange>,
129    ) -> Result<Box<dyn QueueIterator<V>>> {
130        let histogram = HistogramGuard::begin("tycho_internal_queue_create_iterator_time");
131
132        let ranges = ranges.into_iter().map(Into::into).collect::<Vec<_>>();
133
134        metrics::counter!("tycho_collator_queue_adapter_iterators_count").increment(1);
135
136        let state_iterator = self.queue.iterator(partition, &ranges, for_shard_id)?;
137        let states_iterators_manager = StatesIteratorsManager::new(state_iterator);
138        let iterator = QueueIteratorImpl::new(states_iterators_manager)?;
139
140        let elapsed = histogram.finish();
141        tracing::debug!(target: tracing_targets::MQ_ADAPTER,
142            elapsed = %humantime::format_duration(elapsed),
143            "create_iterator completed"
144        );
145
146        Ok(Box::new(iterator))
147    }
148
149    #[instrument(skip_all, fields(?partitions, ?ranges))]
150    fn get_statistics(
151        &self,
152        partitions: &FastHashSet<QueuePartitionIdx>,
153        ranges: &[QueueShardBoundedRange],
154    ) -> Result<QueueStatistics> {
155        let start_time = std::time::Instant::now();
156
157        let mut result = AccountStatistics::default();
158
159        let ranges: Vec<QueueShardRange> = ranges.iter().cloned().map(Into::into).collect();
160
161        for range in &ranges {
162            for partition in partitions {
163                self.queue
164                    .load_diff_statistics(*partition, range, &mut result)?;
165            }
166        }
167
168        let stats = QueueStatistics::with_statistics(result);
169
170        let elapsed = start_time.elapsed();
171        tracing::debug!(target: tracing_targets::MQ_ADAPTER,
172            elapsed = %humantime::format_duration(elapsed),
173            "get_statistics completed"
174        );
175
176        Ok(stats)
177    }
178
179    #[instrument(skip_all, fields(%block_id_short, %diff_hash, ?check_sequence))]
180    fn prepare_diff(
181        &self,
182        diff: QueueDiffWithMessages<V>,
183        block_id_short: BlockIdShort,
184        diff_hash: &HashBytes,
185        statistics: DiffStatistics,
186        check_sequence: Option<DiffZone>,
187    ) -> Result<Option<PendingQueueDiff>> {
188        let start_time = std::time::Instant::now();
189
190        tracing::debug!(target: tracing_targets::MQ_ADAPTER,
191            "prepare_diff started"
192        );
193
194        let new_messages_len = diff.messages.len();
195        let min_message = diff.min_message().copied();
196        let max_message = diff.max_message().copied();
197        let processed_to = diff.processed_to.clone();
198
199        let tx =
200            self.queue
201                .prepare_diff(diff, block_id_short, diff_hash, statistics, check_sequence)?;
202
203        let elapsed = start_time.elapsed();
204        tracing::info!(target: tracing_targets::MQ_ADAPTER,
205            new_messages_len,
206            ?min_message, ?max_message,
207            ?processed_to,
208            elapsed = %humantime::format_duration(elapsed),
209            "prepare_diff completed"
210        );
211
212        Ok(tx)
213    }
214
215    #[instrument(skip_all, fields(%block_id_short, %diff_hash, ?check_sequence))]
216    fn apply_diff(
217        &self,
218        diff: QueueDiffWithMessages<V>,
219        block_id_short: BlockIdShort,
220        diff_hash: &HashBytes,
221        statistics: DiffStatistics,
222        check_sequence: Option<DiffZone>,
223    ) -> Result<()> {
224        let start_time = std::time::Instant::now();
225
226        tracing::debug!(target: tracing_targets::MQ_ADAPTER,
227            "apply_diff started"
228        );
229
230        let new_messages_len = diff.messages.len();
231        let min_message = diff.min_message().copied();
232        let max_message = diff.max_message().copied();
233        let processed_to = diff.processed_to.clone();
234
235        self.queue
236            .apply_diff(diff, block_id_short, diff_hash, statistics, check_sequence)?;
237
238        let elapsed = start_time.elapsed();
239        tracing::info!(target: tracing_targets::MQ_ADAPTER,
240            new_messages_len,
241            ?min_message, ?max_message,
242            ?processed_to,
243            elapsed = %humantime::format_duration(elapsed),
244            "apply_diff completed"
245        );
246
247        Ok(())
248    }
249
250    #[instrument(skip_all, fields(?partitions))]
251    fn commit_diff(
252        &self,
253        mc_top_blocks: Vec<TopBlockIdUpdated>,
254        // TODO: get partitions from queue state
255        partitions: &FastHashSet<QueuePartitionIdx>,
256    ) -> Result<()> {
257        let start_time = std::time::Instant::now();
258
259        self.queue.commit_diff(&mc_top_blocks, partitions)?;
260
261        let elapsed = start_time.elapsed();
262        tracing::info!(target: tracing_targets::MQ_ADAPTER,
263            mc_top_blocks = ?mc_top_blocks,
264            elapsed = %humantime::format_duration(elapsed),
265            "commit_diff completed"
266        );
267
268        Ok(())
269    }
270
271    #[instrument(skip_all)]
272    fn clear_uncommitted_state(&self, top_shards: &[ShardIdent]) -> Result<()> {
273        let start_time = std::time::Instant::now();
274
275        tracing::debug!(
276            target: tracing_targets::MQ_ADAPTER,
277            "clear_uncommitted_state started"
278        );
279
280        // TODO: get partitions from queue state
281        let partitions = FastHashSet::from_iter([QueuePartitionIdx(0), QueuePartitionIdx(1)]);
282
283        self.queue
284            .clear_uncommitted_state(&partitions, top_shards)?;
285
286        let elapsed = start_time.elapsed();
287        tracing::info!(target: tracing_targets::MQ_ADAPTER,
288            ?partitions,
289            elapsed = %humantime::format_duration(elapsed),
290            "clear_uncommitted_state completed"
291        );
292
293        Ok(())
294    }
295
296    #[instrument(skip_all, fields(%shard_ident, seqno, ?zone))]
297    fn get_diff_info(
298        &self,
299        shard_ident: &ShardIdent,
300        seqno: u32,
301        zone: DiffZone,
302    ) -> Result<Option<DiffInfo>> {
303        let start_time = std::time::Instant::now();
304        let diff = self.queue.get_diff_info(shard_ident, seqno, zone)?;
305        let elapsed = start_time.elapsed();
306        tracing::debug!(target: tracing_targets::MQ_ADAPTER,
307            elapsed = %humantime::format_duration(elapsed),
308            "get_diff_info completed"
309        );
310        Ok(diff)
311    }
312
313    #[instrument(skip_all, fields(%block_id_short))]
314    fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> Result<bool> {
315        let start_time = std::time::Instant::now();
316        let exists = self.queue.is_diff_exists(block_id_short)?;
317        let elapsed = start_time.elapsed();
318        tracing::debug!(target: tracing_targets::MQ_ADAPTER,
319            elapsed = %humantime::format_duration(elapsed),
320            exists,
321            "is_diff_exists completed"
322        );
323        Ok(exists)
324    }
325
326    #[instrument(skip_all)]
327    fn get_last_committed_mc_block_id(&self) -> Result<Option<BlockId>> {
328        let start_time = std::time::Instant::now();
329        let block_id = self.queue.get_last_committed_mc_block_id()?;
330        let elapsed = start_time.elapsed();
331        tracing::info!(target: tracing_targets::MQ_ADAPTER,
332            elapsed = %humantime::format_duration(elapsed),
333            block_id = ?DebugDisplayOpt(block_id),
334            "get_last_committed_mc_block_id completed"
335        );
336        Ok(block_id)
337    }
338
339    #[instrument(skip_all, fields(%shard_ident, %from))]
340    fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32 {
341        let start_time = std::time::Instant::now();
342        let from = from.next_value();
343        let tail_len = self.queue.get_diffs_tail_len(shard_ident, &from);
344        let elapsed = start_time.elapsed();
345        tracing::info!(target: tracing_targets::MQ_ADAPTER,
346            elapsed = %humantime::format_duration(elapsed),
347            tail_len,
348            "get_diffs_tail_len completed"
349        );
350        tail_len
351    }
352
353    #[instrument(skip_all, fields(?partitions, ?range))]
354    fn load_separated_diff_statistics(
355        &self,
356        partitions: &FastHashSet<QueuePartitionIdx>,
357        range: &QueueShardBoundedRange,
358    ) -> Result<SeparatedStatisticsByPartitions> {
359        let start_time = std::time::Instant::now();
360
361        tracing::debug!(target: tracing_targets::MQ_ADAPTER,
362            "load_separated_diff_statistics started"
363        );
364
365        let res = self
366            .queue
367            .load_separated_diff_statistics(partitions, &range.clone().into())?;
368
369        let elapsed = start_time.elapsed();
370        tracing::info!(target: tracing_targets::MQ_ADAPTER,
371            stats_len = ?DebugIter(res.iter().map(|(par_id, map)| (*par_id, map.len()))),
372            elapsed = %humantime::format_duration(elapsed),
373            "load_separated_diff_statistics completed"
374        );
375
376        Ok(res)
377    }
378
379    #[instrument(skip_all, fields(%block_id_short, partition))]
380    fn get_router_and_statistics(
381        &self,
382        block_id_short: &BlockIdShort,
383        diff_info: DiffInfo,
384        partition: QueuePartitionIdx,
385    ) -> Result<(PartitionRouter, DiffStatistics)> {
386        let start_time = std::time::Instant::now();
387
388        let partition_router = PartitionRouter::with_partitions(
389            &diff_info.router_partitions_src,
390            &diff_info.router_partitions_dst,
391        );
392
393        let statistics_range = QueueShardRange {
394            shard_ident: block_id_short.shard,
395            from: diff_info.min_message,
396            to: diff_info.max_message.next_value(),
397        };
398
399        let mut statistics = AccountStatistics::default();
400
401        self.queue
402            .load_diff_statistics(partition, &statistics_range, &mut statistics)?;
403
404        let mut diff_statistics = FastHashMap::default();
405        diff_statistics.insert(partition, statistics);
406
407        let diff_statistics = DiffStatistics::new(
408            block_id_short.shard,
409            diff_info.min_message,
410            diff_info.max_message,
411            diff_statistics,
412        );
413
414        let elapsed = start_time.elapsed();
415
416        tracing::debug!(target: tracing_targets::MQ_ADAPTER,
417            elapsed = %humantime::format_duration(elapsed),
418            "get_router_and_statistics completed"
419        );
420
421        Ok((partition_router, diff_statistics))
422    }
423}