1use std::time::Duration;
18
19use anyhow::{Context, Result};
20use async_nats::jetstream::{
21 self,
22 kv::Config as KvConfig,
23 object_store::Config as ObjectStoreConfig,
24 stream::{Config as StreamConfig, DiscardPolicy},
25};
26use tracing::info;
27
28use crate::kv::{
29 BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_JOBS, BUCKET_JOBS_YAML,
30 BUCKET_SCHEDULES, BUCKET_SCHEDULES_YAML, BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS,
31 OBJECT_AGENT_RELEASES, OBJECT_APP_PACKAGES, OBJECT_RESULT_OUTPUT, OBJECT_SCRIPTS, STREAM_AUDIT,
32 STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY, STREAM_OBS_EVENTS, STREAM_RESULTS,
33};
34
35pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
43 js.create_or_update_stream(StreamConfig {
46 name: STREAM_INVENTORY.into(),
47 subjects: vec!["inventory.>".into()],
48 max_age: Duration::from_secs(90 * 24 * 60 * 60),
49 ..Default::default()
50 })
51 .await
52 .with_context(|| format!("create_or_update_stream {STREAM_INVENTORY}"))?;
53 info!(stream = STREAM_INVENTORY, "ready");
54
55 js.create_or_update_stream(StreamConfig {
57 name: STREAM_RESULTS.into(),
58 subjects: vec!["results.>".into()],
59 max_age: Duration::from_secs(30 * 24 * 60 * 60),
60 ..Default::default()
61 })
62 .await
63 .with_context(|| format!("create_or_update_stream {STREAM_RESULTS}"))?;
64 info!(stream = STREAM_RESULTS, "ready");
65
66 js.create_or_update_stream(StreamConfig {
75 name: STREAM_EXEC.into(),
76 subjects: vec!["commands.>".into()],
77 max_messages_per_subject: 1,
78 discard: DiscardPolicy::Old,
79 max_age: Duration::from_secs(7 * 24 * 60 * 60),
80 ..Default::default()
81 })
82 .await
83 .with_context(|| format!("create_or_update_stream {STREAM_EXEC}"))?;
84 info!(stream = STREAM_EXEC, "ready");
85
86 js.create_or_update_stream(StreamConfig {
89 name: STREAM_EVENTS.into(),
90 subjects: vec!["events.>".into()],
91 max_age: Duration::from_secs(7 * 24 * 60 * 60),
92 ..Default::default()
93 })
94 .await
95 .with_context(|| format!("create_or_update_stream {STREAM_EVENTS}"))?;
96 info!(stream = STREAM_EVENTS, "ready");
97
98 js.create_or_update_stream(StreamConfig {
100 name: STREAM_AUDIT.into(),
101 subjects: vec!["audit.>".into()],
102 ..Default::default()
103 })
104 .await
105 .with_context(|| format!("create_or_update_stream {STREAM_AUDIT}"))?;
106 info!(stream = STREAM_AUDIT, "ready");
107
108 const SECS_PER_DAY: u64 = 24 * 60 * 60;
121 const OBS_EVENTS_RETENTION_DAYS: u64 = 90;
122 js.create_or_update_stream(StreamConfig {
123 name: STREAM_OBS_EVENTS.into(),
124 subjects: vec!["obs.>".into()],
125 max_age: Duration::from_secs(OBS_EVENTS_RETENTION_DAYS * SECS_PER_DAY),
126 ..Default::default()
127 })
128 .await
129 .with_context(|| format!("create_or_update_stream {STREAM_OBS_EVENTS}"))?;
130 info!(stream = STREAM_OBS_EVENTS, "ready");
131
132 js.create_or_update_key_value(KvConfig {
135 bucket: BUCKET_SCRIPT_CURRENT.into(),
136 history: 5,
137 ..Default::default()
138 })
139 .await
140 .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_CURRENT}"))?;
141 info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
142
143 js.create_or_update_key_value(KvConfig {
145 bucket: BUCKET_SCRIPT_STATUS.into(),
146 history: 5,
147 ..Default::default()
148 })
149 .await
150 .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_STATUS}"))?;
151 info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
152
153 js.create_or_update_key_value(KvConfig {
155 bucket: BUCKET_AGENTS_STATE.into(),
156 history: 1,
157 ..Default::default()
158 })
159 .await
160 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENTS_STATE}"))?;
161 info!(bucket = BUCKET_AGENTS_STATE, "ready");
162
163 js.create_or_update_key_value(KvConfig {
166 bucket: BUCKET_AGENT_CONFIG.into(),
167 history: 5,
168 ..Default::default()
169 })
170 .await
171 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_CONFIG}"))?;
172 info!(bucket = BUCKET_AGENT_CONFIG, "ready");
173
174 js.create_or_update_key_value(KvConfig {
176 bucket: BUCKET_AGENT_GROUPS.into(),
177 history: 5,
178 ..Default::default()
179 })
180 .await
181 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_GROUPS}"))?;
182 info!(bucket = BUCKET_AGENT_GROUPS, "ready");
183
184 js.create_or_update_key_value(KvConfig {
188 bucket: BUCKET_SCHEDULES.into(),
189 history: 5,
190 ..Default::default()
191 })
192 .await
193 .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES}"))?;
194 info!(bucket = BUCKET_SCHEDULES, "ready");
195
196 js.create_or_update_key_value(KvConfig {
200 bucket: BUCKET_JOBS.into(),
201 history: 5,
202 ..Default::default()
203 })
204 .await
205 .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS}"))?;
206 info!(bucket = BUCKET_JOBS, "ready");
207
208 js.create_or_update_key_value(KvConfig {
214 bucket: BUCKET_JOBS_YAML.into(),
215 history: 5,
216 ..Default::default()
217 })
218 .await
219 .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS_YAML}"))?;
220 info!(bucket = BUCKET_JOBS_YAML, "ready");
221
222 js.create_or_update_key_value(KvConfig {
223 bucket: BUCKET_SCHEDULES_YAML.into(),
224 history: 5,
225 ..Default::default()
226 })
227 .await
228 .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES_YAML}"))?;
229 info!(bucket = BUCKET_SCHEDULES_YAML, "ready");
230
231 js.create_object_store(ObjectStoreConfig {
234 bucket: OBJECT_AGENT_RELEASES.into(),
235 ..Default::default()
236 })
237 .await
238 .with_context(|| format!("create_object_store {OBJECT_AGENT_RELEASES}"))?;
239 info!(store = OBJECT_AGENT_RELEASES, "ready");
240
241 js.create_object_store(ObjectStoreConfig {
247 bucket: OBJECT_APP_PACKAGES.into(),
248 ..Default::default()
249 })
250 .await
251 .with_context(|| format!("create_object_store {OBJECT_APP_PACKAGES}"))?;
252 info!(store = OBJECT_APP_PACKAGES, "ready");
253
254 js.create_object_store(ObjectStoreConfig {
260 bucket: OBJECT_SCRIPTS.into(),
261 ..Default::default()
262 })
263 .await
264 .with_context(|| format!("create_object_store {OBJECT_SCRIPTS}"))?;
265 info!(store = OBJECT_SCRIPTS, "ready");
266
267 js.create_object_store(ObjectStoreConfig {
277 bucket: OBJECT_RESULT_OUTPUT.into(),
278 max_age: Duration::from_secs(SECS_PER_DAY * 30),
279 ..Default::default()
280 })
281 .await
282 .with_context(|| format!("create_object_store {OBJECT_RESULT_OUTPUT}"))?;
283 info!(store = OBJECT_RESULT_OUTPUT, "ready");
284
285 Ok(())
286}