Skip to main content

ff_script/functions/
quota.rs

1//! Typed FCALL wrapper for quota admission function (lua/quota.lua).
2
3use ff_core::contracts::*;
4use crate::error::ScriptError;
5use ff_core::keys::QuotaKeyContext;
6
7use crate::result::{FcallResult, FromFcallResult};
8
9/// Key context for quota admission check on {q:K}.
10pub struct QuotaOpKeys<'a> {
11    pub ctx: &'a QuotaKeyContext,
12    pub dimension: &'a str,
13    pub execution_id: &'a ff_core::types::ExecutionId,
14}
15
16// ─── ff_create_quota_policy ───────────────────────────────────────────
17//
18// Lua KEYS (5): quota_def, quota_window_zset, quota_concurrency_counter,
19//               admitted_set, quota_policies_index
20// Lua ARGV (5): quota_policy_id, window_seconds, max_requests_per_window,
21//               max_concurrent, now_ms
22
23ff_function! {
24    pub ff_create_quota_policy(args: CreateQuotaPolicyArgs) -> CreateQuotaPolicyResult {
25        keys(k: &QuotaOpKeys<'_>) {
26            k.ctx.definition(),
27            k.ctx.window(k.dimension),
28            k.ctx.concurrency(),
29            k.ctx.admitted_set(),
30            ff_core::keys::quota_policies_index(k.ctx.hash_tag()),
31        }
32        argv {
33            args.quota_policy_id.to_string(),
34            args.window_seconds.to_string(),
35            args.max_requests_per_window.to_string(),
36            args.max_concurrent.to_string(),
37            args.now.to_string(),
38        }
39    }
40}
41
42impl FromFcallResult for CreateQuotaPolicyResult {
43    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
44        let r = FcallResult::parse(raw)?.into_success()?;
45        let id_str = r.field_str(0);
46        let qid = ff_core::types::QuotaPolicyId::parse(&id_str)
47            .map_err(|e| ScriptError::Parse(format!("invalid quota_policy_id: {e}")))?;
48        match r.status.as_str() {
49            "OK" => Ok(CreateQuotaPolicyResult::Created { quota_policy_id: qid }),
50            "ALREADY_SATISFIED" => Ok(CreateQuotaPolicyResult::AlreadySatisfied { quota_policy_id: qid }),
51            _ => Err(ScriptError::Parse(format!("unexpected status: {}", r.status))),
52        }
53    }
54}
55
56// ─── ff_check_admission_and_record ────────────────────────────────────
57//
58// Lua KEYS (5): window_zset, concurrency_counter, quota_def, admitted_guard,
59//               admitted_set
60// Lua ARGV (6): now_ms, window_seconds, rate_limit, concurrency_cap,
61//               execution_id, jitter_ms
62
63ff_function! {
64    pub ff_check_admission_and_record(args: CheckAdmissionArgs) -> CheckAdmissionResult {
65        keys(k: &QuotaOpKeys<'_>) {
66            k.ctx.window(k.dimension),
67            k.ctx.concurrency(),
68            k.ctx.definition(),
69            k.ctx.admitted(k.execution_id),
70            k.ctx.admitted_set(),
71        }
72        argv {
73            args.now.to_string(),
74            args.window_seconds.to_string(),
75            args.rate_limit.to_string(),
76            args.concurrency_cap.to_string(),
77            args.execution_id.to_string(),
78            args.jitter_ms.unwrap_or(0).to_string(),
79        }
80    }
81}
82
83impl FromFcallResult for CheckAdmissionResult {
84    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
85        // Domain-specific return: {"ADMITTED"}, {"ALREADY_ADMITTED"},
86        // {"RATE_EXCEEDED", retry_after_ms}, {"CONCURRENCY_EXCEEDED"}
87        let arr = match raw {
88            ferriskey::Value::Array(arr) => arr,
89            _ => return Err(ScriptError::Parse("expected Array".into())),
90        };
91        let status = match arr.first() {
92            Some(Ok(ferriskey::Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
93            Some(Ok(ferriskey::Value::SimpleString(s))) => s.clone(),
94            _ => return Err(ScriptError::Parse("expected status string".into())),
95        };
96        match status.as_str() {
97            "ADMITTED" => Ok(CheckAdmissionResult::Admitted),
98            "ALREADY_ADMITTED" => Ok(CheckAdmissionResult::AlreadyAdmitted),
99            "RATE_EXCEEDED" => {
100                let retry_str = match arr.get(1) {
101                    Some(Ok(ferriskey::Value::BulkString(b))) => {
102                        String::from_utf8_lossy(b).into_owned()
103                    }
104                    Some(Ok(ferriskey::Value::Int(n))) => n.to_string(),
105                    _ => "0".to_string(),
106                };
107                let retry_after: u64 = retry_str.parse().unwrap_or(0);
108                Ok(CheckAdmissionResult::RateExceeded {
109                    retry_after_ms: retry_after,
110                })
111            }
112            "CONCURRENCY_EXCEEDED" => Ok(CheckAdmissionResult::ConcurrencyExceeded),
113            _ => Err(ScriptError::Parse(format!(
114                "unknown admission status: {status}"
115            ))),
116        }
117    }
118}
119
120// ─── ff_release_admission ────────────────────────────────────────────
121//
122// Lua KEYS (3): admitted_guard_key, admitted_set, concurrency_counter
123// Lua ARGV (1): execution_id
124
125ff_function! {
126    pub ff_release_admission(args: ReleaseAdmissionArgs) -> ReleaseAdmissionResult {
127        keys(k: &QuotaOpKeys<'_>) {
128            k.ctx.admitted(k.execution_id),
129            k.ctx.admitted_set(),
130            k.ctx.concurrency(),
131        }
132        argv {
133            args.execution_id.to_string(),
134        }
135    }
136}
137
138impl FromFcallResult for ReleaseAdmissionResult {
139    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
140        let _r = FcallResult::parse(raw)?.into_success()?;
141        Ok(ReleaseAdmissionResult::Released)
142    }
143}