kanade_shared/
bootstrap.rs1use std::time::Duration;
18
19use anyhow::{Context, Result};
20use async_nats::jetstream::{
21 self,
22 kv::Config as KvConfig,
23 object_store::Config as ObjectStoreConfig,
24 stream::{Config as StreamConfig, DiscardPolicy},
25};
26use tracing::info;
27
28use crate::kv::{
29 BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_JOBS, BUCKET_JOBS_YAML,
30 BUCKET_SCHEDULES, BUCKET_SCHEDULES_YAML, BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS,
31 OBJECT_AGENT_RELEASES, STREAM_AUDIT, STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY,
32 STREAM_RESULTS,
33};
34
35pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
43 js.create_or_update_stream(StreamConfig {
46 name: STREAM_INVENTORY.into(),
47 subjects: vec!["inventory.>".into()],
48 max_age: Duration::from_secs(90 * 24 * 60 * 60),
49 ..Default::default()
50 })
51 .await
52 .with_context(|| format!("create_or_update_stream {STREAM_INVENTORY}"))?;
53 info!(stream = STREAM_INVENTORY, "ready");
54
55 js.create_or_update_stream(StreamConfig {
57 name: STREAM_RESULTS.into(),
58 subjects: vec!["results.>".into()],
59 max_age: Duration::from_secs(30 * 24 * 60 * 60),
60 ..Default::default()
61 })
62 .await
63 .with_context(|| format!("create_or_update_stream {STREAM_RESULTS}"))?;
64 info!(stream = STREAM_RESULTS, "ready");
65
66 js.create_or_update_stream(StreamConfig {
75 name: STREAM_EXEC.into(),
76 subjects: vec!["commands.>".into()],
77 max_messages_per_subject: 1,
78 discard: DiscardPolicy::Old,
79 max_age: Duration::from_secs(7 * 24 * 60 * 60),
80 ..Default::default()
81 })
82 .await
83 .with_context(|| format!("create_or_update_stream {STREAM_EXEC}"))?;
84 info!(stream = STREAM_EXEC, "ready");
85
86 js.create_or_update_stream(StreamConfig {
89 name: STREAM_EVENTS.into(),
90 subjects: vec!["events.>".into()],
91 max_age: Duration::from_secs(7 * 24 * 60 * 60),
92 ..Default::default()
93 })
94 .await
95 .with_context(|| format!("create_or_update_stream {STREAM_EVENTS}"))?;
96 info!(stream = STREAM_EVENTS, "ready");
97
98 js.create_or_update_stream(StreamConfig {
100 name: STREAM_AUDIT.into(),
101 subjects: vec!["audit.>".into()],
102 ..Default::default()
103 })
104 .await
105 .with_context(|| format!("create_or_update_stream {STREAM_AUDIT}"))?;
106 info!(stream = STREAM_AUDIT, "ready");
107
108 js.create_or_update_key_value(KvConfig {
111 bucket: BUCKET_SCRIPT_CURRENT.into(),
112 history: 5,
113 ..Default::default()
114 })
115 .await
116 .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_CURRENT}"))?;
117 info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
118
119 js.create_or_update_key_value(KvConfig {
121 bucket: BUCKET_SCRIPT_STATUS.into(),
122 history: 5,
123 ..Default::default()
124 })
125 .await
126 .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_STATUS}"))?;
127 info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
128
129 js.create_or_update_key_value(KvConfig {
131 bucket: BUCKET_AGENTS_STATE.into(),
132 history: 1,
133 ..Default::default()
134 })
135 .await
136 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENTS_STATE}"))?;
137 info!(bucket = BUCKET_AGENTS_STATE, "ready");
138
139 js.create_or_update_key_value(KvConfig {
142 bucket: BUCKET_AGENT_CONFIG.into(),
143 history: 5,
144 ..Default::default()
145 })
146 .await
147 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_CONFIG}"))?;
148 info!(bucket = BUCKET_AGENT_CONFIG, "ready");
149
150 js.create_or_update_key_value(KvConfig {
152 bucket: BUCKET_AGENT_GROUPS.into(),
153 history: 5,
154 ..Default::default()
155 })
156 .await
157 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_GROUPS}"))?;
158 info!(bucket = BUCKET_AGENT_GROUPS, "ready");
159
160 js.create_or_update_key_value(KvConfig {
164 bucket: BUCKET_SCHEDULES.into(),
165 history: 5,
166 ..Default::default()
167 })
168 .await
169 .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES}"))?;
170 info!(bucket = BUCKET_SCHEDULES, "ready");
171
172 js.create_or_update_key_value(KvConfig {
176 bucket: BUCKET_JOBS.into(),
177 history: 5,
178 ..Default::default()
179 })
180 .await
181 .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS}"))?;
182 info!(bucket = BUCKET_JOBS, "ready");
183
184 js.create_or_update_key_value(KvConfig {
190 bucket: BUCKET_JOBS_YAML.into(),
191 history: 5,
192 ..Default::default()
193 })
194 .await
195 .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS_YAML}"))?;
196 info!(bucket = BUCKET_JOBS_YAML, "ready");
197
198 js.create_or_update_key_value(KvConfig {
199 bucket: BUCKET_SCHEDULES_YAML.into(),
200 history: 5,
201 ..Default::default()
202 })
203 .await
204 .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES_YAML}"))?;
205 info!(bucket = BUCKET_SCHEDULES_YAML, "ready");
206
207 js.create_object_store(ObjectStoreConfig {
210 bucket: OBJECT_AGENT_RELEASES.into(),
211 ..Default::default()
212 })
213 .await
214 .with_context(|| format!("create_object_store {OBJECT_AGENT_RELEASES}"))?;
215 info!(store = OBJECT_AGENT_RELEASES, "ready");
216
217 Ok(())
218}