use objectiveai_sdk::cli::command::agents::logs::read::all::{
AssistantResponsePart, AssistantResponsePartType, ClientNotificationPart,
ClientNotificationPartType, ResponseItem, ToolResponsePart, ToolResponsePartType,
};
use sqlx::Row as _;
use super::super::{Error, Pool};
use super::row::MessageTable;
struct MsgRow {
id: i64,
response_id: String,
table_kind: MessageTable,
agent_instance_hierarchy: String,
timestamp_delivered: i64,
sender_agent_instance_hierarchy: Option<String>,
message_queue_id: Option<i64>,
timestamp_queued: Option<i64>,
message_queue_key: Option<String>,
function_name: String,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BlockClass {
AgentCompletionRequest,
VectorCompletionRequest,
FunctionExecutionRequest,
ClientNotification,
AssistantResponse,
ToolResponse,
}
fn block_class(t: MessageTable) -> BlockClass {
match t {
MessageTable::AgentCompletionRequest => BlockClass::AgentCompletionRequest,
MessageTable::VectorCompletionRequest => BlockClass::VectorCompletionRequest,
MessageTable::FunctionExecutionRequest => BlockClass::FunctionExecutionRequest,
MessageTable::MessageQueueText
| MessageTable::MessageQueueImage
| MessageTable::MessageQueueAudio
| MessageTable::MessageQueueVideo
| MessageTable::MessageQueueFile => BlockClass::ClientNotification,
MessageTable::ToolResponse
| MessageTable::ToolResponseContentText
| MessageTable::ToolResponseContentImage
| MessageTable::ToolResponseContentAudio
| MessageTable::ToolResponseContentVideo
| MessageTable::ToolResponseContentFile => BlockClass::ToolResponse,
MessageTable::AssistantResponseRefusal
| MessageTable::AssistantResponseReasoning
| MessageTable::AssistantResponseToolCalls
| MessageTable::AssistantResponseContentText
| MessageTable::AssistantResponseContentImage
| MessageTable::AssistantResponseContentAudio
| MessageTable::AssistantResponseContentVideo
| MessageTable::AssistantResponseContentFile => BlockClass::AssistantResponse,
}
}
fn client_notification_kind(t: MessageTable) -> Option<ClientNotificationPartType> {
match t {
MessageTable::MessageQueueText => Some(ClientNotificationPartType::Text),
MessageTable::MessageQueueImage => Some(ClientNotificationPartType::Image),
MessageTable::MessageQueueAudio => Some(ClientNotificationPartType::Audio),
MessageTable::MessageQueueVideo => Some(ClientNotificationPartType::Video),
MessageTable::MessageQueueFile => Some(ClientNotificationPartType::File),
_ => None,
}
}
fn assistant_response_kind(t: MessageTable) -> Option<AssistantResponsePartType> {
match t {
MessageTable::AssistantResponseRefusal => Some(AssistantResponsePartType::Refusal),
MessageTable::AssistantResponseReasoning => Some(AssistantResponsePartType::Reasoning),
MessageTable::AssistantResponseToolCalls => Some(AssistantResponsePartType::ToolCall),
MessageTable::AssistantResponseContentText => Some(AssistantResponsePartType::Text),
MessageTable::AssistantResponseContentImage => Some(AssistantResponsePartType::Image),
MessageTable::AssistantResponseContentAudio => Some(AssistantResponsePartType::Audio),
MessageTable::AssistantResponseContentVideo => Some(AssistantResponsePartType::Video),
MessageTable::AssistantResponseContentFile => Some(AssistantResponsePartType::File),
_ => None,
}
}
fn tool_response_kind(t: MessageTable) -> Option<ToolResponsePartType> {
match t {
MessageTable::ToolResponse => Some(ToolResponsePartType::Container),
MessageTable::ToolResponseContentText => Some(ToolResponsePartType::Text),
MessageTable::ToolResponseContentImage => Some(ToolResponsePartType::Image),
MessageTable::ToolResponseContentAudio => Some(ToolResponsePartType::Audio),
MessageTable::ToolResponseContentVideo => Some(ToolResponsePartType::Video),
MessageTable::ToolResponseContentFile => Some(ToolResponsePartType::File),
_ => None,
}
}
const SELECT_SHAPE: &str = "SELECT \
m.\"index\" AS id, \
m.response_id, \
m.\"table\" AS table_kind, \
m.agent_instance_hierarchy, \
m.\"timestamp\" AS timestamp_delivered, \
CASE m.\"table\" \
WHEN 'message_queue_text' THEN mq.sender_agent_instance_hierarchy \
WHEN 'message_queue_image' THEN mq.sender_agent_instance_hierarchy \
WHEN 'message_queue_audio' THEN mq.sender_agent_instance_hierarchy \
WHEN 'message_queue_video' THEN mq.sender_agent_instance_hierarchy \
WHEN 'message_queue_file' THEN mq.sender_agent_instance_hierarchy \
WHEN 'agent_completion_request' THEN acr.sender_agent_instance_hierarchy \
WHEN 'vector_completion_request' THEN vcr.sender_agent_instance_hierarchy \
WHEN 'function_execution_request' THEN fer.sender_agent_instance_hierarchy \
ELSE NULL \
END AS sender_agent_instance_hierarchy, \
mq.id AS message_queue_id, \
mq.enqueued_at AS timestamp_queued, \
mq.key AS message_queue_key, \
COALESCE(atc.function_name, '') AS function_name";
const FROM_JOINS: &str = "FROM logs.messages m \
LEFT JOIN message_queue_contents mqc \
ON m.row_index = mqc.id \
AND m.\"table\" IN ( \
'message_queue_text', 'message_queue_image', 'message_queue_audio', \
'message_queue_video', 'message_queue_file' \
) \
LEFT JOIN message_queue mq ON mqc.message_queue_id = mq.id \
LEFT JOIN logs.agent_completion_requests acr \
ON m.response_id = acr.response_id \
AND m.\"table\" = 'agent_completion_request' \
LEFT JOIN logs.vector_completion_requests vcr \
ON m.response_id = vcr.response_id \
AND m.\"table\" = 'vector_completion_request' \
LEFT JOIN logs.function_execution_requests fer \
ON m.response_id = fer.response_id \
AND m.\"table\" = 'function_execution_request' \
LEFT JOIN logs.assistant_response_tool_calls atc \
ON m.response_id = atc.response_id \
AND m.row_index = atc.\"index\" \
AND m.row_sub_index = atc.tool_call_index \
AND m.\"table\" = 'assistant_response_tool_calls'";
fn row_into_msg(r: &sqlx::postgres::PgRow) -> Result<MsgRow, Error> {
Ok(MsgRow {
id: r.try_get("id")?,
response_id: r.try_get("response_id")?,
table_kind: r.try_get("table_kind")?,
agent_instance_hierarchy: r.try_get("agent_instance_hierarchy")?,
timestamp_delivered: r.try_get("timestamp_delivered")?,
sender_agent_instance_hierarchy: r.try_get("sender_agent_instance_hierarchy")?,
message_queue_id: r.try_get("message_queue_id")?,
timestamp_queued: r.try_get("timestamp_queued")?,
message_queue_key: r.try_get("message_queue_key")?,
function_name: r.try_get("function_name")?,
})
}
fn coalesce_into_blocks(rows: Vec<MsgRow>) -> Vec<ResponseItem> {
let mut out: Vec<ResponseItem> = Vec::new();
let mut cur_class: Option<BlockClass> = None;
let mut cur_aih: String = String::new();
let mut cur_rid: String = String::new();
let mut cur_sender: Option<String> = None;
let mut cur_mq_id: Option<i64> = None;
let mut cur_timestamp_queued: Option<i64> = None;
let mut cur_key: Option<String> = None;
let mut cur_notification_parts: Vec<ClientNotificationPart> = Vec::new();
let mut cur_assistant_parts: Vec<AssistantResponsePart> = Vec::new();
let mut cur_tool_parts: Vec<ToolResponsePart> = Vec::new();
let flush = |class: Option<BlockClass>,
aih: &mut String,
rid: &mut String,
sender: &mut Option<String>,
mq_id: &mut Option<i64>,
timestamp_queued: &mut Option<i64>,
key: &mut Option<String>,
notification_parts: &mut Vec<ClientNotificationPart>,
assistant_parts: &mut Vec<AssistantResponsePart>,
tool_parts: &mut Vec<ToolResponsePart>,
out: &mut Vec<ResponseItem>| {
match class {
Some(BlockClass::ClientNotification) if !notification_parts.is_empty() => {
out.push(ResponseItem::ClientNotification {
agent_instance_hierarchy: std::mem::take(aih),
sender_agent_instance_hierarchy: sender.take().unwrap_or_default(),
response_id: std::mem::take(rid),
timestamp_queued: timestamp_queued.take().unwrap_or_default(),
key: key.take(),
parts: std::mem::take(notification_parts),
});
*mq_id = None;
}
Some(BlockClass::AssistantResponse) if !assistant_parts.is_empty() => {
out.push(ResponseItem::AssistantResponse {
agent_instance_hierarchy: std::mem::take(aih),
response_id: std::mem::take(rid),
parts: std::mem::take(assistant_parts),
});
}
Some(BlockClass::ToolResponse) if !tool_parts.is_empty() => {
out.push(ResponseItem::ToolResponse {
agent_instance_hierarchy: std::mem::take(aih),
response_id: std::mem::take(rid),
parts: std::mem::take(tool_parts),
});
}
_ => {
aih.clear();
rid.clear();
*sender = None;
*mq_id = None;
*timestamp_queued = None;
*key = None;
notification_parts.clear();
assistant_parts.clear();
tool_parts.clear();
}
}
};
for row in rows {
let class = block_class(row.table_kind);
match class {
BlockClass::AgentCompletionRequest => {
flush(
cur_class, &mut cur_aih, &mut cur_rid, &mut cur_sender,
&mut cur_mq_id, &mut cur_timestamp_queued, &mut cur_key,
&mut cur_notification_parts, &mut cur_assistant_parts,
&mut cur_tool_parts, &mut out,
);
out.push(ResponseItem::AgentCompletionRequest {
id: row.id,
agent_instance_hierarchy: row.agent_instance_hierarchy,
sender_agent_instance_hierarchy: row
.sender_agent_instance_hierarchy
.unwrap_or_default(),
timestamp_delivered: row.timestamp_delivered,
response_id: row.response_id,
});
cur_class = None;
continue;
}
BlockClass::VectorCompletionRequest => {
flush(
cur_class, &mut cur_aih, &mut cur_rid, &mut cur_sender,
&mut cur_mq_id, &mut cur_timestamp_queued, &mut cur_key,
&mut cur_notification_parts, &mut cur_assistant_parts,
&mut cur_tool_parts, &mut out,
);
out.push(ResponseItem::VectorCompletionRequest {
id: row.id,
agent_instance_hierarchy: row.agent_instance_hierarchy,
sender_agent_instance_hierarchy: row
.sender_agent_instance_hierarchy
.unwrap_or_default(),
timestamp_delivered: row.timestamp_delivered,
response_id: row.response_id,
});
cur_class = None;
continue;
}
BlockClass::FunctionExecutionRequest => {
flush(
cur_class, &mut cur_aih, &mut cur_rid, &mut cur_sender,
&mut cur_mq_id, &mut cur_timestamp_queued, &mut cur_key,
&mut cur_notification_parts, &mut cur_assistant_parts,
&mut cur_tool_parts, &mut out,
);
out.push(ResponseItem::FunctionExecutionRequest {
id: row.id,
agent_instance_hierarchy: row.agent_instance_hierarchy,
sender_agent_instance_hierarchy: row
.sender_agent_instance_hierarchy
.unwrap_or_default(),
timestamp_delivered: row.timestamp_delivered,
response_id: row.response_id,
});
cur_class = None;
continue;
}
_ => {}
}
let boundary = cur_class != Some(class)
|| cur_aih != row.agent_instance_hierarchy
|| cur_rid != row.response_id
|| (class == BlockClass::ClientNotification
&& (cur_sender.as_deref() != row.sender_agent_instance_hierarchy.as_deref()
|| cur_mq_id != row.message_queue_id));
if boundary {
flush(
cur_class, &mut cur_aih, &mut cur_rid, &mut cur_sender,
&mut cur_mq_id, &mut cur_timestamp_queued, &mut cur_key,
&mut cur_notification_parts, &mut cur_assistant_parts,
&mut cur_tool_parts, &mut out,
);
cur_class = Some(class);
cur_aih = row.agent_instance_hierarchy.clone();
cur_rid = row.response_id.clone();
if class == BlockClass::ClientNotification {
cur_sender = row.sender_agent_instance_hierarchy.clone();
cur_mq_id = row.message_queue_id;
cur_timestamp_queued = row.timestamp_queued;
cur_key = row.message_queue_key.clone();
} else {
cur_sender = None;
cur_mq_id = None;
cur_timestamp_queued = None;
cur_key = None;
}
}
match class {
BlockClass::ClientNotification => {
let r#type = client_notification_kind(row.table_kind)
.expect("class invariant: ClientNotification maps to message_queue_*");
cur_notification_parts.push(ClientNotificationPart {
id: row.id,
timestamp_delivered: row.timestamp_delivered,
r#type,
});
}
BlockClass::AssistantResponse => {
let r#type = assistant_response_kind(row.table_kind)
.expect("class invariant: AssistantResponse maps to assistant_response_*");
cur_assistant_parts.push(AssistantResponsePart {
id: row.id,
timestamp_delivered: row.timestamp_delivered,
r#type,
function_name: row.function_name,
});
}
BlockClass::ToolResponse => {
let r#type = tool_response_kind(row.table_kind)
.expect("class invariant: ToolResponse maps to tool_response*");
cur_tool_parts.push(ToolResponsePart {
id: row.id,
timestamp_delivered: row.timestamp_delivered,
r#type,
});
}
_ => unreachable!("request classes handled above"),
}
}
flush(
cur_class, &mut cur_aih, &mut cur_rid, &mut cur_sender,
&mut cur_mq_id, &mut cur_timestamp_queued, &mut cur_key,
&mut cur_notification_parts, &mut cur_assistant_parts,
&mut cur_tool_parts, &mut out,
);
out
}
pub async fn read_all_for_hierarchy(
pool: &Pool,
agent_instance_hierarchy: &str,
after_id: Option<i64>,
limit: Option<i64>,
) -> Result<Vec<ResponseItem>, Error> {
let sql = format!(
"{select} {from} \
WHERE m.agent_instance_hierarchy = $1 \
AND m.\"index\" > COALESCE($2, 0) \
ORDER BY m.\"index\" ASC \
LIMIT $3",
select = SELECT_SHAPE,
from = FROM_JOINS,
);
let rows = sqlx::query(&sql)
.bind(agent_instance_hierarchy)
.bind(after_id)
.bind(limit)
.fetch_all(&**pool)
.await?;
let msg_rows: Vec<MsgRow> = rows.iter().map(row_into_msg).collect::<Result<_, _>>()?;
Ok(coalesce_into_blocks(msg_rows))
}
pub async fn read_pending_for_parent(
pool: &Pool,
parent_agent_instance_hierarchy: &str,
after_id: Option<i64>,
limit: Option<i64>,
) -> Result<Vec<ResponseItem>, Error> {
let sql = format!(
"WITH selected AS ( \
{select} \
{from} \
JOIN logs.messages_queue q \
ON q.spawned_agent_instance_hierarchy = m.agent_instance_hierarchy \
WHERE q.parent_agent_instance_hierarchy = $1 \
AND m.\"index\" > GREATEST(q.read_index, COALESCE($2, 0)) \
ORDER BY m.\"index\" ASC \
LIMIT $3 \
), \
maxes AS ( \
SELECT agent_instance_hierarchy AS spawned, MAX(id) AS max_id \
FROM selected \
GROUP BY agent_instance_hierarchy \
), \
bump AS ( \
UPDATE logs.messages_queue qq \
SET read_index = GREATEST(qq.read_index, mx.max_id) \
FROM maxes mx \
WHERE qq.parent_agent_instance_hierarchy = $1 \
AND qq.spawned_agent_instance_hierarchy = mx.spawned \
RETURNING 1 \
) \
SELECT s.id, s.response_id, s.table_kind, \
s.agent_instance_hierarchy, s.timestamp_delivered, \
s.sender_agent_instance_hierarchy, \
s.message_queue_id, s.timestamp_queued, \
s.message_queue_key \
FROM selected s \
ORDER BY s.id ASC",
select = SELECT_SHAPE,
from = FROM_JOINS,
);
let rows = sqlx::query(&sql)
.bind(parent_agent_instance_hierarchy)
.bind(after_id)
.bind(limit)
.fetch_all(&**pool)
.await?;
let msg_rows: Vec<MsgRow> = rows.iter().map(row_into_msg).collect::<Result<_, _>>()?;
Ok(coalesce_into_blocks(msg_rows))
}
pub async fn any_pending_matching_kinds(
pool: &Pool,
parent_agent_instance_hierarchy: &str,
after_id: Option<i64>,
kinds: Option<&[MessageTable]>,
) -> Result<bool, Error> {
let kinds_clause = match kinds {
Some(ks) if !ks.is_empty() => {
let list = ks
.iter()
.map(|k| format!("'{}'", k.schema_name()))
.collect::<Vec<_>>()
.join(", ");
format!("AND m.\"table\" IN ({list})")
}
_ => String::new(),
};
let sql = format!(
"SELECT EXISTS( \
SELECT 1 FROM logs.messages m \
JOIN logs.messages_queue q \
ON q.spawned_agent_instance_hierarchy = m.agent_instance_hierarchy \
WHERE q.parent_agent_instance_hierarchy = $1 \
AND m.\"index\" > GREATEST(q.read_index, COALESCE($2, 0)) \
{kinds_clause} \
)"
);
let exists: bool = sqlx::query_scalar(&sql)
.bind(parent_agent_instance_hierarchy)
.bind(after_id)
.fetch_one(&**pool)
.await?;
Ok(exists)
}