ump-ng 0.2.1

Micro message passing library for threads/tasks communication.
Documentation
use crate::{err::Error, rctx::RCtxState, server::InnerMsgType};


/// Representation of a clonable client object.
///
/// Each instantiation of a `Client` object is itself an isolated client with
/// regards to the server context.  By cloning a client a new independent
/// client is created.  ("Independent" here meaning that it is still tied to
/// the same server object, but the new client can be passed to a separate
/// thread and can independently make calls to the server).
#[repr(transparent)]
pub struct Client<P, S, R, E>(
  pub(crate) sigq::Pusher<InnerMsgType<P, S, R, E>>
);

impl<P, S, R, E> Client<P, S, R, E>
where
  P: 'static + Send,
  R: 'static + Send,
  E: 'static + Send
{
  /// Transmit a uni-directional message to the server end-point.
  ///
  /// # Errors
  /// [`Error::ServerDisappeared`] means the [`Server`](super::Server)
  /// end-point has been dropped.
  pub fn post(&self, msg: P) -> Result<(), Error<E>> {
    self
      .0
      .push(InnerMsgType::Post(msg))
      .map_err(|_| Error::ServerDisappeared)?;

    Ok(())
  }

  /// Send a message to the server, wait for a reply, and return the reply.
  ///
  /// A complete round-trip (the message is delivered to the server, and the
  /// server sends a reply) must complete before this function returns
  /// success.
  ///
  /// This method is _currently_ reentrant: It is safe to use share a single
  /// `Client` among multiple threads.  _This may change in the future_; it's
  /// recommended to not rely on this.  The recommended way to send messages to
  /// a server from multiple threads is to clone the `Client` and assign one
  /// separate `Client` to each thread.
  ///
  /// # Return
  /// On success the function will return `Ok(msg)`.
  ///
  /// # Errors
  /// If the linked server object has been released, or is released while the
  /// message is in the server's queue, [`Error::ServerDisappeared`] will be
  /// returned.
  ///
  /// If the server never replied to the message and the
  /// [`ReplyContext`](super::ReplyContext) was dropped [`Error::NoReply`] will
  /// be returned.
  ///
  /// If an application specific error occurs it will be returned as a
  /// `Err(Error::App(E))`, where `E` is the error type used when creating the
  /// [`channel`](crate::channel).
  pub fn req(&self, out: S) -> Result<R, Error<E>> {
    // Create a per-call reply context.
    // This context could be created when the Client object is being created
    // and stored in the context, and thus be reused for reach client call.
    // One side-effect is that some of the state semantics becomes more
    // complicated.
    // The central repo has such an implementation checked in, but it seems to
    // have some more corner cases that aren't properly handled.
    let (sctx, wctx) = swctx::mkpair();

    self
      .0
      .push(InnerMsgType::Request(out, sctx))
      .map_err(|_| Error::ServerDisappeared)?;

    Ok(wctx.wait()?)
  }

  /// Issue a request, immediately returning a context that is used to wait for
  /// the server's reply.
  ///
  /// The `_async` naming is slightly misleading -- this method isn't an
  /// `async` in a language/`Future` sense, but rather it doesn't block and
  /// wait for a reply before returning.  Instead it returns a [`WaitReply`]
  /// object that is used to wait for the reply.
  ///
  /// This can be useful (in place of [`req()`](Client::req) or
  /// [`areq()`](Client::areq()) methods) if the caller knows that the server
  /// will take some time to respond to the request and the caller has other
  /// tasks it can perform in the meantime.
  ///
  /// ```
  /// use std::thread;
  ///
  /// use ump_ng::{channel, MsgType};
  ///
  /// let (server, client) = channel::<String, String, String, ()>();
  ///
  /// let server_thread = thread::spawn(move || {
  ///   // Wait for data to arrive from a client
  ///   println!("Server waiting for message ..");
  ///   let MsgType::Request(data, mut rctx) = server.wait().unwrap() else {
  ///     panic!("Unexpected message type");
  ///   };
  ///
  ///   println!("Server received: '{}'", data);
  ///
  ///   // Long processing of data from client
  ///
  ///   // Reply to client
  ///   let reply = format!("Hello, {}!", data);
  ///   println!("Server replying '{}'", reply);
  ///   rctx.reply(reply);
  ///
  ///   println!("Server done");
  /// });
  ///
  /// let msg = String::from("Client");
  /// println!("Client sending '{}'", msg);
  /// let wrctx = client.req_async(msg).unwrap();
  ///
  /// // .. perform some operation while server is processing the request ..
  ///
  /// let reply = wrctx.wait().unwrap();
  /// println!("Client received reply '{}'", reply);
  /// println!("Client done");
  ///
  /// server_thread.join().unwrap();
  /// ```
  ///
  /// # Errors
  /// [`Error::ServerDisappeared`] means the [`Server`](super::Server)
  /// end-point has been dropped.
  pub fn req_async(&self, out: S) -> Result<WaitReply<R, E>, Error<E>> {
    let (sctx, wctx) = swctx::mkpair();
    self
      .0
      .push(InnerMsgType::Request(out, sctx))
      .map_err(|_| Error::ServerDisappeared)?;

    Ok(WaitReply(wctx))
  }

  /// Same as [`Client::req()`], but for use in `async` contexts.
  #[allow(clippy::missing_errors_doc)]
  pub async fn areq(&self, out: S) -> Result<R, Error<E>>
  where
    S: Send
  {
    let (sctx, wctx) = swctx::mkpair();

    self
      .0
      .push(InnerMsgType::Request(out, sctx))
      .map_err(|_| Error::ServerDisappeared)?;

    Ok(wctx.wait_async().await?)
  }

  /// Create a weak `Client` reference.
  #[must_use]
  pub fn weak(&self) -> Weak<P, S, R, E> {
    Weak(self.0.weak())
  }

  /// Create a special-purpose client that can only perform post operations.
  #[must_use]
  pub fn postclient(&self) -> Post<P, S, R, E> {
    Post(self.clone())
  }
}

impl<P, S, R, E> Clone for Client<P, S, R, E> {
  /// Clone a client.
  ///
  /// When a client is cloned the new object will be linked to the same server,
  /// but in all other respects the clone is a completely independent client.
  ///
  /// This means that a cloned client can be passed to a new thread/task and
  /// make new independent calls to the server without any risk of collision
  /// between clone and the original client object.
  fn clone(&self) -> Self {
    Self(self.0.clone())
  }
}


/// Context used to wait for a server to reply to a request.
pub struct WaitReply<R, E>(swctx::WaitCtx<R, RCtxState, E>);

impl<R, E> WaitReply<R, E> {
  /// Block and wait for a reply.
  ///
  /// For use in non-`async` threads.
  ///
  /// # Errors
  /// [`Error::ServerDisappeared`] means the linked server object has been
  /// released.
  ///
  /// If the [`ReplyContext`](super::ReplyContext) is dropped by the server
  /// handler it replies to the message, [`Error::NoReply`] will be returned.
  ///
  /// If an application specific error occurs it will be returned in
  /// [`Error::App`].
  pub fn wait(self) -> Result<R, Error<E>> {
    Ok(self.0.wait()?)
  }

  /// Block and wait for a reply.
  ///
  /// Same as [`WaitReply::wait()`], but for use in `async` contexts.
  #[allow(clippy::missing_errors_doc)]
  pub async fn wait_async(self) -> Result<R, Error<E>>
  where
    R: Send,
    E: Send
  {
    Ok(self.0.wait_async().await?)
  }
}


/// A weak client reference that can be upgraded to a [`Client`] as long as
/// other `Client` objects till exist.
#[repr(transparent)]
pub struct Weak<P, S, R, E>(
  pub(crate) sigq::WeakPusher<InnerMsgType<P, S, R, E>>
);

impl<P, S, R, E> Clone for Weak<P, S, R, E> {
  fn clone(&self) -> Self {
    Self(self.0.clone())
  }
}

impl<P, S, R, E> Weak<P, S, R, E> {
  #[must_use]
  pub fn upgrade(&self) -> Option<Client<P, S, R, E>> {
    self.0.upgrade().map(|x| Client(x))
  }
}


/// Special purpose client end-point that can only issue `Post` requests.
#[derive(Clone)]
#[repr(transparent)]
pub struct Post<P, S, R, E>(Client<P, S, R, E>);

impl<P, S, R, E> Post<P, S, R, E>
where
  P: 'static + Send,
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send
{
  /// Transmit a uni-directional message to the server end-point.
  ///
  /// # Errors
  /// [`Error::ServerDisappeared`] means the [`Server`](super::Server)
  /// end-point has been dropped.
  pub fn post(&self, msg: P) -> Result<(), Error<E>> {
    self.0.post(msg)
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :