1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::shell_error::generic::GenericError;
4use nu_protocol::{
5 Category, ListStream, PipelineData, ShellError, Signals, Signature, SyntaxShape, Type, Value,
6};
7use std::time::Duration;
8
9use crate::store::{FollowOption, ReadOptions, Store};
10
11#[derive(Clone)]
12pub struct CatStreamCommand {
13 store: Store,
14}
15
16impl CatStreamCommand {
17 pub fn new(store: Store) -> Self {
18 Self { store }
19 }
20}
21
22impl Command for CatStreamCommand {
23 fn name(&self) -> &str {
24 ".cat"
25 }
26
27 fn signature(&self) -> Signature {
28 Signature::build(".cat")
29 .input_output_types(vec![(Type::Nothing, Type::Any)])
30 .switch("follow", "long poll for new events", Some('f'))
31 .named(
32 "pulse",
33 SyntaxShape::Int,
34 "interval in ms for synthetic xs.pulse events",
35 Some('p'),
36 )
37 .switch("new", "skip existing, only show new", Some('n'))
38 .switch("detail", "include all frame fields", Some('d'))
39 .named(
40 "limit",
41 SyntaxShape::Int,
42 "limit the number of frames to retrieve",
43 None,
44 )
45 .named(
46 "after",
47 SyntaxShape::String,
48 "start after a specific frame ID (exclusive)",
49 Some('a'),
50 )
51 .named(
52 "from",
53 SyntaxShape::String,
54 "start from a specific frame ID (inclusive)",
55 None,
56 )
57 .named(
58 "last",
59 SyntaxShape::Int,
60 "return the N most recent frames",
61 None,
62 )
63 .named(
64 "topic",
65 SyntaxShape::OneOf(vec![
66 SyntaxShape::String,
67 SyntaxShape::List(Box::new(SyntaxShape::String)),
68 ]),
69 "filter by topic pattern(s): string (commas allowed) or list",
70 Some('T'),
71 )
72 .switch(
73 "with-timestamp",
74 "include timestamp extracted from frame ID",
75 None,
76 )
77 .category(Category::Experimental)
78 }
79
80 fn description(&self) -> &str {
81 "Reads the event stream and returns frames (streaming version)"
82 }
83
84 fn run(
85 &self,
86 engine_state: &EngineState,
87 stack: &mut Stack,
88 call: &Call,
89 _input: PipelineData,
90 ) -> Result<PipelineData, ShellError> {
91 let follow = call.has_flag(engine_state, stack, "follow")?;
92 let pulse: Option<i64> = call.get_flag(engine_state, stack, "pulse")?;
93 let new = call.has_flag(engine_state, stack, "new")?;
94 let detail = call.has_flag(engine_state, stack, "detail")?;
95 let with_timestamp = call.has_flag(engine_state, stack, "with-timestamp")?;
96 let limit: Option<i64> = call.get_flag(engine_state, stack, "limit")?;
97 let last: Option<i64> = call.get_flag(engine_state, stack, "last")?;
98 let after: Option<String> = call.get_flag(engine_state, stack, "after")?;
99 let from: Option<String> = call.get_flag(engine_state, stack, "from")?;
100 let topic: Option<nu_protocol::Value> = call.get_flag(engine_state, stack, "topic")?;
101 let topic: Option<String> = topic
102 .map(crate::nu::util::topic_value_to_string)
103 .transpose()?;
104
105 let parse_id = |s: &str, name: &str| -> Result<scru128::Scru128Id, ShellError> {
107 s.parse().map_err(|e| {
108 ShellError::Generic(GenericError::new(
109 format!("Invalid {name}"),
110 format!("Failed to parse Scru128Id: {e}"),
111 call.head,
112 ))
113 })
114 };
115
116 let after: Option<scru128::Scru128Id> =
117 after.as_deref().map(|s| parse_id(s, "after")).transpose()?;
118 let from: Option<scru128::Scru128Id> =
119 from.as_deref().map(|s| parse_id(s, "from")).transpose()?;
120
121 let options = ReadOptions::builder()
123 .follow(if let Some(pulse_ms) = pulse {
124 FollowOption::WithHeartbeat(Duration::from_millis(pulse_ms as u64))
125 } else if follow {
126 FollowOption::On
127 } else {
128 FollowOption::Off
129 })
130 .new(new)
131 .maybe_after(after)
132 .maybe_from(from)
133 .maybe_limit(limit.map(|l| l as usize))
134 .maybe_last(last.map(|l| l as usize))
135 .maybe_topic(topic.clone())
136 .build();
137
138 let store = self.store.clone();
139 let span = call.head;
140 let signals = engine_state.signals().clone();
141
142 let (tx, rx) = std::sync::mpsc::channel();
144
145 std::thread::spawn(move || {
147 let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
148 rt.block_on(async move {
149 let mut receiver = store.read(options).await;
150
151 while let Some(frame) = receiver.recv().await {
152 let mut value = crate::nu::util::frame_to_value(&frame, span, with_timestamp);
154
155 if !detail {
157 value = match value {
158 Value::Record { val, .. } => {
159 let mut filtered = val.into_owned();
160 filtered.remove("ttl");
161 Value::record(filtered, span)
162 }
163 v => v,
164 };
165 }
166
167 if tx.send(value).is_err() {
168 break;
169 }
170 }
171 });
172 });
173
174 let stream = ListStream::new(
176 std::iter::from_fn(move || {
177 use std::sync::mpsc::RecvTimeoutError;
178 loop {
179 if signals.interrupted() {
180 return None;
181 }
182 match rx.recv_timeout(Duration::from_millis(100)) {
183 Ok(value) => return Some(value),
184 Err(RecvTimeoutError::Timeout) => continue,
185 Err(RecvTimeoutError::Disconnected) => return None,
186 }
187 }
188 }),
189 span,
190 Signals::empty(),
191 );
192
193 Ok(PipelineData::ListStream(stream, None))
194 }
195}