use crate::ui::{MessageBuffer, TopicMessage, TopicsListTui};
use anyhow::Result;
use crossterm::{
event::{self, Event},
execute,
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use ratatui::{backend::CrosstermBackend, Terminal};
use serde_json::Value;
use std::io::stdout;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
#[derive(Debug, Clone)]
pub struct TopicsListArgs {
pub pattern: String,
#[allow(dead_code)]
pub verbose: bool,
#[allow(dead_code)]
pub grouped: bool,
}
pub async fn handle_topics_list(ctx: &mut crate::context::CliContext, args: &TopicsListArgs) -> Result<()> {
let redis_url = ctx.redis_url().to_string();
let redis = ctx.redis()?;
let mut conn = redis.get_connection().await.map_err(|e| {
eprintln!("❌ Failed to connect to Redis at {}", redis_url);
eprintln!(" Error: {}", e);
eprintln!();
eprintln!("Hint: Start Redis with:");
eprintln!(" docker compose up -d redis");
eprintln!(" # or");
eprintln!(" redis-server");
e
})?;
let topics: Vec<String> = redis::cmd("KEYS")
.arg(&args.pattern)
.query_async(&mut conn)
.await
.map_err(|e| anyhow::anyhow!("Failed to scan Redis: {}", e))?;
if topics.is_empty() {
println!();
println!("No topics found matching pattern: {}", args.pattern);
println!();
println!("Hint: Start some nodes to publish data:");
println!(" mecha10 dev");
println!();
return Ok(());
}
enable_raw_mode()?;
let mut stdout_handle = stdout();
execute!(stdout_handle, EnterAlternateScreen)?;
let backend = CrosstermBackend::new(stdout_handle);
let mut terminal = Terminal::new(backend)?;
let mut tui = TopicsListTui::new(topics);
let running = Arc::new(AtomicBool::new(true));
let r = running.clone();
ctrlc::set_handler(move || {
r.store(false, Ordering::SeqCst);
})?;
let (monitor_tx, mut monitor_rx) = mpsc::channel::<MonitorCommand>(10);
let mut monitor_handle: Option<tokio::task::JoinHandle<()>> = None;
let (topic_tx, mut topic_rx) = mpsc::channel::<Vec<String>>(10);
let pattern_clone = args.pattern.clone();
let redis_url_clone = redis_url.clone();
let discovery_handle =
tokio::spawn(async move { discover_topics_background(&redis_url_clone, &pattern_clone, topic_tx).await });
let result = loop {
if !running.load(Ordering::SeqCst) || tui.should_quit() {
break Ok(());
}
if let Err(e) = terminal.draw(|f| tui.draw(f)) {
break Err(e.into());
}
if event::poll(Duration::from_millis(100))? {
if let Event::Key(key_event) = event::read()? {
let prev_monitoring = tui.monitoring_topic().map(|s| s.to_string());
tui.handle_key(key_event);
let current_monitoring = tui.monitoring_topic().map(|s| s.to_string());
if prev_monitoring != current_monitoring {
if monitor_handle.is_some() {
let _ = monitor_tx.send(MonitorCommand::Stop).await;
if let Some(handle) = monitor_handle.take() {
handle.abort();
}
}
if let Some(topic) = current_monitoring {
let buffer = tui.message_buffer();
let redis_url_clone = redis_url.clone();
let (stop_tx, stop_rx) = mpsc::channel::<()>(1);
monitor_handle = Some(tokio::spawn(async move {
let _ = monitor_topic_background(&redis_url_clone, &topic, buffer, stop_rx).await;
}));
drop(stop_tx);
}
}
}
}
if let Ok(cmd) = monitor_rx.try_recv() {
match cmd {
MonitorCommand::Stop => {
}
}
}
if let Ok(new_topics) = topic_rx.try_recv() {
tui.update_topics(new_topics);
}
};
if monitor_handle.is_some() {
let _ = monitor_tx.send(MonitorCommand::Stop).await;
if let Some(handle) = monitor_handle.take() {
handle.abort();
}
}
discovery_handle.abort();
disable_raw_mode()?;
execute!(terminal.backend_mut(), LeaveAlternateScreen)?;
terminal.show_cursor()?;
result
}
enum MonitorCommand {
Stop,
}
async fn discover_topics_background(redis_url: &str, pattern: &str, tx: mpsc::Sender<Vec<String>>) -> Result<()> {
let client = redis::Client::open(redis_url)?;
loop {
if let Ok(mut conn) = client.get_multiplexed_async_connection().await {
if let Ok(topics) = redis::cmd("KEYS")
.arg(pattern)
.query_async::<Vec<String>>(&mut conn)
.await
{
if tx.send(topics).await.is_err() {
break;
}
}
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
Ok(())
}
async fn monitor_topic_background(
redis_url: &str,
topic: &str,
buffer: MessageBuffer,
mut stop_rx: mpsc::Receiver<()>,
) -> Result<()> {
let client = redis::Client::open(redis_url)?;
let mut conn = client.get_multiplexed_async_connection().await?;
let history_result: redis::RedisResult<redis::Value> = redis::cmd("XREVRANGE")
.arg(topic)
.arg("+") .arg("-") .arg("COUNT")
.arg(20)
.query_async(&mut conn)
.await;
let mut last_id = "$".to_string();
if let Ok(redis::Value::Array(entries)) = history_result {
let mut history_msgs: Vec<(String, TopicMessage)> = Vec::new();
for entry_data in entries {
if let Ok(Some(msg)) = parse_stream_entry(&entry_data) {
history_msgs.push((msg.entry_id.clone(), msg));
}
}
history_msgs.reverse();
for (entry_id, msg) in history_msgs {
last_id = entry_id;
buffer.push(msg);
}
tracing::debug!("Loaded recent history for topic '{}', last_id='{}'", topic, last_id);
}
tracing::debug!("Starting monitor for topic '{}' with last_id='{}'", topic, last_id);
loop {
tokio::select! {
_ = stop_rx.recv() => {
tracing::debug!("Stop signal received for topic '{}'", topic);
break;
}
result = read_stream_messages(&mut conn, topic, &last_id) => {
match result {
Ok(Some((messages, new_last_id))) => {
tracing::debug!(
"Read {} messages from '{}', last_id: '{}' -> '{}'",
messages.len(),
topic,
last_id,
new_last_id
);
for msg in messages {
buffer.push(msg);
}
last_id = new_last_id;
}
Ok(None) => {
tracing::trace!("No messages from '{}' (timeout)", topic);
}
Err(e) => {
tracing::error!("Error reading from '{}': {}", topic, e);
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
}
}
}
Ok(())
}
async fn read_stream_messages(
conn: &mut redis::aio::MultiplexedConnection,
topic: &str,
last_id: &str,
) -> Result<Option<(Vec<TopicMessage>, String)>> {
let result: redis::RedisResult<redis::Value> = redis::cmd("XREAD")
.arg("BLOCK")
.arg(1000) .arg("STREAMS")
.arg(topic)
.arg(last_id)
.query_async(conn)
.await;
match result {
Ok(redis::Value::Array(streams)) => {
let mut messages = Vec::new();
let mut new_last_id = last_id.to_string();
for stream_data in streams {
if let redis::Value::Array(stream_parts) = stream_data {
if stream_parts.len() >= 2 {
if let redis::Value::Array(entries) = &stream_parts[1] {
for entry_data in entries {
match parse_stream_entry(entry_data) {
Ok(Some(msg)) => {
new_last_id = msg.entry_id.clone();
messages.push(msg);
}
Ok(None) => {
tracing::warn!("Failed to parse stream entry (no data field?)");
}
Err(e) => {
tracing::error!("Error parsing stream entry: {}", e);
}
}
}
}
}
}
}
if messages.is_empty() {
Ok(None)
} else {
Ok(Some((messages, new_last_id)))
}
}
Ok(redis::Value::Nil) => Ok(None),
Ok(other) => {
tracing::warn!("Unexpected Redis value type: {:?}", other);
Ok(None)
}
Err(e) => {
tracing::error!("Redis XREAD error: {}", e);
Err(anyhow::anyhow!("Redis XREAD failed: {}", e))
}
}
}
fn parse_stream_entry(entry_data: &redis::Value) -> Result<Option<TopicMessage>> {
if let redis::Value::Array(entry_parts) = entry_data {
if entry_parts.len() >= 2 {
let entry_id = match &entry_parts[0] {
redis::Value::BulkString(bytes) => String::from_utf8_lossy(bytes).to_string(),
_ => return Ok(None),
};
if let redis::Value::Array(fields) = &entry_parts[1] {
let data_value = extract_data_field(fields)?;
if let Some(data) = data_value {
let payload = parse_message_payload(&data)?;
let now = chrono::Local::now();
let timestamp = now.format("%H:%M:%S").to_string();
return Ok(Some(TopicMessage {
entry_id,
timestamp,
payload,
}));
}
}
}
}
Ok(None)
}
fn extract_data_field(fields: &[redis::Value]) -> Result<Option<String>> {
for i in (0..fields.len()).step_by(2) {
if i + 1 < fields.len() {
if let redis::Value::BulkString(field_name) = &fields[i] {
if field_name.as_slice() == b"data" {
if let redis::Value::BulkString(value) = &fields[i + 1] {
return Ok(Some(String::from_utf8_lossy(value).to_string()));
}
}
}
}
}
Ok(None)
}
fn parse_message_payload(data: &str) -> Result<Value> {
if let Ok(envelope) = serde_json::from_str::<Value>(data) {
if let Some(payload) = envelope.get("payload") {
return Ok(payload.clone());
}
return Ok(envelope);
}
Ok(Value::String(data.to_string()))
}