fuel-core-txpool 0.20.6

Transaction pool
Documentation
use super::*;
use crate::service::test_helpers::{
    TestContext,
    TestContextBuilder,
};
use fuel_core_services::Service as ServiceTrait;
use fuel_core_types::fuel_tx::UniqueIdentifier;
use std::time::Duration;

#[tokio::test]
async fn test_start_stop() {
    let ctx = TestContext::new().await;

    let service = ctx.service();

    // Double start will return false.
    assert!(service.start().is_err(), "double start should fail");

    let state = service.stop_and_await().await.unwrap();
    assert!(state.stopped());
}

#[tokio::test]
async fn test_find() {
    let ctx = TestContext::new().await;

    let tx1 = Arc::new(ctx.setup_script_tx(10));
    let tx2 = Arc::new(ctx.setup_script_tx(20));
    let tx3 = Arc::new(ctx.setup_script_tx(30));

    let service = ctx.service();

    let out = service.shared.insert(vec![tx1.clone(), tx2.clone()]).await;

    assert_eq!(out.len(), 2, "Should be len 2:{out:?}");
    assert!(out[0].is_ok(), "Tx1 should be OK, got err:{out:?}");
    assert!(out[1].is_ok(), "Tx2 should be OK, got err:{out:?}");
    let out = service.shared.find(vec![
        tx1.id(&ConsensusParameters::DEFAULT.chain_id),
        tx3.id(&ConsensusParameters::DEFAULT.chain_id),
    ]);
    assert_eq!(out.len(), 2, "Should be len 2:{out:?}");
    assert!(out[0].is_some(), "Tx1 should be some:{out:?}");
    let id = out[0].as_ref().unwrap().id();
    assert_eq!(
        id,
        tx1.id(&ConsensusParameters::DEFAULT.chain_id),
        "Found tx id match{out:?}"
    );
    assert!(out[1].is_none(), "Tx3 should not be found:{out:?}");
    service.stop_and_await().await.unwrap();
}

#[tokio::test(start_paused = true)]
async fn test_prune_transactions() {
    const TIMEOUT: u64 = 10;

    let config = Config {
        transaction_ttl: Duration::from_secs(TIMEOUT),
        ..Default::default()
    };
    let ctx = TestContextBuilder::new()
        .with_config(config)
        .build_and_start()
        .await;

    let tx1 = Arc::new(ctx.setup_script_tx(10));
    let tx2 = Arc::new(ctx.setup_script_tx(20));
    let tx3 = Arc::new(ctx.setup_script_tx(30));

    let service = ctx.service();

    let out = service
        .shared
        .insert(vec![tx1.clone(), tx2.clone(), tx3.clone()])
        .await;

    // Check that we have all transactions after insertion.
    assert_eq!(out.len(), 3, "Should be len 3:{out:?}");
    assert!(out[0].is_ok(), "Tx1 should be OK, got err:{out:?}");
    assert!(out[1].is_ok(), "Tx2 should be OK, got err:{out:?}");
    assert!(out[2].is_ok(), "Tx3 should be OK, got err:{out:?}");

    tokio::time::sleep(Duration::from_secs(TIMEOUT)).await;
    let out = service.shared.find(vec![
        tx1.id(&ConsensusParameters::DEFAULT.chain_id),
        tx2.id(&ConsensusParameters::DEFAULT.chain_id),
        tx3.id(&ConsensusParameters::DEFAULT.chain_id),
    ]);
    assert_eq!(out.len(), 3, "Should be len 3:{out:?}");
    assert!(out[0].is_some(), "Tx1 should exist");
    assert!(out[1].is_some(), "Tx2 should exist");
    assert!(out[2].is_some(), "Tx3 should exist");

    tokio::time::sleep(Duration::from_secs(TIMEOUT)).await;
    let out = service.shared.find(vec![
        tx1.id(&ConsensusParameters::DEFAULT.chain_id),
        tx2.id(&ConsensusParameters::DEFAULT.chain_id),
        tx3.id(&ConsensusParameters::DEFAULT.chain_id),
    ]);
    assert_eq!(out.len(), 3, "Should be len 3:{out:?}");
    assert!(out[0].is_none(), "Tx1 should be pruned");
    assert!(out[1].is_none(), "Tx2 should be pruned");
    assert!(out[2].is_none(), "Tx3 should be pruned");

    service.stop_and_await().await.unwrap();
}

#[tokio::test]
async fn test_prune_transactions_the_oldest() {
    const TIMEOUT: u64 = 5;

    let config = Config {
        transaction_ttl: Duration::from_secs(TIMEOUT),
        ..Default::default()
    };
    let ctx = TestContextBuilder::new()
        .with_config(config)
        .build_and_start()
        .await;

    let tx1 = Arc::new(ctx.setup_script_tx(10));
    let tx2 = Arc::new(ctx.setup_script_tx(20));
    let tx3 = Arc::new(ctx.setup_script_tx(30));
    let tx4 = Arc::new(ctx.setup_script_tx(40));

    let service = ctx.service();

    // insert tx1 at time `0`
    let out = service.shared.insert(vec![tx1.clone()]).await;
    assert!(out[0].is_ok(), "Tx1 should be OK, got err:{out:?}");

    // sleep for `4` seconds
    tokio::time::sleep(Duration::from_secs(4)).await;
    // insert tx2 at time `4`
    let out = service.shared.insert(vec![tx2.clone()]).await;
    assert!(out[0].is_ok(), "Tx2 should be OK, got err:{out:?}");

    // check that tx1 and tx2 are still there at time `4`
    let out = service.shared.find(vec![
        tx1.id(&ConsensusParameters::DEFAULT.chain_id),
        tx2.id(&ConsensusParameters::DEFAULT.chain_id),
    ]);
    assert!(out[0].is_some(), "Tx1 should exist");
    assert!(out[1].is_some(), "Tx2 should exist");

    // sleep for another `4` seconds
    tokio::time::sleep(Duration::from_secs(4)).await;
    // insert tx3 at time `8`
    let out = service.shared.insert(vec![tx3.clone()]).await;
    assert!(out[0].is_ok(), "Tx3 should be OK, got err:{out:?}");

    // sleep for `3` seconds
    tokio::time::sleep(Duration::from_secs(3)).await;

    // insert tx4 at time `11`
    let out = service.shared.insert(vec![tx4.clone()]).await;
    assert!(out[0].is_ok(), "Tx4 should be OK, got err:{out:?}");

    // time is now `11`, tx1 and tx2 should be pruned
    let out = service.shared.find(vec![
        tx1.id(&ConsensusParameters::DEFAULT.chain_id),
        tx2.id(&ConsensusParameters::DEFAULT.chain_id),
        tx3.id(&ConsensusParameters::DEFAULT.chain_id),
        tx4.id(&ConsensusParameters::DEFAULT.chain_id),
    ]);
    assert!(out[0].is_none(), "Tx1 should be pruned");
    assert!(out[1].is_none(), "Tx2 should be pruned");
    assert!(out[2].is_some(), "Tx3 should exist");
    assert!(out[3].is_some(), "Tx4 should exist");

    // sleep for `5` seconds
    tokio::time::sleep(Duration::from_secs(TIMEOUT)).await;

    // time is now `16`, tx3 should be pruned
    let out = service.shared.find(vec![
        tx1.id(&ConsensusParameters::DEFAULT.chain_id),
        tx2.id(&ConsensusParameters::DEFAULT.chain_id),
        tx3.id(&ConsensusParameters::DEFAULT.chain_id),
        tx4.id(&ConsensusParameters::DEFAULT.chain_id),
    ]);
    assert!(out[0].is_none(), "Tx1 should be pruned");
    assert!(out[1].is_none(), "Tx2 should be pruned");
    assert!(out[2].is_none(), "Tx3 should be pruned");
    assert!(out[3].is_some(), "Tx4 should exist");

    service.stop_and_await().await.unwrap();
}

#[tokio::test]
async fn simple_insert_removal_subscription() {
    let ctx = TestContextBuilder::new().build_and_start().await;

    let tx1 = Arc::new(ctx.setup_script_tx(10));
    let tx2 = Arc::new(ctx.setup_script_tx(20));
    let service = ctx.service();

    let mut new_tx_notification = service.shared.new_tx_notification_subscribe();
    let mut tx1_subscribe_updates = service
        .shared
        .tx_update_subscribe(tx1.cached_id().unwrap())
        .await;
    let mut tx2_subscribe_updates = service
        .shared
        .tx_update_subscribe(tx2.cached_id().unwrap())
        .await;

    let out = service.shared.insert(vec![tx1.clone(), tx2.clone()]).await;

    if out[0].is_ok() {
        assert_eq!(
            new_tx_notification.try_recv(),
            Ok(tx1.cached_id().unwrap()),
            "First added should be tx1"
        );
        let update = tx1_subscribe_updates.next().await.unwrap();
        assert!(
            matches!(
                update,
                TxStatusMessage::Status(TransactionStatus::Submitted { .. })
            ),
            "First message in tx1 stream should be Submitted"
        );
    } else {
        panic!("Tx1 should be OK, got err");
    }

    if out[1].is_ok() {
        assert_eq!(
            new_tx_notification.try_recv(),
            Ok(tx2.cached_id().unwrap()),
            "Second added should be tx2"
        );
        let update = tx2_subscribe_updates.next().await.unwrap();
        assert!(
            matches!(
                update,
                TxStatusMessage::Status(TransactionStatus::Submitted { .. })
            ),
            "First message in tx2 stream should be Submitted"
        );
    } else {
        panic!("Tx2 should be OK, got err");
    }

    // remove them
    service
        .shared
        .remove(vec![tx1.cached_id().unwrap(), tx2.cached_id().unwrap()]);

    let update = tx1_subscribe_updates.next().await.unwrap();
    assert_eq!(
        update,
        TxStatusMessage::Status(TransactionStatus::SqueezedOut {
            reason: "Transaction removed.".to_string()
        }),
        "Second message in tx1 stream should be squeezed out"
    );

    let update = tx2_subscribe_updates.next().await.unwrap();
    assert_eq!(
        update,
        TxStatusMessage::Status(TransactionStatus::SqueezedOut {
            reason: "Transaction removed.".to_string()
        }),
        "Second message in tx2 stream should be squeezed out"
    );

    service.stop_and_await().await.unwrap();
}