antimatter 2.0.13

antimatter.io Rust library for data control
Documentation
use crate::capsule::classifier::{EnforcePolicy, SpanPolicyDecision};
use crate::capsule::common::{CapsuleError, CapsuleTag, PolicyDecision, SpanTag};
use crate::session::policy_engine::{generate_spans, PolicyEngine, PolicyEngineEvaluator};
use crate::session::process_tags_to_unique_elided;
use crate::session::RUNTIME;
use antimatter_api::models::{
    new_access_log_entry::Operation, NewAccessLogEntry, NewAccessLogEntryReadInfo, TagSummary,
};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Pairs a policy enforcer with a fixed column index.
///
/// The `EnforcePolicy` implementation for this type locks `enforcer` and
/// forwards each call to it, supplying `column` as the column index.
pub struct ColumnPolicyEnforcer<P> {
    /// The wrapped enforcer, held behind `Arc<Mutex<_>>`; it is locked for the
    /// duration of each forwarded `enforce` call.
    pub enforcer: Arc<Mutex<P>>,
    /// Column index passed to the wrapped enforcer on every `enforce` call.
    pub column: usize,
}

impl<P: PolicyEnforcer> EnforcePolicy for ColumnPolicyEnforcer<P> {
    fn enforce(
        &mut self,
        span_tags: &[SpanTag],
        redact_tags: &[CapsuleTag],
        data: &[u8],
    ) -> Result<(PolicyDecision, Vec<SpanPolicyDecision>), CapsuleError> {
        self.enforcer
            .lock()
            .unwrap()
            .enforce(span_tags, redact_tags, data, self.column)
    }
}

/// Common interface for policy enforcers: construction, per-call policy
/// enforcement over a byte buffer, and production of an access log entry.
pub trait PolicyEnforcer: Send + Sized {
    /// Builds an enforcer.
    ///
    /// * `engine` - optional shared policy engine; implementations may require
    ///   it or ignore it.
    /// * `capsule_tags` - tags supplied for the capsule as a whole.
    /// * `column_tags` - one tag list per column; `DefaultPolicyEnforcer`
    ///   indexes it with the `column_index` given to `enforce`.
    /// * `read_parameters` / `domain_identity` - string key/value maps made
    ///   available to the enforcer.
    ///
    /// Returns a `CapsuleError` if the implementation cannot be constructed
    /// from the given inputs.
    fn init_enforcer(
        engine: Option<Arc<Mutex<PolicyEngine>>>,
        capsule_tags: Vec<CapsuleTag>,
        column_tags: Vec<Vec<CapsuleTag>>,
        read_parameters: HashMap<String, String>,
        domain_identity: HashMap<String, String>,
    ) -> Result<Self, CapsuleError>;

    /// Produces an access log entry for the read.
    ///
    /// `allowed_records` and `filtered_records` are counts supplied by the
    /// caller; `DefaultPolicyEnforcer` reports them as the returned and
    /// filtered record counts of the entry.
    fn access_log_entry(
        &self,
        allowed_records: usize,
        filtered_records: usize,
    ) -> NewAccessLogEntry;
    /// Applies policy to `data` for the column at `column_index`.
    ///
    /// Returns an overall `PolicyDecision` together with a list of per-span
    /// decisions, or a `CapsuleError` if enforcement fails.
    fn enforce(
        &mut self,
        span_tags: &[SpanTag],
        redact_tags: &[CapsuleTag],
        data: &[u8],
        column_index: usize,
    ) -> Result<(PolicyDecision, Vec<SpanPolicyDecision>), CapsuleError>;
}

/// Policy enforcer that evaluates spans with a shared `PolicyEngine` and
/// accumulates per-decision tag metrics for the read access log.
pub struct DefaultPolicyEnforcer {
    /// Shared policy engine; locked while evaluating spans in `enforce`.
    engine: Arc<Mutex<PolicyEngine>>,
    /// Capsule-level tags passed to `generate_spans`.
    capsule_tags: Vec<CapsuleTag>,
    /// Per-column tag lists, indexed by the `column_index` given to `enforce`.
    column_tags: Vec<Vec<CapsuleTag>>,
    /// Read parameters; passed to `generate_spans` and merged into the access
    /// log entry's parameters.
    read_parameters: HashMap<String, String>,
    /// Domain identity values; passed to `generate_spans` and used as the base
    /// of the access log entry's parameters.
    domain_identity: HashMap<String, String>,

    // metrics tracking for read logging
    /// Tags of spans whose decision was `Allow` or `NoMatch`.
    allowed_tags: Vec<SpanTag>,
    /// Tags of spans whose decision was `Redact` or `DenyRecord`.
    redacted_tags: Vec<SpanTag>,
    /// Tags of spans whose decision was `Tokenize`.
    tokenized_tags: Vec<SpanTag>,
    /// Count of spans decided `Allow`, `NoMatch` or `Tokenize`.
    allowed_spans: usize,
    /// Count of spans decided `Redact` or `DenyRecord`.
    filtered_spans: usize,
}

impl PolicyEnforcer for DefaultPolicyEnforcer {
    /// Creates an enforcer backed by `engine`, with all read-logging metrics
    /// starting empty.
    ///
    /// # Errors
    /// Returns `CapsuleError::Generic` when `engine` is `None`.
    fn init_enforcer(
        engine: Option<Arc<Mutex<PolicyEngine>>>,
        capsule_tags: Vec<CapsuleTag>,
        column_tags: Vec<Vec<CapsuleTag>>,
        read_parameters: HashMap<String, String>,
        domain_identity: HashMap<String, String>,
    ) -> Result<Self, CapsuleError> {
        let engine = engine.ok_or_else(|| {
            CapsuleError::Generic("no policy engine found for DefaultPolicyEnforcer".to_string())
        })?;

        Ok(Self {
            engine,
            capsule_tags,
            column_tags,
            read_parameters,
            domain_identity,
            allowed_tags: Vec::new(),
            redacted_tags: Vec::new(),
            tokenized_tags: Vec::new(),
            allowed_spans: 0,
            filtered_spans: 0,
        })
    }

    /// Builds a `Read` access log entry from the tags accumulated by
    /// `enforce` so far.
    ///
    /// The entry's parameters are `domain_identity` overlaid with
    /// `read_parameters` (read parameters win on key collisions).
    /// `allowed_records` and `filtered_records` are reported as the returned
    /// and filtered record counts, saturating at `i32::MAX`.
    fn access_log_entry(
        &self,
        allowed_records: usize,
        filtered_records: usize,
    ) -> NewAccessLogEntry {
        // The log schema stores counts as i32; saturate rather than letting an
        // `as` cast wrap a large usize into a negative or bogus value.
        let to_count = |count: usize| {
            <i32 as std::convert::TryFrom<usize>>::try_from(count).unwrap_or(i32::MAX)
        };

        // Cloned because `&self` cannot give up ownership of the accumulated
        // tags (so `std::mem::take` is not available here).
        let (allowed_unique_tags, allowed_elided_tags) =
            process_tags_to_unique_elided(self.allowed_tags.clone());
        let (redacted_unique_tags, redacted_elided_tags) =
            process_tags_to_unique_elided(self.redacted_tags.clone());
        let (tokenized_unique_tags, tokenized_elided_tags) =
            process_tags_to_unique_elided(self.tokenized_tags.clone());

        let mut parameters = self.domain_identity.clone();
        parameters.extend(
            self.read_parameters
                .iter()
                .map(|(key, value)| (key.clone(), value.clone())),
        );

        NewAccessLogEntry::new(
            Operation::Read,
            NewAccessLogEntryReadInfo {
                parameters,
                allowed_tags: Box::new(TagSummary {
                    unique_tags: allowed_unique_tags,
                    elided_tags: allowed_elided_tags,
                }),
                redacted_tags: Box::new(TagSummary {
                    unique_tags: redacted_unique_tags,
                    elided_tags: redacted_elided_tags,
                }),
                tokenized_tags: Box::new(TagSummary {
                    unique_tags: tokenized_unique_tags,
                    elided_tags: tokenized_elided_tags,
                }),
                returned_records: to_count(allowed_records),
                filtered_records: to_count(filtered_records),
            },
        )
    }

    /// Generates spans for `data`, evaluates them with the policy engine, and
    /// records the tags of each span under the metric matching its decision.
    ///
    /// Returns `(DenyCapsule, decisions_so_far)` or
    /// `(DenyRecord, decisions_so_far)` as soon as such a decision is seen;
    /// otherwise `(Allow, decisions)` with one entry per evaluated span.
    ///
    /// # Errors
    /// Returns `CapsuleError::Generic` when `column_index` is out of range,
    /// when the policy engine mutex is poisoned, or when policy evaluation
    /// fails.
    fn enforce(
        &mut self,
        span_tags: &[SpanTag],
        redact_tags: &[CapsuleTag],
        data: &[u8],
        column_index: usize,
    ) -> Result<(PolicyDecision, Vec<SpanPolicyDecision>), CapsuleError> {
        // Checked lookup: an unknown column is reported as an error instead of
        // panicking on an out-of-bounds index.
        let column_count = self.column_tags.len();
        let column_tags = self.column_tags.get(column_index).ok_or_else(|| {
            CapsuleError::Generic(format!(
                "column index {} out of range: enforcer has {} column(s)",
                column_index, column_count
            ))
        })?;

        let spans = generate_spans(
            span_tags,
            column_tags,
            &self.read_parameters,
            &self.capsule_tags,
            &self.domain_identity,
            data.len(),
        );

        // TODO: sync/async handoff is costly
        RUNTIME.block_on(async {
            // TODO: some of these function calls could be costly
            // NOTE(review): the std mutex guard is held across the `.await`
            // below; avoiding that would require changing the lock type.
            let policy_decisions = self
                .engine
                .lock()
                .map_err(|e| CapsuleError::Generic(format!("locking policy engine: {}", e)))?
                .evaluate(&spans, redact_tags)
                .await
                .map_err(|e| CapsuleError::Generic(format!("evaluating policy: {}", e)))?;

            let mut decisions = Vec::<SpanPolicyDecision>::new();
            // NOTE(review): `zip` stops at the shorter sequence; this assumes
            // the engine returns one decision per span - confirm.
            for (span, decision) in spans.iter().zip(policy_decisions.iter()) {
                // Lazily yields one SpanTag per whole-span tag; consumed by
                // whichever metric bucket the decision selects.
                let tagged = span.whole_tags.iter().map(|tag| SpanTag {
                    start: span.start,
                    end: span.end,
                    tag: tag.clone(),
                });

                match decision {
                    PolicyDecision::DenyCapsule => {
                        return Ok((PolicyDecision::DenyCapsule, decisions));
                    }
                    PolicyDecision::DenyRecord => {
                        self.filtered_spans += 1;
                        self.redacted_tags.extend(tagged);
                        return Ok((PolicyDecision::DenyRecord, decisions));
                    }
                    PolicyDecision::NoMatch | PolicyDecision::Allow => {
                        self.allowed_spans += 1;
                        self.allowed_tags.extend(tagged);
                    }
                    PolicyDecision::Redact => {
                        self.filtered_spans += 1;
                        self.redacted_tags.extend(tagged);
                    }
                    PolicyDecision::Tokenize => {
                        // TODO: is allowed correct here?
                        self.allowed_spans += 1;
                        self.tokenized_tags.extend(tagged);
                    }
                };

                decisions.push(SpanPolicyDecision {
                    start: span.start,
                    end: span.end,
                    decision: *decision,
                });
            }
            Ok((PolicyDecision::Allow, decisions))
        })
    }
}

/// A policy enforcer variant that always allows the policy to pass: its
/// `enforce` returns `PolicyDecision::Allow` for the whole input without
/// consulting a policy engine.
pub struct AllowingPolicyEnforcer {}

impl PolicyEnforcer for AllowingPolicyEnforcer {
    fn init_enforcer(
        _: Option<Arc<Mutex<PolicyEngine>>>,
        _: Vec<CapsuleTag>,
        _: Vec<Vec<CapsuleTag>>,
        _: HashMap<String, String>,
        _: HashMap<String, String>,
    ) -> Result<Self, CapsuleError> {
        Ok(Self {})
    }
    // NOTE: AllowingPolicyEnforcer produces no logs
    fn access_log_entry(
        &self,
        _allowed_records: usize,
        _filtered_records: usize,
    ) -> NewAccessLogEntry {
        NewAccessLogEntry::default()
    }

    fn enforce(
        &mut self,
        _span_tags: &[SpanTag],
        _redact_tags: &[CapsuleTag],
        data: &[u8],
        _column_index: usize,
    ) -> Result<(PolicyDecision, Vec<SpanPolicyDecision>), CapsuleError> {
        let decisions = vec![SpanPolicyDecision {
            start: 0,
            end: data.len(),
            decision: PolicyDecision::Allow,
        }];
        Ok((PolicyDecision::Allow, decisions))
    }
}