tycho-collator 0.3.6

A collator node.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
use std::marker::PhantomData;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use anyhow::{Result, anyhow, bail};
use serde::{Deserialize, Serialize};
use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};
use tycho_core::global_config::ZerostateId;
use tycho_types::cell::HashBytes;
use tycho_types::models::{BlockId, BlockIdShort, ShardIdent};
use tycho_util::metrics::HistogramGuard;
use tycho_util::{FastDashMap, FastHashMap, FastHashSet, serde_helpers};

use super::gc::GcEndKey;
use crate::internal_queue::gc::GcManager;
use crate::internal_queue::state::state_iterator::StateIterator;
use crate::internal_queue::state::storage::{
    QueueState, QueueStateFactory, QueueStateImplFactory, QueueStateStdImpl,
};
use crate::internal_queue::types::diff::{DiffZone, QueueDiffWithMessages};
use crate::internal_queue::types::message::InternalMessageValue;
use crate::internal_queue::types::ranges::QueueShardRange;
use crate::internal_queue::types::stats::{
    AccountStatistics, DiffStatistics, SeparatedStatisticsByPartitions,
};
use crate::storage::models::DiffInfo;
use crate::types::TopBlockIdUpdated;
use crate::{internal_queue, tracing_targets};

/// Runtime configuration of the internal message queue.
#[derive(Debug, Serialize, Deserialize)]
pub struct QueueConfig {
    /// GC interval handed to `GcManager::start` (presumably the period between GC runs).
    ///
    /// (De)serialized through `serde_helpers::humantime`. Default: 5 seconds.
    #[serde(with = "serde_helpers::humantime")]
    pub gc_interval: Duration,
}

impl Default for QueueConfig {
    fn default() -> Self {
        Self {
            gc_interval: Duration::from_secs(5),
        }
    }
}

/// Produces [`Queue`] instances for message values of type `V`.
pub trait QueueFactory<V>
where
    V: InternalMessageValue,
{
    /// Concrete queue type built by this factory.
    type Queue: Queue<V>;

    /// Builds a new queue instance.
    fn create(&self) -> Result<Self::Queue>;
}

/// Lets any `Fn() -> Result<R>` closure or function act as a queue factory.
impl<F, R, V> QueueFactory<V> for F
where
    F: Fn() -> Result<R>,
    R: Queue<V>,
    V: InternalMessageValue,
{
    type Queue = R;

    /// Invokes the wrapped callable.
    fn create(&self) -> Result<Self::Queue> {
        (self)()
    }
}

/// Standard [`QueueFactory`] building a [`QueueImpl`] over [`QueueStateStdImpl`].
pub struct QueueFactoryStdImpl {
    /// Zerostate id; copied into every created queue.
    pub zerostate_id: ZerostateId,
    /// Factory used to create the underlying queue state.
    pub state: QueueStateImplFactory,
    /// Queue configuration (currently the GC interval).
    pub config: QueueConfig,
}

// TRAIT

/// Internal message queue built from per-block message diffs.
///
/// Diffs are first applied into the queue state and later committed by masterchain
/// blocks via [`Queue::commit_diff`]; data that is still uncommitted can be dropped with
/// [`Queue::clear_uncommitted_state`].
pub trait Queue<V: InternalMessageValue + Send + Sync>: Send {
    /// Creates an iterator over the messages of `partition` within `ranges`
    /// for the shard `for_shard_id`.
    fn iterator(
        &self,
        partition: QueuePartitionIdx,
        ranges: &[QueueShardRange],
        for_shard_id: ShardIdent,
    ) -> Result<Box<dyn StateIterator<V>>>;

    /// Adds the messages of `diff` to the state and stores the diff info
    /// (`hash`, `statistics`) for `block_id_short`.
    ///
    /// When `check_sequence` is set, it names the zone used to verify that diffs of
    /// the shard are applied in seqno order.
    fn apply_diff(
        &self,
        diff: QueueDiffWithMessages<V>,
        block_id_short: BlockIdShort,
        hash: &HashBytes,
        statistics: DiffStatistics,
        check_sequence: Option<DiffZone>,
    ) -> Result<()>;

    /// Commits the diffs of the given masterchain top blocks to the state and
    /// updates GC for `partitions`.
    fn commit_diff(
        &self,
        mc_top_blocks: &[TopBlockIdUpdated],
        partitions: &FastHashSet<QueuePartitionIdx>,
    ) -> Result<()>;

    /// Removes all data in the uncommitted zone.
    ///
    /// `partitions` and `top_shards` are forwarded to the state implementation.
    fn clear_uncommitted_state(
        &self,
        partitions: &FastHashSet<QueuePartitionIdx>,
        top_shards: &[ShardIdent],
    ) -> Result<()>;

    /// Returns the diffs tail length of `shard_ident` counted from `from`.
    ///
    /// NOTE(review): presumably the number of stored diffs at/after `from` — confirm
    /// against the state implementation.
    fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32;

    /// Loads per-account statistics of `partition` for `range` into `result`.
    fn load_diff_statistics(
        &self,
        partition: QueuePartitionIdx,
        range: &QueueShardRange,
        result: &mut AccountStatistics,
    ) -> Result<()>;

    /// Returns the diff info of block `seqno` in `shard_ident` looked up in `zone`
    /// (committed, uncommitted or both), or `None` when it is not stored.
    fn get_diff_info(
        &self,
        shard_ident: &ShardIdent,
        seqno: u32,
        zone: DiffZone,
    ) -> Result<Option<DiffInfo>>;

    /// Returns whether a diff for `block_id_short` is stored in the state.
    fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> Result<bool>;

    /// Returns the id of the masterchain block the queue was last committed on,
    /// or `None` if the queue has never been committed.
    fn get_last_committed_mc_block_id(&self) -> Result<Option<BlockId>>;

    /// Loads diff statistics for `range`, separated by each of `partitions`.
    fn load_separated_diff_statistics(
        &self,
        partitions: &FastHashSet<QueuePartitionIdx>,
        range: &QueueShardRange,
    ) -> Result<SeparatedStatisticsByPartitions>;
}

impl<V: InternalMessageValue> QueueFactory<V> for QueueFactoryStdImpl {
    type Queue = QueueImpl<QueueStateStdImpl, V>;

    fn create(&self) -> Result<Self::Queue> {
        let state = <QueueStateImplFactory as QueueStateFactory<V>>::create(&self.state)?;
        let state = Arc::new(state);
        let gc = GcManager::start::<V>(state.clone(), self.config.gc_interval);
        Ok(QueueImpl {
            state,
            zerostate_id: self.zerostate_id,
            gc,
            global_lock: RwLock::new(()),
            shard_locks: FastDashMap::default(),
            _phantom_data: Default::default(),
        })
    }
}

/// [`Queue`] implementation on top of a [`QueueState`] `P`.
pub struct QueueImpl<P: QueueState<V>, V: InternalMessageValue> {
    /// Queue state storage; the same `Arc` is shared with the GC manager.
    state: Arc<P>,
    /// Zerostate id; diffs of blocks referenced at or before its seqno may be absent.
    zerostate_id: ZerostateId,
    /// GC manager that receives delete-until updates from `commit_diff`.
    gc: GcManager,
    /// Read-locked by `apply_diff`, write-locked by `commit_diff` and
    /// `clear_uncommitted_state`.
    global_lock: RwLock<()>,
    /// Per-shard mutexes serializing `apply_diff` calls for the same shard.
    shard_locks: FastDashMap<ShardIdent, Arc<Mutex<()>>>,
    _phantom_data: PhantomData<V>,
}

impl<P, V> Queue<V> for QueueImpl<P, V>
where
    P: QueueState<V> + Send + Sync + 'static,
    V: InternalMessageValue + Send + Sync,
{
    /// Builds an iterator over `ranges` of `partition` for `for_shard_id` from a fresh
    /// snapshot of the state.
    ///
    /// Iterator creation time is recorded in a histogram.
    /// NOTE: the metric name (including its spelling) is part of the exported metrics.
    fn iterator(
        &self,
        partition: QueuePartitionIdx,
        ranges: &[QueueShardRange],
        for_shard_id: ShardIdent,
    ) -> Result<Box<dyn StateIterator<V>>> {
        let snapshot = self.state.snapshot();

        let state_iterator = {
            let _histogram =
                HistogramGuard::begin("tycho_internal_queue_commited_state_iterator_create_time");
            self.state
                .iterator(&snapshot, for_shard_id, partition, ranges)?
        };

        Ok(state_iterator)
    }

    /// Writes `diff` (with its `statistics` and `hash`) for `block_id_short` to the state.
    ///
    /// Behavior:
    /// - If a diff for the same shard and seqno already exists in either zone with the
    ///   same `hash`, this is a no-op.
    /// - If `check_sequence` is `Some(zone)` and the shard's last applied diff is found
    ///   in `zone`, diffs at or below that seqno are ignored (no-op).
    ///
    /// # Errors
    /// - A diff for the same block already exists with a different hash.
    /// - With `check_sequence`, the new seqno is not exactly `last_applied + 1`.
    /// - The diff's min message is not above the shard's current commit pointer.
    /// - Any state access error.
    fn apply_diff(
        &self,
        diff: QueueDiffWithMessages<V>,
        block_id_short: BlockIdShort,
        hash: &HashBytes,
        statistics: DiffStatistics,
        check_sequence: Option<DiffZone>,
    ) -> Result<()> {
        // Shared guard: diffs of different shards may be applied concurrently, but not
        // while `commit_diff` or `clear_uncommitted_state` hold the exclusive guard.
        // Poisoning is ignored because the lock protects no data (`()`).
        let _global_read_guard = self.global_lock.read().unwrap_or_else(|e| e.into_inner());

        // Serialize applies within one shard. The `Arc` is cloned out of the map so the
        // DashMap entry guard (which write-locks a whole map segment) is released at the
        // end of this statement. It must not live for the entire apply: that would block
        // unrelated shards hashing into the same segment.
        let shard_lock: Arc<Mutex<()>> = self
            .shard_locks
            .entry(block_id_short.shard)
            .or_default()
            .clone();
        let _shard_guard = shard_lock.lock().unwrap_or_else(|e| e.into_inner());

        // Check for duplicate diffs based on the block_id_short.seqno and hash
        let shard_diff = internal_queue::queue::Queue::get_diff_info(
            self,
            &block_id_short.shard,
            block_id_short.seqno,
            DiffZone::Both,
        )?;

        // Already applied: OK if the hash matches, an error otherwise.
        if let Some(shard_diff) = shard_diff {
            if shard_diff.hash != *hash {
                bail!(
                    "Duplicate diff with different hash: block_id={}, existing diff_hash={}, new diff_hash={}",
                    block_id_short,
                    shard_diff.hash,
                    hash,
                )
            }
            return Ok(());
        }

        if let Some(zone) = check_sequence {
            let last_applied_seqno = self.state.get_last_applied_seqno(&block_id_short.shard)?;

            if let Some(last_applied_seqno) = last_applied_seqno {
                let last_applied_diff_opt = internal_queue::queue::Queue::get_diff_info(
                    self,
                    &block_id_short.shard,
                    last_applied_seqno,
                    zone,
                )?;

                if let Some(last_applied_diff) = last_applied_diff_opt {
                    // Check if the diff is already applied
                    if block_id_short.seqno <= last_applied_diff.seqno {
                        return Ok(());
                    }

                    // Here `seqno > last_applied_diff.seqno`, so `+ 1` cannot overflow.
                    if block_id_short.seqno != last_applied_diff.seqno + 1 {
                        bail!(
                            "Diff seqno is not sequential new seqno {}. last_applied_seqno {}",
                            block_id_short.seqno,
                            last_applied_diff.seqno
                        );
                    }
                }
            }
        }

        // Check that applied diff is above the commit pointer
        let commit_pointers = self.state.get_commit_pointers()?;
        if let Some(commit_pointer) = commit_pointers.get(&block_id_short.shard)
            && let Some(min_message) = diff.min_message()
            && min_message <= &commit_pointer.queue_key
        {
            // Report the full block id (shard + seqno), consistent with the error above.
            bail!(
                "Diff min_message is less than commit_pointer: block_id={}, diff_min_message={}, commit_pointer={}",
                block_id_short,
                min_message,
                commit_pointer.queue_key
            );
        }

        self.state
            .write_diff(&block_id_short, &statistics, *hash, diff)?;

        Ok(())
    }

    /// Commits the diffs of `mc_top_blocks` and schedules GC for `partitions`.
    ///
    /// The masterchain block is taken from `mc_top_blocks`. If the masterchain commit
    /// pointer is already at or past it, nothing is done. Otherwise, for every top block
    /// whose diff is stored, the shard's commit pointer becomes `(max_message, seqno)` of
    /// that diff. For every source shard, the smallest `processed_to` key over those
    /// diffs is forwarded to GC for each partition in `partitions`.
    ///
    /// # Errors
    /// - No masterchain block in `mc_top_blocks`.
    /// - An updated top block referenced after the zerostate has no stored diff.
    /// - The same shard appears twice among the found diffs.
    /// - Any state access or commit error.
    fn commit_diff(
        &self,
        mc_top_blocks: &[TopBlockIdUpdated],
        partitions: &FastHashSet<QueuePartitionIdx>,
    ) -> Result<()> {
        // Exclusive guard: no diff may be applied or cleared while pointers move.
        let _global_write_guard = self.global_lock.write().unwrap_or_else(|e| e.into_inner());

        let mc_block_id = mc_top_blocks
            .iter()
            .find_map(|item| {
                item.block
                    .block_id
                    .is_masterchain()
                    .then_some(&item.block.block_id)
            })
            .ok_or_else(|| anyhow!("Masterchain block not found in commit_diff"))?;

        // check current commit pointer. If it is greater than committing diff then skip
        let current_commit_pointers = self.state.get_commit_pointers()?;
        if let Some(commit_pointer) = current_commit_pointers.get(&mc_block_id.shard)
            && commit_pointer.seqno >= mc_block_id.seqno
        {
            tracing::debug!(
                target: tracing_targets::MQ,
                "Skip commit diff for mc block {}. Committed by next mc block with seqno {}",
                mc_block_id, commit_pointer.seqno,
            );
            // Skip commit because it was already committed
            return Ok(());
        }

        // Per source shard: smallest `processed_to` key among the committed diffs,
        // together with the top block it came from.
        let mut gc_ranges = FastHashMap::default();

        // Per shard: new commit pointer `(max_message, seqno)` taken from its top diff.
        let mut commit_pointers = FastHashMap::default();

        for item in mc_top_blocks {
            let block_id = &item.block.block_id;

            // Check if the diff is already applied
            let diff = self
                .state
                .get_diff_info(&block_id.shard, block_id.seqno, DiffZone::Both)?;

            let diff = match diff {
                // If top shard block changed and diff not found, then bail
                None if item.updated && item.block.ref_by_mc_seqno > self.zerostate_id.seqno => {
                    bail!(
                        "Diff not found for block_id: {} ref {} zerostate {}",
                        block_id,
                        item.block.ref_by_mc_seqno,
                        self.zerostate_id.seqno
                    )
                }
                // If top shard block not changed and diff not found, then continue
                None => continue,
                Some(diff) => diff,
            };

            // Check for duplicate shard in commit_diff
            if commit_pointers
                .insert(block_id.shard, (diff.max_message, diff.seqno))
                .is_some()
            {
                bail!("Duplicate shard in commit_diff: {}", block_id.shard);
            }

            // Keep the minimum processed_to key per source shard.
            for (shard_ident, processed_to_key) in diff.processed_to.iter() {
                gc_ranges
                    .entry(*shard_ident)
                    .and_modify(|last: &mut GcEndKey| {
                        if processed_to_key < &last.end_key {
                            last.end_key = *processed_to_key;
                            last.on_top_block_id = *block_id;
                        }
                    })
                    .or_insert(GcEndKey {
                        end_key: *processed_to_key,
                        on_top_block_id: *block_id,
                    });
            }
        }

        tracing::debug!(target: tracing_targets::MQ,
            ?commit_pointers,
            "commit_diff",
        );

        // change the commit pointers position
        self.state.commit(commit_pointers, mc_block_id)?;

        // Only after a successful commit: advance GC for every requested partition.
        for (shard, gc_end_key) in gc_ranges {
            for partition in partitions {
                self.gc.update_delete_until(*partition, shard, gc_end_key);
            }
        }

        Ok(())
    }

    /// Removes uncommitted data via the state while holding the exclusive guard, so no
    /// `apply_diff` or `commit_diff` runs concurrently.
    fn clear_uncommitted_state(
        &self,
        partitions: &FastHashSet<QueuePartitionIdx>,
        top_shards: &[ShardIdent],
    ) -> Result<()> {
        let _global_write_guard = self.global_lock.write().unwrap_or_else(|e| e.into_inner());
        self.state.clear_uncommitted(partitions, top_shards)
    }

    /// Delegates to the state's `load_diff_statistics`.
    fn load_diff_statistics(
        &self,
        partition: QueuePartitionIdx,
        range: &QueueShardRange,
        result: &mut AccountStatistics,
    ) -> Result<()> {
        self.state.load_diff_statistics(partition, range, result)
    }

    /// Delegates to the state's `load_separated_diff_statistics`.
    fn load_separated_diff_statistics(
        &self,
        partitions: &FastHashSet<QueuePartitionIdx>,
        range: &QueueShardRange,
    ) -> Result<SeparatedStatisticsByPartitions> {
        self.state.load_separated_diff_statistics(partitions, range)
    }

    /// Delegates to the state's `get_diff_info` for the given `zone`.
    fn get_diff_info(
        &self,
        shard_ident: &ShardIdent,
        seqno: u32,
        zone: DiffZone,
    ) -> Result<Option<DiffInfo>> {
        self.state.get_diff_info(shard_ident, seqno, zone)
    }

    /// Delegates to the state's `get_diffs_tail_len`.
    fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32 {
        self.state.get_diffs_tail_len(shard_ident, from)
    }

    /// Returns whether a diff for `block_id_short` exists in either zone.
    fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> Result<bool> {
        let diff_info = internal_queue::queue::Queue::get_diff_info(
            self,
            &block_id_short.shard,
            block_id_short.seqno,
            DiffZone::Both,
        )?;
        Ok(diff_info.is_some())
    }

    /// Delegates to the state's `get_last_committed_mc_block_id`.
    fn get_last_committed_mc_block_id(&self) -> Result<Option<BlockId>> {
        self.state.get_last_committed_mc_block_id()
    }
}