1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
use crate::{config::TangleConfig, storage::StorageBackend, Tangle};
use bee_runtime::{node::Node, worker::Worker};
use async_trait::async_trait;
use log::{error, warn};
use tokio::time::interval;
use std::{
convert::Infallible,
time::{Duration, Instant},
};
pub struct TangleWorker;
#[async_trait]
impl<N: Node> Worker<N> for TangleWorker
where
N::Backend: StorageBackend,
{
type Config = TangleConfig;
type Error = Infallible;
async fn start(node: &mut N, config: Self::Config) -> Result<Self, Self::Error> {
node.register_resource(Tangle::<N::Backend>::new(config, node.storage()));
Ok(Self)
}
async fn stop(self, node: &mut N) -> Result<(), Self::Error> {
let tangle = if let Some(tangle) = node.remove_resource::<Tangle<N::Backend>>() {
tangle
} else {
warn!(
"The tangle was still in use by other users when the tangle worker stopped. \
This is a bug, but not a critical one. From here, we'll revert to polling the \
tangle until other users are finished with it."
);
let poll_start = Instant::now();
let poll_freq = 20;
let mut interval = interval(Duration::from_millis(poll_freq));
loop {
match node.remove_resource::<Tangle<N::Backend>>() {
Some(tangle) => break tangle,
None => {
if Instant::now().duration_since(poll_start) > Duration::from_secs(5) {
error!(
"Tangle shutdown polling period elapsed. The tangle will be dropped \
without proper shutdown. This should be considered a bug."
);
return Ok(());
} else {
interval.tick().await;
}
}
}
}
};
tangle.shutdown().await;
Ok(())
}
}