umpx 0.6.0

Collection of ump extensions.
Documentation
//! Integration utilities for creating sqlsrv handlers.

use super::{
  r2d2,
  rusqlite::{params, Connection},
  ConnPool, WrConn
};

use threadpool::ThreadPool;

use ump::ReplyContext;

// Re-export rusqlite from sqlsrv
pub use sqlsrv::rusqlite;

/// Run a closure against a read-only connection and forward its result to a
/// [`ReplyContext`].
///
/// A reader connection is checked out of the [`ConnPool`] and lent to `f`
/// (presumably performing read-only database operations), which is invoked on
/// the calling thread.
///
/// The closure's `Result<T, E>` is translated into a `ReplyContext::reply()`
/// on `Ok` and a `ReplyContext::fail()` on `Err`.  If passing the reply fails,
/// a diagnostic is written to stderr; the failure is not returned to the
/// caller.
///
/// # Errors
/// [`r2d2::Error`] is returned if a read-only connection could not be
/// acquired from the connection pool.  The closure is not called in that
/// case.
pub fn proc_rodb_req<R, E, F>(
  cpool: &ConnPool,
  rctx: ReplyContext<R, E>,
  f: F
) -> Result<(), r2d2::Error>
where
  R: Send + 'static,
  E: std::error::Error + Send + 'static,
  F: FnOnce(&Connection) -> Result<R, E> + Send + 'static
{
  let conn = cpool.reader()?;

  // The closure's outcome selects which side of the reply context is used.
  let outcome = f(&conn);
  let delivery = match outcome {
    Err(err) => rctx.fail(err),
    Ok(value) => rctx.reply(value)
  };

  // A failed hand-over is only reported; it is not propagated.
  if let Err(e) = delivery {
    eprintln!("Reply message pass failed; {e}");
  }

  Ok(())
}

/// Run a closure against a read-only connection on a [`ThreadPool`] worker
/// and forward its result to a [`ReplyContext`].
///
/// The reader connection is checked out of the [`ConnPool`] on the calling
/// thread, before the job is queued.  The connection, the reply context and
/// the closure `f` (presumably performing read-only database operations) are
/// then moved into a job submitted to `tpool`.
///
/// The closure's `Result<T, E>` is translated into a `ReplyContext::reply()`
/// on `Ok` and a `ReplyContext::fail()` on `Err`.  If passing the reply fails,
/// a diagnostic is written to stderr by the worker.
///
/// # Errors
/// [`r2d2::Error`] is returned if a read-only connection could not be
/// acquired from the connection pool.  No job is queued in that case.
pub fn proc_rodb_req_thrd<R, E, F>(
  cpool: &ConnPool,
  tpool: &ThreadPool,
  rctx: ReplyContext<R, E>,
  f: F
) -> Result<(), r2d2::Error>
where
  R: Send + 'static,
  E: std::error::Error + Send + 'static,
  F: FnOnce(&Connection) -> Result<R, E> + Send + 'static
{
  let conn = cpool.reader()?;

  // Everything the job needs is moved into it; the connection is held until
  // the job has finished.
  let job = move || {
    let outcome = f(&conn);
    let delivery = match outcome {
      Err(err) => rctx.fail(err),
      Ok(value) => rctx.reply(value)
    };

    // A failed hand-over is only reported; there is nobody to return it to.
    if let Err(e) = delivery {
      eprintln!("Reply message pass failed; {e}");
    }
  };
  tpool.execute(job);

  Ok(())
}


/// Given a [`ConnPool`] and a [`ReplyContext`], run a closure (presumably
/// performing read/write database operations) on the calling thread.
///
/// The writer connection is acquired from the pool and lent mutably to `f`.
/// Unlike [`proc_rwdb_req_thrd`], no thread pool is involved: `f` has
/// completed and its result has been handed to the reply context by the time
/// this function returns.
///
/// Translates the closure's `Result<T, E>` into a `ReplyContext::reply()` on
/// `Ok` and `ReplyContext::fail()` on `Err`.  If passing the reply fails, a
/// diagnostic is written to stderr; the failure is not reported to the
/// caller.
pub fn proc_rwdb_req<R, E, F>(cpool: &ConnPool, rctx: ReplyContext<R, E>, f: F)
where
  R: Send + 'static,
  E: std::error::Error + Send + 'static,
  F: FnOnce(&mut WrConn) -> Result<R, E> + Send + 'static
{
  let mut rwconn = cpool.writer();

  // The closure's outcome selects which side of the reply context is used.
  let res = match f(&mut rwconn) {
    Ok(reply) => rctx.reply(reply),
    Err(e) => rctx.fail(e)
  };

  // A failed hand-over is only reported; this function has no error return.
  if let Err(e) = res {
    eprintln!("Reply message pass failed; {e}");
  }
}


/// Run a closure against the read/write connection on a [`ThreadPool`] worker
/// and forward its result to a [`ReplyContext`].
///
/// The writer connection is obtained from the [`ConnPool`] on the calling
/// thread, before the job is queued.  The connection, the reply context and
/// the closure `f` (presumably performing read/write database operations)
/// are then moved into a job submitted to `tpool`.
///
/// The closure's `Result<T, E>` is translated into a `ReplyContext::reply()`
/// on `Ok` and a `ReplyContext::fail()` on `Err`.  If passing the reply fails,
/// a diagnostic is written to stderr by the worker.
pub fn proc_rwdb_req_thrd<R, E, F>(
  cpool: &ConnPool,
  tpool: &ThreadPool,
  rctx: ReplyContext<R, E>,
  f: F
) where
  R: Send + 'static,
  E: std::error::Error + Send + 'static,
  F: FnOnce(&mut WrConn) -> Result<R, E> + Send + 'static
{
  let mut wrconn = cpool.writer();

  // Everything the job needs is moved into it; the writer is held until the
  // job has finished.
  let job = move || {
    let outcome = f(&mut wrconn);
    let delivery = match outcome {
      Err(err) => rctx.fail(err),
      Ok(value) => rctx.reply(value)
    };

    // A failed hand-over is only reported; there is nobody to return it to.
    if let Err(e) = delivery {
      eprintln!("Reply message pass failed; {e}");
    }
  };
  tpool.execute(job);
}


/// Perform an incremental vacuum.
///
/// The writer connection is obtained from `cpool` on the calling thread; the
/// vacuum itself is run as a job on `tpool`.  The returned
/// [`swctx::WaitCtx`] is completed with `()` once the vacuum has finished, or
/// failed with the [`rusqlite::Error`] that stopped it.  If handing the
/// result over to the wait context fails, a diagnostic is written to stderr.
///
/// `n` is the number of freelist pages to reclaim.  If `None` all pages on
/// the freelist will be reclaimed.  (Per the SQLite documentation a count
/// below 1 is treated the same way, and the pragma has no effect unless the
/// database uses `auto_vacuum=incremental`.)
#[must_use]
pub fn incremental_vacuum(
  cpool: &ConnPool,
  tpool: &ThreadPool,
  n: Option<usize>
) -> swctx::WaitCtx<(), (), rusqlite::Error> {
  let (sctx, wctx) = swctx::mkpair();

  let conn = cpool.writer();

  tpool.execute(move || {
    // PRAGMA arguments can not be bound parameters (the statement would fail
    // to prepare), so the count is formatted into the statement text.  `n` is
    // an integer, so this can not be used to inject SQL.
    let sql = n.map_or_else(
      || String::from("PRAGMA incremental_vacuum;"),
      |n| format!("PRAGMA incremental_vacuum({n});")
    );

    // The pragma yields a result row for each page it releases, so the
    // statement must be stepped until it is exhausted.  A single `execute()`
    // would stop after the first row and report it as an error.
    let vacuumed = conn.prepare(&sql).and_then(|mut stmt| {
      let mut rows = stmt.query(params![])?;
      while rows.next()?.is_some() {}
      Ok(())
    });

    let res = match vacuumed {
      Ok(()) => sctx.set(()),
      Err(e) => sctx.fail(e)
    };

    // A failed hand-over is only reported; there is nobody to return it to.
    if let Err(e) = res {
      eprintln!("Reply message pass failed; {e}");
    }
  });

  wctx
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :