use {
crate::{
discovery::Discovery,
network::{
self,
LocalNode,
ProtocolProvider,
link::{self, Protocol},
},
primitives::{Datum, Digest},
},
accept::Acceptor,
iroh::protocol::RouterBuilder,
producer::Sinks,
std::sync::Arc,
};
mod accept;
mod config;
mod criteria;
pub mod consumer;
pub mod producer;
pub mod status;
pub use {
config::{Config, ConfigBuilder, ConfigBuilderError, backoff},
consumer::Consumer,
criteria::Criteria,
producer::Producer,
};
pub trait StreamProducer {
type Producer;
fn producer(network: &crate::Network) -> Self::Producer;
fn online_producer(
network: &crate::Network,
) -> impl Future<Output = Self::Producer> + Send + Sync + 'static;
}
pub trait StreamConsumer {
type Consumer;
fn consumer(network: &crate::Network) -> Self::Consumer;
fn online_consumer(
network: &crate::Network,
) -> impl Future<Output = Self::Consumer> + Send + Sync + 'static;
}
pub type ProducerOf<S> = <S as StreamProducer>::Producer;
pub type ConsumerOf<S> = <S as StreamConsumer>::Consumer;
pub struct ProducerDef<T: Datum> {
pub stream_id: Option<StreamId>,
_marker: core::marker::PhantomData<fn(&T)>,
}
impl<T: Datum> ProducerDef<T> {
pub const fn new(stream_id: Option<StreamId>) -> Self {
Self {
stream_id,
_marker: core::marker::PhantomData,
}
}
#[inline]
pub fn open<'s>(
&self,
network: &'s crate::Network,
) -> producer::Builder<'s, T> {
let mut builder = network.streams().producer::<T>();
if let Some(id) = self.stream_id {
builder = builder.with_stream_id(id);
}
builder
}
}
pub struct ConsumerDef<T: Datum> {
pub stream_id: Option<StreamId>,
_marker: core::marker::PhantomData<fn(&T)>,
}
impl<T: Datum> ConsumerDef<T> {
pub const fn new(stream_id: Option<StreamId>) -> Self {
Self {
stream_id,
_marker: core::marker::PhantomData,
}
}
#[inline]
pub fn open<'s>(
&self,
network: &'s crate::Network,
) -> consumer::Builder<'s, T> {
let mut builder = network.streams().consumer::<T>();
if let Some(id) = self.stream_id {
builder = builder.with_stream_id(id);
}
builder
}
}
#[macro_export]
macro_rules! stream {
(#[$($meta:tt)*] $($rest:tt)*) => {
$crate::stream! { @attrs [#[$($meta)*]] $($rest)* }
};
(@attrs [$($attrs:tt)*] #[$($meta:tt)*] $($rest:tt)*) => {
$crate::stream! { @attrs [$($attrs)* #[$($meta)*]] $($rest)* }
};
(@attrs [$($attrs:tt)*] $($rest:tt)*) => {
$crate::__stream_impl! { @$crate; $($attrs)* $($rest)* }
};
($($tt:tt)*) => {
$crate::__stream_impl! { @$crate; $($tt)* }
};
}
pub type StreamId = Digest;
pub struct Streams {
config: Arc<Config>,
local: LocalNode,
discovery: Discovery,
sinks: Arc<Sinks>,
}
impl Streams {
pub fn produce<D: Datum>(&self) -> Producer<D> {
match producer::Builder::new(self).build() {
Ok(producer) => producer,
Err(producer::BuilderError::AlreadyExists(existing)) => existing,
}
}
pub fn producer<D: Datum>(&self) -> producer::Builder<'_, D> {
producer::Builder::new(self)
}
#[allow(clippy::needless_pass_by_value)]
pub fn producer_of<D: Datum>(
&self,
def: StreamDef<D>,
) -> producer::Builder<'_, D> {
let mut builder = self.producer::<D>();
if let Some(stream_id) = def.stream_id {
builder = builder.with_stream_id(stream_id);
}
builder
}
pub fn consume<D: Datum>(&self) -> Consumer<D> {
self.consumer().build()
}
pub fn consumer<D: Datum>(&self) -> consumer::Builder<'_, D> {
consumer::Builder::new(self)
}
#[allow(clippy::needless_pass_by_value)]
pub fn consumer_of<D: Datum>(
&self,
def: StreamDef<D>,
) -> consumer::Builder<'_, D> {
let mut builder = self.consumer::<D>();
if let Some(stream_id) = def.stream_id {
builder = builder.with_stream_id(stream_id);
}
builder
}
}
impl Streams {
pub(crate) fn new(
local: LocalNode,
discovery: &Discovery,
config: Config,
) -> Self {
Self {
local: local.clone(),
config: Arc::new(config),
discovery: discovery.clone(),
sinks: Arc::new(Sinks::new(local, discovery.clone())),
}
}
}
impl ProtocolProvider for Streams {
fn install(&self, protocols: RouterBuilder) -> RouterBuilder {
protocols.accept(Self::ALPN, Acceptor::new(self))
}
}
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct StreamDef<T: Datum> {
pub stream_id: Option<StreamId>,
_marker: core::marker::PhantomData<fn(&T)>,
}
impl<T: Datum> Clone for StreamDef<T> {
fn clone(&self) -> Self {
*self
}
}
impl<T: Datum> Copy for StreamDef<T> {}
impl<T: Datum> Default for StreamDef<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: Datum> StreamDef<T> {
pub const fn new() -> Self {
Self {
stream_id: None,
_marker: core::marker::PhantomData,
}
}
#[must_use]
pub const fn with_stream_id(stream_id: StreamId) -> Self {
Self {
stream_id: Some(stream_id),
_marker: core::marker::PhantomData,
}
}
}
impl link::Protocol for Streams {
const ALPN: &'static [u8] = b"/mosaik/streams/1.0";
}
network::error::make_close_reason!(
struct StreamNotFound, 10_404);
network::error::make_close_reason!(
struct NotAllowed, 10_403);
network::error::make_close_reason!(
struct NoCapacity, 10_509);
network::error::make_close_reason!(
struct TooSlow, 10_413);