watchmend/
engine.rs

1use crate::common::config::Config;
2use tokio::sync::mpsc;
3use tracing::info;
4
5use crate::global;
6
7#[cfg(feature = "sock")]
8pub mod sock;
9
10#[cfg(feature = "socket")]
11pub mod socket;
12
13#[cfg(feature = "http")]
14pub mod http;
15
16pub async fn start(config: Config, load: bool) {
17    if load {
18        if let Some(path) = config.watchmen.cache.clone() {
19            global::set_cache(path.clone()).await;
20            match global::load(&path).await {
21                Ok(_) => {
22                    info!("Cache tasks loaded.");
23                    println!("Cache tasks loaded.");
24                }
25                Err(e) => {
26                    info!("Cache tasks load failed: {}", e);
27                    println!("Cache tasks load failed: {}", e);
28                }
29            }
30        } else {
31            info!("Cache tasks load failed: cache file not set in config.");
32            println!("Cache tasks load failed: cache file not set in config.");
33        }
34    }
35
36    let (tx, mut rx) = mpsc::channel::<i32>(12);
37
38    let tx_ctrl_c = tx.clone(); // 监听到 ctrl c 通信管道
39    let tx_ctrl_d = tx.clone(); // 监听到 ctrl d 通信管道
40
41    // ctrl c 停止运行 / terminate on ctrl-c
42    let s_ctrl_c = tokio::spawn(async move {
43        tokio::signal::ctrl_c().await.unwrap();
44        tx_ctrl_c.send(9).await.unwrap();
45    });
46
47    // ctrl d 停止运行 / terminate on ctrl-d
48    let s_ctrl_d = tokio::spawn(async move {
49        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
50            .unwrap()
51            .recv()
52            .await;
53        tx_ctrl_d.send(15).await.unwrap();
54    });
55
56    #[cfg(feature = "sock")]
57    // sock in config.watchmen.engines ?
58    let joinhandle_sock = if config.watchmen.engines.contains(&"sock".to_string()) {
59        info!("Starting sock...");
60        println!("Starting sock...");
61        Some(sock::start(config.clone()).await)
62    } else {
63        None
64    };
65
66    #[cfg(feature = "socket")]
67    let joinhandle_socket = if config.watchmen.engines.contains(&"socket".to_string()) {
68        info!("Starting socket...");
69        println!("Starting socket...");
70        Some(socket::start(config.clone()).await)
71    } else {
72        None
73    };
74
75    #[cfg(feature = "http")]
76    let joinhandle_http = if config.watchmen.engines.contains(&"http".to_string()) {
77        info!("Starting http...");
78        println!("Starting http...");
79        Some(http::start(config.clone()).await)
80    } else {
81        None
82    };
83
84    info!("All engines started.");
85    println!("All engines started.");
86
87    // ================== Wait for all tasks to complete ==================
88
89    let _res = rx.recv().await;
90
91    s_ctrl_c.abort();
92    s_ctrl_d.abort();
93
94    println!("Shutting down...");
95
96    #[cfg(feature = "sock")]
97    if config.watchmen.engines.contains(&"sock".to_string()) && joinhandle_sock.is_some() {
98        joinhandle_sock.unwrap().abort();
99    }
100
101    #[cfg(feature = "socket")]
102    if config.watchmen.engines.contains(&"socket".to_string()) && joinhandle_socket.is_some() {
103        joinhandle_socket.unwrap().abort();
104    }
105
106    #[cfg(feature = "http")]
107    if config.watchmen.engines.contains(&"http".to_string()) && joinhandle_http.is_some() {
108        joinhandle_http.unwrap().abort();
109    }
110}