use crate::stream::mode::ingestion_config::LiveStreaming;
use crate::stream::run::{RunSelector, load_run_by_form, load_run_by_id};
use async_trait::async_trait;
use sift_connect::SiftChannel;
use sift_error::prelude::*;
use sift_rs::runs::v2::Run;
use uuid::Uuid;
use crate::metrics::SiftStreamMetricsSnapshot;
pub mod builder;
pub mod channel;
mod helpers;
pub mod mode;
pub mod retry;
pub use retry::RetryPolicy;
pub mod run;
pub mod time;
pub(crate) mod flow;
pub mod tasks;
#[cfg(test)]
mod test;
pub trait MetricsSnapshot: private::Sealed {
fn snapshot(&self) -> SiftStreamMetricsSnapshot;
}
pub trait Encodeable {
type Output: Send + Sync;
type Encoder: Encoder<Message = Self::Output>;
fn encode(
self,
encoder: &mut Self::Encoder,
stream_id: &Uuid,
run: Option<&Run>,
) -> Option<Self::Output>;
}
pub trait Encoder: private::Sealed {
type Message: Send + Sync;
}
#[async_trait]
pub trait Transport: private::Sealed {
type Message: Send + Sync;
type Encoder: Encoder<Message = Self::Message>;
fn send(&mut self, stream_id: &Uuid, message: Self::Message) -> Result<()>;
fn send_requests<I>(&mut self, stream_id: &Uuid, requests: I) -> Result<()>
where
I: IntoIterator<Item = Self::Message> + Send,
I::IntoIter: Send;
async fn finish(self, stream_id: &Uuid) -> Result<()>;
}
pub struct SiftStream<E, T = LiveStreaming> {
grpc_channel: SiftChannel,
encoder: E,
transport: T,
run: Option<Run>,
sift_stream_id: Uuid,
}
impl<E, T> SiftStream<E, T>
where
E: Encoder + MetricsSnapshot,
T: Transport<Encoder = E>,
{
#[cfg(feature = "metrics-unstable")]
pub fn get_metrics_snapshot(&self) -> SiftStreamMetricsSnapshot {
self.encoder.snapshot()
}
pub async fn attach_run(&mut self, run_selector: RunSelector) -> Result<()> {
let run = match run_selector {
RunSelector::ById(run_id) => load_run_by_id(self.grpc_channel.clone(), &run_id).await?,
RunSelector::ByForm(run_form) => {
load_run_by_form(self.grpc_channel.clone(), run_form).await?
}
};
self.run = Some(run);
Ok(())
}
pub fn detach_run(&mut self) {
self.run = None;
}
pub fn run(&self) -> Option<&Run> {
self.run.as_ref()
}
pub async fn send<M>(&mut self, message: M) -> Result<()>
where
M: Encodeable<Encoder = E, Output = <T as Transport>::Message> + Send + Sync,
{
let encoded = message
.encode(&mut self.encoder, &self.sift_stream_id, self.run.as_ref())
.ok_or(Error::new_msg(
ErrorKind::EncodeMessageError,
"Failed to encode message",
))?;
self.transport.send(&self.sift_stream_id, encoded)
}
pub async fn send_requests<I>(&mut self, requests: I) -> Result<()>
where
I: IntoIterator<Item = <T as Transport>::Message> + Send,
I::IntoIter: Send,
{
self.transport.send_requests(&self.sift_stream_id, requests)
}
pub fn send_requests_nonblocking<I>(&mut self, requests: I) -> Result<()>
where
I: IntoIterator<Item = <T as Transport>::Message> + Send,
I::IntoIter: Send,
{
self.transport.send_requests(&self.sift_stream_id, requests)
}
pub async fn finish(self) -> Result<()> {
self.transport.finish(&self.sift_stream_id).await
}
}
impl<E, T> std::ops::Deref for SiftStream<E, T>
where
E: Encoder + MetricsSnapshot,
T: Transport<Encoder = E>,
{
type Target = E;
fn deref(&self) -> &Self::Target {
&self.encoder
}
}
impl<E, T> std::ops::DerefMut for SiftStream<E, T>
where
E: Encoder + MetricsSnapshot,
T: Transport<Encoder = E>,
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.encoder
}
}
mod private {
pub trait Sealed {}
}