1use std::sync::Arc;
29use std::time::Duration;
30
31use ff_core::backend::ScannerFilter;
32use ff_core::engine_backend::EngineBackend;
33use ff_core::engine_error::EngineError;
34use ff_core::keys::FlowIndexKeys;
35use ff_core::partition::{Partition, PartitionConfig, PartitionFamily};
36use ff_core::types::{FlowId, TimestampMs};
37
38use super::{ScanResult, Scanner};
39
/// Scanner that walks a flow partition's flow index and refreshes the summary
/// of every flow id it finds there.
pub struct FlowProjector {
    /// Value reported by `Scanner::interval`.
    interval: Duration,
    /// Handed to the per-flow projection step, which uses it on the direct
    /// (no-backend) path to resolve the partition of each member execution.
    partition_config: PartitionConfig,
    /// Value reported by `Scanner::filter`.
    filter: ScannerFilter,
    /// When `Some`, server time and flow projection are delegated to this
    /// backend; when `None`, the scanner issues commands on the client itself.
    backend: Option<Arc<dyn EngineBackend>>,
}
48
49impl FlowProjector {
50 pub fn new(interval: Duration, partition_config: PartitionConfig) -> Self {
51 Self::with_filter(interval, partition_config, ScannerFilter::default())
52 }
53
54 pub fn with_filter(
65 interval: Duration,
66 partition_config: PartitionConfig,
67 filter: ScannerFilter,
68 ) -> Self {
69 Self {
70 interval,
71 partition_config,
72 filter,
73 backend: None,
74 }
75 }
76
77 pub fn with_filter_and_backend(
82 interval: Duration,
83 partition_config: PartitionConfig,
84 filter: ScannerFilter,
85 backend: Arc<dyn EngineBackend>,
86 ) -> Self {
87 Self {
88 interval,
89 partition_config,
90 filter,
91 backend: Some(backend),
92 }
93 }
94}
95
96impl Scanner for FlowProjector {
97 fn name(&self) -> &'static str {
98 "flow_projector"
99 }
100
101 fn interval(&self) -> Duration {
102 self.interval
103 }
104
105 fn filter(&self) -> &ScannerFilter {
106 &self.filter
107 }
108
109 async fn scan_partition(
110 &self,
111 client: &ferriskey::Client,
112 partition: u16,
113 ) -> ScanResult {
114 let p = Partition {
115 family: PartitionFamily::Flow,
116 index: partition,
117 };
118 let fidx = FlowIndexKeys::new(&p);
119 let flow_index_key = fidx.flow_index();
120
121 let now_ms_res: Result<u64, String> = if let Some(ref b) = self.backend {
122 b.server_time_ms().await.map_err(|e| e.to_string())
123 } else {
124 crate::scanner::lease_expiry::server_time_ms_legacy(client).await.map_err(|e| e.to_string())
125 };
126 let now_ms = match now_ms_res {
127 Ok(t) => t,
128 Err(e) => {
129 tracing::warn!(partition, error = %e, "flow_projector: failed to get server time");
130 return ScanResult { processed: 0, errors: 1 };
131 }
132 };
133 let now_ts = TimestampMs::from_millis(now_ms as i64);
134
135 let mut processed: u32 = 0;
136 let mut errors: u32 = 0;
137 let mut cursor = "0".to_string();
138
139 loop {
152 let result: ferriskey::Value = match client
153 .cmd("SSCAN")
154 .arg(&flow_index_key)
155 .arg(cursor.as_str())
156 .arg("COUNT")
157 .arg("100")
158 .execute()
159 .await
160 {
161 Ok(v) => v,
162 Err(e) => {
163 tracing::warn!(partition, error = %e, "flow_projector: SSCAN failed");
164 return ScanResult { processed, errors: errors + 1 };
165 }
166 };
167
168 let (next_cursor, flow_ids) = parse_sscan_response(&result);
169
170 for fid_str in &flow_ids {
171 let res = project_one_flow(
172 client,
173 self.backend.as_ref(),
174 &p,
175 &self.partition_config,
176 fid_str,
177 now_ts,
178 )
179 .await;
180 match res {
181 Ok(true) => processed += 1,
182 Ok(false) => {} Err(e) => {
184 tracing::warn!(
185 partition,
186 flow_id = fid_str.as_str(),
187 error = %e,
188 "flow_projector: projection failed"
189 );
190 errors += 1;
191 }
192 }
193 }
194
195 cursor = next_cursor;
196 if cursor == "0" {
197 break;
198 }
199 }
200
201 ScanResult { processed, errors }
202 }
203}
204
205async fn project_one_flow(
211 client: &ferriskey::Client,
212 backend: Option<&Arc<dyn EngineBackend>>,
213 partition: &Partition,
214 partition_config: &PartitionConfig,
215 fid_str: &str,
216 now_ms: TimestampMs,
217) -> Result<bool, String> {
218 let flow_id = match FlowId::parse(fid_str) {
219 Ok(id) => id,
220 Err(e) => {
221 return Err(format!("malformed flow_id {fid_str:?}: {e}"));
222 }
223 };
224
225 if let Some(backend_arc) = backend {
226 return backend_arc
227 .project_flow_summary(*partition, &flow_id, now_ms)
228 .await
229 .map_err(|e: EngineError| e.to_string());
230 }
231
232 project_direct_fallback(client, partition, partition_config, fid_str, now_ms)
236 .await
237 .map_err(|e| e.to_string())
238}
239
240async fn project_direct_fallback(
244 client: &ferriskey::Client,
245 partition: &Partition,
246 config: &PartitionConfig,
247 fid_str: &str,
248 now_ms: TimestampMs,
249) -> Result<bool, ferriskey::Error> {
250 use std::collections::HashMap;
251
252 const BATCH_SIZE: usize = 50;
253 let tag = partition.hash_tag();
254 let fidx = FlowIndexKeys::new(partition);
255 let flow_index_key = fidx.flow_index();
256
257 let core_key = format!("ff:flow:{}:{}:core", tag, fid_str);
258 let members_key = format!("ff:flow:{}:{}:members", tag, fid_str);
259 let summary_key = format!("ff:flow:{}:{}:summary", tag, fid_str);
260
261 let core_exists: bool = client.exists(&core_key).await.unwrap_or(true);
262 if !core_exists {
263 let _: Option<i64> = client
264 .cmd("SREM")
265 .arg(&flow_index_key)
266 .arg(fid_str)
267 .execute()
268 .await
269 .unwrap_or(None);
270 return Ok(false);
271 }
272
273 let true_total: u64 = client
274 .cmd("SCARD")
275 .arg(&members_key)
276 .execute()
277 .await
278 .unwrap_or(0);
279 if true_total == 0 {
280 return Ok(false);
281 }
282
283 let member_eids: Vec<String> = client
284 .cmd("SRANDMEMBER")
285 .arg(&members_key)
286 .arg(BATCH_SIZE.to_string().as_str())
287 .execute()
288 .await
289 .unwrap_or_default();
290 if member_eids.is_empty() {
291 return Ok(false);
292 }
293
294 let mut counts: HashMap<String, u32> = HashMap::new();
295 let mut sampled: u32 = 0;
296 for eid_str in &member_eids {
297 let eid = match ff_core::types::ExecutionId::parse(eid_str) {
298 Ok(id) => id,
299 Err(_) => continue,
300 };
301 let member_partition = ff_core::partition::execution_partition(&eid, config);
302 let ctx_tag = member_partition.hash_tag();
303 let member_core = format!("ff:exec:{}:{}:core", ctx_tag, eid_str);
304
305 let ps: Option<String> = client
306 .cmd("HGET")
307 .arg(&member_core)
308 .arg("public_state")
309 .execute()
310 .await
311 .unwrap_or(None);
312 let state = ps.unwrap_or_else(|| "unknown".to_string());
313 *counts.entry(state).or_insert(0) += 1;
314 sampled += 1;
315 }
316
317 let completed = *counts.get("completed").unwrap_or(&0);
318 let skipped = *counts.get("skipped").unwrap_or(&0);
319 let failed = *counts.get("failed").unwrap_or(&0);
320 let cancelled = *counts.get("cancelled").unwrap_or(&0);
321 let expired = *counts.get("expired").unwrap_or(&0);
322 let active = *counts.get("active").unwrap_or(&0);
323 let suspended = *counts.get("suspended").unwrap_or(&0);
324 let waiting = *counts.get("waiting").unwrap_or(&0);
325 let delayed = *counts.get("delayed").unwrap_or(&0);
326 let rate_limited = *counts.get("rate_limited").unwrap_or(&0);
327 let waiting_children = *counts.get("waiting_children").unwrap_or(&0);
328 let terminal_count = completed + skipped + failed + cancelled + expired;
329 let all_terminal = terminal_count == sampled && sampled > 0;
330 let flow_state = if all_terminal {
331 if failed > 0 || cancelled > 0 || expired > 0 {
332 "failed"
333 } else {
334 "completed"
335 }
336 } else if active > 0 {
337 "running"
338 } else if suspended > 0 || delayed > 0 || rate_limited > 0 || waiting_children > 0 {
339 "blocked"
340 } else {
341 "open"
342 };
343
344 let now_s = now_ms.0.to_string();
345 let _: () = client
346 .cmd("HSET")
347 .arg(&summary_key)
348 .arg("total_members").arg(true_total.to_string().as_str())
349 .arg("sampled_members").arg(sampled.to_string().as_str())
350 .arg("members_completed").arg(completed.to_string().as_str())
351 .arg("members_failed").arg(failed.to_string().as_str())
352 .arg("members_cancelled").arg(cancelled.to_string().as_str())
353 .arg("members_expired").arg(expired.to_string().as_str())
354 .arg("members_skipped").arg(skipped.to_string().as_str())
355 .arg("members_active").arg(active.to_string().as_str())
356 .arg("members_suspended").arg(suspended.to_string().as_str())
357 .arg("members_waiting").arg(waiting.to_string().as_str())
358 .arg("members_delayed").arg(delayed.to_string().as_str())
359 .arg("members_rate_limited").arg(rate_limited.to_string().as_str())
360 .arg("members_waiting_children").arg(waiting_children.to_string().as_str())
361 .arg("public_flow_state").arg(flow_state)
362 .arg("last_summary_update_at").arg(now_s.as_str())
363 .execute()
364 .await?;
365
366 if all_terminal && (sampled as u64) == true_total {
367 let _: Option<i64> = client
368 .cmd("SREM")
369 .arg(&flow_index_key)
370 .arg(fid_str)
371 .execute()
372 .await
373 .unwrap_or(None);
374 }
375
376 Ok(true)
377}
378
379fn parse_sscan_response(val: &ferriskey::Value) -> (String, Vec<String>) {
383 let arr = match val {
384 ferriskey::Value::Array(a) if a.len() >= 2 => a,
385 _ => return ("0".to_string(), vec![]),
386 };
387
388 let cursor = match &arr[0] {
389 Ok(ferriskey::Value::BulkString(b)) => String::from_utf8_lossy(b).into_owned(),
390 Ok(ferriskey::Value::SimpleString(s)) => s.clone(),
391 _ => return ("0".to_string(), vec![]),
392 };
393
394 let mut members = Vec::new();
395 match &arr[1] {
396 Ok(ferriskey::Value::Array(inner)) => {
397 for item in inner {
398 if let Ok(ferriskey::Value::BulkString(b)) = item {
399 members.push(String::from_utf8_lossy(b).into_owned());
400 }
401 }
402 }
403 Ok(ferriskey::Value::Set(inner)) => {
404 for item in inner {
405 if let ferriskey::Value::BulkString(b) = item {
406 members.push(String::from_utf8_lossy(b).into_owned());
407 }
408 }
409 }
410 _ => {}
411 }
412
413 (cursor, members)
414}