ump 0.13.0

Micro message passing library for threads/tasks communication.
Documentation
use crate::{err::Error, server::QueueNode as ServerQueueNode};

use super::rctx::RCtxState;

/// Representation of a clonable client object that can issue requests to
/// [`Server`](super::Server) objects.
///
/// Each instantiation of a `Client` object is itself an isolated client with
/// regards to the server context.  By cloning a client a new independent
/// client is created.  ("Independent" here meaning that it is still tied to
/// the same server object, but the new client can be passed to a separate
/// thread and can independently make calls to the server).
///
/// # Type parameters
/// - `S` is the type of the message sent to the server with each request.
/// - `R` is the type of the reply returned to the requestor on success.
/// - `E` is the application-specific error type carried by [`Error`].
///
/// The struct is a thin wrapper around the pushing end of the queue whose
/// nodes pair each outgoing message with its reply context.
#[repr(transparent)]
pub struct Client<S, R, E>(pub(crate) sigq::Pusher<ServerQueueNode<S, R, E>>);

impl<S, R, E> Client<S, R, E>
where
  R: 'static + Send,
  E: 'static + Send
{
  /// Send a message to the server, wait for a reply, and return the reply.
  ///
  /// A complete round-trip (the message is delivered to the server, and the
  /// server sends a reply) must complete before this function returns
  /// success.
  ///
  /// # Return
  /// On success the function will return `Ok(msg)`.
  ///
  /// # Errors
  /// If the linked server object has been released, or is released while the
  /// message is in the server's queue, [`Error::ServerDisappeared`] will be
  /// returned.
  ///
  /// If the server never replied to the message and the reply context was
  /// dropped [`Error::NoReply`] will be returned.
  ///
  /// If an application specific error occurs it will be returned as a
  /// [`Error::App`].
  ///
  /// # Implementation details
  /// This method is _currently_ reentrant: It is safe to use share a single
  /// `Client` among multiple threads.  _This may change in the future_; it's
  /// recommended to not rely on this.  The recommended way to send messages to
  /// a server from multiple threads is to clone the `Client` and assign one
  /// separate `Client` to each thread.
  pub fn req(&self, out: S) -> Result<R, Error<E>> {
    // Create a per-call reply context.
    // This context could be created when the Client object is being created
    // and stored in the context, and thus be reused for reach client call.
    // One side-effect is that some of the state semantics becomes more
    // complicated.
    // The central repo has such an implementation checked in, but it seems to
    // have some more corner cases that aren't properly handled.
    let (sctx, wctx) = swctx::mkpair();

    self
      .0
      .push(ServerQueueNode {
        msg: out,
        reply: sctx
      })
      .map_err(|_| Error::ServerDisappeared)?;

    // Wait for a reply to arrive
    Ok(wctx.wait()?)
  }

  /// Issue a request, immediately returning a context that is used to wait for
  /// the server's reply.
  ///
  /// The `_async` naming is slightly misleading -- this method isn't an
  /// `async` in a language/`Future` sense, but rather it doesn't block and
  /// wait for a reply before returning.  Instead it returns a [`WaitReply`]
  /// object that is used to wait for the reply.
  ///
  /// This can be useful (in place of [`req()`](Client::req) or
  /// [`areq()`](Client::areq()) methods) if the caller knows that the server
  /// will take some time to respond to the request and the caller has other
  /// tasks it can perform in the meantime.
  ///
  /// ```
  /// use std::thread;
  ///
  /// use ump::channel;
  ///
  /// let (server, client) = channel::<String, String, ()>();
  ///
  /// let server_thread = thread::spawn(move || {
  ///   // Wait for data to arrive from a client
  ///   println!("Server waiting for message ..");
  ///   let (data, mut rctx) = server.wait().unwrap();
  ///
  ///   println!("Server received: '{}'", data);
  ///
  ///   // Long processing of data from client
  ///
  ///   // Reply to client
  ///   let reply = format!("Hello, {}!", data);
  ///   println!("Server replying '{}'", reply);
  ///   rctx.reply(reply);
  ///
  ///   println!("Server done");
  /// });
  ///
  /// let msg = String::from("Client");
  /// println!("Client sending '{}'", msg);
  /// let wrctx = client.req_async(msg).unwrap();
  ///
  /// // .. perform some operation while server is processing the request ..
  ///
  /// let reply = wrctx.wait().unwrap();
  /// println!("Client received reply '{}'", reply);
  /// println!("Client done");
  ///
  /// server_thread.join().unwrap();
  /// ```
  ///
  /// # Errors
  /// If the linked server object has been released, or is released while the
  /// message is in the server's queue, [`Error::ServerDisappeared`] will be
  /// returned.
  pub fn req_async(&self, out: S) -> Result<WaitReply<R, E>, Error<E>>
  where
    S: Send
  {
    let (sctx, wctx) = swctx::mkpair();
    self
      .0
      .push(ServerQueueNode {
        msg: out,
        reply: sctx
      })
      .map_err(|_| Error::ServerDisappeared)?;
    Ok(WaitReply(wctx))
  }

  #[allow(clippy::missing_errors_doc)]
  #[deprecated(since = "0.10.2", note = "Use req() instead.")]
  pub fn send(&self, out: S) -> Result<R, Error<E>> {
    self.req(out)
  }

  /// Same as [`Client::req()`] but for use in `async` contexts.
  #[allow(clippy::missing_errors_doc)]
  pub async fn areq(&self, out: S) -> Result<R, Error<E>>
  where
    S: Send
  {
    let (sctx, wctx) = swctx::mkpair();

    self
      .0
      .push(ServerQueueNode {
        msg: out,
        reply: sctx
      })
      .map_err(|_| Error::ServerDisappeared)?;

    Ok(wctx.wait_async().await?)
  }

  #[deprecated(since = "0.10.2", note = "Use areq() instead.")]
  #[allow(clippy::missing_errors_doc)]
  pub async fn asend(&self, out: S) -> Result<R, Error<E>>
  where
    S: Send
  {
    self.areq(out).await
  }

  /// Create a weak `Client` reference.
  ///
  /// The returned [`Weak`] can be turned back into a `Client` with
  /// [`Weak::upgrade()`].
  #[must_use]
  pub fn weak(&self) -> Weak<S, R, E> {
    let weak_pusher = self.0.weak();
    Weak(weak_pusher)
  }
}


impl<S, R, E> Clone for Client<S, R, E> {
  /// Clone a client.
  ///
  /// When a client is cloned the new object will be linked to the same server,
  /// but in all other respects the clone is a completely independent client.
  ///
  /// This means that a cloned client can be passed to a new thread/task and
  /// make new independent calls to the server without any risk of collision
  /// between clone and the original client object.
  fn clone(&self) -> Self {
    Self(self.0.clone())
  }
}

/// Context used to wait for a server to reply to a request.
///
/// Returned by [`Client::req_async()`].  Waiting consumes the context, either
/// through [`WaitReply::wait()`] or [`WaitReply::wait_async()`].
pub struct WaitReply<R, E>(swctx::WaitCtx<R, RCtxState, E>);

impl<R, E> WaitReply<R, E> {
  /// Block the calling thread until a reply is available.
  ///
  /// For use in non-`async` threads.
  ///
  /// # Errors
  /// [`Error::ServerDisappeared`] means the linked server object has been
  /// released.
  ///
  /// If the [`ReplyContext`](super::ReplyContext) is dropped by the server
  /// handler before it replies to the message, [`Error::NoReply`] will be
  /// returned.
  ///
  /// If an application specific error occurs it will be returned in
  /// [`Error::App`].
  pub fn wait(self) -> Result<R, Error<E>> {
    let reply = self.0.wait()?;
    Ok(reply)
  }

  /// Wait for a reply.
  ///
  /// Same as [`WaitReply::wait()`], but for use in `async` contexts.
  #[allow(clippy::missing_errors_doc)]
  pub async fn wait_async(self) -> Result<R, Error<E>>
  where
    R: Send,
    E: Send
  {
    let reply = self.0.wait_async().await?;
    Ok(reply)
  }
}


/// A weak client reference that can be upgraded to a [`Client`] as long as
/// other `Client` objects still exist.
///
/// Created with [`Client::weak()`] and turned back into a `Client` with
/// [`Weak::upgrade()`].
#[repr(transparent)]
pub struct Weak<S, R, E>(
  pub(crate) sigq::WeakPusher<ServerQueueNode<S, R, E>>
);

impl<S, R, E> Clone for Weak<S, R, E> {
  /// Clone a weak client reference by cloning the wrapped weak pusher.
  fn clone(&self) -> Self {
    let weak_pusher = self.0.clone();
    Self(weak_pusher)
  }
}

impl<S, R, E> Weak<S, R, E> {
  /// Upgrade a `WeakClient` to a [`Client`].
  ///
  /// If no strong `Client` objects still exist then `None` is returned.
  ///
  /// # Examples
  ///
  /// Upgrading a weak client while stong clients exists works:
  /// ```
  /// use ump::{channel, Error};
  ///
  /// let (server, client) = channel::<String, String, ()>();
  /// let weak_client = client.weak();
  /// let Some(client2) = weak_client.upgrade() else {
  ///   panic!("Unable to upgrade weak client");
  /// };
  /// ```
  ///
  /// Upgrading a weak client when no stong clients exists fails:
  /// ```
  /// use ump::{channel, Error};
  ///
  /// let (server, client) = channel::<String, String, ()>();
  /// let weak_client = client.weak();
  /// drop(client);
  /// let Some(_) = weak_client.upgrade() else {
  ///   panic!("Unexpectedly able to upgrade weak client");
  /// };
  /// ```
  #[must_use]
  pub fn upgrade(&self) -> Option<Client<S, R, E>> {
    self.0.upgrade().map(|x| Client(x))
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :