#[allow(unused_imports)]
use async_trait::async_trait;
#[allow(unused_imports)]
use serde::{Deserialize, Serialize};
#[allow(unused_imports)]
use std::{borrow::Borrow, borrow::Cow, io::Write, string::ToString};
#[allow(unused_imports)]
use wasmbus_rpc::{
cbor::*,
common::{
deserialize, message_format, serialize, Context, Message, MessageDispatch, MessageFormat,
SendOpts, Transport,
},
error::{RpcError, RpcResult},
Timestamp,
};
/// Version string associated with this interface. The name suggests it is the
/// Smithy model version. NOTE(review): confirm against the code generator.
#[allow(dead_code)]
pub const SMITHY_VERSION: &str = "1.0";
/// Argument type of the [`Messaging::publish`] operation.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PubMessage {
    /// Message subject.
    #[serde(default)]
    pub subject: String,
    /// Optional reply subject. Serialized by serde under the key `replyTo` and
    /// left out of the output entirely when it is `None`.
    #[serde(default, rename = "replyTo", skip_serializing_if = "Option::is_none")]
    pub reply_to: Option<String>,
    /// Raw message bytes, (de)serialized through `serde_bytes`.
    #[serde(default, with = "serde_bytes")]
    pub body: Vec<u8>,
}
/// Writes `val` to the encoder as a three-element array, in the order
/// `subject`, `reply_to`, `body`.
///
/// A missing `reply_to` is written as a null so the array always has three
/// elements.
///
/// # Errors
///
/// Propagates any error reported by the encoder.
#[doc(hidden)]
#[allow(unused_mut)]
pub fn encode_pub_message<W: wasmbus_rpc::cbor::Write>(
    mut e: &mut wasmbus_rpc::cbor::Encoder<W>,
    val: &PubMessage,
) -> RpcResult<()>
where
    <W as wasmbus_rpc::cbor::Write>::Error: std::fmt::Display,
{
    e.array(3)?;
    e.str(&val.subject)?;
    match val.reply_to.as_deref() {
        Some(reply_subject) => {
            e.str(reply_subject)?;
        }
        None => {
            e.null()?;
        }
    }
    e.bytes(&val.body)?;
    Ok(())
}
/// Reads a [`PubMessage`] from the decoder.
///
/// Two layouts are accepted:
/// * an array, read positionally as `subject`, `replyTo`, `body`;
/// * a map keyed by `"subject"`, `"replyTo"` and `"body"`.
///
/// Extra array elements and unrecognized map keys are skipped. A null or
/// absent `replyTo` yields `None`.
///
/// # Errors
///
/// Returns `RpcError::Deser` when the next item is neither an array nor a map,
/// or when `subject` or `body` was not present. Decoder errors are propagated.
#[doc(hidden)]
pub fn decode_pub_message(d: &mut wasmbus_rpc::cbor::Decoder<'_>) -> Result<PubMessage, RpcError> {
    // Reads a nullable string: a null item becomes `None`, anything else is read as a string.
    fn read_optional_str(
        d: &mut wasmbus_rpc::cbor::Decoder<'_>,
    ) -> Result<Option<String>, RpcError> {
        if wasmbus_rpc::cbor::Type::Null == d.datatype()? {
            d.skip()?;
            Ok(None)
        } else {
            Ok(Some(d.str()?.to_string()))
        }
    }

    let mut subject: Option<String> = None;
    let mut reply_to: Option<String> = None;
    let mut body: Option<Vec<u8>> = None;

    match d.datatype()? {
        wasmbus_rpc::cbor::Type::Array => {
            let field_count = d.fixed_array()?;
            for position in 0..(field_count as usize) {
                match position {
                    0 => subject = Some(d.str()?.to_string()),
                    1 => reply_to = read_optional_str(d)?,
                    2 => body = Some(d.bytes()?.to_vec()),
                    _ => d.skip()?,
                }
            }
        }
        wasmbus_rpc::cbor::Type::Map => {
            let entry_count = d.fixed_map()?;
            for _ in 0..(entry_count as usize) {
                match d.str()? {
                    "subject" => subject = Some(d.str()?.to_string()),
                    "replyTo" => reply_to = read_optional_str(d)?,
                    "body" => body = Some(d.bytes()?.to_vec()),
                    _ => d.skip()?,
                }
            }
        }
        _ => {
            return Err(RpcError::Deser(
                "decoding struct PubMessage, expected array or map".to_string(),
            ));
        }
    }

    // Required fields are checked in declaration order, so the first missing one is reported.
    let subject = subject
        .ok_or_else(|| RpcError::Deser("missing field PubMessage.subject (#0)".to_string()))?;
    let body =
        body.ok_or_else(|| RpcError::Deser("missing field PubMessage.body (#2)".to_string()))?;

    Ok(PubMessage {
        subject,
        reply_to,
        body,
    })
}
/// Return type of the [`Messaging::request`] operation.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReplyMessage {
    /// Message subject.
    #[serde(default)]
    pub subject: String,
    /// Optional reply subject. Serialized by serde under the key `replyTo` and
    /// left out of the output entirely when it is `None`.
    #[serde(default, rename = "replyTo", skip_serializing_if = "Option::is_none")]
    pub reply_to: Option<String>,
    /// Raw message bytes, (de)serialized through `serde_bytes`.
    #[serde(default, with = "serde_bytes")]
    pub body: Vec<u8>,
}
/// Writes `val` to the encoder as a three-element array, in the order
/// `subject`, `reply_to`, `body`.
///
/// A missing `reply_to` is written as a null so the array always has three
/// elements.
///
/// # Errors
///
/// Propagates any error reported by the encoder.
#[doc(hidden)]
#[allow(unused_mut)]
pub fn encode_reply_message<W: wasmbus_rpc::cbor::Write>(
    mut e: &mut wasmbus_rpc::cbor::Encoder<W>,
    val: &ReplyMessage,
) -> RpcResult<()>
where
    <W as wasmbus_rpc::cbor::Write>::Error: std::fmt::Display,
{
    e.array(3)?;
    e.str(&val.subject)?;
    match val.reply_to.as_deref() {
        Some(reply_subject) => {
            e.str(reply_subject)?;
        }
        None => {
            e.null()?;
        }
    }
    e.bytes(&val.body)?;
    Ok(())
}
/// Reads a [`ReplyMessage`] from the decoder.
///
/// Two layouts are accepted:
/// * an array, read positionally as `subject`, `replyTo`, `body`;
/// * a map keyed by `"subject"`, `"replyTo"` and `"body"`.
///
/// Extra array elements and unrecognized map keys are skipped. A null or
/// absent `replyTo` yields `None`.
///
/// # Errors
///
/// Returns `RpcError::Deser` when the next item is neither an array nor a map,
/// or when `subject` or `body` was not present. Decoder errors are propagated.
#[doc(hidden)]
pub fn decode_reply_message(
    d: &mut wasmbus_rpc::cbor::Decoder<'_>,
) -> Result<ReplyMessage, RpcError> {
    // Reads a nullable string: a null item becomes `None`, anything else is read as a string.
    fn read_optional_str(
        d: &mut wasmbus_rpc::cbor::Decoder<'_>,
    ) -> Result<Option<String>, RpcError> {
        if wasmbus_rpc::cbor::Type::Null == d.datatype()? {
            d.skip()?;
            Ok(None)
        } else {
            Ok(Some(d.str()?.to_string()))
        }
    }

    let mut subject: Option<String> = None;
    let mut reply_to: Option<String> = None;
    let mut body: Option<Vec<u8>> = None;

    match d.datatype()? {
        wasmbus_rpc::cbor::Type::Array => {
            let field_count = d.fixed_array()?;
            for position in 0..(field_count as usize) {
                match position {
                    0 => subject = Some(d.str()?.to_string()),
                    1 => reply_to = read_optional_str(d)?,
                    2 => body = Some(d.bytes()?.to_vec()),
                    _ => d.skip()?,
                }
            }
        }
        wasmbus_rpc::cbor::Type::Map => {
            let entry_count = d.fixed_map()?;
            for _ in 0..(entry_count as usize) {
                match d.str()? {
                    "subject" => subject = Some(d.str()?.to_string()),
                    "replyTo" => reply_to = read_optional_str(d)?,
                    "body" => body = Some(d.bytes()?.to_vec()),
                    _ => d.skip()?,
                }
            }
        }
        _ => {
            return Err(RpcError::Deser(
                "decoding struct ReplyMessage, expected array or map".to_string(),
            ));
        }
    }

    // Required fields are checked in declaration order, so the first missing one is reported.
    let subject = subject
        .ok_or_else(|| RpcError::Deser("missing field ReplyMessage.subject (#0)".to_string()))?;
    let body =
        body.ok_or_else(|| RpcError::Deser("missing field ReplyMessage.body (#2)".to_string()))?;

    Ok(ReplyMessage {
        subject,
        reply_to,
        body,
    })
}
/// Argument type of the [`Messaging::request`] operation.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RequestMessage {
    /// Message subject.
    #[serde(default)]
    pub subject: String,
    /// Raw message bytes, (de)serialized through `serde_bytes`.
    #[serde(default, with = "serde_bytes")]
    pub body: Vec<u8>,
    /// Timeout value, serialized by serde under the key `timeoutMs`. The name
    /// indicates the unit is milliseconds.
    #[serde(default, rename = "timeoutMs")]
    pub timeout_ms: u32,
}
#[doc(hidden)]
#[allow(unused_mut)]
pub fn encode_request_message<W: wasmbus_rpc::cbor::Write>(
mut e: &mut wasmbus_rpc::cbor::Encoder<W>,
val: &RequestMessage,
) -> RpcResult<()>
where
<W as wasmbus_rpc::cbor::Write>::Error: std::fmt::Display,
{
e.array(3)?;
e.str(&val.subject)?;
e.bytes(&val.body)?;
e.u32(val.timeout_ms)?;
Ok(())
}
/// Reads a [`RequestMessage`] from the decoder.
///
/// Two layouts are accepted:
/// * an array, read positionally as `subject`, `body`, `timeoutMs`;
/// * a map keyed by `"subject"`, `"body"` and `"timeoutMs"`.
///
/// Extra array elements and unrecognized map keys are skipped.
///
/// # Errors
///
/// Returns `RpcError::Deser` when the next item is neither an array nor a map,
/// or when any of the three fields was not present. Decoder errors are
/// propagated.
#[doc(hidden)]
pub fn decode_request_message(
    d: &mut wasmbus_rpc::cbor::Decoder<'_>,
) -> Result<RequestMessage, RpcError> {
    let mut subject: Option<String> = None;
    let mut body: Option<Vec<u8>> = None;
    let mut timeout_ms: Option<u32> = None;

    match d.datatype()? {
        wasmbus_rpc::cbor::Type::Array => {
            let field_count = d.fixed_array()?;
            for position in 0..(field_count as usize) {
                match position {
                    0 => subject = Some(d.str()?.to_string()),
                    1 => body = Some(d.bytes()?.to_vec()),
                    2 => timeout_ms = Some(d.u32()?),
                    _ => d.skip()?,
                }
            }
        }
        wasmbus_rpc::cbor::Type::Map => {
            let entry_count = d.fixed_map()?;
            for _ in 0..(entry_count as usize) {
                match d.str()? {
                    "subject" => subject = Some(d.str()?.to_string()),
                    "body" => body = Some(d.bytes()?.to_vec()),
                    "timeoutMs" => timeout_ms = Some(d.u32()?),
                    _ => d.skip()?,
                }
            }
        }
        _ => {
            return Err(RpcError::Deser(
                "decoding struct RequestMessage, expected array or map".to_string(),
            ));
        }
    }

    // Required fields are checked in declaration order, so the first missing one is reported.
    let subject = subject
        .ok_or_else(|| RpcError::Deser("missing field RequestMessage.subject (#0)".to_string()))?;
    let body =
        body.ok_or_else(|| RpcError::Deser("missing field RequestMessage.body (#1)".to_string()))?;
    let timeout_ms = timeout_ms.ok_or_else(|| {
        RpcError::Deser("missing field RequestMessage.timeout_ms (#2)".to_string())
    })?;

    Ok(RequestMessage {
        subject,
        body,
        timeout_ms,
    })
}
/// Argument type of the [`MessageSubscriber::handle_message`] operation.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SubMessage {
    /// Message subject.
    #[serde(default)]
    pub subject: String,
    /// Optional reply subject. Serialized by serde under the key `replyTo` and
    /// left out of the output entirely when it is `None`.
    #[serde(default, rename = "replyTo", skip_serializing_if = "Option::is_none")]
    pub reply_to: Option<String>,
    /// Raw message bytes, (de)serialized through `serde_bytes`.
    #[serde(default, with = "serde_bytes")]
    pub body: Vec<u8>,
}
/// Writes `val` to the encoder as a three-element array, in the order
/// `subject`, `reply_to`, `body`.
///
/// A missing `reply_to` is written as a null so the array always has three
/// elements.
///
/// # Errors
///
/// Propagates any error reported by the encoder.
#[doc(hidden)]
#[allow(unused_mut)]
pub fn encode_sub_message<W: wasmbus_rpc::cbor::Write>(
    mut e: &mut wasmbus_rpc::cbor::Encoder<W>,
    val: &SubMessage,
) -> RpcResult<()>
where
    <W as wasmbus_rpc::cbor::Write>::Error: std::fmt::Display,
{
    e.array(3)?;
    e.str(&val.subject)?;
    match val.reply_to.as_deref() {
        Some(reply_subject) => {
            e.str(reply_subject)?;
        }
        None => {
            e.null()?;
        }
    }
    e.bytes(&val.body)?;
    Ok(())
}
/// Reads a [`SubMessage`] from the decoder.
///
/// Two layouts are accepted:
/// * an array, read positionally as `subject`, `replyTo`, `body`;
/// * a map keyed by `"subject"`, `"replyTo"` and `"body"`.
///
/// Extra array elements and unrecognized map keys are skipped. A null or
/// absent `replyTo` yields `None`.
///
/// # Errors
///
/// Returns `RpcError::Deser` when the next item is neither an array nor a map,
/// or when `subject` or `body` was not present. Decoder errors are propagated.
#[doc(hidden)]
pub fn decode_sub_message(d: &mut wasmbus_rpc::cbor::Decoder<'_>) -> Result<SubMessage, RpcError> {
    // Reads a nullable string: a null item becomes `None`, anything else is read as a string.
    fn read_optional_str(
        d: &mut wasmbus_rpc::cbor::Decoder<'_>,
    ) -> Result<Option<String>, RpcError> {
        if wasmbus_rpc::cbor::Type::Null == d.datatype()? {
            d.skip()?;
            Ok(None)
        } else {
            Ok(Some(d.str()?.to_string()))
        }
    }

    let mut subject: Option<String> = None;
    let mut reply_to: Option<String> = None;
    let mut body: Option<Vec<u8>> = None;

    match d.datatype()? {
        wasmbus_rpc::cbor::Type::Array => {
            let field_count = d.fixed_array()?;
            for position in 0..(field_count as usize) {
                match position {
                    0 => subject = Some(d.str()?.to_string()),
                    1 => reply_to = read_optional_str(d)?,
                    2 => body = Some(d.bytes()?.to_vec()),
                    _ => d.skip()?,
                }
            }
        }
        wasmbus_rpc::cbor::Type::Map => {
            let entry_count = d.fixed_map()?;
            for _ in 0..(entry_count as usize) {
                match d.str()? {
                    "subject" => subject = Some(d.str()?.to_string()),
                    "replyTo" => reply_to = read_optional_str(d)?,
                    "body" => body = Some(d.bytes()?.to_vec()),
                    _ => d.skip()?,
                }
            }
        }
        _ => {
            return Err(RpcError::Deser(
                "decoding struct SubMessage, expected array or map".to_string(),
            ));
        }
    }

    // Required fields are checked in declaration order, so the first missing one is reported.
    let subject = subject
        .ok_or_else(|| RpcError::Deser("missing field SubMessage.subject (#0)".to_string()))?;
    let body =
        body.ok_or_else(|| RpcError::Deser("missing field SubMessage.body (#2)".to_string()))?;

    Ok(SubMessage {
        subject,
        reply_to,
        body,
    })
}
/// Service interface with a single operation, `handle_message`, that takes a
/// [`SubMessage`].
#[async_trait]
pub trait MessageSubscriber {
    /// Returns the contract id string for this interface: `"wasmcloud:messaging"`.
    fn contract_id() -> &'static str {
        "wasmcloud:messaging"
    }
    /// Handles one [`SubMessage`]; implementors return `Ok(())` on success or an
    /// `RpcError` on failure.
    async fn handle_message(&self, ctx: &Context, arg: &SubMessage) -> RpcResult<()>;
}
/// Receiving side of [`MessageSubscriber`]: turns an incoming [`Message`] into
/// a call on the trait implementation.
#[doc(hidden)]
#[async_trait]
pub trait MessageSubscriberReceiver: MessageDispatch + MessageSubscriber {
    /// Routes `message` by method name.
    ///
    /// Only `"HandleMessage"` is recognized: its argument is deserialized into
    /// a [`SubMessage`], passed to `handle_message`, and an empty byte vector
    /// is returned on success.
    ///
    /// # Errors
    ///
    /// * `RpcError::MethodNotHandled` for any other method name.
    /// * `RpcError::Deser` when the argument cannot be deserialized.
    /// * Any error returned by `handle_message`.
    async fn dispatch(&self, ctx: &Context, message: Message<'_>) -> Result<Vec<u8>, RpcError> {
        // Reject unknown methods up front so the happy path reads straight through.
        if message.method != "HandleMessage" {
            return Err(RpcError::MethodNotHandled(format!(
                "MessageSubscriber::{}",
                message.method
            )));
        }

        let sub_message: SubMessage = wasmbus_rpc::common::deserialize(&message.arg)
            .map_err(|err| RpcError::Deser(format!("'SubMessage': {}", err)))?;
        MessageSubscriber::handle_message(self, ctx, &sub_message).await?;

        // The operation has no return value, so the response payload is empty.
        Ok(Vec::new())
    }
}
/// Sending side of [`MessageSubscriber`]: implements the trait by serializing
/// each call and sending it over the wrapped [`Transport`].
#[derive(Clone, Debug)]
pub struct MessageSubscriberSender<T: Transport> {
    /// Transport that carries the outgoing messages.
    transport: T,
}
impl<T: Transport> MessageSubscriberSender<T> {
pub fn via(transport: T) -> Self {
Self { transport }
}
pub fn set_timeout(&self, interval: std::time::Duration) {
self.transport.set_timeout(interval);
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<'send> MessageSubscriberSender<wasmbus_rpc::provider::ProviderTransport<'send>> {
    /// Builds a sender whose transport is a `ProviderTransport` created from
    /// the link definition `ld` (second constructor argument left as `None`).
    pub fn for_actor(ld: &'send wasmbus_rpc::core::LinkDefinition) -> Self {
        let transport = wasmbus_rpc::provider::ProviderTransport::new(ld, None);
        Self { transport }
    }
}
#[cfg(target_arch = "wasm32")]
impl MessageSubscriberSender<wasmbus_rpc::actor::prelude::WasmHost> {
    /// Builds a sender whose transport is `WasmHost::to_actor(actor_id)`.
    ///
    /// # Panics
    ///
    /// Panics if `WasmHost::to_actor` returns an error.
    pub fn to_actor(actor_id: &str) -> Self {
        Self {
            transport: wasmbus_rpc::actor::prelude::WasmHost::to_actor(actor_id.to_string())
                .unwrap(),
        }
    }
}
#[async_trait]
impl<T: Transport + std::marker::Sync + std::marker::Send> MessageSubscriber
    for MessageSubscriberSender<T>
{
    /// Serializes `arg` and sends it over the transport as a
    /// `MessageSubscriber.HandleMessage` message.
    ///
    /// # Errors
    ///
    /// Propagates serialization errors and any error returned by the
    /// transport's `send`.
    #[allow(unused)]
    async fn handle_message(&self, ctx: &Context, arg: &SubMessage) -> RpcResult<()> {
        let payload = wasmbus_rpc::common::serialize(arg)?;
        let outgoing = Message {
            method: "MessageSubscriber.HandleMessage",
            arg: Cow::Borrowed(&payload),
        };
        // The operation returns nothing, so the response bytes are dropped.
        self.transport.send(ctx, outgoing, None).await?;
        Ok(())
    }
}
/// Service interface with two operations: `publish` and `request`.
#[async_trait]
pub trait Messaging {
    /// Returns the contract id string for this interface: `"wasmcloud:messaging"`.
    fn contract_id() -> &'static str {
        "wasmcloud:messaging"
    }
    /// Publishes the given [`PubMessage`]; no value is returned on success.
    async fn publish(&self, ctx: &Context, arg: &PubMessage) -> RpcResult<()>;
    /// Sends the given [`RequestMessage`] and returns a [`ReplyMessage`] on success.
    async fn request(&self, ctx: &Context, arg: &RequestMessage) -> RpcResult<ReplyMessage>;
}
/// Receiving side of [`Messaging`]: turns an incoming [`Message`] into a call
/// on the trait implementation.
#[doc(hidden)]
#[async_trait]
pub trait MessagingReceiver: MessageDispatch + Messaging {
    /// Routes `message` by method name.
    ///
    /// * `"Publish"`: deserializes a [`PubMessage`], calls `publish`, and
    ///   returns an empty byte vector.
    /// * `"Request"`: deserializes a [`RequestMessage`], calls `request`, and
    ///   returns the serialized [`ReplyMessage`].
    ///
    /// # Errors
    ///
    /// * `RpcError::MethodNotHandled` for any other method name.
    /// * `RpcError::Deser` when the argument cannot be deserialized.
    /// * Any error from the called operation or from serializing its result.
    async fn dispatch(&self, ctx: &Context, message: Message<'_>) -> Result<Vec<u8>, RpcError> {
        match message.method {
            "Publish" => {
                let outbound: PubMessage = wasmbus_rpc::common::deserialize(&message.arg)
                    .map_err(|err| RpcError::Deser(format!("'PubMessage': {}", err)))?;
                Messaging::publish(self, ctx, &outbound).await?;
                // `publish` has no return value, so the response payload is empty.
                Ok(Vec::new())
            }
            "Request" => {
                let request: RequestMessage = wasmbus_rpc::common::deserialize(&message.arg)
                    .map_err(|err| RpcError::Deser(format!("'RequestMessage': {}", err)))?;
                let reply = Messaging::request(self, ctx, &request).await?;
                let encoded = wasmbus_rpc::common::serialize(&reply)?;
                Ok(encoded)
            }
            unknown => Err(RpcError::MethodNotHandled(format!(
                "Messaging::{}",
                unknown
            ))),
        }
    }
}
/// Sending side of [`Messaging`]: implements the trait by serializing each
/// call and sending it over the wrapped [`Transport`].
#[derive(Clone, Debug)]
pub struct MessagingSender<T: Transport> {
    /// Transport that carries the outgoing messages.
    transport: T,
}
impl<T: Transport> MessagingSender<T> {
pub fn via(transport: T) -> Self {
Self { transport }
}
pub fn set_timeout(&self, interval: std::time::Duration) {
self.transport.set_timeout(interval);
}
}
#[cfg(target_arch = "wasm32")]
impl MessagingSender<wasmbus_rpc::actor::prelude::WasmHost> {
    /// Builds a sender whose transport is
    /// `WasmHost::to_provider("wasmcloud:messaging", "default")`.
    ///
    /// # Panics
    ///
    /// Panics if `WasmHost::to_provider` returns an error.
    pub fn new() -> Self {
        Self {
            transport: wasmbus_rpc::actor::prelude::WasmHost::to_provider(
                "wasmcloud:messaging",
                "default",
            )
            .unwrap(),
        }
    }

    /// Builds a sender whose transport is
    /// `WasmHost::to_provider("wasmcloud:messaging", link_name)`.
    ///
    /// # Errors
    ///
    /// Returns the error from `WasmHost::to_provider` instead of panicking.
    pub fn new_with_link(link_name: &str) -> wasmbus_rpc::error::RpcResult<Self> {
        Ok(Self {
            transport: wasmbus_rpc::actor::prelude::WasmHost::to_provider(
                "wasmcloud:messaging",
                link_name,
            )?,
        })
    }
}
#[async_trait]
impl<T: Transport + std::marker::Sync + std::marker::Send> Messaging for MessagingSender<T> {
    /// Serializes `arg` and sends it over the transport as a
    /// `Messaging.Publish` message.
    ///
    /// # Errors
    ///
    /// Propagates serialization errors and any error returned by the
    /// transport's `send`.
    #[allow(unused)]
    async fn publish(&self, ctx: &Context, arg: &PubMessage) -> RpcResult<()> {
        let buf = wasmbus_rpc::common::serialize(arg)?;
        // `publish` returns nothing, so the response bytes are intentionally unused.
        let resp = self
            .transport
            .send(
                ctx,
                Message {
                    method: "Messaging.Publish",
                    arg: Cow::Borrowed(&buf),
                },
                None,
            )
            .await?;
        Ok(())
    }

    /// Serializes `arg`, sends it over the transport as a `Messaging.Request`
    /// message, and deserializes the response bytes into a [`ReplyMessage`].
    ///
    /// # Errors
    ///
    /// Propagates serialization errors and any error returned by the
    /// transport's `send`. Returns `RpcError::Deser` when the response cannot
    /// be deserialized.
    #[allow(unused)]
    async fn request(&self, ctx: &Context, arg: &RequestMessage) -> RpcResult<ReplyMessage> {
        let buf = wasmbus_rpc::common::serialize(arg)?;
        let resp = self
            .transport
            .send(
                ctx,
                Message {
                    method: "Messaging.Request",
                    arg: Cow::Borrowed(&buf),
                },
                None,
            )
            .await?;
        // Same "'<Type>': <error>" layout as the other deserialize failures in this
        // file: the type name goes in quotes, followed by the underlying error.
        let value: ReplyMessage = wasmbus_rpc::common::deserialize(&resp)
            .map_err(|e| RpcError::Deser(format!("'ReplyMessage': {}", e)))?;
        Ok(value)
    }
}