use crate::stream::flow::{FlowBuilder, FlowDescriptor};
use crate::stream::mode::ingestion_config::Flow;
use sift_connect::SiftChannel;
use sift_error::prelude::*;
use sift_rs::runs::v2::Run;
use sift_rs::{
assets::v1::Asset,
ingest::v1::{IngestWithConfigDataChannelValue, IngestWithConfigDataStreamRequest},
metadata::v1::MetadataValue,
retry::{RetryConfig, RetryExt},
wrappers::assets::{AssetServiceWrapper, new_asset_service},
};
pub(crate) fn message_to_ingest_req(
message: &Flow,
run: Option<&Run>,
descriptor: &FlowDescriptor<String>,
) -> Option<IngestWithConfigDataStreamRequest> {
let mut builder = FlowBuilder::new(descriptor);
for value in message.values.iter() {
builder
.set_with_key(&value.name, value.value.clone())
.ok()?;
}
if let Some(run) = run {
builder.attach_run_id(run.run_id.clone());
}
Some(builder.request(message.timestamp.clone()))
}
pub(crate) fn message_to_ingest_req_direct(
message: &Flow,
ingestion_config_id: &str,
run: Option<&Run>,
) -> IngestWithConfigDataStreamRequest {
let channel_values = message
.values
.iter()
.map(|val| IngestWithConfigDataChannelValue {
r#type: Some(val.pb_value()),
})
.collect::<Vec<_>>();
IngestWithConfigDataStreamRequest {
channel_values,
flow: message.flow_name.to_string(),
ingestion_config_id: ingestion_config_id.to_string(),
timestamp: Some(message.timestamp.0),
run_id: run.map(|r| r.run_id.clone()).unwrap_or_default(),
..Default::default()
}
}
pub(crate) async fn update_asset_tags_and_metadata(
mut asset: Asset,
asset_tags: Option<Vec<String>>,
asset_metadata: Option<Vec<MetadataValue>>,
channel: SiftChannel,
) -> Result<()> {
let mut update_mask = Vec::new();
if let Some(asset_tags) = asset_tags {
asset.tags = asset_tags;
update_mask.push("tags".to_string());
}
if let Some(asset_metadata) = asset_metadata {
asset.metadata = asset_metadata;
update_mask.push("metadata".to_string());
}
if update_mask.is_empty() {
return Ok(());
}
let asset_service_wrapper = new_asset_service(channel);
let retrying_asset = asset_service_wrapper.retrying(RetryConfig::default());
let asset_clone = asset.clone();
let update_mask_clone = update_mask.clone();
let _ = retrying_asset
.call(|mut w| {
let asset = asset_clone.clone();
let update_mask = update_mask_clone.clone();
async move { w.try_update_asset(asset, update_mask).await }
})
.await?;
Ok(())
}