mysql_async 0.36.2

Tokio based asynchronous MySql client library.
Documentation
#![cfg(test)]

use futures_util::TryStreamExt;

use crate::{from_row, prelude::*, test_misc::get_opts, Conn, Row, TxOpts};

#[tokio::test]
async fn should_stream_text_result_sets() -> crate::Result<()> {
    const QUERY: &str = r"
        SELECT 1;
        SELECT 'foo' UNION ALL SELECT 'bar';
        SELECT 3, 4 UNION ALL SELECT 5, 6;
        SELECT FOO();
        SELECT 3.14;
    ";
    let mut conn = Conn::new(get_opts()).await?;
    let mut result = QUERY.run(&mut conn).await?;

    let stream = result.stream::<u8>().await?.unwrap();
    assert_eq!(vec![1], stream.try_collect::<Vec<_>>().await?);

    let _ = result.stream::<Row>().await?.unwrap();

    let stream = result.stream::<(u8, u8)>().await?.unwrap();
    assert_eq!(vec![(3, 4), (5, 6)], stream.try_collect::<Vec<_>>().await?);

    let _ = result.stream::<Row>().await.unwrap_err();
    assert!(result.stream::<Row>().await?.is_none());

    let mut conn = Conn::new(get_opts()).await?;
    let mut result = QUERY.run(&mut conn).await?;

    assert_eq!(vec![1], result.collect::<u8>().await?);

    assert_eq!("foo", from_row::<String>(result.next().await?.unwrap()));
    let _ = result.stream::<Row>().await?.unwrap();

    assert_eq!((3_u8, 4_u8), from_row(result.next().await?.unwrap()));
    assert_eq!(vec![(5, 6)], result.collect::<(u8, u8)>().await?);

    result.collect::<Row>().await.unwrap_err();
    assert!(result.stream::<Row>().await?.is_none());

    conn.disconnect().await?;
    Ok(())
}

#[tokio::test]
async fn should_stream_binary_result_sets() -> crate::Result<()> {
    let mut conn = Conn::new(get_opts()).await?;
    let mut result = "SELECT ?".with((1_u8,)).run(&mut conn).await?;
    assert_eq!(vec![1], result.collect::<u8>().await?);
    assert!(result.stream::<Row>().await?.is_none());

    let mut result = "SELECT ?".with((1_u8,)).run(&mut conn).await?;
    assert_eq!(
        vec![1],
        result
            .stream::<u8>()
            .await?
            .unwrap()
            .try_collect::<Vec<_>>()
            .await?,
    );
    assert_eq!(result.collect::<u8>().await?, Vec::<u8>::new());

    conn.disconnect().await?;
    Ok(())
}

#[tokio::test]
async fn dropped_query_result_should_emit_errors_on_cleanup() -> super::Result<()> {
    use crate::{Error::Server, ServerError};
    let mut conn = Conn::new(get_opts()).await?;
    conn.query_iter("SELECT '1'; BLABLA;").await?;
    assert!(matches!(
        conn.query_drop("DO 42;").await.unwrap_err(),
        Server(ServerError { code: 1064, .. })
    ));
    conn.disconnect().await?;
    Ok(())
}

#[tokio::test]
async fn should_try_collect() -> super::Result<()> {
    let mut conn = Conn::new(get_opts()).await?;
    let mut result = conn
        .query_iter(
            r"SELECT 'hello', 123
                UNION ALL
                SELECT 'world', 'bar'
                UNION ALL
                SELECT 'hello', 123
            ",
        )
        .await?;
    let mut rows = result.try_collect::<(String, u8)>().await?;
    assert!(rows.pop().unwrap().is_ok());
    assert!(rows.pop().unwrap().is_err());
    assert!(rows.pop().unwrap().is_ok());
    result.drop_result().await?;
    conn.disconnect().await?;
    Ok(())
}

#[tokio::test]
async fn should_try_collect_and_drop() -> super::Result<()> {
    let mut conn = Conn::new(get_opts()).await?;
    let mut rows = conn
        .query_iter(
            r"SELECT 'hello', 123
                UNION ALL
                SELECT 'world', 'bar'
                UNION ALL
                SELECT 'hello', 123;
                SELECT 'foo', 255;
            ",
        )
        .await?
        .try_collect_and_drop::<(String, u8)>()
        .await?;
    assert!(rows.pop().unwrap().is_ok());
    assert!(rows.pop().unwrap().is_err());
    assert!(rows.pop().unwrap().is_ok());
    conn.disconnect().await?;
    Ok(())
}

#[tokio::test]
async fn should_handle_mutliresult_set() -> super::Result<()> {
    let mut conn = Conn::new(get_opts()).await?;
    let mut result = conn
        .query_iter(
            r"SELECT 'hello', 123
                UNION ALL
                SELECT 'world', 231;
                SELECT 'foo', 255;
            ",
        )
        .await?;
    let rows_1 = result.collect::<(String, u8)>().await?;
    let rows_2 = result.collect_and_drop().await?;
    conn.disconnect().await?;

    assert_eq!((String::from("hello"), 123), rows_1[0]);
    assert_eq!((String::from("world"), 231), rows_1[1]);
    assert_eq!((String::from("foo"), 255), rows_2[0]);
    Ok(())
}

#[tokio::test]
async fn should_map_resultset() -> super::Result<()> {
    let mut conn = Conn::new(get_opts()).await?;
    let mut result = conn
        .query_iter(
            r"
                SELECT 'hello', 123
                UNION ALL
                SELECT 'world', 231;
                SELECT 'foo', 255;
            ",
        )
        .await?;

    let rows_1 = result.map(from_row::<(String, u8)>).await?;
    let rows_2 = result.map_and_drop(from_row).await?;
    conn.disconnect().await?;

    assert_eq!((String::from("hello"), 123), rows_1[0]);
    assert_eq!((String::from("world"), 231), rows_1[1]);
    assert_eq!((String::from("foo"), 255), rows_2[0]);
    Ok(())
}

#[tokio::test]
async fn should_reduce_resultset() -> super::Result<()> {
    let mut conn = Conn::new(get_opts()).await?;
    let mut result = conn
        .query_iter(
            r"SELECT 5
                UNION ALL
                SELECT 6;
                SELECT 7;",
        )
        .await?;
    let reduced = result
        .reduce(0, |mut acc, row| {
            acc += from_row::<i32>(row);
            acc
        })
        .await?;
    let rows_2 = result.collect_and_drop::<i32>().await?;
    conn.disconnect().await?;
    assert_eq!(11, reduced);
    assert_eq!(7, rows_2[0]);
    Ok(())
}

#[tokio::test]
async fn should_handle_multi_result_sets_where_some_results_have_no_output() -> super::Result<()> {
    const QUERY: &str = r"SELECT 1;
        UPDATE time_zone SET Time_zone_id = 1 WHERE Time_zone_id = 1;
        SELECT 2;
        SELECT 3;
        UPDATE time_zone SET Time_zone_id = 1 WHERE Time_zone_id = 1;
        UPDATE time_zone SET Time_zone_id = 1 WHERE Time_zone_id = 1;
        SELECT 4;";

    let mut c = Conn::new(get_opts()).await?;
    c.query_drop("CREATE TEMPORARY TABLE time_zone (Time_zone_id INT)")
        .await
        .unwrap();
    let mut t = c.start_transaction(TxOpts::new()).await?;
    t.query_drop(QUERY).await?;
    let r = t.query_iter(QUERY).await?;
    let out = r.collect_and_drop::<u8>().await?;
    assert_eq!(vec![1], out);
    let r = t.query_iter(QUERY).await?;
    r.for_each_and_drop(|x| assert_eq!(from_row::<u8>(x), 1))
        .await?;
    let r = t.query_iter(QUERY).await?;
    let out = r.map_and_drop(from_row::<u8>).await?;
    assert_eq!(vec![1], out);
    let r = t.query_iter(QUERY).await?;
    let out = r
        .reduce_and_drop(0u8, |acc, x| acc + from_row::<u8>(x))
        .await?;
    assert_eq!(1, out);
    t.query_drop(QUERY).await?;
    t.commit().await?;
    let result = c.exec_first("SELECT 1", ()).await?;
    c.disconnect().await?;
    assert_eq!(result, Some(1_u8));
    Ok(())
}

#[tokio::test]
async fn should_iterate_over_resultset() -> super::Result<()> {
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    let acc = Arc::new(AtomicUsize::new(0));

    let mut conn = Conn::new(get_opts()).await?;
    let mut result = conn
        .query_iter(
            r"SELECT 2
                UNION ALL
                SELECT 3;
                SELECT 5;",
        )
        .await?;
    result
        .for_each({
            let acc = acc.clone();
            move |row| {
                acc.fetch_add(from_row::<usize>(row), Ordering::SeqCst);
            }
        })
        .await?;
    result
        .for_each_and_drop({
            let acc = acc.clone();
            move |row| {
                acc.fetch_add(from_row::<usize>(row), Ordering::SeqCst);
            }
        })
        .await?;
    conn.disconnect().await?;
    assert_eq!(acc.load(Ordering::SeqCst), 10);
    Ok(())
}