ff-engine 0.15.0

FlowFabric cross-partition dispatch and background scanners
Documentation
//! Pending waitpoint expiry scanner.
//!
//! Iterates `ff:idx:{p:N}:pending_waitpoint_expiry` for each partition,
//! finding pending waitpoints whose `expires_at` score is <= now. For each,
//! calls `FCALL ff_close_waitpoint` to close the expired pending waitpoint.
//!
//! Reference: RFC-004 §Pending waitpoint expiry scanner, RFC-010 §6.3

use std::sync::Arc;
use std::time::Duration;

use ff_core::backend::ScannerFilter;
use ff_core::engine_backend::EngineBackend;
use ff_core::keys::IndexKeys;
use ff_core::partition::{Partition, PartitionFamily};
use ff_core::types::{ExecutionId, TimestampMs};

use super::{should_skip_candidate, FailureTracker, ScanResult, Scanner};

const BATCH_SIZE: u32 = 100;

pub struct PendingWaitpointExpiryScanner {
    interval: Duration,
    failures: FailureTracker,
    filter: ScannerFilter,
    /// PR-7b Cluster 1: trait-dispatch target.
    backend: Option<Arc<dyn EngineBackend>>,
}

impl PendingWaitpointExpiryScanner {
    pub fn new(interval: Duration) -> Self {
        Self::with_filter(interval, ScannerFilter::default())
    }

    /// Construct with a [`ScannerFilter`] applied per candidate
    /// (issue #122). The filter is resolved to the waitpoint's
    /// owning execution via the waitpoint hash's `execution_id`
    /// field, then applied via [`should_skip_candidate`].
    pub fn with_filter(interval: Duration, filter: ScannerFilter) -> Self {
        Self {
            interval,
            failures: FailureTracker::new(),
            filter,
            backend: None,
        }
    }

    /// PR-7b Cluster 1: wire an `EngineBackend` for trait-routed FCALLs.
    pub fn with_filter_and_backend(
        interval: Duration,
        filter: ScannerFilter,
        backend: Arc<dyn EngineBackend>,
    ) -> Self {
        Self {
            interval,
            failures: FailureTracker::new(),
            filter,
            backend: Some(backend),
        }
    }
}

impl Scanner for PendingWaitpointExpiryScanner {
    fn name(&self) -> &'static str {
        "pending_wp_expiry"
    }

    fn interval(&self) -> Duration {
        self.interval
    }

    fn filter(&self) -> &ScannerFilter {
        &self.filter
    }

    async fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> ScanResult {
        let p = Partition {
            family: PartitionFamily::Execution,
            index: partition,
        };
        let idx = IndexKeys::new(&p);
        let expiry_key = idx.pending_waitpoint_expiry();
        let tag = p.hash_tag();

        let now_ms_res: Result<u64, String> = if let Some(ref b) = self.backend {
            b.server_time_ms().await.map_err(|e| e.to_string())
        } else {
            crate::scanner::lease_expiry::server_time_ms_legacy(client).await.map_err(|e| e.to_string())
        };
        let now_ms = match now_ms_res {
            Ok(t) => t,
            Err(e) => {
                tracing::warn!(partition, error = %e, "pending_wp_expiry: failed to get server time");
                return ScanResult { processed: 0, errors: 1 };
            }
        };

        // ZRANGEBYSCORE pending_waitpoint_expiry -inf now LIMIT 0 batch_size
        let expired: Vec<String> = match client
            .cmd("ZRANGEBYSCORE")
            .arg(&expiry_key)
            .arg("-inf")
            .arg(now_ms.to_string().as_str())
            .arg("LIMIT")
            .arg("0")
            .arg(BATCH_SIZE.to_string().as_str())
            .execute()
            .await
        {
            Ok(ids) => ids,
            Err(e) => {
                tracing::warn!(partition, error = %e, "pending_wp_expiry: ZRANGEBYSCORE failed");
                return ScanResult { processed: 0, errors: 1 };
            }
        };

        if partition == 0 {
            self.failures.advance_cycle();
        }

        if expired.is_empty() {
            return ScanResult { processed: 0, errors: 0 };
        }

        let mut processed: u32 = 0;
        let mut errors: u32 = 0;

        for wp_id_str in &expired {
            if self.failures.should_skip(wp_id_str) {
                continue;
            }
            // Issue #122: this scanner iterates waitpoint IDs, not
            // exec IDs. To apply the filter we resolve the owning
            // execution via the waitpoint hash's `execution_id`
            // field (one HGET), then call the shared helper. When
            // the filter is a no-op this entire block is skipped.
            if !self.filter.is_noop() {
                let waitpoint_hash = format!("ff:wp:{}:{}", tag, wp_id_str);
                let eid: Option<String> = match client
                    .cmd("HGET")
                    .arg(&waitpoint_hash)
                    .arg("execution_id")
                    .execute()
                    .await
                {
                    Ok(v) => v,
                    Err(_) => {
                        // Conservative skip on transport error.
                        continue;
                    }
                };
                let Some(eid) = eid else { continue };
                if should_skip_candidate(self.backend.as_ref(), &self.filter, partition, &eid).await {
                    continue;
                }
            }

            let res = if let Some(ref backend) = self.backend {
                // Resolve owning execution_id from the waitpoint hash
                // — the trait takes both (exec_id, wp_id) so the
                // Valkey impl doesn't re-read it. If the hash is
                // missing/resolve fails we still attempt the call
                // with an empty exec_id (the Lua cleanup path no-ops
                // on missing exec_core).
                let eid_str: String = client
                    .cmd("HGET")
                    .arg(format!("ff:wp:{}:{}", tag, wp_id_str).as_str())
                    .arg("execution_id")
                    .execute::<Option<String>>()
                    .await
                    .ok()
                    .flatten()
                    .unwrap_or_default();
                let Ok(eid) = ExecutionId::parse(&eid_str) else { tracing::warn!(execution_id=%eid_str, "malformed eid; skipping"); continue; };
                backend
                    .close_waitpoint(p, &eid, wp_id_str, TimestampMs(now_ms as i64))
                    .await
                    .map_err(|e| e.to_string())
            } else {
                close_expired_waitpoint(client, &tag, &idx, wp_id_str)
                    .await
                    .map_err(|e| e.to_string())
            };
            match res {
                Ok(()) => {
                    self.failures.record_success(wp_id_str);
                    processed += 1;
                }
                Err(e) => {
                    tracing::warn!(
                        partition,
                        waitpoint_id = wp_id_str.as_str(),
                        error = %e,
                        "pending_wp_expiry: close_waitpoint failed"
                    );
                    self.failures.record_failure(wp_id_str, "pending_wp_expiry");
                    errors += 1;
                }
            }
        }

        ScanResult { processed, errors }
    }
}

/// Call ff_close_waitpoint for one expired pending waitpoint.
///
/// KEYS (3): exec_core, waitpoint_hash, pending_wp_expiry_zset
/// ARGV (2): waitpoint_id, reason
///
/// NOTE: exec_core is required by the function signature but the waitpoint
/// record contains the execution_id. We read it from the waitpoint to
/// construct the exec_core key. If the waitpoint doesn't exist, the Lua
/// function handles it gracefully.
async fn close_expired_waitpoint(
    client: &ferriskey::Client,
    tag: &str,
    idx: &IndexKeys,
    wp_id_str: &str,
) -> Result<(), ferriskey::Error> {
    let waitpoint_hash = format!("ff:wp:{}:{}", tag, wp_id_str);

    // Read execution_id from waitpoint to construct exec_core key
    let eid: Option<String> = client
        .cmd("HGET")
        .arg(&waitpoint_hash)
        .arg("execution_id")
        .execute()
        .await?;

    let eid = eid.unwrap_or_default();
    let exec_core = format!("ff:exec:{}:{}:core", tag, eid);
    let pending_wp_expiry = idx.pending_waitpoint_expiry();

    let keys: [&str; 3] = [
        &exec_core,
        &waitpoint_hash,
        &pending_wp_expiry,
    ];

    let argv: [&str; 2] = [wp_id_str, "never_committed"];

    let _: ferriskey::Value = client
        .fcall("ff_close_waitpoint", &keys, &argv)
        .await?;

    Ok(())
}