ff-server 0.11.0

FlowFabric server library and HTTP binary
Documentation
//! RFC-017 §14.8 mandatory Stage B CI test: `shutdown_prepare` under
//! active stream-op load.
//!
//! Asserts the four invariants §14.8.d enumerates:
//!
//! (a) All 16 concurrent stream-op sessions terminate cleanly within
//!     `grace` (5s) once `shutdown_prepare` is called.
//! (b) No panics from dropped-permit accounting on the stream
//!     semaphore.
//! (c) New stream-op requests arriving *after* `Semaphore::close()`
//!     surface as `EngineError::Unavailable` — not `Transport` or
//!     a panic. (The `EngineError::Unavailable → HTTP 503` REST leg
//!     is covered by `parity_stage_b.rs`'s status-mapping asserts
//!     inside `ff-server::api`'s `IntoResponse for ApiError`.)
//! (d) `ff_shutdown_timeout_total` stays at 0 for the happy path;
//!     increments (via `Metrics::inc_shutdown_timeout`) when `grace`
//!     is deliberately set below the drain time.
//!
//! The test stands up an `Arc<ValkeyBackend>` directly (no Server,
//! no Valkey) by using `ValkeyBackend::from_client_and_partitions`
//! together with a *scripted* `read_stream` path — but the stream-op
//! permits are managed by the backend regardless of whether the
//! underlying FCALL dispatches. This keeps the invariant CI-friendly
//! (no Docker, no Valkey container) while exercising the real
//! semaphore + shutdown_prepare body.
//!
//! Both tests are `#[ignore]`-gated — they dial `127.0.0.1:6379` to
//! construct the `ValkeyBackend` (`ferriskey::Client` has no in-
//! process test double in the current SDK). Run them with:
//!     docker run -d --name valkey-stageb -p 6379:6379 valkey/valkey:8-alpine
//!     cargo test -p ff-server --test shutdown_prepare_under_load -- --ignored --nocapture
//!     docker rm -f valkey-stageb
//!
//! The tests exercise the semaphore + `shutdown_prepare` drain
//! machinery only — no FCALLs dispatch against the dialled client.

use std::sync::Arc;
use std::time::Duration;

use ff_backend_valkey::ValkeyBackend;
use ff_core::engine_backend::EngineBackend;
use ff_core::engine_error::EngineError;
use ff_core::partition::PartitionConfig;

/// Build a `ValkeyBackend` whose `client` is never actually used for
/// FCALLs during this test — we only exercise the semaphore +
/// shutdown_prepare invariants. The client needs to exist so the
/// struct is constructible; `duplicate_connection` is never invoked.
async fn construct_backend_with_permits(permits: u32) -> Arc<ValkeyBackend> {
    // Dial directly via ferriskey's ClientBuilder so we keep a
    // concrete `Arc<ValkeyBackend>` (not `Arc<dyn EngineBackend>`)
    // and can call `with_stream_semaphore_permits`. The dialled
    // client is never used for FCALLs — this test only exercises
    // the semaphore path.
    let dial = tokio::time::timeout(
        Duration::from_millis(500),
        ferriskey::ClientBuilder::new()
            .host("127.0.0.1", 6379)
            .connect_timeout(Duration::from_millis(400))
            .request_timeout(Duration::from_millis(400))
            .build(),
    )
    .await;
    let client = match dial {
        Ok(Ok(c)) => c,
        Ok(Err(e)) => panic!("dial 127.0.0.1:6379 failed: {e}"),
        Err(_) => panic!("dial 127.0.0.1:6379 timed out"),
    };
    let mut backend =
        ValkeyBackend::from_client_and_partitions(client, PartitionConfig::default());
    assert!(
        backend.with_stream_semaphore_permits(permits),
        "size stream-op pool on fresh Arc"
    );
    backend
}

/// Invariant (a), (b), (d-happy): the concurrent-tail scenario drains
/// cleanly inside `grace`, no panics, and `ff_shutdown_timeout_total`
/// stays at 0.
///
/// `#[ignore]`-gated because it dials a live Valkey (`127.0.0.1:6379`)
/// to construct the `ValkeyBackend`; the backend's client is not
/// actually invoked during the test (only the semaphore path is
/// exercised), but the struct cannot be built without a live dial
/// through the public API.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore]
async fn shutdown_prepare_drains_in_flight_tails_within_grace() {
    const PERMITS: u32 = 16;
    let backend = construct_backend_with_permits(PERMITS).await;

    // Hold all 16 permits via the public `try_acquire_owned` path
    // (mirrors what `read_stream` / `tail_stream` do internally).
    let sem = backend.stream_semaphore_clone_for_tests();
    let mut held = Vec::with_capacity(PERMITS as usize);
    for _ in 0..PERMITS {
        held.push(sem.clone().try_acquire_owned().expect("acquire"));
    }
    assert_eq!(backend.stream_semaphore_available(), 0);

    // Background: release each permit after a staggered delay so the
    // drain loop sees them disappear one at a time (max 400ms).
    let handle = tokio::spawn(async move {
        let mut held = held;
        for i in 0..(PERMITS as u64) {
            tokio::time::sleep(Duration::from_millis(25 + i * 10)).await;
            held.pop();
        }
    });

    // Call `shutdown_prepare(5s)`. Must return Ok within the 5s grace
    // once `held` is fully drained (the background task finishes by
    // ~550ms, well inside).
    let start = std::time::Instant::now();
    let res = backend
        .shutdown_prepare(Duration::from_secs(5))
        .await;
    let elapsed = start.elapsed();
    assert!(res.is_ok(), "happy-path shutdown_prepare Ok: {res:?}");
    // RFC §14.8: all 16 sessions terminate cleanly within the 5s
    // grace. Staggered-release window (~550ms of tail releases +
    // drain-loop poll slack) stays well under the budget.
    assert!(
        elapsed < Duration::from_secs(5),
        "drain completed inside 5s grace; elapsed={elapsed:?}"
    );

    // (c) new stream-op acquires post-close surface as
    //     `EngineError::Unavailable { op: "stream_ops" }`.
    let post_close = backend
        .read_stream(
            &ff_core::types::ExecutionId::solo(
                &ff_core::types::LaneId::new("post_close"),
                &PartitionConfig::default(),
            ),
            ff_core::types::AttemptIndex::new(0),
            ff_core::contracts::StreamCursor::Start,
            ff_core::contracts::StreamCursor::End,
            10,
        )
        .await;
    assert!(
        matches!(
            &post_close,
            Err(EngineError::Unavailable { op: "stream_ops" })
        ),
        "post-close acquire → Unavailable, got {post_close:?}"
    );

    handle.await.expect("background releaser joined");
}

/// Invariant (d-timeout): if `grace` is shorter than the drain time,
/// `shutdown_prepare` returns `EngineError::Timeout` — the server
/// maps that to `Metrics::inc_shutdown_timeout`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore]
async fn shutdown_prepare_times_out_when_grace_too_short() {
    const PERMITS: u32 = 4;
    let backend = construct_backend_with_permits(PERMITS).await;

    // Hold all permits with no release task — drain can never finish.
    let sem = backend.stream_semaphore_clone_for_tests();
    let mut held = Vec::with_capacity(PERMITS as usize);
    for _ in 0..PERMITS {
        held.push(sem.clone().try_acquire_owned().expect("acquire"));
    }

    let res = backend
        .shutdown_prepare(Duration::from_millis(100))
        .await;
    assert!(
        matches!(&res, Err(EngineError::Timeout { op: "shutdown_prepare", .. })),
        "starved drain → Timeout, got {res:?}"
    );

    // Release manually so the permits drop cleanly before the test
    // exits (cleanliness; no assertion rides on this).
    drop(held);
}