use crate::combinator::quorum::{QuorumResult, quorum_outcomes};
use crate::error::ErrorKind;
use crate::record::distributed_region::{ConsistencyLevel, ReplicaInfo};
use crate::security::SecurityContext;
use crate::security::authenticated::AuthenticatedSymbol;
use crate::types::symbol::ObjectId;
use crate::types::{Outcome, Time};
use std::future::Future;
use std::time::Duration;
use super::assignment::{AssignmentStrategy, SymbolAssigner};
use super::encoding::EncodedState;
pub trait DistributorTransport: Sync {
fn send_symbols(
&self,
replica_id: &str,
symbols: Vec<AuthenticatedSymbol>,
) -> impl Future<Output = Result<ReplicaAck, ReplicaFailure>> + Send;
}
#[derive(Debug, Clone)]
pub struct DistributionConfig {
pub consistency: ConsistencyLevel,
pub ack_timeout: Duration,
pub max_concurrent: usize,
pub hedge_enabled: bool,
pub hedge_delay: Duration,
}
impl Default for DistributionConfig {
fn default() -> Self {
Self {
consistency: ConsistencyLevel::Quorum,
ack_timeout: Duration::from_secs(5),
max_concurrent: 10,
hedge_enabled: false,
hedge_delay: Duration::from_millis(50),
}
}
}
#[derive(Debug)]
pub struct DistributionResult {
pub object_id: ObjectId,
pub symbols_distributed: u32,
pub acks: Vec<ReplicaAck>,
pub failures: Vec<ReplicaFailure>,
pub quorum_achieved: bool,
pub duration: Duration,
}
#[derive(Debug, Clone)]
pub struct ReplicaAck {
pub replica_id: String,
pub symbols_received: u32,
pub ack_time: Time,
}
#[derive(Debug, Clone)]
pub struct ReplicaFailure {
pub replica_id: String,
pub error: String,
pub error_kind: ErrorKind,
}
#[derive(Debug, Default)]
pub struct DistributionMetrics {
pub distributions_total: u64,
pub distributions_successful: u64,
pub distributions_failed: u64,
pub symbols_sent_total: u64,
pub acks_received_total: u64,
pub quorum_achieved_count: u64,
pub quorum_missed_count: u64,
}
pub struct SymbolDistributor {
config: DistributionConfig,
pub metrics: DistributionMetrics,
}
impl SymbolDistributor {
#[must_use]
pub fn new(config: DistributionConfig) -> Self {
Self {
config,
metrics: DistributionMetrics::default(),
}
}
#[must_use]
pub fn config(&self) -> &DistributionConfig {
&self.config
}
pub async fn distribute<T: DistributorTransport>(
&mut self,
encoded: &EncodedState,
replicas: &[ReplicaInfo],
transport: &T,
auth_context: &SecurityContext,
) -> DistributionResult {
let start = std::time::Instant::now();
let assignments = Self::compute_assignments(encoded, replicas);
let mut outcomes = Vec::with_capacity(assignments.len());
let mut symbols_sent_total = 0_u64;
for assignment in assignments {
let symbols_for_replica: Vec<AuthenticatedSymbol> = assignment
.symbol_indices
.iter()
.map(|&idx| {
let sym = &encoded.symbols[idx];
auth_context.sign_symbol(sym)
})
.collect();
if symbols_for_replica.is_empty() {
continue;
}
symbols_sent_total =
symbols_sent_total.saturating_add(symbols_for_replica.len() as u64);
let result = transport
.send_symbols(&assignment.replica_id, symbols_for_replica)
.await;
outcomes.push(match result {
Ok(ack) => Outcome::Ok(ack),
Err(fail) => Outcome::Err(fail),
});
}
self.evaluate_outcomes_with_sent(
encoded,
replicas,
outcomes,
symbols_sent_total,
start.elapsed(),
)
}
#[inline]
#[must_use]
pub fn required_acks(consistency: ConsistencyLevel, replica_count: usize) -> usize {
match consistency {
ConsistencyLevel::One => 1,
ConsistencyLevel::Quorum => (replica_count / 2) + 1,
ConsistencyLevel::All => replica_count,
ConsistencyLevel::Local => 0,
}
}
#[inline]
#[must_use]
pub fn compute_assignments(
encoded: &EncodedState,
replicas: &[ReplicaInfo],
) -> Vec<super::assignment::ReplicaAssignment> {
let assigner = SymbolAssigner::new(AssignmentStrategy::Full);
assigner.assign(&encoded.symbols, replicas, encoded.source_count)
}
pub fn evaluate_outcomes(
&mut self,
encoded: &EncodedState,
replicas: &[ReplicaInfo],
outcomes: Vec<Outcome<ReplicaAck, ReplicaFailure>>,
duration: Duration,
) -> DistributionResult {
let symbols_sent_total = Self::compute_assignments(encoded, replicas)
.into_iter()
.map(|assignment| assignment.symbol_indices.len() as u64)
.sum();
self.evaluate_outcomes_with_sent(encoded, replicas, outcomes, symbols_sent_total, duration)
}
fn evaluate_outcomes_with_sent(
&mut self,
encoded: &EncodedState,
replicas: &[ReplicaInfo],
outcomes: Vec<Outcome<ReplicaAck, ReplicaFailure>>,
symbols_sent_total: u64,
duration: Duration,
) -> DistributionResult {
let required = Self::required_acks(self.config.consistency, replicas.len());
let quorum_result: QuorumResult<ReplicaAck, ReplicaFailure> =
quorum_outcomes(required, outcomes);
self.metrics.distributions_total = self.metrics.distributions_total.saturating_add(1);
self.metrics.symbols_sent_total = self
.metrics
.symbols_sent_total
.saturating_add(symbols_sent_total);
let acks: Vec<ReplicaAck> = quorum_result
.successes
.into_iter()
.map(|(_, ack)| ack)
.collect();
let failures: Vec<ReplicaFailure> = quorum_result
.failures
.into_iter()
.filter_map(|(_, f)| match f {
crate::combinator::quorum::QuorumFailure::Error(e) => Some(e),
_ => None,
})
.collect();
self.metrics.acks_received_total = self
.metrics
.acks_received_total
.saturating_add(acks.len() as u64);
if quorum_result.quorum_met {
self.metrics.distributions_successful =
self.metrics.distributions_successful.saturating_add(1);
self.metrics.quorum_achieved_count =
self.metrics.quorum_achieved_count.saturating_add(1);
} else {
self.metrics.distributions_failed = self.metrics.distributions_failed.saturating_add(1);
self.metrics.quorum_missed_count = self.metrics.quorum_missed_count.saturating_add(1);
}
DistributionResult {
object_id: encoded.params.object_id,
symbols_distributed: u32::try_from(symbols_sent_total).unwrap_or(u32::MAX),
acks,
failures,
quorum_achieved: quorum_result.quorum_met,
duration,
}
}
}
impl std::fmt::Debug for SymbolDistributor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SymbolDistributor")
.field("config", &self.config)
.field("metrics", &self.metrics)
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::symbol::{ObjectParams, Symbol};
fn create_test_replicas(count: usize) -> Vec<ReplicaInfo> {
(0..count)
.map(|i| ReplicaInfo::new(&format!("r{i}"), &format!("addr{i}")))
.collect()
}
fn create_test_symbols(count: usize) -> Vec<Symbol> {
(0..count)
.map(|i| Symbol::new_for_test(1, 0, i as u32, &[0u8; 128]))
.collect()
}
fn create_test_encoded_state() -> EncodedState {
EncodedState {
params: ObjectParams::new_for_test(1, 1024),
symbols: create_test_symbols(10),
source_count: 8,
repair_count: 2,
original_size: 1000,
encoded_at: Time::ZERO,
}
}
fn make_ack(replica_id: &str, count: u32) -> ReplicaAck {
ReplicaAck {
replica_id: replica_id.to_string(),
symbols_received: count,
ack_time: Time::ZERO,
}
}
fn make_failure(replica_id: &str) -> ReplicaFailure {
ReplicaFailure {
replica_id: replica_id.to_string(),
error: "connection refused".to_string(),
error_kind: ErrorKind::NodeUnavailable,
}
}
struct MockSuccessTransport;
impl DistributorTransport for MockSuccessTransport {
fn send_symbols(
&self,
replica_id: &str,
symbols: Vec<AuthenticatedSymbol>,
) -> impl Future<Output = Result<ReplicaAck, ReplicaFailure>> + Send {
let replica_id = replica_id.to_string();
let symbol_count = symbols.len() as u32;
async move { Ok(make_ack(&replica_id, symbol_count)) }
}
}
#[test]
fn distribute_with_quorum_consistency() {
let config = DistributionConfig {
consistency: ConsistencyLevel::Quorum,
..Default::default()
};
let mut distributor = SymbolDistributor::new(config);
let replicas = create_test_replicas(3);
let encoded = create_test_encoded_state();
let outcomes = vec![
Outcome::Ok(make_ack("r0", 10)),
Outcome::Ok(make_ack("r1", 10)),
Outcome::Err(make_failure("r2")),
];
let result =
distributor.evaluate_outcomes(&encoded, &replicas, outcomes, Duration::from_millis(50));
assert!(result.quorum_achieved);
assert_eq!(result.acks.len(), 2);
assert_eq!(result.failures.len(), 1);
}
#[test]
fn distribute_with_all_consistency() {
let config = DistributionConfig {
consistency: ConsistencyLevel::All,
..Default::default()
};
let mut distributor = SymbolDistributor::new(config);
let replicas = create_test_replicas(3);
let encoded = create_test_encoded_state();
let outcomes = vec![
Outcome::Ok(make_ack("r0", 10)),
Outcome::Ok(make_ack("r1", 10)),
Outcome::Err(make_failure("r2")),
];
let result =
distributor.evaluate_outcomes(&encoded, &replicas, outcomes, Duration::from_millis(50));
assert!(!result.quorum_achieved);
}
#[test]
fn distribute_tracks_failures() {
let config = DistributionConfig::default();
let mut distributor = SymbolDistributor::new(config);
let replicas = create_test_replicas(3);
let encoded = create_test_encoded_state();
let outcomes = vec![
Outcome::Ok(make_ack("r0", 10)),
Outcome::Ok(make_ack("r1", 10)),
Outcome::Err(make_failure("r2")),
];
let result =
distributor.evaluate_outcomes(&encoded, &replicas, outcomes, Duration::from_millis(50));
assert!(!result.failures.is_empty());
assert_eq!(result.failures.len(), 1);
assert_eq!(result.failures[0].replica_id, "r2");
}
#[test]
fn distribution_metrics_updated() {
let config = DistributionConfig::default();
let mut distributor = SymbolDistributor::new(config);
let replicas = create_test_replicas(3);
let encoded = create_test_encoded_state();
let outcomes = vec![
Outcome::Ok(make_ack("r0", 10)),
Outcome::Ok(make_ack("r1", 10)),
Outcome::Ok(make_ack("r2", 10)),
];
distributor.evaluate_outcomes(&encoded, &replicas, outcomes, Duration::from_millis(50));
assert_eq!(distributor.metrics.distributions_total, 1);
assert_eq!(distributor.metrics.distributions_successful, 1);
assert!(distributor.metrics.symbols_sent_total > 0);
assert_eq!(distributor.metrics.acks_received_total, 3);
}
#[test]
fn distribute_counts_symbols_sent_per_replica_attempt() {
let config = DistributionConfig::default();
let mut distributor = SymbolDistributor::new(config);
let replicas = create_test_replicas(3);
let encoded = create_test_encoded_state();
let auth_context = SecurityContext::for_testing(7);
let transport = MockSuccessTransport;
let expected_symbols_sent: u64 =
SymbolDistributor::compute_assignments(&encoded, &replicas)
.into_iter()
.map(|assignment| assignment.symbol_indices.len() as u64)
.sum();
let result = futures_lite::future::block_on(async {
distributor
.distribute(&encoded, &replicas, &transport, &auth_context)
.await
});
assert!(result.quorum_achieved);
assert_eq!(result.acks.len(), replicas.len());
assert_eq!(distributor.metrics.distributions_total, 1);
assert_eq!(
distributor.metrics.symbols_sent_total,
expected_symbols_sent
);
assert_eq!(
result.symbols_distributed,
u32::try_from(expected_symbols_sent).unwrap_or(u32::MAX)
);
}
#[test]
fn distribute_to_no_replicas() {
let config = DistributionConfig::default();
let mut distributor = SymbolDistributor::new(config);
let replicas: Vec<ReplicaInfo> = vec![];
let encoded = create_test_encoded_state();
let outcomes: Vec<Outcome<ReplicaAck, ReplicaFailure>> = vec![];
let result =
distributor.evaluate_outcomes(&encoded, &replicas, outcomes, Duration::from_millis(50));
assert!(!result.quorum_achieved);
assert_eq!(result.symbols_distributed, 0);
}
#[test]
fn evaluate_outcomes_reports_symbols_from_assignment_plan() {
let config = DistributionConfig::default();
let mut distributor = SymbolDistributor::new(config);
let replicas = create_test_replicas(3);
let encoded = create_test_encoded_state();
let assignments = SymbolDistributor::compute_assignments(&encoded, &replicas);
let expected_symbols_sent: u32 = assignments
.iter()
.map(|assignment| assignment.symbol_indices.len() as u32)
.sum();
let outcomes: Vec<Outcome<ReplicaAck, ReplicaFailure>> = assignments
.iter()
.map(|assignment| {
Outcome::Ok(make_ack(
&assignment.replica_id,
assignment.symbol_indices.len() as u32,
))
})
.collect();
let result =
distributor.evaluate_outcomes(&encoded, &replicas, outcomes, Duration::from_millis(50));
assert_eq!(result.symbols_distributed, expected_symbols_sent);
}
#[test]
fn required_acks_calculation() {
assert_eq!(
SymbolDistributor::required_acks(ConsistencyLevel::One, 3),
1
);
assert_eq!(
SymbolDistributor::required_acks(ConsistencyLevel::Quorum, 3),
2
);
assert_eq!(
SymbolDistributor::required_acks(ConsistencyLevel::Quorum, 5),
3
);
assert_eq!(
SymbolDistributor::required_acks(ConsistencyLevel::All, 3),
3
);
assert_eq!(
SymbolDistributor::required_acks(ConsistencyLevel::Local, 3),
0
);
}
#[test]
fn local_consistency_always_succeeds() {
let config = DistributionConfig {
consistency: ConsistencyLevel::Local,
..Default::default()
};
let mut distributor = SymbolDistributor::new(config);
let replicas = create_test_replicas(3);
let encoded = create_test_encoded_state();
let outcomes = vec![
Outcome::Err(make_failure("r0")),
Outcome::Err(make_failure("r1")),
Outcome::Err(make_failure("r2")),
];
let result =
distributor.evaluate_outcomes(&encoded, &replicas, outcomes, Duration::from_millis(50));
assert!(result.quorum_achieved);
}
#[test]
fn partial_ack_quorum_evaluation() {
let config = DistributionConfig {
consistency: ConsistencyLevel::Quorum,
..Default::default()
};
let mut distributor = SymbolDistributor::new(config);
let replicas = create_test_replicas(10);
let encoded = create_test_encoded_state();
let mut outcomes: Vec<Outcome<ReplicaAck, ReplicaFailure>> = Vec::new();
for i in 0..5 {
outcomes.push(Outcome::Ok(make_ack(&format!("r{i}"), 10)));
}
for i in 5..10 {
outcomes.push(Outcome::Err(make_failure(&format!("r{i}"))));
}
let result =
distributor.evaluate_outcomes(&encoded, &replicas, outcomes, Duration::from_millis(50));
assert!(!result.quorum_achieved);
assert_eq!(result.acks.len(), 5);
assert_eq!(result.failures.len(), 5);
}
#[test]
fn partial_ack_quorum_exactly_met() {
let config = DistributionConfig {
consistency: ConsistencyLevel::Quorum,
..Default::default()
};
let mut distributor = SymbolDistributor::new(config);
let replicas = create_test_replicas(10);
let encoded = create_test_encoded_state();
let mut outcomes: Vec<Outcome<ReplicaAck, ReplicaFailure>> = Vec::new();
for i in 0..6 {
outcomes.push(Outcome::Ok(make_ack(&format!("r{i}"), 10)));
}
for i in 6..10 {
outcomes.push(Outcome::Err(make_failure(&format!("r{i}"))));
}
let result =
distributor.evaluate_outcomes(&encoded, &replicas, outcomes, Duration::from_millis(50));
assert!(result.quorum_achieved);
assert_eq!(result.acks.len(), 6);
}
#[test]
fn quorum_with_only_one_replica_available() {
let config = DistributionConfig {
consistency: ConsistencyLevel::Quorum,
..Default::default()
};
let mut distributor = SymbolDistributor::new(config);
let replicas = create_test_replicas(1);
let encoded = create_test_encoded_state();
let outcomes = vec![Outcome::Ok(make_ack("r0", 10))];
let result =
distributor.evaluate_outcomes(&encoded, &replicas, outcomes, Duration::from_millis(50));
assert!(result.quorum_achieved);
}
#[test]
fn quorum_with_one_replica_failing() {
let config = DistributionConfig {
consistency: ConsistencyLevel::Quorum,
..Default::default()
};
let mut distributor = SymbolDistributor::new(config);
let replicas = create_test_replicas(1);
let encoded = create_test_encoded_state();
let outcomes = vec![Outcome::Err(make_failure("r0"))];
let result =
distributor.evaluate_outcomes(&encoded, &replicas, outcomes, Duration::from_millis(50));
assert!(!result.quorum_achieved);
}
#[test]
fn all_consistency_one_failure_breaks_quorum() {
let config = DistributionConfig {
consistency: ConsistencyLevel::All,
..Default::default()
};
let mut distributor = SymbolDistributor::new(config);
let replicas = create_test_replicas(5);
let encoded = create_test_encoded_state();
let mut outcomes: Vec<Outcome<ReplicaAck, ReplicaFailure>> = Vec::new();
for i in 0..4 {
outcomes.push(Outcome::Ok(make_ack(&format!("r{i}"), 10)));
}
outcomes.push(Outcome::Err(make_failure("r4")));
let result =
distributor.evaluate_outcomes(&encoded, &replicas, outcomes, Duration::from_millis(50));
assert!(!result.quorum_achieved);
assert_eq!(result.acks.len(), 4);
assert_eq!(result.failures.len(), 1);
}
#[test]
fn one_consistency_needs_only_one() {
let config = DistributionConfig {
consistency: ConsistencyLevel::One,
..Default::default()
};
let mut distributor = SymbolDistributor::new(config);
let replicas = create_test_replicas(5);
let encoded = create_test_encoded_state();
let mut outcomes: Vec<Outcome<ReplicaAck, ReplicaFailure>> = Vec::new();
outcomes.push(Outcome::Ok(make_ack("r0", 10)));
for i in 1..5 {
outcomes.push(Outcome::Err(make_failure(&format!("r{i}"))));
}
let result =
distributor.evaluate_outcomes(&encoded, &replicas, outcomes, Duration::from_millis(50));
assert!(result.quorum_achieved);
}
#[test]
fn distribution_zero_duration() {
let config = DistributionConfig::default();
let mut distributor = SymbolDistributor::new(config);
let replicas = create_test_replicas(3);
let encoded = create_test_encoded_state();
let outcomes = vec![
Outcome::Ok(make_ack("r0", 10)),
Outcome::Ok(make_ack("r1", 10)),
Outcome::Ok(make_ack("r2", 10)),
];
let result = distributor.evaluate_outcomes(&encoded, &replicas, outcomes, Duration::ZERO);
assert!(result.quorum_achieved);
assert_eq!(result.duration, Duration::ZERO);
}
#[test]
fn metrics_accumulate_across_multiple_distributions() {
let config = DistributionConfig::default();
let mut distributor = SymbolDistributor::new(config);
let replicas = create_test_replicas(3);
let encoded = create_test_encoded_state();
let outcomes1 = vec![
Outcome::Ok(make_ack("r0", 10)),
Outcome::Ok(make_ack("r1", 10)),
Outcome::Ok(make_ack("r2", 10)),
];
distributor.evaluate_outcomes(&encoded, &replicas, outcomes1, Duration::from_millis(50));
let outcomes2 = vec![
Outcome::Err(make_failure("r0")),
Outcome::Err(make_failure("r1")),
Outcome::Err(make_failure("r2")),
];
distributor.evaluate_outcomes(&encoded, &replicas, outcomes2, Duration::from_millis(50));
assert_eq!(distributor.metrics.distributions_total, 2);
assert_eq!(distributor.metrics.distributions_successful, 1);
assert_eq!(distributor.metrics.distributions_failed, 1);
assert_eq!(distributor.metrics.acks_received_total, 3);
assert_eq!(distributor.metrics.quorum_achieved_count, 1);
assert_eq!(distributor.metrics.quorum_missed_count, 1);
}
#[test]
fn required_acks_boundary_values() {
assert_eq!(
SymbolDistributor::required_acks(ConsistencyLevel::Quorum, 2),
2
); assert_eq!(
SymbolDistributor::required_acks(ConsistencyLevel::Quorum, 4),
3
); assert_eq!(
SymbolDistributor::required_acks(ConsistencyLevel::Quorum, 100),
51
);
assert_eq!(
SymbolDistributor::required_acks(ConsistencyLevel::One, 1),
1
);
assert_eq!(
SymbolDistributor::required_acks(ConsistencyLevel::All, 1),
1
);
assert_eq!(
SymbolDistributor::required_acks(ConsistencyLevel::Quorum, 1),
1
);
assert_eq!(
SymbolDistributor::required_acks(ConsistencyLevel::Local, 1),
0
);
assert_eq!(
SymbolDistributor::required_acks(ConsistencyLevel::Local, 0),
0
);
assert_eq!(
SymbolDistributor::required_acks(ConsistencyLevel::One, 0),
1
); }
#[test]
fn cancelled_outcomes_not_counted_as_failures() {
let config = DistributionConfig::default();
let mut distributor = SymbolDistributor::new(config);
let replicas = create_test_replicas(3);
let encoded = create_test_encoded_state();
let outcomes = vec![
Outcome::Ok(make_ack("r0", 10)),
Outcome::Ok(make_ack("r1", 10)),
Outcome::Cancelled(crate::types::CancelReason::timeout()),
];
let result =
distributor.evaluate_outcomes(&encoded, &replicas, outcomes, Duration::from_millis(50));
assert!(result.quorum_achieved);
assert_eq!(result.acks.len(), 2);
assert!(result.failures.is_empty());
}
#[test]
fn config_accessors() {
let config = DistributionConfig {
consistency: ConsistencyLevel::All,
ack_timeout: Duration::from_secs(10),
max_concurrent: 5,
hedge_enabled: true,
hedge_delay: Duration::from_millis(100),
};
let distributor = SymbolDistributor::new(config);
assert_eq!(distributor.config().consistency, ConsistencyLevel::All);
assert_eq!(distributor.config().ack_timeout, Duration::from_secs(10));
assert_eq!(distributor.config().max_concurrent, 5);
assert!(distributor.config().hedge_enabled);
}
#[test]
fn debug_format() {
let distributor = SymbolDistributor::new(DistributionConfig::default());
let debug = format!("{distributor:?}");
assert!(debug.contains("SymbolDistributor"));
assert!(debug.contains("config"));
assert!(debug.contains("metrics"));
}
#[test]
fn distribution_config_debug_clone() {
let cfg = DistributionConfig::default();
let dbg = format!("{cfg:?}");
assert!(dbg.contains("DistributionConfig"), "{dbg}");
let cloned = cfg.clone();
assert_eq!(cfg.max_concurrent, cloned.max_concurrent);
}
#[test]
fn replica_ack_debug_clone() {
let ack = ReplicaAck {
replica_id: "r0".to_string(),
symbols_received: 10,
ack_time: Time::ZERO,
};
let dbg = format!("{ack:?}");
assert!(dbg.contains("ReplicaAck"), "{dbg}");
let cloned = ack;
assert_eq!(cloned.replica_id, "r0");
}
#[test]
fn replica_failure_debug_clone() {
let fail = ReplicaFailure {
replica_id: "r1".to_string(),
error: "timeout".to_string(),
error_kind: ErrorKind::NodeUnavailable,
};
let dbg = format!("{fail:?}");
assert!(dbg.contains("ReplicaFailure"), "{dbg}");
let cloned = fail;
assert_eq!(cloned.error, "timeout");
}
}