use serde::{Deserialize, Serialize};
use serde_json::Value;
use super::content::ContentBlock;
mod delivery;
pub use delivery::{
DeliveryBoundary, DeliveryGranularity, DeliveryMode, PendingMessageRecord,
pending_queue_revision, select_pending_for_freeze, select_pending_for_freeze_for_run,
};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Role {
System,
User,
Assistant,
Tool,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Visibility {
#[default]
All,
Internal,
}
impl Visibility {
pub fn is_default(&self) -> bool {
*self == Visibility::All
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct MessageMetadata {
#[serde(skip_serializing_if = "Option::is_none")]
pub run_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub step_index: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub sender_agent_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub compaction: Option<CompactionMark>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageRecord {
pub message_id: String,
pub thread_id: String,
pub seq: u64,
pub message: Message,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub produced_by_run_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub step_index: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool_call_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub created_at: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub compaction: Option<CompactionMark>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactionMark {
pub from_seq: u64,
pub to_seq: u64,
}
impl MessageRecord {
pub fn from_message(thread_id: impl Into<String>, seq: u64, mut message: Message) -> Self {
let message_id = message.id.clone().unwrap_or_else(gen_message_id);
if message.id.is_none() {
message.id = Some(message_id.clone());
}
let produced_by_run_id = message
.metadata
.as_ref()
.and_then(|metadata| metadata.run_id.clone());
let step_index = message
.metadata
.as_ref()
.and_then(|metadata| metadata.step_index);
let tool_call_id = message.tool_call_id.clone();
let compaction = message
.metadata
.as_ref()
.and_then(|metadata| metadata.compaction);
Self {
message_id,
thread_id: thread_id.into(),
seq,
message,
produced_by_run_id,
step_index,
tool_call_id,
created_at: None,
compaction,
}
}
}
#[must_use]
pub fn effective_messages(records: &[MessageRecord]) -> Vec<Message> {
let summary_seqs: std::collections::HashSet<u64> = records
.iter()
.filter(|r| r.compaction.is_some())
.map(|r| r.seq)
.collect();
let mut candidates: Vec<(u64, u64, &Message)> = records
.iter()
.filter_map(|r| r.compaction.map(|c| (c.from_seq, c.to_seq, &r.message)))
.collect();
candidates.sort_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1)));
let mut intervals: Vec<(u64, u64, &Message)> = Vec::new();
for cand in candidates {
let dominated = intervals
.iter()
.any(|kept| kept.0 <= cand.0 && cand.1 <= kept.1);
if !dominated {
intervals.push(cand);
}
}
intervals.sort_by_key(|(from, _, _)| *from);
let covered = |seq: u64| {
intervals
.iter()
.any(|(from, to, _)| seq >= *from && seq <= *to)
};
let mut out = Vec::new();
let mut next_interval = intervals.iter().peekable();
for record in records.iter().filter(|r| !summary_seqs.contains(&r.seq)) {
while next_interval
.peek()
.is_some_and(|(from, _, _)| *from <= record.seq)
{
let (_, _, summary) = next_interval.next().unwrap();
out.push((*summary).clone());
}
if covered(record.seq) {
continue; }
out.push(record.message.clone());
}
for (_, _, summary) in next_interval {
out.push((*summary).clone());
}
out
}
pub fn strip_unpaired_tool_calls_from_view(messages: &mut Vec<Message>) {
use std::collections::HashSet;
let mut answered: HashSet<String> = HashSet::new();
let mut retracted: HashSet<String> = HashSet::new();
for message in messages.iter() {
if message.role != Role::Tool {
continue;
}
if let Some(call_id) = message.tool_call_id.clone() {
match message.visibility {
Visibility::All => {
answered.insert(call_id);
}
Visibility::Internal => {
retracted.insert(call_id);
}
}
}
}
for message in messages.iter_mut() {
if message.role != Role::Assistant {
continue;
}
if let Some(ref mut calls) = message.tool_calls {
calls.retain(|call| answered.contains(&call.id) && !retracted.contains(&call.id));
if calls.is_empty() {
message.tool_calls = None;
}
}
}
messages.retain(|message| {
if message.role != Role::Tool {
return true;
}
match message.tool_call_id.as_ref() {
Some(call_id) => message.visibility == Visibility::All && !retracted.contains(call_id),
None => true,
}
});
}
pub fn strip_unpaired_tool_calls_from_owned_view(mut messages: Vec<Message>) -> Vec<Message> {
strip_unpaired_tool_calls_from_view(&mut messages);
messages
}
#[must_use]
pub fn effective_committed_view(committed: Vec<Message>, thread_id: &str) -> Vec<Message> {
let records: Vec<MessageRecord> = committed
.into_iter()
.enumerate()
.map(|(index, message)| MessageRecord::from_message(thread_id, index as u64 + 1, message))
.collect();
strip_unpaired_tool_calls_from_owned_view(effective_messages(&records))
}
pub fn gen_message_id() -> String {
uuid::Uuid::now_v7().to_string()
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Message {
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
pub role: Role,
pub content: Vec<ContentBlock>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tool_calls: Option<Vec<ToolCall>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tool_call_id: Option<String>,
#[serde(default, skip_serializing_if = "Visibility::is_default")]
pub visibility: Visibility,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metadata: Option<MessageMetadata>,
}
impl Message {
pub fn system(text: impl Into<String>) -> Self {
Self {
id: Some(gen_message_id()),
role: Role::System,
content: vec![ContentBlock::text(text)],
tool_calls: None,
tool_call_id: None,
visibility: Visibility::All,
metadata: None,
}
}
pub fn internal_system(text: impl Into<String>) -> Self {
Self {
id: Some(gen_message_id()),
role: Role::System,
content: vec![ContentBlock::text(text)],
tool_calls: None,
tool_call_id: None,
visibility: Visibility::Internal,
metadata: None,
}
}
pub fn internal_user(text: impl Into<String>) -> Self {
Self {
id: Some(gen_message_id()),
role: Role::User,
content: vec![ContentBlock::text(text)],
tool_calls: None,
tool_call_id: None,
visibility: Visibility::Internal,
metadata: None,
}
}
pub fn user(text: impl Into<String>) -> Self {
Self {
id: Some(gen_message_id()),
role: Role::User,
content: vec![ContentBlock::text(text)],
tool_calls: None,
tool_call_id: None,
visibility: Visibility::All,
metadata: None,
}
}
pub fn user_with_content(content: Vec<ContentBlock>) -> Self {
Self {
id: Some(gen_message_id()),
role: Role::User,
content,
tool_calls: None,
tool_call_id: None,
visibility: Visibility::All,
metadata: None,
}
}
pub fn assistant(text: impl Into<String>) -> Self {
Self {
id: Some(gen_message_id()),
role: Role::Assistant,
content: vec![ContentBlock::text(text)],
tool_calls: None,
tool_call_id: None,
visibility: Visibility::All,
metadata: None,
}
}
pub fn assistant_with_tool_calls(text: impl Into<String>, calls: Vec<ToolCall>) -> Self {
Self {
id: Some(gen_message_id()),
role: Role::Assistant,
content: vec![ContentBlock::text(text)],
tool_calls: if calls.is_empty() { None } else { Some(calls) },
tool_call_id: None,
visibility: Visibility::All,
metadata: None,
}
}
pub fn tool(call_id: impl Into<String>, text: impl Into<String>) -> Self {
Self {
id: Some(gen_message_id()),
role: Role::Tool,
content: vec![ContentBlock::text(text)],
tool_calls: None,
tool_call_id: Some(call_id.into()),
visibility: Visibility::All,
metadata: None,
}
}
pub fn tool_with_content(call_id: impl Into<String>, content: Vec<ContentBlock>) -> Self {
Self {
id: Some(gen_message_id()),
role: Role::Tool,
content,
tool_calls: None,
tool_call_id: Some(call_id.into()),
visibility: Visibility::All,
metadata: None,
}
}
pub fn text(&self) -> String {
super::content::extract_text(&self.content)
}
pub fn is_internal_tool_result(&self) -> bool {
self.role == Role::Tool && self.visibility == Visibility::Internal
}
#[must_use]
pub fn with_id(mut self, id: String) -> Self {
self.id = Some(id);
self
}
#[must_use]
pub fn with_metadata(mut self, metadata: MessageMetadata) -> Self {
self.metadata = Some(metadata);
self
}
#[must_use]
pub fn produced_by_run_id(&self) -> Option<&str> {
self.metadata
.as_ref()
.and_then(|metadata| metadata.run_id.as_deref())
}
pub fn mark_produced_by(&mut self, run_id: &str, step_index: Option<u32>) {
let metadata = self.metadata.get_or_insert_with(MessageMetadata::default);
if metadata.run_id.is_none() {
metadata.run_id = Some(run_id.to_string());
}
if metadata.step_index.is_none() {
metadata.step_index = step_index;
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ToolCall {
pub id: String,
pub name: String,
pub arguments: Value,
}
impl ToolCall {
pub fn new(id: impl Into<String>, name: impl Into<String>, arguments: Value) -> Self {
Self {
id: id.into(),
name: name.into(),
arguments,
}
}
}
#[cfg(test)]
#[path = "message/tests.rs"]
mod tests;