noir_compute/block/
batcher.rs

1use std::num::NonZeroUsize;
2use std::time::Duration;
3
4use coarsetime::Instant;
5
6use crate::network::{Coord, NetworkMessage, NetworkSender};
7use crate::operator::{ExchangeData, StreamElement};
8
/// Policy that decides when buffered messages are grouped together and sent.
///
/// Rather than building the variants by hand, prefer the [`BatchMode::fixed()`],
/// [`BatchMode::adaptive()`] and [`BatchMode::single()`] constructors.
///
/// The default batch mode is `Adaptive(1024, 50ms)`: a batch is flushed once it holds at least
/// 1024 messages, or once more than 50ms have elapsed since the previous flush.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BatchMode {
    /// Flush a batch only when it contains the given number of messages.
    Fixed(NonZeroUsize),
    /// Flush a batch when it contains the given number of messages or when the given timeout
    /// has expired, whichever happens first.
    Adaptive(NonZeroUsize, Duration),

    /// Send each message individually, without batching.
    Single,
}
27
28impl BatchMode {
29    pub fn max_size(&self) -> usize {
30        match self {
31            BatchMode::Fixed(s) => s.get(),
32            BatchMode::Adaptive(s, _) => s.get(),
33            BatchMode::Single => 1,
34        }
35    }
36
37    pub fn interval(&self) -> Option<Duration> {
38        match self {
39            BatchMode::Adaptive(_, ts) => Some(*ts),
40            _ => None,
41        }
42    }
43}
44
45/// A `Batcher` wraps a sender and sends the messages in batches to reduce the network overhead.
46///
47/// Internally it spawns a new task to handle the timeouts and join it at the end.
48pub(crate) struct Batcher<Out: Send + 'static> {
49    /// Sender used to communicate with the other replicas
50    remote_sender: NetworkSender<Out>,
51    /// Batching mode used by the batcher
52    mode: BatchMode,
53    /// Buffer used to keep messages ready to be sent
54    buffer: Vec<StreamElement<Out>>,
55    /// Time of the last flush of the buffer.    
56    last_send: Instant,
57    /// The coordinate of this block, used for marking the sender of the batch.
58    coord: Coord,
59}
60
61impl<Out: ExchangeData> Batcher<Out> {
62    pub(crate) fn new(remote_sender: NetworkSender<Out>, mode: BatchMode, coord: Coord) -> Self {
63        Self {
64            remote_sender,
65            mode,
66            buffer: Default::default(),
67            last_send: Instant::now(),
68            coord,
69        }
70    }
71
72    /// Put a message in the batch queue, it won't be sent immediately.
73    pub(crate) fn enqueue(&mut self, message: StreamElement<Out>) {
74        match self.mode {
75            BatchMode::Adaptive(n, max_delay) => {
76                self.buffer.push(message);
77                let timeout_elapsed = self.last_send.elapsed() > max_delay.into();
78                if self.buffer.len() >= n.get() || timeout_elapsed {
79                    self.flush()
80                }
81            }
82            BatchMode::Fixed(n) => {
83                self.buffer.push(message);
84                if self.buffer.len() >= n.get() {
85                    self.flush()
86                }
87            }
88            BatchMode::Single => {
89                let message = NetworkMessage::new_single(message, self.coord);
90                self.remote_sender.send(message).unwrap();
91            }
92        }
93    }
94
95    /// Flush the internal buffer if it's not empty.
96    pub(crate) fn flush(&mut self) {
97        if !self.buffer.is_empty() {
98            let cap = self.buffer.capacity();
99            let new_cap = if self.buffer.len() < cap / 4 {
100                cap / 2
101            } else {
102                cap
103            };
104            let mut batch = Vec::with_capacity(new_cap);
105            std::mem::swap(&mut self.buffer, &mut batch);
106            let message = NetworkMessage::new_batch(batch, self.coord);
107            self.remote_sender.send(message).unwrap();
108            self.last_send = Instant::now();
109        }
110    }
111
112    /// Tell the batcher that the stream is ended, flush all the remaining messages.
113    pub(crate) fn end(self) {
114        // Send the remaining messages
115        if !self.buffer.is_empty() {
116            let message = NetworkMessage::new_batch(self.buffer, self.coord);
117            self.remote_sender.send(message).unwrap();
118        }
119    }
120}
121
122impl BatchMode {
123    /// Construct a new `BatchMode::Fixed` with the given positive batch size.
124    pub fn fixed(size: usize) -> BatchMode {
125        BatchMode::Fixed(NonZeroUsize::new(size).expect("The batch size must be positive"))
126    }
127
128    /// Construct a new `BatchMode::Adaptive` with the given positive batch size and maximum delay.
129    pub fn adaptive(size: usize, max_delay: Duration) -> BatchMode {
130        BatchMode::Adaptive(
131            NonZeroUsize::new(size).expect("The batch size must be positive"),
132            max_delay,
133        )
134    }
135
136    /// Construct a new `BatchMode::Single`.
137    pub fn single() -> BatchMode {
138        BatchMode::Single
139    }
140
141    pub fn max_delay(&self) -> Option<Duration> {
142        match &self {
143            BatchMode::Adaptive(_, max_delay) => Some(*max_delay),
144            BatchMode::Fixed(_) | BatchMode::Single => None,
145        }
146    }
147}
148
149impl Default for BatchMode {
150    fn default() -> Self {
151        BatchMode::adaptive(1024, Duration::from_millis(50))
152    }
153}