pub mod api_requests;
pub mod requests;
pub mod structs;
use log::{debug, info, log_enabled};
use requests::*;
use std::time::Duration;
use std::{panic, sync::Arc};
use tokio::time::interval;
use tokio::{select, sync::Mutex};
use tokio::{
sync::mpsc::{Receiver, Sender, channel},
time::Instant,
};
use crate::structs::config::Config;
use crate::structs::context::{UnifyContext, UnifyedContext};
use crate::structs::middleware::MiddlewareChain;
use crate::structs::struct_to_vec::param;
use crate::structs::tg::TGGetUpdates;
use crate::structs::vk::{VKGetServerResponse, VKGetUpdates, VKTs};
async fn get_vk_updates(
server: &mut str,
key: &mut str,
ts: &mut String,
tx: &Sender<UnifyedContext>,
config: Arc<Config>,
) {
let get_updates = request(
server,
&config.vk_access_token,
vec![
param("act", "a_check"),
param("key", key.to_string()),
param("ts", ts.to_string()),
param("wait", "25"),
],
)
.await;
let empty_str = "".to_string();
let updates_str = get_updates.as_ref().unwrap_or(&empty_str);
let updates: VKGetUpdates = serde_json::from_str(updates_str).unwrap_or(VKGetUpdates {
ts: ts.clone(),
updates: Some(vec![]),
failed: Some(1),
});
*ts = updates.ts;
if updates.failed.is_some() {
if let Ok(vk_ts) = serde_json::from_str::<VKTs>(updates_str) {
*ts = vk_ts.ts;
}
return;
}
let vk_updates = updates.updates.unwrap_or_default();
debug!(
"[LONGPOLL] [VK] Got {} updates, processing",
vk_updates.len()
);
for update in vk_updates {
let unified = update.unify(config.clone());
tx.send(unified).await.unwrap();
}
}
async fn get_vk_settings(config: Arc<Config>) -> VKGetServerResponse {
let vk_group_id = config.vk_group_id.to_string();
let get_server = request(
"https://api.vk.com/method/groups.getLongPollServer",
&config.vk_access_token,
vec![
param("group_id", vk_group_id),
param("v", &config.vk_api_version),
],
)
.await
.unwrap();
let server: VKGetServerResponse = serde_json::from_str(&get_server).unwrap();
debug!("[LONGPOLL] [VK] Got longpoll server: {:?}", server);
server
}
async fn get_tg_updates(offset: &mut i64, tx: &Sender<UnifyedContext>, config: Arc<Config>) {
let get_updates = request(
&format!(
"https://api.telegram.org/{}/getUpdates",
config.tg_access_token
),
"",
vec![
param("timeout", "25"),
param("offset", offset.to_string()),
param("limit", "100"),
],
)
.await;
let updates: TGGetUpdates = serde_json::from_str(&get_updates.unwrap_or("".to_string()))
.unwrap_or(TGGetUpdates {
ok: false,
result: vec![],
});
debug!(
"[LONGPOLL] [TELEGRAM] Got {} updates, processing",
updates.result.len()
);
for update in updates.result {
let unified = update.unify(config.clone());
tx.send(unified).await.unwrap();
*offset = update.update_id + 1;
}
}
pub async fn start_longpoll_client(middleware: MiddlewareChain, config: Config) {
info!("Start getting updates...");
let config = Arc::new(config.check());
let vk_settings = get_vk_settings(config.clone()).await;
let mut server = vk_settings.response.server;
let mut key = vk_settings.response.key;
let mut ts = vk_settings.response.ts;
let mut offset: i64 = 0;
let (tx, rx): (Sender<UnifyedContext>, Receiver<UnifyedContext>) = channel(100);
let rx = Arc::new(Mutex::new(rx));
let middleware = Arc::new(middleware);
for _i in 0..4 {
let rx_clone = Arc::clone(&rx);
let middleware_clone = Arc::clone(&middleware);
tokio::task::spawn(async move {
loop {
if let Some(update) = rx_clone.lock().await.recv().await {
if log_enabled!(log::Level::Debug) {
let start_time = Instant::now();
middleware_clone.execute(update).await;
let end_time = Instant::now();
let elapsed_time = end_time.duration_since(start_time);
debug!("Processing time: {:?}", elapsed_time);
} else {
middleware_clone.execute(update).await
}
}
}
});
}
let mut interval = interval(Duration::from_secs(600));
loop {
let vk_task = get_vk_updates(&mut server, &mut key, &mut ts, &tx, config.clone());
let tg_task = get_tg_updates(&mut offset, &tx, config.clone());
select! {
_ = vk_task => {
},
_ = tg_task => {
},
_ = interval.tick() => {
let vk_settings = get_vk_settings(config.clone()).await;
server = vk_settings.response.server;
key = vk_settings.response.key;
ts = vk_settings.response.ts;
},
}
}
}