use crate::config::RaptorQConfig;
use crate::error::{Error, ErrorKind};
use crate::observability::Metrics;
use crate::security::SecurityContext;
use crate::transport::sink::SymbolSink;
use crate::transport::stream::SymbolStream;
use super::pipeline::{RaptorQReceiver, RaptorQSender};
pub struct RaptorQSenderBuilder<T = ()> {
config: Option<RaptorQConfig>,
transport: Option<T>,
security: Option<SecurityContext>,
metrics: Option<Metrics>,
}
impl<T> Default for RaptorQSenderBuilder<T> {
fn default() -> Self {
Self {
config: None,
transport: None,
security: None,
metrics: None,
}
}
}
impl RaptorQSenderBuilder<()> {
#[must_use]
#[inline]
pub fn new() -> Self {
Self::default()
}
}
impl<T> RaptorQSenderBuilder<T> {
#[must_use]
#[inline]
pub fn config(mut self, config: RaptorQConfig) -> Self {
self.config = Some(config);
self
}
#[must_use]
#[inline]
pub fn transport<U>(self, transport: U) -> RaptorQSenderBuilder<U> {
RaptorQSenderBuilder {
config: self.config,
transport: Some(transport),
security: self.security,
metrics: self.metrics,
}
}
#[must_use]
#[inline]
pub fn security(mut self, ctx: SecurityContext) -> Self {
self.security = Some(ctx);
self
}
#[must_use]
#[inline]
pub fn metrics(mut self, metrics: Metrics) -> Self {
self.metrics = Some(metrics);
self
}
}
impl<T: SymbolSink + Unpin> RaptorQSenderBuilder<T> {
#[allow(clippy::result_large_err)]
pub fn build(self) -> Result<RaptorQSender<T>, Error> {
let transport = self.transport.ok_or_else(|| {
Error::new(ErrorKind::InvalidEncodingParams)
.with_message("transport is required for RaptorQSender")
})?;
let config = self.config.unwrap_or_default();
config.validate().map_err(|e| {
Error::new(ErrorKind::InvalidEncodingParams).with_message(e.to_string())
})?;
Ok(RaptorQSender::new(
config,
transport,
self.security,
self.metrics,
))
}
}
pub struct RaptorQReceiverBuilder<S = ()> {
config: Option<RaptorQConfig>,
source: Option<S>,
security: Option<SecurityContext>,
metrics: Option<Metrics>,
}
impl<S> Default for RaptorQReceiverBuilder<S> {
fn default() -> Self {
Self {
config: None,
source: None,
security: None,
metrics: None,
}
}
}
impl RaptorQReceiverBuilder<()> {
#[must_use]
#[inline]
pub fn new() -> Self {
Self::default()
}
}
impl<S> RaptorQReceiverBuilder<S> {
#[must_use]
#[inline]
pub fn config(mut self, config: RaptorQConfig) -> Self {
self.config = Some(config);
self
}
#[must_use]
#[inline]
pub fn source<U>(self, source: U) -> RaptorQReceiverBuilder<U> {
RaptorQReceiverBuilder {
config: self.config,
source: Some(source),
security: self.security,
metrics: self.metrics,
}
}
#[must_use]
pub fn security(mut self, ctx: SecurityContext) -> Self {
self.security = Some(ctx);
self
}
#[must_use]
#[inline]
pub fn metrics(mut self, metrics: Metrics) -> Self {
self.metrics = Some(metrics);
self
}
}
impl<S: SymbolStream + Unpin> RaptorQReceiverBuilder<S> {
#[allow(clippy::result_large_err)]
pub fn build(self) -> Result<RaptorQReceiver<S>, Error> {
let source = self.source.ok_or_else(|| {
Error::new(ErrorKind::InvalidEncodingParams)
.with_message("source is required for RaptorQReceiver")
})?;
let config = self.config.unwrap_or_default();
config.validate().map_err(|e| {
Error::new(ErrorKind::InvalidEncodingParams).with_message(e.to_string())
})?;
Ok(RaptorQReceiver::new(
config,
source,
self.security,
self.metrics,
))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::transport::error::{SinkError, StreamError};
use std::pin::Pin;
use std::task::{Context, Poll};
struct NoopSink;
impl SymbolSink for NoopSink {
fn poll_send(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
_symbol: crate::security::AuthenticatedSymbol,
) -> Poll<Result<(), SinkError>> {
Poll::Ready(Ok(()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), SinkError>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), SinkError>> {
Poll::Ready(Ok(()))
}
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), SinkError>> {
Poll::Ready(Ok(()))
}
}
impl Unpin for NoopSink {}
struct NoopStream;
impl SymbolStream for NoopStream {
fn poll_next(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<crate::security::AuthenticatedSymbol, StreamError>>> {
Poll::Ready(None)
}
}
impl Unpin for NoopStream {}
#[test]
fn test_sender_builder_missing_transport_errors() {
let result = RaptorQSenderBuilder::<NoopSink>::default()
.config(RaptorQConfig::default())
.build();
match result {
Err(err) => assert_eq!(err.kind(), ErrorKind::InvalidEncodingParams),
Ok(_) => panic!("expected missing transport error"),
}
}
#[test]
fn test_receiver_builder_missing_source_errors() {
let result = RaptorQReceiverBuilder::<NoopStream>::default()
.config(RaptorQConfig::default())
.build();
match result {
Err(err) => assert_eq!(err.kind(), ErrorKind::InvalidEncodingParams),
Ok(_) => panic!("expected missing source error"),
}
}
#[test]
fn test_sender_builder_invalid_config_errors() {
let mut config = RaptorQConfig::default();
config.encoding.symbol_size = 0;
let result = RaptorQSenderBuilder::new()
.config(config)
.transport(NoopSink)
.build();
match result {
Err(err) => assert_eq!(err.kind(), ErrorKind::InvalidEncodingParams),
Ok(_) => panic!("expected invalid config error"),
}
}
#[test]
fn test_receiver_builder_invalid_config_errors() {
let mut config = RaptorQConfig::default();
config.encoding.symbol_size = 0;
let result = RaptorQReceiverBuilder::new()
.config(config)
.source(NoopStream)
.build();
match result {
Err(err) => assert_eq!(err.kind(), ErrorKind::InvalidEncodingParams),
Ok(_) => panic!("expected invalid config error"),
}
}
#[test]
fn test_sender_builder_default_config_used_when_missing() {
let sender = RaptorQSenderBuilder::new()
.transport(NoopSink)
.build()
.unwrap();
assert_eq!(sender.config().encoding.symbol_size, 256);
}
#[test]
fn test_sender_builder_same_inputs_are_idempotent() {
let mut config = RaptorQConfig::default();
config.encoding.symbol_size = 512;
config.encoding.repair_overhead = 1.25;
let expected = format!("{config:?}");
let sender_a = RaptorQSenderBuilder::new()
.config(config.clone())
.transport(NoopSink)
.build()
.unwrap();
let sender_b = RaptorQSenderBuilder::new()
.config(config)
.transport(NoopSink)
.build()
.unwrap();
assert_eq!(format!("{:?}", sender_a.config()), expected);
assert_eq!(format!("{:?}", sender_b.config()), expected);
assert_eq!(
format!("{:?}", sender_a.config()),
format!("{:?}", sender_b.config())
);
}
#[test]
fn test_receiver_builder_accepts_security_and_metrics() {
let security = SecurityContext::for_testing(7);
let metrics = Metrics::new();
let receiver = RaptorQReceiverBuilder::new()
.source(NoopStream)
.security(security)
.metrics(metrics)
.build();
assert!(receiver.is_ok());
}
}