use dashmap::DashMap;
use std::sync::{
atomic::{AtomicU8, Ordering},
Arc,
};
use tokio::{sync::RwLock, task::JoinHandle};
use tracing::{debug, info};
use crate::{
config::AppConfig,
event::BaseEvent,
utils::{Echo, EchoS},
Action, ActionResp, ActionRespContent, EventContent, RUNNING, SHUTDOWN,
};
mod action;
pub(crate) type ActionRespSender<R> = tokio::sync::oneshot::Sender<ActionResp<R>>;
pub(crate) type ArcEventHandler<E> =
Arc<dyn crate::handle::EventHandler<BaseEvent<E>> + Send + Sync>;
pub(crate) type CustomActionSender<A> = tokio::sync::mpsc::UnboundedSender<Echo<A>>;
pub(crate) type CustomActionReceiver<A> = tokio::sync::mpsc::UnboundedReceiver<Echo<A>>;
pub type OneBot = CustomOneBot<EventContent, Action, ActionRespContent>;
pub struct CustomOneBot<E, A, R> {
self_id: RwLock<String>,
pub config: AppConfig,
pub(crate) event_handler: ArcEventHandler<E>,
action_sender: CustomActionSender<A>,
pub(crate) action_receiver: RwLock<CustomActionReceiver<A>>,
pub(crate) echo_map: DashMap<EchoS, ActionRespSender<R>>,
#[cfg(feature = "websocket")]
ws_join_handles: RwLock<(
Option<JoinHandle<()>>,
Option<crate::comms::WebSocketServer>,
)>,
status: AtomicU8,
}
impl<E, A, R> CustomOneBot<E, A, R>
where
E: Clone + serde::de::DeserializeOwned + Send + 'static + std::fmt::Debug,
A: Clone + serde::Serialize + Send + 'static + std::fmt::Debug,
R: Clone + serde::de::DeserializeOwned + Send + 'static + std::fmt::Debug,
{
pub fn new(config: AppConfig, event_handler: ArcEventHandler<E>) -> Self {
let (action_sender, action_receiver) = tokio::sync::mpsc::unbounded_channel();
Self {
self_id: RwLock::default(),
config,
event_handler,
action_sender,
action_receiver: RwLock::new(action_receiver),
echo_map: DashMap::new(),
#[cfg(feature = "websocket")]
ws_join_handles: RwLock::default(),
status: AtomicU8::default(),
}
}
pub fn arc(self) -> Arc<Self> {
Arc::new(self)
}
pub async fn self_id(&self) -> String {
self.self_id.read().await.clone()
}
pub(crate) async fn set_id(&self, id: &str) {
if &self.self_id().await != id {
*self.self_id.write().await = id.to_owned()
}
}
pub async fn run(ob: Arc<Self>) -> Result<(), &'static str> {
if ob.status.load(std::sync::atomic::Ordering::SeqCst) == RUNNING {
return Err("OneBot is already running");
}
#[cfg(feature = "websocket")]
if let Some(websocket) = &ob.config.websocket {
info!(target: "Walle-core", "Running WebSocket");
ob.ws_join_handles.write().await.0 =
Some(crate::comms::app::websocket_run(websocket, ob.clone()).await);
ob.status.swap(RUNNING, Ordering::SeqCst);
return Ok(());
}
#[cfg(feature = "websocket")]
if let Some(websocket_rev) = &ob.config.websocket_rev {
info!(target: "Walle-core", "Running WebSocket");
ob.ws_join_handles.write().await.1 =
Some(crate::comms::app::websocket_rev_run(websocket_rev, ob.clone()).await);
ob.status.swap(RUNNING, Ordering::SeqCst);
return Ok(());
}
Err("there is no connect config found")
}
pub fn is_shutdown(&self) -> bool {
if self.status.load(std::sync::atomic::Ordering::SeqCst) == SHUTDOWN {
true
} else {
false
}
}
pub fn is_running(&self) -> bool {
if self.status.load(std::sync::atomic::Ordering::SeqCst) == SHUTDOWN {
false
} else {
true
}
}
pub async fn shutdown(&self) {
use std::mem::swap;
#[cfg(feature = "websocket")]
{
let mut joins = self.ws_join_handles.write().await;
if let Some(j) = &joins.0 {
j.abort();
joins.0 = None;
}
if joins.1.is_some() {
let mut j = None;
swap(&mut joins.1, &mut j);
j.unwrap().abort().await;
}
}
self.status
.swap(SHUTDOWN, std::sync::atomic::Ordering::SeqCst);
}
pub async fn call_action(&self, action: A) {
self.call_action_resp(action).await;
}
pub async fn call_action_resp(&self, action: A) -> Option<ActionResp<R>> {
use colored::*;
debug!(target:"Walle-core", "[{}] Sending action:{:?}", self.self_id().await.red(), action);
let (sender, receiver) = tokio::sync::oneshot::channel();
let echo = EchoS::new(&self.self_id().await);
self.echo_map.insert(echo.clone(), sender);
self.action_sender.send(echo.clone().pack(action)).unwrap();
match tokio::time::timeout(tokio::time::Duration::from_secs(15), async {
if let Ok(r) = receiver.await {
Some(r)
} else {
None
}
})
.await
{
Ok(r) => r,
Err(_) => {
self.echo_map.remove(&echo);
None
}
}
}
}