noir_compute/block/
batcher.rs1use std::num::NonZeroUsize;
2use std::time::Duration;
3
4use coarsetime::Instant;
5
6use crate::network::{Coord, NetworkMessage, NetworkSender};
7use crate::operator::{ExchangeData, StreamElement};
8
9#[derive(Debug, Clone, Copy, Eq, PartialEq)]
17pub enum BatchMode {
18 Fixed(NonZeroUsize),
20 Adaptive(NonZeroUsize, Duration),
23
24 Single,
26}
27
28impl BatchMode {
29 pub fn max_size(&self) -> usize {
30 match self {
31 BatchMode::Fixed(s) => s.get(),
32 BatchMode::Adaptive(s, _) => s.get(),
33 BatchMode::Single => 1,
34 }
35 }
36
37 pub fn interval(&self) -> Option<Duration> {
38 match self {
39 BatchMode::Adaptive(_, ts) => Some(*ts),
40 _ => None,
41 }
42 }
43}
44
45pub(crate) struct Batcher<Out: Send + 'static> {
49 remote_sender: NetworkSender<Out>,
51 mode: BatchMode,
53 buffer: Vec<StreamElement<Out>>,
55 last_send: Instant,
57 coord: Coord,
59}
60
61impl<Out: ExchangeData> Batcher<Out> {
62 pub(crate) fn new(remote_sender: NetworkSender<Out>, mode: BatchMode, coord: Coord) -> Self {
63 Self {
64 remote_sender,
65 mode,
66 buffer: Default::default(),
67 last_send: Instant::now(),
68 coord,
69 }
70 }
71
72 pub(crate) fn enqueue(&mut self, message: StreamElement<Out>) {
74 match self.mode {
75 BatchMode::Adaptive(n, max_delay) => {
76 self.buffer.push(message);
77 let timeout_elapsed = self.last_send.elapsed() > max_delay.into();
78 if self.buffer.len() >= n.get() || timeout_elapsed {
79 self.flush()
80 }
81 }
82 BatchMode::Fixed(n) => {
83 self.buffer.push(message);
84 if self.buffer.len() >= n.get() {
85 self.flush()
86 }
87 }
88 BatchMode::Single => {
89 let message = NetworkMessage::new_single(message, self.coord);
90 self.remote_sender.send(message).unwrap();
91 }
92 }
93 }
94
95 pub(crate) fn flush(&mut self) {
97 if !self.buffer.is_empty() {
98 let cap = self.buffer.capacity();
99 let new_cap = if self.buffer.len() < cap / 4 {
100 cap / 2
101 } else {
102 cap
103 };
104 let mut batch = Vec::with_capacity(new_cap);
105 std::mem::swap(&mut self.buffer, &mut batch);
106 let message = NetworkMessage::new_batch(batch, self.coord);
107 self.remote_sender.send(message).unwrap();
108 self.last_send = Instant::now();
109 }
110 }
111
112 pub(crate) fn end(self) {
114 if !self.buffer.is_empty() {
116 let message = NetworkMessage::new_batch(self.buffer, self.coord);
117 self.remote_sender.send(message).unwrap();
118 }
119 }
120}
121
122impl BatchMode {
123 pub fn fixed(size: usize) -> BatchMode {
125 BatchMode::Fixed(NonZeroUsize::new(size).expect("The batch size must be positive"))
126 }
127
128 pub fn adaptive(size: usize, max_delay: Duration) -> BatchMode {
130 BatchMode::Adaptive(
131 NonZeroUsize::new(size).expect("The batch size must be positive"),
132 max_delay,
133 )
134 }
135
136 pub fn single() -> BatchMode {
138 BatchMode::Single
139 }
140
141 pub fn max_delay(&self) -> Option<Duration> {
142 match &self {
143 BatchMode::Adaptive(_, max_delay) => Some(*max_delay),
144 BatchMode::Fixed(_) | BatchMode::Single => None,
145 }
146 }
147}
148
149impl Default for BatchMode {
150 fn default() -> Self {
151 BatchMode::adaptive(1024, Duration::from_millis(50))
152 }
153}