xs/nu/commands/
last_stream_command.rs1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{
4 Category, ListStream, PipelineData, ShellError, Signals, Signature, SyntaxShape, Type,
5};
6use std::time::Duration;
7
8use crate::store::{FollowOption, ReadOptions, Store};
9
10#[derive(Clone)]
11pub struct LastStreamCommand {
12 store: Store,
13}
14
15impl LastStreamCommand {
16 pub fn new(store: Store) -> Self {
17 Self { store }
18 }
19}
20
21impl Command for LastStreamCommand {
22 fn name(&self) -> &str {
23 ".last"
24 }
25
26 fn signature(&self) -> Signature {
27 Signature::build(".last")
28 .input_output_types(vec![(Type::Nothing, Type::Any)])
29 .required(
30 "topic",
31 SyntaxShape::String,
32 "topic to get most recent frame from",
33 )
34 .switch(
35 "follow",
36 "long poll for updates to most recent frame",
37 Some('f'),
38 )
39 .category(Category::Experimental)
40 }
41
42 fn description(&self) -> &str {
43 "get the most recent frame for a topic"
44 }
45
46 fn run(
47 &self,
48 engine_state: &EngineState,
49 stack: &mut Stack,
50 call: &Call,
51 _input: PipelineData,
52 ) -> Result<PipelineData, ShellError> {
53 let topic: String = call.req(engine_state, stack, 0)?;
54 let follow = call.has_flag(engine_state, stack, "follow")?;
55
56 let span = call.head;
57 let current_head = self.store.last(&topic);
58
59 if !follow {
60 return if let Some(frame) = current_head {
62 Ok(PipelineData::Value(
63 crate::nu::util::frame_to_value(&frame, span),
64 None,
65 ))
66 } else {
67 Ok(PipelineData::Empty)
68 };
69 }
70
71 let options = ReadOptions::builder()
73 .follow(FollowOption::On)
74 .maybe_after(current_head.as_ref().map(|f| f.id))
75 .build();
76
77 let store = self.store.clone();
78 let signals = engine_state.signals().clone();
79 let topic_filter = topic.clone();
80
81 let (tx, rx) = std::sync::mpsc::channel();
83
84 if let Some(frame) = current_head {
86 let value = crate::nu::util::frame_to_value(&frame, span);
87 let _ = tx.send(value);
88 }
89
90 std::thread::spawn(move || {
92 let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
93 rt.block_on(async move {
94 let mut receiver = store.read(options).await;
95
96 while let Some(frame) = receiver.recv().await {
97 if frame.topic != topic_filter {
99 continue;
100 }
101
102 let value = crate::nu::util::frame_to_value(&frame, span);
103
104 if tx.send(value).is_err() {
105 break;
106 }
107 }
108 });
109 });
110
111 let stream = ListStream::new(
113 std::iter::from_fn(move || {
114 use std::sync::mpsc::RecvTimeoutError;
115 loop {
116 if signals.interrupted() {
117 return None;
118 }
119 match rx.recv_timeout(Duration::from_millis(100)) {
120 Ok(value) => return Some(value),
121 Err(RecvTimeoutError::Timeout) => continue,
122 Err(RecvTimeoutError::Disconnected) => return None,
123 }
124 }
125 }),
126 span,
127 Signals::empty(),
128 );
129
130 Ok(PipelineData::ListStream(stream, None))
131 }
132}