Skip to main content

kovi_onebot/
driver.rs

1use std::sync::Arc;
2
3use crate::driver::config::{OneBotDriverConfig, Server};
4use crate::driver::connect::api_cnt::{OneBotApiOneshotSender, OneBotSendApi};
5use crate::event::MsgEvent;
6use kovi::bot::SendApi;
7use kovi::driver::{Driver, DriverEvent, MessageEventRegister};
8use kovi::futures_util;
9use log::{error, info};
10use tokio::sync::{Mutex, OnceCell, mpsc};
11
12pub mod config;
13pub(crate) mod connect;
14
/// Maps an `echo` string to its oneshot sender; used to route responses
/// returned over the WS back to the caller.
pub(crate) type OneshotTxMap =
    Arc<parking_lot::Mutex<ahash::HashMap<String, OneBotApiOneshotSender>>>;
18
19/// Drop 时自动 abort 的任务句柄
20pub(crate) struct AbortOnDrop(pub(crate) tokio::task::JoinHandle<()>);
21impl Drop for AbortOnDrop {
22    fn drop(&mut self) {
23        self.0.abort();
24    }
25}
26
/// Context held after the one-time initialization: the write-side sender plus
/// the background task handles.
pub(crate) struct ApiContext {
    /// Sender carrying each API request paired with an optional oneshot sender.
    pub(crate) api_tx: mpsc::Sender<(OneBotSendApi, Option<OneBotApiOneshotSender>)>,
    /// The field name starts with `_`: it is only kept so the tasks are
    /// aborted automatically on drop.
    _tasks: Vec<AbortOnDrop>,
}
33
/// Shared, mutex-guarded slot holding an optional sender of driver events
/// (each item is either a `DriverEvent` or a `kovi::driver::AnyError`).
pub type EventTx = Arc<Mutex<Option<mpsc::Sender<Result<DriverEvent, kovi::driver::AnyError>>>>>;
35
/// OneBot driver state: the server configuration, the lazily initialized API
/// context, and the shared event-sender slot.
pub struct OneBotDriver {
    pub(crate) server: Arc<Server>,
    /// Async `OnceCell`: guarantees that initialization happens only once
    /// under concurrency.
    ctx: Arc<OnceCell<ApiContext>>,
    pub(crate) event_tx: EventTx,
}
42
43impl OneBotDriver {
44    pub fn new(config: OneBotDriverConfig) -> Self {
45        let config = OneBotDriverConfig::normalize_path(config);
46
47        Self {
48            server: Arc::new(config.server),
49            ctx: Arc::new(OnceCell::new()),
50            event_tx: Arc::new(Mutex::new(None)),
51        }
52    }
53}
54
55#[async_trait::async_trait]
56impl Driver for OneBotDriver {
57    async fn event_channel(
58        &self,
59    ) -> Result<
60        std::pin::Pin<
61            Box<
62                dyn futures_util::Stream<Item = Result<DriverEvent, kovi::driver::AnyError>> + Send,
63            >,
64        >,
65        kovi::driver::AnyError,
66    > {
67        let (event_tx, event_rx) = mpsc::channel(64);
68        {
69            let mut guard = self.event_tx.lock().await;
70            *guard = Some(event_tx);
71        }
72
73        match self.handler_lifecycle_log_bot_enable().await {
74            Ok(_) => {}
75            Err(_) => {
76                log::error!("Failed to initialize onebot connection");
77                return Err("Failed to initialize onebot connection".into());
78            }
79        };
80
81        OneBotDriver::ws_event_connect((*self.server).clone(), event_rx).await
82    }
83
84    fn api_handler(
85        &self,
86        value: kovi::bot::SendApi,
87    ) -> std::pin::Pin<
88        Box<
89            dyn std::future::Future<
90                    Output = Result<
91                        Result<kovi::ApiReturn, kovi::ApiReturn>,
92                        kovi::driver::AnyError,
93                    >,
94                > + Send,
95        >,
96    > {
97        if self.ctx.initialized() {
98            let ctx = Arc::clone(&self.ctx);
99            Box::pin(async move {
100                // 初始化后只用 api_tx(Sender clone 极廉价),server 不再传入热路径
101                let api_tx = ctx.get().expect("unreachable").api_tx.clone();
102
103                OneBotDriver::send_api_inner(api_tx, value).await
104            })
105        } else {
106            let server = Arc::clone(&self.server);
107            let event_tx = Arc::clone(&self.event_tx);
108            let self_ctx = Arc::clone(&self.ctx);
109            Box::pin(async move {
110                let api_tx = self_ctx
111                    .get_or_try_init(|| OneBotDriver::init_api_context(server, event_tx))
112                    .await?
113                    .api_tx
114                    .clone();
115                OneBotDriver::send_api_inner(api_tx, value).await
116            })
117        }
118    }
119
120    fn message_event_register(&self) -> MessageEventRegister {
121        MessageEventRegister::register::<MsgEvent>()
122    }
123}
124
125impl std::fmt::Display for OneBotSendApi {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        write!(f, "{}", serde_json::to_string(self).expect("unreachable"))
128    }
129}
130
131impl OneBotDriver {
132    pub(crate) async fn handler_lifecycle_log_bot_enable(&self) -> Result<(), ()> {
133        let api_msg = SendApi::new("get_login_info", serde_json::json!({}));
134
135        let res = match self.api_handler(api_msg).await {
136            Ok(v) => v,
137            Err(err) => {
138                let server_url = self.server.ws_url("api");
139                error!("failed to initialize api_handler (server url: {server_url}): {err}");
140                return Err(());
141            }
142        };
143
144        let self_info_value = match res {
145            Ok(v) => v,
146            Err(e) => {
147                error!("Lifecycle Error, get bot info failed: {e}");
148                return Err(());
149            }
150        };
151
152        let self_id = match self_info_value.data.get("user_id") {
153            Some(user_id) => match user_id.as_i64() {
154                Some(id) => id,
155                None => {
156                    error!("Expected 'user_id' to be an integer");
157                    return Err(());
158                }
159            },
160            None => {
161                error!("Missing 'user_id' in self_info_value data");
162                return Err(());
163            }
164        };
165        let self_name = match self_info_value.data.get("nickname") {
166            Some(nickname) => nickname.to_string(),
167            None => {
168                error!("Missing 'nickname' in self_info_value data");
169                return Err(());
170            }
171        };
172        info!("Bot connection successful,Nickname:{self_name},ID:{self_id}");
173
174        Ok(())
175    }
176}