Skip to main content

ff_script/functions/
budget.rs

1//! Typed FCALL wrappers for budget functions (lua/budget.lua).
2
3use ff_core::contracts::*;
4use crate::error::ScriptError;
5use ff_core::keys::{usage_dedup_key, ExecKeyContext, IndexKeys};
6
7use crate::result::{FcallResult, FromFcallResult};
8
/// Upper bound on the number of dimensions one budget call may carry (#104).
///
/// This is the single authoritative definition of the cap. `ff-server`
/// re-exports it to enforce the limit at the HTTP boundary, and the typed
/// FCALL wrappers in this module check it too, so callers that bypass the
/// REST layer (tests, tools, alternate services) still cannot hand Valkey
/// an unbounded `dim_count`.
///
/// 64 leaves generous headroom over any legitimate set of scoping
/// dimensions (org/tenant/project/region/lane/tier/…) while keeping the
/// largest FCALL ARGV at roughly 200 strings — well below Valkey argv
/// limits.
pub const MAX_BUDGET_DIMENSIONS: usize = 64;
20
/// Borrowed key set for budget operations that run on the `{b:M}` partition.
///
/// `usage_key`, `limits_key` and `def_key` are the already-formatted Valkey
/// keys handed to the budget Lua functions as KEYS.
///
/// `hash_tag` is the budget partition's Valkey hash-tag (e.g. `{b:3}`,
/// braces included). The typed wrappers use it to turn a per-call
/// `dedup_key` into a slot-co-located `ff:usagededup:{b:M}:<dedup_id>` key,
/// so callers never need to know the wrapping format. The canonical source
/// for the value to pass is [`ff_core::keys::BudgetKeyContext::hash_tag`]
/// (#108).
///
/// The struct only holds string slices, so it is `Copy`: it can be passed
/// by value or reused across several calls without re-building it, and
/// `Debug` makes it loggable when diagnosing key-routing problems.
#[derive(Debug, Clone, Copy)]
pub struct BudgetOpKeys<'a> {
    /// Key of the budget's usage counters.
    pub usage_key: &'a str,
    /// Key of the budget's configured limits.
    pub limits_key: &'a str,
    /// Key of the budget definition.
    pub def_key: &'a str,
    /// Hash-tag of the budget partition, braces included (e.g. `{b:3}`).
    pub hash_tag: &'a str,
}
35
36/// Key context for budget block/unblock on {p:N}.
37pub struct BlockOpKeys<'a> {
38    pub ctx: &'a ExecKeyContext,
39    pub idx: &'a IndexKeys,
40    pub lane_id: &'a ff_core::types::LaneId,
41}
42
43// ─── ff_create_budget ─────────────────────────────────────────────────
44//
45// Lua KEYS (5): budget_def, budget_limits, budget_usage, budget_resets_zset,
46//               budget_policies_index
47// Lua ARGV (variable): budget_id, scope_type, scope_id, enforcement_mode,
48//   on_hard_limit, on_soft_limit, reset_interval_ms, now_ms,
49//   dimension_count, dim_1..dim_N, hard_1..hard_N, soft_1..soft_N
50//
51// Manual implementation because ff_function! macro cannot handle variable-length ARGV.
52
53pub async fn ff_create_budget(
54    conn: &ferriskey::Client,
55    k: &BudgetOpKeys<'_>,
56    resets_zset: &str,
57    policies_index: &str,
58    args: &CreateBudgetArgs,
59) -> Result<CreateBudgetResult, ScriptError> {
60    let keys: Vec<String> = vec![
61        k.def_key.to_string(),
62        k.limits_key.to_string(),
63        k.usage_key.to_string(),
64        resets_zset.to_string(),
65        policies_index.to_string(),
66    ];
67
68    let dim_count = args.dimensions.len();
69    // Cap ARGV before allocation — see MAX_BUDGET_DIMENSIONS (#104).
70    if dim_count > MAX_BUDGET_DIMENSIONS {
71        return Err(ScriptError::Parse {
72            fcall: "ff_create_budget".into(),
73            execution_id: None,
74            message: format!(
75            "too_many_dimensions: limit={}, got={}",
76            MAX_BUDGET_DIMENSIONS, dim_count
77        ),
78        });
79    }
80    if args.hard_limits.len() != dim_count {
81        return Err(ScriptError::Parse {
82            fcall: "ff_create_budget".into(),
83            execution_id: None,
84            message: format!(
85            "dimension_limit_array_mismatch: dimensions={} hard_limits={}",
86            dim_count,
87            args.hard_limits.len()
88        ),
89        });
90    }
91    if args.soft_limits.len() != dim_count {
92        return Err(ScriptError::Parse {
93            fcall: "ff_create_budget".into(),
94            execution_id: None,
95            message: format!(
96            "dimension_limit_array_mismatch: dimensions={} soft_limits={}",
97            dim_count,
98            args.soft_limits.len()
99        ),
100        });
101    }
102    // ARGV: budget_id, scope_type, scope_id, enforcement_mode,
103    //   on_hard_limit, on_soft_limit, reset_interval_ms, now_ms,
104    //   dim_count, dim_1..dim_N, hard_1..hard_N, soft_1..soft_N
105    let mut argv: Vec<String> = Vec::with_capacity(9 + dim_count * 3);
106    argv.push(args.budget_id.to_string());
107    argv.push(args.scope_type.clone());
108    argv.push(args.scope_id.clone());
109    argv.push(args.enforcement_mode.clone());
110    argv.push(args.on_hard_limit.clone());
111    argv.push(args.on_soft_limit.clone());
112    argv.push(args.reset_interval_ms.to_string());
113    argv.push(args.now.to_string());
114    argv.push(dim_count.to_string());
115    for dim in &args.dimensions {
116        argv.push(dim.clone());
117    }
118    for hard in &args.hard_limits {
119        argv.push(hard.to_string());
120    }
121    for soft in &args.soft_limits {
122        argv.push(soft.to_string());
123    }
124
125    let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
126    let argv_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
127    let raw = conn
128        .fcall::<ferriskey::Value>("ff_create_budget", &key_refs, &argv_refs)
129        .await
130        .map_err(ScriptError::Valkey)?;
131    <CreateBudgetResult as FromFcallResult>::from_fcall_result(&raw)
132}
133
134impl FromFcallResult for CreateBudgetResult {
135    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
136        let r = FcallResult::parse(raw)?.into_success()?;
137        let id_str = r.field_str(0);
138        let budget_id = ff_core::types::BudgetId::parse(&id_str)
139            .map_err(|e| ScriptError::Parse {
140                fcall: "ff_create_budget".into(),
141                execution_id: None,
142                message: format!("invalid budget_id: {e}"),
143            })?;
144        match r.status.as_str() {
145            "OK" => Ok(CreateBudgetResult::Created { budget_id }),
146            "ALREADY_SATISFIED" => Ok(CreateBudgetResult::AlreadySatisfied { budget_id }),
147            _ => Err(ScriptError::Parse {
148                fcall: "ff_create_budget".into(),
149                execution_id: None,
150                message: format!("unexpected status: {}", r.status),
151            }),
152        }
153    }
154}
155
156// ─── ff_report_usage_and_check ────────────────────────────────────────
157//
158// Lua KEYS (3): budget_usage, budget_limits, budget_def
159// Lua ARGV (variable): dimension_count, dim_1..dim_N, delta_1..delta_N, now_ms, [dedup_key]
160//
161// Manual implementation because ff_function! macro cannot handle variable-length
162// ARGV. The Lua reads positional args: [dim_count, dim1..dimN, delta1..deltaN, now_ms, dedup_key].
163
164pub async fn ff_report_usage_and_check(
165    conn: &ferriskey::Client,
166    k: &BudgetOpKeys<'_>,
167    args: &ReportUsageArgs,
168) -> Result<ReportUsageResult, ScriptError> {
169    let keys: Vec<String> = vec![
170        k.usage_key.to_string(),
171        k.limits_key.to_string(),
172        k.def_key.to_string(),
173    ];
174
175    // Build flat ARGV: [dim_count, dim1..dimN, delta1..deltaN, now_ms, dedup_key]
176    let dim_count = args.dimensions.len();
177    // Cap ARGV before allocation — see MAX_BUDGET_DIMENSIONS (#104).
178    if dim_count > MAX_BUDGET_DIMENSIONS {
179        return Err(ScriptError::Parse {
180            fcall: "ff_report_usage_and_check".into(),
181            execution_id: None,
182            message: format!(
183            "too_many_dimensions: limit={}, got={}",
184            MAX_BUDGET_DIMENSIONS, dim_count
185        ),
186        });
187    }
188    if args.deltas.len() != dim_count {
189        return Err(ScriptError::Parse {
190            fcall: "ff_report_usage_and_check".into(),
191            execution_id: None,
192            message: format!(
193            "dimension_delta_array_mismatch: dimensions={} deltas={}",
194            dim_count,
195            args.deltas.len()
196        ),
197        });
198    }
199    let mut argv: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
200    argv.push(dim_count.to_string());
201    for dim in &args.dimensions {
202        argv.push(dim.clone());
203    }
204    for delta in &args.deltas {
205        argv.push(delta.to_string());
206    }
207    argv.push(args.now.to_string());
208    // #108: wrap dedup_key with the budget hash-tag so it co-locates with
209    // `k.usage_key`/`k.limits_key`/`k.def_key` on the same cluster slot and
210    // matches the format produced by `ff-server` + `ff-sdk` at the REST
211    // boundary. Empty/missing dedup_key forwards as empty string
212    // (Lua disables dedup in that branch).
213    let dedup_key_val = args.dedup_key
214        .as_deref()
215        .filter(|s| !s.is_empty())
216        .map(|s| usage_dedup_key(k.hash_tag, s))
217        .unwrap_or_default();
218    argv.push(dedup_key_val);
219
220    let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
221    let argv_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
222    let raw = conn
223        .fcall::<ferriskey::Value>("ff_report_usage_and_check", &key_refs, &argv_refs)
224        .await
225        .map_err(ScriptError::Valkey)?;
226    <ReportUsageResult as FromFcallResult>::from_fcall_result(&raw)
227}
228
229impl FromFcallResult for ReportUsageResult {
230    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
231        let r = FcallResult::parse(raw)?.into_success()?;
232        match r.status.as_str() {
233            "OK" => Ok(ReportUsageResult::Ok),
234            "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
235            "SOFT_BREACH" => {
236                let dim = r.field_str(0);
237                let current: u64 = r.field_str(1).parse().unwrap_or(0);
238                let limit: u64 = r.field_str(2).parse().unwrap_or(0);
239                Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
240            }
241            "HARD_BREACH" => {
242                let dim = r.field_str(0);
243                let current: u64 = r.field_str(1).parse().unwrap_or(0);
244                let limit: u64 = r.field_str(2).parse().unwrap_or(0);
245                Ok(ReportUsageResult::HardBreach {
246                    dimension: dim,
247                    current_usage: current,
248                    hard_limit: limit,
249                })
250            }
251            _ => Err(ScriptError::Parse {
252                fcall: "ff_report_usage_and_check".into(),
253                execution_id: None,
254                message: format!("unknown budget status: {}", r.status),
255            }),
256        }
257    }
258}
259
260// ─── ff_reset_budget ──────────────────────────────────────────────────
261//
262// Lua KEYS (3): budget_def, budget_usage, budget_resets_zset
263// Lua ARGV (2): budget_id, now_ms
264//
265// Manual implementation: BudgetOpKeys doesn't carry resets_zset, so we
266// accept it as a separate parameter (same pattern as ff_create_budget).
267
268pub async fn ff_reset_budget(
269    conn: &ferriskey::Client,
270    k: &BudgetOpKeys<'_>,
271    resets_zset: &str,
272    args: &ResetBudgetArgs,
273) -> Result<ResetBudgetResult, ScriptError> {
274    let keys: Vec<String> = vec![
275        k.def_key.to_string(),
276        k.usage_key.to_string(),
277        resets_zset.to_string(),
278    ];
279    let argv: Vec<String> = vec![
280        args.budget_id.to_string(),
281        args.now.to_string(),
282    ];
283    let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
284    let argv_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
285    let raw = conn
286        .fcall::<ferriskey::Value>("ff_reset_budget", &key_refs, &argv_refs)
287        .await
288        .map_err(ScriptError::Valkey)?;
289    <ResetBudgetResult as FromFcallResult>::from_fcall_result(&raw)
290}
291
292impl FromFcallResult for ResetBudgetResult {
293    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
294        let r = FcallResult::parse(raw)?.into_success()?;
295        let next_str = r.field_str(0);
296        let next_ms: i64 = next_str.parse().unwrap_or(0);
297        Ok(ResetBudgetResult::Reset {
298            next_reset_at: ff_core::types::TimestampMs::from_millis(next_ms),
299        })
300    }
301}
302
303// ─── ff_block_execution_for_admission ─────────────────────────────────
304//
305// Lua KEYS (3): exec_core, eligible_zset, target_blocked_zset
306// Lua ARGV (4): execution_id, blocking_reason, blocking_detail, now_ms
307
308ff_function! {
309    pub ff_block_execution_for_admission(args: BlockExecutionArgs) -> BlockExecutionResult {
310        keys(k: &BlockOpKeys<'_>) {
311            k.ctx.core(),
312            k.idx.lane_eligible(k.lane_id),
313            {
314                match args.blocking_reason.as_str() {
315                    "waiting_for_budget" => k.idx.lane_blocked_budget(k.lane_id),
316                    "waiting_for_quota" => k.idx.lane_blocked_quota(k.lane_id),
317                    _ => k.idx.lane_blocked_budget(k.lane_id),
318                }
319            },
320        }
321        argv {
322            args.execution_id.to_string(),
323            args.blocking_reason.clone(),
324            args.blocking_detail.clone().unwrap_or_default(),
325            args.now.to_string(),
326        }
327    }
328}
329
330impl FromFcallResult for BlockExecutionResult {
331    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
332        let _r = FcallResult::parse(raw)?.into_success()?;
333        Ok(BlockExecutionResult::Blocked)
334    }
335}
336
337// ─── ff_unblock_execution ─────────────────────────────────────────────
338//
339// Lua KEYS (3): exec_core, source_blocked_zset, eligible_zset
340// Lua ARGV (3): execution_id, now_ms, expected_blocking_reason
341
342ff_function! {
343    pub ff_unblock_execution(args: UnblockExecutionArgs) -> UnblockExecutionResult {
344        keys(k: &BlockOpKeys<'_>) {
345            k.ctx.core(),
346            {
347                match args.expected_blocking_reason.as_deref().unwrap_or("waiting_for_budget") {
348                    "waiting_for_budget" => k.idx.lane_blocked_budget(k.lane_id),
349                    "waiting_for_quota" => k.idx.lane_blocked_quota(k.lane_id),
350                    _ => k.idx.lane_blocked_budget(k.lane_id),
351                }
352            },
353            k.idx.lane_eligible(k.lane_id),
354        }
355        argv {
356            args.execution_id.to_string(),
357            args.now.to_string(),
358            args.expected_blocking_reason.clone().unwrap_or_default(),
359        }
360    }
361}
362
363impl FromFcallResult for UnblockExecutionResult {
364    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
365        let _r = FcallResult::parse(raw)?.into_success()?;
366        Ok(UnblockExecutionResult::Unblocked)
367    }
368}
369