use std::{
collections::HashMap,
fmt::Debug,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::info;
use crate::{
config::AppConfig, handle::EventHandler, ProtocolItem, Resps, SelfId, StandardAction,
StandardEvent, WalleError, WalleResult,
};
// Private submodule. `Bot::new`, which `insert_bot` below calls, is not defined in the
// visible part of this file — presumably it lives in this module (TODO confirm).
mod bot;
/// Sending half of a tokio one-shot channel that carries a single value of type `R`.
pub(crate) type CustomRespSender<R> = tokio::sync::oneshot::Sender<R>;
/// Sending half of a tokio unbounded mpsc channel whose messages are
/// `(A, CustomRespSender<R>)` pairs.
///
/// NOTE(review): the paired one-shot sender is presumably how the reply to the
/// accompanying `A` is delivered; the receiving side is not visible here — confirm.
pub(crate) type CustomActionSender<A, R> =
tokio::sync::mpsc::UnboundedSender<(A, CustomRespSender<R>)>;
/// `OneBot` instantiated with `StandardEvent`, `StandardAction` and `Resps`, and with the
/// const parameter `V` fixed to `12`; only the handler type `H` is left generic.
///
/// NOTE(review): `12` presumably denotes the protocol version — confirm.
pub type StandardOneBot<H> = OneBot<StandardEvent, StandardAction, Resps, H, 12>;
/// Central application object holding the configuration, the connected bots,
/// the event handler and a running flag.
///
/// Type parameters, as used by `StandardOneBot` and the `EventHandler<E, A, R>` bound in
/// this file: `E` is the event type, `A` the action type, `R` the response type and `H`
/// the handler type. `V` is a `u8` const parameter (`12` in `StandardOneBot`).
///
/// NOTE(review): among the fields declared here, `E` is referenced only through `Self`
/// in the `websocket`-gated `ws_hooks` field.
pub struct OneBot<E, A, R, H, const V: u8> {
/// Configuration passed to `new`.
pub config: AppConfig,
/// Bots currently known to this instance; `insert_bot` keys each entry by its bot id.
/// Guarded by an async `RwLock`.
pub bots: RwLock<HashMap<String, ArcBot<A, R>>>,
/// WebSocket hooks; only present with the `websocket` feature. `new` initialises this
/// with `crate::hooks::empty_ws_hooks()`.
#[cfg(feature = "websocket")]
#[cfg_attr(docsrs, doc(cfg(feature = "websocket")))]
pub(crate) ws_hooks: crate::hooks::BoxWsHooks<Self>,
/// Handler value passed to `new`.
pub(crate) event_handler: H,
/// Running flag: read by `is_running`, set by `set_running`, cleared by `shutdown`.
running: AtomicBool,
}
/// Reference-counted handle to a `Bot`; this is the value type stored in `OneBot::bots`.
pub type ArcBot<A, R> = Arc<Bot<A, R>>;
/// `ArcBot` instantiated with `StandardAction` and `Resps`.
pub type StandardArcBot = ArcBot<StandardAction, Resps>;
/// A single bot: an id plus the channel sender used to submit actions for it.
pub struct Bot<A, R> {
/// Identifier of this bot.
/// NOTE(review): presumably the same id `insert_bot` uses as the map key — the
/// constructor is not visible here, confirm.
#[allow(dead_code)]
pub self_id: String,
/// Sender of `(A, CustomRespSender<R>)` pairs (see `CustomActionSender`).
sender: CustomActionSender<A, R>,
}
impl<E, A, R, H, const V: u8> OneBot<E, A, R, H, V>
where
E: Sync + Send + 'static,
A: Sync + Send + 'static,
R: Sync + Send + 'static,
H: Sync + Send + 'static,
{
pub fn new(config: AppConfig, event_handler: H) -> Self {
Self {
config,
event_handler,
running: AtomicBool::default(),
bots: RwLock::default(),
#[cfg(feature = "websocket")]
#[cfg_attr(docsrs, doc(cfg(feature = "websocket")))]
ws_hooks: crate::hooks::empty_ws_hooks(),
}
}
}
impl<E, A, R, H, const V: u8> OneBot<E, A, R, H, V> {
    /// Moves this instance into an [`Arc`] so it can be shared.
    pub fn arc(self) -> Arc<Self> {
        Arc::new(self)
    }

    /// Looks up a bot by id.
    ///
    /// Takes the read lock on `bots` and returns a clone of the stored `Arc`,
    /// or `None` when no entry exists for `bot_id`.
    pub async fn get_bot(&self, bot_id: &str) -> Option<ArcBot<A, R>> {
        self.bots.read().await.get(bot_id).cloned()
    }

    /// Returns a snapshot of the whole bot map.
    ///
    /// The map is cloned under the read lock, so later insertions or removals
    /// are not reflected in the returned value.
    pub async fn get_bots(&self) -> HashMap<String, ArcBot<A, R>> {
        self.bots.read().await.clone()
    }

    /// Returns `true` while the running flag is set.
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }

    /// Returns `true` when the running flag is not set; the exact negation of
    /// [`is_running`](Self::is_running).
    pub fn is_shutdown(&self) -> bool {
        !self.is_running()
    }

    /// Clears the running flag.
    ///
    /// This method only writes the flag; it does not itself stop or await any task.
    /// It stays `async` for interface compatibility although nothing is awaited.
    pub async fn shutdown(&self) {
        // The previous value is not needed, so a plain store replaces the former swap.
        self.running.store(false, Ordering::SeqCst);
    }

    /// Sets the running flag.
    pub(crate) fn set_running(&self) {
        // The previous value is not needed, so a plain store replaces the former swap.
        self.running.store(true, Ordering::SeqCst);
    }
}
impl<E, A, R, H, const V: u8> OneBot<E, A, R, H, V>
where
    E: ProtocolItem + SelfId + Clone + Send + 'static + Debug,
    A: ProtocolItem + Clone + Send + 'static + Debug,
    R: ProtocolItem + Clone + Send + 'static + Debug,
    H: EventHandler<E, A, R> + Send + Sync + 'static,
{
    /// Registers a bot under `bot_id` and returns the shared handle to it.
    ///
    /// A new `Bot` is built from the id and a clone of `sender`, then stored in
    /// `bots` under the write lock. An existing entry with the same id is replaced.
    pub(crate) async fn insert_bot(
        &self,
        bot_id: &str,
        sender: &CustomActionSender<A, R>,
    ) -> ArcBot<A, R> {
        let key = bot_id.to_string();
        let bot = Arc::new(Bot::new(key.clone(), sender.clone()));
        {
            // Hold the write lock only for the insertion itself.
            let mut registry = self.bots.write().await;
            registry.insert(key, Arc::clone(&bot));
        }
        bot
    }

    /// Removes the bot registered under `bot_id`, returning it if it was present.
    pub(crate) async fn remove_bot(&self, bot_id: &str) -> Option<ArcBot<A, R>> {
        let mut registry = self.bots.write().await;
        registry.remove(bot_id)
    }

    /// Starts the feature-gated services and returns the join handles they registered.
    ///
    /// Depending on enabled features this calls `http`, `ws` and `wsr` in that order,
    /// each receiving the same handle vector.
    ///
    /// # Errors
    ///
    /// Returns `WalleError::AlreadyRunning` when the running flag is already set, and
    /// propagates any error returned by `wsr`.
    ///
    /// NOTE(review): this function does not set the running flag itself; presumably
    /// one of the startup helpers calls `set_running` — confirm.
    pub async fn run(self: &Arc<Self>) -> WalleResult<Vec<JoinHandle<()>>> {
        let already_running = self.is_running();
        if already_running {
            return Err(WalleError::AlreadyRunning);
        }

        info!(target: "Walle-core", "OneBot is starting...");

        let mut handles: Vec<JoinHandle<()>> = Vec::new();
        #[cfg(feature = "http")]
        self.http(&mut handles).await;
        #[cfg(feature = "websocket")]
        self.ws(&mut handles).await;
        #[cfg(feature = "websocket")]
        self.wsr(&mut handles).await?;

        Ok(handles)
    }

    /// Calls [`run`](Self::run) and then awaits every returned task in order.
    ///
    /// # Errors
    ///
    /// Returns whatever error `run` returns. The join result of each individual task
    /// is deliberately ignored.
    pub async fn run_block(self: &Arc<Self>) -> WalleResult<()> {
        let handles = self.run().await?;
        for task in handles {
            // A failed join is intentionally not treated as an error here.
            let _ = task.await;
        }
        Ok(())
    }
}