1use ff_core::keys::{ExecKeyContext, IndexKeys};
9use ff_core::partition::{
10 Partition, PartitionConfig, PartitionFamily, execution_partition,
11};
12use ff_core::types::ExecutionId;
13
14pub struct PartitionRouter {
21 config: PartitionConfig,
22}
23
24impl PartitionRouter {
25 pub fn new(config: PartitionConfig) -> Self {
26 Self { config }
27 }
28
29 pub fn partition_for(&self, eid: &ExecutionId) -> Partition {
31 execution_partition(eid, &self.config)
32 }
33
34 pub fn exec_keys(&self, eid: &ExecutionId) -> ExecKeyContext {
36 let partition = self.partition_for(eid);
37 ExecKeyContext::new(&partition, eid)
38 }
39
40 pub fn index_keys(&self, partition_index: u16) -> IndexKeys {
42 let partition = Partition {
43 family: PartitionFamily::Execution,
44 index: partition_index,
45 };
46 IndexKeys::new(&partition)
47 }
48
49 pub fn config(&self) -> &PartitionConfig {
51 &self.config
52 }
53
54 pub fn num_flow_partitions(&self) -> u16 {
60 self.config.num_flow_partitions
61 }
62}
63
64pub async fn dispatch_dependency_resolution(
75 client: &ferriskey::Client,
76 router: &PartitionRouter,
77 eid: &ExecutionId,
78 flow_id: Option<&str>,
79) {
80 dispatch_dependency_resolution_inner(client, router, eid, flow_id, 0).await;
81}
82
83const MAX_CASCADE_DEPTH: u32 = 50;
85
86async fn dispatch_dependency_resolution_inner(
87 client: &ferriskey::Client,
88 router: &PartitionRouter,
89 eid: &ExecutionId,
90 flow_id: Option<&str>,
91 cascade_depth: u32,
92) {
93 if cascade_depth > MAX_CASCADE_DEPTH {
94 tracing::warn!(
95 execution_id = %eid,
96 cascade_depth,
97 "dispatch_dep: cascade depth limit reached, reconciler will catch remaining"
98 );
99 return;
100 }
101
102 let flow_id_str = match flow_id {
103 Some(fid) if !fid.is_empty() => fid,
104 _ => return, };
106
107 let exec_ctx = router.exec_keys(eid);
109 let core_key = exec_ctx.core();
110 let outcome: Option<String> = match client
111 .cmd("HGET")
112 .arg(&core_key)
113 .arg("terminal_outcome")
114 .execute()
115 .await
116 {
117 Ok(v) => v,
118 Err(e) => {
119 tracing::warn!(
120 execution_id = %eid,
121 error = %e,
122 "dispatch_dep: failed to read terminal_outcome"
123 );
124 return;
125 }
126 };
127
128 let outcome_str = outcome.unwrap_or_default();
129 let upstream_outcome = outcome_str.as_str();
133
134 let fid = match ff_core::types::FlowId::parse(flow_id_str) {
137 Ok(id) => id,
138 Err(e) => {
139 tracing::warn!(
140 flow_id = flow_id_str,
141 error = %e,
142 "dispatch_dep: invalid flow_id"
143 );
144 return;
145 }
146 };
147
148 let flow_partition = ff_core::partition::flow_partition(&fid, router.config());
149 let flow_ctx = ff_core::keys::FlowKeyContext::new(&flow_partition, &fid);
150
151 let out_key = flow_ctx.outgoing(eid);
153 let edge_ids: Vec<String> = match client
154 .cmd("SMEMBERS")
155 .arg(&out_key)
156 .execute()
157 .await
158 {
159 Ok(ids) => ids,
160 Err(e) => {
161 tracing::warn!(
162 execution_id = %eid,
163 flow_id = flow_id_str,
164 error = %e,
165 "dispatch_dep: SMEMBERS outgoing failed"
166 );
167 return;
168 }
169 };
170
171 if edge_ids.is_empty() {
172 return;
173 }
174
175 let now_ms = ff_core::types::TimestampMs::now().0.to_string();
176 let mut resolved: u32 = 0;
177 let mut skipped_children: Vec<(ExecutionId, String)> = Vec::new();
178
179 for edge_id in &edge_ids {
180 let edge_key = flow_ctx.edge(&ff_core::types::EdgeId::parse(edge_id).unwrap_or_default());
182 let downstream_eid_str: Option<String> = match client
183 .cmd("HGET")
184 .arg(&edge_key)
185 .arg("downstream_execution_id")
186 .execute()
187 .await
188 {
189 Ok(v) => v,
190 Err(_) => continue,
191 };
192
193 let downstream_eid_str = match downstream_eid_str {
194 Some(s) if !s.is_empty() => s,
195 _ => continue,
196 };
197
198 let downstream_eid = match ExecutionId::parse(&downstream_eid_str) {
199 Ok(id) => id,
200 Err(_) => continue,
201 };
202
203 let child_partition = router.partition_for(&downstream_eid);
205 let child_ctx = ExecKeyContext::new(&child_partition, &downstream_eid);
206 let child_idx = IndexKeys::new(&child_partition);
207
208 let child_core_key = child_ctx.core();
210 let lane_str: Option<String> = client
211 .cmd("HGET")
212 .arg(&child_core_key)
213 .arg("lane_id")
214 .execute()
215 .await
216 .unwrap_or(None);
217 let lane_id = ff_core::types::LaneId::new(
218 lane_str.as_deref().unwrap_or("default"),
219 );
220
221 let att_idx_str: Option<String> = client
222 .cmd("HGET")
223 .arg(&child_core_key)
224 .arg("current_attempt_index")
225 .execute()
226 .await
227 .unwrap_or(None);
228 let att_idx = ff_core::types::AttemptIndex::new(
229 att_idx_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0),
230 );
231
232 let dep_hash = child_ctx.dep_edge(
233 &ff_core::types::EdgeId::parse(edge_id).unwrap_or_default(),
234 );
235
236 let deps_meta = child_ctx.deps_meta();
246 let unresolved = child_ctx.deps_unresolved();
247 let eligible = child_idx.lane_eligible(&lane_id);
248 let terminal = child_idx.lane_terminal(&lane_id);
249 let blocked_deps = child_idx.lane_blocked_dependencies(&lane_id);
250 let attempt_hash = child_ctx.attempt_hash(att_idx);
251 let stream_meta = child_ctx.stream_meta(att_idx);
252 let downstream_payload = child_ctx.payload();
253 let upstream_ctx = ExecKeyContext::new(&child_partition, eid);
254 let upstream_result = upstream_ctx.result();
255
256 let keys: [&str; 11] = [
257 &child_core_key, &deps_meta, &unresolved, &dep_hash, &eligible, &terminal, &blocked_deps, &attempt_hash, &stream_meta, &downstream_payload, &upstream_result, ];
269 let argv: [&str; 3] = [edge_id, upstream_outcome, &now_ms];
270
271 match client
272 .fcall::<ferriskey::Value>("ff_resolve_dependency", &keys, &argv)
273 .await
274 {
275 Ok(val) => {
276 resolved += 1;
277 tracing::debug!(
278 edge_id = edge_id.as_str(),
279 downstream = downstream_eid_str.as_str(),
280 outcome = upstream_outcome,
281 "dispatch_dep: resolved dependency"
282 );
283 if is_child_skipped_result(&val) {
286 skipped_children.push((
287 downstream_eid.clone(),
288 flow_id_str.to_string(),
289 ));
290 }
291 }
292 Err(e) => {
293 tracing::warn!(
294 edge_id = edge_id.as_str(),
295 downstream = downstream_eid_str.as_str(),
296 error = %e,
297 "dispatch_dep: ff_resolve_dependency failed"
298 );
299 }
300 }
301 }
302
303 if resolved > 0 {
304 tracing::info!(
305 execution_id = %eid,
306 flow_id = flow_id_str,
307 resolved,
308 total_edges = edge_ids.len(),
309 skipped_cascade = skipped_children.len(),
310 "dispatch_dep: dependency resolution complete"
311 );
312 }
313
314 for (child_eid, child_flow_id) in &skipped_children {
318 Box::pin(dispatch_dependency_resolution_inner(
319 client, router, child_eid, Some(child_flow_id.as_str()),
320 cascade_depth + 1,
321 )).await;
322 }
323}
324
325fn is_child_skipped_result(value: &ferriskey::Value) -> bool {
335 match value {
336 ferriskey::Value::Array(arr) => {
337 if arr.len() < 4 {
338 return false;
341 }
342 arr.get(3)
343 .and_then(|v| match v {
344 Ok(ferriskey::Value::BulkString(b)) => {
345 Some(&b[..] == b"child_skipped")
346 }
347 Ok(ferriskey::Value::SimpleString(s)) => {
348 Some(s == "child_skipped")
349 }
350 _ => None,
351 })
352 .unwrap_or(false)
353 }
354 _ => {
355 tracing::warn!(
356 "is_child_skipped_result: expected Array, got non-array value"
357 );
358 false
359 }
360 }
361}