use super::{
RetryPolicy, SiftStream, helpers,
mode::{file_backup::FileBackup, ingestion_config::LiveStreaming},
run::{load_run_by_form, load_run_by_id},
};
use std::collections::HashMap;
mod config_loader;
use crate::{
FlowDescriptor,
backup::{disk::DiskBackupPolicy, sanitize_name},
metrics::SiftStreamMetrics,
stream::{
mode::ingestion_config::IngestionConfigEncoder,
tasks::{CONTROL_CHANNEL_CAPACITY, DATA_CHANNEL_CAPACITY, RecoveryConfig, TaskConfig},
},
};
use config_loader::load_ingestion_config;
use sift_connect::{Credentials, SiftChannel, SiftChannelBuilder};
use sift_error::prelude::*;
use sift_rs::{
ingestion_configs::v2::{FlowConfig, IngestionConfig},
metadata::v1::MetadataValue,
ping::v1::{PingRequest, ping_service_client::PingServiceClient},
runs::v2::Run,
};
use std::{sync::Arc, time::Duration};
use uuid::Uuid;
/// Default value of the builder's `checkpoint_interval` (60 seconds).
pub const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(60);
/// Default value of the builder's `metrics_streaming_interval` (500 milliseconds).
pub const DEFAULT_METRICS_STREAMING_INTERVAL: Duration = Duration::from_millis(500);
/// Builder that collects connection, ingestion-config, run and recovery
/// settings and turns them into a [`SiftStream`] via `build` (live streaming)
/// or `build_file_backup` (file-backup mode).
///
/// Construct it with `new` (from credentials) or `from_channel` (from an
/// existing gRPC channel), then chain the setter methods.
pub struct SiftStreamBuilder {
/// Credentials used to build gRPC channels when no channel was supplied.
credentials: Option<Credentials>,
/// Pre-built gRPC channel; when present it is used instead of `credentials`.
channel: Option<SiftChannel>,
/// Recovery behaviour; `None` is treated by `build` as retry-only with the
/// default retry policy.
recovery_strategy: Option<RecoveryStrategy>,
/// Checkpoint interval forwarded to the task configuration.
checkpoint_interval: Duration,
/// Ingestion config form; required, the build fails without it.
ingestion_config: Option<IngestionConfigForm>,
/// Whether TLS was requested; defaults to `true` and is cleared by `disable_tls`.
enable_tls: bool,
/// Tags forwarded to the asset update performed during setup.
asset_tags: Option<Vec<String>>,
/// Metadata forwarded to the asset update performed during setup.
asset_metadata: Option<Vec<MetadataValue>>,
/// Capacity forwarded as the control channel capacity.
control_channel_capacity: usize,
/// Capacity forwarded as the ingestion data channel capacity (live streaming only).
ingestion_data_channel_capacity: usize,
/// Capacity forwarded as the backup data channel capacity.
backup_data_channel_capacity: usize,
/// Compression flag forwarded to the task configuration (live streaming only).
enable_compression_for_ingestion: bool,
/// Metrics streaming interval forwarded to the stream.
/// NOTE(review): `None` presumably disables metrics streaming — confirm.
metrics_streaming_interval: Option<Duration>,
/// Run form used to load a run when no `run_id` is set.
run: Option<RunForm>,
/// ID of a run to attach; takes precedence over `run` during setup.
run_id: Option<String>,
}
/// Selects how the stream recovers from failures.
#[derive(Debug, Clone)]
pub enum RecoveryStrategy {
/// Retry using the given policy; `build` configures the stream with
/// backups disabled.
RetryOnly(RetryPolicy),
/// Retry using `retry_policy` and enable disk backups governed by
/// `disk_backup_policy`. This variant is required by `build_file_backup`.
RetryWithBackups {
/// Policy applied to retries.
retry_policy: RetryPolicy,
/// Policy applied to on-disk backups.
disk_backup_policy: DiskBackupPolicy,
},
}
/// User-supplied description of an ingestion config; handed to
/// `load_ingestion_config` during stream setup.
#[derive(Debug, Clone)]
pub struct IngestionConfigForm {
/// Name of the asset the ingestion config belongs to.
pub asset_name: String,
/// Client-chosen key for the ingestion config.
/// NOTE(review): presumably used to look up an existing config — confirm.
pub client_key: String,
/// Flow configurations for the ingestion config.
pub flows: Vec<FlowConfig>,
}
/// User-supplied description of a run; handed to `load_run_by_form` during
/// stream setup when no explicit run ID was attached.
#[derive(Debug, Clone, Default)]
pub struct RunForm {
/// Name of the run.
pub name: String,
/// Client-chosen key for the run.
/// NOTE(review): presumably used to look up an existing run — confirm.
pub client_key: String,
/// Optional description of the run.
pub description: Option<String>,
/// Optional tags for the run.
pub tags: Option<Vec<String>>,
/// Optional metadata for the run.
pub metadata: Option<Vec<MetadataValue>>,
}
impl Default for RecoveryStrategy {
    /// Returns the retry-only strategy carrying the default [`RetryPolicy`].
    fn default() -> Self {
        let retry_policy = RetryPolicy::default();
        Self::RetryOnly(retry_policy)
    }
}
impl RecoveryStrategy {
pub fn default_retry_policy_with_backups() -> Self {
Self::RetryWithBackups {
retry_policy: RetryPolicy::default(),
disk_backup_policy: DiskBackupPolicy::default(),
}
}
}
/// Everything produced by `SiftStreamBuilder::setup_common`, i.e. the state
/// shared by both build modes.
#[derive(Clone)]
struct CommonSetup {
/// Channel used for the setup calls (loading the ingestion config and run,
/// updating the asset).
setup_channel: SiftChannel,
/// Channel handed to the task configuration for ingestion.
ingestion_channel: SiftChannel,
/// Channel handed to the task configuration for re-ingestion.
reingestion_channel: SiftChannel,
/// Ingestion config returned by `load_ingestion_config`.
ingestion_config: IngestionConfig,
/// Flow descriptors keyed by flow name.
flows_by_name: HashMap<String, FlowDescriptor<String>>,
/// Name of the asset returned alongside the ingestion config.
asset_name: String,
/// Run loaded by ID or by form; `None` when neither was supplied.
run: Option<Run>,
/// Metrics instance created during setup and shared with the stream.
metrics: Arc<SiftStreamMetrics>,
/// Session name of the form `stream.{asset_name}.{client_key}`.
session_name: String,
/// Freshly generated v4 UUID identifying this stream.
sift_stream_id: Uuid,
}
impl SiftStreamBuilder {
pub fn new(credentials: Credentials) -> Self {
SiftStreamBuilder {
credentials: Some(credentials),
channel: None,
enable_tls: true,
enable_compression_for_ingestion: false,
ingestion_config: None,
run: None,
run_id: None,
checkpoint_interval: DEFAULT_CHECKPOINT_INTERVAL,
recovery_strategy: None,
asset_tags: None,
asset_metadata: None,
control_channel_capacity: CONTROL_CHANNEL_CAPACITY,
ingestion_data_channel_capacity: DATA_CHANNEL_CAPACITY,
backup_data_channel_capacity: DATA_CHANNEL_CAPACITY,
metrics_streaming_interval: Some(DEFAULT_METRICS_STREAMING_INTERVAL),
}
}
pub fn from_channel(channel: SiftChannel) -> Self {
SiftStreamBuilder {
credentials: None,
channel: Some(channel),
enable_tls: true,
enable_compression_for_ingestion: false,
ingestion_config: None,
run: None,
run_id: None,
checkpoint_interval: DEFAULT_CHECKPOINT_INTERVAL,
recovery_strategy: None,
asset_tags: None,
asset_metadata: None,
control_channel_capacity: CONTROL_CHANNEL_CAPACITY,
ingestion_data_channel_capacity: DATA_CHANNEL_CAPACITY,
backup_data_channel_capacity: DATA_CHANNEL_CAPACITY,
metrics_streaming_interval: Some(DEFAULT_METRICS_STREAMING_INTERVAL),
}
}
pub fn ingestion_config(mut self, ingestion_config: IngestionConfigForm) -> Self {
self.ingestion_config = Some(ingestion_config);
self
}
pub fn checkpoint_interval(mut self, duration: Duration) -> Self {
self.checkpoint_interval = duration;
self
}
pub fn metrics_streaming_interval(mut self, interval: Option<Duration>) -> Self {
self.metrics_streaming_interval = interval;
self
}
pub fn control_channel_capacity(mut self, capacity: usize) -> Self {
self.control_channel_capacity = capacity;
self
}
pub fn ingestion_data_channel_capacity(mut self, capacity: usize) -> Self {
self.ingestion_data_channel_capacity = capacity;
self
}
pub fn backup_data_channel_capacity(mut self, capacity: usize) -> Self {
self.backup_data_channel_capacity = capacity;
self
}
pub fn recovery_strategy(mut self, strategy: RecoveryStrategy) -> Self {
self.recovery_strategy = Some(strategy);
self
}
pub fn attach_run(mut self, run: RunForm) -> Self {
self.run = Some(run);
self
}
pub fn attach_run_id(mut self, run_id: &str) -> Self {
self.run_id = Some(run_id.into());
self
}
pub fn enable_compression_for_ingestion(mut self, enable: bool) -> Self {
self.enable_compression_for_ingestion = enable;
self
}
pub fn disable_tls(mut self) -> Self {
self.enable_tls = false;
self
}
pub fn add_asset_tags(mut self, tags: Option<Vec<String>>) -> Self {
self.asset_tags = tags;
self
}
pub fn add_asset_metadata(mut self, metadata: Option<Vec<MetadataValue>>) -> Self {
self.asset_metadata = metadata;
self
}
/// Performs the setup steps shared by `build` and `build_file_backup`.
///
/// In order, this:
/// 1. checks that an ingestion config form and either a gRPC channel or
///    credentials were supplied;
/// 2. obtains the setup, ingestion and re-ingestion channels (clones of the
///    supplied channel, or channels built from the credentials with TLS
///    set according to `enable_tls`);
/// 3. pings Sift over each channel;
/// 4. loads the ingestion config, its flows and its asset;
/// 5. loads the run, preferring `run_id` over the `run` form;
/// 6. applies `asset_tags` / `asset_metadata` to the asset;
/// 7. indexes the flows by name and records the count in the metrics.
///
/// # Errors
///
/// Returns an argument error when `ingestion_config` is `None` or when both
/// `grpc_channel` and `credentials` are `None`, a `GrpcConnectError` when a
/// ping fails, and otherwise propagates any error from channel
/// construction, the loaders, the asset update, or flow conversion.
#[allow(clippy::too_many_arguments)] // Mirrors the builder's fields one-to-one.
async fn setup_common(
    grpc_channel: Option<SiftChannel>,
    credentials: Option<Credentials>,
    ingestion_config: Option<IngestionConfigForm>,
    run: Option<RunForm>,
    run_id: Option<String>,
    asset_tags: Option<Vec<String>>,
    asset_metadata: Option<Vec<MetadataValue>>,
    enable_tls: bool,
) -> Result<CommonSetup> {
    let Some(ingestion_config) = ingestion_config else {
        return Err(Error::new_arg_error("ingestion_config is required"));
    };

    // Always forward the TLS choice explicitly. Only calling `use_tls(true)`
    // when TLS is enabled would leave `disable_tls()` without effect
    // whenever the channel builder's own default is TLS-on.
    let build_channel = |credentials: Credentials| -> Result<SiftChannel> {
        SiftChannelBuilder::new(credentials)
            .use_tls(enable_tls)
            .build()
    };

    let (setup_channel, ingestion_channel, reingestion_channel) =
        match (grpc_channel, credentials) {
            // A caller-supplied channel wins and is shared by all three roles.
            (Some(ch), _) => (ch.clone(), ch.clone(), ch),
            (None, Some(creds)) => {
                let setup_channel = build_channel(creds.clone())?;
                // NOTE(review): ingestion shares the setup channel while
                // re-ingestion gets a dedicated one — confirm this asymmetry
                // is intended.
                let ingestion_channel = setup_channel.clone();
                let reingestion_channel = build_channel(creds)?;
                (setup_channel, ingestion_channel, reingestion_channel)
            }
            (None, None) => {
                return Err(Error::new_arg_error(
                    "either credentials or a gRPC channel must be provided",
                ));
            }
        };

    // Fail fast if any of the channels cannot reach Sift.
    for channel in [
        setup_channel.clone(),
        ingestion_channel.clone(),
        reingestion_channel.clone(),
    ] {
        PingServiceClient::new(channel)
            .ping(PingRequest::default())
            .await
            .map_err(|e| Error::new(ErrorKind::GrpcConnectError, e))
            .context("failed to connect to Sift")
            .help("ensure that your API key and Sift gRPC API URL is correct and TLS is configured properly")?;
    }

    let (ingestion_config, flows, asset) =
        load_ingestion_config(setup_channel.clone(), ingestion_config).await?;

    // An explicit run ID takes precedence over a run form.
    let run = if let Some(run_id) = run_id.as_ref() {
        Some(load_run_by_id(setup_channel.clone(), run_id).await?)
    } else if let Some(selector) = run {
        Some(load_run_by_form(setup_channel.clone(), selector).await?)
    } else {
        None
    };

    let asset_name = asset.name.clone();
    helpers::update_asset_tags_and_metadata(
        asset,
        asset_tags,
        asset_metadata,
        setup_channel.clone(),
    )
    .await?;

    let metrics = Arc::new(SiftStreamMetrics::new());

    let mut flows_by_name = HashMap::with_capacity(flows.len());
    for flow in flows {
        let flow_name = flow.name.clone();
        let flow_descriptor =
            FlowDescriptor::try_from((&ingestion_config.ingestion_config_id, flow))?;
        flows_by_name.insert(flow_name, flow_descriptor);
    }
    metrics.loaded_flows.add(flows_by_name.len() as u64);

    let session_name = format!("stream.{}.{}", asset_name, ingestion_config.client_key);
    let sift_stream_id = Uuid::new_v4();

    Ok(CommonSetup {
        setup_channel,
        ingestion_channel,
        reingestion_channel,
        ingestion_config,
        flows_by_name,
        asset_name,
        run,
        metrics,
        session_name,
        sift_stream_id,
    })
}
pub async fn build(self) -> Result<SiftStream<IngestionConfigEncoder, LiveStreaming>> {
let SiftStreamBuilder {
checkpoint_interval,
channel: grpc_channel,
credentials,
enable_tls,
enable_compression_for_ingestion,
ingestion_config,
recovery_strategy,
run,
run_id,
asset_tags,
asset_metadata,
..
} = self;
let CommonSetup {
setup_channel,
ingestion_channel,
reingestion_channel,
ingestion_config,
flows_by_name,
asset_name,
run,
metrics,
session_name,
sift_stream_id,
} = Self::setup_common(
grpc_channel,
credentials,
ingestion_config,
run,
run_id,
asset_tags,
asset_metadata,
enable_tls,
)
.await?;
let recovery_config = match recovery_strategy {
Some(RecoveryStrategy::RetryOnly(retry_policy)) => RecoveryConfig {
retry_policy: retry_policy.clone(),
backups_enabled: false,
backups_directory: String::new(),
backups_prefix: String::new(),
backup_policy: DiskBackupPolicy::default(),
},
Some(RecoveryStrategy::RetryWithBackups {
retry_policy,
disk_backup_policy,
}) => {
let mut dir_name = sanitize_name(&asset_name);
if let Some(run) = run.as_ref() {
dir_name.push_str(&format!("/{}", sanitize_name(&run.name)));
}
RecoveryConfig {
retry_policy: retry_policy.clone(),
backups_enabled: true,
backups_directory: dir_name,
backups_prefix: ingestion_config.client_key.clone(),
backup_policy: disk_backup_policy.clone(),
}
}
None => RecoveryConfig {
retry_policy: RetryPolicy::default(),
backups_enabled: false,
backups_directory: String::new(),
backups_prefix: String::new(),
backup_policy: DiskBackupPolicy::default(),
},
};
let task_config = TaskConfig {
session_name,
sift_stream_id,
ingestion_channel,
reingestion_channel,
setup_channel,
metrics: metrics.clone(),
checkpoint_interval,
enable_compression_for_ingestion,
recovery_config,
control_channel_capacity: self.control_channel_capacity,
ingestion_data_channel_capacity: self.ingestion_data_channel_capacity,
backup_data_channel_capacity: self.backup_data_channel_capacity,
metrics_streaming_interval: self.metrics_streaming_interval,
};
SiftStream::new(ingestion_config, flows_by_name, run, task_config, metrics).await
}
/// Consumes the builder and constructs a [`SiftStream`] in file-backup mode.
///
/// Requires the recovery strategy to be `RetryWithBackups` with
/// `disk_backup_policy.backups_dir` set. Files are written beneath that
/// directory in `<sanitized asset name>[/<sanitized run name>]`.
///
/// # Errors
///
/// Returns an argument error when the recovery strategy is not
/// `RetryWithBackups` or when the disk backup policy has no `backups_dir`.
/// Both checks run before any network call is made. Otherwise propagates
/// any error from the shared setup or from stream construction.
pub async fn build_file_backup(self) -> Result<SiftStream<IngestionConfigEncoder, FileBackup>> {
    let SiftStreamBuilder {
        channel: grpc_channel,
        credentials,
        enable_tls,
        ingestion_config,
        run,
        run_id,
        asset_tags,
        asset_metadata,
        recovery_strategy,
        backup_data_channel_capacity,
        control_channel_capacity,
        metrics_streaming_interval,
        ..
    } = self;

    // Validate the pure arguments first so a misconfigured builder fails
    // before the shared setup talks to Sift.
    let Some(RecoveryStrategy::RetryWithBackups {
        disk_backup_policy, ..
    }) = recovery_strategy
    else {
        return Err(Error::new_arg_error(
            "recovery_strategy must be RetryWithBackups to build in file backup mode",
        ));
    };
    let max_backup_file_size = disk_backup_policy.max_backup_file_size;
    // A missing directory is a configuration mistake, not an invariant
    // violation: report it instead of panicking.
    // NOTE(review): if a default backups directory exists elsewhere in the
    // crate, falling back to it may be preferable — confirm.
    let Some(backups_dir) = disk_backup_policy.backups_dir else {
        return Err(Error::new_arg_error(
            "disk_backup_policy.backups_dir must be set to build in file backup mode",
        ));
    };

    let CommonSetup {
        setup_channel,
        ingestion_config,
        flows_by_name,
        run,
        metrics,
        asset_name,
        session_name,
        sift_stream_id,
        ..
    } = Self::setup_common(
        grpc_channel,
        credentials,
        ingestion_config,
        run,
        run_id,
        asset_tags,
        asset_metadata,
        enable_tls,
    )
    .await?;

    let mut output_directory = sanitize_name(&asset_name);
    if let Some(run) = run.as_ref() {
        output_directory.push_str(&format!("/{}", sanitize_name(&run.name)));
    }

    SiftStream::new_file_backup(
        setup_channel,
        ingestion_config,
        flows_by_name,
        run,
        backups_dir.to_path_buf(),
        output_directory.into(),
        max_backup_file_size,
        backup_data_channel_capacity,
        control_channel_capacity,
        metrics_streaming_interval,
        session_name,
        sift_stream_id,
        metrics,
    )
    .await
}
}
impl From<Credentials> for SiftStreamBuilder {
fn from(value: Credentials) -> Self {
Self::new(value)
}
}
impl From<SiftChannel> for SiftStreamBuilder {
fn from(value: SiftChannel) -> Self {
Self::from_channel(value)
}
}