Skip to main content

xs/nu/commands/
cat_stream_command.rs

1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::shell_error::generic::GenericError;
4use nu_protocol::{
5    Category, ListStream, PipelineData, ShellError, Signals, Signature, SyntaxShape, Type, Value,
6};
7use std::time::Duration;
8
9use crate::store::{FollowOption, ReadOptions, Store};
10
11#[derive(Clone)]
12pub struct CatStreamCommand {
13    store: Store,
14}
15
16impl CatStreamCommand {
17    pub fn new(store: Store) -> Self {
18        Self { store }
19    }
20}
21
22impl Command for CatStreamCommand {
23    fn name(&self) -> &str {
24        ".cat"
25    }
26
27    fn signature(&self) -> Signature {
28        Signature::build(".cat")
29            .input_output_types(vec![(Type::Nothing, Type::Any)])
30            .switch("follow", "long poll for new events", Some('f'))
31            .named(
32                "pulse",
33                SyntaxShape::Int,
34                "interval in ms for synthetic xs.pulse events",
35                Some('p'),
36            )
37            .switch("new", "skip existing, only show new", Some('n'))
38            .switch("detail", "include all frame fields", Some('d'))
39            .named(
40                "limit",
41                SyntaxShape::Int,
42                "limit the number of frames to retrieve",
43                None,
44            )
45            .named(
46                "after",
47                SyntaxShape::String,
48                "start after a specific frame ID (exclusive)",
49                Some('a'),
50            )
51            .named(
52                "from",
53                SyntaxShape::String,
54                "start from a specific frame ID (inclusive)",
55                None,
56            )
57            .named(
58                "last",
59                SyntaxShape::Int,
60                "return the N most recent frames",
61                None,
62            )
63            .named(
64                "topic",
65                SyntaxShape::OneOf(vec![
66                    SyntaxShape::String,
67                    SyntaxShape::List(Box::new(SyntaxShape::String)),
68                ]),
69                "filter by topic pattern(s): string (commas allowed) or list",
70                Some('T'),
71            )
72            .switch(
73                "with-timestamp",
74                "include timestamp extracted from frame ID",
75                None,
76            )
77            .category(Category::Experimental)
78    }
79
80    fn description(&self) -> &str {
81        "Reads the event stream and returns frames (streaming version)"
82    }
83
84    fn run(
85        &self,
86        engine_state: &EngineState,
87        stack: &mut Stack,
88        call: &Call,
89        _input: PipelineData,
90    ) -> Result<PipelineData, ShellError> {
91        let follow = call.has_flag(engine_state, stack, "follow")?;
92        let pulse: Option<i64> = call.get_flag(engine_state, stack, "pulse")?;
93        let new = call.has_flag(engine_state, stack, "new")?;
94        let detail = call.has_flag(engine_state, stack, "detail")?;
95        let with_timestamp = call.has_flag(engine_state, stack, "with-timestamp")?;
96        let limit: Option<i64> = call.get_flag(engine_state, stack, "limit")?;
97        let last: Option<i64> = call.get_flag(engine_state, stack, "last")?;
98        let after: Option<String> = call.get_flag(engine_state, stack, "after")?;
99        let from: Option<String> = call.get_flag(engine_state, stack, "from")?;
100        let topic: Option<nu_protocol::Value> = call.get_flag(engine_state, stack, "topic")?;
101        let topic: Option<String> = topic
102            .map(crate::nu::util::topic_value_to_string)
103            .transpose()?;
104
105        // Helper to parse Scru128Id
106        let parse_id = |s: &str, name: &str| -> Result<scru128::Scru128Id, ShellError> {
107            s.parse().map_err(|e| {
108                ShellError::Generic(GenericError::new(
109                    format!("Invalid {name}"),
110                    format!("Failed to parse Scru128Id: {e}"),
111                    call.head,
112                ))
113            })
114        };
115
116        let after: Option<scru128::Scru128Id> =
117            after.as_deref().map(|s| parse_id(s, "after")).transpose()?;
118        let from: Option<scru128::Scru128Id> =
119            from.as_deref().map(|s| parse_id(s, "from")).transpose()?;
120
121        // Build ReadOptions
122        let options = ReadOptions::builder()
123            .follow(if let Some(pulse_ms) = pulse {
124                FollowOption::WithHeartbeat(Duration::from_millis(pulse_ms as u64))
125            } else if follow {
126                FollowOption::On
127            } else {
128                FollowOption::Off
129            })
130            .new(new)
131            .maybe_after(after)
132            .maybe_from(from)
133            .maybe_limit(limit.map(|l| l as usize))
134            .maybe_last(last.map(|l| l as usize))
135            .maybe_topic(topic.clone())
136            .build();
137
138        let store = self.store.clone();
139        let span = call.head;
140        let signals = engine_state.signals().clone();
141
142        // Create channel for async -> sync bridge
143        let (tx, rx) = std::sync::mpsc::channel();
144
145        // Spawn thread to handle async store.read()
146        std::thread::spawn(move || {
147            let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
148            rt.block_on(async move {
149                let mut receiver = store.read(options).await;
150
151                while let Some(frame) = receiver.recv().await {
152                    // Convert frame to Nu value
153                    let mut value = crate::nu::util::frame_to_value(&frame, span, with_timestamp);
154
155                    // Filter fields if not --detail
156                    if !detail {
157                        value = match value {
158                            Value::Record { val, .. } => {
159                                let mut filtered = val.into_owned();
160                                filtered.remove("ttl");
161                                Value::record(filtered, span)
162                            }
163                            v => v,
164                        };
165                    }
166
167                    if tx.send(value).is_err() {
168                        break;
169                    }
170                }
171            });
172        });
173
174        // Create ListStream from channel with signal-aware polling
175        let stream = ListStream::new(
176            std::iter::from_fn(move || {
177                use std::sync::mpsc::RecvTimeoutError;
178                loop {
179                    if signals.interrupted() {
180                        return None;
181                    }
182                    match rx.recv_timeout(Duration::from_millis(100)) {
183                        Ok(value) => return Some(value),
184                        Err(RecvTimeoutError::Timeout) => continue,
185                        Err(RecvTimeoutError::Disconnected) => return None,
186                    }
187                }
188            }),
189            span,
190            Signals::empty(),
191        );
192
193        Ok(PipelineData::ListStream(stream, None))
194    }
195}