wasmbus_rpc/
common.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
use crate::error::{RpcError, RpcResult};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, fmt};

pub use crate::document::{
    decode_document, decode_number, encode_document, encode_document_ref, encode_number, Document,
    DocumentRef, Number,
};

/// A wasmcloud message: a method name together with its serialized argument.
///
/// `method` borrows from the caller for lifetime `'m`; `arg` is a [`Cow`], so it can either
/// borrow a byte slice for `'m` or own its bytes.
#[derive(Debug)]
pub struct Message<'m> {
    /// Message name, usually in the form 'Trait.method'.
    pub method: &'m str,
    /// Parameter serialized as a byte array. If the method takes no args, the array will be
    /// zero length.
    pub arg: Cow<'m, [u8]>,
}

/// Context - message passing metadata used by wasmhost Actors and Capability Providers.
///
/// Both fields are optional; `Context::default()` leaves them as `None`.
#[derive(Default, Debug, Clone)]
pub struct Context {
    // NOTE(review): "Context Provider" below presumably means "Capability Provider" — confirm.
    /// Messages received by Context Provider will have actor set to the actor's public key.
    pub actor: Option<String>,

    /// Span name/context for tracing. This is a placeholder for now.
    pub span: Option<String>,
}

/// Send options: hints that a transport may use to adapt how it sends a message.
///
/// Both flags are `false` by default. Values are built fluently, e.g.
/// `SendOpts::default().idempotent(true).read_only(true)`.
#[derive(Default, Debug)]
pub struct SendOpts {
    /// Marks the message as idempotent - the transport may retry it within configured timeouts.
    pub idempotent: bool,

    /// Marks the message as read-only, i.e. it does not change the responder's state.
    /// Read-only messages may be retried within configured timeouts.
    pub read_only: bool,
}

impl SendOpts {
    /// Consumes the options and returns them with the `idempotent` flag set to `val`.
    #[must_use]
    pub fn idempotent(self, val: bool) -> Self {
        Self {
            idempotent: val,
            ..self
        }
    }

    /// Consumes the options and returns them with the `read_only` flag set to `val`.
    #[must_use]
    pub fn read_only(self, val: bool) -> Self {
        Self {
            read_only: val,
            ..self
        }
    }
}

/// Transport determines how messages are sent.
/// Alternate implementations could be mock-server, or test-fuzz-server / test-fuzz-client.
///
/// Implementors must be `Send + Sync + Clone`.
#[async_trait]
pub trait Transport: Send + Sync + Clone {
    /// Sends `req` and resolves to a byte vector on success, or an [`RpcError`] on failure.
    ///
    /// * `ctx` - message passing metadata accompanying the request
    /// * `req` - method name and serialized argument
    /// * `opts` - optional [`SendOpts`] hints for the transport; how `None` is treated is up
    ///   to the implementation (presumably its defaults apply — confirm with implementors)
    ///
    /// The returned bytes are presumably the responder's serialized reply — confirm.
    async fn send(
        &self,
        ctx: &Context,
        req: Message<'_>,
        opts: Option<SendOpts>,
    ) -> Result<Vec<u8>, RpcError>;

    /// Sets rpc timeout.
    ///
    /// Takes `&self`, so an implementation that stores the interval needs interior mutability.
    fn set_timeout(&self, interval: std::time::Duration);
}

// select serialization/deserialization mode

/// Decodes a value of type `T` from the msgpack bytes in `buf`.
///
/// # Errors
///
/// Returns [`RpcError::Deser`] carrying the decoder's error text if `buf` cannot be decoded.
pub fn deserialize<'de, T>(buf: &'de [u8]) -> Result<T, RpcError>
where
    T: Deserialize<'de>,
{
    match rmp_serde::from_slice(buf) {
        Ok(value) => Ok(value),
        Err(err) => Err(RpcError::Deser(err.to_string())),
    }
}

/// Encodes `data` as msgpack, keeping field names in the output.
///
/// # Errors
///
/// Returns [`RpcError::Ser`] carrying the encoder's error text if `data` cannot be encoded.
pub fn serialize<T>(data: &T) -> Result<Vec<u8>, RpcError>
where
    T: Serialize,
{
    // for benchmarking: `rmp_serde::to_vec(data)` uses msgpack without field names
    match rmp_serde::to_vec_named(data) {
        Ok(bytes) => Ok(bytes),
        Err(err) => Err(RpcError::Ser(err.to_string())),
    }
}

/// Asynchronous message handler: given a [`Context`] and a [`Message`], produces a byte
/// vector or an [`RpcError`].
// NOTE(review): presumably implemented by message receivers to route on `message.method` — confirm.
#[async_trait]
pub trait MessageDispatch {
    /// Handles `message` with metadata `ctx`, resolving to the resulting bytes or an error.
    async fn dispatch(&self, ctx: &Context, message: Message<'_>) -> Result<Vec<u8>, RpcError>;
}

/// Message encoding format
#[derive(Clone, Eq, PartialEq)]
pub enum MessageFormat {
    /// msgpack payload; its header prefix byte is `0xc1`
    Msgpack,
    /// cbor payload; its header prefix byte is `0x7f`
    Cbor,
    /// no header is written for this format
    Empty,
    /// no header is written for this format
    Unknown,
}

impl fmt::Display for MessageFormat {
    /// Writes the lowercase format name: `msgpack`, `cbor`, `empty` or `unknown`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            MessageFormat::Msgpack => "msgpack",
            MessageFormat::Cbor => "cbor",
            MessageFormat::Empty => "empty",
            MessageFormat::Unknown => "unknown",
        })
    }
}

impl fmt::Debug for MessageFormat {
    /// Debug output is the same text as `Display`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate directly instead of allocating an intermediate `String`.
        fmt::Display::fmt(self, f)
    }
}

impl MessageFormat {
    /// Writes this format's one-byte header prefix to `buf` and returns the number of bytes
    /// written: `1` for `Cbor` and `Msgpack`, `0` for `Empty` and `Unknown` (which write
    /// nothing and do not touch `buf`).
    ///
    /// # Errors
    ///
    /// Returns the writer's I/O error if the prefix byte cannot be written in full.
    pub fn write_header<W: std::io::Write>(&self, mut buf: W) -> std::io::Result<usize> {
        let prefix: u8 = match self {
            MessageFormat::Cbor => 0x7f,
            MessageFormat::Msgpack => 0xc1,
            MessageFormat::Empty | MessageFormat::Unknown => return Ok(0),
        };
        // `write_all` (rather than `write`) retries on `ErrorKind::Interrupted` and turns a
        // zero-length write into an error, so `Ok(1)` always means the byte was emitted.
        buf.write_all(&[prefix])?;
        Ok(1)
    }
}

/// Detects the serialization format of `data`.
///
/// Returns the format together with the offset at which the payload begins
/// (`1` when a one-byte prefix is present, otherwise `0`).
pub fn message_format(data: &[u8]) -> (MessageFormat, usize) {
    // The leading byte tells a legacy msgpack payload apart from "prefix + payload".
    // A payload of exactly one byte must be msgpack.
    // A wasmbus msgpack payload of 2 or more bytes never starts with:
    //   0x00-0x7f - msgpack encoding of small ints 0-127
    //   0xc0      - msgpack Nil
    //   0xc1      - unused by msgpack
    // so all of those values are available as the first byte of a prefix.
    //
    // (A small int cannot come first because a wasmbus payload holds exactly one
    //  value: a primitive type, a struct or a map. Nil cannot come first because
    //  that single value is never an Option<>, and nothing else produces a null.)
    match data {
        [] => (MessageFormat::Empty, 0),
        [_] => (MessageFormat::Msgpack, 0), // 1-byte msgpack legacy
        [0x7f, ..] => (MessageFormat::Cbor, 1), // prefix + cbor
        [0xc1, ..] => (MessageFormat::Msgpack, 1), // prefix + msgpack
        [0x00..=0x7e, ..] | [0xc0, ..] => (MessageFormat::Unknown, 0), // RESERVED
        _ => (MessageFormat::Msgpack, 0), // legacy
    }
}

/// Signature of a CBOR decoding callback: takes a mutable [`crate::cbor::Decoder`] of any
/// lifetime and returns an `RpcResult<T>`.
pub type CborDecodeFn<T> = dyn Fn(&mut crate::cbor::Decoder<'_>) -> RpcResult<T>;

/// Same shape as [`CborDecodeFn`], but the decoder's lifetime is fixed to `'de`.
// NOTE(review): presumably this lets `T` borrow from the buffer being decoded — confirm.
pub type CborDecodeFnLt<'de, T> = dyn Fn(&mut crate::cbor::Decoder<'de>) -> RpcResult<T>;

/// Decodes `buf` into a `T`, choosing the decoder from the format reported by
/// [`message_format`].
///
/// * A cbor payload is decoded by calling `cbor_dec` on the bytes after the prefix.
/// * A msgpack payload (with or without prefix) is decoded with [`deserialize`].
///
/// # Errors
///
/// Returns the error produced by `cbor_dec`, or an [`RpcError::Deser`] if msgpack decoding
/// fails or the format is neither cbor nor msgpack (including an empty buffer).
pub fn decode<T: serde::de::DeserializeOwned>(
    buf: &[u8],
    cbor_dec: &CborDecodeFn<T>,
) -> RpcResult<T> {
    let value = match message_format(buf) {
        (MessageFormat::Cbor, offset) => {
            let mut decoder = crate::cbor::Decoder::new(&buf[offset..]);
            cbor_dec(&mut decoder)?
        }
        (MessageFormat::Msgpack, offset) => deserialize(&buf[offset..])
            // The previous format string rendered as "decoding '<error>': {}".
            .map_err(|e| RpcError::Deser(format!("decoding msgpack payload: {e}")))?,
        _ => return Err(RpcError::Deser("invalid encoding".to_string())),
    };
    Ok(value)
}

/// Decodes a cbor payload from `buf` by calling `cbor_dec` on the bytes after the prefix.
/// The buffer, the callback reference and the decoder all share the lifetime `'de`.
///
/// # Errors
///
/// Returns the error produced by `cbor_dec`, or an [`RpcError::Deser`] when
/// [`message_format`] reports anything other than cbor.
pub fn decode_borrowed<'de, T>(
    buf: &'de [u8],
    cbor_dec: &'de CborDecodeFnLt<'de, T>,
) -> RpcResult<T> {
    if let (MessageFormat::Cbor, start) = message_format(buf) {
        let mut decoder = crate::cbor::Decoder::new(&buf[start..]);
        let decoded = cbor_dec(&mut decoder)?;
        return Ok(decoded);
    }
    Err(RpcError::Deser("invalid encoding (borrowed)".to_string()))
}

/// Decodes a cbor payload from `buf` by calling `cbor_dec` on the bytes after the prefix,
/// returning the decoded value.
///
/// # Errors
///
/// Returns the error produced by `cbor_dec`, or an [`RpcError::Deser`] when
/// [`message_format`] reports anything other than cbor.
pub fn decode_owned<'vin, T: Clone>(
    buf: &'vin [u8],
    cbor_dec: &CborDecodeFnLt<'vin, T>,
) -> RpcResult<T> {
    match message_format(buf) {
        (MessageFormat::Cbor, offset) => {
            let mut decoder = crate::cbor::Decoder::new(&buf[offset..]);
            let out: T = cbor_dec(&mut decoder)?;
            Ok(out)
        }
        // The message used to say "(borrowed)", which pointed at `decode_borrowed`.
        _ => Err(RpcError::Deser("invalid encoding (owned)".to_string())),
    }
}

/*
pub fn decode_owned<T>(d: &mut crate::cbor::Decoder) -> RpcResult<T>
where
    T: crate::cbor::DecodeOwned + Sized,
{
    T::decode(d) // .map_err(|e| RpcError::Deser(e.to_string()))
}
 */

/// Wasmbus rpc sender that can send any message and cbor-serializable payload
/// requires Protocol="2"
pub struct AnySender<T: Transport> {
    /// Transport over which every message is sent.
    transport: T,
}

impl<T: Transport> AnySender<T> {
    /// Creates a sender that sends all messages over `transport`.
    pub fn new(transport: T) -> Self {
        Self { transport }
    }

    /// Send encoded payload. No [`SendOpts`] are passed to the transport.
    #[inline]
    async fn send_raw(&self, ctx: &Context, msg: Message<'_>) -> RpcResult<Vec<u8>> {
        self.transport.send(ctx, msg, None).await
    }

    /// Send rpc with serializable payload.
    ///
    /// `arg` is written after the cbor header prefix using `minicbor_ser`, sent as `method`,
    /// and the reply bytes are decoded into `Out` with `minicbor_ser`.
    ///
    /// # Errors
    ///
    /// Returns [`RpcError::Ser`] if the request cannot be encoded, the transport's error if
    /// sending fails, or [`RpcError::Deser`] if the reply cannot be decoded.
    pub async fn send<In: Serialize, Out: serde::de::DeserializeOwned>(
        &self,
        ctx: &Context,
        method: &str,
        arg: &In,
    ) -> RpcResult<Out> {
        let mut buf = Vec::new();
        // Report a header write failure as an error rather than panicking in library code.
        MessageFormat::Cbor
            .write_header(&mut buf)
            .map_err(|e| RpcError::Ser(e.to_string()))?;
        minicbor_ser::to_writer(arg, &mut buf).map_err(|e| RpcError::Ser(e.to_string()))?;
        let resp = self.send_raw(ctx, Message { method, arg: Cow::Borrowed(&buf) }).await?;
        let result: Out =
            minicbor_ser::from_slice(&resp).map_err(|e| RpcError::Deser(e.to_string()))?;
        Ok(result)
    }

    /// Send rpc with serializable payload using cbor encode/decode.
    ///
    /// `arg` is written after the cbor header prefix using `minicbor`, sent as `method`,
    /// and the reply bytes are decoded into `Out` with `minicbor`.
    ///
    /// # Errors
    ///
    /// Returns [`RpcError::Ser`] if the request cannot be encoded, the transport's error if
    /// sending fails, or [`RpcError::Deser`] if the reply cannot be decoded.
    // NOTE(review): the body only needs `&self`, and `'de` is unused; both are kept so the
    // public signature stays exactly as callers know it.
    pub async fn send_cbor<'de, In: minicbor::Encode<()>, Out: crate::cbor::MDecodeOwned<()>>(
        &mut self,
        ctx: &Context,
        method: &str,
        arg: &In,
    ) -> RpcResult<Out> {
        let mut buf = Vec::new();
        // Report a header write failure as an error rather than panicking in library code.
        MessageFormat::Cbor
            .write_header(&mut buf)
            .map_err(|e| RpcError::Ser(e.to_string()))?;
        crate::minicbor::encode(arg, &mut buf).map_err(|e| RpcError::Ser(e.to_string()))?;
        let resp = self.send_raw(ctx, Message { method, arg: Cow::Borrowed(&buf) }).await?;
        let result: Out =
            crate::minicbor::decode(&resp).map_err(|e| RpcError::Deser(e.to_string()))?;
        Ok(result)
    }
}

/// Alias for the unit type `()`.
pub type Unit = ();