Skip to main content

vk_bot_api/
bot.rs

1use crate::api::{LongPollServer, VkApi};
2use crate::error::{VkError, VkResult};
3use crate::handler::MessageHandler;
4use crate::models::{Event, Update};
5use crate::vk_error;
6use futures::stream::{self, StreamExt};
7use log::{info, warn};
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::time::sleep;
11
/// Runtime settings for a bot: Long Poll timing, handler concurrency and
/// reconnection policy.
///
/// All fields are public so a configuration can also be assembled by hand.
#[derive(Debug, Clone)]
pub struct VkBotConfig {
    /// Number of seconds a single Long Poll request is allowed to wait (default: 25).
    pub long_poll_timeout: u64,
    /// Upper bound on how many updates are handled at the same time (default: 10).
    pub max_concurrent_handlers: usize,
    /// Whether the bot tries to reconnect after a Long Poll error (default: true).
    pub auto_reconnect: bool,
    /// Pause before each reconnection attempt, in seconds (default: 5).
    pub reconnect_delay: u64,
    /// How many consecutive reconnection attempts are made before giving up (default: 10).
    pub max_reconnect_attempts: u32,
    /// Whether the number of received updates is logged (default: false).
    pub log_updates: bool,
}
28
29impl Default for VkBotConfig {
30    fn default() -> Self {
31        Self {
32            long_poll_timeout: 25,
33            max_concurrent_handlers: 10,
34            auto_reconnect: true,
35            reconnect_delay: 5,
36            max_reconnect_attempts: 10,
37            log_updates: false,
38        }
39    }
40}
41
42/// Bot builder
43#[derive(Debug, Default)]
44pub struct VkBotBuilder {
45    token: Option<String>,
46    group_id: Option<i64>,
47    config: VkBotConfig,
48}
49
50impl VkBotBuilder {
51    /// Create new builder
52    pub fn new() -> Self {
53        Self::default()
54    }
55
56    /// Set access token
57    pub fn token<T: Into<String>>(mut self, token: T) -> Self {
58        self.token = Some(token.into());
59        self
60    }
61
62    /// Set group ID
63    pub fn group_id(mut self, group_id: i64) -> Self {
64        self.group_id = Some(group_id);
65        self
66    }
67
68    /// Set Long Poll timeout
69    pub fn long_poll_timeout(mut self, timeout: u64) -> Self {
70        self.config.long_poll_timeout = timeout;
71        self
72    }
73
74    /// Set max concurrent handlers
75    pub fn max_concurrent_handlers(mut self, max: usize) -> Self {
76        self.config.max_concurrent_handlers = max;
77        self
78    }
79
80    /// Enable/disable auto reconnection
81    pub fn auto_reconnect(mut self, enable: bool) -> Self {
82        self.config.auto_reconnect = enable;
83        self
84    }
85
86    /// Set reconnection delay
87    pub fn reconnect_delay(mut self, delay: u64) -> Self {
88        self.config.reconnect_delay = delay;
89        self
90    }
91
92    /// Set max reconnection attempts
93    pub fn max_reconnect_attempts(mut self, attempts: u32) -> Self {
94        self.config.max_reconnect_attempts = attempts;
95        self
96    }
97
98    /// Enable/disable update logging
99    pub fn log_updates(mut self, enable: bool) -> Self {
100        self.config.log_updates = enable;
101        self
102    }
103
104    /// Build bot instance
105    pub fn build(self) -> VkResult<VkBot> {
106        let token = self
107            .token
108            .ok_or_else(|| VkError::ConfigError("Token is required".to_string()))?;
109        let group_id = self
110            .group_id
111            .ok_or_else(|| VkError::ConfigError("Group ID is required".to_string()))?;
112
113        let api = VkApi::new(&token)?;
114
115        Ok(VkBot {
116            api: Arc::new(api),
117            group_id,
118            config: Arc::new(self.config),
119            long_poll_server: None,
120            handlers: Arc::new(Vec::new()),
121        })
122    }
123}
124
125/// Main bot structure
126pub struct VkBot {
127    api: Arc<VkApi>,
128    group_id: i64,
129    config: Arc<VkBotConfig>,
130    long_poll_server: Option<LongPollServer>,
131    handlers: Arc<Vec<Box<dyn MessageHandler>>>,
132}
133
134use std::fmt;
135impl fmt::Debug for VkBot {
136    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137        f.debug_struct("VkBot")
138            .field("api", &self.api)
139            .field("group_id", &self.group_id)
140            .field("config", &self.config)
141            .field("long_poll_server", &self.long_poll_server)
142            .field(
143                "handlers",
144                &format_args!("Arc<Vec<Box<dyn MessageHandler>>>"),
145            )
146            .finish()
147    }
148}
149
150impl VkBot {
151    /// Create new bot with default config
152    pub fn new<T: Into<String>>(token: T, group_id: i64) -> VkResult<Self> {
153        VkBotBuilder::new().token(token).group_id(group_id).build()
154    }
155
156    /// Create new bot with builder
157    pub fn builder() -> VkBotBuilder {
158        VkBotBuilder::new()
159    }
160
161    /// Add message handler
162    pub fn add_handler<H>(&mut self, handler: H)
163    where
164        H: MessageHandler + 'static,
165    {
166        let handlers =
167            Arc::get_mut(&mut self.handlers).expect("Cannot add handler while bot is running");
168        handlers.push(Box::new(handler));
169    }
170
171    /// Initialize bot (get Long Poll server)
172    pub async fn init(&mut self) -> VkResult<()> {
173        info!("Initializing bot for group {}...", self.group_id);
174
175        self.long_poll_server = Some(self.api.groups_get_long_poll_server(self.group_id).await?);
176
177        info!("Long Poll server obtained");
178
179        // Get group info
180        self.get_group_info().await?;
181
182        Ok(())
183    }
184
185    /// Get group information
186    async fn get_group_info(&self) -> VkResult<()> {
187        let response = self
188            .api
189            .groups_get_by_id(&[self.group_id], Some("name,description,members_count"))
190            .await?;
191
192        if let Some(group) = response["response"].as_array().and_then(|arr| arr.first()) {
193            let name = group["name"].as_str().unwrap_or("Unknown");
194            let members = group["members_count"].as_i64().unwrap_or(0);
195            info!("Group: {} (members: {})", name, members);
196        }
197
198        Ok(())
199    }
200
201    /// Perform Long Poll request
202    async fn long_poll(&mut self) -> VkResult<Vec<Update>> {
203        let server = self
204            .long_poll_server
205            .as_ref()
206            .ok_or_else(|| VkError::ConfigError("Long Poll server not initialized".to_string()))?;
207
208        let url = format!(
209            "{}?act=a_check&key={}&ts={}&wait={}",
210            server.server, server.key, server.ts, self.config.long_poll_timeout
211        );
212
213        let response = reqwest::get(&url).await?;
214
215        if !response.status().is_success() {
216            return Err(VkError::NetworkError(format!(
217                "Long Poll HTTP error: {}",
218                response.status()
219            )));
220        }
221
222        let json_response: serde_json::Value = response.json().await?;
223
224        // Handle Long Poll errors
225        if let Some(failed) = json_response["failed"].as_i64() {
226            match failed {
227                1 => {
228                    // Update ts only
229                    if let Some(ts) = json_response["ts"].as_str()
230                        && let Some(ref mut server) = self.long_poll_server
231                    {
232                        server.ts = ts.to_string();
233                    }
234                    return Ok(Vec::new());
235                }
236                2 | 3 => {
237                    // Need new server
238                    warn!("Long Poll failed {}, getting new server...", failed);
239                    self.long_poll_server =
240                        Some(self.api.groups_get_long_poll_server(self.group_id).await?);
241                    return Ok(Vec::new());
242                }
243                _ => {
244                    return Err(VkError::api_error(
245                        failed as i32,
246                        format!("Long Poll failed with code: {}", failed),
247                    ));
248                }
249            }
250        }
251
252        // Update ts
253        if let Some(ts) = json_response["ts"].as_str()
254            && let Some(ref mut server) = self.long_poll_server
255        {
256            server.ts = ts.to_string();
257        }
258
259        // Parse updates
260        let updates = if let Some(updates_array) = json_response["updates"].as_array() {
261            updates_array
262                .iter()
263                .filter_map(|update| serde_json::from_value(update.clone()).ok())
264                .collect()
265        } else {
266            Vec::new()
267        };
268
269        if self.config.log_updates && !updates.is_empty() {
270            info!("Received {} updates", updates.len());
271        }
272
273        Ok(updates)
274    }
275
276    /// Handle updates
277    async fn handle_updates(&self, updates: &[Update]) {
278        let handlers = self.handlers.clone();
279        let api = self.api.clone();
280
281        // Process updates concurrently with limit
282        let futures = updates.iter().map(|update| {
283            let handlers = handlers.clone();
284            let api = api.clone();
285            async move {
286                let event = Event::from_update(update);
287
288                // Process with all handlers
289                for handler in handlers.iter() {
290                    if let Err(e) = handler.handle(&event, &api).await {
291                        vk_error!("Handler error: {}", e);
292                    }
293                }
294            }
295        });
296
297        stream::iter(futures)
298            .buffer_unordered(self.config.max_concurrent_handlers)
299            .for_each(|_| async {})
300            .await;
301    }
302
303    /// Main bot loop
304    pub async fn run(&mut self) -> VkResult<()> {
305        self.init().await?;
306
307        info!("Bot started. Waiting for events...");
308
309        let mut reconnect_attempts = 0;
310
311        loop {
312            match self.long_poll().await {
313                Ok(updates) => {
314                    reconnect_attempts = 0; // Reset on success
315
316                    if !updates.is_empty() {
317                        self.handle_updates(&updates).await;
318                    }
319                }
320                Err(e) => {
321                    vk_error!("Long Poll error: {}", e);
322
323                    if !self.config.auto_reconnect {
324                        return Err(e);
325                    }
326
327                    reconnect_attempts += 1;
328
329                    if reconnect_attempts > self.config.max_reconnect_attempts {
330                        vk_error!("Maximum reconnection attempts reached");
331                        return Err(e);
332                    }
333
334                    warn!(
335                        "Reconnecting in {} seconds... (attempt {}/{})",
336                        self.config.reconnect_delay,
337                        reconnect_attempts,
338                        self.config.max_reconnect_attempts
339                    );
340
341                    sleep(Duration::from_secs(self.config.reconnect_delay)).await;
342
343                    // Try to reinitialize
344                    if let Err(e) = self.init().await {
345                        vk_error!("Failed to reinitialize: {}", e);
346                    } else {
347                        info!("Reconnected successfully");
348                    }
349                }
350            }
351        }
352    }
353
354    /// Get API client reference
355    pub fn api(&self) -> &VkApi {
356        &self.api
357    }
358
359    /// Get group ID
360    pub fn group_id(&self) -> i64 {
361        self.group_id
362    }
363
364    /// Get bot config
365    pub fn config(&self) -> &VkBotConfig {
366        &self.config
367    }
368
369    /// Send broadcast message to multiple users
370    pub async fn broadcast(
371        &self,
372        message: &str,
373        user_ids: &[i64],
374        delay_ms: u64,
375    ) -> VkResult<Vec<(i64, VkResult<i64>)>> {
376        let mut results = Vec::new();
377
378        for &user_id in user_ids {
379            sleep(Duration::from_millis(delay_ms)).await;
380
381            let result = self
382                .api
383                .messages_send(
384                    user_id, message, None, None, None, None, None, false, false, None,
385                )
386                .await;
387
388            results.push((user_id, result));
389        }
390
391        Ok(results)
392    }
393
394    /// Get conversation history
395    pub async fn get_history(&self, peer_id: i64, limit: i32) -> VkResult<Vec<serde_json::Value>> {
396        let response = self
397            .api
398            .messages_get_history(peer_id, 0, limit, None, true)
399            .await?;
400
401        let items = response["response"]["items"]
402            .as_array()
403            .ok_or_else(|| VkError::InvalidResponse("Expected array in response".to_string()))?
404            .clone();
405
406        Ok(items)
407    }
408
409    /// Get user information
410    pub async fn get_user_info(
411        &self,
412        user_id: i64,
413        fields: Option<&str>,
414    ) -> VkResult<serde_json::Value> {
415        let users = self.api.users_get(&[user_id], fields, None).await?;
416
417        users["response"]
418            .as_array()
419            .and_then(|arr| arr.first())
420            .cloned()
421            .ok_or_else(|| VkError::InvalidResponse("User not found".to_string()))
422    }
423
424    /// Set typing indicator
425    pub async fn set_typing(&self, peer_id: i64, user_id: Option<i64>) -> VkResult<()> {
426        self.api
427            .messages_set_activity(peer_id, user_id, "typing")
428            .await?;
429        Ok(())
430    }
431
432    /// Mark messages as read
433    pub async fn mark_as_read(&self, peer_id: i64, start_message_id: Option<i64>) -> VkResult<()> {
434        self.api
435            .messages_mark_as_read(peer_id, start_message_id)
436            .await?;
437        Ok(())
438    }
439}