1use std::collections::HashMap;
4
5use crate::contract::lifecycle::RunStatus;
6use crate::contract::storage::{RunRecord, StorageError};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9
/// Normalizes a borrowed lineage identifier.
///
/// Surrounding whitespace is trimmed. Returns `None` when `value` is `None`
/// or when nothing is left after trimming; otherwise returns the trimmed text
/// as an owned `String`.
#[must_use]
pub fn normalize_lineage_id(value: Option<&str>) -> Option<String> {
    let trimmed = value?.trim();
    if trimmed.is_empty() {
        // Blank identifiers are treated the same as a missing identifier.
        None
    } else {
        Some(trimmed.to_owned())
    }
}
18
/// Normalizes an owned lineage identifier.
///
/// Applies the same rule as the borrowed normalizer: surrounding whitespace
/// is trimmed, and a missing or blank value yields `None`.
///
/// Because the caller hands over ownership, a value that is already trimmed
/// is returned as-is, reusing its allocation. A new `String` is only
/// allocated when whitespace actually has to be stripped.
#[must_use]
pub fn normalize_lineage_id_owned(value: Option<String>) -> Option<String> {
    let owned = value?;
    let trimmed = owned.trim();
    if trimmed.is_empty() {
        return None;
    }
    if trimmed.len() == owned.len() {
        // `trim` only ever removes bytes, so equal length means nothing was
        // stripped and the original buffer can be handed back untouched.
        Some(owned)
    } else {
        Some(trimmed.to_owned())
    }
}
24
/// Descriptive metadata attached to a [`Thread`].
///
/// Every field is optional: `None` values and an empty `custom` map are left
/// out of the serialized form.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ThreadMetadata {
    /// Time the thread was first touched. `Thread::touch` fills it in only
    /// when it is still unset. The unit is whatever the caller passes as
    /// `now`; this type does not fix one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub created_at: Option<u64>,
    /// Time of the most recent `Thread::touch` call, in the same unit as
    /// `created_at`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub updated_at: Option<u64>,
    /// Optional title of the thread.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub title: Option<String>,
    /// Free-form JSON values keyed by name. Defaults to an empty map when the
    /// key is absent from the input.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub custom: HashMap<String, Value>,
}
41
/// A thread record: identity, lineage, metadata, and a projection of the
/// runs that have been applied to it.
///
/// Only `id` is required when deserializing; every other field falls back to
/// its default, and `None` values are omitted when serializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Thread {
    /// Identifier of the thread. Must not be blank for the thread to pass
    /// `validate_for_persist`.
    pub id: String,
    /// Optional lineage identifier of the resource this thread is associated
    /// with.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub resource_id: Option<String>,
    /// Optional identifier of the parent thread. Validation rejects a thread
    /// that names itself as its parent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub parent_thread_id: Option<String>,
    /// Timestamps, title and custom values for the thread.
    #[serde(default)]
    pub metadata: ThreadMetadata,
    /// Run id set by `apply_run_projection` while the projected run is
    /// `Running`; cleared for `Created`/`Waiting` and when that run is `Done`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub active_run_id: Option<String>,
    /// Run id set by `apply_run_projection` for `Created`, `Running` and
    /// `Waiting` runs; cleared when that run is `Done`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub open_run_id: Option<String>,
    /// Id of the run whose projection was most recently accepted.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub latest_run_id: Option<String>,
    /// `updated_at` of the run recorded in `latest_run_id`; used to detect
    /// projections that arrive out of order.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub latest_run_updated_at: Option<u64>,
}
72
73impl Thread {
74 pub fn new() -> Self {
76 Self {
77 id: uuid::Uuid::now_v7().to_string(),
78 resource_id: None,
79 parent_thread_id: None,
80 metadata: ThreadMetadata::default(),
81 active_run_id: None,
82 open_run_id: None,
83 latest_run_id: None,
84 latest_run_updated_at: None,
85 }
86 }
87
88 pub fn with_id(id: impl Into<String>) -> Self {
90 Self {
91 id: id.into(),
92 resource_id: None,
93 parent_thread_id: None,
94 metadata: ThreadMetadata::default(),
95 active_run_id: None,
96 open_run_id: None,
97 latest_run_id: None,
98 latest_run_updated_at: None,
99 }
100 }
101
102 #[must_use]
104 pub fn with_title(mut self, title: impl Into<String>) -> Self {
105 self.metadata.title = Some(title.into());
106 self
107 }
108
109 #[must_use]
111 pub fn with_resource_id(mut self, resource_id: impl Into<String>) -> Self {
112 self.resource_id = normalize_lineage_id_owned(Some(resource_id.into()));
113 self
114 }
115
116 #[must_use]
118 pub fn with_parent_thread_id(mut self, parent_thread_id: impl Into<String>) -> Self {
119 self.parent_thread_id = normalize_lineage_id_owned(Some(parent_thread_id.into()));
120 self
121 }
122
123 pub fn normalize_lineage(&mut self) {
125 self.resource_id = normalize_lineage_id_owned(self.resource_id.take());
126 self.parent_thread_id = normalize_lineage_id_owned(self.parent_thread_id.take());
127 }
128
129 pub fn validate_for_persist(&self) -> Result<(), StorageError> {
131 require_non_empty("thread id", &self.id)?;
132 require_optional_non_empty("thread resource_id", self.resource_id.as_deref())?;
133 require_optional_non_empty("thread parent_thread_id", self.parent_thread_id.as_deref())?;
134 require_optional_non_empty("thread active_run_id", self.active_run_id.as_deref())?;
135 require_optional_non_empty("thread open_run_id", self.open_run_id.as_deref())?;
136 require_optional_non_empty("thread latest_run_id", self.latest_run_id.as_deref())?;
137
138 if normalize_lineage_id(self.parent_thread_id.as_deref()).as_deref() == Some(self.id.trim())
139 {
140 return Err(StorageError::Validation(format!(
141 "thread '{}' cannot parent itself",
142 self.id
143 )));
144 }
145
146 Ok(())
147 }
148
149 pub fn touch(&mut self, now: u64) {
151 self.metadata.created_at.get_or_insert(now);
152 self.metadata.updated_at = Some(now);
153 }
154
155 pub fn apply_run_projection(&mut self, run: &RunRecord) {
157 let is_current_projection = self
158 .latest_run_updated_at
159 .is_none_or(|latest_updated_at| run.updated_at >= latest_updated_at);
160 if !is_current_projection {
161 if run.status == RunStatus::Done {
162 self.clear_run_projection_if_matches(&run.run_id);
163 }
164 return;
165 }
166
167 self.latest_run_id = Some(run.run_id.clone());
168 self.latest_run_updated_at = Some(run.updated_at);
169 if self.parent_thread_id.is_none() {
170 self.parent_thread_id = normalize_lineage_id(
171 run.request
172 .as_ref()
173 .and_then(|request| request.parent_thread_id.as_deref()),
174 );
175 }
176 match run.status {
177 RunStatus::Created => {
178 self.active_run_id = None;
179 self.open_run_id = Some(run.run_id.clone());
180 }
181 RunStatus::Running => {
182 self.active_run_id = Some(run.run_id.clone());
183 self.open_run_id = Some(run.run_id.clone());
184 }
185 RunStatus::Waiting => {
186 self.active_run_id = None;
187 self.open_run_id = Some(run.run_id.clone());
188 }
189 RunStatus::Done => {
190 self.clear_run_projection_if_matches(&run.run_id);
191 }
192 }
193 }
194
195 fn clear_run_projection_if_matches(&mut self, run_id: &str) {
196 if self.active_run_id.as_deref() == Some(run_id) {
197 self.active_run_id = None;
198 }
199 if self.open_run_id.as_deref() == Some(run_id) {
200 self.open_run_id = None;
201 }
202 }
203}
204
205fn require_non_empty(field: &str, value: &str) -> Result<(), StorageError> {
206 if value.trim().is_empty() {
207 return Err(StorageError::Validation(format!(
208 "{field} must not be empty"
209 )));
210 }
211 Ok(())
212}
213
214fn require_optional_non_empty(field: &str, value: Option<&str>) -> Result<(), StorageError> {
215 if let Some(value) = value {
216 require_non_empty(field, value)?;
217 }
218 Ok(())
219}
220
221impl Default for Thread {
222 fn default() -> Self {
223 Self::new()
224 }
225}
226
#[cfg(test)]
mod tests {
    use super::*;
    use crate::contract::lifecycle::RunStatus;
    use crate::contract::storage::RunRecord;
    use serde_json::json;

    // ---- construction and builders -------------------------------------

    #[test]
    fn thread_new_generates_uuid_v7() {
        let fresh = Thread::new();
        // Canonical hyphenated UUID text is 36 characters long and carries
        // the version digit at index 14.
        assert_eq!(fresh.id.len(), 36);
        assert_eq!(&fresh.id[14..15], "7", "should be UUID v7");
        assert!(fresh.metadata.title.is_none());
    }

    #[test]
    fn thread_with_id() {
        assert_eq!(Thread::with_id("my-thread-1").id, "my-thread-1");
    }

    #[test]
    fn thread_with_title() {
        let titled = Thread::new().with_title("Test Chat");
        assert_eq!(titled.metadata.title.as_deref(), Some("Test Chat"));
    }

    #[test]
    fn thread_serialization_roundtrip() {
        let mut source = Thread::with_id("t-1").with_title("My Thread");
        source.metadata.created_at = Some(1000);
        source.metadata.updated_at = Some(2000);
        source.metadata.custom.insert(String::from("env"), json!("prod"));
        source.resource_id = Some(String::from("resource-1"));
        source.parent_thread_id = Some(String::from("parent-1"));

        let encoded = serde_json::to_string(&source).unwrap();
        let decoded: Thread = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.id, "t-1");
        assert_eq!(decoded.resource_id.as_deref(), Some("resource-1"));
        assert_eq!(decoded.parent_thread_id.as_deref(), Some("parent-1"));
        assert_eq!(decoded.metadata.title.as_deref(), Some("My Thread"));
        assert_eq!(decoded.metadata.created_at, Some(1000));
        assert_eq!(decoded.metadata.updated_at, Some(2000));
        assert_eq!(decoded.metadata.custom["env"], json!("prod"));
    }

    #[test]
    fn thread_metadata_default() {
        let ThreadMetadata {
            created_at,
            updated_at,
            title,
            custom,
        } = ThreadMetadata::default();
        assert!(created_at.is_none());
        assert!(updated_at.is_none());
        assert!(title.is_none());
        assert!(custom.is_empty());
    }

    #[test]
    fn thread_metadata_omits_empty_fields() {
        let encoded = serde_json::to_string(&ThreadMetadata::default()).unwrap();
        for key in ["created_at", "updated_at", "title", "custom"] {
            assert!(!encoded.contains(key));
        }
    }

    #[test]
    fn thread_default_is_new() {
        assert_eq!(Thread::default().id.len(), 36);
    }

    #[test]
    fn distinct_threads_get_distinct_ids() {
        let first = Thread::new();
        let second = Thread::new();
        assert_ne!(first.id, second.id);
    }

    #[test]
    fn thread_with_custom_metadata() {
        let mut subject = Thread::with_id("t-1");
        subject.metadata.created_at = Some(1000);
        subject.metadata.updated_at = Some(2000);
        subject.metadata.custom.insert(String::from("env"), json!("prod"));

        assert_eq!(subject.metadata.created_at, Some(1000));
        assert_eq!(subject.metadata.custom["env"], json!("prod"));
    }

    #[test]
    fn thread_with_title_chaining() {
        let chained = Thread::with_id("t-1").with_title("Test");
        assert_eq!(chained.metadata.title.as_deref(), Some("Test"));
    }

    #[test]
    fn thread_lineage_builders() {
        let built = Thread::with_id("t-1")
            .with_resource_id("resource-1")
            .with_parent_thread_id("parent-1");

        assert_eq!(built.resource_id.as_deref(), Some("resource-1"));
        assert_eq!(built.parent_thread_id.as_deref(), Some("parent-1"));
    }

    // ---- lineage normalization -----------------------------------------

    #[test]
    fn normalize_lineage_id_trims_and_drops_blank_values() {
        assert_eq!(
            normalize_lineage_id(Some(" parent-1 ")),
            Some(String::from("parent-1"))
        );
        assert_eq!(normalize_lineage_id(Some(" ")), None);
        assert_eq!(normalize_lineage_id(None), None);
    }

    #[test]
    fn normalize_lineage_updates_thread_fields() {
        let mut subject = Thread::with_id("t-1");
        subject.resource_id = Some(String::from(" resource-1 "));
        subject.parent_thread_id = Some(String::from(" "));

        subject.normalize_lineage();

        assert_eq!(subject.resource_id.as_deref(), Some("resource-1"));
        assert_eq!(subject.parent_thread_id, None);
    }

    // ---- validation ----------------------------------------------------

    #[test]
    fn thread_validate_rejects_empty_id() {
        let failure = Thread::with_id(" ").validate_for_persist().unwrap_err();

        assert!(matches!(failure, StorageError::Validation(msg) if msg.contains("thread id")));
    }

    #[test]
    fn thread_validate_rejects_self_parent() {
        // The padded parent id must still be recognised as the thread itself.
        let failure = Thread::with_id("thread-1")
            .with_parent_thread_id(" thread-1 ")
            .validate_for_persist()
            .unwrap_err();

        assert!(
            matches!(failure, StorageError::Validation(msg) if msg.contains("parent itself"))
        );
    }

    // ---- touch ---------------------------------------------------------

    #[test]
    fn touch_initializes_created_and_updated_at() {
        let mut subject = Thread::with_id("t-1");

        subject.touch(1234);

        assert_eq!(subject.metadata.created_at, Some(1234));
        assert_eq!(subject.metadata.updated_at, Some(1234));
    }

    #[test]
    fn touch_preserves_created_at_and_refreshes_updated_at() {
        let mut subject = Thread::with_id("t-1");
        subject.metadata.created_at = Some(1000);
        subject.metadata.updated_at = Some(1500);

        subject.touch(2000);

        assert_eq!(subject.metadata.created_at, Some(1000));
        assert_eq!(subject.metadata.updated_at, Some(2000));
    }

    // ---- serde details -------------------------------------------------

    #[test]
    fn thread_metadata_custom_preserved_in_serde() {
        let mut source = Thread::with_id("t-1");
        source.metadata.custom.insert(String::from("key"), json!(42));

        let encoded = serde_json::to_string(&source).unwrap();
        let decoded: Thread = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.metadata.custom["key"], json!(42));
    }

    #[test]
    fn thread_empty_metadata_is_compact() {
        let encoded = serde_json::to_string(&Thread::with_id("t-1")).unwrap();
        for key in ["custom", "resource_id", "parent_thread_id"] {
            assert!(!encoded.contains(key));
        }
    }

    // ---- run projection ------------------------------------------------

    /// Builds a minimal run on `thread-1` with the given id and status; both
    /// timestamps are 1 and every optional field is empty.
    fn run_record(run_id: &str, status: RunStatus) -> RunRecord {
        RunRecord {
            run_id: run_id.to_owned(),
            thread_id: String::from("thread-1"),
            agent_id: String::from("agent-1"),
            parent_run_id: None,
            resolution_id: None,
            activation: None,
            request: None,
            input: None,
            output: None,
            status,
            termination_reason: None,
            final_output: None,
            error_payload: None,
            dispatch_id: None,
            session_id: None,
            transport_request_id: None,
            waiting: None,
            outcome: None,
            created_at: 1,
            started_at: None,
            finished_at: None,
            updated_at: 1,
            steps: 0,
            input_tokens: 0,
            output_tokens: 0,
            state: None,
        }
    }

    #[test]
    fn thread_run_projection_keeps_waiting_run_open_but_not_active() {
        let mut subject = Thread::with_id("thread-1");

        // (status to apply, expected open_run_id, expected active_run_id)
        let lifecycle = [
            (RunStatus::Created, Some("run-1"), None),
            (RunStatus::Running, Some("run-1"), Some("run-1")),
            (RunStatus::Waiting, Some("run-1"), None),
            (RunStatus::Done, None, None),
        ];
        for (status, expected_open, expected_active) in lifecycle {
            subject.apply_run_projection(&run_record("run-1", status));
            assert_eq!(subject.open_run_id.as_deref(), expected_open);
            assert_eq!(subject.active_run_id.as_deref(), expected_active);
        }

        assert_eq!(subject.latest_run_id.as_deref(), Some("run-1"));
    }

    #[test]
    fn apply_run_projection_ignores_older_run_projection() {
        let mut subject = Thread::with_id("thread-1");
        let newer = RunRecord {
            updated_at: 20,
            ..run_record("run-new", RunStatus::Running)
        };
        let older = RunRecord {
            updated_at: 10,
            ..run_record("run-old", RunStatus::Running)
        };

        subject.apply_run_projection(&newer);
        subject.apply_run_projection(&older);

        assert_eq!(subject.latest_run_id.as_deref(), Some("run-new"));
        assert_eq!(subject.active_run_id.as_deref(), Some("run-new"));
        assert_eq!(subject.open_run_id.as_deref(), Some("run-new"));
    }

    #[test]
    fn apply_run_projection_sets_parent_thread_id_when_missing() {
        let mut subject = Thread::with_id("thread-1");
        let run = RunRecord {
            request: Some(crate::contract::storage::RunRequestSnapshot {
                parent_thread_id: Some(String::from(" parent-thread ")),
                ..Default::default()
            }),
            ..run_record("run-1", RunStatus::Created)
        };

        subject.apply_run_projection(&run);

        assert_eq!(subject.parent_thread_id.as_deref(), Some("parent-thread"));
    }

    #[test]
    fn apply_run_projection_preserves_existing_parent_thread_id() {
        let mut subject = Thread::with_id("thread-1").with_parent_thread_id("existing-parent");
        let run = RunRecord {
            request: Some(crate::contract::storage::RunRequestSnapshot {
                parent_thread_id: Some(String::from("new-parent")),
                ..Default::default()
            }),
            ..run_record("run-1", RunStatus::Created)
        };

        subject.apply_run_projection(&run);

        assert_eq!(subject.parent_thread_id.as_deref(), Some("existing-parent"));
    }
}