kanade_shared/
bootstrap.rs1use std::time::Duration;
18
19use anyhow::{Context, Result};
20use async_nats::jetstream::{
21 self,
22 kv::Config as KvConfig,
23 object_store::Config as ObjectStoreConfig,
24 stream::{Config as StreamConfig, DiscardPolicy},
25};
26use tracing::info;
27
28use crate::kv::{
29 BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_JOBS, BUCKET_SCHEDULES,
30 BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES, STREAM_AUDIT,
31 STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY, STREAM_RESULTS,
32};
33
34pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
42 js.create_or_update_stream(StreamConfig {
45 name: STREAM_INVENTORY.into(),
46 subjects: vec!["inventory.>".into()],
47 max_age: Duration::from_secs(90 * 24 * 60 * 60),
48 ..Default::default()
49 })
50 .await
51 .with_context(|| format!("create_or_update_stream {STREAM_INVENTORY}"))?;
52 info!(stream = STREAM_INVENTORY, "ready");
53
54 js.create_or_update_stream(StreamConfig {
56 name: STREAM_RESULTS.into(),
57 subjects: vec!["results.>".into()],
58 max_age: Duration::from_secs(30 * 24 * 60 * 60),
59 ..Default::default()
60 })
61 .await
62 .with_context(|| format!("create_or_update_stream {STREAM_RESULTS}"))?;
63 info!(stream = STREAM_RESULTS, "ready");
64
65 js.create_or_update_stream(StreamConfig {
74 name: STREAM_EXEC.into(),
75 subjects: vec!["commands.>".into()],
76 max_messages_per_subject: 1,
77 discard: DiscardPolicy::Old,
78 max_age: Duration::from_secs(7 * 24 * 60 * 60),
79 ..Default::default()
80 })
81 .await
82 .with_context(|| format!("create_or_update_stream {STREAM_EXEC}"))?;
83 info!(stream = STREAM_EXEC, "ready");
84
85 js.create_or_update_stream(StreamConfig {
88 name: STREAM_EVENTS.into(),
89 subjects: vec!["events.>".into()],
90 max_age: Duration::from_secs(7 * 24 * 60 * 60),
91 ..Default::default()
92 })
93 .await
94 .with_context(|| format!("create_or_update_stream {STREAM_EVENTS}"))?;
95 info!(stream = STREAM_EVENTS, "ready");
96
97 js.create_or_update_stream(StreamConfig {
99 name: STREAM_AUDIT.into(),
100 subjects: vec!["audit.>".into()],
101 ..Default::default()
102 })
103 .await
104 .with_context(|| format!("create_or_update_stream {STREAM_AUDIT}"))?;
105 info!(stream = STREAM_AUDIT, "ready");
106
107 js.create_or_update_key_value(KvConfig {
110 bucket: BUCKET_SCRIPT_CURRENT.into(),
111 history: 5,
112 ..Default::default()
113 })
114 .await
115 .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_CURRENT}"))?;
116 info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
117
118 js.create_or_update_key_value(KvConfig {
120 bucket: BUCKET_SCRIPT_STATUS.into(),
121 history: 5,
122 ..Default::default()
123 })
124 .await
125 .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_STATUS}"))?;
126 info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
127
128 js.create_or_update_key_value(KvConfig {
130 bucket: BUCKET_AGENTS_STATE.into(),
131 history: 1,
132 ..Default::default()
133 })
134 .await
135 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENTS_STATE}"))?;
136 info!(bucket = BUCKET_AGENTS_STATE, "ready");
137
138 js.create_or_update_key_value(KvConfig {
141 bucket: BUCKET_AGENT_CONFIG.into(),
142 history: 5,
143 ..Default::default()
144 })
145 .await
146 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_CONFIG}"))?;
147 info!(bucket = BUCKET_AGENT_CONFIG, "ready");
148
149 js.create_or_update_key_value(KvConfig {
151 bucket: BUCKET_AGENT_GROUPS.into(),
152 history: 5,
153 ..Default::default()
154 })
155 .await
156 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_GROUPS}"))?;
157 info!(bucket = BUCKET_AGENT_GROUPS, "ready");
158
159 js.create_or_update_key_value(KvConfig {
163 bucket: BUCKET_SCHEDULES.into(),
164 history: 5,
165 ..Default::default()
166 })
167 .await
168 .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES}"))?;
169 info!(bucket = BUCKET_SCHEDULES, "ready");
170
171 js.create_or_update_key_value(KvConfig {
175 bucket: BUCKET_JOBS.into(),
176 history: 5,
177 ..Default::default()
178 })
179 .await
180 .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS}"))?;
181 info!(bucket = BUCKET_JOBS, "ready");
182
183 js.create_object_store(ObjectStoreConfig {
186 bucket: OBJECT_AGENT_RELEASES.into(),
187 ..Default::default()
188 })
189 .await
190 .with_context(|| format!("create_object_store {OBJECT_AGENT_RELEASES}"))?;
191 info!(store = OBJECT_AGENT_RELEASES, "ready");
192
193 Ok(())
194}