Skip to main content

rustvello_proto/
invocation.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3
4use crate::identifiers::{CallId, InvocationId, RunnerId, TaskId};
5use crate::status::{InvocationStatus, InvocationStatusRecord};
6
7/// Persistence DTO for an invocation.
8///
9/// Contains identity and metadata but not the call arguments themselves
10/// (those are in `CallDTO`, separated for efficient storage).
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct InvocationDTO {
13    pub invocation_id: InvocationId,
14    pub task_id: TaskId,
15    pub call_id: CallId,
16    pub status: InvocationStatus,
17    pub created_at: DateTime<Utc>,
18    pub updated_at: DateTime<Utc>,
19    /// The parent invocation that spawned this one (None for top-level calls).
20    pub parent_invocation_id: Option<InvocationId>,
21    /// Workflow identity — present for invocations that belong to a workflow.
22    pub workflow: Option<WorkflowIdentity>,
23}
24
25impl InvocationDTO {
26    pub fn new(invocation_id: InvocationId, task_id: TaskId, call_id: CallId) -> Self {
27        let now = Utc::now();
28        Self {
29            invocation_id,
30            task_id,
31            call_id,
32            status: InvocationStatus::Registered,
33            created_at: now,
34            updated_at: now,
35            parent_invocation_id: None,
36            workflow: None,
37        }
38    }
39
40    /// Create a DTO with workflow identity and optional parent.
41    pub fn with_workflow(
42        invocation_id: InvocationId,
43        task_id: TaskId,
44        call_id: CallId,
45        parent_invocation_id: Option<InvocationId>,
46        workflow: WorkflowIdentity,
47    ) -> Self {
48        let now = Utc::now();
49        Self {
50            invocation_id,
51            task_id,
52            call_id,
53            status: InvocationStatus::Registered,
54            created_at: now,
55            updated_at: now,
56            parent_invocation_id,
57            workflow: Some(workflow),
58        }
59    }
60}
61
62/// An audit log entry for status changes.
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct InvocationHistory {
65    pub invocation_id: InvocationId,
66    pub status_record: InvocationStatusRecord,
67    pub message: Option<String>,
68    /// The runner context that produced this status change, if any.
69    pub runner_id: Option<RunnerId>,
70    /// The parent invocation that registered this one (workflow lineage).
71    pub registered_by_inv_id: Option<InvocationId>,
72    /// The history-level timestamp (mirrors pynenc's `InvocationHistory._timestamp`).
73    /// Used for time-range filtering. Defaults to `status_record.timestamp` when absent.
74    #[serde(default, skip_serializing_if = "Option::is_none")]
75    pub history_timestamp: Option<DateTime<Utc>>,
76}
77
78impl InvocationHistory {
79    pub fn new(
80        invocation_id: InvocationId,
81        status_record: InvocationStatusRecord,
82        message: Option<String>,
83    ) -> Self {
84        Self {
85            invocation_id,
86            status_record,
87            message,
88            runner_id: None,
89            registered_by_inv_id: None,
90            history_timestamp: None,
91        }
92    }
93
94    pub fn with_runner(mut self, runner_id: RunnerId) -> Self {
95        self.runner_id = Some(runner_id);
96        self
97    }
98
99    pub fn with_registered_by(mut self, inv_id: InvocationId) -> Self {
100        self.registered_by_inv_id = Some(inv_id);
101        self
102    }
103}
104
105/// Tracks parent-child relationships in distributed workflows.
106///
107/// Matches pynenc's `WorkflowIdentity`. Every workflow is rooted by a
108/// defining invocation; the `workflow_type` captures which task started it.
109#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct WorkflowIdentity {
111    /// The root workflow invocation ID
112    pub workflow_id: InvocationId,
113    /// The task that defines this workflow (the root task)
114    pub workflow_type: TaskId,
115    /// The parent workflow ID if this is a sub-workflow (None for top-level)
116    pub parent_id: Option<InvocationId>,
117    /// Depth in the workflow tree (0 for root)
118    pub depth: u32,
119}
120
121impl WorkflowIdentity {
122    /// Create a new root workflow identity.
123    pub fn root(workflow_id: InvocationId, workflow_type: TaskId) -> Self {
124        Self {
125            workflow_id,
126            workflow_type,
127            parent_id: None,
128            depth: 0,
129        }
130    }
131
132    /// Create a child identity within the same workflow.
133    pub fn child(
134        workflow_id: InvocationId,
135        workflow_type: TaskId,
136        parent_id: InvocationId,
137        depth: u32,
138    ) -> Self {
139        Self {
140            workflow_id,
141            workflow_type,
142            parent_id: Some(parent_id),
143            depth,
144        }
145    }
146
147    /// Create a new sub-workflow identity (force_new_workflow).
148    pub fn sub_workflow(
149        workflow_id: InvocationId,
150        workflow_type: TaskId,
151        parent_workflow_id: InvocationId,
152    ) -> Self {
153        Self {
154            workflow_id,
155            workflow_type,
156            parent_id: Some(parent_workflow_id),
157            depth: 0,
158        }
159    }
160
161    /// Returns true if this workflow is nested under another.
162    pub fn is_sub_workflow(&self) -> bool {
163        self.parent_id.is_some()
164    }
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use crate::identifiers::RunnerId;

    /// Fixture: the `mod::func` task id shared by the DTO tests.
    fn func_task() -> TaskId {
        TaskId::new("mod", "func")
    }

    /// Fixture: a call id for `task` using the given argument hash.
    fn call_for(task: &TaskId, hash: &str) -> CallId {
        CallId::new(task.clone(), hash)
    }

    // ---- InvocationDTO -------------------------------------------------

    #[test]
    fn invocation_dto_new() {
        let task = func_task();
        let call = call_for(&task, "hash123");
        let id = InvocationId::from_string("inv-1");

        let dto = InvocationDTO::new(id.clone(), task.clone(), call.clone());

        assert_eq!(dto.invocation_id, id);
        assert_eq!(dto.task_id, task);
        assert_eq!(dto.call_id, call);
        assert_eq!(dto.status, InvocationStatus::Registered);
        assert!(dto.created_at <= dto.updated_at);
    }

    #[test]
    fn invocation_dto_serde_round_trip() {
        let task = func_task();
        let call = call_for(&task, "hash123");
        let original = InvocationDTO::new(InvocationId::new(), task, call);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: InvocationDTO = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.invocation_id, original.invocation_id);
        assert_eq!(decoded.status, InvocationStatus::Registered);
    }

    // ---- InvocationHistory ---------------------------------------------

    #[test]
    fn invocation_history_new() {
        let id = InvocationId::from_string("inv-1");
        let runner = Some(RunnerId::from_string("runner-1"));
        let record = InvocationStatusRecord::new(InvocationStatus::Running, runner);

        let entry = InvocationHistory::new(id.clone(), record, Some("started".to_string()));

        assert_eq!(entry.invocation_id, id);
        assert_eq!(entry.status_record.status, InvocationStatus::Running);
        assert_eq!(entry.message.as_deref(), Some("started"));
    }

    #[test]
    fn invocation_history_serde_round_trip() {
        let record = InvocationStatusRecord::new(InvocationStatus::Success, None);
        let original = InvocationHistory::new(InvocationId::from_string("inv-1"), record, None);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: InvocationHistory = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.invocation_id, original.invocation_id);
        assert_eq!(decoded.status_record.status, InvocationStatus::Success);
    }

    // ---- WorkflowIdentity ----------------------------------------------

    #[test]
    fn workflow_identity_root() {
        let id = InvocationId::from_string("wf-1");
        let task = TaskId::new("mod", "my_task");

        let identity = WorkflowIdentity::root(id, task);

        assert_eq!(identity.workflow_id.as_str(), "wf-1");
        assert_eq!(identity.workflow_type.name(), "my_task");
        assert!(identity.parent_id.is_none());
        assert_eq!(identity.depth, 0);
        assert!(!identity.is_sub_workflow());
    }

    #[test]
    fn workflow_identity_child() {
        let identity = WorkflowIdentity::child(
            InvocationId::from_string("wf-1"),
            TaskId::new("mod", "parent_task"),
            InvocationId::from_string("parent-1"),
            2,
        );

        assert_eq!(identity.workflow_id.as_str(), "wf-1");
        assert_eq!(identity.workflow_type.name(), "parent_task");
        let parent = identity
            .parent_id
            .as_ref()
            .expect("child identity must carry a parent id");
        assert_eq!(parent.as_str(), "parent-1");
        assert_eq!(identity.depth, 2);
    }

    #[test]
    fn workflow_identity_sub_workflow() {
        let identity = WorkflowIdentity::sub_workflow(
            InvocationId::from_string("sub-wf-1"),
            TaskId::new("mod", "sub_task"),
            InvocationId::from_string("parent-wf-1"),
        );

        assert_eq!(identity.workflow_id.as_str(), "sub-wf-1");
        assert_eq!(identity.workflow_type.name(), "sub_task");
        assert!(identity.is_sub_workflow());
        assert_eq!(identity.depth, 0);
    }

    #[test]
    fn workflow_identity_serde_round_trip() {
        let original = WorkflowIdentity::child(
            InvocationId::from_string("wf-1"),
            TaskId::new("mod", "task"),
            InvocationId::from_string("p-1"),
            3,
        );

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: WorkflowIdentity = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.workflow_id, original.workflow_id);
        assert_eq!(decoded.workflow_type, original.workflow_type);
        assert_eq!(decoded.depth, 3);
    }

    // ---- InvocationDTO with workflow data ------------------------------

    #[test]
    fn invocation_dto_with_workflow() {
        let task = func_task();
        let call = call_for(&task, "hash123");
        let id = InvocationId::from_string("inv-1");
        let parent = InvocationId::from_string("parent-1");
        let identity = WorkflowIdentity::root(id.clone(), task.clone());

        let dto = InvocationDTO::with_workflow(
            id.clone(),
            task.clone(),
            call,
            Some(parent.clone()),
            identity,
        );

        assert_eq!(dto.parent_invocation_id.as_ref(), Some(&parent));
        let attached = dto
            .workflow
            .as_ref()
            .expect("workflow identity should be attached");
        assert_eq!(attached.workflow_type, task);
    }

    #[test]
    fn invocation_dto_without_workflow_is_none() {
        let task = func_task();
        let call = call_for(&task, "hash123");

        let dto = InvocationDTO::new(InvocationId::new(), task, call);

        assert!(dto.parent_invocation_id.is_none());
        assert!(dto.workflow.is_none());
    }

    #[test]
    fn invocation_dto_with_workflow_serde() {
        let task = func_task();
        let call = call_for(&task, "hash");
        let id = InvocationId::from_string("inv-1");
        let identity = WorkflowIdentity::root(id.clone(), task.clone());
        let original = InvocationDTO::with_workflow(id, task, call, None, identity);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: InvocationDTO = serde_json::from_str(&encoded).unwrap();

        let restored = decoded
            .workflow
            .expect("workflow identity should survive the round trip");
        assert_eq!(restored.workflow_type.name(), "func");
    }
}