1use std::time::Duration;
18
19use anyhow::{Context, Result};
20use async_nats::jetstream::{
21 self,
22 kv::Config as KvConfig,
23 object_store::Config as ObjectStoreConfig,
24 stream::{Config as StreamConfig, DiscardPolicy},
25};
26use tracing::info;
27
28use crate::kv::{
29 BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_FLEET_CONFIG,
30 BUCKET_JOBS, BUCKET_JOBS_YAML, BUCKET_SCHEDULES, BUCKET_SCHEDULES_YAML, BUCKET_SCRIPT_CURRENT,
31 BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES, OBJECT_APP_PACKAGES, OBJECT_RESULT_OUTPUT,
32 OBJECT_SCRIPTS, STREAM_AUDIT, STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY, STREAM_OBS_EVENTS,
33 STREAM_RESULTS,
34};
35
36pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
44 js.create_or_update_stream(StreamConfig {
47 name: STREAM_INVENTORY.into(),
48 subjects: vec!["inventory.>".into()],
49 max_age: Duration::from_secs(90 * 24 * 60 * 60),
50 ..Default::default()
51 })
52 .await
53 .with_context(|| format!("create_or_update_stream {STREAM_INVENTORY}"))?;
54 info!(stream = STREAM_INVENTORY, "ready");
55
56 js.create_or_update_stream(StreamConfig {
58 name: STREAM_RESULTS.into(),
59 subjects: vec!["results.>".into()],
60 max_age: Duration::from_secs(30 * 24 * 60 * 60),
61 ..Default::default()
62 })
63 .await
64 .with_context(|| format!("create_or_update_stream {STREAM_RESULTS}"))?;
65 info!(stream = STREAM_RESULTS, "ready");
66
67 js.create_or_update_stream(StreamConfig {
76 name: STREAM_EXEC.into(),
77 subjects: vec!["commands.>".into()],
78 max_messages_per_subject: 1,
79 discard: DiscardPolicy::Old,
80 max_age: Duration::from_secs(7 * 24 * 60 * 60),
81 ..Default::default()
82 })
83 .await
84 .with_context(|| format!("create_or_update_stream {STREAM_EXEC}"))?;
85 info!(stream = STREAM_EXEC, "ready");
86
87 js.create_or_update_stream(StreamConfig {
90 name: STREAM_EVENTS.into(),
91 subjects: vec!["events.>".into()],
92 max_age: Duration::from_secs(7 * 24 * 60 * 60),
93 ..Default::default()
94 })
95 .await
96 .with_context(|| format!("create_or_update_stream {STREAM_EVENTS}"))?;
97 info!(stream = STREAM_EVENTS, "ready");
98
99 js.create_or_update_stream(StreamConfig {
101 name: STREAM_AUDIT.into(),
102 subjects: vec!["audit.>".into()],
103 ..Default::default()
104 })
105 .await
106 .with_context(|| format!("create_or_update_stream {STREAM_AUDIT}"))?;
107 info!(stream = STREAM_AUDIT, "ready");
108
109 const SECS_PER_DAY: u64 = 24 * 60 * 60;
122 const OBS_EVENTS_RETENTION_DAYS: u64 = 90;
123 js.create_or_update_stream(StreamConfig {
124 name: STREAM_OBS_EVENTS.into(),
125 subjects: vec!["obs.>".into()],
126 max_age: Duration::from_secs(OBS_EVENTS_RETENTION_DAYS * SECS_PER_DAY),
127 ..Default::default()
128 })
129 .await
130 .with_context(|| format!("create_or_update_stream {STREAM_OBS_EVENTS}"))?;
131 info!(stream = STREAM_OBS_EVENTS, "ready");
132
133 js.create_or_update_key_value(KvConfig {
136 bucket: BUCKET_SCRIPT_CURRENT.into(),
137 history: 5,
138 ..Default::default()
139 })
140 .await
141 .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_CURRENT}"))?;
142 info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
143
144 js.create_or_update_key_value(KvConfig {
146 bucket: BUCKET_SCRIPT_STATUS.into(),
147 history: 5,
148 ..Default::default()
149 })
150 .await
151 .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_STATUS}"))?;
152 info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
153
154 js.create_or_update_key_value(KvConfig {
156 bucket: BUCKET_AGENTS_STATE.into(),
157 history: 1,
158 ..Default::default()
159 })
160 .await
161 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENTS_STATE}"))?;
162 info!(bucket = BUCKET_AGENTS_STATE, "ready");
163
164 js.create_or_update_key_value(KvConfig {
167 bucket: BUCKET_AGENT_CONFIG.into(),
168 history: 5,
169 ..Default::default()
170 })
171 .await
172 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_CONFIG}"))?;
173 info!(bucket = BUCKET_AGENT_CONFIG, "ready");
174
175 js.create_or_update_key_value(KvConfig {
177 bucket: BUCKET_AGENT_GROUPS.into(),
178 history: 5,
179 ..Default::default()
180 })
181 .await
182 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_GROUPS}"))?;
183 info!(bucket = BUCKET_AGENT_GROUPS, "ready");
184
185 js.create_or_update_key_value(KvConfig {
189 bucket: BUCKET_SCHEDULES.into(),
190 history: 5,
191 ..Default::default()
192 })
193 .await
194 .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES}"))?;
195 info!(bucket = BUCKET_SCHEDULES, "ready");
196
197 js.create_or_update_key_value(KvConfig {
201 bucket: BUCKET_JOBS.into(),
202 history: 5,
203 ..Default::default()
204 })
205 .await
206 .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS}"))?;
207 info!(bucket = BUCKET_JOBS, "ready");
208
209 js.create_or_update_key_value(KvConfig {
213 bucket: BUCKET_FLEET_CONFIG.into(),
214 history: 1,
215 ..Default::default()
216 })
217 .await
218 .with_context(|| format!("create_or_update_key_value {BUCKET_FLEET_CONFIG}"))?;
219 info!(bucket = BUCKET_FLEET_CONFIG, "ready");
220
221 js.create_or_update_key_value(KvConfig {
227 bucket: BUCKET_JOBS_YAML.into(),
228 history: 5,
229 ..Default::default()
230 })
231 .await
232 .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS_YAML}"))?;
233 info!(bucket = BUCKET_JOBS_YAML, "ready");
234
235 js.create_or_update_key_value(KvConfig {
236 bucket: BUCKET_SCHEDULES_YAML.into(),
237 history: 5,
238 ..Default::default()
239 })
240 .await
241 .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES_YAML}"))?;
242 info!(bucket = BUCKET_SCHEDULES_YAML, "ready");
243
244 js.create_object_store(ObjectStoreConfig {
247 bucket: OBJECT_AGENT_RELEASES.into(),
248 ..Default::default()
249 })
250 .await
251 .with_context(|| format!("create_object_store {OBJECT_AGENT_RELEASES}"))?;
252 info!(store = OBJECT_AGENT_RELEASES, "ready");
253
254 js.create_object_store(ObjectStoreConfig {
260 bucket: OBJECT_APP_PACKAGES.into(),
261 ..Default::default()
262 })
263 .await
264 .with_context(|| format!("create_object_store {OBJECT_APP_PACKAGES}"))?;
265 info!(store = OBJECT_APP_PACKAGES, "ready");
266
267 js.create_object_store(ObjectStoreConfig {
273 bucket: OBJECT_SCRIPTS.into(),
274 ..Default::default()
275 })
276 .await
277 .with_context(|| format!("create_object_store {OBJECT_SCRIPTS}"))?;
278 info!(store = OBJECT_SCRIPTS, "ready");
279
280 js.create_object_store(ObjectStoreConfig {
290 bucket: OBJECT_RESULT_OUTPUT.into(),
291 max_age: Duration::from_secs(SECS_PER_DAY * 30),
292 ..Default::default()
293 })
294 .await
295 .with_context(|| format!("create_object_store {OBJECT_RESULT_OUTPUT}"))?;
296 info!(store = OBJECT_RESULT_OUTPUT, "ready");
297
298 Ok(())
299}