playwright_core/
connection.rs

1//! JSON-RPC Connection layer for Playwright protocol
2//!
3//! This module implements the request/response correlation layer on top of the transport.
4//! It handles:
5//! - Generating unique request IDs
6//! - Correlating responses with pending requests
7//! - Distinguishing events from responses
8//! - Dispatching events to protocol objects
9//!
10//! # Message Flow
11//!
12//! 1. Client calls `send_message()` with GUID, method, and params
13//! 2. Connection generates unique ID and creates oneshot channel
14//! 3. Request is serialized and sent via transport
15//! 4. Client awaits on the oneshot receiver
16//! 5. Message loop receives response from transport
17//! 6. Response is correlated by ID and sent via oneshot channel
18//! 7. Client receives result
19//!
20//! # Example
21//!
22//! ```ignore
23//! # use playwright_core::connection::Connection;
24//! # use playwright_core::transport::PipeTransport;
25//! # use serde_json::json;
26//! # #[tokio::main]
27//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
28//! // Create transport (after launching server)
29//! // let (transport, message_rx) = PipeTransport::new(stdin, stdout);
30//!
//! // Create connection (wrapped in `Arc` because `run()` takes `self: &Arc<Self>`)
//! // let connection = std::sync::Arc::new(Connection::new(transport, message_rx));
//!
//! // Spawn message loop in background
//! // let conn = std::sync::Arc::clone(&connection);
//! // tokio::spawn(async move {
//! //     conn.run().await;
//! // });
39//!
40//! // Send request and await response
41//! // let result = connection.send_message(
42//! //     "page@abc123",
43//! //     "goto",
44//! //     json!({"url": "https://example.com"})
45//! // ).await?;
46//! # Ok(())
47//! # }
48//! ```
49//!
50//! # References
51//!
52//! Based on research of official Playwright bindings:
53//! - Python: `playwright/_impl/_connection.py`
54//! - Java: `com/microsoft/playwright/impl/Connection.java`
55//! - .NET: `Microsoft.Playwright/Core/Connection.cs`
56
57use crate::error::{Error, Result};
58use crate::transport::PipeTransport;
59use parking_lot::Mutex as ParkingLotMutex;
60use serde::{Deserialize, Serialize};
61use serde_json::Value;
62use std::collections::HashMap;
63use std::sync::atomic::{AtomicU32, Ordering};
64use std::sync::Arc;
65use tokio::sync::Mutex as TokioMutex;
66use tokio::sync::{mpsc, oneshot};
67
68use std::future::Future;
69use std::pin::Pin;
70
71/// Trait defining the interface that ChannelOwner needs from a Connection
72///
73/// This trait allows ChannelOwner to work with Connection without needing to know
74/// the generic parameters W and R. The Connection struct implements this trait.
75pub trait ConnectionLike: Send + Sync {
76    /// Send a message to the Playwright server and await response
77    fn send_message(
78        &self,
79        guid: &str,
80        method: &str,
81        params: Value,
82    ) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + '_>>;
83
84    /// Register an object in the connection's registry
85    fn register_object(
86        &self,
87        guid: Arc<str>,
88        object: Arc<dyn ChannelOwner>,
89    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;
90
91    /// Unregister an object from the connection's registry
92    fn unregister_object(&self, guid: &str) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;
93
94    /// Get an object by GUID
95    fn get_object(&self, guid: &str) -> AsyncChannelOwnerResult<'_>;
96}
97
98// Type alias for complex async return type
99type AsyncChannelOwnerResult<'a> =
100    Pin<Box<dyn Future<Output = Result<Arc<dyn ChannelOwner>>> + Send + 'a>>;
101
// `ChannelOwner` is the trait-object type used by `ConnectionLike` and the object registry
103use crate::channel_owner::ChannelOwner;
104
105/// Metadata attached to every Playwright protocol message
106///
107/// Contains timing information and optional location data for debugging.
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct Metadata {
110    /// Unix timestamp in milliseconds
111    #[serde(rename = "wallTime")]
112    pub wall_time: i64,
113    /// Whether this is an internal call (not user-facing API)
114    #[serde(skip_serializing_if = "Option::is_none")]
115    pub internal: Option<bool>,
116    /// Source location where the API was called
117    #[serde(skip_serializing_if = "Option::is_none")]
118    pub location: Option<Location>,
119    /// Optional title for the operation
120    #[serde(skip_serializing_if = "Option::is_none")]
121    pub title: Option<String>,
122}
123
124/// Source code location for a protocol call
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct Location {
127    /// Source file path
128    pub file: String,
129    /// Line number (1-indexed)
130    #[serde(skip_serializing_if = "Option::is_none")]
131    pub line: Option<i32>,
132    /// Column number (1-indexed)
133    #[serde(skip_serializing_if = "Option::is_none")]
134    pub column: Option<i32>,
135}
136
137impl Metadata {
138    /// Create minimal metadata with current timestamp
139    pub fn now() -> Self {
140        Self {
141            wall_time: std::time::SystemTime::now()
142                .duration_since(std::time::UNIX_EPOCH)
143                .unwrap()
144                .as_millis() as i64,
145            internal: Some(false),
146            location: None,
147            title: None,
148        }
149    }
150}
151
152/// Protocol request message sent to Playwright server
153///
154/// Format matches Playwright's JSON-RPC protocol:
155/// ```json
156/// {
157///   "id": 42,
158///   "guid": "page@3ee5e10621a15eaf80cb985dbccb9a28",
159///   "method": "goto",
160///   "params": {
161///     "url": "https://example.com"
162///   },
163///   "metadata": {
164///     "wallTime": 1699876543210,
165///     "internal": false
166///   }
167/// }
168/// ```
169#[derive(Debug, Clone, Serialize, Deserialize)]
170pub struct Request {
171    /// Unique request ID for correlating responses
172    pub id: u32,
173    /// GUID of the target object (format: "type@hash")
174    #[serde(
175        serialize_with = "serialize_arc_str",
176        deserialize_with = "deserialize_arc_str"
177    )]
178    pub guid: Arc<str>,
179    /// Method name to invoke
180    pub method: String,
181    /// Method parameters as JSON object
182    pub params: Value,
183    /// Metadata with timing and location information
184    pub metadata: Metadata,
185}
186
187/// Serde helpers for `Arc<str>` serialization
188///
189/// These helpers allow `Arc<str>` to be serialized/deserialized as a regular string in JSON.
190/// This is used for GUID fields throughout the protocol layer for performance optimization.
191pub fn serialize_arc_str<S>(arc: &Arc<str>, serializer: S) -> std::result::Result<S::Ok, S::Error>
192where
193    S: serde::Serializer,
194{
195    serializer.serialize_str(arc)
196}
197
198pub fn deserialize_arc_str<'de, D>(deserializer: D) -> std::result::Result<Arc<str>, D::Error>
199where
200    D: serde::Deserializer<'de>,
201{
202    let s: String = serde::Deserialize::deserialize(deserializer)?;
203    Ok(Arc::from(s.as_str()))
204}
205
206/// Protocol response message from Playwright server
207///
208/// Format matches Playwright's JSON-RPC protocol:
209/// ```json
210/// {
211///   "id": 42,
212///   "result": { "response": { "guid": "response@..." } }
213/// }
214/// ```
215///
216/// Or with error:
217/// ```json
218/// {
219///   "id": 42,
220///   "error": {
221///     "error": {
222///       "message": "Navigation timeout",
223///       "name": "TimeoutError",
224///       "stack": "..."
225///     }
226///   }
227/// }
228/// ```
229#[derive(Debug, Clone, Serialize, Deserialize)]
230pub struct Response {
231    /// Request ID this response correlates to
232    pub id: u32,
233    /// Success result (mutually exclusive with error)
234    #[serde(skip_serializing_if = "Option::is_none")]
235    pub result: Option<Value>,
236    /// Error result (mutually exclusive with result)
237    #[serde(skip_serializing_if = "Option::is_none")]
238    pub error: Option<ErrorWrapper>,
239}
240
241/// Wrapper for protocol error payload
242#[derive(Debug, Clone, Serialize, Deserialize)]
243pub struct ErrorWrapper {
244    pub error: ErrorPayload,
245}
246
247/// Protocol error details
248#[derive(Debug, Clone, Serialize, Deserialize)]
249pub struct ErrorPayload {
250    /// Error message
251    pub message: String,
252    /// Error type name (e.g., "TimeoutError", "TargetClosedError")
253    #[serde(skip_serializing_if = "Option::is_none")]
254    pub name: Option<String>,
255    /// Stack trace
256    #[serde(skip_serializing_if = "Option::is_none")]
257    pub stack: Option<String>,
258}
259
260/// Protocol event message from Playwright server
261///
262/// Events are distinguished from responses by the absence of an `id` field:
263/// ```json
264/// {
265///   "guid": "page@3ee5e10621a15eaf80cb985dbccb9a28",
266///   "method": "console",
267///   "params": {
268///     "message": { "type": "log", "text": "Hello world" }
269///   }
270/// }
271/// ```
272#[derive(Debug, Clone, Serialize, Deserialize)]
273pub struct Event {
274    /// GUID of the object that emitted the event
275    #[serde(
276        serialize_with = "serialize_arc_str",
277        deserialize_with = "deserialize_arc_str"
278    )]
279    pub guid: Arc<str>,
280    /// Event method name
281    pub method: String,
282    /// Event parameters as JSON object
283    pub params: Value,
284}
285
286/// Discriminated union of protocol messages
287///
288/// Uses serde's `untagged` to distinguish based on presence of `id` field:
289/// - Messages with `id` are responses
290/// - Messages without `id` are events
291#[derive(Debug, Clone, Serialize, Deserialize)]
292#[serde(untagged)]
293pub enum Message {
294    /// Response message (has `id` field)
295    Response(Response),
296    /// Event message (no `id` field)
297    Event(Event),
298}
299
300/// Type alias for the object registry mapping GUIDs to ChannelOwner objects
301type ObjectRegistry = HashMap<Arc<str>, Arc<dyn ChannelOwner>>;
302
303/// JSON-RPC connection to Playwright server
304///
305/// Manages request/response correlation and event dispatch.
306/// Uses sequential request IDs and oneshot channels for correlation.
307///
308/// # Thread Safety
309///
310/// Connection is thread-safe and can be shared across async tasks using `Arc`.
311/// Multiple concurrent requests are supported.
312///
313/// # Architecture
314///
315/// This follows the pattern from official Playwright bindings:
316/// - Python: Direct callback on message receive
317/// - Java: Callback map with synchronized access
318/// - .NET: ConcurrentDictionary with TaskCompletionSource
319///
320/// Rust implementation uses:
321/// - `AtomicU32` for thread-safe ID generation
322/// - `Arc<Mutex<HashMap>>` for callback storage
323/// - `tokio::sync::oneshot` for request/response correlation
324pub struct Connection<W, R>
325where
326    W: tokio::io::AsyncWrite + Unpin + Send + Sync + 'static,
327    R: tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
328{
329    /// Sequential request ID counter (atomic for thread safety)
330    last_id: AtomicU32,
331    /// Pending request callbacks keyed by request ID
332    callbacks: Arc<TokioMutex<HashMap<u32, oneshot::Sender<Result<Value>>>>>,
333    /// Stdin for sending messages (mutex-wrapped for concurrent sends)
334    stdin: Arc<TokioMutex<W>>,
335    /// Receiver for incoming messages from transport
336    message_rx: Arc<TokioMutex<Option<mpsc::UnboundedReceiver<Value>>>>,
337    /// Receiver half of transport (owned by run loop, only needed once)
338    transport_receiver: Arc<TokioMutex<Option<crate::transport::PipeTransportReceiver<R>>>>,
339    /// Registry of all protocol objects by GUID (parking_lot for sync+async access)
340    objects: Arc<ParkingLotMutex<ObjectRegistry>>,
341}
342
343// Type alias for Connection using concrete transport (most common case)
344pub type RealConnection = Connection<tokio::process::ChildStdin, tokio::process::ChildStdout>;
345
346impl<W, R> Connection<W, R>
347where
348    W: tokio::io::AsyncWrite + Unpin + Send + Sync + 'static,
349    R: tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
350{
351    /// Create a new Connection with the given transport
352    ///
353    /// # Arguments
354    ///
355    /// * `transport` - Transport connected to Playwright server
356    /// * `message_rx` - Receiver for incoming messages from transport
357    ///
358    /// # Example
359    ///
360    /// ```ignore
361    /// # use playwright_core::connection::Connection;
362    /// # use playwright_core::transport::PipeTransport;
363    /// # use tokio::io::duplex;
364    /// # #[tokio::main]
365    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
366    /// let (stdin_read, stdin_write) = duplex(1024);
367    /// let (stdout_read, stdout_write) = duplex(1024);
368    ///
369    /// let (transport, message_rx) = PipeTransport::new(stdin_write, stdout_read);
370    /// let connection = Connection::new(transport, message_rx);
371    /// # Ok(())
372    /// # }
373    /// ```
374    pub fn new(transport: PipeTransport<W, R>, message_rx: mpsc::UnboundedReceiver<Value>) -> Self {
375        // Split transport into send and receive parts
376        // This prevents deadlock: stdin can be locked for sends while
377        // the transport receiver runs independently
378        let (stdin, transport_receiver) = transport.into_parts();
379
380        Self {
381            last_id: AtomicU32::new(0),
382            callbacks: Arc::new(TokioMutex::new(HashMap::new())),
383            stdin: Arc::new(TokioMutex::new(stdin)),
384            message_rx: Arc::new(TokioMutex::new(Some(message_rx))),
385            transport_receiver: Arc::new(TokioMutex::new(Some(transport_receiver))),
386            objects: Arc::new(ParkingLotMutex::new(HashMap::new())),
387        }
388    }
389
390    /// Send a message to the Playwright server and await response
391    ///
392    /// This method:
393    /// 1. Generates a unique request ID
394    /// 2. Creates a oneshot channel for the response
395    /// 3. Stores the channel sender in the callbacks map
396    /// 4. Serializes and sends the request via transport
397    /// 5. Awaits the response on the receiver
398    ///
399    /// # Arguments
400    ///
401    /// * `guid` - GUID of the target object (e.g., "page@abc123")
402    /// * `method` - Method name to invoke (e.g., "goto")
403    /// * `params` - Method parameters as JSON value
404    ///
405    /// # Returns
406    ///
407    /// The result value from the server, or an error if:
408    /// - Transport send fails
409    /// - Server returns an error
410    /// - Connection is closed before response arrives
411    ///
412    /// See module-level documentation for usage examples.
413    pub async fn send_message(&self, guid: &str, method: &str, params: Value) -> Result<Value> {
414        // Generate unique ID (atomic increment for thread safety)
415        let id = self.last_id.fetch_add(1, Ordering::SeqCst);
416
417        tracing::debug!(
418            "Sending message: id={}, guid='{}', method='{}'",
419            id,
420            guid,
421            method
422        );
423
424        // Create oneshot channel for response
425        let (tx, rx) = oneshot::channel();
426
427        // Store callback
428        self.callbacks.lock().await.insert(id, tx);
429
430        // Build request with metadata
431        let request = Request {
432            id,
433            guid: Arc::from(guid),
434            method: method.to_string(),
435            params,
436            metadata: Metadata::now(),
437        };
438
439        // Send via stdin using the helper function
440        let request_value = serde_json::to_value(&request)?;
441        tracing::debug!("Request JSON: {}", request_value);
442
443        match crate::transport::send_message(&mut *self.stdin.lock().await, request_value).await {
444            Ok(()) => tracing::debug!("Message sent successfully, awaiting response"),
445            Err(e) => {
446                tracing::error!("Failed to send message: {:?}", e);
447                return Err(e);
448            }
449        }
450
451        // Await response
452        tracing::debug!("Waiting for response to ID {}", id);
453        rx.await
454            .map_err(|_| Error::ChannelClosed)
455            .and_then(|result| result)
456    }
457
458    /// Initialize the Playwright connection and return the root Playwright object
459    ///
460    /// This method implements the initialization handshake with the Playwright server:
461    /// 1. Creates a temporary Root object
462    /// 2. Sends "initialize" message with sdkLanguage="rust"
463    /// 3. Server creates BrowserType objects (sends `__create__` messages)
464    /// 4. Server responds with Playwright GUID
465    /// 5. Looks up Playwright object from registry (guaranteed to exist)
466    ///
467    /// The `initialize` message is synchronous - by the time the response arrives,
468    /// all protocol objects have been created and registered.
469    ///
470    /// # Returns
471    ///
472    /// An `Arc<dyn ChannelOwner>` that is the Playwright object. Callers should downcast
473    /// to `Playwright` type.
474    ///
475    /// # Errors
476    ///
477    /// Returns error if:
478    /// - Initialize message fails to send
479    /// - Server returns protocol error
480    /// - Response is missing Playwright GUID
481    /// - Playwright object not found in registry
482    /// - Timeout after 30 seconds
483    ///
484    /// See module-level documentation for usage examples.
485    ///
486    /// See also:
487    /// - [ADR 0002: Initialization Flow](../../docs/adr/0002-initialization-flow.md)
488    /// - Python: <https://github.com/microsoft/playwright-python/blob/main/playwright/_impl/_connection.py>
489    pub async fn initialize_playwright(self: &Arc<Self>) -> Result<Arc<dyn ChannelOwner>> {
490        use crate::protocol::Root;
491        use std::time::Duration;
492
493        // Create temporary Root object for initialization
494        // Root has empty GUID ("") and acts as parent for top-level objects
495        let root = Arc::new(Root::new(Arc::clone(self) as Arc<dyn ConnectionLike>))
496            as Arc<dyn ChannelOwner>;
497
498        // CRITICAL: Register Root in objects map with empty GUID
499        // This allows __create__ messages to find Root as their parent
500        // Matches Python's behavior where RootChannelOwner auto-registers
501        self.objects.lock().insert(Arc::from(""), root.clone());
502
503        tracing::debug!("Root object registered, sending initialize message");
504
505        // Downcast to Root type to call initialize()
506        let root_typed = root
507            .as_any()
508            .downcast_ref::<Root>()
509            .expect("Root object should be Root type");
510
511        // Send initialize message (blocks until server responds)
512        // Add timeout to prevent hanging forever
513        let response = tokio::time::timeout(Duration::from_secs(30), root_typed.initialize())
514            .await
515            .map_err(|_| {
516                Error::Timeout("Playwright initialization timeout after 30 seconds".to_string())
517            })??;
518
519        // Extract Playwright GUID from response
520        // Response format: { "playwright": { "guid": "playwright" } }
521        let playwright_guid = response["playwright"]["guid"].as_str().ok_or_else(|| {
522            Error::ProtocolError("Initialize response missing 'playwright.guid' field".to_string())
523        })?;
524
525        tracing::debug!("Initialized Playwright with GUID: {}", playwright_guid);
526
527        // Get Playwright object from registry
528        // By this point, the server has sent all __create__ messages
529        // and the Playwright object is already registered
530        let playwright_obj = self.get_object(playwright_guid).await?;
531
532        // Verify it's actually a Playwright object
533        playwright_obj
534            .as_any()
535            .downcast_ref::<crate::protocol::Playwright>()
536            .ok_or_else(|| {
537                Error::ProtocolError(format!(
538                    "Object with GUID '{}' is not a Playwright instance",
539                    playwright_guid
540                ))
541            })?;
542
543        // Cleanup: Unregister Root after initialization
544        // Root is only needed during the initialization handshake
545        let empty_guid: Arc<str> = Arc::from("");
546        self.objects.lock().remove(&empty_guid);
547        tracing::debug!("Root object unregistered after successful initialization");
548
549        // Return the Arc<dyn ChannelOwner>
550        // The high-level API will handle downcasting
551        Ok(playwright_obj)
552    }
553
554    /// Run the message dispatch loop
555    ///
556    /// This method continuously reads messages from the transport and dispatches them:
557    /// - Responses (with `id`) are correlated with pending requests
558    /// - Events (without `id`) are dispatched to protocol objects (TODO: Future phase - event handling)
559    ///
560    /// The loop runs until the transport channel is closed.
561    ///
562    /// # Usage
563    ///
564    /// This method should be spawned in a background task:
565    ///
566    /// ```ignore
567    /// # use playwright_core::connection::Connection;
568    /// # use playwright_core::transport::PipeTransport;
569    /// # use std::sync::Arc;
570    /// # use tokio::io::duplex;
571    /// # #[tokio::main]
572    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
573    /// # let (stdin_read, stdin_write) = duplex(1024);
574    /// # let (stdout_read, stdout_write) = duplex(1024);
575    /// # let (transport, message_rx) = PipeTransport::new(stdin_write, stdout_read);
576    /// # let connection = Arc::new(Connection::new(transport, message_rx));
577    /// let conn = Arc::clone(&connection);
578    /// tokio::spawn(async move {
579    ///     conn.run().await;
580    /// });
581    /// # Ok(())
582    /// # }
583    /// ```
584    pub async fn run(self: &Arc<Self>) {
585        // Take the transport receiver (can only be called once)
586        let transport_receiver = self
587            .transport_receiver
588            .lock()
589            .await
590            .take()
591            .expect("run() can only be called once - transport receiver already taken");
592
593        // Spawn transport read loop WITHOUT any locks
594        // This prevents deadlock: the receiver owns stdout and runs independently
595        let transport_handle = tokio::spawn(async move {
596            if let Err(e) = transport_receiver.run().await {
597                tracing::error!("Transport error: {}", e);
598            }
599        });
600
601        // Take the message receiver out of the Option (can only be called once)
602        let mut message_rx = self
603            .message_rx
604            .lock()
605            .await
606            .take()
607            .expect("run() can only be called once - message receiver already taken");
608
609        while let Some(message_value) = message_rx.recv().await {
610            // Parse message as Response or Event
611            match serde_json::from_value::<Message>(message_value) {
612                Ok(message) => {
613                    if let Err(e) = self.dispatch_internal(message).await {
614                        tracing::error!("Error dispatching message: {}", e);
615                    }
616                }
617                Err(e) => {
618                    tracing::error!("Failed to parse message: {}", e);
619                }
620            }
621        }
622
623        tracing::debug!("Message loop ended (transport closed)");
624
625        // Wait for transport task to finish
626        let _ = transport_handle.await;
627    }
628
629    /// Dispatch an incoming message from the transport
630    ///
631    /// This method:
632    /// - Parses the message as Response or Event
633    /// - For responses: correlates by ID and completes the oneshot channel
634    /// - For events: dispatches to the appropriate object (TODO: Future phase - event handling)
635    ///
636    /// # Arguments
637    ///
638    /// * `message` - Parsed protocol message
639    ///
640    /// # Errors
641    ///
642    /// Returns error if:
643    /// - Response ID doesn't match any pending request
644    /// - Event GUID doesn't match any registered object
645    #[cfg(test)]
646    pub async fn dispatch(self: &Arc<Self>, message: Message) -> Result<()> {
647        self.dispatch_internal(message).await
648    }
649
650    async fn dispatch_internal(self: &Arc<Self>, message: Message) -> Result<()> {
651        tracing::debug!("Dispatching message: {:?}", message);
652        match message {
653            Message::Response(response) => {
654                tracing::debug!("Processing response for ID: {}", response.id);
655                // Correlate response with pending request
656                let callback = self
657                    .callbacks
658                    .lock()
659                    .await
660                    .remove(&response.id)
661                    .ok_or_else(|| {
662                        Error::ProtocolError(format!(
663                            "Cannot find request to respond: id={}",
664                            response.id
665                        ))
666                    })?;
667
668                // Convert protocol error to Rust error
669                let result = if let Some(error_wrapper) = response.error {
670                    Err(parse_protocol_error(error_wrapper.error))
671                } else {
672                    Ok(response.result.unwrap_or(Value::Null))
673                };
674
675                // Complete the oneshot channel (ignore if receiver was dropped)
676                let _ = callback.send(result);
677                Ok(())
678            }
679            Message::Event(event) => {
680                // Handle special protocol methods
681                match event.method.as_str() {
682                    "__create__" => self.handle_create(&event).await,
683                    "__dispose__" => self.handle_dispose(&event).await,
684                    "__adopt__" => self.handle_adopt(&event).await,
685                    _ => {
686                        // Regular event - dispatch to object
687                        match self.objects.lock().get(&event.guid).cloned() {
688                            Some(object) => {
689                                object.on_event(&event.method, event.params);
690                                Ok(())
691                            }
692                            None => {
693                                tracing::warn!(
694                                    "Event for unknown object: guid={}, method={}",
695                                    event.guid,
696                                    event.method
697                                );
698                                Ok(())
699                            }
700                        }
701                    }
702                }
703            }
704        }
705    }
706
707    /// Handle `__create__` protocol message
708    ///
709    /// Creates a new protocol object and registers it in the connection.
710    async fn handle_create(self: &Arc<Self>, event: &Event) -> Result<()> {
711        use crate::channel_owner::ParentOrConnection;
712        use crate::object_factory::create_object;
713
714        // Extract parameters from event
715        let type_name = event.params["type"]
716            .as_str()
717            .ok_or_else(|| Error::ProtocolError("__create__ missing 'type'".to_string()))?
718            .to_string();
719
720        let object_guid: Arc<str> = Arc::from(
721            event.params["guid"]
722                .as_str()
723                .ok_or_else(|| Error::ProtocolError("__create__ missing 'guid'".to_string()))?,
724        );
725
726        eprintln!(
727            "DEBUG __create__: type={}, guid={}, parent_guid={}",
728            type_name, object_guid, event.guid
729        );
730
731        let initializer = event.params["initializer"].clone();
732
733        // Determine parent
734        let parent_obj = self
735            .objects
736            .lock()
737            .get(&event.guid)
738            .cloned()
739            .ok_or_else(|| {
740                eprintln!(
741                    "DEBUG: Parent object not found for type={}, parent_guid={}",
742                    type_name, event.guid
743                );
744                Error::ProtocolError(format!("Parent object not found: {}", event.guid))
745            })?;
746
747        // Create object using factory
748        // Special case: Playwright object needs Connection, not Parent
749        let parent_or_conn = if type_name == "Playwright" && event.guid.is_empty() {
750            ParentOrConnection::Connection(Arc::clone(self) as Arc<dyn ConnectionLike>)
751        } else {
752            ParentOrConnection::Parent(parent_obj.clone())
753        };
754
755        let object = match create_object(
756            parent_or_conn,
757            type_name.clone(),
758            object_guid.clone(),
759            initializer,
760        )
761        .await
762        {
763            Ok(obj) => obj,
764            Err(e) => {
765                eprintln!(
766                    "DEBUG: Failed to create object type={}, guid={}, error={}",
767                    type_name, object_guid, e
768                );
769                return Err(e);
770            }
771        };
772
773        // Register in connection
774        self.register_object(Arc::clone(&object_guid), object.clone())
775            .await;
776
777        // Register in parent
778        parent_obj.add_child(Arc::clone(&object_guid), object);
779
780        eprintln!(
781            "DEBUG: Successfully created and registered object: type={}, guid={}",
782            type_name, object_guid
783        );
784        tracing::debug!("Created object: type={}, guid={}", type_name, object_guid);
785
786        Ok(())
787    }
788
789    /// Handle `__dispose__` protocol message
790    ///
791    /// Disposes an object and removes it from the registry.
792    async fn handle_dispose(&self, event: &Event) -> Result<()> {
793        use crate::channel_owner::DisposeReason;
794
795        let reason = match event.params.get("reason").and_then(|r| r.as_str()) {
796            Some("gc") => DisposeReason::GarbageCollected,
797            _ => DisposeReason::Closed,
798        };
799
800        // Get object from registry
801        let object = self.objects.lock().get(&event.guid).cloned();
802
803        if let Some(obj) = object {
804            // Dispose the object (this will remove from parent and unregister)
805            obj.dispose(reason);
806
807            tracing::debug!("Disposed object: guid={}", event.guid);
808        } else {
809            tracing::warn!("Dispose for unknown object: guid={}", event.guid);
810        }
811
812        Ok(())
813    }
814
815    /// Handle `__adopt__` protocol message
816    ///
817    /// Moves a child object from one parent to another.
818    async fn handle_adopt(&self, event: &Event) -> Result<()> {
819        let child_guid: Arc<str> = Arc::from(
820            event.params["guid"]
821                .as_str()
822                .ok_or_else(|| Error::ProtocolError("__adopt__ missing 'guid'".to_string()))?,
823        );
824
825        // Get new parent and child from registry
826        let new_parent = self.objects.lock().get(&event.guid).cloned();
827        let child = self.objects.lock().get(&child_guid).cloned();
828
829        match (new_parent, child) {
830            (Some(parent), Some(child_obj)) => {
831                parent.adopt(child_obj);
832                tracing::debug!(
833                    "Adopted object: child={}, new_parent={}",
834                    child_guid,
835                    event.guid
836                );
837                Ok(())
838            }
839            (None, _) => Err(Error::ProtocolError(format!(
840                "Parent object not found: {}",
841                event.guid
842            ))),
843            (_, None) => Err(Error::ProtocolError(format!(
844                "Child object not found: {}",
845                child_guid
846            ))),
847        }
848    }
849}
850
851/// Parse protocol error into Rust error type
852fn parse_protocol_error(error: ErrorPayload) -> Error {
853    match error.name.as_deref() {
854        Some("TimeoutError") => Error::Timeout(error.message),
855        Some("TargetClosedError") => Error::TargetClosed {
856            target_type: "target".to_string(),
857            context: error.message,
858        },
859        _ => Error::ProtocolError(error.message),
860    }
861}
862
863// Implement ConnectionLike trait for Connection
864impl<W, R> ConnectionLike for Connection<W, R>
865where
866    W: tokio::io::AsyncWrite + Unpin + Send + Sync + 'static,
867    R: tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
868{
869    fn send_message(
870        &self,
871        guid: &str,
872        method: &str,
873        params: Value,
874    ) -> Pin<Box<dyn Future<Output = Result<Value>> + Send + '_>> {
875        // Convert to owned strings to avoid lifetime issues
876        let guid = guid.to_string();
877        let method = method.to_string();
878
879        // Box the future returned by the async method
880        Box::pin(async move { Connection::send_message(self, &guid, &method, params).await })
881    }
882
883    fn register_object(
884        &self,
885        guid: Arc<str>,
886        object: Arc<dyn ChannelOwner>,
887    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
888        Box::pin(async move {
889            self.objects.lock().insert(guid, object);
890        })
891    }
892
893    fn unregister_object(&self, guid: &str) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
894        let guid_arc: Arc<str> = Arc::from(guid);
895        Box::pin(async move {
896            self.objects.lock().remove(&guid_arc);
897        })
898    }
899
900    fn get_object(&self, guid: &str) -> AsyncChannelOwnerResult<'_> {
901        let guid_arc: Arc<str> = Arc::from(guid);
902        Box::pin(async move {
903            self.objects.lock().get(&guid_arc).cloned().ok_or_else(|| {
904                // Determine target type from GUID prefix
905                let target_type = if guid_arc.starts_with("page@") {
906                    "Page"
907                } else if guid_arc.starts_with("frame@") {
908                    "Frame"
909                } else if guid_arc.starts_with("browser-context@") {
910                    "BrowserContext"
911                } else if guid_arc.starts_with("browser@") {
912                    "Browser"
913                } else {
914                    return Error::ProtocolError(format!("Object not found: {}", guid_arc));
915                };
916
917                Error::TargetClosed {
918                    target_type: target_type.to_string(),
919                    context: format!("Object not found: {}", guid_arc),
920                }
921            })
922        })
923    }
924}
925
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::{duplex, DuplexStream};

    // Connection flavour used by every test: both pipe ends are in-memory
    // duplex streams.
    type TestConnection = Connection<DuplexStream, DuplexStream>;

    // Builds a connection over two in-memory duplex pipes (1024-byte buffers).
    //
    // Returns the connection together with the far ends of the pipes: the
    // reading end of the "stdin" pipe and the writing end of the "stdout" pipe.
    fn create_test_connection() -> (TestConnection, DuplexStream, DuplexStream) {
        let (far_stdin, near_stdin) = duplex(1024);
        let (near_stdout, far_stdout) = duplex(1024);

        let (transport, message_rx) = PipeTransport::new(near_stdin, near_stdout);

        (Connection::new(transport, message_rx), far_stdin, far_stdout)
    }

    // The request-ID counter starts at zero and grows by one per request.
    #[test]
    fn test_request_id_increments() {
        let (connection, _, _) = create_test_connection();

        // Bump the counter directly; `fetch_add` yields the value it held
        // before the increment.
        let issued: Vec<_> = (0..3)
            .map(|_| connection.last_id.fetch_add(1, Ordering::SeqCst))
            .collect();

        assert_eq!(issued, vec![0, 1, 2]);
    }

    // A `Request` keeps exactly the values it was built with.
    #[test]
    fn test_request_format() {
        let params = serde_json::json!({"url": "https://example.com"});
        let request = Request {
            id: 0,
            guid: Arc::from("page@abc123"),
            method: String::from("goto"),
            params,
            metadata: Metadata::now(),
        };

        assert_eq!(request.params["url"], "https://example.com");
        assert_eq!(request.method, "goto");
        assert_eq!(request.guid.as_ref(), "page@abc123");
        assert_eq!(request.id, 0);
    }

    // A successful response reaches the callback registered for its ID.
    #[tokio::test]
    async fn test_dispatch_response_success() {
        let (connection, _, _) = create_test_connection();

        // Register a pending request by hand: fresh ID plus its callback
        let request_id = connection.last_id.fetch_add(1, Ordering::SeqCst);
        let (reply_tx, reply_rx) = oneshot::channel();
        connection.callbacks.lock().await.insert(request_id, reply_tx);

        // What the server would send back
        let incoming = Message::Response(Response {
            id: request_id,
            result: Some(serde_json::json!({"status": "ok"})),
            error: None,
        });

        Arc::new(connection).dispatch(incoming).await.unwrap();

        // The waiting side sees the payload
        let payload = reply_rx.await.unwrap().unwrap();
        assert_eq!(payload["status"], "ok");
    }

    // An error response is delivered to the callback as a typed error.
    #[tokio::test]
    async fn test_dispatch_response_error() {
        let (connection, _, _) = create_test_connection();

        // Register a pending request by hand: fresh ID plus its callback
        let request_id = connection.last_id.fetch_add(1, Ordering::SeqCst);
        let (reply_tx, reply_rx) = oneshot::channel();
        connection.callbacks.lock().await.insert(request_id, reply_tx);

        // What the server would send back for a timed-out navigation
        let failure = ErrorPayload {
            message: "Navigation timeout".to_string(),
            name: Some("TimeoutError".to_string()),
            stack: None,
        };
        let incoming = Message::Response(Response {
            id: request_id,
            result: None,
            error: Some(ErrorWrapper { error: failure }),
        });

        Arc::new(connection).dispatch(incoming).await.unwrap();

        // The waiting side sees a Timeout error carrying the server message
        let outcome = reply_rx.await.unwrap();
        assert!(outcome.is_err());
        if let Error::Timeout(msg) = outcome.unwrap_err() {
            assert_eq!(msg, "Navigation timeout");
        } else {
            panic!("Expected Timeout error");
        }
    }

    // A response whose ID matches no pending request is a protocol error.
    #[tokio::test]
    async fn test_dispatch_invalid_id() {
        let (connection, _, _) = create_test_connection();

        // No callback was ever registered for ID 999
        let orphan = Message::Response(Response {
            id: 999,
            result: Some(Value::Null),
            error: None,
        });

        let outcome = Arc::new(connection).dispatch(orphan).await;
        assert!(outcome.is_err());
        if let Error::ProtocolError(msg) = outcome.unwrap_err() {
            assert!(msg.contains("Cannot find request"));
        } else {
            panic!("Expected ProtocolError");
        }
    }

    // Responses dispatched from separate tasks, in an order different from
    // the request order, each reach the callback registered for their ID.
    #[tokio::test]
    async fn test_concurrent_requests() {
        let (connection, _, _) = create_test_connection();
        let connection = Arc::new(connection);

        // Three pending requests, each with its own ID and callback
        let id1 = connection.last_id.fetch_add(1, Ordering::SeqCst);
        let id2 = connection.last_id.fetch_add(1, Ordering::SeqCst);
        let id3 = connection.last_id.fetch_add(1, Ordering::SeqCst);

        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();
        let (tx3, rx3) = oneshot::channel();

        for (id, tx) in [(id1, tx1), (id2, tx2), (id3, tx3)] {
            connection.callbacks.lock().await.insert(id, tx);
        }

        // IDs are distinct and sequential
        assert_eq!(id1, 0);
        assert_eq!(id2, 1);
        assert_eq!(id3, 2);

        // One task per response; the second request is answered first
        let mut dispatchers = Vec::new();
        for (response_id, page) in [(1, "2"), (0, "1"), (2, "3")] {
            let conn = Arc::clone(&connection);
            dispatchers.push(tokio::spawn(async move {
                conn.dispatch(Message::Response(Response {
                    id: response_id,
                    result: Some(serde_json::json!({"page": page})),
                    error: None,
                }))
                .await
                .unwrap();
            }));
        }

        // Every dispatch must finish without panicking
        for dispatcher in dispatchers {
            dispatcher.await.unwrap();
        }

        // Each receiver holds the payload addressed to its own request
        for (rx, expected_page) in [(rx1, "1"), (rx2, "2"), (rx3, "3")] {
            let payload = rx.await.unwrap().unwrap();
            assert_eq!(payload["page"], expected_page);
        }
    }

    // JSON carrying `id` and `result` deserializes as a response.
    #[test]
    fn test_message_deserialization_response() {
        let raw = r#"{"id": 42, "result": {"status": "ok"}}"#;
        let message: Message = serde_json::from_str(raw).unwrap();

        if let Message::Response(response) = message {
            assert_eq!(response.id, 42);
            assert!(response.result.is_some());
            assert!(response.error.is_none());
        } else {
            panic!("Expected Response");
        }
    }

    // JSON carrying `guid`, `method` and `params` deserializes as an event.
    #[test]
    fn test_message_deserialization_event() {
        let raw = r#"{"guid": "page@abc", "method": "console", "params": {"text": "hello"}}"#;
        let message: Message = serde_json::from_str(raw).unwrap();

        if let Message::Event(event) = message {
            assert_eq!(event.guid.as_ref(), "page@abc");
            assert_eq!(event.method, "console");
            assert_eq!(event.params["text"], "hello");
        } else {
            panic!("Expected Event");
        }
    }

    // Error names map onto the matching `Error` variants; an unnamed error
    // falls back to `ProtocolError`.
    #[test]
    fn test_error_type_parsing() {
        // Builds a payload without a stack trace
        let payload = |message: &str, name: Option<&str>| ErrorPayload {
            message: message.to_string(),
            name: name.map(String::from),
            stack: None,
        };

        // TimeoutError
        let error = parse_protocol_error(payload("timeout", Some("TimeoutError")));
        assert!(matches!(error, Error::Timeout(_)));

        // TargetClosedError
        let error = parse_protocol_error(payload("closed", Some("TargetClosedError")));
        assert!(matches!(error, Error::TargetClosed { .. }));

        // Generic error
        let error = parse_protocol_error(payload("generic", None));
        assert!(matches!(error, Error::ProtocolError(_)));
    }
}