flow_bot/lib.rs
1#![feature(try_trait_v2)]
2#![feature(adt_const_params)]
3#![feature(unsized_const_params)]
4
5//! An onebot-11 SDK that simplifies bot creation.
6//!
7//! Flow-bot is carefully crafted to provide a mechanism similar to that of axum so if you are familiar with axum, you will find it easy to use.
8//!
9//! The basic unit of event processing in flow-bot is a handler. A handler is a function that optionally takes [`BotContext`] and a [`BotEvent`] or any of the extractors as arguments and returns a [`HandlerControl`].
10//! Handlers can parse the incoming event and respond to it. The returned value serves as a control flow signal to determine the flow of the event processing which is where the name comes from.
11//!
12//! [`BotContext`]: crate::base::context::BotContext
13//! [`BotEvent`]: crate::event::BotEvent
14//!
15//! # Example
16//! ```no_run
17//! use flow_bot::{
18//! FlowBotBuilder,
19//! base::{connect::ReverseConnectionConfig, extract::Message, handler::HandlerControl},
20//! };
21//!
22//! async fn on_message(msg: Message) -> HandlerControl {
23//! println!("{:?}", msg.message);
24//! HandlerControl::Continue
25//! }
26//!
27//! async fn main() {
28//! let bot = FlowBotBuilder::new(ReverseConnectionConfig {
29//! target: "ws://localhost:19999".to_string(),
30//! auth: None,
31//! })
32//! .with_state(())
33//! .with_handler(on_message)
34//! .build();
35//!
36//! bot.run().await.unwrap();
37//! }
38//! ```
39//!
40//! # Handlers
41//!
42//! Handlers are functions that can be registered to process events. They can be registered using the [`with_handler`] method.
43//! Commonly, a handler responds to a event by calling methods in [`ApiExt`] which is implemented by [`BotContext`] to control the bot.
44//!
45//! [`with_handler`]: crate::FlowBotBuilder::with_handler
46//! [`ApiExt`]: crate::api::api_ext::ApiExt
47//! [`BotContext`]: crate::base::context::BotContext
48//!
49//! The returned value of a handler is a [`HandlerControl`] which determines the flow of the event processing.
50//! [`HandlerControl::Continue`] means the event will be passed to the next handler, [`HandlerControl::Block`] means the event will not be passed to the next handler.
51//! [`HandlerControl::Skip`] means the event will be passed to the next handler but the event will not be processed by the current handler, used in the case where the event criteria is not met within the handler.
52//! It is a crucial difference from many other bot SDKs that we do not provide a matcher machenism to match the event, so that you need to implement the logic in the handler. However, a similar way is mimiced by the extractor mechanism. See the [Extractors] section below.
53//!
54//! [`HandlerControl`]: crate::base::handler::HandlerControl
55//! [`HandlerControl::Continue`]: crate::base::handler::HandlerControl::Continue
56//! [`HandlerControl::Block`]: crate::base::handler::HandlerControl::Block
57//! [`HandlerControl::Skip`]: crate::base::handler::HandlerControl::Skip
58//! [Extractors]: #extractors
59//!
60//! # Extractors
61//! Extractors work similarly to the extractors in axum. They are functions that can be registered to extract data from the event. They are to extract data from the context and event for the handler to use.
62//! To see a full list of predefined extractors, see the [`extract`] module.
63//!
64//! [`extract`]: crate::base::extract
65//!
66//! ## Using Extractors
67//!
68//! It is already shown in the example above how to use the predefined [`Message`] extractor which extracts the message from the event. It is also possible to use extractors to match event criteria.
69//!
70//! [`Message`]: crate::base::extract::Message
71//!
72//! ```no_run
73//! use flow_bot::{
74//! base::extract::MatchGroupId,handler::HandlerControl
75//! };
76//!
77//! async fn on_group_msg(_: MatchGroupId<123>) -> HandlerControl {
78//! // This handler will only be called when the event is a group message in group 123, otherwise it will be skipped.
79//! println!("Received message in group 123");
80//! HandlerControl::Continue
81//! }
82//! ```
83//!
84//! ## Optional Extraction
85//!
86//! Extractors can be optional by using the [`Option`] type. This is useful when the data is not always present in the event.
87//!
88//! ## Custom Extractors
89//!
90//! It is also possible to create custom extractors by implementing the [`FromEvent`] trait.
91//! This is an async trait that takes the context and event as arguments and returns a result of the extracted data.
92//!
93//! [`FromEvent`]: crate::base::extract::FromEvent
94//!
95//! # States
96//!
97//! States are data that can be shared between handlers. They are stored in the context and can be accessed by any handler.
98//! States can be added to the bot using the [`with_state`] method.
99//! States can be any type that implements [`std::any::Any`], [`Send`], and [`Sync`].
100//!
101//! [`with_state`]: crate::FlowBotBuilder::with_state
102//!
103//! In a handler, a state is accessed by using the [`State`] extractor.
104//!
105//! [`State`]: crate::base::extract::State
106//!
107//! There can be multiple states in the bot, each with a unique type.
108//! If the required state is not present in the context, the handler will be skipped.
109//!
110//! # Services
111//!
112//! Services provide a way to make the bot extendable. They are similar to handlers but take the shape of a struct that implements the [`Service`] trait and have their own state.
113//! It is made so that the bot can be extended to use services from other crates with ease.
114//! Services can be added to the bot using the [`with_service`] method.
115//!
116//! [`Service`]: crate::base::service::Service
117//! [`with_service`]: crate::FlowBotBuilder::with_service
118use std::{
119 any::Any,
120 ops::Deref,
121 sync::{
122 Arc,
123 atomic::{AtomicU32, Ordering},
124 },
125};
126
127use base::{
128 connect::ReverseConnectionConfig,
129 context::{BotContext, Context, StateMap},
130 handler::{ErasedHandler, HWrapped, Handler, HandlerControl},
131 service::Service,
132};
133use error::FlowError;
134use event::Event;
135use futures::{
136 StreamExt,
137 stream::{SplitSink, SplitStream},
138};
139use tokio::net::TcpStream;
140use tokio_tungstenite::{
141 MaybeTlsStream, WebSocketStream, connect_async,
142 tungstenite::{Message, Utf8Bytes, client::IntoClientRequest},
143};
144
145pub mod api;
146pub mod base;
147pub mod error;
148pub mod event;
149pub mod extensions;
150pub mod message;
151
152#[cfg(feature = "macros")]
153pub use flow_bot_macros::flow_service;
154
155enum HandlerOrService {
156 Handler(Box<dyn ErasedHandler>),
157 Service(Box<dyn Service>),
158}
159
160pub struct FlowBot {
161 handlers: Arc<Vec<HandlerOrService>>,
162 context: BotContext,
163 connection: ReverseConnectionConfig,
164 reconnect_attempt: AtomicU32,
165}
166
167pub struct FlowBotBuilder {
168 handlers: Vec<HandlerOrService>,
169 connection: ReverseConnectionConfig,
170 states: StateMap,
171}
172
173impl FlowBotBuilder {
174 /// Create a new FlowBotBuilder with the given connection configuration.
175 pub fn new(connection: ReverseConnectionConfig) -> Self {
176 Self {
177 handlers: Vec::new(),
178 connection,
179 states: StateMap::new(),
180 }
181 }
182
183 /// Add a state to the bot.
184 /// If the state of the same type is already present, it will be replaced.
185 pub fn with_state<S: 'static + Any + Send + Sync>(mut self, state: S) -> Self {
186 self.states.insert(state);
187 self
188 }
189
190 /// Add a handler to the bot.
191 /// The order of the handlers added is the order in which they will be called.
192 pub fn with_handler<T, H>(mut self, handler: H) -> Self
193 where
194 T: Send + Sync + 'static,
195 H: Handler<T> + Send + Sync + 'static,
196 {
197 let wrapped = HWrapped {
198 handler,
199 _phantom: std::marker::PhantomData,
200 };
201 self.handlers
202 .push(HandlerOrService::Handler(Box::new(wrapped)));
203 self
204 }
205
206 /// Add a service to the bot.
207 pub fn with_service<Svc>(mut self, service: Svc) -> Self
208 where
209 Svc: Service + Send + Sync + 'static,
210 {
211 self.handlers
212 .push(HandlerOrService::Service(Box::new(service)));
213 self
214 }
215
216 /// Build the FlowBot.
217 pub fn build(self) -> FlowBot {
218 FlowBot {
219 handlers: Arc::new(self.handlers),
220 context: BotContext::new(Context::new(self.states)),
221 connection: self.connection,
222 reconnect_attempt: AtomicU32::new(0),
223 }
224 }
225}
226
227impl FlowBot {
228 /// Run the bot.
229 /// This will connect to the server and start processing events.
230 /// This method will never return unless an error occurs or reconnection attempts are exhausted.
231 pub async fn run(&self) -> Result<(), FlowError> {
232 use base::connect::ReconnectionStrategy;
233
234 match &self.connection.reconnection {
235 ReconnectionStrategy::None => self.run_once().await,
236 ReconnectionStrategy::Infinite {
237 initial_delay_ms,
238 max_delay_ms,
239 } => {
240 self.run_with_infinite_reconnect(*initial_delay_ms, *max_delay_ms)
241 .await
242 }
243 ReconnectionStrategy::Limited {
244 max_attempts,
245 initial_delay_ms,
246 max_delay_ms,
247 } => {
248 self.run_with_limited_reconnect(*max_attempts, *initial_delay_ms, *max_delay_ms)
249 .await
250 }
251 }
252 }
253
254 async fn run_once(&self) -> Result<(), FlowError> {
255 let (write, read) = self.connect().await?;
256
257 // Connection established successfully, reset attempt counter
258 self.reconnect_attempt.store(0, Ordering::Relaxed);
259
260 self.set_sink(write).await;
261 self.init_services().await;
262 self.run_msg_loop(read).await?;
263
264 Ok(())
265 }
266
267 async fn run_with_infinite_reconnect(
268 &self,
269 initial_delay_ms: u64,
270 max_delay_ms: u64,
271 ) -> Result<(), FlowError> {
272 loop {
273 let attempt = self.reconnect_attempt.load(Ordering::Relaxed);
274 let current_delay = (initial_delay_ms * 2_u64.pow(attempt)).min(max_delay_ms);
275
276 match self.run_once().await {
277 Ok(_) => {
278 eprintln!("Connection closed. Reconnecting in {}ms...", current_delay);
279 }
280 Err(e) => {
281 eprintln!(
282 "Connection error: {}. Reconnecting in {}ms...",
283 e, current_delay
284 );
285 }
286 }
287
288 self.reconnect_attempt.fetch_add(1, Ordering::Relaxed);
289 tokio::time::sleep(tokio::time::Duration::from_millis(current_delay)).await;
290 }
291 }
292
293 async fn run_with_limited_reconnect(
294 &self,
295 max_attempts: u32,
296 initial_delay_ms: u64,
297 max_delay_ms: u64,
298 ) -> Result<(), FlowError> {
299 loop {
300 let attempt = self.reconnect_attempt.load(Ordering::Relaxed);
301
302 if attempt >= max_attempts {
303 return Err(FlowError::ReconnectionFailed(max_attempts));
304 }
305
306 let current_delay = (initial_delay_ms * 2_u64.pow(attempt)).min(max_delay_ms);
307
308 match self.run_once().await {
309 Ok(_) => {
310 // Connection was successful and has now closed
311 // Counter was already reset to 0 in run_once
312 eprintln!("Connection closed. Reconnecting in {}ms...", current_delay);
313 }
314 Err(e) => {
315 eprintln!(
316 "Connection error: {}. Reconnecting in {}ms... (attempt {}/{})",
317 e,
318 current_delay,
319 attempt + 1,
320 max_attempts
321 );
322 }
323 }
324
325 self.reconnect_attempt.fetch_add(1, Ordering::Relaxed);
326 tokio::time::sleep(tokio::time::Duration::from_millis(current_delay)).await;
327 }
328 }
329
330 async fn connect(
331 &self,
332 ) -> Result<
333 (
334 SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
335 SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
336 ),
337 FlowError,
338 > {
339 let mut request = self.connection.target.clone().into_client_request()?;
340 if let Some(auth) = &self.connection.auth {
341 request
342 .headers_mut()
343 .append("Authorization", auth.parse().unwrap());
344 }
345
346 let (ws_stream, _) = connect_async(request).await?;
347 Ok(ws_stream.split())
348 }
349
350 async fn set_sink(&self, sink: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>) {
351 let mut ws_sink = self.context.sink.lock().await;
352 *ws_sink = Some(sink);
353 }
354
355 async fn run_msg_loop(
356 &self,
357 mut read: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
358 ) -> Result<(), FlowError> {
359 while let Some(msg) = read.next().await {
360 let msg = msg?;
361 if let Message::Text(text) = msg {
362 if let Some(echo) = Self::check_is_echo(&text) {
363 self.context.on_recv_echo(echo, text.to_string());
364 continue;
365 }
366 self.handle_event(text)?;
367 }
368 }
369 Ok(())
370 }
371
372 async fn init_services(&self) {
373 for handler in self.handlers.deref() {
374 if let HandlerOrService::Service(service) = handler {
375 service.init(self.context.clone()).await;
376 }
377 }
378 }
379
380 fn handle_event(&self, text: Utf8Bytes) -> Result<(), FlowError> {
381 let event: Event = serde_json::from_slice(text.as_bytes())?;
382 let event = Arc::new(event);
383 let context = self.context.clone();
384 let handlers = self.handlers.clone();
385 tokio::spawn(async move {
386 for handler in handlers.deref() {
387 let control = match handler {
388 HandlerOrService::Handler(handler) => {
389 handler.call(context.clone(), event.clone()).await
390 }
391 HandlerOrService::Service(service) => {
392 service.serve(context.clone(), event.clone()).await
393 }
394 };
395
396 if let HandlerControl::Block = control {
397 break;
398 }
399 }
400 });
401 Ok(())
402 }
403
404 fn check_is_echo(msg: &str) -> Option<String> {
405 let msg = serde_json::from_str::<serde_json::Value>(msg).unwrap();
406 if let serde_json::Value::Object(obj) = msg
407 && let Some(serde_json::Value::String(echo)) = obj.get("echo")
408 {
409 return Some(echo.clone());
410 }
411 None
412 }
413}