mod connect;
use super::Bot;
use crate::PluginBuilder;
use crate::bot::handler::{ExitEvent, InternalInternalEvent};
use crate::types::ApiAndOptOneshot;
use log::error;
use parking_lot::RwLock;
use std::borrow::Borrow;
use std::future::Future;
use std::process::exit;
use std::sync::{Arc, LazyLock};
use tokio::sync::{mpsc, watch};
use tokio::task::JoinHandle;
impl Bot {
pub fn spawn<F>(&mut self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let join = tokio::spawn(future);
self.run_abort.push(join.abort_handle());
join
}
pub async fn run(self) {
let bot = Arc::new(RwLock::new(self));
Self::hander_event(bot).await;
}
async fn hander_event(bot: Arc<RwLock<Bot>>) {
let (self_event_tx, mut self_event_rx): (
mpsc::Sender<InternalInternalEvent>,
mpsc::Receiver<InternalInternalEvent>,
) = mpsc::channel(32);
let (self_api_tx, self_api_rx): (
mpsc::Sender<ApiAndOptOneshot>,
mpsc::Receiver<ApiAndOptOneshot>,
) = mpsc::channel(32);
{
let mut bot_write = bot.write();
let drive = bot_write.drive.clone();
bot_write.spawn(exit_signal_check(self_event_tx.clone()));
bot_write.spawn(connect::event_connect(self_event_tx.clone(), drive.clone()));
bot_write.spawn(connect::send_connect(
self_api_rx,
self_event_tx,
drive.clone(),
));
bot_write.spawn({
let bot = bot.clone();
let self_api_tx = self_api_tx.clone();
async move { Self::run_mains(bot, self_api_tx) }
});
}
let mut drop_task = None;
while let Some(event) = self_event_rx.recv().await {
let self_api_tx = self_api_tx.clone();
let bot = bot.clone();
if let InternalInternalEvent::Exit(exit_event) = &event {
drop_task = Some(tokio::spawn(Self::handler_event(
bot,
event.clone(),
self_api_tx,
)));
match exit_event {
ExitEvent::FromDrive => {
break;
}
ExitEvent::FromSignal => handler_second_time_exit_signal(),
}
} else {
tokio::spawn(Self::handler_event(bot, event, self_api_tx));
}
}
if let Some(drop_task) = drop_task {
match drop_task.await {
Ok(_) => {}
Err(e) => {
error!("{e}")
}
};
}
}
fn run_mains(bot: Arc<RwLock<Self>>, api_tx: mpsc::Sender<ApiAndOptOneshot>) {
let bot_ = bot.read();
let main_job_map = bot_.plugins.borrow();
for (name, plugin) in main_job_map.iter() {
if !plugin.enable_on_startup {
continue;
}
let plugin_builder = PluginBuilder::new(name.clone(), bot.clone(), api_tx.clone());
plugin.run(plugin_builder);
}
}
}
pub(crate) static DROP_CHECK: LazyLock<ExitCheck> = LazyLock::new(ExitCheck::init);
pub struct ExitCheck {
watch_rx: watch::Receiver<bool>,
join_handle: tokio::task::JoinHandle<()>,
}
impl Drop for ExitCheck {
fn drop(&mut self) {
self.join_handle.abort();
}
}
impl ExitCheck {
fn init() -> ExitCheck {
let (tx, watch_rx) = watch::channel(false);
let join_handle = tokio::spawn(async move {
Self::await_exit_signal().await;
let _ = tx.send(true);
Self::await_exit_signal().await;
handler_second_time_exit_signal();
});
ExitCheck {
watch_rx,
join_handle,
}
}
async fn await_exit_signal() {
#[cfg(unix)]
use tokio::signal::unix::{SignalKind, signal};
#[cfg(windows)]
use tokio::signal::windows;
#[cfg(windows)]
{
let mut sig_ctrl_break = windows::ctrl_break().expect("unreachable");
let mut sig_ctrl_c = windows::ctrl_c().expect("unreachable");
let mut sig_ctrl_close = windows::ctrl_close().expect("unreachable");
let mut sig_ctrl_logoff = windows::ctrl_logoff().expect("unreachable");
let mut sig_ctrl_shutdown = windows::ctrl_shutdown().expect("unreachable");
tokio::select! {
_ = sig_ctrl_break.recv() => {}
_ = sig_ctrl_c.recv() => {}
_ = sig_ctrl_close.recv() => {}
_ = sig_ctrl_logoff.recv() => {}
_ = sig_ctrl_shutdown.recv() => {}
}
}
#[cfg(unix)]
{
let mut sig_hangup = signal(SignalKind::hangup()).expect("unreachable");
let mut sig_alarm = signal(SignalKind::alarm()).expect("unreachable");
let mut sig_interrupt = signal(SignalKind::interrupt()).expect("unreachable");
let mut sig_quit = signal(SignalKind::quit()).expect("unreachable");
let mut sig_terminate = signal(SignalKind::terminate()).expect("unreachable");
tokio::select! {
_ = sig_hangup.recv() => {}
_ = sig_alarm.recv() => {}
_ = sig_interrupt.recv() => {}
_ = sig_quit.recv() => {}
_ = sig_terminate.recv() => {}
}
}
}
pub async fn await_exit_signal_change(&self) {
let mut rx = self.watch_rx.clone();
rx.changed().await.expect("The exit signal wait failed");
}
}
pub(crate) async fn exit_signal_check(tx: mpsc::Sender<InternalInternalEvent>) {
DROP_CHECK.await_exit_signal_change().await;
tx.send(InternalInternalEvent::Exit(ExitEvent::FromSignal))
.await
.expect("The exit signal send failed");
}
fn handler_second_time_exit_signal() {
exit(1)
}