1use crate::convert::{ProtoPayload, ProtoRunId, ProtoWorkflowId, WireEnvelope};
4use crate::error::ProtoWireError;
5
/// Wire-format request for the start-workflow operation.
///
/// Derives `prost::Message` (protobuf encoding) and the serde traits, so the
/// same value can be carried as protobuf bytes or as a serde format such as JSON.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoStartWorkflowRequest {
    /// Namespace name (protobuf field 1).
    // NOTE(review): presumably the tenant/namespace the workflow is started in — confirm.
    #[prost(string, tag = "1")]
    pub namespace: String,
    /// Workflow type name (protobuf field 2).
    #[prost(string, tag = "2")]
    pub workflow_type: String,
    /// Input payload (protobuf field 3); `None` when the field is not set.
    #[prost(message, optional, tag = "3")]
    pub input: Option<ProtoPayload>,
}
19
/// Wire-format response for the start-workflow operation.
///
/// Both identifiers are optional message fields and decode to `None` when
/// they are not set.
// NOTE(review): presumably these identify the newly started workflow and its run — confirm.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoStartWorkflowResponse {
    /// Workflow identifier (protobuf field 1).
    #[prost(message, optional, tag = "1")]
    pub workflow_id: Option<ProtoWorkflowId>,
    /// Run identifier (protobuf field 2).
    #[prost(message, optional, tag = "2")]
    pub run_id: Option<ProtoRunId>,
}
30
/// Wire-format request for the signal operation.
///
/// Encodable as protobuf (`prost::Message`) and through serde.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoSignalRequest {
    /// Namespace name (protobuf field 1).
    #[prost(string, tag = "1")]
    pub namespace: String,
    /// Workflow identifier (protobuf field 2); `None` when not set.
    #[prost(message, optional, tag = "2")]
    pub workflow_id: Option<ProtoWorkflowId>,
    /// Run identifier (protobuf field 3); `None` when not set.
    // NOTE(review): whether an unset run id means "latest run" is not visible here — confirm.
    #[prost(message, optional, tag = "3")]
    pub run_id: Option<ProtoRunId>,
    /// Signal name (protobuf field 4).
    #[prost(string, tag = "4")]
    pub signal_name: String,
    /// Signal payload (protobuf field 5); `None` when not set.
    #[prost(message, optional, tag = "5")]
    pub payload: Option<ProtoPayload>,
}
50
/// Wire-format response for the signal operation.
///
/// Declares no fields, so it carries no data beyond the fact that a response
/// was produced.
#[derive(Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoSignalResponse {}
54
/// Wire-format request for the query operation.
///
/// Encodable as protobuf (`prost::Message`) and through serde. The message
/// has no field for query arguments; only the query name is carried.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoQueryRequest {
    /// Namespace name (protobuf field 1).
    #[prost(string, tag = "1")]
    pub namespace: String,
    /// Workflow identifier (protobuf field 2); `None` when not set.
    #[prost(message, optional, tag = "2")]
    pub workflow_id: Option<ProtoWorkflowId>,
    /// Run identifier (protobuf field 3); `None` when not set.
    #[prost(message, optional, tag = "3")]
    pub run_id: Option<ProtoRunId>,
    /// Query name (protobuf field 4).
    #[prost(string, tag = "4")]
    pub query_name: String,
}
71
/// Wire-format response for the query operation.
///
/// The outcome is a protobuf `oneof` over fields 1 and 2: either a result
/// payload or a wire error, never both.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoQueryResponse {
    /// The query outcome; `None` when neither oneof field is set.
    #[prost(oneof = "proto_query_response::Outcome", tags = "1, 2")]
    pub outcome: Option<proto_query_response::Outcome>,
}
79
/// Nested types for `ProtoQueryResponse`.
pub mod proto_query_response {
    /// The mutually exclusive outcomes stored in `ProtoQueryResponse::outcome`.
    #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Oneof)]
    pub enum Outcome {
        /// Result payload (protobuf field 1).
        #[prost(message, tag = "1")]
        Result(super::ProtoPayload),
        /// Wire error (protobuf field 2).
        #[prost(message, tag = "2")]
        Error(super::ProtoWireError),
    }
}
93
/// Wire-format request for the cancel operation.
///
/// Encodable as protobuf (`prost::Message`) and through serde.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoCancelRequest {
    /// Namespace name (protobuf field 1).
    #[prost(string, tag = "1")]
    pub namespace: String,
    /// Workflow identifier (protobuf field 2); `None` when not set.
    #[prost(message, optional, tag = "2")]
    pub workflow_id: Option<ProtoWorkflowId>,
    /// Run identifier (protobuf field 3); `None` when not set.
    #[prost(message, optional, tag = "3")]
    pub run_id: Option<ProtoRunId>,
    /// Free-form reason string (protobuf field 4).
    // NOTE(review): presumably a human-readable cancellation reason — confirm how it is used.
    #[prost(string, tag = "4")]
    pub reason: String,
}
110
/// Wire-format response for the cancel operation.
///
/// Declares no fields, so it carries no data beyond the fact that a response
/// was produced.
#[derive(Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoCancelResponse {}
114
/// Wire-format request for the list-workflows operation.
///
/// Encodable as protobuf (`prost::Message`) and through serde.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoListWorkflowsRequest {
    /// Namespace name (protobuf field 1).
    #[prost(string, tag = "1")]
    pub namespace: String,
    /// Filter carried as a `WireEnvelope` (protobuf field 2); `None` when not
    /// set. The round-trip tests in this file store an encoded
    /// `aion_store::visibility::ListWorkflowsFilter` here.
    #[prost(message, optional, tag = "2")]
    pub filter: Option<WireEnvelope>,
}
125
/// Wire-format response for the list-workflows operation.
///
/// Encodable as protobuf (`prost::Message`) and through serde.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoListWorkflowsResponse {
    /// Repeated `WireEnvelope` entries (protobuf field 1); empty when none
    /// are present. The round-trip tests in this file store encoded
    /// `aion_store::visibility::WorkflowSummary` values here.
    #[prost(message, repeated, tag = "1")]
    pub summaries: Vec<WireEnvelope>,
}
133
/// Wire-format request for the count-workflows operation.
///
/// Has the same shape as `ProtoListWorkflowsRequest`: a namespace plus an
/// optional filter envelope.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoCountWorkflowsRequest {
    /// Namespace name (protobuf field 1).
    #[prost(string, tag = "1")]
    pub namespace: String,
    /// Filter carried as a `WireEnvelope` (protobuf field 2); `None` when not
    /// set. The round-trip tests in this file store an encoded
    /// `aion_store::visibility::ListWorkflowsFilter` here.
    #[prost(message, optional, tag = "2")]
    pub filter: Option<WireEnvelope>,
}
144
/// Wire-format response for the count-workflows operation.
///
/// Holds a single scalar, which is why the type can also derive `Copy`.
#[derive(Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoCountWorkflowsResponse {
    /// Count value encoded as a protobuf `uint64` (field 1).
    // NOTE(review): presumably the number of workflows matching the request filter — confirm.
    #[prost(uint64, tag = "1")]
    pub count: u64,
}
152
/// Wire-format request for the describe-workflow operation.
///
/// Encodable as protobuf (`prost::Message`) and through serde.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoDescribeWorkflowRequest {
    /// Namespace name (protobuf field 1).
    #[prost(string, tag = "1")]
    pub namespace: String,
    /// Workflow identifier (protobuf field 2); `None` when not set.
    #[prost(message, optional, tag = "2")]
    pub workflow_id: Option<ProtoWorkflowId>,
    /// Run identifier (protobuf field 3); `None` when not set.
    #[prost(message, optional, tag = "3")]
    pub run_id: Option<ProtoRunId>,
    /// Boolean flag (protobuf field 4).
    // NOTE(review): presumably controls whether the response's `history` is populated — confirm.
    #[prost(bool, tag = "4")]
    pub include_history: bool,
}
169
/// Wire-format response for the describe-workflow operation.
///
/// Encodable as protobuf (`prost::Message`) and through serde.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoDescribeWorkflowResponse {
    /// Summary carried as a `WireEnvelope` (protobuf field 1); `None` when
    /// not set.
    // NOTE(review): the enveloped core type is not shown in this file — confirm.
    #[prost(message, optional, tag = "1")]
    pub summary: Option<WireEnvelope>,
    /// Repeated `WireEnvelope` history entries (protobuf field 2); empty when
    /// none are present.
    // NOTE(review): the enveloped event type is not shown in this file — confirm.
    #[prost(message, repeated, tag = "2")]
    pub history: Vec<WireEnvelope>,
}
180
#[cfg(test)]
mod tests {
    //! Round-trip tests for the wire messages declared in the parent module.
    //!
    //! Every request/response pair is pushed through both serde JSON and prost
    //! protobuf encoding and must compare equal after decoding.

    use std::collections::HashMap;
    use std::fmt::Debug;

    use aion_core::SearchAttributeValue;
    use aion_store::visibility::{ListWorkflowsFilter, SearchAttributePredicate, WorkflowSummary};
    use chrono::{DateTime, Utc};
    use prost::Message;
    use serde::de::DeserializeOwned;
    use serde_json::json;

    use super::{
        ProtoCancelRequest, ProtoCancelResponse, ProtoCountWorkflowsRequest,
        ProtoCountWorkflowsResponse, ProtoDescribeWorkflowRequest, ProtoDescribeWorkflowResponse,
        ProtoListWorkflowsRequest, ProtoListWorkflowsResponse, ProtoQueryRequest,
        ProtoQueryResponse, ProtoSignalRequest, ProtoSignalResponse, ProtoStartWorkflowRequest,
        ProtoStartWorkflowResponse, proto_query_response,
    };
    use crate::convert::{
        ProtoPayload, ProtoRunId, ProtoWorkflowId, decode_core_value, encode_core_value,
    };
    use crate::error::{ProtoWireError, WireError};

    /// Fixed workflow id (nil UUID) so fixtures are deterministic.
    fn workflow_id() -> aion_core::WorkflowId {
        aion_core::WorkflowId::new(uuid::Uuid::nil())
    }

    /// Fixed run id (nil UUID) so fixtures are deterministic.
    fn run_id() -> aion_core::RunId {
        aion_core::RunId::new(uuid::Uuid::nil())
    }

    /// Builds a wire payload from the JSON object `{ "label": <label> }`.
    fn payload(label: &str) -> Result<ProtoPayload, aion_core::PayloadError> {
        Ok(ProtoPayload::from(aion_core::Payload::from_json(
            &json!({ "label": label }),
        )?))
    }

    /// Fixed UTC timestamp used wherever a fixture needs a time.
    fn recorded_at() -> Result<DateTime<Utc>, chrono::ParseError> {
        Ok(DateTime::parse_from_rfc3339("2026-01-01T00:00:00Z")?.with_timezone(&Utc))
    }

    /// Workflow summary fixture shared by the list and describe tests.
    fn checkout_summary() -> Result<WorkflowSummary, chrono::ParseError> {
        Ok(WorkflowSummary {
            workflow_id: workflow_id(),
            run_id: run_id(),
            workflow_type: String::from("checkout"),
            status: aion_core::WorkflowStatus::Running,
            start_time: recorded_at()?,
            close_time: None,
            search_attributes: HashMap::from([(
                String::from("customer_id"),
                SearchAttributeValue::String(String::from("12345")),
            )]),
        })
    }

    /// Serializes `value` to JSON, parses it back, and asserts equality.
    ///
    /// Uses `assert_eq!` so a failure prints both values; that is why `T`
    /// must be `Debug`.
    fn assert_json_round_trip<T>(value: &T) -> Result<(), serde_json::Error>
    where
        T: Debug + PartialEq + serde::Serialize + DeserializeOwned,
    {
        let encoded = serde_json::to_string(value)?;
        let decoded: T = serde_json::from_str(&encoded)?;
        assert_eq!(decoded, *value, "JSON round trip changed the value: {encoded}");
        Ok(())
    }

    /// Encodes `value` as protobuf bytes, decodes it, and asserts equality.
    ///
    /// Uses `assert_eq!` so a failure prints both values; that is why `T`
    /// must be `Debug`.
    fn assert_proto_round_trip<T>(value: &T) -> Result<(), Box<dyn std::error::Error>>
    where
        T: Debug + PartialEq + Message + Default,
    {
        let mut bytes = Vec::new();
        value.encode(&mut bytes)?;
        let decoded = T::decode(bytes.as_slice())?;
        assert_eq!(decoded, *value, "protobuf round trip changed the value");
        Ok(())
    }

    #[test]
    fn start_workflow_round_trips_json_and_proto() -> Result<(), Box<dyn std::error::Error>> {
        let request = ProtoStartWorkflowRequest {
            namespace: String::from("tenant-a"),
            workflow_type: String::from("checkout"),
            input: Some(payload("input")?),
        };
        let response = ProtoStartWorkflowResponse {
            workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
            run_id: Some(ProtoRunId::from(run_id())),
        };

        assert_json_round_trip(&request)?;
        assert_proto_round_trip(&request)?;
        assert_json_round_trip(&response)?;
        assert_proto_round_trip(&response)?;
        Ok(())
    }

    #[test]
    fn signal_round_trips_json_and_proto() -> Result<(), Box<dyn std::error::Error>> {
        let request = ProtoSignalRequest {
            namespace: String::from("tenant-a"),
            workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
            run_id: Some(ProtoRunId::from(run_id())),
            signal_name: String::from("approve"),
            payload: Some(payload("signal")?),
        };
        let response = ProtoSignalResponse {};

        assert_json_round_trip(&request)?;
        assert_proto_round_trip(&request)?;
        assert_json_round_trip(&response)?;
        assert_proto_round_trip(&response)?;
        Ok(())
    }

    #[test]
    fn cancel_round_trips_json_and_proto() -> Result<(), Box<dyn std::error::Error>> {
        // `run_id` is deliberately left unset to cover an absent optional
        // message field in both encodings.
        let request = ProtoCancelRequest {
            namespace: String::from("tenant-a"),
            workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
            run_id: None,
            reason: String::from("operator request"),
        };
        let response = ProtoCancelResponse {};

        assert_json_round_trip(&request)?;
        assert_proto_round_trip(&request)?;
        assert_json_round_trip(&response)?;
        assert_proto_round_trip(&response)?;
        Ok(())
    }

    #[test]
    fn list_workflows_round_trips_json_and_proto() -> Result<(), Box<dyn std::error::Error>> {
        let filter = ListWorkflowsFilter {
            workflow_type: Some(String::from("checkout")),
            status: Some(aion_core::WorkflowStatus::Running),
            search_attributes: vec![SearchAttributePredicate::Equals {
                name: String::from("customer_id"),
                value: SearchAttributeValue::String(String::from("12345")),
            }],
            limit: Some(10),
            offset: Some(5),
            ..ListWorkflowsFilter::default()
        };
        let summary = checkout_summary()?;
        let filter_envelope = encode_core_value("tenant-a", Some(String::from("r1")), &filter)?;
        let summary_envelope = encode_core_value("tenant-a", None, &summary)?;
        let request = ProtoListWorkflowsRequest {
            namespace: String::from("tenant-a"),
            filter: Some(filter_envelope.clone()),
        };
        let response = ProtoListWorkflowsResponse {
            summaries: vec![summary_envelope.clone()],
        };
        let count_request = ProtoCountWorkflowsRequest {
            namespace: String::from("tenant-a"),
            filter: Some(filter_envelope.clone()),
        };
        let count_response = ProtoCountWorkflowsResponse { count: 1 };

        assert_json_round_trip(&request)?;
        assert_proto_round_trip(&request)?;
        assert_json_round_trip(&response)?;
        assert_proto_round_trip(&response)?;
        assert_json_round_trip(&count_request)?;
        assert_proto_round_trip(&count_request)?;
        assert_json_round_trip(&count_response)?;
        assert_proto_round_trip(&count_response)?;
        assert_eq!(
            decode_core_value::<ListWorkflowsFilter>(&filter_envelope)?,
            filter
        );
        assert_eq!(
            decode_core_value::<WorkflowSummary>(&summary_envelope)?,
            summary
        );
        Ok(())
    }

    #[test]
    fn query_round_trips_json_and_proto() -> Result<(), Box<dyn std::error::Error>> {
        let request = ProtoQueryRequest {
            namespace: String::from("tenant-a"),
            workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
            run_id: Some(ProtoRunId::from(run_id())),
            query_name: String::from("state"),
        };
        let result_response = ProtoQueryResponse {
            outcome: Some(proto_query_response::Outcome::Result(payload("result")?)),
        };
        let error_response = ProtoQueryResponse {
            outcome: Some(proto_query_response::Outcome::Error(ProtoWireError::from(
                WireError::unknown_query("state query is not registered"),
            ))),
        };

        assert_json_round_trip(&request)?;
        assert_proto_round_trip(&request)?;
        assert_json_round_trip(&result_response)?;
        assert_proto_round_trip(&result_response)?;
        assert_json_round_trip(&error_response)?;
        assert_proto_round_trip(&error_response)?;
        Ok(())
    }

    #[test]
    fn describe_workflow_round_trips_json_and_proto() -> Result<(), Box<dyn std::error::Error>> {
        let summary = checkout_summary()?;
        let summary_envelope = encode_core_value("tenant-a", None, &summary)?;
        let request = ProtoDescribeWorkflowRequest {
            namespace: String::from("tenant-a"),
            workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
            run_id: Some(ProtoRunId::from(run_id())),
            include_history: true,
        };
        // The history event type is not visible from this module, so a summary
        // envelope stands in to exercise the repeated field.
        let with_history = ProtoDescribeWorkflowResponse {
            summary: Some(summary_envelope.clone()),
            history: vec![summary_envelope.clone(), summary_envelope.clone()],
        };
        let without_history = ProtoDescribeWorkflowResponse {
            summary: Some(summary_envelope.clone()),
            history: Vec::new(),
        };

        assert_json_round_trip(&request)?;
        assert_proto_round_trip(&request)?;
        assert_json_round_trip(&with_history)?;
        assert_proto_round_trip(&with_history)?;
        assert_json_round_trip(&without_history)?;
        assert_proto_round_trip(&without_history)?;
        assert_eq!(
            decode_core_value::<WorkflowSummary>(&summary_envelope)?,
            summary
        );
        Ok(())
    }
}