inevents_websocket_client/
lib.rs1use futures_util::{SinkExt, StreamExt};
2use json_filter::Operator;
3use serde::de::DeserializeOwned;
4use tokio_tungstenite::tungstenite::Message;
5
6#[derive(Clone)]
7pub struct EventStreamClient(reqwest::Url);
8
9impl Default for EventStreamClient {
10 fn default() -> Self {
11 Self(reqwest::Url::parse("wss://ws-events-v3.intear.tech").unwrap())
12 }
13}
14
15impl EventStreamClient {
16 pub fn new(url: reqwest::Url) -> Self {
17 Self(url)
18 }
19
20 pub async fn stream_events<E, F, Fut>(
21 &self,
22 event_id: &'static str,
23 filter: Option<Operator>,
24 mut callback: F,
25 ) where
26 E: DeserializeOwned + Send + 'static,
27 F: FnMut(E) -> Fut + Send + 'static,
28 Fut: std::future::Future<Output = ()> + Send + 'static,
29 {
30 'outer: loop {
31 let url = self
32 .0
33 .join(&format!("events/{event_id}"))
34 .expect("Failed to join URL paths");
35
36 let (mut stream, _) = tokio_tungstenite::connect_async(url.as_str())
37 .await
38 .expect("Failed to connect to event stream");
39
40 let filter_json = if let Some(filter) = &filter {
41 serde_json::to_string(filter).expect("Failed to serialize filter")
42 } else {
43 r#"{"And":[]}"#.to_string()
44 };
45
46 stream
47 .send(Message::Text(filter_json))
48 .await
49 .expect("Failed to send filter");
50
51 while let Some(message) = stream.next().await {
52 let Ok(msg) = message else {
53 log::warn!("Connection error, reconnecting in 1 second");
54 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
55 continue 'outer;
56 };
57 match msg {
58 Message::Close(_) => {
59 log::warn!(
60 "Event stream events/{event_id} closed, reconnecting in 1 second"
61 );
62 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
63 continue 'outer;
64 }
65 Message::Ping(data) => {
66 stream
67 .send(Message::Pong(data))
68 .await
69 .expect("Failed to pong");
70 }
71 Message::Pong(_) => {}
72 Message::Text(text) => {
73 let events: Vec<E> = serde_json::from_str(&text).unwrap_or_else(|err| {
74 panic!("Failed to parse message: {text}, error: {err}")
75 });
76 for event in events {
77 callback(event).await;
78 }
79 }
80 Message::Binary(_) => {}
81 Message::Frame(_) => unreachable!(),
82 }
83 }
84 log::warn!("Connection lost, reconnecting in 1 second");
85 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
86 }
87 }
88}
89
90#[cfg(test)]
91mod tests {
92 use json_filter::{Filter, Operator};
93
94 use super::*;
95 use std::sync::{Arc, Mutex};
96
97 #[tokio::test]
98 async fn test_log_text_stream() {
99 let client = EventStreamClient::default();
100 let received_events = Arc::new(Mutex::new(Vec::new()));
101 let events_clone = Arc::clone(&received_events);
102
103 let handle = tokio::spawn(async move {
104 client
105 .stream_events::<serde_json::Value, _, _>("log_text", None, move |event| {
106 let events = Arc::clone(&events_clone);
107 async move {
108 events.lock().unwrap().push(event);
109 }
110 })
111 .await;
112 });
113
114 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
116 handle.abort();
117
118 let final_events = received_events.lock().unwrap();
119 assert!(!final_events.is_empty(), "Should have received some events");
120 println!("\nTest Summary:");
121 println!("Total events received: {}", final_events.len());
122 println!(
123 "First event: {}",
124 serde_json::to_string(&final_events[0]).unwrap()
125 );
126 println!(
127 "Last event: {}",
128 serde_json::to_string(&final_events.last().unwrap()).unwrap()
129 );
130 }
131
132 #[tokio::test]
133 async fn test_filter_events() {
134 let client = EventStreamClient::default();
135 let received_events = Arc::new(Mutex::new(Vec::new()));
136 let events_clone = Arc::clone(&received_events);
137 let filter = Operator::And(vec![Filter::new(
138 "event_standard",
139 Operator::Equals("nep141".into()),
140 )]);
141
142 let handle = tokio::spawn(async move {
143 client
144 .stream_events::<serde_json::Value, _, _>(
145 "log_nep297",
146 Some(filter),
147 move |event| {
148 let events = Arc::clone(&events_clone);
149 async move {
150 events.lock().unwrap().push(event);
151 }
152 },
153 )
154 .await;
155 });
156
157 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
159 handle.abort();
160
161 let final_events = received_events.lock().unwrap();
162 assert!(!final_events.is_empty(), "Should have received some events");
163 println!("\nTest Summary:");
164 println!("Total events received: {}", final_events.len());
165 println!(
166 "First event: {}",
167 serde_json::to_string(&final_events[0]).unwrap()
168 );
169 println!(
170 "Last event: {}",
171 serde_json::to_string(&final_events.last().unwrap()).unwrap()
172 );
173 }
174}