Skip to main content

xs/nu/commands/
last_stream_command.rs

1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{
4    Category, ListStream, PipelineData, ShellError, Signals, Signature, SyntaxShape, Type, Value,
5};
6use std::time::Duration;
7
8use crate::nu::util;
9use crate::store::{FollowOption, ReadOptions, Store};
10
11#[derive(Clone)]
12pub struct LastStreamCommand {
13    store: Store,
14}
15
16impl LastStreamCommand {
17    pub fn new(store: Store) -> Self {
18        Self { store }
19    }
20}
21
22impl Command for LastStreamCommand {
23    fn name(&self) -> &str {
24        ".last"
25    }
26
27    fn signature(&self) -> Signature {
28        Signature::build(".last")
29            .input_output_types(vec![(Type::Nothing, Type::Any)])
30            .optional(
31                "topic",
32                SyntaxShape::String,
33                "topic to get most recent frame from (default: all topics)",
34            )
35            .optional(
36                "count",
37                SyntaxShape::Int,
38                "number of frames to return (default: 1)",
39            )
40            .switch(
41                "follow",
42                "long poll for updates to most recent frame",
43                Some('f'),
44            )
45            .switch(
46                "with-timestamp",
47                "include timestamp extracted from frame ID",
48                None,
49            )
50            .category(Category::Experimental)
51    }
52
53    fn description(&self) -> &str {
54        "get the most recent frame(s) for a topic"
55    }
56
57    fn run(
58        &self,
59        engine_state: &EngineState,
60        stack: &mut Stack,
61        call: &Call,
62        _input: PipelineData,
63    ) -> Result<PipelineData, ShellError> {
64        let raw_topic: Option<String> = call.opt(engine_state, stack, 0)?;
65        let raw_count: Option<i64> = call.opt(engine_state, stack, 1)?;
66        let follow = call.has_flag(engine_state, stack, "follow")?;
67        let with_timestamp = call.has_flag(engine_state, stack, "with-timestamp")?;
68        let span = call.head;
69
70        // Disambiguate: if topic parses as a positive integer and count is absent,
71        // treat it as the count (topics cannot start with digits per ADR 0002)
72        let (topic, n) = match (&raw_topic, raw_count) {
73            (Some(t), None) if t.parse::<usize>().is_ok() => (None, t.parse::<usize>().unwrap()),
74            _ => (raw_topic, raw_count.map(|v| v as usize).unwrap_or(1)),
75        };
76
77        if !follow {
78            // Non-follow mode: use sync path
79            let options = ReadOptions::builder().last(n).maybe_topic(topic).build();
80
81            let frames: Vec<Value> = self
82                .store
83                .read_sync(options)
84                .map(|frame| util::frame_to_value(&frame, span, with_timestamp))
85                .collect();
86
87            return if frames.is_empty() {
88                Ok(PipelineData::Empty)
89            } else if frames.len() == 1 {
90                Ok(PipelineData::Value(
91                    frames.into_iter().next().unwrap(),
92                    None,
93                ))
94            } else {
95                Ok(PipelineData::Value(Value::list(frames, span), None))
96            };
97        }
98
99        // Follow mode: use async path with streaming
100        let options = ReadOptions::builder()
101            .last(n)
102            .maybe_topic(topic)
103            .follow(FollowOption::On)
104            .build();
105
106        let store = self.store.clone();
107        let signals = engine_state.signals().clone();
108
109        // Create channel for async -> sync bridge
110        let (tx, rx) = std::sync::mpsc::channel();
111
112        // Spawn thread to handle async store.read()
113        std::thread::spawn(move || {
114            let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
115            rt.block_on(async move {
116                let mut receiver = store.read(options).await;
117
118                while let Some(frame) = receiver.recv().await {
119                    let value = util::frame_to_value(&frame, span, with_timestamp);
120
121                    if tx.send(value).is_err() {
122                        break;
123                    }
124                }
125            });
126        });
127
128        // Create ListStream from channel with signal-aware polling
129        let stream = ListStream::new(
130            std::iter::from_fn(move || {
131                use std::sync::mpsc::RecvTimeoutError;
132                loop {
133                    if signals.interrupted() {
134                        return None;
135                    }
136                    match rx.recv_timeout(Duration::from_millis(100)) {
137                        Ok(value) => return Some(value),
138                        Err(RecvTimeoutError::Timeout) => continue,
139                        Err(RecvTimeoutError::Disconnected) => return None,
140                    }
141                }
142            }),
143            span,
144            Signals::empty(),
145        );
146
147        Ok(PipelineData::ListStream(stream, None))
148    }
149}