use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct ChatSession {
pub id: Uuid,
pub user_id: Uuid,
pub instance_id: Option<Uuid>,
pub lead_score: f64,
pub is_converted: bool,
pub last_active_at: DateTime<Utc>,
pub metadata: serde_json::Value,
pub classified_at: Option<DateTime<Utc>>,
pub classification_claimed_at: Option<DateTime<Utc>>,
pub created_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct ChatMessage {
pub id: Uuid,
pub session_id: Uuid,
pub role: String,
pub content: String,
pub sent_at: DateTime<Utc>,
#[serde(default)]
pub client_msg_id: Option<String>,
#[serde(default)]
pub ghost_decision: bool,
#[serde(default)]
pub user_message_id: Option<Uuid>,
#[serde(default)]
pub continues_from_message_id: Option<Uuid>,
#[serde(default)]
pub truncated: bool,
#[serde(default)]
pub model: Option<String>,
#[serde(default)]
pub usage: Option<serde_json::Value>,
#[serde(default)]
pub generation_id: Option<String>,
#[serde(default)]
pub assistant_action_type: Option<String>,
}
#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
pub struct ChatMessageSlim {
pub id: Uuid,
pub role: String,
pub content: String,
pub sent_at: DateTime<Utc>,
pub client_msg_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tips_amount_usd: Option<f64>,
}
pub struct ChatRepo<'a> {
pub pool: &'a PgPool,
}
impl<'a> ChatRepo<'a> {
pub async fn create_session(
&self,
user_id: Uuid,
instance_id: Uuid,
) -> Result<ChatSession, sqlx::Error> {
self.create_session_with_metadata(user_id, instance_id, serde_json::json!({}))
.await
}
pub async fn create_session_with_metadata(
&self,
user_id: Uuid,
instance_id: Uuid,
metadata: serde_json::Value,
) -> Result<ChatSession, sqlx::Error> {
sqlx::query_as::<_, ChatSession>(
"INSERT INTO engine.chat_sessions (user_id, instance_id, metadata) \
VALUES ($1, $2, $3) \
RETURNING *",
)
.bind(user_id)
.bind(instance_id)
.bind(metadata)
.fetch_one(self.pool)
.await
}
pub async fn get_session(&self, session_id: Uuid) -> Result<Option<ChatSession>, sqlx::Error> {
sqlx::query_as::<_, ChatSession>("SELECT * FROM engine.chat_sessions WHERE id = $1")
.bind(session_id)
.fetch_optional(self.pool)
.await
}
pub async fn create_or_resume(
&self,
user_id: Uuid,
instance_id: Uuid,
) -> Result<ChatSession, sqlx::Error> {
if let Some(existing) = sqlx::query_as::<_, ChatSession>(
"SELECT * FROM engine.chat_sessions \
WHERE user_id = $1 AND instance_id = $2 \
ORDER BY last_active_at DESC LIMIT 1",
)
.bind(user_id)
.bind(instance_id)
.fetch_optional(self.pool)
.await?
{
sqlx::query("UPDATE engine.chat_sessions SET last_active_at = now() WHERE id = $1")
.bind(existing.id)
.execute(self.pool)
.await?;
return Ok(existing);
}
self.create_session(user_id, instance_id).await
}
pub async fn resume_latest_session(
&self,
user_id: Uuid,
instance_id: Uuid,
) -> Result<Option<ChatSession>, sqlx::Error> {
sqlx::query_as::<_, ChatSession>(
"UPDATE engine.chat_sessions SET last_active_at = now() \
WHERE id = ( \
SELECT id FROM engine.chat_sessions \
WHERE user_id = $1 AND instance_id = $2 \
ORDER BY last_active_at DESC \
LIMIT 1 \
) \
RETURNING *",
)
.bind(user_id)
.bind(instance_id)
.fetch_optional(self.pool)
.await
}
pub async fn append_message(
&self,
session_id: Uuid,
role: &str,
content: &str,
) -> Result<Uuid, sqlx::Error> {
let mut tx = self.pool.begin().await?;
let id: Uuid = sqlx::query_scalar(
"INSERT INTO engine.chat_messages (session_id, role, content) \
VALUES ($1, $2, $3) RETURNING id",
)
.bind(session_id)
.bind(role)
.bind(content)
.fetch_one(&mut *tx)
.await?;
sqlx::query("UPDATE engine.chat_sessions SET last_active_at = now() WHERE id = $1")
.bind(session_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(id)
}
pub async fn history(
&self,
session_id: Uuid,
limit: i64,
offset: i64,
) -> Result<Vec<ChatMessage>, sqlx::Error> {
let mut rows = sqlx::query_as::<_, ChatMessage>(
"SELECT * FROM engine.chat_messages \
WHERE session_id = $1 \
ORDER BY sent_at DESC \
LIMIT $2 OFFSET $3",
)
.bind(session_id)
.bind(limit)
.bind(offset)
.fetch_all(self.pool)
.await?;
rows.reverse();
Ok(rows)
}
pub async fn history_slim(
&self,
session_id: Uuid,
limit: i64,
offset: i64,
) -> Result<Vec<ChatMessageSlim>, sqlx::Error> {
let mut rows = sqlx::query_as::<_, ChatMessageSlim>(
"SELECT id, role, content, sent_at, client_msg_id, \
(metadata->>'tips_amount_usd')::float8 AS tips_amount_usd \
FROM engine.chat_messages \
WHERE session_id = $1 \
ORDER BY sent_at DESC \
LIMIT $2 OFFSET $3",
)
.bind(session_id)
.bind(limit)
.bind(offset)
.fetch_all(self.pool)
.await?;
rows.reverse();
Ok(rows)
}
pub async fn list_sessions(&self, user_id: Uuid) -> Result<Vec<ChatSession>, sqlx::Error> {
sqlx::query_as::<_, ChatSession>(
"SELECT * FROM engine.chat_sessions \
WHERE user_id = $1 \
ORDER BY last_active_at DESC",
)
.bind(user_id)
.fetch_all(self.pool)
.await
}
pub async fn recent_turn_pairs(
&self,
session_id: Uuid,
cutoff: DateTime<Utc>,
limit: u8,
) -> Result<Vec<(String, String)>, sqlx::Error> {
let fetch_n: i64 = (limit as i64) * 2 + 2;
let rows: Vec<(String, String)> = sqlx::query_as(
"SELECT role, content \
FROM engine.chat_messages \
WHERE session_id = $1 \
AND sent_at < $2 \
AND truncated = FALSE \
AND role IN ('user', 'gift_user', 'assistant') \
ORDER BY sent_at DESC \
LIMIT $3",
)
.bind(session_id)
.bind(cutoff)
.bind(fetch_n)
.fetch_all(self.pool)
.await?;
let mut chrono = rows;
chrono.reverse();
let mut pairs: Vec<(String, String)> = Vec::new();
let mut i = 0;
while i + 1 < chrono.len() {
let (role_a, content_a) = &chrono[i];
let (role_b, content_b) = &chrono[i + 1];
if (role_a == "user" || role_a == "gift_user") && role_b == "assistant" {
pairs.push((content_a.clone(), content_b.clone()));
i += 2;
} else {
i += 1;
}
}
let want = limit as usize;
if pairs.len() > want {
let drop = pairs.len() - want;
pairs.drain(..drop);
}
Ok(pairs)
}
pub async fn recent_turn_pairs_before_message(
&self,
session_id: Uuid,
message_id: Uuid,
limit: u8,
) -> Result<Vec<(String, String)>, sqlx::Error> {
let fetch_n: i64 = (limit as i64) * 2 + 2;
let rows: Vec<(String, String)> = sqlx::query_as(
"SELECT role, content \
FROM engine.chat_messages \
WHERE session_id = $1 \
AND sent_at < (SELECT sent_at FROM engine.chat_messages WHERE id = $2) \
AND truncated = FALSE \
AND role IN ('user', 'gift_user', 'assistant') \
ORDER BY sent_at DESC \
LIMIT $3",
)
.bind(session_id)
.bind(message_id)
.bind(fetch_n)
.fetch_all(self.pool)
.await?;
let mut chrono = rows;
chrono.reverse();
let mut pairs: Vec<(String, String)> = Vec::new();
let mut i = 0;
while i + 1 < chrono.len() {
let (role_a, content_a) = &chrono[i];
let (role_b, content_b) = &chrono[i + 1];
if (role_a == "user" || role_a == "gift_user") && role_b == "assistant" {
pairs.push((content_a.clone(), content_b.clone()));
i += 2;
} else {
i += 1;
}
}
let want = limit as usize;
if pairs.len() > want {
let drop = pairs.len() - want;
pairs.drain(..drop);
}
Ok(pairs)
}
}
#[derive(Debug, Clone)]
pub struct FilterAudit {
pub pre_filter_content: String,
pub filter_model: String,
pub filter_triggers: serde_json::Value,
pub f_client_msg_id: String,
pub f_generation_id: Option<String>,
}
#[derive(Debug, Clone)]
pub struct AssistantInsert {
pub id: Uuid,
pub content: String,
pub assistant_action_type: String, pub continues_from_message_id: Option<Uuid>,
pub truncated: bool,
pub model: Option<String>,
pub usage: Option<serde_json::Value>,
pub generation_id: Option<String>,
pub filter_audit: Option<FilterAudit>,
pub metadata: Option<serde_json::Value>,
}
#[derive(Debug)]
pub enum UpsertUserOutcome {
Inserted { message_id: Uuid },
Replay {
user_message_id: Uuid,
ghost: bool,
assistant_chain: Vec<ChatMessage>,
},
DuplicateInProgress { user_message_id: Uuid },
}
impl<'a> ChatRepo<'a> {
pub async fn upsert_user_message_idempotent(
&self,
session_id: Uuid,
content: &str,
client_msg_id: &str,
role: &str,
metadata: Option<&serde_json::Value>,
) -> Result<UpsertUserOutcome, sqlx::Error> {
let mut tx = self.pool.begin().await?;
let existing: Option<ChatMessage> = sqlx::query_as::<_, ChatMessage>(
"SELECT * FROM engine.chat_messages \
WHERE session_id = $1 AND client_msg_id = $2 \
AND role IN ('user', 'gift_user') \
LIMIT 1",
)
.bind(session_id)
.bind(client_msg_id)
.fetch_optional(&mut *tx)
.await?;
if let Some(row) = existing {
let assistant_chain: Vec<ChatMessage> = sqlx::query_as::<_, ChatMessage>(
"SELECT * FROM engine.chat_messages \
WHERE user_message_id = $1 AND role = 'assistant' \
ORDER BY sent_at ASC",
)
.bind(row.id)
.fetch_all(&mut *tx)
.await?;
tx.commit().await?;
return Ok(if !assistant_chain.is_empty() {
UpsertUserOutcome::Replay {
user_message_id: row.id,
ghost: false,
assistant_chain,
}
} else if row.ghost_decision {
UpsertUserOutcome::Replay {
user_message_id: row.id,
ghost: true,
assistant_chain: vec![],
}
} else {
UpsertUserOutcome::DuplicateInProgress {
user_message_id: row.id,
}
});
}
let id: Uuid = sqlx::query_scalar(
"INSERT INTO engine.chat_messages \
(session_id, role, content, client_msg_id, metadata) \
VALUES ($1, $2, $3, $4, $5) RETURNING id",
)
.bind(session_id)
.bind(role)
.bind(content)
.bind(client_msg_id)
.bind(metadata)
.fetch_one(&mut *tx)
.await?;
sqlx::query("UPDATE engine.chat_sessions SET last_active_at = now() WHERE id = $1")
.bind(session_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(UpsertUserOutcome::Inserted { message_id: id })
}
pub async fn mark_user_message_ghosted(
&self,
user_message_id: Uuid,
) -> Result<(), sqlx::Error> {
sqlx::query(
"UPDATE engine.chat_messages SET ghost_decision = true \
WHERE id = $1 AND role = 'user' AND ghost_decision = false",
)
.bind(user_message_id)
.execute(self.pool)
.await?;
Ok(())
}
pub async fn insert_assistant_batch(
&self,
session_id: Uuid,
user_message_id: Uuid,
rows: &[AssistantInsert],
) -> Result<(), sqlx::Error> {
if rows.is_empty() {
return Ok(());
}
let mut tx = self.pool.begin().await?;
for row in rows {
let (pre_filter, filter_model, filter_triggers, f_client_msg_id, f_generation_id) =
match &row.filter_audit {
Some(a) => (
Some(a.pre_filter_content.as_str()),
Some(a.filter_model.as_str()),
(!a.filter_triggers.is_null()).then_some(&a.filter_triggers),
Some(a.f_client_msg_id.as_str()),
a.f_generation_id.as_deref(),
),
None => (None, None, None, None, None),
};
sqlx::query(
"INSERT INTO engine.chat_messages \
(id, session_id, role, content, user_message_id, \
continues_from_message_id, truncated, model, usage, generation_id, \
assistant_action_type, \
pre_filter_content, filter_model, filter_triggers, \
f_client_msg_id, f_generation_id, metadata) \
VALUES ($1, $2, 'assistant', $3, $4, $5, $6, $7, $8, $9, $10, \
$11, $12, $13, $14, $15, $16)",
)
.bind(row.id)
.bind(session_id)
.bind(&row.content)
.bind(user_message_id)
.bind(row.continues_from_message_id)
.bind(row.truncated)
.bind(&row.model)
.bind(&row.usage)
.bind(&row.generation_id)
.bind(&row.assistant_action_type)
.bind(pre_filter)
.bind(filter_model)
.bind(filter_triggers)
.bind(f_client_msg_id)
.bind(f_generation_id)
.bind(&row.metadata)
.execute(&mut *tx)
.await?;
}
sqlx::query("UPDATE engine.chat_sessions SET last_active_at = now() WHERE id = $1")
.bind(session_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[sqlx::test(migrations = "./migrations")]
async fn create_then_retrieve_session(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let user_id = Uuid::new_v4();
let instance_id = Uuid::new_v4();
let s = repo.create_session(user_id, instance_id).await.unwrap();
let loaded = repo.get_session(s.id).await.unwrap().unwrap();
assert_eq!(loaded.user_id, user_id);
assert_eq!(loaded.instance_id, Some(instance_id));
assert_eq!(loaded.lead_score, 0.0);
assert!(!loaded.is_converted);
}
#[sqlx::test(migrations = "./migrations")]
async fn append_message_and_history_roundtrip(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let user_id = Uuid::new_v4();
let instance_id = Uuid::new_v4();
let s = repo.create_session(user_id, instance_id).await.unwrap();
repo.append_message(s.id, "user", "hello").await.unwrap();
repo.append_message(s.id, "assistant", "hi there")
.await
.unwrap();
repo.append_message(s.id, "user", "how are you?")
.await
.unwrap();
let history = repo.history(s.id, 50, 0).await.unwrap();
assert_eq!(history.len(), 3);
assert_eq!(history[0].role, "user");
assert_eq!(history[0].content, "hello");
assert_eq!(history[1].role, "assistant");
assert_eq!(history[2].content, "how are you?");
}
#[sqlx::test(migrations = "./migrations")]
async fn history_slim_returns_role_content_sent_at_in_order(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let user_id = Uuid::new_v4();
let instance_id = Uuid::new_v4();
let s = repo.create_session(user_id, instance_id).await.unwrap();
repo.append_message(s.id, "user", "alpha").await.unwrap();
repo.append_message(s.id, "assistant", "beta")
.await
.unwrap();
repo.append_message(s.id, "user", "gamma").await.unwrap();
let slim = repo.history_slim(s.id, 50, 0).await.unwrap();
assert_eq!(slim.len(), 3);
assert_eq!(slim[0].role, "user");
assert_eq!(slim[0].content, "alpha");
assert_eq!(slim[1].role, "assistant");
assert_eq!(slim[1].content, "beta");
assert_eq!(slim[2].role, "user");
assert_eq!(slim[2].content, "gamma");
}
#[sqlx::test(migrations = "./migrations")]
async fn history_slim_respects_limit_and_offset(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let user_id = Uuid::new_v4();
let instance_id = Uuid::new_v4();
let s = repo.create_session(user_id, instance_id).await.unwrap();
for n in 0..5 {
repo.append_message(s.id, "user", &format!("m{n}"))
.await
.unwrap();
}
let page = repo.history_slim(s.id, 2, 0).await.unwrap();
assert_eq!(
page.iter().map(|m| m.content.as_str()).collect::<Vec<_>>(),
vec!["m3", "m4"]
);
let page = repo.history_slim(s.id, 2, 2).await.unwrap();
assert_eq!(
page.iter().map(|m| m.content.as_str()).collect::<Vec<_>>(),
vec!["m1", "m2"]
);
}
#[sqlx::test(migrations = "./migrations")]
async fn list_sessions_for_user(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let user_id = Uuid::new_v4();
let other_user = Uuid::new_v4();
let i1 = Uuid::new_v4();
let i2 = Uuid::new_v4();
let i3 = Uuid::new_v4();
repo.create_session(user_id, i1).await.unwrap();
repo.create_session(user_id, i2).await.unwrap();
repo.create_session(other_user, i3).await.unwrap();
let sessions = repo.list_sessions(user_id).await.unwrap();
assert_eq!(sessions.len(), 2);
assert!(sessions.iter().all(|s| s.user_id == user_id));
}
#[sqlx::test(migrations = "./migrations")]
async fn create_or_resume_returns_existing(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let user_id = Uuid::new_v4();
let instance_id = Uuid::new_v4();
let first = repo.create_session(user_id, instance_id).await.unwrap();
let resumed = repo.create_or_resume(user_id, instance_id).await.unwrap();
assert_eq!(first.id, resumed.id);
}
#[sqlx::test(migrations = "./migrations")]
async fn upsert_user_message_idempotent_first_insert(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let user_id = Uuid::new_v4();
let instance_id = Uuid::new_v4();
let s = repo.create_session(user_id, instance_id).await.unwrap();
let outcome = repo
.upsert_user_message_idempotent(
s.id,
"hello",
"01J0000000000000000000000A",
"user",
None,
)
.await
.unwrap();
match outcome {
UpsertUserOutcome::Inserted { message_id } => {
assert_ne!(message_id, Uuid::nil());
}
other => panic!("expected Inserted, got {other:?}"),
}
}
#[sqlx::test(migrations = "./migrations")]
async fn upsert_user_message_idempotent_replay_after_done(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let user_id = Uuid::new_v4();
let instance_id = Uuid::new_v4();
let s = repo.create_session(user_id, instance_id).await.unwrap();
let first = match repo
.upsert_user_message_idempotent(
s.id,
"hello",
"01J0000000000000000000000A",
"user",
None,
)
.await
.unwrap()
{
UpsertUserOutcome::Inserted { message_id } => message_id,
o => panic!("expected Inserted, got {o:?}"),
};
repo.insert_assistant_batch(
s.id,
first,
&[AssistantInsert {
id: Uuid::new_v4(),
content: "hi back".into(),
assistant_action_type: "reply".into(),
continues_from_message_id: None,
truncated: false,
model: Some("x-ai/grok-4-fast".into()),
usage: Some(
serde_json::json!({"prompt_tokens":3,"completion_tokens":2,"total_tokens":5}),
),
generation_id: Some("gen-1".into()),
filter_audit: None,
metadata: None,
}],
)
.await
.unwrap();
let outcome = repo
.upsert_user_message_idempotent(
s.id,
"hello",
"01J0000000000000000000000A",
"user",
None,
)
.await
.unwrap();
match outcome {
UpsertUserOutcome::Replay {
user_message_id,
ghost,
assistant_chain,
} => {
assert_eq!(user_message_id, first);
assert!(!ghost);
assert_eq!(assistant_chain.len(), 1);
assert_eq!(assistant_chain[0].content, "hi back");
}
other => panic!("expected Replay, got {other:?}"),
}
}
#[sqlx::test(migrations = "./migrations")]
async fn upsert_user_message_idempotent_409_when_no_assistant_and_not_ghost(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let user_id = Uuid::new_v4();
let instance_id = Uuid::new_v4();
let s = repo.create_session(user_id, instance_id).await.unwrap();
let first = match repo
.upsert_user_message_idempotent(
s.id,
"hello",
"01J0000000000000000000000A",
"user",
None,
)
.await
.unwrap()
{
UpsertUserOutcome::Inserted { message_id } => message_id,
o => panic!("expected Inserted, got {o:?}"),
};
match repo
.upsert_user_message_idempotent(
s.id,
"hello",
"01J0000000000000000000000A",
"user",
None,
)
.await
.unwrap()
{
UpsertUserOutcome::DuplicateInProgress { user_message_id } => {
assert_eq!(user_message_id, first);
}
other => panic!("expected DuplicateInProgress, got {other:?}"),
}
}
#[sqlx::test(migrations = "./migrations")]
async fn upsert_user_message_idempotent_replay_when_ghost(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let user_id = Uuid::new_v4();
let instance_id = Uuid::new_v4();
let s = repo.create_session(user_id, instance_id).await.unwrap();
let first = match repo
.upsert_user_message_idempotent(
s.id,
"hello",
"01J0000000000000000000000A",
"user",
None,
)
.await
.unwrap()
{
UpsertUserOutcome::Inserted { message_id } => message_id,
o => panic!("expected Inserted, got {o:?}"),
};
repo.mark_user_message_ghosted(first).await.unwrap();
match repo
.upsert_user_message_idempotent(
s.id,
"hello",
"01J0000000000000000000000A",
"user",
None,
)
.await
.unwrap()
{
UpsertUserOutcome::Replay {
ghost,
assistant_chain,
..
} => {
assert!(ghost);
assert!(assistant_chain.is_empty());
}
other => panic!("expected Replay, got {other:?}"),
}
}
#[sqlx::test(migrations = "./migrations")]
async fn resume_latest_session_returns_latest_and_bumps(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let user_id = Uuid::new_v4();
let instance_id = Uuid::new_v4();
assert!(repo
.resume_latest_session(user_id, instance_id)
.await
.unwrap()
.is_none());
let _older = sqlx::query_scalar::<_, Uuid>(
"INSERT INTO engine.chat_sessions (user_id, instance_id, last_active_at) \
VALUES ($1, $2, now() - interval '1 hour') RETURNING id",
)
.bind(user_id)
.bind(instance_id)
.fetch_one(&pool)
.await
.unwrap();
let newer = sqlx::query_scalar::<_, Uuid>(
"INSERT INTO engine.chat_sessions (user_id, instance_id, last_active_at) \
VALUES ($1, $2, now() - interval '1 minute') RETURNING id",
)
.bind(user_id)
.bind(instance_id)
.fetch_one(&pool)
.await
.unwrap();
let before: DateTime<Utc> =
sqlx::query_scalar("SELECT last_active_at FROM engine.chat_sessions WHERE id = $1")
.bind(newer)
.fetch_one(&pool)
.await
.unwrap();
let resumed = repo
.resume_latest_session(user_id, instance_id)
.await
.unwrap()
.expect("resume the most-recent session");
assert_eq!(resumed.id, newer, "must resume the most-recent session");
assert!(
resumed.last_active_at >= before,
"last_active_at must be bumped"
);
}
#[sqlx::test(migrations = "./migrations")]
async fn upsert_user_message_writes_role_user_and_no_metadata(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let s = repo
.create_session(Uuid::new_v4(), Uuid::new_v4())
.await
.unwrap();
let outcome = repo
.upsert_user_message_idempotent(s.id, "hi", "01J0000000000000000000000A", "user", None)
.await
.unwrap();
match outcome {
UpsertUserOutcome::Inserted { .. } => {}
other => panic!("expected Inserted, got {other:?}"),
}
let (role, metadata): (String, Option<serde_json::Value>) =
sqlx::query_as("SELECT role, metadata FROM engine.chat_messages WHERE session_id = $1")
.bind(s.id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(role, "user");
assert!(
metadata.is_none(),
"metadata should be NULL on plain user rows"
);
}
#[sqlx::test(migrations = "./migrations")]
async fn upsert_user_message_writes_gift_user_role_and_tip_metadata(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let s = repo
.create_session(Uuid::new_v4(), Uuid::new_v4())
.await
.unwrap();
let meta = serde_json::json!({ "tips_amount_usd": 20.0, "tier": "gold" });
repo.upsert_user_message_idempotent(
s.id,
"(打赏 $20)",
"01J0000000000000000000000B",
"gift_user",
Some(&meta),
)
.await
.unwrap();
let (role, metadata): (String, serde_json::Value) =
sqlx::query_as("SELECT role, metadata FROM engine.chat_messages WHERE session_id = $1")
.bind(s.id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(role, "gift_user");
assert_eq!(metadata["tips_amount_usd"].as_f64(), Some(20.0));
assert_eq!(metadata["tier"], serde_json::json!("gold"));
}
#[sqlx::test(migrations = "./migrations")]
async fn upsert_user_message_replay_finds_gift_user_row(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let s = repo
.create_session(Uuid::new_v4(), Uuid::new_v4())
.await
.unwrap();
let meta = serde_json::json!({ "tips_amount_usd": 5.0 });
repo.upsert_user_message_idempotent(
s.id,
"(打赏 $5)",
"01J0000000000000000000000C",
"gift_user",
Some(&meta),
)
.await
.unwrap();
let outcome = repo
.upsert_user_message_idempotent(
s.id,
"(打赏 $5)",
"01J0000000000000000000000C",
"gift_user",
Some(&meta),
)
.await
.unwrap();
match outcome {
UpsertUserOutcome::DuplicateInProgress { .. } => {}
other => panic!("expected DuplicateInProgress, got {other:?}"),
}
}
#[sqlx::test(migrations = "./migrations")]
#[allow(clippy::type_complexity)] async fn assistant_batch_round_trips_filter_audit(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let s = repo
.create_session(Uuid::new_v4(), Uuid::new_v4())
.await
.unwrap();
let user_msg_id = match repo
.upsert_user_message_idempotent(s.id, "hi", "01J0000000000000000000001A", "user", None)
.await
.unwrap()
{
UpsertUserOutcome::Inserted { message_id } => message_id,
other => panic!("{other:?}"),
};
let triggers = serde_json::json!({
"random": 0.3,
"models": ["deepseek/deepseek-v4-flash"],
"traits": { "any": ["nsfw_boost"], "when": "absent" }
});
let row = AssistantInsert {
id: Uuid::new_v4(),
content: "filtered reply".into(),
assistant_action_type: "reply".into(),
continues_from_message_id: None,
truncated: false,
model: Some("anthropic/claude-sonnet-4.6".into()),
usage: None,
generation_id: Some("gen_chat_xyz".into()),
filter_audit: Some(FilterAudit {
pre_filter_content: "raw reply".into(),
filter_model: "anthropic/claude-haiku-4.5".into(),
filter_triggers: triggers.clone(),
f_client_msg_id: "f_01J0000000000000000000001Z".into(),
f_generation_id: Some("gen_filter_abc".into()),
}),
metadata: None,
};
repo.insert_assistant_batch(s.id, user_msg_id, &[row])
.await
.unwrap();
let (content, pre_filter, filter_model, filter_triggers, f_client, f_gen): (
String,
Option<String>,
Option<String>,
Option<serde_json::Value>,
Option<String>,
Option<String>,
) = sqlx::query_as(
"SELECT content, pre_filter_content, filter_model, filter_triggers, \
f_client_msg_id, f_generation_id \
FROM engine.chat_messages WHERE role = 'assistant' AND session_id = $1",
)
.bind(s.id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(content, "filtered reply");
assert_eq!(pre_filter.as_deref(), Some("raw reply"));
assert_eq!(filter_model.as_deref(), Some("anthropic/claude-haiku-4.5"));
assert_eq!(filter_triggers, Some(triggers));
assert_eq!(f_client.as_deref(), Some("f_01J0000000000000000000001Z"));
assert_eq!(f_gen.as_deref(), Some("gen_filter_abc"));
}
#[sqlx::test(migrations = "./migrations")]
async fn assistant_batch_filter_audit_columns_default_null(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let s = repo
.create_session(Uuid::new_v4(), Uuid::new_v4())
.await
.unwrap();
let user_msg_id = match repo
.upsert_user_message_idempotent(s.id, "hi", "01J0000000000000000000002A", "user", None)
.await
.unwrap()
{
UpsertUserOutcome::Inserted { message_id } => message_id,
other => panic!("{other:?}"),
};
let row = AssistantInsert {
id: Uuid::new_v4(),
content: "plain reply".into(),
assistant_action_type: "reply".into(),
continues_from_message_id: None,
truncated: false,
model: Some("anthropic/claude-sonnet-4.6".into()),
usage: None,
generation_id: Some("gen_chat_xyz".into()),
filter_audit: None,
metadata: None,
};
repo.insert_assistant_batch(s.id, user_msg_id, &[row])
.await
.unwrap();
let n: i64 = sqlx::query_scalar(
"SELECT count(*) FROM engine.chat_messages \
WHERE role='assistant' AND session_id=$1 \
AND pre_filter_content IS NULL AND filter_model IS NULL \
AND filter_triggers IS NULL AND f_client_msg_id IS NULL \
AND f_generation_id IS NULL",
)
.bind(s.id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(n, 1);
}
#[sqlx::test(migrations = "./migrations")]
async fn migration_0019_adds_chat_messages_columns(pool: PgPool) {
for col in [
"metadata",
"pre_filter_content",
"filter_model",
"filter_triggers",
"f_client_msg_id",
"f_generation_id",
] {
let q = format!("SELECT {col} FROM engine.chat_messages LIMIT 0");
sqlx::query(&q)
.execute(&pool)
.await
.unwrap_or_else(|e| panic!("expected column {col} on engine.chat_messages: {e}"));
}
let idx_count: i64 = sqlx::query_scalar(
"SELECT count(*) FROM pg_indexes \
WHERE schemaname = 'engine' \
AND tablename = 'chat_messages' \
AND indexname IN ('chat_messages_tips_amount_idx',
'chat_messages_f_client_msg_id_uidx')",
)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(idx_count, 2, "both new indexes should exist");
}
#[sqlx::test(migrations = "./migrations")]
async fn assistant_batch_filter_audit_with_none_generation_id_writes_null(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let s = repo
.create_session(Uuid::new_v4(), Uuid::new_v4())
.await
.unwrap();
let user_msg_id = match repo
.upsert_user_message_idempotent(s.id, "hi", "01J0000000000000000000003A", "user", None)
.await
.unwrap()
{
UpsertUserOutcome::Inserted { message_id } => message_id,
other => panic!("{other:?}"),
};
let row = AssistantInsert {
id: Uuid::new_v4(),
content: "filtered no gen".into(),
assistant_action_type: "reply".into(),
continues_from_message_id: None,
truncated: false,
model: Some("anthropic/claude-sonnet-4.6".into()),
usage: None,
generation_id: Some("gen_chat_xyz".into()),
filter_audit: Some(FilterAudit {
pre_filter_content: "raw".into(),
filter_model: "anthropic/claude-haiku-4.5".into(),
filter_triggers: serde_json::json!({}),
f_client_msg_id: "f_01J0000000000000000000003Z".into(),
f_generation_id: None,
}),
metadata: None,
};
repo.insert_assistant_batch(s.id, user_msg_id, &[row])
.await
.unwrap();
let (pre_filter, f_gen): (Option<String>, Option<String>) = sqlx::query_as(
"SELECT pre_filter_content, f_generation_id \
FROM engine.chat_messages \
WHERE role='assistant' AND session_id=$1",
)
.bind(s.id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(pre_filter.as_deref(), Some("raw"));
assert!(
f_gen.is_none(),
"f_generation_id should be NULL when None inside Some(FilterAudit)"
);
}
#[sqlx::test(migrations = "./migrations")]
async fn assistant_batch_filter_triggers_json_null_persists_as_sql_null(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let s = repo
.create_session(Uuid::new_v4(), Uuid::new_v4())
.await
.unwrap();
let user_msg_id = match repo
.upsert_user_message_idempotent(s.id, "hi", "01J0000000000000000000004A", "user", None)
.await
.unwrap()
{
UpsertUserOutcome::Inserted { message_id } => message_id,
other => panic!("{other:?}"),
};
let row = AssistantInsert {
id: Uuid::new_v4(),
content: "filtered empty-trigger".into(),
assistant_action_type: "reply".into(),
continues_from_message_id: None,
truncated: false,
model: Some("anthropic/claude-sonnet-4.6".into()),
usage: None,
generation_id: Some("gen_chat_xyz".into()),
filter_audit: Some(FilterAudit {
pre_filter_content: "raw".into(),
filter_model: "anthropic/claude-haiku-4.5".into(),
filter_triggers: serde_json::Value::Null,
f_client_msg_id: "f_01J0000000000000000000004Z".into(),
f_generation_id: None,
}),
metadata: None,
};
repo.insert_assistant_batch(s.id, user_msg_id, &[row])
.await
.unwrap();
let (triggers_is_null, model_is_set): (bool, bool) = sqlx::query_as(
"SELECT filter_triggers IS NULL, filter_model IS NOT NULL \
FROM engine.chat_messages \
WHERE role='assistant' AND session_id=$1",
)
.bind(s.id)
.fetch_one(&pool)
.await
.unwrap();
assert!(triggers_is_null, "JSON null must persist as SQL NULL");
assert!(
model_is_set,
"filter_model retained as the filter-ran signal"
);
}
#[sqlx::test(migrations = "./migrations")]
async fn assistant_batch_persists_metadata_with_prompt_traits(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let s = repo
.create_session(Uuid::new_v4(), Uuid::new_v4())
.await
.unwrap();
let user_msg_id = match repo
.upsert_user_message_idempotent(s.id, "hi", "01J0000000000000000000004A", "user", None)
.await
.unwrap()
{
UpsertUserOutcome::Inserted { message_id } => message_id,
other => panic!("{other:?}"),
};
let metadata = serde_json::json!({ "prompt_traits": ["nsfw_boost", "tsundere"] });
let row = AssistantInsert {
id: Uuid::new_v4(),
content: "hi".into(),
assistant_action_type: "reply".into(),
continues_from_message_id: None,
truncated: false,
model: Some("anthropic/claude-sonnet-4.6".into()),
usage: None,
generation_id: Some("gen_chat_xyz".into()),
filter_audit: None,
metadata: Some(metadata.clone()),
};
repo.insert_assistant_batch(s.id, user_msg_id, &[row])
.await
.unwrap();
let m: Option<serde_json::Value> = sqlx::query_scalar(
"SELECT metadata FROM engine.chat_messages \
WHERE role='assistant' AND session_id=$1",
)
.bind(s.id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(m, Some(metadata));
}
#[sqlx::test(migrations = "./migrations")]
async fn assistant_batch_metadata_default_null(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let s = repo
.create_session(Uuid::new_v4(), Uuid::new_v4())
.await
.unwrap();
let user_msg_id = match repo
.upsert_user_message_idempotent(s.id, "hi", "01J0000000000000000000005A", "user", None)
.await
.unwrap()
{
UpsertUserOutcome::Inserted { message_id } => message_id,
other => panic!("{other:?}"),
};
let row = AssistantInsert {
id: Uuid::new_v4(),
content: "hi".into(),
assistant_action_type: "reply".into(),
continues_from_message_id: None,
truncated: false,
model: Some("anthropic/claude-sonnet-4.6".into()),
usage: None,
generation_id: Some("gen_chat_xyz".into()),
filter_audit: None,
metadata: None,
};
repo.insert_assistant_batch(s.id, user_msg_id, &[row])
.await
.unwrap();
let m: Option<serde_json::Value> = sqlx::query_scalar(
"SELECT metadata FROM engine.chat_messages \
WHERE role='assistant' AND session_id=$1",
)
.bind(s.id)
.fetch_one(&pool)
.await
.unwrap();
assert!(m.is_none());
}
#[sqlx::test(migrations = "./migrations")]
async fn assistant_batch_persists_metadata_with_tier(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let s = repo
.create_session(Uuid::new_v4(), Uuid::new_v4())
.await
.unwrap();
let user_msg_id = match repo
.upsert_user_message_idempotent(s.id, "hi", "01J0000000000000000000006A", "user", None)
.await
.unwrap()
{
UpsertUserOutcome::Inserted { message_id } => message_id,
other => panic!("{other:?}"),
};
let metadata = serde_json::json!({
"prompt_traits": ["nsfw_boost"],
"tier": "gold"
});
let row = AssistantInsert {
id: Uuid::new_v4(),
content: "hi".into(),
assistant_action_type: "reply".into(),
continues_from_message_id: None,
truncated: false,
model: Some("anthropic/claude-sonnet-4.6".into()),
usage: None,
generation_id: Some("gen_chat_xyz".into()),
filter_audit: None,
metadata: Some(metadata.clone()),
};
repo.insert_assistant_batch(s.id, user_msg_id, &[row])
.await
.unwrap();
let m: Option<serde_json::Value> = sqlx::query_scalar(
"SELECT metadata FROM engine.chat_messages \
WHERE role='assistant' AND session_id=$1",
)
.bind(s.id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(m, Some(metadata));
}
#[sqlx::test(migrations = "./migrations")]
async fn recent_turn_pairs_empty_session_returns_empty(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let s = repo
.create_session(Uuid::new_v4(), Uuid::new_v4())
.await
.unwrap();
let cutoff = Utc::now() + chrono::Duration::seconds(60);
let pairs = repo.recent_turn_pairs(s.id, cutoff, 3).await.unwrap();
assert!(pairs.is_empty());
}
#[sqlx::test(migrations = "./migrations")]
async fn recent_turn_pairs_single_pair_returned(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let s = repo
.create_session(Uuid::new_v4(), Uuid::new_v4())
.await
.unwrap();
let user_msg_id = match repo
.upsert_user_message_idempotent(s.id, "hi", "01J0000000000000000010001A", "user", None)
.await
.unwrap()
{
UpsertUserOutcome::Inserted { message_id } => message_id,
other => panic!("expected Inserted, got {other:?}"),
};
let row = AssistantInsert {
id: Uuid::new_v4(),
content: "hi back".into(),
assistant_action_type: "reply".into(),
continues_from_message_id: None,
truncated: false,
model: Some("test-model".into()),
usage: None,
generation_id: None,
filter_audit: None,
metadata: None,
};
repo.insert_assistant_batch(s.id, user_msg_id, &[row])
.await
.unwrap();
let cutoff = Utc::now() + chrono::Duration::seconds(60);
let pairs = repo.recent_turn_pairs(s.id, cutoff, 3).await.unwrap();
assert_eq!(pairs, vec![("hi".to_string(), "hi back".to_string())]);
}
#[sqlx::test(migrations = "./migrations")]
async fn recent_turn_pairs_skips_truncated_assistant(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let s = repo
.create_session(Uuid::new_v4(), Uuid::new_v4())
.await
.unwrap();
let u1 = match repo
.upsert_user_message_idempotent(s.id, "u1", "01J0000000000000000020001A", "user", None)
.await
.unwrap()
{
UpsertUserOutcome::Inserted { message_id } => message_id,
other => panic!("expected Inserted, got {other:?}"),
};
repo.insert_assistant_batch(
s.id,
u1,
&[AssistantInsert {
id: Uuid::new_v4(),
content: "a1".into(),
assistant_action_type: "reply".into(),
continues_from_message_id: None,
truncated: true,
model: Some("m".into()),
usage: None,
generation_id: None,
filter_audit: None,
metadata: None,
}],
)
.await
.unwrap();
let u2 = match repo
.upsert_user_message_idempotent(s.id, "u2", "01J0000000000000000020003A", "user", None)
.await
.unwrap()
{
UpsertUserOutcome::Inserted { message_id } => message_id,
other => panic!("expected Inserted, got {other:?}"),
};
repo.insert_assistant_batch(
s.id,
u2,
&[AssistantInsert {
id: Uuid::new_v4(),
content: "a2".into(),
assistant_action_type: "reply".into(),
continues_from_message_id: None,
truncated: false,
model: Some("m".into()),
usage: None,
generation_id: None,
filter_audit: None,
metadata: None,
}],
)
.await
.unwrap();
let cutoff = Utc::now() + chrono::Duration::seconds(60);
let pairs = repo.recent_turn_pairs(s.id, cutoff, 3).await.unwrap();
assert_eq!(pairs, vec![("u2".to_string(), "a2".to_string())]);
}
#[sqlx::test(migrations = "./migrations")]
async fn recent_turn_pairs_returns_latest_three_when_more_exist(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let s = repo
.create_session(Uuid::new_v4(), Uuid::new_v4())
.await
.unwrap();
for n in 0..5u8 {
let u_ulid = format!("01J000000000000000003{n}001A");
let user_id = match repo
.upsert_user_message_idempotent(s.id, &format!("u{n}"), &u_ulid, "user", None)
.await
.unwrap()
{
UpsertUserOutcome::Inserted { message_id } => message_id,
other => panic!("expected Inserted, got {other:?}"),
};
repo.insert_assistant_batch(
s.id,
user_id,
&[AssistantInsert {
id: Uuid::new_v4(),
content: format!("a{n}"),
assistant_action_type: "reply".into(),
continues_from_message_id: None,
truncated: false,
model: Some("m".into()),
usage: None,
generation_id: None,
filter_audit: None,
metadata: None,
}],
)
.await
.unwrap();
}
let cutoff = Utc::now() + chrono::Duration::seconds(60);
let pairs = repo.recent_turn_pairs(s.id, cutoff, 3).await.unwrap();
assert_eq!(
pairs,
vec![
("u2".to_string(), "a2".to_string()),
("u3".to_string(), "a3".to_string()),
("u4".to_string(), "a4".to_string()),
]
);
}
#[sqlx::test(migrations = "./migrations")]
async fn recent_turn_pairs_cutoff_excludes_current_turn(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let s = repo
.create_session(Uuid::new_v4(), Uuid::new_v4())
.await
.unwrap();
let u1 = match repo
.upsert_user_message_idempotent(s.id, "u1", "01J0000000000000000040001A", "user", None)
.await
.unwrap()
{
UpsertUserOutcome::Inserted { message_id } => message_id,
other => panic!("expected Inserted, got {other:?}"),
};
repo.insert_assistant_batch(
s.id,
u1,
&[AssistantInsert {
id: Uuid::new_v4(),
content: "a1".into(),
assistant_action_type: "reply".into(),
continues_from_message_id: None,
truncated: false,
model: Some("m".into()),
usage: None,
generation_id: None,
filter_audit: None,
metadata: None,
}],
)
.await
.unwrap();
let cutoff = Utc::now();
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
let _ = repo
.upsert_user_message_idempotent(
s.id,
"current",
"01J0000000000000000040003A",
"user",
None,
)
.await
.unwrap();
let pairs = repo.recent_turn_pairs(s.id, cutoff, 3).await.unwrap();
assert_eq!(pairs, vec![("u1".to_string(), "a1".to_string())]);
}
#[sqlx::test(migrations = "./migrations")]
async fn recent_turn_pairs_drops_orphan_user_at_end(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let s = repo
.create_session(Uuid::new_v4(), Uuid::new_v4())
.await
.unwrap();
let u1 = match repo
.upsert_user_message_idempotent(s.id, "u1", "01J0000000000000000050001A", "user", None)
.await
.unwrap()
{
UpsertUserOutcome::Inserted { message_id } => message_id,
other => panic!("expected Inserted, got {other:?}"),
};
repo.insert_assistant_batch(
s.id,
u1,
&[AssistantInsert {
id: Uuid::new_v4(),
content: "a1".into(),
assistant_action_type: "reply".into(),
continues_from_message_id: None,
truncated: false,
model: Some("m".into()),
usage: None,
generation_id: None,
filter_audit: None,
metadata: None,
}],
)
.await
.unwrap();
let _ = repo
.upsert_user_message_idempotent(s.id, "u2", "01J0000000000000000050003A", "user", None)
.await
.unwrap();
let cutoff = Utc::now() + chrono::Duration::seconds(60);
let pairs = repo.recent_turn_pairs(s.id, cutoff, 3).await.unwrap();
assert_eq!(pairs, vec![("u1".to_string(), "a1".to_string())]);
}
#[sqlx::test(migrations = "./migrations")]
async fn recent_turn_pairs_includes_gift_user_pair(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let s = repo
.create_session(Uuid::new_v4(), Uuid::new_v4())
.await
.unwrap();
let meta = serde_json::json!({"tips_amount_usd": 20.0});
let u1 = match repo
.upsert_user_message_idempotent(
s.id,
"(打赏 $20)",
"01J0000000000000000060001A",
"gift_user",
Some(&meta),
)
.await
.unwrap()
{
UpsertUserOutcome::Inserted { message_id } => message_id,
other => panic!("expected Inserted, got {other:?}"),
};
repo.insert_assistant_batch(
s.id,
u1,
&[AssistantInsert {
id: Uuid::new_v4(),
content: "thanks!".into(),
assistant_action_type: "reply".into(),
continues_from_message_id: None,
truncated: false,
model: Some("m".into()),
usage: None,
generation_id: None,
filter_audit: None,
metadata: None,
}],
)
.await
.unwrap();
let cutoff = Utc::now() + chrono::Duration::seconds(60);
let pairs = repo.recent_turn_pairs(s.id, cutoff, 3).await.unwrap();
assert_eq!(
pairs,
vec![("(打赏 $20)".to_string(), "thanks!".to_string())]
);
}
#[sqlx::test(migrations = "./migrations")]
async fn recent_turn_pairs_before_message_uses_msg_sent_at_not_now(pool: PgPool) {
let repo = ChatRepo { pool: &pool };
let s = repo
.create_session(uuid::Uuid::new_v4(), uuid::Uuid::new_v4())
.await
.unwrap();
let u1 = match repo
.upsert_user_message_idempotent(s.id, "u1", "01J0000000000000000090001A", "user", None)
.await
.unwrap()
{
UpsertUserOutcome::Inserted { message_id } => message_id,
other => panic!("expected Inserted, got {other:?}"),
};
repo.insert_assistant_batch(
s.id,
u1,
&[AssistantInsert {
id: Uuid::new_v4(),
content: "a1".into(),
assistant_action_type: "reply".into(),
truncated: false,
continues_from_message_id: None,
model: Some("m".into()),
usage: None,
generation_id: None,
filter_audit: None,
metadata: None,
}],
)
.await
.unwrap();
let current = match repo
.upsert_user_message_idempotent(
s.id,
"current",
"01J0000000000000000090002A",
"user",
None,
)
.await
.unwrap()
{
UpsertUserOutcome::Inserted { message_id } => message_id,
other => panic!("expected Inserted, got {other:?}"),
};
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
let _ = repo
.upsert_user_message_idempotent(
s.id,
"later",
"01J0000000000000000090003A",
"user",
None,
)
.await
.unwrap();
let pairs = repo
.recent_turn_pairs_before_message(s.id, current, 3)
.await
.unwrap();
assert_eq!(pairs, vec![("u1".to_string(), "a1".to_string())]);
}
}