antimatter 2.0.13

antimatter.io Rust library for data control
Documentation
use crate::capsule::capsule::*;
use crate::capsule::common::*;
use crate::capsule::{CellIterator, RowIterator};
use crate::session::policy_engine::*;
use crate::session::{process_tags_to_unique_elided, RUNTIME};
use antimatter_api::apis::configuration::Configuration;
use antimatter_api::apis::internal_api::{self as api};
use antimatter_api::models::new_access_log_entry::Operation;
use antimatter_api::models::*;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::{io::Read, ops::DerefMut};

/// ProcessMetadata accumulates what happened while a capsule was read:
/// how many records were returned or filtered out, and which span tags
/// were allowed, redacted or tokenized. It is primarily consumed when the
/// read access-log entry for the capsule is built.
#[derive(Clone, Default)]
pub struct ProcessMetadata {
    /// Number of records (rows) returned to the reader; reported as
    /// `returned_records` in the access log.
    pub allowed_records: i32,
    /// Number of records dropped because a DenyRecord decision was reached.
    pub filtered_records: i32,
    /// Span tags whose policy decision was Allow or NoMatch.
    pub allowed_span_tags: Vec<SpanTag>,
    /// Span tags whose policy decision was Redact.
    pub redacted_span_tags: Vec<SpanTag>,
    /// Span tags whose policy decision was Tokenize.
    // NOTE: tokenization is not functional yet; this field is plumbing for it.
    pub tokenized_span_tags: Vec<SpanTag>,
}

impl ProcessMetadata {
    /// Returns an empty ProcessMetadata: both record counters start at zero
    /// and no span tags have been recorded. This is exactly what the derived
    /// `Default` implementation produces.
    fn new() -> Self {
        Self::default()
    }
}

/// SessionCapsuleCellIterator walks the cells of a single row of a
/// SessionCapsule, applying policy to each cell as it is produced. It is
/// the CellIterator handed out by `SessionCapsule::next_row`.
pub struct SessionCapsuleCellIterator {
    /// Additional tags that force redaction of any span carrying them,
    /// regardless of the policy engine's decision.
    redact_tags: Vec<CapsuleTag>,
    /// Column definitions of the capsule this row belongs to.
    columns: Vec<Column>,
    /// Shared policy engine consulted for every cell.
    engine: Arc<Mutex<PolicyEngine>>,
    /// The row's cells, in column order.
    cells: Vec<DataElement>,
    /// Read parameters fed into span generation for policy evaluation.
    read_parameters: HashMap<String, String>,
    /// Capsule-level tags fed into span generation for policy evaluation.
    capsule_tags: Vec<CapsuleTag>,
    /// Metadata shared with the owning SessionCapsule, updated as cells
    /// are processed.
    meta: Arc<Mutex<ProcessMetadata>>,
    /// Set once a DenyRecord decision has been reached for this row.
    is_deny_record: bool,
    /// Span tags gathered so far, one vector per processed cell.
    span_tags: Vec<Vec<SpanTag>>,
    /// Index of the next cell to process.
    current_cell: usize,
}

impl CellIterator for SessionCapsuleCellIterator {
    /// is_deny_record returns true if, during iteration of the cells in
    /// this row, a DenyRecord decision was reached. Callers should discard
    /// the row if this value is true upon return.
    fn is_deny_record(&self) -> bool {
        self.is_deny_record
    }

    /// span_tags returns the set of span tags discovered while iterating
    /// this row. Assuming the record is not denied, there is one vector
    /// of SpanTag per cell in the row.
    fn span_tags(&self) -> Vec<Vec<SpanTag>> {
        self.span_tags.clone()
    }

    /// next_cell returns the next cell data in the row after applying
    /// modifications (i.e. redaction, tokenization, etc.)
    ///
    /// # Errors
    /// * `EndOfCapsule` once every cell of the row has been returned.
    /// * `CapsuleAccessDeniedByPolicy` if policy denies the whole capsule.
    /// * `RowAccessDeniedByPolicy` if policy denies this row. This is also
    ///   returned on every subsequent call, without re-evaluating policy.
    /// * `Generic` if a column is missing, policy enforcement fails, or an
    ///   unexpected span decision is returned.
    ///
    /// Bookkeeping: the row is counted in `filtered_records` once when it
    /// is denied. It is counted in `allowed_records` once when its last
    /// cell is rendered successfully.
    fn next_cell(&mut self) -> Result<Box<dyn Read + Send + 'static>, CapsuleError> {
        // A denied row stays denied. Short-circuit so that a caller retrying
        // does not re-evaluate the cell and count the row as filtered again.
        if self.is_deny_record {
            return Err(CapsuleError::RowAccessDeniedByPolicy);
        }
        if self.current_cell >= self.cells.len() {
            return Err(CapsuleError::EndOfCapsule);
        }

        let cell = &self.cells[self.current_cell];

        let col = self.columns.get(self.current_cell).ok_or_else(|| {
            CapsuleError::Generic(format!("no column at index {}", self.current_cell))
        })?;

        let spans = generate_spans(
            &cell.tags,
            &col.tags,
            &self.read_parameters,
            &self.capsule_tags,
            &HashMap::new(),
            cell.data.len(),
        );

        let (rendered_data, adjusted_span_tags, decision) = RUNTIME
            .block_on(async {
                enforce_policies(
                    &cell.data,
                    spans,
                    &self.redact_tags,
                    self.engine.lock().unwrap().deref_mut(),
                )
                .await
            })
            .map_err(|e| CapsuleError::Generic(format!("failed enforcing policiies: {}", e)))?;

        if decision == PolicyDecision::DenyCapsule {
            return Err(CapsuleError::CapsuleAccessDeniedByPolicy);
        } else if decision == PolicyDecision::DenyRecord {
            self.meta.lock().unwrap().filtered_records += 1;
            self.is_deny_record = true;
            return Err(CapsuleError::RowAccessDeniedByPolicy);
        }

        // Record each span's decision in the shared metadata, holding the
        // lock once for the whole cell. Also collect this cell's span tags
        // for span_tags().
        let mut cell_span_tags: Vec<SpanTag> = Vec::new();
        {
            let mut meta = self.meta.lock().unwrap();
            for (span_decision, tag) in adjusted_span_tags.iter() {
                let bucket = match span_decision {
                    PolicyDecision::NoMatch | PolicyDecision::Allow => {
                        &mut meta.allowed_span_tags
                    }
                    PolicyDecision::Redact => &mut meta.redacted_span_tags,
                    PolicyDecision::Tokenize => &mut meta.tokenized_span_tags,
                    _ => {
                        return Err(CapsuleError::Generic(format!(
                            "unrecognized span decision {:?}",
                            span_decision
                        )))
                    }
                };
                bucket.push(tag.clone());
                cell_span_tags.push(tag.clone());
            }

            // The last cell of the row was rendered without a deny decision,
            // so the whole row is returned to the reader. Count it so the
            // access log's `returned_records` is accurate. A denied row can
            // never reach this point, because the denied cell is never
            // advanced past.
            if self.current_cell + 1 == self.cells.len() {
                meta.allowed_records += 1;
            }
        }

        self.span_tags.push(cell_span_tags);

        self.current_cell += 1;
        Ok(Box::new(std::io::Cursor::new(rendered_data)))
    }

    /// cleanup has nothing to release for an in-memory row.
    fn cleanup(&mut self) -> Result<(), CapsuleError> {
        Ok(())
    }
}

/// SessionCapsule is an opened V1 capsule bundle. It hands out rows one
/// at a time through the RowIterator trait, moving through each capsule
/// of the bundle in order.
///
/// Note: the derived `Clone` clones the `meta` Arc, so a clone shares (and
/// accumulates into) the same ProcessMetadata as the original.
#[derive(Clone)]
pub struct SessionCapsule {
    /// Domain the capsule belongs to.
    pub domain_id: String,
    /// API client configuration, used when sending read access logs.
    pub config: Configuration,
    /// The bundle's capsules, each paired with the cached policy engine
    /// that makes policy decisions for that capsule.
    pub capsules: Vec<(Arc<Mutex<PolicyEngine>>, Capsule)>,
    /// The `extra` value supplied in the EncapsulateConfig when the capsule
    /// was created.
    pub extra: String,
    /// Parameters used during policy evaluation.
    read_parameters: HashMap<String, String>,
    /// Error messages hit while trying to open capsules in the bundle.
    open_failures: Vec<String>,
    /// Metadata for the capsule currently being iterated.
    meta: Arc<Mutex<ProcessMetadata>>,
    /// Index into `capsules` of the capsule being iterated.
    current_capsule: usize,
    /// Index of the next row to hand out within the current capsule.
    current_row: usize,
}

impl RowIterator for SessionCapsule {
    fn domain_id(&self) -> String {
        self.domain_id.clone()
    }

    fn extra_data(&self) -> String {
        self.extra.clone()
    }

    fn capsule_ids(&self) -> Vec<String> {
        self.capsules
            .iter()
            .map(|(_, c)| c.header.capsule_id.clone())
            .collect()
    }

    fn capsule_tags(&self) -> Vec<CapsuleTag> {
        let mut result: Vec<CapsuleTag> = vec![];
        for (_, capsule) in &self.capsules {
            result.extend(capsule.body.capsule_tags.clone());
        }
        result
    }

    fn columns(&self) -> Vec<Column> {
        let (_, capsule) = &self.capsules[0];
        capsule.body.columns.clone()
    }

    fn open_failures(&self) -> Vec<String> {
        self.open_failures.clone()
    }

    /// next_row returns a CellIterator for the next row in the current
    /// capsule, or for the first row in the next capsule if the current
    /// capsule has already been iterated over. If there are no more rows
    /// and capsules to iterate, it returns Err(EndOfCapsule). After a
    /// capsule has been iterated over, if read logging is not disabled,
    /// it sends a read log event to the server. The argument redact_tags
    /// is an additional set of tags for redaction: if any of these tags
    /// appears in a span, that span will be redacted regardless of the
    /// policy decision made by the policy engine.
    fn next_row(
        &mut self,
        redact_tags: Vec<CapsuleTag>,
    ) -> Result<Box<dyn CellIterator + 'static>, CapsuleError> {
        let (engine, capsule) = &self.capsules[self.current_capsule];
        if self.current_row >= capsule.body.rows.len() {
            if !capsule.body.disable_read_logging {
                let (allowed_unique_tags, allowed_elided_tags) = process_tags_to_unique_elided(
                    self.meta.lock().unwrap().allowed_span_tags.clone(),
                );
                let (redacted_unique_tags, redacted_elided_tags) = process_tags_to_unique_elided(
                    self.meta.lock().unwrap().redacted_span_tags.clone(),
                );
                let (tokenized_unique_tags, tokenized_elided_tags) = process_tags_to_unique_elided(
                    self.meta.lock().unwrap().tokenized_span_tags.clone(),
                );

                let request = AddCapsuleLogEntryRequest {
                    entry: Box::new(NewAccessLogEntry {
                        operation: Operation::Read,
                        location: None,
                        read_info: Box::new(NewAccessLogEntryReadInfo {
                            parameters: self.read_parameters.clone(),
                            allowed_tags: Box::new(TagSummary {
                                unique_tags: allowed_unique_tags,
                                elided_tags: allowed_elided_tags,
                            }),
                            redacted_tags: Box::new(TagSummary {
                                unique_tags: redacted_unique_tags,
                                elided_tags: redacted_elided_tags,
                            }),
                            tokenized_tags: Box::new(TagSummary {
                                unique_tags: tokenized_unique_tags,
                                elided_tags: tokenized_elided_tags,
                            }),
                            returned_records: self.meta.lock().unwrap().allowed_records,
                            filtered_records: self.meta.lock().unwrap().filtered_records,
                        }),
                    }),
                };

                RUNTIME
                    .block_on(api::domain_add_access_log_entry(
                        &self.config,
                        &capsule.header.domain_id,
                        &capsule.header.capsule_id,
                        &capsule.body.open_token,
                        request,
                    ))
                    .map_err(|e| {
                        CapsuleError::Generic(format!("failed to log access operation: {}", e))
                    })?;
            }

            self.current_capsule += 1;
            self.current_row = 0;
            self.meta = Arc::new(Mutex::new(ProcessMetadata::new()));
        }

        if self.current_capsule >= self.capsules.len() {
            return Err(CapsuleError::EndOfCapsule);
        }

        self.current_row += 1;
        Ok(Box::new(SessionCapsuleCellIterator {
            redact_tags: redact_tags.to_vec(),
            columns: capsule.body.columns.clone(),
            engine: engine.clone(),
            cells: capsule.body.rows[self.current_row - 1].clone(),
            read_parameters: self.read_parameters.clone(),
            capsule_tags: capsule.body.capsule_tags.clone(),
            meta: self.meta.clone(),
            is_deny_record: false,
            span_tags: Vec::new(),
            current_cell: 0,
        }))
    }
}

impl SessionCapsule {
    /// Builds a `SessionCapsule` that reads rows from the capsules of a
    /// bundle and applies policy to them.
    ///
    /// # Arguments
    /// * `domain_id` - Domain ID associated with the `Session`.
    /// * `config` - API client configuration, used when sending read
    ///   access-log entries.
    /// * `capsules` - The bundle's capsules, each paired with the policy
    ///   engine used for that capsule's decisions.
    /// * `extra` - The `extra` value from capsule creation, exposed to
    ///   readers as context.
    /// * `read_parameters` - Parameters used during policy evaluation and
    ///   recorded in read access logs.
    /// * `open_failures` - Error messages collected while opening the
    ///   bundle's capsules.
    ///
    /// # Returns
    /// A [`SessionCapsule`] positioned before the first row of the first
    /// capsule, with empty processing metadata.
    pub fn new(
        domain_id: String,
        config: Configuration,
        capsules: Vec<(Arc<Mutex<PolicyEngine>>, Capsule)>,
        extra: String,
        read_parameters: HashMap<String, String>,
        open_failures: Vec<String>,
    ) -> Self {
        let meta = Arc::new(Mutex::new(ProcessMetadata::new()));
        Self {
            meta,
            current_capsule: 0,
            current_row: 0,
            domain_id,
            config,
            capsules,
            extra,
            read_parameters,
            open_failures,
        }
    }
}