use std::time::Duration;
use anyhow::{Context, Result};
use async_nats::jetstream::{
self,
kv::Config as KvConfig,
object_store::Config as ObjectStoreConfig,
stream::{Config as StreamConfig, DiscardPolicy},
};
use tracing::{info, warn};
use crate::kv::{
BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_FLEET_CONFIG,
BUCKET_JOBS, BUCKET_JOBS_YAML, BUCKET_NOTIFICATIONS_READ, BUCKET_SCHEDULES,
BUCKET_SCHEDULES_YAML, BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES,
OBJECT_APP_PACKAGES, OBJECT_RESULT_OUTPUT, OBJECT_SCRIPTS, STREAM_AUDIT, STREAM_EVENTS,
STREAM_EXEC, STREAM_INVENTORY, STREAM_NOTIFICATIONS, STREAM_OBS_EVENTS, STREAM_RESULTS,
};
async fn ensure_object_store(js: &jetstream::Context, cfg: ObjectStoreConfig) -> Result<()> {
let name = cfg.bucket.clone();
if let Err(e) = js.create_object_store(cfg).await {
if js.get_object_store(&name).await.is_err() {
return Err(e).with_context(|| {
format!("create_object_store {name} (and no existing store to fall back to)")
});
}
warn!(
store = %name, error = %e,
"object store exists with a different config; using it as-is (cap not reconciled)",
);
}
info!(store = %name, "ready");
Ok(())
}
pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
const MIB: i64 = 1024 * 1024;
const GIB: i64 = 1024 * MIB;
js.create_or_update_stream(StreamConfig {
name: STREAM_INVENTORY.into(),
subjects: vec!["inventory.>".into()],
max_age: Duration::from_secs(90 * 24 * 60 * 60),
max_bytes: GIB,
discard: DiscardPolicy::Old,
..Default::default()
})
.await
.with_context(|| format!("create_or_update_stream {STREAM_INVENTORY}"))?;
info!(stream = STREAM_INVENTORY, "ready");
js.create_or_update_stream(StreamConfig {
name: STREAM_RESULTS.into(),
subjects: vec!["results.>".into()],
max_age: Duration::from_secs(30 * 24 * 60 * 60),
max_bytes: 2 * GIB,
discard: DiscardPolicy::Old,
..Default::default()
})
.await
.with_context(|| format!("create_or_update_stream {STREAM_RESULTS}"))?;
info!(stream = STREAM_RESULTS, "ready");
js.create_or_update_stream(StreamConfig {
name: STREAM_EXEC.into(),
subjects: vec!["commands.>".into()],
max_messages_per_subject: 1,
max_age: Duration::from_secs(7 * 24 * 60 * 60),
max_bytes: 64 * MIB,
discard: DiscardPolicy::Old,
..Default::default()
})
.await
.with_context(|| format!("create_or_update_stream {STREAM_EXEC}"))?;
info!(stream = STREAM_EXEC, "ready");
js.create_or_update_stream(StreamConfig {
name: STREAM_EVENTS.into(),
subjects: vec!["events.>".into()],
max_age: Duration::from_secs(7 * 24 * 60 * 60),
max_bytes: 256 * MIB,
discard: DiscardPolicy::Old,
..Default::default()
})
.await
.with_context(|| format!("create_or_update_stream {STREAM_EVENTS}"))?;
info!(stream = STREAM_EVENTS, "ready");
js.create_or_update_stream(StreamConfig {
name: STREAM_AUDIT.into(),
subjects: vec!["audit.>".into()],
max_age: Duration::from_secs(90 * 24 * 60 * 60),
max_bytes: 512 * MIB,
discard: DiscardPolicy::Old,
..Default::default()
})
.await
.with_context(|| format!("create_or_update_stream {STREAM_AUDIT}"))?;
info!(stream = STREAM_AUDIT, "ready");
const SECS_PER_DAY: u64 = 24 * 60 * 60;
const OBS_EVENTS_RETENTION_DAYS: u64 = 90;
js.create_or_update_stream(StreamConfig {
name: STREAM_OBS_EVENTS.into(),
subjects: vec!["obs.>".into()],
max_age: Duration::from_secs(OBS_EVENTS_RETENTION_DAYS * SECS_PER_DAY),
max_bytes: 512 * MIB,
discard: DiscardPolicy::Old,
..Default::default()
})
.await
.with_context(|| format!("create_or_update_stream {STREAM_OBS_EVENTS}"))?;
info!(stream = STREAM_OBS_EVENTS, "ready");
js.create_or_update_stream(StreamConfig {
name: STREAM_NOTIFICATIONS.into(),
subjects: vec!["notifications.>".into()],
max_age: Duration::from_secs(90 * 24 * 60 * 60),
max_bytes: 512 * MIB,
discard: DiscardPolicy::Old,
..Default::default()
})
.await
.with_context(|| format!("create_or_update_stream {STREAM_NOTIFICATIONS}"))?;
info!(stream = STREAM_NOTIFICATIONS, "ready");
js.create_or_update_key_value(KvConfig {
bucket: BUCKET_SCRIPT_CURRENT.into(),
history: 5,
..Default::default()
})
.await
.with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_CURRENT}"))?;
info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
js.create_or_update_key_value(KvConfig {
bucket: BUCKET_SCRIPT_STATUS.into(),
history: 5,
..Default::default()
})
.await
.with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_STATUS}"))?;
info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
js.create_or_update_key_value(KvConfig {
bucket: BUCKET_AGENTS_STATE.into(),
history: 1,
..Default::default()
})
.await
.with_context(|| format!("create_or_update_key_value {BUCKET_AGENTS_STATE}"))?;
info!(bucket = BUCKET_AGENTS_STATE, "ready");
js.create_or_update_key_value(KvConfig {
bucket: BUCKET_AGENT_CONFIG.into(),
history: 5,
..Default::default()
})
.await
.with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_CONFIG}"))?;
info!(bucket = BUCKET_AGENT_CONFIG, "ready");
js.create_or_update_key_value(KvConfig {
bucket: BUCKET_AGENT_GROUPS.into(),
history: 5,
..Default::default()
})
.await
.with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_GROUPS}"))?;
info!(bucket = BUCKET_AGENT_GROUPS, "ready");
js.create_or_update_key_value(KvConfig {
bucket: BUCKET_SCHEDULES.into(),
history: 5,
..Default::default()
})
.await
.with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES}"))?;
info!(bucket = BUCKET_SCHEDULES, "ready");
js.create_or_update_key_value(KvConfig {
bucket: BUCKET_JOBS.into(),
history: 5,
..Default::default()
})
.await
.with_context(|| format!("create_or_update_key_value {BUCKET_JOBS}"))?;
info!(bucket = BUCKET_JOBS, "ready");
js.create_or_update_key_value(KvConfig {
bucket: BUCKET_FLEET_CONFIG.into(),
history: 1,
..Default::default()
})
.await
.with_context(|| format!("create_or_update_key_value {BUCKET_FLEET_CONFIG}"))?;
info!(bucket = BUCKET_FLEET_CONFIG, "ready");
js.create_or_update_key_value(KvConfig {
bucket: BUCKET_NOTIFICATIONS_READ.into(),
history: 1,
..Default::default()
})
.await
.with_context(|| format!("create_or_update_key_value {BUCKET_NOTIFICATIONS_READ}"))?;
info!(bucket = BUCKET_NOTIFICATIONS_READ, "ready");
js.create_or_update_key_value(KvConfig {
bucket: BUCKET_JOBS_YAML.into(),
history: 5,
..Default::default()
})
.await
.with_context(|| format!("create_or_update_key_value {BUCKET_JOBS_YAML}"))?;
info!(bucket = BUCKET_JOBS_YAML, "ready");
js.create_or_update_key_value(KvConfig {
bucket: BUCKET_SCHEDULES_YAML.into(),
history: 5,
..Default::default()
})
.await
.with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES_YAML}"))?;
info!(bucket = BUCKET_SCHEDULES_YAML, "ready");
ensure_object_store(
js,
ObjectStoreConfig {
bucket: OBJECT_AGENT_RELEASES.into(),
..Default::default()
},
)
.await?;
ensure_object_store(
js,
ObjectStoreConfig {
bucket: OBJECT_APP_PACKAGES.into(),
..Default::default()
},
)
.await?;
ensure_object_store(
js,
ObjectStoreConfig {
bucket: OBJECT_SCRIPTS.into(),
..Default::default()
},
)
.await?;
ensure_object_store(
js,
ObjectStoreConfig {
bucket: OBJECT_RESULT_OUTPUT.into(),
max_age: Duration::from_secs(SECS_PER_DAY * 30),
max_bytes: GIB,
..Default::default()
},
)
.await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::process::Stdio;
struct Broker {
js: jetstream::Context,
_server: tokio::process::Child,
_storage: tempfile::TempDir,
}
async fn spawn_broker() -> Broker {
let port = portpicker::pick_unused_port().expect("pick port");
let storage = tempfile::TempDir::new().expect("storage tempdir");
let server = tokio::process::Command::new("nats-server")
.arg("-js")
.arg("-p")
.arg(port.to_string())
.arg("-sd")
.arg(storage.path())
.stdout(Stdio::null())
.stderr(Stdio::null())
.kill_on_drop(true)
.spawn()
.expect("spawn nats-server (is it in PATH?)");
let url = format!("nats://127.0.0.1:{port}");
let mut client = None;
for _ in 0..50 {
if let Ok(c) = async_nats::connect(&url).await {
client = Some(c);
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
Broker {
js: jetstream::new(client.expect("nats-server did not come up in 5s")),
_server: server,
_storage: storage,
}
}
#[tokio::test]
#[ignore = "requires nats-server in PATH; cargo test -- --ignored"]
async fn ensure_object_store_accepts_config_drift() {
let b = spawn_broker().await;
ensure_object_store(
&b.js,
ObjectStoreConfig {
bucket: "result_output".into(),
..Default::default()
},
)
.await
.expect("fresh create");
ensure_object_store(
&b.js,
ObjectStoreConfig {
bucket: "result_output".into(),
max_bytes: 1024 * 1024 * 1024,
..Default::default()
},
)
.await
.expect("config drift must not wedge startup");
let store = b.js.get_object_store("result_output").await.expect("store");
store
.put("k", &mut &b"hi"[..])
.await
.expect("put after drift");
}
#[tokio::test]
#[ignore = "requires nats-server in PATH; cargo test -- --ignored"]
async fn ensure_object_store_fresh_create_with_cap() {
let b = spawn_broker().await;
ensure_object_store(
&b.js,
ObjectStoreConfig {
bucket: "fresh".into(),
max_bytes: 64 * 1024 * 1024,
..Default::default()
},
)
.await
.expect("fresh capped create");
b.js.get_object_store("fresh").await.expect("exists");
}
#[tokio::test]
#[ignore = "requires nats-server in PATH; cargo test -- --ignored"]
async fn ensure_object_store_propagates_when_no_fallback() {
let b = spawn_broker().await;
let err = ensure_object_store(
&b.js,
ObjectStoreConfig {
bucket: "bad name!".into(),
..Default::default()
},
)
.await
.expect_err("a create failure with no existing store must be fatal");
assert!(
err.to_string()
.contains("no existing store to fall back to"),
"unexpected error: {err:#}",
);
}
}