mod render;
use std::marker::PhantomData;
use ratatui::prelude::*;
use super::{Component, EventContext, RenderContext};
use crate::input::{Event, Key};
use crate::scroll::ScrollState;
/// Severity of a log entry.
///
/// Variants are declared from least to most severe, so the derived ordering gives
/// `Debug < Info < Warning < Error`; per-stream minimum-level filtering relies on that order.
///
/// The enum carries no data, so `Copy` and `Hash` are derived in addition to the comparison
/// traits: levels can be passed by value and used as map or set keys.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(
    feature = "serialization",
    derive(serde::Serialize, serde::Deserialize)
)]
pub enum CorrelationLevel {
    /// Lowest severity.
    Debug,
    /// Default severity.
    #[default]
    Info,
    /// Ranks above `Info` and below `Error`.
    Warning,
    /// Highest severity.
    Error,
}
impl CorrelationLevel {
pub fn color(&self) -> Color {
match self {
CorrelationLevel::Debug => Color::DarkGray,
CorrelationLevel::Info => Color::Blue,
CorrelationLevel::Warning => Color::Yellow,
CorrelationLevel::Error => Color::Red,
}
}
pub fn label(&self) -> &'static str {
match self {
CorrelationLevel::Debug => "DBG",
CorrelationLevel::Info => "INF",
CorrelationLevel::Warning => "WRN",
CorrelationLevel::Error => "ERR",
}
}
}
impl std::fmt::Display for CorrelationLevel {
    /// Writes the abbreviation returned by [`CorrelationLevel::label`].
    ///
    /// The text goes through [`std::fmt::Formatter::pad`] rather than `write_str`, so width,
    /// fill, alignment and precision flags (for example `{:<5}`) are honoured instead of
    /// being silently ignored. Plain `{}` output is unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad(self.label())
    }
}
/// A single log entry stored in a [`LogStream`].
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(
feature = "serialization",
derive(serde::Serialize, serde::Deserialize)
)]
pub struct CorrelationEntry {
/// Position of the entry on the timeline; used to align entries of different streams
/// into common rows. The unit is not fixed by this module.
pub timestamp: f64,
/// Severity of the entry; compared against a stream's `min_level` filter.
pub level: CorrelationLevel,
/// Text of the entry; matched case-insensitively against a stream's text filter.
pub message: String,
}
impl CorrelationEntry {
pub fn new(timestamp: f64, level: CorrelationLevel, message: impl Into<String>) -> Self {
Self {
timestamp,
level,
message: message.into(),
}
}
}
/// A named sequence of log entries together with its display color and filters.
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(
feature = "serialization",
derive(serde::Serialize, serde::Deserialize)
)]
pub struct LogStream {
/// Name of the stream.
pub name: String,
/// Color associated with the stream; `Color::White` for streams built with `LogStream::new`.
pub color: Color,
/// Entries of the stream, in insertion order.
pub entries: Vec<CorrelationEntry>,
/// Case-insensitive substring an entry's message must contain; an empty string disables
/// text filtering.
pub filter: String,
/// Entries whose level is below this value are filtered out; `None` disables level filtering.
pub min_level: Option<CorrelationLevel>,
}
impl LogStream {
    /// Creates an empty stream called `name`, colored white, with no text or level filter.
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            color: Color::White,
            entries: Vec::new(),
            filter: String::new(),
            min_level: None,
        }
    }

    /// Builder-style variant of [`LogStream::set_color`].
    pub fn with_color(mut self, color: Color) -> Self {
        self.color = color;
        self
    }

    /// Replaces the stream's color.
    pub fn set_color(&mut self, color: Color) {
        self.color = color;
    }

    /// Builder-style helper that appends `entry` to the stream.
    pub fn with_entry(mut self, entry: CorrelationEntry) -> Self {
        self.entries.push(entry);
        self
    }

    /// Returns references to the entries that pass the level and text filters, in
    /// insertion order.
    fn filtered_entries(&self) -> Vec<&CorrelationEntry> {
        if self.filter.is_empty() {
            // Without a text filter `passes_filter` only compares levels and allocates nothing.
            return self
                .entries
                .iter()
                .filter(|entry| self.passes_filter(entry))
                .collect();
        }

        // Lower-case the needle once for the whole scan instead of once per entry.
        let needle = self.filter.to_lowercase();
        self.entries
            .iter()
            .filter(|entry| self.level_allowed(entry) && Self::message_contains(entry, &needle))
            .collect()
    }

    /// Returns `true` when `entry` satisfies both the minimum level and the text filter.
    fn passes_filter(&self, entry: &CorrelationEntry) -> bool {
        self.level_allowed(entry)
            && (self.filter.is_empty()
                || Self::message_contains(entry, &self.filter.to_lowercase()))
    }

    /// Returns `true` when no minimum level is set or `entry` is at least that severe.
    fn level_allowed(&self, entry: &CorrelationEntry) -> bool {
        match &self.min_level {
            Some(min_level) => entry.level >= *min_level,
            None => true,
        }
    }

    /// Case-insensitive substring test; `needle_lower` must already be lower-cased.
    fn message_contains(entry: &CorrelationEntry, needle_lower: &str) -> bool {
        entry.message.to_lowercase().contains(needle_lower)
    }
}
/// Largest absolute difference between two timestamps for which they are still grouped
/// into the same aligned row. Expressed in the same (unspecified) unit as
/// `CorrelationEntry::timestamp`.
const TIMESTAMP_TOLERANCE: f64 = 0.1;
/// Messages understood by `LogCorrelation::update`.
///
/// Messages that carry a `stream` index are ignored when the index is out of range.
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(
feature = "serialization",
derive(serde::Serialize, serde::Deserialize)
)]
pub enum LogCorrelationMessage {
/// Appends a stream after the existing ones.
AddStream(LogStream),
/// Replaces all streams and resets the active stream and the scroll offset to 0.
SetStreams(Vec<LogStream>),
/// Appends `entry` to the stream at index `stream`.
PushEntry {
stream: usize,
entry: CorrelationEntry,
},
/// Removes the entries of every stream (the streams themselves are kept) and resets the
/// scroll offset and the scroll timestamp.
Clear,
/// Scrolls up via `ScrollState::scroll_up` and refreshes the scroll timestamp.
ScrollUp,
/// Scrolls down via `ScrollState::scroll_down` and refreshes the scroll timestamp.
ScrollDown,
/// Jumps to the start via `ScrollState::scroll_to_start` and refreshes the scroll timestamp.
ScrollToTop,
/// Jumps to the end via `ScrollState::scroll_to_end` and refreshes the scroll timestamp.
ScrollToBottom,
/// Sets the text filter of the stream at index `stream`.
SetStreamFilter {
stream: usize,
filter: String,
},
/// Sets (or, with `None`, removes) the minimum level of the stream at index `stream`.
SetStreamLevelFilter {
stream: usize,
level: Option<CorrelationLevel>,
},
/// Makes the next stream active, wrapping from the last stream to the first.
FocusNextStream,
/// Makes the previous stream active, wrapping from the first stream to the last.
FocusPrevStream,
/// Flips the synchronized-scrolling flag.
ToggleSyncScroll,
}
/// Values returned by `LogCorrelation::update` to report what a message changed.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(
feature = "serialization",
derive(serde::Serialize, serde::Deserialize)
)]
pub enum LogCorrelationOutput {
/// The active stream changed; carries the new active index.
StreamFocused(usize),
/// An entry was appended to the stream at index `stream`.
EntryAdded {
stream: usize,
},
}
/// One row of the aligned timeline produced by `LogCorrelationState::aligned_rows`.
#[derive(Clone, Debug)]
pub(crate) struct AlignedRow {
/// Representative timestamp of the row: the first timestamp of its group after sorting.
pub(crate) timestamp: f64,
/// One list per stream, in stream order; each list holds indices into that stream's
/// *filtered* entry list (not into `LogStream::entries`). A list may be empty.
pub(crate) stream_entries: Vec<Vec<usize>>,
}
/// State of the log-correlation component: the streams plus scroll and focus bookkeeping.
#[derive(Clone, Debug)]
#[cfg_attr(
feature = "serialization",
derive(serde::Serialize, serde::Deserialize)
)]
pub struct LogCorrelationState {
/// Streams in display order.
streams: Vec<LogStream>,
/// Timestamp of the aligned row found at the current scroll offset; refreshed after every
/// scroll message and reset to `0.0` by `Clear`.
scroll_timestamp: f64,
/// Index of the active stream; changed by the focus messages.
active_stream: usize,
/// Scroll bookkeeping; its content length and viewport height are refreshed on every
/// scroll message.
scroll: ScrollState,
/// Flag flipped by `ToggleSyncScroll`; defaults to `true`. It is not read by any logic in
/// this file — presumably consumed by the renderer (TODO confirm).
sync_scroll: bool,
/// Optional title; `None` until one is set.
title: Option<String>,
}
impl Default for LogCorrelationState {
fn default() -> Self {
Self {
streams: Vec::new(),
scroll_timestamp: 0.0,
active_stream: 0,
scroll: ScrollState::default(),
sync_scroll: true,
title: None,
}
}
}
impl PartialEq for LogCorrelationState {
    /// Field-wise equality, except that the scroll timestamps only need to differ by less
    /// than `f64::EPSILON`. A NaN scroll timestamp therefore never compares equal.
    fn eq(&self, other: &Self) -> bool {
        let timestamp_gap = (self.scroll_timestamp - other.scroll_timestamp).abs();
        // Written as `<` (not a negated `>=`) so that a NaN gap yields `false`.
        let timestamps_match = timestamp_gap < f64::EPSILON;

        timestamps_match
            && self.active_stream == other.active_stream
            && self.sync_scroll == other.sync_scroll
            && self.title == other.title
            && self.scroll == other.scroll
            && self.streams == other.streams
    }
}
impl LogCorrelationState {
    /// Creates an empty state; equivalent to [`LogCorrelationState::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder-style helper that replaces the streams.
    pub fn with_streams(mut self, streams: Vec<LogStream>) -> Self {
        self.streams = streams;
        self
    }

    /// Builder-style variant of [`LogCorrelationState::set_title`].
    pub fn with_title(mut self, title: impl Into<String>) -> Self {
        self.title = Some(title.into());
        self
    }

    /// Builder-style helper that sets the synchronized-scrolling flag.
    pub fn with_sync_scroll(mut self, sync: bool) -> Self {
        self.sync_scroll = sync;
        self
    }

    /// Returns the streams in display order.
    pub fn streams(&self) -> &[LogStream] {
        &self.streams
    }

    /// Returns the number of streams.
    pub fn stream_count(&self) -> usize {
        self.streams.len()
    }

    /// Returns the index of the active stream.
    pub fn active_stream(&self) -> usize {
        self.active_stream
    }

    /// Returns whether synchronized scrolling is switched on.
    pub fn sync_scroll(&self) -> bool {
        self.sync_scroll
    }

    /// Returns the current scroll offset as reported by the scroll state.
    pub fn scroll_offset(&self) -> usize {
        self.scroll.offset()
    }

    /// Returns the timestamp recorded for the current scroll position.
    pub fn scroll_timestamp(&self) -> f64 {
        self.scroll_timestamp
    }

    /// Returns the title, if one has been set.
    pub fn title(&self) -> Option<&str> {
        self.title.as_deref()
    }

    /// Sets the title.
    pub fn set_title(&mut self, title: impl Into<String>) {
        self.title = Some(title.into());
    }

    /// Appends a stream after the existing ones.
    pub fn add_stream(&mut self, stream: LogStream) {
        self.streams.push(stream);
    }

    /// Appends `entry` to the stream at `stream_idx`; does nothing when the index is out of
    /// range.
    pub fn push_entry(&mut self, stream_idx: usize, entry: CorrelationEntry) {
        if let Some(stream) = self.streams.get_mut(stream_idx) {
            stream.entries.push(entry);
        }
    }

    /// Groups the filtered entries of all streams into timeline rows.
    ///
    /// The timestamps of every entry that passes its stream's filters are sorted and
    /// collapsed into row anchors (see `deduplicate_timestamps`). Each entry is then attached
    /// to exactly one row: the one with the latest anchor that is not greater than the entry's
    /// timestamp. By construction that anchor is within `TIMESTAMP_TOLERANCE` of the entry.
    ///
    /// In every returned row, `stream_entries[s]` holds indices into the *filtered* entry list
    /// of stream `s`, in ascending order. Rows are ordered by ascending timestamp. Entries
    /// whose timestamp is NaN cannot be placed on the timeline and are left out.
    pub(crate) fn aligned_rows(&self) -> Vec<AlignedRow> {
        let filtered: Vec<Vec<&CorrelationEntry>> =
            self.streams.iter().map(|s| s.filtered_entries()).collect();

        // NaN is dropped up front so it can neither become a row anchor nor swallow the
        // timestamps that follow it during de-duplication.
        let mut timestamps: Vec<f64> = filtered
            .iter()
            .flatten()
            .map(|entry| entry.timestamp)
            .filter(|ts| !ts.is_nan())
            .collect();
        if timestamps.is_empty() {
            return Vec::new();
        }

        // `total_cmp` is a genuine total order, which `sort_by` requires.
        timestamps.sort_by(f64::total_cmp);
        let anchors = Self::deduplicate_timestamps(&timestamps);

        let stream_count = filtered.len();
        let mut rows: Vec<AlignedRow> = anchors
            .iter()
            .map(|&timestamp| AlignedRow {
                timestamp,
                stream_entries: vec![Vec::new(); stream_count],
            })
            .collect();

        for (stream_idx, entries) in filtered.iter().enumerate() {
            for (entry_idx, entry) in entries.iter().enumerate() {
                // Number of anchors <= this timestamp; the last of them owns the entry.
                // The count is 0 only for NaN timestamps, which are skipped.
                let preceding = anchors.partition_point(|&anchor| anchor <= entry.timestamp);
                if let Some(row) = preceding.checked_sub(1).and_then(|idx| rows.get_mut(idx)) {
                    row.stream_entries[stream_idx].push(entry_idx);
                }
            }
        }
        rows
    }

    /// Collapses an ascending list of timestamps into row anchors.
    ///
    /// The first timestamp is always kept; each later one is kept only when it is more than
    /// `TIMESTAMP_TOLERANCE` away from the most recently kept anchor.
    fn deduplicate_timestamps(sorted: &[f64]) -> Vec<f64> {
        let mut anchors: Vec<f64> = Vec::new();
        for &ts in sorted {
            let starts_new_row = match anchors.last() {
                None => true,
                Some(&last) => (ts - last).abs() > TIMESTAMP_TOLERANCE,
            };
            if starts_new_row {
                anchors.push(ts);
            }
        }
        anchors
    }

    /// Returns the number of display lines needed for all aligned rows.
    ///
    /// A row is as tall as the largest number of entries any single stream contributes to
    /// it, and never shorter than one line.
    pub(crate) fn total_display_rows(&self) -> usize {
        self.aligned_rows()
            .iter()
            .map(|row| {
                row.stream_entries
                    .iter()
                    .map(Vec::len)
                    .max()
                    .unwrap_or(0)
                    .max(1)
            })
            .sum()
    }

    /// Applies `msg` to this state by delegating to the component's `update` function.
    pub fn update(&mut self, msg: LogCorrelationMessage) -> Option<LogCorrelationOutput> {
        LogCorrelation::update(self, msg)
    }
}
/// Marker type that implements `Component` for the log-correlation view.
///
/// It holds no data (all state lives in `LogCorrelationState`); the private `PhantomData`
/// field prevents construction outside this module.
pub struct LogCorrelation(PhantomData<()>);
impl Component for LogCorrelation {
    type State = LogCorrelationState;
    type Message = LogCorrelationMessage;
    type Output = LogCorrelationOutput;

    /// Returns the default (empty) state.
    fn init() -> Self::State {
        LogCorrelationState::default()
    }

    /// Translates a key press into a message.
    ///
    /// Returns `None` when the component is unfocused or disabled, when the event is not a
    /// key event, or when the key has no binding. Bindings: Up/`k` and Down/`j` scroll,
    /// Home/End jump to the start/end, Tab and Shift+Tab cycle the active stream, and `s`
    /// toggles synchronized scrolling.
    fn handle_event(
        _state: &Self::State,
        event: &Event,
        ctx: &EventContext,
    ) -> Option<Self::Message> {
        let accepts_input = ctx.focused && !ctx.disabled;
        if !accepts_input {
            return None;
        }

        let key = event.as_key()?;
        let message = match key.code {
            Key::Up | Key::Char('k') => LogCorrelationMessage::ScrollUp,
            Key::Down | Key::Char('j') => LogCorrelationMessage::ScrollDown,
            Key::Home => LogCorrelationMessage::ScrollToTop,
            Key::End => LogCorrelationMessage::ScrollToBottom,
            Key::Tab if key.modifiers.shift() => LogCorrelationMessage::FocusPrevStream,
            Key::Tab => LogCorrelationMessage::FocusNextStream,
            Key::Char('s') => LogCorrelationMessage::ToggleSyncScroll,
            _ => return None,
        };
        Some(message)
    }

    /// Applies `msg` to `state` and reports focus changes and appended entries.
    ///
    /// Messages addressing a stream index that does not exist leave the state untouched.
    fn update(state: &mut Self::State, msg: Self::Message) -> Option<Self::Output> {
        // Shared prelude of the four scroll messages: tell the scroll state how many display
        // lines exist, using a viewport of one line (or zero when there is no content).
        fn refresh_scroll_extent(state: &mut LogCorrelationState) {
            let total = state.total_display_rows();
            state.scroll.set_content_length(total);
            state.scroll.set_viewport_height(total.min(1));
        }

        match msg {
            LogCorrelationMessage::AddStream(stream) => {
                state.streams.push(stream);
                None
            }
            LogCorrelationMessage::SetStreams(streams) => {
                state.streams = streams;
                state.active_stream = 0;
                state.scroll.set_offset(0);
                None
            }
            LogCorrelationMessage::PushEntry { stream, entry } => {
                let target = state.streams.get_mut(stream)?;
                target.entries.push(entry);
                Some(LogCorrelationOutput::EntryAdded { stream })
            }
            LogCorrelationMessage::Clear => {
                state
                    .streams
                    .iter_mut()
                    .for_each(|stream| stream.entries.clear());
                state.scroll.set_offset(0);
                state.scroll_timestamp = 0.0;
                None
            }
            LogCorrelationMessage::ScrollUp => {
                refresh_scroll_extent(state);
                state.scroll.scroll_up();
                Self::update_scroll_timestamp(state);
                None
            }
            LogCorrelationMessage::ScrollDown => {
                refresh_scroll_extent(state);
                state.scroll.scroll_down();
                Self::update_scroll_timestamp(state);
                None
            }
            LogCorrelationMessage::ScrollToTop => {
                refresh_scroll_extent(state);
                state.scroll.scroll_to_start();
                Self::update_scroll_timestamp(state);
                None
            }
            LogCorrelationMessage::ScrollToBottom => {
                refresh_scroll_extent(state);
                state.scroll.scroll_to_end();
                Self::update_scroll_timestamp(state);
                None
            }
            LogCorrelationMessage::SetStreamFilter { stream, filter } => {
                if let Some(target) = state.streams.get_mut(stream) {
                    target.filter = filter;
                }
                None
            }
            LogCorrelationMessage::SetStreamLevelFilter { stream, level } => {
                if let Some(target) = state.streams.get_mut(stream) {
                    target.min_level = level;
                }
                None
            }
            LogCorrelationMessage::FocusNextStream => {
                let count = state.streams.len();
                if count == 0 {
                    return None;
                }
                state.active_stream = (state.active_stream + 1) % count;
                Some(LogCorrelationOutput::StreamFocused(state.active_stream))
            }
            LogCorrelationMessage::FocusPrevStream => {
                let count = state.streams.len();
                if count == 0 {
                    return None;
                }
                // Step back one stream, wrapping from the first to the last.
                state.active_stream = match state.active_stream {
                    0 => count - 1,
                    current => current - 1,
                };
                Some(LogCorrelationOutput::StreamFocused(state.active_stream))
            }
            LogCorrelationMessage::ToggleSyncScroll => {
                state.sync_scroll = !state.sync_scroll;
                None
            }
        }
    }

    /// Draws the component by handing the state and the render context's fields to the
    /// `render` module.
    fn view(state: &Self::State, ctx: &mut RenderContext<'_, '_>) {
        render::render(
            state,
            ctx.frame,
            ctx.area,
            ctx.theme,
            ctx.focused,
            ctx.disabled,
        );
    }
}
impl LogCorrelation {
    /// Stores in `state.scroll_timestamp` the timestamp of the aligned row that contains the
    /// display line at the current scroll offset.
    ///
    /// When the offset lies past the last display line, the last row's timestamp is used;
    /// when there are no rows at all, the stored timestamp is left unchanged.
    fn update_scroll_timestamp(state: &mut LogCorrelationState) {
        let rows = state.aligned_rows();
        let offset = state.scroll.offset();

        // Running count of display lines up to and including the row being inspected.
        let mut lines_through_row = 0usize;
        let row_at_offset = rows.iter().find(|row| {
            let height = row
                .stream_entries
                .iter()
                .map(|indices| indices.len().max(1))
                .max()
                .unwrap_or(1);
            lines_through_row += height;
            lines_through_row > offset
        });

        if let Some(row) = row_at_offset.or_else(|| rows.last()) {
            state.scroll_timestamp = row.timestamp;
        }
    }
}
#[cfg(test)]
mod snapshot_tests;
#[cfg(test)]
mod tests;