1use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use std::collections::{BTreeMap, BTreeSet};
11
12use crate::search::policy::{SemanticMode, SemanticPolicy};
13
/// Schema version string that [`PolicyRegistrySnapshot::new`] stamps into
/// every registry snapshot it builds.
pub const POLICY_REGISTRY_SCHEMA_VERSION: &str = "1";
15
16#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
17pub struct PolicyRegistrySnapshot {
18 pub schema_version: String,
19 pub controllers: Vec<PolicyControllerSnapshot>,
20}
21
22impl PolicyRegistrySnapshot {
23 pub fn new(mut controllers: Vec<PolicyControllerSnapshot>) -> Self {
24 controllers.sort_by(|left, right| left.controller_id.cmp(&right.controller_id));
25 Self {
26 schema_version: POLICY_REGISTRY_SCHEMA_VERSION.to_string(),
27 controllers,
28 }
29 }
30
31 pub fn controller_ids(&self) -> BTreeSet<&str> {
32 self.controllers
33 .iter()
34 .map(|controller| controller.controller_id.as_str())
35 .collect()
36 }
37}
38
/// Serializable description of a single policy controller: the policy it
/// reports, its status and fallback state, and the inputs recorded with it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PolicyControllerSnapshot {
    /// Controller identifier; `PolicyRegistrySnapshot::new` sorts on this field.
    pub controller_id: String,
    /// Identifier of the policy this controller reports.
    pub policy_id: String,
    /// Version label attached to that policy.
    pub policy_version: String,
    /// Status reported for the controller.
    pub status: PolicyControllerStatus,
    /// Fallback state reported for the controller.
    pub fallback_state: PolicyFallbackState,
    /// The snapshot builders in this file set this to `true` exactly when
    /// `fallback_state` is `PolicyFallbackState::Conservative`.
    pub conservative_fallback: bool,
    /// Human-readable explanation accompanying the reported state.
    pub decision_reason: String,
    /// Input values rendered as strings, keyed by input name. Because of
    /// `#[serde(default)]`, a missing field deserializes to an empty map.
    #[serde(default)]
    pub inputs: BTreeMap<String, String>,
}
51
/// Status reported for a policy controller. Variant names are serialized in
/// snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PolicyControllerStatus {
    /// Reported by the builders in this file when the fallback state is
    /// `PolicyFallbackState::NotNeeded`.
    Active,
    /// Reported by the builders in this file when the fallback state is
    /// `PolicyFallbackState::Disabled`.
    Disabled,
    /// Reported by the builders in this file when the fallback state is
    /// `PolicyFallbackState::Conservative`.
    Fallback,
}
59
/// Fallback state reported for a policy controller. Variant names are
/// serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PolicyFallbackState {
    /// No fallback is in effect.
    NotNeeded,
    /// A conservative fallback is in effect; the builders in this file pair
    /// it with `conservative_fallback == true`.
    Conservative,
    /// The controller is disabled.
    Disabled,
}
67
68pub fn policy_registry_snapshot(
69 semantic_policy: &SemanticPolicy,
70 semantic_available: bool,
71 semantic_fallback_mode: Option<&str>,
72 lexical_rebuild_pipeline: &Value,
73) -> PolicyRegistrySnapshot {
74 PolicyRegistrySnapshot::new(vec![
75 semantic_policy_controller_snapshot(
76 semantic_policy,
77 semantic_available,
78 semantic_fallback_mode,
79 ),
80 lexical_rebuild_policy_controller_snapshot(lexical_rebuild_pipeline),
81 ])
82}
83
84pub fn semantic_policy_controller_snapshot(
85 policy: &SemanticPolicy,
86 semantic_available: bool,
87 semantic_fallback_mode: Option<&str>,
88) -> PolicyControllerSnapshot {
89 let mut inputs = BTreeMap::new();
90 inputs.insert("mode".to_string(), policy.mode.as_str().to_string());
91 inputs.insert(
92 "download_policy".to_string(),
93 policy.download_policy.as_str().to_string(),
94 );
95 inputs.insert(
96 "fast_tier_embedder".to_string(),
97 policy.fast_tier_embedder.clone(),
98 );
99 inputs.insert(
100 "quality_tier_embedder".to_string(),
101 policy.quality_tier_embedder.clone(),
102 );
103 inputs.insert("reranker".to_string(), policy.reranker.clone());
104 inputs.insert(
105 "fast_dimension".to_string(),
106 policy.fast_dimension.to_string(),
107 );
108 inputs.insert(
109 "quality_dimension".to_string(),
110 policy.quality_dimension.to_string(),
111 );
112 inputs.insert(
113 "quality_weight".to_string(),
114 policy.quality_weight.to_string(),
115 );
116 inputs.insert(
117 "max_refinement_docs".to_string(),
118 policy.max_refinement_docs.to_string(),
119 );
120 inputs.insert(
121 "semantic_budget_mb".to_string(),
122 policy.semantic_budget_mb.to_string(),
123 );
124 inputs.insert(
125 "min_free_disk_mb".to_string(),
126 policy.min_free_disk_mb.to_string(),
127 );
128 inputs.insert(
129 "max_model_size_mb".to_string(),
130 policy.max_model_size_mb.to_string(),
131 );
132 inputs.insert(
133 "max_backfill_threads".to_string(),
134 policy.max_backfill_threads.to_string(),
135 );
136 inputs.insert(
137 "max_backfill_rss_mb".to_string(),
138 policy.max_backfill_rss_mb.to_string(),
139 );
140 inputs.insert(
141 "idle_delay_seconds".to_string(),
142 policy.idle_delay_seconds.to_string(),
143 );
144 inputs.insert(
145 "chunk_timeout_seconds".to_string(),
146 policy.chunk_timeout_seconds.to_string(),
147 );
148 inputs.insert(
149 "semantic_schema_version".to_string(),
150 policy.semantic_schema_version.to_string(),
151 );
152 inputs.insert(
153 "chunking_strategy_version".to_string(),
154 policy.chunking_strategy_version.to_string(),
155 );
156 inputs.insert(
157 "semantic_available".to_string(),
158 semantic_available.to_string(),
159 );
160 inputs.insert(
161 "semantic_fallback_mode".to_string(),
162 semantic_fallback_mode.unwrap_or("none").to_string(),
163 );
164
165 let fallback_state = if !policy.mode.should_build_semantic() {
166 PolicyFallbackState::Disabled
167 } else if semantic_fallback_mode.is_some() || !semantic_available {
168 PolicyFallbackState::Conservative
169 } else {
170 PolicyFallbackState::NotNeeded
171 };
172 let status = match fallback_state {
173 PolicyFallbackState::Disabled => PolicyControllerStatus::Disabled,
174 PolicyFallbackState::Conservative => PolicyControllerStatus::Fallback,
175 _ => PolicyControllerStatus::Active,
176 };
177 let decision_reason = match (policy.mode, semantic_available, semantic_fallback_mode) {
178 (SemanticMode::LexicalOnly, _, _) => "semantic disabled by lexical_only policy",
179 (mode, _, Some("lexical")) if mode.requires_semantic() => {
180 "strict semantic policy observed lexical fallback; semantic is unavailable"
181 }
182 (_, _, Some(mode)) => {
183 if mode == "lexical" {
184 "semantic unavailable; lexical fallback remains active"
185 } else {
186 "semantic fallback mode reported by asset inspection"
187 }
188 }
189 (_, false, _) => "semantic assets unavailable; conservative lexical floor remains active",
190 _ => "semantic policy active",
191 };
192
193 PolicyControllerSnapshot {
194 controller_id: "semantic_search".to_string(),
195 policy_id: format!("semantic.{}.v1", policy.mode.as_str()),
196 policy_version: format!(
197 "semantic_schema_{}+chunking_{}",
198 policy.semantic_schema_version, policy.chunking_strategy_version
199 ),
200 status,
201 fallback_state,
202 conservative_fallback: fallback_state == PolicyFallbackState::Conservative,
203 decision_reason: decision_reason.to_string(),
204 inputs,
205 }
206}
207
208pub fn lexical_rebuild_policy_controller_snapshot(pipeline: &Value) -> PolicyControllerSnapshot {
209 let mut inputs = BTreeMap::new();
210 for key in [
211 "controller_mode",
212 "controller_restore_clear_samples",
213 "controller_restore_hold_ms",
214 "pipeline_channel_size",
215 "pipeline_max_message_bytes_in_flight",
216 "page_prep_workers",
217 "staged_merge_workers",
218 "staged_shard_builders",
219 "controller_loadavg_high_watermark_1m",
220 "controller_loadavg_low_watermark_1m",
221 ] {
222 insert_json_input(&mut inputs, key, pipeline.get(key));
223 }
224
225 let runtime = pipeline.get("runtime");
226 let controller_mode = runtime
227 .and_then(|value| value.get("controller_mode"))
228 .and_then(Value::as_str);
229 let controller_reason = runtime
230 .and_then(|value| value.get("controller_reason"))
231 .and_then(Value::as_str)
232 .unwrap_or("pipeline settings active");
233 let fallback_state = match controller_mode {
234 Some("disabled") => PolicyFallbackState::Disabled,
235 Some("conservative") | Some("throttled") | Some("reduced") => {
236 PolicyFallbackState::Conservative
237 }
238 _ => PolicyFallbackState::NotNeeded,
239 };
240 let status = match fallback_state {
241 PolicyFallbackState::Disabled => PolicyControllerStatus::Disabled,
242 PolicyFallbackState::Conservative => PolicyControllerStatus::Fallback,
243 PolicyFallbackState::NotNeeded => PolicyControllerStatus::Active,
244 };
245
246 PolicyControllerSnapshot {
247 controller_id: "lexical_rebuild_pipeline".to_string(),
248 policy_id: "lexical_rebuild.pipeline.v1".to_string(),
249 policy_version: "pipeline_settings_v1".to_string(),
250 status,
251 fallback_state,
252 conservative_fallback: fallback_state == PolicyFallbackState::Conservative,
253 decision_reason: controller_reason.to_string(),
254 inputs,
255 }
256}
257
258fn insert_json_input(inputs: &mut BTreeMap<String, String>, key: &str, value: Option<&Value>) {
259 inputs.insert(
260 key.to_string(),
261 value
262 .map(json_input_string)
263 .unwrap_or_else(|| "null".to_string()),
264 );
265}
266
267fn json_input_string(value: &Value) -> String {
268 match value {
269 Value::Null => "null".to_string(),
270 Value::Bool(value) => value.to_string(),
271 Value::Number(value) => value.to_string(),
272 Value::String(value) => value.clone(),
273 Value::Array(_) | Value::Object(_) => value.to_string(),
274 }
275}
276
#[cfg(test)]
mod tests {
    use super::*;
    use crate::search::policy::{CliSemanticOverrides, SemanticMode};
    use serde_json::json;

    /// Pipeline JSON shared by the tests: configured settings at the top
    /// level plus a `runtime` object reporting a throttled controller.
    fn pipeline_fixture() -> Value {
        json!({
            "pipeline_channel_size": 128,
            "pipeline_max_message_bytes_in_flight": 1048576,
            "page_prep_workers": 12,
            "staged_merge_workers": 4,
            "staged_shard_builders": 8,
            "controller_mode": "auto",
            "controller_restore_clear_samples": 3,
            "controller_restore_hold_ms": 5000,
            "controller_loadavg_high_watermark_1m": 1.75,
            "controller_loadavg_low_watermark_1m": 0.75,
            "runtime": {
                "controller_mode": "throttled",
                "controller_reason": "load pressure reduced workers"
            }
        })
    }

    #[test]
    fn registry_snapshot_is_deterministic_and_sorted() {
        let defaults = SemanticPolicy::compiled_defaults();
        let fixture = pipeline_fixture();
        let expected_ids = ["lexical_rebuild_pipeline", "semantic_search"];

        // Same inputs twice must yield equal registries.
        let registry = policy_registry_snapshot(&defaults, false, Some("lexical"), &fixture);
        let repeat = policy_registry_snapshot(&defaults, false, Some("lexical"), &fixture);
        assert_eq!(registry, repeat);

        assert_eq!(registry.controller_ids(), BTreeSet::from(expected_ids));
        for (position, expected) in expected_ids.iter().enumerate() {
            assert_eq!(registry.controllers[position].controller_id, *expected);
        }
    }

    #[test]
    fn semantic_policy_snapshot_reports_lexical_fallback_without_changing_policy() {
        let defaults = SemanticPolicy::compiled_defaults();

        let controller = semantic_policy_controller_snapshot(&defaults, false, Some("lexical"));

        assert_eq!(controller.status, PolicyControllerStatus::Fallback);
        assert_eq!(controller.fallback_state, PolicyFallbackState::Conservative);
        assert!(controller.conservative_fallback);
        assert_eq!(controller.inputs["mode"], "hybrid_preferred");
        // The policy value itself is untouched by building the snapshot.
        assert_eq!(defaults.mode, SemanticMode::HybridPreferred);
    }

    #[test]
    fn semantic_policy_snapshot_reports_disabled_lexical_only_policy() {
        let overrides = CliSemanticOverrides {
            mode: Some(SemanticMode::LexicalOnly),
            ..CliSemanticOverrides::default()
        };
        let lexical_only = SemanticPolicy::compiled_defaults().with_cli_overrides(&overrides);

        let controller = semantic_policy_controller_snapshot(&lexical_only, true, None);

        assert_eq!(controller.status, PolicyControllerStatus::Disabled);
        assert_eq!(controller.fallback_state, PolicyFallbackState::Disabled);
        assert!(!controller.conservative_fallback);
        assert_eq!(controller.policy_id, "semantic.lexical_only.v1");
    }

    #[test]
    fn lexical_rebuild_snapshot_uses_only_supplied_pipeline_json() {
        let fixture = pipeline_fixture();

        let controller = lexical_rebuild_policy_controller_snapshot(&fixture);
        let repeat = lexical_rebuild_policy_controller_snapshot(&fixture);

        assert_eq!(controller, repeat);
        assert_eq!(controller.status, PolicyControllerStatus::Fallback);
        assert_eq!(controller.fallback_state, PolicyFallbackState::Conservative);
        assert_eq!(controller.inputs["pipeline_channel_size"], "128");
        assert_eq!(controller.decision_reason, "load pressure reduced workers");
    }
}