pulsar 6.7.2

Rust client for Apache Pulsar
Documentation
use std::{io::ErrorKind, sync::Arc, time::Instant};

use futures::channel::mpsc::{self, UnboundedReceiver};

use crate::{
    connection::Connection,
    error::{ConnectionError, ConsumerError},
    message::Message,
    proto, BrokerAddress, ConsumerOptions, Error, Executor, ProducerOptions, Pulsar, SubType,
};

pub async fn handle_retry_error<Exe: Executor>(
    client: &Pulsar<Exe>,
    connection: &mut Arc<Connection<Exe>>,
    addr: &mut BrokerAddress,
    topic: &str,
    operation_name: &str,
    current_retries: u32,
    err: ConnectionError,
) -> Result<(), Error> {
    let operation_retry_options = &client.operation_retry_options;
    let (kind, text) = match err {
        ConnectionError::PulsarError(Some(kind), ref text)
            if matches!(
                kind,
                proto::ServerError::ServiceNotReady
                    | proto::ServerError::ConsumerBusy
                    | proto::ServerError::ProducerBusy
                    | proto::ServerError::ProducerBlockedQuotaExceededError
                    | proto::ServerError::ProducerBlockedQuotaExceededException
            ) =>
        {
            (
                kind.as_str_name().to_owned(),
                text.as_ref()
                    .map(|text| format!(" (\"{text}\")"))
                    .unwrap_or_default(),
            )
        }
        ConnectionError::Io(ref kind)
            if matches!(
                kind.kind(),
                ErrorKind::ConnectionReset
                    | ErrorKind::ConnectionAborted
                    | ErrorKind::NotConnected
                    | ErrorKind::BrokenPipe
                    | ErrorKind::TimedOut
                    | ErrorKind::Interrupted
                    | ErrorKind::UnexpectedEof
            ) =>
        {
            (kind.kind().to_string(), "".to_owned())
        }
        err => {
            error!("{operation_name}({topic}) error: {err:?}");
            return Err(err.into());
        }
    };
    if !(operation_retry_options.allow_retry(current_retries)) {
        error!("{operation_name}({topic}) answered {kind}{text}, reached max retries");
        return Err(err.into());
    }
    error!(
        "{operation_name}({topic}) answered {kind}{text}, retrying request after {:?} (max_retries = {:?})",
        operation_retry_options.retry_delay,
        operation_retry_options.max_retries
    );
    client
        .executor
        .delay(operation_retry_options.retry_delay)
        .await;

    *addr = client.lookup_topic(topic).await?;
    *connection = client.manager.get_connection(addr).await?;
    Ok(())
}

pub async fn retry_subscribe_consumer<Exe: Executor>(
    client: &Pulsar<Exe>,
    connection: &mut Arc<Connection<Exe>>,
    mut addr: BrokerAddress,
    topic: &str,
    subscription: &str,
    sub_type: SubType,
    consumer_id: u64,
    consumer_name: &Option<String>,
    options: &ConsumerOptions,
    batch_size: u32,
) -> Result<UnboundedReceiver<Message>, Error> {
    *connection = client.manager.get_connection(&addr).await?;
    let (resolver, messages) = mpsc::unbounded();
    let mut current_retries = 0u32;
    let start = Instant::now();

    loop {
        warn!(
            "Retry #{current_retries} -> connecting consumer {consumer_id} using connection {:#} to broker {:#} to topic {topic}",
            connection.id(),
            addr.url,
        );
        match connection
            .sender()
            .subscribe(
                resolver.clone(),
                topic.to_owned(),
                subscription.to_owned(),
                sub_type,
                consumer_id,
                consumer_name.clone(),
                options.clone(),
            )
            .await
        {
            Ok(_) => {
                if current_retries > 0 {
                    let dur = (Instant::now() - start).as_secs();
                    info!(
                        "TopicConsumer::subscribe({topic}) success after {} retries over {dur} seconds",
                        current_retries + 1,
                    );
                }
                break;
            }
            Err(err) => {
                handle_retry_error(
                    client,
                    connection,
                    &mut addr,
                    topic,
                    "TopicConsumer::subscribe",
                    current_retries,
                    err,
                )
                .await?
            }
        }
        current_retries += 1;
    }
    connection
        .sender()
        .send_flow(consumer_id, batch_size)
        .await
        .map_err(|err| {
            error!("TopicConsumer::send_flow({topic}) error: {err:?}");
            Error::Consumer(ConsumerError::Connection(err))
        })?;

    Ok(messages)
}

pub async fn retry_create_producer<Exe: Executor>(
    client: &Pulsar<Exe>,
    connection: &mut Arc<Connection<Exe>>,
    mut addr: BrokerAddress,
    topic: &String,
    producer_id: u64,
    producer_name: Option<String>,
    options: &ProducerOptions,
) -> Result<(String, Option<Vec<u8>>), Error> {
    *connection = client.manager.get_connection(&addr).await?;
    let mut current_retries = 0u32;
    let start = Instant::now();

    loop {
        warn!(
            "Retry #{current_retries} -> connecting producer {producer_id} using connection {:#} to broker {:#} to topic {topic}",
            connection.id(),
            addr.url,
        );
        match connection
            .sender()
            .create_producer(
                topic.clone(),
                producer_id,
                producer_name.clone(),
                options.clone(),
            )
            .await
        {
            Ok(partial_success) => {
                let mut schema_version = partial_success.schema_version;
                // If producer is not "ready", the client will avoid to timeout the request
                // for creating the producer. Instead it will wait indefinitely until it gets
                // a subsequent  `CommandProducerSuccess` with `producer_ready==true`.
                if let Some(producer_ready) = partial_success.producer_ready {
                    if !producer_ready {
                        // wait until next commandproducersuccess message has been received
                        trace!("TopicProducer::create({topic}) waiting for exclusive access");
                        let result = connection
                            .sender()
                            .wait_for_exclusive_access(partial_success.request_id)
                            .await;
                        trace!("TopicProducer::create({topic}) received: {result:?}");
                        match result {
                            Ok(success) => {
                                if let Some(new_sv) = success.schema_version {
                                    if schema_version.as_ref() != Some(&new_sv) {
                                        debug!(
                                            "TopicProducer::create({topic}) schema_version \
                                             updated after exclusive access wait"
                                        );
                                        schema_version = Some(new_sv);
                                    }
                                } else if schema_version.is_some() {
                                    warn!(
                                        "TopicProducer::create({topic}) final \
                                         CommandProducerSuccess has no schema_version, \
                                         keeping initial value"
                                    );
                                }
                            }
                            Err(e) => {
                                error!(
                                    "TopicProducer::create({topic}) wait_for_exclusive_access \
                                     failed: {e:?}, proceeding with initial schema_version"
                                );
                            }
                        }
                    }
                }
                if current_retries > 0 {
                    let dur = (std::time::Instant::now() - start).as_secs();
                    log::info!(
                        "TopicProducer::create({topic}) success after {} retries over {dur} seconds",
                        current_retries + 1,
                    );
                }
                return Ok((partial_success.producer_name, schema_version));
            }
            Err(err) => {
                handle_retry_error(
                    client,
                    connection,
                    &mut addr,
                    topic,
                    "TopicProducer::create",
                    current_retries,
                    err,
                )
                .await?
            }
        }
        current_retries += 1;
    }
}