use std::collections::HashMap;
use std::hash::Hash;
use sift_error::prelude::*;
use sift_rs::common::r#type::v1::ChannelDataType;
use sift_rs::ingest::v1::IngestWithConfigDataChannelValue;
use sift_rs::ingest::v1::{
IngestWithConfigDataStreamRequest, ingest_with_config_data_channel_value::Type,
};
use sift_rs::ingestion_configs::v2::FlowConfig;
use crate::{TimeValue, Value};
/// Position of a channel inside a [`FlowDescriptor`]'s list of field types.
///
/// Values are handed out by [`FlowDescriptorBuilder::add`] and consumed by
/// [`FlowBuilder::set`]. The wrapped index is private, so callers outside this
/// module cannot construct an arbitrary index.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct ChannelIndex(usize);
/// Describes the layout of a single flow: which ingestion config it belongs
/// to, the data type of each channel, and how user-facing keys of type `K`
/// map to channel positions.
#[derive(Clone)]
pub struct FlowDescriptor<K> {
/// Flow name; copied into the `flow` field of requests built by `FlowBuilder`.
name: String,
/// Ingestion config identifier; copied into requests built by `FlowBuilder`.
ingestion_config_id: String,
/// Data type of each channel, addressed by the `usize` inside a `ChannelIndex`.
field_types: Vec<ChannelDataType>,
/// Lookup from a caller-chosen key to the channel's `ChannelIndex`.
index_map: HashMap<K, ChannelIndex>,
}
impl<K> FlowDescriptor<K>
where
K: Eq + Hash,
{
fn new(ingestion_config_id: impl Into<String>, name: impl Into<String>) -> Self {
Self {
ingestion_config_id: ingestion_config_id.into(),
name: name.into(),
field_types: Vec::new(),
index_map: HashMap::new(),
}
}
pub fn get<Q>(&self, key: &Q) -> Option<ChannelDataType>
where
K: core::borrow::Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
let index = self.index_map.get(key)?.0;
Some(self.field_types[index])
}
pub fn mapping(&self) -> &HashMap<K, ChannelIndex> {
&self.index_map
}
}
/// Incrementally assembles a [`FlowDescriptor`] by registering channels one
/// at a time; call `build` to obtain the finished descriptor.
pub struct FlowDescriptorBuilder<K> {
/// The descriptor under construction; returned as-is by `build`.
flow_descriptor: FlowDescriptor<K>,
}
impl<K> FlowDescriptorBuilder<K>
where
    K: Eq + Hash,
{
    /// Starts a builder for a flow called `name` under the given ingestion
    /// config. The descriptor initially has no channels.
    pub fn new(ingestion_config_id: impl Into<String>, name: impl Into<String>) -> Self {
        let flow_descriptor = FlowDescriptor::new(ingestion_config_id, name);
        Self { flow_descriptor }
    }

    /// Appends a channel of type `field_type`, registers it under `key`, and
    /// returns the index assigned to it.
    ///
    /// The index is the number of channels added before this call. If `key`
    /// was already registered, the mapping for that key is overwritten with
    /// the new index while the earlier channel slot remains in place.
    pub fn add(&mut self, key: K, field_type: ChannelDataType) -> ChannelIndex {
        let descriptor = &mut self.flow_descriptor;
        let slot = ChannelIndex(descriptor.field_types.len());
        descriptor.field_types.push(field_type);
        descriptor.index_map.insert(key, slot);
        slot
    }

    /// Consumes the builder and yields the assembled descriptor.
    pub fn build(self) -> FlowDescriptor<K> {
        let Self { flow_descriptor } = self;
        flow_descriptor
    }
}
impl<S> TryFrom<(S, &'_ FlowConfig)> for FlowDescriptor<String>
where
S: ToString,
{
type Error = Error;
fn try_from((ingestion_config_id, flow_config): (S, &'_ FlowConfig)) -> Result<Self> {
let mut builder =
FlowDescriptorBuilder::new(ingestion_config_id.to_string(), flow_config.name.clone());
for channel in flow_config.channels.iter() {
let data_type = ChannelDataType::try_from(channel.data_type).map_err(|_| {
Error::new_msg(
ErrorKind::ArgumentValidationError,
format!(
"invalid data type {:?} for channel {}",
channel.data_type, channel.name
),
)
})?;
builder.add(channel.name.clone(), data_type);
}
Ok(builder.build())
}
}
impl<S> TryFrom<(S, FlowConfig)> for FlowDescriptor<String>
where
S: ToString,
{
type Error = Error;
fn try_from((ingestion_config_id, flow_config): (S, FlowConfig)) -> Result<Self> {
let mut builder =
FlowDescriptorBuilder::new(ingestion_config_id.to_string(), flow_config.name);
for channel in flow_config.channels {
let data_type = ChannelDataType::try_from(channel.data_type).map_err(|_| {
Error::new_msg(
ErrorKind::ArgumentValidationError,
format!(
"invalid data type {:?} for channel {}",
channel.data_type, channel.name
),
)
})?;
builder.add(channel.name, data_type);
}
Ok(builder.build())
}
}
/// Collects channel values for one message of a flow and turns them into an
/// `IngestWithConfigDataStreamRequest`.
pub struct FlowBuilder<'a, K> {
/// Layout of the flow being populated; supplies the expected data types.
flow_descriptor: &'a FlowDescriptor<K>,
/// One value per channel of the descriptor; sent as `channel_values`.
values: Vec<IngestWithConfigDataChannelValue>,
/// Run identifier copied into the request; empty unless `attach_run_id` is called.
run_id: String,
}
impl<K> FlowBuilder<'_, K> {
    /// Consumes the builder and produces the ingest request for this flow,
    /// stamped with `now`.
    ///
    /// The ingestion config id and flow name are cloned from the descriptor;
    /// the collected channel values and run id are moved into the request.
    /// All remaining request fields take their `Default` value.
    pub fn request(self, now: TimeValue) -> IngestWithConfigDataStreamRequest {
        let Self {
            flow_descriptor,
            values,
            run_id,
        } = self;

        IngestWithConfigDataStreamRequest {
            ingestion_config_id: flow_descriptor.ingestion_config_id.clone(),
            flow: flow_descriptor.name.clone(),
            timestamp: Some(now.0),
            channel_values: values,
            run_id,
            ..Default::default()
        }
    }
}
impl<'a, K> FlowBuilder<'a, K>
where
    K: Eq + Hash,
{
    /// Creates a builder for `flow_descriptor` with every channel value
    /// initialised to `Type::Empty` and an empty run id.
    pub fn new(flow_descriptor: &'a FlowDescriptor<K>) -> Self {
        let values = vec![
            IngestWithConfigDataChannelValue {
                r#type: Some(Type::Empty(pbjson_types::Empty {}))
            };
            flow_descriptor.field_types.len()
        ];
        Self {
            flow_descriptor,
            values,
            run_id: String::new(),
        }
    }

    /// Sets the run id that will be placed on the generated request,
    /// replacing any previously attached one.
    pub fn attach_run_id(&mut self, run_id: impl Into<String>) {
        self.run_id = run_id.into();
    }

    /// Stores `value` in the channel slot addressed by `index`.
    ///
    /// `Value::Empty` is accepted for any channel; every other value must
    /// have the data type the descriptor declares for that channel.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::ArgumentValidationError` when `index` does not
    /// address a channel of this builder's descriptor (for example because it
    /// was obtained from a different descriptor), or when the value's data
    /// type differs from the channel's declared type. The slot is left
    /// untouched in both cases.
    pub fn set<V>(&mut self, index: ChannelIndex, value: V) -> Result<()>
    where
        V: Into<Value>,
    {
        let value = value.into();

        // Checked lookup: an index minted for another descriptor must surface
        // as an error rather than a panic.
        let Some(&expected_data_type) = self.flow_descriptor.field_types.get(index.0) else {
            return Err(Error::new_msg(
                ErrorKind::ArgumentValidationError,
                format!(
                    "channel index {} is out of range for flow '{}' with {} channel(s)",
                    index.0,
                    self.flow_descriptor.name,
                    self.flow_descriptor.field_types.len()
                ),
            ));
        };

        if !matches!(value, Value::Empty) {
            let pb_data_type = value.pb_data_type();
            if expected_data_type != pb_data_type {
                return Err(Error::new_msg(
                    ErrorKind::ArgumentValidationError,
                    format!(
                        "value has incorrect data type, expected {expected_data_type:?}, got {pb_data_type:?}"
                    ),
                ));
            }
        }

        // `values` is created in `new` with exactly one slot per field type
        // and is never resized, so the index validated above is in range.
        self.values[index.0].r#type = Some(value.pb_value());
        Ok(())
    }

    /// Stores `value` in the channel registered under `key`.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::NotFoundError` when `key` is not present in the
    /// descriptor, and otherwise any error produced by [`FlowBuilder::set`].
    pub fn set_with_key<Q, V>(&mut self, key: &Q, value: V) -> Result<()>
    where
        K: core::borrow::Borrow<Q>,
        Q: Eq + Hash + ?Sized,
        V: Into<Value>,
    {
        let Some(index) = self.flow_descriptor.index_map.get(key).copied() else {
            return Err(Error::new_msg(
                ErrorKind::NotFoundError,
                "provided key was not found in flow descriptor",
            ));
        };
        self.set(index, value)
    }
}
// Unit tests live in the out-of-line `test` module, compiled only for test builds.
#[cfg(test)]
mod test;
/// Checks that every flow in `user_specified` has an identical counterpart in
/// `sift_flows`.
///
/// A user flow is accepted as soon as one entry of `sift_flows` compares equal
/// to it. Flows are checked in order and the first failing flow determines the
/// error.
///
/// # Errors
///
/// Returns `ErrorKind::IncompatibleIngestionConfigChange` when a user flow has
/// no exact match. The attached context and help text distinguish two cases:
/// a flow with the same name exists but differs, or no flow with that name
/// exists at all.
pub(crate) fn validate_flows(
    user_specified: &[FlowConfig],
    sift_flows: &[FlowConfig],
) -> Result<()> {
    for user_flow in user_specified {
        // An exact match settles this flow; `any` stops at the first hit
        // instead of counting every match.
        if sift_flows.iter().any(|sift_flow| sift_flow == user_flow) {
            continue;
        }

        let name_exists = sift_flows
            .iter()
            .any(|sift_flow| sift_flow.name == user_flow.name);

        if name_exists {
            return Err(Error::new_msg(ErrorKind::IncompatibleIngestionConfigChange, "incompatible change to ingestion config"))
                .with_context(|| format!("flow(s) with name '{}' exist but their channel configs do not match what the user specified", user_flow.name))
                .help("Did you modify an existing flow? Try updating the flow's name or the 'client_key' of `sift_stream::IngestionConfigForm`");
        }

        return Err(Error::new_msg(ErrorKind::IncompatibleIngestionConfigChange, "incompatible change to ingestion config"))
            .with_context(|| format!("flow(s) with name '{}' not found in Sift", user_flow.name))
            .help("try creating a new ingestion config by providing a new 'client_key' to `sift_stream::IngestionConfigForm` and notify Sift");
    }
    Ok(())
}