use clap::Args;
use rc_core::{AliasManager, ObjectEncryptionRequest, ObjectStore as _, RemotePath};
use rc_s3::S3Client;
use serde::Serialize;
use std::io::Read;
use crate::exit_code::ExitCode;
use crate::output::{Formatter, OutputConfig};
#[derive(Args, Debug)]
pub struct PipeArgs {
pub target: String,
#[arg(long, default_value = "application/octet-stream")]
pub content_type: String,
#[arg(long)]
pub storage_class: Option<String>,
#[arg(long = "enc-s3", default_value = "false")]
pub enc_s3: bool,
#[arg(long = "enc-kms")]
pub enc_kms: Option<String>,
}
#[derive(Debug, Serialize)]
struct PipeOutput {
status: &'static str,
target: String,
size_bytes: i64,
size_human: String,
#[serde(skip_serializing_if = "Option::is_none")]
etag: Option<String>,
}
pub async fn execute(args: PipeArgs, output_config: OutputConfig) -> ExitCode {
let formatter = Formatter::new(output_config);
let encryption = match (args.enc_s3, args.enc_kms.as_deref()) {
(true, None) => Some(ObjectEncryptionRequest::SseS3),
(false, Some(key_id)) => Some(ObjectEncryptionRequest::SseKms {
key_id: key_id.to_string(),
}),
(false, None) => None,
(true, Some(_)) => {
return formatter.fail(
ExitCode::UsageError,
"--enc-s3 and --enc-kms cannot be used together",
);
}
};
let (alias_name, bucket, key) = match parse_pipe_path(&args.target) {
Ok(parsed) => parsed,
Err(e) => {
formatter.error(&e);
return ExitCode::UsageError;
}
};
if key.is_empty() {
formatter.error("Object key is required for pipe command.");
return ExitCode::UsageError;
}
let alias_manager = match AliasManager::new() {
Ok(am) => am,
Err(e) => {
formatter.error(&format!("Failed to load aliases: {e}"));
return ExitCode::GeneralError;
}
};
let alias = match alias_manager.get(&alias_name) {
Ok(a) => a,
Err(_) => {
formatter.error(&format!("Alias '{alias_name}' not found"));
return ExitCode::NotFound;
}
};
let client = match S3Client::new(alias).await {
Ok(c) => c,
Err(e) => {
formatter.error(&format!("Failed to create S3 client: {e}"));
return ExitCode::NetworkError;
}
};
let mut buffer = Vec::new();
if let Err(e) = std::io::stdin().read_to_end(&mut buffer) {
formatter.error(&format!("Failed to read from stdin: {e}"));
return ExitCode::GeneralError;
}
let size = buffer.len() as i64;
let target = RemotePath::new(&alias_name, &bucket, &key);
let target_display = format!("{alias_name}/{bucket}/{key}");
match client
.put_object(
&target,
buffer,
Some(&args.content_type),
encryption.as_ref(),
)
.await
{
Ok(info) => {
if formatter.is_json() {
let output = PipeOutput {
status: "success",
target: target_display,
size_bytes: size,
size_human: humansize::format_size(size as u64, humansize::BINARY),
etag: info.etag,
};
formatter.json(&output);
} else {
formatter.success(&format!(
"Uploaded to {target_display} ({})",
humansize::format_size(size as u64, humansize::BINARY)
));
}
ExitCode::Success
}
Err(e) => {
formatter.error(&format!("Failed to upload: {e}"));
ExitCode::NetworkError
}
}
}
fn parse_pipe_path(path: &str) -> Result<(String, String, String), String> {
if path.is_empty() {
return Err("Path cannot be empty".to_string());
}
let parts: Vec<&str> = path.splitn(3, '/').collect();
if parts.len() < 3 {
return Err(format!(
"Invalid path format: '{path}'. Expected: alias/bucket/key"
));
}
let alias = parts[0].to_string();
let bucket = parts[1].to_string();
let key = parts[2].to_string();
if bucket.is_empty() {
return Err("Bucket name cannot be empty".to_string());
}
if key.is_empty() {
return Err("Object key cannot be empty".to_string());
}
Ok((alias, bucket, key))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_pipe_path_valid() {
let (alias, bucket, key) = parse_pipe_path("myalias/mybucket/file.txt").unwrap();
assert_eq!(alias, "myalias");
assert_eq!(bucket, "mybucket");
assert_eq!(key, "file.txt");
}
#[test]
fn test_parse_pipe_path_with_prefix() {
let (alias, bucket, key) = parse_pipe_path("myalias/mybucket/path/to/file.txt").unwrap();
assert_eq!(alias, "myalias");
assert_eq!(bucket, "mybucket");
assert_eq!(key, "path/to/file.txt");
}
#[test]
fn test_parse_pipe_path_no_key() {
assert!(parse_pipe_path("myalias/mybucket").is_err());
}
#[test]
fn test_parse_pipe_path_empty() {
assert!(parse_pipe_path("").is_err());
}
#[tokio::test]
async fn pipe_conflicting_encryption_flags_return_usage_error() {
let args = PipeArgs {
target: "local/bucket/file.txt".to_string(),
content_type: "application/octet-stream".to_string(),
storage_class: None,
enc_s3: true,
enc_kms: Some("kms-key".to_string()),
};
let code = execute(args, OutputConfig::default()).await;
assert_eq!(code, ExitCode::UsageError);
}
}