pub mod alerts;
pub mod commands;
pub mod formatter;
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use teloxide::prelude::*;
use teloxide::types::ParseMode;
use teloxide::utils::command::BotCommands;
use tokio_util::sync::CancellationToken;
use crate::client::{ChatMessage, CommandExecutor, CommandResult, TrustyCommand};
use alerts::{AlertConfig, LastSeen};
use commands::TelegramCommand;
use formatter::TelegramFormatter;
/// Shared per-chat conversation state: maps a chat id to that chat's message history.
/// Wrapped in `Arc<Mutex<..>>` so a single map can be handed to the update handlers.
type ChatHistories = Arc<Mutex<HashMap<i64, Vec<ChatMessage>>>>;
/// Reply returned by `llm_chat_reply` when the daemon's `llm_chat` call yields `Ok(None)`.
const LLM_NOT_CONFIGURED: &str =
"LLM chat not configured — set OPENROUTER_API_KEY in .env.local and enable the overseer";
/// Period of the session-alert tick in `run_alert_loop`.
const SESSION_POLL_INTERVAL: Duration = Duration::from_secs(10);
/// Period of the overseer-alert tick in `run_alert_loop`.
const OVERSEER_POLL_INTERVAL: Duration = Duration::from_secs(30);
/// Operator-supplied settings for the Telegram bot. The default leaves both fields `None`.
#[derive(Debug, Clone, Default)]
pub struct BotOptions {
/// When `Some`, only this Telegram user id passes `is_authorized`; `None` lets every user through.
pub allowed_user_id: Option<i64>,
/// When `Some`, `run` spawns the background alert loop and alerts are sent to this chat id.
pub alert_chat_id: Option<i64>,
}
/// Looks up `var_name`, preferring dotenv files over the process environment.
///
/// `.env.local` is consulted first, then `.env` (both relative to the current working
/// directory); the first non-empty value wins. Only when neither file provides the key is the
/// environment variable itself used, and an empty environment value counts as absent.
pub fn resolve_token(var_name: &str) -> Option<String> {
    [".env.local", ".env"]
        .iter()
        .find_map(|&file| read_dotenv_key(Path::new(file), var_name))
        .or_else(|| {
            std::env::var(var_name)
                .ok()
                .filter(|value| !value.is_empty())
        })
}
/// Reads `var_name` from the dotenv-style file at `path`.
///
/// Blank lines and lines starting with `#` are skipped. Each remaining line is split at its
/// first `=`; the key is trimmed and may carry a shell-style `export ` prefix
/// (`export KEY=value`), which is ignored for matching. The value is trimmed and stripped of
/// surrounding double and single quotes.
///
/// Returns the first non-empty value found for the key. A matching line with an empty value is
/// skipped so that a later assignment of the same key can still be picked up. Returns `None`
/// when the file cannot be read or no non-empty value exists.
fn read_dotenv_key(path: &Path, var_name: &str) -> Option<String> {
    let contents = std::fs::read_to_string(path).ok()?;
    contents.lines().find_map(|line| {
        let line = line.trim();
        if line.is_empty() || line.starts_with('#') {
            return None;
        }
        let (key, value) = line.split_once('=')?;
        let key = key.trim();
        // `export KEY=value` keeps the file sourceable from a shell; only treat `export` as a
        // prefix when whitespace follows it, so a key such as `exportFOO` is left alone.
        let exported = key
            .strip_prefix("export")
            .filter(|rest| rest.starts_with(char::is_whitespace))
            .map(str::trim_start);
        if key != var_name && exported != Some(var_name) {
            return None;
        }
        let value = value.trim().trim_matches('"').trim_matches('\'').trim();
        (!value.is_empty()).then(|| value.to_string())
    })
}
/// Decides whether `user_id` may use the bot.
///
/// With no `allowed_user_id` configured everyone is accepted, including updates that carry no
/// sender. Otherwise the sender must be present and equal to the configured id.
fn is_authorized(options: &BotOptions, user_id: Option<i64>) -> bool {
    let Some(allowed) = options.allowed_user_id else {
        return true;
    };
    matches!(user_id, Some(id) if id == allowed)
}
pub async fn run(
url: String,
token: Option<String>,
check: bool,
options: BotOptions,
) -> anyhow::Result<()> {
let alert_config = AlertConfig::recommended();
if check {
println!("trusty-mpm Telegram bot configuration:");
println!(" daemon url : {url}");
println!(
" token configured : {}",
if token.is_some() { "yes" } else { "no" }
);
println!(" alert categories : {:?}", alert_config.categories);
println!(" memory alerts : {}", alert_config.memory_alerts);
println!(
" alert chat id : {}",
options
.alert_chat_id
.map(|i| i.to_string())
.unwrap_or_else(|| "none".into())
);
println!(
" allowed user id : {}",
options
.allowed_user_id
.map(|i| i.to_string())
.unwrap_or_else(|| "unrestricted".into())
);
println!();
println!("{}", crate::client::command::help_text());
return Ok(());
}
let token = token.ok_or_else(|| {
anyhow::anyhow!("TELEGRAM_BOT_TOKEN is required (or pass --check to validate config)")
})?;
let bot = Bot::new(token);
bot.set_my_commands(TelegramCommand::bot_commands()).await?;
let shutdown = CancellationToken::new();
if let Some(chat_id) = options.alert_chat_id {
let alert_bot = bot.clone();
let alert_url = url.clone();
let alert_cfg = alert_config.clone();
let token = shutdown.clone();
tokio::spawn(async move {
run_alert_loop(alert_bot, ChatId(chat_id), alert_url, alert_cfg, token).await;
});
}
let executor = Arc::new(CommandExecutor::new(url));
let opts = Arc::new(options);
let histories: ChatHistories = Arc::new(Mutex::new(HashMap::new()));
let handler = dptree::entry()
.branch(Update::filter_message().endpoint(on_message))
.branch(Update::filter_callback_query().endpoint(on_callback));
Dispatcher::builder(bot, handler)
.dependencies(dptree::deps![executor, opts, histories])
.enable_ctrlc_handler()
.build()
.dispatch()
.await;
shutdown.cancel();
Ok(())
}
/// Handles one incoming Telegram message.
///
/// Messages without text are ignored. Senders rejected by `is_authorized` get a fixed refusal
/// and a warning is logged. Text that parses as a `TelegramCommand` is dispatched and its
/// formatted result is sent back as HTML, with an inline keyboard when the formatter provides
/// one. Text that does not parse as a command is forwarded to the LLM chat, unless it is blank.
async fn on_message(
    bot: Bot,
    msg: Message,
    executor: Arc<CommandExecutor>,
    options: Arc<BotOptions>,
    histories: ChatHistories,
) -> ResponseResult<()> {
    let Some(text) = msg.text() else {
        return Ok(());
    };
    let chat = msg.chat.id;

    let user_id = msg.from.as_ref().map(|sender| sender.id.0 as i64);
    if !is_authorized(&options, user_id) {
        tracing::warn!(?user_id, "unauthorized Telegram message rejected");
        bot.send_message(chat, "🔒 This bot is restricted to authorized operators.")
            .await?;
        return Ok(());
    }

    let Ok(command) = TelegramCommand::parse(text, "trusty_mpm_bot") else {
        // Not a recognised command: treat any non-blank text as free-form chat.
        if text.trim().is_empty() {
            return Ok(());
        }
        let reply = llm_chat_reply(&executor, &histories, chat.0, text).await;
        bot.send_message(chat, reply)
            .parse_mode(ParseMode::Html)
            .await?;
        return Ok(());
    };

    let result = dispatch_command(command, &executor, chat.0).await;
    let body = TelegramFormatter::format(&result);
    let request = bot.send_message(chat, body).parse_mode(ParseMode::Html);
    let request = match TelegramFormatter::keyboard_for(&result) {
        Some(keyboard) => request.reply_markup(keyboard),
        None => request,
    };
    request.await?;
    Ok(())
}
/// Routes a parsed Telegram command to the executor.
///
/// `Pair` and `Start` carrying a non-blank code confirm a pairing for `chat_id` with the
/// trimmed code. Every other command, including `Pair`/`Start` with a blank code, is converted
/// to a `TrustyCommand` and executed.
async fn dispatch_command(
    command: TelegramCommand,
    executor: &CommandExecutor,
    chat_id: i64,
) -> CommandResult {
    let pairing_code = match &command {
        TelegramCommand::Pair(code) | TelegramCommand::Start(code) => Some(code.trim()),
        _ => None,
    };
    match pairing_code {
        Some(code) if !code.is_empty() => executor.pair_confirm(code, chat_id).await,
        _ => executor.execute(TrustyCommand::from(command)).await,
    }
}
async fn llm_chat_reply(
executor: &CommandExecutor,
histories: &ChatHistories,
chat_id: i64,
text: &str,
) -> String {
let history = {
let guard = histories.lock().expect("chat history mutex poisoned");
guard.get(&chat_id).cloned().unwrap_or_default()
};
match executor.client().llm_chat(text, &history).await {
Ok(Some(outcome)) => {
histories
.lock()
.expect("chat history mutex poisoned")
.insert(chat_id, outcome.history);
formatter::html_escape(&outcome.reply)
}
Ok(None) => LLM_NOT_CONFIGURED.to_string(),
Err(e) => format!("❌ chat: daemon error: {e}"),
}
}
/// Handles a press on an inline-keyboard button.
///
/// The query is answered before the authorization check, so presses from unauthorized users
/// are acknowledged but otherwise ignored (with a warning logged). Callback data has the form
/// `action:argument`, split at the first `:`. The actions `status`, `approve`, `deny` and
/// `adopt` run the matching `TrustyCommand`; `setproj` registers the argument as a project.
/// Anything else, or a query without data or without a message, is dropped silently. The
/// formatted result is sent as HTML to the chat the button belongs to.
async fn on_callback(
    bot: Bot,
    query: CallbackQuery,
    executor: Arc<CommandExecutor>,
    options: Arc<BotOptions>,
) -> ResponseResult<()> {
    bot.answer_callback_query(query.id.clone()).await?;

    let user_id = Some(query.from.id.0 as i64);
    if !is_authorized(&options, user_id) {
        tracing::warn!(?user_id, "unauthorized Telegram callback rejected");
        return Ok(());
    }

    let Some(data) = query.data.as_deref() else {
        return Ok(());
    };
    let Some(chat_id) = query.message.as_ref().map(|m| m.chat().id) else {
        return Ok(());
    };
    let Some((action, argument)) = data.split_once(':') else {
        return Ok(());
    };

    let result = match action {
        "status" => {
            let command = TrustyCommand::Status {
                session_id: argument.to_string(),
            };
            executor.execute(command).await
        }
        "approve" => {
            let command = TrustyCommand::Approve {
                session_id: argument.to_string(),
            };
            executor.execute(command).await
        }
        "deny" => {
            let command = TrustyCommand::Deny {
                session_id: argument.to_string(),
            };
            executor.execute(command).await
        }
        "adopt" => {
            let command = TrustyCommand::Adopt {
                session: argument.to_string(),
            };
            executor.execute(command).await
        }
        "setproj" => executor.register_project(argument).await,
        _ => return Ok(()),
    };

    bot.send_message(chat_id, TelegramFormatter::format(&result))
        .parse_mode(ParseMode::Html)
        .await?;
    Ok(())
}
/// Background task that polls the daemon and pushes alerts to `chat_id`.
///
/// Two independent timers drive the loop: every [`SESSION_POLL_INTERVAL`] session alerts are
/// collected and sent one message each, and every [`OVERSEER_POLL_INTERVAL`] the overseer is
/// checked for a blocked session. Send failures are logged and do not stop the loop. The loop
/// returns once `shutdown` is cancelled; cancellation is only observed between polls, not
/// while one is in progress.
///
/// Both timers use `MissedTickBehavior::Delay`: when a poll overruns its period, the next tick
/// is scheduled a full period later instead of firing the missed ticks back-to-back, so a slow
/// daemon is not hit with a burst of catch-up requests.
pub async fn run_alert_loop(
    bot: Bot,
    chat_id: ChatId,
    daemon_url: String,
    config: AlertConfig,
    shutdown: CancellationToken,
) {
    let client = reqwest::Client::new();
    // Only this task touches the dedup state, so the mutex needs no `Arc` around it.
    let last_seen = Mutex::new(LastSeen::new());
    let mut session_tick = tokio::time::interval(SESSION_POLL_INTERVAL);
    let mut overseer_tick = tokio::time::interval(OVERSEER_POLL_INTERVAL);
    session_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    overseer_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    loop {
        tokio::select! {
            _ = shutdown.cancelled() => {
                tracing::info!("alert loop shutting down");
                return;
            }
            _ = session_tick.tick() => {
                let alerts = poll_session_alerts(&client, &daemon_url, &config, &last_seen).await;
                for alert in alerts {
                    if let Err(e) = bot.send_message(chat_id, &alert.message).await {
                        tracing::warn!("failed to send alert: {e}");
                    }
                }
            }
            _ = overseer_tick.tick() => {
                if let Some(msg) = poll_overseer_alert(&client, &daemon_url).await
                    && let Err(e) = bot.send_message(chat_id, &msg).await
                {
                    tracing::warn!("failed to send overseer alert: {e}");
                }
            }
        }
    }
}
async fn poll_session_alerts(
client: &reqwest::Client,
daemon_url: &str,
config: &AlertConfig,
last_seen: &Mutex<LastSeen>,
) -> Vec<alerts::PendingAlert> {
let sessions: Vec<serde_json::Value> =
match client.get(format!("{daemon_url}/sessions")).send().await {
Ok(r) => match r.json::<serde_json::Value>().await {
Ok(b) => b["sessions"].as_array().cloned().unwrap_or_default(),
Err(_) => return Vec::new(),
},
Err(_) => return Vec::new(),
};
let mut events_by_session = std::collections::HashMap::new();
for s in &sessions {
let Some(id) = s["id"].as_str() else { continue };
let url = format!("{daemon_url}/sessions/{id}/events/poll");
if let Ok(r) = client.get(&url).send().await
&& let Ok(body) = r.json::<serde_json::Value>().await
{
let events = body["events"].as_array().cloned().unwrap_or_default();
events_by_session.insert(id.to_string(), events);
}
}
let mut guard = last_seen.lock().expect("last_seen mutex poisoned");
alerts::check_and_alert(&sessions, &events_by_session, &mut guard, config)
}
/// Checks `{daemon_url}/overseer` for a blocked session.
///
/// Returns the formatted alert text when the response's `overseer` object has `enabled` set to
/// `true` and a string `blocked_session`. Returns `None` when the request fails, the body is
/// not JSON, the overseer is not enabled, or no blocked session is reported.
async fn poll_overseer_alert(client: &reqwest::Client, daemon_url: &str) -> Option<String> {
    let response = client
        .get(format!("{daemon_url}/overseer"))
        .send()
        .await
        .ok()?;
    let body: serde_json::Value = response.json().await.ok()?;

    let overseer = &body["overseer"];
    let enabled = overseer["enabled"].as_bool().unwrap_or(false);
    match overseer["blocked_session"].as_str() {
        Some(blocked) if enabled => Some(alerts::format_overseer_block_alert(blocked)),
        _ => None,
    }
}
// Unit tests for the dotenv reader and the authorization check, plus integration-style tests
// that run `llm_chat_reply` and `dispatch_command` against an in-process daemon.
#[cfg(test)]
mod tests {
use super::*;
// A double-quoted value in a dotenv file is returned without its quotes.
// Note: despite the test name, this exercises `read_dotenv_key` directly, not `resolve_token`.
#[test]
fn resolve_token_reads_dotenv() {
use std::io::Write;
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join(".env");
let mut file = std::fs::File::create(&path).unwrap();
writeln!(file, "TELEGRAM_BOT_TOKEN=\"123:ABC\"").unwrap();
let value = read_dotenv_key(&path, "TELEGRAM_BOT_TOKEN");
assert_eq!(value.as_deref(), Some("123:ABC"));
}
// An unreadable (here: non-existent) file yields `None` rather than an error.
#[test]
fn resolve_token_missing_is_none() {
let value = read_dotenv_key(Path::new("/no/such/.env"), "TELEGRAM_BOT_TOKEN");
assert!(value.is_none());
}
// Without `allowed_user_id` everyone passes, even a missing sender; with it set, only the
// exact id passes and a missing sender is rejected.
#[test]
fn authorization_respects_allowed_user() {
let unrestricted = BotOptions::default();
assert!(is_authorized(&unrestricted, Some(7)));
assert!(is_authorized(&unrestricted, None));
let restricted = BotOptions {
allowed_user_id: Some(42),
alert_chat_id: None,
};
assert!(is_authorized(&restricted, Some(42)));
assert!(!is_authorized(&restricted, Some(99)));
assert!(!is_authorized(&restricted, None));
}
// Starts the daemon's HTTP API in-process on an OS-assigned localhost port and returns the
// shared daemon state together with the base URL to reach it. The server task is detached.
async fn spawn_test_daemon() -> (std::sync::Arc<crate::daemon::state::DaemonState>, String) {
use crate::daemon::{api, state::DaemonState};
use std::future::IntoFuture;
// NOTE(review): `keep()` presumably detaches the directory from automatic cleanup so it
// outlives this function — confirm against the tempfile docs.
let root = tempfile::tempdir().unwrap().keep();
let state = std::sync::Arc::new(DaemonState::with_root(root));
let router = api::router(std::sync::Arc::clone(&state));
// Port 0 lets the OS pick a free port; the real address is read back below.
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(axum::serve(listener, router).into_future());
(state, format!("http://{addr}"))
}
// Against a freshly started test daemon the chat reply is the "not configured" notice, and no
// history is stored for the chat.
#[tokio::test]
async fn llm_chat_reply_reports_unconfigured() {
let (_state, url) = spawn_test_daemon().await;
let executor = CommandExecutor::new(url);
let histories: ChatHistories = Arc::new(Mutex::new(HashMap::new()));
let reply = llm_chat_reply(&executor, &histories, 42, "hello there").await;
assert_eq!(reply, LLM_NOT_CONFIGURED);
assert!(histories.lock().unwrap().get(&42).is_none());
}
// `Help` produces a `Help` result; the executor points at a placeholder URL and no daemon is
// started for this test.
#[tokio::test]
async fn dispatch_help_returns_help() {
let executor = CommandExecutor::new("http://unused");
let result = dispatch_command(TelegramCommand::Help, &executor, 1).await;
assert!(matches!(result, CommandResult::Help(_)));
}
// `Start` with an empty code is not treated as a pairing attempt: it reports the (unpaired)
// pairing state instead.
#[tokio::test]
async fn dispatch_start_with_no_code_queries_state() {
let (_state, url) = spawn_test_daemon().await;
let executor = CommandExecutor::new(url);
let result = dispatch_command(TelegramCommand::Start(String::new()), &executor, 1).await;
match result {
CommandResult::PairState { paired } => assert!(!paired),
other => panic!("expected PairState, got {other:?}"),
}
}
// `Start` carrying a code generated by the daemon confirms the pairing, and the success info
// mentions the chat id that was passed in.
#[tokio::test]
async fn dispatch_start_with_deep_link_code_confirms() {
let (state, url) = spawn_test_daemon().await;
let code = state.generate_pair_code();
let executor = CommandExecutor::new(url);
let result = dispatch_command(TelegramCommand::Start(code), &executor, 555).await;
match result {
CommandResult::PairSuccess { chat_info } => assert!(chat_info.contains("555")),
other => panic!("expected PairSuccess, got {other:?}"),
}
}
// `Pair` with a code the daemon never issued results in an error.
#[tokio::test]
async fn dispatch_pair_with_bad_code_errors() {
let (_state, url) = spawn_test_daemon().await;
let executor = CommandExecutor::new(url);
let result = dispatch_command(TelegramCommand::Pair("ZZZZZZ".into()), &executor, 1).await;
assert!(matches!(result, CommandResult::Error(_)));
}
}