1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{
4 Category, ListStream, PipelineData, ShellError, Signals, Signature, SyntaxShape, Type, Value,
5};
6use std::time::Duration;
7
8use crate::store::{FollowOption, ReadOptions, Store};
9
10#[derive(Clone)]
11pub struct CatStreamCommand {
12 store: Store,
13}
14
15impl CatStreamCommand {
16 pub fn new(store: Store) -> Self {
17 Self { store }
18 }
19}
20
21impl Command for CatStreamCommand {
22 fn name(&self) -> &str {
23 ".cat"
24 }
25
26 fn signature(&self) -> Signature {
27 Signature::build(".cat")
28 .input_output_types(vec![(Type::Nothing, Type::Any)])
29 .switch("follow", "long poll for new events", Some('f'))
30 .named(
31 "pulse",
32 SyntaxShape::Int,
33 "interval in ms for synthetic xs.pulse events",
34 Some('p'),
35 )
36 .switch("new", "skip existing, only show new", Some('n'))
37 .switch("detail", "include all frame fields", Some('d'))
38 .named(
39 "limit",
40 SyntaxShape::Int,
41 "limit the number of frames to retrieve",
42 None,
43 )
44 .named(
45 "after",
46 SyntaxShape::String,
47 "start after a specific frame ID (exclusive)",
48 Some('a'),
49 )
50 .named(
51 "from",
52 SyntaxShape::String,
53 "start from a specific frame ID (inclusive)",
54 None,
55 )
56 .named(
57 "last",
58 SyntaxShape::Int,
59 "return the N most recent frames",
60 None,
61 )
62 .named("topic", SyntaxShape::String, "filter by topic", Some('T'))
63 .switch(
64 "with-timestamp",
65 "include timestamp extracted from frame ID",
66 None,
67 )
68 .category(Category::Experimental)
69 }
70
71 fn description(&self) -> &str {
72 "Reads the event stream and returns frames (streaming version)"
73 }
74
75 fn run(
76 &self,
77 engine_state: &EngineState,
78 stack: &mut Stack,
79 call: &Call,
80 _input: PipelineData,
81 ) -> Result<PipelineData, ShellError> {
82 let follow = call.has_flag(engine_state, stack, "follow")?;
83 let pulse: Option<i64> = call.get_flag(engine_state, stack, "pulse")?;
84 let new = call.has_flag(engine_state, stack, "new")?;
85 let detail = call.has_flag(engine_state, stack, "detail")?;
86 let with_timestamp = call.has_flag(engine_state, stack, "with-timestamp")?;
87 let limit: Option<i64> = call.get_flag(engine_state, stack, "limit")?;
88 let last: Option<i64> = call.get_flag(engine_state, stack, "last")?;
89 let after: Option<String> = call.get_flag(engine_state, stack, "after")?;
90 let from: Option<String> = call.get_flag(engine_state, stack, "from")?;
91 let topic: Option<String> = call.get_flag(engine_state, stack, "topic")?;
92
93 let parse_id = |s: &str, name: &str| -> Result<scru128::Scru128Id, ShellError> {
95 s.parse().map_err(|e| ShellError::GenericError {
96 error: format!("Invalid {name}"),
97 msg: format!("Failed to parse Scru128Id: {e}"),
98 span: Some(call.head),
99 help: None,
100 inner: vec![],
101 })
102 };
103
104 let after: Option<scru128::Scru128Id> =
105 after.as_deref().map(|s| parse_id(s, "after")).transpose()?;
106 let from: Option<scru128::Scru128Id> =
107 from.as_deref().map(|s| parse_id(s, "from")).transpose()?;
108
109 let options = ReadOptions::builder()
111 .follow(if let Some(pulse_ms) = pulse {
112 FollowOption::WithHeartbeat(Duration::from_millis(pulse_ms as u64))
113 } else if follow {
114 FollowOption::On
115 } else {
116 FollowOption::Off
117 })
118 .new(new)
119 .maybe_after(after)
120 .maybe_from(from)
121 .maybe_limit(limit.map(|l| l as usize))
122 .maybe_last(last.map(|l| l as usize))
123 .maybe_topic(topic.clone())
124 .build();
125
126 let store = self.store.clone();
127 let span = call.head;
128 let signals = engine_state.signals().clone();
129
130 let (tx, rx) = std::sync::mpsc::channel();
132
133 std::thread::spawn(move || {
135 let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
136 rt.block_on(async move {
137 let mut receiver = store.read(options).await;
138
139 while let Some(frame) = receiver.recv().await {
140 let mut value = crate::nu::util::frame_to_value(&frame, span, with_timestamp);
142
143 if !detail {
145 value = match value {
146 Value::Record { val, .. } => {
147 let mut filtered = val.into_owned();
148 filtered.remove("ttl");
149 Value::record(filtered, span)
150 }
151 v => v,
152 };
153 }
154
155 if tx.send(value).is_err() {
156 break;
157 }
158 }
159 });
160 });
161
162 let stream = ListStream::new(
164 std::iter::from_fn(move || {
165 use std::sync::mpsc::RecvTimeoutError;
166 loop {
167 if signals.interrupted() {
168 return None;
169 }
170 match rx.recv_timeout(Duration::from_millis(100)) {
171 Ok(value) => return Some(value),
172 Err(RecvTimeoutError::Timeout) => continue,
173 Err(RecvTimeoutError::Disconnected) => return None,
174 }
175 }
176 }),
177 span,
178 Signals::empty(),
179 );
180
181 Ok(PipelineData::ListStream(stream, None))
182 }
183}