xs/nu/commands/
cat_stream_command.rs1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{
4 Category, ListStream, PipelineData, ShellError, Signals, Signature, SyntaxShape, Type, Value,
5};
6use std::time::Duration;
7
8use crate::store::{FollowOption, ReadOptions, Store};
9
10#[derive(Clone)]
11pub struct CatStreamCommand {
12 store: Store,
13 context_id: scru128::Scru128Id,
14}
15
16impl CatStreamCommand {
17 pub fn new(store: Store, context_id: scru128::Scru128Id) -> Self {
18 Self { store, context_id }
19 }
20}
21
22impl Command for CatStreamCommand {
23 fn name(&self) -> &str {
24 ".cat"
25 }
26
27 fn signature(&self) -> Signature {
28 Signature::build(".cat")
29 .input_output_types(vec![(Type::Nothing, Type::Any)])
30 .switch("follow", "long poll for new events", Some('f'))
31 .named(
32 "pulse",
33 SyntaxShape::Int,
34 "interval in ms for synthetic xs.pulse events",
35 Some('p'),
36 )
37 .switch("tail", "start at end of stream", Some('t'))
38 .switch("detail", "include all frame fields", Some('d'))
39 .switch("all", "read across all contexts", Some('a'))
40 .named(
41 "limit",
42 SyntaxShape::Int,
43 "limit the number of frames to retrieve",
44 None,
45 )
46 .named(
47 "last-id",
48 SyntaxShape::String,
49 "start from a specific frame ID",
50 None,
51 )
52 .named("topic", SyntaxShape::String, "filter by topic", Some('T'))
53 .category(Category::Experimental)
54 }
55
56 fn description(&self) -> &str {
57 "Reads the event stream and returns frames (streaming version)"
58 }
59
60 fn run(
61 &self,
62 engine_state: &EngineState,
63 stack: &mut Stack,
64 call: &Call,
65 _input: PipelineData,
66 ) -> Result<PipelineData, ShellError> {
67 let follow = call.has_flag(engine_state, stack, "follow")?;
68 let pulse: Option<i64> = call.get_flag(engine_state, stack, "pulse")?;
69 let tail = call.has_flag(engine_state, stack, "tail")?;
70 let detail = call.has_flag(engine_state, stack, "detail")?;
71 let all = call.has_flag(engine_state, stack, "all")?;
72 let limit: Option<i64> = call.get_flag(engine_state, stack, "limit")?;
73 let last_id: Option<String> = call.get_flag(engine_state, stack, "last-id")?;
74 let topic: Option<String> = call.get_flag(engine_state, stack, "topic")?;
75
76 let last_id: Option<scru128::Scru128Id> = last_id
78 .as_deref()
79 .map(|s| {
80 s.parse().map_err(|e| ShellError::GenericError {
81 error: "Invalid last-id".into(),
82 msg: format!("Failed to parse Scru128Id: {e}"),
83 span: Some(call.head),
84 help: None,
85 inner: vec![],
86 })
87 })
88 .transpose()?;
89
90 let options = ReadOptions::builder()
95 .follow(if let Some(pulse_ms) = pulse {
96 FollowOption::WithHeartbeat(Duration::from_millis(pulse_ms as u64))
97 } else if follow {
98 FollowOption::On
99 } else {
100 FollowOption::Off
101 })
102 .tail(tail)
103 .maybe_last_id(last_id)
104 .maybe_limit(limit.map(|l| l as usize))
105 .maybe_context_id(if all { None } else { Some(self.context_id) })
106 .maybe_topic(topic.clone())
107 .build();
108
109 let store = self.store.clone();
110 let span = call.head;
111 let signals = engine_state.signals().clone();
112
113 let (tx, rx) = std::sync::mpsc::channel();
115
116 std::thread::spawn(move || {
118 let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
119 rt.block_on(async move {
120 let mut receiver = store.read(options).await;
121
122 while let Some(frame) = receiver.recv().await {
123 let mut value = crate::nu::util::frame_to_value(&frame, span);
125
126 if !detail {
128 value = match value {
129 Value::Record { val, .. } => {
130 let mut filtered = val.into_owned();
131 filtered.remove("context_id");
132 filtered.remove("ttl");
133 Value::record(filtered, span)
134 }
135 v => v,
136 };
137 }
138
139 if tx.send(value).is_err() {
140 break;
141 }
142 }
143 });
144 });
145
146 let stream = ListStream::new(
148 std::iter::from_fn(move || {
149 use std::sync::mpsc::RecvTimeoutError;
150 loop {
151 if signals.interrupted() {
152 return None;
153 }
154 match rx.recv_timeout(Duration::from_millis(100)) {
155 Ok(value) => return Some(value),
156 Err(RecvTimeoutError::Timeout) => continue,
157 Err(RecvTimeoutError::Disconnected) => return None,
158 }
159 }
160 }),
161 span,
162 Signals::empty(),
163 );
164
165 Ok(PipelineData::ListStream(stream, None))
166 }
167}