aperion_shield/orgmode/
policy_pull.rs1use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::sync::watch;
16
17use super::client::OrgApi;
18use super::state::OrgState;
19use crate::Engine;
20
/// Period of the background poll loop: one version probe every 30 seconds.
const POLL_INTERVAL: Duration = Duration::from_millis(30_000);
23
24pub struct PolicyPullHandle {
27 pub current: watch::Receiver<Arc<Engine>>,
28 pub killswitch: watch::Receiver<bool>,
29 pub version: Arc<tokio::sync::Mutex<u64>>,
32 pub _task: tokio::task::JoinHandle<()>,
33}
34
35pub fn start_policy_pull(
40 api: Arc<OrgApi>,
41 state: OrgState,
42 initial_engine: Arc<Engine>,
43 initial_version: u64,
44) -> PolicyPullHandle {
45 let (tx, rx) = watch::channel(initial_engine);
46 let (ks_tx, ks_rx) = watch::channel(false);
47 let version = Arc::new(tokio::sync::Mutex::new(initial_version));
48 let version_for_task = version.clone();
49
50 let task = tokio::spawn(async move {
51 let mut ticker = tokio::time::interval(POLL_INTERVAL);
52 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
53 ticker.tick().await;
56 loop {
57 ticker.tick().await;
58 let probe = match api.get_shieldset_version(&state.policy_group).await {
59 Ok(v) => v,
60 Err(e) => {
61 log::warn!("[shield] policy version probe failed: {}", e);
62 continue;
63 }
64 };
65
66 let _ = ks_tx.send(probe.killswitch.on);
69 if probe.killswitch.on {
70 log::warn!(
71 "[shield] killswitch ON (reason={:?}) -- block-all in effect",
72 probe.killswitch.reason
73 );
74 }
75
76 let cur = version_for_task.lock().await;
77 if probe.version <= *cur {
78 continue;
79 }
80 drop(cur); let pulled = match api.get_shieldset(&state.policy_group).await {
83 Ok(p) => p,
84 Err(e) => {
85 log::warn!("[shield] shieldset fetch failed: {}", e);
86 continue;
87 }
88 };
89 let (yaml, new_version) = pulled;
90 let new_engine = match Engine::from_yaml(&yaml) {
91 Ok(e) => e,
92 Err(e) => {
93 log::error!(
94 "[shield] pulled shieldset is invalid (version={}): {}. Keeping previous policy.",
95 new_version, e
96 );
97 continue;
98 }
99 };
100 let mut cur = version_for_task.lock().await;
101 *cur = new_version;
102 drop(cur);
103
104 log::warn!(
105 "[shield] hot-reloaded policy: group={} version={} rules={}",
106 state.policy_group,
107 new_version,
108 new_engine.rules.len()
109 );
110 let _ = tx.send(Arc::new(new_engine));
113 }
114 });
115
116 PolicyPullHandle {
117 current: rx,
118 killswitch: ks_rx,
119 version,
120 _task: task,
121 }
122}