//! elizaos-plugin-browser 2.0.0
//!
//! Browser automation plugin for elizaOS: enables AI agents to browse websites,
//! interact with page elements, and extract data.
use crate::types::{NavigationResult, WebSocketMessage, WebSocketResponse};
use futures_util::{SinkExt, StreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tokio_tungstenite::{connect_async, tungstenite::Message};
use tracing::{debug, error, info};
use uuid::Uuid;

/// Channel on which the reader task delivers the response for one request.
type ResponseSender = mpsc::Sender<WebSocketResponse>;
/// Shared table of in-flight requests, keyed by their request id.
type ResponseMap = Arc<RwLock<HashMap<String, ResponseSender>>>;

/// Client for the browser automation server, exchanging JSON request/response
/// messages over a single WebSocket connection.
pub struct BrowserWebSocketClient {
    /// WebSocket URL passed to the constructor and dialed by `connect`.
    server_url: String,
    /// Queue of outgoing frames; `None` before `connect` and after `disconnect`.
    sender: Arc<RwLock<Option<mpsc::Sender<Message>>>>,
    /// Whether the client currently considers the connection usable.
    connected: Arc<RwLock<bool>>,
    /// Requests that are waiting for a response.
    pending_requests: ResponseMap,
}

impl BrowserWebSocketClient {
    pub fn new(server_url: &str) -> Self {
        Self {
            server_url: server_url.to_string(),
            connected: Arc::new(RwLock::new(false)),
            pending_requests: Arc::new(RwLock::new(HashMap::new())),
            sender: Arc::new(RwLock::new(None)),
        }
    }

    pub async fn connect(&self) -> Result<(), String> {
        let url = url::Url::parse(&self.server_url).map_err(|e| format!("Invalid URL: {}", e))?;

        let (ws_stream, _) = connect_async(url)
            .await
            .map_err(|e| format!("Connection failed: {}", e))?;

        let (mut write, mut read) = ws_stream.split();

        let (tx, mut rx) = mpsc::channel::<Message>(100);
        *self.sender.write().await = Some(tx);
        *self.connected.write().await = true;

        info!("[Browser] Connected to server at {}", self.server_url);

        let pending = Arc::clone(&self.pending_requests);
        let connected = Arc::clone(&self.connected);

        tokio::spawn(async move {
            while let Some(msg) = read.next().await {
                match msg {
                    Ok(Message::Text(text)) => {
                        if let Ok(response) = serde_json::from_str::<WebSocketResponse>(&text) {
                            let requests = pending.read().await;
                            if let Some(sender) = requests.get(&response.request_id) {
                                let _ = sender.send(response).await;
                            }
                        }
                    }
                    Ok(Message::Close(_)) => {
                        info!("[Browser] Connection closed");
                        *connected.write().await = false;
                        break;
                    }
                    Err(e) => {
                        error!("[Browser] Error receiving message: {}", e);
                    }
                    _ => {}
                }
            }
        });

        tokio::spawn(async move {
            while let Some(msg) = rx.recv().await {
                if let Err(e) = write.send(msg).await {
                    error!("[Browser] Error sending message: {}", e);
                    break;
                }
            }
        });

        Ok(())
    }

    /// Sends a `msg_type` request to the browser server and waits for its response.
    ///
    /// A string `"sessionId"` entry in `data` is lifted into the message's `session_id`.
    /// The remaining entries become the message's `data`, or `None` when nothing is left.
    /// A fresh UUID is used as the request id, and the response carrying the same
    /// `request_id` is routed back to this call by the connection's reader task.
    ///
    /// # Errors
    /// * the client is not connected (including a disconnect racing with this call);
    /// * the message cannot be serialized or queued for sending;
    /// * no response arrives within 30 seconds, or the response channel is closed;
    /// * the server replies with `msg_type == "error"` (its `error` text is returned).
    pub async fn send_message(
        &self,
        msg_type: &str,
        data: HashMap<String, serde_json::Value>,
    ) -> Result<WebSocketResponse, String> {
        const REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);

        if !*self.connected.read().await {
            return Err("Not connected to browser server".to_string());
        }

        let request_id = Uuid::new_v4().to_string();
        let mut message_data = data;

        let message = WebSocketMessage {
            msg_type: msg_type.to_string(),
            request_id: request_id.clone(),
            session_id: message_data
                .remove("sessionId")
                .and_then(|v| v.as_str().map(|s| s.to_string())),
            data: if message_data.is_empty() {
                None
            } else {
                Some(message_data)
            },
        };

        let json = serde_json::to_string(&message)
            .map_err(|e| format!("Failed to serialize message: {}", e))?;

        // Register the waiter before sending so a fast reply cannot arrive unclaimed.
        let (tx, mut rx) = mpsc::channel::<WebSocketResponse>(1);
        self.pending_requests
            .write()
            .await
            .insert(request_id.clone(), tx);

        // Clone the queue handle so the lock is not held while the send awaits.
        let outgoing = self.sender.read().await.clone();
        let outcome = match outgoing {
            // Disconnected after the flag check: fail now instead of waiting out the
            // timeout for a reply to a message that was never sent.
            None => Err("Not connected to browser server".to_string()),
            Some(outgoing) => match outgoing.send(Message::Text(json)).await {
                Err(e) => Err(format!("Failed to send message: {}", e)),
                Ok(()) => {
                    debug!("[Browser] Sent message: {} ({})", msg_type, request_id);
                    match tokio::time::timeout(REQUEST_TIMEOUT, rx.recv()).await {
                        Err(_) => Err(format!("Request timeout for {}", msg_type)),
                        Ok(None) => Err("No response received".to_string()),
                        Ok(Some(response)) => Ok(response),
                    }
                }
            },
        };

        // Deregister on every path (including timeout and send failure) so the map of
        // pending requests cannot grow without bound.
        self.pending_requests.write().await.remove(&request_id);

        let response = outcome?;
        if response.msg_type == "error" {
            return Err(response
                .error
                .unwrap_or_else(|| "Unknown error".to_string()));
        }

        Ok(response)
    }

    pub async fn disconnect(&self) {
        *self.connected.write().await = false;
        *self.sender.write().await = None;
        info!("[Browser] Client disconnected");
    }

    /// Reports whether the client currently considers its connection usable.
    ///
    /// The flag is set by `connect`. It is cleared by `disconnect` and when the
    /// connection is observed to close.
    pub async fn is_connected(&self) -> bool {
        let flag = self.connected.read().await;
        *flag
    }

    /// Navigates the browser session `session_id` to `url`.
    ///
    /// The returned `url` is the one reported by the server, falling back to the
    /// requested `url`. The `title` falls back to an empty string.
    pub async fn navigate(&self, session_id: &str, url: &str) -> Result<NavigationResult, String> {
        // Looks up `key` in the response payload and returns it if it is a JSON string.
        fn string_field<'a>(
            fields: &'a HashMap<String, serde_json::Value>,
            key: &str,
        ) -> Option<&'a str> {
            fields.get(key).and_then(serde_json::Value::as_str)
        }

        let target = HashMap::from([("url".to_string(), serde_json::json!(url))]);
        let request = HashMap::from([
            ("sessionId".to_string(), serde_json::json!(session_id)),
            ("data".to_string(), serde_json::json!(target)),
        ]);

        let reply = self.send_message("navigate", request).await?;
        let fields = reply.data.unwrap_or_default();

        Ok(NavigationResult {
            success: reply.success,
            url: string_field(&fields, "url").unwrap_or(url).to_string(),
            title: string_field(&fields, "title").unwrap_or("").to_string(),
            error: reply.error,
        })
    }

    /// Fetches the current state of the browser session `session_id`.
    ///
    /// Returns the response payload, or an empty map when the server sent none.
    pub async fn get_state(
        &self,
        session_id: &str,
    ) -> Result<HashMap<String, serde_json::Value>, String> {
        let request = HashMap::from([("sessionId".to_string(), serde_json::json!(session_id))]);
        self.send_message("getState", request)
            .await
            .map(|reply| reply.data.unwrap_or_default())
    }

    /// Navigates the session `session_id` back one history entry.
    ///
    /// `url` and `title` are taken from the response payload and default to empty strings.
    pub async fn go_back(&self, session_id: &str) -> Result<NavigationResult, String> {
        let request = HashMap::from([("sessionId".to_string(), serde_json::json!(session_id))]);
        let reply = self.send_message("goBack", request).await?;

        let fields = reply.data.unwrap_or_default();
        let text_of = |key: &str| -> String {
            fields
                .get(key)
                .and_then(|value| value.as_str())
                .unwrap_or("")
                .to_string()
        };

        Ok(NavigationResult {
            success: reply.success,
            url: text_of("url"),
            title: text_of("title"),
            error: reply.error,
        })
    }

    /// Navigates the session `session_id` forward one history entry.
    ///
    /// `url` and `title` are taken from the response payload and default to empty strings.
    pub async fn go_forward(&self, session_id: &str) -> Result<NavigationResult, String> {
        let request = HashMap::from([("sessionId".to_string(), serde_json::json!(session_id))]);
        let reply = self.send_message("goForward", request).await?;

        let fields = reply.data.unwrap_or_default();
        let field_text = |key: &str| -> String {
            fields
                .get(key)
                .and_then(serde_json::Value::as_str)
                .unwrap_or_default()
                .to_owned()
        };

        Ok(NavigationResult {
            success: reply.success,
            url: field_text("url"),
            title: field_text("title"),
            error: reply.error,
        })
    }

    /// Reloads the current page of the session `session_id`.
    ///
    /// `url` and `title` are taken from the response payload and default to empty strings.
    pub async fn refresh(&self, session_id: &str) -> Result<NavigationResult, String> {
        let request = HashMap::from([("sessionId".to_string(), serde_json::json!(session_id))]);
        let reply = self.send_message("refresh", request).await?;

        let fields = reply.data.unwrap_or_default();
        let url = fields
            .get("url")
            .and_then(serde_json::Value::as_str)
            .map(str::to_owned)
            .unwrap_or_default();
        let title = fields
            .get("title")
            .and_then(serde_json::Value::as_str)
            .map(str::to_owned)
            .unwrap_or_default();

        Ok(NavigationResult {
            success: reply.success,
            url,
            title,
            error: reply.error,
        })
    }

    /// Asks the server to click the element matching the natural-language `description`
    /// in session `session_id`, returning the raw server response.
    pub async fn click(
        &self,
        session_id: &str,
        description: &str,
    ) -> Result<WebSocketResponse, String> {
        let details = HashMap::from([(
            "description".to_string(),
            serde_json::json!(description),
        )]);
        let request = HashMap::from([
            ("sessionId".to_string(), serde_json::json!(session_id)),
            ("data".to_string(), serde_json::json!(details)),
        ]);
        self.send_message("click", request).await
    }

    /// Asks the server to type `text` into the input described by `field` in session
    /// `session_id`, returning the raw server response.
    pub async fn type_text(
        &self,
        session_id: &str,
        text: &str,
        field: &str,
    ) -> Result<WebSocketResponse, String> {
        let details = HashMap::from([
            ("text".to_string(), serde_json::json!(text)),
            ("field".to_string(), serde_json::json!(field)),
        ]);
        let request = HashMap::from([
            ("sessionId".to_string(), serde_json::json!(session_id)),
            ("data".to_string(), serde_json::json!(details)),
        ]);
        self.send_message("type", request).await
    }

    /// Asks the server to pick `option` from the dropdown described by `dropdown` in
    /// session `session_id`, returning the raw server response.
    pub async fn select(
        &self,
        session_id: &str,
        option: &str,
        dropdown: &str,
    ) -> Result<WebSocketResponse, String> {
        let details = HashMap::from([
            ("option".to_string(), serde_json::json!(option)),
            ("dropdown".to_string(), serde_json::json!(dropdown)),
        ]);
        let request = HashMap::from([
            ("sessionId".to_string(), serde_json::json!(session_id)),
            ("data".to_string(), serde_json::json!(details)),
        ]);
        self.send_message("select", request).await
    }

    /// Asks the server to extract data from the current page of session `session_id`
    /// according to `instruction`, returning the raw server response.
    pub async fn extract(
        &self,
        session_id: &str,
        instruction: &str,
    ) -> Result<WebSocketResponse, String> {
        let details = HashMap::from([(
            "instruction".to_string(),
            serde_json::json!(instruction),
        )]);
        let request = HashMap::from([
            ("sessionId".to_string(), serde_json::json!(session_id)),
            ("data".to_string(), serde_json::json!(details)),
        ]);
        self.send_message("extract", request).await
    }

    /// Sends a `screenshot` request for session `session_id` and returns the raw response.
    pub async fn screenshot(&self, session_id: &str) -> Result<WebSocketResponse, String> {
        let request = HashMap::from([("sessionId".to_string(), serde_json::json!(session_id))]);
        self.send_message("screenshot", request).await
    }

    /// Sends a `solveCaptcha` request for session `session_id` and returns the raw response.
    pub async fn solve_captcha(&self, session_id: &str) -> Result<WebSocketResponse, String> {
        let request = HashMap::from([("sessionId".to_string(), serde_json::json!(session_id))]);
        self.send_message("solveCaptcha", request).await
    }

    pub async fn health(&self) -> Result<bool, String> {
        let response = self.send_message("health", HashMap::new()).await?;

        let is_healthy = response.msg_type == "health"
            && response
                .data
                .as_ref()
                .and_then(|d| d.get("status"))
                .and_then(|v| v.as_str())
                == Some("ok");

        Ok(is_healthy)
    }
}