Skip to main content

omnia_nats/
messaging.rs

1use std::any::Any;
2use std::collections::HashMap;
3use std::sync::Arc;
4
5use anyhow::{Context, anyhow};
6use async_nats::HeaderMap;
7use futures::future::FutureExt;
8use futures::stream::{self, StreamExt};
9use omnia_wasi_messaging::{
10    Client, FutureResult, Message, MessageProxy, Metadata, Reply, RequestOptions, Subscriptions,
11    WasiMessagingCtx,
12};
13
14/// `wasi-messaging` implementation backed by NATS.
15impl WasiMessagingCtx for crate::Client {
16    fn connect(&self) -> FutureResult<Arc<dyn Client>> {
17        let client = self.clone();
18        async move { Ok(Arc::new(client) as Arc<dyn Client>) }.boxed()
19    }
20
21    fn new_message(&self, data: Vec<u8>) -> anyhow::Result<Arc<dyn Message>> {
22        let length = data.len();
23
24        let msg = async_nats::Message {
25            subject: String::new().into(),
26            reply: None,
27            payload: data.into(),
28            headers: None,
29            status: None,
30            description: None,
31            length,
32        };
33        Ok(Arc::new(NatsMessage(msg)) as Arc<dyn Message>)
34    }
35
36    fn set_content_type(
37        &self, message: Arc<dyn Message>, content_type: String,
38    ) -> anyhow::Result<Arc<dyn Message>> {
39        let nats_msg = message
40            .as_any()
41            .downcast_ref::<NatsMessage>()
42            .ok_or_else(|| anyhow!("invalid message type"))?;
43        let mut msg = nats_msg.0.clone();
44        let mut headers = nats_msg.0.headers.clone().unwrap_or_default();
45        headers.insert("content-type".to_string(), content_type);
46        msg.headers = Some(headers);
47        Ok(Arc::new(NatsMessage(msg)) as Arc<dyn Message>)
48    }
49
50    fn set_payload(
51        &self, message: Arc<dyn Message>, data: Vec<u8>,
52    ) -> anyhow::Result<Arc<dyn Message>> {
53        let nats_msg = message
54            .as_any()
55            .downcast_ref::<NatsMessage>()
56            .ok_or_else(|| anyhow!("invalid message type"))?;
57        let mut msg = nats_msg.0.clone();
58        msg.payload = data.clone().into();
59        msg.length = data.len();
60        Ok(Arc::new(NatsMessage(msg)) as Arc<dyn Message>)
61    }
62
63    fn add_metadata(
64        &self, message: Arc<dyn Message>, key: String, value: String,
65    ) -> anyhow::Result<Arc<dyn Message>> {
66        let nats_msg = message
67            .as_any()
68            .downcast_ref::<NatsMessage>()
69            .ok_or_else(|| anyhow!("invalid message type"))?;
70        let mut msg = nats_msg.0.clone();
71        let mut headers = nats_msg.0.headers.clone().unwrap_or_default();
72        headers.insert(key, value);
73        msg.headers = Some(headers);
74        Ok(Arc::new(NatsMessage(msg)) as Arc<dyn Message>)
75    }
76
77    fn set_metadata(
78        &self, message: Arc<dyn Message>, metadata: Metadata,
79    ) -> anyhow::Result<Arc<dyn Message>> {
80        let nats_msg = message
81            .as_any()
82            .downcast_ref::<NatsMessage>()
83            .ok_or_else(|| anyhow!("invalid message type"))?;
84        let mut msg = nats_msg.0.clone();
85        let mut headers = async_nats::HeaderMap::new();
86        for (k, v) in &metadata.inner {
87            headers.insert(k.as_str(), v.as_str());
88        }
89        msg.headers = Some(headers);
90        Ok(Arc::new(NatsMessage(msg)) as Arc<dyn Message>)
91    }
92
93    fn remove_metadata(
94        &self, message: Arc<dyn Message>, key: String,
95    ) -> anyhow::Result<Arc<dyn Message>> {
96        let nats_msg = message
97            .as_any()
98            .downcast_ref::<NatsMessage>()
99            .ok_or_else(|| anyhow!("invalid message type"))?;
100        let mut msg = nats_msg.0.clone();
101        if let Some(headers) = nats_msg.0.headers.clone() {
102            let mut updated_headers = HeaderMap::new();
103            for (k, v) in headers.iter() {
104                if k.to_string() != key {
105                    for iv in v {
106                        updated_headers.insert(k.clone(), iv.clone());
107                    }
108                }
109            }
110            msg.headers = Some(updated_headers);
111        }
112        Ok(Arc::new(NatsMessage(msg)) as Arc<dyn Message>)
113    }
114}
115
116#[derive(Clone, Debug)]
117struct NatsMessage(async_nats::Message);
118
119impl Message for NatsMessage {
120    fn topic(&self) -> String {
121        self.0.subject.to_string()
122    }
123
124    fn payload(&self) -> Vec<u8> {
125        self.0.payload.to_vec()
126    }
127
128    fn metadata(&self) -> Option<Metadata> {
129        let mut md = HashMap::new();
130        for (k, v) in self.0.headers.as_ref()?.iter() {
131            let v_str = v.iter().map(ToString::to_string).collect::<Vec<String>>().join(", ");
132            md.insert(k.to_string(), v_str);
133        }
134        Some(Metadata { inner: md })
135    }
136
137    fn description(&self) -> Option<String> {
138        self.0.description.clone()
139    }
140
141    fn length(&self) -> usize {
142        self.0.length
143    }
144
145    fn reply(&self) -> Option<Reply> {
146        self.0.reply.clone().map(|r| Reply {
147            client_name: String::new(),
148            topic: r.to_string(),
149        })
150    }
151
152    fn as_any(&self) -> &dyn Any {
153        self
154    }
155}
156
157impl Client for crate::Client {
158    fn subscribe(&self) -> FutureResult<Subscriptions> {
159        let client = self.clone();
160
161        async move {
162            let Some(topics) = client.topics else {
163                return Err(anyhow!("No topics specified"));
164            };
165
166            let mut subscribers = vec![];
167            for t in &topics {
168                let subscriber = client.inner.subscribe(t.clone()).await?;
169                subscribers.push(subscriber);
170            }
171
172            tracing::info!("subscribed to {topics:?} topics");
173
174            // process messages until terminated
175            let stream = stream::select_all(subscribers)
176                .map(|msg| MessageProxy(Arc::new(NatsMessage(msg)) as Arc<dyn Message>));
177            Ok(Box::pin(stream) as Subscriptions)
178        }
179        .boxed()
180    }
181
182    fn send(&self, topic: String, message: MessageProxy) -> FutureResult<()> {
183        let client = self.inner.clone();
184        async move {
185            let payload = message.payload();
186            let Some(headers) = message.metadata() else {
187                client.publish(topic.clone(), payload.into()).await.context("failed to publish")?;
188                return Ok(());
189            };
190
191            let mut nats_headers = async_nats::HeaderMap::new();
192            for (k, v) in headers.iter() {
193                nats_headers.insert(k.as_str(), v.as_str());
194            }
195
196            client
197                .publish_with_headers(topic.clone(), nats_headers, payload.into())
198                .await
199                .context("failed to publish")?;
200
201            Ok(())
202        }
203        .boxed()
204    }
205
206    fn request(
207        &self, topic: String, message: MessageProxy, options: Option<RequestOptions>,
208    ) -> FutureResult<MessageProxy> {
209        let client = self.inner.clone();
210
211        async move {
212            let payload = message.payload();
213            let headers = message.metadata();
214            let mut nats_headers = async_nats::HeaderMap::new();
215            if let Some(meta) = headers {
216                for (k, v) in meta.iter() {
217                    nats_headers.insert(k.as_str(), v.as_str());
218                }
219            }
220            let timeout = options.and_then(|options| options.timeout);
221
222            let request = async_nats::Request::new()
223                .payload(payload.into())
224                .headers(nats_headers)
225                .timeout(timeout);
226
227            let nats_msg = client
228                .send_request(topic.clone(), request)
229                .await
230                .context("failed to send request")?;
231            Ok(MessageProxy(Arc::new(NatsMessage(nats_msg)) as Arc<dyn Message>))
232        }
233        .boxed()
234    }
235}