Skip to main content

veilid_core/network_manager/tasks/
mod.rs

1pub mod rolling_transfers;
2use super::*;
3
4impl NetworkManager {
5    pub fn setup_tasks(&self) {
6        // Set rolling transfers tick task
7        impl_setup_task_async!(
8            self,
9            Self,
10            rolling_transfers_task,
11            rolling_transfers_task_routine
12        );
13
14        // Set address filter task
15        {
16            let registry = self.registry();
17            self.address_filter_task.set_routine(move |s, l, t| {
18                let registry = registry.clone();
19                Box::pin(async move {
20                    registry
21                        .network_manager()
22                        .address_filter()
23                        .address_filter_task_routine(s, Timestamp::new(l), Timestamp::new(t))
24                        .await
25                })
26            });
27        }
28    }
29
30    pub async fn tick_event_handler(&self, _evt: Arc<TickEvent>) {
31        if let Err(e) = self.tick().await {
32            error!("Error in network manager tick: {}", e);
33        }
34    }
35
36    #[cfg_attr(
37        feature = "instrument",
38        instrument(level = "trace", name = "NetworkManager::tick", skip_all, err, fields(__VEILID_LOG_KEY = self.log_key()))
39    )]
40    async fn tick(&self) -> EyreResult<()> {
41        let Ok(_startup_guard) = self.startup_context.startup_lock.enter() else {
42            return Ok(());
43        };
44
45        let net = self.net();
46        let receipt_manager = self.receipt_manager();
47
48        // Run the rolling transfers task
49        self.rolling_transfers_task.tick().await?;
50
51        // Run the address filter task
52        self.address_filter_task.tick().await?;
53
54        // Run the low level network tick
55        net.tick().await?;
56
57        // Run the receipt manager tick
58        receipt_manager.tick().await?;
59
60        // Purge the client allowlist
61        self.purge_client_allowlist();
62
63        Ok(())
64    }
65
66    pub async fn cancel_tasks(&self) {
67        veilid_log!(self debug "stopping receipt manager tasks");
68        let receipt_manager = self.receipt_manager();
69        receipt_manager.cancel_tasks().await;
70
71        let net = self.net();
72        net.cancel_tasks().await;
73
74        veilid_log!(self debug "stopping rolling transfers task");
75        if let Err(e) = self.rolling_transfers_task.stop().await {
76            veilid_log!(self warn "rolling_transfers_task not stopped: {}", e);
77        }
78
79        veilid_log!(self debug "stopping address filter task");
80        if let Err(e) = self.address_filter_task.stop().await {
81            veilid_log!(self warn "address_filter_task not stopped: {}", e);
82        }
83    }
84}