Skip to main content

cc_lb_plugin_conformance/
dispatch.rs

1#![cfg(feature = "dispatch")]
2
3use std::collections::BTreeSet;
4
5use cc_lb_plugin_wire::augmented_metadata::AugmentedMetadata;
6use cc_lb_plugin_wire::limits::{
7    HANDSHAKE_FUEL, HANDSHAKE_WALL_MS, SKIP_HANDSHAKE_IF_FRESH_TTL_SECS,
8};
9pub use cc_lb_plugin_wire::wire_function::FallbackPolicy;
10use cc_lb_plugin_wire::wire_function::WireFunction;
11use cc_lb_runtime_protocol::{BuildPluginError, build_plugin};
12use thiserror::Error;
13
14use crate::handshake::HandshakeError;
15
16/// Build a temporary plugin session, dispatch one wire call, and return the wire outcome.
17///
18/// Instantiation and handshake failures are returned as [`RunError`]. Wire-call failures,
19/// including traps or panics inside Extism dispatch, are caught by the protocol layer and
20/// returned as [`DispatchOutcome::Fallback`] according to the function's fallback policy.
21pub fn run<F: WireFunction>(
22    wasm: &[u8],
23    request: F::Request,
24) -> Result<DispatchOutcome<F::Response>, RunError> {
25    let plugin = build_plugin(wasm, HANDSHAKE_WALL_MS, HANDSHAKE_FUEL)
26        .map_err(RunError::from_build_error)?;
27    let metadata = metadata_from_handshake(wasm, &BTreeSet::new()).map_err(RunError::Handshake)?;
28    let mut session = PluginSession { plugin, metadata };
29
30    Ok(session.dispatch::<F>(request))
31}
32
/// Reusable in-process plugin session for conformance dispatch checks.
///
/// A session pairs an instantiated plugin with the metadata produced by its handshake, so
/// several wire calls can be dispatched against the same instance.
///
/// `PluginSession` is intentionally not cloneable: an Extism plugin instance is mutable
/// execution state and dispatch requires `&mut self`. Panics inside Extism's wire-call path
/// are caught by `cc_lb_runtime_protocol::dispatch::dispatch_wire_call` and surface as
/// [`DispatchOutcome::Fallback`] with the dispatched function's [`FallbackPolicy`]. Such a
/// fallback does not poison the session; later dispatches use the same plugin instance normally.
#[non_exhaustive]
pub struct PluginSession {
    /// Extism plugin instance that every wire call is dispatched to (borrowed mutably per call).
    plugin: extism::Plugin,
    /// Handshake-derived metadata handed to the protocol layer on every dispatch.
    metadata: AugmentedMetadata,
}
45
46impl PluginSession {
47    /// Instantiate a plugin and run handshake with no host capabilities.
48    pub fn new(wasm: &[u8]) -> Result<Self, HandshakeError> {
49        Self::new_with_caps(wasm, &BTreeSet::new())
50    }
51
52    /// Instantiate a plugin and run handshake with caller-supplied host capabilities.
53    pub fn new_with_caps(
54        wasm: &[u8],
55        host_capabilities: &BTreeSet<String>,
56    ) -> Result<Self, HandshakeError> {
57        let plugin = build_plugin(wasm, HANDSHAKE_WALL_MS, HANDSHAKE_FUEL)
58            .map_err(HandshakeError::from_build_error)?;
59        let metadata = metadata_from_handshake(wasm, host_capabilities)?;
60
61        Ok(Self { plugin, metadata })
62    }
63
64    /// Dispatch one wire call through the existing plugin instance.
65    ///
66    /// This mirrors the protocol layer: wire-call failures never return `Result::Err` and
67    /// instead become [`DispatchOutcome::Fallback`]. The call is not retried.
68    pub fn dispatch<F: WireFunction>(
69        &mut self,
70        request: F::Request,
71    ) -> DispatchOutcome<F::Response> {
72        DispatchOutcome::from_protocol(cc_lb_runtime_protocol::dispatch::dispatch_wire_call::<F>(
73            &mut self.plugin,
74            &self.metadata,
75            request,
76        ))
77    }
78}
79
/// Result of a conformance dispatch call.
///
/// This is the conformance crate's own counterpart of the protocol layer's dispatch outcome;
/// values are produced from it by `DispatchOutcome::from_protocol`. `R` is the decoded
/// response type of the dispatched wire function.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DispatchOutcome<R> {
    /// The plugin returned a decoded response.
    Ok(R),
    /// The protocol layer applied the wire function's fallback policy.
    Fallback(FallbackPolicy),
}
89
90impl<R> DispatchOutcome<R> {
91    pub(crate) fn from_protocol(
92        value: cc_lb_runtime_protocol::dispatch::DispatchOutcome<R>,
93    ) -> Self {
94        match value {
95            cc_lb_runtime_protocol::dispatch::DispatchOutcome::Ok(response) => Self::Ok(response),
96            cc_lb_runtime_protocol::dispatch::DispatchOutcome::Fallback(policy) => {
97                Self::Fallback(policy)
98            }
99            _ => unreachable!(),
100        }
101    }
102}
103
/// Errors that can prevent `dispatch::run` from reaching the wire-call layer.
///
/// Failures of the wire call itself are not represented here; `run` reports those as
/// [`DispatchOutcome::Fallback`].
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum RunError {
    /// The plugin could not be instantiated for dispatch.
    ///
    /// `reason` is the textual reason carried over from the protocol layer's build error.
    #[error("plugin instantiation failed: {reason}")]
    Build { reason: String },
    /// The plugin failed handshake or identity checks needed for dispatch metadata.
    ///
    /// Wraps the [`HandshakeError`] raised while gathering handshake metadata in `run`.
    #[error("handshake failed: {0}")]
    Handshake(HandshakeError),
}
115
116impl RunError {
117    pub(crate) fn from_build_error(value: BuildPluginError) -> Self {
118        match value {
119            BuildPluginError::Instantiate { reason } => Self::Build { reason },
120            _ => unreachable!(),
121        }
122    }
123}
124
125impl HandshakeError {
126    pub(crate) fn from_build_error(value: BuildPluginError) -> Self {
127        match value {
128            BuildPluginError::Instantiate { reason } => Self::Instantiate { reason },
129            _ => unreachable!(),
130        }
131    }
132}
133
134fn metadata_from_handshake(
135    wasm: &[u8],
136    host_capabilities: &BTreeSet<String>,
137) -> Result<AugmentedMetadata, HandshakeError> {
138    let offer = cc_lb_runtime_protocol::handshake::build_offer(host_capabilities);
139    let accept = cc_lb_runtime_protocol::handshake::execute_handshake(wasm, &offer)
140        .map_err(HandshakeError::from_protocol)?;
141    let identity = cc_lb_runtime_protocol::identity::read_identity(wasm).map_err(|source| {
142        HandshakeError::InvalidIdentity {
143            field: "custom_section",
144            reason: source.to_string(),
145        }
146    })?;
147
148    Ok(AugmentedMetadata {
149        identity,
150        negotiated_functions: accept.chosen_versions,
151        negotiated_capabilities: accept.required_capabilities,
152        handshake_completed_at: 1,
153        self_check_passed: true,
154        self_check_completed_at: 1,
155        expires_at: 1 + SKIP_HANDSHAKE_IF_FRESH_TTL_SECS as i64,
156    })
157}