use std::{
collections::HashMap,
fmt::Debug,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::sync::RwLock;
use tracing::info;
use crate::{
config::AppConfig, ProtocolItem, Resps, SelfId, StandardAction, StandardEvent, WalleError,
WalleResult,
};
mod bot;
pub(crate) type BoxEventHandler<E, A, R> =
Box<dyn crate::handle::EventHandler<E, A, R> + Send + Sync>;
pub(crate) type CustomRespSender<R> = tokio::sync::oneshot::Sender<R>;
pub(crate) type CustomActionSender<A, R> =
tokio::sync::mpsc::UnboundedSender<(A, CustomRespSender<R>)>;
pub type StandardOneBot = OneBot<StandardEvent, StandardAction, Resps, 12>;
pub struct OneBot<E, A, R, const V: u8> {
pub config: AppConfig,
pub bots: RwLock<HashMap<String, ArcBot<A, R>>>,
#[cfg(feature = "websocket")]
#[cfg_attr(docsrs, doc(cfg(feature = "websocket")))]
pub(crate) ws_hooks: crate::hooks::BoxWsHooks<Self>,
pub(crate) event_handler: BoxEventHandler<E, A, R>,
running: AtomicBool,
}
pub type ArcBot<A, R> = Arc<Bot<A, R>>;
pub type StandardArcBot = ArcBot<StandardAction, Resps>;
pub struct Bot<A, R> {
#[allow(dead_code)]
pub self_id: String,
sender: CustomActionSender<A, R>,
}
impl<E, A, R, const V: u8> OneBot<E, A, R, V>
where
E: Sync + Send + 'static,
A: Sync + Send + 'static,
R: Sync + Send + 'static,
{
pub fn new(config: AppConfig, event_handler: BoxEventHandler<E, A, R>) -> Self {
Self {
config,
event_handler,
running: AtomicBool::default(),
bots: RwLock::default(),
#[cfg(feature = "websocket")]
#[cfg_attr(docsrs, doc(cfg(feature = "websocket")))]
ws_hooks: crate::hooks::empty_ws_hooks(),
}
}
}
impl<E, A, R, const V: u8> OneBot<E, A, R, V> {
pub fn arc(self) -> Arc<Self> {
Arc::new(self)
}
pub async fn get_bot(&self, bot_id: &str) -> Option<ArcBot<A, R>> {
self.bots.read().await.get(bot_id).cloned()
}
pub async fn get_bots(&self) -> HashMap<String, ArcBot<A, R>> {
self.bots.read().await.clone()
}
pub fn is_running(&self) -> bool {
self.running.load(Ordering::SeqCst)
}
pub fn is_shutdown(&self) -> bool {
!self.is_running()
}
pub async fn shutdown(&self) {
self.running.swap(false, Ordering::SeqCst);
}
pub(crate) fn set_running(&self) {
self.running.swap(true, Ordering::SeqCst);
}
}
impl<E, A, R, const V: u8> OneBot<E, A, R, V>
where
E: ProtocolItem + SelfId + Clone + Send + 'static + Debug,
A: ProtocolItem + Clone + Send + 'static + Debug,
R: ProtocolItem + Clone + Send + 'static + Debug,
{
pub(crate) async fn insert_bot(
&self,
bot_id: &str,
sender: &CustomActionSender<A, R>,
) -> ArcBot<A, R> {
let bot = Arc::new(Bot::new(bot_id.to_string(), sender.clone()));
self.bots
.write()
.await
.insert(bot_id.to_string(), bot.clone());
bot
}
pub(crate) async fn remove_bot(&self, bot_id: &str) -> Option<ArcBot<A, R>> {
self.bots.write().await.remove(bot_id)
}
pub async fn run(self: &Arc<Self>) -> WalleResult<()> {
if self.is_running() {
return Err(WalleError::AlreadyRunning);
}
info!(target: "Walle-core", "OneBot is starting...");
#[cfg(feature = "http")]
self.http().await;
#[cfg(feature = "websocket")]
self.ws().await;
#[cfg(feature = "websocket")]
self.wsr().await?;
self.running.store(true, Ordering::Relaxed);
Ok(())
}
pub async fn run_block(self: &Arc<Self>) -> WalleResult<()> {
if self.is_running() {
return Err(WalleError::AlreadyRunning);
}
info!(target: "Walle-core", "OneBot is starting...");
#[cfg(feature = "websocket")]
{
let mut joins = self.ws().await;
for join in self.wsr().await? {
joins.push(join);
}
if !joins.is_empty() {
self.running.store(true, Ordering::Relaxed);
}
for join in joins {
let _ = join.await;
}
}
Ok(())
}
}