fast_vk 0.2.0

Low-level VK API library designed for millions requests
Documentation
use crate::{VkError as VkError, VkResult, Error};
use serde::Serialize;
use serde_json::value::Value;

use std::sync::Arc;
use tokio::task::JoinHandle;
use tokio::time::sleep;

use super::execute_manager::Event;
use super::{Instance, Message, Method, Sender};
use vk_execute_compiler::ExecuteCompiler;

pub struct Worker {
    #[allow(dead_code)]
    id: usize,
    #[allow(dead_code)]
    thread: JoinHandle<()>,
}

impl Worker {
    pub fn new(
        id: usize,
        instance: Instance,
        receiver: crossbeam_channel::Receiver<Message>,
        event_sender: crossbeam_channel::Sender<Event>,
    ) -> Worker {
        let thread = tokio::spawn(async move {
            loop {
                event_sender.send(Event::FreeWorker).unwrap();

                match receiver.recv() {
                    Ok(message) => match message {
                        Message::NewMethod(method, sender) => {
                            Worker::handle_method(method, sender, &instance)
                        }
                        Message::NewExecute(methods, senders) => {
                            Worker::handle_execute(methods, senders, &instance)
                        }
                        Message::Terminate => {
                            break;
                        }
                    },
                    Err(e) => {
                        panic!("{e}");
                    }
                }

                sleep(instance.time_between_requests).await;
            }
        });

        Worker {
            thread,
            id,
        }
    }

    fn handle_method(
        method: Method,
        sender: Sender,
        instance: &Instance,
    ) {
        let url = format!("{}/method/{}", &instance.api_url, method.name);
        let mut req = instance.client
            .post(url)
            .header("Content-Length", 0)
            // .query(&method.params)
            .query(&[
                ("access_token", &instance.token),
            ])
            .query(&[
                ("v", &instance.api_version),
            ])
            .build()
            .unwrap();
        
        {
            let mut pairs = req.url_mut().query_pairs_mut();
            let serializer = comma_serde_urlencoded::Serializer::new(&mut pairs);
            method.params.serialize(serializer).unwrap();
        }

        let req = instance.client.execute(req);
        
        tokio::spawn(async move {
            let response = req.await;

            let resp = match response {
                Ok(response) => match response.json::<VkResult<Value>>().await {
                    Ok(json) => json.into(),
                    Err(error) => {
                        Err(Error::Custom(error.into()))
                    }
                },
                Err(error) => Err(Error::Custom(error.into())),
            };

            sender
                .send(resp)
                .unwrap();
        });
    }

    fn handle_execute(
        methods: Vec<Method>,
        senders: Vec<Sender>,
        instance: &Instance,
    ) {
        let execute = ExecuteCompiler::compile(methods);

        let url = format!("{}/method/execute", &instance.api_url);
        let req = instance.client
            .post(url)
            .header("Content-Length", 0)
            .query(&[("code", execute)])
            .query(&[("access_token", &instance.token)])
            .query(&[("v", "5.103")])
            .send();

        tokio::spawn(async move {
            let mut raw_response = match req.await {
                Ok(response) => response.json().await.unwrap(),
                Err(error) => {
                    let error = Arc::new(Error::Custom(error.into()));

                    for sender in senders {
                        sender.send(Err(Error::Arc(Arc::clone(&error)))).unwrap();
                    }

                    return;
                }
            };

            let execute_errors_raw = if let Value::Object(ref mut map) = raw_response {
                map.remove("execute_errors")
            } else {
                None
            };

            let mut execute_errors: Vec<VkError> = Vec::new();

            if let Some(execute_errors_value) = execute_errors_raw {
                execute_errors = serde_json::from_value(execute_errors_value).unwrap();
            }

            let response: Result<Value, VkError> = serde_json::from_value::<VkResult<Value>>(raw_response).unwrap().into();

            match response {
                Ok(responses) => {
                    let responses: Vec<Value> = serde_json::from_value(responses).unwrap();

                    for (sender, response) in senders.into_iter().zip(responses.into_iter()) {
                        if let Some(bool) = response.as_bool() {
                            if bool == false {
                                sender
                                    .send(Err(Error::VK(execute_errors.remove(0))))
                                    .unwrap();
                            }
                        } else {
                            sender.send(Ok(response)).unwrap();
                        }
                    }
                }
                Err(error) => {
                    let error = Arc::new(Error::VK(error));

                    for sender in senders {
                        sender.send(Err(Error::Arc(Arc::clone(&error)))).unwrap();
                    }
                }
            }
        });
    }
}