use super::{
RetryPolicy, SiftStream, helpers,
mode::{
file_backup::FileBackup, live_only::LiveStreamingOnly,
live_with_backups::LiveStreamingWithBackups,
},
run::{load_run_by_form, load_run_by_id},
};
use std::collections::HashMap;
mod config_loader;
use crate::{
FlowDescriptor,
backup::{disk::DiskBackupPolicy, sanitize_name},
logging::{LogEvent, LogLevel},
metrics::SiftStreamMetrics,
stream::{
mode::ingestion_config::IngestionConfigEncoder,
tasks::{
CONTROL_CHANNEL_CAPACITY, DATA_CHANNEL_CAPACITY, LiveOnlyTaskConfig,
LiveWithBackupsTaskConfig, RecoveryConfig,
},
},
};
use config_loader::load_ingestion_config;
use sift_connect::{Credentials, SiftChannel, SiftChannelBuilder};
use sift_error::prelude::*;
use sift_rs::{
ingestion_configs::v2::{FlowConfig, IngestionConfig},
metadata::v1::MetadataValue,
ping::v1::{PingRequest, ping_service_client::PingServiceClient},
runs::v2::Run,
};
use std::{sync::Arc, time::Duration};
use uuid::Uuid;
/// Capacity of the bounded channel that carries `LogEvent`s out of the scoped
/// tracing layer; its receiving end is handed to the stream task configs.
///
/// Only read when the `tracing` feature is enabled, so the lint is silenced in
/// builds without it instead of emitting a `dead_code` warning.
#[cfg_attr(not(feature = "tracing"), allow(dead_code))]
const LOG_CHANNEL_CAPACITY: usize = 512;

/// Default interval between backup checkpoints for live-with-backups streams
/// (60 seconds). Override with `LiveWithBackupsBuilder::checkpoint_interval`.
pub const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(60);

/// Default metrics streaming interval (500 ms) used by every stream mode unless
/// overridden with the builder's `metrics_streaming_interval` setter.
pub const DEFAULT_METRICS_STREAMING_INTERVAL: Duration = Duration::from_millis(500);
/// Description of the ingestion config a stream should use.
///
/// Supplied through `SiftStreamBuilder::ingestion_config` and handed to the
/// ingestion-config loader during stream setup.
#[derive(Clone, Debug)]
pub struct IngestionConfigForm {
    /// Name of the asset the data is ingested for.
    pub asset_name: String,
    /// Client key identifying the ingestion config.
    pub client_key: String,
    /// Flow definitions that belong to the ingestion config.
    pub flows: Vec<FlowConfig>,
}
/// Description of a run to attach to a stream via `StreamConfigBuilder::attach_run`.
///
/// All optional fields default to `None`, and the strings to empty.
#[derive(Clone, Default, Debug)]
pub struct RunForm {
    /// Name of the run.
    pub name: String,
    /// Client key identifying the run.
    pub client_key: String,
    /// Optional run description.
    pub description: Option<String>,
    /// Optional tags to apply to the run.
    pub tags: Option<Vec<String>>,
    /// Optional metadata to apply to the run.
    pub metadata: Option<Vec<MetadataValue>>,
}
/// Entry point for configuring a [`SiftStream`].
///
/// Start from [`SiftStreamBuilder::new`] (credentials) or
/// [`SiftStreamBuilder::from_channel`] (an existing gRPC channel), then call
/// [`SiftStreamBuilder::ingestion_config`] to continue configuring the stream.
#[must_use = "a SiftStreamBuilder does nothing until `ingestion_config(...)` is called and a stream is built"]
pub struct SiftStreamBuilder {
    credentials: Option<Credentials>,
    channel: Option<SiftChannel>,
    enable_tls: bool,
}

impl SiftStreamBuilder {
    /// Creates a builder that will build its own gRPC channel from `credentials`.
    ///
    /// TLS is enabled by default; see [`SiftStreamBuilder::disable_tls`].
    pub fn new(credentials: Credentials) -> Self {
        SiftStreamBuilder {
            credentials: Some(credentials),
            channel: None,
            enable_tls: true,
        }
    }

    /// Creates a builder that reuses an already-configured gRPC `channel`.
    pub fn from_channel(channel: SiftChannel) -> Self {
        SiftStreamBuilder {
            credentials: None,
            channel: Some(channel),
            enable_tls: true,
        }
    }

    /// Disables TLS for channels this builder constructs from credentials.
    ///
    /// The flag is only consulted when a channel is built from credentials. It
    /// has no effect on a channel supplied via [`SiftStreamBuilder::from_channel`].
    pub fn disable_tls(mut self) -> Self {
        self.enable_tls = false;
        self
    }

    /// Sets the ingestion config to use and moves on to stream-level configuration.
    ///
    /// All other stream options start unset: no asset tags or metadata, no run,
    /// and the default log level.
    pub fn ingestion_config(self, form: IngestionConfigForm) -> StreamConfigBuilder {
        StreamConfigBuilder {
            credentials: self.credentials,
            channel: self.channel,
            enable_tls: self.enable_tls,
            ingestion_config: form,
            asset_tags: None,
            asset_metadata: None,
            run: None,
            run_id: None,
            log_level_filter: LogLevel::default(),
            #[cfg(feature = "tracing")]
            scoped_dispatch_base: None,
            disable_scoped_dispatch: false,
        }
    }
}
/// Stream-level configuration shared by every stream mode.
///
/// Obtained from `SiftStreamBuilder::ingestion_config`. Finish by choosing a
/// mode: [`StreamConfigBuilder::live_only`],
/// [`StreamConfigBuilder::live_with_backups`] or [`StreamConfigBuilder::file_backup`].
#[must_use = "a StreamConfigBuilder does nothing until a stream mode is chosen and built"]
pub struct StreamConfigBuilder {
    /// Credentials for building a channel; `None` when a channel was supplied.
    pub(crate) credentials: Option<Credentials>,
    /// Caller-supplied channel; takes precedence over `credentials`.
    pub(crate) channel: Option<SiftChannel>,
    /// Whether channels built from `credentials` enable TLS.
    pub(crate) enable_tls: bool,
    pub(crate) ingestion_config: IngestionConfigForm,
    pub(crate) asset_tags: Option<Vec<String>>,
    pub(crate) asset_metadata: Option<Vec<MetadataValue>>,
    pub(crate) run: Option<RunForm>,
    /// ID of an existing run; preferred over `run` when both are set.
    pub(crate) run_id: Option<String>,
    pub(crate) log_level_filter: LogLevel,
    #[cfg(feature = "tracing")]
    pub(crate) scoped_dispatch_base: Option<tracing::Dispatch>,
    pub(crate) disable_scoped_dispatch: bool,
}

impl StreamConfigBuilder {
    /// Attaches a run described by `run`.
    ///
    /// Ignored if a run ID is also set via [`StreamConfigBuilder::attach_run_id`].
    pub fn attach_run(mut self, run: RunForm) -> Self {
        self.run = Some(run);
        self
    }

    /// Attaches an existing run by its ID.
    ///
    /// Takes precedence over [`StreamConfigBuilder::attach_run`] when both are set.
    pub fn attach_run_id(mut self, run_id: &str) -> Self {
        self.run_id = Some(run_id.into());
        self
    }

    /// Sets the tags to apply to the asset during setup.
    ///
    /// Replaces any tags passed by a previous call; it does not accumulate.
    pub fn add_asset_tags(mut self, tags: Vec<String>) -> Self {
        self.asset_tags = Some(tags);
        self
    }

    /// Sets the metadata to apply to the asset during setup.
    ///
    /// Replaces any metadata passed by a previous call; it does not accumulate.
    pub fn add_asset_metadata(mut self, metadata: Vec<MetadataValue>) -> Self {
        self.asset_metadata = Some(metadata);
        self
    }

    /// Sets the minimum level of log events the stream's telemetry layer forwards.
    pub fn log_level_filter(mut self, level: LogLevel) -> Self {
        self.log_level_filter = level;
        self
    }

    /// Sets the dispatcher that the stream's scoped dispatch forwards events to.
    ///
    /// When unset, the thread's default dispatcher at setup time is used.
    #[cfg(feature = "tracing")]
    pub fn with_scoped_dispatch_base(mut self, dispatch: tracing::Dispatch) -> Self {
        self.scoped_dispatch_base = Some(dispatch);
        self
    }

    /// Skips creating the scoped tracing dispatch and log channel during setup.
    pub(crate) fn disable_scoped_dispatch(mut self) -> Self {
        self.disable_scoped_dispatch = true;
        self
    }

    /// Selects live-only streaming (no disk backups).
    ///
    /// Defaults: compression off, `DEFAULT_METRICS_STREAMING_INTERVAL`,
    /// `DATA_CHANNEL_CAPACITY` / `CONTROL_CHANNEL_CAPACITY` and the default retry policy.
    pub fn live_only(self) -> LiveOnlyBuilder {
        LiveOnlyBuilder {
            base: self,
            enable_compression_for_ingestion: false,
            metrics_streaming_interval: Some(DEFAULT_METRICS_STREAMING_INTERVAL),
            ingestion_data_channel_capacity: DATA_CHANNEL_CAPACITY,
            control_channel_capacity: CONTROL_CHANNEL_CAPACITY,
            retry_policy: RetryPolicy::default(),
        }
    }

    /// Selects live streaming with disk backups.
    ///
    /// Defaults: `DEFAULT_CHECKPOINT_INTERVAL`, the default retry and disk backup
    /// policies, compression off, `DEFAULT_METRICS_STREAMING_INTERVAL`, and
    /// `DATA_CHANNEL_CAPACITY` / `CONTROL_CHANNEL_CAPACITY` for the channels.
    pub fn live_with_backups(self) -> LiveWithBackupsBuilder {
        LiveWithBackupsBuilder {
            base: self,
            checkpoint_interval: DEFAULT_CHECKPOINT_INTERVAL,
            retry_policy: RetryPolicy::default(),
            disk_backup_policy: DiskBackupPolicy::default(),
            enable_compression_for_ingestion: false,
            metrics_streaming_interval: Some(DEFAULT_METRICS_STREAMING_INTERVAL),
            ingestion_data_channel_capacity: DATA_CHANNEL_CAPACITY,
            backup_data_channel_capacity: DATA_CHANNEL_CAPACITY,
            control_channel_capacity: CONTROL_CHANNEL_CAPACITY,
        }
    }

    /// Selects file-backup mode.
    ///
    /// Defaults: the default disk backup policy, `DATA_CHANNEL_CAPACITY` /
    /// `CONTROL_CHANNEL_CAPACITY` and `DEFAULT_METRICS_STREAMING_INTERVAL`.
    /// `FileBackupBuilder::build` requires `backups_dir` to be set on the policy.
    pub fn file_backup(self) -> FileBackupBuilder {
        FileBackupBuilder {
            base: self,
            disk_backup_policy: DiskBackupPolicy::default(),
            backup_data_channel_capacity: DATA_CHANNEL_CAPACITY,
            control_channel_capacity: CONTROL_CHANNEL_CAPACITY,
            metrics_streaming_interval: Some(DEFAULT_METRICS_STREAMING_INTERVAL),
        }
    }
}
/// Builder for a stream that sends data live to Sift without disk backups.
///
/// Created by `StreamConfigBuilder::live_only`; finish with [`LiveOnlyBuilder::build`].
#[must_use = "a LiveOnlyBuilder does nothing until `build().await` is called"]
pub struct LiveOnlyBuilder {
    base: StreamConfigBuilder,
    enable_compression_for_ingestion: bool,
    metrics_streaming_interval: Option<Duration>,
    ingestion_data_channel_capacity: usize,
    control_channel_capacity: usize,
    retry_policy: RetryPolicy,
}

impl LiveOnlyBuilder {
    /// Enables or disables compression for ingestion (default: disabled).
    pub fn enable_compression_for_ingestion(mut self, enable: bool) -> Self {
        self.enable_compression_for_ingestion = enable;
        self
    }

    /// Sets the metrics streaming interval (default: `DEFAULT_METRICS_STREAMING_INTERVAL`).
    ///
    /// The value is forwarded as-is to the task config. `None` presumably
    /// disables metrics streaming; confirm against the task implementation.
    pub fn metrics_streaming_interval(mut self, interval: Option<Duration>) -> Self {
        self.metrics_streaming_interval = interval;
        self
    }

    /// Sets the ingestion data channel capacity (default: `DATA_CHANNEL_CAPACITY`).
    pub fn ingestion_data_channel_capacity(mut self, capacity: usize) -> Self {
        self.ingestion_data_channel_capacity = capacity;
        self
    }

    /// Sets the control channel capacity (default: `CONTROL_CHANNEL_CAPACITY`).
    pub fn control_channel_capacity(mut self, capacity: usize) -> Self {
        self.control_channel_capacity = capacity;
        self
    }

    /// Sets the retry policy (default: `RetryPolicy::default()`).
    pub fn retry_policy(mut self, policy: RetryPolicy) -> Self {
        self.retry_policy = policy;
        self
    }

    /// Runs the shared setup (connect, ping, load config/run, update asset) and
    /// creates the live-only stream via `SiftStream::new_live_only`.
    ///
    /// # Errors
    ///
    /// Propagates errors from the shared setup and from `SiftStream::new_live_only`.
    pub async fn build(self) -> Result<SiftStream<IngestionConfigEncoder, LiveStreamingOnly>> {
        let setup = setup_common(self.base).await?;
        let task_config = LiveOnlyTaskConfig {
            session_name: setup.session_name,
            sift_stream_id: setup.sift_stream_id,
            setup_channel: setup.setup_channel,
            ingestion_channel: setup.ingestion_channel,
            metrics: setup.metrics.clone(),
            enable_compression_for_ingestion: self.enable_compression_for_ingestion,
            ingestion_data_channel_capacity: self.ingestion_data_channel_capacity,
            control_channel_capacity: self.control_channel_capacity,
            metrics_streaming_interval: self.metrics_streaming_interval,
            retry_policy: self.retry_policy,
            #[cfg(feature = "tracing")]
            scoped_dispatch: setup.scoped_dispatch,
            log_rx: setup.log_rx,
        };
        SiftStream::new_live_only(
            setup.ingestion_config,
            setup.flows_by_name,
            setup.run,
            task_config,
            setup.metrics,
        )
        .await
    }
}
/// Builder for a stream that sends data live to Sift while also writing disk backups.
///
/// Created by `StreamConfigBuilder::live_with_backups`; finish with
/// [`LiveWithBackupsBuilder::build`].
#[must_use = "a LiveWithBackupsBuilder does nothing until `build().await` is called"]
pub struct LiveWithBackupsBuilder {
    base: StreamConfigBuilder,
    checkpoint_interval: Duration,
    retry_policy: RetryPolicy,
    disk_backup_policy: DiskBackupPolicy,
    enable_compression_for_ingestion: bool,
    metrics_streaming_interval: Option<Duration>,
    ingestion_data_channel_capacity: usize,
    backup_data_channel_capacity: usize,
    control_channel_capacity: usize,
}

impl LiveWithBackupsBuilder {
    /// Sets the checkpoint interval (default: `DEFAULT_CHECKPOINT_INTERVAL`).
    pub fn checkpoint_interval(mut self, duration: Duration) -> Self {
        self.checkpoint_interval = duration;
        self
    }

    /// Sets the retry policy (default: `RetryPolicy::default()`).
    pub fn retry_policy(mut self, policy: RetryPolicy) -> Self {
        self.retry_policy = policy;
        self
    }

    /// Sets the disk backup policy (default: `DiskBackupPolicy::default()`).
    pub fn disk_backup_policy(mut self, policy: DiskBackupPolicy) -> Self {
        self.disk_backup_policy = policy;
        self
    }

    /// Enables or disables compression for ingestion (default: disabled).
    pub fn enable_compression_for_ingestion(mut self, enable: bool) -> Self {
        self.enable_compression_for_ingestion = enable;
        self
    }

    /// Sets the metrics streaming interval (default: `DEFAULT_METRICS_STREAMING_INTERVAL`).
    ///
    /// The value is forwarded as-is to the task config. `None` presumably
    /// disables metrics streaming; confirm against the task implementation.
    pub fn metrics_streaming_interval(mut self, interval: Option<Duration>) -> Self {
        self.metrics_streaming_interval = interval;
        self
    }

    /// Sets the ingestion data channel capacity (default: `DATA_CHANNEL_CAPACITY`).
    pub fn ingestion_data_channel_capacity(mut self, capacity: usize) -> Self {
        self.ingestion_data_channel_capacity = capacity;
        self
    }

    /// Sets the backup data channel capacity (default: `DATA_CHANNEL_CAPACITY`).
    pub fn backup_data_channel_capacity(mut self, capacity: usize) -> Self {
        self.backup_data_channel_capacity = capacity;
        self
    }

    /// Sets the control channel capacity (default: `CONTROL_CHANNEL_CAPACITY`).
    pub fn control_channel_capacity(mut self, capacity: usize) -> Self {
        self.control_channel_capacity = capacity;
        self
    }

    /// Runs the shared setup and creates the stream via
    /// `SiftStream::new_live_with_backups`.
    ///
    /// Re-ingestion uses its own channel when the builder was created from
    /// credentials, and reuses the setup channel when a channel was supplied.
    /// Backups are placed under `<asset>[/<run>]` (each component passed through
    /// `sanitize_name`), prefixed with the ingestion config's client key.
    ///
    /// # Errors
    ///
    /// Propagates errors from building the re-ingestion channel, from the shared
    /// setup, and from `SiftStream::new_live_with_backups`.
    pub async fn build(
        self,
    ) -> Result<SiftStream<IngestionConfigEncoder, LiveStreamingWithBackups>> {
        // Must be built before `setup_common` consumes `self.base`.
        let reingestion_channel_pre = if let Some(ref creds) = self.base.credentials {
            let mut builder = SiftChannelBuilder::new(creds.clone());
            if self.base.enable_tls {
                builder = builder.use_tls(true);
            }
            Some(builder.build()?)
        } else {
            None
        };
        let setup = setup_common(self.base).await?;
        let reingestion_channel =
            reingestion_channel_pre.unwrap_or_else(|| setup.setup_channel.clone());

        // Append the run component in place rather than via a `format!` temporary.
        let mut backups_directory = sanitize_name(&setup.asset_name);
        if let Some(run) = setup.run.as_ref() {
            backups_directory.push('/');
            backups_directory.push_str(&sanitize_name(&run.name));
        }
        let backups_prefix = setup.ingestion_config.client_key.clone();

        let recovery_config = RecoveryConfig {
            backups_directory,
            backups_prefix,
            backup_policy: self.disk_backup_policy,
        };
        let task_config = LiveWithBackupsTaskConfig {
            session_name: setup.session_name,
            sift_stream_id: setup.sift_stream_id,
            setup_channel: setup.setup_channel,
            ingestion_channel: setup.ingestion_channel,
            reingestion_channel,
            metrics: setup.metrics.clone(),
            checkpoint_interval: self.checkpoint_interval,
            enable_compression_for_ingestion: self.enable_compression_for_ingestion,
            retry_policy: self.retry_policy,
            recovery_config,
            control_channel_capacity: self.control_channel_capacity,
            ingestion_data_channel_capacity: self.ingestion_data_channel_capacity,
            backup_data_channel_capacity: self.backup_data_channel_capacity,
            metrics_streaming_interval: self.metrics_streaming_interval,
            #[cfg(feature = "tracing")]
            scoped_dispatch: setup.scoped_dispatch,
            log_rx: setup.log_rx,
        };
        SiftStream::new_live_with_backups(
            setup.ingestion_config,
            setup.flows_by_name,
            setup.run,
            task_config,
            setup.metrics,
        )
        .await
    }
}
/// Builder for a stream that writes data to backup files on disk.
///
/// Created by `StreamConfigBuilder::file_backup`; finish with [`FileBackupBuilder::build`].
#[must_use = "a FileBackupBuilder does nothing until `build().await` is called"]
pub struct FileBackupBuilder {
    base: StreamConfigBuilder,
    disk_backup_policy: DiskBackupPolicy,
    backup_data_channel_capacity: usize,
    control_channel_capacity: usize,
    metrics_streaming_interval: Option<Duration>,
}

impl FileBackupBuilder {
    /// Sets the disk backup policy. Its `backups_dir` must be set before `build`.
    pub fn disk_backup_policy(mut self, policy: DiskBackupPolicy) -> Self {
        self.disk_backup_policy = policy;
        self
    }

    /// Sets the backup data channel capacity (default: `DATA_CHANNEL_CAPACITY`).
    pub fn backup_data_channel_capacity(mut self, capacity: usize) -> Self {
        self.backup_data_channel_capacity = capacity;
        self
    }

    /// Sets the control channel capacity (default: `CONTROL_CHANNEL_CAPACITY`).
    pub fn control_channel_capacity(mut self, capacity: usize) -> Self {
        self.control_channel_capacity = capacity;
        self
    }

    /// Sets the metrics streaming interval (default: `DEFAULT_METRICS_STREAMING_INTERVAL`).
    ///
    /// `None` is forwarded as-is; it presumably disables metrics streaming.
    /// Confirm against the implementation.
    pub fn metrics_streaming_interval(mut self, interval: Option<Duration>) -> Self {
        self.metrics_streaming_interval = interval;
        self
    }

    /// Runs the shared setup and creates the stream via `SiftStream::new_file_backup`.
    ///
    /// Output goes under `<asset>[/<run>]` (components passed through
    /// `sanitize_name`) inside `backups_dir`. Only `backups_dir` and
    /// `max_backup_file_size` are read from the disk backup policy here.
    ///
    /// # Errors
    ///
    /// Returns an argument error if `disk_backup_policy.backups_dir` is unset.
    /// This is checked before any network I/O. Also propagates errors from the
    /// shared setup and from `SiftStream::new_file_backup`.
    pub async fn build(self) -> Result<SiftStream<IngestionConfigEncoder, FileBackup>> {
        let Some(backups_dir) = self.disk_backup_policy.backups_dir.clone() else {
            return Err(Error::new_arg_error(
                "disk_backup_policy.backups_dir must be set to use file backup mode",
            ));
        };
        let setup = setup_common(self.base).await?;

        // Append the run component in place rather than via a `format!` temporary.
        let mut output_directory = sanitize_name(&setup.asset_name);
        if let Some(run) = setup.run.as_ref() {
            output_directory.push('/');
            output_directory.push_str(&sanitize_name(&run.name));
        }

        SiftStream::new_file_backup(
            setup.setup_channel,
            setup.ingestion_config,
            setup.flows_by_name,
            setup.run,
            backups_dir.to_path_buf(),
            output_directory.into(),
            self.disk_backup_policy.max_backup_file_size,
            self.backup_data_channel_capacity,
            self.control_channel_capacity,
            self.metrics_streaming_interval,
            setup.session_name,
            setup.sift_stream_id,
            setup.metrics,
        )
        .await
    }
}
/// Everything `setup_common` produces that the per-mode `build` methods consume.
#[derive(Clone)]
struct CommonSetup {
    // --- connections ---
    /// Channel used for setup RPCs (ping, config/run loading, asset updates).
    setup_channel: SiftChannel,
    /// Channel handed to the ingestion task; currently a clone of `setup_channel`.
    ingestion_channel: SiftChannel,

    // --- resolved Sift resources ---
    /// Name of the asset the ingestion config was loaded for.
    asset_name: String,
    /// The attached run, if one was requested by ID or by form.
    run: Option<Run>,
    ingestion_config: IngestionConfig,
    /// Flow descriptors keyed by flow name.
    flows_by_name: HashMap<String, FlowDescriptor<String>>,

    // --- session identity ---
    /// `stream.<asset name>.<ingestion config client key>`.
    session_name: String,
    /// Random identifier generated for this stream instance.
    sift_stream_id: Uuid,

    // --- telemetry ---
    metrics: Arc<SiftStreamMetrics>,
    #[cfg(feature = "tracing")]
    scoped_dispatch: Option<tracing::Dispatch>,
    /// Receiving end of the telemetry log channel; `None` when no scoped dispatch was built.
    log_rx: Option<async_channel::Receiver<LogEvent>>,
}
/// Performs the setup shared by every stream mode.
///
/// In order, it:
/// 1. resolves the gRPC channel (a caller-supplied channel wins; otherwise one
///    is built from the credentials, with `use_tls(true)` when TLS is enabled);
/// 2. pings Sift to confirm connectivity before any other RPC;
/// 3. loads the ingestion config, its flows, and the asset via `load_ingestion_config`;
/// 4. loads the run, preferring an explicit run ID over a `RunForm`;
/// 5. applies any requested asset tags/metadata;
/// 6. builds flow descriptors keyed by flow name and records the count in the metrics;
/// 7. with the `tracing` feature (unless disabled), builds a scoped dispatch
///    layering `SiftTelemetryLayer` (feeding a bounded log channel) with a
///    `DispatchForwardingLayer` around the base dispatcher.
///
/// # Errors
///
/// Returns an argument error if neither credentials nor a channel were provided.
/// Propagates failures from channel construction, the connectivity ping,
/// ingestion-config and run loading, the asset update, and flow descriptor conversion.
async fn setup_common(base: StreamConfigBuilder) -> Result<CommonSetup> {
    // Match on both sources so the "neither provided" case is handled
    // exhaustively instead of via an up-front check followed by `unwrap()`.
    let setup_channel = match (base.channel, base.credentials) {
        (Some(channel), _) => channel,
        (None, Some(credentials)) => {
            let mut builder = SiftChannelBuilder::new(credentials);
            if base.enable_tls {
                builder = builder.use_tls(true);
            }
            builder.build()?
        }
        (None, None) => {
            return Err(Error::new_arg_error(
                "either credentials or a gRPC channel must be provided",
            ));
        }
    };
    let ingestion_channel = setup_channel.clone();

    // `ingestion_channel` is a clone of `setup_channel` (same endpoint, same
    // credentials and TLS settings), so one ping validates both. If the two ever
    // diverge, ping each.
    PingServiceClient::new(setup_channel.clone())
        .ping(PingRequest::default())
        .await
        .map_err(|e| Error::new(ErrorKind::GrpcConnectError, e))
        .context("failed to connect to Sift")
        .help("ensure that your API key and Sift gRPC API URL is correct and TLS is configured properly")?;

    let (ingestion_config, flows, asset) =
        load_ingestion_config(setup_channel.clone(), base.ingestion_config).await?;

    // An explicit run ID takes precedence over a run form.
    let run = if let Some(run_id) = base.run_id.as_ref() {
        Some(load_run_by_id(setup_channel.clone(), run_id).await?)
    } else if let Some(selector) = base.run {
        Some(load_run_by_form(setup_channel.clone(), selector).await?)
    } else {
        None
    };

    // Only the name is needed afterwards, so the asset itself can be moved.
    let asset_name = asset.name.clone();
    helpers::update_asset_tags_and_metadata(
        asset,
        base.asset_tags,
        base.asset_metadata,
        setup_channel.clone(),
    )
    .await?;

    let metrics = Arc::new(SiftStreamMetrics::new());

    // Borrow the ID; `ingestion_config` is only moved when building the result.
    let ingestion_config_id = &ingestion_config.ingestion_config_id;
    let mut flows_by_name = HashMap::with_capacity(flows.len());
    for flow in flows {
        let flow_name = flow.name.clone();
        let flow_descriptor = FlowDescriptor::try_from((ingestion_config_id, flow))?;
        flows_by_name.insert(flow_name, flow_descriptor);
    }
    metrics.loaded_flows.add(flows_by_name.len() as u64);

    let session_name = format!("stream.{}.{}", asset_name, ingestion_config.client_key);
    let sift_stream_id = Uuid::new_v4();

    #[cfg(feature = "tracing")]
    let (scoped_dispatch, log_rx) = {
        use tracing_subscriber::layer::SubscriberExt;
        if base.disable_scoped_dispatch {
            (None, None)
        } else {
            let (log_tx, log_rx) = async_channel::bounded::<LogEvent>(LOG_CHANNEL_CAPACITY);
            // Fall back to the dispatcher that is current at setup time.
            let base_dispatch = base
                .scoped_dispatch_base
                .unwrap_or_else(|| tracing::dispatcher::get_default(|d| d.clone()));
            let subscriber = tracing_subscriber::registry()
                .with(crate::telemetry::SiftTelemetryLayer::new(
                    log_tx,
                    base.log_level_filter,
                    metrics.clone(),
                ))
                .with(crate::telemetry::DispatchForwardingLayer(base_dispatch));
            let dispatch = tracing::Dispatch::new(subscriber);
            (Some(dispatch), Some(log_rx))
        }
    };
    #[cfg(not(feature = "tracing"))]
    let log_rx: Option<async_channel::Receiver<LogEvent>> = None;

    Ok(CommonSetup {
        setup_channel,
        ingestion_channel,
        ingestion_config,
        flows_by_name,
        asset_name,
        run,
        metrics,
        session_name,
        sift_stream_id,
        #[cfg(feature = "tracing")]
        scoped_dispatch,
        log_rx,
    })
}
impl From<Credentials> for SiftStreamBuilder {
fn from(value: Credentials) -> Self {
Self::new(value)
}
}
impl From<SiftChannel> for SiftStreamBuilder {
fn from(value: SiftChannel) -> Self {
Self::from_channel(value)
}
}