1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use crate::compat::rand::random;
use crate::compat::vec::Vec;
use crate::flow_control::{ConsumersInfo, FlowControlId, FlowControls, ProducerInfo};
use crate::Address;

impl FlowControls {
    /// Create a new [`FlowControls`] in which the `consumers`, `producers`,
    /// `producers_additional_addresses` and `spawners` fields all start out
    /// at their `Default` value.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        // Each binding's type is inferred from the struct field it initializes below
        let consumers = Default::default();
        let producers = Default::default();
        let producers_additional_addresses = Default::default();
        let spawners = Default::default();

        Self {
            consumers,
            producers,
            producers_additional_addresses,
            spawners,
        }
    }
}

impl FlowControls {
    /// Generate a fresh [`FlowControlId`] by asking `random()` for a value of that type
    pub fn generate_flow_control_id() -> FlowControlId {
        random::<FlowControlId>()
    }

    /// Mark that given [`Address`] is a Consumer for a Producer with the given [`FlowControlId`]
    ///
    /// If nothing is registered for `flow_control_id` yet, a `Default` consumers entry is
    /// created first; `address` is then inserted into that entry.
    ///
    /// # Panics
    ///
    /// Panics if `write()` on the `consumers` lock returns an error.
    pub fn add_consumer(&self, address: impl Into<Address>, flow_control_id: &FlowControlId) {
        let address = address.into();
        debug!("Add Consumer {address} to Producer {flow_control_id}");
        let mut consumers = self.consumers.write().unwrap();

        // The entry API finds or creates the slot in one lookup, so there is no
        // separate `contains_key` check and no `unwrap()` on a follow-up `get_mut`.
        consumers
            .entry(flow_control_id.clone())
            .or_default()
            .0
            .insert(address);
    }

    /// Mark that given [`Address`] is a Producer for the given [`FlowControlId`].
    ///
    /// A [`ProducerInfo`] is stored for `address`, holding a clone of `flow_control_id` and,
    /// if the Producer was spawned by a Spawner, a clone of that Spawner's [`FlowControlId`]
    /// (`spawner_flow_control_id`).
    ///
    /// Afterwards `address` itself and every entry of `additional_addresses` are mapped to
    /// `address` in `producers_additional_addresses`.
    ///
    /// # Panics
    ///
    /// Panics if `write()` on either of the two locks returns an error.
    pub fn add_producer(
        &self,
        address: impl Into<Address>,
        flow_control_id: &FlowControlId,
        spawner_flow_control_id: Option<&FlowControlId>,
        additional_addresses: Vec<Address>,
    ) {
        let address = address.into();
        debug!(
            "Add Producer {address} with additional_addresses {:?} to {flow_control_id} with spawner {:?}", additional_addresses, spawner_flow_control_id
        );

        let info = ProducerInfo {
            flow_control_id: flow_control_id.clone(),
            spawner_flow_control_id: spawner_flow_control_id.cloned(),
        };
        // The write guard is a temporary of this statement, so the `producers` lock is
        // released before the `producers_additional_addresses` lock is taken below.
        self.producers.write().unwrap().insert(address.clone(), info);

        let mut owners = self.producers_additional_addresses.write().unwrap();
        // The Producer's own address maps to itself
        owners.insert(address.clone(), address.clone());
        // Every additional address maps back to the Producer's address
        owners.extend(
            additional_addresses
                .into_iter()
                .map(|additional_address| (additional_address, address.clone())),
        );
    }

    /// Mark that given [`Address`] is a Spawner for the given [`FlowControlId`].
    ///
    /// The converted `address` is used as the key in `spawners` and a clone of
    /// `flow_control_id` is stored as its value.
    ///
    /// # Panics
    ///
    /// Panics if `write()` on the `spawners` lock returns an error.
    pub fn add_spawner(&self, address: impl Into<Address>, flow_control_id: &FlowControlId) {
        let address = address.into();
        debug!("Add Spawner {address} with {flow_control_id}");

        // The guard only lives for this one statement
        self.spawners
            .write()
            .unwrap()
            .insert(address, flow_control_id.clone());
    }

    /// Get known Consumers for the given [`FlowControlId`]
    pub fn get_consumers_info(&self, flow_control_id: &FlowControlId) -> ConsumersInfo {
        let consumers = self.consumers.read().unwrap();
        consumers.get(flow_control_id).cloned().unwrap_or_default()
    }

    /// Get [`FlowControlId`] for which given [`Address`] is a Spawner
    ///
    /// Returns a clone of the stored id, or `None` when `address` has no entry in `spawners`.
    ///
    /// # Panics
    ///
    /// Panics if `read()` on the `spawners` lock returns an error.
    pub fn get_flow_control_with_spawner(&self, address: &Address) -> Option<FlowControlId> {
        self.spawners.read().unwrap().get(address).cloned()
    }

    /// Get [`ProducerInfo`] for which given [`Address`] is a Producer
    ///
    /// Only the `producers` table is consulted. Returns a clone of the stored
    /// [`ProducerInfo`], or `None` when `address` has no entry there.
    ///
    /// # Panics
    ///
    /// Panics if `read()` on the `producers` lock returns an error.
    pub fn get_flow_control_with_producer(&self, address: &Address) -> Option<ProducerInfo> {
        let registry = self.producers.read().unwrap();
        let info = registry.get(address)?;
        Some(info.clone())
    }

    /// Get [`ProducerInfo`] for which given [`Address`] is a Producer or is an additional
    /// [`Address`] for that Producer (e.g. Encryptor address for its Decryptor, or TCP Sender
    /// for its TCP Receiver)
    ///
    /// The lookup has two steps: `address` is first resolved to a Producer address through
    /// `producers_additional_addresses`, then that Producer address is looked up in
    /// `producers`. `None` is returned if either step finds nothing.
    ///
    /// # Panics
    ///
    /// Panics if `read()` on either of the two locks returns an error.
    pub fn find_flow_control_with_producer_address(
        &self,
        address: &Address,
    ) -> Option<ProducerInfo> {
        let owners = self.producers_additional_addresses.read().unwrap();
        // Unknown address: `?` returns `None` right away
        let producer_address = owners.get(address)?.clone();
        // Release the first lock before taking the `producers` lock
        drop(owners);

        let registry = self.producers.read().unwrap();
        registry.get(&producer_address).cloned()
    }
}