plexus_core/plexus/
dispatch.rs1use std::pin::Pin;
7use futures::Stream;
8use serde_json::Value;
9
10use super::schema::MethodSchema;
11use super::streaming::PlexusStream;
12use super::types::{PlexusStreamItem, StreamMetadata};
13use super::context::PlexusContext;
14use super::plexus::PlexusError;
15
16#[derive(Debug, Clone)]
18pub enum StreamItem {
19 Data(Value),
21 Error(PlexusError),
23}
24
25pub trait HubDispatch: Send + Sync {
29 fn dispatch(
31 &self,
32 method: &str,
33 params: Value,
34 ) -> Pin<Box<dyn Stream<Item = StreamItem> + Send + 'static>>;
35
36 fn method_names() -> &'static [&'static str]
38 where
39 Self: Sized;
40
41 fn method_schemas() -> Vec<MethodSchema>
43 where
44 Self: Sized;
45
46 fn method_help(&self, method: &str) -> Option<String>;
48}
49
50pub struct PlexusStreamBuilder;
52
53impl PlexusStreamBuilder {
54 pub fn from_dispatch(
56 stream: Pin<Box<dyn Stream<Item = StreamItem> + Send + 'static>>,
57 content_type: String,
58 provenance: Vec<String>,
59 ) -> PlexusStream {
60 use futures::StreamExt;
61
62 let plexus_hash = PlexusContext::hash();
63 let metadata = StreamMetadata::new(provenance, plexus_hash);
64
65 Box::pin(stream.map(move |item| match item {
66 StreamItem::Data(value) => PlexusStreamItem::Data {
67 metadata: metadata.clone(),
68 content_type: content_type.clone(),
69 content: value,
70 },
71 StreamItem::Error(err) => PlexusStreamItem::Error {
72 metadata: metadata.clone(),
73 message: err.to_string(),
74 code: None,
75 recoverable: false,
76 },
77 }))
78 }
79}
80
81pub fn from_dispatch_stream(
83 stream: Pin<Box<dyn Stream<Item = StreamItem> + Send + 'static>>,
84 content_type: String,
85 provenance: Vec<String>,
86) -> PlexusStream {
87 PlexusStreamBuilder::from_dispatch(stream, content_type, provenance)
88}