kanade_shared/
bootstrap.rs1use std::time::Duration;
14
15use anyhow::{Context, Result};
16use async_nats::jetstream::{
17 self,
18 kv::Config as KvConfig,
19 object_store::Config as ObjectStoreConfig,
20 stream::{Config as StreamConfig, DiscardPolicy},
21};
22use tracing::info;
23
24use crate::kv::{
25 BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_JOBS, BUCKET_SCHEDULES,
26 BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES, STREAM_AUDIT,
27 STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY, STREAM_RESULTS,
28};
29
30pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
38 js.create_stream(StreamConfig {
41 name: STREAM_INVENTORY.into(),
42 subjects: vec!["inventory.>".into()],
43 max_age: Duration::from_secs(90 * 24 * 60 * 60),
44 ..Default::default()
45 })
46 .await
47 .with_context(|| format!("create_stream {STREAM_INVENTORY}"))?;
48 info!(stream = STREAM_INVENTORY, "ready");
49
50 js.create_stream(StreamConfig {
52 name: STREAM_RESULTS.into(),
53 subjects: vec!["results.>".into()],
54 max_age: Duration::from_secs(30 * 24 * 60 * 60),
55 ..Default::default()
56 })
57 .await
58 .with_context(|| format!("create_stream {STREAM_RESULTS}"))?;
59 info!(stream = STREAM_RESULTS, "ready");
60
61 js.create_stream(StreamConfig {
70 name: STREAM_EXEC.into(),
71 subjects: vec!["commands.>".into()],
72 max_messages_per_subject: 1,
73 discard: DiscardPolicy::Old,
74 max_age: Duration::from_secs(7 * 24 * 60 * 60),
75 ..Default::default()
76 })
77 .await
78 .with_context(|| format!("create_stream {STREAM_EXEC}"))?;
79 info!(stream = STREAM_EXEC, "ready");
80
81 js.create_stream(StreamConfig {
84 name: STREAM_EVENTS.into(),
85 subjects: vec!["events.>".into()],
86 max_age: Duration::from_secs(7 * 24 * 60 * 60),
87 ..Default::default()
88 })
89 .await
90 .with_context(|| format!("create_stream {STREAM_EVENTS}"))?;
91 info!(stream = STREAM_EVENTS, "ready");
92
93 js.create_stream(StreamConfig {
95 name: STREAM_AUDIT.into(),
96 subjects: vec!["audit.>".into()],
97 ..Default::default()
98 })
99 .await
100 .with_context(|| format!("create_stream {STREAM_AUDIT}"))?;
101 info!(stream = STREAM_AUDIT, "ready");
102
103 js.create_key_value(KvConfig {
106 bucket: BUCKET_SCRIPT_CURRENT.into(),
107 history: 5,
108 ..Default::default()
109 })
110 .await
111 .with_context(|| format!("create_key_value {BUCKET_SCRIPT_CURRENT}"))?;
112 info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
113
114 js.create_key_value(KvConfig {
116 bucket: BUCKET_SCRIPT_STATUS.into(),
117 history: 5,
118 ..Default::default()
119 })
120 .await
121 .with_context(|| format!("create_key_value {BUCKET_SCRIPT_STATUS}"))?;
122 info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
123
124 js.create_key_value(KvConfig {
126 bucket: BUCKET_AGENTS_STATE.into(),
127 history: 1,
128 ..Default::default()
129 })
130 .await
131 .with_context(|| format!("create_key_value {BUCKET_AGENTS_STATE}"))?;
132 info!(bucket = BUCKET_AGENTS_STATE, "ready");
133
134 js.create_key_value(KvConfig {
137 bucket: BUCKET_AGENT_CONFIG.into(),
138 history: 5,
139 ..Default::default()
140 })
141 .await
142 .with_context(|| format!("create_key_value {BUCKET_AGENT_CONFIG}"))?;
143 info!(bucket = BUCKET_AGENT_CONFIG, "ready");
144
145 js.create_key_value(KvConfig {
147 bucket: BUCKET_AGENT_GROUPS.into(),
148 history: 5,
149 ..Default::default()
150 })
151 .await
152 .with_context(|| format!("create_key_value {BUCKET_AGENT_GROUPS}"))?;
153 info!(bucket = BUCKET_AGENT_GROUPS, "ready");
154
155 js.create_key_value(KvConfig {
159 bucket: BUCKET_SCHEDULES.into(),
160 history: 5,
161 ..Default::default()
162 })
163 .await
164 .with_context(|| format!("create_key_value {BUCKET_SCHEDULES}"))?;
165 info!(bucket = BUCKET_SCHEDULES, "ready");
166
167 js.create_key_value(KvConfig {
171 bucket: BUCKET_JOBS.into(),
172 history: 5,
173 ..Default::default()
174 })
175 .await
176 .with_context(|| format!("create_key_value {BUCKET_JOBS}"))?;
177 info!(bucket = BUCKET_JOBS, "ready");
178
179 js.create_object_store(ObjectStoreConfig {
182 bucket: OBJECT_AGENT_RELEASES.into(),
183 ..Default::default()
184 })
185 .await
186 .with_context(|| format!("create_object_store {OBJECT_AGENT_RELEASES}"))?;
187 info!(store = OBJECT_AGENT_RELEASES, "ready");
188
189 Ok(())
190}