plexus_core/plexus/bidirectional/channel.rs
1//! Generic bidirectional channel implementation
2//!
3//! This module provides [`BidirChannel`], the core primitive for server-to-client
4//! requests during streaming RPC execution. It enables interactive workflows
5//! where the server can request input from clients mid-stream.
6//!
7//! # Architecture
8//!
9//! ```text
10//! ┌─────────────┐ ┌─────────────┐
11//! │ Server │ │ Client │
12//! │ (Activation)│ │ (TypeScript)│
13//! └──────┬──────┘ └──────┬──────┘
14//! │ │
15//! │ ctx.confirm("Delete?") │
16//! │ │
17//! ├──────────────────────────────────┤
18//! │ PlexusStreamItem::Request │
19//! │ {type:"request", requestId:..} │
20//! ├─────────────────────────────────►│
21//! │ │
22//! │ ◄── User interaction
23//! │ │
24//! │◄─────────────────────────────────┤
25//! │ _plexus_respond(requestId, │
26//! │ {type:"confirmed",value:true})│
27//! │ │
28//! │ returns Ok(true) │
29//! ▼ ▼
30//! ```
31//!
32//! # Transport Modes
33//!
34//! The channel supports two response routing modes:
35//!
36//! 1. **Global Registry** (default) - Responses routed through [`registry`](super::registry)
37//! - Used for MCP transport (`_plexus_respond` tool)
38//! - Works with any transport that can't maintain channel references
39//!
40//! 2. **Direct Mode** - Responses handled via `handle_response()` method
41//! - Used for WebSocket transport
42//! - Requires direct access to channel instance
43//!
44//! # Thread Safety
45//!
46//! `BidirChannel` is designed for concurrent use:
47//! - Multiple requests can be pending simultaneously
48//! - Thread-safe internal state via `Arc<Mutex<_>>`
49//! - Clone-friendly for passing to async tasks
50
51use std::collections::HashMap;
52use std::marker::PhantomData;
53use std::sync::{Arc, Mutex};
54use std::time::Duration;
55
56use serde::{de::DeserializeOwned, Serialize};
57use serde_json::Value;
58use tokio::sync::{mpsc, oneshot};
59use tokio::time::timeout;
60use uuid::Uuid;
61
62use super::registry::{register_pending_request, unregister_pending_request};
63use super::types::{BidirError, SelectOption, StandardRequest, StandardResponse};
64use crate::plexus::types::PlexusStreamItem;
65
66/// Generic bidirectional channel for type-safe server-to-client requests.
67///
68/// `BidirChannel` is the core primitive for bidirectional communication in Plexus RPC.
69/// It allows server-side code (activations) to request input from clients during
70/// stream execution, enabling interactive workflows.
71///
72/// # Type Parameters
73///
74/// * `Req` - Request type sent server→client. Must implement `Serialize + DeserializeOwned`.
75/// * `Resp` - Response type sent client→server. Must implement `Serialize + DeserializeOwned`.
76///
77/// # Common Type Aliases
78///
79/// For standard UI patterns, use [`StandardBidirChannel`]:
80///
81/// ```rust,ignore
82/// type StandardBidirChannel = BidirChannel<StandardRequest, StandardResponse>;
83/// ```
84///
85/// # Creating Channels
86///
87/// Channels are typically created by the transport layer, not by activations directly.
88/// The `#[hub_method(bidirectional)]` macro injects the appropriate channel type.
89///
90/// ```rust,ignore
91/// // The macro generates this signature:
92/// async fn wizard(&self, ctx: &Arc<StandardBidirChannel>) -> impl Stream<Item = Event> { ... }
93/// ```
94///
95/// # Making Requests
96///
97/// ## Standard Patterns (via StandardBidirChannel)
98///
99/// ```rust,ignore
100/// // Yes/no confirmation
101/// if ctx.confirm("Delete file?").await? {
102/// // User said yes
103/// }
104///
105/// // Text input
106/// let name = ctx.prompt("Enter name:").await?;
107///
108/// // Selection
109/// let options = vec![
110/// SelectOption::new("dev", "Development"),
111/// SelectOption::new("prod", "Production"),
112/// ];
113/// let selected = ctx.select("Choose env:", options).await?;
114/// ```
115///
116/// ## Custom Types
117///
118/// ```rust,ignore
119/// // Define custom request/response
120/// #[derive(Serialize, Deserialize)]
121/// enum ImageReq { ChooseQuality { min: u8, max: u8 } }
122///
123/// #[derive(Serialize, Deserialize)]
124/// enum ImageResp { Quality(u8), Cancel }
125///
126/// // Use in activation
127/// async fn process(ctx: &BidirChannel<ImageReq, ImageResp>) {
128/// let quality = ctx.request(ImageReq::ChooseQuality { min: 50, max: 100 }).await?;
129/// }
130/// ```
131///
132/// # Error Handling
133///
134/// Always handle [`BidirError::NotSupported`] for transports that don't support
135/// bidirectional communication:
136///
137/// ```rust,ignore
138/// match ctx.confirm("Proceed?").await {
139/// Ok(true) => { /* confirmed */ }
140/// Ok(false) => { /* declined */ }
141/// Err(BidirError::NotSupported) => {
142/// // Non-interactive transport - use safe default
143/// }
144/// Err(BidirError::Cancelled) => {
145/// // User cancelled
146/// }
147/// Err(e) => {
148/// // Other error
149/// }
150/// }
151/// ```
152///
153/// # Timeouts
154///
155/// Default timeout is 30 seconds. Use `request_with_timeout` for custom timeouts:
156///
157/// ```rust,ignore
158/// use std::time::Duration;
159///
160/// // Quick timeout for automated scenarios
161/// ctx.request_with_timeout(req, Duration::from_secs(10)).await?;
162///
163/// // Extended timeout for complex decisions
164/// ctx.request_with_timeout(req, Duration::from_secs(120)).await?;
165/// ```
166///
167/// # Thread Safety
168///
169/// `BidirChannel` uses `Arc<Mutex<_>>` internally and is safe to share across tasks.
170/// Multiple requests can be pending simultaneously.
171pub struct BidirChannel<Req, Resp>
172where
173 Req: Serialize + DeserializeOwned + Send + 'static,
174 Resp: Serialize + DeserializeOwned + Send + 'static,
175{
176 /// Channel to send PlexusStreamItems (including Request items)
177 stream_tx: mpsc::Sender<PlexusStreamItem>,
178
179 /// Pending requests waiting for responses
180 /// Maps request_id -> oneshot channel for response
181 pending: Arc<Mutex<HashMap<String, oneshot::Sender<Resp>>>>,
182
183 /// Whether bidirectional communication is supported by transport
184 bidirectional_supported: bool,
185
186 /// Whether to use global registry for response routing (for MCP transport)
187 /// When true, responses come through the global registry instead of handle_response()
188 use_global_registry: bool,
189
190 /// Provenance path (for debugging/logging)
191 provenance: Vec<String>,
192
193 /// Plexus hash (for metadata)
194 plexus_hash: String,
195
196 /// Phantom data to hold Req type parameter
197 _phantom_req: PhantomData<Req>,
198}
199
200/// Type alias for standard interactive UI patterns.
201///
202/// `StandardBidirChannel` provides convenient methods for common interactions:
203///
204/// - [`confirm()`](Self::confirm) - Yes/no confirmation
205/// - [`prompt()`](Self::prompt) - Text input
206/// - [`select()`](Self::select) - Selection from options
207///
208/// # Example
209///
210/// ```rust,ignore
211/// use plexus_core::plexus::bidirectional::{StandardBidirChannel, SelectOption};
212///
213/// async fn wizard(ctx: &StandardBidirChannel) {
214/// // Step 1: Get name
215/// let name = ctx.prompt("Enter project name:").await?;
216///
217/// // Step 2: Select template
218/// let templates = vec![
219/// SelectOption::new("minimal", "Minimal"),
220/// SelectOption::new("full", "Full Featured"),
221/// ];
222/// let template = ctx.select("Choose template:", templates).await?;
223///
224/// // Step 3: Confirm creation
225/// if ctx.confirm(&format!("Create '{}' with {} template?", name, template[0])).await? {
226/// // Create project
227/// }
228/// }
229/// ```
230///
231/// # Transport Requirements
232///
233/// The underlying transport must support bidirectional communication.
234/// If not, all methods return `Err(BidirError::NotSupported)`.
235pub type StandardBidirChannel = BidirChannel<StandardRequest, StandardResponse>;
236
237impl<Req, Resp> BidirChannel<Req, Resp>
238where
239 Req: Serialize + DeserializeOwned + Send + 'static,
240 Resp: Serialize + DeserializeOwned + Send + 'static,
241{
242 /// Create a new bidirectional channel
243 ///
244 /// By default, uses the global response registry which works with all transport types:
245 /// - MCP: Responses come through `_plexus_respond` tool → global registry
246 /// - WebSocket: Responses can also use global registry via `handle_pending_response()`
247 ///
248 /// Use `new_direct()` if you need direct response handling (for testing or specific transports).
249 pub fn new(
250 stream_tx: mpsc::Sender<PlexusStreamItem>,
251 bidirectional_supported: bool,
252 provenance: Vec<String>,
253 plexus_hash: String,
254 ) -> Self {
255 Self {
256 stream_tx,
257 pending: Arc::new(Mutex::new(HashMap::new())),
258 bidirectional_supported,
259 use_global_registry: true, // Use global registry by default for transport compatibility
260 provenance,
261 plexus_hash,
262 _phantom_req: PhantomData,
263 }
264 }
265
266 /// Create a bidirectional channel that uses direct response handling
267 ///
268 /// Responses must be delivered via `handle_response()` method on this channel instance.
269 /// Use this for testing or when you have direct access to the channel for responses.
270 pub fn new_direct(
271 stream_tx: mpsc::Sender<PlexusStreamItem>,
272 bidirectional_supported: bool,
273 provenance: Vec<String>,
274 plexus_hash: String,
275 ) -> Self {
276 Self {
277 stream_tx,
278 pending: Arc::new(Mutex::new(HashMap::new())),
279 bidirectional_supported,
280 use_global_registry: false,
281 provenance,
282 plexus_hash,
283 _phantom_req: PhantomData,
284 }
285 }
286
287 /// Check if bidirectional communication is supported
288 pub fn is_bidirectional(&self) -> bool {
289 self.bidirectional_supported
290 }
291
292 /// Make a bidirectional request with default timeout (30s)
293 ///
294 /// Sends a request to the client and waits for response.
295 /// Returns error if transport doesn't support bidirectional or timeout occurs.
296 pub async fn request(&self, req: Req) -> Result<Resp, BidirError> {
297 self.request_with_timeout(req, Duration::from_secs(30))
298 .await
299 }
300
301 /// Make a bidirectional request with custom timeout
302 pub async fn request_with_timeout(
303 &self,
304 req: Req,
305 timeout_duration: Duration,
306 ) -> Result<Resp, BidirError> {
307 if !self.bidirectional_supported {
308 return Err(BidirError::NotSupported);
309 }
310
311 // Generate unique request ID
312 let request_id = Uuid::new_v4().to_string();
313
314 // Serialize request
315 let request_data = serde_json::to_value(&req)
316 .map_err(|e| BidirError::Serialization(e.to_string()))?;
317
318 let timeout_ms = timeout_duration.as_millis() as u64;
319
320 if self.use_global_registry {
321 // Use global registry for response routing (MCP transport)
322 self.request_via_registry(request_id, request_data, timeout_duration, timeout_ms)
323 .await
324 } else {
325 // Use internal pending map (WebSocket/direct transport)
326 self.request_direct(request_id, request_data, timeout_duration, timeout_ms)
327 .await
328 }
329 }
330
331 /// Request using internal pending map (for direct transports like WebSocket)
332 async fn request_direct(
333 &self,
334 request_id: String,
335 request_data: Value,
336 timeout_duration: Duration,
337 timeout_ms: u64,
338 ) -> Result<Resp, BidirError> {
339 // Create oneshot channel for response
340 let (tx, rx) = oneshot::channel();
341
342 // Register pending request in internal map
343 self.pending.lock().unwrap().insert(request_id.clone(), tx);
344
345 // Send Request stream item
346 self.stream_tx
347 .send(PlexusStreamItem::request(
348 request_id.clone(),
349 request_data,
350 timeout_ms,
351 ))
352 .await
353 .map_err(|e| BidirError::Transport(format!("Failed to send request: {}", e)))?;
354
355 // Wait for response (or timeout)
356 match timeout(timeout_duration, rx).await {
357 Ok(Ok(resp)) => Ok(resp),
358 Ok(Err(_)) => {
359 // Channel closed before response
360 self.pending.lock().unwrap().remove(&request_id);
361 Err(BidirError::ChannelClosed)
362 }
363 Err(_) => {
364 // Timeout
365 self.pending.lock().unwrap().remove(&request_id);
366 Err(BidirError::Timeout(timeout_ms))
367 }
368 }
369 }
370
371 /// Request using global registry (for MCP transport via _plexus_respond tool)
372 async fn request_via_registry(
373 &self,
374 request_id: String,
375 request_data: Value,
376 timeout_duration: Duration,
377 timeout_ms: u64,
378 ) -> Result<Resp, BidirError> {
379 // Create oneshot channel for Value response (type-erased)
380 let (tx, rx) = oneshot::channel::<Value>();
381
382 // Register in global registry
383 register_pending_request(request_id.clone(), tx);
384
385 // Send Request stream item
386 if let Err(e) = self
387 .stream_tx
388 .send(PlexusStreamItem::request(
389 request_id.clone(),
390 request_data,
391 timeout_ms,
392 ))
393 .await
394 {
395 // Clean up on failure
396 unregister_pending_request(&request_id);
397 return Err(BidirError::Transport(format!("Failed to send request: {}", e)));
398 }
399
400 // Wait for response (or timeout)
401 match timeout(timeout_duration, rx).await {
402 Ok(Ok(value)) => {
403 // Deserialize Value to typed response
404 serde_json::from_value(value).map_err(|e| BidirError::TypeMismatch {
405 expected: std::any::type_name::<Resp>().to_string(),
406 got: e.to_string(),
407 })
408 }
409 Ok(Err(_)) => {
410 // Channel closed before response
411 unregister_pending_request(&request_id);
412 Err(BidirError::ChannelClosed)
413 }
414 Err(_) => {
415 // Timeout - clean up from registry
416 unregister_pending_request(&request_id);
417 Err(BidirError::Timeout(timeout_ms))
418 }
419 }
420 }
421
422 /// Handle a response from the client
423 ///
424 /// Called by transport layer when client responds to a request.
425 /// Deserializes response and sends it through the pending request's channel.
426 pub fn handle_response(
427 &self,
428 request_id: String,
429 response_data: Value,
430 ) -> Result<(), BidirError> {
431 // Look up pending request
432 let tx = self
433 .pending
434 .lock()
435 .unwrap()
436 .remove(&request_id)
437 .ok_or(BidirError::UnknownRequest)?;
438
439 // Deserialize response
440 let resp: Resp = serde_json::from_value(response_data).map_err(|e| {
441 BidirError::TypeMismatch {
442 expected: std::any::type_name::<Resp>().to_string(),
443 got: e.to_string(),
444 }
445 })?;
446
447 // Send response through channel (unblocks request() call)
448 tx.send(resp).map_err(|_| BidirError::ChannelClosed)?;
449
450 Ok(())
451 }
452
453 /// Get provenance path (for debugging)
454 pub fn provenance(&self) -> &[String] {
455 &self.provenance
456 }
457
458 /// Get plexus hash (for metadata)
459 pub fn plexus_hash(&self) -> &str {
460 &self.plexus_hash
461 }
462}
463
464// Convenience methods for StandardBidirChannel
465impl BidirChannel<StandardRequest, StandardResponse> {
466 /// Ask user for yes/no confirmation
467 ///
468 /// # Examples
469 ///
470 /// ```rust,ignore
471 /// if ctx.confirm("Delete this file?").await? {
472 /// // user confirmed
473 /// }
474 /// ```
475 pub async fn confirm(&self, message: &str) -> Result<bool, BidirError> {
476 let resp = self
477 .request(StandardRequest::Confirm {
478 message: message.to_string(),
479 default: None,
480 })
481 .await?;
482
483 match resp {
484 StandardResponse::Confirmed { value } => Ok(value),
485 StandardResponse::Cancelled => Err(BidirError::Cancelled),
486 _ => Err(BidirError::TypeMismatch {
487 expected: "Confirmed".into(),
488 got: format!("{:?}", resp),
489 }),
490 }
491 }
492
493 /// Ask user for text input
494 ///
495 /// Returns the user's input as a `serde_json::Value`. For most prompts,
496 /// this will be a `Value::String`. Use `.as_str()` or `.to_string()` to
497 /// extract the string content.
498 ///
499 /// # Examples
500 ///
501 /// ```rust,ignore
502 /// let name_val = ctx.prompt("Enter your name:").await?;
503 /// let name = name_val.as_str().unwrap_or("").to_string();
504 /// ```
505 pub async fn prompt(&self, message: &str) -> Result<String, BidirError> {
506 let resp = self
507 .request(StandardRequest::Prompt {
508 message: message.to_string(),
509 default: None,
510 placeholder: None,
511 })
512 .await?;
513
514 match resp {
515 StandardResponse::Text { value } => {
516 // Extract string from Value for convenience
517 match value {
518 serde_json::Value::String(s) => Ok(s),
519 other => Ok(other.to_string()),
520 }
521 }
522 StandardResponse::Cancelled => Err(BidirError::Cancelled),
523 _ => Err(BidirError::TypeMismatch {
524 expected: "Text".into(),
525 got: format!("{:?}", resp),
526 }),
527 }
528 }
529
530 /// Ask user to select from options
531 ///
532 /// Returns the selected values as strings. Each `SelectOption` value
533 /// is converted from `serde_json::Value` to `String`.
534 ///
535 /// # Examples
536 ///
537 /// ```rust,ignore
538 /// let options = vec![
539 /// SelectOption::new("dev", "Development"),
540 /// SelectOption::new("prod", "Production"),
541 /// ];
542 /// let selected = ctx.select("Choose environment:", options).await?;
543 /// ```
544 pub async fn select(
545 &self,
546 message: &str,
547 options: Vec<SelectOption>,
548 ) -> Result<Vec<String>, BidirError> {
549 let resp = self
550 .request(StandardRequest::Select {
551 message: message.to_string(),
552 options,
553 multi_select: false,
554 })
555 .await?;
556
557 match resp {
558 StandardResponse::Selected { values } => {
559 // Convert each Value to String for convenience
560 let strings = values
561 .into_iter()
562 .map(|v| match v {
563 serde_json::Value::String(s) => s,
564 other => other.to_string(),
565 })
566 .collect();
567 Ok(strings)
568 }
569 StandardResponse::Cancelled => Err(BidirError::Cancelled),
570 _ => Err(BidirError::TypeMismatch {
571 expected: "Selected".into(),
572 got: format!("{:?}", resp),
573 }),
574 }
575 }
576}
577
578/// Bidirectional channel with fallback when transport doesn't support bidirectional
579///
580/// Wraps a BidirChannel and provides fallback values when bidirectional
581/// requests fail due to NotSupported error.
582pub struct BidirWithFallback<Req, Resp>
583where
584 Req: Serialize + DeserializeOwned + Send + 'static,
585 Resp: Serialize + DeserializeOwned + Send + 'static,
586{
587 channel: Arc<BidirChannel<Req, Resp>>,
588 fallback_fn: Box<dyn Fn(&Req) -> Resp + Send + Sync>,
589}
590
591impl<Req, Resp> BidirWithFallback<Req, Resp>
592where
593 Req: Serialize + DeserializeOwned + Send + 'static,
594 Resp: Serialize + DeserializeOwned + Send + 'static,
595{
596 /// Create a new fallback wrapper with custom fallback function
597 pub fn new(
598 channel: Arc<BidirChannel<Req, Resp>>,
599 fallback: impl Fn(&Req) -> Resp + Send + Sync + 'static,
600 ) -> Self {
601 Self {
602 channel,
603 fallback_fn: Box::new(fallback),
604 }
605 }
606
607 /// Make a request, using fallback if bidirectional not supported
608 pub async fn request(&self, req: Req) -> Resp
609 where
610 Req: Clone,
611 {
612 match self.channel.request(req.clone()).await {
613 Ok(resp) => resp,
614 Err(BidirError::NotSupported) | Err(BidirError::Timeout(_)) => {
615 (self.fallback_fn)(&req)
616 }
617 Err(_) => (self.fallback_fn)(&req),
618 }
619 }
620}
621
622// Helper for StandardBidirChannel fallbacks
623impl BidirWithFallback<StandardRequest, StandardResponse> {
624 /// Create fallback that auto-confirms all requests
625 pub fn auto_confirm(
626 channel: Arc<BidirChannel<StandardRequest, StandardResponse>>,
627 ) -> Self {
628 Self::new(channel, |req| match req {
629 StandardRequest::Confirm { default, .. } => StandardResponse::Confirmed {
630 value: default.unwrap_or(true),
631 },
632 StandardRequest::Prompt { default, .. } => StandardResponse::Text {
633 value: default.clone().unwrap_or(serde_json::Value::String(String::new())),
634 },
635 StandardRequest::Select { options, .. } => StandardResponse::Selected {
636 values: vec![options
637 .first()
638 .map(|o| o.value.clone())
639 .unwrap_or(serde_json::Value::String(String::new()))],
640 },
641 StandardRequest::Custom { data } => StandardResponse::Custom { data: data.clone() },
642 })
643 }
644}
645
646#[cfg(test)]
647mod tests {
648 use super::*;
649
650 #[tokio::test]
651 async fn test_bidir_channel_not_supported() {
652 let (tx, _rx) = mpsc::channel(32);
653 let channel: BidirChannel<StandardRequest, StandardResponse> =
654 BidirChannel::new_direct(tx, false, vec!["test".into()], "hash".into());
655
656 let result = channel.confirm("Test?").await;
657 assert!(matches!(result, Err(BidirError::NotSupported)));
658 }
659
660 #[tokio::test]
661 async fn test_bidir_request_response() {
662 let (tx, mut rx) = mpsc::channel(32);
663 let channel: Arc<BidirChannel<StandardRequest, StandardResponse>> = Arc::new(BidirChannel::new_direct(
664 tx,
665 true,
666 vec!["test".into()],
667 "hash".into(),
668 ));
669
670 // Spawn request in background
671 let channel_clone = channel.clone();
672 let handle = tokio::spawn(async move {
673 channel_clone
674 .request(StandardRequest::Confirm {
675 message: "Test?".into(),
676 default: None,
677 })
678 .await
679 });
680
681 // Receive request
682 if let Some(PlexusStreamItem::Request {
683 request_id,
684 request_data,
685 ..
686 }) = rx.recv().await
687 {
688 // Verify request
689 let req: StandardRequest = serde_json::from_value(request_data).unwrap();
690 assert!(matches!(req, StandardRequest::Confirm { .. }));
691
692 // Send response
693 channel
694 .handle_response(
695 request_id,
696 serde_json::to_value(&StandardResponse::<serde_json::Value>::Confirmed {
697 value: true,
698 })
699 .unwrap(),
700 )
701 .unwrap();
702 } else {
703 panic!("Expected Request item");
704 }
705
706 // Verify response received
707 let result: StandardResponse = handle.await.unwrap().unwrap();
708 assert_eq!(result, StandardResponse::Confirmed { value: true });
709 }
710
711 #[tokio::test]
712 async fn test_convenience_methods() {
713 let (tx, mut rx) = mpsc::channel(32);
714 let channel: Arc<StandardBidirChannel> = Arc::new(BidirChannel::new_direct(
715 tx,
716 true,
717 vec!["test".into()],
718 "hash".into(),
719 ));
720
721 // Test confirm()
722 let channel_clone = channel.clone();
723 let handle = tokio::spawn(async move { channel_clone.confirm("Delete?").await });
724
725 if let Some(PlexusStreamItem::Request { request_id, .. }) = rx.recv().await {
726 channel
727 .handle_response(
728 request_id,
729 serde_json::to_value(&StandardResponse::<serde_json::Value>::Confirmed {
730 value: true,
731 })
732 .unwrap(),
733 )
734 .unwrap();
735 }
736
737 assert_eq!(handle.await.unwrap().unwrap(), true);
738 }
739
740 #[tokio::test]
741 async fn test_timeout() {
742 let (tx, _rx) = mpsc::channel(32);
743 let channel: BidirChannel<StandardRequest, StandardResponse> =
744 BidirChannel::new_direct(tx, true, vec!["test".into()], "hash".into());
745
746 let result = channel
747 .request_with_timeout(
748 StandardRequest::Confirm {
749 message: "Test?".into(),
750 default: None,
751 },
752 Duration::from_millis(100),
753 )
754 .await;
755
756 assert!(matches!(result, Err(BidirError::Timeout(100))));
757 }
758
759 #[tokio::test]
760 async fn test_fallback() {
761 let (tx, _rx) = mpsc::channel(32);
762 let channel = Arc::new(BidirChannel::new_direct(
763 tx,
764 false, // not supported
765 vec!["test".into()],
766 "hash".into(),
767 ));
768
769 let fallback = BidirWithFallback::auto_confirm(channel);
770
771 let resp = fallback
772 .request(StandardRequest::Confirm {
773 message: "Test?".into(),
774 default: Some(false),
775 })
776 .await;
777
778 assert_eq!(resp, StandardResponse::Confirmed { value: false });
779 }
780}