viewpoint_core/network/events/
mod.rs1use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6
7use tokio::sync::broadcast;
8use viewpoint_cdp::protocol::network::{
9 LoadingFailedEvent, LoadingFinishedEvent, RequestWillBeSentEvent, ResponseReceivedEvent,
10};
11use viewpoint_cdp::CdpConnection;
12
13use super::request::Request;
14use super::response::Response;
15use super::types::{ResourceType, UrlMatcher};
16use crate::error::NetworkError;
17
18#[derive(Debug, Clone)]
20pub struct RequestEvent {
21 pub request: Request,
23}
24
25#[derive(Debug, Clone)]
27pub struct ResponseEvent {
28 pub response: Response,
30}
31
32#[derive(Debug, Clone)]
34pub struct RequestFinishedEvent {
35 pub request: Request,
37}
38
39#[derive(Debug, Clone)]
41pub struct RequestFailedEvent {
42 pub request: Request,
44 pub error: String,
46}
47
48#[derive(Debug, Clone)]
50pub enum NetworkEvent {
51 Request(RequestEvent),
53 Response(ResponseEvent),
55 RequestFinished(RequestFinishedEvent),
57 RequestFailed(RequestFailedEvent),
59}
60
61#[derive(Debug)]
63pub struct NetworkEventListener {
64 connection: Arc<CdpConnection>,
66 session_id: String,
68 event_tx: broadcast::Sender<NetworkEvent>,
70}
71
72impl NetworkEventListener {
73 pub fn new(connection: Arc<CdpConnection>, session_id: String) -> Self {
75 let (event_tx, _) = broadcast::channel(256);
76 Self {
77 connection,
78 session_id,
79 event_tx,
80 }
81 }
82
83 pub fn subscribe(&self) -> broadcast::Receiver<NetworkEvent> {
85 self.event_tx.subscribe()
86 }
87
88 pub fn start(&self) {
92 let mut cdp_events = self.connection.subscribe_events();
93 let session_id = self.session_id.clone();
94 let event_tx = self.event_tx.clone();
95 let connection = self.connection.clone();
96
97 tokio::spawn(async move {
98 let mut pending_requests: HashMap<String, Request> = HashMap::new();
100
101 while let Ok(event) = cdp_events.recv().await {
102 if event.session_id.as_deref() != Some(&session_id) {
104 continue;
105 }
106
107 match event.method.as_str() {
109 "Network.requestWillBeSent" => {
110 if let Some(params) = &event.params {
111 if let Ok(req_event) =
112 serde_json::from_value::<RequestWillBeSentEvent>(params.clone())
113 {
114 let previous_request = if req_event.redirect_response.is_some() {
116 pending_requests.remove(&req_event.request_id)
118 } else {
119 None
120 };
121
122 let request =
123 parse_request_will_be_sent(&req_event, previous_request);
124 pending_requests
125 .insert(req_event.request_id.clone(), request.clone());
126 let _ = event_tx.send(NetworkEvent::Request(RequestEvent {
127 request,
128 }));
129 }
130 }
131 }
132 "Network.responseReceived" => {
133 if let Some(params) = &event.params {
134 if let Ok(resp_event) =
135 serde_json::from_value::<ResponseReceivedEvent>(params.clone())
136 {
137 if let Some(request) =
139 pending_requests.get(&resp_event.request_id).cloned()
140 {
141 let response = Response::new(
142 resp_event.response,
143 request,
144 connection.clone(),
145 session_id.clone(),
146 resp_event.request_id.clone(),
147 );
148 let _ = event_tx.send(NetworkEvent::Response(ResponseEvent {
149 response,
150 }));
151 }
152 }
153 }
154 }
155 "Network.loadingFinished" => {
156 if let Some(params) = &event.params {
157 if let Ok(finished_event) =
158 serde_json::from_value::<LoadingFinishedEvent>(params.clone())
159 {
160 if let Some(request) =
161 pending_requests.remove(&finished_event.request_id)
162 {
163 let _ = event_tx.send(NetworkEvent::RequestFinished(
164 RequestFinishedEvent { request },
165 ));
166 }
167 }
168 }
169 }
170 "Network.loadingFailed" => {
171 if let Some(params) = &event.params {
172 if let Ok(failed_event) =
173 serde_json::from_value::<LoadingFailedEvent>(params.clone())
174 {
175 if let Some(request) =
176 pending_requests.remove(&failed_event.request_id)
177 {
178 let _ = event_tx.send(NetworkEvent::RequestFailed(
179 RequestFailedEvent {
180 request,
181 error: failed_event.error_text,
182 },
183 ));
184 }
185 }
186 }
187 }
188 _ => {}
189 }
190 }
191 });
192 }
193}
194
195fn parse_request_will_be_sent(
200 event: &RequestWillBeSentEvent,
201 previous_request: Option<Request>,
202) -> Request {
203 let resource_type = event
204 .resource_type
205 .as_ref()
206 .map_or(ResourceType::Other, |t| parse_resource_type(t));
207
208 Request {
209 url: event.request.url.clone(),
210 method: event.request.method.clone(),
211 headers: event.request.headers.clone(),
212 post_data: event.request.post_data.clone(),
213 resource_type,
214 frame_id: event.frame_id.clone().unwrap_or_default(),
215 is_navigation: event.initiator.initiator_type == "navigation",
216 connection: None,
217 session_id: None,
218 request_id: Some(event.request_id.clone()),
219 redirected_from: previous_request.map(Box::new),
220 redirected_to: None,
221 timing: None,
222 failure_text: None,
223 }
224}
225
226fn parse_resource_type(s: &str) -> ResourceType {
228 match s.to_lowercase().as_str() {
229 "document" => ResourceType::Document,
230 "stylesheet" => ResourceType::Stylesheet,
231 "image" => ResourceType::Image,
232 "media" => ResourceType::Media,
233 "font" => ResourceType::Font,
234 "script" => ResourceType::Script,
235 "texttrack" => ResourceType::TextTrack,
236 "xhr" => ResourceType::Xhr,
237 "fetch" => ResourceType::Fetch,
238 "eventsource" => ResourceType::EventSource,
239 "websocket" => ResourceType::WebSocket,
240 "manifest" => ResourceType::Manifest,
241 "ping" => ResourceType::Ping,
242 "other" | _ => ResourceType::Other,
243 }
244}
245
246#[derive(Debug)]
248pub struct WaitForRequestBuilder<'a, M> {
249 connection: &'a Arc<CdpConnection>,
251 session_id: &'a str,
253 pattern: M,
255 timeout: Duration,
257}
258
259impl<'a, M: UrlMatcher + Clone + 'static> WaitForRequestBuilder<'a, M> {
260 pub fn new(connection: &'a Arc<CdpConnection>, session_id: &'a str, pattern: M) -> Self {
262 Self {
263 connection,
264 session_id,
265 pattern,
266 timeout: Duration::from_secs(30),
267 }
268 }
269
270 #[must_use]
272 pub fn timeout(mut self, timeout: Duration) -> Self {
273 self.timeout = timeout;
274 self
275 }
276
277 pub async fn wait(self) -> Result<Request, NetworkError> {
284 let mut events = self.connection.subscribe_events();
285 let session_id = self.session_id.to_string();
286 let pattern = self.pattern;
287 let timeout = self.timeout;
288
289 tokio::time::timeout(timeout, async move {
290 while let Ok(event) = events.recv().await {
291 if event.session_id.as_deref() != Some(&session_id) {
293 continue;
294 }
295
296 if event.method == "Network.requestWillBeSent" {
297 if let Some(params) = &event.params {
298 if let Ok(req_event) =
299 serde_json::from_value::<RequestWillBeSentEvent>(params.clone())
300 {
301 if pattern.matches(&req_event.request.url) {
302 return Ok(parse_request_will_be_sent(&req_event, None));
303 }
304 }
305 }
306 }
307 }
308 Err(NetworkError::Aborted)
309 })
310 .await
311 .map_err(|_| NetworkError::Timeout(timeout))?
312 }
313}
314
315#[derive(Debug)]
317pub struct WaitForResponseBuilder<'a, M> {
318 connection: &'a Arc<CdpConnection>,
320 session_id: &'a str,
322 pattern: M,
324 timeout: Duration,
326}
327
328impl<'a, M: UrlMatcher + Clone + 'static> WaitForResponseBuilder<'a, M> {
329 pub fn new(connection: &'a Arc<CdpConnection>, session_id: &'a str, pattern: M) -> Self {
331 Self {
332 connection,
333 session_id,
334 pattern,
335 timeout: Duration::from_secs(30),
336 }
337 }
338
339 #[must_use]
341 pub fn timeout(mut self, timeout: Duration) -> Self {
342 self.timeout = timeout;
343 self
344 }
345
346 pub async fn wait(self) -> Result<Response, NetworkError> {
353 let mut events = self.connection.subscribe_events();
354 let session_id = self.session_id.to_string();
355 let pattern = self.pattern;
356 let timeout = self.timeout;
357 let connection = self.connection.clone();
358
359 tokio::time::timeout(timeout, async move {
360 let mut pending_requests: HashMap<String, Request> = HashMap::new();
361
362 while let Ok(event) = events.recv().await {
363 if event.session_id.as_deref() != Some(&session_id) {
365 continue;
366 }
367
368 match event.method.as_str() {
369 "Network.requestWillBeSent" => {
370 if let Some(params) = &event.params {
372 if let Ok(req_event) =
373 serde_json::from_value::<RequestWillBeSentEvent>(params.clone())
374 {
375 let request = parse_request_will_be_sent(&req_event, None);
376 pending_requests.insert(req_event.request_id.clone(), request);
377 }
378 }
379 }
380 "Network.responseReceived" => {
381 if let Some(params) = &event.params {
382 if let Ok(resp_event) =
383 serde_json::from_value::<ResponseReceivedEvent>(params.clone())
384 {
385 if pattern.matches(&resp_event.response.url) {
386 let request = pending_requests
388 .get(&resp_event.request_id)
389 .cloned()
390 .unwrap_or_else(|| Request {
391 url: resp_event.response.url.clone(),
392 method: "GET".to_string(),
393 headers: HashMap::new(),
394 post_data: None,
395 resource_type: ResourceType::Other,
396 frame_id: resp_event.frame_id.clone().unwrap_or_default(),
397 is_navigation: false,
398 connection: None,
399 session_id: None,
400 request_id: Some(resp_event.request_id.clone()),
401 redirected_from: None,
402 redirected_to: None,
403 timing: None,
404 failure_text: None,
405 });
406
407 return Ok(Response::new(
408 resp_event.response,
409 request,
410 connection.clone(),
411 session_id.clone(),
412 resp_event.request_id.clone(),
413 ));
414 }
415 }
416 }
417 }
418 _ => {}
419 }
420 }
421 Err(NetworkError::Aborted)
422 })
423 .await
424 .map_err(|_| NetworkError::Timeout(timeout))?
425 }
426}
427
428#[cfg(test)]
429mod tests;