use std::collections::HashSet;
use std::sync::{Mutex, OnceLock};
use axum::body::Bytes;
use axum::extract::State;
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::Json;
use serde_json::{json, Value};
use tandem_channels::signing::verify_telegram_secret_token;
use tandem_channels::telegram_keyboards::{parse_callback_data, ParsedCallbackData};
use crate::AppState;
const DEDUP_CAP: usize = 4096;
static SEEN_UPDATES: OnceLock<Mutex<DedupRing>> = OnceLock::new();
fn dedup_ring() -> &'static Mutex<DedupRing> {
SEEN_UPDATES.get_or_init(|| Mutex::new(DedupRing::new()))
}
struct DedupRing {
set: HashSet<i64>,
order: std::collections::VecDeque<i64>,
}
impl DedupRing {
fn new() -> Self {
Self {
set: HashSet::with_capacity(DEDUP_CAP),
order: std::collections::VecDeque::with_capacity(DEDUP_CAP),
}
}
fn record_new(&mut self, key: i64) -> bool {
if self.set.contains(&key) {
return false;
}
if self.order.len() >= DEDUP_CAP {
if let Some(oldest) = self.order.pop_front() {
self.set.remove(&oldest);
}
}
self.set.insert(key);
self.order.push_back(key);
true
}
}
pub(crate) async fn telegram_interactions(
State(state): State<AppState>,
headers: HeaderMap,
body: Bytes,
) -> Response {
let secret = match read_telegram_secret(&state).await {
Some(s) => s,
None => return reject_unauthorized("telegram webhook secret not configured"),
};
let header_value = headers
.get("x-telegram-bot-api-secret-token")
.and_then(|v| v.to_str().ok());
if let Err(error) = verify_telegram_secret_token(header_value, &secret) {
tracing::warn!(
target: "tandem_server::telegram_interactions",
?error,
"rejecting Telegram update with bad/missing secret token"
);
return reject_unauthorized(&error.to_string());
}
let update: Value = match serde_json::from_slice(&body) {
Ok(v) => v,
Err(err) => return reject_bad_request(&format!("update is not JSON: {err}")),
};
if let Some(update_id) = update.get("update_id").and_then(Value::as_i64) {
let mut guard = dedup_ring().lock().expect("dedup mutex poisoned");
if !guard.record_new(update_id) {
tracing::debug!(
target: "tandem_server::telegram_interactions",
update_id,
"duplicate Telegram update — already processed"
);
return ok_empty();
}
}
let Some(callback_query) = update.get("callback_query") else {
return ok_empty();
};
let callback_data = match callback_query.get("data").and_then(Value::as_str) {
Some(d) => d,
None => return reject_bad_request("callback_query missing data"),
};
let parsed = match parse_callback_data(callback_data) {
Some(p) => p,
None => return reject_bad_request(&format!("unrecognized callback_data: {callback_data}")),
};
if parsed.was_truncated {
tracing::warn!(
target: "tandem_server::telegram_interactions",
"callback_data was truncated; full ID resolution lands in W5"
);
return reject_bad_request("callback identifier truncated; cache resolution not yet wired");
}
let user_id = callback_query
.pointer("/from/id")
.and_then(Value::as_i64)
.map(|id| id.to_string())
.unwrap_or_else(|| "unknown".to_string());
match parsed.action.as_str() {
"approve" | "cancel" => dispatch_decision(state, parsed, &user_id, None).await,
"rework" => {
tracing::info!(
target: "tandem_server::telegram_interactions",
run_id = %parsed.run_id,
"rework button tapped; force-reply capture lands in W5"
);
ok_empty()
}
other => reject_bad_request(&format!("unknown action: {other}")),
}
}
async fn dispatch_decision(
state: AppState,
parsed: ParsedCallbackData,
user_id: &str,
reason: Option<String>,
) -> Response {
let input = crate::http::routines_automations::AutomationV2GateDecisionInput {
decision: parsed.action.clone(),
reason,
};
let result = crate::http::routines_automations::automations_v2_run_gate_decide(
State(state),
axum::extract::Path(parsed.run_id.clone()),
Json(input),
)
.await;
match result {
Ok(_) => {
tracing::info!(
target: "tandem_server::telegram_interactions",
run_id = %parsed.run_id,
user = %user_id,
action = %parsed.action,
"Telegram interaction decided gate"
);
ok_empty()
}
Err((status, body)) => {
tracing::warn!(
target: "tandem_server::telegram_interactions",
run_id = %parsed.run_id,
status = %status,
body = %body.0,
"gate-decide returned non-success"
);
ok_empty()
}
}
}
async fn read_telegram_secret(state: &AppState) -> Option<String> {
let effective = state.config.get_effective_value().await;
effective
.pointer("/channels/telegram/webhook_secret_token")
.and_then(Value::as_str)
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
}
fn reject_unauthorized(reason: &str) -> Response {
(
StatusCode::UNAUTHORIZED,
Json(json!({ "error": "Unauthorized", "reason": reason })),
)
.into_response()
}
fn reject_bad_request(reason: &str) -> Response {
(
StatusCode::BAD_REQUEST,
Json(json!({ "error": "BadRequest", "reason": reason })),
)
.into_response()
}
fn ok_empty() -> Response {
(StatusCode::OK, Json(json!({}))).into_response()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn dedup_ring_returns_false_on_repeat_update_id() {
let mut ring = DedupRing::new();
assert!(ring.record_new(100));
assert!(!ring.record_new(100));
assert!(ring.record_new(101));
}
#[test]
fn dedup_ring_evicts_oldest_at_cap() {
let mut ring = DedupRing::new();
for i in 0..(DEDUP_CAP as i64) {
ring.record_new(i);
}
assert!(!ring.record_new(0));
ring.record_new(DEDUP_CAP as i64);
assert!(ring.record_new(0));
}
#[test]
fn callback_data_format_round_trips() {
let raw = "tdm:approve:auto-v2-run-abc:send_email";
let parsed = parse_callback_data(raw).expect("parses");
assert_eq!(parsed.action, "approve");
assert_eq!(parsed.run_id, "auto-v2-run-abc");
assert_eq!(parsed.node_id, "send_email");
assert!(!parsed.was_truncated);
}
}