tgbot 0.44.0

A Telegram Bot library
Documentation
#![allow(missing_docs)]
use std::{
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    time::{Duration, Instant},
};

use dotenvy::dotenv;
use mockito::{Matcher, Server};
use serde_json::json;
use tgbot::{
    api::Client,
    handler::{LongPoll, LongPollOptions, UpdateHandler},
    types::Update,
};
use tokio::{spawn, sync::Mutex, time::sleep};

struct Handler {
    updates: Arc<Mutex<Vec<Update>>>,
}

impl UpdateHandler for Handler {
    async fn handle(&self, update: Update) {
        let mut updates = self.updates.lock().await;
        updates.push(update);
    }
}

#[tokio::test]
async fn longpoll() {
    dotenv().ok();
    env_logger::init();
    let mut server = Server::new_async().await;
    server
        .mock("POST", "/bot-token/getUpdates")
        .match_body(Matcher::PartialJson(json!({
            "limit": 100,
            "timeout": 10,
            "allowed_updates": []
        })))
        .with_body(
            serde_json::to_vec(&json!({
                "ok": true,
                "result": [
                    {
                        "update_id": 1,
                        "message": {
                            "message_id": 1,
                            "date": 0,
                            "from": {
                                "id": 1,
                                "is_bot": false,
                                "first_name": "test"
                            },
                            "chat": {
                                "id": 1,
                                "type": "private",
                                "first_name": "test"
                            },
                            "text": "test"
                        }
                    },
                    {
                        "update_id": 1,
                        "unknown": {
                            "description": "unknown updates should be parsed",
                        }
                    }
                ]
            }))
            .unwrap(),
        )
        .create();
    let client = Client::new("-token").unwrap().with_host(server.url());
    let updates = Arc::new(Mutex::new(Vec::new()));
    let handler = Handler {
        updates: updates.clone(),
    };
    let poll = LongPoll::new(client, handler);
    let handle = poll.get_handle();
    let wait_updates = updates.clone();
    spawn(async move {
        let now = Instant::now();
        while wait_updates.lock().await.is_empty() {
            if now.elapsed().as_secs() >= 2 {
                break;
            }
            sleep(Duration::from_millis(100)).await;
        }
        handle.shutdown().await
    });
    poll.run().await;
    assert!(!updates.lock().await.is_empty())
}

struct BoundedHandler {
    active: Arc<AtomicUsize>,
    max_active: Arc<AtomicUsize>,
}

impl UpdateHandler for BoundedHandler {
    async fn handle(&self, _update: Update) {
        let count = self.active.fetch_add(1, Ordering::AcqRel) + 1;
        self.max_active.fetch_max(count, Ordering::AcqRel);

        sleep(Duration::from_millis(20)).await;

        self.active.fetch_sub(1, Ordering::AcqRel);
    }
}

#[tokio::test]
async fn longpoll_concurrency_limit() {
    let mut server = Server::new_async().await;

    let updates: Vec<serde_json::Value> = (0..10)
        .map(|i| {
            json!({
                "update_id": i,
                "message": {
                    "message_id": 1,
                    "date": 0,
                    "from": {"id": 1, "is_bot": false, "first_name": "test"},
                    "chat": {"id": 1, "type": "private", "first_name": "test"},
                    "text": "test"
                }
            })
        })
        .collect();

    server
        .mock("POST", "/bot-token/getUpdates")
        .with_body(
            serde_json::to_vec(&json!({
                "ok": true,
                "result": updates
            }))
            .unwrap(),
        )
        .create();

    let client = Client::new("-token").unwrap().with_host(server.url());
    let max_active = Arc::new(AtomicUsize::default());
    let handler = BoundedHandler {
        active: Default::default(),
        max_active: max_active.clone(),
    };

    let limit = 3;
    let options = LongPollOptions::default().with_concurrency_limit(limit);
    let poll = LongPoll::new(client, handler).with_options(options);
    let handle = poll.get_handle();

    spawn(async move {
        sleep(Duration::from_millis(200)).await;
        handle.shutdown().await;
    });

    poll.run().await;

    let max = max_active.load(Ordering::Relaxed);
    assert!(max <= limit, "Max concurrent was {}, but expected {}", max, limit);
}