zksync_node_framework/implementations/layers/
pruning.rs1use std::time::Duration;
2
3use zksync_node_db_pruner::{DbPruner, DbPrunerConfig};
4
5use crate::{
6 implementations::resources::{
7 healthcheck::AppHealthCheckResource,
8 pools::{MasterPool, PoolResource},
9 },
10 service::StopReceiver,
11 task::{Task, TaskId},
12 wiring_layer::{WiringError, WiringLayer},
13 FromContext, IntoContext,
14};
15
/// Wiring layer that builds the database pruner ([`DbPruner`]) and hands it
/// to the framework as a task.
///
/// The three settings stored here are copied verbatim into
/// [`DbPrunerConfig`] when the layer is wired.
#[derive(Debug)]
pub struct PruningLayer {
    /// Forwarded as `DbPrunerConfig::removal_delay`.
    pruning_removal_delay: Duration,
    /// Forwarded as `DbPrunerConfig::pruned_batch_chunk_size`.
    pruning_chunk_size: u32,
    /// Forwarded as `DbPrunerConfig::minimum_l1_batch_age`.
    minimum_l1_batch_age: Duration,
}
23
24#[derive(Debug, FromContext)]
25#[context(crate = crate)]
26pub struct Input {
27 pub master_pool: PoolResource<MasterPool>,
28 #[context(default)]
29 pub app_health: AppHealthCheckResource,
30}
31
32#[derive(Debug, IntoContext)]
33#[context(crate = crate)]
34pub struct Output {
35 #[context(task)]
36 pub db_pruner: DbPruner,
37}
38
39impl PruningLayer {
40 pub fn new(
41 pruning_removal_delay: Duration,
42 pruning_chunk_size: u32,
43 minimum_l1_batch_age: Duration,
44 ) -> Self {
45 Self {
46 pruning_removal_delay,
47 pruning_chunk_size,
48 minimum_l1_batch_age,
49 }
50 }
51}
52
53#[async_trait::async_trait]
54impl WiringLayer for PruningLayer {
55 type Input = Input;
56 type Output = Output;
57
58 fn layer_name(&self) -> &'static str {
59 "pruning_layer"
60 }
61
62 async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
63 let main_pool = input.master_pool.get().await?;
64
65 let db_pruner = DbPruner::new(
66 DbPrunerConfig {
67 removal_delay: self.pruning_removal_delay,
68 pruned_batch_chunk_size: self.pruning_chunk_size,
69 minimum_l1_batch_age: self.minimum_l1_batch_age,
70 },
71 main_pool,
72 );
73
74 input
75 .app_health
76 .0
77 .insert_component(db_pruner.health_check())
78 .map_err(WiringError::internal)?;
79 Ok(Output { db_pruner })
80 }
81}
82
83#[async_trait::async_trait]
84impl Task for DbPruner {
85 fn id(&self) -> TaskId {
86 "db_pruner".into()
87 }
88
89 async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
90 (*self).run(stop_receiver.0).await
91 }
92}