use crate::compat::rand::random;
use crate::compat::vec::Vec;
use crate::flow_control::{ConsumersInfo, FlowControlId, FlowControls, ProducerInfo};
use crate::Address;
impl FlowControls {
    /// Creates a [`FlowControls`] whose four internal collections
    /// (`consumers`, `producers`, `producers_additional_addresses` and `spawners`)
    /// are all initialised to their `Default` values.
    // `new()` is provided without a matching `Default` impl in this block, hence the
    // lint is silenced here rather than satisfied.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        let consumers = Default::default();
        let producers = Default::default();
        let producers_additional_addresses = Default::default();
        let spawners = Default::default();
        Self {
            consumers,
            producers,
            producers_additional_addresses,
            spawners,
        }
    }
}
impl FlowControls {
/// Produces a new [`FlowControlId`] by asking the crate's `random()` helper for one.
///
/// This is an associated function: it neither reads nor modifies any
/// [`FlowControls`] instance.
pub fn generate_flow_control_id() -> FlowControlId {
    let flow_control_id: FlowControlId = random();
    flow_control_id
}
/// Registers `address` as a consumer for the flow control identified by
/// `flow_control_id`.
///
/// If no consumers entry exists yet for `flow_control_id`, a default one is
/// created first; `address` is then inserted into that entry.
///
/// # Panics
///
/// Panics if `self.consumers.write()` returns an error (the result is unwrapped).
pub fn add_consumer(&self, address: impl Into<Address>, flow_control_id: &FlowControlId) {
    let address = address.into();
    debug!("Add Consumer {address} to Producer {flow_control_id}");
    let mut consumers = self.consumers.write().unwrap();
    // The entry API does "get or create" in a single lookup, replacing the previous
    // `contains_key` + `insert` + `get_mut().unwrap()` sequence and its extra unwrap.
    // The key is cloned up front because `entry` takes it by value.
    consumers
        .entry(flow_control_id.clone())
        .or_default()
        .0
        .insert(address);
}
/// Registers `address` as a producer for `flow_control_id`.
///
/// Two collections are updated, one after the other:
///
/// 1. `producers` receives a [`ProducerInfo`] keyed by `address`, holding a clone
///    of `flow_control_id` and of the optional `spawner_flow_control_id`.
/// 2. `producers_additional_addresses` receives a mapping from `address` to itself
///    and from every entry of `additional_addresses` to `address`.
///
/// The write guard on `producers` is released before the guard on
/// `producers_additional_addresses` is taken, so the two are never held together.
///
/// # Panics
///
/// Panics if either `write()` call returns an error (both results are unwrapped).
pub fn add_producer(
    &self,
    address: impl Into<Address>,
    flow_control_id: &FlowControlId,
    spawner_flow_control_id: Option<&FlowControlId>,
    additional_addresses: Vec<Address>,
) {
    let address = address.into();
    debug!(
        "Add Producer {address} with additional_addresses {:?} to {flow_control_id} with spawner {:?}", additional_addresses, spawner_flow_control_id
    );

    // Scope the `producers` guard so it is released before the next lock is taken.
    {
        let mut registry = self.producers.write().unwrap();
        let info = ProducerInfo {
            flow_control_id: flow_control_id.clone(),
            spawner_flow_control_id: spawner_flow_control_id.cloned(),
        };
        registry.insert(address.clone(), info);
    }

    let mut aliases = self.producers_additional_addresses.write().unwrap();
    // The producer's own address resolves to itself, inserted before the extra ones.
    aliases.insert(address.clone(), address.clone());
    aliases.extend(
        additional_addresses
            .into_iter()
            .map(|alias| (alias, address.clone())),
    );
}
/// Records `address` as a spawner associated with a clone of `flow_control_id`
/// in the `spawners` collection.
///
/// # Panics
///
/// Panics if `self.spawners.write()` returns an error (the result is unwrapped).
pub fn add_spawner(&self, address: impl Into<Address>, flow_control_id: &FlowControlId) {
    let address = address.into();
    debug!("Add Spawner {address} with {flow_control_id}");
    // The write guard is a temporary that lives only for this statement.
    self.spawners
        .write()
        .unwrap()
        .insert(address, flow_control_id.clone());
}
pub fn get_consumers_info(&self, flow_control_id: &FlowControlId) -> ConsumersInfo {
let consumers = self.consumers.read().unwrap();
consumers.get(flow_control_id).cloned().unwrap_or_default()
}
/// Looks up `address` in the `spawners` collection and returns a clone of the
/// [`FlowControlId`] stored for it, or `None` if there is no entry.
///
/// # Panics
///
/// Panics if `self.spawners.read()` returns an error (the result is unwrapped).
pub fn get_flow_control_with_spawner(&self, address: &Address) -> Option<FlowControlId> {
    self.spawners.read().unwrap().get(address).cloned()
}
/// Looks up `address` in the `producers` collection and returns a clone of the
/// [`ProducerInfo`] stored for it, or `None` if there is no entry.
///
/// Only the key used in `producers` is consulted here; the
/// `producers_additional_addresses` collection is not read by this method.
///
/// # Panics
///
/// Panics if `self.producers.read()` returns an error (the result is unwrapped).
pub fn get_flow_control_with_producer(&self, address: &Address) -> Option<ProducerInfo> {
    let registry = self.producers.read().unwrap();
    match registry.get(address) {
        Some(info) => Some(info.clone()),
        None => None,
    }
}
/// Resolves `address` through `producers_additional_addresses` and returns a clone
/// of the [`ProducerInfo`] stored in `producers` for the resolved address.
///
/// Returns `None` when `address` has no entry in `producers_additional_addresses`,
/// or when the resolved address has no entry in `producers`.
///
/// The read guard on `producers_additional_addresses` is released before the guard
/// on `producers` is taken.
///
/// # Panics
///
/// Panics if either `read()` call returns an error (both results are unwrapped).
pub fn find_flow_control_with_producer_address(
    &self,
    address: &Address,
) -> Option<ProducerInfo> {
    // Resolve inside a nested scope so the first guard is gone before the second lookup.
    let producer_address = {
        let aliases = self.producers_additional_addresses.read().unwrap();
        aliases.get(address)?.clone()
    };

    let registry = self.producers.read().unwrap();
    registry.get(&producer_address).cloned()
}
}