use super::config::BrowseConfig;
/// Returns the id of the tab a browser event belongs to.
///
/// Only navigation-start, selector-wait, document-ready and screenshot events
/// carry a tab id here; every other event kind maps to the nil UUID.
fn extract_event_tab_id(event: &oxibrowser_core::BrowserEvent) -> uuid::Uuid {
    // One arm per tab-scoped variant; `None` marks events without a tab id.
    let tagged = match event {
        oxibrowser_core::BrowserEvent::NavigationStarted { tab_id, .. } => Some(tab_id),
        oxibrowser_core::BrowserEvent::WaitingForSelector { tab_id, .. } => Some(tab_id),
        oxibrowser_core::BrowserEvent::DocumentReady { tab_id, .. } => Some(tab_id),
        oxibrowser_core::BrowserEvent::ScreenshotCaptured { tab_id, .. } => Some(tab_id),
        _ => None,
    };
    tagged.copied().unwrap_or_else(uuid::Uuid::nil)
}
/// Converts a raw browser event into the structured [`BrowseProgress`] update
/// that is delivered to browse-progress callbacks.
///
/// Returns `None` for every event kind other than navigation start, selector
/// wait, document ready and screenshot capture.
///
/// Durations are reported in whole milliseconds. The conversion to `u64` is
/// checked and saturates at `u64::MAX` instead of silently truncating.
fn browse_progress_from_event(event: &oxibrowser_core::BrowserEvent) -> Option<BrowseProgress> {
    use oxibrowser_core::BrowserEvent::*;
    match event {
        NavigationStarted { url, .. } => {
            Some(BrowseProgress::NavigationStarted { url: url.clone() })
        }
        WaitingForSelector {
            selector,
            timeout_ms,
            ..
        } => Some(BrowseProgress::WaitingForSelector {
            selector: selector.clone(),
            timeout_ms: *timeout_ms,
        }),
        DocumentReady {
            final_url,
            title,
            status,
            total_bytes,
            total_duration,
            ..
        } => Some(BrowseProgress::DocumentReady {
            url: final_url.clone(),
            title: title.clone(),
            status: *status,
            bytes: *total_bytes,
            // Checked narrowing: saturate rather than wrap on overflow.
            duration_ms: u64::try_from(total_duration.as_millis()).unwrap_or(u64::MAX),
        }),
        ScreenshotCaptured {
            bytes,
            viewport_width,
            duration,
            ..
        } => Some(BrowseProgress::ScreenshotCaptured {
            bytes: *bytes,
            width: *viewport_width,
            // Checked narrowing: saturate rather than wrap on overflow.
            duration_ms: u64::try_from(duration.as_millis()).unwrap_or(u64::MAX),
        }),
        _ => None,
    }
}
use super::engine::{
BrowseProgress, BrowserError, BrowserTab as BrowserTabTrait, PageContent, TabCallbackRegistry,
};
use async_trait::async_trait;
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
/// Browser engine backed by an `oxibrowser_core::Browser`.
///
/// Construction (see `with_config`) spawns a background task that forwards
/// browser events to the per-tab callbacks held in `progress`.
pub struct OxiBrowserEngine {
/// Underlying oxibrowser handle; used to create tabs, subscribe to events and close.
browser: oxibrowser_core::Browser,
/// Configuration the engine was built from; a clone is handed to every tab made by `new_tab`.
config: BrowseConfig,
/// Callback registry shared with the tabs and with the event-forwarding task.
progress: Arc<TabCallbackRegistry>,
/// Handle of the event-forwarding task; `close` takes it (leaving `None`) and aborts it.
event_task: Mutex<Option<JoinHandle<()>>>,
}
impl OxiBrowserEngine {
    /// Builds an engine from `BrowseConfig::default()`.
    ///
    /// # Errors
    /// Propagates any error from [`OxiBrowserEngine::with_config`].
    pub async fn new() -> Result<Self, BrowserError> {
        let defaults = BrowseConfig::default();
        Self::with_config(defaults).await
    }

    /// Builds an engine from an explicit configuration.
    ///
    /// Starts from the headless oxibrowser configuration, copies over
    /// `obey_robots` and `js_timeout_ms`, and overrides the user agent only
    /// when `config.user_agent` is set. After the browser is created, a
    /// background task is spawned that forwards every browser event to the
    /// callback registry until the event channel closes.
    ///
    /// # Errors
    /// Returns `BrowserError::Backend` when the underlying browser cannot be
    /// created.
    pub async fn with_config(config: BrowseConfig) -> Result<Self, BrowserError> {
        let mut core_settings = oxibrowser_core::BrowserConfig::headless();
        core_settings.obey_robots = config.obey_robots;
        core_settings.js_timeout_ms = config.js_timeout_ms;
        if let Some(agent) = &config.user_agent {
            core_settings.user_agent = agent.clone();
        }

        let browser = oxibrowser_core::Browser::new(core_settings)
            .await
            .map_err(|err| BrowserError::Backend(format!("Failed to create browser: {}", err)))?;

        let progress = Arc::new(TabCallbackRegistry::new());
        let mut feed = browser.subscribe_events();
        let sink = Arc::clone(&progress);

        let forwarder = tokio::spawn(async move {
            loop {
                let event = match feed.recv().await {
                    Ok(event) => event,
                    // The sender side is gone: nothing more will ever arrive.
                    Err(RecvError::Closed) => break,
                    // We fell behind the broadcast buffer; note it and keep going.
                    Err(RecvError::Lagged(skipped)) => {
                        tracing::debug!(
                            skipped = skipped,
                            "oxibrowser event subscriber lagged; some events were dropped"
                        );
                        continue;
                    }
                };

                let tab_id = extract_event_tab_id(&event);
                // Structured update first (when the event has one), then the text label.
                if let Some(update) = browse_progress_from_event(&event) {
                    sink.invoke_browse(&tab_id, update);
                }
                sink.invoke(&tab_id, event.short_label());
            }
        });

        Ok(Self {
            browser,
            config,
            progress,
            event_task: Mutex::new(Some(forwarder)),
        })
    }
}
impl Default for OxiBrowserEngine {
    /// Synchronously builds an engine with the default configuration.
    ///
    /// The engine's event-forwarding task is spawned on whichever Tokio
    /// runtime drives construction, so that runtime must outlive the engine:
    ///
    /// * When called from inside a Tokio runtime, construction runs on that
    ///   runtime via `block_in_place`, instead of trying to start a nested
    ///   runtime (which panics).
    /// * Otherwise a dedicated runtime is created and, on success,
    ///   intentionally leaked. Dropping it here would cancel the forwarding
    ///   task and leave the returned engine without progress events.
    ///
    /// Prefer [`OxiBrowserEngine::new`] in async code.
    ///
    /// # Panics
    /// Panics if a runtime cannot be created, if engine construction fails,
    /// or if called from within a current-thread Tokio runtime (where
    /// `block_in_place` is not supported).
    fn default() -> Self {
        let built = match tokio::runtime::Handle::try_current() {
            // Already inside a runtime: reuse it so spawned tasks stay alive.
            Ok(handle) => tokio::task::block_in_place(|| handle.block_on(Self::new())),
            Err(_) => {
                let rt =
                    tokio::runtime::Runtime::new().expect("failed to create tokio runtime");
                let built = rt.block_on(Self::new());
                if built.is_ok() {
                    // Keep the runtime (and the task spawned on it) alive for
                    // the rest of the process.
                    std::mem::forget(rt);
                }
                built
            }
        };
        built.expect("Failed to create default OxiBrowserEngine")
    }
}
#[async_trait]
impl super::engine::BrowserEngine for OxiBrowserEngine {
    /// Opens a new tab and wraps it in an [`OxiTab`] that shares this engine's
    /// configuration and callback registry.
    ///
    /// # Errors
    /// Returns `BrowserError::Backend` when the browser cannot create the tab.
    async fn new_tab(&self) -> Result<Box<dyn BrowserTabTrait>, BrowserError> {
        let inner = match self.browser.new_tab().await {
            Ok(tab) => tab,
            Err(err) => {
                return Err(BrowserError::Backend(format!(
                    "Failed to create tab: {}",
                    err
                )))
            }
        };
        let wrapped = OxiTab {
            // Read the id before `inner` is moved into the struct.
            tab_id: inner.tab_id(),
            inner,
            config: self.config.clone(),
            registry: Arc::clone(&self.progress),
        };
        Ok(Box::new(wrapped))
    }

    /// Closes the browser, then stops the event-forwarding task.
    ///
    /// The task is only aborted after the browser closed successfully; a
    /// failed close returns early and leaves the task running.
    ///
    /// # Errors
    /// Returns `BrowserError::Backend` when the browser reports a close failure.
    async fn close(&self) -> Result<(), BrowserError> {
        if let Err(err) = self.browser.close().await {
            return Err(BrowserError::Backend(format!(
                "Browser close failed: {}",
                err
            )));
        }

        let mut slot = self.event_task.lock().await;
        if let Some(task) = slot.take() {
            task.abort();
            // Wait for the task to finish; its (cancellation) result is irrelevant.
            let _ = task.await;
        }
        Ok(())
    }

    /// Reports whether the underlying browser is still open.
    async fn is_alive(&self) -> bool {
        let browser = &self.browser;
        browser.is_open()
    }

    /// Returns a new handle to the callback registry shared with all tabs.
    fn callback_registry(&self) -> Arc<TabCallbackRegistry> {
        let shared = &self.progress;
        Arc::clone(shared)
    }
}
/// Tab handle that adapts an `oxibrowser_core::Tab` to the project's browser
/// tab trait (imported in this file as `BrowserTabTrait`).
// NOTE(review): `dead_code` is presumably allowed because `config` is stored
// but never read in this file — confirm before removing the attribute.
#[allow(dead_code)] pub struct OxiTab {
/// Underlying oxibrowser tab that the trait methods delegate to.
inner: oxibrowser_core::Tab,
/// Clone of the engine's configuration, taken when the tab was created.
config: BrowseConfig,
/// Id read from `inner.tab_id()` at creation; used as the key into `registry`.
tab_id: uuid::Uuid,
/// Callback registry shared with the engine that created this tab.
registry: Arc<TabCallbackRegistry>,
}
impl OxiTab {
    /// Registers `cb` in the shared registry as the text progress callback
    /// for this tab's id.
    pub fn set_progress_callback(&self, cb: crate::tools::ProgressCallback) {
        let Self {
            registry, tab_id, ..
        } = self;
        registry.set(*tab_id, cb);
    }

    /// Clears the registry entry stored under this tab's id.
    pub fn clear_progress_callback(&self) {
        let Self {
            registry, tab_id, ..
        } = self;
        registry.clear(tab_id);
    }

    /// Registers `cb` in the shared registry as the structured browse-progress
    /// callback for this tab's id.
    pub fn set_browse_progress_callback_impl(&self, cb: super::engine::BrowseProgressCallback) {
        let Self {
            registry, tab_id, ..
        } = self;
        registry.set_browse(*tab_id, cb);
    }

    /// Returns the id this tab is registered under.
    pub fn tab_id(&self) -> uuid::Uuid {
        let Self { tab_id, .. } = self;
        *tab_id
    }
}
#[async_trait]
impl BrowserTabTrait for OxiTab {
async fn goto(&self, url: &str) -> Result<PageContent, BrowserError> {
let page = self
.inner
.goto(url)
.await
.map_err(|e| BrowserError::Navigation(e.to_string()))?;
Ok(browse_result_to_page_content(page))
}
async fn click(&self, selector: &str) -> Result<(), BrowserError> {
self.inner
.click(selector)
.await
.map_err(|e| BrowserError::ElementNotFound(e.to_string()))
}
async fn type_(&self, selector: &str, text: &str) -> Result<(), BrowserError> {
self.inner
.r#type(selector, text)
.await
.map_err(|e| BrowserError::ElementNotFound(e.to_string()))
}
async fn fill(&self, selector: &str, value: &str) -> Result<(), BrowserError> {
self.inner
.fill(selector, value)
.await
.map_err(|e| BrowserError::ElementNotFound(e.to_string()))
}
async fn press(&self, combo: &str) -> Result<(), BrowserError> {
self.inner
.press(combo)
.await
.map_err(|e| BrowserError::Evaluation(e.to_string()))
}
async fn wait_for(&self, selector: &str, timeout_ms: u64) -> Result<(), BrowserError> {
self.inner
.wait_for(selector, timeout_ms)
.await
.map_err(|e| BrowserError::Timeout(e.to_string()))
}
async fn content(&self) -> Result<PageContent, BrowserError> {
let page = self
.inner
.content()
.await
.map_err(|e| BrowserError::Backend(e.to_string()))?;
Ok(browse_result_to_page_content(page))
}
async fn query_all(&self, selector: &str) -> Result<Vec<String>, BrowserError> {
self.inner
.query_all(selector)
.await
.map_err(|e| BrowserError::ElementNotFound(e.to_string()))
}
async fn evaluate(&self, js: &str) -> Result<Value, BrowserError> {
self.inner
.evaluate(js)
.await
.map_err(|e| BrowserError::Evaluation(e.to_string()))
}
async fn screenshot(&self, width: u32) -> Result<Vec<u8>, BrowserError> {
self.inner
.screenshot(width)
.await
.map_err(|e| BrowserError::Screenshot(e.to_string()))
}
async fn close(&self) -> Result<(), BrowserError> {
self.inner
.close()
.await
.map_err(|e| BrowserError::TabClosed(e.to_string()))
}
async fn back(&self) -> Result<PageContent, BrowserError> {
let page = self
.inner
.back()
.await
.map_err(|e| BrowserError::Navigation(e.to_string()))?;
Ok(browse_result_to_page_content(page))
}
async fn forward(&self) -> Result<PageContent, BrowserError> {
let page = self
.inner
.forward()
.await
.map_err(|e| BrowserError::Navigation(e.to_string()))?;
Ok(browse_result_to_page_content(page))
}
async fn reload(&self) -> Result<PageContent, BrowserError> {
let page = self
.inner
.reload()
.await
.map_err(|e| BrowserError::Navigation(e.to_string()))?;
Ok(browse_result_to_page_content(page))
}
async fn select_option(&self, selector: &str, value: &str) -> Result<(), BrowserError> {
self.inner
.select_option(selector, value)
.await
.map_err(|e| BrowserError::ElementNotFound(e.to_string()))
}
async fn check(&self, selector: &str) -> Result<(), BrowserError> {
self.inner
.check(selector)
.await
.map_err(|e| BrowserError::ElementNotFound(e.to_string()))
}
async fn uncheck(&self, selector: &str) -> Result<(), BrowserError> {
self.inner
.uncheck(selector)
.await
.map_err(|e| BrowserError::ElementNotFound(e.to_string()))
}
async fn clear(&self, selector: &str) -> Result<(), BrowserError> {
self.inner
.clear_input(selector)
.await
.map_err(|e| BrowserError::ElementNotFound(e.to_string()))
}
async fn hover(&self, selector: &str) -> Result<(), BrowserError> {
self.inner
.hover(selector)
.await
.map_err(|e| BrowserError::ElementNotFound(e.to_string()))
}
async fn double_click(&self, selector: &str) -> Result<(), BrowserError> {
self.inner
.double_click(selector)
.await
.map_err(|e| BrowserError::ElementNotFound(e.to_string()))
}
async fn right_click(&self, selector: &str) -> Result<(), BrowserError> {
self.inner
.right_click(selector)
.await
.map_err(|e| BrowserError::ElementNotFound(e.to_string()))
}
async fn scroll(&self, delta_x: f64, delta_y: f64) -> Result<(), BrowserError> {
self.inner
.scroll(delta_x, delta_y)
.await
.map_err(|e| BrowserError::Evaluation(e.to_string()))
}
async fn scroll_into_view(&self, selector: &str) -> Result<(), BrowserError> {
self.inner
.scroll_into_view(selector, true)
.await
.map_err(|e| BrowserError::ElementNotFound(e.to_string()))
}
async fn drag(&self, from_selector: &str, to_selector: &str) -> Result<(), BrowserError> {
self.inner
.drag(from_selector, to_selector)
.await
.map_err(|e| BrowserError::ElementNotFound(e.to_string()))
}
async fn upload_file(&self, selector: &str, path: &str) -> Result<(), BrowserError> {
self.inner
.upload_file(selector, path)
.await
.map_err(|e| BrowserError::ElementNotFound(e.to_string()))
}
async fn get_value(&self, selector: &str) -> Result<String, BrowserError> {
self.inner
.get_value(selector)
.await
.map_err(|e| BrowserError::ElementNotFound(e.to_string()))
}
async fn evaluate_await(&self, js: &str) -> Result<Value, BrowserError> {
self.inner
.evaluate_await(js)
.await
.map_err(|e| BrowserError::Evaluation(e.to_string()))
}
fn is_closed(&self) -> bool {
self.inner.is_closed()
}
fn tab_id(&self) -> uuid::Uuid {
self.tab_id
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn clear_progress_callback(&self) {
self.registry.clear(&self.tab_id);
}
fn set_browse_progress_callback(&self, cb: super::engine::BrowseProgressCallback) {
self.set_browse_progress_callback_impl(cb);
}
}
/// Converts an oxibrowser browse result into the engine-agnostic [`PageContent`].
///
/// `page` is taken by value, so its fields are moved into the result rather
/// than cloned; this avoids copying the potentially large `markdown` and
/// `html` bodies on every navigation.
fn browse_result_to_page_content(page: oxibrowser_core::BrowseResult) -> PageContent {
    PageContent {
        url: page.url,
        title: page.title,
        status: page.status,
        markdown: page.markdown,
        html: page.html,
    }
}
/// Integration-style tests that drive a real engine against `data:` URLs and
/// observe what reaches the callback registry.
///
/// Events are forwarded by a background task, so each test sleeps briefly
/// after navigating to give that task time to deliver them.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tools::browse::engine::BrowserEngine;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Mutex as StdMutex;
    use std::time::Duration;

    /// Text progress messages for a navigation reach the tab's callback.
    #[tokio::test]
    async fn engine_forwards_browser_events_to_progress_callback() {
        let engine = OxiBrowserEngine::new().await.unwrap();
        let registry = engine.callback_registry();
        let received: Arc<StdMutex<Vec<String>>> = Arc::new(StdMutex::new(Vec::new()));
        let received_clone = Arc::clone(&received);
        let tab = engine.new_tab().await.unwrap();
        // Fail loudly on a bad downcast: falling back to the nil UUID would
        // register the callback under the id used for untagged events.
        let tab_id = tab
            .as_any()
            .downcast_ref::<OxiTab>()
            .map(|t| t.tab_id())
            .expect("tab created by OxiBrowserEngine should downcast to OxiTab");
        registry.set(
            tab_id,
            oxi_ai::progress_callback(move |msg: String| {
                received_clone.lock().unwrap().push(msg);
            }),
        );
        let _ = tab
            .goto("data:text/html,<title>Hi</title><p>Hello</p>")
            .await
            .unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        let got = received.lock().unwrap().clone();
        assert!(
            got.iter().any(|s| s.starts_with("Opening")),
            "expected 'Opening …' event, got {got:?}"
        );
        assert!(
            got.iter().any(|s| s.contains("Loaded")),
            "expected 'Loaded …' event, got {got:?}"
        );
        let _ = tab.close().await;
        let _ = engine.close().await;
    }

    /// Registering a second callback for the same tab stops the first one
    /// from receiving further events.
    #[tokio::test]
    async fn engine_replaces_progress_callback_cleanly() {
        let engine = OxiBrowserEngine::new().await.unwrap();
        let registry = engine.callback_registry();
        let count_a = Arc::new(AtomicUsize::new(0));
        let count_b = Arc::new(AtomicUsize::new(0));
        let tab = engine.new_tab().await.unwrap();
        // Fail loudly on a bad downcast instead of silently using the nil UUID.
        let tab_id = tab
            .as_any()
            .downcast_ref::<OxiTab>()
            .map(|t| t.tab_id())
            .expect("tab created by OxiBrowserEngine should downcast to OxiTab");
        let ca = Arc::clone(&count_a);
        registry.set(
            tab_id,
            oxi_ai::progress_callback(move |_| {
                ca.fetch_add(1, Ordering::SeqCst);
            }),
        );
        let _ = tab.goto("data:text/html,<title>A</title>").await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        let a_after_first = count_a.load(Ordering::SeqCst);
        assert!(a_after_first > 0, "callback A should have fired");
        let cb_clone = Arc::clone(&count_b);
        registry.set(
            tab_id,
            oxi_ai::progress_callback(move |_| {
                cb_clone.fetch_add(1, Ordering::SeqCst);
            }),
        );
        let _ = tab.goto("data:text/html,<title>B</title>").await.unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        let a_final = count_a.load(Ordering::SeqCst);
        let b_final = count_b.load(Ordering::SeqCst);
        assert_eq!(
            a_final, a_after_first,
            "callback A should not fire after being replaced"
        );
        assert!(b_final > 0, "callback B should have fired");
        let _ = tab.close().await;
        let _ = engine.close().await;
    }

    /// Structured `BrowseProgress` updates reach the browse callback with the
    /// document's title, a non-zero byte count and a plausible duration.
    #[tokio::test]
    async fn engine_forwards_browse_progress_to_callback() {
        use crate::tools::browse::BrowseProgress;
        let engine = OxiBrowserEngine::new().await.unwrap();
        let registry = engine.callback_registry();
        let received: Arc<StdMutex<Vec<BrowseProgress>>> = Arc::new(StdMutex::new(Vec::new()));
        let received_clone = Arc::clone(&received);
        let tab = engine.new_tab().await.unwrap();
        let tab_id = tab.tab_id();
        registry.set_browse(
            tab_id,
            Arc::new(move |bp: BrowseProgress| {
                received_clone.lock().unwrap().push(bp);
            }),
        );
        let _ = tab
            .goto("data:text/html,<title>Hi</title><p>Hello</p>")
            .await
            .unwrap();
        tokio::time::sleep(Duration::from_millis(100)).await;
        let events = received.lock().unwrap().clone();
        assert!(
            events
                .iter()
                .any(|bp| matches!(bp, BrowseProgress::DocumentReady { status: 200, .. })),
            "expected DocumentReady with status 200, got {events:?}"
        );
        let doc_ready = events.iter().find_map(|bp| match bp {
            BrowseProgress::DocumentReady {
                title,
                bytes,
                duration_ms,
                ..
            } => Some((title.clone(), *bytes, *duration_ms)),
            _ => None,
        });
        let (title, bytes, duration_ms) = doc_ready.expect("DocumentReady present");
        assert_eq!(title, "Hi");
        assert!(
            bytes > 0,
            "bytes should be > 0 for non-empty page, got {bytes}"
        );
        assert!(
            duration_ms < 30_000,
            "duration_ms should be reasonable, got {duration_ms}"
        );
        let _ = tab.close().await;
        let _ = engine.close().await;
    }

    /// Structured updates are delivered only to the callback registered for
    /// the tab that produced them.
    #[tokio::test]
    async fn engine_routes_browse_progress_by_tab_id() {
        use crate::tools::browse::BrowseProgress;
        let engine = OxiBrowserEngine::new().await.unwrap();
        let registry = engine.callback_registry();
        let received_a: Arc<StdMutex<Vec<BrowseProgress>>> = Arc::new(StdMutex::new(Vec::new()));
        let received_b: Arc<StdMutex<Vec<BrowseProgress>>> = Arc::new(StdMutex::new(Vec::new()));
        let ra = Arc::clone(&received_a);
        let rb = Arc::clone(&received_b);
        let tab_a = engine.new_tab().await.unwrap();
        let tab_b = engine.new_tab().await.unwrap();
        let tid_a = tab_a.tab_id();
        let tid_b = tab_b.tab_id();
        registry.set_browse(
            tid_a,
            Arc::new(move |bp: BrowseProgress| {
                ra.lock().unwrap().push(bp);
            }),
        );
        registry.set_browse(
            tid_b,
            Arc::new(move |bp: BrowseProgress| {
                rb.lock().unwrap().push(bp);
            }),
        );
        let _ = tab_a
            .goto("data:text/html,<title>OnlyA</title>")
            .await
            .unwrap();
        let _ = tab_b
            .goto("data:text/html,<title>OnlyB</title>")
            .await
            .unwrap();
        tokio::time::sleep(Duration::from_millis(100)).await;
        let got_a = received_a.lock().unwrap().clone();
        let got_b = received_b.lock().unwrap().clone();
        let a_titles: Vec<&str> = got_a
            .iter()
            .filter_map(|bp| match bp {
                BrowseProgress::DocumentReady { title, .. } => Some(title.as_str()),
                _ => None,
            })
            .collect();
        let b_titles: Vec<&str> = got_b
            .iter()
            .filter_map(|bp| match bp {
                BrowseProgress::DocumentReady { title, .. } => Some(title.as_str()),
                _ => None,
            })
            .collect();
        assert!(
            a_titles.contains(&"OnlyA"),
            "A should have OnlyA, got {a_titles:?}"
        );
        assert!(!a_titles.contains(&"OnlyB"), "A should NOT have OnlyB");
        assert!(
            b_titles.contains(&"OnlyB"),
            "B should have OnlyB, got {b_titles:?}"
        );
        assert!(!b_titles.contains(&"OnlyA"), "B should NOT have OnlyA");
        let _ = tab_a.close().await;
        let _ = tab_b.close().await;
        let _ = engine.close().await;
    }

    /// Text progress messages from two tabs of the same engine are delivered
    /// only to the callback registered for the originating tab.
    #[tokio::test]
    async fn engine_routes_events_by_tab_id_concurrent() {
        let engine = OxiBrowserEngine::new().await.unwrap();
        let registry = engine.callback_registry();
        let received_a: Arc<StdMutex<Vec<String>>> = Arc::new(StdMutex::new(Vec::new()));
        let received_b: Arc<StdMutex<Vec<String>>> = Arc::new(StdMutex::new(Vec::new()));
        let received_a_clone = Arc::clone(&received_a);
        let received_b_clone = Arc::clone(&received_b);
        let tab_a = engine.new_tab().await.unwrap();
        let tab_b = engine.new_tab().await.unwrap();
        let tab_id_a = tab_a.tab_id();
        let tab_id_b = tab_b.tab_id();
        assert_ne!(tab_id_a, tab_id_b, "two tabs must have distinct IDs");
        registry.set(
            tab_id_a,
            oxi_ai::progress_callback(move |msg: String| {
                received_a_clone.lock().unwrap().push(msg);
            }),
        );
        registry.set(
            tab_id_b,
            oxi_ai::progress_callback(move |msg: String| {
                received_b_clone.lock().unwrap().push(msg);
            }),
        );
        let _ = tab_a
            .goto("data:text/html,<title>TabA</title>")
            .await
            .unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        let _ = tab_b
            .goto("data:text/html,<title>TabB</title>")
            .await
            .unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        let got_a = received_a.lock().unwrap().clone();
        let got_b = received_b.lock().unwrap().clone();
        assert!(
            got_a.iter().any(|s| s.contains("TabA")),
            "tab A callback should have received TabA events, got {got_a:?}"
        );
        assert!(
            got_b.iter().any(|s| s.contains("TabB")),
            "tab B callback should have received TabB events, got {got_b:?}"
        );
        assert!(
            !got_a.iter().any(|s| s.contains("TabB")),
            "tab A callback should NOT have received TabB events, got {got_a:?}"
        );
        assert!(
            !got_b.iter().any(|s| s.contains("TabA")),
            "tab B callback should NOT have received TabA events, got {got_b:?}"
        );
        let _ = tab_a.close().await;
        let _ = tab_b.close().await;
        let _ = engine.close().await;
    }
}