Skip to main content

waapi_rs/
client.rs

1//! WAAPI client implementation: async [WaapiClient] and sync [WaapiClientSync].
2//!
3//! Connection lifecycle: after `connect`, joins the default realm; on disconnect,
4//! all subscriptions are cancelled first, then GOODBYE, then close.
5//! Subscriptions are managed via [SubscriptionHandle]; explicit [SubscriptionHandle::unsubscribe]
6//! or drop will cancel automatically. For sync clients, [SubscriptionHandleSync] is used.
7//!
8//! ---
9//!
10//! WAAPI 客户端实现:异步 [WaapiClient] 与同步 [WaapiClientSync]。
11//!
12//! 连接生命周期:`connect` 后加入默认 realm,断开时先取消所有订阅、再发 GOODBYE、再关闭连接。
13//! 订阅通过 [SubscriptionHandle] 管理,显式 [SubscriptionHandle::unsubscribe] 或 drop 时自动取消;
14//! 同步客户端下由 [SubscriptionHandleSync] 管理。
15
16use std::collections::HashMap;
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18use std::sync::mpsc;
19use std::sync::Arc;
20use std::sync::Mutex as StdMutex;
21use std::thread;
22
23use futures_util::{SinkExt, StreamExt};
24use serde_json::Value;
25use tokio::sync::oneshot;
26use tokio::sync::{mpsc as async_mpsc, Mutex as TokioMutex};
27use tokio_tungstenite::{connect_async, tungstenite::Message};
28
29use log::{debug, info, warn};
30
31use crate::wamp;
32
/// Default WAAPI WebSocket URL (default port for Wwise local Authoring API).
const DEFAULT_WAAPI_URL: &str = "ws://localhost:8080/waapi";

/// Default WAMP realm name used when connecting; matches the Wwise WAAPI server default.
const DEFAULT_REALM: &str = "realm1";
46
/// WAAPI client error type.
#[derive(Debug, thiserror::Error)]
pub enum WaapiError {
    /// Client already disconnected.
    ///
    /// Also delivered to requests that were still waiting for a reply when the
    /// connection went away.
    #[error("client already disconnected")]
    Disconnected,
    /// WAMP protocol error (e.g. server returned ERROR message).
    #[error("WAMP error: {0}")]
    Wamp(String),
    /// WebSocket error.
    #[error("WebSocket error: {0}")]
    WebSocket(#[from] Box<tokio_tungstenite::tungstenite::Error>),
    /// Serialization / deserialization error.
    #[error("{0}")]
    Serde(#[from] serde_json::Error),
    /// IO error (e.g. failed to create tokio runtime).
    #[error("{0}")]
    Io(#[from] std::io::Error),
}
80
81// ── 内部类型别名 ────────────────────────────────────────────────
82
/// Outcome delivered to a pending CALL: the reply's `kwargs`, or an error.
type CallResult = Result<Option<Value>, WaapiError>;
/// Outcome delivered to a pending SUBSCRIBE: the subscription id, or an error.
type SubResult = Result<u64, WaapiError>;
/// Outcome delivered to a pending UNSUBSCRIBE.
type UnsubResult = Result<(), WaapiError>;

/// Subscription event payload: `(pub_id, kwargs)`.
pub type EventPayload = (u64, Option<Value>);
89
90// ── 内部连接状态 ─────────────────────────────────────────────────
91
92type WsSink = futures_util::stream::SplitSink<
93    tokio_tungstenite::WebSocketStream<
94        tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
95    >,
96    Message,
97>;
98
/// Shared per-connection state: the WebSocket write half plus the tables used to
/// route incoming WAMP messages back to whoever is waiting for them.
#[derive(Debug)]
struct WampConn {
    /// Write half of the WebSocket; the async mutex serializes outgoing frames.
    ws_tx: TokioMutex<WsSink>,
    /// CALL requests awaiting RESULT or ERROR, keyed by request id.
    pending_calls: StdMutex<HashMap<u64, oneshot::Sender<CallResult>>>,
    /// SUBSCRIBE requests awaiting SUBSCRIBED or ERROR, keyed by request id.
    pending_subs: StdMutex<HashMap<u64, oneshot::Sender<SubResult>>>,
    /// UNSUBSCRIBE requests awaiting UNSUBSCRIBED or ERROR, keyed by request id.
    pending_unsubs: StdMutex<HashMap<u64, oneshot::Sender<UnsubResult>>>,
    /// Event channels of active subscriptions, keyed by subscription id.
    event_senders: StdMutex<HashMap<u64, async_mpsc::UnboundedSender<EventPayload>>>,
    /// Counter backing `next_id`; starts at 1.
    next_id: AtomicU64,
}
108
109impl WampConn {
110    fn new(sink: WsSink) -> Self {
111        Self {
112            ws_tx: TokioMutex::new(sink),
113            pending_calls: StdMutex::new(HashMap::new()),
114            pending_subs: StdMutex::new(HashMap::new()),
115            pending_unsubs: StdMutex::new(HashMap::new()),
116            event_senders: StdMutex::new(HashMap::new()),
117            next_id: AtomicU64::new(1),
118        }
119    }
120
121    fn next_id(&self) -> u64 {
122        self.next_id.fetch_add(1, Ordering::Relaxed)
123    }
124
125    async fn send(&self, text: String) -> Result<(), WaapiError> {
126        self.ws_tx
127            .lock()
128            .await
129            .send(Message::Text(text.into()))
130            .await
131            .map_err(|e| WaapiError::WebSocket(Box::new(e)))
132    }
133}
134
135// ── 事件循环 ──────────────────────────────────────────────────────
136
/// Unsplit WebSocket stream type, as returned by `connect_async` before it is
/// split into read and write halves.
type WsStream = tokio_tungstenite::WebSocketStream<
    tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>;
140
141async fn run_event_loop(
142    conn: Arc<WampConn>,
143    mut ws_rx: futures_util::stream::SplitStream<WsStream>,
144    connected: Arc<AtomicBool>,
145) {
146    while let Some(msg) = ws_rx.next().await {
147        match msg {
148            Ok(Message::Text(text)) => {
149                if let Some(wamp_msg) = wamp::parse(&text) {
150                    dispatch(&conn, wamp_msg);
151                }
152            }
153            Ok(Message::Close(_)) | Err(_) => break,
154            _ => {}
155        }
156    }
157    connected.store(false, Ordering::Release);
158    // 连接断开时,唤醒所有等待中的 pending futures 并报错
159    drain_pending(&conn);
160}
161
162fn dispatch(conn: &WampConn, msg: wamp::WampMessage) {
163    match msg {
164        wamp::WampMessage::Result { request_id, kwargs } => {
165            if let Some(tx) = conn
166                .pending_calls
167                .lock()
168                .unwrap_or_else(|e| e.into_inner())
169                .remove(&request_id)
170            {
171                let _ = tx.send(Ok(kwargs));
172            }
173        }
174        wamp::WampMessage::Error {
175            request_type,
176            request_id,
177            error,
178        } => {
179            let err_str = error.clone();
180            // CALL error (type 48)
181            if request_type == 48 {
182                if let Some(tx) = conn
183                    .pending_calls
184                    .lock()
185                    .unwrap_or_else(|e| e.into_inner())
186                    .remove(&request_id)
187                {
188                    let _ = tx.send(Err(WaapiError::Wamp(err_str)));
189                    return;
190                }
191            }
192            // SUBSCRIBE error (type 32)
193            if request_type == 32 {
194                if let Some(tx) = conn
195                    .pending_subs
196                    .lock()
197                    .unwrap_or_else(|e| e.into_inner())
198                    .remove(&request_id)
199                {
200                    let _ = tx.send(Err(WaapiError::Wamp(error)));
201                    return;
202                }
203            }
204            // UNSUBSCRIBE error (type 34)
205            if request_type == 34 {
206                if let Some(tx) = conn
207                    .pending_unsubs
208                    .lock()
209                    .unwrap_or_else(|e| e.into_inner())
210                    .remove(&request_id)
211                {
212                    let _ = tx.send(Err(WaapiError::Wamp(error)));
213                }
214            }
215        }
216        wamp::WampMessage::Subscribed {
217            request_id,
218            sub_id,
219        } => {
220            if let Some(tx) = conn
221                .pending_subs
222                .lock()
223                .unwrap_or_else(|e| e.into_inner())
224                .remove(&request_id)
225            {
226                let _ = tx.send(Ok(sub_id));
227            }
228        }
229        wamp::WampMessage::Unsubscribed { request_id } => {
230            if let Some(tx) = conn
231                .pending_unsubs
232                .lock()
233                .unwrap_or_else(|e| e.into_inner())
234                .remove(&request_id)
235            {
236                let _ = tx.send(Ok(()));
237            }
238        }
239        wamp::WampMessage::Event {
240            sub_id,
241            pub_id,
242            kwargs,
243        } => {
244            let senders = conn
245                .event_senders
246                .lock()
247                .unwrap_or_else(|e| e.into_inner());
248            if let Some(tx) = senders.get(&sub_id) {
249                let _ = tx.send((pub_id, kwargs));
250            }
251        }
252        wamp::WampMessage::Goodbye | wamp::WampMessage::Welcome { .. } => {}
253    }
254}
255
256/// 连接断开时,向所有等待中的 pending futures 发送 Disconnected 错误。
257fn drain_pending(conn: &WampConn) {
258    let calls: Vec<_> = conn
259        .pending_calls
260        .lock()
261        .unwrap_or_else(|e| e.into_inner())
262        .drain()
263        .collect();
264    for (_, tx) in calls {
265        let _ = tx.send(Err(WaapiError::Disconnected));
266    }
267    let subs: Vec<_> = conn
268        .pending_subs
269        .lock()
270        .unwrap_or_else(|e| e.into_inner())
271        .drain()
272        .collect();
273    for (_, tx) in subs {
274        let _ = tx.send(Err(WaapiError::Disconnected));
275    }
276    let unsubs: Vec<_> = conn
277        .pending_unsubs
278        .lock()
279        .unwrap_or_else(|e| e.into_inner())
280        .drain()
281        .collect();
282    for (_, tx) in unsubs {
283        let _ = tx.send(Err(WaapiError::Disconnected));
284    }
285}
286
287// ── 连接握手辅助 ─────────────────────────────────────────────────
288
289/// 从 WebSocket 流读取第一条消息,期望是 WELCOME,否则返回错误。
290async fn read_welcome(
291    ws_rx: &mut futures_util::stream::SplitStream<WsStream>,
292) -> Result<u64, WaapiError> {
293    loop {
294        match ws_rx.next().await {
295            Some(Ok(Message::Text(text))) => {
296                if let Some(wamp::WampMessage::Welcome { session_id }) = wamp::parse(&text) {
297                    return Ok(session_id);
298                }
299                return Err(WaapiError::Wamp(format!("expected WELCOME, got: {text}")));
300            }
301            Some(Ok(_)) => continue, // 忽略非文本帧(如 Ping)
302            Some(Err(e)) => return Err(WaapiError::WebSocket(Box::new(e))),
303            None => return Err(WaapiError::Disconnected),
304        }
305    }
306}
307
308// ── 公共 API ──────────────────────────────────────────────────────
309
/// Subscription handle: used to cancel a subscription; automatically unsubscribes
/// in the background on drop.
#[derive(Debug)]
pub struct SubscriptionHandle {
    /// Subscription id delivered by the SUBSCRIBED reply.
    sub_id: u64,
    /// Connection the subscription was created on.
    conn: Arc<WampConn>,
    /// The owning client's list of active subscription ids; this handle removes
    /// its own id from it when cancelled or dropped.
    subscription_ids: Arc<StdMutex<Vec<u64>>>,
    /// Callback task, set by `WaapiClient::subscribe`; `None` for handles created
    /// through `subscribe_inner` alone.
    recv_task: Option<tokio::task::JoinHandle<()>>,
    /// Set once the network UNSUBSCRIBE has been claimed, so it is issued at most once.
    is_unsubscribed: bool,
}
324
/// Sets `flag` and reports whether this call was the one that set it.
///
/// Returns `true` the first time (the flag was clear) and `false` on every later
/// call; the flag is always `true` afterwards.
fn mark_unsubscribed(flag: &mut bool) -> bool {
    let was_already_set = std::mem::replace(flag, true);
    !was_already_set
}
333
334impl SubscriptionHandle {
335    /// Cancel the subscription and stop the callback loop (if any).
336    ///
337    /// ---
338    ///
339    /// 取消订阅并停止回调循环(若有)。
340    pub async fn unsubscribe(mut self) -> Result<(), WaapiError> {
341        debug!("Unsubscribing sub_id={}", self.sub_id);
342        if let Some(task) = self.recv_task.take() {
343            task.abort();
344        }
345        self.subscription_ids
346            .lock()
347            .unwrap_or_else(|e| e.into_inner())
348            .retain(|&id| id != self.sub_id);
349        // Drop the event sender so receivers (e.g. sync bridge thread) see channel closed.
350        self.conn
351            .event_senders
352            .lock()
353            .unwrap_or_else(|e| e.into_inner())
354            .remove(&self.sub_id);
355        if !mark_unsubscribed(&mut self.is_unsubscribed) {
356            return Ok(());
357        }
358        do_network_unsubscribe(&self.conn, self.sub_id).await
359    }
360}
361
362async fn do_network_unsubscribe(conn: &WampConn, sub_id: u64) -> Result<(), WaapiError> {
363    let id = conn.next_id();
364    let (tx, rx) = oneshot::channel();
365    conn.pending_unsubs
366        .lock()
367        .unwrap_or_else(|e| e.into_inner())
368        .insert(id, tx);
369    conn.send(wamp::unsubscribe_msg(id, sub_id)).await?;
370    rx.await.unwrap_or(Err(WaapiError::Disconnected))
371}
372
373impl Drop for SubscriptionHandle {
374    fn drop(&mut self) {
375        let sub_id = self.sub_id;
376        let conn = Arc::clone(&self.conn);
377        let subscription_ids = Arc::clone(&self.subscription_ids);
378        if let Some(task) = self.recv_task.take() {
379            task.abort();
380        }
381        subscription_ids
382            .lock()
383            .unwrap_or_else(|e| e.into_inner())
384            .retain(|&id| id != sub_id);
385        // Drop the event sender so receivers see channel closed.
386        conn.event_senders
387            .lock()
388            .unwrap_or_else(|e| e.into_inner())
389            .remove(&sub_id);
390        if !mark_unsubscribed(&mut self.is_unsubscribed) {
391            return;
392        }
393        if let Ok(rt) = tokio::runtime::Handle::try_current() {
394            debug!("SubscriptionHandle dropped, spawning unsubscribe for sub_id={sub_id}");
395            rt.spawn(async move {
396                let _ = do_network_unsubscribe(&conn, sub_id).await;
397            });
398        } else {
399            warn!("SubscriptionHandle dropped without runtime, skipping network unsubscribe for sub_id={sub_id}");
400        }
401    }
402}
403
/// Async WAAPI client.
///
/// Provides async access to the Wwise Authoring API (WAAPI);
/// can be shared across tasks (internal Arc).
///
/// **It is recommended to call [`disconnect`](WaapiClient::disconnect) explicitly**
/// for graceful shutdown.
#[derive(Debug)]
pub struct WaapiClient {
    /// Connection state; `None` once cleanup has taken it.
    conn: Option<Arc<WampConn>>,
    /// Background task running the event loop; `None` once cleanup has taken it.
    event_loop_handle: Option<tokio::task::JoinHandle<()>>,
    /// Ids of subscriptions created through this client, shared with their handles.
    subscription_ids: Arc<StdMutex<Vec<u64>>>,
    /// Shared with the event loop, which clears it when the loop stops.
    connected: Arc<AtomicBool>,
}
424
425impl WaapiClient {
426    /// Connect to WAAPI using the default URL.
427    ///
428    /// Connects to `ws://localhost:8080/waapi` with the default realm.
429    ///
430    /// ---
431    ///
432    /// 使用默认 URL 连接到 WAAPI。
433    pub async fn connect() -> Result<Self, WaapiError> {
434        Self::connect_with_url(DEFAULT_WAAPI_URL).await
435    }
436
437    /// Connect to WAAPI at the specified URL.
438    ///
439    /// ---
440    ///
441    /// 使用指定 URL 连接到 WAAPI。
442    pub async fn connect_with_url(url: &str) -> Result<Self, WaapiError> {
443        info!("Connecting to WAAPI at {url}");
444        let (ws_stream, _) = connect_async(url).await.map_err(|e| WaapiError::WebSocket(Box::new(e)))?;
445        let (ws_tx, mut ws_rx) = ws_stream.split();
446
447        let conn = Arc::new(WampConn::new(ws_tx));
448
449        // HELLO handshake
450        conn.send(wamp::hello_msg(DEFAULT_REALM)).await?;
451        let _session_id = read_welcome(&mut ws_rx).await?;
452
453        let connected = Arc::new(AtomicBool::new(true));
454        let connected_flag = Arc::clone(&connected);
455        let conn_for_loop = Arc::clone(&conn);
456        let handle = tokio::spawn(async move {
457            run_event_loop(conn_for_loop, ws_rx, connected_flag).await;
458        });
459
460        info!("Connected to WAAPI at {url}");
461        Ok(Self {
462            conn: Some(conn),
463            event_loop_handle: Some(handle),
464            subscription_ids: Arc::new(StdMutex::new(Vec::new())),
465            connected,
466        })
467    }
468
469    /// Call a WAAPI method.
470    ///
471    /// # Parameters
472    ///
473    /// * `uri` - URI of the WAAPI method, e.g. `"ak.wwise.core.getInfo"` or `ak::wwise::core::GET_INFO`
474    /// * `args` - Optional keyword arguments (`serde_json::Value`, e.g. `json!({...})`)
475    /// * `options` - Optional options dict (`serde_json::Value`)
476    ///
477    /// Returns `Option<Value>`: WAAPI response as JSON; `None` when no result.
478    ///
479    /// ---
480    ///
481    /// 调用 WAAPI 方法。
482    ///
483    /// 返回 `Option<Value>`:WAAPI 响应的 JSON 值;无结果时为 `None`。
484    pub async fn call(
485        &self,
486        uri: &str,
487        args: Option<Value>,
488        options: Option<Value>,
489    ) -> Result<Option<Value>, WaapiError> {
490        let conn = self.conn.as_ref().ok_or(WaapiError::Disconnected)?;
491        let id = conn.next_id();
492        let (tx, rx) = oneshot::channel();
493        conn.pending_calls
494            .lock()
495            .unwrap_or_else(|e| e.into_inner())
496            .insert(id, tx);
497        debug!("Calling WAAPI: {uri} (id={id})");
498        conn.send(wamp::call_msg(id, uri, args.as_ref(), options.as_ref()))
499            .await?;
500        rx.await.unwrap_or(Err(WaapiError::Disconnected))
501    }
502
503    /// Internal subscribe: returns handle and receiver. Used by [WaapiClientSync].
504    pub(crate) async fn subscribe_inner(
505        &self,
506        topic: &str,
507        options: Option<Value>,
508    ) -> Result<
509        (
510            SubscriptionHandle,
511            async_mpsc::UnboundedReceiver<EventPayload>,
512        ),
513        WaapiError,
514    > {
515        let conn = self.conn.as_ref().ok_or(WaapiError::Disconnected)?;
516        let id = conn.next_id();
517        let (tx, rx) = oneshot::channel();
518        conn.pending_subs
519            .lock()
520            .unwrap_or_else(|e| e.into_inner())
521            .insert(id, tx);
522        conn.send(wamp::subscribe_msg(id, topic, options.as_ref()))
523            .await?;
524        let sub_id = rx.await.unwrap_or(Err(WaapiError::Disconnected))?;
525        debug!("Subscribed to {topic} (sub_id={sub_id})");
526
527        let (event_tx, event_rx) = async_mpsc::unbounded_channel();
528        conn.event_senders
529            .lock()
530            .unwrap_or_else(|e| e.into_inner())
531            .insert(sub_id, event_tx);
532        self.subscription_ids
533            .lock()
534            .unwrap_or_else(|e| e.into_inner())
535            .push(sub_id);
536
537        let handle = SubscriptionHandle {
538            sub_id,
539            conn: Arc::clone(conn),
540            subscription_ids: Arc::clone(&self.subscription_ids),
541            recv_task: None,
542            is_unsubscribed: false,
543        };
544        Ok((handle, event_rx))
545    }
546
547    /// Subscribe to a topic with a callback.
548    ///
549    /// The callback runs in a dedicated task with signature `callback(kwargs)`.
550    /// The returned handle is used to cancel; on drop it auto-unsubscribes.
551    ///
552    /// # Parameters
553    ///
554    /// * `topic` - WAMP topic URI
555    /// * `options` - Optional subscription options (`serde_json::Value`)
556    /// * `callback` - Callback invoked on each event with `kwargs` (`Option<Value>`)
557    ///
558    /// ---
559    ///
560    /// 订阅主题并绑定回调(参数为 `Option<Value>`)。
561    pub async fn subscribe<F>(
562        &self,
563        topic: &str,
564        options: Option<Value>,
565        callback: F,
566    ) -> Result<SubscriptionHandle, WaapiError>
567    where
568        F: Fn(Option<Value>) + Send + Sync + 'static,
569    {
570        let (mut handle, mut event_rx) = self.subscribe_inner(topic, options).await?;
571        let recv_task = tokio::spawn(async move {
572            while let Some((_pub_id, kwargs)) = event_rx.recv().await {
573                callback(kwargs);
574            }
575        });
576        handle.recv_task = Some(recv_task);
577        Ok(handle)
578    }
579
580    /// Check whether the client is still connected.
581    ///
582    /// ---
583    ///
584    /// 检查客户端是否仍处于连接状态。
585    #[must_use]
586    pub fn is_connected(&self) -> bool {
587        self.conn.is_some() && self.connected.load(Ordering::Acquire)
588    }
589
590    /// Explicitly disconnect.
591    ///
592    /// **Explicit call is recommended** for graceful shutdown.
593    ///
594    /// ---
595    ///
596    /// 显式断开连接。**推荐显式调用**以确保优雅关闭。
597    pub async fn disconnect(mut self) {
598        info!("Disconnecting from WAAPI");
599        self.cleanup().await;
600        info!("Disconnected from WAAPI");
601    }
602
603    async fn cleanup(&mut self) {
604        self.connected.store(false, Ordering::Release);
605        if let Some(conn) = self.conn.take() {
606            // Unsubscribe all active subscriptions
607            let ids: Vec<u64> = {
608                let mut guard = self.subscription_ids.lock().unwrap_or_else(|e| e.into_inner());
609                std::mem::take(&mut *guard)
610            };
611            for sub_id in ids {
612                let id = conn.next_id();
613                let (tx, rx) = oneshot::channel();
614                conn.pending_unsubs
615                    .lock()
616                    .unwrap_or_else(|e| e.into_inner())
617                    .insert(id, tx);
618                if conn.send(wamp::unsubscribe_msg(id, sub_id)).await.is_ok() {
619                    let _ = rx.await;
620                }
621            }
622            // GOODBYE
623            let _ = conn.send(wamp::goodbye_msg()).await;
624            // Close WebSocket
625            let _ = conn.ws_tx.lock().await.close().await;
626        }
627        if let Some(handle) = self.event_loop_handle.take() {
628            handle.abort();
629        }
630    }
631}
632
633impl Drop for WaapiClient {
634    fn drop(&mut self) {
635        if self.conn.is_some() || self.event_loop_handle.is_some() {
636            let conn = self.conn.take();
637            let event_loop = self.event_loop_handle.take();
638            let subscription_ids = Arc::clone(&self.subscription_ids);
639            let connected = Arc::clone(&self.connected);
640            connected.store(false, Ordering::Release);
641            if let Ok(rt) = tokio::runtime::Handle::try_current() {
642                debug!("WaapiClient dropped, spawning async cleanup");
643                rt.spawn(async move {
644                    if let Some(conn) = conn {
645                        let ids: Vec<u64> = {
646                            let mut guard =
647                                subscription_ids.lock().unwrap_or_else(|e| e.into_inner());
648                            std::mem::take(&mut *guard)
649                        };
650                        for sub_id in ids {
651                            let id = conn.next_id();
652                            let (tx, rx) = oneshot::channel::<UnsubResult>();
653                            conn.pending_unsubs
654                                .lock()
655                                .unwrap_or_else(|e| e.into_inner())
656                                .insert(id, tx);
657                            if conn.send(wamp::unsubscribe_msg(id, sub_id)).await.is_ok() {
658                                let _ = rx.await;
659                            }
660                        }
661                        let _ = conn.send(wamp::goodbye_msg()).await;
662                        let _ = conn.ws_tx.lock().await.close().await;
663                    }
664                    if let Some(h) = event_loop {
665                        h.abort();
666                    }
667                });
668            } else {
669                warn!("WaapiClient dropped without runtime, skipping graceful cleanup");
670                if let Some(h) = event_loop {
671                    h.abort();
672                }
673            }
674        }
675    }
676}
677
678// ── Sync client ───────────────────────────────────────────────────
679
/// Sync subscription handle: cancels subscriptions created by [WaapiClientSync::subscribe].
///
/// Calls [SubscriptionHandleSync::unsubscribe] or drop to cancel and wait for the bridge thread.
/// **Do not drop this handle inside a callback — it may deadlock.**
#[derive(Debug)]
pub struct SubscriptionHandleSync {
    /// Runtime used to drive the async unsubscribe.
    runtime: Arc<tokio::runtime::Runtime>,
    /// Underlying async handle; `None` once it has been consumed.
    inner: Option<SubscriptionHandle>,
    /// Join handle of the thread that forwards events to the callback.
    bridge_join: Option<thread::JoinHandle<()>>,
    /// Id of that thread, used on drop to avoid joining the current thread.
    bridge_thread_id: Option<thread::ThreadId>,
}
695
696impl SubscriptionHandleSync {
697    /// Cancel the subscription and wait for the event bridge thread to finish.
698    ///
699    /// ---
700    ///
701    /// 取消订阅并等待事件桥接线程结束。
702    pub fn unsubscribe(mut self) -> Result<(), WaapiError> {
703        let inner = self.inner.take();
704        let bridge_join = self.bridge_join.take();
705        if let Some(h) = inner {
706            self.runtime.block_on(h.unsubscribe())?;
707        }
708        if let Some(jh) = bridge_join {
709            let _ = jh.join();
710        }
711        Ok(())
712    }
713}
714
715impl Drop for SubscriptionHandleSync {
716    fn drop(&mut self) {
717        let is_bridge_thread = self.bridge_thread_id.as_ref() == Some(&thread::current().id());
718        let inner = self.inner.take();
719        let bridge_join = self.bridge_join.take();
720        let runtime = Arc::clone(&self.runtime);
721        if let Some(h) = inner {
722            if tokio::runtime::Handle::try_current().is_ok() {
723                warn!("SubscriptionHandleSync dropped inside async context, falling back to spawn");
724                runtime.handle().spawn(async move {
725                    let _ = h.unsubscribe().await;
726                });
727            } else {
728                let _ = runtime.block_on(h.unsubscribe());
729            }
730        }
731        if !is_bridge_thread {
732            if let Some(jh) = bridge_join {
733                let _ = jh.join();
734            }
735        }
736    }
737}
738
/// Sync WAAPI client.
///
/// Provides sync access to the Wwise Authoring API (WAAPI); internally uses a multi-threaded
/// tokio runtime and wraps [WaapiClient] via `block_on`.
///
/// **Explicit [`disconnect`](WaapiClientSync::disconnect) is recommended** for graceful shutdown.
#[derive(Debug)]
pub struct WaapiClientSync {
    /// Runtime that drives the wrapped async client; shared with subscription handles.
    runtime: Arc<tokio::runtime::Runtime>,
    /// Wrapped async client; `None` once it has been disconnected.
    client: Option<WaapiClient>,
}
754
755impl WaapiClientSync {
756    /// Connect to WAAPI using the default URL.
757    ///
758    /// ---
759    ///
760    /// 使用默认 URL 连接到 WAAPI。
761    pub fn connect() -> Result<Self, WaapiError> {
762        Self::connect_with_url(DEFAULT_WAAPI_URL)
763    }
764
765    /// Connect to WAAPI at the specified URL.
766    ///
767    /// ---
768    ///
769    /// 使用指定 URL 连接到 WAAPI。
770    pub fn connect_with_url(url: &str) -> Result<Self, WaapiError> {
771        info!("Connecting to WAAPI (sync) at {url}");
772        let runtime = Arc::new(
773            tokio::runtime::Builder::new_multi_thread()
774                .enable_all()
775                .build()?,
776        );
777        let client = runtime.block_on(WaapiClient::connect_with_url(url))?;
778        info!("Connected to WAAPI (sync) at {url}");
779        Ok(Self {
780            runtime,
781            client: Some(client),
782        })
783    }
784
785    /// Call a WAAPI method.
786    ///
787    /// ---
788    ///
789    /// 调用 WAAPI 方法。
790    pub fn call(
791        &self,
792        uri: &str,
793        args: Option<Value>,
794        options: Option<Value>,
795    ) -> Result<Option<Value>, WaapiError> {
796        let client = self.client.as_ref().ok_or(WaapiError::Disconnected)?;
797        self.runtime.block_on(client.call(uri, args, options))
798    }
799
800    /// Subscribe to a topic with a callback.
801    ///
802    /// To unsubscribe: call [SubscriptionHandleSync::unsubscribe] or drop the handle.
803    /// Do not drop the handle inside the callback.
804    ///
805    /// ---
806    ///
807    /// 订阅主题并绑定回调。取消订阅:调用返回的 [SubscriptionHandleSync::unsubscribe],或 drop 句柄。
808    /// 不要在 callback 内 drop 句柄。
809    pub fn subscribe<F>(
810        &self,
811        topic: &str,
812        options: Option<Value>,
813        callback: F,
814    ) -> Result<SubscriptionHandleSync, WaapiError>
815    where
816        F: Fn(Option<Value>) + Send + Sync + 'static,
817    {
818        let client = self.client.as_ref().ok_or(WaapiError::Disconnected)?;
819        let (inner, mut async_rx) = self
820            .runtime
821            .block_on(client.subscribe_inner(topic, options))?;
822        let (id_tx, id_rx) = mpsc::channel();
823        let runtime = Arc::clone(&self.runtime);
824        let bridge_join = thread::spawn(move || {
825            let _ = id_tx.send(thread::current().id());
826            while let Some((_pub_id, kwargs)) = runtime.block_on(async_rx.recv()) {
827                callback(kwargs);
828            }
829        });
830        let bridge_thread_id = id_rx.recv().ok();
831        Ok(SubscriptionHandleSync {
832            runtime: Arc::clone(&self.runtime),
833            inner: Some(inner),
834            bridge_join: Some(bridge_join),
835            bridge_thread_id,
836        })
837    }
838
839    /// Check whether the client is still logically connected.
840    ///
841    /// ---
842    ///
843    /// 检查客户端是否仍处于逻辑连接状态。
844    #[must_use]
845    pub fn is_connected(&self) -> bool {
846        self.client.as_ref().is_some_and(|c| c.is_connected())
847    }
848
849    /// Explicitly disconnect.
850    ///
851    /// ---
852    ///
853    /// 显式断开连接。
854    pub fn disconnect(mut self) {
855        info!("Disconnecting from WAAPI (sync)");
856        if let Some(client) = self.client.take() {
857            self.runtime.block_on(client.disconnect());
858        }
859        info!("Disconnected from WAAPI (sync)");
860    }
861}
862
863impl Drop for WaapiClientSync {
864    fn drop(&mut self) {
865        if let Some(client) = self.client.take() {
866            if tokio::runtime::Handle::try_current().is_ok() {
867                warn!("WaapiClientSync dropped inside async context, offloading cleanup to a dedicated thread");
868                let runtime = Arc::clone(&self.runtime);
869                let _ = thread::Builder::new()
870                    .name("waapi-sync-drop-cleanup".to_string())
871                    .spawn(move || {
872                        runtime.block_on(client.disconnect());
873                    });
874            } else {
875                self.runtime.block_on(client.disconnect());
876            }
877        }
878    }
879}
880
881// ── Tests ─────────────────────────────────────────────────────────
882
#[cfg(test)]
mod tests {
    use super::*;

    /// Only the first call on a clear flag reports `true`; later calls report `false`.
    #[test]
    fn test_mark_unsubscribed_is_idempotent() {
        let mut is_unsubscribed = false;
        assert!(mark_unsubscribed(&mut is_unsubscribed));
        assert!(!mark_unsubscribed(&mut is_unsubscribed));
    }

    /// Dropping a sync client from within an async test must not panic.
    #[tokio::test]
    async fn test_sync_client_drop_inside_async_context_is_safe() {
        let runtime = Arc::new(
            tokio::runtime::Builder::new_multi_thread()
                .enable_all()
                .build()
                .expect("failed to create runtime"),
        );
        // A client that was never connected: no connection and no event loop.
        let async_client = WaapiClient {
            conn: None,
            event_loop_handle: None,
            subscription_ids: Arc::new(StdMutex::new(Vec::new())),
            connected: Arc::new(AtomicBool::new(false)),
        };
        let sync_client = WaapiClientSync {
            runtime,
            client: Some(async_client),
        };
        drop(sync_client);
    }
}
914}