#![feature(try_trait_v2)]

//! A OneBot 11 SDK that simplifies bot creation.
//!
//! Flow-bot provides a mechanism modeled on that of axum, so if you are familiar with axum, you will find it easy to use.
//!
//! The basic unit of event processing in flow-bot is a handler. A handler is a function that optionally takes a [`BotContext`] and a [`BotEvent`], or any of the extractors, as arguments and returns a [`HandlerControl`].
//! Handlers can parse the incoming event and respond to it. The returned value is a control-flow signal that decides how event processing continues, which is where the name flow-bot comes from.
//!
//! [`BotContext`]: crate::base::context::BotContext
//! [`BotEvent`]: crate::event::BotEvent
//!
//! # Example
//! ```no_run
//! use flow_bot::{
//!     FlowBotBuilder,
//!     base::{connect::ReverseConnectionConfig, extract::Message, handler::HandlerControl},
//! };
//!
//! async fn on_message(msg: Message) -> HandlerControl {
//!     println!("{:?}", msg.message);
//!     HandlerControl::Continue
//! }
//!
//! // `main` cannot be `async` by itself; the tokio attribute supplies the runtime.
//! #[tokio::main]
//! async fn main() {
//!     let bot = FlowBotBuilder::new(ReverseConnectionConfig {
//!         target: "ws://localhost:19999".to_string(),
//!         auth: None,
//!     })
//!     .with_state(())
//!     .with_handler(on_message)
//!     .build();
//!
//!     bot.run().await.unwrap();
//! }
//! ```
//!
//! # Handlers
//!
//! Handlers are functions that process events. Register them with the [`with_handler`] method; they are called in the order in which they were registered.
//! Commonly, a handler responds to an event by calling the methods of [`ApiExt`], which [`BotContext`] implements, to control the bot.
//!
//! [`with_handler`]: crate::FlowBotBuilder::with_handler
//! [`ApiExt`]: crate::api::api_ext::ApiExt
//! [`BotContext`]: crate::base::context::BotContext
//!
//! A handler returns a [`HandlerControl`], which determines how event processing continues.
//! [`HandlerControl::Continue`] passes the event on to the next handler, while [`HandlerControl::Block`] stops the event from reaching any later handler.
//! [`HandlerControl::Skip`] also passes the event on to the next handler, but signals that the current handler did not process it; use it when the event does not meet the handler's criteria.
//! Unlike many other bot SDKs, flow-bot provides no matcher mechanism for selecting events, so the matching logic must be implemented in the handler itself. The extractor mechanism offers a similar convenience, though. See the [Extractors] section below.
//!
//! [`HandlerControl`]: crate::base::handler::HandlerControl
//! [`HandlerControl::Continue`]: crate::base::handler::HandlerControl::Continue
//! [`HandlerControl::Block`]: crate::base::handler::HandlerControl::Block
//! [`HandlerControl::Skip`]: crate::base::handler::HandlerControl::Skip
//! [Extractors]: #extractors
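//!
//! The control flow described above can be sketched as follows. The handler names (`log_message`, `stop_here`, `after_block`) are hypothetical; the builder calls and the [`Message`] extractor are the ones used in the first example.
//!
//! ```no_run
//! use flow_bot::{
//!     FlowBotBuilder,
//!     base::{connect::ReverseConnectionConfig, extract::Message, handler::HandlerControl},
//! };
//!
//! // Runs first and lets later handlers see the event as well.
//! async fn log_message(msg: Message) -> HandlerControl {
//!     println!("{:?}", msg.message);
//!     HandlerControl::Continue
//! }
//!
//! // Runs second and stops processing of the message event here.
//! async fn stop_here(_msg: Message) -> HandlerControl {
//!     HandlerControl::Block
//! }
//!
//! // Registered after `stop_here`, so it does not see events that `stop_here` blocked.
//! async fn after_block(_msg: Message) -> HandlerControl {
//!     HandlerControl::Continue
//! }
//!
//! let _bot = FlowBotBuilder::new(ReverseConnectionConfig {
//!     target: "ws://localhost:19999".to_string(),
//!     auth: None,
//! })
//! .with_handler(log_message)
//! .with_handler(stop_here)
//! .with_handler(after_block)
//! .build();
//! ```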
//!
//! # Extractors
//! Extractors work like the extractors in axum. They are types that appear as handler arguments and extract data from the context and the event for the handler to use.
//! For the full list of predefined extractors, see the [`extract`] module.
//!
//! [`extract`]: crate::base::extract
//!
//! ## Using Extractors
//!
//! The example above already uses the predefined [`Message`] extractor, which extracts the message from the event. Extractors can also be used to match event criteria.
//!
//! [`Message`]: crate::base::extract::Message
//!
//! ```no_run
//! use flow_bot::base::{extract::MatchGroupId, handler::HandlerControl};
//!
//! async fn on_group_msg(_: MatchGroupId<123>) -> HandlerControl {
//!     // This handler is only called when the event is a group message in group 123; otherwise it is skipped.
//!     println!("Received message in group 123");
//!     HandlerControl::Continue
//! }
//! ```
//!
//! ## Optional Extraction
//!
//! An extractor can be made optional by wrapping it in [`Option`]. This is useful when the data is not always present in the event.
//!
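//! As a minimal sketch, the handler below wraps the predefined `Message` extractor in `Option`. The handler name `on_any_event` is hypothetical, and the comments assume that a failed extraction is delivered as `None`.
//!
//! ```no_run
//! use flow_bot::base::{extract::Message, handler::HandlerControl};
//!
//! async fn on_any_event(msg: Option<Message>) -> HandlerControl {
//!     match msg {
//!         // The event carried a message.
//!         Some(msg) => println!("message: {:?}", msg.message),
//!         // Assumed: events without a message arrive here as `None`.
//!         None => println!("event without a message"),
//!     }
//!     HandlerControl::Continue
//! }
//! ```
//!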
//! ## Custom Extractors
//!
//! You can also create custom extractors by implementing the [`FromEvent`] trait.
//! It is an async trait that takes the context and the event as arguments and returns the extracted data as a result.
//!
//! [`FromEvent`]: crate::base::extract::FromEvent
//!
//! # States
//!
//! States are data shared between handlers. They are stored in the context and can be accessed by any handler.
//! Add a state to the bot with the [`with_state`] method.
//! A state can be any type that implements [`std::any::Any`], [`Send`], and [`Sync`].
//!
//! [`with_state`]: crate::FlowBotBuilder::with_state
//!
//! In a handler, a state is accessed through the [`State`] extractor.
//!
//! [`State`]: crate::base::extract::State
//!
//! A bot can hold multiple states, each of a unique type; adding a state of a type that is already present replaces the earlier one.
//! If the required state is not present in the context, the handler is skipped.
//!
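//! The rules above can be sketched as follows. This assumes that `State` takes the state type as its only type parameter, which this page does not spell out, so the block is marked `ignore`. `Greeting` and `greeter` are hypothetical names.
//!
//! ```ignore
//! use flow_bot::{
//!     FlowBotBuilder,
//!     base::{connect::ReverseConnectionConfig, extract::State, handler::HandlerControl},
//! };
//!
//! // Hypothetical state type; any `Any + Send + Sync` type can serve as a state.
//! struct Greeting(String);
//!
//! // Assumption: `State<T>` is generic over the state type only.
//! // The handler is skipped when no `Greeting` has been registered.
//! async fn greeter(_greeting: State<Greeting>) -> HandlerControl {
//!     HandlerControl::Continue
//! }
//!
//! let _bot = FlowBotBuilder::new(ReverseConnectionConfig {
//!     target: "ws://localhost:19999".to_string(),
//!     auth: None,
//! })
//! .with_state(Greeting("hello".to_string())) // replaced by the next call (same type)
//! .with_state(Greeting("hi".to_string()))
//! .with_state(0u64) // a second state, identified by its own type
//! .with_handler(greeter)
//! .build();
//! ```
//!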
//! # Services
//!
//! Services make the bot extensible. They are similar to handlers but take the form of a struct that implements the [`Service`] trait, and they have their own state.
//! This design lets the bot use services provided by other crates with ease.
//! Add a service to the bot with the [`with_service`] method.
//!
//! [`Service`]: crate::base::service::Service
//! [`with_service`]: crate::FlowBotBuilder::with_service
use std::{any::Any, ops::Deref, sync::Arc};

use base::{
    connect::ReverseConnectionConfig,
    context::{BotContext, Context, StateMap},
    handler::{ErasedHandler, HWrapped, Handler, HandlerControl},
    service::Service,
};
use error::FlowError;
use event::Event;
use futures::{
    StreamExt,
    stream::{SplitSink, SplitStream},
};
use tokio::net::TcpStream;
use tokio_tungstenite::{
    MaybeTlsStream, WebSocketStream, connect_async,
    tungstenite::{Message, Utf8Bytes, client::IntoClientRequest},
};

pub mod api;
pub mod base;
pub mod error;
pub mod event;
pub mod message;

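/// An entry in the processing chain: either a type-erased handler or a service.
enum HandlerOrService {
    Handler(Box<dyn ErasedHandler>),
    Service(Box<dyn Service>),
}

/// A bot that connects to the configured WebSocket target and dispatches
/// incoming events to its handlers and services in registration order.
pub struct FlowBot {
    handlers: Arc<Vec<HandlerOrService>>,
    context: BotContext,
    connection: ReverseConnectionConfig,
}

/// Builder for [`FlowBot`].
pub struct FlowBotBuilder {
    handlers: Vec<HandlerOrService>,
    connection: ReverseConnectionConfig,
    states: StateMap,
}
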
impl FlowBotBuilder {
    /// Create a new FlowBotBuilder with the given connection configuration.
    pub fn new(connection: ReverseConnectionConfig) -> Self {
        Self {
            handlers: Vec::new(),
            connection,
            states: StateMap::new(),
        }
    }

    /// Add a state to the bot.
    /// If the state of the same type is already present, it will be replaced.
    pub fn with_state<S: 'static + Any + Send + Sync>(mut self, state: S) -> Self {
        self.states.insert(state);
        self
    }

    /// Add a handler to the bot.
    /// The order of the handlers added is the order in which they will be called.
    pub fn with_handler<T, H>(mut self, handler: H) -> Self
    where
        T: Send + Sync + 'static,
        H: Handler<T> + Send + Sync + 'static,
    {
        let wrapped = HWrapped {
            handler,
            _phantom: std::marker::PhantomData,
        };
        self.handlers
            .push(HandlerOrService::Handler(Box::new(wrapped)));
        self
    }

    /// Add a service to the bot.
    pub fn with_service<Svc>(mut self, service: Svc) -> Self
    where
        Svc: Service + Send + Sync + 'static,
    {
        self.handlers
            .push(HandlerOrService::Service(Box::new(service)));
        self
    }

    /// Build the FlowBot.
    pub fn build(self) -> FlowBot {
        FlowBot {
            handlers: Arc::new(self.handlers),
            context: BotContext::new(Context::new(self.states)),
            connection: self.connection,
        }
    }
}

impl FlowBot {
    /// Run the bot.
    /// This will connect to the server and start processing events.
    /// This method returns only when the connection is closed or an error occurs.
    pub async fn run(&self) -> Result<(), FlowError> {
        let (write, read) = self.connect().await?;

        self.set_sink(write).await;
        self.run_msg_loop(read).await?;

        Ok(())
    }

    async fn connect(
        &self,
    ) -> Result<
        (
            SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
            SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
        ),
        FlowError,
    > {
        let mut request = self.connection.target.clone().into_client_request()?;
        if let Some(auth) = &self.connection.auth {
            request
                .headers_mut()
                .append("Authorization", auth.parse().unwrap());
        }

        let (ws_stream, _) = connect_async(request).await?;
        Ok(ws_stream.split())
    }

    async fn set_sink(&self, sink: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>) {
        let mut ws_sink = self.context.sink.lock().await;
        *ws_sink = Some(sink);
    }

    async fn run_msg_loop(
        &self,
        mut read: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
    ) -> Result<(), FlowError> {
        while let Some(msg) = read.next().await {
            let msg = msg?;
            if let Message::Text(text) = msg {
                if let Some(echo) = Self::check_is_echo(&text) {
                    self.context.on_recv_echo(echo, text.to_string());
                    continue;
                }
                self.handle_event(text)?;
            }
        }
        Ok(())
    }

    fn handle_event(&self, text: Utf8Bytes) -> Result<(), FlowError> {
        let event: Event = serde_json::from_slice(text.as_bytes())?;
        let event = Arc::new(event);
        let context = self.context.clone();
        let handlers = self.handlers.clone();
        tokio::spawn(async move {
            for handler in handlers.deref() {
                let control = match handler {
                    HandlerOrService::Handler(handler) => {
                        handler.call(context.clone(), event.clone()).await
                    }
                    HandlerOrService::Service(service) => {
                        service.serve(context.clone(), event.clone()).await
                    }
                };

                if let HandlerControl::Block = control {
                    break;
                }
            }
        });
        Ok(())
    }

    /// Returns the `echo` value if `msg` is a JSON object with a string `echo` field.
    fn check_is_echo(msg: &str) -> Option<String> {
        // Malformed JSON is treated as "not an echo" instead of panicking.
        let value: serde_json::Value = serde_json::from_str(msg).ok()?;
        // `get` yields `None` for non-objects; `as_str` yields `None` for non-string values.
        value.get("echo")?.as_str().map(str::to_owned)
    }
}
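
// A small sketch of how the echo check above behaves, written as unit tests.
// The module and test names are hypothetical additions, not part of the
// original file. The JSON payloads are made-up samples; only the `echo`
// field matters to `check_is_echo`.
#[cfg(test)]
mod tests {
    use super::FlowBot;

    #[test]
    fn string_echo_is_detected() {
        let text = r#"{"status":"ok","echo":"abc"}"#;
        assert_eq!(FlowBot::check_is_echo(text), Some("abc".to_string()));
    }

    #[test]
    fn missing_or_non_string_echo_is_ignored() {
        // No `echo` field at all.
        assert_eq!(FlowBot::check_is_echo(r#"{"post_type":"message"}"#), None);
        // An `echo` field that is not a JSON string.
        assert_eq!(FlowBot::check_is_echo(r#"{"echo":1}"#), None);
    }
}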