use std::{
collections::BTreeMap,
future::Future,
sync::Arc,
time::{Duration, Instant},
};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value, json};
use crate::{
CommandResult, Credential, Dispatcher, Result, SchemaRegistry, Tier,
error::{CliCoreError, exit_code_for_error},
output::{
Envelope, HumanViewRegistry, OutputFormat, PipelineOpts, apply_pipeline,
build_error_envelope, is_valid_output_format, render_human_with_registry_for_schema,
},
};
/// JSON object map (`String` keys to `serde_json::Value`) used in this module for command
/// arguments and activity-event metadata.
pub type ValueMap = Map<String, Value>;
/// Per-command metadata consulted by [`Middleware::run`].
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct CommandMeta {
/// When `true` and the middleware's `dry_run` flag is set, `run` returns a dry-run envelope
/// instead of executing the command.
pub dry_run_prompt: bool,
/// String key/value auth settings. The accessors on this type read the `"provider"`,
/// `"tier"` and `"fixed_env"` keys.
pub auth_metadata: BTreeMap<String, String>,
/// Scope names attached to the command.
/// NOTE(review): not read anywhere in this file — presumably consumed by callers; confirm.
pub scopes: Vec<String>,
}
impl CommandMeta {
    /// Returns the `"provider"` entry of `auth_metadata`, if one is present.
    ///
    /// An entry holding an empty string is returned as `Some("")`.
    #[must_use]
    pub fn provider(&self) -> Option<&str> {
        let provider = self.auth_metadata.get("provider")?;
        Some(provider.as_str())
    }

    /// Returns the tier parsed from the `"tier"` entry of `auth_metadata`.
    ///
    /// Falls back to [`Tier::Read`] when the entry is missing or does not parse as a [`Tier`].
    #[must_use]
    pub fn tier(&self) -> Tier {
        match self.auth_metadata.get("tier").map(|raw| raw.parse::<Tier>()) {
            Some(Ok(tier)) => tier,
            _ => Tier::Read,
        }
    }

    /// Returns the `"fixed_env"` entry of `auth_metadata`, if one is present.
    ///
    /// An entry holding an empty string is returned as `Some("")`.
    #[must_use]
    pub fn fixed_env(&self) -> Option<&str> {
        let fixed_env = self.auth_metadata.get("fixed_env")?;
        Some(fixed_env.as_str())
    }
}
/// Authorization hook invoked by [`Middleware::run`] before a command executes.
///
/// `run` passes the command path, the effective arguments, the resolved credential (`None` for
/// no-auth requests), the middleware's `reason` string and the tier from [`CommandMeta::tier`].
/// Returning `Err` makes `run` record a `"denied"` outcome and return a rendered error instead
/// of executing the command.
#[async_trait]
pub trait Authorizer: Send + Sync + std::fmt::Debug {
async fn authorize(
&self,
command_path: &str,
args: &ValueMap,
credential: Option<&Credential>,
reason: &str,
tier: Tier,
) -> Result<()>;
}
/// Audit-log sink used by [`Middleware`].
///
/// The middleware calls `append` with one of the result labels `"auth-error"`, `"denied"`,
/// `"dry-run"`, `"error"` or `"ok"`. A failing `append` is only logged with `tracing::warn!`;
/// it does not fail the command.
#[async_trait]
pub trait Auditor: Send + Sync + std::fmt::Debug {
async fn append(
&self,
command_path: &str,
args: &ValueMap,
identity: &str,
result: &str,
reason: &str,
) -> Result<()>;
}
/// Activity-event sink used by [`Middleware`].
///
/// A failing `emit` is only logged with `tracing::warn!`; it does not fail the command.
#[async_trait]
pub trait ActivityEmitter: Send + Sync + std::fmt::Debug {
async fn emit(&self, event: ActivityEvent) -> Result<()>;
}
/// Record handed to an [`ActivityEmitter`] for each middleware outcome.
///
/// The field notes below describe how the middleware in this file fills the event.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct ActivityEvent {
/// UTC time the event was built, formatted as RFC 3339 with second precision and a `Z` suffix.
pub timestamp: String,
/// The middleware's `app_id`.
pub app: String,
/// The command path of the request.
pub command: String,
/// The middleware's `env`.
pub env: String,
/// Backend label chosen by the caller of the emit step (provider name, command path or system).
pub backend: String,
/// Credential identity, or empty when no credential was available.
pub identity: String,
/// Credential `sub`, or empty when no credential was available.
pub sub: String,
/// Credential account type, or empty when no credential was available.
pub account_type: String,
/// Outcome label: `"auth-error"`, `"denied"`, `"dry-run"`, `"error"` or `"ok"`.
pub status: String,
/// Error text for failed outcomes; empty otherwise.
pub error: String,
/// The middleware's `reason`.
pub reason: String,
/// Copy of the effective command arguments.
pub args: ValueMap,
/// Milliseconds elapsed since the request started, saturating at `i64::MAX`.
pub duration_ms: i64,
/// Extra metadata; the middleware in this file always sends an empty map.
pub meta: ValueMap,
}
/// Shared configuration and collaborators for running commands through the
/// auth → authorize → execute → render pipeline implemented by [`Middleware::run`].
#[derive(Clone, Debug, Default)]
pub struct Middleware {
/// Optional authorizer consulted before a command executes.
pub authz: Option<Arc<dyn Authorizer>>,
/// Credential dispatcher; `get_credential` is called for requests that are not `no_auth`.
pub auth: Dispatcher,
/// Optional audit sink; write failures are logged and otherwise ignored.
pub auditor: Option<Arc<dyn Auditor>>,
/// Optional activity sink; emit failures are logged and otherwise ignored.
pub activity: Option<Arc<dyn ActivityEmitter>>,
/// Application id; recorded in activity events and used as the system for schema envelopes
/// and invalid-output-format errors.
pub app_id: String,
/// Provider used when the command metadata names no (or an empty) provider.
pub default_auth_provider: String,
/// Requested output format; validated and parsed when an envelope is rendered.
pub output_format: String,
/// Environment name; added to the args as `"env"` when absent, used for credential lookup
/// unless the command fixes its own env, and recorded in envelopes and activity events.
pub env: String,
/// Verbosity setting passed to `Envelope::prepare_for_render`.
pub verbose: String,
/// When `true`, commands whose metadata sets `dry_run_prompt` are not executed.
pub dry_run: bool,
/// Explicit field selection for the output pipeline; overrides a command's default fields.
pub fields: String,
/// Filter expression passed to the output pipeline.
pub filter: String,
/// Expression passed to the output pipeline.
pub expr: String,
/// Limit passed to the output pipeline.
pub limit: i64,
/// Offset passed to the output pipeline.
pub offset: i64,
/// Reason string forwarded to the authorizer, the auditor and activity events.
pub reason: String,
/// When `true`, a command with a registered schema returns that schema instead of running.
pub schema: bool,
/// NOTE(review): not read anywhere in this file — presumably enforced by callers; confirm.
pub timeout: Option<Duration>,
/// NOTE(review): not read anywhere in this file; confirm where it is consumed.
pub debug: String,
/// NOTE(review): not read anywhere in this file; confirm where it is consumed.
pub search: String,
/// Registry queried by command path when a schema is requested.
pub schema_registry: SchemaRegistry,
/// Registry of human-readable views used when rendering the Human output format.
pub human_views: HumanViewRegistry,
}
/// Result of running a request through [`Middleware`].
#[derive(Clone, Debug, PartialEq)]
pub struct MiddlewareOutput {
/// The envelope as returned by `Envelope::prepare_for_render`.
pub envelope: Envelope,
/// The envelope rendered in the configured output format.
pub rendered: String,
/// `0` for successfully rendered envelopes; the value of `exit_code_for_error` for errors.
pub exit_code: i32,
}
/// Everything [`Middleware::run`] needs to know about a single command invocation.
#[derive(Clone, Debug, PartialEq)]
pub struct MiddlewareRequest<'request> {
/// Metadata of the command being run (auth settings, dry-run opt-in).
pub meta: CommandMeta,
/// Path identifying the command; also used for audit records and schema lookup.
pub command_path: &'request str,
/// System name for the success envelope; when empty, the part of `command_path` before the
/// first `:` is used instead.
pub system: &'request str,
/// Arguments passed to `Envelope::with_context` ahead of the effective arguments
/// (presumably the arguments exactly as the user supplied them — confirm against callers).
pub user_args: ValueMap,
/// Effective arguments; `run` may add an `"env"` entry before passing them to the
/// authorizer, the auditor and the activity emitter.
pub args: ValueMap,
/// Field selection applied when the middleware has no explicit `fields` and the output
/// format is not Human.
pub default_fields: &'request str,
/// When `true`, credential resolution and `"env"` injection are skipped.
pub no_auth: bool,
}
impl Middleware {
/// Creates a middleware with every field set to its `Default` value.
#[must_use]
pub fn new() -> Self {
Self::default()
}
/// Runs `command` through the middleware pipeline and returns the rendered result.
///
/// Steps, in order:
/// 1. Unless `no_auth`, add the middleware `env` to `args` under `"env"` when it is non-empty
///    and the key is not already present.
/// 2. Unless `no_auth`, resolve a credential through `self.auth`; on failure, record
///    `"auth-error"` and return the rendered error.
/// 3. For `no_auth` requests, answer a pending schema request before authorization.
/// 4. If an authorizer is configured, call it; on failure, record `"denied"` and return the
///    rendered error.
/// 5. Answer a pending schema request.
/// 6. If `dry_run` is set and the command opts in via `dry_run_prompt`, record `"dry-run"` and
///    return a dry-run envelope without calling `command`.
/// 7. Call `command` with a clone of the credential, record `"error"` or `"ok"`, and render
///    the outcome.
///
/// "Record" means a best-effort audit append followed by a best-effort activity emit.
///
/// # Errors
///
/// Authentication, authorization and command failures are returned as `Ok` with an error
/// envelope and the exit code from `exit_code_for_error`. `Err` is returned only when the
/// output pipeline or the rendering itself fails.
pub async fn run<F, Fut, Output>(
&self,
request: MiddlewareRequest<'_>,
command: F,
) -> Result<MiddlewareOutput>
where
F: FnOnce(Option<Credential>) -> Fut + Send,
Fut: Future<Output = Result<Output>> + Send,
Output: Into<CommandResult>,
{
// Every duration reported below is measured from this point.
let start = Instant::now();
let MiddlewareRequest {
meta,
command_path,
system,
user_args,
mut args,
default_fields,
no_auth,
} = request;
let command_system = effective_request_system(system, command_path);
// Surface the middleware env in the effective args without overriding an explicit "env".
if !no_auth && !self.env.is_empty() && !args.contains_key("env") {
args.insert("env".to_owned(), Value::String(self.env.clone()));
}
let credential = if no_auth {
None
} else {
// An empty provider entry is treated like a missing one.
let provider_name = meta
.provider()
.filter(|provider| !provider.is_empty())
.unwrap_or(&self.default_auth_provider);
// NOTE(review): an empty "fixed_env" entry is used as-is here, whereas an empty
// "provider" falls back to the default — confirm this asymmetry is intended.
let resolved_env = meta.fixed_env().unwrap_or(&self.env);
// The raw tier text (empty when absent) is forwarded, not the parsed `Tier`.
let tier_text = meta.auth_metadata.get("tier").map_or("", String::as_str);
match self
.auth
.get_credential(provider_name, resolved_env, command_path, tier_text)
.await
{
Ok(credential) => Some(credential),
Err(err) => {
self.write_audit(command_path, &args, "", "auth-error")
.await;
self.emit_activity(
command_path,
&args,
None,
"auth-error",
provider_name,
&err.to_string(),
start,
)
.await;
// The command path doubles as the error envelope's system on this path.
return self.render_error(&err, command_path, start, &user_args, &args, "");
}
}
};
// Empty identity when there is no credential.
let identity = credential
.as_ref()
.map_or("", |credential| credential.identity.as_str());
// No-auth requests get their schema answered before the authorizer runs.
if no_auth
&& let Some(output) =
self.render_schema_if_requested(command_path, start, &user_args, &args, identity)?
{
return Ok(output);
}
if let Some(authz) = &self.authz
&& let Err(err) = authz
.authorize(
command_path,
&args,
credential.as_ref(),
&self.reason,
meta.tier(),
)
.await
{
self.write_audit(command_path, &args, identity, "denied")
.await;
self.emit_activity(
command_path,
&args,
credential.as_ref(),
"denied",
command_path,
&err.to_string(),
start,
)
.await;
return self.render_error(&err, command_path, start, &user_args, &args, identity);
}
// Authenticated requests only reach the schema shortcut after authorization succeeded.
if let Some(output) =
self.render_schema_if_requested(command_path, start, &user_args, &args, identity)?
{
return Ok(output);
}
if self.dry_run && meta.dry_run_prompt {
self.write_audit(command_path, &args, identity, "dry-run")
.await;
self.emit_activity(
command_path,
&args,
credential.as_ref(),
"dry-run",
command_path,
"",
start,
)
.await;
let envelope = Envelope::success(
json!({
"command": command_path,
"action": "dry-run: would execute",
}),
command_path,
)
.with_dry_run();
// Dry-run envelopes are rendered without default field selection.
return self.render_envelope(
envelope,
"",
command_path,
start,
&user_args,
&args,
identity,
);
}
// NOTE(review): `self.timeout` is not applied to the command here — confirm it is
// enforced elsewhere.
let result = match command(credential.clone()).await {
Ok(result) => result.into(),
Err(err) => {
// Prefer the system reported by the error over the request's system.
let error_system = err.system().unwrap_or(&command_system);
self.write_audit(command_path, &args, identity, "error")
.await;
self.emit_activity(
command_path,
&args,
credential.as_ref(),
"error",
error_system,
&err.to_string(),
start,
)
.await;
return self.render_error(&err, error_system, start, &user_args, &args, identity);
}
};
// The "ok" outcome is recorded before rendering, so a later render failure is not
// reflected in the audit log or activity stream.
self.write_audit(command_path, &args, identity, "ok").await;
self.emit_activity(
command_path,
&args,
credential.as_ref(),
"ok",
&command_system,
"",
start,
)
.await;
self.render_envelope(
Envelope::success(result.data, command_system),
default_fields,
command_path,
start,
&user_args,
&args,
identity,
)
}
/// Convenience wrapper around [`Middleware::run`] for commands that take no credential.
///
/// Builds a request with `no_auth: true`, uses the part of `command_path` before the first
/// `:` as the system, and adapts `command` so the credential argument supplied by `run` is
/// ignored.
#[doc(hidden)]
pub async fn run_no_auth<F, Fut>(
    &self,
    meta: CommandMeta,
    command_path: &str,
    user_args: ValueMap,
    args: ValueMap,
    default_fields: &str,
    command: F,
) -> Result<MiddlewareOutput>
where
    F: FnOnce() -> Fut + Send,
    Fut: Future<Output = Result<CommandResult>> + Send,
{
    let request = MiddlewareRequest {
        system: fallback_system(command_path),
        no_auth: true,
        meta,
        command_path,
        user_args,
        args,
        default_fields,
    };
    self.run(request, async move |_credential| command().await)
        .await
}
/// Appends an audit record for `command_path` when an auditor is configured.
///
/// Best-effort: a failing auditor is reported with `tracing::warn!` and never fails the
/// command.
async fn write_audit(&self, command_path: &str, args: &ValueMap, identity: &str, result: &str) {
    let Some(auditor) = self.auditor.as_ref() else {
        return;
    };
    let outcome = auditor
        .append(command_path, args, identity, result, &self.reason)
        .await;
    if let Err(err) = outcome {
        tracing::warn!(command = command_path, error = %err, "audit log write failed");
    }
}
/// Builds an [`ActivityEvent`] for the given outcome and hands it to the configured emitter.
///
/// Does nothing when no emitter is configured. Identity fields are taken from `credential`
/// and left empty when it is `None`. The duration is the time elapsed since `start`, in
/// milliseconds, saturating at `i64::MAX`. Best-effort: a failing emitter is reported with
/// `tracing::warn!` and never fails the command.
// Each argument maps onto one event field, so the long parameter list is kept.
#[allow(clippy::too_many_arguments)]
async fn emit_activity(
    &self,
    command_path: &str,
    args: &ValueMap,
    credential: Option<&Credential>,
    result: &str,
    backend: &str,
    error: &str,
    start: Instant,
) {
    let Some(emitter) = self.activity.as_ref() else {
        return;
    };
    let (identity, sub, account_type) = match credential {
        Some(credential) => (
            credential.identity.clone(),
            credential.sub.clone(),
            credential.account_type.clone(),
        ),
        None => (String::new(), String::new(), String::new()),
    };
    let elapsed_ms = start.elapsed().as_millis();
    let duration_ms = i64::try_from(elapsed_ms).unwrap_or(i64::MAX);
    let event = ActivityEvent {
        timestamp: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
        app: self.app_id.clone(),
        command: command_path.to_owned(),
        env: self.env.clone(),
        backend: backend.to_owned(),
        identity,
        sub,
        account_type,
        status: result.to_owned(),
        error: error.to_owned(),
        reason: self.reason.clone(),
        args: args.clone(),
        duration_ms,
        meta: ValueMap::new(),
    };
    let emitted = emitter.emit(event).await;
    if let Err(err) = emitted {
        tracing::warn!(command = command_path, error = %err, "activity emit failed");
    }
}
/// Renders the schema registered for `command_path` when schema output was requested.
///
/// Returns `Ok(None)` when the `schema` flag is off or the registry has no entry for the
/// path. Otherwise the schema is wrapped in a success envelope whose system is `app_id` and
/// rendered without default field selection.
///
/// # Errors
///
/// Propagates any error from rendering the schema envelope.
fn render_schema_if_requested(
    &self,
    command_path: &str,
    start: Instant,
    user_args: &ValueMap,
    effective_args: &ValueMap,
    identity: &str,
) -> Result<Option<MiddlewareOutput>> {
    if !self.schema {
        return Ok(None);
    }
    let Some(schema) = self.schema_registry.get_by_path(command_path) else {
        return Ok(None);
    };
    let output = self.render_envelope(
        Envelope::success(schema, self.app_id.clone()),
        "",
        command_path,
        start,
        user_args,
        effective_args,
        identity,
    )?;
    Ok(Some(output))
}
/// Runs the output pipeline over a success envelope, attaches request context and renders it
/// in the configured output format.
///
/// * When `output_format` fails `is_valid_output_format`, an `InvalidOutputFormat` error is
///   rendered through `render_error`, with `app_id` as the system.
/// * Field selection: an explicit `self.fields` wins; otherwise Human output uses no field
///   selection and every other format uses `default_fields`.
/// * Pagination returned by the pipeline is stored on the envelope metadata when both exist.
/// * On the success path the returned exit code is `0`.
///
/// # Errors
///
/// Propagates failures from parsing the output format, from `apply_pipeline` and from
/// `crate::output::render`.
// The arguments mirror the context recorded on the envelope, so the long list is kept.
#[allow(clippy::too_many_arguments)]
fn render_envelope(
    &self,
    mut envelope: Envelope,
    default_fields: &str,
    command_path: &str,
    start: Instant,
    user_args: &ValueMap,
    effective_args: &ValueMap,
    identity: &str,
) -> Result<MiddlewareOutput> {
    if !is_valid_output_format(&self.output_format) {
        let invalid = CliCoreError::InvalidOutputFormat(self.output_format.clone());
        return self.render_error(
            &invalid,
            &self.app_id,
            start,
            user_args,
            effective_args,
            identity,
        );
    }
    let output_format = self.output_format.parse::<OutputFormat>()?;
    let is_human = output_format == OutputFormat::Human;

    // Explicit fields always win; Human output ignores the command's default fields.
    let fields: &str = if !self.fields.is_empty() {
        &self.fields
    } else if is_human {
        ""
    } else {
        default_fields
    };

    if let Some(data) = envelope.data.as_mut() {
        let opts = PipelineOpts {
            filter: self.filter.clone(),
            limit: self.limit,
            offset: self.offset,
            expr: self.expr.clone(),
            fields: fields.to_owned(),
        };
        if let Some(page) = apply_pipeline(data, &opts)?
            && let Some(metadata) = envelope.metadata.as_mut()
        {
            metadata.pagination = Some(page);
        }
    }

    envelope.with_context(
        command_path,
        &self.env,
        identity,
        start.elapsed(),
        Some(Value::Object(user_args.clone())),
        Some(Value::Object(effective_args.clone())),
    );

    // Captured before `prepare_for_render`; the human renderer takes it as a separate argument.
    let system = envelope
        .metadata
        .as_ref()
        .map_or_else(String::new, |metadata| metadata.system.as_str().to_owned());

    let prepared = envelope.prepare_for_render(&self.verbose);
    let rendered = if is_human {
        render_human_with_registry_for_schema(&prepared, &self.human_views, &system)
    } else {
        crate::output::render(output_format, &prepared)?
    };

    Ok(MiddlewareOutput {
        envelope: prepared,
        rendered,
        exit_code: 0,
    })
}
/// Builds an error envelope for `err`, attaches request context and renders it.
///
/// The context is recorded with an empty command string. The exit code of the returned
/// output comes from `exit_code_for_error`.
///
/// # Errors
///
/// Propagates any error from `crate::output::render_format`.
fn render_error(
    &self,
    err: &(dyn std::error::Error + 'static),
    system: &str,
    start: Instant,
    user_args: &ValueMap,
    effective_args: &ValueMap,
    identity: &str,
) -> Result<MiddlewareOutput> {
    let mut envelope = build_error_envelope(err, system);
    let elapsed = start.elapsed();
    let user_context = Value::Object(user_args.clone());
    let effective_context = Value::Object(effective_args.clone());
    envelope.with_context(
        "",
        &self.env,
        identity,
        elapsed,
        Some(user_context),
        Some(effective_context),
    );

    let prepared = envelope.prepare_for_render(&self.verbose);
    let rendered = crate::output::render_format(&self.output_format, &prepared)?;
    let exit_code = exit_code_for_error(err);
    Ok(MiddlewareOutput {
        envelope: prepared,
        rendered,
        exit_code,
    })
}
}
#[must_use]
pub fn value_map(entries: impl IntoIterator<Item = (impl Into<String>, Value)>) -> ValueMap {
entries
.into_iter()
.map(|(key, value)| (key.into(), value))
.collect()
}
/// Returns `system` as an owned string, or the system derived from `command_path` by
/// `fallback_system` when `system` is empty.
fn effective_request_system(system: &str, command_path: &str) -> String {
    let chosen = if system.is_empty() {
        fallback_system(command_path)
    } else {
        system
    };
    chosen.to_owned()
}
/// Returns the part of `command_path` before its first `:`, or the whole path when it
/// contains no `:`.
fn fallback_system(command_path: &str) -> &str {
    match command_path.find(':') {
        // ':' is a single ASCII byte, so `colon` is always a valid char boundary.
        Some(colon) => &command_path[..colon],
        None => command_path,
    }
}
impl From<CliCoreError> for Value {
fn from(error: CliCoreError) -> Self {
Value::String(error.to_string())
}
}