xs/nu/commands/
cat_command.rs1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{Category, PipelineData, ShellError, Signature, SyntaxShape, Type};
4
5use crate::store::Store;
6
7#[derive(Clone)]
8pub struct CatCommand {
9 store: Store,
10}
11
12impl CatCommand {
13 pub fn new(store: Store) -> Self {
14 Self { store }
15 }
16}
17
18impl Command for CatCommand {
19 fn name(&self) -> &str {
20 ".cat"
21 }
22
23 fn signature(&self) -> Signature {
24 Signature::build(".cat")
25 .input_output_types(vec![(Type::Nothing, Type::Any)])
26 .named(
27 "limit",
28 SyntaxShape::Int,
29 "limit the number of frames to retrieve",
30 None,
31 )
32 .named(
33 "after",
34 SyntaxShape::String,
35 "start after a specific frame ID (exclusive)",
36 Some('a'),
37 )
38 .named("topic", SyntaxShape::String, "filter by topic", Some('T'))
39 .category(Category::Experimental)
40 }
41
42 fn description(&self) -> &str {
43 "Reads the event stream and returns frames"
44 }
45
46 fn run(
47 &self,
48 engine_state: &EngineState,
49 stack: &mut Stack,
50 call: &Call,
51 _input: PipelineData,
52 ) -> Result<PipelineData, ShellError> {
53 let limit: Option<usize> = call.get_flag(engine_state, stack, "limit")?;
54
55 let after: Option<String> = call.get_flag(engine_state, stack, "after")?;
56 let after: Option<scru128::Scru128Id> = after
57 .as_deref()
58 .map(|s| s.parse().expect("Failed to parse Scru128Id"));
59
60 let topic: Option<String> = call.get_flag(engine_state, stack, "topic")?;
61
62 let frames = self
63 .store
64 .read_sync(after.as_ref(), None)
65 .filter(|frame| match &topic {
66 Some(t) => frame.topic == *t,
67 None => true,
68 })
69 .take(limit.unwrap_or(usize::MAX))
70 .collect::<Vec<_>>();
71
72 use nu_protocol::Value;
73
74 let output = Value::list(
75 frames
76 .into_iter()
77 .map(|frame| crate::nu::util::frame_to_value(&frame, call.head))
78 .collect(),
79 call.head,
80 );
81
82 Ok(PipelineData::Value(output, None))
83 }
84}