Skip to main content

sol_parser_sdk/grpc/
buffers.rs

1//! 事件缓冲区模块 - 用于有序模式下的事件排序和批次处理
2//!
3//! 提供多种缓冲策略:
4//! - `SlotBuffer`: 按 slot 缓冲,支持 Ordered 和 StreamingOrdered 模式
5//! - `MicroBatchBuffer`: 微秒级时间窗口批次,用于 MicroBatch 模式
6
7use crate::DexEvent;
8use std::collections::{BTreeMap, HashMap};
9use tokio::time::Instant;
10
11// ==================== SlotBuffer ====================
12
13/// Slot 缓冲区,用于有序模式下缓存同一 slot 的事件
14#[derive(Default)]
15pub struct SlotBuffer {
16    /// slot -> Vec<(tx_index, event)>
17    slots: BTreeMap<u64, Vec<(u64, DexEvent)>>,
18    /// 当前处理的最大 slot
19    current_slot: u64,
20    /// 上次输出时间
21    last_flush_time: Option<Instant>,
22    /// 流式模式:每个 slot 已释放的最大连续 tx_index
23    streaming_watermarks: HashMap<u64, u64>,
24}
25
26impl SlotBuffer {
27    #[inline]
28    pub fn new() -> Self {
29        Self {
30            slots: BTreeMap::new(),
31            current_slot: 0,
32            last_flush_time: Some(Instant::now()),
33            streaming_watermarks: HashMap::new(),
34        }
35    }
36
37    /// 添加事件到缓冲区
38    #[inline]
39    pub fn push(&mut self, slot: u64, tx_index: u64, event: DexEvent) {
40        self.slots.entry(slot).or_default().push((tx_index, event));
41        if slot > self.current_slot {
42            self.current_slot = slot;
43        }
44    }
45
46    /// 输出所有小于 current_slot 的事件
47    pub fn flush_before(&mut self, current_slot: u64) -> Vec<DexEvent> {
48        let slots_to_flush: Vec<u64> =
49            self.slots.keys().filter(|&&s| s < current_slot).copied().collect();
50
51        let mut result = Vec::with_capacity(slots_to_flush.len() * 4);
52        for slot in slots_to_flush {
53            if let Some(mut events) = self.slots.remove(&slot) {
54                events.sort_unstable_by_key(|(idx, _)| *idx);
55                result.extend(events.into_iter().map(|(_, e)| e));
56            }
57        }
58
59        if !result.is_empty() {
60            self.last_flush_time = Some(Instant::now());
61        }
62        result
63    }
64
65    /// 超时强制输出所有缓冲事件
66    pub fn flush_all(&mut self) -> Vec<DexEvent> {
67        let all_slots: Vec<u64> = self.slots.keys().copied().collect();
68        let mut result = Vec::with_capacity(all_slots.len() * 4);
69
70        for slot in all_slots {
71            if let Some(mut events) = self.slots.remove(&slot) {
72                events.sort_unstable_by_key(|(idx, _)| *idx);
73                result.extend(events.into_iter().map(|(_, e)| e));
74            }
75        }
76
77        if !result.is_empty() {
78            self.last_flush_time = Some(Instant::now());
79        }
80        result
81    }
82
83    /// 检查是否超时
84    #[inline]
85    pub fn should_timeout(&self, timeout_ms: u64) -> bool {
86        self.last_flush_time
87            .map(|t| !self.slots.is_empty() && t.elapsed().as_millis() as u64 > timeout_ms)
88            .unwrap_or(false)
89    }
90
91    /// Streaming release: add event and return releasable continuous sequence
92    /// NOTE: This mode assumes tx_index is continuous (0,1,2,3...)
93    /// For filtered event streams where tx_index may not be continuous, use MicroBatch mode instead
94    pub fn push_streaming(&mut self, slot: u64, tx_index: u64, event: DexEvent) -> Vec<DexEvent> {
95        let mut result = Vec::new();
96
97        // When new slot arrives, release ALL events from previous slots (sorted)
98        if slot > self.current_slot && self.current_slot > 0 {
99            let old_slots: Vec<u64> = self.slots.keys().filter(|&&s| s < slot).copied().collect();
100            for old_slot in old_slots {
101                if let Some(mut events) = self.slots.remove(&old_slot) {
102                    events.sort_unstable_by_key(|(idx, _)| *idx);
103                    result.extend(events.into_iter().map(|(_, e)| e));
104                }
105                self.streaming_watermarks.remove(&old_slot);
106            }
107        }
108
109        if slot > self.current_slot {
110            self.current_slot = slot;
111        }
112
113        // Check if this is the expected tx_index (continuous sequence)
114        let next_expected = *self.streaming_watermarks.get(&slot).unwrap_or(&0);
115
116        if tx_index == next_expected {
117            // Expected index: release immediately
118            result.push(event);
119            let mut watermark = next_expected + 1;
120
121            // Release buffered consecutive events
122            if let Some(buffered) = self.slots.get_mut(&slot) {
123                buffered.sort_unstable_by_key(|(idx, _)| *idx);
124                while let Some(pos) = buffered.iter().position(|(idx, _)| *idx == watermark) {
125                    result.push(buffered.remove(pos).1);
126                    watermark += 1;
127                }
128            }
129            self.streaming_watermarks.insert(slot, watermark);
130        } else if tx_index > next_expected {
131            // Future index: buffer it
132            self.slots.entry(slot).or_default().push((tx_index, event));
133        }
134        // tx_index < next_expected: duplicate event, ignore
135
136        if !result.is_empty() {
137            self.last_flush_time = Some(Instant::now());
138        }
139        result
140    }
141
142    /// 流式模式超时释放
143    pub fn flush_streaming_timeout(&mut self) -> Vec<DexEvent> {
144        let mut result = Vec::new();
145        for (slot, mut events) in std::mem::take(&mut self.slots) {
146            events.sort_unstable_by_key(|(idx, _)| *idx);
147            result.extend(events.into_iter().map(|(_, e)| e));
148            self.streaming_watermarks.remove(&slot);
149        }
150        if !result.is_empty() {
151            self.last_flush_time = Some(Instant::now());
152        }
153        result
154    }
155}
156
157// ==================== MicroBatchBuffer ====================
158
159/// 微批次缓冲区,用于 MicroBatch 模式
160pub struct MicroBatchBuffer {
161    /// 当前窗口内的事件: (slot, tx_index, event)
162    events: Vec<(u64, u64, DexEvent)>,
163    /// 窗口开始时间(微秒)
164    window_start_us: i64,
165}
166
167impl MicroBatchBuffer {
168    #[inline]
169    pub fn new() -> Self {
170        Self { events: Vec::with_capacity(64), window_start_us: 0 }
171    }
172
173    /// 添加事件到窗口,返回是否需要刷新
174    #[inline]
175    pub fn push(
176        &mut self,
177        slot: u64,
178        tx_index: u64,
179        event: DexEvent,
180        now_us: i64,
181        window_us: u64,
182    ) -> bool {
183        if self.events.is_empty() {
184            self.window_start_us = now_us;
185        }
186        self.events.push((slot, tx_index, event));
187        (now_us - self.window_start_us) as u64 >= window_us
188    }
189
190    /// 刷新窗口,返回排序后的事件
191    #[inline]
192    pub fn flush(&mut self) -> Vec<DexEvent> {
193        if self.events.is_empty() {
194            return Vec::new();
195        }
196
197        // 按 (slot, tx_index) 排序
198        self.events.sort_unstable_by_key(|(slot, tx_index, _)| (*slot, *tx_index));
199
200        let result: Vec<DexEvent> =
201            std::mem::take(&mut self.events).into_iter().map(|(_, _, event)| event).collect();
202
203        self.window_start_us = 0;
204        result
205    }
206
207    /// 检查是否需要刷新(窗口超时)
208    #[inline]
209    pub fn should_flush(&self, now_us: i64, window_us: u64) -> bool {
210        !self.events.is_empty() && (now_us - self.window_start_us) as u64 >= window_us
211    }
212}
213
214impl Default for MicroBatchBuffer {
215    fn default() -> Self {
216        Self::new()
217    }
218}