1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use crate::error::{RpcError, RpcResult};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;

/// A wasmcloud message: a method name together with its serialized argument.
///
/// Both fields are tied to the lifetime `'m`. `arg` is a [`Cow`], so it may
/// either borrow an existing byte slice or own its buffer.
#[derive(Debug)]
pub struct Message<'m> {
    /// Message name, usually in the form 'Trait.method'
    pub method: &'m str,
    /// Parameter serialized as a byte array. If the method takes no args, the array will be
    /// zero length.
    pub arg: Cow<'m, [u8]>,
}

/// Context - message passing metadata used by wasmhost Actors and Capability Providers.
///
/// Both fields are optional; the derived `Default` leaves them as `None`.
#[derive(Default, Debug, Clone)]
pub struct Context {
    /// Messages received by Context Provider will have actor set to the actor's public key
    // NOTE(review): "Context Provider" presumably means a capability provider - confirm.
    pub actor: Option<String>,

    /// Span name/context for tracing. This is a placeholder for now
    pub span: Option<String>,
}

/// Per-message send options: hints that a transport may use to adapt how it sends a message.
///
/// Both flags default to `false`. Values are usually built by chaining the
/// setter methods, e.g. `SendOpts::default().idempotent(true)`.
#[derive(Default, Debug)]
pub struct SendOpts {
    /// Optional flag for idempotent messages - transport may perform retries within configured timeouts
    pub idempotent: bool,

    /// Optional flag for read-only messages - those that do not change the responder's state.
    /// Read-only messages may be retried within configured timeouts.
    pub read_only: bool,
}

impl SendOpts {
    /// Consumes the options and returns them with `idempotent` set to `val`;
    /// every other field is carried over unchanged.
    #[must_use]
    pub fn idempotent(self, val: bool) -> Self {
        Self {
            idempotent: val,
            ..self
        }
    }

    /// Consumes the options and returns them with `read_only` set to `val`;
    /// every other field is carried over unchanged.
    #[must_use]
    pub fn read_only(self, val: bool) -> Self {
        Self {
            read_only: val,
            ..self
        }
    }
}

/// Transport determines how messages are sent
/// Alternate implementations could be mock-server, or test-fuzz-server / test-fuzz-client
///
/// Implementors must be `Send`. The trait goes through `#[async_trait]` so that
/// `send` can be declared as an `async fn`.
#[async_trait]
pub trait Transport: Send {
    /// Sends the message `req` with the metadata in `ctx`.
    ///
    /// `opts` optionally carries per-message [`SendOpts`] hints; `None` means
    /// no hints were supplied.
    ///
    /// On success returns a byte vector (presumably the serialized response
    /// payload - confirm against implementations); on failure an [`RpcError`].
    async fn send(
        &self,
        ctx: &Context,
        req: Message<'_>,
        opts: Option<SendOpts>,
    ) -> std::result::Result<Vec<u8>, RpcError>;

    /// Sets rpc timeout
    ///
    /// Takes `&self`, so an implementation that stores the interval has to use
    /// interior mutability.
    fn set_timeout(&self, interval: std::time::Duration);
}

/// Decodes a msgpack-encoded buffer into a `T`.
///
/// This is where the serialization/deserialization mode is selected. The
/// decoded value may borrow from `buf` (lifetime `'de`).
///
/// # Errors
///
/// Returns [`RpcError::Deser`] carrying the decoder's error text when `buf`
/// cannot be decoded as a `T`.
pub fn deserialize<'de, T: Deserialize<'de>>(buf: &'de [u8]) -> Result<T, RpcError> {
    match rmp_serde::from_read_ref(buf) {
        Ok(decoded) => Ok(decoded),
        Err(err) => Err(RpcError::Deser(err.to_string())),
    }
}

/// Encodes `data` as msgpack using the "named" encoder.
///
/// For benchmarking, `rmp_serde::to_vec` is the variant that writes msgpack
/// without field names.
///
/// # Errors
///
/// Returns [`RpcError::Ser`] carrying the encoder's error text when `data`
/// cannot be serialized.
pub fn serialize<T: Serialize>(data: &T) -> Result<Vec<u8>, RpcError> {
    match rmp_serde::to_vec_named(data) {
        Ok(bytes) => Ok(bytes),
        Err(err) => Err(RpcError::Ser(err.to_string())),
    }
}

/// Dispatches a message to a handler.
///
/// Declared through `#[async_trait]` so that `dispatch` can be an `async fn`.
#[async_trait]
pub trait MessageDispatch {
    /// Handles `message` with the metadata in `ctx`.
    ///
    /// Returns a [`Message`] on success (presumably the reply to the caller -
    /// confirm against implementations), or an [`RpcError`] on failure.
    async fn dispatch(&self, ctx: &Context, message: Message<'_>) -> Result<Message<'_>, RpcError>;
}

/// Message encoding format, as reported by `message_format`.
pub enum MessageFormat {
    /// Msgpack payload (either legacy/unprefixed or prefixed).
    Msgpack,
    /// CBOR payload following a one-byte prefix.
    Cbor,
    /// The buffer was zero-length.
    Empty,
    /// The leading byte is one of the reserved prefix values.
    Unknown,
}

/// Classifies an encoded payload by inspecting its length and leading byte.
///
/// Returns the detected serialization format together with the offset at
/// which the payload proper begins: `1` when a one-byte format prefix is
/// present, `0` otherwise.
pub fn message_format(data: &[u8]) -> (MessageFormat, usize) {
    // The first byte tells a legacy msgpack payload apart from a
    // "prefix + payload" message. A single-byte buffer is always treated as
    // msgpack. A wasmbus msgpack payload of two or more bytes never starts
    // with:
    //   0x00-0x7f - msgpack small ints 0-127
    //   0xc0      - msgpack Nil
    //   0xc1      - unused by msgpack
    // which leaves all of those values free to serve as prefix bytes.
    //
    // (A small int cannot come first because a wasmbus payload holds exactly
    //  one value: a primitive, a struct, or a map. Nil cannot come first
    //  because that single value is never an Option<>, and nothing else
    //  produces a null.)
    match data {
        [] => (MessageFormat::Empty, 0),
        // 1-byte msgpack legacy
        [_] => (MessageFormat::Msgpack, 0),
        // prefix + cbor
        [0x00, ..] => (MessageFormat::Cbor, 1),
        // prefix + msgpack
        [0xc1, ..] => (MessageFormat::Msgpack, 1),
        // RESERVED prefix values
        [0x01..=0x7f, ..] | [0xc0, ..] => (MessageFormat::Unknown, 0),
        // legacy
        _ => (MessageFormat::Msgpack, 0),
    }
}

/// Signature of a CBOR decoding callback: it is handed a mutable
/// `crate::cbor::Decoder` and returns the decoded `T` or an error.
///
/// This is an unsized `dyn Fn` type, so it is used behind a pointer such as
/// `&CborDecodeFn<T>`.
pub type CborDecodeFn<T> = dyn Fn(&mut crate::cbor::Decoder<'_>) -> RpcResult<T>;

/// Decodes `buf` into a `T`, picking the decoder from the payload's format.
///
/// The format and payload offset come from `message_format`:
/// - CBOR payloads are handed to `cbor_dec`, which receives a decoder
///   positioned just past the prefix byte.
/// - Msgpack payloads (prefixed or legacy) go through `deserialize`.
/// - Anything else (an empty buffer or a reserved prefix) is rejected.
///
/// # Errors
///
/// Returns whatever error `cbor_dec` produces for CBOR input, and
/// [`RpcError::Deser`] when msgpack decoding fails or the encoding is not
/// recognized. The `Deser` messages name the target type `T` so that the
/// failing decode can be identified.
pub fn decode<T: serde::de::DeserializeOwned>(
    buf: &[u8],
    cbor_dec: &CborDecodeFn<T>,
) -> RpcResult<T> {
    match message_format(buf) {
        (MessageFormat::Cbor, offset) => {
            let mut decoder = crate::cbor::Decoder::new(&buf[offset..]);
            cbor_dec(&mut decoder)
        }
        (MessageFormat::Msgpack, offset) => deserialize(&buf[offset..]).map_err(|e| {
            // Name the target type, then append the underlying decoder error.
            RpcError::Deser(format!(
                "decoding '{}': {}",
                std::any::type_name::<T>(),
                e
            ))
        }),
        _ => Err(RpcError::Deser(format!(
            "invalid encoding for '{}'",
            std::any::type_name::<T>()
        ))),
    }
}