#[cfg(feature = "dandelion")]
use blvm_spec_lock::spec_locked;
use blvm_protocol::Hash;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use std::collections::HashMap;
use std::time::{Duration, Instant};
use tracing::debug;
pub struct DandelionRelay<C: Clock = SystemClock> {
stem_paths: HashMap<String, StemPath>,
stem_txs: HashMap<Hash, StemState>,
stem_timeout: Duration,
fluff_probability: f64,
max_stem_hops: u8,
rng: StdRng,
clock: C,
}
#[derive(Debug, Clone)]
struct StemPath {
next_peer: String,
expiry: Instant,
hop_count: u8,
}
#[derive(Debug, Clone)]
pub struct StemState {
current_peer: String,
next_peer: Option<String>,
stem_start: Instant,
hops: u8,
source_peer: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DandelionPhase {
Stem,
Fluff,
}
impl Default for DandelionRelay<SystemClock> {
fn default() -> Self {
Self::new()
}
}
pub trait Clock: Clone + Send + Sync + 'static {
fn now(&self) -> Instant;
}
#[derive(Clone)]
pub struct SystemClock;
impl Clock for SystemClock {
fn now(&self) -> Instant {
Instant::now()
}
}
impl DandelionRelay<SystemClock> {
pub fn new() -> Self {
Self {
stem_paths: HashMap::new(),
stem_txs: HashMap::new(),
stem_timeout: Duration::from_secs(10),
fluff_probability: 0.1, max_stem_hops: 2,
rng: StdRng::from_entropy(),
clock: SystemClock,
}
}
pub fn with_params(stem_timeout: Duration, fluff_probability: f64, max_stem_hops: u8) -> Self {
Self {
stem_paths: HashMap::new(),
stem_txs: HashMap::new(),
stem_timeout,
fluff_probability,
max_stem_hops,
rng: StdRng::from_entropy(),
clock: SystemClock,
}
}
}
impl<C: Clock> DandelionRelay<C> {
pub fn with_rng_and_clock(rng: StdRng, clock: C) -> Self {
Self {
stem_paths: HashMap::new(),
stem_txs: HashMap::new(),
stem_timeout: Duration::from_secs(10),
fluff_probability: 0.1,
max_stem_hops: 2,
rng,
clock,
}
}
pub fn set_stem_timeout(&mut self, timeout: Duration) {
self.stem_timeout = timeout;
}
pub fn set_fluff_probability(&mut self, p: f64) {
self.fluff_probability = p;
}
pub fn set_max_stem_hops(&mut self, hops: u8) {
self.max_stem_hops = hops;
}
pub fn set_clock(&mut self, clock: C) {
self.clock = clock;
}
pub fn initialize_stem_path(
&mut self,
peer_id: String,
available_peers: &[String],
) -> Option<String> {
let rng = &mut self.rng;
let candidates: Vec<_> = available_peers.iter().filter(|p| *p != &peer_id).collect();
if candidates.is_empty() {
return None;
}
let next_peer = candidates[rng.gen_range(0..candidates.len())].clone();
let path = StemPath {
next_peer: next_peer.clone(),
expiry: self.clock.now() + Duration::from_secs(600), hop_count: 0,
};
self.stem_paths.insert(peer_id.clone(), path);
debug!(
"Initialized Dandelion stem path: {} -> {}",
peer_id, next_peer
);
Some(next_peer)
}
pub fn update_stem_path(&mut self, peer_id: &str, available_peers: &[String]) {
if let Some(path) = self.stem_paths.get_mut(peer_id) {
let rng = &mut self.rng;
let candidates: Vec<_> = available_peers
.iter()
.filter(|p| *p != peer_id && *p != &path.next_peer)
.collect();
if !candidates.is_empty() {
path.next_peer = candidates[rng.gen_range(0..candidates.len())].clone();
path.hop_count += 1;
debug!(
"Updated Dandelion stem path: {} -> {} (hop {})",
peer_id, path.next_peer, path.hop_count
);
}
}
}
#[spec_locked("10.6")]
pub fn start_stem_phase(
&mut self,
tx_hash: Hash,
current_peer: String,
available_peers: &[String],
) -> Option<String> {
let next_peer = if let Some(path) = self.stem_paths.get(¤t_peer) {
if path.expiry > Instant::now() {
Some(path.next_peer.clone())
} else {
self.initialize_stem_path(current_peer.clone(), available_peers)
}
} else {
self.initialize_stem_path(current_peer.clone(), available_peers)
};
if let Some(next) = next_peer.as_ref() {
let stem_state = StemState {
current_peer: current_peer.clone(),
next_peer: Some(next.clone()),
stem_start: self.clock.now(),
hops: 0,
source_peer: Some(current_peer.clone()),
};
self.stem_txs.insert(tx_hash, stem_state);
debug!(
"Started Dandelion stem phase for tx {}: {} -> {}",
hex::encode(tx_hash),
current_peer,
next
);
}
next_peer
}
#[spec_locked("10.6")]
pub fn should_fluff(&mut self, tx_hash: &Hash) -> bool {
if let Some(state) = self.stem_txs.get(tx_hash) {
if self.clock.now().duration_since(state.stem_start) >= self.stem_timeout {
debug!("Dandelion stem timeout for tx {}", hex::encode(tx_hash));
return true;
}
if state.hops >= self.max_stem_hops {
debug!("Dandelion max hops reached for tx {}", hex::encode(tx_hash));
return true;
}
if self.rng.gen::<f64>() < self.fluff_probability {
debug!("Dandelion random fluff for tx {}", hex::encode(tx_hash));
return true;
}
}
false
}
#[spec_locked("10.6")]
pub fn advance_stem(&mut self, tx_hash: Hash, available_peers: &[String]) -> Option<String> {
if let Some(state) = self.stem_txs.get_mut(&tx_hash) {
state.hops += 1;
let current_peer = state.current_peer.clone();
let _ = state;
self.update_stem_path(¤t_peer, available_peers);
if let Some(state2) = self.stem_txs.get_mut(&tx_hash) {
if let Some(path) = self.stem_paths.get(¤t_peer) {
let next = path.next_peer.clone();
state2.current_peer = next.clone();
state2.next_peer = Some(next.clone());
debug!(
"Advanced Dandelion stem for tx {}: hop {} -> {}",
hex::encode(tx_hash),
state2.hops,
next
);
return Some(next);
}
}
}
None
}
#[spec_locked("10.6")]
pub fn transition_to_fluff(&mut self, tx_hash: Hash) -> DandelionPhase {
self.stem_txs.remove(&tx_hash);
debug!(
"Transitioned tx {} to Dandelion fluff phase",
hex::encode(tx_hash)
);
DandelionPhase::Fluff
}
#[spec_locked("10.6")]
pub fn get_phase(&self, tx_hash: &Hash) -> Option<DandelionPhase> {
if self.stem_txs.contains_key(tx_hash) {
Some(DandelionPhase::Stem)
} else {
Some(DandelionPhase::Fluff)
}
}
pub fn get_stem_peer(&self, tx_hash: &Hash) -> Option<String> {
self.stem_txs.get(tx_hash).and_then(|s| s.next_peer.clone())
}
pub fn cleanup_expired(&mut self) {
let now = self.clock.now();
self.stem_paths.retain(|_, path| path.expiry > now);
self.stem_txs.retain(|_, state| {
self.clock.now().duration_since(state.stem_start) < self.stem_timeout * 2
});
}
pub fn get_stats(&self) -> DandelionStats {
DandelionStats {
active_stem_paths: self.stem_paths.len(),
stem_transactions: self.stem_txs.len(),
stem_timeout_secs: self.stem_timeout.as_secs(),
fluff_probability: self.fluff_probability,
max_stem_hops: self.max_stem_hops,
}
}
}
#[derive(Debug, Clone)]
pub struct DandelionStats {
pub active_stem_paths: usize,
pub stem_transactions: usize,
pub stem_timeout_secs: u64,
pub fluff_probability: f64,
pub max_stem_hops: u8,
}
#[cfg(test)]
mod tests {
use super::*;
#[derive(Clone)]
struct TestClock {
now: Instant,
}
impl TestClock {
fn new(start: Instant) -> Self {
Self { now: start }
}
fn advance(&mut self, d: Duration) {
self.now += d;
}
}
impl Clock for TestClock {
fn now(&self) -> Instant {
self.now
}
}
fn peers() -> Vec<String> {
vec!["p1".into(), "p2".into(), "p3".into(), "p4".into()]
}
#[test]
fn stem_initialization_and_advance() {
let rng = StdRng::seed_from_u64(42);
let clock = TestClock::new(Instant::now());
let mut d: DandelionRelay<TestClock> =
DandelionRelay::with_rng_and_clock(rng, clock.clone());
let next = d.initialize_stem_path("p1".into(), &peers());
assert!(next.is_some());
let tx = [1u8; 32];
let hop = d.start_stem_phase(tx, "p1".into(), &peers());
assert!(hop.is_some());
let _ = d.advance_stem(tx, &peers());
assert_eq!(d.get_phase(&tx), Some(DandelionPhase::Stem));
}
#[test]
fn timeout_triggers_fluff() {
let rng = StdRng::seed_from_u64(7);
let start = Instant::now();
let mut clock = TestClock::new(start);
let mut d: DandelionRelay<TestClock> =
DandelionRelay::with_rng_and_clock(rng, clock.clone());
d.stem_timeout = Duration::from_millis(50);
let tx = [2u8; 32];
let _ = d.start_stem_phase(tx, "p2".into(), &peers());
assert_eq!(d.get_phase(&tx), Some(DandelionPhase::Stem));
clock.advance(Duration::from_millis(60));
d.clock = clock.clone();
assert!(d.should_fluff(&tx));
}
}