// ff_engine/scanner/flow_projector.rs

1//! Flow summary projector scanner.
2//!
3//! Scans flow partitions ({fp:N}). For each flow: samples up to BATCH_SIZE
4//! member executions (SRANDMEMBER), reads each member's public_state
5//! (cross-partition), and updates the flow summary hash with aggregate
6//! counts and derived public_flow_state.
7//!
8//! Interval: 15s (catchup mode — event-driven in production).
9//!
//! Cluster-safe: walks a partition-level index SET with SSCAN instead of keyspace SCAN.
11//!
12//! # Two sources of `public_flow_state`
13//!
14//! This scanner writes a DERIVED `public_flow_state` field to the flow
15//! summary hash (`ff:flow:{fp:N}:<flow_id>:summary`). It does NOT touch
16//! `flow_core.public_flow_state` — that field is owned exclusively by
17//! `ff_create_flow` and `ff_cancel_flow` and is the authoritative
18//! state used for mutation guards (e.g. `ff_add_execution_to_flow`
19//! rejects adds when `flow_core.public_flow_state` is terminal).
20//!
21//! Consumer guidance:
22//! - Mutation-guard / authoritative state → read `flow_core.public_flow_state`.
23//! - Dashboards / projected rollups → read `:summary.public_flow_state`.
24//!
25//! Reference: RFC-007 §Flow Summary Projection, RFC-010 §6.7
26
27use std::collections::HashMap;
28use std::time::Duration;
29
30use ff_core::keys::FlowIndexKeys;
31use ff_core::partition::{Partition, PartitionConfig, PartitionFamily};
32
33use super::{ScanResult, Scanner};
34
/// Maximum number of member executions sampled per flow per cycle
/// (passed to SRANDMEMBER; bounds per-flow memory and command cost).
const BATCH_SIZE: usize = 50;
36
/// Scanner that projects per-flow summary hashes on flow ({fp:N}) partitions.
pub struct FlowProjector {
    /// How often `scan_partition` is scheduled (15s catchup cadence per module docs).
    interval: Duration,
    /// Partition layout; used to resolve each member execution's own {p:N}
    /// partition for the cross-partition public_state reads.
    partition_config: PartitionConfig,
}
41
42impl FlowProjector {
43    pub fn new(interval: Duration, partition_config: PartitionConfig) -> Self {
44        Self {
45            interval,
46            partition_config,
47        }
48    }
49}
50
impl Scanner for FlowProjector {
    /// Stable scanner identifier used in logs and scheduling.
    fn name(&self) -> &'static str {
        "flow_projector"
    }

    /// Cadence configured at construction.
    fn interval(&self) -> Duration {
        self.interval
    }

    /// Project summaries for every flow indexed under one {fp:N} partition.
    ///
    /// Walks the partition's flow_index SET with SSCAN (COUNT 100) and
    /// projects each flow via `project_flow_summary`. A per-flow failure is
    /// logged and counted but does not abort the walk; a failed SSCAN or
    /// server-time fetch aborts the partition scan, reporting whatever was
    /// processed so far.
    async fn scan_partition(
        &self,
        client: &ferriskey::Client,
        partition: u16,
    ) -> ScanResult {
        let p = Partition {
            family: PartitionFamily::Flow,
            index: partition,
        };
        let tag = p.hash_tag();
        let fidx = FlowIndexKeys::new(&p);
        let flow_index_key = fidx.flow_index();

        // Use server time (not the local clock) so last_summary_update_at
        // is consistent across scanner hosts.
        let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
            Ok(t) => t,
            Err(e) => {
                tracing::warn!(partition, error = %e, "flow_projector: failed to get server time");
                return ScanResult { processed: 0, errors: 1 };
            }
        };

        let mut processed: u32 = 0;
        let mut errors: u32 = 0;
        // SSCAN cursor protocol: start at "0"; the walk is complete when
        // the server hands "0" back.
        let mut cursor = "0".to_string();

        // Stream flow_index in SSCAN batches (COUNT 100) instead of a
        // single SMEMBERS. SMEMBERS on a partition with many flows would
        // materialise every flow_id into one Vec<String> before we could
        // project the first one; SSCAN bounds memory to one batch and
        // keeps the Valkey command's server-side work bounded per call.
        loop {
            let result: ferriskey::Value = match client
                .cmd("SSCAN")
                .arg(&flow_index_key)
                .arg(cursor.as_str())
                .arg("COUNT")
                .arg("100")
                .execute()
                .await
            {
                Ok(v) => v,
                Err(e) => {
                    tracing::warn!(partition, error = %e, "flow_projector: SSCAN failed");
                    return ScanResult { processed, errors: errors + 1 };
                }
            };

            let (next_cursor, flow_ids) = parse_sscan_response(&result);

            for fid_str in &flow_ids {
                match project_flow_summary(
                    client, &tag, &flow_index_key, fid_str, now_ms, &self.partition_config,
                ).await {
                    Ok(true) => processed += 1,
                    Ok(false) => {} // no members or already up-to-date
                    Err(e) => {
                        tracing::warn!(
                            partition,
                            flow_id = fid_str.as_str(),
                            error = %e,
                            "flow_projector: projection failed"
                        );
                        errors += 1;
                    }
                }
            }

            cursor = next_cursor;
            if cursor == "0" {
                break;
            }
        }

        ScanResult { processed, errors }
    }
}
136
/// Project the summary for one flow. Returns Ok(true) if updated.
///
/// Steps (all best-effort except the final HSET):
///   1. Prune the index entry if the flow's core hash no longer exists.
///   2. SCARD the members SET for the true membership count.
///   3. SRANDMEMBER up to BATCH_SIZE member execution IDs and HGET each
///      member's `public_state` from that member's own partition.
///   4. Derive a projected `public_flow_state` from the sampled counts and
///      HSET the `:summary` hash.
///   5. SREM the index entry once EVERY member has been observed terminal.
///
/// # Arguments
/// * `client` – Valkey client used for all commands.
/// * `tag` – this flow partition's hash tag.
/// * `flow_index_key` – partition-level SET of flow IDs being scanned.
/// * `fid_str` – the flow ID to project.
/// * `now_ms` – server time, written as `last_summary_update_at`.
/// * `config` – partition config for resolving member execution partitions.
///
/// # Errors
/// Only the summary HSET propagates an error; every read and prune uses
/// `unwrap_or*` defaults so a transient failure degrades to "skip this
/// cycle" rather than aborting the partition walk.
async fn project_flow_summary(
    client: &ferriskey::Client,
    tag: &str,
    flow_index_key: &str,
    fid_str: &str,
    now_ms: u64,
    config: &PartitionConfig,
) -> Result<bool, ferriskey::Error> {
    let core_key = format!("ff:flow:{}:{}:core", tag, fid_str);
    let members_key = format!("ff:flow:{}:{}:members", tag, fid_str);
    let summary_key = format!("ff:flow:{}:{}:summary", tag, fid_str);

    // Defensive prune: index entry for a flow whose core is gone (manual
    // delete / retention purge) — drop it so the flow_index SET stays
    // accurate. On EXISTS error we default to `true` (assume alive) so a
    // transient failure can never delete a live flow's index entry.
    let core_exists: bool = client.exists(&core_key).await.unwrap_or(true);
    if !core_exists {
        let _: Option<i64> = client
            .cmd("SREM")
            .arg(flow_index_key)
            .arg(fid_str)
            .execute()
            .await
            .unwrap_or(None);
        return Ok(false);
    }

    // Get true membership count for accurate total_members reporting.
    // An SCARD error defaults to 0, which skips projection this cycle.
    let true_total: u64 = client
        .cmd("SCARD")
        .arg(&members_key)
        .execute()
        .await
        .unwrap_or(0);

    if true_total == 0 {
        return Ok(false);
    }

    // Sample up to BATCH_SIZE member execution IDs (avoids loading the
    // entire membership set into memory for large flows).
    // SRANDMEMBER key count returns up to `count` distinct members.
    let member_eids: Vec<String> = client
        .cmd("SRANDMEMBER")
        .arg(&members_key)
        .arg(BATCH_SIZE.to_string().as_str())
        .execute()
        .await
        .unwrap_or_default();

    if member_eids.is_empty() {
        return Ok(false);
    }

    // Count public_state for each sampled member (cross-partition reads).
    // `sampled` counts only members whose ID parsed; unparseable IDs are
    // skipped, which also (correctly) blocks the terminal prune below,
    // since sampled can then never equal true_total.
    let mut counts: HashMap<String, u32> = HashMap::new();
    let mut sampled: u32 = 0;

    for eid_str in &member_eids {
        let eid = match ff_core::types::ExecutionId::parse(eid_str) {
            Ok(id) => id,
            Err(_) => continue,
        };
        // Member executions live on their own partitions, not this flow's
        // {fp:N} — resolve the member's partition before building its key.
        let partition = ff_core::partition::execution_partition(&eid, config);
        let ctx_tag = partition.hash_tag();
        let core_key = format!("ff:exec:{}:{}:core", ctx_tag, eid_str);

        let ps: Option<String> = client
            .cmd("HGET")
            .arg(&core_key)
            .arg("public_state")
            .execute()
            .await
            .unwrap_or(None);

        // Missing hash/field (or HGET error) buckets as "unknown", which is
        // non-terminal and therefore keeps the flow in the index.
        let state = ps.unwrap_or_else(|| "unknown".to_string());
        *counts.entry(state).or_insert(0) += 1;
        sampled += 1;
    }

    // Derive public_flow_state from sample
    let completed = *counts.get("completed").unwrap_or(&0);
    let skipped = *counts.get("skipped").unwrap_or(&0);
    let failed = *counts.get("failed").unwrap_or(&0);
    let cancelled = *counts.get("cancelled").unwrap_or(&0);
    let expired = *counts.get("expired").unwrap_or(&0);
    let active = *counts.get("active").unwrap_or(&0);
    let suspended = *counts.get("suspended").unwrap_or(&0);
    let waiting = *counts.get("waiting").unwrap_or(&0);
    let delayed = *counts.get("delayed").unwrap_or(&0);
    let rate_limited = *counts.get("rate_limited").unwrap_or(&0);
    let waiting_children = *counts.get("waiting_children").unwrap_or(&0);

    let terminal_count = completed + skipped + failed + cancelled + expired;
    let all_terminal = terminal_count == sampled && sampled > 0;

    // Precedence: all-terminal beats everything; any active member means
    // "running"; any parked member means "blocked"; otherwise "open".
    // NOTE(review): `waiting` is written to the summary below but is NOT
    // part of the "blocked" derivation here, so a purely-waiting sample
    // projects "open" — confirm against RFC-007 that this is intentional.
    let flow_state = if all_terminal {
        if failed > 0 || cancelled > 0 || expired > 0 {
            "failed"
        } else {
            "completed"
        }
    } else if active > 0 {
        "running"
    } else if suspended > 0 || delayed > 0 || rate_limited > 0 || waiting_children > 0 {
        "blocked"
    } else {
        "open"
    };

    // Update summary hash — the only fallible step that propagates (`?`).
    let _: () = client
        .cmd("HSET")
        .arg(&summary_key)
        .arg("total_members").arg(true_total.to_string().as_str())
        .arg("sampled_members").arg(sampled.to_string().as_str())
        .arg("members_completed").arg(completed.to_string().as_str())
        .arg("members_failed").arg(failed.to_string().as_str())
        .arg("members_cancelled").arg(cancelled.to_string().as_str())
        .arg("members_expired").arg(expired.to_string().as_str())
        .arg("members_skipped").arg(skipped.to_string().as_str())
        .arg("members_active").arg(active.to_string().as_str())
        .arg("members_suspended").arg(suspended.to_string().as_str())
        .arg("members_waiting").arg(waiting.to_string().as_str())
        .arg("members_delayed").arg(delayed.to_string().as_str())
        .arg("members_rate_limited").arg(rate_limited.to_string().as_str())
        .arg("members_waiting_children").arg(waiting_children.to_string().as_str())
        .arg("public_flow_state").arg(flow_state)
        .arg("last_summary_update_at").arg(now_ms.to_string().as_str())
        .execute()
        .await?;

    // Prune the index entry only when we've observed EVERY member in this
    // cycle and all of them are terminal. Gating on the full walk (not the
    // sample) matters for two reasons:
    //   1. For flows larger than BATCH_SIZE, a sample being "all terminal"
    //      does not imply the flow is actually done; unsampled members
    //      may still be running. Pruning on the sample would freeze the
    //      summary mid-flight.
    //   2. ff_replay_execution runs on {p:N}, so it cannot re-SADD the
    //      {fp:N} flow_index when a terminal flow member is revived.
    //      If we SREM while any revival path is reachable, the flow never
    //      comes back into the projector's view.
    // For large flows that never satisfy sampled == true_total, the
    // defensive core-missing prune above and retention deletion still clean
    // up eventually. We accept a modest cardinality cost for correctness.
    if all_terminal && (sampled as u64) == true_total {
        let _: Option<i64> = client
            .cmd("SREM")
            .arg(flow_index_key)
            .arg(fid_str)
            .execute()
            .await
            .unwrap_or(None);
    }

    Ok(true)
}
295
/// Parse an SSCAN reply `[cursor, [member1, member2, ...]]` into
/// `(cursor, Vec<member>)`. Mirrors the helper in quota_reconciler so
/// both scanners agree on the wire shape.
///
/// Malformed replies degrade to `("0", vec![])`, which the caller treats
/// as "walk complete, nothing to project" — never an error.
fn parse_sscan_response(val: &ferriskey::Value) -> (String, Vec<String>) {
    // Top level must be a 2-element array: [cursor, members].
    let arr = match val {
        ferriskey::Value::Array(a) if a.len() >= 2 => a,
        _ => return ("0".to_string(), vec![]),
    };

    // The cursor may arrive as a bulk or simple string depending on the
    // protocol version; anything else aborts the parse.
    let cursor = match &arr[0] {
        Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
        Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
        _ => return ("0".to_string(), vec![]),
    };

    let mut members = Vec::new();
    // NOTE(review): the Array arm matches items as `Ok(Value::BulkString)`
    // (element wrapped in a Result) while the Set arm matches a bare
    // `Value::BulkString`. Verify against ferriskey's Value definition that
    // the two container variants really carry differently-typed elements,
    // as this code assumes; if not, one arm silently drops every member.
    match &arr[1] {
        Ok(ferriskey::Value::Array(inner)) => {
            for item in inner {
                if let Ok(ferriskey::Value::BulkString(b)) = item {
                    members.push(String::from_utf8_lossy(b).into_owned());
                }
            }
        }
        Ok(ferriskey::Value::Set(inner)) => {
            for item in inner {
                if let ferriskey::Value::BulkString(b) = item {
                    members.push(String::from_utf8_lossy(b).into_owned());
                }
            }
        }
        _ => {}
    }

    (cursor, members)
}
332