use std::time::Duration;
use anyhow::{Context, Result};
use async_nats::jetstream::{
self,
kv::Config as KvConfig,
object_store::Config as ObjectStoreConfig,
stream::{Config as StreamConfig, DiscardPolicy},
};
use tracing::info;
use crate::kv::{
BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_SCHEDULES,
BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES, STREAM_AUDIT,
STREAM_DEPLOY, STREAM_EVENTS, STREAM_INVENTORY, STREAM_RESULTS,
};
pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
js.create_stream(StreamConfig {
name: STREAM_INVENTORY.into(),
subjects: vec!["inventory.>".into()],
max_age: Duration::from_secs(90 * 24 * 60 * 60),
..Default::default()
})
.await
.with_context(|| format!("create_stream {STREAM_INVENTORY}"))?;
info!(stream = STREAM_INVENTORY, "ready");
js.create_stream(StreamConfig {
name: STREAM_RESULTS.into(),
subjects: vec!["results.>".into()],
max_age: Duration::from_secs(30 * 24 * 60 * 60),
..Default::default()
})
.await
.with_context(|| format!("create_stream {STREAM_RESULTS}"))?;
info!(stream = STREAM_RESULTS, "ready");
js.create_stream(StreamConfig {
name: STREAM_DEPLOY.into(),
subjects: vec!["commands.deploy.>".into()],
max_messages_per_subject: 1,
discard: DiscardPolicy::Old,
max_age: Duration::from_secs(7 * 24 * 60 * 60),
..Default::default()
})
.await
.with_context(|| format!("create_stream {STREAM_DEPLOY}"))?;
info!(stream = STREAM_DEPLOY, "ready");
js.create_stream(StreamConfig {
name: STREAM_EVENTS.into(),
subjects: vec!["events.>".into()],
max_age: Duration::from_secs(7 * 24 * 60 * 60),
..Default::default()
})
.await
.with_context(|| format!("create_stream {STREAM_EVENTS}"))?;
info!(stream = STREAM_EVENTS, "ready");
js.create_stream(StreamConfig {
name: STREAM_AUDIT.into(),
subjects: vec!["audit.>".into()],
..Default::default()
})
.await
.with_context(|| format!("create_stream {STREAM_AUDIT}"))?;
info!(stream = STREAM_AUDIT, "ready");
js.create_key_value(KvConfig {
bucket: BUCKET_SCRIPT_CURRENT.into(),
history: 5,
..Default::default()
})
.await
.with_context(|| format!("create_key_value {BUCKET_SCRIPT_CURRENT}"))?;
info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
js.create_key_value(KvConfig {
bucket: BUCKET_SCRIPT_STATUS.into(),
history: 5,
..Default::default()
})
.await
.with_context(|| format!("create_key_value {BUCKET_SCRIPT_STATUS}"))?;
info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
js.create_key_value(KvConfig {
bucket: BUCKET_AGENTS_STATE.into(),
history: 1,
..Default::default()
})
.await
.with_context(|| format!("create_key_value {BUCKET_AGENTS_STATE}"))?;
info!(bucket = BUCKET_AGENTS_STATE, "ready");
js.create_key_value(KvConfig {
bucket: BUCKET_AGENT_CONFIG.into(),
history: 5,
..Default::default()
})
.await
.with_context(|| format!("create_key_value {BUCKET_AGENT_CONFIG}"))?;
info!(bucket = BUCKET_AGENT_CONFIG, "ready");
js.create_key_value(KvConfig {
bucket: BUCKET_AGENT_GROUPS.into(),
history: 5,
..Default::default()
})
.await
.with_context(|| format!("create_key_value {BUCKET_AGENT_GROUPS}"))?;
info!(bucket = BUCKET_AGENT_GROUPS, "ready");
js.create_key_value(KvConfig {
bucket: BUCKET_SCHEDULES.into(),
history: 5,
..Default::default()
})
.await
.with_context(|| format!("create_key_value {BUCKET_SCHEDULES}"))?;
info!(bucket = BUCKET_SCHEDULES, "ready");
js.create_object_store(ObjectStoreConfig {
bucket: OBJECT_AGENT_RELEASES.into(),
..Default::default()
})
.await
.with_context(|| format!("create_object_store {OBJECT_AGENT_RELEASES}"))?;
info!(store = OBJECT_AGENT_RELEASES, "ready");
Ok(())
}