nimble_host_logic/
combinator.rs1use err_rs::{ErrorLevel, ErrorLevelProvider};
6use log::trace;
7use nimble_participant::ParticipantId;
8use nimble_step::Step;
9use nimble_step_map::StepMap;
10use seq_map::SeqMapError;
11use std::collections::HashMap;
12use tick_id::TickId;
13use tick_queue::{Queue, QueueError};
14
15#[derive(Debug)]
16#[allow(clippy::module_name_repetitions)] pub enum CombinatorError {
18 NotReadyToProduceStep {
19 can_provide: usize,
20 can_not_provide: usize,
21 },
22 OtherError,
23 SeqMapError(SeqMapError),
24 NoBufferForParticipant,
25 QueueError(QueueError),
26}
27
28impl From<QueueError> for CombinatorError {
29 fn from(e: QueueError) -> Self {
30 Self::QueueError(e)
31 }
32}
33
34impl ErrorLevelProvider for CombinatorError {
35 fn error_level(&self) -> ErrorLevel {
36 match self {
37 Self::NotReadyToProduceStep { .. }
38 | Self::OtherError
39 | Self::SeqMapError(_)
40 | Self::NoBufferForParticipant => ErrorLevel::Info,
41 Self::QueueError(_) => ErrorLevel::Critical,
42 }
43 }
44}
45
46impl From<SeqMapError> for CombinatorError {
47 fn from(value: SeqMapError) -> Self {
48 Self::SeqMapError(value)
49 }
50}
51
52#[derive(Default)]
53pub struct Combinator<T: Clone> {
54 pub in_buffers: HashMap<ParticipantId, Queue<T>>,
55 pub tick_id_to_produce: TickId,
56}
57
58impl<T: Clone + std::fmt::Display> Combinator<T> {
59 #[must_use]
60 pub fn new(tick_id_to_produce: TickId) -> Self {
61 Self {
62 in_buffers: HashMap::new(),
63 tick_id_to_produce,
64 }
65 }
66
67 pub fn create_buffer(&mut self, id: ParticipantId) {
68 self.in_buffers.insert(id, Queue::default());
69 }
70
71 pub fn add(
75 &mut self,
76 id: ParticipantId,
77 tick_id: TickId,
78 step: T,
79 ) -> Result<(), CombinatorError> {
80 if let Some(buffer) = self.in_buffers.get_mut(&id) {
81 buffer.push(tick_id, step)?;
82 Ok(())
83 } else {
84 Err(CombinatorError::NoBufferForParticipant)
85 }
86 }
87
88 pub fn get_mut(&mut self, id: &ParticipantId) -> Option<&mut Queue<T>> {
89 self.in_buffers.get_mut(id)
90 }
91
92 #[must_use]
93 pub fn participants_that_can_provide(&self) -> (usize, usize) {
94 let mut participant_count_that_can_not_give_step = 0;
95 let mut participant_count_that_can_provide_step = 0;
96 for steps in self.in_buffers.values() {
97 if let Some(first_tick) = steps.front_tick_id() {
98 if first_tick == self.tick_id_to_produce {
99 participant_count_that_can_provide_step += 1;
100 } else {
101 participant_count_that_can_not_give_step += 1;
102 }
103 } else {
104 participant_count_that_can_not_give_step += 1;
105 }
106 }
107
108 (
109 participant_count_that_can_provide_step,
110 participant_count_that_can_not_give_step,
111 )
112 }
113
114 #[allow(clippy::missing_panics_doc)]
118 pub fn produce(&mut self) -> Result<(TickId, StepMap<Step<T>>), CombinatorError> {
119 let (can_provide, can_not_provide) = self.participants_that_can_provide();
120 if can_provide == 0 {
121 trace!(
122 "notice: can not produce authoritative step {}, no one can provide it",
123 self.tick_id_to_produce
124 );
125 return Err(CombinatorError::NotReadyToProduceStep {
126 can_provide,
127 can_not_provide,
128 });
129 }
130 trace!(
131 "found {} that can provide steps and {} that can not",
132 can_provide,
133 can_not_provide
134 );
135
136 let mut combined_step = StepMap::<Step<T>>::new();
137 for (participant_id, steps) in &mut self.in_buffers {
138 if let Some(first_tick) = steps.front_tick_id() {
139 if first_tick == self.tick_id_to_produce {
140 trace!(
141 "found step from {} for {}, expecting {}",
142 first_tick,
143 participant_id,
144 steps.front_tick_id().unwrap()
145 );
146 combined_step
147 .insert(*participant_id, Step::Custom(steps.pop().unwrap().item))?;
148 } else {
149 trace!(
150 "did not find step from {} for {}, setting it to forced",
151 first_tick,
152 participant_id
153 );
154 combined_step.insert(*participant_id, Step::Forced)?;
155 steps.discard_up_to(self.tick_id_to_produce);
156 }
157 }
158 }
159
160 self.tick_id_to_produce += 1;
161
162 Ok((self.tick_id_to_produce - 1, combined_step))
163 }
164}