// ff_script/functions/budget.rs
use ff_core::contracts::*;
4use crate::error::ScriptError;
5use ff_core::keys::{ExecKeyContext, IndexKeys};
6
7use crate::result::{FcallResult, FromFcallResult};
8
/// Borrowed key names shared by the budget functions in this module.
///
/// Each field is forwarded verbatim as one entry of the `KEYS` array of the
/// server-side function being called; the order in `KEYS` is decided by each
/// caller, not by the field order here.
///
/// The struct only holds string slices, so it is cheap to copy and can be
/// printed for diagnostics.
#[derive(Debug, Clone, Copy)]
pub struct BudgetOpKeys<'a> {
    /// Key passed as the budget's usage key.
    pub usage_key: &'a str,
    /// Key passed as the budget's limits key.
    pub limits_key: &'a str,
    /// Key passed as the budget's definition key.
    pub def_key: &'a str,
}
15
16pub struct BlockOpKeys<'a> {
18 pub ctx: &'a ExecKeyContext,
19 pub idx: &'a IndexKeys,
20 pub lane_id: &'a ff_core::types::LaneId,
21}
22
23pub async fn ff_create_budget(
34 conn: &ferriskey::Client,
35 k: &BudgetOpKeys<'_>,
36 resets_zset: &str,
37 policies_index: &str,
38 args: &CreateBudgetArgs,
39) -> Result<CreateBudgetResult, ScriptError> {
40 let keys: Vec<String> = vec![
41 k.def_key.to_string(),
42 k.limits_key.to_string(),
43 k.usage_key.to_string(),
44 resets_zset.to_string(),
45 policies_index.to_string(),
46 ];
47
48 let dim_count = args.dimensions.len();
49 let mut argv: Vec<String> = Vec::with_capacity(9 + dim_count * 3);
53 argv.push(args.budget_id.to_string());
54 argv.push(args.scope_type.clone());
55 argv.push(args.scope_id.clone());
56 argv.push(args.enforcement_mode.clone());
57 argv.push(args.on_hard_limit.clone());
58 argv.push(args.on_soft_limit.clone());
59 argv.push(args.reset_interval_ms.to_string());
60 argv.push(args.now.to_string());
61 argv.push(dim_count.to_string());
62 for dim in &args.dimensions {
63 argv.push(dim.clone());
64 }
65 for hard in &args.hard_limits {
66 argv.push(hard.to_string());
67 }
68 for soft in &args.soft_limits {
69 argv.push(soft.to_string());
70 }
71
72 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
73 let argv_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
74 let raw = conn
75 .fcall::<ferriskey::Value>("ff_create_budget", &key_refs, &argv_refs)
76 .await
77 .map_err(ScriptError::Valkey)?;
78 <CreateBudgetResult as FromFcallResult>::from_fcall_result(&raw)
79}
80
81impl FromFcallResult for CreateBudgetResult {
82 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
83 let r = FcallResult::parse(raw)?.into_success()?;
84 let id_str = r.field_str(0);
85 let budget_id = ff_core::types::BudgetId::parse(&id_str)
86 .map_err(|e| ScriptError::Parse(format!("invalid budget_id: {e}")))?;
87 match r.status.as_str() {
88 "OK" => Ok(CreateBudgetResult::Created { budget_id }),
89 "ALREADY_SATISFIED" => Ok(CreateBudgetResult::AlreadySatisfied { budget_id }),
90 _ => Err(ScriptError::Parse(format!("unexpected status: {}", r.status))),
91 }
92 }
93}
94
95pub async fn ff_report_usage_and_check(
104 conn: &ferriskey::Client,
105 k: &BudgetOpKeys<'_>,
106 args: &ReportUsageArgs,
107) -> Result<ReportUsageResult, ScriptError> {
108 let keys: Vec<String> = vec![
109 k.usage_key.to_string(),
110 k.limits_key.to_string(),
111 k.def_key.to_string(),
112 ];
113
114 let dim_count = args.dimensions.len();
116 let mut argv: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
117 argv.push(dim_count.to_string());
118 for dim in &args.dimensions {
119 argv.push(dim.clone());
120 }
121 for delta in &args.deltas {
122 argv.push(delta.to_string());
123 }
124 argv.push(args.now.to_string());
125 argv.push(args.dedup_key.clone().unwrap_or_default());
126
127 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
128 let argv_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
129 let raw = conn
130 .fcall::<ferriskey::Value>("ff_report_usage_and_check", &key_refs, &argv_refs)
131 .await
132 .map_err(ScriptError::Valkey)?;
133 <ReportUsageResult as FromFcallResult>::from_fcall_result(&raw)
134}
135
136impl FromFcallResult for ReportUsageResult {
137 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
138 let r = FcallResult::parse(raw)?.into_success()?;
139 match r.status.as_str() {
140 "OK" => Ok(ReportUsageResult::Ok),
141 "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
142 "SOFT_BREACH" => {
143 let dim = r.field_str(0);
144 let current: u64 = r.field_str(1).parse().unwrap_or(0);
145 let limit: u64 = r.field_str(2).parse().unwrap_or(0);
146 Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
147 }
148 "HARD_BREACH" => {
149 let dim = r.field_str(0);
150 let current: u64 = r.field_str(1).parse().unwrap_or(0);
151 let limit: u64 = r.field_str(2).parse().unwrap_or(0);
152 Ok(ReportUsageResult::HardBreach {
153 dimension: dim,
154 current_usage: current,
155 hard_limit: limit,
156 })
157 }
158 _ => Err(ScriptError::Parse(format!("unknown budget status: {}", r.status))),
159 }
160 }
161}
162
163pub async fn ff_reset_budget(
172 conn: &ferriskey::Client,
173 k: &BudgetOpKeys<'_>,
174 resets_zset: &str,
175 args: &ResetBudgetArgs,
176) -> Result<ResetBudgetResult, ScriptError> {
177 let keys: Vec<String> = vec![
178 k.def_key.to_string(),
179 k.usage_key.to_string(),
180 resets_zset.to_string(),
181 ];
182 let argv: Vec<String> = vec![
183 args.budget_id.to_string(),
184 args.now.to_string(),
185 ];
186 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
187 let argv_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
188 let raw = conn
189 .fcall::<ferriskey::Value>("ff_reset_budget", &key_refs, &argv_refs)
190 .await
191 .map_err(ScriptError::Valkey)?;
192 <ResetBudgetResult as FromFcallResult>::from_fcall_result(&raw)
193}
194
195impl FromFcallResult for ResetBudgetResult {
196 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
197 let r = FcallResult::parse(raw)?.into_success()?;
198 let next_str = r.field_str(0);
199 let next_ms: i64 = next_str.parse().unwrap_or(0);
200 Ok(ResetBudgetResult::Reset {
201 next_reset_at: ff_core::types::TimestampMs::from_millis(next_ms),
202 })
203 }
204}
205
206ff_function! {
212 pub ff_block_execution_for_admission(args: BlockExecutionArgs) -> BlockExecutionResult {
213 keys(k: &BlockOpKeys<'_>) {
214 k.ctx.core(),
215 k.idx.lane_eligible(k.lane_id),
216 {
217 match args.blocking_reason.as_str() {
218 "waiting_for_budget" => k.idx.lane_blocked_budget(k.lane_id),
219 "waiting_for_quota" => k.idx.lane_blocked_quota(k.lane_id),
220 _ => k.idx.lane_blocked_budget(k.lane_id),
221 }
222 },
223 }
224 argv {
225 args.execution_id.to_string(),
226 args.blocking_reason.clone(),
227 args.blocking_detail.clone().unwrap_or_default(),
228 args.now.to_string(),
229 }
230 }
231}
232
233impl FromFcallResult for BlockExecutionResult {
234 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
235 let _r = FcallResult::parse(raw)?.into_success()?;
236 Ok(BlockExecutionResult::Blocked)
237 }
238}
239
240ff_function! {
246 pub ff_unblock_execution(args: UnblockExecutionArgs) -> UnblockExecutionResult {
247 keys(k: &BlockOpKeys<'_>) {
248 k.ctx.core(),
249 {
250 match args.expected_blocking_reason.as_deref().unwrap_or("waiting_for_budget") {
251 "waiting_for_budget" => k.idx.lane_blocked_budget(k.lane_id),
252 "waiting_for_quota" => k.idx.lane_blocked_quota(k.lane_id),
253 _ => k.idx.lane_blocked_budget(k.lane_id),
254 }
255 },
256 k.idx.lane_eligible(k.lane_id),
257 }
258 argv {
259 args.execution_id.to_string(),
260 args.now.to_string(),
261 args.expected_blocking_reason.clone().unwrap_or_default(),
262 }
263 }
264}
265
266impl FromFcallResult for UnblockExecutionResult {
267 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
268 let _r = FcallResult::parse(raw)?.into_success()?;
269 Ok(UnblockExecutionResult::Unblocked)
270 }
271}
272