chrome_remote_interface/
lib.rs

1#![doc(html_root_url = "https://docs.rs/chrome-remote-interface/0.1.0-alpha.8")]
2//! [Chrome DevTools Protocol](https://chromedevtools.github.io/devtools-protocol/) client.
3//!
4//! Currently Work In Progress.
5//!
6//! ## Example
7//!
8//! ```
9//! use chrome_remote_interface::Browser;
10//! use chrome_remote_interface::model::target::{CreateTargetCommand, AttachToTargetCommand};
11//!
12//! #[tokio::main(flavor = "current_thread")]
13//! async fn main() -> anyhow::Result<()> {
14//!     pretty_env_logger::init();
15//!     let browser = Browser::launcher()
16//!         .headless(true) // headless mode (Default)
17//!         .launch()
18//!         .await?;
19//!
20//!     browser.run_with(|mut client| async move {
21//!         // Open new page
22//!         let response = client.request(CreateTargetCommand::builder()
23//!             .url("https://example.org/".into())
24//!             .build()
25//!             .unwrap()
26//!         ).await?;
27//!
28//!         // Attach opened page.
29//!         let response = client
30//!             .request(AttachToTargetCommand::new((*response).clone(), Some(true)))
31//!             .await?;
32//!
33//!         // construct attached session.
34//!         let mut session = client.session(response);
35//!
36//!         // DO STUFF
37//!         // ...
38//!
39//!         Ok(())
40//!     }).await
41//! }
42//! ```
43//!
44//! ## License
45//!
46//! Licensed under either of
47//! * Apache License, Version 2.0
48//!   ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
49//! * MIT license
50//!   ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)
51//! at your option.
52//!
53//! ## Contribution
54//!
55//! Unless you explicitly state otherwise, any contribution intentionally submitted
56//! for inclusion in the work by you, as defined in the Apache-2.0 license, shall be
//! dual licensed as above, without any additional terms or conditions.
58
59use std::collections::HashMap;
60use std::future::Future;
61use std::io;
62use std::marker::PhantomData;
63use std::ops::{Deref, DerefMut};
64use std::pin::Pin;
65use std::sync::atomic::{AtomicU32, Ordering};
66use std::sync::Arc;
67use std::task::{Context, Poll};
68
69use futures::channel::mpsc;
70use futures::channel::oneshot;
71use futures::future::FutureExt;
72use futures::ready;
73use futures::select;
74use futures::sink::{Sink, SinkExt};
75use futures::stream::{Fuse, Stream, StreamExt};
76use serde::Deserialize;
77use serde_json::Value;
78use tokio::net::TcpStream;
79use tokio::sync::Mutex;
80use tokio_tungstenite::tungstenite::protocol::Message;
81use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
82use url::Url;
83
84pub use browser::*;
85pub use chrome_remote_interface_model as model;
86use model::SessionId;
87
/// Name of the environment variable that carries the browser binary path.
pub const BROWSER_BIN: &str = "CRI_CHROME_BIN";
90
// Trace-logs an incoming protocol message (marked `<<`) together with its size in bytes.
macro_rules! recv {
    ($size:expr, $payload:expr) => {
        log::trace!(
            target: "chrome_remote_interface::protocol",
            "<< [{} bytes] {}",
            $size,
            $payload
        );
    };
}
96
// Trace-logs an outgoing protocol message (marked `>>`) together with its size in bytes.
macro_rules! send {
    ($size:expr, $payload:expr) => {
        log::trace!(
            target: "chrome_remote_interface::protocol",
            ">> [{} bytes] {}",
            $size,
            $payload
        );
    };
}
102
103mod browser;
104pub(crate) mod os;
105mod pipe;
106pub(crate) mod process;
107
108/// Chrome DevTools Protocol Client Error.
109#[derive(Debug, thiserror::Error)]
110pub enum Error {
111    /// IO Error.
112    #[error(transparent)]
113    Io(#[from] io::Error),
114
115    /// Websocket Request Error.
116    #[error(transparent)]
117    WsRequest(#[from] tokio_tungstenite::tungstenite::Error),
118
119    /// Serialize / Deserialze Error.
120    #[error(transparent)]
121    Json(#[from] serde_json::Error),
122
123    /// Chrome DevTools Protocol Command Error.
124    #[error("error response {0:?}")]
125    Response(serde_json::Value),
126
127    /// Loop Cancelation Error.
128    #[error("loop canceled")]
129    LoopCanceled(#[from] oneshot::Canceled),
130
131    /// Maybe Loop Aborted Error.
132    #[error("loop aborted")]
133    LoopAborted(#[from] mpsc::SendError),
134
135    /// Browser Error.
136    #[error(transparent)]
137    Browser(#[from] BrowserError),
138}
139
140impl<T> From<mpsc::TrySendError<T>> for Error {
141    fn from(v: mpsc::TrySendError<T>) -> Self {
142        Self::LoopAborted(v.into_send_error())
143    }
144}
145
146/// Chrome DevTools Protocol Result.
147pub type Result<T> = std::result::Result<T, Error>;
148
149enum Channel {
150    Ws(Fuse<WebSocketStream<MaybeTlsStream<TcpStream>>>),
151    Pipe(pipe::PipeChannel),
152}
153
154impl Stream for Channel {
155    type Item = Result<Value>;
156
157    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
158        match self.get_mut() {
159            Self::Ws(ws) => loop {
160                match ready!(ws.poll_next_unpin(cx)?) {
161                    Some(Message::Text(m)) => {
162                        recv!(m.bytes().len(), m);
163                        return Poll::Ready(Some(Ok(serde_json::from_str(&m)?)));
164                    }
165                    Some(Message::Binary(m)) => {
166                        recv!(m.len(), String::from_utf8_lossy(&m));
167                        return Poll::Ready(Some(Ok(serde_json::from_slice(&m)?)));
168                    }
169                    Some(..) => {}
170                    None => return Poll::Ready(None),
171                }
172            },
173
174            Self::Pipe(inner) => inner.poll_next_unpin(cx),
175        }
176    }
177}
178
179impl Sink<Value> for Channel {
180    type Error = Error;
181
182    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
183        match self.get_mut() {
184            Self::Ws(ws) => {
185                ready!(ws.poll_ready_unpin(cx)?);
186                Poll::Ready(Ok(()))
187            }
188
189            Self::Pipe(inner) => inner.poll_ready_unpin(cx),
190        }
191    }
192
193    fn start_send(self: Pin<&mut Self>, item: Value) -> Result<()> {
194        match self.get_mut() {
195            Self::Ws(ws) => {
196                let item = serde_json::to_string(&item)?;
197                send!(item.bytes().len(), &item);
198                ws.start_send_unpin(Message::Text(item))?;
199                Ok(())
200            }
201
202            Self::Pipe(inner) => inner.start_send_unpin(item),
203        }
204    }
205
206    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
207        match self.get_mut() {
208            Self::Ws(ws) => {
209                ready!(ws.poll_flush_unpin(cx)?);
210                Poll::Ready(Ok(()))
211            }
212
213            Self::Pipe(inner) => inner.poll_flush_unpin(cx),
214        }
215    }
216
217    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
218        match self.get_mut() {
219            Self::Ws(ws) => {
220                ready!(ws.poll_close_unpin(cx)?);
221                Poll::Ready(Ok(()))
222            }
223
224            Self::Pipe(inner) => inner.poll_close_unpin(cx),
225        }
226    }
227}
228
229/// Stream for Chrome DevTools Protocol Event.
230#[derive(Debug)]
231pub struct CdpEvents {
232    rx: mpsc::UnboundedReceiver<model::Event>,
233}
234
235impl Stream for CdpEvents {
236    type Item = model::Event;
237
238    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
239        self.rx.poll_next_unpin(cx)
240    }
241}
242
243/// Chrome DevTools Protocol Client Command Request Future.
244#[derive(Debug)]
245pub struct Request<R> {
246    tx_result: Option<Result<()>>,
247    rx: oneshot::Receiver<std::result::Result<Value, Value>>,
248    _phantom: PhantomData<R>,
249}
250
251impl<R> Future for Request<R>
252where
253    R: for<'r> Deserialize<'r> + Unpin,
254{
255    type Output = Result<R>;
256
257    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
258        let this = self.get_mut();
259        if let Some(r) = this.tx_result.take() {
260            r?;
261        }
262        match ready!(this.rx.poll_unpin(cx))? {
263            Ok(v) => Poll::Ready(Ok(serde_json::from_value(v)?)),
264            Err(err) => Poll::Ready(Err(Error::Response(err))),
265        }
266    }
267}
268
269/// Chrome DevTools Protocol Session.
270#[derive(Debug, Clone)]
271pub struct CdpSession {
272    idgen: Arc<AtomicU32>,
273    session_id: Option<SessionId>,
274    control_tx: mpsc::UnboundedSender<Control>,
275    browser: Option<Arc<Mutex<Browser>>>,
276}
277
278impl CdpSession {
279    /// Request for Chrome DevTools Protocol Command.
280    pub fn request<C>(&mut self, command: C) -> Request<C::Return>
281    where
282        C: model::Command,
283    {
284        let id = self.idgen.fetch_add(1, Ordering::SeqCst);
285        let request = command.into_request(self.session_id.clone(), id);
286        let request = serde_json::to_value(&request).unwrap(); // FIXME
287        let (tx, rx) = oneshot::channel();
288        let tx_result = self
289            .control_tx
290            .unbounded_send(Control::Request(id, request, tx));
291
292        Request {
293            tx_result: Some(tx_result.map_err(Into::into)),
294            rx,
295            _phantom: PhantomData,
296        }
297    }
298
299    /// Subscribe Chrome DevTools Protocol Event.
300    pub fn events(&mut self) -> Result<CdpEvents> {
301        let (tx, rx) = mpsc::unbounded();
302        self.control_tx
303            .unbounded_send(Control::Subscribe(self.session_id.clone(), tx))?;
304        Ok(CdpEvents { rx })
305    }
306}
307
308async fn wait(browser: &Option<Arc<Mutex<Browser>>>) -> io::Result<()> {
309    if let Some(browser) = browser {
310        let mut browser = browser.lock().await;
311        browser.wait().await
312    } else {
313        std::future::pending::<()>().await;
314        Ok(())
315    }
316}
317
318async fn loop_inner(
319    control_rx: &mut mpsc::UnboundedReceiver<Control>,
320    channel: &mut Channel,
321    browser: Option<Arc<Mutex<Browser>>>,
322) -> Result<()> {
323    let mut waiters = HashMap::<u32, oneshot::Sender<std::result::Result<Value, Value>>>::new();
324    let mut events = HashMap::<Option<SessionId>, Vec<mpsc::UnboundedSender<model::Event>>>::new();
325
326    loop {
327        select! {
328            _ = wait(&browser).fuse() => {
329                // FIXME There may be some events that we're not receiving.
330                break
331            },
332
333            ctrl = control_rx.next() => {
334                match ctrl {
335                    Some(Control::Subscribe(session_id, tx)) => {
336                        events.entry(session_id).or_insert_with(Default::default).push(tx);
337                    }
338                    Some(Control::Request(id, request, result)) => {
339                        channel.send(request).await?;
340                        waiters.insert(id, result);
341                    }
342                    None => break,
343                }
344            },
345
346            msg = channel.next().fuse() => {
347                match msg {
348                    Some(Ok(msg)) => {
349                        let msg = serde_json::from_value(msg)?;
350                        match msg {
351                            model::Response::Event(session_id, evt) => {
352                                for tx in &mut *events.entry(session_id).or_default() {
353                                    tx.unbounded_send(evt.clone()).ok(); // TODO remove
354                                }
355                            }
356
357                            model::Response::Return(_, id, v) => {
358                                if let Some(tx) = waiters.remove(&id) {
359                                    tx.send(Ok(v)).unwrap();
360                                }
361                            }
362
363                            model::Response::Error(_, id, err) => {
364                                if let Some(tx) = waiters.remove(&id) {
365                                    tx.send(Err(err)).unwrap();
366                                }
367                            }
368                        }
369                    }
370                    Some(Err(err)) => return Err(err),
371                    None => {}
372                }
373            }
374        }
375    }
376
377    Ok(())
378}
379
380async fn r#loop(
381    mut control_rx: mpsc::UnboundedReceiver<Control>,
382    mut channel: Channel,
383    browser: Option<Arc<Mutex<Browser>>>,
384) -> Result<()> {
385    log::debug!("Begin loop.");
386
387    let result = loop_inner(&mut control_rx, &mut channel, browser.clone()).await;
388
389    if let Some(browser) = browser {
390        log::debug!("send close command.");
391        let close_command = model::browser::CloseCommand::new();
392        let close_command = model::Command::into_request(close_command, None, 0);
393        if let Ok(close_command) = serde_json::to_value(close_command) {
394            channel.send(close_command).await.ok();
395        }
396
397        log::debug!("browser shutdown.");
398        browser.lock().await.close().await;
399    }
400    log::debug!("Loop done.");
401
402    result
403}
404
405#[derive(Debug)]
406enum Control {
407    Subscribe(Option<SessionId>, mpsc::UnboundedSender<model::Event>),
408    Request(
409        u32,
410        Value,
411        oneshot::Sender<std::result::Result<Value, Value>>,
412    ),
413}
414
415/// Message loop.
416pub struct Loop {
417    future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
418}
419
420impl Future for Loop {
421    type Output = Result<()>;
422    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
423        self.future.poll_unpin(cx)
424    }
425}
426
427/// Chrome DevTools Protocol Client.
428///
429/// This serve as browser session.
430#[derive(Debug)]
431pub struct CdpClient {
432    control_tx: mpsc::UnboundedSender<Control>,
433    session: CdpSession,
434}
435
436impl CdpClient {
437    async fn connect_internal(channel: Channel, browser: Option<Browser>) -> (Self, Loop) {
438        let browser = browser.map(|b| Arc::new(Mutex::new(b)));
439
440        let (control_tx, control_rx) = mpsc::unbounded();
441        let task = Loop {
442            future: Box::pin(r#loop(control_rx, channel, browser.clone())),
443        };
444        let session = CdpSession {
445            idgen: Arc::new(AtomicU32::default()),
446            session_id: None,
447            control_tx: control_tx.clone(),
448            browser,
449        };
450        let client = CdpClient {
451            control_tx,
452            session,
453        };
454        (client, task)
455    }
456
457    /// Connect with CDP Websocket.
458    pub async fn connect(url: &Url) -> Result<(Self, impl Future<Output = Result<()>>)> {
459        Self::connect_ws(url, None).await
460    }
461
462    async fn connect_ws(url: &Url, browser: Option<Browser>) -> Result<(Self, Loop)> {
463        let (ws, _) = tokio_tungstenite::connect_async(url).await?;
464        let channel = Channel::Ws(ws.fuse());
465        Ok(Self::connect_internal(channel, browser).await)
466    }
467
468    async fn connect_pipe(browser: Browser, channel: pipe::PipeChannel) -> Result<(Self, Loop)> {
469        let channel = Channel::Pipe(channel);
470        Ok(Self::connect_internal(channel, Some(browser)).await)
471    }
472
473    /// Construct session with target.
474    pub fn session<S: Deref<Target = model::target::SessionId>>(
475        &mut self,
476        session_id: S,
477    ) -> CdpSession {
478        let session_id = Some(SessionId::from(session_id.as_ref()));
479        CdpSession {
480            idgen: self.idgen.clone(),
481            session_id,
482            control_tx: self.control_tx.clone(),
483            browser: self.browser.clone(),
484        }
485    }
486}
487
488impl Deref for CdpClient {
489    type Target = CdpSession;
490    fn deref(&self) -> &Self::Target {
491        &self.session
492    }
493}
494
495impl DerefMut for CdpClient {
496    fn deref_mut(&mut self) -> &mut Self::Target {
497        &mut self.session
498    }
499}