use std::collections::{BTreeMap, HashMap};
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use anyhow::{bail, Context, Result};
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use base64::Engine as _;
use cellos_core::{
parse_trust_verify_keys, verify_signed_event_envelope, CellStateProjection, CellStateSnapshot,
CloudEventV1, SignedEventEnvelopeV1,
};
use ed25519_dalek::VerifyingKey;
/// CloudEvent `type` value that marks an event as a transport wrapper.
///
/// `unwrap_signed_envelope` compares each event's `ty` against this string;
/// on a match, the event's `data` payload is parsed as a `SignedEventEnvelopeV1`
/// and the inner event is projected instead of the wrapper.
const SIGNED_ENVELOPE_TRANSPORT_TYPE: &str = "dev.cellos.events.signed_envelope.v1";
/// Entry point for the `cellos-projector` binary.
///
/// Handles the version request first, then parses the command line, projects
/// the JSONL event file into per-key snapshots, and prints either one selected
/// snapshot (`--cell-id` / `--spec-id`) or the whole snapshot map as JSON.
///
/// # Errors
///
/// Propagates argument-parsing, projection and serialization failures, and
/// fails when a requested cell-id or spec-id matches no snapshot.
fn main() -> Result<()> {
    let version_printed = cellos_projector::build_info::print_version_if_requested(
        "cellos-projector",
        env!("CARGO_PKG_VERSION"),
    );
    if version_printed {
        return Ok(());
    }

    let cli = Cli::parse(std::env::args().skip(1))?;
    let snapshots = project_jsonl(&cli.path)?;

    // `Cli::parse` rejects supplying both filters, so at most one arm with a
    // filter can match; the cell-id arm is listed first to mirror that order.
    let selected = match (cli.cell_id.as_deref(), cli.spec_id.as_deref()) {
        (Some(cell_id), _) => Some(
            snapshots
                .values()
                .find(|candidate| candidate.cell_id.as_deref() == Some(cell_id))
                .with_context(|| format!("no snapshot found for cell-id {cell_id:?}"))?,
        ),
        (None, Some(spec_id)) => Some(
            snapshots
                .values()
                .find(|candidate| candidate.spec_id.as_deref() == Some(spec_id))
                .with_context(|| format!("no snapshot found for spec-id {spec_id:?}"))?,
        ),
        (None, None) => None,
    };

    match selected {
        Some(snapshot) => print_snapshot(snapshot, cli.pretty),
        None => {
            // No filter: emit the complete key -> snapshot map.
            let rendered = if cli.pretty {
                serde_json::to_string_pretty(&snapshots)?
            } else {
                serde_json::to_string(&snapshots)?
            };
            println!("{rendered}");
            Ok(())
        }
    }
}
/// Parsed command-line options for the projector binary.
///
/// Built by `Cli::parse`; `Default` gives an empty path, no filters and
/// compact output.
#[derive(Debug, Default)]
struct Cli {
/// Path of the JSONL events file (the positional argument); `Cli::parse`
/// fails if it is still empty after all arguments are consumed.
path: PathBuf,
/// Value of `--cell-id`: print only the snapshot whose `cell_id` matches.
cell_id: Option<String>,
/// Value of `--spec-id`: print only the snapshot whose `spec_id` matches.
spec_id: Option<String>,
/// Set by `--pretty`: emit pretty-printed JSON instead of compact JSON.
pretty: bool,
}
impl Cli {
    /// Parses the command-line arguments (program name already removed).
    ///
    /// Recognised options are `--cell-id ID`, `--spec-id ID`, `--pretty`, and
    /// `-h` / `--help` (prints usage and exits the process with status 0).
    /// The first non-option argument is taken as the events file path.
    ///
    /// Any other argument that starts with `-` (and is longer than a lone `-`)
    /// is rejected as an unknown option rather than being silently treated as
    /// the file path; this turns a typo such as `--prety` into a direct error
    /// instead of a confusing "open --prety" failure later. A literal `--`
    /// ends option parsing so that paths beginning with `-` can still be given.
    ///
    /// # Errors
    ///
    /// Fails when an option is missing its value, an option is unknown, more
    /// than one positional argument is supplied, no path is supplied, or both
    /// `--cell-id` and `--spec-id` are present.
    fn parse<I>(args: I) -> Result<Self>
    where
        I: IntoIterator<Item = String>,
    {
        let mut cli = Cli::default();
        let mut args = args.into_iter();
        // Becomes true after a literal `--`; everything afterwards is positional.
        let mut options_ended = false;

        // `while let` rather than `for`: option arms pull their value from the
        // same iterator.
        while let Some(arg) = args.next() {
            // A lone "-" is deliberately not an option, matching prior behavior.
            let looks_like_option = !options_ended && arg.len() > 1 && arg.starts_with('-');

            if looks_like_option {
                match arg.as_str() {
                    "--" => options_ended = true,
                    "--cell-id" => {
                        cli.cell_id = Some(args.next().context("--cell-id requires a value")?);
                    }
                    "--spec-id" => {
                        cli.spec_id = Some(args.next().context("--spec-id requires a value")?);
                    }
                    "--pretty" => cli.pretty = true,
                    "-h" | "--help" => {
                        print_usage();
                        std::process::exit(0);
                    }
                    _ => bail!("unknown option {arg:?}"),
                }
            } else if cli.path.as_os_str().is_empty() {
                cli.path = PathBuf::from(arg);
            } else {
                bail!("unexpected argument {arg:?}");
            }
        }

        if cli.path.as_os_str().is_empty() {
            bail!(
                "usage: cellos-projector <events.jsonl> [--cell-id ID | --spec-id ID] [--pretty]"
            );
        }
        if cli.cell_id.is_some() && cli.spec_id.is_some() {
            bail!("use only one of --cell-id or --spec-id");
        }
        Ok(cli)
    }
}
/// Writes the one-line usage summary to standard error.
fn print_usage() {
    const USAGE: &str =
        "usage: cellos-projector <events.jsonl> [--cell-id ID | --spec-id ID] [--pretty]";
    eprintln!("{USAGE}");
}
/// Reads a JSONL file of CloudEvents and folds it into one snapshot per key.
///
/// Each non-blank line is parsed as a `CloudEventV1`, passed through
/// `unwrap_signed_envelope` (using the keys returned by `load_verifying_keys`
/// and `load_hmac_keys`), and applied to the projection selected by
/// `event_projection_key`. Blank lines are skipped. The returned map is keyed
/// by that projection key and holds the snapshot of each projection after all
/// lines have been applied.
///
/// # Errors
///
/// Fails if the file cannot be opened or read, the verification keys cannot be
/// loaded, or any line fails to parse, unwrap, resolve to a projection key, or
/// apply. Every per-line failure carries the 1-based line number as context.
fn project_jsonl(path: &Path) -> Result<BTreeMap<String, CellStateSnapshot>> {
    let file = File::open(path).with_context(|| format!("open {}", path.display()))?;
    let reader = BufReader::new(file);
    let mut projections: BTreeMap<String, CellStateProjection> = BTreeMap::new();
    let verifying_keys = load_verifying_keys()?;
    let hmac_keys = load_hmac_keys()?;

    for (index, line) in reader.lines().enumerate() {
        // Line numbers in diagnostics are 1-based.
        let line_no = index + 1;
        let line = line.with_context(|| format!("read line {line_no}"))?;
        if line.trim().is_empty() {
            continue;
        }
        let event: CloudEventV1 = serde_json::from_str(&line)
            .with_context(|| format!("parse CloudEvent JSON on line {line_no}"))?;
        let projected = unwrap_signed_envelope(&event, &verifying_keys, &hmac_keys)
            .with_context(|| format!("unwrap signed envelope on line {line_no}"))?;
        // Previously this was the only step whose failure did not say which
        // line was at fault.
        let key = event_projection_key(&projected)
            .with_context(|| format!("derive projection key on line {line_no}"))?;
        projections
            .entry(key)
            .or_default()
            .apply(&projected)
            .with_context(|| format!("apply event {:?} on line {}", projected.ty, line_no))?;
    }

    Ok(projections
        .into_iter()
        .map(|(key, projection)| (key, projection.snapshot()))
        .collect())
}
fn unwrap_signed_envelope(
event: &CloudEventV1,
verifying_keys: &HashMap<String, VerifyingKey>,
hmac_keys: &HashMap<String, Vec<u8>>,
) -> Result<CloudEventV1> {
if event.ty != SIGNED_ENVELOPE_TRANSPORT_TYPE {
return Ok(event.clone());
}
let data = event
.data
.as_ref()
.with_context(|| format!("wrapper event {:?} has no data payload", event.ty))?;
let envelope: SignedEventEnvelopeV1 = serde_json::from_value(data.clone())
.with_context(|| "parse SignedEventEnvelopeV1 from wrapper data")?;
let any_keys_configured = !verifying_keys.is_empty() || !hmac_keys.is_empty();
if any_keys_configured {
verify_signed_event_envelope(&envelope, verifying_keys, hmac_keys)
.map_err(|e| anyhow::anyhow!("verify signed envelope: {e}"))?;
}
Ok(envelope.event)
}
fn load_verifying_keys() -> Result<HashMap<String, VerifyingKey>> {
let Some(path) = std::env::var("CELLOS_EVENT_VERIFY_KEYS_PATH").ok() else {
return Ok(HashMap::new());
};
if path.trim().is_empty() {
return Ok(HashMap::new());
}
let raw = std::fs::read_to_string(&path)
.with_context(|| format!("read CELLOS_EVENT_VERIFY_KEYS_PATH={path}"))?;
parse_trust_verify_keys(&raw)
.map_err(|e| anyhow::anyhow!("parse CELLOS_EVENT_VERIFY_KEYS_PATH={path}: {e}"))
}
fn load_hmac_keys() -> Result<HashMap<String, Vec<u8>>> {
let Some(path) = std::env::var("CELLOS_EVENT_VERIFY_HMAC_KEYS_PATH").ok() else {
return Ok(HashMap::new());
};
if path.trim().is_empty() {
return Ok(HashMap::new());
}
let raw = std::fs::read_to_string(&path)
.with_context(|| format!("read CELLOS_EVENT_VERIFY_HMAC_KEYS_PATH={path}"))?;
let value: serde_json::Value = serde_json::from_str(&raw)
.with_context(|| format!("parse JSON CELLOS_EVENT_VERIFY_HMAC_KEYS_PATH={path}"))?;
let object = value
.as_object()
.with_context(|| "hmac verify keys: top-level value must be a JSON object")?;
let mut out: HashMap<String, Vec<u8>> = HashMap::with_capacity(object.len());
for (kid, val) in object {
let s = val
.as_str()
.with_context(|| format!("hmac verify keys: value for kid {kid:?} must be a string"))?;
let trimmed = s.trim_end_matches('=');
let bytes = URL_SAFE_NO_PAD
.decode(trimmed)
.with_context(|| format!("hmac verify keys: kid {kid:?} not valid base64url"))?;
if bytes.is_empty() {
bail!("hmac verify keys: kid {kid:?} decoded to empty");
}
out.insert(kid.clone(), bytes);
}
Ok(out)
}
/// Derives the projection-map key for an event from its `data` payload.
///
/// A string-valued `cellId` yields `cell:<id>`; failing that, a string-valued
/// `specId` yields `spec:<id>`. A field that is present but not a string is
/// treated the same as an absent one.
///
/// # Errors
///
/// Fails when the event has no `data` payload or neither field is a string.
fn event_projection_key(event: &CloudEventV1) -> Result<String> {
    let payload = match event.data.as_ref() {
        Some(payload) => payload,
        None => bail!("event {:?} is missing data payload", event.ty),
    };

    let cell_id = payload.get("cellId").and_then(|field| field.as_str());
    let spec_id = payload.get("specId").and_then(|field| field.as_str());

    // `cellId` wins whenever it is usable; `specId` is only the fallback.
    match (cell_id, spec_id) {
        (Some(cell_id), _) => Ok(format!("cell:{cell_id}")),
        (None, Some(spec_id)) => Ok(format!("spec:{spec_id}")),
        (None, None) => bail!("event {:?} is not a cell-scoped event", event.ty),
    }
}
/// Prints one snapshot to standard output as a single JSON document.
///
/// Uses pretty-printed JSON when `pretty` is true and compact JSON otherwise.
///
/// # Errors
///
/// Fails if the snapshot cannot be serialized.
fn print_snapshot(snapshot: &CellStateSnapshot, pretty: bool) -> Result<()> {
    let rendered = if pretty {
        serde_json::to_string_pretty(snapshot)?
    } else {
        serde_json::to_string(snapshot)?
    };
    println!("{rendered}");
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Two events for the same cell end up in a single `cell:cell-1` snapshot.
    // NOTE(review): `project_jsonl` reads the key-path environment variables,
    // so this presumably expects them to be unset in the test environment.
    #[test]
    fn projects_jsonl_to_snapshot_map() {
        let events = [
            serde_json::json!({
                "specversion": "1.0",
                "id": "1",
                "source": "urn:test",
                "type": "dev.cellos.events.cell.lifecycle.v1.started",
                "data": { "cellId": "cell-1", "specId": "spec-1" }
            }),
            serde_json::json!({
                "specversion": "1.0",
                "id": "2",
                "source": "urn:test",
                "type": "dev.cellos.events.cell.command.v1.completed",
                "data": { "cellId": "cell-1", "specId": "spec-1", "exitCode": 0 }
            }),
        ];
        // One compact JSON document per line, each newline-terminated.
        let jsonl: String = events.iter().map(|event| format!("{event}\n")).collect();

        let workdir = tempfile::tempdir().unwrap();
        let events_path = workdir.path().join("events.jsonl");
        std::fs::write(&events_path, jsonl).unwrap();

        let snapshots = project_jsonl(&events_path).unwrap();
        let cell_snapshot = &snapshots["cell:cell-1"];
        assert_eq!(
            cell_snapshot.current_state,
            cellos_core::ProjectionCurrentState::CommandSucceeded
        );
        assert_eq!(cell_snapshot.processed_events, 2);
    }

    /// Calls the build-info helper so the test build exercises it.
    #[test]
    fn version_compiles() {
        let _ = cellos_projector::build_info::version_line(
            "cellos-projector",
            env!("CARGO_PKG_VERSION"),
        );
    }

    /// Supplying both `--cell-id` and `--spec-id` must be refused.
    #[test]
    fn cli_rejects_both_filters() {
        let argv = ["events.jsonl", "--cell-id", "a", "--spec-id", "b"].map(String::from);
        let err = Cli::parse(argv).unwrap_err();
        assert!(err.to_string().contains("only one"));
    }
}