use std::time::{Duration, Instant};
use astrid_gateway::rpc::DaemonEvent;
use jsonrpsee::core::client::Subscription;
use teloxide::prelude::*;
use teloxide::types::ParseMode;
use tracing::{info, warn};
use crate::approval::ApprovalManager;
use crate::elicitation::ElicitationManager;
use crate::format::{chunk_html, md_to_telegram_html};
use crate::session::SessionMap;
/// Minimum time between two updates of the streamed message: `handle_text`
/// skips the Telegram call while less than this has elapsed since the last
/// successful send or edit.
const EDIT_THROTTLE: Duration = Duration::from_millis(500);
/// Mutable state shared by the event handlers during one `run_event_loop` call.
struct TurnState {
/// Text received since the buffer was last cleared (on tool start or flush).
text_buffer: String,
/// When the streamed message was last sent or edited successfully.
last_edit: Instant,
/// The message that subsequent edits are applied to.
current_msg_id: teloxide::types::MessageId,
/// When true, `handle_text` sends a new message instead of editing
/// `current_msg_id`.
finalized_text: bool,
}
/// Consumes daemon events from `subscription` and mirrors them into the
/// Telegram chat `chat_id`.
///
/// The loop stops when `handle_event` reports that the turn is done, or when
/// `next_event` returns `None` (subscription ended or yielded an error). In
/// every case the chat's turn-in-progress flag is cleared before returning.
///
/// `placeholder_msg_id` is the message that the first streamed text edits.
pub async fn run_event_loop(
    bot: Bot,
    chat_id: ChatId,
    placeholder_msg_id: teloxide::types::MessageId,
    mut subscription: Subscription<DaemonEvent>,
    sessions: SessionMap,
    approvals: ApprovalManager,
    elicitations: ElicitationManager,
) {
    // Back-date `last_edit` by one throttle interval so the first text chunk is
    // displayed immediately. `checked_sub` returns `None` when the earlier
    // instant is not representable; in that case fall back to "now", which only
    // delays the first edit, instead of panicking inside the event loop.
    let now = Instant::now();
    let mut state = TurnState {
        text_buffer: String::new(),
        last_edit: now.checked_sub(EDIT_THROTTLE).unwrap_or(now),
        current_msg_id: placeholder_msg_id,
        finalized_text: false,
    };

    while let Some(event) = next_event(&mut subscription).await {
        let done = handle_event(
            &event,
            &mut state,
            &bot,
            chat_id,
            &sessions,
            &approvals,
            &elicitations,
        )
        .await;
        if done {
            break;
        }
    }

    // Also covers the case where the stream ended without a terminal event.
    sessions.set_turn_in_progress(chat_id, false).await;
}
async fn handle_event(
event: &DaemonEvent,
state: &mut TurnState,
bot: &Bot,
chat_id: ChatId,
sessions: &SessionMap,
approvals: &ApprovalManager,
elicitations: &ElicitationManager,
) -> bool {
match event {
DaemonEvent::Text(chunk) => {
handle_text(chunk, state, bot, chat_id).await;
false
},
DaemonEvent::ToolCallStart { name, .. } => {
handle_tool_start(name, state, bot, chat_id).await;
false
},
DaemonEvent::ToolCallResult {
result, is_error, ..
} => {
handle_tool_result(result, *is_error, state, bot, chat_id).await;
false
},
DaemonEvent::ApprovalNeeded {
request_id,
request,
} => {
flush_text(state, bot, chat_id).await;
approvals
.send_approval(bot, chat_id, request_id, request)
.await;
false
},
DaemonEvent::ElicitationNeeded {
request_id,
request,
} => {
flush_text(state, bot, chat_id).await;
elicitations
.send_elicitation(bot, chat_id, request_id, request)
.await;
false
},
DaemonEvent::TurnComplete => {
if !state.text_buffer.is_empty() {
finalize_text(bot, chat_id, &mut state.current_msg_id, &state.text_buffer).await;
}
sessions.set_turn_in_progress(chat_id, false).await;
info!("Turn complete for chat {chat_id}");
true
},
DaemonEvent::Error(msg) => {
let html = format!("Error: {}", crate::format::html_escape(msg));
let _ = bot
.send_message(chat_id, html)
.parse_mode(ParseMode::Html)
.await;
sessions.set_turn_in_progress(chat_id, false).await;
true
},
DaemonEvent::Usage { .. }
| DaemonEvent::SessionSaved
| DaemonEvent::CapsuleLoaded { .. }
| DaemonEvent::CapsuleFailed { .. }
| DaemonEvent::CapsuleUnloaded { .. } => false,
}
}
/// Appends `chunk` to the buffer and, at most once per `EDIT_THROTTLE`,
/// pushes the rendered buffer to Telegram.
///
/// If the current message is marked finalized, the text goes into a new
/// message, which then becomes the current one; otherwise the current message
/// is edited in place. `last_edit` advances only when the call succeeds.
async fn handle_text(chunk: &str, state: &mut TurnState, bot: &Bot, chat_id: ChatId) {
    state.text_buffer.push_str(chunk);

    let throttled = state.last_edit.elapsed() < EDIT_THROTTLE;
    if throttled || state.text_buffer.is_empty() {
        return;
    }

    let display = truncate_for_edit(&md_to_telegram_html(&state.text_buffer));

    if !state.finalized_text {
        // Streaming into the existing message; a failed edit is retried on the
        // next chunk because `last_edit` is left untouched.
        let edited = bot
            .edit_message_text(chat_id, state.current_msg_id, &display)
            .parse_mode(ParseMode::Html)
            .await;
        if edited.is_ok() {
            state.last_edit = Instant::now();
        }
        return;
    }

    // The current message must not be touched any more: start a fresh one.
    let sent = bot
        .send_message(chat_id, &display)
        .parse_mode(ParseMode::Html)
        .await;
    match sent {
        Ok(msg) => {
            state.current_msg_id = msg.id;
            state.finalized_text = false;
            state.last_edit = Instant::now();
        },
        Err(e) => warn!("Failed to send message: {e}"),
    }
}
async fn handle_tool_start(name: &str, state: &mut TurnState, bot: &Bot, chat_id: ChatId) {
if !state.text_buffer.is_empty() && !state.finalized_text {
finalize_text(bot, chat_id, &mut state.current_msg_id, &state.text_buffer).await;
}
let tool_msg = format!(
"Running tool: <b>{}</b>...",
crate::format::html_escape(name),
);
match bot
.send_message(chat_id, &tool_msg)
.parse_mode(ParseMode::Html)
.await
{
Ok(msg) => state.current_msg_id = msg.id,
Err(e) => warn!("Failed to send tool message: {e}"),
}
state.text_buffer.clear();
state.finalized_text = false;
}
/// Replaces the current message with the tool outcome: a bold "Done" or
/// "Error" label followed by an escaped preview of `result` limited to 200
/// bytes by `truncate_preview`.
///
/// The edit is best-effort. The message is then marked finalized so that
/// later text starts a new message.
async fn handle_tool_result(
    result: &str,
    is_error: bool,
    state: &mut TurnState,
    bot: &Bot,
    chat_id: ChatId,
) {
    let preview = truncate_preview(result, 200);
    let escaped = crate::format::html_escape(&preview);
    let status = if is_error { "Error" } else { "Done" };
    let html = format!("<b>{status}</b>\n<pre>{}</pre>", escaped);

    let request = bot
        .edit_message_text(chat_id, state.current_msg_id, &html)
        .parse_mode(ParseMode::Html);
    let _ = request.await;

    state.finalized_text = true;
}
/// Finalizes buffered text into the current message, then clears the buffer
/// and marks the message finalized.
///
/// Does nothing when the buffer is empty or the current message is already
/// finalized.
async fn flush_text(state: &mut TurnState, bot: &Bot, chat_id: ChatId) {
    if state.finalized_text || state.text_buffer.is_empty() {
        return;
    }

    finalize_text(bot, chat_id, &mut state.current_msg_id, &state.text_buffer).await;
    state.finalized_text = true;
    state.text_buffer.clear();
}
/// Renders `text` to Telegram HTML, splits it with `chunk_html(.., 4000)` and
/// publishes the pieces.
///
/// The first piece replaces the content of `*current_msg_id` (best-effort, the
/// result is ignored). Every further piece is sent as a new message, and
/// `*current_msg_id` is advanced to each message that was sent successfully.
async fn finalize_text(
    bot: &Bot,
    chat_id: ChatId,
    current_msg_id: &mut teloxide::types::MessageId,
    text: &str,
) {
    let rendered = md_to_telegram_html(text);
    let chunks = chunk_html(&rendered, 4000);
    let mut pieces = chunks.iter();

    if let Some(head) = pieces.next() {
        let _ = bot
            .edit_message_text(chat_id, *current_msg_id, head)
            .parse_mode(ParseMode::Html)
            .await;
    }

    // Whatever did not fit into the edited message continues in new messages.
    for piece in pieces {
        let sent = bot
            .send_message(chat_id, piece)
            .parse_mode(ParseMode::Html)
            .await;
        match sent {
            Ok(msg) => *current_msg_id = msg.id,
            Err(e) => warn!("Failed to send continuation message: {e}"),
        }
    }
}
/// Waits for the next item of the subscription.
///
/// Returns `None` both when the subscription has ended and when it yields an
/// error; the error is logged at warn level.
async fn next_event(sub: &mut Subscription<DaemonEvent>) -> Option<DaemonEvent> {
    // `?` covers the end-of-stream case.
    match sub.next().await? {
        Ok(event) => Some(event),
        Err(e) => {
            warn!("Subscription error: {e}");
            None
        },
    }
}
/// Shortens rendered HTML for an in-progress edit.
///
/// Input of at most 4000 bytes is returned unchanged. Longer input is cut at
/// the position chosen by `find_safe_html_boundary` for a 3940-byte target,
/// passed through `close_open_tags`, and suffixed with `"..."`.
pub(crate) fn truncate_for_edit(html: &str) -> String {
    const MAX_HTML_LEN: usize = 4000;
    const TRUNCATED_TARGET: usize = 3940;

    if html.len() <= MAX_HTML_LEN {
        return html.to_string();
    }

    let cut = crate::format::find_safe_html_boundary(html, TRUNCATED_TARGET);
    let mut shortened = crate::format::close_open_tags(&html[..cut]);
    shortened.push_str("...");
    shortened
}
/// Returns `s` unchanged when it is at most `max` bytes long; otherwise the
/// prefix produced by `astrid_core::truncate_to_boundary(s, max)` followed by
/// `"..."`.
pub(crate) fn truncate_preview(s: &str, max: usize) -> String {
    if s.len() > max {
        let mut shortened = astrid_core::truncate_to_boundary(s, max).to_string();
        shortened += "...";
        return shortened;
    }
    s.to_string()
}
/// Unit tests for the truncation helpers.
///
/// `truncate_for_edit` aims its cut at byte 3940 once the input exceeds 4000
/// bytes, so fixtures that exercise boundary handling must place the
/// interesting construct around byte 3940.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn truncate_for_edit_short_text() {
        let text = "Hello world";
        assert_eq!(truncate_for_edit(text), text);
    }

    #[test]
    fn truncate_for_edit_long_text() {
        let text = "x".repeat(5000);
        let result = truncate_for_edit(&text);
        // 3940 bytes of content plus the three-byte ellipsis.
        assert_eq!(result.len(), 3943);
        assert!(result.ends_with("..."));
    }

    #[test]
    fn truncate_for_edit_closes_open_tags() {
        let padding = "x".repeat(3985);
        let html = format!("<b>{padding}more bold text</b>");
        assert!(html.len() > 4000);
        let result = truncate_for_edit(&html);
        assert!(result.ends_with("...</b>") || result.ends_with("</b>..."));
        assert!(result.contains("</b>"));
    }

    #[test]
    fn truncate_for_edit_multibyte_safe() {
        let text = "๐".repeat(1500);
        let result = truncate_for_edit(&text);
        assert!(result.ends_with("..."));
        assert!(result.len() <= 3983);
    }

    #[test]
    fn truncate_preview_multibyte_safe() {
        let text = "ๆฅๆฌ่ชใในใ".repeat(100);
        let result = truncate_preview(&text, 50);
        assert!(result.ends_with("..."));
    }

    #[test]
    fn truncate_for_edit_avoids_mid_tag() {
        // 3938 bytes of padding put "<b>" at bytes 3938..3941, so the 3940-byte
        // truncation target falls inside the tag.
        let padding = "x".repeat(3938);
        let filler = "y".repeat(100);
        let html = format!("{padding}<b>bold</b>{filler}");
        assert!(html.len() > 4000);
        let result = truncate_for_edit(&html);
        assert!(result.ends_with("..."));
        let truncated = result.trim_end_matches("...");
        assert!(
            !truncated.ends_with('<'),
            "truncated inside tag: {result}"
        );
        // The last '<' (if any) must be followed by a '>'; `None < Some(_)`.
        assert!(
            truncated.rfind('>') >= truncated.rfind('<'),
            "unterminated tag left in output: {result}"
        );
    }

    #[test]
    fn truncate_for_edit_avoids_mid_entity() {
        // 3938 bytes of padding put "&amp;" at bytes 3938..3943, so the
        // 3940-byte truncation target falls inside the entity.
        let padding = "x".repeat(3938);
        let filler = "y".repeat(100);
        let html = format!("{padding}&amp; more text{filler}");
        assert!(html.len() > 4000);
        let result = truncate_for_edit(&html);
        assert!(result.ends_with("..."));
        let truncated = result.trim_end_matches("...");
        assert!(
            !truncated.ends_with('&'),
            "truncated inside entity: {result}"
        );
        // The last '&' (if any) must be followed by a ';'; `None < Some(_)`.
        assert!(
            truncated.rfind(';') >= truncated.rfind('&'),
            "unterminated entity left in output: {result}"
        );
    }

    #[test]
    fn truncate_preview_short() {
        assert_eq!(truncate_preview("hello", 10), "hello");
    }

    #[test]
    fn truncate_preview_long() {
        let result = truncate_preview("hello world", 5);
        assert_eq!(result, "hello...");
    }
}