1use std::any::Any;
2use std::collections::HashMap;
3use std::sync::Arc;
4
5use anyhow::{Context, anyhow};
6use async_nats::HeaderMap;
7use futures::future::FutureExt;
8use futures::stream::{self, StreamExt};
9use omnia_wasi_messaging::{
10 Client, FutureResult, Message, MessageProxy, Metadata, Reply, RequestOptions, Subscriptions,
11 WasiMessagingCtx,
12};
13
14impl WasiMessagingCtx for crate::Client {
16 fn connect(&self) -> FutureResult<Arc<dyn Client>> {
17 let client = self.clone();
18 async move { Ok(Arc::new(client) as Arc<dyn Client>) }.boxed()
19 }
20
21 fn new_message(&self, data: Vec<u8>) -> anyhow::Result<Arc<dyn Message>> {
22 let length = data.len();
23
24 let msg = async_nats::Message {
25 subject: String::new().into(),
26 reply: None,
27 payload: data.into(),
28 headers: None,
29 status: None,
30 description: None,
31 length,
32 };
33 Ok(Arc::new(NatsMessage(msg)) as Arc<dyn Message>)
34 }
35
36 fn set_content_type(
37 &self, message: Arc<dyn Message>, content_type: String,
38 ) -> anyhow::Result<Arc<dyn Message>> {
39 let nats_msg = message
40 .as_any()
41 .downcast_ref::<NatsMessage>()
42 .ok_or_else(|| anyhow!("invalid message type"))?;
43 let mut msg = nats_msg.0.clone();
44 let mut headers = nats_msg.0.headers.clone().unwrap_or_default();
45 headers.insert("content-type".to_string(), content_type);
46 msg.headers = Some(headers);
47 Ok(Arc::new(NatsMessage(msg)) as Arc<dyn Message>)
48 }
49
50 fn set_payload(
51 &self, message: Arc<dyn Message>, data: Vec<u8>,
52 ) -> anyhow::Result<Arc<dyn Message>> {
53 let nats_msg = message
54 .as_any()
55 .downcast_ref::<NatsMessage>()
56 .ok_or_else(|| anyhow!("invalid message type"))?;
57 let mut msg = nats_msg.0.clone();
58 msg.payload = data.clone().into();
59 msg.length = data.len();
60 Ok(Arc::new(NatsMessage(msg)) as Arc<dyn Message>)
61 }
62
63 fn add_metadata(
64 &self, message: Arc<dyn Message>, key: String, value: String,
65 ) -> anyhow::Result<Arc<dyn Message>> {
66 let nats_msg = message
67 .as_any()
68 .downcast_ref::<NatsMessage>()
69 .ok_or_else(|| anyhow!("invalid message type"))?;
70 let mut msg = nats_msg.0.clone();
71 let mut headers = nats_msg.0.headers.clone().unwrap_or_default();
72 headers.insert(key, value);
73 msg.headers = Some(headers);
74 Ok(Arc::new(NatsMessage(msg)) as Arc<dyn Message>)
75 }
76
77 fn set_metadata(
78 &self, message: Arc<dyn Message>, metadata: Metadata,
79 ) -> anyhow::Result<Arc<dyn Message>> {
80 let nats_msg = message
81 .as_any()
82 .downcast_ref::<NatsMessage>()
83 .ok_or_else(|| anyhow!("invalid message type"))?;
84 let mut msg = nats_msg.0.clone();
85 let mut headers = async_nats::HeaderMap::new();
86 for (k, v) in &metadata.inner {
87 headers.insert(k.as_str(), v.as_str());
88 }
89 msg.headers = Some(headers);
90 Ok(Arc::new(NatsMessage(msg)) as Arc<dyn Message>)
91 }
92
93 fn remove_metadata(
94 &self, message: Arc<dyn Message>, key: String,
95 ) -> anyhow::Result<Arc<dyn Message>> {
96 let nats_msg = message
97 .as_any()
98 .downcast_ref::<NatsMessage>()
99 .ok_or_else(|| anyhow!("invalid message type"))?;
100 let mut msg = nats_msg.0.clone();
101 if let Some(headers) = nats_msg.0.headers.clone() {
102 let mut updated_headers = HeaderMap::new();
103 for (k, v) in headers.iter() {
104 if k.to_string() != key {
105 for iv in v {
106 updated_headers.insert(k.clone(), iv.clone());
107 }
108 }
109 }
110 msg.headers = Some(updated_headers);
111 }
112 Ok(Arc::new(NatsMessage(msg)) as Arc<dyn Message>)
113 }
114}
115
116#[derive(Clone, Debug)]
117struct NatsMessage(async_nats::Message);
118
119impl Message for NatsMessage {
120 fn topic(&self) -> String {
121 self.0.subject.to_string()
122 }
123
124 fn payload(&self) -> Vec<u8> {
125 self.0.payload.to_vec()
126 }
127
128 fn metadata(&self) -> Option<Metadata> {
129 let mut md = HashMap::new();
130 for (k, v) in self.0.headers.as_ref()?.iter() {
131 let v_str = v.iter().map(ToString::to_string).collect::<Vec<String>>().join(", ");
132 md.insert(k.to_string(), v_str);
133 }
134 Some(Metadata { inner: md })
135 }
136
137 fn description(&self) -> Option<String> {
138 self.0.description.clone()
139 }
140
141 fn length(&self) -> usize {
142 self.0.length
143 }
144
145 fn reply(&self) -> Option<Reply> {
146 self.0.reply.clone().map(|r| Reply {
147 client_name: String::new(),
148 topic: r.to_string(),
149 })
150 }
151
152 fn as_any(&self) -> &dyn Any {
153 self
154 }
155}
156
157impl Client for crate::Client {
158 fn subscribe(&self) -> FutureResult<Subscriptions> {
159 let client = self.clone();
160
161 async move {
162 let Some(topics) = client.topics else {
163 return Err(anyhow!("No topics specified"));
164 };
165
166 let mut subscribers = vec![];
167 for t in &topics {
168 let subscriber = client.inner.subscribe(t.clone()).await?;
169 subscribers.push(subscriber);
170 }
171
172 tracing::info!("subscribed to {topics:?} topics");
173
174 let stream = stream::select_all(subscribers)
176 .map(|msg| MessageProxy(Arc::new(NatsMessage(msg)) as Arc<dyn Message>));
177 Ok(Box::pin(stream) as Subscriptions)
178 }
179 .boxed()
180 }
181
182 fn send(&self, topic: String, message: MessageProxy) -> FutureResult<()> {
183 let client = self.inner.clone();
184 async move {
185 let payload = message.payload();
186 let Some(headers) = message.metadata() else {
187 client.publish(topic.clone(), payload.into()).await.context("failed to publish")?;
188 return Ok(());
189 };
190
191 let mut nats_headers = async_nats::HeaderMap::new();
192 for (k, v) in headers.iter() {
193 nats_headers.insert(k.as_str(), v.as_str());
194 }
195
196 client
197 .publish_with_headers(topic.clone(), nats_headers, payload.into())
198 .await
199 .context("failed to publish")?;
200
201 Ok(())
202 }
203 .boxed()
204 }
205
206 fn request(
207 &self, topic: String, message: MessageProxy, options: Option<RequestOptions>,
208 ) -> FutureResult<MessageProxy> {
209 let client = self.inner.clone();
210
211 async move {
212 let payload = message.payload();
213 let headers = message.metadata();
214 let mut nats_headers = async_nats::HeaderMap::new();
215 if let Some(meta) = headers {
216 for (k, v) in meta.iter() {
217 nats_headers.insert(k.as_str(), v.as_str());
218 }
219 }
220 let timeout = options.and_then(|options| options.timeout);
221
222 let request = async_nats::Request::new()
223 .payload(payload.into())
224 .headers(nats_headers)
225 .timeout(timeout);
226
227 let nats_msg = client
228 .send_request(topic.clone(), request)
229 .await
230 .context("failed to send request")?;
231 Ok(MessageProxy(Arc::new(NatsMessage(nats_msg)) as Arc<dyn Message>))
232 }
233 .boxed()
234 }
235}