bee_tangle/
tip_pool_cleaner_worker.rs

1// Copyright 2020-2021 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{storage::StorageBackend, Tangle, TangleWorker};
5
6use bee_runtime::{node::Node, shutdown_stream::ShutdownStream, worker::Worker};
7
8use async_trait::async_trait;
9use futures::StreamExt;
10use log::info;
11use tokio::time::interval;
12use tokio_stream::wrappers::IntervalStream;
13
14use std::{any::TypeId, convert::Infallible, time::Duration};
15
/// Period, in seconds, of the interval timer that drives the tip pool cleaner:
/// the worker calls `Tangle::reduce_tips` once per tick of this timer.
const TIP_POOL_CLEANER_INTERVAL: u64 = 1;
18
/// Stateless worker handle for the tip pool cleaner.
///
/// The struct carries no data; all the work happens in the background task
/// spawned by its `Worker::start` implementation, which periodically calls
/// `reduce_tips` on the node's `Tangle` resource.
#[derive(Default)]
pub(crate) struct TipPoolCleanerWorker {}
21
22#[async_trait]
23impl<N: Node> Worker<N> for TipPoolCleanerWorker
24where
25    N::Backend: StorageBackend,
26{
27    type Config = ();
28    type Error = Infallible;
29
30    fn dependencies() -> &'static [TypeId] {
31        vec![TypeId::of::<TangleWorker>()].leak()
32    }
33
34    async fn start(node: &mut N, _config: Self::Config) -> Result<Self, Self::Error> {
35        let tangle = node.resource::<Tangle<N::Backend>>();
36
37        node.spawn::<Self, _, _>(|shutdown| async move {
38            info!("Running.");
39
40            let mut ticker = ShutdownStream::new(
41                shutdown,
42                IntervalStream::new(interval(Duration::from_secs(TIP_POOL_CLEANER_INTERVAL))),
43            );
44
45            while ticker.next().await.is_some() {
46                tangle.reduce_tips().await
47            }
48
49            info!("Stopped.");
50        });
51
52        Ok(Self::default())
53    }
54}