use crate::db::{models::Message, repository::MessageRepository};
use crate::services::ServiceContext;
use anyhow::{Context, Result};
use chrono::Utc;
use uuid::Uuid;
/// Service-layer entry point for creating, reading, updating and deleting
/// chat messages.
///
/// Each method builds a `MessageRepository` from the pool exposed by the
/// wrapped `ServiceContext` and delegates the actual storage work to it.
#[derive(Clone)]
pub struct MessageService {
/// Shared service context; its `pool()` is handed to every repository
/// this service constructs.
context: ServiceContext,
}
impl MessageService {
pub fn new(context: ServiceContext) -> Self {
Self { context }
}
/// Creates a new message in `session_id` and persists it.
///
/// The message receives a freshly generated v4 UUID, the current UTC time
/// as `created_at`, the sequence number reported by `get_next_sequence`,
/// and no usage or thinking data (all optional fields start as `None`).
///
/// # Errors
///
/// Returns an error if the next sequence number cannot be determined or if
/// the repository fails to store the message.
pub async fn create_message(
    &self,
    session_id: Uuid,
    role: String,
    content: String,
) -> Result<Message> {
    let repo = MessageRepository::new(self.context.pool());

    // NOTE(review): the sequence lookup and the insert below are separate
    // awaits with no transaction visible here, so two concurrent calls for
    // the same session could obtain the same sequence — confirm whether the
    // database enforces uniqueness.
    let sequence = self.get_next_sequence(session_id).await?;

    let id = Uuid::new_v4();
    let created_at = Utc::now();
    let message = Message {
        id,
        session_id,
        role,
        content,
        sequence,
        created_at,
        token_count: None,
        cost: None,
        input_tokens: None,
        cache_creation_tokens: None,
        cache_read_tokens: None,
        thinking: None,
    };

    let inserted = repo.create(&message).await;
    inserted.context("Failed to create message")?;

    tracing::debug!(
        "Created new message: {} in session {} (seq: {})",
        message.id,
        message.session_id,
        message.sequence
    );
    Ok(message)
}
/// Looks up a message by its id.
///
/// Returns `Ok(None)` when the repository finds no message with that id.
///
/// # Errors
///
/// Returns an error if the repository lookup itself fails.
pub async fn get_message(&self, id: Uuid) -> Result<Option<Message>> {
    let lookup = MessageRepository::new(self.context.pool())
        .find_by_id(id)
        .await;
    lookup.context("Failed to get message")
}
/// Looks up a message by its id, treating a missing message as an error.
///
/// # Errors
///
/// Returns an error if the lookup fails or if no message with `id` exists.
pub async fn get_message_required(&self, id: Uuid) -> Result<Message> {
    match self.get_message(id).await? {
        Some(message) => Ok(message),
        None => Err(anyhow::anyhow!("Message not found: {}", id)),
    }
}
/// Returns every message the repository reports for `session_id`.
///
/// The ordering is whatever `find_by_session` yields.
///
/// # Errors
///
/// Returns an error if the repository query fails.
pub async fn list_messages_for_session(&self, session_id: Uuid) -> Result<Vec<Message>> {
    let repo = MessageRepository::new(self.context.pool());
    let fetched = repo.find_by_session(session_id).await;
    fetched.context("Failed to list messages for session")
}
/// Writes the given message back through the repository's `update`.
///
/// # Errors
///
/// Returns an error if the repository update fails.
pub async fn update_message(&self, message: &Message) -> Result<()> {
    MessageRepository::new(self.context.pool())
        .update(message)
        .await
        .context("Failed to update message")?;

    tracing::debug!("Updated message: {}", message.id);
    Ok(())
}
/// Records token usage and cost on an existing message.
///
/// `token_count` and `cost` are always overwritten. Each of the optional
/// counters (`input_tokens`, `cache_creation_tokens`, `cache_read_tokens`)
/// replaces the stored value only when it is `Some`; passing `None` leaves
/// the stored value untouched.
///
/// # Errors
///
/// Returns an error if the message does not exist, cannot be loaded, or the
/// repository update fails.
pub async fn update_message_usage(
    &self,
    id: Uuid,
    token_count: i32,
    cost: f64,
    input_tokens: Option<i32>,
    cache_creation_tokens: Option<i32>,
    cache_read_tokens: Option<i32>,
) -> Result<()> {
    // NOTE(review): this loads the message, edits it in memory and hands the
    // whole value to `update`; whether changes made concurrently by other
    // calls can be overwritten depends on what `update` writes — confirm.
    let mut message = self.get_message_required(id).await?;

    message.token_count = Some(token_count);
    message.cost = Some(cost);
    // `or` keeps the previously stored counter whenever the caller passes `None`.
    message.input_tokens = input_tokens.or(message.input_tokens);
    message.cache_creation_tokens = cache_creation_tokens.or(message.cache_creation_tokens);
    message.cache_read_tokens = cache_read_tokens.or(message.cache_read_tokens);

    MessageRepository::new(self.context.pool())
        .update(&message)
        .await
        .context("Failed to update message usage")?;

    // Counters the caller did not supply are logged as a dash.
    let shown = |value: Option<i32>| value.map_or_else(|| "—".to_string(), |t| t.to_string());
    tracing::debug!(
        "Updated message usage: {} ({} output, {} input, {} cache_create, {} cache_read, ${:.4})",
        id,
        token_count,
        shown(input_tokens),
        shown(cache_creation_tokens),
        shown(cache_read_tokens),
        cost
    );
    Ok(())
}
/// Returns the value the repository's `last_assistant_input_tokens` query
/// reports for `session_id`, or `None` when it reports nothing.
///
/// # Errors
///
/// Returns an error if the repository query fails.
pub async fn last_assistant_input_tokens(&self, session_id: Uuid) -> Result<Option<i32>> {
    let lookup = MessageRepository::new(self.context.pool())
        .last_assistant_input_tokens(session_id)
        .await;
    lookup.context("Failed to read last assistant input_tokens")
}
/// Asks the repository to append `content_to_append` to the content of
/// message `id`.
///
/// # Errors
///
/// Returns an error if the repository operation fails.
pub async fn append_content(&self, id: Uuid, content_to_append: &str) -> Result<()> {
    let repo = MessageRepository::new(self.context.pool());
    let outcome = repo.append_content(id, content_to_append).await;
    outcome.context("Failed to append to message")?;
    Ok(())
}
/// Asks the repository to set the thinking text of message `id` to
/// `thinking`.
///
/// # Errors
///
/// Returns an error if the repository operation fails.
pub async fn set_thinking(&self, id: Uuid, thinking: &str) -> Result<()> {
    let repo = MessageRepository::new(self.context.pool());
    let outcome = repo.set_thinking(id, thinking).await;
    outcome.context("Failed to set thinking")?;
    Ok(())
}
/// Asks the repository to append `thinking` to the thinking text of
/// message `id`.
///
/// # Errors
///
/// Returns an error if the repository operation fails.
pub async fn append_thinking(&self, id: Uuid, thinking: &str) -> Result<()> {
    let repo = MessageRepository::new(self.context.pool());
    let outcome = repo.append_thinking(id, thinking).await;
    outcome.context("Failed to append thinking")?;
    Ok(())
}
/// Deletes the message with the given id through the repository.
///
/// # Errors
///
/// Returns an error if the repository delete fails.
pub async fn delete_message(&self, id: Uuid) -> Result<()> {
    MessageRepository::new(self.context.pool())
        .delete(id)
        .await
        .context("Failed to delete message")?;

    tracing::debug!("Deleted message: {}", id);
    Ok(())
}
/// Deletes the messages of `session_id` through the repository's
/// `delete_by_session`.
///
/// # Errors
///
/// Returns an error if the repository delete fails.
pub async fn delete_messages_for_session(&self, session_id: Uuid) -> Result<()> {
    MessageRepository::new(self.context.pool())
        .delete_by_session(session_id)
        .await
        .context("Failed to delete messages for session")?;

    tracing::info!("Deleted messages for session {}", session_id);
    Ok(())
}
/// Returns the message count the repository reports for `session_id`.
///
/// # Errors
///
/// Returns an error if the repository query fails.
pub async fn count_messages_in_session(&self, session_id: Uuid) -> Result<i64> {
    let counted = MessageRepository::new(self.context.pool())
        .count_by_session(session_id)
        .await;
    counted.context("Failed to count messages in session")
}
async fn get_next_sequence(&self, session_id: Uuid) -> Result<i32> {
let count = self.count_messages_in_session(session_id).await?;
Ok((count + 1) as i32)
}
/// Returns the final message in the list produced by
/// `list_messages_for_session`, or `None` when the session has no messages.
///
/// NOTE(review): "last" means last in the repository's result order;
/// presumably that is sequence order — confirm against `find_by_session`.
///
/// # Errors
///
/// Returns an error if listing the session's messages fails.
pub async fn get_last_message(&self, session_id: Uuid) -> Result<Option<Message>> {
    let mut messages = self.list_messages_for_session(session_id).await?;
    // `pop` hands back the final element directly instead of stepping an
    // iterator through every message just to reach the end.
    Ok(messages.pop())
}
/// Returns the messages of `session_id` whose `role` equals `role`,
/// preserving the order in which `list_messages_for_session` returned them.
///
/// # Errors
///
/// Returns an error if listing the session's messages fails.
pub async fn get_messages_by_role(&self, session_id: Uuid, role: &str) -> Result<Vec<Message>> {
    let mut matching = self.list_messages_for_session(session_id).await?;
    // Filter in place: drop every message whose role differs.
    matching.retain(|message| message.role == role);
    Ok(matching)
}
/// Sums `token_count` over all messages of `session_id`, skipping messages
/// whose `token_count` is `None`.
///
/// # Errors
///
/// Returns an error if listing the session's messages fails or if the total
/// does not fit in an `i32`.
pub async fn calculate_total_tokens(&self, session_id: Uuid) -> Result<i32> {
    let messages = self.list_messages_for_session(session_id).await?;

    // Accumulate in i64 so a large session cannot overflow the running sum
    // (a plain i32 `sum` panics in debug builds and wraps in release builds).
    let total: i64 = messages
        .iter()
        .filter_map(|m| m.token_count)
        .map(i64::from)
        .sum();

    i32::try_from(total).context("Total token count does not fit in i32")
}
/// Sums `cost` over all messages of `session_id`, skipping messages whose
/// `cost` is `None`.
///
/// # Errors
///
/// Returns an error if listing the session's messages fails.
pub async fn calculate_total_cost(&self, session_id: Uuid) -> Result<f64> {
    let messages = self.list_messages_for_session(session_id).await?;
    let known_costs = messages.into_iter().filter_map(|message| message.cost);
    Ok(known_costs.sum::<f64>())
}
}