xs/nu/commands/
head_stream_command.rs

use nu_engine::CallExt;
use nu_protocol::engine::{Call, Command, EngineState, Stack};
use nu_protocol::{
    Category, ListStream, PipelineData, ShellError, Signals, Signature, SyntaxShape, Type, Value,
};
use std::time::Duration;

use crate::store::{FollowOption, ReadOptions, Store};
9
10#[derive(Clone)]
11pub struct HeadStreamCommand {
12    store: Store,
13    context_id: scru128::Scru128Id,
14}
15
16impl HeadStreamCommand {
17    pub fn new(store: Store, context_id: scru128::Scru128Id) -> Self {
18        Self { store, context_id }
19    }
20}
21
22impl Command for HeadStreamCommand {
23    fn name(&self) -> &str {
24        ".head"
25    }
26
27    fn signature(&self) -> Signature {
28        Signature::build(".head")
29            .input_output_types(vec![(Type::Nothing, Type::Any)])
30            .required("topic", SyntaxShape::String, "topic to get head frame from")
31            .switch("follow", "long poll for new head frames", Some('f'))
32            .named(
33                "context",
34                SyntaxShape::String,
35                "context ID (defaults to current context)",
36                Some('c'),
37            )
38            .category(Category::Experimental)
39    }
40
41    fn description(&self) -> &str {
42        "get the most recent frame for a topic"
43    }
44
45    fn run(
46        &self,
47        engine_state: &EngineState,
48        stack: &mut Stack,
49        call: &Call,
50        _input: PipelineData,
51    ) -> Result<PipelineData, ShellError> {
52        let topic: String = call.req(engine_state, stack, 0)?;
53        let follow = call.has_flag(engine_state, stack, "follow")?;
54        let context_str: Option<String> = call.get_flag(engine_state, stack, "context")?;
55
56        let context_id = if let Some(ctx) = context_str {
57            ctx.parse::<scru128::Scru128Id>()
58                .map_err(|e| ShellError::GenericError {
59                    error: "Invalid context ID".into(),
60                    msg: e.to_string(),
61                    span: Some(call.head),
62                    help: None,
63                    inner: vec![],
64                })?
65        } else {
66            self.context_id
67        };
68
69        let span = call.head;
70        let current_head = self.store.head(&topic, context_id);
71
72        if !follow {
73            // Non-follow mode: just return current head or empty
74            return if let Some(frame) = current_head {
75                Ok(PipelineData::Value(
76                    crate::nu::util::frame_to_value(&frame, span),
77                    None,
78                ))
79            } else {
80                Ok(PipelineData::Empty)
81            };
82        }
83
84        // Follow mode: stream head updates
85        let options = ReadOptions::builder()
86            .follow(FollowOption::On)
87            .maybe_last_id(current_head.as_ref().map(|f| f.id))
88            .maybe_context_id(Some(context_id))
89            .build();
90
91        let store = self.store.clone();
92        let signals = engine_state.signals().clone();
93        let topic_filter = topic.clone();
94
95        // Create channel for async -> sync bridge
96        let (tx, rx) = std::sync::mpsc::channel();
97
98        // If there's a current head, send it first
99        if let Some(frame) = current_head {
100            let value = crate::nu::util::frame_to_value(&frame, span);
101            let _ = tx.send(value);
102        }
103
104        // Spawn thread to handle async store.read()
105        std::thread::spawn(move || {
106            let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
107            rt.block_on(async move {
108                let mut receiver = store.read(options).await;
109
110                while let Some(frame) = receiver.recv().await {
111                    // Filter for matching topic
112                    if frame.topic != topic_filter {
113                        continue;
114                    }
115
116                    let value = crate::nu::util::frame_to_value(&frame, span);
117
118                    if tx.send(value).is_err() {
119                        break;
120                    }
121                }
122            });
123        });
124
125        // Create ListStream from channel with signal-aware polling
126        let stream = ListStream::new(
127            std::iter::from_fn(move || {
128                use std::sync::mpsc::RecvTimeoutError;
129                loop {
130                    if signals.interrupted() {
131                        return None;
132                    }
133                    match rx.recv_timeout(Duration::from_millis(100)) {
134                        Ok(value) => return Some(value),
135                        Err(RecvTimeoutError::Timeout) => continue,
136                        Err(RecvTimeoutError::Disconnected) => return None,
137                    }
138                }
139            }),
140            span,
141            Signals::empty(),
142        );
143
144        Ok(PipelineData::ListStream(stream, None))
145    }
146}