Skip to main content

tycho_collator/internal_queue/
queue.rs

1use std::marker::PhantomData;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Result, anyhow, bail};
6use parking_lot::{ArcMutexGuard, ArcRwLockReadGuard, Mutex, RawMutex, RawRwLock, RwLock};
7use serde::{Deserialize, Serialize};
8use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};
9use tycho_core::global_config::ZerostateId;
10use tycho_types::cell::HashBytes;
11use tycho_types::models::{BlockId, BlockIdShort, ShardIdent};
12use tycho_util::metrics::HistogramGuard;
13use tycho_util::{FastDashMap, FastHashMap, FastHashSet, serde_helpers};
14
15use super::gc::GcEndKey;
16use crate::internal_queue::gc::GcManager;
17use crate::internal_queue::state::state_iterator::StateIterator;
18use crate::internal_queue::state::storage::{
19    QueueState, QueueStateFactory, QueueStateImplFactory, QueueStateStdImpl,
20};
21use crate::internal_queue::types::diff::{DiffZone, QueueDiffWithMessages};
22use crate::internal_queue::types::message::InternalMessageValue;
23use crate::internal_queue::types::ranges::QueueShardRange;
24use crate::internal_queue::types::stats::{
25    AccountStatistics, DiffStatistics, SeparatedStatisticsByPartitions,
26};
27use crate::storage::models::DiffInfo;
28use crate::storage::transaction::InternalQueueTransaction;
29use crate::types::TopBlockIdUpdated;
30use crate::{internal_queue, tracing_targets};
31
32/// Pending queue diff transaction with lock guards.
33/// Locks are held until `write()` is called.
34pub struct PendingQueueDiff {
35    tx: InternalQueueTransaction,
36    _global_guard: ArcRwLockReadGuard<RawRwLock, ()>,
37    _shard_guard: ArcMutexGuard<RawMutex, ()>,
38}
39
40impl PendingQueueDiff {
41    pub fn write(self) -> anyhow::Result<()> {
42        self.tx.write()
43    }
44}
45
46#[derive(Debug, Serialize, Deserialize)]
47pub struct QueueConfig {
48    /// Default: 5 seconds.
49    #[serde(with = "serde_helpers::humantime")]
50    pub gc_interval: Duration,
51}
52
53impl Default for QueueConfig {
54    fn default() -> Self {
55        Self {
56            gc_interval: Duration::from_secs(5),
57        }
58    }
59}
60
61pub trait QueueFactory<V: InternalMessageValue> {
62    type Queue: Queue<V>;
63
64    fn create(&self) -> Result<Self::Queue>;
65}
66
67impl<F, R, V: InternalMessageValue> QueueFactory<V> for F
68where
69    F: Fn() -> Result<R>,
70    R: Queue<V>,
71{
72    type Queue = R;
73
74    fn create(&self) -> Result<Self::Queue> {
75        self()
76    }
77}
78
79pub struct QueueFactoryStdImpl {
80    pub zerostate_id: ZerostateId,
81    pub state: QueueStateImplFactory,
82    pub config: QueueConfig,
83}
84
85// TRAIT
86
87pub trait Queue<V>: Send
88where
89    V: InternalMessageValue + Send + Sync,
90{
91    /// Create iterator for specified shard and return it
92    fn iterator(
93        &self,
94        partition: QueuePartitionIdx,
95        ranges: &[QueueShardRange],
96        for_shard_id: ShardIdent,
97    ) -> Result<Box<dyn StateIterator<V>>>;
98
99    /// Prepare diff for applying to state. Returns transaction that should be committed later.
100    /// Returns None if diff is already applied (duplicate).
101    fn prepare_diff(
102        &self,
103        diff: QueueDiffWithMessages<V>,
104        block_id_short: BlockIdShort,
105        hash: &HashBytes,
106        statistics: DiffStatistics,
107        check_sequence: Option<DiffZone>,
108    ) -> Result<Option<PendingQueueDiff>>;
109
110    /// Add messages to state from `diff.messages` and store diff info (writes immediately)
111    fn apply_diff(
112        &self,
113        diff: QueueDiffWithMessages<V>,
114        block_id_short: BlockIdShort,
115        hash: &HashBytes,
116        statistics: DiffStatistics,
117        check_sequence: Option<DiffZone>,
118    ) -> Result<()>;
119
120    /// Commit diffs to the state and update GC
121    fn commit_diff(
122        &self,
123        mc_top_blocks: &[TopBlockIdUpdated],
124        partitions: &FastHashSet<QueuePartitionIdx>,
125    ) -> Result<()>;
126
127    /// Remove all data in uncommitted zone
128    fn clear_uncommitted_state(
129        &self,
130        partitions: &FastHashSet<QueuePartitionIdx>,
131        top_shards: &[ShardIdent],
132    ) -> Result<()>;
133
134    /// Get diffs tail len
135    fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32;
136
137    /// Load statistics for the given range by accounts
138    fn load_diff_statistics(
139        &self,
140        partition: QueuePartitionIdx,
141        range: &QueueShardRange,
142        result: &mut AccountStatistics,
143    ) -> Result<()>;
144
145    /// Get diff info for the given block from committed and/or uncommitted zones
146    fn get_diff_info(
147        &self,
148        shard_ident: &ShardIdent,
149        seqno: u32,
150        zone: DiffZone,
151    ) -> Result<Option<DiffInfo>>;
152
153    /// Check if diff exists in state
154    fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> Result<bool>;
155
156    /// Get mc block id on which the queue was committed.
157    /// Returns None if queue was not committed
158    fn get_last_committed_mc_block_id(&self) -> Result<Option<BlockId>>;
159
160    /// Load separated diff statistics for the specified partitions and range
161    fn load_separated_diff_statistics(
162        &self,
163        partitions: &FastHashSet<QueuePartitionIdx>,
164        range: &QueueShardRange,
165    ) -> Result<SeparatedStatisticsByPartitions>;
166}
167
168impl<V: InternalMessageValue> QueueFactory<V> for QueueFactoryStdImpl {
169    type Queue = QueueImpl<QueueStateStdImpl, V>;
170
171    fn create(&self) -> Result<Self::Queue> {
172        let state = <QueueStateImplFactory as QueueStateFactory<V>>::create(&self.state)?;
173        let state = Arc::new(state);
174        let gc = GcManager::start::<V>(state.clone(), self.config.gc_interval);
175        Ok(QueueImpl {
176            state,
177            zerostate_id: self.zerostate_id,
178            gc,
179            global_lock: Arc::new(RwLock::new(())),
180            shard_locks: FastDashMap::default(),
181            _phantom_data: Default::default(),
182        })
183    }
184}
185
186pub struct QueueImpl<P, V>
187where
188    P: QueueState<V>,
189    V: InternalMessageValue,
190{
191    state: Arc<P>,
192    zerostate_id: ZerostateId,
193    gc: GcManager,
194    global_lock: Arc<RwLock<()>>,
195    shard_locks: FastDashMap<ShardIdent, Arc<Mutex<()>>>,
196    _phantom_data: PhantomData<V>,
197}
198
199impl<P, V> Queue<V> for QueueImpl<P, V>
200where
201    P: QueueState<V> + Send + Sync + 'static,
202    V: InternalMessageValue + Send + Sync,
203{
204    fn iterator(
205        &self,
206        partition: QueuePartitionIdx,
207        ranges: &[QueueShardRange],
208        for_shard_id: ShardIdent,
209    ) -> Result<Box<dyn StateIterator<V>>> {
210        let snapshot = self.state.snapshot();
211
212        let state_iterator = {
213            let _histogram =
214                HistogramGuard::begin("tycho_internal_queue_commited_state_iterator_create_time");
215            self.state
216                .iterator(&snapshot, for_shard_id, partition, ranges)?
217        };
218
219        Ok(state_iterator)
220    }
221
222    fn prepare_diff(
223        &self,
224        diff: QueueDiffWithMessages<V>,
225        block_id_short: BlockIdShort,
226        hash: &HashBytes,
227        statistics: DiffStatistics,
228        check_sequence: Option<DiffZone>,
229    ) -> Result<Option<PendingQueueDiff>> {
230        // Take global lock. Blocks commit and clear_uncommitted until write() is called.
231        let global_guard = self.global_lock.read_arc();
232
233        // Take specific shard lock for the duration of prepare
234        let shard_lock = self
235            .shard_locks
236            .entry(block_id_short.shard)
237            .or_default()
238            .clone();
239        let shard_guard = shard_lock.lock_arc();
240
241        // Check for duplicate diffs based on the block_id_short.seqno and hash
242        let shard_diff = internal_queue::queue::Queue::get_diff_info(
243            self,
244            &block_id_short.shard,
245            block_id_short.seqno,
246            DiffZone::Both,
247        )?;
248
249        // Check if the diff is already applied
250        // return if hash is the same
251        if let Some(shard_diff) = shard_diff {
252            // Check if the diff is already applied with different hash
253            if shard_diff.hash != *hash {
254                bail!(
255                    "Duplicate diff with different hash: block_id={}, existing diff_hash={}, new diff_hash={}",
256                    block_id_short,
257                    shard_diff.hash,
258                    hash,
259                )
260            }
261            return Ok(None);
262        }
263
264        if let Some(zone) = check_sequence {
265            let last_applied_seqno = self.state.get_last_applied_seqno(&block_id_short.shard)?;
266
267            if let Some(last_applied_seqno) = last_applied_seqno {
268                let last_applied_diff_opt = internal_queue::queue::Queue::get_diff_info(
269                    self,
270                    &block_id_short.shard,
271                    last_applied_seqno,
272                    zone,
273                )?;
274
275                if let Some(last_applied_diff) = last_applied_diff_opt {
276                    // Check if the diff is already applied
277                    if block_id_short.seqno <= last_applied_diff.seqno {
278                        return Ok(None);
279                    }
280
281                    // Check if the diff is sequential
282                    if block_id_short.seqno != last_applied_diff.seqno + 1 {
283                        bail!(
284                            "Diff seqno is not sequential new seqno {}. last_applied_seqno {}",
285                            block_id_short.seqno,
286                            last_applied_diff.seqno
287                        );
288                    }
289                }
290            }
291        }
292
293        // Check that applied diff is above the commit pointer
294        let commit_pointers = self.state.get_commit_pointers()?;
295        if let Some(commit_pointer) = commit_pointers.get(&block_id_short.shard)
296            && let Some(min_message) = diff.min_message()
297            && min_message <= &commit_pointer.queue_key
298        {
299            bail!(
300                "Diff min_message is less than commit_pointer: block_id={}, diff_min_message={}, commit_pointer={}",
301                block_id_short.seqno,
302                min_message,
303                commit_pointer.queue_key
304            );
305        }
306
307        let tx = self
308            .state
309            .prepare_diff(&block_id_short, &statistics, *hash, diff)?;
310
311        Ok(Some(PendingQueueDiff {
312            tx,
313            _global_guard: global_guard,
314            _shard_guard: shard_guard,
315        }))
316    }
317
318    fn apply_diff(
319        &self,
320        diff: QueueDiffWithMessages<V>,
321        block_id_short: BlockIdShort,
322        hash: &HashBytes,
323        statistics: DiffStatistics,
324        check_sequence: Option<DiffZone>,
325    ) -> Result<()> {
326        if let Some(tx) =
327            self.prepare_diff(diff, block_id_short, hash, statistics, check_sequence)?
328        {
329            let _histogram = HistogramGuard::begin("tycho_internal_queue_write_diff_time");
330            tx.write()?;
331        }
332        Ok(())
333    }
334
335    fn commit_diff(
336        &self,
337        mc_top_blocks: &[TopBlockIdUpdated],
338        partitions: &FastHashSet<QueuePartitionIdx>,
339    ) -> Result<()> {
340        // Take global lock
341        let _global_write_guard = self.global_lock.write();
342
343        let mc_block_id = mc_top_blocks
344            .iter()
345            .find_map(|item| {
346                item.block
347                    .block_id
348                    .is_masterchain()
349                    .then_some(&item.block.block_id)
350            })
351            .ok_or_else(|| anyhow!("Masterchain block not found in commit_diff"))?;
352
353        // check current commit pointer. If it is greater than committing diff then skip
354        let commit_pointers = self.state.get_commit_pointers()?;
355        if let Some(commit_pointer) = commit_pointers.get(&mc_block_id.shard)
356            && commit_pointer.seqno >= mc_block_id.seqno
357        {
358            tracing::debug!(
359                target: tracing_targets::MQ,
360                "Skip commit diff for mc block {}. Committed by next mc block with seqno {}",
361                mc_block_id, commit_pointer.seqno,
362            );
363            // Skip commit because it was already committed
364            return Ok(());
365        }
366
367        let mut gc_ranges = FastHashMap::default();
368
369        let mut commit_pointers = FastHashMap::default();
370
371        for item in mc_top_blocks {
372            let block_id = &item.block.block_id;
373
374            // Check if the diff is already applied
375            let diff = self
376                .state
377                .get_diff_info(&block_id.shard, block_id.seqno, DiffZone::Both)?;
378
379            let diff = match diff {
380                // If top shard block changed and diff not found, then bail
381                None if item.updated && item.block.ref_by_mc_seqno > self.zerostate_id.seqno => {
382                    bail!(
383                        "Diff not found for block_id: {} ref {} zerostate {}",
384                        block_id,
385                        item.block.ref_by_mc_seqno,
386                        self.zerostate_id.seqno
387                    )
388                }
389                // If top shard block not changed and diff not found, then continue
390                None => continue,
391                Some(diff) => diff,
392            };
393
394            // Check for duplicate shard in commit_diff
395            if commit_pointers
396                .insert(block_id.shard, (diff.max_message, diff.seqno))
397                .is_some()
398            {
399                bail!("Duplicate shard in commit_diff: {}", block_id.shard);
400            }
401
402            // Update gc ranges
403            for (shard_ident, processed_to_key) in diff.processed_to.iter() {
404                gc_ranges
405                    .entry(*shard_ident)
406                    .and_modify(|last: &mut GcEndKey| {
407                        if processed_to_key < &last.end_key {
408                            last.end_key = *processed_to_key;
409                            last.on_top_block_id = *block_id;
410                        }
411                    })
412                    .or_insert(GcEndKey {
413                        end_key: *processed_to_key,
414                        on_top_block_id: *block_id,
415                    });
416            }
417        }
418
419        tracing::debug!(target: tracing_targets::MQ,
420            ?commit_pointers,
421            "commit_diff",
422        );
423
424        // change the commit pointers position
425        self.state.commit(commit_pointers, mc_block_id)?;
426
427        // run GC for each found partition
428        for (shard, gc_end_key) in gc_ranges {
429            for partition in partitions {
430                self.gc.update_delete_until(*partition, shard, gc_end_key);
431            }
432        }
433
434        Ok(())
435    }
436
437    fn clear_uncommitted_state(
438        &self,
439        partitions: &FastHashSet<QueuePartitionIdx>,
440        top_shards: &[ShardIdent],
441    ) -> Result<()> {
442        // Take global lock
443        let _global_write_guard = self.global_lock.write();
444        self.state.clear_uncommitted(partitions, top_shards)
445    }
446
447    fn load_diff_statistics(
448        &self,
449        partition: QueuePartitionIdx,
450        range: &QueueShardRange,
451        result: &mut AccountStatistics,
452    ) -> Result<()> {
453        self.state.load_diff_statistics(partition, range, result)
454    }
455
456    fn load_separated_diff_statistics(
457        &self,
458        partitions: &FastHashSet<QueuePartitionIdx>,
459        range: &QueueShardRange,
460    ) -> Result<SeparatedStatisticsByPartitions> {
461        let result = self
462            .state
463            .load_separated_diff_statistics(partitions, range)?;
464
465        Ok(result)
466    }
467
468    fn get_diff_info(
469        &self,
470        shard_ident: &ShardIdent,
471        seqno: u32,
472        zone: DiffZone,
473    ) -> Result<Option<DiffInfo>> {
474        self.state.get_diff_info(shard_ident, seqno, zone)
475    }
476
477    fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32 {
478        self.state.get_diffs_tail_len(shard_ident, from)
479    }
480
481    fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> Result<bool> {
482        Ok(internal_queue::queue::Queue::get_diff_info(
483            self,
484            &block_id_short.shard,
485            block_id_short.seqno,
486            DiffZone::Both,
487        )?
488        .is_some())
489    }
490
491    fn get_last_committed_mc_block_id(&self) -> Result<Option<BlockId>> {
492        self.state.get_last_committed_mc_block_id()
493    }
494}