Skip to main content

xs/nu/commands/
cat_stream_command.rs

1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{
4    Category, ListStream, PipelineData, ShellError, Signals, Signature, SyntaxShape, Type, Value,
5};
6use std::time::Duration;
7
8use crate::store::{FollowOption, ReadOptions, Store};
9
10#[derive(Clone)]
11pub struct CatStreamCommand {
12    store: Store,
13}
14
15impl CatStreamCommand {
16    pub fn new(store: Store) -> Self {
17        Self { store }
18    }
19}
20
21impl Command for CatStreamCommand {
22    fn name(&self) -> &str {
23        ".cat"
24    }
25
26    fn signature(&self) -> Signature {
27        Signature::build(".cat")
28            .input_output_types(vec![(Type::Nothing, Type::Any)])
29            .switch("follow", "long poll for new events", Some('f'))
30            .named(
31                "pulse",
32                SyntaxShape::Int,
33                "interval in ms for synthetic xs.pulse events",
34                Some('p'),
35            )
36            .switch("new", "skip existing, only show new", Some('n'))
37            .switch("detail", "include all frame fields", Some('d'))
38            .named(
39                "limit",
40                SyntaxShape::Int,
41                "limit the number of frames to retrieve",
42                None,
43            )
44            .named(
45                "after",
46                SyntaxShape::String,
47                "start after a specific frame ID (exclusive)",
48                Some('a'),
49            )
50            .named(
51                "from",
52                SyntaxShape::String,
53                "start from a specific frame ID (inclusive)",
54                None,
55            )
56            .named(
57                "last",
58                SyntaxShape::Int,
59                "return the N most recent frames",
60                None,
61            )
62            .named("topic", SyntaxShape::String, "filter by topic", Some('T'))
63            .switch(
64                "with-timestamp",
65                "include timestamp extracted from frame ID",
66                None,
67            )
68            .category(Category::Experimental)
69    }
70
71    fn description(&self) -> &str {
72        "Reads the event stream and returns frames (streaming version)"
73    }
74
75    fn run(
76        &self,
77        engine_state: &EngineState,
78        stack: &mut Stack,
79        call: &Call,
80        _input: PipelineData,
81    ) -> Result<PipelineData, ShellError> {
82        let follow = call.has_flag(engine_state, stack, "follow")?;
83        let pulse: Option<i64> = call.get_flag(engine_state, stack, "pulse")?;
84        let new = call.has_flag(engine_state, stack, "new")?;
85        let detail = call.has_flag(engine_state, stack, "detail")?;
86        let with_timestamp = call.has_flag(engine_state, stack, "with-timestamp")?;
87        let limit: Option<i64> = call.get_flag(engine_state, stack, "limit")?;
88        let last: Option<i64> = call.get_flag(engine_state, stack, "last")?;
89        let after: Option<String> = call.get_flag(engine_state, stack, "after")?;
90        let from: Option<String> = call.get_flag(engine_state, stack, "from")?;
91        let topic: Option<String> = call.get_flag(engine_state, stack, "topic")?;
92
93        // Helper to parse Scru128Id
94        let parse_id = |s: &str, name: &str| -> Result<scru128::Scru128Id, ShellError> {
95            s.parse().map_err(|e| ShellError::GenericError {
96                error: format!("Invalid {name}"),
97                msg: format!("Failed to parse Scru128Id: {e}"),
98                span: Some(call.head),
99                help: None,
100                inner: vec![],
101            })
102        };
103
104        let after: Option<scru128::Scru128Id> =
105            after.as_deref().map(|s| parse_id(s, "after")).transpose()?;
106        let from: Option<scru128::Scru128Id> =
107            from.as_deref().map(|s| parse_id(s, "from")).transpose()?;
108
109        // Build ReadOptions
110        let options = ReadOptions::builder()
111            .follow(if let Some(pulse_ms) = pulse {
112                FollowOption::WithHeartbeat(Duration::from_millis(pulse_ms as u64))
113            } else if follow {
114                FollowOption::On
115            } else {
116                FollowOption::Off
117            })
118            .new(new)
119            .maybe_after(after)
120            .maybe_from(from)
121            .maybe_limit(limit.map(|l| l as usize))
122            .maybe_last(last.map(|l| l as usize))
123            .maybe_topic(topic.clone())
124            .build();
125
126        let store = self.store.clone();
127        let span = call.head;
128        let signals = engine_state.signals().clone();
129
130        // Create channel for async -> sync bridge
131        let (tx, rx) = std::sync::mpsc::channel();
132
133        // Spawn thread to handle async store.read()
134        std::thread::spawn(move || {
135            let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
136            rt.block_on(async move {
137                let mut receiver = store.read(options).await;
138
139                while let Some(frame) = receiver.recv().await {
140                    // Convert frame to Nu value
141                    let mut value = crate::nu::util::frame_to_value(&frame, span, with_timestamp);
142
143                    // Filter fields if not --detail
144                    if !detail {
145                        value = match value {
146                            Value::Record { val, .. } => {
147                                let mut filtered = val.into_owned();
148                                filtered.remove("ttl");
149                                Value::record(filtered, span)
150                            }
151                            v => v,
152                        };
153                    }
154
155                    if tx.send(value).is_err() {
156                        break;
157                    }
158                }
159            });
160        });
161
162        // Create ListStream from channel with signal-aware polling
163        let stream = ListStream::new(
164            std::iter::from_fn(move || {
165                use std::sync::mpsc::RecvTimeoutError;
166                loop {
167                    if signals.interrupted() {
168                        return None;
169                    }
170                    match rx.recv_timeout(Duration::from_millis(100)) {
171                        Ok(value) => return Some(value),
172                        Err(RecvTimeoutError::Timeout) => continue,
173                        Err(RecvTimeoutError::Disconnected) => return None,
174                    }
175                }
176            }),
177            span,
178            Signals::empty(),
179        );
180
181        Ok(PipelineData::ListStream(stream, None))
182    }
183}