use super::DiscordState;
use crate::brain::agent::AgentService;
use crate::config::{Config, RespondTo};
use crate::db::ChannelMessageRepository;
use crate::db::models::ChannelMessage as DbChannelMessage;
use crate::services::SessionService;
use crate::utils::sanitize::redact_secrets;
use crate::utils::truncate_str;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;
use serenity::builder::{CreateAttachment, CreateMessage};
use serenity::model::channel::Message;
use serenity::prelude::*;
pub fn split_message(text: &str, max_len: usize) -> Vec<&str> {
if text.len() <= max_len {
return vec![text];
}
let mut chunks = Vec::new();
let mut start = 0;
while start < text.len() {
let mut end = (start + max_len).min(text.len());
while end < text.len() && !text.is_char_boundary(end) {
end -= 1;
}
let break_at = if end < text.len() {
text[start..end]
.rfind('\n')
.filter(|&pos| pos > end - start - 200)
.map(|pos| start + pos + 1)
.unwrap_or(end)
} else {
end
};
chunks.push(&text[start..break_at]);
start = break_at;
}
chunks
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn handle_message(
ctx: &Context,
msg: &Message,
agent: Arc<AgentService>,
session_svc: SessionService,
shared_session: Arc<Mutex<Option<Uuid>>>,
discord_state: Arc<DiscordState>,
config_rx: tokio::sync::watch::Receiver<Config>,
channel_msg_repo: ChannelMessageRepository,
) {
let cfg = config_rx.borrow().clone();
let dc_cfg = &cfg.channels.discord;
let allowed: HashSet<i64> = dc_cfg
.allowed_users
.iter()
.filter_map(|s| s.parse().ok())
.collect();
let respond_to = &dc_cfg.respond_to;
let allowed_channels: HashSet<String> = dc_cfg.allowed_channels.iter().cloned().collect();
let idle_timeout_hours = dc_cfg.session_idle_hours;
let voice_config = cfg.voice_config();
let user_id = msg.author.id.get() as i64;
let store_channel_msg = |text: String| {
let repo = channel_msg_repo.clone();
let channel_chat_id = msg.channel_id.get().to_string();
let guild_name = msg
.guild_id
.map(|g| g.get().to_string())
.unwrap_or_else(|| "DM".to_string());
let sender_id = msg.author.id.get().to_string();
let sender_name = msg.author.name.clone();
let msg_id = msg.id.get().to_string();
async move {
if text.is_empty() {
return;
}
let cm = DbChannelMessage::new(
"discord".into(),
channel_chat_id,
Some(guild_name),
sender_id,
sender_name,
text,
"text".into(),
Some(msg_id),
);
if let Err(e) = repo.insert(&cm).await {
tracing::warn!("Failed to store Discord channel message: {e}");
}
}
};
if !allowed.is_empty() && !allowed.contains(&user_id) {
tracing::debug!(
"Discord: ignoring message from non-allowed user {}",
user_id
);
return;
}
let is_dm = msg.guild_id.is_none();
if !is_dm {
let channel_str = msg.channel_id.get().to_string();
if !allowed_channels.is_empty() && !allowed_channels.contains(&channel_str) {
tracing::debug!(
"Discord: ignoring message in non-allowed channel {}",
channel_str
);
store_channel_msg(msg.content.clone()).await;
return;
}
match respond_to {
RespondTo::DmOnly => {
tracing::debug!("Discord: respond_to=dm_only, ignoring channel message");
store_channel_msg(msg.content.clone()).await;
return;
}
RespondTo::Mention => {
let bot_id = discord_state.bot_user_id().await;
let mentioned =
bot_id.is_some_and(|bid| msg.mentions.iter().any(|u| u.id.get() == bid));
if !mentioned {
tracing::debug!("Discord: respond_to=mention, bot not mentioned — ignoring");
store_channel_msg(msg.content.clone()).await;
return;
}
}
RespondTo::All => {} }
}
if !is_dm {
store_channel_msg(msg.content.clone()).await;
}
let audio_attachment = msg.attachments.iter().find(|a| {
a.content_type
.as_ref()
.is_some_and(|ct| ct.starts_with("audio/"))
});
let mut is_voice = false;
let mut content = msg.content.clone();
if audio_attachment.is_some() && voice_config.stt_enabled {
let _ = msg.channel_id.broadcast_typing(&ctx.http).await;
}
if let Some(audio) = audio_attachment
&& voice_config.stt_enabled
&& let Ok(resp) = reqwest::get(&audio.url).await
&& let Ok(bytes) = resp.bytes().await
{
match crate::channels::voice::transcribe(bytes.to_vec(), &voice_config).await {
Ok(transcript) => {
tracing::info!(
"Discord: transcribed voice: {}",
truncate_str(&transcript, 80)
);
content = transcript;
is_voice = true;
}
Err(e) => tracing::error!("Discord: STT error: {e}"),
}
}
if !is_dm
&& respond_to == &RespondTo::Mention
&& let Some(bot_id) = discord_state.bot_user_id().await
{
let mention_tag = format!("<@{}>", bot_id);
content = content.replace(&mention_tag, "").trim().to_string();
}
if content.is_empty() && msg.attachments.is_empty() {
return;
}
if !is_voice {
use crate::utils::{inject_file_content, process_file_with_vision};
for attachment in &msg.attachments {
let mime = attachment.content_type.as_deref().unwrap_or("");
let fname = &attachment.filename;
if mime.starts_with("image/") {
if content.is_empty() {
content = "Describe this image.".to_string();
}
content.push_str(&format!(" <<IMG:{}>>", attachment.url));
} else if !mime.starts_with("audio/")
&& let Ok(resp) = reqwest::get(attachment.url.as_str()).await
&& let Ok(bytes) = resp.bytes().await
{
let cfg = config_rx.borrow();
let fc = process_file_with_vision(&bytes, mime, fname, &cfg);
let injected = inject_file_content(&fc).0;
if !injected.is_empty() {
content.push_str(&format!("\n\n{injected}"));
}
}
}
}
if content.is_empty() {
return;
}
let text_preview = truncate_str(&content, 50);
tracing::info!(
"Discord: message from {} ({}): {}",
msg.author.name,
user_id,
text_preview
);
let is_owner = allowed.is_empty()
|| allowed
.iter()
.next()
.map(|&a| a == user_id)
.unwrap_or(false);
if is_owner {
discord_state.set_owner_channel(msg.channel_id.get()).await;
}
if let Some(guild_id) = msg.guild_id {
discord_state.set_guild_id(guild_id.get()).await;
}
let session_id = {
let session_title = if is_dm {
format!("Discord: DM {} ({})", msg.author.name, msg.author.id.get())
} else {
format!("Discord: #{}", msg.channel_id.get())
};
let existing = session_svc
.find_session_by_title(&session_title)
.await
.ok()
.flatten();
if let Some(session) = existing {
if idle_timeout_hours.is_some_and(|h| {
let elapsed = (chrono::Utc::now() - session.updated_at).num_seconds();
elapsed > (h * 3600.0) as i64
}) {
if let Err(e) = session_svc.archive_session(session.id).await {
tracing::error!("Discord: failed to archive session {}: {}", session.id, e);
}
match crate::channels::session_init::create_channel_session(
&session_svc,
Some(session_title),
)
.await
{
Ok(new_session) => new_session.id,
Err(e) => {
tracing::error!("Discord: failed to create session: {}", e);
return;
}
}
} else {
session.id
}
} else {
match crate::channels::session_init::create_channel_session(
&session_svc,
Some(session_title),
)
.await
{
Ok(session) => {
tracing::info!(
"Discord: created new channel session {} for #{}",
session.id,
msg.channel_id.get()
);
session.id
}
Err(e) => {
tracing::error!("Discord: failed to create session: {}", e);
return;
}
}
}
};
let session_meta = session_svc.get_session(session_id).await.ok().flatten();
crate::channels::commands::sync_provider_for_session(
&agent,
session_id,
session_meta
.as_ref()
.and_then(|s| s.provider_name.as_deref()),
session_meta.as_ref().and_then(|s| s.model.as_deref()),
)
.await;
{
use crate::channels::commands::{self, ChannelCommand};
let cmd = commands::handle_command(&content, session_id, &agent, &session_svc).await;
if let Some(reply) = commands::try_execute_text_command(&cmd).await {
let _ = msg.channel_id.say(&ctx.http, &reply).await;
return;
}
match cmd {
ChannelCommand::Models(resp) => {
use serenity::builder::{CreateActionRow, CreateButton, CreateMessage};
use serenity::model::application::ButtonStyle;
let rows: Vec<CreateActionRow> = resp
.providers
.chunks(5)
.take(5)
.map(|chunk| {
CreateActionRow::Buttons(
chunk
.iter()
.map(|(name, label)| {
let display = if *name == resp.current_provider {
format!("✓ {}", label)
} else {
label.clone()
};
let display = if display.len() > 80 {
format!("{}…", display.chars().take(79).collect::<String>())
} else {
display
};
CreateButton::new(format!("provider:{}", name))
.label(display)
.style(ButtonStyle::Secondary)
})
.collect(),
)
})
.collect();
let builder = CreateMessage::new().content(&resp.text).components(rows);
let _ = msg.channel_id.send_message(&ctx.http, builder).await;
return;
}
ChannelCommand::NewSession => {
let session_title = if is_dm {
format!("Discord: DM {} ({})", msg.author.name, msg.author.id.get())
} else {
format!("Discord: #{}", msg.channel_id.get())
};
if !is_owner
&& let Ok(Some(old)) = session_svc.find_session_by_title(&session_title).await
&& let Err(e) = session_svc.archive_session(old.id).await
{
tracing::error!("Discord: failed to archive old session {}: {}", old.id, e);
}
match crate::channels::session_init::create_channel_session(
&session_svc,
Some(session_title),
)
.await
{
Ok(new_session) => {
if is_owner && is_dm {
*shared_session.lock().await = Some(new_session.id);
}
discord_state
.register_session_channel(new_session.id, msg.channel_id.get())
.await;
let _ = msg
.channel_id
.say(&ctx.http, "✅ New session started.")
.await;
}
Err(e) => {
tracing::error!("Discord: failed to create session: {}", e);
let _ = msg
.channel_id
.say(&ctx.http, "Failed to create session.")
.await;
}
}
return;
}
ChannelCommand::Sessions(resp) => {
use serenity::builder::{CreateActionRow, CreateButton, CreateMessage};
use serenity::model::application::ButtonStyle;
let rows: Vec<CreateActionRow> = resp
.sessions
.chunks(5)
.take(5)
.map(|chunk| {
CreateActionRow::Buttons(
chunk
.iter()
.map(|(id, label)| {
let display = if *id == resp.current_session_id {
format!("✓ {}", label)
} else {
label.clone()
};
let display = if display.len() > 80 {
format!("{}…", display.chars().take(79).collect::<String>())
} else {
display
};
CreateButton::new(format!("session:{}", id))
.label(display)
.style(ButtonStyle::Secondary)
})
.collect(),
)
})
.collect();
let builder = CreateMessage::new().content(&resp.text).components(rows);
let _ = msg.channel_id.send_message(&ctx.http, builder).await;
return;
}
ChannelCommand::Stop => {
let cancelled = discord_state.cancel_session(session_id).await;
let reply = if cancelled {
"Operation cancelled."
} else {
"No operation in progress."
};
let _ = msg.channel_id.say(&ctx.http, reply).await;
return;
}
ChannelCommand::Compact => {
let _ = msg
.channel_id
.say(&ctx.http, "⏳ Compacting context...")
.await;
content =
"[SYSTEM: Compact context now. Summarize this conversation for continuity.]"
.to_string();
}
ChannelCommand::UserPrompt(prompt) => {
content = prompt;
}
ChannelCommand::NotACommand => {}
_ => {}
}
}
let reply_context = msg.referenced_message.as_ref().and_then(|reply| {
let reply_text = reply.content.trim();
if reply_text.is_empty() {
return None;
}
let reply_sender = if reply.author.bot {
"assistant".to_string()
} else {
reply.author.name.clone()
};
Some(format!("[Replying to {reply_sender}: \"{reply_text}\"]"))
});
let display_text = if is_owner && msg.guild_id.is_none() {
content.clone()
} else {
format!("{}: {}", msg.author.name, content)
};
let agent_input = if !is_owner {
let name = &msg.author.name;
let uid = msg.author.id.get();
if msg.guild_id.is_some() {
let channel = msg.channel_id.get();
format!("[Discord message from {name} (ID {uid}) in channel {channel}]\n{content}")
} else {
format!("[Discord DM from {name} (ID {uid})]\n{content}")
}
} else {
content
};
let agent_input = if let Some(ref ctx) = reply_context {
format!("{ctx}\n{agent_input}")
} else {
agent_input
};
let agent_input = if msg.guild_id.is_some() {
let chat_id_str = msg.channel_id.get().to_string();
match channel_msg_repo
.recent(Some("discord"), &chat_id_str, 30)
.await
{
Ok(messages) if !messages.is_empty() => {
let history: Vec<String> = messages
.iter()
.rev()
.map(|m| {
let ts = m.created_at.format("%H:%M");
format!("[{}] {}: {}", ts, m.sender_name, m.content)
})
.collect();
format!(
"[Recent channel history ({} messages):\n{}\n--- end history ---]\n{}",
history.len(),
history.join("\n"),
agent_input
)
}
_ => agent_input,
}
} else {
agent_input
};
let agent_input = format!(
"[Channel: Discord — your text response is automatically sent to this channel. \
Do NOT call discord_send to deliver your answer. Only use discord_send for: \
sending to a different channel, embeds, reactions, threads, files, or moderation.]\n{agent_input}"
);
discord_state
.register_session_channel(session_id, msg.channel_id.get())
.await;
let approval_cb = make_approval_callback(discord_state.clone());
let cancel_token = tokio_util::sync::CancellationToken::new();
discord_state
.store_cancel_token(session_id, cancel_token.clone())
.await;
let progress_cb: crate::brain::agent::ProgressCallback = {
use crate::brain::agent::ProgressEvent;
use serenity::builder::EditMessage;
use serenity::model::id::MessageId;
struct ToolEntry {
msg_id: Option<MessageId>,
name: String,
context: String,
}
let tools: Arc<Mutex<Vec<ToolEntry>>> = Arc::new(Mutex::new(Vec::new()));
let http = ctx.http.clone();
let channel = msg.channel_id;
Arc::new(move |_session_id, event| {
let tools = tools.clone();
let http = http.clone();
match event {
ProgressEvent::Compacting => {
let http = http.clone();
tokio::spawn(async move {
for _ in 0..12 {
let _ = channel.broadcast_typing(&http).await;
tokio::time::sleep(std::time::Duration::from_secs(8)).await;
}
});
}
ProgressEvent::ToolStarted {
tool_name,
tool_input,
} => {
let ctx_hint = crate::utils::tool_context_hint(&tool_name, &tool_input);
tokio::spawn(async move {
let text = format!("⚙️ **{}**{}", tool_name, ctx_hint);
if let Ok(sent) = channel.say(&http, &text).await {
let mut t = tools.lock().await;
t.push(ToolEntry {
msg_id: Some(sent.id),
name: tool_name,
context: ctx_hint,
});
}
});
}
ProgressEvent::ToolCompleted {
tool_name, success, ..
} => {
tokio::spawn(async move {
let mut t = tools.lock().await;
if let Some(entry) = t
.iter_mut()
.rev()
.find(|e| e.name == tool_name && e.msg_id.is_some())
{
let icon = if success { "✅" } else { "❌" };
let text = format!("{} **{}**{}", icon, entry.name, entry.context);
if let Some(mid) = entry.msg_id.take() {
let _ = channel
.edit_message(&http, mid, EditMessage::new().content(text))
.await;
}
}
});
}
ProgressEvent::SelfHealingAlert { message } => {
tokio::spawn(async move {
let text = format!("🔧 {}", message);
let _ = channel.say(&http, &text).await;
});
}
_ => {}
}
})
};
let discord_chat_id = msg.channel_id.get().to_string();
let question_cb = super::follow_up_question::make_question_callback(discord_state.clone());
let result = agent
.send_message_with_tools_and_display(
session_id,
agent_input,
Some(display_text),
None,
Some(cancel_token),
Some(approval_cb),
Some(progress_cb),
Some(question_cb),
"discord",
Some(&discord_chat_id),
)
.await;
discord_state.remove_cancel_token(session_id).await;
match result {
Ok(response) => {
let (text_only, img_paths) = crate::utils::extract_img_markers(&response.content);
let text_only = crate::utils::sanitize::strip_llm_artifacts(&text_only);
let text_only = redact_secrets(&text_only);
let text_only = if !text_only.trim().is_empty() {
let ctx_max = agent.context_limit_for_session(session_id);
let footer = crate::utils::format_ctx_footer(response.context_tokens, ctx_max);
format!("{}\n{}", text_only, footer)
} else {
text_only
};
for img_path in img_paths {
match tokio::fs::read(&img_path).await {
Ok(bytes) => {
let fname = std::path::Path::new(&img_path)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("image.png")
.to_string();
let file = CreateAttachment::bytes(bytes.as_slice(), fname);
if let Err(e) = msg
.channel_id
.send_message(&ctx.http, CreateMessage::new().add_file(file))
.await
{
tracing::error!("Discord: failed to send generated image: {}", e);
}
}
Err(e) => {
tracing::error!("Discord: failed to read image {}: {}", img_path, e);
}
}
}
for chunk in split_message(&text_only, 2000) {
if let Err(e) = msg.channel_id.say(&ctx.http, chunk).await {
tracing::error!("Discord: failed to send reply: {}", e);
}
}
if !is_dm && !text_only.trim().is_empty() {
let bot_id = discord_state.bot_user_id().await;
let bot_sender_id = bot_id
.map(|id| id.to_string())
.unwrap_or_else(|| "bot:opencrabs".to_string());
let guild_name = msg
.guild_id
.map(|g| g.get().to_string())
.unwrap_or_else(|| "DM".to_string());
let cm = DbChannelMessage::new(
"discord".into(),
msg.channel_id.get().to_string(),
Some(guild_name),
bot_sender_id,
"OpenCrabs".into(),
text_only.clone(),
"text".into(),
None,
);
if let Err(e) = channel_msg_repo.insert(&cm).await {
tracing::warn!(
"Discord: failed to record bot reply in channel_messages: {}",
e
);
}
}
if is_voice && voice_config.tts_enabled {
match crate::channels::voice::synthesize(&response.content, &voice_config).await {
Ok(audio_bytes) => {
let file = CreateAttachment::bytes(audio_bytes.as_slice(), "response.ogg");
if let Err(e) = msg
.channel_id
.send_message(&ctx.http, CreateMessage::new().add_file(file))
.await
{
tracing::error!("Discord: failed to send TTS voice: {e}");
}
}
Err(e) => tracing::error!("Discord: TTS error: {e}"),
}
}
}
Err(ref e) if matches!(e, crate::brain::agent::AgentError::Cancelled) => {
tracing::info!("Discord: agent call cancelled for session {}", session_id);
}
Err(e) => {
tracing::error!("Discord: agent error: {}", e);
let error_msg = format!("Error: {}", e);
let _ = msg.channel_id.say(&ctx.http, error_msg).await;
}
}
}
pub(crate) fn make_approval_callback(
state: Arc<super::DiscordState>,
) -> crate::brain::agent::ApprovalCallback {
use crate::brain::agent::ToolApprovalInfo;
use crate::utils::{check_approval_policy, persist_auto_session_policy};
use serenity::builder::{CreateActionRow, CreateButton, CreateMessage, EditMessage};
use serenity::model::application::ButtonStyle;
use serenity::model::id::ChannelId;
use tokio::sync::oneshot;
Arc::new(move |info: ToolApprovalInfo| {
let state = state.clone();
Box::pin(async move {
if let Some(result) = check_approval_policy() {
return Ok(result);
}
let http = match state.http().await {
Some(h) => h,
None => {
tracing::warn!("Discord approval: bot not connected");
return Ok((false, false));
}
};
let channel_id = match state.session_channel(info.session_id).await {
Some(id) => id,
None => match state.owner_channel_id().await {
Some(id) => id,
None => {
tracing::warn!(
"Discord approval: no channel_id for session {}",
info.session_id
);
return Ok((false, false));
}
},
};
let approval_id = uuid::Uuid::new_v4().to_string();
let safe_input = crate::utils::redact_tool_input(&info.tool_input);
let input_pretty = serde_json::to_string_pretty(&safe_input)
.unwrap_or_else(|_| safe_input.to_string());
let text = format!(
"🔐 **Tool Approval Required**\n\nTool: `{}`\nInput:\n```json\n{}\n```",
info.tool_name,
truncate_str(&input_pretty, 1800),
);
let row = CreateActionRow::Buttons(vec![
CreateButton::new(format!("approve:{}", approval_id))
.label("✅ Yes")
.style(ButtonStyle::Success),
CreateButton::new(format!("always:{}", approval_id))
.label("🔁 Always (session)")
.style(ButtonStyle::Primary),
CreateButton::new(format!("yolo:{}", approval_id))
.label("🔥 YOLO")
.style(ButtonStyle::Secondary),
CreateButton::new(format!("deny:{}", approval_id))
.label("❌ No")
.style(ButtonStyle::Danger),
]);
let (tx, rx) = oneshot::channel();
state
.register_pending_approval(approval_id.clone(), tx)
.await;
tracing::info!(
"Discord approval: registered pending id={}, sending to channel={}",
approval_id,
channel_id
);
let mut sent_msg = match ChannelId::new(channel_id)
.send_message(
&http,
CreateMessage::new().content(&text).components(vec![row]),
)
.await
{
Ok(m) => m,
Err(e) => {
tracing::error!("Discord approval: failed to send message: {}", e);
return Ok((false, false));
}
};
tracing::info!(
"Discord approval: message sent, waiting for response (id={})",
approval_id
);
match tokio::time::timeout(std::time::Duration::from_secs(300), rx).await {
Ok(Ok((approved, always))) => {
tracing::info!(
"Discord approval: user responded id={}, approved={}, always={}",
approval_id,
approved,
always
);
if always {
persist_auto_session_policy();
}
let label = if always {
"🔁 Always approved (session)"
} else if approved {
"✅ Approved"
} else {
"❌ Denied"
};
let _ = sent_msg
.edit(&http, EditMessage::new().content(label).components(vec![]))
.await;
Ok((approved, always))
}
Ok(Err(_)) => {
tracing::warn!(
"Discord approval: oneshot channel closed (id={})",
approval_id
);
Ok((false, false))
}
Err(_) => {
tracing::warn!(
"Discord approval: 5-minute timeout — auto-denying (id={})",
approval_id
);
let _ = sent_msg
.edit(
&http,
EditMessage::new()
.content("⏱️ Approval timed out — denied")
.components(vec![]),
)
.await;
Ok((false, false))
}
}
})
})
}