use crate::session::session::SessionError;
use crate::session::RUNTIME;
use antimatter_api::apis::configuration;
use antimatter_api::apis::internal_api::{self as api};
use antimatter_api::models::*;
use lru::LruCache;
use std::cmp::max;
use std::mem;
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};
use tokio::task::JoinHandle;
/// Shareable, thread-safe callback that delivers a single seal request.
///
/// Arguments, in order: the API configuration, the domain id, the id, the
/// create token and the request to send. `invoke_request` is the
/// implementation installed by `SealCache::new`; the tests in this file swap
/// in a mock with the same signature.
type InvokeFunc = Arc<
dyn Fn(
configuration::Configuration,
String,
String,
String,
CapsuleSealRequest,
) -> Result<(), SessionError>
+ Send
+ Sync,
>;
/// Sends one seal request through `api::domain_seal_capsule`, blocking the
/// calling thread on `RUNTIME` until the call completes.
///
/// # Errors
///
/// Any error returned by the API call is wrapped in `SessionError::APIError`
/// with an `invoke seal error:` prefix.
fn invoke_request(
    configuration: configuration::Configuration,
    domain_id: String,
    id: String,
    create_token: String,
    seal_request: CapsuleSealRequest,
) -> Result<(), SessionError> {
    // `configuration` is already owned by this function, so it is borrowed
    // directly instead of being cloned first.
    RUNTIME
        .block_on(api::domain_seal_capsule(
            &configuration,
            &domain_id,
            &id,
            &create_token,
            seal_request,
        ))
        .map_err(|e| SessionError::APIError(format!("invoke seal error: {:?}", e)))
}
/// Key under which `SealCache` stores a buffered seal: the combination of
/// domain id, id and create token supplied by the caller.
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
struct SealKey {
domain_id: String,
id: String,
create_token: String,
}
/// LRU-bounded collection of buffered seal state, one entry per `SealKey`.
pub struct SealCache {
// Buffered seal state for every key currently tracked.
cache: LruCache<SealKey, BufferedSealInternal>,
// Callback handed to each new buffer for delivering requests; the tests in
// this file replace it with a mock.
seal_invoker: InvokeFunc,
// When false, `seal` always takes the synchronous path regardless of the
// caller's `async_seal` flag.
enabled: bool,
// The size requested at construction, as reported by `size()`. This is not
// necessarily the cache capacity: a requested size of 0 produces a cache
// with capacity 100.
size: usize,
}
impl SealCache {
pub fn new(size: usize, is_async: bool) -> Self {
let cache_size = NonZeroUsize::new(size).unwrap_or_else(|| NonZeroUsize::new(100).unwrap());
Self {
cache: LruCache::new(cache_size),
seal_invoker: Arc::new(invoke_request),
enabled: is_async,
size,
}
}
pub fn seal(
&mut self,
configuration: &configuration::Configuration,
domain_id: String,
id: String,
create_token: String,
request: &mut CapsuleSealRequest,
async_seal: bool,
) -> Result<(), SessionError> {
let key = SealKey {
domain_id: domain_id.to_string(),
id: id.to_string(),
create_token: create_token.to_string(),
};
let buff = match self.cache.get_mut(&key) {
None => {
let new_buff = BufferedSealInternal::new(
configuration.clone(),
domain_id.clone(),
id.clone(),
create_token.clone(),
self.seal_invoker.clone(),
);
self.cache.push(key.clone(), new_buff);
self.cache.get_mut(&key).unwrap()
}
Some(v) => v,
};
if !self.enabled || !async_seal {
buff.seal(
&mut request.span_tags.unique_tags,
&mut request.span_tags.elided_tags,
&mut request.capsule_tags,
request.size,
request.rows,
)
} else {
buff.seal_async(
&mut request.span_tags.unique_tags,
&mut request.span_tags.elided_tags,
&mut request.capsule_tags,
request.size,
request.rows,
)
}
}
pub fn update_tags_for_seal(
&mut self,
configuration: &configuration::Configuration,
domain_id: String,
id: String,
create_token: String,
request: &mut CapsuleSealRequest,
) -> Result<(), SessionError> {
let key = SealKey {
domain_id: domain_id.to_string(),
id: id.to_string(),
create_token: create_token.to_string(),
};
let buff = match self.cache.get_mut(&key) {
None => {
let new_buff = BufferedSealInternal::new(
configuration.clone(),
domain_id.clone(),
id.clone(),
create_token.clone(),
self.seal_invoker.clone(),
);
self.cache.push(key.clone(), new_buff);
self.cache.get_mut(&key).unwrap()
}
Some(v) => v,
};
buff.update_tags_for_seal(
&mut request.span_tags.unique_tags,
&mut request.span_tags.elided_tags,
&mut request.capsule_tags,
request.size,
request.rows,
);
Ok(())
}
pub fn size(&self) -> usize {
self.size
}
pub fn enabled(&self) -> bool {
self.enabled
}
}
/// Buffered seal state for a single capsule: the identifiers needed to send a
/// seal request, the request accumulated so far, and the handle of any
/// background send.
struct BufferedSealInternal {
// The next four fields are cloned and passed to `seal_invoker` on every send.
configuration: configuration::Configuration,
domain_id: String,
id: String,
create_token: String,
// Handle of the background send spawned by `seal_async`, if any.
active_request: Option<JoinHandle<Result<(), SessionError>>>,
// Callback used to deliver requests.
pub seal_invoker: InvokeFunc,
// Request merged so far but not yet sent. The `Arc` is cloned into the
// spawned task so the task can pick up work that arrived while it ran.
request: Arc<Mutex<Option<CapsuleSealRequest>>>,
// Initialised to `true` by `new`; `seal_async` sends synchronously while it
// is set.
// NOTE(review): nothing in this file ever sets this to `false` — confirm
// whether that is intentional.
first: bool,
}
impl Drop for BufferedSealInternal {
    /// Flushes whatever is still outstanding via `complete` when the buffer
    /// goes away. A destructor cannot return an error, so a failure is only
    /// reported on standard output.
    fn drop(&mut self) {
        if let Err(e) = self.complete() {
            println!("error handling tail seal event: {:}", e)
        }
    }
}
impl BufferedSealInternal {
pub fn new(
configuration: configuration::Configuration,
domain_id: String,
id: String,
create_token: String,
seal_invoker: InvokeFunc,
) -> Self {
Self {
configuration,
domain_id,
id,
create_token,
active_request: None,
seal_invoker,
request: Arc::new(Mutex::new(None)),
first: true,
}
}
fn request_in_flight(&mut self) -> Result<bool, SessionError> {
if self.active_request.is_some() {
if self.active_request.as_ref().unwrap().is_finished() {
RUNTIME.block_on(async {
match self.active_request.as_mut().unwrap().await {
Ok(Ok(())) => {
self.active_request = None;
Ok(false)
}
Ok(Err(e)) => Err(e),
Err(e) => Err(SessionError::Error(format!(
"error joining active request task: {}",
e
))),
}
})
} else {
Ok(true)
}
} else {
Ok(false)
}
}
pub fn update_tags_for_seal(
&mut self,
unique: &mut Vec<TagSummaryUniqueTagsInner>,
elided: &mut Vec<TagSummaryElidedTagsInner>,
capsule_tags: &mut Vec<Tag>,
size: i64,
rows: i64,
) {
let mut request = self.request.lock().unwrap();
if request.is_none() {
*request = Some(CapsuleSealRequest {
capsule_tags: mem::take(&mut *capsule_tags),
span_tags: Box::new(TagSummary {
unique_tags: mem::take(&mut *unique),
elided_tags: mem::take(&mut *elided),
}),
size,
rows,
})
} else {
let req = request.as_mut().unwrap();
merge_unique_tags(&mut req.span_tags.unique_tags, unique);
merge_elided_tags(&mut req.span_tags.elided_tags, elided);
req.capsule_tags.append(capsule_tags);
req.size += size;
req.rows += rows;
}
}
pub fn seal(
&mut self,
unique: &mut Vec<TagSummaryUniqueTagsInner>,
elided: &mut Vec<TagSummaryElidedTagsInner>,
capsule_tags: &mut Vec<Tag>,
size: i64,
rows: i64,
) -> Result<(), SessionError> {
self.update_tags_for_seal(unique, elided, capsule_tags, size, rows);
let request = self.request.lock().unwrap().take().unwrap();
(self.seal_invoker)(
self.configuration.clone(),
self.domain_id.clone(),
self.id.clone(),
self.create_token.clone(),
request,
)
}
pub fn seal_async(
&mut self,
unique: &mut Vec<TagSummaryUniqueTagsInner>,
elided: &mut Vec<TagSummaryElidedTagsInner>,
capsule_tags: &mut Vec<Tag>,
size: i64,
rows: i64,
) -> Result<(), SessionError> {
self.update_tags_for_seal(unique, elided, capsule_tags, size, rows);
if !self.request_in_flight()? {
let request = self.request.lock().unwrap().take().unwrap();
if self.first {
(self.seal_invoker)(
self.configuration.clone(),
self.domain_id.clone(),
self.id.clone(),
self.create_token.clone(),
request,
)?;
} else {
let config = self.configuration.clone();
let domain_id = self.domain_id.clone();
let id = self.id.clone();
let create_token = self.create_token.clone();
let invoker = self.seal_invoker.clone();
let request_clone = self.request.clone();
self.active_request = Some(tokio::spawn(async move {
(invoker)(
config.clone(),
domain_id.clone(),
id.clone(),
create_token.clone(),
request,
)?;
if let Some(next_request) = request_clone.lock().unwrap().take() {
(invoker)(config, domain_id, id, create_token, next_request)?;
}
Ok(())
}));
}
}
Ok(())
}
pub fn complete(&mut self) -> Result<(), SessionError> {
if self.active_request.is_some() {
RUNTIME.block_on(async { self.active_request.as_mut().unwrap().await.unwrap() })?;
}
self.active_request = None;
let request = self.request.lock().unwrap().take();
if request.is_some() {
(self.seal_invoker)(
self.configuration.clone(),
self.domain_id.clone(),
self.id.clone(),
self.create_token.clone(),
request.unwrap(),
)?;
}
Ok(())
}
}
/// Folds the unique-tag summaries in `b` into `a`.
///
/// An entry of `b` whose `tag` equals the tag of an entry already in `a` has
/// its `occurrences` added to that entry; every other entry is cloned and
/// appended to `a`. `b` is left untouched.
///
/// `b` is taken as a slice so callers can pass a `Vec`, an array or a
/// sub-slice.
fn merge_unique_tags(a: &mut Vec<TagSummaryUniqueTagsInner>, b: &[TagSummaryUniqueTagsInner]) {
    for bi in b {
        if let Some(ai) = a.iter_mut().find(|ai| ai.tag == bi.tag) {
            ai.occurrences += bi.occurrences;
        } else {
            a.push(bi.clone());
        }
    }
}
/// Folds the elided-tag summaries in `b` into `a`.
///
/// For an entry of `b` whose `tag_name` already exists in `a`,
/// `total_occurrences` is summed and `num_unique_tags` becomes the larger of
/// the two values. Entries with a new `tag_name` are cloned and appended to
/// `a`. `b` is left untouched.
///
/// `b` is taken as a slice so callers can pass a `Vec`, an array or a
/// sub-slice.
fn merge_elided_tags(a: &mut Vec<TagSummaryElidedTagsInner>, b: &[TagSummaryElidedTagsInner]) {
    for bi in b {
        if let Some(ai) = a.iter_mut().find(|ai| ai.tag_name == bi.tag_name) {
            ai.num_unique_tags = max(ai.num_unique_tags, bi.num_unique_tags);
            ai.total_occurrences += bi.total_occurrences;
        } else {
            a.push(bi.clone());
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Invoker stand-in that accepts every request without sending anything.
    fn mock_invoke_request(
        _configuration: configuration::Configuration,
        _domain_id: String,
        _id: String,
        _create_token: String,
        _seal_request: CapsuleSealRequest,
    ) -> Result<(), SessionError> {
        Ok(())
    }

    /// Builds a `Tag` with the default type from plain string parts.
    fn tag(name: &str, value: &str, source: &str, hook_version: &str) -> Tag {
        Tag {
            name: name.to_string(),
            value: value.to_string(),
            r#type: Default::default(),
            source: source.to_string(),
            hook_version: Some(hook_version.to_string()),
        }
    }

    /// A unique-tag summary with arbitrary contents.
    fn dummy_tag() -> TagSummaryUniqueTagsInner {
        TagSummaryUniqueTagsInner {
            tag: Box::new(tag("dummy_tag", "value", "source", "1.0.0")),
            occurrences: 10,
        }
    }

    /// An empty seal request.
    fn seal_request() -> CapsuleSealRequest {
        CapsuleSealRequest {
            capsule_tags: vec![],
            span_tags: Box::new(TagSummary {
                unique_tags: vec![],
                elided_tags: vec![],
            }),
            size: 0,
            rows: 0,
        }
    }

    /// One `seal_async` followed by `complete` succeeds and leaves no
    /// background task behind.
    #[test]
    fn test_buffered_seal() {
        let mut buffered_seal = BufferedSealInternal::new(
            configuration::Configuration::default(),
            "test_domain".to_string(),
            "test_id".to_string(),
            "test_token".to_string(),
            Arc::new(mock_invoke_request),
        );
        let mut unique_tags = vec![dummy_tag()];
        let mut elided_tags = vec![];
        let mut capsule_tags: Vec<Tag> = vec![];

        buffered_seal
            .seal_async(&mut unique_tags, &mut elided_tags, &mut capsule_tags, 100, 10)
            .unwrap();

        assert!(buffered_seal.complete().is_ok());
        assert!(buffered_seal.active_request.is_none());
    }

    /// Sealing the same capsule twice reuses a single cache entry.
    #[test]
    fn test_caching_enabled() {
        let configuration = configuration::Configuration::default();
        let domain_id = "domain1".to_string();
        let id = "id1".to_string();
        let create_token = "token1".to_string();
        let mut request = seal_request();
        let mut seal_cache = SealCache::new(10, true);
        seal_cache.seal_invoker = Arc::new(mock_invoke_request);

        for _ in 0..2 {
            let outcome = seal_cache.seal(
                &configuration,
                domain_id.clone(),
                id.clone(),
                create_token.clone(),
                &mut request,
                true,
            );
            assert!(outcome.is_ok());
        }

        assert_eq!(seal_cache.cache.len(), 1);
    }

    /// A cache created with async disabled still seals successfully when the
    /// caller asks for an async seal.
    #[test]
    fn test_caching_disabled() {
        let configuration = configuration::Configuration::default();
        let mut request = seal_request();
        let mut seal_cache = SealCache::new(0, false);
        seal_cache.seal_invoker = Arc::new(mock_invoke_request);

        let outcome = seal_cache.seal(
            &configuration,
            "domain2".to_string(),
            "id2".to_string(),
            "token2".to_string(),
            &mut request,
            true,
        );

        assert!(outcome.is_ok());
        assert_eq!(seal_cache.enabled, false);
    }

    #[test]
    fn merge_into_empty() {
        let mut a: Vec<TagSummaryUniqueTagsInner> = vec![];
        let b = vec![TagSummaryUniqueTagsInner {
            tag: Box::new(tag("tag1", "value1", "source1", "1.0.0")),
            occurrences: 2,
        }];

        merge_unique_tags(&mut a, &b);

        assert_eq!(a.len(), 1);
        assert_eq!(a[0].occurrences, 2);
        assert_eq!(a[0].tag.name, "tag1");
    }

    /// Tags that share a name but differ in another field (here `source`) are
    /// expected to be appended rather than merged.
    #[test]
    fn merge_with_no_overlap() {
        let mut a = vec![TagSummaryUniqueTagsInner {
            tag: Box::new(tag("tag1", "value1", "source1", "1.0.0")),
            occurrences: 2,
        }];
        let b = vec![
            TagSummaryUniqueTagsInner {
                tag: Box::new(tag("tag1", "value1", "source2", "1.0.0")),
                occurrences: 3,
            },
            TagSummaryUniqueTagsInner {
                tag: Box::new(tag("tag2", "value2", "source2", "1.0.0")),
                occurrences: 3,
            },
        ];

        merge_unique_tags(&mut a, &b);

        assert_eq!(a.len(), 3);
        assert_eq!(a[1].occurrences, 3);
        assert_eq!(a[1].tag.name, "tag1");
    }

    #[test]
    fn merge_with_overlap() {
        let mut a = vec![TagSummaryUniqueTagsInner {
            tag: Box::new(tag("tag1", "value1", "source1", "1.0.0")),
            occurrences: 2,
        }];
        let b = vec![
            TagSummaryUniqueTagsInner {
                tag: Box::new(tag("tag1", "value1_different", "source1_different", "1.0.1")),
                occurrences: 3,
            },
            TagSummaryUniqueTagsInner {
                tag: Box::new(tag("tag1", "value1", "source1", "1.0.0")),
                occurrences: 5,
            },
        ];

        merge_unique_tags(&mut a, &b);

        assert_eq!(a.len(), 2, "One tag should be merged, the other appended");
        assert_eq!(a[0].occurrences, 7, "Occurrences should be summed");
    }

    #[test]
    fn merge_elided_into_empty() {
        let mut a: Vec<TagSummaryElidedTagsInner> = vec![];
        let b = vec![TagSummaryElidedTagsInner {
            tag_name: "tag1".to_string(),
            num_unique_tags: 2,
            total_occurrences: 5,
        }];

        merge_elided_tags(&mut a, &b);

        assert_eq!(a.len(), 1);
        assert_eq!(a[0].num_unique_tags, 2);
        assert_eq!(a[0].total_occurrences, 5);
        assert_eq!(a[0].tag_name, "tag1");
    }

    #[test]
    fn merge_elided_with_no_overlap() {
        let mut a = vec![TagSummaryElidedTagsInner {
            tag_name: "tag1".to_string(),
            num_unique_tags: 2,
            total_occurrences: 5,
        }];
        let b = vec![TagSummaryElidedTagsInner {
            tag_name: "tag2".to_string(),
            num_unique_tags: 3,
            total_occurrences: 7,
        }];

        merge_elided_tags(&mut a, &b);

        assert_eq!(a.len(), 2);
        assert_eq!(a[1].num_unique_tags, 3);
        assert_eq!(a[1].total_occurrences, 7);
        assert_eq!(a[1].tag_name, "tag2");
    }

    #[test]
    fn merge_elided_with_overlap() {
        let mut a = vec![TagSummaryElidedTagsInner {
            tag_name: "tag1".to_string(),
            num_unique_tags: 2,
            total_occurrences: 5,
        }];
        let b = vec![TagSummaryElidedTagsInner {
            tag_name: "tag1".to_string(),
            num_unique_tags: 1,
            total_occurrences: 3,
        }];

        merge_elided_tags(&mut a, &b);

        assert_eq!(a.len(), 1, "Tags should be merged, not appended");
        assert_eq!(
            a[0].num_unique_tags, 2,
            "Num unique tags should report the largest set"
        );
        assert_eq!(
            a[0].total_occurrences, 8,
            "Total occurrences should be summed"
        );
    }
}