Skip to main content

plexus_core/plexus/bidirectional/
channel.rs

1//! Generic bidirectional channel implementation
2//!
3//! This module provides [`BidirChannel`], the core primitive for server-to-client
4//! requests during streaming RPC execution. It enables interactive workflows
5//! where the server can request input from clients mid-stream.
6//!
7//! # Architecture
8//!
9//! ```text
10//! ┌─────────────┐                    ┌─────────────┐
11//! │   Server    │                    │   Client    │
12//! │ (Activation)│                    │ (TypeScript)│
13//! └──────┬──────┘                    └──────┬──────┘
14//!        │                                  │
15//!        │  ctx.confirm("Delete?")          │
16//!        │                                  │
17//!        ├──────────────────────────────────┤
18//!        │  PlexusStreamItem::Request       │
19//!        │  {type:"request", requestId:..}  │
20//!        ├─────────────────────────────────►│
21//!        │                                  │
22//!        │              ◄── User interaction
23//!        │                                  │
24//!        │◄─────────────────────────────────┤
25//!        │  _plexus_respond(requestId,      │
26//!        │    {type:"confirmed",value:true})│
27//!        │                                  │
28//!        │  returns Ok(true)                │
29//!        ▼                                  ▼
30//! ```
31//!
32//! # Transport Modes
33//!
34//! The channel supports two response routing modes:
35//!
36//! 1. **Global Registry** (default) - Responses routed through [`registry`](super::registry)
37//!    - Used for MCP transport (`_plexus_respond` tool)
38//!    - Works with any transport that can't maintain channel references
39//!
40//! 2. **Direct Mode** - Responses handled via `handle_response()` method
41//!    - Used for WebSocket transport
42//!    - Requires direct access to channel instance
43//!
44//! # Thread Safety
45//!
46//! `BidirChannel` is designed for concurrent use:
47//! - Multiple requests can be pending simultaneously
48//! - Thread-safe internal state via `Arc<Mutex<_>>`
//! - Shareable across async tasks by wrapping the channel in an `Arc`
50
51use std::collections::HashMap;
52use std::marker::PhantomData;
53use std::sync::{Arc, Mutex};
54use std::time::Duration;
55
56use serde::{de::DeserializeOwned, Serialize};
57use serde_json::Value;
58use tokio::sync::{mpsc, oneshot};
59use tokio::time::timeout;
60use uuid::Uuid;
61
62use super::registry::{register_pending_request, unregister_pending_request};
63use super::types::{BidirError, SelectOption, StandardRequest, StandardResponse};
64use crate::plexus::types::PlexusStreamItem;
65
66/// Generic bidirectional channel for type-safe server-to-client requests.
67///
68/// `BidirChannel` is the core primitive for bidirectional communication in Plexus RPC.
69/// It allows server-side code (activations) to request input from clients during
70/// stream execution, enabling interactive workflows.
71///
72/// # Type Parameters
73///
74/// * `Req` - Request type sent server→client. Must implement `Serialize + DeserializeOwned`.
75/// * `Resp` - Response type sent client→server. Must implement `Serialize + DeserializeOwned`.
76///
77/// # Common Type Aliases
78///
79/// For standard UI patterns, use [`StandardBidirChannel`]:
80///
81/// ```rust,ignore
82/// type StandardBidirChannel = BidirChannel<StandardRequest, StandardResponse>;
83/// ```
84///
85/// # Creating Channels
86///
87/// Channels are typically created by the transport layer, not by activations directly.
88/// The `#[hub_method(bidirectional)]` macro injects the appropriate channel type.
89///
90/// ```rust,ignore
91/// // The macro generates this signature:
92/// async fn wizard(&self, ctx: &Arc<StandardBidirChannel>) -> impl Stream<Item = Event> { ... }
93/// ```
94///
95/// # Making Requests
96///
97/// ## Standard Patterns (via StandardBidirChannel)
98///
99/// ```rust,ignore
100/// // Yes/no confirmation
101/// if ctx.confirm("Delete file?").await? {
102///     // User said yes
103/// }
104///
105/// // Text input
106/// let name = ctx.prompt("Enter name:").await?;
107///
108/// // Selection
109/// let options = vec![
110///     SelectOption::new("dev", "Development"),
111///     SelectOption::new("prod", "Production"),
112/// ];
113/// let selected = ctx.select("Choose env:", options).await?;
114/// ```
115///
116/// ## Custom Types
117///
118/// ```rust,ignore
119/// // Define custom request/response
120/// #[derive(Serialize, Deserialize)]
121/// enum ImageReq { ChooseQuality { min: u8, max: u8 } }
122///
123/// #[derive(Serialize, Deserialize)]
124/// enum ImageResp { Quality(u8), Cancel }
125///
126/// // Use in activation
127/// async fn process(ctx: &BidirChannel<ImageReq, ImageResp>) {
128///     let quality = ctx.request(ImageReq::ChooseQuality { min: 50, max: 100 }).await?;
129/// }
130/// ```
131///
132/// # Error Handling
133///
134/// Always handle [`BidirError::NotSupported`] for transports that don't support
135/// bidirectional communication:
136///
137/// ```rust,ignore
138/// match ctx.confirm("Proceed?").await {
139///     Ok(true) => { /* confirmed */ }
140///     Ok(false) => { /* declined */ }
141///     Err(BidirError::NotSupported) => {
142///         // Non-interactive transport - use safe default
143///     }
144///     Err(BidirError::Cancelled) => {
145///         // User cancelled
146///     }
147///     Err(e) => {
148///         // Other error
149///     }
150/// }
151/// ```
152///
153/// # Timeouts
154///
155/// Default timeout is 30 seconds. Use `request_with_timeout` for custom timeouts:
156///
157/// ```rust,ignore
158/// use std::time::Duration;
159///
160/// // Quick timeout for automated scenarios
161/// ctx.request_with_timeout(req, Duration::from_secs(10)).await?;
162///
163/// // Extended timeout for complex decisions
164/// ctx.request_with_timeout(req, Duration::from_secs(120)).await?;
165/// ```
166///
167/// # Thread Safety
168///
169/// `BidirChannel` uses `Arc<Mutex<_>>` internally and is safe to share across tasks.
170/// Multiple requests can be pending simultaneously.
171pub struct BidirChannel<Req, Resp>
172where
173    Req: Serialize + DeserializeOwned + Send + 'static,
174    Resp: Serialize + DeserializeOwned + Send + 'static,
175{
176    /// Channel to send PlexusStreamItems (including Request items)
177    stream_tx: mpsc::Sender<PlexusStreamItem>,
178
179    /// Pending requests waiting for responses
180    /// Maps request_id -> oneshot channel for response
181    pending: Arc<Mutex<HashMap<String, oneshot::Sender<Resp>>>>,
182
183    /// Whether bidirectional communication is supported by transport
184    bidirectional_supported: bool,
185
186    /// Whether to use global registry for response routing (for MCP transport)
187    /// When true, responses come through the global registry instead of handle_response()
188    use_global_registry: bool,
189
190    /// Provenance path (for debugging/logging)
191    provenance: Vec<String>,
192
193    /// Plexus hash (for metadata)
194    plexus_hash: String,
195
196    /// Phantom data to hold Req type parameter
197    _phantom_req: PhantomData<Req>,
198}
199
200/// Type alias for standard interactive UI patterns.
201///
202/// `StandardBidirChannel` provides convenient methods for common interactions:
203///
204/// - [`confirm()`](Self::confirm) - Yes/no confirmation
205/// - [`prompt()`](Self::prompt) - Text input
206/// - [`select()`](Self::select) - Selection from options
207///
208/// # Example
209///
210/// ```rust,ignore
211/// use plexus_core::plexus::bidirectional::{StandardBidirChannel, SelectOption};
212///
213/// async fn wizard(ctx: &StandardBidirChannel) {
214///     // Step 1: Get name
215///     let name = ctx.prompt("Enter project name:").await?;
216///
217///     // Step 2: Select template
218///     let templates = vec![
219///         SelectOption::new("minimal", "Minimal"),
220///         SelectOption::new("full", "Full Featured"),
221///     ];
222///     let template = ctx.select("Choose template:", templates).await?;
223///
224///     // Step 3: Confirm creation
225///     if ctx.confirm(&format!("Create '{}' with {} template?", name, template[0])).await? {
226///         // Create project
227///     }
228/// }
229/// ```
230///
231/// # Transport Requirements
232///
233/// The underlying transport must support bidirectional communication.
234/// If not, all methods return `Err(BidirError::NotSupported)`.
235pub type StandardBidirChannel = BidirChannel<StandardRequest, StandardResponse>;
236
237impl<Req, Resp> BidirChannel<Req, Resp>
238where
239    Req: Serialize + DeserializeOwned + Send + 'static,
240    Resp: Serialize + DeserializeOwned + Send + 'static,
241{
242    /// Create a new bidirectional channel
243    ///
244    /// By default, uses the global response registry which works with all transport types:
245    /// - MCP: Responses come through `_plexus_respond` tool → global registry
246    /// - WebSocket: Responses can also use global registry via `handle_pending_response()`
247    ///
248    /// Use `new_direct()` if you need direct response handling (for testing or specific transports).
249    pub fn new(
250        stream_tx: mpsc::Sender<PlexusStreamItem>,
251        bidirectional_supported: bool,
252        provenance: Vec<String>,
253        plexus_hash: String,
254    ) -> Self {
255        Self {
256            stream_tx,
257            pending: Arc::new(Mutex::new(HashMap::new())),
258            bidirectional_supported,
259            use_global_registry: true, // Use global registry by default for transport compatibility
260            provenance,
261            plexus_hash,
262            _phantom_req: PhantomData,
263        }
264    }
265
266    /// Create a bidirectional channel that uses direct response handling
267    ///
268    /// Responses must be delivered via `handle_response()` method on this channel instance.
269    /// Use this for testing or when you have direct access to the channel for responses.
270    pub fn new_direct(
271        stream_tx: mpsc::Sender<PlexusStreamItem>,
272        bidirectional_supported: bool,
273        provenance: Vec<String>,
274        plexus_hash: String,
275    ) -> Self {
276        Self {
277            stream_tx,
278            pending: Arc::new(Mutex::new(HashMap::new())),
279            bidirectional_supported,
280            use_global_registry: false,
281            provenance,
282            plexus_hash,
283            _phantom_req: PhantomData,
284        }
285    }
286
287    /// Check if bidirectional communication is supported
288    pub fn is_bidirectional(&self) -> bool {
289        self.bidirectional_supported
290    }
291
292    /// Make a bidirectional request with default timeout (30s)
293    ///
294    /// Sends a request to the client and waits for response.
295    /// Returns error if transport doesn't support bidirectional or timeout occurs.
296    pub async fn request(&self, req: Req) -> Result<Resp, BidirError> {
297        self.request_with_timeout(req, Duration::from_secs(30))
298            .await
299    }
300
301    /// Make a bidirectional request with custom timeout
302    pub async fn request_with_timeout(
303        &self,
304        req: Req,
305        timeout_duration: Duration,
306    ) -> Result<Resp, BidirError> {
307        if !self.bidirectional_supported {
308            return Err(BidirError::NotSupported);
309        }
310
311        // Generate unique request ID
312        let request_id = Uuid::new_v4().to_string();
313
314        // Serialize request
315        let request_data = serde_json::to_value(&req)
316            .map_err(|e| BidirError::Serialization(e.to_string()))?;
317
318        let timeout_ms = timeout_duration.as_millis() as u64;
319
320        if self.use_global_registry {
321            // Use global registry for response routing (MCP transport)
322            self.request_via_registry(request_id, request_data, timeout_duration, timeout_ms)
323                .await
324        } else {
325            // Use internal pending map (WebSocket/direct transport)
326            self.request_direct(request_id, request_data, timeout_duration, timeout_ms)
327                .await
328        }
329    }
330
331    /// Request using internal pending map (for direct transports like WebSocket)
332    async fn request_direct(
333        &self,
334        request_id: String,
335        request_data: Value,
336        timeout_duration: Duration,
337        timeout_ms: u64,
338    ) -> Result<Resp, BidirError> {
339        // Create oneshot channel for response
340        let (tx, rx) = oneshot::channel();
341
342        // Register pending request in internal map
343        self.pending.lock().unwrap().insert(request_id.clone(), tx);
344
345        // Send Request stream item
346        self.stream_tx
347            .send(PlexusStreamItem::request(
348                request_id.clone(),
349                request_data,
350                timeout_ms,
351            ))
352            .await
353            .map_err(|e| BidirError::Transport(format!("Failed to send request: {}", e)))?;
354
355        // Wait for response (or timeout)
356        match timeout(timeout_duration, rx).await {
357            Ok(Ok(resp)) => Ok(resp),
358            Ok(Err(_)) => {
359                // Channel closed before response
360                self.pending.lock().unwrap().remove(&request_id);
361                Err(BidirError::ChannelClosed)
362            }
363            Err(_) => {
364                // Timeout
365                self.pending.lock().unwrap().remove(&request_id);
366                Err(BidirError::Timeout(timeout_ms))
367            }
368        }
369    }
370
371    /// Request using global registry (for MCP transport via _plexus_respond tool)
372    async fn request_via_registry(
373        &self,
374        request_id: String,
375        request_data: Value,
376        timeout_duration: Duration,
377        timeout_ms: u64,
378    ) -> Result<Resp, BidirError> {
379        // Create oneshot channel for Value response (type-erased)
380        let (tx, rx) = oneshot::channel::<Value>();
381
382        // Register in global registry
383        register_pending_request(request_id.clone(), tx);
384
385        // Send Request stream item
386        if let Err(e) = self
387            .stream_tx
388            .send(PlexusStreamItem::request(
389                request_id.clone(),
390                request_data,
391                timeout_ms,
392            ))
393            .await
394        {
395            // Clean up on failure
396            unregister_pending_request(&request_id);
397            return Err(BidirError::Transport(format!("Failed to send request: {}", e)));
398        }
399
400        // Wait for response (or timeout)
401        match timeout(timeout_duration, rx).await {
402            Ok(Ok(value)) => {
403                // Deserialize Value to typed response
404                serde_json::from_value(value).map_err(|e| BidirError::TypeMismatch {
405                    expected: std::any::type_name::<Resp>().to_string(),
406                    got: e.to_string(),
407                })
408            }
409            Ok(Err(_)) => {
410                // Channel closed before response
411                unregister_pending_request(&request_id);
412                Err(BidirError::ChannelClosed)
413            }
414            Err(_) => {
415                // Timeout - clean up from registry
416                unregister_pending_request(&request_id);
417                Err(BidirError::Timeout(timeout_ms))
418            }
419        }
420    }
421
422    /// Handle a response from the client
423    ///
424    /// Called by transport layer when client responds to a request.
425    /// Deserializes response and sends it through the pending request's channel.
426    pub fn handle_response(
427        &self,
428        request_id: String,
429        response_data: Value,
430    ) -> Result<(), BidirError> {
431        // Look up pending request
432        let tx = self
433            .pending
434            .lock()
435            .unwrap()
436            .remove(&request_id)
437            .ok_or(BidirError::UnknownRequest)?;
438
439        // Deserialize response
440        let resp: Resp = serde_json::from_value(response_data).map_err(|e| {
441            BidirError::TypeMismatch {
442                expected: std::any::type_name::<Resp>().to_string(),
443                got: e.to_string(),
444            }
445        })?;
446
447        // Send response through channel (unblocks request() call)
448        tx.send(resp).map_err(|_| BidirError::ChannelClosed)?;
449
450        Ok(())
451    }
452
453    /// Get provenance path (for debugging)
454    pub fn provenance(&self) -> &[String] {
455        &self.provenance
456    }
457
458    /// Get plexus hash (for metadata)
459    pub fn plexus_hash(&self) -> &str {
460        &self.plexus_hash
461    }
462}
463
464// Convenience methods for StandardBidirChannel
465impl BidirChannel<StandardRequest, StandardResponse> {
466    /// Ask user for yes/no confirmation
467    ///
468    /// # Examples
469    ///
470    /// ```rust,ignore
471    /// if ctx.confirm("Delete this file?").await? {
472    ///     // user confirmed
473    /// }
474    /// ```
475    pub async fn confirm(&self, message: &str) -> Result<bool, BidirError> {
476        let resp = self
477            .request(StandardRequest::Confirm {
478                message: message.to_string(),
479                default: None,
480            })
481            .await?;
482
483        match resp {
484            StandardResponse::Confirmed { value } => Ok(value),
485            StandardResponse::Cancelled => Err(BidirError::Cancelled),
486            _ => Err(BidirError::TypeMismatch {
487                expected: "Confirmed".into(),
488                got: format!("{:?}", resp),
489            }),
490        }
491    }
492
493    /// Ask user for text input
494    ///
495    /// Returns the user's input as a `serde_json::Value`. For most prompts,
496    /// this will be a `Value::String`. Use `.as_str()` or `.to_string()` to
497    /// extract the string content.
498    ///
499    /// # Examples
500    ///
501    /// ```rust,ignore
502    /// let name_val = ctx.prompt("Enter your name:").await?;
503    /// let name = name_val.as_str().unwrap_or("").to_string();
504    /// ```
505    pub async fn prompt(&self, message: &str) -> Result<String, BidirError> {
506        let resp = self
507            .request(StandardRequest::Prompt {
508                message: message.to_string(),
509                default: None,
510                placeholder: None,
511            })
512            .await?;
513
514        match resp {
515            StandardResponse::Text { value } => {
516                // Extract string from Value for convenience
517                match value {
518                    serde_json::Value::String(s) => Ok(s),
519                    other => Ok(other.to_string()),
520                }
521            }
522            StandardResponse::Cancelled => Err(BidirError::Cancelled),
523            _ => Err(BidirError::TypeMismatch {
524                expected: "Text".into(),
525                got: format!("{:?}", resp),
526            }),
527        }
528    }
529
530    /// Ask user to select from options
531    ///
532    /// Returns the selected values as strings. Each `SelectOption` value
533    /// is converted from `serde_json::Value` to `String`.
534    ///
535    /// # Examples
536    ///
537    /// ```rust,ignore
538    /// let options = vec![
539    ///     SelectOption::new("dev", "Development"),
540    ///     SelectOption::new("prod", "Production"),
541    /// ];
542    /// let selected = ctx.select("Choose environment:", options).await?;
543    /// ```
544    pub async fn select(
545        &self,
546        message: &str,
547        options: Vec<SelectOption>,
548    ) -> Result<Vec<String>, BidirError> {
549        let resp = self
550            .request(StandardRequest::Select {
551                message: message.to_string(),
552                options,
553                multi_select: false,
554            })
555            .await?;
556
557        match resp {
558            StandardResponse::Selected { values } => {
559                // Convert each Value to String for convenience
560                let strings = values
561                    .into_iter()
562                    .map(|v| match v {
563                        serde_json::Value::String(s) => s,
564                        other => other.to_string(),
565                    })
566                    .collect();
567                Ok(strings)
568            }
569            StandardResponse::Cancelled => Err(BidirError::Cancelled),
570            _ => Err(BidirError::TypeMismatch {
571                expected: "Selected".into(),
572                got: format!("{:?}", resp),
573            }),
574        }
575    }
576}
577
578/// Bidirectional channel with fallback when transport doesn't support bidirectional
579///
580/// Wraps a BidirChannel and provides fallback values when bidirectional
581/// requests fail due to NotSupported error.
582pub struct BidirWithFallback<Req, Resp>
583where
584    Req: Serialize + DeserializeOwned + Send + 'static,
585    Resp: Serialize + DeserializeOwned + Send + 'static,
586{
587    channel: Arc<BidirChannel<Req, Resp>>,
588    fallback_fn: Box<dyn Fn(&Req) -> Resp + Send + Sync>,
589}
590
591impl<Req, Resp> BidirWithFallback<Req, Resp>
592where
593    Req: Serialize + DeserializeOwned + Send + 'static,
594    Resp: Serialize + DeserializeOwned + Send + 'static,
595{
596    /// Create a new fallback wrapper with custom fallback function
597    pub fn new(
598        channel: Arc<BidirChannel<Req, Resp>>,
599        fallback: impl Fn(&Req) -> Resp + Send + Sync + 'static,
600    ) -> Self {
601        Self {
602            channel,
603            fallback_fn: Box::new(fallback),
604        }
605    }
606
607    /// Make a request, using fallback if bidirectional not supported
608    pub async fn request(&self, req: Req) -> Resp
609    where
610        Req: Clone,
611    {
612        match self.channel.request(req.clone()).await {
613            Ok(resp) => resp,
614            Err(BidirError::NotSupported) | Err(BidirError::Timeout(_)) => {
615                (self.fallback_fn)(&req)
616            }
617            Err(_) => (self.fallback_fn)(&req),
618        }
619    }
620}
621
622// Helper for StandardBidirChannel fallbacks
623impl BidirWithFallback<StandardRequest, StandardResponse> {
624    /// Create fallback that auto-confirms all requests
625    pub fn auto_confirm(
626        channel: Arc<BidirChannel<StandardRequest, StandardResponse>>,
627    ) -> Self {
628        Self::new(channel, |req| match req {
629            StandardRequest::Confirm { default, .. } => StandardResponse::Confirmed {
630                value: default.unwrap_or(true),
631            },
632            StandardRequest::Prompt { default, .. } => StandardResponse::Text {
633                value: default.clone().unwrap_or(serde_json::Value::String(String::new())),
634            },
635            StandardRequest::Select { options, .. } => StandardResponse::Selected {
636                values: vec![options
637                    .first()
638                    .map(|o| o.value.clone())
639                    .unwrap_or(serde_json::Value::String(String::new()))],
640            },
641            StandardRequest::Custom { data } => StandardResponse::Custom { data: data.clone() },
642        })
643    }
644}
645
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a direct-mode channel plus the receiving end of its stream.
    ///
    /// Callers must keep the receiver alive for as long as the channel is
    /// used; dropping it makes every send fail with a transport error.
    fn direct_channel(
        bidirectional_supported: bool,
    ) -> (
        Arc<StandardBidirChannel>,
        mpsc::Receiver<PlexusStreamItem>,
    ) {
        let (tx, rx) = mpsc::channel(32);
        let channel = Arc::new(BidirChannel::new_direct(
            tx,
            bidirectional_supported,
            vec!["test".into()],
            "hash".into(),
        ));
        (channel, rx)
    }

    /// JSON payload a client would send for a `Confirmed` response.
    fn confirmed_json(value: bool) -> Value {
        serde_json::to_value(StandardResponse::<serde_json::Value>::Confirmed { value }).unwrap()
    }

    /// A `Confirm` request with no default.
    fn confirm_request() -> StandardRequest {
        StandardRequest::Confirm {
            message: "Test?".into(),
            default: None,
        }
    }

    #[tokio::test]
    async fn test_bidir_channel_not_supported() {
        let (channel, _rx) = direct_channel(false);

        let result = channel.confirm("Test?").await;
        assert!(matches!(result, Err(BidirError::NotSupported)));
    }

    #[tokio::test]
    async fn test_bidir_request_response() {
        let (channel, mut rx) = direct_channel(true);

        // Issue the request from a background task so this task can play the client.
        let requester = channel.clone();
        let handle = tokio::spawn(async move { requester.request(confirm_request()).await });

        if let Some(PlexusStreamItem::Request {
            request_id,
            request_data,
            ..
        }) = rx.recv().await
        {
            // The item on the stream must carry the serialized request.
            let req: StandardRequest = serde_json::from_value(request_data).unwrap();
            assert!(matches!(req, StandardRequest::Confirm { .. }));

            channel
                .handle_response(request_id, confirmed_json(true))
                .unwrap();
        } else {
            panic!("Expected Request item");
        }

        let result: StandardResponse = handle.await.unwrap().unwrap();
        assert_eq!(result, StandardResponse::Confirmed { value: true });
    }

    #[tokio::test]
    async fn test_convenience_methods() {
        let (channel, mut rx) = direct_channel(true);

        // Test confirm()
        let requester = channel.clone();
        let handle = tokio::spawn(async move { requester.confirm("Delete?").await });

        if let Some(PlexusStreamItem::Request { request_id, .. }) = rx.recv().await {
            channel
                .handle_response(request_id, confirmed_json(true))
                .unwrap();
        } else {
            // Fail fast instead of waiting for the request to time out.
            panic!("Expected Request item");
        }

        assert!(handle.await.unwrap().unwrap());
    }

    #[tokio::test]
    async fn test_timeout() {
        // `_rx` stays alive so the send succeeds and only the wait times out.
        let (channel, _rx) = direct_channel(true);

        let result = channel
            .request_with_timeout(confirm_request(), Duration::from_millis(100))
            .await;

        assert!(matches!(result, Err(BidirError::Timeout(100))));
    }

    #[tokio::test]
    async fn test_send_failure_reports_transport_error() {
        let (channel, rx) = direct_channel(true);
        // With the receiver gone the request item cannot be delivered.
        drop(rx);

        let result = channel.confirm("Test?").await;
        assert!(matches!(result, Err(BidirError::Transport(_))));
    }

    #[tokio::test]
    async fn test_fallback() {
        let (channel, _rx) = direct_channel(false); // not supported

        let fallback = BidirWithFallback::auto_confirm(channel);

        let resp = fallback
            .request(StandardRequest::Confirm {
                message: "Test?".into(),
                default: Some(false),
            })
            .await;

        assert_eq!(resp, StandardResponse::Confirmed { value: false });
    }
}