Struct BidirChannel 

Source
pub struct BidirChannel<Req, Resp>
where Req: Serialize + DeserializeOwned + Send + 'static, Resp: Serialize + DeserializeOwned + Send + 'static,
{ /* private fields */ }

Generic bidirectional channel for type-safe server-to-client requests.

BidirChannel is the core primitive for bidirectional communication in Plexus RPC. It allows server-side code (activations) to request input from clients during stream execution, enabling interactive workflows.

§Type Parameters

  • Req - Request type sent server→client. Must implement Serialize + DeserializeOwned + Send + 'static.
  • Resp - Response type sent client→server. Must implement Serialize + DeserializeOwned + Send + 'static.

§Common Type Aliases

For standard UI patterns, use StandardBidirChannel:

type StandardBidirChannel = BidirChannel<StandardRequest, StandardResponse>;

§Creating Channels

Channels are typically created by the transport layer, not by activations directly. The #[hub_method(bidirectional)] macro injects the appropriate channel type.

// The macro generates this signature:
async fn wizard(&self, ctx: &Arc<StandardBidirChannel>) -> impl Stream<Item = Event> { ... }

§Making Requests

§Standard Patterns (via StandardBidirChannel)

// Yes/no confirmation
if ctx.confirm("Delete file?").await? {
    // User said yes
}

// Text input
let name = ctx.prompt("Enter name:").await?;

// Selection
let options = vec![
    SelectOption::new("dev", "Development"),
    SelectOption::new("prod", "Production"),
];
let selected = ctx.select("Choose env:", options).await?;

§Custom Types

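use serde::{Deserialize, Serialize};

// Define custom request/response
#[derive(Serialize, Deserialize)]
enum ImageReq { ChooseQuality { min: u8, max: u8 } }

#[derive(Serialize, Deserialize)]
enum ImageResp { Quality(u8), Cancel }

// Use in activation. The function returns Result so that `?` can propagate BidirError.
async fn process(ctx: &BidirChannel<ImageReq, ImageResp>) -> Result<(), BidirError> {
    match ctx.request(ImageReq::ChooseQuality { min: 50, max: 100 }).await? {
        ImageResp::Quality(quality) => { /* encode at the chosen quality */ }
        ImageResp::Cancel => { /* user cancelled */ }
    }
    Ok(())
}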
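
A client may answer with a value outside the range that was requested, so an activation may want to validate the response before using it. The sketch below is one way to do that, under stated assumptions: the enums are local copies without the serde derives so the block compiles with the standard library alone, and resolve_quality is a hypothetical helper, not part of the crate.

```rust
// Local copies of the example enums (serde derives omitted for this sketch).
#[derive(Debug, Clone, PartialEq)]
pub enum ImageReq {
    ChooseQuality { min: u8, max: u8 },
}

#[derive(Debug, Clone, PartialEq)]
pub enum ImageResp {
    Quality(u8),
    Cancel,
}

/// Hypothetical helper: turn a client response into a usable quality value.
/// Returns None when the user cancelled. Assumes `min <= max`
/// (`u8::clamp` panics otherwise).
pub fn resolve_quality(req: &ImageReq, resp: ImageResp) -> Option<u8> {
    let ImageReq::ChooseQuality { min, max } = req;
    match resp {
        // Pull out-of-range answers back into the requested range.
        ImageResp::Quality(q) => Some(q.clamp(*min, *max)),
        ImageResp::Cancel => None,
    }
}
```

Clamping is only one policy; rejecting an out-of-range answer and asking again would be equally reasonable.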

§Error Handling

Always handle BidirError::NotSupported for transports that don’t support bidirectional communication:

match ctx.confirm("Proceed?").await {
    Ok(true) => { /* confirmed */ }
    Ok(false) => { /* declined */ }
    Err(BidirError::NotSupported) => {
        // Non-interactive transport - use safe default
    }
    Err(BidirError::Cancelled) => {
        // User cancelled
    }
    Err(e) => {
        // Other error
    }
}
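
The match above can be factored into a small pure function so the fallback policy is testable without a live transport. This is a sketch under assumptions: BidirError here is a local stand-in that mirrors only the two variants named on this page, Other is a hypothetical catch-all, and Decision and decide are illustrative names that do not exist in the crate.

```rust
/// Local stand-in for the crate's BidirError. Only NotSupported and
/// Cancelled appear in the docs; Other is a hypothetical catch-all.
#[derive(Debug, Clone, PartialEq)]
pub enum BidirError {
    NotSupported,
    Cancelled,
    Other(String),
}

/// Hypothetical outcome of a confirmation step.
#[derive(Debug, PartialEq)]
pub enum Decision {
    Proceed,
    Skip,
    Abort(String),
}

/// Map the result of a confirmation request to a decision.
/// `non_interactive_default` is the safe default used when the
/// transport cannot ask the user at all.
pub fn decide(result: Result<bool, BidirError>, non_interactive_default: bool) -> Decision {
    match result {
        Ok(true) => Decision::Proceed,
        Ok(false) => Decision::Skip,
        Err(BidirError::NotSupported) => {
            if non_interactive_default {
                Decision::Proceed
            } else {
                Decision::Skip
            }
        }
        // Treat an explicit cancel like a "no".
        Err(BidirError::Cancelled) => Decision::Skip,
        Err(BidirError::Other(msg)) => Decision::Abort(msg),
    }
}
```

For destructive operations, passing false as the default keeps non-interactive transports from proceeding silently.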

§Timeouts

Default timeout is 30 seconds. Use request_with_timeout for custom timeouts:

use std::time::Duration;

// Quick timeout for automated scenarios
ctx.request_with_timeout(req, Duration::from_secs(10)).await?;

// Extended timeout for complex decisions
ctx.request_with_timeout(req, Duration::from_secs(120)).await?;
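
To illustrate what waiting with a timeout amounts to, here is a minimal synchronous model built on std::sync::mpsc. It is not how the crate implements request_with_timeout (which is async); WaitError, DEFAULT_TIMEOUT, and wait_for_response are hypothetical names, and only the 30-second default is taken from the text above.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum WaitError {
    /// No response arrived before the deadline.
    TimedOut,
    /// The sending side was dropped, so no response can ever arrive.
    ChannelClosed,
}

/// Mirrors the documented 30-second default.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);

/// Block until a response arrives on `rx` or `timeout` elapses.
pub fn wait_for_response<T>(rx: &Receiver<T>, timeout: Duration) -> Result<T, WaitError> {
    rx.recv_timeout(timeout).map_err(|e| match e {
        RecvTimeoutError::Timeout => WaitError::TimedOut,
        RecvTimeoutError::Disconnected => WaitError::ChannelClosed,
    })
}
```

Distinguishing a timeout from a closed channel lets a caller retry in the first case and give up immediately in the second.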

§Thread Safety

BidirChannel uses Arc<Mutex<_>> internally and is safe to share across tasks. Multiple requests can be pending simultaneously.

Implementations§

Source§

impl<Req, Resp> BidirChannel<Req, Resp>
where Req: Serialize + DeserializeOwned + Send + 'static, Resp: Serialize + DeserializeOwned + Send + 'static,

Source

pub fn new( stream_tx: Sender<PlexusStreamItem>, bidirectional_supported: bool, provenance: Vec<String>, plexus_hash: String, ) -> BidirChannel<Req, Resp>

Create a new bidirectional channel

By default, uses the global response registry which works with all transport types:

  • MCP: Responses come through _plexus_respond tool → global registry
  • WebSocket: Responses can also use global registry via handle_pending_response()

Use new_direct() if you need direct response handling (for testing or specific transports).

Source

pub fn new_direct( stream_tx: Sender<PlexusStreamItem>, bidirectional_supported: bool, provenance: Vec<String>, plexus_hash: String, ) -> BidirChannel<Req, Resp>

Create a bidirectional channel that uses direct response handling

Responses must be delivered via the handle_response() method on this channel instance. Use this for testing or when you have direct access to the channel that receives the responses.

Source

pub fn is_bidirectional(&self) -> bool

Check if bidirectional communication is supported

Source

pub async fn request(&self, req: Req) -> Result<Resp, BidirError>

Make a bidirectional request with default timeout (30s)

Sends a request to the client and waits for the response. Returns an error if the transport does not support bidirectional communication or if the timeout elapses.

Source

pub async fn request_with_timeout( &self, req: Req, timeout_duration: Duration, ) -> Result<Resp, BidirError>

Make a bidirectional request with custom timeout

Source

pub fn handle_response( &self, request_id: String, response_data: Value, ) -> Result<(), BidirError>

Handle a response from the client

Called by the transport layer when the client responds to a request. Deserializes the response and sends it through the pending request’s channel.
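
The delivery step can be modelled with standard-library channels: each pending request registers a sender under its request ID, and the response handler looks the ID up and forwards the payload. This is a simplified sketch, not the crate's implementation. PendingRegistry, register, deliver, pending, and DeliverError are hypothetical names, and payloads are plain Strings rather than deserialized JSON values.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum DeliverError {
    /// No request with this ID is pending.
    UnknownRequest(String),
    /// The requester stopped waiting (for example, it timed out).
    ReceiverGone(String),
}

/// Shared map from request ID to the channel of the waiting requester.
/// Cloning the registry clones the Arc, so all clones see the same map.
#[derive(Clone, Default)]
pub struct PendingRegistry {
    inner: Arc<Mutex<HashMap<String, Sender<String>>>>,
}

impl PendingRegistry {
    pub fn new() -> Self {
        Self::default()
    }

    /// Record a pending request and return the receiver to wait on.
    pub fn register(&self, request_id: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.inner.lock().unwrap().insert(request_id.to_string(), tx);
        rx
    }

    /// Route a response to whoever is waiting on `request_id`.
    /// The entry is removed, so each request is answered at most once.
    pub fn deliver(&self, request_id: &str, payload: String) -> Result<(), DeliverError> {
        let tx = self
            .inner
            .lock()
            .unwrap()
            .remove(request_id)
            .ok_or_else(|| DeliverError::UnknownRequest(request_id.to_string()))?;
        tx.send(payload)
            .map_err(|_| DeliverError::ReceiverGone(request_id.to_string()))
    }

    /// Number of requests still waiting for a response.
    pub fn pending(&self) -> usize {
        self.inner.lock().unwrap().len()
    }
}
```

Because every request has its own entry and channel, several requests can be pending at once and responses may arrive in any order.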

Source

pub fn provenance(&self) -> &[String]

Get provenance path (for debugging)

Source

pub fn plexus_hash(&self) -> &str

Get plexus hash (for metadata)

Source§

impl BidirChannel<StandardRequest, StandardResponse>

Source

pub async fn confirm(&self, message: &str) -> Result<bool, BidirError>

Ask user for yes/no confirmation

§Examples
if ctx.confirm("Delete this file?").await? {
    // user confirmed
}
Source

pub async fn prompt(&self, message: &str) -> Result<String, BidirError>

Ask user for text input

Returns the user’s input as a String, matching the signature above.

§Examples
let name = ctx.prompt("Enter your name:").await?;
Source

pub async fn select( &self, message: &str, options: Vec<SelectOption>, ) -> Result<Vec<String>, BidirError>

Ask user to select from options

Returns the selected values as strings. Each SelectOption value is converted from serde_json::Value to String.

§Examples
let options = vec![
    SelectOption::new("dev", "Development"),
    SelectOption::new("prod", "Production"),
];
let selected = ctx.select("Choose environment:", options).await?;
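
Since select returns a Vec<String>, callers that expect a single choice still need to pick one value and may want to check it against what was offered. A minimal sketch, assuming the returned strings correspond to the first argument of SelectOption::new (the page does not confirm this); first_valid_choice is a hypothetical helper, not part of the crate.

```rust
/// Hypothetical helper: return the first selected value that was actually
/// among the offered option values, or None if there is no such value.
pub fn first_valid_choice(selected: &[String], offered: &[&str]) -> Option<String> {
    selected
        .iter()
        .find(|s| offered.contains(&s.as_str()))
        .cloned()
}
```

An empty selection and a selection of unknown values both yield None, so the caller can handle them with a single fallback branch.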

Auto Trait Implementations§

§

impl<Req, Resp> Freeze for BidirChannel<Req, Resp>

§

impl<Req, Resp> RefUnwindSafe for BidirChannel<Req, Resp>
where Req: RefUnwindSafe,

§

impl<Req, Resp> Send for BidirChannel<Req, Resp>

§

impl<Req, Resp> Sync for BidirChannel<Req, Resp>
where Req: Sync,

§

impl<Req, Resp> Unpin for BidirChannel<Req, Resp>
where Req: Unpin,

§

impl<Req, Resp> UnsafeUnpin for BidirChannel<Req, Resp>

§

impl<Req, Resp> UnwindSafe for BidirChannel<Req, Resp>
where Req: UnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> Chain<T> for T

Source§

fn len(&self) -> usize

The number of items that this chain link consists of.
Source§

fn append_to(self, v: &mut Vec<T>)

Append the elements in this link to the chain.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.
Source§

impl<T> MaybeSend for T
where T: Send,