sol_parser_sdk/grpc/
buffers.rs1use crate::DexEvent;
8use std::collections::{BTreeMap, HashMap};
9use tokio::time::Instant;
10
11#[derive(Default)]
15pub struct SlotBuffer {
16 slots: BTreeMap<u64, Vec<(u64, DexEvent)>>,
18 current_slot: u64,
20 last_flush_time: Option<Instant>,
22 streaming_watermarks: HashMap<u64, u64>,
24}
25
26impl SlotBuffer {
27 #[inline]
28 pub fn new() -> Self {
29 Self {
30 slots: BTreeMap::new(),
31 current_slot: 0,
32 last_flush_time: Some(Instant::now()),
33 streaming_watermarks: HashMap::new(),
34 }
35 }
36
37 #[inline]
39 pub fn push(&mut self, slot: u64, tx_index: u64, event: DexEvent) {
40 self.slots.entry(slot).or_default().push((tx_index, event));
41 if slot > self.current_slot {
42 self.current_slot = slot;
43 }
44 }
45
46 pub fn flush_before(&mut self, current_slot: u64) -> Vec<DexEvent> {
48 let slots_to_flush: Vec<u64> = self.slots
49 .keys()
50 .filter(|&&s| s < current_slot)
51 .copied()
52 .collect();
53
54 let mut result = Vec::with_capacity(slots_to_flush.len() * 4);
55 for slot in slots_to_flush {
56 if let Some(mut events) = self.slots.remove(&slot) {
57 events.sort_unstable_by_key(|(idx, _)| *idx);
58 result.extend(events.into_iter().map(|(_, e)| e));
59 }
60 }
61
62 if !result.is_empty() {
63 self.last_flush_time = Some(Instant::now());
64 }
65 result
66 }
67
68 pub fn flush_all(&mut self) -> Vec<DexEvent> {
70 let all_slots: Vec<u64> = self.slots.keys().copied().collect();
71 let mut result = Vec::with_capacity(all_slots.len() * 4);
72
73 for slot in all_slots {
74 if let Some(mut events) = self.slots.remove(&slot) {
75 events.sort_unstable_by_key(|(idx, _)| *idx);
76 result.extend(events.into_iter().map(|(_, e)| e));
77 }
78 }
79
80 if !result.is_empty() {
81 self.last_flush_time = Some(Instant::now());
82 }
83 result
84 }
85
86 #[inline]
88 pub fn should_timeout(&self, timeout_ms: u64) -> bool {
89 self.last_flush_time
90 .map(|t| !self.slots.is_empty() && t.elapsed().as_millis() as u64 > timeout_ms)
91 .unwrap_or(false)
92 }
93
94 pub fn push_streaming(&mut self, slot: u64, tx_index: u64, event: DexEvent) -> Vec<DexEvent> {
98 let mut result = Vec::new();
99
100 if slot > self.current_slot && self.current_slot > 0 {
102 let old_slots: Vec<u64> = self.slots.keys().filter(|&&s| s < slot).copied().collect();
103 for old_slot in old_slots {
104 if let Some(mut events) = self.slots.remove(&old_slot) {
105 events.sort_unstable_by_key(|(idx, _)| *idx);
106 result.extend(events.into_iter().map(|(_, e)| e));
107 }
108 self.streaming_watermarks.remove(&old_slot);
109 }
110 }
111
112 if slot > self.current_slot {
113 self.current_slot = slot;
114 }
115
116 let next_expected = *self.streaming_watermarks.get(&slot).unwrap_or(&0);
118
119 if tx_index == next_expected {
120 result.push(event);
122 let mut watermark = next_expected + 1;
123
124 if let Some(buffered) = self.slots.get_mut(&slot) {
126 buffered.sort_unstable_by_key(|(idx, _)| *idx);
127 while let Some(pos) = buffered.iter().position(|(idx, _)| *idx == watermark) {
128 result.push(buffered.remove(pos).1);
129 watermark += 1;
130 }
131 }
132 self.streaming_watermarks.insert(slot, watermark);
133 } else if tx_index > next_expected {
134 self.slots.entry(slot).or_default().push((tx_index, event));
136 }
137 if !result.is_empty() {
140 self.last_flush_time = Some(Instant::now());
141 }
142 result
143 }
144
145 pub fn flush_streaming_timeout(&mut self) -> Vec<DexEvent> {
147 let mut result = Vec::new();
148 for (slot, mut events) in std::mem::take(&mut self.slots) {
149 events.sort_unstable_by_key(|(idx, _)| *idx);
150 result.extend(events.into_iter().map(|(_, e)| e));
151 self.streaming_watermarks.remove(&slot);
152 }
153 if !result.is_empty() {
154 self.last_flush_time = Some(Instant::now());
155 }
156 result
157 }
158}
159
160pub struct MicroBatchBuffer {
164 events: Vec<(u64, u64, DexEvent)>,
166 window_start_us: i64,
168}
169
170impl MicroBatchBuffer {
171 #[inline]
172 pub fn new() -> Self {
173 Self {
174 events: Vec::with_capacity(64),
175 window_start_us: 0,
176 }
177 }
178
179 #[inline]
181 pub fn push(&mut self, slot: u64, tx_index: u64, event: DexEvent, now_us: i64, window_us: u64) -> bool {
182 if self.events.is_empty() {
183 self.window_start_us = now_us;
184 }
185 self.events.push((slot, tx_index, event));
186 (now_us - self.window_start_us) as u64 >= window_us
187 }
188
189 #[inline]
191 pub fn flush(&mut self) -> Vec<DexEvent> {
192 if self.events.is_empty() {
193 return Vec::new();
194 }
195
196 self.events.sort_unstable_by_key(|(slot, tx_index, _)| (*slot, *tx_index));
198
199 let result: Vec<DexEvent> = std::mem::take(&mut self.events)
200 .into_iter()
201 .map(|(_, _, event)| event)
202 .collect();
203
204 self.window_start_us = 0;
205 result
206 }
207
208 #[inline]
210 pub fn should_flush(&self, now_us: i64, window_us: u64) -> bool {
211 !self.events.is_empty() && (now_us - self.window_start_us) as u64 >= window_us
212 }
213}
214
215impl Default for MicroBatchBuffer {
216 fn default() -> Self {
217 Self::new()
218 }
219}