Skip to main content

xs/nu/commands/
cat_command.rs

1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{Category, PipelineData, ShellError, Signature, SyntaxShape, Type};
4
5use crate::store::{ReadOptions, Store};
6
7#[derive(Clone)]
8pub struct CatCommand {
9    store: Store,
10}
11
12impl CatCommand {
13    pub fn new(store: Store) -> Self {
14        Self { store }
15    }
16}
17
18impl Command for CatCommand {
19    fn name(&self) -> &str {
20        ".cat"
21    }
22
23    fn signature(&self) -> Signature {
24        Signature::build(".cat")
25            .input_output_types(vec![(Type::Nothing, Type::Any)])
26            .named(
27                "limit",
28                SyntaxShape::Int,
29                "limit the number of frames to retrieve",
30                None,
31            )
32            .named(
33                "after",
34                SyntaxShape::String,
35                "start after a specific frame ID (exclusive)",
36                Some('a'),
37            )
38            .named(
39                "from",
40                SyntaxShape::String,
41                "start from a specific frame ID (inclusive)",
42                None,
43            )
44            .named(
45                "last",
46                SyntaxShape::Int,
47                "return the N most recent frames",
48                None,
49            )
50            .named("topic", SyntaxShape::String, "filter by topic", Some('T'))
51            .switch(
52                "with-timestamp",
53                "include timestamp extracted from frame ID",
54                None,
55            )
56            .category(Category::Experimental)
57    }
58
59    fn description(&self) -> &str {
60        "Reads the event stream and returns frames"
61    }
62
63    fn run(
64        &self,
65        engine_state: &EngineState,
66        stack: &mut Stack,
67        call: &Call,
68        _input: PipelineData,
69    ) -> Result<PipelineData, ShellError> {
70        let limit: Option<usize> = call.get_flag(engine_state, stack, "limit")?;
71        let last: Option<usize> = call.get_flag(engine_state, stack, "last")?;
72        let after: Option<String> = call.get_flag(engine_state, stack, "after")?;
73        let from: Option<String> = call.get_flag(engine_state, stack, "from")?;
74        let topic: Option<String> = call.get_flag(engine_state, stack, "topic")?;
75        let with_timestamp = call.has_flag(engine_state, stack, "with-timestamp")?;
76
77        // Helper to parse Scru128Id
78        let parse_id = |s: &str, name: &str| -> Result<scru128::Scru128Id, ShellError> {
79            s.parse().map_err(|e| ShellError::GenericError {
80                error: format!("Invalid {name}"),
81                msg: format!("Failed to parse Scru128Id: {e}"),
82                span: Some(call.head),
83                help: None,
84                inner: vec![],
85            })
86        };
87
88        let after: Option<scru128::Scru128Id> =
89            after.as_deref().map(|s| parse_id(s, "after")).transpose()?;
90        let from: Option<scru128::Scru128Id> =
91            from.as_deref().map(|s| parse_id(s, "from")).transpose()?;
92
93        let options = ReadOptions::builder()
94            .maybe_after(after)
95            .maybe_from(from)
96            .maybe_limit(limit)
97            .maybe_last(last)
98            .maybe_topic(topic)
99            .build();
100
101        let frames: Vec<_> = self.store.read_sync(options).collect();
102
103        use nu_protocol::Value;
104
105        let output = Value::list(
106            frames
107                .into_iter()
108                .map(|frame| crate::nu::util::frame_to_value(&frame, call.head, with_timestamp))
109                .collect(),
110            call.head,
111        );
112
113        Ok(PipelineData::Value(output, None))
114    }
115}