// Copyright 2020-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{
connector::{Connector, NatsStream, ServerAddress},
header::HeaderMap,
inject_delay, inject_io_failure,
message::Message,
proto::{self, ClientOp, ServerOp},
BoxFuture, Options, ServerInfo,
};
#[cfg(not(feature = "otel"))]
use log::{debug, error};
use std::{
collections::{HashMap, HashSet, VecDeque},
fmt,
io::{self, Error, ErrorKind},
mem,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{Context, Poll},
time::{Duration, Instant},
};
use tokio::{
io::{AsyncWrite, AsyncWriteExt, BufReader, BufWriter},
sync::Mutex,
};
#[cfg(feature = "otel")]
use tracing::{debug, error};
/// Capacity, in bytes, of the buffered reader and writer created for each
/// connection. `try_publish` also uses it to work out the free writer space.
const BUF_CAPACITY: usize = 32 * 1024;
/// Maximum number of messages queued per subscription before backpressure
/// is applied (it is the capacity of each subscription's message channel).
const MAX_SUBSCRIPTION_QUEUE: usize = 650;
/// Upper bound, in seconds, on how long `Client::flush` waits for the PONG
/// that answers its PING.
const PING_FLUSH_TIMEOUT_SEC: u64 = 40;
/// How long the flusher task waits for a flush request before it runs its
/// keep-alive check, and how long the server may stay silent before that
/// check sends a PING.
// TODO(dlc) - Make configurable.
const PING_INTERVAL: Duration = Duration::from_secs(20); // was 120 sec
/// Number of outstanding PINGs at which the flusher task drops the
/// connection.
const MAX_PINGS_OUT: u8 = 2;
/// Client state.
///
/// NB: locking protocol - whenever both `write` and `read` are needed,
/// `write` must ALWAYS be locked first and released last.
/// Failure to follow this strict rule WILL create
/// a deadlock!
struct State {
/// State needed for sending: the writer, the reconnect buffer and the
/// subscription ID counter.
write: Mutex<WriteState>,
/// State touched while processing server operations: subscriptions,
/// expected PONGs and connection-health counters.
read: Mutex<ReadState>,
/// Bookkeeping for muted subscriptions.
meta: Mutex<MetaState>,
}
/// Auxiliary client state guarded by its own mutex.
struct MetaState {
/// IDs of the subscriptions that are currently muted. The dispatcher
/// discards incoming messages whose subscription ID is in this set.
mutes: HashSet<u64>,
}
/// Everything needed to send operations to the server.
struct WriteState {
/// Buffered writer with an active connection.
///
/// When `None`, the client is either reconnecting or closed.
writer: Option<BufWriter<NatsStream>>,
/// Signals to the flusher task that the writer needs a flush.
///
/// The channel is bounded, so a failed `try_send` only means that a flush
/// request is already queued.
flush_kicker: tokio::sync::mpsc::Sender<()>,
/// The reconnect buffer.
///
/// When the client is reconnecting, PUB messages get buffered here. When
/// the connection is re-established, contents of the buffer are
/// flushed to the server.
buffer: Buffer,
/// Next subscription ID. Starts at 1 and is incremented for every new
/// subscription or resubscription.
next_sid: u64,
}
/// State updated while operations from the server are processed.
struct ReadState {
/// Current subscriptions, keyed by subscription ID.
subscriptions: HashMap<u64, Subscription>,
/// Expected pongs and their notification channels, in the order the
/// corresponding PINGs were issued. Clearing the queue drops the senders,
/// which wakes the waiting receivers with a closed-channel result.
pongs: VecDeque<tokio::sync::mpsc::Sender<()>>,
/// Tracks the last activity from the server.
last_active: Instant,
/// Used for client side monitoring of connection health: counts PINGs
/// sent by the flusher task and is reset when a PONG arrives.
pings_out: u8,
}
/// A hook that inspects the messages of a subscription as they arrive over
/// the wire, before they are handed to the subscriber.
pub(crate) trait Preprocessor: Send + Sync {
/// Examines `msg`, which arrived for the subscription with ID `sid`.
///
/// The dispatcher discards the message when the returned future resolves
/// to `true` and forwards it to the subscriber when it resolves to `false`.
fn process<'proc>(&'proc self, sid: u64, msg: &'proc Message) -> BoxFuture<'proc, bool>;
}
/// The default [`Preprocessor`]: it never filters anything out.
#[derive(Debug, Default, Clone)]
struct NoProcessing {}

impl Preprocessor for NoProcessing {
    /// Always resolves to `false`, so every message is forwarded to the
    /// subscriber.
    fn process(&self, _sid: u64, _msg: &Message) -> BoxFuture<'static, bool> {
        Box::pin(std::future::ready(false))
    }
}
/// A registered subscription.
struct Subscription {
/// Subject sent in the SUB operation; it is re-sent on every reconnect.
subject: String,
/// Optional queue group sent along with the subject.
queue_group: Option<String>,
/// Sending half of the channel that carries messages to the subscriber.
messages: tokio::sync::mpsc::Sender<Message>,
/// Hook run on every incoming message before it is queued; a `true`
/// result makes the dispatcher discard the message.
preprocess: Pin<Box<dyn Preprocessor>>,
}
/// A NATS client.
///
/// Cloning is cheap: every field is reference counted, so all clones share
/// the same connection state.
#[derive(Clone)]
pub struct Client {
/// Shared client state.
state: Arc<State>,
/// Server info provided by the last INFO message.
pub(crate) server_info: Arc<Mutex<ServerInfo>>,
/// Set to `true` if shutdown has been requested.
shutdown: Arc<AtomicBool>,
/// The options that this `Client` was created using.
pub(crate) options: Arc<Options>,
}
impl Client {
/// Creates a new client that will begin connecting in the background.
///
/// Two tasks are spawned:
///
/// - the connection task, which runs [`Client::run`] and, once that
///   returns, shuts the writer down and invokes the close callback;
/// - the flusher task, which flushes the writer whenever it is kicked and
///   performs the PING based keep-alive check when idle for `PING_INTERVAL`.
///
/// The call resolves as soon as the connection task completes (or drops)
/// the initial PONG notification.
///
/// # Errors
///
/// Returns an error if the connector cannot be created, or if the
/// connection task finishes with an error before the initial notification.
///
/// # Panics
///
/// Panics if the connection task dies without reporting a result.
pub(crate) async fn connect(urls: Vec<ServerAddress>, options: Options) -> io::Result<Client> {
    crate::init_tracing();

    // A channel for coordinating flushes.
    let (flush_kicker, mut flush_wanted) = tokio::sync::mpsc::channel(10);

    // Channels for coordinating initial connect.
    let (run_sender, run_receiver) = tokio::sync::oneshot::channel();
    let (pong_sender, mut pong_receiver) = tokio::sync::mpsc::channel(10);

    // The client state.
    let _client = Client {
        state: Arc::new(State {
            meta: Mutex::new(MetaState {
                mutes: HashSet::new(),
            }),
            write: Mutex::new(WriteState {
                writer: None,
                flush_kicker,
                buffer: Buffer::new(options.reconnect_buffer_size),
                next_sid: 1,
            }),
            read: Mutex::new(ReadState {
                subscriptions: HashMap::new(),
                // The first expected "PONG" is completed by `reconnect`
                // and doubles as the initial-connect notification.
                pongs: VecDeque::from(vec![pong_sender]),
                last_active: Instant::now(),
                pings_out: 0,
            }),
        }),
        server_info: Arc::new(Mutex::new(ServerInfo::default())),
        shutdown: Arc::new(AtomicBool::new(false)),
        options: Arc::new(options),
    };

    let options = _client.options.clone();

    // Connector for creating the initial connection and reconnecting when
    // it is broken.
    let connector = Connector::new(urls, options.clone()).await?;

    // Spawn the async task responsible for:
    // - Maintaining a connection to the server and reconnecting when it is
    //   broken.
    // - Reading messages from the server and processing them.
    // - Forwarding MSG operations to subscribers.
    let client = _client.clone();
    let opt = options.clone();
    tokio::spawn(async move {
        {
            let res = client.run(connector).await;
            run_sender.send(res).ok();

            // One final flush before shutting down.
            // This way we make sure buffered published messages reach the
            // server.
            {
                let mut write = client.state.write.lock().await;
                if let Some(writer) = write.writer.as_mut() {
                    writer.shutdown().await.ok();
                }
            }

            opt.close_callback.call().await;
        }
    });

    tokio::select! {
        res = run_receiver => {
            res.expect("client thread has panicked")?;
            unreachable!()
        }
        _ = pong_receiver.recv() => { }
    };

    // Spawn a task that periodically flushes buffered messages.
    //
    // NOTE(review): the loop below has no exit condition, so this task (and
    // the `State` it holds) stays alive after the client has been closed.
    let client = _client.clone();
    tokio::spawn(async move {
        // Minimum spacing between two consecutive flushes.
        const MIN_FLUSH_BETWEEN: Duration = Duration::from_millis(5);

        // Time of the most recent flush attempt; initialised in the past so
        // the very first flush is not delayed.
        let mut last = Instant::now() - MIN_FLUSH_BETWEEN;

        loop {
            // Wait until a flush is requested or the ping interval elapses.
            if tokio::time::timeout(PING_INTERVAL, flush_wanted.recv())
                .await
                .is_ok()
            {
                // Throttle: keep at least `MIN_FLUSH_BETWEEN` between flushes
                // so bursts of small writes are batched together.
                let since = last.elapsed();
                if since < MIN_FLUSH_BETWEEN {
                    tokio::time::sleep(MIN_FLUSH_BETWEEN - since).await;
                }

                // Flush the writer.
                let mut write = client.state.write.lock().await;
                let mut read = client.state.read.lock().await;
                if let Some(writer) = write.writer.as_mut() {
                    let flushed = writer.flush().await;
                    // Record every flush attempt, successful or not;
                    // otherwise the throttle above never takes effect.
                    last = Instant::now();

                    // If flushing fails, disconnect.
                    if flushed.is_err() {
                        let _ = writer.shutdown().await;
                        write.writer = None;
                        read.pongs.clear();
                    }
                }
                // NB see locking protocol for state.write and state.read
                drop(read);
                drop(write);
            } else {
                // Timed out waiting for a flush request: check whether the
                // connection is stale or whether a PING is due.
                let mut write = client.state.write.lock().await;
                let mut read = client.state.read.lock().await;

                if read.pings_out >= MAX_PINGS_OUT {
                    // Too many unanswered PINGs: drop the connection. The
                    // result is ignored because the writer is discarded
                    // either way.
                    if let Some(writer) = write.writer.as_mut() {
                        let _ = writer.get_mut().shutdown().await;
                    }
                    write.writer = None;
                    read.pongs.clear();
                } else if read.last_active.elapsed() > PING_INTERVAL {
                    read.pings_out += 1;
                    // The matching PONG simply kicks the flusher.
                    read.pongs.push_back(write.flush_kicker.clone());

                    // Send out a PING here.
                    if let Some(mut writer) = write.writer.as_mut() {
                        // Ok to ignore errors here.
                        let _ = proto::encode(&mut writer, ClientOp::Ping).await;
                        if writer.flush().await.is_err() {
                            // NB see locking protocol for state.write and state.read
                            writer.shutdown().await.ok();
                            write.writer = None;
                            read.pongs.clear();
                        }
                    }
                }

                // NB see locking protocol for state.write and state.read
                drop(read);
                drop(write);
            }
        }
    });

    Ok(_client)
}
/// Returns a copy of the server info stored for this client, i.e. the value
/// most recently written by the connection logic.
pub async fn server_info(&self) -> ServerInfo {
    let info = self.server_info.lock().await;
    info.clone()
}
/// Makes a round trip to the server to ensure buffered messages reach it.
pub(crate) async fn flush(&self, timeout: Duration) -> io::Result<()> {
let mut pong = {
// Inject random delays when testing.
inject_delay().await;
let mut write = self.state.write.lock().await;
// Check if the client is closed.
self.check_shutdown()?;
let (sender, receiver) = tokio::sync::mpsc::channel(1);
// If connected, send a PING.
match write.writer.as_mut() {
None => {}
Some(mut writer) => {
// uses timeout for duration, not per-write
tokio::time::timeout(timeout, async {
if let Ok(()) = proto::encode(&mut writer, ClientOp::Ping).await {
let _ = writer.flush().await;
}
})
.await?;
}
}
if let Err(e) = write.flush_kicker.send(()).await {
debug!("error triggering flush {}", e);
}
// Enqueue an expected PONG.
let mut read = self.state.read.lock().await;
read.pongs.push_back(sender);
// NB see locking protocol for state.write and state.read
drop(read);
drop(write);
receiver
};
// Wait until the PONG operation is received.
match tokio::time::timeout(Duration::from_secs(PING_FLUSH_TIMEOUT_SEC), pong.recv()).await {
Err(_) => {
// if socket is closed, don't fail
//error!("ping flush timed out after {} sec", PING_FLUSH_TIMEOUT_SEC);
//Err(Error::new(ErrorKind::ConnectionReset, "flush failed"))
Ok(())
}
Ok(Some(())) => Ok(()),
Ok(None) => {
error!("ping sender quit unexpectedly");
Err(Error::new(
ErrorKind::ConnectionReset,
"flush failed - ping sender quit",
))
}
}
}
/// Closes the client.
///
/// Only the first call does any work: it unsubscribes every subscription
/// (best effort), flushes the writer and discards all pending PONG
/// notifications. Later calls return without touching the connection.
pub(crate) async fn close(&self) {
    // Inject random delays when testing.
    inject_delay().await;

    let mut write = self.state.write.lock().await;
    let mut read = self.state.read.lock().await;

    // Somebody else already initiated the shutdown: nothing left to do.
    if !self.shutdown() {
        return;
    }

    // Take every subscription out of the map and tell the server about it.
    let removed = mem::take(&mut read.subscriptions);
    for (sid, _) in removed {
        // Send an UNSUB message and ignore errors.
        if let Some(writer) = write.writer.as_mut() {
            proto::encode(
                writer,
                ClientOp::Unsub {
                    sid,
                    max_msgs: None,
                },
            )
            .await
            .ok();
            write.flush_kicker.try_send(()).ok();
        }
    }

    // Flush the writer in case there are buffered messages.
    if let Some(writer) = write.writer.as_mut() {
        writer.flush().await.ok();
    }

    // Wake up all pending flushes.
    read.pongs.clear();

    // NB see locking protocol for state.write and state.read
    drop(read);
    drop(write);
}
/// Marks the client as shut down without waiting for anything to finish.
///
/// Returns `true` only for the call that actually flipped the flag, i.e.
/// the first shutdown request.
pub(crate) fn shutdown(&self) -> bool {
    let flipped =
        self.shutdown
            .compare_exchange(false, true, Ordering::Release, Ordering::Relaxed);
    matches!(flipped, Ok(_))
}
/// Fails with `NotConnected` once shutdown has been requested.
fn check_shutdown(&self) -> io::Result<()> {
    if !self.shutdown.load(Ordering::Acquire) {
        return Ok(());
    }
    Err(Error::new(ErrorKind::NotConnected, "the client is closed"))
}
/// Subscribes to a subject.
pub(crate) async fn subscribe(
&self,
subject: &str,
queue_group: Option<String>,
) -> io::Result<(u64, crate::subscription::SubscriptionReceiver<Message>)> {
// Inject random delays when testing.
self.subscribe_with_preprocessor(
subject.to_string(),
queue_group,
Box::pin(NoProcessing::default()),
)
.await
}
/// Subscribe to a subject with a message preprocessor.
///
/// Allocates a fresh subscription ID, sends a SUB operation if a connection
/// is currently available, and registers the subscription so it can be
/// re-established on reconnect. Returns the subscription ID and the
/// receiving end of its message queue.
///
/// # Errors
///
/// Fails if the client is closed or if encoding the SUB operation fails.
pub(crate) async fn subscribe_with_preprocessor(
    &self,
    subject: String,
    queue_group: Option<String>,
    message_processor: Pin<Box<dyn Preprocessor>>,
) -> io::Result<(u64, crate::subscription::SubscriptionReceiver<Message>)> {
    // Inject random delays when testing.
    inject_delay().await;

    let mut write = self.state.write.lock().await;
    let mut read = self.state.read.lock().await;

    // Check if the client is closed.
    self.check_shutdown()?;

    // Reserve the next subscription ID.
    let sid = write.next_sid;
    write.next_sid += 1;

    // If connected, send a SUB operation.
    if let Some(writer) = write.writer.as_mut() {
        let sub_op = ClientOp::Sub {
            subject: &subject,
            queue_group: queue_group.as_deref(),
            sid,
        };
        proto::encode(writer, sub_op).await?;
        write.flush_kicker.try_send(()).ok();
    }

    // Register the subscription in the hash map.
    let (sender, receiver) = tokio::sync::mpsc::channel(MAX_SUBSCRIPTION_QUEUE);
    let registration = Subscription {
        subject,
        queue_group,
        messages: sender,
        preprocess: message_processor,
    };
    read.subscriptions.insert(sid, registration);

    // NB see locking protocol for state.write and state.read
    drop(read);
    drop(write);

    Ok((sid, receiver.into()))
}
/// Marks a subscription as muted.
///
/// Returns `Ok(true)` if the subscription was not muted before and
/// `Ok(false)` if it already was.
pub(crate) async fn mute(&self, sid: u64) -> io::Result<bool> {
    let newly_muted = self.state.meta.lock().await.mutes.insert(sid);
    Ok(newly_muted)
}
/// Resubscribes an existing subscription by unsubscribing from the old subject and subscribing
/// to the new subject returning a new sid while retaining the existing channel receiver.
///
/// The stored subscription is updated to `new_subject`, so a later
/// reconnect re-subscribes to the new subject rather than the old one.
///
/// # Errors
///
/// Fails if the client is closed, if `old_sid` is not a registered
/// subscription, or if encoding the UNSUB/SUB operations fails.
pub(crate) async fn resubscribe(&self, old_sid: u64, new_subject: &str) -> io::Result<u64> {
    // Inject random delays when testing.
    inject_delay().await;

    let mut write = self.state.write.lock().await;
    let mut read = self.state.read.lock().await;

    // Check if the client is closed.
    self.check_shutdown()?;

    let mut subscription = read
        .subscriptions
        .remove(&old_sid)
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "subscription not found"))?;

    // Generate a subscription ID.
    let new_sid = write.next_sid;
    write.next_sid += 1;

    // Send the UNSUB for the old subscription.
    if let Some(writer) = write.writer.as_mut() {
        proto::encode(
            writer,
            ClientOp::Unsub {
                sid: old_sid,
                max_msgs: None,
            },
        )
        .await?;
    }

    // Remember the new subject: `reconnect` replays SUB operations from the
    // stored subscriptions and must not fall back to the old subject.
    subscription.subject = new_subject.to_string();
    let queue_group = subscription.queue_group.clone();
    read.subscriptions.insert(new_sid, subscription);

    // Send the SUB for the new subject.
    if let Some(writer) = write.writer.as_mut() {
        proto::encode(
            writer,
            ClientOp::Sub {
                sid: new_sid,
                subject: new_subject,
                queue_group: queue_group.as_deref(),
            },
        )
        .await?;
        write.flush_kicker.try_send(()).ok();
    }

    // NB see locking protocol for state.write and state.read
    drop(read);
    drop(write);

    Ok(new_sid)
}
/// Unsubscribes from a subject.
///
/// Removes the subscription with ID `sid`, forgets any mute marker recorded
/// for it and, if connected, sends an UNSUB operation. Unsubscribing an
/// unknown or already removed subscription is not an error.
///
/// # Errors
///
/// Fails if encoding the UNSUB operation fails.
pub(crate) async fn unsubscribe(&self, sid: u64) -> io::Result<()> {
    // Inject random delays when testing.
    inject_delay().await;

    let mut write = self.state.write.lock().await;
    let mut read = self.state.read.lock().await;

    // Subscription IDs are never reused, so a mute marker left behind for
    // this one would stay in the set forever.
    self.state.meta.lock().await.mutes.remove(&sid);

    // Remove the subscription from the map.
    if read.subscriptions.remove(&sid).is_none() {
        // already unsubscribed
        // NB see locking protocol for state.write and state.read
        drop(read);
        drop(write);
        return Ok(());
    }

    // Send an UNSUB message.
    if let Some(writer) = write.writer.as_mut() {
        let max_msgs = None;
        proto::encode(writer, ClientOp::Unsub { sid, max_msgs }).await?;
        write.flush_kicker.try_send(()).ok();
    }

    // NB see locking protocol for state.write and state.read
    drop(read);
    drop(write);

    Ok(())
}
/// Publishes a message with optional reply subject and headers.
///
/// While connected the message is written into the connection's buffered
/// writer and the flusher is kicked; while disconnected it is appended to
/// the reconnect buffer instead.
///
/// # Errors
///
/// - `InvalidInput` if headers are supplied but the server does not
///   support them.
/// - `NotConnected` if the client is closed.
/// - Any error from encoding the operation; a failed write to the live
///   connection also drops that connection.
///
/// # Panics
///
/// Panics if the reconnect buffer is non-empty while a connection is
/// active, which would violate an internal invariant.
pub async fn publish(
    &self,
    subject: &str,
    reply_to: Option<&str>,
    headers: Option<&HeaderMap>,
    msg: &[u8],
) -> io::Result<()> {
    // Inject random delays when testing.
    inject_delay().await;

    // Headers can only be sent to servers that advertise support for them.
    {
        let info = self.server_info.lock().await;
        if headers.is_some() && !info.headers {
            return Err(Error::new(
                ErrorKind::InvalidInput,
                "the server does not support headers",
            ));
        }
    }

    // Check if the client is closed.
    self.check_shutdown()?;

    let op = match headers {
        Some(headers) => ClientOp::Hpub {
            subject,
            reply_to,
            payload: msg,
            headers,
        },
        None => ClientOp::Pub {
            subject,
            reply_to,
            payload: msg,
        },
    };

    let mut write = self.state.write.lock().await;
    let buffered_bytes = write.buffer.written;

    match write.writer.as_mut() {
        Some(mut writer) => {
            assert_eq!(buffered_bytes, 0);

            // Connected: write into the live writer.
            let outcome = proto::encode(&mut writer, op).await;

            // If writing fails, disconnect.
            if outcome.is_err() {
                write.writer = None;

                // NB see locking protocol for state.write and state.read
                let mut read = self.state.read.lock().await;
                read.pongs.clear();
            }

            write.flush_kicker.try_send(()).ok();
            outcome
        }
        None => {
            // Reconnecting: stash the message in the reconnect buffer.
            proto::encode(&mut write.buffer, op).await?;
            write.buffer.flush().await?;
            Ok(())
        }
    }
}
/// Attempts to publish a message without blocking.
///
/// This only works when the write buffer has enough space to encode the
/// whole message.
///
/// Returns `None` when the message cannot be published right now without
/// waiting: either the write state is locked by another task or the
/// writer's buffer lacks room for the message. Otherwise returns
/// `Some(result)` with the outcome of the publish attempt.
pub async fn try_publish(
    &self,
    subject: &str,
    reply_to: Option<&str>,
    headers: Option<&HeaderMap>,
    msg: &[u8],
) -> Option<io::Result<()>> {
    // Check if the client is closed.
    if let Err(e) = self.check_shutdown() {
        return Some(Err(e));
    }

    // Estimate how many bytes the message will consume when written into
    // the stream. We must make a conservative guess: it's okay to
    // overestimate but not to underestimate.
    let mut estimate = 1024 + subject.len() + reply_to.map_or(0, str::len) + msg.len();
    if let Some(headers) = headers {
        estimate += headers
            .iter()
            .map(|(k, v)| k.len() + v.len() + 3)
            .sum::<usize>();
    }

    let op = if let Some(headers) = headers {
        ClientOp::Hpub {
            subject,
            reply_to,
            payload: msg,
            headers,
        }
    } else {
        ClientOp::Pub {
            subject,
            reply_to,
            payload: msg,
        }
    };

    let mut write = match self.state.write.try_lock() {
        Ok(guard) => guard,
        // The lock is held elsewhere; waiting for it would block, which is
        // a "cannot publish right now" condition rather than a failure.
        Err(_) => return None,
    };

    match write.writer.as_mut() {
        None => {
            // If reconnecting, write into the buffer.
            Some(match proto::encode(&mut write.buffer, op).await {
                Ok(()) => write.buffer.flush().await,
                Err(e) => Err(e),
            })
        }
        Some(mut writer) => {
            // Check if there's enough space in the buffer to encode the
            // whole message.
            if BUF_CAPACITY - writer.buffer().len() < estimate {
                return None;
            }

            // If connected, write into the writer. This is not going to
            // block because there's enough space in the buffer.
            let res = proto::encode(&mut writer, op).await;
            write.flush_kicker.try_send(()).ok();

            // If writing fails, disconnect.
            if res.is_err() {
                write.writer = None;

                // NB see locking protocol for state.write and state.read
                let mut read = self.state.read.lock().await;
                read.pongs.clear();
            }
            Some(res)
        }
    }
}
/// Runs the loop that connects and reconnects the client.
///
/// Each iteration establishes a connection, installs it via
/// [`Client::reconnect`] and dispatches server operations until the
/// connection breaks. The loop ends with `Ok(())` once the client has been
/// closed, or with the connector's error if no connection can be made.
async fn run(&self, mut connector: Connector) -> io::Result<()> {
    // Backoff is skipped for the very first attempt only.
    let mut is_reconnect = false;

    loop {
        // Make a connection to the server.
        let (info, stream) = connector.connect(is_reconnect).await?;
        self.process_info(&info, &connector).await;

        let reader = BufReader::with_capacity(BUF_CAPACITY, stream.clone());
        let writer = BufWriter::with_capacity(BUF_CAPACITY, stream);

        // Set up the new connection for this client.
        if self.reconnect(info, writer).await.is_ok() {
            // Connected! Now dispatch MSG operations.
            if is_reconnect {
                connector.get_options().reconnect_callback.call().await;
            }

            match self.dispatch(reader, &mut connector).await {
                // If the client stopped gracefully, return.
                Ok(()) => return Ok(()),
                Err(_) => {
                    connector.get_options().disconnect_callback.call().await;
                    self.state.write.lock().await.writer = None;
                }
            }
        }

        // Clear our pings_out.
        self.state.read.lock().await.pings_out = 0;

        // Inject random delays when testing.
        inject_delay().await;

        // Quit if the client is closed.
        if self.check_shutdown().is_err() {
            return Ok(());
        }

        is_reconnect = true;
    }
}
/// Puts the client back into connected state with the given writer.
///
/// Replays a SUB operation for every registered subscription, writes out
/// whatever accumulated in the reconnect buffer, stores `server_info`,
/// installs `writer` as the active writer and finally completes all
/// pending PONG notifications.
///
/// # Errors
///
/// Fails if the client is closed or if any write to the new connection
/// fails; in that case the client stays without a writer.
async fn reconnect(
    &self,
    server_info: ServerInfo,
    mut writer: BufWriter<NatsStream>,
) -> io::Result<()> {
    // Inject random delays when testing.
    inject_delay().await;

    // Check if the client is closed.
    self.check_shutdown()?;

    let mut write = self.state.write.lock().await;
    let mut read = self.state.read.lock().await;

    // Drop the current writer, if there is one.
    write.writer = None;

    // Inject random I/O failures when testing.
    inject_io_failure()?;

    // Restart subscriptions that existed before the last reconnect.
    for (&sid, sub) in read.subscriptions.iter() {
        let resubscribe = ClientOp::Sub {
            subject: sub.subject.as_str(),
            queue_group: sub.queue_group.as_deref(),
            sid,
        };
        proto::encode(&mut writer, resubscribe).await?;
    }

    // Take out expected PONGs.
    let pending_pongs = mem::take(&mut read.pongs);

    // Take out buffered operations and replay them on the new writer.
    let backlog = write.buffer.clear();
    writer.write_all(backlog).await?;
    writer.flush().await?;

    // All good, continue with this connection.
    *self.server_info.lock().await = server_info;
    write.writer = Some(writer);

    // Complete PONGs because the connection is healthy.
    for waiter in pending_pongs {
        waiter.try_send(()).ok();
    }

    // NB see locking protocol for state.write and state.read
    drop(read);
    drop(write);

    Ok(())
}
/// Performs the actions required by a freshly received server info: at
/// present this only invokes the lame duck callback when the server
/// reports lame duck mode.
async fn process_info(&self, server_info: &ServerInfo, connector: &Connector) {
    if !server_info.lame_duck_mode {
        return;
    }
    connector.get_options().lame_duck_callback.call().await;
}
/// Records "now" as the moment the server was last heard from.
async fn update_activity(&self) {
    let mut state = self.state.read.lock().await;
    state.last_active = Instant::now();
}
/// Reads messages from the server and dispatches them to subscribers.
async fn dispatch(
&self,
mut reader: impl tokio::io::AsyncBufRead + std::marker::Unpin,
connector: &mut Connector,
) -> io::Result<()> {
// Handle operations received from the server.
while let Some(op) = proto::decode(&mut reader).await? {
// Inject random delays when testing.
inject_delay().await;
if self.check_shutdown().is_err() {
break;
}
// Track activity.
self.update_activity().await;
match op {
ServerOp::Info(server_info) => {
for url in &server_info.connect_urls {
connector.add_server(url.parse()?);
}
self.process_info(&server_info, connector).await;
*self.server_info.lock().await = server_info;
}
ServerOp::Ping => {
// Respond with a PONG if connected.
let mut write = self.state.write.lock().await;
let read = self.state.read.lock().await;
if let Some(w) = write.writer.as_mut() {
proto::encode(w, ClientOp::Pong).await?;
write.flush_kicker.try_send(()).ok();
}
// NB see locking protocol for state.write and state.read
drop(read);
drop(write);
}
ServerOp::Pong => {
// If a PONG is received while disconnected, it came from a
// connection that isn't alive anymore and therefore doesn't
// correspond to the next expected PONG.
let write = self.state.write.lock().await;
let mut read = self.state.read.lock().await;
// Clear any outstanding pings.
read.pings_out = 0;
if write.writer.is_some() {
// Take the next expected PONG and complete it by
// sending a message.
if let Some(pong) = read.pongs.pop_front() {
pong.try_send(()).ok();
}
}
// NB see locking protocol for state.write and state.read
drop(read);
drop(write);
}
ServerOp::Msg {
subject,
sid,
reply_to,
payload,
} => {
// Ignore muted subscriptions
if self.state.meta.lock().await.mutes.get(&sid).is_some() {
continue;
}
let read = self.state.read.lock().await;
// Send the message to matching subscription.
if let Some(subscription) = read.subscriptions.get(&sid) {
let msg = Message {
subject,
reply: reply_to,
data: payload,
headers: None,
client: Some(self.clone()),
double_acked: Default::default(),
};
// Preprocess and drop the message from the buffer if it the predicate
// returns true
if (&subscription.preprocess).process(sid, &msg).await {
continue;
}
// Send a message or drop it if the channel is
// disconnected or full.
subscription.messages.send(msg).await.unwrap();
}
}
ServerOp::Hmsg {
subject,
headers,
sid,
reply_to,
payload,
} => {
// Ignore muted subscriptions
if self.state.meta.lock().await.mutes.get(&sid).is_some() {
continue;
}
let read = self.state.read.lock().await;
// Send the message to matching subscription.
if let Some(subscription) = read.subscriptions.get(&sid) {
let msg = Message {
subject,
reply: reply_to,
data: payload,
headers: Some(headers),
client: Some(self.clone()),
double_acked: Default::default(),
};
// Preprocess and drop the message from the buffer if it the predicate
// returns true
if (subscription.preprocess).process(sid, &msg).await {
continue;
}
// Send a message or drop it if the channel is
// disconnected or full.
subscription.messages.send(msg).await.unwrap();
}
}
ServerOp::Err(msg) => {
let si = self.server_info().await;
connector
.get_options()
.error_callback
.call(si, Error::new(ErrorKind::Other, msg))
.await;
}
ServerOp::Unknown(line) => {
log::warn!("unknown op: {}", line);
}
}
}
// The stream of operation is broken, meaning the connection was lost.
Err(ErrorKind::ConnectionReset.into())
}
}
/// Opaque `Debug` output: only the type name is printed, none of the
/// internal state.
impl fmt::Debug for Client {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Client")
    }
}
/// Reconnect buffer.
///
/// Fixed-size storage for PUB messages published while the client is
/// reconnecting. Once the connection is back, the buffered bytes are
/// written to the server and the buffer is reset.
struct Buffer {
    /// Backing storage, split into three consecutive ranges:
    ///
    /// - `..flushed` holds complete buffered PUB messages,
    /// - `flushed..written` holds a trailing, not yet flushed message,
    /// - `written..` is free space.
    bytes: Box<[u8]>,

    /// Number of written bytes.
    written: usize,

    /// Number of bytes marked as "flushed".
    flushed: usize,
}

impl Buffer {
    /// Creates an empty, zero-filled buffer holding at most `size` bytes.
    fn new(size: usize) -> Buffer {
        let bytes = vec![0_u8; size].into_boxed_slice();
        Buffer {
            bytes,
            written: 0,
            flushed: 0,
        }
    }

    /// Resets the buffer and returns the bytes that had been flushed into
    /// it; unflushed bytes are discarded.
    fn clear(&mut self) -> &[u8] {
        let end = self.flushed;
        self.flushed = 0;
        self.written = 0;
        &self.bytes[..end]
    }
}
/// In-memory `AsyncWrite` implementation: every operation completes
/// immediately and never returns `Poll::Pending`.
impl AsyncWrite for Buffer {
    /// Appends all of `buf`, or fails if it does not fit into the free
    /// space. After a failure the buffer is marked as completely written so
    /// that subsequent smaller writes fail as well.
    fn poll_write(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, Error>> {
        let free = self.bytes.len() - self.written;
        if buf.len() > free {
            // Fill the buffer to prevent subsequent smaller writes.
            self.written = self.bytes.len();
            return Poll::Ready(Err(Error::new(
                ErrorKind::Other,
                "the disconnect buffer is full",
            )));
        }

        // Append `buf` right after the bytes written so far.
        let start = self.written;
        let end = start + buf.len();
        self.bytes[start..end].copy_from_slice(buf);
        self.written = end;
        Poll::Ready(Ok(buf.len()))
    }

    /// Marks everything written so far as flushed.
    fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        let written = self.written;
        self.flushed = written;
        Poll::Ready(Ok(()))
    }

    /// Nothing to shut down for an in-memory buffer.
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        Poll::Ready(Ok(()))
    }
}