inevents_websocket_client/
lib.rs

1use futures_util::{SinkExt, StreamExt};
2use json_filter::Operator;
3use serde::de::DeserializeOwned;
4use tokio_tungstenite::tungstenite::Message;
5
6#[derive(Clone)]
7pub struct EventStreamClient(reqwest::Url);
8
9impl Default for EventStreamClient {
10    fn default() -> Self {
11        Self(reqwest::Url::parse("wss://ws-events-v3.intear.tech").unwrap())
12    }
13}
14
15impl EventStreamClient {
16    pub fn new(url: reqwest::Url) -> Self {
17        Self(url)
18    }
19
20    pub async fn stream_events<E, F, Fut>(
21        &self,
22        event_id: &'static str,
23        filter: Option<Operator>,
24        mut callback: F,
25    ) where
26        E: DeserializeOwned + Send + 'static,
27        F: FnMut(E) -> Fut + Send + 'static,
28        Fut: std::future::Future<Output = ()> + Send + 'static,
29    {
30        'outer: loop {
31            let url = self
32                .0
33                .join(&format!("events/{event_id}"))
34                .expect("Failed to join URL paths");
35
36            let (mut stream, _) = tokio_tungstenite::connect_async(url.as_str())
37                .await
38                .expect("Failed to connect to event stream");
39
40            let filter_json = if let Some(filter) = &filter {
41                serde_json::to_string(filter).expect("Failed to serialize filter")
42            } else {
43                r#"{"And":[]}"#.to_string()
44            };
45
46            stream
47                .send(Message::Text(filter_json))
48                .await
49                .expect("Failed to send filter");
50
51            while let Some(message) = stream.next().await {
52                let Ok(msg) = message else {
53                    log::warn!("Connection error, reconnecting in 1 second");
54                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
55                    continue 'outer;
56                };
57                match msg {
58                    Message::Close(_) => {
59                        log::warn!(
60                            "Event stream events/{event_id} closed, reconnecting in 1 second"
61                        );
62                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
63                        continue 'outer;
64                    }
65                    Message::Ping(data) => {
66                        stream
67                            .send(Message::Pong(data))
68                            .await
69                            .expect("Failed to pong");
70                    }
71                    Message::Pong(_) => {}
72                    Message::Text(text) => {
73                        let events: Vec<E> = serde_json::from_str(&text).unwrap_or_else(|err| {
74                            panic!("Failed to parse message: {text}, error: {err}")
75                        });
76                        for event in events {
77                            callback(event).await;
78                        }
79                    }
80                    Message::Binary(_) => {}
81                    Message::Frame(_) => unreachable!(),
82                }
83            }
84            log::warn!("Connection lost, reconnecting in 1 second");
85            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
86        }
87    }
88}
89
#[cfg(test)]
mod tests {
    use json_filter::{Filter, Operator};

    use super::*;
    use std::sync::{Arc, Mutex};

    /// Shared buffer that the streaming callback appends events to.
    type EventSink = Arc<Mutex<Vec<serde_json::Value>>>;

    /// Streams `event_id` from the default endpoint for ten seconds on a
    /// spawned task, then aborts the task and hands back whatever was
    /// collected.
    async fn collect_for_ten_seconds(
        event_id: &'static str,
        filter: Option<Operator>,
    ) -> EventSink {
        let client = EventStreamClient::default();
        let sink: EventSink = Arc::new(Mutex::new(Vec::new()));
        let writer = Arc::clone(&sink);

        let task = tokio::spawn(async move {
            client
                .stream_events::<serde_json::Value, _, _>(event_id, filter, move |event| {
                    let writer = Arc::clone(&writer);
                    async move {
                        writer.lock().unwrap().push(event);
                    }
                })
                .await;
        });

        // Wait for some events
        tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
        task.abort();

        sink
    }

    /// Asserts that at least one event arrived and prints a short summary.
    fn assert_and_summarize(sink: &EventSink) {
        let collected = sink.lock().unwrap();
        assert!(!collected.is_empty(), "Should have received some events");

        println!("\nTest Summary:");
        println!("Total events received: {}", collected.len());

        let first = serde_json::to_string(&collected[0]).unwrap();
        println!("First event: {}", first);

        let last = serde_json::to_string(&collected.last().unwrap()).unwrap();
        println!("Last event: {}", last);
    }

    #[tokio::test]
    async fn test_log_text_stream() {
        let sink = collect_for_ten_seconds("log_text", None).await;
        assert_and_summarize(&sink);
    }

    #[tokio::test]
    async fn test_filter_events() {
        let filter = Operator::And(vec![Filter::new(
            "event_standard",
            Operator::Equals("nep141".into()),
        )]);

        let sink = collect_for_ten_seconds("log_nep297", Some(filter)).await;
        assert_and_summarize(&sink);
    }
}