nimble_host_logic/
combinator.rs

/*
 * Copyright (c) Peter Bjorklund. All rights reserved. https://github.com/nimble-rust/nimble
 * Licensed under the MIT License. See LICENSE in the project root for license information.
 */
5use err_rs::{ErrorLevel, ErrorLevelProvider};
6use log::trace;
7use nimble_participant::ParticipantId;
8use nimble_step::Step;
9use nimble_step_map::StepMap;
10use seq_map::SeqMapError;
11use std::collections::HashMap;
12use tick_id::TickId;
13use tick_queue::{Queue, QueueError};
14
/// Errors reported by the combinator in this file.
#[derive(Debug)]
#[allow(clippy::module_name_repetitions)] // TODO: rename CombinatorError
pub enum CombinatorError {
    /// Returned by `produce` when no participant has a step queued for the
    /// tick that is about to be produced. Carries the counts reported by
    /// `participants_that_can_provide`.
    NotReadyToProduceStep {
        /// Number of buffers whose front step matches the tick to produce.
        can_provide: usize,
        /// Number of buffers that are empty or whose front step is for another tick.
        can_not_provide: usize,
    },
    /// Not constructed anywhere in this file.
    OtherError,
    /// Wraps a `SeqMapError`; created through the `From<SeqMapError>` conversion.
    SeqMapError(SeqMapError),
    /// Returned by `add` when no buffer has been created for the participant.
    NoBufferForParticipant,
    /// Wraps a `QueueError`; created through the `From<QueueError>` conversion.
    QueueError(QueueError),
}
27
28impl From<QueueError> for CombinatorError {
29    fn from(e: QueueError) -> Self {
30        Self::QueueError(e)
31    }
32}
33
34impl ErrorLevelProvider for CombinatorError {
35    fn error_level(&self) -> ErrorLevel {
36        match self {
37            Self::NotReadyToProduceStep { .. }
38            | Self::OtherError
39            | Self::SeqMapError(_)
40            | Self::NoBufferForParticipant => ErrorLevel::Info,
41            Self::QueueError(_) => ErrorLevel::Critical,
42        }
43    }
44}
45
46impl From<SeqMapError> for CombinatorError {
47    fn from(value: SeqMapError) -> Self {
48        Self::SeqMapError(value)
49    }
50}
51
52#[derive(Default)]
53pub struct Combinator<T: Clone> {
54    pub in_buffers: HashMap<ParticipantId, Queue<T>>,
55    pub tick_id_to_produce: TickId,
56}
57
58impl<T: Clone + std::fmt::Display> Combinator<T> {
59    #[must_use]
60    pub fn new(tick_id_to_produce: TickId) -> Self {
61        Self {
62            in_buffers: HashMap::new(),
63            tick_id_to_produce,
64        }
65    }
66
67    pub fn create_buffer(&mut self, id: ParticipantId) {
68        self.in_buffers.insert(id, Queue::default());
69    }
70
71    /// # Errors
72    ///
73    /// `CombinatorError` // TODO:
74    pub fn add(
75        &mut self,
76        id: ParticipantId,
77        tick_id: TickId,
78        step: T,
79    ) -> Result<(), CombinatorError> {
80        if let Some(buffer) = self.in_buffers.get_mut(&id) {
81            buffer.push(tick_id, step)?;
82            Ok(())
83        } else {
84            Err(CombinatorError::NoBufferForParticipant)
85        }
86    }
87
88    pub fn get_mut(&mut self, id: &ParticipantId) -> Option<&mut Queue<T>> {
89        self.in_buffers.get_mut(id)
90    }
91
92    #[must_use]
93    pub fn participants_that_can_provide(&self) -> (usize, usize) {
94        let mut participant_count_that_can_not_give_step = 0;
95        let mut participant_count_that_can_provide_step = 0;
96        for steps in self.in_buffers.values() {
97            if let Some(first_tick) = steps.front_tick_id() {
98                if first_tick == self.tick_id_to_produce {
99                    participant_count_that_can_provide_step += 1;
100                } else {
101                    participant_count_that_can_not_give_step += 1;
102                }
103            } else {
104                participant_count_that_can_not_give_step += 1;
105            }
106        }
107
108        (
109            participant_count_that_can_provide_step,
110            participant_count_that_can_not_give_step,
111        )
112    }
113
114    /// # Errors
115    ///
116    /// `CombinatorError` // TODO:
117    #[allow(clippy::missing_panics_doc)]
118    pub fn produce(&mut self) -> Result<(TickId, StepMap<Step<T>>), CombinatorError> {
119        let (can_provide, can_not_provide) = self.participants_that_can_provide();
120        if can_provide == 0 {
121            trace!(
122                "notice: can not produce authoritative step {}, no one can provide it",
123                self.tick_id_to_produce
124            );
125            return Err(CombinatorError::NotReadyToProduceStep {
126                can_provide,
127                can_not_provide,
128            });
129        }
130        trace!(
131            "found {} that can provide steps and {} that can not",
132            can_provide,
133            can_not_provide
134        );
135
136        let mut combined_step = StepMap::<Step<T>>::new();
137        for (participant_id, steps) in &mut self.in_buffers {
138            if let Some(first_tick) = steps.front_tick_id() {
139                if first_tick == self.tick_id_to_produce {
140                    trace!(
141                        "found step from {} for {}, expecting {}",
142                        first_tick,
143                        participant_id,
144                        steps.front_tick_id().unwrap()
145                    );
146                    combined_step
147                        .insert(*participant_id, Step::Custom(steps.pop().unwrap().item))?;
148                } else {
149                    trace!(
150                        "did not find step from {} for {}, setting it to forced",
151                        first_tick,
152                        participant_id
153                    );
154                    combined_step.insert(*participant_id, Step::Forced)?;
155                    steps.discard_up_to(self.tick_id_to_produce);
156                }
157            }
158        }
159
160        self.tick_id_to_produce += 1;
161
162        Ok((self.tick_id_to_produce - 1, combined_step))
163    }
164}