use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;
/// A chat session, as loaded from `engine.chat_sessions` by the queries in this file.
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct ChatSession {
/// Session identifier; per-session queries in this file filter on it.
pub id: Uuid,
/// User the session belongs to.
pub user_id: Uuid,
/// Instance the session is attached to. Optional in this struct, although the
/// constructors visible here always bind a value.
pub instance_id: Option<Uuid>,
/// Lead score. Not written by the code visible here; the tests read `0.0` on a
/// freshly created session.
pub lead_score: f64,
/// Conversion flag. Not written by the code visible here; the tests read `false`
/// on a freshly created session.
pub is_converted: bool,
/// Set to `now()` whenever a session is resumed or a message is stored for it.
pub last_active_at: DateTime<Utc>,
/// JSON metadata supplied at creation (`{}` when created via `create_session`).
pub metadata: serde_json::Value,
/// NOTE(review): not read or written in the code visible here — presumably set by
/// a classification step elsewhere; confirm.
pub classified_at: Option<DateTime<Utc>>,
/// NOTE(review): not read or written in the code visible here — presumably a claim
/// marker for a classification worker; confirm.
pub classification_claimed_at: Option<DateTime<Utc>>,
/// Not bound by the insert in this file, so presumably filled by a column default —
/// confirm against the migration.
pub created_at: DateTime<Utc>,
}
/// A full chat message, as loaded from `engine.chat_messages` with `SELECT *`.
///
/// Fields marked `#[serde(default)]` may be absent when deserializing and then take
/// their type's default value.
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct ChatMessage {
/// Message identifier.
pub id: Uuid,
/// Session the message belongs to.
pub session_id: Uuid,
/// Author role. The code in this file writes `'user'` and `'assistant'`, plus
/// whatever string a caller passes to `append_message`.
pub role: String,
/// Message text.
pub content: String,
/// Timestamp the history queries order by. Not bound by the inserts in this file,
/// so presumably filled by a column default — confirm against the migration.
pub sent_at: DateTime<Utc>,
/// Client-supplied id used to look up an earlier submission of the same user message.
#[serde(default)]
pub client_msg_id: Option<String>,
/// Set to `true` by `mark_user_message_ghosted`; makes a resubmission replay as
/// "ghost" when no assistant rows exist for the message.
#[serde(default)]
pub ghost_decision: bool,
/// On assistant rows written by `insert_assistant_batch`: the user message they answer.
#[serde(default)]
pub user_message_id: Option<Uuid>,
/// Copied from `AssistantInsert::continues_from_message_id`.
/// NOTE(review): presumably links a continuation to the message it extends — confirm.
#[serde(default)]
pub continues_from_message_id: Option<Uuid>,
/// Copied from `AssistantInsert::truncated`.
#[serde(default)]
pub truncated: bool,
/// Copied from `AssistantInsert::model`.
#[serde(default)]
pub model: Option<String>,
/// Copied from `AssistantInsert::usage` (free-form JSON).
#[serde(default)]
pub usage: Option<serde_json::Value>,
/// Copied from `AssistantInsert::generation_id`.
#[serde(default)]
pub generation_id: Option<String>,
/// Copied from `AssistantInsert::assistant_action_type`.
#[serde(default)]
pub assistant_action_type: Option<String>,
}
/// Reduced projection of a chat message: exactly the columns `history_slim` selects.
///
/// Serialize-only; unlike `ChatMessage` it does not derive `Deserialize`.
#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
pub struct ChatMessageSlim {
/// Message identifier.
pub id: Uuid,
/// Author role of the message.
pub role: String,
/// Message text.
pub content: String,
/// Timestamp `history_slim` orders by.
pub sent_at: DateTime<Utc>,
/// Client-supplied message id, if one was stored with the message.
pub client_msg_id: Option<String>,
}
/// Repository for chat sessions and messages, backed by a borrowed Postgres pool.
pub struct ChatRepo<'a> {
/// Pool every query and transaction in this repository runs against.
pub pool: &'a PgPool,
}
impl<'a> ChatRepo<'a> {
    /// Creates a session for `user_id` on `instance_id` with `{}` as its metadata.
    ///
    /// # Errors
    /// Returns any [`sqlx::Error`] raised by the insert.
    pub async fn create_session(
        &self,
        user_id: Uuid,
        instance_id: Uuid,
    ) -> Result<ChatSession, sqlx::Error> {
        self.create_session_with_metadata(user_id, instance_id, serde_json::json!({}))
            .await
    }

    /// Creates a session for `user_id` on `instance_id` carrying `metadata`, and
    /// returns the stored row.
    ///
    /// # Errors
    /// Returns any [`sqlx::Error`] raised by the insert.
    pub async fn create_session_with_metadata(
        &self,
        user_id: Uuid,
        instance_id: Uuid,
        metadata: serde_json::Value,
    ) -> Result<ChatSession, sqlx::Error> {
        sqlx::query_as::<_, ChatSession>(
            "INSERT INTO engine.chat_sessions (user_id, instance_id, metadata) \
             VALUES ($1, $2, $3) \
             RETURNING *",
        )
        .bind(user_id)
        .bind(instance_id)
        .bind(metadata)
        .fetch_one(self.pool)
        .await
    }

    /// Loads a session by id; `Ok(None)` when no such session exists.
    ///
    /// # Errors
    /// Returns any [`sqlx::Error`] raised by the query.
    pub async fn get_session(&self, session_id: Uuid) -> Result<Option<ChatSession>, sqlx::Error> {
        sqlx::query_as::<_, ChatSession>("SELECT * FROM engine.chat_sessions WHERE id = $1")
            .bind(session_id)
            .fetch_optional(self.pool)
            .await
    }

    /// Resumes the most recently active session of `user_id` on `instance_id`, or
    /// creates a new one when none exists.
    ///
    /// Resuming sets `last_active_at` to `now()`, and the returned row carries that
    /// refreshed timestamp.
    ///
    /// # Errors
    /// Returns any [`sqlx::Error`] raised by the update or the fallback insert.
    pub async fn create_or_resume(
        &self,
        user_id: Uuid,
        instance_id: Uuid,
    ) -> Result<ChatSession, sqlx::Error> {
        // One statement picks the latest session, touches it and returns the updated
        // row, so the caller never sees the pre-update `last_active_at` and there is
        // no gap between reading the row and touching it.
        let resumed = sqlx::query_as::<_, ChatSession>(
            "UPDATE engine.chat_sessions SET last_active_at = now() \
             WHERE id = ( \
                 SELECT id FROM engine.chat_sessions \
                 WHERE user_id = $1 AND instance_id = $2 \
                 ORDER BY last_active_at DESC LIMIT 1 \
             ) \
             RETURNING *",
        )
        .bind(user_id)
        .bind(instance_id)
        .fetch_optional(self.pool)
        .await?;

        match resumed {
            Some(session) => Ok(session),
            // NOTE(review): two concurrent first-time callers can still both reach
            // this insert; preventing that needs a uniqueness rule in the schema.
            None => self.create_session(user_id, instance_id).await,
        }
    }

    /// Stores a message in `session_id` and bumps the session's `last_active_at`,
    /// both inside one transaction. Returns the id of the new message.
    ///
    /// # Errors
    /// Returns any [`sqlx::Error`] raised while opening, running or committing the
    /// transaction; nothing is committed in that case.
    pub async fn append_message(
        &self,
        session_id: Uuid,
        role: &str,
        content: &str,
    ) -> Result<Uuid, sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        let id: Uuid = sqlx::query_scalar(
            "INSERT INTO engine.chat_messages (session_id, role, content) \
             VALUES ($1, $2, $3) RETURNING id",
        )
        .bind(session_id)
        .bind(role)
        .bind(content)
        .fetch_one(&mut *tx)
        .await?;

        sqlx::query("UPDATE engine.chat_sessions SET last_active_at = now() WHERE id = $1")
            .bind(session_id)
            .execute(&mut *tx)
            .await?;

        tx.commit().await?;
        Ok(id)
    }

    /// Returns one page of a session's messages in chronological (oldest-first) order.
    ///
    /// Paging counts back from the newest message: `offset` skips that many of the
    /// most recent messages and `limit` caps the page size.
    ///
    /// # Errors
    /// Returns any [`sqlx::Error`] raised by the query.
    pub async fn history(
        &self,
        session_id: Uuid,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<ChatMessage>, sqlx::Error> {
        // `id` breaks ties between rows sharing a `sent_at`, so consecutive pages
        // neither repeat nor drop a row.
        let mut rows = sqlx::query_as::<_, ChatMessage>(
            "SELECT * FROM engine.chat_messages \
             WHERE session_id = $1 \
             ORDER BY sent_at DESC, id DESC \
             LIMIT $2 OFFSET $3",
        )
        .bind(session_id)
        .bind(limit)
        .bind(offset)
        .fetch_all(self.pool)
        .await?;

        // Selected newest-first for paging; callers get oldest-first.
        rows.reverse();
        Ok(rows)
    }

    /// Same paging as [`ChatRepo::history`], but loads only the columns of
    /// [`ChatMessageSlim`].
    ///
    /// # Errors
    /// Returns any [`sqlx::Error`] raised by the query.
    pub async fn history_slim(
        &self,
        session_id: Uuid,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<ChatMessageSlim>, sqlx::Error> {
        // `id` breaks ties between rows sharing a `sent_at`, so consecutive pages
        // neither repeat nor drop a row.
        let mut rows = sqlx::query_as::<_, ChatMessageSlim>(
            "SELECT id, role, content, sent_at, client_msg_id FROM engine.chat_messages \
             WHERE session_id = $1 \
             ORDER BY sent_at DESC, id DESC \
             LIMIT $2 OFFSET $3",
        )
        .bind(session_id)
        .bind(limit)
        .bind(offset)
        .fetch_all(self.pool)
        .await?;

        // Selected newest-first for paging; callers get oldest-first.
        rows.reverse();
        Ok(rows)
    }

    /// Lists every session of `user_id`, most recently active first.
    ///
    /// # Errors
    /// Returns any [`sqlx::Error`] raised by the query.
    pub async fn list_sessions(&self, user_id: Uuid) -> Result<Vec<ChatSession>, sqlx::Error> {
        sqlx::query_as::<_, ChatSession>(
            "SELECT * FROM engine.chat_sessions \
             WHERE user_id = $1 \
             ORDER BY last_active_at DESC",
        )
        .bind(user_id)
        .fetch_all(self.pool)
        .await
    }
}
/// One assistant message to be written by `ChatRepo::insert_assistant_batch`.
///
/// Every field is bound to the column of the same name in `engine.chat_messages`.
#[derive(Debug, Clone)]
pub struct AssistantInsert {
    /// Id the new row is stored under; chosen by the caller, not by the database.
    pub id: Uuid,
    /// Message text.
    pub content: String,
    /// Value for the `assistant_action_type` column (the tests in this file use `"reply"`).
    pub assistant_action_type: String,
    /// Value for the `continues_from_message_id` column.
    /// NOTE(review): presumably the message this one continues — confirm.
    pub continues_from_message_id: Option<Uuid>,
    /// Value for the `truncated` column.
    pub truncated: bool,
    /// Value for the `model` column.
    pub model: Option<String>,
    /// Value for the `usage` column (free-form JSON).
    pub usage: Option<serde_json::Value>,
    /// Value for the `generation_id` column.
    pub generation_id: Option<String>,
}
/// Result of `ChatRepo::upsert_user_message_idempotent`.
#[derive(Debug)]
pub enum UpsertUserOutcome {
/// No user message with this client message id existed in the session; a new row
/// was stored under `message_id`.
Inserted { message_id: Uuid },
/// The user message already existed and its outcome is known: either assistant rows
/// reference it (`ghost` is `false`, `assistant_chain` holds them ordered by
/// `sent_at`), or it was marked ghosted (`ghost` is `true`, `assistant_chain` is empty).
Replay {
user_message_id: Uuid,
ghost: bool,
assistant_chain: Vec<ChatMessage>,
},
/// The user message already existed but has neither assistant rows nor a ghost mark.
DuplicateInProgress { user_message_id: Uuid },
}
impl<'a> ChatRepo<'a> {
pub async fn upsert_user_message_idempotent(
&self,
session_id: Uuid,
content: &str,
client_msg_id: &str,
) -> Result<UpsertUserOutcome, sqlx::Error> {
let mut tx = self.pool.begin().await?;
let existing: Option<ChatMessage> = sqlx::query_as::<_, ChatMessage>(
"SELECT * FROM engine.chat_messages \
WHERE session_id = $1 AND client_msg_id = $2 AND role = 'user' \
LIMIT 1",
)
.bind(session_id)
.bind(client_msg_id)
.fetch_optional(&mut *tx)
.await?;
if let Some(row) = existing {
let assistant_chain: Vec<ChatMessage> = sqlx::query_as::<_, ChatMessage>(
"SELECT * FROM engine.chat_messages \
WHERE user_message_id = $1 AND role = 'assistant' \
ORDER BY sent_at ASC",
)
.bind(row.id)
.fetch_all(&mut *tx)
.await?;
tx.commit().await?;
return Ok(if !assistant_chain.is_empty() {
UpsertUserOutcome::Replay {
user_message_id: row.id,
ghost: false,
assistant_chain,
}
} else if row.ghost_decision {
UpsertUserOutcome::Replay {
user_message_id: row.id,
ghost: true,
assistant_chain: vec![],
}
} else {
UpsertUserOutcome::DuplicateInProgress {
user_message_id: row.id,
}
});
}
let id: Uuid = sqlx::query_scalar(
"INSERT INTO engine.chat_messages (session_id, role, content, client_msg_id) \
VALUES ($1, 'user', $2, $3) RETURNING id",
)
.bind(session_id)
.bind(content)
.bind(client_msg_id)
.fetch_one(&mut *tx)
.await?;
sqlx::query("UPDATE engine.chat_sessions SET last_active_at = now() WHERE id = $1")
.bind(session_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(UpsertUserOutcome::Inserted { message_id: id })
}
pub async fn mark_user_message_ghosted(
&self,
user_message_id: Uuid,
) -> Result<(), sqlx::Error> {
sqlx::query(
"UPDATE engine.chat_messages SET ghost_decision = true \
WHERE id = $1 AND role = 'user' AND ghost_decision = false",
)
.bind(user_message_id)
.execute(self.pool)
.await?;
Ok(())
}
pub async fn insert_assistant_batch(
&self,
session_id: Uuid,
user_message_id: Uuid,
rows: &[AssistantInsert],
) -> Result<(), sqlx::Error> {
if rows.is_empty() {
return Ok(());
}
let mut tx = self.pool.begin().await?;
for row in rows {
sqlx::query(
"INSERT INTO engine.chat_messages \
(id, session_id, role, content, user_message_id, \
continues_from_message_id, truncated, model, usage, generation_id, \
assistant_action_type) \
VALUES ($1, $2, 'assistant', $3, $4, $5, $6, $7, $8, $9, $10)",
)
.bind(row.id)
.bind(session_id)
.bind(&row.content)
.bind(user_message_id)
.bind(row.continues_from_message_id)
.bind(row.truncated)
.bind(&row.model)
.bind(&row.usage)
.bind(&row.generation_id)
.bind(&row.assistant_action_type)
.execute(&mut *tx)
.await?;
}
sqlx::query("UPDATE engine.chat_sessions SET last_active_at = now() WHERE id = $1")
.bind(session_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Client message id shared by the idempotency tests.
    const CLIENT_MSG_ID: &str = "01J0000000000000000000000A";

    /// Creates a session for a random user on a random instance.
    async fn fresh_session(repo: &ChatRepo<'_>) -> ChatSession {
        let user_id = Uuid::new_v4();
        let instance_id = Uuid::new_v4();
        repo.create_session(user_id, instance_id).await.unwrap()
    }

    /// Submits the user message "hello" under `CLIENT_MSG_ID`.
    async fn submit_hello(repo: &ChatRepo<'_>, session_id: Uuid) -> UpsertUserOutcome {
        repo.upsert_user_message_idempotent(session_id, "hello", CLIENT_MSG_ID)
            .await
            .unwrap()
    }

    /// Submits "hello" and returns the new message id, panicking unless it was inserted.
    async fn submit_first_hello(repo: &ChatRepo<'_>, session_id: Uuid) -> Uuid {
        match submit_hello(repo, session_id).await {
            UpsertUserOutcome::Inserted { message_id } => message_id,
            o => panic!("expected Inserted, got {o:?}"),
        }
    }

    #[sqlx::test(migrations = "./migrations")]
    async fn create_then_retrieve_session(pool: PgPool) {
        let repo = ChatRepo { pool: &pool };
        let user_id = Uuid::new_v4();
        let instance_id = Uuid::new_v4();

        let created = repo.create_session(user_id, instance_id).await.unwrap();
        let loaded = repo.get_session(created.id).await.unwrap().unwrap();

        assert_eq!(loaded.user_id, user_id);
        assert_eq!(loaded.instance_id, Some(instance_id));
        assert_eq!(loaded.lead_score, 0.0);
        assert!(!loaded.is_converted);
    }

    #[sqlx::test(migrations = "./migrations")]
    async fn append_message_and_history_roundtrip(pool: PgPool) {
        let repo = ChatRepo { pool: &pool };
        let session = fresh_session(&repo).await;

        for (role, content) in [
            ("user", "hello"),
            ("assistant", "hi there"),
            ("user", "how are you?"),
        ] {
            repo.append_message(session.id, role, content).await.unwrap();
        }

        let history = repo.history(session.id, 50, 0).await.unwrap();
        assert_eq!(history.len(), 3);
        assert_eq!(history[0].role, "user");
        assert_eq!(history[0].content, "hello");
        assert_eq!(history[1].role, "assistant");
        assert_eq!(history[2].content, "how are you?");
    }

    #[sqlx::test(migrations = "./migrations")]
    async fn history_slim_returns_role_content_sent_at_in_order(pool: PgPool) {
        let repo = ChatRepo { pool: &pool };
        let session = fresh_session(&repo).await;
        let expected = [("user", "alpha"), ("assistant", "beta"), ("user", "gamma")];

        for (role, content) in expected {
            repo.append_message(session.id, role, content).await.unwrap();
        }

        let slim = repo.history_slim(session.id, 50, 0).await.unwrap();
        assert_eq!(slim.len(), 3);
        for (message, (role, content)) in slim.iter().zip(expected) {
            assert_eq!(message.role, role);
            assert_eq!(message.content, content);
        }
    }

    #[sqlx::test(migrations = "./migrations")]
    async fn history_slim_respects_limit_and_offset(pool: PgPool) {
        let repo = ChatRepo { pool: &pool };
        let session = fresh_session(&repo).await;

        for n in 0..5 {
            let content = format!("m{n}");
            repo.append_message(session.id, "user", &content)
                .await
                .unwrap();
        }

        let newest = repo.history_slim(session.id, 2, 0).await.unwrap();
        let newest_contents: Vec<&str> = newest.iter().map(|m| m.content.as_str()).collect();
        assert_eq!(newest_contents, vec!["m3", "m4"]);

        let previous = repo.history_slim(session.id, 2, 2).await.unwrap();
        let previous_contents: Vec<&str> = previous.iter().map(|m| m.content.as_str()).collect();
        assert_eq!(previous_contents, vec!["m1", "m2"]);
    }

    #[sqlx::test(migrations = "./migrations")]
    async fn list_sessions_for_user(pool: PgPool) {
        let repo = ChatRepo { pool: &pool };
        let user_id = Uuid::new_v4();
        let other_user = Uuid::new_v4();

        // Two sessions for the user under test, one for somebody else.
        repo.create_session(user_id, Uuid::new_v4()).await.unwrap();
        repo.create_session(user_id, Uuid::new_v4()).await.unwrap();
        repo.create_session(other_user, Uuid::new_v4())
            .await
            .unwrap();

        let sessions = repo.list_sessions(user_id).await.unwrap();
        assert_eq!(sessions.len(), 2);
        assert!(sessions.iter().all(|s| s.user_id == user_id));
    }

    #[sqlx::test(migrations = "./migrations")]
    async fn create_or_resume_returns_existing(pool: PgPool) {
        let repo = ChatRepo { pool: &pool };
        let user_id = Uuid::new_v4();
        let instance_id = Uuid::new_v4();

        let first = repo.create_session(user_id, instance_id).await.unwrap();
        let resumed = repo.create_or_resume(user_id, instance_id).await.unwrap();

        assert_eq!(first.id, resumed.id);
    }

    #[sqlx::test(migrations = "./migrations")]
    async fn upsert_user_message_idempotent_first_insert(pool: PgPool) {
        let repo = ChatRepo { pool: &pool };
        let session = fresh_session(&repo).await;

        match submit_hello(&repo, session.id).await {
            UpsertUserOutcome::Inserted { message_id } => {
                assert_ne!(message_id, Uuid::nil());
            }
            other => panic!("expected Inserted, got {other:?}"),
        }
    }

    #[sqlx::test(migrations = "./migrations")]
    async fn upsert_user_message_idempotent_replay_after_done(pool: PgPool) {
        let repo = ChatRepo { pool: &pool };
        let session = fresh_session(&repo).await;
        let first = submit_first_hello(&repo, session.id).await;

        let reply = AssistantInsert {
            id: Uuid::new_v4(),
            content: "hi back".into(),
            assistant_action_type: "reply".into(),
            continues_from_message_id: None,
            truncated: false,
            model: Some("x-ai/grok-4-fast".into()),
            usage: Some(
                serde_json::json!({"prompt_tokens":3,"completion_tokens":2,"total_tokens":5}),
            ),
            generation_id: Some("gen-1".into()),
        };
        repo.insert_assistant_batch(session.id, first, &[reply])
            .await
            .unwrap();

        match submit_hello(&repo, session.id).await {
            UpsertUserOutcome::Replay {
                user_message_id,
                ghost,
                assistant_chain,
            } => {
                assert_eq!(user_message_id, first);
                assert!(!ghost);
                assert_eq!(assistant_chain.len(), 1);
                assert_eq!(assistant_chain[0].content, "hi back");
            }
            other => panic!("expected Replay, got {other:?}"),
        }
    }

    #[sqlx::test(migrations = "./migrations")]
    async fn upsert_user_message_idempotent_409_when_no_assistant_and_not_ghost(pool: PgPool) {
        let repo = ChatRepo { pool: &pool };
        let session = fresh_session(&repo).await;
        let first = submit_first_hello(&repo, session.id).await;

        match submit_hello(&repo, session.id).await {
            UpsertUserOutcome::DuplicateInProgress { user_message_id } => {
                assert_eq!(user_message_id, first);
            }
            other => panic!("expected DuplicateInProgress, got {other:?}"),
        }
    }

    #[sqlx::test(migrations = "./migrations")]
    async fn upsert_user_message_idempotent_replay_when_ghost(pool: PgPool) {
        let repo = ChatRepo { pool: &pool };
        let session = fresh_session(&repo).await;
        let first = submit_first_hello(&repo, session.id).await;

        repo.mark_user_message_ghosted(first).await.unwrap();

        match submit_hello(&repo, session.id).await {
            UpsertUserOutcome::Replay {
                ghost,
                assistant_chain,
                ..
            } => {
                assert!(ghost);
                assert!(assistant_chain.is_empty());
            }
            other => panic!("expected Replay, got {other:?}"),
        }
    }
}