use super::types::{
AnthropicContent, AnthropicContentBlock, AnthropicMessage, CacheControl, SystemMessage,
};
use crate::config::AnthropicConfig;
use crate::logging::log_debug;
use crate::messages::{CacheType, MessageContent, MessageRole, UnifiedMessage};
fn get_cache_ttl(cache_type: CacheType) -> &'static str {
match cache_type {
CacheType::Ephemeral => "5m",
CacheType::Extended => "1h",
}
}
pub(super) fn transform_unified_messages(
messages: &[&UnifiedMessage],
config: &AnthropicConfig,
enable_caching: bool,
) -> (Vec<SystemMessage>, Vec<AnthropicMessage>) {
let (mut system_messages, conversation_msg_sources) =
separate_system_and_conversation_messages(messages);
let conversation_messages = convert_and_combine_conversation_messages(
&conversation_msg_sources,
config,
enable_caching,
);
apply_system_cache_control(&mut system_messages, messages, config, enable_caching);
(system_messages, conversation_messages)
}
fn separate_system_and_conversation_messages<'a>(
messages: &'a [&'a UnifiedMessage],
) -> (Vec<SystemMessage>, Vec<&'a UnifiedMessage>) {
let mut system_messages = Vec::new();
let mut conversation_msg_sources = Vec::new();
for msg in messages {
match msg.role {
MessageRole::System => {
system_messages.push(SystemMessage {
message_type: "text".to_string(),
text: extract_text_content(&msg.content),
cache_control: None,
});
}
MessageRole::User | MessageRole::Assistant | MessageRole::Tool => {
conversation_msg_sources.push(*msg);
}
}
}
(system_messages, conversation_msg_sources)
}
fn convert_and_combine_conversation_messages(
conversation_msg_sources: &[&UnifiedMessage],
config: &AnthropicConfig,
enable_caching: bool,
) -> Vec<AnthropicMessage> {
let mut combined_messages: Vec<AnthropicMessage> = Vec::new();
for (index, msg) in conversation_msg_sources.iter().enumerate() {
let should_cache =
determine_cache_decision(msg, index, conversation_msg_sources.len(), enable_caching);
let anthropic_msg =
unified_message_to_anthropic_with_cache_control(msg, config, should_cache);
combine_or_add_message(&mut combined_messages, anthropic_msg);
}
combined_messages
}
fn determine_cache_decision(
msg: &UnifiedMessage,
index: usize,
total_messages: usize,
enable_caching: bool,
) -> bool {
if msg.attributes.cacheable && enable_caching {
let should_cache =
should_place_cache_breakpoint_at_conversation_index(index, total_messages);
log_debug!(
provider = "anthropic",
msg_index = index,
total_messages = total_messages,
should_cache = should_cache,
role = ?msg.role,
"Cache breakpoint decision for conversation message"
);
should_cache
} else {
log_debug!(
provider = "anthropic",
msg_index = index,
should_cache = false,
role = ?msg.role,
"Message not cacheable or caching disabled"
);
false
}
}
fn combine_or_add_message(
combined_messages: &mut Vec<AnthropicMessage>,
anthropic_msg: AnthropicMessage,
) {
if let Some(last_msg) = combined_messages.last_mut() {
if last_msg.role == anthropic_msg.role {
log_debug!(
provider = "anthropic",
role = %anthropic_msg.role,
"Combining consecutive messages with same role"
);
merge_content_blocks(&mut last_msg.content, anthropic_msg.content);
} else {
combined_messages.push(anthropic_msg);
}
} else {
combined_messages.push(anthropic_msg);
}
}
fn merge_content_blocks(existing: &mut AnthropicContent, new: AnthropicContent) {
match (&mut *existing, new) {
(AnthropicContent::Blocks(existing_blocks), AnthropicContent::Blocks(new)) => {
existing_blocks.extend(new);
}
(AnthropicContent::Blocks(existing_blocks), AnthropicContent::Text(text)) => {
existing_blocks.push(AnthropicContentBlock::Text {
text,
cache_control: None,
});
}
(AnthropicContent::Text(existing_text), AnthropicContent::Text(new_text)) => {
existing_text.push('\n');
existing_text.push_str(&new_text);
}
(AnthropicContent::Text(existing_text), AnthropicContent::Blocks(blocks)) => {
let mut all_blocks = vec![AnthropicContentBlock::Text {
text: existing_text.clone(),
cache_control: None,
}];
all_blocks.extend(blocks);
*existing = AnthropicContent::Blocks(all_blocks);
}
}
}
fn apply_system_cache_control(
system_messages: &mut [SystemMessage],
messages: &[&UnifiedMessage],
config: &AnthropicConfig,
enable_caching: bool,
) {
if !enable_caching || system_messages.is_empty() {
return;
}
let system_msgs: Vec<_> = messages
.iter()
.filter(|m| m.role == MessageRole::System)
.collect();
if let Some(last_cacheable_index) = system_msgs.iter().rposition(|msg| msg.attributes.cacheable)
{
if let Some(system_msg) = system_messages.get_mut(last_cacheable_index) {
let ttl = system_msgs[last_cacheable_index]
.attributes
.cache_type
.map(get_cache_ttl)
.map(|s| s.to_string())
.unwrap_or_else(|| config.cache_ttl.clone());
system_msg.cache_control = Some(CacheControl {
cache_type: "ephemeral".to_string(),
ttl: Some(ttl),
});
}
}
}
fn unified_message_to_anthropic_with_cache_control(
msg: &UnifiedMessage,
config: &AnthropicConfig,
should_cache: bool,
) -> AnthropicMessage {
let role = convert_message_role(&msg.role);
let content = convert_message_content(&msg.content, &role, msg, config, should_cache);
AnthropicMessage { role, content }
}
fn convert_message_role(role: &MessageRole) -> String {
match role {
MessageRole::User => "user".to_string(),
MessageRole::Assistant => "assistant".to_string(),
MessageRole::Tool => "user".to_string(), MessageRole::System => "user".to_string(), }
}
fn convert_message_content(
content: &MessageContent,
role: &str,
msg: &UnifiedMessage,
config: &AnthropicConfig,
should_cache: bool,
) -> AnthropicContent {
match content {
MessageContent::Text(text) => convert_text_content(text, role, msg, config, should_cache),
MessageContent::Json(value) => convert_json_content(value, msg, config, should_cache),
MessageContent::ToolCall {
id,
name,
arguments,
} => convert_tool_call(id, name, arguments),
MessageContent::ToolResult {
tool_call_id,
content,
is_error,
} => convert_tool_result(tool_call_id, content, *is_error),
}
}
fn convert_text_content(
text: &str,
role: &str,
msg: &UnifiedMessage,
config: &AnthropicConfig,
should_cache: bool,
) -> AnthropicContent {
if should_cache && !text.is_empty() {
let ttl = msg
.attributes
.cache_type
.map(get_cache_ttl)
.map(|s| s.to_string())
.unwrap_or_else(|| config.cache_ttl.clone());
log_debug!(
provider = "anthropic",
role = %role,
text_preview = %text.chars().take(100).collect::<String>(),
"Applying cache_control to conversation message"
);
AnthropicContent::Blocks(vec![AnthropicContentBlock::Text {
text: text.to_string(),
cache_control: Some(CacheControl {
cache_type: "ephemeral".to_string(),
ttl: Some(ttl),
}),
}])
} else {
log_debug!(
provider = "anthropic",
role = %role,
text_preview = %text.chars().take(100).collect::<String>(),
"NOT applying cache_control to conversation message"
);
AnthropicContent::Text(text.to_string())
}
}
fn convert_json_content(
value: &serde_json::Value,
msg: &UnifiedMessage,
config: &AnthropicConfig,
should_cache: bool,
) -> AnthropicContent {
let json_text = serde_json::to_string_pretty(value).unwrap_or_default();
if should_cache && !json_text.is_empty() {
let ttl = msg
.attributes
.cache_type
.map(get_cache_ttl)
.map(|s| s.to_string())
.unwrap_or_else(|| config.cache_ttl.clone());
AnthropicContent::Blocks(vec![AnthropicContentBlock::Text {
text: json_text,
cache_control: Some(CacheControl {
cache_type: "ephemeral".to_string(),
ttl: Some(ttl),
}),
}])
} else {
AnthropicContent::Text(json_text)
}
}
fn convert_tool_call(id: &str, name: &str, arguments: &serde_json::Value) -> AnthropicContent {
log_debug!(
provider = "anthropic",
tool_id = %id,
tool_name = %name,
"Converting ToolCall to Anthropic format for tool result pairing"
);
AnthropicContent::Blocks(vec![AnthropicContentBlock::ToolUse {
id: id.to_string(),
name: name.to_string(),
input: arguments.clone(),
}])
}
fn convert_tool_result(tool_call_id: &str, content: &str, is_error: bool) -> AnthropicContent {
log_debug!(
provider = "anthropic",
tool_call_id = %tool_call_id,
is_error = is_error,
content_len = content.len(),
content_preview = %if content.len() > 200 {
format!("{}...", &content[..200])
} else {
content.to_string()
},
"DEBUG: Converting ToolResult to Anthropic format"
);
AnthropicContent::Blocks(vec![AnthropicContentBlock::ToolResult {
tool_use_id: tool_call_id.to_string(),
content: if is_error {
format!("Error: {}", content)
} else {
content.to_string()
},
}])
}
fn extract_text_content(content: &MessageContent) -> String {
match content {
MessageContent::Text(text) => text.clone(),
MessageContent::Json(value) => serde_json::to_string_pretty(value).unwrap_or_default(),
MessageContent::ToolCall {
name, arguments, ..
} => {
format!(
"Tool call: {} with args: {}",
name,
serde_json::to_string(arguments).unwrap_or_default()
)
}
MessageContent::ToolResult {
content, is_error, ..
} => {
if *is_error {
format!("Error: {}", content)
} else {
content.clone()
}
}
}
}
fn should_place_cache_breakpoint_at_conversation_index(
index: usize,
total_messages: usize,
) -> bool {
match total_messages {
1..=2 => {
index == 0
}
3..=5 => {
index == 0 || index == 2
}
_ => {
if total_messages >= 2 {
index == total_messages - 1 || index == total_messages - 2
} else {
index == total_messages - 1
}
}
}
}