ff_engine/scanner/
flow_projector.rs1use std::collections::HashMap;
28use std::time::Duration;
29
30use ff_core::backend::ScannerFilter;
31use ff_core::keys::FlowIndexKeys;
32use ff_core::partition::{Partition, PartitionConfig, PartitionFamily};
33
34use super::{ScanResult, Scanner};
35
/// Maximum number of member executions sampled (via `SRANDMEMBER <key> <count>`)
/// when projecting a single flow's summary.
const BATCH_SIZE: usize = 50;
37
38pub struct FlowProjector {
39 interval: Duration,
40 partition_config: PartitionConfig,
41 filter: ScannerFilter,
44}
45
46impl FlowProjector {
47 pub fn new(interval: Duration, partition_config: PartitionConfig) -> Self {
48 Self::with_filter(interval, partition_config, ScannerFilter::default())
49 }
50
51 pub fn with_filter(
62 interval: Duration,
63 partition_config: PartitionConfig,
64 filter: ScannerFilter,
65 ) -> Self {
66 Self {
67 interval,
68 partition_config,
69 filter,
70 }
71 }
72}
73
74impl Scanner for FlowProjector {
75 fn name(&self) -> &'static str {
76 "flow_projector"
77 }
78
79 fn interval(&self) -> Duration {
80 self.interval
81 }
82
83 fn filter(&self) -> &ScannerFilter {
84 &self.filter
85 }
86
87 async fn scan_partition(
88 &self,
89 client: &ferriskey::Client,
90 partition: u16,
91 ) -> ScanResult {
92 let p = Partition {
93 family: PartitionFamily::Flow,
94 index: partition,
95 };
96 let tag = p.hash_tag();
97 let fidx = FlowIndexKeys::new(&p);
98 let flow_index_key = fidx.flow_index();
99
100 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
101 Ok(t) => t,
102 Err(e) => {
103 tracing::warn!(partition, error = %e, "flow_projector: failed to get server time");
104 return ScanResult { processed: 0, errors: 1 };
105 }
106 };
107
108 let mut processed: u32 = 0;
109 let mut errors: u32 = 0;
110 let mut cursor = "0".to_string();
111
112 loop {
118 let result: ferriskey::Value = match client
119 .cmd("SSCAN")
120 .arg(&flow_index_key)
121 .arg(cursor.as_str())
122 .arg("COUNT")
123 .arg("100")
124 .execute()
125 .await
126 {
127 Ok(v) => v,
128 Err(e) => {
129 tracing::warn!(partition, error = %e, "flow_projector: SSCAN failed");
130 return ScanResult { processed, errors: errors + 1 };
131 }
132 };
133
134 let (next_cursor, flow_ids) = parse_sscan_response(&result);
135
136 for fid_str in &flow_ids {
137 match project_flow_summary(
138 client, &tag, &flow_index_key, fid_str, now_ms, &self.partition_config,
139 ).await {
140 Ok(true) => processed += 1,
141 Ok(false) => {} Err(e) => {
143 tracing::warn!(
144 partition,
145 flow_id = fid_str.as_str(),
146 error = %e,
147 "flow_projector: projection failed"
148 );
149 errors += 1;
150 }
151 }
152 }
153
154 cursor = next_cursor;
155 if cursor == "0" {
156 break;
157 }
158 }
159
160 ScanResult { processed, errors }
161 }
162}
163
164async fn project_flow_summary(
166 client: &ferriskey::Client,
167 tag: &str,
168 flow_index_key: &str,
169 fid_str: &str,
170 now_ms: u64,
171 config: &PartitionConfig,
172) -> Result<bool, ferriskey::Error> {
173 let core_key = format!("ff:flow:{}:{}:core", tag, fid_str);
174 let members_key = format!("ff:flow:{}:{}:members", tag, fid_str);
175 let summary_key = format!("ff:flow:{}:{}:summary", tag, fid_str);
176
177 let core_exists: bool = client.exists(&core_key).await.unwrap_or(true);
180 if !core_exists {
181 let _: Option<i64> = client
182 .cmd("SREM")
183 .arg(flow_index_key)
184 .arg(fid_str)
185 .execute()
186 .await
187 .unwrap_or(None);
188 return Ok(false);
189 }
190
191 let true_total: u64 = client
193 .cmd("SCARD")
194 .arg(&members_key)
195 .execute()
196 .await
197 .unwrap_or(0);
198
199 if true_total == 0 {
200 return Ok(false);
201 }
202
203 let member_eids: Vec<String> = client
207 .cmd("SRANDMEMBER")
208 .arg(&members_key)
209 .arg(BATCH_SIZE.to_string().as_str())
210 .execute()
211 .await
212 .unwrap_or_default();
213
214 if member_eids.is_empty() {
215 return Ok(false);
216 }
217
218 let mut counts: HashMap<String, u32> = HashMap::new();
220 let mut sampled: u32 = 0;
221
222 for eid_str in &member_eids {
223 let eid = match ff_core::types::ExecutionId::parse(eid_str) {
224 Ok(id) => id,
225 Err(_) => continue,
226 };
227 let partition = ff_core::partition::execution_partition(&eid, config);
228 let ctx_tag = partition.hash_tag();
229 let core_key = format!("ff:exec:{}:{}:core", ctx_tag, eid_str);
230
231 let ps: Option<String> = client
232 .cmd("HGET")
233 .arg(&core_key)
234 .arg("public_state")
235 .execute()
236 .await
237 .unwrap_or(None);
238
239 let state = ps.unwrap_or_else(|| "unknown".to_string());
240 *counts.entry(state).or_insert(0) += 1;
241 sampled += 1;
242 }
243
244 let completed = *counts.get("completed").unwrap_or(&0);
246 let skipped = *counts.get("skipped").unwrap_or(&0);
247 let failed = *counts.get("failed").unwrap_or(&0);
248 let cancelled = *counts.get("cancelled").unwrap_or(&0);
249 let expired = *counts.get("expired").unwrap_or(&0);
250 let active = *counts.get("active").unwrap_or(&0);
251 let suspended = *counts.get("suspended").unwrap_or(&0);
252 let waiting = *counts.get("waiting").unwrap_or(&0);
253 let delayed = *counts.get("delayed").unwrap_or(&0);
254 let rate_limited = *counts.get("rate_limited").unwrap_or(&0);
255 let waiting_children = *counts.get("waiting_children").unwrap_or(&0);
256
257 let terminal_count = completed + skipped + failed + cancelled + expired;
258 let all_terminal = terminal_count == sampled && sampled > 0;
259
260 let flow_state = if all_terminal {
261 if failed > 0 || cancelled > 0 || expired > 0 {
262 "failed"
263 } else {
264 "completed"
265 }
266 } else if active > 0 {
267 "running"
268 } else if suspended > 0 || delayed > 0 || rate_limited > 0 || waiting_children > 0 {
269 "blocked"
270 } else {
271 "open"
272 };
273
274 let _: () = client
276 .cmd("HSET")
277 .arg(&summary_key)
278 .arg("total_members").arg(true_total.to_string().as_str())
279 .arg("sampled_members").arg(sampled.to_string().as_str())
280 .arg("members_completed").arg(completed.to_string().as_str())
281 .arg("members_failed").arg(failed.to_string().as_str())
282 .arg("members_cancelled").arg(cancelled.to_string().as_str())
283 .arg("members_expired").arg(expired.to_string().as_str())
284 .arg("members_skipped").arg(skipped.to_string().as_str())
285 .arg("members_active").arg(active.to_string().as_str())
286 .arg("members_suspended").arg(suspended.to_string().as_str())
287 .arg("members_waiting").arg(waiting.to_string().as_str())
288 .arg("members_delayed").arg(delayed.to_string().as_str())
289 .arg("members_rate_limited").arg(rate_limited.to_string().as_str())
290 .arg("members_waiting_children").arg(waiting_children.to_string().as_str())
291 .arg("public_flow_state").arg(flow_state)
292 .arg("last_summary_update_at").arg(now_ms.to_string().as_str())
293 .execute()
294 .await?;
295
296 if all_terminal && (sampled as u64) == true_total {
311 let _: Option<i64> = client
312 .cmd("SREM")
313 .arg(flow_index_key)
314 .arg(fid_str)
315 .execute()
316 .await
317 .unwrap_or(None);
318 }
319
320 Ok(true)
321}
322
323fn parse_sscan_response(val: &ferriskey::Value) -> (String, Vec<String>) {
327 let arr = match val {
328 ferriskey::Value::Array(a) if a.len() >= 2 => a,
329 _ => return ("0".to_string(), vec![]),
330 };
331
332 let cursor = match &arr[0] {
333 Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
334 Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
335 _ => return ("0".to_string(), vec![]),
336 };
337
338 let mut members = Vec::new();
339 match &arr[1] {
340 Ok(ferriskey::Value::Array(inner)) => {
341 for item in inner {
342 if let Ok(ferriskey::Value::BulkString(b)) = item {
343 members.push(String::from_utf8_lossy(b).into_owned());
344 }
345 }
346 }
347 Ok(ferriskey::Value::Set(inner)) => {
348 for item in inner {
349 if let ferriskey::Value::BulkString(b) = item {
350 members.push(String::from_utf8_lossy(b).into_owned());
351 }
352 }
353 }
354 _ => {}
355 }
356
357 (cursor, members)
358}
359