Skip to main content

tirea_contract/thread/
changeset.rs

//! Persistence change-set types shared by runtime and storage.
2
3use crate::runtime::state::SerializedStateAction;
4use crate::runtime::RunStatus;
5use crate::storage::RunOrigin;
6use crate::thread::{Message, Thread};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashSet;
10use std::sync::Arc;
11use tirea_state::TrackedPatch;
12
/// Version counter used for optimistic concurrency.
///
/// Intended to be monotonically increasing; the alias itself only fixes the
/// representation to an unsigned 64-bit integer.
pub type Version = u64;
15
16/// Reason for a checkpoint (delta).
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
18pub enum CheckpointReason {
19    UserMessage,
20    AssistantTurnCommitted,
21    ToolResultsCommitted,
22    RunFinished,
23}
24
25/// Run-level metadata carried in a [`ThreadChangeSet`].
26///
27/// When present, the thread store uses this to maintain a run index.
28/// Set on the first changeset of a run (to create the record) and the last
29/// (to finalize status / termination).
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct RunMeta {
32    pub agent_id: String,
33    pub origin: RunOrigin,
34    pub status: RunStatus,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub parent_thread_id: Option<String>,
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub termination_code: Option<String>,
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub termination_detail: Option<String>,
41}
42
43/// An incremental change to a thread produced by a single step.
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct ThreadChangeSet {
46    /// Which run produced this delta.
47    pub run_id: String,
48    /// Parent run (for sub-agent deltas).
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub parent_run_id: Option<String>,
51    /// Run-level metadata for run index maintenance.
52    #[serde(default, skip_serializing_if = "Option::is_none")]
53    pub run_meta: Option<RunMeta>,
54    /// Why this delta was created.
55    pub reason: CheckpointReason,
56    /// New messages appended in this step.
57    pub messages: Vec<Arc<Message>>,
58    /// New patches appended in this step.
59    pub patches: Vec<TrackedPatch>,
60    /// Serialized state actions captured during this step (intent log).
61    #[serde(default, skip_serializing_if = "Vec::is_empty", rename = "actions")]
62    pub state_actions: Vec<SerializedStateAction>,
63    /// If `Some`, a full state snapshot was taken (replaces base state).
64    #[serde(skip_serializing_if = "Option::is_none")]
65    pub snapshot: Option<Value>,
66}
67
68impl ThreadChangeSet {
69    /// Build a `ThreadChangeSet` from explicit delta components.
70    pub fn from_parts(
71        run_id: impl Into<String>,
72        parent_run_id: Option<String>,
73        reason: CheckpointReason,
74        messages: Vec<Arc<Message>>,
75        patches: Vec<TrackedPatch>,
76        state_actions: Vec<SerializedStateAction>,
77        snapshot: Option<Value>,
78    ) -> Self {
79        Self {
80            run_id: run_id.into(),
81            parent_run_id,
82            run_meta: None,
83            reason,
84            messages,
85            patches,
86            state_actions,
87            snapshot,
88        }
89    }
90
91    /// Attach run-level metadata for run index maintenance.
92    #[must_use]
93    pub fn with_run_meta(mut self, meta: RunMeta) -> Self {
94        self.run_meta = Some(meta);
95        self
96    }
97
98    /// Apply this delta to a thread in place.
99    ///
100    /// Messages are deduplicated by `id` — if a message with the same id
101    /// already exists in the thread it is skipped. Messages without an id
102    /// are always appended.
103    pub fn apply_to(&self, thread: &mut Thread) {
104        if let Some(ref snapshot) = self.snapshot {
105            thread.state = snapshot.clone();
106            thread.patches.clear();
107        }
108
109        let mut existing_ids: HashSet<String> = thread
110            .messages
111            .iter()
112            .filter_map(|m| m.id.clone())
113            .collect();
114        for msg in &self.messages {
115            if let Some(ref id) = msg.id {
116                if !existing_ids.insert(id.clone()) {
117                    continue;
118                }
119            }
120            thread.messages.push(msg.clone());
121        }
122        thread.patches.extend(self.patches.iter().cloned());
123    }
124}
125
#[cfg(test)]
mod tests {
    use super::*;
    use crate::thread::{Message, Thread};
    use serde_json::json;

    /// Changeset carrying one assistant message and one serialized state action.
    fn sample_changeset_with_state_actions() -> ThreadChangeSet {
        let action = SerializedStateAction {
            state_type_name: "TestCounter".into(),
            base_path: "test_counter".into(),
            scope: crate::runtime::state::StateScope::Thread,
            call_id_override: None,
            payload: json!({"Increment": 1}),
        };

        ThreadChangeSet::from_parts(
            "run-1",
            None,
            CheckpointReason::AssistantTurnCommitted,
            vec![Arc::new(Message::assistant("hello"))],
            vec![],
            vec![action],
            None,
        )
    }

    /// State actions must survive a JSON serialize/deserialize round trip.
    #[test]
    fn test_changeset_serde_roundtrip_with_state_actions() {
        let original = sample_changeset_with_state_actions();
        assert_eq!(original.state_actions.len(), 1);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ThreadChangeSet = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.state_actions.len(), 1);
        let action = &decoded.state_actions[0];
        assert_eq!(action.state_type_name, "TestCounter");
        assert_eq!(action.payload, json!({"Increment": 1}));
    }

    /// JSON written before the `actions` field existed must still deserialize.
    #[test]
    fn test_changeset_serde_backward_compat_without_state_actions() {
        // Simulate old JSON that has no `actions` field.
        let legacy = r#"{
            "run_id": "run-1",
            "reason": "RunFinished",
            "messages": [],
            "patches": []
        }"#;
        let parsed: ThreadChangeSet = serde_json::from_str(legacy).unwrap();
        assert!(parsed.state_actions.is_empty());
    }

    /// Applying the same delta twice must not duplicate its message.
    #[test]
    fn test_apply_to_deduplicates_messages() {
        let delta = ThreadChangeSet::from_parts(
            "run-1",
            None,
            CheckpointReason::AssistantTurnCommitted,
            vec![Arc::new(Message::user("hello"))],
            vec![],
            vec![],
            None,
        );

        let mut thread = Thread::new("t1");
        for _ in 0..2 {
            delta.apply_to(&mut thread);
        }

        // The same message (by id) applied twice should appear only once.
        assert_eq!(
            thread.messages.len(),
            1,
            "apply_to should deduplicate messages by id"
        );
    }
}