tirea_contract/thread/
changeset.rs1use crate::runtime::state::SerializedStateAction;
4use crate::runtime::RunStatus;
5use crate::storage::RunOrigin;
6use crate::thread::{Message, Thread};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashSet;
10use std::sync::Arc;
11use tirea_state::TrackedPatch;
12
/// Version number, represented as an unsigned 64-bit integer.
///
/// NOTE(review): the alias is not referenced elsewhere in this file; it is
/// presumably a revision counter for persisted threads — confirm with callers.
pub type Version = u64;
15
16#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
18pub enum CheckpointReason {
19 UserMessage,
20 AssistantTurnCommitted,
21 ToolResultsCommitted,
22 RunFinished,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct RunMeta {
32 pub agent_id: String,
33 pub origin: RunOrigin,
34 pub status: RunStatus,
35 #[serde(skip_serializing_if = "Option::is_none")]
36 pub parent_thread_id: Option<String>,
37 #[serde(skip_serializing_if = "Option::is_none")]
38 pub termination_code: Option<String>,
39 #[serde(skip_serializing_if = "Option::is_none")]
40 pub termination_detail: Option<String>,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct ThreadChangeSet {
46 pub run_id: String,
48 #[serde(skip_serializing_if = "Option::is_none")]
50 pub parent_run_id: Option<String>,
51 #[serde(default, skip_serializing_if = "Option::is_none")]
53 pub run_meta: Option<RunMeta>,
54 pub reason: CheckpointReason,
56 pub messages: Vec<Arc<Message>>,
58 pub patches: Vec<TrackedPatch>,
60 #[serde(default, skip_serializing_if = "Vec::is_empty", rename = "actions")]
62 pub state_actions: Vec<SerializedStateAction>,
63 #[serde(skip_serializing_if = "Option::is_none")]
65 pub snapshot: Option<Value>,
66}
67
68impl ThreadChangeSet {
69 pub fn from_parts(
71 run_id: impl Into<String>,
72 parent_run_id: Option<String>,
73 reason: CheckpointReason,
74 messages: Vec<Arc<Message>>,
75 patches: Vec<TrackedPatch>,
76 state_actions: Vec<SerializedStateAction>,
77 snapshot: Option<Value>,
78 ) -> Self {
79 Self {
80 run_id: run_id.into(),
81 parent_run_id,
82 run_meta: None,
83 reason,
84 messages,
85 patches,
86 state_actions,
87 snapshot,
88 }
89 }
90
91 #[must_use]
93 pub fn with_run_meta(mut self, meta: RunMeta) -> Self {
94 self.run_meta = Some(meta);
95 self
96 }
97
98 pub fn apply_to(&self, thread: &mut Thread) {
104 if let Some(ref snapshot) = self.snapshot {
105 thread.state = snapshot.clone();
106 thread.patches.clear();
107 }
108
109 let mut existing_ids: HashSet<String> = thread
110 .messages
111 .iter()
112 .filter_map(|m| m.id.clone())
113 .collect();
114 for msg in &self.messages {
115 if let Some(ref id) = msg.id {
116 if !existing_ids.insert(id.clone()) {
117 continue;
118 }
119 }
120 thread.messages.push(msg.clone());
121 }
122 thread.patches.extend(self.patches.iter().cloned());
123 }
124}
125
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::state::StateScope;
    use crate::thread::{Message, Thread};
    use serde_json::json;

    /// Builds a changeset with one assistant message and a single
    /// `TestCounter` increment action.
    fn changeset_with_counter_action() -> ThreadChangeSet {
        let increment = SerializedStateAction {
            state_type_name: "TestCounter".into(),
            base_path: "test_counter".into(),
            scope: StateScope::Thread,
            call_id_override: None,
            payload: json!({"Increment": 1}),
        };

        ThreadChangeSet::from_parts(
            "run-1",
            None,
            CheckpointReason::AssistantTurnCommitted,
            vec![Arc::new(Message::assistant("hello"))],
            vec![],
            vec![increment],
            None,
        )
    }

    /// State actions must survive a JSON serialize/deserialize round trip.
    #[test]
    fn test_changeset_serde_roundtrip_with_state_actions() {
        let original = changeset_with_counter_action();
        assert_eq!(original.state_actions.len(), 1);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ThreadChangeSet = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.state_actions.len(), 1);
        let action = &decoded.state_actions[0];
        assert_eq!(action.state_type_name, "TestCounter");
        assert_eq!(action.payload, json!({"Increment": 1}));
    }

    /// JSON written without an `actions` key must still deserialize, yielding
    /// an empty action list.
    #[test]
    fn test_changeset_serde_backward_compat_without_state_actions() {
        let legacy_json = r#"{
            "run_id": "run-1",
            "reason": "RunFinished",
            "messages": [],
            "patches": []
        }"#;

        let decoded: ThreadChangeSet = serde_json::from_str(legacy_json).unwrap();
        assert!(decoded.state_actions.is_empty());
    }

    /// Applying the same changeset twice must not duplicate its message.
    #[test]
    fn test_apply_to_deduplicates_messages() {
        let delta = ThreadChangeSet::from_parts(
            "run-1",
            None,
            CheckpointReason::AssistantTurnCommitted,
            vec![Arc::new(Message::user("hello"))],
            vec![],
            vec![],
            None,
        );

        let mut thread = Thread::new("t1");
        for _ in 0..2 {
            delta.apply_to(&mut thread);
        }

        assert_eq!(
            thread.messages.len(),
            1,
            "apply_to should deduplicate messages by id"
        );
    }
}