ff_engine/scanner/
flow_projector.rs1use std::collections::HashMap;
28use std::time::Duration;
29
30use ff_core::keys::FlowIndexKeys;
31use ff_core::partition::{Partition, PartitionConfig, PartitionFamily};
32
33use super::{ScanResult, Scanner};
34
/// Number of member ids requested from `SRANDMEMBER` when a flow's members
/// are sampled for its summary.
const BATCH_SIZE: usize = 50;
36
37pub struct FlowProjector {
38 interval: Duration,
39 partition_config: PartitionConfig,
40}
41
42impl FlowProjector {
43 pub fn new(interval: Duration, partition_config: PartitionConfig) -> Self {
44 Self {
45 interval,
46 partition_config,
47 }
48 }
49}
50
51impl Scanner for FlowProjector {
52 fn name(&self) -> &'static str {
53 "flow_projector"
54 }
55
56 fn interval(&self) -> Duration {
57 self.interval
58 }
59
60 async fn scan_partition(
61 &self,
62 client: &ferriskey::Client,
63 partition: u16,
64 ) -> ScanResult {
65 let p = Partition {
66 family: PartitionFamily::Flow,
67 index: partition,
68 };
69 let tag = p.hash_tag();
70 let fidx = FlowIndexKeys::new(&p);
71 let flow_index_key = fidx.flow_index();
72
73 let now_ms = match crate::scanner::lease_expiry::server_time_ms(client).await {
74 Ok(t) => t,
75 Err(e) => {
76 tracing::warn!(partition, error = %e, "flow_projector: failed to get server time");
77 return ScanResult { processed: 0, errors: 1 };
78 }
79 };
80
81 let mut processed: u32 = 0;
82 let mut errors: u32 = 0;
83 let mut cursor = "0".to_string();
84
85 loop {
91 let result: ferriskey::Value = match client
92 .cmd("SSCAN")
93 .arg(&flow_index_key)
94 .arg(cursor.as_str())
95 .arg("COUNT")
96 .arg("100")
97 .execute()
98 .await
99 {
100 Ok(v) => v,
101 Err(e) => {
102 tracing::warn!(partition, error = %e, "flow_projector: SSCAN failed");
103 return ScanResult { processed, errors: errors + 1 };
104 }
105 };
106
107 let (next_cursor, flow_ids) = parse_sscan_response(&result);
108
109 for fid_str in &flow_ids {
110 match project_flow_summary(
111 client, &tag, &flow_index_key, fid_str, now_ms, &self.partition_config,
112 ).await {
113 Ok(true) => processed += 1,
114 Ok(false) => {} Err(e) => {
116 tracing::warn!(
117 partition,
118 flow_id = fid_str.as_str(),
119 error = %e,
120 "flow_projector: projection failed"
121 );
122 errors += 1;
123 }
124 }
125 }
126
127 cursor = next_cursor;
128 if cursor == "0" {
129 break;
130 }
131 }
132
133 ScanResult { processed, errors }
134 }
135}
136
137async fn project_flow_summary(
139 client: &ferriskey::Client,
140 tag: &str,
141 flow_index_key: &str,
142 fid_str: &str,
143 now_ms: u64,
144 config: &PartitionConfig,
145) -> Result<bool, ferriskey::Error> {
146 let core_key = format!("ff:flow:{}:{}:core", tag, fid_str);
147 let members_key = format!("ff:flow:{}:{}:members", tag, fid_str);
148 let summary_key = format!("ff:flow:{}:{}:summary", tag, fid_str);
149
150 let core_exists: bool = client.exists(&core_key).await.unwrap_or(true);
153 if !core_exists {
154 let _: Option<i64> = client
155 .cmd("SREM")
156 .arg(flow_index_key)
157 .arg(fid_str)
158 .execute()
159 .await
160 .unwrap_or(None);
161 return Ok(false);
162 }
163
164 let true_total: u64 = client
166 .cmd("SCARD")
167 .arg(&members_key)
168 .execute()
169 .await
170 .unwrap_or(0);
171
172 if true_total == 0 {
173 return Ok(false);
174 }
175
176 let member_eids: Vec<String> = client
180 .cmd("SRANDMEMBER")
181 .arg(&members_key)
182 .arg(BATCH_SIZE.to_string().as_str())
183 .execute()
184 .await
185 .unwrap_or_default();
186
187 if member_eids.is_empty() {
188 return Ok(false);
189 }
190
191 let mut counts: HashMap<String, u32> = HashMap::new();
193 let mut sampled: u32 = 0;
194
195 for eid_str in &member_eids {
196 let eid = match ff_core::types::ExecutionId::parse(eid_str) {
197 Ok(id) => id,
198 Err(_) => continue,
199 };
200 let partition = ff_core::partition::execution_partition(&eid, config);
201 let ctx_tag = partition.hash_tag();
202 let core_key = format!("ff:exec:{}:{}:core", ctx_tag, eid_str);
203
204 let ps: Option<String> = client
205 .cmd("HGET")
206 .arg(&core_key)
207 .arg("public_state")
208 .execute()
209 .await
210 .unwrap_or(None);
211
212 let state = ps.unwrap_or_else(|| "unknown".to_string());
213 *counts.entry(state).or_insert(0) += 1;
214 sampled += 1;
215 }
216
217 let completed = *counts.get("completed").unwrap_or(&0);
219 let skipped = *counts.get("skipped").unwrap_or(&0);
220 let failed = *counts.get("failed").unwrap_or(&0);
221 let cancelled = *counts.get("cancelled").unwrap_or(&0);
222 let expired = *counts.get("expired").unwrap_or(&0);
223 let active = *counts.get("active").unwrap_or(&0);
224 let suspended = *counts.get("suspended").unwrap_or(&0);
225 let waiting = *counts.get("waiting").unwrap_or(&0);
226 let delayed = *counts.get("delayed").unwrap_or(&0);
227 let rate_limited = *counts.get("rate_limited").unwrap_or(&0);
228 let waiting_children = *counts.get("waiting_children").unwrap_or(&0);
229
230 let terminal_count = completed + skipped + failed + cancelled + expired;
231 let all_terminal = terminal_count == sampled && sampled > 0;
232
233 let flow_state = if all_terminal {
234 if failed > 0 || cancelled > 0 || expired > 0 {
235 "failed"
236 } else {
237 "completed"
238 }
239 } else if active > 0 {
240 "running"
241 } else if suspended > 0 || delayed > 0 || rate_limited > 0 || waiting_children > 0 {
242 "blocked"
243 } else {
244 "open"
245 };
246
247 let _: () = client
249 .cmd("HSET")
250 .arg(&summary_key)
251 .arg("total_members").arg(true_total.to_string().as_str())
252 .arg("sampled_members").arg(sampled.to_string().as_str())
253 .arg("members_completed").arg(completed.to_string().as_str())
254 .arg("members_failed").arg(failed.to_string().as_str())
255 .arg("members_cancelled").arg(cancelled.to_string().as_str())
256 .arg("members_expired").arg(expired.to_string().as_str())
257 .arg("members_skipped").arg(skipped.to_string().as_str())
258 .arg("members_active").arg(active.to_string().as_str())
259 .arg("members_suspended").arg(suspended.to_string().as_str())
260 .arg("members_waiting").arg(waiting.to_string().as_str())
261 .arg("members_delayed").arg(delayed.to_string().as_str())
262 .arg("members_rate_limited").arg(rate_limited.to_string().as_str())
263 .arg("members_waiting_children").arg(waiting_children.to_string().as_str())
264 .arg("public_flow_state").arg(flow_state)
265 .arg("last_summary_update_at").arg(now_ms.to_string().as_str())
266 .execute()
267 .await?;
268
269 if all_terminal && (sampled as u64) == true_total {
284 let _: Option<i64> = client
285 .cmd("SREM")
286 .arg(flow_index_key)
287 .arg(fid_str)
288 .execute()
289 .await
290 .unwrap_or(None);
291 }
292
293 Ok(true)
294}
295
296fn parse_sscan_response(val: &ferriskey::Value) -> (String, Vec<String>) {
300 let arr = match val {
301 ferriskey::Value::Array(a) if a.len() >= 2 => a,
302 _ => return ("0".to_string(), vec![]),
303 };
304
305 let cursor = match &arr[0] {
306 Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
307 Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
308 _ => return ("0".to_string(), vec![]),
309 };
310
311 let mut members = Vec::new();
312 match &arr[1] {
313 Ok(ferriskey::Value::Array(inner)) => {
314 for item in inner {
315 if let Ok(ferriskey::Value::BulkString(b)) = item {
316 members.push(String::from_utf8_lossy(b).into_owned());
317 }
318 }
319 }
320 Ok(ferriskey::Value::Set(inner)) => {
321 for item in inner {
322 if let ferriskey::Value::BulkString(b) = item {
323 members.push(String::from_utf8_lossy(b).into_owned());
324 }
325 }
326 }
327 _ => {}
328 }
329
330 (cursor, members)
331}
332