1#![doc(html_root_url = "https://docs.rs/chrome-remote-interface/0.1.0-alpha.8")]
2use std::collections::HashMap;
60use std::future::Future;
61use std::io;
62use std::marker::PhantomData;
63use std::ops::{Deref, DerefMut};
64use std::pin::Pin;
65use std::sync::atomic::{AtomicU32, Ordering};
66use std::sync::Arc;
67use std::task::{Context, Poll};
68
69use futures::channel::mpsc;
70use futures::channel::oneshot;
71use futures::future::FutureExt;
72use futures::ready;
73use futures::select;
74use futures::sink::{Sink, SinkExt};
75use futures::stream::{Fuse, Stream, StreamExt};
76use serde::Deserialize;
77use serde_json::Value;
78use tokio::net::TcpStream;
79use tokio::sync::Mutex;
80use tokio_tungstenite::tungstenite::protocol::Message;
81use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
82use url::Url;
83
84pub use browser::*;
85pub use chrome_remote_interface_model as model;
86use model::SessionId;
87
88pub const BROWSER_BIN: &str = "CRI_CHROME_BIN";
90
91macro_rules! recv {
92 ($len:expr, $content:expr) => {
93 log::trace!(target: "chrome_remote_interface::protocol", "<< [{} bytes] {}", $len, $content);
94 }
95}
96
97macro_rules! send {
98 ($len:expr, $content:expr) => {
99 log::trace!(target: "chrome_remote_interface::protocol", ">> [{} bytes] {}", $len, $content);
100 }
101}
102
103mod browser;
104pub(crate) mod os;
105mod pipe;
106pub(crate) mod process;
107
108#[derive(Debug, thiserror::Error)]
110pub enum Error {
111 #[error(transparent)]
113 Io(#[from] io::Error),
114
115 #[error(transparent)]
117 WsRequest(#[from] tokio_tungstenite::tungstenite::Error),
118
119 #[error(transparent)]
121 Json(#[from] serde_json::Error),
122
123 #[error("error response {0:?}")]
125 Response(serde_json::Value),
126
127 #[error("loop canceled")]
129 LoopCanceled(#[from] oneshot::Canceled),
130
131 #[error("loop aborted")]
133 LoopAborted(#[from] mpsc::SendError),
134
135 #[error(transparent)]
137 Browser(#[from] BrowserError),
138}
139
140impl<T> From<mpsc::TrySendError<T>> for Error {
141 fn from(v: mpsc::TrySendError<T>) -> Self {
142 Self::LoopAborted(v.into_send_error())
143 }
144}
145
146pub type Result<T> = std::result::Result<T, Error>;
148
149enum Channel {
150 Ws(Fuse<WebSocketStream<MaybeTlsStream<TcpStream>>>),
151 Pipe(pipe::PipeChannel),
152}
153
154impl Stream for Channel {
155 type Item = Result<Value>;
156
157 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
158 match self.get_mut() {
159 Self::Ws(ws) => loop {
160 match ready!(ws.poll_next_unpin(cx)?) {
161 Some(Message::Text(m)) => {
162 recv!(m.bytes().len(), m);
163 return Poll::Ready(Some(Ok(serde_json::from_str(&m)?)));
164 }
165 Some(Message::Binary(m)) => {
166 recv!(m.len(), String::from_utf8_lossy(&m));
167 return Poll::Ready(Some(Ok(serde_json::from_slice(&m)?)));
168 }
169 Some(..) => {}
170 None => return Poll::Ready(None),
171 }
172 },
173
174 Self::Pipe(inner) => inner.poll_next_unpin(cx),
175 }
176 }
177}
178
179impl Sink<Value> for Channel {
180 type Error = Error;
181
182 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
183 match self.get_mut() {
184 Self::Ws(ws) => {
185 ready!(ws.poll_ready_unpin(cx)?);
186 Poll::Ready(Ok(()))
187 }
188
189 Self::Pipe(inner) => inner.poll_ready_unpin(cx),
190 }
191 }
192
193 fn start_send(self: Pin<&mut Self>, item: Value) -> Result<()> {
194 match self.get_mut() {
195 Self::Ws(ws) => {
196 let item = serde_json::to_string(&item)?;
197 send!(item.bytes().len(), &item);
198 ws.start_send_unpin(Message::Text(item))?;
199 Ok(())
200 }
201
202 Self::Pipe(inner) => inner.start_send_unpin(item),
203 }
204 }
205
206 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
207 match self.get_mut() {
208 Self::Ws(ws) => {
209 ready!(ws.poll_flush_unpin(cx)?);
210 Poll::Ready(Ok(()))
211 }
212
213 Self::Pipe(inner) => inner.poll_flush_unpin(cx),
214 }
215 }
216
217 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
218 match self.get_mut() {
219 Self::Ws(ws) => {
220 ready!(ws.poll_close_unpin(cx)?);
221 Poll::Ready(Ok(()))
222 }
223
224 Self::Pipe(inner) => inner.poll_close_unpin(cx),
225 }
226 }
227}
228
229#[derive(Debug)]
231pub struct CdpEvents {
232 rx: mpsc::UnboundedReceiver<model::Event>,
233}
234
235impl Stream for CdpEvents {
236 type Item = model::Event;
237
238 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
239 self.rx.poll_next_unpin(cx)
240 }
241}
242
243#[derive(Debug)]
245pub struct Request<R> {
246 tx_result: Option<Result<()>>,
247 rx: oneshot::Receiver<std::result::Result<Value, Value>>,
248 _phantom: PhantomData<R>,
249}
250
251impl<R> Future for Request<R>
252where
253 R: for<'r> Deserialize<'r> + Unpin,
254{
255 type Output = Result<R>;
256
257 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
258 let this = self.get_mut();
259 if let Some(r) = this.tx_result.take() {
260 r?;
261 }
262 match ready!(this.rx.poll_unpin(cx))? {
263 Ok(v) => Poll::Ready(Ok(serde_json::from_value(v)?)),
264 Err(err) => Poll::Ready(Err(Error::Response(err))),
265 }
266 }
267}
268
269#[derive(Debug, Clone)]
271pub struct CdpSession {
272 idgen: Arc<AtomicU32>,
273 session_id: Option<SessionId>,
274 control_tx: mpsc::UnboundedSender<Control>,
275 browser: Option<Arc<Mutex<Browser>>>,
276}
277
278impl CdpSession {
279 pub fn request<C>(&mut self, command: C) -> Request<C::Return>
281 where
282 C: model::Command,
283 {
284 let id = self.idgen.fetch_add(1, Ordering::SeqCst);
285 let request = command.into_request(self.session_id.clone(), id);
286 let request = serde_json::to_value(&request).unwrap(); let (tx, rx) = oneshot::channel();
288 let tx_result = self
289 .control_tx
290 .unbounded_send(Control::Request(id, request, tx));
291
292 Request {
293 tx_result: Some(tx_result.map_err(Into::into)),
294 rx,
295 _phantom: PhantomData,
296 }
297 }
298
299 pub fn events(&mut self) -> Result<CdpEvents> {
301 let (tx, rx) = mpsc::unbounded();
302 self.control_tx
303 .unbounded_send(Control::Subscribe(self.session_id.clone(), tx))?;
304 Ok(CdpEvents { rx })
305 }
306}
307
308async fn wait(browser: &Option<Arc<Mutex<Browser>>>) -> io::Result<()> {
309 if let Some(browser) = browser {
310 let mut browser = browser.lock().await;
311 browser.wait().await
312 } else {
313 std::future::pending::<()>().await;
314 Ok(())
315 }
316}
317
318async fn loop_inner(
319 control_rx: &mut mpsc::UnboundedReceiver<Control>,
320 channel: &mut Channel,
321 browser: Option<Arc<Mutex<Browser>>>,
322) -> Result<()> {
323 let mut waiters = HashMap::<u32, oneshot::Sender<std::result::Result<Value, Value>>>::new();
324 let mut events = HashMap::<Option<SessionId>, Vec<mpsc::UnboundedSender<model::Event>>>::new();
325
326 loop {
327 select! {
328 _ = wait(&browser).fuse() => {
329 break
331 },
332
333 ctrl = control_rx.next() => {
334 match ctrl {
335 Some(Control::Subscribe(session_id, tx)) => {
336 events.entry(session_id).or_insert_with(Default::default).push(tx);
337 }
338 Some(Control::Request(id, request, result)) => {
339 channel.send(request).await?;
340 waiters.insert(id, result);
341 }
342 None => break,
343 }
344 },
345
346 msg = channel.next().fuse() => {
347 match msg {
348 Some(Ok(msg)) => {
349 let msg = serde_json::from_value(msg)?;
350 match msg {
351 model::Response::Event(session_id, evt) => {
352 for tx in &mut *events.entry(session_id).or_default() {
353 tx.unbounded_send(evt.clone()).ok(); }
355 }
356
357 model::Response::Return(_, id, v) => {
358 if let Some(tx) = waiters.remove(&id) {
359 tx.send(Ok(v)).unwrap();
360 }
361 }
362
363 model::Response::Error(_, id, err) => {
364 if let Some(tx) = waiters.remove(&id) {
365 tx.send(Err(err)).unwrap();
366 }
367 }
368 }
369 }
370 Some(Err(err)) => return Err(err),
371 None => {}
372 }
373 }
374 }
375 }
376
377 Ok(())
378}
379
380async fn r#loop(
381 mut control_rx: mpsc::UnboundedReceiver<Control>,
382 mut channel: Channel,
383 browser: Option<Arc<Mutex<Browser>>>,
384) -> Result<()> {
385 log::debug!("Begin loop.");
386
387 let result = loop_inner(&mut control_rx, &mut channel, browser.clone()).await;
388
389 if let Some(browser) = browser {
390 log::debug!("send close command.");
391 let close_command = model::browser::CloseCommand::new();
392 let close_command = model::Command::into_request(close_command, None, 0);
393 if let Ok(close_command) = serde_json::to_value(close_command) {
394 channel.send(close_command).await.ok();
395 }
396
397 log::debug!("browser shutdown.");
398 browser.lock().await.close().await;
399 }
400 log::debug!("Loop done.");
401
402 result
403}
404
405#[derive(Debug)]
406enum Control {
407 Subscribe(Option<SessionId>, mpsc::UnboundedSender<model::Event>),
408 Request(
409 u32,
410 Value,
411 oneshot::Sender<std::result::Result<Value, Value>>,
412 ),
413}
414
415pub struct Loop {
417 future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
418}
419
420impl Future for Loop {
421 type Output = Result<()>;
422 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
423 self.future.poll_unpin(cx)
424 }
425}
426
427#[derive(Debug)]
431pub struct CdpClient {
432 control_tx: mpsc::UnboundedSender<Control>,
433 session: CdpSession,
434}
435
436impl CdpClient {
437 async fn connect_internal(channel: Channel, browser: Option<Browser>) -> (Self, Loop) {
438 let browser = browser.map(|b| Arc::new(Mutex::new(b)));
439
440 let (control_tx, control_rx) = mpsc::unbounded();
441 let task = Loop {
442 future: Box::pin(r#loop(control_rx, channel, browser.clone())),
443 };
444 let session = CdpSession {
445 idgen: Arc::new(AtomicU32::default()),
446 session_id: None,
447 control_tx: control_tx.clone(),
448 browser,
449 };
450 let client = CdpClient {
451 control_tx,
452 session,
453 };
454 (client, task)
455 }
456
457 pub async fn connect(url: &Url) -> Result<(Self, impl Future<Output = Result<()>>)> {
459 Self::connect_ws(url, None).await
460 }
461
462 async fn connect_ws(url: &Url, browser: Option<Browser>) -> Result<(Self, Loop)> {
463 let (ws, _) = tokio_tungstenite::connect_async(url).await?;
464 let channel = Channel::Ws(ws.fuse());
465 Ok(Self::connect_internal(channel, browser).await)
466 }
467
468 async fn connect_pipe(browser: Browser, channel: pipe::PipeChannel) -> Result<(Self, Loop)> {
469 let channel = Channel::Pipe(channel);
470 Ok(Self::connect_internal(channel, Some(browser)).await)
471 }
472
473 pub fn session<S: Deref<Target = model::target::SessionId>>(
475 &mut self,
476 session_id: S,
477 ) -> CdpSession {
478 let session_id = Some(SessionId::from(session_id.as_ref()));
479 CdpSession {
480 idgen: self.idgen.clone(),
481 session_id,
482 control_tx: self.control_tx.clone(),
483 browser: self.browser.clone(),
484 }
485 }
486}
487
488impl Deref for CdpClient {
489 type Target = CdpSession;
490 fn deref(&self) -> &Self::Target {
491 &self.session
492 }
493}
494
495impl DerefMut for CdpClient {
496 fn deref_mut(&mut self) -> &mut Self::Target {
497 &mut self.session
498 }
499}