viewpoint_core/network/events/
mod.rs

1//! Network event handling.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6
7use tokio::sync::broadcast;
8use viewpoint_cdp::protocol::network::{
9    LoadingFailedEvent, LoadingFinishedEvent, RequestWillBeSentEvent, ResponseReceivedEvent,
10};
11use viewpoint_cdp::CdpConnection;
12
13use super::request::Request;
14use super::response::Response;
15use super::types::{ResourceType, UrlMatcher};
16use crate::error::NetworkError;
17
18/// Event emitted when a request is made.
19#[derive(Debug, Clone)]
20pub struct RequestEvent {
21    /// The request.
22    pub request: Request,
23}
24
25/// Event emitted when a response is received.
26#[derive(Debug, Clone)]
27pub struct ResponseEvent {
28    /// The response.
29    pub response: Response,
30}
31
32/// Event emitted when a request finishes.
33#[derive(Debug, Clone)]
34pub struct RequestFinishedEvent {
35    /// The request that finished.
36    pub request: Request,
37}
38
39/// Event emitted when a request fails.
40#[derive(Debug, Clone)]
41pub struct RequestFailedEvent {
42    /// The failed request.
43    pub request: Request,
44    /// The error message.
45    pub error: String,
46}
47
48/// Network event types.
49#[derive(Debug, Clone)]
50pub enum NetworkEvent {
51    /// Request made.
52    Request(RequestEvent),
53    /// Response received.
54    Response(ResponseEvent),
55    /// Request finished.
56    RequestFinished(RequestFinishedEvent),
57    /// Request failed.
58    RequestFailed(RequestFailedEvent),
59}
60
61/// Network event listener for a page.
62#[derive(Debug)]
63pub struct NetworkEventListener {
64    /// CDP connection.
65    connection: Arc<CdpConnection>,
66    /// Session ID.
67    session_id: String,
68    /// Event sender.
69    event_tx: broadcast::Sender<NetworkEvent>,
70}
71
72impl NetworkEventListener {
73    /// Create a new network event listener.
74    pub fn new(connection: Arc<CdpConnection>, session_id: String) -> Self {
75        let (event_tx, _) = broadcast::channel(256);
76        Self {
77            connection,
78            session_id,
79            event_tx,
80        }
81    }
82
83    /// Subscribe to network events.
84    pub fn subscribe(&self) -> broadcast::Receiver<NetworkEvent> {
85        self.event_tx.subscribe()
86    }
87
88    /// Start listening for network events.
89    ///
90    /// This spawns a background task that processes CDP events.
91    pub fn start(&self) {
92        let mut cdp_events = self.connection.subscribe_events();
93        let session_id = self.session_id.clone();
94        let event_tx = self.event_tx.clone();
95        let connection = self.connection.clone();
96
97        tokio::spawn(async move {
98            // Track pending requests for building responses
99            let mut pending_requests: HashMap<String, Request> = HashMap::new();
100
101            while let Ok(event) = cdp_events.recv().await {
102                // Filter events for this session
103                if event.session_id.as_deref() != Some(&session_id) {
104                    continue;
105                }
106
107                // Process network events
108                match event.method.as_str() {
109                    "Network.requestWillBeSent" => {
110                        if let Some(params) = &event.params {
111                            if let Ok(req_event) =
112                                serde_json::from_value::<RequestWillBeSentEvent>(params.clone())
113                            {
114                                // Check if this is a redirect (redirect_response is present)
115                                let previous_request = if req_event.redirect_response.is_some() {
116                                    // This is a redirect - get the previous request with the same ID
117                                    pending_requests.remove(&req_event.request_id)
118                                } else {
119                                    None
120                                };
121
122                                let request =
123                                    parse_request_will_be_sent(&req_event, previous_request);
124                                pending_requests
125                                    .insert(req_event.request_id.clone(), request.clone());
126                                let _ = event_tx.send(NetworkEvent::Request(RequestEvent {
127                                    request,
128                                }));
129                            }
130                        }
131                    }
132                    "Network.responseReceived" => {
133                        if let Some(params) = &event.params {
134                            if let Ok(resp_event) =
135                                serde_json::from_value::<ResponseReceivedEvent>(params.clone())
136                            {
137                                // Get the associated request
138                                if let Some(request) =
139                                    pending_requests.get(&resp_event.request_id).cloned()
140                                {
141                                    let response = Response::new(
142                                        resp_event.response,
143                                        request,
144                                        connection.clone(),
145                                        session_id.clone(),
146                                        resp_event.request_id.clone(),
147                                    );
148                                    let _ = event_tx.send(NetworkEvent::Response(ResponseEvent {
149                                        response,
150                                    }));
151                                }
152                            }
153                        }
154                    }
155                    "Network.loadingFinished" => {
156                        if let Some(params) = &event.params {
157                            if let Ok(finished_event) =
158                                serde_json::from_value::<LoadingFinishedEvent>(params.clone())
159                            {
160                                if let Some(request) =
161                                    pending_requests.remove(&finished_event.request_id)
162                                {
163                                    let _ = event_tx.send(NetworkEvent::RequestFinished(
164                                        RequestFinishedEvent { request },
165                                    ));
166                                }
167                            }
168                        }
169                    }
170                    "Network.loadingFailed" => {
171                        if let Some(params) = &event.params {
172                            if let Ok(failed_event) =
173                                serde_json::from_value::<LoadingFailedEvent>(params.clone())
174                            {
175                                if let Some(request) =
176                                    pending_requests.remove(&failed_event.request_id)
177                                {
178                                    let _ = event_tx.send(NetworkEvent::RequestFailed(
179                                        RequestFailedEvent {
180                                            request,
181                                            error: failed_event.error_text,
182                                        },
183                                    ));
184                                }
185                            }
186                        }
187                    }
188                    _ => {}
189                }
190            }
191        });
192    }
193}
194
195/// Parse a `RequestWillBeSentEvent` into a Request.
196/// Parse a `RequestWillBeSentEvent` into a Request.
197///
198/// If `previous_request` is provided, it will be set as the `redirected_from` source.
199fn parse_request_will_be_sent(
200    event: &RequestWillBeSentEvent,
201    previous_request: Option<Request>,
202) -> Request {
203    let resource_type = event
204        .resource_type
205        .as_ref()
206        .map_or(ResourceType::Other, |t| parse_resource_type(t));
207
208    Request {
209        url: event.request.url.clone(),
210        method: event.request.method.clone(),
211        headers: event.request.headers.clone(),
212        post_data: event.request.post_data.clone(),
213        resource_type,
214        frame_id: event.frame_id.clone().unwrap_or_default(),
215        is_navigation: event.initiator.initiator_type == "navigation",
216        connection: None,
217        session_id: None,
218        request_id: Some(event.request_id.clone()),
219        redirected_from: previous_request.map(Box::new),
220        redirected_to: None,
221        timing: None,
222        failure_text: None,
223    }
224}
225
226/// Parse a resource type string into `ResourceType` enum.
227fn parse_resource_type(s: &str) -> ResourceType {
228    match s.to_lowercase().as_str() {
229        "document" => ResourceType::Document,
230        "stylesheet" => ResourceType::Stylesheet,
231        "image" => ResourceType::Image,
232        "media" => ResourceType::Media,
233        "font" => ResourceType::Font,
234        "script" => ResourceType::Script,
235        "texttrack" => ResourceType::TextTrack,
236        "xhr" => ResourceType::Xhr,
237        "fetch" => ResourceType::Fetch,
238        "eventsource" => ResourceType::EventSource,
239        "websocket" => ResourceType::WebSocket,
240        "manifest" => ResourceType::Manifest,
241        "ping" => ResourceType::Ping,
242        "other" | _ => ResourceType::Other,
243    }
244}
245
246/// Builder for waiting for a request.
247#[derive(Debug)]
248pub struct WaitForRequestBuilder<'a, M> {
249    /// Connection.
250    connection: &'a Arc<CdpConnection>,
251    /// Session ID.
252    session_id: &'a str,
253    /// Pattern to match.
254    pattern: M,
255    /// Timeout duration.
256    timeout: Duration,
257}
258
259impl<'a, M: UrlMatcher + Clone + 'static> WaitForRequestBuilder<'a, M> {
260    /// Create a new wait for request builder.
261    pub fn new(connection: &'a Arc<CdpConnection>, session_id: &'a str, pattern: M) -> Self {
262        Self {
263            connection,
264            session_id,
265            pattern,
266            timeout: Duration::from_secs(30),
267        }
268    }
269
270    /// Set the timeout duration.
271    #[must_use]
272    pub fn timeout(mut self, timeout: Duration) -> Self {
273        self.timeout = timeout;
274        self
275    }
276
277    /// Wait for a matching request.
278    ///
279    /// # Errors
280    ///
281    /// Returns an error if the wait times out before a matching request is received,
282    /// or if the event stream is aborted.
283    pub async fn wait(self) -> Result<Request, NetworkError> {
284        let mut events = self.connection.subscribe_events();
285        let session_id = self.session_id.to_string();
286        let pattern = self.pattern;
287        let timeout = self.timeout;
288
289        tokio::time::timeout(timeout, async move {
290            while let Ok(event) = events.recv().await {
291                // Filter for this session
292                if event.session_id.as_deref() != Some(&session_id) {
293                    continue;
294                }
295
296                if event.method == "Network.requestWillBeSent" {
297                    if let Some(params) = &event.params {
298                        if let Ok(req_event) =
299                            serde_json::from_value::<RequestWillBeSentEvent>(params.clone())
300                        {
301                            if pattern.matches(&req_event.request.url) {
302                                return Ok(parse_request_will_be_sent(&req_event, None));
303                            }
304                        }
305                    }
306                }
307            }
308            Err(NetworkError::Aborted)
309        })
310        .await
311        .map_err(|_| NetworkError::Timeout(timeout))?
312    }
313}
314
315/// Builder for waiting for a response.
316#[derive(Debug)]
317pub struct WaitForResponseBuilder<'a, M> {
318    /// Connection.
319    connection: &'a Arc<CdpConnection>,
320    /// Session ID.
321    session_id: &'a str,
322    /// Pattern to match.
323    pattern: M,
324    /// Timeout duration.
325    timeout: Duration,
326}
327
328impl<'a, M: UrlMatcher + Clone + 'static> WaitForResponseBuilder<'a, M> {
329    /// Create a new wait for response builder.
330    pub fn new(connection: &'a Arc<CdpConnection>, session_id: &'a str, pattern: M) -> Self {
331        Self {
332            connection,
333            session_id,
334            pattern,
335            timeout: Duration::from_secs(30),
336        }
337    }
338
339    /// Set the timeout duration.
340    #[must_use]
341    pub fn timeout(mut self, timeout: Duration) -> Self {
342        self.timeout = timeout;
343        self
344    }
345
346    /// Wait for a matching response.
347    ///
348    /// # Errors
349    ///
350    /// Returns an error if the wait times out before a matching response is received,
351    /// or if the event stream is aborted.
352    pub async fn wait(self) -> Result<Response, NetworkError> {
353        let mut events = self.connection.subscribe_events();
354        let session_id = self.session_id.to_string();
355        let pattern = self.pattern;
356        let timeout = self.timeout;
357        let connection = self.connection.clone();
358
359        tokio::time::timeout(timeout, async move {
360            let mut pending_requests: HashMap<String, Request> = HashMap::new();
361
362            while let Ok(event) = events.recv().await {
363                // Filter for this session
364                if event.session_id.as_deref() != Some(&session_id) {
365                    continue;
366                }
367
368                match event.method.as_str() {
369                    "Network.requestWillBeSent" => {
370                        // Track requests so we can associate them with responses
371                        if let Some(params) = &event.params {
372                            if let Ok(req_event) =
373                                serde_json::from_value::<RequestWillBeSentEvent>(params.clone())
374                            {
375                                let request = parse_request_will_be_sent(&req_event, None);
376                                pending_requests.insert(req_event.request_id.clone(), request);
377                            }
378                        }
379                    }
380                    "Network.responseReceived" => {
381                        if let Some(params) = &event.params {
382                            if let Ok(resp_event) =
383                                serde_json::from_value::<ResponseReceivedEvent>(params.clone())
384                            {
385                                if pattern.matches(&resp_event.response.url) {
386                                    // Get the associated request or create a minimal one
387                                    let request = pending_requests
388                                        .get(&resp_event.request_id)
389                                        .cloned()
390                                        .unwrap_or_else(|| Request {
391                                            url: resp_event.response.url.clone(),
392                                            method: "GET".to_string(),
393                                            headers: HashMap::new(),
394                                            post_data: None,
395                                            resource_type: ResourceType::Other,
396                                            frame_id: resp_event.frame_id.clone().unwrap_or_default(),
397                                            is_navigation: false,
398                                            connection: None,
399                                            session_id: None,
400                                            request_id: Some(resp_event.request_id.clone()),
401                                            redirected_from: None,
402                                            redirected_to: None,
403                                            timing: None,
404                                            failure_text: None,
405                                        });
406
407                                    return Ok(Response::new(
408                                        resp_event.response,
409                                        request,
410                                        connection.clone(),
411                                        session_id.clone(),
412                                        resp_event.request_id.clone(),
413                                    ));
414                                }
415                            }
416                        }
417                    }
418                    _ => {}
419                }
420            }
421            Err(NetworkError::Aborted)
422        })
423        .await
424        .map_err(|_| NetworkError::Timeout(timeout))?
425    }
426}
427
428#[cfg(test)]
429mod tests;