use nu_engine::CallExt;
use nu_protocol::engine::{Call, Command, EngineState, Stack};
use nu_protocol::shell_error::generic::GenericError;
use nu_protocol::{Category, PipelineData, ShellError, Signature, SyntaxShape, Type};
use crate::store::{ReadOptions, Store};
#[derive(Clone)]
pub struct CatCommand {
store: Store,
}
impl CatCommand {
pub fn new(store: Store) -> Self {
Self { store }
}
}
impl Command for CatCommand {
fn name(&self) -> &str {
".cat"
}
fn signature(&self) -> Signature {
Signature::build(".cat")
.input_output_types(vec![(Type::Nothing, Type::Any)])
.named(
"limit",
SyntaxShape::Int,
"limit the number of frames to retrieve",
None,
)
.named(
"after",
SyntaxShape::String,
"start after a specific frame ID (exclusive)",
Some('a'),
)
.named(
"from",
SyntaxShape::String,
"start from a specific frame ID (inclusive)",
None,
)
.named(
"last",
SyntaxShape::Int,
"return the N most recent frames",
None,
)
.named("topic", SyntaxShape::String, "filter by topic", Some('T'))
.switch(
"with-timestamp",
"include timestamp extracted from frame ID",
None,
)
.category(Category::Experimental)
}
fn description(&self) -> &str {
"Reads the event stream and returns frames"
}
fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
_input: PipelineData,
) -> Result<PipelineData, ShellError> {
let limit: Option<usize> = call.get_flag(engine_state, stack, "limit")?;
let last: Option<usize> = call.get_flag(engine_state, stack, "last")?;
let after: Option<String> = call.get_flag(engine_state, stack, "after")?;
let from: Option<String> = call.get_flag(engine_state, stack, "from")?;
let topic: Option<String> = call.get_flag(engine_state, stack, "topic")?;
let with_timestamp = call.has_flag(engine_state, stack, "with-timestamp")?;
let parse_id = |s: &str, name: &str| -> Result<scru128::Scru128Id, ShellError> {
s.parse().map_err(|e| {
ShellError::Generic(GenericError::new(
format!("Invalid {name}"),
format!("Failed to parse Scru128Id: {e}"),
call.head,
))
})
};
let after: Option<scru128::Scru128Id> =
after.as_deref().map(|s| parse_id(s, "after")).transpose()?;
let from: Option<scru128::Scru128Id> =
from.as_deref().map(|s| parse_id(s, "from")).transpose()?;
let options = ReadOptions::builder()
.maybe_after(after)
.maybe_from(from)
.maybe_limit(limit)
.maybe_last(last)
.maybe_topic(topic)
.build();
let frames: Vec<_> = self.store.read_sync(options).collect();
use nu_protocol::Value;
let output = Value::list(
frames
.into_iter()
.map(|frame| crate::nu::util::frame_to_value(&frame, call.head, with_timestamp))
.collect(),
call.head,
);
Ok(PipelineData::Value(output, None))
}
}