xs/nu/commands/
cat_stream_command.rs

1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{
4    Category, ListStream, PipelineData, ShellError, Signals, Signature, SyntaxShape, Type, Value,
5};
6use std::time::Duration;
7
8use crate::store::{FollowOption, ReadOptions, Store};
9
10#[derive(Clone)]
11pub struct CatStreamCommand {
12    store: Store,
13    context_id: scru128::Scru128Id,
14}
15
16impl CatStreamCommand {
17    pub fn new(store: Store, context_id: scru128::Scru128Id) -> Self {
18        Self { store, context_id }
19    }
20}
21
22impl Command for CatStreamCommand {
23    fn name(&self) -> &str {
24        ".cat"
25    }
26
27    fn signature(&self) -> Signature {
28        Signature::build(".cat")
29            .input_output_types(vec![(Type::Nothing, Type::Any)])
30            .switch("follow", "long poll for new events", Some('f'))
31            .named(
32                "pulse",
33                SyntaxShape::Int,
34                "interval in ms for synthetic xs.pulse events",
35                Some('p'),
36            )
37            .switch("tail", "start at end of stream", Some('t'))
38            .switch("detail", "include all frame fields", Some('d'))
39            .switch("all", "read across all contexts", Some('a'))
40            .named(
41                "limit",
42                SyntaxShape::Int,
43                "limit the number of frames to retrieve",
44                None,
45            )
46            .named(
47                "last-id",
48                SyntaxShape::String,
49                "start from a specific frame ID",
50                None,
51            )
52            .named("topic", SyntaxShape::String, "filter by topic", Some('T'))
53            .category(Category::Experimental)
54    }
55
56    fn description(&self) -> &str {
57        "Reads the event stream and returns frames (streaming version)"
58    }
59
60    fn run(
61        &self,
62        engine_state: &EngineState,
63        stack: &mut Stack,
64        call: &Call,
65        _input: PipelineData,
66    ) -> Result<PipelineData, ShellError> {
67        let follow = call.has_flag(engine_state, stack, "follow")?;
68        let pulse: Option<i64> = call.get_flag(engine_state, stack, "pulse")?;
69        let tail = call.has_flag(engine_state, stack, "tail")?;
70        let detail = call.has_flag(engine_state, stack, "detail")?;
71        let all = call.has_flag(engine_state, stack, "all")?;
72        let limit: Option<i64> = call.get_flag(engine_state, stack, "limit")?;
73        let last_id: Option<String> = call.get_flag(engine_state, stack, "last-id")?;
74        let topic: Option<String> = call.get_flag(engine_state, stack, "topic")?;
75
76        // Parse last_id
77        let last_id: Option<scru128::Scru128Id> = last_id
78            .as_deref()
79            .map(|s| {
80                s.parse().map_err(|e| ShellError::GenericError {
81                    error: "Invalid last-id".into(),
82                    msg: format!("Failed to parse Scru128Id: {e}"),
83                    span: Some(call.head),
84                    help: None,
85                    inner: vec![],
86                })
87            })
88            .transpose()?;
89
90        // For non-follow mode, always use async path for consistency
91        // The store.read() will handle topic filtering correctly
92
93        // Build ReadOptions for async path (follow mode or no topic filter)
94        let options = ReadOptions::builder()
95            .follow(if let Some(pulse_ms) = pulse {
96                FollowOption::WithHeartbeat(Duration::from_millis(pulse_ms as u64))
97            } else if follow {
98                FollowOption::On
99            } else {
100                FollowOption::Off
101            })
102            .tail(tail)
103            .maybe_last_id(last_id)
104            .maybe_limit(limit.map(|l| l as usize))
105            .maybe_context_id(if all { None } else { Some(self.context_id) })
106            .maybe_topic(topic.clone())
107            .build();
108
109        let store = self.store.clone();
110        let span = call.head;
111        let signals = engine_state.signals().clone();
112
113        // Create channel for async -> sync bridge
114        let (tx, rx) = std::sync::mpsc::channel();
115
116        // Spawn thread to handle async store.read()
117        std::thread::spawn(move || {
118            let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
119            rt.block_on(async move {
120                let mut receiver = store.read(options).await;
121
122                while let Some(frame) = receiver.recv().await {
123                    // Convert frame to Nu value
124                    let mut value = crate::nu::util::frame_to_value(&frame, span);
125
126                    // Filter fields if not --detail
127                    if !detail {
128                        value = match value {
129                            Value::Record { val, .. } => {
130                                let mut filtered = val.into_owned();
131                                filtered.remove("context_id");
132                                filtered.remove("ttl");
133                                Value::record(filtered, span)
134                            }
135                            v => v,
136                        };
137                    }
138
139                    if tx.send(value).is_err() {
140                        break;
141                    }
142                }
143            });
144        });
145
146        // Create ListStream from channel with signal-aware polling
147        let stream = ListStream::new(
148            std::iter::from_fn(move || {
149                use std::sync::mpsc::RecvTimeoutError;
150                loop {
151                    if signals.interrupted() {
152                        return None;
153                    }
154                    match rx.recv_timeout(Duration::from_millis(100)) {
155                        Ok(value) => return Some(value),
156                        Err(RecvTimeoutError::Timeout) => continue,
157                        Err(RecvTimeoutError::Disconnected) => return None,
158                    }
159                }
160            }),
161            span,
162            Signals::empty(),
163        );
164
165        Ok(PipelineData::ListStream(stream, None))
166    }
167}