use super::depacketizer::{Depacketizer, DepacketizerFrame};
use crate::{
api::{DataTrackFrame, DataTrackInfo},
e2ee::{DecryptionProvider, EncryptedPayload},
packet::Packet,
};
use std::sync::Arc;
/// Options consumed by [`Pipeline::new`].
pub(super) struct PipelineOptions {
/// Track information. `Pipeline::new` only reads `uses_e2ee` from it, in a
/// debug assertion that it agrees with the presence of `decryption_provider`.
pub info: Arc<DataTrackInfo>,
/// Identity of the publisher; handed to the decryption provider alongside
/// each encrypted payload.
pub publisher_identity: Arc<str>,
/// Provider used to decrypt frame payloads. When `None`, the pipeline
/// returns frames without attempting decryption.
pub decryption_provider: Option<Arc<dyn DecryptionProvider>>,
}
/// Converts incoming packets into [`DataTrackFrame`]s by feeding them through
/// a depacketizer and, when a decryption provider is configured, decrypting
/// the resulting frame payload.
pub(super) struct Pipeline {
/// Publisher identity passed to the decryption provider on every decrypt call.
publisher_identity: Arc<str>,
/// Decryption provider; `None` means frames are returned without decryption.
e2ee_provider: Option<Arc<dyn DecryptionProvider>>,
/// Depacketizer that receives every packet pushed into the pipeline.
depacketizer: Depacketizer,
}
impl Pipeline {
pub fn new(options: PipelineOptions) -> Self {
debug_assert_eq!(options.info.uses_e2ee, options.decryption_provider.is_some());
let depacketizer = Depacketizer::new();
Self {
publisher_identity: options.publisher_identity,
e2ee_provider: options.decryption_provider,
depacketizer,
}
}
pub fn process_packet(&mut self, packet: Packet) -> Option<DataTrackFrame> {
let frame = self.depacketize(packet)?;
let frame = self.decrypt_if_needed(frame)?;
Some(frame.into())
}
fn depacketize(&mut self, packet: Packet) -> Option<DepacketizerFrame> {
let result = self.depacketizer.push(packet);
if let Some(drop) = result.drop_error {
log::debug!("{}", drop);
};
result.frame
}
fn decrypt_if_needed(&self, mut frame: DepacketizerFrame) -> Option<DepacketizerFrame> {
let Some(decryption) = &self.e2ee_provider else { return frame.into() };
let Some(e2ee) = frame.extensions.e2ee else {
log::error!("Missing E2EE meta");
return None;
};
let encrypted =
EncryptedPayload { payload: frame.payload, iv: e2ee.iv, key_index: e2ee.key_index };
frame.payload = match decryption.decrypt(encrypted, &self.publisher_identity) {
Ok(decrypted) => decrypted,
Err(err) => {
log::error!("{}", err);
return None;
}
};
frame.into()
}
}
/// Converts a depacketized frame into the public frame type, keeping the
/// payload and unwrapping the optional user-timestamp extension to its inner
/// value.
impl From<DepacketizerFrame> for DataTrackFrame {
    fn from(frame: DepacketizerFrame) -> Self {
        let user_timestamp = frame.extensions.user_timestamp.map(|timestamp| timestamp.0);
        Self { payload: frame.payload, user_timestamp }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::packet::{FrameMarker, Header};
    use fake::{Fake, Faker};

    /// A single-packet frame without E2EE metadata, pushed through a pipeline
    /// with no decryption provider, must come out with its payload length intact.
    #[test]
    fn test_process_frame() {
        const PAYLOAD_LEN: usize = 1024;

        // Pipeline without end-to-end encryption.
        let mut info: DataTrackInfo = Faker.fake();
        info.uses_e2ee = false;
        let publisher_identity: Arc<str> = Faker.fake::<String>().into();
        let mut pipeline = Pipeline::new(PipelineOptions {
            info: info.into(),
            publisher_identity,
            decryption_provider: None,
        });

        // One packet marked as a complete, unencrypted frame.
        let mut header: Header = Faker.fake();
        header.marker = FrameMarker::Single;
        header.extensions.e2ee = None;
        let packet = Packet { header, payload: vec![Faker.fake(); PAYLOAD_LEN].into() };

        let output = pipeline.process_packet(packet).expect("Should return a frame");
        assert_eq!(output.payload.len(), PAYLOAD_LEN);
    }
}