xs/nu/commands/
last_stream_command.rs1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{
4 Category, ListStream, PipelineData, ShellError, Signals, Signature, SyntaxShape, Type, Value,
5};
6use std::time::Duration;
7
8use crate::nu::util;
9use crate::store::{FollowOption, ReadOptions, Store};
10
11#[derive(Clone)]
12pub struct LastStreamCommand {
13 store: Store,
14}
15
16impl LastStreamCommand {
17 pub fn new(store: Store) -> Self {
18 Self { store }
19 }
20}
21
22impl Command for LastStreamCommand {
23 fn name(&self) -> &str {
24 ".last"
25 }
26
27 fn signature(&self) -> Signature {
28 Signature::build(".last")
29 .input_output_types(vec![(Type::Nothing, Type::Any)])
30 .optional(
31 "topic",
32 SyntaxShape::String,
33 "topic to get most recent frame from (default: all topics)",
34 )
35 .named(
36 "last",
37 SyntaxShape::Int,
38 "number of frames to return",
39 Some('n'),
40 )
41 .switch(
42 "follow",
43 "long poll for updates to most recent frame",
44 Some('f'),
45 )
46 .category(Category::Experimental)
47 }
48
49 fn description(&self) -> &str {
50 "get the most recent frame(s) for a topic"
51 }
52
53 fn run(
54 &self,
55 engine_state: &EngineState,
56 stack: &mut Stack,
57 call: &Call,
58 _input: PipelineData,
59 ) -> Result<PipelineData, ShellError> {
60 let topic: Option<String> = call.opt(engine_state, stack, 0)?;
61 let n: usize = call
62 .get_flag::<i64>(engine_state, stack, "last")?
63 .map(|v| v as usize)
64 .unwrap_or(1);
65 let follow = call.has_flag(engine_state, stack, "follow")?;
66 let span = call.head;
67
68 if !follow {
69 let options = ReadOptions::builder().last(n).maybe_topic(topic).build();
71
72 let frames: Vec<Value> = self
73 .store
74 .read_sync(options)
75 .map(|frame| util::frame_to_value(&frame, span))
76 .collect();
77
78 return if frames.is_empty() {
79 Ok(PipelineData::Empty)
80 } else if frames.len() == 1 {
81 Ok(PipelineData::Value(
82 frames.into_iter().next().unwrap(),
83 None,
84 ))
85 } else {
86 Ok(PipelineData::Value(Value::list(frames, span), None))
87 };
88 }
89
90 let options = ReadOptions::builder()
92 .last(n)
93 .maybe_topic(topic)
94 .follow(FollowOption::On)
95 .build();
96
97 let store = self.store.clone();
98 let signals = engine_state.signals().clone();
99
100 let (tx, rx) = std::sync::mpsc::channel();
102
103 std::thread::spawn(move || {
105 let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
106 rt.block_on(async move {
107 let mut receiver = store.read(options).await;
108
109 while let Some(frame) = receiver.recv().await {
110 let value = util::frame_to_value(&frame, span);
111
112 if tx.send(value).is_err() {
113 break;
114 }
115 }
116 });
117 });
118
119 let stream = ListStream::new(
121 std::iter::from_fn(move || {
122 use std::sync::mpsc::RecvTimeoutError;
123 loop {
124 if signals.interrupted() {
125 return None;
126 }
127 match rx.recv_timeout(Duration::from_millis(100)) {
128 Ok(value) => return Some(value),
129 Err(RecvTimeoutError::Timeout) => continue,
130 Err(RecvTimeoutError::Disconnected) => return None,
131 }
132 }
133 }),
134 span,
135 Signals::empty(),
136 );
137
138 Ok(PipelineData::ListStream(stream, None))
139 }
140}