Skip to main content

ff_engine/scanner/
flow_projector.rs

1//! Flow summary projector scanner.
2//!
3//! Scans flow partitions ({fp:N}). For each flow: delegates the per-flow
4//! sample + summary-hash write to `EngineBackend::project_flow_summary`.
5//! The engine-side scanner body is thin by design — enumeration of flows
6//! is Valkey-specific (SSCAN on an index SET), but the projection itself
7//! lives behind the trait.
8//!
9//! Interval: 15s (catchup mode — event-driven in production).
10//!
11//! Cluster-safe: uses SSCAN on a partition-level index SET instead of SCAN.
12//!
13//! # Two sources of `public_flow_state`
14//!
15//! This scanner writes a DERIVED `public_flow_state` field to the flow
16//! summary projection. It does NOT touch `flow_core.public_flow_state`
17//! — that field is owned exclusively by `ff_create_flow` and
18//! `ff_cancel_flow` and is the authoritative state used for mutation
19//! guards (e.g. `ff_add_execution_to_flow` rejects adds when
20//! `flow_core.public_flow_state` is terminal).
21//!
22//! Consumer guidance:
23//! - Mutation-guard / authoritative state → read `flow_core.public_flow_state`.
24//! - Dashboards / projected rollups → read the summary projection.
25//!
26//! Reference: RFC-007 §Flow Summary Projection, RFC-010 §6.7
27
28use std::sync::Arc;
29use std::time::Duration;
30
31use ff_core::backend::ScannerFilter;
32use ff_core::engine_backend::EngineBackend;
33use ff_core::engine_error::EngineError;
34use ff_core::keys::FlowIndexKeys;
35use ff_core::partition::{Partition, PartitionConfig, PartitionFamily};
36use ff_core::types::{FlowId, TimestampMs};
37
38use super::{ScanResult, Scanner};
39
40pub struct FlowProjector {
41    interval: Duration,
42    partition_config: PartitionConfig,
43    /// Issue #122: accepted for uniform API; not applied. See
44    /// [`Self::with_filter`] rustdoc.
45    filter: ScannerFilter,
46    backend: Option<Arc<dyn EngineBackend>>,
47}
48
49impl FlowProjector {
50    pub fn new(interval: Duration, partition_config: PartitionConfig) -> Self {
51        Self::with_filter(interval, partition_config, ScannerFilter::default())
52    }
53
54    /// Accepts a [`ScannerFilter`] for uniform construction across
55    /// all scanners (issue #122) but **does not apply it**. This
56    /// scanner projects flow summaries by aggregating the public
57    /// states of many member executions per flow; per-member
58    /// filtering would add an HGET per member per flow (N×M), which
59    /// does not fit the "no extra HGET to filter" budget set in the
60    /// issue #122 design. The flow summary remains a cross-tenant
61    /// aggregate in shared-Valkey deployments — consumers that
62    /// need per-instance summaries should read exec_core directly
63    /// with the filter applied.
64    pub fn with_filter(
65        interval: Duration,
66        partition_config: PartitionConfig,
67        filter: ScannerFilter,
68    ) -> Self {
69        Self {
70            interval,
71            partition_config,
72            filter,
73            backend: None,
74        }
75    }
76
77    /// PR-7b Cluster 2b-B: wire an `EngineBackend` so the per-flow
78    /// projection runs through the trait (Valkey: lifted scanner
79    /// body; Postgres: migration 0019 `ff_flow_summary` UPSERT;
80    /// SQLite: `Unavailable` per RFC-023).
81    pub fn with_filter_and_backend(
82        interval: Duration,
83        partition_config: PartitionConfig,
84        filter: ScannerFilter,
85        backend: Arc<dyn EngineBackend>,
86    ) -> Self {
87        Self {
88            interval,
89            partition_config,
90            filter,
91            backend: Some(backend),
92        }
93    }
94}
95
96impl Scanner for FlowProjector {
97    fn name(&self) -> &'static str {
98        "flow_projector"
99    }
100
101    fn interval(&self) -> Duration {
102        self.interval
103    }
104
105    fn filter(&self) -> &ScannerFilter {
106        &self.filter
107    }
108
109    async fn scan_partition(
110        &self,
111        client: &ferriskey::Client,
112        partition: u16,
113    ) -> ScanResult {
114        let p = Partition {
115            family: PartitionFamily::Flow,
116            index: partition,
117        };
118        let fidx = FlowIndexKeys::new(&p);
119        let flow_index_key = fidx.flow_index();
120
121        let now_ms_res: Result<u64, String> = if let Some(ref b) = self.backend {
122            b.server_time_ms().await.map_err(|e| e.to_string())
123        } else {
124            crate::scanner::lease_expiry::server_time_ms_legacy(client).await.map_err(|e| e.to_string())
125        };
126        let now_ms = match now_ms_res {
127            Ok(t) => t,
128            Err(e) => {
129                tracing::warn!(partition, error = %e, "flow_projector: failed to get server time");
130                return ScanResult { processed: 0, errors: 1 };
131            }
132        };
133        let now_ts = TimestampMs::from_millis(now_ms as i64);
134
135        let mut processed: u32 = 0;
136        let mut errors: u32 = 0;
137        let mut cursor = "0".to_string();
138
139        // Stream flow_index in SSCAN batches (COUNT 100) instead of a
140        // single SMEMBERS. SMEMBERS on a partition with many flows would
141        // materialise every flow_id into one Vec<String> before we could
142        // project the first one; SSCAN bounds memory to one batch and
143        // keeps the Valkey command's server-side work bounded per call.
144        //
145        // Flow-id enumeration itself stays on the scanner client — it's
146        // a Valkey-shape concern (the `ff:idx:{fp:N}:flow_index` SET
147        // doesn't exist on Postgres, which resolves flow membership
148        // via the `ff_exec_core_flow_idx` index directly). PR-7b
149        // Cluster 2b-B scope: the per-flow projection goes through the
150        // trait; the iteration remains Valkey-shaped.
151        loop {
152            let result: ferriskey::Value = match client
153                .cmd("SSCAN")
154                .arg(&flow_index_key)
155                .arg(cursor.as_str())
156                .arg("COUNT")
157                .arg("100")
158                .execute()
159                .await
160            {
161                Ok(v) => v,
162                Err(e) => {
163                    tracing::warn!(partition, error = %e, "flow_projector: SSCAN failed");
164                    return ScanResult { processed, errors: errors + 1 };
165                }
166            };
167
168            let (next_cursor, flow_ids) = parse_sscan_response(&result);
169
170            for fid_str in &flow_ids {
171                let res = project_one_flow(
172                    client,
173                    self.backend.as_ref(),
174                    &p,
175                    &self.partition_config,
176                    fid_str,
177                    now_ts,
178                )
179                .await;
180                match res {
181                    Ok(true) => processed += 1,
182                    Ok(false) => {} // no members or already up-to-date
183                    Err(e) => {
184                        tracing::warn!(
185                            partition,
186                            flow_id = fid_str.as_str(),
187                            error = %e,
188                            "flow_projector: projection failed"
189                        );
190                        errors += 1;
191                    }
192                }
193            }
194
195            cursor = next_cursor;
196            if cursor == "0" {
197                break;
198            }
199        }
200
201        ScanResult { processed, errors }
202    }
203}
204
205/// Project one flow through the trait when a backend is wired;
206/// otherwise fall back to a direct-client path for test builds that
207/// instantiate the scanner without a backend (mirrors cluster-1
208/// lease_expiry / cluster-2 reconciler patterns). Returns the
209/// trait-shape `Result<bool>` unchanged.
210async fn project_one_flow(
211    client: &ferriskey::Client,
212    backend: Option<&Arc<dyn EngineBackend>>,
213    partition: &Partition,
214    partition_config: &PartitionConfig,
215    fid_str: &str,
216    now_ms: TimestampMs,
217) -> Result<bool, String> {
218    let flow_id = match FlowId::parse(fid_str) {
219        Ok(id) => id,
220        Err(e) => {
221            return Err(format!("malformed flow_id {fid_str:?}: {e}"));
222        }
223    };
224
225    if let Some(backend_arc) = backend {
226        return backend_arc
227            .project_flow_summary(*partition, &flow_id, now_ms)
228            .await
229            .map_err(|e: EngineError| e.to_string());
230    }
231
232    // Test-only fallback: direct composition on the scanner client.
233    // Mirrors the pre-PR-7b body so existing unit tests continue to
234    // pass when instantiated without an EngineBackend.
235    project_direct_fallback(client, partition, partition_config, fid_str, now_ms)
236        .await
237        .map_err(|e| e.to_string())
238}
239
240/// Direct-client projection used only when the scanner is
241/// constructed without an `EngineBackend`. Matches the
242/// pre-PR-7b behaviour closely enough for test fixtures.
243async fn project_direct_fallback(
244    client: &ferriskey::Client,
245    partition: &Partition,
246    config: &PartitionConfig,
247    fid_str: &str,
248    now_ms: TimestampMs,
249) -> Result<bool, ferriskey::Error> {
250    use std::collections::HashMap;
251
252    const BATCH_SIZE: usize = 50;
253    let tag = partition.hash_tag();
254    let fidx = FlowIndexKeys::new(partition);
255    let flow_index_key = fidx.flow_index();
256
257    let core_key = format!("ff:flow:{}:{}:core", tag, fid_str);
258    let members_key = format!("ff:flow:{}:{}:members", tag, fid_str);
259    let summary_key = format!("ff:flow:{}:{}:summary", tag, fid_str);
260
261    let core_exists: bool = client.exists(&core_key).await.unwrap_or(true);
262    if !core_exists {
263        let _: Option<i64> = client
264            .cmd("SREM")
265            .arg(&flow_index_key)
266            .arg(fid_str)
267            .execute()
268            .await
269            .unwrap_or(None);
270        return Ok(false);
271    }
272
273    let true_total: u64 = client
274        .cmd("SCARD")
275        .arg(&members_key)
276        .execute()
277        .await
278        .unwrap_or(0);
279    if true_total == 0 {
280        return Ok(false);
281    }
282
283    let member_eids: Vec<String> = client
284        .cmd("SRANDMEMBER")
285        .arg(&members_key)
286        .arg(BATCH_SIZE.to_string().as_str())
287        .execute()
288        .await
289        .unwrap_or_default();
290    if member_eids.is_empty() {
291        return Ok(false);
292    }
293
294    let mut counts: HashMap<String, u32> = HashMap::new();
295    let mut sampled: u32 = 0;
296    for eid_str in &member_eids {
297        let eid = match ff_core::types::ExecutionId::parse(eid_str) {
298            Ok(id) => id,
299            Err(_) => continue,
300        };
301        let member_partition = ff_core::partition::execution_partition(&eid, config);
302        let ctx_tag = member_partition.hash_tag();
303        let member_core = format!("ff:exec:{}:{}:core", ctx_tag, eid_str);
304
305        let ps: Option<String> = client
306            .cmd("HGET")
307            .arg(&member_core)
308            .arg("public_state")
309            .execute()
310            .await
311            .unwrap_or(None);
312        let state = ps.unwrap_or_else(|| "unknown".to_string());
313        *counts.entry(state).or_insert(0) += 1;
314        sampled += 1;
315    }
316
317    let completed = *counts.get("completed").unwrap_or(&0);
318    let skipped = *counts.get("skipped").unwrap_or(&0);
319    let failed = *counts.get("failed").unwrap_or(&0);
320    let cancelled = *counts.get("cancelled").unwrap_or(&0);
321    let expired = *counts.get("expired").unwrap_or(&0);
322    let active = *counts.get("active").unwrap_or(&0);
323    let suspended = *counts.get("suspended").unwrap_or(&0);
324    let waiting = *counts.get("waiting").unwrap_or(&0);
325    let delayed = *counts.get("delayed").unwrap_or(&0);
326    let rate_limited = *counts.get("rate_limited").unwrap_or(&0);
327    let waiting_children = *counts.get("waiting_children").unwrap_or(&0);
328    let terminal_count = completed + skipped + failed + cancelled + expired;
329    let all_terminal = terminal_count == sampled && sampled > 0;
330    let flow_state = if all_terminal {
331        if failed > 0 || cancelled > 0 || expired > 0 {
332            "failed"
333        } else {
334            "completed"
335        }
336    } else if active > 0 {
337        "running"
338    } else if suspended > 0 || delayed > 0 || rate_limited > 0 || waiting_children > 0 {
339        "blocked"
340    } else {
341        "open"
342    };
343
344    let now_s = now_ms.0.to_string();
345    let _: () = client
346        .cmd("HSET")
347        .arg(&summary_key)
348        .arg("total_members").arg(true_total.to_string().as_str())
349        .arg("sampled_members").arg(sampled.to_string().as_str())
350        .arg("members_completed").arg(completed.to_string().as_str())
351        .arg("members_failed").arg(failed.to_string().as_str())
352        .arg("members_cancelled").arg(cancelled.to_string().as_str())
353        .arg("members_expired").arg(expired.to_string().as_str())
354        .arg("members_skipped").arg(skipped.to_string().as_str())
355        .arg("members_active").arg(active.to_string().as_str())
356        .arg("members_suspended").arg(suspended.to_string().as_str())
357        .arg("members_waiting").arg(waiting.to_string().as_str())
358        .arg("members_delayed").arg(delayed.to_string().as_str())
359        .arg("members_rate_limited").arg(rate_limited.to_string().as_str())
360        .arg("members_waiting_children").arg(waiting_children.to_string().as_str())
361        .arg("public_flow_state").arg(flow_state)
362        .arg("last_summary_update_at").arg(now_s.as_str())
363        .execute()
364        .await?;
365
366    if all_terminal && (sampled as u64) == true_total {
367        let _: Option<i64> = client
368            .cmd("SREM")
369            .arg(&flow_index_key)
370            .arg(fid_str)
371            .execute()
372            .await
373            .unwrap_or(None);
374    }
375
376    Ok(true)
377}
378
379/// Parse an SSCAN reply `[cursor, [member1, member2, ...]]` into
380/// `(cursor, Vec<member>)`. Mirrors the helper in quota_reconciler so
381/// both scanners agree on the wire shape.
382fn parse_sscan_response(val: &ferriskey::Value) -> (String, Vec<String>) {
383    let arr = match val {
384        ferriskey::Value::Array(a) if a.len() >= 2 => a,
385        _ => return ("0".to_string(), vec![]),
386    };
387
388    let cursor = match &arr[0] {
389        Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
390        Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
391        _ => return ("0".to_string(), vec![]),
392    };
393
394    let mut members = Vec::new();
395    match &arr[1] {
396        Ok(ferriskey::Value::Array(inner)) => {
397            for item in inner {
398                if let Ok(ferriskey::Value::BulkString(b)) = item {
399                    members.push(String::from_utf8_lossy(b).into_owned());
400                }
401            }
402        }
403        Ok(ferriskey::Value::Set(inner)) => {
404            for item in inner {
405                if let ferriskey::Value::BulkString(b) = item {
406                    members.push(String::from_utf8_lossy(b).into_owned());
407                }
408            }
409        }
410        _ => {}
411    }
412
413    (cursor, members)
414}