Skip to main content

xs/nu/commands/
last_stream_command.rs

1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{
4    Category, ListStream, PipelineData, ShellError, Signals, Signature, SyntaxShape, Type, Value,
5};
6use std::time::Duration;
7
8use crate::nu::util;
9use crate::store::{FollowOption, ReadOptions, Store};
10
11#[derive(Clone)]
12pub struct LastStreamCommand {
13    store: Store,
14}
15
16impl LastStreamCommand {
17    pub fn new(store: Store) -> Self {
18        Self { store }
19    }
20}
21
22impl Command for LastStreamCommand {
23    fn name(&self) -> &str {
24        ".last"
25    }
26
27    fn signature(&self) -> Signature {
28        Signature::build(".last")
29            .input_output_types(vec![(Type::Nothing, Type::Any)])
30            .optional(
31                "topic",
32                SyntaxShape::String,
33                "topic to get most recent frame from (default: all topics)",
34            )
35            .named(
36                "last",
37                SyntaxShape::Int,
38                "number of frames to return",
39                Some('n'),
40            )
41            .switch(
42                "follow",
43                "long poll for updates to most recent frame",
44                Some('f'),
45            )
46            .category(Category::Experimental)
47    }
48
49    fn description(&self) -> &str {
50        "get the most recent frame(s) for a topic"
51    }
52
53    fn run(
54        &self,
55        engine_state: &EngineState,
56        stack: &mut Stack,
57        call: &Call,
58        _input: PipelineData,
59    ) -> Result<PipelineData, ShellError> {
60        let topic: Option<String> = call.opt(engine_state, stack, 0)?;
61        let n: usize = call
62            .get_flag::<i64>(engine_state, stack, "last")?
63            .map(|v| v as usize)
64            .unwrap_or(1);
65        let follow = call.has_flag(engine_state, stack, "follow")?;
66        let span = call.head;
67
68        if !follow {
69            // Non-follow mode: use sync path
70            let options = ReadOptions::builder().last(n).maybe_topic(topic).build();
71
72            let frames: Vec<Value> = self
73                .store
74                .read_sync(options)
75                .map(|frame| util::frame_to_value(&frame, span))
76                .collect();
77
78            return if frames.is_empty() {
79                Ok(PipelineData::Empty)
80            } else if frames.len() == 1 {
81                Ok(PipelineData::Value(
82                    frames.into_iter().next().unwrap(),
83                    None,
84                ))
85            } else {
86                Ok(PipelineData::Value(Value::list(frames, span), None))
87            };
88        }
89
90        // Follow mode: use async path with streaming
91        let options = ReadOptions::builder()
92            .last(n)
93            .maybe_topic(topic)
94            .follow(FollowOption::On)
95            .build();
96
97        let store = self.store.clone();
98        let signals = engine_state.signals().clone();
99
100        // Create channel for async -> sync bridge
101        let (tx, rx) = std::sync::mpsc::channel();
102
103        // Spawn thread to handle async store.read()
104        std::thread::spawn(move || {
105            let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
106            rt.block_on(async move {
107                let mut receiver = store.read(options).await;
108
109                while let Some(frame) = receiver.recv().await {
110                    let value = util::frame_to_value(&frame, span);
111
112                    if tx.send(value).is_err() {
113                        break;
114                    }
115                }
116            });
117        });
118
119        // Create ListStream from channel with signal-aware polling
120        let stream = ListStream::new(
121            std::iter::from_fn(move || {
122                use std::sync::mpsc::RecvTimeoutError;
123                loop {
124                    if signals.interrupted() {
125                        return None;
126                    }
127                    match rx.recv_timeout(Duration::from_millis(100)) {
128                        Ok(value) => return Some(value),
129                        Err(RecvTimeoutError::Timeout) => continue,
130                        Err(RecvTimeoutError::Disconnected) => return None,
131                    }
132                }
133            }),
134            span,
135            Signals::empty(),
136        );
137
138        Ok(PipelineData::ListStream(stream, None))
139    }
140}