xs/nu/commands/
last_stream_command.rs

1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{
4    Category, ListStream, PipelineData, ShellError, Signals, Signature, SyntaxShape, Type,
5};
6use std::time::Duration;
7
8use crate::store::{FollowOption, ReadOptions, Store};
9
10#[derive(Clone)]
11pub struct LastStreamCommand {
12    store: Store,
13}
14
15impl LastStreamCommand {
16    pub fn new(store: Store) -> Self {
17        Self { store }
18    }
19}
20
21impl Command for LastStreamCommand {
22    fn name(&self) -> &str {
23        ".last"
24    }
25
26    fn signature(&self) -> Signature {
27        Signature::build(".last")
28            .input_output_types(vec![(Type::Nothing, Type::Any)])
29            .required(
30                "topic",
31                SyntaxShape::String,
32                "topic to get most recent frame from",
33            )
34            .switch(
35                "follow",
36                "long poll for updates to most recent frame",
37                Some('f'),
38            )
39            .category(Category::Experimental)
40    }
41
42    fn description(&self) -> &str {
43        "get the most recent frame for a topic"
44    }
45
46    fn run(
47        &self,
48        engine_state: &EngineState,
49        stack: &mut Stack,
50        call: &Call,
51        _input: PipelineData,
52    ) -> Result<PipelineData, ShellError> {
53        let topic: String = call.req(engine_state, stack, 0)?;
54        let follow = call.has_flag(engine_state, stack, "follow")?;
55
56        let span = call.head;
57        let current_head = self.store.last(&topic);
58
59        if !follow {
60            // Non-follow mode: just return current head or empty
61            return if let Some(frame) = current_head {
62                Ok(PipelineData::Value(
63                    crate::nu::util::frame_to_value(&frame, span),
64                    None,
65                ))
66            } else {
67                Ok(PipelineData::Empty)
68            };
69        }
70
71        // Follow mode: stream head updates
72        let options = ReadOptions::builder()
73            .follow(FollowOption::On)
74            .maybe_after(current_head.as_ref().map(|f| f.id))
75            .build();
76
77        let store = self.store.clone();
78        let signals = engine_state.signals().clone();
79        let topic_filter = topic.clone();
80
81        // Create channel for async -> sync bridge
82        let (tx, rx) = std::sync::mpsc::channel();
83
84        // If there's a current head, send it first
85        if let Some(frame) = current_head {
86            let value = crate::nu::util::frame_to_value(&frame, span);
87            let _ = tx.send(value);
88        }
89
90        // Spawn thread to handle async store.read()
91        std::thread::spawn(move || {
92            let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
93            rt.block_on(async move {
94                let mut receiver = store.read(options).await;
95
96                while let Some(frame) = receiver.recv().await {
97                    // Filter for matching topic
98                    if frame.topic != topic_filter {
99                        continue;
100                    }
101
102                    let value = crate::nu::util::frame_to_value(&frame, span);
103
104                    if tx.send(value).is_err() {
105                        break;
106                    }
107                }
108            });
109        });
110
111        // Create ListStream from channel with signal-aware polling
112        let stream = ListStream::new(
113            std::iter::from_fn(move || {
114                use std::sync::mpsc::RecvTimeoutError;
115                loop {
116                    if signals.interrupted() {
117                        return None;
118                    }
119                    match rx.recv_timeout(Duration::from_millis(100)) {
120                        Ok(value) => return Some(value),
121                        Err(RecvTimeoutError::Timeout) => continue,
122                        Err(RecvTimeoutError::Disconnected) => return None,
123                    }
124                }
125            }),
126            span,
127            Signals::empty(),
128        );
129
130        Ok(PipelineData::ListStream(stream, None))
131    }
132}