1use crate::api::{LongPollServer, VkApi};
2use crate::error::{VkError, VkResult};
3use crate::handler::MessageHandler;
4use crate::models::{Event, Update};
5use crate::vk_error;
6use futures::stream::{self, StreamExt};
7use log::{info, warn};
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::time::sleep;
11
/// Tunable settings that govern how the bot polls for events and reacts to failures.
#[derive(Debug, Clone)]
pub struct VkBotConfig {
    /// Value sent as the `wait` query parameter of every Long Poll request.
    pub long_poll_timeout: u64,
    /// Upper bound on how many updates are processed by handlers at the same time.
    pub max_concurrent_handlers: usize,
    /// When `false`, the first Long Poll error terminates the run loop.
    pub auto_reconnect: bool,
    /// Pause, in seconds, before the bot tries to re-initialise after a failed poll.
    pub reconnect_delay: u64,
    /// Number of consecutive failed polls tolerated before the run loop gives up.
    pub max_reconnect_attempts: u32,
    /// When `true`, the size of every non-empty batch of updates is logged.
    pub log_updates: bool,
}
28
29impl Default for VkBotConfig {
30 fn default() -> Self {
31 Self {
32 long_poll_timeout: 25,
33 max_concurrent_handlers: 10,
34 auto_reconnect: true,
35 reconnect_delay: 5,
36 max_reconnect_attempts: 10,
37 log_updates: false,
38 }
39 }
40}
41
42#[derive(Debug, Default)]
44pub struct VkBotBuilder {
45 token: Option<String>,
46 group_id: Option<i64>,
47 config: VkBotConfig,
48}
49
50impl VkBotBuilder {
51 pub fn new() -> Self {
53 Self::default()
54 }
55
56 pub fn token<T: Into<String>>(mut self, token: T) -> Self {
58 self.token = Some(token.into());
59 self
60 }
61
62 pub fn group_id(mut self, group_id: i64) -> Self {
64 self.group_id = Some(group_id);
65 self
66 }
67
68 pub fn long_poll_timeout(mut self, timeout: u64) -> Self {
70 self.config.long_poll_timeout = timeout;
71 self
72 }
73
74 pub fn max_concurrent_handlers(mut self, max: usize) -> Self {
76 self.config.max_concurrent_handlers = max;
77 self
78 }
79
80 pub fn auto_reconnect(mut self, enable: bool) -> Self {
82 self.config.auto_reconnect = enable;
83 self
84 }
85
86 pub fn reconnect_delay(mut self, delay: u64) -> Self {
88 self.config.reconnect_delay = delay;
89 self
90 }
91
92 pub fn max_reconnect_attempts(mut self, attempts: u32) -> Self {
94 self.config.max_reconnect_attempts = attempts;
95 self
96 }
97
98 pub fn log_updates(mut self, enable: bool) -> Self {
100 self.config.log_updates = enable;
101 self
102 }
103
104 pub fn build(self) -> VkResult<VkBot> {
106 let token = self
107 .token
108 .ok_or_else(|| VkError::ConfigError("Token is required".to_string()))?;
109 let group_id = self
110 .group_id
111 .ok_or_else(|| VkError::ConfigError("Group ID is required".to_string()))?;
112
113 let api = VkApi::new(&token)?;
114
115 Ok(VkBot {
116 api: Arc::new(api),
117 group_id,
118 config: Arc::new(self.config),
119 long_poll_server: None,
120 handlers: Arc::new(Vec::new()),
121 })
122 }
123}
124
125pub struct VkBot {
127 api: Arc<VkApi>,
128 group_id: i64,
129 config: Arc<VkBotConfig>,
130 long_poll_server: Option<LongPollServer>,
131 handlers: Arc<Vec<Box<dyn MessageHandler>>>,
132}
133
134use std::fmt;
135impl fmt::Debug for VkBot {
136 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137 f.debug_struct("VkBot")
138 .field("api", &self.api)
139 .field("group_id", &self.group_id)
140 .field("config", &self.config)
141 .field("long_poll_server", &self.long_poll_server)
142 .field(
143 "handlers",
144 &format_args!("Arc<Vec<Box<dyn MessageHandler>>>"),
145 )
146 .finish()
147 }
148}
149
150impl VkBot {
151 pub fn new<T: Into<String>>(token: T, group_id: i64) -> VkResult<Self> {
153 VkBotBuilder::new().token(token).group_id(group_id).build()
154 }
155
156 pub fn builder() -> VkBotBuilder {
158 VkBotBuilder::new()
159 }
160
161 pub fn add_handler<H>(&mut self, handler: H)
163 where
164 H: MessageHandler + 'static,
165 {
166 let handlers =
167 Arc::get_mut(&mut self.handlers).expect("Cannot add handler while bot is running");
168 handlers.push(Box::new(handler));
169 }
170
171 pub async fn init(&mut self) -> VkResult<()> {
173 info!("Initializing bot for group {}...", self.group_id);
174
175 self.long_poll_server = Some(self.api.groups_get_long_poll_server(self.group_id).await?);
176
177 info!("Long Poll server obtained");
178
179 self.get_group_info().await?;
181
182 Ok(())
183 }
184
185 async fn get_group_info(&self) -> VkResult<()> {
187 let response = self
188 .api
189 .groups_get_by_id(&[self.group_id], Some("name,description,members_count"))
190 .await?;
191
192 if let Some(group) = response["response"].as_array().and_then(|arr| arr.first()) {
193 let name = group["name"].as_str().unwrap_or("Unknown");
194 let members = group["members_count"].as_i64().unwrap_or(0);
195 info!("Group: {} (members: {})", name, members);
196 }
197
198 Ok(())
199 }
200
201 async fn long_poll(&mut self) -> VkResult<Vec<Update>> {
203 let server = self
204 .long_poll_server
205 .as_ref()
206 .ok_or_else(|| VkError::ConfigError("Long Poll server not initialized".to_string()))?;
207
208 let url = format!(
209 "{}?act=a_check&key={}&ts={}&wait={}",
210 server.server, server.key, server.ts, self.config.long_poll_timeout
211 );
212
213 let response = reqwest::get(&url).await?;
214
215 if !response.status().is_success() {
216 return Err(VkError::NetworkError(format!(
217 "Long Poll HTTP error: {}",
218 response.status()
219 )));
220 }
221
222 let json_response: serde_json::Value = response.json().await?;
223
224 if let Some(failed) = json_response["failed"].as_i64() {
226 match failed {
227 1 => {
228 if let Some(ts) = json_response["ts"].as_str()
230 && let Some(ref mut server) = self.long_poll_server
231 {
232 server.ts = ts.to_string();
233 }
234 return Ok(Vec::new());
235 }
236 2 | 3 => {
237 warn!("Long Poll failed {}, getting new server...", failed);
239 self.long_poll_server =
240 Some(self.api.groups_get_long_poll_server(self.group_id).await?);
241 return Ok(Vec::new());
242 }
243 _ => {
244 return Err(VkError::api_error(
245 failed as i32,
246 format!("Long Poll failed with code: {}", failed),
247 ));
248 }
249 }
250 }
251
252 if let Some(ts) = json_response["ts"].as_str()
254 && let Some(ref mut server) = self.long_poll_server
255 {
256 server.ts = ts.to_string();
257 }
258
259 let updates = if let Some(updates_array) = json_response["updates"].as_array() {
261 updates_array
262 .iter()
263 .filter_map(|update| serde_json::from_value(update.clone()).ok())
264 .collect()
265 } else {
266 Vec::new()
267 };
268
269 if self.config.log_updates && !updates.is_empty() {
270 info!("Received {} updates", updates.len());
271 }
272
273 Ok(updates)
274 }
275
276 async fn handle_updates(&self, updates: &[Update]) {
278 let handlers = self.handlers.clone();
279 let api = self.api.clone();
280
281 let futures = updates.iter().map(|update| {
283 let handlers = handlers.clone();
284 let api = api.clone();
285 async move {
286 let event = Event::from_update(update);
287
288 for handler in handlers.iter() {
290 if let Err(e) = handler.handle(&event, &api).await {
291 vk_error!("Handler error: {}", e);
292 }
293 }
294 }
295 });
296
297 stream::iter(futures)
298 .buffer_unordered(self.config.max_concurrent_handlers)
299 .for_each(|_| async {})
300 .await;
301 }
302
303 pub async fn run(&mut self) -> VkResult<()> {
305 self.init().await?;
306
307 info!("Bot started. Waiting for events...");
308
309 let mut reconnect_attempts = 0;
310
311 loop {
312 match self.long_poll().await {
313 Ok(updates) => {
314 reconnect_attempts = 0; if !updates.is_empty() {
317 self.handle_updates(&updates).await;
318 }
319 }
320 Err(e) => {
321 vk_error!("Long Poll error: {}", e);
322
323 if !self.config.auto_reconnect {
324 return Err(e);
325 }
326
327 reconnect_attempts += 1;
328
329 if reconnect_attempts > self.config.max_reconnect_attempts {
330 vk_error!("Maximum reconnection attempts reached");
331 return Err(e);
332 }
333
334 warn!(
335 "Reconnecting in {} seconds... (attempt {}/{})",
336 self.config.reconnect_delay,
337 reconnect_attempts,
338 self.config.max_reconnect_attempts
339 );
340
341 sleep(Duration::from_secs(self.config.reconnect_delay)).await;
342
343 if let Err(e) = self.init().await {
345 vk_error!("Failed to reinitialize: {}", e);
346 } else {
347 info!("Reconnected successfully");
348 }
349 }
350 }
351 }
352 }
353
354 pub fn api(&self) -> &VkApi {
356 &self.api
357 }
358
359 pub fn group_id(&self) -> i64 {
361 self.group_id
362 }
363
364 pub fn config(&self) -> &VkBotConfig {
366 &self.config
367 }
368
369 pub async fn broadcast(
371 &self,
372 message: &str,
373 user_ids: &[i64],
374 delay_ms: u64,
375 ) -> VkResult<Vec<(i64, VkResult<i64>)>> {
376 let mut results = Vec::new();
377
378 for &user_id in user_ids {
379 sleep(Duration::from_millis(delay_ms)).await;
380
381 let result = self
382 .api
383 .messages_send(
384 user_id, message, None, None, None, None, None, false, false, None,
385 )
386 .await;
387
388 results.push((user_id, result));
389 }
390
391 Ok(results)
392 }
393
394 pub async fn get_history(&self, peer_id: i64, limit: i32) -> VkResult<Vec<serde_json::Value>> {
396 let response = self
397 .api
398 .messages_get_history(peer_id, 0, limit, None, true)
399 .await?;
400
401 let items = response["response"]["items"]
402 .as_array()
403 .ok_or_else(|| VkError::InvalidResponse("Expected array in response".to_string()))?
404 .clone();
405
406 Ok(items)
407 }
408
409 pub async fn get_user_info(
411 &self,
412 user_id: i64,
413 fields: Option<&str>,
414 ) -> VkResult<serde_json::Value> {
415 let users = self.api.users_get(&[user_id], fields, None).await?;
416
417 users["response"]
418 .as_array()
419 .and_then(|arr| arr.first())
420 .cloned()
421 .ok_or_else(|| VkError::InvalidResponse("User not found".to_string()))
422 }
423
424 pub async fn set_typing(&self, peer_id: i64, user_id: Option<i64>) -> VkResult<()> {
426 self.api
427 .messages_set_activity(peer_id, user_id, "typing")
428 .await?;
429 Ok(())
430 }
431
432 pub async fn mark_as_read(&self, peer_id: i64, start_message_id: Option<i64>) -> VkResult<()> {
434 self.api
435 .messages_mark_as_read(peer_id, start_message_id)
436 .await?;
437 Ok(())
438 }
439}