
Module bidirectional


Bidirectional streaming support for Plexus RPC

This module enables server-to-client requests during streaming RPC execution, allowing for interactive workflows like confirmations, prompts, and multi-step wizards.

§Overview

Traditional RPC is unidirectional: clients send requests, servers respond. Bidirectional communication extends this by allowing the server to request input from the client during stream execution. This is essential for:

  • User confirmations before destructive operations
  • Interactive prompts for missing information
  • Multi-step wizards that guide users through complex workflows
  • Dynamic selection menus based on server-side state

§Architecture

The bidirectional system is built on generic types that can work with any serializable request/response types:

┌─────────────────────────────────────────────────────────────────────┐
│                        BidirChannel<Req, Resp>                      │
│  Generic channel for type-safe server→client requests              │
└─────────────────────────────────────────────────────────────────────┘
                                   │
                   ┌───────────────┴───────────────┐
                   ▼                               ▼
┌──────────────────────────────┐   ┌──────────────────────────────┐
│     StandardBidirChannel     │   │   Custom Request/Response    │
│  (confirm/prompt/select)     │   │   (domain-specific types)    │
└──────────────────────────────┘   └──────────────────────────────┘
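The layering in the diagram above can be modeled, very roughly, as a generic channel plus a specialization that fixes both type parameters. The sketch below is a simplified, synchronous stand-in that uses only the standard library. `Channel`, `StdRequest`, `StdResponse`, and `StdChannel` are hypothetical names chosen for illustration; the real BidirChannel is async and transport-backed, and its internals are not shown on this page.

```rust
// Hypothetical, simplified model of the layering in the diagram.
// A closure stands in for the client so the type relationships are visible.

/// Generic channel: any request type in, any response type out.
struct Channel<Req, Resp> {
    responder: Box<dyn Fn(&Req) -> Resp>,
}

impl<Req, Resp> Channel<Req, Resp> {
    fn new(responder: impl Fn(&Req) -> Resp + 'static) -> Self {
        Channel { responder: Box::new(responder) }
    }

    /// Send a request "to the client" and return its response.
    fn request(&self, req: Req) -> Resp {
        (self.responder)(&req)
    }
}

/// Stand-in for the standard confirm/prompt request types.
#[derive(Debug, PartialEq)]
enum StdRequest {
    Confirm { message: String },
    Prompt { message: String },
}

/// Stand-in for the matching response types.
#[derive(Debug, PartialEq)]
enum StdResponse {
    Confirmed { value: bool },
    Text { value: String },
}

/// The "standard" channel is the generic channel with both parameters fixed.
type StdChannel = Channel<StdRequest, StdResponse>;

// Convenience method layered on top of the generic `request`.
impl Channel<StdRequest, StdResponse> {
    fn confirm(&self, message: &str) -> Option<bool> {
        match self.request(StdRequest::Confirm { message: message.to_string() }) {
            StdResponse::Confirmed { value } => Some(value),
            _ => None, // the response variant did not match the request
        }
    }
}
```

A custom request/response pair follows the right-hand branch of the diagram: it would reuse `Channel<Req, Resp>` directly with its own enums and skip the convenience methods.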

§Wire Format

Bidirectional requests are sent as PlexusStreamItem::Request:

{
  "type": "request",
  "requestId": "550e8400-e29b-41d4-a716-446655440000",
  "requestData": { "type": "confirm", "message": "Delete file?" },
  "timeoutMs": 30000
}

Clients respond via the _plexus_respond method or a transport-specific mechanism.
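To make the envelope's shape concrete, here is a dependency-free sketch that assembles the same JSON by hand. `RequestEnvelope`, `escape_json`, and `to_json` are hypothetical names, not part of plexus_core. A real implementation would more likely derive serde's `Serialize` with camelCase renaming; manual formatting is used here only to keep the example self-contained.

```rust
/// Hypothetical mirror of the request envelope; fields follow the JSON keys.
struct RequestEnvelope {
    request_id: String,
    /// Payload that is already serialized JSON, e.g. {"type":"confirm",...}.
    request_data: String,
    timeout_ms: u64,
}

/// Escape the characters that must be escaped inside a JSON string literal.
fn escape_json(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

impl RequestEnvelope {
    /// Render the envelope with the "type": "request" discriminator.
    fn to_json(&self) -> String {
        format!(
            "{{\"type\":\"request\",\"requestId\":\"{}\",\"requestData\":{},\"timeoutMs\":{}}}",
            escape_json(&self.request_id),
            self.request_data, // inserted verbatim: assumed to be valid JSON
            self.timeout_ms
        )
    }
}
```

The `requestId` is what lets a client's later response be matched back to the waiting server-side call, so it has to be echoed unchanged.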

§Core Types

§Helper Functions

§Examples

§Using StandardBidirChannel (Most Common)

The StandardBidirChannel provides convenience methods for common UI patterns:

use plexus_core::plexus::bidirectional::{StandardBidirChannel, BidirError, SelectOption};

async fn my_method(ctx: &StandardBidirChannel) -> Result<(), BidirError> {
    // Simple yes/no confirmation
    if ctx.confirm("Delete this file?").await? {
        // User confirmed - proceed with deletion
    }

    // Text input prompt
    let name = ctx.prompt("Enter your name:").await?;
    println!("Hello, {}!", name);

    // Selection from options
    let choices = vec![
        SelectOption::new("dev", "Development")
            .with_description("Local development environment"),
        SelectOption::new("staging", "Staging")
            .with_description("Pre-production testing"),
        SelectOption::new("prod", "Production")
            .with_description("Live environment (requires approval)"),
    ];
    let selected = ctx.select("Choose environment:", choices).await?;
    println!("Selected: {:?}", selected);

    Ok(())
}

§Handling Errors Gracefully

Always handle bidirectional errors to support non-interactive transports:

use plexus_core::plexus::bidirectional::{StandardBidirChannel, BidirError, bidir_error_message};

async fn safe_delete(ctx: &StandardBidirChannel, path: &str) -> Result<bool, String> {
    match ctx.confirm(&format!("Delete '{}'?", path)).await {
        Ok(true) => {
            // User confirmed
            Ok(true)
        }
        Ok(false) => {
            // User declined
            Ok(false)
        }
        Err(BidirError::NotSupported) => {
            // Transport doesn't support bidirectional - skip deletion for safety
            Err("Cannot delete without user confirmation".into())
        }
        Err(BidirError::Cancelled) => {
            // User explicitly cancelled
            Ok(false)
        }
        Err(e) => {
            // Other error - return a user-friendly message
            Err(bidir_error_message(&e))
        }
    }
}

§Using Custom Request/Response Types

For domain-specific interactions, define custom types:

use serde::{Deserialize, Serialize};
use schemars::JsonSchema;
use plexus_core::plexus::bidirectional::{BidirChannel, BidirError};

#[derive(Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type", rename_all = "snake_case")]
enum ImageRequest {
    ConfirmOverwrite { path: String, size: u64 },
    ChooseQuality { min: u8, max: u8, default: u8 },
    SelectFormat { formats: Vec<String> },
}

#[derive(Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type", rename_all = "snake_case")]
enum ImageResponse {
    Confirmed { value: bool },
    Quality { value: u8 },
    Format { value: String },
    Cancelled,
}

type ImageBidirChannel = BidirChannel<ImageRequest, ImageResponse>;

async fn process_image(
    ctx: &ImageBidirChannel,
    path: &str,
) -> Result<(), BidirError> {
    // Ask for quality
    let quality = ctx.request(ImageRequest::ChooseQuality {
        min: 50, max: 100, default: 85,
    }).await?;

    if let ImageResponse::Quality { value } = quality {
        println!("Processing '{}' with quality: {}", path, value);
    }

    Ok(())
}

§Testing with Auto-Response Channels

Use test helpers for deterministic unit tests:

use plexus_core::plexus::bidirectional::{
    auto_respond_channel, StandardRequest, StandardResponse
};

#[tokio::test]
async fn test_wizard_flow() {
    let ctx = auto_respond_channel(|req: &StandardRequest| {
        match req {
            StandardRequest::Confirm { .. } => StandardResponse::Confirmed { value: true },
            StandardRequest::Prompt { .. } => StandardResponse::Text { value: "test-value".into() },
            StandardRequest::Select { options, .. } => {
                StandardResponse::Selected { values: vec![options[0].value.clone()] }
            }
        }
    });

    // Test your activation with deterministic responses
    let result = ctx.confirm("Test?").await;
    assert!(result.unwrap());
}

§Transport Support

Bidirectional communication works differently across transports:

Transport   Mechanism
WebSocket   Request sent as stream item, response via dedicated call
MCP         Request as logging notification, response via _plexus_respond tool
HTTP        Not supported (stateless)

The BidirChannel automatically detects transport capabilities and returns BidirError::NotSupported for transports that cannot handle bidirectional requests.
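The capability check described above can be pictured as follows. This is a minimal sketch under stated assumptions: `Transport`, `ConfirmError`, `confirm`, and `confirm_or_default` are hypothetical stand-ins, and the crate's actual detection logic is not documented on this page.

```rust
/// Hypothetical transport kinds, mirroring the transports listed above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Transport {
    WebSocket,
    Mcp,
    Http,
}

/// Hypothetical error type standing in for BidirError::NotSupported.
#[derive(Debug, PartialEq)]
enum ConfirmError {
    NotSupported,
}

impl Transport {
    /// Stateless HTTP cannot carry a server-to-client request.
    fn supports_bidirectional(self) -> bool {
        match self {
            Transport::WebSocket | Transport::Mcp => true,
            Transport::Http => false,
        }
    }
}

/// Ask for confirmation, failing fast when the transport cannot carry the request.
/// `client_answer` simulates what the client would reply.
fn confirm(transport: Transport, client_answer: bool) -> Result<bool, ConfirmError> {
    if !transport.supports_bidirectional() {
        return Err(ConfirmError::NotSupported);
    }
    Ok(client_answer)
}

/// Caller-side policy: use a default when confirmation is unavailable.
fn confirm_or_default(transport: Transport, client_answer: bool, default: bool) -> bool {
    confirm(transport, client_answer).unwrap_or(default)
}
```

For destructive operations the safe default is usually `false`, which matches the earlier `safe_delete` example refusing to delete without confirmation.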

Modules§

channel
Generic bidirectional channel implementation
helpers
Helper functions and utilities for bidirectional communication
registry
Global pending response registry for bidirectional communication
types
Bidirectional streaming types

Structs§

BidirChannel
Generic bidirectional channel for type-safe server-to-client requests.
BidirWithFallback
Bidirectional channel with a fallback for transports that don’t support bidirectional requests
SelectOption
An option in a StandardRequest::Select request.
TimeoutConfig
Timeout configuration presets for bidirectional requests

Enums§

BidirError
Error types for bidirectional communication
StandardRequest
Standard request types for common interactive UI patterns.
StandardResponse
Standard response types matching StandardRequest.

Functions§

auto_confirm_channel
Create a channel that auto-confirms all requests with the given value
auto_respond_channel
Create a bidirectional channel that automatically responds based on a function
bidir_error_message
Get a user-friendly error message from a BidirError
create_test_bidir_channel
Create a test bidirectional channel for unit tests
create_test_standard_channel
Create a standard bidirectional channel for testing
handle_pending_response
Handle a response for a pending request
is_request_pending
Check if a request is pending
pending_count
Get the count of pending requests (for monitoring/debugging)
register_pending_request
Register a pending request in the global registry
unregister_pending_request
Remove a pending request from the registry (e.g., on timeout)
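
The registry functions listed above suggest a map from request ID to a waiting responder. The following is a minimal sketch of that idea, built on the standard library's `mpsc` channels. `PendingRegistry` and its methods are hypothetical, and the crate's real registry is global and may differ in types and signatures.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Hypothetical registry mapping request IDs to the sender that wakes the waiter.
struct PendingRegistry {
    pending: Mutex<HashMap<String, Sender<String>>>,
}

impl PendingRegistry {
    fn new() -> Self {
        PendingRegistry { pending: Mutex::new(HashMap::new()) }
    }

    /// Register a request and return the receiver its response will arrive on.
    fn register(&self, request_id: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.pending.lock().unwrap().insert(request_id.to_string(), tx);
        rx
    }

    /// Deliver a response; returns false if no such request is pending
    /// or the waiter has already gone away.
    fn handle_response(&self, request_id: &str, response: String) -> bool {
        match self.pending.lock().unwrap().remove(request_id) {
            Some(tx) => tx.send(response).is_ok(),
            None => false,
        }
    }

    /// Drop a pending entry, e.g. after a timeout; returns whether it existed.
    fn unregister(&self, request_id: &str) -> bool {
        self.pending.lock().unwrap().remove(request_id).is_some()
    }

    fn is_pending(&self, request_id: &str) -> bool {
        self.pending.lock().unwrap().contains_key(request_id)
    }

    /// Number of outstanding requests, useful for monitoring.
    fn count(&self) -> usize {
        self.pending.lock().unwrap().len()
    }
}
```

Removing the entry when a response is handled means each request ID can be answered at most once; a late or duplicate response is reported as not pending.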

Type Aliases§

StandardBidirChannel
Type alias for standard interactive UI patterns.