fuel-core 0.15.3

Fuel client.
Documentation
use std::sync::{
    atomic,
    atomic::AtomicBool,
    Arc,
};

use super::*;
use fuel_core_interfaces::{
    common::tai64::Tai64,
    txpool::TransactionStatus,
};
use test_case::test_case;

/// One parameterised scenario for `create_tx_status_change_stream`.
///
/// `I` is the iterator type that scripts the results returned by the mocked
/// `get_tx_status` call.
struct Input<I>
where
    I: Iterator<Item = anyhow::Result<Option<TransactionStatus>>> + Send + 'static,
{
    /// Transaction id handed to `transaction_status_change` as the id to follow.
    requested_id: Bytes32,
    /// Items fed, in order, into the update stream given to
    /// `transaction_status_change`; the test appends a sentinel update after them.
    status_updates: Vec<Result<TxUpdate, BroadcastStreamRecvError>>,
    /// Scripted results for the mocked `get_tx_status`: each call consumes the
    /// next item, and the test panics if the iterator is already exhausted.
    db_statuses: I,
}

/// Outcome of a scenario, as classified by `create_tx_status_change_stream`.
#[derive(Debug, PartialEq, Eq)]
enum Expected {
    /// The status stream finished without error and the sentinel update appended
    /// by the test was never produced; carries the statuses that were yielded.
    Received(Vec<TransactionStatus>),
    /// The status stream finished without error, but only after the sentinel
    /// update appended by the test was produced; carries the statuses yielded.
    TimedOut(Vec<TransactionStatus>),
    /// Collecting the status stream returned an error.
    Error,
}

fn txn_id(i: u8) -> Bytes32 {
    [i; 32].into()
}

/// Shorthand for `TxUpdate::updated` on the given transaction id.
fn txn_updated(tx_id: Bytes32) -> TxUpdate {
    TxUpdate::updated(tx_id)
}

fn txn_squeezed(tx_id: Bytes32) -> TxUpdate {
    TxUpdate::squeezed_out(tx_id, fuel_txpool::Error::SqueezedOut(String::new()))
}

fn db_always_some(
    mut f: impl FnMut() -> TransactionStatus,
) -> impl Iterator<Item = anyhow::Result<Option<TransactionStatus>>> {
    std::iter::repeat_with(move || Ok(Some(f())))
}

/// Endless scripted database in which every item is `Ok(None)`.
fn db_always_none() -> impl Iterator<Item = anyhow::Result<Option<TransactionStatus>>> {
    std::iter::repeat_with(|| None).map(Ok)
}

fn db_always_error(
    mut f: impl FnMut() -> anyhow::Error,
) -> impl Iterator<Item = anyhow::Result<Option<TransactionStatus>>> {
    std::iter::repeat_with(move || Err(f()))
}

/// Finite scripted database: yields each status in `f`, in order, wrapped as
/// `Ok(Some(status))`, and then ends.
fn db_some(
    f: Vec<TransactionStatus>,
) -> impl Iterator<Item = anyhow::Result<Option<TransactionStatus>>> {
    f.into_iter().map(Some).map(Ok)
}

/// Finite scripted database: yields exactly `n` items of `Ok(None)` and then ends.
fn db_none(n: usize) -> impl Iterator<Item = anyhow::Result<Option<TransactionStatus>>> {
    std::iter::repeat_with(|| Ok(None)).take(n)
}

/// Finite scripted database: yields each error in `f`, in order, wrapped as
/// `Err(error)`, and then ends.
fn db_error(
    f: Vec<anyhow::Error>,
) -> impl Iterator<Item = anyhow::Result<Option<TransactionStatus>>> {
    f.into_iter().map(Err)
}

/// A `Submitted` status with its timestamp fixed at `Tai64(0)`.
fn submitted() -> TransactionStatus {
    let time = Tai64(0);
    TransactionStatus::Submitted { time }
}

/// A `Success` status with a default block id, timestamp `Tai64(0)` and no
/// result attached.
fn success() -> TransactionStatus {
    let block_id = Default::default();
    let time = Tai64(0);
    TransactionStatus::Success {
        block_id,
        time,
        result: None,
    }
}

/// A `Failed` status with a default block id, timestamp `Tai64(0)`, no result
/// attached and a default reason.
fn failed() -> TransactionStatus {
    let block_id = Default::default();
    let time = Tai64(0);
    let reason = Default::default();
    TransactionStatus::Failed {
        block_id,
        time,
        result: None,
        reason,
    }
}

fn squeezed() -> TransactionStatus {
    TransactionStatus::SqueezedOut {
        reason: fuel_txpool::Error::SqueezedOut(String::new()).to_string(),
    }
}

// Drives `transaction_status_change` through one scripted scenario and
// classifies the outcome.
//
// The update stream consists of `input.status_updates` followed by a sentinel
// update for `txn_id(255)`. Producing the sentinel sets a flag, so a successful
// run whose flag is set is reported as `Expected::TimedOut` and any other
// successful run as `Expected::Received`; an error while collecting the status
// stream is reported as `Expected::Error`. Database reads are served by a mock
// that hands out the items of `input.db_statuses` one call at a time.
#[test_case(
    Input {
        requested_id: txn_id(2),
        status_updates: vec![],
        db_statuses: db_always_none(),
    }
    => Expected::TimedOut(vec![])
    ; "no status update, no db status, times out"
)]
#[test_case(
    Input {
        requested_id: txn_id(2),
        status_updates: vec![Ok(txn_updated(txn_id(3)))],
        db_statuses: db_always_none(),
    }
    => Expected::TimedOut(vec![])
    ; "unrelated status update, no db status, times out"
)]
#[test_case(
    Input {
        requested_id: txn_id(2),
        status_updates: vec![Ok(txn_updated(txn_id(2)))],
        db_statuses: db_always_none(),
    }
    => Expected::Received(vec![])
    ; "status update, no db status, receives no status"
)]
#[test_case(
    Input {
        requested_id: txn_id(2),
        status_updates: vec![Ok(txn_updated(txn_id(2)))],
        db_statuses: db_none(1).chain(db_always_some(submitted)),
    }
    => Expected::TimedOut(vec![submitted()])
    ; "submitted update, submitted db status, times out with one submitted status"
)]
#[test_case(
    Input {
        requested_id: txn_id(2),
        status_updates: vec![Ok(txn_updated(txn_id(2)))],
        db_statuses: db_none(1).chain(db_always_error(|| anyhow::anyhow!("db failed"))),
    }
    => Expected::Error
    ; "status update, none then error db status, gets error"
)]
#[test_case(
    Input {
        requested_id: txn_id(2),
        status_updates: vec![],
        db_statuses: db_error(vec![anyhow::anyhow!("db failed")]),
    }
    => Expected::Error
    ; "no status update, error db status, gets error"
)]
#[test_case(
    Input {
        requested_id: txn_id(2),
        status_updates: vec![Err(BroadcastStreamRecvError::Lagged(1))],
        db_statuses: db_always_none(),
    }
    => Expected::TimedOut(vec![])
    ; "lagged status update, no db status, times out with no results"
)]
#[test_case(
    Input {
        requested_id: txn_id(2),
        status_updates: vec![Ok(txn_updated(txn_id(2))), Ok(txn_updated(txn_id(2)))],
        db_statuses: db_none(1).chain(db_some(vec![submitted()])).chain(db_always_some(success)),
    }
    => Expected::Received(vec![submitted(), success()])
    ; "updated then updated status, submitted then success db status, received submitted then success status"
)]
#[test_case(
    Input {
        requested_id: txn_id(2),
        status_updates: vec![Ok(txn_updated(txn_id(20))), Ok(txn_updated(txn_id(8)))],
        db_statuses: db_none(1),
    }
    => Expected::TimedOut(vec![])
    ; "unrelated status updates, no db status, times out with no status"
)]
#[test_case(
    Input {
        requested_id: txn_id(2),
        status_updates: vec![Ok(txn_updated(txn_id(2))), Ok(txn_updated(txn_id(2)))],
        db_statuses: db_none(1).chain(db_some(vec![submitted()])).chain(db_always_some(failed)),
    }
    => Expected::Received(vec![submitted(), failed()])
    ; "updated then updated status, submitted then failed db status, received submitted then failed status"
)]
#[test_case(
    Input {
        requested_id: txn_id(2),
        status_updates: vec![],
        db_statuses: db_some(vec![success()]),
    }
    => Expected::Received(vec![success()])
    ; "no status update, success db status, received success status"
)]
#[test_case(
    Input {
        requested_id: txn_id(2),
        status_updates: vec![],
        db_statuses: db_some(vec![failed()]),
    }
    => Expected::Received(vec![failed()])
    ; "no status update, failed db status, received failed status"
)]
#[test_case(
    Input {
        requested_id: txn_id(2),
        status_updates: vec![Ok(txn_squeezed(txn_id(2)))],
        db_statuses: db_none(1),
    }
    => Expected::Received(vec![squeezed()])
    ; "squeezed status updates, no db status, received squeezed status"
)]
#[test_case(
    Input {
        requested_id: txn_id(2),
        status_updates: vec![Ok(txn_updated(txn_id(2))), Ok(txn_squeezed(txn_id(2)))],
        db_statuses: db_none(1).chain(db_some(vec![submitted()])).chain(db_none(1)),
    }
    => Expected::Received(vec![submitted(), squeezed()])
    ; "updated then squeezed status, db is set to submitted then nothing, received submitted squeezed"
)]
#[tokio::test]
async fn create_tx_status_change_stream<I>(input: Input<I>) -> Expected
where
    I: Iterator<Item = anyhow::Result<Option<TransactionStatus>>> + Send + 'static,
{
    let Input {
        requested_id,
        status_updates,
        mut db_statuses,
    } = input;

    // Each `get_tx_status` call on the mock consumes the next scripted result.
    // Running out of scripted results means the scenario is under-specified, so
    // fail loudly with a message that points at the test data.
    let mut state = MockTxnStatusChangeState::new();
    state.expect_get_tx_status().returning(move |_| {
        db_statuses
            .next()
            .expect("scenario scripted fewer db statuses than get_tx_status was called")
            .map(|status| status.map(|status| status.into()))
    });
    let state = Box::new(state);

    // The sentinel future flips this flag when it runs, which is how the test
    // tells "stream ended by itself" apart from "all scripted updates were used".
    let reached_end = Arc::new(AtomicBool::new(false));
    let sentinel_flag = Arc::clone(&reached_end);

    // `status_updates` is already an owned `Vec`, so it is moved into the stream
    // directly instead of being cloned first.
    let updates = futures::stream::iter(status_updates)
        .chain(futures::stream::once(async move {
            sentinel_flag.store(true, atomic::Ordering::SeqCst);
            Ok(txn_updated(txn_id(255)))
        }))
        .boxed();

    let stream = transaction_status_change(state, updates, requested_id).await;
    let collected: Result<Vec<_>, _> = stream.try_collect().await;

    // Read the flag only after the stream has been fully drained.
    let timed_out = reached_end.load(atomic::Ordering::SeqCst);
    match collected {
        Err(_) => Expected::Error,
        Ok(statuses) => {
            let statuses = statuses.into_iter().map(|status| status.into()).collect();
            if timed_out {
                Expected::TimedOut(statuses)
            } else {
                Expected::Received(statuses)
            }
        }
    }
}