Skip to main content

ff_core/
policy.rs

1use crate::types::{BudgetId, TimestampMs};
2use serde::{Deserialize, Serialize};
3use std::collections::BTreeSet;
4
/// Maximum length, in bytes, of a capability CSV (RFC-009 §7.5).
///
/// Shared between `ff-sdk` (worker caps ingress) and `ff-scheduler` (worker
/// caps ingress); mirrored in `lua/helpers.lua` and enforced by
/// `lua/scheduling.lua`/`lua/execution.lua` as defense-in-depth. Inclusive: a
/// CSV exactly `CAPS_MAX_BYTES` long is accepted; one byte more is rejected.
pub const CAPS_MAX_BYTES: usize = 4096;

/// Maximum number of capability tokens in a capability CSV (RFC-009 §7.5).
///
/// Companion ceiling to [`CAPS_MAX_BYTES`]; both are the "capability CSV
/// ceilings" referenced by the same ingress paths.
/// NOTE(review): presumably inclusive in the same way as `CAPS_MAX_BYTES`
/// (exactly 256 tokens accepted, 257 rejected) — confirm against
/// `lua/helpers.lua`.
pub const CAPS_MAX_TOKENS: usize = 256;
12
13/// Retry configuration for an execution.
14#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
15pub struct RetryPolicy {
16    /// Maximum number of retry attempts (not counting the initial attempt).
17    #[serde(default = "default_max_retries")]
18    pub max_retries: u32,
19    /// Backoff strategy.
20    #[serde(default)]
21    pub backoff: BackoffStrategy,
22    /// Error categories eligible for automatic retry.
23    #[serde(default)]
24    pub retryable_categories: Vec<String>,
25}
26
/// Serde default for `RetryPolicy::max_retries`, also used by
/// `RetryPolicy::default()`.
fn default_max_retries() -> u32 {
    const DEFAULT_MAX_RETRIES: u32 = 3;
    DEFAULT_MAX_RETRIES
}
30
31impl Default for RetryPolicy {
32    fn default() -> Self {
33        Self {
34            max_retries: default_max_retries(),
35            backoff: BackoffStrategy::default(),
36            retryable_categories: Vec::new(),
37        }
38    }
39}
40
41/// Backoff strategy for retries.
42#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
43#[serde(rename_all = "snake_case", tag = "type")]
44pub enum BackoffStrategy {
45    /// Fixed delay between retries.
46    Fixed { delay_ms: u64 },
47    /// Exponential backoff with optional jitter.
48    Exponential {
49        initial_delay_ms: u64,
50        max_delay_ms: u64,
51        multiplier: f64,
52        #[serde(default)]
53        jitter: bool,
54    },
55}
56
57impl Default for BackoffStrategy {
58    fn default() -> Self {
59        Self::Exponential {
60            initial_delay_ms: 1000,
61            max_delay_ms: 60_000,
62            multiplier: 2.0,
63            jitter: false,
64        }
65    }
66}
67
68/// Timeout configuration for an execution.
69#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
70pub struct TimeoutPolicy {
71    /// Per-attempt timeout in milliseconds.
72    #[serde(default, skip_serializing_if = "Option::is_none")]
73    pub attempt_timeout_ms: Option<u64>,
74    /// Total execution deadline (absolute timestamp or duration from creation).
75    #[serde(default, skip_serializing_if = "Option::is_none")]
76    pub execution_deadline_ms: Option<u64>,
77    /// Maximum number of lease-expiry reclaims before failing with max_reclaims_exceeded.
78    /// Default: 100.
79    #[serde(default = "default_max_reclaim_count")]
80    pub max_reclaim_count: u32,
81}
82
/// Serde default for `max_reclaim_count` on both `TimeoutPolicy` and
/// `ExecutionPolicy`; also used by `ExecutionPolicy::default()`.
fn default_max_reclaim_count() -> u32 {
    const DEFAULT_MAX_RECLAIMS: u32 = 100;
    DEFAULT_MAX_RECLAIMS
}
86
87/// Suspension behavior configuration.
88#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
89pub struct SuspensionPolicy {
90    /// Default suspension timeout in milliseconds.
91    #[serde(default, skip_serializing_if = "Option::is_none")]
92    pub default_timeout_ms: Option<u64>,
93    /// What happens when suspension times out: "fail" or "cancel".
94    #[serde(default = "default_timeout_behavior")]
95    pub timeout_behavior: String,
96}
97
/// Serde default for `SuspensionPolicy::timeout_behavior`.
fn default_timeout_behavior() -> String {
    String::from("fail")
}
101
102/// Fallback chain configuration (provider/model progression).
103#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
104pub struct FallbackPolicy {
105    /// Ordered list of fallback tiers.
106    pub tiers: Vec<FallbackTier>,
107}
108
109/// A single tier in the fallback chain.
110#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
111pub struct FallbackTier {
112    /// Provider name (e.g., "anthropic", "openai").
113    pub provider: String,
114    /// Model identifier.
115    pub model: String,
116    /// Optional per-tier timeout override in ms.
117    #[serde(default, skip_serializing_if = "Option::is_none")]
118    pub timeout_ms: Option<u64>,
119}
120
121/// Routing requirements for worker matching.
122#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
123pub struct RoutingRequirements {
124    /// Required capabilities the worker must advertise. An empty set means
125    /// any worker on the lane may claim (backwards-compatible default).
126    /// `BTreeSet` for deterministic ordering — critical for the sorted CSV
127    /// form that ff_issue_claim_grant receives in ARGV and for reproducible
128    /// test output / log correlation.
129    #[serde(default)]
130    pub required_capabilities: BTreeSet<String>,
131    /// Preferred locality/region.
132    #[serde(default, skip_serializing_if = "Option::is_none")]
133    pub preferred_locality: Option<String>,
134    /// Isolation level.
135    #[serde(default, skip_serializing_if = "Option::is_none")]
136    pub isolation_level: Option<String>,
137}
138
139/// Stream durability and retention configuration.
140#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
141pub struct StreamPolicy {
142    /// Durability mode: "buffered" (default) or "durable".
143    #[serde(default = "default_durability_mode")]
144    pub durability_mode: String,
145    /// Maximum number of frames to retain per stream.
146    #[serde(default = "default_retention_maxlen")]
147    pub retention_maxlen: u64,
148    /// Stream retention TTL in ms after closure.
149    #[serde(default, skip_serializing_if = "Option::is_none")]
150    pub retention_ttl_ms: Option<u64>,
151}
152
/// Serde default for `StreamPolicy::durability_mode`.
fn default_durability_mode() -> String {
    String::from("buffered")
}
156
/// Serde default for `StreamPolicy::retention_maxlen` (frames per stream).
fn default_retention_maxlen() -> u64 {
    const DEFAULT_RETENTION_MAXLEN: u64 = 10_000;
    DEFAULT_RETENTION_MAXLEN
}
160
161/// Complete execution policy snapshot, frozen at creation time.
162#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
163pub struct ExecutionPolicy {
164    /// Higher value = higher priority. Default: 0.
165    #[serde(default)]
166    pub priority: i32,
167    /// Earliest eligible time.
168    #[serde(default, skip_serializing_if = "Option::is_none")]
169    pub delay_until: Option<TimestampMs>,
170    /// Retry configuration.
171    #[serde(default, skip_serializing_if = "Option::is_none")]
172    pub retry_policy: Option<RetryPolicy>,
173    /// Timeout configuration.
174    #[serde(default, skip_serializing_if = "Option::is_none")]
175    pub timeout_policy: Option<TimeoutPolicy>,
176    /// Maximum lease-expiry reclaims. Default: 100.
177    #[serde(default = "default_max_reclaim_count")]
178    pub max_reclaim_count: u32,
179    /// Suspension behavior.
180    #[serde(default, skip_serializing_if = "Option::is_none")]
181    pub suspension_policy: Option<SuspensionPolicy>,
182    /// Fallback chain.
183    #[serde(default, skip_serializing_if = "Option::is_none")]
184    pub fallback_policy: Option<FallbackPolicy>,
185    /// Maximum number of replays. Default: 10.
186    #[serde(default = "default_max_replay_count")]
187    pub max_replay_count: u32,
188    /// Attached budget references.
189    #[serde(default)]
190    pub budget_ids: Vec<BudgetId>,
191    /// Routing requirements.
192    #[serde(default, skip_serializing_if = "Option::is_none")]
193    pub routing_requirements: Option<RoutingRequirements>,
194    /// Idempotency dedup window in ms. V1 default: 24h.
195    #[serde(default, skip_serializing_if = "Option::is_none")]
196    pub dedup_window_ms: Option<u64>,
197    /// Stream policy.
198    #[serde(default, skip_serializing_if = "Option::is_none")]
199    pub stream_policy: Option<StreamPolicy>,
200    /// Maximum signal records accepted. Default: 10000.
201    #[serde(default = "default_max_signals")]
202    pub max_signals_per_execution: u32,
203}
204
205impl Default for ExecutionPolicy {
206    fn default() -> Self {
207        Self {
208            priority: 0,
209            delay_until: None,
210            retry_policy: None,
211            timeout_policy: None,
212            max_reclaim_count: default_max_reclaim_count(),
213            suspension_policy: None,
214            fallback_policy: None,
215            max_replay_count: default_max_replay_count(),
216            budget_ids: Vec::new(),
217            routing_requirements: None,
218            dedup_window_ms: None,
219            stream_policy: None,
220            max_signals_per_execution: default_max_signals(),
221        }
222    }
223}
224
/// Serde default for `ExecutionPolicy::max_replay_count`; also used by
/// `ExecutionPolicy::default()`.
fn default_max_replay_count() -> u32 {
    const DEFAULT_MAX_REPLAYS: u32 = 10;
    DEFAULT_MAX_REPLAYS
}
228
/// Serde default for `ExecutionPolicy::max_signals_per_execution`; also used
/// by `ExecutionPolicy::default()`.
fn default_max_signals() -> u32 {
    const DEFAULT_MAX_SIGNALS: u32 = 10_000;
    DEFAULT_MAX_SIGNALS
}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes any policy type to a `serde_json::Value`, panicking on error.
    fn to_json<T: Serialize>(value: &T) -> serde_json::Value {
        serde_json::to_value(value).unwrap()
    }

    #[test]
    fn execution_policy_defaults() {
        let ExecutionPolicy {
            priority,
            retry_policy,
            timeout_policy,
            max_reclaim_count,
            max_replay_count,
            max_signals_per_execution,
            ..
        } = ExecutionPolicy::default();
        assert_eq!(priority, 0);
        assert_eq!(max_reclaim_count, 100);
        assert_eq!(max_replay_count, 10);
        assert_eq!(max_signals_per_execution, 10_000);
        assert!(retry_policy.is_none());
        assert!(timeout_policy.is_none());
    }

    #[test]
    fn retry_policy_serde() {
        let categories: Vec<String> = ["timeout", "provider_error"]
            .iter()
            .map(|c| c.to_string())
            .collect();
        let original = RetryPolicy {
            max_retries: 3,
            backoff: BackoffStrategy::Exponential {
                initial_delay_ms: 100,
                max_delay_ms: 30_000,
                multiplier: 2.0,
                jitter: true,
            },
            retryable_categories: categories,
        };
        let text = serde_json::to_string(&original).unwrap();
        let round_tripped: RetryPolicy = serde_json::from_str(&text).unwrap();
        assert_eq!(round_tripped, original);
    }

    #[test]
    fn timeout_policy_defaults() {
        let parsed: TimeoutPolicy =
            serde_json::from_str(r#"{"attempt_timeout_ms": 30000}"#).unwrap();
        assert_eq!(parsed.attempt_timeout_ms, Some(30_000));
        // Absent from the input, so the serde default must apply.
        assert_eq!(parsed.max_reclaim_count, 100);
    }

    #[test]
    fn retry_policy_defaults() {
        let RetryPolicy {
            max_retries,
            backoff,
            retryable_categories,
        } = RetryPolicy::default();
        let expected_backoff = BackoffStrategy::Exponential {
            initial_delay_ms: 1000,
            max_delay_ms: 60_000,
            multiplier: 2.0,
            jitter: false,
        };
        assert_eq!(max_retries, 3);
        assert_eq!(backoff, expected_backoff);
        assert!(retryable_categories.is_empty());
    }

    #[test]
    fn retry_policy_lua_compatible_json() {
        // Default policy: exponential backoff with its fields flattened next
        // to the `type` tag.
        let default_json = to_json(&RetryPolicy::default());
        assert_eq!(default_json["max_retries"], 3);
        let backoff = &default_json["backoff"];
        assert_eq!(backoff["type"], "exponential");
        assert_eq!(backoff["initial_delay_ms"], 1000);
        assert_eq!(backoff["max_delay_ms"], 60_000);
        assert_eq!(backoff["multiplier"], 2.0);

        // Fixed backoff is tagged `fixed` and exposes `delay_ms`.
        let fixed_json = to_json(&RetryPolicy {
            max_retries: 1,
            backoff: BackoffStrategy::Fixed { delay_ms: 5000 },
            retryable_categories: Vec::new(),
        });
        let fixed_backoff = &fixed_json["backoff"];
        assert_eq!(fixed_backoff["type"], "fixed");
        assert_eq!(fixed_backoff["delay_ms"], 5000);
    }

    #[test]
    fn retry_policy_deserialize_minimal() {
        let minimal: RetryPolicy = serde_json::from_str(r#"{"max_retries": 5}"#).unwrap();
        assert_eq!(minimal.max_retries, 5);
        assert_eq!(minimal.backoff, BackoffStrategy::default());
    }

    /// Regression: `ExecutionPolicy::default()` must not serialize any
    /// `null` fields. Lua's `cjson.decode` maps JSON `null` to a
    /// `cjson.null` sentinel (userdata), which fails the
    /// `type(field) == "table"` checks in `lua/policy.lua` and produces
    /// `invalid_policy_json:<field>:not_object` errors on every HTTP
    /// consumer that posts a default-constructed policy.
    #[test]
    fn default_execution_policy_has_no_nulls() {
        let json = to_json(&ExecutionPolicy::default());
        let fields = json.as_object().expect("top-level object");
        for (key, value) in fields.iter() {
            assert!(
                !value.is_null(),
                "default ExecutionPolicy must not emit null field `{key}` — \
                 Lua policy validation rejects null for optional table fields"
            );
        }
        // Sanity: scalar fields with defaults are still emitted.
        assert_eq!(fields.get("priority"), Some(&serde_json::json!(0)));
        assert_eq!(fields.get("max_reclaim_count"), Some(&serde_json::json!(100)));
    }

    /// Regression: setting a single optional field must not surface
    /// `null` for the other seven. Reproduces the original failure mode:
    /// a consumer sets `retry_policy` only, and the server rejects the
    /// request because `routing_requirements: null` is interpreted as
    /// a non-table by cjson.
    #[test]
    fn partial_execution_policy_omits_unset_options() {
        const UNSET_FIELDS: [&str; 7] = [
            "delay_until",
            "timeout_policy",
            "suspension_policy",
            "fallback_policy",
            "routing_requirements",
            "dedup_window_ms",
            "stream_policy",
        ];
        let json = to_json(&ExecutionPolicy {
            retry_policy: Some(RetryPolicy::default()),
            ..Default::default()
        });
        let fields = json.as_object().expect("top-level object");
        assert!(fields.contains_key("retry_policy"));
        for field in UNSET_FIELDS {
            assert!(
                !fields.contains_key(field),
                "field `{field}` must be absent when unset, not `null`"
            );
        }
    }

    /// Round-trip a fully-populated policy to prove `skip_serializing_if`
    /// does not drop set values.
    #[test]
    fn populated_execution_policy_round_trip() {
        let timeout_policy = TimeoutPolicy {
            attempt_timeout_ms: Some(30_000),
            execution_deadline_ms: Some(300_000),
            max_reclaim_count: 5,
        };
        let suspension_policy = SuspensionPolicy {
            default_timeout_ms: Some(60_000),
            timeout_behavior: "cancel".into(),
        };
        let fallback_policy = FallbackPolicy {
            tiers: vec![FallbackTier {
                provider: "anthropic".into(),
                model: "claude-opus".into(),
                timeout_ms: Some(45_000),
            }],
        };
        let routing_requirements = RoutingRequirements {
            required_capabilities: BTreeSet::from(["gpu".to_owned()]),
            preferred_locality: Some("us-west-2".into()),
            isolation_level: Some("strict".into()),
        };
        let stream_policy = StreamPolicy {
            durability_mode: "durable".into(),
            retention_maxlen: 5000,
            retention_ttl_ms: Some(3_600_000),
        };
        let policy = ExecutionPolicy {
            priority: 7,
            delay_until: Some(TimestampMs(123_456)),
            retry_policy: Some(RetryPolicy::default()),
            timeout_policy: Some(timeout_policy),
            suspension_policy: Some(suspension_policy),
            fallback_policy: Some(fallback_policy),
            routing_requirements: Some(routing_requirements),
            dedup_window_ms: Some(86_400_000),
            stream_policy: Some(stream_policy),
            ..Default::default()
        };

        let json = serde_json::to_string(&policy).unwrap();
        assert!(
            !json.contains(":null"),
            "populated policy must not contain null fields: {json}"
        );
        let parsed: ExecutionPolicy = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed, policy);
    }

    #[test]
    fn full_execution_policy_serde() {
        let retry = RetryPolicy {
            max_retries: 5,
            backoff: BackoffStrategy::Fixed { delay_ms: 1000 },
            retryable_categories: Vec::new(),
        };
        let policy = ExecutionPolicy {
            priority: 10,
            retry_policy: Some(retry),
            ..Default::default()
        };
        let encoded = serde_json::to_string(&policy).unwrap();
        let decoded: ExecutionPolicy = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, policy);
    }
}
432}