Skip to main content

flow_bot/
lib.rs

1#![feature(try_trait_v2)]
2#![feature(adt_const_params)]
3#![feature(unsized_const_params)]
4
5//! An onebot-11 SDK that simplifies bot creation.
6//!
7//! Flow-bot is carefully crafted to provide a mechanism similar to that of axum so if you are familiar with axum, you will find it easy to use.
8//!
9//! The basic unit of event processing in flow-bot is a handler. A handler is a function that optionally takes [`BotContext`] and a [`BotEvent`] or any of the extractors as arguments and returns a [`HandlerControl`].
10//! Handlers can parse the incoming event and respond to it. The returned value serves as a control flow signal to determine the flow of the event processing which is where the name comes from.
11//!
12//! [`BotContext`]: crate::base::context::BotContext
13//! [`BotEvent`]: crate::event::BotEvent
14//!
15//! # Example
16//! ```no_run
17//! use flow_bot::{
18//!     FlowBotBuilder,
19//!     base::{connect::ReverseConnectionConfig, extract::Message, handler::HandlerControl},
20//! };
21//!
22//! async fn on_message(msg: Message) -> HandlerControl {
23//!     println!("{:?}", msg.message);
24//!     HandlerControl::Continue
25//! }
26//!
27//! async fn main() {
28//!     let bot = FlowBotBuilder::new(ReverseConnectionConfig {
29//!         target: "ws://localhost:19999".to_string(),
30//!         auth: None,
31//!     })
32//!     .with_state(())
33//!     .with_handler(on_message)
34//!     .build();
35//!
36//!     bot.run().await.unwrap();
37//! }
38//! ```
39//!
40//! # Handlers
41//!
42//! Handlers are functions that can be registered to process events. They can be registered using the [`with_handler`] method.
43//! Commonly, a handler responds to a event by calling methods in [`ApiExt`] which is implemented by [`BotContext`] to control the bot.
44//!
45//! [`with_handler`]: crate::FlowBotBuilder::with_handler
46//! [`ApiExt`]: crate::api::api_ext::ApiExt
47//! [`BotContext`]: crate::base::context::BotContext
48//!
49//! The returned value of a handler is a [`HandlerControl`] which determines the flow of the event processing.
50//! [`HandlerControl::Continue`] means the event will be passed to the next handler, [`HandlerControl::Block`] means the event will not be passed to the next handler.
51//! [`HandlerControl::Skip`] means the event will be passed to the next handler but the event will not be processed by the current handler, used in the case where the event criteria is not met within the handler.
52//! It is a crucial difference from many other bot SDKs that we do not provide a matcher machenism to match the event, so that you need to implement the logic in the handler. However, a similar way is mimiced by the extractor mechanism. See the [Extractors] section below.
53//!
54//! [`HandlerControl`]: crate::base::handler::HandlerControl
55//! [`HandlerControl::Continue`]: crate::base::handler::HandlerControl::Continue
56//! [`HandlerControl::Block`]: crate::base::handler::HandlerControl::Block
57//! [`HandlerControl::Skip`]: crate::base::handler::HandlerControl::Skip
58//! [Extractors]: #extractors
59//!
60//! # Extractors
61//! Extractors work similarly to the extractors in axum. They are functions that can be registered to extract data from the event. They are to extract data from the context and event for the handler to use.
62//! To see a full list of predefined extractors, see the [`extract`] module.
63//!
64//! [`extract`]: crate::base::extract
65//!
66//! ## Using Extractors
67//!
68//! It is already shown in the example above how to use the predefined [`Message`] extractor which extracts the message from the event. It is also possible to use extractors to match event criteria.
69//!
70//! [`Message`]: crate::base::extract::Message
71//!
72//! ```no_run
73//! use flow_bot::{
74//!    base::extract::MatchGroupId,handler::HandlerControl
75//! };
76//!
77//! async fn on_group_msg(_: MatchGroupId<123>) -> HandlerControl {
78//!    // This handler will only be called when the event is a group message in group 123, otherwise it will be skipped.
79//!    println!("Received message in group 123");
80//!    HandlerControl::Continue
81//! }
82//! ```
83//!
84//! ## Optional Extraction
85//!
86//! Extractors can be optional by using the [`Option`] type. This is useful when the data is not always present in the event.
87//!
88//! ## Custom Extractors
89//!
90//! It is also possible to create custom extractors by implementing the [`FromEvent`] trait.
91//! This is an async trait that takes the context and event as arguments and returns a result of the extracted data.
92//!
93//! [`FromEvent`]: crate::base::extract::FromEvent
94//!
95//! # States
96//!
97//! States are data that can be shared between handlers. They are stored in the context and can be accessed by any handler.
98//! States can be added to the bot using the [`with_state`] method.
99//! States can be any type that implements [`std::any::Any`], [`Send`], and [`Sync`].
100//!
101//! [`with_state`]: crate::FlowBotBuilder::with_state
102//!
103//! In a handler, a state is accessed by using the [`State`] extractor.
104//!
105//! [`State`]: crate::base::extract::State
106//!
107//! There can be multiple states in the bot, each with a unique type.
108//! If the required state is not present in the context, the handler will be skipped.
109//!
110//! # Services
111//!
112//! Services provide a way to make the bot extendable. They are similar to handlers but take the shape of a struct that implements the [`Service`] trait and have their own state.
113//! It is made so that the bot can be extended to use services from other crates with ease.
114//! Services can be added to the bot using the [`with_service`] method.
115//!
116//! [`Service`]: crate::base::service::Service
117//! [`with_service`]: crate::FlowBotBuilder::with_service
118use std::{any::Any, ops::Deref, sync::Arc};
119
120use base::{
121    connect::ReverseConnectionConfig,
122    context::{BotContext, Context, StateMap},
123    handler::{ErasedHandler, HWrapped, Handler, HandlerControl},
124    service::Service,
125};
126use error::FlowError;
127use event::Event;
128use futures::{
129    StreamExt,
130    stream::{SplitSink, SplitStream},
131};
132use tokio::net::TcpStream;
133use tokio_tungstenite::{
134    MaybeTlsStream, WebSocketStream, connect_async,
135    tungstenite::{Message, Utf8Bytes, client::IntoClientRequest},
136};
137
138pub mod api;
139pub mod base;
140pub mod error;
141pub mod event;
142pub mod message;
143
144#[cfg(feature = "macros")]
145pub use flow_bot_macros::flow_service;
146
147enum HandlerOrService {
148    Handler(Box<dyn ErasedHandler>),
149    Service(Box<dyn Service>),
150}
151
152pub struct FlowBot {
153    handlers: Arc<Vec<HandlerOrService>>,
154    context: BotContext,
155    connection: ReverseConnectionConfig,
156}
157
158pub struct FlowBotBuilder {
159    handlers: Vec<HandlerOrService>,
160    connection: ReverseConnectionConfig,
161    states: StateMap,
162}
163
164impl FlowBotBuilder {
165    /// Create a new FlowBotBuilder with the given connection configuration.
166    pub fn new(connection: ReverseConnectionConfig) -> Self {
167        Self {
168            handlers: Vec::new(),
169            connection,
170            states: StateMap::new(),
171        }
172    }
173
174    /// Add a state to the bot.
175    /// If the state of the same type is already present, it will be replaced.
176    pub fn with_state<S: 'static + Any + Send + Sync>(mut self, state: S) -> Self {
177        self.states.insert(state);
178        self
179    }
180
181    /// Add a handler to the bot.
182    /// The order of the handlers added is the order in which they will be called.
183    pub fn with_handler<T, H>(mut self, handler: H) -> Self
184    where
185        T: Send + Sync + 'static,
186        H: Handler<T> + Send + Sync + 'static,
187    {
188        let wrapped = HWrapped {
189            handler,
190            _phantom: std::marker::PhantomData,
191        };
192        self.handlers
193            .push(HandlerOrService::Handler(Box::new(wrapped)));
194        self
195    }
196
197    /// Add a service to the bot.
198    pub fn with_service<Svc>(mut self, service: Svc) -> Self
199    where
200        Svc: Service + Send + Sync + 'static,
201    {
202        self.handlers
203            .push(HandlerOrService::Service(Box::new(service)));
204        self
205    }
206
207    /// Build the FlowBot.
208    pub fn build(self) -> FlowBot {
209        FlowBot {
210            handlers: Arc::new(self.handlers),
211            context: BotContext::new(Context::new(self.states)),
212            connection: self.connection,
213        }
214    }
215}
216
217impl FlowBot {
218    /// Run the bot.
219    /// This will connect to the server and start processing events.
220    /// This method will never return unless an error occurs.
221    pub async fn run(&self) -> Result<(), FlowError> {
222        let (write, read) = self.connect().await?;
223
224        self.set_sink(write).await;
225        self.init_services().await;
226        self.run_msg_loop(read).await?;
227
228        Ok(())
229    }
230
231    async fn connect(
232        &self,
233    ) -> Result<
234        (
235            SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
236            SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
237        ),
238        FlowError,
239    > {
240        let mut request = self.connection.target.clone().into_client_request()?;
241        if let Some(auth) = &self.connection.auth {
242            request
243                .headers_mut()
244                .append("Authorization", auth.parse().unwrap());
245        }
246
247        let (ws_stream, _) = connect_async(request).await?;
248        Ok(ws_stream.split())
249    }
250
251    async fn set_sink(&self, sink: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>) {
252        let mut ws_sink = self.context.sink.lock().await;
253        *ws_sink = Some(sink);
254    }
255
256    async fn run_msg_loop(
257        &self,
258        mut read: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
259    ) -> Result<(), FlowError> {
260        while let Some(msg) = read.next().await {
261            let msg = msg?;
262            if let Message::Text(text) = msg {
263                if let Some(echo) = Self::check_is_echo(&text) {
264                    self.context.on_recv_echo(echo, text.to_string());
265                    continue;
266                }
267                self.handle_event(text)?;
268            }
269        }
270        Ok(())
271    }
272
273    async fn init_services(&self) {
274        for handler in self.handlers.deref() {
275            if let HandlerOrService::Service(service) = handler {
276                service.init(self.context.clone()).await;
277            }
278        }
279    }
280
281    fn handle_event(&self, text: Utf8Bytes) -> Result<(), FlowError> {
282        let event: Event = serde_json::from_slice(text.as_bytes())?;
283        let event = Arc::new(event);
284        let context = self.context.clone();
285        let handlers = self.handlers.clone();
286        tokio::spawn(async move {
287            for handler in handlers.deref() {
288                let control = match handler {
289                    HandlerOrService::Handler(handler) => {
290                        handler.call(context.clone(), event.clone()).await
291                    }
292                    HandlerOrService::Service(service) => {
293                        service.serve(context.clone(), event.clone()).await
294                    }
295                };
296
297                if let HandlerControl::Block = control {
298                    break;
299                }
300            }
301        });
302        Ok(())
303    }
304
305    fn check_is_echo(msg: &str) -> Option<String> {
306        let msg = serde_json::from_str::<serde_json::Value>(msg).unwrap();
307        if let serde_json::Value::Object(obj) = msg
308            && let Some(serde_json::Value::String(echo)) = obj.get("echo")
309        {
310            return Some(echo.clone());
311        }
312        None
313    }
314}