Skip to main content

plexus_core/plexus/
dispatch.rs

//! `HubDispatch` trait for method delegation.
//!
//! Implementations of this trait are generated by the `#[hub_methods]`
//! attribute macro; the trait provides a uniform dispatch interface for
//! routing method calls.
5
6use std::pin::Pin;
7use futures::Stream;
8use serde_json::Value;
9
10use super::schema::MethodSchema;
11use super::streaming::PlexusStream;
12use super::types::{PlexusStreamItem, StreamMetadata};
13use super::context::PlexusContext;
14use super::plexus::PlexusError;
15
16/// Simple stream item for dispatch results
17#[derive(Debug, Clone)]
18pub enum StreamItem {
19    /// Successful data
20    Data(Value),
21    /// Error occurred
22    Error(PlexusError),
23}
24
25/// Trait for types that can dispatch method calls
26///
27/// This is generated by `#[hub_methods]` on impl blocks.
28pub trait HubDispatch: Send + Sync {
29    /// Dispatch a method call and return a stream of results
30    fn dispatch(
31        &self,
32        method: &str,
33        params: Value,
34    ) -> Pin<Box<dyn Stream<Item = StreamItem> + Send + 'static>>;
35
36    /// Get the list of method names
37    fn method_names() -> &'static [&'static str]
38    where
39        Self: Sized;
40
41    /// Get method schemas for all methods
42    fn method_schemas() -> Vec<MethodSchema>
43    where
44        Self: Sized;
45
46    /// Get help for a specific method
47    fn method_help(&self, method: &str) -> Option<String>;
48}
49
50/// Wrapper to create PlexusStream from dispatch results
51pub struct PlexusStreamBuilder;
52
53impl PlexusStreamBuilder {
54    /// Create a PlexusStream from a dispatch stream
55    pub fn from_dispatch(
56        stream: Pin<Box<dyn Stream<Item = StreamItem> + Send + 'static>>,
57        content_type: String,
58        provenance: Vec<String>,
59    ) -> PlexusStream {
60        use futures::StreamExt;
61
62        let plexus_hash = PlexusContext::hash();
63        let metadata = StreamMetadata::new(provenance, plexus_hash);
64
65        Box::pin(stream.map(move |item| match item {
66            StreamItem::Data(value) => PlexusStreamItem::Data {
67                metadata: metadata.clone(),
68                content_type: content_type.clone(),
69                content: value,
70            },
71            StreamItem::Error(err) => PlexusStreamItem::Error {
72                metadata: metadata.clone(),
73                message: err.to_string(),
74                code: None,
75                recoverable: false,
76            },
77        }))
78    }
79}
80
81/// Create a PlexusStream from a dispatch stream (convenience function)
82pub fn from_dispatch_stream(
83    stream: Pin<Box<dyn Stream<Item = StreamItem> + Send + 'static>>,
84    content_type: String,
85    provenance: Vec<String>,
86) -> PlexusStream {
87    PlexusStreamBuilder::from_dispatch(stream, content_type, provenance)
88}