1use serde::{Serialize, de::DeserializeOwned};
4use uuid::Uuid;
5
6use crate::error::WireError;
7
/// Content-type label for JSON payloads.
///
/// This is the label written by the encoders in this module and the only one
/// the decoders accept.
const JSON_CONTENT_TYPE: &str = "application/json";
9
/// Wire form of `aion_core::WorkflowId`.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoWorkflowId {
    /// The workflow's UUID rendered as a string; must parse as a UUID to
    /// convert back into the core type.
    #[prost(string, tag = "1")]
    pub uuid: String,
}
17
/// Wire form of `aion_core::RunId`.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoRunId {
    /// The run's UUID rendered as a string; must parse as a UUID to convert
    /// back into the core type.
    #[prost(string, tag = "1")]
    pub uuid: String,
}
25
/// Wire form of `aion_core::ScheduleId`.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoScheduleId {
    /// The schedule's UUID rendered as a string; must parse as a UUID to
    /// convert back into the core type.
    #[prost(string, tag = "1")]
    pub uuid: String,
}
33
/// Wire form of `aion_core::ActivityId`.
#[derive(Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoActivityId {
    /// The value reported by `ActivityId::sequence_position`, carried verbatim.
    #[prost(uint64, tag = "1")]
    pub sequence_position: u64,
}
41
/// Wire form of `aion_core::TimerId`.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoTimerId {
    /// Which form of timer id this is: a name or a sequence position.
    ///
    /// `None` means the oneof was not set; converting such a value into the
    /// core type fails.
    #[prost(oneof = "proto_timer_id::Kind", tags = "1, 2")]
    pub kind: Option<proto_timer_id::Kind>,
}
49
/// Nested types used by `ProtoTimerId`.
pub mod proto_timer_id {
    /// The mutually exclusive forms a timer id can take on the wire.
    #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Oneof)]
    pub enum Kind {
        /// A timer identified by name (oneof tag 1).
        #[prost(string, tag = "1")]
        Name(String),
        /// A timer identified by its sequence position (oneof tag 2).
        #[prost(uint64, tag = "2")]
        SequencePosition(u64),
    }
}
63
/// Wire form of `aion_core::Payload`: a content-type label plus raw bytes.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoPayload {
    /// Label describing how `bytes` is encoded. The conversions in this module
    /// only recognise `"application/json"`.
    #[prost(string, tag = "1")]
    pub content_type: String,
    /// The encoded payload bytes, carried without interpretation.
    #[prost(bytes = "vec", tag = "2")]
    pub bytes: Vec<u8>,
}
74
/// Wire form of `aion_core::WorkflowStatus`, encoded as an `i32` enumeration.
///
/// `Unspecified` (0) has no core counterpart; converting it into the core type
/// fails.
#[derive(
    Clone,
    Copy,
    Debug,
    PartialEq,
    Eq,
    Hash,
    serde::Serialize,
    serde::Deserialize,
    prost::Enumeration,
)]
#[repr(i32)]
pub enum ProtoWorkflowStatus {
    /// No status was set; rejected when converting to the core type.
    Unspecified = 0,
    /// Maps to `WorkflowStatus::Running`.
    Running = 1,
    /// Maps to `WorkflowStatus::Completed`.
    Completed = 2,
    /// Maps to `WorkflowStatus::Failed`.
    Failed = 3,
    /// Maps to `WorkflowStatus::Cancelled`.
    Cancelled = 4,
    /// Maps to `WorkflowStatus::TimedOut`.
    TimedOut = 5,
    /// Maps to `WorkflowStatus::ContinuedAsNew`.
    ContinuedAsNew = 6,
}
104
/// Envelope that carries one encoded value together with routing metadata.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct WireEnvelope {
    /// Namespace the value belongs to, as supplied by the encoder's caller.
    #[prost(string, tag = "1")]
    pub namespace: String,
    /// Optional request identifier, passed through unchanged by the encoders.
    #[prost(string, optional, tag = "2")]
    pub request_id: Option<String>,
    /// The encoded value. Decoding an envelope whose payload is `None` fails.
    #[prost(message, optional, tag = "3")]
    pub payload: Option<ProtoPayload>,
}
118
119impl From<aion_core::WorkflowId> for ProtoWorkflowId {
120 fn from(value: aion_core::WorkflowId) -> Self {
121 Self {
122 uuid: value.as_uuid().to_string(),
123 }
124 }
125}
126
127impl TryFrom<ProtoWorkflowId> for aion_core::WorkflowId {
128 type Error = WireError;
129
130 fn try_from(value: ProtoWorkflowId) -> Result<Self, Self::Error> {
131 parse_uuid(&value.uuid, "workflow id").map(Self::new)
132 }
133}
134
135impl From<aion_core::RunId> for ProtoRunId {
136 fn from(value: aion_core::RunId) -> Self {
137 Self {
138 uuid: value.as_uuid().to_string(),
139 }
140 }
141}
142
143impl TryFrom<ProtoRunId> for aion_core::RunId {
144 type Error = WireError;
145
146 fn try_from(value: ProtoRunId) -> Result<Self, Self::Error> {
147 parse_uuid(&value.uuid, "run id").map(Self::new)
148 }
149}
150
151impl From<aion_core::ScheduleId> for ProtoScheduleId {
152 fn from(value: aion_core::ScheduleId) -> Self {
153 Self {
154 uuid: value.as_uuid().to_string(),
155 }
156 }
157}
158
159impl TryFrom<ProtoScheduleId> for aion_core::ScheduleId {
160 type Error = WireError;
161
162 fn try_from(value: ProtoScheduleId) -> Result<Self, Self::Error> {
163 parse_uuid(&value.uuid, "schedule id").map(Self::new)
164 }
165}
166
167impl From<aion_core::ActivityId> for ProtoActivityId {
168 fn from(value: aion_core::ActivityId) -> Self {
169 Self {
170 sequence_position: value.sequence_position(),
171 }
172 }
173}
174
175impl From<ProtoActivityId> for aion_core::ActivityId {
176 fn from(value: ProtoActivityId) -> Self {
177 Self::from_sequence_position(value.sequence_position)
178 }
179}
180
181impl From<aion_core::TimerId> for ProtoTimerId {
182 fn from(value: aion_core::TimerId) -> Self {
183 let kind = if let Some(name) = value.name() {
184 proto_timer_id::Kind::Name(String::from(name))
185 } else if let Some(sequence_position) = value.sequence_position() {
186 proto_timer_id::Kind::SequencePosition(sequence_position)
187 } else {
188 proto_timer_id::Kind::SequencePosition(0)
189 };
190 Self { kind: Some(kind) }
191 }
192}
193
194impl TryFrom<ProtoTimerId> for aion_core::TimerId {
195 type Error = WireError;
196
197 fn try_from(value: ProtoTimerId) -> Result<Self, Self::Error> {
198 match value.kind {
199 Some(proto_timer_id::Kind::Name(name)) => aion_core::TimerId::named(name)
200 .map_err(|_| WireError::backend("timer id name must not be empty")),
201 Some(proto_timer_id::Kind::SequencePosition(sequence_position)) => {
202 Ok(Self::anonymous(sequence_position))
203 }
204 None => Err(WireError::backend("timer id kind is missing")),
205 }
206 }
207}
208
209impl From<aion_core::Payload> for ProtoPayload {
210 fn from(value: aion_core::Payload) -> Self {
211 Self {
212 content_type: content_type_to_wire(value.content_type()),
213 bytes: value.bytes().to_vec(),
214 }
215 }
216}
217
218impl TryFrom<ProtoPayload> for aion_core::Payload {
219 type Error = WireError;
220
221 fn try_from(value: ProtoPayload) -> Result<Self, Self::Error> {
222 let content_type = content_type_from_wire(&value.content_type)?;
223 Ok(Self::new(content_type, value.bytes))
224 }
225}
226
227impl From<aion_core::WorkflowStatus> for ProtoWorkflowStatus {
228 fn from(value: aion_core::WorkflowStatus) -> Self {
229 match value {
230 aion_core::WorkflowStatus::Running => Self::Running,
231 aion_core::WorkflowStatus::Completed => Self::Completed,
232 aion_core::WorkflowStatus::Failed => Self::Failed,
233 aion_core::WorkflowStatus::Cancelled => Self::Cancelled,
234 aion_core::WorkflowStatus::TimedOut => Self::TimedOut,
235 aion_core::WorkflowStatus::ContinuedAsNew => Self::ContinuedAsNew,
236 }
237 }
238}
239
240impl TryFrom<ProtoWorkflowStatus> for aion_core::WorkflowStatus {
241 type Error = WireError;
242
243 fn try_from(value: ProtoWorkflowStatus) -> Result<Self, Self::Error> {
244 match value {
245 ProtoWorkflowStatus::Unspecified => {
246 Err(WireError::backend("workflow status is missing"))
247 }
248 ProtoWorkflowStatus::Running => Ok(Self::Running),
249 ProtoWorkflowStatus::Completed => Ok(Self::Completed),
250 ProtoWorkflowStatus::Failed => Ok(Self::Failed),
251 ProtoWorkflowStatus::Cancelled => Ok(Self::Cancelled),
252 ProtoWorkflowStatus::TimedOut => Ok(Self::TimedOut),
253 ProtoWorkflowStatus::ContinuedAsNew => Ok(Self::ContinuedAsNew),
254 }
255 }
256}
257
258pub fn encode_core_value<T>(
268 namespace: impl Into<String>,
269 request_id: Option<String>,
270 value: &T,
271) -> Result<WireEnvelope, WireError>
272where
273 T: Serialize,
274{
275 let bytes =
276 serde_json::to_vec(value).map_err(|_| WireError::backend("core value encode failed"))?;
277 Ok(WireEnvelope {
278 namespace: namespace.into(),
279 request_id,
280 payload: Some(ProtoPayload {
281 content_type: String::from(JSON_CONTENT_TYPE),
282 bytes,
283 }),
284 })
285}
286
287pub fn decode_core_value<T>(envelope: &WireEnvelope) -> Result<T, WireError>
298where
299 T: DeserializeOwned,
300{
301 let payload = envelope
302 .payload
303 .as_ref()
304 .ok_or_else(|| WireError::invalid_input("wire envelope payload is missing"))?;
305 if payload.content_type != JSON_CONTENT_TYPE {
306 return Err(WireError::invalid_input(
307 "wire envelope content type is unknown",
308 ));
309 }
310 serde_json::from_slice(&payload.bytes)
311 .map_err(|_| WireError::invalid_input("core value decode failed"))
312}
313
314pub fn encode_workflow_filter(
321 namespace: impl Into<String>,
322 request_id: Option<String>,
323 filter: &aion_core::WorkflowFilter,
324) -> Result<WireEnvelope, WireError> {
325 encode_core_value(namespace, request_id, filter)
326}
327
328pub fn decode_workflow_filter(
335 envelope: &WireEnvelope,
336) -> Result<aion_core::WorkflowFilter, WireError> {
337 decode_core_value(envelope)
338}
339
340pub fn encode_workflow_summary(
347 namespace: impl Into<String>,
348 request_id: Option<String>,
349 summary: &aion_core::WorkflowSummary,
350) -> Result<WireEnvelope, WireError> {
351 encode_core_value(namespace, request_id, summary)
352}
353
354pub fn decode_workflow_summary(
361 envelope: &WireEnvelope,
362) -> Result<aion_core::WorkflowSummary, WireError> {
363 decode_core_value(envelope)
364}
365
366pub fn encode_schedule_config(
372 namespace: impl Into<String>,
373 request_id: Option<String>,
374 config: &aion_core::ScheduleConfig,
375) -> Result<WireEnvelope, WireError> {
376 encode_core_value(namespace, request_id, config)
377}
378
379pub fn decode_schedule_config(
386 envelope: &WireEnvelope,
387) -> Result<aion_core::ScheduleConfig, WireError> {
388 decode_core_value(envelope)
389}
390
391pub fn encode_schedule_state<T>(
397 namespace: impl Into<String>,
398 request_id: Option<String>,
399 state: &T,
400) -> Result<WireEnvelope, WireError>
401where
402 T: Serialize,
403{
404 encode_core_value(namespace, request_id, state)
405}
406
407pub fn decode_schedule_state<T>(envelope: &WireEnvelope) -> Result<T, WireError>
414where
415 T: DeserializeOwned,
416{
417 decode_core_value(envelope)
418}
419
420pub fn encode_event(
427 namespace: impl Into<String>,
428 request_id: Option<String>,
429 event: &aion_core::Event,
430) -> Result<WireEnvelope, WireError> {
431 encode_core_value(namespace, request_id, event)
432}
433
434pub fn decode_event(envelope: &WireEnvelope) -> Result<aion_core::Event, WireError> {
441 decode_core_value(envelope)
442}
443
444fn parse_uuid(value: &str, label: &str) -> Result<Uuid, WireError> {
445 Uuid::parse_str(value)
446 .map_err(|_| WireError::invalid_input(format!("{label} uuid is malformed")))
447}
448
449fn content_type_to_wire(content_type: &aion_core::ContentType) -> String {
450 match content_type {
451 aion_core::ContentType::Json => String::from(JSON_CONTENT_TYPE),
452 }
453}
454
455fn content_type_from_wire(content_type: &str) -> Result<aion_core::ContentType, WireError> {
456 match content_type {
457 JSON_CONTENT_TYPE => Ok(aion_core::ContentType::Json),
458 _ => Err(WireError::invalid_input("payload content type is unknown")),
459 }
460}
461
/// Round-trip and rejection tests for the wire conversions in this module.
#[cfg(test)]
mod tests {
    use chrono::{DateTime, Utc};
    use serde_json::json;

    use super::{
        ProtoActivityId, ProtoPayload, ProtoRunId, ProtoScheduleId, ProtoTimerId, ProtoWorkflowId,
        ProtoWorkflowStatus, WireEnvelope, decode_core_value, encode_core_value, proto_timer_id,
    };
    use crate::error::WireError;

    /// Fixed workflow id (nil UUID) shared by the fixtures.
    fn workflow_id() -> aion_core::WorkflowId {
        aion_core::WorkflowId::new(uuid::Uuid::nil())
    }

    /// Fixed run id (nil UUID) shared by the fixtures.
    fn run_id() -> aion_core::RunId {
        aion_core::RunId::new(uuid::Uuid::nil())
    }

    /// Fixed timestamp so encoded events are deterministic.
    fn recorded_at() -> Result<DateTime<Utc>, chrono::ParseError> {
        Ok(DateTime::parse_from_rfc3339("2026-01-01T00:00:00Z")?.with_timezone(&Utc))
    }

    /// Minimal event envelope used by the event round-trip test.
    fn event_envelope() -> Result<aion_core::EventEnvelope, chrono::ParseError> {
        Ok(aion_core::EventEnvelope {
            seq: 1,
            recorded_at: recorded_at()?,
            workflow_id: workflow_id(),
        })
    }

    /// Envelope carrying `bytes` under the given content-type label.
    fn envelope_with_payload(content_type: &str, bytes: &[u8]) -> WireEnvelope {
        WireEnvelope {
            namespace: String::from("tenant-a"),
            request_id: None,
            payload: Some(ProtoPayload {
                content_type: String::from(content_type),
                bytes: bytes.to_vec(),
            }),
        }
    }

    #[test]
    fn workflow_id_round_trips_both_directions() -> Result<(), WireError> {
        let core = workflow_id();
        let proto = ProtoWorkflowId::from(core.clone());
        assert_eq!(aion_core::WorkflowId::try_from(proto.clone())?, core);
        assert_eq!(
            ProtoWorkflowId::from(aion_core::WorkflowId::try_from(proto)?),
            ProtoWorkflowId::from(core)
        );
        Ok(())
    }

    #[test]
    fn run_id_round_trips_both_directions() -> Result<(), WireError> {
        let core = run_id();
        let proto = ProtoRunId::from(core.clone());
        assert_eq!(aion_core::RunId::try_from(proto.clone())?, core);
        assert_eq!(
            ProtoRunId::from(aion_core::RunId::try_from(proto)?),
            ProtoRunId::from(core)
        );
        Ok(())
    }

    // Compares on the proto side only, so the test does not depend on which
    // comparison traits the core schedule id implements.
    #[test]
    fn schedule_id_round_trips_through_core() -> Result<(), WireError> {
        let proto = ProtoScheduleId {
            uuid: uuid::Uuid::from_u128(7).to_string(),
        };
        let core = aion_core::ScheduleId::try_from(proto.clone())?;
        assert_eq!(ProtoScheduleId::from(core), proto);
        Ok(())
    }

    #[test]
    fn ids_reject_malformed_uuid() {
        let workflow = ProtoWorkflowId {
            uuid: String::from("not-a-uuid"),
        };
        assert_eq!(
            aion_core::WorkflowId::try_from(workflow),
            Err(WireError::invalid_input("workflow id uuid is malformed"))
        );

        let run = ProtoRunId {
            uuid: String::new(),
        };
        assert_eq!(
            aion_core::RunId::try_from(run),
            Err(WireError::invalid_input("run id uuid is malformed"))
        );
    }

    #[test]
    fn activity_id_round_trips_both_directions() {
        let core = aion_core::ActivityId::from_sequence_position(42);
        let proto = ProtoActivityId::from(core.clone());
        assert_eq!(aion_core::ActivityId::from(proto), core);
        assert_eq!(
            ProtoActivityId::from(aion_core::ActivityId::from(proto)),
            proto
        );
    }

    #[test]
    fn timer_id_round_trips_both_directions() -> Result<(), WireError> {
        let named = aion_core::TimerId::named("deadline")
            .map_err(|_| WireError::backend("test timer id could not be created"))?;
        let anonymous = aion_core::TimerId::anonymous(7);

        for core in [named, anonymous] {
            let proto = ProtoTimerId::from(core.clone());
            assert_eq!(aion_core::TimerId::try_from(proto.clone())?, core);
            assert_eq!(
                ProtoTimerId::from(aion_core::TimerId::try_from(proto)?),
                ProtoTimerId::from(core)
            );
        }

        Ok(())
    }

    #[test]
    fn timer_id_rejects_missing_and_empty_name() {
        let missing = ProtoTimerId { kind: None };
        assert_eq!(
            aion_core::TimerId::try_from(missing),
            Err(WireError::backend("timer id kind is missing"))
        );

        let empty = ProtoTimerId {
            kind: Some(proto_timer_id::Kind::Name(String::new())),
        };
        assert_eq!(
            aion_core::TimerId::try_from(empty),
            Err(WireError::backend("timer id name must not be empty"))
        );
    }

    #[test]
    fn payload_round_trips_all_json_kinds_and_raw_bytes() -> Result<(), WireError> {
        let values = [
            serde_json::Value::Null,
            json!(true),
            json!(123.45),
            json!("hello"),
            json!([null, false, 7, "item"]),
            json!({"nested": {"value": 1}, "array": [true, false]}),
        ];

        for value in values {
            let core = aion_core::Payload::from_json(&value)
                .map_err(|_| WireError::backend("test payload could not be created"))?;
            let proto = ProtoPayload::from(core.clone());
            assert_eq!(proto.content_type, "application/json");
            assert_eq!(proto.bytes, core.bytes());
            assert_eq!(aion_core::Payload::try_from(proto.clone())?, core);
            assert_eq!(
                ProtoPayload::from(aion_core::Payload::try_from(proto)?),
                ProtoPayload::from(core)
            );
        }

        // Bytes that are not valid UTF-8 must pass through untouched.
        let raw = aion_core::Payload::new(aion_core::ContentType::Json, vec![0, 159, 146, 150]);
        let proto = ProtoPayload::from(raw.clone());
        assert_eq!(proto.bytes, raw.bytes());
        assert_eq!(aion_core::Payload::try_from(proto)?, raw);
        Ok(())
    }

    #[test]
    fn payload_rejects_unknown_content_type() {
        let proto = ProtoPayload {
            content_type: String::from("text/plain"),
            bytes: Vec::new(),
        };
        assert_eq!(
            aion_core::Payload::try_from(proto),
            Err(WireError::invalid_input("payload content type is unknown"))
        );
    }

    #[test]
    fn workflow_status_round_trips_both_directions() -> Result<(), WireError> {
        let statuses = [
            aion_core::WorkflowStatus::Running,
            aion_core::WorkflowStatus::Completed,
            aion_core::WorkflowStatus::Failed,
            aion_core::WorkflowStatus::Cancelled,
            aion_core::WorkflowStatus::TimedOut,
            aion_core::WorkflowStatus::ContinuedAsNew,
        ];

        for core in statuses {
            let proto = ProtoWorkflowStatus::from(core);
            assert_eq!(aion_core::WorkflowStatus::try_from(proto)?, core);
            assert_eq!(
                ProtoWorkflowStatus::from(aion_core::WorkflowStatus::try_from(proto)?),
                proto
            );
        }

        assert_eq!(
            aion_core::WorkflowStatus::try_from(ProtoWorkflowStatus::Unspecified),
            Err(WireError::backend("workflow status is missing"))
        );
        Ok(())
    }

    #[test]
    fn core_event_round_trips_through_wire_envelope() -> Result<(), Box<dyn std::error::Error>> {
        let event = aion_core::Event::WorkflowStarted {
            envelope: event_envelope()?,
            workflow_type: String::from("checkout"),
            input: aion_core::Payload::from_json(&json!({ "cart": ["sku-1"] }))?,
            run_id: aion_core::RunId::new(uuid::Uuid::from_u128(1)),
            parent_run_id: None,
            package_version: aion_core::PackageVersion::new("a".repeat(64)),
        };

        let envelope = encode_core_value("tenant-a", Some(String::from("request-1")), &event)?;
        assert_eq!(envelope.namespace, "tenant-a");
        assert_eq!(envelope.request_id.as_deref(), Some("request-1"));

        let decoded: aion_core::Event = decode_core_value(&envelope)?;
        assert_eq!(decoded, event);
        Ok(())
    }

    #[test]
    fn envelope_rejects_missing_payload() {
        let envelope = WireEnvelope {
            namespace: String::from("tenant-a"),
            request_id: None,
            payload: None,
        };

        let decoded = decode_core_value::<aion_core::Event>(&envelope);
        assert_eq!(
            decoded,
            Err(WireError::invalid_input("wire envelope payload is missing"))
        );
    }

    #[test]
    fn envelope_rejects_unknown_content_type_and_malformed_json() {
        let unknown = envelope_with_payload("text/plain", b"{}");
        assert_eq!(
            decode_core_value::<aion_core::Event>(&unknown),
            Err(WireError::invalid_input(
                "wire envelope content type is unknown"
            ))
        );

        let malformed = envelope_with_payload("application/json", b"{");
        assert_eq!(
            decode_core_value::<aion_core::Event>(&malformed),
            Err(WireError::invalid_input("core value decode failed"))
        );
    }
}