Skip to main content

// tycho_collator/internal_queue/state/storage.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use anyhow::Result;
5use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr, RouterPartitions};
6use tycho_storage::StorageContext;
7use tycho_types::cell::HashBytes;
8use tycho_types::models::{BlockId, BlockIdShort, ShardIdent};
9use tycho_util::metrics::HistogramGuard;
10use tycho_util::{FastHashMap, FastHashSet};
11
12use crate::internal_queue::state::state_iterator::{StateIterator, StateIteratorImpl};
13use crate::internal_queue::types::diff::{DiffZone, QueueDiffWithMessages};
14use crate::internal_queue::types::message::InternalMessageValue;
15use crate::internal_queue::types::ranges::QueueShardRange;
16use crate::internal_queue::types::router::PartitionRouter;
17use crate::internal_queue::types::stats::{
18    AccountStatistics, DiffStatistics, SeparatedStatisticsByPartitions,
19};
20use crate::storage::InternalQueueStorage;
21use crate::storage::models::{
22    CommitPointerValue, DiffInfo, DiffInfoKey, DiffTailKey, ShardsInternalMessagesKey, StatKey,
23};
24use crate::storage::snapshot::InternalQueueSnapshot;
25use crate::storage::transaction::InternalQueueTransaction;
26use crate::types::ProcessedTo;
27
28// FACTORY
29
30pub trait QueueStateFactory<V: InternalMessageValue> {
31    type QueueState: QueueState<V>;
32
33    fn create(&self) -> Result<Self::QueueState>;
34}
35
36impl<F, R, V> QueueStateFactory<V> for F
37where
38    F: Fn() -> Result<R>,
39    R: QueueState<V>,
40    V: InternalMessageValue,
41{
42    type QueueState = R;
43
44    fn create(&self) -> Result<Self::QueueState> {
45        self()
46    }
47}
48
/// Factory for [`QueueStateStdImpl`] that owns the internal queue storage handle.
pub struct QueueStateImplFactory {
    /// Internal queue storage; a clone of it is handed to each created queue state.
    pub storage: InternalQueueStorage,
}
52
53impl QueueStateImplFactory {
54    pub fn new(ctx: StorageContext) -> Result<Self> {
55        let storage = InternalQueueStorage::open(ctx)?;
56        Ok(Self { storage })
57    }
58}
59
60impl<V: InternalMessageValue> QueueStateFactory<V> for QueueStateImplFactory {
61    type QueueState = QueueStateStdImpl;
62
63    fn create(&self) -> Result<Self::QueueState> {
64        Ok(QueueStateStdImpl {
65            storage: self.storage.clone(),
66        })
67    }
68}
69
70// TRAIT
71
72pub trait QueueState<V: InternalMessageValue>: Send + Sync {
73    /// Create snapshot
74    fn snapshot(&self) -> InternalQueueSnapshot;
75
76    /// Create iterator for given partition and ranges
77    fn iterator(
78        &self,
79        snapshot: &InternalQueueSnapshot,
80        receiver: ShardIdent,
81        partition: QueuePartitionIdx,
82        ranges: &[QueueShardRange],
83    ) -> Result<Box<dyn StateIterator<V>>>;
84
85    /// Delete messages in given partition and ranges
86    fn delete(&self, partition: QueuePartitionIdx, ranges: &[QueueShardRange]) -> Result<()>;
87
88    /// Set commit pointers and last committed mc block id.
89    /// ATTENTION! Overrides old value without checks. Should validate the new value in the calling code.
90    fn commit(
91        &self,
92        commit_pointers: FastHashMap<ShardIdent, (QueueKey, u32)>,
93        mc_block_id: &BlockId,
94    ) -> Result<()>;
95
96    /// Load statistics for given partition and ranges
97    fn load_diff_statistics(
98        &self,
99        partition: QueuePartitionIdx,
100        range: &QueueShardRange,
101        result: &mut AccountStatistics,
102    ) -> Result<()>;
103
104    /// Load separated diff statistics for the specified partitions and range
105    fn load_separated_diff_statistics(
106        &self,
107        partitions: &FastHashSet<QueuePartitionIdx>,
108        range: &QueueShardRange,
109    ) -> Result<SeparatedStatisticsByPartitions>;
110
111    /// Get mc block id on which the queue was committed.
112    /// Returns None if queue was not committed
113    fn get_last_committed_mc_block_id(&self) -> Result<Option<BlockId>>;
114
115    /// Get diffs tail len from uncommitted state and committed state
116    fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32;
117
118    /// Get diff info by diff seqno
119    fn get_diff_info(
120        &self,
121        shard_ident: &ShardIdent,
122        seqno: u32,
123        zone: DiffZone,
124    ) -> Result<Option<DiffInfo>>;
125
126    /// Get last applied block seqno by shard ident from committed and uncommited zone
127    fn get_last_applied_seqno(&self, shard_ident: &ShardIdent) -> Result<Option<u32>>;
128
129    /// Get commit pointers
130    fn get_commit_pointers(&self) -> Result<FastHashMap<ShardIdent, CommitPointerValue>>;
131
132    /// Prepare diff for writing to storage. Returns transaction that should be committed later.
133    fn prepare_diff(
134        &self,
135        block_id_short: &BlockIdShort,
136        statistics: &DiffStatistics,
137        hash: HashBytes,
138        diff: QueueDiffWithMessages<V>,
139    ) -> Result<InternalQueueTransaction>;
140
141    /// Write diff to storage immediately.
142    fn write_diff(
143        &self,
144        block_id_short: &BlockIdShort,
145        statistics: &DiffStatistics,
146        hash: HashBytes,
147        diff: QueueDiffWithMessages<V>,
148    ) -> Result<()> {
149        self.prepare_diff(block_id_short, statistics, hash, diff)?
150            .write()
151    }
152
153    fn clear_uncommitted(
154        &self,
155        partitions: &FastHashSet<QueuePartitionIdx>,
156        top_shards: &[ShardIdent],
157    ) -> Result<()>;
158}
159
160// IMPLEMENTATION
161
/// Standard [`QueueState`] implementation backed by [`InternalQueueStorage`].
pub struct QueueStateStdImpl {
    /// Storage handle used for snapshots, transactions and direct db access.
    storage: InternalQueueStorage,
}
165
166impl<V: InternalMessageValue> QueueState<V> for QueueStateStdImpl {
167    fn snapshot(&self) -> InternalQueueSnapshot {
168        let _histogram = HistogramGuard::begin("tycho_internal_queue_snapshot_time");
169        self.storage.make_snapshot()
170    }
171
172    fn iterator(
173        &self,
174        snapshot: &InternalQueueSnapshot,
175        receiver: ShardIdent,
176        partition: QueuePartitionIdx,
177        ranges: &[QueueShardRange],
178    ) -> Result<Box<dyn StateIterator<V>>> {
179        let mut shards_iters = Vec::new();
180
181        for range in ranges {
182            let from = ShardsInternalMessagesKey::new(partition, range.shard_ident, range.from);
183            let to = ShardsInternalMessagesKey::new(partition, range.shard_ident, range.to);
184            shards_iters.push((snapshot.iter_messages(from, to), range.shard_ident));
185        }
186
187        let iterator = StateIteratorImpl::new(shards_iters, receiver)?;
188        Ok(Box::new(iterator))
189    }
190
191    fn delete(&self, partition: QueuePartitionIdx, ranges: &[QueueShardRange]) -> Result<()> {
192        let mut queue_ranges = vec![];
193        for range in ranges {
194            queue_ranges.push(crate::storage::models::QueueRange {
195                partition,
196                shard_ident: range.shard_ident,
197                from: range.from,
198                to: range.to,
199            });
200        }
201
202        let tx = self.storage.begin_transaction();
203        tx.delete(&queue_ranges)?;
204        tx.write()
205    }
206
207    fn commit(
208        &self,
209        commit_pointers: FastHashMap<ShardIdent, (QueueKey, u32)>,
210        mc_block_id: &BlockId,
211    ) -> Result<()> {
212        let mut tx = self.storage.begin_transaction();
213        tx.commit_messages(commit_pointers)?;
214        tx.set_last_committed_mc_block_id(mc_block_id)?;
215        tx.write()?;
216        let db = self.storage.db();
217        db.rocksdb()
218            .flush_cf(&db.internal_message_commit_pointer.cf())?;
219        db.rocksdb().flush_cf(&db.internal_message_var.cf())?;
220        Ok(())
221    }
222
223    fn load_diff_statistics(
224        &self,
225        partition: QueuePartitionIdx,
226        range: &QueueShardRange,
227        result: &mut AccountStatistics,
228    ) -> Result<()> {
229        let _histogram = HistogramGuard::begin("tycho_internal_queue_statistics_load_time");
230        let snapshot = self.storage.make_snapshot();
231        snapshot.collect_stats_in_range(
232            &range.shard_ident,
233            partition,
234            &range.from,
235            &range.to,
236            result,
237        )
238    }
239
240    fn load_separated_diff_statistics(
241        &self,
242        partitions: &FastHashSet<QueuePartitionIdx>,
243        range: &QueueShardRange,
244    ) -> Result<SeparatedStatisticsByPartitions> {
245        let _histogram =
246            HistogramGuard::begin("tycho_internal_queue_separated_statistics_load_time");
247        let snapshot = self.storage.make_snapshot();
248
249        let result = snapshot.collect_separated_stats_in_range_for_partitions(
250            &range.shard_ident,
251            partitions,
252            &range.from,
253            &range.to,
254        )?;
255
256        Ok(result)
257    }
258
259    fn get_last_committed_mc_block_id(&self) -> Result<Option<BlockId>> {
260        let snapshot = self.storage.make_snapshot();
261        snapshot.get_last_committed_mc_block_id()
262    }
263
264    fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32 {
265        let snapshot = self.storage.make_snapshot();
266        snapshot.calc_diffs_tail(&DiffTailKey {
267            shard_ident: *shard_ident,
268            max_message: *from,
269        })
270    }
271
272    fn get_diff_info(
273        &self,
274        shard_ident: &ShardIdent,
275        seqno: u32,
276        zone: DiffZone,
277    ) -> Result<Option<DiffInfo>> {
278        let snapshot = self.storage.make_snapshot();
279
280        let diff_info_bytes = snapshot.get_diff_info(&DiffInfoKey {
281            shard_ident: *shard_ident,
282            seqno,
283        })?;
284
285        let diff_info_bytes = match diff_info_bytes {
286            Some(bytes) => bytes,
287            None => return Ok(None),
288        };
289
290        let diff_info: DiffInfo = tl_proto::deserialize(&diff_info_bytes)?;
291
292        match zone {
293            DiffZone::Both => {}
294            DiffZone::Committed => {
295                let commit_pointers = snapshot.read_commit_pointers()?;
296                if let Some(commit_pointer) = commit_pointers.get(shard_ident) {
297                    // if true then diff is in uncommitted zone
298                    if commit_pointer.queue_key < diff_info.max_message {
299                        return Ok(None);
300                    }
301                } else {
302                    return Ok(None);
303                }
304            }
305            DiffZone::Uncommitted => {
306                let commit_pointers = snapshot.read_commit_pointers()?;
307                if let Some(commit_pointer) = commit_pointers.get(shard_ident) {
308                    // if true then diff is in committed zone
309                    if commit_pointer.queue_key >= diff_info.max_message {
310                        return Ok(None);
311                    }
312                }
313            }
314        }
315
316        Ok(Some(diff_info))
317    }
318
319    fn get_last_applied_seqno(&self, shard_ident: &ShardIdent) -> Result<Option<u32>> {
320        let snapshot = self.storage.make_snapshot();
321        snapshot.get_last_applied_diff_seqno(shard_ident)
322    }
323
324    fn get_commit_pointers(&self) -> Result<FastHashMap<ShardIdent, CommitPointerValue>> {
325        self.storage.make_snapshot().read_commit_pointers()
326    }
327
328    fn prepare_diff(
329        &self,
330        block_id_short: &BlockIdShort,
331        statistics: &DiffStatistics,
332        hash: HashBytes,
333        diff: QueueDiffWithMessages<V>,
334    ) -> Result<InternalQueueTransaction> {
335        let mut tx = self.storage.begin_transaction();
336
337        Self::add_messages(
338            &mut tx,
339            block_id_short.shard,
340            &diff.partition_router,
341            &diff.messages,
342        )?;
343        Self::add_statistics(&mut tx, statistics)?;
344        Self::add_diff_tail(&mut tx, block_id_short, statistics.max_message());
345
346        let src_router_partition = diff.partition_router.to_router_partitions_src();
347        let dst_router_partition = diff.partition_router.to_router_partitions_dst();
348
349        Self::add_diff_info(
350            &mut tx,
351            block_id_short,
352            statistics,
353            hash,
354            diff.processed_to,
355            src_router_partition,
356            dst_router_partition,
357        );
358
359        let labels = [("workchain", block_id_short.shard.workchain().to_string())];
360
361        let (batch_len, batch_size) = tx.size();
362
363        metrics::gauge!("tycho_internal_queue_write_diff_batch_len", &labels).set(batch_len as f64);
364        metrics::gauge!("tycho_internal_queue_write_diff_batch_size", &labels)
365            .set(batch_size as f64);
366        metrics::gauge!("tycho_internal_queue_write_diff_messages_count", &labels)
367            .set(diff.messages.len() as f64);
368
369        Ok(tx)
370    }
371
372    fn clear_uncommitted(
373        &self,
374        partitions: &FastHashSet<QueuePartitionIdx>,
375        top_shards: &[ShardIdent],
376    ) -> Result<()> {
377        let snapshot = self.storage.make_snapshot();
378        let pointers = snapshot.read_commit_pointers()?;
379        let tx = self.storage.begin_transaction();
380        tx.clear_uncommitted(partitions, &pointers, top_shards)?;
381        tx.write()
382    }
383}
384
385impl QueueStateStdImpl {
386    /// write new messages to storage
387    fn add_messages<V: InternalMessageValue>(
388        internal_queue_tx: &mut InternalQueueTransaction,
389        source: ShardIdent,
390        partition_router: &PartitionRouter,
391        messages: &BTreeMap<QueueKey, Arc<V>>,
392    ) -> Result<()> {
393        let _histogram = HistogramGuard::begin("tycho_internal_queue_apply_diff_add_messages_time");
394        let mut buffer = Vec::new();
395        for (internal_message_key, message) in messages {
396            let destination = message.destination();
397            let partition = partition_router.get_partition(Some(message.source()), destination);
398
399            buffer.clear();
400            message.serialize(&mut buffer);
401
402            internal_queue_tx.insert_message(
403                &ShardsInternalMessagesKey::new(partition, source, *internal_message_key),
404                destination,
405                &buffer,
406            );
407        }
408
409        Ok(())
410    }
411
412    /// write new statistics to storage
413    fn add_statistics(
414        internal_queue_tx: &mut InternalQueueTransaction,
415        diff_statistics: &DiffStatistics,
416    ) -> Result<()> {
417        let _histogram =
418            HistogramGuard::begin("tycho_internal_queue_apply_diff_add_statistics_time");
419        let shard_ident = diff_statistics.shard_ident();
420        let max_message = diff_statistics.max_message();
421
422        for (partition, values) in diff_statistics.iter() {
423            for (addr, count) in values {
424                let Some(dest) = RouterAddr::from_int_addr(addr) else {
425                    anyhow::bail!("cannot add VarAddr to router statistics");
426                };
427
428                let key = StatKey {
429                    shard_ident: *shard_ident,
430                    partition: *partition,
431                    max_message: *max_message,
432                    dest,
433                };
434
435                internal_queue_tx.insert_statistics(&key, *count);
436            }
437            metrics::counter!(
438                "tycho_internal_queue_apply_diff_add_statistics_accounts_count",
439                "partition" => partition.to_string(),
440            )
441            .increment(values.len() as u64);
442        }
443
444        Ok(())
445    }
446
447    fn add_diff_tail(
448        internal_queue_tx: &mut InternalQueueTransaction,
449        block_id_short: &BlockIdShort,
450        max_message: &QueueKey,
451    ) {
452        internal_queue_tx.insert_diff_tail(
453            &DiffTailKey {
454                shard_ident: block_id_short.shard,
455                max_message: *max_message,
456            },
457            block_id_short.seqno.to_le_bytes().as_slice(),
458        );
459    }
460
461    fn add_diff_info(
462        internal_queue_tx: &mut InternalQueueTransaction,
463        block_id_short: &BlockIdShort,
464        diff_statistics: &DiffStatistics,
465        hash: HashBytes,
466        processed_to: ProcessedTo,
467        router_partitions_src: RouterPartitions,
468        router_partitions_dst: RouterPartitions,
469    ) {
470        let shard_messages_count = diff_statistics.shards_messages_count();
471
472        let key = DiffInfoKey {
473            shard_ident: block_id_short.shard,
474            seqno: block_id_short.seqno,
475        };
476
477        let diff_info = DiffInfo {
478            min_message: *diff_statistics.min_message(),
479            max_message: *diff_statistics.max_message(),
480            shards_messages_count: shard_messages_count.clone(),
481            hash,
482            processed_to,
483            router_partitions_src,
484            router_partitions_dst,
485            seqno: block_id_short.seqno,
486        };
487
488        let serialized_diff_info = tl_proto::serialize(diff_info);
489
490        internal_queue_tx.insert_diff_info(&key, &serialized_diff_info);
491    }
492}