use nu_engine::CallExt;
use nu_protocol::engine::{Call, Command, EngineState, Stack};
use nu_protocol::{
Category, ListStream, PipelineData, ShellError, Signals, Signature, SyntaxShape, Type, Value,
};
use std::time::Duration;
use crate::nu::util;
use crate::store::{FollowOption, ReadOptions, Store};
#[derive(Clone)]
pub struct LastStreamCommand {
store: Store,
}
impl LastStreamCommand {
pub fn new(store: Store) -> Self {
Self { store }
}
}
impl Command for LastStreamCommand {
fn name(&self) -> &str {
".last"
}
fn signature(&self) -> Signature {
Signature::build(".last")
.input_output_types(vec![(Type::Nothing, Type::Any)])
.optional(
"topic",
SyntaxShape::String,
"topic to get most recent frame from (default: all topics)",
)
.optional(
"count",
SyntaxShape::Int,
"number of frames to return (default: 1)",
)
.switch(
"follow",
"long poll for updates to most recent frame",
Some('f'),
)
.switch(
"with-timestamp",
"include timestamp extracted from frame ID",
None,
)
.category(Category::Experimental)
}
fn description(&self) -> &str {
"get the most recent frame(s) for a topic"
}
fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
_input: PipelineData,
) -> Result<PipelineData, ShellError> {
let raw_topic: Option<String> = call.opt(engine_state, stack, 0)?;
let raw_count: Option<i64> = call.opt(engine_state, stack, 1)?;
let follow = call.has_flag(engine_state, stack, "follow")?;
let with_timestamp = call.has_flag(engine_state, stack, "with-timestamp")?;
let span = call.head;
let (topic, n) = match (&raw_topic, raw_count) {
(Some(t), None) if t.parse::<usize>().is_ok() => (None, t.parse::<usize>().unwrap()),
_ => (raw_topic, raw_count.map(|v| v as usize).unwrap_or(1)),
};
if !follow {
let options = ReadOptions::builder().last(n).maybe_topic(topic).build();
let frames: Vec<Value> = self
.store
.read_sync(options)
.map(|frame| util::frame_to_value(&frame, span, with_timestamp))
.collect();
return if frames.is_empty() {
Ok(PipelineData::Empty)
} else if frames.len() == 1 {
Ok(PipelineData::Value(
frames.into_iter().next().unwrap(),
None,
))
} else {
Ok(PipelineData::Value(Value::list(frames, span), None))
};
}
let options = ReadOptions::builder()
.last(n)
.maybe_topic(topic)
.follow(FollowOption::On)
.build();
let store = self.store.clone();
let signals = engine_state.signals().clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
rt.block_on(async move {
let mut receiver = store.read(options).await;
while let Some(frame) = receiver.recv().await {
let value = util::frame_to_value(&frame, span, with_timestamp);
if tx.send(value).is_err() {
break;
}
}
});
});
let stream = ListStream::new(
std::iter::from_fn(move || {
use std::sync::mpsc::RecvTimeoutError;
loop {
if signals.interrupted() {
return None;
}
match rx.recv_timeout(Duration::from_millis(100)) {
Ok(value) => return Some(value),
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => return None,
}
}
}),
span,
Signals::empty(),
);
Ok(PipelineData::ListStream(stream, None))
}
}