zksync_node_framework/implementations/layers/
pruning.rs

1use std::time::Duration;
2
3use zksync_node_db_pruner::{DbPruner, DbPrunerConfig};
4
5use crate::{
6    implementations::resources::{
7        healthcheck::AppHealthCheckResource,
8        pools::{MasterPool, PoolResource},
9    },
10    service::StopReceiver,
11    task::{Task, TaskId},
12    wiring_layer::{WiringError, WiringLayer},
13    FromContext, IntoContext,
14};
15
/// Wiring layer that sets up the database pruner ([`DbPruner`]).
///
/// The layer only stores the pruning settings; they are copied into a
/// `DbPrunerConfig` when the layer is wired.
#[derive(Debug)]
pub struct PruningLayer {
    /// Passed on as `DbPrunerConfig::removal_delay`.
    pruning_removal_delay: Duration,
    /// Passed on as `DbPrunerConfig::pruned_batch_chunk_size`.
    pruning_chunk_size: u32,
    /// Passed on as `DbPrunerConfig::minimum_l1_batch_age`.
    minimum_l1_batch_age: Duration,
}
23
24#[derive(Debug, FromContext)]
25#[context(crate = crate)]
26pub struct Input {
27    pub master_pool: PoolResource<MasterPool>,
28    #[context(default)]
29    pub app_health: AppHealthCheckResource,
30}
31
32#[derive(Debug, IntoContext)]
33#[context(crate = crate)]
34pub struct Output {
35    #[context(task)]
36    pub db_pruner: DbPruner,
37}
38
39impl PruningLayer {
40    pub fn new(
41        pruning_removal_delay: Duration,
42        pruning_chunk_size: u32,
43        minimum_l1_batch_age: Duration,
44    ) -> Self {
45        Self {
46            pruning_removal_delay,
47            pruning_chunk_size,
48            minimum_l1_batch_age,
49        }
50    }
51}
52
53#[async_trait::async_trait]
54impl WiringLayer for PruningLayer {
55    type Input = Input;
56    type Output = Output;
57
58    fn layer_name(&self) -> &'static str {
59        "pruning_layer"
60    }
61
62    async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
63        let main_pool = input.master_pool.get().await?;
64
65        let db_pruner = DbPruner::new(
66            DbPrunerConfig {
67                removal_delay: self.pruning_removal_delay,
68                pruned_batch_chunk_size: self.pruning_chunk_size,
69                minimum_l1_batch_age: self.minimum_l1_batch_age,
70            },
71            main_pool,
72        );
73
74        input
75            .app_health
76            .0
77            .insert_component(db_pruner.health_check())
78            .map_err(WiringError::internal)?;
79        Ok(Output { db_pruner })
80    }
81}
82
83#[async_trait::async_trait]
84impl Task for DbPruner {
85    fn id(&self) -> TaskId {
86        "db_pruner".into()
87    }
88
89    async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
90        (*self).run(stop_receiver.0).await
91    }
92}