Skip to main content

sift_stream/stream/flow/
mod.rs

1use std::collections::HashMap;
2use std::hash::Hash;
3
4use sift_error::prelude::*;
5use sift_rs::common::r#type::v1::ChannelDataType;
6use sift_rs::ingest::v1::IngestWithConfigDataChannelValue;
7use sift_rs::ingest::v1::{
8    IngestWithConfigDataStreamRequest, ingest_with_config_data_channel_value::Type,
9};
10use sift_rs::ingestion_configs::v2::FlowConfig;
11use sift_rs::wrappers::ingestion_configs::{
12    IngestionConfigServiceWrapper, new_ingestion_config_service,
13};
14use sift_rs::{RetryConfig, RetryExt, SiftChannel};
15use tokio::task::JoinError;
16
17use crate::{TimeValue, Value};
18
/// Represents the index of a channel in a flow.
///
/// This provides a convenient and performant way to access the value at the given channel index
/// when building a new flow.
///
/// The wrapped position is private, so outside of this module an index can only be obtained
/// from [`FlowDescriptorBuilder::add`] (or read back through [`FlowDescriptor::mapping`]).
///
/// `Debug` is derived so that indices, and the key-to-index map that exposes them, can be
/// logged and used in assertions.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct ChannelIndex(usize);

#[cfg(test)]
impl ChannelIndex {
    /// Exposes the raw position for tests.
    pub(crate) fn as_usize(self) -> usize {
        self.0
    }
}
35
36/// Describes the schema of a flow, providing a convenient, performant, and correct way to
37/// build the flow being described.
38///
39/// The descriptor itself is immutable, to ensure that the flow is constructed correctly
40/// since successful ingestion requires Sift and the client to agree on the schema of the flow.
41///
42/// While the key `K` can be arbitrary, it is recommended to use a trivial key that avoids
43/// allocations, such as a `usize` or `u32`, though for convenience, a string (the channel
44/// name) could also be used and will still help minimize additional string allocations.
45///
46/// # Example
47///
48/// ```rust
49/// use sift_stream::{FlowDescriptor, FlowDescriptorBuilder, FlowBuilder, ChannelDataType};
50///
51/// let mut flow_descriptor_builder = FlowDescriptorBuilder::new("ingestion_config_id", "my_flow_name");
52/// let my_channel_idx = flow_descriptor_builder.add("my_channel_key", ChannelDataType::String);
53/// let my_other_channel_idx = flow_descriptor_builder.add("my_other_channel_key", ChannelDataType::Uint64);
54///
55/// let flow_descriptor = flow_descriptor_builder.build();
56///
57/// let mut flow_builder = FlowBuilder::new(&flow_descriptor);
58/// flow_builder.set(my_channel_idx, "my_value".to_string());
59/// flow_builder.set_with_key("my_other_channel_key", 123_u64);
60/// ```
61#[derive(Clone)]
62pub struct FlowDescriptor<K> {
63    /// The name of the flow.
64    name: String,
65
66    /// The ID of the ingestion config that this flow belongs to.
67    ingestion_config_id: String,
68
69    /// The data types of the channels in the flow which will be used
70    /// to validate the values when building a new flow.
71    field_types: Vec<ChannelDataType>,
72
73    /// A mapping of arbitrary keys to the index of the channel in the flow.
74    ///
75    /// Ideally the key should be a trivial key that avoids allocations, though
76    /// for convenience, a string (the channel name) could also be used.
77    index_map: HashMap<K, ChannelIndex>,
78}
79
80impl<K> FlowDescriptor<K>
81where
82    K: Eq + Hash,
83{
84    /// Initializes a new flow descriptor with the provided ingestion config ID and flow name.
85    fn new(ingestion_config_id: impl Into<String>, name: impl Into<String>) -> Self {
86        Self {
87            ingestion_config_id: ingestion_config_id.into(),
88            name: name.into(),
89            field_types: Vec::new(),
90            index_map: HashMap::new(),
91        }
92    }
93
94    /// Gets the type of the channel with the given key.
95    pub fn get<Q>(&self, key: &Q) -> Option<ChannelDataType>
96    where
97        K: core::borrow::Borrow<Q>,
98        Q: Eq + Hash + ?Sized,
99    {
100        let index = self.index_map.get(key)?.0;
101        Some(self.field_types[index])
102    }
103
104    /// Gets the mapping of keys to channel indices.
105    pub fn mapping(&self) -> &HashMap<K, ChannelIndex> {
106        &self.index_map
107    }
108}
109
110/// Builds a [`FlowDescriptor`], which defines the schema of a flow.
111///
112/// The builder is mutable, to allow for the addition of channels to the flow descriptor
113/// while the descriptor itself is immuatble, ensuring that the described flow will be
114/// constructed correctly.
115pub struct FlowDescriptorBuilder<K> {
116    flow_descriptor: FlowDescriptor<K>,
117}
118
119impl<K> FlowDescriptorBuilder<K>
120where
121    K: Eq + Hash,
122{
123    /// Initializes a new [`FlowDescriptorBuilder`] with the provided ingestion config ID and flow name.
124    pub fn new(ingestion_config_id: impl Into<String>, name: impl Into<String>) -> Self {
125        Self {
126            flow_descriptor: FlowDescriptor::new(ingestion_config_id, name),
127        }
128    }
129
130    /// Adds a new channel to the flow.
131    ///
132    /// This returns the index of the channel in the flow. This index can then be used to
133    /// access the value at the given channel index when building a new flow.
134    pub fn add(&mut self, key: K, field_type: ChannelDataType) -> ChannelIndex {
135        let index = self.flow_descriptor.field_types.len();
136        self.flow_descriptor.field_types.push(field_type);
137
138        self.flow_descriptor
139            .index_map
140            .insert(key, ChannelIndex(index));
141
142        ChannelIndex(index)
143    }
144
145    /// Builds the [`FlowDescriptor`] from the builder.
146    pub fn build(self) -> FlowDescriptor<K> {
147        self.flow_descriptor
148    }
149}
150
151impl<S> TryFrom<(S, &'_ FlowConfig)> for FlowDescriptor<String>
152where
153    S: ToString,
154{
155    type Error = Error;
156
157    fn try_from((ingestion_config_id, flow_config): (S, &'_ FlowConfig)) -> Result<Self> {
158        let mut builder =
159            FlowDescriptorBuilder::new(ingestion_config_id.to_string(), flow_config.name.clone());
160        for channel in flow_config.channels.iter() {
161            let data_type = ChannelDataType::try_from(channel.data_type).map_err(|_| {
162                Error::new_msg(
163                    ErrorKind::ArgumentValidationError,
164                    format!(
165                        "invalid data type {:?} for channel {}",
166                        channel.data_type, channel.name
167                    ),
168                )
169            })?;
170
171            builder.add(channel.name.clone(), data_type);
172        }
173        Ok(builder.build())
174    }
175}
176
177impl<S> TryFrom<(S, FlowConfig)> for FlowDescriptor<String>
178where
179    S: ToString,
180{
181    type Error = Error;
182
183    fn try_from((ingestion_config_id, flow_config): (S, FlowConfig)) -> Result<Self> {
184        let mut builder =
185            FlowDescriptorBuilder::new(ingestion_config_id.to_string(), flow_config.name);
186        for channel in flow_config.channels {
187            let data_type = ChannelDataType::try_from(channel.data_type).map_err(|_| {
188                Error::new_msg(
189                    ErrorKind::ArgumentValidationError,
190                    format!(
191                        "invalid data type {:?} for channel {}",
192                        channel.data_type, channel.name
193                    ),
194                )
195            })?;
196
197            builder.add(channel.name, data_type);
198        }
199        Ok(builder.build())
200    }
201}
202
203/// Builder to assist in constructing a flow, utilizing the flow descriptor
204/// to ensure that the flow is constructed correctly (i.e. value in the
205/// correct order and the correct data type).
206///
207/// By using the builder and the flow descriptor, the channel names are not
208/// necessary, which helps improve performance.
209pub struct FlowBuilder<'a, K> {
210    /// The flow descriptor which defines the value schema of the flow.
211    flow_descriptor: &'a FlowDescriptor<K>,
212
213    /// The values of the flow, where the index of the value corresponds to
214    /// the index of the channel in the [`FlowDescriptor`].
215    values: Vec<IngestWithConfigDataChannelValue>,
216
217    /// The optional run ID of the flow.
218    run_id: String,
219}
220
221impl<K> FlowBuilder<'_, K> {
222    /// Builds an [IngestWithConfigDataStreamRequest], consuming the builder.
223    pub fn request(self, now: TimeValue) -> IngestWithConfigDataStreamRequest {
224        IngestWithConfigDataStreamRequest {
225            ingestion_config_id: self.flow_descriptor.ingestion_config_id.clone(),
226            flow: self.flow_descriptor.name.clone(),
227            timestamp: Some(now.0),
228            channel_values: self.values,
229            run_id: self.run_id,
230            ..Default::default()
231        }
232    }
233}
234
235impl<'a, K> FlowBuilder<'a, K>
236where
237    K: Eq + Hash,
238{
239    /// Initializes a new flow builder with the provided flow descriptor.
240    pub fn new(flow_descriptor: &'a FlowDescriptor<K>) -> Self {
241        let values = vec![
242            IngestWithConfigDataChannelValue {
243                r#type: Some(Type::Empty(pbjson_types::Empty {}))
244            };
245            flow_descriptor.field_types.len()
246        ];
247        Self {
248            flow_descriptor,
249            values,
250            run_id: String::new(),
251        }
252    }
253
254    /// Attaches a run ID to the flow.
255    pub fn attach_run_id(&mut self, run_id: impl Into<String>) {
256        self.run_id = run_id.into();
257    }
258
259    /// Sets the value of the channel with the given key.
260    pub fn set<V>(&mut self, index: ChannelIndex, value: V) -> Result<()>
261    where
262        V: Into<Value>,
263    {
264        let value = value.into();
265        let pb_value = value.pb_value();
266
267        // If the value is not empty, validate that the value has the correct data type.
268        if !matches!(value, Value::Empty) {
269            let pb_data_type = value.pb_data_type();
270
271            // Since the [ChannelIndex] is only created by the [FlowDescriptor], we can safely
272            // assume that the index is valid and index directly into the `field_types` vector.
273            let expected_data_type = self.flow_descriptor.field_types[index.0];
274
275            // Validate that the value has the correct data type.
276            if expected_data_type != pb_data_type {
277                return Err(Error::new_msg(
278                    ErrorKind::ArgumentValidationError,
279                    format!(
280                        "value has incorrect data type, expected {expected_data_type:?}, got {pb_data_type:?}"
281                    ),
282                ));
283            }
284        }
285
286        // Update the value.
287        self.values[index.0].r#type = Some(pb_value);
288
289        Ok(())
290    }
291
292    /// Sets the value of the channel with the given key.
293    pub fn set_with_key<Q, V>(&mut self, key: &Q, value: V) -> Result<()>
294    where
295        K: core::borrow::Borrow<Q>,
296        Q: Eq + Hash + ?Sized,
297        V: Into<Value>,
298    {
299        // Get the index of the channel with the given key.
300        let Some(index) = self.flow_descriptor.index_map.get(key) else {
301            return Err(Error::new_msg(
302                ErrorKind::NotFoundError,
303                "provided key was not found in flow descriptor",
304            ));
305        };
306
307        self.set(*index, value)
308    }
309}
310
311#[cfg(test)]
312mod test;
313
314/// Compares two sets of flows and ensures that all flows in `user_specified` has a corresponding
315/// equivalent flow in `sift_flows`. If there is no corresponding flow then it either doesn't exist
316/// in Sift (this would be a bug) or a user made a backwards incompatible change to their ingestion
317/// config.
318pub(crate) fn validate_flows(
319    user_specified: &[FlowConfig],
320    sift_flows: &[FlowConfig],
321) -> Result<()> {
322    for user_flow in user_specified {
323        let num_matches_by_name = sift_flows
324            .iter()
325            .filter(|f| user_flow.name == f.name)
326            .count();
327        let num_exact_matches = sift_flows.iter().filter(|f| &user_flow == f).count();
328
329        if num_matches_by_name > 0 && num_exact_matches == 0 {
330            return Err(Error::new_msg(ErrorKind::IncompatibleIngestionConfigChange, "incompatible change to ingestion config"))
331                .with_context(|| format!("flow(s) with name '{}' exist but their channel configs do not match what the user specified", user_flow.name))
332                .help("Did you modify an existing flow? Try updating the the flow's name or the 'client_key' of `sift_stream::IngestionConfigForm`");
333        } else if num_exact_matches == 0 {
334            return Err(Error::new_msg(ErrorKind::IncompatibleIngestionConfigChange, "incompatible change to ingestion config"))
335                .with_context(|| format!("flow(s) with name '{}' not found in Sift", user_flow.name))
336                .help("try creating a new ingestion config by providing a new 'client_key' to `sift_stream::IngestionConfigForm` and notify Sift");
337        }
338    }
339    Ok(())
340}
341
342/// Concurrently creates flows for the given ingestion config.
343///
344/// Takes ownership of `flow_configs` so each can be moved directly into its spawned task
345/// without an intermediate clone. The per-retry clone inside each task is unavoidable.
346/// Errors are returned per-flow so callers can handle them individually, including treating
347/// [`ErrorKind::AlreadyExistsError`] as a success when a concurrent instance races to create
348/// the same flow.
349pub(crate) async fn add_new_flows(
350    grpc_channel: SiftChannel,
351    ingestion_config_id: &str,
352    flow_configs: Vec<FlowConfig>,
353) -> Vec<std::result::Result<Result<()>, JoinError>> {
354    #[cfg(feature = "tracing")]
355    tracing::info!(
356        ingestion_config_id =? ingestion_config_id,
357        new_flows = flow_configs
358            .iter()
359            .map(|f| f.name.as_str())
360            .collect::<Vec<&str>>()
361            .join(","),
362        "adding new flows to ingestion config"
363    );
364
365    let mut calls = Vec::with_capacity(flow_configs.len());
366
367    for flow_config in flow_configs {
368        let channel = grpc_channel.clone();
369        let config_id = ingestion_config_id.to_string();
370
371        calls.push(tokio::spawn(async move {
372            let wrapper = new_ingestion_config_service(channel);
373            let retrying = wrapper.retrying(RetryConfig::default());
374            retrying
375                .call(|mut w| {
376                    let config_id = config_id.clone();
377                    let flow_config = flow_config.clone();
378                    async move { w.try_create_flows(&config_id, vec![flow_config]).await }
379                })
380                .await
381                .context("SiftStream::add_new_flows")
382        }));
383    }
384
385    futures::future::join_all(calls).await
386}