1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
use crate::{storage::StorageBackend, MsTangle, TangleWorker};
use bee_runtime::{node::Node, shutdown_stream::ShutdownStream, worker::Worker};
use async_trait::async_trait;
use futures::StreamExt;
use log::info;
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;
use std::{any::TypeId, convert::Infallible, time::Duration};
const TIP_POOL_CLEANER_INTERVAL: u64 = 1;
#[derive(Default)]
pub(crate) struct TipPoolCleanerWorker {}
#[async_trait]
impl<N: Node> Worker<N> for TipPoolCleanerWorker
where
N::Backend: StorageBackend,
{
type Config = ();
type Error = Infallible;
fn dependencies() -> &'static [TypeId] {
vec![TypeId::of::<TangleWorker>()].leak()
}
async fn start(node: &mut N, _config: Self::Config) -> Result<Self, Self::Error> {
let tangle = node.resource::<MsTangle<N::Backend>>();
node.spawn::<Self, _, _>(|shutdown| async move {
info!("Running.");
let mut ticker = ShutdownStream::new(
shutdown,
IntervalStream::new(interval(Duration::from_secs(TIP_POOL_CLEANER_INTERVAL))),
);
while ticker.next().await.is_some() {
tangle.reduce_tips().await
}
info!("Stopped.");
});
Ok(Self::default())
}
}