bee_tangle/
tip_pool_cleaner_worker.rs1use crate::{storage::StorageBackend, Tangle, TangleWorker};
5
6use bee_runtime::{node::Node, shutdown_stream::ShutdownStream, worker::Worker};
7
8use async_trait::async_trait;
9use futures::StreamExt;
10use log::info;
11use tokio::time::interval;
12use tokio_stream::wrappers::IntervalStream;
13
14use std::{any::TypeId, convert::Infallible, time::Duration};
15
/// Period, in seconds, of the ticker that drives the tip pool cleaner.
///
/// The value is handed to `Duration::from_secs` when the worker builds its interval.
const TIP_POOL_CLEANER_INTERVAL: u64 = 1;
18
/// Worker whose background task calls `reduce_tips` on the tangle at every tick of a fixed
/// interval.
///
/// The type carries no state of its own; it only serves as the identity under which the
/// background task is registered with the node.
#[derive(Default)]
pub(crate) struct TipPoolCleanerWorker {}
21
22#[async_trait]
23impl<N: Node> Worker<N> for TipPoolCleanerWorker
24where
25 N::Backend: StorageBackend,
26{
27 type Config = ();
28 type Error = Infallible;
29
30 fn dependencies() -> &'static [TypeId] {
31 vec![TypeId::of::<TangleWorker>()].leak()
32 }
33
34 async fn start(node: &mut N, _config: Self::Config) -> Result<Self, Self::Error> {
35 let tangle = node.resource::<Tangle<N::Backend>>();
36
37 node.spawn::<Self, _, _>(|shutdown| async move {
38 info!("Running.");
39
40 let mut ticker = ShutdownStream::new(
41 shutdown,
42 IntervalStream::new(interval(Duration::from_secs(TIP_POOL_CLEANER_INTERVAL))),
43 );
44
45 while ticker.next().await.is_some() {
46 tangle.reduce_tips().await
47 }
48
49 info!("Stopped.");
50 });
51
52 Ok(Self::default())
53 }
54}