use clap::Subcommand;
use serde_json::{Value, json};
use crate::util::{
api_request, exit_error, print_json_stderr, print_json_stdout, raw_api_request_with_query,
read_json_from_file,
};
const EVENT_LIST_API_PAGE_LIMIT: u32 = 200;
#[derive(Subcommand)]
pub enum EventCommands {
Create {
#[arg(long)]
event_type: String,
#[arg(long)]
timestamp: Option<String>,
#[arg(long, required_unless_present = "data_file")]
data: Option<String>,
#[arg(long, short = 'f', conflicts_with = "data")]
data_file: Option<String>,
#[arg(long)]
idempotency_key: Option<String>,
#[arg(long, default_value = "cli")]
source: String,
#[arg(long)]
agent: Option<String>,
},
List {
#[arg(long)]
event_type: Option<String>,
#[arg(long)]
since: Option<String>,
#[arg(long)]
until: Option<String>,
#[arg(long)]
limit: Option<u32>,
#[arg(long)]
cursor: Option<String>,
},
Batch {
#[arg(long)]
file: String,
},
}
pub async fn run(api_url: &str, token: Option<&str>, command: EventCommands) -> i32 {
match command {
EventCommands::Create {
event_type,
timestamp,
data,
data_file,
idempotency_key,
source,
agent,
} => {
create(
api_url,
token,
&event_type,
timestamp.as_deref(),
data.as_deref(),
data_file.as_deref(),
idempotency_key.as_deref(),
&source,
agent.as_deref(),
)
.await
}
EventCommands::List {
event_type,
since,
until,
limit,
cursor,
} => {
list(
api_url,
token,
event_type.as_deref(),
since.as_deref(),
until.as_deref(),
limit,
cursor.as_deref(),
)
.await
}
EventCommands::Batch { file } => batch(api_url, token, &file).await,
}
}
fn build_list_query(
event_type: Option<&str>,
since: Option<&str>,
until: Option<&str>,
limit: Option<u32>,
cursor: Option<&str>,
) -> Vec<(String, String)> {
let mut query = Vec::new();
if let Some(et) = event_type {
query.push(("event_type".to_string(), et.to_string()));
}
if let Some(s) = since {
query.push(("since".to_string(), s.to_string()));
}
if let Some(u) = until {
query.push(("until".to_string(), u.to_string()));
}
if let Some(l) = limit {
query.push(("limit".to_string(), l.to_string()));
}
if let Some(c) = cursor {
query.push(("cursor".to_string(), c.to_string()));
}
query
}
fn parse_paginated_events_response(
body: Value,
) -> Result<(Vec<Value>, Option<String>, bool), String> {
let data = body
.get("data")
.and_then(Value::as_array)
.cloned()
.ok_or_else(|| "Unexpected /v1/events response: missing data array".to_string())?;
let next_cursor = body
.get("next_cursor")
.and_then(Value::as_str)
.map(ToString::to_string);
let has_more = body
.get("has_more")
.and_then(Value::as_bool)
.unwrap_or(next_cursor.is_some());
Ok((data, next_cursor, has_more))
}
fn print_raw_request_error(message: &str) -> i32 {
if message.starts_with("Invalid URL:") {
print_json_stderr(&json!({
"error": "cli_error",
"message": message,
}));
4
} else {
print_json_stderr(&json!({
"error": "connection_error",
"message": message,
"docs_hint": "Is the API server running? Check KURA_API_URL."
}));
3
}
}
async fn create(
api_url: &str,
token: Option<&str>,
event_type: &str,
timestamp: Option<&str>,
data: Option<&str>,
data_file: Option<&str>,
idempotency_key: Option<&str>,
source: &str,
agent: Option<&str>,
) -> i32 {
let data_value: serde_json::Value = if let Some(d) = data {
match serde_json::from_str(d) {
Ok(v) => v,
Err(e) => exit_error(
&format!("Invalid JSON in --data: {e}"),
Some("Provide valid JSON, e.g. --data '{\"weight_kg\":100}'"),
),
}
} else if let Some(f) = data_file {
match read_json_from_file(f) {
Ok(v) => v,
Err(e) => exit_error(&e, Some("Provide a valid JSON file or use '-' for stdin")),
}
} else {
exit_error(
"Either --data or --data-file is required",
Some("Use --data '{...}' or --data-file path.json"),
)
};
let ts = match timestamp {
Some(t) => t.to_string(),
None => chrono::Utc::now().to_rfc3339(),
};
let idem_key = idempotency_key
.map(|k| k.to_string())
.unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
let mut metadata = json!({
"source": source,
"idempotency_key": idem_key
});
if let Some(a) = agent {
metadata["agent"] = json!(a);
}
let body = json!({
"timestamp": ts,
"event_type": event_type,
"data": data_value,
"metadata": metadata
});
api_request(
api_url,
reqwest::Method::POST,
"/v1/events",
token,
Some(body),
&[],
&[],
false,
false,
)
.await
}
async fn list(
api_url: &str,
token: Option<&str>,
event_type: Option<&str>,
since: Option<&str>,
until: Option<&str>,
limit: Option<u32>,
cursor: Option<&str>,
) -> i32 {
let requested_limit = limit.unwrap_or(50).max(1);
if requested_limit <= EVENT_LIST_API_PAGE_LIMIT {
let query = build_list_query(event_type, since, until, Some(requested_limit), cursor);
return api_request(
api_url,
reqwest::Method::GET,
"/v1/events",
token,
None,
&query,
&[],
false,
false,
)
.await;
}
let mut remaining = requested_limit;
let mut page_cursor = cursor.map(ToString::to_string);
let mut all_events = Vec::new();
let (next_cursor, has_more) = loop {
let page_limit = remaining.min(EVENT_LIST_API_PAGE_LIMIT);
let query = build_list_query(
event_type,
since,
until,
Some(page_limit),
page_cursor.as_deref(),
);
let (status, body) = match raw_api_request_with_query(
api_url,
reqwest::Method::GET,
"/v1/events",
token,
&query,
)
.await
{
Ok(response) => response,
Err(message) => return print_raw_request_error(&message),
};
if !(200..=299).contains(&status) {
if status < 500 {
print_json_stderr(&body);
return 1;
}
print_json_stderr(&body);
return 2;
}
let (mut page_events, page_next_cursor, page_has_more) =
match parse_paginated_events_response(body) {
Ok(parsed) => parsed,
Err(message) => {
print_json_stderr(&json!({
"error": "server_error",
"message": message,
}));
return 2;
}
};
all_events.append(&mut page_events);
remaining = requested_limit.saturating_sub(all_events.len() as u32);
if remaining == 0 {
break (
if page_has_more {
page_next_cursor
} else {
None
},
page_has_more,
);
}
let Some(cursor_value) = page_next_cursor else {
break (None, false);
};
if !page_has_more {
break (None, false);
}
page_cursor = Some(cursor_value);
};
print_json_stdout(&json!({
"data": all_events,
"next_cursor": next_cursor,
"has_more": has_more,
}));
0
}
async fn batch(api_url: &str, token: Option<&str>, file: &str) -> i32 {
let body = match read_json_from_file(file) {
Ok(v) => v,
Err(e) => exit_error(&e, Some("Provide a JSON file with {\"events\": [...]}")),
};
api_request(
api_url,
reqwest::Method::POST,
"/v1/events/batch",
token,
Some(body),
&[],
&[],
false,
false,
)
.await
}
#[cfg(test)]
mod tests {
use super::{build_list_query, parse_paginated_events_response};
use serde_json::json;
#[test]
fn build_list_query_includes_filters_limit_and_cursor() {
let query = build_list_query(
Some("set.logged"),
Some("2026-03-01T00:00:00Z"),
Some("2026-03-02T00:00:00Z"),
Some(200),
Some("cursor-1"),
);
assert_eq!(
query,
vec![
("event_type".to_string(), "set.logged".to_string()),
("since".to_string(), "2026-03-01T00:00:00Z".to_string()),
("until".to_string(), "2026-03-02T00:00:00Z".to_string()),
("limit".to_string(), "200".to_string()),
("cursor".to_string(), "cursor-1".to_string()),
]
);
}
#[test]
fn parse_paginated_events_response_reads_contract_fields() {
let (data, next_cursor, has_more) = parse_paginated_events_response(json!({
"data": [{"id": "evt-1"}, {"id": "evt-2"}],
"next_cursor": "cursor-2",
"has_more": true
}))
.expect("response should parse");
assert_eq!(data.len(), 2);
assert_eq!(next_cursor.as_deref(), Some("cursor-2"));
assert!(has_more);
}
#[test]
fn parse_paginated_events_response_rejects_missing_data_array() {
let err = parse_paginated_events_response(json!({
"next_cursor": "cursor-2",
"has_more": true
}))
.expect_err("missing data must fail");
assert!(err.contains("missing data array"));
}
}