use std::sync::Arc;
use std::{env, time::Duration};
use grammers_client::Client;
use grammers_client::client::UpdatesConfiguration;
use grammers_client::update::Update;
use grammers_mtsender::SenderPool;
use grammers_session::storages::SqliteSession;
use simple_logger::SimpleLogger;
use tokio::task::JoinSet;
use tokio::{runtime, time::sleep};
/// Result type returned by `async_main` and `main`: unit on success, otherwise any
/// boxed error that is `Send + Sync`.
type Result = std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>;
/// Path passed to `SqliteSession::open` when the bot starts.
const SESSION_FILE: &str = "echo.session";
/// Echoes a received message back to the peer it came from.
///
/// Only `Update::NewMessage` updates whose message is not outgoing are handled;
/// every other update is ignored. A message whose text is exactly `"slow"` is
/// answered after a five second pause.
///
/// A failure to send the reply is printed rather than propagated.
///
/// # Panics
///
/// Panics if `message.peer()` has nothing to unwrap, or if either `unwrap` applied
/// to the awaited result of `peer.to_ref()` fails.
async fn handle_update(client: Client, update: Update) {
    // Bail out early for anything that is not a freshly received message.
    let message = match update {
        Update::NewMessage(message) if !message.outgoing() => message,
        _ => return,
    };

    let peer = message.peer().unwrap();

    // Fall back to the numeric peer id when the peer has no display name.
    let fallback = format!("id {}", message.peer_id());
    let label = peer.name().unwrap_or(&fallback);
    println!("Responding to {}", label);

    let text = message.text();
    if text == "slow" {
        sleep(Duration::from_secs(5)).await;
    }

    let target = peer.to_ref().await.unwrap().unwrap();
    if let Err(e) = client.send_message(target, text).await {
        println!("Failed to respond! {e}");
    }
}
/// Runs the echo bot: opens the session, signs in if needed, and dispatches
/// updates to [`handle_update`] until interrupted.
///
/// The API id and hash are taken at compile time from the `TG_ID` and `TG_HASH`
/// environment variables (via `env!`); the bot token is the first command-line
/// argument.
///
/// Each update is handled in its own task inside a `JoinSet`. The loop ends on
/// Ctrl+C or on the first error from the update stream. In both cases the full
/// shutdown sequence runs before returning: the update state is synced, the
/// sender pool is asked to quit and awaited, and all remaining handler tasks are
/// joined.
///
/// # Errors
///
/// Returns an error if opening the session, the authorization check, signing in,
/// or creating the update stream fails. After the loop has started, an error from
/// the update stream or from syncing the update state is returned only once the
/// shutdown sequence has completed; the stream error takes precedence.
///
/// # Panics
///
/// Panics if the logger cannot be initialised, if `TG_ID` does not parse, or if
/// the token argument is missing.
async fn async_main() -> Result {
    SimpleLogger::new()
        .with_level(log::LevelFilter::Debug)
        .init()
        .unwrap();

    let api_id = env!("TG_ID").parse().expect("TG_ID invalid");
    let token = env::args().nth(1).expect("token missing");

    let session = Arc::new(SqliteSession::open(SESSION_FILE).await?);

    let SenderPool {
        runner,
        updates,
        handle,
    } = SenderPool::new(Arc::clone(&session), api_id);
    let client = Client::new(handle.clone());
    let pool_task = tokio::spawn(runner.run());

    if !client.is_authorized().await? {
        println!("Signing in...");
        client.bot_sign_in(&token, env!("TG_HASH")).await?;
        println!("Signed in!");
    }

    println!("Waiting for messages...");
    let mut handler_tasks = JoinSet::new();
    let mut updates = client
        .stream_updates(
            updates,
            UpdatesConfiguration {
                catch_up: true,
                ..Default::default()
            },
        )
        .await?;

    // The loop yields its outcome instead of returning directly: an early return
    // here would drop the `JoinSet` (aborting in-flight handlers) and skip syncing
    // the update state and shutting down the sender pool.
    let outcome: Result = loop {
        // Reap handlers that have already finished so the set does not keep growing.
        while handler_tasks.try_join_next().is_some() {}

        tokio::select! {
            _ = tokio::signal::ctrl_c() => break Ok(()),
            update = updates.next() => {
                match update {
                    Ok(update) => {
                        handler_tasks.spawn(handle_update(client.clone(), update));
                    }
                    Err(e) => break Err(e.into()),
                }
            }
        }
    };

    println!("Saving session file...");
    // Defer reporting a sync failure until the rest of the shutdown has run.
    let saved = updates.sync_update_state().await;

    println!("Gracefully closing connection to notify all pending handlers...");
    handle.quit();
    // Best effort: the join result of the pool task is intentionally ignored.
    let _ = pool_task.await;

    println!("Waiting for any slow handlers to finish...");
    while handler_tasks.join_next().await.is_some() {}

    // Report the update-stream error first, since it is what ended the loop.
    outcome?;
    saved?;
    Ok(())
}
/// Program entry point: builds a current-thread Tokio runtime with all drivers
/// enabled and blocks on `async_main`, returning its result.
///
/// # Panics
///
/// Panics if the runtime cannot be built.
fn main() -> Result {
    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async_main())
}