1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
use crate::ping_clients::ping_client::{PingClientError, PingClientPingResultDetails};
use crate::{ping_client_factory, PingClient, PingClientFactory, PingPortPicker, PingResult, PingWorkerConfig};
use chrono::{offset::Utc, DateTime};
use futures_intrusive::sync::ManualResetEvent;
use std::time::Duration;
use std::{net::SocketAddr, sync::Arc, sync::Mutex};
use tokio::{sync::mpsc, task, task::JoinHandle};

pub struct PingWorker {
    id: u32,
    config: Arc<PingWorkerConfig>,
    stop_event: Arc<ManualResetEvent>,
    port_picker: Arc<Mutex<PingPortPicker>>,
    ping_client: Box<dyn PingClient + Send + Sync>,
    result_sender: mpsc::UnboundedSender<PingResult>,
    is_warmup_worker: bool,
}

impl PingWorker {
    #[tracing::instrument(
        name = "Starting worker",
        level = "debug",
        skip(config, external_ping_client_factory, port_picker, stop_event, result_sender)
    )]
    pub fn run(
        worker_id: u32,
        config: Arc<PingWorkerConfig>,
        external_ping_client_factory: Option<PingClientFactory>,
        port_picker: Arc<Mutex<PingPortPicker>>,
        stop_event: Arc<ManualResetEvent>,
        result_sender: mpsc::UnboundedSender<PingResult>,
        is_warmup_worker: bool,
    ) -> JoinHandle<()> {
        let join_handle = task::spawn(async move {
            let ping_client = ping_client_factory::new_ping_client(&config.protocol, &config.ping_client_config, external_ping_client_factory);

            let mut worker = PingWorker { id: worker_id, config, stop_event, port_picker, ping_client, result_sender, is_warmup_worker };
            worker.run_worker_loop().await;

            tracing::debug!("Ping worker loop exited; worker_id={}", worker.id);
        });

        return join_handle;
    }

    #[tracing::instrument(name = "Running worker loop", level = "debug", skip(self), fields(worker_id = %self.id))]
    async fn run_worker_loop(&mut self) {
        loop {
            let source_port = self.port_picker.lock().expect("Failed getting port picker lock").next();
            match source_port {
                Some(source_port) => self.run_single_ping(source_port).await,
                None => {
                    tracing::debug!("Ping finished, stopping worker; worker_id={}", self.id);
                    return;
                }
            }

            if !self.wait_for_next_schedule().await {
                break;
            }
        }
    }

    #[tracing::instrument(name = "Running single ping", level = "debug", skip(self), fields(worker_id = %self.id))]
    async fn run_single_ping(&mut self, source_port: u16) {
        let source = SocketAddr::new(self.config.source_ip, source_port);
        let target = self.config.target;

        let ping_time = Utc::now();
        match self.ping_client.prepare_ping(&source).await {
            Err(PingClientError::PreparationFailed(e)) => {
                self.process_ping_client_error(&ping_time, source_port, PingClientError::PreparationFailed(e)).await
            }
            Err(_) => panic!("Unexpected failure from prepare_ping! The error type should always be PingClientError::PreparationFailed."),
            Ok(()) => (),
        }

        match self.ping_client.ping(&source, &target).await {
            Ok(result) => self.process_ping_client_result(&ping_time, source_port, result).await,
            Err(error) => self.process_ping_client_error(&ping_time, source_port, error).await,
        }
    }

    #[tracing::instrument(name = "Processing ping client single ping result", level = "debug", skip(self), fields(worker_id = %self.id))]
    async fn process_ping_client_result(&self, ping_time: &DateTime<Utc>, src_port: u16, ping_result: PingClientPingResultDetails) {
        let mut source: Option<SocketAddr> = ping_result.actual_local_addr;

        if source.is_none() {
            source = Some(SocketAddr::new(self.config.source_ip, src_port));
        }

        let result = PingResult::new(
            ping_time,
            self.id,
            self.ping_client.protocol(),
            self.config.target,
            source.unwrap(),
            self.is_warmup_worker,
            !ping_result.is_timeout,
            ping_result.round_trip_time,
            ping_result.is_timeout,
            ping_result.warning,
            None,
        );

        self.result_sender.send(result).unwrap();
    }

    #[tracing::instrument(name = "Processing ping client single ping error", level = "debug", skip(self), fields(worker_id = %self.id))]
    async fn process_ping_client_error(&self, ping_time: &DateTime<Utc>, src_port: u16, error: PingClientError) {
        let source = SocketAddr::new(self.config.source_ip, src_port);

        let result = PingResult::new(
            ping_time,
            self.id,
            self.ping_client.protocol(),
            self.config.target,
            source,
            self.is_warmup_worker,
            false,
            Duration::from_millis(0),
            false,
            None,
            Some(error),
        );

        self.result_sender.send(result).unwrap();
    }

    #[tracing::instrument(name = "Waiting for next schedule", level = "debug", skip(self), fields(worker_id = %self.id))]
    async fn wait_for_next_schedule(&self) -> bool {
        let ping_interval = self.config.ping_interval;
        let result = tokio::time::timeout(ping_interval, self.stop_event.wait()).await;

        // Wait succedded, which means we are signaled to exit.
        if let Ok(_) = result {
            tracing::debug!("Stop event received, stopping worker; worker_id={}", self.id);
            return false;
        }

        // If not, continue to run.
        return true;
    }
}