antimatter 2.0.13

antimatter.io Rust library for data control
Documentation
use crate::capsule::classifier::ProcessHooks;
// use crate::capsule::classifier_reader::ProcessHooksV2;
use crate::capsule::common::{CapsuleError, CapsuleTag, HookInfo, SpanTag, TagType};
use crate::session::{spans_to_byte_idx, Column, DataHookInvoker};
use antimatter_api::apis::configuration;
use antimatter_api::models::{
    DataTaggingHookInput, DataTaggingHookInputRecordsInner,
    DataTaggingHookInputRecordsInnerElementsInner, Tag, WriteContextConfigInfoRequiredHooksInner,
};
use std::sync::{Arc, Mutex};

/// Internal driver that runs a fixed set of configured hooks and
/// accumulates their results across many invocations.
///
/// Over its lifetime it gathers every capsule tag and span tag the hooks
/// produce. It also records which hooks (and which versions) were run, so
/// that information can be included in the capsule footer.
pub struct HookProcessor<I>
where
    I: DataHookInvoker,
{
    /// Domain that hook requests are submitted against.
    domain_id: String,
    /// Optional write context forwarded with each hook request.
    write_context_name: Option<String>,
    /// Configuration for the API client used to reach the hooks.
    config: configuration::Configuration,
    /// Invoker that performs the actual hook calls.
    hook_invoker: Arc<I>,
    /// Hooks run against every input. Data-structure-classifier hooks are
    /// applied once in `new` and are therefore not kept in this list.
    hooks: Vec<WriteContextConfigInfoRequiredHooksInner>,
    /// Column tags, including any produced by data-structure-classifier
    /// hooks. They describe the whole input of a data row, so
    /// `append_column_tags` copies them into `collated_span_tags` once
    /// per row processed.
    column_tags: Vec<CapsuleTag>,
    /// Capsule tags observed so far: seeded with the tags passed to `new`
    /// and extended on every `get_span_tags` call.
    pub collated_capsule_tags: Mutex<Vec<Tag>>,
    /// Span tags observed so far: extended on every `get_span_tags` call
    /// and by each `append_column_tags` call.
    pub collated_span_tags: Mutex<Vec<SpanTag>>,
    /// Hooks that have run so far, with their versions: seeded in `new`
    /// with any data-structure-classifier hooks applied there and extended
    /// on every `get_span_tags` call.
    pub hook_info: Mutex<Vec<HookInfo>>,
}

impl<I: DataHookInvoker> HookProcessor<I> {
    /// new constructs and returns a HookProcessor.
    ///
    /// Any `data-structure-classifier` hooks in `user_hooks` are applied
    /// immediately rather than per input. Unless `column.skip_classification`
    /// is set, each one adds a `tag.antimatter.io/key` tag, valued with the
    /// column name, to `column.tags` and records a `HookInfo` entry. All
    /// other hooks are kept for later invocation by `get_span_tags`.
    ///
    /// `column.tags` is then deduplicated in place. Two tags count as
    /// duplicates when `name`, `source` and `hook_version` match, and the
    /// first occurrence is kept. A snapshot of the resulting tags is stored
    /// as the processor's column tags.
    ///
    /// `capsule_tags` seeds `collated_capsule_tags`.
    pub fn new(
        domain_id: String,
        write_context_name: Option<String>,
        config: configuration::Configuration,
        column: &mut Column,
        user_hooks: &[WriteContextConfigInfoRequiredHooksInner],
        hook_invoker: Arc<I>,
        capsule_tags: &[Tag],
    ) -> Self {
        // Hook handled locally in the constructor instead of being invoked.
        const DATA_STRUCTURE_CLASSIFIER: &str = "data-structure-classifier";

        // column tags are needed up front, so we handle them as a special
        // case here. The argument columns are mutable so that we can add
        // the column tags before the hooks are invoked, but the columns
        // are then stored immutably.
        let mut hook_info = Vec::<HookInfo>::new();
        let mut hooks = Vec::<WriteContextConfigInfoRequiredHooksInner>::with_capacity(
            user_hooks.len(),
        );
        for hook in user_hooks {
            if hook.hook.as_str() != DATA_STRUCTURE_CLASSIFIER {
                hooks.push(hook.clone());
                continue;
            }
            if column.skip_classification {
                continue;
            }
            hook_info.push(HookInfo {
                name: hook.hook.clone(),
                version: "1.0.0".to_string(),
            });
            column.tags.push(CapsuleTag {
                name: "tag.antimatter.io/key".to_string(),
                tag_type: TagType::Str,
                value: column.name.clone(),
                source: hook.hook.clone(),
                hook_version: (1, 0, 0),
            });
        }

        // Deduplicate column tags, keeping the first occurrence. `new` may
        // run more than once against the same column, and each run pushes
        // the classifier tag again. Note that `value` and `tag_type` are not
        // part of the identity.
        let mut unique_column_tags: Vec<CapsuleTag> = Vec::with_capacity(column.tags.len());
        for column_tag in column.tags.drain(..) {
            let already_seen = unique_column_tags.iter().any(|t: &CapsuleTag| {
                t.name == column_tag.name
                    && t.source == column_tag.source
                    && t.hook_version == column_tag.hook_version
            });
            if !already_seen {
                unique_column_tags.push(column_tag);
            }
        }
        column.tags = unique_column_tags;

        Self {
            domain_id,
            write_context_name,
            config,
            hooks,
            hook_invoker,
            column_tags: column.tags.clone(),
            collated_span_tags: Mutex::new(Vec::new()),
            collated_capsule_tags: Mutex::new(capsule_tags.to_owned()),
            hook_info: Mutex::new(hook_info),
        }
    }

    /// append_column_tags adds the stored column tags to the set of
    /// collated span tags. This is needed because we want to include
    /// one set of column tags for each row in the input, but there
    /// may be more than one call to get_span_tags for each row. This
    /// means it is up to the consumer to call this function when the
    /// input for a column has been processed, but allows for correct
    /// accounting of the number of span tags.
    ///
    /// # Panics
    ///
    /// Panics if `collated_span_tags` is poisoned and there is at least
    /// one column tag to append.
    pub fn append_column_tags(&self) {
        // Nothing to add; skip taking the lock entirely.
        if self.column_tags.is_empty() {
            return;
        }
        // Take the lock once so a row's column tags are appended together
        // and are not interleaved with other writers.
        let mut collated = self.collated_span_tags.lock().unwrap();
        collated.extend(self.column_tags.iter().map(|tag| SpanTag {
            // Column tags apply to the whole input of the row, so start and
            // end carry no position information.
            start: 0,
            end: 0,
            tag: tag.clone(),
        }));
    }
}

impl<I: DataHookInvoker> ProcessHooks for HookProcessor<I> {
    /// get_span_tags implements ProcessHooks to make HookProcessor
    /// suitable for use with the capsule module's ClassifyingReader.
    fn get_span_tags(&self, content: &[u8], path: &str) -> Result<Vec<SpanTag>, CapsuleError> {
        if self.hooks.is_empty() {
            return Ok(Vec::new());
        }

        // TODO: (performance) some way not to copy content and path?
        let input = DataTaggingHookInput {
            records: vec![DataTaggingHookInputRecordsInner {
                elements: vec![DataTaggingHookInputRecordsInnerElementsInner {
                    // TODO: (performance) use from_utf8_unchecked?
                    content: String::from_utf8(content.to_vec()).map_err(|e| {
                        CapsuleError::Generic(format!("converting input to UTF-8 string: {}", e))
                    })?,
                    path: path.to_string(),
                }],
            }],
        };

        let mut result = Vec::<SpanTag>::new();
        for hook in &self.hooks {
            let mut attempts = 0;
            let max_attempts = 3;

            let mut resp = loop {
                match self.hook_invoker.invoke_hook(
                    &self.config,
                    &self.domain_id,
                    self.write_context_name.as_deref(),
                    hook.hook.as_str(),
                    // TODO: (performance) clone for each loop iteration
                    input.clone(),
                ) {
                    Ok(response) => break Ok(response),
                    Err(e) => {
                        attempts += 1;
                        if attempts >= max_attempts {
                            break Err(e);
                        }
                    }
                }
            }
            .map_err(|e| CapsuleError::Generic(format!("hook invoke error: {}", e)))?;

            let tag_set = &mut resp.records[0].elements[0];

            for tag in &tag_set.span_tags {
                let mut converted = SpanTag::from_api_span_inner(tag)?;
                spans_to_byte_idx(content, &mut converted).map_err(|e| {
                    CapsuleError::Generic(format!("error converting spans to byte index: {}", e))
                })?;
                result.extend(converted);
            }

            // TODO: what about duplicated capsule tags?
            self.collated_capsule_tags
                .lock()
                .unwrap()
                .extend(std::mem::take(&mut tag_set.capsule_tags));

            self.hook_info.lock().unwrap().push(HookInfo {
                name: hook.hook.clone(),
                version: resp.version,
            });
        }

        self.collated_span_tags
            .lock()
            .unwrap()
            .extend(result.clone());
        Ok(result)
    }

    fn has_classification_hooks(&self) -> bool {
        !self.hooks.is_empty()
    }
}