use std::str::FromStr;
mod args;
use anyhow::Context as _;
use clap::{Parser, ValueEnum};
use mqi::{
Object, Properties, Syncpoint,
connection::{ThreadNone, Tls},
constants,
prelude::*,
put::{Context, PropertyAction},
structs,
types::{ApplName, CipherSpec, MQCMHO, MessageFormat, QueueManagerName, QueueName},
};
const APP_NAME: ApplName = ApplName(mqstr!("forward"));
const DEFAULT_CIPHER: CipherSpec = CipherSpec(mqstr!("TLS_AES_128_GCM_SHA256"));
#[derive(Parser, Debug)]
struct Cli {
#[command(flatten)]
connection: args::ConnectionArgs,
#[arg(short = 'x', long, value_enum, default_value_t=ContextArg::Default)]
context: ContextArg,
#[arg(short, long, default_value_t = false)]
dry_run: bool,
#[arg(short, long)]
source_queue: String,
#[arg(short, long)]
queue: String,
#[arg(short = 'm', long, requires("queue"))]
queue_manager: Option<String>,
}
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, ValueEnum)]
enum ContextArg {
#[default]
Default,
None,
Identity,
All,
}
fn main() -> anyhow::Result<()> {
let subscriber = tracing_subscriber::fmt()
.compact()
.with_max_level(tracing::Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber)?;
let args = Cli::parse();
let client_method = args.connection.method.connect_option()?;
let qm_name = args
.connection
.queue_manager_name()
.context("Connection queue manager name is invalid")?;
let creds = args.connection.credentials();
let cno = args.connection.cno().context("MQCNO option is not valid")?;
let tls = args.connection.tls(&DEFAULT_CIPHER).context("TLS options are not valid")?;
let tls_connect = tls
.as_ref()
.map(|(repo, cipher, label)| Tls::new(repo, label.as_ref(), cipher));
let source_queue = QueueName::from_str(&args.source_queue)?;
let target_queue = QueueName::from_str(&args.queue)?;
let target_qm = args
.queue_manager
.as_deref()
.map(QueueManagerName::from_str)
.transpose()
.context("Target queue manager name is invalid")?;
let qm = mqi::connect::<ThreadNone>(&(APP_NAME, tls_connect, qm_name, creds, cno, client_method))
.already_connected_ref()
.discard_warning()
.context("Unable to connect to the queue manager")?;
let qm_ref = qm.connection_ref();
let obj = Object::open(
qm_ref.clone(),
&(
source_queue,
constants::MQOO_INPUT_AS_Q_DEF | constants::MQOO_SAVE_ALL_CONTEXT,
),
)
.warn_as_error() .context("Unable to open the object")?;
let mut buffer = Vec::<u8>::with_capacity(20 * 1024); let buf_write = buffer.spare_capacity_mut();
let syncpoint = Syncpoint::new(qm_ref.clone());
let mut properties = Properties::new(&qm, MQCMHO::default())?;
let message: Option<(_, structs::MQMD)> = obj
.get_data_with(
&(
constants::MQGMO_SYNCPOINT, &mut properties, ),
buf_write, )
.warn_as_error() .context("Unable to get a messsage")?;
if let Some((msg_data, md)) = message {
let len = msg_data.len();
unsafe {
buffer.set_len(len);
}
let mut target_properties = Properties::new(&qm, MQCMHO::default())?; let fmt = MessageFormat::from_mqmd(&md);
qm_ref
.put_message(
&(
constants::MQPMO_SYNCPOINT, match args.context {
ContextArg::Default => constants::MQPMO_DEFAULT_CONTEXT,
ContextArg::None => constants::MQPMO_NO_CONTEXT,
ContextArg::Identity => constants::MQPMO_PASS_IDENTITY_CONTEXT,
ContextArg::All => constants::MQPMO_PASS_ALL_CONTEXT,
},
target_qm, target_queue, ),
&(
md, Context(&obj), PropertyAction::Forward(&properties, &mut target_properties), ),
&(buffer, fmt), )
.warn_as_error() .context("Unable to put a message")?;
}
let _ = if args.dry_run {
syncpoint.backout().warn_as_error().context("Unable to backout") } else {
syncpoint.commit().warn_as_error().context("Unabel to commit") }?;
Ok(())
}