viewpoint_core/wait/
waiter.rs

1//! Load state waiter implementation.
2
3use std::collections::HashSet;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7
8use viewpoint_cdp::protocol::network::{LoadingFailedEvent, LoadingFinishedEvent, RequestWillBeSentEvent};
9use viewpoint_cdp::CdpEvent;
10use tokio::sync::{broadcast, Mutex};
11use tokio::time::{sleep, timeout, Instant};
12use tracing::{debug, instrument, trace, warn};
13
14use super::DocumentLoadState;
15use crate::error::WaitError;
16
/// Timeout used by `wait_for_load_state` when the caller does not supply one (30 seconds).
const DEFAULT_TIMEOUT: Duration = Duration::from_millis(30_000);

/// Quiet period required before the page is considered network-idle.
const NETWORK_IDLE_THRESHOLD: Duration = Duration::from_millis(500);
22
23/// Waits for page load states by listening to CDP events.
24#[derive(Debug)]
25pub struct LoadStateWaiter {
26    /// Current load state.
27    current_state: Arc<Mutex<DocumentLoadState>>,
28    /// Event receiver for CDP events.
29    event_rx: broadcast::Receiver<CdpEvent>,
30    /// Session ID to filter events for.
31    session_id: String,
32    /// Frame ID to wait for.
33    frame_id: String,
34    /// Pending network request count.
35    pending_requests: Arc<AtomicUsize>,
36    /// Set of pending request IDs.
37    pending_request_ids: Arc<Mutex<HashSet<String>>>,
38}
39
40impl LoadStateWaiter {
41    /// Create a new load state waiter.
42    pub fn new(
43        event_rx: broadcast::Receiver<CdpEvent>,
44        session_id: String,
45        frame_id: String,
46    ) -> Self {
47        debug!(session_id = %session_id, frame_id = %frame_id, "Created LoadStateWaiter");
48        Self {
49            current_state: Arc::new(Mutex::new(DocumentLoadState::Commit)),
50            event_rx,
51            session_id,
52            frame_id,
53            pending_requests: Arc::new(AtomicUsize::new(0)),
54            pending_request_ids: Arc::new(Mutex::new(HashSet::new())),
55        }
56    }
57
58    /// Wait for the specified load state to be reached.
59    ///
60    /// # Errors
61    ///
62    /// Returns an error if the wait times out or is cancelled.
63    pub async fn wait_for_load_state(
64        &mut self,
65        target_state: DocumentLoadState,
66    ) -> Result<(), WaitError> {
67        self.wait_for_load_state_with_timeout(target_state, DEFAULT_TIMEOUT)
68            .await
69    }
70
71    /// Wait for the specified load state with a custom timeout.
72    ///
73    /// # Errors
74    ///
75    /// Returns an error if the wait times out or is cancelled.
76    #[instrument(level = "debug", skip(self), fields(target_state = ?target_state, timeout_ms = timeout_duration.as_millis()))]
77    pub async fn wait_for_load_state_with_timeout(
78        &mut self,
79        target_state: DocumentLoadState,
80        timeout_duration: Duration,
81    ) -> Result<(), WaitError> {
82        // Check if already reached
83        {
84            let current = *self.current_state.lock().await;
85            if target_state.is_reached(current) {
86                debug!(current = ?current, "Target state already reached");
87                return Ok(());
88            }
89            trace!(current = ?current, "Starting wait for target state");
90        }
91
92        let result = timeout(timeout_duration, self.wait_for_state_impl(target_state)).await;
93
94        match result {
95            Ok(Ok(())) => {
96                debug!("Wait completed successfully");
97                Ok(())
98            }
99            Ok(Err(e)) => {
100                warn!(error = ?e, "Wait failed with error");
101                Err(e)
102            }
103            Err(_) => {
104                warn!(timeout_ms = timeout_duration.as_millis(), "Wait timed out");
105                Err(WaitError::Timeout(timeout_duration))
106            }
107        }
108    }
109
110    /// Internal implementation of waiting for a load state.
111    async fn wait_for_state_impl(&mut self, target_state: DocumentLoadState) -> Result<(), WaitError> {
112        let mut last_network_activity = Instant::now();
113
114        loop {
115            // Check current state
116            {
117                let current = *self.current_state.lock().await;
118                if target_state.is_reached(current) {
119                    // For NetworkIdle, we need additional checking
120                    if target_state == DocumentLoadState::NetworkIdle {
121                        let pending = self.pending_requests.load(Ordering::Relaxed);
122                        if pending == 0 && last_network_activity.elapsed() >= NETWORK_IDLE_THRESHOLD {
123                            return Ok(());
124                        }
125                    } else {
126                        return Ok(());
127                    }
128                }
129            }
130
131            // Wait for the next event
132            let event = match self.event_rx.recv().await {
133                Ok(event) => event,
134                Err(broadcast::error::RecvError::Closed) => {
135                    return Err(WaitError::PageClosed);
136                }
137                Err(broadcast::error::RecvError::Lagged(_)) => {
138                    // Missed some events, continue
139                    continue;
140                }
141            };
142
143            // Filter for our session
144            if event.session_id.as_deref() != Some(&self.session_id) {
145                continue;
146            }
147
148            // Process the event
149            match event.method.as_str() {
150                "Page.domContentEventFired" => {
151                    let mut current = self.current_state.lock().await;
152                    if *current < DocumentLoadState::DomContentLoaded {
153                        debug!(previous = ?*current, "State transition: DomContentLoaded");
154                        *current = DocumentLoadState::DomContentLoaded;
155                    }
156                }
157                "Page.loadEventFired" => {
158                    let mut current = self.current_state.lock().await;
159                    if *current < DocumentLoadState::Load {
160                        debug!(previous = ?*current, "State transition: Load");
161                        *current = DocumentLoadState::Load;
162                    }
163                }
164                "Network.requestWillBeSent" => {
165                    if let Some(params) = event.params {
166                        if let Ok(req) = serde_json::from_value::<RequestWillBeSentEvent>(params) {
167                            // Only track main frame requests
168                            if req.frame_id.as_deref() == Some(&self.frame_id) {
169                                let mut ids = self.pending_request_ids.lock().await;
170                                if ids.insert(req.request_id.clone()) {
171                                    let count = self.pending_requests.fetch_add(1, Ordering::Relaxed) + 1;
172                                    trace!(request_id = %req.request_id, pending_count = count, "Network request started");
173                                    last_network_activity = Instant::now();
174                                }
175                            }
176                        }
177                    }
178                }
179                "Network.loadingFinished" => {
180                    if let Some(params) = event.params {
181                        if let Ok(finished) = serde_json::from_value::<LoadingFinishedEvent>(params) {
182                            let mut ids = self.pending_request_ids.lock().await;
183                            if ids.remove(&finished.request_id) {
184                                let count = self.pending_requests.fetch_sub(1, Ordering::Relaxed) - 1;
185                                trace!(request_id = %finished.request_id, pending_count = count, "Network request finished");
186                                last_network_activity = Instant::now();
187                            }
188                        }
189                    }
190                }
191                "Network.loadingFailed" => {
192                    if let Some(params) = event.params {
193                        if let Ok(failed) = serde_json::from_value::<LoadingFailedEvent>(params) {
194                            let mut ids = self.pending_request_ids.lock().await;
195                            if ids.remove(&failed.request_id) {
196                                let count = self.pending_requests.fetch_sub(1, Ordering::Relaxed) - 1;
197                                trace!(request_id = %failed.request_id, pending_count = count, "Network request failed");
198                                last_network_activity = Instant::now();
199                            }
200                        }
201                    }
202                }
203                _ => {}
204            }
205
206            // For NetworkIdle, check if we've been idle long enough
207            if target_state == DocumentLoadState::NetworkIdle {
208                let pending = self.pending_requests.load(Ordering::Relaxed);
209                let current = *self.current_state.lock().await;
210                if pending == 0 && current >= DocumentLoadState::Load {
211                    // Wait for the idle threshold
212                    sleep(NETWORK_IDLE_THRESHOLD).await;
213                    // Check again after sleeping
214                    let pending_after = self.pending_requests.load(Ordering::Relaxed);
215                    if pending_after == 0 {
216                        return Ok(());
217                    }
218                }
219            }
220        }
221    }
222
223    /// Set the current load state (used when commit is detected).
224    pub async fn set_commit_received(&self) {
225        let mut current = self.current_state.lock().await;
226        if *current < DocumentLoadState::Commit {
227            debug!("State transition: Commit");
228            *current = DocumentLoadState::Commit;
229        }
230    }
231
232    /// Get the current load state.
233    pub async fn current_state(&self) -> DocumentLoadState {
234        *self.current_state.lock().await
235    }
236}