kanade_shared/
bootstrap.rs1use std::time::Duration;
14
15use anyhow::{Context, Result};
16use async_nats::jetstream::{
17 self,
18 kv::Config as KvConfig,
19 object_store::Config as ObjectStoreConfig,
20 stream::{Config as StreamConfig, DiscardPolicy},
21};
22use tracing::info;
23
24use crate::kv::{
25 BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_JOBS, BUCKET_SCHEDULES,
26 BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES, STREAM_AUDIT,
27 STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY, STREAM_RESULTS,
28};
29
30pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
38 js.create_stream(StreamConfig {
41 name: STREAM_INVENTORY.into(),
42 subjects: vec!["inventory.>".into()],
43 max_age: Duration::from_secs(90 * 24 * 60 * 60),
44 ..Default::default()
45 })
46 .await
47 .with_context(|| format!("create_stream {STREAM_INVENTORY}"))?;
48 info!(stream = STREAM_INVENTORY, "ready");
49
50 js.create_stream(StreamConfig {
52 name: STREAM_RESULTS.into(),
53 subjects: vec!["results.>".into()],
54 max_age: Duration::from_secs(30 * 24 * 60 * 60),
55 ..Default::default()
56 })
57 .await
58 .with_context(|| format!("create_stream {STREAM_RESULTS}"))?;
59 info!(stream = STREAM_RESULTS, "ready");
60
61 js.create_stream(StreamConfig {
65 name: STREAM_EXEC.into(),
66 subjects: vec!["commands.exec.>".into()],
67 max_messages_per_subject: 1,
68 discard: DiscardPolicy::Old,
69 max_age: Duration::from_secs(7 * 24 * 60 * 60),
70 ..Default::default()
71 })
72 .await
73 .with_context(|| format!("create_stream {STREAM_EXEC}"))?;
74 info!(stream = STREAM_EXEC, "ready");
75
76 js.create_stream(StreamConfig {
79 name: STREAM_EVENTS.into(),
80 subjects: vec!["events.>".into()],
81 max_age: Duration::from_secs(7 * 24 * 60 * 60),
82 ..Default::default()
83 })
84 .await
85 .with_context(|| format!("create_stream {STREAM_EVENTS}"))?;
86 info!(stream = STREAM_EVENTS, "ready");
87
88 js.create_stream(StreamConfig {
90 name: STREAM_AUDIT.into(),
91 subjects: vec!["audit.>".into()],
92 ..Default::default()
93 })
94 .await
95 .with_context(|| format!("create_stream {STREAM_AUDIT}"))?;
96 info!(stream = STREAM_AUDIT, "ready");
97
98 js.create_key_value(KvConfig {
101 bucket: BUCKET_SCRIPT_CURRENT.into(),
102 history: 5,
103 ..Default::default()
104 })
105 .await
106 .with_context(|| format!("create_key_value {BUCKET_SCRIPT_CURRENT}"))?;
107 info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
108
109 js.create_key_value(KvConfig {
111 bucket: BUCKET_SCRIPT_STATUS.into(),
112 history: 5,
113 ..Default::default()
114 })
115 .await
116 .with_context(|| format!("create_key_value {BUCKET_SCRIPT_STATUS}"))?;
117 info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
118
119 js.create_key_value(KvConfig {
121 bucket: BUCKET_AGENTS_STATE.into(),
122 history: 1,
123 ..Default::default()
124 })
125 .await
126 .with_context(|| format!("create_key_value {BUCKET_AGENTS_STATE}"))?;
127 info!(bucket = BUCKET_AGENTS_STATE, "ready");
128
129 js.create_key_value(KvConfig {
132 bucket: BUCKET_AGENT_CONFIG.into(),
133 history: 5,
134 ..Default::default()
135 })
136 .await
137 .with_context(|| format!("create_key_value {BUCKET_AGENT_CONFIG}"))?;
138 info!(bucket = BUCKET_AGENT_CONFIG, "ready");
139
140 js.create_key_value(KvConfig {
142 bucket: BUCKET_AGENT_GROUPS.into(),
143 history: 5,
144 ..Default::default()
145 })
146 .await
147 .with_context(|| format!("create_key_value {BUCKET_AGENT_GROUPS}"))?;
148 info!(bucket = BUCKET_AGENT_GROUPS, "ready");
149
150 js.create_key_value(KvConfig {
154 bucket: BUCKET_SCHEDULES.into(),
155 history: 5,
156 ..Default::default()
157 })
158 .await
159 .with_context(|| format!("create_key_value {BUCKET_SCHEDULES}"))?;
160 info!(bucket = BUCKET_SCHEDULES, "ready");
161
162 js.create_key_value(KvConfig {
166 bucket: BUCKET_JOBS.into(),
167 history: 5,
168 ..Default::default()
169 })
170 .await
171 .with_context(|| format!("create_key_value {BUCKET_JOBS}"))?;
172 info!(bucket = BUCKET_JOBS, "ready");
173
174 js.create_object_store(ObjectStoreConfig {
177 bucket: OBJECT_AGENT_RELEASES.into(),
178 ..Default::default()
179 })
180 .await
181 .with_context(|| format!("create_object_store {OBJECT_AGENT_RELEASES}"))?;
182 info!(store = OBJECT_AGENT_RELEASES, "ready");
183
184 Ok(())
185}