use crate::settings::{AppConfig, Commands, StreamCopyArgs};
use redis::streams::{StreamAddOptions, StreamReadOptions};
type RawStreamReadResponse = Vec<(String, Vec<(String, Vec<(String, Vec<u8>)>)>)>;
async fn xread(
conn: &mut redis::aio::MultiplexedConnection,
last_id: &str,
config: &StreamCopyArgs,
) -> redis::RedisResult<RawStreamReadResponse> {
if let Some(group) = &config.group {
let consumer = config.consumer.as_deref().unwrap_or("default");
let options = StreamReadOptions::default()
.block(config.block_ms)
.count(config.count)
.group(group, consumer);
let last_id = ">";
redis::AsyncCommands::xread_options(conn, &[&config.stream], &[&last_id], &options).await
} else {
let options = StreamReadOptions::default()
.block(config.block_ms)
.count(config.count);
redis::AsyncCommands::xread_options(conn, &[&config.stream], &[&last_id], &options).await
}
}
async fn xadd_all(
conn: &mut redis::aio::MultiplexedConnection,
data: &RawStreamReadResponse,
) -> redis::RedisResult<()> {
let options = StreamAddOptions::default();
for (stream_name, entries) in data {
for (entry_id, entry_items) in entries {
redis::AsyncCommands::xadd_options(conn, stream_name, entry_id, entry_items, &options)
.await?
}
}
Ok(())
}
pub async fn run(config: AppConfig) -> anyhow::Result<()> {
let Commands::StreamCopy(command_config) = config.command else {
panic!("Invalid command config received!");
};
if config.redis.sentinel && config.redis.sentinel_master.is_none() {
eprintln!("--sentinel requires --sentinel-master to be set");
std::process::exit(1);
}
let mut source_conn = crate::connect::connect(&config.redis).await?;
let mut target_conn = crate::connect::connect(&command_config.target).await?;
let mut last_id = command_config.start_id.clone();
loop {
let reply = xread(&mut source_conn, &last_id, &command_config).await?;
xadd_all(&mut target_conn, &reply).await?;
if reply.is_empty() && !command_config.retry_when_empty {
break;
}
for (_stream_name, entries) in reply {
for (entry_id, _entry_items) in entries {
last_id = entry_id;
}
}
}
Ok(())
}