use std::path::PathBuf;
use clap::{Args, Subcommand};
use futures::StreamExt;
use crate::context::{resolve_profile, CliContext};
use crate::error::{generic, sdk, CliError};
use crate::prelude::{emit_stream_row, emit_value, OutputFormat};
// Audit subcommands; `run` dispatches each variant to its handler.
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn
// `///` doc comments into help text, so adding them would change CLI output.
#[derive(Subcommand, Debug)]
pub enum AuditCommand {
// Handled by `run_recent`: builds a filtered query, collects it, and emits
// the result as a single value.
Recent(RecentArgs),
// Handled by `run_stream`: emits rows one at a time until the stream ends,
// an error occurs, or Ctrl-C is received.
Stream(StreamArgs),
}
// Arguments for the `Recent` subcommand (consumed by `run_recent`).
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn
// `///` doc comments into help text, so adding them would change CLI output.
#[derive(Args, Debug)]
pub struct RecentArgs {
// Passed to `audit().recent(..)`; `-n` / `--limit`, defaults to 100.
#[arg(short = 'n', long, default_value_t = 100)]
pub limit: usize,
// When set, forwarded to `query.by_operator(..)`.
// Presumably an operator id; confirm against the SDK.
#[arg(long)]
pub by_operator: Option<u64>,
// `start_ms` and `end_ms` each require the other (clap `requires`), so they
// are always supplied as a pair and forwarded to `query.between(start, end)`.
// Presumably millisecond timestamps, judging by the name; confirm the epoch
// and inclusivity against the SDK.
#[arg(long, requires = "end_ms")]
pub start_ms: Option<u64>,
#[arg(long, requires = "start_ms")]
pub end_ms: Option<u64>,
// When true, `query.force_only()` is applied.
#[arg(long)]
pub force_only: bool,
// When set, forwarded to `query.since(..)`.
// Presumably a sequence number (the handler binds it as `seq`); confirm.
#[arg(long)]
pub since: Option<u64>,
// Optional path forwarded to `CliContext::build` as the identity argument.
#[arg(long)]
pub identity: Option<PathBuf>,
// Node id forwarded to `CliContext::build`; defaults to
// `crate::prelude::DEFAULT_SUPERVISOR_NODE`.
#[arg(long, default_value_t = crate::prelude::DEFAULT_SUPERVISOR_NODE)]
pub node: u64,
}
// Arguments for the `Stream` subcommand (consumed by `run_stream`).
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn
// `///` doc comments into help text, so adding them would change CLI output.
#[derive(Args, Debug)]
pub struct StreamArgs {
// When set, forwarded to `query.by_operator(..)`.
// Presumably an operator id; confirm against the SDK.
#[arg(long)]
pub by_operator: Option<u64>,
// When true, `query.force_only()` is applied.
#[arg(long)]
pub force_only: bool,
// When set, forwarded to `query.since(..)`.
// Presumably a sequence number (the handler binds it as `seq`); confirm.
#[arg(long)]
pub since: Option<u64>,
// Optional path forwarded to `CliContext::build` as the identity argument.
#[arg(long)]
pub identity: Option<PathBuf>,
// Node id forwarded to `CliContext::build`; defaults to
// `crate::prelude::DEFAULT_SUPERVISOR_NODE`.
#[arg(long, default_value_t = crate::prelude::DEFAULT_SUPERVISOR_NODE)]
pub node: u64,
}
/// Entry point for the audit subcommands: routes `cmd` to its handler.
///
/// `output`, `config_path`, and `profile_name` are forwarded unchanged to the
/// selected handler, and that handler's result is returned as-is.
///
/// # Errors
///
/// Returns whatever [`CliError`] the selected handler produces.
pub async fn run(
    cmd: AuditCommand,
    output: Option<OutputFormat>,
    config_path: Option<&std::path::Path>,
    profile_name: &str,
) -> Result<(), CliError> {
    match cmd {
        AuditCommand::Recent(recent) => {
            run_recent(recent, output, config_path, profile_name).await
        }
        AuditCommand::Stream(streaming) => {
            run_stream(streaming, output, config_path, profile_name).await
        }
    }
}
/// Handles `audit recent`: builds a filtered audit query, collects it, and
/// emits the result as a single value.
///
/// Filters are applied in a fixed order: operator, time window, force-only,
/// then `since`. The time window is applied only when both `start_ms` and
/// `end_ms` are present.
///
/// # Errors
///
/// Propagates failures from profile resolution and context construction, and
/// wraps an output failure as `write audit: <cause>`.
async fn run_recent(
    args: RecentArgs,
    output: Option<OutputFormat>,
    config_path: Option<&std::path::Path>,
    profile_name: &str,
) -> Result<(), CliError> {
    let profile = resolve_profile(config_path, profile_name).await?;
    // NOTE(review): the meaning of the trailing `false` flag is not visible here.
    let ctx = CliContext::build(&profile, args.identity.as_deref(), args.node, false).await?;
    let deck = ctx.deck();

    // Each step either refines the builder or passes it through untouched.
    let query = deck.audit().recent(args.limit);
    let query = match args.by_operator {
        Some(op) => query.by_operator(op),
        None => query,
    };
    let query = match (args.start_ms, args.end_ms) {
        (Some(start), Some(end)) => query.between(start, end),
        _ => query,
    };
    let query = if args.force_only {
        query.force_only()
    } else {
        query
    };
    let query = match args.since {
        Some(seq) => query.since(seq),
        None => query,
    };

    let records = query.collect();
    let fmt = OutputFormat::resolve_oneshot(output);
    emit_value(fmt, &records).map_err(|e| generic(format!("write audit: {e}")))?;
    Ok(())
}
async fn run_stream(
args: StreamArgs,
output: Option<OutputFormat>,
config_path: Option<&std::path::Path>,
profile_name: &str,
) -> Result<(), CliError> {
let profile = resolve_profile(config_path, profile_name).await?;
let ctx = CliContext::build(&profile, args.identity.as_deref(), args.node, false).await?;
let deck = ctx.deck();
let mut query = deck.audit();
if let Some(op) = args.by_operator {
query = query.by_operator(op);
}
if args.force_only {
query = query.force_only();
}
if let Some(seq) = args.since {
query = query.since(seq);
}
let mut stream = query.stream();
let fmt = OutputFormat::resolve_stream(output);
let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());
loop {
tokio::select! {
_ = ctrl_c.as_mut() => {
tracing::info!("audit stream cancelled by Ctrl-C");
return Ok(());
}
row = stream.next() => {
match row {
Some(Ok(record)) => {
emit_stream_row(fmt, &record)
.map_err(|e| generic(format!("write audit row: {e}")))?;
}
Some(Err(e)) => {
return Err(sdk(format!("audit stream error: {e}")));
}
None => return Ok(()),
}
}
}
}
}