use std::error::Error;
use std::fmt::{Display, Formatter};
use std::io;
use std::process::Command;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
use crate::server::serve;
use crate::telegram::{
parse_get_updates_response, TelegramPollingConfig, TelegramPollingError, TelegramPollingReply,
};
#[derive(Debug)]
pub enum TelegramPollingRuntimeError {
Transport(String),
Polling(TelegramPollingError),
}
impl Display for TelegramPollingRuntimeError {
fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Transport(message) => write!(formatter, "telegram transport error: {message}"),
Self::Polling(error) => write!(formatter, "telegram polling error: {error}"),
}
}
}
impl Error for TelegramPollingRuntimeError {}
impl From<TelegramPollingError> for TelegramPollingRuntimeError {
fn from(value: TelegramPollingError) -> Self {
Self::Polling(value)
}
}
pub trait TelegramTransport {
fn get_updates(&mut self, url: &str) -> Result<String, TelegramPollingRuntimeError>;
fn send_message(
&mut self,
url: &str,
body: &str,
) -> Result<String, TelegramPollingRuntimeError>;
}
pub struct CurlTelegramTransport {
http_timeout_seconds: u32,
}
impl CurlTelegramTransport {
#[must_use]
pub const fn new(http_timeout_seconds: u32) -> Self {
Self {
http_timeout_seconds,
}
}
fn run_curl(args: &[&str]) -> Result<String, TelegramPollingRuntimeError> {
let output = Command::new("curl").args(args).output().map_err(|error| {
if error.kind() == io::ErrorKind::NotFound {
TelegramPollingRuntimeError::Transport(String::from(
"curl is required for the Telegram polling client; install curl and retry",
))
} else {
TelegramPollingRuntimeError::Transport(error.to_string())
}
})?;
if output.status.success() {
Ok(String::from_utf8_lossy(&output.stdout).into_owned())
} else {
let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
Err(TelegramPollingRuntimeError::Transport(format!(
"curl exited with {status}: {stderr}",
status = output.status,
)))
}
}
}
impl TelegramTransport for CurlTelegramTransport {
fn get_updates(&mut self, url: &str) -> Result<String, TelegramPollingRuntimeError> {
let timeout = self.http_timeout_seconds.to_string();
let args = [
"--silent",
"--show-error",
"--fail",
"--max-time",
&timeout,
url,
];
Self::run_curl(&args)
}
fn send_message(
&mut self,
url: &str,
body: &str,
) -> Result<String, TelegramPollingRuntimeError> {
let timeout = self.http_timeout_seconds.to_string();
let args = [
"--silent",
"--show-error",
"--fail",
"--max-time",
&timeout,
"-H",
"content-type: application/json",
"-X",
"POST",
"-d",
body,
url,
];
Self::run_curl(&args)
}
}
pub fn run_telegram_polling(
config: &TelegramPollingConfig,
initial_offset: Option<i64>,
cancellation: Arc<AtomicBool>,
) -> Result<(), TelegramPollingRuntimeError> {
let mut transport = CurlTelegramTransport::new(config.http_timeout_seconds());
run_telegram_polling_with_transport(config, initial_offset, cancellation, &mut transport)
}
#[allow(clippy::needless_pass_by_value)]
pub fn run_telegram_polling_with_transport<T: TelegramTransport>(
config: &TelegramPollingConfig,
initial_offset: Option<i64>,
cancellation: Arc<AtomicBool>,
transport: &mut T,
) -> Result<(), TelegramPollingRuntimeError> {
eprintln!(
"formal-ai telegram polling started: api_base={} timeout={}s limit={}",
config.api_base, config.timeout_seconds, config.limit
);
let mut offset = initial_offset;
while !cancellation.load(Ordering::Relaxed) {
let updates_url = config.get_updates_url(offset);
let body = match transport.get_updates(&updates_url) {
Ok(body) => body,
Err(error) => {
eprintln!("telegram-poll: getUpdates failed: {error}");
sleep_with_cancellation(Duration::from_secs(1), &cancellation);
continue;
}
};
let batch = match parse_get_updates_response(&body) {
Ok(batch) => batch,
Err(error) => {
eprintln!("telegram-poll: invalid getUpdates response: {error}");
sleep_with_cancellation(Duration::from_secs(1), &cancellation);
continue;
}
};
if let Some(next_offset) = batch.next_offset {
offset = Some(next_offset);
}
for reply in &batch.replies {
send_reply(config, transport, reply);
}
}
eprintln!("formal-ai telegram polling stopped");
Ok(())
}
pub fn run_telegram_webhook_server(address: &str) -> io::Result<()> {
serve(address)
}
fn send_reply<T: TelegramTransport>(
config: &TelegramPollingConfig,
transport: &mut T,
reply: &TelegramPollingReply,
) {
let send_url = config.send_message_url();
let body = reply.to_send_message_body();
match transport.send_message(&send_url, &body) {
Ok(_) => {
eprintln!(
"telegram-poll: sent reply to chat_id={} (message_id={})",
reply.chat_id, reply.reply_parameters.message_id
);
}
Err(error) => {
eprintln!(
"telegram-poll: sendMessage to chat_id={} failed: {error}",
reply.chat_id
);
}
}
}
fn sleep_with_cancellation(total: Duration, cancellation: &AtomicBool) {
let step = Duration::from_millis(200);
let mut remaining = total;
while remaining > Duration::ZERO {
if cancellation.load(Ordering::Relaxed) {
return;
}
let sleep_for = std::cmp::min(step, remaining);
sleep(sleep_for);
remaining = remaining.saturating_sub(sleep_for);
}
}