nats-aflowt 0.16.105

Unofficial port of NATS rust client to pure async
// Copyright 2020-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::Stream;
#[cfg(not(feature = "otel"))]
use log::error;
use std::{
    io,
    pin::Pin,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    thread,
    time::Duration,
};
#[cfg(feature = "otel")]
use tracing::error;

use crate::{
    jetstream::{AckPolicy, ConsumerInfo, ConsumerOwnership, JetStream},
    message::Message,
    DEFAULT_FLUSH_TIMEOUT,
};

#[derive(Debug)]
pub(crate) struct Inner {
    /// Subscription ID.
    pub(crate) sid: Arc<AtomicU64>,

    /// MSG operations received from the server.
    pub(crate) messages: crate::SubscriptionReceiver<Message>,

    /// Name of the stream associated with the subscription.
    pub(crate) stream: String,

    /// Name of the consumer associated with the subscription.
    pub(crate) consumer: String,

    /// Ack policy used in while processing messages.
    pub(crate) consumer_ack_policy: AckPolicy,

    /// Indicates if we own the consumer and are responsible for deleting it or not.
    pub(crate) consumer_ownership: ConsumerOwnership,

    /// Client associated with subscription.
    pub(crate) context: JetStream,
}

impl Drop for Inner {
    fn drop(&mut self) {
        let client = self.context.connection.0.client.clone();
        let sid = self.sid.clone();
        let context = if self.consumer_ownership == ConsumerOwnership::Yes {
            Some((
                self.context.clone(),
                self.stream.clone(),
                self.consumer.clone(),
            ))
        } else {
            None
        };
        let _ = tokio::spawn(async move {
            client.unsubscribe(sid.load(Ordering::Relaxed)).await.ok();
            // Delete the consumer, if we own it.
            if let Some((context, stream, consumer)) = context {
                context.delete_consumer(&stream, &consumer).await.ok();
            }
        });
    }
}

/// A `PushSubscription` receives `Message`s published
/// to specific NATS `Subject`s.
#[derive(Clone, Debug)]
pub struct PushSubscription(pub(crate) Arc<Inner>);

impl PushSubscription {
    /// Creates a subscription.
    pub(crate) fn new(
        sid: Arc<AtomicU64>,
        consumer_info: ConsumerInfo,
        consumer_ownership: ConsumerOwnership,
        messages: crate::SubscriptionReceiver<Message>,
        context: JetStream,
    ) -> PushSubscription {
        PushSubscription(Arc::new(Inner {
            sid,
            stream: consumer_info.stream_name,
            consumer: consumer_info.name,
            consumer_ack_policy: consumer_info.config.ack_policy,
            consumer_ownership,
            messages,
            context,
        }))
    }

    /// Preprocesses the given message.
    /// Returns true if the message was processed and should be filtered out from the user's view.
    async fn should_skip(&self, message: &Message) -> bool {
        if message.is_flow_control() {
            message.respond(b"").await.ok();
            return true;
        }

        if message.is_idle_heartbeat() {
            return true;
        }

        false
    }

    /// Get the next message non-protocol message, or None if the subscription has been
    /// unsubscribed or the connection closed.
    ///
    /// # Example
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() -> std::io::Result<()> {
    /// # let client = nats_aflowt::connect("127.0.0.1:14222").await?;
    /// # let context = nats_aflowt::jetstream::new(client);
    /// # let name=format!("next_{}", rand::random::<u64>());
    /// # context.add_stream(name.as_str()).await?;
    /// # context.publish(&name, "hello").await?;
    /// # let subscription = context.subscribe(&name).await?;
    /// if let Some(message) = subscription.next().await {
    ///     println!("Received: '{}'", message);
    /// } else {
    ///     println!("No more messages");
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn next(&self) -> Option<Message> {
        loop {
            match self.0.messages.recv().await {
                Some(message) => {
                    if self.should_skip(&message).await {
                        continue;
                    }
                    return Some(message);
                }
                None => return None,
            }
        }
    }

    /// Try to get the next non-protocol message, or None if no messages
    /// are present or if the subscription has been unsubscribed
    /// or the connection closed.
    ///
    /// # Example
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() -> std::io::Result<()> {
    /// # let client = nats_aflowt::connect("127.0.0.1:14222").await?;
    /// # let context = nats_aflowt::jetstream::new(client);
    /// #
    /// # context.add_stream("try_next").await?;
    /// # let subscription = context.subscribe("try_next").await?;
    /// if let Some(message) = subscription.try_next().await {
    ///     println!("Received {}", message);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn try_next(&self) -> Option<Message> {
        loop {
            match self.0.messages.try_recv().await {
                Some(message) => {
                    if self.should_skip(&message).await {
                        continue;
                    }
                    return Some(message);
                }
                None => {
                    return None;
                }
            }
        }
    }

    /// Get the next message, or a timeout error
    /// if no messages are available for timout.
    ///
    /// # Example
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() -> std::io::Result<()> {
    /// # let client = nats_aflowt::connect("127.0.0.1:14222").await?;
    /// # let context = nats_aflowt::jetstream::new(client);
    /// # let name = format!("sub_{}", rand::random::<u64>());
    /// # context.add_stream(name.as_str()).await?;
    /// # let subscription = context.subscribe(name.as_str()).await?;
    /// if let Ok(message) = subscription.next_timeout(std::time::Duration::from_secs(1)).await {
    ///     println!("Received {}", message);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn next_timeout(&self, timeout: Duration) -> io::Result<Message> {
        loop {
            match tokio::time::timeout(timeout, self.0.messages.recv()).await {
                Ok(Some(message)) => {
                    if self.should_skip(&message).await {
                        continue;
                    }
                    return Ok(message);
                }
                Ok(None) => {
                    return Err(io::Error::new(
                        io::ErrorKind::TimedOut,
                        "next_timeout: timed out",
                    ))
                }
                Err(_) => {
                    return Err(io::Error::new(
                        io::ErrorKind::Other,
                        "next_timeout: unsubscribed",
                    ))
                }
            }
        }
    }

    /// Returns a pinned message stream.
    /// same as `stream()`
    ///
    /// # Example
    ///
    /// ```no_run
    /// use futures::stream::StreamExt;
    /// #[tokio::main]
    /// # async fn main() -> std::io::Result<()> {
    /// # let client = nats_aflowt::connect("127.0.0.1:14222").await?;
    /// # let context = nats_aflowt::jetstream::new(client);
    /// # context.add_stream("messages-100").await?;
    /// let mut subscription = context.subscribe("messages-100").await?.messages();
    /// while let Some(message) = subscription.next().await {
    ///     println!("Received message {:?}", message);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn messages(self) -> Pin<Box<dyn Stream<Item = Message>>> {
        Box::pin(self.into_stream())
    }

    // convert the to unpinned stream
    #[doc(hidden)]
    fn into_stream(self) -> impl Stream<Item = Message> {
        async_stream::stream! {
            while let Some(message) = self.next().await {
                yield message;
            }
        }
    }

    /// Returns a pinned message stream.
    /// Same as `messages`
    ///
    /// # Example
    ///
    /// ```no_run
    /// use futures::stream::StreamExt;
    /// #[tokio::main]
    /// # async fn main() -> std::io::Result<()> {
    /// # let client = nats_aflowt::connect("127.0.0.1:14222").await?;
    /// # let context = nats_aflowt::jetstream::new(client);
    /// # context.add_stream("stream").await?;
    /// let mut sub = context.subscribe("stream").await?.stream();
    /// # context.publish("stream", "hello").await?;
    /// while let Some(message) = sub.next().await {
    ///     println!("Received message {:?}", message);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn stream(self) -> Pin<Box<dyn Stream<Item = Message>>> {
        Box::pin(self.into_stream())
    }

    /// Attach a closure to handle messages. This closure will execute in a
    /// separate thread. The result of this call is a `Handler` which can
    /// not be iterated and must be unsubscribed or closed directly to
    /// unregister interest. A `Handler` will not unregister interest with
    /// the server when `drop(&mut self)` is called.
    ///
    /// # Example
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() -> std::io::Result<()> {
    /// # let client = nats_aflowt::connect("127.0.0.1:14222").await?;
    /// # let context = nats_aflowt::jetstream::new(client);
    /// # context.add_stream("with_handler").await?;
    /// context.subscribe("with_handler").await?.with_handler(move |message| {
    ///     println!("received {}", &message);
    ///     Ok(())
    /// });
    ///
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_handler<F>(self, handler: F) -> Handler
    where
        F: Fn(Message) -> io::Result<()> + Send + Sync + 'static,
    {
        // This will allow us to not have to capture the return. When it is
        // dropped it will not unsubscribe from the server.
        let sub = self.clone();
        let handler = Arc::new(handler);
        tokio::spawn(async move {
            while let Some(m) = sub.next().await {
                let handler = handler.clone();
                let _ = tokio::task::spawn_blocking(move || {
                    if let Err(e) = handler(m) {
                        // TODO(dlc) - Capture for last error?
                        log::error!("Error in callback! {:?}", e);
                    }
                });
            }
        });
        /*

        thread::Builder::new()
            .name(format!(
                "nats_jetstream_push_subscriber_{}_{}",
                self.0.stream, self.0.consumer,
            ))
            .spawn(move || {
                futures::executor::block_on(async {
                    while let Some(m) = sub.next().await {
                        let handler = handler.clone();
                        let _ = tokio::spawn(async move {
                            if let Err(e) = handler(m) {
                                // TODO(dlc) - Capture for last error?
                                log::error!("Error in callback! {:?}", e);
                            }
                        });
                    }
                })
            })
            .expect("threads should be spawn-able");
        */
        Handler { subscription: self }
    }

    /// Attach an async closure to handle messages. The closure will run as a task
    /// within the current thread and must not be blocking.
    /// Any errors returned by the closure will be logged.
    /// A `Handler` will not unregister interest with
    /// the server when `drop(&mut self)` is called.
    ///
    /// # Example
    /// ```
    /// # #[tokio::main]
    /// # async fn main() -> std::io::Result<()> {
    /// # let nc = nats_aflowt::connect("127.0.0.1:14222").await?;
    /// let sub = nc.subscribe("foo").await?
    ///      .with_async_handler( move |m| async move { m.respond("ans=42").await?; Ok(()) });
    /// # Ok(())
    /// # }
    /// ```
    #[must_use]
    pub fn with_async_handler<F, T>(self, handler: F) -> Self
    where
        F: Fn(Message) -> T + 'static + Send + Sync,
        T: futures::Future<Output = io::Result<()>> + Send,
    {
        let sub = self.clone();
        let handler = Arc::new(Box::new(handler));
        tokio::spawn(async move {
            while let Some(m) = sub.next().await {
                let handler = handler.clone();
                let _ = tokio::spawn(async move {
                    if let Err(e) = handler(m).await {
                        // TODO(dlc) - Capture for last error?
                        log::error!("Error in callback! {:?}", e);
                    }
                });
            }
        });
        self
    }

    /// Attach a closure to process and acknowledge messages. This closure will execute in a separate thread.
    ///
    /// The result of this call is a `Handler`
    /// which can not be iterated and must be unsubscribed or closed directly to
    /// unregister interest. A `Handler` will not unregister interest with
    /// the server when `drop(&mut self)` is called.
    ///
    /// # Example
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() -> std::io::Result<()> {
    /// # let client = nats_aflowt::connect("127.0.0.1:14222").await?;
    /// # let context = nats_aflowt::jetstream::new(client);
    /// # context.add_stream("with_process_handler").await;
    /// context.subscribe("with_process_handler").await?.with_process_handler(|message| {
    ///     println!("Received {}", &message);
    ///     Ok(())
    /// });
    ///
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_process_handler<F>(self, handler: F) -> Handler
    where
        F: Fn(&Message) -> io::Result<()> + Send + Sync + 'static,
    {
        let consumer_ack_policy = self.0.consumer_ack_policy;

        // This will allow us to not have to capture the return. When it is
        // dropped it will not unsubscribe from the server.
        let sub = self.clone();
        let handler = Arc::new(Box::new(handler));
        thread::Builder::new()
            .name(format!(
                "nats_push_subscriber_{}_{}",
                self.0.consumer, self.0.stream
            ))
            .spawn(move || {
                futures::executor::block_on(async {
                    while let Some(message) = sub.next().await {
                        let handler = handler.clone();
                        let _ = tokio::spawn(async move {
                            if let Err(err) = handler(&message) {
                                log::error!("Error in callback! {:?}", err);
                            }

                            if consumer_ack_policy != AckPolicy::None {
                                if let Err(err) = message.ack().await {
                                    log::error!("Error in callback! {:?}", err);
                                }
                            }
                        });
                    }
                })
            })
            .expect("threads should be spawnable");
        Handler { subscription: self }
    }

    /// Process and acknowledge a single message, waiting indefinitely for
    /// one to arrive. Caution: This should not be used with blocking io.
    ///
    /// Does not acknowledge the processed message if the closure returns an `Err`.
    ///
    /// Example
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() -> std::io::Result<()> {
    /// # let client = nats_aflowt::connect("127.0.0.1:14222").await?;
    /// # let context = nats_aflowt::jetstream::new(client);
    /// # let sub_name = format!("process_{}", rand::random::<u64>());
    /// # context.add_stream(sub_name.as_str()).await?;
    /// # let mut subscription = context.subscribe(&sub_name).await?;
    /// # context.publish(&sub_name, "hello").await?;
    /// #
    /// subscription.process(|message| {
    ///     println!("Received message {:?}", message);
    ///     Ok(())
    /// }).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn process<R: Send + 'static, F: Fn(&Message) -> io::Result<R> + Send + Sync>(
        &mut self,
        f: F,
    ) -> io::Result<R>
    where
        F: 'static,
    {
        let ack_policy = self.0.consumer_ack_policy;
        match self.next().await {
            Some(next) => tokio::task::spawn(async move {
                // calling f() may block the tokio executor if f blocks.
                // the executor is not blocked for the call to next() or the ack()
                let result = f(&next);
                if ack_policy != AckPolicy::None {
                    if let Err(e) = next.ack().await {
                        error!("ack error: {}", e);
                    }
                }
                result
            })
            .await
            .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("join error: {}", e)))?,
            _ => Err(io::Error::new(
                io::ErrorKind::Other,
                "process: unsubscribed",
            )),
        }
    }

    /// Process and acknowledge a single message, waiting up to timeout configured `timeout` before
    /// returning a timeout error.
    ///
    /// Does not ack the processed message if the internal closure returns an `Err`.
    ///
    /// Example
    ///
    /// ```
    /// # use std::time::Duration;
    /// # #[tokio::main]
    /// # async fn main() -> std::io::Result<()> {
    /// # let client = nats_aflowt::connect("127.0.0.1:14222").await?;
    /// # let context = nats_aflowt::jetstream::new(client);
    /// # context.add_stream("process_timeout").await?;
    /// # context.publish("process_timeout", "hello").await?;
    /// # context.publish("process_timeout", "hello2").await?;
    /// #
    /// let mut subscription = context.subscribe("process_timeout").await?;
    /// subscription.process_timeout(Duration::from_secs(1), |message| {
    ///     println!("Received message {:?}", message);
    ///     Ok(())
    /// }).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn process_timeout<R, F: Fn(&Message) -> io::Result<R>>(
        &mut self,
        timeout: Duration,
        f: F,
    ) -> io::Result<R> {
        let next = self.next_timeout(timeout).await?;

        let ret = f(&next)?;
        if self.0.consumer_ack_policy != AckPolicy::None {
            next.ack().await?;
        }

        Ok(ret)
    }

    /// Sends a request to fetch current information about the target consumer.
    ///
    /// # Example
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() -> std::io::Result<()> {
    /// # let client = nats_aflowt::connect("127.0.0.1:14222").await?;
    /// # let context = nats_aflowt::jetstream::new(client);
    /// #
    /// # context.add_stream("foo").await?;
    /// let subscription = context.subscribe("foo").await?;
    /// //let info = subscription.consumer_info().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn consumer_info(&self) -> io::Result<ConsumerInfo> {
        self.0
            .context
            .consumer_info(&self.0.stream, &self.0.consumer)
            .await
    }

    /// Unsubscribe a subscription immediately without draining.
    /// Use `drain` instead if you want any pending messages
    /// to be processed by a handler, if one is configured.
    ///
    /// # Example
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() -> std::io::Result<()> {
    /// # let client = nats_aflowt::connect("127.0.0.1:14222").await?;
    /// # let context = nats_aflowt::jetstream::new(client);
    /// # context.add_stream("unsubscribe").await?;
    /// #
    /// let subscription = context.subscribe("unsubscribe").await?;
    /// subscription.unsubscribe().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn unsubscribe(self) -> io::Result<()> {
        // Drain
        self.0
            .context
            .connection
            .0
            .client
            .flush(DEFAULT_FLUSH_TIMEOUT)
            .await?;

        self.0
            .context
            .connection
            .0
            .client
            .unsubscribe(self.0.sid.load(Ordering::Relaxed))
            .await?;

        // Discard all queued messages.
        while self.0.messages.try_recv().await.is_some() {}

        // Delete the consumer, if we own it.
        if self.0.consumer_ownership == ConsumerOwnership::Yes {
            self.0
                .context
                .delete_consumer(&self.0.stream, &self.0.consumer)
                .await
                .ok();
        }

        Ok(())
    }

    /// Close a subscription. Same as `unsubscribe`
    ///
    /// Use `drain` instead if you want any pending messages
    /// to be processed by a handler, if one is configured.
    ///
    /// # Example
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let client = nats_aflowt::connect("127.0.0.1:14222").await?;
    /// # let context = nats_aflowt::jetstream::new(client);
    /// # context.add_stream("close").await?;
    /// let subscription = context.subscribe("close").await?;
    /// subscription.close().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn close(self) -> io::Result<()> {
        self.unsubscribe().await
    }

    /// Send an unsubscription then flush the connection,
    /// allowing any unprocessed messages to be handled
    /// by a handler function if one is configured.
    ///
    /// After the flush returns, we know that a round-trip
    /// to the server has happened after it received our
    /// unsubscription, so we shut down the subscriber
    /// afterwards.
    ///
    /// A similar method exists on the `Connection` struct
    /// which will drain all subscriptions for the NATS
    /// client, and transition the entire system into
    /// the closed state afterward.
    ///
    /// # Example
    ///
    /// ```
    /// # use std::time::Duration;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let client = nats_aflowt::connect("127.0.0.1:14222").await?;
    /// # let context = nats_aflowt::jetstream::new(client);
    /// # let sub_name = format!("drain_{}", rand::random::<u32>());
    /// # context.add_stream(sub_name.as_str()).await?;
    /// # let mut subscription = context.subscribe(&sub_name).await?;
    /// # context.publish(&sub_name, "foo").await?;
    /// # context.publish(&sub_name, "bar").await?;
    /// # context.publish(&sub_name, "baz").await?;
    ///
    /// subscription.drain().await?;
    ///
    /// # // there are no more messages in subscription
    /// # assert!(subscription.next_timeout(Duration::from_secs(2)).await.is_err(), "emtpy");
    /// # Ok(())
    /// # }
    /// ```
    pub async fn drain(&mut self) -> io::Result<()> {
        // Unsubscribe
        self.0
            .context
            .connection
            .0
            .client
            .flush(DEFAULT_FLUSH_TIMEOUT)
            .await?;

        self.0
            .context
            .connection
            .0
            .client
            .unsubscribe(self.0.sid.load(Ordering::Relaxed))
            .await?;

        // Discard all queued messages.
        while self.0.messages.try_recv().await.is_some() {}

        // Delete the consumer, if we own it.
        if self.0.consumer_ownership == ConsumerOwnership::Yes {
            self.0
                .context
                .delete_consumer(&self.0.stream, &self.0.consumer)
                .await
                .ok();
        }

        Ok(())
    }
}

/// A `Handler` may be used to unsubscribe a handler thread.
pub struct Handler {
    subscription: PushSubscription,
}

impl Handler {
    /// Unsubscribe a subscription.
    ///
    /// # Example
    /// ```
    /// # #[tokio::main]
    /// # async fn main() -> std::io::Result<()> {
    /// # let nc = nats_aflowt::connect("127.0.0.1:14222").await?;
    /// let sub = nc.subscribe("foo").await?.with_handler(move |msg| {
    ///     println!("Received {}", &msg);
    ///     Ok(())
    /// });
    /// sub.unsubscribe().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn unsubscribe(&mut self) -> io::Result<()> {
        self.subscription.drain().await
    }
}