xs/nu/commands/
last_stream_command.rs1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{
4 Category, ListStream, PipelineData, ShellError, Signals, Signature, SyntaxShape, Type, Value,
5};
6use std::time::Duration;
7
8use crate::nu::util;
9use crate::store::{FollowOption, ReadOptions, Store};
10
11#[derive(Clone)]
12pub struct LastStreamCommand {
13 store: Store,
14}
15
16impl LastStreamCommand {
17 pub fn new(store: Store) -> Self {
18 Self { store }
19 }
20}
21
22impl Command for LastStreamCommand {
23 fn name(&self) -> &str {
24 ".last"
25 }
26
27 fn signature(&self) -> Signature {
28 Signature::build(".last")
29 .input_output_types(vec![(Type::Nothing, Type::Any)])
30 .optional(
31 "topic",
32 SyntaxShape::String,
33 "topic to get most recent frame from (default: all topics)",
34 )
35 .optional(
36 "count",
37 SyntaxShape::Int,
38 "number of frames to return (default: 1)",
39 )
40 .switch(
41 "follow",
42 "long poll for updates to most recent frame",
43 Some('f'),
44 )
45 .switch(
46 "with-timestamp",
47 "include timestamp extracted from frame ID",
48 None,
49 )
50 .category(Category::Experimental)
51 }
52
53 fn description(&self) -> &str {
54 "get the most recent frame(s) for a topic"
55 }
56
57 fn run(
58 &self,
59 engine_state: &EngineState,
60 stack: &mut Stack,
61 call: &Call,
62 _input: PipelineData,
63 ) -> Result<PipelineData, ShellError> {
64 let raw_topic: Option<String> = call.opt(engine_state, stack, 0)?;
65 let raw_count: Option<i64> = call.opt(engine_state, stack, 1)?;
66 let follow = call.has_flag(engine_state, stack, "follow")?;
67 let with_timestamp = call.has_flag(engine_state, stack, "with-timestamp")?;
68 let span = call.head;
69
70 let (topic, n) = match (&raw_topic, raw_count) {
73 (Some(t), None) if t.parse::<usize>().is_ok() => (None, t.parse::<usize>().unwrap()),
74 _ => (raw_topic, raw_count.map(|v| v as usize).unwrap_or(1)),
75 };
76
77 if !follow {
78 let options = ReadOptions::builder().last(n).maybe_topic(topic).build();
80
81 let frames: Vec<Value> = self
82 .store
83 .read_sync(options)
84 .map(|frame| util::frame_to_value(&frame, span, with_timestamp))
85 .collect();
86
87 return if frames.is_empty() {
88 Ok(PipelineData::Empty)
89 } else if frames.len() == 1 {
90 Ok(PipelineData::Value(
91 frames.into_iter().next().unwrap(),
92 None,
93 ))
94 } else {
95 Ok(PipelineData::Value(Value::list(frames, span), None))
96 };
97 }
98
99 let options = ReadOptions::builder()
101 .last(n)
102 .maybe_topic(topic)
103 .follow(FollowOption::On)
104 .build();
105
106 let store = self.store.clone();
107 let signals = engine_state.signals().clone();
108
109 let (tx, rx) = std::sync::mpsc::channel();
111
112 std::thread::spawn(move || {
114 let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
115 rt.block_on(async move {
116 let mut receiver = store.read(options).await;
117
118 while let Some(frame) = receiver.recv().await {
119 let value = util::frame_to_value(&frame, span, with_timestamp);
120
121 if tx.send(value).is_err() {
122 break;
123 }
124 }
125 });
126 });
127
128 let stream = ListStream::new(
130 std::iter::from_fn(move || {
131 use std::sync::mpsc::RecvTimeoutError;
132 loop {
133 if signals.interrupted() {
134 return None;
135 }
136 match rx.recv_timeout(Duration::from_millis(100)) {
137 Ok(value) => return Some(value),
138 Err(RecvTimeoutError::Timeout) => continue,
139 Err(RecvTimeoutError::Disconnected) => return None,
140 }
141 }
142 }),
143 span,
144 Signals::empty(),
145 );
146
147 Ok(PipelineData::ListStream(stream, None))
148 }
149}