use std::pin::Pin;
use futures::Stream;
use serde_json::Value;
use super::schema::MethodSchema;
use super::streaming::PlexusStream;
use super::types::{PlexusStreamItem, StreamMetadata};
use super::context::PlexusContext;
use super::plexus::PlexusError;
#[derive(Debug, Clone)]
pub enum StreamItem {
Data(Value),
Error(PlexusError),
}
pub trait HubDispatch: Send + Sync {
fn dispatch(
&self,
method: &str,
params: Value,
) -> Pin<Box<dyn Stream<Item = StreamItem> + Send + 'static>>;
fn method_names() -> &'static [&'static str]
where
Self: Sized;
fn method_schemas() -> Vec<MethodSchema>
where
Self: Sized;
fn method_help(&self, method: &str) -> Option<String>;
}
pub struct PlexusStreamBuilder;
impl PlexusStreamBuilder {
pub fn from_dispatch(
stream: Pin<Box<dyn Stream<Item = StreamItem> + Send + 'static>>,
content_type: String,
provenance: Vec<String>,
) -> PlexusStream {
use futures::StreamExt;
let plexus_hash = PlexusContext::hash();
let metadata = StreamMetadata::new(provenance, plexus_hash);
Box::pin(stream.map(move |item| match item {
StreamItem::Data(value) => PlexusStreamItem::Data {
metadata: metadata.clone(),
content_type: content_type.clone(),
content: value,
},
StreamItem::Error(err) => PlexusStreamItem::Error {
metadata: metadata.clone(),
message: err.to_string(),
code: None,
recoverable: false,
},
}))
}
}
pub fn from_dispatch_stream(
stream: Pin<Box<dyn Stream<Item = StreamItem> + Send + 'static>>,
content_type: String,
provenance: Vec<String>,
) -> PlexusStream {
PlexusStreamBuilder::from_dispatch(stream, content_type, provenance)
}