use std::collections::{BTreeMap, VecDeque};
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use async_nats::jetstream;
use bytes::Bytes;
use cellos_core::{CellStateProjection, CellStateSnapshot, CloudEventV1};
use cellos_projector::{decode_event, EventVerifierConfig};
use futures_util::StreamExt;
use http_body_util::Full;
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tracing::{error, info, warn};
/// Shared, mutex-guarded map of projections.
///
/// Keys are produced by `event_key`: `cell:{cellId}` when the event carries a
/// `cellId`, otherwise `spec:{specId}`.
type StateMap = Arc<Mutex<BTreeMap<String, CellStateProjection>>>;
/// Shared, mutex-guarded FIFO of compliance summary events, oldest at the front.
///
/// Filled by `push_compliance_event` and read by the `/compliance/export` route.
type ComplianceBuffer = Arc<Mutex<VecDeque<CloudEventV1>>>;
/// Maximum number of events `push_compliance_event` keeps in the compliance
/// buffer; once reached, the oldest entry is evicted for each new one.
pub const COMPLIANCE_BUFFER_MAX: usize = 100_000;
/// CloudEvent `type` that `push_compliance_event` retains; every other event
/// type is ignored by the compliance buffer.
pub const COMPLIANCE_SUMMARY_EVENT_TYPE: &str = "dev.cellos.events.cell.compliance.v1.summary";
/// Entry point of the state server.
///
/// Reads its configuration from the environment, connects to NATS JetStream,
/// spawns a background task that folds every decoded event into the in-memory
/// projections and the compliance buffer, then serves the HTTP API until
/// `accept` fails.
///
/// # Errors
///
/// Returns an error when the listen address does not parse, the event-verifier
/// configuration is invalid, any JetStream setup step fails, the TCP listener
/// cannot be bound, or accepting a connection fails.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// When the version helper reports that it handled the request, exit before
// touching NATS or the network.
if cellos_projector::build_info::print_version_if_requested(
"cellos-state-server",
env!("CARGO_PKG_VERSION"),
) {
return Ok(());
}
// Tracing setup lives in its own scope so the extension-trait imports do not
// leak into the rest of `main`.
{
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::Layer;
// The fmt layer is wrapped in the filter returned by `redacted_filter()`;
// the global level filter comes from the environment.
let fmt_layer = tracing_subscriber::fmt::layer()
.with_filter(cellos_core::observability::redacted_filter());
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(fmt_layer)
.init();
}
// Each setting falls back to a built-in default when its variable is unset
// (or not valid Unicode, since `std::env::var` errors in that case too).
let nats_url =
std::env::var("CELLOS_NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into());
let stream_name = std::env::var("CELLOS_NATS_STREAM").unwrap_or_else(|_| "CELLOS".into());
let consumer_name =
std::env::var("CELLOS_NATS_CONSUMER").unwrap_or_else(|_| "cellos-state".into());
let event_subject =
std::env::var("CELLOS_EVENT_SUBJECT").unwrap_or_else(|_| "cellos.events.>".into());
// An unparsable listen address aborts startup via `?`.
let listen_addr: SocketAddr = std::env::var("CELLOS_STATE_ADDR")
.unwrap_or_else(|_| "0.0.0.0:8080".into())
.parse()?;
let verifier_cfg = EventVerifierConfig::from_env()
.map_err(|e| anyhow::anyhow!("event-verifier configuration: {e}"))?;
// Only announce the verifier when it actually has keys configured.
if verifier_cfg.has_keys() {
info!(
keys = verifier_cfg.verifying_keys.len(),
require_signed = verifier_cfg.require_signed,
"I5 per-event signing: verifier active"
);
}
// Both stores start empty and are shared between the consumer task and the
// HTTP handlers.
let state: StateMap = Arc::new(Mutex::new(BTreeMap::new()));
let compliance: ComplianceBuffer = Arc::new(Mutex::new(VecDeque::new()));
info!(%nats_url, %stream_name, %consumer_name, %event_subject, "connecting to NATS JetStream");
let conn = async_nats::connect(&nats_url)
.await
.map_err(|e| anyhow::anyhow!("NATS connect: {e}"))?;
let js = jetstream::new(conn);
// The stream is looked up, not created: a missing stream is a startup error.
let stream = js
.get_stream(&stream_name)
.await
.map_err(|e| anyhow::anyhow!("get JetStream stream {stream_name:?}: {e}"))?;
let consumer = stream
.create_consumer(build_consumer_config(&consumer_name, &event_subject))
.await
.map_err(|e| anyhow::anyhow!("create JetStream consumer: {e}"))?;
let mut messages = consumer
.messages()
.await
.map_err(|e| anyhow::anyhow!("consumer messages stream: {e}"))?;
// Clones moved into the background consumer task.
let state_bg = state.clone();
let compliance_bg = compliance.clone();
let verifier_bg = verifier_cfg.clone();
// The task handle is dropped, so the consumer runs detached; if its stream
// ends, the HTTP server keeps serving whatever state it already has.
tokio::spawn(async move {
// NOTE(review): the consumer is durable (see `build_consumer_config`) while
// the projections live only in memory — confirm that a process restart
// really replays from the beginning rather than resuming after the last
// acknowledged message.
info!("JetStream consumer started — replaying from beginning");
let mut applied: u64 = 0;
while let Some(result) = messages.next().await {
match result {
Ok(msg) => {
// The message is acknowledged before it is decoded or applied, so a
// payload that fails below has already been acked (presumably it is
// therefore not redelivered — confirm against JetStream ack semantics).
if let Err(e) = msg.ack().await {
warn!(error = %e, "ack failed");
}
match decode_event(&msg.payload, &verifier_bg) {
Ok(event) => {
apply_event(&state_bg, &event);
push_compliance_event(&compliance_bg, &event);
applied += 1;
// Progress is logged once per 500 successfully decoded events.
if applied.is_multiple_of(500) {
info!(applied, "projection replay progress");
}
}
Err(e) => warn!(error = %e, "skip undecodable/unverifiable payload"),
}
}
// Stream-level errors are logged and the loop keeps polling.
Err(e) => error!(error = %e, "consumer stream error"),
}
}
info!("JetStream consumer stream ended");
});
info!(%listen_addr, "starting HTTP state server");
let listener = TcpListener::bind(listen_addr).await?;
// Accept loop: one spawned task per connection. An `accept` error propagates
// through `?` and ends `main`.
loop {
let (tcp_stream, remote_addr) = listener.accept().await?;
let state = state.clone();
let compliance = compliance.clone();
tokio::spawn(async move {
// The service closure is called once per request on this connection, so it
// clones the shared handles for each call.
if let Err(e) = http1::Builder::new()
.serve_connection(
TokioIo::new(tcp_stream),
service_fn(move |req| serve(req, state.clone(), compliance.clone())),
)
.await
{
tracing::debug!(%remote_addr, error = %e, "HTTP connection closed");
}
});
}
}
/// Stores `event` in the compliance buffer when it is a compliance summary.
///
/// Events whose type differs from [`COMPLIANCE_SUMMARY_EVENT_TYPE`] are
/// ignored. When the buffer already holds [`COMPLIANCE_BUFFER_MAX`] entries the
/// oldest one is evicted before the new event is appended.
///
/// # Panics
///
/// Panics if the buffer mutex is poisoned.
pub fn push_compliance_event(buffer: &ComplianceBuffer, event: &CloudEventV1) {
    if event.ty == COMPLIANCE_SUMMARY_EVENT_TYPE {
        let mut queue = buffer.lock().unwrap();
        let at_capacity = queue.len() >= COMPLIANCE_BUFFER_MAX;
        if at_capacity {
            // Drop the oldest entry so the buffer never grows past its cap.
            queue.pop_front();
        }
        queue.push_back(event.clone());
    }
}
/// Builds the pull-consumer configuration used by the state server.
///
/// The consumer is durable under `consumer_name`, filtered to `event_subject`,
/// and uses `DeliverPolicy::All`; every other field keeps its default.
pub fn build_consumer_config(
    consumer_name: &str,
    event_subject: &str,
) -> jetstream::consumer::pull::Config {
    let durable_name = Some(String::from(consumer_name));
    let filter_subject = String::from(event_subject);
    jetstream::consumer::pull::Config {
        durable_name,
        filter_subject,
        deliver_policy: jetstream::consumer::DeliverPolicy::All,
        ..Default::default()
    }
}
/// Folds `event` into the projection it belongs to, creating that projection
/// on first sight.
///
/// Events for which `event_key` yields no key are dropped. When the projection
/// rejects the event a warning is logged and the event is skipped; the entry
/// created for the key stays in the map.
///
/// # Panics
///
/// Panics if the state mutex is poisoned.
pub fn apply_event(state: &StateMap, event: &CloudEventV1) {
    if let Some(key) = event_key(event) {
        let mut projections = state.lock().unwrap();
        // The key is cloned for the map because the original is still needed
        // for the warning below.
        let outcome = projections.entry(key.clone()).or_default().apply(event);
        if let Err(rejection) = outcome {
            warn!(
                key = %key,
                event_type = %event.ty,
                error = %rejection,
                "projection rejected event — skipping"
            );
        }
    }
}
/// Derives the state-map key for `event` from its data payload.
///
/// Returns `cell:{cellId}` when `data.cellId` is a string, otherwise
/// `spec:{specId}` when `data.specId` is a string, otherwise `None` (also when
/// the event has no data at all).
fn event_key(event: &CloudEventV1) -> Option<String> {
    let data = event.data.as_ref()?;
    // Candidates are tried in order, so `cellId` wins over `specId`.
    [("cellId", "cell"), ("specId", "spec")]
        .into_iter()
        .find_map(|(field, prefix)| {
            data.get(field)
                .and_then(|value| value.as_str())
                .map(|id| format!("{prefix}:{id}"))
        })
}
/// Hyper service entry point: answers one request from the shared state.
///
/// The request body is never read; only the method, path and query string are
/// handed to `route_with_query`. The function cannot fail, hence `Infallible`.
async fn serve(
    req: Request<Incoming>,
    state: StateMap,
    compliance: ComplianceBuffer,
) -> Result<Response<Full<Bytes>>, Infallible> {
    let uri = req.uri();
    let response = route_with_query(req.method(), uri.path(), uri.query(), &state, &compliance);
    Ok(response)
}
/// Routes a request that carries no query string.
///
/// Equivalent to calling `route_with_query` with `None` as the query.
pub fn route(
    method: &Method,
    path: &str,
    state: &StateMap,
    compliance: &ComplianceBuffer,
) -> Response<Full<Bytes>> {
    let no_query: Option<&str> = None;
    route_with_query(method, path, no_query, state, compliance)
}
fn route_with_query(
method: &Method,
path: &str,
query: Option<&str>,
state: &StateMap,
compliance: &ComplianceBuffer,
) -> Response<Full<Bytes>> {
if method != Method::GET {
return Response::builder()
.status(StatusCode::METHOD_NOT_ALLOWED)
.body(Full::new(Bytes::new()))
.unwrap();
}
match path {
"/healthz" => json_response(StatusCode::OK, serde_json::json!({"status": "ok"})),
"/cells" => {
let filters = CellListFilters::from_query(query);
let snapshots: BTreeMap<String, CellStateSnapshot> = {
let map = state.lock().unwrap();
map.iter()
.filter_map(|(k, v)| {
let snapshot = v.snapshot();
filters.matches(&snapshot).then(|| (k.clone(), snapshot))
})
.collect()
};
json_response(StatusCode::OK, serde_json::json!({"cells": snapshots}))
}
"/compliance/export" => {
let filters = ComplianceExportFilters::from_query(query);
let buf = compliance.lock().unwrap();
let matched: Vec<&CloudEventV1> = buf.iter().filter(|ev| filters.matches(ev)).collect();
let mut body = String::new();
for ev in matched {
if let Ok(line) = serde_json::to_string(ev) {
body.push_str(&line);
body.push('\n');
}
}
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/x-ndjson")
.body(Full::new(Bytes::from(body)))
.unwrap()
}
_ if path.starts_with("/cells/") => {
let id = &path["/cells/".len()..];
if id.is_empty() {
return json_response(
StatusCode::NOT_FOUND,
serde_json::json!({"error": "not found"}),
);
}
let key = format!("cell:{id}");
let snapshot = {
let map = state.lock().unwrap();
map.get(&key).map(|p| p.snapshot())
};
match snapshot {
Some(s) => json_response(
StatusCode::OK,
serde_json::to_value(s).unwrap_or(serde_json::Value::Null),
),
None => json_response(
StatusCode::NOT_FOUND,
serde_json::json!({"error": "cell not found", "id": id}),
),
}
}
_ => json_response(
StatusCode::NOT_FOUND,
serde_json::json!({"error": "not found"}),
),
}
}
/// Decodes one `application/x-www-form-urlencoded` query component.
///
/// `+` becomes a space and every well-formed `%XX` escape becomes the byte it
/// names. A `%` that is not followed by two hex digits is kept literally, and a
/// decoded byte sequence that is not valid UTF-8 has the offending bytes
/// replaced with U+FFFD instead of being rejected.
fn decode_query_component(raw: &str) -> String {
    let bytes = raw.as_bytes();
    let mut decoded = Vec::with_capacity(bytes.len());
    let mut pos = 0;
    while let Some(&byte) = bytes.get(pos) {
        let escaped = if byte == b'%' {
            // `get` with a range yields `None` when fewer than two bytes follow.
            bytes.get(pos + 1..pos + 3).and_then(|pair| {
                let hi = char::from(pair[0]).to_digit(16)?;
                let lo = char::from(pair[1]).to_digit(16)?;
                u8::try_from(hi * 16 + lo).ok()
            })
        } else {
            None
        };
        match (byte, escaped) {
            (_, Some(value)) => {
                // An escaped `%2B` stays a literal `+`; only a raw `+` means space.
                decoded.push(value);
                pos += 3;
            }
            (b'+', None) => {
                decoded.push(b' ');
                pos += 1;
            }
            (other, None) => {
                decoded.push(other);
                pos += 1;
            }
        }
    }
    String::from_utf8_lossy(&decoded).into_owned()
}

/// Yields the decoded `(key, value)` pairs of a raw query string.
///
/// Pairs without `=` and pairs whose value is empty are skipped; a missing
/// query yields nothing.
fn query_pairs(query: Option<&str>) -> impl Iterator<Item = (String, String)> + '_ {
    query
        .into_iter()
        .flat_map(|raw| raw.split('&'))
        .filter_map(|pair| pair.split_once('='))
        .filter(|(_, value)| !value.is_empty())
        .map(|(key, value)| (decode_query_component(key), decode_query_component(value)))
}

/// Parses an RFC 3339 timestamp and normalizes it to UTC; `None` when `raw`
/// is not a valid RFC 3339 timestamp.
fn parse_rfc3339_utc(raw: &str) -> Option<chrono::DateTime<chrono::Utc>> {
    chrono::DateTime::parse_from_rfc3339(raw)
        .ok()
        .map(|t| t.with_timezone(&chrono::Utc))
}

/// Filters accepted by `/compliance/export`.
#[derive(Default)]
struct ComplianceExportFilters {
    /// Keep only events whose `data.tenantId` equals this value.
    tenant_id: Option<String>,
    /// Inclusive lower bound on the event `time`.
    from: Option<chrono::DateTime<chrono::Utc>>,
    /// Inclusive upper bound on the event `time`.
    to: Option<chrono::DateTime<chrono::Utc>>,
}

impl ComplianceExportFilters {
    /// Builds the filters from the `tenantId`, `from` and `to` query parameters.
    ///
    /// Values are form-decoded first, so a percent-encoded timestamp such as
    /// `2025-01-01T00:00:00%2B02:00` parses correctly. Unknown keys are ignored,
    /// and a `from`/`to` value that is not valid RFC 3339 leaves that bound unset.
    fn from_query(query: Option<&str>) -> Self {
        let mut filters = Self::default();
        for (key, value) in query_pairs(query) {
            match key.as_str() {
                "tenantId" => filters.tenant_id = Some(value),
                "from" => {
                    if let Some(bound) = parse_rfc3339_utc(&value) {
                        filters.from = Some(bound);
                    }
                }
                "to" => {
                    if let Some(bound) = parse_rfc3339_utc(&value) {
                        filters.to = Some(bound);
                    }
                }
                _ => {}
            }
        }
        filters
    }

    /// Reports whether `event` passes every configured filter.
    ///
    /// With a time bound set, an event whose `time` is missing or unparsable
    /// never matches. Both bounds are inclusive.
    fn matches(&self, event: &CloudEventV1) -> bool {
        if let Some(expected) = self.tenant_id.as_deref() {
            let actual = event
                .data
                .as_ref()
                .and_then(|d| d.get("tenantId"))
                .and_then(|v| v.as_str());
            if actual != Some(expected) {
                return false;
            }
        }
        if self.from.is_none() && self.to.is_none() {
            return true;
        }
        let Some(ev_time) = event.time.as_deref().and_then(parse_rfc3339_utc) else {
            return false;
        };
        let before_from = self.from.is_some_and(|from| ev_time < from);
        let after_to = self.to.is_some_and(|to| ev_time > to);
        !before_from && !after_to
    }
}

/// Placement filters accepted by `/cells`.
#[derive(Default)]
struct CellListFilters {
    /// Required `placement.pool_id`.
    pool_id: Option<String>,
    /// Required `placement.kubernetes_namespace`.
    kubernetes_namespace: Option<String>,
    /// Required `placement.queue_name`.
    queue_name: Option<String>,
}

impl CellListFilters {
    /// Builds the filters from the `poolId`, `kubernetesNamespace` and
    /// `queueName` query parameters.
    ///
    /// Values are form-decoded the same way as for the compliance export, so a
    /// client that percent-encodes its query still matches. Unknown keys are
    /// ignored.
    fn from_query(query: Option<&str>) -> Self {
        let mut filters = Self::default();
        for (key, value) in query_pairs(query) {
            match key.as_str() {
                "poolId" => filters.pool_id = Some(value),
                "kubernetesNamespace" => filters.kubernetes_namespace = Some(value),
                "queueName" => filters.queue_name = Some(value),
                _ => {}
            }
        }
        filters
    }

    /// Reports whether `snapshot` satisfies every configured placement filter.
    ///
    /// A snapshot without placement data only matches when no filter is set.
    fn matches(&self, snapshot: &CellStateSnapshot) -> bool {
        let placement = snapshot.placement.as_ref();
        matches_optional(
            self.pool_id.as_deref(),
            placement.and_then(|value| value.pool_id.as_deref()),
        ) && matches_optional(
            self.kubernetes_namespace.as_deref(),
            placement.and_then(|value| value.kubernetes_namespace.as_deref()),
        ) && matches_optional(
            self.queue_name.as_deref(),
            placement.and_then(|value| value.queue_name.as_deref()),
        )
    }
}
/// Compares an optional filter against an optional observed value.
///
/// An unset filter (`None`) accepts anything; a set filter requires the
/// observed value to be present and equal.
fn matches_optional(expected: Option<&str>, observed: Option<&str>) -> bool {
    expected.is_none() || observed == expected
}
/// Builds a response with the given status, a JSON content type and `body`
/// serialized as its payload.
///
/// If serialization fails the payload is empty rather than the call failing.
fn json_response(status: StatusCode, body: serde_json::Value) -> Response<Full<Bytes>> {
    let payload = match serde_json::to_vec(&body) {
        Ok(encoded) => Bytes::from(encoded),
        Err(_) => Bytes::new(),
    };
    Response::builder()
        .header("Content-Type", "application/json")
        .status(status)
        .body(Full::new(payload))
        .unwrap()
}
// Unit tests for event application, HTTP routing, consumer configuration and
// the compliance buffer. Everything runs against in-memory state; no NATS
// connection or TCP listener is involved.
#[cfg(test)]
mod tests {
use super::*;
use http_body_util::BodyExt;
use serde_json::json;
/// Smoke test: only checks that the version helper can be called.
#[test]
fn version_compiles() {
let _ = cellos_projector::build_info::version_line(
"cellos-state-server",
env!("CARGO_PKG_VERSION"),
);
}
/// Builds a CloudEvent with the given id, type and data, a fixed source and no `time`.
fn make_event(id: &str, ty: &str, data: serde_json::Value) -> CloudEventV1 {
serde_json::from_value(json!({
"specversion": "1.0",
"id": id,
"source": "urn:test",
"type": ty,
"data": data
}))
.unwrap()
}
/// Fresh, empty projection map.
fn empty_state() -> StateMap {
Arc::new(Mutex::new(BTreeMap::new()))
}
/// Fresh, empty compliance buffer.
fn empty_compliance() -> ComplianceBuffer {
Arc::new(Mutex::new(VecDeque::new()))
}
/// Collects a response body into one contiguous `Bytes`.
async fn body_bytes(resp: Response<Full<Bytes>>) -> Bytes {
resp.into_body().collect().await.unwrap().to_bytes()
}
/// An event carrying both ids is stored under the `cell:` key.
#[test]
fn apply_event_creates_projection_keyed_by_cell_id() {
let state = empty_state();
let event = make_event(
"1",
"dev.cellos.events.cell.lifecycle.v1.started",
json!({"cellId": "cell-42", "specId": "spec-1"}),
);
apply_event(&state, &event);
let map = state.lock().unwrap();
assert!(map.contains_key("cell:cell-42"), "key not found");
assert_eq!(map["cell:cell-42"].cell_id.as_deref(), Some("cell-42"));
}
/// Without a `cellId` the projection falls back to the `spec:` key.
#[test]
fn apply_event_creates_projection_keyed_by_spec_id_when_no_cell_id() {
let state = empty_state();
let event = make_event(
"1",
"dev.cellos.events.cell.lifecycle.v1.started",
json!({"specId": "spec-77"}),
);
apply_event(&state, &event);
let map = state.lock().unwrap();
assert!(map.contains_key("spec:spec-77"), "spec key not found");
}
/// An event with neither id leaves the state map untouched.
#[test]
fn apply_event_ignores_events_without_cell_or_spec_id() {
let state = empty_state();
let event = make_event(
"1",
"dev.cellos.events.cell.lifecycle.v1.started",
json!({"unrelated": "field"}),
);
apply_event(&state, &event);
assert!(state.lock().unwrap().is_empty());
}
/// A `destroyed` event arriving first must not panic. There is no assertion:
/// completing the call is the whole test. Presumably the projection rejects
/// it as an illegal transition (per the test name) — not checked here.
#[test]
fn apply_event_skips_illegal_transitions_without_panicking() {
let state = empty_state();
let event = make_event(
"1",
"dev.cellos.events.cell.lifecycle.v1.destroyed",
json!({"cellId": "cell-99", "specId": "spec-1", "outcome": "succeeded"}),
);
apply_event(&state, &event); }
/// Applying the same event twice counts it only once in `processed_events`.
#[test]
fn apply_event_deduplicates_by_event_id() {
let state = empty_state();
let event = make_event(
"dup-id",
"dev.cellos.events.cell.lifecycle.v1.started",
json!({"cellId": "cell-5", "specId": "spec-1"}),
);
apply_event(&state, &event);
apply_event(&state, &event); let map = state.lock().unwrap();
assert_eq!(map["cell:cell-5"].processed_events, 1);
}
/// `GET /healthz` answers 200.
#[test]
fn healthz_returns_200() {
let state = empty_state();
let resp = route(&Method::GET, "/healthz", &state, &empty_compliance());
assert_eq!(resp.status(), StatusCode::OK);
}
/// `GET /cells` on empty state answers 200 with an empty `cells` object.
#[tokio::test]
async fn cells_returns_empty_map_when_no_events() {
let state = empty_state();
let resp = route(&Method::GET, "/cells", &state, &empty_compliance());
assert_eq!(resp.status(), StatusCode::OK);
let body = body_bytes(resp).await;
let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(parsed["cells"], json!({}));
}
/// `GET /cells` exposes the snapshot, including placement, under its state key.
#[tokio::test]
async fn cells_returns_snapshot_after_event() {
let state = empty_state();
apply_event(
&state,
&make_event(
"1",
"dev.cellos.events.cell.lifecycle.v1.started",
json!({
"cellId": "cell-1",
"specId": "spec-1",
"placement": {
"poolId": "runner-pool-amd64",
"queueName": "ci-high"
}
}),
),
);
let resp = route(&Method::GET, "/cells", &state, &empty_compliance());
let body = body_bytes(resp).await;
let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert!(parsed["cells"]["cell:cell-1"].is_object());
assert_eq!(
parsed["cells"]["cell:cell-1"]["placement"]["poolId"],
"runner-pool-amd64"
);
}
/// `queueName` keeps only the cells placed on that queue.
#[tokio::test]
async fn cells_filters_by_queue_name() {
let state = empty_state();
apply_event(
&state,
&make_event(
"1",
"dev.cellos.events.cell.lifecycle.v1.started",
json!({
"cellId": "cell-1",
"specId": "spec-1",
"placement": {
"poolId": "runner-pool-amd64",
"queueName": "ci-high"
}
}),
),
);
apply_event(
&state,
&make_event(
"2",
"dev.cellos.events.cell.lifecycle.v1.started",
json!({
"cellId": "cell-2",
"specId": "spec-2",
"placement": {
"poolId": "runner-pool-amd64",
"queueName": "ci-low"
}
}),
),
);
let resp = route_with_query(
&Method::GET,
"/cells",
Some("queueName=ci-high"),
&state,
&empty_compliance(),
);
let body = body_bytes(resp).await;
let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert!(parsed["cells"]["cell:cell-1"].is_object());
assert!(parsed["cells"].get("cell:cell-2").is_none());
}
/// Several filters combine with AND: the cells share namespace and queue and
/// differ only in pool, so only the matching pool survives.
#[tokio::test]
async fn cells_filters_by_multiple_placement_fields() {
let state = empty_state();
apply_event(
&state,
&make_event(
"1",
"dev.cellos.events.cell.lifecycle.v1.started",
json!({
"cellId": "cell-1",
"specId": "spec-1",
"placement": {
"poolId": "runner-pool-amd64",
"kubernetesNamespace": "cellos-prod",
"queueName": "ci-high"
}
}),
),
);
apply_event(
&state,
&make_event(
"2",
"dev.cellos.events.cell.lifecycle.v1.started",
json!({
"cellId": "cell-2",
"specId": "spec-2",
"placement": {
"poolId": "runner-pool-arm64",
"kubernetesNamespace": "cellos-prod",
"queueName": "ci-high"
}
}),
),
);
let resp = route_with_query(
&Method::GET,
"/cells",
Some("queueName=ci-high&poolId=runner-pool-amd64"),
&state,
&empty_compliance(),
);
let body = body_bytes(resp).await;
let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert!(parsed["cells"]["cell:cell-1"].is_object());
assert!(parsed["cells"].get("cell:cell-2").is_none());
}
/// A cell without placement data never matches a placement filter.
#[tokio::test]
async fn cells_filter_excludes_snapshots_without_placement() {
let state = empty_state();
apply_event(
&state,
&make_event(
"1",
"dev.cellos.events.cell.lifecycle.v1.started",
json!({
"cellId": "cell-1",
"specId": "spec-1"
}),
),
);
let resp = route_with_query(
&Method::GET,
"/cells",
Some("queueName=ci-high"),
&state,
&empty_compliance(),
);
let body = body_bytes(resp).await;
let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(parsed["cells"], json!({}));
}
/// `GET /cells/{id}` answers 404 for an id that was never seen.
#[test]
fn cells_by_id_returns_404_for_unknown_cell() {
let state = empty_state();
let resp = route(
&Method::GET,
"/cells/nonexistent",
&state,
&empty_compliance(),
);
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
/// `GET /cells/{id}` returns the bare snapshot (not wrapped in `cells`).
#[tokio::test]
async fn cells_by_id_returns_snapshot_for_known_cell() {
let state = empty_state();
apply_event(
&state,
&make_event(
"1",
"dev.cellos.events.cell.lifecycle.v1.started",
json!({
"cellId": "cell-abc",
"specId": "spec-1",
"placement": {
"poolId": "runner-pool-amd64",
"queueName": "ci-high"
}
}),
),
);
let resp = route(&Method::GET, "/cells/cell-abc", &state, &empty_compliance());
assert_eq!(resp.status(), StatusCode::OK);
let body = body_bytes(resp).await;
let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(parsed["cellId"].as_str(), Some("cell-abc"));
assert_eq!(parsed["placement"]["queueName"], "ci-high");
}
/// Any method other than GET is answered with 405.
#[test]
fn non_get_returns_405() {
let state = empty_state();
let resp = route(&Method::POST, "/cells", &state, &empty_compliance());
assert_eq!(resp.status(), StatusCode::METHOD_NOT_ALLOWED);
}
/// Paths outside the routing table are answered with 404.
#[test]
fn unknown_path_returns_404() {
let state = empty_state();
let resp = route(&Method::GET, "/unknown", &state, &empty_compliance());
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
/// The config carries the durable name, the subject filter and `DeliverPolicy::All`.
#[test]
fn build_consumer_config_uses_durable_name_and_subject() {
let cfg = build_consumer_config("my-consumer", "cellos.events.>");
assert_eq!(cfg.durable_name.as_deref(), Some("my-consumer"));
assert_eq!(cfg.filter_subject, "cellos.events.>");
assert!(matches!(
cfg.deliver_policy,
jetstream::consumer::DeliverPolicy::All
));
}
/// Mirrors the env lookup + fallback pattern that `main` uses, with a
/// test-only variable. `build_consumer_config` itself never reads the
/// environment; it receives the already-resolved name.
#[test]
fn build_consumer_config_reads_durable_name_from_env() {
const KEY: &str = "CELLOS_NATS_CONSUMER_TEST_BUILD_CFG";
// SAFETY: KEY is used by no other code in this file.
// NOTE(review): tests may run on several threads — confirm nothing else
// reads or writes the process environment concurrently.
unsafe {
std::env::set_var(KEY, "durable-from-env");
}
let consumer_name = std::env::var(KEY).unwrap_or_else(|_| "cellos-state".into());
let cfg = build_consumer_config(&consumer_name, "cellos.events.>");
assert_eq!(cfg.durable_name.as_deref(), Some("durable-from-env"));
// SAFETY: same reasoning as for `set_var` above. This cleanup is skipped
// if the assertion before it panics.
unsafe {
std::env::remove_var(KEY);
}
}
/// Builds a compliance summary event with id `id`, `cellId` `cell-{id}`, and
/// an optional tenant and `time`.
fn compliance_event(id: &str, tenant: Option<&str>, time: Option<&str>) -> CloudEventV1 {
let data = match tenant {
Some(t) => json!({"cellId": format!("cell-{id}"), "specId": "spec-x", "tenantId": t}),
None => json!({"cellId": format!("cell-{id}"), "specId": "spec-x"}),
};
let mut v = json!({
"specversion": "1.0",
"id": id,
"source": "urn:test",
"type": COMPLIANCE_SUMMARY_EVENT_TYPE,
"data": data,
});
if let Some(t) = time {
v["time"] = json!(t);
}
serde_json::from_value(v).unwrap()
}
/// Only compliance summary events are buffered; a lifecycle event is dropped.
#[test]
fn push_compliance_event_only_keeps_summary_events() {
let buf = empty_compliance();
let summary = compliance_event("1", Some("tenant-a"), None);
let lifecycle = make_event(
"2",
"dev.cellos.events.cell.lifecycle.v1.started",
json!({"cellId": "cell-x", "specId": "spec-x"}),
);
push_compliance_event(&buf, &summary);
push_compliance_event(&buf, &lifecycle);
assert_eq!(buf.lock().unwrap().len(), 1);
}
/// `tenantId` restricts the NDJSON export to that tenant's events.
#[tokio::test]
async fn compliance_export_filters_by_tenant_id() {
let state = empty_state();
let compliance = empty_compliance();
push_compliance_event(
&compliance,
&compliance_event("1", Some("tenant-a"), Some("2025-01-01T00:00:00Z")),
);
push_compliance_event(
&compliance,
&compliance_event("2", Some("tenant-b"), Some("2025-01-01T00:00:00Z")),
);
let resp = route_with_query(
&Method::GET,
"/compliance/export",
Some("tenantId=tenant-a"),
&state,
&compliance,
);
assert_eq!(resp.status(), StatusCode::OK);
let body = body_bytes(resp).await;
let text = String::from_utf8_lossy(&body);
let lines: Vec<&str> = text.lines().collect();
assert_eq!(
lines.len(),
1,
"expected exactly one tenant-a line; got {text:?}"
);
assert!(lines[0].contains("\"tenantId\":\"tenant-a\""));
assert!(!text.contains("tenant-b"));
}
/// `from`/`to` keep only events whose `time` lies inside the range: one event
/// before it, one inside and one after it are buffered.
#[tokio::test]
async fn compliance_export_filters_by_time_range() {
let state = empty_state();
let compliance = empty_compliance();
push_compliance_event(
&compliance,
&compliance_event("old", Some("tenant-a"), Some("2024-12-31T00:00:00Z")),
);
push_compliance_event(
&compliance,
&compliance_event("mid", Some("tenant-a"), Some("2025-06-15T00:00:00Z")),
);
push_compliance_event(
&compliance,
&compliance_event("new", Some("tenant-a"), Some("2026-01-01T00:00:00Z")),
);
let resp = route_with_query(
&Method::GET,
"/compliance/export",
Some("from=2025-01-01T00:00:00Z&to=2025-12-31T23:59:59Z"),
&state,
&compliance,
);
let body = body_bytes(resp).await;
let text = String::from_utf8_lossy(&body);
let lines: Vec<&str> = text.lines().filter(|l| !l.is_empty()).collect();
assert_eq!(
lines.len(),
1,
"only mid-range entry should match; got {text:?}"
);
assert!(lines[0].contains("\"id\":\"mid\""));
}
/// With the variable unset, the fallback name used in `main` ends up as the
/// durable name. As above, the env lookup happens in the test, not in
/// `build_consumer_config`.
#[test]
fn build_consumer_config_defaults_when_env_unset() {
const KEY: &str = "CELLOS_NATS_CONSUMER_TEST_DEFAULT";
// SAFETY: KEY is used by no other code in this file.
// NOTE(review): tests may run on several threads — confirm nothing else
// reads or writes the process environment concurrently.
unsafe {
std::env::remove_var(KEY);
}
let consumer_name = std::env::var(KEY).unwrap_or_else(|_| "cellos-state".into());
let cfg = build_consumer_config(&consumer_name, "cellos.events.>");
assert_eq!(cfg.durable_name.as_deref(), Some("cellos-state"));
}
}