xs/nu/commands/
cat_command.rs1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{Category, PipelineData, ShellError, Signature, SyntaxShape, Type};
4
5use crate::store::{ReadOptions, Store};
6
7#[derive(Clone)]
8pub struct CatCommand {
9 store: Store,
10}
11
12impl CatCommand {
13 pub fn new(store: Store) -> Self {
14 Self { store }
15 }
16}
17
18impl Command for CatCommand {
19 fn name(&self) -> &str {
20 ".cat"
21 }
22
23 fn signature(&self) -> Signature {
24 Signature::build(".cat")
25 .input_output_types(vec![(Type::Nothing, Type::Any)])
26 .named(
27 "limit",
28 SyntaxShape::Int,
29 "limit the number of frames to retrieve",
30 None,
31 )
32 .named(
33 "after",
34 SyntaxShape::String,
35 "start after a specific frame ID (exclusive)",
36 Some('a'),
37 )
38 .named(
39 "from",
40 SyntaxShape::String,
41 "start from a specific frame ID (inclusive)",
42 None,
43 )
44 .named(
45 "last",
46 SyntaxShape::Int,
47 "return the N most recent frames",
48 None,
49 )
50 .named("topic", SyntaxShape::String, "filter by topic", Some('T'))
51 .switch(
52 "with-timestamp",
53 "include timestamp extracted from frame ID",
54 None,
55 )
56 .category(Category::Experimental)
57 }
58
59 fn description(&self) -> &str {
60 "Reads the event stream and returns frames"
61 }
62
63 fn run(
64 &self,
65 engine_state: &EngineState,
66 stack: &mut Stack,
67 call: &Call,
68 _input: PipelineData,
69 ) -> Result<PipelineData, ShellError> {
70 let limit: Option<usize> = call.get_flag(engine_state, stack, "limit")?;
71 let last: Option<usize> = call.get_flag(engine_state, stack, "last")?;
72 let after: Option<String> = call.get_flag(engine_state, stack, "after")?;
73 let from: Option<String> = call.get_flag(engine_state, stack, "from")?;
74 let topic: Option<String> = call.get_flag(engine_state, stack, "topic")?;
75 let with_timestamp = call.has_flag(engine_state, stack, "with-timestamp")?;
76
77 let parse_id = |s: &str, name: &str| -> Result<scru128::Scru128Id, ShellError> {
79 s.parse().map_err(|e| ShellError::GenericError {
80 error: format!("Invalid {name}"),
81 msg: format!("Failed to parse Scru128Id: {e}"),
82 span: Some(call.head),
83 help: None,
84 inner: vec![],
85 })
86 };
87
88 let after: Option<scru128::Scru128Id> =
89 after.as_deref().map(|s| parse_id(s, "after")).transpose()?;
90 let from: Option<scru128::Scru128Id> =
91 from.as_deref().map(|s| parse_id(s, "from")).transpose()?;
92
93 let options = ReadOptions::builder()
94 .maybe_after(after)
95 .maybe_from(from)
96 .maybe_limit(limit)
97 .maybe_last(last)
98 .maybe_topic(topic)
99 .build();
100
101 let frames: Vec<_> = self.store.read_sync(options).collect();
102
103 use nu_protocol::Value;
104
105 let output = Value::list(
106 frames
107 .into_iter()
108 .map(|frame| crate::nu::util::frame_to_value(&frame, call.head, with_timestamp))
109 .collect(),
110 call.head,
111 );
112
113 Ok(PipelineData::Value(output, None))
114 }
115}