use super::super::{
config::ExploreConfig,
nu_common::{NuText, run_command_with_value},
pager::{
Frame, Transition, ViewInfo,
report::{Report, Severity},
},
views::{Layout, Orientation, Preview, RecordView, View, ViewConfig},
};
use super::ViewCommand;
use anyhow::Result;
use crossterm::event::{KeyCode, KeyEvent};
use nu_engine::get_columns;
use nu_protocol::{
PipelineData, Value,
engine::{EngineState, Stack},
};
use ratatui::layout::Rect;
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::thread::{self, JoinHandle};
#[derive(Debug, Default, Clone)]
pub struct NuCmd {
command: String,
}
impl NuCmd {
pub fn new() -> Self {
Self {
command: String::new(),
}
}
pub const NAME: &'static str = "nu";
}
impl ViewCommand for NuCmd {
type View = NuView;
fn name(&self) -> &'static str {
Self::NAME
}
fn description(&self) -> &'static str {
""
}
fn parse(&mut self, args: &str) -> Result<()> {
args.trim().clone_into(&mut self.command);
Ok(())
}
fn spawn(
&mut self,
engine_state: &EngineState,
stack: &mut Stack,
value: Option<Value>,
config: &ViewConfig,
) -> Result<Self::View> {
let value = value.unwrap_or_default();
let engine_state = engine_state.clone();
let mut stack = stack.clone();
let command = self.command.clone();
let explore_config = config.explore_config.clone();
let (sender, receiver) = mpsc::channel();
let handle = thread::spawn(move || {
stream_command(
&command,
&value,
&engine_state,
&mut stack,
&explore_config,
sender,
);
});
Ok(NuView {
state: ViewState::Loading,
receiver: Some(receiver),
_handle: Some(handle),
command_text: self.command.clone(),
explore_config: config.explore_config.clone(),
frame_count: 0,
columns: Vec::new(),
rows: Vec::new(),
is_record: false,
stream_done: false,
last_error: None,
last_row_count: 0,
})
}
}
enum StreamMessage {
Columns(Vec<String>),
Rows(Vec<Vec<Value>>),
IsRecord,
SimpleValue(String),
Done,
Error(String),
}
fn stream_command(
command: &str,
value: &Value,
engine_state: &EngineState,
stack: &mut Stack,
_explore_config: &ExploreConfig,
sender: mpsc::Sender<StreamMessage>,
) {
let pipeline = match run_command_with_value(command, value, engine_state, stack) {
Ok(p) => p,
Err(e) => {
let _ = sender.send(StreamMessage::Error(format!("Command failed: {e}")));
return;
}
};
match pipeline {
PipelineData::Empty => {
let _ = sender.send(StreamMessage::Done);
}
PipelineData::Value(Value::Record { val, .. }, ..) => {
let _ = sender.send(StreamMessage::IsRecord);
let (cols, vals): (Vec<_>, Vec<_>) = val.into_owned().into_iter().unzip();
let _ = sender.send(StreamMessage::Columns(cols));
if !vals.is_empty() {
let _ = sender.send(StreamMessage::Rows(vec![vals]));
}
let _ = sender.send(StreamMessage::Done);
}
PipelineData::Value(Value::List { vals, .. }, ..) => {
stream_values(vals.into_iter(), &sender);
}
PipelineData::Value(Value::String { val, .. }, ..) => {
let _ = sender.send(StreamMessage::SimpleValue(val));
}
PipelineData::Value(value, ..) => {
let text = value.to_abbreviated_string(&engine_state.config);
let _ = sender.send(StreamMessage::SimpleValue(text));
}
PipelineData::ListStream(stream, ..) => {
stream_values(stream.into_iter(), &sender);
}
PipelineData::ByteStream(stream, ..) => {
let span = stream.span();
match stream.into_string() {
Ok(text) => {
let _ = sender.send(StreamMessage::SimpleValue(text));
}
Err(e) => {
let _ = sender.send(StreamMessage::Error(format!(
"Failed to read stream: {}",
Value::error(e, span).to_debug_string()
)));
}
}
}
}
}
fn stream_values<I>(iter: I, sender: &mpsc::Sender<StreamMessage>)
where
I: Iterator<Item = Value>,
{
const BATCH_SIZE: usize = 1;
const INITIAL_ROWS_FOR_COLUMNS: usize = 1;
let mut columns: Option<Vec<String>> = None;
let mut batch: Vec<Vec<Value>> = Vec::with_capacity(BATCH_SIZE);
let mut initial_values: Vec<Value> = Vec::new();
for value in iter {
if columns.is_none() {
initial_values.push(value);
if initial_values.len() >= INITIAL_ROWS_FOR_COLUMNS {
let cols = get_columns(&initial_values);
if !cols.is_empty() {
let _ = sender.send(StreamMessage::Columns(cols.clone()));
}
let rows: Vec<Vec<Value>> = initial_values
.drain(..)
.map(|v| value_to_row(&cols, &v))
.collect();
if sender.send(StreamMessage::Rows(rows)).is_err() {
return; }
columns = Some(cols);
}
} else if let Some(ref cols) = columns {
batch.push(value_to_row(cols, &value));
if batch.len() >= BATCH_SIZE {
if sender
.send(StreamMessage::Rows(std::mem::take(&mut batch)))
.is_err()
{
return; }
batch = Vec::with_capacity(BATCH_SIZE);
}
}
}
if columns.is_none() && !initial_values.is_empty() {
let cols = get_columns(&initial_values);
if !cols.is_empty() {
let _ = sender.send(StreamMessage::Columns(cols.clone()));
}
let rows: Vec<Vec<Value>> = initial_values
.drain(..)
.map(|v| value_to_row(&cols, &v))
.collect();
let _ = sender.send(StreamMessage::Rows(rows));
}
if !batch.is_empty() {
let _ = sender.send(StreamMessage::Rows(batch));
}
let _ = sender.send(StreamMessage::Done);
}
fn value_to_row(cols: &[String], value: &Value) -> Vec<Value> {
if cols.is_empty() {
vec![value.clone()]
} else if let Value::Record { val, .. } = value {
cols.iter()
.map(|col| val.get(col).cloned().unwrap_or_default())
.collect()
} else {
let mut row = vec![Value::default(); cols.len()];
if !row.is_empty() {
row[0] = value.clone();
}
row
}
}
enum ViewState {
Loading,
Records(Box<RecordView>),
Preview(Preview),
Empty,
}
pub struct NuView {
state: ViewState,
receiver: Option<Receiver<StreamMessage>>,
_handle: Option<JoinHandle<()>>,
command_text: String,
explore_config: ExploreConfig,
frame_count: usize,
columns: Vec<String>,
rows: Vec<Vec<Value>>,
is_record: bool,
stream_done: bool,
last_error: Option<String>,
last_row_count: usize,
}
impl NuView {
fn process_messages(&mut self) {
let receiver = match self.receiver.take() {
Some(r) => r,
None => return,
};
let mut should_update_view = false;
loop {
match receiver.try_recv() {
Ok(StreamMessage::Columns(cols)) => {
self.columns = cols;
}
Ok(StreamMessage::Rows(new_rows)) => {
self.rows.extend(new_rows);
should_update_view = true;
}
Ok(StreamMessage::IsRecord) => {
self.is_record = true;
}
Ok(StreamMessage::SimpleValue(text)) => {
self.state = ViewState::Preview(Preview::new(&text));
self.stream_done = true;
return;
}
Ok(StreamMessage::Done) => {
self.stream_done = true;
if self.rows.is_empty()
&& !matches!(self.state, ViewState::Records(_) | ViewState::Preview(_))
{
self.state = ViewState::Empty;
}
if should_update_view {
self.update_record_view();
}
return;
}
Ok(StreamMessage::Error(e)) => {
self.last_error = Some(e);
if !matches!(self.state, ViewState::Records(_) | ViewState::Preview(_)) {
self.state = ViewState::Empty;
}
self.stream_done = true;
return;
}
Err(TryRecvError::Empty) => {
self.receiver = Some(receiver);
if should_update_view {
self.update_record_view();
}
return;
}
Err(TryRecvError::Disconnected) => {
self.stream_done = true;
if self.rows.is_empty()
&& !matches!(self.state, ViewState::Records(_) | ViewState::Preview(_))
{
self.state = ViewState::Empty;
}
if should_update_view {
self.update_record_view();
}
return;
}
}
}
}
fn update_record_view(&mut self) {
if self.rows.is_empty() {
return;
}
let cols = if self.columns.is_empty() {
vec![String::new()]
} else {
self.columns.clone()
};
match &mut self.state {
ViewState::Records(existing_view) => {
let layer = existing_view.get_top_layer_mut();
layer.record_values = self.rows.clone();
let _ = layer.cursor.y.view.set_size(layer.record_values.len());
let _ = layer.cursor.x.view.set_size(layer.column_names.len());
layer.record_text = None;
}
_ => {
let mut view =
RecordView::new(cols, self.rows.clone(), self.explore_config.clone());
if self.is_record {
view.set_top_layer_orientation(Orientation::Left);
}
self.state = ViewState::Records(Box::new(view));
}
}
}
fn spinner_char(&self) -> char {
const SPINNER: &[char] = &['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];
SPINNER[self.frame_count % SPINNER.len()]
}
fn row_count(&self) -> usize {
self.rows.len()
}
fn is_streaming(&self) -> bool {
!self.stream_done
}
fn non_interactive_transition(key: KeyEvent) -> Transition {
match key.code {
KeyCode::Char('q') | KeyCode::Esc => Transition::Exit,
_ => Transition::None,
}
}
}
impl View for NuView {
fn draw(&mut self, f: &mut Frame, area: Rect, cfg: ViewConfig<'_>, layout: &mut Layout) {
self.frame_count = self.frame_count.wrapping_add(1);
if self.rows.len() > self.last_row_count {
self.last_row_count = self.rows.len();
if let ViewState::Records(view) = &mut self.state {
view.tail(area.width, area.height);
}
}
match &mut self.state {
ViewState::Loading => {
}
ViewState::Records(view) => {
view.draw(f, area, cfg, layout);
}
ViewState::Preview(view) => {
view.draw(f, area, cfg, layout);
}
ViewState::Empty => {
}
}
}
fn handle_input(
&mut self,
engine_state: &EngineState,
stack: &mut Stack,
layout: &Layout,
info: &mut ViewInfo,
key: KeyEvent,
) -> Transition {
match &mut self.state {
ViewState::Records(view) => view.handle_input(engine_state, stack, layout, info, key),
ViewState::Preview(view) => view.handle_input(engine_state, stack, layout, info, key),
_ => Self::non_interactive_transition(key),
}
}
fn update(&mut self, info: &mut ViewInfo) -> bool {
self.process_messages();
if let Some(message) = &self.last_error {
info.status = Some(Report::error(message.clone()));
return false;
}
match &self.state {
ViewState::Loading => {
let spinner = self.spinner_char();
let msg = format!("{} Running: {}", spinner, self.command_text);
info.status = Some(Report::message(msg, Severity::Info));
true }
ViewState::Records(_) => {
let row_count = self.row_count();
if self.is_streaming() {
let spinner = self.spinner_char();
let msg = format!("{} Streaming: {} rows", spinner, row_count);
info.status = Some(Report::message(msg, Severity::Info));
true } else {
info.status = Some(Report::new(
format!("{} rows", row_count),
Severity::Info,
String::new(),
String::new(),
String::new(),
));
false }
}
ViewState::Preview(_) => {
info.status = Some(Report::message("Preview", Severity::Info));
false }
ViewState::Empty => {
info.status = Some(Report::message("No output", Severity::Info));
false }
}
}
fn show_data(&mut self, i: usize) -> bool {
match &mut self.state {
ViewState::Records(view) => view.show_data(i),
ViewState::Preview(view) => view.show_data(i),
_ => false,
}
}
fn collect_data(&self) -> Vec<NuText> {
match &self.state {
ViewState::Records(view) => view.collect_data(),
ViewState::Preview(view) => view.collect_data(),
_ => Vec::new(),
}
}
fn exit(&mut self) -> Option<Value> {
match &mut self.state {
ViewState::Records(view) => view.exit(),
ViewState::Preview(view) => view.exit(),
_ => None,
}
}
}
#[cfg(test)]
mod test {
use super::*;
use crossterm::event::KeyModifiers;
#[test]
fn non_interactive_states_exit_on_q_or_esc() {
let key_q = KeyEvent::new(KeyCode::Char('q'), KeyModifiers::NONE);
let key_esc = KeyEvent::new(KeyCode::Esc, KeyModifiers::NONE);
assert!(matches!(
NuView::non_interactive_transition(key_q),
Transition::Exit
));
assert!(matches!(
NuView::non_interactive_transition(key_esc),
Transition::Exit
));
}
#[test]
fn non_interactive_states_ignore_other_keys() {
let key = KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE);
assert!(matches!(
NuView::non_interactive_transition(key),
Transition::None
));
}
#[test]
fn stream_error_is_reported_without_error_view() {
let (sender, receiver) = mpsc::channel();
sender
.send(StreamMessage::Error(String::from("Command failed")))
.expect("send error message");
let mut view = NuView {
state: ViewState::Loading,
receiver: Some(receiver),
_handle: None,
command_text: String::new(),
explore_config: ExploreConfig::default(),
frame_count: 0,
columns: Vec::new(),
rows: Vec::new(),
is_record: false,
stream_done: false,
last_error: None,
last_row_count: 0,
};
view.process_messages();
assert!(matches!(view.state, ViewState::Empty));
assert!(view.stream_done);
assert_eq!(view.last_error.as_deref(), Some("Command failed"));
}
#[test]
fn update_prioritizes_error_status() {
let mut view = NuView {
state: ViewState::Empty,
receiver: None,
_handle: None,
command_text: String::new(),
explore_config: ExploreConfig::default(),
frame_count: 0,
columns: Vec::new(),
rows: Vec::new(),
is_record: false,
stream_done: true,
last_error: Some(String::from("stream failed")),
last_row_count: 0,
};
let mut info = ViewInfo::default();
let keep_polling = view.update(&mut info);
assert!(!keep_polling);
let status = info.status.expect("status to be set");
assert!(matches!(status.level, Severity::Err));
assert_eq!(status.message, "stream failed");
}
}