wascc_actor/
messaging.rs

1//! # Message Broker
2//!
3//! This module contains the message broker client interface through which actor modules access
4//! a bound `wascc:messaging` capability provider
5
6use wapc_guest::host_call;
7
8const CAPID_MESSAGING: &str = "wascc:messaging";
9
10use crate::HandlerResult;
11use codec::messaging::{BrokerMessage, RequestMessage, OP_PERFORM_REQUEST, OP_PUBLISH_MESSAGE};
12use codec::serialize;
13use wascc_codec as codec;
14
15/// Create a new named message broker host binding
16pub fn host(binding: &str) -> MessageBrokerHostBinding {
17    MessageBrokerHostBinding {
18        binding: binding.to_string(),
19    }
20}
21
22/// Create a default message broker host binding
23pub fn default() -> MessageBrokerHostBinding {
24    MessageBrokerHostBinding {
25        binding: "default".to_string(),
26    }
27}
28
/// Exposes message broker functionality to actor modules
///
/// A value of this type only records the name of the binding that host calls are
/// addressed to. It derives `Debug` and `Clone` so callers can log it, embed it in
/// their own `#[derive(Debug)]` types, and duplicate it freely.
#[derive(Debug, Clone)]
pub struct MessageBrokerHostBinding {
    /// Name of the binding passed to the host on every call made through this value.
    binding: String,
}
33
34impl MessageBrokerHostBinding {
35    /// Publishes a message on a given subject with an optional reply subject
36    pub fn publish(
37        &self,
38        subject: &str,
39        reply_to: Option<&str>,
40        payload: &[u8],
41    ) -> HandlerResult<()> {
42        let cmd = BrokerMessage {
43            subject: subject.to_string(),
44            reply_to: reply_to.map_or("".to_string(), |r| r.to_string()),
45            body: payload.to_vec(),
46        };
47
48        host_call(
49            &self.binding,
50            CAPID_MESSAGING,
51            OP_PUBLISH_MESSAGE,
52            &serialize(cmd)?,
53        )
54        .map_err(|e| e.into())
55        .map(|_vec| ())
56    }
57
58    /// Publishes a message and expects a reply to come back within a given timeout (in milliseconds)
59    pub fn request(
60        &self,
61        subject: &str,
62        payload: &[u8],
63        timeout_ms: u64,
64    ) -> HandlerResult<Vec<u8>> {
65        let cmd = RequestMessage {
66            subject: subject.to_string(),
67            timeout_ms: timeout_ms as _,
68            body: payload.to_vec(),
69        };
70
71        // The broker plugin applies no wrapper around the response from the broker, the
72        // raw payload is delivered.
73        host_call(
74            &self.binding,
75            CAPID_MESSAGING,
76            OP_PERFORM_REQUEST,
77            &serialize(cmd)?,
78        )
79        .map_err(|e| e.into())
80    }
81}