use std::{collections::HashMap, io::Write};
use crate::settings::{AppConfig, Commands, StreamTailArgs};
use redis::{
FromRedisValue,
streams::{StreamReadOptions, StreamReadReply},
};
use serde_json::json;
fn print_stream_read_reply(
reply: &StreamReadReply,
as_json: bool,
raw_key: Option<&str>,
) -> anyhow::Result<()> {
for stream in &reply.keys {
for entry in &stream.ids {
if let Some(raw_key) = raw_key {
let value = entry
.map
.get(raw_key)
.map(String::from_redis_value)
.transpose()?
.unwrap_or(String::from("null"));
println!("{}", value);
continue;
}
let id = &entry.id;
let kvs: Result<HashMap<&str, String>, redis::RedisError> = entry
.map
.iter()
.map(|(key, val)| String::from_redis_value(val).map(|v| (key.as_str(), v)))
.collect();
if as_json {
let value = json!({
"id": id,
"map": kvs?,
});
println!("{}", value);
} else {
println!("ID: {}", id);
for (key, val) in kvs?.iter() {
println!(" {} => {}", key, val);
}
}
}
}
std::io::stdout().flush()?;
Ok(())
}
async fn xread(
conn: &mut redis::aio::MultiplexedConnection,
last_id: &str,
config: &StreamTailArgs,
) -> redis::RedisResult<redis::streams::StreamReadReply> {
if let Some(group) = &config.group {
let consumer = config.consumer.as_deref().unwrap_or("default");
let options = StreamReadOptions::default()
.block(config.block_ms)
.count(config.count)
.group(group, consumer);
let last_id = ">";
redis::AsyncCommands::xread_options(conn, &[&config.stream], &[&last_id], &options).await
} else {
let options = StreamReadOptions::default()
.block(config.block_ms)
.count(config.count);
redis::AsyncCommands::xread_options(conn, &[&config.stream], &[&last_id], &options).await
}
}
pub async fn run(config: AppConfig) -> anyhow::Result<()> {
let Commands::StreamTail(command_config) = config.command else {
panic!("Invalid command config received!");
};
if config.redis.sentinel && config.redis.sentinel_master.is_none() {
eprintln!("--sentinel requires --sentinel-master to be set");
std::process::exit(1);
}
let mut conn = crate::connect::connect(&config.redis).await?;
let mut last_id = command_config.start_id.clone();
loop {
let reply = xread(&mut conn, &last_id, &command_config).await?;
print_stream_read_reply(
&reply,
!command_config.plaintext,
command_config.raw_key.as_deref(),
)?;
if reply.keys.is_empty() && !command_config.retry_when_empty {
break;
}
for stream in reply.keys {
for entry in stream.ids {
last_id = entry.id;
}
}
}
Ok(())
}