1use std::collections::HashMap;
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18use std::sync::mpsc;
19use std::sync::Arc;
20use std::sync::Mutex as StdMutex;
21use std::thread;
22
23use futures_util::{SinkExt, StreamExt};
24use serde_json::Value;
25use tokio::sync::oneshot;
26use tokio::sync::{mpsc as async_mpsc, Mutex as TokioMutex};
27use tokio_tungstenite::{connect_async, tungstenite::Message};
28
29use log::{debug, info, warn};
30
31use crate::wamp;
32
33const DEFAULT_WAAPI_URL: &str = "ws://localhost:8080/waapi";
39
40const DEFAULT_REALM: &str = "realm1";
46
47#[derive(Debug, thiserror::Error)]
53pub enum WaapiError {
54 #[error("client already disconnected")]
58 Disconnected,
59 #[error("WAMP error: {0}")]
63 Wamp(String),
64 #[error("WebSocket error: {0}")]
68 WebSocket(#[from] Box<tokio_tungstenite::tungstenite::Error>),
69 #[error("{0}")]
73 Serde(#[from] serde_json::Error),
74 #[error("{0}")]
78 Io(#[from] std::io::Error),
79}
80
81type CallResult = Result<Option<Value>, WaapiError>;
84type SubResult = Result<u64, WaapiError>;
85type UnsubResult = Result<(), WaapiError>;
86
87pub type EventPayload = (u64, Option<Value>);
89
90type WsSink = futures_util::stream::SplitSink<
93 tokio_tungstenite::WebSocketStream<
94 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
95 >,
96 Message,
97>;
98
99#[derive(Debug)]
100struct WampConn {
101 ws_tx: TokioMutex<WsSink>,
102 pending_calls: StdMutex<HashMap<u64, oneshot::Sender<CallResult>>>,
103 pending_subs: StdMutex<HashMap<u64, oneshot::Sender<SubResult>>>,
104 pending_unsubs: StdMutex<HashMap<u64, oneshot::Sender<UnsubResult>>>,
105 event_senders: StdMutex<HashMap<u64, async_mpsc::UnboundedSender<EventPayload>>>,
106 next_id: AtomicU64,
107}
108
109impl WampConn {
110 fn new(sink: WsSink) -> Self {
111 Self {
112 ws_tx: TokioMutex::new(sink),
113 pending_calls: StdMutex::new(HashMap::new()),
114 pending_subs: StdMutex::new(HashMap::new()),
115 pending_unsubs: StdMutex::new(HashMap::new()),
116 event_senders: StdMutex::new(HashMap::new()),
117 next_id: AtomicU64::new(1),
118 }
119 }
120
121 fn next_id(&self) -> u64 {
122 self.next_id.fetch_add(1, Ordering::Relaxed)
123 }
124
125 async fn send(&self, text: String) -> Result<(), WaapiError> {
126 self.ws_tx
127 .lock()
128 .await
129 .send(Message::Text(text.into()))
130 .await
131 .map_err(|e| WaapiError::WebSocket(Box::new(e)))
132 }
133}
134
135type WsStream = tokio_tungstenite::WebSocketStream<
138 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
139>;
140
141async fn run_event_loop(
142 conn: Arc<WampConn>,
143 mut ws_rx: futures_util::stream::SplitStream<WsStream>,
144 connected: Arc<AtomicBool>,
145) {
146 while let Some(msg) = ws_rx.next().await {
147 match msg {
148 Ok(Message::Text(text)) => {
149 if let Some(wamp_msg) = wamp::parse(&text) {
150 dispatch(&conn, wamp_msg);
151 }
152 }
153 Ok(Message::Close(_)) | Err(_) => break,
154 _ => {}
155 }
156 }
157 connected.store(false, Ordering::Release);
158 drain_pending(&conn);
160}
161
162fn dispatch(conn: &WampConn, msg: wamp::WampMessage) {
163 match msg {
164 wamp::WampMessage::Result { request_id, kwargs } => {
165 if let Some(tx) = conn
166 .pending_calls
167 .lock()
168 .unwrap_or_else(|e| e.into_inner())
169 .remove(&request_id)
170 {
171 let _ = tx.send(Ok(kwargs));
172 }
173 }
174 wamp::WampMessage::Error {
175 request_type,
176 request_id,
177 error,
178 } => {
179 let err_str = error.clone();
180 if request_type == 48 {
182 if let Some(tx) = conn
183 .pending_calls
184 .lock()
185 .unwrap_or_else(|e| e.into_inner())
186 .remove(&request_id)
187 {
188 let _ = tx.send(Err(WaapiError::Wamp(err_str)));
189 return;
190 }
191 }
192 if request_type == 32 {
194 if let Some(tx) = conn
195 .pending_subs
196 .lock()
197 .unwrap_or_else(|e| e.into_inner())
198 .remove(&request_id)
199 {
200 let _ = tx.send(Err(WaapiError::Wamp(error)));
201 return;
202 }
203 }
204 if request_type == 34 {
206 if let Some(tx) = conn
207 .pending_unsubs
208 .lock()
209 .unwrap_or_else(|e| e.into_inner())
210 .remove(&request_id)
211 {
212 let _ = tx.send(Err(WaapiError::Wamp(error)));
213 }
214 }
215 }
216 wamp::WampMessage::Subscribed {
217 request_id,
218 sub_id,
219 } => {
220 if let Some(tx) = conn
221 .pending_subs
222 .lock()
223 .unwrap_or_else(|e| e.into_inner())
224 .remove(&request_id)
225 {
226 let _ = tx.send(Ok(sub_id));
227 }
228 }
229 wamp::WampMessage::Unsubscribed { request_id } => {
230 if let Some(tx) = conn
231 .pending_unsubs
232 .lock()
233 .unwrap_or_else(|e| e.into_inner())
234 .remove(&request_id)
235 {
236 let _ = tx.send(Ok(()));
237 }
238 }
239 wamp::WampMessage::Event {
240 sub_id,
241 pub_id,
242 kwargs,
243 } => {
244 let senders = conn
245 .event_senders
246 .lock()
247 .unwrap_or_else(|e| e.into_inner());
248 if let Some(tx) = senders.get(&sub_id) {
249 let _ = tx.send((pub_id, kwargs));
250 }
251 }
252 wamp::WampMessage::Goodbye | wamp::WampMessage::Welcome { .. } => {}
253 }
254}
255
256fn drain_pending(conn: &WampConn) {
258 let calls: Vec<_> = conn
259 .pending_calls
260 .lock()
261 .unwrap_or_else(|e| e.into_inner())
262 .drain()
263 .collect();
264 for (_, tx) in calls {
265 let _ = tx.send(Err(WaapiError::Disconnected));
266 }
267 let subs: Vec<_> = conn
268 .pending_subs
269 .lock()
270 .unwrap_or_else(|e| e.into_inner())
271 .drain()
272 .collect();
273 for (_, tx) in subs {
274 let _ = tx.send(Err(WaapiError::Disconnected));
275 }
276 let unsubs: Vec<_> = conn
277 .pending_unsubs
278 .lock()
279 .unwrap_or_else(|e| e.into_inner())
280 .drain()
281 .collect();
282 for (_, tx) in unsubs {
283 let _ = tx.send(Err(WaapiError::Disconnected));
284 }
285}
286
287async fn read_welcome(
291 ws_rx: &mut futures_util::stream::SplitStream<WsStream>,
292) -> Result<u64, WaapiError> {
293 loop {
294 match ws_rx.next().await {
295 Some(Ok(Message::Text(text))) => {
296 if let Some(wamp::WampMessage::Welcome { session_id }) = wamp::parse(&text) {
297 return Ok(session_id);
298 }
299 return Err(WaapiError::Wamp(format!("expected WELCOME, got: {text}")));
300 }
301 Some(Ok(_)) => continue, Some(Err(e)) => return Err(WaapiError::WebSocket(Box::new(e))),
303 None => return Err(WaapiError::Disconnected),
304 }
305 }
306}
307
308#[derive(Debug)]
317pub struct SubscriptionHandle {
318 sub_id: u64,
319 conn: Arc<WampConn>,
320 subscription_ids: Arc<StdMutex<Vec<u64>>>,
321 recv_task: Option<tokio::task::JoinHandle<()>>,
322 is_unsubscribed: bool,
323}
324
325fn mark_unsubscribed(flag: &mut bool) -> bool {
326 if *flag {
327 false
328 } else {
329 *flag = true;
330 true
331 }
332}
333
334impl SubscriptionHandle {
335 pub async fn unsubscribe(mut self) -> Result<(), WaapiError> {
341 debug!("Unsubscribing sub_id={}", self.sub_id);
342 if let Some(task) = self.recv_task.take() {
343 task.abort();
344 }
345 self.subscription_ids
346 .lock()
347 .unwrap_or_else(|e| e.into_inner())
348 .retain(|&id| id != self.sub_id);
349 self.conn
351 .event_senders
352 .lock()
353 .unwrap_or_else(|e| e.into_inner())
354 .remove(&self.sub_id);
355 if !mark_unsubscribed(&mut self.is_unsubscribed) {
356 return Ok(());
357 }
358 do_network_unsubscribe(&self.conn, self.sub_id).await
359 }
360}
361
362async fn do_network_unsubscribe(conn: &WampConn, sub_id: u64) -> Result<(), WaapiError> {
363 let id = conn.next_id();
364 let (tx, rx) = oneshot::channel();
365 conn.pending_unsubs
366 .lock()
367 .unwrap_or_else(|e| e.into_inner())
368 .insert(id, tx);
369 conn.send(wamp::unsubscribe_msg(id, sub_id)).await?;
370 rx.await.unwrap_or(Err(WaapiError::Disconnected))
371}
372
373impl Drop for SubscriptionHandle {
374 fn drop(&mut self) {
375 let sub_id = self.sub_id;
376 let conn = Arc::clone(&self.conn);
377 let subscription_ids = Arc::clone(&self.subscription_ids);
378 if let Some(task) = self.recv_task.take() {
379 task.abort();
380 }
381 subscription_ids
382 .lock()
383 .unwrap_or_else(|e| e.into_inner())
384 .retain(|&id| id != sub_id);
385 conn.event_senders
387 .lock()
388 .unwrap_or_else(|e| e.into_inner())
389 .remove(&sub_id);
390 if !mark_unsubscribed(&mut self.is_unsubscribed) {
391 return;
392 }
393 if let Ok(rt) = tokio::runtime::Handle::try_current() {
394 debug!("SubscriptionHandle dropped, spawning unsubscribe for sub_id={sub_id}");
395 rt.spawn(async move {
396 let _ = do_network_unsubscribe(&conn, sub_id).await;
397 });
398 } else {
399 warn!("SubscriptionHandle dropped without runtime, skipping network unsubscribe for sub_id={sub_id}");
400 }
401 }
402}
403
404#[derive(Debug)]
418pub struct WaapiClient {
419 conn: Option<Arc<WampConn>>,
420 event_loop_handle: Option<tokio::task::JoinHandle<()>>,
421 subscription_ids: Arc<StdMutex<Vec<u64>>>,
422 connected: Arc<AtomicBool>,
423}
424
425impl WaapiClient {
426 pub async fn connect() -> Result<Self, WaapiError> {
434 Self::connect_with_url(DEFAULT_WAAPI_URL).await
435 }
436
437 pub async fn connect_with_url(url: &str) -> Result<Self, WaapiError> {
443 info!("Connecting to WAAPI at {url}");
444 let (ws_stream, _) = connect_async(url).await.map_err(|e| WaapiError::WebSocket(Box::new(e)))?;
445 let (ws_tx, mut ws_rx) = ws_stream.split();
446
447 let conn = Arc::new(WampConn::new(ws_tx));
448
449 conn.send(wamp::hello_msg(DEFAULT_REALM)).await?;
451 let _session_id = read_welcome(&mut ws_rx).await?;
452
453 let connected = Arc::new(AtomicBool::new(true));
454 let connected_flag = Arc::clone(&connected);
455 let conn_for_loop = Arc::clone(&conn);
456 let handle = tokio::spawn(async move {
457 run_event_loop(conn_for_loop, ws_rx, connected_flag).await;
458 });
459
460 info!("Connected to WAAPI at {url}");
461 Ok(Self {
462 conn: Some(conn),
463 event_loop_handle: Some(handle),
464 subscription_ids: Arc::new(StdMutex::new(Vec::new())),
465 connected,
466 })
467 }
468
469 pub async fn call(
485 &self,
486 uri: &str,
487 args: Option<Value>,
488 options: Option<Value>,
489 ) -> Result<Option<Value>, WaapiError> {
490 let conn = self.conn.as_ref().ok_or(WaapiError::Disconnected)?;
491 let id = conn.next_id();
492 let (tx, rx) = oneshot::channel();
493 conn.pending_calls
494 .lock()
495 .unwrap_or_else(|e| e.into_inner())
496 .insert(id, tx);
497 debug!("Calling WAAPI: {uri} (id={id})");
498 conn.send(wamp::call_msg(id, uri, args.as_ref(), options.as_ref()))
499 .await?;
500 rx.await.unwrap_or(Err(WaapiError::Disconnected))
501 }
502
503 pub(crate) async fn subscribe_inner(
505 &self,
506 topic: &str,
507 options: Option<Value>,
508 ) -> Result<
509 (
510 SubscriptionHandle,
511 async_mpsc::UnboundedReceiver<EventPayload>,
512 ),
513 WaapiError,
514 > {
515 let conn = self.conn.as_ref().ok_or(WaapiError::Disconnected)?;
516 let id = conn.next_id();
517 let (tx, rx) = oneshot::channel();
518 conn.pending_subs
519 .lock()
520 .unwrap_or_else(|e| e.into_inner())
521 .insert(id, tx);
522 conn.send(wamp::subscribe_msg(id, topic, options.as_ref()))
523 .await?;
524 let sub_id = rx.await.unwrap_or(Err(WaapiError::Disconnected))?;
525 debug!("Subscribed to {topic} (sub_id={sub_id})");
526
527 let (event_tx, event_rx) = async_mpsc::unbounded_channel();
528 conn.event_senders
529 .lock()
530 .unwrap_or_else(|e| e.into_inner())
531 .insert(sub_id, event_tx);
532 self.subscription_ids
533 .lock()
534 .unwrap_or_else(|e| e.into_inner())
535 .push(sub_id);
536
537 let handle = SubscriptionHandle {
538 sub_id,
539 conn: Arc::clone(conn),
540 subscription_ids: Arc::clone(&self.subscription_ids),
541 recv_task: None,
542 is_unsubscribed: false,
543 };
544 Ok((handle, event_rx))
545 }
546
547 pub async fn subscribe<F>(
562 &self,
563 topic: &str,
564 options: Option<Value>,
565 callback: F,
566 ) -> Result<SubscriptionHandle, WaapiError>
567 where
568 F: Fn(Option<Value>) + Send + Sync + 'static,
569 {
570 let (mut handle, mut event_rx) = self.subscribe_inner(topic, options).await?;
571 let recv_task = tokio::spawn(async move {
572 while let Some((_pub_id, kwargs)) = event_rx.recv().await {
573 callback(kwargs);
574 }
575 });
576 handle.recv_task = Some(recv_task);
577 Ok(handle)
578 }
579
580 #[must_use]
586 pub fn is_connected(&self) -> bool {
587 self.conn.is_some() && self.connected.load(Ordering::Acquire)
588 }
589
590 pub async fn disconnect(mut self) {
598 info!("Disconnecting from WAAPI");
599 self.cleanup().await;
600 info!("Disconnected from WAAPI");
601 }
602
603 async fn cleanup(&mut self) {
604 self.connected.store(false, Ordering::Release);
605 if let Some(conn) = self.conn.take() {
606 let ids: Vec<u64> = {
608 let mut guard = self.subscription_ids.lock().unwrap_or_else(|e| e.into_inner());
609 std::mem::take(&mut *guard)
610 };
611 for sub_id in ids {
612 let id = conn.next_id();
613 let (tx, rx) = oneshot::channel();
614 conn.pending_unsubs
615 .lock()
616 .unwrap_or_else(|e| e.into_inner())
617 .insert(id, tx);
618 if conn.send(wamp::unsubscribe_msg(id, sub_id)).await.is_ok() {
619 let _ = rx.await;
620 }
621 }
622 let _ = conn.send(wamp::goodbye_msg()).await;
624 let _ = conn.ws_tx.lock().await.close().await;
626 }
627 if let Some(handle) = self.event_loop_handle.take() {
628 handle.abort();
629 }
630 }
631}
632
633impl Drop for WaapiClient {
634 fn drop(&mut self) {
635 if self.conn.is_some() || self.event_loop_handle.is_some() {
636 let conn = self.conn.take();
637 let event_loop = self.event_loop_handle.take();
638 let subscription_ids = Arc::clone(&self.subscription_ids);
639 let connected = Arc::clone(&self.connected);
640 connected.store(false, Ordering::Release);
641 if let Ok(rt) = tokio::runtime::Handle::try_current() {
642 debug!("WaapiClient dropped, spawning async cleanup");
643 rt.spawn(async move {
644 if let Some(conn) = conn {
645 let ids: Vec<u64> = {
646 let mut guard =
647 subscription_ids.lock().unwrap_or_else(|e| e.into_inner());
648 std::mem::take(&mut *guard)
649 };
650 for sub_id in ids {
651 let id = conn.next_id();
652 let (tx, rx) = oneshot::channel::<UnsubResult>();
653 conn.pending_unsubs
654 .lock()
655 .unwrap_or_else(|e| e.into_inner())
656 .insert(id, tx);
657 if conn.send(wamp::unsubscribe_msg(id, sub_id)).await.is_ok() {
658 let _ = rx.await;
659 }
660 }
661 let _ = conn.send(wamp::goodbye_msg()).await;
662 let _ = conn.ws_tx.lock().await.close().await;
663 }
664 if let Some(h) = event_loop {
665 h.abort();
666 }
667 });
668 } else {
669 warn!("WaapiClient dropped without runtime, skipping graceful cleanup");
670 if let Some(h) = event_loop {
671 h.abort();
672 }
673 }
674 }
675 }
676}
677
678#[derive(Debug)]
689pub struct SubscriptionHandleSync {
690 runtime: Arc<tokio::runtime::Runtime>,
691 inner: Option<SubscriptionHandle>,
692 bridge_join: Option<thread::JoinHandle<()>>,
693 bridge_thread_id: Option<thread::ThreadId>,
694}
695
696impl SubscriptionHandleSync {
697 pub fn unsubscribe(mut self) -> Result<(), WaapiError> {
703 let inner = self.inner.take();
704 let bridge_join = self.bridge_join.take();
705 if let Some(h) = inner {
706 self.runtime.block_on(h.unsubscribe())?;
707 }
708 if let Some(jh) = bridge_join {
709 let _ = jh.join();
710 }
711 Ok(())
712 }
713}
714
715impl Drop for SubscriptionHandleSync {
716 fn drop(&mut self) {
717 let is_bridge_thread = self.bridge_thread_id.as_ref() == Some(&thread::current().id());
718 let inner = self.inner.take();
719 let bridge_join = self.bridge_join.take();
720 let runtime = Arc::clone(&self.runtime);
721 if let Some(h) = inner {
722 if tokio::runtime::Handle::try_current().is_ok() {
723 warn!("SubscriptionHandleSync dropped inside async context, falling back to spawn");
724 runtime.handle().spawn(async move {
725 let _ = h.unsubscribe().await;
726 });
727 } else {
728 let _ = runtime.block_on(h.unsubscribe());
729 }
730 }
731 if !is_bridge_thread {
732 if let Some(jh) = bridge_join {
733 let _ = jh.join();
734 }
735 }
736 }
737}
738
739#[derive(Debug)]
750pub struct WaapiClientSync {
751 runtime: Arc<tokio::runtime::Runtime>,
752 client: Option<WaapiClient>,
753}
754
755impl WaapiClientSync {
756 pub fn connect() -> Result<Self, WaapiError> {
762 Self::connect_with_url(DEFAULT_WAAPI_URL)
763 }
764
765 pub fn connect_with_url(url: &str) -> Result<Self, WaapiError> {
771 info!("Connecting to WAAPI (sync) at {url}");
772 let runtime = Arc::new(
773 tokio::runtime::Builder::new_multi_thread()
774 .enable_all()
775 .build()?,
776 );
777 let client = runtime.block_on(WaapiClient::connect_with_url(url))?;
778 info!("Connected to WAAPI (sync) at {url}");
779 Ok(Self {
780 runtime,
781 client: Some(client),
782 })
783 }
784
785 pub fn call(
791 &self,
792 uri: &str,
793 args: Option<Value>,
794 options: Option<Value>,
795 ) -> Result<Option<Value>, WaapiError> {
796 let client = self.client.as_ref().ok_or(WaapiError::Disconnected)?;
797 self.runtime.block_on(client.call(uri, args, options))
798 }
799
800 pub fn subscribe<F>(
810 &self,
811 topic: &str,
812 options: Option<Value>,
813 callback: F,
814 ) -> Result<SubscriptionHandleSync, WaapiError>
815 where
816 F: Fn(Option<Value>) + Send + Sync + 'static,
817 {
818 let client = self.client.as_ref().ok_or(WaapiError::Disconnected)?;
819 let (inner, mut async_rx) = self
820 .runtime
821 .block_on(client.subscribe_inner(topic, options))?;
822 let (id_tx, id_rx) = mpsc::channel();
823 let runtime = Arc::clone(&self.runtime);
824 let bridge_join = thread::spawn(move || {
825 let _ = id_tx.send(thread::current().id());
826 while let Some((_pub_id, kwargs)) = runtime.block_on(async_rx.recv()) {
827 callback(kwargs);
828 }
829 });
830 let bridge_thread_id = id_rx.recv().ok();
831 Ok(SubscriptionHandleSync {
832 runtime: Arc::clone(&self.runtime),
833 inner: Some(inner),
834 bridge_join: Some(bridge_join),
835 bridge_thread_id,
836 })
837 }
838
839 #[must_use]
845 pub fn is_connected(&self) -> bool {
846 self.client.as_ref().is_some_and(|c| c.is_connected())
847 }
848
849 pub fn disconnect(mut self) {
855 info!("Disconnecting from WAAPI (sync)");
856 if let Some(client) = self.client.take() {
857 self.runtime.block_on(client.disconnect());
858 }
859 info!("Disconnected from WAAPI (sync)");
860 }
861}
862
863impl Drop for WaapiClientSync {
864 fn drop(&mut self) {
865 if let Some(client) = self.client.take() {
866 if tokio::runtime::Handle::try_current().is_ok() {
867 warn!("WaapiClientSync dropped inside async context, offloading cleanup to a dedicated thread");
868 let runtime = Arc::clone(&self.runtime);
869 let _ = thread::Builder::new()
870 .name("waapi-sync-drop-cleanup".to_string())
871 .spawn(move || {
872 runtime.block_on(client.disconnect());
873 });
874 } else {
875 self.runtime.block_on(client.disconnect());
876 }
877 }
878 }
879}
880
881#[cfg(test)]
884mod tests {
885 use super::*;
886
887 #[test]
888 fn test_mark_unsubscribed_is_idempotent() {
889 let mut is_unsubscribed = false;
890 assert!(mark_unsubscribed(&mut is_unsubscribed));
891 assert!(!mark_unsubscribed(&mut is_unsubscribed));
892 }
893
894 #[tokio::test]
895 async fn test_sync_client_drop_inside_async_context_is_safe() {
896 let runtime = Arc::new(
897 tokio::runtime::Builder::new_multi_thread()
898 .enable_all()
899 .build()
900 .expect("failed to create runtime"),
901 );
902 let async_client = WaapiClient {
903 conn: None,
904 event_loop_handle: None,
905 subscription_ids: Arc::new(StdMutex::new(Vec::new())),
906 connected: Arc::new(AtomicBool::new(false)),
907 };
908 let sync_client = WaapiClientSync {
909 runtime,
910 client: Some(async_client),
911 };
912 drop(sync_client);
913 }
914}