af-core 0.1.8

A core library and async runtime for Rust applications.
Documentation
// Copyright © 2021 Alexandra Frydl
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

use af_core::test::prelude::*;
use af_core::{channel, task};

/// Test the `channel` module.
pub fn test(cx: &mut test::Context) {
  cx.scope("::unbounded", test_unbounded);
  cx.scope("::with_capacity", test_with_capacity);
}

/// Test unbounded channels.
fn test_unbounded(cx: &mut test::Context) {
  test!(cx, "can transmit messages synchronously", {
    let (tx, rx) = channel::unbounded();

    tx.try_send(1)?;
    tx.try_send(2)?;

    let one = rx.try_recv()?;
    let two = rx.try_recv()?;

    fail::when!(one.is_none());
    fail::when!(two.is_none());
    fail::when!(one != 1);
    fail::when!(two != 2);
  });

  test!(cx, "can transmit messages asynchronously", {
    let (tx, rx) = channel::unbounded();

    tx.send(1usize).await?;
    tx.send(2).await?;

    let one = rx.recv().await?;
    let two = rx.recv().await?;

    fail::when!(one != 1);
    fail::when!(two != 2);
  });

  test!(cx, "is_closed() returns `true` when closed", {
    let (tx, rx) = channel::unbounded::<()>();

    fail::when!(tx.is_closed(), "Closed immediately.");
    drop(rx);
    fail::when!(!tx.is_closed(), "Dropping the Receiver did not close the channel.");

    let (tx, rx) = channel::unbounded::<()>();

    fail::when!(rx.is_closed(), "Closed immediately.");
    drop(tx);
    fail::when!(!rx.is_closed(), "Dropping the Sender did not close the channel.");
  });

  cx.scope("try_recv()", |cx| {
    test!(cx, "succeeds while open", {
      let (_tx, rx) = channel::unbounded::<()>();

      rx.try_recv()?;
      rx.try_recv()?;
    });

    test!(cx, "fails when closed", {
      let (_, rx) = channel::unbounded::<()>();
      let result = rx.try_recv();

      fail::when!(result.is_ok());
    });

    test!(cx, "returns `None` while empty", {
      let (_tx, rx) = channel::unbounded::<()>();
      let msg = rx.try_recv()?;

      fail::when!(msg.is_some());
    });
  });

  cx.scope("recv()", |cx| {
    test!(cx, "succeeds immediately while open and non-empty", timeout = immediate, {
      let (tx, rx) = channel::unbounded();
      let _ = tx.try_send(());

      rx.recv().await?;
    });

    test!(cx, "fails immediately when closed", timeout = immediate, {
      let (_, rx) = channel::unbounded::<()>();
      let result = rx.recv().await;

      fail::when!(result.is_ok());
    });

    test!(cx, "waits when empty", timeout = "1 s", {
      let (tx, rx) = channel::unbounded::<()>();

      let send = task::start(async move {
        task::sleep(Duration::hz(60)).await;

        tx.try_send(())
      });

      let recv = rx.recv();

      pin!(recv);

      fail::when!(future::poll(&mut recv).is_some(), "Completed immediately.");

      send.join().await??;
      recv.await?;
    });
  });

  cx.scope("try_send()", |cx| {
    test!(cx, "succeeds while open", {
      let (tx, _rx) = channel::unbounded();

      tx.try_send(())?;
      tx.try_send(())?;
    });

    test!(cx, "fails while closed", {
      let (tx, _) = channel::unbounded();

      match tx.try_send(1) {
        Ok(()) => fail!("Succeeded."),
        Err(err) => match err.reason {
          channel::SendErrorReason::Closed => {}
          other => fail!("Unexpected error reason: `{:?}`", other),
        },
      }
    });
  });

  cx.scope("send()", |cx| {
    test!(cx, "succeeds immediately while open", timeout = immediate, {
      let (tx, _rx) = channel::unbounded();

      tx.send(()).await?;
      tx.send(()).await?;
    });

    test!(cx, "fails immediately while closed", timeout = immediate, {
      let (tx, _) = channel::unbounded();

      match tx.send(()).await {
        Ok(()) => fail!("Succeeded."),
        Err(err) => match err.reason {
          channel::SendErrorReason::Closed => {}
          other => fail!("Unexpected error reason: `{:?}`", other),
        },
      }
    });
  });
}

/// Test bounded channels.
fn test_with_capacity(cx: &mut test::Context) {
  test!(cx, "can transmit messages synchronously", {
    let (tx, rx) = channel::with_capacity(2);

    tx.try_send(1)?;
    tx.try_send(2)?;

    let one = rx.try_recv()?;
    let two = rx.try_recv()?;

    fail::when!(one.is_none());
    fail::when!(two.is_none());

    fail::when!(one != 1);
    fail::when!(two != 2);
  });

  test!(cx, "can transmit messages asynchronously", {
    let (tx, rx) = channel::with_capacity(2);

    tx.send(1usize).await?;
    tx.send(2).await?;

    let one = rx.recv().await?;
    let two = rx.recv().await?;

    fail::when!(one != 1);
    fail::when!(two != 2);
  });

  test!(cx, "is_closed() returns `true` when closed", {
    let (tx, rx) = channel::with_capacity::<()>(10);

    fail::when!(tx.is_closed(), "Closed immediately.");
    drop(rx);
    fail::when!(!tx.is_closed(), "Dropping the Receiver did not close the channel.");

    let (tx, rx) = channel::with_capacity::<()>(10);

    fail::when!(rx.is_closed(), "Closed immediately.");
    drop(tx);
    fail::when!(!rx.is_closed(), "Dropping the Sender did not close the channel.");
  });

  cx.scope("try_recv()", |cx| {
    test!(cx, "succeeds while open", {
      let (_tx, rx) = channel::with_capacity::<()>(2);

      rx.try_recv()?;
      rx.try_recv()?;
    });

    test!(cx, "fails when closed", {
      let (_, rx) = channel::with_capacity::<()>(2);
      let result = rx.try_recv();

      fail::when!(result.is_ok());
    });

    test!(cx, "returns `None` while empty", {
      let (_tx, rx) = channel::with_capacity::<()>(2);
      let msg = rx.try_recv()?;

      fail::when!(msg.is_some());
    });
  });

  cx.scope("recv()", |cx| {
    test!(cx, "succeeds immediately while open and non-empty", timeout = immediate, {
      let (tx, rx) = channel::with_capacity(10);
      let _ = tx.try_send(());

      rx.recv().await?;
    });

    test!(cx, "fails immediately when closed", timeout = immediate, {
      let (tx, rx) = channel::with_capacity::<()>(8);

      drop(tx);

      let result = rx.recv().await;

      fail::when!(result.is_ok());
    });

    test!(cx, "waits when empty", timeout = "1 s", {
      let (tx, rx) = channel::with_capacity::<()>(8);

      let send = task::start(async move {
        task::sleep(Duration::hz(60)).await;

        tx.try_send(())
      });

      let recv = rx.recv();

      pin!(recv);

      fail::when!(future::poll(&mut recv).is_some(), "Completed immediately.");

      send.join().await??;
      recv.await?;
    });
  });

  cx.scope("try_send()", |cx| {
    test!(cx, "succeeds while not full", {
      let (tx, _rx) = channel::with_capacity(2);

      tx.try_send(())?;
      tx.try_send(())?;
    });

    test!(cx, "fails while closed", {
      let (tx, _) = channel::with_capacity(13);

      match tx.try_send(()) {
        Ok(()) => fail!("Succeeded."),
        Err(err) => match err.reason {
          channel::SendErrorReason::Closed => {}
          other => fail!("Unexpected error reason: `{:?}`", other),
        },
      }
    });

    test!(cx, "fails while full", {
      let (tx, _rx) = channel::with_capacity(1);

      tx.try_send(())?;

      match tx.try_send(()) {
        Ok(()) => fail!("Succeeded."),
        Err(err) => match err.reason {
          channel::SendErrorReason::Full => {}
          other => fail!("Unexpected error reason: `{:?}`", other),
        },
      }
    });
  });

  cx.scope("send()", |cx| {
    test!(cx, "succeeds immediately while not full", timeout = immediate, {
      let (tx, _rx) = channel::with_capacity(2);

      tx.send(()).await?;
      tx.send(()).await?;
    });

    test!(cx, "fails when closed", timeout = immediate, {
      let (tx, _) = channel::with_capacity(8);

      match tx.send(()).await {
        Ok(()) => fail!("Succeeded."),
        Err(err) => match err.reason {
          channel::SendErrorReason::Closed => {}
          other => fail!("Unexpected error reason: `{:?}`", other),
        },
      }
    });

    test!(cx, "waits while full", timeout = "1 s", {
      let (tx, rx) = channel::with_capacity(1);

      let recv = task::start(async move {
        task::sleep(Duration::hz(60)).await;

        rx.recv().await?;
        rx.recv().await
      });

      tx.send(()).await?;

      let send = tx.send(());

      pin!(send);

      fail::when!(future::poll(&mut send).is_some(), "Sent immediately.");

      send.await?;
      recv.join().await??;
    });
  });
}