plexus-core 0.5.3

Core infrastructure for Plexus RPC: Activation trait, DynamicHub, schemas
Documentation
//! HubDispatch trait for method delegation
//!
//! This trait is generated by the `#[hub_methods]` attribute macro
//! and provides a uniform dispatch interface for routing method calls.

use std::pin::Pin;
use futures::Stream;
use serde_json::Value;

use super::schema::MethodSchema;
use super::streaming::PlexusStream;
use super::types::{PlexusStreamItem, StreamMetadata};
use super::context::PlexusContext;
use super::plexus::PlexusError;

/// Simple stream item for dispatch results
#[derive(Debug, Clone)]
pub enum StreamItem {
    /// Successful data
    Data(Value),
    /// Error occurred
    Error(PlexusError),
}

/// Trait for types that can dispatch method calls
///
/// This is generated by `#[hub_methods]` on impl blocks.
pub trait HubDispatch: Send + Sync {
    /// Dispatch a method call and return a stream of results
    fn dispatch(
        &self,
        method: &str,
        params: Value,
    ) -> Pin<Box<dyn Stream<Item = StreamItem> + Send + 'static>>;

    /// Get the list of method names
    fn method_names() -> &'static [&'static str]
    where
        Self: Sized;

    /// Get method schemas for all methods
    fn method_schemas() -> Vec<MethodSchema>
    where
        Self: Sized;

    /// Get help for a specific method
    fn method_help(&self, method: &str) -> Option<String>;
}

/// Wrapper to create PlexusStream from dispatch results
pub struct PlexusStreamBuilder;

impl PlexusStreamBuilder {
    /// Create a PlexusStream from a dispatch stream
    pub fn from_dispatch(
        stream: Pin<Box<dyn Stream<Item = StreamItem> + Send + 'static>>,
        content_type: String,
        provenance: Vec<String>,
    ) -> PlexusStream {
        use futures::StreamExt;

        let plexus_hash = PlexusContext::hash();
        let metadata = StreamMetadata::new(provenance, plexus_hash);

        Box::pin(stream.map(move |item| match item {
            StreamItem::Data(value) => PlexusStreamItem::Data {
                metadata: metadata.clone(),
                content_type: content_type.clone(),
                content: value,
            },
            StreamItem::Error(err) => PlexusStreamItem::Error {
                metadata: metadata.clone(),
                message: err.to_string(),
                code: None,
                recoverable: false,
            },
        }))
    }
}

/// Create a PlexusStream from a dispatch stream (convenience function)
pub fn from_dispatch_stream(
    stream: Pin<Box<dyn Stream<Item = StreamItem> + Send + 'static>>,
    content_type: String,
    provenance: Vec<String>,
) -> PlexusStream {
    PlexusStreamBuilder::from_dispatch(stream, content_type, provenance)
}