Skip to main content

awaken_runtime_contract/
thread.rs

1//! Thread types for persistent conversation state.
2
3use std::collections::HashMap;
4
5use crate::contract::lifecycle::RunStatus;
6use crate::contract::storage::{RunRecord, StorageError};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9
/// Canonicalize a borrowed lineage identifier.
///
/// Surrounding whitespace is stripped. `None`, the empty string, and
/// whitespace-only input all map to `None`; anything else is returned as a
/// newly allocated `String` holding the trimmed text.
#[must_use]
pub fn normalize_lineage_id(value: Option<&str>) -> Option<String> {
    // Absent input short-circuits straight to `None`.
    let trimmed = value?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
18
/// Normalize an owned lineage identifier by trimming whitespace and treating blanks as absent.
///
/// Returns `None` for `None`, empty, or whitespace-only input. Otherwise returns
/// the trimmed identifier. When the input carries no surrounding whitespace the
/// original `String` is handed back unchanged, so the already-owned allocation
/// is reused instead of being copied.
#[must_use]
pub fn normalize_lineage_id_owned(value: Option<String>) -> Option<String> {
    let owned = value?;
    let trimmed = owned.trim();
    if trimmed.is_empty() {
        None
    } else if trimmed.len() == owned.len() {
        // `trim` removed nothing, so the input is already in normalized form.
        Some(owned)
    } else {
        Some(trimmed.to_owned())
    }
}
24
25/// Thread metadata.
26#[derive(Debug, Clone, Default, Serialize, Deserialize)]
27pub struct ThreadMetadata {
28    /// Creation timestamp (unix millis).
29    #[serde(skip_serializing_if = "Option::is_none")]
30    pub created_at: Option<u64>,
31    /// Last update timestamp (unix millis).
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub updated_at: Option<u64>,
34    /// Optional thread title.
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub title: Option<String>,
37    /// Custom metadata key-value pairs.
38    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
39    pub custom: HashMap<String, Value>,
40}
41
42/// A persistent conversation thread (metadata only).
43///
44/// Messages are stored separately via `ThreadStore::load_messages` /
45/// `ThreadStore::save_messages` to maintain a single source of truth.
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct Thread {
48    /// Unique thread identifier (UUID v7).
49    pub id: String,
50    /// External resource or tenant grouping for this thread.
51    #[serde(default, skip_serializing_if = "Option::is_none")]
52    pub resource_id: Option<String>,
53    /// Parent thread for child or delegated conversations.
54    #[serde(default, skip_serializing_if = "Option::is_none")]
55    pub parent_thread_id: Option<String>,
56    /// Thread metadata (timestamps, title, custom data).
57    #[serde(default)]
58    pub metadata: ThreadMetadata,
59    /// Run currently executing on a worker for this thread.
60    #[serde(default, skip_serializing_if = "Option::is_none")]
61    pub active_run_id: Option<String>,
62    /// Current unfinished user intent for this thread. Waiting runs remain open.
63    #[serde(default, skip_serializing_if = "Option::is_none")]
64    pub open_run_id: Option<String>,
65    /// Most recently known run for this thread.
66    #[serde(default, skip_serializing_if = "Option::is_none")]
67    pub latest_run_id: Option<String>,
68    /// `updated_at` watermark for the projected latest run.
69    #[serde(default, skip_serializing_if = "Option::is_none")]
70    pub latest_run_updated_at: Option<u64>,
71}
72
73impl Thread {
74    /// Create a new thread with a generated UUID v7 identifier.
75    pub fn new() -> Self {
76        Self {
77            id: uuid::Uuid::now_v7().to_string(),
78            resource_id: None,
79            parent_thread_id: None,
80            metadata: ThreadMetadata::default(),
81            active_run_id: None,
82            open_run_id: None,
83            latest_run_id: None,
84            latest_run_updated_at: None,
85        }
86    }
87
88    /// Create a new thread with a specific identifier.
89    pub fn with_id(id: impl Into<String>) -> Self {
90        Self {
91            id: id.into(),
92            resource_id: None,
93            parent_thread_id: None,
94            metadata: ThreadMetadata::default(),
95            active_run_id: None,
96            open_run_id: None,
97            latest_run_id: None,
98            latest_run_updated_at: None,
99        }
100    }
101
102    /// Set the title.
103    #[must_use]
104    pub fn with_title(mut self, title: impl Into<String>) -> Self {
105        self.metadata.title = Some(title.into());
106        self
107    }
108
109    /// Set the external resource grouping.
110    #[must_use]
111    pub fn with_resource_id(mut self, resource_id: impl Into<String>) -> Self {
112        self.resource_id = normalize_lineage_id_owned(Some(resource_id.into()));
113        self
114    }
115
116    /// Set the parent thread identifier.
117    #[must_use]
118    pub fn with_parent_thread_id(mut self, parent_thread_id: impl Into<String>) -> Self {
119        self.parent_thread_id = normalize_lineage_id_owned(Some(parent_thread_id.into()));
120        self
121    }
122
123    /// Normalize lineage identifiers in-place.
124    pub fn normalize_lineage(&mut self) {
125        self.resource_id = normalize_lineage_id_owned(self.resource_id.take());
126        self.parent_thread_id = normalize_lineage_id_owned(self.parent_thread_id.take());
127    }
128
129    /// Validate model-level invariants before persisting a thread projection.
130    pub fn validate_for_persist(&self) -> Result<(), StorageError> {
131        require_non_empty("thread id", &self.id)?;
132        require_optional_non_empty("thread resource_id", self.resource_id.as_deref())?;
133        require_optional_non_empty("thread parent_thread_id", self.parent_thread_id.as_deref())?;
134        require_optional_non_empty("thread active_run_id", self.active_run_id.as_deref())?;
135        require_optional_non_empty("thread open_run_id", self.open_run_id.as_deref())?;
136        require_optional_non_empty("thread latest_run_id", self.latest_run_id.as_deref())?;
137
138        if normalize_lineage_id(self.parent_thread_id.as_deref()).as_deref() == Some(self.id.trim())
139        {
140            return Err(StorageError::Validation(format!(
141                "thread '{}' cannot parent itself",
142                self.id
143            )));
144        }
145
146        Ok(())
147    }
148
149    /// Ensure timestamps are initialized and mark the thread as updated.
150    pub fn touch(&mut self, now: u64) {
151        self.metadata.created_at.get_or_insert(now);
152        self.metadata.updated_at = Some(now);
153    }
154
155    /// Update the thread's run pointers from a durable run record.
156    pub fn apply_run_projection(&mut self, run: &RunRecord) {
157        let is_current_projection = self
158            .latest_run_updated_at
159            .is_none_or(|latest_updated_at| run.updated_at >= latest_updated_at);
160        if !is_current_projection {
161            if run.status == RunStatus::Done {
162                self.clear_run_projection_if_matches(&run.run_id);
163            }
164            return;
165        }
166
167        self.latest_run_id = Some(run.run_id.clone());
168        self.latest_run_updated_at = Some(run.updated_at);
169        if self.parent_thread_id.is_none() {
170            self.parent_thread_id = normalize_lineage_id(
171                run.request
172                    .as_ref()
173                    .and_then(|request| request.parent_thread_id.as_deref()),
174            );
175        }
176        match run.status {
177            RunStatus::Created => {
178                self.active_run_id = None;
179                self.open_run_id = Some(run.run_id.clone());
180            }
181            RunStatus::Running => {
182                self.active_run_id = Some(run.run_id.clone());
183                self.open_run_id = Some(run.run_id.clone());
184            }
185            RunStatus::Waiting => {
186                self.active_run_id = None;
187                self.open_run_id = Some(run.run_id.clone());
188            }
189            RunStatus::Done => {
190                self.clear_run_projection_if_matches(&run.run_id);
191            }
192        }
193    }
194
195    fn clear_run_projection_if_matches(&mut self, run_id: &str) {
196        if self.active_run_id.as_deref() == Some(run_id) {
197            self.active_run_id = None;
198        }
199        if self.open_run_id.as_deref() == Some(run_id) {
200            self.open_run_id = None;
201        }
202    }
203}
204
205fn require_non_empty(field: &str, value: &str) -> Result<(), StorageError> {
206    if value.trim().is_empty() {
207        return Err(StorageError::Validation(format!(
208            "{field} must not be empty"
209        )));
210    }
211    Ok(())
212}
213
214fn require_optional_non_empty(field: &str, value: Option<&str>) -> Result<(), StorageError> {
215    if let Some(value) = value {
216        require_non_empty(field, value)?;
217    }
218    Ok(())
219}
220
221impl Default for Thread {
222    fn default() -> Self {
223        Self::new()
224    }
225}
226
#[cfg(test)]
mod tests {
    use super::*;
    use crate::contract::lifecycle::RunStatus;
    use crate::contract::storage::RunRecord;
    use serde_json::json;

    /// A generated id is a 36-character UUID string whose version digit is 7.
    #[test]
    fn thread_new_generates_uuid_v7() {
        let fresh = Thread::new();
        assert_eq!(fresh.id.len(), 36);
        assert_eq!(&fresh.id[14..15], "7", "should be UUID v7");
        assert!(fresh.metadata.title.is_none());
    }

    /// A caller-supplied id is stored verbatim.
    #[test]
    fn thread_with_id() {
        let named = Thread::with_id("my-thread-1");
        assert_eq!(named.id, "my-thread-1");
    }

    /// The title builder stores the title in the metadata.
    #[test]
    fn thread_with_title() {
        let titled = Thread::new().with_title("Test Chat");
        assert_eq!(titled.metadata.title.as_deref(), Some("Test Chat"));
    }

    /// Every populated field survives a JSON encode/decode cycle.
    #[test]
    fn thread_serialization_roundtrip() {
        let mut original = Thread::with_id("t-1").with_title("My Thread");
        original.metadata.created_at = Some(1000);
        original.metadata.updated_at = Some(2000);
        let custom = &mut original.metadata.custom;
        custom.insert("env".to_string(), json!("prod"));
        original.resource_id = Some("resource-1".to_string());
        original.parent_thread_id = Some("parent-1".to_string());

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: Thread = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.id, "t-1");
        assert_eq!(decoded.resource_id.as_deref(), Some("resource-1"));
        assert_eq!(decoded.parent_thread_id.as_deref(), Some("parent-1"));
        assert_eq!(decoded.metadata.title.as_deref(), Some("My Thread"));
        assert_eq!(decoded.metadata.created_at, Some(1000));
        assert_eq!(decoded.metadata.updated_at, Some(2000));
        assert_eq!(decoded.metadata.custom["env"], json!("prod"));
    }

    /// Default metadata has nothing set.
    #[test]
    fn thread_metadata_default() {
        let blank = ThreadMetadata::default();
        assert!(blank.created_at.is_none());
        assert!(blank.updated_at.is_none());
        assert!(blank.title.is_none());
        assert!(blank.custom.is_empty());
    }

    /// Unset metadata fields do not appear in the serialized JSON.
    #[test]
    fn thread_metadata_omits_empty_fields() {
        let encoded = serde_json::to_string(&ThreadMetadata::default()).unwrap();
        for key in ["created_at", "updated_at", "title", "custom"] {
            assert!(!encoded.contains(key));
        }
    }

    /// `Default` produces a thread with a generated id.
    #[test]
    fn thread_default_is_new() {
        let defaulted = Thread::default();
        assert_eq!(defaulted.id.len(), 36);
    }

    /// Two generated threads never share an id.
    #[test]
    fn distinct_threads_get_distinct_ids() {
        let first = Thread::new();
        let second = Thread::new();
        assert_ne!(first.id, second.id);
    }

    /// Metadata fields can be populated directly.
    #[test]
    fn thread_with_custom_metadata() {
        let mut subject = Thread::with_id("t-1");
        subject.metadata.created_at = Some(1000);
        subject.metadata.updated_at = Some(2000);
        let custom = &mut subject.metadata.custom;
        custom.insert("env".to_string(), json!("prod"));

        assert_eq!(subject.metadata.created_at, Some(1000));
        assert_eq!(subject.metadata.custom["env"], json!("prod"));
    }

    /// The title builder chains off `with_id`.
    #[test]
    fn thread_with_title_chaining() {
        let chained = Thread::with_id("t-1").with_title("Test");
        assert_eq!(chained.metadata.title.as_deref(), Some("Test"));
    }

    /// Both lineage builders store their values.
    #[test]
    fn thread_lineage_builders() {
        let child = Thread::with_id("t-1")
            .with_resource_id("resource-1")
            .with_parent_thread_id("parent-1");

        assert_eq!(child.resource_id.as_deref(), Some("resource-1"));
        assert_eq!(child.parent_thread_id.as_deref(), Some("parent-1"));
    }

    /// Whitespace is trimmed and blank or absent input normalizes to `None`.
    #[test]
    fn normalize_lineage_id_trims_and_drops_blank_values() {
        let trimmed = normalize_lineage_id(Some(" parent-1 "));
        assert_eq!(trimmed, Some("parent-1".into()));
        assert_eq!(normalize_lineage_id(Some("   ")), None);
        assert_eq!(normalize_lineage_id(None), None);
    }

    /// In-place normalization trims one field and clears the blank one.
    #[test]
    fn normalize_lineage_updates_thread_fields() {
        let mut messy = Thread::with_id("t-1");
        messy.resource_id = Some(" resource-1 ".into());
        messy.parent_thread_id = Some("   ".into());

        messy.normalize_lineage();

        assert_eq!(messy.resource_id.as_deref(), Some("resource-1"));
        assert_eq!(messy.parent_thread_id, None);
    }

    /// A whitespace-only id fails validation with a message naming the id.
    #[test]
    fn thread_validate_rejects_empty_id() {
        let blank_id = Thread::with_id(" ");

        let failure = blank_id.validate_for_persist().unwrap_err();

        assert!(matches!(failure, StorageError::Validation(msg) if msg.contains("thread id")));
    }

    /// A thread whose parent is itself (modulo whitespace) fails validation.
    #[test]
    fn thread_validate_rejects_self_parent() {
        let looped = Thread::with_id("thread-1").with_parent_thread_id(" thread-1 ");

        let failure = looped.validate_for_persist().unwrap_err();

        assert!(
            matches!(failure, StorageError::Validation(msg) if msg.contains("parent itself"))
        );
    }

    /// The first touch sets both timestamps.
    #[test]
    fn touch_initializes_created_and_updated_at() {
        let mut untouched = Thread::with_id("t-1");

        untouched.touch(1234);

        assert_eq!(untouched.metadata.created_at, Some(1234));
        assert_eq!(untouched.metadata.updated_at, Some(1234));
    }

    /// A later touch moves only `updated_at`.
    #[test]
    fn touch_preserves_created_at_and_refreshes_updated_at() {
        let mut aged = Thread::with_id("t-1");
        aged.metadata.created_at = Some(1000);
        aged.metadata.updated_at = Some(1500);

        aged.touch(2000);

        assert_eq!(aged.metadata.created_at, Some(1000));
        assert_eq!(aged.metadata.updated_at, Some(2000));
    }

    /// Custom metadata values survive a JSON round trip.
    #[test]
    fn thread_metadata_custom_preserved_in_serde() {
        let mut original = Thread::with_id("t-1");
        original.metadata.custom.insert("key".to_string(), json!(42));
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: Thread = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.metadata.custom["key"], json!(42));
    }

    /// A bare thread serializes without the empty custom map or unset lineage fields.
    #[test]
    fn thread_empty_metadata_is_compact() {
        let bare = Thread::with_id("t-1");
        let encoded = serde_json::to_string(&bare).unwrap();
        for key in ["custom", "resource_id", "parent_thread_id"] {
            assert!(!encoded.contains(key));
        }
    }

    /// Build a minimal run record for `thread-1` with the given id and status.
    fn run_record(run_id: &str, status: RunStatus) -> RunRecord {
        RunRecord {
            run_id: run_id.to_string(),
            thread_id: "thread-1".to_string(),
            agent_id: "agent-1".to_string(),
            parent_run_id: None,
            resolution_id: None,
            activation: None,
            request: None,
            input: None,
            output: None,
            status,
            termination_reason: None,
            final_output: None,
            error_payload: None,
            dispatch_id: None,
            session_id: None,
            transport_request_id: None,
            waiting: None,
            outcome: None,
            created_at: 1,
            started_at: None,
            finished_at: None,
            updated_at: 1,
            steps: 0,
            input_tokens: 0,
            output_tokens: 0,
            state: None,
        }
    }

    /// Walk one run through its whole lifecycle and check the pointers at each step.
    #[test]
    fn thread_run_projection_keeps_waiting_run_open_but_not_active() {
        let mut subject = Thread::with_id("thread-1");

        subject.apply_run_projection(&run_record("run-1", RunStatus::Created));
        assert_eq!(subject.open_run_id.as_deref(), Some("run-1"));
        assert!(subject.active_run_id.is_none());

        subject.apply_run_projection(&run_record("run-1", RunStatus::Running));
        assert_eq!(subject.open_run_id.as_deref(), Some("run-1"));
        assert_eq!(subject.active_run_id.as_deref(), Some("run-1"));

        subject.apply_run_projection(&run_record("run-1", RunStatus::Waiting));
        assert_eq!(subject.open_run_id.as_deref(), Some("run-1"));
        assert!(subject.active_run_id.is_none());

        subject.apply_run_projection(&run_record("run-1", RunStatus::Done));
        assert!(subject.open_run_id.is_none());
        assert!(subject.active_run_id.is_none());
        assert_eq!(subject.latest_run_id.as_deref(), Some("run-1"));
    }

    /// A record older than the watermark must not displace the newer run.
    #[test]
    fn apply_run_projection_ignores_older_run_projection() {
        let mut subject = Thread::with_id("thread-1");
        let mut recent = run_record("run-new", RunStatus::Running);
        recent.updated_at = 20;
        let mut stale = run_record("run-old", RunStatus::Running);
        stale.updated_at = 10;

        subject.apply_run_projection(&recent);
        subject.apply_run_projection(&stale);

        assert_eq!(subject.latest_run_id.as_deref(), Some("run-new"));
        assert_eq!(subject.active_run_id.as_deref(), Some("run-new"));
        assert_eq!(subject.open_run_id.as_deref(), Some("run-new"));
    }

    /// A missing parent link is filled in (trimmed) from the run request.
    #[test]
    fn apply_run_projection_sets_parent_thread_id_when_missing() {
        let mut subject = Thread::with_id("thread-1");
        let mut incoming = run_record("run-1", RunStatus::Created);
        incoming.request = Some(crate::contract::storage::RunRequestSnapshot {
            parent_thread_id: Some(" parent-thread ".to_string()),
            ..Default::default()
        });

        subject.apply_run_projection(&incoming);

        assert_eq!(subject.parent_thread_id.as_deref(), Some("parent-thread"));
    }

    /// An existing parent link wins over the one in the run request.
    #[test]
    fn apply_run_projection_preserves_existing_parent_thread_id() {
        let mut subject = Thread::with_id("thread-1").with_parent_thread_id("existing-parent");
        let mut incoming = run_record("run-1", RunStatus::Created);
        incoming.request = Some(crate::contract::storage::RunRequestSnapshot {
            parent_thread_id: Some("new-parent".to_string()),
            ..Default::default()
        });

        subject.apply_run_projection(&incoming);

        assert_eq!(subject.parent_thread_id.as_deref(), Some("existing-parent"));
    }
}