1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
use crate::capsule::capsule::*;
use crate::capsule::common::*;
use crate::capsule::{CellIterator, RowIterator};
use crate::session::policy_engine::*;
use crate::session::{process_tags_to_unique_elided, RUNTIME};
use antimatter_api::apis::configuration::Configuration;
use antimatter_api::apis::internal_api::{self as api};
use antimatter_api::models::new_access_log_entry::Operation;
use antimatter_api::models::*;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::{io::Read, ops::DerefMut};
/// ProcessMetadata accumulates information gathered while the rows of a
/// capsule are read. It is summarised into the read access-log entry that
/// is emitted once the capsule has been fully iterated.
#[derive(Clone, Default)]
pub struct ProcessMetadata {
    /// Count of records reported as returned in the read log.
    pub allowed_records: i32,
    /// Count of records dropped because of a DenyRecord policy decision.
    pub filtered_records: i32,
    /// Span tags whose span decision was Allow or NoMatch.
    pub allowed_span_tags: Vec<SpanTag>,
    /// Span tags whose span decision was Redact.
    pub redacted_span_tags: Vec<SpanTag>,
    /// Span tags whose span decision was Tokenize.
    // TODO: tokenization doesn't work yet; this field is plumbing for it.
    pub tokenized_span_tags: Vec<SpanTag>,
}
impl ProcessMetadata {
    /// Creates empty metadata: both record counters start at zero and all
    /// three span-tag lists start empty (identical to the derived `Default`).
    fn new() -> Self {
        Self::default()
    }
}
/// SessionCapsuleCellIterator walks the cells of a single row of a
/// `SessionCapsule`, applying policy to each cell; it implements
/// `CellIterator`.
pub struct SessionCapsuleCellIterator {
    /// policy engine consulted for every cell's policy decision
    engine: Arc<Mutex<PolicyEngine>>,
    /// additional tags that force redaction of any span carrying them,
    /// regardless of the policy engine's decision
    redact_tags: Vec<CapsuleTag>,
    /// read parameters passed to span generation for policy evaluation
    read_parameters: HashMap<String, String>,
    /// capsule-level tags passed to span generation for policy evaluation
    capsule_tags: Vec<CapsuleTag>,
    /// column definitions of the capsule, looked up by the same index as `cells`
    columns: Vec<Column>,
    /// the cells that make up this row
    cells: Vec<DataElement>,
    /// shared read metadata, updated as cells are processed
    meta: Arc<Mutex<ProcessMetadata>>,
    /// set once a DenyRecord decision has been reached for this row
    is_deny_record: bool,
    /// span tags of the cells returned so far, one vector per cell
    span_tags: Vec<Vec<SpanTag>>,
    /// index of the next cell to return
    current_cell: usize,
}
impl CellIterator for SessionCapsuleCellIterator {
    /// is_deny_record returns true if, during iteration of the cells in
    /// this row, a DenyRecord decision was reached. Callers should discard
    /// the row if this value is true upon return.
    fn is_deny_record(&self) -> bool {
        self.is_deny_record
    }

    /// span_tags returns the span tags collected so far for this row: one
    /// vector of SpanTag per cell that has been successfully returned.
    fn span_tags(&self) -> Vec<Vec<SpanTag>> {
        self.span_tags.clone()
    }

    /// next_cell returns the next cell data in the row after applying
    /// policy modifications (e.g. redaction).
    ///
    /// Each span tag of the cell is recorded in the shared metadata under
    /// the bucket matching its span decision. When the last cell of the row
    /// is rendered without a deny decision, the row is counted in
    /// `allowed_records` (reported as `returned_records` in the read log).
    ///
    /// # Errors
    /// * `EndOfCapsule` once every cell in the row has been returned.
    /// * `CapsuleAccessDeniedByPolicy` if policy denies the whole capsule.
    /// * `RowAccessDeniedByPolicy` if policy denies this row. The row is
    ///   counted in `filtered_records` exactly once; every later call
    ///   returns the same error without re-evaluating policy.
    /// * `Generic` if no column exists for the cell, policy enforcement
    ///   fails, or a span decision is not Allow/NoMatch/Redact/Tokenize.
    fn next_cell(&mut self) -> Result<Box<dyn Read + Send + 'static>, CapsuleError> {
        // A denied row stays denied: re-running policy on the same cell would
        // count the row as filtered again every time the caller retries.
        if self.is_deny_record {
            return Err(CapsuleError::RowAccessDeniedByPolicy);
        }
        if self.current_cell >= self.cells.len() {
            return Err(CapsuleError::EndOfCapsule);
        }
        let cell = &self.cells[self.current_cell];
        let col = self.columns.get(self.current_cell).ok_or_else(|| {
            CapsuleError::Generic(format!("no column at index {}", self.current_cell))
        })?;
        let spans = generate_spans(
            &cell.tags,
            &col.tags,
            &self.read_parameters,
            &self.capsule_tags,
            &HashMap::new(),
            cell.data.len(),
        );
        let (rendered_data, adjusted_span_tags, decision) = RUNTIME
            .block_on(async {
                enforce_policies(
                    &cell.data,
                    spans,
                    &self.redact_tags,
                    self.engine.lock().unwrap().deref_mut(),
                )
                .await
            })
            .map_err(|e| CapsuleError::Generic(format!("failed enforcing policies: {}", e)))?;
        if decision == PolicyDecision::DenyCapsule {
            return Err(CapsuleError::CapsuleAccessDeniedByPolicy);
        } else if decision == PolicyDecision::DenyRecord {
            self.meta.lock().unwrap().filtered_records += 1;
            self.is_deny_record = true;
            return Err(CapsuleError::RowAccessDeniedByPolicy);
        }

        // Deny decisions return early above, so reaching the end of the row
        // here means the whole row was returned to the caller.
        let is_last_cell = self.current_cell + 1 == self.cells.len();
        let mut span_tags: Vec<SpanTag> = Vec::new();
        {
            // Lock the shared metadata once per cell instead of once per tag.
            let mut meta = self.meta.lock().unwrap();
            for (span_decision, tag) in adjusted_span_tags.iter() {
                let bucket = match span_decision {
                    PolicyDecision::NoMatch | PolicyDecision::Allow => {
                        &mut meta.allowed_span_tags
                    }
                    PolicyDecision::Redact => &mut meta.redacted_span_tags,
                    PolicyDecision::Tokenize => &mut meta.tokenized_span_tags,
                    _ => {
                        return Err(CapsuleError::Generic(format!(
                            "unrecognized span decision {:?}",
                            span_decision
                        )))
                    }
                };
                bucket.push(tag.clone());
                span_tags.push(tag.clone());
            }
            if is_last_cell {
                meta.allowed_records += 1;
            }
        }
        self.span_tags.push(span_tags);
        self.current_cell += 1;
        Ok(Box::new(std::io::Cursor::new(rendered_data)))
    }

    /// cleanup releases per-row resources; there are none to release here.
    fn cleanup(&mut self) -> Result<(), CapsuleError> {
        Ok(())
    }
}
/// SessionCapsule is an opened V1 capsule bundle that is read row by row,
/// with policy applied to every cell.
#[derive(Clone)]
pub struct SessionCapsule {
    /// the domain the capsule bundle belongs to
    pub domain_id: String,
    /// API client configuration, used when sending read access logs
    pub config: Configuration,
    /// the capsules in the bundle, each paired with the cached policy engine
    /// used to make policy decisions for that capsule
    pub capsules: Vec<(Arc<Mutex<PolicyEngine>>, Capsule)>,
    /// the extra field given at capsule creation time as part of the
    /// EncapsulateConfig
    pub extra: String,
    /// parameters used when evaluating policy
    read_parameters: HashMap<String, String>,
    /// error messages from capsules in the bundle that could not be opened
    open_failures: Vec<String>,
    /// read metadata for the capsule currently being iterated; shared with
    /// the cell iterators created for its rows
    meta: Arc<Mutex<ProcessMetadata>>,
    /// index into `capsules` of the capsule currently being iterated
    current_capsule: usize,
    /// number of rows of the current capsule handed out so far
    current_row: usize,
}
impl RowIterator for SessionCapsule {
fn domain_id(&self) -> String {
self.domain_id.clone()
}
fn extra_data(&self) -> String {
self.extra.clone()
}
fn capsule_ids(&self) -> Vec<String> {
self.capsules
.iter()
.map(|(_, c)| c.header.capsule_id.clone())
.collect()
}
fn capsule_tags(&self) -> Vec<CapsuleTag> {
let mut result: Vec<CapsuleTag> = vec![];
for (_, capsule) in &self.capsules {
result.extend(capsule.body.capsule_tags.clone());
}
result
}
fn columns(&self) -> Vec<Column> {
let (_, capsule) = &self.capsules[0];
capsule.body.columns.clone()
}
fn open_failures(&self) -> Vec<String> {
self.open_failures.clone()
}
/// next_row returns a CellIterator for the next row in the current
/// capsule, or for the first row in the next capsule if the current
/// capsule has already been iterated over. If there are no more rows
/// and capsules to iterate, it returns Err(EndOfCapsule). After a
/// capsule has been iterated over, if read logging is not disabled,
/// it sends a read log event to the server. The argument redact_tags
/// is an additional set of tags for redaction: if any of these tags
/// appears in a span, that span will be redacted regardless of the
/// policy decision made by the policy engine.
fn next_row(
&mut self,
redact_tags: Vec<CapsuleTag>,
) -> Result<Box<dyn CellIterator + 'static>, CapsuleError> {
let (engine, capsule) = &self.capsules[self.current_capsule];
if self.current_row >= capsule.body.rows.len() {
if !capsule.body.disable_read_logging {
let (allowed_unique_tags, allowed_elided_tags) = process_tags_to_unique_elided(
self.meta.lock().unwrap().allowed_span_tags.clone(),
);
let (redacted_unique_tags, redacted_elided_tags) = process_tags_to_unique_elided(
self.meta.lock().unwrap().redacted_span_tags.clone(),
);
let (tokenized_unique_tags, tokenized_elided_tags) = process_tags_to_unique_elided(
self.meta.lock().unwrap().tokenized_span_tags.clone(),
);
let request = AddCapsuleLogEntryRequest {
entry: Box::new(NewAccessLogEntry {
operation: Operation::Read,
location: None,
read_info: Box::new(NewAccessLogEntryReadInfo {
parameters: self.read_parameters.clone(),
allowed_tags: Box::new(TagSummary {
unique_tags: allowed_unique_tags,
elided_tags: allowed_elided_tags,
}),
redacted_tags: Box::new(TagSummary {
unique_tags: redacted_unique_tags,
elided_tags: redacted_elided_tags,
}),
tokenized_tags: Box::new(TagSummary {
unique_tags: tokenized_unique_tags,
elided_tags: tokenized_elided_tags,
}),
returned_records: self.meta.lock().unwrap().allowed_records,
filtered_records: self.meta.lock().unwrap().filtered_records,
}),
}),
};
RUNTIME
.block_on(api::domain_add_access_log_entry(
&self.config,
&capsule.header.domain_id,
&capsule.header.capsule_id,
&capsule.body.open_token,
request,
))
.map_err(|e| {
CapsuleError::Generic(format!("failed to log access operation: {}", e))
})?;
}
self.current_capsule += 1;
self.current_row = 0;
self.meta = Arc::new(Mutex::new(ProcessMetadata::new()));
}
if self.current_capsule >= self.capsules.len() {
return Err(CapsuleError::EndOfCapsule);
}
self.current_row += 1;
Ok(Box::new(SessionCapsuleCellIterator {
redact_tags: redact_tags.to_vec(),
columns: capsule.body.columns.clone(),
engine: engine.clone(),
cells: capsule.body.rows[self.current_row - 1].clone(),
read_parameters: self.read_parameters.clone(),
capsule_tags: capsule.body.capsule_tags.clone(),
meta: self.meta.clone(),
is_deny_record: false,
span_tags: Vec::new(),
current_cell: 0,
}))
}
}
impl SessionCapsule {
    /// Builds a `SessionCapsule` for reading rows from, and applying policy
    /// to, the capsules of a capsule bundle.
    ///
    /// # Arguments
    /// * `domain_id` - The domain the capsule bundle belongs to.
    /// * `config` - API client configuration, used when sending read access logs.
    /// * `capsules` - The capsules in the bundle, each paired with the policy
    ///   engine used to make policy decisions for it.
    /// * `extra` - The extra data specified at capsule creation time.
    /// * `read_parameters` - Parameters used when evaluating policy.
    /// * `open_failures` - Error messages from capsules in the bundle that
    ///   could not be opened.
    ///
    /// # Returns
    /// A [`SessionCapsule`] positioned before the first row of the first
    /// capsule, with empty read metadata.
    pub fn new(
        domain_id: String,
        config: Configuration,
        capsules: Vec<(Arc<Mutex<PolicyEngine>>, Capsule)>,
        extra: String,
        read_parameters: HashMap<String, String>,
        open_failures: Vec<String>,
    ) -> Self {
        let meta = Arc::new(Mutex::new(ProcessMetadata::default()));
        Self {
            current_capsule: 0,
            current_row: 0,
            meta,
            domain_id,
            config,
            capsules,
            extra,
            read_parameters,
            open_failures,
        }
    }
}