use std::collections::VecDeque;
use futures_util::StreamExt;
use maud::html;
use wasm_bindgen::JsValue;
use crate::turn_flow::{
classify_empty, classify_turn, empty_message, EmptyKind, TurnOutcome,
MAX_AUTO_CONTINUATIONS,
};
use crate::{Agent, StreamChunk};
use super::dom;
use super::templates;
use super::APP;
mod access;
mod confirm_guard;
mod dedup;
mod prompt;
mod session;
mod stage;
mod tools;
pub(crate) use access::{
credit_address_existing, credit_signer, ensure_credit_meter, escrow_bridge_wei,
};
#[allow(unused_imports)]
pub(crate) use access::model_access_is_credits;
pub(crate) use session::start_session;
use access::{collect_payment_if_required, resolve_credit_access, short_hash};
// Per-thread turn state shared by the functions in this module.
thread_local! {
// True while `run_send` is driving a turn: set right before the `TurnGuard`
// is created and cleared again when that guard is dropped.
static TURN_ACTIVE: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
// Stop request: raised by `request_stop_turn`, polled by `run_send` and
// `stream_turn`, and reset at the start of a send and when `TurnGuard` drops.
static TURN_CANCEL: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
// Deferred "clear" request: raised by `set_pending_clear` and consumed by
// `run_send` after a streamed turn has completed.
static PENDING_CLEAR: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
// Deferred "compact" request: raised by `set_pending_compact` and consumed by
// `run_send` after a streamed turn (or discarded when a clear is applied).
static PENDING_COMPACT: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
}
/// Records a request to clear the conversation.
///
/// Only raises the thread-local `PENDING_CLEAR` flag; `run_send` consumes the
/// flag after a streamed turn has finished.
pub(crate) fn set_pending_clear() {
    PENDING_CLEAR.with(|flag| {
        flag.replace(true);
    });
}
/// Records a request to compact the conversation history.
///
/// Only raises the thread-local `PENDING_COMPACT` flag; `run_send` consumes the
/// flag after a streamed turn has finished.
pub(crate) fn set_pending_compact() {
    PENDING_COMPACT.with(|flag| {
        flag.replace(true);
    });
}
/// Asks the running turn to stop.
///
/// Raises the thread-local `TURN_CANCEL` flag (polled by the turn loops) and,
/// when an agent is currently stored in `APP`, forwards the request to it via
/// `cancel_turn`.
pub(crate) fn request_stop_turn() {
    TURN_CANCEL.with(|flag| flag.set(true));

    // Clone the handle out first so the `APP` borrow is released before the
    // agent is called.
    let current_agent = APP.with(|cell| cell.borrow().agent.clone());
    if let Some(agent) = current_agent {
        agent.cancel_turn();
    }
}
/// Reports whether a stop has been requested for the current turn
/// (the current value of the thread-local `TURN_CANCEL` flag).
pub(crate) fn turn_cancelled() -> bool {
    TURN_CANCEL.with(std::cell::Cell::get)
}
/// Scope guard created by `run_send` once a turn becomes active.
///
/// Dropping it — on any exit path — resets the turn flags, ends the stage
/// indicator and puts the send button back if the stop button is still shown.
struct TurnGuard;

impl Drop for TurnGuard {
    fn drop(&mut self) {
        // Same order as the flags are declared: active first, then cancel.
        for flag in [&TURN_ACTIVE, &TURN_CANCEL] {
            flag.with(|cell| cell.set(false));
        }
        stage::end();

        let stop_button_shown = dom::by_id("terminal-stop").is_some();
        if stop_button_shown {
            let send_button = templates::send_button().into_string();
            dom::swap_outer("terminal-stop", &send_button);
        }
    }
}
/// Handles one "send" from the prompt box: reads the `#prompt` textarea, paints
/// the user turn and a placeholder assistant turn, then streams model turns
/// until a terminal outcome is reached.
///
/// Returns before touching the transcript when the textarea is missing, the
/// trimmed prompt is empty, no access could be resolved (the API-key modal is
/// shown instead), or another turn is already active. Failures after the
/// placeholder has been painted (payment, session start, missing agent) are
/// written into that placeholder via `fail_pending_turn`.
pub(crate) async fn run_send() {
let Some(prompt_area) = dom::textarea_by_id("prompt") else {
dom::set_status("internal: #prompt textarea missing", true);
return;
};
let prompt = prompt_area.value().trim().to_string();
if prompt.is_empty() {
return;
}
// Resolve access for this request; `None` opens the API-key modal instead.
let access = match resolve_credit_access().await {
Some(a) => a,
None => {
super::show_api_key_modal();
return;
}
};
// The credit meter is only ensured when the access carries a `base_url`.
if access.base_url.is_some() {
ensure_credit_meter().await;
}
let key = access.cfg_auth;
// Re-entrancy guard: only one turn at a time. There is no `.await` between the
// check and the set, so two sends cannot both pass it.
if TURN_ACTIVE.with(|c| c.get()) {
return;
}
TURN_ACTIVE.with(|c| c.set(true));
TURN_CANCEL.with(|c| c.set(false));
// Dropped on every return below: clears both flags, ends the stage indicator
// and restores the send button.
let _turn_guard = TurnGuard;
dom::set_status("", false);
// Paint the user message and an empty, streaming assistant turn up front so the
// UI responds before payment / session start complete.
let (user_turn_id, assistant_turn_id, first_seg_id) = APP.with(|cell| {
let mut app = cell.borrow_mut();
(app.alloc_id(), app.alloc_id(), app.alloc_id())
});
dom::append_html(
"transcript",
&templates::turn(user_turn_id, "user", html! { (prompt) }, false).into_string(),
);
dom::append_html(
"transcript",
&templates::turn(
assistant_turn_id,
"assistant",
html! {
(templates::stage_container(assistant_turn_id))
(templates::text_segment(first_seg_id, ""))
},
true,
)
.into_string(),
);
dom::scroll_to_bottom("transcript");
stage::begin(assistant_turn_id);
// Reset the input box (errors from style/focus are deliberately ignored) and
// swap the send button for the stop button.
prompt_area.set_value("");
let _ = prompt_area.style().set_property("height", "auto");
let _ = prompt_area.focus();
dom::swap_outer("terminal-send", &templates::stop_button().into_string());
// `Ok(None)`: nothing was collected; `Ok(Some(hash))`: a payment went through
// and its shortened hash is shown; `Err`: the turn is failed in place.
match collect_payment_if_required().await {
Ok(None) => {} Ok(Some(tx_hash)) => {
dom::set_status(
&format!("payment received ({}); sending…", short_hash(&tx_hash)),
false,
);
}
Err(err) => {
fail_pending_turn(assistant_turn_id, &format!("payment failed: {err}"));
dom::set_status("payment failed — see the message above", true);
return;
}
}
// Without a `base_url`, remember the key in sessionStorage under
// "gemini_api_key"; storage failures are ignored.
if access.base_url.is_none() {
if let Ok(Some(storage)) = dom::session_storage() {
let _ = storage.set_item("gemini_api_key", &key);
}
}
// A session is (re)started when there is no agent yet or the stored session key
// differs from this access's identity.
let session_needs_start = APP.with(|cell| {
let app = cell.borrow();
app.agent.is_none() || app.session_key.as_deref() != Some(access.identity.as_str())
});
if session_needs_start {
stage::enter(crate::turn_stage::Stage::Starting);
if let Err(err) = start_session(&key, access.base_url.clone(), &access.identity).await {
fail_pending_turn(assistant_turn_id, &format!("session start failed: {err:?}"));
dom::set_status("session start failed — see the message above", true);
return;
}
}
let Some(agent) = APP.with(|cell| cell.borrow().agent.clone()) else {
fail_pending_turn(assistant_turn_id, "internal: agent not set after start_session");
dom::set_status("internal: agent not set after start_session", true);
return;
};
confirm_guard::note_user_message(&prompt);
let mut next_input = TurnInput::User(prompt);
let mut auto_continuations: u32 = 0;
// The ids painted above are handed to the first `stream_turn` only (`take()`);
// later iterations let `stream_turn` allocate and paint their own turn.
let mut preallocated = Some((assistant_turn_id, first_seg_id));
dedup::reset_run();
loop {
// Honour a stop request issued before (or between) streamed turns.
if TURN_CANCEL.with(|c| c.get()) {
break;
}
apply_difficulty_route(&agent, &next_input);
let outcome = stream_turn(&agent, next_input, preallocated.take()).await;
// Persist after every streamed turn, whatever its outcome.
super::history::save_from_agent().await;
super::opfs::refresh().await;
// A pending clear wins over a pending compact: it discards the compact request,
// wipes agent history, persisted history and the transcript, and ends the run.
if PENDING_CLEAR.with(|c| c.replace(false)) {
PENDING_COMPACT.with(|c| c.set(false));
agent.clear_history(); super::history::clear_persisted().await; dom::swap_inner("transcript", ""); break; }
// The compact flag is consumed even when `agent.compact()` returns false; the
// transcript is only repainted and re-saved when it returns true.
if PENDING_COMPACT.with(|c| c.replace(false)) && agent.compact().await {
let entries = agent.transcript();
dom::swap_inner("transcript", "");
super::history::paint_entries(&entries);
super::history::save_from_agent().await;
}
match outcome {
TurnOutcome::Finished
| TurnOutcome::FinalAnswer
| TurnOutcome::Empty
| TurnOutcome::Error
| TurnOutcome::Cancelled => break,
// Non-terminal outcomes are retried automatically, up to
// MAX_AUTO_CONTINUATIONS times per user message.
TurnOutcome::Incomplete | TurnOutcome::EmptyTruncated => {
if auto_continuations >= MAX_AUTO_CONTINUATIONS {
let note_id = APP.with(|cell| cell.borrow_mut().alloc_id());
dom::append_html(
"transcript",
&templates::turn(
note_id,
"assistant",
templates::text_segment(
note_id,
"(paused — reached the auto-continue limit for this \
message. Send another message to keep going.)",
),
false,
)
.into_string(),
);
dom::scroll_to_bottom("transcript");
break;
}
auto_continuations += 1;
next_input = if matches!(outcome, TurnOutcome::EmptyTruncated) {
TurnInput::ResumeTruncated
} else {
TurnInput::AutoContinue
};
}
}
}
// Still `Some` only when the loop exited before the first `stream_turn` consumed
// the placeholder (a stop requested before streaming began).
if let Some((turn_id, _)) = preallocated {
mark_turn_done(turn_id);
}
// `TurnGuard::drop` performs the same check-and-swap right after this returns.
if dom::by_id("terminal-stop").is_some() {
dom::swap_outer("terminal-stop", &templates::send_button().into_string());
}
}
// NOTE(review): presumably the output-token cap requested for Gemini models; no
// use is visible in this part of the file — confirm against the consumer.
const GEMINI_MAX_OUTPUT_TOKENS: u32 = 32_768;
// NOTE(review): presumably the equivalent output-token cap for Anthropic models —
// confirm against the consumer.
const ANTHROPIC_MAX_OUTPUT_TOKENS: u32 = 16_384;
/// Prompt sent in place of user input when the previous response came back
/// empty because it was truncated (`TurnInput::ResumeTruncated`).
pub(crate) const TRUNCATED_RETRY_NUDGE: &str = "Your previous response was cut off before \
    you finished (it hit the output limit). Continue and finish your answer now, \
    concisely. If the task is large, break it into smaller steps and take just the \
    next one.";

/// Prompt sent in place of user input when a turn ended without finishing
/// (`TurnInput::AutoContinue`).
pub(crate) const AUTO_CONTINUE_NUDGE: &str = "Continue toward the user's goal. First review \
    what you already did above — NEVER repeat an action that already succeeded (no duplicate \
    notifications, transfers, posts, or feedback). If the task is fully complete, call the \
    `finish` tool. If you're blocked or need a decision, ask the user a question. Otherwise \
    take the next step now without waiting.";

/// Returns `true` when `text` is exactly one of the internally generated nudge
/// prompts above (a byte-for-byte match, not a prefix or substring test).
pub(crate) fn is_internal_nudge(text: &str) -> bool {
    [AUTO_CONTINUE_NUDGE, TRUNCATED_RETRY_NUDGE]
        .iter()
        .any(|nudge| *nudge == text)
}
/// Sets the agent's per-turn thinking and model overrides for `input`.
///
/// When the session has no thinking ceiling, both overrides are cleared.
/// Otherwise the turn is classified by `crate::difficulty`, the routed thinking
/// level is clamped to the ceiling, and a model override is derived from the
/// session model (none when the session has no model recorded).
fn apply_difficulty_route(agent: &Agent, input: &TurnInput) {
    let (thinking_ceiling, base_model) = APP.with(|cell| {
        let state = cell.borrow();
        (state.session_thinking_ceiling, state.session_model.clone())
    });

    let ceiling = match thinking_ceiling {
        Some(limit) => limit,
        None => {
            agent.set_thinking_override(None);
            agent.set_model_override(None);
            return;
        }
    };

    // Internal follow-ups carry no user text and are classified with the
    // "last turn used tools" flag set.
    let (text, last_turn_used_tools) = match input {
        TurnInput::User(user_prompt) => (user_prompt.as_str(), false),
        TurnInput::AutoContinue => ("", true),
        TurnInput::ResumeTruncated => ("", true),
    };

    let tier = crate::difficulty::classify_turn(text, last_turn_used_tools);
    let thinking = crate::difficulty::clamp_thinking(
        crate::difficulty::route_tier(tier).thinking,
        ceiling,
    );
    agent.set_thinking_override(Some(thinking));

    let routed_model = match base_model.as_deref() {
        Some(model) => crate::difficulty::route_model(tier, model),
        None => None,
    };
    agent.set_model_override(routed_model);
}
/// What `stream_turn` should send to the agent for one turn.
enum TurnInput {
/// Text typed by the user; the only variant rendered as a user bubble.
User(String),
/// Internal follow-up: sends `AUTO_CONTINUE_NUDGE` (used after an
/// `Incomplete` outcome).
AutoContinue,
/// Internal retry: sends `TRUNCATED_RETRY_NUDGE` (used after an
/// `EmptyTruncated` outcome).
ResumeTruncated,
}
/// Sends one input to the agent and renders the streamed response into the
/// transcript.
///
/// `pre` carries an already-painted `(assistant_turn_id, first_segment_id)`.
/// When it is `None`, fresh ids are allocated and the turn shells are appended
/// here (a user bubble only for `TurnInput::User`).
///
/// Returns `TurnOutcome::Error` when `agent.chat` or the stream fails,
/// `TurnOutcome::Cancelled` when a stop was requested, and otherwise the result
/// of `classify_turn` — except that an `Incomplete` result becomes
/// `FinalAnswer` when the confirm guard reports an awaited confirmation.
async fn stream_turn(agent: &Agent, input: TurnInput, pre: Option<(u32, u32)>) -> TurnOutcome {
// Internal nudges are sent as the prompt but never rendered as a user bubble.
let (prompt, render_user) = match input {
TurnInput::User(p) => (p, true),
TurnInput::AutoContinue => (AUTO_CONTINUE_NUDGE.to_string(), false),
TurnInput::ResumeTruncated => (TRUNCATED_RETRY_NUDGE.to_string(), false),
};
let (assistant_turn_id, mut seg_id) = match pre {
Some(ids) => ids,
None => {
let (user_turn_id, assistant_turn_id, seg_id) = APP.with(|cell| {
let mut app = cell.borrow_mut();
(app.alloc_id(), app.alloc_id(), app.alloc_id())
});
if render_user {
dom::append_html(
"transcript",
&templates::turn(user_turn_id, "user", html! { (prompt) }, false)
.into_string(),
);
}
dom::append_html(
"transcript",
&templates::turn(
assistant_turn_id,
"assistant",
html! {
(templates::stage_container(assistant_turn_id))
(templates::text_segment(seg_id, ""))
},
true,
)
.into_string(),
);
dom::scroll_to_bottom("transcript");
stage::begin(assistant_turn_id);
(assistant_turn_id, seg_id)
}
};
let assistant_body_id = format!("turn-body-{assistant_turn_id}");
// Tool calls awaiting their result, matched first-in first-out with incoming
// `ToolResult` chunks.
let mut pending_tools: VecDeque<(u32, crate::types::ToolCall)> = VecDeque::new();
// Raw text accumulated per text segment; a new segment is opened after every
// tool call so later text lands below that tool's block.
let mut text_segments: Vec<(u32, String)> = vec![(seg_id, String::new())];
// Flags fed to the outcome classification at the end of the turn.
let mut any_visible = false;
let mut saw_tool_call = false; let mut saw_finish = false; let mut saw_question = false; let mut saw_thinking = false;
let response = match agent.chat(prompt).await {
Ok(r) => r,
Err(err) => {
report_turn_error("agent.chat", &format!("{err}"), assistant_turn_id);
return TurnOutcome::Error;
}
};
let mut cursor = response.chunks();
while let Some(item) = cursor.next().await {
// A stop request ends consumption of the stream; the chunk just received is
// dropped.
if TURN_CANCEL.with(|c| c.get()) {
break;
}
match item {
Ok(StreamChunk::Text { text, .. }) => {
if !text.is_empty() {
any_visible = true;
stage::enter(crate::turn_stage::Stage::Streaming);
let (cur_id, cur_text) = text_segments
.last_mut()
.expect("text_segments seeded at start of turn");
cur_text.push_str(&text);
// While streaming, the whole accumulated segment is re-rendered through
// `html!` as plain text; markdown rendering happens once the stream ends.
let inner = html! { (cur_text) }.into_string();
dom::swap_inner(&format!("seg-{cur_id}"), &inner);
dom::scroll_to_bottom("transcript");
}
}
Ok(StreamChunk::ToolCall(call)) => {
any_visible = true;
stage::enter(crate::turn_stage::Stage::Tools);
// `finish` and `ask_question` are tracked separately from ordinary tool calls.
if call.name == "finish" {
saw_finish = true;
} else if call.name == "ask_question" {
saw_question = true;
} else {
saw_tool_call = true;
}
let tool_seg_id = APP.with(|cell| cell.borrow_mut().alloc_id());
dom::append_html(
&assistant_body_id,
&templates::tool_call_block(tool_seg_id, &call).into_string(),
);
pending_tools.push_back((tool_seg_id, call));
// Open a fresh, empty text segment after the tool block.
seg_id = APP.with(|cell| cell.borrow_mut().alloc_id());
text_segments.push((seg_id, String::new()));
dom::append_html(
&assistant_body_id,
&templates::text_segment(seg_id, "").into_string(),
);
dom::scroll_to_bottom("transcript");
}
Ok(StreamChunk::ToolResult(result)) => {
// Results are attributed to the oldest unanswered call.
if let Some((tool_seg_id, call)) = pending_tools.pop_front() {
let result_target = format!("tool-{tool_seg_id}-result");
dom::swap_inner(
&result_target,
&templates::tool_call_result(&result).into_string(),
);
// Only a successful `render_html` call gets a snapshot thumbnail.
let thumb = if call.name == "render_html" && result.error.is_none() {
super::display::snapshot_data_url()
} else {
None
};
if let Some(card) = templates::inline_result_card(
&call.name,
&call.args,
&result,
thumb.as_deref(),
) {
dom::swap_inner(
&format!("tool-{tool_seg_id}-card"),
&card.into_string(),
);
// A successful `embed_app` additionally launches the embed inside its card.
if call.name == "embed_app" && result.error.is_none() {
super::display::launch_pending_embed(&format!(
"tool-{tool_seg_id}-card"
))
.await;
}
}
dom::scroll_to_bottom("transcript");
} else {
web_sys::console::warn_1(&JsValue::from_str(
"orphaned ToolResult (no pending tool call) — dropping",
));
}
}
Ok(StreamChunk::Thought { .. }) => {
// Thought content is not rendered; only the stage indicator and flag change.
saw_thinking = true;
stage::enter(crate::turn_stage::Stage::Thinking);
}
Err(err) => {
// Returns immediately: segments streamed so far keep their plain-text
// rendering and are not converted to markdown.
report_turn_error("stream", &format!("{err}"), assistant_turn_id);
return TurnOutcome::Error;
}
}
}
// Stream ended (or was stopped): replace each non-empty segment's plain text
// with its markdown rendering.
for (id, raw) in &text_segments {
if raw.is_empty() {
continue;
}
let html_str = templates::rendered_markdown(raw).into_string();
dom::swap_inner(&format!("seg-{id}"), &html_str);
}
mark_turn_done(assistant_turn_id);
// The response can report completion itself; a non-empty finish summary is
// appended as markdown, and a finished turn with nothing visible is removed.
if response.finished() {
saw_finish = true;
if let Some(summary) = response.finish_summary().filter(|s| !s.is_empty()) {
dom::append_html(
&assistant_body_id,
&templates::rendered_markdown(&summary).into_string(),
);
dom::scroll_to_bottom("transcript");
any_visible = true;
}
if !any_visible {
dom::remove(&format!("turn-{assistant_turn_id}"));
}
}
// Empty, unfinished, not-cancelled response: a `Truncated` one is removed
// silently (the caller may retry it); any other kind gets a muted explanation.
let empty_kind = if !any_visible && !saw_finish && !TURN_CANCEL.with(|c| c.get()) {
let kind = classify_empty(response.finish_note().as_deref(), saw_thinking);
let body_id = format!("turn-body-{assistant_turn_id}");
if !matches!(kind, EmptyKind::Truncated) {
dom::append_html(
&body_id,
&format!(
"<div class=\"turn-error\">{}</div>",
dom::msg_span(dom::Msg::Muted, empty_message(kind))
),
);
dom::scroll_to_bottom("transcript");
} else {
dom::remove(&format!("turn-{assistant_turn_id}"));
}
Some(kind)
} else {
None
};
// A stop request appends a separate assistant note and short-circuits
// classification; `turn_count` is not incremented for cancelled turns.
if TURN_CANCEL.with(|c| c.get()) {
{
let note_id = APP.with(|cell| cell.borrow_mut().alloc_id());
dom::append_html(
"transcript",
&templates::turn(
note_id,
"assistant",
templates::text_segment(note_id, "Stopped. What should I do instead?"),
false,
)
.into_string(),
);
dom::scroll_to_bottom("transcript");
}
return TurnOutcome::Cancelled;
}
APP.with(|cell| cell.borrow_mut().turn_count += 1);
let retryable_empty = matches!(empty_kind, Some(EmptyKind::Truncated));
// Read (and, judging by the name, reset) the confirm guard's flag before
// classifying — NOTE(review): confirm `take_*` clears it.
let awaiting_confirmation = confirm_guard::take_awaiting_confirmation();
let outcome = classify_turn(
saw_finish,
saw_question,
saw_tool_call,
any_visible,
retryable_empty,
);
// Do not auto-continue while a confirmation is awaited: report the turn as a
// final answer instead of `Incomplete`.
if awaiting_confirmation && matches!(outcome, TurnOutcome::Incomplete) {
return TurnOutcome::FinalAnswer;
}
outcome
}
fn report_turn_error(context: &str, err: &str, assistant_turn_id: u32) {
mark_turn_done(assistant_turn_id);
let lower = err.to_lowercase();
let stale_token = lower.contains("stale or future timestamp");
if !stale_token
&& !lower.contains("402")
&& !lower.contains("no $lh")
&& !lower.contains("no credit")
&& !lower.contains("cancel")
{
let agent = crate::app::tenant::current_name().unwrap_or_else(|| "apex".to_string());
let first = err.lines().next().unwrap_or(err);
let title = format!(
"turn error ({context}): {}",
first.chars().take(120).collect::<String>()
);
let signature = crate::app::telemetry::signature_for(&agent, context, err);
let body = format!("agent: {agent}\ncontext: {context}\n\nerror:\n{err}");
wasm_bindgen_futures::spawn_local(crate::app::telemetry::report(
"error".to_string(),
title,
signature,
body,
));
}
let looks_like_auth = !stale_token
&& (lower.contains("api key")
|| lower.contains("api_key")
|| lower.contains("401")
|| lower.contains("403")
|| lower.contains("permission_denied")
|| lower.contains("unauthenticated"));
let looks_like_credits = lower.contains("402")
|| lower.contains("payment required")
|| lower.contains("insufficient")
|| lower.contains("no active session")
|| lower.contains("quota")
|| lower.contains("429");
let body_id = format!("turn-body-{assistant_turn_id}");
if looks_like_credits {
web_sys::console::warn_1(&wasm_bindgen::JsValue::from_str(err));
dom::append_html(&body_id, &super::templates::out_of_credits_card().into_string());
} else {
let bubble = if stale_token {
format!(
"request auth went stale — your device clock looks off by more \
than 5 minutes; sync it and retry. Raw error: {err}"
)
} else if looks_like_auth {
format!("model rejected the API key — check your Gemini key. Raw error: {err}")
} else {
format!("{context} failed: {err}")
};
dom::append_html(
&body_id,
&format!(
"<div class=\"turn-error\">{}</div>",
dom::msg_span(dom::Msg::Error, &bubble)
),
);
}
dom::scroll_to_bottom("transcript");
if stale_token {
dom::set_status("auth token went stale — check your device clock, then retry", true);
} else if looks_like_auth {
dom::set_status("API key rejected — check your Gemini key.", true);
super::show_api_key_modal();
} else if looks_like_credits {
dom::set_status("no credits / session for this origin — see the account tab.", true);
} else {
dom::set_status("turn failed — see the message above", true);
}
}
/// Marks the placeholder turn `turn_id` as done and appends `text` to its body
/// as an error bubble, then scrolls the transcript to the bottom.
fn fail_pending_turn(turn_id: u32, text: &str) {
    mark_turn_done(turn_id);

    let body_id = format!("turn-body-{turn_id}");
    let bubble = format!(
        "<div class=\"turn-error\">{}</div>",
        dom::msg_span(dom::Msg::Error, text)
    );
    dom::append_html(&body_id, &bubble);

    dom::scroll_to_bottom("transcript");
}
// NOTE(review): presumably the context size (in tokens) above which history
// compaction is triggered; the consumer is not visible in this part of the
// file — confirm units and call site.
pub(crate) const COMPACTION_THRESHOLD: u32 = 128_000;
/// Ends the stage indicator and drops the `streaming` class from the element
/// `turn-{turn_id}`, if that element exists.
///
/// The remaining class names are written back joined by single spaces, even
/// when `streaming` was not present.
fn mark_turn_done(turn_id: u32) {
    stage::end();

    let Some(turn_el) = dom::by_id(&format!("turn-{turn_id}")) else {
        return;
    };
    let remaining_classes = turn_el
        .class_name()
        .split_whitespace()
        .filter(|name| *name != "streaming")
        .collect::<Vec<_>>()
        .join(" ");
    turn_el.set_class_name(&remaining_classes);
}