use crate::stream::run::{RunSelector, load_run_by_form, load_run_by_id};
use async_trait::async_trait;
use sift_connect::SiftChannel;
use sift_error::prelude::*;
use sift_rs::runs::v2::Run;
use uuid::Uuid;
use crate::metrics::SiftStreamMetricsSnapshot;
pub mod builder;
pub mod channel;
mod helpers;
pub mod mode;
pub mod retry;
pub use retry::RetryPolicy;
pub mod run;
pub mod time;
pub(crate) mod flow;
pub mod tasks;
pub mod send_error;
pub use send_error::{SendError, SiftStreamSendError, SiftStreamTrySendError, TrySendError};
#[cfg(test)]
mod test;
/// Implemented by types that can report their metrics as a
/// [`SiftStreamMetricsSnapshot`].
///
/// The trait has `private::Sealed` as a supertrait, and `private` is not a
/// public module, so only code that can name `private::Sealed` is able to
/// implement it.
pub trait MetricsSnapshot: private::Sealed {
    /// Returns a [`SiftStreamMetricsSnapshot`] for this value.
    fn snapshot(&self) -> SiftStreamMetricsSnapshot;
}
/// A value that can be converted into a message with the help of an encoder.
pub trait Encodeable {
    /// The message type produced by [`Encodeable::encode`].
    type Output: Send + Sync;

    /// The encoder used for the conversion; its `Message` type must be
    /// [`Encodeable::Output`].
    type Encoder: Encoder<Message = Self::Output>;

    /// Consumes `self` and attempts to produce an encoded message.
    ///
    /// * `encoder` - encoder the value may use (and mutate) while encoding.
    /// * `stream_id` - identifier of the stream the message is encoded for.
    /// * `run` - the run to associate with the message, if any.
    ///
    /// Returns `None` when no message could be produced; `SiftStream::send`
    /// and `SiftStream::try_send` report that case as an encode error.
    fn encode(
        self,
        encoder: &mut Self::Encoder,
        stream_id: &Uuid,
        run: Option<&Run>,
    ) -> Option<Self::Output>;
}
/// Sealed trait naming the message type an encoder produces.
///
/// It declares no methods of its own; it ties an encoder to its
/// [`Encoder::Message`] type so that [`Encodeable`] and [`Transport`] can
/// agree on what is being sent.
pub trait Encoder: private::Sealed {
    /// The encoded message type.
    type Message: Send + Sync;
}
/// Sealed trait for the component that delivers encoded messages.
///
/// Every method receives the identifier of the stream it is acting for.
/// The send errors are parameterised by the message type (or a `Vec` of it
/// for the batch variants).
#[async_trait]
pub trait Transport: private::Sealed {
    /// The message type this transport accepts.
    type Message: Send + Sync;

    /// The encoder paired with this transport; it must produce
    /// [`Transport::Message`].
    type Encoder: Encoder<Message = Self::Message>;

    /// Asynchronously sends a single message.
    async fn send(
        &mut self,
        stream_id: &Uuid,
        message: Self::Message,
    ) -> std::result::Result<(), SendError<Self::Message>>;

    /// Asynchronously sends every message yielded by `requests`.
    async fn send_requests<I>(
        &mut self,
        stream_id: &Uuid,
        requests: I,
    ) -> std::result::Result<(), SendError<Vec<Self::Message>>>
    where
        I: IntoIterator<Item = Self::Message> + Send,
        I::IntoIter: Send;

    /// Synchronous counterpart of [`Transport::send`].
    ///
    /// NOTE(review): presumably this fails instead of waiting when the
    /// message cannot be accepted right away; confirm against implementors.
    fn try_send(
        &mut self,
        stream_id: &Uuid,
        message: Self::Message,
    ) -> std::result::Result<(), TrySendError<Self::Message>>;

    /// Synchronous counterpart of [`Transport::send_requests`].
    fn try_send_requests<I>(
        &mut self,
        stream_id: &Uuid,
        requests: I,
    ) -> std::result::Result<(), TrySendError<Vec<Self::Message>>>
    where
        I: IntoIterator<Item = Self::Message> + Send,
        I::IntoIter: Send;

    /// Consumes the transport and completes the stream.
    async fn finish(self, stream_id: &Uuid) -> Result<()>;
}
/// A stream that encodes messages with an encoder `E` and delivers them
/// through a transport `T`.
pub struct SiftStream<E, T> {
    /// gRPC channel; cloned whenever a run has to be loaded.
    grpc_channel: SiftChannel,
    /// Encoder handed to [`Encodeable::encode`]; also reachable through the
    /// stream's `Deref`/`DerefMut` implementations.
    encoder: E,
    /// Transport that receives the encoded messages.
    transport: T,
    /// The run currently attached to the stream, if any.
    run: Option<Run>,
    /// Identifier passed to the encoder and the transport on every call.
    sift_stream_id: Uuid,
}
impl<E, T> SiftStream<E, T>
where
    E: Encoder + MetricsSnapshot,
    T: Transport<Encoder = E>,
{
    /// Returns the metrics snapshot reported by the stream's encoder.
    ///
    /// Only compiled when the `metrics-unstable` feature is enabled.
    #[cfg(feature = "metrics-unstable")]
    pub fn get_metrics_snapshot(&self) -> SiftStreamMetricsSnapshot {
        <E as MetricsSnapshot>::snapshot(&self.encoder)
    }

    /// Loads the run described by `run_selector` and attaches it to the
    /// stream, replacing any run that was attached before.
    ///
    /// # Errors
    ///
    /// Propagates the error from loading the run. In that case the
    /// previously attached run (if any) is left untouched.
    pub async fn attach_run(&mut self, run_selector: RunSelector) -> Result<()> {
        // Both selector variants need their own handle to the channel.
        let channel = self.grpc_channel.clone();

        self.run = Some(match run_selector {
            RunSelector::ById(run_id) => load_run_by_id(channel, &run_id).await?,
            RunSelector::ByForm(run_form) => load_run_by_form(channel, run_form).await?,
        });

        Ok(())
    }

    /// Detaches the current run, if any; later messages are encoded without
    /// a run.
    pub fn detach_run(&mut self) {
        self.run = None;
    }

    /// Returns the run currently attached to the stream, if any.
    pub fn run(&self) -> Option<&Run> {
        self.run.as_ref()
    }

    /// Encodes `message` and asynchronously sends it through the transport.
    ///
    /// # Errors
    ///
    /// * An encode error when [`Encodeable::encode`] returns `None`.
    /// * `SiftStreamSendError::ChannelClosed`, carrying the value held by the
    ///   transport's `SendError`, when the transport fails to send.
    pub async fn send<M>(
        &mut self,
        message: M,
    ) -> std::result::Result<(), SiftStreamSendError<<T as Transport>::Message>>
    where
        M: Encodeable<Encoder = E, Output = <T as Transport>::Message> + Send + Sync,
    {
        let stream_id = &self.sift_stream_id;

        let payload = message
            .encode(&mut self.encoder, stream_id, self.run.as_ref())
            .ok_or_else(|| SiftStreamSendError::encode_error("Failed to encode message"))?;

        let outcome = self.transport.send(stream_id, payload).await;
        match outcome {
            Ok(()) => Ok(()),
            Err(SendError(unsent)) => Err(SiftStreamSendError::ChannelClosed(unsent)),
        }
    }

    /// Asynchronously forwards already-encoded messages to the transport.
    ///
    /// The encoder is not involved; the transport's result is returned as is.
    pub async fn send_requests<I>(
        &mut self,
        requests: I,
    ) -> std::result::Result<(), SendError<Vec<<T as Transport>::Message>>>
    where
        I: IntoIterator<Item = <T as Transport>::Message> + Send,
        I::IntoIter: Send,
    {
        let stream_id = &self.sift_stream_id;
        self.transport.send_requests(stream_id, requests).await
    }

    /// Encodes `message` and hands it to the transport's synchronous
    /// `try_send`.
    ///
    /// # Errors
    ///
    /// * An encode error when [`Encodeable::encode`] returns `None`.
    /// * `SiftStreamTrySendError::Channel`, wrapping the transport's error,
    ///   when the transport rejects the message.
    pub fn try_send<M>(
        &mut self,
        message: M,
    ) -> std::result::Result<(), SiftStreamTrySendError<<T as Transport>::Message>>
    where
        M: Encodeable<Encoder = E, Output = <T as Transport>::Message> + Send + Sync,
    {
        let stream_id = &self.sift_stream_id;

        let payload = message
            .encode(&mut self.encoder, stream_id, self.run.as_ref())
            .ok_or_else(|| SiftStreamTrySendError::encode_error("Failed to encode message"))?;

        match self.transport.try_send(stream_id, payload) {
            Ok(()) => Ok(()),
            Err(channel_err) => Err(SiftStreamTrySendError::Channel(channel_err)),
        }
    }

    /// Forwards already-encoded messages to the transport's synchronous
    /// `try_send_requests`.
    ///
    /// The encoder is not involved; the transport's result is returned as is.
    pub fn try_send_requests<I>(
        &mut self,
        requests: I,
    ) -> std::result::Result<(), TrySendError<Vec<<T as Transport>::Message>>>
    where
        I: IntoIterator<Item = <T as Transport>::Message> + Send,
        I::IntoIter: Send,
    {
        let stream_id = &self.sift_stream_id;
        self.transport.try_send_requests(stream_id, requests)
    }

    /// Consumes the stream and finishes the underlying transport.
    ///
    /// # Errors
    ///
    /// Returns whatever error the transport's `finish` reports.
    pub async fn finish(self) -> Result<()> {
        let Self {
            transport,
            sift_stream_id,
            ..
        } = self;
        transport.finish(&sift_stream_id).await
    }
}
/// Gives shared access to the encoder directly through the stream, so the
/// encoder's `&self` methods can be called on a `SiftStream`.
impl<E, T> std::ops::Deref for SiftStream<E, T>
where
    E: Encoder + MetricsSnapshot,
    T: Transport<Encoder = E>,
{
    type Target = E;

    fn deref(&self) -> &E {
        let Self { encoder, .. } = self;
        encoder
    }
}
/// Gives mutable access to the encoder directly through the stream, so the
/// encoder's `&mut self` methods can be called on a `SiftStream`.
impl<E, T> std::ops::DerefMut for SiftStream<E, T>
where
    E: Encoder + MetricsSnapshot,
    T: Transport<Encoder = E>,
{
    fn deref_mut(&mut self) -> &mut E {
        let Self { encoder, .. } = self;
        encoder
    }
}
/// Holds the `Sealed` marker used as a supertrait by this module's traits.
///
/// The module itself is not `pub`, so although `Sealed` is declared `pub`,
/// code outside this module tree cannot name it and therefore cannot
/// implement any trait that requires it.
mod private {
    /// Marker supertrait; it has no items.
    pub trait Sealed {}
}