bee_tangle/
tangle_worker.rs1use crate::{config::TangleConfig, storage::StorageBackend, Tangle};
5
6use bee_runtime::{node::Node, worker::Worker};
7
8use async_trait::async_trait;
9use log::{error, warn};
10use tokio::time::interval;
11
12use std::{
13 convert::Infallible,
14 time::{Duration, Instant},
15};
16
17pub struct TangleWorker;
19
20#[async_trait]
21impl<N: Node> Worker<N> for TangleWorker
22where
23 N::Backend: StorageBackend,
24{
25 type Config = TangleConfig;
26 type Error = Infallible;
27
28 async fn start(node: &mut N, config: Self::Config) -> Result<Self, Self::Error> {
29 node.register_resource(Tangle::<N::Backend>::new(config, node.storage()));
30
31 Ok(Self)
32 }
33
34 async fn stop(self, node: &mut N) -> Result<(), Self::Error> {
35 let tangle = if let Some(tangle) = node.remove_resource::<Tangle<N::Backend>>() {
36 tangle
37 } else {
38 warn!(
39 "The tangle was still in use by other users when the tangle worker stopped. \
40 This is a bug, but not a critical one. From here, we'll revert to polling the \
41 tangle until other users are finished with it."
42 );
43
44 let poll_start = Instant::now();
45 let poll_freq = 20;
46 let mut interval = interval(Duration::from_millis(poll_freq));
47 loop {
48 match node.remove_resource::<Tangle<N::Backend>>() {
49 Some(tangle) => break tangle,
50 None => {
51 if Instant::now().duration_since(poll_start) > Duration::from_secs(5) {
52 error!(
53 "Tangle shutdown polling period elapsed. The tangle will be dropped \
54 without proper shutdown. This should be considered a bug."
55 );
56 return Ok(());
57 } else {
58 interval.tick().await;
59 }
60 }
61 }
62 }
63 };
64
65 tangle.shutdown().await;
66
67 Ok(())
68 }
69}