viewpoint_core/wait/
waiter.rs1use std::collections::HashSet;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7
8use viewpoint_cdp::protocol::network::{LoadingFailedEvent, LoadingFinishedEvent, RequestWillBeSentEvent};
9use viewpoint_cdp::CdpEvent;
10use tokio::sync::{broadcast, Mutex};
11use tokio::time::{sleep, timeout, Instant};
12use tracing::{debug, instrument, trace, warn};
13
14use super::DocumentLoadState;
15use crate::error::WaitError;
16
17const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
19
20const NETWORK_IDLE_THRESHOLD: Duration = Duration::from_millis(500);
22
23#[derive(Debug)]
25pub struct LoadStateWaiter {
26 current_state: Arc<Mutex<DocumentLoadState>>,
28 event_rx: broadcast::Receiver<CdpEvent>,
30 session_id: String,
32 frame_id: String,
34 pending_requests: Arc<AtomicUsize>,
36 pending_request_ids: Arc<Mutex<HashSet<String>>>,
38}
39
40impl LoadStateWaiter {
41 pub fn new(
43 event_rx: broadcast::Receiver<CdpEvent>,
44 session_id: String,
45 frame_id: String,
46 ) -> Self {
47 debug!(session_id = %session_id, frame_id = %frame_id, "Created LoadStateWaiter");
48 Self {
49 current_state: Arc::new(Mutex::new(DocumentLoadState::Commit)),
50 event_rx,
51 session_id,
52 frame_id,
53 pending_requests: Arc::new(AtomicUsize::new(0)),
54 pending_request_ids: Arc::new(Mutex::new(HashSet::new())),
55 }
56 }
57
58 pub async fn wait_for_load_state(
64 &mut self,
65 target_state: DocumentLoadState,
66 ) -> Result<(), WaitError> {
67 self.wait_for_load_state_with_timeout(target_state, DEFAULT_TIMEOUT)
68 .await
69 }
70
71 #[instrument(level = "debug", skip(self), fields(target_state = ?target_state, timeout_ms = timeout_duration.as_millis()))]
77 pub async fn wait_for_load_state_with_timeout(
78 &mut self,
79 target_state: DocumentLoadState,
80 timeout_duration: Duration,
81 ) -> Result<(), WaitError> {
82 {
84 let current = *self.current_state.lock().await;
85 if target_state.is_reached(current) {
86 debug!(current = ?current, "Target state already reached");
87 return Ok(());
88 }
89 trace!(current = ?current, "Starting wait for target state");
90 }
91
92 let result = timeout(timeout_duration, self.wait_for_state_impl(target_state)).await;
93
94 match result {
95 Ok(Ok(())) => {
96 debug!("Wait completed successfully");
97 Ok(())
98 }
99 Ok(Err(e)) => {
100 warn!(error = ?e, "Wait failed with error");
101 Err(e)
102 }
103 Err(_) => {
104 warn!(timeout_ms = timeout_duration.as_millis(), "Wait timed out");
105 Err(WaitError::Timeout(timeout_duration))
106 }
107 }
108 }
109
110 async fn wait_for_state_impl(&mut self, target_state: DocumentLoadState) -> Result<(), WaitError> {
112 let mut last_network_activity = Instant::now();
113
114 loop {
115 {
117 let current = *self.current_state.lock().await;
118 if target_state.is_reached(current) {
119 if target_state == DocumentLoadState::NetworkIdle {
121 let pending = self.pending_requests.load(Ordering::Relaxed);
122 if pending == 0 && last_network_activity.elapsed() >= NETWORK_IDLE_THRESHOLD {
123 return Ok(());
124 }
125 } else {
126 return Ok(());
127 }
128 }
129 }
130
131 let event = match self.event_rx.recv().await {
133 Ok(event) => event,
134 Err(broadcast::error::RecvError::Closed) => {
135 return Err(WaitError::PageClosed);
136 }
137 Err(broadcast::error::RecvError::Lagged(_)) => {
138 continue;
140 }
141 };
142
143 if event.session_id.as_deref() != Some(&self.session_id) {
145 continue;
146 }
147
148 match event.method.as_str() {
150 "Page.domContentEventFired" => {
151 let mut current = self.current_state.lock().await;
152 if *current < DocumentLoadState::DomContentLoaded {
153 debug!(previous = ?*current, "State transition: DomContentLoaded");
154 *current = DocumentLoadState::DomContentLoaded;
155 }
156 }
157 "Page.loadEventFired" => {
158 let mut current = self.current_state.lock().await;
159 if *current < DocumentLoadState::Load {
160 debug!(previous = ?*current, "State transition: Load");
161 *current = DocumentLoadState::Load;
162 }
163 }
164 "Network.requestWillBeSent" => {
165 if let Some(params) = event.params {
166 if let Ok(req) = serde_json::from_value::<RequestWillBeSentEvent>(params) {
167 if req.frame_id.as_deref() == Some(&self.frame_id) {
169 let mut ids = self.pending_request_ids.lock().await;
170 if ids.insert(req.request_id.clone()) {
171 let count = self.pending_requests.fetch_add(1, Ordering::Relaxed) + 1;
172 trace!(request_id = %req.request_id, pending_count = count, "Network request started");
173 last_network_activity = Instant::now();
174 }
175 }
176 }
177 }
178 }
179 "Network.loadingFinished" => {
180 if let Some(params) = event.params {
181 if let Ok(finished) = serde_json::from_value::<LoadingFinishedEvent>(params) {
182 let mut ids = self.pending_request_ids.lock().await;
183 if ids.remove(&finished.request_id) {
184 let count = self.pending_requests.fetch_sub(1, Ordering::Relaxed) - 1;
185 trace!(request_id = %finished.request_id, pending_count = count, "Network request finished");
186 last_network_activity = Instant::now();
187 }
188 }
189 }
190 }
191 "Network.loadingFailed" => {
192 if let Some(params) = event.params {
193 if let Ok(failed) = serde_json::from_value::<LoadingFailedEvent>(params) {
194 let mut ids = self.pending_request_ids.lock().await;
195 if ids.remove(&failed.request_id) {
196 let count = self.pending_requests.fetch_sub(1, Ordering::Relaxed) - 1;
197 trace!(request_id = %failed.request_id, pending_count = count, "Network request failed");
198 last_network_activity = Instant::now();
199 }
200 }
201 }
202 }
203 _ => {}
204 }
205
206 if target_state == DocumentLoadState::NetworkIdle {
208 let pending = self.pending_requests.load(Ordering::Relaxed);
209 let current = *self.current_state.lock().await;
210 if pending == 0 && current >= DocumentLoadState::Load {
211 sleep(NETWORK_IDLE_THRESHOLD).await;
213 let pending_after = self.pending_requests.load(Ordering::Relaxed);
215 if pending_after == 0 {
216 return Ok(());
217 }
218 }
219 }
220 }
221 }
222
223 pub async fn set_commit_received(&self) {
225 let mut current = self.current_state.lock().await;
226 if *current < DocumentLoadState::Commit {
227 debug!("State transition: Commit");
228 *current = DocumentLoadState::Commit;
229 }
230 }
231
232 pub async fn current_state(&self) -> DocumentLoadState {
234 *self.current_state.lock().await
235 }
236}