xs/nu/commands/
cat_command.rs

1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{Category, PipelineData, ShellError, Signature, SyntaxShape, Type};
4
5use crate::store::Store;
6
7#[derive(Clone)]
8pub struct CatCommand {
9    store: Store,
10    context_id: scru128::Scru128Id,
11}
12
13impl CatCommand {
14    pub fn new(store: Store, context_id: scru128::Scru128Id) -> Self {
15        Self { store, context_id }
16    }
17}
18
19impl Command for CatCommand {
20    fn name(&self) -> &str {
21        ".cat"
22    }
23
24    fn signature(&self) -> Signature {
25        Signature::build(".cat")
26            .input_output_types(vec![(Type::Nothing, Type::Any)])
27            .named(
28                "limit",
29                SyntaxShape::Int,
30                "limit the number of frames to retrieve",
31                None,
32            )
33            .named(
34                "last-id",
35                SyntaxShape::String,
36                "start from a specific frame ID",
37                None,
38            )
39            .named("topic", SyntaxShape::String, "filter by topic", Some('T'))
40            .category(Category::Experimental)
41    }
42
43    fn description(&self) -> &str {
44        "Reads the event stream and returns frames"
45    }
46
47    fn run(
48        &self,
49        engine_state: &EngineState,
50        stack: &mut Stack,
51        call: &Call,
52        _input: PipelineData,
53    ) -> Result<PipelineData, ShellError> {
54        let limit: Option<usize> = call.get_flag(engine_state, stack, "limit")?;
55
56        let last_id: Option<String> = call.get_flag(engine_state, stack, "last-id")?;
57        let last_id: Option<scru128::Scru128Id> = last_id
58            .as_deref()
59            .map(|s| s.parse().expect("Failed to parse Scru128Id"));
60
61        let topic: Option<String> = call.get_flag(engine_state, stack, "topic")?;
62
63        let frames = self
64            .store
65            .read_sync(last_id.as_ref(), None, Some(self.context_id))
66            .filter(|frame| match &topic {
67                Some(t) => frame.topic == *t,
68                None => true,
69            })
70            .take(limit.unwrap_or(usize::MAX))
71            .collect::<Vec<_>>();
72
73        use nu_protocol::Value;
74
75        let output = Value::list(
76            frames
77                .into_iter()
78                .map(|frame| crate::nu::util::frame_to_value(&frame, call.head))
79                .collect(),
80            call.head,
81        );
82
83        Ok(PipelineData::Value(output, None))
84    }
85}