pub mod api_helper;
mod chained_reader;
mod domain_cache;
#[doc(hidden)]
pub mod hook_processor;
#[doc(hidden)] pub mod http_client;
#[doc(hidden)]
pub mod policy_engine;
mod read_cache;
#[doc(hidden)]
pub mod reader;
mod seal_cache;
pub mod session;
pub mod version;
mod write_cache;
use crate::capsule::common::*;
use crate::session::session::SessionError;
use antimatter_api::apis::configuration;
use antimatter_api::apis::internal_api::{self as api};
use antimatter_api::models::*;
use anyhow::Result;
use std::collections::HashMap;
use std::hash::Hash;
use tokio::time::Duration;
lazy_static::lazy_static! {
    /// Process-wide multi-threaded tokio runtime used to drive async API calls
    /// from synchronous code (for example `DataTagger::invoke_hook` calls
    /// `RUNTIME.block_on`).
    ///
    /// All tokio drivers are enabled (`enable_all`). Idle threads in tokio's
    /// blocking pool are kept alive for 900 seconds (`thread_keep_alive`).
    ///
    /// # Panics
    ///
    /// Panics on first access if the runtime cannot be built.
    #[doc(hidden)]
    pub static ref RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .thread_keep_alive(Duration::from_secs(900))
        .build()
        .expect("failed to build the shared tokio runtime");
}
/// Minimum number of distinct (value, type, source, hook version) variants a
/// tag name must have before `process_tags_to_unique_elided` collapses it into
/// a single elided summary instead of listing every variant.
const ELIDED_THRESHOLD: usize = 10;

/// Timeout of 30 seconds; presumably applied to API requests made by the
/// submodules (usage is not visible here — confirm).
const API_TIMEOUT: Duration = Duration::from_secs(30);

/// API version string `"v2"`; presumably the Antimatter API version targeted by
/// the submodules (usage is not visible here — confirm).
const API_TARGET_VERSION: &str = "v2";
/// Invokes a domain's data-tagging hook.
///
/// The hook call sits behind a trait so it can be substituted. This file's
/// tests module provides `MockHookInvoker` for that purpose. Implementors
/// must be `Send + Sync`.
#[doc(hidden)]
pub trait DataHookInvoker
where
    Self: std::marker::Send + std::marker::Sync,
{
    /// Runs the hook named `hook_name` in domain `domain_id`.
    ///
    /// The hook runs over `data_tagging_hook_input`, with the optional
    /// `write_context_name`, using the API client `configuration`.
    ///
    /// # Errors
    ///
    /// Returns a [`SessionError`] if the hook invocation fails.
    fn invoke_hook(
        &self,
        configuration: &configuration::Configuration,
        domain_id: &str,
        write_context_name: Option<&str>,
        hook_name: &str,
        data_tagging_hook_input: DataTaggingHookInput,
    ) -> Result<DataTaggingHookResponse, SessionError>;
}
/// Production [`DataHookInvoker`] that calls the API's
/// `domain_data_tagging_hook_invoke` endpoint, blocking on the shared
/// [`RUNTIME`] until the call completes.
#[doc(hidden)]
pub struct DataTagger;

impl DataHookInvoker for DataTagger {
    /// Synchronously invokes the data-tagging hook through the API client.
    ///
    /// NOTE: tokio's `Runtime::block_on` panics when called from inside an
    /// asynchronous execution context, so this must be called from synchronous
    /// code.
    ///
    /// # Errors
    ///
    /// Any API error is wrapped as [`SessionError::APIError`].
    fn invoke_hook(
        &self,
        configuration: &configuration::Configuration,
        domain_id: &str,
        write_context_name: Option<&str>,
        hook_name: &str,
        data_tagging_hook_input: DataTaggingHookInput,
    ) -> Result<DataTaggingHookResponse, SessionError> {
        // The generated client takes `write_context_name` last, unlike this trait.
        let request = api::domain_data_tagging_hook_invoke(
            configuration,
            domain_id,
            hook_name,
            data_tagging_hook_input,
            write_context_name,
        );
        match RUNTIME.block_on(request) {
            Ok(response) => Ok(response),
            Err(err) => Err(SessionError::APIError(format!(
                "failed to invoke hook: {:?}",
                err
            ))),
        }
    }
}
fn spans_to_byte_idx(data: &[u8], span_tags: &mut Vec<SpanTag>) -> Result<(), SessionError> {
let mut rune_to_byte_idx_map: HashMap<usize, usize> = HashMap::new();
let s = std::str::from_utf8(data)
.map_err(|e| SessionError::Error(format!("failed to convert to UTF-8: {:?}", e)))?;
for span_tag in &mut *span_tags {
rune_to_byte_idx_map.insert(span_tag.start, usize::MAX);
rune_to_byte_idx_map.insert(span_tag.end, usize::MAX);
}
let mut ridx: usize = 0;
let mut bidx: usize = 0;
for r in s.chars() {
if let Some(_val) = rune_to_byte_idx_map.get(&ridx) {
rune_to_byte_idx_map.insert(ridx, bidx);
}
ridx += 1;
bidx += r.len_utf8();
}
if let Some(_val) = rune_to_byte_idx_map.get(&ridx) {
rune_to_byte_idx_map.insert(ridx, bidx);
}
for span_tag in &mut *span_tags {
if let Some(bidx) = rune_to_byte_idx_map.get(&span_tag.start) {
if *bidx == usize::MAX {
return Err(SessionError::Error(
"span tag rune byte index not processed".to_string(),
));
}
span_tag.start = *bidx;
} else {
return Err(SessionError::Error(
"span tag rune byte index not found".to_string(),
));
}
if let Some(bidx) = rune_to_byte_idx_map.get(&span_tag.end) {
if *bidx == usize::MAX {
return Err(SessionError::Error(
"span tag rune byte index not processed".to_string(),
));
}
span_tag.end = *bidx;
} else {
return Err(SessionError::Error(
"span tag rune byte index not found".to_string(),
));
}
}
Ok(())
}
/// Summarizes span tags by tag name.
///
/// Tags are split into individually listed ("unique") entries and collapsed
/// ("elided") per-name summaries.
///
/// Tags are grouped by name, then by their combination of value, type, source
/// and hook version. The result depends on how many distinct combinations a
/// name has:
/// - at least [`ELIDED_THRESHOLD`]: the name is reported once as a
///   [`TagSummaryElidedTagsInner`], carrying the number of distinct
///   combinations and the total occurrence count;
/// - fewer: every combination is emitted as a [`TagSummaryUniqueTagsInner`]
///   with its occurrence count.
///
/// The hook version tuple `(a, b, c)` is rendered as the string `"a.b.c"`.
/// Output order follows `HashMap` iteration and is unspecified. Counts that do
/// not fit in `i32` saturate at `i32::MAX` instead of wrapping.
#[doc(hidden)]
pub fn process_tags_to_unique_elided(
    tags: Vec<SpanTag>,
) -> (
    Vec<TagSummaryUniqueTagsInner>,
    Vec<TagSummaryElidedTagsInner>,
) {
    /// Everything that distinguishes two tags sharing the same name.
    #[derive(Eq, Hash, PartialEq)]
    struct SubTag {
        value: String,
        tag_type: TagType,
        source: String,
        hook_version: (i32, i32, i32),
    }

    // tag name -> (distinguishing attributes -> occurrence count)
    let mut tag_map: HashMap<String, HashMap<SubTag, i32>> = HashMap::new();
    for span_tag in tags {
        // `tags` is owned, so move the fields out instead of cloning them.
        let tag = span_tag.tag;
        let count = tag_map
            .entry(tag.name)
            .or_default()
            .entry(SubTag {
                value: tag.value,
                tag_type: tag.tag_type,
                source: tag.source,
                hook_version: tag.hook_version,
            })
            .or_default();
        *count = count.saturating_add(1);
    }

    let mut unique_tags: Vec<TagSummaryUniqueTagsInner> = Vec::new();
    let mut elided_tags: Vec<TagSummaryElidedTagsInner> = Vec::new();
    for (tag_name, group) in tag_map {
        if group.len() >= ELIDED_THRESHOLD {
            elided_tags.push(TagSummaryElidedTagsInner {
                tag_name,
                // Clamp rather than let `as` wrap to a negative count.
                num_unique_tags: group.len().min(i32::MAX as usize) as i32,
                total_occurrences: group
                    .values()
                    .fold(0i32, |total, &n| total.saturating_add(n)),
            });
        } else {
            unique_tags.extend(group.into_iter().map(|(sub_tag, occurrences)| {
                let v = sub_tag.hook_version;
                TagSummaryUniqueTagsInner {
                    tag: Box::new(Tag {
                        name: tag_name.clone(),
                        value: sub_tag.value,
                        r#type: TagTypeField::from(sub_tag.tag_type),
                        source: sub_tag.source,
                        hook_version: Some(format!("{}.{}.{}", v.0, v.1, v.2)),
                    }),
                    occurrences,
                }
            }));
        }
    }
    (unique_tags, elided_tags)
}
/// Converts a map of key/value pairs into a list of [`ReadParameter`]s.
///
/// Returns `None` when `map` is empty. Otherwise returns one `ReadParameter`
/// per entry, in the map's (unspecified) iteration order.
pub fn convert_to_option_vec(map: &HashMap<String, String>) -> Option<Vec<ReadParameter>> {
    if map.is_empty() {
        return None;
    }
    let mut parameters = Vec::with_capacity(map.len());
    for (key, value) in map {
        parameters.push(ReadParameter {
            key: key.clone(),
            value: value.clone(),
        });
    }
    Some(parameters)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::capsule::common::{CapsuleTag, SpanTag, TagType};

    /// Test double for [`DataHookInvoker`] that never contacts a service.
    ///
    /// For every element of every input record it returns a tag set holding
    /// one span tag (start 0, end 5) named `hook_name1`.
    pub struct MockHookInvoker;

    impl DataHookInvoker for MockHookInvoker {
        fn invoke_hook(
            &self,
            _configuration: &configuration::Configuration,
            _domain_id: &str,
            _write_context_name: Option<&str>,
            _hook_name: &str,
            data_tagging_hook_input: DataTaggingHookInput,
        ) -> Result<DataTaggingHookResponse, SessionError> {
            let mut response: DataTaggingHookResponse = DataTaggingHookResponse {
                version: "".to_string(),
                records: vec![],
            };
            for record in data_tagging_hook_input.records {
                let mut output_record: DataTaggingHookResponseRecordsInner =
                    DataTaggingHookResponseRecordsInner { elements: vec![] };
                for _ in record.elements {
                    output_record.elements.push(TagSet {
                        capsule_tags: vec![],
                        span_tags: vec![TagSetSpanTagsInner {
                            start: 0,
                            end: 5,
                            tags: vec![Tag {
                                name: "hook_name1".to_string(),
                                value: "hook_value1".to_string(),
                                r#type: Default::default(),
                                source: "hook_source1".to_string(),
                                hook_version: Some("1.2.3".to_string()),
                            }],
                        }],
                    })
                }
                response.records.push(output_record);
            }
            Ok(response)
        }
    }

    /// Builds a span tag with the given name/value and fixed remaining metadata.
    fn named_tag(name: &str, value: &str, start: usize, end: usize) -> SpanTag {
        SpanTag {
            tag: CapsuleTag {
                name: name.to_string(),
                tag_type: TagType::Str,
                value: value.to_string(),
                source: "test_source".to_string(),
                hook_version: (1, 2, 3),
            },
            start,
            end,
        }
    }

    #[test]
    fn test_process_tags_to_unique_elided() {
        let mut test_tags: Vec<SpanTag> = Vec::new();
        for i in 0..20 {
            let tag_name = if i < 10 {
                "common_tag".to_string()
            } else {
                format!("unique_tag_{}", i)
            };
            test_tags.push(SpanTag {
                tag: CapsuleTag {
                    name: tag_name,
                    tag_type: TagType::Str,
                    value: format!("value_{}", i),
                    source: "test_source".to_string(),
                    hook_version: (1, 0, i),
                },
                start: i as usize,
                end: (i + 1) as usize,
            });
        }
        let (unique_tags, elided_tags) = process_tags_to_unique_elided(test_tags);
        assert_eq!(unique_tags.len(), 10);
        assert_eq!(elided_tags.len(), 1);
        // The ten distinct values under `common_tag` reach the threshold.
        assert_eq!(elided_tags[0].tag_name, "common_tag");
        assert_eq!(elided_tags[0].num_unique_tags, 10);
        assert_eq!(elided_tags[0].total_occurrences, 10);
        // Every other name has a single value occurring once.
        for unique in &unique_tags {
            assert!(unique.tag.name.starts_with("unique_tag_"));
            assert_eq!(unique.occurrences, 1);
        }
    }

    #[test]
    fn test_process_tags_counts_duplicate_occurrences() {
        let tags = vec![
            named_tag("dup", "a", 0, 1),
            named_tag("dup", "a", 2, 3),
            named_tag("dup", "b", 4, 5),
        ];
        let (unique_tags, elided_tags) = process_tags_to_unique_elided(tags);
        assert!(elided_tags.is_empty());
        assert_eq!(unique_tags.len(), 2);
        let a = unique_tags
            .iter()
            .find(|u| u.tag.value == "a")
            .expect("value `a` present");
        assert_eq!(a.occurrences, 2);
        assert_eq!(a.tag.hook_version.as_deref(), Some("1.2.3"));
    }

    #[test]
    fn test_rune_to_byte_index_conversion() {
        let mut sp_tgs: Vec<SpanTag> = vec![
            SpanTag {
                tag: CapsuleTag {
                    name: "name1".to_string(),
                    tag_type: TagType::Unary,
                    value: "value1".to_string(),
                    source: "source1".to_string(),
                    hook_version: (1, 2, 3),
                },
                start: 1,
                end: 5,
            },
            SpanTag {
                tag: CapsuleTag {
                    name: "name2".to_string(),
                    tag_type: TagType::Unary,
                    value: "value2".to_string(),
                    source: "source2".to_string(),
                    hook_version: (2, 3, 4),
                },
                start: 5,
                end: 15,
            },
        ];
        let example_string = "He🌍llo, 🌍! and something else.".as_bytes().to_vec();
        spans_to_byte_idx(&example_string, &mut sp_tgs).expect("test failure");
        assert_eq!(
            sp_tgs,
            vec![
                SpanTag {
                    tag: CapsuleTag {
                        name: "name1".to_string(),
                        tag_type: TagType::Unary,
                        value: "value1".to_string(),
                        source: "source1".to_string(),
                        hook_version: (1, 2, 3),
                    },
                    start: 1,
                    end: 8,
                },
                SpanTag {
                    tag: CapsuleTag {
                        name: "name2".to_string(),
                        tag_type: TagType::Unary,
                        value: "value2".to_string(),
                        source: "source2".to_string(),
                        hook_version: (2, 3, 4),
                    },
                    start: 8,
                    end: 21,
                },
            ]
        );
    }

    #[test]
    fn test_rune_to_byte_index_end_of_text() {
        // A span ending at the character count maps to the byte length.
        let mut sp_tgs = vec![named_tag("n", "v", 0, 2)];
        spans_to_byte_idx("a🌍".as_bytes(), &mut sp_tgs).expect("test failure");
        assert_eq!((sp_tgs[0].start, sp_tgs[0].end), (0, 5));
    }

    #[test]
    fn test_rune_to_byte_index_out_of_range() {
        let mut sp_tgs = vec![named_tag("n", "v", 0, 10)];
        let result = spans_to_byte_idx("abc".as_bytes(), &mut sp_tgs);
        assert!(matches!(result, Err(SessionError::Error(_))));
    }

    #[test]
    fn test_rune_to_byte_index_invalid_utf8() {
        let mut sp_tgs: Vec<SpanTag> = vec![];
        let result = spans_to_byte_idx(&[0xff, 0xfe], &mut sp_tgs);
        assert!(matches!(result, Err(SessionError::Error(_))));
    }

    #[test]
    fn test_convert_to_option_vec() {
        assert!(convert_to_option_vec(&HashMap::new()).is_none());
        let mut map = HashMap::new();
        map.insert("k".to_string(), "v".to_string());
        let params = convert_to_option_vec(&map).expect("non-empty map yields parameters");
        assert_eq!(params.len(), 1);
        assert_eq!(params[0].key, "k");
        assert_eq!(params[0].value, "v");
    }
}