use std::sync::{atomic::AtomicU64, Arc};
use super::OBC;
use crate::util::{Echo, EchoInner, EchoS, ProtocolItem, SelfId, SelfIds};
use crate::{ActionHandler, EventHandler, GetStatus, OneBot};
use crate::{WalleError, WalleResult};
use async_trait::async_trait;
use dashmap::DashMap;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tracing::{info, warn};
#[cfg(feature = "http")]
mod app_http;
#[cfg(feature = "websocket")]
mod app_ws;
pub(crate) type EchoMap<R> = Arc<DashMap<EchoS, oneshot::Sender<R>>>;
pub(crate) type BotMap<A> = Arc<DashMap<String, Vec<mpsc::UnboundedSender<Echo<A>>>>>;
pub struct AppOBC<A, R> {
pub(crate) echos: EchoMap<R>, pub(crate) seq: AtomicU64, pub bots: BotMap<A>, }
impl<A, R> AppOBC<A, R> {
pub fn new() -> Self {
Default::default()
}
}
impl<A, R> Default for AppOBC<A, R> {
fn default() -> Self {
Self {
echos: Arc::new(DashMap::new()),
seq: AtomicU64::default(),
bots: Arc::new(Default::default()),
}
}
}
impl<A, R> AppOBC<A, R> {
pub(crate) fn next_seg(&self) -> EchoS {
EchoS(Some(EchoInner::S(
self.seq
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
.to_string(),
)))
}
}
#[async_trait]
impl<E, A, R> ActionHandler<E, A, R, 12> for AppOBC<A, R>
where
E: ProtocolItem + Clone + SelfId,
A: ProtocolItem + SelfId,
R: ProtocolItem,
{
type Config = crate::config::AppConfig;
async fn start<AH, EH>(
&self,
ob: &Arc<OneBot<AH, EH, 12>>,
config: crate::config::AppConfig,
) -> WalleResult<Vec<JoinHandle<()>>>
where
AH: ActionHandler<E, A, R, 12> + Send + Sync + 'static,
EH: EventHandler<E, A, R, 12> + Send + Sync + 'static,
{
let mut tasks = vec![];
#[cfg(feature = "websocket")]
{
self.wsr(ob, config.websocket_rev, &mut tasks).await?;
self.ws(ob, config.websocket, &mut tasks).await?;
}
#[cfg(feature = "http")]
{
self.webhook(ob, config.http_webhook, &mut tasks).await?;
self.http(ob, config.http, &mut tasks).await?;
}
Ok(tasks)
}
async fn call(&self, action: A) -> WalleResult<R> {
match self.bots.get_bot(&action.self_id()) {
Some(action_txs) => {
let (tx, rx) = oneshot::channel();
let seq = self.next_seg();
self.echos.insert(seq.clone(), tx);
action_txs
.first()
.unwrap() .send(seq.pack(action))
.map_err(|e| {
warn!(target: super::OBC, "send action error: {}", e);
WalleError::Other(e.to_string())
})?;
match tokio::time::timeout(std::time::Duration::from_secs(10), rx).await {
Ok(Ok(res)) => Ok(res),
Ok(Err(e)) => {
warn!(target: super::OBC, "resp recv error: {:?}", e);
Err(WalleError::Other(e.to_string()))
}
Err(_) => {
warn!(target: super::OBC, "resp timeout");
Err(WalleError::Other("resp timeout".to_string()))
}
}
}
None => {
warn!(target: super::OBC, "bot not found");
Err(WalleError::BotNotExist)
}
}
}
}
pub trait BotMapExt<A> {
fn ensure_bot(&self, bot_id: &str, tx: &mpsc::UnboundedSender<Echo<A>>);
fn remove_bot(&self, bot_id: &str, tx: &mpsc::UnboundedSender<Echo<A>>);
fn get_bot(&self, bot_id: &str) -> Option<Vec<mpsc::UnboundedSender<Echo<A>>>>;
}
impl<A> BotMapExt<A> for DashMap<String, Vec<mpsc::UnboundedSender<Echo<A>>>> {
fn ensure_bot(&self, bot_id: &str, tx: &mpsc::UnboundedSender<Echo<A>>) {
let mut refmut = self.entry(bot_id.to_string()).or_default();
for x in refmut.value() {
if tx.same_channel(&x) {
return;
}
}
refmut.push(tx.clone());
info!(target: super::OBC, "New Bot connected: {}", bot_id);
}
fn remove_bot(&self, bot_id: &str, tx: &mpsc::UnboundedSender<Echo<A>>) {
let mut empty = false;
if let Some(mut txs) = self.get_mut(bot_id) {
for i in 0..txs.len() {
if tx.same_channel(&txs[i]) {
txs.remove(i);
break;
}
}
if txs.value().is_empty() {
empty = true;
}
};
if empty {
self.remove(bot_id);
info!(target: super::OBC, "Bot disconnected: {}", bot_id);
}
}
fn get_bot(&self, bot_id: &str) -> Option<Vec<mpsc::UnboundedSender<Echo<A>>>> {
self.get(bot_id).as_deref().cloned()
}
}
#[async_trait]
impl<A, R> SelfIds for AppOBC<A, R>
where
A: Send + 'static,
R: Send + 'static,
{
async fn self_ids(&self) -> Vec<String> {
self.bots.iter().map(|r| r.key().clone()).collect()
}
}
impl<A, R> GetStatus for AppOBC<A, R> {
fn get_status(&self) -> crate::structs::Status {
crate::structs::Status {
good: true,
online: true,
}
}
}