sol_parser_sdk/grpc/
buffers.rs1use crate::DexEvent;
8use std::collections::{BTreeMap, HashMap};
9use tokio::time::Instant;
10
11#[derive(Default)]
15pub struct SlotBuffer {
16 slots: BTreeMap<u64, Vec<(u64, DexEvent)>>,
18 current_slot: u64,
20 last_flush_time: Option<Instant>,
22 streaming_watermarks: HashMap<u64, u64>,
24}
25
26impl SlotBuffer {
27 #[inline]
28 pub fn new() -> Self {
29 Self {
30 slots: BTreeMap::new(),
31 current_slot: 0,
32 last_flush_time: Some(Instant::now()),
33 streaming_watermarks: HashMap::new(),
34 }
35 }
36
37 #[inline]
39 pub fn push(&mut self, slot: u64, tx_index: u64, event: DexEvent) {
40 self.slots.entry(slot).or_default().push((tx_index, event));
41 if slot > self.current_slot {
42 self.current_slot = slot;
43 }
44 }
45
46 pub fn flush_before(&mut self, current_slot: u64) -> Vec<DexEvent> {
48 let slots_to_flush: Vec<u64> =
49 self.slots.keys().filter(|&&s| s < current_slot).copied().collect();
50
51 let mut result = Vec::with_capacity(slots_to_flush.len() * 4);
52 for slot in slots_to_flush {
53 if let Some(mut events) = self.slots.remove(&slot) {
54 events.sort_unstable_by_key(|(idx, _)| *idx);
55 result.extend(events.into_iter().map(|(_, e)| e));
56 }
57 }
58
59 if !result.is_empty() {
60 self.last_flush_time = Some(Instant::now());
61 }
62 result
63 }
64
65 pub fn flush_all(&mut self) -> Vec<DexEvent> {
67 let all_slots: Vec<u64> = self.slots.keys().copied().collect();
68 let mut result = Vec::with_capacity(all_slots.len() * 4);
69
70 for slot in all_slots {
71 if let Some(mut events) = self.slots.remove(&slot) {
72 events.sort_unstable_by_key(|(idx, _)| *idx);
73 result.extend(events.into_iter().map(|(_, e)| e));
74 }
75 }
76
77 if !result.is_empty() {
78 self.last_flush_time = Some(Instant::now());
79 }
80 result
81 }
82
83 #[inline]
85 pub fn should_timeout(&self, timeout_ms: u64) -> bool {
86 self.last_flush_time
87 .map(|t| !self.slots.is_empty() && t.elapsed().as_millis() as u64 > timeout_ms)
88 .unwrap_or(false)
89 }
90
91 pub fn push_streaming(&mut self, slot: u64, tx_index: u64, event: DexEvent) -> Vec<DexEvent> {
95 let mut result = Vec::new();
96
97 if slot > self.current_slot && self.current_slot > 0 {
99 let old_slots: Vec<u64> = self.slots.keys().filter(|&&s| s < slot).copied().collect();
100 for old_slot in old_slots {
101 if let Some(mut events) = self.slots.remove(&old_slot) {
102 events.sort_unstable_by_key(|(idx, _)| *idx);
103 result.extend(events.into_iter().map(|(_, e)| e));
104 }
105 self.streaming_watermarks.remove(&old_slot);
106 }
107 }
108
109 if slot > self.current_slot {
110 self.current_slot = slot;
111 }
112
113 let next_expected = *self.streaming_watermarks.get(&slot).unwrap_or(&0);
115
116 if tx_index == next_expected {
117 result.push(event);
119 let mut watermark = next_expected + 1;
120
121 if let Some(buffered) = self.slots.get_mut(&slot) {
123 buffered.sort_unstable_by_key(|(idx, _)| *idx);
124 while let Some(pos) = buffered.iter().position(|(idx, _)| *idx == watermark) {
125 result.push(buffered.remove(pos).1);
126 watermark += 1;
127 }
128 }
129 self.streaming_watermarks.insert(slot, watermark);
130 } else if tx_index > next_expected {
131 self.slots.entry(slot).or_default().push((tx_index, event));
133 }
134 if !result.is_empty() {
137 self.last_flush_time = Some(Instant::now());
138 }
139 result
140 }
141
142 pub fn flush_streaming_timeout(&mut self) -> Vec<DexEvent> {
144 let mut result = Vec::new();
145 for (slot, mut events) in std::mem::take(&mut self.slots) {
146 events.sort_unstable_by_key(|(idx, _)| *idx);
147 result.extend(events.into_iter().map(|(_, e)| e));
148 self.streaming_watermarks.remove(&slot);
149 }
150 if !result.is_empty() {
151 self.last_flush_time = Some(Instant::now());
152 }
153 result
154 }
155}
156
157pub struct MicroBatchBuffer {
161 events: Vec<(u64, u64, DexEvent)>,
163 window_start_us: i64,
165}
166
167impl MicroBatchBuffer {
168 #[inline]
169 pub fn new() -> Self {
170 Self { events: Vec::with_capacity(64), window_start_us: 0 }
171 }
172
173 #[inline]
175 pub fn push(
176 &mut self,
177 slot: u64,
178 tx_index: u64,
179 event: DexEvent,
180 now_us: i64,
181 window_us: u64,
182 ) -> bool {
183 if self.events.is_empty() {
184 self.window_start_us = now_us;
185 }
186 self.events.push((slot, tx_index, event));
187 (now_us - self.window_start_us) as u64 >= window_us
188 }
189
190 #[inline]
192 pub fn flush(&mut self) -> Vec<DexEvent> {
193 if self.events.is_empty() {
194 return Vec::new();
195 }
196
197 self.events.sort_unstable_by_key(|(slot, tx_index, _)| (*slot, *tx_index));
199
200 let result: Vec<DexEvent> =
201 std::mem::take(&mut self.events).into_iter().map(|(_, _, event)| event).collect();
202
203 self.window_start_us = 0;
204 result
205 }
206
207 #[inline]
209 pub fn should_flush(&self, now_us: i64, window_us: u64) -> bool {
210 !self.events.is_empty() && (now_us - self.window_start_us) as u64 >= window_us
211 }
212}
213
214impl Default for MicroBatchBuffer {
215 fn default() -> Self {
216 Self::new()
217 }
218}