Skip to main content

sol_parser_sdk/grpc/
buffers.rs

1//! 事件缓冲区模块 - 用于有序模式下的事件排序和批次处理
2//!
3//! 提供多种缓冲策略:
4//! - `SlotBuffer`: 按 slot 缓冲,支持 Ordered 和 StreamingOrdered 模式
5//! - `MicroBatchBuffer`: 微秒级时间窗口批次,用于 MicroBatch 模式
6
7use crate::DexEvent;
8use std::collections::{BTreeMap, HashMap};
9use tokio::time::Instant;
10
11// ==================== SlotBuffer ====================
12
13/// Slot 缓冲区,用于有序模式下缓存同一 slot 的事件
14#[derive(Default)]
15pub struct SlotBuffer {
16    /// slot -> Vec<(tx_index, event)>
17    slots: BTreeMap<u64, Vec<(u64, DexEvent)>>,
18    /// 当前处理的最大 slot
19    current_slot: u64,
20    /// 上次输出时间
21    last_flush_time: Option<Instant>,
22    /// 流式模式:每个 slot 已释放的最大连续 tx_index
23    streaming_watermarks: HashMap<u64, u64>,
24}
25
26impl SlotBuffer {
27    #[inline]
28    pub fn new() -> Self {
29        Self {
30            slots: BTreeMap::new(),
31            current_slot: 0,
32            last_flush_time: Some(Instant::now()),
33            streaming_watermarks: HashMap::new(),
34        }
35    }
36
37    /// 添加事件到缓冲区
38    #[inline]
39    pub fn push(&mut self, slot: u64, tx_index: u64, event: DexEvent) {
40        self.slots.entry(slot).or_default().push((tx_index, event));
41        if slot > self.current_slot {
42            self.current_slot = slot;
43        }
44    }
45
46    /// 输出所有小于 current_slot 的事件
47    pub fn flush_before(&mut self, current_slot: u64) -> Vec<DexEvent> {
48        let slots_to_flush: Vec<u64> = self.slots
49            .keys()
50            .filter(|&&s| s < current_slot)
51            .copied()
52            .collect();
53        
54        let mut result = Vec::with_capacity(slots_to_flush.len() * 4);
55        for slot in slots_to_flush {
56            if let Some(mut events) = self.slots.remove(&slot) {
57                events.sort_unstable_by_key(|(idx, _)| *idx);
58                result.extend(events.into_iter().map(|(_, e)| e));
59            }
60        }
61        
62        if !result.is_empty() {
63            self.last_flush_time = Some(Instant::now());
64        }
65        result
66    }
67
68    /// 超时强制输出所有缓冲事件
69    pub fn flush_all(&mut self) -> Vec<DexEvent> {
70        let all_slots: Vec<u64> = self.slots.keys().copied().collect();
71        let mut result = Vec::with_capacity(all_slots.len() * 4);
72        
73        for slot in all_slots {
74            if let Some(mut events) = self.slots.remove(&slot) {
75                events.sort_unstable_by_key(|(idx, _)| *idx);
76                result.extend(events.into_iter().map(|(_, e)| e));
77            }
78        }
79        
80        if !result.is_empty() {
81            self.last_flush_time = Some(Instant::now());
82        }
83        result
84    }
85
86    /// 检查是否超时
87    #[inline]
88    pub fn should_timeout(&self, timeout_ms: u64) -> bool {
89        self.last_flush_time
90            .map(|t| !self.slots.is_empty() && t.elapsed().as_millis() as u64 > timeout_ms)
91            .unwrap_or(false)
92    }
93
94    /// Streaming release: add event and return releasable continuous sequence
95    /// NOTE: This mode assumes tx_index is continuous (0,1,2,3...)
96    /// For filtered event streams where tx_index may not be continuous, use MicroBatch mode instead
97    pub fn push_streaming(&mut self, slot: u64, tx_index: u64, event: DexEvent) -> Vec<DexEvent> {
98        let mut result = Vec::new();
99        
100        // When new slot arrives, release ALL events from previous slots (sorted)
101        if slot > self.current_slot && self.current_slot > 0 {
102            let old_slots: Vec<u64> = self.slots.keys().filter(|&&s| s < slot).copied().collect();
103            for old_slot in old_slots {
104                if let Some(mut events) = self.slots.remove(&old_slot) {
105                    events.sort_unstable_by_key(|(idx, _)| *idx);
106                    result.extend(events.into_iter().map(|(_, e)| e));
107                }
108                self.streaming_watermarks.remove(&old_slot);
109            }
110        }
111        
112        if slot > self.current_slot {
113            self.current_slot = slot;
114        }
115        
116        // Check if this is the expected tx_index (continuous sequence)
117        let next_expected = *self.streaming_watermarks.get(&slot).unwrap_or(&0);
118        
119        if tx_index == next_expected {
120            // Expected index: release immediately
121            result.push(event);
122            let mut watermark = next_expected + 1;
123            
124            // Release buffered consecutive events
125            if let Some(buffered) = self.slots.get_mut(&slot) {
126                buffered.sort_unstable_by_key(|(idx, _)| *idx);
127                while let Some(pos) = buffered.iter().position(|(idx, _)| *idx == watermark) {
128                    result.push(buffered.remove(pos).1);
129                    watermark += 1;
130                }
131            }
132            self.streaming_watermarks.insert(slot, watermark);
133        } else if tx_index > next_expected {
134            // Future index: buffer it
135            self.slots.entry(slot).or_default().push((tx_index, event));
136        }
137        // tx_index < next_expected: duplicate event, ignore
138        
139        if !result.is_empty() {
140            self.last_flush_time = Some(Instant::now());
141        }
142        result
143    }
144
145    /// 流式模式超时释放
146    pub fn flush_streaming_timeout(&mut self) -> Vec<DexEvent> {
147        let mut result = Vec::new();
148        for (slot, mut events) in std::mem::take(&mut self.slots) {
149            events.sort_unstable_by_key(|(idx, _)| *idx);
150            result.extend(events.into_iter().map(|(_, e)| e));
151            self.streaming_watermarks.remove(&slot);
152        }
153        if !result.is_empty() {
154            self.last_flush_time = Some(Instant::now());
155        }
156        result
157    }
158}
159
160// ==================== MicroBatchBuffer ====================
161
162/// 微批次缓冲区,用于 MicroBatch 模式
163pub struct MicroBatchBuffer {
164    /// 当前窗口内的事件: (slot, tx_index, event)
165    events: Vec<(u64, u64, DexEvent)>,
166    /// 窗口开始时间(微秒)
167    window_start_us: i64,
168}
169
170impl MicroBatchBuffer {
171    #[inline]
172    pub fn new() -> Self {
173        Self {
174            events: Vec::with_capacity(64),
175            window_start_us: 0,
176        }
177    }
178
179    /// 添加事件到窗口,返回是否需要刷新
180    #[inline]
181    pub fn push(&mut self, slot: u64, tx_index: u64, event: DexEvent, now_us: i64, window_us: u64) -> bool {
182        if self.events.is_empty() {
183            self.window_start_us = now_us;
184        }
185        self.events.push((slot, tx_index, event));
186        (now_us - self.window_start_us) as u64 >= window_us
187    }
188
189    /// 刷新窗口,返回排序后的事件
190    #[inline]
191    pub fn flush(&mut self) -> Vec<DexEvent> {
192        if self.events.is_empty() {
193            return Vec::new();
194        }
195        
196        // 按 (slot, tx_index) 排序
197        self.events.sort_unstable_by_key(|(slot, tx_index, _)| (*slot, *tx_index));
198        
199        let result: Vec<DexEvent> = std::mem::take(&mut self.events)
200            .into_iter()
201            .map(|(_, _, event)| event)
202            .collect();
203        
204        self.window_start_us = 0;
205        result
206    }
207
208    /// 检查是否需要刷新(窗口超时)
209    #[inline]
210    pub fn should_flush(&self, now_us: i64, window_us: u64) -> bool {
211        !self.events.is_empty() && (now_us - self.window_start_us) as u64 >= window_us
212    }
213}
214
215impl Default for MicroBatchBuffer {
216    fn default() -> Self {
217        Self::new()
218    }
219}