nu_plugin_nuts 0.1.0

Blazingly fast Nats client as a nushell plugin
Documentation
use futures::StreamExt;
use log::info;
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{
    Category, Example, IntoValue, LabeledError, ListStream, PipelineData, Signals, Signature, Span,
    SyntaxShape, Type,
};
use tokio::{select, sync::mpsc};
use tokio_util::sync::CancellationToken;

use crate::Nuts;

pub(crate) struct Subscribe;

impl PluginCommand for Subscribe {
    type Plugin = Nuts;

    fn name(&self) -> &str {
        "nuts sub"
    }

    fn signature(&self) -> Signature {
        Signature::build(self.name())
            .required("subject", SyntaxShape::String, "Subject to consume from")
            .switch("binary", "Do not decode binary as string", Some('b'))
            .input_output_type(Type::Any, Type::String)
            .input_output_type(Type::Any, Type::Binary)
            .category(Category::Generators)
    }

    fn description(&self) -> &str {
        "Consume from a subject"
    }

    fn search_terms(&self) -> Vec<&str> {
        vec!["nats", "sub", "subscribe"]
    }

    fn examples(&self) -> Vec<Example> {
        vec![Example {
            example: "nuts sub mysubject",
            description: "Subscribe to a subject",
            result: Some(["mymessage".into_value(Span::unknown())].into_value(Span::unknown())),
        }]
    }

    fn run(
        &self,
        plugin: &Self::Plugin,
        engine: &EngineInterface,
        call: &EvaluatedCall,
        _input: PipelineData,
    ) -> Result<PipelineData, LabeledError> {
        let subject: String = call.req(0)?;
        let binary_output = call.has_flag("binary")?;
        let client = plugin.nats.read().unwrap();
        match client.as_ref() {
            Some(client) => {
                let (tx, mut rx) = mpsc::unbounded_channel();
                plugin.runtime.spawn({
                    let client = client.clone();
                    let engine = engine.clone();
                    async move {
                        info!("Spawned subscription");
                        let mut subscription = client
                            .subscribe(subject.clone())
                            .await
                            .unwrap_or_else(|_| panic!("Failed to subscribe to subject {}", subject));

                        info!("Subscribed");
                        let cancellation = CancellationToken::new();
                        let _signal_guard = engine.register_signal_handler(Box::new({
                            let cancellation = cancellation.clone();
                            move |_| {
                                info!("Cancel");
                                cancellation.cancel();
                            }
                        })).expect("Failed to register signal handler");
                        loop {
                            select! {
                                _ = cancellation.cancelled() => {
                                    break;
                                }
                                Some(message) = subscription.next() => {
                                    tx.send(message).expect("Failed to send message through channel");
                                }
                            };
                        }
                    }
                });

                let handle = plugin.runtime.handle().clone();
                let stream_iter = std::iter::repeat_with(move || handle.block_on(rx.recv()))
                    .map_while(move |message| {
                        message.map(|message| {
                            if binary_output {
                                message.payload.into_value(Span::unknown())
                            } else {
                                String::from_utf8_lossy(&message.payload)
                                    .into_value(Span::unknown())
                            }
                        })
                    });

                Ok(PipelineData::ListStream(
                    ListStream::new(stream_iter, call.head, Signals::empty()),
                    None,
                ))
            }
            None => Err(LabeledError::new(
                "Not connected to NATS server. Call `nuts connect` first",
            )),
        }
    }
}