walle_core/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3
4#[doc(hidden)]
5pub const VERSION: &str = env!("CARGO_PKG_VERSION");
6/// "Walle-core"
7pub const WALLE_CORE: &str = "Walle-core";
8
9pub mod action;
10#[cfg(feature = "alt")]
11pub mod alt;
12pub mod config;
13pub mod error;
14pub mod event;
15pub mod resp;
16pub mod segment;
17pub mod structs;
18pub mod util;
19
20mod ah;
21pub use ah::{ActionHandler, GetSelfs, GetStatus, GetVersion};
22mod eh;
23pub use eh::EventHandler;
24use tokio::task::JoinHandle;
25
26#[cfg(any(feature = "impl-obc", feature = "app-obc"))]
27pub mod obc;
28#[cfg(test)]
29mod test;
30
31pub mod prelude {
32    pub use super::*;
33    pub use crate::error::{WalleError, WalleResult};
34    pub use crate::util::{Echo, GetSelf, OneBotBytes, Value, ValueMap, ValueMapExt};
35    pub use crate::{value, value_map, value_vec};
36    pub use async_trait::async_trait;
37    pub use walle_macro::{PushToValueMap, ToAction, ToEvent, ToMsgSegment};
38    pub use walle_macro::{TryFromAction, TryFromEvent, TryFromMsgSegment, TryFromValue};
39
40    pub use crate::action::{Action, BaseAction, ToAction, TryFromAction};
41    pub use crate::event::{BaseEvent, Event, ToEvent, TryFromEvent};
42    pub use crate::resp::{resp_error, Resp};
43    pub use crate::segment::{
44        IntoMessage, MessageExt, MsgSegment, Segments, ToMsgSegment, TryFromMsgSegment,
45    };
46    pub use crate::structs::*;
47}
48
49/// 基础抽象模型,持有 ActionHandler 与 EventHandler
50pub struct OneBot<AH, EH> {
51    action_handler: AH,
52    event_handler: EH,
53    // Some for running, None for stopped
54    signal: StdMutex<Option<tokio::sync::broadcast::Sender<()>>>,
55    ah_tasks: Mutex<Vec<JoinHandle<()>>>,
56    eh_tasks: Mutex<Vec<JoinHandle<()>>>,
57}
58
59use std::future::Future;
60use std::pin::Pin;
61use std::sync::{Arc, Mutex as StdMutex};
62use tokio::sync::Mutex;
63
64pub use crate::error::{WalleError, WalleResult};
65
66impl<AH, EH> OneBot<AH, EH> {
67    pub fn new(action_handler: AH, event_handler: EH) -> Self {
68        Self {
69            action_handler,
70            event_handler,
71            signal: StdMutex::new(None),
72            ah_tasks: Mutex::default(),
73            eh_tasks: Mutex::default(),
74        }
75    }
76    pub async fn start<E, A, R>(
77        self: &Arc<Self>,
78        ah_config: AH::Config,
79        eh_config: EH::Config,
80        ah_first: bool,
81    ) -> WalleResult<()>
82    where
83        E: Send + Sync + 'static,
84        A: Send + Sync + 'static,
85        R: Send + Sync + 'static,
86        AH: ActionHandler<E, A, R> + Send + Sync + 'static,
87        EH: EventHandler<E, A, R> + Send + Sync + 'static,
88    {
89        if !self.set_signal() {
90            return Err(WalleError::AlreadyStarted);
91        }
92        if ah_first {
93            *self.ah_tasks.lock().await = self.action_handler.start(self, ah_config).await?;
94            *self.eh_tasks.lock().await = self.event_handler.start(self, eh_config).await?;
95        } else {
96            *self.eh_tasks.lock().await = self.event_handler.start(self, eh_config).await?;
97            *self.ah_tasks.lock().await = self.action_handler.start(self, ah_config).await?;
98        }
99        Ok(())
100    }
101    pub async fn wait_all(&self) {
102        let mut tasks: Vec<JoinHandle<()>> = std::mem::take(self.ah_tasks.lock().await.as_mut());
103        tasks.extend(
104            std::mem::take::<Vec<JoinHandle<()>>>(self.eh_tasks.lock().await.as_mut()).into_iter(),
105        );
106        for task in tasks {
107            task.await.ok();
108        }
109    }
110    pub fn set_signal(&self) -> bool {
111        let mut signal = self.signal.lock().unwrap();
112        if signal.is_none() {
113            let (tx, _) = tokio::sync::broadcast::channel(1);
114            *signal = Some(tx);
115            true
116        } else {
117            false
118        }
119    }
120    pub fn is_started(&self) -> bool {
121        self.signal.lock().unwrap().is_some()
122    }
123    pub fn get_signal_rx(&self) -> WalleResult<tokio::sync::broadcast::Receiver<()>> {
124        Ok(self
125            .signal
126            .lock()
127            .unwrap()
128            .as_ref()
129            .ok_or(WalleError::NotStarted)?
130            .subscribe())
131    }
132    pub async fn shutdown<E, A, R>(&self, ah_first: bool) -> WalleResult<()>
133    where
134        E: Send + Sync + 'static,
135        A: Send + Sync + 'static,
136        R: Send + Sync + 'static,
137        AH: ActionHandler<E, A, R> + Send + Sync + 'static,
138        EH: EventHandler<E, A, R> + Send + Sync + 'static,
139    {
140        let tx = self
141            .signal
142            .lock()
143            .unwrap()
144            .take()
145            .ok_or(WalleError::NotStarted)?;
146        tx.send(()).ok();
147        if ah_first {
148            self.action_handler.shutdown().await;
149            self.event_handler.shutdown().await;
150        } else {
151            self.event_handler.shutdown().await;
152            self.action_handler.shutdown().await;
153        }
154        Ok(self.wait_all().await)
155    }
156    pub async fn handle_event<E, A, R>(self: &Arc<Self>, event: E) -> WalleResult<()>
157    where
158        AH: ActionHandler<E, A, R> + Send + Sync + 'static,
159        EH: EventHandler<E, A, R> + Send + Sync + 'static,
160        E: Send + 'static,
161    {
162        self.event_handler
163            .call(
164                self.action_handler.before_call_event(event, self).await?,
165                self,
166            )
167            .await?;
168        self.action_handler.after_call_event(self).await
169    }
170    pub async fn handle_action<E, A, R>(self: &Arc<Self>, action: A) -> WalleResult<R>
171    where
172        AH: ActionHandler<E, A, R> + Send + Sync + 'static,
173        EH: EventHandler<E, A, R> + Send + Sync + 'static,
174        A: Send + 'static,
175        R: Send + 'static,
176    {
177        self.event_handler
178            .after_call_action(
179                self.action_handler
180                    .call(
181                        self.event_handler.before_call_action(action, self).await?,
182                        self,
183                    )
184                    .await?,
185                self,
186            )
187            .await
188    }
189}
190
191impl<AH, EH> GetStatus for OneBot<AH, EH>
192where
193    AH: GetStatus + Sync,
194{
195    fn get_status<'a, 't>(&'a self) -> Pin<Box<dyn Future<Output = structs::Status> + Send + 't>>
196    where
197        Self: Sized,
198        'a: 't,
199        Self: Sync + 't,
200    {
201        self.action_handler.get_status()
202    }
203    fn is_good<'a, 't>(&'a self) -> Pin<Box<dyn Future<Output = bool> + Send + 't>>
204    where
205        'a: 't,
206        Self: 't,
207    {
208        self.action_handler.is_good()
209    }
210}
211
212impl<AH, EH> GetSelfs for OneBot<AH, EH>
213where
214    AH: GetSelfs,
215{
216    fn get_impl<'a, 'b, 't>(
217        &'a self,
218        selft: &'b structs::Selft,
219    ) -> Pin<Box<dyn Future<Output = String> + Send + 't>>
220    where
221        'a: 't,
222        'b: 't,
223        Self: 't,
224    {
225        self.action_handler.get_impl(selft)
226    }
227    fn get_selfs<'a, 't>(&'a self) -> Pin<Box<dyn Future<Output = Vec<structs::Selft>> + Send + 't>>
228    where
229        'a: 't,
230        Self: 't,
231    {
232        self.action_handler.get_selfs()
233    }
234}