1use std::collections::BTreeMap;
2use std::pin::pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU32, Ordering};
5use std::time::{Duration, Instant};
6
7use parking_lot::Mutex;
8use rand::Rng;
9use scopeguard::defer;
10use serde::{Deserialize, Serialize};
11use tokio::sync::{Notify, watch};
12use tokio::task::AbortHandle;
13use tycho_block_util::block::BlockStuff;
14use tycho_types::models::BlockId;
15use tycho_util::metrics::HistogramGuard;
16
17use super::{
18 ArchivesGcConfig, BlockHandleStorage, BlockStorage, BlocksGcConfig, BlocksGcType,
19 CoreStorageConfig, NodeStateStorage, PersistentStateStorage, ShardStateStorage, StatesGcConfig,
20};
21
/// A manually requested GC target.
///
/// The trigger is resolved into a concrete target seqno against the latest
/// known masterchain block (see `Tick::adjust`).
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum ManualGcTrigger {
    /// Use the given seqno as the GC target as-is.
    Exact(u32),
    /// Use the seqno which is this many blocks behind the latest known
    /// masterchain block as the GC target (saturating at zero).
    Distance(u32),
}
29
/// Owner of the background GC tasks (archives, blocks, shard states) and of
/// the channels used to drive them.
///
/// The tasks are aborted when this value is dropped.
pub(crate) struct CoreStorageGc {
    /// Publishes the latest masterchain tick to all GC tasks.
    tick_tx: TickTx,
    /// Seqno of the last seen key block (0 if none was found at startup).
    last_key_block_seqno: AtomicU32,
    /// Shared cache of queue diff tail lengths, fed by `handle_block` and
    /// read by the blocks GC task.
    diff_tail_cache: DiffTailCache,

    /// Manual trigger channel for the archives GC task.
    archives_gc_trigger: ManualTriggerTx,
    /// Manual trigger channel for the blocks GC task.
    blocks_gc_trigger: ManualTriggerTx,
    /// Manual trigger channel for the states GC task.
    states_gc_trigger: ManualTriggerTx,

    /// Abort handle of the spawned blocks GC task.
    blocks_gc_handle: AbortHandle,
    /// Abort handle of the spawned states GC task.
    states_gc_handle: AbortHandle,
    /// Abort handle of the spawned archives GC task.
    archive_gc_handle: AbortHandle,
}
43
44impl Drop for CoreStorageGc {
45 fn drop(&mut self) {
46 self.blocks_gc_handle.abort();
47 self.states_gc_handle.abort();
48 self.archive_gc_handle.abort();
49 }
50}
51
52impl CoreStorageGc {
53 pub fn new(
54 node_state: &NodeStateStorage,
55 block_handles: Arc<BlockHandleStorage>,
56 blocks: Arc<BlockStorage>,
57 shard_states: Arc<ShardStateStorage>,
58 persistent_states: PersistentStateStorage,
59 config: &CoreStorageConfig,
60 ) -> Self {
61 let last_key_block_seqno = block_handles
62 .find_last_key_block()
63 .map_or(0, |handle| handle.id().seqno);
64
65 let diff_tail_cache = DiffTailCache::default();
66
67 let (tick_tx, tick_rx) = watch::channel(None::<Tick>);
68
69 let (archives_gc_trigger, archives_gc_rx) = watch::channel(None::<ManualGcTrigger>);
70 let archives_gc = tokio::spawn(Self::archives_gc(
71 tick_rx.clone(),
72 archives_gc_rx,
73 persistent_states,
74 blocks.clone(),
75 config.archives_gc,
76 ));
77
78 let (blocks_gc_trigger, blocks_gc_rx) = watch::channel(None::<ManualGcTrigger>);
79 let blocks_gc = tokio::spawn(Self::blocks_gc(
80 tick_rx.clone(),
81 blocks_gc_rx,
82 diff_tail_cache.clone(),
83 block_handles,
84 blocks,
85 config.blocks_gc,
86 ));
87
88 let (states_gc_trigger, states_gc_rx) = watch::channel(None::<ManualGcTrigger>);
89 let states_gc = tokio::spawn(Self::states_gc(
90 tick_rx,
91 states_gc_rx,
92 shard_states,
93 config.states_gc,
94 ));
95
96 let last_known_mc_block = node_state.load_last_mc_block_id();
97 if let Some(mc_block_id) = last_known_mc_block {
98 tracing::info!(
99 %mc_block_id,
100 "starting GC subscriber with the last known master block"
101 );
102 metrics::gauge!("tycho_core_last_mc_block_seqno").set(mc_block_id.seqno as f64);
103 tick_tx.send_replace(Some(Tick {
104 last_key_block_seqno,
105 mc_block_id,
106 }));
107 }
108
109 Self {
110 tick_tx,
111 last_key_block_seqno: AtomicU32::new(last_key_block_seqno),
112 diff_tail_cache,
113
114 archives_gc_trigger,
115 blocks_gc_trigger,
116 states_gc_trigger,
117
118 archive_gc_handle: archives_gc.abort_handle(),
119 blocks_gc_handle: blocks_gc.abort_handle(),
120 states_gc_handle: states_gc.abort_handle(),
121 }
122 }
123
124 pub fn trigger_archives_gc(&self, trigger: ManualGcTrigger) {
125 self.archives_gc_trigger.send_replace(Some(trigger));
126 }
127
128 pub fn trigger_blocks_gc(&self, trigger: ManualGcTrigger) {
129 self.blocks_gc_trigger.send_replace(Some(trigger));
130 }
131
132 pub fn trigger_states_gc(&self, trigger: ManualGcTrigger) {
133 self.states_gc_trigger.send_replace(Some(trigger));
134 }
135
136 pub fn handle_block(&self, is_key_block: bool, block: &BlockStuff) {
137 self.diff_tail_cache.handle_block(block);
139
140 if !block.id().is_masterchain() {
141 return;
142 }
143
144 if is_key_block {
145 self.last_key_block_seqno
146 .store(block.id().seqno, Ordering::Relaxed);
147 }
148
149 self.tick_tx.send_replace(Some(Tick {
150 last_key_block_seqno: self.last_key_block_seqno.load(Ordering::Relaxed),
151 mc_block_id: *block.id(),
152 }));
153 }
154
155 #[tracing::instrument(skip_all)]
156 async fn archives_gc(
157 mut tick_rx: TickRx,
158 mut manual_rx: ManualTriggerRx,
159 persistent_states: PersistentStateStorage,
160 blocks: Arc<BlockStorage>,
161 config: Option<ArchivesGcConfig>,
162 ) {
163 let Some(config) = config else {
164 tracing::warn!("manager disabled");
165 return;
166 };
167 tracing::info!("manager started");
168 defer! {
169 tracing::info!("manager stopped");
170 }
171
172 let compute_offset = |gen_utime: u32| -> Duration {
173 let usable_at = std::time::UNIX_EPOCH
174 + Duration::from_secs(gen_utime as _)
175 + BlockStuff::BOOT_OFFSET;
176 (usable_at + config.persistent_state_offset)
177 .duration_since(std::time::SystemTime::now())
178 .unwrap_or_default()
179 };
180
181 let mut prev_pss_seqno = 0;
182 'outer: loop {
183 let target_seqno = 'seqno: {
185 let wait_for_state_fut = async {
186 loop {
187 let mut new_state_found =
188 pin!(persistent_states.oldest_known_handle_changed());
189
190 let pss_handle = match persistent_states.load_oldest_known_handle() {
191 Some(handle) if handle.id().seqno > prev_pss_seqno => handle,
192 _ => {
193 new_state_found.await;
195 continue;
196 }
197 };
198
199 let time_to_wait = compute_offset(pss_handle.gen_utime());
201 tokio::select! {
202 _ = tokio::time::sleep(time_to_wait) => break pss_handle.id().seqno,
203 _ = &mut new_state_found => {},
204 }
205 }
206 };
207
208 tokio::select! {
209 seqno = wait_for_state_fut => {
211 prev_pss_seqno = seqno;
212 break 'seqno seqno
213 },
214 trigger = manual_rx.changed() => {
216 if trigger.is_err() {
217 break 'outer;
218 }
219 }
220 }
221
222 let (Some(tick), Some(trigger)) =
223 (*tick_rx.borrow_and_update(), *manual_rx.borrow_and_update())
224 else {
225 continue 'outer;
226 };
227
228 tick.adjust(trigger)
229 };
230
231 if let Err(e) = blocks.remove_outdated_archives(target_seqno).await {
232 tracing::error!("failed to remove outdated archives: {e:?}");
233 }
234 }
235 }
236
237 #[tracing::instrument(skip_all)]
238 async fn blocks_gc(
239 mut tick_rx: TickRx,
240 mut manual_rx: ManualTriggerRx,
241 diff_tail_cache: DiffTailCache,
242 block_handles: Arc<BlockHandleStorage>,
243 blocks: Arc<BlockStorage>,
244 config: Option<BlocksGcConfig>,
245 ) {
246 let Some(config) = config else {
247 tracing::warn!("manager disabled");
248 return;
249 };
250 tracing::info!("manager started");
251 defer! {
252 tracing::info!("manager stopped");
253 }
254
255 let mut last_tiggered_at = None::<Instant>;
256 let mut sleep_until = None::<Instant>;
257 let mut known_key_block_seqno = 0;
258
259 while let Some(source) = wait_with_sleep(&mut tick_rx, &mut manual_rx, sleep_until).await {
261 sleep_until = None;
262
263 let Some(tick) = *tick_rx.borrow_and_update() else {
264 continue;
265 };
266 tracing::debug!(?tick);
267
268 let has_new_key_block = tick.last_key_block_seqno > known_key_block_seqno;
271 known_key_block_seqno = tick.last_key_block_seqno;
272
273 let target_seqno = match (source, config.ty) {
274 (GcSource::Manual, _) => {
275 let Some(trigger) = *manual_rx.borrow_and_update() else {
276 continue;
277 };
278
279 let target_seqno = tick.adjust(trigger);
281 if target_seqno == 0 {
282 continue;
283 }
284
285 last_tiggered_at = Some(Instant::now());
287 target_seqno
288 }
289 (
290 GcSource::Schedule,
291 BlocksGcType::BeforeSafeDistance {
292 safe_distance,
293 min_interval,
294 },
295 ) => {
296 const MIN_SAFE_DISTANCE: u32 = 100;
298
299 let Some(tail_len) = diff_tail_cache
300 .wait_for_tail_len(tick.mc_block_id.seqno)
301 .await
302 else {
303 tracing::warn!(
304 seqno = ?tick.mc_block_id.seqno ,
305 "tail diff not found in cache, skipping GC. This is expected during startup."
306 );
307 continue;
308 };
309
310 metrics::gauge!("tycho_core_blocks_gc_tail_len").set(tail_len);
311
312 tracing::info!(tail_len, "found longest diffs tail");
313
314 let safe_distance = [safe_distance, MIN_SAFE_DISTANCE, tail_len + 1]
315 .into_iter()
316 .max()
317 .unwrap();
318
319 let target_seqno = match tick.mc_block_id.seqno.checked_sub(safe_distance) {
321 None | Some(0) => continue,
323 Some(seqno) => seqno,
324 };
325
326 if let Some(last) = last_tiggered_at
328 && last.elapsed() < min_interval
329 {
330 sleep_until = Some(last + min_interval);
333 continue;
334 }
335
336 last_tiggered_at = Some(Instant::now());
339 target_seqno
340 }
341 (GcSource::Schedule, BlocksGcType::BeforePreviousKeyBlock) => {
342 if !has_new_key_block {
343 continue;
344 }
345
346 let target_seqno = tick.last_key_block_seqno;
348 match block_handles.find_prev_key_block(target_seqno) {
349 Some(handle) => handle.id().seqno,
350 None => {
351 tracing::warn!(target_seqno, "previous key block not found");
352 continue;
353 }
354 }
355 }
356 (GcSource::Schedule, BlocksGcType::BeforePreviousPersistentState) => {
357 if !has_new_key_block {
358 continue;
359 }
360
361 let target_seqno = tick.last_key_block_seqno;
363 match block_handles.find_prev_persistent_key_block(target_seqno) {
364 Some(handle) => handle.id().seqno,
365 None => {
366 tracing::warn!(target_seqno, "previous persistent block not found");
367 continue;
368 }
369 }
370 }
371 };
372
373 metrics::gauge!("tycho_core_mc_blocks_gc_lag")
374 .set(tick.mc_block_id.seqno.saturating_sub(target_seqno));
375
376 if let Err(e) = blocks
377 .remove_outdated_blocks(target_seqno, config.max_blocks_per_batch)
378 .await
379 {
380 tracing::error!("failed to remove outdated blocks: {e:?}");
381 }
382
383 diff_tail_cache.cleanup(target_seqno);
385 }
386 }
387
388 #[tracing::instrument(skip_all)]
389 async fn states_gc(
390 mut tick_rx: TickRx,
391 mut manual_rx: ManualTriggerRx,
392 shard_states: Arc<ShardStateStorage>,
393 config: Option<StatesGcConfig>,
394 ) {
395 let Some(config) = config else {
396 tracing::warn!("manager disabled");
397 return;
398 };
399 tracing::info!(?config, "manager started");
400 defer! {
401 tracing::info!("manager stopped");
402 }
403
404 let mut random_offset = config
405 .random_offset
406 .then(|| rand::rng().random_range(Duration::ZERO..config.interval));
407 let mut last_triggered_at = None::<Instant>;
408 let mut sleep_until = None::<Instant>;
409
410 while let Some(source) = wait_with_sleep(&mut tick_rx, &mut manual_rx, sleep_until).await {
411 sleep_until = None;
412 if let GcSource::Manual = source {
413 tracing::info!("manual states gc triggered");
414 }
415
416 let Some(tick) = *tick_rx.borrow_and_update() else {
417 tracing::debug!("no tick available, continuing");
418 continue;
419 };
420 tracing::debug!(?tick, "states gc tick");
421
422 let now = Instant::now();
423
424 let target_seqno = match source {
426 GcSource::Manual => {
428 let Some(trigger) = *manual_rx.borrow_and_update() else {
429 tracing::debug!("no manual trigger available, continuing");
430 continue;
431 };
432 tracing::info!(seqno = tick.adjust(trigger), "manual GC triggered");
433 tick.adjust(trigger)
434 }
435 GcSource::Schedule => {
436 if let Some(last) = last_triggered_at {
438 let next_gc = last + config.interval;
439 if next_gc > now {
440 sleep_until = Some(next_gc);
441 let sleep_duration = next_gc - now;
442 tracing::debug!(
443 duration = sleep_duration.as_secs_f64(),
444 "sleeping until next GC"
445 );
446 continue;
447 }
448 } else if let Some(offset) = random_offset.take() {
449 sleep_until = Some(now + offset);
450 let sleep_duration = offset;
451 tracing::debug!(
452 duration = sleep_duration.as_secs_f64(),
453 "sleeping with random offset"
454 );
455 continue;
456 }
457
458 tracing::info!(seqno = tick.mc_block_id.seqno, "scheduled GC");
459 tick.mc_block_id.seqno
460 }
461 };
462
463 if target_seqno == 0 {
464 tracing::warn!("target seqno is 0, skipping GC");
465 continue;
466 }
467
468 last_triggered_at = Some(now);
469 tracing::info!("starting GC for target seqno: {}", target_seqno);
470
471 let hist = HistogramGuard::begin("tycho_gc_states_time");
472
473 if let Err(e) = shard_states.remove_outdated_states(target_seqno).await {
474 tracing::error!("failed to remove outdated states: {e:?}");
475 }
476
477 let took = hist.finish();
478 tracing::info!(
479 duration = took.as_secs_f64(),
480 "completed GC for target seqno: {}",
481 target_seqno
482 );
483 }
484 }
485}
486
/// What woke up a GC task.
#[derive(Debug, Clone)]
enum GcSource {
    /// A new tick arrived or a previously requested sleep deadline elapsed.
    Schedule,
    /// A manual trigger was sent.
    Manual,
}
492
/// Snapshot published to the GC tasks on every masterchain block.
#[derive(Debug, Clone, Copy)]
struct Tick {
    /// Id of the latest known masterchain block.
    pub mc_block_id: BlockId,
    /// Seqno of the last known key block at the time of this tick.
    pub last_key_block_seqno: u32,
}
498
499impl Tick {
500 fn adjust(&self, trigger: ManualGcTrigger) -> u32 {
501 match trigger {
502 ManualGcTrigger::Exact(seqno) => seqno,
503 ManualGcTrigger::Distance(distance) => self.mc_block_id.seqno.saturating_sub(distance),
504 }
505 }
506}
507
/// Sending half of the tick channel; holds `None` until the first tick is published.
type TickTx = watch::Sender<Option<Tick>>;
/// Receiving half of the tick channel.
type TickRx = watch::Receiver<Option<Tick>>;

/// Sending half of a manual GC trigger channel; holds `None` until the first trigger is sent.
type ManualTriggerTx = watch::Sender<Option<ManualGcTrigger>>;
/// Receiving half of a manual GC trigger channel.
type ManualTriggerRx = watch::Receiver<Option<ManualGcTrigger>>;
513
514async fn wait_with_sleep(
515 tick_rx: &mut TickRx,
516 manual_rx: &mut ManualTriggerRx,
517 sleep_until: Option<Instant>,
518) -> Option<GcSource> {
519 use futures_util::future::Either;
520
521 let fut = match sleep_until {
522 Some(deadline) => Either::Left(async move {
524 tokio::time::sleep_until(deadline.into()).await;
525 Some(GcSource::Schedule)
526 }),
527 None => Either::Right(async {
529 let res = tick_rx.changed().await;
530 res.is_ok().then_some(GcSource::Schedule)
531 }),
532 };
533
534 tokio::select! {
535 res = fut => res,
536 trigger = manual_rx.changed() => {
537 trigger.is_ok().then_some(GcSource::Manual)
538 },
539 }
540}
541
/// Cheaply clonable handle to a shared cache of queue diff tail lengths,
/// keyed by masterchain block seqno.
#[derive(Default, Clone)]
struct DiffTailCache {
    /// Shared state; all clones refer to the same cache.
    inner: Arc<DiffTailCacheInner>,
}
548
/// Shared state behind `DiffTailCache`.
#[derive(Default)]
struct DiffTailCacheInner {
    /// Wakes all waiters after a masterchain block has been recorded.
    new_block_finalized: Notify,
    /// Seqno of the most recently recorded masterchain block.
    latest_finalized_seqno: AtomicU32,
    /// Running maximum of `tail_len + 1` over the non-masterchain blocks seen
    /// since the last masterchain block; 0 means none were seen.
    max_tail_len: AtomicU32,
    /// Recorded tail lengths keyed by masterchain block seqno.
    finalized: Mutex<BTreeMap<u32, DiffTailCacheEntry>>,
}
556
557impl DiffTailCache {
558 fn handle_block(&self, block: &BlockStuff) {
564 const OFFSET: u32 = 1;
565
566 let this = self.inner.as_ref();
567
568 let seqno = block.id().seqno;
569 let tail_len = block.as_ref().out_msg_queue_updates.tail_len;
570
571 if block.id().is_masterchain() {
572 let acc = this.max_tail_len.swap(0, Ordering::AcqRel);
574
575 {
576 let mut finalized = this.finalized.lock();
577
578 let sc_tail_len = match acc.checked_sub(OFFSET) {
579 Some(tail_len) => tail_len,
581 None => match finalized.get(&seqno.saturating_sub(1)) {
587 Some(prev) => prev.sc_tail_len,
588 None => 1,
589 },
590 };
591
592 let prev = finalized.insert(seqno, DiffTailCacheEntry {
593 mc_tail_len: tail_len,
594 sc_tail_len,
595 });
596 debug_assert!(prev.is_none(), "same block handled twice at runtime");
597 }
598
599 this.latest_finalized_seqno.store(seqno, Ordering::Release);
600 this.new_block_finalized.notify_waiters();
601 } else {
602 this.max_tail_len
606 .fetch_max(OFFSET + tail_len, Ordering::Release);
607 }
608 }
609
610 async fn wait_for_tail_len(&self, mc_seqno: u32) -> Option<u32> {
616 let this = self.inner.as_ref();
617
618 loop {
620 let updated = this.new_block_finalized.notified();
621 if this.latest_finalized_seqno.load(Ordering::Acquire) >= mc_seqno {
622 break;
623 }
624 updated.await;
625 }
626
627 this.finalized
630 .lock()
631 .get(&mc_seqno)
632 .map(DiffTailCacheEntry::compute_max)
633 }
634
635 fn cleanup(&self, upto_mc_seqno: u32) {
638 let mut finalized = self.inner.finalized.lock();
639 if let Some(lower_bound) = upto_mc_seqno.checked_add(1) {
640 let rest = finalized.split_off(&lower_bound);
641 *finalized = rest;
642 } else {
643 finalized.clear();
644 }
645 }
646}
647
/// Tail lengths recorded for a single masterchain block.
#[derive(Clone, Copy)]
struct DiffTailCacheEntry {
    /// Tail length taken from the masterchain block itself.
    mc_tail_len: u32,
    /// Tail length accumulated from non-masterchain blocks.
    sc_tail_len: u32,
}

impl DiffTailCacheEntry {
    /// Returns the larger of the two recorded tail lengths.
    fn compute_max(&self) -> u32 {
        let Self {
            mc_tail_len,
            sc_tail_len,
        } = *self;
        std::cmp::max(mc_tail_len, sc_tail_len)
    }
}
659
#[cfg(test)]
mod tests {
    use std::pin::pin;

    use futures_util::future::poll_immediate;
    use tycho_types::models::ShardIdent;

    use super::*;

    /// Builds a test block in `shard` with the given seqno and queue diff tail length.
    fn block_with_tail(shard: ShardIdent, seqno: u32, tail_len: u32) -> BlockStuff {
        BlockStuff::new_with(shard, seqno, |block| {
            block.out_msg_queue_updates.tail_len = tail_len;
        })
    }

    /// Builds a masterchain test block.
    fn mc_block(seqno: u32, tail_len: u32) -> BlockStuff {
        block_with_tail(ShardIdent::MASTERCHAIN, seqno, tail_len)
    }

    /// Builds a basechain test block.
    fn sc_block(seqno: u32, tail_len: u32) -> BlockStuff {
        block_with_tail(ShardIdent::BASECHAIN, seqno, tail_len)
    }

    #[tokio::test]
    async fn test_tail_cache_basic_flow() {
        let cache = DiffTailCache::default();

        // Seqno 0 is "already reached" on an empty cache but has no entry.
        assert_eq!(cache.wait_for_tail_len(0).await, None);

        // Waiting for the first masterchain block must stay pending.
        let mut pending_mc1 = pin!(cache.wait_for_tail_len(1));
        assert_eq!(poll_immediate(&mut pending_mc1).await, None);

        // Shard blocks alone do not resolve the waiter.
        let shard_tails = [(100, 20), (101, 15), (102, 16)];
        for (seqno, tail_len) in shard_tails {
            cache.handle_block(&sc_block(seqno, tail_len));
        }
        assert_eq!(poll_immediate(&mut pending_mc1).await, None);

        // The masterchain block resolves it with the max over shard and own tails.
        cache.handle_block(&mc_block(1, 10));
        assert_eq!(pending_mc1.await, Some(20));

        // No shard blocks before the next masterchain block: the shard value is reused.
        let mut pending_mc2 = pin!(cache.wait_for_tail_len(2));
        assert_eq!(poll_immediate(&mut pending_mc2).await, None);

        cache.handle_block(&mc_block(2, 5));
        assert_eq!(pending_mc2.await, Some(20));

        // Already recorded seqnos resolve immediately.
        assert_eq!(cache.wait_for_tail_len(1).await, Some(20));
        assert_eq!(cache.wait_for_tail_len(2).await, Some(20));

        // A fresh shard block replaces the reused value for the next masterchain block.
        cache.handle_block(&sc_block(103, 1));

        let mut pending_mc3 = pin!(cache.wait_for_tail_len(3));
        assert_eq!(poll_immediate(&mut pending_mc3).await, None);

        cache.handle_block(&mc_block(3, 2));
        assert_eq!(pending_mc3.await, Some(2));

        assert_eq!(cache.wait_for_tail_len(3).await, Some(2));

        // Cleanup removes entries up to and including the given seqno.
        cache.cleanup(1);

        assert_eq!(poll_immediate(cache.wait_for_tail_len(0)).await, Some(None));
        assert_eq!(poll_immediate(cache.wait_for_tail_len(1)).await, Some(None));
        assert_eq!(
            poll_immediate(cache.wait_for_tail_len(2)).await,
            Some(Some(20))
        );
        assert_eq!(
            poll_immediate(cache.wait_for_tail_len(3)).await,
            Some(Some(2))
        );
        // A seqno that was never recorded is still pending.
        assert_eq!(poll_immediate(cache.wait_for_tail_len(4)).await, None);

        // Cleaning past the latest seqno does not unblock future waiters.
        cache.cleanup(10);

        assert_eq!(poll_immediate(cache.wait_for_tail_len(4)).await, None);
    }
}