// ff_engine/scanner/flow_projector.rs
1//! Flow summary projector scanner.
2//!
3//! Scans flow partitions ({fp:N}). For each flow: samples up to BATCH_SIZE
4//! member executions (SRANDMEMBER), reads each member's public_state
5//! (cross-partition), and updates the flow summary hash with aggregate
6//! counts and derived public_flow_state.
7//!
8//! Interval: 15s (catchup mode — event-driven in production).
9//!
//! Cluster-safe: streams a partition-level index SET in SSCAN batches instead of keyspace SCAN.
11//!
12//! # Two sources of `public_flow_state`
13//!
14//! This scanner writes a DERIVED `public_flow_state` field to the flow
15//! summary hash (`ff:flow:{fp:N}:<flow_id>:summary`). It does NOT touch
16//! `flow_core.public_flow_state` — that field is owned exclusively by
17//! `ff_create_flow` and `ff_cancel_flow` and is the authoritative
18//! state used for mutation guards (e.g. `ff_add_execution_to_flow`
19//! rejects adds when `flow_core.public_flow_state` is terminal).
20//!
21//! Consumer guidance:
22//! - Mutation-guard / authoritative state → read `flow_core.public_flow_state`.
23//! - Dashboards / projected rollups → read `:summary.public_flow_state`.
24//!
25//! Reference: RFC-007 §Flow Summary Projection, RFC-010 §6.7
26
27use std::collections::HashMap;
28use std::time::Duration;
29
30use ff_core::backend::ScannerFilter;
31use ff_core::keys::FlowIndexKeys;
32use ff_core::partition::{Partition, PartitionConfig, PartitionFamily};
33
34use super::{ScanResult, Scanner};
35
/// Maximum number of member executions sampled per flow per projection
/// cycle (passed as the SRANDMEMBER count in `project_flow_summary`).
const BATCH_SIZE: usize = 50;
37
/// Scanner that projects per-flow summary hashes by sampling member
/// executions' public states. See the module docs for the distinction
/// between this derived state and `flow_core.public_flow_state`.
pub struct FlowProjector {
    // Returned verbatim from `Scanner::interval`.
    interval: Duration,
    // Maps sampled member execution IDs to their execution partitions for
    // the cross-partition `public_state` reads.
    partition_config: PartitionConfig,
    /// Issue #122: accepted for uniform API; not applied. See
    /// [`Self::with_filter`] rustdoc.
    filter: ScannerFilter,
}
45
46impl FlowProjector {
47    pub fn new(interval: Duration, partition_config: PartitionConfig) -> Self {
48        Self::with_filter(interval, partition_config, ScannerFilter::default())
49    }
50
51    /// Accepts a [`ScannerFilter`] for uniform construction across
52    /// all scanners (issue #122) but **does not apply it**. This
53    /// scanner projects flow summaries by aggregating the public
54    /// states of many member executions per flow; per-member
55    /// filtering would add an HGET per member per flow (N×M), which
56    /// does not fit the "no extra HGET to filter" budget set in the
57    /// issue #122 design. The flow summary remains a cross-tenant
58    /// aggregate in shared-Valkey deployments — consumers that
59    /// need per-instance summaries should read exec_core directly
60    /// with the filter applied.
61    pub fn with_filter(
62        interval: Duration,
63        partition_config: PartitionConfig,
64        filter: ScannerFilter,
65    ) -> Self {
66        Self {
67            interval,
68            partition_config,
69            filter,
70        }
71    }
72}
73
74impl Scanner for FlowProjector {
75    fn name(&self) -> &'static str {
76        "flow_projector"
77    }
78
79    fn interval(&self) -> Duration {
80        self.interval
81    }
82
83    fn filter(&self) -> &ScannerFilter {
84        &self.filter
85    }
86
87    async fn scan_partition(
88        &self,
89        client: &ferriskey::Client,
90        partition: u16,
91    ) -> ScanResult {
92        let p = Partition {
93            family: PartitionFamily::Flow,
94            index: partition,
95        };
96        let tag = p.hash_tag();
97        let fidx = FlowIndexKeys::new(&p);
98        let flow_index_key = fidx.flow_index();
99
100        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
101            Ok(t) => t,
102            Err(e) => {
103                tracing::warn!(partition, error = %e, "flow_projector: failed to get server time");
104                return ScanResult { processed: 0, errors: 1 };
105            }
106        };
107
108        let mut processed: u32 = 0;
109        let mut errors: u32 = 0;
110        let mut cursor = "0".to_string();
111
112        // Stream flow_index in SSCAN batches (COUNT 100) instead of a
113        // single SMEMBERS. SMEMBERS on a partition with many flows would
114        // materialise every flow_id into one Vec<String> before we could
115        // project the first one; SSCAN bounds memory to one batch and
116        // keeps the Valkey command's server-side work bounded per call.
117        loop {
118            let result: ferriskey::Value = match client
119                .cmd("SSCAN")
120                .arg(&flow_index_key)
121                .arg(cursor.as_str())
122                .arg("COUNT")
123                .arg("100")
124                .execute()
125                .await
126            {
127                Ok(v) => v,
128                Err(e) => {
129                    tracing::warn!(partition, error = %e, "flow_projector: SSCAN failed");
130                    return ScanResult { processed, errors: errors + 1 };
131                }
132            };
133
134            let (next_cursor, flow_ids) = parse_sscan_response(&result);
135
136            for fid_str in &flow_ids {
137                match project_flow_summary(
138                    client, &tag, &flow_index_key, fid_str, now_ms, &self.partition_config,
139                ).await {
140                    Ok(true) => processed += 1,
141                    Ok(false) => {} // no members or already up-to-date
142                    Err(e) => {
143                        tracing::warn!(
144                            partition,
145                            flow_id = fid_str.as_str(),
146                            error = %e,
147                            "flow_projector: projection failed"
148                        );
149                        errors += 1;
150                    }
151                }
152            }
153
154            cursor = next_cursor;
155            if cursor == "0" {
156                break;
157            }
158        }
159
160        ScanResult { processed, errors }
161    }
162}
163
164/// Project the summary for one flow. Returns Ok(true) if updated.
165async fn project_flow_summary(
166    client: &ferriskey::Client,
167    tag: &str,
168    flow_index_key: &str,
169    fid_str: &str,
170    now_ms: u64,
171    config: &PartitionConfig,
172) -> Result<bool, ferriskey::Error> {
173    let core_key = format!("ff:flow:{}:{}:core", tag, fid_str);
174    let members_key = format!("ff:flow:{}:{}:members", tag, fid_str);
175    let summary_key = format!("ff:flow:{}:{}:summary", tag, fid_str);
176
177    // Defensive prune: index entry for a flow whose core is gone (manual
178    // delete / retention purge) — drop it so SMEMBERS stays correct.
179    let core_exists: bool = client.exists(&core_key).await.unwrap_or(true);
180    if !core_exists {
181        let _: Option<i64> = client
182            .cmd("SREM")
183            .arg(flow_index_key)
184            .arg(fid_str)
185            .execute()
186            .await
187            .unwrap_or(None);
188        return Ok(false);
189    }
190
191    // Get true membership count for accurate total_members reporting.
192    let true_total: u64 = client
193        .cmd("SCARD")
194        .arg(&members_key)
195        .execute()
196        .await
197        .unwrap_or(0);
198
199    if true_total == 0 {
200        return Ok(false);
201    }
202
203    // Sample up to BATCH_SIZE member execution IDs (avoids loading the
204    // entire membership set into memory for large flows).
205    // SRANDMEMBER key count returns up to `count` distinct members.
206    let member_eids: Vec<String> = client
207        .cmd("SRANDMEMBER")
208        .arg(&members_key)
209        .arg(BATCH_SIZE.to_string().as_str())
210        .execute()
211        .await
212        .unwrap_or_default();
213
214    if member_eids.is_empty() {
215        return Ok(false);
216    }
217
218    // Count public_state for each sampled member (cross-partition reads)
219    let mut counts: HashMap<String, u32> = HashMap::new();
220    let mut sampled: u32 = 0;
221
222    for eid_str in &member_eids {
223        let eid = match ff_core::types::ExecutionId::parse(eid_str) {
224            Ok(id) => id,
225            Err(_) => continue,
226        };
227        let partition = ff_core::partition::execution_partition(&eid, config);
228        let ctx_tag = partition.hash_tag();
229        let core_key = format!("ff:exec:{}:{}:core", ctx_tag, eid_str);
230
231        let ps: Option<String> = client
232            .cmd("HGET")
233            .arg(&core_key)
234            .arg("public_state")
235            .execute()
236            .await
237            .unwrap_or(None);
238
239        let state = ps.unwrap_or_else(|| "unknown".to_string());
240        *counts.entry(state).or_insert(0) += 1;
241        sampled += 1;
242    }
243
244    // Derive public_flow_state from sample
245    let completed = *counts.get("completed").unwrap_or(&0);
246    let skipped = *counts.get("skipped").unwrap_or(&0);
247    let failed = *counts.get("failed").unwrap_or(&0);
248    let cancelled = *counts.get("cancelled").unwrap_or(&0);
249    let expired = *counts.get("expired").unwrap_or(&0);
250    let active = *counts.get("active").unwrap_or(&0);
251    let suspended = *counts.get("suspended").unwrap_or(&0);
252    let waiting = *counts.get("waiting").unwrap_or(&0);
253    let delayed = *counts.get("delayed").unwrap_or(&0);
254    let rate_limited = *counts.get("rate_limited").unwrap_or(&0);
255    let waiting_children = *counts.get("waiting_children").unwrap_or(&0);
256
257    let terminal_count = completed + skipped + failed + cancelled + expired;
258    let all_terminal = terminal_count == sampled && sampled > 0;
259
260    let flow_state = if all_terminal {
261        if failed > 0 || cancelled > 0 || expired > 0 {
262            "failed"
263        } else {
264            "completed"
265        }
266    } else if active > 0 {
267        "running"
268    } else if suspended > 0 || delayed > 0 || rate_limited > 0 || waiting_children > 0 {
269        "blocked"
270    } else {
271        "open"
272    };
273
274    // Update summary hash
275    let _: () = client
276        .cmd("HSET")
277        .arg(&summary_key)
278        .arg("total_members").arg(true_total.to_string().as_str())
279        .arg("sampled_members").arg(sampled.to_string().as_str())
280        .arg("members_completed").arg(completed.to_string().as_str())
281        .arg("members_failed").arg(failed.to_string().as_str())
282        .arg("members_cancelled").arg(cancelled.to_string().as_str())
283        .arg("members_expired").arg(expired.to_string().as_str())
284        .arg("members_skipped").arg(skipped.to_string().as_str())
285        .arg("members_active").arg(active.to_string().as_str())
286        .arg("members_suspended").arg(suspended.to_string().as_str())
287        .arg("members_waiting").arg(waiting.to_string().as_str())
288        .arg("members_delayed").arg(delayed.to_string().as_str())
289        .arg("members_rate_limited").arg(rate_limited.to_string().as_str())
290        .arg("members_waiting_children").arg(waiting_children.to_string().as_str())
291        .arg("public_flow_state").arg(flow_state)
292        .arg("last_summary_update_at").arg(now_ms.to_string().as_str())
293        .execute()
294        .await?;
295
296    // Prune the index entry only when we've observed EVERY member in this
297    // cycle and all of them are terminal. Gating on the full walk (not the
298    // sample) matters for two reasons:
299    //   1. For flows larger than BATCH_SIZE, a sample being "all terminal"
300    //      does not imply the flow is actually done; unsampled members
301    //      may still be running. Pruning on the sample would freeze the
302    //      summary mid-flight.
303    //   2. ff_replay_execution runs on {p:N}, so it cannot re-SADD the
304    //      {fp:N} flow_index when a terminal flow member is revived.
305    //      If we SREM while any revival path is reachable, the flow never
306    //      comes back into the projector's view.
307    // For large flows that never satisfy sampled == true_total, the
308    // defensive HGETALL-empty prune and retention deletion still clean up
309    // eventually. We accept a modest cardinality cost for correctness.
310    if all_terminal && (sampled as u64) == true_total {
311        let _: Option<i64> = client
312            .cmd("SREM")
313            .arg(flow_index_key)
314            .arg(fid_str)
315            .execute()
316            .await
317            .unwrap_or(None);
318    }
319
320    Ok(true)
321}
322
323/// Parse an SSCAN reply `[cursor, [member1, member2, ...]]` into
324/// `(cursor, Vec<member>)`. Mirrors the helper in quota_reconciler so
325/// both scanners agree on the wire shape.
326fn parse_sscan_response(val: &ferriskey::Value) -> (String, Vec<String>) {
327    let arr = match val {
328        ferriskey::Value::Array(a) if a.len() >= 2 => a,
329        _ => return ("0".to_string(), vec![]),
330    };
331
332    let cursor = match &arr[0] {
333        Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
334        Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
335        _ => return ("0".to_string(), vec![]),
336    };
337
338    let mut members = Vec::new();
339    match &arr[1] {
340        Ok(ferriskey::Value::Array(inner)) => {
341            for item in inner {
342                if let Ok(ferriskey::Value::BulkString(b)) = item {
343                    members.push(String::from_utf8_lossy(b).into_owned());
344                }
345            }
346        }
347        Ok(ferriskey::Value::Set(inner)) => {
348            for item in inner {
349                if let ferriskey::Value::BulkString(b) = item {
350                    members.push(String::from_utf8_lossy(b).into_owned());
351                }
352            }
353        }
354        _ => {}
355    }
356
357    (cursor, members)
358}
359