1use std::marker::PhantomData;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Result, anyhow, bail};
6use parking_lot::{ArcMutexGuard, ArcRwLockReadGuard, Mutex, RawMutex, RawRwLock, RwLock};
7use serde::{Deserialize, Serialize};
8use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};
9use tycho_core::global_config::ZerostateId;
10use tycho_types::cell::HashBytes;
11use tycho_types::models::{BlockId, BlockIdShort, ShardIdent};
12use tycho_util::metrics::HistogramGuard;
13use tycho_util::{FastDashMap, FastHashMap, FastHashSet, serde_helpers};
14
15use super::gc::GcEndKey;
16use crate::internal_queue::gc::GcManager;
17use crate::internal_queue::state::state_iterator::StateIterator;
18use crate::internal_queue::state::storage::{
19 QueueState, QueueStateFactory, QueueStateImplFactory, QueueStateStdImpl,
20};
21use crate::internal_queue::types::diff::{DiffZone, QueueDiffWithMessages};
22use crate::internal_queue::types::message::InternalMessageValue;
23use crate::internal_queue::types::ranges::QueueShardRange;
24use crate::internal_queue::types::stats::{
25 AccountStatistics, DiffStatistics, SeparatedStatisticsByPartitions,
26};
27use crate::storage::models::DiffInfo;
28use crate::storage::transaction::InternalQueueTransaction;
29use crate::types::TopBlockIdUpdated;
30use crate::{internal_queue, tracing_targets};
31
32pub struct PendingQueueDiff {
35 tx: InternalQueueTransaction,
36 _global_guard: ArcRwLockReadGuard<RawRwLock, ()>,
37 _shard_guard: ArcMutexGuard<RawMutex, ()>,
38}
39
40impl PendingQueueDiff {
41 pub fn write(self) -> anyhow::Result<()> {
42 self.tx.write()
43 }
44}
45
46#[derive(Debug, Serialize, Deserialize)]
47pub struct QueueConfig {
48 #[serde(with = "serde_helpers::humantime")]
50 pub gc_interval: Duration,
51}
52
53impl Default for QueueConfig {
54 fn default() -> Self {
55 Self {
56 gc_interval: Duration::from_secs(5),
57 }
58 }
59}
60
61pub trait QueueFactory<V: InternalMessageValue> {
62 type Queue: Queue<V>;
63
64 fn create(&self) -> Result<Self::Queue>;
65}
66
67impl<F, R, V: InternalMessageValue> QueueFactory<V> for F
68where
69 F: Fn() -> Result<R>,
70 R: Queue<V>,
71{
72 type Queue = R;
73
74 fn create(&self) -> Result<Self::Queue> {
75 self()
76 }
77}
78
79pub struct QueueFactoryStdImpl {
80 pub zerostate_id: ZerostateId,
81 pub state: QueueStateImplFactory,
82 pub config: QueueConfig,
83}
84
85pub trait Queue<V>: Send
88where
89 V: InternalMessageValue + Send + Sync,
90{
91 fn iterator(
93 &self,
94 partition: QueuePartitionIdx,
95 ranges: &[QueueShardRange],
96 for_shard_id: ShardIdent,
97 ) -> Result<Box<dyn StateIterator<V>>>;
98
99 fn prepare_diff(
102 &self,
103 diff: QueueDiffWithMessages<V>,
104 block_id_short: BlockIdShort,
105 hash: &HashBytes,
106 statistics: DiffStatistics,
107 check_sequence: Option<DiffZone>,
108 ) -> Result<Option<PendingQueueDiff>>;
109
110 fn apply_diff(
112 &self,
113 diff: QueueDiffWithMessages<V>,
114 block_id_short: BlockIdShort,
115 hash: &HashBytes,
116 statistics: DiffStatistics,
117 check_sequence: Option<DiffZone>,
118 ) -> Result<()>;
119
120 fn commit_diff(
122 &self,
123 mc_top_blocks: &[TopBlockIdUpdated],
124 partitions: &FastHashSet<QueuePartitionIdx>,
125 ) -> Result<()>;
126
127 fn clear_uncommitted_state(
129 &self,
130 partitions: &FastHashSet<QueuePartitionIdx>,
131 top_shards: &[ShardIdent],
132 ) -> Result<()>;
133
134 fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32;
136
137 fn load_diff_statistics(
139 &self,
140 partition: QueuePartitionIdx,
141 range: &QueueShardRange,
142 result: &mut AccountStatistics,
143 ) -> Result<()>;
144
145 fn get_diff_info(
147 &self,
148 shard_ident: &ShardIdent,
149 seqno: u32,
150 zone: DiffZone,
151 ) -> Result<Option<DiffInfo>>;
152
153 fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> Result<bool>;
155
156 fn get_last_committed_mc_block_id(&self) -> Result<Option<BlockId>>;
159
160 fn load_separated_diff_statistics(
162 &self,
163 partitions: &FastHashSet<QueuePartitionIdx>,
164 range: &QueueShardRange,
165 ) -> Result<SeparatedStatisticsByPartitions>;
166}
167
168impl<V: InternalMessageValue> QueueFactory<V> for QueueFactoryStdImpl {
169 type Queue = QueueImpl<QueueStateStdImpl, V>;
170
171 fn create(&self) -> Result<Self::Queue> {
172 let state = <QueueStateImplFactory as QueueStateFactory<V>>::create(&self.state)?;
173 let state = Arc::new(state);
174 let gc = GcManager::start::<V>(state.clone(), self.config.gc_interval);
175 Ok(QueueImpl {
176 state,
177 zerostate_id: self.zerostate_id,
178 gc,
179 global_lock: Arc::new(RwLock::new(())),
180 shard_locks: FastDashMap::default(),
181 _phantom_data: Default::default(),
182 })
183 }
184}
185
186pub struct QueueImpl<P, V>
187where
188 P: QueueState<V>,
189 V: InternalMessageValue,
190{
191 state: Arc<P>,
192 zerostate_id: ZerostateId,
193 gc: GcManager,
194 global_lock: Arc<RwLock<()>>,
195 shard_locks: FastDashMap<ShardIdent, Arc<Mutex<()>>>,
196 _phantom_data: PhantomData<V>,
197}
198
199impl<P, V> Queue<V> for QueueImpl<P, V>
200where
201 P: QueueState<V> + Send + Sync + 'static,
202 V: InternalMessageValue + Send + Sync,
203{
204 fn iterator(
205 &self,
206 partition: QueuePartitionIdx,
207 ranges: &[QueueShardRange],
208 for_shard_id: ShardIdent,
209 ) -> Result<Box<dyn StateIterator<V>>> {
210 let snapshot = self.state.snapshot();
211
212 let state_iterator = {
213 let _histogram =
214 HistogramGuard::begin("tycho_internal_queue_commited_state_iterator_create_time");
215 self.state
216 .iterator(&snapshot, for_shard_id, partition, ranges)?
217 };
218
219 Ok(state_iterator)
220 }
221
222 fn prepare_diff(
223 &self,
224 diff: QueueDiffWithMessages<V>,
225 block_id_short: BlockIdShort,
226 hash: &HashBytes,
227 statistics: DiffStatistics,
228 check_sequence: Option<DiffZone>,
229 ) -> Result<Option<PendingQueueDiff>> {
230 let global_guard = self.global_lock.read_arc();
232
233 let shard_lock = self
235 .shard_locks
236 .entry(block_id_short.shard)
237 .or_default()
238 .clone();
239 let shard_guard = shard_lock.lock_arc();
240
241 let shard_diff = internal_queue::queue::Queue::get_diff_info(
243 self,
244 &block_id_short.shard,
245 block_id_short.seqno,
246 DiffZone::Both,
247 )?;
248
249 if let Some(shard_diff) = shard_diff {
252 if shard_diff.hash != *hash {
254 bail!(
255 "Duplicate diff with different hash: block_id={}, existing diff_hash={}, new diff_hash={}",
256 block_id_short,
257 shard_diff.hash,
258 hash,
259 )
260 }
261 return Ok(None);
262 }
263
264 if let Some(zone) = check_sequence {
265 let last_applied_seqno = self.state.get_last_applied_seqno(&block_id_short.shard)?;
266
267 if let Some(last_applied_seqno) = last_applied_seqno {
268 let last_applied_diff_opt = internal_queue::queue::Queue::get_diff_info(
269 self,
270 &block_id_short.shard,
271 last_applied_seqno,
272 zone,
273 )?;
274
275 if let Some(last_applied_diff) = last_applied_diff_opt {
276 if block_id_short.seqno <= last_applied_diff.seqno {
278 return Ok(None);
279 }
280
281 if block_id_short.seqno != last_applied_diff.seqno + 1 {
283 bail!(
284 "Diff seqno is not sequential new seqno {}. last_applied_seqno {}",
285 block_id_short.seqno,
286 last_applied_diff.seqno
287 );
288 }
289 }
290 }
291 }
292
293 let commit_pointers = self.state.get_commit_pointers()?;
295 if let Some(commit_pointer) = commit_pointers.get(&block_id_short.shard)
296 && let Some(min_message) = diff.min_message()
297 && min_message <= &commit_pointer.queue_key
298 {
299 bail!(
300 "Diff min_message is less than commit_pointer: block_id={}, diff_min_message={}, commit_pointer={}",
301 block_id_short.seqno,
302 min_message,
303 commit_pointer.queue_key
304 );
305 }
306
307 let tx = self
308 .state
309 .prepare_diff(&block_id_short, &statistics, *hash, diff)?;
310
311 Ok(Some(PendingQueueDiff {
312 tx,
313 _global_guard: global_guard,
314 _shard_guard: shard_guard,
315 }))
316 }
317
318 fn apply_diff(
319 &self,
320 diff: QueueDiffWithMessages<V>,
321 block_id_short: BlockIdShort,
322 hash: &HashBytes,
323 statistics: DiffStatistics,
324 check_sequence: Option<DiffZone>,
325 ) -> Result<()> {
326 if let Some(tx) =
327 self.prepare_diff(diff, block_id_short, hash, statistics, check_sequence)?
328 {
329 let _histogram = HistogramGuard::begin("tycho_internal_queue_write_diff_time");
330 tx.write()?;
331 }
332 Ok(())
333 }
334
335 fn commit_diff(
336 &self,
337 mc_top_blocks: &[TopBlockIdUpdated],
338 partitions: &FastHashSet<QueuePartitionIdx>,
339 ) -> Result<()> {
340 let _global_write_guard = self.global_lock.write();
342
343 let mc_block_id = mc_top_blocks
344 .iter()
345 .find_map(|item| {
346 item.block
347 .block_id
348 .is_masterchain()
349 .then_some(&item.block.block_id)
350 })
351 .ok_or_else(|| anyhow!("Masterchain block not found in commit_diff"))?;
352
353 let commit_pointers = self.state.get_commit_pointers()?;
355 if let Some(commit_pointer) = commit_pointers.get(&mc_block_id.shard)
356 && commit_pointer.seqno >= mc_block_id.seqno
357 {
358 tracing::debug!(
359 target: tracing_targets::MQ,
360 "Skip commit diff for mc block {}. Committed by next mc block with seqno {}",
361 mc_block_id, commit_pointer.seqno,
362 );
363 return Ok(());
365 }
366
367 let mut gc_ranges = FastHashMap::default();
368
369 let mut commit_pointers = FastHashMap::default();
370
371 for item in mc_top_blocks {
372 let block_id = &item.block.block_id;
373
374 let diff = self
376 .state
377 .get_diff_info(&block_id.shard, block_id.seqno, DiffZone::Both)?;
378
379 let diff = match diff {
380 None if item.updated && item.block.ref_by_mc_seqno > self.zerostate_id.seqno => {
382 bail!(
383 "Diff not found for block_id: {} ref {} zerostate {}",
384 block_id,
385 item.block.ref_by_mc_seqno,
386 self.zerostate_id.seqno
387 )
388 }
389 None => continue,
391 Some(diff) => diff,
392 };
393
394 if commit_pointers
396 .insert(block_id.shard, (diff.max_message, diff.seqno))
397 .is_some()
398 {
399 bail!("Duplicate shard in commit_diff: {}", block_id.shard);
400 }
401
402 for (shard_ident, processed_to_key) in diff.processed_to.iter() {
404 gc_ranges
405 .entry(*shard_ident)
406 .and_modify(|last: &mut GcEndKey| {
407 if processed_to_key < &last.end_key {
408 last.end_key = *processed_to_key;
409 last.on_top_block_id = *block_id;
410 }
411 })
412 .or_insert(GcEndKey {
413 end_key: *processed_to_key,
414 on_top_block_id: *block_id,
415 });
416 }
417 }
418
419 tracing::debug!(target: tracing_targets::MQ,
420 ?commit_pointers,
421 "commit_diff",
422 );
423
424 self.state.commit(commit_pointers, mc_block_id)?;
426
427 for (shard, gc_end_key) in gc_ranges {
429 for partition in partitions {
430 self.gc.update_delete_until(*partition, shard, gc_end_key);
431 }
432 }
433
434 Ok(())
435 }
436
437 fn clear_uncommitted_state(
438 &self,
439 partitions: &FastHashSet<QueuePartitionIdx>,
440 top_shards: &[ShardIdent],
441 ) -> Result<()> {
442 let _global_write_guard = self.global_lock.write();
444 self.state.clear_uncommitted(partitions, top_shards)
445 }
446
447 fn load_diff_statistics(
448 &self,
449 partition: QueuePartitionIdx,
450 range: &QueueShardRange,
451 result: &mut AccountStatistics,
452 ) -> Result<()> {
453 self.state.load_diff_statistics(partition, range, result)
454 }
455
456 fn load_separated_diff_statistics(
457 &self,
458 partitions: &FastHashSet<QueuePartitionIdx>,
459 range: &QueueShardRange,
460 ) -> Result<SeparatedStatisticsByPartitions> {
461 let result = self
462 .state
463 .load_separated_diff_statistics(partitions, range)?;
464
465 Ok(result)
466 }
467
468 fn get_diff_info(
469 &self,
470 shard_ident: &ShardIdent,
471 seqno: u32,
472 zone: DiffZone,
473 ) -> Result<Option<DiffInfo>> {
474 self.state.get_diff_info(shard_ident, seqno, zone)
475 }
476
477 fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32 {
478 self.state.get_diffs_tail_len(shard_ident, from)
479 }
480
481 fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> Result<bool> {
482 Ok(internal_queue::queue::Queue::get_diff_info(
483 self,
484 &block_id_short.shard,
485 block_id_short.seqno,
486 DiffZone::Both,
487 )?
488 .is_some())
489 }
490
491 fn get_last_committed_mc_block_id(&self) -> Result<Option<BlockId>> {
492 self.state.get_last_committed_mc_block_id()
493 }
494}