1use ff_core::contracts::*;
4use crate::error::ScriptError;
5use ff_core::keys::{usage_dedup_key, ExecKeyContext, IndexKeys};
6
7use crate::result::{FcallResult, FromFcallResult};
8
9pub const MAX_BUDGET_DIMENSIONS: usize = 64;
20
21pub struct BudgetOpKeys<'a> {
30 pub usage_key: &'a str,
31 pub limits_key: &'a str,
32 pub def_key: &'a str,
33 pub hash_tag: &'a str,
34}
35
36pub struct BlockOpKeys<'a> {
38 pub ctx: &'a ExecKeyContext,
39 pub idx: &'a IndexKeys,
40 pub lane_id: &'a ff_core::types::LaneId,
41}
42
43pub async fn ff_create_budget(
54 conn: &ferriskey::Client,
55 k: &BudgetOpKeys<'_>,
56 resets_zset: &str,
57 policies_index: &str,
58 args: &CreateBudgetArgs,
59) -> Result<CreateBudgetResult, ScriptError> {
60 let keys: Vec<String> = vec![
61 k.def_key.to_string(),
62 k.limits_key.to_string(),
63 k.usage_key.to_string(),
64 resets_zset.to_string(),
65 policies_index.to_string(),
66 ];
67
68 let dim_count = args.dimensions.len();
69 if dim_count > MAX_BUDGET_DIMENSIONS {
71 return Err(ScriptError::Parse {
72 fcall: "ff_create_budget".into(),
73 execution_id: None,
74 message: format!(
75 "too_many_dimensions: limit={}, got={}",
76 MAX_BUDGET_DIMENSIONS, dim_count
77 ),
78 });
79 }
80 if args.hard_limits.len() != dim_count {
81 return Err(ScriptError::Parse {
82 fcall: "ff_create_budget".into(),
83 execution_id: None,
84 message: format!(
85 "dimension_limit_array_mismatch: dimensions={} hard_limits={}",
86 dim_count,
87 args.hard_limits.len()
88 ),
89 });
90 }
91 if args.soft_limits.len() != dim_count {
92 return Err(ScriptError::Parse {
93 fcall: "ff_create_budget".into(),
94 execution_id: None,
95 message: format!(
96 "dimension_limit_array_mismatch: dimensions={} soft_limits={}",
97 dim_count,
98 args.soft_limits.len()
99 ),
100 });
101 }
102 let mut argv: Vec<String> = Vec::with_capacity(9 + dim_count * 3);
106 argv.push(args.budget_id.to_string());
107 argv.push(args.scope_type.clone());
108 argv.push(args.scope_id.clone());
109 argv.push(args.enforcement_mode.clone());
110 argv.push(args.on_hard_limit.clone());
111 argv.push(args.on_soft_limit.clone());
112 argv.push(args.reset_interval_ms.to_string());
113 argv.push(args.now.to_string());
114 argv.push(dim_count.to_string());
115 for dim in &args.dimensions {
116 argv.push(dim.clone());
117 }
118 for hard in &args.hard_limits {
119 argv.push(hard.to_string());
120 }
121 for soft in &args.soft_limits {
122 argv.push(soft.to_string());
123 }
124
125 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
126 let argv_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
127 let raw = conn
128 .fcall::<ferriskey::Value>("ff_create_budget", &key_refs, &argv_refs)
129 .await
130 .map_err(ScriptError::Valkey)?;
131 <CreateBudgetResult as FromFcallResult>::from_fcall_result(&raw)
132}
133
134impl FromFcallResult for CreateBudgetResult {
135 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
136 let r = FcallResult::parse(raw)?.into_success()?;
137 let id_str = r.field_str(0);
138 let budget_id = ff_core::types::BudgetId::parse(&id_str)
139 .map_err(|e| ScriptError::Parse {
140 fcall: "ff_create_budget".into(),
141 execution_id: None,
142 message: format!("invalid budget_id: {e}"),
143 })?;
144 match r.status.as_str() {
145 "OK" => Ok(CreateBudgetResult::Created { budget_id }),
146 "ALREADY_SATISFIED" => Ok(CreateBudgetResult::AlreadySatisfied { budget_id }),
147 _ => Err(ScriptError::Parse {
148 fcall: "ff_create_budget".into(),
149 execution_id: None,
150 message: format!("unexpected status: {}", r.status),
151 }),
152 }
153 }
154}
155
156pub async fn ff_report_usage_and_check(
165 conn: &ferriskey::Client,
166 k: &BudgetOpKeys<'_>,
167 args: &ReportUsageArgs,
168) -> Result<ReportUsageResult, ScriptError> {
169 let keys: Vec<String> = vec![
170 k.usage_key.to_string(),
171 k.limits_key.to_string(),
172 k.def_key.to_string(),
173 ];
174
175 let dim_count = args.dimensions.len();
177 if dim_count > MAX_BUDGET_DIMENSIONS {
179 return Err(ScriptError::Parse {
180 fcall: "ff_report_usage_and_check".into(),
181 execution_id: None,
182 message: format!(
183 "too_many_dimensions: limit={}, got={}",
184 MAX_BUDGET_DIMENSIONS, dim_count
185 ),
186 });
187 }
188 if args.deltas.len() != dim_count {
189 return Err(ScriptError::Parse {
190 fcall: "ff_report_usage_and_check".into(),
191 execution_id: None,
192 message: format!(
193 "dimension_delta_array_mismatch: dimensions={} deltas={}",
194 dim_count,
195 args.deltas.len()
196 ),
197 });
198 }
199 let mut argv: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
200 argv.push(dim_count.to_string());
201 for dim in &args.dimensions {
202 argv.push(dim.clone());
203 }
204 for delta in &args.deltas {
205 argv.push(delta.to_string());
206 }
207 argv.push(args.now.to_string());
208 let dedup_key_val = args.dedup_key
214 .as_deref()
215 .filter(|s| !s.is_empty())
216 .map(|s| usage_dedup_key(k.hash_tag, s))
217 .unwrap_or_default();
218 argv.push(dedup_key_val);
219
220 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
221 let argv_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
222 let raw = conn
223 .fcall::<ferriskey::Value>("ff_report_usage_and_check", &key_refs, &argv_refs)
224 .await
225 .map_err(ScriptError::Valkey)?;
226 <ReportUsageResult as FromFcallResult>::from_fcall_result(&raw)
227}
228
229impl FromFcallResult for ReportUsageResult {
230 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
231 let r = FcallResult::parse(raw)?.into_success()?;
232 match r.status.as_str() {
233 "OK" => Ok(ReportUsageResult::Ok),
234 "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
235 "SOFT_BREACH" => {
236 let dim = r.field_str(0);
237 let current: u64 = r.field_str(1).parse().unwrap_or(0);
238 let limit: u64 = r.field_str(2).parse().unwrap_or(0);
239 Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
240 }
241 "HARD_BREACH" => {
242 let dim = r.field_str(0);
243 let current: u64 = r.field_str(1).parse().unwrap_or(0);
244 let limit: u64 = r.field_str(2).parse().unwrap_or(0);
245 Ok(ReportUsageResult::HardBreach {
246 dimension: dim,
247 current_usage: current,
248 hard_limit: limit,
249 })
250 }
251 _ => Err(ScriptError::Parse {
252 fcall: "ff_report_usage_and_check".into(),
253 execution_id: None,
254 message: format!("unknown budget status: {}", r.status),
255 }),
256 }
257 }
258}
259
260pub async fn ff_reset_budget(
269 conn: &ferriskey::Client,
270 k: &BudgetOpKeys<'_>,
271 resets_zset: &str,
272 args: &ResetBudgetArgs,
273) -> Result<ResetBudgetResult, ScriptError> {
274 let keys: Vec<String> = vec![
275 k.def_key.to_string(),
276 k.usage_key.to_string(),
277 resets_zset.to_string(),
278 ];
279 let argv: Vec<String> = vec![
280 args.budget_id.to_string(),
281 args.now.to_string(),
282 ];
283 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
284 let argv_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
285 let raw = conn
286 .fcall::<ferriskey::Value>("ff_reset_budget", &key_refs, &argv_refs)
287 .await
288 .map_err(ScriptError::Valkey)?;
289 <ResetBudgetResult as FromFcallResult>::from_fcall_result(&raw)
290}
291
292impl FromFcallResult for ResetBudgetResult {
293 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
294 let r = FcallResult::parse(raw)?.into_success()?;
295 let next_str = r.field_str(0);
296 let next_ms: i64 = next_str.parse().unwrap_or(0);
297 Ok(ResetBudgetResult::Reset {
298 next_reset_at: ff_core::types::TimestampMs::from_millis(next_ms),
299 })
300 }
301}
302
303ff_function! {
309 pub ff_block_execution_for_admission(args: BlockExecutionArgs) -> BlockExecutionResult {
310 keys(k: &BlockOpKeys<'_>) {
311 k.ctx.core(),
312 k.idx.lane_eligible(k.lane_id),
313 {
314 match args.blocking_reason.as_str() {
315 "waiting_for_budget" => k.idx.lane_blocked_budget(k.lane_id),
316 "waiting_for_quota" => k.idx.lane_blocked_quota(k.lane_id),
317 _ => k.idx.lane_blocked_budget(k.lane_id),
318 }
319 },
320 }
321 argv {
322 args.execution_id.to_string(),
323 args.blocking_reason.clone(),
324 args.blocking_detail.clone().unwrap_or_default(),
325 args.now.to_string(),
326 }
327 }
328}
329
330impl FromFcallResult for BlockExecutionResult {
331 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
332 let _r = FcallResult::parse(raw)?.into_success()?;
333 Ok(BlockExecutionResult::Blocked)
334 }
335}
336
337ff_function! {
343 pub ff_unblock_execution(args: UnblockExecutionArgs) -> UnblockExecutionResult {
344 keys(k: &BlockOpKeys<'_>) {
345 k.ctx.core(),
346 {
347 match args.expected_blocking_reason.as_deref().unwrap_or("waiting_for_budget") {
348 "waiting_for_budget" => k.idx.lane_blocked_budget(k.lane_id),
349 "waiting_for_quota" => k.idx.lane_blocked_quota(k.lane_id),
350 _ => k.idx.lane_blocked_budget(k.lane_id),
351 }
352 },
353 k.idx.lane_eligible(k.lane_id),
354 }
355 argv {
356 args.execution_id.to_string(),
357 args.now.to_string(),
358 args.expected_blocking_reason.clone().unwrap_or_default(),
359 }
360 }
361}
362
363impl FromFcallResult for UnblockExecutionResult {
364 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
365 let _r = FcallResult::parse(raw)?.into_success()?;
366 Ok(UnblockExecutionResult::Unblocked)
367 }
368}
369