Skip to main content

flow_bot/
lib.rs

1#![feature(try_trait_v2)]
2#![feature(adt_const_params)]
3#![feature(unsized_const_params)]
4
5//! An onebot-11 SDK that simplifies bot creation.
6//!
7//! Flow-bot is carefully crafted to provide a mechanism similar to that of axum so if you are familiar with axum, you will find it easy to use.
8//!
9//! The basic unit of event processing in flow-bot is a handler. A handler is a function that optionally takes [`BotContext`] and a [`BotEvent`] or any of the extractors as arguments and returns a [`HandlerControl`].
10//! Handlers can parse the incoming event and respond to it. The returned value serves as a control flow signal to determine the flow of the event processing which is where the name comes from.
11//!
12//! [`BotContext`]: crate::base::context::BotContext
13//! [`BotEvent`]: crate::event::BotEvent
14//!
15//! # Example
16//! ```no_run
17//! use flow_bot::{
18//!     FlowBotBuilder,
19//!     base::{connect::ReverseConnectionConfig, extract::Message, handler::HandlerControl},
20//! };
21//!
22//! async fn on_message(msg: Message) -> HandlerControl {
23//!     println!("{:?}", msg.message);
24//!     HandlerControl::Continue
25//! }
26//!
27//! async fn main() {
28//!     let bot = FlowBotBuilder::new(ReverseConnectionConfig {
29//!         target: "ws://localhost:19999".to_string(),
30//!         auth: None,
31//!     })
32//!     .with_state(())
33//!     .with_handler(on_message)
34//!     .build();
35//!
36//!     bot.run().await.unwrap();
37//! }
38//! ```
39//!
40//! # Handlers
41//!
42//! Handlers are functions that can be registered to process events. They can be registered using the [`with_handler`] method.
43//! Commonly, a handler responds to a event by calling methods in [`ApiExt`] which is implemented by [`BotContext`] to control the bot.
44//!
45//! [`with_handler`]: crate::FlowBotBuilder::with_handler
46//! [`ApiExt`]: crate::api::api_ext::ApiExt
47//! [`BotContext`]: crate::base::context::BotContext
48//!
49//! The returned value of a handler is a [`HandlerControl`] which determines the flow of the event processing.
50//! [`HandlerControl::Continue`] means the event will be passed to the next handler, [`HandlerControl::Block`] means the event will not be passed to the next handler.
51//! [`HandlerControl::Skip`] means the event will be passed to the next handler but the event will not be processed by the current handler, used in the case where the event criteria is not met within the handler.
52//! It is a crucial difference from many other bot SDKs that we do not provide a matcher machenism to match the event, so that you need to implement the logic in the handler. However, a similar way is mimiced by the extractor mechanism. See the [Extractors] section below.
53//!
54//! [`HandlerControl`]: crate::base::handler::HandlerControl
55//! [`HandlerControl::Continue`]: crate::base::handler::HandlerControl::Continue
56//! [`HandlerControl::Block`]: crate::base::handler::HandlerControl::Block
57//! [`HandlerControl::Skip`]: crate::base::handler::HandlerControl::Skip
58//! [Extractors]: #extractors
59//!
60//! # Extractors
61//! Extractors work similarly to the extractors in axum. They are functions that can be registered to extract data from the event. They are to extract data from the context and event for the handler to use.
62//! To see a full list of predefined extractors, see the [`extract`] module.
63//!
64//! [`extract`]: crate::base::extract
65//!
66//! ## Using Extractors
67//!
68//! It is already shown in the example above how to use the predefined [`Message`] extractor which extracts the message from the event. It is also possible to use extractors to match event criteria.
69//!
70//! [`Message`]: crate::base::extract::Message
71//!
72//! ```no_run
73//! use flow_bot::{
74//!    base::extract::MatchGroupId,handler::HandlerControl
75//! };
76//!
77//! async fn on_group_msg(_: MatchGroupId<123>) -> HandlerControl {
78//!    // This handler will only be called when the event is a group message in group 123, otherwise it will be skipped.
79//!    println!("Received message in group 123");
80//!    HandlerControl::Continue
81//! }
82//! ```
83//!
84//! ## Optional Extraction
85//!
86//! Extractors can be optional by using the [`Option`] type. This is useful when the data is not always present in the event.
87//!
88//! ## Custom Extractors
89//!
90//! It is also possible to create custom extractors by implementing the [`FromEvent`] trait.
91//! This is an async trait that takes the context and event as arguments and returns a result of the extracted data.
92//!
93//! [`FromEvent`]: crate::base::extract::FromEvent
94//!
95//! # States
96//!
97//! States are data that can be shared between handlers. They are stored in the context and can be accessed by any handler.
98//! States can be added to the bot using the [`with_state`] method.
99//! States can be any type that implements [`std::any::Any`], [`Send`], and [`Sync`].
100//!
101//! [`with_state`]: crate::FlowBotBuilder::with_state
102//!
103//! In a handler, a state is accessed by using the [`State`] extractor.
104//!
105//! [`State`]: crate::base::extract::State
106//!
107//! There can be multiple states in the bot, each with a unique type.
108//! If the required state is not present in the context, the handler will be skipped.
109//!
110//! # Services
111//!
112//! Services provide a way to make the bot extendable. They are similar to handlers but take the shape of a struct that implements the [`Service`] trait and have their own state.
113//! It is made so that the bot can be extended to use services from other crates with ease.
114//! Services can be added to the bot using the [`with_service`] method.
115//!
116//! [`Service`]: crate::base::service::Service
117//! [`with_service`]: crate::FlowBotBuilder::with_service
118use std::{
119    any::Any,
120    ops::Deref,
121    sync::{
122        Arc,
123        atomic::{AtomicU32, Ordering},
124    },
125};
126
127use base::{
128    connect::ReverseConnectionConfig,
129    context::{BotContext, Context, StateMap},
130    handler::{ErasedHandler, HWrapped, Handler, HandlerControl},
131    service::Service,
132};
133use error::FlowError;
134use event::Event;
135use futures::{
136    StreamExt,
137    stream::{SplitSink, SplitStream},
138};
139use tokio::net::TcpStream;
140use tokio_tungstenite::{
141    MaybeTlsStream, WebSocketStream, connect_async,
142    tungstenite::{Message, Utf8Bytes, client::IntoClientRequest},
143};
144
145pub mod api;
146pub mod base;
147pub mod error;
148pub mod event;
149pub mod extensions;
150pub mod message;
151
152#[cfg(feature = "macros")]
153pub use flow_bot_macros::flow_service;
154
155enum HandlerOrService {
156    Handler(Box<dyn ErasedHandler>),
157    Service(Box<dyn Service>),
158}
159
160pub struct FlowBot {
161    handlers: Arc<Vec<HandlerOrService>>,
162    context: BotContext,
163    connection: ReverseConnectionConfig,
164    reconnect_attempt: AtomicU32,
165}
166
167pub struct FlowBotBuilder {
168    handlers: Vec<HandlerOrService>,
169    connection: ReverseConnectionConfig,
170    states: StateMap,
171}
172
173impl FlowBotBuilder {
174    /// Create a new FlowBotBuilder with the given connection configuration.
175    pub fn new(connection: ReverseConnectionConfig) -> Self {
176        Self {
177            handlers: Vec::new(),
178            connection,
179            states: StateMap::new(),
180        }
181    }
182
183    /// Add a state to the bot.
184    /// If the state of the same type is already present, it will be replaced.
185    pub fn with_state<S: 'static + Any + Send + Sync>(mut self, state: S) -> Self {
186        self.states.insert(state);
187        self
188    }
189
190    /// Add a handler to the bot.
191    /// The order of the handlers added is the order in which they will be called.
192    pub fn with_handler<T, H>(mut self, handler: H) -> Self
193    where
194        T: Send + Sync + 'static,
195        H: Handler<T> + Send + Sync + 'static,
196    {
197        let wrapped = HWrapped {
198            handler,
199            _phantom: std::marker::PhantomData,
200        };
201        self.handlers
202            .push(HandlerOrService::Handler(Box::new(wrapped)));
203        self
204    }
205
206    /// Add a service to the bot.
207    pub fn with_service<Svc>(mut self, service: Svc) -> Self
208    where
209        Svc: Service + Send + Sync + 'static,
210    {
211        self.handlers
212            .push(HandlerOrService::Service(Box::new(service)));
213        self
214    }
215
216    /// Build the FlowBot.
217    pub fn build(self) -> FlowBot {
218        FlowBot {
219            handlers: Arc::new(self.handlers),
220            context: BotContext::new(Context::new(self.states)),
221            connection: self.connection,
222            reconnect_attempt: AtomicU32::new(0),
223        }
224    }
225}
226
227impl FlowBot {
228    /// Run the bot.
229    /// This will connect to the server and start processing events.
230    /// This method will never return unless an error occurs or reconnection attempts are exhausted.
231    pub async fn run(&self) -> Result<(), FlowError> {
232        use base::connect::ReconnectionStrategy;
233
234        match &self.connection.reconnection {
235            ReconnectionStrategy::None => self.run_once().await,
236            ReconnectionStrategy::Infinite {
237                initial_delay_ms,
238                max_delay_ms,
239            } => {
240                self.run_with_infinite_reconnect(*initial_delay_ms, *max_delay_ms)
241                    .await
242            }
243            ReconnectionStrategy::Limited {
244                max_attempts,
245                initial_delay_ms,
246                max_delay_ms,
247            } => {
248                self.run_with_limited_reconnect(*max_attempts, *initial_delay_ms, *max_delay_ms)
249                    .await
250            }
251        }
252    }
253
254    async fn run_once(&self) -> Result<(), FlowError> {
255        let (write, read) = self.connect().await?;
256
257        // Connection established successfully, reset attempt counter
258        self.reconnect_attempt.store(0, Ordering::Relaxed);
259
260        self.set_sink(write).await;
261        self.init_services().await;
262        self.run_msg_loop(read).await?;
263
264        Ok(())
265    }
266
267    async fn run_with_infinite_reconnect(
268        &self,
269        initial_delay_ms: u64,
270        max_delay_ms: u64,
271    ) -> Result<(), FlowError> {
272        loop {
273            let attempt = self.reconnect_attempt.load(Ordering::Relaxed);
274            let current_delay = (initial_delay_ms * 2_u64.pow(attempt)).min(max_delay_ms);
275
276            match self.run_once().await {
277                Ok(_) => {
278                    eprintln!("Connection closed. Reconnecting in {}ms...", current_delay);
279                }
280                Err(e) => {
281                    eprintln!(
282                        "Connection error: {}. Reconnecting in {}ms...",
283                        e, current_delay
284                    );
285                }
286            }
287
288            self.reconnect_attempt.fetch_add(1, Ordering::Relaxed);
289            tokio::time::sleep(tokio::time::Duration::from_millis(current_delay)).await;
290        }
291    }
292
293    async fn run_with_limited_reconnect(
294        &self,
295        max_attempts: u32,
296        initial_delay_ms: u64,
297        max_delay_ms: u64,
298    ) -> Result<(), FlowError> {
299        loop {
300            let attempt = self.reconnect_attempt.load(Ordering::Relaxed);
301
302            if attempt >= max_attempts {
303                return Err(FlowError::ReconnectionFailed(max_attempts));
304            }
305
306            let current_delay = (initial_delay_ms * 2_u64.pow(attempt)).min(max_delay_ms);
307
308            match self.run_once().await {
309                Ok(_) => {
310                    // Connection was successful and has now closed
311                    // Counter was already reset to 0 in run_once
312                    eprintln!("Connection closed. Reconnecting in {}ms...", current_delay);
313                }
314                Err(e) => {
315                    eprintln!(
316                        "Connection error: {}. Reconnecting in {}ms... (attempt {}/{})",
317                        e,
318                        current_delay,
319                        attempt + 1,
320                        max_attempts
321                    );
322                }
323            }
324
325            self.reconnect_attempt.fetch_add(1, Ordering::Relaxed);
326            tokio::time::sleep(tokio::time::Duration::from_millis(current_delay)).await;
327        }
328    }
329
330    async fn connect(
331        &self,
332    ) -> Result<
333        (
334            SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
335            SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
336        ),
337        FlowError,
338    > {
339        let mut request = self.connection.target.clone().into_client_request()?;
340        if let Some(auth) = &self.connection.auth {
341            request
342                .headers_mut()
343                .append("Authorization", auth.parse().unwrap());
344        }
345
346        let (ws_stream, _) = connect_async(request).await?;
347        Ok(ws_stream.split())
348    }
349
350    async fn set_sink(&self, sink: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>) {
351        let mut ws_sink = self.context.sink.lock().await;
352        *ws_sink = Some(sink);
353    }
354
355    async fn run_msg_loop(
356        &self,
357        mut read: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
358    ) -> Result<(), FlowError> {
359        while let Some(msg) = read.next().await {
360            let msg = msg?;
361            if let Message::Text(text) = msg {
362                if let Some(echo) = Self::check_is_echo(&text) {
363                    self.context.on_recv_echo(echo, text.to_string());
364                    continue;
365                }
366                self.handle_event(text)?;
367            }
368        }
369        Ok(())
370    }
371
372    async fn init_services(&self) {
373        for handler in self.handlers.deref() {
374            if let HandlerOrService::Service(service) = handler {
375                service.init(self.context.clone()).await;
376            }
377        }
378    }
379
380    fn handle_event(&self, text: Utf8Bytes) -> Result<(), FlowError> {
381        let event: Event = serde_json::from_slice(text.as_bytes())?;
382        let event = Arc::new(event);
383        let context = self.context.clone();
384        let handlers = self.handlers.clone();
385        tokio::spawn(async move {
386            for handler in handlers.deref() {
387                let control = match handler {
388                    HandlerOrService::Handler(handler) => {
389                        handler.call(context.clone(), event.clone()).await
390                    }
391                    HandlerOrService::Service(service) => {
392                        service.serve(context.clone(), event.clone()).await
393                    }
394                };
395
396                if let HandlerControl::Block = control {
397                    break;
398                }
399            }
400        });
401        Ok(())
402    }
403
404    fn check_is_echo(msg: &str) -> Option<String> {
405        let msg = serde_json::from_str::<serde_json::Value>(msg).unwrap();
406        if let serde_json::Value::Object(obj) = msg
407            && let Some(serde_json::Value::String(echo)) = obj.get("echo")
408        {
409            return Some(echo.clone());
410        }
411        None
412    }
413}