use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, Mutex};
use tokio::time::{timeout, Instant};
use tracing::{debug, instrument, trace};
use viewpoint_cdp::CdpEvent;
use super::{DocumentLoadState, LoadStateWaiter};
use crate::error::WaitError;
/// How long after a [`NavigationWaiter`] is created a navigation event is
/// still waited for (50 ms).
///
/// The window is measured from `NavigationWaiter::created_at`, not from the
/// moment the wait is started: `detect_navigation` subtracts the time already
/// elapsed since creation from this value.
const NAVIGATION_DETECTION_WINDOW: Duration = Duration::from_millis(50);
/// Default upper bound (30 s) on waiting for the `Load` state once a
/// navigation has been detected.
///
/// Used as the initial `navigation_timeout` in `NavigationWaiter::new`; it can
/// be overridden per waiter with `NavigationWaiter::timeout`.
const DEFAULT_NAVIGATION_TIMEOUT: Duration = Duration::from_secs(30);
/// Progress of the navigation tracked by a [`NavigationWaiter`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NavigationState {
/// Initial state assigned in `NavigationWaiter::new`; no navigation event
/// has been observed yet.
Idle,
/// A matching `Page.frameNavigated` event was observed; the load state has
/// not been awaited yet.
Navigating,
/// Either a matching `Page.navigatedWithinDocument` event was observed, or
/// the load-state wait in `wait_for_load_complete` finished successfully.
Complete,
}
/// Detects whether a navigation starts shortly after the waiter is created
/// and, if so, waits for the resulting page load to finish.
///
/// The waiter is consumed by `wait_for_navigation_if_triggered`.
#[derive(Debug)]
pub struct NavigationWaiter {
/// Broadcast receiver of CDP events that is scanned for navigation events.
/// It is also resubscribed to create the receiver handed to the
/// `LoadStateWaiter`.
event_rx: broadcast::Receiver<CdpEvent>,
/// Only events whose `session_id` equals this value are considered.
session_id: String,
/// Identifier of the frame whose navigation is tracked. Note that a
/// `Page.frameNavigated` event for any frame without a string `parentId`
/// is also treated as a match.
frame_id: String,
/// Current [`NavigationState`]; starts as `Idle`.
state: Arc<Mutex<NavigationState>>,
/// Instant captured in `new`; the detection window is measured from here.
created_at: Instant,
/// Timeout passed to the load-state wait. Defaults to
/// `DEFAULT_NAVIGATION_TIMEOUT` and can be changed with `timeout`.
navigation_timeout: Duration,
}
impl NavigationWaiter {
/// Creates a waiter that watches `event_rx` for navigations of `frame_id`
/// within the CDP session `session_id`.
///
/// The detection window starts at the moment this constructor runs, and the
/// load timeout is initialised to `DEFAULT_NAVIGATION_TIMEOUT`.
pub fn new(
    event_rx: broadcast::Receiver<CdpEvent>,
    session_id: String,
    frame_id: String,
) -> Self {
    debug!(
        session_id = %session_id,
        frame_id = %frame_id,
        "Created NavigationWaiter"
    );

    // Every waiter starts out idle; the timestamp is taken after logging so
    // that it marks the start of the detection window.
    let state = Arc::new(Mutex::new(NavigationState::Idle));
    let created_at = Instant::now();

    Self {
        event_rx,
        session_id,
        frame_id,
        state,
        created_at,
        navigation_timeout: DEFAULT_NAVIGATION_TIMEOUT,
    }
}
#[must_use]
pub fn timeout(mut self, timeout: Duration) -> Self {
self.navigation_timeout = timeout;
self
}
/// Waits for a navigation to finish, but only if one is detected.
///
/// Returns `Ok(false)` when `detect_navigation` reports no navigation, and
/// `Ok(true)` once a detected navigation has completed loading.
///
/// # Errors
///
/// Propagates the [`WaitError`] returned by `wait_for_load_complete`.
#[instrument(level = "debug", skip(self))]
pub async fn wait_for_navigation_if_triggered(mut self) -> Result<bool, WaitError> {
    if self.detect_navigation().await {
        debug!("Navigation detected, waiting for load state");
        self.wait_for_load_complete().await?;
        debug!("Navigation completed successfully");
        Ok(true)
    } else {
        debug!("No navigation detected within detection window");
        Ok(false)
    }
}
/// Reports whether a navigation event shows up before the detection window
/// (measured from `created_at`) runs out.
///
/// Returns `true` only when `wait_for_navigation_event` yields `true` within
/// the remaining window; a timeout or a closed channel both yield `false`.
async fn detect_navigation(&mut self) -> bool {
    // Whatever part of the window has not been used up since construction;
    // clamps to zero once the window has fully elapsed.
    let window_left = NAVIGATION_DETECTION_WINDOW.saturating_sub(self.created_at.elapsed());

    if window_left == Duration::ZERO {
        // NOTE(review): in this file `state` is only written by
        // `wait_for_navigation_event` and `wait_for_load_complete`, both of
        // which run after this check, so this appears to always yield `false`.
        // Events already queued on `event_rx` are not inspected on this path —
        // confirm that is intended.
        let current = *self.state.lock().await;
        return current != NavigationState::Idle;
    }

    match timeout(window_left, self.wait_for_navigation_event()).await {
        Ok(true) => {
            trace!("Navigation event received within detection window");
            true
        }
        // Either the channel closed (`Ok(false)`) or the window elapsed.
        Ok(false) | Err(_) => {
            trace!("No navigation event within detection window");
            false
        }
    }
}
/// Consumes events from `event_rx` until one signals a navigation for this
/// waiter, recording the resulting state.
///
/// Returns `true` after a matching `Page.frameNavigated` (state becomes
/// `Navigating`) or `Page.navigatedWithinDocument` (state becomes `Complete`)
/// event, and `false` if the broadcast channel is closed. Lagged receives and
/// events for other sessions are skipped.
async fn wait_for_navigation_event(&mut self) -> bool {
    loop {
        let event = match self.event_rx.recv().await {
            Ok(received) => received,
            // Some events were dropped; keep reading whatever is still there.
            Err(broadcast::error::RecvError::Lagged(_)) => continue,
            Err(broadcast::error::RecvError::Closed) => return false,
        };

        let same_session = event.session_id.as_deref() == Some(self.session_id.as_str());
        if !same_session {
            continue;
        }

        match event.method.as_str() {
            "Page.frameNavigated" => {
                let frame = event.params.as_ref().and_then(|params| params.get("frame"));
                let navigated_id = frame
                    .and_then(|f| f.get("id"))
                    .and_then(|id| id.as_str());

                if let (Some(frame), Some(frame_id)) = (frame, navigated_id) {
                    let parent_id = frame.get("parentId").and_then(|v| v.as_str());
                    // A frame without a string `parentId` is accepted even
                    // when its id differs from the tracked frame.
                    if frame_id == self.frame_id || parent_id.is_none() {
                        debug!(frame_id = %frame_id, "Frame navigation detected");
                        *self.state.lock().await = NavigationState::Navigating;
                        return true;
                    }
                }
            }
            "Page.navigatedWithinDocument" => {
                let navigated_id = event
                    .params
                    .as_ref()
                    .and_then(|params| params.get("frameId"))
                    .and_then(|id| id.as_str());

                match navigated_id {
                    Some(frame_id) if frame_id == self.frame_id => {
                        debug!(
                            frame_id = %frame_id,
                            "Within-document navigation detected"
                        );
                        // The state goes straight to `Complete`, which makes
                        // `wait_for_load_complete` return without waiting.
                        *self.state.lock().await = NavigationState::Complete;
                        return true;
                    }
                    _ => {}
                }
            }
            _ => {}
        }
    }
}
/// Waits until the detected navigation reaches the `Load` state.
///
/// Returns immediately when the state is already `Complete`. Otherwise a
/// `LoadStateWaiter` is created on a resubscribed receiver, marked as having
/// received the commit, and awaited with `navigation_timeout`; on success the
/// state is set to `Complete`.
///
/// # Errors
///
/// Propagates the error returned by
/// `LoadStateWaiter::wait_for_load_state_with_timeout`.
async fn wait_for_load_complete(&mut self) -> Result<(), WaitError> {
    let already_complete = matches!(*self.state.lock().await, NavigationState::Complete);
    if already_complete {
        return Ok(());
    }

    let load_events = self.event_rx.resubscribe();
    let session_id = self.session_id.clone();
    let frame_id = self.frame_id.clone();
    let mut load_waiter = LoadStateWaiter::new(load_events, session_id, frame_id);

    // Called before waiting because the navigation event was already consumed
    // by this waiter.
    load_waiter.set_commit_received().await;

    let deadline = self.navigation_timeout;
    load_waiter
        .wait_for_load_state_with_timeout(DocumentLoadState::Load, deadline)
        .await?;

    *self.state.lock().await = NavigationState::Complete;
    Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the detection window to exactly 50 ms.
    #[test]
    fn test_navigation_detection_window_is_reasonable() {
        let expected = Duration::from_millis(50);
        assert_eq!(expected, NAVIGATION_DETECTION_WINDOW);
    }

    /// Pins the default load timeout to exactly 30 s.
    #[test]
    fn test_default_timeout() {
        let expected = Duration::from_secs(30);
        assert_eq!(expected, DEFAULT_NAVIGATION_TIMEOUT);
    }
}