use objectiveai_sdk::agent::completions::message::{File, ImageUrl, InputAudio, VideoUrl};
use serde::Serialize;
use crate::db::{Error, Pool};
use super::row::{MessageTable, RowValue};
use super::shadow::WriteOp;
/// Executes the write operation `op` for `value`.
///
/// * `WriteOp::Skip` performs no work and succeeds.
/// * `WriteOp::Insert` forwards to `insert_value`, passing `timestamp` along.
/// * `WriteOp::Update` forwards to `update_value`; `timestamp` is not used on
///   this path.
///
/// # Errors
/// Propagates any error returned by the delegated insert or update.
pub async fn write_value<'a>(
    pool: &Pool,
    op: WriteOp,
    value: &RowValue<'a>,
    timestamp: i64,
) -> Result<(), Error> {
    match op {
        WriteOp::Insert => insert_value(pool, value, timestamp).await?,
        WriteOp::Update => update_value(pool, value).await?,
        WriteOp::Skip => {}
    }
    Ok(())
}
async fn insert_value<'a>(
pool: &Pool,
value: &RowValue<'a>,
timestamp: i64,
) -> Result<(), Error> {
if let RowValue::MessageQueueContent {
response_id,
agent_instance_hierarchy,
message_queue_content_id,
} = *value
{
return insert_message_queue_content_with_msg(
pool,
response_id,
agent_instance_hierarchy,
message_queue_content_id,
timestamp,
)
.await;
}
let mt = value.message_table();
let hier = value.agent_instance_hierarchy();
let row_index = value.row_index();
let row_sub_index = value.row_sub_index();
let response_id = value.response_id();
match *value {
RowValue::MessageQueueContent { .. } => unreachable!(
"MessageQueueContent handled by early-return branch above"
),
RowValue::ToolResponse { tool_call_id, .. } => {
sqlx::query(
"WITH data_ins AS (\
INSERT INTO logs.tool_response (response_id, \"index\", tool_call_id) \
VALUES ($1, $2, $3) RETURNING response_id\
)\
INSERT INTO logs.messages \
(response_id, \"table\", row_index, row_sub_index, \
agent_instance_hierarchy, \"timestamp\") \
SELECT $1, $4, $5, $6, $7, $8 FROM data_ins",
)
.bind(response_id)
.bind(row_index)
.bind(tool_call_id)
.bind(mt)
.bind(row_index)
.bind(row_sub_index)
.bind(hier)
.bind(timestamp)
.execute(&**pool)
.await?;
}
RowValue::AssistantResponseRefusal { text, .. } => {
sqlx::query(
"WITH data_ins AS (\
INSERT INTO logs.assistant_response_refusal (response_id, \"index\", text) \
VALUES ($1, $2, $3) RETURNING response_id\
)\
INSERT INTO logs.messages \
(response_id, \"table\", row_index, row_sub_index, \
agent_instance_hierarchy, \"timestamp\") \
SELECT $1, $4, $5, $6, $7, $8 FROM data_ins",
)
.bind(response_id)
.bind(row_index)
.bind(text)
.bind(mt)
.bind(row_index)
.bind(row_sub_index)
.bind(hier)
.bind(timestamp)
.execute(&**pool)
.await?;
}
RowValue::AssistantResponseReasoning { text, .. } => {
sqlx::query(
"WITH data_ins AS (\
INSERT INTO logs.assistant_response_reasoning (response_id, \"index\", text) \
VALUES ($1, $2, $3) RETURNING response_id\
)\
INSERT INTO logs.messages \
(response_id, \"table\", row_index, row_sub_index, \
agent_instance_hierarchy, \"timestamp\") \
SELECT $1, $4, $5, $6, $7, $8 FROM data_ins",
)
.bind(response_id)
.bind(row_index)
.bind(text)
.bind(mt)
.bind(row_index)
.bind(row_sub_index)
.bind(hier)
.bind(timestamp)
.execute(&**pool)
.await?;
}
RowValue::AssistantResponseToolCalls {
tool_call_index, tool_call_id, function_name, arguments, ..
} => {
sqlx::query(
"WITH data_ins AS (\
INSERT INTO logs.assistant_response_tool_calls \
(response_id, \"index\", tool_call_index, tool_call_id, function_name, arguments) \
VALUES ($1, $2, $3, $4, $5, $6) RETURNING response_id\
)\
INSERT INTO logs.messages \
(response_id, \"table\", row_index, row_sub_index, \
agent_instance_hierarchy, \"timestamp\") \
SELECT $1, $7, $8, $9, $10, $11 FROM data_ins",
)
.bind(response_id)
.bind(row_index)
.bind(tool_call_index as i64)
.bind(tool_call_id)
.bind(function_name)
.bind(arguments)
.bind(mt)
.bind(row_index)
.bind(row_sub_index)
.bind(hier)
.bind(timestamp)
.execute(&**pool)
.await?;
}
RowValue::AssistantResponseContentText { text, .. } => {
insert_text_part_with_msg(pool, "logs.assistant_response_content_text", value, text, timestamp).await?;
}
RowValue::ToolResponseContentText { text, .. } => {
insert_text_part_with_msg(pool, "logs.tool_response_content_text", value, text, timestamp).await?;
}
RowValue::AssistantResponseContentImage { image_url, .. } => {
insert_image_part_with_msg(pool, "logs.assistant_response_content_image", value, image_url, timestamp).await?;
}
RowValue::ToolResponseContentImage { image_url, .. } => {
insert_image_part_with_msg(pool, "logs.tool_response_content_image", value, image_url, timestamp).await?;
}
RowValue::AssistantResponseContentAudio { input_audio, .. } => {
insert_audio_part_with_msg(pool, "logs.assistant_response_content_audio", value, input_audio, timestamp).await?;
}
RowValue::ToolResponseContentAudio { input_audio, .. } => {
insert_audio_part_with_msg(pool, "logs.tool_response_content_audio", value, input_audio, timestamp).await?;
}
RowValue::AssistantResponseContentVideo { video_url, .. } => {
insert_video_part_with_msg(pool, "logs.assistant_response_content_video", value, video_url, timestamp).await?;
}
RowValue::ToolResponseContentVideo { video_url, .. } => {
insert_video_part_with_msg(pool, "logs.tool_response_content_video", value, video_url, timestamp).await?;
}
RowValue::AssistantResponseContentFile { file, .. } => {
insert_file_part_with_msg(pool, "logs.assistant_response_content_file", value, file, timestamp).await?;
}
RowValue::ToolResponseContentFile { file, .. } => {
insert_file_part_with_msg(pool, "logs.tool_response_content_file", value, file, timestamp).await?;
}
}
Ok(())
}
async fn update_value<'a>(pool: &Pool, value: &RowValue<'a>) -> Result<(), Error> {
if matches!(value, RowValue::MessageQueueContent { .. }) {
return Ok(());
}
let mt = value.message_table();
let hier = value.agent_instance_hierarchy();
let row_index = value.row_index();
let row_sub_index = value.row_sub_index();
let response_id = value.response_id();
match *value {
RowValue::MessageQueueContent { .. } => unreachable!(
"MessageQueueContent handled by short-circuit above"
),
RowValue::ToolResponse { tool_call_id, .. } => {
run_update_with_downgrade(
pool,
"UPDATE logs.tool_response SET tool_call_id = $A \
WHERE response_id = $RESP AND \"index\" = $RI",
response_id, row_index, row_sub_index, mt, hier,
&[("A", BindVal::Str(tool_call_id))],
&[BindIdx::Resp, BindIdx::Ri],
).await?;
}
RowValue::AssistantResponseRefusal { text, .. } => {
run_update_with_downgrade(
pool,
"UPDATE logs.assistant_response_refusal SET text = $A \
WHERE response_id = $RESP AND \"index\" = $RI",
response_id, row_index, row_sub_index, mt, hier,
&[("A", BindVal::Str(text))],
&[BindIdx::Resp, BindIdx::Ri],
).await?;
}
RowValue::AssistantResponseReasoning { text, .. } => {
run_update_with_downgrade(
pool,
"UPDATE logs.assistant_response_reasoning SET text = $A \
WHERE response_id = $RESP AND \"index\" = $RI",
response_id, row_index, row_sub_index, mt, hier,
&[("A", BindVal::Str(text))],
&[BindIdx::Resp, BindIdx::Ri],
).await?;
}
RowValue::AssistantResponseToolCalls { tool_call_index, tool_call_id, function_name, arguments, .. } => {
run_update_with_downgrade(
pool,
"UPDATE logs.assistant_response_tool_calls SET tool_call_id = $A, function_name = $B, arguments = $C \
WHERE response_id = $RESP AND \"index\" = $RI AND tool_call_index = $RSI",
response_id, row_index, row_sub_index, mt, hier,
&[("A", BindVal::Str(tool_call_id)), ("B", BindVal::Str(function_name)), ("C", BindVal::Str(arguments))],
&[BindIdx::Resp, BindIdx::Ri, BindIdx::Rsi],
).await?;
let _ = tool_call_index;
}
RowValue::AssistantResponseContentText { text, .. }
| RowValue::ToolResponseContentText { text, .. } => {
let table = match *value {
RowValue::AssistantResponseContentText { .. } => "logs.assistant_response_content_text",
_ => "logs.tool_response_content_text",
};
let sql = format!(
"UPDATE {table} SET text = $A \
WHERE response_id = $RESP AND \"index\" = $RI AND part_index = $RSI"
);
run_update_with_downgrade(
pool, &sql,
response_id, row_index, row_sub_index, mt, hier,
&[("A", BindVal::Str(text))],
&[BindIdx::Resp, BindIdx::Ri, BindIdx::Rsi],
).await?;
}
RowValue::AssistantResponseContentImage { image_url, .. }
| RowValue::ToolResponseContentImage { image_url, .. } => {
let table = match *value {
RowValue::AssistantResponseContentImage { .. } => "logs.assistant_response_content_image",
_ => "logs.tool_response_content_image",
};
let detail = image_url.detail.as_ref().and_then(|d| serde_json::to_string(d).ok());
let sql = format!(
"UPDATE {table} SET url = $A, detail = $B \
WHERE response_id = $RESP AND \"index\" = $RI AND part_index = $RSI"
);
run_update_with_downgrade(
pool, &sql,
response_id, row_index, row_sub_index, mt, hier,
&[("A", BindVal::Str(image_url.url.as_str())), ("B", BindVal::OptString(detail))],
&[BindIdx::Resp, BindIdx::Ri, BindIdx::Rsi],
).await?;
}
RowValue::AssistantResponseContentAudio { input_audio, .. }
| RowValue::ToolResponseContentAudio { input_audio, .. } => {
let table = match *value {
RowValue::AssistantResponseContentAudio { .. } => "logs.assistant_response_content_audio",
_ => "logs.tool_response_content_audio",
};
let sql = format!(
"UPDATE {table} SET data = $A, format = $B \
WHERE response_id = $RESP AND \"index\" = $RI AND part_index = $RSI"
);
run_update_with_downgrade(
pool, &sql,
response_id, row_index, row_sub_index, mt, hier,
&[
("A", BindVal::Str(input_audio.data.as_str())),
("B", BindVal::Str(input_audio.format.as_str())),
],
&[BindIdx::Resp, BindIdx::Ri, BindIdx::Rsi],
).await?;
}
RowValue::AssistantResponseContentVideo { video_url, .. }
| RowValue::ToolResponseContentVideo { video_url, .. } => {
let table = match *value {
RowValue::AssistantResponseContentVideo { .. } => "logs.assistant_response_content_video",
_ => "logs.tool_response_content_video",
};
let sql = format!(
"UPDATE {table} SET url = $A \
WHERE response_id = $RESP AND \"index\" = $RI AND part_index = $RSI"
);
run_update_with_downgrade(
pool, &sql,
response_id, row_index, row_sub_index, mt, hier,
&[("A", BindVal::Str(video_url.url.as_str()))],
&[BindIdx::Resp, BindIdx::Ri, BindIdx::Rsi],
).await?;
}
RowValue::AssistantResponseContentFile { file, .. }
| RowValue::ToolResponseContentFile { file, .. } => {
let table = match *value {
RowValue::AssistantResponseContentFile { .. } => "logs.assistant_response_content_file",
_ => "logs.tool_response_content_file",
};
let sql = format!(
"UPDATE {table} SET file_data = $A, file_id = $B, filename = $C, file_url = $D \
WHERE response_id = $RESP AND \"index\" = $RI AND part_index = $RSI"
);
run_update_with_downgrade(
pool, &sql,
response_id, row_index, row_sub_index, mt, hier,
&[
("A", BindVal::OptStr(file.file_data.as_deref())),
("B", BindVal::OptStr(file.file_id.as_deref())),
("C", BindVal::OptStr(file.filename.as_deref())),
("D", BindVal::OptStr(file.file_url.as_deref())),
],
&[BindIdx::Resp, BindIdx::Ri, BindIdx::Rsi],
).await?;
}
}
Ok(())
}
/// Inserts one text content part into `table` and, in the same statement, the
/// `logs.messages` row that references it.
///
/// The part is keyed by the response id, row index and row sub-index read from
/// `value`; the sub-index is stored in the `part_index` column. `table` is
/// spliced into the SQL text and must therefore be a trusted identifier.
///
/// # Errors
/// Propagates any error produced while executing the statement.
async fn insert_text_part_with_msg<'a>(
    pool: &Pool,
    table: &str,
    value: &RowValue<'a>,
    text: &str,
    timestamp: i64,
) -> Result<(), Error> {
    let response_id = value.response_id();
    let index = value.row_index();
    let part_index = value.row_sub_index();
    let statement = format!(
        "WITH data_ins AS (\
         INSERT INTO {table} (response_id, \"index\", part_index, text) \
         VALUES ($1, $2, $3, $4) RETURNING response_id\
         )\
         INSERT INTO logs.messages \
         (response_id, \"table\", row_index, row_sub_index, \
         agent_instance_hierarchy, \"timestamp\") \
         SELECT $1, $5, $6, $7, $8, $9 FROM data_ins"
    );
    // $1..$4 populate the data row, $5..$9 populate the logs.messages row.
    let query = sqlx::query(&statement)
        .bind(response_id)
        .bind(index)
        .bind(part_index)
        .bind(text)
        .bind(value.message_table())
        .bind(index)
        .bind(part_index)
        .bind(value.agent_instance_hierarchy())
        .bind(timestamp);
    query.execute(&**pool).await?;
    Ok(())
}
/// Inserts one image content part into `table` and, in the same statement, the
/// `logs.messages` row that references it.
///
/// The image's `detail`, when present, is stored as its JSON serialization;
/// an absent detail or a serialization failure is stored as NULL. `table` is
/// spliced into the SQL text and must therefore be a trusted identifier.
///
/// # Errors
/// Propagates any error produced while executing the statement.
async fn insert_image_part_with_msg<'a>(
    pool: &Pool,
    table: &str,
    value: &RowValue<'a>,
    image: &ImageUrl,
    timestamp: i64,
) -> Result<(), Error> {
    let detail_json = match image.detail.as_ref() {
        Some(d) => serde_json::to_string(d).ok(),
        None => None,
    };
    let statement = format!(
        "WITH data_ins AS (\
         INSERT INTO {table} (response_id, \"index\", part_index, url, detail) \
         VALUES ($1, $2, $3, $4, $5) RETURNING response_id\
         )\
         INSERT INTO logs.messages \
         (response_id, \"table\", row_index, row_sub_index, \
         agent_instance_hierarchy, \"timestamp\") \
         SELECT $1, $6, $7, $8, $9, $10 FROM data_ins"
    );
    let response_id = value.response_id();
    let index = value.row_index();
    let part_index = value.row_sub_index();
    // $1..$5 populate the data row, $6..$10 populate the logs.messages row.
    let query = sqlx::query(&statement)
        .bind(response_id)
        .bind(index)
        .bind(part_index)
        .bind(image.url.as_str())
        .bind(detail_json)
        .bind(value.message_table())
        .bind(index)
        .bind(part_index)
        .bind(value.agent_instance_hierarchy())
        .bind(timestamp);
    query.execute(&**pool).await?;
    Ok(())
}
/// Inserts one audio content part (`data` and `format`) into `table` and, in
/// the same statement, the `logs.messages` row that references it.
///
/// `table` is spliced into the SQL text and must therefore be a trusted
/// identifier.
///
/// # Errors
/// Propagates any error produced while executing the statement.
async fn insert_audio_part_with_msg<'a>(
    pool: &Pool,
    table: &str,
    value: &RowValue<'a>,
    audio: &InputAudio,
    timestamp: i64,
) -> Result<(), Error> {
    let response_id = value.response_id();
    let index = value.row_index();
    let part_index = value.row_sub_index();
    let statement = format!(
        "WITH data_ins AS (\
         INSERT INTO {table} (response_id, \"index\", part_index, data, format) \
         VALUES ($1, $2, $3, $4, $5) RETURNING response_id\
         )\
         INSERT INTO logs.messages \
         (response_id, \"table\", row_index, row_sub_index, \
         agent_instance_hierarchy, \"timestamp\") \
         SELECT $1, $6, $7, $8, $9, $10 FROM data_ins"
    );
    // $1..$5 populate the data row, $6..$10 populate the logs.messages row.
    let query = sqlx::query(&statement)
        .bind(response_id)
        .bind(index)
        .bind(part_index)
        .bind(audio.data.as_str())
        .bind(audio.format.as_str())
        .bind(value.message_table())
        .bind(index)
        .bind(part_index)
        .bind(value.agent_instance_hierarchy())
        .bind(timestamp);
    query.execute(&**pool).await?;
    Ok(())
}
/// Records a message-queue content item in `logs.messages`.
///
/// The single statement does three things:
/// * `content` looks up the `message_queue_contents` row whose id is
///   `message_queue_content_id`;
/// * `flip` sets `active = FALSE` on the `message_queue` row that content
///   belongs to, if it is currently active;
/// * the final insert adds a `logs.messages` row whose `"table"` value is
///   derived from the content's `kind` (`text`, `image`, `audio`, `video` or
///   `file`), whose `row_index` is the content id and whose `row_sub_index`
///   is NULL.
///
/// Because the insert selects `FROM content`, no `logs.messages` row is
/// produced when the content id does not exist.
///
/// # Errors
/// Propagates any error produced while executing the statement.
async fn insert_message_queue_content_with_msg(
    pool: &Pool,
    response_id: &str,
    agent_instance_hierarchy: &str,
    message_queue_content_id: i64,
    timestamp: i64,
) -> Result<(), Error> {
    // $1 = content id, $2 = response id, $3 = agent hierarchy, $4 = timestamp.
    const STATEMENT: &str = "WITH content AS (\
        SELECT id, kind, message_queue_id \
        FROM message_queue_contents \
        WHERE id = $1 \
        ), \
        flip AS (\
        UPDATE message_queue \
        SET active = FALSE \
        WHERE id = (SELECT message_queue_id FROM content) \
        AND active = TRUE \
        RETURNING id \
        ) \
        INSERT INTO logs.messages \
        (response_id, \"table\", row_index, row_sub_index, \
        agent_instance_hierarchy, \"timestamp\") \
        SELECT $2, \
        CASE (SELECT kind FROM content) \
        WHEN 'text' THEN 'message_queue_text'::logs.message_table \
        WHEN 'image' THEN 'message_queue_image'::logs.message_table \
        WHEN 'audio' THEN 'message_queue_audio'::logs.message_table \
        WHEN 'video' THEN 'message_queue_video'::logs.message_table \
        WHEN 'file' THEN 'message_queue_file'::logs.message_table \
        END, \
        $1, NULL, $3, $4 \
        FROM content";
    let query = sqlx::query(STATEMENT)
        .bind(message_queue_content_id)
        .bind(response_id)
        .bind(agent_instance_hierarchy)
        .bind(timestamp);
    query.execute(&**pool).await?;
    Ok(())
}
/// Inserts one video content part (its URL) into `table` and, in the same
/// statement, the `logs.messages` row that references it.
///
/// `table` is spliced into the SQL text and must therefore be a trusted
/// identifier.
///
/// # Errors
/// Propagates any error produced while executing the statement.
async fn insert_video_part_with_msg<'a>(
    pool: &Pool,
    table: &str,
    value: &RowValue<'a>,
    video: &VideoUrl,
    timestamp: i64,
) -> Result<(), Error> {
    let response_id = value.response_id();
    let index = value.row_index();
    let part_index = value.row_sub_index();
    let statement = format!(
        "WITH data_ins AS (\
         INSERT INTO {table} (response_id, \"index\", part_index, url) \
         VALUES ($1, $2, $3, $4) RETURNING response_id\
         )\
         INSERT INTO logs.messages \
         (response_id, \"table\", row_index, row_sub_index, \
         agent_instance_hierarchy, \"timestamp\") \
         SELECT $1, $5, $6, $7, $8, $9 FROM data_ins"
    );
    // $1..$4 populate the data row, $5..$9 populate the logs.messages row.
    let query = sqlx::query(&statement)
        .bind(response_id)
        .bind(index)
        .bind(part_index)
        .bind(video.url.as_str())
        .bind(value.message_table())
        .bind(index)
        .bind(part_index)
        .bind(value.agent_instance_hierarchy())
        .bind(timestamp);
    query.execute(&**pool).await?;
    Ok(())
}
/// Inserts one file content part into `table` and, in the same statement, the
/// `logs.messages` row that references it.
///
/// The four optional file fields (`file_data`, `file_id`, `filename`,
/// `file_url`) are each stored as NULL when absent. `table` is spliced into
/// the SQL text and must therefore be a trusted identifier.
///
/// # Errors
/// Propagates any error produced while executing the statement.
async fn insert_file_part_with_msg<'a>(
    pool: &Pool,
    table: &str,
    value: &RowValue<'a>,
    file: &File,
    timestamp: i64,
) -> Result<(), Error> {
    let response_id = value.response_id();
    let index = value.row_index();
    let part_index = value.row_sub_index();
    let statement = format!(
        "WITH data_ins AS (\
         INSERT INTO {table} (response_id, \"index\", part_index, file_data, file_id, filename, file_url) \
         VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING response_id\
         )\
         INSERT INTO logs.messages \
         (response_id, \"table\", row_index, row_sub_index, \
         agent_instance_hierarchy, \"timestamp\") \
         SELECT $1, $8, $9, $10, $11, $12 FROM data_ins"
    );
    // $1..$7 populate the data row, $8..$12 populate the logs.messages row.
    let query = sqlx::query(&statement)
        .bind(response_id)
        .bind(index)
        .bind(part_index)
        .bind(file.file_data.as_deref())
        .bind(file.file_id.as_deref())
        .bind(file.filename.as_deref())
        .bind(file.file_url.as_deref())
        .bind(value.message_table())
        .bind(index)
        .bind(part_index)
        .bind(value.agent_instance_hierarchy())
        .bind(timestamp);
    query.execute(&**pool).await?;
    Ok(())
}
/// Key placeholder that an `UPDATE` template passed to
/// `run_update_with_downgrade` may contain.
///
/// The order in which these appear in the `update_where_binds` slice decides
/// the positional parameter number each placeholder is rewritten to.
#[derive(Clone, Copy)]
enum BindIdx {
    /// The `$RESP` placeholder; bound to the response id.
    Resp,
    /// The `$RI` placeholder; bound to the row index.
    Ri,
    /// The `$RSI` placeholder; bound to the (optional) row sub-index.
    Rsi,
}
/// Value bound to one of the lettered placeholders (for example `$A`) of an
/// `UPDATE` template passed to `run_update_with_downgrade`.
enum BindVal<'a> {
    /// A borrowed string.
    Str(&'a str),
    /// A borrowed string that may be absent.
    OptStr(Option<&'a str>),
    /// An owned string that may be absent.
    OptString(Option<String>),
    /// A boolean.
    Bool(bool),
}
/// Runs an `UPDATE` on a data table and, in the same statement, rewinds the
/// read position of message-queue rows that had already read the updated
/// message.
///
/// `update_sql_template` is an `UPDATE` statement written with named
/// placeholders, which are rewritten to positional parameters as follows:
/// 1. each entry of `update_where_binds`, in slice order, takes the next
///    number (`$RESP`, `$RI` or `$RSI`);
/// 2. each `(token, value)` of `extra_binds`, in slice order, takes the next
///    number (`$<token>`);
/// 3. four trailing parameters carry `message_table`, `row_index`,
///    `row_sub_index` and `agent_instance_hierarchy` for the wrapper query.
///
/// The wrapper finds the `logs.messages` row matching the response id, table,
/// row index and row sub-index. For `logs.messages_queue` rows whose
/// `spawned_agent_instance_hierarchy` equals `agent_instance_hierarchy` and
/// whose `read_index` is at or past that message's `"index"`, it sets
/// `read_index` to that index minus one.
///
/// # Panics
/// Panics if `update_where_binds` does not contain `BindIdx::Resp`.
///
/// # Errors
/// Propagates any error produced while executing the statement.
// The argument list is one SQL template plus every value it binds; bundling
// them into a struct would only exist to satisfy the lint.
#[allow(clippy::too_many_arguments)]
async fn run_update_with_downgrade<'a>(
    pool: &Pool,
    update_sql_template: &str,
    response_id: &str,
    row_index: i64,
    row_sub_index: Option<i64>,
    message_table: MessageTable,
    agent_instance_hierarchy: &str,
    extra_binds: &[(&str, BindVal<'a>)],
    update_where_binds: &[BindIdx],
) -> Result<(), Error> {
    let mut sql = update_sql_template.to_string();
    let mut pos = 1usize;
    let mut resp_pos: Option<usize> = None;

    // Key placeholders are numbered first, in the order the caller listed them.
    for slot in update_where_binds {
        let idx = pos;
        pos += 1;
        let token = match slot {
            BindIdx::Resp => {
                resp_pos = Some(idx);
                "$RESP"
            }
            BindIdx::Ri => "$RI",
            BindIdx::Rsi => "$RSI",
        };
        sql = sql.replace(token, &format!("${idx}"));
    }
    let resp_pos = resp_pos.expect("Resp bind required");

    // Lettered placeholders follow, again in caller order.
    for (token, _val) in extra_binds {
        let idx = pos;
        pos += 1;
        sql = sql.replace(&format!("${token}"), &format!("${idx}"));
    }

    // Four trailing parameters are used only by the wrapper query.
    let mt_pos = pos;
    let ri_for_msg_pos = pos + 1;
    let rsi_for_msg_pos = pos + 2;
    let hier_pos = pos + 3;

    let final_sql = format!(
        "WITH \
         data_upd AS ({sql} RETURNING response_id),\
         msg AS (\
         SELECT \"index\" AS msg_index FROM logs.messages \
         WHERE response_id = ${resp_pos} \
         AND \"table\" = ${mt_pos} \
         AND row_index IS NOT DISTINCT FROM ${ri_for_msg_pos} \
         AND row_sub_index IS NOT DISTINCT FROM ${rsi_for_msg_pos}\
         )\
         UPDATE logs.messages_queue \
         SET read_index = msg.msg_index - 1 \
         FROM msg, data_upd \
         WHERE spawned_agent_instance_hierarchy = ${hier_pos} \
         AND read_index >= msg.msg_index",
    );

    // Values must be bound in exactly the order the numbers were handed out.
    let mut q = sqlx::query(&final_sql);
    for slot in update_where_binds {
        q = match slot {
            BindIdx::Resp => q.bind(response_id),
            BindIdx::Ri => q.bind(row_index),
            BindIdx::Rsi => q.bind(row_sub_index),
        };
    }
    for (_, val) in extra_binds {
        q = match val {
            BindVal::Str(s) => q.bind(*s),
            BindVal::OptStr(s) => q.bind(*s),
            // Borrow the owned string rather than cloning it for the bind.
            BindVal::OptString(s) => q.bind(s.as_deref()),
            BindVal::Bool(b) => q.bind(*b),
        };
    }
    q = q.bind(message_table);
    q = q.bind(row_index);
    q = q.bind(row_sub_index);
    q = q.bind(agent_instance_hierarchy);
    q.execute(&**pool).await?;
    Ok(())
}
/// Selects which set of request/response log tables (and which request
/// `MessageTable` value) a blob operation targets.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Tier {
    /// Uses the `logs.agent_completion_*` tables.
    Agent,
    /// Uses the `logs.vector_completion_*` tables.
    Vector,
    /// Uses the `logs.function_execution_*` tables.
    Function,
}
impl Tier {
    /// Returns the schema-qualified name of the table storing request bodies
    /// for this tier.
    pub fn request_table(self) -> &'static str {
        match self {
            Self::Agent => "logs.agent_completion_requests",
            Self::Vector => "logs.vector_completion_requests",
            Self::Function => "logs.function_execution_requests",
        }
    }

    /// Returns the schema-qualified name of the table storing response bodies
    /// for this tier.
    pub fn response_table(self) -> &'static str {
        match self {
            Self::Agent => "logs.agent_completion_responses",
            Self::Vector => "logs.vector_completion_responses",
            Self::Function => "logs.function_execution_responses",
        }
    }

    /// Returns the `MessageTable` value that marks a request row of this tier.
    pub fn request_message_table(self) -> MessageTable {
        match self {
            Self::Agent => MessageTable::AgentCompletionRequest,
            Self::Vector => MessageTable::VectorCompletionRequest,
            Self::Function => MessageTable::FunctionExecutionRequest,
        }
    }
}
/// Stores the JSON serialization of `params` as a new row in the request
/// table of `tier`.
///
/// The row carries `response_id`, the JSON body, `timestamp` (written to
/// `created_at`) and `sender_agent_instance_hierarchy`.
///
/// # Errors
/// Returns an error if `params` cannot be serialized or the insert fails.
pub async fn insert_request_blob<P: Serialize>(
    pool: &Pool,
    tier: Tier,
    response_id: &str,
    params: &P,
    sender_agent_instance_hierarchy: &str,
    timestamp: i64,
) -> Result<(), Error> {
    // Serialize before building any SQL so a bad payload fails early.
    let payload = serde_json::to_value(params)?;
    let table = tier.request_table();
    let statement = format!(
        "INSERT INTO {table} \
         (response_id, body, created_at, sender_agent_instance_hierarchy) \
         VALUES ($1, $2, $3, $4)"
    );
    let query = sqlx::query(&statement)
        .bind(response_id)
        .bind(sqlx::types::Json(payload))
        .bind(timestamp)
        .bind(sender_agent_instance_hierarchy);
    query.execute(&**pool).await?;
    Ok(())
}
/// Adds the `logs.messages` row that marks a request of `tier`.
///
/// The row's `"table"` is `tier.request_message_table()`; both `row_index`
/// and `row_sub_index` are stored as NULL.
///
/// # Errors
/// Propagates any error produced while executing the insert.
pub async fn insert_request_messages_row(
    pool: &Pool,
    tier: Tier,
    response_id: &str,
    agent_instance_hierarchy: &str,
    timestamp: i64,
) -> Result<(), Error> {
    const STATEMENT: &str = "INSERT INTO logs.messages \
        (response_id, \"table\", row_index, row_sub_index, \
        agent_instance_hierarchy, \"timestamp\") \
        VALUES ($1, $2, NULL, NULL, $3, $4)";
    let query = sqlx::query(STATEMENT)
        .bind(response_id)
        .bind(tier.request_message_table())
        .bind(agent_instance_hierarchy)
        .bind(timestamp);
    query.execute(&**pool).await?;
    Ok(())
}
/// Stores the JSON serialization of `chunk` as a new row in the response
/// table of `tier`, keyed by `response_id` and stamped with `created_at`.
///
/// # Errors
/// Returns an error if `chunk` cannot be serialized or the insert fails.
pub async fn insert_response_blob<C: Serialize>(
    pool: &Pool,
    tier: Tier,
    response_id: &str,
    chunk: &C,
    created_at: i64,
) -> Result<(), Error> {
    // Serialize before building any SQL so a bad payload fails early.
    let payload = serde_json::to_value(chunk)?;
    let table = tier.response_table();
    let statement =
        format!("INSERT INTO {table} (response_id, body, created_at) VALUES ($1, $2, $3)");
    let query = sqlx::query(&statement)
        .bind(response_id)
        .bind(sqlx::types::Json(payload))
        .bind(created_at);
    query.execute(&**pool).await?;
    Ok(())
}
/// Overwrites `body` and `created_at` of the response row of `tier` whose
/// `response_id` matches, using the JSON serialization of `chunk`.
///
/// The number of affected rows is not inspected, so a `response_id` with no
/// stored row is not reported as an error by this function.
///
/// # Errors
/// Returns an error if `chunk` cannot be serialized or the update fails.
pub async fn update_response_blob<C: Serialize>(
    pool: &Pool,
    tier: Tier,
    response_id: &str,
    chunk: &C,
    created_at: i64,
) -> Result<(), Error> {
    // Serialize before building any SQL so a bad payload fails early.
    let payload = serde_json::to_value(chunk)?;
    let table = tier.response_table();
    let statement =
        format!("UPDATE {table} SET body = $2, created_at = $3 WHERE response_id = $1");
    let query = sqlx::query(&statement)
        .bind(response_id)
        .bind(sqlx::types::Json(payload))
        .bind(created_at);
    query.execute(&**pool).await?;
    Ok(())
}