use std::collections::HashMap;
use std::pin::Pin;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::Stream;
use uuid::Uuid;
use crate::error::ChannelError;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AttachmentKind {
Audio,
Image,
Document,
}
impl AttachmentKind {
pub fn from_mime_type(mime: &str) -> Self {
let base = mime.split(';').next().unwrap_or(mime).trim();
if base.starts_with("audio/") {
Self::Audio
} else if base.starts_with("image/") {
Self::Image
} else {
Self::Document
}
}
}
#[derive(Debug, Clone)]
pub struct IncomingAttachment {
pub id: String,
pub kind: AttachmentKind,
pub mime_type: String,
pub filename: Option<String>,
pub size_bytes: Option<u64>,
pub source_url: Option<String>,
pub storage_key: Option<String>,
pub extracted_text: Option<String>,
pub data: Vec<u8>,
pub duration_secs: Option<u32>,
}
#[derive(Debug, Clone)]
pub struct IncomingMessage {
pub id: Uuid,
pub channel: String,
pub user_id: String,
pub owner_id: String,
pub sender_id: String,
pub user_name: Option<String>,
pub content: String,
pub thread_id: Option<String>,
pub conversation_scope_id: Option<String>,
pub received_at: DateTime<Utc>,
pub metadata: serde_json::Value,
pub timezone: Option<String>,
pub attachments: Vec<IncomingAttachment>,
pub(crate) is_internal: bool,
}
impl IncomingMessage {
pub fn new(
channel: impl Into<String>,
user_id: impl Into<String>,
content: impl Into<String>,
) -> Self {
let user_id = user_id.into();
Self {
id: Uuid::new_v4(),
channel: channel.into(),
owner_id: user_id.clone(),
sender_id: user_id.clone(),
user_id,
user_name: None,
content: content.into(),
thread_id: None,
conversation_scope_id: None,
received_at: Utc::now(),
metadata: serde_json::Value::Null,
timezone: None,
attachments: Vec::new(),
is_internal: false,
}
}
pub fn with_thread(mut self, thread_id: impl Into<String>) -> Self {
let thread_id = thread_id.into();
self.conversation_scope_id = Some(thread_id.clone());
self.thread_id = Some(thread_id);
self
}
pub fn with_owner_id(mut self, owner_id: impl Into<String>) -> Self {
self.owner_id = owner_id.into();
self
}
pub fn with_sender_id(mut self, sender_id: impl Into<String>) -> Self {
self.sender_id = sender_id.into();
self
}
pub fn with_conversation_scope(mut self, scope_id: impl Into<String>) -> Self {
self.conversation_scope_id = Some(scope_id.into());
self
}
pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
self.metadata = metadata;
self
}
pub fn with_user_name(mut self, name: impl Into<String>) -> Self {
self.user_name = Some(name.into());
self
}
pub fn with_timezone(mut self, tz: impl Into<String>) -> Self {
self.timezone = Some(tz.into());
self
}
pub fn with_attachments(mut self, attachments: Vec<IncomingAttachment>) -> Self {
self.attachments = attachments;
self
}
pub(crate) fn into_internal(mut self) -> Self {
self.is_internal = true;
self
}
pub fn conversation_scope(&self) -> Option<&str> {
self.conversation_scope_id
.as_deref()
.or(self.thread_id.as_deref())
}
pub fn routing_target(&self) -> Option<String> {
routing_target_from_metadata(&self.metadata).or_else(|| {
if self.sender_id.is_empty() {
None
} else {
Some(self.sender_id.clone())
}
})
}
}
pub fn routing_target_from_metadata(metadata: &serde_json::Value) -> Option<String> {
metadata
.get("signal_target")
.and_then(|value| match value {
serde_json::Value::String(s) => Some(s.clone()),
serde_json::Value::Number(n) => Some(n.to_string()),
_ => None,
})
.or_else(|| {
metadata.get("chat_id").and_then(|value| match value {
serde_json::Value::String(s) => Some(s.clone()),
serde_json::Value::Number(n) => Some(n.to_string()),
_ => None,
})
})
.or_else(|| {
metadata.get("target").and_then(|value| match value {
serde_json::Value::String(s) => Some(s.clone()),
serde_json::Value::Number(n) => Some(n.to_string()),
_ => None,
})
})
}
pub type MessageStream = Pin<Box<dyn Stream<Item = IncomingMessage> + Send>>;
#[derive(Debug, Clone)]
pub struct OutgoingResponse {
pub content: String,
pub thread_id: Option<String>,
pub attachments: Vec<String>,
pub metadata: serde_json::Value,
}
impl OutgoingResponse {
pub fn text(content: impl Into<String>) -> Self {
Self {
content: content.into(),
thread_id: None,
attachments: Vec::new(),
metadata: serde_json::Value::Null,
}
}
pub fn in_thread(mut self, thread_id: impl Into<String>) -> Self {
self.thread_id = Some(thread_id.into());
self
}
pub fn with_attachments(mut self, paths: Vec<String>) -> Self {
self.attachments = paths;
self
}
}
#[derive(Debug, Clone)]
pub struct ToolDecision {
pub tool_name: String,
pub rationale: String,
}
#[derive(Debug, Clone)]
pub enum StatusUpdate {
Thinking(String),
ToolStarted { name: String },
ToolCompleted {
name: String,
success: bool,
error: Option<String>,
parameters: Option<String>,
},
ToolResult { name: String, preview: String },
StreamChunk(String),
Status(String),
JobStarted {
job_id: String,
title: String,
browse_url: String,
},
ApprovalNeeded {
request_id: String,
tool_name: String,
description: String,
parameters: serde_json::Value,
allow_always: bool,
},
AuthRequired {
extension_name: String,
instructions: Option<String>,
auth_url: Option<String>,
setup_url: Option<String>,
},
AuthCompleted {
extension_name: String,
success: bool,
message: String,
},
ImageGenerated {
data_url: String,
path: Option<String>,
},
Suggestions { suggestions: Vec<String> },
ReasoningUpdate {
narrative: String,
decisions: Vec<ToolDecision>,
},
TurnCost {
input_tokens: u64,
output_tokens: u64,
cost_usd: String,
},
}
impl StatusUpdate {
pub fn tool_completed(
name: String,
result: &Result<String, crate::error::Error>,
params: &serde_json::Value,
tool: Option<&dyn crate::tools::Tool>,
) -> Self {
let success = result.is_ok();
let sensitive = tool.map(|t| t.sensitive_params()).unwrap_or(&[]);
Self::ToolCompleted {
name,
success,
error: result.as_ref().err().map(|e| e.to_string()),
parameters: if !success {
let safe = crate::tools::redact_params(params, sensitive);
Some(serde_json::to_string_pretty(&safe).unwrap_or_else(|_| safe.to_string()))
} else {
None
},
}
}
}
#[async_trait]
pub trait Channel: Send + Sync {
fn name(&self) -> &str;
async fn start(&self) -> Result<MessageStream, ChannelError>;
async fn respond(
&self,
msg: &IncomingMessage,
response: OutgoingResponse,
) -> Result<(), ChannelError>;
async fn send_status(
&self,
_status: StatusUpdate,
_metadata: &serde_json::Value,
) -> Result<(), ChannelError> {
Ok(())
}
async fn broadcast(
&self,
_user_id: &str,
_response: OutgoingResponse,
) -> Result<(), ChannelError> {
Ok(())
}
async fn health_check(&self) -> Result<(), ChannelError>;
fn conversation_context(&self, _metadata: &serde_json::Value) -> HashMap<String, String> {
HashMap::new()
}
async fn shutdown(&self) -> Result<(), ChannelError> {
Ok(())
}
}
#[async_trait]
pub trait ChannelSecretUpdater: Send + Sync {
async fn update_secret(&self, new_secret: Option<secrecy::SecretString>);
}
#[cfg(test)]
mod tests {
use super::*;
use crate::testing::credentials::TEST_REDACT_SECRET_123;
struct SecretTool;
#[async_trait]
impl crate::tools::Tool for SecretTool {
fn name(&self) -> &str {
"secret_save"
}
fn description(&self) -> &str {
"stub"
}
fn parameters_schema(&self) -> serde_json::Value {
serde_json::json!({"type": "object", "properties": {}})
}
async fn execute(
&self,
_params: serde_json::Value,
_ctx: &crate::context::JobContext,
) -> Result<crate::tools::ToolOutput, crate::tools::ToolError> {
unreachable!()
}
fn sensitive_params(&self) -> &[&str] {
&["value"]
}
}
#[test]
fn tool_completed_redacts_sensitive_params_on_failure() {
let params = serde_json::json!({"name": "api_key", "value": TEST_REDACT_SECRET_123});
let err: Result<String, crate::error::Error> =
Err(crate::error::ToolError::ExecutionFailed {
name: "secret_save".into(),
reason: "db error".into(),
}
.into());
let tool = SecretTool;
let status = StatusUpdate::tool_completed(
"secret_save".into(),
&err,
¶ms,
Some(&tool as &dyn crate::tools::Tool),
);
if let StatusUpdate::ToolCompleted {
success,
error,
parameters,
..
} = &status
{
assert!(!success);
let err_msg = error.as_deref().expect("should have error");
assert!(err_msg.contains("db error"), "error: {}", err_msg);
let param_str = parameters
.as_ref()
.expect("should have parameters on failure");
assert!(
param_str.contains("[REDACTED]"),
"sensitive value should be redacted: {}",
param_str
);
assert!(
!param_str.contains(TEST_REDACT_SECRET_123),
"raw secret should not appear: {}",
param_str
);
assert!(
param_str.contains("api_key"),
"non-sensitive params should be preserved: {}",
param_str
);
} else {
panic!("expected ToolCompleted variant");
}
}
#[test]
fn tool_completed_no_params_on_success() {
let params = serde_json::json!({"name": "key", "value": "secret"});
let ok: Result<String, crate::error::Error> = Ok("done".into());
let status = StatusUpdate::tool_completed("secret_save".into(), &ok, ¶ms, None);
if let StatusUpdate::ToolCompleted {
success,
error,
parameters,
..
} = &status
{
assert!(success);
assert!(error.is_none());
assert!(parameters.is_none(), "no params should be sent on success");
} else {
panic!("expected ToolCompleted variant");
}
}
#[test]
fn tool_completed_no_tool_passes_params_unredacted() {
let params = serde_json::json!({"cmd": "ls -la"});
let err: Result<String, crate::error::Error> =
Err(crate::error::ToolError::ExecutionFailed {
name: "shell".into(),
reason: "timeout".into(),
}
.into());
let status = StatusUpdate::tool_completed("shell".into(), &err, ¶ms, None);
if let StatusUpdate::ToolCompleted { parameters, .. } = &status {
let param_str = parameters.as_ref().expect("should have parameters");
assert!(
param_str.contains("ls -la"),
"non-sensitive params should pass through: {}",
param_str
);
} else {
panic!("expected ToolCompleted variant");
}
}
#[test]
fn test_incoming_message_with_timezone() {
let msg = IncomingMessage::new("test", "user1", "hello").with_timezone("America/New_York");
assert_eq!(msg.timezone.as_deref(), Some("America/New_York"));
}
}