playwright 0.0.20

Playwright port to Rust
Documentation
use crate::imp::{core::*, prelude::*};
pub(crate) use tokio::sync::{broadcast, broadcast::error::TryRecvError};

pub trait EventEmitter {
    type Event: Clone;

    fn tx(&self) -> Option<broadcast::Sender<Self::Event>>;

    fn set_tx(&self, tx: broadcast::Sender<Self::Event>);

    fn new_tx(
        &self
    ) -> (
        broadcast::Sender<Self::Event>,
        broadcast::Receiver<Self::Event>
    ) {
        broadcast::channel(64)
    }

    fn subscribe_event(&self) -> broadcast::Receiver<Self::Event> {
        if let Some(tx) = self.tx() {
            tx.subscribe()
        } else {
            let (tx, rx) = self.new_tx();
            self.set_tx(tx);
            rx
        }
    }

    fn emit_event<E: Into<Self::Event>>(&self, e: E) { self.tx().map(|tx| tx.send(e.into()).ok()); }
}

pub(crate) trait IsEvent: Clone {
    type EventType: Clone + Copy + PartialEq;

    fn event_type(&self) -> Self::EventType;
}

#[cfg(any(feature = "rt-tokio", feature = "rt-actix"))]
pub(crate) async fn expect_event<E>(
    mut rx: broadcast::Receiver<E>,
    evt: E::EventType,
    timeout: u32
) -> Result<E, Error>
where
    E: IsEvent + Send + Sync + 'static,
    <E as event_emitter::IsEvent>::EventType: Send + Sync
{
    consume(&mut rx).await?;
    let sleep = sleep(Duration::from_millis(timeout as u64));
    let event = spawn(async move {
        loop {
            match rx.recv().await {
                Ok(x) if x.event_type() == evt => break Ok(x),
                Ok(_) => continue,
                Err(e) => break Err(e)
            }
        }
    });
    tokio::select! {
        _ = sleep => Err(Error::Timeout),
        x = event => x?.map_err(Error::Event)
    }
}

#[cfg(feature = "rt-async-std")]
pub(crate) async fn expect_event<E>(
    mut rx: broadcast::Receiver<E>,
    evt: E::EventType,
    timeout: u32
) -> Result<E, Error>
where
    E: IsEvent + Send + Sync + 'static,
    <E as event_emitter::IsEvent>::EventType: Send + Sync
{
    consume(&mut rx).await?;
    let sleep = sleep(Duration::from_millis(timeout as u64));
    let event = spawn(async move {
        loop {
            match rx.recv().await {
                Ok(x) if x.event_type() == evt => break Ok(x),
                Ok(_) => continue,
                Err(e) => break Err(e)
            }
        }
    });
    tokio::select! {
        _ = sleep => Err(Error::Timeout),
        x = event => x.map_err(Error::Event)
    }
}

async fn consume<E>(rx: &mut broadcast::Receiver<E>) -> Result<(), Error>
where
    E: IsEvent
{
    loop {
        match rx.try_recv() {
            Err(TryRecvError::Empty) | Err(TryRecvError::Closed) => break,
            _ => {}
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    crate::runtime_test!(select, {
        use crate::imp::prelude::*;
        let first = sleep(Duration::from_millis(200u64));
        let second = sleep(Duration::from_millis(400u64));
        tokio::select! {
            _ = first => {},
            _ = second => unreachable!()
        }
    });
}