Skip to main content

temporalio_common/
data_converters.rs

1//! Contains traits for and default implementations of data converters, codecs, and other
2//! serialization related functionality.
3
4mod failure_converter;
5
6pub use failure_converter::{
7    ActivityExecutionDecodeHint, ChildWorkflowExecutionDecodeHint, ChildWorkflowSignalDecodeHint,
8    ChildWorkflowStartDecodeHint, DefaultFailureConverter, FailureConverter, FailureDecodeHint,
9};
10
11use crate::protos::temporal::api::common::v1::Payload;
12use futures::{FutureExt, future::BoxFuture};
13use std::{collections::HashMap, sync::Arc};
14
15/// Combines a [`PayloadConverter`], [`FailureConverter`], and [`PayloadCodec`] to handle all
16/// serialization needs for communicating with the Temporal server.
17#[derive(Clone)]
18pub struct DataConverter {
19    payload_converter: PayloadConverter,
20    #[allow(dead_code)] // Will be used for failure conversion
21    failure_converter: Arc<dyn FailureConverter + Send + Sync>,
22    codec: Arc<dyn PayloadCodec + Send + Sync>,
23}
24
25impl std::fmt::Debug for DataConverter {
26    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27        f.debug_struct("DataConverter")
28            .field("payload_converter", &self.payload_converter)
29            .finish_non_exhaustive()
30    }
31}
32
33impl DataConverter {
34    /// Create a new DataConverter with the given payload converter, failure converter, and codec.
35    pub fn new(
36        payload_converter: PayloadConverter,
37        failure_converter: impl FailureConverter + Send + Sync + 'static,
38        codec: impl PayloadCodec + Send + Sync + 'static,
39    ) -> Self {
40        Self {
41            payload_converter,
42            failure_converter: Arc::new(failure_converter),
43            codec: Arc::new(codec),
44        }
45    }
46
47    /// Serialize a value into a single payload, applying the codec.
48    pub async fn to_payload<T: TemporalSerializable + 'static>(
49        &self,
50        data: &SerializationContextData,
51        val: &T,
52    ) -> Result<Payload, PayloadConversionError> {
53        let context = SerializationContext {
54            data,
55            converter: &self.payload_converter,
56        };
57        let payload = self.payload_converter.to_payload(&context, val)?;
58        let encoded = self.codec.encode(data, vec![payload]).await;
59        encoded
60            .into_iter()
61            .next()
62            .ok_or(PayloadConversionError::WrongEncoding)
63    }
64
65    /// Deserialize a value from a single payload, applying the codec.
66    pub async fn from_payload<T: TemporalDeserializable + 'static>(
67        &self,
68        data: &SerializationContextData,
69        payload: Payload,
70    ) -> Result<T, PayloadConversionError> {
71        let context = SerializationContext {
72            data,
73            converter: &self.payload_converter,
74        };
75        let decoded = self.codec.decode(data, vec![payload]).await;
76        let payload = decoded
77            .into_iter()
78            .next()
79            .ok_or(PayloadConversionError::WrongEncoding)?;
80        self.payload_converter.from_payload(&context, payload)
81    }
82
83    /// Serialize a value into multiple payloads (e.g. for multi-arg support), applying the codec.
84    pub async fn to_payloads<T: TemporalSerializable + 'static>(
85        &self,
86        data: &SerializationContextData,
87        val: &T,
88    ) -> Result<Vec<Payload>, PayloadConversionError> {
89        let context = SerializationContext {
90            data,
91            converter: &self.payload_converter,
92        };
93        let payloads = self.payload_converter.to_payloads(&context, val)?;
94        Ok(self.codec.encode(data, payloads).await)
95    }
96
97    /// Deserialize a value from multiple payloads (e.g. for multi-arg support), applying the codec.
98    pub async fn from_payloads<T: TemporalDeserializable + 'static>(
99        &self,
100        data: &SerializationContextData,
101        payloads: Vec<Payload>,
102    ) -> Result<T, PayloadConversionError> {
103        let context = SerializationContext {
104            data,
105            converter: &self.payload_converter,
106        };
107        let decoded = self.codec.decode(data, payloads).await;
108        self.payload_converter.from_payloads(&context, decoded)
109    }
110
111    /// Returns the payload converter component of this data converter.
112    pub fn payload_converter(&self) -> &PayloadConverter {
113        &self.payload_converter
114    }
115
116    /// Returns the failure converter component of this data converter.
117    pub fn failure_converter(&self) -> &(dyn FailureConverter + Send + Sync) {
118        self.failure_converter.as_ref()
119    }
120
121    /// Decode a Temporal failure into a caller-facing Rust error surface.
122    pub fn to_error<H: FailureDecodeHint>(
123        &self,
124        context: &SerializationContextData,
125        failure: crate::protos::temporal::api::failure::v1::Failure,
126        hint: H,
127    ) -> Result<H::Output, PayloadConversionError> {
128        let normalized =
129            self.failure_converter
130                .to_error(failure, &self.payload_converter, context)?;
131        Ok(hint.adapt(normalized))
132    }
133
134    /// Encode a typed Rust error surface into a Temporal failure.
135    pub fn to_failure(
136        &self,
137        context: &SerializationContextData,
138        error: crate::error::OutgoingError,
139    ) -> crate::protos::temporal::api::failure::v1::Failure {
140        self.failure_converter
141            .to_failure(error, &self.payload_converter, context)
142    }
143
144    /// Returns the codec component of this data converter.
145    pub fn codec(&self) -> &(dyn PayloadCodec + Send + Sync) {
146        self.codec.as_ref()
147    }
148}
149
/// Identifies where a serialization operation is taking place.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SerializationContextData {
    /// The value is being (de)serialized in a workflow context.
    Workflow,
    /// The value is being (de)serialized in an activity context.
    Activity,
    /// The value is being (de)serialized in a nexus context.
    Nexus,
    /// No specific serialization context applies.
    None,
}
162
163/// Context for serialization operations, including the kind of context and the
164/// payload converter for nested serialization.
165#[derive(Clone, Copy)]
166pub struct SerializationContext<'a> {
167    /// The kind of serialization context (workflow, activity, etc.).
168    pub data: &'a SerializationContextData,
169    /// Allows nested types to serialize their contents using the same converter.
170    pub converter: &'a PayloadConverter,
171}
172/// Converts values to and from [`Payload`]s using different encoding strategies.
173#[derive(Clone)]
174pub enum PayloadConverter {
175    /// Uses a serde-based converter for encoding/decoding.
176    Serde(Arc<dyn ErasedSerdePayloadConverter>),
177    /// This variant signals the user wants to delegate to wrapper types
178    UseWrappers,
179    /// Tries multiple converters in order until one succeeds.
180    Composite(Arc<CompositePayloadConverter>),
181}
182
183impl std::fmt::Debug for PayloadConverter {
184    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185        match self {
186            PayloadConverter::Serde(_) => write!(f, "PayloadConverter::Serde(...)"),
187            PayloadConverter::UseWrappers => write!(f, "PayloadConverter::UseWrappers"),
188            PayloadConverter::Composite(_) => write!(f, "PayloadConverter::Composite(...)"),
189        }
190    }
191}
192impl PayloadConverter {
193    /// Create a payload converter that uses JSON serialization via serde.
194    pub fn serde_json() -> Self {
195        Self::Serde(Arc::new(SerdeJsonPayloadConverter))
196    }
197    // TODO [rust-sdk-branch]: Proto binary, other standard built-ins
198}
199
200impl Default for PayloadConverter {
201    fn default() -> Self {
202        Self::Composite(Arc::new(CompositePayloadConverter {
203            converters: vec![Self::UseWrappers, Self::serde_json()],
204        }))
205    }
206}
207
/// Failure modes of converting a value to or from a payload.
#[derive(Debug)]
pub enum PayloadConversionError {
    /// The converter does not handle this payload's encoding (or this value). Composite
    /// converters treat this as "try the next converter".
    WrongEncoding,
    /// Encoding or decoding was attempted and failed; carries the underlying error.
    EncodingError(Box<dyn std::error::Error + Send + Sync>),
}
216
217impl std::fmt::Display for PayloadConversionError {
218    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219        match self {
220            PayloadConversionError::WrongEncoding => write!(f, "Wrong encoding"),
221            PayloadConversionError::EncodingError(err) => write!(f, "Encoding error: {}", err),
222        }
223    }
224}
225
226impl std::error::Error for PayloadConversionError {
227    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
228        match self {
229            PayloadConversionError::WrongEncoding => None,
230            PayloadConversionError::EncodingError(err) => Some(err.as_ref()),
231        }
232    }
233}
234
235/// Encodes and decodes payloads, enabling encryption or compression.
236pub trait PayloadCodec {
237    /// Encode payloads before they are sent to the server.
238    fn encode(
239        &self,
240        context: &SerializationContextData,
241        payloads: Vec<Payload>,
242    ) -> BoxFuture<'static, Vec<Payload>>;
243    /// Decode payloads after they are received from the server.
244    fn decode(
245        &self,
246        context: &SerializationContextData,
247        payloads: Vec<Payload>,
248    ) -> BoxFuture<'static, Vec<Payload>>;
249}
250
251impl<T: PayloadCodec> PayloadCodec for Arc<T> {
252    fn encode(
253        &self,
254        context: &SerializationContextData,
255        payloads: Vec<Payload>,
256    ) -> BoxFuture<'static, Vec<Payload>> {
257        (**self).encode(context, payloads)
258    }
259    fn decode(
260        &self,
261        context: &SerializationContextData,
262        payloads: Vec<Payload>,
263    ) -> BoxFuture<'static, Vec<Payload>> {
264        (**self).decode(context, payloads)
265    }
266}
267
/// Pass-through codec: payloads are returned exactly as they were given, in both directions.
pub struct DefaultPayloadCodec;
270
271/// Indicates some type can be serialized for use with Temporal.
272///
273/// You don't need to implement this unless you are using a non-serde-compatible custom converter,
274/// in which case you should implement the to/from_payload functions on some wrapper type.
275pub trait TemporalSerializable {
276    /// Return a reference to this value as a serde-serializable trait object.
277    fn as_serde(&self) -> Result<&dyn erased_serde::Serialize, PayloadConversionError> {
278        Err(PayloadConversionError::WrongEncoding)
279    }
280    /// Convert this value into a single [`Payload`].
281    fn to_payload(&self, _: &SerializationContext<'_>) -> Result<Payload, PayloadConversionError> {
282        Err(PayloadConversionError::WrongEncoding)
283    }
284    /// Convert to multiple payloads. Override this for types representing multiple arguments.
285    fn to_payloads(
286        &self,
287        ctx: &SerializationContext<'_>,
288    ) -> Result<Vec<Payload>, PayloadConversionError> {
289        Ok(vec![self.to_payload(ctx)?])
290    }
291}
292
293/// Indicates some type can be deserialized for use with Temporal.
294///
295/// You don't need to implement this unless you are using a non-serde-compatible custom converter,
296/// in which case you should implement the to/from_payload functions on some wrapper type.
297pub trait TemporalDeserializable: Sized {
298    /// Deserialize from a serde-based payload converter.
299    fn from_serde(
300        _: &dyn ErasedSerdePayloadConverter,
301        _ctx: &SerializationContext<'_>,
302        _: Payload,
303    ) -> Result<Self, PayloadConversionError> {
304        Err(PayloadConversionError::WrongEncoding)
305    }
306    /// Deserialize from a single [`Payload`].
307    fn from_payload(
308        ctx: &SerializationContext<'_>,
309        payload: Payload,
310    ) -> Result<Self, PayloadConversionError> {
311        let _ = (ctx, payload);
312        Err(PayloadConversionError::WrongEncoding)
313    }
314    /// Convert from multiple payloads. Override this for types representing multiple arguments.
315    fn from_payloads(
316        ctx: &SerializationContext<'_>,
317        payloads: Vec<Payload>,
318    ) -> Result<Self, PayloadConversionError> {
319        if payloads.len() != 1 {
320            return Err(PayloadConversionError::WrongEncoding);
321        }
322        Self::from_payload(ctx, payloads.into_iter().next().unwrap())
323    }
324}
325
326/// A codec-decoded set of payloads that can be deserialized later with to a user provided type.
327#[derive(Clone, Debug)]
328pub struct DecodablePayloads {
329    payloads: Vec<Payload>,
330    payload_converter: PayloadConverter,
331    context: SerializationContextData,
332}
333
334impl DecodablePayloads {
335    /// Create a new decodable payload set from raw payloads and the converter context needed to
336    /// deserialize them later.
337    pub fn new(
338        payloads: Vec<Payload>,
339        payload_converter: PayloadConverter,
340        context: SerializationContextData,
341    ) -> Self {
342        Self {
343            payloads,
344            payload_converter,
345            context,
346        }
347    }
348
349    /// Deserialize these payloads into a typed value using the stored payload converter.
350    pub fn deserialize<T: TemporalDeserializable + 'static>(
351        &self,
352    ) -> Result<T, PayloadConversionError> {
353        self.payload_converter.from_payloads(
354            &SerializationContext {
355                data: &self.context,
356                converter: &self.payload_converter,
357            },
358            self.payloads.clone(),
359        )
360    }
361
362    /// Returns the underlying payloads.
363    pub fn raw(&self) -> &[Payload] {
364        &self.payloads
365    }
366
367    /// Consume this value and return the underlying payloads as a [`RawValue`].
368    pub fn into_raw(self) -> RawValue {
369        RawValue::new(self.payloads)
370    }
371}
372
373/// An unconverted set of payloads, used when the caller wants to defer deserialization.
374#[derive(Clone, Debug, Default)]
375pub struct RawValue {
376    /// The underlying payloads.
377    pub payloads: Vec<Payload>,
378}
379impl RawValue {
380    /// A RawValue representing no meaningful data, containing a single default payload.
381    /// This ensures the value can still be serialized as a single payload.
382    pub fn empty() -> Self {
383        Self {
384            payloads: vec![Payload::default()],
385        }
386    }
387
388    /// Create a new RawValue from a vector of payloads.
389    pub fn new(payloads: Vec<Payload>) -> Self {
390        Self { payloads }
391    }
392
393    /// Create a [`RawValue`] by serializing a value with the given converter.
394    pub fn from_value<T: TemporalSerializable + 'static>(
395        value: &T,
396        converter: &PayloadConverter,
397    ) -> RawValue {
398        RawValue::new(vec![
399            converter
400                .to_payload(
401                    &SerializationContext {
402                        data: &SerializationContextData::None,
403                        converter,
404                    },
405                    value,
406                )
407                .unwrap(),
408        ])
409    }
410
411    /// Deserialize this [`RawValue`] into a typed value using the given converter.
412    pub fn to_value<T: TemporalDeserializable + 'static>(self, converter: &PayloadConverter) -> T {
413        converter
414            .from_payload(
415                &SerializationContext {
416                    data: &SerializationContextData::None,
417                    converter,
418                },
419                self.payloads.into_iter().next().unwrap(),
420            )
421            .unwrap()
422    }
423}
424
425impl TemporalSerializable for RawValue {
426    fn to_payload(&self, _: &SerializationContext<'_>) -> Result<Payload, PayloadConversionError> {
427        Ok(self.payloads.first().cloned().unwrap_or_default())
428    }
429    fn to_payloads(
430        &self,
431        _: &SerializationContext<'_>,
432    ) -> Result<Vec<Payload>, PayloadConversionError> {
433        Ok(self.payloads.clone())
434    }
435}
436
437impl TemporalDeserializable for RawValue {
438    fn from_payload(
439        _: &SerializationContext<'_>,
440        p: Payload,
441    ) -> Result<Self, PayloadConversionError> {
442        Ok(RawValue { payloads: vec![p] })
443    }
444    fn from_payloads(
445        _: &SerializationContext<'_>,
446        payloads: Vec<Payload>,
447    ) -> Result<Self, PayloadConversionError> {
448        Ok(RawValue { payloads })
449    }
450}
451
452/// Generic interface for converting between typed values and [`Payload`]s.
453pub trait GenericPayloadConverter {
454    /// Serialize a value into a single [`Payload`].
455    fn to_payload<T: TemporalSerializable + 'static>(
456        &self,
457        context: &SerializationContext<'_>,
458        val: &T,
459    ) -> Result<Payload, PayloadConversionError>;
460    /// Deserialize a value from a single [`Payload`].
461    #[allow(clippy::wrong_self_convention)]
462    fn from_payload<T: TemporalDeserializable + 'static>(
463        &self,
464        context: &SerializationContext<'_>,
465        payload: Payload,
466    ) -> Result<T, PayloadConversionError>;
467    /// Serialize a value into multiple [`Payload`]s.
468    fn to_payloads<T: TemporalSerializable + 'static>(
469        &self,
470        context: &SerializationContext<'_>,
471        val: &T,
472    ) -> Result<Vec<Payload>, PayloadConversionError> {
473        Ok(vec![self.to_payload(context, val)?])
474    }
475    /// Deserialize a value from multiple [`Payload`]s.
476    #[allow(clippy::wrong_self_convention)]
477    fn from_payloads<T: TemporalDeserializable + 'static>(
478        &self,
479        context: &SerializationContext<'_>,
480        payloads: Vec<Payload>,
481    ) -> Result<T, PayloadConversionError> {
482        if payloads.len() != 1 {
483            return Err(PayloadConversionError::WrongEncoding);
484        }
485        self.from_payload(context, payloads.into_iter().next().unwrap())
486    }
487}
488
489impl GenericPayloadConverter for PayloadConverter {
490    fn to_payload<T: TemporalSerializable + 'static>(
491        &self,
492        context: &SerializationContext<'_>,
493        val: &T,
494    ) -> Result<Payload, PayloadConversionError> {
495        // If a single payload is explicitly needed for `()`, then produce a null payload
496        if std::any::TypeId::of::<T>() == std::any::TypeId::of::<()>() {
497            return Ok(Payload {
498                metadata: {
499                    let mut hm = HashMap::new();
500                    hm.insert("encoding".to_string(), b"binary/null".to_vec());
501                    hm
502                },
503                data: vec![],
504                external_payloads: vec![],
505            });
506        }
507        let mut payloads = self.to_payloads(context, val)?;
508        if payloads.len() != 1 {
509            return Err(PayloadConversionError::WrongEncoding);
510        }
511        Ok(payloads.pop().unwrap())
512    }
513
514    fn from_payload<T: TemporalDeserializable + 'static>(
515        &self,
516        context: &SerializationContext<'_>,
517        payload: Payload,
518    ) -> Result<T, PayloadConversionError> {
519        self.from_payloads(context, vec![payload])
520    }
521
522    fn to_payloads<T: TemporalSerializable + 'static>(
523        &self,
524        context: &SerializationContext<'_>,
525        val: &T,
526    ) -> Result<Vec<Payload>, PayloadConversionError> {
527        match self {
528            PayloadConverter::Serde(pc) => {
529                // Since Rust SDK uses () to denote no input, we must match other SDKs by producing
530                // no payloads for it.
531                if std::any::TypeId::of::<T>() == std::any::TypeId::of::<()>() {
532                    Ok(Vec::new())
533                } else {
534                    Ok(vec![pc.to_payload(context.data, val.as_serde()?)?])
535                }
536            }
537            PayloadConverter::UseWrappers => T::to_payloads(val, context),
538            PayloadConverter::Composite(composite) => {
539                for converter in &composite.converters {
540                    match converter.to_payloads(context, val) {
541                        Ok(payloads) => return Ok(payloads),
542                        Err(PayloadConversionError::WrongEncoding) => continue,
543                        Err(e) => return Err(e),
544                    }
545                }
546                Err(PayloadConversionError::WrongEncoding)
547            }
548        }
549    }
550
551    fn from_payloads<T: TemporalDeserializable + 'static>(
552        &self,
553        context: &SerializationContext<'_>,
554        payloads: Vec<Payload>,
555    ) -> Result<T, PayloadConversionError> {
556        // Accept empty payloads (no args) and a single binary/null payload (result from a
557        // workflow/update with () return type as ().
558        if std::any::TypeId::of::<T>() == std::any::TypeId::of::<()>()
559            && is_unit_payloads(&payloads)
560        {
561            let boxed: Box<dyn std::any::Any> = Box::new(());
562            return Ok(*boxed.downcast::<T>().unwrap());
563        }
564
565        match self {
566            PayloadConverter::Serde(pc) => {
567                if payloads.len() != 1 {
568                    return Err(PayloadConversionError::WrongEncoding);
569                }
570                T::from_serde(pc.as_ref(), context, payloads.into_iter().next().unwrap())
571            }
572            PayloadConverter::UseWrappers => T::from_payloads(context, payloads),
573            PayloadConverter::Composite(composite) => {
574                for converter in &composite.converters {
575                    match converter.from_payloads(context, payloads.clone()) {
576                        Ok(val) => return Ok(val),
577                        Err(PayloadConversionError::WrongEncoding) => continue,
578                        Err(e) => return Err(e),
579                    }
580                }
581                Err(PayloadConversionError::WrongEncoding)
582            }
583        }
584    }
585}
586
587fn is_unit_payloads(payloads: &[Payload]) -> bool {
588    match payloads {
589        [] => true,
590        [payload] => {
591            payload.data.is_empty()
592                && payload
593                    .metadata
594                    .get("encoding")
595                    .map(|encoding| encoding == b"binary/null")
596                    .unwrap_or(false)
597        }
598        _ => false,
599    }
600}
601
602// TODO [rust-sdk-branch]: Potentially allow opt-out / no-serde compile flags
603impl<T> TemporalSerializable for T
604where
605    T: serde::Serialize,
606{
607    fn as_serde(&self) -> Result<&dyn erased_serde::Serialize, PayloadConversionError> {
608        Ok(self)
609    }
610}
611impl<T> TemporalDeserializable for T
612where
613    T: serde::de::DeserializeOwned,
614{
615    fn from_serde(
616        pc: &dyn ErasedSerdePayloadConverter,
617        context: &SerializationContext<'_>,
618        payload: Payload,
619    ) -> Result<Self, PayloadConversionError>
620    where
621        Self: Sized,
622    {
623        let mut de = pc.from_payload(context.data, payload)?;
624        erased_serde::deserialize(&mut de)
625            .map_err(|e| PayloadConversionError::EncodingError(Box::new(e)))
626    }
627}
628
629struct SerdeJsonPayloadConverter;
630impl ErasedSerdePayloadConverter for SerdeJsonPayloadConverter {
631    fn to_payload(
632        &self,
633        _: &SerializationContextData,
634        value: &dyn erased_serde::Serialize,
635    ) -> Result<Payload, PayloadConversionError> {
636        let as_json = serde_json::to_vec(value)
637            .map_err(|e| PayloadConversionError::EncodingError(e.into()))?;
638        Ok(Payload {
639            metadata: {
640                let mut hm = HashMap::new();
641                hm.insert("encoding".to_string(), b"json/plain".to_vec());
642                hm
643            },
644            data: as_json,
645            external_payloads: vec![],
646        })
647    }
648
649    fn from_payload(
650        &self,
651        _: &SerializationContextData,
652        payload: Payload,
653    ) -> Result<Box<dyn erased_serde::Deserializer<'static>>, PayloadConversionError> {
654        let encoding = payload.metadata.get("encoding").map(|v| v.as_slice());
655        if encoding != Some(b"json/plain".as_slice()) {
656            return Err(PayloadConversionError::WrongEncoding);
657        }
658        let json_v: serde_json::Value = serde_json::from_slice(&payload.data)
659            .map_err(|e| PayloadConversionError::EncodingError(Box::new(e)))?;
660        Ok(Box::new(<dyn erased_serde::Deserializer>::erase(json_v)))
661    }
662}
663/// Type-erased serde-based payload converter for use behind `dyn` trait objects.
664pub trait ErasedSerdePayloadConverter: Send + Sync {
665    /// Serialize a type-erased serde value into a [`Payload`].
666    fn to_payload(
667        &self,
668        context: &SerializationContextData,
669        value: &dyn erased_serde::Serialize,
670    ) -> Result<Payload, PayloadConversionError>;
671    /// Deserialize a [`Payload`] into a type-erased serde deserializer.
672    #[allow(clippy::wrong_self_convention)]
673    fn from_payload(
674        &self,
675        context: &SerializationContextData,
676        payload: Payload,
677    ) -> Result<Box<dyn erased_serde::Deserializer<'static>>, PayloadConversionError>;
678}
679
680// TODO [rust-sdk-branch]: All prost things should be behind a compile flag
681
682/// Wrapper for protobuf messages that implements [`TemporalSerializable`]/[`TemporalDeserializable`]
683/// using `binary/protobuf` encoding.
684pub struct ProstSerializable<T: prost::Message>(pub T);
685impl<T> TemporalSerializable for ProstSerializable<T>
686where
687    T: prost::Message + Default + 'static,
688{
689    fn to_payload(&self, _: &SerializationContext<'_>) -> Result<Payload, PayloadConversionError> {
690        let as_proto = prost::Message::encode_to_vec(&self.0);
691        Ok(Payload {
692            metadata: {
693                let mut hm = HashMap::new();
694                hm.insert("encoding".to_string(), b"binary/protobuf".to_vec());
695                hm
696            },
697            data: as_proto,
698            external_payloads: vec![],
699        })
700    }
701}
702impl<T> TemporalDeserializable for ProstSerializable<T>
703where
704    T: prost::Message + Default + 'static,
705{
706    fn from_payload(
707        _: &SerializationContext<'_>,
708        p: Payload,
709    ) -> Result<Self, PayloadConversionError>
710    where
711        Self: Sized,
712    {
713        let encoding = p.metadata.get("encoding").map(|v| v.as_slice());
714        if encoding != Some(b"binary/protobuf".as_slice()) {
715            return Err(PayloadConversionError::WrongEncoding);
716        }
717        T::decode(p.data.as_slice())
718            .map(ProstSerializable)
719            .map_err(|e| PayloadConversionError::EncodingError(Box::new(e)))
720    }
721}
722
723/// A payload converter that delegates to an ordered list of inner converters.
724#[derive(Clone)]
725pub struct CompositePayloadConverter {
726    converters: Vec<PayloadConverter>,
727}
728
729impl Default for DataConverter {
730    fn default() -> Self {
731        Self::new(
732            PayloadConverter::default(),
733            DefaultFailureConverter,
734            DefaultPayloadCodec,
735        )
736    }
737}
738impl PayloadCodec for DefaultPayloadCodec {
739    fn encode(
740        &self,
741        _: &SerializationContextData,
742        payloads: Vec<Payload>,
743    ) -> BoxFuture<'static, Vec<Payload>> {
744        async move { payloads }.boxed()
745    }
746    fn decode(
747        &self,
748        _: &SerializationContextData,
749        payloads: Vec<Payload>,
750    ) -> BoxFuture<'static, Vec<Payload>> {
751        async move { payloads }.boxed()
752    }
753}
754
/// Generates a tuple-struct wrapper representing multiple arguments for workflows/activities
/// that accept more than one argument, for interoperating with other language SDKs that allow
/// multiple arguments.
///
/// Each generated type serializes to one payload per field and deserializes from exactly
/// `$count` payloads; its single-payload methods always report `WrongEncoding`.
macro_rules! impl_multi_args {
    ($name:ident; $count:expr; $($idx:tt: $ty:ident),+) => {
        #[doc = concat!("Wrapper for ", stringify!($count), " typed arguments, enabling multi-arg serialization.")]
        #[derive(Clone, Debug, PartialEq, Eq)]
        pub struct $name<$($ty),+>($(pub $ty),+);

        #[allow(non_snake_case)]
        impl<$($ty),+> From<($($ty),+,)> for $name<$($ty),+> {
            fn from(tuple: ($($ty),+,)) -> Self {
                Self($(tuple.$idx),+)
            }
        }

        impl<$($ty: TemporalSerializable + 'static),+> TemporalSerializable for $name<$($ty),+> {
            fn to_payload(
                &self,
                _ctx: &SerializationContext<'_>,
            ) -> Result<Payload, PayloadConversionError> {
                Err(PayloadConversionError::WrongEncoding)
            }

            fn to_payloads(
                &self,
                ctx: &SerializationContext<'_>,
            ) -> Result<Vec<Payload>, PayloadConversionError> {
                // One payload per field, in field order, each produced by the context's converter.
                let mut encoded = Vec::with_capacity($count);
                $(encoded.push(ctx.converter.to_payload(ctx, &self.$idx)?);)+
                Ok(encoded)
            }
        }

        impl<$($ty: TemporalDeserializable + 'static),+> TemporalDeserializable for $name<$($ty),+> {
            fn from_payload(
                _ctx: &SerializationContext<'_>,
                _payload: Payload,
            ) -> Result<Self, PayloadConversionError> {
                Err(PayloadConversionError::WrongEncoding)
            }

            fn from_payloads(
                ctx: &SerializationContext<'_>,
                payloads: Vec<Payload>,
            ) -> Result<Self, PayloadConversionError> {
                if payloads.len() != $count {
                    return Err(PayloadConversionError::WrongEncoding);
                }
                // The length check above guarantees one payload per field below.
                let mut remaining = payloads.into_iter();
                Ok($name(
                    $(ctx.converter.from_payload::<$ty>(ctx, remaining.next().unwrap())?),+
                ))
            }
        }
    };
}
807
808impl_multi_args!(MultiArgs2; 2; 0: A, 1: B);
809impl_multi_args!(MultiArgs3; 3; 0: A, 1: B, 2: C);
810impl_multi_args!(MultiArgs4; 4; 0: A, 1: B, 2: C, 3: D);
811impl_multi_args!(MultiArgs5; 5; 0: A, 1: B, 2: C, 3: D, 4: E);
812impl_multi_args!(MultiArgs6; 6; 0: A, 1: B, 2: C, 3: D, 4: E, 5: F);
813
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty payload list must decode to `()` with the default converter.
    #[test]
    fn test_empty_payloads_as_unit_type() {
        let converter = PayloadConverter::default();
        let ctx = SerializationContext {
            data: &SerializationContextData::Workflow,
            converter: &converter,
        };

        let decoded: Result<(), _> = converter.from_payloads(&ctx, Vec::<Payload>::new());

        assert!(decoded.is_ok(), "Empty payloads should deserialize as ()");
    }

    /// With `PayloadConverter::serde_json()`, `()` encodes to zero payloads and decodes back.
    #[test]
    fn test_unit_type_roundtrip_serde() {
        let converter = PayloadConverter::serde_json();
        let ctx = SerializationContext {
            data: &SerializationContextData::Workflow,
            converter: &converter,
        };

        let payloads = converter.to_payloads(&ctx, &()).unwrap();
        assert!(payloads.is_empty());

        // The typed binding is the assertion: decoding must succeed and yield `()`.
        // Comparing two unit values afterwards could never fail, so it is omitted.
        let _: () = converter.from_payloads(&ctx, payloads).unwrap();
    }

    /// With the default converter, `()` encodes to zero payloads and decodes back.
    #[test]
    fn test_unit_composite_roundtrip() {
        let converter = PayloadConverter::default();
        let ctx = SerializationContext {
            data: &SerializationContextData::Workflow,
            converter: &converter,
        };

        let payloads = converter.to_payloads(&ctx, &()).unwrap();
        assert!(payloads.is_empty());

        // The typed binding is the assertion: decoding must succeed and yield `()`.
        let _: () = converter.from_payloads(&ctx, payloads).unwrap();
    }

    /// The single-payload form of `()` is recognised by `is_unit_payloads` and decodes back
    /// through `from_payload`.
    #[test]
    fn test_unit_to_payload_roundtrip() {
        let converter = PayloadConverter::default();
        let ctx = SerializationContext {
            data: &SerializationContextData::Workflow,
            converter: &converter,
        };

        let mut payloads = vec![converter.to_payload(&ctx, &()).unwrap()];
        assert!(is_unit_payloads(&payloads));

        // The vector was built with exactly one element, so `pop` cannot return `None` here.
        let only_payload = payloads.pop().unwrap();
        // The typed binding is the assertion: decoding must succeed and yield `()`.
        let _: () = converter.from_payload(&ctx, only_payload).unwrap();
    }

    /// Encoding `()` directly through `PayloadConverter::UseWrappers` reports `WrongEncoding`.
    #[test]
    fn test_unit_use_wrappers_returns_wrong_encoding() {
        let converter = PayloadConverter::UseWrappers;
        let ctx = SerializationContext {
            data: &SerializationContextData::Workflow,
            converter: &converter,
        };

        let result = converter.to_payloads(&ctx, &());
        assert!(
            matches!(result, Err(PayloadConversionError::WrongEncoding)),
            "{result:?}"
        );
    }

    /// A `MultiArgs2` value encodes to one payload per field and decodes to an equal value.
    #[test]
    fn multi_args_round_trip() {
        let converter = PayloadConverter::default();
        let ctx = SerializationContext {
            data: &SerializationContextData::Workflow,
            converter: &converter,
        };

        let args = MultiArgs2("hello".to_string(), 42i32);
        let payloads = converter.to_payloads(&ctx, &args).unwrap();
        assert_eq!(payloads.len(), 2);

        let decoded: MultiArgs2<String, i32> = converter.from_payloads(&ctx, payloads).unwrap();
        assert_eq!(decoded, args);
    }

    /// A plain tuple converts into the matching `MultiArgs2` via `Into`.
    #[test]
    fn multi_args_from_tuple() {
        let tuple = ("hello".to_string(), 42i32);
        let converted: MultiArgs2<String, i32> = tuple.into();
        assert_eq!(converted, MultiArgs2("hello".to_string(), 42));
    }

    /// Serializes `value` with the default converter under a workflow context and wraps the
    /// resulting payloads in a [`DecodablePayloads`] that uses the same converter and context.
    fn decodable_from_value<T>(value: &T) -> DecodablePayloads
    where
        T: TemporalSerializable + 'static,
    {
        let converter = PayloadConverter::default();
        // Scope the context so its borrow of `converter` has ended before the converter is
        // moved into the `DecodablePayloads` below.
        let payloads = {
            let ctx = SerializationContext {
                data: &SerializationContextData::Workflow,
                converter: &converter,
            };
            converter.to_payloads(&ctx, value).unwrap()
        };
        DecodablePayloads::new(payloads, converter, SerializationContextData::Workflow)
    }

    /// A `String` survives a trip through `DecodablePayloads::deserialize`.
    #[test]
    fn decodable_payloads_roundtrip_string() {
        let original = "hello".to_string();

        let decoded: String = decodable_from_value(&original).deserialize().unwrap();

        assert_eq!(decoded, "hello");
    }

    /// A `Some(String)` survives a trip through `DecodablePayloads::deserialize`.
    #[test]
    fn decodable_payloads_roundtrip_option_string() {
        let original = Some("hello".to_string());

        let decoded: Option<String> = decodable_from_value(&original).deserialize().unwrap();

        assert_eq!(decoded, Some("hello".to_string()));
    }

    /// `()` survives a trip through `DecodablePayloads::deserialize`.
    #[test]
    fn decodable_payloads_roundtrip_unit() {
        let payloads = decodable_from_value(&());

        // The typed binding is the assertion: decoding must succeed and yield `()`.
        let _: () = payloads.deserialize().unwrap();
    }

    /// A `Vec<String>` survives a trip through `DecodablePayloads::deserialize`.
    #[test]
    fn decodable_payloads_roundtrip_vec_string() {
        let original = vec!["hello".to_string(), "world".to_string()];

        let decoded: Vec<String> = decodable_from_value(&original).deserialize().unwrap();

        assert_eq!(decoded, vec!["hello".to_string(), "world".to_string()]);
    }
}