kanade_shared/
bootstrap.rs1use std::time::Duration;
14
15use anyhow::{Context, Result};
16use async_nats::jetstream::{
17 self,
18 kv::Config as KvConfig,
19 object_store::Config as ObjectStoreConfig,
20 stream::{Config as StreamConfig, DiscardPolicy},
21};
22use tracing::info;
23
24use crate::kv::{
25 BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_SCHEDULES,
26 BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES, STREAM_AUDIT,
27 STREAM_DEPLOY, STREAM_EVENTS, STREAM_INVENTORY, STREAM_RESULTS,
28};
29
30pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
38 js.create_stream(StreamConfig {
41 name: STREAM_INVENTORY.into(),
42 subjects: vec!["inventory.>".into()],
43 max_age: Duration::from_secs(90 * 24 * 60 * 60),
44 ..Default::default()
45 })
46 .await
47 .with_context(|| format!("create_stream {STREAM_INVENTORY}"))?;
48 info!(stream = STREAM_INVENTORY, "ready");
49
50 js.create_stream(StreamConfig {
52 name: STREAM_RESULTS.into(),
53 subjects: vec!["results.>".into()],
54 max_age: Duration::from_secs(30 * 24 * 60 * 60),
55 ..Default::default()
56 })
57 .await
58 .with_context(|| format!("create_stream {STREAM_RESULTS}"))?;
59 info!(stream = STREAM_RESULTS, "ready");
60
61 js.create_stream(StreamConfig {
63 name: STREAM_DEPLOY.into(),
64 subjects: vec!["commands.deploy.>".into()],
65 max_messages_per_subject: 1,
66 discard: DiscardPolicy::Old,
67 max_age: Duration::from_secs(7 * 24 * 60 * 60),
68 ..Default::default()
69 })
70 .await
71 .with_context(|| format!("create_stream {STREAM_DEPLOY}"))?;
72 info!(stream = STREAM_DEPLOY, "ready");
73
74 js.create_stream(StreamConfig {
77 name: STREAM_EVENTS.into(),
78 subjects: vec!["events.>".into()],
79 max_age: Duration::from_secs(7 * 24 * 60 * 60),
80 ..Default::default()
81 })
82 .await
83 .with_context(|| format!("create_stream {STREAM_EVENTS}"))?;
84 info!(stream = STREAM_EVENTS, "ready");
85
86 js.create_stream(StreamConfig {
88 name: STREAM_AUDIT.into(),
89 subjects: vec!["audit.>".into()],
90 ..Default::default()
91 })
92 .await
93 .with_context(|| format!("create_stream {STREAM_AUDIT}"))?;
94 info!(stream = STREAM_AUDIT, "ready");
95
96 js.create_key_value(KvConfig {
99 bucket: BUCKET_SCRIPT_CURRENT.into(),
100 history: 5,
101 ..Default::default()
102 })
103 .await
104 .with_context(|| format!("create_key_value {BUCKET_SCRIPT_CURRENT}"))?;
105 info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
106
107 js.create_key_value(KvConfig {
109 bucket: BUCKET_SCRIPT_STATUS.into(),
110 history: 5,
111 ..Default::default()
112 })
113 .await
114 .with_context(|| format!("create_key_value {BUCKET_SCRIPT_STATUS}"))?;
115 info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
116
117 js.create_key_value(KvConfig {
119 bucket: BUCKET_AGENTS_STATE.into(),
120 history: 1,
121 ..Default::default()
122 })
123 .await
124 .with_context(|| format!("create_key_value {BUCKET_AGENTS_STATE}"))?;
125 info!(bucket = BUCKET_AGENTS_STATE, "ready");
126
127 js.create_key_value(KvConfig {
130 bucket: BUCKET_AGENT_CONFIG.into(),
131 history: 5,
132 ..Default::default()
133 })
134 .await
135 .with_context(|| format!("create_key_value {BUCKET_AGENT_CONFIG}"))?;
136 info!(bucket = BUCKET_AGENT_CONFIG, "ready");
137
138 js.create_key_value(KvConfig {
140 bucket: BUCKET_AGENT_GROUPS.into(),
141 history: 5,
142 ..Default::default()
143 })
144 .await
145 .with_context(|| format!("create_key_value {BUCKET_AGENT_GROUPS}"))?;
146 info!(bucket = BUCKET_AGENT_GROUPS, "ready");
147
148 js.create_key_value(KvConfig {
152 bucket: BUCKET_SCHEDULES.into(),
153 history: 5,
154 ..Default::default()
155 })
156 .await
157 .with_context(|| format!("create_key_value {BUCKET_SCHEDULES}"))?;
158 info!(bucket = BUCKET_SCHEDULES, "ready");
159
160 js.create_object_store(ObjectStoreConfig {
163 bucket: OBJECT_AGENT_RELEASES.into(),
164 ..Default::default()
165 })
166 .await
167 .with_context(|| format!("create_object_store {OBJECT_AGENT_RELEASES}"))?;
168 info!(store = OBJECT_AGENT_RELEASES, "ready");
169
170 Ok(())
171}