use crate::capsule::aead::deserialize_ciphertext;
use crate::capsule::bundle_v2::{AccessLogSender, CapsuleOpener};
use crate::capsule::classifier::ClassifyingReader;
use crate::capsule::common::{
CapsuleError, CapsuleHeader, CapsuleTag, Column, HookInfo, RowReader, SpanTag, KEY_SIZE,
NONCE_SIZE,
};
use crate::capsule::framer::{CellDecoder, CellFrameWriter};
use crate::capsule::policy_enforcer::PolicyEnforcer;
use crate::capsule::streaming_aead::{DecryptingAEAD, EncryptingAEADReader, EncryptingAEADWriter};
use crate::capsule::util_readers::{
EOFCallbackReader, LazyEvaluatingReader, MutexCellIterator, MutexReader,
};
use crate::capsule::{CellIterator, RowIterator};
use crate::session::hook_processor::HookProcessor;
use crate::session::policy_engine::PolicyEngine;
use crate::session::session::CapsuleWriter;
use crate::session::{process_tags_to_unique_elided, DataTagger};
use antimatter_api::models::{
CapsuleOpenRequest, CapsuleSealRequest, NewAccessLogEntry, Tag, TagSummary,
};
use serde_tuple::{Deserialize_tuple, Serialize_tuple};
use std::collections::HashMap;
use std::io::{Cursor, Write};
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::{io, io::Read, marker::Send, ops::DerefMut};
pub struct MutexGuardWriter<'a, W: Write>(pub MutexGuard<'a, W>);
impl<'a, W: Write> Write for MutexGuardWriter<'a, W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.0.flush()
}
}
#[derive(Serialize_tuple, Deserialize_tuple, PartialEq)]
pub struct V2CapsuleHeader {
pub columns: Vec<Column>,
pub extra: String,
#[serde(skip)]
pub open_token: String,
#[serde(skip)]
pub disable_read_logging: bool,
}
#[derive(Serialize_tuple, Deserialize_tuple)]
pub struct V2CapsuleFooter {
pub hook_info: Vec<HookInfo>,
}
pub struct SealedV2Capsule<R, P>
where
R: Read + Send + 'static,
P: PolicyEnforcer + 'static,
{
domain_id: String,
extra: String,
capsule_ids: Vec<String>,
capsule_tags: Vec<CapsuleTag>,
columns: Vec<Column>,
reader: Arc<Mutex<DecryptingAEAD<R>>>,
enforcer: Arc<Mutex<P>>,
access_log_sender: Arc<Mutex<dyn Fn(NewAccessLogEntry) -> Result<(), CapsuleError> + Send>>,
footer: Option<V2CapsuleFooter>,
current_cell_iterator: Option<Arc<Mutex<CellDecoder<DecryptingAEAD<R>, P>>>>,
}
impl<R, P> SealedV2Capsule<R, P>
where
R: Read + Send + 'static,
P: PolicyEnforcer + 'static,
{
pub fn from_reader(
input: Arc<Mutex<R>>,
domain_id: &str,
read_context: &str,
access_log_sender: AccessLogSender,
open_capsule: Arc<Mutex<CapsuleOpener>>,
read_params: HashMap<String, String>,
domain_identity_params: HashMap<String, String>,
) -> Result<Self, CapsuleError> {
let header = CapsuleHeader::from_reader(&mut MutexReader {
reader: input.clone(),
})?;
let resolved_read_context = if header.domain_id != domain_id {
format!("{}::{}", domain_id, read_context)
} else {
read_context.to_string()
};
let result = (open_capsule.lock().unwrap())(
&header.domain_id,
&header.capsule_id,
&resolved_read_context,
&header.disaster_recovery_token,
CapsuleOpenRequest {
encrypted_dek: header.encrypted_dek.clone(),
key_id: header.key_id as i64,
read_parameters: crate::session::convert_to_option_vec(&read_params),
},
)?;
let Some((opened_capsule_resp, policy_engine)) = result else {
return Err(CapsuleError::InsufficientPermissions(format!(
"insufficient permissions to open capsule {}",
header.capsule_id
)));
};
let dek = deserialize_ciphertext(&opened_capsule_resp.decryption_key)
.map_err(|e| CapsuleError::Generic(format!("failed to deserialize key: {}", e)))?;
let header_clone = header.clone();
Self::new(
header.domain_id,
header.capsule_id,
dek.ciphertext
.try_into()
.map_err(|_| CapsuleError::Generic("error converting dek to array".to_string()))?,
input,
Arc::new(Mutex::new(move |entry| {
if !opened_capsule_resp
.read_context_configuration
.disable_read_logging
{
(access_log_sender.lock().unwrap())(
entry,
&header_clone.domain_id,
&header_clone.capsule_id,
&opened_capsule_resp.open_token,
)
} else {
Ok(())
}
})),
policy_engine,
opened_capsule_resp
.capsule_tags
.iter()
.map(CapsuleTag::from_tag)
.collect::<Result<Vec<_>, _>>()
.map_err(|e| {
CapsuleError::Generic(format!("converting API tag to CapsuleTag: {}", e))
})?,
read_params,
domain_identity_params,
)
}
fn new(
domain_id: String,
capsule_id: String,
key: [u8; KEY_SIZE],
input: Arc<Mutex<R>>,
access_log_sender: Arc<Mutex<dyn Fn(NewAccessLogEntry) -> Result<(), CapsuleError> + Send>>,
policy_engine: Option<Arc<Mutex<PolicyEngine>>>,
capsule_tags: Vec<CapsuleTag>,
read_parameters: HashMap<String, String>,
domain_identity_parameters: HashMap<String, String>,
) -> Result<Self, CapsuleError>
where
P: PolicyEnforcer,
{
let mut decrypt = DecryptingAEAD::new(&key, input)?;
let secret_header: V2CapsuleHeader = ciborium::from_reader(&mut decrypt)
.map_err(|e| CapsuleError::Generic(format!("decoding V2CapsuleHeader: {}", e)))?;
let enforcer = Arc::new(Mutex::new(P::init_enforcer(
policy_engine,
capsule_tags.clone(),
secret_header
.columns
.iter()
.map(|column| column.tags.clone())
.collect(),
read_parameters,
domain_identity_parameters,
)?));
Ok(Self {
domain_id,
extra: secret_header.extra,
capsule_ids: vec![capsule_id],
capsule_tags: capsule_tags,
reader: Arc::new(Mutex::new(decrypt)),
enforcer,
columns: secret_header.columns,
access_log_sender,
footer: None,
current_cell_iterator: None,
})
}
}
impl<R, P> RowIterator for SealedV2Capsule<R, P>
where
R: Read + Send + 'static,
P: PolicyEnforcer + 'static,
{
fn next_row(
&mut self,
redact_tags: Vec<CapsuleTag>,
) -> Result<Box<dyn CellIterator + 'static>, CapsuleError> {
if self.current_cell_iterator.is_some() {
if self
.current_cell_iterator
.as_ref()
.unwrap()
.lock()
.unwrap()
.end_of_file
{
return Err(CapsuleError::EndOfCapsule);
}
}
let decoder = Arc::new(Mutex::new(CellDecoder::new(
self.reader.clone(),
Some(self.enforcer.clone()),
redact_tags,
)?));
self.current_cell_iterator = Some(decoder.clone());
Ok(Box::new(MutexCellIterator {
it: decoder.clone(),
}))
}
fn for_each_row(
&mut self,
redact_tags: &[CapsuleTag],
f: &mut dyn FnMut(&mut dyn CellIterator) -> Result<(), CapsuleError>,
) -> Result<(), CapsuleError> {
let mut allowed_records: usize = 0;
let mut filtered_records: usize = 0;
loop {
match self.next_row(redact_tags.to_vec()) {
Err(CapsuleError::EndOfCapsule) => break,
Err(CapsuleError::RowAccessDeniedByPolicy) => {
filtered_records += 1;
}
Err(e) => return Err(e),
Ok(mut cell_iterator) => {
allowed_records += 1;
f(&mut *cell_iterator)?;
}
}
}
(self.access_log_sender.lock().unwrap())(
self.enforcer
.lock()
.unwrap()
.access_log_entry(allowed_records, filtered_records),
)?;
self.footer = ciborium::from_reader(self.reader.lock().unwrap().deref_mut())
.map_err(|e| CapsuleError::Generic(format!("reading capsule footer: {}", e)))?;
Ok(())
}
fn domain_id(&self) -> String {
self.domain_id.clone()
}
fn extra_data(&self) -> String {
self.extra.clone()
}
fn capsule_ids(&self) -> Vec<String> {
self.capsule_ids.clone()
}
fn capsule_tags(&self) -> Vec<CapsuleTag> {
self.capsule_tags.clone()
}
fn columns(&self) -> Vec<Column> {
self.columns.clone()
}
fn open_failures(&self) -> Vec<String> {
Vec::new()
}
}
pub fn sealed_capsule_v2_reader<'a, F>(
nonce_block: [u8; NONCE_SIZE],
dek: Vec<u8>,
extra: String,
columns: Vec<Column>,
hook_processors: Vec<Arc<RwLock<HookProcessor<DataTagger>>>>, data: Vec<RowReader>, seal: F,
) -> Result<impl std::io::Read + Send + 'a, CapsuleError>
where
F: Fn(CapsuleSealRequest) -> Result<(), std::io::Error> + Send + 'a,
{
let row_count = data.len();
let mut row_tags: Vec<Tag> = Vec::new();
for row in data.iter() {
for tag in row.tags.iter() {
row_tags.push(Tag::from(tag.clone()))
}
}
let reader = ClassifyingReader::new(data, columns.clone(), hook_processors.clone());
let hook_processors_copy = hook_processors.clone();
let framer = LazyEvaluatingReader::new(reader, move || {
let mut hook_info = Vec::<HookInfo>::new();
for processor in &hook_processors_copy {
for info in processor.read().unwrap().hook_info.lock().unwrap().iter() {
if !hook_info.contains(info) {
hook_info.push(info.clone());
}
}
}
Ok(V2CapsuleFooter { hook_info })
});
let mut secret_header: Vec<u8> = Vec::new();
ciborium::into_writer(
&V2CapsuleHeader {
columns,
extra,
disable_read_logging: false, open_token: "".to_string(), },
&mut secret_header,
)
.map_err(|e| CapsuleError::Generic(format!("serializing V2CapsuleHeader: {}", e)))?;
let cipher = EncryptingAEADReader::new(
nonce_block,
&dek.try_into()
.map_err(|_| CapsuleError::Generic("error converting dek to array".to_string()))?,
Cursor::new(secret_header).chain(framer),
)?;
Ok(EOFCallbackReader::new(cipher, move |bytes_read| {
let mut capsule_tags: Vec<Tag> = Vec::new();
let mut span_tags: Vec<SpanTag> = Vec::new();
for processor in &hook_processors {
capsule_tags.append(
&mut processor
.read()
.unwrap()
.collated_capsule_tags
.lock()
.unwrap(),
);
span_tags.append(&mut processor.read().unwrap().collated_span_tags.lock().unwrap());
}
for tag in &row_tags {
capsule_tags.push(tag.clone());
}
let mut unique_capsule_tags: Vec<Tag> = Vec::new();
for capsule_tag in capsule_tags.drain(..) {
if !unique_capsule_tags.iter().any(|t: &Tag| {
t.name == capsule_tag.name
&& t.source == capsule_tag.source
&& t.hook_version == capsule_tag.hook_version
}) {
unique_capsule_tags.push(capsule_tag);
}
}
let (unique, elided) = process_tags_to_unique_elided(span_tags);
seal(CapsuleSealRequest {
capsule_tags: unique_capsule_tags,
span_tags: Box::new(TagSummary {
unique_tags: unique,
elided_tags: elided,
}),
size: bytes_read as i64,
rows: row_count as i64,
})
}))
}
pub struct DefaultCapsuleWriter<W: Write, F: FnMut(CapsuleSealRequest) -> Result<(), io::Error>> {
writer: CellFrameWriter<EncryptingAEADWriter<W>>,
columns: Vec<Column>,
hook_processors: Vec<Arc<RwLock<HookProcessor<DataTagger>>>>, seal: F,
bytes_written: u64,
rows: usize,
finalized: bool,
}
impl<W: Write, F: FnMut(CapsuleSealRequest) -> Result<(), std::io::Error>>
DefaultCapsuleWriter<W, F>
{
pub fn new(
output: Arc<Mutex<W>>,
nonce_block: [u8; NONCE_SIZE],
dek: Vec<u8>,
extra: String,
columns: Vec<Column>,
hook_processors: Vec<Arc<RwLock<HookProcessor<DataTagger>>>>, seal: F,
) -> Result<Self, CapsuleError> {
let mut cipher = EncryptingAEADWriter::new(
nonce_block,
&dek.try_into()
.map_err(|_| CapsuleError::Generic("error converting dek to array".to_string()))?,
output,
)?;
ciborium::into_writer(
&V2CapsuleHeader {
columns: columns.clone(),
extra,
disable_read_logging: false, open_token: "".to_string(), },
&mut cipher,
)
.map_err(|e| CapsuleError::Generic(format!("serializing V2CapsuleHeader: {}", e)))?;
Ok(Self {
writer: CellFrameWriter::new(cipher)?,
columns,
hook_processors,
seal,
bytes_written: 0,
rows: 0,
finalized: false,
})
}
}
impl<W, F> CapsuleWriter for DefaultCapsuleWriter<W, F>
where
W: Write,
F: FnMut(CapsuleSealRequest) -> Result<(), std::io::Error>,
{
fn add_rows(&mut self, data: Vec<RowReader>) -> Result<(), CapsuleError> {
if data.is_empty() {
return Ok(());
}
for (idx, row) in data.iter().enumerate() {
if row.cells.len() != self.columns.len() {
return Err(CapsuleError::CapsuleUpdateError(format!(
"dimension error: row[{}] has length: {}, expected: {}",
idx,
row.cells.len(),
self.columns.len()
)));
}
}
let row_count = data.len();
let reader =
ClassifyingReader::new(data, self.columns.clone(), self.hook_processors.clone());
self.rows += row_count;
self.bytes_written += self
.writer
.write_stream(reader)
.map_err(|e| CapsuleError::CapsuleUpdateError(format!("failed to add rows: {}", e)))?;
Ok(())
}
fn finalize(&mut self) -> Result<(), CapsuleError> {
self.writer
.flush_rows(true)
.map_err(|e| CapsuleError::CapsuleUpdateError(format!("failed to flush: {}", e)))?;
let mut capsule_tags: Vec<Tag> = Vec::new();
let mut span_tags: Vec<SpanTag> = Vec::new();
let mut hook_info = Vec::<HookInfo>::new();
for processor in &self.hook_processors {
for info in processor.read().unwrap().hook_info.lock().unwrap().iter() {
if !hook_info.contains(info) {
hook_info.push(info.clone());
}
}
capsule_tags.append(
&mut processor
.read()
.unwrap()
.collated_capsule_tags
.lock()
.unwrap(),
);
span_tags.append(&mut processor.read().unwrap().collated_span_tags.lock().unwrap());
}
let footer = V2CapsuleFooter { hook_info };
ciborium::into_writer(&footer, &mut self.writer)
.map_err(|e| CapsuleError::Generic(format!("serializing capsule footer: {}", e)))?;
self.writer
.flush()
.map_err(|e| CapsuleError::Generic(format!("finalizing capsule: {}", e)))?;
let mut unique_capsule_tags: Vec<Tag> = Vec::new();
for capsule_tag in capsule_tags.drain(..) {
if !unique_capsule_tags.iter().any(|t: &Tag| {
t.name == capsule_tag.name
&& t.source == capsule_tag.source
&& t.hook_version == capsule_tag.hook_version
}) {
unique_capsule_tags.push(capsule_tag);
}
}
let (unique, elided) = process_tags_to_unique_elided(span_tags);
(self.seal)(CapsuleSealRequest {
capsule_tags: unique_capsule_tags,
span_tags: Box::new(TagSummary {
unique_tags: unique,
elided_tags: elided,
}),
size: self.bytes_written as i64,
rows: self.rows as i64,
})
.map_err(|e| CapsuleError::Generic(format!("sealing capsule: {}", e)))?;
self.finalized = true;
Ok(())
}
}
impl<W, F> Drop for DefaultCapsuleWriter<W, F>
where
W: Write,
F: FnMut(CapsuleSealRequest) -> Result<(), std::io::Error>,
{
fn drop(&mut self) {
if !self.finalized {
self.finalized = true;
self.finalize().unwrap();
}
}
}