nats 0.21.0

A Rust NATS client
Documentation
// Copyright 2020-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{
    fmt, io,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};

use crate::{
    client::Client,
    header::{self, HeaderMap},
};

use time::OffsetDateTime;

pub(crate) const MESSAGE_NOT_BOUND: &str = "message not bound to a connection";

/// A message received on a subject.
#[derive(Clone)]
pub struct Message {
    /// The subject this message came from.
    pub subject: String,

    /// Optional reply subject that may be used for sending a response to this
    /// message.
    pub reply: Option<String>,

    /// The message contents.
    pub data: Vec<u8>,

    /// Optional headers associated with this `Message`.
    pub headers: Option<HeaderMap>,

    /// Client for publishing on the reply subject.
    #[doc(hidden)]
    pub client: Option<Client>,

    /// Whether this message has already been successfully double-acked
    /// using `JetStream`.
    #[doc(hidden)]
    pub double_acked: Arc<AtomicBool>,
}

impl From<crate::asynk::Message> for Message {
    fn from(asynk: crate::asynk::Message) -> Message {
        Message {
            subject: asynk.subject,
            reply: asynk.reply,
            data: asynk.data,
            headers: asynk.headers,
            client: asynk.client,
            double_acked: asynk.double_acked,
        }
    }
}

impl Message {
    /// Creates new empty `Message`, without a Client.
    /// Useful for passing `Message` data or creating `Message` instance without caring about `Client`,
    /// but cannot be used on it's own for associated methods as those require `Client` injected into `Message`
    /// and will error without it.
    pub fn new(
        subject: &str,
        reply: Option<&str>,
        data: impl AsRef<[u8]>,
        headers: Option<HeaderMap>,
    ) -> Message {
        Message {
            subject: subject.to_string(),
            reply: reply.map(String::from),
            data: data.as_ref().to_vec(),
            headers,
            ..Default::default()
        }
    }

    /// Respond to a request message.
    pub fn respond(&self, msg: impl AsRef<[u8]>) -> io::Result<()> {
        let reply = self.reply.as_ref().ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "No reply subject to reply to")
        })?;
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotConnected, MESSAGE_NOT_BOUND))?;
        client.publish(reply.as_str(), None, None, msg.as_ref())?;
        Ok(())
    }

    /// Determine if the message is a no responders response from the server.
    pub fn is_no_responders(&self) -> bool {
        if !self.data.is_empty() {
            return false;
        }

        if let Some(headers) = &self.headers {
            if headers.get(header::STATUS) == Some(&"503".to_string()) {
                return true;
            }
        }

        false
    }

    /// Determine if a message is `404 No Messages`.
    pub(crate) fn is_no_messages(&self) -> bool {
        if let Some(headers) = &self.headers {
            if headers.get(header::STATUS) == Some(&"404".to_string()) {
                return true;
            }
        }

        false
    }

    // Determine if a message is `408 Request Timeout`.
    pub(crate) fn is_request_timeout(&self) -> bool {
        if let Some(headers) = &self.headers {
            if headers.get(header::STATUS) == Some(&"408".to_string()) {
                return true;
            }
        }
        false
    }

    // Helper for detecting flow control messages.
    pub(crate) fn is_flow_control(&self) -> bool {
        if !self.data.is_empty() {
            return false;
        }

        if let Some(headers) = &self.headers {
            if headers.get(header::STATUS) != Some(&"100".to_string()) {
                return false;
            }

            if headers.get(header::DESCRIPTION) == Some(&"Flow Control".to_string()) {
                return true;
            }

            if headers.get(header::DESCRIPTION) == Some(&"FlowControl Request".to_string()) {
                return true;
            }
        }

        false
    }

    // Helper for detecting idle heartbeat messages.
    pub(crate) fn is_idle_heartbeat(&self) -> bool {
        if !self.data.is_empty() {
            return false;
        }

        if let Some(headers) = &self.headers {
            if headers.get(header::STATUS) != Some(&"100".to_string()) {
                return false;
            }

            if headers.get(header::DESCRIPTION) == Some(&"Idle Heartbeat".to_string()) {
                return true;
            }
        }

        false
    }

    /// Acknowledge a `JetStream` message with a default acknowledgement.
    /// See `AckKind` documentation for details of what other types of
    /// acks are available. If you need to send a non-default ack, use
    /// the `ack_kind` method below. If you need to block until the
    /// server acks your ack, use the `double_ack` method instead.
    ///
    /// Returns immediately if this message has already been
    /// double-acked.
    pub fn ack(&self) -> io::Result<()> {
        if self.double_acked.load(Ordering::Acquire) {
            return Ok(());
        }
        self.respond(b"")
    }

    /// Acknowledge a `JetStream` message. See `AckKind` documentation for
    /// details of what each variant means. If you need to block until the
    /// server acks your ack, use the `double_ack` method instead.
    ///
    /// Does not check whether this message has already been double-acked.
    pub fn ack_kind(&self, ack_kind: crate::jetstream::AckKind) -> io::Result<()> {
        self.respond(ack_kind)
    }

    /// Acknowledge a `JetStream` message and wait for acknowledgement from the server
    /// that it has received our ack. Retry acknowledgement until we receive a response.
    /// See `AckKind` documentation for details of what each variant means.
    ///
    /// Returns immediately if this message has already been double-acked.
    pub fn double_ack(&self, ack_kind: crate::jetstream::AckKind) -> io::Result<()> {
        if self.double_acked.load(Ordering::Acquire) {
            return Ok(());
        }
        let original_reply = match self.reply.as_ref() {
            None => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "No reply subject available (not a JetStream message)",
                ))
            }
            Some(original_reply) => original_reply,
        };
        let mut retries = 0;
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotConnected, MESSAGE_NOT_BOUND))?;

        loop {
            retries += 1;
            if retries == 2 {
                log::warn!("double_ack is retrying until the server connection is reestablished");
            }
            let ack_reply = format!("_INBOX.{}", nuid::next());
            let sub_ret = client.subscribe(&ack_reply, None);
            if sub_ret.is_err() {
                std::thread::sleep(std::time::Duration::from_millis(100));
                continue;
            }
            let (sid, receiver) = sub_ret?;
            let sub =
                crate::Subscription::new(sid, ack_reply.to_string(), receiver, client.clone());

            let pub_ret = client.publish(original_reply, Some(&ack_reply), None, ack_kind.as_ref());
            if pub_ret.is_err() {
                std::thread::sleep(std::time::Duration::from_millis(100));
                continue;
            }
            if sub
                .next_timeout(std::time::Duration::from_millis(100))
                .is_ok()
            {
                self.double_acked.store(true, Ordering::Release);
                return Ok(());
            }
        }
    }

    /// Returns the `JetStream` message ID
    /// if this is a `JetStream` message.
    /// Returns `None` if this is not
    /// a `JetStream` message with headers
    /// set.
    #[allow(clippy::eval_order_dependence)]
    pub fn jetstream_message_info(&self) -> Option<crate::jetstream::JetStreamMessageInfo<'_>> {
        const PREFIX: &str = "$JS.ACK.";
        const SKIP: usize = PREFIX.len();

        let mut reply: &str = self.reply.as_ref()?;

        if !reply.starts_with(PREFIX) {
            return None;
        }

        reply = &reply[SKIP..];

        let mut split = reply.split('.');

        // we should avoid allocating to prevent
        // large performance degradations in
        // parsing this.
        let mut tokens: [Option<&str>; 10] = [None; 10];
        let mut n_tokens = 0;
        for each_token in &mut tokens {
            if let Some(token) = split.next() {
                *each_token = Some(token);
                n_tokens += 1;
            }
        }

        let mut token_index = 0;

        macro_rules! try_parse {
            () => {
                match str::parse(try_parse!(str)) {
                    Ok(parsed) => parsed,
                    Err(e) => {
                        log::error!(
                            "failed to parse jetstream reply \
                            subject: {}, error: {:?}. Is your \
                            nats-server up to date?",
                            reply,
                            e
                        );
                        return None;
                    }
                }
            };
            (str) => {
                if let Some(next) = tokens[token_index].take() {
                    #[allow(unused)]
                    {
                        // this isn't actually unused, but it's
                        // difficult for the compiler to infer this.
                        token_index += 1;
                    }
                    next
                } else {
                    log::error!(
                        "unexpectedly few tokens while parsing \
                        jetstream reply subject: {}. Is your \
                        nats-server up to date?",
                        reply
                    );
                    return None;
                }
            };
        }

        // now we can try to parse the tokens to
        // individual types. We use an if-else
        // chain instead of a match because it
        // produces more optimal code usually,
        // and we want to try the 9 (11 - the first 2)
        // case first because we expect it to
        // be the most common. We use >= to be
        // future-proof.
        if n_tokens >= 9 {
            Some(crate::jetstream::JetStreamMessageInfo {
                domain: {
                    let domain: &str = try_parse!(str);
                    if domain == "_" {
                        None
                    } else {
                        Some(domain)
                    }
                },
                acc_hash: Some(try_parse!(str)),
                stream: try_parse!(str),
                consumer: try_parse!(str),
                delivered: try_parse!(),
                stream_seq: try_parse!(),
                consumer_seq: try_parse!(),
                published: {
                    let nanos: i128 = try_parse!();
                    OffsetDateTime::from_unix_timestamp_nanos(nanos).ok()?
                },
                pending: try_parse!(),
                token: if n_tokens >= 9 {
                    Some(try_parse!(str))
                } else {
                    None
                },
            })
        } else if n_tokens == 7 {
            // we expect this to be increasingly rare, as older
            // servers are phased out.
            Some(crate::jetstream::JetStreamMessageInfo {
                domain: None,
                acc_hash: None,
                stream: try_parse!(str),
                consumer: try_parse!(str),
                delivered: try_parse!(),
                stream_seq: try_parse!(),
                consumer_seq: try_parse!(),
                published: {
                    let nanos: i128 = try_parse!();
                    OffsetDateTime::from_unix_timestamp_nanos(nanos).ok()?
                },
                pending: try_parse!(),
                token: None,
            })
        } else {
            None
        }
    }
}

impl Default for Message {
    fn default() -> Message {
        Message {
            subject: String::from(""),
            reply: None,
            data: Vec::new(),
            headers: None,
            client: None,
            double_acked: Arc::new(AtomicBool::new(false)),
        }
    }
}

impl fmt::Debug for Message {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("Message")
            .field("subject", &self.subject)
            .field("headers", &self.headers)
            .field("reply", &self.reply)
            .field("length", &self.data.len())
            .finish()
    }
}

impl fmt::Display for Message {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut body = format!("[{} bytes]", self.data.len());
        if let Ok(str) = std::str::from_utf8(&self.data) {
            body = str.to_string();
        }
        if let Some(reply) = &self.reply {
            write!(
                f,
                "Message {{\n  subject: \"{}\",\n  reply: \"{}\",\n  data: \
                 \"{}\"\n}}",
                self.subject, reply, body
            )
        } else {
            write!(
                f,
                "Message {{\n  subject: \"{}\",\n  data: \"{}\"\n}}",
                self.subject, body
            )
        }
    }
}