// aion_proto/convert.rs

//! Conversions between proto wire messages and aion-core domain types.
2
3use serde::{Serialize, de::DeserializeOwned};
4use uuid::Uuid;
5
6use crate::error::WireError;
7
// Wire tag for JSON payloads; the only content type this module writes or accepts.
const JSON_CONTENT_TYPE: &str = "application/json";
9
/// Proto representation of `WorkflowId`.
///
/// Built infallibly from `aion_core::WorkflowId`; converting back goes through
/// `TryFrom` and fails when `uuid` does not parse as a UUID.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoWorkflowId {
    /// UUID encoded in canonical string form.
    #[prost(string, tag = "1")]
    pub uuid: String,
}
17
/// Proto representation of `RunId`.
///
/// Built infallibly from `aion_core::RunId`; converting back goes through
/// `TryFrom` and fails when `uuid` does not parse as a UUID.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoRunId {
    /// UUID encoded in canonical string form.
    #[prost(string, tag = "1")]
    pub uuid: String,
}
25
/// Proto representation of `ScheduleId`.
///
/// Built infallibly from `aion_core::ScheduleId`; converting back goes through
/// `TryFrom` and fails when `uuid` does not parse as a UUID.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoScheduleId {
    /// UUID encoded in canonical string form.
    #[prost(string, tag = "1")]
    pub uuid: String,
}
33
/// Proto representation of `ActivityId`.
///
/// Conversion to and from `aion_core::ActivityId` is infallible in both
/// directions: the sequence position is copied across unchanged.
#[derive(Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoActivityId {
    /// Scheduling sequence position.
    #[prost(uint64, tag = "1")]
    pub sequence_position: u64,
}
41
/// Proto representation of `TimerId`.
///
/// Converting back to `aion_core::TimerId` fails when `kind` is `None` or
/// when the core type rejects the carried name.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoTimerId {
    /// Timer identifier kind.
    #[prost(oneof = "proto_timer_id::Kind", tags = "1, 2")]
    pub kind: Option<proto_timer_id::Kind>,
}
49
/// Types nested under [`ProtoTimerId`].
pub mod proto_timer_id {
    /// Proto oneof for named and anonymous timer identifiers.
    ///
    /// The variant tags (1 and 2) match the `tags = "1, 2"` list declared on
    /// `ProtoTimerId::kind`.
    #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Oneof)]
    pub enum Kind {
        /// Author-assigned timer name.
        #[prost(string, tag = "1")]
        Name(String),
        /// Engine-assigned timer sequence position.
        #[prost(uint64, tag = "2")]
        SequencePosition(u64),
    }
}
63
/// Proto representation of `Payload`.
///
/// Converting back to `aion_core::Payload` fails when `content_type` is not a
/// tag this module recognizes (currently only `application/json`). The bytes
/// themselves are passed through without being parsed.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct ProtoPayload {
    /// Stable content type tag.
    #[prost(string, tag = "1")]
    pub content_type: String,
    /// Opaque serialized bytes.
    #[prost(bytes = "vec", tag = "2")]
    pub bytes: Vec<u8>,
}
74
/// Proto representation of `WorkflowStatus`. Zero is invalid on decode.
///
/// Values 1 through 6 map one-to-one onto the `aion_core::WorkflowStatus`
/// variants; `Unspecified` (0) has no core counterpart and is rejected by the
/// `TryFrom` conversion.
#[derive(
    Clone,
    Copy,
    Debug,
    PartialEq,
    Eq,
    Hash,
    serde::Serialize,
    serde::Deserialize,
    prost::Enumeration,
)]
#[repr(i32)]
pub enum ProtoWorkflowStatus {
    /// Missing/invalid status.
    Unspecified = 0,
    /// Workflow is not terminal.
    Running = 1,
    /// Workflow completed successfully.
    Completed = 2,
    /// Workflow failed terminally.
    Failed = 3,
    /// Workflow was cancelled.
    Cancelled = 4,
    /// Workflow timed out.
    TimedOut = 5,
    /// Workflow continued as a new run.
    ContinuedAsNew = 6,
}
104
/// Thin proto envelope carrying a serde-encoded aion-core value.
///
/// Produced by `encode_core_value` and consumed by `decode_core_value`, which
/// reads only `payload`; `namespace` and `request_id` travel alongside it.
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct WireEnvelope {
    /// Namespace that scopes the enclosed value.
    #[prost(string, tag = "1")]
    pub namespace: String,
    /// Optional caller request identifier.
    #[prost(string, optional, tag = "2")]
    pub request_id: Option<String>,
    /// Serialized aion-core value. `None` is rejected when decoding.
    #[prost(message, optional, tag = "3")]
    pub payload: Option<ProtoPayload>,
}
118
119impl From<aion_core::WorkflowId> for ProtoWorkflowId {
120    fn from(value: aion_core::WorkflowId) -> Self {
121        Self {
122            uuid: value.as_uuid().to_string(),
123        }
124    }
125}
126
127impl TryFrom<ProtoWorkflowId> for aion_core::WorkflowId {
128    type Error = WireError;
129
130    fn try_from(value: ProtoWorkflowId) -> Result<Self, Self::Error> {
131        parse_uuid(&value.uuid, "workflow id").map(Self::new)
132    }
133}
134
135impl From<aion_core::RunId> for ProtoRunId {
136    fn from(value: aion_core::RunId) -> Self {
137        Self {
138            uuid: value.as_uuid().to_string(),
139        }
140    }
141}
142
143impl TryFrom<ProtoRunId> for aion_core::RunId {
144    type Error = WireError;
145
146    fn try_from(value: ProtoRunId) -> Result<Self, Self::Error> {
147        parse_uuid(&value.uuid, "run id").map(Self::new)
148    }
149}
150
151impl From<aion_core::ScheduleId> for ProtoScheduleId {
152    fn from(value: aion_core::ScheduleId) -> Self {
153        Self {
154            uuid: value.as_uuid().to_string(),
155        }
156    }
157}
158
159impl TryFrom<ProtoScheduleId> for aion_core::ScheduleId {
160    type Error = WireError;
161
162    fn try_from(value: ProtoScheduleId) -> Result<Self, Self::Error> {
163        parse_uuid(&value.uuid, "schedule id").map(Self::new)
164    }
165}
166
167impl From<aion_core::ActivityId> for ProtoActivityId {
168    fn from(value: aion_core::ActivityId) -> Self {
169        Self {
170            sequence_position: value.sequence_position(),
171        }
172    }
173}
174
175impl From<ProtoActivityId> for aion_core::ActivityId {
176    fn from(value: ProtoActivityId) -> Self {
177        Self::from_sequence_position(value.sequence_position)
178    }
179}
180
181impl From<aion_core::TimerId> for ProtoTimerId {
182    fn from(value: aion_core::TimerId) -> Self {
183        let kind = if let Some(name) = value.name() {
184            proto_timer_id::Kind::Name(String::from(name))
185        } else if let Some(sequence_position) = value.sequence_position() {
186            proto_timer_id::Kind::SequencePosition(sequence_position)
187        } else {
188            proto_timer_id::Kind::SequencePosition(0)
189        };
190        Self { kind: Some(kind) }
191    }
192}
193
194impl TryFrom<ProtoTimerId> for aion_core::TimerId {
195    type Error = WireError;
196
197    fn try_from(value: ProtoTimerId) -> Result<Self, Self::Error> {
198        match value.kind {
199            Some(proto_timer_id::Kind::Name(name)) => aion_core::TimerId::named(name)
200                .map_err(|_| WireError::backend("timer id name must not be empty")),
201            Some(proto_timer_id::Kind::SequencePosition(sequence_position)) => {
202                Ok(Self::anonymous(sequence_position))
203            }
204            None => Err(WireError::backend("timer id kind is missing")),
205        }
206    }
207}
208
209impl From<aion_core::Payload> for ProtoPayload {
210    fn from(value: aion_core::Payload) -> Self {
211        Self {
212            content_type: content_type_to_wire(value.content_type()),
213            bytes: value.bytes().to_vec(),
214        }
215    }
216}
217
218impl TryFrom<ProtoPayload> for aion_core::Payload {
219    type Error = WireError;
220
221    fn try_from(value: ProtoPayload) -> Result<Self, Self::Error> {
222        let content_type = content_type_from_wire(&value.content_type)?;
223        Ok(Self::new(content_type, value.bytes))
224    }
225}
226
227impl From<aion_core::WorkflowStatus> for ProtoWorkflowStatus {
228    fn from(value: aion_core::WorkflowStatus) -> Self {
229        match value {
230            aion_core::WorkflowStatus::Running => Self::Running,
231            aion_core::WorkflowStatus::Completed => Self::Completed,
232            aion_core::WorkflowStatus::Failed => Self::Failed,
233            aion_core::WorkflowStatus::Cancelled => Self::Cancelled,
234            aion_core::WorkflowStatus::TimedOut => Self::TimedOut,
235            aion_core::WorkflowStatus::ContinuedAsNew => Self::ContinuedAsNew,
236        }
237    }
238}
239
240impl TryFrom<ProtoWorkflowStatus> for aion_core::WorkflowStatus {
241    type Error = WireError;
242
243    fn try_from(value: ProtoWorkflowStatus) -> Result<Self, Self::Error> {
244        match value {
245            ProtoWorkflowStatus::Unspecified => {
246                Err(WireError::backend("workflow status is missing"))
247            }
248            ProtoWorkflowStatus::Running => Ok(Self::Running),
249            ProtoWorkflowStatus::Completed => Ok(Self::Completed),
250            ProtoWorkflowStatus::Failed => Ok(Self::Failed),
251            ProtoWorkflowStatus::Cancelled => Ok(Self::Cancelled),
252            ProtoWorkflowStatus::TimedOut => Ok(Self::TimedOut),
253            ProtoWorkflowStatus::ContinuedAsNew => Ok(Self::ContinuedAsNew),
254        }
255    }
256}
257
258/// Serializes a core serde value into a thin wire envelope.
259///
260/// This helper is used for core `Event`, `WorkflowFilter`, and
261/// `WorkflowSummary` values without declaring wire-clone structs for them.
262///
263/// # Errors
264///
265/// Returns [`WireError`] with code `backend` if the core value cannot be
266/// serialized by serde JSON.
267pub fn encode_core_value<T>(
268    namespace: impl Into<String>,
269    request_id: Option<String>,
270    value: &T,
271) -> Result<WireEnvelope, WireError>
272where
273    T: Serialize,
274{
275    let bytes =
276        serde_json::to_vec(value).map_err(|_| WireError::backend("core value encode failed"))?;
277    Ok(WireEnvelope {
278        namespace: namespace.into(),
279        request_id,
280        payload: Some(ProtoPayload {
281            content_type: String::from(JSON_CONTENT_TYPE),
282            bytes,
283        }),
284    })
285}
286
287/// Deserializes a core serde value from a thin wire envelope.
288///
289/// Callers choose the target aion-core type, such as `Event`,
290/// `WorkflowFilter`, or `WorkflowSummary`.
291///
292/// # Errors
293///
294/// Returns [`WireError`] with code `backend` if the envelope payload is
295/// missing, uses an unknown content type, or cannot be deserialized as the
296/// requested core type.
297pub fn decode_core_value<T>(envelope: &WireEnvelope) -> Result<T, WireError>
298where
299    T: DeserializeOwned,
300{
301    let payload = envelope
302        .payload
303        .as_ref()
304        .ok_or_else(|| WireError::invalid_input("wire envelope payload is missing"))?;
305    if payload.content_type != JSON_CONTENT_TYPE {
306        return Err(WireError::invalid_input(
307            "wire envelope content type is unknown",
308        ));
309    }
310    serde_json::from_slice(&payload.bytes)
311        .map_err(|_| WireError::invalid_input("core value decode failed"))
312}
313
314/// Serializes a workflow filter into a thin wire envelope.
315///
316/// # Errors
317///
318/// Returns [`WireError`] if the core filter cannot be serialized into the
319/// envelope payload.
320pub fn encode_workflow_filter(
321    namespace: impl Into<String>,
322    request_id: Option<String>,
323    filter: &aion_core::WorkflowFilter,
324) -> Result<WireEnvelope, WireError> {
325    encode_core_value(namespace, request_id, filter)
326}
327
328/// Deserializes a workflow filter from a thin wire envelope.
329///
330/// # Errors
331///
332/// Returns [`WireError`] if the envelope is missing a payload, has an unknown
333/// content type, or does not contain a valid workflow filter.
334pub fn decode_workflow_filter(
335    envelope: &WireEnvelope,
336) -> Result<aion_core::WorkflowFilter, WireError> {
337    decode_core_value(envelope)
338}
339
340/// Serializes a workflow summary into a thin wire envelope.
341///
342/// # Errors
343///
344/// Returns [`WireError`] if the core summary cannot be serialized into the
345/// envelope payload.
346pub fn encode_workflow_summary(
347    namespace: impl Into<String>,
348    request_id: Option<String>,
349    summary: &aion_core::WorkflowSummary,
350) -> Result<WireEnvelope, WireError> {
351    encode_core_value(namespace, request_id, summary)
352}
353
354/// Deserializes a workflow summary from a thin wire envelope.
355///
356/// # Errors
357///
358/// Returns [`WireError`] if the envelope is missing a payload, has an unknown
359/// content type, or does not contain a valid workflow summary.
360pub fn decode_workflow_summary(
361    envelope: &WireEnvelope,
362) -> Result<aion_core::WorkflowSummary, WireError> {
363    decode_core_value(envelope)
364}
365
366/// Serializes a schedule config into a thin wire envelope.
367///
368/// # Errors
369///
370/// Returns [`WireError`] if the core config cannot be serialized into the envelope payload.
371pub fn encode_schedule_config(
372    namespace: impl Into<String>,
373    request_id: Option<String>,
374    config: &aion_core::ScheduleConfig,
375) -> Result<WireEnvelope, WireError> {
376    encode_core_value(namespace, request_id, config)
377}
378
379/// Deserializes a schedule config from a thin wire envelope.
380///
381/// # Errors
382///
383/// Returns [`WireError`] if the envelope is missing a payload, has an unknown content type, or does
384/// not contain a valid schedule config.
385pub fn decode_schedule_config(
386    envelope: &WireEnvelope,
387) -> Result<aion_core::ScheduleConfig, WireError> {
388    decode_core_value(envelope)
389}
390
391/// Serializes a schedule state into a thin wire envelope.
392///
393/// # Errors
394///
395/// Returns [`WireError`] if the schedule state cannot be serialized into the envelope payload.
396pub fn encode_schedule_state<T>(
397    namespace: impl Into<String>,
398    request_id: Option<String>,
399    state: &T,
400) -> Result<WireEnvelope, WireError>
401where
402    T: Serialize,
403{
404    encode_core_value(namespace, request_id, state)
405}
406
407/// Deserializes a schedule state from a thin wire envelope.
408///
409/// # Errors
410///
411/// Returns [`WireError`] if the envelope is missing a payload, has an unknown content type, or does
412/// not contain the requested schedule state representation.
413pub fn decode_schedule_state<T>(envelope: &WireEnvelope) -> Result<T, WireError>
414where
415    T: DeserializeOwned,
416{
417    decode_core_value(envelope)
418}
419
420/// Serializes a workflow event into a thin wire envelope.
421///
422/// # Errors
423///
424/// Returns [`WireError`] if the core event cannot be serialized into the
425/// envelope payload.
426pub fn encode_event(
427    namespace: impl Into<String>,
428    request_id: Option<String>,
429    event: &aion_core::Event,
430) -> Result<WireEnvelope, WireError> {
431    encode_core_value(namespace, request_id, event)
432}
433
434/// Deserializes a workflow event from a thin wire envelope.
435///
436/// # Errors
437///
438/// Returns [`WireError`] if the envelope is missing a payload, has an unknown
439/// content type, or does not contain a valid workflow event.
440pub fn decode_event(envelope: &WireEnvelope) -> Result<aion_core::Event, WireError> {
441    decode_core_value(envelope)
442}
443
444fn parse_uuid(value: &str, label: &str) -> Result<Uuid, WireError> {
445    Uuid::parse_str(value)
446        .map_err(|_| WireError::invalid_input(format!("{label} uuid is malformed")))
447}
448
449fn content_type_to_wire(content_type: &aion_core::ContentType) -> String {
450    match content_type {
451        aion_core::ContentType::Json => String::from(JSON_CONTENT_TYPE),
452    }
453}
454
455fn content_type_from_wire(content_type: &str) -> Result<aion_core::ContentType, WireError> {
456    match content_type {
457        JSON_CONTENT_TYPE => Ok(aion_core::ContentType::Json),
458        _ => Err(WireError::invalid_input("payload content type is unknown")),
459    }
460}
461
// Round-trip and rejection tests for the proto <-> core conversions and the
// wire envelope helpers.
#[cfg(test)]
mod tests {
    use chrono::{DateTime, Utc};
    use serde_json::json;

    use super::{
        ProtoActivityId, ProtoPayload, ProtoRunId, ProtoTimerId, ProtoWorkflowId,
        ProtoWorkflowStatus, WireEnvelope, decode_core_value, encode_core_value, proto_timer_id,
    };
    use crate::error::WireError;

    // Fixture: workflow id backed by the nil UUID.
    fn workflow_id() -> aion_core::WorkflowId {
        aion_core::WorkflowId::new(uuid::Uuid::nil())
    }

    // Fixture: run id backed by the nil UUID.
    fn run_id() -> aion_core::RunId {
        aion_core::RunId::new(uuid::Uuid::nil())
    }

    // Fixture: fixed timestamp so event fixtures are deterministic.
    fn recorded_at() -> Result<DateTime<Utc>, chrono::ParseError> {
        Ok(DateTime::parse_from_rfc3339("2026-01-01T00:00:00Z")?.with_timezone(&Utc))
    }

    // Fixture: event envelope built from the fixed timestamp and workflow id.
    fn event_envelope() -> Result<aion_core::EventEnvelope, chrono::ParseError> {
        Ok(aion_core::EventEnvelope {
            seq: 1,
            recorded_at: recorded_at()?,
            workflow_id: workflow_id(),
        })
    }

    // core -> proto -> core yields the original, and re-encoding is stable.
    #[test]
    fn workflow_id_round_trips_both_directions() -> Result<(), WireError> {
        let core = workflow_id();
        let proto = ProtoWorkflowId::from(core.clone());
        assert_eq!(aion_core::WorkflowId::try_from(proto.clone())?, core);
        assert_eq!(
            ProtoWorkflowId::from(aion_core::WorkflowId::try_from(proto)?),
            ProtoWorkflowId::from(core)
        );
        Ok(())
    }

    // Same round-trip check as above, for run ids.
    #[test]
    fn run_id_round_trips_both_directions() -> Result<(), WireError> {
        let core = run_id();
        let proto = ProtoRunId::from(core.clone());
        assert_eq!(aion_core::RunId::try_from(proto.clone())?, core);
        assert_eq!(
            ProtoRunId::from(aion_core::RunId::try_from(proto)?),
            ProtoRunId::from(core)
        );
        Ok(())
    }

    // Activity id conversion is infallible in both directions.
    #[test]
    fn activity_id_round_trips_both_directions() {
        let core = aion_core::ActivityId::from_sequence_position(42);
        let proto = ProtoActivityId::from(core.clone());
        assert_eq!(aion_core::ActivityId::from(proto), core);
        assert_eq!(
            ProtoActivityId::from(aion_core::ActivityId::from(proto)),
            proto
        );
    }

    // Covers both oneof variants: a named timer and an anonymous one.
    #[test]
    fn timer_id_round_trips_both_directions() -> Result<(), WireError> {
        let named = aion_core::TimerId::named("deadline")
            .map_err(|_| WireError::backend("test timer id could not be created"))?;
        let anonymous = aion_core::TimerId::anonymous(7);

        for core in [named, anonymous] {
            let proto = ProtoTimerId::from(core.clone());
            assert_eq!(aion_core::TimerId::try_from(proto.clone())?, core);
            assert_eq!(
                ProtoTimerId::from(aion_core::TimerId::try_from(proto)?),
                ProtoTimerId::from(core)
            );
        }

        Ok(())
    }

    // Pins the exact errors for an unset oneof and for an empty name.
    #[test]
    fn timer_id_rejects_missing_and_empty_name() {
        let missing = ProtoTimerId { kind: None };
        assert_eq!(
            aion_core::TimerId::try_from(missing),
            Err(WireError::backend("timer id kind is missing"))
        );

        let empty = ProtoTimerId {
            kind: Some(proto_timer_id::Kind::Name(String::new())),
        };
        assert_eq!(
            aion_core::TimerId::try_from(empty),
            Err(WireError::backend("timer id name must not be empty"))
        );
    }

    // Exercises one payload per JSON value kind, then a payload whose bytes
    // are tagged JSON but are not valid JSON.
    #[test]
    fn payload_round_trips_all_json_kinds_and_raw_bytes() -> Result<(), WireError> {
        let values = [
            serde_json::Value::Null,
            json!(true),
            json!(123.45),
            json!("hello"),
            json!([null, false, 7, "item"]),
            json!({"nested": {"value": 1}, "array": [true, false]}),
        ];

        for value in values {
            let core = aion_core::Payload::from_json(&value)
                .map_err(|_| WireError::backend("test payload could not be created"))?;
            let proto = ProtoPayload::from(core.clone());
            assert_eq!(proto.content_type, "application/json");
            assert_eq!(proto.bytes, core.bytes());
            assert_eq!(aion_core::Payload::try_from(proto.clone())?, core);
            assert_eq!(
                ProtoPayload::from(aion_core::Payload::try_from(proto)?),
                ProtoPayload::from(core)
            );
        }

        // The conversion must pass bytes through untouched, without parsing them.
        let raw = aion_core::Payload::new(aion_core::ContentType::Json, vec![0, 159, 146, 150]);
        let proto = ProtoPayload::from(raw.clone());
        assert_eq!(proto.bytes, raw.bytes());
        assert_eq!(aion_core::Payload::try_from(proto)?, raw);
        Ok(())
    }

    // Every core status round-trips; `Unspecified` is rejected on decode.
    #[test]
    fn workflow_status_round_trips_both_directions() -> Result<(), WireError> {
        let statuses = [
            aion_core::WorkflowStatus::Running,
            aion_core::WorkflowStatus::Completed,
            aion_core::WorkflowStatus::Failed,
            aion_core::WorkflowStatus::Cancelled,
            aion_core::WorkflowStatus::TimedOut,
            aion_core::WorkflowStatus::ContinuedAsNew,
        ];

        for core in statuses {
            let proto = ProtoWorkflowStatus::from(core);
            assert_eq!(aion_core::WorkflowStatus::try_from(proto)?, core);
            assert_eq!(
                ProtoWorkflowStatus::from(aion_core::WorkflowStatus::try_from(proto)?),
                proto
            );
        }

        assert_eq!(
            aion_core::WorkflowStatus::try_from(ProtoWorkflowStatus::Unspecified),
            Err(WireError::backend("workflow status is missing"))
        );
        Ok(())
    }

    // Encodes an event into an envelope, checks the envelope metadata, and
    // decodes it back to an equal event.
    #[test]
    fn core_event_round_trips_through_wire_envelope() -> Result<(), Box<dyn std::error::Error>> {
        let event = aion_core::Event::WorkflowStarted {
            envelope: event_envelope()?,
            workflow_type: String::from("checkout"),
            input: aion_core::Payload::from_json(&json!({ "cart": ["sku-1"] }))?,
            run_id: aion_core::RunId::new(uuid::Uuid::from_u128(1)),
            parent_run_id: None,
            package_version: aion_core::PackageVersion::new("a".repeat(64)),
        };

        let envelope = encode_core_value("tenant-a", Some(String::from("request-1")), &event)?;
        assert_eq!(envelope.namespace, "tenant-a");
        assert_eq!(envelope.request_id.as_deref(), Some("request-1"));

        let decoded: aion_core::Event = decode_core_value(&envelope)?;
        assert_eq!(decoded, event);
        Ok(())
    }

    // A payload-less envelope must fail with the `invalid_input` error.
    #[test]
    fn envelope_rejects_missing_payload() {
        let envelope = WireEnvelope {
            namespace: String::from("tenant-a"),
            request_id: None,
            payload: None,
        };

        let decoded = decode_core_value::<aion_core::Event>(&envelope);
        assert_eq!(
            decoded,
            Err(WireError::invalid_input("wire envelope payload is missing"))
        );
    }
}