use super::AppState;
use super::channel_helpers::{
resolve_channel_chat_id, resolve_channel_scope, send_typing_indicator,
};
use super::pipeline::{
ChannelClaimContext, PipelineConfig, PipelineError, PipelineOutcome, PipelineRequest,
run_pipeline,
};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
pub(super) fn format_channel_reply_for_delivery(platform: &str, content: &str) -> String {
roboticus_channels::formatter::formatter_for(platform).format(content)
}
fn build_channel_claim_context(
config: &roboticus_core::RoboticusConfig,
inbound: &roboticus_channels::InboundMessage,
chat_id: &str,
) -> ChannelClaimContext {
let platform = &inbound.platform;
let (sender_in_allowlist, allowlist_configured) =
super::channel_helpers::resolve_allowlist_status(
config,
platform,
chat_id,
&inbound.sender_id,
);
ChannelClaimContext {
sender_id: inbound.sender_id.clone(),
chat_id: chat_id.to_string(),
platform: platform.clone(),
sender_in_allowlist,
allowlist_configured,
trusted_sender_ids: config.channels.trusted_sender_ids.clone(),
}
}
#[tracing::instrument(skip_all, fields(channel = %inbound.platform))]
pub async fn process_channel_message(
state: &AppState,
inbound: roboticus_channels::InboundMessage,
) -> Result<(), String> {
tracing::info!(channel = %inbound.platform, peer = %inbound.sender_id, "Processing channel message");
let chat_id = resolve_channel_chat_id(&inbound);
let platform = inbound.platform.clone();
let mut inbound = inbound;
let multimodal_parts = if state.media_service.is_some() {
enrich_multimodal(state, &mut inbound).await
} else {
vec![]
};
if inbound.content.trim().is_empty() {
return Ok(());
}
{
let config = state.config.read().await;
let agent_name = &config.agent.name;
let chain = roboticus_channels::filter::default_addressability_chain(agent_name);
if !chain.accepts(&inbound) {
tracing::debug!(chat_id = %chat_id, "addressability filter: not addressed, skipping");
return Ok(());
}
}
if inbound.content.starts_with('/')
&& let Some(reply) =
super::handle_bot_command(state, &inbound.content, Some(&inbound)).await
{
let reply = format_channel_reply_for_delivery(&platform, &reply);
state
.channel_router
.send_reply(&platform, &chat_id, reply)
.await
.inspect_err(|e| tracing::warn!(error = %e, "failed to send bot command reply"))
.ok();
return Ok(());
}
send_typing_indicator(state, &platform, &chat_id, inbound.metadata.as_ref()).await;
let typing_keepalive_stop = Arc::new(AtomicBool::new(false));
{
let keepalive_state = state.clone();
let keepalive_platform = platform.clone();
let keepalive_chat_id = chat_id.clone();
let keepalive_metadata = inbound.metadata.clone();
let keepalive_stop = Arc::clone(&typing_keepalive_stop);
tokio::spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(4)).await;
if keepalive_stop.load(Ordering::Acquire) {
break;
}
send_typing_indicator(
&keepalive_state,
&keepalive_platform,
&keepalive_chat_id,
keepalive_metadata.as_ref(),
)
.await;
}
});
}
let (scope, channel_context) = {
let config = state.config.read().await;
let scope = resolve_channel_scope(&config, &inbound, &chat_id);
let ctx = build_channel_claim_context(&config, &inbound, &chat_id);
(scope, ctx)
};
let request = PipelineRequest {
state,
config: PipelineConfig::channel(&platform),
raw_content: &inbound.content,
session_id_hint: None,
scope_hint: Some(scope),
is_correction_turn: false,
channel_context: Some(channel_context),
content_parts: if multimodal_parts.is_empty() {
None
} else {
Some(multimodal_parts)
},
};
let outcome = match run_pipeline(request).await {
Ok(o) => o,
Err(e) => {
typing_keepalive_stop.store(true, Ordering::Release);
let reply = match &e {
PipelineError::BadRequest(msg) | PipelineError::PayloadTooLarge(msg) => msg.clone(),
PipelineError::InjectionBlocked { .. } => {
"I can't process that message — it was flagged by my safety filters.".into()
}
PipelineError::DuplicateRequest => {
tracing::debug!("dropping duplicate channel message");
return Ok(());
}
PipelineError::SessionError(msg, _) => msg.clone(),
PipelineError::Internal(msg) => msg.clone(),
};
state
.channel_router
.send_reply(
&platform,
&chat_id,
format_channel_reply_for_delivery(&platform, &reply),
)
.await
.inspect_err(|err| tracing::warn!(error = %err, "failed to send error reply"))
.ok();
return Err(e.to_string());
}
};
match outcome {
PipelineOutcome::Complete { result, .. } => {
typing_keepalive_stop.store(true, Ordering::Release);
let outbound = format_channel_reply_for_delivery(&platform, &result.content);
if let Err(e) = state
.channel_router
.send_reply(&platform, &chat_id, outbound)
.await
{
return Err(e.to_string());
}
}
PipelineOutcome::SpecialistProposal { prompt, .. } => {
typing_keepalive_stop.store(true, Ordering::Release);
let formatted = format_channel_reply_for_delivery(&platform, &prompt);
state
.channel_router
.send_reply(&platform, &chat_id, formatted)
.await
.inspect_err(|e| tracing::warn!(error = %e, "failed to send specialist proposal"))
.ok();
}
PipelineOutcome::StreamReady(_) => {
typing_keepalive_stop.store(true, Ordering::Release);
tracing::error!("unexpected StreamReady outcome on channel endpoint");
return Err("unexpected streaming outcome on channel endpoint".into());
}
}
typing_keepalive_stop.store(true, Ordering::Release);
Ok(())
}
fn audio_format_from_content_type(ct: &str) -> roboticus_channels::voice::AudioFormat {
let ct_lower = ct.to_ascii_lowercase();
if ct_lower.contains("ogg") || ct_lower.contains("opus") {
roboticus_channels::voice::AudioFormat::Ogg
} else if ct_lower.contains("mp3") || ct_lower.contains("mpeg") {
roboticus_channels::voice::AudioFormat::Mp3
} else if ct_lower.contains("wav") {
roboticus_channels::voice::AudioFormat::Wav
} else if ct_lower.contains("pcm") || ct_lower.contains("raw") {
roboticus_channels::voice::AudioFormat::Pcm
} else {
roboticus_channels::voice::AudioFormat::Ogg
}
}
async fn enrich_multimodal(
state: &AppState,
inbound: &mut roboticus_channels::InboundMessage,
) -> Vec<roboticus_llm::format::ContentPart> {
let media_svc = match &state.media_service {
Some(svc) => svc,
None => return vec![],
};
let attachments: Vec<roboticus_channels::MediaAttachment> = inbound
.metadata
.as_ref()
.and_then(|m| m.get("attachments"))
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or_default();
if attachments.is_empty() {
return vec![];
}
let config = state.config.read().await;
let auto_transcribe = config.multimodal.auto_transcribe_audio;
let auto_describe = config.multimodal.auto_describe_images;
drop(config);
let mut enrichments: Vec<String> = Vec::new();
let mut content_parts: Vec<roboticus_llm::format::ContentPart> = Vec::new();
for att in &attachments {
let local_path = if let Some(ref url) = att.source_url {
if url.starts_with("http://") || url.starts_with("https://") {
match media_svc
.download_and_store(url, &att.media_type, att.filename.as_deref())
.await
{
Ok(path) => Some(path),
Err(e) => {
tracing::warn!(
url = %url,
error = %e,
"failed to download media attachment"
);
None
}
}
} else if url.starts_with("whatsapp://media/") {
let media_id = url.trim_start_matches("whatsapp://media/");
if let Some(ref wa) = state.whatsapp {
match media_svc
.download_whatsapp_media(
media_id,
&wa.token,
&att.media_type,
att.filename.as_deref(),
)
.await
{
Ok(path) => Some(path),
Err(e) => {
tracing::warn!(
media_id = %media_id,
error = %e,
"failed to download WhatsApp media"
);
None
}
}
} else {
tracing::debug!("WhatsApp adapter not configured, cannot download media");
None
}
} else {
att.local_path.clone()
}
} else {
att.local_path.clone()
};
if auto_transcribe
&& att.media_type == roboticus_channels::MediaType::Audio
&& let Some(ref path) = local_path
&& let Some(ref voice_lock) = state.voice
{
match tokio::fs::read(path).await {
Ok(audio_data) => {
let format = audio_format_from_content_type(&att.content_type);
let mut voice = voice_lock.write().await;
match voice.transcribe(&audio_data, format).await {
Ok(result) if !result.text.trim().is_empty() => {
tracing::info!(
path = %path.display(),
chars = result.text.len(),
"audio transcription complete"
);
enrichments.push(format!("[Transcription: {}]", result.text.trim()));
}
Ok(_) => {
tracing::debug!("audio transcription returned empty text");
}
Err(e) => {
tracing::warn!(error = %e, "audio transcription failed");
}
}
}
Err(e) => {
tracing::warn!(
path = %path.display(),
error = %e,
"failed to read audio file for transcription"
);
}
}
}
if att.media_type == roboticus_channels::MediaType::Image {
if let Some(ref path) = local_path {
match tokio::fs::read(path).await {
Ok(bytes) => {
let b64 = base64::Engine::encode(
&base64::engine::general_purpose::STANDARD,
&bytes,
);
content_parts.push(roboticus_llm::format::ContentPart::ImageBase64 {
media_type: att.content_type.clone(),
data: b64,
});
tracing::info!(
path = %path.display(),
bytes = bytes.len(),
"image encoded as base64 ContentPart"
);
}
Err(e) => {
tracing::warn!(
path = %path.display(),
error = %e,
"failed to read image file for multimodal"
);
let desc = att.filename.as_deref().unwrap_or("image");
enrichments.push(format!("[Image attached: {desc}]"));
}
}
} else if auto_describe {
let desc = att.filename.as_deref().unwrap_or("image");
enrichments.push(format!("[Image attached: {desc}]"));
}
}
}
if !enrichments.is_empty() {
let prefix = enrichments.join(" ");
if inbound.content.is_empty() {
inbound.content = prefix;
} else {
inbound.content = format!("{prefix}\n{}", inbound.content);
}
}
content_parts
}