1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
use crate::capsule::classifier::ProcessHooks;
// use crate::capsule::classifier_reader::ProcessHooksV2;
use crate::capsule::common::{CapsuleError, CapsuleTag, HookInfo, SpanTag, TagType};
use crate::session::{spans_to_byte_idx, Column, DataHookInvoker};
use antimatter_api::apis::configuration;
use antimatter_api::models::{
DataTaggingHookInput, DataTaggingHookInputRecordsInner,
DataTaggingHookInputRecordsInnerElementsInner, Tag, WriteContextConfigInfoRequiredHooksInner,
};
use std::sync::{Arc, Mutex};
/// HookProcessor is an internal type responsible for invoking its
/// configured hooks and tracking results over a number of invocations.
/// In particular, it tracks all capsule and span tags observed over its
/// lifetime and exposes the invoked hook versions for inclusion in the
/// capsule footer.
/// HookProcessor is an internal type that runs a fixed set of hooks over
/// successive inputs and accumulates what they report.
///
/// Across its lifetime it collects every capsule tag and span tag that has
/// been observed, and it records which hook versions were invoked so that
/// they can be written into the capsule footer.
pub struct HookProcessor<I: DataHookInvoker> {
    /// Domain that hook requests are submitted on behalf of.
    domain_id: String,
    /// Optional write context name passed along with each hook request.
    write_context_name: Option<String>,
    /// Configuration for the API client used when invoking hooks.
    config: configuration::Configuration,
    /// Hooks invoked for every input. Data-structure-classifier hooks are
    /// handled by the constructor and are therefore not stored here.
    hooks: Vec<WriteContextConfigInfoRequiredHooksInner>,
    /// Invoker that performs the actual hook calls.
    hook_invoker: Arc<I>,
    /// Tags for the whole column, including any produced by
    /// data-structure-classifier hooks. They cover the entire input of a
    /// data row, so `append_column_tags` copies them into
    /// `collated_span_tags` once per processed input.
    column_tags: Vec<CapsuleTag>,
    /// Capsule tags seen so far: seeded from the constructor's
    /// `capsule_tags` argument and extended by every `get_span_tags` call.
    pub collated_capsule_tags: Mutex<Vec<Tag>>,
    /// Span tags seen so far: extended by `get_span_tags` with the tags it
    /// returns and by `append_column_tags` with one entry per column tag.
    pub collated_span_tags: Mutex<Vec<SpanTag>>,
    /// Hooks invoked so far: seeded by the constructor for applied
    /// data-structure-classifier hooks, then one entry per hook per
    /// `get_span_tags` call.
    pub hook_info: Mutex<Vec<HookInfo>>,
}
impl<I: DataHookInvoker> HookProcessor<I> {
    /// Constructs and returns a `HookProcessor`.
    ///
    /// Data-structure-classifier hooks are applied immediately, not stored.
    /// Unless `column.skip_classification` is set, each one adds a
    /// `tag.antimatter.io/key` tag to `column.tags` (value: the column name)
    /// and records a `HookInfo` entry with version "1.0.0". Every other hook
    /// in `user_hooks` is kept and invoked later, per input, by
    /// `get_span_tags`.
    ///
    /// Afterwards `column.tags` is deduplicated in place. A copy of the
    /// result becomes this processor's column tags.
    ///
    /// * `domain_id` / `write_context_name` / `config` - passed to every hook
    ///   invocation.
    /// * `column` - the column being processed; its `tags` may be extended
    ///   and deduplicated.
    /// * `user_hooks` - hooks configured for the write context.
    /// * `hook_invoker` - performs the hook calls.
    /// * `capsule_tags` - initial contents of `collated_capsule_tags`.
    pub fn new(
        domain_id: String,
        write_context_name: Option<String>,
        config: configuration::Configuration,
        column: &mut Column,
        user_hooks: &[WriteContextConfigInfoRequiredHooksInner],
        hook_invoker: Arc<I>,
        capsule_tags: &[Tag],
    ) -> Self {
        // Column tags are needed up front, so data-structure-classifier hooks
        // are a special case handled here. `column` is mutable so that the
        // classifier's tag can be added before any other hook is invoked.
        // After that, a copy of the column tags is stored immutably.
        let mut hook_info = Vec::<HookInfo>::new();
        let mut hooks: Vec<WriteContextConfigInfoRequiredHooksInner> =
            Vec::with_capacity(user_hooks.len());
        for hook in user_hooks {
            if hook.hook.as_str() != "data-structure-classifier" {
                hooks.push(hook.clone());
                continue;
            }
            if column.skip_classification {
                continue;
            }
            hook_info.push(HookInfo {
                name: hook.hook.clone(),
                version: "1.0.0".to_string(),
            });
            column.tags.push(CapsuleTag {
                name: "tag.antimatter.io/key".to_string(),
                tag_type: TagType::Str,
                value: column.name.clone(),
                source: hook.hook.clone(),
                hook_version: (1, 0, 0),
            });
        }

        // Deduplicate column tags, keeping the first occurrence.
        // NOTE(review): the key is (name, source, hook_version) only. Tags that
        // differ solely in `value` or `tag_type` are dropped as duplicates;
        // confirm this is intended.
        let mut unique_column_tags: Vec<CapsuleTag> = Vec::with_capacity(column.tags.len());
        for column_tag in column.tags.drain(..) {
            let is_duplicate = unique_column_tags.iter().any(|t| {
                t.name == column_tag.name
                    && t.source == column_tag.source
                    && t.hook_version == column_tag.hook_version
            });
            if !is_duplicate {
                unique_column_tags.push(column_tag);
            }
        }
        column.tags = unique_column_tags;

        Self {
            domain_id,
            write_context_name,
            config,
            hooks,
            hook_invoker,
            column_tags: column.tags.clone(),
            collated_span_tags: Mutex::new(Vec::new()),
            collated_capsule_tags: Mutex::new(capsule_tags.to_owned()),
            hook_info: Mutex::new(hook_info),
        }
    }

    /// Appends one span tag per stored column tag to `collated_span_tags`.
    ///
    /// Column tags must be counted once per input row. `get_span_tags` may be
    /// called more than once for a row, so the caller is responsible for
    /// calling this exactly once after a column's input has been processed.
    /// This keeps the span-tag accounting correct.
    pub fn append_column_tags(&self) {
        // Take the lock once so this row's column tags are appended as one
        // contiguous run, rather than re-locking (and allowing interleaving)
        // per tag.
        let mut collated = self.collated_span_tags.lock().unwrap();
        collated.extend(self.column_tags.iter().map(|tag| SpanTag {
            // Column tags apply to the entire input, so start and end are
            // not meaningful here and are left as 0.
            start: 0,
            end: 0,
            tag: tag.clone(),
        }));
    }
}
impl<I: DataHookInvoker> ProcessHooks for HookProcessor<I> {
/// get_span_tags implements ProcessHooks to make HookProcessor
/// suitable for use with the capsule module's ClassifyingReader.
fn get_span_tags(&self, content: &[u8], path: &str) -> Result<Vec<SpanTag>, CapsuleError> {
if self.hooks.is_empty() {
return Ok(Vec::new());
}
// TODO: (performance) some way not to copy content and path?
let input = DataTaggingHookInput {
records: vec![DataTaggingHookInputRecordsInner {
elements: vec![DataTaggingHookInputRecordsInnerElementsInner {
// TODO: (performance) use from_utf8_unchecked?
content: String::from_utf8(content.to_vec()).map_err(|e| {
CapsuleError::Generic(format!("converting input to UTF-8 string: {}", e))
})?,
path: path.to_string(),
}],
}],
};
let mut result = Vec::<SpanTag>::new();
for hook in &self.hooks {
let mut attempts = 0;
let max_attempts = 3;
let mut resp = loop {
match self.hook_invoker.invoke_hook(
&self.config,
&self.domain_id,
self.write_context_name.as_deref(),
hook.hook.as_str(),
// TODO: (performance) clone for each loop iteration
input.clone(),
) {
Ok(response) => break Ok(response),
Err(e) => {
attempts += 1;
if attempts >= max_attempts {
break Err(e);
}
}
}
}
.map_err(|e| CapsuleError::Generic(format!("hook invoke error: {}", e)))?;
let tag_set = &mut resp.records[0].elements[0];
for tag in &tag_set.span_tags {
let mut converted = SpanTag::from_api_span_inner(tag)?;
spans_to_byte_idx(content, &mut converted).map_err(|e| {
CapsuleError::Generic(format!("error converting spans to byte index: {}", e))
})?;
result.extend(converted);
}
// TODO: what about duplicated capsule tags?
self.collated_capsule_tags
.lock()
.unwrap()
.extend(std::mem::take(&mut tag_set.capsule_tags));
self.hook_info.lock().unwrap().push(HookInfo {
name: hook.hook.clone(),
version: resp.version,
});
}
self.collated_span_tags
.lock()
.unwrap()
.extend(result.clone());
Ok(result)
}
fn has_classification_hooks(&self) -> bool {
!self.hooks.is_empty()
}
}