Skip to main content

aion_proto/
workflow.rs

1//! Workflow-management serde/prost wire types.
2
3use crate::convert::{ProtoPayload, ProtoRunId, ProtoWorkflowId, WireEnvelope};
4use crate::error::ProtoWireError;
5
6/// Proto representation of `StartWorkflowRequest`.
7#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
8pub struct ProtoStartWorkflowRequest {
9    /// Namespace that scopes the operation.
10    #[prost(string, tag = "1")]
11    pub namespace: String,
12    /// Workflow type name registered with the engine.
13    #[prost(string, tag = "2")]
14    pub workflow_type: String,
15    /// Workflow start input payload.
16    #[prost(message, optional, tag = "3")]
17    pub input: Option<ProtoPayload>,
18}
19
20/// Proto representation of `StartWorkflowResponse`.
21#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
22pub struct ProtoStartWorkflowResponse {
23    /// Assigned workflow identifier.
24    #[prost(message, optional, tag = "1")]
25    pub workflow_id: Option<ProtoWorkflowId>,
26    /// Assigned concrete run identifier.
27    #[prost(message, optional, tag = "2")]
28    pub run_id: Option<ProtoRunId>,
29}
30
31/// Proto representation of `SignalRequest`.
32#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
33pub struct ProtoSignalRequest {
34    /// Namespace that scopes the operation.
35    #[prost(string, tag = "1")]
36    pub namespace: String,
37    /// Target workflow identifier.
38    #[prost(message, optional, tag = "2")]
39    pub workflow_id: Option<ProtoWorkflowId>,
40    /// Target run identifier.
41    #[prost(message, optional, tag = "3")]
42    pub run_id: Option<ProtoRunId>,
43    /// Signal name registered by workflow code.
44    #[prost(string, tag = "4")]
45    pub signal_name: String,
46    /// Signal payload.
47    #[prost(message, optional, tag = "5")]
48    pub payload: Option<ProtoPayload>,
49}
50
51/// Proto representation of `SignalResponse`.
52#[derive(Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
53pub struct ProtoSignalResponse {}
54
55/// Proto representation of `QueryRequest`.
56#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
57pub struct ProtoQueryRequest {
58    /// Namespace that scopes the operation.
59    #[prost(string, tag = "1")]
60    pub namespace: String,
61    /// Target workflow identifier.
62    #[prost(message, optional, tag = "2")]
63    pub workflow_id: Option<ProtoWorkflowId>,
64    /// Target run identifier.
65    #[prost(message, optional, tag = "3")]
66    pub run_id: Option<ProtoRunId>,
67    /// Query name registered by workflow code.
68    #[prost(string, tag = "4")]
69    pub query_name: String,
70}
71
72/// Proto representation of `QueryResponse`.
73#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
74pub struct ProtoQueryResponse {
75    /// Query result or typed wire error.
76    #[prost(oneof = "proto_query_response::Outcome", tags = "1, 2")]
77    pub outcome: Option<proto_query_response::Outcome>,
78}
79
80/// Types nested under [`ProtoQueryResponse`].
81pub mod proto_query_response {
82    /// Proto oneof for successful query payloads and typed failures.
83    #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Oneof)]
84    pub enum Outcome {
85        /// Query result payload.
86        #[prost(message, tag = "1")]
87        Result(super::ProtoPayload),
88        /// Typed query error.
89        #[prost(message, tag = "2")]
90        Error(super::ProtoWireError),
91    }
92}
93
94/// Proto representation of `CancelRequest`.
95#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
96pub struct ProtoCancelRequest {
97    /// Namespace that scopes the operation.
98    #[prost(string, tag = "1")]
99    pub namespace: String,
100    /// Target workflow identifier.
101    #[prost(message, optional, tag = "2")]
102    pub workflow_id: Option<ProtoWorkflowId>,
103    /// Target run identifier.
104    #[prost(message, optional, tag = "3")]
105    pub run_id: Option<ProtoRunId>,
106    /// Human-readable cancellation reason.
107    #[prost(string, tag = "4")]
108    pub reason: String,
109}
110
111/// Proto representation of `CancelResponse`.
112#[derive(Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
113pub struct ProtoCancelResponse {}
114
115/// Proto representation of `ListWorkflowsRequest`.
116#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
117pub struct ProtoListWorkflowsRequest {
118    /// Namespace that scopes the operation.
119    #[prost(string, tag = "1")]
120    pub namespace: String,
121    /// Serde-encoded `aion_store::visibility::ListWorkflowsFilter` envelope.
122    #[prost(message, optional, tag = "2")]
123    pub filter: Option<WireEnvelope>,
124}
125
126/// Proto representation of `ListWorkflowsResponse`.
127#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
128pub struct ProtoListWorkflowsResponse {
129    /// Serde-encoded `aion_store::visibility::WorkflowSummary` envelopes.
130    #[prost(message, repeated, tag = "1")]
131    pub summaries: Vec<WireEnvelope>,
132}
133
134/// Proto representation of `CountWorkflowsRequest`.
135#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
136pub struct ProtoCountWorkflowsRequest {
137    /// Namespace that scopes the operation.
138    #[prost(string, tag = "1")]
139    pub namespace: String,
140    /// Serde-encoded `aion_store::visibility::ListWorkflowsFilter` envelope.
141    #[prost(message, optional, tag = "2")]
142    pub filter: Option<WireEnvelope>,
143}
144
145/// Proto representation of `CountWorkflowsResponse`.
146#[derive(Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
147pub struct ProtoCountWorkflowsResponse {
148    /// Number of visibility summaries matching the filter.
149    #[prost(uint64, tag = "1")]
150    pub count: u64,
151}
152
153/// Proto representation of `DescribeWorkflowRequest`.
154#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
155pub struct ProtoDescribeWorkflowRequest {
156    /// Namespace that scopes the operation.
157    #[prost(string, tag = "1")]
158    pub namespace: String,
159    /// Target workflow identifier.
160    #[prost(message, optional, tag = "2")]
161    pub workflow_id: Option<ProtoWorkflowId>,
162    /// Target run identifier.
163    #[prost(message, optional, tag = "3")]
164    pub run_id: Option<ProtoRunId>,
165    /// Whether event history should be included in the response.
166    #[prost(bool, tag = "4")]
167    pub include_history: bool,
168}
169
170/// Proto representation of `DescribeWorkflowResponse`.
171#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
172pub struct ProtoDescribeWorkflowResponse {
173    /// Serde-encoded `aion_core::WorkflowSummary` envelope.
174    #[prost(message, optional, tag = "1")]
175    pub summary: Option<WireEnvelope>,
176    /// Optional serde-encoded `aion_core::Event` envelopes.
177    #[prost(message, repeated, tag = "2")]
178    pub history: Vec<WireEnvelope>,
179}
180
181#[cfg(test)]
182mod tests {
183    use std::collections::HashMap;
184
185    use aion_core::SearchAttributeValue;
186    use aion_store::visibility::{ListWorkflowsFilter, SearchAttributePredicate};
187    use chrono::{DateTime, Utc};
188    use prost::Message;
189    use serde::de::DeserializeOwned;
190    use serde_json::json;
191
192    use super::{
193        ProtoCountWorkflowsRequest, ProtoCountWorkflowsResponse, ProtoListWorkflowsRequest,
194        ProtoListWorkflowsResponse, ProtoQueryRequest, ProtoQueryResponse,
195        ProtoStartWorkflowRequest, ProtoStartWorkflowResponse, proto_query_response,
196    };
197    use crate::convert::{
198        ProtoPayload, ProtoRunId, ProtoWorkflowId, decode_core_value, encode_core_value,
199    };
200    use crate::error::{ProtoWireError, WireError};
201
202    fn workflow_id() -> aion_core::WorkflowId {
203        aion_core::WorkflowId::new(uuid::Uuid::nil())
204    }
205
206    fn run_id() -> aion_core::RunId {
207        aion_core::RunId::new(uuid::Uuid::nil())
208    }
209
210    fn payload(label: &str) -> Result<ProtoPayload, aion_core::PayloadError> {
211        Ok(ProtoPayload::from(aion_core::Payload::from_json(
212            &json!({ "label": label }),
213        )?))
214    }
215
216    fn recorded_at() -> Result<DateTime<Utc>, chrono::ParseError> {
217        Ok(DateTime::parse_from_rfc3339("2026-01-01T00:00:00Z")?.with_timezone(&Utc))
218    }
219
220    fn assert_json_round_trip<T>(value: &T) -> Result<(), serde_json::Error>
221    where
222        T: Clone + PartialEq + serde::Serialize + DeserializeOwned,
223    {
224        let encoded = serde_json::to_string(value)?;
225        let decoded = serde_json::from_str::<T>(&encoded)?;
226        assert!(decoded == *value);
227        Ok(())
228    }
229
230    fn assert_proto_round_trip<T>(value: &T) -> Result<(), Box<dyn std::error::Error>>
231    where
232        T: Clone + PartialEq + Message + Default,
233    {
234        let mut bytes = Vec::new();
235        value.encode(&mut bytes)?;
236        let decoded = T::decode(bytes.as_slice())?;
237        assert!(decoded == *value);
238        Ok(())
239    }
240
241    #[test]
242    fn start_workflow_round_trips_json_and_proto() -> Result<(), Box<dyn std::error::Error>> {
243        let request = ProtoStartWorkflowRequest {
244            namespace: String::from("tenant-a"),
245            workflow_type: String::from("checkout"),
246            input: Some(payload("input")?),
247        };
248        let response = ProtoStartWorkflowResponse {
249            workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
250            run_id: Some(ProtoRunId::from(run_id())),
251        };
252
253        assert_json_round_trip(&request)?;
254        assert_proto_round_trip(&request)?;
255        assert_json_round_trip(&response)?;
256        assert_proto_round_trip(&response)?;
257        Ok(())
258    }
259
260    #[test]
261    fn list_workflows_round_trips_json_and_proto() -> Result<(), Box<dyn std::error::Error>> {
262        let filter = ListWorkflowsFilter {
263            workflow_type: Some(String::from("checkout")),
264            status: Some(aion_core::WorkflowStatus::Running),
265            search_attributes: vec![SearchAttributePredicate::Equals {
266                name: String::from("customer_id"),
267                value: SearchAttributeValue::String(String::from("12345")),
268            }],
269            limit: Some(10),
270            offset: Some(5),
271            ..ListWorkflowsFilter::default()
272        };
273        let summary = aion_store::visibility::WorkflowSummary {
274            workflow_id: workflow_id(),
275            run_id: run_id(),
276            workflow_type: String::from("checkout"),
277            status: aion_core::WorkflowStatus::Running,
278            start_time: recorded_at()?,
279            close_time: None,
280            search_attributes: HashMap::from([(
281                String::from("customer_id"),
282                SearchAttributeValue::String(String::from("12345")),
283            )]),
284        };
285        let filter_envelope = encode_core_value("tenant-a", Some(String::from("r1")), &filter)?;
286        let summary_envelope = encode_core_value("tenant-a", None, &summary)?;
287        let request = ProtoListWorkflowsRequest {
288            namespace: String::from("tenant-a"),
289            filter: Some(filter_envelope.clone()),
290        };
291        let response = ProtoListWorkflowsResponse {
292            summaries: vec![summary_envelope.clone()],
293        };
294        let count_request = ProtoCountWorkflowsRequest {
295            namespace: String::from("tenant-a"),
296            filter: Some(filter_envelope.clone()),
297        };
298        let count_response = ProtoCountWorkflowsResponse { count: 1 };
299
300        assert_json_round_trip(&request)?;
301        assert_proto_round_trip(&request)?;
302        assert_json_round_trip(&response)?;
303        assert_proto_round_trip(&response)?;
304        assert_json_round_trip(&count_request)?;
305        assert_proto_round_trip(&count_request)?;
306        assert_json_round_trip(&count_response)?;
307        assert_proto_round_trip(&count_response)?;
308        assert_eq!(
309            decode_core_value::<ListWorkflowsFilter>(&filter_envelope)?,
310            filter
311        );
312        assert_eq!(
313            decode_core_value::<aion_store::visibility::WorkflowSummary>(&summary_envelope)?,
314            summary
315        );
316        Ok(())
317    }
318
319    #[test]
320    fn query_round_trips_json_and_proto() -> Result<(), Box<dyn std::error::Error>> {
321        let request = ProtoQueryRequest {
322            namespace: String::from("tenant-a"),
323            workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
324            run_id: Some(ProtoRunId::from(run_id())),
325            query_name: String::from("state"),
326        };
327        let result_response = ProtoQueryResponse {
328            outcome: Some(proto_query_response::Outcome::Result(payload("result")?)),
329        };
330        let error_response = ProtoQueryResponse {
331            outcome: Some(proto_query_response::Outcome::Error(ProtoWireError::from(
332                WireError::unknown_query("state query is not registered"),
333            ))),
334        };
335
336        assert_json_round_trip(&request)?;
337        assert_proto_round_trip(&request)?;
338        assert_json_round_trip(&result_response)?;
339        assert_proto_round_trip(&result_response)?;
340        assert_json_round_trip(&error_response)?;
341        assert_proto_round_trip(&error_response)?;
342        Ok(())
343    }
344}