xs/nu/commands/
head_stream_command.rs1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{
4 Category, ListStream, PipelineData, ShellError, Signals, Signature, SyntaxShape, Type,
5};
6use std::time::Duration;
7
8use crate::store::{FollowOption, ReadOptions, Store};
9
10#[derive(Clone)]
11pub struct HeadStreamCommand {
12 store: Store,
13 context_id: scru128::Scru128Id,
14}
15
16impl HeadStreamCommand {
17 pub fn new(store: Store, context_id: scru128::Scru128Id) -> Self {
18 Self { store, context_id }
19 }
20}
21
22impl Command for HeadStreamCommand {
23 fn name(&self) -> &str {
24 ".head"
25 }
26
27 fn signature(&self) -> Signature {
28 Signature::build(".head")
29 .input_output_types(vec![(Type::Nothing, Type::Any)])
30 .required("topic", SyntaxShape::String, "topic to get head frame from")
31 .switch("follow", "long poll for new head frames", Some('f'))
32 .named(
33 "context",
34 SyntaxShape::String,
35 "context ID (defaults to current context)",
36 Some('c'),
37 )
38 .category(Category::Experimental)
39 }
40
41 fn description(&self) -> &str {
42 "get the most recent frame for a topic"
43 }
44
45 fn run(
46 &self,
47 engine_state: &EngineState,
48 stack: &mut Stack,
49 call: &Call,
50 _input: PipelineData,
51 ) -> Result<PipelineData, ShellError> {
52 let topic: String = call.req(engine_state, stack, 0)?;
53 let follow = call.has_flag(engine_state, stack, "follow")?;
54 let context_str: Option<String> = call.get_flag(engine_state, stack, "context")?;
55
56 let context_id = if let Some(ctx) = context_str {
57 ctx.parse::<scru128::Scru128Id>()
58 .map_err(|e| ShellError::GenericError {
59 error: "Invalid context ID".into(),
60 msg: e.to_string(),
61 span: Some(call.head),
62 help: None,
63 inner: vec![],
64 })?
65 } else {
66 self.context_id
67 };
68
69 let span = call.head;
70 let current_head = self.store.head(&topic, context_id);
71
72 if !follow {
73 return if let Some(frame) = current_head {
75 Ok(PipelineData::Value(
76 crate::nu::util::frame_to_value(&frame, span),
77 None,
78 ))
79 } else {
80 Ok(PipelineData::Empty)
81 };
82 }
83
84 let options = ReadOptions::builder()
86 .follow(FollowOption::On)
87 .maybe_last_id(current_head.as_ref().map(|f| f.id))
88 .maybe_context_id(Some(context_id))
89 .build();
90
91 let store = self.store.clone();
92 let signals = engine_state.signals().clone();
93 let topic_filter = topic.clone();
94
95 let (tx, rx) = std::sync::mpsc::channel();
97
98 if let Some(frame) = current_head {
100 let value = crate::nu::util::frame_to_value(&frame, span);
101 let _ = tx.send(value);
102 }
103
104 std::thread::spawn(move || {
106 let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
107 rt.block_on(async move {
108 let mut receiver = store.read(options).await;
109
110 while let Some(frame) = receiver.recv().await {
111 if frame.topic != topic_filter {
113 continue;
114 }
115
116 let value = crate::nu::util::frame_to_value(&frame, span);
117
118 if tx.send(value).is_err() {
119 break;
120 }
121 }
122 });
123 });
124
125 let stream = ListStream::new(
127 std::iter::from_fn(move || {
128 use std::sync::mpsc::RecvTimeoutError;
129 loop {
130 if signals.interrupted() {
131 return None;
132 }
133 match rx.recv_timeout(Duration::from_millis(100)) {
134 Ok(value) => return Some(value),
135 Err(RecvTimeoutError::Timeout) => continue,
136 Err(RecvTimeoutError::Disconnected) => return None,
137 }
138 }
139 }),
140 span,
141 Signals::empty(),
142 );
143
144 Ok(PipelineData::ListStream(stream, None))
145 }
146}