wp_connector_api/runtime/source/
event.rs

1use smol_str::SmolStr;
2use std::net::IpAddr;
3use std::sync::Arc;
4use wp_parse_api::RawData;
5
6use super::types::Tags;
7
/// Parse-side pre-processing hook.
///
/// A reference-counted callable that receives a mutable reference to a
/// [`SourceEvent`]. The `Send + Sync + 'static` bounds allow the same hook to
/// be shared across threads.
pub type EventPreHook = Arc<dyn Fn(&mut SourceEvent) + Send + Sync + 'static>;
10
/// A single source event: an identifier, a source key, the raw payload,
/// shared tags, and two optional extras (an IP address and a pre-processing
/// hook).
///
/// `Clone` is derived; `tags` and `preproc` are held behind `Arc`, so cloning
/// an event shares them rather than duplicating them.
#[derive(Clone)]
pub struct SourceEvent {
    /// Numeric identifier of this event.
    pub event_id: u64,
    /// Key naming the source of this event.
    pub src_key: SmolStr,
    /// Raw payload handed on to parsing.
    pub payload: RawData,
    /// Tags attached to the event, shared via `Arc`.
    pub tags: Arc<Tags>,
    /// Optional IP address.
    // NOTE(review): presumably the upstream peer's address ("ups") — confirm.
    pub ups_ip: Option<IpAddr>,
    /// Optional: invoked by the parse thread before entering WPL.
    pub preproc: Option<EventPreHook>,
}
21
/// A batch of source events, grouped for bulk delivery.
///
/// Returning an empty `Vec` is allowed and means "no data for now".
pub type SourceBatch = Vec<SourceEvent>;
25
26impl SourceEvent {
27    /// 构造一个最小帧
28    pub fn new(
29        event_id: u64,
30        src_key: impl Into<SmolStr>,
31        payload: RawData,
32        tags: Arc<Tags>,
33    ) -> Self {
34        Self {
35            event_id,
36            src_key: src_key.into(),
37            payload,
38            tags,
39            ups_ip: None,
40            preproc: None,
41        }
42    }
43}
44
45impl std::fmt::Debug for SourceEvent {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        f.debug_struct("SourceEvent")
48            .field("id", &self.event_id)
49            .field("src_key", &self.src_key)
50            .field(
51                "payload",
52                &match &self.payload {
53                    RawData::String(s) => format!("String(len={})", s.len()),
54                    RawData::Bytes(b) => format!("Bytes(len={})", b.len()),
55                    RawData::ArcBytes(arc) => format!("ArcBytes(len={}, zcp ={})", arc.len(), true),
56                },
57            )
58            .field("tags", &format!("{} tags", self.tags.len()))
59            .field("ups_ip", &self.ups_ip)
60            .finish()
61    }
62}
63
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use wp_parse_api::RawData;

    /// `SourceEvent::new` stores the given fields as-is and leaves the
    /// optional fields unset.
    #[test]
    fn source_event_new_sets_defaults() {
        let shared_tags = Arc::new(Tags::default());
        let payload = RawData::from_string("payload");
        let evt = SourceEvent::new(7, "main", payload, Arc::clone(&shared_tags));

        // Required fields are carried over unchanged.
        assert_eq!(evt.event_id, 7);
        assert_eq!(evt.src_key.as_str(), "main");
        assert!(matches!(evt.payload, RawData::String(_)));
        // The very same `Arc` allocation is stored, not a copy of the tags.
        assert!(Arc::ptr_eq(&evt.tags, &shared_tags));

        // Optional fields default to `None`.
        assert!(evt.ups_ip.is_none());
        assert!(evt.preproc.is_none());
    }

    /// The `Debug` output names the type and reports the payload length.
    #[test]
    fn debug_impl_reports_summary() {
        let payload = RawData::from_string("hello");
        let evt = SourceEvent::new(1, "key", payload, Arc::new(Tags::default()));

        let rendered = format!("{:?}", evt);
        for needle in ["SourceEvent", "len=5"] {
            assert!(rendered.contains(needle));
        }
    }
}