1use std::collections::HashMap;
2use std::hash::Hash;
3
4use sift_error::prelude::*;
5use sift_rs::common::r#type::v1::ChannelDataType;
6use sift_rs::ingest::v1::IngestWithConfigDataChannelValue;
7use sift_rs::ingest::v1::{
8 IngestWithConfigDataStreamRequest, ingest_with_config_data_channel_value::Type,
9};
10use sift_rs::ingestion_configs::v2::FlowConfig;
11use sift_rs::wrappers::ingestion_configs::{
12 IngestionConfigServiceWrapper, new_ingestion_config_service,
13};
14use sift_rs::{RetryConfig, RetryExt, SiftChannel};
15use tokio::task::JoinError;
16
17use crate::{TimeValue, Value};
18
19#[derive(Copy, Clone, PartialEq, Eq, Hash)]
27pub struct ChannelIndex(usize);
28
29#[cfg(test)]
30impl ChannelIndex {
31 pub(crate) fn as_usize(self) -> usize {
32 self.0
33 }
34}
35
36#[derive(Clone)]
62pub struct FlowDescriptor<K> {
63 name: String,
65
66 ingestion_config_id: String,
68
69 field_types: Vec<ChannelDataType>,
72
73 index_map: HashMap<K, ChannelIndex>,
78}
79
80impl<K> FlowDescriptor<K>
81where
82 K: Eq + Hash,
83{
84 fn new(ingestion_config_id: impl Into<String>, name: impl Into<String>) -> Self {
86 Self {
87 ingestion_config_id: ingestion_config_id.into(),
88 name: name.into(),
89 field_types: Vec::new(),
90 index_map: HashMap::new(),
91 }
92 }
93
94 pub fn get<Q>(&self, key: &Q) -> Option<ChannelDataType>
96 where
97 K: core::borrow::Borrow<Q>,
98 Q: Eq + Hash + ?Sized,
99 {
100 let index = self.index_map.get(key)?.0;
101 Some(self.field_types[index])
102 }
103
104 pub fn mapping(&self) -> &HashMap<K, ChannelIndex> {
106 &self.index_map
107 }
108}
109
110pub struct FlowDescriptorBuilder<K> {
116 flow_descriptor: FlowDescriptor<K>,
117}
118
119impl<K> FlowDescriptorBuilder<K>
120where
121 K: Eq + Hash,
122{
123 pub fn new(ingestion_config_id: impl Into<String>, name: impl Into<String>) -> Self {
125 Self {
126 flow_descriptor: FlowDescriptor::new(ingestion_config_id, name),
127 }
128 }
129
130 pub fn add(&mut self, key: K, field_type: ChannelDataType) -> ChannelIndex {
135 let index = self.flow_descriptor.field_types.len();
136 self.flow_descriptor.field_types.push(field_type);
137
138 self.flow_descriptor
139 .index_map
140 .insert(key, ChannelIndex(index));
141
142 ChannelIndex(index)
143 }
144
145 pub fn build(self) -> FlowDescriptor<K> {
147 self.flow_descriptor
148 }
149}
150
151impl<S> TryFrom<(S, &'_ FlowConfig)> for FlowDescriptor<String>
152where
153 S: ToString,
154{
155 type Error = Error;
156
157 fn try_from((ingestion_config_id, flow_config): (S, &'_ FlowConfig)) -> Result<Self> {
158 let mut builder =
159 FlowDescriptorBuilder::new(ingestion_config_id.to_string(), flow_config.name.clone());
160 for channel in flow_config.channels.iter() {
161 let data_type = ChannelDataType::try_from(channel.data_type).map_err(|_| {
162 Error::new_msg(
163 ErrorKind::ArgumentValidationError,
164 format!(
165 "invalid data type {:?} for channel {}",
166 channel.data_type, channel.name
167 ),
168 )
169 })?;
170
171 builder.add(channel.name.clone(), data_type);
172 }
173 Ok(builder.build())
174 }
175}
176
177impl<S> TryFrom<(S, FlowConfig)> for FlowDescriptor<String>
178where
179 S: ToString,
180{
181 type Error = Error;
182
183 fn try_from((ingestion_config_id, flow_config): (S, FlowConfig)) -> Result<Self> {
184 let mut builder =
185 FlowDescriptorBuilder::new(ingestion_config_id.to_string(), flow_config.name);
186 for channel in flow_config.channels {
187 let data_type = ChannelDataType::try_from(channel.data_type).map_err(|_| {
188 Error::new_msg(
189 ErrorKind::ArgumentValidationError,
190 format!(
191 "invalid data type {:?} for channel {}",
192 channel.data_type, channel.name
193 ),
194 )
195 })?;
196
197 builder.add(channel.name, data_type);
198 }
199 Ok(builder.build())
200 }
201}
202
203pub struct FlowBuilder<'a, K> {
210 flow_descriptor: &'a FlowDescriptor<K>,
212
213 values: Vec<IngestWithConfigDataChannelValue>,
216
217 run_id: String,
219}
220
221impl<K> FlowBuilder<'_, K> {
222 pub fn request(self, now: TimeValue) -> IngestWithConfigDataStreamRequest {
224 IngestWithConfigDataStreamRequest {
225 ingestion_config_id: self.flow_descriptor.ingestion_config_id.clone(),
226 flow: self.flow_descriptor.name.clone(),
227 timestamp: Some(now.0),
228 channel_values: self.values,
229 run_id: self.run_id,
230 ..Default::default()
231 }
232 }
233}
234
235impl<'a, K> FlowBuilder<'a, K>
236where
237 K: Eq + Hash,
238{
239 pub fn new(flow_descriptor: &'a FlowDescriptor<K>) -> Self {
241 let values = vec![
242 IngestWithConfigDataChannelValue {
243 r#type: Some(Type::Empty(pbjson_types::Empty {}))
244 };
245 flow_descriptor.field_types.len()
246 ];
247 Self {
248 flow_descriptor,
249 values,
250 run_id: String::new(),
251 }
252 }
253
254 pub fn attach_run_id(&mut self, run_id: impl Into<String>) {
256 self.run_id = run_id.into();
257 }
258
259 pub fn set<V>(&mut self, index: ChannelIndex, value: V) -> Result<()>
261 where
262 V: Into<Value>,
263 {
264 let value = value.into();
265 let pb_value = value.pb_value();
266
267 if !matches!(value, Value::Empty) {
269 let pb_data_type = value.pb_data_type();
270
271 let expected_data_type = self.flow_descriptor.field_types[index.0];
274
275 if expected_data_type != pb_data_type {
277 return Err(Error::new_msg(
278 ErrorKind::ArgumentValidationError,
279 format!(
280 "value has incorrect data type, expected {expected_data_type:?}, got {pb_data_type:?}"
281 ),
282 ));
283 }
284 }
285
286 self.values[index.0].r#type = Some(pb_value);
288
289 Ok(())
290 }
291
292 pub fn set_with_key<Q, V>(&mut self, key: &Q, value: V) -> Result<()>
294 where
295 K: core::borrow::Borrow<Q>,
296 Q: Eq + Hash + ?Sized,
297 V: Into<Value>,
298 {
299 let Some(index) = self.flow_descriptor.index_map.get(key) else {
301 return Err(Error::new_msg(
302 ErrorKind::NotFoundError,
303 "provided key was not found in flow descriptor",
304 ));
305 };
306
307 self.set(*index, value)
308 }
309}
310
311#[cfg(test)]
312mod test;
313
314pub(crate) fn validate_flows(
319 user_specified: &[FlowConfig],
320 sift_flows: &[FlowConfig],
321) -> Result<()> {
322 for user_flow in user_specified {
323 let num_matches_by_name = sift_flows
324 .iter()
325 .filter(|f| user_flow.name == f.name)
326 .count();
327 let num_exact_matches = sift_flows.iter().filter(|f| &user_flow == f).count();
328
329 if num_matches_by_name > 0 && num_exact_matches == 0 {
330 return Err(Error::new_msg(ErrorKind::IncompatibleIngestionConfigChange, "incompatible change to ingestion config"))
331 .with_context(|| format!("flow(s) with name '{}' exist but their channel configs do not match what the user specified", user_flow.name))
332 .help("Did you modify an existing flow? Try updating the the flow's name or the 'client_key' of `sift_stream::IngestionConfigForm`");
333 } else if num_exact_matches == 0 {
334 return Err(Error::new_msg(ErrorKind::IncompatibleIngestionConfigChange, "incompatible change to ingestion config"))
335 .with_context(|| format!("flow(s) with name '{}' not found in Sift", user_flow.name))
336 .help("try creating a new ingestion config by providing a new 'client_key' to `sift_stream::IngestionConfigForm` and notify Sift");
337 }
338 }
339 Ok(())
340}
341
342pub(crate) async fn add_new_flows(
350 grpc_channel: SiftChannel,
351 ingestion_config_id: &str,
352 flow_configs: Vec<FlowConfig>,
353) -> Vec<std::result::Result<Result<()>, JoinError>> {
354 #[cfg(feature = "tracing")]
355 tracing::info!(
356 ingestion_config_id =? ingestion_config_id,
357 new_flows = flow_configs
358 .iter()
359 .map(|f| f.name.as_str())
360 .collect::<Vec<&str>>()
361 .join(","),
362 "adding new flows to ingestion config"
363 );
364
365 let mut calls = Vec::with_capacity(flow_configs.len());
366
367 for flow_config in flow_configs {
368 let channel = grpc_channel.clone();
369 let config_id = ingestion_config_id.to_string();
370
371 calls.push(tokio::spawn(async move {
372 let wrapper = new_ingestion_config_service(channel);
373 let retrying = wrapper.retrying(RetryConfig::default());
374 retrying
375 .call(|mut w| {
376 let config_id = config_id.clone();
377 let flow_config = flow_config.clone();
378 async move { w.try_create_flows(&config_id, vec![flow_config]).await }
379 })
380 .await
381 .context("SiftStream::add_new_flows")
382 }));
383 }
384
385 futures::future::join_all(calls).await
386}