1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
use crate::{
    rnp_utils, PingPortPicker, PingResult, PingResultProcessingWorker, PingWorker, RnpCoreConfig, RNP_ABOUT, RNP_AUTHOR,
    RNP_NAME,
};
use futures_intrusive::sync::ManualResetEvent;
use std::sync::{Arc, Mutex};
use tokio::{sync::mpsc, task::JoinHandle};

pub struct RnpCore {
    config: RnpCoreConfig,

    stop_event: Arc<ManualResetEvent>,
    worker_join_handles: Vec<JoinHandle<()>>,
    ping_result_processor_join_handle: Option<JoinHandle<()>>,
}

impl RnpCore {
    #[tracing::instrument(name = "Start running Rnp core", level = "debug", skip(stop_event))]
    pub fn start_run(config: RnpCoreConfig, stop_event: Arc<ManualResetEvent>) -> RnpCore {
        let mut rnp_core = RnpCore {
            config,
            stop_event,
            worker_join_handles: Vec::new(),
            ping_result_processor_join_handle: None,
        };

        rnp_core.start();

        return rnp_core;
    }

    fn start(&mut self) {
        self.log_header_to_console();

        let ping_result_sender = self.create_ping_result_processing_worker();
        self.create_ping_workers(ping_result_sender);
    }

    fn log_header_to_console(&self) {
        println!("{} - {} - {}\n", RNP_NAME, RNP_AUTHOR, RNP_ABOUT);

        println!(
            "Start testing {} {:?}:",
            rnp_utils::format_protocol(self.config.worker_config.protocol),
            self.config.worker_config.target
        );
    }

    #[tracing::instrument(name = "Creating ping result processing worker", level = "debug", skip(self))]
    fn create_ping_result_processing_worker(&mut self) -> mpsc::Sender<PingResult> {
        let mut ping_result_channel_size = self.config.worker_scheduler_config.parallel_ping_count * 2;
        if ping_result_channel_size < 128 {
            ping_result_channel_size = 128;
        }

        let (ping_result_sender, ping_result_receiver) = mpsc::channel(ping_result_channel_size as usize);
        self.ping_result_processor_join_handle = Some(PingResultProcessingWorker::run(
            Arc::new(self.config.result_processor_config.clone()),
            self.stop_event.clone(),
            ping_result_receiver,
        ));

        return ping_result_sender;
    }

    #[tracing::instrument(name = "Creating all ping workers", level = "debug", skip(self, sender))]
    fn create_ping_workers(&mut self, sender: mpsc::Sender<PingResult>) {
        let mut worker_join_handles = Vec::new();

        let source_port_picker = Arc::new(Mutex::new(PingPortPicker::new(
            self.config.worker_scheduler_config.ping_count,
            self.config.worker_scheduler_config.source_port_min,
            self.config.worker_scheduler_config.source_port_max,
            &self.config.worker_scheduler_config.source_port_list,
        )));

        let worker_count = self.config.worker_scheduler_config.parallel_ping_count;
        let shared_worker_config = Arc::new(self.config.worker_config.clone());
        for worker_id in 0..worker_count {
            let worker_join_handle = PingWorker::run(
                worker_id,
                shared_worker_config.clone(),
                source_port_picker.clone(),
                self.stop_event.clone(),
                sender.clone(),
            );
            worker_join_handles.push(worker_join_handle);
        }

        self.worker_join_handles = worker_join_handles;
    }

    #[tracing::instrument(name = "Waiting for all workers to finish", level = "debug", skip(self))]
    pub async fn join(&mut self) {
        for join_handle in &mut self.worker_join_handles {
            join_handle.await.unwrap();
        }
        self.worker_join_handles.clear();
        tracing::debug!("All workers are stopped.");

        if !self.stop_event.is_set() {
            tracing::debug!(
                "All ping jobs are completed and all workers are stopped. Signal result processor to exit."
            );
            self.stop_event.set();
        }
        self.ping_result_processor_join_handle.take().unwrap().await.unwrap();
    }
}